目录
channel 的主要作用
ChannelFuture
CloseFuture
为什么要异步关闭
channel 的主要作用
- close() 可以用来关闭 channel
- closeFuture() 用来处理 channel 的关闭
- sync 方法作用是同步等待 channel 关闭
- 而 addListener 方法是异步等待 channel 关闭
- pipeline() 方法添加处理器
- write() 方法将数据写入
- writeAndFlush() 方法将数据写入并刷出
ChannelFuture
下面是一段客户端的代码
new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080)
.sync()
.channel()
.writeAndFlush(new Date() + ": hello world!");
现在把它拆开来看
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080); // 1
channelFuture.sync().channel().writeAndFlush(new Date() + ": hello world!");
1 处返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象
注意 connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象
实验如下:
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel()); // 1
channelFuture.sync(); // 2
System.out.println(channelFuture.channel()); // 3
- 执行到 1 时,连接未建立,打印
[id: 0x2e1884dd]
- 执行到 2 时,sync 方法是同步等待连接建立完成
- 执行到 3 时,连接肯定建立了,打印
[id: 0x2e1884dd, L:/127.0.0.1:57191 - R:/127.0.0.1:8080]
除了用 sync 方法可以让异步操作同步以外,还可以使用回调的方式:
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel()); // 1
channelFuture.addListener((ChannelFutureListener) future -> {
System.out.println(future.channel()); // 2
});
- 执行到 1 时,连接未建立,打印
[id: 0x749124ba]
- ChannelFutureListener 会在连接建立时被调用(其中 operationComplete 方法),因此执行到 2 时,连接肯定建立了,打印
[id: 0x749124ba, L:/127.0.0.1:57351 - R:/127.0.0.1:8080]
CloseFuture
关闭是由另外一个线程来进行处理的,跟上面的原理一样
这里可以看出CLOSE之前就执行了关闭操作
正确关闭的第一种方式:同步关闭处理
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
log.debug("处理关闭以后的操作");
正确关闭的第二种方式:异步关闭处理
closeFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
log.debug("处理关闭以后的操作");
group.shutdownGracefully();
}
});
上述的测试代码
@Slf4j
public class CloseFutureClient {
public static void main(String[] args) throws Exception{
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
}).connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
log.debug("{}",channel);
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true){
String line = scanner.nextLine();
if("q".equals(line)){
channel.close();//close异步操作
log.debug("处理关闭之后的操作");//不能在这里善后 错误的
break;
}
}
},"input").start();
//获取CloseFuture对象
//1)同步关闭处理
//2)异步关闭处理
/* ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
log.debug("处理关闭以后的操作"); */
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
log.debug("处理关闭以后的操作");
group.shutdownGracefully();
}
});
}
}
为什么要异步关闭
为什么不在一个线程中去执行建立连接、去执行关闭 channel,那样不是也可以吗?非要用这么复杂的异步方式:比如一个线程发起建立连接,另一个线程去真正建立连接。
还有人会笼统地回答,因为 netty 异步方式用了多线程、多线程就效率高。其实这些认识都比较片面,多线程和异步所提升的效率并不是所认为的
思考下面的场景,4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96
经研究发现,看病可以细分为四个步骤,经拆分后每个步骤需要 5 分钟,如下
因此可以做如下优化,只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12
效率几乎是原来的四倍,(一个小时内接待病人的效率大大提高,四个医生不是同时工作,但都要干满8小时才下班)
要点
- 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势,单线程不能同时处理多个任务,因此无法实现异步处理提高效率。异步处理需要利用多线程或多进程机制来实现,以便同时处理多个任务。多线程或多进程能够允许程序同时执行多个操作,从而提高效率。
- 异步并没有缩短响应时间,反而有所增加,异步可以提高系统的吞吐量,即能够同时处理多个请
- 合理进行任务拆分,也是利用异步的关键