RocketMQ5.x版本延迟消息被重放问题调查

news2024/11/24 6:20:39

一、问题

由于目标计划是将集群从4.9.x逐步升级至5.x,故目前先对一些不重要的集群进行升级测试。
但是在4.x的broker陆续升级至5.x的过程中,发现了延迟消息被重放的问题。
具体如下:
在升级时刷新后台监控,发现竟然有写入量:
在这里插入图片描述
即上图(因为当时的问题场景没有截图,所以上图只是提供参考)红框中的生产和消费项并不为0,等启动完毕后,变为0,感觉有些诡异。
之后,查询了该集群的topic情况,这一查竟然发现好多topic出现了堆积的情况。

二、调查

1 堆积情况:
经过调查所有topic,这些topic于几日前已经不进行生产和消费,也就是生产消费完成了。
而发现堆积的topic都是之前消费有失败的,产生了重试消息的,而这些重试消息也是被消费完了的。
而我将4.x的一个broker升级到5.x后,重试的消息完全被重放了,类似如下截图:
在这里插入图片描述
%RETRY%pugc-sofa-delete-video-flush-consumer这个topic在broker-cold-1上的消息量变成了34706,而消费量为17353,堆积量跟消费量相等。

我查看了好几个topic,发现只要有重试消息的,都存在这个情况,也就是重试消息被重写了一遍!

2 日志调查
经过拿其中一个消费者:pugc-sofa-delete-video-flush-consumer进行调查,根据该消费者最后在日志中出现的位置,可以看到:

2023-06-05 15:25:18 ERROR NettyServerNIOSelector_3_2 - RemotingCommand [code=11, language=JAVA, version=395, opaque=166848104, flag(B)=0, remark=null, extFields={queueId=0, maxMsgNums=32, sysFlag=3, suspendTimeoutMillis=15000, commitOffset=17353, topic=%RETRY%pugc-sofa-delete-video-flush-consumer, queueOffset=17353, expressionType=TAG, subVersion=1685949771302, consumerGroup=pugc-sofa-delete-video-flush-consumer}, serializeTypeCurrentRPC=JSON]

commitOffset=17353,该消费者在2023-06-05消费重试消息时,消费偏移量已经达到17353。

这进一步确认了,就是因为我升级5.x导致了重试消息被重写了一遍。

我进一步走查了5.x的broker启动时关于%RETRY%pugc-sofa-delete-video-flush-consumer的日志,如下:

store.log:2023-06-07 09:39:13 INFO main - load /opt/mqcloud/broker-cold-1/data/consumequeue/%RETRY%pugc-sofa-delete-video-flush-consumer/0/00000000000000000000 OK
store.log:2023-06-07 09:39:13 INFO main - load consume queue %RETRY%pugc-sofa-delete-video-flush-consumer-0 OK
store.log:2023-06-07 09:39:14 INFO main - recover current consume queue file over,  /opt/mqcloud/broker-cold-1/data/consumequeue/%RETRY%pugc-sofa-delete-video-flush-consumer/0/00000000000000000000 0 0 0
store.log:2023-06-07 09:39:14 INFO main - recover current consume queue over /opt/mqcloud/broker-cold-1/data/consumequeue/%RETRY%pugc-sofa-delete-video-flush-consumer/0/00000000000000000000 347060

根据最后一条的347060,调查对应代码简化如下:

public void recover() {
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        int mappedFileSizeLogics = this.mappedFileSize;
        long mappedFileOffset = 0;
        while (true) {
            for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                long offset = byteBuffer.getLong();
                int size = byteBuffer.getInt();
                long tagsCode = byteBuffer.getLong();
                if (offset >= 0 && size > 0) {
                    mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                } else {
                    log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                        + offset + " " + size + " " + tagsCode);
                    break;
                }
            }
            if (mappedFileOffset == mappedFileSizeLogics) {
            } else {
                log.info("recover current consume queue over " + mappedFile.getFileName() + " "
                    + (processOffset + mappedFileOffset));
                break;
            }
        }
    }
}

347060代表的是重试队列的物理偏移量,由于consumequeue每个数据单元大小是20个字节,那么消息量为347060/20=17353。

也就是说,在broker初始化恢复数据时,重试队列的数据量还是对的。

而broker启动整个过程包括,创建->初始化->数据恢复->启动,如果数据重复,就很可能出现在启动过程中。

3 走查延迟队列代码
通过上面的调查,基本可以确认,重试队列数据刚加载时是对的,应该是启动后内部的延迟队列进行重新消费导致的。

随即对比5.x和4.x的延迟队列相关代码,至到对比到下面的代码:
在这里插入图片描述
在这里插入图片描述
即5.x的ScheduleMessageService竟然在构造方法中添加了一个持久化的定时任务,而且在start方法中又添加了一个同样的持久化的定时任务

而4.x只有在start时,先执行load(),再执行持久化定时任务。

那么5.x的持久化任务在构造方法中延迟10秒就会执行,如果10秒内,数据还没有加载完成,它执行persist必然会把存在的delayOffset.json覆盖

再start时,执行load,加载的offset就是空的,那就会认为没有消费过,进行数据重放

接着再查看store.log的日志,如下:
在这里插入图片描述
在这里插入图片描述
从2023-06-07 09:39:13启动,至到2023-06-07 09:39:24才刚恢复完数据,那么调用start方法肯定在10秒之后了。

而在测试环境中,broker没多少数据,10秒内肯定已经启动完了,所以没有发现这个问题,而线上环境数据量一般都是上百G,通常启动比较慢。

三、解决

所以定位到问题所在,就好改了,直接把5.x中ScheduleMessageService构造方法中的定时持久化代码移除即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/628239.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

日撸java三百行day61-62

文章目录 说明Day61 决策树1.什么是决策树2.什么是熵3.什么是信息增益4.详细例子1. weather样本2.第一次决策3.第二次决策4.最终决策树 4. 代码理解4.1 变量理解4.2 代码中主要方法理解 说明 闵老师的文章链接&#xff1a; 日撸 Java 三百行&#xff08;总述&#xff09;_minf…

深入源码分析RecyclerView缓存复用原理

文章目录 前言四级缓存 源码分析缓存一级缓存&#xff08;mChangedScrap和mChangedScrap&#xff09;二级缓存&#xff08;mCachedViews&#xff09;三级缓存四级缓存&#xff08;mRecyclerPool&#xff09;缓存池mRecyclerPool结构理解四级缓存简单小结 缓存流程图 复用复用流…

分布式项目15 用户注册,单点登陆dubbo来实现

分析&#xff1a;当用户填写完成注册信息之后,将请求发送给前台服务器.之后前台消费者利用dubbo框架实现RPC调用。之后将用户信息传递给jt-sso服务提供者.之后完成数据的入库操作。 01.页面url分析 02.查看页面JS $.ajax({ type : "POST", url : "/user/doRe…

MMPose安装及推理验证

MMPose安装 依赖环境 1.创建虚拟环境并激活 conda create --name openmmlab python3.8 -y conda activate openmmlab2.安装pytorch conda install pytorch torchvision torchaudio pytorch-cuda11.7 -c pytorch -c nvidia报错&#xff1a; InvalidArchiveError(‘Error wit…

quartus下联合modelsim_Altera仿真

vivado工程转换到quartus下联合modelsim仿真_内有小猪卖的博客-CSDN博客 这个博客是用单独的modelsim仿真&#xff0c;而下面的流程是使用quartus自带的modelsim-altera仿真。 版本为&#xff1a;quartus ii 13.1 64-bit 以fpga实现数码管和流水灯编码为例。数码管为1时&#x…

Spring中Bean加载流程

上面是跟踪了 getBean 的调用链创建的流程图&#xff0c;为了能够很好地理解 Bean 加载流程&#xff0c;省略一些异常、日志和分支处理和一些特殊条件的判断。 从上面的流程图中&#xff0c;可以看到一个 Bean 加载会经历这么几个阶段&#xff08;用绿色标记&#xff09;&…

机器学习算法分类(三)

在机器学习中&#xff0c;又分为监督学习、无监督学习、半监督学习、强化学习和深度学习。 监督、无监督、半监督学习 机器学习根据数据集是否有标签&#xff0c;又分为监督学习、无监督学习、半监督学习。 监督学习&#xff1a;训练数据集全部都有标签无监督学习&#xff1a…

grep命令的使用

grep命令是Linux中常用的文本搜索工具&#xff0c;它可以根据用户指定的模式&#xff0c;在文件或标准输入中查找匹配的文本行并返回。 下面是grep命令的一些常见选项&#xff1a; -i&#xff1a;忽略大小写-n&#xff1a;显示匹配行的行号-v&#xff1a;显示不匹配的行-r&am…

【新版】系统架构设计师 - 计算机系统基础知识

个人总结&#xff0c;仅供参考&#xff0c;欢迎加好友一起讨论 文章目录 架构 - 计算机系统基础知识考点摘要计算机系统计算机硬件组成浮点数Flynn分类法CISC与RISC流水线技术超标量流水线存储系统层次化存储结构CacheCache的命中率Cache的页面淘汰主存编址磁盘管理&#xff08…

《MySQL(四):基础篇- 约束》

文章目录 4. 约束4.1 概述4.2 约束演示4.3 外键约束4.3.1 介绍4.3.2 语法4.3.3 删除/更新行为 4. 约束 4.1 概述 概念&#xff1a;约束是作用于表中字段上的规则&#xff0c;用于限制存储在表中的数据。 目的&#xff1a;保证数据库中数据的正确、有效性和完整性。 分类: 约…

YOLOv5白皮书-第Y6周:模型改进

目录 一、改进网络结构设计1 改进的注意力机制2 多尺度特征融合3 改进的激活函数 二 数据增强和数据平衡1 数据增强2 数据平衡3 注意事项 三、模型融合策略1 投票策略2 加权平均策略3 特征融合策略4 其他模型融合策略 &#x1f368; 本文为&#x1f517;365天深度学习训练营 中…

protobuf实现原理

文章目录 一、前言二、概述三、数据存储方式&#xff1a;Varints(一)原理(二)举例(三)缺点 四、协议的数据结构(一)原理(二)举例 一、前言 最近刚刚从一家公司离职&#xff0c;在职的时候使用到了go语言的grpc库&#xff0c;了解了除了json之外的另一个专门用于远程调用的序列…

二本计算机专业学长经验之谈

2023.6.9 今年的行情对我们这些双非大学、二本真的太难了&#xff0c;菜鸟今年感觉毕业找的工作真的又苦逼钱又少&#xff0c;准备跳槽的&#xff0c;结果满大街投简历&#xff0c;连个毛都没有&#xff0c;唯一一个给了个海笔&#xff0c;然后就没然后… 所以希望大家真的要好…

Element的Select分组全选模式

Select 选择器选择器的分组&#xff0c;如上图所示&#xff0c;我们希望做到的效果是&#xff0c;点击“热门城市”或“城市名”的时候全选分组的options。 思路 思路一&#xff1a;目前的Select 选择器分组OptionGroup的Title只是一个文本DOM&#xff0c;没用其他东西&#…

详解基于罗德里格斯(Rodrigues)公式由旋转向量到旋转矩阵的 Python 实现

文章目录 旋转向量 rotation vector旋转矩阵 rotation matrix罗德里格斯公式 Rodrigues formula基于 Python 和 NumPy 实现 Rodrigues 公式 旋转向量 rotation vector 任何一个旋转都可以通过一个 旋转轴 加一个 旋转角 进行描述, 即围绕 旋转轴 旋转一个 旋转角. 此时可以通过…

javascript 中的 URL 解码

文章目录 需要URL编解码JavaScript 中的 URL 解码使用 unescaped() 方法解码编码的 URL使用 decodeURI() 方法解码编码的 URL使用 decodeURIComponent() 方法解码编码的 URL 总结 本文着眼于 URL 解码以及如何使用 JavaScript 对编码的 URL 进行解码。 需要URL编解码 URL 应具…

政企HTTPS加密国产化替代的四要素

信创产业是数字经济、信息安全发展的基础&#xff0c;也是“新基建”的重要内容&#xff0c;将成为拉动中国经济增长的重要抓手之一。随着国资委79号文的发布&#xff0c;国央企落实信息化系统的信创国产化改造的步伐加快&#xff0c;贯彻“28N”战略&#xff0c;从党政机关扩展…

Doris学习笔记

1.数据模型 数据模型 - Apache Doris 1.1 Aggregate 模型(聚合&#xff09; 可以发现&#xff0c;user_id、date、age ...等没有设置 AggregationType, 那么这几个字段就成了一个key了。设置了 AggregationType 字段&#xff0c;说明该列的属性已经成value了。 我们导入一张…

Linux·Binder机制原理

目录 前言 目录 1. Binder到底是什么&#xff1f; 2. 知识储备 2.1 进程空间划分 2.2 进程隔离 & 跨进程通信&#xff08; IPC &#xff09; 2.5 内存映射 3. Binder 跨进程通信机制 模型 3.1 模型原理图 3.3 模型原理步骤说明 3.4 额外说明 4. Binder机制 在An…

自学黑客(网络安全),一般人我劝你还是算了

写在开篇 笔者本人 17 年就读于一所普通的本科学校&#xff0c;20 年 6 月在三年经验的时候顺利通过校招实习面试进入大厂&#xff0c;现就职于某大厂安全联合实验室。 我为啥说自学黑客&#xff0c;一般人我还是劝你算了吧&#xff01;因为我就是那个不一般的人。 ​ 首先我…