SpringBoot整合RabbitMQ实现延迟队列功能

news2024/11/30 20:39:31

请添加图片描述

👨🏻‍💻 热爱摄影的程序员
👨🏻‍🎨 喜欢编码的设计师
🧕🏻 擅长设计的剪辑师
🧑🏻‍🏫 一位高冷无情的编码爱好者
大家好,我是 DevOps 工程师
欢迎分享 / 收藏 / 赞 / 在看!

延迟队列是一种常见的消息队列模式,用于在一定时间后处理消息。在本文中,我们将探讨如何使用 Spring Boot 和 RabbitMQ 实现延迟队列功能。

通常情况下,生产者将消息发送给普通交换机,由普通交换机发送给普通的队列给消费者进行消费,当消息被拒绝、消息 TTL 过期或者队列达到最大长度时,消息成为死信消息,将被丢弃给死信交换机处理,由死信交换机发送给死信队列进行下一步处理。

但是,实现死信队列上述步骤可以简化为:生产者直接将消息发送给死信交换机并设置过期时间,如果消息 TTL 达到,则被丢弃到死信队列,而此时监听着死信队列的消费者就可以及时地消费到消息。由此可以通过死信队列的机制实现延迟队列的功能。
请添加图片描述

首先,使用 Docker 构建好 RabbitMQ 容器服务。注意,笔者使用的是自行构建的带有延迟插件的镜像 hongyoudan/rabbitmq-management-delayed:3.12,大家可以使用笔者构建好的镜像直接创建容器,如果使用其他镜像需关注是否安装了延迟插件 rabbitmq_delayed_message_exchange,如果未安装,需自行安装,因为要实现 RabbitMQ 延迟队列的功能,需要开启该插件。

docker run -d \
--name rabbitmq \
-p 5672:5672 -p 15672:15672 \
hongyoudan/rabbitmq-management-delayed:3.12

初始化项目,使用 Spring 初始化器新建项目。

引入 Web 和 AMQP 两个依赖项:

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
 </dependency>

编写配置文件:

server:
  port: 3031

spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5623
    username: guest
    password: guest

编写 RabbitMQ 配置类:

/**
 * @Author: hayden
 * @Date: 2023-09-28
 * @Description: RabbitMQ配置类
 */
@Configuration
public class RabbitMQConfig {

    /**
     * 定义延迟队列
     */
    @Bean
    public Queue delayedQueue() {
        // 参数:name 队列名称,durable 是否持久化,exclusive 是否排他,autoDelete 是否自动删除
        return new Queue(DELAY_QUEUE_NAME, true, false, false);
    }

    /**
     * 定义延迟交换机
     */
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        // 参数:name 交换机名称,type 交换机类型,durable 是否持久化,autoDelete 是否自动删除,arguments 参数
        return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    /**
     * 将延迟队列绑定到延迟交换机
     */
    @Bean
    public Binding delayBinding(Queue delayedQueue, CustomExchange delayedExchange) {
        // 参数:队列,交换机,路由键
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAY_ROUTING_KEY).noargs();
    }
}

通用常量类:

/**
 * @Author: hayden
 * @Date: 2023-09-28
 * @Description: 常量类
 */
public class Constant {

    public static final String DELAY_QUEUE_NAME = "delay_queue";

    public static final String DELAY_EXCHANGE_NAME = "delay_exchange";

    public static final String DELAY_ROUTING_KEY = "delay_routing_key";
}

生产者:

/**
 * @Author: hayden
 * @Date: 2023-09-28
 * @Description: 生产者
 */
@Component
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public String sendDelayMessage(String message, int delayTime) {
        // 参数:交换机名称,路由键,消息内容,消息处理器(设置消息的延迟时间)
        rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_ROUTING_KEY, message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(delayTime);
                return message;
            }
        });
        String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.out.println("发送消息:" + message + ",延迟时间:" + delayTime + "ms" + ",当前时间:" + time);
        return "发送消息:" + message + ",延迟时间:" + delayTime + "ms" + ",当前时间:" + time;
    }
}

消费者:

/**
 * @Author: hayden
 * @Date: 2023-09-28
 * @Description: 消费者
 */
@Component
public class Consumer {

    @RabbitListener(queues = DELAY_QUEUE_NAME)
    public void processDelayMessage(String message) {
        String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.out.println("消费消息: " + message + ",当前时间:" + time);
    }
}

控制器:

/**
 * @Author: hayden
 * @Date: 2023-09-28
 * @Description: 控制器
 */
@RestController
public class SendMessageController {

    @Autowired
    private Producer producer;

    @GetMapping("/sendMessage")
    public String sendMessage(@RequestParam("message") String message, @RequestParam("delayTime") int delayTime) {
        return producer.sendDelayMessage(message, delayTime);
    }
}

完整项目结构:

在这里插入图片描述

使用接口测试工具 Apifox 进行测试。测试发送消息为:aaa,延迟时间为 5 秒。在控制台中可以看到在发送消息 5 秒后,消息才被消费。

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1051701.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AOP 编程

目录 ​编辑一、AOP 编程 1、AOP 概念 2、AOP 编程的开发步骤 3、切面的名词解释 二、AOP 的底层实现原理 1、核心问题 2、动态代理类的创建 &#xff08;1&#xff09;JDK 的动态代理创建 &#xff08;2&#xff09;CGlib 的动态代理 &#xff08;3&#xff09;总结…

[红明谷CTF 2021]write_shell %09绕过过滤空格 ``执行

目录 1.正常短标签 2.短标签配合内联执行 看看代码 <?php error_reporting(0); highlight_file(__FILE__); function check($input){if(preg_match("/| |_|php|;|~|\\^|\\|eval|{|}/i",$input)){ 过滤了 木马类型的东西// if(preg_match("/| |_||php/&quo…

设计模式5、原型模式 Prototype

解释说明&#xff1a;使用原型实例指定待创建对象的类型&#xff0c;并且通过复制这个原型阿里创建型的对象 UML 结构图&#xff1a; 抽象原型&#xff08;Prototype&#xff09;&#xff1a;规定了具体原型对象必须实现的clone()方法 具体原型&#xff08;ConcretePrototype&…

CVE-2023-5129 libwebp堆缓冲区溢出漏洞影响分析

漏洞简述 近日苹果、谷歌、Mozilla和微软等公司积极修复了libwebp组件中的缓冲区溢出漏洞&#xff0c;相关时间线如下&#xff1a; 9月7日&#xff0c;苹果发布紧急更新&#xff0c;修复了此前由多伦多大学公民实验室报告的iMessage 0-click 漏洞&#xff0c;漏洞被认为已经被…

爬取北京新发地当天货物信息并展示十五天价格变化(三)---获取物品十五天内的价格

。。。。。。。。。。。。。。。。。。。。。。 1.网页请求一下内容2.通过爬虫进行请求3.获取商品十五天详细数据并绘制折线图4.项目详细代码 1.网页请求一下内容 通过抓包我们发现一共七个参数 limit: 20 # 一页多少数据 current: …

自制代码编辑器:CASM Editor

哔哩哔哩演示视频&#xff1a;我使用python自制了一个代码编辑器——CASM Editor_哔哩哔哩_bilibili 源代码&#xff1a; import idlelib.colorizer as idc import idlelib.percolator as idp import os import sys import threading import time import tkinter as T_tk imp…

计算机视觉: 可控的高质量人体生成

背景 关于人体动作的生成范式目前主流的方向可以分为以下两种: Sequence based motion generation: 给定控制信号然后一次性生成连续的动作&#xff0c;能生成一些连续高阶语义的动作信号&#xff0c;因为其能看到整个动作信号。eg: MDM: Human Motion Diffusion Model, Teve…

机器学习(20)---神经网络详解

神经网络 一、神经网络概述1.1 神经元模型1.2 激活函数 二、感知机2.1 概述2.2 实现逻辑运算2.3 多层感知机 三、神经网络3.1 工作原理3.2 前向传播3.3 Tensorflow实战演示3.3.1 导入数据集查看3.3.2 数据预处理3.3.3 建立模型3.3.4 评估模型 四、反向传播五、例题5.1 题15.2 题…

【SQL】Mysql 时区设置解决--20230928

https://blog.csdn.net/qq_44392492/article/details/108717616 输入命令show variables like “%time_zone%”;&#xff08;注意分号结尾&#xff09;设置时区&#xff0c;输入 set global time_zone “8:00”; 回车,然后退出重启&#xff08;一定记得重启&#xff0c;不然查…

Mysql 本地计算机无法启动 mysql 服务 错误 1067:进程意外终止

有时候一段时间本地mysql不用&#xff0c;在连接本地数据库的时候&#xff0c;会报mysql无法连接出现错误提示10061错误&#xff0c; 这时候一般是本地mysql服务没有启动 去左下角搜“服务”&#xff0c;进入后选择Mysql&#xff0c;点击启动&#xff08;我的截图是已经启动好…

C#,数值计算——Ranfib的计算方法与源程序

1 文本格式 using System; namespace Legalsoft.Truffer { /// <summary> /// Implements Knuths subtractive generator using only floating operations. See /// text for cautions. /// </summary> public class Ranfib { p…

通过茶叶酒水小程序商城的作用是什么?

茶叶酒水往往会在一起经营&#xff0c;同时又具备较强的送礼属性&#xff0c;需求度较高但经营商家同样不少&#xff0c;同行竞争激烈&#xff0c;加之同城生意有限、外地客户难以拓展、销售营销不足、品牌宣传效果差等痛点&#xff0c;传统酒水茶叶门店需要线上带来增长。 那…

大数据Flink(九十二):DML:集合操作

文章目录 DML:集合操作 DML:集合操作 集合操作支持 Batch\Streaming 任务。 UNION:将集合合并并且去重。

uni-app:js修改元素样式(宽度、外边距)

效果 代码 1、在<view>元素上添加一个ref属性&#xff0c;用于在JavaScript代码中获取对该元素的引用&#xff1a;<view ref"myView" id"mybox"></view> 2、获取元素引用 &#xff1a;const viewElement this.$refs.myView.$el; 3、修改…

【Flink】

事件驱动型应用 核心目标&#xff1a;数据流上的有状态计算 Apache Flink是一个框架和分布式处理引擎&#xff0c;用于对无界或有界数据流进行有状态计算。 运行逻辑 状态 把流处理需要的额外数据保存成一个“状态”,然后针对这条数据进行处理,并且更新状态。这就是所谓的“…

mrctf2020_shellcode_revenge

mrctf2020_shellcode_revenge Arch: amd64-64-little RELRO: Full RELRO Stack: No canary found NX: NX disabled PIE: PIE enabled RWX: Has RWX segments64位&#xff0c;开了PIE和RELRO&#xff0c;看到RWX出来&#xff0c;就感觉是shellcode了…

如何使用PyInstaller打包Python应用(包含参数详解,spec文件详解,反编译和防止反编译)

文章目录 介绍PyInstaller安装PyInstaller参数及使用方法PyInstaller打包技巧和注意事项反编译和防止反编译介绍PyInstaller PyInstaller是一个强大的Python打包工具,它可以将Python程序打包成独立的可执行文件,方便在不同的操作系统上分发和运行。使用PyInstaller,你可以将…

react.js在visual code 下的hello World

想学习reacr.js &#xff0c;就开始做一个hello world。 我的环境是visual code &#xff0c;所以我找这个环境下的例子。参照&#xff1a; https://code.visualstudio.com/docs/nodejs/reactjs-tutorial 要学习react.js &#xff0c;还得先安装node.js&#xff0c;我在visual …

Mysql高级语句(进阶查询语句、数据库函数、连接查询)

Mysql高级语句&#xff08;进阶查询语句、MySQL数据库函数、连接查询 一、mysql查询语句1.1、 select ----显示表格中一个或数个字段的所有数据记录1.2、 distinct ----不显示重复的数据记录1.3、where ----有条件查询1.4、 and or ----且 或1.5 、in----显示已知的值的数据记录…

9_分类算法—决策树

文章目录 1 信息熵1.1 比特化&#xff08;Bits&#xff09;1.2 一般化的比特化&#xff08;Bits&#xff09;1.3 信息熵&#xff08;Entropy&#xff09;1.3.1 熵越大混乱程度越大 1.4 条件熵H&#xff08;YIX&#xff09; 2 决策树2.1 什么是决策树2.2 决策树构建过程&#xf…