RabbitMQ实现延迟队列

news2025/1/13 14:23:59

业务场景:

  • 延迟发送短信
  • 用户下单,若15分钟内用户未支付,取消订单
  • 预约工作会议,20分钟后通知所有参会人员

实现方式1:死信交换机

死信概念:当一个队列中的消息满足下列情况之一,就会变成死信

  • 消费者使用basic.rejectbasic.nack声明消费失败且消息的requeue(重新入队)参数设置为false
  • 消息超时未消费
  • 队列里的消息堆积满了,最早未被消费的消息就可能成为死信

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机 (Dead Letter Exchange,简称DLX)。给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey

TTL(Time To Live)是指消息的存活时间
RabbitMQ可以对队列和消息分别设置TTL

在这里插入图片描述
代码实现

添加依赖

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

rabbitmq配置

spring:
  rabbitmq:
    host: 127.0.0.1 # rabbitMQ的ip地址
    port: 5672 # 端口
    username: guest
    password: guest
    virtual-host: /

死信交换机配置

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TTLMessageConfig {

    @Bean
    public DirectExchange ttlDirectExchange(){
        return new DirectExchange("ttl.direct");
    }

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder
                // 创建并持久化队列
                .durable("ttl.queue")
                // 设置队列的消息存活时间
                .ttl(10000)
                // 给该队列绑定死信交换机并指定routing-key路由到死信队列
                .deadLetterExchange("dl.direct")
                .deadLetterRoutingKey("dl")
                .build();
    }

    @Bean
    public Binding ttlBinding(){
        // 将ttl.queue队列绑定到ttl.direct
        return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
    }
}

监听队列

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SpringRabbitListener {

	// 监听队列
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dl.queue", durable = "true"),
            exchange = @Exchange(name = "dl.direct"),
            key = "dl"
    ))
    public void listenDlQueue(String msg) {
        log.info("消费者接收到了dl.queue的延迟消息");
    }
}

测试

@Test
    public void testTTLMessage() {
        // 1.准备消息
        Message message = MessageBuilder
                .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
                //消息持久化(默认)
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                // 消息过时时间(当消息的过时时间与队列的过时时间都设置时,值小的生效)
                .setExpiration("15000")
                .build();
        // 2.发送消息
        rabbitTemplate.convertAndSend("ttl.direct", "ttl", message);
        // 3.记录日志
        log.info("消息已经成功发送!");
    }

实现方式2:使用插件实现延迟队列

GitHub地址
1、下载完成之后进行解压,此处推荐bandzip进行解压,并且将解压之后的文件夹放到rabbitmq的安装目录下的plugins目录下

2、进入到sbin目录下使用cmd执行以下指令来启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3、然后重启rabbitmq服务

可以看到交换机类型添加了x-delayed-message类型,此类型是基于fanout、direct、topic类型的
在这里插入图片描述

监听队列

  @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            // delayed属性指定该交换机为延迟交换机
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayExchange(String msg) {
        log.info("消费者接收到了delay.queue的延迟消息");
    }

测试

	@Test
    public void testSendDelayMessage() throws InterruptedException {
        // 1.准备消息
        Message message = MessageBuilder
                .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                // x-delay设置延时时间
                .setHeader("x-delay", 15000)
                .build();
        // 2.准备CorrelationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 3.发送消息
        rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);

        log.info("发送消息成功");
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/133895.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数字验证学习笔记——SystemVerilog芯片验证19 ——线程的控制

一、线程的控制 1.1 fork并行线程语句块 fork join_any 当T3结束时&#xff0c;退出fork join_any 后&#xff0c;T1和T2还会执行。 1.1.1 fork … join 上述代码中&#xff0c;在fork join中开辟了4个子线程&#xff0c;当4个子线程执行完之后&#xff0c;才能执行下面的$di…

《垃圾回收算法手册 自动内存管理的艺术》——并发算法预备知识(笔记)

文章目录十二、特定语言相关内容12.1 终结12.1.1何时调用终结方法12.1.2 终结方法应由哪个线程调用12.1.3 是否允许终结方法彼此之间的并发12.1.4 是否允许终结方法访问不可达对象12.1.5 何时回收已终结对象12.1.6 终结方法执行出错时应当如何处理12.1.7 终结操作是否需要遵从某…

指针进阶之字符指针(超详细)

文章目录一、回顾二、字符指针1.基本用法2.误区&#xff08;1&#xff09;字符指针存放字符串首元素地址&#xff08;2&#xff09;输出问题3.内存布局三、字符指针与字符串数组1.字符指针2.字符串数组四、面试题1.One2.Two3.探究4.补充五、地址问题六、字符数组与字符串数组1.…

普通索引和唯一索引,应该怎么选择?

在前面的基础篇文章中&#xff0c;我给你介绍过索引的基本概念&#xff0c;相信你已经了解了唯一索引和普通索引的区别。今天我们就继续来谈谈&#xff0c;在不同的业务场景下&#xff0c;应该选择普通索引&#xff0c;还是唯一索引&#xff1f; 假设你在维护一个市民系统&…

基于springboot+mybatis+mysql+html实现在线教育平台系统

基于springbootmybatismysqlhtml实现在线教育平台系统1. 技术介绍2.功能介绍3. 前端3.1 首页3.2 课程3.3 登入3.4 商品兑换3.5 课程发布4. 后端4.1 登录4.2 系统管理4.3 课程管理4.4 教师管理4.5 导航菜单4.6 轮播管理4.7 通知管理4.8 礼品管理1. 技术介绍 核心技术&#xff1…

【电工技术】期末复习题

1.电路是为实现人们的某种需求&#xff0c;由 电源 、中间环节和负载三部分按一定方式组合起来&#xff0c;使电流流通的整体。 2&#xff0e;在使用叠加定理对电路进行分析时&#xff0c;通常要对电源作除源处理&#xff0c;处理方法是将各个理想电压源 短接 …

ArcGIS基础实验操作100例--实验33计算栅格统计参数

本实验专栏参考自汤国安教授《地理信息系统基础实验操作100例》一书 实验平台&#xff1a;ArcGIS 10.6 实验数据&#xff1a;请访问实验1&#xff08;传送门&#xff09; 高级编辑篇--实验33 计算栅格统计参数 目录 一、实验背景 二、实验数据 三、实验步骤 &#xff08;1&…

2022年终总结与展望

2022年终总结 自2019年3月13日入驻CSDN&#xff0c;已经三年零九个月了。截至2022年12月31日&#xff0c;CSDN博客已发原创博文112篇&#xff0c;粉丝3616个&#xff0c;访问量超过157万次。 2019年12月31日数据情况&#xff1a; 2020年12月31日数据情况&#xff1a; 2021年1…

7-9 包装机

一种自动包装机的结构如图 1 所示。首先机器中有 N 条轨道&#xff0c;放置了一些物品。轨道下面有一个筐。当某条轨道的按钮被按下时&#xff0c;活塞向左推动&#xff0c;将轨道尽头的一件物品推落筐中。当 0 号按钮被按下时&#xff0c;机械手将抓取筐顶部的一件物品&#x…

尚医通- Nacos服务注册 医院列表接口(二十一)

目录&#xff1a; &#xff08;1&#xff09;后台系统-医院管理-需求和Nacos启动 &#xff08;2&#xff09;医院列表-Nacos注册服务 &#xff08;3&#xff09;医院列表接口-初步实现 .&#xff08;1&#xff09;后台系统-医院管理-需求和Nacos启动 之前我们完成了数据相…

基于Java+Swing实现捕鱼达人游戏(含课程报告)

基于JavaSwing实现捕鱼达人游戏&#xff08;含课程报告&#xff09;一、系统介绍1、开发背景2、基本内容、实现方法及主要技术实现目标3实现目标二、功能展示三、其他系统一、系统介绍 1、开发背景 捕鱼达人这个项目是一个娱乐性的游戏开发&#xff0c;本次游戏的程序设计包含…

Spring6笔记4

十四、GoF之代理模式 14.1 对代理模式的理解 代理模式中有一个非常重要的特点&#xff1a;对于客户端程序来说&#xff0c;使用代理对象时就像在使用目标对象一样。【在程序中&#xff0c;目标需要被保护时】 业务场景&#xff1a;系统中有A、B、C三个模块&#xff0c;使用这…

移动Web【Flex布局模型构成 主轴对齐方式 侧轴对齐方式 伸缩比】

文章目录Flex布局Flex布局模型构成主轴对齐方式侧轴对齐方式伸缩比Flex布局 思考 多个盒子横向排列使用什么属性&#xff1f; 浮动 设置盒子间的间距使用什么属性&#xff1f; margin 需要注意什么问题&#xff1f; 浮动的盒子脱标 Flex布局/弹性布局&#xff1a; 是一种浏览…

06-07-SpringAop

介绍下AspectJ和AOP和关系 AspectJ是java编程语言的无缝的面向方面的扩展&#xff0c;可以在java代码的字节码中植入切面代码。 AspectJ 是静态代理的增强&#xff0c;所谓的静态代理就是 AOP 框架会在编译阶段生成 AOP 代理类&#xff0c;因此也称为编译时增强。 AspectJ 是…

手把手代码实现五级流水线CPU——第一篇:初级顺序流水线

文章目录指令系统编码格式一、基础&#xff1a;顺序结构1.取值阶段&#xff1a;2.译码阶段3.执行阶段4.访存阶段5.写回阶段6.更新PC阶段详细硬件结构指令在各个阶段完成的操作C代码实现指令系统 编码格式 一、基础&#xff1a;顺序结构 1.取值阶段&#xff1a; 根据icode还可以…

【FPGA开发】Verilog 基础

写在前面&#xff1a;本章将对 Verilog 进行简要介绍&#xff0c;并对其基本特性进行讲解说明。之后&#xff0c;我们将按步骤演示如何使用 Vivado 创建简单项目。手动实践部分将根据我们提供的 .v 和 .tb 代码&#xff0c;跟着步骤跑出 Simulation 结果即可。 Ⅰ. Verilog 基础…

Odoo 16 企业版手册 - 库存管理之产品追溯

产品追溯 Odoo提供的产品可追溯性功能将有助于跟踪和跟踪产品的每个组件。在库存移动的每个阶段跟踪产品对于控制所有操作是必要的。为了确保有效监控库存的走势&#xff0c;批号和序列号发挥着重要作用。从制造过程到交付操作&#xff0c;产品可追溯性将保持适当的跟踪&#x…

Mixlab 的自我介绍

‍‍‍‍2022在探索元宇宙落地过程中&#xff0c;走过不少弯路&#xff0c;本着 “孵化” 的初心&#xff0c;我们将继续探索面向未来的社区模式。1 / Mixlab 无界社区社区即服务&#xff0c;以此作为基础&#xff0c;孵化各种形态的产品/服务。在2022的白皮书记录了我们做社区…

数据结构与算法—链表之单链表

文章目录链表单链表结构和特点创建添加修改删除2023年的第一篇文章在开发过程中&#xff0c;选择合适的数据结构是很重要的&#xff0c;可以快速处理数据的存储及使用问题。计划有时间慢慢系统的学习《数据结构与算法》&#xff0c;看看视频&#xff0c;练习实践&#xff0c;最…

国产FPGA应用--易灵思Programming Mode完全解析

本文介绍易灵思的几种配置模式&#xff0c;方便大家参考。 一、易灵思下载模式&#xff1a; 二、下载模式选择&#xff1a; 1、SPI Active mode 时序图如下&#xff1a; 2、SPI Passive Mode 时序图如下&#xff1a; SPI Active using JTAG Bridge 实际项目中&#xff0c;SPI…