【RocketMQ】消费失败重试与死信消息

news2024/9/29 23:32:56

🎯 导读:本文档详细介绍了RocketMQ中的重试机制与死信消息处理方法。对于生产者而言,文档提供了如何配置重试次数的具体示例;而对于消费者,它解释了默认情况下消息消费失败后的重试策略,并展示了如何通过代码自定义重试次数。当消息经过多次重试仍无法成功消费时,RocketMQ会将其标记为死信消息,并存入特定的死信队列中。文档还提供了处理死信队列的两种策略:一种是编写专门的消费者来处理这些消息,另一种是在达到一定重试次数后签收消息并通知人工干预。此外,还包括了关于死信消息生产和消费的基本示例代码。

文章目录

  • RocketMQ 重试机制
    • 生产者重试
    • 消费者重试
  • RocketMQ 死信消息
    • 消息生产者
    • 消息消费者
    • 死信消费者
    • 控制台显示

RocketMQ 重试机制

生产者重试

// 失败的情况重发3次(同步)
producer.setRetryTimesWhenSendFailed(3);
// 失败的情况重发3次(异步)
producer.setRetryTimesWhenSendAsyncFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);

【示例代码】

@Test
public void retryProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    // 如果发送失败要重试几次(同步),不设置默认值是2
    producer.setRetryTimesWhenSendFailed(3);
    // 如果发送失败要重试几次(异步)
//        producer.setRetryTimesWhenSendAsyncFailed(3);
    String key = UUID.randomUUID().toString();
    System.out.println(key);
    Message message = new Message("retryTopic", "vip1", key, "我是vip666的文章".getBytes());
    producer.send(message);
    System.out.println("发送成功");
    producer.shutdown();
}

消费者重试

如果消息消费失败,默认会重试16次,重试的时间间隔:10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

能否自定义重试次数?

可以,重试的次数一般设置为5

// 消费失败,重试几次
consumer.setMaxReconsumeTimes(5);

如果重试了16次(并发模式是16次,顺序模式下重试次数是 int 类型最大值) 都是失败的,怎么处理?

认为该消息是死信消息,将消息放在一个死信主题中去,名称:%DLQ%消费者组名,最后再实现一个消费者去消费死信消息,一般是发邮件发短信通知人工处理、做一些记录

在这里插入图片描述

死信队列只有一个队列

在这里插入图片描述

当消息处理失败的时候 该如何正确的处理?

方案一:处理死信队列,如果每个死信队列都写一个消费者,很麻烦

/**
 * 方案一
 * 死信队列消费者
 * @throws Exception
 */
@Test
public void retryDeadConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("%DLQ%retry-consumer-group", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = msgs.get(0);
            System.out.println(new Date());
            System.out.println(new String(messageExt.getBody()));
            System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
            // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

方案二:在实际生产过程中,一般重试3-5次,如果还没有消费成功,则可以把消息签收了,通知人工等处理

/**
 * 方案二
 * 重试次数较多,直接做日志记录、通知人工处理
 * @throws Exception
 */
@Test
public void retryConsumer2() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("retryTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            MessageExt messageExt = msgs.get(0);
            System.out.println(new Date());
            try {
                // 业务处理,模拟报错
                handleDb();
            } catch (Exception e) {
                // 重试
                int reconsumeTimes = messageExt.getReconsumeTimes();
                if (reconsumeTimes >= 3) {
                    // 重试次数太大,不要重试了
                    System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

private void handleDb() {
    int i = 10 / 0;
}

RocketMQ 死信消息

当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列

  • 当一条消息初次消费失败, RocketMQ 会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。
  • 如果产生了死信消息,对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。
    • 可以利用 RocketMQ Admin 工具或者 RocketMQ Dashboard 上查询到对应死信消息的信息。
    • 也可以监听死信队列,进行自己的业务上的逻辑,写日志、通知人工处理

消息生产者

@Test
public void testDeadMsgProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("dead-group");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    Message message = new Message("dead-topic", "我是一个死信消息".getBytes());
    producer.send(message);
    producer.shutdown();
}

消息消费者

@Test
public void testDeadMsgConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("dead-topic", "*");
    // 设置最大消费重试次数 2 次
    consumer.setMaxReconsumeTimes(2);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(msgs);
            // 测试消费失败
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
    consumer.start();
    System.in.read();
}

死信消费者

注意权限问题

在这里插入图片描述

@Test
public void testDeadMq() throws  Exception{
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
    consumer.setNamesrvAddr("localhost:9876");
    // 消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
    // 队列名称 默认是 %DLQ% + 消费者组名
    consumer.subscribe("%DLQ%dead-group", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(msgs);
            // 处理消息 签收了
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

控制台显示

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2178111.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32LL库之printf函数重定向

1. 加入以下代码 int fputc(int ch,FILE *f) {LL_USART_TransmitData8(USART1,ch);while(!LL_USART_IsActiveFlag_TXE(USART1));//需要等待发送完成return(ch); }记得添加 stdio.h 头文件 2. 在MDK中勾选&#xff1a;Use MicroLIB

C++【类和对象】(取地址运算符重载与实现Date类)

文章目录 取地址运算符重载const成员函数取地址运算符重载 Date类的实现Date.hDate.cpp1.检查日期合法性2. 构造函数/赋值运算符重载3.得到某月的天数4. Date类 - 天数的操作4.1 日期 天数4.2 日期 天数4.3 日期 - 天数4.4 日期 - 天数 5. Date的前后置/--5.1 前置5.2 后置5.…

学习鸿蒙HarmongOS(基础一)

最近听到一个朋友在干鸿蒙系统开发&#xff0c;于是我也来看看&#xff0c;我看到的第一感觉和前端TS好像&#xff0c;鸿蒙的是叫ArkTS&#xff0c;于是来看一下视频&#xff0c;学习了一下&#xff0c;我的随手笔记记录一下吧,方便我以后阅读 基本 语句 函数

unity3D雨雪等粒子特效不穿透房屋效果实现

做项目有时候会做天气模拟&#xff0c;模拟雨雪天气等等。但是容易忽略一个问题&#xff0c;就是房屋内不应该下雨或者下雪&#xff0c;这样不就穿帮了嘛。 下面就粒子穿透物体问题做一个demo。 正常下雨下雪在室内的话&#xff0c;你可以看到&#xff0c;粒子是穿透建筑的。…

【C++篇】启航——初识C++(上篇)

目录 引言 一、C的起源和发展史 1.起源 2.C版本更新 二、C在⼯作领域中的应⽤ 三、C入门建议 1.参考文档 2.推荐书籍 四、C的第一个程序 1.C语言写法 2.C写法 五、命名空间 1.为什么要有命名空间 2.定义命名空间 3.主要特点 4.使用示例 六、C输⼊&输出 …

C程序设计——结构化程序设计的三种结构

前面我说过&#xff1a;“结构化编程语言&#xff0c;用语法限制程序员&#xff0c;只能使用顺序、选择、循环三种结构来解决问题。” 接下来&#xff0c;就讲解这三种结构。 顺序结构 前面我讲过&#xff0c;C语言所有的程序&#xff0c;都必须有一个 main 函数&#xff0c…

TCP\IP标准与OSI标准

TCP/IP 模型和 OSI 模型都是用于描述网络体系结构的模型&#xff0c;但它们的设计理念和层次结构有所不同。TCP/IP 模型更注重实际实现&#xff0c;而 OSI 模型更注重抽象和标准化。 1. OSI 模型 (Open Systems Interconnection Model) OSI 模型是一个七层模型&#xff0c;从…

828华为云征文|部署在线论坛网站 Flarum

828华为云征文&#xff5c;部署在线论坛网站 Flarum 一、Flexus云服务器X实例介绍二、Flexus云服务器X实例配置2.1 重置密码2.2 服务器连接2.3 安全组配置2.4 Docker 环境搭建 三、Flexus云服务器X实例部署 Flarum3.1 Flarum 介绍3.2 Flarum 部署3.3 Flarum 使用 四、总结 一、…

针对考研的C语言学习(定制化快速掌握重点2)

1.C语言中字符与字符串的比较方法 在C语言中&#xff0c;单字符可以用进行比较也可以用 > , < ,但是字符串却不能用直接比较&#xff0c;需要用strcmp函数。 strcmp 函数的原型定义在 <string.h> 头文件中&#xff0c;其定义如下&#xff1a; int strcmp(const …

Vue.js组件开发指南

Vue.js组件开发指南 Vue.js 是一个渐进式的 JavaScript 框架&#xff0c;用于构建用户界面。它的核心是基于组件的开发模式。通过将页面分解为多个独立的、可复用的组件&#xff0c;开发者能够更轻松地构建复杂的应用。本文将深入探讨 Vue.js 组件开发的基础知识&#xff0c;并…

基于springoot新能源充电系统的设计与实现

新能源充电系统的设计与实现 摘 要 如今社会上各行各业&#xff0c;都喜欢用自己行业的专属软件工作&#xff0c;互联网发展到这个时候&#xff0c;人们已经发现离不开了互联网。新技术的产生&#xff0c;往往能解决一些老技术的弊端问题。因为传统新能源充电系统信息管理难度…

国产纯电SUV都在秀,只有Model Y在挨揍

文/王俣祺 导语&#xff1a;如果想知道纯电SUV应该怎么选&#xff0c;一定有人告诉你“无脑选Model Y”&#xff0c;虽说特斯拉确实粉丝多&#xff0c;但这也恰恰证明Model Y一度成为了纯电SUV的标杆。有标杆自然就有挑战者&#xff0c;随着阿维塔07、智己LS6以及乐道L60先后上…

云南省职业院校技能大赛赛项规程(软件测试)

赛项名称&#xff1a;软件测试 英文名称&#xff1a;Software Testing 赛项组别&#xff1a;高等职业教育 赛项编号&#xff1a;GZ034 目录 一、 赛项信息 二、竞赛目标 三、竞赛内容 1、本赛项考查的技术技能和涵盖的职业典型工作任务 2、专业核心能力与职业综合能力…

商标名称注册查询,到底是查询什么!

在商标注册前是需要商标名称注册查询&#xff0c;那这个到底是查询什么&#xff0c;普推知产商标老杨发现&#xff0c;近日国家知产局发布《商标代理委托合同示范文本》征求意见稿&#xff0c;虽然是参考使用不具有强制性&#xff0c;里面对商标名称注册查询描述是申请前商标检…

完成UI界面的绘制

绘制UI 接上文&#xff0c;在Order90Canvas下创建Image子物体&#xff0c;图片资源ui_fish_lv1&#xff0c;设置锚点&#xff08;CountdownPanelImg同理&#xff09;&#xff0c;命名为LvPanelImg,创建Text子物体&#xff0c;边框宽高各50&#xff0c; &#xff0c;重名为LvT…

阻焊层解析:PCB的“保护伞”是什么?

在电子制造行业中&#xff0c;尤其是PCBA贴片加工领域&#xff0c;阻焊层是一个重要的概念。以下是对阻焊层的详细讨论分析&#xff0c;包括其定义、作用以及类型。 阻焊层的定义 阻焊层&#xff0c;顾名思义&#xff0c;是一种用于阻止焊接的材料层。在PCB&#xff08;印刷电…

11.C++程序中的常用函数

我们将程序中反复执行的代码封装到一个代码块中&#xff0c;这个代码块就被称为函数&#xff0c;它类似于数学中的函数&#xff0c;在C程序中&#xff0c;有许多由编译器定义好的函数&#xff0c;供大家使用。下面就简单说一下&#xff0c;C中常用的函数。 1.sizeof sizeof函…

Perceptually Optimized Deep High-Dynamic-RangeImage Tone Mapping

Abstract 我们描述了一种深度高动态范围&#xff08;HDR&#xff09;图像色调映射算子&#xff0c;该算子计算效率高且感知优化。 我们首先将 HDR 图像分解为归一化拉普拉斯金字塔&#xff0c;并使用两个深度神经网络 (DNN) 根据归一化表示估计所需色调映射图像的拉普拉斯金字…

Mybatis缓存机制(图文并茂!)

目录 一级缓存 需求我们在一个测试中通过ID两次查询Monster表中的信息。 二级缓存 案例分许(和上述一样的需求) EhCache第三方缓存 在了解缓存机制之前&#xff0c;我们要先了解什么是缓存&#xff1a; ‌缓存是一种高速存储器&#xff0c;用于暂时存储访问频繁的数据&…