04. Apache thrift 之传输协议
我们都知道,数据在网络上是以二进制方式传输的。
对于一个java对象,从客户端通过网络传输到服务端时,客户端需要将其转换为二进制,然后写入网络IO;服务端从网络IO接收到数据时,也需要将二进制数据转换为对象,然后再进行操作。
这两个转换过程,统称为编解码操作。
在Thrift
中,编解码操作又叫传输协议,由抽象类 TProtocol
来定义,它的方法主要分为两大类:
TProtocol
写入方法:
TProtocol
读取方法:
写入方法规定了数据转换成二进制的方式,读取方法规定了将二进制转换为数据的方式。
thrift
中常用的传输协议有以下几种:
TBinaryProtocol
:二进制编码格式进行数据传输TCompactProtocol
:高效率的、密集的二进制编码格式进行数据传输TJSONProtocol
:使用JSON文本的数据编码协议进行数据传输TSimpleJSONProtocol
:只提供JSON只写的协议,适用于通过脚本语言解析TMultiplexedProtocol
:复合协议,同时处理多个服务
这些传输协议都是TProtocol
的子类,接下来我们来分析下这些方法的操作。
TBinaryProtocol
TBinaryProtocol
是最基本的实现,得到的二进制数据是原始数据(相比于压缩数据而言),这里我们选取几个方法了解转换过程。
读写 I32
类型数据
所谓的I32
数据,是指32位的整型数据,也就是java中的int
类型,写入方法如下:
public void writeI32(int i32) throws TException {
// 将i32类型的数据转换为byte数组
inoutTemp[0] = (byte)(0xff & (i32 >> 24));
inoutTemp[1] = (byte)(0xff & (i32 >> 16));
inoutTemp[2] = (byte)(0xff & (i32 >> 8));
inoutTemp[3] = (byte)(0xff & (i32));
trans_.write(inoutTemp, 0, 4);
}
代码中主要是通过位运算,将int
类型转换为长度为4的byte数组。
int
类型的读取操作如下:
@Override
public int readI32() throws TException {
byte[] buf = inoutTemp;
int off = 0;
// 读取4个byte
if (trans_.getBytesRemainingInBuffer() >= 4) {
buf = trans_.getBuffer();
off = trans_.getBufferPosition();
trans_.consumeBuffer(4);
} else {
readAll(inoutTemp, 0, 4);
}
// 通过位运算转换为int类型
return
((buf[off] & 0xff) << 24) |
((buf[off+1] & 0xff) << 16) |
((buf[off+2] & 0xff) << 8) |
((buf[off+3] & 0xff));
}
读取操作比较简单,先是在数据流上读取4个byte,然后通过位运算将这4个byte转换为int数据。
读写 I64
数据
基本数据类型都是这么操作的,比如long
类型:
@Override
public void writeI64(long i64) throws TException {
// 将i64类型的数据转换为byte数组
inoutTemp[0] = (byte)(0xff & (i64 >> 56));
inoutTemp[1] = (byte)(0xff & (i64 >> 48));
inoutTemp[2] = (byte)(0xff & (i64 >> 40));
inoutTemp[3] = (byte)(0xff & (i64 >> 32));
inoutTemp[4] = (byte)(0xff & (i64 >> 24));
inoutTemp[5] = (byte)(0xff & (i64 >> 16));
inoutTemp[6] = (byte)(0xff & (i64 >> 8));
inoutTemp[7] = (byte)(0xff & (i64));
trans_.write(inoutTemp, 0, 8);
}
long类型占8个byte,因此转换后的byte数组长度为8.
写入方法如下:
@Override
public long readI64() throws TException {
byte[] buf = inoutTemp;
int off = 0;
// 读取8个byte
if (trans_.getBytesRemainingInBuffer() >= 8) {
buf = trans_.getBuffer();
off = trans_.getBufferPosition();
trans_.consumeBuffer(8);
} else {
readAll(inoutTemp, 0, 8);
}
// 将byte数组转换为log
return
((long)(buf[off] & 0xff) << 56) |
((long)(buf[off+1] & 0xff) << 48) |
((long)(buf[off+2] & 0xff) << 40) |
((long)(buf[off+3] & 0xff) << 32) |
((long)(buf[off+4] & 0xff) << 24) |
((long)(buf[off+5] & 0xff) << 16) |
((long)(buf[off+6] & 0xff) << 8) |
((long)(buf[off+7] & 0xff));
}
long
数据的读取操作与int
类型非常相似,先是在数据流上读取8个byte,然后通过位运算将这8个byte转换为long数据。
像byte
、char
、short
等整形数据的操作方式类似:先是在数据流上读取固定长度的byte,然后通过位运算将这些byte转换为指定类型的数据,这里就不多说了。
读写 boolean
类型数据
thrift
在处理boolean
类型数据时,是将其按照byte
类型来处理:
public void writeBool(boolean b) throws TException {
writeByte(b ? (byte)1 : (byte)0);
}
在写入boolean
类型数据时,直接按byte
类型来处理,0为false,1为true.
读取操作也是按byte类型来处理,这里就不多说了。
读写String
类型数据
前面介绍的数据类型都是固定长度的,如int
类型占4个byte,long
类型占8个byte,byte
类型占1个byte,对于String
这种可变长度的数据,该如何写入与读取呢?
String 类型的数据写入方式如下:
public void writeString(String str) throws TException {
byte[] dat = str.getBytes(StandardCharsets.UTF_8);
// 写入长度数据
writeI32(dat.length);
trans_.write(dat, 0, dat.length);
}
写入操作包含两部分:
写入长度 写入二进制数据
与数据类型不同,String
类型的数据长度并不固定,因此在写入时,需要先指定数据长度,这样才能在读取时,知道读取多少个byte
,它的读取方法如下:
@Override
public String readString() throws TException {
// 先读取 String 的长度
int size = readI32();
// 读取固定长度的byte,然后转换为String
if (trans_.getBytesRemainingInBuffer() >= size) {
String s = new String(trans_.getBuffer(), trans_.getBufferPosition(),
size, StandardCharsets.UTF_8);
trans_.consumeBuffer(size);
return s;
}
return readStringBody(size);
}
读写复杂对象数据
以上分析了基本类型的写入与读取,对于复杂对象,该如何读写呢?
需要说明的,复杂对象也是由基本对象构成的,比如QryResult
:
public class QryResult implements ... {
public int code;
public String msg;
...
}
它的写入方法为QryResult.QryResultStandardScheme#write
:
public void write(org.apache.thrift.protocol.TProtocol oprot, QryResult struct)
throws org.apache.thrift.TException {
struct.validate();
// 写入对象开始标识
oprot.writeStructBegin(STRUCT_DESC);
// 写入属性开始标识
oprot.writeFieldBegin(CODE_FIELD_DESC);
oprot.writeI32(struct.code);
// 写入结束开始标识
oprot.writeFieldEnd();
if (struct.msg != null) {
// 写入属性开始标识
oprot.writeFieldBegin(MSG_FIELD_DESC);
oprot.writeString(struct.msg);
// 写入结束开始标识
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
// 写入对象结束标识
oprot.writeStructEnd();
}
从写入操作来看,
对象写入前后,都会写入一个标识,表明对象的开始与结束位置 属性写入前后,都会写入一个标识,表明属性的开始与结束位置,属性会有多个 属性就是基本类型,如 byte
、int
、String
等
写入的属性开始标记CODE_FIELD_DESC
与MSG_FIELD_DESC
如下:
// code
private static final TField CODE_FIELD_DESC = new TField("code", I32, (short)1);
// msg
private static final TField MSG_FIELD_DESC = new TField("msg", TType.STRING, (short)2);
再来看看QryResult
对象的读取操作,进入QryResult.QryResultStandardScheme#read
方法:
public void read(org.apache.thrift.protocol.TProtocol iprot, QryResult struct)
throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
// 读取对象开始标记
iprot.readStructBegin();
while (true)
{
// 读取属性开始标记,标记会包含属性类型
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
case 1: // CODE
// 判断类型
if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
// 读取 code,按 i32(也就是int)类型读取
struct.code = iprot.readI32();
struct.setCodeIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 2: // MSG
// 判断类型
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
// 读取 msg, 按 String 类型读取
struct.msg = iprot.readString();
struct.setMsgIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
// 读取属性结束标记
iprot.readFieldEnd();
}
// 读取对象结束标记
iprot.readStructEnd();
struct.validate();
}
读取操作如下:
读取对象开始标记 循环读取属性: 读取属性开始标记 读取属性值,由标识里包含里了属性值类型,读取属性值时只需要调用对应方法读取即可 读取属性结束标记 读取对象结束标记
以上就是复杂对象的读取方式了。
TCompactProtocol
接着我们来看看TCompactProtocol
,它的注释如下:
TCompactProtocol2 is the Java implementation of the compact protocol specified in THRIFT-110. The fundamental approach to reducing the overhead of structures is a) use variable-length integers all over the place and b) make use of unused bits wherever possible. Your savings will obviously vary based on the specific makeup of your structs, but in general, the more fields, nested structures, short strings and collections, and low-value i32 and i64 fields you have, the more benefit you'll see.
TCompactProtocol2是THRIFT-110中指定的紧凑协议的Java实现。减少结构开销的基本方法是:a)在整个位置使用长度可变的整数, b)尽可能使用未使用的位。根据结构的具体组成,您的节省显然会有所不同,但通常,您拥有的字段,嵌套结构,短字符串和集合以及低价值的i32和i64字段越多,您将看到更多的好处。
从注释来看,它是用来节省数据空间的,使用的是紧凑协议,我们来看看它是如何做到的。
读写 i32
类型数据
我们进入writeI32
方法:
public void writeI32(int i32) throws TException {
writeVarint32(intToZigZag(i32));
}
/**
* 转换
*/
private int intToZigZag(int n) {
return (n << 1) ^ (n >> 31);
}
/**
* 写入变长操作
*/
private void writeVarint32(int n) throws TException {
int idx = 0;
while (true) {
if ((n & ~0x7F) == 0) {
temp[idx++] = (byte)n;
// writeByteDirect((byte)n);
break;
// return;
} else {
temp[idx++] = (byte)((n & 0x7F) | 0x80);
// writeByteDirect((byte)((n & 0x7F) | 0x80));
n >>>= 7;
}
}
trans_.write(temp, 0, idx);
}
从代码来看,写入方法有两个操作:
使用 intToZigZag
将int
类型数据转换为ZigZag
使用 writeVarint32
将得到的ZigZag
写入
关于ZigZag
是个啥,我在网上找了一篇介绍文章:小而巧的数字压缩算法:zigzag((https://blog.csdn.net/zgwangbo/article/details/51590186)[https://blog.csdn.net/zgwangbo/article/details/51590186])
以int
类型为例,它的核心思想如下:对于int
类型的整数1(二进制为00000000_00000000_00000000_00000001
),如果直接按int
传输,前3个byte都是0,有效值仅在第4个byte上,这种情况下对于前3个byte的传输就是一种浪费。针对这种小整数的情况,ZigZag
的处理是,通过只传输有效的部分,降低传输的字节数,即最终只需要传输00000001
,这就可以少传输3个byte
。
根据ZigZag
的思想,ZigZag
仅对大于1个byte
的整型数据类型有效,比如char
、short
、int
、long
等。
读写double
类型数据
ZigZag
可处理大于1个byte
的整型数据类型,那能不能处理double
类型数据呢?
我们来看看double
类型的操作:
public void writeDouble(double dub) throws TException {
fixedLongToBytes(Double.doubleToLongBits(dub), temp, 0);
trans_.write(temp, 0, 8);
}
从代码来看,依然是写入了8个字节,因此并不能对double
进行压缩操作。
读写String
类型
再看看String
类型:
public void writeString(String str) throws TException {
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
// 长度还是可以使用的
writeVarint32(bytes.length);
trans_.write(bytes, 0, bytes.length);
}
在writeString
方法中,写入了两部分数据:
String
数据转化为二进制后的长度:int类型,可以使用仅是对长度使用了ZigZag
压缩String
具体数据:不是整数类型,不能使用ZigZag
压缩
可以看到,由于长度是int
类型,因此可以使用仅是对长度使用了ZigZag
算法,数据部分就无能为力.
从以上来分析来看,ZigZag
仅是对整形数据类型(char
、short
、int
、long
)的压缩,除此之外,对其他类型的数据就无能为力了。
TJSONProtocol 与 TSimpleJSONProtocol
关于json的操作,想必各位小伙伴已经很熟悉了,这里我们简单地了解下thrfit
的json序列化协议。
TJSONProtocol
的注释如下:
JSON protocol implementation for thrift. This is a full-featured protocol supporting write and read. Please see the C++ class header for a detailed description of the protocol's wire format.
thrift 的 JSON 协议实现。这是一个支持读写的全功能协议。请参阅 C++ 类头文件以获取协议的有线格式的详细说明。
注意到注释中的「这是一个支持读写的全功能协议」,这个是相对于TSimpleJSONProtocol
而言的,它的注释如下:
JSON protocol implementation for thrift. This protocol is write-only and produces a simple output format suitable for parsing by scripting languages. It should not be confused with the full-featured TJSONProtocol. JSON协议实现节俭。该协议是只写的,并且会生成一种简单的输出格式,适合通过脚本语言进行解析。不应将其与功能齐全的TJSONProtocol混淆。
thrift 中提供了两个json序列化协议:
TJSONProtocol
:支持「读写的全功能协议」TSimpleJSONProtocol
:「只写协议」
在thrift
中传输多是二进制协议,json
协议用的比较少,关于json
的读写操作,本文就不过多分析了,
TMultiplexedProtocol
TMultiplexedProtocol
并不是一个传输协议,而是协议的包装器,它允许Thrift
客户端在函数调用期间通过在服务名称之前添加服务名称来与Thrift
服务器进行通信。
如果使用了客户端使用了TMultiplexedProtocol
作为传输协议,在服务端,需要使用TMultiplexedProcessor
处理来自多路复用客户端的请求。
TMultiplexedProtocol
示例
下面我们使用TMultiplexedProtocol
实现单个套接字传输来调用两个服务的功能:
客户端:
TTransport transport = new TSocket("localhost", SERVER_PORT);
transport.open();
TProtocol protocol = new TBinaryProtocol(transport);
// 指定serviceName与处理的service
// helloService
TMultiplexedProtocol helloService = new TMultiplexedProtocol(
protocol, "helloService");
HelloService.Client client = new HelloService.Client(helloService);
System.out.println(client.hello("thrift world"));
// queryService
TMultiplexedProtocol helloProtocol = new TMultiplexedProtocol(
protocol, "queryService");
QueryService.Client queryClient = new QueryService.Client(helloProtocol);
System.out.println(queryClient.query(1));
服务端:
// 构建 processor,分别指定serivceNam与对应的Processor
TMultiplexedProcessor processor = new TMultiplexedProcessor();
processor.registerProcessor("helloService",
new HelloService.Processor<>(new HelloServiceImpl()));
processor.registerProcessor("queryService",
new QueryService.Processor<>(new QueryServiceImpl()));
// 生成 TServer 实例
TServer server = new TSimpleServer(
new TServer.Args(new TServerSocket(port)).processor(processor));
System.out.println("Starting the simple server...");
server.serve();
TMultiplexedProtocol
的实现原理
TMultiplexedProtocol
是如何做到区分服务的呢?答应就在serviceName
!
我们来看看TMultiplexedProtocol
构造方法:
public TMultiplexedProtocol(TProtocol protocol, String serviceName) {
super(protocol);
SERVICE_NAME = serviceName;
}
在TMultiplexedProtocol
的构造方法中,需要指定服务名(serviceName
)。
再来看看数据写入操作:
@Override
public void writeMessageBegin(TMessage tMessage) throws TException {
if (tMessage.type == TMessageType.CALL || tMessage.type == TMessageType.ONEWAY) {
super.writeMessageBegin(new TMessage(
// 写数据时,需要指定服务名
SERVICE_NAME + SEPARATOR + tMessage.name,
tMessage.type,
tMessage.seqid
));
} else {
super.writeMessageBegin(tMessage);
}
}
从代码来看,与其他Protocol
不同的是,TMultiplexedProtocol
在写入数据时,需要指定服务名。
thrift
客户端在请求服务端时,会带上当前请求的「服务名」,表明要调用的是哪个服务的方法;当服务端收到数据后,根据这个「服务名」来获取对应的服务,再调用该服务的方法,从而实现「多服务复用」功能。
总结
数据在网络传输中是以字节流的形式传输的,即使用的是二进制,所谓的传输协议本质上就是二进制与其他数据类型的转换操作。
本文介绍了thrift
提供的几种传输协议的实现,其中重点介绍了二进制传输协议TBinaryProtocol
的功能,具体介绍了int
、long
、String
、boolean
以及对象
类型与二进制相互转化的操作;
接着介绍了TCompactProtocol
的功能,它可以对整数类型的数据进行压缩,从而减少数据的传输,提高传输效率;
对于thrfit
提供的两个json
序列化协议,本文并没有深究,个人觉得了解即可;
最后介绍了TMultiplexedProtocol
协议,这是个包装协议,并不实现具体的读写操作,它的作用是,如果一个服务器上有多个thrift
服务,可以通过它来指定serviceName
从而确定调用的是哪个服务。
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。
本文首发于微信公众号 「Java技术探秘」,如果您喜欢本文,欢迎关注该公众号,让我们一起在技术的世界里探秘吧!