怎么使用Netty解码自定义通信协议

news2025/1/16 3:32:42

网络协议的基本要素

一个完备的网络协议需要具备哪些基本要素

  1. 魔数:魔数是通信双方协商的一个暗号,通常采用固定的几个字节表示。魔数的作用是防止任何人随便向服务器的端口上发送数据
  2. 协议版本号:随着业务需求的变化,协议可能需要对结构或字段进行改动,不同版本的协议对应的解析方法也是不同的。所以在生产级项目中强烈建议预留协议版本号这个字段。
  3. 序列化算法:表示数据发送方应该采用何种方法将请求的对象转化为二进制,以及如何再将二进制转化为对象
  4. 报文类型:报文可能存在不同的类型。例如在 RPC 框架中有请求、响应、心跳等类型的报文,在 IM 即时通信的场景中有登陆、创建群聊、发送消息、接收消息、退出群聊等类型的报文。
  5. 长度域字段:代表请求数据的长度,接收方根据长度域字段获取一个完整的报文。
  6. 请求数据:通常为序列化之后得到的二进制流
  7. 状态:状态字段用于标识请求是否正常。一般由被调用方设置。例如一次 RPC 调用失败,状态字段可被服务提供方设置为异常状态。
  8. 保留字段:保留字段是可选项,为了应对协议升级的可能性,可以预留若干字节的保留字段,以备不时之需。
+---------------------------------------------------------------+
​
| 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte  |
​
+---------------------------------------------------------------+
​
| 状态 1byte |        保留字段 4byte     |      数据长度 4byte     | 
​
+---------------------------------------------------------------+
​
|                   数据内容 (长度不定)                          |
​
+---------------------------------------------------------------+

举例如下: image.png

如何实现自定义通信协议

Netty 作为一个非常优秀的网络通信框架,已经为我们提供了非常丰富的编解码抽象基类,帮助我们更方便地基于这些抽象基类扩展实现自定义协议。 Netty 常用编码器类型:

  • MessageToByteEncoder 对象编码成字节流;

  • MessageToMessageEncoder 一种消息类型编码成另外一种消息类型。

Netty 常用解码器类型:

  • ByteToMessageDecoder/ReplayingDecoder 将字节流解码为消息对象;

  • MessageToMessageDecoder 将一种消息类型解码为另外一种消息类型。

编解码器可以分为一次解码器和二次解码器,一次解码器用于解决 TCP 拆包/粘包问题,按协议解析后得到的字节数据。如果你需要对解析后的字节数据做对象模型的转换,这时候便需要用到二次解码器,同理编码器的过程是反过来的。 一次编解码器:MessageToByteEncoder/ByteToMessageDecoder。 二次编解码器:MessageToMessageEncoder/MessageToMessageDecoder。

抽象编码类

ChannelOutboundHandler.png 通过抽象编码类的继承图可以看出,编码类是 ChanneOutboundHandler 的抽象类实现,具体操作的是 Outbound 出站数据。

MessageToByteEncoder

MessageToByteEncoder 用于将对象编码成字节流,MessageToByteEncoder 提供了唯一的 encode 抽象方法,我们只需要实现encode 方法即可完成自定义编码。 编码器实现非常简单,不需要关注拆包/粘包问题。如下例子,展示了如何将字符串类型的数据写入到 ByteBuf 实例,ByteBuf 实例将传递给 ChannelPipeline 链表中的下一个 ChannelOutboundHandler。

public class StringToByteEncoder extends MessageToByteEncoder<String> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, String data, ByteBuf byteBuf) throws Exception {
        byteBuf.writeBytes(data.getBytes());
    }
}

encode什么时候被调用的

MessageToByteEncoder 重写了 ChanneOutboundHandler 的 write() 方法,其主要逻辑分为以下几个步骤:

  1. acceptOutboundMessage 判断是否有匹配的消息类型,如果匹配需要执行编码流程,如果不匹配直接继续传递给下一个 ChannelOutboundHandler;

  2. 分配 ByteBuf 资源,默认使用堆外内存;

  3. 调用子类实现的 encode 方法完成数据编码,一旦消息被成功编码,会通过调用 ReferenceCountUtil.release(cast) 自动释放;

  4. 如果 ByteBuf 可读,说明已经成功编码得到数据,然后写入 ChannelHandlerContext 交到下一个节点;如果 ByteBuf 不可读,则释放 ByteBuf 资源,向下传递空的 ByteBuf 对象。

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) { // 1. 消息类型是否匹配
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            buf = allocateBuffer(ctx, cast, preferDirect); // 2. 分配 ByteBuf 资源
            try {
                encode(ctx, cast, buf); // 3. 执行 encode 方法完成数据编码
            } finally {
                ReferenceCountUtil.release(cast);
            }
            if (buf.isReadable()) {
                ctx.write(buf, promise); // 4. 向后传递写事件
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        if (buf != null) {
            buf.release();
        }
    }
}

MessageToMessageEncoder

MessageToMessageEncoder 与 MessageToByteEncoder 类似,同样只需要实现 encode 方法。

MessageToMessageEncoder常用的实现子类StringEncoderLineEncoderBase64Encoder等。

StringEncoder为例看下MessageToMessageEncoder 的用法。

源码示例如下:将 CharSequence 类型(String、StringBuilder、StringBuffer 等)转换成 ByteBuf 类型,结合 StringDecoder 可以直接实现 String 类型数据的编解码。

@Override
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
    if (msg.length() == 0) {
        return;
    }
    out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}

抽象解码类

解码类是 ChanneInboundHandler 的抽象类实现,操作的是 Inbound 入站数据。解码器实现的难度要远大于编码器,因为解码器需要考虑拆包/粘包问题。

由于接收方有可能没有接收到完整的消息,所以解码框架需要对入站的数据做缓冲操作,直至获取到完整的消息。 ChannelOutboundHandler.png

ByteToMessageDecoder

使用 ByteToMessageDecoder,Netty 会自动进行内存的释放,我们不用操心太多的内存管理方面的逻辑。 首先,我们看下 ByteToMessageDecoder 定义的抽象方法:

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
    protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.isReadable()) {
            decodeRemovalReentryProtection(ctx, in, out);
        }
    }
}

我们只需要实现一下decode()方法,这里的 in 大家可以看到,传递进来的时候就已经是 ByteBuf 类型,所以我们不再需要强转,第三个参数是List类型,我们通过往这个List里面添加解码后的结果对象,就可以自动实现结果往下一个 handler 进行传递,这样,我们就实现了解码的逻辑 handler。 d966e7714f17f19cf3606ddae7a0b6ed.png

为什么存取解码后的数据是用List

由于 TCP 粘包问题,ByteBuf 中可能包含多个有效的报文,或者不够一个完整的报文。

Netty 会重复回调 decode() 方法,直到没有解码出新的完整报文可以添加到 List 当中,或者 ByteBuf 没有更多可读取的数据为止。

如果此时 List 的内容不为空,那么会传递给 ChannelPipeline 中的下一个ChannelInboundHandler。

static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
    for (int i = 0; i < numElements; i ++) {
        //循环传播  有多少调用多少
        ctx.fireChannelRead(msgs.getUnsafe(i));
    }
}

decodeLast

ByteToMessageDecoder 还定义了 decodeLast() 方法。为什么抽象解码器要比编码器多一个 decodeLast() 方法呢?

因为 decodeLast 在 Channel 关闭后会被调用一次,主要用于处理 ByteBuf 最后剩余的字节数据。Netty 中 decodeLast 的默认实现只是简单调用了 decode() 方法。如果有特殊的业务需求,则可以通过重写 decodeLast() 方法扩展自定义逻辑。

ReplayingDecoder

ByteToMessageDecoder 还有一个抽象子类是 ReplayingDecoder。它封装了缓冲区的管理,在读取缓冲区数据时,你无须再对字节长度进行检查。因为如果没有足够长度的字节数据,ReplayingDecoder 将终止解码操作。ReplayingDecoder 的性能相比直接使用 ByteToMessageDecoder 要慢,大部分情况下并不推荐使用 ReplayingDecoder。

MessageToMessageDecoder

与 ByteToMessageDecoder 不同的是 MessageToMessageDecoder 并不会对数据报文进行缓存,它主要用作转换消息模型。 比较推荐的做法是使用 ByteToMessageDecoder 解析 TCP 协议,解决拆包/粘包问题。解析得到有效的 ByteBuf 数据,然后传递给后续的 MessageToMessageDecoder 做数据对象的转换,具体流程如下图所示: image.png 案例如下:

public class MyTcpDecoder extends ByteToMessageDecoder {@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 检查ByteBuf数据是否完整
        if (in.readableBytes() < 4) {
            return;
        }// 标记ByteBuf读取索引位置
        in.markReaderIndex();// 读取数据包长度
        int length = in.readInt();// 如果ByteBuf中可读字节数不足一个数据包长度,则将读取索引位置恢复到标记位置,等待下一次读取
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }// 读取数据
        ByteBuf data = in.readBytes(length);// 将数据传递给下一个解码器进行转换,转换后的数据对象添加到out中
        ctx.fireChannelRead(data);
    }
}public class MyDataDecoder extends MessageToMessageDecoder<ByteBuf> {@Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        // 将读取到的ByteBuf数据转换为自定义的数据对象
        MyData data = decode(msg);
        if (data != null) {
            // 将转换后的数据对象添加到out中,表示解码成功
            out.add(data);
        }
    }private MyData decode(ByteBuf buf) {
        // 实现自定义的数据转换逻辑
        // ...
        return myData;
    }
}

实战案例

如何判断 ByteBuf 是否存在完整的报文? 最常用的做法就是通过读取消息长度 dataLength 进行判断。如果 ByteBuf 的可读数据长度小于 dataLength,说明 ByteBuf 还不够获取一个完整的报文。在该协议前面的消息头部分包含了魔数、协议版本号、数据长度等固定字段,共 14 个字节。 固定字段长度和数据长度可以作为我们判断消息完整性的依据,具体编码器实现ByteToMessageDecoder逻辑示例如下:

/*
+---------------------------------------------------------------+
| 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte  |
+---------------------------------------------------------------+
| 状态 1byte |        保留字段 4byte     |      数据长度 4byte     | 
+---------------------------------------------------------------+
|                   数据内容 (长度不定)                          |
+---------------------------------------------------------------+
 */
@Override
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    // 判断 ByteBuf 可读取字节
    if (in.readableBytes() < 14) { 
        return;
    }
    // 标记 ByteBuf 读指针位置
    in.markReaderIndex();
    // 跳过魔数
    in.skipBytes(2);
    // 跳过协议版本号
    in.skipBytes(1);
    byte serializeType = in.readByte();
     // 跳过报文类型
    in.skipBytes(1);
    // 跳过状态字段
    in.skipBytes(1);
    // 跳过保留字段
    in.skipBytes(4);
    
    // 验证报文长度,不对的话就重置指针位置
    int dataLength = in.readInt();
    if (in.readableBytes() < dataLength) {
        in.resetReaderIndex(); // 重置 ByteBuf 读指针位置,这一步很重要
        return;
    }
    byte[] data = new byte[dataLength];
    in.readBytes(data);
    // 方式一:在解码器中就将数据解码成具体的对象
    SerializeService serializeService = getSerializeServiceByType(serializeType);
    Object obj = serializeService.deserialize(data);
    if (obj != null) {
        out.add(obj);
    }
    // 方式二:这一步可以不在解码器中处理,将请求数据读取到一个新的byteBuf然后丢给handler处理
    // 创建新的 ByteBuf 对象来存储有效负载数据
    ByteBuf payload = Unpooled.buffer((int) dataSize);

    // 读取有效负载数据并写入到 payload 中
    in.readBytes(payload);
    if (payload.isReadable()) {
        out.add(payload);
    }
}

扩展

什么是字节序

字节顺序,是指数据在内存中的存放顺序 使用16进制表示:0x12345678。在内存中有两种方法存储这个数字, 不同在于,对于某一个要表示的值,是把值的低位存到低地址,还是把值的高位存到低地址。

字节顺序分类

字节的排列方式有两种。例如,将一个多字节对象的低位放在较小的地址处,高位放在较大的地址处,则称小端序;反之则称大端序。 典型的情况是整数在内存中的存放方式(小端/主机字节序)和网络传输的传输顺序(大端/网络字节序)

1. 网络字节序(Network Order):TCP/IP各层协议将字节序定义为大端(Big Endian) ,因此TCP/IP协议中使用的字节序通常称之为网络字节序。

  • 所以当两台主机之间要通过TCP/IP协议进行通信的时候就需要调用相应的函数进行主机序列(Little Endian)和网络序(Big Endian)的转换。这样一来,也就达到了与CPU、操作系统无关,实现了网络通信的标准化。

2. 主机字节序(Host Order): 整数在内存中保存的顺序,它遵循小端(Little Endian)规则(不一定,要看主机的CPU架构,不过大多数都是小端)。

  • 同型号计算机上写的程序,在相同的系统上面运行是没有问题的。

结论

Java中虚拟机屏蔽了大小端问题,如果是Java之间通信则无需考虑,只有在跨语言通信的场景下才需要处理大小端问题。

回到本文的重点,我们在编解码时也要注意大小端的问题,一般来说如果是小端序的话,我们用Netty取值的时候都要用LE结尾的方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/759420.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SAP顾问生涯闲记:在SAP工作是什么体验

又有一段时间没更新自己的公众号了&#xff0c;为什么突然决定新开一篇SAP顾问生涯闲记系列的文章呢&#xff0c;是因为最近很荣幸地当选了SAP雇主品牌推广大使&#xff0c;作为SAP官方的推广大使在收获这份荣誉的同时&#xff0c;也承担了一些工作以及责任。 集结完毕︱SAP雇…

Flask_实现token鉴权

目录 1、安装依赖 2、实现代码 3、测试 源码等资料获取方法 1、安装依赖 pip install flask pip install pycryptodome 2、实现代码 import random import string import time import base64from functools import wrapsfrom flask import Flask, jsonify, session, req…

苍穹外卖day02——员工管理功能代码开发+分类管理代码导入

目录 新增员工——需求分析与设计 产品原型 接口设计: 数据库设计: 新增员工——代码开发 在Controller层中 在Service层中 在Mapper层中 功能测试 接口文档测试: 前后端联调测试: 新增员工——代码完善 ​编辑 第一个问题 第二个问题 员工分页查询 需求分析与设计 …

PostgreSQL考试难不难 ?

当涉及到PostgreSQL考试的详细难度&#xff0c;以下是一些可能涉及的主题和考点&#xff0c;这些主题在不同的考试中可能有所不同&#xff1a; 1.数据库基础知识&#xff1a;数据库的基本概念、关系型数据库模型、表、字段、主键、外键等。 2.SQL语言&#xff1a;对SQL语言的掌…

数据集——个人收集标注与使用过的数据集

前言 这是一个我个人在工作和学习中使用过以数据集的一部分&#xff0c;有语义分割&#xff0c;目标识别&#xff0c;人像抠图等几个大类&#xff0c;这只是我用过数据集中的一部分&#xff0c;这些数据集有小一部分是来源自网络&#xff0c;很大一部分都是我自己收集。 一、…

【动手学深度学习】--05.权重衰退

文章目录 权重衰退1.原理1.1使用均方范数作为硬性限制1.2使用均方范数作为柔性限制1.3对最优解的影响1.4参数更新法则 2.从零开始实现权重衰退2.1初始化模型参数2.2定义L2范数惩罚2.3训练2.4忽略正则化直接训练2.5使用权重衰退 3.简洁实现 权重衰退 学习视频&#xff1a;权重衰…

在网格化数据集上轻松执行 2D 高通、低通、带通或带阻滤波器研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

ModaHub魔搭社区:AI原生云向量数据库Zilliz Cloud设置白名单

目录 前提条件 操作步骤 下一步 在 Zilliz Cloud 中,白名单是针对项目的安全设置,适用于项目下的所有集群。设置白名单后,仅白名单中的 IP 地址可以访问您项目下的所有集群。白名单能够有效降低受到恶意攻击的风险 本教程将介绍如何设置白名单。 前提条件 确保满足以…

EasyX测试布局代码

#include <iostream> #include <algorithm> #include <graphics.h> // 引用图形库头文件 #include <conio.h> #include <unordered_map> #include <Windows.h> #include "layout/LayoutSystem.h"#define DEFAULT_PANELS_LAYOUT…

谈二级索引

前提&#xff1a; 在数据库中&#xff0c;1、索引分为聚簇索引和非聚簇索引两类。2、所有索引的数据结构都是树&#xff0c;查找树上的节点数据时通过用二分法来锁定数据范围&#xff0c;指定数据排序的规则&#xff0c;比如&#xff1a;有小到大&#xff0c;对比之后最终确定…

Sequencer使用心得

在关卡序列中设置了触发蓝图的关键帧&#xff0c;为什么播放的时候没有触发蓝图事件呢&#xff1f; 在关卡序列中触发蓝图&#xff0c;一般是将蓝图添加到轨道中&#xff0c;设置触发器&#xff0c;在对应的关键帧中&#xff0c;绑定蓝图事件。 一般的话&#xff0c;点击播…

栈、队列、优先级队列详解【c++】

目录 &#x1f3c0;stack的介绍和使用⚽stack的介绍⚽stack的使用 &#x1f3c0;queue的介绍和使用⚽queue的介绍⚽queue的使用 &#x1f3c0;priority_queue的介绍和使用⚽priority_queue的介绍⚽priority_queue的使用 &#x1f3c0;总结 &#x1f3c0;stack的介绍和使用 ⚽s…

尝试-InsCode Stable Diffusion 美图活动一期

一、 Stable Diffusion 模型在线使用地址&#xff1a; https://inscode.csdn.net/inscode/Stable-Diffusion 二、模型相关版本和参数配置&#xff1a; 活动地址 三、图片生成提示词与反向提示词&#xff1a; 提示词&#xff1a;realistic portrait painting of a japanese…

OPENMV的形状和颜色组合识别

使用openmv&#xff0c;通过阈值颜色和形状来去真假宝藏。调试过程发现颜色的阈值比较重要&#xff0c;因为不准的话&#xff0c;它会把一些颜色相近的物体也识别了。识别的精度有待提高&#xff0c;可以使用YOLOV5来精确识别&#xff0c;奈何本人没精力来弄这个。 打开机器视觉…

Proxmox VE 为 Windows 虚拟机添加硬盘遇到的问题

环境&#xff1a;PVE 8.x、Windows 11/Windows Server 2019 &#x1f449;问题一&#xff1a; 为 windows 虚拟机添加磁盘&#xff0c;重启虚拟机后&#xff08;在 windows 系统中重启&#xff09;磁盘未能生效&#xff0c;并显示为橘色。 ❗橘色 意味需要重启VM才能生效&…

BIO实战、NIO编程与直接内存、零拷贝深入辨析-02

网络通信编程基本常识 什么是 Socket &#xff1f; Socket 是应用层与 TCP/IP 协议族通信的中间软件抽象层&#xff0c;它是一组接口&#xff0c;一般由操作 系统提供。在设计模式中&#xff0c;Socket 其实就是一个门面模式&#xff0c;它把复杂的 TCP/IP 协议处理和…

RocketMQ学习笔记(基础篇)

目录 RocketMQ简介 单Master模式 多Master模式 多Master多Slave模式&#xff08;异步&#xff09; 多Master多Slave模式&#xff08;同步&#xff09; 双主双从集群 事务消息 事务消息发送及提交 事务补偿 事务消息状态 RocketMQ高级功能 消息存储 存储介质 消息的…

vue upload 下载

目录 上传 下载 get post 对象/文件流 download处理返回 文件流 axios.post 封装axios 后端直接返回文件流&#xff0c;打开下载文件是 [object Object]&#xff0c;将res改成res.data即可 1.请求设置类型responseType: blob&#xff08;如果没有设置&#xff0c;打…

14_Linux设备树下的platform驱动编写

目录 设备树下的platform驱动简介 运行测试 设备树下的platform驱动简介 platform驱动框架分为总线、设备和驱动,其中总线不需要我们这些驱动程序员去管理&#xff0c;这个是Linux内核提供的,我们在编写驱动的时候只要关注于设备和驱动的具体实现即可。在没有设备树的Linux内…