Netty | 工作流程图分析 & 核心组件说明 & 代码案例实践
一、Netty 工作流程
我们先来看看Netty的工作原理图,简单说一下工作流程,然后通过这张图来一一分析Netty的核心组件。
1.1、Server工作流程图:
1.2、Server工作流程分析:
server端启动时绑定本地某个端口,初始化
NioServerSocketChannel
.将自己
NioServerSocketChannel
注册到某个BossNioEventLoopGroup
的selector上。server端包含1个
Boss NioEventLoopGroup
和1个Worker NioEventLoopGroup
,Boss NioEventLoopGroup
专门负责接收客户端的连接,Worker NioEventLoopGroup
专门负责网络的读写NioEventLoopGroup相当于1个事件循环组,这个组里包含多个事件循环NioEventLoop,每个NioEventLoop包含1个selector和1个事件循环线程。
BossNioEventLoopGroup
循环执行的任务:1、轮询accept事件;
2、处理accept事件,将生成的NioSocketChannel注册到某一个
WorkNioEventLoopGroup
的Selector上。3、处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用
eventloop.execute或schedule
执行的任务,或者其它线程提交到该eventloop
的任务。WorkNioEventLoopGroup
循环执行的任务:轮询
read和Write
事件处理IO事件,在NioSocketChannel可读、可写事件发生时,回调(触发)ChannelHandler进行处理。
处理任务队列的任务,即
runAllTasks
1.3、Client工作流程图
流程就不重复概述啦😁
二、核心模块组件
Netty的核心组件大致是以下几个:
Channel 接口
EventLoopGroup 接口
ChannelFuture 接口
ChannelHandler 接口
ChannelPipeline 接口
ChannelHandlerContext 接口
SimpleChannelInboundHandler 抽象类
Bootstrap、ServerBootstrap 类
ChannelFuture 接口
ChannelOption 类
2.1、Channel 接口
我们平常用到基本的 I/O 操作(bind()、connect()、read()和 write()),其本质都依赖于底层网络传输所提供的原语,在Java中就是Socket
类。
Netty 的 Channel 接 口所提供的 API,大大地降低了直接使用Socket
类的复杂性。另外Channel
提供异步的网络 I/O
操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O
调用都将立即返回,并且不保证在调用结束时所请求的 I/O
操作已完成。
在调用结束后立即返回一个 ChannelFuture
实例,通过注册监听器到 ChannelFuture
上,支持 在I/O
操作成功、失败或取消时立马回调通知调用方。
此外,Channel 也是拥有许多预定义的、专门化实现的广泛类层次结构的根,比如:
LocalServerChannel
:用于本地传输的ServerChannel ,允许 VM 通信。EmbeddedChannel
:以嵌入式方式使用的 Channel 实现的基类。NioSocketChannel
:异步的客户端 TCP 、Socket 连接。NioServerSocketChannel
:异步的服务器端 TCP、Socket 连接。NioDatagramChannel
:异步的 UDP 连接。NioSctpChannel
:异步的客户端 Sctp 连接,它使用非阻塞模式并允许将 SctpMessage 读/写到底层 SctpChannel。NioSctpServerChannel
:异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。
2.2、EventLoopGroup接口
EventLoop
定义了Netty的核心抽象,用于处理连接的生命周期中所发生的事件。
Netty 通过触发事件将 Selector 从应用程序中抽象出来,消除了所有本来将需要手动编写 的派发代码。在内部,将会为每个 Channel 分配一个 EventLoop,用以处理所有事件,包括:
注册事件;
将事件派发给 ChannelHandler;
安排进一步的动作。
不过在这里我们不深究它,针对 Channel、EventLoop、Thread 以及 EventLoopGroup 之间的关系做一个简单说明。
一个
EventLoopGroup
包含一个或者多个EventLoop
;每个
EventLoop
维护着一个Selector
实例,所以一个 EventLoop 在它的生命周期内只和一个Thread
绑定;因此所有由
EventLoop
处理的 I/O 事件都将在它专有的Thread
上被处理,实际上消除了对于同步的需要;一个
Channel
在它的生命周期内只注册于一个EventLoop
;一个
EventLoop
可能会被分配给一个或多个Channel
。通常一个服务端口即一个
ServerSocketChannel
对应一个Selector
和一个EventLoop
线程。BossEventLoop
负责接收客户端的连接并将SocketChannel
交给WorkerEventLoopGroup
来进行 IO 处理,就如上文中的流程图一样。
2.3、ChannelFuture 接口
Netty 中所有的 I/O 操作都是异步的。因为一个操作可能不会 立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。具体的实现就是通过 Future
和 ChannelFutures
,其 addListener()
方法注册了一个 ChannelFutureListener
,以便在某个操作完成时(无论是否成功)自动触发注册的监听事件。
常见的方法有
Channel channel()
,返回当前正在进行IO
操作的通道ChannelFuture sync()
,等待异步操作执行完毕
2.4、ChannelHandler 接口
从之前的入门程序中,我们可以看到ChannelHandler
在Netty中的重要性,它充当了所有处理入站和出站数据的应用程序逻辑的容器。我们的业务逻辑也大都写在实现的字类中,另外ChannelHandler
的方法是由事件自动触发的,并不需要我们自己派发。
ChannelHandler
的实现类或者实现子接口有很多。平时我们就是去继承或子接口,然后重写里面的方法。
最常见的几种Handler:
ChannelInboundHandler
:接收入站事件和数据ChannelOutboundHandler
:用于处理出站事件和数据。
常见的适配器:
ChannelInboundHandlerAdapter
:用于处理入站IO事件ChannelInboundHandler
实现的抽象基类,它提供了所有方法的实现。这个实现只是将操作转发到ChannelPipeline
的下一个ChannelHandler
。子类可以覆盖方法实现来改变这一ChannelOutboundHandlerAdapter
:用于处理出站IO事件
我们经常需要自定义一个 Handler
类去继承 ChannelInboundHandlerAdapter
,然后通过重写相应方法实现业务逻辑,我们来看看有哪些方法可以重写:
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
//注册事件
public void channelRegistered(ChannelHandlerContext ctx) ;
//
public void channelUnregistered(ChannelHandlerContext ctx);
//通道已经就绪
public void channelActive(ChannelHandlerContext ctx);
public void channelInactive(ChannelHandlerContext ctx) ;
//通道读取数据事件
public void channelRead(ChannelHandlerContext ctx, Object msg) ;
//通道读取数据事件完毕
public void channelReadComplete(ChannelHandlerContext ctx) ;
public void userEventTriggered(ChannelHandlerContext ctx, Object evt);
//通道可写性已更改
public void channelWritabilityChanged(ChannelHandlerContext ctx);
//异常处理
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
}
复制代码
2.5、ChannelPipeline 接口
ChannelPipeline
提供了 ChannelHandler 链的容器,并定义了用于在该链上传播入站和出站事件流的 API。当 Channel 被创建时,它会被自动地分配到它专属的 ChannelPipeline
。他们的组成关系如下:
一个 Channel 包含了一个 ChannelPipeline
,而 ChannelPipeline 中又维护了一个由ChannelHandlerContext
组成的双向链表,并且每个ChanneHandlerContext中又关联着一个ChannelHandler
。
ChannelHandler
安装到 ChannelPipeline
中的过程:
一个
ChannelInitializer
的实现被注册到了ServerBootstrap
中 ;当
ChannelInitializer.initChannel()
方法被调用时,ChannelInitializer将在ChannelPipeline
中安装一组自定义的ChannelHandler
;ChannelInitializer
将它自己从ChannelPipeline
中移除。
从一个客户端应用程序 的角度来看,如果事件的运动方向是从客户端到服务器端,那么我们称这些事件为出站的,反之 则称为入站的。服务端反之。
如果一个消息或者任何其他的入站事件被读取,那么它会从 ChannelPipeline
的头部 开始流动,并被传递给第一个 ChannelInboundHandler
。次此handler处理完后,数据将会被传递给链中的下一个 ChannelInboundHandler
。最终,数据将会到达 ChannelPipeline
的尾端,至此,所有处理就结束了。
出站事件会从尾端往前传递到最前一个出站的 handler。出站和入站两种类型的 handler互不干扰。
2.6、ChannelHandlerContext 接口
作用就是使ChannelHandler
能够与其ChannelPipeline
和其他处理程序交互。因为 ChannelHandlerContext保存channel
相关的所有上下文信息,同时关联一个 ChannelHandler
对象, 另外,ChannelHandlerContext
可以通知ChannelPipeline
的下一个ChannelHandler
以及动态修改它所属的ChannelPipeline
。
2.7、SimpleChannelInboundHandler 抽象类
我们常常能够遇到应用程序会利用一个 ChannelHandler
来接收解码消息,并在这个Handler中实现业务逻辑,要写一个这样的 ChannelHandler
,我们只需要扩展抽象类 SimpleChannelInboundHandler< T >
即可, 其中T类型是我们要处理的消息的Java类型。
在SimpleChannelInboundHandler
中最重要的方法就是void channelRead0(ChannelHandlerContext ctx, T msg)
,
我们自己实现了这个方法之后,接收到的消息就已经被解码完的消息啦。
举个例子:
2.8、Bootstrap、ServerBootstrap 类
Bootstrap
意思是引导,一个 Netty
应用通常由一个 Bootstrap
开始,主要作用是配置整个 Netty
程序,串联各个组件。
类别 | Bootstrap | ServerBootstrap |
---|---|---|
引导 | 用于引导客户端 | 用于引导服务端 |
在网络编程中作用 | 用于连接到远程主机和端口 | 用于绑定到一个本地端口 |
EventLoopGroup 的数目 | 1 | 2 |
我想大家对于最后一点可能会存有疑惑,为什么一个是1一个是2呢?
因为服务器需要两组不同的 Channel
。
第一组将只包含一个 ServerChannel
,代表服务 器自身的已绑定到某个本地端口的正在监听的套接字。
而第二组将包含所有已创建的用来处理传入客户端连接(对于每个服务器已经接受的连接都有一个)的 Channel
。
这一点可以上文中的流程图。
2.9、ChannelFuture 接口
异步 Channel I/O 操作的结果。Netty 中的所有 I/O 操作都是异步的。这意味着任何 I/O 调用将立即返回,但不保证在调用结束时请求的 I/O 操作已完成。相反,您将返回一个 ChannelFuture 实例,该实例为您提供有关 I/O 操作的结果或状态的信息。ChannelFuture 要么未完成,要么已完成。当 I/O 操作开始时,会创建一个新的未来对象。新的未来最初是未完成的——它既没有成功,也没有失败,也没有取消,因为 I/O 操作还没有完成。如果 I/O 操作成功完成、失败或取消,则使用更具体的信息(例如失败原因)将未来标记为已完成。请注意,即使失败和取消也属于完成状态。
Netty
中所有的 IO
操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future
和 ChannelFutures
,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
常见的方法有
Channel channel()
,返回当前正在进行IO
操作的通道ChannelFuture sync()
,等待异步操作执行完毕
2.10、ChannelOption 类
Netty
在创建Channel
实例后,一般都需要设置ChannelOption
参数。ChannelOption
参数如下:ChannelOption.SO_KEEPALIVE
:一直保持连接状态ChannelOption.SO_BACKLOG
:对应TCP/IP协议listen 函数中的backlog参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理内,所N博求放在队刚中等待处理,backilog参数指定端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理, backlog参数指定了队列的大小。
三、应用实例
【案例】:
写一个服务端,两个或多个客户端,客户端可以相互通信。
3.1、服务端 Handler
ChannelHandler
的实现类或者实现子接口有很多。平时我们就是去继承或子接口,然后重写里面的方法。
在这里我们就是继承了 SimpleChannelInboundHandler< T > ,这里面许多方法都是大都只要我们重写一下业务逻辑,触发大都是在事件发生时自动调用的,无需我们手动调用。
package com.crush.atguigu.group_chat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author crush
*/
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 定义一个channle 组,管理所有的channel
* GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例
*/
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* handlerAdded 表示连接建立,一旦连接,第一个被执行
* 将当前channel 加入到 channelGroup
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//将该客户加入聊天的信息推送给其它在线的客户端
/*
该方法会将 channelGroup 中所有的channel 遍历,并发送 消息,
我们不需要自己遍历
*/
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
channelGroup.add(channel);
}
/**
* 断开连接, 将xx客户离开信息推送给当前在线的客户
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");
System.out.println("channelGroup size" + channelGroup.size());
}
/**
* 表示channel 处于活动状态, 既刚出生 提示 xx上线
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上线了~");
}
/**
* 表示channel 处于不活动状态, 既死亡状态 提示 xx离线了
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 离线了~");
}
//读取数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//获取到当前channel
Channel channel = ctx.channel();
//这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
channelGroup.forEach(ch -> {
if (channel != ch) { //不是当前的channel,转发消息
ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n");
} else {//回显自己发送的消息给自己
ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//关闭通道
ctx.close();
}
}
复制代码
3.2、服务端 Server 启动
package com.crush.atguigu.group_chat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* @author crush
*/
public class GroupChatServer {
/**
* //监听端口
*/
private int port;
public GroupChatServer(int port) {
this.port = port;
}
/**
* 编写run方法 处理请求
* @throws Exception
*/
public void run() throws Exception {
//创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//8个NioEventLoop
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获取到pipeline
ChannelPipeline pipeline = ch.pipeline();
//向pipeline加入解码器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline加入编码器
pipeline.addLast("encoder", new StringEncoder());
//加入自己的业务处理handler
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 服务器启动");
ChannelFuture channelFuture = b.bind(port).sync();
//监听关闭
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatServer(7000).run();
}
}
复制代码
3.3、客户端 Handler
package com.crush.atguigu.group_chat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* @author crush
*/
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
//当前Channel 已从对方读取消息时调用。
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
复制代码
3.4、客户端 Server
package com.crush.atguigu.group_chat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
/**
* @author crush
*/
public class GroupChatClient {
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//得到pipeline
ChannelPipeline pipeline = ch.pipeline();
//加入相关handler
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//加入自定义的handler
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//得到channel
Channel channel = channelFuture.channel();
System.out.println("-------" + channel.localAddress() + "--------");
//客户端需要输入信息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//通过channel 发送到服务器端
channel.writeAndFlush(msg + "\r\n");
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatClient("127.0.0.1", 7000).run();
}
}
复制代码
多个客户端,cv一下即可。
3.5、测试:
测试流程是先启动 服务端 Server,再启动客户端 。
四、自言自语
这篇文章应该算是个存稿了,之前忙其他事情去了😂。
作者:宁在春
链接:https://juejin.cn/post/7017602386747195429
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。