【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试死信队列幂等消息的出现以及处理

news2025/1/23 9:06:36

您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门

文章目录

    • 1. 消费重试
      • 1.1. 消费重试应用场景
      • 1.2. 消费重试的原理
        • 1.2.1. 消息重试触发的条件
      • 1.3. 消费重试次数
    • 2. 死信队列
    • 3. 消息幂等问题的出现
    • 4. 如何保证幂等性消费呢?

1. 消费重试

消息重试是指消费者在消费某条消息失败之后,RocketMQ服务端会根据重试策略重新消费该消息,若超过最大重试次数还未消费成功则不在进行重新消费,而是直接将该消息发送到死信队列中。

1.1. 消费重试应用场景

消息重试主要是为了解决偶发性的消息消费失败导致的消费完整性问题,这些消费失败的原因包括业务处理逻辑的问题,网络抖动问题。

消费重试应用场景主要有两个:

  1. 业务处理失败,且失败的原因跟当前的消息内容相关,比如该消息对应的事务状态还未获取到,预期一段时间后可执行成功。
  2. 消息失败的原因是偶发性的,比如由于网络抖动,消费者消费时宕机等偶发性的问题导致的失败,后续的消息大概率会消费成功。

不要把消息失败来作为条件判断的结果分流,也不要通过使用消息失败来对处理速率限流。

1.2. 消费重试的原理

消费重试的状态机如下图所示:会重试的消息可能会经历如下四种状态。

消息重试的原理

  1. Ready: 已就绪状态,消息在RocketMQ服务端中准备就绪,可以被消费者消费
  2. Inflight:处理中状态,消息正在被消费者获取,处于消费中还没返回消费结果的状态。
  3. Commit: 提交状态:消息被消费者消费成功,消费者返回成功响应,消息会结束重试。
  4. Wait Retry: 待重试状态:PushComsumer独有的状态,当消费者消费失败或者消费超时,会触发消费重试机制。如果当前重试次数未达到最大重试次数,则该消息会变成待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
  5. DLQ:死信队列:当消息消费失败,并且消费重试的次数超过最大重试次数(默认是16次)之后,RocketMQ服务端会结束该消息的重试,并且将该消息直接发送到死信队列中。
1.2.1. 消息重试触发的条件
  1. 消费失败:

当消息消费失败就会触发消费重试,即消费者没有向RocketMQ服务端返回offset的情况下都被认为是消费失败。都会触发消费重试。

对应的代码没有返回 CONSUME_SUCCESS 的状态是:

// 4.创建一个回调函数
		consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
			// 5.处理消息
			for (MessageExt msg : msgs) {
				System.out.println(msg);
				System.out.println("收到的消息内容:" + new String(msg.getBody()));
			}
			// 1. 消费监听返回null则会消费重试
		 	return null;
		    //2.消费监听返回RECONSUME_LATER也会消费重试
             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
		});
  1. 消息处理超时,包括在PushConsumer中排队超时。

1.3. 消费重试次数

RocketMQ 会为每个消费组都设置一个Topic名称为"%RETRY%+consumerGroup"的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费者组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而消费失败的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重试间隔,随着重试次数的增多,重试间隔也会越来越大。

RocketMQ对于重试消息的处理是先保存至Topic名为 “SCHEDULE_TOPIC_XXXX” 的延迟队列,后台定时任务按照对应的时间进行Delay后重新保存至 “%RETRY%+consumerGroup” 的重试队列中。

与延迟队列的设置相同,消息默认会重试16次,每次重试的时间间隔如下:

10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

image-20231010230137660

2. 死信队列

前面说某个消息被重试超过最大重试次数16次之后,则会被直接发送到死信队列中。也就是说死信队列用来存放的是无法被正常消费的消息。

RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message), 将存放死信消息的队列称为死信队列(Dead-Letter Queue)。可以使用Console控制台对死信队列里的消息进行重发来使得消费者可以进行重新消费。

死信队列具备如下特点:

  1. RocketMQ会自动为需要死信队列的消费者组创建死信队列。
  2. 死信队列与消费者组对应,死信队列中包含该消费者组所有相关Topic的死信消息。
  3. 死信队列中消息的有效期与正常消息相同,默认48小时。
  4. 若要消费死信队列中的消息,需要在控制台将死信队列的权限设置为6,即可读可写。

3. 消息幂等问题的出现

幂等的定义:幂等性指的是多次操作造成的结果是一致的。在http接口中查询操作是幂等的,

新增操作:非幂等的,每次都会插入新数据

更新操作:幂等的,对同样的数据进行修改

删除操作:根据id删除是幂等的。

那么,非幂等的操作如何保证幂等性呢?

消息队列中,很可能会存在一条消息被重复发送,或者一条消息被多个消费者消费。对于像用户注册等非幂等操作,就需要做幂等性保证。可以将情况概况为如下几种:

  1. 生产者重复发送消息:由于网络抖动,导致生产者没有收到broker的ack消息而重发消息,从而造成消息队列中消息重复。

  2. 消费者重复消费消息:由于网络抖动,导致消费者没有返回ack给broker,导致消费者重复消费。

  3. rebalance时的重复消费:由于网络抖动,在rebalance重分配时也可能会出现消费者重复消费某条消息的情况。

    image-20231011222658661

4. 如何保证幂等性消费呢?

  1. mysql 插入业务id作为主键,主键是唯一的,所以一次只能插入一条
  2. 使用Redis或zk的分布式锁(主流的方案)

比如在创建订单场景下,我们在发送消息的时候传入orderId作为业务唯一ID。当消息重复发送或者重复消息的时候可以根据订单ID 来做一个逻辑判断。

为了防止两个消费者同时消费相同重复消息的情况,这时候可以在OderId上加上分布式锁,保证同一时间内相同的消息只能有一个消费者消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1127157.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于拦截器Interceptor实现简易权限控制及行为记录功能

一、业务需求 使用拦截器(Interceptor),实现Controller中方法的权限控制,并记录访问行为。要求仅在Controller方法上加注解,就可以实现权限控制。具体为: 1、拦截未登录用户的访问; 2、拦截不具有权限用户的访问&#…

ASPICE标准快速掌握「3.1. 实践示例」

实践示例 本章内容是最重要的,建议慢下来跟着博主的思路一步一步前进 1. 示例背景说明 假设我们现在是一个Tier1的车窗控制软件开发商,我们给OEM提供软件解决方案 1.1. 本过程目标 根据客户、上级部门、安全团队与质量团队等提出的要求,本项目要求SYS.1过程达到ASPICE过…

【源码】C/C++学生信息管理系统 1024程序员节日快乐

文章目录 题目介绍源码效果展示报告内容 更多源码: 点我跳转目录 题目介绍 1024程序员节日快乐! 使用语言: 此源码包含两个版本: 版本1:C语言 版本2: C 代码量: 650 题目: 学生信息管理系统, …

JAVA-编程基础-11-04-java IO 字符流

Lison <dreamlison163.com>, v1.0.0, 2023.05.07 JAVA-编程基础-11-04-java IO 字符流 文章目录 JAVA-编程基础-11-04-java IO 字符流字符流Reader 和 Writer字符输入流&#xff08;Reader&#xff09;**FileReader构造方法****FileReader读取字符数据** 字符输出流&am…

解决样本不均衡问题

一、样本不均衡问题 样本&#xff08;类别&#xff09;不平衡指的是分类任务中不同类别的训练样例数目差别很大的情况&#xff0c;一般地&#xff0c;样本类别比例&#xff08;多数类VS少数类&#xff09;明显大于1&#xff1a;1&#xff08;例如4&#xff1a;1&#xff09;就…

软考信息安全工程师备考

软考信息安全工程师备考 报名 公告地址 通知是否有考试和考试安排的公告 https://www.ruankao.org.cn/arrange报名地址 https://bm.ruankao.org.cn/sign/welcome刷题 51cto刷题小程序 5cto题库点击进去选择信息安全工程师 可以刷选择题和大题 还可以刷真题非常好用 …

Mysql数据库指定某数据库或某表赋予增删改查操作权限各类划分权限的方法总结实战

一、mysql创建用户只赋予指定数据库的增删改查操作权限 在日常生产运维工作中&#xff0c;我们经常需要给其他厂商或者合作伙伴提供数据库的账号&#xff0c;并且需要指定某个用户只能查询指定的数据库&#xff0c;并且赋予增删改查的指定权限。 &#xff08;1&#xff09;创…

面试算法38:每日温度

题目 输入一个数组&#xff0c;它的每个数字是某天的温度。请计算每天需要等几天才会出现更高的温度。例如&#xff0c;如果输入数组[35&#xff0c;31&#xff0c;33&#xff0c;36&#xff0c;34]&#xff0c;那么输出为[3&#xff0c;1&#xff0c;1&#xff0c;0&#xff…

软硬件架构分层总结

一、前言 软件系统很多架构图我们经常看到是这样的三段 就是这三段就可以演化出很多层 二、硬件架构分层 硬件层&#xff0c;基本是计算机硬件的体系结构&#xff0c;包括硬盘设备&#xff0c;cpu&#xff0c;内存&#xff0c;控制器&#xff0c;运算器&#xff0c;寄存器&am…

清除excel中换行符方法

1、选择要删除或替换换行符的单元格。 2、按 Ctrl H 以打开“查找和替换”对话框。 3、在“查找内容”栏中输入Ctrl J 或 Ctrl Enter 这时会出现一个闪烁的小点。如下图所示&#xff0c;然后点击全部替换即可。

【机器学习合集】参数初始化合集 ->(个人学习记录笔记)

文章目录 综述1. 全零与随机初始化2. 标准初始化(固定方差)3. Xavier初始化(方差缩放)4. He初始化5. 正交初始化6. MSRA初始化 综述 这些是不同的权重初始化方法&#xff0c;用于初始化神经网络的权重参数。它们的主要区别在于初始化权重的策略和数学原理。以下是这些初始化方法…

RTI-DDS代码分析使用介绍

DDS(Data Distribution Service数据分发服务)是对象管理组织OMG的有关分布式实时系统中数据发布的规范。 DDS规范采用了发布/订阅体系结构&#xff0c;但对实时性要求提供更好的支持。DDS是以数据为中心的发布/订阅通信模型。 以下工程基于rti_connext_dds-7.2.0 hello_world.…

Spark简单回顾

星光下的赶路人star的个人主页 大鹏一日同风起&#xff0c;扶摇直上九万里 文章目录 1、Spark1.1 Spark入门1.1.1 Spark部署模式1.1.2 常用端口 1.2 SparkCore1.2.1 RDD不可变和五大属性1.2.2 RDD的弹性1.2.3 cache和Checkpoint的区别1.2.4 算子 1.3 SparkSQL1.4 内核1.4.1提交…

在Linux上安装RStudio工具并实现本地远程访问【内网穿透】

文章目录 前言1. 安装RStudio Server2. 本地访问3. Linux 安装cpolar4. 配置RStudio server公网访问地址5. 公网远程访问RStudio6. 固定RStudio公网地址 前言 RStudio Server 使你能够在 Linux 服务器上运行你所熟悉和喜爱的 RStudio IDE&#xff0c;并通过 Web 浏览器进行访问…

音频怎么录制?让你轻松成为录音专家!

“音频可以录制吗&#xff1f;如果可以那应该怎么去操作呢&#xff1f;参加了一个配音比赛&#xff0c;需要录制自己配音的视频&#xff0c;但是我不懂怎么录制音频&#xff0c;眼看比赛就要截止了&#xff0c;真的很着急&#xff0c;大家帮帮我。” 音频录制是一项常见但强大…

【数据结构初阶】算法的时间复杂度和空间复杂度

算法的时间复杂度和空间复杂度 1.算法效率1.1 如何衡量一个算法的好坏1.2 算法的复杂度 2.时间复杂度2.1 时间复杂度的概念2.2 大O的渐进表示法2.3常见时间复杂度计算举例 3.空间复杂度4. 常见复杂度对比 1.算法效率 1.1 如何衡量一个算法的好坏 如何衡量一个算法的好坏呢&am…

【pdf密码】为什么我的PDF文件不能复制文字?

大家现在接触PDF文件越来越多&#xff0c;有的时候在网上下载的PDF文件打开之后&#xff0c;发现选中文字之后无法复制。甚至其他功能也都无法使用&#xff0c;这是怎么回事&#xff1f;该怎么办&#xff1f; 当我们发现文件打开之后&#xff0c;编辑功能无法使用&#xff0c;很…

设置中添加UI设置系统默认NTP服务器

经常遇到客户在内网中使用无法与ntp服务器通讯导致系统时间错乱&#xff0c;他们想自己替换ntp地址要么是用adb命令要么是重新刷机&#xff0c;这样比较浪费客户的时间。 看了一下Android系统中选择ntp地址的逻辑&#xff0c;发现在framework中已经有了个ntp地址那么系统将会选…

window10彻底关闭系统管理员控制(所有软件以管理员身份运行)

window10彻底关闭系统管理员控制&#xff08;所有软件以管理员身份运行&#xff09; gpedit.msc》计算机配置》windows设置》安全设置》安全选项》 1.用户账户控制&#xff1a;以管理员批准模式运行所有管理员 2.用户账户控制&#xff1a;用于内置管理员账户的管理员批准模式 1…

GeoHash分享

写在前边 复制的一个内部分享&#xff0c;所以可能更偏向PPT性质&#xff0c;本文提出的问题&#xff0c;在末尾参考材料中都会有所提及&#xff0c;包括更深层次的实现原理和各大API对于GeoHash的优化。感兴趣的读者可以拓展看一下。 START GeoHash是一种地址编码&#xff…