实现 Spring Boot Logstash 日志收集

泥瓦匠BYSocket

共 10707字,需浏览 22分钟

 · 2020-11-08

点击上面,关注:“锅外的大佬”

专注分享国外最新技术内容

帮助每位开发者更优秀地成长

在本文中,我将介绍我的日志库,专门用于 SpringBootRESTfulWeb应用程序。关于这个库的主要设想是:

  • 使用完整正文记录所有传入的HTTP请求和传出的HTTP响应

  • 使用 logstash-logback-encoder库和 Logstash与 ElasticStack集成

  • 对于 RestTemplateOpenFeign,记录所有可能发生的日志

  • 在单个API端点调用中跨所有通信生成和传递关联Id(correlationId)

  • 计算和存储每个请求的执行时间

  • 可自动配置的库——除了引入依赖项之外,不必执行任何操作,就能正常工作

1.简述

我想在阅读了文章的前言后,你可能会问为什么我决定构建一个 SpringBoot已有功能的库。但问题是它真的具有这些功能?你可能会感到惊讶,因为答案是否定的。虽然可以使用一些内置的 Spring组件例如 CommonsRequestLoggingFilter轻松地记录 HTTP请求,但是没有任何用于记录响应主体(response body)的开箱即用机制。

当然你可以基于Spring HTTP拦截器(HandlerInterceptorAdapter)或过滤器(OncePerRequestFilter)实现自定义解决方案,但这并没有你想的那么简单。

第二种选择是使用 ZalandoLogbook,它是一个可扩展的Java库,可以为不同的客户端和服务器端技术启用完整的请求和响应日志记录。这是一个非常有趣的库,专门用于记录HTTP请求和响应,它提供了许多自定义选项并支持不同的客户端。

因此,为了更高级,你可以始终使用此库。我的目标是创建一个简单的库,它不仅记录请求和响应,还提供自动配置,以便将这些日志发送到 Logstash并关联它们。它还会自动生成一些有价值的统计信息,例如请求处理时间。所有这些值都应该发送到 Logstash。我们继续往下看。

2.实现

从依赖开始吧。我们需要一些基本的Spring库,它们包含 spring-web, spring-context在内,并提供了一些额外的注解。为了与 Logstash集成,我们使用 logstash-logback-encoder库。 Slf4j包含用于日志记录的抽象,而 javax.servlet-api用于HTTP通信。 CommonsIO不是必需的,但它提供了一些操作输入和输出流的有用方法。

  1. 11

  2. 2.6

  3. 4.0.1

  4. 5.3

  5. 5.1.6.RELEASE

  6. 1.7.26

  7. org.springframework

  8. spring-context

  9. ${spring.version}

  10. org.springframework

  11. spring-web

  12. ${spring.version}

  13. net.logstash.logback

  14. logstash-logback-encoder

  15. ${logstash-logback.version}

  16. javax.servlet

  17. javax.servlet-api

  18. ${javax-servlet.version}

  19. provided

  20. commons-io

  21. commons-io

  22. ${commons-io.version}

  23. org.slf4j

  24. slf4j-api

  25. ${slf4j.version}

第一步是实现HTTP请求和响应包装器。我们必须这样做,因为无法读取HTTP流两次。如果想记录请求或响应正文,在处理输入流或将其返回给客户端之前,首先必须读取输入流。

Spring提供了HTTP请求和响应包装器的实现,但由于未知原因,它们仅支持某些特定用例,如内容类型 application/x-www-form-urlencoded。因为我们通常在RESTful应用程序之间的通信中使用 aplication/json内容类型,所以Spring ContentCachingRequestWrapper和 ContentCachingResponseWrapper在这没什么用。 

这是我的HTTP请求包装器的实现,可以通过各种方式完成。这只是其中之一:

  1. public class SpringRequestWrapper extends HttpServletRequestWrapper {


  2. private byte[] body;


  3. public SpringRequestWrapper(HttpServletRequest request) {

  4. super(request);

  5. try {

  6. body = IOUtils.toByteArray(request.getInputStream());

  7. } catch (IOException ex) {

  8. body = new byte[0];

  9. }

  10. }


  11. @Override

  12. public ServletInputStream getInputStream() throws IOException {

  13. return new ServletInputStream() {

  14. public boolean isFinished() {

  15. return false;

  16. }


  17. public boolean isReady() {

  18. return true;

  19. }


  20. public void setReadListener(ReadListener readListener) {


  21. }


  22. ByteArrayInputStream byteArray = new ByteArrayInputStream(body);


  23. @Override

  24. public int read() throws IOException {

  25. return byteArray.read();

  26. }

  27. };

  28. }

  29. }

输出流必须做同样的事情,这个实现有点复杂:

  1. public class SpringResponseWrapper extends HttpServletResponseWrapper {


  2. private ServletOutputStream outputStream;

  3. private PrintWriter writer;

  4. private ServletOutputStreamWrapper copier;


  5. public SpringResponseWrapper(HttpServletResponse response) throws IOException {

  6. super(response);

  7. }


  8. @Override

  9. public ServletOutputStream getOutputStream() throws IOException {

  10. if (writer != null) {

  11. throw new IllegalStateException("getWriter() has already been called on this response.");

  12. }


  13. if (outputStream == null) {

  14. outputStream = getResponse().getOutputStream();

  15. copier = new ServletOutputStreamWrapper(outputStream);

  16. }


  17. return copier;

  18. }


  19. @Override

  20. public PrintWriter getWriter() throws IOException {

  21. if (outputStream != null) {

  22. throw new IllegalStateException("getOutputStream() has already been called on this response.");

  23. }


  24. if (writer == null) {

  25. copier = new ServletOutputStreamWrapper(getResponse().getOutputStream());

  26. writer = new PrintWriter(new OutputStreamWriter(copier, getResponse().getCharacterEncoding()), true);

  27. }


  28. return writer;

  29. }


  30. @Override

  31. public void flushBuffer() throws IOException {

  32. if (writer != null) {

  33. writer.flush();

  34. }

  35. else if (outputStream != null) {

  36. copier.flush();

  37. }

  38. }


  39. public byte[] getContentAsByteArray() {

  40. if (copier != null) {

  41. return copier.getCopy();

  42. }

  43. else {

  44. return new byte[0];

  45. }

  46. }


  47. }

我将 ServletOutputStream包装器实现放到另一个类中:

  1. public class ServletOutputStreamWrapper extends ServletOutputStream {


  2. private OutputStream outputStream;

  3. private ByteArrayOutputStream copy;


  4. public ServletOutputStreamWrapper(OutputStream outputStream) {

  5. this.outputStream = outputStream;

  6. this.copy = new ByteArrayOutputStream();

  7. }


  8. @Override

  9. public void write(int b) throws IOException {

  10. outputStream.write(b);

  11. copy.write(b);

  12. }


  13. public byte[] getCopy() {

  14. return copy.toByteArray();

  15. }


  16. @Override

  17. public boolean isReady() {

  18. return true;

  19. }


  20. @Override

  21. public void setWriteListener(WriteListener writeListener) {


  22. }

  23. }

因为我们需要在处理之前包装HTTP请求流和响应流,所以我们应该使用HTTP过滤器。Spring提供了自己的HTTP过滤器实现。

我们的过滤器扩展了它,并使用自定义请求和响应包装来记录有效负载。此外,它还生成和设置 X-Request-ID, X-Correlation-ID header和请求处理时间。

  1. public class SpringLoggingFilter extends OncePerRequestFilter {


  2. private static final Logger LOGGER = LoggerFactory.getLogger(SpringLoggingFilter.class);

  3. private UniqueIDGenerator generator;


  4. public SpringLoggingFilter(UniqueIDGenerator generator) {

  5. this.generator = generator;

  6. }


  7. protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws ServletException, IOException {

  8. generator.generateAndSetMDC(request);

  9. final long startTime = System.currentTimeMillis();

  10. final SpringRequestWrapper wrappedRequest = new SpringRequestWrapper(request);

  11. LOGGER.info("Request: method={}, uri={}, payload={}", wrappedRequest.getMethod(),

  12. wrappedRequest.getRequestURI(), IOUtils.toString(wrappedRequest.getInputStream(),

  13. wrappedRequest.getCharacterEncoding()));

  14. final SpringResponseWrapper wrappedResponse = new SpringResponseWrapper(response);

  15. wrappedResponse.setHeader("X-Request-ID", MDC.get("X-Request-ID"));

  16. wrappedResponse.setHeader("X-Correlation-ID", MDC.get("X-Correlation-ID"));

  17. chain.doFilter(wrappedRequest, wrappedResponse);

  18. final long duration = System.currentTimeMillis() - startTime;

  19. LOGGER.info("Response({} ms): status={}, payload={}", value("X-Response-Time", duration),

  20. value("X-Response-Status", wrappedResponse.getStatus()),

  21. IOUtils.toString(wrappedResponse.getContentAsByteArray(), wrappedResponse.getCharacterEncoding()));

  22. }

  23. }

3.自动配置

完成包装器和HTTP过滤器的实现后,我们可以为库准备自动配置。第一步是创建 @Configuration包含所有必需的bean。

我们必须注册自定义HTTP过滤器 SpringLoggingFilter,以及用于与 Logstash和 RestTemplateHTTP客户端拦截器集成的 logger appender

  1. @Configuration

  2. public class SpringLoggingAutoConfiguration {


  3. private static final String LOGSTASH_APPENDER_NAME = "LOGSTASH";


  4. @Value("${spring.logstash.url:localhost:8500}")

  5. String url;

  6. @Value("${spring.application.name:-}")

  7. String name;


  8. @Bean

  9. public UniqueIDGenerator generator() {

  10. return new UniqueIDGenerator();

  11. }


  12. @Bean

  13. public SpringLoggingFilter loggingFilter() {

  14. return new SpringLoggingFilter(generator());

  15. }


  16. @Bean

  17. public RestTemplate restTemplate() {

  18. RestTemplate restTemplate = new RestTemplate();

  19. List<ClientHttpRequestInterceptor> interceptorList = new ArrayList<ClientHttpRequestInterceptor>();

  20. restTemplate.setInterceptors(interceptorList);

  21. return restTemplate;

  22. }


  23. @Bean

  24. public LogstashTcpSocketAppender logstashAppender() {

  25. LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();

  26. LogstashTcpSocketAppender logstashTcpSocketAppender = new LogstashTcpSocketAppender();

  27. logstashTcpSocketAppender.setName(LOGSTASH_APPENDER_NAME);

  28. logstashTcpSocketAppender.setContext(loggerContext);

  29. logstashTcpSocketAppender.addDestination(url);

  30. LogstashEncoder encoder = new LogstashEncoder();

  31. encoder.setContext(loggerContext);

  32. encoder.setIncludeContext(true);

  33. encoder.setCustomFields("{\"appname\":\"" + name + "\"}");

  34. encoder.start();

  35. logstashTcpSocketAppender.setEncoder(encoder);

  36. logstashTcpSocketAppender.start();

  37. loggerContext.getLogger(Logger.ROOT_LOGGER_NAME).addAppender(logstashTcpSocketAppender);

  38. return logstashTcpSocketAppender;

  39. }


  40. }

库中的配置集合由 SpringBoot加载。 SpringBoot会检查已发布 jar中是否存在 META-INF/spring.factories文件。

该文件应列出key等于 EnableAutoConfiguration的配置类:

  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\

  2. pl.piomin.logging.config.SpringLoggingAutoConfiguration

4.与Logstash集成

通过自动配置的日志记录追加器(logging appender)实现与 Logstash集成。

我们可以通过在 application.yml文件中设置属性 spring.logstash.url来覆盖 Logstash目标URL:

  1. spring:

  2. application:

  3. name: sample-app

  4. logstash:

  5. url: 192.168.99.100:5000

要在应用程序中启用本文中描述的所有功能,只需要将我的库包含在依赖项中:

  1. pl.piomin

  2. spring-boot-logging

  3. 1.0-SNAPSHOT

在运行应用程序之前,您应该在计算机上启动 ElasticStack。最好的方法是通过 Docker容器。但首先要创建 Docker网络,以通过容器名称启用容器之间的通信。

  1. $ docker network create es

现在,在端口9200启动 Elasticsearch的单个节点实例,我使用版本为 6.7.2的Elastic Stack工具:

  1. $ docker run -d --name elasticsearch --net es -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:6.7.2

运行 Logstash时,需要提供包含输入和输出定义的其他配置。我将使用JSON编解码器启动TCP输入,默认情况下不启用。 Elasticsearch URL设置为输出。它还将创建一个包含应用程序名称的索引。

  1. input {

  2. tcp {

  3. port => 5000

  4. codec => json

  5. }

  6. }

  7. output {

  8. elasticsearch {

  9. hosts => ["http://elasticsearch:9200"]

  10. index => "micro-%{appname}"

  11. }

  12. }

现在我们可以使用 Docker容器启动 Logstash。它在端口5000上公开并从 logstash.conf文件中读取配置:

  1. docker run -d --name logstash --net es -p 5000:5000 -v ~/logstash.conf:/usr/share/logstash/pipeline/logstash.conf docker.elastic.co/logstash/logstash:6.7.2

最后,我们可以运行仅用于显示日志的 Kibana:

  1. $ docker run -d --name kibana --net es -e "ELASTICSEARCH_URL=http://elasticsearch:9200" -p 5601:5601 docker.elastic.co/kibana/kibana:6.7.2

启动使用 spring-boot-logging库的示例应用程序后, POST请求中的日志将显示在 Kibana中,如下所示: 

响应日志每个条目都包含 X-Correlation-ID, X-Request-ID, X-Response-Time和 X-Response-Status头。

5.摘要

Springlogging library库可以在我的GitHub地址:https://github.com/piomin/spring-boot-logging.git中找到。我还在努力,所以非常欢迎任何反馈或建议。该库专用于基于微服务的体系结构,您的应用程序可以在容器内的许多实例中启动。

在此模型中,将日志存储在文件中没有任何意义。这就是为什么与 ElasticStack集成非常重要的原因。 但是这个库最重要的特性是将HTTP请求/响应与完整正文和一些附加信息记录到此日志中,如相关ID或请求处理时间。库非常精简,包含在应用程序之后,所有都是开箱即用的。

【墙裂推荐】

专注于「开发者」综合成长的深度星球
限时优惠进行中

最近热门内容回顾   #技术人系列



下方二维码关注我

互联网草根,坚持分享技术创业产品心得和总结~



点击“阅读原文”,领取 2020 年最新免费技术资料大全

↓↓↓ 
浏览 33
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报