Netty
- 基本概念理解
- 阻塞与非阻塞
- 同步与异步
- BIO 与 NIO
- Reactor 模型
- Netty 基本概念
- Netty 的执行流程
- Netty 的模块组件
- Netty 工作原理
- Netty 的基本使用
- Netty Server
- Netty Client
- 参考文章
基本概念理解
阻塞与非阻塞
阻塞与非阻塞是进程访问数据时的处理方式,根据数据是否准备完成,划分为:
-
阻塞:进程将一直等待数据准备完成;
-
非阻塞:若进程发现数据未准备完成,则立即返回,期间不断的进行数据请求,直到数据准备完成,直接返回;
例如烧水,我们站在水壶旁边一直等待水烧开即为阻塞;而我们在将水烧上之后,就去处理别的事情,并按照一定的频率来看水是否烧开,这个过程即为非阻塞。阻塞与非阻塞的区分在于“人”。
同步与异步
同步与异步是应用程序和操作系统处理 IO 时的处理方式,根据数据是否准备完成,划分为:
-
同步:应用程序直接参与 IO 的操作;
-
异步:应用程序将 IO 操作完全交由操作系统处理,应用程序只等待通知;
还是烧水的例子,我们烧水并一直阻塞在水壶旁,等待水开的过程即为同步。我们需要参与整个水烧开的过程;而如果有一个智能水壶,当水烧开时会发出提示声,这样我们就可以打开烧水开关后继续去完成其他任务,等待水壶提醒水已烧开即可,该过程即为异步。同步与异步的区分在于“水壶”。
BIO 与 NIO
-
BIO 是一种同步阻塞的模型,对应传统的
java.io
包,其通信基于流模型实现,在 IO 操作完成之前,对应线程会一直阻塞; -
NIO 是一种同步非阻塞的模型,对应
java.nio
包,其通信基于缓冲实现,对于高负载、高并发的网络应用,适合使用 NIO 的非阻塞模式来开发;
BIO 的 Server 通信模型:
NIO 的 Server 通信模型:
Reactor 模型
单线程的 Reactor 模型
多线程的 Reactor 模型
多线程的主从 Reactor 模型
Netty 基本概念
Netty 的执行流程
Netty 的模块组件
Bootstrap/ServerBootstrap
Bootstrap 是 Netty 应用的引导类,其作用是配置整个 Netty 程序,并串联各个组件。
Future/ChannelFuture
Netty 中的所有 IO 操作都是异步的。消息被处理后不会立刻得到处理结果,此时就需要通过该组件注册监听以追踪后续结果,当消息的处理操作产生结果相应的监听会自动触发监听事件。
Channel
Netty 中负责网络通信的组件,用于处理网络 IO 操作。
Selector
Netty 基于 Selector 实现 IO 多路复用,通过该组件,一个线程可以监听多个注册的 Channel 事件。
NioEventLoop
NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用其中的 run 方法,执行 IO 或非 IO 的任务。
IO 任务即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发;非 IO 任务即添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。
NioEventLoopGroup
该模块管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程 NioEventLoop 负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程。
ChannelHandler
ChannelHandler 是一个接口,处理 IO 事件或拦截 IO 操作,并将其转发到其 ChannelPipeline 中的下一个处理程序。
ChannelHandlerContext
保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象。
ChannelPipline
保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站事件和出站操作。
Netty 工作原理
Server 端工作原理
server 端启动时,会绑定本地的端口,并将自己的 NioServerSocketChannel 注册到某一个 bossNioEventLoop 的 selector 中。server 端包含 1 个 boss NioEventLoopGroup 和 1 个 worker NioEventLoopGroup。
每个 boss NioEventLoop 循环执行的任务包含 3 步:
- 轮询 accept 事件;
- 处理 io 任务,即 accept 事件,与 client 建立连接,生成 NioSocketChannel,并将 NioSocketChannel 注册到某个 worker NioEventLoop 的 selector 上;
- 处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用 eventloop.execute 或 schedule 执行的任务,或者其它线程提交到该 eventloop 的任务。
每个 worker NioEventLoop 循环执行的任务包含 3 步:
- 轮询 read、write 事件;
- 处理 io 任务,即 read、write 事件,在 NioSocketChannel 可读、可写事件发生时进行处理;
- 处理任务队列中的任务,runAllTasks。
Client 端工作原理
client 端启动时 connect 到 server,建立 NioSocketChannel,并注册到某个 NioEventLoop 的 selector 上。
client 端只包含 1 个 NioEventLoopGroup,每个 NioEventLoop 循环执行的任务包含 3 步:
- 轮询 connect、read、write 事件;
- 处理 io 任务,即 connect、read、write 事件,在 NioSocketChannel 连接建立、可读、可写事件发生时进行处理;
- 处理非 io 任务,runAllTasks。
Netty 的基本使用
首先引入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.51.Final</version>
</dependency>
Netty Server
Netty Server 端代码:
public class Server {
/**
* 服务端端口
*/
private int port;
/**
* 构造方法:需指定服务器端口号
*/
public Server(int port) {
this.port = port;
}
/**
* 服务器启动方法
*/
public void start() {
System.out.println("服务器启动中...");
// 配置服务端线程组
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
// 服务器启动引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
// 设置存放已完成 tcp 三次握手的请求的等待队列的最大长度
.option(ChannelOption.SO_BACKLOG, 1024)
// 设置允许复用 host/port,默认 false
.option(ChannelOption.SO_REUSEADDR, true)
// 指定客户端使用的 handler,同样可以配置多个
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ServerHandler());
}
});
try {
// 启动服务器,绑定端口,同步等待绑定成功
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
System.out.println("服务端启动成功...");
// 等待服务器监听端口关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("服务器启动失败!");
e.printStackTrace();
} finally {
// 退出,释放线程池
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
// 启动服务端
new Server(8000).start();
}
}
ServerHandler 代码:
/**
* ServerHandler
* @ChannelHandler.Sharable 允许多个线程使用此 handler
*/
@ChannelHandler.Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {
/**
* 建立连接时 channel 被激活,将自动调用
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
/**
* 接收到客户端信息时调用
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf data = (ByteBuf) msg;
System.out.println("服务器接收到数据>>>" + data.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(data);
}
/**
* 发生异常时调用
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Netty Client
Netty Client 端代码:
public class Client {
/**
* 要连接的服务器主机地址
*/
private String host;
/**
* 要连接的服务器端口号
*/
private int port;
/**
* 构造方法,指定服务器的主机地址和端口号
*/
public Client(String host, int port) {
this.host = host;
this.port = port;
}
/**
* 客户端启动方法
*/
public void start() {
System.out.println("客户端启动中...");
// EventLoop可以看做线程,EventLoopGroup可以看做线程组?
NioEventLoopGroup group = new NioEventLoopGroup();
// 实例化启动引导类
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
// 指定服务端主机地址和端口
.remoteAddress(new InetSocketAddress(host, port))
// 设置允许复用 host/port,默认 false
.option(ChannelOption.SO_REUSEADDR, true)
// 指定客户端使用的 handler
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 此处可以指定多个 handler,各个 handler 按照添加的顺序进行处理
socketChannel.pipeline().addLast(new ClientHandler());
}
});
try {
// 连接到服务器,connect() 是异步连接,需要调用 sync() 同步等待连接成功
ChannelFuture channelFuture = bootstrap.connect().sync();
System.out.println("客户端启动成功...");
// 阻塞线程直到客户端 channel 关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("客户端启动失败!");
e.printStackTrace();
} finally {
// 执行退出,释放 NIO 线程组
group.shutdownGracefully();
}
}
@SuppressWarnings("all")
public static void main(String[] args) throws InterruptedException {
// 启动客户端,因为启动后会阻塞所在线程,所以单独用一个线程来启动
new Thread(() -> {
new Client("localhost", 8000).start();
}).start();
Thread.sleep(1000);
ClientHandler.sendMsg("Hello, Netty...");
}
}
ClientHandler 代码:
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static ChannelHandlerContext context;
/**
* 建立连接时 channel 被激活,将自动调用
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ClientHandler.context = ctx;
}
/**
* 接收到服务器返回信息时调用
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
System.out.println("服务端返回的数据>>>" + byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 处理完服务器返回的数据时调用
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
/**
* 发生异常时调用
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 暴露方法供外部调用,向服务器发送消息
*/
public static void sendMsg(String msg) {
ByteBuf byteBuf = Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8);
ClientHandler.context.writeAndFlush(byteBuf);
}
}
参考文章
- 【硬核】肝了一月的Netty知识点
- 再有人问你Netty是什么,就把这篇文章发给他
- 万字长文带你深入理解netty,史上最强详解!