Netty 的编码 解码 案例

JAVA乐园

共 29633字,需浏览 60分钟

 · 2021-07-15

0x01:半包粘包

例如发送两个数据包给服务器,由于服务端一次读取到的字节数不一定的分

没有半包和拆包:服务器分两次读取到两个地理的数据包,这个情况没有拆包和粘包的情况

  • 粘包:服务器一次收到两个数据包,在一起收到的

  • 拆包:第一次读取到完成的第一个包和第二个包的一部分内容,第二次读取到第二个包的剩余内容

  • 整包:第一次读取到第一包的部分内容,第二次读取到第一个包的剩余部分和第二个包的全部

  • 多次拆包:如果接收滑窗非常小,数据量大的时候发生多次发送的接收的情况

为什么会出现半包和粘包

1、HTTP 中有一个 Nagle 算法,每个报文都是一段的,使用网络发送发现网络效率低,然后 HTTP 设置一个算法,设置到一定程度发,所以出现一些延时,提高销量,所以形成了粘包

2、HTTP缓冲区引起的,报文段大的时候的时候直接弄在一起发送过去。

怎么解决

不断的从 TCP 的缓冲区中读取数据,每次读取完成都需要判断是否是一个完整的数据包

如果是读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 TCP 缓冲区中读取,直到得到一个完整的数据包

定长
分隔符
基于长度的变长包

如果当前督导的数据加上已经读取到的数据足以拼接成一个数据包,那就讲已经读取的数据拼接本次读取的数据,构成一个完整的业务数据包传递到业务逻辑上,多余的数据保留,方便下次的读取或者数据链接。


0x02:Netty常用的编码器

  • LineBasedFrameDecoder

回车换行编码器
配合StringDecoder
  • DelimiterBasedFrameDecoder

分隔符解码器
  • FixedLengthFrameDecoder

固定长度解码器
  • LengthFieldBasedFrameDecoder

不能超过1024个字节不然会报错
基于'长度'解码器(私有协议最常用)


0x03:拆包的类

  • ByteToMessageDecoder

自解析
  • LengthFieldPrepender

长度编码器
  • Netty拆包的基类 - ByteToMessageDecoder

内部维护了一个数据累积器cumulation,每次读取到数据都会不断累加,然后尝试对累加到
的数据进行拆包,拆成一个完整的业务数据包
每次都将读取到的数据通过内存拷贝的方式, 累积到cumulation中

调用子类的 decode 方法对累积的数据尝试进行拆包

  • LengthFieldBasedFrameDecoder

参数说明

maxFrameLength:包的最大长度
lengthFieldOffset:长度属性的起始位(偏移位),包中存放长度属性字段的起始位置
lengthFieldLength:长度属性的长度 
lengthAdjustment:长度调节值,在总长被定义为包含包头长度时,修正信息长度
initialBytesToStrip:跳过的字节数,根据需要跳过lengthFieldLength个字节,以便接收端直接接受到不含“长度属性”的内容
  • LengthFieldPrepender 编码器

参数说明

lengthFieldLength:长度属性的字节长度
lengthIncludesLengthFieldLength:false,长度字节不算在总长度中,true,算到总长度中

编解码器的作用就是讲原始字节数据与自定义的消息对象进行互转

Decoder(解码器)
Encoder(编码器)

支持业界主流的序列化框架

Protobuf
Jboss Marshalling
Java Serialization

解码1拆包:把整个 ByteBuf 数据,分成一个个 ByteBuf,每个表示一个包

解码2反序列化:把每个包的 ByteBuf 字节数组转成 java object

package com.demo;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

public class StickyDemoClient {

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
             }
        }
        new StickyDemoClient().connect(port, "127.0.0.1");
    }

    public void connect(int port, String host) throws Exception {
        // 工作线程组 
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            //ch.pipeline().addLast("framer", new FixedLengthFrameDecoder(139));
                           // ch.pipeline().addLast("framer", new StickyDemoDecodeHandlerV2(
                        //       Unpooled.wrappedBuffer(new byte[] { '#' })));
                            ch.pipeline().addLast("framer"new DelimiterBasedFrameDecoder(8192,
                                    Unpooled.wrappedBuffer(new byte[] { '#' })));

                            ch.pipeline().addLast(new StickyDemoClientHandler());

                        }
                    });

            // 发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();

            // 等待客户端链路关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            group.shutdownGracefully();
        }
    }
}


package com.demo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class StickyDemoClientHandler extends SimpleChannelInboundHandler<ByteBuf{

    private static String[] alphabets = {"A""B""C""D""E""F""G""H""I",
                                  "J""K""L""M""N""O""P"};

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        for(int i=0; i<10; i++) {
            StringBuilder builder = new StringBuilder();
            builder.append("这是第");
            builder.append(i);
            builder.append("条消息, 内容是:");
            for(int j=0; j<100; j++) {
                builder.append(alphabets[i]);
            }
            builder.append("......");
            builder.append("#");


            System.out.println(builder.toString().getBytes().length);

            ctx.writeAndFlush(Unpooled.copiedBuffer(builder.toString(),
                    CharsetUtil.UTF_8));
        }
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
        System.out.println("客户端接收到消息:" + in.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}


package com.demo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.ArrayList;
import java.util.List;

public class StickyDemoDecodeHandler extends ChannelInboundHandlerAdapter {

    //存放待拆包数据的缓冲区
    private ByteBuf cache;
    private int frameLength;

    public StickyDemoDecodeHandler(int length) {
        this.frameLength = length;
    }

    static ByteBuf expandCache(ByteBufAllocator alloc, ByteBuf cache, int readable) {
        ByteBuf oldCache = cache;
        cache = alloc.buffer(oldCache.readableBytes() + readable);
        cache.writeBytes(oldCache);
        oldCache.release();
        return cache;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        ByteBuf data = (ByteBuf) msg;
        try {
            //读取每一个消息,创建缓冲区
            if (cache == null) {
                cache = ctx.alloc().buffer(1024);
            } else {
                //如果现有的缓冲区容量太小,无法容纳原有数据+新读入的数据,就扩容(重新创建一个大的,并把数据拷贝过去)
                if (cache.writerIndex() > cache.maxCapacity() - data.readableBytes()) {
                    cache = expandCache(ctx.alloc(), cache, data.readableBytes());
                }
            }
            //把新的数据读入缓冲区
            cache.writeBytes(data);

            //每次读取frameLength(定长)的数据,做为一个包,存储起来 
            List<ByteBuf> output = new ArrayList<>();
            while (cache.readableBytes() >= frameLength) {
                output.add(cache.readBytes(frameLength));
            }

            //还有部分数据不够一个包,10, 15, 一个10个,还剩5个
            if (cache.isReadable()) {
                cache.discardReadBytes();
            }

            for (int i = 0; i < output.size(); i++) {
                ctx.fireChannelRead(output.get(i));
            }
        } finally {
            data.release();
        }

    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}


package com.demo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.ArrayList;
import java.util.List;

public class StickyDemoDecodeHandlerV2 extends ChannelInboundHandlerAdapter {
    private ByteBuf cache;
    private byte delimiter; //包分隔符

    public StickyDemoDecodeHandlerV2(ByteBuf delimiter) {
        if (delimiter == null) {
            throw new NullPointerException("delimiter");
        }
        if (!delimiter.isReadable()) {
            throw new IllegalArgumentException("empty delimiter");
        }

        this.delimiter =  delimiter.readByte();
        ;
    }

    static ByteBuf expandCache(ByteBufAllocator alloc, ByteBuf cache, int readable) {
        ByteBuf oldCache = cache;
        cache = alloc.buffer(oldCache.readableBytes() + readable);
        cache.writeBytes(oldCache);
        oldCache.release();
        return cache;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        ByteBuf data = (ByteBuf) msg;
        try {
            if (cache == null) {
                cache = ctx.alloc().buffer(1024);
            } else {
                if (cache.writerIndex() > cache.maxCapacity() - data.readableBytes()) {
                    cache = expandCache(ctx.alloc(), cache, data.readableBytes());
                }
            }
            cache.writeBytes(data);

            List<ByteBuf> output = new ArrayList<>();

            int frameIndex = 0;
            int frameEndIndex = 0;
            int length = cache.readableBytes();
            while (frameIndex <= length) {
                frameEndIndex = cache.indexOf(frameIndex + 1, length, delimiter);

                if (frameEndIndex == -1) {
                    cache.discardReadBytes();
                    break;
                }

                output.add(cache.readBytes(frameEndIndex - frameIndex));
                cache.skipBytes(1);
                frameIndex = frameEndIndex + 1;

            }

            if (cache.isReadable()) {
                cache.discardReadBytes();
            }

            for (int i = 0; i < output.size(); i++) {
                ctx.fireChannelRead(output.get(i));
            }
        } finally {
            data.release();
        }

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}


package com.demo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

public class StickyDemoServer {

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        new StickyDemoServer().bind(port);
    }

    public void bind(int port) throws Exception {
        // 第一步:
        // 配置服务端的NIO线程组
        // 主线程组, 用于接受客户端的连接,但是不做任何具体业务处理,像老板一样,负责接待客户,不具体服务客户
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 工作线程组, 老板线程组会把任务丢给他,让手下线程组去做任务,服务客户
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 类ServerBootstrap用于配置Server相关参数,并启动Server
            ServerBootstrap b = new ServerBootstrap();

            // 链式调用
            // 配置parentGroup和childGroup
            b.group(bossGroup, workerGroup)
                    // 配置Server通道
                    .channel(NioServerSocketChannel.class)
                    // 配置通道的ChannelPipeline
                    .childHandler(new ChildChannelHandler());

            // 绑定端口,并启动server,同时设置启动方式为同步
            ChannelFuture f = b.bind(port).sync();

            System.out.println(StickyDemoServer.class.getName() + " 启动成功,在地址[" + f.channel().localAddress() + "]上等待客户请求......");

            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel{
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            //ch.pipeline().addLast("framer", new FixedLengthFrameDecoder(139));
            ch.pipeline().addLast("framer"new DelimiterBasedFrameDecoder(8192, Unpooled.wrappedBuffer(new byte[] { '#' })));
             //ch.pipeline().addLast("framer", new StickyDemoDecodeHandler(139));
            // ch.pipeline().addLast("framer", new StickyDemoDecodeHandlerV2(
            //       Unpooled.wrappedBuffer(new byte[] { '#' })));

            ch.pipeline().addLast(new StickyDemoServerHandler());
        }
    }
}


package com.demo;

 import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

 public class StickyDemoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println(
                "服务器接收到消息:" + in.toString(CharsetUtil.UTF_8));
        ctx.write(in);
       // ctx.write(Unpooled.copiedBuffer("#", CharsetUtil.UTF_8));
        //compositeBuffer.addComponent(in);
       // ByteBuf buf =  ctx.alloc().directBuffer();
       // buf.writeBytes("#".getBytes());
       // CompositeByteBuf compositeBuffer = ctx.alloc().compositeBuffer();
      //  compositeBuffer.addComponents(true, in, buf);


       // ctx.write(compositeBuffer);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception 
{
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
        Throwable cause)
 
{
        cause.printStackTrace();
        ctx.close();
    }
}


source:https://www.yuque.com/yangxinlei/lodfss/nguvm0

喜欢,在看

浏览 34
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报