Netty笔记05-组件Handler Pipeline

news2024/12/29 11:33:37

文章目录

  • 概述
    • ChannelHandler
      • 方法
    • ChannelPipeline
      • 特点
    • 总结
  • 代码示例
    • 服务器端
    • 客户端
  • EmbeddedChannel
    • EmbeddedChannel 的方法


概述

ChannelHandler

ChannelHandler 是 Netty 中的一个接口,它定义了处理 I/O 事件的方法。ChannelHandler 可以处理各种类型的事件,包括连接事件、读写事件、异常事件等。

方法

入站(Inbound)方法:

  • channelRead(ChannelHandlerContext ctx, Object msg):当从通道接收到数据时调用。
  • channelReadComplete(ChannelHandlerContext ctx):当所有数据都已读取完毕时调用。
  • exceptionCaught(ChannelHandlerContext ctx, Throwable cause):当发生异常时调用。

出站(Outbound)方法:

  • write(ChannelHandlerContext ctx, Object msg, Promise promise):当有数据需要写出时调用。
  • flush(ChannelHandlerContext ctx):当需要刷新写出缓冲区时调用。

ChannelPipeline

ChannelPipeline 是 Netty 中的核心组件之一,它管理了一系列的 ChannelHandler。ChannelPipeline 可以理解为一个责任链模式的实现,它按照顺序处理事件,每个 ChannelHandler 负责处理特定类型的事件,并可以选择将事件传递给下一个 ChannelHandler 或者停止处理。

特点

  • 有序性:ChannelPipeline 中的 ChannelHandler 是按顺序排列的,可以插入、删除或替换。
  • 双向性:ChannelPipeline 支持双向处理,即可以处理入站(Inbound)事件,也可以处理出站(Outbound)事件。
  • 灵活性:可以动态地添加、删除或替换 ChannelHandler,而无需重新启动应用程序。

总结

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工

打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。

代码示例

服务器端

在服务器端中添加处理器 head -> h1 -> h2 -> h4 -> h3 -> h5 -> h6 -> tail

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;

@Slf4j
public class PipelineTest {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // 1. 通过 channel 拿到 pipeline
                        ChannelPipeline pipeline = ch.pipeline();
                        // 2. 添加处理器 head ->  h1 -> h2 ->  h4 -> h3 -> h5 -> h6 -> tail
                        //addLast()会加在tail之前,而不是最后。底层是双向链表
                        //入站处理器
                        pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("1");
                                //处理加工msg
                                ByteBuf buf = (ByteBuf) msg;
                                String name = buf.toString(Charset.defaultCharset());
                                //将处理好的字符串传递给h2
                                super.channelRead(ctx, name);
                            }
                        });
                        pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
                                log.debug("2");
                                Student student = new Student(name.toString());
                                super.channelRead(ctx, student);
                                // 调用super.channelRead()将数据传递给下个 handler,如果不调用,调用链会断开。或者调用ctx.fireChannelRead(student);
                            }
                        });

                        //出战处理器。注意:ctx.writeAndFlush()和ch.writeAndFlush()都会走该出站处理器
                        pipeline.addLast("h2.5", new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("2.5");
                                super.write(ctx, msg, promise);
                            }
                        });

                        pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("3,h2传递的数据为{},class为{}",msg,msg.getClass());
                                //super.channelRead(ctx, msg);//channelRead()是将控制权交给pipeline中下一个入站处理器
                                //这里h3后面已经没有入站处理器了,所以可以不用调用channelRead()

                                //向channel中写入数据触发出站处理器(ch.writeAndFlush()会从尾节点向前找出站处理器)
                                ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));

                                //ctx.writeAndFlush()是从当前处理器向前寻找出站处理器
//                                ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));

                            }
                        });
                        //出站处理器 p2.25
                        //注意:出站处理器只有向channel中写入数据才会触发
                        pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("4");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("5");
                                super.write(ctx, msg, promise);
                            }
                        });
                        pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                log.debug("6");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }
    @Data
    @AllArgsConstructor
    static class Student {
        private String name;
    }
}

客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author qf
 * @since 2024/09/12 19:44
 */
public class Client {
    public static void main(String[] args) {
        new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect("127.0.0.1", 8080)
                .addListener((ChannelFutureListener) future -> {
                    future.channel().writeAndFlush("hello,world");
                });
    }
}

服务器端输出

18:45:18.287 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 1
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 2
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 3,h2传递的数据为PipelineTest.Student(name=hello,world),classclass com.qf.netty.Pipeline.PipelineTest$Student
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 6
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 5
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 4
18:45:18.288 [nioEventLoopGroup-2-2] DEBUG com.qf.netty.Pipeline.PipelineTest - 2.5

以上代码可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表。
在这里插入图片描述

  • 入站处理器中,super.channelRead(ctx, name)或使用ctx.fireChannelRead(msg) 是 调用下一个入站处理器

    • 如果注释掉 h1中 super.channelRead(ctx, name)代码,则仅会打印 1
    • 如果注释掉 h2中 super.channelRead(ctx, student)代码,则仅会打印 1 2
  • h3 处的 ch.writeAndFlush() 会 从尾部开始触发 后续出站处理器的执行

    • 如果注释掉 h3 处 ch.writeAndFlush() 代码,则仅会打印 1 2 3
  • 类似的,出站处理器中,super.write(ctx, msg, promise)或ctx.write(msg, promise) 的调用也会 触发上一个出站处理器

    • 如果注释掉 h6 处 super.write(ctx, msg, promise) 代码,则仅会打印 1 2 3 6
  • ctx.channel().write(msg) vs ctx.write(msg)

    • 都是触发出站处理器的执行
    • ctx.channel().write(msg) 从尾部开始查找出站处理器
    • ctx.write(msg) 是从当前节点找上一个出站处理器
    • 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
    • 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己(死循环)
      在这里插入图片描述
      服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序

EmbeddedChannel

EmbeddedChannel 是 netty提供的专门用来测试的channel。
使用EmbeddedChannel进行测试可以不启动服务器和客户端了。

EmbeddedChannel 的方法

  • writeInbound(Object msg):将入站消息写到 EmbeddedChannel 中。
  • writeOutbound(Object msg):将出站消息写到 EmbeddedChannel 中。
  • readInbound():从 EmbeddedChannel 中读取入站消息。
  • readOutbound():从 EmbeddedChannel 中读取出站消息。
  • finish():完成所有未完成的写操作,并关闭 EmbeddedChannel

@Slf4j
public class Test06EmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("1");
                super.channelRead(ctx, msg);
            }
        };
        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("2");
                super.channelRead(ctx, msg);
            }
        };
        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("3");
                super.write(ctx, msg, promise);
            }
        };
        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("4");
                super.write(ctx, msg, promise);
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        // 模拟入站操作
//        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
        /*
        [DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 1
        [DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 2
         */
        // 模拟出站操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
        /*
        [DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 4
        [DEBUG] [main] c.i.n.c.Test06EmbeddedChannel - 3
         */
    }
}


全部文章:
Netty笔记01-Netty的基本概念与用法
Netty笔记02-组件EventLoop
Netty笔记03-组件Channel
Netty笔记04-组件Future & Promise
Netty笔记05-组件Handler & Pipeline

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2136926.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ESP01的AT指令连接到阿里云平台

物联网平台提供安全可靠的设备连接通信能力&#xff0c;支持设备数据采集上云&#xff0c;规则引擎流转数据和云端数据下发设备端。此外&#xff0c;也提供方便快捷的设备管理能力&#xff0c;支持物模型定义&#xff0c;数据结构化存储&#xff0c;和远程调试、监控、运维。总…

C++学习笔记之引用(基础)

C学习笔记之引用 https://www.runoob.com/cplusplus/cpp-references.html 引用变量是一个别名&#xff0c;它是已存在变量的另一个名字 一旦把引用初始化为某个变量&#xff0c;可以使用该引用名称或变量名称来指向变量 1、引用vs指针 引用和指针之间有一些相似&#xff0c;也…

计算机的错误计算(九十三)

摘要 探讨 log(y,x) 即以 x 为底 y 的对数的计算精度问题。 Log(y,x)运算是指 x 为底 y 的对数。 例1. 计算 log(123667.888, 0.999999999999999) . 不妨在Python中计算&#xff0c;则有&#xff1a; 若在 Excel 单元格中计算&#xff0c;则有几乎同样的输出&#xff1a; 然…

树莓派交叉编译

目录 一、交叉编译的认知 1.1 本地编译&#xff1a; 1.2 交叉编译是什么&#xff1a; 1.3 为什么要交叉编译&#xff1a; 1.4 什么是宿主机&#xff1f;什么是目标机&#xff1f; 1.5 如何进行交叉编译&#xff1a; 二、交叉编译工具链的安装 2.1 下载交叉编译工具&…

CesiumJS+SuperMap3D.js混用实现可视域分析 S3M图层加载 裁剪区域绘制

版本简介&#xff1a; cesium&#xff1a;1.99&#xff1b;Supermap3D&#xff1a;SuperMap iClient JavaScript 11i(2023)&#xff1b; 官方下载文档链家&#xff1a;SuperMap技术资源中心|为您提供全面的在线技术服务 示例参考&#xff1a;support.supermap.com.cn:8090/w…

设置 AutoCAD双击 DWG 文件时启动新的程序

1 问题描述 原CAD打开多个文件时&#xff0c;会在该程序打开新的标签&#xff0c;合并显示。 有时想打开新文件时启动新的程序&#xff0c;单独显示&#xff0c;如下&#xff1a; 2 解决办法 2.1 方法1 SDI变量可以将CAD设置成单文档模式&#xff0c;设置为1的时候就能实…

音视频直播应用场景探讨之RTMP推流还是GB28181接入?

技术背景 好多开发者跟我们沟通音视频解决方案的时候&#xff0c;不清楚什么时候用RTMP推送模块&#xff0c;什么时候用GB28181设备接入模块&#xff0c;也不清楚二者差异化。实际上&#xff0c;RTMP推流和GB28181接入模块&#xff0c;在很多方面存在差异&#xff0c;如应用领…

IPC之AIDL从认识到实战

目录 前言 什么是AIDL? 为什么要设计出这样一种语言&#xff1f;它能帮助我们干什么&#xff1f; 还有其他方法能实现跨进程通信吗&#xff1f;相较于别的方法AIDL有什么优势呢&#xff1f; AIDL的相关语法 Java与AIDL的不同之处 AIDL默认支持的数据类型&#xff1a; …

怎么浏览URL的PDF文件呢

最近发现PDF文件网页端打开就是丑&#xff0c;不知道怎么办 1. 看着实在不舒服&#xff0c;用chorm的插件 然后原本本地用的也是2345pdf阅读器 2. 之后也下载了adobe pdf的桌面阅读器 2345打开是这个样子 这个是现在啦 如果要一些安装包什么的&#xff0c;评论见~ 最…

相机光学(三十八)——VCM(Voice Coil Motor)音圈马达

VCM&#xff08;Voice Coil Motor&#xff09;音圈马达 0.参考链接1.什么是音圈马达2.对焦&#xff08;变焦&#xff09;原理3.音圈马达结构4.音圈马达工作原理5.VCM 主要性能指标 0.参考链接 &#xff08;1&#xff09;Camera 模组之 VCM篇 &#xff08;2&#xff09;VCM基本…

08 vue3之认识bem架构及less sass 和scoped

bem架构 他是一种css架构 oocss 实现的一种 &#xff08;面向对象css&#xff09; &#xff0c;BEM实际上是block、element、modifier的缩写&#xff0c;分别为块层、元素层、修饰符层&#xff0c;element UI 也使用的是这种架构 1. BEM架构 1. 介绍 1. BEM是Block Element M…

PowerBI 关于FILTERS函数和VALUES函数

本人是powerbi新手&#xff0c;最近在使用Filters()函数和Values()函数时&#xff0c;有点不太明白它们之间的区别&#xff0c;u有时它们得到的结果是一样的&#xff0c;有时却不一样。 官方文档里&#xff0c;Filters()是表示返回直接作为筛选器应用到 columnName 的值 FILT…

MinIO - macOS上配置、Python调用

文章目录 安装配置 MinIO 服务Python 调用SDK 简介调用示例 安装配置 MinIO 服务 1、使用 brew 安装 MinIO 如果您之前使用 brew install minio 安装了MinIO服务器&#xff0c;那么我们建议您改为从 minio/stable/minio 重新安装。 brew install minio/stable/minio2、创建文…

YOLOv5/v8 + 双目相机测距

yolov5/v8双目相机测距的代码&#xff0c;需要相机标定 可以训练自己的模型并检测测距&#xff0c;都是python代码 已多次实验&#xff0c;代码无报错。 非常适合做类似的双目课题&#xff01; 相机用的是汇博视捷的双目相机&#xff0c;具体型号见下图。 用的yolov5是6.1版本的…

Spring Boot集成Akka remoting快速入门Demo

1.什么是Akka remoting&#xff1f; Akka-Remoting一种ActorSystem之间Actor对Actor点对点的沟通协议.通过Akka-Remoting来实现一个ActorSystem中的一个Actor与另一个ActorSystem中的另一个Actor之间的沟通 Akka Remoting限制&#xff1a; 不支持NAT&#xff08;Network Add…

使用Java实现一个简单的B树

1.B树简介 B树是一个搜索树&#xff0c;数据结构可以抽象成如二叉树一样的树&#xff0c;不过它有平衡、有序、多路的特点。 平衡&#xff1a;所有叶子节点都在同一层。有序&#xff1a;任一元素的左子树都小于它&#xff0c;右子树都大于它。多路&#xff1a;B树的每个节点最多…

深入链表的遍历——快慢指针算法(LeetCode——876题)

今天我们一起来学习一下一个快速遍历链表的方法 我们先来看看一道经典的需要遍历链表的题目 &#xff08;题目来自LeetCode&#xff09; 876. 链表的中间结点https://leetcode.cn/problems/middle-of-the-linked-list/ 给你单链表的头结点 head &#xff0c;请你找出并返回链…

C++多态 学习

目录 一、多态的概念 二、多态的实现 三、纯虚函数和多态类 四、多态的原理 一、多态的概念 多态&#xff1a;多态分为编译时多态(静态多态)和运行时多态(动态多态)。编译时多态主要是我们之前学过的函数重载和函数模板&#xff0c;他们在传不同类型的参数就可以调用不同的函…

diff 命令:文本比较

一、diff 命令简介 ​diff ​命令是一个用于比较两个文件并输出它们之间差异的工具。它是文件比较的基本工具&#xff0c;广泛用于源代码管理、脚本编写和日常的文件维护工作中。 ‍ 二、diff 命令参数 diff [选项] 文件1 文件2选项&#xff1a; ​-b​ 或 --ignore-space…

光伏选址和设计离不开气象分析!

都说光伏选址和设计离不开气象分析&#xff0c;气象条件对太阳能发电影响较大&#xff0c;具体有哪些影响呢&#xff1f;今天我就来讲解下。 - 太阳辐射&#xff1a;太阳辐射的强度是光伏发电的首要因素&#xff0c;对光伏发电有着重要的影响。太阳辐射的强度决定了光伏发电系…