io.netty学习(五)ChannelPipeline

news2025/1/15 12:50:28

目录

前言

ChannelPipeline 接口

创建 ChannelPipeline

ChannelPipeline 事件传输机制

ChannelPipeline 中的 ChannelHandler

ChannelHandlerContext 接口

总结


前言

我们在前面的文章中也对ChannelPipeline接口做了初步的介绍。

io.netty学习使用汇总

ChannelPipeline 接口

ChannelPipeline接口采用了责任链设计模式,底层采用双向链表的数据结构,将链上的各个处理器串联起来。客户端每一个请求的到来,ChannelPipeline中所有的处理器都有机会处理它。

每一个新创建的Channel都将会被分配一个新的ChannelPipeline。这项关联是永久性的;Channel既不能附加另一个ChannelPipeline,也不能分离其当前的。

创建 ChannelPipeline

ChannelPipeline数据管道是与Channel管道绑定的,一个Channel通道对应一个ChannelPipelineChannelPipeline是在Channel初始化时被创建。

观察下面这个实例:

public void run() throws Exception {
    EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap(); // (2)
        b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class) // (3)
            .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                @Override
                public void initChannel(SocketChannel ch) throws Exception {

                    // 添加ChannelHandler到ChannelPipeline
                    ch.pipeline().addLast(new DiscardServerHandler());
                }
            })
            .option(ChannelOption.SO_BACKLOG, 128)          // (5)
            .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

        // 绑定端口,开始接收进来的连接
        ChannelFuture f = b.bind(port).sync(); // (7)

        System.out.println("DiscardServer已启动,端口:" + port);

        // 等待服务器  socket 关闭 。
        // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
        f.channel().closeFuture().sync();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}

从上述代码中可以看到,当ServerBootstrap初始化后,直接就可以获取到SocketChannel上的ChannelPipeline,而无需手动实例化,因为 Netty 会为每个Channel连接创建一个ChannelPipeline

Channel的大部分子类都继承了AbstractChannel,在创建实例时也会调用AbstractChannel构造器。在AbstractChannel构造器中会创建ChannelPipeline管道实例,核心代码如下:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    this.id = this.newId();
    this.unsafe = this.newUnsafe();
    this.pipeline = this.newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

从上述代码中可以看出,在创建Channel时,会由Channel创建DefaultChannelPipeline类的实例。DefaultChannelPipelineChannelPipeline的默认实现。

pipelineAbstractChannel的属性,内部维护着一个以AbstractChannelHandlerContext为节点的双向链表,创建的headtail节点分别指向链表头尾,源码如下:

public class DefaultChannelPipeline implements ChannelPipeline {   	

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
        this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);
        this.voidPromise = new VoidChannelPromise(channel, true);
        this.tail = new DefaultChannelPipeline.TailContext(this);
        this.head = new DefaultChannelPipeline.HeadContext(this);
        this.head.next = this.tail;
        this.tail.prev = this.head;
    }


    ...
    
    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, DefaultChannelPipeline.TailContext.class);
            this.setAddComplete();
        }

    ...
    }
    
      final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, DefaultChannelPipeline.HeadContext.class);
            this.unsafe = pipeline.channel().unsafe();
            this.setAddComplete();
        }
          
     	...
          
      }
    
     ...
}

从上述源码可以看到,TailContextHeadContext都继承了AbstractChannelHandlerContext,并实现了ChannelHandler接口。AbstractChannelHandlerContext内部维护着nextprev链表指针和入站、出站节点方向等。其中TailContext实现了ChannelInboundHandlerHeadContext实现了ChannelOutboundHandlerChannelInboundHandler

ChannelPipeline 事件传输机制

通过ChannelPipelineaddFirst()方法来添加ChannelHandler,并为这个ChannelHandler创建一个对应的DefaultChannelHandlerContext实例。

public class DefaultChannelPipeline implements ChannelPipeline {  
    //...
    
	public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
        AbstractChannelHandlerContext newCtx;
        synchronized(this) {
            checkMultiplicity(handler);
            name = this.filterName(name, handler);
            newCtx = this.newContext(group, name, handler);
            this.addFirst0(newCtx);
            if (!this.registered) {
                newCtx.setAddPending();
                this.callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                this.callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }

        this.callHandlerAdded0(newCtx);
        return this;
    }
    
    //...
    

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);
    }
    
    //...

}

处理出站事件

当处理出站事件时,channelRead()方法的示例如下:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		System.out.println(ctx.channel().remoteAddress() + " -> Server :" + msg);
		
		// 写消息到管道
		ctx.write(msg);// 写消息
	}
	
	//...
}

上述代码中的write()方法会触发一个出站事件,该方法会调用DefaultChannelPipeline上的write()方法。

public final ChannelFuture write(Object msg) {
    return this.tail.write(msg);
}

从上述源码可以看到,调用的是DefaultChannelPipeline上尾部节点(tail)的write方法。

上述方法最终会调用到DefaultChannelHandlerContextwrite()方法。

private void write(Object msg, boolean flush, ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");

    try {
        if (this.isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            return;
        }
    } catch (RuntimeException var8) {
        ReferenceCountUtil.release(msg);
        throw var8;
    }

    AbstractChannelHandlerContext next = this.findContextOutbound(flush ? 98304 : '耀');
    Object m = this.pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractChannelHandlerContext.WriteTask task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            task.cancel();
        }
    }

}

上述的write()方法会查找下一个出站的节点,也就是当前ChannelHandler后的一个出站类型的ChannelHandler,并调用下一个节点的invokeWrite()方法。

void invokeWrite(Object msg, ChannelPromise promise) {
    if (this.invokeHandler()) {
        this.invokeWrite0(msg, promise);
    } else {
        this.write(msg, promise);
    }

}

接着调用invokeWrite0()方法,该方法最终调用ChannelOutboundHandlerwrite方法。

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler)this.handler()).write(this, msg, promise);
        } catch (Throwable var4) {
            notifyOutboundHandlerException(var4, promise);
        }

    }

至此,处理完成了第一个节点的处理,开始执行下一个节点并不断循环。

所以,处理出站事件时,数据传输的方向是从尾部节点tail到头部节点head

处理入站事件

入站事件处理的起点是触发ChannelPipeline fire方法,例如fireChannelActive()方法的示例如下:

public class DefaultChannelPipeline implements ChannelPipeline {   	  
    //...
    
    public final ChannelPipeline fireChannelActive() {
            AbstractChannelHandlerContext.invokeChannelActive(this.head);
            return this;
        }
    //...
}

从上述源码可以看到,处理的节点是头部节点headAbstractChannelHandlerContext.invokeChannelActive方法定义如下:

static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelActive();
            }
        });
    }

}

该方法最终调用ChannelInboundHandlerchannelActive方法。

private void invokeChannelActive() {
    if (this.invokeHandler()) {
        try {
            ((ChannelInboundHandler)this.handler()).channelActive(this);
        } catch (Throwable var2) {
            this.invokeExceptionCaught(var2);
        }
    } else {
        this.fireChannelActive();
    }

}

至此完成了第一个节点的处理,开始执行下一个节点的不断循环。

所以,处理入站事件时,数据传输的方向是从头部节点head到尾部节点tail

ChannelPipeline 中的 ChannelHandler

从上述的ChannelPipeline 接口源码可以看出,ChannelPipeline 是通过addXxx或者removeXxx方法来将ChannelHandler动态的添加到ChannelPipeline中,或者从ChannelPipeline移除ChannelHandler的。那么ChannelPipeline是如何保障并发访问时的安全呢?

addLast方法为例,DefaultChannelPipeline的源码如下:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    AbstractChannelHandlerContext newCtx;
    //synchronized 保障线程安全
    synchronized(this) {
        checkMultiplicity(handler);
        newCtx = this.newContext(group, this.filterName(name, handler), handler);
        this.addLast0(newCtx);
        if (!this.registered) {
            newCtx.setAddPending();
            this.callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            this.callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }

    this.callHandlerAdded0(newCtx);
    return this;
}

从上述源码可以看到,使用synchronized关键字保障了线程的安全访问。其他方法的实现方式也是类似。

ChannelHandlerContext 接口

ChannelHandlerContext 接口是联系ChannelHandlerChannelPipeline 之间的纽带。

每当有ChannelHandler添加到ChannelPipeline 中时,都会创建ChannelHandlerContext ChannelHandlerContext 的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline 中的其他ChannelHandler之间的交互。

例如,ChannelHandlerContext 可以通知ChannelPipeline 中的下一个ChannelHandler开始执行及动态修改其所属的ChannelPipeline 

ChannelHandlerContext 中包含了许多方法,其中一些方法也出现在ChannelChannelPipeline 中。如果通过ChannelChannelPipeline 的实例来调用这些方法,它们就会在整个ChannelPipeline 中传播。相比之下,一样的方法在ChannelHandlerContext 的实例上调用,就只会从当前的ChannelHandler开始并传播到相关管道中的下一个有处理事件能力的ChannelHandler中。因此,ChannelHandlerContext 所包含的事件流比其他类中同样的方法都要短,利用这一点可以尽可能提高性能。

ChannelHandlerContext 与其他组件的关系

下图展示了ChannelPipeline ChannelChannelHandlerChannelHandlerContext 之间的关系做了如下说明:

(1)Channel被绑定到ChannelPipeline 上。

(2)和Channel绑定的ChannelPipeline 包含了所有的ChannelHandler

(3)ChannelHandler

(4)当添加ChannelHandlerChannelPipeline 时,ChannelHandlerContext 被创建。

跳过某些 ChannelHandler

下面的代码,展示了从ChannelHandlerContext 获取到Channel的引用,并通过调用Channel上的write()方法来触发一个写事件到流中。

ChannelHandlerContext ctx = context;
Channel channel = ctx.channel(); //获取ChannelHandlerContext上的Channel
channel.write(msg);

以下代码展示了从ChannelHandlerContext 获取到ChannelPipeline 

ChannelHandlerContext ctx = context;
ChannelPipeline pipeline = ctx.pipeline(); //获取ChannelHandlerContext上的ChannelPipeline 
pipeline.write(msg);

上述的两个示例,事件流是一样的。虽然被调用的ChannelChannelPipeline 上的write()方法将一直传播事件通过整个ChannelPipeline ,但是在ChannelHandler的级别上,事件从一个ChannelHandler到下一个ChannelHandler的移动是由ChannelHandlerContext 上的调用完成的。

下图展示了Channel或者ChannelPipeline 进行的事件传播机制。

在上图中可以看出:

(1)事件传递给ChannelPipeline 的第一个ChannelHandler

(2)ChannelHandler通过关联的ChannelHandlerContext 传递事件给ChannelPipeline 中的下一个ChannelHandler

(3)ChannelHandler通过关联的ChannelHandlerContext 传递事件给ChannelPipeline 中的下一个ChannelHandler

从上面的流程可以看出,如果通过ChannelChannelPipeline 的实例来调用这些方法,它们肯定会在整个ChannelPipeline 中传播。

那么是否可以跳过某些处理器呢?答案是肯定的。

通过减少ChannelHandler不感兴趣的事件的传递减少开销,并排除掉特定的对此事件感兴趣的处理器的处理以提升性能。想要实现从一个特定的ChannelHandler开始处理,必须引用与此ChannelHandler的前一个ChannelHandler关联的ChannelHandlerContext 。这个ChannelHandlerContext 将会调用与自身关联的ChannelHandler的下一个ChannelHandler,代码如下:

ChannelHandlerContext ctx = context;
ctx.write(msg);

直接调用ChannelHandlerContext write()方法,将会把缓冲区发送到下一个ChannelHandler

如下图,消息会将从下一个ChannelHandler开始流过ChannelPipeline ,绕过所有在它之前的ChannelHandler

(1)执行ChannelHandlerContext 方法调用。

(2)事件发送到了下一个ChannelHandler

(3)经过最后一个ChannelHandler后,事件从ChannelPipeline 中移除。

当调用某个特定的ChannelHandler操作时,它尤为有用。

例如:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		System.out.println(ctx.channel().remoteAddress() + " -> Server :" + msg);

        // 写消息到管道
		ctx.write(msg);// 写消息
		ctx.flush(); // 冲刷消息
		
		// 上面两个方法等同于 ctx.writeAndFlush(msg);
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

		// 当出现异常就关闭连接
		cause.printStackTrace();
		ctx.close();
	}
}

总结

以上就是关于ChannelPipeline 的源码分析,相信认真看完了,你就明白ChannelPipeline ChannelChannelHandlerChannelHandlerContext 之间的关系。下节我们继续来剖析 Netty 的源码。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/664935.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

chatgpt赋能python:Python把图片转换为图片代码

Python把图片转换为图片代码 在现代计算机应用和互联网中&#xff0c;图像已经成为不可或缺的一部分。然而&#xff0c;我们有时需要将图像转换为代码&#xff0c;以便在我们的应用程序中使用它或通过互联网共享它。Python作为一种流行的编程语言&#xff0c;提供了许多很好的…

如何判断商城源码是否靠谱?

伴随着电子商务的快速发展&#xff0c;商城系统成为了企业发展的重要工具。选择适合自己企业的商城系统源码是一个关键问题&#xff0c;因为它关系到企业未来的发展。那么如何判断商城系统源码是否靠谱呢&#xff1f; 一、核心技术 商城系统的核心技术是网站建设开发&#xff…

JWT入门指南

1、Token认证 随着 Restful API、微服务的兴起&#xff0c;基于 Token 的认证现在已经越来越普遍。基于token的用户认证是一种服务端无状态的认证方式&#xff0c;所谓服务端无状态指的token本身包含登录用户所有的相关数据&#xff0c;而客户端在认证后的每次请求都会携带toke…

利用SQL注入漏洞登录后台

所谓SQL注入&#xff0c;就是通过把SQL命令插入到Web表单递交或输入域名或页面请求的查询字符串&#xff0c;最终达到欺骗服务器执行恶意的SQL命令&#xff0c;比如先前的很多影视网站泄露VIP会员密码大多就是通过WEB表单递交查询字符暴出的&#xff0c;这类表单特别容易受到SQ…

A7+linux4.14内核SPI 总线通讯异常问题分析

I.问题现象、 2023年1月18日&#xff0c;A7核心板 升级内核版本时&#xff0c;发现SPI总线无法跟wk2168通讯&#xff0c;打印信息如下&#xff1a;nts_io_init in gpmi-nand 1806000.gpmi-nand: mode:4 ,failed in set feature. [bus:0~select:0]wk2xxx_probe() GENA 0xFF reg…

【动态规划】简单多状态dp问题(2)买卖股票问题

买卖股票问题 文章目录 【动态规划】简单多状态dp问题&#xff08;2&#xff09;买卖股票问题1. 最佳买卖股票时机含冷冻期&#xff08;买卖股票Ⅰ&#xff09;1.1 题目解析1.2 算法原理1.2.1 状态表示1.2.2 状态机1.2.3 状态转移方程1.2.4 初始化1.2.5 填表顺序1.2.6 返回值 1…

26.利用概率神经网络分类 预测基于PNN的变压器故障诊断(附matlab程序)

1.简述 学习目标&#xff1a; 概率神经网络分类预测 基于PNN的变压器故障诊断 概率神经网络是由Specht博士在1989年首先提出&#xff0c; 是一种与统计信号处理的许多概念有着紧密联系的并行算法。它实质上是一个分类器&#xff0c;根据概率密度函数的无参估计进行贝叶斯决策…

VanillaNet实战:使用VanillaNet实现图像分类(二)

文章目录 训练部分导入项目使用的库设置随机因子设置全局参数图像预处理与增强读取数据设置Loss设置模型设置优化器和学习率调整算法设置混合精度&#xff0c;DP多卡&#xff0c;EMA定义训练和验证函数训练函数验证函数调用训练和验证方法 运行以及结果查看测试热力图可视化展示…

【掌握Spring事务管理】深入理解事务传播机制的秘密

&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳,欢迎大佬指点! 欢迎志同道合的朋友一起加油喔&#x1f93a;&#x1f93a;&#x1f93a; 目录 1.Spring 中事务的实现方式 1.1 Spring 编程式…

第八十天学习记录:计算机硬件技术基础:80486微处理器的指令系统

80486微处理器的寻址方式 要使微处理器能够完成指令规定的操作&#xff0c;则指令中须包含2种信息&#xff0c;一是执行什么操作&#xff1b;二是该操作所涉及的数据在哪里&#xff1b;三是结果存于何处&#xff0c;故指令通常操作由操作码字段和操作数字组成&#xff0c;其书…

chatgpt赋能python:Python的抹零功能介绍及使用方法

Python的抹零功能介绍及使用方法 Python是一种广泛使用的编程语言&#xff0c;而其抹零功能是在进行浮点数操作时非常有用的。在本文中&#xff0c;我们将介绍python中抹零的概念、使用方法以及注意事项&#xff0c;以帮助大家更好地使用python中的抹零功能。 什么是抹零&…

【MarkDown】CSDN Markdown之时间轴图timeline详解

文章目录 时间轴图一个关于时间轴图的例子语法分组长时间段或事件文本换行时间段和事件文本样式自定义颜色方案主题基础主题森林主题黑色主题默认主题中性主题 与库或网站集成 时间轴图 时间轴图&#xff1a;现在这是一个实验性的图表。语法和属性可能会在未来版本中更改。除了…

渣土车未苫盖识别系统 yolov8

渣土车未苫盖识别系统通过yolov8python&#xff0c;渣土车未苫盖识别系统对经过的渣土车进行实时监测&#xff0c;当检测到有渣土车未能及时苫盖时&#xff0c;将自动发出警报提示现场管理人员及时采取措施。Yolo模型采用预定义预测区域的方法来完成目标检测&#xff0c;具体而…

chatgpt赋能python:Python抽人代码:如何优化你的抽奖过程?

Python抽人代码&#xff1a;如何优化你的抽奖过程&#xff1f; 简介 抽奖是在网站上进行的一项非常常见的活动。随着技术的发展&#xff0c;抽奖活动的方式也越来越多样化。在这些活动中&#xff0c;人们喜欢使用抽人软件或代码来提高效率并确保随机性。这在Python中是相当简…

chatgpt赋能python:Python查找第二大的数——从入门到实战

Python查找第二大的数——从入门到实战 Python是一门非常强大的编程语言&#xff0c;不仅支持基本的编程技巧&#xff0c;也支持各种复杂的算法和数据结构。本篇文章将介绍如何通过Python编写一个程序&#xff0c;来实现查找数组中第二大的数。 环境准备 想要运行这个程序&a…

SQL 函数:concat函数、concat_ws()函数、group_concat()

SQL 函数&#xff1a;concat函数、concat_ws()函数、group_concat()函数(转载) concat()函数 功能&#xff1a;将多个字符串连接成一个字符串。 语法&#xff1a;concat(str1, str2,…) 返回结果为连接参数产生的字符串&#xff0c;如果有任何一个参数为null&#xff0c;则返…

java - 报错解决集合

ssm-java学习笔记 java.lang.NoSuchMethodException: org.cjh.bean.Dept.<init>()Invalid bound statement (not found)错误解决方法动态sql if java.lang.IndexOutOfBoundsException: Index: 5, Size: 5Failed to determine a suitable driver classjava.sql.SQLExcepti…

第十二章 sys模块

1. sys模块介绍 什么是Python 解释器 当编写Python 代码时&#xff0c;通常都会得到一个包含Python 代码的以.py 为扩展名的文件。要运行编写的代码&#xff0c;就需要使用Python 解释器去执行.py 文件。因此&#xff0c;Python 解释器就是用来执行Python 代码的一种工具。常…

Windows下Nacos的配置与使用

一、什么是 Nacos 以下引用来自 nacos.io Nacos /nɑ:kəʊs/ 是 Dynamic Naming and Configuration Service的首字母简称&#xff0c;一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。 Nacos 致力于帮助您发现、配置和管理微服务。Nacos 提供了一组简单易用…

19使用MATLAB中的BP神经网络来做字母识别

1.简述 学习目标&#xff1a; 学习BP神经网络字母识别 字符识别应用非常广泛&#xff0c;比如车辆牌照自动识别和手写识别等。我们采用BP网络对26个英文字母进行识别&#xff0c;首先将26个字母中每一个字母都通过75的方格进行数字化处理&#xff0c;并用一个向量表示&#x…