16.Netty源码之ChannelPipeline

news2025/1/14 4:17:30

highlight: arduino-light

服务编排层:ChannelPipeline协调ChannelHandlerHandler

EventLoop可以说是 Netty 的调度中心,负责监听多种事件类型:I/O 事件、信号事件、定时事件等,然而实际的业务处理逻辑则是由 ChannelPipeline 中所定义的 ChannelHandler 完成的,ChannelPipeline 和 ChannelHandler应用开发的过程中打交道最多的组件。

Netty 服务编排层的核心组件 ChannelPipeline 和 ChannelHandler 为用户提供了 I/O 事件的全部控制权。

在学习之前,先思考几个问题。

  • ChannelPipeline 与 ChannelHandler 的关系是什么?它们之间是如何协同工作的?
  • ChannelHandler 的类型有哪些?有什么区别?
  • Netty 中 I/O 事件是如何传播的?

ChannelPipeline 概述

Pipeline 的字面意思是管道、流水线。它在 Netty 中起到的作用,和一个工厂的流水线类似。

原始的网络字节流经过 Pipeline ,被一步步加工包装,最后得到加工后的成品。

经过前面课程核心组件的初步学习,我们已经对 ChannelPipeline 有了初步的印象:它是 Netty 的核心处理链,用以实现网络事件的动态编排和有序传播。

今天我们将从以下几个方面一起探讨 ChannelPipeline 的实现原理:

  • ChannelPipeline 内部结构;
  • ChannelHandler 接口设计;
  • ChannelPipeline 事件传播机制;
  • ChannelPipeline 异常传播机制。

ChannelPipeline 内部结构

ChannelPipeline 可以看作是 ChannelHandler 的容器载体,它是由一组 ChannelHandler 实例组成的,内部通过双向链表将不同的 ChannelHandler 链接在一起,如下图所示。

当有 I/O 读写事件触发时,ChannelPipeline 会依次调用 ChannelHandler 列表对 Channel 的数据进行拦截和处理。

image.png

每个 ChannelHandler 都对应一个 ChannelHandlerContext。

所以实际上 ChannelPipeline 维护的是它与 ChannelHandlerContext 的关系。

那么你可能会有疑问,为什么这里会多一层 ChannelHandlerContext 的封装呢?

其实这是一种比较常用的编程思想。ChannelHandlerContext用于保存ChannelHandler。

ChannelHandlerContext包含了ChannelHandler生命周期的所有事件,如 connect、bind、read、 flush、write、close 等。

可以试想一下,如果没有ChannelHandlerContext 的这层封装,那么我们在做 ChannelHandler 之间传递的时 候。前置后置的通用逻辑就要在每个 ChannelHandler 里都实现一份。

这样虽然能解决问题,但是代码结构的耦合,会非常不优雅。

根据网络数据的流向,ChannelPipeline 分为入站 ChannelInboundHandler和出站 ChannelOutboundHandler。

服务端接收到客户端数据需要先经过 Decoder 入站处理后,再通过 Encoder 出站通知客户端。

image.png

ChannelPipeline是双向链表的构造

ChannelPipeline 的双向链表分别维护了HeadContext 和 TailContext 的头尾节点。

我们自定义的ChannelHandler会插入到 Head 和 Tail 之间,这两个节点在 Netty 中已经默认实现了,它们在

ChannelPipeline 中起到了至关重要的作用。

首先我们看下 HeadContext 和 TailContext 的继承关系,如下图所示。

image.png

HeadContext:in\&out

对于1个请求先由HeadContext处理入栈,经过一系列的入栈处理器然后传递到TailContext,由TailContext往下传递经过一系列的出栈处理器,最后再经过HeadContext返回。 ```md

<---6 <---5 <---4 HeadContext InBoundHandlerOne TailContext OutBoundHandlerOne 1---> 2---> 3--->

顺序是:12346 其中5是入栈已经在2处理过 所以不需要处理。 ```

Inbound第一站

Outbound最后一站

HeadContext 既是 Inbound 处理器,也是 Outbound 处理器。 它分别实现了 ChannelInboundHandler 和 ChannelOutboundHandler。

网络数据写入操作的入口就是由 HeadContext 节点完成的。HeadContext 作为 Pipeline 的头结点负责读取数据并开始传递 InBound 事件,当数据处理完成后,数据会反方向经过 Outbound 处理器,最终传递到 HeadContext,所以 HeadContext 既是数据读取Inbound事件的第一站又是处理 Outbound 事件的最后一站。

此外 HeadContext 在传递事件之前,还会执行一些前置操作。

TailContext

Outbound第一站

TailContext 只实现了 ChannelInboundHandler 接口。它会在 ChannelInboundHandler 调用链路的最后一步执行,主要用于终止 Inbound 事件传播,例如释放 Message 数据资源等。

TailContext 节点作为 OutBound 事件传播的第一站,仅仅是将 OutBound 事件传递给下一个节点。

从整个 ChannelPipeline 调用链路来看,如果由 Channel 直接触发事件传播,那么调用链路将贯穿整个 ChannelPipeline。

然而也可以在其中某一个 ChannelHandlerContext 触发同样的方法,这样只会从当前的 ChannelHandler 开始执行事件传播,该过程不会从头贯穿到尾,在一定场景下,可以提高程序性能。

ChannelHandler 接口设计

在学习 ChannelPipeline 事件传播机制之前,我们需要了解 I/O 事件的生命周期。

整个 ChannelHandler 是围绕 I/O 事件的生命周期所设计的,例如建立连接、读数据、写数据、连接销毁等。

ChannelHandler 有两个重要的子接口ChannelInboundHandlerChannelOutboundHandler,分别拦截

站和出站的各种 I/O 事件

1. ChannelInboundHandler 的事件回调方法与触发时机。

| 事件回调方法 | 触发时机 | | ------------------------- | --------------------------------- | | channelRegistered | Channel 被注册到 EventLoop | | channelUnregistered | Channel 从 EventLoop 中取消注册 | | channelActive | Channel 处于就绪状态,可以被读写 | | channelInactive | Channel 处于非就绪状态Channel 可以从远端读取到数据 | | channelRead | Channel 可以从远端读取到数据 | | channelReadComplete | Channel 读取数据完成 | | userEventTriggered | 用户事件触发时 | | channelWritabilityChanged | Channel 的写状态发生变化 | | handlerAdded | 当该处理器被添加到pipeline时 |

2. ChannelOutboundHandler 的事件回调方法与触发时机。

ChannelOutboundHandler 的事件回调方法非常清晰,直接通过 ChannelOutboundHandler 的接口列表可以看到每种操作所对应的回调方法,如下图所示。

这里每个回调方法都是在相应操作执行之前触发,在此就不多做赘述了。

此外 ChannelOutboundHandler 中绝大部分接口都包含ChannelPromise 参数,以便于在操作完成时能够及时获得通知。

image.png

事件

事件枚举

public static final int OP_READ = 1 << 0;//1
public static final int OP_WRITE = 1 << 2;//4
public static final int OP_CONNECT = 1 << 3;//8
public static final int OP_ACCEPT = 1 << 4;//16

事件传播机制

在上文中我们介绍了 ChannelPipeline 可分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器,与此对应传输的事件类型可以分为Inbound 事件Outbound 事件

我们通过一个代码示例,一起体验下 ChannelPipeline 的事件传播机制。 ```java package io.netty.example.pipeline;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {

    public static void main(String[] args) throws Exception {

        EventLoopGroup workerGroup = new NioEventLoopGroup();
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
                    //通过反射创建反射工厂类根据无参构造函数 反射生成实例
                    //将NioServerSocketChannel绑定到了bossGroup
                    //NioServerSocketChannel接收到请求会创建SocketChannel放入workerGroup
                    .channel(NioServerSocketChannel.class)

                    //指的是SocketChannel
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //指的是SocketChannel
                    .childOption(NioChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    //默認不使用堆外内存
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    //false 不使用堆外内存
                    .childOption(ChannelOption.ALLOCATOR, 
                                    new UnpooledByteBufAllocator(false))
                    //   .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            //正常情况
                            p.addLast("SampleInBoundHandlerA", new SampleInBoundHandler("SampleInBoundHandlerA", false));
                            p.addLast("SampleInBoundHandlerB", new SampleInBoundHandler("SampleInBoundHandlerB", false));
                            p.addLast("SampleInBoundHandlerC", new SampleInBoundHandler("SampleInBoundHandlerC", true));




                            p.addLast("SampleOutBoundHandlerA", new SampleOutBoundHandler("SampleOutBoundHandlerA"));
                            p.addLast("SampleOutBoundHandlerB", new SampleOutBoundHandler("SampleOutBoundHandlerB"));
                            p.addLast("SampleOutBoundHandlerC", new SampleOutBoundHandler("SampleOutBoundHandlerC"));
                        }
                    });

            ChannelFuture f = b.bind(8090).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
package io.netty.example.pipeline;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;


public final class EchoClient {

    public static void main(String[] args) throws Exception {
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect("127.0.0.1", 8090).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}
package io.netty.example.pipeline;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.TimeUnit;

/**
 * Handler implementation for the echo client.  It initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMessage;

    /**
     * Creates a client-side handler.
     */
    public EchoClientHandler() {
        firstMessage = Unpooled.wrappedBuffer("I am echo message".getBytes());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("客户端发送消息" + firstMessage.toString());
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
       // ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws InterruptedException {
        TimeUnit.SECONDS.sleep(3);
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
       // cause.printStackTrace();
        ctx.close();
    }
}
package io.netty.example.pipeline;

import io.netty.channel.*;

public class SampleInBoundHandler extends ChannelInboundHandlerAdapter {
  private final String name;
  private final boolean flush;

  public SampleInBoundHandler(String name, boolean flush) {
    this.name = name;
    this.flush = flush;
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      //打印消息
      System.out.println("InBoundHandler: " + name);
    //只有是true的时候才会写消息
    //否则只会读消息
    if (flush) {
      ctx.channel().writeAndFlush(msg);
    } else {
      super.channelRead(ctx, msg);
    }
  }
}
package io.netty.example.pipeline;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class SampleOutBoundHandler extends ChannelOutboundHandlerAdapter {
  private final String name;
  public SampleOutBoundHandler(String name) {
    this.name = name;
  }

  @Override
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    System.out.println("OutBoundHandler: " + name);
    super.write(ctx, msg, promise);
  }
}

```

通过 Pipeline 的 addLast 方法分别添加了三个 InboundHandler 和 OutboundHandler,添加顺序都是 A -> B -> C,下图可以表示初始化后 ChannelPipeline 的内部结构。

image.png

*当客户端向服务端发送请求时,会触发 SampleInBoundHandler 调用链的 channelRead 事件。经过 SampleInBoundHandler 调用链处理完成后,在 SampleInBoundHandlerC 中会调用 writeAndFlush 方法向客户端写回数据,此时会触发 SampleOutBoundHandler 调用链的 write 事件。** 最后我们看下代码示例的控制台输出:

image (3).png

方向:IN先进先出OUT:先进后出

由此可见,Inbound 事件和 Outbound 事件的传播方向是不一样的。Inbound 事件的传播方向为 Head -> Tail,而 Outbound 事件传播方向是 Tail -> Head,两者恰恰相反。 在 Netty 应用编程中一定要理清楚事件传播的顺序。推荐你在系统设计时模拟客户端和服务端的场景画出 ChannelPipeline 的内部结构图,以避免搞混调用关系。

异常传播机制

ChannelPipeline 事件传播的实现采用了经典的责任链模式,调用链路环环相扣。那么如果有一个节点处理逻辑异常会出现什么现象呢?我们通过修改 SampleInBoundHandler 的实现来模拟业务逻辑异常: ```java package io.netty.example.pipeline;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {

    public static void main(String[] args) throws Exception {

        EventLoopGroup workerGroup = new NioEventLoopGroup();
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
                    //通过反射创建反射工厂类根据无参构造函数 反射生成实例
                    //将NioServerSocketChannel绑定到了bossGroup
                    //NioServerSocketChannel接收到请求会创建SocketChannel放入workerGroup
                    .channel(NioServerSocketChannel.class)

                    //指的是SocketChannel
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //指的是SocketChannel
                    .childOption(NioChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    //默認不使用堆外内存
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    //false 不使用堆外内存
                    .childOption(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false))
                    //   .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            //正常情况
                          /*
                            p.addLast("SampleInBoundHandlerA", new SampleInBoundHandler("SampleInBoundHandlerA", false));
                            p.addLast("SampleInBoundHandlerB", new SampleInBoundHandler("SampleInBoundHandlerB", false));
                            p.addLast("SampleInBoundHandlerC", new SampleInBoundHandler("SampleInBoundHandlerC", true));
                            */

                            p.addLast("SampleInBoundHandlerA", new SampleExceptionInBoundHandler("SampleInBoundHandlerA", false));
                            p.addLast("SampleInBoundHandlerB", new SampleExceptionInBoundHandler("SampleInBoundHandlerB", false));
                            p.addLast("SampleInBoundHandlerC", new SampleExceptionInBoundHandler("SampleInBoundHandlerC", true));

                            p.addLast("SampleOutBoundHandlerA", new SampleOutBoundHandler("SampleOutBoundHandlerA"));
                            p.addLast("SampleOutBoundHandlerB", new SampleOutBoundHandler("SampleOutBoundHandlerB"));
                            p.addLast("SampleOutBoundHandlerC", new SampleOutBoundHandler("SampleOutBoundHandlerC"));
                        }
                    });

            ChannelFuture f = b.bind(8090).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
package io.netty.example.pipeline;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SampleExceptionInBoundHandler extends ChannelInboundHandlerAdapter {
    private final String name;
    private final boolean flush;
    public SampleExceptionInBoundHandler(String name, boolean flush) {
        this.name = name;
        this.flush = flush;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("InBoundHandler: " + name);
        if (flush) {
            ctx.channel().writeAndFlush(msg);
        } else {
            throw new RuntimeException("InBoundHandler: " + name);
        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("InBoundHandlerException: " + name);
        ctx.fireExceptionCaught(cause);
    }
}

``` 在SampleExceptionInBoundHandler的 channelRead 事件处理中,第一个 A 节点就会抛出 RuntimeException。同时我们重写了 ChannelInboundHandlerAdapter 中的 exceptionCaught 方法,只是在开头加上了控制台输出,方便观察异常传播的行为。下面看一下代码运行的控制台输出结果:

image (4).png

由输出结果可以看出 ctx.fireExceptionCaugh 会将异常按顺序从 Head 节点传播到 Tail 节点

如果用户没有对异常进行拦截处理,最后将由 Tail 节点统一处理,在 TailContext 源码中可以找到具体实现:

protected void onUnhandledInboundException(Throwable cause) {

    try {
    logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " + "It usually means the last handler in the pipeline did not handle the exception.",cause);
    } finally {
        ReferenceCountUtil.release(cause);
    }
}

虽然 Netty 中 TailContext 提供了兜底的异常处理逻辑,但是在很多场景下,并不能满足我们的需求。假如你需要拦截指定的异常类型,并做出相应的异常处理,应该如何实现呢?我们接着往下看。

异常处理的最佳实践

在 Netty 应用开发的过程中,良好的异常处理机制会让排查问题的过程事半功倍。所以推荐用户对异常进行统一拦截,然后根据实际业务场景实现更加完善的异常处理机制。

通过异常传播机制的学习,我们应该可以想到最好的方法是在 ChannelPipeline 自定义处理器的末端添加统一的异常处理器,此时 ChannelPipeline 的内部结构如下图所示。

image.png

用户自定义的异常处理器代码示例如下: ```java package io.netty.example.pipeline;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {

    public static void main(String[] args) throws Exception {

        EventLoopGroup workerGroup = new NioEventLoopGroup();
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
                    //通过反射创建反射工厂类根据无参构造函数 反射生成实例
                    //将NioServerSocketChannel绑定到了bossGroup
                    //NioServerSocketChannel接收到请求会创建SocketChannel放入workerGroup
                    .channel(NioServerSocketChannel.class)

                    //指的是SocketChannel
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //指的是SocketChannel
                    .childOption(NioChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    //默認不使用堆外内存
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    //false 不使用堆外内存
                    .childOption(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false))
                    //   .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            //正常情况
                          /*
                            p.addLast("SampleInBoundHandlerA", new SampleInBoundHandler("SampleInBoundHandlerA", false));
                            p.addLast("SampleInBoundHandlerB", new SampleInBoundHandler("SampleInBoundHandlerB", false));
                            p.addLast("SampleInBoundHandlerC", new SampleInBoundHandler("SampleInBoundHandlerC", true));
                            */

                            p.addLast("SampleInBoundHandlerA", new SampleExceptionInBoundHandler("SampleInBoundHandlerA", false));
                            p.addLast("SampleInBoundHandlerB", new SampleExceptionInBoundHandler("SampleInBoundHandlerB", false));
                            p.addLast("SampleInBoundHandlerC", new SampleExceptionInBoundHandler("SampleInBoundHandlerC", true));

                            //添加异常处理器
                            p.addLast(new ExceptionHandler());

                            p.addLast("SampleOutBoundHandlerA", new SampleOutBoundHandler("SampleOutBoundHandlerA"));
                            p.addLast("SampleOutBoundHandlerB", new SampleOutBoundHandler("SampleOutBoundHandlerB"));
                            p.addLast("SampleOutBoundHandlerC", new SampleOutBoundHandler("SampleOutBoundHandlerC"));
                        }
                    });

            ChannelFuture f = b.bind(8090).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
//进站出站异常处理器
public class ExceptionHandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (cause instanceof RuntimeException) {
            System.out.println("Handle Business Exception Success.");
        }
    }
}

``` 加入统一的异常处理器后,可以看到异常已经被优雅地拦截并处理掉了。这也是 Netty 推荐的最佳异常处理实践。

image (5).png

总结

本节课我们深入分析了 Pipeline 的设计原理与事件传播机制。

那么前面的几个问题你是否已经都找到答案,来做个简单的总结:

  • ChannelPipeline 是双向链表结构,包含 ChannelInboundHandler 和 ChannelOutboundHandler 两种处理器。
  • ChannelHandlerContext 是对 ChannelHandler 的封装,每个 ChannelHandler 都对应一个 ChannelHandlerContext,实际上 ChannelPipeline 维护的是与 ChannelHandlerContext 的关系。
  • Inbound 事件和 Outbound 事件的传播方向相反,Inbound 事件的传播方向为 Head -> Tail,而 Outbound 事件传播方向是 Tail -> Head。
  • 异常事件的处理顺序与 ChannelHandler 的添加顺序相同,会依次向后传播,与 Inbound 事件和 Outbound 事件无关。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/805137.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《Elasticsearch 源码解析与优化实战》第5章:选主流程

《Elasticsearch 源码解析与优化实战》第5章&#xff1a;选主流程 - 墨天轮 一、简介 Discovery 模块负责发现集群中的节点&#xff0c;以及选择主节点。ES 支持多种不同 Discovery 类型选择&#xff0c;内置的实现称为Zen Discovery ,其他的包括公有云平台亚马逊的EC2、谷歌…

C++之普通函数指针/类成员函数指针/lambda回调函数总结(一百六十八)

简介&#xff1a; CSDN博客专家&#xff0c;专注Android/Linux系统&#xff0c;分享多mic语音方案、音视频、编解码等技术&#xff0c;与大家一起成长&#xff01; 优质专栏&#xff1a;Audio工程师进阶系列【原创干货持续更新中……】&#x1f680; 人生格言&#xff1a; 人生…

边写代码边学习之卷积神经网络CNN

1. 卷积神经网络CNN 卷积神经网络&#xff08;Convolutional Neural Network&#xff0c;CNN&#xff09;是一种深度学习神经网络的架构&#xff0c;主要用于图像识别、图像分类和计算机视觉等任务。它是由多层神经元组成的神经网络&#xff0c;其中包含卷积层、池化层和全连接…

算法与数据结构-二分查找

文章目录 什么是二分查找二分查找的时间复杂度二分查找的代码实现简单实现&#xff1a;不重复有序数组查找目标值变体实现&#xff1a;查找第一个值等于给定值的元素变体实现&#xff1a;查找最后一个值等于给定值的元素变体实现&#xff1a;查找最后一个小于给定值的元素变体实…

【雕爷学编程】MicroPython动手做(10)——零基础学MaixPy之神经网络KPU2

KPU的基础架构 让我们回顾下经典神经网络的基础运算操作&#xff1a; 卷积&#xff08;Convolution&#xff09;:1x1卷积&#xff0c;3x3卷积&#xff0c;5x5及更高的卷积 批归一化&#xff08;Batch Normalization&#xff09; 激活&#xff08;Activate&#xff09; 池化&…

玩一玩编程式 AOP

[toc] 平时我们项目中涉及到 AOP&#xff0c;基本上就是声明式配置一下就行了&#xff0c;无论是基于 XML 的配置还是基于 Java 代码的配置&#xff0c;都是简单配置即可使用。声明式配置有一个好处就是对源代码的侵入小甚至是零侵入。不过今天松哥要和小伙伴们聊一聊编程式的 …

Chapter 9: Lists | Python for Everybody 讲义笔记_En

文章目录 Python for Everybody课程简介ListsA list is a sequenceLists are mutableTraversing a listList operationsList slicesList methodsDeleting elementsLists and functionsLists and stringsParsing linesObjects and valuesAliasingList argumentsDebuggingGlossar…

【Spring】Spring 下载及其 jar 包

根据 【动力节点】最新Spring框架教程&#xff0c;全网首套Spring6教程&#xff0c;跟老杜从零学spring入门到高级 以及老杜的原版笔记 https://www.yuque.com/docs/share/866abad4-7106-45e7-afcd-245a733b073f?# 《Spring6》 进行整理&#xff0c; 文档密码&#xff1a;mg9b…

数字签名与数字证书

数字签名与数字证书 数字签名数字证书数字证书的原理数字证书的特点 如何验证证书机构的公钥不是伪造的 数字签名 数字签名是非对称密钥加密技术与数字摘要技术的应用&#xff0c;数字签名就是用加密算法加密报文文本的摘要&#xff08;摘要通过hash函数得到&#xff09;而生成…

「回溯框架」

文章目录 0 回溯和动态规划&#xff08;dp&#xff09;的区别0.1 框架 1 刷题1.1 全排列1.1.1 题解1.1.2 Code1.1.3 结果 1.2 N皇后1.2.1 题解1.2.2 Code1.2.3 结果 0 回溯和动态规划&#xff08;dp&#xff09;的区别 动态规划的核心是穷举&#xff0c;那么回溯算法和dp有什么…

单机最快的队列Disruptor解析和使用

前言 介绍高性能队列Disruptor原理以及使用例子。 Disruptor是什么? Disruptor是外汇和加密货币交易所运营商 LMAX group 建立高性能的金融交易所的结果。用于解决生产者、消费者及其数据存储的设计问题的高性能队列实现。可以对标JDK中的ArrayBlockingQueue。是目前单机且…

IDC报告背后:大模型时代,重新理解AI公有云

大模型之于AI公有云的意义&#xff0c;在于大模型可以改变过去“手工作坊定制算法”的高成本模式&#xff0c;转向“工厂模式”&#xff0c;只需要微调和精调&#xff0c;就可以形成针对性的场景算法。 作者|葛覃 出品|产业家 一年前&#xff0c;依然有不少云计算从业者思…

基于智能状态和源代码插桩的 C 程序内存安全性动态分析

原文来自微信公众号“编程语言Lab”&#xff1a;基于智能状态和源代码插桩的 C 程序内存安全性动态分析 搜索关注“编程语言Lab”公众号&#xff08;HW-PLLab&#xff09;获取更多技术内容&#xff01; 欢迎加入 编程语言社区 SIG-程序分析 参与交流讨论&#xff08;加入方式&a…

警惕!通过谷歌和必应搜索广告传播的新型恶意活动

据观察&#xff0c;一种新的恶意广告活动利用谷歌搜索和必应的广告&#xff0c;以AnyDesk、Cisco AnyConnect VPN和WinSCP等IT工具的用户为目标&#xff0c;诱骗他们下载木马安装程序&#xff0c;目的是入侵企业网络&#xff0c;并可能在未来实施勒索软件攻击。 Sophos在周三的…

Python生成pyc以及pyd文件的方法

文章目录 0. 背景1. pyc文件的生成2. pyd文件的生成3. 两者的异同 0. 背景 当有些模块的代码需要一定的保密性&#xff0c;这个时候就需要考虑pyc和pyd文件了。今天就好好琢磨一下这两种文件的生成和使用方法。让自己的知识能够朝着商业化的方向再前进一步。 1. pyc文件的生成…

为企业发展赋能增效:中国智能交通协会来访闪马智能

7月26日&#xff0c;中国智能交通协会秘书长杨颖一行来访闪马智能&#xff0c;闪马智能助理总裁兼营销与方案中心总经理黄智宏、CMO王一佳、副总裁詹诚以及副总裁兼智慧城市创新院院长邵钦豪等出席了交流会。 上海电科智能系统股份有限公司、卡斯柯信号有限公司、上海澳星照明电…

告别胆怯,大步向前,迎接新挑战!

告别胆怯&#xff0c;大步向前&#xff0c;迎接新挑战&#xff01; “赤日炎炎似火烧&#xff0c;野田禾稻半枯焦。农夫心内如汤煮。公子王孙把扇摇。”读罢《水浒传》中的这一首七绝诗&#xff0c;受其感染&#xff0c;笔者也乘兴呤顺口溜四句抒怀&#xff1a;“烈日炎炎似火…

FlatBuffers 使用编译器

1、前言 可能刚接触的人会思考为啥要使用编译器&#xff1a; 一般跨平台、跨语言的都有一套固定的流程&#xff0c;大致可分为&#xff1a; 撰写IDL文件 -> 使用对应语言的编译器&#xff0c;编译成对应的语言 -> 序列化 ->持久化 -> 反序列化 这里就对应着这个…

Spring中IOC容器常用的接口和具体的实现类

在Spring框架没有出现之前&#xff0c;在Java语言中&#xff0c;程序员们创建对象一般都是通过关键字new来完成&#xff0c;那时流行一句话“万物即可new&#xff0c;包括女朋友”。但是这种创建对象的方式维护成本很高&#xff0c;而且对于类之间的相互关联关系很不友好。鉴于…

三言两语说透关于 MySQL2 和 MySQL 的区别

MySQL是最流行的开源关系型数据库管理系统,拥有大量的使用者和广泛的应用场景。而MySQL2是MySQL官方团队推出的新一代MySQL驱动&#xff0c;用于取代老版的MySQL模块&#xff0c;提供更好的性能和更丰富的功能。本文将介绍MySQL2相较于MySQL有哪些优势以及具体的技术区别。 My…