Netty 解决粘包半包问题

来源:SegmentFault 思否社区
作者:y猪
一.什么是TCP粘包半包
客户端发送数据包给服务端,因服务端一次读取到的字节数是不确定的,有好几种情况.
服务端分两次读取到了两个独立的数据包,没有粘包和拆包; 服务端一次接收到了两个数据包,粘合在一起,被称为 TCP 粘包; 服务端分两次读取到了两个数据包, 第一次读取到了完整的包和另外一个包的部分内容,第二次读取到了另一个包的剩余内容, 这被称为 TCP 拆包; 服务端分两次读取到了两个数据包, 第一次读取到了包的部分内容 , 第二次读取到了之前未读完的包剩余内容和另一个包,发生了拆包和粘包。 服务端 TCP 接收滑动窗口很小,数据包比较大, 即服务端分多次才能将 包接收完全,发生多次拆包 
二.粘包半包的原因
1.粘包
TCP协议:本身是 面向连接的可靠地协议-三次握手机制。
客户端与服务器会维持一个连接(Channel) ,在连接不断开的情况下, 可以将多个数据包发往服务器,但是发送的网络数据包太小, 那么本身会启用 Nagle 算法(可配置是否启用) 对较小的数据包进行合并(因此,TCP 的网络延迟要 UDP 的高些)然后再发送(超时或者包大小足够)。
服务器在接收到消息(数据流)的时候就无法区分哪些数据包是客户端自己分开发送的,这样产生了粘包;
服务器在接收到数据后,放到缓冲区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个
数据包的情况,造成粘包现象。
UDP:本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包) , 他不会对数据包进行合并发送,直接是一端发送什么数据, 直接就发出去了, 既然他不会对数据合并, 每一个数据包都是完整的(数据+UDP 头+IP 头等等发一 次数据封装一次) 也就没有粘包了。
2.半包
分包产生的原因:可能是IP分片传输导致的, 也可能是传输过程中丢失部 分包导致出现的半包, 还有可能就是一个包可能被分成了两次传输, 在取数据的时候,先取到了一部分(还可能与接收的缓冲区大小有关系) , 总之就是一个数据包被分成了多次接收。
更具体的原因有三个, 分别如下。
应用程序写入数据的字节大小大于套接字发送缓冲区的大小 进行 MSS 大小的 TCP 分段。MSS 是最大报文段长度的缩写。MSS 是 TCP 报文段中的数据字段的最大长度。数据字段加上 TCP 首部才等于整个的 TCP 报文段。所以 MSS 并不是 
TCP 报文段的最大长度, 而是:MSS=TCP 报文段长度-TCP 首部长度
以太网的 payload 大于 MTU 进行 IP 分片。MTU 指:一种通信协议的某一层上面所能 
通过的最大数据包大小。如果 IP 层有一个数据包要传, 而且数据的长度比链路层的 MTU 大,那么 IP 层就会进行分片, 把数据包分成托干片, 让每一片都不超过 MTU。注意, IP 分片可
以发生在原始发送端主机上, 也可以发生在中间路由器上。
3.解决粘包半包问题
由于底层的 TCP 无法理解上层的业务数据, 所以在底层是无法保证数据包不被拆分和重组的, 这个问题只能通过上层的应用协议栈设计来解决。业界的主流协议的解决方案,可以归纳如下。
(1) 在包尾增加分割符, 比如回车换行符进行分割, 例如 FTP 协议;
(2) 消息定长, 例如每个报文的大小为固定长度 200 字节, 如果不够, 空位补空格;
(3) 将消息分为消息头和消息体, 消息头中包含表示消息总长度(或者消息体长度)的字段, 通常设计思路为消息头的第一个字段使用 int32 来表示消息的总长度,LengthFieldBasedFrameDecoder。
下面列举一个包尾增加分隔符的例子:
服务端程序:
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;import java.util.concurrent.atomic.AtomicInteger;/*** 入站处理器*/@ChannelHandler.Sharablepublic class DelimiterServerHandler extends ChannelInboundHandlerAdapter {private AtomicInteger counter = new AtomicInteger(0);private AtomicInteger completeCounter = new AtomicInteger(0);/*** 服务端读取到网络数据后的处理*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf)msg;String request = in.toString(CharsetUtil.UTF_8);System.out.println("Server Accept["+request+"] and the counter is:"+counter.incrementAndGet());String resp = "Hello,"+request+". Welcome to Netty World!"+ DelimiterEchoServer.DELIMITER_SYMBOL;ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));}/*** 服务端读取完成网络数据后的处理*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx)throws Exception {ctx.fireChannelReadComplete();System.out.println("the ReadComplete count is "+completeCounter.incrementAndGet());}/*** 发生异常后的处理*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}}
import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import java.net.InetSocketAddress;/*** 服务端*/public class DelimiterEchoServer {public static final String DELIMITER_SYMBOL = "@~";public static final int PORT = 9997;public static void main(String[] args) throws InterruptedException {DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();System.out.println("服务器即将启动");delimiterEchoServer.start();}public void start() throws InterruptedException {final DelimiterServerHandler serverHandler = new DelimiterServerHandler();EventLoopGroup group = new NioEventLoopGroup();/*线程组*/try {ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/b.group(group)/*将线程组传入*/.channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/.localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*//*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,所以下面这段代码的作用就是为这个子channel增加handle*/.childHandler(new ChannelInitializerImp());ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/System.out.println("服务器启动完成,等待客户端的连接和数据.....");f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/} finally {group.shutdownGracefully().sync();/*优雅关闭线程组*/}}private static class ChannelInitializerImp extends ChannelInitializer{ @Overrideprotected void initChannel(Channel ch) throws Exception {ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL.getBytes());ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,delimiter));ch.pipeline().addLast(new DelimiterServerHandler());}}}
客户端程序
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.CharsetUtil;/*** 入站处理器*/public class DelimiterClientHandler extends SimpleChannelInboundHandler{ private AtomicInteger counter = new AtomicInteger(0);/*** 客户端读取到网络数据后的处理*/protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)+"] and the counter is:"+counter.incrementAndGet());}/*** 客户端被通知channel活跃后,做事*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ByteBuf msg = null;String request = "Mark,Lison,Peter,James,Deer"+ DelimiterEchoServer.DELIMITER_SYMBOL;for(int i=0;i<10;i++){msg = Unpooled.buffer(request.length());msg.writeBytes(request.getBytes());ctx.writeAndFlush(msg);}}/*** 发生异常后的处理*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}}import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;/*** 客户端*/public class DelimiterEchoClient {private final String host;public DelimiterEchoClient(String host) {this.host = host;}public void start() throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();/*线程组*/try {final Bootstrap b = new Bootstrap();;/*客户端启动必须*/b.group(group)/*将线程组传入*/.channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/.remoteAddress(new InetSocketAddress(host,DelimiterEchoServer.PORT))/*配置要连接服务器的ip地址和端口*/.handler(new ChannelInitializerImp());ChannelFuture f = b.connect().sync();System.out.println("已连接到服务器.....");f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}}private static class ChannelInitializerImp extends ChannelInitializer{ @Overrideprotected void initChannel(Channel ch) throws Exception {ByteBuf delimiter= Unpooled.copiedBuffer(DelimiterEchoServer.DELIMITER_SYMBOL.getBytes());ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,delimiter));ch.pipeline().addLast(new DelimiterClientHandler());}}public static void main(String[] args) throws InterruptedException {new DelimiterEchoClient("127.0.0.1").start();}}
关键代码:
1.建立连接后,客户端给服务端发数据包,每次发送已特殊字符`@~结尾。
2.服务端收到数据包后经过DelimiterBasedFrameDecoder即分隔符基础框架解码器解码为一个个带有分隔符的数据包。
3.再到服务端的业务层处理器DelimiterServerHandler

