Netty学习——源码篇7 Pipeline的事件传播机制1 备份

news2025/1/12 5:02:19

        上篇:Netty学习——源码篇6 Pipeline设计原理 已经知道AbstractChannelHandlerContext中有Inbound和Outbound两个boolean变量,分别用于识别Context所对应的Handler的类型。

        1、Inbound为true时,表示其对应的ChannelHandler是ChannelInboundHandler的子类。

        2、Outbound为true时,表示其对应的ChannelHandler是ChannelOutboundHandler的子类。

        这两个属性到底有什么作用呢?还要从ChannelPipeline的事件传播类型说起。Netty中的传播事件可以分为两种:Inbound事件和Outbound事件。以下是Netty官网针对这两个事件的说明。

         由上可以看出,Inbound和Outbound事件的流向是不一样的,Inbound事件的流向是从下至上的,而Outbound恰好相反,是从下到上。并且Inbound方法是通过调用相应的ChannelHandlerContext.fireIN_EVT()方法来传递的,而Outbound方法是通过ChannelHandlerContext的fireChannelRegister()调用会发送一个ChannelRegistered的Inbound给下一个ChannelHandlerContext,而ChannelHandlerContext的bind()方法调用时会发送一个bind的Outbound事件给下一个ChannelHandlerContext。

        Inbound事件传播方法代码如下:

public interface ChannelInboundHandler extends ChannelHandler {

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
     */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} is now active
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /**
     * Invoked when the current {@link Channel} has read a message from the peer.
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * Invoked when the last message read by the current read operation has been consumed by
     * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
     * attempt to read an inbound data from the current {@link Channel} will be made until
     * {@link ChannelHandlerContext#read()} is called.
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if an user event was triggered.
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /**
     * Gets called once the writable state of a {@link Channel} changed. You can check the state with
     * {@link Channel#isWritable()}.
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if a {@link Throwable} was thrown.
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

        Outbound事件传播方法的代码如下:

public interface ChannelOutboundHandler extends ChannelHandler {
    /**
     * Called once a bind operation is made.
     *
     * @param ctx           the {@link ChannelHandlerContext} for which the bind operation is made
     * @param localAddress  the {@link SocketAddress} to which it should bound
     * @param promise       the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception    thrown if an error accour
     */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a connect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the connect operation is made
     * @param remoteAddress     the {@link SocketAddress} to which it should connect
     * @param localAddress      the {@link SocketAddress} which is used as source on connect
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a disconnect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the disconnect operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a close operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a deregister operation is made from the current registered {@link EventLoop}.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Intercepts {@link ChannelHandlerContext#read()}.
     */
    void read(ChannelHandlerContext ctx) throws Exception;

    /**
    * Called once a write operation is made. The write operation will write the messages through the
     * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
     * {@link Channel#flush()} is called
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
     * @param msg               the message to write
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error accour
     */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /**
     * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
     * that are pending.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the flush operation is made
     * @throws Exception        thrown if an error accour
     */
    void flush(ChannelHandlerContext ctx) throws Exception;
}

        可以发现,Inbound类似于事件回调(响应请求的事件),而Outbound类似于主动触发(发起请求的事件)。注意,如果捕获了一个事件,并且想让这个事件继续传递下去,需要调用Context对应的fireXXX()方法。

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    
    @Override
    public void channelActive(ChannelHandlerContext ctx)  throws Exception{
        System.out.println("连接成功");
        ctx.fireChannelActive();
    }
    
}
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception{
        System.out.println("客户端关闭");
        ctx.close(promise);
    }
}

        如上面的代码所示,MyInboundHandler收到了一个channelActive事件,它在处理后,如果希望将事件继续传播下去,那么需要接着调用ctx.fireChannelActive()方法。

        下面用一个代码案例了解一下Pipeline的传播机制。分别编写InboundHandlerA、InboundHandlerB、InboundHandlerC和OutboundandlerA、OutboundandlerB、OutboundandlerC类。

public class InboundHandlerA extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        System.out.println("InboundHandlerA");
        ctx.fireChannelRead(msg);
    }
}
public class InboundHandlerB extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
        System.out.println("InboundHandlerB");
        ctx.fireChannelRead(msg);
    }
}
public class InboundHandlerC extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
        System.out.println("InboundHandlerC");
        ctx.fireChannelRead(msg);
    }
}

        以上三个类都调用了ctx.fireChannelRead()方法向下传播。

public class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
    
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
        System.out.println("OutboundHandlerA write");
        ctx.write(msg,promise);
    }
}

public class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
        System.out.println("OutboundHandlerB write");
        ctx.write(msg,promise);
    }

    @Override
    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception{
        ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                ctx.channel().write("hello");
            }
        },3, TimeUnit.SECONDS);
    }
}

public class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception{
        System.out.println("OutboundHandlerC write");
        ctx.write(msg,promise);
    }
}

        上面的三个类都调用了ctx.write()方法。下面编写测试代码,来了解其传播顺序。先编写服务端代码。PipelineServer类主要完成Pipeline的注册工作,代码如下:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class PipelineServer {
    public void start(int port) throws Exception{
        NioEventLoopGroup bossGroup= new NioEventLoopGroup();
        NioEventLoopGroup workerGroup= new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception{
                            //InboundHandler的执行顺序,应该是 A B C
                            ch.pipeline().addLast(new InboundHandlerA());
                            ch.pipeline().addLast(new InboundHandlerB());
                            ch.pipeline().addLast(new InboundHandlerC());
                            
                            //Outbound的执行顺序应该是C B A
                            ch.pipeline().addLast(new OutboundHandlerA());
                            ch.pipeline().addLast(new OutboundHandlerB());
                            ch.pipeline().addLast(new OutboundHandlerC());
                        }
                    }).option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true);
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channelFuture.channel().closeFuture().sync();
        }finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
        
    }

    public static void main(String[] args) throws Exception{
        PipelineServer server = new PipelineServer();
        server.start(8080);
    }
}

        PipelineClient类,与服务端建立连接并向服务端发送数据,代码如下:

public class PipelineClient {
    public void connect(String host,int port) throws Exception{
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE,true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception{
                    ch.pipeline().addLast(new ClientInhandler());
                }
            });
            ChannelFuture f = b.connect(host,port).sync();
            f.channel().closeFuture().sync();
        }finally {
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception{
        PipelineClient client = new PipelineClient();
        client.connect("127.0.0.1",8080);
    }
}

        ClientHandler类,完成向服务端发送数据的动作,代码如下:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientInhandler extends ChannelInboundHandlerAdapter {

    //读取服务端的消息
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
        System.out.println("clientInHandler.channelRead");
        ByteBuf result = (ByteBuf) msg;
        byte[] result1 = new byte[result.readableBytes()];
        result.readBytes(result1);
        result.release();
        ctx.close();
        System.out.println("server infomation:" + new String(result1));
    }

    //当连接建立的时候向服务端发送消息,channelActive 事件在连接建立的时候会被触发
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception{
        System.out.println("ClientHandler.channelActive");
        String msg = "are you ok";
        ByteBuf encoded = ctx.alloc().buffer(4 * msg.length());
        encoded.writeBytes(msg.getBytes());
        ctx.write(encoded);
        ctx.flush();
    }
}

        接下来运行测试代码,分别启动PipelineServer和PipelineClient,得到的运行结果如下图:

        从运行结果上看,Handler的传播顺序:从Inbound开始顺序执行,然后从Outbound逆序执行。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1548385.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【深入日志打印】log.error(“你好{}“, “世界“, e);只有一个占位符是否会打印后面多出的参数呢?(详细跟进源码讲解调试分析)

文章目录 【深入日志打印】log.error(“你好{}“, “世界“, e)&#xff1b;只有一个占位符是否会打印后面多出的参数呢&#xff1f;&#xff08;详细跟进源码讲解调试分析&#xff09;测试代码执行结果调试分析其他样例探讨 【深入日志打印】log.error(“你好{}“, “世界“, …

【电能管理】电力物联网仪表/多功能电表/无线计量/多回路计量/分项计量/终端感知设备/全电量参数测量/正反向有功无功测量

什么是物联网电表&#xff01;&#xff01;&#xff01; 安科瑞薛瑶瑶18701709087 物联网电表是智能电表的一种&#xff0c;可以用无线通信方式来操控&#xff0c;除了拥有电度表的有点以外&#xff0c;还可以把硬件和软件联合起来发挥更大的作用。 物联网电表主要用于计量低…

UOS、Linux下的redis的详细部署流程(适用于内网)

提示&#xff1a;适用于Linux以及UOS等内外网系统服务器部署。 文章目录 一.上传离线包二.部署基本环境三.解压并安装redis四.后台运行redis五.uos系统可能遇到的问题六.总结 一.上传离线包 1.自己去Redis官网下载适配自己部署系统的redis安装包。 2.通过文件传输工具&#xf…

微信平台会员卡应用源码系统 带完整的安装代码包以及搭建教程

在移动互联网时代&#xff0c;消费者对于便捷、个性化的服务需求日益增长。微信会员卡作为一种创新的营销方式&#xff0c;不仅能为消费者提供便捷的会员服务&#xff0c;还能帮助商家更好地管理会员信息&#xff0c;提升营销效果。然而&#xff0c;许多商家由于缺乏技术支持&a…

钡铼技术R40工业4G路由器为户外广告牌智能控制系统提供无线网络

钡铼技术R40工业4G路由器在户外广告牌智能控制系统中的应用&#xff0c;为广告行业带来了革命性的变革。作为一种先进的无线通信设备&#xff0c;R40工业4G路由器通过其稳定的信号传输和强大的网络连接能力&#xff0c;为户外广告牌的智能控制系统提供了可靠的无线网络支持&…

蓝桥杯day14刷题日记

P8707 [蓝桥杯 2020 省 AB1] 走方格 思路&#xff1a;很典型的动态规划问题&#xff0c;对于偶数格特判&#xff0c;其他的正常遍历一遍&#xff0c;现在所处的格子的方案数等于左边的格子的方案数加上上面格子的方案数之和 #include <iostream> using namespace std; …

北京朝阳办理广播电视节目制作经营许可证材料和要求

北京经典世纪集团有限公司-资 质代办 尊敬的客户&#xff0c;您对于办理广播电视节目制作经营许可证的需求我们深感关切。作为专 业的资 质代办机构&#xff0c;我们的目标是为您提供一站式服务&#xff0c;帮助您高效顺利地完成所有办理程序。&#xff08;游览器搜经典世纪胡云…

【竞技宝】国足4比1大胜新加坡,武磊独造三球记首功

国足在本轮世预赛主场跟新加坡狭路相逢,这场比赛对于主帅伊万科维奇来说不容有失。因为,国足之前未能在客场击败新加坡,让球队出线前景变得非常严峻。如果,国足还想从36强赛杀出重围,就必须主场战胜新加坡。如果,国足主场都赢不了新加坡,伊万科维奇将面临下课危机。重压之下的伊…

IPv6-基础概念

IPv6基础概念 IPv6技术特点&#xff1a;精简报文格式、实现自动配置和重新编制、支持层次化网络编制、支持端对端安全、更好的支持Qos、支持移动特性。 五元组&#xff1a;源地址&#xff0c;目的地址&#xff0c;源端口&#xff0c;目的端口&#xff0c;协议。 IPv6报头优势…

Abaqus周期性边界代表体单元Random Sphere RVE 3D (Mesh)插件

插件介绍 Random Sphere RVE 3D (Mesh) - AbyssFish 插件可在Abaqus生成三维具备周期性边界条件(Periodic Boundary Conditions, PBC)的随机球体骨料及骨料-水泥界面过渡区(Interfacial Transition Zone, ITZ)模型。即采用周期性代表性体积单元法(Periodic Representative Vol…

【Linux实践室】Linux用户管理实战指南:用户密码管理操作详解

&#x1f308;个人主页&#xff1a;聆风吟_ &#x1f525;系列专栏&#xff1a;Linux实践室、网络奇遇记 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 一. ⛳️任务描述二. ⛳️相关知识2.1 &#x1f514;用户密码存放地及方式2.2 &#x1f514;使用…

批量删除 rabbitmq中随机队列

批量删除 amq.gen–* 随机队列 操作错误产生了无效随机队列&#xff0c;需要批量删除 过滤列出指定amq.gen–队列 # 列出 指定 vhost/qq 以amq.gen开头的所有队列 rabbitmqctl list_queues --vhost / | grep ^amq.gen-# 批量删除队列 #由于list_queues会列出队列名称以及对应…

python实现图片压缩

首先 pip install Pillow compression_level参数&#xff0c;该参数的范围从0到100&#xff0c;其中0表示最小尺寸&#xff08;最高压缩&#xff09;&#xff0c;100表示最大质量&#xff08;最小压缩&#xff09;。这个脚本将尝试在保持图片可识别性的同时&#xff0c;尽可能…

解锁TikTok直播专线,提高使用体验

TikTok&#xff0c;作为当今全球最受欢迎的社交媒体平台之一&#xff0c;给商家带来了无限的商机与市场。然而&#xff0c;商家在TikTok的网络体验也面临诸多挑战&#xff0c;例如网络卡顿、直播断线以及账号易被封锁等问题。为解决这些难题&#xff0c;我们推出了TikTok直播专…

系统安装(kuntaiR522 kvm安装)

(1)通过PC1 web连接Server2,给Server2安装rocky-arm64 CLI系统(语言为英文)。 首先是访问server2的IPMI口,访问192.168.2.10, 用户为Admin,密码为Admin@123 登录进去 以HTML5 集成控制台方式打开 插入U盘修改启动项安装系统

逆向案例9--小鹅通视频m3u8内容解密--含简单webpack

视频网址&#xff1a;https://app4nseessp8638.h5.xiaoeknow.com/v2/course/alive/l_65b9e8dfe4b064a83b90e102?type2&app_idapp4nseessp8638&channel_id&res_type4&pro_id&res_idl_65b9e8dfe4b064a83b90e102 按照惯例&#xff0c;刷新网站&#xff0c;搜…

MFC(二)集成基础控件

目录 OnCreateCStatic【标签&#xff0c;图片】CEdit【文本框&#xff0c;密码框&#xff0c;数值框&#xff0c;文本区】CButton【按钮&#xff0c;单选按钮&#xff0c;多选按钮】CComboBox【下拉列表&#xff0c;列表】CSliderCtrl【滑动条】CListCtrl【表格】CAnimateCtrl【…

LangChain核心概念与组件

Chains Chains可以让你按照一定的顺序和逻辑来执行不同的任务。Chains有以下四种类型&#xff1a; 类型作用LLMChain用于在语言模型周围添加一些功能的简单Chain&#xff0c;它由一个PromptTemplate和一个语言模型&#xff08;LLM或chat model&#xff09;组成&#xff0c;它…

FlorisBoard:Android开源键盘的现代化选择

FlorisBoard&#xff1a;Android开源键盘的现代化选择 简介 FlorisBoard是一款免费且开源的安卓键盘&#xff0c;适用于Android 7.0及以上版本的设备。它的现代化设计和用户友好的界面使其在众多键盘应用中脱颖而出。FlorisBoard的独特之处在于它注重用户体验的同时&#xff0…

ArcGIS二次开发(一)——搭建开发环境以及第一个简单的ArcGIS Engine 程序

Arcgis10.2、Arcgis Engine10.2与Microsoft Visual Studio 2012的版本进行安装 1、推荐教程与安装包2、安装顺序3、安装成功测试VS新建项目可以创建ArcGIS项目&#xff0c;并且在VS中拖拽ArcGIS工具 4、搭建第一个简单的ArcGIS Engine 程序 ArcEngine和VS版本是有对应的&#x…