Netty 编码器 解码器 正确使用姿势

news2025/1/16 18:59:35

Netty 编码器 & 解码器 正确使用姿势

通过前面文章的例子,相信读者也感受到了Netty 开发核心工作在于处理读事件(解码)、写事件(编码)。

Netty 的编解码器是处理网络数据编码和解码的核心组件,编解码器使得在客户端和服务器之间传输的数据可以被正确地序列化和反序列化

  • 编码: 结构化数据序列化为字节流
  • 解码: 字节流反序列化 结构化数据

本文主要内容

  • 如何正确使用编码器解码器
  • pipeline 顺序的重要性

类图

解码器

对于读取消息实现,我们可以实现ChannelInboundHandler,这是比较底层接口, 我们可以实现ChannelInboundHandlerAdapter(我们只需要重新channelRead 方法),你可能问下面的**msg究竟是什么类型?**先卖一个关子。

我们业务开发更多的使用 ByteToMessageDecoder ,先看较完整的代码再解释


channelRead(ChannelHandlerContext ctx, Object msg)

自定义编码器例子

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MyChannelInboundHandler0());
                            ch.pipeline().addLast( new MyMessageDecoder());
                            ch.pipeline().addLast(new MyChannelInboundHandler1());
                        }
                    });

            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
public class MyChannelInboundHandler0 extends ChannelInboundHandlerAdapter {

   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

       System.out.println( "MyChannelInboundHandler0 ..." + msg.getClass());

       ctx.fireChannelRead(msg);
   }
}
public class MyMessageDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        out.add(doDecode(in));
    }

    private MyMessage doDecode(ByteBuf in){
        ByteBuf targetBuf =   in.readBytes(in.readableBytes());
        String content = targetBuf.toString(StandardCharsets.UTF_8);
        return new MyMessage(content);
    }
}
public class MyChannelInboundHandler1 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       if(msg instanceof MyMessage){
           // 处理消息
           MyMessage myMessage = (MyMessage)msg;
           System.out.println(" handler MyMessage " + myMessage.getMessage() );
       }else {
           ctx.fireChannelRead(msg);
       }
    }
}
代码解读

之前文章我们一直强调pipeline 顺序的重要性,底层读到数据会转成 ByteBuf ,所以我们第一个ChannelInboundHandler 拿到的msg 类型是 ByteBuf

消息会经过 pipeline中一些列ChannelInboundHandler 处理中间会对消息进行转换
所以具体类型,要看前面是如何处理的,读取消息处理顺序和消息类型变化如下:

处理器正确处理方案

通常我们处理器都需要判断是否需要处理,当前处理器不能处理则ctx.fireChannelRead(msg)传递下去,否则后面的处理器无法处理获取到消息,比如MyChannelInboundHandler1实现逻辑。

ByteToMessageDecoder 部分源码解读

ByteToMessageDecoder的channelRead核心代码如下,主体逻辑

  • 如果msg不是ByteBuf,则不处理(对应else 分支)

  • 对于ByteBuf,调用decode方法 ,该类由子类实现,子类需要将转换后的消息放入到out 集合 (参考MyMessageDecoder)

  • finally 块,会将out 集合列表中将所有消息发出去

这就是我们业务将消息转换时 需要放入到out 集合。 本质还是通过ctx.fireChannelRead(msg) 传递下去

// 省略 catch 
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       if (msg instanceof ByteBuf) {
           CodecOutputList out = CodecOutputList.newInstance();
           try {
               first = cumulation == null;
               cumulation = cumulator.cumulate(ctx.alloc(),
                       first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
               callDecode(ctx, cumulation, out);
           }finally {
               try {
                   ... 
                   fireChannelRead(ctx, out, size);
               } finally {
                   out.recycle();
               }
           }
       } else {
           ctx.fireChannelRead(msg);
       }
   }

   /**
    * Get {@code numElements} out of the {@link List} and forward these through the pipeline.
    */
   static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
       if (msgs instanceof CodecOutputList) {
           fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
       } else {
           for (int i = 0; i < numElements; i++) {
               ctx.fireChannelRead(msgs.get(i));
           }
       }
   }

MessageToByteEncoder

将消息"编码"ByteBuf,与ByteToMessageDecoder相反 ,MyMessageEncoder是一个简单MessageToByteEncoder使用例子,实现它不难重点还是pipeline 的责任链顺序

public class MyMessageEncoder extends MessageToByteEncoder<MyOutMessage> {

    @Override
    protected void encode(ChannelHandlerContext ctx, MyOutMessage msg, ByteBuf out) throws Exception {
        System.out.println("encode " + msg.getMessage());
        String message = "echo " +msg.getMessage();
        out.writeBytes(message.getBytes(StandardCharsets.UTF_8));

    }

}
public class MyChannelInboundHandler1 extends ChannelInboundHandlerAdapter {

   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      if(msg instanceof MyMessage){
          // 处理消息
          MyMessage myMessage = (MyMessage)msg;
          System.out.println(" handler MyMessage " + myMessage.getMessage() );
          MyOutMessage myOutMessage = new MyOutMessage(myMessage.getMessage());

          ctx.writeAndFlush(myOutMessage);
           //ctx.channel().writeAndFlush(myOutMessage);
      }else {
          ctx.fireChannelRead(msg);
      }
   }
}
    ch.pipeline().addLast(new MyChannelInboundHandler0());
                            ch.pipeline().addLast( new MyMessageDecoder());
                            ch.pipeline().addLast(new MyMessageEncoder());
                            ch.pipeline().addLast(new MyChannelInboundHandler1());
                            //ch.pipeline().addLast(new MyMessageEncoder());

将前面MyChannelInboundHandler1 代码稍作修改,使用ctx.writeAndFlush(myOutMessage) 发送出去。

MyMessageEncoder 如果定义在MyChannelInboundHandler1 后面则接收不到,具体源码后面文章会解读。

再谈顺序

ctx.fireChannelRead 读事件传播方向 从当前节点 -> 责任链尾部方向 传播
所以前面节点不会处理到;

ctx.writeAndFlush 从当前节点 —> 责任链头部方向 传播 ,因此定义在当前节点后面的节点不会处理到对应的
写事件。

举一反三 ctx.channel().writeAndFlush(myOutMessage);结果又如何 ?

MessageToMessageDecoder 和 MessageToMessageEncoder

有时候我们需要对消息多次处理这两个类比较有用,这两个类是可选的。

通常读取事件处理顺序

写入事件处理顺序

使用时需要保证责任链上处理类类型能够衔接上,否则消息从中间某个处理器就断了。

总结

本文主要介绍了MessageToByteEncoder 和 ByteToMessageDecoder 重要的抽象类有了这两个基础类,理论上我们可以实现
一切编码、转码工作。

当然netty内置了许多编码器和解码器,尤其在处理TCP半包、粘包问题值得我们借鉴。
另外通过本文,详细读者也看到责任链顺序的重要性,不正确的顺序会导致业务无法正确的处理消息。
限于篇幅后面文章会持续输出消息处理整个流程,以及内置编码器使用等待。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2236098.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于微信小程序的电子购物系统的设计与实现(lw+演示+源码+运行)

摘 要 由于APP软件在开发以及运营上面所需成本较高&#xff0c;而用户手机需要安装各种APP软件&#xff0c;因此占用用户过多的手机存储空间&#xff0c;导致用户手机运行缓慢&#xff0c;体验度比较差&#xff0c;进而导致用户会卸载非必要的APP&#xff0c;倒逼管理者必须改…

FIPS203 后量子安全ML-KEM(标准简读)

FIPS 203是美国国家标准与技术研究院&#xff08;NIST&#xff09;发布的关于模块格基密钥封装机制&#xff08;ML-KEM&#xff09;的标准&#xff0c;旨在提供一种能抵御量子计算机攻击的密钥建立方案。以下是对该文档的详细总结&#xff1a; 标准概述 目的与范围&#xff…

鸿萌数据迁移服务: 企业服务器整机在线热迁移, 实现不停机业务转移

天津鸿萌科贸发展有限公司从事数据安全服务二十余年&#xff0c;致力于为各领域客户提供专业的数据存储、数据恢复、数据备份、数据迁移等解决方案与服务&#xff0c;并针对企业面临的数据安全风险&#xff0c;提供专业的相关数据安全培训。 鸿萌数据迁移业务为众多企业顺利高效…

macOS15.1及以上系统bug:开发者证书无法打开,钥匙串访问无法打开一直出现图标后立马闪退

团队紧跟苹果最新系统发现bug:今日设备信息如下,希望能带给遇到这个问题的开发者一点帮助。 错误图如下: 点击证书文件后,先出现钥匙串访问图标,后立马闪退消失 中间试过很多方法,都是一样的表现,最后好在解决了,看网上也没有相关的帖子,这里直接写解决办法和导致原因…

20241102在荣品PRO-RK3566开发板的预置Android13下适配宸芯的数传模块CX6603N

20241102在荣品PRO-RK3566开发板的预置Android13下适配宸芯的数传模块CX6603N 2024/11/2 18:04 在WIN10使用程序&#xff1a;ViewLink-4.0.7_0708-windows-x64.exe 在荣品PRO-RK3566开发板的预置Android13下使用&#xff1a;ViewLink-2023_12_21-release-0.2.6.apk adb install…

【STM32】DMA直接存储器读取

文章目录 DMA简介DMA定义DMA传输方式DMA传输参数STM32的存储器映像DMA基本结构DMA的具体应用数据转运 DMAADC扫描模式 DMA DMA库函数 DMA数据传输&#xff08;数据转运 DMA&#xff09;接线图MyDMA模块main.c 源程序 DMA AD多通道&#xff08;ADC扫描模式 DMA&#xff09;…

兰空图床配置域名访问

图床已经创建完毕并且可以访问了&#xff0c;但是使用IP地址多少还是差点意思&#xff0c;而且不方便记忆&#xff0c;而NAT模式又没法直接像普通服务器一样DNS解析完就可以访问。 尝试了很多办法&#xff0c;nginx配置了半天也没配好&#xff0c;索性直接重定向&#xff0c;反…

React 入门课程 - 使用CDN编程React

1. 第一个React 注意&#xff1a;在vscode里&#xff0c;使用Live Server来运行html文件。 index.html <html><head><link rel"stylesheet" href"index.css"><script crossorigin src"https://unpkg.com/react17/umd/react.de…

flink 内存配置(一):设置Flink进程内存

flink 内存配置&#xff08;一&#xff09;&#xff1a;设置Flink进程内存 flink 内存配置&#xff08;二&#xff09;&#xff1a;设置TaskManager内存 flink 内存配置&#xff08;三&#xff09;&#xff1a;设置JobManager内存 flink 内存配置&#xff08;四&#xff09;…

快讯,Flutter PC 多窗口新进展,已在 Ubuntu/Canonical 展示

相信 Flutter 开发者对于 Flutter PC 多窗口的支持一直是「望眼欲穿」&#xff0c;而根据 #142845 相关内容展示&#xff0c; 在上月 27 号的 Ubuntu 峰会&#xff0c;Flutter 展示了多窗口相关进展。 事实上 Ubuntu 和 Flutter 的进一步合作关系应该是在 2021 年就开始了&…

HTB:Nibbles[WriteUP]

目录 连接至HTB服务器并启动靶机 1.How many open TCP ports are listening on Nibbles? 使用nmap对靶机TCP端口进行开放扫描 2.What is the relative path on the webserver to a blog? 使用ffuf对靶机80端口Web进行路径FUZZ 3.What content management system (CMS) …

AI资讯快报(2024.11.3-11.8)

1.<字节跳动上线名为炉米 Lumi的 AI 模型交流社区> 近日&#xff0c;字节跳动上线了一款名为【炉米 Lumi】的 AI 模型交流社区&#xff0c;这是一个专门给AI爱好者、研究人员和开发者准备的AI模型分享社区平台。该平台目前还在内部测试阶段&#xff0c;只有白名单用户才…

使用最新版的wvp和ZLMediaKit搭建Gb28181测试服务器

文章目录 说明安装1.安装nodejs简介安装步骤 2.安装java环境3.安装mysql安装修改密码 4.安装redis5.安装编译器6.安装cmake7.安装依赖库8.编译ZLMediaKit9.编译wvp-GB28181-pro 配置1.ZLMediaKit配置2.wvp-GB28181-pro配置2.1.配置ZLMediaKit连接信息2.2.28181服务器的配置2.3.…

AutoOps 使每个 Elasticsearch 部署都更易于管理

作者&#xff1a;来自 Elastic Ziv Segal&#xff0c;Ori Shafir AutoOps for Elasticsearch 通过性能建议、资源利用率和成本洞察、实时问题检测和解决路径显著简化了集群管理。 虽然 Elasticsearch 是一款功能强大且可扩展的搜索引擎&#xff0c;可提供多种功能&#xff0c;但…

Excel:vba实现正则匹配

一、匹配数字 实现的效果&#xff1a;(点击右边“提取数字”按钮) 实现的代码&#xff1a; Sub 提取数字() Dim cell As Range Dim sj As Object Dim regx As Object Dim ss As Object Dim n As Integer创建了一个 VBScript 正则表达式对象 regx&#xff0c;用于匹配特定模式…

第三十五篇:HTTP报文格式,HTTP系列二

HTTP 是超⽂本传输协议&#xff0c;也就是HyperText Transfer Protocol。 前面我们讲到第三章中网络协议的定义&#xff0c;网络协议的定义&#xff1a;网络协议是通信计算机双方必须共同遵从的一组约定。就像两个人要进行交流&#xff0c;如果不制定一套约定&#xff0c;一方…

[JAVAEE] 面试题(四) - 多线程下使用ArrayList涉及到的线程安全问题及解决

目录 一. 多线程下使用ArrayList 1.1. 自行判断加锁 1.2 使用Collections.synchronizedList()套壳加锁 1.3 CopyOnWriteArrayList类 二. 总结 一. 多线程下使用ArrayList 多线程下使用ArrayList会涉及到线程安全问题, 例如: public static void main(String[] args) thro…

使用axois自定义基础路径,自动拼接前端服务器地址怎么办

请求路径&#xff1a; http://localhost:5173/http://pcapi-xiaotuxian-front-devtest.itheima.net/home/category/head 很明显多拼接了路径地址 查看基础路径文件发现&#xff1a; //axios基础封装 import axios from axiosconst httpInstance axios.create({baseURL: /h…

docker镜像仓库常用命令

docker镜像仓库常用命令 docker logindocker logoutdocker pulldocker pushdocker searchdocker imagesdocker image inspectdocker tagdocker rmidocker image prunedocker savedocker loaddocker history docker login 语法: docker login [options] [server] 功能&#xff…

itextpdf打印A5的问题

使用A5打印的时候&#xff0c;再生成pdf是没有问题的。下面做了一个测试&#xff0c;在打印机中&#xff0c;使用A5的纸张横向放入&#xff0c;因为是家用打印机&#xff0c;A5与A4是同一个口&#xff0c;因此只能这么放。 使用itextpdf生成pdf&#xff0c;在浏览器中预览pdf是…