03. Apache thrift 之网络模型

java技术探秘

共 52594字,需浏览 106分钟

 ·

2021-07-03 20:44

本文我们来分析thrift的网络传输模式。

客户端

Thrift 客户端常用的传输层有以下几种:

  • TSocket:使用阻塞式I/O进行传输,是最常见的模式

  • TNonblockingSocket:使用非阻塞方式,用于构建异步客户端

  • TFramedTransport:使用非阻塞方式,按块的大小进行传输,类似于Java中的NIO

接下来我们从源码的角度来分析这几种传输层的实现。

阻塞式IO:TSocket

TSocket使用了阻塞式I/O进行传输,代码实现如下:

public class TSocket extends TIOStreamTransport {

  private Socket socket_;
  
  /**
   * 构造方法
   */

  public TSocket(String host, int port) throws TTransportException {
    this(new TConfiguration(), host, port, 0);
  }
  
  /**
   * 构造方法
   */

  public TSocket(TConfiguration config, String host, int port, int socketTimeout, 
                 int connectTimeout)
 throws TTransportException 
{
    super(config);
    host_ = host;
    port_ = port;
    socketTimeout_ = socketTimeout;
    connectTimeout_ = connectTimeout;
    initSocket();
  }

  /**
   * 初始化方法
   */

  private void initSocket() {
    // 创建 Socket 对象
    socket_ = new Socket();
    try {
      // 对 socket 进行一些配置
      socket_.setSoLinger(false0);
      socket_.setTcpNoDelay(true);
      socket_.setKeepAlive(true);
      socket_.setSoTimeout(socketTimeout_);
    } catch (SocketException sx) {
      LOGGER.error("Could not configure socket.", sx);
    }
  }

  /**
   * 进行连接
   */

  public void open() throws TTransportException {
    ...

    try {
      // 连接操作
      socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_);
      inputStream_ = new BufferedInputStream(socket_.getInputStream());
      outputStream_ = new BufferedOutputStream(socket_.getOutputStream());
    } catch (IOException iox) {
      close();
      throw new TTransportException(TTransportException.NOT_OPEN, iox);
    }
  }

  ...
}

这个类中使用的是标准的BIO操作,使用的是jdk提供的Socket类,连接服务端时使用的是java.net.Socket#connect(java.net.SocketAddress, int)方法。

非阻塞式IO:TNonblockingSocket

TNonblockingSocket 使用的是非阻塞IO方式,代码如下:

public class TNonblockingSocket extends TNonblockingTransport {

  private final SocketAddress socketAddress_;

  private final SocketChannel socketChannel_;
  
  /**
   * 构造方法,打开 SocketChannel
   */

  public TNonblockingSocket(String host, int port, int timeout) 
        throws IOException, TTransportException 
{
    // SocketChannel.open():开启 nio 连接
    this(SocketChannel.open(), timeout, new InetSocketAddress(host, port));
  }

  /**
   * 构造方法,处理 SocketChannel 的配置
   */

  private TNonblockingSocket(TConfiguration config, SocketChannel socketChannel, 
        int timeout, SocketAddress socketAddress)
 throws IOException, TTransportException 
{
    super(config);
    socketChannel_ = socketChannel;
    socketAddress_ = socketAddress;

    // 配置为非阻塞
    socketChannel.configureBlocking(false);

    // 获取 socket,配置一些参数
    Socket socket = socketChannel.socket();
    socket.setSoLinger(false0);
    socket.setTcpNoDelay(true);
    socket.setKeepAlive(true);
    setTimeout(timeout);
  }

  /**
   * 开启,非阻塞io没有实现
   */

  public void open() throws TTransportException {
    throw new RuntimeException("open() is not implemented for TNonblockingSocket");
  }
  
  /**
   * 进行连接
   */

  public boolean startConnect() throws IOException {
    return socketChannel_.connect(socketAddress_);
  }

  ...
}

这个类中使用的是标准的NIO操作,使用的是jdk提供的SocketChannel类,开启SocketChannel使用的是SocketChannel.open()方法,连接服务端使用的是java.nio.channels.SocketChannel#connect方法。

缓冲TTransportTFramedTransport

TFramedTransport 的注释如下:

TFramedTransport is a buffered TTransport that ensures a fully read message every time by preceding messages with a 4-byte frame size.

TFramedTransport 是一个缓冲的 TTransport,通过在前面带有4字节帧大小的消息来确保每次完全读取消息。

注释表明,它是一 个缓冲的TTransport,我们再来看下它的代码实现:

public class TFramedTransport extends TLayeredTransport {
  ...

  /**
   * 构造方法
   * 需要传入 TTransport
   */

  public TFramedTransport(TTransport transport) throws TTransportException {
    this(transport, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
  }

  
  /**
   * 构造方法
   * 需要传入 transport 与 maxLength
   */

  public TFramedTransport(TTransport transport, int maxLength) throws TTransportException {
    super(transport);
    TConfiguration _configuration = Objects.isNull(transport.getConfiguration()) 
        ? new TConfiguration() : transport.getConfiguration();
    // 配置最大长度
    _configuration.setMaxFrameSize(maxLength);
    writeBuffer_.write(sizeFiller_, 04);
    readBuffer_= new TMemoryInputTransport(_configuration, new byte[0]);
  }

  public void open() throws TTransportException {
    // getInnerTransport() 得到的就是传入的 transport
    getInnerTransport().open();
  }
  ...
}

从代码来看,TFramedTransport 就是个包装类,构建方法中需要传入具体的TTransport,具体的IO操作由传入的TTransport来实现,从注释来看,主要作用就是为TTransport提供缓冲功能,使用时也是配合其他TTransport使用,如配合TNonblockingSocket使用:

// io 操作由 TNonblockingSocket 实现
TNonblockingSocket tSocket = new TNonblockingSocket(host, port);
TTransport transport = new TFramedTransport(tSocket);

服务端

Thrift 服务端支持的 server 类型如下:

  • TSimpleServer:单线程服务器端,使用标准的阻塞式I/O
  • TThreadPoolServer:多线程服务器端,使用标准的阻塞式I/O
  • TNonblockingServer:单线程服务器端,使用非阻塞式I/O
  • THsHaServer:半同步半异步服务器端,基于非阻塞式IO读写和多线程工作任务处理
  • TThreadedSelectorServer:多线程选择器服务器端,对THsHaServer在异步IO模型上进行增强

单线程阻塞serverTSimpleServer

单线程服务器端,使用标准的阻塞式I/O,使用方法如下:

TServerTransport transport = new TServerSocket(port);
TServer server = new TSimpleServer(new TServer.Args(transport).processor(processor));

在上一篇文章中,我们就是以该server类型为实例进行分析的,这里就不再分析了。

我们来总结下这种IO模型的优缺点:

  • 优点:实现简单
  • 缺点:
    • 单线程,无法发挥服务器多核优势
    • 由于使用的是阻塞IO,一个连接占用一个线程,因此只能处理一个连接,多于1个连接的情况下,其余的连接会等待

从上面的优缺点来看,单线程阻塞模型除了实现简单外,其他的都是缺点,在生产中一般不会用。

多线程阻塞serverTThreadPoolServer

多线程服务器端,使用标准的阻塞式I/O,使用方法如下:

// 生成 TServerSocket
TServerTransport transport = new TServerSocket(port);
// 生成 TServer
TServer server = new TThreadPoolServer(
  new TThreadPoolServer.Args(transport).processor(processor));
server.serve();

在使用TThreadPoolServer时 ,需要传入Socket,这里使用的是TServerSocket,这里我们分别来分析这两个类。

TServerSocket

TServerSocket 的创建过程如下:

public class TServerSocket extends TServerTransport {

  /**
   * 构造方法,传入 port
   */

  public TServerSocket(int port) throws TTransportException {
    this(port, 0);
  }

  /**
   * 构造方法,传入 bindAddr 与 port
   */

  public TServerSocket(InetSocketAddress bindAddr, int clientTimeout) 
        throws TTransportException 
{
    this(new ServerSocketTransportArgs().bindAddr(bindAddr).clientTimeout(clientTimeout));
  }

  /**
   * 构造方法
   * args 里封装的就是 port、clientTimeout 
   */

  public TServerSocket(ServerSocketTransportArgs args) 
        throws TTransportException 
{
    clientTimeout_ = args.clientTimeout;
    if (args.serverSocket != null) {
      this.serverSocket_ = args.serverSocket;
      return;
    }
    try {
      // ServerSocket:阻塞 io
      serverSocket_ = new ServerSocket();
      serverSocket_.setReuseAddress(true);
      // 绑定地址与端口
      serverSocket_.bind(args.bindAddr, args.backlog);
    } catch (IOException ioe) {
      close();
      throw new TTransportException("Could not create ServerSocket on address " 
            + args.bindAddr.toString() + ".", ioe);
    }
  }

从代码上来看,TServerSocket 中使用的是 ServerSocket,这是个阻塞IO。阻塞IO中,服务端使用的是ServerSocket,客户端使用的Socket,两者一般配套使用,即服务端使用ServerSocket开启端口监听,客户端使用的Socket创建socket连接。

TThreadPoolServer

TThreadPoolServer 的创建过程如下:

public class TThreadPoolServer extends TServer {
  private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class);

  public static class Args extends AbstractServerArgs<Args{
    public int minWorkerThreads = 5;
    public int maxWorkerThreads = Integer.MAX_VALUE;
    public ExecutorService executorService;
    ...
  }

  public void serve() {
    if (!preServe()) {
      return;
    }
    // 执行操作
    execute();
    ...
  }

  /**
   * 具体的执行操作,在这里会打开连接
   */

  protected void execute() {
    while (!stopped_) {
      try {
        // accept() 会阻塞,等待连接的到来
        TTransport client = serverTransport_.accept();
        try {
          // 连接来了,就丢到线程池中执行
          executorService_.execute(new WorkerProcess(client));
        } catch (RejectedExecutionException ree) {
          if (!stopped_) {
            LOGGER.warn("...");
          }
          client.close();
        }
      } catch (TTransportException ttx) {
        if (!stopped_) {
          LOGGER.warn("Transport error occurred during acceptance of message", ttx);
        }
      }
    }
  }
}

与单线程阻塞server不同的是,TThreadPoolServer会持有一个工作线程池,当连接创建后,就会把连接交给线程池处理。

处理流程示意图如下:

这里的handler就是线程池中的一个个线程。

这部分的最后,我们也来总结下这种模型的优缺点:

  • 优点:相比于单线程处理,使用多线程后,大大提高了连接处理数
  • 缺点:由于阻塞IO一个连接占用一个线程,因此连接数受限于线程池的数量,无法处理海量连接

虽然该模型无法处理海量连接,但在没有NIO的年代,这种模型是最主要的处理网络请求的方式,不过现在基本用NIO了。

单线程非阻塞serverTNonblockingServer

上面介绍的两种网络模型都是基于阻塞IO的,接下来我们介绍非阻塞IO。

最简单的非阻塞IO模型:单线程非阻塞,使用方式如下:

// 指定socket:这里使用的就是非阻塞socket了
TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
// 创建服务端
TServer server = new TNonblockingServer(
  new TNonblockingServer.Args(transport).processor(processor));
server.serve();

以上代码中,使用到了两个类:

  • TNonblockingServerSocket:指定socket类型
  • TNonblockingServer:指定server类型

接下来我们就来分析这两个类的具体实现。

TNonblockingServerSocket

TNonblockingServerSocket的构造方法如下:

TNonblockingServerSocket(NonblockingAbstractServerSocketArgs args) 
        throws TTransportException {
    clientTimeout_ = args.clientTimeout;
    try {
      serverSocketChannel = ServerSocketChannel.open();
      // 配置为非阻塞
      serverSocketChannel.configureBlocking(false);

      // serverSocketChannel:非阻塞io
      serverSocket_ = serverSocketChannel.socket();
      serverSocket_.setReuseAddress(true);
      // 绑定地址与端口
      serverSocket_.bind(args.bindAddr, args.backlog);
    } catch (IOException ioe) {
      serverSocket_ = null;
      throw new TTransportException("Could not create ServerSocket on address " 
            + args.bindAddr.toString() + ".", ioe);
    }
}

从上述代码中可以看到,在TNonblockingServerSocket中使用的是ServerSocketChannel,这是非阻塞IO提供的类,作用相当于阻塞IO中的ServerSocket.

TNonblockingServer.Args

TNonblockingServer的构造方法里需要传入AbstractNonblockingServerArgs参数,进入TNonblockingServer.Args,它现它仅是继承了AbstractNonblockingServerArgs

public static class Args extends AbstractNonblockingServerArgs<Args{
  public Args(TNonblockingServerTransport transport) {
    super(transport);
  }
}

继续进入AbstractNonblockingServerArgs

public static abstract class AbstractNonblockingServerArgs
    <T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T
{
  public long maxReadBufferBytes = 256 * 1024 * 1024;

  public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
    super(transport);
    // 设置 transportFactory
    transportFactory(new TFramedTransport.Factory());
  }
}

在这个方法里,调用transportFactory(...)设置了transport,这里使用的transpostTFramedTransport,这点有别于阻塞IO.

TNonblockingServer

接下来我们来看看TNonblockingServer,构造方法如下:

public class TNonblockingServer extends AbstractNonblockingServer {

  ...

  private SelectAcceptThread selectAcceptThread_;

  /**
   * 构造方法
   */

  public TNonblockingServer(AbstractNonblockingServerArgs args) {
    super(args);
  }
  ...
}

只是调用了父类的构造方法,追踪下去后,发现所做的工作也很简单,只是把传入的TNonblockingServerSocket做了一个赋值操作,这里就不多分析了。

接下来我们来看看TNonblockingServerserve()方法,进入AbstractNonblockingServer#serve

public void serve() {
    // 启动处理线程
    if (!startThreads()) {
      return;
    }

    // 启动监听,其实只是设置了`soTimeout`属性值
    if (!startListening()) {
      return;
    }
  // 设置服务标识
    setServing(true);

    // 上面启动的处理线程join到主线程中,就只是执行了 thread.join() 方法
    // 当前线程会在这里阻塞
    waitForShutdown();
  
    // 设置服务标识
    setServing(false);

    // 关闭监听,执行的是 serverTransport.close()操作,即关闭serverSocketChannel
    stopListening();
}

AbstractNonblockingServer#serve就是整个连接处理流程了,关键部分已作了详细的注释。

从代码上来看,整个过程共两个线程:

  • 主线程:启动连接服务后,会阻塞直到关闭
  • 连接处理线程:处理连接请求

这里我们主要关注连接处理线程,启动该线程的方法为TNonblockingServer#startThreads,代码如下:

protected boolean startThreads() {
    // start the selector
    try {
      selectAcceptThread_ = new SelectAcceptThread(
        (TNonblockingServerTransport)serverTransport_);
      // 启动线程
      selectAcceptThread_.start();
      return true;
    } catch (IOException e) {
      LOGGER.error("Failed to start selector thread!", e);
      return false;
    }
}

这里又出现了一个类:SelectAcceptThread,从代码的执行来看,它显然是ThreadRunnable的实现,我们进入它的构造方法:

public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
        throws IOException 
{
    this.serverTransport = serverTransport;
    // 注册操作selector
    serverTransport.registerSelector(selector);
}

以上代码的关键在于注册selector的操作,我们进入TNonblockingServerSocket#registerSelector一探究竟:

  public void registerSelector(Selector selector) {
    try {
      // 将 selector 注册到 serverSocketChannel,监听的事件是连接事件(OP_ACCEPT)
      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    } catch (ClosedChannelException e) {
      // this shouldn't happen, ideally...
      // TODO: decide what to do with this.
    }
  }

其实这就是往serverSocketChannel中注册selector,关心的事件是连接事件(OP_ACCEPT),也就是说, 当新连接过来时,该selector会被唤醒,这一块是NIO网络编程的老套路了,这里就不多做解释了。

上述出现了selector,那么这个selector是从何而来呢?

经过本人的苦苦寻觅,在SelectAcceptThread的父类AbstractSelectThread中找到了selector的来源(AbstractNonblockingServer.AbstractSelectThread):

  protected abstract class AbstractSelectThread extends Thread {
    protected Selector selector;

    // List of FrameBuffers that want to change their selection interests.
    protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();

    public AbstractSelectThread() throws IOException {
      // 打开一个 selector 对象
      this.selector = SelectorProvider.provider().openSelector();
    }

    ...
  }

selector 是由 SelectorProvider.provider().openSelector()获取的,这也是jdk NIO网络编程提供的功能,这里就不多分析了。值得一提的是,selectorSelectAcceptThread的成员变量,即selector只为当前线程持有。

接着我们把目光聚集于SelectAcceptThread对象,进入其run()方法,查看该线程所做的工作:

// TNonblockingServer.SelectAcceptThread#run
public void run() {
  try {
    // 处理 eventHandler
    if (eventHandler_ != null) {
      eventHandler_.preServe();
    }

    while (!stopped_) {
      // select 操作
      select();
      // 处理事件变更
      processInterestChanges();
    }
    for (SelectionKey selectionKey : selector.keys()) {
      // 清除 selectionKey
      cleanupSelectionKey(selectionKey);
    }
  } catch (Throwable t) {
    LOGGER.error("run() exiting due to uncaught error", t);
  } finally {
    try {
      selector.close();
    } catch (IOException e) {
      LOGGER.error("Got an IOException while closing selector!", e);
    }
    stopped_ = true;
  }
}

TNonblockingServer.SelectAcceptThread#run方法的重点地方都已做了注释,这里我们仅关注主要操作,进 入 TNonblockingServer.SelectAcceptThread#select方法:

private void select() {
  try {
    // select 操作,这里会阻塞
    selector.select();

    // 获取 selectionKey
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // 跳过无效的key
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }

      // 处理具体的key,如:连接事件、读事件、写事件
      if (key.isAcceptable()) {
        handleAccept();
      } else if (key.isReadable()) {
        handleRead(key);
      } else if (key.isWritable()) {
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }

我们进入SelectAcceptThread#handleAccept方法,来看看连接事件是如何处理的:

private void handleAccept() throws IOException {
  SelectionKey clientKey = null;
  TNonblockingTransport client = null;
  try {
    // 得到连接
    client = serverTransport.accept();
    // 得到连接后,再将该连接注册到 selector 上,这次监听的事件是 读事件(OP_READ)
    clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
    // 得到 frameBuffer
    FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);
    clientKey.attach(frameBuffer);
  } catch (TTransportException tte) {
    ...
  }
}

从以上代码来看,连接处理步骤如下:

  • 获取连接
  • 将连接注册到selector,监听的事件是读事件(OP_READ)
  • 获取连接数据 frameBuffer,并放入到SelectionKey

这几个操作也是标准的NIO操作了,注册到selector后,接下来就是处理selector的读事件了,我们进入方法 AbstractNonblockingServer.AbstractSelectThread#handleRead

protected void handleRead(SelectionKey key) {
  // 获取 SelectionKey 的数据
  FrameBuffer buffer = (FrameBuffer) key.attachment();
  // 进行读数据操作
  if (!buffer.read()) {
    cleanupSelectionKey(key);
    return;
  }

  // 数据已读完,调用处理方法
  if (buffer.isFrameFullyRead()) {
    // 调用处理方法
    if (!requestInvoke(buffer)) {
      cleanupSelectionKey(key);
    }
  }
}

最终调用AbstractNonblockingServer.FrameBuffer#invoke方法来处理读事件:

    public void invoke() {
      frameTrans_.reset(buffer_.array());
      response_.reset();

      try {
        // 执行 eventHandler 的方法
        if (eventHandler_ != null) {
          eventHandler_.processContext(context_, inTrans_, outTrans_);
        }
        // 执行 Processor,这里最终会调用具体的服务端实现
        processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
        responseReady();
        return;
      } catch (TException te) {
        LOGGER.warn("Exception while invoking!", te);
      } catch (Throwable t) {
        LOGGER.error("Unexpected throwable while invoking!", t);
      }
      // This will only be reached when there is a throwable.
      state_ = FrameBufferState.AWAITING_CLOSE;
      requestSelectInterestChange();
    }

AbstractNonblockingServer.FrameBuffer#invoke方法就是实际的处理类了,processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_)最终执行的就是我们自己实现类的方法,如HelloServiceImpl#hello

处理流程示意图如下:

与阻塞IO相比,使用了selector后,线程大部分时间是处于select状态,只有在selector上发生读/写/连接事件时,线程才会处理业务操作。

尽管SelectAcceptThread并不会被某一个连接独占,但处理读/写/连接事件时,只有一个线程在进行,如果线程正好在处理连接A的读事件,此时连接B再发生了读事件,那么线程就无法立即处理连接B的读事件了,只有处理完连接A的读事件后线程才会继续处理连接B的读事件。

由此可见,非阻塞IO下虽然可以同时处理多个连接请求,但由于使用的是单线程,请求处理时可能并不会很及时。

本节的最后,我们来总结下TNonblockingServer的特点:

  • 「主线程」启动服务并阻塞,等待服务停止
  • SelectAcceptThread线程」调用selector.select()获取读/写/连接事件并处理,处理时有可能会阻塞select操作

半同步半异步serverTHsHaServer

上一节我们分析到,在单线程非阻塞模型下,由于使用的是单线程,当线程正在处理业务操作时,其他请求处理时可能并不会很及时。为了提高响应速度,thrift在单线程非阻塞模型上引入了线程池操作,这就是THsHaServer,处理流程如下:

从整个操作来看,就是把TNonblockingServerSelectAcceptThread放在了线程池中执行,接下来我们就来具体分析这块的操作。

THsHaServer 的使用方式如下:

// 创建 ServerSocket,这块与 TNonblockingServer 一致
TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
// 创建 THsHaServer 
TServer server = new THsHaServer(new THsHaServer.Args(transport).processor(processor));
server.serve();

这部分的操作与TNonblockingServer很相似:

  • 创建了TNonblockingServerSocket对象
  • 创建THsHaServer 对象
  • 调用THsHaServer#serve方法提供对外服务

关于TNonblockingServerSocket对象的操作在上一节已经分析过了,这里我们直接分析THsHaServer

THsHaServer 构造方法

THsHaServer 构造方法如下:

/**
 * 继承了 TNonblockingServer
 */

public class THsHaServer extends TNonblockingServer {

  ...

  /**
   * 构造方法
   */

  public THsHaServer(Args args) {
    super(args);

    // 创建线程池
    invoker = args.executorService == null 
      ? createInvokerPool(args) : args.executorService;
    this.args = args;
  }


}

从代码上来看,THsHaServer继承了TNonblockingServer,这就继承了TNonblockingServer中的大多数特性。在其构造方法中,先是调用了父类的构造方法,然后创建了一个线程池,这里我们来看看线程池的创建操作:

  // 线程池  
  private ExecutorService executorService = null;

  // 创建线程池
  protected static ExecutorService createInvokerPool(Args options) {
    // 获取参数
    int minWorkerThreads = options.minWorkerThreads;
    int maxWorkerThreads = options.maxWorkerThreads;
    int stopTimeoutVal = options.stopTimeoutVal;
    TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;

    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    // 使用的是 ThreadPoolExecutor,jdk提供的线程池
    ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
      maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);

    return invoker;
  }

THsHaServer中使用的线程池是通过ThreadPoolExecutor直接创建的。

THsHaServer#serve方法

THsHaServer并没有实现serve()方法,它的serve()方法继承自TNonblockingServer,这块在上一节已经分析过了,这里就不再分析了。

requestInvoke(...)方法的调用

在分析TNonblockingServer时,我们看到请求是在TNonblockingServer#requestInvoke方法中处理的,THsHaServer对该方法进行了重写,代码如下:

  protected boolean requestInvoke(FrameBuffer frameBuffer) {
    try {
      // 获取 invocation
      Runnable invocation = getRunnable(frameBuffer);
      // 放入线程池中执行
      invoker.execute(invocation);
      return true;
    } catch (RejectedExecutionException rx) {
      LOGGER.warn("ExecutorService rejected execution!", rx);
      return false;
    }
  }

  // 获取 Invocation
  protected Runnable getRunnable(FrameBuffer frameBuffer){
    return new Invocation(frameBuffer);
  }

  ...

}

这一步的操作很简单,就是通过frameBuffer获取Runnable实例,然后把Runnable实例放入线程池中执行,这里我们重点来看Invocation实例是个啥,进入Invocation

class Invocation implements Runnable {
  private final FrameBuffer frameBuffer;

  public Invocation(final FrameBuffer frameBuffer) {
    this.frameBuffer = frameBuffer;
  }

  /**
   * 线程池执行的内容
   */

  public void run() {
    frameBuffer.invoke();
  }
}

从 代码来看,requestInvoke(...)方法所做的主要工作是把 FrameBuffer#invoke 方法的调用放到了线程里。

本节最后,我们来总结下THsHaServer的特点:

  • 「主线程」启动服务并阻塞,等待服务停止
  • SelectAcceptThread线程」调用selector.select()获取读/写/连接事件
  • 将上一步获取到的事件放入「线程池」中处理

TNonblockingServer相比,引入线程池后,业务操作可以放到线程池中执行,SelectAcceptThread线程可安心地进行select,不必担心被耗时的业务处理操作阻塞了。

多路复用 serverTThreadedSelectorServer

在多数场景下,THsHaServer就可应对了,不过由于SelectAcceptThread线程既处理连接事件,又处理读写事件,在高并发场景下,读写事件会相当频繁(一个连接只会触发一次连接事件,但会触发多次读写事件),因此被频繁触发的读写事件极有可能会成为SelectAcceptThread的瓶颈,导致SelectAcceptThread无法及时获取到读写请求。

为解决读写频繁导致SelectAcceptThread的瓶颈问题,聪明的开发者们也想用多线程来处理读写事件,即读写事件分散给多个线程处理,这就有了TThreadedSelectorServer

TThreadedSelectorServer 的处理流程如下:

从图中来看,整个TThreadedSelectorServer包含两个selector

  • selector:处理连接事件,单线程,当触发连接事件后,该channel会注册到从selector
  • selector:处理读写事件,多线程,每个线程都有一个selector对象,当channel注册到该selector后,之后channel上的所有读写事件就都由该selector对应的线程处理了

TThreadedSelectorServer 的使用方式如下:

// 依然是 TNonblockingServerSocket
TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
// 创建 TThreadedSelectorServer
TServer server = new TThreadedSelectorServer(
    new TThreadedSelectorServer.Args(transport).processor(processor));
// 启动服务
server.serve();

由于TNonblockingServerSocket前面已经分析过了,这里我们的的重点还是TThreadedSelectorServer,进入构造方法:

/**
 * AbstractNonblockingServer 的子类
 */

public class TThreadedSelectorServer extends AbstractNonblockingServer {
  ...

  public TThreadedSelectorServer(Args args) {
    super(args);
    args.validate();
    // 创建线程池
    invoker = args.executorService == null 
      ? createDefaultExecutor(args) : args.executorService;
    this.args = args;
  }

  /**
   * 创建线程池
   */

  protected static ExecutorService createDefaultExecutor(Args options) {
    // 默认使用 newFixedThreadPool 创建
    return (options.workerThreads > 0
      ? Executors.newFixedThreadPool(options.workerThreads) : null;
  }

  ...

TThreadedSelectorServer的构造方法还是比较简单的,做了一些赋值操作,然后创建了一个线程池,这里就不多分析了。

接下来我们来看看serve()方法,进入AbstractNonblockingServer#serve

public void serve() {
    // 启动处理线程
    if (!startThreads()) {
      return;
    }

    // 启动监听,其实只是设置了`soTimeout`属性值
    if (!startListening()) {
      return;
    }
  // 设置服务标识
    setServing(true);

    // 上面启动的处理线程join到主线程中,就只是执行了 thread.join() 方法
    // 当前线程会在这里阻塞
    waitForShutdown();
  
    // 设置服务标识
    setServing(false);

    // 关闭监听,执行的是 serverTransport.close()操作,即关闭serverSocketChannel
    stopListening();
}

实际上,这个方法我们在分析TNonblockingServer时已经分析过了,TNonblockingServer实现的也是AbstractNonblockingServer类,因此调用TNonblockingServer#serve方法时,也是调用到这个方法的。

不过,尽管AbstractNonblockingServer#serve的内容一致,但具体的实现却大不相同,这里我们主要关注startThreads()方法,来看看它的实现。startThreads()方法的实现为TThreadedSelectorServer#startThreads,我们跟进去:

  protected boolean startThreads() {
    try {
      for (int i = 0; i < args.selectorThreads; ++i) {
        // selector 线程
        selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
      }
      acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
        createSelectorThreadLoadBalancer(selectorThreads));
      // 启动一个个 selector 线程
      for (SelectorThread thread : selectorThreads) {
        thread.start();
      }
      // 启动 acceptThread 线程
      acceptThread.start();
      return true;
    } catch (IOException e) {
      LOGGER.error("Failed to start threads!", e);
      return false;
    }
  }

这一步启动了两个线程:

  • acceptThread线程:处理连接事件
  • selectorThreads线程组:处理读写事件

下面我们就来分析这两个线程是如何进行合作的。

acceptThread线程

AcceptThread的构造方法如下:

public AcceptThread(TNonblockingServerTransport serverTransport,
    SelectorThreadLoadBalancer threadChooser)
 throws IOException 
{
  this.serverTransport = serverTransport;
  // 线程选择器,用来处理如何从众多的selectorThreads中选择一个线程
  this.threadChooser = threadChooser;
  // 创建一个 accept selector
  this.acceptSelector = SelectorProvider.provider().openSelector();
  // 注册到 serverTransport,进入该方法会发现该 Selector 只关注 SelectionKey.OP_ACCEPT 事件
  this.serverTransport.registerSelector(acceptSelector);
}

再来看看AcceptThread线程的执行内容,进入TThreadedSelectorServer.AcceptThread#run方法:

public void run() {
  try {
    // 执行 eventHandler
    if (eventHandler_ != null) {
      eventHandler_.preServe();
    }
    // 不断地循环调用 select
    while (!stopped_) {
      select();
    }
  } catch (Throwable t) {
    LOGGER.error("run() on AcceptThread exiting due to uncaught error", t);
  } finally {
    ...
  }
}

继续进入TThreadedSelectorServer.AcceptThread#select方法,看看具体的select操作:

private void select() {
  try {
    // 进行select操作,这里会阻塞,直到有连接请求的到来
    acceptSelector.select();

    // 由于该 selector 只关注连接事件,执行到这 里就表示获取到了连接
    Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // 跳过无效的连接
      if (!key.isValid()) {
        continue;
      }

      if (key.isAcceptable()) {
        // 连接处理操作
        handleAccept();
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

继续,我们再来看看连接处理操作,进入TThreadedSelectorServer.AcceptThread#handleAccept方法:

private void handleAccept() {
  final TNonblockingTransport client = doAccept();
  if (client != null) {
    // 为当前连接事件选择一个线程
    final SelectorThread targetThread = threadChooser.nextThread();

    if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
      // 直接在当前线程中执行
      doAddAccept(targetThread, client);
    } else {
      // 放在线程池中执行
      try {
        invoker.submit(new Runnable() {
          public void run() {
            doAddAccept(targetThread, client);
          }
        });
      } catch (RejectedExecutionException rx) {
        LOGGER.warn("ExecutorService rejected accept registration!", rx);
        // close immediately
        client.close();
      }
    }
  }
}

继续探索doAddAccept(...)的执行操作,进入TThreadedSelectorServer.AcceptThread#doAddAccept方法:

private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
  // 在if条件中调用SelectorThread#addAcceptedConnection方法
  if (!thread.addAcceptedConnection(client)) {
    client.close();
  }
}

继续进入TThreadedSelectorServer.SelectorThread#addAcceptedConnection

public boolean addAcceptedConnection(TNonblockingTransport accepted) {
  try {
    // 添加到队列中
    acceptedQueue.put(accepted);
  } catch (InterruptedException e) {
    LOGGER.warn("Interrupted while adding accepted connection!", e);
    return false;
  }
  // 将 selector 从 select() 方法中唤醒
  selector.wakeup();
  return true;
}

到这里,AcceptThread的操作就完成了。

selectorThreads线程组

连接请求放入到acceptedQueue之后,后续该如何处理呢?这就是SelectorThread的工作了,SelectorThread的构造方法如下:

public SelectorThread() throws IOException {
  this(new LinkedBlockingQueue<TNonblockingTransport>());
}

/**
 * 有参构造
 */

public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) 
    throws IOException 
{
  this.acceptedQueue = acceptedQueue;
}

构造方法中仅是做了一个参数赋值,就不多分析了,我们重点关注SelectorThread#run方法,看看线程的执行内容:

public void run() {
  try {
    while (!stopped_) {
      // 处理读写事件
      select();
      // 处理连接事件
      processAcceptedConnections();
      // 处理key
      processInterestChanges();
    }
    for (SelectionKey selectionKey : selector.keys()) {
      cleanupSelectionKey(selectionKey);
    }
  } catch (Throwable t) {
    LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
  } finally {
    ...
  }
}

这里我们重点关注读写事件与连接事件的处理。

先看连接事件的处理,处理方法为TThreadedSelectorServer.SelectorThread#processAcceptedConnections,代码如下:

private void processAcceptedConnections() {
  // 循环操作
  while (!stopped_) {
    // 从acceptedQueue获取连接请求
    TNonblockingTransport accepted = acceptedQueue.poll();
    // 未获取到,取出循环
    if (accepted == null) {
      break;
    }
    // 注册连接请求
    registerAccepted(accepted);
  }
}

还记得在 acceptThread 最终会向acceptedQueue添加连接请求吗,那个连接请求就是在这里拿出来的,获取到连接请求后,接着就调用TThreadedSelectorServer.SelectorThread#registerAccepted进行注册操作了:

private void registerAccepted(TNonblockingTransport accepted) {
  SelectionKey clientKey = null;
  try {
    // 注册到 selector,关注的事件是 OP_READ(读事件)
    clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
    // 获取请求数据
    FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this);
    clientKey.attach(frameBuffer);
  } catch (IOException | TTransportException e) {
    LOGGER.warn("Failed to register accepted connection to selector!", e);
    if (clientKey != null) {
      cleanupSelectionKey(clientKey);
    }
    accepted.close();
  }
}

到这里,就把连接请求注册到SelectorThread的成员变量selector上了,之后读写请求就由该selector来处理了。

接下来我们再来看看读写请求的select操作,进入TThreadedSelectorServer.SelectorThread#select方法:

private void select() {
  try {
    // 处理select操作
    doSelect();
    // 如果获取到了 selectedKeys,即有新的读写请求
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // 跳过无效的请求
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }
      // 处理读写操作,调用的是 AbstractNonblockingServer 的方法
      if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

这个方法的关键操作分为两步:

  1. 进行select操作,获取该selector上发生的读写事件
  2. 处理读写事件

处理读写事件的操作调用的是AbstractNonblockingServer方法,前面已经分析过 了,这里我们重点关注select操作,进入 TThreadedSelectorServer.SelectorThread#doSelect方法:

private void doSelect() throws IOException {
  long beforeSelect = System.currentTimeMillis();
  // select 操作
  int selectedNums = selector.select();
  long afterSelect = System.currentTimeMillis();
  
  // 可能出现了jvm bug:没有获取到内容,但阻塞被唤醒了
  if (selectedNums == 0) {
    jvmBug++;
  } else {
    jvmBug = 0;
  }

  // 也有可能是调用 selector.weakup() 唤醒的,这里就进行次数重置
  long selectedTime = afterSelect - beforeSelect;
  // MONITOR_PERIOD = 1000L;
  if (selectedTime >= MONITOR_PERIOD) {
    jvmBug = 0;
  // SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
  } else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) {
    LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.",
      MONITOR_PERIOD, jvmBug);
    // 重建 selector
    rebuildSelector();
    selector.selectNow();
    jvmBug = 0;
  }

}

上面的方法中,抛出jvm bug的处理,就只有一行关键代码:

selector.select();

这是jdk提供的方法,就是获取当前selector上触发的读写事件,这里就不多分析了。

这里我们重点看看这里的jvm bug是个啥。实际上,这个jvm bug就是大名鼎鼎(或臭名昭著)的selector空轮询bug,关于该bug,不了解的小伙伴可以看看JDK Epoll空轮询bug一文,这里引用文章中的一张图:

总结下就是,select()没有获取到读写请求就立即返回了,由于while(true)的存在,就导致代码变成了如下操作:

while(true) {
  // 这里面的内容就像不存在一样

}

这样整个线程就在空转,白白消耗cpu资源,cpu使用量很快就到100%了。

那么thrift是如何解决这个问题的呢?

从代码来看,thrift处理方式如下:获取到的事件数量为0,且阻塞时间小于1s时,就表明发生了空转,当空转次数达到512次时,就重新构建selector

本节最后,我们来总结下TThreadedSelectorServer的特点:

  • 「主线程」启动服务并阻塞,等待服务停止
  • AcceptThread线程」调用selector.select()获取连接事件,并从SelectorThread线程组中选择一个SelectorThread线程,将该连接事件放入到SelectorThread对应的acceptedQueue
  • SelectorThread线程组」包含多个**SelectorThread线程88,在线程执行时,会从acceptedQueue连接事件,然后将该连接注册到其成员变量selector上,这样就完成连接与**SelectorThread**线程的绑定
  • SelectorThread线程」执行select操作,获取selector上发生的读写事件

总结

本文介绍了thrift的几种网络模型,总结如下:

网络模型说明
TSimpleServer单线程阻塞IO单线程,使用的是阻塞IO,只能同时维持一个连接
TThreadPoolServer多线程阻塞IO基于线程池的阻塞IO,能同时维持的连接数取决于线程池大小
TNonblockingServer单线程单selector单线程,使用的是非阻塞IO,能同时维持多个连接,但只能同时处理一个连接上的读写请求
THsHaServer多线程单selectorTNonblockingServer的多线程版本,能同时维持多个连接,单线程处理连接事件,线程池处理读写事件,可同时处理多个连接上的读写请求,数量由线程池大小决定
TThreadedSelectorServer多线程主从selector单线程的主selector用于处理连接事件,多线程的从selector用于处理读写请求事件,可同时处理多个连接上的读写请求,可处理海量连接服务

关于thrift的网络模型就介绍到这里了。


限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。

本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!


浏览 27
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报