当Tomcat遇上Netty(续集)
共 6656字,需浏览 14分钟
·
2020-07-12 19:21
本篇接着上篇,主要讲一下Tomcat与Netty是怎么勾搭上的,过程有点复杂。
Tomcat与Netty是如何衔接起来的?
请看下面这张图:
从下往下看,接收请求的时候走的确实是tomcat,然后通过spring cloud gateway的过滤器链,走到了一个叫作 NettyWriteResponseFilter
的过滤器。
再接着往下走,又走到了一个叫作 NettyRoutingFilter
的一个过滤器,这个过滤器是干什么的呢?
从名字可以看出,它是用来做路由的,也就是在这里把gateway接收到的请求转发给其它服务。
这里路由的实现其实就是创建一个HttpClient,然后根据配置的路由信息拿到目标地址,然后再向目标服务发送一个Http请求。
我们简单地看一下NettyRoutingFilter里面的实现(跟着我的注释看就好):
publicMono<Void> filter(ServerWebExchange exchange,GatewayFilterChain chain){
// 拿到目标地址,在前面放进去的
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();
if(isAlreadyRouted(exchange)
||(!"http".equals(scheme)&&!"https".equals(scheme))){
return chain.filter(exchange);
}
setAlreadyRouted(exchange);
// 原始请求
ServerHttpRequest request = exchange.getRequest();
finalHttpMethod method =HttpMethod.valueOf(request.getMethodValue());
finalString url = requestUrl.toASCIIString();
HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
// 构造新的请求头
finalDefaultHttpHeaders httpHeaders =newDefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
boolean preserveHost = exchange
.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE,false);
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
// 获取一个HttpClient
// key1, 此时还在tomcat的线程里
Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange)
// 设置请求头
.headers(headers ->{
headers.add(httpHeaders);
// Will either be set below, or later by Netty
headers.remove(HttpHeaders.HOST);
if(preserveHost){
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
headers.add(HttpHeaders.HOST, host);
}
// 发送请求,本文来源于工纵耗彤哥读源码
}).request(method).uri(url).send((req, nettyOutbound)->{
if(log.isTraceEnabled()){
nettyOutbound
.withConnection(connection -> log.trace("outbound route: "
+ connection.channel().id().asShortText()
+", inbound: "+ exchange.getLogPrefix()));
}
// 发送请求
// key2,此时已经到netty的线程里了
return nettyOutbound.send(request.getBody().map(this::getByteBuf));
}).responseConnection((res, connection)->{
// response的处理
// Defer committing the response until all route filters have run
// Put client response as ServerWebExchange attribute and write
// response later NettyWriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
ServerHttpResponse response = exchange.getResponse();
// put headers and status so filters can modify the response
HttpHeaders headers =newHttpHeaders();
res.responseHeaders().forEach(
entry -> headers.add(entry.getKey(), entry.getValue()));
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if(StringUtils.hasLength(contentTypeValue)){
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
contentTypeValue);
}
setResponseStatus(res, response);
// make sure headers filters run after setting status so it is
// available in response
HttpHeaders filteredResponseHeaders =HttpHeadersFilter.filter(
getHeadersFilters(), headers, exchange,Type.RESPONSE);
if(!filteredResponseHeaders
.containsKey(HttpHeaders.TRANSFER_ENCODING)
&& filteredResponseHeaders
.containsKey(HttpHeaders.CONTENT_LENGTH)){
// It is not valid to have both the transfer-encoding header and
// the content-length header.
// Remove the transfer-encoding header in the response if the
// content-length header is present.
response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
}
exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
filteredResponseHeaders.keySet());
response.getHeaders().putAll(filteredResponseHeaders);
returnMono.just(res);
});
Duration responseTimeout = getResponseTimeout(route);
if(responseTimeout !=null){
responseFlux = responseFlux
.timeout(responseTimeout,Mono.error(newTimeoutException(
"Response took longer than timeout: "+ responseTimeout)))
.onErrorMap(TimeoutException.class,
th ->newResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
th.getMessage(), th));
}
// 过滤器链
return responseFlux.then(chain.filter(exchange));
}
忽略代码本身的复杂性,整体逻辑就是向目标服务发送请求,并处理响应体。
但是,这里真正发请求的时候是在Netty线程里发送的,同时,处理响应体也同样是在Netty线程中进行的。
而在处理响应之前呢,当然是收到目标服务的响应,在接收目标服务的响应的时候就是在Netty的NioEventLoop中进行的,Netty接收到响应后会创建一个ByteBuf来承载响应的内容,最后,经过一系列的调用就回到了NettyWriteResponseFilter的回调里,在NettyWriteResponseFilter里对响应体进行写出操作,让我们看一下这个类里面的基本内容:
publicMono<Void> filter(ServerWebExchange exchange,GatewayFilterChain chain){
// NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added
// until the NettyRoutingFilter is run
// @formatter:off
return chain.filter(exchange)
.doOnError(throwable -> cleanup(exchange))
.then(Mono.defer(()->{
// 响应的回调
Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
if(connection ==null){
returnMono.empty();
}
if(log.isTraceEnabled()){
log.trace("NettyWriteResponseFilter start inbound: "
+ connection.channel().id().asShortText()+", outbound: "
+ exchange.getLogPrefix());
}
// 响应
ServerHttpResponse response = exchange.getResponse();
// TODO: needed?
finalFlux<DataBuffer> body = connection
.inbound()
.receive()
.retain()
.map(byteBuf -> wrap(byteBuf, response));
MediaType contentType =null;
try{
contentType = response.getHeaders().getContentType();
}
catch(Exception e){
if(log.isTraceEnabled()){
log.trace("invalid media type", e);
}
}
// 写出,本文来源于工纵耗彤哥读源码
return(isStreamingMediaType(contentType)
? response.writeAndFlushWith(body.map(Flux::just))
: response.writeWith(body));
})).doOnCancel(()-> cleanup(exchange));
// @formatter:on
}
但是,这里的response是TomcatServerHttpResponse,因为接收请求是通过tomcat接收的,所以,这里的响应是tomcat的。
因此,最后是调用了tomcat的write()方法或者writeAndFlush()方法,此时,已经没有Netty什么事了。
在整个过程中,接收目标服务的响应的时候通过Netty分配了ByteBuf而把这个响应返回给调用者的时候却是走的tomcat,导致这个分配的ByteBuf一直没有释放,所以,出现了内存泄漏。
如果全程都使用Netty的情况下,也会经历上面说到的这些步骤,只不过在最后这里的响应会变成ReactorServerHttpResponse,而不是tomcat的响应。
在ReactorServerHttpResponse里面就会调用到Netty的相关方法,并往Netty的线程池里放一个任务,这个任务是reactor.netty.channel.MonoSendMany.SendManyInner.AsyncFlush:
finalclassAsyncFlushimplementsRunnable{
@Override
publicvoid run(){
if(pending !=0){
ctx.flush();
}
}
}
在这个任务里面调用ctx.flush(),这个flush()就是把内容真正地发送出去的方法,发送完了之后,也会把相应的ByteBuf给清理了,也就释放了内存。
好了,让我们用一张图来表示一下整个的过程:
另外,我给spring cloud gateway官方提了一个issue:https://github.com/spring-cloud/spring-cloud-gateway/issues/1728
但是,这官方有点扯淡,直接把我的issue关了,说不支持tomcat,但是确实能运行起来,而且跑得很好,直到出现内存溢出为止,呵呵了。
最后,我把调试的源码放在github上了,你也可以下载下来自己试一下,地址如下:https://github.com/alan-tang-tt/springcloud-gateway-oom。