Spring RabbitMQ 实现消息队列延迟

news2025/1/14 0:44:41

1.概述

要实现RabbitMQ的消息队列延迟功能,一般采用官方提供的 rabbitmq_delayed_message_exchange插件。但RabbitMQ版本必须是3.5.8以上才支持该插件,否则得用其死信队列功能。

2.安装RabbitMQ延迟插件

  • 检查插件 使用rabbitmq-plugins list命令用于查看RabbitMQ安装的插件。

rabbitmq-plugins list

检查RabbitMQ插件安装情况

a57655d2aca32ef7138f7e33f48f86d5.png
  • 下载插件

如果没有安装插件,则直接访问官网进行下载

https://www.rabbitmq.com/community-plugins.html
a66bfbcf3c1c9602b0f8b2a6fc82c7b3.png d896c1601fcf8339dc9720288c5f93b7.png
  • 安装插件

下载后,将其拷贝到RabbitMQ安装目录的plugins目录;并进行解压,如:

E:\software\RabbitMQ Server\rabbitmq_server-3.11.13\plugins
b0c8e53d6f10563e253c63a9f4345214.png

打开cmd命令行窗口,如果系统已经配置RabbitMQ环境变量,则直接执行以下的命令进行安装;否则需要进入到RabbitMQ安装目录的sbin目录。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
11c3b67b5d724326d0817f624d3ecc95.png

3.实现RabbitMQ消息队列延迟功能

  • pom.xml配置信息文件中,添加相关依赖文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.olive</groupId>
 <artifactId>rabbitmq-spring-demo</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.7.7</version>
  <relativePath />
 </parent>
 <dependencies>
  <!--rabbitmq-->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>
  
 <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.2.5</version>
  </dependency>

 </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
     <source>1.8</source>
     <target>1.8</target>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>
  • application.yml配置文件中配置RabbitMQ信息

server:
  port: 8080
spring:
  #给项目来个名字
  application:
    name: rabbitmq-spring-demo
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin123
    #虚拟host。可以不设置,使用server默认host;不同虚拟路径下的队列是隔离的
    virtual-host: /
  • RabbitMQ配置类

package com.olive.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * RabbitMQ配置类
 **/
@Configuration
public class RabbitMqConfig {
 
 public static final String DELAY_EXCHANGE_NAME = "delayed_exchange";
 
 public static final String DELAY_QUEUE_NAME = "delay_queue_name";
 
 public static final String DELAY_ROUTING_KEY = "delay_routing_key";

 @Bean
 public CustomExchange delayExchange() {
  Map<String, Object> args = new HashMap<>();
  args.put("x-delayed-type", "direct");
  return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
 }

 @Bean
 public Queue queue() {
  Queue queue = new Queue(DELAY_QUEUE_NAME, true);
  return queue;
 }

 @Bean
 public Binding binding(Queue queue, CustomExchange delayExchange) {
  return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
 }
}
  • 发送消息

实现消息发送,设置消息延迟5s。

package com.olive.service;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.olive.config.RabbitMqConfig;

/**
 * 消息发送者
 **/
@Service
public class CustomMessageSender {
 
 @Autowired
 private RabbitTemplate rabbitTemplate;

 public void sendMsg(String msg) {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  System.out.println("消息发送时间:" + sdf.format(new Date()));
  rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE_NAME, 
    RabbitMqConfig.DELAY_ROUTING_KEY, 
    msg, 
    new MessagePostProcessor() {
     @Override
     public Message postProcessMessage(Message message) throws AmqpException {
      // 消息延迟5秒
      message.getMessageProperties().setHeader("x-delay", 5000);
      return message;
     }
    });
 }
}
  • 接收消息

package com.olive.service;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.olive.config.RabbitMqConfig;

/**
 * 消息接收者
 **/
@Component
public class CustomMessageReceiver {
 
 @RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE_NAME)
 public void receive(String msg) {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  System.out.println(sdf.format(new Date()) + msg);
  System.out.println("Receiver:执行取消订单");
 }
}
  • 测试验证

package com.olive.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.olive.service.CustomMessageSender;

@RestController
public class DelayMessageController {
 
 @Autowired
 private CustomMessageSender customMessageSender;
 
 @GetMapping("/sendMessage")
 public String sendMessage() {
  // 发送消息
  customMessageSender.sendMsg("你已经支付超时,取消订单通知!");
  return "success";
 }

}

发送消息,访问

http://127.0.0.1:8080/sendMessage

查看控制台打印的信息

67b0bd789d8dc7b5b3b6073c2d9241c9.png

3a05b0b00bb8222770378bbcf8c9fe4b.gif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/474733.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++引用进阶篇:让你的程序更加高效、安全、简洁

文章目录 前言1. 引用和临时数据&#x1f351; 什么样的临时数据会放到寄存器中&#x1f351; 关于常量表达式&#x1f351; 引用也不能指代临时数据&#x1f351; 引用作为函数参数 2. 为const引用创建临时变量3. const引用与转换类型&#x1f351; 引用类型的函数形参请尽可能…

SPFA 算法:实现原理及其应用

文章目录 一、前言二、SPFA 算法1、SPFA算法的基本流程2、代码详解 三、SPFA 算法已死 &#xff1f; 一、前言 SPFA算法&#xff0c;全称为Shortest Path Faster Algorithm&#xff0c;是求解单源最短路径问题的一种常用算法&#xff0c;它可以处理有向图或者无向图&#xff0…

PySide2 QWebEngine与Web js交互

文章目录 单向交互双向传值案例 单向交互 QWebEngineView加载web页面&#xff0c;web页面中点击按钮&#xff0c;执行js代码&#xff0c;js的返回值传给QWebEnginePage&#xff0c;使用python进行保存结果。 单向&#xff0c;js向python(PySide2)端传输数据。 前端实现 <…

前端web3入门脚本三:一键完成与dex的交互,羊毛党必备

前言 该脚本用途&#xff1a;一键可以完成与dex的所有交互&#xff0c;包括2次swap&#xff0c;添加/移除流动性&#xff0c;以及farm和提取LP。一次运行可以有6条交易记录。 无论是个人单刷还是羊毛党批量地址刷交互都完美适配。当然反女巫方案不在这次文章的讨论范围内。 一、…

javascript中find(), filter(), some(), every(), map()等方法介绍

1、find() find() 用于找出第一个符合条件的数组成员。它的参数是一个回调函数&#xff0c;所有数组成员依次执行该回调函数&#xff0c;直到找出第一个返回值为true的成员&#xff0c;然后返回该成员。如果没有符合条件的成员&#xff0c;则返回undefined。 find()方法的回调…

利用Matab进行覆盖计算----战术计算

在 contour函数中添加如下代码 %------- 计算畅通区面积和占比例 --------% Spi*maxrange*maxrange/1e6; S0 nnz(isInRange)*reslons*reslats/1e6;isnn ~isnan(cdata); cdata0 cdata(isnn); S1numel(cdata0)*reslons*reslats/1e6;AS1/S0; % 畅通区所占比例; fprintf("…

CLion开发工具 | 06 - 使用CLion开发STM32(无需Cmake)

专栏介绍 文章目录 专栏介绍一、准备工作1. 工具准备2. 裸机工程准备二、使用CLion打开工程三、基于CLion写代码1. LED blink代码2. printf重定位代码四、编译工程1. 编译配置2. 选择编译目标3. 编译五、烧录1. OpenOCD基础知识(了解)2. 设置CLion路径3. 新建CLion配置文件4.…

面试总结,4年经验

小伙伴你好&#xff0c;我是田哥。 本文内容是一位星球朋友昨天面试遇到的问题&#xff0c;我把核心的问题整理出来了。 1&#xff1a;Java 层面的锁有用过吗&#xff1f;除了分布式锁以外 是的&#xff0c;Java中提供了多种锁机制来保证并发访问数据的安全性和一致性。常见的J…

分析GC日志解读

目录 GC分类 GC日志分类 GC日志结构剖析 透过日志看垃圾收集器 透过日志看GC原因 GC日志分析工具 GC分类 针对HotSpot VM的实现&#xff0c;它里面的GC按照回收区域又分为两大种类型&#xff1a;一种是部分收集&#xff08;Partial GC&#xff09;&#xff0c;一种是整堆…

VPN 虚拟专用网络隧道

1 什么是VPN VPN(全称&#xff1a;Virtual Private Network)虚拟专用网络&#xff0c;是依靠ISP和其他的NSP&#xff0c;在公共网络中建立专用的数据通信的网络技术&#xff0c;可以为企业之间或者个人与企业之间提供安全的数据传输隧道服务。在VPN中任意两点之间的链接并没有…

从零开始学习Linux运维,成为IT领域翘楚(二)

文章目录 &#x1f525;Linux系统目录结构&#x1f525;Linux用户和用户组&#x1f525;Linux用户管理 &#x1f525;Linux系统目录结构 文件系统组织结构 ⭐ /lib 系统开机所需要最基本的动态链接共享库&#xff0c;其作用类似于Windows里的DLL文件。 几乎所有的应用程序都需…

PACS系统源码,大型医院PACS源码集成三维重建

PACS系统为医院提供包括放射、超声、核医学、病理、内窥镜、心电图室在内的所有影像检查数字化的一体化解决方案。 它涵盖了传统PACS和RIS系统的所有功能&#xff0c;以构建全数字化影像科为目标&#xff0c;致力于实现对医院所有影像数据的统一管理、影像检查工作流的自动化&a…

POJ3704 括号匹配问题 递归方法

目录 题目 算法 完整代码 题目 参考 递归: https://blog.csdn.net/qq_45272251/article/details/103257953 利用了递归, 但思路稍复杂了 循环: https://blog.csdn.net/weixin_50340097/article/details/114579805 (看起来是递归其实是循环. 每次递归其实是循环内一次迭…

牛客网Python入门103题练习|【07--循环语句(2)】

⭐NP55 2的次方数 描述 在Python中&#xff0c; * 代表乘法运算&#xff0c; ** 代表次方运算。 请创建一个空列表my_list&#xff0c;使用for循环、range()函数和append()函数令列表my_list包含底数2的 [1, 10] 次方&#xff0c;再使用一个 for 循环将这些次方数都打印出来…

【Linux问题合集001】Linux中如何将用户添加到sudo组中的步骤

看教程的前提我的linux当前用户是zhou&#xff0c;看以下步骤时将zhou看做你的liunx当前用户就行了&#xff1a; 一、 以root用户登录到系统。 在Linux系统中&#xff0c;root用户是具有完全系统管理权限的超级用户。要以root用户身份登录到系统&#xff0c;您可>以使用以下…

继续打脸水货教程:关于可变对象与不可变对象

入门教程、案例源码、学习资料、读者群 请访问&#xff1a; python666.cn 大家好&#xff0c;欢迎来到 Crossin的编程教室 &#xff01; 今天这篇我要继续来打脸互联网上各种以讹传讹的水货教程。 前阵子我们聊了下Python中有关函数参数传递以及变量赋值的一些内容&#xff1a;…

LeetCode0014.最长公共前缀 Go语言AC笔记

时间复杂度&#xff1a;O(n) 解题思路 纵向扫描法。先扫描所有字符串的第一个字符&#xff0c;如果都相同就再次扫描所有字符串的第二个字符&#xff0c;直到某一字符串被扫描完或者出现了不相同的字符&#xff0c;此时就返回该字符串该字符的前缀。 为了确定所有字符是否相同…

【flask】三种路由和各自的比较配置文件所有的字母必须大写if __name__的作用核心对象循环引用的几种解决方式--难

三种路由 方法1&#xff1a;装饰器 python C#, java 都可以用这种方式 from flask import Flask app Flask(__name__)app.route(/hello) def hello():return Hello world!app.run(debugTrue)方法2: 注册路由 php python from flask import Flask app Flask(__name__)//app…

Java IO流第一章

Java IO流第一章 &#xff08;一&#xff09;简介 本文主要是从最基础的BIO式通信开始介绍到NIO , AIO&#xff0c;读者可以清晰的了解到阻塞、同步、异步的现象、概念和特征以及优缺点。 通信技术整体解决的问题 局域网内的通信要求。多系统间的底层消息传递机制。高并发下…

如何自制云平台,并实现远程访问控制?

除了阿里、腾讯各种云&#xff0c;计算机大神们都想自己搭建IoT云平台。今天小编跟大家分享一种用UbuntuEMQXNode-RED方式自制IoT云平台的方法&#xff0c;并实现无公网IP随时访问远程数据&#xff01; 第一步 Step1搭建EMQX服务器 1.搭建IoT平台需要一个服务器&#xff0c;这…