Netty的异步任务处理与Socket事件处理
有道无术,术尚可求也!有术无道,止于术!
经过前面几章的学习,我们基本是明白了Netty通道的创建、注册、与绑定与JDK NIO的对应关系,如果我们使用的是JDK NIO的方式去开发一个Socket服务端的时候,此时还缺少了一个重要的环节,就是循环处理IO事件!
我们前面不只一次的见到Netty的异步事件,因为我们某些知识还没有学习到,所以我们都按照同步的方式去获取的,所以我们本章节将带你学习,Netty对于IO事件的处理与异步事件的处理!
我们以绑定为出发点,由点到面进行分析!
一、源码入口
我们直接进入到绑定的源码分析:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// 在触发channelRegistered()之前调用此方法。给用户处理程序一个设置的机会
// 其channelRegistered()实现中的管道。
channel.eventLoop().execute(() -> {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
});
}
我们上节课直接分析的channel.bind方法,而忽略上上面的异步方法,这里我们开始分析异步方法,我们进入到channel.eventLoop().execute()方法:
二、源码分析
我们前面分析过,每个Channel绑定一个NioEventLoop,而EventLoop又是SingleThreadEventExecutor的子类,所以我们进入到io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable):
@Override
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
---------------------------分界线------------------------------------
//继续往下追 execute
private void execute(Runnable task, boolean immediate) {
//判断当前执行的线程是不是 NIoEventLoopGroup的线程 这里是false
boolean inEventLoop = inEventLoop();
//将任务加入到队列
addTask(task);
//这里永远只能启动一次 一个eventLoop
if (!inEventLoop) {
//启动线程
startThread();
.....................................
}
//io.netty.channel.nio.NioEventLoop.selector
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
我们这里可以分为两部分:
1. 添加任务
addTask(task);
----------------------------------分界线---------------------------
protected void addTask(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if (!offerTask(task)) {
reject(task);
}
}
基础好一点的同学我估计已经有点猜到了,单看这个 offerTask有没有像和队列相关的操作,我们进入到offerTask方法:
final boolean offerTask(Runnable task) {
...............忽略.................
return taskQueue.offer(task);
}
果不其然,果然是入队操作,taskQueue是什么呢?
我们再初始化NioEventLoop的源码分析学习的时候,学习到,我们会创建两个MpscQ队列(多生产者,单消费者),这个taskQueue就是当时我们创建的一个任务队列,这里面将我们提交的异步任务追加到队列里面!
返回异步任务是不是被追加到队列里面了,如果队列满了,或者其他原因追加失败的话,会返回false,就会执行reject方法:
protected final void reject(Runnable task) {
rejectedExecutionHandler.rejected(task, this);
}
这个拒绝策略同样是我们再创建NioEventLoop的时候创建保存的,给大家留一个作业,去追一下这个拒绝策略,判断一下当发生了添加异步任务失败之后,会发生什么呢?
2. 启动消费线程
startThread();
-----------------------------分割线-------------------------
/**
* 启动线程
*/
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
//启动线程
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
注意,这里有个CAS操作 STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED); 判断消费线程是不是已经启动,如果已经启动就不进入这个逻辑,如果没启动就进入这个逻辑!我们第一次调用,肯定没启动,进入这个逻辑:
doStartThread();
----------------------------分割线---------------------------
private void doStartThread() {
assert thread == null;
//创建一条线程并启动
//这个线程又EventLoop
executor.execute(new Runnable() {
@Override
public void run() {
//保存当前线程 给线程赋值的就是这里
thread = Thread.currentThread();
...........................忽略........................
try {
//进行实际的启动
//io.netty.channel.nio.NioEventLoop.run
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...........................忽略........................
}
}
...........................忽略........................
}
...........................忽略........................
}
代码比较长,我们只分析主线逻辑:
thread = Thread.currentThread();
首先保存了一下当前线程到成员变量,这个分支不是很重要,后面有时间进行分析!
SingleThreadEventExecutor.this.run();
这个就是处理异步任务的代码,我们进入到run方法查看:
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
//存在任务就返回IO时间的数量,不存在任务就返回select阻塞等待事件发生
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
//如果不存在异步任务 就进行事件选择
case SelectStrategy.SELECT:
//下一个定时任务的截至时间 当不存在任务的时候就返回-1
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE;
}
nextWakeupNanos.set(curDeadlineNanos);
try {
//不存在任务就去阻塞获取IO事件
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
default:
}
} catch (IOException e) {
//替换一个选择器
rebuildSelector0();
//选择次数重置为0
selectCnt = 0;
//处理循环异常 主要处理方式就是睡眠一会让程序主动释放CPU
handleLoopException(e);
continue;
}
//本次循环次数+1
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
//这里是默认值 50
final int ioRatio = this.ioRatio;
boolean ranTasks;
//不会进这个分支
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
//当存在I/O事件的时候
} else if (strategy > 0) {
//记录一下当前的时间
final long ioStartTime = System.nanoTime();
try {
//处理IO事件
processSelectedKeys();
} finally {
//计算处理IO事件耗费的事件
final long ioTime = System.nanoTime() - ioStartTime;
//里面的时间是计算处理异步任务的时间尽量保持为1:1
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
//没有IO事件的话就处理异步任务
ranTasks = runAllTasks(0);
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
//没有空轮询的话三次一清空
selectCnt = 0;
//如果空轮询的次数超过默认的512次 就处理空轮询BUG的选择器
} else if (unexpectedSelectorWakeup(selectCnt)) {
//空轮询被处理后清空 轮询次数
selectCnt = 0;
}
} catch (CancelledKeyException e) {
...................忽略........................
} finally {
...................忽略........................
}
}
}
这主线逻辑分为三个:如何解决IO事件、如何处理异步任务、如何解决空轮询BUG!!分支代码关注一下注释,这里分析下主线代码:
I. I/O事件的处理
processSelectedKeys();
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
selectedKeys是我们在创建NIOEventLoop的时候,会创建一个优化后的的SelectorKeySet集合,使用数组来实现的,大家忘记的话,可以会看一下NioEventLoop的初始化源码篇!
当你没有禁用优化的时候,就会进入到if分支,我们查看if内部代码的源码:
private void processSelectedKeysOptimized() {
//开始遍历所有的主键
for (int i = 0; i < selectedKeys.size; ++i) {
//获取事件
final SelectionKey k = selectedKeys.keys[i];
//将该位置的数据制空
selectedKeys.keys[i] = null;
//获取之间注册NioServerSocketChannel的时候,绑定的Channel对象
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//开始进行IO事件处理
processSelectedKey(k, (AbstractNioChannel) a);
} else {
.........................忽略............................
}
.........................忽略............................
}
}
获取事件集合中的每一个key,同时获取之前绑定的NioServerSocketChannel,然后调用processSelectedKey处理这个事件:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
//当key失效之后,就关闭通道
....................忽略....................
}
try {
//获取当前事件的key 掩码
int readyOps = k.readyOps();
//是否包含连接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
//获取包含的事件
int ops = k.interestOps();
//剔除OP_CONNECT事件
ops &= ~SelectionKey.OP_CONNECT;
//重新更新关注的事件
k.interestOps(ops);
//传播 connect事件
unsafe.finishConnect();
}
//如果当前返回的关注事件的掩码包含 OP_WRITE的话
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
//开始向通道内刷新数据
ch.unsafe().forceFlush();
}
//如果当前的事件掩码包含读、新连接接入事件 或者 不关注任何事件的时候 传播read事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //传播read事件 可能是新连接接入也可能有数据可读
unsafe.read();
}
} catch (CancelledKeyException ignored) {
//发生异常关闭通道
unsafe.close(unsafe.voidPromise());
}
}
大家可以看到,里面的处理基本和我们对于JDK NIO的处理一致,就是判断各种事件然后进行对应的处理!
II、异步任务的处理
runAllTasks();
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
//合并任务 将定时任务的队列里面的任务拉去出来,和异步任务的队列进行合并
fetchedAll = fetchFromScheduledTaskQueue();
//开始执行全部的任务
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll);
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
这里就是异步任务的被执行的地方,这里分为两个步骤:1. 合并任务 2.执行taskQueue异步任务 3.执行tailQueue异步任务!
合并任务
fetchedAll = fetchFromScheduledTaskQueue();
Netty在我们学习中已经知道了两种队列,一种是taskQueue队列,一种是tailQueue队列,现在又出现了第三种队列:
scheduledTaskQueue
,他是一个专门存放定时任务的对队列,这里的合并任务就是将即将要执行的任务合并到taskQueue中等待执行!这行代码执行完毕后,所有即将要执行的任务都被添加在了taskQueue队列中,等待后续的执行!
执行taskQueue异步任务
//注意这里传入的是合并完成后额taskQueue
runAllTasksFrom(taskQueue)上述代码将对应的任务全部集中到了taskQueue队列中后们这里开始消费taskQueue队列进行执行!我们可以适当的看一下源码:
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
//从taskQueue队列中弹出一个任务
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
//执行任务 调用run方法
safeExecute(task);
//继续弹出任务
task = pollTaskFrom(taskQueue);
//如果弹出的任务为空
if (task == null) {
//直接返回
return true;
}
}
}执行tailQueue异步任务
afterRunningAllTasks();
这里开始执行tailQueue节点的任务,可以看到,tailQueue节点的任务执行优先级低于上述两种队列!
@Override
protected void afterRunningAllTasks() {
//注意这里传入的是 tailQueue
runAllTasksFrom(tailTasks);
}
//继续往下看源码
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
//弹出任务
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
//执行任务
safeExecute(task);
//再次弹出任务
task = pollTaskFrom(taskQueue);
if (task == null) {
//任务执行完毕 返回true
return true;
}
}
}这里就不作过多讲解了,这里和上面的逻辑基本一致,只是执行的qeueb不是一个!
III、解决臭名昭著的JDK空轮询BUG
可能大家大家都知道,JDK NIO在事件循环判断的时候可能会出现空轮询的BUG,导致CPU100%,虽然Oracle官方宣称空轮询的BUG已经解决了,但是后续经过一些公司实际的业务上证明并没有解决,只是出现几率小了点,Netty事实上并没有解决这个空轮询BUG只是用另外一种比较巧妙的方法规避开了,我们一起学习下:
首先,我们先想一下,我们如何断定我们的程序可能发生了空轮询的BUG,学习过NIO的都知道,我们会调用一个selector.select()进行阻塞等待有完成的事件发生,当selet方法阻塞解除的时候,就证明一定有我么感兴趣的事件发生,但是当我们发现select方法解除了阻塞,但是事件数量却为0的时候,我们就认为可能出现了空轮询的BUG!
但是IO数量为0并不是一定出现了空轮询的BUG,也可能外部调用了markUp方法,所以我们不能每一次出现事件数量为0的时候都认为程序出现了空轮询BUG,所以我们就需要有一个记录它出现该类异常情况发生的次数,当发生的次数达到了我们设置的阈值,就证明它可能发生了空轮询的BUG,这个时候需要处理这个空轮询的BUG!
那么如何处理呢? 我们任务发生空轮询问题是因为(JDK官方认为,这个Linux Epoll告诉JDK有事件了,但是JDK获取事件的时候获取了一个空,所以JDK只能返回一个0)所以就发生了空轮询:
JDK官方给出的解决方案
Netty是使用的第三种,抛弃旧的选择器,重建一个新的选择器,然后替换旧的选择器,我们一起看下源码!
我们看看Netty是如何做的,我们回到io.netty.channel.nio.NioEventLoop#run源码:
我还是,为了方便讲解,把这段代码贴出来省略和空轮询无关的代码(完整代码见上):
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
........................忽略进行事件选择的代码...................
//本次循环次数+1
selectCnt++;
....................忽略事件处理和异步任务执行的代码................
//当处理的异步任务或者IO事件的数量大于0,证明没有发生空轮询
if (ranTasks || strategy > 0) {
//每隔三次打印一次日志
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
//没有空轮询的话清空
selectCnt = 0;
//如果出现异步任务为空 IO事件为空的话就会进入到这个逻辑
} else if (unexpectedSelectorWakeup(selectCnt)) {
//空轮询被处理后清空 轮询次数
selectCnt = 0;
}
} catch (CancelledKeyException e) {
...................忽略........................
} finally {
...................忽略........................
}
}
可以仔细的看一下 上述代码的注释,我们进入到 unexpectedSelectorWakeup(selectCnt) 方法:
private boolean unexpectedSelectorWakeup(int selectCnt) {
..............忽略日志打印................
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
//判断异常情况的次数是不是超过了预设的512次
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//开始重新构建一个selector
rebuildSelector();
return true;
}
return false;
}
我们读源码到这里,可以知道,当异常执行的次数超过了阈值 512次,就会调用一个 rebuildSelector方法,我们点进去看一下:
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}
我们按照惯例,按照同步方法调用 rebuildSelector0();
private void rebuildSelector0() {
//获取原始的选择器
final Selector oldSelector = selector;
//声明一个新的选择器
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
//创建一个新的选择器,赋值给新的选择器变量
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
int nChannels = 0;
//开始遍历旧的选择器,将旧选择器的IO事件的key,绑定到新创建的选择器上
for (SelectionKey key: oldSelector.keys()) {
//获取旧选择器的管道
Object a = key.attachment();
try {
//如果key失效了,就跳过!
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
//获取对应关注的事件掩码
int interestOps = key.interestOps();
//将旧key置为失效
key.cancel();
//重新将管道绑定到新的选择器上
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
//替换管道里面保存的选择器事件主键
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
...............省略...............
}
}
//重新保存新的优化后的选择器和原始选择器
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
//关闭旧的选择器
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
...............省略..................
}
}
...............省略..................
}
我们从上述代码可以看到,Netty处理空轮询的问题的策略是,当发现你可能发生空轮询的次数超过了512次的时候,就直接重新获取一个新的选择器,然后将旧的选择器直接替换掉,这样空轮询的BUG也就很轻易的解决了!
三、总结
每一个EventLoop都会启动一条永久运行的线程,用于处理异步任务和IO事件,我们称之为Reactor线程。 如果存在IO事件的话,会先处理IO事件! Reactor线程会先将定时任务里面的任务合并到taskqueue里面,然后执行!taskQueue执行完毕后执行tailQueue队列的任务! 如果空轮询的次数发生了512次,就认为发生了空轮询的BUG,就会抛弃原来的选择器,重建一个新的选择器,将旧选择器上的事件全部绑定到新的选择器上,然后将旧选择器删除!
才疏学浅,如果文章中理解有误,欢迎大佬们私聊指正!欢迎关注作者的公众号,一起进步,一起学习!
❤️「转发」和「在看」,是对我最大的支持❤️