实现 Spring Boot Logstash 日志收集
点击上面,关注:“锅外的大佬”
专注分享国外最新技术内容
帮助每位开发者更优秀地成长
在本文中,我将介绍我的日志库,专门用于 SpringBootRESTfulWeb
应用程序。关于这个库的主要设想是:
使用完整正文记录所有传入的HTTP请求和传出的HTTP响应
使用
logstash-logback-encoder
库和Logstash
与ElasticStack
集成对于
RestTemplate和OpenFeign
,记录所有可能发生的日志在单个API端点调用中跨所有通信生成和传递关联Id(correlationId)
计算和存储每个请求的执行时间
可自动配置的库——除了引入依赖项之外,不必执行任何操作,就能正常工作
1.简述
我想在阅读了文章的前言后,你可能会问为什么我决定构建一个 SpringBoot
已有功能的库。但问题是它真的具有这些功能?你可能会感到惊讶,因为答案是否定的。虽然可以使用一些内置的 Spring
组件例如 CommonsRequestLoggingFilter
轻松地记录 HTTP
请求,但是没有任何用于记录响应主体(response body)的开箱即用机制。
当然你可以基于Spring HTTP拦截器(HandlerInterceptorAdapter)或过滤器(OncePerRequestFilter)实现自定义解决方案,但这并没有你想的那么简单。
第二种选择是使用 ZalandoLogbook
,它是一个可扩展的Java库,可以为不同的客户端和服务器端技术启用完整的请求和响应日志记录。这是一个非常有趣的库,专门用于记录HTTP请求和响应,它提供了许多自定义选项并支持不同的客户端。
因此,为了更高级,你可以始终使用此库。我的目标是创建一个简单的库,它不仅记录请求和响应,还提供自动配置,以便将这些日志发送到 Logstash
并关联它们。它还会自动生成一些有价值的统计信息,例如请求处理时间。所有这些值都应该发送到 Logstash
。我们继续往下看。
2.实现
从依赖开始吧。我们需要一些基本的Spring库,它们包含 spring-web
, spring-context
在内,并提供了一些额外的注解。为了与 Logstash
集成,我们使用 logstash-logback-encoder
库。 Slf4j
包含用于日志记录的抽象,而 javax.servlet-api
用于HTTP通信。 CommonsIO
不是必需的,但它提供了一些操作输入和输出流的有用方法。
11
2.6
4.0.1
5.3
5.1.6.RELEASE
1.7.26
org.springframework
spring-context
${spring.version}
org.springframework
spring-web
${spring.version}
net.logstash.logback
logstash-logback-encoder
${logstash-logback.version}
javax.servlet
javax.servlet-api
${javax-servlet.version}
provided
commons-io
commons-io
${commons-io.version}
org.slf4j
slf4j-api
${slf4j.version}
第一步是实现HTTP请求和响应包装器。我们必须这样做,因为无法读取HTTP流两次。如果想记录请求或响应正文,在处理输入流或将其返回给客户端之前,首先必须读取输入流。
Spring提供了HTTP请求和响应包装器的实现,但由于未知原因,它们仅支持某些特定用例,如内容类型 application/x-www-form-urlencoded
。因为我们通常在RESTful应用程序之间的通信中使用 aplication/json
内容类型,所以Spring ContentCachingRequestWrapper
和 ContentCachingResponseWrapper
在这没什么用。
这是我的HTTP请求包装器的实现,可以通过各种方式完成。这只是其中之一:
public class SpringRequestWrapper extends HttpServletRequestWrapper {
private byte[] body;
public SpringRequestWrapper(HttpServletRequest request) {
super(request);
try {
body = IOUtils.toByteArray(request.getInputStream());
} catch (IOException ex) {
body = new byte[0];
}
}
@Override
public ServletInputStream getInputStream() throws IOException {
return new ServletInputStream() {
public boolean isFinished() {
return false;
}
public boolean isReady() {
return true;
}
public void setReadListener(ReadListener readListener) {
}
ByteArrayInputStream byteArray = new ByteArrayInputStream(body);
@Override
public int read() throws IOException {
return byteArray.read();
}
};
}
}
输出流必须做同样的事情,这个实现有点复杂:
public class SpringResponseWrapper extends HttpServletResponseWrapper {
private ServletOutputStream outputStream;
private PrintWriter writer;
private ServletOutputStreamWrapper copier;
public SpringResponseWrapper(HttpServletResponse response) throws IOException {
super(response);
}
@Override
public ServletOutputStream getOutputStream() throws IOException {
if (writer != null) {
throw new IllegalStateException("getWriter() has already been called on this response.");
}
if (outputStream == null) {
outputStream = getResponse().getOutputStream();
copier = new ServletOutputStreamWrapper(outputStream);
}
return copier;
}
@Override
public PrintWriter getWriter() throws IOException {
if (outputStream != null) {
throw new IllegalStateException("getOutputStream() has already been called on this response.");
}
if (writer == null) {
copier = new ServletOutputStreamWrapper(getResponse().getOutputStream());
writer = new PrintWriter(new OutputStreamWriter(copier, getResponse().getCharacterEncoding()), true);
}
return writer;
}
@Override
public void flushBuffer() throws IOException {
if (writer != null) {
writer.flush();
}
else if (outputStream != null) {
copier.flush();
}
}
public byte[] getContentAsByteArray() {
if (copier != null) {
return copier.getCopy();
}
else {
return new byte[0];
}
}
}
我将 ServletOutputStream
包装器实现放到另一个类中:
public class ServletOutputStreamWrapper extends ServletOutputStream {
private OutputStream outputStream;
private ByteArrayOutputStream copy;
public ServletOutputStreamWrapper(OutputStream outputStream) {
this.outputStream = outputStream;
this.copy = new ByteArrayOutputStream();
}
@Override
public void write(int b) throws IOException {
outputStream.write(b);
copy.write(b);
}
public byte[] getCopy() {
return copy.toByteArray();
}
@Override
public boolean isReady() {
return true;
}
@Override
public void setWriteListener(WriteListener writeListener) {
}
}
因为我们需要在处理之前包装HTTP请求流和响应流,所以我们应该使用HTTP过滤器。Spring提供了自己的HTTP过滤器实现。
我们的过滤器扩展了它,并使用自定义请求和响应包装来记录有效负载。此外,它还生成和设置 X-Request-ID
, X-Correlation-ID
header和请求处理时间。
public class SpringLoggingFilter extends OncePerRequestFilter {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringLoggingFilter.class);
private UniqueIDGenerator generator;
public SpringLoggingFilter(UniqueIDGenerator generator) {
this.generator = generator;
}
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws ServletException, IOException {
generator.generateAndSetMDC(request);
final long startTime = System.currentTimeMillis();
final SpringRequestWrapper wrappedRequest = new SpringRequestWrapper(request);
LOGGER.info("Request: method={}, uri={}, payload={}", wrappedRequest.getMethod(),
wrappedRequest.getRequestURI(), IOUtils.toString(wrappedRequest.getInputStream(),
wrappedRequest.getCharacterEncoding()));
final SpringResponseWrapper wrappedResponse = new SpringResponseWrapper(response);
wrappedResponse.setHeader("X-Request-ID", MDC.get("X-Request-ID"));
wrappedResponse.setHeader("X-Correlation-ID", MDC.get("X-Correlation-ID"));
chain.doFilter(wrappedRequest, wrappedResponse);
final long duration = System.currentTimeMillis() - startTime;
LOGGER.info("Response({} ms): status={}, payload={}", value("X-Response-Time", duration),
value("X-Response-Status", wrappedResponse.getStatus()),
IOUtils.toString(wrappedResponse.getContentAsByteArray(), wrappedResponse.getCharacterEncoding()));
}
}
3.自动配置
完成包装器和HTTP过滤器的实现后,我们可以为库准备自动配置。第一步是创建 @Configuration
包含所有必需的bean。
我们必须注册自定义HTTP过滤器 SpringLoggingFilter
,以及用于与 Logstash
和 RestTemplate
HTTP客户端拦截器集成的 logger appender
:
@Configuration
public class SpringLoggingAutoConfiguration {
private static final String LOGSTASH_APPENDER_NAME = "LOGSTASH";
@Value("${spring.logstash.url:localhost:8500}")
String url;
@Value("${spring.application.name:-}")
String name;
@Bean
public UniqueIDGenerator generator() {
return new UniqueIDGenerator();
}
@Bean
public SpringLoggingFilter loggingFilter() {
return new SpringLoggingFilter(generator());
}
@Bean
public RestTemplate restTemplate() {
RestTemplate restTemplate = new RestTemplate();
List<ClientHttpRequestInterceptor> interceptorList = new ArrayList<ClientHttpRequestInterceptor>();
restTemplate.setInterceptors(interceptorList);
return restTemplate;
}
@Bean
public LogstashTcpSocketAppender logstashAppender() {
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
LogstashTcpSocketAppender logstashTcpSocketAppender = new LogstashTcpSocketAppender();
logstashTcpSocketAppender.setName(LOGSTASH_APPENDER_NAME);
logstashTcpSocketAppender.setContext(loggerContext);
logstashTcpSocketAppender.addDestination(url);
LogstashEncoder encoder = new LogstashEncoder();
encoder.setContext(loggerContext);
encoder.setIncludeContext(true);
encoder.setCustomFields("{\"appname\":\"" + name + "\"}");
encoder.start();
logstashTcpSocketAppender.setEncoder(encoder);
logstashTcpSocketAppender.start();
loggerContext.getLogger(Logger.ROOT_LOGGER_NAME).addAppender(logstashTcpSocketAppender);
return logstashTcpSocketAppender;
}
}
库中的配置集合由 SpringBoot
加载。 SpringBoot
会检查已发布 jar
中是否存在 META-INF/spring.factories
文件。
该文件应列出key等于 EnableAutoConfiguration
的配置类:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
pl.piomin.logging.config.SpringLoggingAutoConfiguration
4.与Logstash集成
通过自动配置的日志记录追加器(logging appender)实现与 Logstash
集成。
我们可以通过在 application.yml
文件中设置属性 spring.logstash.url
来覆盖 Logstash
目标URL:
spring:
application:
name: sample-app
logstash:
url: 192.168.99.100:5000
要在应用程序中启用本文中描述的所有功能,只需要将我的库包含在依赖项中:
pl.piomin
spring-boot-logging
1.0-SNAPSHOT
在运行应用程序之前,您应该在计算机上启动 ElasticStack
。最好的方法是通过 Docker
容器。但首先要创建 Docker
网络,以通过容器名称启用容器之间的通信。
$ docker network create es
现在,在端口9200启动 Elasticsearch
的单个节点实例,我使用版本为 6.7.2
的Elastic Stack工具:
$ docker run -d --name elasticsearch --net es -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:6.7.2
运行 Logstash
时,需要提供包含输入和输出定义的其他配置。我将使用JSON编解码器启动TCP输入,默认情况下不启用。 Elasticsearch
URL设置为输出。它还将创建一个包含应用程序名称的索引。
input {
tcp {
port => 5000
codec => json
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "micro-%{appname}"
}
}
现在我们可以使用 Docker
容器启动 Logstash
。它在端口5000上公开并从 logstash.conf
文件中读取配置:
docker run -d --name logstash --net es -p 5000:5000 -v ~/logstash.conf:/usr/share/logstash/pipeline/logstash.conf docker.elastic.co/logstash/logstash:6.7.2
最后,我们可以运行仅用于显示日志的 Kibana
:
$ docker run -d --name kibana --net es -e "ELASTICSEARCH_URL=http://elasticsearch:9200" -p 5601:5601 docker.elastic.co/kibana/kibana:6.7.2
启动使用 spring-boot-logging
库的示例应用程序后, POST
请求中的日志将显示在 Kibana
中,如下所示:
响应日志每个条目都包含 X-Correlation-ID
, X-Request-ID
, X-Response-Time
和 X-Response-Status
头。
5.摘要
Springlogging library
库可以在我的GitHub地址:https://github.com/piomin/spring-boot-logging.git中找到。我还在努力,所以非常欢迎任何反馈或建议。该库专用于基于微服务的体系结构,您的应用程序可以在容器内的许多实例中启动。
在此模型中,将日志存储在文件中没有任何意义。这就是为什么与 ElasticStack
集成非常重要的原因。 但是这个库最重要的特性是将HTTP请求/响应与完整正文和一些附加信息记录到此日志中,如相关ID或请求处理时间。库非常精简,包含在应用程序之后,所有都是开箱即用的。
【墙裂推荐】
最近热门内容回顾 #技术人系列
下方二维码关注我
互联网草根,坚持分享技术、创业、产品等心得和总结~
点击“阅读原文”,领取 2020 年最新免费技术资料大全