03. Apache thrift 之网络模型
共 52594字,需浏览 106分钟
·
2021-07-03 20:44
本文我们来分析thrift
的网络传输模式。
客户端
Thrift
客户端常用的传输层有以下几种:
TSocket
:使用阻塞式I/O进行传输,是最常见的模式TNonblockingSocket
:使用非阻塞方式,用于构建异步客户端TFramedTransport
:使用非阻塞方式,按块的大小进行传输,类似于Java中的NIO
接下来我们从源码的角度来分析这几种传输层的实现。
阻塞式IO:TSocket
TSocket
使用了阻塞式I/O进行传输,代码实现如下:
public class TSocket extends TIOStreamTransport {
private Socket socket_;
/**
* 构造方法
*/
public TSocket(String host, int port) throws TTransportException {
this(new TConfiguration(), host, port, 0);
}
/**
* 构造方法
*/
public TSocket(TConfiguration config, String host, int port, int socketTimeout,
int connectTimeout) throws TTransportException {
super(config);
host_ = host;
port_ = port;
socketTimeout_ = socketTimeout;
connectTimeout_ = connectTimeout;
initSocket();
}
/**
* 初始化方法
*/
private void initSocket() {
// 创建 Socket 对象
socket_ = new Socket();
try {
// 对 socket 进行一些配置
socket_.setSoLinger(false, 0);
socket_.setTcpNoDelay(true);
socket_.setKeepAlive(true);
socket_.setSoTimeout(socketTimeout_);
} catch (SocketException sx) {
LOGGER.error("Could not configure socket.", sx);
}
}
/**
* 进行连接
*/
public void open() throws TTransportException {
...
try {
// 连接操作
socket_.connect(new InetSocketAddress(host_, port_), connectTimeout_);
inputStream_ = new BufferedInputStream(socket_.getInputStream());
outputStream_ = new BufferedOutputStream(socket_.getOutputStream());
} catch (IOException iox) {
close();
throw new TTransportException(TTransportException.NOT_OPEN, iox);
}
}
...
}
这个类中使用的是标准的BIO
操作,使用的是jdk提供的Socket
类,连接服务端时使用的是java.net.Socket#connect(java.net.SocketAddress, int)
方法。
非阻塞式IO:TNonblockingSocket
TNonblockingSocket
使用的是非阻塞IO方式,代码如下:
public class TNonblockingSocket extends TNonblockingTransport {
private final SocketAddress socketAddress_;
private final SocketChannel socketChannel_;
/**
* 构造方法,打开 SocketChannel
*/
public TNonblockingSocket(String host, int port, int timeout)
throws IOException, TTransportException {
// SocketChannel.open():开启 nio 连接
this(SocketChannel.open(), timeout, new InetSocketAddress(host, port));
}
/**
* 构造方法,处理 SocketChannel 的配置
*/
private TNonblockingSocket(TConfiguration config, SocketChannel socketChannel,
int timeout, SocketAddress socketAddress) throws IOException, TTransportException {
super(config);
socketChannel_ = socketChannel;
socketAddress_ = socketAddress;
// 配置为非阻塞
socketChannel.configureBlocking(false);
// 获取 socket,配置一些参数
Socket socket = socketChannel.socket();
socket.setSoLinger(false, 0);
socket.setTcpNoDelay(true);
socket.setKeepAlive(true);
setTimeout(timeout);
}
/**
* 开启,非阻塞io没有实现
*/
public void open() throws TTransportException {
throw new RuntimeException("open() is not implemented for TNonblockingSocket");
}
/**
* 进行连接
*/
public boolean startConnect() throws IOException {
return socketChannel_.connect(socketAddress_);
}
...
}
这个类中使用的是标准的NIO
操作,使用的是jdk提供的SocketChannel
类,开启SocketChannel
使用的是SocketChannel.open()
方法,连接服务端使用的是java.nio.channels.SocketChannel#connect
方法。
缓冲TTransport
:TFramedTransport
TFramedTransport
的注释如下:
TFramedTransport is a buffered TTransport that ensures a fully read message every time by preceding messages with a 4-byte frame size.
TFramedTransport 是一个缓冲的 TTransport,通过在前面带有4字节帧大小的消息来确保每次完全读取消息。
注释表明,它是一 个缓冲的TTransport
,我们再来看下它的代码实现:
public class TFramedTransport extends TLayeredTransport {
...
/**
* 构造方法
* 需要传入 TTransport
*/
public TFramedTransport(TTransport transport) throws TTransportException {
this(transport, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
}
/**
* 构造方法
* 需要传入 transport 与 maxLength
*/
public TFramedTransport(TTransport transport, int maxLength) throws TTransportException {
super(transport);
TConfiguration _configuration = Objects.isNull(transport.getConfiguration())
? new TConfiguration() : transport.getConfiguration();
// 配置最大长度
_configuration.setMaxFrameSize(maxLength);
writeBuffer_.write(sizeFiller_, 0, 4);
readBuffer_= new TMemoryInputTransport(_configuration, new byte[0]);
}
public void open() throws TTransportException {
// getInnerTransport() 得到的就是传入的 transport
getInnerTransport().open();
}
...
}
从代码来看,TFramedTransport
就是个包装类,构建方法中需要传入具体的TTransport
,具体的IO操作由传入的TTransport
来实现,从注释来看,主要作用就是为TTransport
提供缓冲功能,使用时也是配合其他TTransport
使用,如配合TNonblockingSocket
使用:
// io 操作由 TNonblockingSocket 实现
TNonblockingSocket tSocket = new TNonblockingSocket(host, port);
TTransport transport = new TFramedTransport(tSocket);
服务端
Thrift
服务端支持的 server
类型如下:
TSimpleServer
:单线程服务器端,使用标准的阻塞式I/OTThreadPoolServer
:多线程服务器端,使用标准的阻塞式I/OTNonblockingServer
:单线程服务器端,使用非阻塞式I/OTHsHaServer
:半同步半异步服务器端,基于非阻塞式IO读写和多线程工作任务处理TThreadedSelectorServer
:多线程选择器服务器端,对THsHaServer
在异步IO模型上进行增强
单线程阻塞server
:TSimpleServer
单线程服务器端,使用标准的阻塞式I/O,使用方法如下:
TServerTransport transport = new TServerSocket(port);
TServer server = new TSimpleServer(new TServer.Args(transport).processor(processor));
在上一篇文章中,我们就是以该server
类型为实例进行分析的,这里就不再分析了。
我们来总结下这种IO模型的优缺点:
优点:实现简单 缺点: 单线程,无法发挥服务器多核优势 由于使用的是阻塞IO,一个连接占用一个线程,因此只能处理一个连接,多于1个连接的情况下,其余的连接会等待
从上面的优缺点来看,单线程阻塞模型除了实现简单外,其他的都是缺点,在生产中一般不会用。
多线程阻塞server
:TThreadPoolServer
多线程服务器端,使用标准的阻塞式I/O,使用方法如下:
// 生成 TServerSocket
TServerTransport transport = new TServerSocket(port);
// 生成 TServer
TServer server = new TThreadPoolServer(
new TThreadPoolServer.Args(transport).processor(processor));
server.serve();
在使用TThreadPoolServer
时 ,需要传入Socket
,这里使用的是TServerSocket
,这里我们分别来分析这两个类。
TServerSocket
TServerSocket
的创建过程如下:
public class TServerSocket extends TServerTransport {
/**
* 构造方法,传入 port
*/
public TServerSocket(int port) throws TTransportException {
this(port, 0);
}
/**
* 构造方法,传入 bindAddr 与 port
*/
public TServerSocket(InetSocketAddress bindAddr, int clientTimeout)
throws TTransportException {
this(new ServerSocketTransportArgs().bindAddr(bindAddr).clientTimeout(clientTimeout));
}
/**
* 构造方法
* args 里封装的就是 port、clientTimeout
*/
public TServerSocket(ServerSocketTransportArgs args)
throws TTransportException {
clientTimeout_ = args.clientTimeout;
if (args.serverSocket != null) {
this.serverSocket_ = args.serverSocket;
return;
}
try {
// ServerSocket:阻塞 io
serverSocket_ = new ServerSocket();
serverSocket_.setReuseAddress(true);
// 绑定地址与端口
serverSocket_.bind(args.bindAddr, args.backlog);
} catch (IOException ioe) {
close();
throw new TTransportException("Could not create ServerSocket on address "
+ args.bindAddr.toString() + ".", ioe);
}
}
从代码上来看,TServerSocket
中使用的是 ServerSocket
,这是个阻塞IO。阻塞IO中,服务端使用的是ServerSocket
,客户端使用的Socket
,两者一般配套使用,即服务端使用ServerSocket
开启端口监听,客户端使用的Socket
创建socket
连接。
TThreadPoolServer
TThreadPoolServer
的创建过程如下:
public class TThreadPoolServer extends TServer {
private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class);
public static class Args extends AbstractServerArgs<Args> {
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
public ExecutorService executorService;
...
}
public void serve() {
if (!preServe()) {
return;
}
// 执行操作
execute();
...
}
/**
* 具体的执行操作,在这里会打开连接
*/
protected void execute() {
while (!stopped_) {
try {
// accept() 会阻塞,等待连接的到来
TTransport client = serverTransport_.accept();
try {
// 连接来了,就丢到线程池中执行
executorService_.execute(new WorkerProcess(client));
} catch (RejectedExecutionException ree) {
if (!stopped_) {
LOGGER.warn("...");
}
client.close();
}
} catch (TTransportException ttx) {
if (!stopped_) {
LOGGER.warn("Transport error occurred during acceptance of message", ttx);
}
}
}
}
}
与单线程阻塞server
不同的是,TThreadPoolServer
会持有一个工作线程池,当连接创建后,就会把连接交给线程池处理。
处理流程示意图如下:
这里的handler
就是线程池中的一个个线程。
这部分的最后,我们也来总结下这种模型的优缺点:
优点:相比于单线程处理,使用多线程后,大大提高了连接处理数 缺点:由于阻塞IO一个连接占用一个线程,因此连接数受限于线程池的数量,无法处理海量连接
虽然该模型无法处理海量连接,但在没有NIO
的年代,这种模型是最主要的处理网络请求的方式,不过现在基本用NIO了。
单线程非阻塞server
:TNonblockingServer
上面介绍的两种网络模型都是基于阻塞IO的,接下来我们介绍非阻塞IO。
最简单的非阻塞IO模型:单线程非阻塞,使用方式如下:
// 指定socket:这里使用的就是非阻塞socket了
TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
// 创建服务端
TServer server = new TNonblockingServer(
new TNonblockingServer.Args(transport).processor(processor));
server.serve();
以上代码中,使用到了两个类:
TNonblockingServerSocket:指定socket类型 TNonblockingServer:指定server类型
接下来我们就来分析这两个类的具体实现。
TNonblockingServerSocket
TNonblockingServerSocket
的构造方法如下:
TNonblockingServerSocket(NonblockingAbstractServerSocketArgs args)
throws TTransportException {
clientTimeout_ = args.clientTimeout;
try {
serverSocketChannel = ServerSocketChannel.open();
// 配置为非阻塞
serverSocketChannel.configureBlocking(false);
// serverSocketChannel:非阻塞io
serverSocket_ = serverSocketChannel.socket();
serverSocket_.setReuseAddress(true);
// 绑定地址与端口
serverSocket_.bind(args.bindAddr, args.backlog);
} catch (IOException ioe) {
serverSocket_ = null;
throw new TTransportException("Could not create ServerSocket on address "
+ args.bindAddr.toString() + ".", ioe);
}
}
从上述代码中可以看到,在TNonblockingServerSocket
中使用的是ServerSocketChannel
,这是非阻塞IO提供的类,作用相当于阻塞IO中的ServerSocket
.
TNonblockingServer.Args
TNonblockingServer
的构造方法里需要传入AbstractNonblockingServerArgs
参数,进入TNonblockingServer.Args
,它现它仅是继承了AbstractNonblockingServerArgs
:
public static class Args extends AbstractNonblockingServerArgs<Args> {
public Args(TNonblockingServerTransport transport) {
super(transport);
}
}
继续进入AbstractNonblockingServerArgs
:
public static abstract class AbstractNonblockingServerArgs
<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
public long maxReadBufferBytes = 256 * 1024 * 1024;
public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
super(transport);
// 设置 transportFactory
transportFactory(new TFramedTransport.Factory());
}
}
在这个方法里,调用transportFactory(...)
设置了transport
,这里使用的transpost
是TFramedTransport
,这点有别于阻塞IO.
TNonblockingServer
接下来我们来看看TNonblockingServer
,构造方法如下:
public class TNonblockingServer extends AbstractNonblockingServer {
...
private SelectAcceptThread selectAcceptThread_;
/**
* 构造方法
*/
public TNonblockingServer(AbstractNonblockingServerArgs args) {
super(args);
}
...
}
只是调用了父类的构造方法,追踪下去后,发现所做的工作也很简单,只是把传入的TNonblockingServerSocket
做了一个赋值操作,这里就不多分析了。
接下来我们来看看TNonblockingServer
的serve()
方法,进入AbstractNonblockingServer#serve
:
public void serve() {
// 启动处理线程
if (!startThreads()) {
return;
}
// 启动监听,其实只是设置了`soTimeout`属性值
if (!startListening()) {
return;
}
// 设置服务标识
setServing(true);
// 上面启动的处理线程join到主线程中,就只是执行了 thread.join() 方法
// 当前线程会在这里阻塞
waitForShutdown();
// 设置服务标识
setServing(false);
// 关闭监听,执行的是 serverTransport.close()操作,即关闭serverSocketChannel
stopListening();
}
AbstractNonblockingServer#serve
就是整个连接处理流程了,关键部分已作了详细的注释。
从代码上来看,整个过程共两个线程:
主线程:启动连接服务后,会阻塞直到关闭 连接处理线程:处理连接请求
这里我们主要关注连接处理线程,启动该线程的方法为TNonblockingServer#startThreads
,代码如下:
protected boolean startThreads() {
// start the selector
try {
selectAcceptThread_ = new SelectAcceptThread(
(TNonblockingServerTransport)serverTransport_);
// 启动线程
selectAcceptThread_.start();
return true;
} catch (IOException e) {
LOGGER.error("Failed to start selector thread!", e);
return false;
}
}
这里又出现了一个类:SelectAcceptThread
,从代码的执行来看,它显然是Thread
或Runnable
的实现,我们进入它的构造方法:
public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
throws IOException {
this.serverTransport = serverTransport;
// 注册操作selector
serverTransport.registerSelector(selector);
}
以上代码的关键在于注册selector
的操作,我们进入TNonblockingServerSocket#registerSelector
一探究竟:
public void registerSelector(Selector selector) {
try {
// 将 selector 注册到 serverSocketChannel,监听的事件是连接事件(OP_ACCEPT)
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (ClosedChannelException e) {
// this shouldn't happen, ideally...
// TODO: decide what to do with this.
}
}
其实这就是往serverSocketChannel
中注册selector
,关心的事件是连接事件(OP_ACCEPT
),也就是说, 当新连接过来时,该selector
会被唤醒,这一块是NIO
网络编程的老套路了,这里就不多做解释了。
上述出现了selector
,那么这个selector
是从何而来呢?
经过本人的苦苦寻觅,在SelectAcceptThread
的父类AbstractSelectThread
中找到了selector
的来源(AbstractNonblockingServer.AbstractSelectThread
):
protected abstract class AbstractSelectThread extends Thread {
protected Selector selector;
// List of FrameBuffers that want to change their selection interests.
protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
public AbstractSelectThread() throws IOException {
// 打开一个 selector 对象
this.selector = SelectorProvider.provider().openSelector();
}
...
}
selector
是由 SelectorProvider.provider().openSelector()
获取的,这也是jdk NIO网络编程提供的功能,这里就不多分析了。值得一提的是,selector
是SelectAcceptThread
的成员变量,即selector
只为当前线程持有。
接着我们把目光聚集于SelectAcceptThread
对象,进入其run()
方法,查看该线程所做的工作:
// TNonblockingServer.SelectAcceptThread#run
public void run() {
try {
// 处理 eventHandler
if (eventHandler_ != null) {
eventHandler_.preServe();
}
while (!stopped_) {
// select 操作
select();
// 处理事件变更
processInterestChanges();
}
for (SelectionKey selectionKey : selector.keys()) {
// 清除 selectionKey
cleanupSelectionKey(selectionKey);
}
} catch (Throwable t) {
LOGGER.error("run() exiting due to uncaught error", t);
} finally {
try {
selector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing selector!", e);
}
stopped_ = true;
}
}
TNonblockingServer.SelectAcceptThread#run
方法的重点地方都已做了注释,这里我们仅关注主要操作,进 入 TNonblockingServer.SelectAcceptThread#select
方法:
private void select() {
try {
// select 操作,这里会阻塞
selector.select();
// 获取 selectionKey
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// 跳过无效的key
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// 处理具体的key,如:连接事件、读事件、写事件
if (key.isAcceptable()) {
handleAccept();
} else if (key.isReadable()) {
handleRead(key);
} else if (key.isWritable()) {
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
我们进入SelectAcceptThread#handleAccept
方法,来看看连接事件是如何处理的:
private void handleAccept() throws IOException {
SelectionKey clientKey = null;
TNonblockingTransport client = null;
try {
// 得到连接
client = serverTransport.accept();
// 得到连接后,再将该连接注册到 selector 上,这次监听的事件是 读事件(OP_READ)
clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
// 得到 frameBuffer
FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);
clientKey.attach(frameBuffer);
} catch (TTransportException tte) {
...
}
}
从以上代码来看,连接处理步骤如下:
获取连接 将连接注册到 selector
,监听的事件是读事件(OP_READ)获取连接数据 frameBuffer
,并放入到SelectionKey
中
这几个操作也是标准的NIO
操作了,注册到selector
后,接下来就是处理selector
的读事件了,我们进入方法 AbstractNonblockingServer.AbstractSelectThread#handleRead
:
protected void handleRead(SelectionKey key) {
// 获取 SelectionKey 的数据
FrameBuffer buffer = (FrameBuffer) key.attachment();
// 进行读数据操作
if (!buffer.read()) {
cleanupSelectionKey(key);
return;
}
// 数据已读完,调用处理方法
if (buffer.isFrameFullyRead()) {
// 调用处理方法
if (!requestInvoke(buffer)) {
cleanupSelectionKey(key);
}
}
}
最终调用AbstractNonblockingServer.FrameBuffer#invoke
方法来处理读事件:
public void invoke() {
frameTrans_.reset(buffer_.array());
response_.reset();
try {
// 执行 eventHandler 的方法
if (eventHandler_ != null) {
eventHandler_.processContext(context_, inTrans_, outTrans_);
}
// 执行 Processor,这里最终会调用具体的服务端实现
processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
responseReady();
return;
} catch (TException te) {
LOGGER.warn("Exception while invoking!", te);
} catch (Throwable t) {
LOGGER.error("Unexpected throwable while invoking!", t);
}
// This will only be reached when there is a throwable.
state_ = FrameBufferState.AWAITING_CLOSE;
requestSelectInterestChange();
}
AbstractNonblockingServer.FrameBuffer#invoke
方法就是实际的处理类了,processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_)
最终执行的就是我们自己实现类的方法,如HelloServiceImpl#hello
。
处理流程示意图如下:
与阻塞IO相比,使用了selector
后,线程大部分时间是处于select
状态,只有在selector
上发生读/写/连接
事件时,线程才会处理业务操作。
尽管SelectAcceptThread
并不会被某一个连接独占,但处理读/写/连接
事件时,只有一个线程在进行,如果线程正好在处理连接A
的读事件,此时连接B
再发生了读事件,那么线程就无法立即处理连接B
的读事件了,只有处理完连接A
的读事件后线程才会继续处理连接B
的读事件。
由此可见,非阻塞IO下虽然可以同时处理多个连接请求,但由于使用的是单线程,请求处理时可能并不会很及时。
本节的最后,我们来总结下TNonblockingServer
的特点:
「主线程」启动服务并阻塞,等待服务停止 「 SelectAcceptThread
线程」调用selector.select()
获取读/写/连接
事件并处理,处理时有可能会阻塞select
操作
半同步半异步server
:THsHaServer
上一节我们分析到,在单线程非阻塞模型下,由于使用的是单线程,当线程正在处理业务操作时,其他请求处理时可能并不会很及时。为了提高响应速度,thrift
在单线程非阻塞模型上引入了线程池操作,这就是THsHaServer
,处理流程如下:
从整个操作来看,就是把TNonblockingServer
的SelectAcceptThread
放在了线程池中执行,接下来我们就来具体分析这块的操作。
THsHaServer
的使用方式如下:
// 创建 ServerSocket,这块与 TNonblockingServer 一致
TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
// 创建 THsHaServer
TServer server = new THsHaServer(new THsHaServer.Args(transport).processor(processor));
server.serve();
这部分的操作与TNonblockingServer
很相似:
创建了 TNonblockingServerSocket
对象创建 THsHaServer
对象调用 THsHaServer#serve
方法提供对外服务
关于TNonblockingServerSocket
对象的操作在上一节已经分析过了,这里我们直接分析THsHaServer
。
THsHaServer
构造方法
THsHaServer
构造方法如下:
/**
* 继承了 TNonblockingServer
*/
public class THsHaServer extends TNonblockingServer {
...
/**
* 构造方法
*/
public THsHaServer(Args args) {
super(args);
// 创建线程池
invoker = args.executorService == null
? createInvokerPool(args) : args.executorService;
this.args = args;
}
}
从代码上来看,THsHaServer
继承了TNonblockingServer
,这就继承了TNonblockingServer
中的大多数特性。在其构造方法中,先是调用了父类的构造方法,然后创建了一个线程池,这里我们来看看线程池的创建操作:
// 线程池
private ExecutorService executorService = null;
// 创建线程池
protected static ExecutorService createInvokerPool(Args options) {
// 获取参数
int minWorkerThreads = options.minWorkerThreads;
int maxWorkerThreads = options.maxWorkerThreads;
int stopTimeoutVal = options.stopTimeoutVal;
TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
// 使用的是 ThreadPoolExecutor,jdk提供的线程池
ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);
return invoker;
}
THsHaServer
中使用的线程池是通过ThreadPoolExecutor
直接创建的。
THsHaServer#serve
方法
THsHaServer
并没有实现serve()
方法,它的serve()
方法继承自TNonblockingServer
,这块在上一节已经分析过了,这里就不再分析了。
requestInvoke(...)
方法的调用
在分析TNonblockingServer
时,我们看到请求是在TNonblockingServer#requestInvoke
方法中处理的,THsHaServer
对该方法进行了重写,代码如下:
protected boolean requestInvoke(FrameBuffer frameBuffer) {
try {
// 获取 invocation
Runnable invocation = getRunnable(frameBuffer);
// 放入线程池中执行
invoker.execute(invocation);
return true;
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected execution!", rx);
return false;
}
}
// 获取 Invocation
protected Runnable getRunnable(FrameBuffer frameBuffer){
return new Invocation(frameBuffer);
}
...
}
这一步的操作很简单,就是通过frameBuffer
获取Runnable
实例,然后把Runnable
实例放入线程池中执行,这里我们重点来看Invocation
实例是个啥,进入Invocation
:
class Invocation implements Runnable {
private final FrameBuffer frameBuffer;
public Invocation(final FrameBuffer frameBuffer) {
this.frameBuffer = frameBuffer;
}
/**
* 线程池执行的内容
*/
public void run() {
frameBuffer.invoke();
}
}
从 代码来看,requestInvoke(...)
方法所做的主要工作是把 FrameBuffer#invoke
方法的调用放到了线程里。
本节最后,我们来总结下THsHaServer
的特点:
「主线程」启动服务并阻塞,等待服务停止 「 SelectAcceptThread
线程」调用selector.select()
获取读/写/连接
事件将上一步获取到的事件放入「线程池」中处理
同TNonblockingServer
相比,引入线程池后,业务操作可以放到线程池中执行,SelectAcceptThread
线程可安心地进行select
,不必担心被耗时的业务处理操作阻塞了。
多路复用 server
:TThreadedSelectorServer
在多数场景下,THsHaServer
就可应对了,不过由于SelectAcceptThread
线程既处理连接事件,又处理读写事件,在高并发场景下,读写事件会相当频繁(一个连接只会触发一次连接事件,但会触发多次读写事件),因此被频繁触发的读写事件极有可能会成为SelectAcceptThread
的瓶颈,导致SelectAcceptThread
无法及时获取到读写请求。
为解决读写频繁导致SelectAcceptThread
的瓶颈问题,聪明的开发者们也想用多线程来处理读写事件,即读写事件分散给多个线程处理,这就有了TThreadedSelectorServer
。
TThreadedSelectorServer
的处理流程如下:
从图中来看,整个TThreadedSelectorServer
包含两个selector
:
主 selector
:处理连接事件,单线程,当触发连接事件后,该channel
会注册到从selector
上从 selector
:处理读写事件,多线程,每个线程都有一个selector
对象,当channel
注册到该selector
后,之后channel
上的所有读写事件就都由该selector
对应的线程处理了
TThreadedSelectorServer
的使用方式如下:
// 依然是 TNonblockingServerSocket
TNonblockingServerSocket transport = new TNonblockingServerSocket(port);
// 创建 TThreadedSelectorServer
TServer server = new TThreadedSelectorServer(
new TThreadedSelectorServer.Args(transport).processor(processor));
// 启动服务
server.serve();
由于TNonblockingServerSocket
前面已经分析过了,这里我们的的重点还是TThreadedSelectorServer
,进入构造方法:
/**
* AbstractNonblockingServer 的子类
*/
public class TThreadedSelectorServer extends AbstractNonblockingServer {
...
public TThreadedSelectorServer(Args args) {
super(args);
args.validate();
// 创建线程池
invoker = args.executorService == null
? createDefaultExecutor(args) : args.executorService;
this.args = args;
}
/**
* 创建线程池
*/
protected static ExecutorService createDefaultExecutor(Args options) {
// 默认使用 newFixedThreadPool 创建
return (options.workerThreads > 0)
? Executors.newFixedThreadPool(options.workerThreads) : null;
}
...
TThreadedSelectorServer
的构造方法还是比较简单的,做了一些赋值操作,然后创建了一个线程池,这里就不多分析了。
接下来我们来看看serve()
方法,进入AbstractNonblockingServer#serve
:
public void serve() {
// 启动处理线程
if (!startThreads()) {
return;
}
// 启动监听,其实只是设置了`soTimeout`属性值
if (!startListening()) {
return;
}
// 设置服务标识
setServing(true);
// 上面启动的处理线程join到主线程中,就只是执行了 thread.join() 方法
// 当前线程会在这里阻塞
waitForShutdown();
// 设置服务标识
setServing(false);
// 关闭监听,执行的是 serverTransport.close()操作,即关闭serverSocketChannel
stopListening();
}
实际上,这个方法我们在分析TNonblockingServer
时已经分析过了,TNonblockingServer
实现的也是AbstractNonblockingServer
类,因此调用TNonblockingServer#serve
方法时,也是调用到这个方法的。
不过,尽管AbstractNonblockingServer#serve
的内容一致,但具体的实现却大不相同,这里我们主要关注startThreads()
方法,来看看它的实现。startThreads()
方法的实现为TThreadedSelectorServer#startThreads
,我们跟进去:
protected boolean startThreads() {
try {
for (int i = 0; i < args.selectorThreads; ++i) {
// selector 线程
selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
}
acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
createSelectorThreadLoadBalancer(selectorThreads));
// 启动一个个 selector 线程
for (SelectorThread thread : selectorThreads) {
thread.start();
}
// 启动 acceptThread 线程
acceptThread.start();
return true;
} catch (IOException e) {
LOGGER.error("Failed to start threads!", e);
return false;
}
}
这一步启动了两个线程:
acceptThread
线程:处理连接事件selectorThreads
线程组:处理读写事件
下面我们就来分析这两个线程是如何进行合作的。
acceptThread
线程
AcceptThread
的构造方法如下:
public AcceptThread(TNonblockingServerTransport serverTransport,
SelectorThreadLoadBalancer threadChooser) throws IOException {
this.serverTransport = serverTransport;
// 线程选择器,用来处理如何从众多的selectorThreads中选择一个线程
this.threadChooser = threadChooser;
// 创建一个 accept selector
this.acceptSelector = SelectorProvider.provider().openSelector();
// 注册到 serverTransport,进入该方法会发现该 Selector 只关注 SelectionKey.OP_ACCEPT 事件
this.serverTransport.registerSelector(acceptSelector);
}
再来看看AcceptThread
线程的执行内容,进入TThreadedSelectorServer.AcceptThread#run
方法:
public void run() {
try {
// 执行 eventHandler
if (eventHandler_ != null) {
eventHandler_.preServe();
}
// 不断地循环调用 select
while (!stopped_) {
select();
}
} catch (Throwable t) {
LOGGER.error("run() on AcceptThread exiting due to uncaught error", t);
} finally {
...
}
}
继续进入TThreadedSelectorServer.AcceptThread#select
方法,看看具体的select
操作:
private void select() {
try {
// 进行select操作,这里会阻塞,直到有连接请求的到来
acceptSelector.select();
// 由于该 selector 只关注连接事件,执行到这 里就表示获取到了连接
Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// 跳过无效的连接
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
// 连接处理操作
handleAccept();
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
继续,我们再来看看连接处理操作,进入TThreadedSelectorServer.AcceptThread#handleAccept
方法:
private void handleAccept() {
final TNonblockingTransport client = doAccept();
if (client != null) {
// 为当前连接事件选择一个线程
final SelectorThread targetThread = threadChooser.nextThread();
if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
// 直接在当前线程中执行
doAddAccept(targetThread, client);
} else {
// 放在线程池中执行
try {
invoker.submit(new Runnable() {
public void run() {
doAddAccept(targetThread, client);
}
});
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected accept registration!", rx);
// close immediately
client.close();
}
}
}
}
继续探索doAddAccept(...)
的执行操作,进入TThreadedSelectorServer.AcceptThread#doAddAccept
方法:
private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
// 在if条件中调用SelectorThread#addAcceptedConnection方法
if (!thread.addAcceptedConnection(client)) {
client.close();
}
}
继续进入TThreadedSelectorServer.SelectorThread#addAcceptedConnection
:
public boolean addAcceptedConnection(TNonblockingTransport accepted) {
try {
// 添加到队列中
acceptedQueue.put(accepted);
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while adding accepted connection!", e);
return false;
}
// 将 selector 从 select() 方法中唤醒
selector.wakeup();
return true;
}
到这里,AcceptThread
的操作就完成了。
selectorThreads
线程组
连接请求放入到acceptedQueue
之后,后续该如何处理呢?这就是SelectorThread
的工作了,SelectorThread
的构造方法如下:
public SelectorThread() throws IOException {
this(new LinkedBlockingQueue<TNonblockingTransport>());
}
/**
* 有参构造
*/
public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue)
throws IOException {
this.acceptedQueue = acceptedQueue;
}
构造方法中仅是做了一个参数赋值,就不多分析了,我们重点关注SelectorThread#run
方法,看看线程的执行内容:
public void run() {
try {
while (!stopped_) {
// 处理读写事件
select();
// 处理连接事件
processAcceptedConnections();
// 处理key
processInterestChanges();
}
for (SelectionKey selectionKey : selector.keys()) {
cleanupSelectionKey(selectionKey);
}
} catch (Throwable t) {
LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
} finally {
...
}
}
这里我们重点关注读写事件与连接事件的处理。
先看连接事件的处理,处理方法为TThreadedSelectorServer.SelectorThread#processAcceptedConnections
,代码如下:
private void processAcceptedConnections() {
// 循环操作
while (!stopped_) {
// 从acceptedQueue获取连接请求
TNonblockingTransport accepted = acceptedQueue.poll();
// 未获取到,取出循环
if (accepted == null) {
break;
}
// 注册连接请求
registerAccepted(accepted);
}
}
还记得在 acceptThread
最终会向acceptedQueue
添加连接请求吗,那个连接请求就是在这里拿出来的,获取到连接请求后,接着就调用TThreadedSelectorServer.SelectorThread#registerAccepted
进行注册操作了:
private void registerAccepted(TNonblockingTransport accepted) {
SelectionKey clientKey = null;
try {
// 注册到 selector,关注的事件是 OP_READ(读事件)
clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
// 获取请求数据
FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this);
clientKey.attach(frameBuffer);
} catch (IOException | TTransportException e) {
LOGGER.warn("Failed to register accepted connection to selector!", e);
if (clientKey != null) {
cleanupSelectionKey(clientKey);
}
accepted.close();
}
}
到这里,就把连接请求注册到SelectorThread
的成员变量selector
上了,之后读写请求就由该selector
来处理了。
接下来我们再来看看读写请求的select
操作,进入TThreadedSelectorServer.SelectorThread#select
方法:
private void select() {
try {
// 处理select操作
doSelect();
// 如果获取到了 selectedKeys,即有新的读写请求
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// 跳过无效的请求
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// 处理读写操作,调用的是 AbstractNonblockingServer 的方法
if (key.isReadable()) {
// deal with reads
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
这个方法的关键操作分为两步:
进行 select
操作,获取该selector
上发生的读写事件处理读写事件
处理读写事件的操作调用的是AbstractNonblockingServer
方法,前面已经分析过 了,这里我们重点关注select
操作,进入 TThreadedSelectorServer.SelectorThread#doSelect
方法:
private void doSelect() throws IOException {
long beforeSelect = System.currentTimeMillis();
// select 操作
int selectedNums = selector.select();
long afterSelect = System.currentTimeMillis();
// 可能出现了jvm bug:没有获取到内容,但阻塞被唤醒了
if (selectedNums == 0) {
jvmBug++;
} else {
jvmBug = 0;
}
// 也有可能是调用 selector.weakup() 唤醒的,这里就进行次数重置
long selectedTime = afterSelect - beforeSelect;
// MONITOR_PERIOD = 1000L;
if (selectedTime >= MONITOR_PERIOD) {
jvmBug = 0;
// SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
} else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) {
LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.",
MONITOR_PERIOD, jvmBug);
// 重建 selector
rebuildSelector();
selector.selectNow();
jvmBug = 0;
}
}
上面的方法中,抛出jvm bug
的处理,就只有一行关键代码:
selector.select();
这是jdk提供的方法,就是获取当前selector
上触发的读写事件,这里就不多分析了。
这里我们重点看看这里的jvm bug
是个啥。实际上,这个jvm bug
就是大名鼎鼎(或臭名昭著)的selector
空轮询bug,关于该bug,不了解的小伙伴可以看看JDK Epoll空轮询bug一文,这里引用文章中的一张图:
总结下就是,select()
没有获取到读写请求就立即返回了,由于while(true)
的存在,就导致代码变成了如下操作:
while(true) {
// 这里面的内容就像不存在一样
}
这样整个线程就在空转,白白消耗cpu资源,cpu使用量很快就到100%
了。
那么thrift
是如何解决这个问题的呢?
从代码来看,thrift
处理方式如下:获取到的事件数量为0,且阻塞时间小于1s时,就表明发生了空转,当空转次数达到512次时,就重新构建selector
。
本节最后,我们来总结下TThreadedSelectorServer
的特点:
「主线程」启动服务并阻塞,等待服务停止 「 AcceptThread
线程」调用selector.select()
获取连接
事件,并从SelectorThread
线程组中选择一个SelectorThread
线程,将该连接事件放入到SelectorThread
对应的acceptedQueue
「 SelectorThread
线程组」包含多个**SelectorThread
线程88,在线程执行时,会从acceptedQueue
连接事件,然后将该连接注册到其成员变量selector
上,这样就完成连接与**SelectorThread
**线程的绑定「 SelectorThread
线程」执行select
操作,获取selector
上发生的读写事件
总结
本文介绍了thrift的几种网络模型,总结如下:
类 | 网络模型 | 说明 |
---|---|---|
TSimpleServer | 单线程阻塞IO | 单线程,使用的是阻塞IO,只能同时维持一个连接 |
TThreadPoolServer | 多线程阻塞IO | 基于线程池的阻塞IO,能同时维持的连接数取决于线程池大小 |
TNonblockingServer | 单线程单selector | 单线程,使用的是非阻塞IO,能同时维持多个连接,但只能同时处理一个连接上的读写请求 |
THsHaServer | 多线程单selector | TNonblockingServer的多线程版本,能同时维持多个连接,单线程处理连接事件,线程池处理读写事件,可同时处理多个连接上的读写请求,数量由线程池大小决定 |
TThreadedSelectorServer | 多线程主从selector | 单线程的主selector用于处理连接事件,多线程的从selector用于处理读写请求事件,可同时处理多个连接上的读写请求,可处理海量连接服务 |
关于thrift
的网络模型就介绍到这里了。
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。
本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!