netty之pipeline

news2025/1/23 17:25:46

Netty抽象出流水线(pipeline)这一层数据结构进行处理或拦截channel相关事件。

事件分为入站事件(inBound event)和出站事件(outBound event)的ChannelHandlers列表。ChannelPipeline使用先进的Intercepting Filter模式,使用户可以完全控制如何处理事件以及管道中的ChannelHandlers如何相互交互。类似与servlet的filter,亦或是Linux的管道命令,使用责任链模式。将不同的handlers组装成一个链表,进行依次调用。开发人员可以很方便的往链表添加或删除handler进行自己的业务逻辑操作。
在这里插入图片描述

channel事件分为入站和出栈事件两种类型,同样的handler分为ChannelInboundHandler和ChannelOutboundHandler两种类型处理对应事件。入站事件对应read事件,出栈事件对应write事件,读和写公用同一个pipeline,两个事件类型互不干涉。read事件从head开始往后找相关的inBound handlers依次进行处理,write事件从tail往前找相关的outBound handlers依次进行处理。

pipeline的创建

当channel创建时,就会创建一个pipeline与之进行绑定。在channel整个生命周期都会使用该pipeline进行事件处理,默认使用DefaultChannelPipeline类进行创建pipeline。

来看下DefaultChannelPipeline的构造方法:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

创建时候会初始化head和tail。我们在往pipeline添加的时候都是添加的handler,这里会包装成AbstractChannelHandlerContext类型添加到链表里。

添加handler

添加handler可以通过pipeline.addLast方法进行添加

 ChannelPipeline p = ...;
 p.addLast("1", new InboundHandlerA());
 p.addLast("2", new InboundHandlerB());
 p.addLast("3", new OutboundHandlerA());
 p.addLast("4", new OutboundHandlerB());
 p.addLast("5", new InboundOutboundHandlerX());

addLast的源码如下:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
    //判断handler能否被添加多次
        checkMultiplicity(handler);
        //创建一个DefaultChannelHandlerContext实例,将handler包装起来
        newCtx = newContext(group, filterName(name, handler), handler);
        //添加到链表
        addLast0(newCtx);

        //判断channel是否绑定了eventLoop,没绑定调用callHandlerCallbackLater后面文章会讲到这里
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
      
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    //回调当前handler的handlerAdded方法
    callHandlerAdded0(newCtx);
    return this;
}
//这里就是将handler添加到tail的前面
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

从上面源码可以看出,添加的handler首先会被包装成一个HandlerContext,然后在将其放到链表tail的前面。添加handler其它的方法还有addFirst,addBefore,addAfter原理类似。可以根据业务逻辑进行handler顺序编排。

常见事件

入站读事件:ChannelInboundHandler

回调方法事件说明
channelRegisteredchannel绑定EventLoop
channelUnregisteredchannel取消EventLoop绑定
channelActivechannel启动准备完成
channelInactivechannel处于非活动状态,准备关闭
channelRead从对端读取到数据
channelReadComplete处理完所有读取到的数据

出站写事件:ChannelOutboundHandler

回调方法事件说明
bindbind操作完成前回调,serverchannel事件
connectconnect操作完成前回调,clientchannel事件
disconnectdisconnect操作完成前回调,client端事件
closeclose操作前回调,server端事件
readread操作前回调
writewrite操作前回调
flushflush操作前回调

数据读写源码

io.netty.channel.Channel.write方法会调用pipeline.write,其源码如下

public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}

这里看到写数据从tail开始调用。所以这里如果有多个handler的话,要注意我们加入handler的顺序。

手动添加的handler都被包装成DefaultChannelHandlerContext,该类只重写了handler()方法获取当前handler,其它方法实现都在其父类AbstractChannelHandlerContext中。方法间传递的ctx变量就是该类。递归调用handler也是主要在该类中方法实现:

ctx.write:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    //找出下一个Outbound类型MASK_WRITE类型的handler
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
       //是否flush,调用下一个handler的write
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }
}

这里以读方法为例看调用过程

读数据从pipeline.fireChannelRead方法开始

public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

这里看到ctx从head开始往后找。然后调用ctx.invokeChannelRead

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

invokeChannelRead方法其实是调起handler.channelRead方法。

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

下一个handler一般在处理完后,会调用ctx.fireChannelRead完成下一个handler的调用。这样完成链式调用

public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

参考:
https://netty.io/4.0/api/io/netty/channel/ChannelPipeline.html
https://www.alibabacloud.com/blog/essential-technologies-for-java-developers-io-and-netty_597367

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1020955.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

全流程GMS地下水数值模拟及溶质(包含反应性溶质)运移模拟技术教程

详情点击公众号链接:全流程GMS地下水数值模拟及溶质(包含反应性溶质)运移模拟技术教程 前言 GMS三维地质结构建模 GMS地下水流数值模拟 GMS溶质运移数值模拟与反应性溶质运移模 详情 1.GMS的建模数据的收集、数据预处理以及格式等&#xff…

2023/9/18 -- C++/QT

作业 完善登录框 点击登录按钮后,判断账号(admin)和密码(123456)是否一致,如果匹配失败,则弹出错误对话框,文本内容“账号密码不匹配,是否重新登录”,给定两…

《Linux运维总结:Centos7.6之OpenSSH7.4升级版本至9.4》

一、环境信息 操作系统:Centos7.6.1810 OpenSSH_7.4p1, OpenSSL 1.0.2k-fips 如下图所示: 注意:升级后由于加密算法的区别,低版本的SSH工具可能无法连接,建议改用Xshell7或SecureCRT9.0以上版本。 二、注意事项 1、检…

第九章 关系查询处理和查询优化

第九章 关系查询处理和查询优化 9.1 关系数据库系统的查询处理 9.1.1 查询处理步骤 查询分析 对查询语句进行扫描、词法分析和语法分析, 词法分析:从查询语句中识别出正确的语言符号。语法分析:进行语法检查。 查询检查 合法性检查视图转换…

【java】【SpringBoot】【二】运维实用篇 SpringBoot工程

目录 一、打包与运行 1、程序打包与运行(Window版) 1.1 打包 1.2 运行 1.3 打包插件 1.4 总结 1.6 命令行启动常见问题及解决方案 2、程序运行(Linux版) 二、配置高级 1、临时属性设置 2、配置文件分类 3、自定义配置…

Spring Boot + Vue3前后端分离实战wiki知识库系统十三--单点登录开发二

接着https://www.cnblogs.com/webor2006/p/17608839.html继续往下。 登录功能开发: 接下来则来开发用户的登录功能,先准备后端的接口。 后端增加登录接口: 1、UserLoginReq: 先来准备用户登录的请求实体: package com…

07JVM_内存模型和CAS与原子类

一、内存模型 1.java内存模型 Java内存结构是JMM(Java Memory Model)的意思。JMM定义了一套在多线程读写共享数据(成员变量,数组)时,对数据的原子性,见性,有序性的规则和保障。 1…

浅谈电力电容器技术的发展及选型

安科瑞 华楠 摘要:介绍了我国电力电容器产品制造技术的发展现状。在与国外电力电容器产品先进水平对比的基础上,讨论了我国电力电容器产品的差距和某些对策,并对我国电力电容器技术发展趋势提出了一些看法。 关键词:电力电容器;制造技术;技术发展 0 引…

每日一题~最大二叉树

题目链接:654. 最大二叉树 - 力扣(LeetCode) 题目描述: 给定一个不重复的整数数组 nums 。 最大二叉树 可以用下面的算法从 nums 递归地构建: 创建一个根节点,其值为 nums 中的最大值。递归地在最大值 左边 的 子数…

Logstash介绍

Logstash介绍 Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。 集中、转换和存储你的数据 Logstash是一个开源的服务器端数据处理管道,可以同时从多…

采集分析仪设计原理图:437-带触摸显示的10路5Msps@18bit采集分析仪

带触摸显示的10路5Msps18bit采集分析仪 一、产品概述 本产品提供了多种传感器接入接口,支持多种类型传感器实时采集、处理、显示等功能。主处理器采用XC7Z100-FFG900芯片,具有444K逻辑单元和双核ARM Cortex-A9 MPCore处理器。PL部分得可编程逻辑可…

docker系列-报错以及解决指南

1. windows运行docker报错Windows Hypervisor is not presentDocker Desktop is unable to detect a Hypervisor.Hardware assisted virtualization and data execution protection must be enabled in the BIOS. Docker Desktop - Windows Hypervisor is not presentDocker D…

async和await的用法

定义 async的定义 在mdn中,async的定义为: async function 关键字可用于定义表达式中的异步函数。 其实很简单,就是async关键字后面定义的函数会被转化为一个异步的函数 如下所示: function fn1(){return 同步}async function asyncFn(){return 异步}console.log(fn1())con…

Dubbo3应用开发——架构的演变过程

Dubbo3应用开发——架构的演变过程 什么是Dubbo 早期Dubbo的定位; 基于Java的高性能,轻量级的RPC框架;SOA【Service-Oriented Architecture ⾯向服务的架构】 RPC服务治理; 2018年阿⾥巴巴把这个框架捐献给了 Apache 基⾦会&am…

深入理解Java单例模式和优化多线程任务处理

目录 饿汉模式懒汉模式单线程版多线程版双重检查锁定 阻塞队列 单例模式能保证某个类在程序中只存在唯一一份实例, 而不会创建出多个实例,并提供一个全局访问点。 饿汉模式 类加载的同时,创建实例。 class Singleton {private static final Singlet…

2023:生成式AI与存储最新发展和趋势分析(上)

生成式AI的热潮在短时间内席卷全球,以一种势不可挡的趋势迅速出圈,在某一时间段,似乎出现了“除了IT行业,人人都是AI专家”的盛况。这一轮如火如荼的全民AI热潮迸发至今,业已过半载,待最初的烟花绚烂散去&a…

【基础篇】六、基于SpringBoot来整合SSM的案例(下)

文章目录 1、前后端调用:axios发送异步请求2、添加功能3、删除功能4、修改功能5、异常消息处理6、分页功能7、分页Bug处理8、条件查询 接下来加入前端页面,使用axios发送异步请求调用上篇的接口。调前端代码时,发现还挺有趣,刷新、…

Learn Prompt-ChatGPT 精选案例:广告文案

ChatGPT 可以帮助我们生成广告文案和宣传图片,这对营销品牌建设很有帮助。通常,一个产品会有一个主要的广告词,传达设计理念或宣传产品的好处。我们可以尝试直接生成文案,看看 ChatGPT 有没有好的创意。假设我们的产品是一款登山鞋…

【css | linear-gradient】linear-gradient()的用法

linear-gradient() CSS函数创建一个由两种或多种颜色沿一条直线进行线性过渡的图像,其结果是<gradient>数据类型的对象,此对象是一种特殊的<image> 数据类型。 先看一个线上的示例 https://code.juejin.cn/pen/7277486410842996771 语法 /* 渐变轴为 45 度&…

Docker容器详解

值得看的原文地址