RabbitMQ(六)消息的持久化

news2025/1/10 23:48:51

目录

    • 一、简介
      • 1.1 定义
      • 1.2 消息丢失的场景
    • 二、交换机的持久化
      • 方式一:直接 new
      • 方式二:channel.exchangeDeclare()
      • 方式三:ExchangeBuilder【推荐】
    • 三、队列的持久化
      • 方式一:直接 new
      • 方式二:channel.queueDeclare()
      • 方式三:QueueBuilder【推荐】
    • 四、消息的持久化
      • 方式一:channel.basicPublish()
      • 方式二:rabbitTemplate.convertAndSend()【推荐】
    • 五、持久化问题

在这里插入图片描述

一、简介

1.1 定义

  • 持久化: 是为了提高 RabbitMQ 消息的可靠性,防止在异常情况(重启宕机)下数据的丢失。
  • RabbitMQ 持久化分为三部分:交换机的持久化队列的持久化消息的持久化

1.2 消息丢失的场景

出现消息丢失的场景有:

  • 生产者发送消息丢失:发送消息到 RabbitMQ Server 异常。 可能因为网络问题造成 RabbitMQ 服务端无法收到消息。——解决方案:ConfirmCallback
  • 生产者发送消息丢失:消息无法路由到指定队列。 可能由于代码层面或配置层面错误导致消息路由到指定队列失败。——解决方案:ReturnCallback
  • RabbitMQ Server 存储消息丢失:消息未完全持久化到磁盘。 可能因为 RabbitMQ 宕机导致消息未完全持久化,或队列丢失从而导致消息丢失等持久化问题。——解决方案:集群部署,实现高可用
  • 消费者消费消息丢失:消费者消费消息异常。 可能在消费者接收消息后,还没来得及消费消息,消费者就发生宕机、故障等问题。——解决方案:消费端手动确认消息

二、交换机的持久化

方式一:直接 new

直接实例化对应的 Exchange 实现类即可,默认:持久化的、非自动删除的

@Bean
public DirectExchange directExchange() {
    return new DirectExchange(directExchange);
}

Exchange 源码:

package org.springframework.amqp.core;

public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
    ...
    public AbstractExchange(String name) {
        // 默认是持久化的、非自动删除的。
        this(name, true, false);
    }
    ...
}

方式二:channel.exchangeDeclare()

在 RabbitMQ 的原生 API 中,例如:Java 客户端 API,声明持久化交换机时,需要将 durable 参数设置为 true

// 声明交换机:(交换机名称,交换机类型,持久化)
channel.exchangeDeclare("myExchange", "direct", true);

方式三:ExchangeBuilder【推荐】

在 Spring AMQP 中,我们可以使用 ExchangeBuilder 来创建持久化的交换机:

import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;

Exchange exchange = ExchangeBuilder.directExchange("myExchange")
    							.durable(true) // 交换机持久化
        						.build();

三、队列的持久化

方式一:直接 new

直接实例化 Queue 类即可,默认:持久化的、非独占的、非自动删除的

@Bean
public Queue myQueue() {
    return new Queue("myQueue");
}

Queue 源码:

package org.springframework.amqp.core;

public Queue(String name) {
    // 默认是持久化的、非独占的、非自动删除的。
   this(name, true, false, false);
}

方式二:channel.queueDeclare()

在原生 RabbitMQ API 中,例如 Java 客户端 API,声明持久化队列时,需要将 durable 参数设置为 true

// 声明队列:(队列名称,持久化,非独占,非自动删除,可选队列参数)
channel.queueDeclare("myQueue", true, false, false, null);

方式三:QueueBuilder【推荐】

在 Spring AMQP 中,可以使用 QueueBuilder 来创建持久化的队列:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;

Queue queue = QueueBuilder.durable("myQueue").build();

四、消息的持久化

即使队列时持久化的,发送到队列中的消息默认情况下 可能仍然是非持久化的。要使消息持久化,需要在发送消息时设置消息属性为持久化。

方式一:channel.basicPublish()

在原生 RabbitMQ API 中,例如 Java 客户端 API,声明持久化消息时,需要将 deliveryMode 参数设置为 2

import com.rabbitmq.client.AMQP;

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .deliveryMode(2) // 这里2对应MessageDeliveryMode.PERSISTENT
    .build();

// 参数:(交换机,路由键,消息的其他参数,消息体)
channel.basicPublish("myExchange", "myRoutingKey", props. messageBytes);

或者,我们可以直接使用 MessageProperties.PERSISTENT_TEXT_PLAIN 来进行指定消息的持久化:

import com.rabbitmq.client.MessageProperties;

// 参数:(交换机,路由键,消息的其他参数,消息体)
channel.basicPublish("myExchange", "myRoutingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

方式二:rabbitTemplate.convertAndSend()【推荐】

在 Spring AMQP 中,可以使用 rabbitTemplate.convertAndSend() 来创建持久化的消息:

rabbitTemplate.convertAndSend("myQueue", body, message -> {
    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    return message;
});

通过上述步骤,消息会被持久化存储,只要队列也被正确配置为持久化,即使 RabbitMQ 服务器重启,消息也将被保留下来。

补充:

然而,即使队列和消息都是持久化的,也不能完全保证消息的 100% 不丢失。例如:在消息尚未被刷写到磁盘时,RabbitMQ 服务器突然崩溃,这种情况下的消息仍然可能会丢失。此外,RabbitMQ 不保证消息的立即持久化,而是尽可能快地将消息保存到磁盘


五、持久化问题

思考:将交换机、队列、消息都设置持久化之后就能保证数据不会丢失吗?

答:当然不能,需要从多方面考虑:

  • 场景1: 如果消费者订阅队列的时候将 autoAck(自动确认)设置为 true,虽然消费者接收到了消息,但是没有来得及处理就宕机了,那消息也会丢失。

    解决方案: 将消费者的消息确认机制改为手动确认,带处理完消息之后,手动删除消息。

  • 场景2: 在 RabbitMQ 服务器,如果消息正确被发送,但是 RabbitMQ 服务器中的消息还没来得及持久化,即没有将数据写入磁盘时,如果服务器发生异常,消息也会丢失。

    解决方案: 可以 通过 RabbitMQ 集群的方式实现消息中间件的高可用

因此,还需要做好生产者和消费者的 消息确认机制 以及通过 集群 的方式来实现 RabbitMQ 的高可用。

整理完毕,完结撒花~ 🌻





参考地址:

1.RabbitMQ保证消息可靠性,https://www.cnblogs.com/auguse/articles/17712620.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1375154.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【算法Hot100系列】组合总和

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学习,不断总结,共同进步,活到老学到老导航 檀越剑指大厂系列:全面总结 jav…

回首24考研历程,那些无法忘却的收获

文章目录 毅力与坚持👍知识的力量🧐心态的历练🤓友谊与互助( •̀ ω •́ )y未来的启示(●◡●) 在这个充满希望与挑战的时刻,我想与你们分享一段关于2023这一年特别的经历——考研之路。这段旅程既有磨砺的痛楚,也有…

基础篇_快速入门(Java简介,安装JDK,cmd命令行运行Java文件产生乱码问题的解决方式,IDE工具,实用工具)

文章目录 一. Java 简介1. JVM2. JRE3. JDK 二. 安装 JDK1. 下载和安装2. 配置 Path3. 配置 JAVA_HOME(选讲)优化 三. 入门案例1. 第一行代码1) jshell2) 代码解读总结 3) 为何要分成对象与方法 2. 第一份源码1) 源码结构2) 编写 java 源代码3) 编译 jav…

Python基础知识:整理10 异常相关知识

1 异常的捕获 1.1 基础写法 """基本语法:try:可能发生错误的代码except:如果出现异常,将执行的代码""" try:fr open("D:/abc.txt", "r", encoding"utf-8") except:print("出现异常…

1小时收到几百份简历

最近一直在招人,一些情况我也在星球群里也说了,目前找工作的人确实很多,只要发布职位,几百号人来投简历。 因为我本身带学员,之前我说我在boss上基本上都会回,但后来我让求职者总结自己的优势点时候发现&am…

C语言发展史

前言 当前,C语言是大学广泛应用的计算机教学语言之一,除了文科类专业,大部分理工科专业都会教授C语言,但是,C语言是谁搞出来的?是怎么编译的?相信很多同学对此并不清楚,今天,我们就…

【.NET Core】可为null类型详解

【.NET Core】可为null类型详解 文章目录 【.NET Core】可为null类型详解一、概述二、可为空的值类型2.1 声明和赋值2.2 检查可为空值类型2.3 基础类型与可为空的值类型互换2.4 可为空的值类型装箱和取消装箱2.5 如何确定可为空的值类型 三、可为 null 的引用类型 一、概述 nu…

IDC机房服务器搬迁之运行了几年的服务器没关过机,今天关机下架,再上架突然起不来了,怎么快速处理?

环境 戴尔R420 服务器 1U 2台直连存储 4U CentOS 7 问题描述 IDC机房服务器搬迁之运行了几年的服务器没关过机,今天关机下架,再上架突然起不来了,怎么快速处理? 服务器上电开机就出现进入紧急模式 Welcome to emergency mode! After logging in, type “journalctl …

Windows VSCode 使用Python

一、vscode中安装python 二、下载python.exe(即vscode中需要的python解释器) 下载地址:https://www.python.org/downloads/ 三、安装第三方代码规范工具 参考网址:https://www.python.org/downloads/ 工具介绍 flake8 &#xf…

starrocks权限管理-2.3.2版本

1.新用户创建以及授权 1.创建用户(未分配角色) -- 使用明文密码创建用户,允许其从 172.25.20.1 登陆。如果172.25.20.1被%替换就是所有ip都可以访问 CREATE USER bigdata172.25.20.1 IDENTIFIED WITH mysql_native_password BY Zhengda1; 不…

Redis Zset类型

Redis Zset类型 Zset(有序集合)它是集合的一种,不仅可以保存元素,还可以为每个元素关联一个 double 类型的分数(score),Redis 正是通过分数来为集合中的元素进行从小到大的排序。在 Zset 中&am…

力扣LCR 166. 珠宝的最高价值(java 动态规划)

Problem: LCR 166. 珠宝的最高价值 文章目录 解题思路思路解题方法复杂度Code 解题思路 思路 改题目与本站64题实质上是一样的,该题目在64题的基础上将求取最小路径和改成了求取最大路径和。具体实现思路如下: 1.定义一个int类型的二维数组dp大小为给定…

跟着仙凡兄学习编译Telegram vs2022 2024.1.11编译成功

编译Telegram 本人花了两天,问官方作者终于编译成功Telegram 运行环境:win11 vs2022 参见学习视频:【telegram编译成功,编译遇到的各种问题】https://www.bilibili.com/video/BV11c411x7jm?vd_sourcedf2e51268cc7412cc3937cf3df2…

代码随想录第五十五天——判断子序列,不同的子序列

leetcode 392. 判断子序列 题目链接:判断子序列 确定dp数组及下标的含义 dp[i][j]:以下标i-1为结尾的字符串s,和以下标j-1为结尾的字符串t,相同子序列长度为dp[i][j]确定递推公式 分为两种情况:s[i - 1] 与t[j - 1]相…

CMake入门教程【高级篇】qmake转cmake

😈「CSDN主页」:传送门 😈「Bilibil首页」:传送门 😈「动动你的小手」:点赞👍收藏⭐️评论📝 文章目录 1. 概述2.qmake与cmake的差异3. qmake示例4.qmake转cmake示例5.MOC、UIC和RCC…

k8s的存储卷之静态

存储卷----数据卷 容器内的目录和宿主机的目录进行挂载 容器在系统上的生命周期是短暂的,delete,k8s用控制创建的pod,delete相当于重启,容器的状态也会回复到初始状态 一旦回到初始状态,所有的后天编辑的文件都会消…

【Linux】Linux 系统编程——tree 命令

文章目录 1. 命令概述2. 命令格式3. 常用选项4. 相关描述4.1 tree 命令安装 5. 参考示例5.1 创建树形目录5.2 使用 tree 命令查看树形目录 1. 命令概述 tree 命令用于在命令行界面以树状图形式显示目录及其子目录的内容。这个命令递归地列出所有子目录,并可选择显示…

平时执行很快的SQL语句,为什么会突然卡一下?

InnoDB在处理更新语句的时候,只做了写日志这一个磁盘操作,这个日志叫作redo log(重做日志),在更新内存写完redo log后,就返回给客户端,本次更新成功。 把内存里的数据写入磁盘的过程&#xff0…

SpringBoot中使用SpringEvent业务解耦神器实现监听发布事件同步异步执行任务

场景 SpringBoot中使用单例模式ScheduledExecutorService实现异步多线程任务(若依源码学习): SpringBoot中使用单例模式ScheduledExecutorService实现异步多线程任务(若依源码学习)-CSDN博客 设计模式-观察者模式在Java中的使用示例-环境监测系统: 设…

基于多反应堆的高并发服务器【C/C++/Reactor】(中)HttpResponse的定义和初始化 以及组织 HttpResponse 响应消息

一、HttpResponse的定义 1.定义状态码枚举 // 定义状态码枚举 enum HttpStatusCode {Unknown 0,OK 200,MovedPermanently 301,MovedTemporarily 302,BadRequest 400,NotFound 404 }; 2.HTTP 响应报文格式 这个数据块主要是分为四部分 第一部分是状态行第二部分是响应…