17.RocketMQ之死信队列

news2024/11/23 8:13:36

highlight: arduino-light

1.5 死信队列

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。对应的源码如下:

java public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { ​    public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";    public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%"; ​    private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,                                      RemotingCommand request,                                      MessageExt msg, TopicConfig topicConfig) {        // 获取topic进行判断逻辑        String newTopic = requestHeader.getTopic();        // 重试队列:%RETRY%+consumerGroup        // 如果是重试队列才会进入判断        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {            //获取消费者组            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());            SubscriptionGroupConfig subscriptionGroupConfig =                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);            if (null == subscriptionGroupConfig) {                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);                response.setRemark(                    "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));                return false;           }            // 获取最大重试次数            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();           }            // 获取消费次数            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();            // 超过最大重试次数之后发送到死信队列            if (reconsumeTimes >= maxReconsumeTimes) {                // 死信队列:%DLQ%+consumerGroup                newTopic = MixAll.getDLQTopic(groupName);                                int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;                topicConfig = this.brokerController.getTopicConfigManager()                   .createTopicInSendMessageBackMethod(newTopic,                                                                      DLQ_NUMS_PER_GROUP,                                                                      PermName.PERM_WRITE, 0                                                                     );                msg.setTopic(newTopic);                msg.setQueueId(queueIdInt);                if (null == topicConfig) {                    response.setCode(ResponseCode.SYSTEM_ERROR);                    response.setRemark("topic[" + newTopic + "] not exist");                    return false;               }           }       }        int sysFlag = requestHeader.getSysFlag();        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;       }        msg.setSysFlag(sysFlag);        return true;   } }

在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

1.5.1 死信特性

image.png

死信消息具有以下特性

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

死信队列具有以下特性:

  • 一个死信队列对应一个消费者Group, 而不是对应单个消费者实例。
  • 如果一个消费者Group未产生死信消息,消息队列RocketMQ 不会为其创建相应的死信队列。
  • 一个死信队列包含了对应消费者Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
  • 死信队列的topic是在消息发送过程中判断对应的topic是否存在,不存在就动态进行创建
  • 重试队列是以%RETRY%+consumerGroup作为TOPIC。
  • 死信队列是以%DLQ%+consumerGroup作为TOPIC。
  • 死信队列对应Topic的权限为2,只有写权限,所以死信队列没办法读取。需要设置权限为6[2:W; 4:R; 6:RW]

image.png

1.5.2 查看死信信息

  1. 在控制台查询出现死信队列的主题信息

image.png

  1. 在消息界面根据主题查询死信消息

image.png

  1. 选择重新发送消息

一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。

1.5.3处理死信队列

1.订阅并消费死信队列

```java public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstartconsumerdlq"); consumer.setNamesrvAddr("localhost:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUMEFROMFIRSTOFFSET); // 订阅死信队列 consumer.subscribe("%DLQ%quickstartconsumer_dlq", "*"); consumer.setMaxReconsumeTimes(1); consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

} ```

2.重试次数判断,持久化重试

重试次数到达一定次数,可能程序没问题,只是数据有问题,此时无论重试多少次,最终的结果都是一样的,所以此时可以直接返回成功!

由我们自己的通用定时任务重试,消息中注意需要定义1个bizType。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/702309.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

flutter 简介 flutter 能为我们做什么

flutter 简介 flutter 能为我们做什么 前言一、什么是Flutter&#xff1f;二、Flutter的特点和优势三、Flutter与其他跨平台框架的比较总结 前言 陆陆续续已经写了60多篇的flutter 的文章了&#xff0c;本篇文章就来说说我对flutter 的简单看法 一、什么是Flutter&#xff1f…

【Rust】安装

文章目录 1.官网下载2.安装3.安装验证4.打开本地文档5.安装插件6.HelloWorld①新建项目目录使用VSCode打开②新建rs文件③编译④运行 7.HelloCargo①新建项目目录使用VSCode打开②cargo build③cargo run④cargo check⑤为发布构建 8.更新与卸载 1.官网下载 官网地址&#xff…

c++11 标准模板(STL)(std::basic_ostream)(一)

定义于头文件 <ostream> template< class CharT, class Traits std::char_traits<CharT> > class basic_ostream : virtual public std::basic_ios<CharT, Traits> 类模板 basic_ostream 提供字符流上的高层输出操作。受支持操作包含有格式…

工业读码器在工业生产上应用的优势有哪些?

工业读码器是一种用于读取和解码条形码、二维码等信息的设备&#xff0c;一般广泛应用于工业生产中。可以辅助企业进行工业生产流程、物料等方面的管理。下面我们就一起来了解一下&#xff0c;工业读码器在工业生产上应用的优势有哪些&#xff1f; 工业读码器在工业生产上应用…

基于Java学生公寓管理系统设计实现(源码+lw+部署文档+讲解等)

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

华为OD机试真题 Python 实现【工作安排】【2023Q1 100分】

目录 一、题目描述二、输入描述三、输出描述四、解题思路五、Python算法源码六、效果展示1、输入2、输出3、说明 一、题目描述 小明每周上班都会拿着自己的工作清单&#xff0c;工作清单内包含n项工作&#xff0c;每项工作都有对应的耗时时长&#xff08;单位h&#xff09;和报…

回归预测 | MATLAB实现基于QPSO-BiLSTM、PSO-BiLSTM、BiLSTM多输入单输出回归预测

回归预测 | MATLAB实现基于QPSO-BiLSTM、PSO-BiLSTM、BiLSTM多输入单输出回归预测 目录 回归预测 | MATLAB实现基于QPSO-BiLSTM、PSO-BiLSTM、BiLSTM多输入单输出回归预测效果一览基本描述程序设计参考资料 效果一览 基本描述 1.Matlab实现QPSO-BiLSTM、PSO-BiLSTM、BiLSTM神经…

【综合布线技术】网络杂谈(17)之什么是综合布线系统

涉及知识点 什么是综合布线系统&#xff0c;综合布线的特点&#xff0c;综合布线的标准&#xff0c;综合布线 6 个子系统&#xff0c;综合布线系统的构成&#xff0c;深入了解综合布线技术。 原创于&#xff1a;CSDN博主-《拄杖盲学轻声码》&#xff0c;更多内容可去其主页关注…

chatgpt赋能python:Python量化指标库介绍

Python量化指标库介绍 Python是一种高级编程语言&#xff0c;因其简单易用、开源免费、生态环境完备等优点&#xff0c;已成为量化分析领域的首选编程语言之一。随着金融市场越来越复杂&#xff0c;金融量化分析的需求也日益增长。为了满足这一需求&#xff0c;Python量化指标…

【OJ比赛日历】快周末了,不来一场比赛吗? #07.01-07.07 #11场

CompHub[1] 实时聚合多平台的数据类(Kaggle、天池…)和OJ类(Leetcode、牛客…&#xff09;比赛。本账号会推送最新的比赛消息&#xff0c;欢迎关注&#xff01; 以下信息仅供参考&#xff0c;以比赛官网为准 目录 2023-07-01&#xff08;周六&#xff09; #3场比赛2023-07-02…

Yarn的实现原理

Yarn作为分布式集群的资源调度框架&#xff0c;它的出现伴随着Hadoop的发展&#xff0c;使Hadoop从一个单一的大数据计算引擎&#xff0c;成为一个集存储、计算、资源管理为一体的完整大数据平台&#xff0c;进而发展出自己的生态体系&#xff0c;成为大数据的代名词。 Yarn的发…

谷歌云:全面推出 AlloyDB for PostgreSQL 与数据库迁移服务

【本文由Cloud Ace 整理发布。Cloud Ace 是谷歌云全球战略合作伙伴&#xff0c;拥有 300 多名工程师&#xff0c;也是谷歌最高级别合作伙伴&#xff0c;多次获得 Google Cloud 合作伙伴奖。作为谷歌托管服务商&#xff0c;我们提供谷歌云、谷歌地图、谷歌办公套件、谷歌云认证培…

在Android Studio 中运行React Native 项目

项目根目录执行命令安装开发依赖 yarn检查项目SDK、NDK、JDK否配置正确 点击 Android Studio 里点击大象 全部下载完毕&#xff0c;点击运行按钮&#xff0c;编译项目 连接真机的两种方式 无线连接 adb devices adb tcpip 5555 #连接端口默认5555 adb connect 192.168.0…

低功耗测距语音方案,4路PWM调光,三合一语音芯片WTV890-B004

随着智能门锁市场的不断发展&#xff0c;人们对于智能化、便捷化的需求也越来越高。在这个背景下&#xff0c;深圳唯创知音研发出了三合一语音芯片——WTV890-B004。这款创新产品集低功耗红外测距、语音播放和4路PWM调光功能于一体&#xff0c;为您打造一个高效、智能化方案。 …

上传自己的npm依赖包

有时候我们需要对某个依赖包的源码进行修改进行使用&#xff0c;但我们又不能对已有的源码官网进行上传更新&#xff0c;这时&#xff0c;我们可以获取依赖包进行修改后&#xff0c;自行部署到https://npmjs.com中 1.官网https://npmjs.com中注册一个账号&#xff08;账号&…

微信小程序 editor图片上传到node服务器并展示在当前页面

前端 html <!-- 富文本 --><view class"container"><editor id"editor" ref"editor" :placeholderplaceholder input"onInput" ready"onEditorReady"></editor></view><view class…

pytorch 的matmult()函数详解

torch.matmul()也是一种类似于矩阵相乘操作的tensor连乘操作。但是它可以利用python中的广播机制&#xff0c;处理一些维度不同的tensor结构进行相乘操作。 matmul 就是矩阵求 叉乘 如果是二维矩阵&#xff0c;两个矩阵的大小应该为m*n &#xff0c;n*m。 一维向量的乘积&…

使用 .NET 开始 OpenAI Completions

作者&#xff1a;Luis Quintanilla 翻译&#xff1a;Alan Wang 排版&#xff1a;Alan Wang 欢迎来到有关 OpenAI 和 .NET 的博客系列&#xff01; 如果您是新来的&#xff0c;请查看我们的第一篇文章&#xff0c;我们在其中介绍了系列内容&#xff0c;并向您展示如何在 .NET 中…

vue3 elementplus table表格多行合计

表格底部如何多行合计 1.先在标签上定义合计方法 <el-table:data"data":summary-method"getSummaries":show-summary"true"selection-change"handleSelectionChange">2.文件头部引入h函数渲染多行div&#xff0c;BigNumber 防…

2023上半年软考系统分析师科目一整理-15

2023上半年软考系统分析师科目一整理-15 信息系统的性能评价指标是客观评价信息系统性能的依据&#xff0c;其中&#xff0c;&#xff08; &#xff09;是指系统在单位时间内处理请求的数量。 &#xff08;60&#xff09;A.系统响应时间 B.吞吐量 C.资源利用率 D.并发用户数 吞…