熟悉Netty之数据传输--TCP粘/拆包和加解码器(二)

lgli

共 6275字,需浏览 13分钟

 · 2020-10-27


敦煌天空的沙粒,带着我们的记忆

我从半路看回去,这秦关漫漫好蜿踞

梦想穿过了西域,包含了多少的禅意

爱情像一本游记,我会找寻它的密语

看月牙湾下的泪光,在丝路之上被遗忘


上次用一个小例子提到了Netty是如何处理粘包/拆包的,今天来分析下其中具体的细节,首先我们看到例子中用到的一个类:


io.netty.handler.codec.LineBasedFrameDecoder

下图是该类的继承关系图:


fe9f6f0baf70575c6d4297433a05b4a8.webp

从上图可以看出,该类属于

io.netty.channel.ChannelHandler,

所以,我们可以通过io.netty.channel.ChannelPipeline

添加至Handler中,即:

f669abe98386beb13c176bc984367c07.webp

那么

io.netty.handler.codec.LineBasedFrameDecoder

是如何解决粘包/拆包的呢?


直接进去看源代码:

2ced8352c167201f75343294a6cd54e1.webp

2b029543b16796d9c1f7b42c383c24ce.webp


这里是其主要的decode方法,这里不做逐行翻译了,只是描述其大致意思

首先通过获取读取缓冲区的数据,通过字节循环匹配,看是否有"\n",如果有,则以此位置结束,从可读取索引到此位置区间的字节组成一行,如果连续读取设定的长度,都没有发现"\n",则抛出异常。


循环获取:

1b533793e200357c7e02209d4fadafb3.webp抛出异常:

ee6693229b8d9b518c261abcab32b0a6.webp


然后,通过

io.netty.handler.codec.string.StringDecoder

将接收到的对象,转化为字符串,调用后续的Handler,处理数据


e37e275d8bc14ccb331a54e71a99781e.webp


io.netty.handler.codec.LineBasedFrameDecoder

io.netty.handler.codec.string.StringDecoder

结合,是一种典型的按换行切换的文本解码器,它被设计用来支持TCP的粘包和拆包


此时,有看官肯定有一种疑惑,那么如果我现在需求不要按照换行符号切换文本呢,比如我们常常用到的聊天软件,通常有些是可以指定,按回车键也可以不发送数据,继续输入文本,最后按个什么组合件发送数据,类似QQ或者微信,会提供按Enter键发送数据或者Ctrl+Enter发送数据,此时,需要怎么处理呢?


其实,无论是Enter或者Ctrl+Enter发送数据,程序都会指定一种形式发送数据,那么在Netty中,同样支持利用分隔符解码器来处理数据,这个分隔符则可以任意指定。


a40dff5bc11c1cf34cd69d7257d1f5ed.webp

如上图所示,Netty提供了

io.netty.handler.codec.FixedLengthFrameDecoder

io.netty.handler.codec.DelimiterBasedFrameDecoder

自定义分隔符和定长解码器,来解决上面提到的问题



那么先来看看其实际应用:


用前面例子为例:


服务端:

package com.lgli.netty.tcppackage;
import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import io.netty.handler.codec.LineBasedFrameDecoder;import io.netty.handler.codec.string.StringDecoder;
import java.net.InetSocketAddress;
/** * Server * @author lgli */public class MyServer {
public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try{ ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,10240) .childHandler(new ServerInitial()); ChannelFuture future = bootstrap.bind(new InetSocketAddress("localhost", 8080)).sync(); future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } }
static class ServerInitial extends ChannelInitializer<SocketChannel>{
@Override protected void initChannel(SocketChannel ch) throws Exception { ByteBuf byteBuf = Unpooled.copiedBuffer("$_end".getBytes()); ch.pipeline() .addLast("frame",new DelimiterBasedFrameDecoder(1024,byteBuf)) .addLast("decode",new StringDecoder()) .addLast("handler",new ServerHandler()); } }
static class ServerHandler extends SimpleChannelInboundHandler<Object>{ private int counter;
@Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { String receive = (String) msg; System.out.println("服务端收到客户端数据:"+receive+";目前计数counter = "+ ++counter); String result = "hello,how are you?".equalsIgnoreCase(receive)?"I am fine,thank you":"wrong msg"; result += "$_end"; ByteBuf resultBuf = Unpooled.copiedBuffer(result.getBytes()); ctx.writeAndFlush(resultBuf); }    }}

客户端:


package com.lgli.netty.tcppackage;
import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import io.netty.handler.codec.LineBasedFrameDecoder;import io.netty.handler.codec.string.StringDecoder;
import java.net.InetSocketAddress;
/** * Client * @author lgli */public class MyClient { public static void main(String[] args) { EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE,true) .handler(new ClientInitial()); ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync(); future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { group.shutdownGracefully(); } }
static class ClientInitial extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf byteBuf = Unpooled.copiedBuffer("$_end".getBytes()); ch.pipeline() .addLast("frame",new DelimiterBasedFrameDecoder(1024,byteBuf)) .addLast("decode",new StringDecoder()) .addLast("handler",new ClientHandler()); } } static class ClientHandler extends SimpleChannelInboundHandler<Object>{ private int counter; private byte[] req;
public ClientHandler(){ req = ("hello,how are you?$_end").getBytes();        } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buff = null; for(int i = 0 ; i < 100 ;i++){ buff = Unpooled.buffer(req.length); buff.writeBytes(req); ctx.writeAndFlush(buff); } }
@Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { String msgs = (String) msg; System.out.println("服务器回复:"+msgs+";此时计数器counter:"+ ++counter); }    }}

这里,主要用了

io.netty.handler.codec.DelimiterBasedFrameDecoder

代替前面的

io.netty.handler.codec.LineBasedFrameDecoder

62706a4cb7cfed7e7d1a3cc674a2e6f0.webp

上述代码,说明,对于数据的截取,是根据"$_end"作为结束分隔符,

即遇到"$_end",则截取数据,发送数据。


运行上述代码,其实际结果如下:

服务端(部分打印):

c6581571e36cef6d6df04d9370faffa9.webp


客户端(部分打印):


61346836fa385d9c87fc61f2ca7ce4cd.webp

至于

io.netty.handler.codec.DelimiterBasedFrameDecoder

则是根据指定长度的截取数据

因为比较简单这里就不做详述了

有兴趣的可以自己做个实验


后续分享:Netty底层实现原理


有喜欢的烦请点个关注,谢谢!


浏览 55
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报