熟悉Netty之数据传输--TCP粘/拆包和加解码器(二)
敦煌天空的沙粒,带着我们的记忆
我从半路看回去,这秦关漫漫好蜿踞
梦想穿过了西域,包含了多少的禅意
爱情像一本游记,我会找寻它的密语
看月牙湾下的泪光,在丝路之上被遗忘
上次用一个小例子提到了Netty是如何处理粘包/拆包的,今天来分析下其中具体的细节,首先我们看到例子中用到的一个类:
io.netty.handler.codec.LineBasedFrameDecoder
下图是该类的继承关系图:
从上图可以看出,该类属于
io.netty.channel.ChannelHandler,
所以,我们可以通过io.netty.channel.ChannelPipeline
添加至Handler中,即:
那么
io.netty.handler.codec.LineBasedFrameDecoder
是如何解决粘包/拆包的呢?
直接进去看源代码:
这里是其主要的decode方法,这里不做逐行翻译了,只是描述其大致意思
首先通过获取读取缓冲区的数据,通过字节循环匹配,看是否有"\n",如果有,则以此位置结束,从可读取索引到此位置区间的字节组成一行,如果连续读取设定的长度,都没有发现"\n",则抛出异常。
循环获取:
抛出异常:
然后,通过
io.netty.handler.codec.string.StringDecoder
将接收到的对象,转化为字符串,调用后续的Handler,处理数据
io.netty.handler.codec.LineBasedFrameDecoder
和
io.netty.handler.codec.string.StringDecoder
结合,是一种典型的按换行切换的文本解码器,它被设计用来支持TCP的粘包和拆包
此时,有看官肯定有一种疑惑,那么如果我现在需求不要按照换行符号切换文本呢,比如我们常常用到的聊天软件,通常有些是可以指定,按回车键也可以不发送数据,继续输入文本,最后按个什么组合件发送数据,类似QQ或者微信,会提供按Enter键发送数据或者Ctrl+Enter发送数据,此时,需要怎么处理呢?
其实,无论是Enter或者Ctrl+Enter发送数据,程序都会指定一种形式发送数据,那么在Netty中,同样支持利用分隔符解码器来处理数据,这个分隔符则可以任意指定。
如上图所示,Netty提供了
io.netty.handler.codec.FixedLengthFrameDecoder
和
io.netty.handler.codec.DelimiterBasedFrameDecoder
自定义分隔符和定长解码器,来解决上面提到的问题
那么先来看看其实际应用:
用前面例子为例:
服务端:
package com.lgli.netty.tcppackage;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import java.net.InetSocketAddress;
/**
* Server
* @author lgli
*/
public class MyServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,10240)
.childHandler(new ServerInitial());
ChannelFuture future = bootstrap.bind(new InetSocketAddress("localhost", 8080)).sync();
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
static class ServerInitial extends ChannelInitializer<SocketChannel>{
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf byteBuf = Unpooled.copiedBuffer("$_end".getBytes());
ch.pipeline()
.addLast("frame",new DelimiterBasedFrameDecoder(1024,byteBuf))
.addLast("decode",new StringDecoder())
.addLast("handler",new ServerHandler());
}
}
static class ServerHandler extends SimpleChannelInboundHandler<Object>{
private int counter;
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
String receive = (String) msg;
System.out.println("服务端收到客户端数据:"+receive+";目前计数counter = "+ ++counter);
String result = "hello,how are you?".equalsIgnoreCase(receive)?"I am fine,thank you":"wrong msg";
result += "$_end";
ByteBuf resultBuf = Unpooled.copiedBuffer(result.getBytes());
ctx.writeAndFlush(resultBuf);
}
}
}
客户端:
package com.lgli.netty.tcppackage;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import java.net.InetSocketAddress;
/**
* Client
* @author lgli
*/
public class MyClient {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE,true)
.handler(new ClientInitial());
ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync();
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
static class ClientInitial extends ChannelInitializer<SocketChannel>{
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf byteBuf = Unpooled.copiedBuffer("$_end".getBytes());
ch.pipeline()
.addLast("frame",new DelimiterBasedFrameDecoder(1024,byteBuf))
.addLast("decode",new StringDecoder())
.addLast("handler",new ClientHandler());
}
}
static class ClientHandler extends SimpleChannelInboundHandler<Object>{
private int counter;
private byte[] req;
public ClientHandler(){
req = ("hello,how are you?$_end").getBytes();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buff = null;
for(int i = 0 ; i < 100 ;i++){
buff = Unpooled.buffer(req.length);
buff.writeBytes(req);
ctx.writeAndFlush(buff);
}
}
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
String msgs = (String) msg;
System.out.println("服务器回复:"+msgs+";此时计数器counter:"+ ++counter);
}
}
}
这里,主要用了
io.netty.handler.codec.DelimiterBasedFrameDecoder
代替前面的
io.netty.handler.codec.LineBasedFrameDecoder
上述代码,说明,对于数据的截取,是根据"$_end"作为结束分隔符,
即遇到"$_end",则截取数据,发送数据。
运行上述代码,其实际结果如下:
服务端(部分打印):
客户端(部分打印):
至于
io.netty.handler.codec.DelimiterBasedFrameDecoder
则是根据指定长度的截取数据
因为比较简单这里就不做详述了
有兴趣的可以自己做个实验
后续分享:Netty底层实现原理
有喜欢的烦请点个关注,谢谢!