通过nioSocketChannel.pipeline()的addLast添加入站处理器,如果有多个必须显示的唤醒下一个入站处理器,否则执行链中间会断掉。
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { log.debug(nioSocketChannel.toString()); //添加各种处理器 nioSocketChannel.pipeline().addLast(new LoggingHandler()); nioSocketChannel.pipeline().addLast(new StringDecoder());//字符串解析器 nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("msg 1 : {}",msg); //这里未调用唤醒下一个处理器 } }); nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("msg 1 : {}",msg); super.channelRead(ctx, msg); } }); }
执行结果:
解决方法:
super.channelRead(ctx, msg);源码:
该函数调用的是:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); }
进一步跟踪调用:
public ChannelHandlerContext fireChannelRead(final Object msg) { //找到下一个handler并执行 invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg); return this; }
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { //新启动一个线程,执行新的处理器 executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
附加:入站处理器调用出站后,分别有:
ChannelHandlerContext.writeAndFlush:从当前处理器往前找
nioSocketChannel.writeAndFlush:从最后一个处理依次往前找。
上图执行顺序:
入站1-》入站2-》入站2调用出站-》当前往前找-》出站1
NioSocketChannel 会从后往前找出站处理器,
ChannelHandlerContext:会从当前处理器往前找出战处理器。
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("msg 1 : " + msg);
//唤醒下一个handler,同时可以将消息进一步处理
super.channelRead(ctx, msg + "我已经被处理过");
}
});
nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("调用出战处理器 1");
System.out.println(msg.toString());
super.write(ctx, msg, promise);
}
});
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//得到消息,是第一个handler处理过后的内容
System.out.println("msg 2 : " + msg);
//ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("出出出出出".getBytes()));
//nioSocketChannel 会从最后一个依次向前找出站处理器
nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("出出出出出".getBytes()));
}
});
nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("调用出战处理器 2");
System.out.println(msg.toString());
super.write(ctx, msg, promise);
}
});
nioSocketChannel.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("调用出战处理器 3");
System.out.println(msg.toString());
super.write(ctx, msg, promise);
}
});
}
nioSocketChannel 依次从最后往前调用出站处理器,执行顺序为: 3 2 1