RabbitMQ 能保证消息可靠性吗

news2025/1/13 6:21:04

系列文章目录

消息队列选型——为什么选择RabbitMQ
RabbitMQ 五种消息模型


RabbitMQ 能保证消息可靠性吗

  • 系列文章目录
  • 前言
  • 一、消息可靠性的定义
  • 二、几种不可靠的场景
  • 三、防意外丢失
    • 1. 消息持久化
    • 2. 队列持久化
    • 3. 发布确认
      • 3.1 简单发布确认
      • 3.2 批量发布确认
      • 3.3 异步发布确认
    • 4. 手动接收确认
    • 5. 死信队列
  • 四、防重复传递
    • 1. 消息确认机制
    • 2. 幂等性校验(需代码实现)
  • 五、不可靠场景的对策
  • 六、总结


前言

前面我们在做MQ组件选型时,提到了rabbitMQ的消息可靠性,那么它到底可靠到什么程度?又是如何保证消息可靠性的呢?今天我们就一起来看一下


一、消息可靠性的定义

消息可靠性是指在消息传递过程中,确保消息能够被完整、准确、可靠地传递到目的地。更具体的说分为两个角度:

  1. 不会意外丢失
  2. 不会重复传递

因此,我们必须保证消息不会因为网络故障、系统故障或其他异常原因而丢失或重复传递,否则可能导致业务逻辑错误、数据损坏或系统崩溃等问题

二、几种不可靠的场景

  1. 消息漏发送:生产者在发送消息时,如果不观察RabbitMQ服务器的确认消息,可能导致有些消息在网络中丢失而不自知
  2. 消息重复发送:如果生产者在发送消息时,由于网络抖动或者其他原因,生产者无法从RabbitMQ收到消息确认,此时生产者会重发同样一条消息,从而导致消息重复
  3. 消息未储存:rabbitMQ服务器宕机,导致已经在rabbit服务器内的消息直接丢失
  4. 消费者重复消费:如果消费者和MQ都不记得曾经消费过的消息,主动拉取或推送了旧的消息,导致重复消费,

三、防意外丢失

在这里,必须提前声明一点:即消息意外丢失因为rabbitMQ经由转换机,如果匹配不到任何队列,是会主动丢弃该消息的,这种丢失属于业务配置上的主动丢弃,不记在意外丢失中

1. 消息持久化

消息持久化需要在消息生产者修改代码

   String MESSAGE = "Hello, RabbitMQ!";
   // 设置消息持久化
   AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
      .contentType("text/plain")
      .deliveryMode(2) // deliveryMode=1代表不持久化,deliveryMode=2代表持久化
      .build();
   channel.basicPublish("", MESSAGE_QUEUE, properties, MESSAGE.getBytes("UTF-8"));

也可以直接使用内置的properties

   channel.basicPublish("", MESSAGE_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes("UTF-8"));

2. 队列持久化

尽管我们上面已经使用了消息持久化,但是这是不够的,消息本身不会作为一个实体存在硬盘上,真正落在硬盘上的是队列,及队列中的消息。所以,要想保存消息,还得把消息所在的队列持久化,因此需要在声明队列时,将其 durable 属性设置为true

    // 设置队列持久化
    boolean durable = true;
    channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

注意,该属性不可修改,如果要把一个队列改成持久化,得先删除,再创建才行


3. 发布确认

我们上面已经成功把消息做了持久化,不过这并不能彻底避免消息丢失,比如在消息发布者发布消息的过程中,在消息成功持久化之前,rabbitMQ就崩溃了,此时消息仍然会丢失。因此,有必要执行发布确认的操作

即消息发送后,MQ要对生产者发送消息确认,确认已经持久化后,再进行发布确认
在这里插入图片描述
发布确认默认不开启,如果要开启,需要在channel上设置

    Channel channel = connection.createChannel();
    // 将信道设置为发布确认
    channel.confirmSelect();

进行完该项设置后,还需要针对确认消息的类型,适当的修改发送方代码。一般来说,发布确认有以下类型

3.1 简单发布确认

即发送后,单条单条的消息是否被rabbitMQ服务器接受

	String message = "Hello, RabbitMQ!";
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        
    // 设置简单发布确认
    channel.confirmSelect();
    if (channel.waitForConfirms()) {
         System.out.println("Message published successfully.");
    } else {
         System.err.println("Failed to publish message.");
    }

可以看到,这种方式其实采用的是发一条消息,确认一次,效率并不高。

3.2 批量发布确认

批量发布和简单发布,在调用方法上并没有区别,只是发送的消息,从发一条就等待确认一次,变成了发一批,才确认一次。

	int MESSAGE_COUNT = 100;
    String message = "Hello, RabbitMQ!";
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    }

    // 设置批量发布确认
    channel.confirmSelect();
    int outstandingConfirms = MESSAGE_COUNT;
    while (outstandingConfirms > 0) {
        outstandingConfirms -= channel.waitForConfirms();
    }
    System.out.println("All messages published successfully.");

此种方式,虽然仍然会同步阻塞,但从每条确认一次进化到批量确认一次,大大节约了网络耗时。但是可能会出现一些消息发布成功,但是一些消息未成功的情况,不易进行排查和处理

3.3 异步发布确认

异步确认则采用的另一种方案,通过给channel设置一个确认监听器,来异步的做确认,即将发布消息和确认处理放在不同的线程中处理

   int MESSAGE_COUNT = 100;
   String message = "Hello, RabbitMQ!";
   ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
   Set<Long> failConfirmMessages = new HashSet<>();
   // 异步发布确认
   channel.confirmSelect();
   // 需设置两个监听器,前者为肯定确认,后者为否定确认
   channel.addConfirmListener(new ConfirmCallback() {
       @Override
       // deliveryTag 代表 投递消息的序号;multiple为true,则代表确认所有小于或等于当前消息deliveryTag的状态,为false,代表仅确认该条消息
       public void handle(long deliveryTag, boolean multiple) throws IOException {
           if (multiple) {
               ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
               // 清除所有小于该序号的消息
               confirmed.clear();
           } else {
           	   // 仅清除本条消息
               outstandingConfirms.remove(deliveryTag);
           }
       }
   }, new ConfirmCallback() {
       @Override
       public void handle(long deliveryTag, boolean multiple) throws IOException {
           System.err.println("Failed to publish message.");
           failConfirmMessages.add(deliveryTag);
       }
   });
   for (int i = 0; i < MESSAGE_COUNT; i++) {
       long nextSeqNo = channel.getNextPublishSeqNo();
       channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
       outstandingConfirms.put(nextSeqNo, message);
   }
   // 一段时间过后
   ......
   // 看最后是否还有消息被确认丢失,此时可选择是否要重新发送
   if (failConfirmMessages .size() == 0 && outstandingConfirms.size() == 0) {
   		System.out.println("All messages published successfully.");
   } else {
		System.out.println("Some messages need republish.");
   }
   

在这里插入图片描述

通过异步方式做确认,能提升性能,缺点是需要一些多线程的知识,实现难度较高。

4. 手动接收确认

如果第三点,是保证消息发送者到MQ服务器之间,消息不会丢失。那么同理,还需要保证MQ服务器到消费者间,消息不会丢失。

这时候,就需要手动接收确认了,即消费者得到消息后,先进行业务处理(或消息存储),直到业务处理完成后。再告知rabbitMQ服务器,消息我收到了。从而避免了自动ack后,消费者宕机导致的消息未处理完就丢失的问题,其示例代码如下

 // 创建消费者对象
 final Consumer consumer = new DefaultConsumer(channel) {
     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
         String message = new String(body, "UTF-8");

         try {
             // 处理消息
             System.out.println("Received message: " + message);

             // 显式 ack 消息
             channel.basicAck(envelope.getDeliveryTag(), false); // 第二个参数表示是否批量处理

         } catch (Exception ex) {
             // 处理消息时发生异常,拒绝消息并重新将其放回队列中
             channel.basicNack(envelope.getDeliveryTag(), false, true);
         }
     }
 };

 // 开始消费消息,使用手动ack
 boolean autoAck = false;
 channel.basicConsume(QUEUE_NAME, autoAck, consumer);

PS:需要注意的是,手动ack可能带来重复消费的问题,比如消息处理成功后,在执行channel.basicAck时宕机,导致RabbitMQ服务器没收到消息接收确认的信号,超时后会认为该消息未被接收

5. 死信队列

在某些情况下(如手动ACK),如消费者在暂时无法处理该消息,RabbitMQ 可能会将消息重新放回队列,但大量的重新放回会导致消息堆积,也是不可取的。

// 如下,消费者可以向rabbitMQ发送nack的消息,且设置requeue参数为false
 void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;

为了避免这种情况,RabbitMQ 提供了死信队列的功能。当消息因为某些原因不能被消费时,RabbitMQ 将消息放入死信队列而不是重新放回队列,防止消息丢失
在这里插入图片描述

四、防重复传递

上面一节,我们为rabbitMQ在消息传递过程中,各个节点都有防消息丢失的配置。这一节,我们来说rabbitMQ为了防止一条消息重复传递而做的努力

1. 消息确认机制

上面,我们说了发布确认和接收确认。其实,不管是发布和接收,这都属于消息确认机制的一种,而消息确认机制是AMQP协议所规定的发布确认是为了防止丢失消息,接收确认则是为了防止重复消费,当消费者成功接收到消息并完成处理后,发送确认通知给 RabbitMQ,RabbitMQ 才会将该消息标记为已消费,防止重复传递

2. 幂等性校验(需代码实现)

在消息生产者发送消息之前,消息可以被设置上全局唯一uuid,而消费者在消费前,则会判断该uuid是否已经消费过。

// 生产者发送消息之前,将消息标记为idempotent
// 通过设置 messageId 属性为一个唯一值,即可标记该消息为幂等消息
String messageId = UUID.randomUUID().toString();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .messageId(messageId)
        .build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, message.getBytes());

// 消费者在处理消息之前,检查该消息是否已经被消费过
// 如果该消息已经被消费过,则直接确认消息
String messageId = properties.getMessageId();
if (processedIds.contains(messageId)) {
    channel.basicAck(envelope.getDeliveryTag(), false);
    return;
}
// 处理消息,并将 messageId 加入已处理集合
// ...
processedIds.add(messageId);

以上代码仅展示原理,实际上分布式高并发的情况下,uuid应该交由专门的服务器用雪花算法等方式去产生全局唯一的uuid。同样消费者处的processedIds也会进行远端存储

五、不可靠场景的对策

现在,让我们回头来看看不可靠场景下,rabbitMQ和我们开发者能用什么对策解决

  1. 消息漏发送:生产者在发送消息时,如果不观察RabbitMQ服务器的确认消息,可能导致有些消息在网络中丢失而不自知
  2. 消息重复发送:如果生产者在发送消息时,由于网络抖动或者其他原因,生产者无法从RabbitMQ收到消息确认,此时生产者会重发同样一条消息,从而导致消息重复
  3. 消息未储存:rabbitMQ服务器宕机,导致已经在rabbit服务器内的消息直接丢失
  4. 消费者重复消费:如果消费者不记得曾经消费过的消息,主动拉取或被推送了旧的消息,导致重复消费,
场景场景解释解决对策
消息漏发送生产者在发送消息时,如果不观察RabbitMQ服务器的确认消息,可能导致有些消息在网络中丢失而不自知发布确认、死信队列
消息重复发送如果生产者在发送消息时,由于网络抖动或者其他原因,生产者无法从RabbitMQ收到消息确认,此时生产者会重发同样一条消息,从而导致消息重复无策略
消息未储存rabbitMQ服务器宕机,导致已经在rabbit服务器内的消息直接丢失队列、消息持久化
消费者重复消费如果消费者和MQ都不记得曾经消费过的消息,主动拉取或推送了旧的消息,导致重复消费接受确认、幂等性校验(代码实现)

六、总结

RabbitMQ 能保证消息可靠性吗?答案是绝大部分情况可靠,但仅靠其自身机制无法做到100%。比如对于没有收到发布确认信息,导致消息生产者重复传递这种场景就并没有好的办法,只能通过开发者额额外代码去解决,比如发消息带全局唯一id,然后由消费者去做幂等性校验。而针对更极端的场景,如RabbitMQ硬盘故障导致消息丢失,就得依托镜像部署等手段去处理了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/690029.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vector - CAPL - CAPL入门 - 01

前面已经介绍了很多CAPL相关的函数极其应用&#xff0c;今天CAPL能够完成的功能来介绍在车载网络测试中都能够帮助测试工程师完成哪些工作&#xff1f;让我们对它有一个最基础的认识。 CAPL在总线中的应用 > 分析特定消息或特定数据 > 分析数据流量 > 创建和修改工具…

智慧班牌系统源码,相关技术:springboot,elmentui ,Quartz,jpa,jwt

电子班牌系统的主要功能包括&#xff1a;班级管理、学生信息管理、教师管理、课程管理、作业管理、考试管理、公告管理、评价管理、学校消息发布等。在班级管理方面&#xff0c;该系统可以实现教师对班级的整体管理以及学生个人信息的管理&#xff0c;包括个人信息、考试成绩、…

【Java】Java核心 72:XML (上)

文章目录 1 XML概述什么是XMLXML作用 2 编写第1个XML文件需求效果步骤 3 XML的组成&#xff1a;声明和元素XML组成文档声明元素&#xff08;标签、标记&#xff09; 4 XML的组成&#xff1a;属性、注释和转义字符属性的语法注释转义字符[实体字符]小结 1 XML概述 什么是XML 英…

rabbitmq设置允许外部访问

rabbitmq默认端口为15672,用户名和密码都为guest,是不允许外部访问的. 允许外部访问设置需要操作两步: 第一步:添加其它用户,guest只能用于本机 第二步:Virtual Host允许添加的用户访问,点击下图红色部分. spring配置 spring:rabbitmq:host: 192.168.101.57port: 5672username…

idea中有个目录不显示,磁盘中是有的

java项目src下有个目录data不显示 通过打开D盘看目录是有的&#xff0c;运行项目的时候报错&#xff0c;找不到目录下的文件。 解决方案&#xff1a; idea -> file -> seetings -> EDitor -> file types 打开页面后右侧显示有ignore files and folders 查看这里面有…

【Visual Studio】关于rc文件预处理器宏

问题 VS工程调试遇到一个问题&#xff1a;明明在 项目\属性&#xff0c;C/C\预处理器 页面定义了宏&#xff0c;为什么rc编译时没有影响&#xff1f; 百度后发现&#xff0c;和下方链接中问题很相似。 https://bbs.csdn.net/topics/50485796https://bbs.csdn.net/topics/50…

【运维】查询数据库每张表的数据及索引占用大小

【SQL】查询数据库每张表的数据及索引占用大小 SELECTa.*,CONCAT( a.总大小 / 1024000000, G ) 总大小G FROM(SELECTTABLE_SCHEMA,TABLE_NAME,sum( DATA_LENGTH ) 数据大小,sum( INDEX_LENGTH ) 索引大小,( sum( DATA_LENGTH ) sum( INDEX_LENGTH ) ) 总大小FROMinformation_s…

C# [unity]求顶点数量不等的两条曲线的中线

好久没写了.最近在尝试重写lgsvl导入地图数据的方式,地图同学提供的opendrive车道线计算不准,所以直接让他们导出经纬度的高精地图json数据,但是这种数据只有车道边界线,没有车道中心线, 基于只是想小改而非大改的前提下,还是要算出车道中心线.搞个小demo传上来,代码写的很拙劣…

宝塔定时任务实现磁盘使用率超阀值后自动发送邮件

服务器磁盘使用空间不足会产生各种不可预知的灾难&#xff0c;服务器上的应用几乎全部不能用&#xff0c;如果没有遇到过磁盘占满的问题&#xff0c;可能很难发现它。 步骤 安装邮件发送工具sendEmail磁盘检测并发送邮件shell脚本宝塔配置计划任务 安装邮件发送工具sendEmail …

【ROS】TF2坐标转换及实战示例

Halo&#xff0c;这里是Ppeua。平时主要更新C&#xff0c;数据结构算法…感兴趣就关注我吧&#xff01;你定不会失望。 文章目录 0.ROS中的坐标转换消息包0.1 geometry_msgs/TransformStamped0.2 geometry_msgs/PointStamped1.静态坐标转换1.1导入所需功能包1.2发布方实现1.3 …

多元分类预测 | Matlab粒子群算法(PSO)优化极限学习机(ELM)的分类预测,多特征输入模型。PSO-ELM分类预测模型

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 多元分类预测 | Matlab粒子群算法(PSO)优化极限学习机(ELM)的分类预测,多特征输入模型。PSO-ELM分类预测模型 多特征输入单输出的二分类及多分类模型。程序内注释详细,直接替换数据就可以用。程序语言为matlab,…

DALL-E2原理解读——大模型论文阅读笔记五

论文&#xff1a;https://cdn.openai.com/papers/dall-e-2.pdf 项目&#xff1a;https://openai.com/dall-e-2 一. 主要思想 利用CLIP提取的文本特征&#xff0c;级联式的生成图片。第一阶段通过prior将文本特征与图像特征进行对齐&#xff0c;第二阶段用扩散模型将视觉特征转…

简单demo演示Tomcat中Servlet

挺好玩的,有利于初学对容器和servlet接口规范的理解 具体代码 package org.apache;import javax.servlet.Servlet; import java.io.FileReader; import java.io.IOException; import java.util.Properties; import java.util.ResourceBundle; import java.util.Scanner;/*** a…

一文了解HTTP协议

文章目录 前言概念协议传输超文本 HTTP 协议的格式HTTP 请求HTTP 响应 总结 前言 在这之前&#xff0c;可以看看我之前的文章&#xff0c;也是关于协议的。 TCP/IP 协议详解 UDP协议详解 我们在打开一个网页的时候通常都会注意到网址的前面有一个统一的标识http://&#xf…

智慧校园电子班牌系统源码

电子班牌系统的主要功能包括&#xff1a;班级管理、学生信息管理、教师管理、课程管理、作业管理、考试管理、公告管理、评价管理、学校消息发布等。在班级管理方面&#xff0c;该系统可以实现教师对班级的整体管理以及学生个人信息的管理&#xff0c;包括个人信息、考试成绩、…

Long型参数传到前端精度丢失,后两位变为00,导致传值错误,解决方案

问题&#xff1a; 后端id字段为Long型&#xff0c;起初采用自增主键&#xff0c;没有问题&#xff1b;由于业务需要改为雪花id&#xff0c;后端可正常运行&#xff0c;传递到前端精度丢失&#xff0c;后两位变为00。 解决方案&#xff1a; 后端将属性转为字符串传递&#xff0…

Spring学习笔记---上篇

文章目录 1、Spring1.1、简介1.2、优点1.3、Spring的组成1.4、拓展 2、IOC理论推导3、IOC的本质3.1、IOC概念3.2、IoC是Spring框架的核心内容 3、HelloSpring3.1、实现3.2、思考 4、IOC创建对象的方式5、Spring配置5.1、别名&#xff08;alias&#xff09;5.2、Bean的配置5.3、…

单图换脸roop源码与环境配置

前言 1.roop是新开源了一个单图就可以进行视频换脸的项目&#xff0c;只需要一张所需面部的图像。不需要数据集&#xff0c;不需要训练。 2.大概的测试了一下&#xff0c;正脸换脸效果还不错&#xff0c;融合也比较自然。但如果人脸比较大&#xff0c;最终换出的效果可能会有…

source-map定位生产问题

source-map 定位源码错误位置 需要安装source-map库webpack配置需要配上devtool: “hidden-source-map”&#xff0c;devtool详细配置看这里devtool配置配置完webpack打包后&#xff0c;可以看到打包出来的.js.map文件 将生产包产生错误的栈赋值给stack即可&#xff0c;即设置…

前端——原生HTML猫猫max桌宠(附源码)

一、前言 看见了max大佬和狗头人大佬做的一个桌宠&#xff0c;于是就像用web简单实现一下 二、代码包 https://wwwf.lanzout.com/iWfER0ze0cqd密码:fg88 三、简单效果 简单用了随机动作&#xff08;可以进行权重设置&#xff09; 四、踩坑情况 如果不是主循环loop里&#xff0…