Netty源码解析——ChannelPipeline中的责任链模式
- ChannelPipeline的作用
- ChannelPipeline的设计
- ChannelPipeline源码解析
ChannelPipeline的作用
ChannelPipeline在Netty中的作用,主要是在有事件就绪时,用于处理就绪事件的。我们知道真正处理就绪事件的其实是ChannelHandler,但是由于ChannelHandler有多个,可能会同时处理这个就绪事件,于是Netty就设计了一个ChannelPipeline,利用责任链模式串联起多个ChannelHandler。
ChannelPipeline的设计
ChannelPipeline是责任链模式的一种实现,ChannelPipeline里面是一个由多个ChannelHandlerContext串联起来的双向链表,每个ChannelHandlerContext有一个后继指针next和前驱指针prev用于指向后面一个和前面一个ChannelHandlerContext,而ChannelHandler则是被包装在ChannelHandlerContext中。
之所以不让ChannelHandler维护next和prev指针,是尽量保证ChannelHandler的职责单一(处理就绪事件)。
在ChannelPipeline中,入站事件的处理顺序是从head到tail方向依次调用ChannelHandler,而处理出站事件则是从tail到head方向依次调用ChannelHandler。并且只有ChannelInboundHandler类型的ChannelHandler才会处理入站事件,只有ChannelOutboundHandler类型的ChannelHandler才会处理出站事件。
ChannelPipeline源码解析
我们就以read事件的处理为例,看一下ChannelPipeline的源码设计。ChannelPipeline的fireChannelRead()方法就是触发ChannelPipeline处理read事件的入口,我们从这里开始。
io.netty.channel.DefaultChannelPipeline#fireChannelRead
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
调用AbstractChannelHandlerContext的静态方法invokeChannelRead(),参数head就是HeadContext,是ChannelPipeline中的链表的头部节点。
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
由于是在NioEventLoop中的事件循环中监听到事件就绪触发的事件处理,所以if (executor.inEventLoop())条件一般都是满足的,因此直接调用next.invokeChannelRead(m)方法处理。
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
AbstractChannelHandlerContext的invokeChannelRead方法里面,一般会进入if条件分支,拿到ChannelHandlerContext的ChannelHandler,因为是处理read事件,read事件是入站事件,所以是ChannelInboundHandler类型,然后调用ChannelInboundHandler的channelRead方法进行事件处理,ChannelInboundHandler的channelRead方法就是我们实现的处理逻辑。
当我们在ChannelInboundHandler的channelRead方法处理完毕后,调用ChannelHandlerContext的fireChannelRead(msg)方法,read事件就会继续往下传递,否则事件处理就在当前的ChannelInboundHandler终止。
这里HeadContext的ChannelHandler就是HeadContext自己,channelRead方法就是直接调用ctx.fireChannelRead(msg)。
io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
}
我们再看一个Netty内置的ChannelInboundHandler,比如LoggingHandler。
io.netty.handler.logging.LoggingHandler#channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "READ", msg));
}
ctx.fireChannelRead(msg);
}
打印一行日志,然后调用ctx.fireChannelRead(msg)让事件继续往后传递,如果它不调用ctx.fireChannelRead(msg),那么事件处理就到此为止了,这也是在开发Netty应用时需要注意的地方。
ctx.fireChannelRead(msg)会进入AbstractChannelHandlerContext的fireChannelRead方法。
io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
findContextInbound(MASK_CHANNEL_READ)是找到下一个ChannelInboundHandler所在的ChannelHandlerContext,里面是通过位运算进行匹配的,优点小复杂,我们就不看了。假设现在找到了下一个ChannelHandlerContext,那么进入invokeChannelRead方法。
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
可以看到,又回到了AbstractChannelHandlerContext的invokeChannelRead方法,这个方法我们上面已经看过了,这样整个链式调用就串联起来了。