03. Apache thrift 之网络模型
本文我们来分析thrift的网络传输模式。
客户端
Thrift 客户端常用的传输层有以下几种:
TSocket:使用阻塞式I/O进行传输,是最常见的模式TNonblockingSocket:使用非阻塞方式,用于构建异步客户端TFramedTransport:使用非阻塞方式,按块的大小进行传输,类似于Java中的NIO
接下来我们从源码的角度来分析这几种传输层的实现。
阻塞式IO:TSocket
TSocket使用了阻塞式I/O进行传输,代码实现如下:
public class TSocket extends TIOStreamTransport {
private Socket socket_;
/**
* 构造方法
*/
public TSocket(String host, int port) throws TTransportException {
this(new TConfiguration(), host, port, 0);
}
/**
* 构造方法
*/
public TSocket(TConfiguration config, String host, int port, int socketTimeout,
int connectTimeout) throws TTransportException {
super(config);
host_ = host;
port_ = port;
socketTimeout_ = socketTimeout;
connectTimeout_ = connectTimeout;
initSocket();
}
/**
* 初始化方法
*/
private void initSocket() {
// 创建 Socket 对象
socket_ = new Socket();
try {
// 对 socket 进行一些配置
socket_.setSoLinger(false, 0);
socket_.setTcpNoDelay(true);
socket_.setKeepAlive(true);
socket_.setSoTimeout(socketTimeout_);
} catch (SocketException sx) {
LOGGER.error("Could not configure socket.", sx);
}
}
/**
* 进行连接
*/
public void open() throws TTransportException {
...
try {
// 连接操作
socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_);
inputStream_ = new BufferedInputStream(socket_.getInputStream());
outputStream_ = new BufferedOutputStream(socket_.getOutputStream());
} catch (IOException iox) {
close();
throw new TTransportException(TTransportException.NOT_OPEN, iox);
}
}
...
}
这个类中使用的是标准的BIO操作,使用的是jdk提供的Socket类,连接服务端时使用的是java.net.Socket#connect(java.net.SocketAddress, int)方法。
非阻塞式IO:TNonblockingSocket
TNonblockingSocket 使用的是非阻塞IO方式,代码如下:
public class TNonblockingSocket extends TNonblockingTransport {
private final SocketAddress socketAddress_;
private final SocketChannel socketChannel_;
/**
* 构造方法,打开 SocketChannel
*/
public TNonblockingSocket(String host, int port, int timeout)
throws IOException, TTransportException {
// SocketChannel.open():开启 nio 连接
this(SocketChannel.open(), timeout, new InetSocketAddress(host, port));
}
/**
* 构造方法,处理 SocketChannel 的配置
*/
private TNonblockingSocket(TConfiguration config, SocketChannel socketChannel,
int timeout, SocketAddress socketAddress) throws IOException, TTransportException {
super(config);
socketChannel_ = socketChannel;
socketAddress_ = socketAddress;
// 配置为非阻塞
socketChannel.configureBlocking(false);
// 获取 socket,配置一些参数
Socket socket = socketChannel.socket();
socket.setSoLinger(false, 0);
socket.setTcpNoDelay(true);
socket.setKeepAlive(true);
setTimeout(timeout);
}
/**
* 开启,非阻塞io没有实现
*/
public void open() throws TTransportException {
throw new RuntimeException("open() is not implemented for TNonblockingSocket");
}
/**
* 进行连接
*/
public boolean startConnect() throws IOException {
return socketChannel_.connect(socketAddress_);
}
...
}
这个类中使用的是标准的NIO操作,使用的是jdk提供的SocketChannel类,开启SocketChannel使用的是SocketChannel.open()方法,连接服务端使用的是java.nio.channels.SocketChannel#connect方法。
缓冲TTransport:TFramedTransport
TFramedTransport 的注释如下:
TFramedTransport is a buffered TTransport that ensures a fully read message every time by preceding messages with a 4-byte frame size.
TFramedTransport 是一个缓冲的 TTransport,通过在前面带有4字节帧大小的消息来确保每次完全读取消息。
注释表明,它是一 个缓冲的TTransport,我们再来看下它的代码实现:
public class TFramedTransport extends TLayeredTransport {
...
/**
* 构造方法
* 需要传入 TTransport
*/
public TFramedTransport(TTransport transport) throws TTransportException {
this(transport, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
}
/**
* 构造方法
* 需要传入 transport 与 maxLength
*/
public TFramedTransport(TTransport transport, int maxLength) throws TTransportException {
super(transport);
TConfiguration _configuration = Objects.isNull(transport.getConfiguration())
? new TConfiguration() : transport.getConfiguration();
// 配置最大长度
_configuration.setMaxFrameSize(maxLength);
writeBuffer_.write(sizeFiller_, 0, 4);
readBuffer_= new TMemoryInputTransport(_configuration, new byte[0]);
}
public void open() throws TTransportException {
// getInnerTransport() 得到的就是传入的 transport
getInnerTransport().open();
}
...
}
从代码来看,TFramedTransport 就是个包装类,构建方法中需要传入具体的TTransport,具体的IO操作由传入的TTransport来实现,从注释来看,主要作用就是为TTransport提供缓冲功能,使用时也是配合其他TTransport使用,如配合TNonblockingSocket使用:
// io 操作由 TNonblockingSocket 实现
TNonblockingSocket tSocket = new TNonblockingSocket(host, port);
TTransport transport = new TFramedTransport(tSocket);
服务端
Thrift 服务端支持的 server 类型如下:
TSimpleServer:单线程服务器端,使用标准的阻塞式I/OTThreadPoolServer:多线程服务器端,使用标准的阻塞式I/OTNonblockingServer:单线程服务器端,使用非阻塞式I/OTHsHaServer:半同步半异步服务器端,基于非阻塞式IO读写和多线程工作任务处理TThreadedSelectorServer:多线程选择器服务器端,对THsHaServer在异步IO模型上进行增强
单线程阻塞server:TSimpleServer
单线程服务器端,使用标准的阻塞式I/O,使用方法如下:
TServerTransport transport = new TServerSocket(port);
TServer server = new TSimpleServer(new TServer.Args(transport).processor(processor));
在上一篇文章中,我们就是以该server类型为实例进行分析的,这里就不再分析了。
我们来总结下这种IO模型的优缺点:
优点:实现简单 缺点: 单线程,无法发挥服务器多核优势 由于使用的是阻塞IO,一个连接占用一个线程,因此只能处理一个连接,多于1个连接的情况下,其余的连接会等待
从上面的优缺点来看,单线程阻塞模型除了实现简单外,其他的都是缺点,在生产中一般不会用。
多线程阻塞server:TThreadPoolServer
多线程服务器端,使用标准的阻塞式I/O,使用方法如下:
// 生成 TServerSocket
TServerTransport transport = new TServerSocket(port);
// 生成 TServer
TServer server = new TThreadPoolServer(
new TThreadPoolServer.Args(transport).processor(processor));
server.serve();
在使用TThreadPoolServer时 ,需要传入Socket,这里使用的是TServerSocket,这里我们分别来分析这两个类。
TServerSocket
TServerSocket 的创建过程如下:
public class TServerSocket extends TServerTransport {
/**
* 构造方法,传入 port
*/
public TServerSocket(int port) throws TTransportException {
this(port, 0);
}
/**
* 构造方法,传入 bindAddr 与 port
*/
public TServerSocket(InetSocketAddress bindAddr, int clientTimeout)
throws TTransportException {
this(new ServerSocketTransportArgs().bindAddr(bindAddr).clientTimeout(clientTimeout));
}
/**
* 构造方法
* args 里封装的就是 port、clientTimeout
*/
public TServerSocket(ServerSocketTransportArgs args)
throws TTransportException {
clientTimeout_ = args.clientTimeout;
if (args.serverSocket != null) {
this.serverSocket_ = args.serverSocket;
return;
}
try {
// ServerSocket:阻塞 io
serverSocket_ = new ServerSocket();
serverSocket_.setReuseAddress(true);
// 绑定地址与端口
serverSocket_.bind(args.bindAddr, args.backlog);
} catch (IOException ioe) {
close();
throw new TTransportException("Could not create ServerSocket on address "
+ args.bindAddr.toString() + ".", ioe);
}
}
从代码上来看,TServerSocket 中使用的是 ServerSocket,这是个阻塞IO。阻塞IO中,服务端使用的是ServerSocket,客户端使用的Socket,两者一般配套使用,即服务端使用ServerSocket开启端口监听,客户端使用的Socket创建socket连接。
TThreadPoolServer
TThreadPoolServer 的创建过程如下:
public class TThreadPoolServer extends TServer {
private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class);
public static class Args extends AbstractServerArgs<Args> {
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
public ExecutorService executorService;
...
}
public void serve() {
if (!preServe()) {
return;
}
// 执行操作
execute();
...
}
/**
* 具体的执行操作,在这里会打开连接
*/
protected void execute() {
while (!stopped_) {
try {
// accept() 会阻塞,等待连接的到来
TTransport client = serverTransport_.accept();
try {
// 连接来了,就丢到线程池中执行
executorService_.execute(new WorkerProcess(client));
} catch (RejectedExecutionException ree) {
if (!stopped_) {
LOGGER.warn("...");
}
client.close();
}
} catch (TTransportException ttx) {
if (!stopped_) {
LOGGER.warn("Transport error occurred during acceptance of message", ttx);
}
}
}
}
}
与单线程阻塞server不同的是,TThreadPoolServer会持有一个工作线程池,当连接创建后,就会把连接交给线程池处理。
处理流程示意图如下:

这里的handler就是线程池中的一个个线程。
这部分的最后,我们也来总结下这种模型的优缺点:
优点:相比于单线程处理,使用多线程后,大大提高了连接处理数 缺点:由于阻塞IO一个连接占用一个线程,因此连接数受限于线程池的数量,无法处理海量连接
虽然该模型无法处理海量连接,但在没有NIO的年代,这种模型是最主要的处理网络请求的方式,不过现在基本用NIO了。
单线程非阻塞server:TNonblockingServer
上面介绍的两种网络模型都是基于阻塞IO的,接下来我们介绍非阻塞IO。
最简单的非阻塞IO模型:单线程非阻塞,使用方式如下:
// 指定socket:这里使用的就是非阻塞socket了
TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
// 创建服务端
TServer server = new TNonblockingServer(
new TNonblockingServer.Args(transport).processor(processor));
server.serve();
以上代码中,使用到了两个类:
TNonblockingServerSocket:指定socket类型 TNonblockingServer:指定server类型
接下来我们就来分析这两个类的具体实现。
TNonblockingServerSocket
TNonblockingServerSocket的构造方法如下:
TNonblockingServerSocket(NonblockingAbstractServerSocketArgs args)
throws TTransportException {
clientTimeout_ = args.clientTimeout;
try {
serverSocketChannel = ServerSocketChannel.open();
// 配置为非阻塞
serverSocketChannel.configureBlocking(false);
// serverSocketChannel:非阻塞io
serverSocket_ = serverSocketChannel.socket();
serverSocket_.setReuseAddress(true);
// 绑定地址与端口
serverSocket_.bind(args.bindAddr, args.backlog);
} catch (IOException ioe) {
serverSocket_ = null;
throw new TTransportException("Could not create ServerSocket on address "
+ args.bindAddr.toString() + ".", ioe);
}
}
从上述代码中可以看到,在TNonblockingServerSocket中使用的是ServerSocketChannel,这是非阻塞IO提供的类,作用相当于阻塞IO中的ServerSocket.
TNonblockingServer.Args
TNonblockingServer的构造方法里需要传入AbstractNonblockingServerArgs参数,进入TNonblockingServer.Args,它现它仅是继承了AbstractNonblockingServerArgs:
public static class Args extends AbstractNonblockingServerArgs<Args> {
public Args(TNonblockingServerTransport transport) {
super(transport);
}
}
继续进入AbstractNonblockingServerArgs:
public static abstract class AbstractNonblockingServerArgs
<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
public long maxReadBufferBytes = 256 * 1024 * 1024;
public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
super(transport);
// 设置 transportFactory
transportFactory(new TFramedTransport.Factory());
}
}
在这个方法里,调用transportFactory(...)设置了transport,这里使用的transpost是TFramedTransport,这点有别于阻塞IO.
TNonblockingServer
接下来我们来看看TNonblockingServer,构造方法如下:
public class TNonblockingServer extends AbstractNonblockingServer {
...
private SelectAcceptThread selectAcceptThread_;
/**
* 构造方法
*/
public TNonblockingServer(AbstractNonblockingServerArgs args) {
super(args);
}
...
}
只是调用了父类的构造方法,追踪下去后,发现所做的工作也很简单,只是把传入的TNonblockingServerSocket做了一个赋值操作,这里就不多分析了。
接下来我们来看看TNonblockingServer的serve()方法,进入AbstractNonblockingServer#serve:
public void serve() {
// 启动处理线程
if (!startThreads()) {
return;
}
// 启动监听,其实只是设置了`soTimeout`属性值
if (!startListening()) {
return;
}
// 设置服务标识
setServing(true);
// 上面启动的处理线程join到主线程中,就只是执行了 thread.join() 方法
// 当前线程会在这里阻塞
waitForShutdown();
// 设置服务标识
setServing(false);
// 关闭监听,执行的是 serverTransport.close()操作,即关闭serverSocketChannel
stopListening();
}
AbstractNonblockingServer#serve就是整个连接处理流程了,关键部分已作了详细的注释。
从代码上来看,整个过程共两个线程:
主线程:启动连接服务后,会阻塞直到关闭 连接处理线程:处理连接请求
这里我们主要关注连接处理线程,启动该线程的方法为TNonblockingServer#startThreads,代码如下:
protected boolean startThreads() {
// start the selector
try {
selectAcceptThread_ = new SelectAcceptThread(
(TNonblockingServerTransport)serverTransport_);
// 启动线程
selectAcceptThread_.start();
return true;
} catch (IOException e) {
LOGGER.error("Failed to start selector thread!", e);
return false;
}
}
这里又出现了一个类:SelectAcceptThread,从代码的执行来看,它显然是Thread或Runnable的实现,我们进入它的构造方法:
public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
throws IOException {
this.serverTransport = serverTransport;
// 注册操作selector
serverTransport.registerSelector(selector);
}
以上代码的关键在于注册selector的操作,我们进入TNonblockingServerSocket#registerSelector一探究竟:
public void registerSelector(Selector selector) {
try {
// 将 selector 注册到 serverSocketChannel,监听的事件是连接事件(OP_ACCEPT)
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (ClosedChannelException e) {
// this shouldn't happen, ideally...
// TODO: decide what to do with this.
}
}
其实这就是往serverSocketChannel中注册selector,关心的事件是连接事件(OP_ACCEPT),也就是说, 当新连接过来时,该selector会被唤醒,这一块是NIO网络编程的老套路了,这里就不多做解释了。
上述出现了selector,那么这个selector是从何而来呢?
经过本人的苦苦寻觅,在SelectAcceptThread的父类AbstractSelectThread中找到了selector的来源(AbstractNonblockingServer.AbstractSelectThread):
protected abstract class AbstractSelectThread extends Thread {
protected Selector selector;
// List of FrameBuffers that want to change their selection interests.
protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
public AbstractSelectThread() throws IOException {
// 打开一个 selector 对象
this.selector = SelectorProvider.provider().openSelector();
}
...
}
selector 是由 SelectorProvider.provider().openSelector()获取的,这也是jdk NIO网络编程提供的功能,这里就不多分析了。值得一提的是,selector是SelectAcceptThread的成员变量,即selector只为当前线程持有。
接着我们把目光聚集于SelectAcceptThread对象,进入其run()方法,查看该线程所做的工作:
// TNonblockingServer.SelectAcceptThread#run
public void run() {
try {
// 处理 eventHandler
if (eventHandler_ != null) {
eventHandler_.preServe();
}
while (!stopped_) {
// select 操作
select();
// 处理事件变更
processInterestChanges();
}
for (SelectionKey selectionKey : selector.keys()) {
// 清除 selectionKey
cleanupSelectionKey(selectionKey);
}
} catch (Throwable t) {
LOGGER.error("run() exiting due to uncaught error", t);
} finally {
try {
selector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing selector!", e);
}
stopped_ = true;
}
}
TNonblockingServer.SelectAcceptThread#run方法的重点地方都已做了注释,这里我们仅关注主要操作,进 入 TNonblockingServer.SelectAcceptThread#select方法:
private void select() {
try {
// select 操作,这里会阻塞
selector.select();
// 获取 selectionKey
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// 跳过无效的key
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// 处理具体的key,如:连接事件、读事件、写事件
if (key.isAcceptable()) {
handleAccept();
} else if (key.isReadable()) {
handleRead(key);
} else if (key.isWritable()) {
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
我们进入SelectAcceptThread#handleAccept方法,来看看连接事件是如何处理的:
private void handleAccept() throws IOException {
SelectionKey clientKey = null;
TNonblockingTransport client = null;
try {
// 得到连接
client = serverTransport.accept();
// 得到连接后,再将该连接注册到 selector 上,这次监听的事件是 读事件(OP_READ)
clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
// 得到 frameBuffer
FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);
clientKey.attach(frameBuffer);
} catch (TTransportException tte) {
...
}
}
从以上代码来看,连接处理步骤如下:
获取连接 将连接注册到 selector,监听的事件是读事件(OP_READ)获取连接数据 frameBuffer,并放入到SelectionKey中
这几个操作也是标准的NIO操作了,注册到selector后,接下来就是处理selector的读事件了,我们进入方法 AbstractNonblockingServer.AbstractSelectThread#handleRead:
protected void handleRead(SelectionKey key) {
// 获取 SelectionKey 的数据
FrameBuffer buffer = (FrameBuffer) key.attachment();
// 进行读数据操作
if (!buffer.read()) {
cleanupSelectionKey(key);
return;
}
// 数据已读完,调用处理方法
if (buffer.isFrameFullyRead()) {
// 调用处理方法
if (!requestInvoke(buffer)) {
cleanupSelectionKey(key);
}
}
}
最终调用AbstractNonblockingServer.FrameBuffer#invoke方法来处理读事件:
public void invoke() {
frameTrans_.reset(buffer_.array());
response_.reset();
try {
// 执行 eventHandler 的方法
if (eventHandler_ != null) {
eventHandler_.processContext(context_, inTrans_, outTrans_);
}
// 执行 Processor,这里最终会调用具体的服务端实现
processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
responseReady();
return;
} catch (TException te) {
LOGGER.warn("Exception while invoking!", te);
} catch (Throwable t) {
LOGGER.error("Unexpected throwable while invoking!", t);
}
// This will only be reached when there is a throwable.
state_ = FrameBufferState.AWAITING_CLOSE;
requestSelectInterestChange();
}
AbstractNonblockingServer.FrameBuffer#invoke方法就是实际的处理类了,processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_)最终执行的就是我们自己实现类的方法,如HelloServiceImpl#hello。
处理流程示意图如下:

与阻塞IO相比,使用了selector后,线程大部分时间是处于select状态,只有在selector上发生读/写/连接事件时,线程才会处理业务操作。
尽管SelectAcceptThread并不会被某一个连接独占,但处理读/写/连接事件时,只有一个线程在进行,如果线程正好在处理连接A的读事件,此时连接B再发生了读事件,那么线程就无法立即处理连接B的读事件了,只有处理完连接A的读事件后线程才会继续处理连接B的读事件。
由此可见,非阻塞IO下虽然可以同时处理多个连接请求,但由于使用的是单线程,请求处理时可能并不会很及时。
本节的最后,我们来总结下TNonblockingServer的特点:
「主线程」启动服务并阻塞,等待服务停止 「 SelectAcceptThread线程」调用selector.select()获取读/写/连接事件并处理,处理时有可能会阻塞select操作
半同步半异步server:THsHaServer
上一节我们分析到,在单线程非阻塞模型下,由于使用的是单线程,当线程正在处理业务操作时,其他请求处理时可能并不会很及时。为了提高响应速度,thrift在单线程非阻塞模型上引入了线程池操作,这就是THsHaServer,处理流程如下:

从整个操作来看,就是把TNonblockingServer的SelectAcceptThread放在了线程池中执行,接下来我们就来具体分析这块的操作。
THsHaServer 的使用方式如下:
// 创建 ServerSocket,这块与 TNonblockingServer 一致
TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
// 创建 THsHaServer
TServer server = new THsHaServer(new THsHaServer.Args(transport).processor(processor));
server.serve();
这部分的操作与TNonblockingServer很相似:
创建了 TNonblockingServerSocket对象创建 THsHaServer对象调用 THsHaServer#serve方法提供对外服务
关于TNonblockingServerSocket对象的操作在上一节已经分析过了,这里我们直接分析THsHaServer。
THsHaServer 构造方法
THsHaServer 构造方法如下:
/**
* 继承了 TNonblockingServer
*/
public class THsHaServer extends TNonblockingServer {
...
/**
* 构造方法
*/
public THsHaServer(Args args) {
super(args);
// 创建线程池
invoker = args.executorService == null
? createInvokerPool(args) : args.executorService;
this.args = args;
}
}
从代码上来看,THsHaServer继承了TNonblockingServer,这就继承了TNonblockingServer中的大多数特性。在其构造方法中,先是调用了父类的构造方法,然后创建了一个线程池,这里我们来看看线程池的创建操作:
// 线程池
private ExecutorService executorService = null;
// 创建线程池
protected static ExecutorService createInvokerPool(Args options) {
// 获取参数
int minWorkerThreads = options.minWorkerThreads;
int maxWorkerThreads = options.maxWorkerThreads;
int stopTimeoutVal = options.stopTimeoutVal;
TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
// 使用的是 ThreadPoolExecutor,jdk提供的线程池
ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);
return invoker;
}
THsHaServer中使用的线程池是通过ThreadPoolExecutor直接创建的。
THsHaServer#serve方法
THsHaServer并没有实现serve()方法,它的serve()方法继承自TNonblockingServer,这块在上一节已经分析过了,这里就不再分析了。
requestInvoke(...)方法的调用
在分析TNonblockingServer时,我们看到请求是在TNonblockingServer#requestInvoke方法中处理的,THsHaServer对该方法进行了重写,代码如下:
protected boolean requestInvoke(FrameBuffer frameBuffer) {
try {
// 获取 invocation
Runnable invocation = getRunnable(frameBuffer);
// 放入线程池中执行
invoker.execute(invocation);
return true;
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected execution!", rx);
return false;
}
}
// 获取 Invocation
protected Runnable getRunnable(FrameBuffer frameBuffer){
return new Invocation(frameBuffer);
}
...
}
这一步的操作很简单,就是通过frameBuffer获取Runnable实例,然后把Runnable实例放入线程池中执行,这里我们重点来看Invocation实例是个啥,进入Invocation:
class Invocation implements Runnable {
private final FrameBuffer frameBuffer;
public Invocation(final FrameBuffer frameBuffer) {
this.frameBuffer = frameBuffer;
}
/**
* 线程池执行的内容
*/
public void run() {
frameBuffer.invoke();
}
}
从 代码来看,requestInvoke(...)方法所做的主要工作是把 FrameBuffer#invoke 方法的调用放到了线程里。
本节最后,我们来总结下THsHaServer的特点:
「主线程」启动服务并阻塞,等待服务停止 「 SelectAcceptThread线程」调用selector.select()获取读/写/连接事件将上一步获取到的事件放入「线程池」中处理
同TNonblockingServer相比,引入线程池后,业务操作可以放到线程池中执行,SelectAcceptThread线程可安心地进行select,不必担心被耗时的业务处理操作阻塞了。
多路复用 server:TThreadedSelectorServer
在多数场景下,THsHaServer就可应对了,不过由于SelectAcceptThread线程既处理连接事件,又处理读写事件,在高并发场景下,读写事件会相当频繁(一个连接只会触发一次连接事件,但会触发多次读写事件),因此被频繁触发的读写事件极有可能会成为SelectAcceptThread的瓶颈,导致SelectAcceptThread无法及时获取到读写请求。
为解决读写频繁导致SelectAcceptThread的瓶颈问题,聪明的开发者们也想用多线程来处理读写事件,即读写事件分散给多个线程处理,这就有了TThreadedSelectorServer。
TThreadedSelectorServer 的处理流程如下:

从图中来看,整个TThreadedSelectorServer包含两个selector:
主 selector:处理连接事件,单线程,当触发连接事件后,该channel会注册到从selector上从 selector:处理读写事件,多线程,每个线程都有一个selector对象,当channel注册到该selector后,之后channel上的所有读写事件就都由该selector对应的线程处理了
TThreadedSelectorServer 的使用方式如下:
// 依然是 TNonblockingServerSocket
TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
// 创建 TThreadedSelectorServer
TServer server = new TThreadedSelectorServer(
new TThreadedSelectorServer.Args(transport).processor(processor));
// 启动服务
server.serve();
由于TNonblockingServerSocket前面已经分析过了,这里我们的的重点还是TThreadedSelectorServer,进入构造方法:
/**
* AbstractNonblockingServer 的子类
*/
public class TThreadedSelectorServer extends AbstractNonblockingServer {
...
public TThreadedSelectorServer(Args args) {
super(args);
args.validate();
// 创建线程池
invoker = args.executorService == null
? createDefaultExecutor(args) : args.executorService;
this.args = args;
}
/**
* 创建线程池
*/
protected static ExecutorService createDefaultExecutor(Args options) {
// 默认使用 newFixedThreadPool 创建
return (options.workerThreads > 0)
? Executors.newFixedThreadPool(options.workerThreads) : null;
}
...
TThreadedSelectorServer的构造方法还是比较简单的,做了一些赋值操作,然后创建了一个线程池,这里就不多分析了。
接下来我们来看看serve()方法,进入AbstractNonblockingServer#serve:
public void serve() {
// 启动处理线程
if (!startThreads()) {
return;
}
// 启动监听,其实只是设置了`soTimeout`属性值
if (!startListening()) {
return;
}
// 设置服务标识
setServing(true);
// 上面启动的处理线程join到主线程中,就只是执行了 thread.join() 方法
// 当前线程会在这里阻塞
waitForShutdown();
// 设置服务标识
setServing(false);
// 关闭监听,执行的是 serverTransport.close()操作,即关闭serverSocketChannel
stopListening();
}
实际上,这个方法我们在分析TNonblockingServer时已经分析过了,TNonblockingServer实现的也是AbstractNonblockingServer类,因此调用TNonblockingServer#serve方法时,也是调用到这个方法的。
不过,尽管AbstractNonblockingServer#serve的内容一致,但具体的实现却大不相同,这里我们主要关注startThreads()方法,来看看它的实现。startThreads()方法的实现为TThreadedSelectorServer#startThreads,我们跟进去:
protected boolean startThreads() {
try {
for (int i = 0; i < args.selectorThreads; ++i) {
// selector 线程
selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
}
acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
createSelectorThreadLoadBalancer(selectorThreads));
// 启动一个个 selector 线程
for (SelectorThread thread : selectorThreads) {
thread.start();
}
// 启动 acceptThread 线程
acceptThread.start();
return true;
} catch (IOException e) {
LOGGER.error("Failed to start threads!", e);
return false;
}
}
这一步启动了两个线程:
acceptThread线程:处理连接事件selectorThreads线程组:处理读写事件
下面我们就来分析这两个线程是如何进行合作的。
acceptThread线程
AcceptThread的构造方法如下:
public AcceptThread(TNonblockingServerTransport serverTransport,
SelectorThreadLoadBalancer threadChooser) throws IOException {
this.serverTransport = serverTransport;
// 线程选择器,用来处理如何从众多的selectorThreads中选择一个线程
this.threadChooser = threadChooser;
// 创建一个 accept selector
this.acceptSelector = SelectorProvider.provider().openSelector();
// 注册到 serverTransport,进入该方法会发现该 Selector 只关注 SelectionKey.OP_ACCEPT 事件
this.serverTransport.registerSelector(acceptSelector);
}
再来看看AcceptThread线程的执行内容,进入TThreadedSelectorServer.AcceptThread#run方法:
public void run() {
try {
// 执行 eventHandler
if (eventHandler_ != null) {
eventHandler_.preServe();
}
// 不断地循环调用 select
while (!stopped_) {
select();
}
} catch (Throwable t) {
LOGGER.error("run() on AcceptThread exiting due to uncaught error", t);
} finally {
...
}
}
继续进入TThreadedSelectorServer.AcceptThread#select方法,看看具体的select操作:
private void select() {
try {
// 进行select操作,这里会阻塞,直到有连接请求的到来
acceptSelector.select();
// 由于该 selector 只关注连接事件,执行到这 里就表示获取到了连接
Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// 跳过无效的连接
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
// 连接处理操作
handleAccept();
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
继续,我们再来看看连接处理操作,进入TThreadedSelectorServer.AcceptThread#handleAccept方法:
private void handleAccept() {
final TNonblockingTransport client = doAccept();
if (client != null) {
// 为当前连接事件选择一个线程
final SelectorThread targetThread = threadChooser.nextThread();
if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
// 直接在当前线程中执行
doAddAccept(targetThread, client);
} else {
// 放在线程池中执行
try {
invoker.submit(new Runnable() {
public void run() {
doAddAccept(targetThread, client);
}
});
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected accept registration!", rx);
// close immediately
client.close();
}
}
}
}
继续探索doAddAccept(...)的执行操作,进入TThreadedSelectorServer.AcceptThread#doAddAccept方法:
private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
// 在if条件中调用SelectorThread#addAcceptedConnection方法
if (!thread.addAcceptedConnection(client)) {
client.close();
}
}
继续进入TThreadedSelectorServer.SelectorThread#addAcceptedConnection:
public boolean addAcceptedConnection(TNonblockingTransport accepted) {
try {
// 添加到队列中
acceptedQueue.put(accepted);
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while adding accepted connection!", e);
return false;
}
// 将 selector 从 select() 方法中唤醒
selector.wakeup();
return true;
}
到这里,AcceptThread的操作就完成了。
selectorThreads线程组
连接请求放入到acceptedQueue之后,后续该如何处理呢?这就是SelectorThread的工作了,SelectorThread的构造方法如下:
public SelectorThread() throws IOException {
this(new LinkedBlockingQueue<TNonblockingTransport>());
}
/**
* 有参构造
*/
public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue)
throws IOException {
this.acceptedQueue = acceptedQueue;
}
构造方法中仅是做了一个参数赋值,就不多分析了,我们重点关注SelectorThread#run方法,看看线程的执行内容:
public void run() {
try {
while (!stopped_) {
// 处理读写事件
select();
// 处理连接事件
processAcceptedConnections();
// 处理key
processInterestChanges();
}
for (SelectionKey selectionKey : selector.keys()) {
cleanupSelectionKey(selectionKey);
}
} catch (Throwable t) {
LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
} finally {
...
}
}
这里我们重点关注读写事件与连接事件的处理。
先看连接事件的处理,处理方法为TThreadedSelectorServer.SelectorThread#processAcceptedConnections,代码如下:
private void processAcceptedConnections() {
// 循环操作
while (!stopped_) {
// 从acceptedQueue获取连接请求
TNonblockingTransport accepted = acceptedQueue.poll();
// 未获取到,取出循环
if (accepted == null) {
break;
}
// 注册连接请求
registerAccepted(accepted);
}
}
还记得在 acceptThread 最终会向acceptedQueue添加连接请求吗,那个连接请求就是在这里拿出来的,获取到连接请求后,接着就调用TThreadedSelectorServer.SelectorThread#registerAccepted进行注册操作了:
private void registerAccepted(TNonblockingTransport accepted) {
SelectionKey clientKey = null;
try {
// 注册到 selector,关注的事件是 OP_READ(读事件)
clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
// 获取请求数据
FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this);
clientKey.attach(frameBuffer);
} catch (IOException | TTransportException e) {
LOGGER.warn("Failed to register accepted connection to selector!", e);
if (clientKey != null) {
cleanupSelectionKey(clientKey);
}
accepted.close();
}
}
到这里,就把连接请求注册到SelectorThread的成员变量selector上了,之后读写请求就由该selector来处理了。
接下来我们再来看看读写请求的select操作,进入TThreadedSelectorServer.SelectorThread#select方法:
private void select() {
try {
// 处理select操作
doSelect();
// 如果获取到了 selectedKeys,即有新的读写请求
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// 跳过无效的请求
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// 处理读写操作,调用的是 AbstractNonblockingServer 的方法
if (key.isReadable()) {
// deal with reads
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
这个方法的关键操作分为两步:
进行 select操作,获取该selector上发生的读写事件处理读写事件
处理读写事件的操作调用的是AbstractNonblockingServer方法,前面已经分析过 了,这里我们重点关注select操作,进入 TThreadedSelectorServer.SelectorThread#doSelect方法:
private void doSelect() throws IOException {
long beforeSelect = System.currentTimeMillis();
// select 操作
int selectedNums = selector.select();
long afterSelect = System.currentTimeMillis();
// 可能出现了jvm bug:没有获取到内容,但阻塞被唤醒了
if (selectedNums == 0) {
jvmBug++;
} else {
jvmBug = 0;
}
// 也有可能是调用 selector.weakup() 唤醒的,这里就进行次数重置
long selectedTime = afterSelect - beforeSelect;
// MONITOR_PERIOD = 1000L;
if (selectedTime >= MONITOR_PERIOD) {
jvmBug = 0;
// SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
} else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) {
LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.",
MONITOR_PERIOD, jvmBug);
// 重建 selector
rebuildSelector();
selector.selectNow();
jvmBug = 0;
}
}
上面的方法中,抛出jvm bug的处理,就只有一行关键代码:
selector.select();
这是jdk提供的方法,就是获取当前selector上触发的读写事件,这里就不多分析了。
这里我们重点看看这里的jvm bug是个啥。实际上,这个jvm bug就是大名鼎鼎(或臭名昭著)的selector空轮询bug,关于该bug,不了解的小伙伴可以看看JDK Epoll空轮询bug一文,这里引用文章中的一张图:

总结下就是,select()没有获取到读写请求就立即返回了,由于while(true)的存在,就导致代码变成了如下操作:
while(true) {
// 这里面的内容就像不存在一样
}
这样整个线程就在空转,白白消耗cpu资源,cpu使用量很快就到100%了。
那么thrift是如何解决这个问题的呢?
从代码来看,thrift处理方式如下:获取到的事件数量为0,且阻塞时间小于1s时,就表明发生了空转,当空转次数达到512次时,就重新构建selector。
本节最后,我们来总结下TThreadedSelectorServer的特点:
「主线程」启动服务并阻塞,等待服务停止 「 AcceptThread线程」调用selector.select()获取连接事件,并从SelectorThread线程组中选择一个SelectorThread线程,将该连接事件放入到SelectorThread对应的acceptedQueue「 SelectorThread线程组」包含多个**SelectorThread线程88,在线程执行时,会从acceptedQueue连接事件,然后将该连接注册到其成员变量selector上,这样就完成连接与**SelectorThread**线程的绑定「 SelectorThread线程」执行select操作,获取selector上发生的读写事件
总结
本文介绍了thrift的几种网络模型,总结如下:
| 类 | 网络模型 | 说明 |
|---|---|---|
| TSimpleServer | 单线程阻塞IO | 单线程,使用的是阻塞IO,只能同时维持一个连接 |
| TThreadPoolServer | 多线程阻塞IO | 基于线程池的阻塞IO,能同时维持的连接数取决于线程池大小 |
| TNonblockingServer | 单线程单selector | 单线程,使用的是非阻塞IO,能同时维持多个连接,但只能同时处理一个连接上的读写请求 |
| THsHaServer | 多线程单selector | TNonblockingServer的多线程版本,能同时维持多个连接,单线程处理连接事件,线程池处理读写事件,可同时处理多个连接上的读写请求,数量由线程池大小决定 |
| TThreadedSelectorServer | 多线程主从selector | 单线程的主selector用于处理连接事件,多线程的从selector用于处理读写请求事件,可同时处理多个连接上的读写请求,可处理海量连接服务 |
关于thrift的网络模型就介绍到这里了。
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。
本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!
