Netty 解决粘包半包问题

共 6200字,需浏览 13分钟

 ·

2020-08-19 11:15


来源:SegmentFault 思否社区

作者:y猪




一.什么是TCP粘包半包


客户端发送数据包给服务端,因服务端一次读取到的字节数是不确定的,有好几种情况.


  1. 服务端分两次读取到了两个独立的数据包,没有粘包和拆包;
  2. 服务端一次接收到了两个数据包,粘合在一起,被称为 TCP 粘包;
  3. 服务端分两次读取到了两个数据包, 第一次读取到了完整的包和另外一个包的部分内容,第二次读取到了另一个包的剩余内容, 这被称为 TCP 拆包;
  4. 服务端分两次读取到了两个数据包, 第一次读取到了包的部分内容 , 第二次读取到了之前未读完的包剩余内容和另一个包,发生了拆包和粘包。
  5. 服务端 TCP 接收滑动窗口很小,数据包比较大, 即服务端分多次才能将 包接收完全,发生多次拆包




二.粘包半包的原因


1.粘包


TCP协议:本身是 面向连接的可靠地协议-三次握手机制。


客户端与服务器会维持一个连接(Channel) ,在连接不断开的情况下, 可以将多个数据包发往服务器,但是发送的网络数据包太小, 那么本身会启用 Nagle 算法(可配置是否启用) 对较小的数据包进行合并(因此,TCP 的网络延迟要 UDP 的高些)然后再发送(超时或者包大小足够)。


服务器在接收到消息(数据流)的时候就无法区分哪些数据包是客户端自己分开发送的,这样产生了粘包;


服务器在接收到数据后,放到缓冲区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个


数据包的情况,造成粘包现象。


UDP:本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包) , 他不会对数据包进行合并发送,直接是一端发送什么数据, 直接就发出去了, 既然他不会对数据合并, 每一个数据包都是完整的(数据+UDP 头+IP 头等等发一 次数据封装一次) 也就没有粘包了。


2.半包


分包产生的原因:可能是IP分片传输导致的, 也可能是传输过程中丢失部 分包导致出现的半包, 还有可能就是一个包可能被分成了两次传输, 在取数据的时候,先取到了一部分(还可能与接收的缓冲区大小有关系) , 总之就是一个数据包被分成了多次接收。


更具体的原因有三个, 分别如下。


  1. 应用程序写入数据的字节大小大于套接字发送缓冲区的大小
  2. 进行 MSS 大小的 TCP 分段。MSS 是最大报文段长度的缩写。MSS 是 TCP 报文段中的数据字段的最大长度。数据字段加上 TCP 首部才等于整个的 TCP 报文段。所以 MSS 并不是


TCP 报文段的最大长度, 而是:MSS=TCP 报文段长度-TCP 首部长度


  1. 以太网的 payload 大于 MTU 进行 IP 分片。MTU 指:一种通信协议的某一层上面所能


通过的最大数据包大小。如果 IP 层有一个数据包要传, 而且数据的长度比链路层的 MTU 大,那么 IP 层就会进行分片, 把数据包分成托干片, 让每一片都不超过 MTU。注意, IP 分片可


以发生在原始发送端主机上, 也可以发生在中间路由器上。


3.解决粘包半包问题


由于底层的 TCP 无法理解上层的业务数据, 所以在底层是无法保证数据包不被拆分和重组的, 这个问题只能通过上层的应用协议栈设计来解决。业界的主流协议的解决方案,可以归纳如下。


(1) 在包尾增加分割符, 比如回车换行符进行分割, 例如 FTP 协议;
(2) 消息定长, 例如每个报文的大小为固定长度 200 字节, 如果不够, 空位补空格;
(3) 将消息分为消息头和消息体, 消息头中包含表示消息总长度(或者消息体长度)的字段, 通常设计思路为消息头的第一个字段使用 int32 来表示消息的总长度,LengthFieldBasedFrameDecoder。


下面列举一个包尾增加分隔符的例子:

服务端程序:


import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;
import java.util.concurrent.atomic.AtomicInteger;
/** * 入站处理器 */@ChannelHandler.Sharablepublic class DelimiterServerHandler extends ChannelInboundHandlerAdapter {
private AtomicInteger counter = new AtomicInteger(0); private AtomicInteger completeCounter = new AtomicInteger(0);
/*** 服务端读取到网络数据后的处理*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf)msg; String request = in.toString(CharsetUtil.UTF_8); System.out.println("Server Accept["+request +"] and the counter is:"+counter.incrementAndGet()); String resp = "Hello,"+request+". Welcome to Netty World!" + DelimiterEchoServer.DELIMITER_SYMBOL; ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); }
/*** 服务端读取完成网络数据后的处理*/ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); System.out.println("the ReadComplete count is " +completeCounter.incrementAndGet()); }
/*** 发生异常后的处理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}


import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import java.net.InetSocketAddress;
/** * 服务端 */public class DelimiterEchoServer {
public static final String DELIMITER_SYMBOL = "@~"; public static final int PORT = 9997;
public static void main(String[] args) throws InterruptedException { DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer(); System.out.println("服务器即将启动"); delimiterEchoServer.start(); }
public void start() throws InterruptedException { final DelimiterServerHandler serverHandler = new DelimiterServerHandler(); EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/ b.group(group)/*将线程组传入*/ .channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/ .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/ /*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel, 所以下面这段代码的作用就是为这个子channel增加handle*/ .childHandler(new ChannelInitializerImp()); ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/ System.out.println("服务器启动完成,等待客户端的连接和数据....."); f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/ } finally { group.shutdownGracefully().sync();/*优雅关闭线程组*/ } }
private static class ChannelInitializerImp extends ChannelInitializer {
@Override protected void initChannel(Channel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL .getBytes()); ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new DelimiterServerHandler()); } }
}


客户端程序


import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.CharsetUtil;

/** * 入站处理器 */public class DelimiterClientHandler extends SimpleChannelInboundHandler {
private AtomicInteger counter = new AtomicInteger(0);
/*** 客户端读取到网络数据后的处理*/ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8) +"] and the counter is:"+counter.incrementAndGet()); }
/*** 客户端被通知channel活跃后,做事*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg = null; String request = "Mark,Lison,Peter,James,Deer" + DelimiterEchoServer.DELIMITER_SYMBOL; for(int i=0;i<10;i++){ msg = Unpooled.buffer(request.length()); msg.writeBytes(request.getBytes()); ctx.writeAndFlush(msg); } }
/*** 发生异常后的处理*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}


import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;
/** * 客户端 */public class DelimiterEchoClient {
private final String host;
public DelimiterEchoClient(String host) { this.host = host; }
public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { final Bootstrap b = new Bootstrap();;/*客户端启动必须*/ b.group(group)/*将线程组传入*/ .channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/ .remoteAddress(new InetSocketAddress(host,DelimiterEchoServer.PORT))/*配置要连接服务器的ip地址和端口*/ .handler(new ChannelInitializerImp()); ChannelFuture f = b.connect().sync(); System.out.println("已连接到服务器....."); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } }
private static class ChannelInitializerImp extends ChannelInitializer {
@Override protected void initChannel(Channel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer(DelimiterEchoServer.DELIMITER_SYMBOL .getBytes()); ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new DelimiterClientHandler()); } }
public static void main(String[] args) throws InterruptedException { new DelimiterEchoClient("127.0.0.1").start(); }}


关键代码:


1.建立连接后,客户端给服务端发数据包,每次发送已特殊字符`@~结尾。


2.服务端收到数据包后经过DelimiterBasedFrameDecoder即分隔符基础框架解码器解码为一个个带有分隔符的数据包。


3.再到服务端的业务层处理器DelimiterServerHandler





点击左下角阅读原文,到 SegmentFault 思否社区 和文章作者展开更多互动和交流。

- END -

浏览 17
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报