剖析Netty之底层原理(三)lgli关注共 760字,需浏览 2分钟 ·2020-12-17 12:27 神秘北极圈 阿拉斯加的山巅谁的脸出现海角的天边忽然的瞬间在那遥远的地点我看见恋人幸福的光点灵魂在招唤唱着古老陌生熟悉的歌谣 天空 在微笑我的世界缤纷闪耀前面浑浑噩噩的把服务端的实现底层源码来了个大致,感觉头有点大,这里用图例的方式来理下思绪。一、EventLoopGroup bossGroup = new NioEventLoopGroup();这是服务端的第一句代码,类似的还new了一个workGroup,工作线程组这两句代码,Netty底层的各种初始化动作,前面也都说过了,这里做个总结:底层逐步初始化调用程序如下图:其类调用时序图如下所示这里,需要重点看下io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor,io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)方法源码如下:protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { //当线程数没有的时候,抛出异常,根据前面分析,这里的线程数为服务器内核数量*2 if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } //这里的Executor是null,所以初始化一个 if (executor == null) { //初始化一个类型是io.netty.util.concurrent.ThreadPerTaskExecutor的线程池 //线程池类型为io.netty.util.concurrent.DefaultThreadFactory executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 设置事件驱动数组,大小根据设定或者默认的来 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //设定每个事件驱动数组为io.netty.channel.nio.NioEventLoop //这里调用的newChild,下面会有详细解释 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { //如果添加失败,则关闭每一个io.netty.channel.nio.NioEventLoop children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } //初始化io.netty.util.concurrent.EventExecutorChooserFactory.EventExecutorChooser //其结果值为io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser#PowerOfTwoEventExecutorChooser //或者io.netty.util.concurrent.DefaultEventExecutorChooserFactory.GenericEventExecutorChooser#GenericEventExecutorChooser chooser = chooserFactory.newChooser(children); final FutureListener terminationListener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { // 循环为每一个事件驱动器实例添加监听 //这里的e.terminationFuture()实质调用 //io.netty.util.concurrent.SingleThreadEventExecutor#terminationFuture //因为前面我们说到每一个children数组的实例是io.netty.channel.nio.NioEventLoop //这里io.netty.channel.nio.NioEventLoop继承自 //io.netty.util.concurrent.SingleThreadEventExecutor#terminationFuture e.terminationFuture().addListener(terminationListener); } //这里将children复制出来一份给到readonlyChildren //Collections.unmodifiableSet返回的Set集合不可修改,即readonly Set childrenSet = new LinkedHashSet(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }如上所示为netty-all-4.1.50.final源码,其大致意义,在上面代码中也有添加注释,总结一下就是这里设定了事件驱动数组,并设定事件轮询线程组上面的给每一个EventExecutor数组赋值的代码这里可以发现io.netty.util.concurrent.MultithreadEventExecutorGroup类中,该方法是静态方法方法是由子类io.netty.channel.nio.NioEventLoopGroup调用而来,所以这里实质调用的是io.netty.channel.nio.NioEventLoopGroup#newChild源码如下:@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); }这里的args数组,在根据以上配置的情况下,是3,第一个是选择器:args[0] = SelectorProvider.provider()即:数组第二个元素是默认选择器策略args[1] = DefaultSelectStrategyFactory.INSTANCE;即:第三个是默认异常args[2] = RejectedExecutionHandlers.reject();下面主要看下io.netty.channel.nio.NioEventLoopGroup#newChild方法该方法上面源代码已说明,返回了一个io.netty.channel.nio.NioEventLoop#NioEventLoop对象构造方法调用明细如下所示:io.netty.channel.nio.NioEventLoop 的构造方法,通过逐层super,为父级类参数赋值,同时创建了NioEventLoop的多路复用选择器这里可以看下Netty的多路复用选择器是如何构建的:源码如下:private SelectorTuple openSelector() { final Selector unwrappedSelector; try { //首先直接打开选择器 unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } if (DISABLE_KEY_SET_OPTIMIZATION) { //这里选择是否不需要优化多路复用选择器的key //默认是false,即默认需要优化 //可以通过properties文件中io.netty.noKeySetOptimization设定true或者false return new SelectorTuple(unwrappedSelector); } //AccessController 是Java安全策略机制的一种方式,这里表示的是, //不管是否设定安全策略,这里临时扩大访问权限,强制执行方法 //有需要了解更详细的,可以自行查阅资料 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction() { @Override public Object run() { try { //尝试返回sun.nio.ch.SelectorImpl实例,可能会抛出异常 return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } //当尝试获取的sun.nio.ch.SelectorImpl实例是异常时候,则直接返回 return new SelectorTuple(unwrappedSelector); } final Class selectorImplClass = (Class) maybeSelectorImplClass; final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeException = AccessController.doPrivileged(new PrivilegedAction() { @Override public Object run() { try { //这里利用反射,获取到selectedKeys和publicSelectedKeys Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); //当java版本在9或者以上,并且sun.misc.Unsafe可以在classpath下找到 //这里如果java版本在1.9及其以上,一般来说,都是可以的 if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. // This allows us to also do this in Java9+ without any extra flags. //这里原始注释也比较好理解了:尝试用sun.misc.Unsafe替换SelectionKeySet long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField); if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject( unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null; } // We could not retrieve the offset, lets try reflection as last-resort. } Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } //给属性设置值 selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector); } selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); }上面代码,需要注意的地方:各种赋值,打开的多路复用器赋值到了io.netty.channel.nio.NioEventLoop.SelectorTuple属性中同时这个多路复用器的父级类sun.nio.ch.SelectorImpl属性被做出了优化,即针对多路复用选择器实现类sun.nio.ch.SelectorImpl进行了优化这里为何需要做这种优化呢?首先需要知道,原本的sun.nio.ch.SelectorImpl结构是什么样子的?如上图所示,原本的sun.nio.ch.SelectorImpl#selectedKeys和sun.nio.ch.SelectorImpl#publicSelectedKeys属性是一个HashSet,而HashSet底层其实也就是一个HashMap,所以,其数据结构主要是由链表组成,链表就是一个不连续的内存存储,方便插入和删除,但是不利于查询所以这里替换了一个io.netty.channel.nio.SelectedSelectionKeySet其构造方法为:即是一个数组,我们知道数组在内存是一个连续的过程,有一个连续的内存地址,数组不利于新增和删除,但是利于查询而这里sun.nio.ch.SelectorImpl#selectedKeys和sun.nio.ch.SelectorImpl#publicSelectedKeys主要存储的是和事件感兴趣的key值,即前面NIO中说到的可连接、可读、可写等状态:所以这里主要是在轮询的时候方便获取这种状态值,即快速查询,所以这里用数组替换链表最终返回了一个优化后的io.netty.channel.nio.NioEventLoop.SelectorTuple这里的io.netty.channel.nio.NioEventLoop.SelectorTuple也是较之前NIO中所得到的java.nio.channels.Selector多路复用器不同,看下它的结构:io.netty.channel.nio.NioEventLoop.SelectorTuple是io.netty.channel.nio.NioEventLoop的一个内部类,包含2个多路复用器,根据构造函数,这里返回的io.netty.channel.nio.NioEventLoop.SelectorTuple属性值分别是:io.netty.channel.nio.NioEventLoop.SelectorTuple#unwrappedSelector值为java.nio.channels.Selectorio.netty.channel.nio.NioEventLoop.SelectorTuple#selector值为io.netty.channel.nio.SelectedSelectionKeySetSelector#SelectedSelectionKeySetSelector为此,在经历这一系列的关联调用之后(具体源码逻辑请见《剖析Netty之底层原理(一)》),其主要源码类的主要参数变化,这里总结了一张图表,其中上图中,具有相似颜色的类,是同一个类,针对此,可以初步了解完EventLoopGroup bossGroup = new NioEventLoopGroup();和EventLoopGroup workerGroup = new NioEventLoopGroup();方法所执行的所有逻辑<两者逻辑一样>这里简单做出一个小结:初始化NioEventLoopGroup的结果,实质是构建了一个EventExecutor数组,其数组类型是NioEventLoop。由于篇幅已经够长了,所以,后续将接着分享,这里先简单告一段落。有喜欢的,欢迎点赞关注,谢谢麻烦给个再看呢! 浏览 47点赞 评论 收藏 分享 手机扫一扫分享分享 举报 评论图片表情视频评价全部评论推荐 『Go 语言底层原理剖析』文末送书CoderPark0Java并发之AQS原理剖析java12340Java并发之AQS原理剖析程序员的时光0分布式事务之底层原理揭秘服务端思维0AQS 原理剖析码农沉思录0AQS 原理剖析ytao0Node.js 底层原理全栈前端精选0Node.js 底层原理程序源代码0synchronized底层原理Java资料站0Node.js 底层原理前端Sharing0点赞 评论 收藏 分享 手机扫一扫分享分享 举报