Netty 线程模型

程序员考拉

共 31787字,需浏览 64分钟

 · 2021-06-18

公众号关注 “GitHub今日热榜
设为 “星标”,带你挖掘更多开发神器!










1.前言

 

Netty为什么会高效?回答就是良好的线程模型,和内存管理。在Java的NIO例子中就我将客户端的操作单独放在一个线程中处理了,这么做的原因在于如果将客户端连接串起来,后来的连接就要等前一个处理完,当然这并不意味着多线程比单线程有优势,而是在于每个客户端都需要进行读取准备好的缓存数据,再执行一些业务逻辑。如果业务逻辑耗时很久,那么顺序执行的方式没有多线程优势大。另一个方面目前多核CPU很常见了,多线程是个不错的选择。这些在第一节就说明过,也提到过NIO并不是提升了IO操作的速度,而是减少了CPU的浪费时间,这些概念不能搞混。


本节不涉及内存管理,只介绍相关的线程模型。


2.核心类


 

上图就是我们需要关注的体系内容了,主要从EventExecutorGroup开始往下看,再上层的父接口是JDK提供的并发包内的内容,基础是线程池中可以执行周期任务的线程池服务。所以从这我们可以知道Netty可以实现周期任务,比如心跳检测。接口定义下面将逐一介绍。


2.1 EventExecutorGroup


  

  • isShuttingDown():是否正在关闭,或者是已经关闭。

  • shutdownGracefully():优雅停机,等待所有执行中的任务执行完成,并不再接收新的任务。

  • terminationFuture():返回一个该线程池管理的所有线程都terminated的时候触发的future。

  • shutdown():废弃了的关闭方法,被shutdownGracefully取代。

  • next():返回一个被该Group管理的EventExecutor。

  • iterator():所有管理的EventExecutor的迭代器。

  • submit():提交一个线程任务。

  • schedule():周期执行一个任务。

 

上述方法基本上是对周期线程池的一个封装,但是扩展了EventExecuotr概念,即分了若干个小组,处理事件。另外一个比较实用的就是优雅停机了。


2.2 EventLoopGroup


 

EventLoopGroup中的方法很少,其主要是和channel结合了,就多了一个将channel注册到线程池中的方法。


2.3 EventExecutor


 

EventExecutor继承自EventExecutorGroup,这个之前也提到过该类,相当于Group中的一个子集。


  • next():就是找group中下一个子集

  • parent():就是所属group

  • inEventLoop():当前线程是否是在该子集中

  • newXXX():这个是下一节内容,此处不介绍。


2.4 EventLoop


该接口就一个方法,就是parent();EventLoop和EventLoopGroup与EventExecutor和EventExecutorGroup是一组相似的概念。了解这些就可以了。


3 实现细节

 

EventLoop和EventLoopGroup的实现十分简单,简单看下就可以了,这里介绍几个重要的实现类。


3.1 AbstractEventExecutor

 

该类继承自上节说过的AbstractExecutorService,其最重要的是execute方法未实现。该类是对AbstractExecutorService的一个进一步加工,添加了group的概念,和不同的Future创建方法。这里不要被之前的Java线程池模型所干扰,其不一定是线程池。回到上一节线程池的介绍,最终的样子都是Execute方法决定的。


3.2 AbstractScheduledEventExecutor

 

该类是对AbstractEventExecutor的一个进一步实现,其实现了周期任务的执行。原理是内部持有一个优先队列ScheduledFutureTask。所有周期任务都添加到这个队列中,也实现了取出周期任务的方法,但是该抽象类并没有具体执行周期任务的实现。


3.3 SingleThreadEventExecutor

 

该类是对AbstractScheduledEventExecutor的一个实现,其基本上是我们最终的一个EventLoop的雏形了,很多不同协议的EventLoop都是基于它实现的。


 

虽然名字叫做单线程执行器,但是其不一定是单个线程。Executor默认使用的是ThreadPerTaskExecutor,其executor会为每一个任务创建一个线程并执行,当然你也可以传入自己的executor。Queue使用的是LinkedBlockingQueue,无容量限制的任务队列。其提供了添加任务到任务队列,从任务队列中获取任务的方法。


protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;
 
    do {
        fetchedAll = fetchFromScheduledTaskQueue();
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
 
    if (ranAtLeastOne) {
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();
    return ranAtLeastOne;
}
 
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
    Runnable task = pollTaskFrom(taskQueue);
    if (task == null) {
        return false;
    }
    for (;;) {
        safeExecute(task);
        task = pollTaskFrom(taskQueue);
        if (task == null) {
            return true;
        }
    }
}


执行过程如上:1.先获取所有的周期任务,放入taskQueue;2.不断的执行taskQueue中的任务;3.afterRunningAllTasks就是一个自由发挥的方法。safeExecute就是直接执行run方法。


private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }
 
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }
 
                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }
 
                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                        }
 
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}


上面是该Executor初始化过程,run方法又是交给子类进行初始化了。


public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
    if (quietPeriod < 0) {
        throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
    }
    if (timeout < quietPeriod) {
        throw new IllegalArgumentException(
                "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
 
    if (isShuttingDown()) {
        return terminationFuture();
    }
 
    boolean inEventLoop = inEventLoop();
    boolean wakeup;
    int oldState;
    for (;;) {
        if (isShuttingDown()) {
            return terminationFuture();
        }
        int newState;
        wakeup = true;
        oldState = state;
        if (inEventLoop) {
            newState = ST_SHUTTING_DOWN;
        } else {
            switch (oldState) {
                case ST_NOT_STARTED:
                case ST_STARTED:
                    newState = ST_SHUTTING_DOWN;
                    break;
                default:
                    newState = oldState;
                    wakeup = false;
            }
        }
        if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
            break;
        }
    }
    gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
    gracefulShutdownTimeout = unit.toNanos(timeout);
 
    if (oldState == ST_NOT_STARTED) {
        try {
            doStartThread();
        } catch (Throwable cause) {
            STATE_UPDATER.set(this, ST_TERMINATED);
            terminationFuture.tryFailure(cause);
 
            if (!(cause instanceof Exception)) {
                // Also rethrow as it may be an OOME for example
                PlatformDependent.throwException(cause);
            }
            return terminationFuture;
        }
    }
 
    if (wakeup) {
        wakeup(inEventLoop);
    }
 
    return terminationFuture();
}


上面是一个优雅停机的过程,改变该Executor的状态成ST_SHUTTING_DOWN,这里要注意addTask的时候只有shutdown状态才会拒绝,所以此时这里的逻辑还不会拒绝新任务添加,然后返回了一个terminationFuture,这里不做介绍。


3.4 SingleThreadEventLoop

 

此类继承自上面讲解的SingleThreadEventExecutor,这里多了一个tailTask队列,用于每次事件循环后置任务处理,暂且不管。重要的在于很早提到了register方法,将channel注册到线程中。


public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
 
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}


实际上就是生成了一个DefaultChannelPromise,将channel和线程绑定,最后都放入了unsafe对象中。


3.5 NioEventLoop


上面讲了一些杂乱无章的内容,这里借助NioEventLoop好好梳理一下整个设计流程。NioEventLoop继承自SingleThreadEventLoop,先对之前的相关研究进行总结:


1.EventExecutorGroup接口是继承自Java的周期任务接口,是一个事件处理器组的概念,其相关方法有:是否正在关闭;优雅停机;获取一个事件处理器;提交一个任务;提交一个周期任务

  

2.EventExecutor接口是事件处理器,其继承自EventExecutorGroup,目的并不是说每一个事件处理器都是一个事件处理器组,而是为了复用接口定义的方法。每个处理器都应该具备优雅停机,提交任务,判断是否关闭的方法,其它方法有:获取处理器组;获取下一个处理器(覆盖了父接口的next方法);判断是否在事件循环内;创建Promise、ProgressivePromise、SucceededFuture、FailedFuture

  

3.EventLoopGroup继承自EventExecutorGroup,更新了父接口的含义,EventExecutor 的定位是处理单个事件,group就是处理事件组了。EventLoop的定位是处理一个连接的生命周期过程中的周期事件,group是多个EventLoop的集合了。这里又有一个尴尬的地方,group按照定义本不需要定义其它方法,但是由于Server端的设计(之前说过服务端的channel也是一个线程),使用的是group,所以group必须承担单个EventLoop的职责。最终添加了额外的方法:获取下一个EventLoop;注册channel;

  

4.EventLoop,事件循环,其也是一个处理器,最终继承自EventExecutor和EventLoopGroup,方法只有一个:获取父事件循环组EventLoopGroup。

 

上述接口光看名称很容易陷入误解,实际上定义是想将单个loop和group分离,但是实现上由于Server端包含一个服务端监听连接线程,一个客户端连接线程,其Group承担了单个的职责,所以定义了一些本该由单个执行器处理的方法,又为了复用方法,导致loop继承了group,这样看起来怪怪的,接口理解起来就混乱了。结合上面的描述,再看一遍继承图就更清楚了:


 

理解了上面的设定,我们再来看看客户端的事件处理是如何设计的,即总结上诉抽象类做了哪些事情。


1. AbstractEventExecutor:入参就一个parent,该类完成了一个基本处理:


a.将next设置成自己(上面说过继承的group,这个操作就和group区分开了)。

b.优雅停机调用的是带有超时的停机方案,超时为15秒

c.覆盖了Java提供的newTask包装成FutureTask的方法,使用了自己的PromiseTask

d.提供安全执行方法:safeExecute,直接调用的run方法

   

该类是最基础的一个抽象类,基本作用就是与group在定义混乱上做了一个区分。提供了执行器与Future关联方法和一个基本的执行任务的方法。

  

2.AbstractScheduledEventExecutor:入参也是一个parent,该类对AbstractEventExecutor未处理的周期任务提供了具体的完成方法:

    

a.提供计算当前距服务启动的时间

b.提供存储ScheduledFutureTask的优先队列

c.提供了取消所有周期任务的方法

d.提供了获取一个符合的周期任务的方法,要满足时间,并获取后移除

e.提供了获取距最近一个周期任务的时间多久

f.提供了移除一个周期任务的方法

g.提供添加周期任务的方法

   

该类提供了周期任务执行的一些基本方法,涉及添加周期任务,移除,获取等方法。

  

3.SingleThreadEventExecutor:入参包括parent,addTaskWakesUp标志,maxPendingTasks最大任务队列数(16和io.netty.eventexecutor.maxPendingTasks(default Integer.MAX_VALUE)参数更大的那个值),executor执行器(默认是ThreadPerTaskExecutor,每个任务创建一个线程执行),taskQueue任务队列(默认是LinkedBlockingQueue),rejectedExecutionHandler拒绝任务的处理类(默认直接抛出RejectedExecutionException)。该类主要完成了一个单线程的EventExecutor的基本操作:


a.创建一个taskQueue

b.中断线程

c.从任务队列中获取一个任务,takeTask连同周期任务也会获取

d.添加任务到任务队列

e.移除任务队列中指定任务

f.运行所有任务,会先将周期任务存入taskQueue,再使用safeExecute方法执行任务

g.实现了execute方法,会添加任务到任务队列,如果当前线程不是事件循环线程,开启一个线程。通过的就是持有的executor来开启的线程任务。execute方法调用了run方法,该类没有实现run方法。任务的添加都不是通过execute直接执行了,而是走的添加任务到taskQueue,由未实现的run线程来处理这些事件。

h.优雅停机

   

这样就有了一个基础的单线程模型了,开启线程,保存,取出任务的方法都有了,只有在开启线程中执行任务的run()方法还未实现。

  

4.SingleThreadEventLoop:入参和SingleThreadEventExecutor一致,不同的是多了一个tailTasks。该类主要是针对netty自身的事件循环的定义来实现方法了:

    

a.注册channel,实际上是生成了一个DefaultChannelPromise对象,持有了channel,和运行该channel的EventExecutor,然后将该对象交给最底层的unsafe处理。

b.添加一个事件周期结束后执行的尾任务tailTasks

c.执行尾任务

d.删除指定尾任务

   

该类就很简单,没有过多的内容,只是增加了一个每个事件周期后执行的任务而已。

 

回顾完了,上面4个父类构建了一个基本的带定时任务,普通任务,事件循环后置任务的EventLoop,每个channel绑定了一个线程执行器,通过DefaultChannelPromise持有两者,最终交给Unsafe操作。子类只需要实现run方法,处理任务队列中的任务。下面就是重头戏NioEventLoop这个客户端的线程是如何设计的了:


NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}


调用的就是父类方法,不过在创建EventLoop的时候创建了selector,这个是NIO中也提到过的。该EventLoop是在Group中newChild创建的。


protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}


上面是我们需要关注的run方法,该方法被单独的线程执行。通过一个策略来判断执行哪个,默认的策略是任务队列中有任务,select执行一下,返回当前的事件数,没任务返回SelectStrategy.SELECT。简单的说就是有任务就补充新到来的事件执行所有的任务,没任务就执行新到的事件。先处理IO读写任务,再处理其他任务,ioRation设置的耗时比例是IO任务占一个执行周期的百分比,默认50,意思是IO执行了50秒,其他任务也会得到50秒的执行时间。后续操作就是获取所有select的key,执行所有的任务了。这里就有一个判断如果是停机状态,就会closeAll(),之前说优雅停机的时候就是设置了一个这个标志,最后是在执行任务之后判断。processSelectedKey环节都交给unsafe类完成了,这里就挂上了handler相关触发,handler的执行也就说明都在该线程内了。

 

上面的描述虽然把整个过程都关联上了,但是最主要的问题还是混乱的:如何做到一个channel创建一个线程的?上面只是说明了channel和EventExecutor是绑定在DefaultChannelPromise并交给了Unsafe类,并没有看到是如何创建线程的。而且另一个问题在于,processSelectedKey是选择了所有的key,这不是所有的channel共享了一个线程吗?

 

要解决该问题要回到Bootstrap的channel建立过程:initAndRegister()方法中,通过channelFactory创建了一个channel对象,而后ChannelFuture regFuture = config().group().register(channel);主要就是观察register方法,该类是设置的线程池NioEventLoopGroup提供的方法,其继承的是MultithreadEventLoopGroup,是调用了next方法获取的EventLoop,最后接上上面channel和eventloop绑定的内容。next中获取的EventLoop早在类初始化的时候就生成了,在构造方法中MultithreadEventExecutorGroup,children就是Eventloop,next不过是挑选了一个线程池而已,默认数量是CPU核数的2倍。这个也就是前面说的,线程数量不是越多越好。

 

这样我们明白了,客户端注册的时候是分配了一个线程给它。客户端并不需要多线程,但是还是继续看后面的内容:AbstractUnsafe的register方法给出了相关解答。channel持有了该EventLoop,此时线程还是未运行状态,只是有了这么一个对象而已。


if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    } catch (Throwable t) {
        logger.warn(
                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                AbstractChannel.this, t);
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}


这里execute方法,这个是之前我们讲过的一个方法,其会判断当前线程是否是该channel的线程,目前都没有初始化线程肯定不是,将任务放入任务队列,开启一个线程(线程处于未启动状态才会开启,否则不执行),这个设计使得execute的时候只会开启一次线程,而所有的任务都会被放入任务队列,由这个线程执行。再回到run方法,这个是channel线程执行的方法,目前每个NioEventLoop都执行了Select等方法啊,这不是处理了所有的channel的工作吗?并没有达到一个channel生命周期控制在一个线程中啊。

 

这里实际上是JAVA NIO的例子带来的误解,认为必须一个线程来使用select,然后遍历事件分配线程给channel执行读写操作。实际上在Netty中不一样,Netty所有线程都在执行select并获取相关事件,但是实际上其并没有执行所有的事件。


private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            return;
        }
        
        if (eventLoop != this || eventLoop == null) {
            return;
        }
         
        unsafe.close(unsafe.voidPromise());
        return;
    }
 
    try {
        int readyOps = k.readyOps();
         
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
 
            unsafe.finishConnect();
        }
 
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
 
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }


看这里,if (eventLoop != this || eventLoop == null) ,如果channel的eventLoop不是当前的eventLoop就不执行,这个就一小段代码,但是就直接决定了EventLoop只对自己所绑定的channel感兴趣,最终达到了只处理自己相关的任务的目的。


3.6 NioEventLoopGroup


该类没有太多需要说明的内容,前面已经讲解了很多。


1.AbstractEventExecutorGroup,实现了基本的方法,所有方法都是调用next()即挑选出一个线程执行器完成的。

  

2.MultithreadEventExecutorGroup,实现了一个基本的线程池,持有子线程,主要工作是初始化了子线程数组,提供了next方法。

  

3.MultithreadEventLoopGroup,实现了Netty的channel线程池,提供了register方法,虽然也是调用next的register方法。

  

4.NioEventLoopGroup,实现了创建子线程数组时newChild方法,所有的EventLoop都是这个方法创建的。

 

group就是两个重点,一个next()挑选事件执行器,一个newChild()创建线程执行器对象。


4.总结


本节耗费了大量的篇幅讲解了Netty的线程模型的设计思路,主要看点如下:


1.解释了EventLoop、EventLoopGroup、EventExecutor、EventExecutorGroup这四者之间的关系,和这么复杂混乱的继承关系的原因。

2.解释了group是如何初始化线程,并绑定channel的,next().register()

3.解释了eventloop为什么和channel绑定了,execute()开启线程,以及每个eventloop都在获取IO事件,但是通过channel的eventloop是否等于当前过滤掉其它的事件,只处理自己绑定的channel事件。

 

由于每个线程都在获取IO事件,所以这段逻辑变的非常复杂,这也就是我之前说的写好很IO很困难。

 

最后附上一张对前面所有内容总结的一个图,清醒一下头脑,从复杂的代码中脱身:


 

这个图就是一个基本的执行过程图了,可能有遗漏的地方,但是大体情况如图所示。



出处:cnblogs.com/lighten/p/8967630.html










关注GitHub今日热榜,专注挖掘好用的开发工具,致力于分享优质高效的工具、资源、插件等,助力开发者成长!







点个在看,你最好看


浏览 32
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报