Channel
基本类似于NIO中的Channel概念。作为读写数据的通道。
常见方法
close()
可以用来关闭 channelcloseFuture()
用来处理 channel 的关闭sync
方法作用是同步等待 channel 关闭- 而
addListener
方法是异步等待 channel 关闭
pipeline()
方法添加处理器write()
方法将数据写入writeAndFlush()
方法将数据写入并刷出
这些方法其中大多与NIO
中的Channel
一致。使用起来大同小异。
这里closeFuture
是一个异步处理关闭结果的类。
如果我们直接使用close()
方法关闭通道,它会异步的去进行关闭,直接执行后面的代码。
所以如果有操作希望在通道完全关闭后再进行就需要用这个closeFuture
了。下面会介绍怎么用。
write
方法,不一定立即发出消息。因为netty有一个缓冲机制,一般会等到消息积累一定大小再发送。所以writeAndFlush()
方法的意义在于,不仅写到缓冲区,还要立即通过网络发送出去。
ChannelFuture
这个 ChannelFuture
是与Channel
息息相关的类。
同时对于Future
,学过一些并发编程的应该了解,是我们用来获取异步处理结果的类(同步阻塞等待)。
而这里的 ChannelFuture
类似于Future
,它的目的是阻塞等待,或者添加异步回调来处理结果。
使用方法见下。
先看我们这段示例代码
我们观察截图中的代码。
可以看到用Bootstrap
创建客户端时,链式调用到connect
方法时返回的就是一个ChannelFuture
对象。接下来的sync()
与channel
方法自然就是ChannelFuture
的方法了。顾名思义,sync()
是同步等待结果,channel()
是获取对应Channel
。然后我们用拿到的Channel去做读写操作。
那么我们connect()
方法为什么不能直接拿到Channel
,而是要加这么一层ChannelFuture
呢?
因为connect()
是网络操作,需要一定时间才能完成连接的建立,完成后才能返回对应连接的Channel
(Netty
所有操作都是异步的)。所以这个ChannelFuture
就是用来等待连接结果的,就是一个异步结果的接收类,当connect()
操作没完成时去获取Channel
是获取不到的,所以调用sync
等待异步操作connect()
完成再调用channel()
获取Channel
。
注意这里使用ChannelFuture
的sync
的作用是让我们得线程对异步操作同步等待。
本段开头提到还有另一种方法处理异步结果,就是设置回调。即调用addListener
方法给ChannelFuture
添加一个监听器,并写好我们希望异步操作结束后进行什么操作,在异步操作结束后会触发监听器中我们写好的回调函数。
例子如下:
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
})
.connect("127.0.0.1", 8080);
System.out.println(channelFuture.channel()); // 1
channelFuture.addListener((ChannelFutureListener) future -> {
System.out.println(future.channel()); // 2
});
这里的lambda
表达式实际上是重写的ChannelFutureListener
类中的operationComplete
方法,这个方法会在异步操作完成后被调用,而它由我们重写,所以我们通过这种回调的方式来进行异步方法结果的处理。(具体怎么调用监听器里的方法不太清楚,需要看源码,估计实现应该不难,就是在操作结束后调一个方法。这里仅学习用法。后续我再去学习netty的源码)
同时Netty
中除了connect
的其他IO操作如write
,bind
,read
等操作也是异步的,也可以用ChannelFuture
来处理。
CloseFuture
ChannelFuture
处理connect
,write
等操作。
而CloseFuture
则是特殊的ChannelFuture
。专门处理close
操作
示例程序
@Slf4j
public class CloseFutureClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.sync().channel();
log.debug("{}", channel);
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close(); // close 异步操作 1s 之后
// log.debug("处理关闭之后的操作"); //此处不能做close操作完成后的处理,因为close是异步的,close没完成就会直接执行后面代码
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 获取 CloseFuture 对象, 1) 同步处理关闭, 2) 异步处理关闭
ChannelFuture closeFuture = channel.closeFuture();
/*log.debug("waiting close...");
closeFuture.sync();
log.debug("处理关闭之后的操作");*/
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
log.debug("处理关闭之后的操作");
group.shutdownGracefully();
}
});
}
}
结语
老师这部分是Channel与ChannelFuture一起讲解使用,仅仅讲述了最基本的方法。
对于ChannelFuture的状态,偏内部机制的部分没有讲解。
且在紧接着的下部分讲解Future与Promise时也没有说到。
所以我打算这篇先按课程中的讲解,把最基本的使用摆上来。在下一篇对比分析Netty的Future与Promise时再考虑是否扩展偏底层的部分(我是初学者,扩展还需要自己看书或者找文章看,虽然已经看过了,但是写出来不免还是要费些时间精力的)。
感谢阅读,欢迎批评指正。