剖析Netty之底层原理(三)

共 760字,需浏览 2分钟

 ·

2020-12-17 12:27

神秘北极圈 阿拉斯加的山巅

谁的脸出现海角的天边

忽然的瞬间在那遥远的地点

我看见恋人幸福的光点

灵魂在招唤唱着古老

陌生熟悉的歌谣 天空 在微笑

我的世界缤纷闪耀





前面浑浑噩噩的把服务端的实现底层源码来了个大致,感觉头有点大,这里用图例的方式来理下思绪。


一、EventLoopGroup bossGroup = new NioEventLoopGroup();

这是服务端的第一句代码,类似的还new了一个workGroup,工作线程组

这两句代码,Netty底层的各种初始化动作,前面也都说过了,这里做个总结:


底层逐步初始化调用程序如下图:


5adb79a4b71e73b5ea37d50fb4a8bdfc.webp



其类调用时序图如下所示


f14e3a4c00879c9780e0e0cdf471e0ff.webp


这里,需要重点看下

io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup

(int, 

java.util.concurrent.Executor,

io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...

)方法

源码如下:


protected MultithreadEventExecutorGroup(int nThreads, Executor executor,                                            EventExecutorChooserFactory chooserFactory, Object... args) {                //当线程数没有的时候,抛出异常,根据前面分析,这里的线程数为服务器内核数量*2        if (nThreads <= 0) {            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));        }
        //这里的Executor是null,所以初始化一个 if (executor == null) { //初始化一个类型是io.netty.util.concurrent.ThreadPerTaskExecutor的线程池 //线程池类型为io.netty.util.concurrent.DefaultThreadFactory executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }
// 设置事件驱动数组,大小根据设定或者默认的来 children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) { boolean success = false; try {                //设定每个事件驱动数组为io.netty.channel.nio.NioEventLoop                //这里调用的newChild,下面会有详细解释 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { //如果添加失败,则关闭每一个io.netty.channel.nio.NioEventLoop children[j].shutdownGracefully(); }
for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } }
        //初始化io.netty.util.concurrent.EventExecutorChooserFactory.EventExecutorChooser        //其结果值为io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser#PowerOfTwoEventExecutorChooser        //或者io.netty.util.concurrent.DefaultEventExecutorChooserFactory.GenericEventExecutorChooser#GenericEventExecutorChooser chooser = chooserFactory.newChooser(children);
final FutureListener terminationListener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } };
for (EventExecutor e: children) {            // 循环为每一个事件驱动器实例添加监听            //这里的e.terminationFuture()实质调用            //io.netty.util.concurrent.SingleThreadEventExecutor#terminationFuture            //因为前面我们说到每一个children数组的实例是io.netty.channel.nio.NioEventLoop            //这里io.netty.channel.nio.NioEventLoop继承自            //io.netty.util.concurrent.SingleThreadEventExecutor#terminationFuture e.terminationFuture().addListener(terminationListener); }
        //这里将children复制出来一份给到readonlyChildren         //Collections.unmodifiableSet返回的Set集合不可修改,即readonly Set childrenSet = new LinkedHashSet(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }


如上所示为netty-all-4.1.50.final源码,其大致意义,在上面代码中也有添加注释,总结一下就是这里设定了事件驱动数组,并设定事件轮询线程组


上面的给每一个EventExecutor数组赋值的代码

这里可以发现

io.netty.util.concurrent.MultithreadEventExecutorGroup

类中,该方法是静态方法


750a0ae623b7eabd1e890be9d64a0858.webp


方法是由子类

io.netty.channel.nio.NioEventLoopGroup

调用而来,所以这里实质调用的是

io.netty.channel.nio.NioEventLoopGroup#newChild


源码如下:

@Override    protected EventLoop newChild(Executor executor, Object... args) throws Exception {        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;        return new NioEventLoop(this, executor, (SelectorProvider) args[0],            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);    }


这里的args数组,在根据以上配置的情况下,是3,第一个是选择器:

args[0] = SelectorProvider.provider()

即:


843437ff57c4878655b29482d43cdf2a.webp


数组第二个元素是默认选择器策略

args[1] = DefaultSelectStrategyFactory.INSTANCE;

即:


aaaa9849c2232458ffc71ef51e6f11b8.webp


第三个是默认异常

args[2] = RejectedExecutionHandlers.reject();


661bc9d13b1cf1fbfbc583570cb8642a.webp


下面主要看下

io.netty.channel.nio.NioEventLoopGroup#newChild

方法


该方法上面源代码已说明,返回了一个

io.netty.channel.nio.NioEventLoop#NioEventLoop

对象


be551028ed9268c21dfecd51db69fc4c.webp


构造方法调用明细如下所示:


901297d656bd65f957e4b81ac2d9da1f.webp


io.netty.channel.nio.NioEventLoop 

的构造方法,通过逐层super,为父级类参数赋值,同时创建了

NioEventLoop的多路复用选择器


d277c7029df2ccce008c83b9a483e599.webp


这里可以看下Netty的多路复用选择器是如何构建的:

源码如下:


private SelectorTuple openSelector() {        final Selector unwrappedSelector;        try {            //首先直接打开选择器            unwrappedSelector = provider.openSelector();        } catch (IOException e) {            throw new ChannelException("failed to open a new selector", e);        }
if (DISABLE_KEY_SET_OPTIMIZATION) {            //这里选择是否不需要优化多路复用选择器的key            //默认是false,即默认需要优化            //可以通过properties文件中io.netty.noKeySetOptimization设定true或者false return new SelectorTuple(unwrappedSelector); }
        //AccessController 是Java安全策略机制的一种方式,这里表示的是,        //不管是否设定安全策略,这里临时扩大访问权限,强制执行方法              //有需要了解更详细的,可以自行查阅资料 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction() { @Override public Object run() { try {                    //尝试返回sun.nio.ch.SelectorImpl实例,可能会抛出异常 return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } });
if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {            if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } //当尝试获取的sun.nio.ch.SelectorImpl实例是异常时候,则直接返回 return new SelectorTuple(unwrappedSelector); }
final Class selectorImplClass = (Class) maybeSelectorImplClass; final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction() { @Override public Object run() { try { //这里利用反射,获取到selectedKeys和publicSelectedKeys Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");                    //当java版本在9或者以上,并且sun.misc.Unsafe可以在classpath下找到                    //这里如果java版本在1.9及其以上,一般来说,都是可以的 if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. // This allows us to also do this in Java9+ without any extra flags.                        //这里原始注释也比较好理解了:尝试用sun.misc.Unsafe替换SelectionKeySet long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject( unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null; } // We could not retrieve the offset, lets try reflection as last-resort. }
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; }                    //给属性设置值 selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } });
if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector); } selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); }


上面代码,需要注意的地方:


各种赋值,打开的多路复用器


d8c05dc987504212189dcc053e49e05a.webp


赋值到了

io.netty.channel.nio.NioEventLoop.SelectorTuple

属性中


同时这个多路复用器的父级类

sun.nio.ch.SelectorImpl

属性被做出了优化,

即针对多路复用选择器实现类

sun.nio.ch.SelectorImpl

进行了优化

这里为何需要做这种优化呢?


首先需要知道,原本的sun.nio.ch.SelectorImpl

结构是什么样子的?


41c7efa515a33b9ff075da26d5da0249.webp


如上图所示,原本的

sun.nio.ch.SelectorImpl#selectedKeys

sun.nio.ch.SelectorImpl#publicSelectedKeys

属性是一个HashSet,而HashSet底层其实也就是一个HashMap,所以,其数据结构主要是由链表组成,链表就是一个不连续的内存存储,方便插入和删除,但是不利于查询

所以这里替换了一个

io.netty.channel.nio.SelectedSelectionKeySet

其构造方法为:


de5bfcc5e1b0a1dd4e181a14651928bf.webp


即是一个数组,我们知道数组在内存是一个连续的过程,有一个连续的内存地址,数组不利于新增和删除,但是利于查询



而这里

sun.nio.ch.SelectorImpl#selectedKeys

sun.nio.ch.SelectorImpl#publicSelectedKeys

主要存储的是和事件感兴趣的key值,即前面NIO中说到的可连接、可读、可写等状态:


eb4bf2610edb2f1d0d7f3ef317cc691a.webp


所以这里主要是在轮询的时候方便获取这种状态值,即快速查询,所以这里用数组替换链表


最终返回了一个优化后的

io.netty.channel.nio.NioEventLoop.SelectorTuple


这里的

io.netty.channel.nio.NioEventLoop.SelectorTuple

也是较之前NIO中所得到的

java.nio.channels.Selector

多路复用器不同,看下它的结构:


fda2054afb40bc2505ee1ace322e3e32.webp


io.netty.channel.nio.NioEventLoop.SelectorTuple

io.netty.channel.nio.NioEventLoop的一个内部类,包含2个多路复用器,根据构造函数,这里返回的

io.netty.channel.nio.NioEventLoop.SelectorTuple

属性值分别是:


io.netty.channel.nio.NioEventLoop.SelectorTuple#unwrappedSelector

值为

java.nio.channels.Selector


io.netty.channel.nio.NioEventLoop.SelectorTuple#selector

值为

io.netty.channel.nio.SelectedSelectionKeySetSelector#SelectedSelectionKeySetSelector


为此,在经历这一系列的关联调用之后(具体源码逻辑请见《剖析Netty之底层原理(一)》),其主要源码类的主要参数变化,这里总结了一张图表,


9760d7db49141e35c1377947e0faef14.webp

其中上图中,具有相似颜色的类,是同一个类,


针对此,可以初步了解完

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

方法所执行的所有逻辑<两者逻辑一样>


这里简单做出一个小结:


初始化NioEventLoopGroup的结果,实质是构建了一个EventExecutor数组,其数组类型是NioEventLoop。


由于篇幅已经够长了,所以,后续将接着分享,这里先简单告一段落。


有喜欢的,欢迎点赞关注,谢谢



麻烦给个再看呢!

浏览 41
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报