RabbitMQ惰性队列的工作原理、消息持久化机制、同步刷盘的概念、延迟插件的使用方法

news2025/4/16 6:31:57

惰性队列工作原理

惰性队列通过尽可能多地将消息存储到磁盘上来减少内存的使用。与传统队列相比,惰性队列不会主动将消息加载到内存中,而是尽量让消息停留在磁盘上,从而降低内存占用。尽管如此,它并不保证所有操作都是同步写入磁盘的。这意味着消息可能会先被缓存到操作系统的缓冲区中,然后由操作系统决定何时将其真正写入磁盘。

  • 优点:适合处理大量消息且对内存压力敏感的场景。
  • 缺点:由于频繁的磁盘I/O操作,性能可能不如传统队列。

同步刷盘的概念

同步刷盘意味着每次写入操作都会等待数据完全写入磁盘后才返回确认信息。虽然这种方式提供了更强的数据持久性保证,但它也显著增加了写入操作的延迟。对于RabbitMQ而言,可以通过设置消息为持久化来增加数据的安全性,但对于极端情况下的数据安全性要求,还需要结合其他策略如调整操作系统参数或使用文件系统级别的同步写入配置。

延迟插件的工作原理

RabbitMQ本身没有内置的延迟队列功能,但可以通过安装rabbitmq_delayed_message_exchange插件实现这一功能。该插件允许创建一个自定义交换机类型,该交换机能够根据消息头中的延迟时间属性来延迟消息的传递。

在Spring Boot中集成RabbitMQ惰性队列和延迟消息

1. 项目初始化

首先,确保你的Spring Boot项目中包含必要的依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>
2. 配置RabbitMQ连接

application.yml中配置RabbitMQ连接信息:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
3. 定义惰性队列

创建一个配置类来定义惰性队列:

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMqConfig {

    /**
     * 定义惰性模式的队列
     * @return 返回惰性队列实例
     */
    @Bean
    public Queue lazyQueue() {
        Map<String, Object> args = new HashMap<>();
        // 设置队列为惰性模式
        args.put("x-queue-mode", "lazy");
        return new Queue("my_lazy_queue", true, false, false, args); // durable=true for queue durability
    }
}
4. 发送持久化消息

创建一个服务类用于发送消息,并确保消息是持久化的:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送一条持久化消息到惰性队列
     * @param message 要发送的消息内容
     */
    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend("my_lazy_queue", message);
        System.out.println(" [x] Sent '" + message + "'");
    }
}

确保消息持久化可以在application.yml中设置如下:

spring:
  rabbitmq:
    template:
      exchange: ''
      routing-key: 'my_lazy_queue'
      mandatory: true
    publisher-confirms: true
    publisher-returns: true
5. 接收消息

创建一个监听器来接收消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageReceiver {

    /**
     * 监听并接收来自惰性队列的消息
     * @param message 接收到的消息内容
     */
    @RabbitListener(queues = "my_lazy_queue")
    public void receiveMessage(String message) {
        System.out.println(" [x] Received '" + message + "'");
    }
}
6. 使用延迟插件发送延迟消息

首先,在RabbitMqConfig中定义延迟交换机:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMqConfig {

    /**
     * 定义延迟交换机
     * @return 返回延迟交换机实例
     */
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);
    }

    /**
     * 绑定延迟队列到延迟交换机
     * @param delayedQueue 延迟队列
     * @param delayExchange 延迟交换机
     * @return 返回绑定实例
     */
    @Bean
    public Binding binding(Queue delayedQueue, CustomExchange delayExchange) {
        return new Binding("delayed_queue", Binding.DestinationType.QUEUE, "delayed_exchange", "routing.key", Collections.emptyMap());
    }

    /**
     * 定义延迟队列
     * @return 返回延迟队列实例
     */
    @Bean
    public Queue delayedQueue() {
        return new Queue("delayed_queue");
    }
}

然后,创建一个服务类来发送延迟消息:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class DelayedMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送带有延迟的消息
     * @param message 要发送的消息内容
     * @param delayTime 延迟时间(毫秒)
     */
    public void sendDelayedMessage(String message, int delayTime) {
        MessagePostProcessor messagePostProcessor = message -> {
            message.getMessageProperties().setHeader("x-delay", delayTime);
            return message;
        };

        rabbitTemplate.convertAndSend("delayed_exchange", "routing.key", message, messagePostProcessor);
        System.out.println(" [x] Sent '" + message + "' with delay.");
    }
}

最后,创建一个监听器来接收延迟消息:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DelayedMessageReceiver {

    /**
     * 监听并接收来自延迟队列的消息
     * @param message 接收到的消息内容
     */
    @RabbitListener(queues = "delayed_queue")
    public void receiveDelayedMessage(String message) {
        System.out.println(" [x] Received delayed message '" + message + "'");
    }
}

高级特性和最佳实践

  • 发布确认机制:为了提高可靠性,可以开启发布确认机制,以确保消息确实被RabbitMQ服务器接受。

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
        System.out.println("Message acknowledged");
    } else {
        System.err.println("Message not acknowledged due to: " + cause);
    }
});
  • 预取计数(Prefetch Count):通过设置预取计数限制每个消费者同时处理的消息数量,有助于防止消费者被过多未处理的消息压垮。

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConnectionConfig {

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setChannelCacheSize(25);
        connectionFactory.getRabbitConnectionFactory().setRequestedChannelMax(200);
        return connectionFactory;
    }
}

可以在application.yml中设置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 10

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2334925.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

pyqtgraph.opengl.items.GLSurfacePlotItem.GLSurfacePlotItem 报了一个错

1. 需求是这个样子的 有一个 pyqtgraph.opengl.GLViewWidget &#xff0c;在应用启动时存在QMainWindow中&#xff0c;即父对象是QMainWindow&#xff0c;当业务需要时&#xff0c;修改它的父对象变为一个QDialog&#xff0c;可以让它从QMainWindow中弹出显示在QDialog里&#…

【C++初学】课后作业汇总复习(六) 函数模板

1、函数模板 思考&#xff1a;如果重载的函数&#xff0c;其解决问题的逻辑是一致的、函数体语句相同&#xff0c;只是处理的数据类型不同&#xff0c;那么写多个相同的函数体&#xff0c;是重复劳动&#xff0c;而且还可能因为代码的冗余造成不一致性。 解决&#xff1a;使用…

【第16届蓝桥杯C++C组】--- 数位倍数

Hello呀&#xff0c;小伙伴们&#xff0c;第16届蓝桥杯也完美结束了&#xff0c;无论大家考的如何&#xff0c;都要放平心态&#xff0c;今年我刚上大一&#xff0c;也第一次参加蓝桥杯&#xff0c;刷的算法题也只有200来道&#xff0c;但是还是考的不咋滴&#xff0c;但是拿不…

Numpy和OpenCV库匹配查询,安装OpenCV ABI错误

文章目录 地址opencv-python&#xff1a;4.x版本的对应numpyopencv-python&#xff1a;5.x版本的对应numpy方法2 ps&#xff1a;装个opencv遇到ABI错误无语了&#xff0c;翻了官网&#xff0c;github文档啥都没&#xff0c;记录下 地址 opencv-python&#xff1a;4.x版本的对应…

ubuntu18.04安装miniforge3

1.下载安装文件 略&#xff08;注&#xff1a;从同事哪里拖来的安装包&#xff09; 2.修改安装文件权限 chmod x Miniforge3-Linux-x86_64.sh 3.将它安装到指定位置 micromamba activate /home/xxx/fxp/fromDukto/miniforge3 4.激活 /home/xxx/fxp/fromDukto/miniforge3…

智能手机功耗测试

随着智能手机发展,用户体验对手机的续航功耗要求越来越高。需要对手机进行功耗测试及分解优化,将手机的性能与功耗平衡。低功耗技术推动了手机的用户体验。手机功耗测试可以采用powermonitor或者NI仪表在功耗版上进行测试与优化。作为一个多功能的智能终端,手机的功耗组成极…

使用U盘安装 ubuntu 系统

1. 准备U 盘制作镜像 1.1 下载 ubuntu iso https://ubuntu.com/download/ 这里有多个版本以供下载&#xff0c;本文选择桌面版。 1.2 下载rufus https://rufus.ie/downloads/ 1.3 以管理员身份运行 rufus 设备选择你用来制作启动项的U盘&#xff0c;不能选错了&#xff1b;点…

oracle 并行度(Parallel Degree)

在Oracle数据库中&#xff0c;并行度&#xff08;Parallel Degree&#xff09; 是用于控制并行处理任务的关键配置&#xff0c;旨在通过多进程协作加速大规模数据处 一、并行度的核心概念 并行度&#xff08;DOP, Degree of Parallelism&#xff09; 表示一个操作同时使用的并…

Redis-场景缓存+秒杀+管道+消息队列

缓存一致性 1.两次更新 先更新数据库&#xff0c;再更新缓存&#xff1b;先更新缓存&#xff0c;再更新数据库&#xff1b; 出现不一致问题场景&#xff1a; 先更新数据库&#xff0c;再更新缓存&#xff1b; 先更新缓存&#xff0c;再更新数据库&#xff1b; 两次更新的适…

系统的安全及应用

仓库做了哪些优化 仓库源换成国内源不使用root用户登录将不必要的开机启动项关闭内核的调优 系统做了哪些安全加固 禁止使用root禁止使用弱命令将常见的 远程连接端口换掉 系统安全及应用 Cpu负载高 java程序 运行异常中病毒&#xff1f; ps aux - - sort %cpu %mem Cpu …

PostgreSQL内幕探索—基础知识

PostgreSQL内幕探索—基础知识 PostgreSQL&#xff08;以下简称PG&#xff09; 起源于 1986 年加州大学伯克利分校的 ‌POSTGRES 项目‌&#xff0c;最初以对象关系模型为核心&#xff0c;支持高级数据类型和复杂查询功能‌。 1996 年更名为 PostgreSQL 并开源&#xff0c;逐…

WPS复制粘贴错误 ,文件未找到 mathpage.wll

文章目录 1.错误提示图片2.解决方案1.找到MathType.wll文件和MathType Commands 2016.dotm文件并复制2.找到wps安装地址并拷贝上述两个文件到指定目录 3.重启WPS 1.错误提示图片 2.解决方案 1.找到MathType.wll文件和MathType Commands 2016.dotm文件并复制 MathType.wll地址如…

驱动开发硬核特训 · Day 6 : 深入解析设备模型的数据流与匹配机制 —— 以 i.MX8M 与树莓派为例的实战对比

&#x1f50d; B站相应的视屏教程&#xff1a; &#x1f4cc; 内核&#xff1a;博文视频 - 从静态绑定驱动模型到现代设备模型 主题&#xff1a;深入解析设备模型的数据流与匹配机制 —— 以 i.MX8M 与树莓派为例的实战对比 在上一节中&#xff0c;我们从驱动框架的历史演进出…

【UE5 C++课程系列笔记】35——HTTP基础——HTTP客户端异步请求API接口并解析响应的JSON

目录 前言 步骤 一、 搭建异步蓝图节点框架 二、异步蓝图节点嵌入到引擎的执行流程 三、获取本地时间并异步返回 四、获取网络时间并异步返回 五、源码 前言 本文以请求网络/本地时间API为例&#xff0c;介绍如何实现HTTP异步请求。 步骤 一、 搭建异步蓝图节点框架 …

手机静态ip地址怎么获取?方法与解析‌

而在某些特定情境下&#xff0c;我们可能需要为手机设置一个静态IP地址。本文将详细介绍手机静态IP地址详解及获取方法 一、什么是静态IP地址&#xff1f; 静态IP&#xff1a;由用户手动设置的固定IP地址&#xff0c;不会因网络重启或设备重连而改变。 动态IP&#xff1a;由路…

Python 基础语法汇总

Python 语法 │ ├── 基本结构 │ ├── 语句&#xff08;Statements&#xff09; │ │ ├── 表达式语句&#xff08;如赋值、算术运算&#xff09; │ │ ├── 控制流语句&#xff08;if, for, while&#xff09; │ │ ├── 定义语句&#xff08;def…

Linux上位机开发实践(OpenCV算法硬件加速)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 图像处理里面&#xff0c;opencv基本是一个标准模块。但是由于图像处理的特点&#xff0c;如果所有的算法都是cpu来做的话&#xff0c;效率会很低。…

Spring Boot MongoDB自定义连接池配置

手打不易&#xff0c;如果转摘&#xff0c;请注明出处&#xff01; 注明原文&#xff1a;http://zhangxiaofan.blog.csdn.net/article/details/144341407 一、引言 在 Spring Boot 应用中使用 MongoDB 时&#xff0c;合理配置连接池可以显著提升数据库访问的性能和稳定性。默…

游戏引擎学习第223天

回顾 今天我们正在进行过场动画序列的制作&#xff0c;因此我想深入探讨这个部分。昨天&#xff0c;我们暂时停止了过场动画的制作&#xff0c;距离最终结局还有一些内容没有完成。今天的目标是继续完成这些内容。 我们已经制作了一个过场动画的系列&#xff0c;并把它们集中…

DeepSeek 助力 Vue3 开发:打造丝滑的日历(Calendar),日历_基础功能示例(CalendarView01_01)

前言:哈喽,大家好,今天给大家分享一篇文章!并提供具体代码帮助大家深入理解,彻底掌握!创作不易,如果能帮助到大家或者给大家一些灵感和启发,欢迎收藏+关注哦 💕 目录 DeepSeek 助力 Vue3 开发:打造丝滑的日历(Calendar),日历_基础功能示例(CalendarView01_01)📚…