Netty
文章目录
- Netty
- 4 Netty 模型
- 4.1 Netty 模型介绍
- 4.2 Netty demo
- 4.3 Netty 异步模型
- 4.3.1 基本介绍
- 4.3.2 异步模型
- 4.3.3 Future-Listener 机制
- 4.4 Netty 任务队列 task
4 Netty 模型
4.1 Netty 模型介绍
Netty 线程模式:Netty 主要基于主从 Reactor 多线程模型做了一定的改进,其中主从Reactor 多线程模型有多Reactor
- Netty 抽象出两组线程池 BoosGroup 和 WorkGroup
- BoosGroup 专门负责接收客户端连接
- WorkGroup 专门负责网络的读写
- BoosGroup、WorkGroup 类型都是 NioEventLoopGroup
- NioEventLoopGroup 是一个事件循环组,可以是多个线程,组中包含多个事件循环,每个事件循环都是 NioEventLoop
- NioEventLoop 表示一个事件循环,不断循环的执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通讯
- Boos NioEventLoop 执行步骤
- 轮询 accept 事件
- 处理 select 事件,与 client 建立连接,生成 NioSocketChannnel,并将其注册到某个 work NioEventLoop 上的 selector
- runAllTasks 处理任务队列
- Work NioEventLoop 执行步骤
- 轮询 read、write 事件
- 在对应的 NioSocketChannel 处理 read、write 事件
- runAllTasks 处理任务队列
- 每个 work NioEventLoop 处理业务时,会使用 pipline,pipline 中包含 channel,即可以通过 pipline 获取到对应的 channel,并且 pipline 中也维护了很多处理器
4.2 Netty demo
maven
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
server
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;
public class NettyServer {
public static void main(String[] args) {
EventLoopGroup boosGroup = null;
EventLoopGroup workGroup = null;
try {
// 创建 boosGroup 一直循环只处理连接请求,真正的业务交由 workGroup 处理
boosGroup = new NioEventLoopGroup();
// 创建 workGroup 处理 read write 事件
workGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(boosGroup, workGroup) // 配置boosGroup workGroup
.channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 64) // 设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
.handler(new LoggingHandler(LogLevel.INFO)) // handler 在 BoosGroup 中生效
.childHandler(new ChannelInitializer<SocketChannel>() { // childHandler 在 WorkGroup 中生效
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("初始化server端channel对象...");
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChannelInboundHandlerAdapter() {
/**
* 读取客户端发送的数据
* @param ctx 上下文对象, 含有 管道pipeline , 通道channel, 地址 等
* @param msg 客户端发送的数据 默认Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server 端正在接收 client 端的数据......");
ByteBuf buffer = (ByteBuf) msg;
System.out.println("client:" + ctx.channel().remoteAddress() + " 发送过来的数据 msg = " + buffer.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("server 数据读取完毕 .....");
ctx.writeAndFlush(Unpooled.copiedBuffer("server 以读取 client 发送的数据...", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("出现异常:" + cause.getMessage());
// 关闭通道
ctx.close();
}
});
}
});
System.out.println("server 端 ready !!!!");
ChannelFuture channelFuture = b.bind("127.0.0.1", 8090).sync();
// 给 ChannelFuture 注册监听器
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) System.out.println("server 端监听 8090 端口 成功....");
else System.out.println("server 端监听 8090 端口 失败....");
}
});
//关闭通道
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boosGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
client
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;
public class NettyClient {
public static void main(String[] args) {
// 客户端需要一个事件循环组
NioEventLoopGroup group = null;
try {
group = new NioEventLoopGroup();
// 创建客户端启动对象
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("当通道就绪就会触发该方法.....");
ctx.writeAndFlush(Unpooled.copiedBuffer("client 通道已就绪...", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
System.out.println("client 读取 server 发送的数据 msg = " + buffer.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("出现异常,异常信息 cause = " + cause.getMessage());
// 关闭通道
ctx.close();
}
});
}
});
System.out.println("client 端已 ok ...");
// 启动客户端去连接服务器端
ChannelFuture channelFuture = b.connect("127.0.0.1", 8090).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
- Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作
- NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通道。
- NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责
- NioEventLoopGroup下包含多个 NioEventLoop
- 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
- 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
- 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
- 每个 NioChannel 都绑定有一个自己的 ChannelPipeline
4.3 Netty 异步模型
4.3.1 基本介绍
- 异步的概念和同步相对
- 当一个异步过程调用发出后,调用者不能立刻得到结果,实 际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者
- Netty中的I/O操作是异步的,包括Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture
- 调用者并不能立刻获得结果,而是通过Future-Listener 机制,用户可以方便的主动获 取或者通过通知机制获得IO操作结果
- Netty 的异步模型是建立在 future 和 callback 的之上的
- callback 回调
- Future 的核心思想是:假设一个方法fun,计算过程可能非常耗时,等待 fun 返回 显然不合适。那么可以在调用 fun 的时候,立马返回一个Future,后续可以通过 Future 去监控方法 fun 的处理过程(即:Future-Listener 机制)
4.3.2 异步模型
- 在使用 Netty 进行编程时,拦截操作和转换出入站数据只需要提供 callback 或利用 future 即可。这使得链式操作简单、高效,并有利于编写可重用的、通用的代码。
- Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来
4.3.3 Future-Listener 机制
当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。
常见有如下操作
isDone | 判断当前操作是否完成 |
---|---|
isSuccess | 判断已完成的当前操作是否成功 |
getCause | 获取已完成的当前操作失败的原因 |
isCancelled | 判断已完成的当前操作是否被取消 |
addListener | 注册监听器 当操作已完成(isDone方法返回完成),将会通知 指定的监听器;如果Future对象已完成,则通知指定的监听器 |
相比传统阻塞I/O,执行I/O操作后线程会被阻塞住,直到操作完成
异步处理的好处是不会造成线程阻塞,线程在I/O操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量
4.4 Netty 任务队列 task
处理耗时操作
- 用户程序自定义普通任务
将任务提交 taskQueue中 但还是一个线程在执行 - 定时提交任务
将任务提交 scheduledTaskQueue 使用不同的线程
childHandler(new ChannelInitializer<SocketChannel>() { // childHandler 在 WorkGroup 中生效
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("初始化server端channel对象...");
System.out.println("ChannelInitializer thread name = " + Thread.currentThread().getName());
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChannelInboundHandlerAdapter() {
/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有 管道pipeline , 通道channel, 地址 等
* @param msg 客户端发送的数据 默认Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server 端正在接收 client 端的数据......");
System.out.println("ChannelInitializer channelRead thread name = " + Thread.currentThread().getName());
System.out.println("ChannelInitializer channelRead 普通handle thread name = " + Thread.currentThread().getName());
ByteBuf buffer0 = (ByteBuf) msg;
System.out.println("client:" + ctx.channel().remoteAddress() + " 普通handle 发送过来的数据 msg = " + buffer0.toString(CharsetUtil.UTF_8));
ctx.channel().eventLoop().execute(() -> {
try {
TimeUnit.SECONDS.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ChannelInitializer channelRead taskQueue handle thread name = " + Thread.currentThread().getName());
ByteBuf buffer1 = (ByteBuf) msg;
System.out.println("client:" + ctx.channel().remoteAddress() + " 发送过来的数据 msg = " + buffer1.toString(CharsetUtil.UTF_8));
});
ctx.channel().eventLoop().schedule(() -> {
System.out.println("ChannelInitializer channelRead scheduleQueue handle thread name = " + Thread.currentThread().getName());
ByteBuf buffer2 = (ByteBuf) msg;
System.out.println("client:" + ctx.channel().remoteAddress() + " 发送过来的数据 msg = " + buffer2.toString(CharsetUtil.UTF_8));
}, 60, TimeUnit.SECONDS);
System.out.println("server doing...");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("server 数据读取完毕 .....");
ctx.writeAndFlush(Unpooled.copiedBuffer("server 以读取 client 发送的数据...", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("出现异常:" + cause.getMessage());
// 关闭通道
ctx.close();
}
});
}
})