Netty 解决粘包半包问题
来源:SegmentFault 思否社区
作者:y猪
一.什么是TCP粘包半包
客户端发送数据包给服务端,因服务端一次读取到的字节数是不确定的,有好几种情况.
服务端分两次读取到了两个独立的数据包,没有粘包和拆包; 服务端一次接收到了两个数据包,粘合在一起,被称为 TCP 粘包; 服务端分两次读取到了两个数据包, 第一次读取到了完整的包和另外一个包的部分内容,第二次读取到了另一个包的剩余内容, 这被称为 TCP 拆包; 服务端分两次读取到了两个数据包, 第一次读取到了包的部分内容 , 第二次读取到了之前未读完的包剩余内容和另一个包,发生了拆包和粘包。 服务端 TCP 接收滑动窗口很小,数据包比较大, 即服务端分多次才能将 包接收完全,发生多次拆包
二.粘包半包的原因
1.粘包
TCP协议:本身是 面向连接的可靠地协议-三次握手机制。
客户端与服务器会维持一个连接(Channel) ,在连接不断开的情况下, 可以将多个数据包发往服务器,但是发送的网络数据包太小, 那么本身会启用 Nagle 算法(可配置是否启用) 对较小的数据包进行合并(因此,TCP 的网络延迟要 UDP 的高些)然后再发送(超时或者包大小足够)。
服务器在接收到消息(数据流)的时候就无法区分哪些数据包是客户端自己分开发送的,这样产生了粘包;
服务器在接收到数据后,放到缓冲区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个
数据包的情况,造成粘包现象。
UDP:本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包) , 他不会对数据包进行合并发送,直接是一端发送什么数据, 直接就发出去了, 既然他不会对数据合并, 每一个数据包都是完整的(数据+UDP 头+IP 头等等发一 次数据封装一次) 也就没有粘包了。
2.半包
分包产生的原因:可能是IP分片传输导致的, 也可能是传输过程中丢失部 分包导致出现的半包, 还有可能就是一个包可能被分成了两次传输, 在取数据的时候,先取到了一部分(还可能与接收的缓冲区大小有关系) , 总之就是一个数据包被分成了多次接收。
更具体的原因有三个, 分别如下。
应用程序写入数据的字节大小大于套接字发送缓冲区的大小 进行 MSS 大小的 TCP 分段。MSS 是最大报文段长度的缩写。MSS 是 TCP 报文段中的数据字段的最大长度。数据字段加上 TCP 首部才等于整个的 TCP 报文段。所以 MSS 并不是
TCP 报文段的最大长度, 而是:MSS=TCP 报文段长度-TCP 首部长度
以太网的 payload 大于 MTU 进行 IP 分片。MTU 指:一种通信协议的某一层上面所能
通过的最大数据包大小。如果 IP 层有一个数据包要传, 而且数据的长度比链路层的 MTU 大,那么 IP 层就会进行分片, 把数据包分成托干片, 让每一片都不超过 MTU。注意, IP 分片可
以发生在原始发送端主机上, 也可以发生在中间路由器上。
3.解决粘包半包问题
由于底层的 TCP 无法理解上层的业务数据, 所以在底层是无法保证数据包不被拆分和重组的, 这个问题只能通过上层的应用协议栈设计来解决。业界的主流协议的解决方案,可以归纳如下。
(1) 在包尾增加分割符, 比如回车换行符进行分割, 例如 FTP 协议;
(2) 消息定长, 例如每个报文的大小为固定长度 200 字节, 如果不够, 空位补空格;
(3) 将消息分为消息头和消息体, 消息头中包含表示消息总长度(或者消息体长度)的字段, 通常设计思路为消息头的第一个字段使用 int32 来表示消息的总长度,LengthFieldBasedFrameDecoder。
下面列举一个包尾增加分隔符的例子:
服务端程序:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 入站处理器
*/
@ChannelHandler.Sharable
public class DelimiterServerHandler extends ChannelInboundHandlerAdapter {
private AtomicInteger counter = new AtomicInteger(0);
private AtomicInteger completeCounter = new AtomicInteger(0);
/*** 服务端读取到网络数据后的处理*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
String request = in.toString(CharsetUtil.UTF_8);
System.out.println("Server Accept["+request
+"] and the counter is:"+counter.incrementAndGet());
String resp = "Hello,"+request+". Welcome to Netty World!"
+ DelimiterEchoServer.DELIMITER_SYMBOL;
ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
}
/*** 服务端读取完成网络数据后的处理*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
ctx.fireChannelReadComplete();
System.out.println("the ReadComplete count is "
+completeCounter.incrementAndGet());
}
/*** 发生异常后的处理*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import java.net.InetSocketAddress;
/**
* 服务端
*/
public class DelimiterEchoServer {
public static final String DELIMITER_SYMBOL = "@~";
public static final int PORT = 9997;
public static void main(String[] args) throws InterruptedException {
DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();
System.out.println("服务器即将启动");
delimiterEchoServer.start();
}
public void start() throws InterruptedException {
final DelimiterServerHandler serverHandler = new DelimiterServerHandler();
EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
try {
ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/
b.group(group)/*将线程组传入*/
.channel(NioServerSocketChannel.class)/*指定使用NIO进行网络传输*/
.localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/
/*服务端每接收到一个连接请求,就会新启一个socket通信,也就是channel,
所以下面这段代码的作用就是为这个子channel增加handle*/
.childHandler(new ChannelInitializerImp());
ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到完成*/
System.out.println("服务器启动完成,等待客户端的连接和数据.....");
f.channel().closeFuture().sync();/*阻塞直到服务器的channel关闭*/
} finally {
group.shutdownGracefully().sync();/*优雅关闭线程组*/
}
}
private static class ChannelInitializerImp extends ChannelInitializer
{
@Override
protected void initChannel(Channel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL
.getBytes());
ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
delimiter));
ch.pipeline().addLast(new DelimiterServerHandler());
}
}
}
客户端程序
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* 入站处理器
*/
public class DelimiterClientHandler extends SimpleChannelInboundHandler
{
private AtomicInteger counter = new AtomicInteger(0);
/*** 客户端读取到网络数据后的处理*/
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
+"] and the counter is:"+counter.incrementAndGet());
}
/*** 客户端被通知channel活跃后,做事*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf msg = null;
String request = "Mark,Lison,Peter,James,Deer"
+ DelimiterEchoServer.DELIMITER_SYMBOL;
for(int i=0;i<10;i++){
msg = Unpooled.buffer(request.length());
msg.writeBytes(request.getBytes());
ctx.writeAndFlush(msg);
}
}
/*** 发生异常后的处理*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
/**
* 客户端
*/
public class DelimiterEchoClient {
private final String host;
public DelimiterEchoClient(String host) {
this.host = host;
}
public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();/*线程组*/
try {
final Bootstrap b = new Bootstrap();;/*客户端启动必须*/
b.group(group)/*将线程组传入*/
.channel(NioSocketChannel.class)/*指定使用NIO进行网络传输*/
.remoteAddress(new InetSocketAddress(host,DelimiterEchoServer.PORT))/*配置要连接服务器的ip地址和端口*/
.handler(new ChannelInitializerImp());
ChannelFuture f = b.connect().sync();
System.out.println("已连接到服务器.....");
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
private static class ChannelInitializerImp extends ChannelInitializer
{
@Override
protected void initChannel(Channel ch) throws Exception {
ByteBuf delimiter
= Unpooled.copiedBuffer(DelimiterEchoServer.DELIMITER_SYMBOL
.getBytes());
ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
delimiter));
ch.pipeline().addLast(new DelimiterClientHandler());
}
}
public static void main(String[] args) throws InterruptedException {
new DelimiterEchoClient("127.0.0.1").start();
}
}
关键代码:
1.建立连接后,客户端给服务端发数据包,每次发送已特殊字符`@~结尾。
2.服务端收到数据包后经过DelimiterBasedFrameDecoder即分隔符基础框架解码器解码为一个个带有分隔符的数据包。
3.再到服务端的业务层处理器DelimiterServerHandler