Netty 源码深度解析-EventLoop(1):EventLoop的构造
共 21805字,需浏览 44分钟
·
2021-05-23 09:18
- 前言 -
本文源码地址:netty-source-code-analysis
本文所使用的netty版本4.1.6.Final:带注释的netty源码
EventLoop
在netty中发挥着驱动引擎的作用,本文我们以NioEventLoopGroup
和NioEventLoop
为例着重分析一下EventLoopGroup
和EventLoop
的创建、一些重要的数据结构和netty的一些优化。
- NioEventLoopGroup -
咱们以NioEventLoopGroup
为例进行分析。NioEventLoopGroup
有很多构造方法,咱们不再一一贴出,只贴出2个关键的构造方法。
NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider)
该构造方法给出了SelectStrategyFactory
的默认值为DefaultSelectStrategyFactory.INSTANCE
。EventLoop
在每次循环时需要调用该类的calculateStrategy
方法来决定循环的策略,具体咱们后边讲。
我们通过new NioEventLoopGroup()
或者new NioEventLoopGroup(32)
创建EventLoopGroup
时最终都会调用到这个构造方法。接着又调用了另一个构造方法,咱们跟下去。
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
这里调用了父类 MultithreadEventLoopGroup
的构造方法,咱们继续跟下去。
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
MultithreadEventLoopGroup
的构造方法如下,这里给出了nThreads
的默认值,为MultithreadEventLoopGroup
的静态属性DEFAULT_EVENT_LOOP_THREADS
。接着调用父类MultithreadEventExecutorGroup
的构造方法。
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
DEFAULT_EVENT_LOOP_THREADS
的赋值在MultithreadEventLoopGroup
的静态代码块中,我们看到该值默认为cpu核数×2。
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
咱们接着跟到MultithreadEventExecutorGroup
的构造方法,这里继续调用本类的构造方法MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args)
,并且为executor
赋默认值。
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
executor
的默认值为ThreadPerTaskExecutor
,顾名思义就是为每一个任务创建一个线程,我们看一下它的实现,代码如下,execute
方法中为每一个任务创建了一个线程。
public final class ThreadPerTaskExecutor implements Executor {
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
我们继续跟到MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args)
,这里接着调用另一个构造方法MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)
并且为chooserFacotry
赋默认值,chooserFactory
咱们在“服务端的启动流程”和“客户端的启动流程”中均有涉及,作用是在为Channel
绑定EventLoop
时轮询地从EventLoopGroup
里选择EventLoop
,这里不再赘述。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
接着看下一个构造方法MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)
,这里就是创建EventLoopGroup
的核心逻辑了。
首先创建了一个EventExecutor
数组,并赋值给children
属性,数据长度和nThreads
相等,很容易想到,这里应该就是保存EventLoop
的数组了。
紧接着循环调用newChild
方法为数组的每一个元素赋值。
最后调用chooserFactory.newChooser(children)
为chooser
赋值。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
} finally {
}
}
chooser = chooserFactory.newChooser(children);
}
我们跟着看一下newChild
方法,该方法为抽象的,这里newChild
方法的实现在NioEventLoopGroup
中。好了,到这里咱们看到了NioEventLoop
的构造方法,继续跟进去。
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
- NioEventLoop -
来看NioEventLoop
的构造方法,这里NioEventLoop
保存了3个属性。
provider
:selector
的工厂类,从provider
可以得到selector
。selector
: 调用provider.openSelector()
得到的selector
,这里netty做了优化,咱们后边讲。selectStrategy
:这个的calculateStrategy
方法返回值,决定了EventLoop
的下一步动作,咱们也是后边讲。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector();
selectStrategy = strategy;
}
继续跟到父类SingleThreadEventLoop
的构造方法,这里初始化了一个属性tailTasks
,具体有什么用,咱们一会儿说,先记住这里有一个任务队列叫tailTasks
。
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
继续跟到父类SingleThreadEventExecutor
的构造方法,这里有几个属性赋值。
addTaskWakesUp
:这个是用来唤醒阻塞在taskQueue
上的EventLoop
线程的,在NioEventLoop
中EventLoop
不会阻塞在taskQueue
上,这里用处不是很大,可以先不研究它。maxPendingTasks
:后面紧接着就用到了,任务队列的最大长度。executor
:真正生成线程的类,默认是ThreadPerTaskExecutor
。taskQueue
:任务队列,还记得上边已经有一个tailTask
队列了吗,这里是另外一个队列。rejectedExecutionHandler
:拒绝策略。
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
继续跟到父类AbstractScheduledEventExecutor
的构造方法,这里什么也没干,又继续调用父类的构造方法,但是我们注意到该类中也有一个队列scheduledTaskQueue
,顾名思义是定时任务队列,只是该队列没有在构造方法中初始化,等到有定时任务加入时才初始化。
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
super(parent);
}
}
继续跟到父类AbstractEventExecutor
的构造方法,这里为parent
赋值,即该EventLoop
所属的EventLoopGroup
。
protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}
- Netty 的优化 -
3.1 对selector的优化
我们回到NioEventLoop
类中的openSelector
方法,这个方法在干什么呢,它在通过反射替换sun.nio.ch.SelectorImpl
类的两个属性selectedKeys
和publicSelectedKeys
,将这两个属性都替换成了SelectedSelectionKeySet
类的实例。为什么要这么换呢,咱们接着往下看。
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
}
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (ClassNotFoundException e) {
} catch (SecurityException e) {
}
}
});
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
} catch (IllegalAccessException e) {
} catch (RuntimeException e) {
}
}
});
if (maybeException instanceof Exception) {
} else {
//将selectedKeySet保存到selectedKeys属性中
selectedKeys = selectedKeySet;
}
return selector;
}
sun.nio.ch.SelectorImpl
类中的selectedKeys
和publicSelectedKeys
的作用是保存有兴趣事件发生的Selectionkey
,其中publicSelectedKeys
是selectedKeys
的包装类,Util.ungrowableSet(this.selectedKeys)
,看方法名ungrowableSet
表示包装之后这个set
就不能添加元素了,但是可以删除元素。也就是说jdk内部使用selectedKeys
(注意这是个protected
字段)进行添加和删除操作,而暴露给用户的是publicSelectedKeys
只能进行删除操作(看selectedKeys
方法)。
默认实现用的是HashSet
,
public abstract class SelectorImpl extends AbstractSelector {
protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
private Set<SelectionKey> publicKeys;
private Set<SelectionKey> publicSelectedKeys;
protected SelectorImpl(SelectorProvider var1) {
super(var1);
if (Util.atBugLevel("1.4")) {
this.publicKeys = this.keys;
this.publicSelectedKeys = this.selectedKeys;
} else {
this.publicKeys = Collections.unmodifiableSet(this.keys);
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
}
}
public Set<SelectionKey> selectedKeys() {
if (!this.isOpen() && !Util.atBugLevel("1.4")) {
throw new ClosedSelectorException();
} else {
return this.publicSelectedKeys;
}
}
}
再来看看SelectedSelectionKeySet
,这里用的数据结构是数组,很显然netty这里的优化是因为数组的元素添加和遍历操作比HashSet
更快。有同学对这里有两个数组的存在表示疑问,大家不用操心这个问题了,因为我也没看懂为什么会有两个数组,还要交替使用。netty在后来的版本中做了优化,只用一个数组就实现了。所以这里只要知道netty用数组进行优化就可以了。
那么又一个问题来了,为什么jdk里边不用数组或者List
,而用HashSet
呢,那是因为selector
的select
方法每次调用都会把已经准备好的SelectionKey
放入selectedKeys
中,如果用户在第一次调用select
方法之后没有处理相应Channel
的事件的,也没有删除selectedKeys
中的元素,那么再次调用select
之后,相同的SelectionKey
会再次加入selectedKeys
中,如果selectedKeys
使用数组或者List
实现将起不到去重的效果。
另一方面,如果使用List
或者数组,删除成本也比较高。
而netty的实现中保证不会在连续调用两次select
方法之间不删除selectedKeys
中的元素,而且netty
直接将selectedKeys
暴露出来,在删除的时候可以直接将数据中对应索引的元素设置为null。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
private SelectionKey[] keysA;
private int keysASize;
private SelectionKey[] keysB;
private int keysBSize;
private boolean isA = true;
}
3.2 对任务队列的优化
在SingleThreadEventExecutor
的构造方法中taskQueue
属性通过调用newTaskQueue
方法产生,newTaskQueue
方法在NioEventLoop
中被覆盖了,我们看一下。
这里被优化成了MpscQueue
,全称是MultiProducerSingleConsumerQueue
,即多生产者单消费者队列,感兴趣的同学自己去看一下,既然单独做一个这样的队列出来,在当前场景下自然是比BlockingQueue
性能更好了。
为什么可以做这样的优化呢,很显然,这里的队列消费者只有一个那就是EventLoop
,而生产者会有多个。并且,这里有一行注释This event loop never calls takeTask()
,这就说明这个MultiProducerSingleConsumerQueue
并没有阻塞的take
方法,而正好NioEventLoop
也不需要调用阻塞的take
方法,正好适合。
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks);
}
同样的,tailTask
这个队列也被优化成了MpscQueue
。我们看一下scheduledTaskQueue
是什么队列,答案在AbstractScheduledEventExecutor#scheduledTaskQueue
方法中,我们看到scheduledTaskQueue
仅仅是一个普通的优先级队列,甚至都不是一个线程安全的阻塞队列。
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
}
return scheduledTaskQueue;
}
为什么呢,为什么tailTasks
和taskQueue
是MpscQueue
,而scheduledTaskQueue
是非线程安全的队列呢?可能是因为没有的带优先级功能的MpsQueue
吧。
scheduledTaskQueue
是非线程安全的队列,会不会有多线程安全问题呢,答案是不会。我们看一下AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask<V>)
方法,这里在添加定时任务时,如果是非EventLoop
线程调用,则会发起一个异步调用,最终往scheduledTaskQueue
添加定时任务的还是EventLoop
线程。所以呢,这里又有另一个优化,那就是可以延迟初始化scheduledTaskQueue
。
- 总结 -
画重点来了,本文的重点就这两个。
每个 EventLoop
中有3个队列,分别是tailTasks
、taskQueue
和scheduledTaskQueue
。而且tailTasks
和taskQueue
队列在NioEventLoop
中的被优化为MultiProducerSingleConsumerQueue
。netty对 selector
中的selectedKeys
做了优化,从HashSet
替换为SelectedSelectionKeySet
,SelectedSelectionKeySet
是用数组实现的Set
,添加元素和遍历效率更高。
作者:王建新,转转架构部资深Java工程师,主要负责服务治理、RPC框架、分布式调用跟踪、监控系统等。爱技术、爱学习,欢迎联系交流。
来源公众号:种代码