16.RocketMQ之消费重试以及原理

news2024/10/5 19:44:27

highlight: arduino-light

1.4 消费重试

image.png

对于顺序消息,当消费者消费消息失败后,消费者会在本地自动不断进行消息重试,每次间隔时间为 1 秒,重试最大值是 Integer.MAX_VALUE。

对于无序消息(普通、定时、延时、事务消息)当消费者消费消息失败时可以通过设置返回状态达到重试的目的。

广播模式下消息队列RocketMQ保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。

1.4.1顺序消息自动重试:需要避免阻塞

当消费者消费消息失败后,消费者会在本地自动不断进行消息重试,每次间隔时间为 1 秒,重试最大值是 Integer.MAX_VALUE。这时,应用会出现消息消费被阻塞的情况。

因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

1.4.2无序消息重试:广播模式不会失败重投

无序消息的重试只针对集群消费方式生效。

广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

广播模式下消息队列 RocketMQ保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况,可以自己做一些重试的操作比如消费失败做持久化,使用定时器重新重试数据库中持久化的数据。

不会失败重投的原因是一旦消费失败做了消费重投,所有的消费者都需要再消费一次!

1.4.3无序消息重试:集群模式需要返回消费失败状态

对于无序消息(普通、定时、延时、事务消息)当消费者消费消息失败时可以通过设置返回状态达到重试的目的。

无序消息的重试只针对集群消费方式生效。

广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

默认消息消费重试次数:16

默认最大重试次数为16,所以一个消息最多可以消费17次。

为什么是16?

默认设置延迟级别范围为3到18。因为18个等级,从3开始重试,18-3+1 =15。

消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:

| 第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 | | ----- | ---------- | ----- | ---------- | | 1 | 10 秒 | 9 | 7 分钟 | | 2 | 30 秒 | 10 | 8 分钟 | | 3 | 1 分钟 | 11 | 9 分钟 | | 4 | 2 分钟 | 12 | 10 分钟 | | 5 | 3 分钟 | 13 | 20 分钟 | | 6 | 4 分钟 | 14 | 30 分钟 | | 7 | 5 分钟 | 15 | 1 小时 | | 8 | 6 分钟 | 16 | 2 小时 |

如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。

注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。

自定义消息最大重试次数

消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:

  • 最大重试次数小于等于16 次,则重试时间间隔同上表描述。
  • 最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。

java Properties properties = new Properties(); //配置对应 Group ID 的最大消息重试次数为 20 次 properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); Consumer consumer =ONSFactory.createConsumer(properties);

注意:

  • 消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。
  • 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。
  • 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置

获取消息重试次数

消费者收到消息后,可按照如下方式获取消息的重试次数:

java public class MessageListenerImpl implements MessageListener {    @Override    public Action consume(Message message, ConsumeContext context) {        //获取消息的重试次数        System.out.println(message.getReconsumeTimes());        return Action.CommitMessage;   } }

消费失败后,重试配置方式

集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

  • 返回 Action.ReconsumeLater (推荐)
  • 返回 Null
  • 抛出异常

java public class MessageListenerImpl implements MessageListener {    @Override    public Action consume(Message message, ConsumeContext context) {        //处理消息        doConsumeMessage(message);        //方式1:返回 Action.ReconsumeLater,消息将重试        return Action.ReconsumeLater;        //方式2:返回 null,消息将重试        return null;        //方式3:直接抛出异常, 消息将重试        throw new RuntimeException("Consumer Message exceotion");   } }

消费失败后,不重试配置方式

集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 Action.CommitMessage,此后这条消息将不会再重试。

java public class MessageListenerImpl implements MessageListener {    @Override    public Action consume(Message message, ConsumeContext context) {        try {            doConsumeMessage(message);       } catch (Throwable e) {            //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;            return Action.CommitMessage;       }        //消息处理正常,直接返回 Action.CommitMessage;        return Action.CommitMessage;   } }

当然 我们也可以根据

重试原理

发送到重试消费Topic 是%RETRY% + 消费组名 注意是消费组名

我们思考一下为什么是消费者组名而不是topic的名字?

A消费者组消费成功 B消费者组消费失败

如果发回原topic就有问题了,A又会消费一次。

消息消费失败会设置DelayLevel并发回重试主题,重试主题 是%RETRY% + 消费组名,发回失败会立刻重试1次。

发回重试主题时,DelayTimeLeve从3开始

我们知道RocketMQ的默认的配置是messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,分别代表延迟level1-level18,为什么不是从1开始呢?

newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());//默认的重试次数是0

所以默认重试时,实际是从3开始的。也就是第一次重试的时间间隔是10秒,这也解释了为什么是16次!3-18可不就是16次!

SCHEDULETOPICXXXX的队列名称是从2开始到17,对应的delayLevel为3到18,3对应10s,18对应2h,在类MessageStoreConfig中这样定义延时时间:

String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"。

延迟消息对应的Topic是SCHEDULETOPICXXXX,注意就是SCHEDULETOPICXXXX,XXXX不是某某某的意思。

image.png

SCHEDULETOPICXXXX 介绍 SCHEDULETOPICXXXX 是 RocketMQ 一个系统类型的 Topic,用于标识延时消息。

这个 Topic 有 18 个队列,分别唯一对应着 RocketMQ 的 18 个延时等级,对应关系为:queueId = delayTimeLevel – 1。

ScheduleMessageService 介绍 这是 Broker 中的一个延时服务,专门消费 Topic 为 SCHEDULETOPICXXXX 的延时消息,并将其投递到目标 Topic 中。

ScheduleMessageService 在启动时,会创建一个定时器 Timer,并根据延迟级别的个数,启动对应数量的 TimerTask,每个TimerTask负责一个延迟级别的消费与投递。

SCHEDULETOPICXXXX的队列名称是从2开始到17,对应的delayLevel为3到18,3对应10s,18对应2h,在类MessageStoreConfig中这样定义延时时间:

String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"。

SCHEDULETOPICXXXX这个topic只对内部使用,对于consumer只能消费到自己所在的消费者组的重试topic的数据。

比如A是OrderConsumer组的一个消费者,OrderConsumer消费的主题Topic是Order。

那么消费者A在启动的时候,会订阅Order主题的同时,还会订阅%RETRY%_OrderConsumer。

即订阅Order主题的同时,还会订阅%RETRY%+消费者组的主题。

consumer消费失败的消息发回broker后总是先写到SCHEDULETOPICXXXX里面,然后schedule service在延迟时间到了以后会读取SCHEDULETOPICXXXX里面的数据然后重新发回到重试主题,consumer订阅了重试主题,所以会重新消费失败的数据,这样就完成了一个循环。

rocketmq 先将不同延时等级的消息存入内部对应延时队列中,然后不断的从延时队列中拉取消息判断是否到期,然后进行投递到对应的topic中。

从这个过程也能看到,一个消费失败的消息体每次发回broker需要在commitLog里面存储两份。

topic为SCHEDULETOPICXXXX的一份这个主要是为schedule service控制延时用的。

topic为%RETRY%groupName的一份。

参考链接:https://blog.csdn.net/gelald/article/details/126949527

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/692576.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

湿地环境监测物联网解决方案

湿地作为生态系统的关键组成部分,发挥着涵养水源、调节气候、改善环境、维护生物多样性等生态功能。湿地提供了独特的生境,为许多鸟类、鱼类和其他野生动物提供了栖息地和食物来源。此外,湿地还具有保持水量平衡和水质净化的重要功能&#xf…

最优控制:代数黎卡提方程ARE(Algebraic Riccati Equation)

本文介绍代数黎卡提方程的Matlab解法,包括直接求解和迭代求解 问题描述: 一、数值解法 可以看出,ARE方程是关于P的一个非线性方程,当系统矩阵维度较高时,难以求解,但是MATLAB给出了求解ARE的函数care % 系…

颜色聚合向量 Color Co-ccurrence Vector 介绍以及MATLAB代码实现

这件事情的起因是我想复习一下我在亚太杯数学建模当中使用过的CCV这种方法,但是CSDN平台上找了半天都没有,所以后来决定Google一下,终于找到了,甚至还有实现的代码,因此放上来。原文见Dr. Mahmoud Attia的博客。 一、…

JAVA中的伪共享与缓存行

一.伪共享与缓存行 1.CPU缓存架构 CPU 是计算机的心脏,所有运算和程序最终都要由它来执行。 主内存(RAM)是数据存放的地方,CPU 和主内存之间有好几级缓存,因为即使直接访问主内存也是非常慢的。 CPU的速度要远远大…

一图看懂CodeArts Board 5大特性,带你玩转看板服务

华为云看板服务CodeArts Board,通过构建研发效能度量体系,实现软件研发过程的可视化、软件交付的可管理可跟踪可量化,及时识别研发过程的堵塞点和改进点,通过数据驱动运营和治理,不断提升企业的软件能力和研发效能。

详解JAVA序列化

目录 1.什么是序列化 2.JAVA中的序列化 2.1.成员变量必须可序列化 2.2.transient关键字,可避免被序列化 2.3.无法更新状态 2.4.serialVersionUID 3.JDK序列化算法 4.序列化在实际中的一些应用 1.什么是序列化 序列化就是将对象转换为二进制格式的过程。对象…

Maven安装和配置详细教程

Maven安装和配置详细教程 1、Maven简介 Maven 是 Apache 软件基金会的一个开源项目,是一个优秀的项目构建工具,它用来帮助开发者管理项目中的 jar,以及 jar 之间的依赖关系、完成项目的编译、测试、打包和发布等工作。 2、Maven下载 点击Maven下载官方地址下载Maven。或者去…

postman持续集成-Jenkins自动构建

自动构建,就是设置一个定时器,定时时间到, Jenkins 自动执行测试用例 比如说,我设置下午五点,那么jenkins就是自动执行命令,自动生成报告,后续还可加上邮箱,会把报告发至邮箱 1. Jenkins 首页,点击任务名:如&#xff…

数据库—关系代数

传统的集合运算 在数据库中的关系代数运算有以下三种基本运算 并交差 必须满足两个表之间的属性个数必须一样。(必须具有相容性) 投影与选择运算 投影:π L _L L​( R ) 解释->π是投影符号,L是R表中的属性列,从…

临时文件中转服务的搭建-chfs软件的使用

因为经常用到远程桌面连接,所以本地pc和远程pc间的文件传输一直是个经常遇到的问题,尝试过用vftp搭建ftp服务,但是该服务在许多vps上被禁用,且windows上使用也要进行设置,比较麻烦。所幸发现了ods-im/CuteHttpFileServ…

接口测试如何做?你真的会做吗?全网超全整理实战案例...

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 API测试的流程 准…

分布式事务Seate

一、Seata简介 1、Seata的核心组件 TC (Transaction Coordinator)事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚。TM (Transaction Manager)事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务。RM (Resourc…

2023下半年北京/上海/深圳软考(中/高级)认证招生

软考是全国计算机技术与软件专业技术资格(水平)考试(简称软考)项目,是由国家人力资源和社会保障部、工业和信息化部共同组织的国家级考试,既属于国家职业资格考试,又是职称资格考试。 系统集成…

Docker安装与启动

Docker安装与启动 文章目录 Docker安装与启动前言容器与虚拟机比较 1、安装Docker2、设置ustc的镜像3、Docker的启动与停止总结 前言 容器与虚拟机比较 虚拟机(VM)是计算机系统的仿真。简而言之,它可以在实际上是一台计算机的硬件上运行看起…

Docker教程

Docker 能解决的问题 ⾸先,我们先来看⼏个问题: 1. 合作开发的时候,在本机可以运⾏,在别⼈的电脑上跑不起来。 这⾥我们以 Java Web 应⽤程序为例,⼀个 Java Web 应⽤程序涉及很多东⻄,⽐如 JDK 、 Tomc…

机器学习 day21(Tensorflow代码实现)

Tensorflow代码实现 在tensorflow中训练神经网络模型的步骤:第一步:指定模型,并告诉tensorflow按何种方式计算。第二步:使用特定的损失函数编译模型。第三步:训练模型

企业知识管理要怎么做,才能清晰有序?

在当今快速变化的商业环境中,企业知识管理的重要性日益凸显。有效的知识管理可以帮助企业整理、保存和传递知识,提高员工的工作效率和创新能力,从而为企业获得竞争优势奠定基础。本文将介绍企业在进行知识管理时应采取的措施,以确…

微型导轨的使用寿命能达到多久?

微型导轨,顾名思义就是体积很小的导轨,一般是应用在小型化设备中的,像半导体设备,医疗设备,IC制造设备,X-Y table,精密测量及检测仪器,高速皮带驱动设备,高速移载设备等都…

传说中,让测试猿分分钟心酸的五大谣言

谣传1:测试无聊 综观现今软件测试的一些轶事,我对某些错误想法的频繁出现感到吃惊。尽管有很多可以罗列,但是我还是想分享测试的五个最常见的谣传(基于我短暂的经验)。我发现前三个盛行于一些主流的新闻文章&#xff…

STM32F407系统时钟的配置和查看方法

1、系统时钟的来源 STM32F407具有两个PLL,用于产生不同的时钟信号。这里主要来讨论主PLL时钟。主PLL时钟的时钟源有两个信号,分别是上边提到的HIS信号和HSE信号。PLL通过把这两个信号倍频,分频等达到更高频率的时钟信号。一般来说&#xff0…