When Tomcat Meets Netty (Sequel)

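This post picks up where the previous one left off and looks at how Tomcat and Netty end up hooked together. The process is a little involved.
How are Tomcat and Netty connected?
See the diagram below: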

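Reading the diagram in call order: the request really is received by Tomcat, and it then passes through the Spring Cloud Gateway filter chain until it reaches a filter called NettyWriteResponseFilter.
Going one step further, it reaches another filter called NettyRoutingFilter. What does this filter do?
As the name suggests, it does the routing: this is where the gateway forwards the request it received to another service.
The routing itself boils down to creating an HttpClient, looking up the target address from the configured route information, and then sending an HTTP request to the target service.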
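To make those three steps concrete before diving into the real filter, here is a minimal sketch. It is only an illustration under my own assumptions: it uses the JDK's java.net.http request builder instead of the Reactor Netty HttpClient that the gateway actually uses, and the ROUTES table, the ports, and the RoutingSketch class are hypothetical names invented for this example. It builds the forwarded request but does not send it.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Map;

public class RoutingSketch {
    // Hypothetical route table: path prefix -> base address of the target service.
    static final Map<String, String> ROUTES = Map.of(
            "/orders", "http://localhost:8081",
            "/users", "http://localhost:8082");

    // Look up the target address for an incoming path; null means no route matched.
    static URI resolveTarget(String incomingPath) {
        for (Map.Entry<String, String> route : ROUTES.entrySet()) {
            if (incomingPath.startsWith(route.getKey())) {
                return URI.create(route.getValue() + incomingPath);
            }
        }
        return null;
    }

    // Build (but do not send) the request that would be forwarded to the target service.
    static HttpRequest buildForwardRequest(String method, String incomingPath) {
        URI target = resolveTarget(incomingPath);
        if (target == null) {
            throw new IllegalArgumentException("no route for " + incomingPath);
        }
        return HttpRequest.newBuilder(target)
                .method(method, HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest forwarded = buildForwardRequest("GET", "/orders/42");
        // prints "GET http://localhost:8081/orders/42"
        System.out.println(forwarded.method() + " " + forwarded.uri());
    }
}
```

The real filter does much more (header filtering, timeouts, response handling), but the lookup-then-forward shape is the same.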
Let's take a quick look at the implementation inside NettyRoutingFilter (just follow my comments):
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // Get the target address, which an earlier filter stored in the exchange
    URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

    String scheme = requestUrl.getScheme();
    if (isAlreadyRouted(exchange)
            || (!"http".equals(scheme) && !"https".equals(scheme))) {
        return chain.filter(exchange);
    }
    setAlreadyRouted(exchange);

    // The original request
    ServerHttpRequest request = exchange.getRequest();

    final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
    final String url = requestUrl.toASCIIString();

    HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

    // Build the new request headers
    final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
    filtered.forEach(httpHeaders::set);

    boolean preserveHost = exchange
            .getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
    Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

    // Obtain an HttpClient
    // key1: at this point we are still on a Tomcat thread
    Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange)
            // Set the request headers
            .headers(headers -> {
                headers.add(httpHeaders);
                // Will either be set below, or later by Netty
                headers.remove(HttpHeaders.HOST);
                if (preserveHost) {
                    String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                    headers.add(HttpHeaders.HOST, host);
                }
            })
            // Send the request
            .request(method).uri(url).send((req, nettyOutbound) -> {
                if (log.isTraceEnabled()) {
                    nettyOutbound.withConnection(connection -> log.trace("outbound route: "
                            + connection.channel().id().asShortText()
                            + ", inbound: " + exchange.getLogPrefix()));
                }
                // Send the request
                // key2: by now we are on a Netty thread
                return nettyOutbound.send(request.getBody().map(this::getByteBuf));
            }).responseConnection((res, connection) -> {
                // Response handling

                // Defer committing the response until all route filters have run
                // Put client response as ServerWebExchange attribute and write
                // response later NettyWriteResponseFilter
                exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
                exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

                ServerHttpResponse response = exchange.getResponse();
                // put headers and status so filters can modify the response
                HttpHeaders headers = new HttpHeaders();

                res.responseHeaders().forEach(
                        entry -> headers.add(entry.getKey(), entry.getValue()));

                String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
                if (StringUtils.hasLength(contentTypeValue)) {
                    exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
                            contentTypeValue);
                }

                setResponseStatus(res, response);

                // make sure headers filters run after setting status so it is
                // available in response
                HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
                        getHeadersFilters(), headers, exchange, Type.RESPONSE);

                if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
                        && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                    // It is not valid to have both the transfer-encoding header and
                    // the content-length header.
                    // Remove the transfer-encoding header in the response if the
                    // content-length header is present.
                    response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
                }

                exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
                        filteredResponseHeaders.keySet());

                response.getHeaders().putAll(filteredResponseHeaders);

                return Mono.just(res);
            });

    Duration responseTimeout = getResponseTimeout(route);
    if (responseTimeout != null) {
        responseFlux = responseFlux
                .timeout(responseTimeout,
                        Mono.error(new TimeoutException(
                                "Response took longer than timeout: " + responseTimeout)))
                .onErrorMap(TimeoutException.class,
                        th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
                                th.getMessage(), th));
    }

    // Continue down the filter chain
    return responseFlux.then(chain.filter(exchange));
}
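Setting aside the complexity of the code itself, the overall logic is simply: send a request to the target service and handle the response.
However, the request is actually sent on a Netty thread, and the response is handled on a Netty thread as well.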
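The hop from key1 to key2 can be imitated with plain JDK executors. The sketch below is a simulation, not gateway code: the two single-thread pools and the thread names "tomcat-exec-1" and "netty-nio-1" are stand-ins I made up, and CompletableFuture plays the role that Reactor plays in the real filter.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadHandoffSketch {
    private static String threadName() {
        return Thread.currentThread().getName();
    }

    // Runs the three stages and records which thread each one ran on.
    static List<String> route() {
        // Illustrative stand-ins for the servlet pool and the Netty event loop.
        ExecutorService servletPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "tomcat-exec-1"));
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "netty-nio-1"));
        List<String> trace = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture
                    // key1: the call is assembled on the servlet thread
                    .runAsync(() -> trace.add("assemble request on " + threadName()), servletPool)
                    // key2: the actual send happens on the event loop
                    .thenRunAsync(() -> trace.add("send request on " + threadName()), eventLoop)
                    // the response callback also runs on the event loop
                    .thenRunAsync(() -> trace.add("handle response on " + threadName()), eventLoop)
                    .join();
            return trace;
        } finally {
            servletPool.shutdown();
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints:
        // assemble request on tomcat-exec-1
        // send request on netty-nio-1
        // handle response on netty-nio-1
        route().forEach(System.out::println);
    }
}
```

The point is only that work scheduled from one thread can complete on another; whoever consumes the result has to be prepared for that.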
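Before the response can be handled, it of course has to be received from the target service, and that happens in Netty's NioEventLoop. When Netty receives the response, it creates a ByteBuf to hold the content. Finally, after a series of calls, control returns to the callback in NettyWriteResponseFilter, which writes the response body out. Let's look at the essentials of that class: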
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added
    // until the NettyRoutingFilter is run
    // @formatter:off
    return chain.filter(exchange)
            .doOnError(throwable -> cleanup(exchange))
            .then(Mono.defer(() -> {
                // Callback that runs once the response is available
                Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);

                if (connection == null) {
                    return Mono.empty();
                }
                if (log.isTraceEnabled()) {
                    log.trace("NettyWriteResponseFilter start inbound: "
                            + connection.channel().id().asShortText() + ", outbound: "
                            + exchange.getLogPrefix());
                }
                // The response
                ServerHttpResponse response = exchange.getResponse();

                // TODO: needed?
                final Flux<DataBuffer> body = connection
                        .inbound()
                        .receive()
                        .retain()
                        .map(byteBuf -> wrap(byteBuf, response));

                MediaType contentType = null;
                try {
                    contentType = response.getHeaders().getContentType();
                }
                catch (Exception e) {
                    if (log.isTraceEnabled()) {
                        log.trace("invalid media type", e);
                    }
                }
                // Write the body out
                return (isStreamingMediaType(contentType)
                        ? response.writeAndFlushWith(body.map(Flux::just))
                        : response.writeWith(body));
            })).doOnCancel(() -> cleanup(exchange));
    // @formatter:on
}
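If it seems odd that NettyWriteResponseFilter is reached before NettyRoutingFilter yet writes the response after it, the trick is the chain.filter(exchange).then(...) shape: the filter does nothing on the way in and hangs its work on the completion of the rest of the chain. Here is a stripped-down model of that ordering. The Filter and Chain interfaces are hypothetical simplifications of mine, and CompletableFuture stands in for Mono.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FilterOrderSketch {
    // Hypothetical, simplified versions of a filter and of "the rest of the chain".
    interface Chain {
        CompletableFuture<Void> next(List<String> log);
    }

    interface Filter {
        CompletableFuture<Void> filter(List<String> log, Chain chain);
    }

    // Same shape as the write filter: no "pre" work, the write runs after the chain completes.
    static final Filter WRITE_RESPONSE = (log, chain) -> {
        log.add("write filter entered");
        return chain.next(log).thenRun(() -> log.add("write response"));
    };

    // Same shape as the routing filter: talk to the target service, then continue the chain.
    static final Filter ROUTING = (log, chain) -> {
        log.add("routing: send request, receive response");
        return chain.next(log);
    };

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Chain end = l -> CompletableFuture.completedFuture(null);
        Chain afterWrite = l -> ROUTING.filter(l, end);
        WRITE_RESPONSE.filter(log, afterWrite).join();
        return log;
    }

    public static void main(String[] args) {
        // prints:
        // write filter entered
        // routing: send request, receive response
        // write response
        run().forEach(System.out::println);
    }
}
```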
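However, the response in NettyWriteResponseFilter is a TomcatServerHttpResponse: the request was received by Tomcat, so the response object belongs to Tomcat too.
So in the end the write is carried out by Tomcat's write() or writeAndFlush() methods (reached through the writeWith() and writeAndFlushWith() calls in the filter), and Netty plays no further part.
Across the whole flow, Netty allocates a ByteBuf when the response from the target service is received, but the response is returned to the caller through Tomcat. As a result, the allocated ByteBuf is never released, and that is the memory leak.
If Netty is used end to end, the same steps still happen, except that the response at the end is a ReactorServerHttpResponse rather than Tomcat's response.
ReactorServerHttpResponse calls into Netty and submits a task to Netty's thread pool. The task is reactor.netty.channel.MonoSendMany.SendManyInner.AsyncFlush: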
final class AsyncFlush implements Runnable {
    @Override
    public void run() {
        if (pending != 0) {
            ctx.flush();
        }
    }
}
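This task calls ctx.flush(), which is the method that actually sends the content out. Once the content has been sent, the corresponding ByteBuf is cleaned up as well, which releases the memory.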
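The difference between the two endings can be modeled with a toy reference-counted buffer. To be clear about the assumptions: CountedBuffer is not Netty's ByteBuf, and CopyingSink and ReleasingSink are not Tomcat or Reactor Netty code. They are hypothetical classes that only mimic the behavior described above, namely one side that writes the bytes and never releases the buffer, and one side that releases each buffer after flushing it.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class BufferReleaseSketch {
    // Toy stand-in for a pooled, reference-counted buffer.
    static final class CountedBuffer {
        // Number of buffers allocated but not yet fully released.
        static final AtomicInteger LIVE = new AtomicInteger();
        private int refCnt = 1;
        final String payload;

        CountedBuffer(String payload) {
            this.payload = payload;
            LIVE.incrementAndGet();
        }

        void release() {
            if (refCnt == 0) {
                throw new IllegalStateException("already released");
            }
            if (--refCnt == 0) {
                LIVE.decrementAndGet();
            }
        }
    }

    interface ResponseSink {
        void write(CountedBuffer buf);
        void flush();
    }

    // Copies the bytes out and forgets the buffer: models the leaking path.
    static final class CopyingSink implements ResponseSink {
        final StringBuilder out = new StringBuilder();
        public void write(CountedBuffer buf) { out.append(buf.payload); }
        public void flush() { }
    }

    // Queues buffers and releases each one once it has been flushed.
    static final class ReleasingSink implements ResponseSink {
        final StringBuilder out = new StringBuilder();
        private final Queue<CountedBuffer> pending = new ArrayDeque<>();
        public void write(CountedBuffer buf) { pending.add(buf); }
        public void flush() {
            CountedBuffer buf;
            while ((buf = pending.poll()) != null) {
                out.append(buf.payload);
                buf.release();
            }
        }
    }

    // Sends the given number of chunks and reports how many buffers are still live afterwards.
    static int liveBuffersAfter(ResponseSink sink, int chunks) {
        int before = CountedBuffer.LIVE.get();
        for (int i = 0; i < chunks; i++) {
            sink.write(new CountedBuffer("chunk" + i));
        }
        sink.flush();
        return CountedBuffer.LIVE.get() - before;
    }

    public static void main(String[] args) {
        System.out.println(liveBuffersAfter(new CopyingSink(), 3));   // prints 3
        System.out.println(liveBuffersAfter(new ReleasingSink(), 3)); // prints 0
    }
}
```

Every response that goes through the first kind of sink leaves its buffers behind, so under steady traffic the count only grows, which matches the slow climb toward running out of memory.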
OK, let's sum up the whole process in one diagram:

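In addition, I filed an issue with the Spring Cloud Gateway project: https://github.com/spring-cloud/spring-cloud-gateway/issues/1728
But the maintainers were a bit ridiculous about it: they closed my issue outright, saying Tomcat is not supported. Yet it does start up and it runs just fine, right up until it runs out of memory. Well then.
Finally, I put the source code I used for debugging on GitHub, so you can download it and try it yourself: https://github.com/alan-tang-tt/springcloud-gateway-oom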
