Netty-ChannelPipeline

news2025/1/31 19:45:56

EventLoop可以说是 Netty 的调度中心,负责监听多种事件类型:I/O 事件、信号事件、定时事件等,然而实际的业务处理逻辑则是由 ChannelPipeline 中所定义的 ChannelHandler 完成的,ChannelPipeline 和 ChannelHandler应用开发的过程中打交道最多的组件,为用户提供了 I/O 事件的全部控制权。

文章目录

  • 一、ChannelPipeline 是什么?🤔️
  • 二、ChannelPipeline 的内部结构🔍
    • 1、HeadContext
    • 2、TailContext
    • 3、addLiast() 方法🔍
  • 三、ChannelPipeline 事件传播机制
  • 四、ChannelPipeline 异常传播机制
  • 五、统一的异常处理器

一、ChannelPipeline 是什么?🤔️

pipeline 有管道,流水线的意思,最早使用在 Unix 操作系统中,可以让不同功能的程序相互通讯,使软件更加”高内聚,低耦合”,它以一种”链式模型”来串起不同的程序或组件,使它们组成一条直线的工作流。

ChannelPipeline 也是 Netty 中的一个比较重要的组件,从上面的 Channel 实例化过程可以看出,每一个 Channel 实例中都会包含一个对应的 ChannelPipeline 属性。ChannelPipeline维护着处理或拦截channel的进站事件和出站事件的双向链表,事件在ChannelPipeline中流动和传递,可以增加或删除ChannelHandler来实现对不同业务逻辑的处理。通俗的说,ChannelPipeline是工厂里的流水线,ChannelHandler是流水线上的工人。

二、ChannelPipeline 的内部结构🔍

final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

从 ChannelPipeline 的构造函数可以看出,ChannelPipeline 维护了一组 ChannelHandlerContext 实例组成双向链表。默认会包含 head 和 tail 头尾节点,用来进行一些默认的逻辑处理。我们自定义的ChannelHandler会插入到 head 和 tail 之间,这两个节点在 Netty 中已经默认实现了,它们在ChannelPipeline 中起到了至关重要的作用。

那么你可能会有疑问,为什么这里会多一层 ChannelHandlerContext 的封装呢?

其实这是一种比较常用的编程思想。ChannelHandlerContext用于保存ChannelHandler。ChannelHandlerContext包含了ChannelHandler生命周期的所有事件,如 connect、bind、read、 flush、write、close 等。

可以试想一下,如果没有ChannelHandlerContext 的这层封装,那么我们在做 ChannelHandler 之间传递的时候。前置后置的通用逻辑就要在每个 ChannelHandler 里都实现一份。

首先我们看下 HeadContext 和 TailContext 的继承关系
在这里插入图片描述

1、HeadContext

通过集成关系我们发现 HeadContext 分别实现了ChannelInboundHandler 和 ChannelOutboundHandler,即 HeadContext 既是 入站处理器,也是出站处理器。

HeadContext是入站第一站出站最后一站。对于1个请求先由HeadContext处理入栈,经过一系列的入栈处理器然后传递到TailContext,由TailContext往下传递经过一系列的出栈处理器,最后再经过HeadContext返回。

2、TailContext

TailContext 只实现了 ChannelInboundHandler 接口。它会在 ChannelInboundHandler 调用链路的最后一步执行,主要用于终止 入站事件传播,例如释放 Message 数据资源等。

TailContext是入站最后一站出站第一站。TailContext节点作为出站事件传播的第一站,仅仅是将出站事件传递给下一个节点。

从整个 ChannelPipeline 调用链路来看,如果由 Channel 直接触发事件传播,那么调用链路将贯穿整个 ChannelPipeline。然而也可以在其中某一个 ChannelHandlerContext 触发同样的方法,这样只会从当前的 ChannelHandler 开始执行事件传播,该过程不会从头贯穿到尾,在一定场景下,可以提高程序性能。

3、addLiast() 方法🔍

addLast() 方法是向 ChannelPipeline 中添加 ChannelHandler 用来进行业务处理,关于ChannelHandler将会在下文中详细讲解!
在这里插入图片描述

三、ChannelPipeline 事件传播机制

入站事件是由I/O线程被动触发,由入站处理器按自下而上的方向处理,在中途可以被拦截丢弃,出站事件由用户handler主动触发,由出站处理器按自上而下的方向处理。
在这里插入图片描述
接下来用一个示例来讲解~

服务端代码,

public class PipelineServer {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup(2);
        new ServerBootstrap()
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(1);
                                super.channelRead(ctx, msg);
                            }
                        });
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(2);
                                super.channelRead(ctx, msg);
                            }
                        });
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(3);
                                super.channelRead(ctx, msg);
                            }
                        });
                        ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(4);
                                super.write(ctx, msg, promise);
                            }
                        });
                        ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(5);
                                super.write(ctx, msg, promise);
                            }
                        });
                        ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(6);
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

客户端代码,

public class PipelineClient {
    public static void main(String[] args) throws InterruptedException {
        new Bootstrap()
                .group( new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(msg);
                                super.channelRead(ctx, msg);
                            }
                        });
                        ch.pipeline().addLast(new StringEncoder());
                    }
                }).connect("127.0.0.1", 8080)
                .sync()
                .channel()
                .writeAndFlush("Hello,server!");
    }
}

依次启动服务端和客户端,服务端打印如下:

1
2
3

以上我们通过 Pipeline 的 addLast 方法分别添加了三个 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter,添加顺序分别是 1 -> 2 -> 3,4 -> 5 -> 6。

此时为什么没有打印 4、5、6呢,即没有触发出站的操作❓

出站处理器只有向channel中写入数据才会触发,我们在第三个 ChannelInboundHandlerAdapter 实现类中加入以下代码!

在这里插入图片描述
通过依次点入,我们发现最终是调用了 tail节点 的writeAndFlush 方法,即TailContext节点作为出站事件传播的第一站!
在这里插入图片描述

最终服务端打印如下:

1
2
3
6
5
4

可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表

在这里插入图片描述

  • 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
    • 如果注释掉 1 处代码,则仅会打印 1
    • 如果注释掉 2 处代码,则仅会打印 1 2
  • 3 处的 ctx.channel().write(msg) 会 从尾部开始触发 后续出站处理器的执行
    • 如果注释掉 3 处代码,则仅会打印 1 2 3
  • 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
    • 如果注释掉 6 处代码,则仅会打印 1 2 3 6
  • ctx.channel().write(msg) vs ctx.write(msg)
    • 都是触发出站处理器的执行
    • ctx.channel().write(msg) 从尾部开始查找出站处理器
    • ctx.write(msg) 是从当前节点找上一个出站处理器
    • 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
    • 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己

如图,服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序

在这里插入图片描述

四、ChannelPipeline 异常传播机制

ChannelPipeline 事件传播的实现采用了经典的责任链模式,调用链路环环相扣。那么如果有一个节点处理逻辑异常会出现什么现象呢?我们通过修改 第二个 ChannelInboundHandlerAdapter 实现类 的实现来模拟业务逻辑异常:
在这里插入图片描述
由输出结果可以看出 ctx.fireExceptionCaugh 会将异常按顺序从 Head 节点传播到 Tail 节点
在这里插入图片描述
如果用户没有对异常进行拦截处理,最后将由 Tail 节点统一处理,在 TailContext 源码中可以找到具体实现:
在这里插入图片描述

五、统一的异常处理器

在 Netty 应用开发的过程中,良好的异常处理机制会让开发在排查问题的时候事半功倍。虽然 Netty 中 TailContext 提供了兜底的异常处理逻辑,但是在很多场景下,并不能满足我们的需求。假如你需要拦截指定的异常类型,并做出相应的异常处理,应该如何实现呢?

小编个人推荐用户对异常进行统一拦截,然后根据实际业务场景实现更加完善的异常处理机制。

通过异常传播机制的学习,我们应该可以想到最好的方法是在 ChannelPipeline 自定义处理器的末端添加统一的异常处理器!

/**
 * 自定义异常处理器
 */
public static class ExceptionHandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof RuntimeException) {
            System.out.println();
            log.error("业务异常处理,异常信息:{}", cause.getMessage());
        }
    }
}

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/964917.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

剑指 Offer 44. 数字序列中某一位的数字(中等)

题目&#xff1a; class Solution { //本题单纯找规律&#xff0c;要注意通过n%digits来判断有几个位数为digits的数 public:int findNthDigit(int n) {long base 9, digits 1; //digits代表位数while(n-base*digits>0){ //该循环是为了确定目标数字所在…

找不到msvcp140.dll的解决方法【msvcp140.dll修复工具下载】

今天&#xff0c;我将为大家分享一个与我们日常工作息息相关的话题——msvcp140.dll重新安装的5种解决方法。在接下来的时间里&#xff0c;我将向大家介绍什么是msvcp140.dll,为什么会丢失&#xff0c;以及它的用途。最后&#xff0c;我将为大家提供5种解决方法&#xff0c;帮助…

【人工智能】—_神经网络、前向传播、反向传播、梯度下降、局部最小值、多层前馈网络、缓解过拟合的策略

神经网络、前向传播、反向传播 文章目录 神经网络、前向传播、反向传播前向传播反向传播梯度下降局部最小值多层前馈网络表示能力多层前馈网络局限缓解过拟合的策略 前向传播是指将输入数据从输入层开始经过一系列的权重矩阵和激活函数的计算后&#xff0c;最终得到输出结果的过…

useEffect 不可忽视的 cleanup 函数

在 react 开发中&#xff0c; useEffect 是我们经常会使用到的钩子&#xff0c;一个基础的例子如下&#xff1a; useEffect(() > {// some code here// cleanup 函数return () > {doSomething()} }, [dependencies])上述代码中&#xff0c; cleanup 函数的执行时机有如下…

[dasctf]misc1

不确定何种加密方式 P7NhnTtPUm/L3rmkP/eAhx5Vnbc2YyatkXCePJ0Wh2NYfqXGZCpZdCesMmEAihhUYI1PjoLq6FedZ7MSclA9h0/Dy4CavBwVg5RHr8XJmfbtuWkxK2Gn3sNTEzQi0p 1t_15_s3cR3t_k3y 也许是密钥

html5——前端笔记

html 一、html51.1、理解html结构1.2、h1 - h6 (标题标签)1.3、p (段落和换行标签)1.4、br 换行标签1.5、文本格式化1.6、div 和 span 标签1.7、img 图像标签1.8、a 超链接标签1.9、table表格标签1.9.1、表格标签1.9.2、表格结构标签1.9.3、合并单元格 1.10、列表1.10.1、ul无序…

vmware虚拟机远程开发

目录 1. 下载vmware2. 下载ubuntu镜像3. 安装4. 做一些设置4.1 分辨率设置4.2 语言下载4.3 输入法设置4.4 时区设置 5. 直接切换管理员权限6. 网络6.1 看ip6.2 ssh 7. 本地编译器连接远程服务器7.1 创建远程部署的配置7.2 文件同步7.3 远程启动项目 8. ubuntu安装golang环境8.1…

C++学习笔记总结练习:多态与虚函数

1 多态 多态分类 静态多态&#xff0c;是只在编译期间确定的多态。静态多态在编译期间&#xff0c;根据函数参数的个数和类型推断出调用的函数。静态多态有两种实现的方式 重载。&#xff08;函数重载&#xff09;模板。 动态多态&#xff0c;是运行时多态。通过虚函数机制实…

单片机开发中的内存优化

在单片机开发中&#xff0c;内存优化是至关重要的&#xff0c;它不仅能够降低成本&#xff0c;还可以提高性能。本文将深入讨论如何在STM32单片机和C语言的环境中实施内存优化策略&#xff0c;以确保项目的顺利进行。 单片机内存资源通常包括RAM&#xff08;随机访问存储器&am…

Java空指针异常

在所有的RuntimeException异常中&#xff0c;Java程序员最熟悉的恐怕就是NullPointerException了。 NullPointerException即空指针异常&#xff0c;俗称NPE。如果一个对象为null&#xff0c;调用其方法或访问其字段就会产生NullPointerException&#xff0c;这个异常通常是由J…

2022年12月 C/C++(六级)真题解析#中国电子学会#全国青少年软件编程等级考试

C/C编程&#xff08;1~8级&#xff09;全部真题・点这里 第1题&#xff1a;区间合并 给定 n 个闭区间 [ai; bi]&#xff0c;其中i1,2,…,n。任意两个相邻或相交的闭区间可以合并为一个闭区间。例如&#xff0c;[1;2] 和 [2;3] 可以合并为 [1;3]&#xff0c;[1;3] 和 [2;4] 可以…

腾讯云网站备案详细流程_审核时间说明

腾讯云网站备案流程先填写基础信息、主体信息和网站信息&#xff0c;然后提交备案后等待腾讯云初审&#xff0c;初审通过后进行短信核验&#xff0c;最后等待各省管局审核&#xff0c;前面腾讯云初审时间1到2天左右&#xff0c;最长时间是等待管局审核时间&#xff0c;网站备案…

Python入门教程 - 判断语句(二)

目录 一、布尔类型 二、比较运算符 三、if判断语句 一、布尔类型 True False result1 10 > 5 result2 10 < 5 print(result1) print(result2) print(type(result1)) True False <class bool> 二、比较运算符 ! > < > < 比较运算的结果是布尔…

8. 摆平积木

题目&#xff1a; 小明很喜欢玩积木。一天&#xff0c;他把许多积木块组成了好多高度不同的堆&#xff0c;每一堆都是一个摞一个的形式。然而此时&#xff0c;他又想把这些积木堆变成高度相同的。但是他很懒&#xff0c;他想移动最少的积木块来实现这一目标&#xff0c; 你能帮…

DevEco Studio 配置

首先,打开deveco studio 进入首页 …我知道你们想说什么,我也想说 汉化配置 没办法,老样子,先汉化吧,毕竟母语看起来舒服 首先,点击软件左下角的configure,在配置菜单里选择plugins 进入到插件页面, 输入chinese,找到汉化插件,(有一说一写到这我心里真是很不舒服) 然后点击o…

2023年05月 C/C++(六级)真题解析#中国电子学会#全国青少年软件编程等级考试

C/C编程&#xff08;1~8级&#xff09;全部真题・点这里 第1题&#xff1a;字符串插入 有两个字符串str和substr&#xff0c;str的字符个数不超过10&#xff0c;substr的字符个数为3。&#xff08;字符个数不包括字符串结尾处的’\0’。&#xff09;将substr插入到str中ASCII码…

百万级并发IM即时消息系统(4)Swagger

golang swagger注解说明_go swagger 注释_mctlilac的博客-CSDN博客 Gin(十):集成 Swagger - 掘金 (juejin.cn) 手把手详细教你如何使用go-swagger文档 - 掘金 (juejin.cn) 08_Swagger&Logger复盘整理_哔哩哔哩_bilibili 1.配置swagger 1&#xff09;swagger ginSwag…

kafka详解一

kafka详解一 1、消息引擎背景 根据维基百科的定义&#xff0c;消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息&#xff0c;实现松耦合的异步式数据传递. 即&#xff1a;系统 A 发送消息给消息引擎系统&#xff0c;系统 B 从消息引擎系统中读取 A…

[dasctf]misc04

与他不说一模一样吧也差不多 第三届红明谷杯CTF-【MISC】-阿尼亚_keepb1ue的博客-CSDN客flag.zip需要解压密码&#xff0c;在图片中发现一串密文。一串乱码&#xff0c;尝试进行字符编码爆破。获取到密码&#xff1a;简单的编码。https://blog.csdn.net/qq_36618918/article/d…

IE浏览器攻击:MS11-003_IE_CSS_IMPORT

目录 概述 利用过程 漏洞复现 概述 MS11-003_IE_CSS_IMPORT是指Microsoft Security Bulletin MS11-003中的一个安全漏洞&#xff0c;影响Internet Explorer&#xff08;IE&#xff09;浏览器。这个漏洞允许攻击者通过在CSS文件中使用import规则来加载外部CSS文件&#xff0…