聊聊 WebFlux 线程调度
共 4984字,需浏览 10分钟
·
2024-01-05 09:50
如果你不太了解 Project Reactor,以及非阻塞 IO 编程。强烈建议阅读我的前几篇文章
背景今天在用 WebFlux 开发时偶然发现了一个问题,代码简化为如下
@Slf4j
@RestController
@SpringBootApplication
public class WebfluxScheduleApplication {
public static void main(String[] args) {
SpringApplication.run(WebfluxScheduleApplication.class, args);
}
@Component
public static class LogFilter implements WebFilter {
@NotNull
@Override
public Mono<Void> filter(@NotNull ServerWebExchange serverWebExchange,
WebFilterChain webFilterChain) {
log.info("filter start");
return webFilterChain.filter(serverWebExchange)
.doFinally(signalType -> log.info("filter end"));
}
}
@GetMapping("subscribeOn")
public Mono<String> subscribeOn() {
return Mono.fromCallable(() -> {
String result = "hello world";
log.info("{}", result);
return result;
}).subscribeOn(Schedulers.boundedElastic())
.doOnNext(result -> log.info("doOnNext: {}", result));
}
}
在 /subscribeOn
接口中,我通过 Mono.subscribeOn
将当前发布者调度到 boundedElastic 线程。按照我对 Reactor 的理解,后续所有的消费逻辑都应该在 boundedElastic 上进行。
但是实际程序运行效果如下:
2023-12-14 22:19:17 [reactor-http-nio-2] INFO c.g.s.WebfluxScheduleApplication - filter start
2023-12-14 22:19:17 [boundedElastic-1] INFO c.g.s.WebfluxScheduleApplication - hello world
2023-12-14 22:19:17 [boundedElastic-1] INFO c.g.s.WebfluxScheduleApplication - doOnNext: hello world
2023-12-14 22:19:17 [reactor-http-nio-2] INFO c.g.s.WebfluxScheduleApplication - filter end
与预期不一致的是 filter end
这句日志并没有在 boundedElastic-1 线程输出,而是输出在了 reactor-http-nio-2 线程。
更诡异的是,当我测试了十几次后发现,filter start
和 filter end
这两句日志,永远都在同一个线程。
看了几个晚上的 Reactor 源码实在是没有找到 Webflux 在什么地方进行的线程调度,并且我对 Scheduler 添加了 hook 和断点,也就是说只要有线程调度,一定能被我 debug 到,但是发现并没有走到我的 hook 中。这个时候其实已经证明了不是在 Reactor 这一层做的线程调度。
就在我即将放弃的时候,突然想到,会不会和 Webflux 底层的 Netty 有关系呢?通过 Debug 后发现真的和 Netty 有关系
Netty我们先了解 Netty 以下几点
- NioEventLoop:Netty 实现非阻塞 IO 的核心,由于不是本文重点,就不过多赘述了
- Channel:可以理解为是 Socket 的包装,可以通过 Channel 进行 Socket 的读写操作
- 一个 NioEventLoop 会关联多个 Channel
- 每个 NioEventLoop 会在一个专门的线程上执行
这里我就直接说结论了,Netty 中一个 Channel(Socket)的读写操作全部在单线程上完成的,即使中间发生了线程调度也不会影响这个结论
简单讲讲这是怎么实现的
首先如何保证 NioEventLoop 只在一个线程上执行?
很简单,NioEventLoop 的 run()
就是一个死循环,不断地处理已经就绪的 IO 事件和提交到 NioEventLoop 中的 Task。
将 NioEventLoop 的 run()
提交到线程池,就实现了 NioEventLoop 只在一个专门的线程上执行
发生线程调度后,如何保证 Channel 的读写在同一个线程?
NioEventLoop 中包含一个 taskQueue 属性。上面说过了 NioEventLoop 会不断执行 “提交到 NioEventLoop 中的 Task”,这个 Task 会存储在 taskQueue 中。
由于 NioEventLoop 和 Channel 存在一个关联关系。Channel 可以找到自己对应的 NioEventLoop,在执行 Channel 写操作时,会将写操作转换成一个 Task 提交到 NioEventLoop 中,这就保证了 Channel 的读写在同一个线程
通过 Debug 验证Netty 将读写操作控制在单线程上,也解决了用户使用时需要考虑线程切换的问题
当 Webflux 处理完整个请求时,会调用 Netty 的 Channel.writeAndFlush
,可以看到该方法调用时线程还是处于 boundedElastic 线程,与我最开始的猜想是一致的。
writeAndFlush 方法最终会将 Task 添加到 NioEventLoop 的队列中。当 Socket 写操作结束后,Netty 通过 Listener 通知写成功事件,会执行到 Reactor 的 onComplete,最终反应到我们代码里就是执行了 Filter 中的 doFinally 输出日志。
至此,问题解决。
最后本文完整 demo
- https://github.com/TavenYin/taven-springboot-learning/tree/master/springboot3-webflux-schedule