Netty核心组件ChannelPipeline事件handler源码解析

news2024/10/2 14:28:47

源码解析目标

  • 当请求进来,ChannelPipeline如何协调内部这些Handler
  • 通过源码梳理ChannelPipeline 与ChannelHandlerContext中的read,fireChannelRead等方法的不同
inbound源码解析
  • 在 Netty启动流程源码剖析 文中我们已经知道,启动后,Netty通过监听processSelectedKey来对事件进行监听,我们找到NioEventLoop类中对应方法,如下
 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
      ......

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
  • 通过Debug方式启动Server, 通过run的方式启动Client,

  • 我们关注的OP_READ事件的监听,调用的unsafe.read方法,

  • 第一次时完成连接操作OP_ACCEPT
    在这里插入图片描述

  • 第二次到read方法才是OP_READ
    在这里插入图片描述

  • 接着追,完成SocketUtil连接后,我们得到了一个封装好的HandlerList,遍历这个list并且调用fireChannelRead,如下源码

//代码位置NioMessageUnsafe
public void read() {
          ......
           final ChannelPipeline pipeline = pipeline();
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
                ......
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
//继续追fireChannelRead方法,进入我们今天要关注的DefaultChannelPipeline
  @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
  • 我们在来看下DefaultChannelPipeline的类UML图

在这里插入图片描述

  • 如上,他既有inbound的方法,又有outbound的方法,部分源码如下

//inbound部分
public class DefaultChannelPipeline implements ChannelPipeline {

    @Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }

    @Override
    public final ChannelPipeline fireChannelInactive() {
        AbstractChannelHandlerContext.invokeChannelInactive(head);
        return this;
    }

    @Override
    public final ChannelPipeline fireExceptionCaught(Throwable cause) {
        AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
        return this;
    }

    @Override
    public final ChannelPipeline fireUserEventTriggered(Object event) {
        AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
        return this;
    }

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

    @Override
    public final ChannelPipeline fireChannelReadComplete() {
        AbstractChannelHandlerContext.invokeChannelReadComplete(head);
        return this;
    }

    @Override
    public final ChannelPipeline fireChannelWritabilityChanged() {
        AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
        return this;
    }
}

  • inbound部分源码可以看出,所有的入站事件都是由fire开头,表示管道的流动,让后面的handler继续处理(此处之后源码分析)
  • 观察以上方法的入参,他们给定的都是一个Handler,并且都是HeadHandler节点,说明我们的入站事件都是从Head节点开始,事件依次在每一个Handler中流转最终到我们服务端,如下源码
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
  • next.executor(); 首先获取一个NioEventLoop 执行器
  • executor.inEventLoop() 判断当前线程和是否时执行器的线程,接着执行 next.invokeChannelRead(m);
  • 接着从Head节点开始执行以下方法执行以下
private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

在这里插入图片描述

  • 关键方法,channelRead(this, msg);参数this就是当前Handler,首先就是HeadContextHandler,接着看HeadHandler中的channelRead方法,只有一个fireChannelRead
@Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.fireChannelRead(msg);
        }
        //所以又来到了这里,第二次执行,会再次获取一次Next,也就是第二个handler
        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
  
  • 也就是HeadHandler只做了一次转发,将持有的msg转发给了下一个Handler,也就是我们自己添加的EchoServerHandler,如果我们实现的方法分中没有fireChannelRead ,则不会到tailContext中,那么读事件久完成了,总流程如下
  • fireChannelRead方法 —〉 AbstractChannelHandlerContext.invokeChannelRead(context); —〉next(context) 中 next.invokeChannelRead(m); —〉具体Handler的channelRead 方法(以及自实现方法)—〉Context的fireChannelRead方法参数是下一个节点的ChannelHandlerContext(此处不一定是fireChannelRead,只是这次我们Debug是read,有可能是Actice等其他事件)
outbound源码解析
  • Pipeline的outbound的fire相关方法,同样截取DefaultChannelPipeline
  @Override
  public class DefaultChannelPipeline implements ChannelPipeline {
    public final ChannelFuture bind(SocketAddress localAddress) {
        return tail.bind(localAddress);
    }

    @Override
    public final ChannelFuture connect(SocketAddress remoteAddress) {
        return tail.connect(remoteAddress);
    }

    @Override
    public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return tail.connect(remoteAddress, localAddress);
    }

    @Override
    public final ChannelFuture disconnect() {
        return tail.disconnect();
    }

    @Override
    public final ChannelFuture close() {
        return tail.close();
    }

    @Override
    public final ChannelFuture deregister() {
        return tail.deregister();
    }

    @Override
    public final ChannelPipeline flush() {
        tail.flush();
        return this;
    }

    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

    @Override
    public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, promise);
    }

    @Override
    public final ChannelFuture connect(
            SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, localAddress, promise);
    }

    @Override
    public final ChannelFuture disconnect(ChannelPromise promise) {
        return tail.disconnect(promise);
    }

    @Override
    public final ChannelFuture close(ChannelPromise promise) {
        return tail.close(promise);
    }

    @Override
    public final ChannelFuture deregister(final ChannelPromise promise) {
        return tail.deregister(promise);
    }

    @Override
    public final ChannelPipeline read() {
        tail.read();
        return this;
    }

    @Override
    public final ChannelFuture write(Object msg) {
        return tail.write(msg);
    }

    @Override
    public final ChannelFuture write(Object msg, ChannelPromise promise) {
        return tail.write(msg, promise);
    }

    @Override
    public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        return tail.writeAndFlush(msg, promise);
    }

    @Override
    public final ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
    }
}
  • 这些都是出站方法的实现,但是都是调用的outboumd类的tailHandler来进程处理的,也就是我们用pipeline中的出站方法时候总是从TailHandler节点开始处理事件
  • 找一个比较典型的outbound方法分析,bind方法,如下源码
@Override
    public final ChannelFuture bind(SocketAddress localAddress) {
        return tail.bind(localAddress);
    }
    //继续根bind
    @Override
    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        if (isNotValidPromise(promise, false)) {
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }
    
  • 入上源码首先参数检验,isNotValidPromise
  • 接着,findContextOutbound 找第一个outbound节点对应的Handler(while循环,之前文章分析过)
  • 接着同inbound流程一样 next.executor(); 拿到当前HandlerContext的执行器
  • 执行对于HandlerContext对于的 next.invokeBind(localAddress, promise); 方法
  • 举例一个具体的HandlerContext对应的invokeBind方法,LoggingHandler,如下
@Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        if (logger.isEnabled(internalLevel)) {
            logger.log(internalLevel, format(ctx, "BIND", localAddress));
        }
        ctx.bind(localAddress, promise);//又回到了bind,然后接着next,同inbound一样的流程
    }
总结
  • ChannelPipeline中的出入站方法中出站是从tail开始,入站是从head开始,
  • 因为出站是从服务器端通过ChannelPipeline到SocketChannel写,从tailHandlerContext开始,能让前面所有的Handler进行处理,能防止Handler被遗漏,比如编码
  • 入站当然是从SocketChannel通过 ChannelPipeline 到服务器端,当然是从head开始往内部输入,让后面的handler能处理这些输入数据,比如解码
  • 即使HeadHandler也实现了outbound接口,但是不是从head开始执行出站任务的
  • 用如下图来理解数据在源码中执行的流程

在这里插入图片描述

  • 说明
    • pipeline首先会调用Conetxt的静态方法,并传入Context

    • 接着静态方法调用Context的invoke方法,而invoke方法内部会调用该Context所包含的Handler的真正的XXX方法(例如我们自己实现的EchoServerHandler中的Read,activity等事件方法)

    • 调用结束后,如果还需要继续向后传递,就调用Contexxt的firexxx2方法,循环往复

    • 由上可以得出,ChannelPipeline中的inbound,outbound 分别是从HeadContext,tailContext,开始执行过每一个handler

    • 而ChannelHandlerContext中的inbound,outbound是从本节点开始 向后或者向前传递

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/369782.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

公司项目vue cli2升级到vue cli3

背景&#xff1a;公司项目历时时间较长&#xff0c;通过长时间的迭代&#xff0c;目前项目文件较多&#xff08;src目录下有2217个文件&#xff09;&#xff0c;系统庞大&#xff0c; 之前通过vue cli2脚手架构建的项目框架&#xff0c;在本地开发时已经明显感觉到吃力&#xf…

Win10+vs2019配置与运行RenderMatch(踩坑记录)

Win10vs2019配置与运行RenderMatch RenderMatch旨在解决aerial images 和ground images 匹配问题&#xff0c;其思路可参考原论文 “Leveraging Photogrammetric Mesh Models for Aerial-Ground Feature Point Matching Toward Integrated 3D Reconstruction” 1.源码下载 G…

【2023new】OAK相机如何将Yolov5转换成blob格式?

编辑&#xff1a;OAK中国 首发&#xff1a;oakchina.cn 喜欢的话&#xff0c;请多多&#x1f44d;⭐️✍ 内容可能会不定期更新&#xff0c;官网内容都是最新的&#xff0c;请查看首发地址链接。 ▌前言 Hello&#xff0c;大家好&#xff0c;这里是OAK中国&#xff0c;我是助手…

机械革命极光Pro电脑开启出现英文代码无法启动怎么办?

机械革命极光Pro电脑开启出现英文代码无法启动怎么办&#xff1f;有的小伙伴在使用机械革命极光Pro电脑的时候&#xff0c;正常开启电脑却无法进入到桌面中&#xff0c;而是显示一些英文错误提示。遇到这个问题是我们的系统故障了&#xff0c;可以通过U盘重装系统的方法来进行问…

logback 自定义日志输出到数据库

项目日志格式 Spring Boot 的默认日志输出类似于以下示例&#xff1a; 2021-12-14 22:40:14.159 INFO 20132 --- [ main] com.kuangstudy.SpringbootApplication : Started SpringbootApplication in 2.466 seconds (JVM running for 3.617)输出以下项目&…

SpringBoot 整合 MongoDB 6 以上版本副本集及配置 SSL / TLS 协议

续上一篇 Linux 中使用 docker-compose 部署 MongoDB 6 以上版本副本集及配置 SSL / TLS 协议 前提&#xff1a;此篇文章是对上一篇文章的实战和项目中相关配置的使用&#xff0c;我这边针对 MongoDB 原有基础上做了增强&#xff0c;简化了 MongoDB 配置 SSL / TLS 协议上的支…

Android Studio引入JNI第三方库

一、前言 JNI作为Java与native沟通的桥梁&#xff0c;项目开发中难免要使用到&#xff1b;而我们除了自己开发JNI之外&#xff0c;有时候还要在Android Studio引入别人开源的C第三方库&#xff0c;并在jni层实现第三方库的调用。 二、流程 1.导入头文件和实现文件 将第三方…

Linux内核进程地址空间与进程内存布局

一&#xff0c;进程空间分布概述 对于一个进程&#xff0c;其空间分布如下图所示&#xff1a; 程序段(Text):程序代码在内存中的映射&#xff0c;存放函数体的二进制代码。初始化过的数据(Data):在程序运行初已经对变量进行初始化的数据。未初始化过的数据(BSS):在程序运行初未…

九龙证券|券商春季策略扎堆来袭 风格切换成焦点

2月以来&#xff0c;国泰君安、中信建投、国金证券等10余家券商组织相继发布2023年春季战略。综合来看&#xff0c;组织对A股持达观预期&#xff0c;未来两三个月A股商场或迎来重要切换。风格上&#xff0c;“中心财物&#xff0c;生长接力”或许成为上半年装备主线&#xff0c…

java 系列之Mybatis

java 系列文章 文章目录java 系列文章前言一、Mybatis 入门1.1 认识 框架&#xff08;了解&#xff09;1.2 认识 ORM&#xff08;要知道&#xff09;1.3 认识 Mybatis&#xff08;要知道&#xff09;二、Mybatis 使用2.1 创建maven项目并导入依赖2.2 准备数据库&#xff0c;包和…

释放内存流程

你好&#xff0c;我是安然无虞。 thread cache回收内存 当从 thread cache 中申请的内存对象使用完毕需要还回来的时候, 只需要计算出该内存对象对应 thread cache 中的哪一个自由链表桶, 然后将该内存对象插入进去即可. 不过需要注意的是, 如果不断有内存对象释放回来, 那么…

Java实现根据拼音首字母的排序

1.项目 手机APP端要对企业列表按企业名称首字母(如果企业名是英文的就按)进行分类排序&#xff0c;效果如下&#xff1a; 2.实现过程 2.1 首先引入项目的pinyin4j-2.5.0.jar包。 这个jar的下载地址如下&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1hkP_gGAYcgzyK_D…

跨链桥:Web3黑客必争之地

跨链桥&#xff0c;区块链的基础设施之一&#xff0c;所实现的功能是允许用户将自己的资产从一条链转移至另外一条链上&#xff0c;是连接不同的区块链的关键桥梁&#xff0c;常使用中心化的方式进行实现。由于跨链桥自身往往存储有用户所质押的巨额资产&#xff0c;是Web3黑客…

KUKA KR C4机器人与S7-1200PLC进行PROFINET通信的具体方法和步骤

KUKA KR C4机器人与S7-1200PLC进行PROFINET通信的具体方法和步骤 首先,从KUKA机器人控制柜中将KOP备选软件包拷贝出来,然后在“WorkVisual Development Environment”安装KUKA备选软件包(版本非常重要,尽量从控制柜中拷贝), 也可以从以下链接中获取: KUKA机器人PROFINET…

php 任务调度

在日常开发中&#xff0c;我们总会遇到一些在某个指定的时刻去执行&#xff0c;或是每隔xx时间执行&#xff0c;或是需要一直在后台监听的任务执行。基于这个需求&#xff0c;对于php我找了一些办法来实现这些功能 1、依赖于laravel的任务调度。 每隔xx时间执行一次命令&#…

七、虚拟机栈

虚拟机栈出现的背景 1.由于跨平台性的设计&#xff0c;Java的指令都是根据栈来设计的&#xff0c;不同平台CPU架构不同&#xff0c;所以不能设计为基于寄存器的。 2.优点是跨平台&#xff0c;指令集小&#xff0c;编译器容易实现&#xff0c;缺点是性能下降&#xff0c;实现同…

XC7K70T-1FBG676I【FPGA】XC3S200-4FTG256C参数XC7K70T-2FBG676C

Kintex-7 FPGA为快速增长应用和无线通信提供最优性价比和低功耗。Kintex-7 FPGA允许设计人员构建卓越带宽和12位数字可编程模拟&#xff0c;同时满足成本和功耗要求。Kintex-7内置支持8通道PCI Express (Gen1/Gen2)&#xff0c;用于连接主机系统。7系列器件利用Xilinx统一架构保…

如何实现外网跨网远程控制内网计算机?快解析来解决

远程控制&#xff0c;是指管理人员在异地通过计算机网络异地拨号或双方都接入Internet等手段&#xff0c;连通需被控制的计算机&#xff0c;将被控计算机的桌面环境显示到自己的计算机上&#xff0c;通过本地计算机对远方计算机进行配置、软件安装程序、修改等工作。通俗来讲&a…

超图软件许可过期后的处理

超图软件许可过期&#xff0c;博主犯了一个很低级的错误&#xff0c;特此发帖记录一下。 按照官网的介绍&#xff0c;直接申请新的许可&#xff0c;配置本地许可很简单的。 1.打开桌面软件会弹出以下的框。 2.配置本地许可对话框 3.切换到激活更新页面 4.选择激活文件即可 以…

内容团队如何快速出稿

对于内容团队而言&#xff0c;每个内容选题就相当于一个小项目&#xff0c;它们并非简单的线性工作流&#xff0c;相反其复杂程度不亚于一个小型工厂。一个内容选题会涉及内容形式&#xff0c;选题类型等多个变量&#xff0c;这些变量因素组合起来就是十几种不同类型的工作流。…