RabbitMQ 的经典问题

news2024/10/6 14:28:34

文章目录

  • 前言
  • 一、防止消息丢失
    • 1.1 ConfirmCallback/ReturnCallback
    • 1.2 持久化
    • 1.3 消费者确认消息
  • 二、防止重复消费
  • 三、处理消息堆积
  • 四、有序消费消息
  • 五、实现延时队列
  • 六、小结
  • 推荐阅读

前言

当设计和运维消息队列系统时,如 RabbitMQ,有几个关键问题需要特别关注:消息丢失、重复消费、消息堆积、有序消费和延时队列。这些问题直接影响系统的可靠性、性能和数据完整性。本文将深入探讨如何在使用 RabbitMQ 时有效地解决这些问题。

一、防止消息丢失

在 RabbitMQ 中,消息从生产者到消费者需要经历多个阶段。以下是消息传递的流程:

  1. 生产者创建消息:生产者(Producer)创建要发送的消息。
  2. 消息发送到交换机:生产者将消息发送到 RabbitMQ 服务器上的交换机(Exchange)。
  3. 消息进入队列:交换机将消息路由到一个或多个队列(Queue)。
  4. 消息从队列投递到消费者:RabbitMQ 将消息从队列中取出并投递给订阅的消费者。消费者接收到消息后,可以开始处理消息。

在这里插入图片描述

在这个流程中,可能会发生以下几个问题:

  1. 生产者无法确认消息是否发送到 RabbitMQ 服务器,若中途出现意外,则可能丢失消息
  2. 消息在 RabbitMQ 服务器内存中,若服务器出现异常,会丢失消息
  3. RabbitMQ 服务器无法确定消费者是否正常消费消息,中途出现异常,会丢失消息

1.1 ConfirmCallback/ReturnCallback

为了解决第一个问题,生产者无法确认消息是否正确发送到 RabbitMQ 服务器,RabbitMQ 给出的解决方案是:

  1. ConfirmCallback: 用来确认消息是否被 RabbitMQ 服务器成功接收的回调接口。
    • 当消息成功发送到交换机时,会调用 ConfirmCallback 的 confirm 方法。
    • 如果消息发送到交换机失败(比如交换机不存在),也会调用 ConfirmCallback 的 confirm 方法,此时 ack 参数为 false。
    • 通常情况下,ConfirmCallback 用来确认消息是否成功发送到交换机
  2. ReturnCallback: 用来处理未被路由到合适 Queue 的消息的回调接口。
    • 当消息从交换机发送到队列失败时,如果设置了 mandatory 为 true,则会调用 ReturnCallback。
    • 如果消息成功路由到队列,则不会调用 ReturnCallback。
    • 在消息无法被路由到队列时,会调用 ReturnCallback 的 returnedMessage 方法,可以在该方法中处理未路由消息的相关逻辑,比如重新发送或记录日志等。

1.2 持久化

针对第二个问题:RabbitMQ 服务器异常,内存中的内容无法持久化导致消息丢失。RabbitMQ 提供了相应的持久化方案:

  1. 交换机持久化:当声明一个交换机时,可以选择将其标记为持久化。持久化的交换机将会存储在磁盘上,即使 RabbitMQ 服务器重启,也不会丢失。
  2. 队列持久化:类似于交换机,可以声明一个队列为持久化。持久化的队列会在 RabbitMQ 服务器重启后保留,并且其中的消息也会被保留。这确保了即使发生了服务中断或者重启,消息也不会丢失。
  3. 消息持久化:当发布一条消息到 RabbitMQ 时,可以选择使消息持久化。持久化的消息将会写入磁盘,这样即使 RabbitMQ 服务器在消息到达消费者之前崩溃,消息也不会丢失。持久化消息比非持久化消息更加安全,但是也会有性能开销,因为需要写入磁盘。

1.3 消费者确认消息

RabbitMQ 是阅后即焚机制,即 RabbitMQ 将消息发送到消费者后会立刻删除。如果 RabbitMQ 将消息投递给消费者后,RabbitMQ 将消息进行了删除。然而,消费者出现了异常,并没有正常处理消息。就会导致消息丢失。

在 RabbitMQ 中,消费者可以设置不同的确认模式(acknowledgement mode),以确定当消费者收到消息时如何向 RabbitMQ 确认消息已经被处理。这些确认模式通常称为手动确认(manual acknowledgment)、自动确认(automatic acknowledgment)和无确认(no acknowledgment),也可以简写为 manual、auto 和 none。

  1. Manual Acknowledgment (manual):
    • 在手动确认模式下,消费者收到消息后,必须显式地向 RabbitMQ 发送一个确认(ack)来告知它已经处理了该消息。
    • 这种方式可以确保消息只有在消费者成功处理后才被标记为已传递(delivered),避免消息在处理过程中丢失。
  2. Automatic Acknowledgment (auto):
    • 在自动确认模式下,当消费者收到消息并且 RabbitMQ 将其成功传递给消费者时,它会自动向 RabbitMQ 发送一个确认。这意味着一旦消息传递给消费者,RabbitMQ 就会将其标记为已传递,而无需消费者显式确认。
    • 这种模式适合于那些允许偶尔丢失一些消息的应用场景,因为在消息传递给消费者后,就无法确保消费者是否成功处理了消息。
  3. No Acknowledgment (none):
    • 在无确认模式下,消费者在接收到消息后,RabbitMQ 不会等待消费者的确认,也不会尝试重新传递消息,而是将消息传递给消费者并将其标记为已传递,然后立即将其视为已处理。
    • 这种模式通常用于那些不需要保证每条消息都被处理的场景,例如日志处理等。

二、防止重复消费

RabbitMQ 的重复消费问题指的是在消息队列中,消费者可能会多次处理相同的消息,从而导致业务逻辑出现问题或者数据处理不一致的情况。这种问题通常发生在以下几种情况下:

  1. 消息重新投递
    • 当消费者处理消息时发生了异常或者消费者未能确认消息的处理完成(Ack),RabbitMQ 可能会将消息重新投递到同一个消费者或者其他消费者,导致消息被重复消费。
    • 这种情况下,如果消费者处理消息的过程中出现了异常或者无法确认消息处理结果,RabbitMQ 会将消息重新发送给消费者,以确保消息不会丢失。
  2. 消费者处理时间过长
    • 如果消息处理需要较长时间,而消费者在处理过程中并未确认消息的完成(Ack),则 RabbitMQ 可能会误以为消息未能成功处理,再次将消息投递给消费者。这样可能导致消息被重复处理。

在 RabbitMQ 中,防止重复消费的方法通常涉及以下几种策略和技术:

  1. 消息去重(Message Deduplication)
    • 使用唯一标识符(如消息的 ID 或者业务相关的唯一键)来标记每条消息。在消费者端,在处理消息之前,可以通过维护一个已处理消息的列表或者使用缓存(如 Redis)来检查这个唯一标识符是否已经被处理过。如果已经处理过,则可以选择性地丢弃这条消息或者执行相应的处理逻辑。
    • 如果消息具有全局唯一性要求,可以在生产者端确保生成的消息 ID 或者唯一键的唯一性,以确保在消费者端能够准确判断消息是否已经处理过。
  2. 幂等性操作(Idempotent Consumers)
    • 在设计消费者应用程序时,尽量确保消费者的处理逻辑具有幂等性。即使同一条消息被消费多次,也不会引起意外的副作用或者数据不一致的情况。例如,数据库操作中的幂等性可以通过使用唯一键约束、UPSERT(如果支持)、乐观锁等技术来实现。
    • 这种方式适用于那些无法避免消息重复投递的场景,或者确保消费者的处理逻辑具备强大的鲁棒性和数据一致性。
  3. 消息确认和消息预取(Message Acknowledgment and Prefetching)
    • RabbitMQ 提供了消息确认机制,即消费者在处理完消息后需要显式地确认消息。确保消费者确认消息后再进行后续的处理操作,可以防止因为消费者未能成功处理消息而导致消息重复消费的问题。
    • 同时,通过合理设置消费者的预取数(Prefetch Count),可以控制消费者从队列中预先获取的消息数量。这可以帮助在一定程度上减少因消息处理过长或者消费者出现故障而导致的消息重复消费。

三、处理消息堆积

RabbitMQ 消息堆积问题指的是在消息队列中积累大量未被消费的消息,导致队列中的消息数量急剧增加,可能会对系统的性能和稳定性产生负面影响。这种问题通常由以下几个原因引起:

  1. 生产者速度快于消费者速度
    • 如果生产者生产消息的速度远快于消费者处理消息的速度,未被消费的消息会不断积累在队列中,最终导致队列中消息数量急剧增加,形成消息堆积。
    • 这种情况可能发生在消费者处理能力不足、消费者出现故障或者网络延迟等情况下。
  2. 消费者处理能力不足
    • 当消费者处理消息的速度跟不上生产者发送消息的速度时,未处理的消息会在队列中积累,最终导致消息堆积问题。
    • 消费者处理能力不足可能是因为消费者数量不足、消费者处理逻辑复杂或者消费者出现瓶颈等原因引起的。
  3. 队列设置不当
    • 队列的容量设置不当,例如队列的最大长度设置过小,无法应对高峰期的消息积压。

解决 RabbitMQ 消息堆积问题需要综合考虑生产者和消费者之间的消息流量控制、队列的监控与管理以及系统的容错处理。以下是一些常见的解决方法和策略:

  1. 增加消费者数量
    • 最直接的方法是增加消费者的数量,以提升消息处理的能力。通过增加消费者,可以分担队列中消息的处理压力,减少消息堆积的可能性。
    • 可以动态地根据系统负载情况或者队列中消息数量来调整消费者的数量。
  2. 优化消费者处理能力
    • 优化消费者的处理逻辑,确保消费者能够高效地处理消息。这包括优化数据库访问、提升算法效率、避免长时间阻塞操作等。
    • 使用并发处理和异步处理技术,充分利用多线程或者异步框架来提高消费者的并发处理能力。
  3. 调整队列的配置参数
    • 根据实际情况调整队列的容量限制、消息过期时间等参数,避免队列过于拥挤或者消息长时间积压。

四、有序消费消息

在消息队列系统中,通常情况下,消息在生产者发送到队列后,会按照 FIFO(先进先出)的原则被消费者处理。但是,当存在多个消费者同时消费同一个队列时,RabbitMQ 无法保证消息的严格顺序性,因为不同的消费者可能以不同的速度处理消息,导致消息的处理顺序可能被打乱。

在 RabbitMQ 中实现有序消费消息通常涉及以下几种方法和技术:

  1. 单队列单消费者
    • 最简单的方式是使用单队列和单消费者。RabbitMQ 会确保对于同一个队列,消息的投递顺序与其被消费的顺序一致。这意味着当消费者从队列中接收消息时,它们会按照它们被发送的顺序进行处理。
    • 在这种情况下,RabbitMQ 的默认行为会保证消息的有序性,只要消费者处理消息的速度足够快,就能保证消息的有序消费。
  2. 使用消息的顺序属性
    • 在生产者端,可以为每条消息添加一个标识其顺序的属性,例如序号或者时间戳。然后在消费者端,通过这些属性来排序和处理消息。
    • 消费者可以在消费消息之前先对消息进行排序,或者根据属性来判断是否需要延迟处理某些消息,以保证消息的有序性。
  3. 使用全局顺序化插件(RabbitMQ Global Ordered Queue)
    • RabbitMQ 提供了全局顺序化插件,它可以确保所有队列中的消息都按照一定的顺序被投递和消费。这个插件适用于一些需要强有序性的场景,但需要注意它可能会引入一些性能上的限制和开销。

五、实现延时队列

在常规的消息队列中,消息一旦发送到队列中,就会尽快被消费者获取和处理。然而,某些业务场景可能需要延迟发送消息,或者延迟消费消息,这时就需要使用延时队列来实现这一需求。

在 RabbitMQ 中实现延时队列通常涉及使用以下几种方法:

  1. 使用 TTL(Time-To-Live)和死信队列(Dead Letter Exchange)
    • RabbitMQ 支持设置消息的 TTL,即消息的存活时间。通过设置消息的 TTL,可以让消息在指定的时间段后过期,然后被发送到死信队列(Dead Letter Queue)。
    • 死信队列是一个特殊的队列,它接收所有因某些原因未能成功消费的消息,包括过期的消息。可以通过设置队列的 x-dead-letter-exchangex-dead-letter-routing-key 参数,将消息发送到指定的交换器和路由键,实现延时消息的转发和消费。
  2. 利用插件实现延时队列
    • RabbitMQ 社区提供了一些插件,例如 rabbitmq_delayed_message_exchange 插件,它能够在 RabbitMQ 中实现更精确的延时消息发送和消费。
    • 这种插件通过引入一个特殊的交换器类型(Delayed Message Exchange),可以让生产者在消息发送时指定一个延迟时间,消息将会在指定的延迟时间之后被交换器路由到相应的队列,然后被消费者处理。
  3. 使用定时器和消息轮询
    • 在消费者端,可以使用定时器或者消息轮询的方式,定期检查队列中的消息是否已经到达了处理时间。一旦消息到达处理时间,消费者就可以将其从队列中取出并处理。
    • 这种方法虽然没有直接利用 RabbitMQ 的内置功能,但适用于一些简单的延时消息处理需求,可以通过编码实现。

六、小结

在使用 RabbitMQ 构建消息队列系统时,关键要点包括确保消息的可靠性和完整性、实现消费者的幂等性、有效管理消息堆积、处理有序消息以及实现延时队列功能。通过合理配置和实施这些策略,可以提升系统的性能和可靠性,确保消息系统稳定运行。

推荐阅读

  1. Spring 三级缓存
  2. 深入了解 MyBatis 插件:定制化你的持久层框架
  3. Zookeeper 注册中心:单机部署
  4. 【JavaScript】探索 JavaScript 中的解构赋值
  5. 深入理解 JavaScript 中的 Promise、async 和 await

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1875153.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

“Hello, World!“ 历史由来

布莱恩W.克尼汉(Brian W. Kernighan)—— Unix 和 C 语言背后的巨人 布莱恩W.克尼汉在 1942 年出生在加拿大多伦多,他在普林斯顿大学取得了电气工程的博士学位,2000 年之后取得普林斯顿大学计算机科学的教授教职。 1973 年&#…

海南聚广众达电子商务咨询有限公司专业电商服务代名词

在数字化浪潮席卷全球的今天,电子商务行业日新月异,各大平台纷纷崭露头角。其中,抖音电商以其独特的短视频直播模式,迅速崛起成为电商领域的新星。而在这股浪潮中,海南聚广众达电子商务咨询有限公司凭借其专业的服务和…

华为实训案例

案例下载 案例内包含空拓扑图、配置完整的拓扑、以及步骤脚本文档,可按需下载。 拓扑图 任务清单 (一)基础配置 根据附录1拓扑图、附录2地址规划表、附录3设备编号表,配置设备接口及主机名信息。 将所有终端超时时间设置为永不…

我在高职教STM32——GPIO入门之按键输入(2)

大家好,我是老耿,高职青椒一枚,一直从事单片机、嵌入式、物联网等课程的教学。对于高职的学生层次,同行应该都懂的,老师在课堂上教学几乎是没什么成就感的。正因如此,才有了借助 CSDN 平台寻求认同感和成就…

【图论 树 深度优先搜索】2246. 相邻字符不同的最长路径

本文涉及知识点 图论 树 图论知识汇总 深度优先搜索汇总 LeetCode 2246. 相邻字符不同的最长路径 给你一棵 树(即一个连通、无向、无环图),根节点是节点 0 ,这棵树由编号从 0 到 n - 1 的 n 个节点组成。用下标从 0 开始、长度…

ElementUI样式优化:el-input修改样式、el-table 修改表头样式、斑马格样式、修改滚动条样式、

效果图: 1、改变日期时间组件的字体颜色背景 .form-class ::v-deep .el-date-editor { border: 1px solid #326AFF; background: #04308D !important; } .form-class ::v-deep .el-date-editor.el-input__wrapper { box-shadow: 0 0 0 0px #326AFF inset; } // 输入…

Python | Leetcode Python题解之第203题移除链表元素

题目: 题解: # Definition for singly-linked list. # class ListNode: # def __init__(self, val0, nextNone): # self.val val # self.next next class Solution:def removeElements(self, head: ListNode, val: int) -> Li…

Unity保存玩家的数据到文件中(Unity的二进制序列化)

文章目录 文章运行环境什么是二进制序列化读写文件构造函数 自定义二进制序列化 文章运行环境 Unity2022 什么是二进制序列化 Unity中的二进制序列化是一种将游戏对象或数据结构转换为二进制格式的过程,以便于存储或网络传输。这使数据能够以高效的方式保存&…

网络==>总论v4

既然是写ICT方面的文章,就要不断更新版本,不是文学,可以一劳永逸,如果不更新,看十年前或者二十年前的书意义不大,这就是为啥看到很多编程书都更新到第十几版了,因为要与时俱进。 在去一个地方旅…

JS(JavaScript)事件处理(事件绑定)

天行健,君子以自强不息;地势坤,君子以厚德载物。 每个人都有惰性,但不断学习是好好生活的根本,共勉! 文章均为学习整理笔记,分享记录为主,如有错误请指正,共同学习进步。…

C语言单链表的算法之删除节点

一:为什么要删除节点 (1)有时候链表中节点的数据不想要了,就要删除这个节点 二:删除节点的两个步骤 (1)第一步:找到这个节点 (2)第二步:删除这个…

麒麟桌面系统CVE-2024-1086漏洞修复

原文链接:麒麟桌面操作系统上CVE-2024-1086漏洞修复 Hello,大家好啊!今天给大家带来一篇在麒麟桌面操作系统上修复CVE-2024-1086漏洞的文章。漏洞CVE-2024-1086是一个新的安全漏洞,如果不及时修复,可能会对系统造成安全…

Rust单元测试、集成测试

单元测试、集成测试 在了解了如何在 Rust 中写测试用例后,本章节我们将学习如何实现单元测试、集成测试,其实它们用到的技术还是上一章节中的测试技术,只不过对如何组织测试代码提出了新的要求。 单元测试 单元测试目标是测试某一个代码单…

算法金 | K-均值、层次、DBSCAN聚类方法解析

大侠幸会,在下全网同名「算法金」 0 基础转 AI 上岸,多个算法赛 Top 「日更万日,让更多人享受智能乐趣」 聚类分析概述 聚类分析的定义与意义 聚类分析(Clustering Analysis)是一种将数据对象分成多个簇(…

AI 音乐生成器 MusicGPT,同声传译StreamSpeech!Web短视频平台Sharine

AI 音乐生成器 MusicGPT,同声传译StreamSpeech!Web短视频平台Sharine。 项目简介 MusicGPT 是一款应用程序,允许在任何平台上以高性能方式本地运行最新的音乐生成 AI 模型,而无需安装 Python 或机器学习框架等严重依赖项。 目前它仅支持 Me…

【python】OpenCV—Aruco

文章目录 Detect ArucoGuess Aruco Type Detect Aruco 学习参考来自:OpenCV基础(19)使用 OpenCV 和 Python 检测 ArUco 标记 更多使用细节可以参考:【python】OpenCV—Color Correction 源码: 链接:http…

二进制方式部署consul单机版

1.consul的下载 mkdir -p /root/consul/data && cd /root/consul wget https://releases.hashicorp.com/consul/1.18.0/consul_1.18.0_linux_amd64.zip unzip consul_1.18.0_linux_amd64.zip mv consul /usr/local/bin/ 2.配置文件 // 配置文件路径: /roo…

Vue 项目运行时,报错Error: Cannot find module ‘node:path‘

Vue 项目运行时,报错Error: Cannot find module ‘node:path’ internal/modules/cjs/loader.js:883throw err;^Error: Cannot find module node:path Require stack: - D:\nodejs\node_modules\npm\node_modules\node_modules\npm\lib\cli.js - D:\nodejs\node_mo…

240628_昇思学习打卡-Day10-SSD目标检测

240628_昇思学习打卡-Day10-SSD目标检测 今天我们来看SSD(Single Shot MultiBox Detector)算法,SSD是发布于2016年的一种目标检测算法,使用的是one-stage目标检测网络,意思就是说它只需要一步,就能把目标检…

国内多个库被 rsc 钉上 Go 耻辱柱。。。

大家好,我是煎鱼。 这还是比较突然的,下午正努力打工。国内社区群里突然就闹腾起来了。 仔细一看,原来是 Go 核心团队负责人 rsc,又冷不丁搞大招 😅。他直接把国内好几个知名库给直接钉上了 Go 源码库的耻辱柱上了。 如…