写在文章开头
Netty
内置了各种开箱即用的处理器,把握好处理器中几个比较重要的生命周期回调用助于我们编写出强大的网络通信程序,所以本文将基于一个简单的示例和源码介绍一下Netty
中几个比较重要的生命周期函数,希望对你有帮助。
Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
详解Netty几个核心的生命周期
基于基础服务端程序演示各个生命周期回调
首先我们给出服务端的处理器handler
代码示例,可以看到笔者重写了每一个生命周期函数,读者可以参考输出了解每一个回调的用意:
public class LifeCyCleHandler extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("逻辑处理器被添加调用handlerAdded");
super.handlerAdded(ctx);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel注册到eventLoop上,执行channelRegistered");
super.channelRegistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接准备就绪,调用channelActive");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("此时有数据可读,执行channelRead");
super.channelRead(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("数据阅读完成,调用channelReadComplete");
super.channelReadComplete(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel被关闭,调用channelInactive");
super.channelInactive(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("NIO线程解绑该channel,调用channelUnregistered");
super.channelUnregistered(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("执行handlerRemoved,逻辑处理器被移除");
super.handlerRemoved(ctx);
}
}
对应的服务端代码如下所示,逻辑比较简单,收到连接并将该socket
封装为channel
之后,为这个channel
分配一个LifeCyCleHandler
处理器:
public static void main(String[] args) {
// 引导类负责引导服务端启动工作
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 以下两个对象可以看做是两个线程组
// 负责监听端口,接受新的连接
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 负责处理每一个连接读写的线程组
NioEventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
// 配置线程组并指定NIO模型
serverBootstrap.group(bossGroup, workerGroup)
//设置IO模型,这里为NioServerSocketChannel,建议Linux服务器使用 EpollServerSocketChannel
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//为每一个新建的连接添加LifeCyCleHandler
nioSocketChannel.pipeline().addLast(new LifeCyCleHandler());
}
});
//绑定端口
serverBootstrap.bind(9191);
}
此时我们将服务端程序启动,通过cmd
的telnet
指令与其建立连接之后,我们可以看到这样的输出:
逻辑处理器被添加调用handlerAdded
channel注册到eventLoop上,执行channelRegistered
连接准备就绪,调用channelActive
由此可知服务端检测到新连接并完成channel
分配之后,每当添加一个handler
就会回调handlerAdded
方法,然后channel
会被注册到处理channel
读写事件的eventLoop
上,此时就会回调channelRegistered
方法,自此连接建立状态属于正式激活,由此触发channelActive
回调:
然后通过命令行发送数据,又会收到下面这样一段输出:
此时有数据可读,执行channelRead
数据阅读完成,调用channelReadComplete
由此可知收到可读数据时,Netty
会触发channelRead
回调,一旦数据读取完成就会触发channelReadComplete
回调:
我们将命令行界面关闭,又看到下面这样一段输出:
channel被关闭,调用channelInactive
NIO线程解绑该channel,调用channelUnregistered
执行handlerRemoved,逻辑处理器被移除
由此可知,一旦断开连接,Netty
服务端会执行如下步骤:
channel
对应socket
连接关闭,触发channelInactive
回调,此时TCP层面已经不再是establish
状态了。- 将
channel
从处理客户端channel
的eventLoop
线程中移除,触发channelUnregistered
回调 - 将
channel
的处理器移除触发handlerRemoved
回调。
详解各个生命周期回调
Netty服务端收到客户端连接时,将当前socket
封装为AbstractChannel
之后,调用AbstractChannel
的register0
调用 pipeline
的invokeHandlerAddedIfNeeded
触发handlerAdded
回调。
private void register0(ChannelPromise promise) {
//......
//回调当前channel的所有handler的handlerAdd
pipeline.invokeHandlerAddedIfNeeded();
//......
}
于是就来到我们上文所配置的ChannelInitializer
的handlerAdded
此时就会来到initChannel
将我们的生命周期处理器添加进去
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
//由此调用到我们引导类配置的添加handler的逻辑nioSocketChannel.pipeline().addLast(new LifeCyCleHandler());
initChannel(ctx);
}
}
完成处理器添加之后通过callHandlerAdded0
方法触发handlerAdded
回调:
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
//添加handler
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
//......
//查看当前添加处理器的线程是否是eventLoop线程,如果是则直接调用callHandlerAdded0触发handlerAdded,反之提交任务通过callHandlerAdded0执行handlerAdded
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
随后channel会将自己的事件注册到eventLoop
上,本质上调用AbstractChannel
的register0
触发fireChannelRegistered
触发回调,后续步骤完成后触发fireChannelActive
回调:
private void register0(ChannelPromise promise) {
try {
//......
//注册到eventLoop上
doRegister();
neverRegistered = false;
//......
//调用fireChannelRegistered触发channelRegistered回调
pipeline.fireChannelRegistered();
//如果channel准备就绪则调用fireChannelActive触发channelActive回调
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
}
//......
}
} catch (Throwable t) {
//......
}
}
后续读事件将消息传播到AbstractNioByteChannel
的read
方法的其内部通过fireChannelRead
触发channelRead
回调后,调用fireChannelReadComplete
触发channelReadComplete
回调:
@Override
public final void read() {
//......
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
//......
//触发channelRead
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
//读取完成后触发channelReadComplete
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
//.....
}
当客户端channel
连接断开,eventLoop
轮询到这个事件后触发当前channel
关闭处理,便执行AbstractChannel
的doDeregister
方法将其从eventLoop
中移除,然后在finally
语句块中执行channelInactive
和channelUnregistered
回调:
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
//......
invokeLater(new Runnable() {
@Override
public void run() {
try {
//将channel从eventLoop中移除
doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
//执行channelInactive和channelUnregistered回调
if (fireChannelInactive) {
pipeline.fireChannelInactive();
}
if (registered) {
registered = false;
pipeline.fireChannelUnregistered();
}
}
});
}
可能会有读者文handlerRemoved
在哪里执行,实际上在channelUnregistered
内部完成channelUnregistered
回调后就会调用destroy
方法,触发handlerRemoved
回调:
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
//触发channelUnregistered回调
ctx.fireChannelUnregistered();
//destroy内部执行handlerRemoved
if (!channel.isOpen()) {
destroy();
}
}
handlerRemoved
回调的逻辑我们步入destroy
内部的destroyDown
就可以看到这段逻辑:
private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
//......
if (inEventLoop || executor.inEventLoop(currentThread)) {
synchronized (this) {
remove0(ctx);
}
//触发handlerRemoved回调
callHandlerRemoved0(ctx);
}
//......
}
各个周期函数使用建议
所以简单小结一下:
handlerAdded
、handlerRemoved
更适合连接初始化时资源的申请和释放。channelReadComplete
处于读取完数据之后的回调,服务端读取数据时一般会做一些回复,这时候我们常常会用到writeAndFlush
,实际上如果我们希望提升程序性能,更建议使用write
方法然后在channelReadComplete
执行ctx.channel().flush()进行批量刷新更由此保证程序执行性能。channelActive
和channelInactive
在含义在可以直接理解为TCP连接的建立与释放,所以更适用于统计连接数。
对此我们也给出《Netty in action》中的对于生命周期的总结,读者可以自行阅读回顾一下:
然后是channelHandler的生命周期:
ChannelInboundHandler
的生命周期:
最后就是ChannelOutboundHandlerd
的生命周期:
小结
自此笔者从使用和源码示例上将Netty中的几个核心生命周期分析完成,希望对你有帮助。
我是 sharkchili ,CSDN Java 领域博客专家,开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili 。
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。
参考
《跟着闪电侠学Netty》
《Netty in action》