一、Netty基本介绍
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty 在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。
Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
本文主要介绍下netty的PileLine类。
二、继承体系
三、核心内容
1.DefaultChannelPipeline
在ServerBootstarp的介绍中有跟到pileline的创建,即:在创建ServerBootstarp在channel的时候封装了unsafe,newId,以及创建pileline
// io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 为channel生成id,由五部分构成
id = newId();
// 生成一个底层操作对象unsafe
unsafe = newUnsafe();
// 创建与这个channel相绑定的channelPipeline
pipeline = newChannelPipeline();
}
跟进去newChannelPipeline()方法
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
跟进去DefaultChannelPipeline的构造器
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
// 创建一个记录成功的 succeededFuture
succeededFuture = new SucceededChannelFuture(channel, null);
// 创建一个记录异常的 voidPromise,在 VoidChannelPromise 方法中创建了一个异常监听器,
// 触发重写的 fireExceptionCaught(cause) 的方法执行。
voidPromise = new VoidChannelPromise(channel, true);
// 创建尾节点
tail = new TailContext(this);
// 创建头节点
head = new HeadContext(this);
// 将头尾节点连接
head.next = tail;
tail.prev = head;
}
先看下TailContext类
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
// 调用父类构造器
super(pipeline, null, TAIL_NAME, TailContext.class);
// 修改节点状态
setAddComplete();
}
// 省略...
}
TailContext是一个内部类,并实现了ChannelInboundHandler 处理器,是一个 InboundHandler, InboundHandler 和 OutboundHandler 处理器下面单独说
先跟进去setAddComplete()修改状态的方法
final boolean setAddComplete() {
for (;;) {
int oldState = handlerState; // 获取处理器状态
if (oldState == REMOVE_COMPLETE) { // 处理器状态为移除状态
return false;
}
// 通过CAS方式将处理器状态修改为 添加完毕
if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return true;
}
}
}
这里通过CAS操作将当前节点的处理器状态修改为添加完毕
接着跟进TailContext的父类构造器
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
// 每个处理器节点都会绑定一个executor
this.executor = executor;
// 执行标记
this.executionMask = mask(handlerClass);
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
跟进去mask()方法
static int mask(Class<? extends ChannelHandler> clazz) {
// 从缓存中尝试着获取标记
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
// 创建一个标记
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}
跟进去mask0()方法
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
// 处理节点类型为inbound处理器的情况
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
// 若channelRegistered()方法上出现了@skip注解,测该方法的执行将要被跳过
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
// 该运算最终会使mask中该方法对应的二进制位置0
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}
// 处理节点类型为outbound处理器的情况
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
return mask;
}
这是充分使用了二进制的开关的性质,这里方法的作用就是将所有的 InboundHandler 处理器和 OutboundHandler 处理器中定义的方法进行标记,如果其中的方法被实现了,并且方法中没有 @Skip 注解,则当前方法对应的二进制位的值是 1,当当前标记位等于 1 时,则标记当前方法时需要执行的。
TailContext 和 HeadContext 中所有的标记位都是 1,因为 TailContext 和 HeadContext 分别都实现了 InboundHandler 和 OutboundHandler 接口中的接口。
我在在写代码的过程中,没有直接实现ChannelInboundHandler,而是继承了 ChannelInboundHandlerAdapter。
在 ChannelInboundHandlerAdapter 中每一个实现方法上都有一个 @Skip 注解,而且它默认实现了所有的 InboundHandler 接口的方法,就是为了我们在定义自定义处理器减少一些默认实现的处理,而且为了性能在初始化时将所有的方法打上标记,保证只执行我们自己实现的方法,这就是这个标记的用处。
这里 mask 处理的都是 InboundHandler 和 OutboundHandler 处理器中的接口方法。
看完 TailContext,再看下HeadContext
// 头节点既是inbound处理器,也是outbound处理器
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
// 省略...
}
HeadContext 也是内部类,这里与 TailContext 不同的是,HeadContext 同时实现了 InboundHandler 和 OutboundHandler。并且创建了一个用于底层 IO 处理的 unsafe 对象。
2.ChannelInitializer处理器
先看下继承
ChannelInitializer 继承于 ChannelInboundHandler 接口,是一个抽象类,定义了一个抽象方法:
protected abstract void initChannel(C ch) throws Exception;
在创建ServerBootstrap的时候会用到这个类
在 ServerBootstrap 中初始化方法 io.netty.bootstrap.ServerBootstrap#init 方法中可以找到代码:
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
负责在 accept 新连接的 Channel 的 pipeline 被添加了一个 ChannelInitializer,由于此时这个 Channel 还没有被注册到EventLoop,于是在 addLast 方法的调用链中,会给 pipeline 添加一个 PendingHandlerAddedTask ,其目的是在 Channel被注册到 EventLoop 的时候,触发一个回调事件然后在 AbstractBootstrap.initAndRegister() 方法中,这个Channel会被注册到 ParentEventLoopGoup,接着会被注册到 ParentEventLoopGoup 中的某一个具体的 EventLoop 然后在AbstractChannel.register0() 方法中,之前注册的 PendingHandlerAddedTask 会被调用,经过一系列调用之后,最终 ChannelInitializer.handleAdded() 方法会被触发。所以我们进去看 ChannelInitializer.handleAdded() 方法:
// 当前处理器所封装的处理器节点被添加到pipeline后就会触发该方法的执行
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) { // 若channel已经完成了注册
if (initChannel(ctx)) {
// 将当前处理器节点从initMap集合中删除
removeState(ctx);
}
}
}
可以看到 ChannelInitializer 的方法触发时必须在 Channel 注册完成之后,然后开始执行 initChannel 方法,在初始化操作完成之后又将当前处理器节点从 initMap 集合中移除。现在先看看 initChannel 方法:
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
// 将当前处理器节点添加到initMap集合中
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// 调用重写的initChannel()
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
// 立即将该处理器节点从pipeline上删除
ChannelPipeline pipeline = ctx.pipeline();
// 查找在pipeline中是否存在当前处理器
if (pipeline.context(this) != null) {
// 将当前处理器节点从pipeline中删除
pipeline.remove(this);
}
}
return true;
}
return false;
}
在 Server 端 ServerBootstrap 中我们使用 ChannelInitializer 给 pipeline 中添加处理器节点。
再回到 initChannel 方法。继续看再其执行完重新的 initChannel 方法之后必然执行 finally 的代码,首先获取当前Pipeline,pipeline.context(this) 从 Pieline 查找但其处理器节点是否存在。存在然后 pipeline.remove(this) 将其从当前 Pipeline 中移除。
ChannelInitializer总结:只是一个向pileline添加处理器的工具,在处理器添加完成之后将自生移除(因为本身也是个处理器)。
3.将Handler添加到Pileline中
在添加自定义处理器的时候,我们都是调用pileline的addLast()方法,所以看下DefaultChannelPipeline中的addLast方法
public final ChannelPipeline addLast(ChannelHandler... handlers) {
// 第一个参数为group,其值默认为null
return addLast(null, handlers);
}
每个 Pipeline 都有一个 EventLoop 绑定,这里添加方法默认传入一个 EventLoopGroup 参数,不过这里传了空;继续往下跟:
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
// 遍历所有handlers,逐个添加到pipeline
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);// 这里第二个参数是处理器name
}
return this;
}
跟进去addLast()方法
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 检测处理器是否被多次添加
checkMultiplicity(handler);
// 将处理器包装为一个节点 filterName() 获取到节点的名称
newCtx = newContext(group, filterName(name, handler), handler);
// 将新的节点添加到pipeline
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// 获取新建节点绑定的eventLoop
EventExecutor executor = newCtx.executor();
// 若该eventLoop绑定的线程与当前线程不是同一个,则执行下面的代码
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 若该eventLoop绑定的线程与当前线程是同一个线程,
// 则调用重写的handlerAdded()方法
callHandlerAdded0(newCtx);
return this;
}
上述代码主要做了以下的一些事情:
- 校验当前处理器是否被多次添加,没有被 @Sharable 注解标注的处理器只可以被添加一次。
- 将处理器包装一个新的节点 newContext(group, filterName(name, handler), handler)
- 将新的节点添加到 Pipeline 中。addLast0(newCtx)
- 判断 channel 没有注册,处理异常情况。callHandlerCallbackLater(newCtx, true)
- 获取新节点的 EventLoop ,判断是否是当前线程,如果不是当前线程执行 callHandlerAddedInEventLoop(newCtx, executor)
- 新节点的 EventLoop 是当前线程执行 callHandlerAdded0(newCtx)
重点关注 newContext、addLast0、callHandlerCallbackLater/callHandlerAdded0方法
3.1.newContext
跟进去看看
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
// childExecutor() 获取与当前处理器节点相绑定的eventLoop
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
我们传入的 group 是 null ,前面一直没有传,这里调用 childExecutor(group) 直接跟进去:
private EventExecutor childExecutor(EventExecutorGroup group) {
// 若group为null,则与该节点绑定的eventLoop为null
if (group == null) {
return null;
}
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
return group.next();
}
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
}
EventExecutor childExecutor = childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}
我们没有指定 Group 的时候,这里代码是被直接返回了出去,下面执行逻辑就没有执行,而此时节点的 EventLoop 是在 EventExecutor executor = newCtx.executor(); 中绑定的:
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop(); // 获取到channel所绑定的eventLoop
} else {
return executor;
}
}
这里可以在添加的时候指定一个group,跟踪下面的代码
继续看 childExecutor 下面的逻辑:
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
// 轮询获取 eventLoop
return group.next();
}
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
}
EventExecutor childExecutor = childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}
- 获取 channel 的 option 配置的 SINGLE_EVENTEXECUTOR_PER_GROUP 值。用来配置当前处理器节点是否绑定使用同一个 EventLoop。不为空并且是 False : 则表示一个group中的每一个处理器都会分配一个 eventLoop,调用 EventLoopGroup 的 next 方法,而 next 是轮询的方式从 Group 中选取 EventLoop。
- 没有配置使用同一个 eventLoop 则先获取缓存中保存的 EventLoopGroup 对应的 eventLoop,如果缓存中存在则直接返回,如果缓存中不存在则从 EventLoopGroup 获取一个 eventLoop,保存到缓存中并返回。
总结:以上就是创建处理器节点中绑定 EventLoop 的方法。所以到这可以看到处理器节点的 EventLoop 可以指定,不指定则直接使用当前 channel 的 EventLoop 。拿到 EventLoop 之后则直接创建处理器节点 new DefaultChannelHandlerContext
3.2.addLast0
跟进去代码看下:
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
这是个链表尾部添加节点的操作。
3.3.callHandlerCallbackLater/callHandlerAdded0
跟进去callHandlerCallbackLater方法
private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
newCtx.setAddPending(); // 将当前节点状态设置成处理中,等待 callHandlerAdded0 执行完成
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
}
这个方法也是调用的 callHandlerAdded0 方法,只不过因为当前节点绑定的 EventLoop 不是当前执行线程,所以需要通过 EventLoop 创建一个新的任务,由任务来完成 callHandlerAdded0 方法的执行,而是当前线程则直接执行 callHandlerAdded0 方法。继续看 callHandlerAdded0 方法:
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.callHandlerAdded();
} catch (Throwable t) {
boolean removed = false;
try {
remove0(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}
这里直接调用当前处理器的callHandlerAdded方法,如果异常则将资源移除。
进到callHandlerAdded方法中:
final void callHandlerAdded() throws Exception {
if (setAddComplete()) { // CAS 方式将处理器状态修改为 添加完毕 ADD_COMPLETE
handler().handlerAdded(this);
}
}
先通过 CAS 修改处理器状态为添加完成,状态修改成功则调用处理器的添加方法完成处理器添加。
4.Pipeline 中消息的传递与处理
Pipeline 中的消息都是由每一个处理器来完成处理的,即ChannelInboundHandler和 ChannelOutboundHandler,先看下这两个类的继承体系
先看下ChannelHandler接口
public interface ChannelHandler {
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}
当前接口只定义了两个方法:hanndlerAdded
和 handlerRemoved
,用于当前处理器被从 Pipeline 中添加或移除时调用的方法,还有一个过时的处理异常的方法:exceptionCaught
。
- ChannelInboundHandler:定义了所有的用于处理回调的方法,都是需要主动触发的方法。
- ChannelOutboundHandler:定义的所有的用于主动调用的处理器方法,需要主动调用。
- ChannelHandlerAdapter:则是处理器适配器顶级抽象类,用于定义处理器适配器的规范。
- ChannelInboundHandlerAdapter/ChannelOutboundHandlerAdapter:默认实现了所有的 handler 的方法,并且每个方法上面都标记了 @Skip 注解,被标记 @Skip 注解的 Handler 方法会被标记,不会被 Handler 执行,这让我们再使用适配器之定义处理器时只要重写我们关心方法即可,重写的方法不会标记 @Skip 。
- ChannelHandlerContext:上下文对象,每个 Handler 每个方法都需要传递上下文对象,在Pipeline中处理器的调用就是通过ChannelHandlerContext上下文对象完成执行链的调用,也可以用来保存上下文数据。
4.1.ChannelInboundHandler
上面说过pileline是带头节点和尾节点的双向链表, 消息通过ChannelHandlerContext在链表传递(从Head节点开始,tail节点结束)
当请求到Server并写入数据时,触发 Server 端 Head 节点的 channelRead 方法,此时调用链开始执行,Head 节点执行 channelRead 方法调用 fireChannelRead 触发 next 节点的 channelRead 方法执行,不过这里会先判断 next 节点的方法标记也就是 mask 是否标记当前 Handler 是否需要执行 channelRead 方法,如果重写则调用,否则继续找 next 的 next 节点。
先看 Head 节点的 channelRead 方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg); // 准备调用其下一个节点的channelRead()
}
进入fireChannelRead()方法:
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
这里通过 findContextInbound 方法获取下一个节点,传入的参数 MASK_CHANNEL_READ 用来判断当前处理器的 channelRead 方法是否被标记需要执行。
先进入 findContextInbound()方法:
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
再看下fireChannelRead的invokeChannelRead()方法,触发 Next 节点的 channelRead 的执行:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
进入invokeChannelRead()方法:
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
invokeHandler() 方法判断当前处理器状态是否完成,然后调用下一个节点的 channelRead 方法。
在这里看到调用了下一个ChannelInboundHandler的channelRead()方法。
4.2.ChannelOutboundHandler
OutboundHandler 的方法是需要我们自己调用,而不像 InboundHandler 处理器的方法是触发调用。
所以在这种情况下,如果我们只添加 OutboundHandler 处理器的话,当 Client 发起请求也不会触发 OutboundHandler 的方法执行。因此我们加一个 InboundHandler 方法,用来触发 OutboundHandler 的执行。
定义一个InboundHandler,WriteInboundHandler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("WriteInboundHandler 进入 " + msg);
ctx.channel().write("我是 Server。");
System.out.println("InboundHWriteInboundHandler andler3 退出 " + msg);
}
Outbound 的处理流程与 Inbound 的流程是相反的,在 Inbound 中处理是获取 next 节点执行,而在 Outbound 节点中获取的确是 prev 节点的重写方法执行。所以这里的执行顺序会变成跟添加顺序相反的顺序执行。
这里首先请求进入 Server 触发 Head 节点的 channelRead 方法执行,然后再获取下一个重写了 channelRead 方法的 Inbound 处理器,找到 WirteHandler 处理器,在 WirteHandler 处理器中掉用 Channel 的 writeAndFlush 方法,找到Tail 节点实现方法,调用了 Write 方法,然后获取 Prev 节点中重写了 write 方法的处理器,找到 Outbound3,然后依次调用 Outbound2 的 write 方法和 Outbound1 的 write 方法,执行完成依次返回。
从 ctx.channel().write 的方法进入,实现类为:DefaultChannelPipeline
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
Channel 中的 Write 方法直接找到 Tail 调用其重写的 Write 方法:
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
跟进去write()方法:
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
// 查找下一个节点(prev)
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
final AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
if (!safeExecute(executor, task, promise, m)) {
task.cancel();
}
}
}
跟进findContextOutbound()方法
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
Outbound 的获取顺序是获取的 prev ,并且判断该 handler 中当前方法是否标记需要执行。
获取到下一个节点之后在 wirte 方法中通过 next.invokeWrite 方法完成执行器链调用。当然如果我们一个 OutboundHandler 都没有定义的话,findContextOutbound 方法最终会获取到 Head 节点,然后执行 Head 节点 write 方法,因为我们在前面看到 HeadContext 节点同时实现了 Inboundhandler 和 OutboundHandler。
看一下 Head 节点的write()方法:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
最终在 HeadContext 中完成底层 unsafe 的 write 操作。