Rabbitmq--延迟消息

news2025/4/22 16:00:29

13.延迟消息

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息

延迟任务:一定时间之后才会执行的任务

image-20250310204548904

1.死信交换机

当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):

  • 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false

  • 过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费

  • 要投递的队列消息堆积满了,最早的消息可能成为死信

如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

image-20250310204712321

利用死信交换机的特点,可以实现发送延迟消息的功能

2.延迟消息插件(推荐使用)

1.下载并安装延迟插件

RabbitMQ 的官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后,可以将消息暂存一段时间,时间到了之后再将消息投递到队列中

插件的下载地址:rabbitmq-delayed-message-exchange

image-20250310204809121

下载完插件后,运行以下指令,在输出信息中找到 Mounts ,再找到 RabbitMQ 的插件的安装目

sudo docker inspect rabbitmq

image-20250310204857664

然后进入 RabbitMQ 的插件的安装目录,将刚才下载的插件上传到该目录下

sudo chmod +rx -R /var/lib/docker

//接着打开/var/lib/docker/volumes/rabbitmq-plugins/_data目录的写权限(如果修改权限不生效,请切换到 root 用户执行指令)
sudo chmod 777 /var/lib/docker/volumes/rabbitmq-plugins/_data

//将刚才下载的插件上传到/var/lib/docker/volumes/rabbitmq-plugins/_data目录
//上传成功后将/var/lib/docker/volumes/rabbitmq-plugins/_data目录的权限复原
sudo chmod 755 /var/lib/docker/volumes/rabbitmq-plugins/_data

//最后进入容器内部,运行指令安装插件,安装完成后退出容器内部
sudo docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit

看到以下信息,说明插件安装成功了

image-20250310205159598

2.在 Java 代码中发送延迟消息

声明延迟交换机

@Bean
public DirectExchange delayExchange() {
    return ExchangeBuilder.directExchange("delay.direct").delayed().build();
}

声明队列和延迟交换机,并将队列和延迟交换机绑定在一起

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue"),
        exchange = @Exchange(name = "delay.direct", delayed = "true", type = ExchangeTypes.DIRECT),
        key = "delay"
))
public void listenDelayQueue(String message) {
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
    simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");
    System.out.println("消费者收到了 delay.queue的消息: " + message + ",时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

编写测试方法,测试发送延迟消息

@Test
void testSendDelayMessage() {
    rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setDelay(10000); // 毫秒
            return message;
        }
    });

    SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
    simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");
    System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

发送延迟消息的本质是在消息头属性中添加 x-delay 属性

image-20250310205359715

3. 延迟消息的原理和缺点
  • RabbitMQ 的延迟消息是怎么实现的呢?RabbitMQ 会自动维护一个时钟,这个时钟每隔一秒就跳动一次,如果对时钟的精度要求比较高的,可能还要精确到毫秒,甚至纳秒

  • RabbitMQ 会为发送到交换机的每一条延迟消息创建一个时钟,时钟运行的过程中需要 CPU 不断地进行计算。发送到交换机的延迟消息数越多,RabbitMQ 需要维护的时钟就越多,对 CPU 的占用率就越高(Spring 提供的定时任务的原理也是类似)

  • 定时任务属于 CPU 密集型任务,中间涉及到的计算过程对 CPU 来说压力是很大的,所以说,采用延迟消息会给服务器的 CPU 带来更大的压力。当交换机中有非常多的延迟消息时,对 CPU 的压力就会特别大

4.取消超时订单

设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题

  1. 如果并发较高,30分钟可能堆积消息过多,对 MQ 压力很大
  2. 大多数订单在下单后 1 分钟内就会支付,但消息需要在 MQ 中等待30分钟,浪费资源

image-20250310205533364

image-20250310205550036

5.发送延迟检测订单的消息

我们定义一个实体类,用于记录延迟消息的内容和延迟消息的延迟时间列表(该实体类也是延迟消息的类型)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MultipleDelayMessage<T> {

    private T data;

    private List<Long> delayMillis;

    public MultipleDelayMessage() {

    }

    public MultipleDelayMessage(T data, Long... delayMillis) {
        this.data = data;
        this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis));
    }

    public MultipleDelayMessage(T data, List<Long> delayMillis) {
        this.data = data;
        this.delayMillis = delayMillis;
    }

    public static <T> MultipleDelayMessage<T> of(T data, Long... delayMillis) {
        return new MultipleDelayMessage<>(data, new ArrayList<>(Arrays.asList(delayMillis)));
    }

    public static <T> MultipleDelayMessage<T> of(T data, List<Long> delayMillis) {
        return new MultipleDelayMessage<>(data, delayMillis);
    }

    public boolean hasNextDelay() {
        return !delayMillis.isEmpty();
    }

    public Long removeNextDelay() {
        return delayMillis.remove(0);
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    public List<Long> getDelayMillis() {
        return delayMillis;
    }

    public void setDelayMillis(List<Long> delayMillis) {
        this.delayMillis = delayMillis;
    }

    @Override
    public String toString() {
        return "MultipleDelayMessage{" +
                "data=" + data +
                ", delayMillis=" + delayMillis +
                '}';
    }

}

我们再定义一个发送延迟消息的消息处理器,供所有服务使用

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;

public class DelayMessagePostProcessor implements MessagePostProcessor {

    private final Integer delay;

    public DelayMessagePostProcessor(Integer delay) {
        this.delay = delay;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setDelay(delay);
        return message;
    }

}

改造后的发送延迟消息的测试方法

  • “delay.direct”:交换机的名称

  • “delay”:路由键(routing key),交换机会将消息发送到绑定到这个路由键的队列

  • “Hello, DelayQueue!”:实际要发送的消息内容

  • new DelayMessagePostProcessor(10000):消息后处理器(Message Post Processor),用于在消息发送之前对消息进行修改

@Test
void testSendDelayMessage() {
    rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new DelayMessagePostProcessor(10000));

    SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
    simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");
    System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis()));
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2314985.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot436-基于SpringBoot的汽车票网上预订系统(源码+数据库+纯前后端分离+部署讲解等)

&#x1f495;&#x1f495;作者&#xff1a; 爱笑学姐 &#x1f495;&#x1f495;个人简介&#xff1a;十年Java&#xff0c;Python美女程序员一枚&#xff0c;精通计算机专业前后端各类框架。 &#x1f495;&#x1f495;各类成品Java毕设 。javaweb&#xff0c;ssm&#xf…

宇树ROS1开源模型在ROS2中Gazebo中仿真

以GO1为例 1. CMakelists.txt更新语法 cmake_minimum_required(VERSION 3.8) project(go1_description) if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")add_compile_options(-Wall -Wextra -Wpedantic) endif() # find dependencies find…

Web网页制作之爱家居的设计(静态网页)

一、使用的是PyCharm来敲写的代码&#xff08;布局&#xff09; 二、主要的html代码的介绍 这段代码展示了如何使用HTML和CSS创建一个结构化的网页&#xff0c;包含导航栏、新闻内容、图片展示和页脚信息。通过引入外部CSS文件&#xff0c;可以进一步美化和布局这些元素。 HTM…

Linux云计算SRE-第二十周

完成ELK综合案例里面的实验&#xff0c;搭建完整的环境 一、 1、安装nginx和filebeat&#xff0c;配置node0(10.0.0.100)&#xff0c;node1(10.0.0.110)&#xff0c;node2(10.0.0.120)&#xff0c;采用filebeat收集nignx日志。 #node0、node1、node2采用以下相同方式收集ngin…

【MATLAB例程】AOA(到达角度)法,多个目标定位算法,三维空间、锚点数量自适应(附完整代码)

给出AOA方法下的多目标定位,适用三维空间,锚点数量>3即可,可自定义目标和锚点的数量、坐标等。 文章目录 运行结果源代码代码讲解概述功能代码结构运行结果 10个锚点、4个目标的情况: 100个锚点、10个目标的情况: 修改方便,只需调节下面的两个数字即可: 源代码 …

Matlab:矩阵运算篇——矩阵数学运算

目录 1.矩阵的加法运算 实例——验证加法法则 实例——矩阵求和 实例——矩阵求差 2.矩阵的乘法运算 1.数乘运算 2.乘运算 3.点乘运算 实例——矩阵乘法运算 3.矩阵的除法运算 1.左除运算 实例——验证矩阵的除法 2.右除运算 实例——矩阵的除法 ヾ(&#xffe3;…

MinIO问题总结(持续更新)

目录 Q: 之前使用正常&#xff0c;突然使用空间为0B&#xff0c;上传文件也是0B&#xff08;部署在k8s中&#xff09;Q: 无法上传大文件参考yaml Q: 之前使用正常&#xff0c;突然使用空间为0B&#xff0c;上传文件也是0B&#xff08;部署在k8s中&#xff09; A: 1、检查pod状态…

智算新纪元,腾讯云HAI-CPU助力法律援助

高性能应用服务 1. ChatbotUI ​应用介绍 基于腾讯云 ​DeepSeek 模型的智能化对话界面&#xff0c;支持灵活集成到企业级应用或服务中&#xff0c;提供自然语言交互能力&#xff0c;适用于客服、知识检索、任务自动化等场景。 ​核心功能 ​多轮对话引擎&#xff1a;支持上下…

android 调用wps打开文档并感知保存事件

需求场景 在项目开发中会碰到需要调用WPS打开Word,Excel,Ppt等Office系列文档的情况&#xff0c;网上目前少有正式介绍如何调用相关API打开文档&#xff0c;并实现文档编辑后回传给三方应用&#xff0c;本人在逛WPS社区时发现 解锁WPS二次开发新世界&#xff1a;Android开发用…

【fnOS飞牛云NAS本地部署跨平台视频下载工具MediaGo与远程访问下载视频流程】

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

PyQt基础——简单的窗口化界面搭建以及槽函数跳转

一、代码实现 import sysfrom PyQt6.QtGui import QPixmap from PyQt6.QtWidgets import QApplication, QWidget, QPushButton, QLabel, QLineEdit, QMessageBox from PyQt6.uic import loadUi from PyQt6.QtCore import Qtclass LoginWindow(QWidget):def __init__(self):sup…

【Java--数据结构】优先级队列( PriorityQueue)

一. 优先级队列 1.1 优先级队列的概念 优先级队列是一种特殊的队列&#xff0c;它在入队时会根据元素的优先级进行排序&#xff0c;优先级最高的元素排在队列的前面&#xff0c;出队时会优先出队优先级最高的元素。 1.2 优先级队列的区别 &#xff08;1&#xff09;与普通…

【 <一> 炼丹初探:JavaWeb 的起源与基础】之 JavaWeb 项目的部署:从开发环境到生产环境

<前文回顾> 点击此处查看 合集 https://blog.csdn.net/foyodesigner/category_12907601.html?fromshareblogcolumn&sharetypeblogcolumn&sharerId12907601&sharereferPC&sharesourceFoyoDesigner&sharefromfrom_link <今日更新> 一、开发环境…

【AIGC】OpenAI 集成 Langchain 操作实战使用详解

目录 一、前言 二、前置准备 2.1 安装 Langchain必须的依赖 2.1.1 python环境 2.1.2 langchain openai 环境 2.1.3 准备一个apikey 2.1.4 langchain 核心组件 三、Langchain 各组件使用 3.1 Chat models组件 3.1.1 Invocation 使用 3.1.1.1 结果解析 3.2 提示词模板…

Xxl-Job学习笔记

目录 概述 核心架构 核心特点 应用场景 什么是任务调度 快速入门 获取源码 初始化调度数据库 基本配置 数据源datasource 邮箱email&#xff08;可选&#xff09; 会话令牌access token 启动调度中心 启动执行器 依赖 yaml基本配置 XxlJobConfig类配置 定义执…

SAIL-RK3576核心板应用方案——无人机视觉定位与地面无人设备通信控制方案

本方案以 EFISH-RK3576-SBC工控板 或 SAIL-RK3576核心板 为核心&#xff0c;结合高精度视觉定位、实时通信与智能控制技术&#xff0c;实现无人机与地面无人设备的协同作业。方案适用于物流巡检、农业植保、应急救援等场景&#xff0c;具备高精度定位、低延迟通信与强环境适应性…

CSS 入门指南(一):基本概念 选择器 常用元素属性

一、初识 CSS 1, CSS 定义 层叠样式表(Cascading Style Sheets&#xff0c;缩写为 CSS)&#xff0c;是一种 样式表 语言&#xff0c;用来描述 HTML 文档的呈现&#xff08;美化内容&#xff09; CSS 能够对网页中元素位置的排版进行 像素级 精确控制&#xff0c;实现美化页面…

HTML5(Web前端开发笔记第一期)

p.s.这是萌新自己自学总结的笔记&#xff0c;如果想学习得更透彻的话还是请去看大佬的讲解 目录 三件套标签标题标签段落标签文本格式化标签图像标签超链接标签锚点链接默认链接地址 音频标签视频标签 HTML基本骨架综合案例->个人简介列表表格表单input标签单选框radio上传…

【AIGC】计算机视觉-YOLO系列家族

YOLO系列家族 &#xff08;1&#xff09;YOLO发展史&#xff08;2&#xff09; YOLOX&#xff08;3&#xff09; YOLOv6&#xff08;4&#xff09; YOLOv7&#xff08;5&#xff09; YOLOv8&#xff08;6&#xff09; YOLOv9&#xff08;7&#xff09;YOLOv10&#xff08;8&…

The First项目报告:重塑 DeFi 流动性的革新者,ELX 即将登陆 The First

随着去中心化金融&#xff08;DeFi&#xff09;的持续发展&#xff0c;流动性问题一直是各类去中心化交易所&#xff08;DEX&#xff09;和项目方面临的核心挑战。传统的做市模式往往需要依赖中心化流动性提供者&#xff0c;而这些机构的资金控制能力可能影响代币价格波动&…