Netty解决粘包半包问题

news2024/12/23 15:47:17

1.定长,每次读取固定的数据量

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new FixedLengthFrameDecoder(10)); // 每条消息长度固定为10字节
pipeline.addLast(new YourBusinessHandler());

每条消息长度固定,接收端读取固定字节数作为一个完整的消息。

  • 粘包问题: 即使多个消息被合并在一起,定长解码器可以通过固定的长度正确拆分数据。

  • 半包问题: 如果数据不完整,Netty 会等待剩余数据到达再进行组装。

  • 不灵活,只适用于固定长度的协议。

    如果消息长度不一致,需要填充或裁剪数据,浪费存储空间。

2.分隔符

每条消息使用特定的分隔符(如 \n)进行分隔。

粘包问题: 分隔符明确了每条消息的边界,无论消息是否粘连都可以正确拆分。

半包问题: 如果分隔符未到达,Netty 会缓冲当前数据,等待剩余数据到达后组装完整消息。

  • 如果分隔符是消息内容的一部分,可能会导致解析错误。

  • 每条消息都需要附加分隔符,增加了一些开销。

ChannelPipeline pipeline = ch.pipeline();
ByteBuf delimiter = Unpooled.copiedBuffer("\n".getBytes()); // 使用换行符作为分隔符
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
pipeline.addLast(new YourBusinessHandler());

3.基于长度字段的解决方案

消息头中包含一个字段表示消息体的长度,接收端根据长度字段解析完整的消息。

粘包问题: 长度字段明确了每条消息的大小,即使多条消息粘连,解码器可以逐条解析。

半包问题: 如果接收的数据不足以包含完整消息,Netty 会缓冲数据,等待剩余部分到达再处理。

  • 适用于变长消息,灵活且高效。

  • 不依赖分隔符,节省了数据的额外开销。

  • 需要协议设计时明确定义长度字段。

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(
    1024, 0, 4, 0, 4)); 
    // 最大帧长度1024,长度字段偏移量为0,长度字段长度为4字节,无附加偏移,去掉长度字段的头部字节
pipeline.addLast(new YourBusinessHandler());

一共五个参数

maxFrameLength:帧的最大长度

lengthFieldOffset:长度的偏移量,用于获取长度,比如你先写入长度4个字节,那就无需偏移,因为是先写入的,所以直接读长度的字节大小可以直接得到。如果先写了header,比如CAFE占用了1个字节,那长度偏移量就得是1了,此时他会跳过从头开始数的第一个字节,然后读取消息的长度。有了消息的长度就可以读取消息了。

lengthFieldLength:长度占用的字节大小,跟上面lengthFieldOffset的来确定发送消息的长度。

initialBytesstrip:剥离字节长度,比如我要剥离掉这个header1字节,还有这个记录长度的字节4字节,我需要指定剥离的字节大小(1字节+4字节),就能只留下消息了。

lengthAdjustment:他指的是从长度之后应该跳过几个字节的内容,比如我在长度和消息之间又加了一个版本号1字节

现在组成:header 1字节 ,长度 4字节 ,版本号 1字节 ,消息("helloworld")

此时maxFrameLength=1024

lengthFieldOffset=1(跳过header)

lengthFieldLength=4(长度的字节大小)

lengthAdjustment=1(长度以后跳过1个字节才是消息)

initialBytesstrip=6 (去掉header、长度、版本号共6个字节)此时才会得到真正的helloworld

4.自定义协议

如果协议复杂或者不符合上述通用解码器的场景,可以手动编写解码器。

通过分析接收到的数据流,根据协议规则解析完整的消息。

粘包问题: 自定义逻辑中明确解析每条消息的边界。

半包问题: 使用 Netty 提供的缓冲区特性,确保接收完整数据后再解析。

@Slf4j
public class ProtocolMessageEncoderDecoder extends ByteToMessageCodec<ProtocolMessage<?>> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, ProtocolMessage<?> protocolMessage, ByteBuf byteBuf) throws Exception {
        //判断protocolMessage为空
        if(protocolMessage == null || protocolMessage.getHeader() == null){
            byteBuf.writeBytes(new byte[]{});
        }
        //得到请求头
        ProtocolMessage.Header header = protocolMessage.getHeader();

        //拼装请求
        //魔数 1字节
        byteBuf.writeBytes(new byte[]{header.getMagic()});


        //版本号 1字节
        byteBuf.writeBytes(new byte[]{header.getVersion()});
        //序列化器 1字节
        byteBuf.writeBytes(new byte[]{header.getSerializer()});
        //类型 1字节
        byteBuf.writeBytes(new byte[]{header.getType()});
        //状态 1字节
        byteBuf.writeBytes(new byte[]{header.getStatus()});
        //请求id 8字节
        byteBuf.writeLong(header.getRequestId());
        //消息体长度 4字节
//        byteBuf.writeInt(header.getBodyLength());
        // 获取序列化器
        ProtocolMessageSerializerEnum enumByKey = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
        if(enumByKey == null){
            throw new RuntimeException("序列化协议不存在:"+ enumByKey.getValue());
        }
        //利用key得到序列化器
        Serializer serializer = SerializerFactory.getInstance(enumByKey.getValue());
        //序列化请求体
        byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
        //写入请求体长度
        byteBuf.writeInt(bodyBytes.length);
        //写入请求体
        byteBuf.writeBytes(bodyBytes);
        //完成自定义协议编码
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 分别从指定位置读出 Buffer
        ProtocolMessage.Header header = new ProtocolMessage.Header();
        //魔数
        byte magic = byteBuf.readByte();
        //校验魔数
        if(magic != ProtocolConstant.PROTOCOL_MAGIC){
//            throw new RuntimeException("消息 magic 非法" + magic);
            throw new RpcException(ErrorCode.ConsumerError,"消息 magic 非法" + magic);
        }
        //版本号
        byte version = byteBuf.readByte();
        //序列化器
        byte serializer = byteBuf.readByte();
        //类型
        byte type = byteBuf.readByte();
        //状态
        byte status = byteBuf.readByte();
        //请求id
        long RequestId = byteBuf.readLong();
        //消息体长度
        int BodyLength = byteBuf.readInt();
        //写入header
        header.setMagic(magic);
        header.setSerializer(serializer);
        header.setVersion(version);
        header.setType(type);
        header.setStatus(status);
        header.setRequestId(RequestId);

        header.setBodyLength(BodyLength);
        //获得请求体数据,以上一共17个字节
        byte[] bodyBytes = new byte[BodyLength];

        //写入请求体数据, 解决粘包问题,只读指定长度的数据
        byteBuf.readBytes(bodyBytes,0,BodyLength);
        //解析消息体
        // 获取序列化器
        ProtocolMessageSerializerEnum serializerEnum   = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
        if(serializerEnum  == null){
        throw new RuntimeException("序列化消息的协议不存在");
        }
        //得到序列化器
        Serializer serializer1 = SerializerFactory.getInstance(serializerEnum .getValue());
        ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByKey(header.getType());
        if (messageTypeEnum == null) {
            throw new RuntimeException("序列化消息的类型不存在");
        }
        switch (messageTypeEnum) {
            case REQUEST:
                RpcRequest request  = serializer1.deserialize(bodyBytes, RpcRequest.class);
                list.add(new ProtocolMessage<>(header,request));
                return;
            case RESPONSE:
                RpcResponse response = serializer1.deserialize(bodyBytes, RpcResponse.class);
                list.add(new ProtocolMessage<>(header,response));
                return;
            case HEART_BEAT:return;
            case OTHERS:return;
            default:
                throw new RuntimeException("暂不支持该消息类型");
        }
    }


}

灵活,适用于复杂协议。

方法粘包问题解决半包问题解决使用场景优缺点
定长消息精确分割等待补充数据消息固定长度的协议简单但不灵活,适合定长数据
分隔符根据分隔符拆分等待完整数据使用明确分隔符的协议实现简单,但需要额外的分隔符
长度字段按长度截取等待完整数据消息中包含长度字段的协议灵活高效,适合大多数场景
自定义解码器自行定义逻辑自行定义逻辑协议复杂或不规则灵活性最高,但开发成本高

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2264302.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构---------二叉树前序遍历中序遍历后序遍历

以下是用C语言实现二叉树的前序遍历、中序遍历和后序遍历的代码示例&#xff0c;包括递归和非递归&#xff08;借助栈实现&#xff09;两种方式&#xff1a; 1. 二叉树节点结构体定义 #include <stdio.h> #include <stdlib.h>// 二叉树节点结构体 typedef struct…

前置知识补充—JavaScript

JavaScript 简介 JavaScript 是什么 JavaScript (简称 JS), 是⼀个脚本语⾔, 解释型或即时编译型的编程语⾔. 虽然它是作为开发Web⻚⾯的脚本语⾔⽽出名&#xff0c;但是它也被⽤到了很多⾮浏览器环境中 HTML&#xff1a; ⽹⻚的结构 CSS&#xff1a; …

Mac上详细配置java开发环境和软件(更新中)

文章目录 概要JDK的配置JDK下载安装配置JDK环境变量文件 Idea的安装Mysql安装和配置Navicat Premium16.1安装安装Vscode安装和配置Maven配置本地仓库配置阿里云私服Idea集成Maven Cpolar快速入门 概要 这里使用的是M3型片 14.6版本的Mac 用到的资源放在网盘 链接: https://pan…

第二十六周机器学习笔记:PINN求正反解求PDE文献阅读——正问题

第二十六周周报 摘要Abstract文献阅读《Physics-informed neural networks: A deep learning framework for solving forward and inverse problems involving nonlinear partial differential equations》1. 引言2. 问题的设置3.偏微分方程的数据驱动解3.1 连续时间模型3.1.1 …

米思奇图形化编程之ESP32控制LED灯闪烁方案实现

目录 一、项目概述 二、硬件准备 三、硬件连接 四、软件编程 五、验证效果 六、总结 一、项目概述 本项目使用米思奇图形化编程环境&#xff0c;编写micropython软件代码&#xff0c;实现了控制ESP32开发板上LED灯闪烁效果。该项目可为后续更复杂的物联网项目打下基础。…

完全离线使用,效率直接拉满

现在越来越多的人使用OCR软件来提高自己的工作效率&#xff0c;今天给大家推荐一款电脑端的文字识别工具&#xff0c;对比以往的软件来说&#xff0c;功能更加丰富全面。 Umi-OCR 美术、舞蹈、音乐 打开软件之后需要安装一下。 软件主要有截图OCR识别、批量OCR识别、批量文档识…

CSDN外链失效3:

参考我之前的博客&#xff1a; 外链失效博客1&#xff1a;随想笔记1&#xff1a;CSDN写博客经常崩溃&#xff0c;遇到外链图片转存失败怎么办_csdn外链图片转存失败-CSDN博客 外链失效博客2&#xff1a;网络随想2&#xff1a;转语雀_md格式转语雀lake格式-CSDN博客 markdown…

Java 中的字符串

目录 Java 中的字符串字符串的创建字符串的比较字符串的拼接如何定义一个空的字符串 Java 中的字符串 字符串的创建 在 Java 中&#xff0c;可以通过以下几种方式创建字符串&#xff1a; 1.使用字符串字面量&#xff1a; String str "Hello, World!";2.使用 new…

U盘结构损坏且无法访问:原因、恢复方案与预防措施

U盘结构损坏现象描述 U盘&#xff0c;这一小巧便捷的存储设备&#xff0c;在日常工作和学习中扮演着重要角色。然而&#xff0c;当U盘出现结构损坏且无法访问时&#xff0c;用户往往会陷入焦虑与困惑。具体表现为&#xff0c;将U盘插入电脑后&#xff0c;系统无法识别U盘&…

basic_ios及其衍生库(附 GCC libstdc++源代码)

basic_ios及其衍生库(附 GCC libstdc源代码) 我们由这张图展开我们的讨论 对于Date对象&#xff0c;只有实现了<<重载到输出流才可以插入到stringstream ss中 现在我有疑问stringstream是怎么做到既能输出又能输入的&#xff1f; 而且为什么stringstream对象能传给ostre…

【开源库 | minizip】Linux(Ubuntu18.04)下,minizip的编译、交叉编译

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; ⏰发布时间⏰&#xff1a; 2024-12-20 …

Gin-vue-admin(1):环境配置和安装

目录 环境配置如果443网络连接问题&#xff0c;需要添加代理服务器 后端运行前端运行 环境配置 git clone https://gitcode.com/gh_mirrors/gi/gin-vue-admin.git到server文件目录下 go mod tidygo mod tidy 是 Go 语言模块系统中的一个命令&#xff0c;用于维护 go.mod 文件…

java: 无效的目标发行版: xx

java: 无效的目标发行版: xx 背景java: 无效的目标发行版: xx 在 Intellij 的修复 背景 这里单独针对Intellij开发工具对 “java: 无效的目标发行版: xx”错误的修复。 java: 无效的目标发行版: xx 在 Intellij 的修复 同一台电脑使用多个JDK的时候容易出现在运行程序时容易…

vscode+编程AI配置、使用说明

文章目录 [toc]1、概述2、github copilot2.1 配置2.2 使用文档2.3 使用说明 3、文心快码&#xff08;Baidu Comate&#xff09;3.1 配置3.2 使用文档3.3 使用说明 4、豆包&#xff08;MarsCode&#xff09;4.1 配置4.2 使用文档4.3 使用说明 5、通义灵码&#xff08;TONGYI Lin…

leetcode-80.删除有序数组的重复项II-day12

总结&#xff1a;不必过于死磕一道题目&#xff0c;二十分钟没做出来就可参考题解

Docker 入门:如何使用 Docker 容器化 AI 项目(一)

引言 在人工智能&#xff08;AI&#xff09;项目的开发和部署过程中&#xff0c;环境配置和依赖管理往往是开发者遇到的挑战之一。开发者通常需要在不同的机器上运行同样的代码&#xff0c;确保每个人使用的环境一致&#xff0c;才能避免 “在我的机器上可以运行”的尴尬问题。…

EdgeX Core Service 核心服务之 Core Command 命令

EdgeX Core Service 核心服务之 Core Command 命令 一、概述 Core-command(通常称为命令和控制微服务)可以代表以下角色向设备和传感器发出命令或动作: EdgeX Foundry中的其他微服务(例如,本地边缘分析或规则引擎微服务)EdgeX Foundry与同一系统上可能存在的其他应用程序…

Keil5 STM32库函数的工程

库函数来间接的操作寄存器 条件编译&#xff0c;如果你定义了USE_STDPERIPH_DRIVER &#xff08;使用标准外设驱动&#xff09;这个字符串&#xff0c;stm32f10x_conf.h才有效

Vue2五、自定义指令,全局局部注册、指令的值 ,插槽--默认插槽,具名插槽 ( 作用域插槽)

一、自定义指令 使用步骤 1. 注册 (全局注册 或 局部注册) &#xff0c;在 inserted 钩子函数中&#xff0c;配置指令dom逻辑 2. 标签上 v-指令名 使用 1、自定义指令&#xff08;全局&#xff09; Vue.directive("指令名"&#xff0c;{ 指令的配置项 insert…

Docker部署GitLab服务器

一、GitLab介绍 1.1 GitLab简介 GitLab 是一款基于 Git 的开源代码托管平台&#xff0c;集成了版本控制、代码审查、问题跟踪、持续集成与持续交付&#xff08;CI/CD&#xff09;等多种功能&#xff0c;旨在为团队提供一站式的项目管理解决方案。借助 GitLab&#xff0c;开发…