1 Pipeline设计原理
在Netty中每个Channel都有且仅有一个ChannelPipeline与之对应,它们的组成关系如下图:
通过上图可以看到,一个Channel包含了一个ChannelPipeline,而ChannelPipeline中又维护了一个由ChannelHandlerContext组成的双向链表。这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext又关联着一个ChannelHandler。
通过分析代码,已经知道了一个Channel初始化的基本过程,下面在回顾一下。AbstractChannel构造器的代码如下:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
AbstractChannel有一个pipeline属性,在构造器中会把它初始化为DefaultChannelPipeline的实例。这里的代码就印证了这一点:每个Channel都有一个ChannelPipeline。来看一下DefaultChannelPipeline的构造器,代码如下:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
在DefaultChannelPipeline构造器中,首先将与之关联的Channel保存到属性channel中。然后实例化两个ChannelHandlerContext:一个是HeadContext实例Head,另一个是TailContext实例Tail。接着将Head和Tail互相指向,构成一个双向链表。
特别注意的是:在开始的示意图中,Head和Tail并没有包含ChannelHandler,这是因为HeadContext和TailContext继承于AbstractChannelHandlerContext的同时,也实现了ChannelHandler接口,所以它们有Context和Handler的双重属性。
2 ChannelPipeline初始化
下面看一下ChannelPipeline的初始化具体做了哪些工作。先回顾一下,在实例化一个Channel时,会伴随着一个ChannelPipeline的实例化,并且此Channel会与这个ChannelPipeline相互关联,这一点可以通过NioEventLoop的父类AbstractChannel的构造器予以佐证,代码如下:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
当实例化一个NioSocketChannel时,其pipeline属性就是新创建的DefaultChannelPipeline对象,再来回顾一下DefaultChannelPipeline的构造方法,代码如下:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
上面代码中的Head实现了ChannelInboundHandler接口,而Tail实现了ChannelOutboundHandler接口,因此可以说Head和Tail就是ChannelHandler,又是ChannelHandlerContext。
3 ChannelInitializer的添加
前面分析过Channel的组成,最开始的时候ChannelPipeline中含有两个ChannelHandlerContext,但是此时的Pipeline并不能实现特定的功能,因为还没有添加自定义的ChannelHandler。通常来说,在初始化Bootstrap时,会添加自定义的ChannelHandler,下面就以具体的客户端启动代码片段举例:
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
System.out.println("初始化channel:" + socketChannel);
}
});
在调用Handler时,传入ChannelInitializer对象,它提供了一个initChannel方法来初始化ChannelHandler。通过代码跟踪,发现ChannelInitializer是在Bootstrap的init方法中添加到ChannelPipiline中的,代码如下:
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelHandler[]{this.config.handler()});
Map<ChannelOption<?>, Object> options = this.options0();
synchronized(options) {
Iterator i$ = options.entrySet().iterator();
while(true) {
if (!i$.hasNext()) {
break;
}
Entry e = (Entry)i$.next();
try {
if (!channel.config().setOption((ChannelOption)e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable var10) {
logger.warn("Failed to set a channel option: " + channel, var10);
}
}
}
Map<AttributeKey<?>, Object> attrs = this.attrs0();
synchronized(attrs) {
Iterator i$ = attrs.entrySet().iterator();
while(i$.hasNext()) {
Entry<AttributeKey<?>, Object> e = (Entry)i$.next();
channel.attr((AttributeKey)e.getKey()).set(e.getValue());
}
}
}
从上面的代码可见,将handler()方法返回的ChannelHandler添加到Pipeline中,而handler()方法返回的其实就是在初始化Bootstrap时通过handler方法设置的ChannelInitializer实例,因此这里就将ChannelInitializer插到了Pipieline的末端。此时Pipeline的结构如下图所示:
这是会有一个疑问,明明插入的是 ChannelInitializer实例,为什么在ChannelPipeline的双向链表中的元素却是一个ChannelHandlerContext呢?继续看源码,在Bootstrap的init方法中会调用p.addList()方法,将ChannelInitializer插入链表的末端,代码如下:
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
addList方法有很多重载的方法,只需要关注这个方法即可。上面的addList方法中,首先检查ChannelHandler的名字是否重复,如果不重复,则调用newContext方法为这个Handler创建一个对应的DefaultChannelHandlerContext实例,并与之关联起来。
为了添加一个Handler到Pipeline中,必须把此Handler包装成ChannelHandlerContext。因此在上面的代码中,我们新实例化一个newCx对象,并将Handler作为参数传递到构造方法中。下面来看一下DefaultChannelHandlerContext的构造器。
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
在DefaultChannelHandlerContext的构造器中,调用了isInbound()方法和isOutbound()方法,这两个方法的代码如下:
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
从上面代码中可以看到,当一个Handler实现了ChannelInboundHandler接口,则isInbound返回true;类似的,当一个Handler实现了ChannelOutboundHandler接口,则isOuntbound返回true。而这两个boolean类型变量会传递给父类AbstractChannelHandlerContext中,并初始化父类的两个属性:inbound和outbound。
这里的ChannelInitializer所对应的DefaultChannelHandlerContext的inbound与outbound属性分别是什么呢?先来看ChannelInitializer的类层次结构图,如下图:
可以看到,ChannelInitializer仅仅实现了ChannelInboundHandler接口,因此这里实例化的DefaultChannelHandlerContext的inbound是true,outbound是false。
inbound和outbound这两个属性关系到Pipeline事件的流向与分类,因此十分关键。这里先记住一个结论:ChannelInitializer所对应的DefaultChannelHandlerContext的inbound=true,outbound=false。
当创建好Context之后,就将这个Context插入Pipeline的双向链表中。
DefaultChannelPipeline.java
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
添加完ChannelInitializer的Pipeline内部如下图所示
4 自定义ChannelHandler的添加过程
上面分析了ChannelInitializer是如何插入Pipeline中的,接下来探讨ChannelInitializer在哪里被调用、ChannelInitializer的作用以及自定义的ChannelHandler是如何插入Pipeline中的。
自定义ChannelHandler的添加过程,发生在AbstractUnsafe的register方法中,在这个方法中调用了pipeline.fireChannelRegister()方法,代码如下:
@Override
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
再看AbstractChannelHandlerContext的invokeChannelRegister()方法。
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
很显然,这个代码将从Head开始遍历Pipeline的双向链表,然后找到第一个属性inbound为true的ChannelHandlerContext实例。在分析ChannelInitializer时,专门分析了inbound和outbound属性,现在这里就用上了。回想一下,ChannelInitializer实现了ChannelInboundHandler,因此它所对应的ChannelHandlerContext的inbound属性为true,因此这里返回的就是ChannelInitializer实例所对应的ChannelHandlerContext对象,如下图所示
当获取inbound的Context后,就调用它的invokeChannelRegistered()方法。
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
我们已经知道,每个ChannelHandler都和一个ChannelHandlerContext关联,可以通过ChannelHandlerContext获取对应的ChannelHandler。很明显,这里handler()方法返回的对象其实就是一开始实例化的ChannelInitializer对象,接着调用了ChannelInitializer的channelRegister()方法。ChannelInitializer的channelRegister()方法的代码如下:
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (this.initChannel(ctx)) {
ctx.pipeline().fireChannelRegistered();
} else {
ctx.fireChannelRegistered();
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (this.initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
try {
this.initChannel(ctx.channel());
} catch (Throwable var6) {
this.exceptionCaught(ctx, var6);
} finally {
this.remove(ctx);
}
return true;
} else {
return false;
}
initChannel()方法就是在初始化Bootstrap时,调用handler方法传入的匿名内部类所实现的方法。因此,在调用这个方法之后,自定义的ChannelHandler就插入到Pipeline中,此时Pipeline的状态如下图:
当添加完自定义的ChannelHandler后,在finally代码块会删除自定义的ChannelHandler,也就是remove(ctx),最终调用ctx.pipeline().remove(this),因此最后Pipeline的状态如下图:
到此,自定义ChannelHandler的添加过程也就分析完成了。
5 给ChannelHandler命名
pipeline.addXXX()都有一个重载方法,例如addList()有一个重载的版本,代码如下:
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
第一个参数指定添加的是Handler的名字(更准确的说是ChannelHandlerContext的名字)。那么Handler的名字有什么用呢?如果不设置name,那么Handler默认的名字是怎么样的?上面的方法会调用重载的addLast()方法,代码如下:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
第一个参数设置为null,不用关心。第二个参数就是Handler的名字。有代码可知,再添加一个Handler之前,需要调用checkMultiplicity()方法来确定新添加的Handler名字是否与已添加的Handler名字重复。