【RabbitMQ笔记08】消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)

news2024/9/26 3:22:28

这篇文章,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。

目录

一、防止消息丢失

1.1、消息确认机制(生产者)

(1)生产者丢失消息

(2)生产者消息确认机制

1.2、消息确认机制(消费者)

(1)消费者丢失消息

(2)消费者消息确认机制

1.3、消息持久化(RabbitMQ)

(1)RabbitMQ丢失消息

(2)消息持久化机制


一、防止消息丢失

RabbitMQ消息队列,在使用的时候,可能会存在消息丢失的情况,所谓的消息丢失就是生产者发送的消息没办法被消费者正确的消费,消息队列中导致消息丢失的地方有三个,分别是:

  • 第一种情况:生产者发送的消息没有正确的发送到RabbitMQ里面,导致发送的消息丢失。
  • 第二种情况:消费者从RabbitMQ消费消息时候,消费失败,但是RabbitMQ认为消费成功,从而删除了消息。
  • 第三种情况:RabbitMQ中保存的消息还没有被消费者消费,此时RabbitMQ服务宕机,导致内存中的消息丢失。

1.1、消息确认机制(生产者)

(1)生产者丢失消息

生产者丢失消息,是指:当生产者发送消息给RabbitMQ的时候,此时消息发送失败了,并且生产者又没有重新发送这一条消息,所以这个时候,生产者这一条失败的消息就丢失了。

既然是生产者发送消息失败导致这一条消息丢失的,那么我们在处理这个丢失消息问题的时候,就可以这样做:当生产者消息发送失败之后,可以让生产者再次发送这一条消息,这里就有一个问题啦,那就是生产者怎么知道消息有没有发送成功???

RabbitMQ给我们提供了一个机制,即:发布确认机制,大致思想是:当生产者将消息发送到RabbitMQ之后,并且RabbitMQ正确接收到消息并将其放入Queue队列里面时,RabbitMQ会返回一个ACK标识给生产者,生产者接收到ACK标识就可以认为消息发送成功啦;如果消息接收失败,RabbitMQ会返回一个NACK标识,表示接收失败。

(2)生产者消息确认机制

生产者消息确认机制,上一篇文章已经介绍了(【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式),这里就不再重复。

1.2、消息确认机制(消费者)

(1)消费者丢失消息

如果生产者已经将消息正确的发送到RabbitMQ里面了,消费者从Queue队列里面获取消息消费时候,如果消费失败,那么此时就会导致这一条消息丢失,这是因为,默认情况下,RabbitMQ将消息分发给消费者之后,消费者接收到消息时候,就会返回一个ACK标识给消息队列RabbitMQ,此时RabbitMQ就会将这一条消息从Queue队列里面删除,但是这种情况下,消费者是否正确将这条消息消费了,RabbitMQ是不知道的,所以这就有可能导致丢失。

如何解决消费者丢失消息???

  • 既然丢失消息是因为消费者消费失败,并且RabbitMQ把消息删除了,那么我们就可以开启手动确认的方式来告诉RabbitMQ,消费者是否正确的消费消息,是否可以将消息从Queue队列里面删除了。

(2)消费者消息确认机制

  • 消费者进行消息确认,需要关闭自动确认,将【basicConsume()】方法的第二个参数设置为【false】。
  • 消息成功消费之后,主动调用【basicAck()】方法,返回ACK标识给RabbitMQ。
package com.rabbitmq.demo.dropmsg;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @version 1.0.0
 * @Date: 2023/2/25 16:30
 * @Copyright (C) ZhuYouBin
 * @Description: 消息消费者
 */
public class Consumer {
    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接的 RabbitMQ 服务地址
        factory.setHost("127.0.0.1"); // 默认就是本机
        factory.setPort(5672); // 默认就是 5672 端口
        // 3、获取连接
        Connection connection = null; // 连接
        Channel channel = null; // 通道
        try {
            connection = factory.newConnection();
            // 4、获取通道
            channel = connection.createChannel();
            // 5、声明 Exchange,如果不存在,则会创建
            String exchangeName = "exchange_dropmsg_2023";
            channel.exchangeDeclare(exchangeName, "direct");
            // 6、指定需要操作的消息队列,如果队列不存在,则会创建
            String queueName = "queue_dropmsg_2023";
            channel.queueDeclare(queueName, false, false, false, null);
            // 7、绑定 Exchange 和 Queue, 接收 routingKey = "info" 的消息
            channel.queueBind(queueName, exchangeName, "key_2023");
            // 8、消费消息
            Channel finalChannel = channel;
            DeliverCallback callback = new DeliverCallback() {
                public void handle(String s, Delivery delivery) throws IOException {
                    // 接收消息
                    System.out.println("这是接收的消息:" + new String(delivery.getBody()));
                    // TODO 消费者正确消费消息之后,主动返回 ACK 标识
                    finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
                }
            };
            // TODO 这第二个参数修改为 false,表示消费者需要手动发送 ACK 标识给 RabbitMQ(默认是true)
            channel.basicConsume(queueName, false, callback, i->{});
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1.3、消息持久化(RabbitMQ)

(1)RabbitMQ丢失消息

上面介绍了两种丢失消息的情况,分别是生产者和消费者丢失消息,还有一种丢失消息的情况,那就是RabbitMQ消息队列将消息丢失了。假设,现在存在这一种情况,生产者已经正确将消息发送到RabbitMQ里面,正准备将消息发送给消费者的时候,此时RabbitMQ服务宕机了,导致RabbitMQ中的消息丢失了(默认情况下,RabbitMQ是将消息保存在内存中的),由于内存中的数据断电即失,所以这就导致消息丢失情况。

如何解决RabbitMQ出现的消息丢失问题呢???

  • 既然RabbitMQ是将消息保存在内存中的,那么为了避免消息丢失,可以将内存中的消息保存到磁盘文件里面,这样即使RabbitMQ宕机了,重新启动的时候也可以从磁盘文件里面读取消息到内存里面。

(2)消息持久化机制

  • 在调用【queueDeclare()】方法,创建Queue队列的时候,设置第二个参数等于【true】,表示消息允许持久化。
  • 生产者调用【basicPublish()】方法发送消息的时候,设置消息属性等于【MessageProperties.PERSISTENT_TEXT_PLAIN】,表示文本持久化。
// 第二个参数设置为true,表示开启持久化消息
channel.queueDeclare("Queue队列名称", true, false, false, null);

// 生产者发送消息时候,设置消息属性是文本持久化
channel.basicPublish("Exchange交换机名称", "Queue队列名称", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

到此,RabbitMQ消息队列防止消息丢失的三种方式介绍完啦。

综上,这篇文章结束了,主要介绍消息队列RabbitMQ之防止消息丢失的三种方式(生产者消息确认、消费者消息确认、消息持久化)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/376252.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

字节跳动软件测试岗4轮面经(已拿34K+ offer)...

没有绝对的天才,只有持续不断的付出。对于我们每一个平凡人来说,改变命运只能依靠努力幸运,但如果你不够幸运,那就只能拉高努力的占比。 2021年10月,我有幸成为了字节跳动的一名测试工程师,从外包辞职了历…

一文读懂自动驾驶运行设计域ODD

/ 导读 /在自动驾驶技术发展如此迅速的今天,很多量产车上已经配备了多种的辅助驾驶功能,例如自适应巡航ACC、紧急制动AEB、车道居中保持LKA等等,很多的车主也都体验到了技术带给驾驶的改变。另一方面,由于现在的自动驾驶技术还处于…

量化选股——基于动量因子的行业风格轮动策略(第2部分—策略回测)

文章目录1. 交易策略2. Backtrader回测程序3. 回测效果3.1 2020年1月1日 - 2021年1月1日3.2 2021年1月1日 — 2022年1月1日3.3 2022年1月1日 — 2023年1月1日动量因子的概述与测算,阿隆指标测算请参考:https://blog.csdn.net/weixin_35757704/article/de…

react源码解析1.开篇介绍和面试题

怎样学习react源码 作为前端最常用的js库之一,熟悉react源码成了高级或资深前端工程师必备的能力,如果你不想停留在api的使用层面或者想在前端技能的深度上有所突破,那熟悉react源码将是你进步的很好的方式。 react的纯粹体现在它的api上&a…

【神经网络】LSTM

1.什么是LSTM 长短期记忆(Long short-term memory, LSTM)是一种特殊的RNN,主要是为了解决长序列训练过程中的梯度消失和梯度爆炸问题。简单来说,相比普通的RNN,LSTM能够在更长的序列中有更好的表现。 LSTM区别于RNN地方…

Java查漏补缺(09)异常概述、Java异常体系、常见的错误和异常、异常的处理、手动抛出异常对象:throw、自定义异常

Java查漏补缺(09)异常概述、Java异常体系、常见的错误和异常、异常的处理、手动抛出异常对象:throw、自定义异常本章专题与脉络1. 异常概述1.1 什么是生活的异常1.2 什么是程序的异常1.3 异常的抛出机制1.4 如何对待异常2. Java异常体系2.1 T…

【JAVA】xxl-job服务搭建

xxl-job服务搭建 1.下载xxl-job项目 https://github.com/xuxueli/xxl-job 2.数据库表创建 3.修改配置 注意:这是两个项目,一个是xxl-job前台,一个是xxl-job执行器,找到这两个项目得配置文件,修改配置。 配置文件地址…

day54【代码随想录】二刷数组

文章目录前言一、二分查找(力扣724)二、移除元素(力扣27)【双指针】三、有序数组的平方(力扣977)【双指针】四、合并两个有序数组(力扣88)五、长度最小的子数组(力扣209&…

前端学习第二阶段-第3章 Flex 伸缩布局

3-1 移动端基础知识 01-移动端基础 02-视口 03-meta视口标签 04-物理像素与物理像素比 05-二倍图 06-背景缩放background-size 07-背景二倍图以及多倍图切图 08-移动端开发选择 09-移动端技术解决方案 10-移动端特殊样式 11-移动端技术选型 12-流式布局 3-2 移动端电商首页制作…

Python基础—while循环

(1)while循环&#xff1a; 语法格式&#xff1a; while 条件&#xff1a;   执行语句1……   执行语句2…… 适用条件&#xff1a;无限循环 死循环 while True:print(条件是真的&#xff01;)代码实例&#xff1a; i 0 # 创建一个计数的变量 while i < 5: # Truepr…

感知趋势,洞察发展:2023(第十届)趋势与预测大会成功举办

2023年2月23日&#xff0c;运联年会&#xff1a;2023&#xff08;第十届&#xff09;趋势与预测大会在深圳机场凯悦酒店成功闭幕。自2014年开始&#xff0c;“运联年会&#xff1a;趋势与预测”已经连续举办九届。这场大会&#xff0c;既是一次行业性的“年终总结”&#xff0c…

【Java开发】JUC基础 01:进程、线程、多线程

1 进程与线程1.1 进程开发写的代码称为程序&#xff0c;那么我们将程序运行起来&#xff0c;我们称之为进程&#xff1b;进程就是申请一块内存空间&#xff0c;将数据放到内存空间中去&#xff0c;是系统进行资源分配和调度的基本单位。&#x1f4cc; 程序与进程的区别程序是数…

QML Item

在QML中所有的可视项目都继承自Item&#xff0c;虽然Item本身没有可视化的外观&#xff0c;但它定义了可视化项目的所有属性。 Item可以作为容器使用&#xff1a; Item{Rectangle{id:retc}Rectangle{id:retc1}Rectangle{id:retc2}Rectangle{id:retc3}} item拥有children属性…

MyBatis学习笔记(七) —— 特殊SQL的执行

7、特殊SQL的执行 7.1、模糊查询 模糊查询的三种方式&#xff1a; 方式1&#xff1a;select * from t_user where username like ‘%${mohu}%’ 方式2&#xff1a;select * from t_user where username like concat(‘%’,#{mohu},‘%’) 方式3&#xff1a;select * from t_u…

DolphinScheduler跨版本升级1.3.8至3.0.1

DolphinScheduler跨版本升级1.3.8至3.0.1Refer背景基础环境依赖版本升级修改pom.xml问题解决MYSQL升级1.文件替换2.修改表结构t_ds_process_definitiont_ds_alertt_ds_process_instance3.时间参数修改4.数据库升级DOLPHIN安装zookeeper集群创建用户dolphinscheduler_env.shinst…

指针变量作为函数参数详解,形参和实参之间的值传递如何传递?如何改变指针变量所指向的变量?

函数的参数不仅可以是整型&#xff0c;浮点型&#xff0c;字符型等数据&#xff0c;还可以是指针类型&#xff1b;它的作用是将一个变量的地址传送到另一个函数中。 关于地址&#xff0c;指针&#xff0c;指针变量可以参考我的上一篇文章&#xff1a; 地址&#xff0c;指针&…

线程的基本方法

线程等待&#xff1a;wait方法 调用wait方法的线程会进入WAITING状态&#xff0c;只有等到其他线程的通知或程序被中断才会返回。调用wait方法后会释放对象的锁&#xff0c;因此 wait方法一般被用于同步方法或同步代码块中 。 线程睡眠&#xff1a;sleep方法 调用sleep方法会导…

Spring Boot 版本升级2.2.11.RELEASE至2.7.4

2.2.11.RELEASE > 2.7.4项目更新spring-boot-starter-parent 主依赖&#xff0c;导致项目跑不起了日志也没有输出有用信息&#xff0c;自己查看源码调试启动入口打断点&#xff0c;一步步进入方法定位项目停止代码我的项目执行到SpringApplication.class 的152行代码会停止项…

华为HCIE学习之Openstack Glance组件(glance对接swift)

文章目录一、Glance的结构二、服务部署流程三、将glance存储在swift中1、默认使用swift来存储2、指定可以存在swift中3、swift版本4、keystone的endpoint地址&#xff08;当glance去找swift时通过keystone去找&#xff09;5、租户名:用户名&#xff0c;用户必须拥有admin角色6、…

【C语言】自定义类型:结构体、枚举、联合

目录 1.结构体 1.1结构体类型 1.2结构体的自引用 1.3结构体的初始化 1.4结构体内存对齐 //对齐 //offsetof //修改默认对齐数 1.5结构体传参 2.位段 2.1位段的内存开辟 2.2位段的内存分配 3.枚举 4.联合&#xff08;共用体&#xff09; //判断大小端 1.结构体…