关于Netty使用中黏包拆包带来报错问题及解决

news2024/9/24 7:24:27

文章目录

    • 问题现象
    • 解决
    • 总结

问题现象

  • 业务场景:雷达作为客户端,平台作为服务端,采用TCP/IP协议的socket连接,数据包采用字节的二进制数据传输
  • 平台与雷达的通信和数据解析,在我接手时,已经开发完成,我接手负责维护和继续开发
  • 由于日常工作忙于大数据分析和指标计算,未对使用正常的通信代码做什么改动,在客户现场使用后发现了一些问题(公司测试没问题)
  • 问题1:雷达实时监测过车车辆,将数据通过socket上报给平台。但平台存入的过车,与现场视频相比,数量上明显偏少,怀疑存在数据丢失现象
  • 问题2:程序刚启动时,产生以下报错,后续也有偶尔类似报错
2022-12-19 10:40:57.215 ERROR 1 --- [ntLoopGroup-3-9] c.n.s.listener.DefaultExceptionListener  : Can't parse 115

java.lang.IllegalArgumentException: Can't parse 115
	at com.newatc.socketio.protocol.PacketType.valueOfInner(PacketType.java:63)
	at com.newatc.socketio.protocol.PacketDecoder.decodePackets(PacketDecoder.java:21)
	at com.newatc.socketio.handler.InPacketHandler.channelRead0(InPacketHandler.java:66)
	at com.newatc.socketio.handler.InPacketHandler.channelRead0(InPacketHandler.java:33)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at com.newatc.socketio.handler.AuthorizeHandler.channelRead(AuthorizeHandler.java:149)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Unknown Source)
	

解决

  • 针对问题2,追踪业务代码,发现是解析到的数据包类型出错,不在规定范围内。由于我们的平台和雷达设备都在客户内网内(不通互联网),基本不会有第三方数据侵入,数据上报类型不应该超出规定范围,肯定是平台数据接收解析出了问题
  • 针对问题1,用了另一个工具(雷达上位机软件)接收雷达数据,发现车辆数量正常,初步定位为平台处理socket上报信息时丢失了部分数据
  • 针对问题1,是在客户现场实施时发现的问题,在公司测试环境接入雷达测试时没有问题。对比分析了下两个场景,发现客户的路口数量较多,雷达数量也较多,同时这些路口实时过车也较多。
  • 雷达监测到过车就会触发数据上报,当数据量较大时,单个数据包超过最大长度就会进行拆包,分多个数据包上报
  • 查看了平台已有代码,使用的是Netty,但在数据接收时,没有进行包头校验、长度校验。对于拆包后的数据包(前面几个字节不是包头),跳过包头长度直接进行包类型校验会失败报错(此字节位置可能是负载的实际数据内容,而不是包头包类型),出错就丢掉数据,就会导致上面2个问题
    在这里插入图片描述
  • 先看下优化前,有问题的代码,很粗糙,很明显,但当过车较少,单个数据包包含所有信息时没问题
    // 数据接收经过的一个解码器 InPacketHandler 的接收处理方法
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, PacketsMessage message) throws Exception {
        ByteBuf content = message.getContent();
        ClientHead client = message.getClient();

        if (log.isTraceEnabled()) {
            log.trace("In message: {} sessionId: {}", content.toString(CharsetUtil.UTF_8), client.getSessionId());
        }
        while (content.isReadable()) {
            try {
                Packet packet = decoder.decodePackets(content, client);
                if(packet == null)
                    continue;
                Namespace ns = namespacesHub.get(packet.getNsp());

                NamespaceClient nClient = client.getChildClient(ns);
                if (nClient == null) {
                    log.debug(
                        "Can't find namespace client in namespace: {}, sessionId: {} probably it was disconnected.",
                        ns.getName(),
                        client.getSessionId()
                    );
                    return;
                }
                packetListener.onPacket(packet, nClient);
            } catch (Exception ex) {
                String c = content.toString(CharsetUtil.UTF_8);
                log.error("Error during data processing. Client sessionId: " + client.getSessionId() + ", data: " + c, ex);
                throw ex;
            }
        }
        // 读取完成后,释放ByteBuf,防止内存溢出LEAK
        content.release();
    }
    
    // 上面channelRead0方法里对应的具体解码方法
    public Packet decodePackets(ByteBuf buf, ClientHead client) {
        // 前七个字节为 : 标志(4)-负载长度(2)-协议版本号(1),未校验,直接跳过了
        buf.skipBytes(7);
        Packet packet = null;
        // 第8个字节是包类型
        short pType = buf.readUnsignedByte();
        PacketType packetSubType = PacketType.valueOfInner(pType);
        // 第 9、10字节是校验位和预留位,也跳过了
        buf.skipBytes(2);
        // 这里也没进行负载长度校验,只校验了是否还有可读字节
        if (buf.readableBytes() < 1){
            // 直接返回了,已读的字节也没有回滚回去,这样后面数据包上报依然不完整,解析依然会有问题
            return packet;
        }
        short oID = buf.readUnsignedByte();
        ObjectId objectId = ObjectId.valueOf(oID);

        switch (packetSubType) {
            case QUERY_RESULT:
                break;
            case REPLY:
                break;
            case REPORT:
                packet = new Packet(PacketType.MESSAGE);
                packet.setSubType(packetSubType);
                packet.setObjectId(objectId);

                switch (objectId){
                    case REALTIME_DATA:
                        packet.setName(EventName.REALTIME_DATA);
                        break;
                    case PASSING_VEHICLE:
                        packet.setName(EventName.PASSING_VEHICLE);
                        break;
                    case TRAFFIC_STATUS:
                        packet.setName(EventName.TRAFFIC_STATUS);
                        break;
                    case TRAFFIC_STATS:
                        packet.setName(EventName.FLOW_STATS);
                        break;
                    case PERFORMANCE:
                        packet.setName(EventName.PERFORMANCE);
                        break;
                    case TRAFFIC_EVENT:
                        packet.setName(EventName.TRAFFIC_EVENT);
                        break;
                    case RADAR_FAULT:
                        packet.setName(EventName.RADAR_FAULT);
                        break;
                }
                // 解析数据包里负载的数据体
                Object o = readReportObject(objectId, buf);
                List<Object> args = new ArrayList<>();
                args.add(o);
                packet.setData(args);
                break;
            case HEART_BEAT:
                packet = new Packet(PacketType.PING);
                packet.setSubType(packetSubType);
                packet.setObjectId(ObjectId.HEART_BEAT_FROM_RADAR);

                break;
            default:
                break;
        }

        return packet;
    }

  • 优化后的代码,主要做了包头校验和负载长度校验
  • 如果不是包头则舍弃,直到读到包头开始解析,这是为了防止程序启动时接收到的就是一个不完整包
  • 读取完包头后,如果剩余字节长度小于负载长度,回滚游标到包头位置,返回,继续等待下一个数据包上报再一起解析
    public Packet decodePackets(ByteBuf buf, ClientHead client) {
        // 将起始位置记录下,后面如果不完整,读取失败,回复到此位置
        int savedReaderIndex = buf.readerIndex();

        // 前七个字节为 : 标志CYRC(4)-负载长度(2)-协议版本号(1),除了负载长度,其他没有用,直接跳过
        // 读取前四个字节,如果不是包头,则继续往下读
        byte head1 = buf.readByte();
        if (head1 != 0x43) {
            logger.info("第一个字符不是 C ,舍弃");
            return null;
        }
        byte head2 = buf.readByte();
        if (head2 != 0x59) {
            logger.info("第二个字符不是 Y ,舍弃");
            return null;
        }
        byte head3 = buf.readByte();
        if (head3 != 0x52) {
            logger.info("第三个字符不是 R ,舍弃");
            return null;
        }
        byte head4 = buf.readByte();
        if (head4 != 0x43) {
            logger.info("第四个字符不是 C ,舍弃");
            return null;
        }
        // 读取负载长度
        int dataLength = buf.readUnsignedShort();
        buf.skipBytes(1);
        // 第8个字节为包类型
        short pType = buf.readUnsignedByte();
        PacketType packetSubType = PacketType.valueOfInner(pType);
        // 第9/10字节,为校验位(1)-Reserve(1),暂未实际使用,直接跳过了
        buf.skipBytes(2);
        // 后续数据长度小于负载长度,则等待后面的数据上报再一起解析
        if (buf.readableBytes() < dataLength) {
            buf.readerIndex(savedReaderIndex);
            logger.info("剩余消息长度,小于负载(消息内容)长度 ,重置游标返回,等待后面数据上报一起解析");
            return null;
        }
        // 包头十个字节,后面为负载(消息内容),负载的第一个字节为对象标识,剩余为对象数据内容
        Packet packet = null;
        short oID = buf.readUnsignedByte();
        ObjectId objectId = ObjectId.valueOf(oID);

        switch (packetSubType) {
            case QUERY_RESULT:
                break;
            case REPLY:
                break;
            case REPORT:
                packet = new Packet(PacketType.MESSAGE);
                packet.setSubType(packetSubType);
                packet.setObjectId(objectId);

                switch (objectId) {
                    case REALTIME_DATA:
                        packet.setName(EventName.REALTIME_DATA);
                        break;
                    case PASSING_VEHICLE:
                        packet.setName(EventName.PASSING_VEHICLE);
                        break;
                    case TRAFFIC_STATUS:
                        packet.setName(EventName.TRAFFIC_STATUS);
                        break;
                    case TRAFFIC_STATS:
                        packet.setName(EventName.FLOW_STATS);
                        break;
                    case PERFORMANCE:
                        packet.setName(EventName.PERFORMANCE);
                        break;
                    case TRAFFIC_EVENT:
                        packet.setName(EventName.TRAFFIC_EVENT);
                        break;
                    case RADAR_FAULT:
                        packet.setName(EventName.RADAR_FAULT);
                        break;
                }

                Object o = readReportObject(objectId, buf);
                List<Object> args = new ArrayList<>();
                args.add(o);
                packet.setData(args);
                break;
            case HEART_BEAT:
                packet = new Packet(PacketType.PING);
                packet.setSubType(packetSubType);
                packet.setObjectId(ObjectId.HEART_BEAT_FROM_RADAR);

                break;
            default:
                break;
        }

        return packet;
    }

总结

在使用Netty进行TCP数据传输时,由于TCP是一个面向流的协议,消息会被拆分成多个字节流进行发送,因此接收方收到消息时,可能会出现黏包和拆包现象。

黏包指的是接收方一次性收到了多个完整的消息,而拆包则是接收方收到了不完整的消息。这种现象的出现是由于TCP是面向流的、无边界的协议,不保留数据报的边界。

为了解决黏包和拆包问题,Netty提供了多种解决方法:

  1. 消息定长:即发送方发送的每个消息长度固定,接收方接收到固定长度的字节流后进行消息的解析,这个是一种简单有效的实现方法。但是存在一个问题,不同消息长度不同,如心跳消息和数据上报消息,消息定长则部分消息必须填充补偿,显得浪费带宽。

  2. 消息分隔符:发送方在每个消息后添加特殊的分隔符,接收方根据分隔符对消息进行解码和拆分。使用这种实现方式时,要确保分隔符是唯一的,不会和消息内容重复,造成错误分隔。

  3. 消息头部标识:发送方在消息头部加入固定长度的表示消息长度的字段,接收方根据消息长度字段对消息进行解码和拆分。这是一种比较通用的实现,消息头部可以加一些标识、负载长度、校验位等,就可以正确识别到包头位置和对数据完整性进行校验。

  4. 基于Netty编解码器:Netty提供了一系列编解码器,可以自定义编解码器来控制消息的长度和格式。我们按照自己的诗句情况进行参数配置后,就可以实现自动的黏包和拆包处理。

总的来说,Netty提供了多种方式来解决黏包和拆包问题,我们可以根据业务需求选择合适的方式进行实现。只要双方约定了通信协议且严格按照协议发送数据,并且代码已经处理了黏包拆包,数据解析应该就没有问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/452501.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年Q1天猫空调品牌销量排行榜

如今&#xff0c;空调的普及水平较高&#xff0c;空调行业进入存量换新为主的发展阶段。 根据鲸参谋数据分析平台的相关数据显示&#xff0c;2023年Q1在天猫平台上&#xff0c;空调的销量将近100万件&#xff0c;销售额将近30亿&#xff0c;同时&#xff0c;空调产品的产品均价…

浅谈CRM系统:优化企业管理,提高客户满意度!

一、什么是 CRM&#xff1f; CRM 是 Customer Relationship Management&#xff0c;即客户关系管理的缩写。它是一套用于帮助企业与客户建立和维护良好关系的系统。在 CRM 系统中&#xff0c;将客户的信息集成在一起&#xff0c;包括其历史交易记录、活动记录、沟通记录以及个…

深入探讨Linux驱动开发:Linux设备树

文章目录 一、设备树介绍二、设备树框架1.设备树框架2.节点基本格式3.节点部分属性简介 总结 一、设备树介绍 设备树&#xff08;Device Tree&#xff0c;简称 DT&#xff09;是一种在嵌入式系统中描述硬件设备的一种数据结构和编程语言。它用于将硬件设备的配置信息以树形结构…

介绍与评测Intel HLE与RTM技术

HLE&#xff08;即Hardware Lock Elision&#xff0c;硬件锁省略&#xff09;以及RTM&#xff08;即Restricted Transactional Memory&#xff0c;受限的事务性存储器&#xff09;是Intel在x86微架构中所引入的两条指令集系统&#xff0c;它们均属于TSX&#xff08;Transaction…

Unity日记22(携程概念)

目录 学习视频 携程 1异步 2调用方法 3优点 4停止方法 5返回值 实例&#xff1a;每过一秒打印当前运行时间 实例&#xff1a;停止数字打印携程 错误方法&#xff1a;&#xff08;携程只能开一个&#xff09; 参考方法 学习视频 https://www.bilibili.com/video/BV1eu…

什么是轻量云服务器,有哪些优势,适用于什么场景?

​  随着互联网的迅速发展&#xff0c;越来越多的企业选择将自己的业务部署到云服务器上。而轻量级云服务器作为云计算领域的一种新的服务模式&#xff0c;也开始受到越来越多企业的青睐。那么&#xff0c;究竟何为轻量级云服务器?其优势又在哪里?又适用于哪些场景?以下将…

前端--移动端--3移动web开发rem适配布局

目标&#xff1a; 能够适应rem单位 能够使用媒体查询的基本语法 能够使用less的基本语法 能够使用less中的嵌套 能够使用less中的运算 能够使用2种rem适配方案 能够独立完成苏宁移动端首页 目录&#xff1a; rem基础 媒体查询 less基础 rem适配方案 苏宁首页案例…

如何修改AS2接收的文件名?

知行之桥EDI系统的AS2端口&#xff0c;负责接收和发送EDI文件。企业通过AS2端口接收来自交易伙伴的文件时&#xff0c;其文件名会和交易伙伴发出的文件名完全一致&#xff1b;如果交易伙伴发过来的请求中没有文件名或者文件名没有出现在AS2 协议规定的位置&#xff0c;AS2端口会…

K_A33_001 基于STM32等单片机驱动RC522射频卡 读写IC卡 串口显示

K_A33_001 基于STM32等单片机驱动RC522射频卡 读写IC卡 串口显示 所有资源导航一、资源说明二、基本参数参数引脚说明 三、驱动说明时序:对应程序: 四、部分代码说明1、接线引脚定义1.1、STC89C52RCRC522射频模块1.2、STM32F103C8T6RC522射频模块 五、基础知识学习与相关资料下…

Django分页+增删改查

Django分页增删改查 演示 源码下载地址&#xff1a;https://download.csdn.net/download/qq_35622606/87719248 其他小笔记&#xff1a; django-admin.exe startproject mysite python .\manage.py startapp app01 python manage.py makemigrations python manage.py migrate…

Docker系列---Docker Compose | 容器编排 | 理论详解

目录 1.Docker Compose 概述&#xff08;YML&#xff09; 2.Docker Compose 安装 3.Docker Compose 配置常用字段 4.Docker Compose 常用命令 5.基于 Compose 创建 镜像 1.首先安装好Compose 2.使用Dockerfile环境&#xff1a; 1.Docker Compose 概述&#xff08;YML&am…

docker + K8S + Rancher + Harbor的安装

docker K8S Rancher Harbor的安装 1. 系统初始化 关闭防火墙 systemctl stop firewalld禁用防火墙开机自启 systemctl disable firewalld永久-关闭selinux sed -i s/enforcing/disabled/ /etc/selinux/config # 重启 reboot关闭swap分区 # 永久 sed -ri s/.*swap.*/#&/…

重启电脑数据丢失怎么恢复?这篇指南很受用!

案例分享&#xff1a;“你好&#xff0c;我重新启动了我的win10电脑&#xff0c;电脑为什么再次开机后&#xff0c;下载的软件就不见了&#xff1f;不仅如此&#xff0c;我的文档也消失不见了&#xff0c;这令我很困惑。请问重启电脑数据丢失怎么恢复&#xff1f;请大家帮帮我&…

系统分析师考试,信息安全常错题

计算机网络---其他 Stub区域是一种比较特殊的区域&#xff0c;因为它不能像其他区域那样&#xff0c;经过该区域中的ABR接收其他OSPF AS路由。在Stub区域的内部路由器仅需要配置一条到达该区域ABR的默认路由&#xff08;0.0.0.0.0.0.0.0&#xff09;来实现与同一AS中不同区域间…

CloudCompare插件开发之如何设计界面ui与功能实现?

文章目录 0.引言1.使用文件说明2.添加界面ui相关文件到插件目录3.修改工程相关文件并生成4.结果展示 0.引言 CloudCompare源代码编译成功后&#xff0c;即可进行二次开发&#xff0c;可通过修改源码实现二次开发基础功能&#xff08;见&#xff1a;CloudCompare如何进行二次开发…

#mysql binlog 备份恢复数据流程#

模式&#xff1a;mysql全量备份binlog日志完整恢复数据 首先&#xff0c;数据库在误操作之前必须已经开启了binlog日志功能&#xff0c;且binlog日志的保存周期必须大于全备份的时间周期&#xff01; 所谓恢复&#xff0c;就是让将全备份的数据全部恢复后&#xff0c;再使用my…

HTB-Time

HTB-Time 信息收集80端口 立足pericles -> root 信息收集 80端口 有两个功能&#xff0c;一个是美化JSON数据。 一个是验证JSON&#xff0c;并且输入{“abc”:“abc”}之类的会出现报错。 Validation failed: Unhandled Java exception: com.fasterxml.jackson.core.JsonPa…

当⻉借⼒阿⾥云落地云原⽣架构转型,运维降本、效率稳定性双升

作者&#xff1a;当贝技术团队 随着业务飞速发展&#xff0c;当贝的传统 IT 资产也渐显臃肿&#xff0c;为了避免制约发展的瓶颈&#xff0c;痛定思痛&#xff0c;技术团队果断变革&#xff1a;核心业务云原生化之后&#xff0c;运维效率、整体稳定性和研发效率均得到了全面提…

网络基础知识

网络基础知识 一、什么是二层互通与三层互通&#xff1f;1.1 二层网络互通1.2 三层网络互通 二、什么是Overlay网络&#xff1f;2.1 Underlay网络2.2 Overlay网络2.3 Underlay网络 vs Overlay网络 三、什么是SNMP&#xff1f;3.1 SNMP概念3.2 为什么需要SNMP&#xff1f;3.3 SN…

基于ubuntu18.04.6 LTS服务器安装nvidia驱动

1对于一个刚刚配置的服务器&#xff0c;首先nvidia-smi&#xff0c;自然无法显示Driver Version、最高cuda版本等信息。 nvidia-smi: command not found 需要我们自己安装nvidia驱动 2禁用老驱动 禁用自带nouveau驱动 sudo vim /etc/modprobe.d/blacklist.conf 打开后在CONF文…