当Tomcat遇上Netty(续集)

彤哥读源码

共 6656字,需浏览 14分钟

 ·

2020-07-12 19:21

36d23f5a30f4e8001421202912f92c86.webp

本篇接着上篇,主要讲一下Tomcat与Netty是怎么勾搭上的,过程有点复杂。

Tomcat与Netty是如何衔接起来的?

请看下面这张图:

18935d027cbad700dc4c82bb04b95f00.webp

从下往下看,接收请求的时候走的确实是tomcat,然后通过spring cloud gateway的过滤器链,走到了一个叫作 NettyWriteResponseFilter的过滤器。

再接着往下走,又走到了一个叫作 NettyRoutingFilter的一个过滤器,这个过滤器是干什么的呢?

从名字可以看出,它是用来做路由的,也就是在这里把gateway接收到的请求转发给其它服务。

这里路由的实现其实就是创建一个HttpClient,然后根据配置的路由信息拿到目标地址,然后再向目标服务发送一个Http请求。

我们简单地看一下NettyRoutingFilter里面的实现(跟着我的注释看就好):

  1. publicMono<Void> filter(ServerWebExchange exchange,GatewayFilterChain chain){

  2. // 拿到目标地址,在前面放进去的

  3. URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);


  4. String scheme = requestUrl.getScheme();

  5. if(isAlreadyRouted(exchange)

  6. ||(!"http".equals(scheme)&&!"https".equals(scheme))){

  7. return chain.filter(exchange);

  8. }

  9. setAlreadyRouted(exchange);


  10. // 原始请求

  11. ServerHttpRequest request = exchange.getRequest();


  12. finalHttpMethod method =HttpMethod.valueOf(request.getMethodValue());

  13. finalString url = requestUrl.toASCIIString();


  14. HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);


  15. // 构造新的请求头

  16. finalDefaultHttpHeaders httpHeaders =newDefaultHttpHeaders();

  17. filtered.forEach(httpHeaders::set);


  18. boolean preserveHost = exchange

  19. .getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE,false);

  20. Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);


  21. // 获取一个HttpClient

  22. // key1, 此时还在tomcat的线程里

  23. Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange)

  24. // 设置请求头

  25. .headers(headers ->{

  26. headers.add(httpHeaders);

  27. // Will either be set below, or later by Netty

  28. headers.remove(HttpHeaders.HOST);

  29. if(preserveHost){

  30. String host = request.getHeaders().getFirst(HttpHeaders.HOST);

  31. headers.add(HttpHeaders.HOST, host);

  32. }

  33. // 发送请求,本文来源于工纵耗彤哥读源码

  34. }).request(method).uri(url).send((req, nettyOutbound)->{

  35. if(log.isTraceEnabled()){

  36. nettyOutbound

  37. .withConnection(connection -> log.trace("outbound route: "

  38. + connection.channel().id().asShortText()

  39. +", inbound: "+ exchange.getLogPrefix()));

  40. }

  41. // 发送请求

  42. // key2,此时已经到netty的线程里了

  43. return nettyOutbound.send(request.getBody().map(this::getByteBuf));

  44. }).responseConnection((res, connection)->{

  45. // response的处理


  46. // Defer committing the response until all route filters have run

  47. // Put client response as ServerWebExchange attribute and write

  48. // response later NettyWriteResponseFilter

  49. exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);

  50. exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);


  51. ServerHttpResponse response = exchange.getResponse();

  52. // put headers and status so filters can modify the response

  53. HttpHeaders headers =newHttpHeaders();


  54. res.responseHeaders().forEach(

  55. entry -> headers.add(entry.getKey(), entry.getValue()));


  56. String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);

  57. if(StringUtils.hasLength(contentTypeValue)){

  58. exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,

  59. contentTypeValue);

  60. }


  61. setResponseStatus(res, response);


  62. // make sure headers filters run after setting status so it is

  63. // available in response

  64. HttpHeaders filteredResponseHeaders =HttpHeadersFilter.filter(

  65. getHeadersFilters(), headers, exchange,Type.RESPONSE);


  66. if(!filteredResponseHeaders

  67. .containsKey(HttpHeaders.TRANSFER_ENCODING)

  68. && filteredResponseHeaders

  69. .containsKey(HttpHeaders.CONTENT_LENGTH)){

  70. // It is not valid to have both the transfer-encoding header and

  71. // the content-length header.

  72. // Remove the transfer-encoding header in the response if the

  73. // content-length header is present.

  74. response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);

  75. }


  76. exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,

  77. filteredResponseHeaders.keySet());


  78. response.getHeaders().putAll(filteredResponseHeaders);


  79. returnMono.just(res);

  80. });


  81. Duration responseTimeout = getResponseTimeout(route);

  82. if(responseTimeout !=null){

  83. responseFlux = responseFlux

  84. .timeout(responseTimeout,Mono.error(newTimeoutException(

  85. "Response took longer than timeout: "+ responseTimeout)))

  86. .onErrorMap(TimeoutException.class,

  87. th ->newResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,

  88. th.getMessage(), th));

  89. }


  90. // 过滤器链

  91. return responseFlux.then(chain.filter(exchange));

  92. }

忽略代码本身的复杂性,整体逻辑就是向目标服务发送请求,并处理响应体。

但是,这里真正发请求的时候是在Netty线程里发送的,同时,处理响应体也同样是在Netty线程中进行的。

而在处理响应之前呢,当然是收到目标服务的响应,在接收目标服务的响应的时候就是在Netty的NioEventLoop中进行的,Netty接收到响应后会创建一个ByteBuf来承载响应的内容,最后,经过一系列的调用就回到了NettyWriteResponseFilter的回调里,在NettyWriteResponseFilter里对响应体进行写出操作,让我们看一下这个类里面的基本内容:

  1. publicMono<Void> filter(ServerWebExchange exchange,GatewayFilterChain chain){

  2. // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added

  3. // until the NettyRoutingFilter is run

  4. // @formatter:off

  5. return chain.filter(exchange)

  6. .doOnError(throwable -> cleanup(exchange))

  7. .then(Mono.defer(()->{

  8. // 响应的回调


  9. Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);


  10. if(connection ==null){

  11. returnMono.empty();

  12. }

  13. if(log.isTraceEnabled()){

  14. log.trace("NettyWriteResponseFilter start inbound: "

  15. + connection.channel().id().asShortText()+", outbound: "

  16. + exchange.getLogPrefix());

  17. }

  18. // 响应

  19. ServerHttpResponse response = exchange.getResponse();


  20. // TODO: needed?

  21. finalFlux<DataBuffer> body = connection

  22. .inbound()

  23. .receive()

  24. .retain()

  25. .map(byteBuf -> wrap(byteBuf, response));


  26. MediaType contentType =null;

  27. try{

  28. contentType = response.getHeaders().getContentType();

  29. }

  30. catch(Exception e){

  31. if(log.isTraceEnabled()){

  32. log.trace("invalid media type", e);

  33. }

  34. }


  35. // 写出,本文来源于工纵耗彤哥读源码

  36. return(isStreamingMediaType(contentType)

  37. ? response.writeAndFlushWith(body.map(Flux::just))

  38. : response.writeWith(body));

  39. })).doOnCancel(()-> cleanup(exchange));

  40. // @formatter:on

  41. }

但是,这里的response是TomcatServerHttpResponse,因为接收请求是通过tomcat接收的,所以,这里的响应是tomcat的。

因此,最后是调用了tomcat的write()方法或者writeAndFlush()方法,此时,已经没有Netty什么事了。

在整个过程中,接收目标服务的响应的时候通过Netty分配了ByteBuf而把这个响应返回给调用者的时候却是走的tomcat,导致这个分配的ByteBuf一直没有释放,所以,出现了内存泄漏。

如果全程都使用Netty的情况下,也会经历上面说到的这些步骤,只不过在最后这里的响应会变成ReactorServerHttpResponse,而不是tomcat的响应。

在ReactorServerHttpResponse里面就会调用到Netty的相关方法,并往Netty的线程池里放一个任务,这个任务是reactor.netty.channel.MonoSendMany.SendManyInner.AsyncFlush:

  1. finalclassAsyncFlushimplementsRunnable{

  2. @Override

  3. publicvoid run(){

  4. if(pending !=0){

  5. ctx.flush();

  6. }

  7. }

  8. }

在这个任务里面调用ctx.flush(),这个flush()就是把内容真正地发送出去的方法,发送完了之后,也会把相应的ByteBuf给清理了,也就释放了内存。

好了,让我们用一张图来表示一下整个的过程:

47c893b604844fbe9604a3b2f439155f.webp

另外,我给spring cloud gateway官方提了一个issue:https://github.com/spring-cloud/spring-cloud-gateway/issues/1728

但是,这官方有点扯淡,直接把我的issue关了,说不支持tomcat,但是确实能运行起来,而且跑得很好,直到出现内存溢出为止,呵呵了。

最后,我把调试的源码放在github上了,你也可以下载下来自己试一下,地址如下:https://github.com/alan-tang-tt/springcloud-gateway-oom

浏览 99
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报