写在前面
本文看下netty关闭channel相关源码。
1:前置准备
为了测试,我们需要使用netty源码中examples模块的echoserver和echoclient,但是echoclient因为会不断的发送消息,并不会断开连接,所以,我们需要修改为如下:
package io.netty.example.echo;
public final class EchoClientForDebugClose {
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure SSL.git
final SslContext sslCtx;
if (SSL) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
// p.addLast(new EchoClientHandler()); 因为要debug通道close时server端的逻辑,所以这里直接注释
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
// f.channel().closeFuture().sync(); 因为要debug通道close时server端的逻辑,所以这里直接注释
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
其中代码group.shutdownGracefully();
就是客户端关闭通道的代码。当该代码执行后会触发server端的op_read事件,只不过此时读取到的数据量是-1,代表正常的EOF。接着就可以正式开始debug源码了。
2:正戏
首先在server端代码group.shutdownGracefully();
和服务端代码io.netty.channel.nio.NioEventLoop#run
打断点,首先启动server,此时我们先mute server端代码,接着启动client端代码, client端代码进入debug:
接着取消server端代码断点mute,就可以开始调试了:
执行到代码:
// io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
i// ...
try {
// ...
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop op_accept接收连接事件,op_read读数据事件的话为true,进if执行
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read(); // 读数据
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
继续:
// io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator(); // 这是一个自适应大小的分配器
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); // 用来动态调整buf的实际大小,是一个guess的过程,连续读的多就变大,反之变小
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// ...
// doReadBytes是真的读数据,lastBytesRead记录读到的总字节数以及当次读到的字节数,以作为动态调整buf大小的依据
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) { // 读到的数据小于0,则说明是客户端关闭连接
// nothing was read. release the buffer. 数据清理,释放堆外内存
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// ...
} while (allocHandle.continueReading()); // allocHandle.continueReading() 代表在满足特定条件下会尝试多次重复读取数据
// ...
if (close) { // 关闭逻辑处理
closeOnRead(pipeline);
}
} catch (Throwable t) { // 异常关闭,同样是
// ...
} finally {
// ...
}
}
方法doReadBytes(byteBuf)
当返回-1,代表EOF了,即客户端正常关闭了channel,所以接下来的if就为true了,内部执行一些资源释放的工作。主要看方法closeOnRead(pipeline);
:
// io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#closeOnRead
private void closeOnRead(ChannelPipeline pipeline) {
if (!isInputShutdown0()) {
if (isAllowHalfClosure(config())) { // 半开,忽略
shutdownInput();
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise()); // 进一步执行关闭
}
} else {
inputClosedSeenErrorOnRead = true;
pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
}
}
看方法close(voidPromise());
:
// io.netty.channel.AbstractChannel.AbstractUnsafe#close(io.netty.channel.ChannelPromise, java.lang.Throwable, java.nio.channels.ClosedChannelException, boolean)
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
// ...
if (closeExecutor != null) {
// ...
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise); // do了,真的去关了
} finally {
// ...
}
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive); // 触发事件了,通道变为不激活,以及取消事件注册
}
}
}
方法doClose0(promise);
,又do了,看到do就有点莫名兴奋,有没有?:
// io.netty.channel.socket.nio.NioSocketChannel#doClose
protected void doClose() throws Exception {
super.doClose();
javaChannel().close(); // 本质了执行Java NIO channel的close方法,完成通道关闭
}
javaChannel().close();
就已经是JavaNIO的代码了,内部会最终执行如下位置,关闭通道的同时也会取消selection key事件的注册:
// java.nio.channels.spi.AbstractSelectableChannel#implCloseChannel
protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
synchronized (keyLock) {
int count = (keys == null) ? 0 : keys.length;
for (int i = 0; i < count; i++) {
SelectionKey k = keys[i];
if (k != null)
k.cancel();
}
}
}
写在后面
参考文章列表
netty之导入源码到idea。