源码解析目标
- 当请求进来,ChannelPipeline如何协调内部这些Handler
- 通过源码梳理ChannelPipeline 与ChannelHandlerContext中的read,fireChannelRead等方法的不同
inbound源码解析
- 在 Netty启动流程源码剖析 文中我们已经知道,启动后,Netty通过监听processSelectedKey来对事件进行监听,我们找到NioEventLoop类中对应方法,如下
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
......
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
-
通过Debug方式启动Server, 通过run的方式启动Client,
-
我们关注的OP_READ事件的监听,调用的unsafe.read方法,
-
第一次时完成连接操作OP_ACCEPT
-
第二次到read方法才是OP_READ
-
接着追,完成SocketUtil连接后,我们得到了一个封装好的HandlerList,遍历这个list并且调用fireChannelRead,如下源码
//代码位置NioMessageUnsafe
public void read() {
......
final ChannelPipeline pipeline = pipeline();
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
......
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
//继续追fireChannelRead方法,进入我们今天要关注的DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
- 我们在来看下DefaultChannelPipeline的类UML图
- 如上,他既有inbound的方法,又有outbound的方法,部分源码如下
//inbound部分
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
@Override
public final ChannelPipeline fireChannelInactive() {
AbstractChannelHandlerContext.invokeChannelInactive(head);
return this;
}
@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
return this;
}
@Override
public final ChannelPipeline fireUserEventTriggered(Object event) {
AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
return this;
}
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
@Override
public final ChannelPipeline fireChannelReadComplete() {
AbstractChannelHandlerContext.invokeChannelReadComplete(head);
return this;
}
@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
return this;
}
}
- inbound部分源码可以看出,所有的入站事件都是由fire开头,表示管道的流动,让后面的handler继续处理(此处之后源码分析)
- 观察以上方法的入参,他们给定的都是一个Handler,并且都是HeadHandler节点,说明我们的入站事件都是从Head节点开始,事件依次在每一个Handler中流转最终到我们服务端,如下源码
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
- next.executor(); 首先获取一个NioEventLoop 执行器
- executor.inEventLoop() 判断当前线程和是否时执行器的线程,接着执行 next.invokeChannelRead(m);
- 接着从Head节点开始执行以下方法执行以下
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
- 关键方法,channelRead(this, msg);参数this就是当前Handler,首先就是HeadContextHandler,接着看HeadHandler中的channelRead方法,只有一个fireChannelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
//所以又来到了这里,第二次执行,会再次获取一次Next,也就是第二个handler
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
- 也就是HeadHandler只做了一次转发,将持有的msg转发给了下一个Handler,也就是我们自己添加的EchoServerHandler,如果我们实现的方法分中没有fireChannelRead ,则不会到tailContext中,那么读事件久完成了,总流程如下
- fireChannelRead方法 —〉 AbstractChannelHandlerContext.invokeChannelRead(context); —〉next(context) 中 next.invokeChannelRead(m); —〉具体Handler的channelRead 方法(以及自实现方法)—〉Context的fireChannelRead方法参数是下一个节点的ChannelHandlerContext(此处不一定是fireChannelRead,只是这次我们Debug是read,有可能是Actice等其他事件)
outbound源码解析
- Pipeline的outbound的fire相关方法,同样截取DefaultChannelPipeline
@Override
public class DefaultChannelPipeline implements ChannelPipeline {
public final ChannelFuture bind(SocketAddress localAddress) {
return tail.bind(localAddress);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress) {
return tail.connect(remoteAddress);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return tail.connect(remoteAddress, localAddress);
}
@Override
public final ChannelFuture disconnect() {
return tail.disconnect();
}
@Override
public final ChannelFuture close() {
return tail.close();
}
@Override
public final ChannelFuture deregister() {
return tail.deregister();
}
@Override
public final ChannelPipeline flush() {
tail.flush();
return this;
}
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
@Override
public final ChannelFuture connect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, localAddress, promise);
}
@Override
public final ChannelFuture disconnect(ChannelPromise promise) {
return tail.disconnect(promise);
}
@Override
public final ChannelFuture close(ChannelPromise promise) {
return tail.close(promise);
}
@Override
public final ChannelFuture deregister(final ChannelPromise promise) {
return tail.deregister(promise);
}
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
@Override
public final ChannelFuture write(Object msg, ChannelPromise promise) {
return tail.write(msg, promise);
}
@Override
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return tail.writeAndFlush(msg, promise);
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
}
- 这些都是出站方法的实现,但是都是调用的outboumd类的tailHandler来进程处理的,也就是我们用pipeline中的出站方法时候总是从TailHandler节点开始处理事件
- 找一个比较典型的outbound方法分析,bind方法,如下源码
@Override
public final ChannelFuture bind(SocketAddress localAddress) {
return tail.bind(localAddress);
}
//继续根bind
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
- 入上源码首先参数检验,isNotValidPromise
- 接着,findContextOutbound 找第一个outbound节点对应的Handler(while循环,之前文章分析过)
- 接着同inbound流程一样 next.executor(); 拿到当前HandlerContext的执行器
- 执行对于HandlerContext对于的 next.invokeBind(localAddress, promise); 方法
- 举例一个具体的HandlerContext对应的invokeBind方法,LoggingHandler,如下
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "BIND", localAddress));
}
ctx.bind(localAddress, promise);//又回到了bind,然后接着next,同inbound一样的流程
}
总结
- ChannelPipeline中的出入站方法中出站是从tail开始,入站是从head开始,
- 因为出站是从服务器端通过ChannelPipeline到SocketChannel写,从tailHandlerContext开始,能让前面所有的Handler进行处理,能防止Handler被遗漏,比如编码
- 入站当然是从SocketChannel通过 ChannelPipeline 到服务器端,当然是从head开始往内部输入,让后面的handler能处理这些输入数据,比如解码
- 即使HeadHandler也实现了outbound接口,但是不是从head开始执行出站任务的
- 用如下图来理解数据在源码中执行的流程
- 说明
-
pipeline首先会调用Conetxt的静态方法,并传入Context
-
接着静态方法调用Context的invoke方法,而invoke方法内部会调用该Context所包含的Handler的真正的XXX方法(例如我们自己实现的EchoServerHandler中的Read,activity等事件方法)
-
调用结束后,如果还需要继续向后传递,就调用Contexxt的firexxx2方法,循环往复
-
由上可以得出,ChannelPipeline中的inbound,outbound 分别是从HeadContext,tailContext,开始执行过每一个handler
-
而ChannelHandlerContext中的inbound,outbound是从本节点开始 向后或者向前传递
-