RabbitMQ---消息确认和持久化

news2025/1/18 14:36:18

(一)消息确认

1.概念

生产者发送消息后,到达消费端会有以下情况:

1.消息处理成功

2.消息处理异常

   如果RabbitMQ把消息发送给消费者后就把消息删除,那么就可能会导致,消息处理异常想要再获取这条消息的时候,造成消息丢失,但如果消息处理成功,不需要这条消息了,就可以进行删除。此时就不会有问题

 所以如何保证消费者成功接收并正确处理?我们RabbitMQ就给我们提供了消息确认机制

 消费者在订阅队列的时候,我们可以指定autoAck参数,根据这个参数,我们可以把确认机制大致分为两种(Spring boot有三种,但是本质也是这两种)

  自动确认:当autoAck为true时,RabbitMQ只要发送了这个消息,就会从内存中删除,不会管消费者是否收到了这些消息,所以自动确认的可靠性不高

  手动确认:当autoAck为false时,RabbitMQ会等待消费者调用Basic.Ack,回复确认信号后,RabbitMQ才会从队列中删除消息,所以手动确认的可靠性是比较高的

当我们设置为手动确认后,队列中的消息就分为了两个部分:

1.等待投递给消费者的消息

2.已经投递给消费者但是没有收到确认信号的消息

也就是Ready和Unacked

如果RabbitMQ一致没有收到消费者的确认信号,并且消息对应的消费者断开连接,RabbitMQ就会安排消息重新入队列,等待投递给下一个消费者

 

 

 

2.Spring Boot的三种策略和代码演示

Springboot给我们提供了三种策略(本质上还是自动确认和手动确认)

1)AcknowledgeMode.NONE

 这种模式下,就是标准的自动确认,消息一旦投递给消费者,不管消费者是否正确处理了消息,RabbitMQ就会自动确认消息,并且从队列中移除,所以消息是很有可能丢失的

 2)AcknowledgeMode.AUTO(默认)

这种模式下,消息成功处理时会自动确认消息,如果消息过程中抛出了异常就不会确认消息

3)AckniwledgeMode.MANUAL

这种模式下,就是标准的手动确认模式,消费者必须在成功处理消息后调用basicAck方法来确认消息,如果消息没有被确认,RabbitMQ就会认为消息没有被成功处理,会重新投递该消息,这种模式会提高消息的可靠性,因为消息不会丢失,而是重新入队

代码演示

配置文件代码

spring:
  rabbitmq:
    addresses: amqp://student:student@62.234.46.219:5672/test
    listener:
      simple:
        acknowledge-mode: NONE

声明交换机和队列代码

@Component
public class Config {
    @Bean("ackExchange")
    public Exchange ackExchange(){
        return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).durable(true).build();
    }
    @Bean("ackQueue")
    public Queue ackQueue(){
        return QueueBuilder.durable(Constants.ACK_QUEUE).build();
    }
    @Bean
    public Binding ackBind(@Qualifier("ackExchange") Exchange ackExchange,@Qualifier("ackQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("ack").noargs();
    }
}

生产者代码


@RequestMapping("producer")
@RestController
public class AckProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("ack")
    public String ackPro(){
        rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack","ack test");
        return "发送成功";
    }
}

然后我们发送一条消息看现象(此时我们还没写消费者代码,所以没有自动确认) 

消费者代码

@Component
public class AckConsumer {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void ListenerQueue(Message message, Channel channel){
        System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "
                +message.getMessageProperties().getDeliveryTag());
//        int num=3/0;     //模拟失败
        System.out.println("处理完成");
    }
}

此时我们没有出错,我们看能否正确接收消息,已经队列中消息是否存在 

 

我们发现能够准确的接收消息

如果我们此时抛出异常呢? 

 

 

  我们发现抛出了异常后,我们后端并没有接收到消息,但是队列的消息也消失了,所以就会导致我们的消息不见了,这也就是自动确认的弊端 

此时我们把确认策略改成AUTO

我们这里只需要改配置文件即可

截图体现不出来,但是结果是一直在循环,因为我们如果设置确认策略为AUTO,在运行正常时,会自动确认,并且从队列中删除,如果抛出异常,就不会确认消息,造成循环

 

我们再来演示下正确情况 

 

此时我们再来说一下手动确认策略

首先更改配置文件

  其次和我们刚刚演示AUTO和NONE时,代码是没有改动的,但是手动确认,因为要我们手动做一些处理,所以代码也要有一定的改变 

  那我们先来讲一下手动确认方法

手动确认方法:

1.肯定确认

RabbitMQ已经知道该消息并且成功的处理消息,可以丢弃了 

我们来解释下这个方法的参数

deliveryTag:消息的唯一标识,是一个单调递增的64位长整型,每个channel之间是独立的,所以在每个channel上是唯一的,当消费者确认一条消息时,需要用对应的信道上进行确认

multiple:是否批量确认,如果为false,就只确认当前值,如果为true就会确定小于等于的值

2.否定确认

有两个方法

他们两个的区别不大,唯一区别就是是否支持一次性批量拒绝消息

我们来看一些这个requeue这个参数,这个参数表示拒绝后,消息要如何处理,如果为true,RabbitMQ就会把他重新入队,发给下一个消费者,如果为false,RabbitMQ就会把他从队列中删除,不会把他发送给信道消费者

消费者代码

@Component
public class AckConsumer {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void ListenerQueue(Message message, Channel channel) throws IOException {
        long Tag=message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "
                    +Tag);
            int num=3/0;     //模拟失败
            channel.basicAck(Tag,false);
            System.out.println("处理完成");
        }catch (Exception e){
            channel.basicNack(Tag,false,true);
            channel.basicReject(Tag,true);
        }
    }
}

出现异常时的现象 

因为我们是设置出现异常重新入队给下一个消费者,所以会循环

如果消息成功接收

 

(二)持久化

  我们刚刚做的消息确认和之前说过七大工作模式中的发送确认都是为了保证消息不丢失,能够正确的接收,但是我们如何保证RabbitMQ服务停掉以后,生产者发送过的消息不丢失(存储在RabbitMQ中的消息)?

1.RabbitMQ的持久化

  RabbitMQ的持久化分为三个部分:交换机持久化,队列持久化和消息持久化

1)交换机持久化

  交换机持久化是通过在声明交换机的时候,将durable参数设置为true实现的,相当于将交换机的属性在服务器中存储,M服务关闭后再次重新,RabbitMQ会自动重新建立交换机,此时就相当于一直存在

上面是我们把交换机设置为持久化的代码

其实我们不手动设置,默认交换机也是持久的,我们来看源码 

 

 

我们发现就算不设置,他默认也是true,在我们创建的时候也是持久化的

2)队列持久化

队列持久化也是在声明队列时调用方法来实现的

如果队列不设置持久化,MQ重启时,队列就会被删掉,与此同时消息是存在队列中的,所以队列上的消息也会小时

所以我们要设置消息持久化,那么一定要设置队列持久化否则没用

这是设置为持久化的代码

这是设置为非持久化的代码

那我们还是来看一眼源码

然后我们点进这个QueueBuilder看看

 

我们发现这个就是给名字赋值用的

再来看看durable中的setDurable

我们发现是设置持久化的,因为是boolean类型,默认都是false所以非持久化就不需要做处理了

3)消息持久化

消息持久化

消息实现持久化就需要把消息的投递模式进行更改

设置了队列和消息的持久化,RabbitMQ服务器重启后,消息才会存在,其他情况下,重新MQ都武器消息都会丢失

消息持久化代码

@RequestMapping("producer")
@RestController
public class AckProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("ack")
    public String ackPro(){
        String s1="ack test";
        Message message=new Message(s1.getBytes(StandardCharsets.UTF_8),new MessageProperties());
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"ack",message);
        return "发送成功";
    }
}

注意我们这里传的不是一个单独字符串了,而是一个消息 

  RabbitMQ会默认将消息视为持久化的,除非队列被声明为非持久化,或者发送消息时被标记为非持久化 

问题:

 注:如果我们将所有消息都设置为持久化会严重影响RabbitMQ的性能,写入磁盘的速度比写入内存的速度慢很多,所以我们在选择消息持久化的时候,要做一个衡量

如果我们将交换机,队列,消息都设置了持久化后,就能保证百分百不丢失数据了吗?

答案是错误的

1.在持久化消息存入RabbitMQ时,还有一段时间,消息是在缓存上的还没有写入硬盘,因为RabbitMQ并不会为每条消息都同步存盘,因为这样会严重影响性能,所以会有缓存,等待数据一起写入硬盘。如果在这是MQ重启了,消息还没有写入硬盘,那这些消息就会丢失

2.消费者设置自动确认,然后消费者接收消息后宕机,此时消息也会丢失,此时我们只需要设置为手动确认即可

第一个问题的解决方案我们下一篇博客在讲

  

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2278507.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

map和set c++

关联式容器也是⽤来存储数据的,与序列式容器不同的是,关联式容器逻辑结构通常是⾮线性结构,两个位置有紧密的关联关系,交换⼀下,他的存储结构就被破坏了。顺序容器中的元素是按关键字来保存和访问的。关联式容器有map/…

turtle教学课程课堂学习考试在线网站

完整源码项目包获取→点击文章末尾名片!

Digital Document System (DDS)

Digital Document System (DDS) 数字档案平台 信息注入

Springer Nature——Applied Intelligence 投稿指南

投稿系统:Editorial Manager (Manuscript and Peer Review) : 使用Editorial Manager 投稿系统的期刊列表:期刊列表 期刊主页:Spring Nature 主页 投稿主页:Spring Nature Submit SystemSubmission Guidelines: Official Submissi…

如何在前端给视频进行去除绿幕并替换背景?-----Vue3!!

最近在做这个这项目奇店桶装水小程序V1.3.9安装包骑手端V2.0.1小程序前端 最近,我在进行前端开发时,遇到了一个难题“如何给前端的视频进行去除绿幕并替换背景”。这是一个“数字人项目”所需,我一直在冥思苦想。终于有了一个解决方法…

使用python+pytest+requests完成自动化接口测试(包括html报告的生成和日志记录以及层级的封装(包括调用Json文件))

一、API的选择 我们进行接口测试需要API文档和系统,我们选择JSONPlaceholder免费API,因为它是一个非常适合进行接口测试、API 测试和学习的工具。它免费、易于使用、无需认证,能够快速帮助开发者模拟常见的接口操作(增、删、改、…

UE4原生的增量Cook原理

设置Cook的步骤后&#xff0c;断点进入到如下堆栈&#xff1a; UCookOnTheFlyServer::StartCookByTheBook(const UCookOnTheFlyServer::FCookByTheBookStartupOptions &) CookOnTheFlyServer.cpp:7723 UCookCommandlet::CookByTheBook(const TArray<…> &, TArr…

C#表达式和运算符

本文我们将学习C#的两个重要知识点&#xff1a;表达式和运算符。本章内容会理论性稍微强些&#xff0c;我们会尽量多举例进行说明。建议大家边阅读边思考&#xff0c;如果还能边实践就更好了。 1. 表达式 说到表达式&#xff0c;大家可能感觉有些陌生&#xff0c;我们先来举个…

蓝桥杯 Python 组知识点容斥原理

容斥原理 这张图初中或者高中数学课应该画过 也就是通过这个简单的例子引出容斥原理的公式 这张图的面积&#xff1a;s1 s3 s7 - 2 * s2 - 2 * s4 - 2 * s6 3 * s5 通过此引导出容斥原理公式 那么下面来一起看看题目 题目描述 给定 n,m 请求出所有 n 位十进制整数中有多…

PDF文件提取开源工具调研总结

概述 PDF是一种日常工作中广泛使用的跨平台文档格式&#xff0c;常常包含丰富的内容&#xff1a;包括文本、图表、表格、公式、图像。在现代信息处理工作流中发挥了重要的作用&#xff0c;尤其是RAG项目中&#xff0c;通过将非结构化数据转化为结构化和可访问的信息&#xff0…

PDF编辑 PDF-XChange Editor Plus 免装优化版

PDF编辑器很多打工人都需要用到&#xff0c;也分享过好几款口碑不错的&#xff0c;这次这款PDF依旧值得你的期待。 PDF-XChange Editor&#xff0c;号称打开速度最快最强大的PDF编辑器/PDF阅读器&#xff0c;专注于PDF文档的编辑&#xff0c;可以自定义制作PDF电子文档&#xf…

IP属地会随着人的移动而改变吗

在当今数字化时代&#xff0c;互联网已成为人们生活中不可或缺的一部分。无论是社交媒体的日常互动&#xff0c;还是在线购物、远程工作&#xff0c;IP地址作为网络身份的重要标识&#xff0c;扮演着举足轻重的角色。随着移动互联网技术的飞速发展&#xff0c;人们越来越多地在…

我的世界-与门、或门、非门等基本门电路实现

一、红石比较器 (1) 红石比较器结构 红石比较器有前端单火把、后端双火把以及两个侧端 其中后端和侧端是输入信号,前端是输出信号 (2) 红石比较器的两种模式 比较模式 前端火把未点亮时处于比较模式 侧端>后端 → 0 当任一侧端强度大于后端强度时,输出…

【大数据】机器学习------支持向量机(SVM)

支持向量机的基本概念和数学公式&#xff1a; 1. 线性可分的支持向量机 对于线性可分的数据集 &#xff0c;其中(x_i \in R^d) 是特征向量 是类别标签&#xff0c;目标是找到一个超平面 &#xff0c;使得对于所有 的样本 &#xff0c;对于所有(y_i -1) 的样本&#xff0c;…

RDD和DataFrame两种数据结构的对比

文章目录 1. 实战概述2. RDD&#xff08;弹性分布式数据集&#xff09;2.1 RDD概念2.2 RDD特点2.3 实战操作 3. DataFrame&#xff08;数据帧&#xff09;3.1 DataFrame概念3.2 DataFrame优点3.3 实战操作 4. 实战小结 1. 实战概述 今天我们将深入探讨 Apache Spark 中的两种核…

中职网络建设与运维ansible服务

ansible服务 填写hosts指定主机范围和控制节点后创建一个脚本&#xff0c;可以利用简化脚本 1. 在linux1上安装系统自带的ansible-core,作为ansible控制节点,linux2-linux7作为ansible的受控节点 Linux1 Linux1-7 Yum install ansible-core -y Vi /etc/ansible/hosts 添加…

Kibana 控制台中提供语义、向量和混合搜索

作者&#xff1a;来自 Elastic Mark_Laney 想要将常规 Elasticsearch 查询与新的 AI 搜索功能结合起来吗&#xff1f;那么&#xff0c;你不需要连接到某个第三方的大型语言模型&#xff08;LLM&#xff09;吗&#xff1f;不。你可以使用 Elastic 的 ELSER 模型来改进现有搜索&a…

多种 Docker 镜像拉取解决方案与实践

最近国内 Docker 镜像拉取不太通畅&#xff0c;尝试了几种镜像拉取的方式&#xff0c;写篇博客分享一下。 原以为只是 docker hub 被毙了&#xff0c;上机器一操作&#xff0c;官方的下载地址也被毙了&#xff0c;真是从源头解决问题。 不过还好目前还有其他源能用&#xff0…

2025边缘计算新年沙龙成功举办,共话边缘AI未来

1月11日下午&#xff0c;北京市海淀区中关村创业大街热闹非凡&#xff0c;以“云边腾跃&#xff0c;蛇启新航”为主题的 2025边缘计算新年沙龙 盛大举行。本次活动汇聚了边缘计算、人工智能以及云边协同领域的顶尖专家、学者和从业者&#xff0c;共同探讨技术前沿与实际应用场景…

使用redis-cli命令实现redis crud操作

项目场景&#xff1a; 线上环境上redis中的key影响数据展示&#xff0c;需要删除。但环境特殊没办法通过 redis客户端工具直连。只能使用redis-cli命令来实现。 操作步骤&#xff1a; 1、确定redis安装的服务器&#xff1b; 2、找到redis的安装目录下 ##找到redis安装目…