RabbitMQ 的高阶应用及可靠性保证

news2024/12/24 21:32:41

       

目录

一、RabbitMQ 高阶应用        

     1.1 消息何去何从

        1.2 过期时间

        1.3 死信队列

        1.4 延迟队列

        1.5 优先级队列

        1.6 消费质量保证(QOS)

二、持久化

三、生产者确认

四、消息可靠性和重复消费

        4.1 消息可靠性

        4.2 重复消费问题


上篇文章介绍了 RabbitMQ 的基本概念和使用,这篇文章就来介绍下其高阶应用和可靠性保证。

一、RabbitMQ 高阶应用        

        RabbitMQ 还提供了诸多高级特性,比如:过期时间、交换器备份、死信队列、延迟队列、优先级队列、持久化、消费端消息分发等等,下面介绍几个重要特性。

     1.1 消息何去何从

        mandatory 参数,当设置为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当设置为 false 时,出现上述情形,则消息直接被丢弃。那么生产者如何获取到未被路由到合适队列的消息呢?需要实现 listener,SpringBoot 中需要实现 ReturnCallback。

        immediate 参数,为 true 时,如果交换机将消息路由到队列时发现队列上并不存在消费者,那么这条消息将不会被存入队列中。当与路由键匹配的队列都没有消费者时,该消息会 return 给生产者。

        概括来说,mandatory 参数告诉服务器至少将消息路由到一个队列中,否则将消息返回给生产者。immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递,如果所有匹配的队列上无消费者,则将消息返回给生产者,不用将消息存入队列等待消费者。

        RabbitMQ 3.0 版本去掉了 immediat 参数的支持,官方解释是:会影响镜像队列的性能,增加代码的复杂性,建议采用 TTL 和 DLX 的方法替换。

        1.2 过期时间

        RabbitMQ 可以对消息和队列设置 TTL。

        设置消息的TTL。方法一:通过队列的属性设置,队列中的所有消息都有相同的过期时间,一旦消息过期,就会立即从队列中抹去。方法二:对消息单独设置,每条消息的TTL可以不同,即使消息过期也不会立即从队列中抹去,在投递前判定。如果两者一同使用,则以最小的那个为准,消息的生存时间一旦超过了设置的TTL,就会变成“死信”,消费者则无法收到该消息。设置过期时间的方法如下:

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 6000);
Queue queue = new Queue(vodQueue, true, false, false, args);

        1.3 死信队列

        DLX,全称为 Dead-Letter-Exchange,死信交换器。当一个消息在队列中变成死信之后,它能被发送到另一个交换器中,这个交换器即是 DLX,绑定死信交换器的队列称为死信队列。消息变为死信的情况:

  • 消息被拒绝,并且设置的 requeue 为 false
  • 消息过期
  • 队列达到最大长度

        DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动的将消息重新发布到设置的 DLX 上去,进而被路由到另一个队列中,即死信队列。可以监听这个队列中的消息进行相应的处理。设置死信队列的方法:

Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx-exchange");
Queue queue = new Queue(vodQueue, true, false, false, args);

        对于 RabbitMQ 来说,DLX 是一个非常有用的特性。他可以处理异常情况,消息不能被消费者正确消费而被至于死信队列中的情况后去分析程序可以通过这个死信队列中的内容来分析当时所遇到的一场情况。进而可优化改善系统。

        1.4 延迟队列

        所谓延迟队列是指当消息被发出后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到消息。

        在AMQP协议中,或者 RabbitMQ 本身并没有直接支持延迟队列的功能,但可以通过 DLX 和TTL 来实现。        

        1.5 优先级队列

         优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。设置优先级队列

Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);
Queue queue = new Queue(vodQueue, true, false, false, args);

        在发送消息时设置当前的优先级,默认最低为0,最高为队列设置的最大优先级。优先级高的消息可以被优先消费,这个是有前提的:如果在消费者消费速度大于生产者的生产速度且broker 中没有消息堆积的情况,对发送的消息设置优先级也就没有什么实际意义了。因为生产者刚发送完一条消息就被消费了,那么就意味着 broker 中至多有一条消息,对于单条消息来说优先级是没意义的。       

        1.6 消费质量保证(QOS)

        当 RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询的方式分发给消费者。每条消息只会发送给一个消费者。这种方式非常适合扩展,而且是专门为并发程序设计的。如果现在的负载加重,只需要创建更多的消费者即可。

        这种方式不那么优雅,分发中不管消费者的消息是否处理完了,试想一下,某些消费者的任务繁重,来不及处理消息并确认,而某些消费者由于某些原因很快处理完了所分配的消息,进而进程空闲,这样会造成总体的吞吐量下降。该如何处理这种情况呢?引入Qos,他会告诉 broker 我没消费完当前消息前,不要给我新消息了,这就保证了消费质量。Qos对于拉模式是无效的。                设置方法如下:

// prefetchSize和prefetchCount设置为0,说明无限制
// prefetchSize: 指定消费者可以接收的最大内容量(单位通常是字节)。如果设置了非零值,RabbitMQ 会阻止发布者发送更多的消息,直到消费者发送了足够多的确认来释放足够的容量。默认情况下,RabbitMQ 并不实现 prefetchSize 参数,所以通常设置为0,表示不对此做限制。
// prefetchCount: 更常用的一个参数,表示消费者最多可以接收多少个未确认的消息。当达到这个数量后,RabbitMQ 将暂停向该消费者推送更多消息,直到消费者确认了部分消息,腾出了“槽位”。例如,设置为1意味着消费者每次处理完一个消息并发送确认之后,才能接收下一个消息。
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

二、持久化

        持久化可以提高 RabbitMQ 的可靠性,防止在异常情况下消息的丢失。RabbitMQ 的持久化分成三部分:交换器的持久化、队列的持久化和消息的持久化。

        交换器的持久化可以在声明交换器的时候设置,如果交换器不设置持久化,RabbitMQ 重启后,交换器的元数据会丢失,不过消息不会丢失,只是不能将消息发送给这个交换器了。

        队列的持久化在声明队列的时候设置,如果不设置队列的持久化,RabbitMQ 服务重启后,队列的元数据会丢失,此时数据也会丢失。

        将交换器、队列、消息都设置成持久化后能保证数据不丢失吗?答案是否定的。

  1. 从消费者的角度来看,将 autoAck 设置成 true,那么当消费者接受到相关消息后,还没来得及处理就宕机了,这样也算数据丢失。
  2. 在持久化的消息正确存入 RabbitMQ 之后,还需要一段时间才能存入磁盘。RabbitMQ 不会为每条消息都进行同步存盘(调用内核的fsync)的处理,可能仅保存在操作系统缓存之中而不是物理磁盘之中。如果在这段时间内,RabbitMQ 发生了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。

        上面的问题可以通过镜像队列机制来解决。相当于配置了副本,如果主节点在此特殊时期挂掉了,可以自动切换到从节点,这样有效的保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全保证不丢失,但这样已经好很多。

三、生产者确认

        生产者将消息发送出去后,消息到底有没有到达服务器呢?如果不进行配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下,生产者不知道消息有没有正确的到达服务器。如果到达服务器之前就丢失了,持久化操作也解决不了问题,因为还没到达服务器,何谈持久化?针对这个问题提供了两种解决方式:

  • 通过事务机制实现
  • 通过发送确认机制实现

        开启事务多了几个环节,只有消息成功被 RabbitMQ 接收,事务才能提交,否则便可在捕获异常之后进行处理。但事务会严重影响 RabbitMQ 的性能,大大降低吞吐量。

        发送方确认是一种轻量级的机制,生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID,一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包括消息的唯一id),这就使得生产者知晓消息已经正确到达目的地。如果消息和队列是持久化的,那么确认消息会在消息写入磁盘之后发出。

        事务机制在一条消息发出后会使发送端阻塞,以等待rabbitmq的回应,之后才能发送下一条消息。相比消息确认机制,发送方确认机制是异步的。事务机制和确认机制是互斥的不能共存。

四、消息可靠性和重复消费

        只要涉及到消息中间件,消息可靠性和重复消费就是无可避免的话题,那 RabbitMQ 是如何设计的呢?

        4.1 消息可靠性

  1. 持久化设置,这在上文已经介绍,通过持久化队列、交换器和消息来保存消息。
  2. 事务和确认机制:上文已经介绍了生产者的确认机制,通过这个机制来保证生产者发送的消息不回丢失。
  3. 消费者消息确认:可以通过消息的手动ack来保证消息能消费完成
  4. 消息镜像队列:设置队列为镜像队列,可以将消息复制到多个节点,即使某个节点宕机,消息仍可以从其他节点获取。

        通过以上措施的组合使用,可以大大提高 RabbitMQ 消息传递的可靠性,尽可能减少消息丢失的风险。然而,即使采取了所有措施,也不能完全保证100%的消息不丢失,因为消息在传输过程中可能还受到网络、硬件故障等因素的影响。在实际应用中,需要根据业务场景权衡消息的可靠性、性能和成本。

        4.2 重复消费问题

        在 RabbitMQ 中,重复消费指的是同一个消息被多个消费者或者同一个消费者消费多次的现象。这种问题可能会导致数据不一致或者业务逻辑错误。造成重复消费的原因可能有:

  1. 消费者ACK确认失败:消费者接收到消息并开始处理,但是在处理完毕并发送 ACK 确认之前断开了连接,比如网络抖动或消费者进程异常退出,导致 RabbitMQ 未收到ACK确认,于是消息重新入队等待再次被消费。
  2. 消息重回队列:在有死信交换机(Dead Letter Exchange, DLX)或者消息TTL(Time To Live)到期后重新投递的情况下,消息可能被重新发送到原来的队列或另一个队列,从而被再次消费。
  3. 消费者超时设置不当:如果消费者的超时设置过短,可能会在消息处理未完成时就已经被认为超时,消息会被重新放回队列。

        那如何解决重复消费问题呢?

  1. 消息确认机制:确保消费者正确使用手动确认模式(Manual Acknowledgments),只有当消息处理成功后才发送 ACK 确认给 RabbitMQ,否则在遇到异常时可以重新消费。
  2. 幂等性设计:消费者的业务逻辑应当设计为幂等的,即使同一条消息被消费多次,处理结果也是相同的,不影响业务状态。例如,通过消息ID或业务流水号来判断消息是否已经处理过。
  3. 防重ID:在消息体中携带一个全局唯一的ID,消费者在处理消息前,先检查这个ID是否已经被处理过,如果已经处理过,则直接丢弃消息。

        总之,避免重复消费的关键在于消息确认机制、幂等性设计以及合理的重试和补偿策略。同时,完善的日志记录和监控也是非常重要的,以便在出现问题时能够快速定位和修复。

往期经典推荐

探秘 RabbitMQ 的设计理念与核心技术要点-CSDN博客

走进 Mybatis 内核世界:理解原理,释放更多生产力-CSDN博客

深入浅出 Kafka 消费者:解密分布式消息流的幕后英雄_kafka消费-CSDN博客

深入剖析Kafka生产者:揭秘消息从发送到落地的全过程-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1551109.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

理解JVM:从字节码到程序运行

大家好&#xff0c;我是程序员大猩猩。 今天我们来讲一下JVM&#xff0c;好多面试者在面试的时候&#xff0c;都会被问及JVM相关知识。那么JVM到底是什么&#xff0c;要理解它到底是出于什么原因&#xff1f; JVM俗称Java虚拟机&#xff0c;它是一个抽象的计算机&#xff0c;…

香港 4月7日「比特币新资产 Runes 协议」线下沙龙报名开启!

比特币减半在即&#xff0c;聚焦新资产、布局新赛道。 香港时间 2024 年 4 月 7 日&#xff0c;聚焦「比特币新资产 & Runes 协议」的《New Asset Issuance and Settlement on Bitcoin》主题沙龙将于「香港海富中心」欢迎大家的到来&#xff01; 我们邀请了比特币新资产生…

题目:摆花(蓝桥OJ 0389)

问题描述&#xff1a; 题解&#xff1a; #include <bits/stdc.h> using namespace std; using ll long long; const int N 105; const ll p 1e6 7; ll a[N], dp[N][N];int main() {int n, m; cin >> n >> m;for(int i 1; i < n; i)cin >> a[i…

IDEA Android新建项目基础

title: IDEA Android基础开发 search: 2024-03-16 tags: “#JavaAndroid开发” 一、构建基本项目 在使用 IDEA 进行基础的Android 开发时&#xff0c;我们可以通过IDEA自带的新建项目功能进行Android应用开发基础架构的搭建&#xff0c;可以直接找到 File --> New --> …

【机器学习-08】参数调优宝典:网格搜索与贝叶斯搜索等攻略

超参数是估计器的参数中不能通过学习得到的参数。在scikit-learn中&#xff0c;他们作为参数传递给估计器不同类的构造函数。典型的例子有支持向量分类器的参数C&#xff0c;kernel和gamma&#xff0c;Lasso的参数alpha等。 ​ 在超参数集中搜索以获得最佳cross validation交叉…

Java开发过程中如何进行进制换换

最近由于工作上的需要&#xff0c;遇到进制转换的问题。涉及到的进制主要是十进制、十六进制、二进制转换。 1、十进制转十六进制、二进制 调用java自带的api,测试十进制转16进制、2进制 package com.kangning.common.utils.reflect;/*** 十进制 转 十六进制* 十进制 转 二进…

黑群晖Docker安装aria2-pro

前言 最近买了星际蜗牛C款当Nas&#xff0c;来满足我的存储需求&#xff0c;在之前我写过一篇docker安装aria2-pro的文章&#xff0c;既然买了nas那当然也要安装一个aria2-pro做下载器 1.安装 Container Manager 套件 可以在套件中心搜索docker找到 2.下载aria2-pro镜像 打…

力扣热门算法题 89. 格雷编码,92. 反转链表 II,93. 复原 IP 地址

89. 格雷编码&#xff0c;92. 反转链表 II&#xff0c;93. 复原 IP 地址&#xff0c;每题做详细思路梳理&#xff0c;配套Python&Java双语代码&#xff0c; 2024.03.24 可通过leetcode所有测试用例。 目录 89. 格雷编码 解题思路 完整代码 Python Java 92. 反转链表…

利用Tensor在jetson orin 上加速YOLOv5

一、第一种方法&#xff0c;需要下载各种包&#xff1a; 要用到一个大佬的开源&#xff0c;GitHub地址如下&#xff1a; https://github.com/wang-xinyu/tensorrtx/tree/master/yolov51. 安装pycuda&#xff0c;在线安装pycuda pip3 install pycuda 2. Windows操作&#xf…

Ubuntu Desktop 更改默认应用程序 (Videos -> SMPlayer)

Ubuntu Desktop 更改默认应用程序 [Videos -> SMPlayer] References System Settings -> Details -> Default Applications 概况、默认应用程序、可移动介质、法律声明 默认应用程序&#xff0c;窗口右侧列出了网络、邮件、日历、音乐、视频、照片操作的默认应用程序…

2024全行业数字化转型企业建设解决方案PPT合集(附下载)

精品推荐&#xff0c;2024全行业数字化转型企业建设解决方案PPT合集&#xff0c;精品PPT源格式共21份。 点击直达星球下载地址&#xff08;文末领取优惠券&#xff09;&#xff1a;2024全行业数字化转型企业建设解决方案PPT合集 1.制造业数字化转型解决方案及应用.pptx 2.医院…

Java代码基础算法练习-求一个三位数的各位数字之和-2024.03.27

任务描述&#xff1a; 输入一个正整数n&#xff08;取值范围&#xff1a;100<n<1000&#xff09;&#xff0c;然后输出每位数字之和 任务要求&#xff1a; 代码示例&#xff1a; package M0317_0331;import java.util.Scanner;public class m240327 {public static voi…

langchin-chatchat部分开发笔记(持续更新)

大模型相关目录 大模型&#xff0c;包括部署微调prompt/Agent应用开发、知识库增强、数据库增强、知识图谱增强、自然语言处理、多模态等大模型应用开发内容 从0起步&#xff0c;扬帆起航。 大模型应用向开发路径及一点个人思考大模型应用开发实用开源项目汇总大模型问答项目…

Matlab进阶绘图第47期—气泡分组蝴蝶图

气泡分组蝴蝶图是分组蝴蝶图与气泡图的组合——在分组蝴蝶图每组柱子上方添加大小不同的气泡&#xff0c;用于表示另外一个数据变量&#xff08;如每组柱子值的和&#xff09;的大小。 本文利用自己制作的BubbleButterfly工具&#xff0c;进行气泡分组蝴蝶图的绘制&#xff0c…

从接口发现到文件上传getshell

0x01 信息收集 通过fofa&#xff0c;子域名收集等相关工具搜索域名 定位到站点&#xff1a;htps://xx..edu.cn/x/xx/ 0x02 寻找接口 通过f12寻找相关的js&#xff0c;发现有其他的页面 0x03 拼接路径 https://xx.xx.edu.cn/xx/xx/repairResgister 之后未授权获取到注册用户的页…

【群晖】解决docker容器启动出现 database is locked 错误

【群晖】解决docker容器启动出现 database is locked 错误 问题描述 升级DSM 7.2 V3版本后docker中的大量容器出现虽然显示启动状态&#xff0c;但是webStation中服务是禁用中。 因此选择手动重启容器&#xff0c;但是发现容器无法启动&#xff0c;提示了以下错误&#xff1…

盏多多生物现已加入2024第七届燕窝天然滋补品展

参展企业介绍 广东省盏多多生物科技有限公司是一家从事食品销售,食品销售,食品进出口等业务的公司&#xff0c;成立于2018年12月07日&#xff0c;公司坐落在广东省&#xff0c;详细地址为&#xff1a;惠州市东江三路45号悦榕湾27层05号&#xff08;仅限办公&#xff09;;经国家…

课堂练习:环境体验——Linux 文件操作命令

任务描述 第二个任务就是了解Linxu的文件查看命令&#xff0c;文件编辑基本命令。 相关知识 为了完成本关任务&#xff0c;你需要掌握&#xff1a; 1.文件查看命令。 2.文件编辑基本命令。 文件查看命令 我们要查看一些文本文件的内容时&#xff0c;要使用文本编辑器来查看…

解决NRF52832正常添加OTA代码后无法进入app一直运行在bootloader的问题!

问题现象描述&#xff1a; SDK版本17.1.0 在 mergehex工具 合并以下文件setting.hex bootloader.hex app.hex sortdevice.hex 之后烧录固件第一次运行 程序一直运行在bootloader&#xff0c;蓝牙名称显示 DFUTARG &#xff0c;必须要进行一次OTA才进入APP 注意&#xff1a;如…

补单系统平台第三方接口,电商平台数据市场接口api提供

补单系统平台第三方接口&#xff0c;电商平台数据市场接口api提供 部分数据参数