RabbitMQ问题总结

news2024/12/24 11:27:51


:::info
使用场景

  • 异步发送(验证码、短信、邮件。。。)
  • MySQL 和 Redis、ES 之间的数据同步
  • 分布式事务
  • 削峰填谷

  • :::

如何保证消息不丢失


上图是消息正常发送的一个过程,那在哪个环节中消息容易丢失?在哪一个环节都可能丢失

  • 生产者宕机,消息就可能到达不了交换机,或者消息未到达队列
  • 消息发送成功后,消费者还没有消费前,MQ 宕机,就有可能导致队列中消息丢失
  • 消费者宕机,导致消费者未接收到消息

生产者,队列,消费者三个层面都有可能导致消息丢失,所以保证消息不丢失需要从这三个层面解决

生产者确认机制

RabbitMQ 提供了 publisher confirm 机制来避免消息发送到 MQ 过程中丢失。消息发送到 MQ 后,会返回一个结果给发送者,表示消息是否处理成功。

如果消息发送成功,就会返回 publish-confirm ack,如果发送到交换机失败,就会返回 publish-confirm nack,如果发送到队列失败,就会返回 publish-return ack。
消息失败之后如何处理呢?

  • 回调方法即时重发
  • 记录日志
  • 保存到数据库然后定时重发,成功发送后即刻删除表中的数据

消息持久化

MQ 默认是内存存储消息,开启持久化功能可以确保缓存在 MQ 中的消息不丢失。

  1. 交换机持久化
@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.direct",true,false);
}
  1. 队列持久化
@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久的
    return QueueBuilder.durable("simple.queue").build();
}
  1. 消息持久化,SpringAMQP 中的消息默认是持久的,可以通过 MessageProperties 中的 DeliveryMode 来指定
Message msg = MessageBuilder
        .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
    	.setDeliveryMode(MessageDeliveryMode.PERSISTENE) // 持久化
    	.build();

消费者确认

RabbitMQ 支持消费者确认机制,即消费者处理消息后可以向 MQ 发送 ack 回执,MQ 收到 ack 回执后才会删除消息。
SpringAMQP 则允许配置三种确认模式:

  • manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack。
  • auto:自动 ack,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack;抛出异常则返回 nack。
  • none:关闭 ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除

我们可以利用 Spring 的 retry 机制,在消费者出现异常时利用本地重试,设置重试次数,当次数达到了以后,如果消息依然失败,将消息投递到异常交换机,交由人工处理。

RabbitMQ 如何保证消息不丢失?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为 auto,由 spring 确认消息处理成功后完成 ack
  • 开启消费者失败重试机制,多次重试失败后将消息投递到异常交换机,交由人工处理

消息的重复消费问题如何解决的

为什么会出现重复消费的问题?

  • 网络抖动
  • 消费者挂了


消费者已经处理完消息,还没来得及给 MQ 发送确认,这时网络发生了抖动或者消费者挂了,等网络恢复之后或者消费者重启之后,因为队列没有收到确认,所以消息还在 MQ 中,因为我们设置了重试机制,消费者就会重新消费消息。
解决方案

  • 每条消息设置一个唯一的标识 id,消费者收到消息后去业务 id 是否存在
  • 幂等方案:分布式锁、数据库锁(悲观锁、乐观锁)

RabbitMQ 死信交换机(RabbitMQ 延迟队列有了解过嘛)

  • 延迟队列:进入队列的消息会被延迟消费的队列
  • 场景:超时订单、限时优惠、定时发布

延迟队列=死信交换机+TTL(消息的生存时间)

死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数为 false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了 dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称 DLX)。

@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("simple.queue")  // 指定队列名称,并持久化
        .ttl(10000)  // 设置队列的超时时间,10秒
        .deadLetterExchange("dl.direct")  // 指定死信交换机
        .build();
}

TTL

TTL,也就是 Time-To-Live。如果一个队列中的消息 TTL 结束仍未消费,则会变成死信,ttl 超时分为两种情况:

  • 消息所在的队列设置了存活时间
  • 消息本身设置了存活时间


哪个 ttl 短以哪个为准。

// 创建消息
Message message = MessageBuilder
	.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
	.setExpiration("5000")
	.build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new correlationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct","ttl",message,correlationData);

延迟队列插件

实现延迟队列,还可以使用官方提供的插件。DelayExchange 插件,需要安装在 RabbitMQ 中
RabbitMQ 有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.htmlimage.png

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name ="delay.queue", durable="true"),
    exchange = @Exchange(name="delay.direct",delayed="true")
    key="delay"
))
public void listenDelayedQueue(String msg){
	log.info("接收到 delay.queue的延迟消息:{}",msg);
}
// 创建消息
Message message = MessageBuilder
	.withBody("hello,delayed message".getBytes(StandardCharsets.UTF_8))
	.setHeader("x-delay",10000)
	.build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("delay.direct","delay",message,correlationData);

RabbitMQ 死信交换机?(RabbitMQ 延迟队列有了解过嘛)

  • 我们当时一个业务使用到了延迟队列(超时订单、限时优惠、定时发布…)
  • 其中延迟队列就用到了死信交换机和 TTL 实现的
  • 消息超时未消费就会变成死信(死信的其他情况:拒绝被消费,队列满了)

延迟队列插件实现延迟队列 DelayExchange

  • 声明一个交换机,添加 delayed 属性为 true
  • 发送消息时,添加 x-delay 头,值为超时时间

如果有 100 万消息堆积在 MQ,如何解决(消息堆积怎么解决)

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
解决消息堆积有三种思路:

  • 增加更多消费者,提高消费速度
  • 在消费者内开启线程池加快消息处理速度
  • 扩大队列容积,提高堆积上限

惰性队列

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持百万条的消息存储
@Bean
public Queue lazyQueue(){
    return QueueBuilder
        .durable("lazy.queue")
        .lazy()  // 开启x-queue-mode为lazy
        .build();
}
// 或者
@RabbitListener(queuesToDeclare = @Queue(
    name="lazy.queue",
    durable="true",
    arguments=@Argument(name="x-queue-mode",value="lazy")
))
public void listenLazyQueue(String msg){
	log.info("接收到lazy.queue的消息:{}",msg);
}

如果有 100 万消息堆积在 MQ,如何解决?
解决消息堆积有三种思路:

  • 增加更多的消费者,提高消费速度
  • 在消费者内开启线程池加快消息处理速度
  • 扩大队列容积,提高堆积上限,采用惰性队列
    • 在声明队列的时候可以设置属性 x-queue-mode 为 lazy,即为惰性队列
    • 基于磁盘存储,消息上限高
    • 性能比较稳定,但基于磁盘存储,受限与磁盘 IO,时效性会降低

RabbitMQ 高可用机制

  • 在生产环境下,使用集群来保证高可用性
  • 普通集群、镜像集群、仲裁队列

普通集群

普通集群,或者叫标准集群(classic cluster),具备下列特征:

  • 会在集群的各个节点共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
  • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回。
  • 队列所在节点宕机,队列中的消息就会丢失。

镜像集群

镜像集群:本质是主从模式,具备下面的特征:

  • 交换机、队列、队列中的消息会在各个 mq 的镜像节点之间同步备份。
  • 创建队列的节点被称为该队列的主节点,备份到其他节点叫做该队列的镜像节点。
  • 一个队列的主节点可能是另一个队列的镜像节点。
  • 所有操作都是在主节点完成,然后同步给镜像节点。
  • 主宕机后,镜像节点会替代成新的主。

仲裁队列

仲裁队列:仲裁队列是 3.8 版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步
  • 使用非常简单,没有复杂的配置
  • 主从同步基于 Raft 协议,强一致
@Bean
public Queue quorumQueue(){
    return QueueBuilder
        .durable("quorum.queue")  // 持久化
        .quorum()  //仲裁队列
        .build();
}

RabbitMQ 的高可用机制有了解过嘛

  • 在生产环境下,我们当时采用的镜像模式搭建的集群,共有 3 个节点。
  • 镜像队列结构是一主多从(从就是镜像),所有操作都是主节点完成,然后同步给镜像节点。
  • 主节点宕机后,镜像节点会替代成新的主(如果在主从同步完成前,主就已经宕机,可能出现数据丢失)

出现数据丢失怎么解决?
我们可以采用仲裁队列,与镜像队列一样,都是主从模式,支持主从数据同步,主从同步基于 Raft 协议,强一致。并且使用起来也非常简单,不需要额外的配置,在声明队列的时候只要指定这个是仲裁队列即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1413246.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ubuntu20.4 Mono C# gtk 编程习练笔记(四)

连续实时绘图 图看上去不是很清晰,KAZAM录屏AVI尺寸80MB, 转换成gif后10MB, 按CSDN对GIF要求,把它剪裁缩小压缩成了上面的GIF,图像质量大不如原屏AVI,但应该能说明原意:随机数据随时间绘制在 gtk 的 drawin…

使用OpenCV实现一个简单的实时人脸跟踪

简介: 这个项目将通过使用OpenCV库来进行实时人脸跟踪。实时人脸跟踪是一项在实际应用中非常有用的技术,如视频通话、智能监控等。我们将使用OpenCV中的VideoCapture()函数来读取视频流,并使用之前加载的Haar特征级联分类器来进行人脸跟踪。 …

浅谈Python两大爬虫库——urllib库和requests库区别

目录 一、urllib库 1、使用方法 2、功能 3、效率 二、requests库 1、使用方法 2、功能 3、效率 三、总结与建议 在Python中,网络爬虫是一个重要的应用领域。为了实现网络爬虫,Python提供了许多库来发送HTTP请求和处理响应。其中,url…

MySql8的简单使用(1.模糊查询 2.group by 分组 having过滤 3.JSON字段的实践)

MySql8的简单使用(1.模糊查询 2.group by 分组 having过滤 3.JSON字段的实践) 一.like模糊查询、group by 分组 having 过滤 建表语句 create table student(id int PRIMARY KEY,name char(10),age int,sex char(5)); alter table student add height…

【Linux C | 网络编程】入门知识:TCP协议、TCP客户端、TCP服务端

😁博客主页😁:🚀https://blog.csdn.net/wkd_007🚀 🤑博客内容🤑:🍭嵌入式开发、Linux、C语言、C、数据结构、音视频🍭 🤣本文内容🤣&a…

MySQL 数据库 JDBC 简化 JDBCTemplate

JDBCTemplate编程 场景:使用JdbcTemplate技术向MySQL中插入一条数据 表结构如下 create table t_person(person_id int primary key auto_increment,person_name varchar(20),age tinyint,sex enum(男,女,奥特曼),height decimal(4,1),birthday datetime ) AP…

勤学苦练“prompts“,如沐春风“CodeArts Snap“

前言 CodeArts Snap 上手一段时间了,对编程很有帮助。但是,感觉代码编写的不尽人意。 我因此也感到困惑,想要一份完整的 CodeArts Snap 手册看看。 就在我感觉仿佛"独自彷徨在这条悠长、悠长又寂寥的雨巷"时,我听了大…

Ubuntu+Apache+MySQL+PHP+phpstorm+xdebug下的debug环境搭建(纯小白笔记)

关键词 一、win10双系统装Ubuntu(Deepin) 参考文章: https://www.cnblogs.com/masbay/p/11627727.html 1.1 首先需要确定自己操作系统的引导方式 1.1.1 查看BIOS模式 “winr"快捷键进入"运行”,输入"msinfo32…

微软 AD |域控制器 | 组件 | 域服务 | 对象解析

介绍 Active Directory(AD),是微软的目录服务,提供强大的功能和管理体系,用于组织管理和安全存储网络上的资源和用户、计算机、服务对象等信息。 AD 功能: 身份验证和访问控制: 提供集中式的身…

2024 高级前端面试题之 CSS 「精选篇」

该内容主要整理关于 CSS 的相关面试题,其他内容面试题请移步至 「最新最全的前端面试题集锦」 查看。 CSS模块精选篇 1. 盒模型2. BFC3. 层叠上下文4. 居中布局5. 选择器权重计算方式6. 清除浮动7. link 与 import 的区别8. CSS3的新特性9. CSS动画和过渡10. 有哪些…

Mac中java jdk、android sdk、flutter sdk目录

1、Java JDK 目录 (1)官网下载的 Java JDK Java JDK下载官网 /Library/Java/JavaVirtualMachines(2)Android Studio下载的 Java JDK /Users/用户名/Library/Java/JavaVirtualMachines2、Android SDK 目录 /Users/用户名/Libr…

以梦为码,CodeArts Snap 缩短我与算法的距离

背景 最近一直在体验华为云的 CodeArts Snap,逐渐掌握了使用方法,代码自动生成的准确程度大大提高了。 自从上次跟着 CodeArts Snap 学习用 Python 编程,逐渐喜欢上了 Python。 我还给 CodeArts Snap 起了一个花名: 最佳智能学…

SQL注入实战:二阶注入

一、二阶注入的原理 1、二阶注入也称为SOL二次注入。 2、二次注入漏洞是一种在Web应用程序中广泛存在的安全漏洞形式:相对于一次注入漏洞而言,二次注入漏洞更难以被发现,但是它却具有与一次注入攻击漏洞相同的攻击威力。 3、简单的说,二次…

GitHub国内打不开(解决办法有效)

最近国内访问github.com经常打不开,无法访问。 github网站打不开的解决方法 1.打开网站http://tool.chinaz.com/dns/ ,在A类型的查询中输入 github.com,找出最快的IP地址。 2.修改hosts文件。 在hosts文件中添加: # localhost n…

Leetcode—2807. 在链表中插入最大公约数【中等】

2023每日刷题(九十九) Leetcode—2807. 在链表中插入最大公约数 实现代码 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val…

Datawhale 组队学习之大模型理论基础 Task7 分布式训练

第8章 分布式训练 8.1 为什么分布式训练越来越流行 近年来,模型规模越来越大,对硬件(算力、内存)的发展提出要求。因为内存墙的存在,单一设持续提高芯片的集成越来越困难,难以跟上模型扩大的需求。 为了…

力扣每日一题 ---- 1039. 多边形三角剖分的最低得分

这题的难点在哪部分呢,其实是怎么思考。这道题如果之前没做过类似的话,还是很难看出一些性质的,这题原本的话是没有图片把用例显示的这么详细的。这题中有个很隐晦的点没有说出来 剖出来的三角形是否有交叉,这题中如果加一个三角…

企业级大数据安全架构(六)数据授权和审计管理

作者:楼高 本节详细介绍企业级大数据架构中的第六部分,数据授权和审计管理 1.Ranger简介 Apache Ranger是一款被设计成全面掌管Hadoop生态系统的数据安全管理框架,为Hadoop生态系统众多组件提供一个统一的数据授权和管理界面, 管…

Redis为什么速度快:数据结构、存储及IO网络原理总结

Redis,作为内存数据结构存储的佼佼者,其高性能表现一直备受赞誉。那么,Redis究竟是如何实现这一点的呢?接下来,我们将更深入地探讨其背后的关键技术,并提供进一步的优化策略。 一、内存存储与数据结构设计…

【Linux】进程概述

创作不易&#xff0c;本篇文章如果帮助到了你&#xff0c;还请点赞 关注支持一下♡>&#x16966;<)!! 主页专栏有更多知识&#xff0c;如有疑问欢迎大家指正讨论&#xff0c;共同进步&#xff01; &#x1f525;c系列专栏&#xff1a;C/C零基础到精通 &#x1f525; 给大…