参考文章 1
参考文章 2
官网API文档
Reactor
模型
Netty模型
Netty主要基于主从Reactor多线程
模型进行了一定的修改,该模型包括以下几个组件:
-
MainReactor
(主Reactor):负责处理客户端的连接请求。它监听服务器上的端口,接收客户端的连接请求,并将请求分发给SubReactor进行处理。 -
SubReactor
(从Reactor):负责处理连接成功后的通道的IO读写请求。每个SubReactor负责管理一组通道,它们使用多路复用技术(如Java NIO)来监听通道上的事件,例如可读、可写等事件。一般情况下,每个SubReactor都对应一个线程。 -
Worker Threads
(工作线程):负责处理非IO请求,即具体的业务逻辑处理。当有非IO请求需要处理时,这些任务会被写入一个队列中,等待工作线程来处理。工作线程可以是线程池中的线程,也可以是其他类型的线程。
模块组件
Bootstrap
、ServerBootstrap
: 在Netty中,Bootstrap
和ServerBootstrap
是用于启动和配置Netty应用程序的引导类。Bootstrap
类是用于客户端程序的启动引导类,ServerBootstrap
类是服务端启动引导类。NioEventLoopGroup
:NioEventLoopGroup
是Netty中用于管理NioEventLoop
的组件,它是一个线程池,包含多个NioEventLoop
实例,它对应着主从Reactor多线程模型中Reactor。NioEventLoopGroup
负责创建、管理和分配NioEventLoop
,处理异常情况,并提供优雅关闭的机制。它是Netty实现高性能的重要组件之一。NioEventLoop
:NioEventLoop
是Netty中的核心组件,负责处理I/O事件和执行任务。它使用Selector
来监听和处理注册在其上的Channel
的I/O事件,同时支持异步提交和执行任务。NioEventLoop
还管理定时任务和处理异常情况,是实现高性能和事件驱动的重要组成部分。
什么是Selector
-
Selector
:Netty基于Selector
对象实现了I/O多路复用。通过将多个Channel
注册到一个Selector
中,并使用一个线程来监听和处理这些Channel
的事件,可以高效地管理多个Channel
。Selector
会自动不断地查询这些注册的Channel
,以检查它们是否有已就绪的I/O事件。 -
Channel
:在Netty中,Channel
表示一个开放的网络连接,可以用于读取、写入和处理网络数据。它是网络通信的基本单元,负责处理底层的数据传输和事件通知。NioSocketChannel
:异步的客户端 TCP Socket 连接NioServerSocketChannel
:异步的服务器端 TCP Socket 连接NioDatagramChannel
:异步的 UDP 连接NioSctpChannel
:异步的客户端 Sctp 连接NioSctpServerChannel
:异步的 Sctp 服务器端连接
-
ChannelHandler
:在Netty中,ChannelHandler
是一个接口,用于处理I/O事件或拦截I/O操作,并将其转发到ChannelPipeline
中的下一个处理程序。ChannelHandler
是Netty的核心组件之一,它负责处理各种事件,如连接建立、数据读写、异常发生等。ChannelInboundHandler
:用于处理入站I/O事件ChannelOutboundHandler
:用于处理出站I/O操作
-
ChannelHandlerContext
:ChannelHandlerContext
保存了与特定Channel
相关的所有上下文信息,同时关联一个ChannelHandler
对象,并提供了访问Channel
、ChannelHandler
和ChannelPipeline
的方法。 -
ChannelPipline
: 它是一个保存ChannelHandler
的列表,用于处理或拦截Channel
的入站事件和出站操作。ChannelPipeline
实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式以及Channel
中各个的ChannelHandler
如何相互交互。
在 Netty 中每个Channel
都有且仅有一个ChannelPipeline
与之对应, 它们的组成关系如下:
一个Channel
包含了一个ChannelPipeline
,而ChannelPipeline
中维护了一个由ChannelHandlerContext
组成的双向链表。入站事件和出站事件在这个双向链表中进行传递,入站事件从链表的head
往后传递到最后一个入站的ChannelHandler
,出站事件从链表的tail
往前传递到最前一个出站的ChannelHandler
,两种类型的ChannelHandler
互不干扰。通过这种设计,ChannelPipeline
实现了事件的顺序传递和处理。 -
Future
、ChannelFuture
: Netty中的IO操作都是异步的,这意味着在发起一个IO操作后,无需等待其完成就可以继续执行后续的代码逻辑。为了获取操作的执行结果或者在操作完成时得到通知,Netty提供了Future
和ChannelFuture
。
实例代码
服务端
public class MyServer {
public static void main(String[] args) throws Exception {
//创建两个线程组 boosGroup、workerGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务端的启动对象,设置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置两个线程组boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
//设置服务端通道实现类型
.channel(NioServerSocketChannel.class)
//设置线程队列得到连接个数
.option(ChannelOption.SO_BACKLOG, 128)
//设置保持活动连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//使用匿名内部类的形式初始化通道对象
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline管道设置处理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});//给workerGroup的EventLoop对应的管道设置处理器
System.out.println("java技术爱好者的服务端已经准备就绪...");
//绑定端口号,启动服务端
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 自定义的Handler需要继承Netty规定好的HandlerAdapter
* 才能被Netty框架所关联,有点类似SpringMVC的适配器模式
**/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//发送消息给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息,并给你发送一个问号?", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//发生异常,关闭通道
ctx.close();
}
}
客户端
public class MyClient {
public static void main(String[] args) throws Exception {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建bootstrap对象,配置参数
Bootstrap bootstrap = new Bootstrap();
//设置线程组
bootstrap.group(eventExecutors)
//设置客户端的通道实现类型
.channel(NioSocketChannel.class)
//使用匿名内部类初始化通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加客户端通道的处理器
ch.pipeline().addLast(new MyClientHandler());
}
});
System.out.println("客户端准备就绪,随时可以起飞~");
//连接服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
//对通道关闭进行监听
channelFuture.channel().closeFuture().sync();
} finally {
//关闭线程组
eventExecutors.shutdownGracefully();
}
}
}
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发送消息到服务端
ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~茉莉~Are you good~马来西亚~", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收服务端发送过来的消息
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
}