Netty接收数据时一次读取多少字节以及读多少次

Netty历险记

共 6242字,需浏览 13分钟

 ·

2021-08-28 17:48


本篇文章介绍一下,Netty在接收到数据时,一次性读取多少字节.


本篇使用Netty构建一个简单的服务端,使用Python构建一个简单的客户端,然后客户端向服务端发送数据,然后观察Netty每次读取的字节数.


客户端代码如下


import socket
if __name__ == '__main__':
client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(('127.0.0.1',8080)) client.send('''11111111111111111111111111111111111111111111\ 2222222222222222222222222222222222222222222222222\ 3333333333333333333333333333333333333333333333333\ 4444444444444444444444444444444444444444444444444\ 5555555555555555555555555555555555555555555555555\ 6666666666666666666666666666666666666666666666666\ 7777777777777777777777777777777777777777777777777\ 8888888888888888888888888888888888888888888888888\ 9999999999999999999999999999999999999999999999999\ 0000000000000000000000000000000000000000000000000\ 1111111111111111111111111111111111111111111111111\ 2222222222222222222222222222222222222222222222222\ 3333333333333333333333333333333333333333333333333\ 4444444444444444444444444444444444444444444444444\ 5555555555555555555555555555555555555555555555555\ 6666666666666666666666666666666666666666666666666\ 7777777777777777777777777777777777777777777777777\ 8888888888888888888888888888888888888888888888888\ 9999999999999999999999999999999999999999999999999'''.encode('utf-8'))


刻意让客户端一次性发送多一些数据 .


服务端代码如下


import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;
public class Server {

public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(8); EventLoopGroup businessGroup = new NioEventLoopGroup(8);
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { ChannelPipeline channelPipeline = ch.pipeline(); channelPipeline.addLast(new StringEncoder()); channelPipeline.addLast(new StringDecoder()); channelPipeline.addLast(businessGroup, new ServerHandler()); } });
ChannelFuture channelFuture = serverBootstrap.bind("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}



import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;
public class ServerHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println("接收到客户端信息:" + msg); }}



启动服务端,然后执行客户端发送数据.



这里我们借助wireshark工具查看数据包情况




如上图可知, 客户端(端口43262)向服务端(端口8080)首先进行了三次握手,握手之后,客户端一次性向服务端发送了1142个字节内容,之后进行了四次挥手.

接下来看一下服务端的打印日志情况.



从上图可以发现,共打印了两次. 客户端发送了一次数据,就把所有的数据发送完了,而服务端却打印了两次,难道是Netty读取了两次TCP中的数据?

接下来通过debug方式,观察下数据读取情况.

Netty读取数据的逻辑在以下类方法中

// 源码位置io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
@Overridepublic final void read() { ... byteBuf = allocHandle.allocate(allocator); // 读取数据 allocHandle.lastBytesRead(doReadBytes(byteBuf)); ... }


继续跟踪doReadBytes方法,最后会调用到


@Overridepublic int writeBytes(ScatteringByteChannel in, int length) throws IOException {    ensureWritable(length);    int writtenBytes = setBytes(writerIndex, in, length);    if (writtenBytes > 0) {        writerIndex += writtenBytes;    }    // 就在此处打断点    return writtenBytes;}


客户端重新发送数据,再进行测试



会发现,读取了1024个字节,放行



如上图,第二次读取了118字节. 两次加起来1024+118=1142个字节,和客户端发送的数据一致.
当然以上是我们通过debug方式查看的数据读取情况,我们也可以通过ss命令查看数据的读取情况,先让客户端发送数据,然后服务端读取一次数据,再通过debug让服务器暂时停下来,通过ss命令查看TCP接收缓冲区中还剩多少字节.



还剩119个字节,其实就是118个有效数据再加一个结束字节. 其实与我们上面分析的是一致的.


根据以上分析,客户端一次性把1142个字节发送给了服务端,但是服务端分两次才把数据读取完成,而且第一次只读取1024个字节.


如果这个时候你认为文章标题的答案是1024个字节,那其实也是不对的.
我们假设一种场景,客户端在一直急速地给服务端发送数据. 第一次Netty会使用1024字节大小的Buffer去读取TCP接收缓冲区中的数据,当读取完成之后,Netty发现分配的1024字节大小的Buffer都用来装数据了,那么Netty猜测后面应该还会有更多的数据,那么Netty下次就会分配16384字节大小的Buffer用来读取TCP接收缓冲区中的数据,如果16384字节大小的Buffer也被装满了数据,说明后面可能还会有很多数据,因此还会分配比16384更大的Buffer用来装数据.假如分配的16384字节大小的Buffer在读取数据之后没有被装满,说明TCP接收缓冲区中的数据可能不是很多,那么Netty就会分配比16384小的Buffer用来装下一次要读取的数据.


总之,Netty会根据分配Buffer的大小和实际读取到的数据大小之间的关系,来决定是增大Buffer的大小还是减小Buffer的大小.

核心代码如下

// 源码位置io.netty.channel.AdaptiveRecvByteBufAllocator.HandleImpl#record
private void record(int actualReadBytes) { if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) { if (decreaseNow) { index = max(index - INDEX_DECREMENT, minIndex); // 减小下次读取字节的大小 nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } } else if (actualReadBytes >= nextReceiveBufferSize) { index = min(index + INDEX_INCREMENT, maxIndex); // 增大下次读取字节的大小 nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; }}


还有一点需要说明的是,假如客户端发送了非常多的数据过来,难道服务端必须一直读取这个Channel里的数据吗?
当然不是, 默认Netty只会读取Channel里面的数据16次,如果在16次的机会里,还是没有读取完这个Channel里面的数据,那么暂时就不会读取这个Channel里面的数据了,Netty需要去处理其他事情(比如轮询IO事件,处理IO事件,执行task任务等),只有当下次轮循到IO事件的时候,才会继续读取之前没有读完的数据.





Netty使用的是水平触发,因此即便客户端不发送数据了,Netty依然可以把之前没有读取完的数据,继续读取.





浏览 39
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报