Reactor 高性能设计模式

泥瓦匠攻城狮

共 6623字,需浏览 14分钟

 ·

2022-08-14 15:46

Reactor 模式

Rreactor 模型是 I/O 多路复用的升级版,底层依赖于 Java NIO。不熟悉 Java NIO 技术的可以看下:《Java NIO:从 Buffer、Channel、Selector 到 Zero-copy、I/O 多路复用,一篇搞定!》。

Reactor是高性能网络编程中非常经典且重要的一个设计模式。在很多软件设计实现中应用很广泛,像Netty 的设计中很重要的组成部分就是对 Reactor 模型的一种实现;还有 Redis 的实现中也用到了 Reactor 模型,这也是 Redis 之所以底层单线程但是速度却非常快的原因之一。

这里提供两个非常官方且经典的参考资料:

Scalable IO in Java》由 java.util.concurrent 包的作者 Doug Lea 编写的一个关于 Reactor 模型的介绍;

reactor-siemens》 也是由国外作者编写的一篇研究  Reactor 模型的论文;

需要的小伙伴可以关注公众号,公众号内回复:Reactor 获取下载链接!

Reactor 模式介绍

什么是 Reactor 模式

Reactor 模式一般翻译成反应器模式,也有人称为分发者模式。是基于事件驱动的设计模式,拥有一个或多个并发输入源,有一个服务处理器和多个请求处理器,服务处理器会同步地将输入的请求事件以多路复用的方式分发给相应的请求处理器。简单来说就是 由一个线程来接收所有的请求,然后派发这些请求到相关的工作线程中。

为什么使用 Reactor 模式

java 中,没有 NIO 出现之前都是使用 Socket 编程。Socket 接收请求是阻塞的,需要处理完一个请求才能处理下一个请求,所以在面对高并发的服务请求时,性能就会很差。

那有人就会说使用多线程(如下图所示)。接收到一个请求,就创建一个线程处理,这样就不会阻塞了。实际上这样的确是可以在提升性能上起到一定的作用,但是当请求很多的时候,就会创建大量的线程,维护线程需要资源的消耗,线程之间的切换也需要消耗性能。而且系统创建线程的数量也是有限的,所以当高并发时,会直接把系统拖垮。

6ecb081aa3b7e22408262674075d227a.webp

因此,基于 JavaDoug Lea 提出了三种形式的 Reactor 模式:单 Reactor 单线程、单 Reactor 多线程和多 Reactor 多线程。

Reactor 模式中有三个重要的角色:

  • Reactor:负责响应事件,将事件分发到绑定了对应事件的 Handler,如果是连接事件,则分发到 Acceptor
  • Handler:事件处理器。负责执行对应事件对应的业务逻辑;
  • Acceptor:绑定了 connect 事件,当客户端发起 connect 请求时,Reactor 会将 accept 事件分发给 Acceptor 处理;

单 Reactor 单线程版本

2f70d1e5484265c9e59e954f30acf1d0.webp单Reactor单线程

只有一个 Selector 循环接受请求,客户端注册进来由 Reactor 接收注册事件,然后再由 Reactor 分发出去,由对应的 Handler 进行业务逻辑处理。

伪代码实例

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;
    
    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }
    
    
    public void run() {
        try {
            while(!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while(it.hasNext()) {
                    dispatch((SelectionKey)(it.next()))
                }
                selected.clear();
            }
        }catch(IOException e){
            
        }
    }
    
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment());
        if (r != null){
            r.run();
        }
    }
    
    class Acceptor implements Runnable {
        public void run() {
            try{
                SocketChannel c =  serverSocket.accept();
                if (c != null){
                    new Handler(selector, c);
                }
            }catch(IOException e) {
                
            }
        }
    }

}



final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;
    
    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // optionally try first read now
        sk = socket.register(sel,0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        /**
         * selector.wakeup(); 唤醒阻塞在select方法上的线程,使其立即返回
         */

        sel.wakeup();
    }
    
    boolean inputIsComplete(){/*……*/}
    boolean outputIsComplete(){/*……*/}
    void process(){/*……*/}
        
    public void run() {
        try{
            if (state == READING){
                read();
            }else if(state == SENDING){
                send();
            }
        }catch(IOException e){
            
        }
    }
    
    void read() throws IOException{
        socket.read(input);
        if(inputIsComplete()){
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    } 
    
    void send() throws IOException{
        socket.write(output);
        if(outputIsComplete()){
            sk.cacel();
        }
    }
}

这里需要注意的两点是

  • Selector.wakeup() 方法的作用是:唤醒阻塞在select方法上的线程,使其立即返回。
  • Reactor.dispatch() 方法中,调用的是任务的 run 方法,同步执行。

单线程的问题实际上是很明显的。只要其中一个 Handler 方法阻塞了,那就会导致所有的 clientHandler 都被阻塞了,也会导致注册事件也无法处理,无法接收新的请求。所以这种模式用的比较少,因为不能充分利用到多核的资源。因此,这种模式仅仅只能处理 Handler 比较快速完成的场景。

单 Reactor 多线程版本

11a6d51ced5f2a58530e20c7b6ef4a7a.webp单Reactor多线程

在多线程 Reactor 中,注册接收事件都是由 Reactor 来做,其它的计算,编解码由一个线程池来做。从图中可以看出工作线程是多线程的,监听注册事件的 Reactor 还是单线程。

伪代码示例

public class Handler implements Runnable{

    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(Integer.MAX_VALUE);
    ByteBuffer output = ByteBuffer.allocate(Integer.MAX_VALUE);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    static ExecutorService pool = Executors.newCachedThreadPool();

    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // optionally try first read now
        sk = socket.register(sel,0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    boolean inputIsComplete(){return true;}
    boolean outputIsComplete(){return true;}
    void process(){}

    public void run() {
        try{
            if (state == READING){
                read();
            }else if(state == SENDING){
                send();
            }
        }catch(IOException e){

        }
    }

    void send() throws IOException {
        socket.write(output);
        if(outputIsComplete()){
            sk.cancel();
        }
    }

    synchronized void read()  throws IOException{
        socket.read(input);
        if(inputIsComplete()){
            pool.execute(new Processer());
        }
    }

    synchronized void processAndHandOff() {
        process();
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_WRITE);
    }

    class Processer implements Runnable {
        @Override
        public void run() {
            processAndHandOff();
        }
    }
}

对于 Reactor 部分,代码不需要调整,因为也是单 ReactorHandler 部分增加了线程池的支持。

对比单 Reactor 单线程模型,多线程 Reactor 模式在 Handler 读写处理时,交给工作线程池处理,可以充分利用多核cpu的处理能力,因为 Reactor 分发和 Handler 处理是分开的,不会导致 Reactor 无法执行。从而提升应用的性能。缺点是 Reactor 只在主线程中运行,承担所有事件的监听和响应,如果短时间的高并发场景下,依然会造成性能瓶颈。

多 Reactor 多线程版本

3980bd7ae29d019d64a602fc864873ea.webp多Reactor多线程

也称为主从 Reactor 模式,在这种模式下,一般会有两个 ReactormainReactorsubReactormainReactor 负责监听客户端请求,专门处理新连接的建立,再将建立好的连接注册到 subReactorsubReactor 将分配的连接加入到队列进行监听,当有新的事件发生时,会调用连接相对应的 Handler 进行业务处理。

这样的模型使得每个模块更加专一,耦合度更低,能支持更高的并发量。许多框架也使用这种模式。

Reactor 模式的优点

  1. 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的。
  2. 可以最大程度地避免复杂的多线程及同步问题,并且避免多线程/进程的切换开销。
  3. 扩展性好,可以方便地通过增加 Reactor 实例个数来充分利用 CPU 资源。
  4. 复用性好,Reactor 模式本身与具体事件处理逻辑无关,具有很高的复用性。

ef7519160d67cceac0206496128ceda8.webp

记得转发在看关注哦!d8bbe266c6d4cbdc8763515b73e973dd.webp
浏览 38
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报