RocketMQ 消费重试

news2025/1/25 4:40:15

消费者出现异常,消费某条消息失败时, Apache RocketMQ 会根据消费重试策略重新投递该消息进行故障恢复。本文介绍消费重试机制的原理、版本兼容性和使用建议。

一、应用场景​

Apache RocketMQ 的消费重试主要解决的是业务处理逻辑失败导致的消费完整性问题,是一种为业务兜底的策略,不应该被用做业务流程控制。建议以下消费失败场景使用重试机制:

推荐使用消息重试场景如下:

  • 业务处理失败,且失败原因跟当前的消息内容相关,比如该消息对应的事务状态还未获取到,预期一段时间后可执行成功。

  • 消费失败的原因不会导致连续性,即当前消息消费失败是一个小概率事件,不是常态化的失败,后面的消息大概率会消费成功。此时可以对当前消息进行重试,避免进程阻塞。

典型错误使用场景如下:

  • 消费处理逻辑中使用消费失败来做条件判断的结果分流,是不合理的,因为处理逻辑已经预见了一定会大量出现该判断分支。

  • 消费处理中使用消费失败来做处理速率限流,是不合理的。限流的目的是将超出流量的消息暂时堆积在队列中达到削峰的作用,而不是让消息进入重试链路。

二、应用目的​

消息中间件做异步解耦时的一个典型问题是如果下游服务处理消息事件失败,如何保证整个调用链路的完整性。Apache RocketMQ 作为金融级的可靠业务消息中间件,在消息投递处理机制的设计上天然支持可靠传输策略,通过完整的确认和重试机制保证每条消息都按照业务的预期被处理。

了解 Apache RocketMQ 的消息确认机制以及消费重试策略可以帮助您分析如下问题:

  • 如何保证业务完整处理消息:了解消费重试策略,可以在设计实现消费者逻辑时保证每条消息处理的完整性,避免部分消息出现异常时被忽略,导致业务状态不一致。

  • 系统异常时处理中的消息状态如何恢复:帮助您了解当系统出现异常(宕机故障)等场景时,处理中的消息状态如何恢复,是否会出现状态不一致。

三、消费重试策略概述​

消费重试指的是,消费者在消费某条消息失败后,Apache RocketMQ 服务端会根据重试策略重新消费该消息,超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中。

消息重试的触发条件

  • 消费失败,包括消费者返回消息失败状态标识或抛出非预期异常。

  • 消息处理超时,包括在PushConsumer中排队超时。

消息重试策略主要行为

  • 重试过程状态机:控制消息在重试流程中的状态和变化逻辑。

  • 重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间。

  • 最大重试次数:消息可被重试消费的最大次数。

消息重试策略差异

根据消费者类型不同,消息重试策略的具体内部机制和设置方法有所不同,具体差异如下:

消费者类型重试过程状态机重试间隔最大重试次数
PushConsumer已就绪 处理中 待重试 提交 * 死信消费者分组创建时元数据控制。 无序消息:阶梯间隔 顺序消息:固定间隔时间消费者分组创建时的元数据控制。
SimpleConsumer已就绪 处理中 提交 死信通过API修改获取消息时的不可见时间。消费者分组创建时的元数据控制。

具体的重试策略,请参见下文PushConsumer消费重试策略和SimpleConsumer消费重试策略。

四、PushConsumer消费重试策略​

重试状态机

PushConsumer消费消息时,消息的几个主要状态如下:

Push消费状态机

  • Ready:已就绪状态。消息在Apache RocketMQ服务端已就绪,可以被消费者消费。

  • Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。

  • WaitingRetry:待重试状态,PushConsumer独有的状态。当消费者消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。

  • Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。

  • DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。

消息重试过程中,每次重试消息状态都会经过已就绪>处理中>待重试的变化,两次消费间的间隔时间实际由消费耗时及重试间隔控制,消费耗时的最大上限受服务端系统参数控制,一般不应该超过上限时间。

消息间隔时间

最大重试次数

PushConsumer的最大重试次数由消费者分组创建时的元数据控制,具体参数,请参见消费者分组。

例如,最大重试次数为3次,则该消息最多可被投递4次,1次为原始消息,3次为重试投递次数。

重试间隔时间

  • 无序消息(非顺序消息):重试间隔为阶梯时间,具体时间如下:

    第几次重试与上次重试的间隔时间第几次重试与上次重试的间隔时间
    110秒97分钟
    230秒108分钟
    31分钟119分钟
    42分钟1210分钟
    53分钟1320分钟
    64分钟1430分钟
    75分钟151小时
    86分钟162小时

信息

若重试次数超过16次,后面每次重试间隔都为2小时。

  • 顺序消息:重试间隔为固定时间,具体取值,请参见参数限制。

使用示例

PushConsumer触发消息重试只需要返回消费失败的状态码即可,当出现非预期的异常时,也会被SDK捕获。

SimpleConsumer simpleConsumer = null;
        //消费示例:使用PushConsumer消费普通消息,如果消费失败返回错误,即可触发重试。
        MessageListener messageListener = new MessageListener() {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                System.out.println(messageView);
                //返回消费失败,会自动重试,直至到达最大重试次数。
                return ConsumeResult.FAILURE;
            }
        };
            

五、SimpleConsumer消费重试策略​

重试状态机

SimpleConsumer消费消息时,消息的几个主要状态如下:

SimpleConsumer状态机

  • Ready:已就绪状态。消息在Apache RocketMQ服务端已就绪,可以被消费者消费。

  • Inflight:处理中状态。消息被消费者客户端获取,处于消费中还未返回消费结果的状态。

  • Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。

  • DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。

和PushConsumer消费重试策略不同的是,SimpleConsumer消费者的重试间隔是预分配的,每次获取消息消费者会在调用API时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。 

simpleconsumer重试

由于不可见时间为预分配的,可能和实际业务中的消息处理时间差别较大,您可以通过API接口修改不可见时间。

例如,您预设消息处理耗时最多20 ms,但实际业务中20 ms内消息处理不完,您可以修改消息不可见时间,延长消息处理时间,避免消息触发重试机制。

修改消息不可见时间需要满足以下条件:

  • 消息处理未超时

  • 消息处理未提交消费状态

如下图所示,消息不可见时间修改后立即生效,即从调用API时刻开始,重新计算消息不可见时间。 

修改不可见时间

最大重试次数

SimpleConsumer的最大重试次数由消费者分组创建时的元数据控制,具体参数,请参见消费者分组。

消息重试间隔

消息重试间隔=不可见时间-消息实际处理时长

SimpleConsumer 的消费重试间隔通过消息的不可见时间控制。例如,消息不可见时间为30 ms,实际消息处理用了10 ms就返回失败响应,则距下次消息重试还需要20 ms,此时的消息重试间隔即为20 ms;若直到30 ms消息还未处理完成且未返回结果,则消息超时,立即重试,此时重试间隔即为0 ms。

使用示例

SimpleConsumer 触发消息重试只需要等待即可。

 //消费示例:使用SimpleConsumer消费普通消息,如果希望重试,只需要静默等待超时即可,服务端会自动重试。
        List<MessageView> messageViewList = null;
        try {
            messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
            messageViewList.forEach(messageView -> {
                System.out.println(messageView);
                //如果处理失败,希望服务端重试,只需要忽略即可,等待消息再次可见后即可重试获取。
            });
        } catch (ClientException e) {
            //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
            e.printStackTrace();
        }

六、使用建议​

合理重试,避免因限流等诉求触发消费重试

上文应用场景中提到,消息重试适用业务处理失败且当前消费为小概率事件的场景,不适合在连续性失败的场景下使用,例如消费限流场景。

  • 错误示例:如果当前消费速度过高触发限流,则返回消费失败,等待下次重新消费。

  • 正确示例:如果当前消费速度过高触发限流,则延迟获取消息,稍后再消费。

合理控制重试次数,避免无限重试

虽然Apache RocketMQ支持自定义消费重试次数,但是建议通过减少重试次数+延长重试间隔来降低系统压力,避免出现无限重试或大量重试的情况。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1372890.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Web开发SpringBoot SpringMVC Spring的学习笔记(包含开发常用工具类)

开发框架学习笔记 一.Spring SpringMVC SpringBoot三者的联系SpringMVC工作原理 二.SpringBoot的学习2.1 注解2.1.1 SpringBoot的核心注解2.1.2 配置导入注解(简化Spring配置写XML的痛苦)Configuration和Bean(人为注册Spring 的 Bean)Import(补)ImportResource(补)AutowiredQua…

盖子的c++小课堂——第二十四讲:差分数组

前言 嗨嗨嗨&#xff0c;这里是盖子的小课堂哟&#xff0c;这次更新主要是因为快放假了&#xff0c;时间多了&#xff0c;好嘞&#xff0c;废话不多说&#xff0c;点赞评论拿来吧你~ 差分数组 一维差分数组 假设给你一个数组 nums &#xff0c;先对区间 [a,b] 中每个元素加…

深兰科技AI医疗健康产品获3000台采购订单

12月6日&#xff0c;武汉某企业与深兰科技签署协议&#xff0c;一次性采购3000台深兰科技AI生理健康检测仪——扁鹊。 深兰科技AI生理健康检测仪——扁鹊是深兰科技推出的人体生理指标检测产品。基于AI生物技术、融合互联网医疗及AIoT技术&#xff0c;深兰科技AI生理健康检测仪…

STM32入门教程-2023版【3-4】按键控制制LED

关注 点赞 不错过精彩内容 大家好&#xff0c;我是硬核王同学&#xff0c;最近在做免费的嵌入式知识分享&#xff0c;帮助对嵌入式感兴趣的同学学习嵌入式、做项目、找工作! 这篇文章以项目代码的形式实现GPIO输入 一、按键控制LED &#xff08;1&#xff09;搭建面包板电…

Java中输入和输出处理(三)二进制篇

叮咚&#xff01;加油&#xff01;马上学完 读写二进制文件Data DataInputStream类 FilFeInputStream的子类 与FileInputStream类结合使用读取二进制文件 DataOutputStream类 FileOutputStream的子类 与FileOutputStream类结合使用写二进制文件 读写二进制代码 package 面…

国产AI工具钉钉AI助理:开启个性化助手服务的新篇章

钉钉AI助理是钉钉平台的一项功能&#xff0c;它可以根据用户的需求提供个性化的AI助手服务。用户可以在AI助理页面一键创建个性化的AI助理&#xff0c;如个人的工作AI助理、旅游AI助理、资讯AI助理等。企业也可以充分使用企业所沉淀的知识库和业务数据&#xff0c;在获得授权后…

文字转语音在线合成系统源码 附带完整的安装部署教程

现如今&#xff0c;文字转语音&#xff08;TTS&#xff09;技术逐渐成为人们获取信息的重要手段之一。然而&#xff0c;市面上的TTS工具大多需要下载安装&#xff0c;且功能较为单一&#xff0c;无法满足用户多样化的需求。因此&#xff0c;开发一款功能强大、易于部署的文字转…

共享wifi项目如何加盟?

共享wifi贴项目如何加盟呢&#xff1f;具体的途径在哪里&#xff0c;费用是多少呢&#xff1f;今天小编就来一次性同你讲清楚。 我们先来讲一下共享wifi贴的加盟方法。 首先&#xff0c;找到共享wifi的官方渠道在点击右上角&#xff0c;根据页面上的信息填写资料。 然后&…

长期使用外接键盘,外物压着自带键盘,容易导致华硕飞行堡垒FX53VD键盘全部失灵【除电源键】

华硕飞行堡垒FX53VD键盘全部失灵【除电源键】 前言一、故障排查二、发现问题三、使用方法总结 前言 版本型号&#xff1a; 型号 ASUS FX53VD&#xff08;华硕-飞行堡垒&#xff09; 板号&#xff1a;GL553VD 故障情况描述&#xff1a; 键盘无法使用&#xff0c;键盘除开机键外…

每日学习更新(LQR+iLQR)

一直想更新一下根据cost to go来推导LQR&#xff0c;之前的话可能会直接套问题&#xff0c;但是对于理论有些困惑&#xff0c;正好最近在学习ilqr轨迹生成/优化&#xff0c;因此来推一下公式&#xff0c;以下参考B站Dr_CAN&#xff0c;链接如下&#xff1a; 【最优控制】5_线性…

基于arcgis的遥感深度学习数据集制作

由于很多时候&#xff0c;我们在研究过程中往往需要根据实际情况使用自己的影像数据来提取目标物&#xff0c;如果没有合适的公开数据集的话&#xff0c;为了满足实际需要&#xff0c;我们就需要制作符合自己要求的数据集。 今天我们就根据实际情况来详细讲解如何利用arcgis&am…

视频分割软件,视频批量分割,轻松搞定

有时候&#xff0c;为了特定的需求&#xff0c;我们需要对视频进行分割&#xff0c;传统的方法是使用视频编辑软件&#xff0c;逐个进行操作&#xff0c;费时又费力。那么&#xff0c;有没有一种方法可以批量分割视频&#xff0c;轻松高效地完成任务呢&#xff1f;这个确实是有…

虚拟机安装intel架构的银河麒麟V10(SP1)

一 背景 银河麒麟是国产操作系统之一&#xff0c;是基于Linux内核的桌面操作系统&#xff0c;有自己的应用中心&#xff0c;具有一定的生态系统。今从官网下载了V10&#xff08;SP1&#xff09;镜像文件&#xff0c;在Windowns的VMware虚拟机上安装试用。 官网&#xff1a;http…

JMeter之Windows安装

JMeter之Windows安装 一、安装JDK二、安装JMeter1、下载JMeter2、配置环境变量3、验证JMeter 三、扩展知识1、汉化 一、安装JDK 略 二、安装JMeter 1、下载JMeter 官网地址&#xff1a;https://jmeter.apache.org/download_jmeter.cgi 放到本地目录下 2、配置环境变量 变量…

RTX20系开启超分辨率

我的显卡是2070s现在也支持了超分辨率,根据网上的教程一通折腾后发现了不少的坑,记一下.希望有缘人可以少走点弯路. 机器配置如下: [CPU] CPU #1: 3600MHz, AMD Ryzen 7 3700X 8-Core Processor , AMD64 Family 23 Model 113 Stepping Instruction set: MMX MMX…

Camunda外部任务

外部任务&#xff1a;即任务可以在外部系统进行审批。 一&#xff1a;pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instan…

大模型LLM在 Text2SQL 上的应用实践

一、前言 目前&#xff0c;大模型的一个热门应用方向Text2SQL&#xff0c;它可以帮助用户快速生成想要查询的SQL语句&#xff0c;再结合可视化技术可以降低使用数据的门槛&#xff0c;更便捷的支持决策。本文将从以下四个方面介绍LLM在Text2SQL应用上的基础实践。 Text2SQL概…

22、Kubernetes核心技术 - 整合Rancher通过界面管理k8s集群

目录 一、概述 二、Rancher API Server 的功能 2.1、授权和角色权限控制 2.2、使用 Kubernetes 的功能 2.3、配置云端基础信息 2.4、查看集群信息 三、Rancher 安装 3.1、前置环境 3.2、通过 Docker 来进行安装Rancher 3.3、在 Rancher 的界面上绑定k8s集群 3.4、在 …

redis 主从同步和故障切换的几个坑

数据不一致 当我们从节点读取一个数据时&#xff0c;和主节点读取的数据不一致&#xff0c;这是因为主从同步的命令是异步进行的&#xff0c;一般情况下是主从同步延迟导致的&#xff0c;为什么会延迟&#xff0c; 主要二个原因 1、网络状态不好 2、网络没问题&#xff0c;从节…

java 项目使用 activi 设计流程,流程线上设置条件表达式时出现以下错误

项目场景&#xff1a; 背景&#xff1a; java 项目使用 activi 设计流程&#xff0c;流程线上设置条件表达式后&#xff0c;保存时出现错误 问题描述 问题&#xff1a; java 项目使用 activi 设计流程&#xff0c;流程线上设置条件表达式后&#xff0c;保存时出现以下错误&…