Netty中PileLine类介绍

news2025/1/10 19:08:11

一、Netty基本介绍

        Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty 在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

        Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

        本文主要介绍下netty的PileLine类。

二、继承体系

 三、核心内容

1.DefaultChannelPipeline

        在ServerBootstarp的介绍中有跟到pileline的创建,即:在创建ServerBootstarp在channel的时候封装了unsafe,newId,以及创建pileline

// io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // 为channel生成id,由五部分构成
    id = newId();
    // 生成一个底层操作对象unsafe
    unsafe = newUnsafe();
    // 创建与这个channel相绑定的channelPipeline
    pipeline = newChannelPipeline();
}

        跟进去newChannelPipeline()方法

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

        跟进去DefaultChannelPipeline的构造器

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    // 创建一个记录成功的 succeededFuture
    succeededFuture = new SucceededChannelFuture(channel, null);

    // 创建一个记录异常的 voidPromise,在 VoidChannelPromise 方法中创建了一个异常监听器,
    // 触发重写的 fireExceptionCaught(cause) 的方法执行。
    voidPromise =  new VoidChannelPromise(channel, true);

    // 创建尾节点
    tail = new TailContext(this);
    // 创建头节点
    head = new HeadContext(this);

    // 将头尾节点连接
    head.next = tail;
    tail.prev = head;
}

        先看下TailContext类

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

	TailContext(DefaultChannelPipeline pipeline) {
        // 调用父类构造器
		super(pipeline, null, TAIL_NAME, TailContext.class);
        // 修改节点状态
		setAddComplete();
	}
	// 省略...
}

        TailContext是一个内部类,并实现了ChannelInboundHandler 处理器,是一个 InboundHandler, InboundHandler 和 OutboundHandler 处理器下面单独说

        先跟进去setAddComplete()修改状态的方法

final boolean setAddComplete() {
    for (;;) {
        int oldState = handlerState; // 获取处理器状态
        if (oldState == REMOVE_COMPLETE) { // 处理器状态为移除状态
            return false;
        }
        // 通过CAS方式将处理器状态修改为 添加完毕
        if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return true;
        }
    }
}

        这里通过CAS操作将当前节点的处理器状态修改为添加完毕

        接着跟进TailContext的父类构造器

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                              String name, Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    // 每个处理器节点都会绑定一个executor
    this.executor = executor;
    // 执行标记
    this.executionMask = mask(handlerClass);
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

        跟进去mask()方法

static int mask(Class<? extends ChannelHandler> clazz) {
    // 从缓存中尝试着获取标记
    Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
    Integer mask = cache.get(clazz);
    if (mask == null) {
        // 创建一个标记
        mask = mask0(clazz);
        cache.put(clazz, mask);
    }
    return mask;
}

        跟进去mask0()方法

private static int mask0(Class<? extends ChannelHandler> handlerType) {
	int mask = MASK_EXCEPTION_CAUGHT;
	try {
		// 处理节点类型为inbound处理器的情况
		if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
			mask |= MASK_ALL_INBOUND;

			// 若channelRegistered()方法上出现了@skip注解,测该方法的执行将要被跳过
			if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
				// 该运算最终会使mask中该方法对应的二进制位置0
				mask &= ~MASK_CHANNEL_REGISTERED;
			}
			if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
				mask &= ~MASK_CHANNEL_UNREGISTERED;
			}
			if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
				mask &= ~MASK_CHANNEL_ACTIVE;
			}
			if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
				mask &= ~MASK_CHANNEL_INACTIVE;
			}
			if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
				mask &= ~MASK_CHANNEL_READ;
			}
			if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
				mask &= ~MASK_CHANNEL_READ_COMPLETE;
			}
			if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
				mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
			}
			if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
				mask &= ~MASK_USER_EVENT_TRIGGERED;
			}
		}

		// 处理节点类型为outbound处理器的情况
		if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
			mask |= MASK_ALL_OUTBOUND;

			if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
					SocketAddress.class, ChannelPromise.class)) {
				mask &= ~MASK_BIND;
			}
			if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
					SocketAddress.class, ChannelPromise.class)) {
				mask &= ~MASK_CONNECT;
			}
			if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
				mask &= ~MASK_DISCONNECT;
			}
			if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
				mask &= ~MASK_CLOSE;
			}
			if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
				mask &= ~MASK_DEREGISTER;
			}
			if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
				mask &= ~MASK_READ;
			}
			if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
					Object.class, ChannelPromise.class)) {
				mask &= ~MASK_WRITE;
			}
			if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
				mask &= ~MASK_FLUSH;
			}
		}

		if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
			mask &= ~MASK_EXCEPTION_CAUGHT;
		}
	} catch (Exception e) {
		// Should never reach here.
		PlatformDependent.throwException(e);
	}

	return mask;
}

        这是充分使用了二进制的开关的性质,这里方法的作用就是将所有的 InboundHandler 处理器和 OutboundHandler 处理器中定义的方法进行标记,如果其中的方法被实现了,并且方法中没有 @Skip 注解,则当前方法对应的二进制位的值是 1,当当前标记位等于 1 时,则标记当前方法时需要执行的。

        TailContext 和 HeadContext 中所有的标记位都是 1,因为 TailContext 和 HeadContext 分别都实现了 InboundHandler 和 OutboundHandler 接口中的接口。

        我在在写代码的过程中,没有直接实现ChannelInboundHandler,而是继承了 ChannelInboundHandlerAdapter。

        在 ChannelInboundHandlerAdapter 中每一个实现方法上都有一个 @Skip 注解,而且它默认实现了所有的 InboundHandler 接口的方法,就是为了我们在定义自定义处理器减少一些默认实现的处理,而且为了性能在初始化时将所有的方法打上标记,保证只执行我们自己实现的方法,这就是这个标记的用处。

        这里 mask 处理的都是 InboundHandler 和 OutboundHandler 处理器中的接口方法。

       看完 TailContext,再看下HeadContext

// 头节点既是inbound处理器,也是outbound处理器
final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

	private final Unsafe unsafe;

	HeadContext(DefaultChannelPipeline pipeline) {
		super(pipeline, null, HEAD_NAME, HeadContext.class);
		unsafe = pipeline.channel().unsafe();
		setAddComplete();
	}
	// 省略...
}

        HeadContext 也是内部类,这里与 TailContext 不同的是,HeadContext 同时实现了 InboundHandler 和 OutboundHandler。并且创建了一个用于底层 IO 处理的 unsafe 对象。

2.ChannelInitializer处理器 

        先看下继承

        ChannelInitializer 继承于 ChannelInboundHandler 接口,是一个抽象类,定义了一个抽象方法:

protected abstract void initChannel(C ch) throws Exception;

         在创建ServerBootstrap的时候会用到这个类

         在 ServerBootstrap 中初始化方法 io.netty.bootstrap.ServerBootstrap#init 方法中可以找到代码:

p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

        负责在 accept 新连接的 Channel 的 pipeline 被添加了一个 ChannelInitializer,由于此时这个 Channel 还没有被注册到EventLoop,于是在 addLast 方法的调用链中,会给 pipeline 添加一个 PendingHandlerAddedTask ,其目的是在 Channel被注册到 EventLoop 的时候,触发一个回调事件然后在 AbstractBootstrap.initAndRegister() 方法中,这个Channel会被注册到 ParentEventLoopGoup,接着会被注册到 ParentEventLoopGoup 中的某一个具体的 EventLoop 然后在AbstractChannel.register0() 方法中,之前注册的 PendingHandlerAddedTask 会被调用,经过一系列调用之后,最终 ChannelInitializer.handleAdded() 方法会被触发。所以我们进去看 ChannelInitializer.handleAdded() 方法:

// 当前处理器所封装的处理器节点被添加到pipeline后就会触发该方法的执行
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {  // 若channel已经完成了注册
        if (initChannel(ctx)) {
            // 将当前处理器节点从initMap集合中删除
            removeState(ctx);
        }
    }
}

        可以看到 ChannelInitializer 的方法触发时必须在 Channel 注册完成之后,然后开始执行 initChannel 方法,在初始化操作完成之后又将当前处理器节点从 initMap 集合中移除。现在先看看 initChannel 方法:

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    // 将当前处理器节点添加到initMap集合中
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            // 调用重写的initChannel()
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            // 立即将该处理器节点从pipeline上删除
            ChannelPipeline pipeline = ctx.pipeline();
            // 查找在pipeline中是否存在当前处理器
            if (pipeline.context(this) != null) {
                // 将当前处理器节点从pipeline中删除
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}

        在 Server 端 ServerBootstrap 中我们使用 ChannelInitializer 给 pipeline 中添加处理器节点。

        再回到 initChannel 方法。继续看再其执行完重新的 initChannel 方法之后必然执行 finally 的代码,首先获取当前Pipeline,pipeline.context(this) 从 Pieline 查找但其处理器节点是否存在。存在然后 pipeline.remove(this) 将其从当前 Pipeline 中移除。

        ChannelInitializer总结:只是一个向pileline添加处理器的工具,在处理器添加完成之后将自生移除(因为本身也是个处理器)。

3.将Handler添加到Pileline中

        在添加自定义处理器的时候,我们都是调用pileline的addLast()方法,所以看下DefaultChannelPipeline中的addLast方法

public final ChannelPipeline addLast(ChannelHandler... handlers) {
    // 第一个参数为group,其值默认为null
    return addLast(null, handlers);
}

        每个 Pipeline 都有一个 EventLoop 绑定,这里添加方法默认传入一个 EventLoopGroup 参数,不过这里传了空;继续往下跟:

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    }
    // 遍历所有handlers,逐个添加到pipeline
    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);// 这里第二个参数是处理器name
    }
    return this;
}

        跟进去addLast()方法

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 检测处理器是否被多次添加
        checkMultiplicity(handler);

        // 将处理器包装为一个节点 filterName() 获取到节点的名称
        newCtx = newContext(group, filterName(name, handler), handler);

        // 将新的节点添加到pipeline
        addLast0(newCtx);
        
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        // 获取新建节点绑定的eventLoop
        EventExecutor executor = newCtx.executor();
        // 若该eventLoop绑定的线程与当前线程不是同一个,则执行下面的代码
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    // 若该eventLoop绑定的线程与当前线程是同一个线程,
    // 则调用重写的handlerAdded()方法
    callHandlerAdded0(newCtx);
    return this;
}

        上述代码主要做了以下的一些事情:

  1. 校验当前处理器是否被多次添加,没有被 @Sharable 注解标注的处理器只可以被添加一次。
  2. 将处理器包装一个新的节点 newContext(group, filterName(name, handler), handler)
  3. 将新的节点添加到 Pipeline 中。addLast0(newCtx)
  4. 判断 channel 没有注册,处理异常情况。callHandlerCallbackLater(newCtx, true)
  5. 获取新节点的 EventLoop ,判断是否是当前线程,如果不是当前线程执行 callHandlerAddedInEventLoop(newCtx, executor)
  6. 新节点的 EventLoop 是当前线程执行 callHandlerAdded0(newCtx)

        重点关注 newContext、addLast0、callHandlerCallbackLater/callHandlerAdded0方法

3.1.newContext

        跟进去看看

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    // childExecutor() 获取与当前处理器节点相绑定的eventLoop
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

        我们传入的 group 是 null ,前面一直没有传,这里调用 childExecutor(group) 直接跟进去:

private EventExecutor childExecutor(EventExecutorGroup group) {
    // 若group为null,则与该节点绑定的eventLoop为null
    if (group == null) {
        return null;
    }
    
    Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
    if (pinEventExecutor != null && !pinEventExecutor) {
        return group.next();
    }

    Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
    if (childExecutors == null) {
        childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
    }
    EventExecutor childExecutor = childExecutors.get(group);
    if (childExecutor == null) {
        childExecutor = group.next();
        childExecutors.put(group, childExecutor);
    }
    return childExecutor;
}

         我们没有指定 Group 的时候,这里代码是被直接返回了出去,下面执行逻辑就没有执行,而此时节点的 EventLoop 是在 EventExecutor executor = newCtx.executor(); 中绑定的:

public EventExecutor executor() {
    if (executor == null) {
        return channel().eventLoop();  // 获取到channel所绑定的eventLoop
    } else {
        return executor;
    }
}

         这里可以在添加的时候指定一个group,跟踪下面的代码

         继续看 childExecutor 下面的逻辑:

private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }

    Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
    if (pinEventExecutor != null && !pinEventExecutor) {
        // 轮询获取 eventLoop
        return group.next();
    }

    Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
    if (childExecutors == null) {
        childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
    }

    EventExecutor childExecutor = childExecutors.get(group);
    if (childExecutor == null) {
        childExecutor = group.next();
        childExecutors.put(group, childExecutor);
    }
    return childExecutor;
}
  1. 获取 channel 的 option 配置的 SINGLE_EVENTEXECUTOR_PER_GROUP 值。用来配置当前处理器节点是否绑定使用同一个 EventLoop。不为空并且是 False : 则表示一个group中的每一个处理器都会分配一个 eventLoop,调用 EventLoopGroup 的 next 方法,而 next 是轮询的方式从 Group 中选取 EventLoop。
  2. 没有配置使用同一个 eventLoop 则先获取缓存中保存的 EventLoopGroup 对应的 eventLoop,如果缓存中存在则直接返回,如果缓存中不存在则从 EventLoopGroup 获取一个 eventLoop,保存到缓存中并返回。

        总结:以上就是创建处理器节点中绑定 EventLoop 的方法。所以到这可以看到处理器节点的 EventLoop 可以指定,不指定则直接使用当前 channel 的 EventLoop 。拿到 EventLoop 之后则直接创建处理器节点 new DefaultChannelHandlerContext

3.2.addLast0

        跟进去代码看下:

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

        这是个链表尾部添加节点的操作。

3.3.callHandlerCallbackLater/callHandlerAdded0

        跟进去callHandlerCallbackLater方法

private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
    newCtx.setAddPending(); // 将当前节点状态设置成处理中,等待 callHandlerAdded0 执行完成
    executor.execute(new Runnable() {
        @Override
        public void run() {
            callHandlerAdded0(newCtx);
        }
    });
}

        这个方法也是调用的 callHandlerAdded0 方法,只不过因为当前节点绑定的 EventLoop 不是当前执行线程,所以需要通过 EventLoop 创建一个新的任务,由任务来完成 callHandlerAdded0 方法的执行,而是当前线程则直接执行 callHandlerAdded0 方法。继续看 callHandlerAdded0 方法:

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.callHandlerAdded();
    } catch (Throwable t) {
        boolean removed = false;
        try {
            remove0(ctx);
            ctx.callHandlerRemoved();
            removed = true;
        } catch (Throwable t2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }
        }

        if (removed) {
            fireExceptionCaught(new ChannelPipelineException(
                ctx.handler().getClass().getName() +
                ".handlerAdded() has thrown an exception; removed.", t));
        } else {
            fireExceptionCaught(new ChannelPipelineException(
                ctx.handler().getClass().getName() +
                ".handlerAdded() has thrown an exception; also failed to remove.", t));
        }
    }
}

        这里直接调用当前处理器的callHandlerAdded方法,如果异常则将资源移除。

        进到callHandlerAdded方法中:

final void callHandlerAdded() throws Exception {
    if (setAddComplete()) { // CAS 方式将处理器状态修改为 添加完毕 ADD_COMPLETE
        handler().handlerAdded(this);
    }
}

        先通过 CAS 修改处理器状态为添加完成,状态修改成功则调用处理器的添加方法完成处理器添加。

4.Pipeline 中消息的传递与处理

        Pipeline 中的消息都是由每一个处理器来完成处理的,即ChannelInboundHandler和 ChannelOutboundHandler,先看下这两个类的继承体系

先看下ChannelHandler接口

public interface ChannelHandler {

    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}

        当前接口只定义了两个方法:hanndlerAdded 和 handlerRemoved,用于当前处理器被从 Pipeline 中添加或移除时调用的方法,还有一个过时的处理异常的方法:exceptionCaught

  1. ChannelInboundHandler:定义了所有的用于处理回调的方法,都是需要主动触发的方法。
  2. ChannelOutboundHandler:定义的所有的用于主动调用的处理器方法,需要主动调用。
  3. ChannelHandlerAdapter:则是处理器适配器顶级抽象类,用于定义处理器适配器的规范。
  4. ChannelInboundHandlerAdapter/ChannelOutboundHandlerAdapter:默认实现了所有的 handler 的方法,并且每个方法上面都标记了 @Skip 注解,被标记 @Skip 注解的 Handler 方法会被标记,不会被 Handler 执行,这让我们再使用适配器之定义处理器时只要重写我们关心方法即可,重写的方法不会标记 @Skip 。
  5. ChannelHandlerContext:上下文对象,每个 Handler 每个方法都需要传递上下文对象,在Pipeline中处理器的调用就是通过ChannelHandlerContext上下文对象完成执行链的调用,也可以用来保存上下文数据。

4.1.ChannelInboundHandler        

        上面说过pileline是带头节点和尾节点的双向链表, 消息通过ChannelHandlerContext在链表传递(从Head节点开始,tail节点结束)

        当请求到Server并写入数据时,触发 Server 端 Head 节点的 channelRead 方法,此时调用链开始执行,Head 节点执行 channelRead 方法调用 fireChannelRead 触发 next 节点的 channelRead 方法执行,不过这里会先判断 next 节点的方法标记也就是 mask 是否标记当前 Handler 是否需要执行 channelRead 方法,如果重写则调用,否则继续找 next 的 next 节点。

        先看 Head 节点的 channelRead 方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ctx.fireChannelRead(msg);  // 准备调用其下一个节点的channelRead()
}

        进入fireChannelRead()方法:

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

        这里通过 findContextInbound 方法获取下一个节点,传入的参数 MASK_CHANNEL_READ 用来判断当前处理器的 channelRead 方法是否被标记需要执行。

        先进入 findContextInbound()方法:

private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while ((ctx.executionMask & mask) == 0);
    return ctx;
}

        再看下fireChannelReadinvokeChannelRead()方法,触发 Next 节点的 channelRead 的执行:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

        进入invokeChannelRead()方法:

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

        invokeHandler() 方法判断当前处理器状态是否完成,然后调用下一个节点的 channelRead 方法。 

        在这里看到调用了下一个ChannelInboundHandler的channelRead()方法。

4.2.ChannelOutboundHandler

         OutboundHandler 的方法是需要我们自己调用,而不像 InboundHandler 处理器的方法是触发调用。

        所以在这种情况下,如果我们只添加 OutboundHandler 处理器的话,当 Client 发起请求也不会触发 OutboundHandler 的方法执行。因此我们加一个 InboundHandler 方法,用来触发 OutboundHandler 的执行。 

        定义一个InboundHandler,WriteInboundHandler

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
    throws Exception {
    System.out.println("WriteInboundHandler  进入 " + msg);
    ctx.channel().write("我是 Server。");
    System.out.println("InboundHWriteInboundHandler andler3 退出 " + msg);
}

        Outbound 的处理流程与 Inbound 的流程是相反的,在 Inbound 中处理是获取 next 节点执行,而在 Outbound 节点中获取的确是 prev 节点的重写方法执行。所以这里的执行顺序会变成跟添加顺序相反的顺序执行。

        这里首先请求进入 Server 触发 Head 节点的 channelRead 方法执行,然后再获取下一个重写了 channelRead 方法的 Inbound 处理器,找到 WirteHandler 处理器,在 WirteHandler 处理器中掉用 Channel 的 writeAndFlush 方法,找到Tail 节点实现方法,调用了 Write 方法,然后获取 Prev 节点中重写了 write 方法的处理器,找到 Outbound3,然后依次调用 Outbound2 的 write 方法和 Outbound1 的 write 方法,执行完成依次返回。

        从 ctx.channel().write 的方法进入,实现类为:DefaultChannelPipeline

@Override
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}

         Channel 中的 Write 方法直接找到 Tail 调用其重写的 Write 方法:

@Override
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}

         跟进去write()方法:

private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        // 查找下一个节点(prev)
        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            if (!safeExecute(executor, task, promise, m)) {
                task.cancel();
            }
        }
    }

        跟进findContextOutbound()方法

private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while ((ctx.executionMask & mask) == 0);
    return ctx;
}

       Outbound 的获取顺序是获取的 prev ,并且判断该 handler 中当前方法是否标记需要执行。

        获取到下一个节点之后在 wirte 方法中通过 next.invokeWrite 方法完成执行器链调用。当然如果我们一个 OutboundHandler 都没有定义的话,findContextOutbound 方法最终会获取到 Head 节点,然后执行 Head 节点 write 方法,因为我们在前面看到 HeadContext 节点同时实现了 Inboundhandler 和 OutboundHandler。

        看一下 Head 节点的write()方法:

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    unsafe.write(msg, promise);
}

        最终在 HeadContext 中完成底层 unsafe 的 write 操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/650112.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

VTK Filter 总结

源对象 成像滤波器 可视化滤波器 可视化滤波器&#xff08;输入类型vtkDataSet&#xff09;。 可视化滤波器&#xff08;输入类型vtkPointSet) 可视化滤波器&#xff08;输入类型vtkPolyData) 可视化滤波器&#xff08;(输入类型vtkStructuredGrid)。 可视化滤波器&#xff08;…

浅析视频监控技术及AI发展趋势下的智能化视频技术应用

视频监控技术是指通过摄像机对指定区域进行实时视频直播、录制、传输、存储、管理和分析的技术系统。它可以用于监控各种场所&#xff0c;如校园、工厂、工地、工作场所、公共区域、交通工具等。视频监控技术主要涉及到以下几个部分&#xff1a; 1、摄像机 摄像机是视频监控技…

三年软件测试外包的我也没能转正

外包的群体庞大&#xff0c;很多企业为了节约高昂的人力成本&#xff0c;会把一些非核心业务承包给外包公司&#xff0c;这些工作往往是阶段性、辅助性&#xff0c;没有什么技术含量&#xff0c;而且由于外包人员不是与大厂签订劳动合同&#xff0c;因此&#xff0c;他们更像是…

图像点运算之灰度变换之非线性变换

目录 note code test note 图像点运算之灰度变换之非线性变换 例如&#xff1a;y 10 * x ^ 0.5 code void noline_convert_fun(uchar& in, uchar& out) {out 10 * (uchar)pow((float)in, 0.5); } void img_nonline_convert(Mat& src, Mat& res) {if (s…

html好看的登录界面2(十四种风格登录源码)

文章目录 1.登录风格效果说明1.1 凹显风登录界面1.2 大气简洁风登录界面1.3 弹出背景风登录界面1.4 动态左右切换风登陆界面1.5 简洁背景切换登录界面1.6 可关闭登录界面1.7 蒙蒙山雨风登录界面1.8 苹果弹框风登录界面1.9 上中下青春风登录界面1.10 夏日风登录界面1.11 星光熠熠…

【从零开始玩量化20】BigQuant平台策略代码本地化(与Github同步)

引言 最近发现了个不错的量化平台&#xff0c;BigQuant BigQuant的客服找到我&#xff0c;推荐他们平台给我使用&#xff0c;宣传的是人工智能&#xff0c;里面可以使用类似ChatGPT的聊天机器人&#xff0c;和可视化拖拉拽功能实现策略。 不过&#xff0c;这些都是锦上添花的…

单体 V/s 分布式架构

这是软件架构模式博客系列第 2 章,我们将讨论单体 V/s 分布式架构。 在软件领域,存在多种架构风格可供选择,我们需要关注不同架构风格带来的风险。选择符合业务需求的架构风格是一个长期迭代的过程。 架构风格可以分为两大主要类型:单体架构(将所有代码部署在一个单元中…

Rancher:外部服务连接K8S-MongoDB服务

Rancher&#xff1a;外部服务请求K8S-MongoDB服务 一、前置条件二、「Layer 4 」与「Layer 7」Load Balancing的区别三、部署容器化MongoDB四、Load Banlancer of Service五、mongoDB验证连接六、总结 #参考链接 [1] How access MongoDB in Kubernetes from outside the clust…

树莓派4B多串口配置

0. 实验准备以及原理 0.1 实验准备 安装树莓派官方系统的树莓派 4B&#xff0c;有 python 环境&#xff0c;安装了 serial 库 杜邦线若干 屏幕或者可以使用 VNC 进入到树莓派的图形界面 0.2 原理 树莓派 4B 有 UART0&#xff08;PL011&#xff09;、UART1&#xff08;mini …

腾讯安全周斌:用模型对抗,构建新一代业务风控免疫力

6月13日&#xff0c;腾讯安全联合IDC发布“数字安全免疫力”模型框架&#xff0c;主张将守护企业数据和数字业务两大资产作为企业安全建设的核心目标。腾讯安全副总裁周斌出席研讨论坛并发表主题演讲&#xff0c;他表示&#xff0c;在新技术的趋势影响下&#xff0c;黑灰产的攻…

TS系列之any与unknown详解,示例

文章目录 前言一、一个示例二、示例目的1、功能描述2、主要区别3、代码实现 总结 前言 本片文章主要是在写ts时遇到不知道类型&#xff0c;很容易就想到用any可以解决一切&#xff0c;但这样写并不好。所以今天就总结学习一下&#xff0c;比较好的处理任意类型的unknown。 一、…

patroni+etcd+antdb高可用

patronietcdantdb高可用架构图 Patroni组件功能 自动创建并管理主备流复制集群&#xff0c;并且通过api接口往dcs(Distributed Configuration Store&#xff0c;通常指etcd、zookeeper、consul等基于Raft协议的键值存储)读取以及更新键值来维护集群的状态。键值包括集群状态、…

MySQL ibdata1 文件“减肥”记

夏天来了&#xff0c;没想到连 ibdata1 文件也要开始“减肥”了~ 作者&#xff1a;杨彩琳 爱可生华东交付部 DBA&#xff0c;主要负责 MySQL 日常问题处理及 DMP 产品支持。爱好跳舞&#xff0c;追剧。 本文来源&#xff1a;原创投稿 有句话是这么说的&#xff1a;“在 InnoDB…

深入分析 Java IO (一)概述

目录 一、前言 二、基于字节操作的接口 2.1、字节输入流 2.2、字节输出流 三、基于字符操作的接口 3.1、字符输入流 3.2、字符输出流 四、字节与字符的转化 4.1、输入流转化过程 4.2、输出流转化过程 五、基于磁盘操作的接口 六、基于网络操作的接口 6.1、Socket简…

接口自动化测试框架?你真的会封装吗?自动化框架几大功能专项...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 当准备使用一个接…

项目经理如何做好时间管理?

1、建立时间管理原则 &#xff08;1&#xff09;我们需要通过时间日志的方式对时间进行记录和分析&#xff0c;并对日常要处理的事务进行优先级排序&#xff0c;优先处理最重要的事物&#xff1b; &#xff08;2&#xff09;确定待处理事物的机会成本&#xff0c;提高时间使用…

建模助手618 | 谁不囤点Revit插件我都会生气!

大家好&#xff0c;这里是建模助手。 早在5月份&#xff0c;我们已经就“618”这个事情高调了一番&#xff0c;以提前放“价”的姿势&#xff0c;让许多用户以躺赢的状态拉开了年中大促的序幕。&#xff08;5月购买的盆友&#xff0c;切记看完全文&#xff0c;内附彩蛋 活动反…

Canal实现0侵入同步缓存数据

开启MySQL binlog功能 cd /home/mysql8/conf vim my.cnf [mysqld] log-bin/var/lib/mysql/mysql-bin # 开启 binlog binlog-formatROW # 选择 ROW 模式 server_id1 # 配置 MySQL replaction 需要定义&#xff0c;不要和 canal 的 slaveId 重复 binlog-do-dbimooc-hire-dev # …

冰冰学习笔记:简单了解protobuf

欢迎各位大佬光临本文章&#xff01;&#xff01;&#xff01; 还请各位大佬提出宝贵的意见&#xff0c;如发现文章错误请联系冰冰&#xff0c;冰冰一定会虚心接受&#xff0c;及时改正。 本系列文章为冰冰学习编程的学习笔记&#xff0c;如果对您也有帮助&#xff0c;还请各位…

Fiddler(Statistics、Inspectors)详解

一、Fiddler Statistics详解 Fiddler的 Statistics 分页会统计请求和响应的一些信息。可以使用它完成简单的性能测试&#xff0c;查看其接口的响应时间。 如果你想学习Fiddler抓包工具&#xff0c;我这边给你推荐一套视频&#xff0c;这个视频可以说是B站播放全网第一的Fiddle…