RabbitMQ延迟消息的实现

news2025/1/19 3:27:55

RabbitMQ延迟队列的实现

  • 延迟消息是什么
    • 延迟消息的实现
    • 死信交换机
      • 代码实现
    • 延迟消息插件


延迟消息是什么

延迟消息是将消息发送到MQ中,消费者不会立即收到消息,而是过一段时间之后才会收到消息,进行处理。在一些业务中,可以用到延迟消息,比如我们在成功下单一个商品后,需要立即付款,为了避免商品库存一直被占有,我们会给商品设置一个支付时间,如果在这段时间没有支付成功,就会恢复库存,删除订单,对于订单支付的超时删除我们是通过延迟消息来实现的,让消费者在支付超时之后查询用户是否支付,如果支付成功直接返回,如果支付失败就恢复库存删除订单。

延迟消息的实现

延迟消息由以下两种方式实现,第一种是通过绑定死信交换机实现,第二种通过延迟消息插件实现,推荐使用第二种,更加简单

死信交换机

满足以下三种情况之一的叫做死信:
1、在设置了过期时间的消息,放入队列中,超过了过期时间没有被处理的消息
2、消息消费失败(返回nack或者reject)并且不能重复入队
3、队列消息堆积满了,最早的消息叫做死信
我们可以给队列绑定参数指定交换机,那么死信会被投递到指定交换机。
消息队列实现原理:我们可以设置一组没有消费者的交换机和队列,设置另一组处理绑定死信的交换机、队列和消费者,来处理延迟消息。

代码实现

定义死信交换机等和消费延迟消息交换机等:


@Configuration
public class DelayConfiguration {
    /**
     * 定义死信交换机、队列以及绑定
     */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("dead.direct");
    }

    @Bean
    public Queue queue() {
        Queue queue = new Queue("dead.queue");
        queue.addArgument("x-dead-letter-exchange", "delay.direct");
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("dead");
    }

    /**
     * 定义处理延迟消息的交换机、队列和绑定
     */
    @Bean
    public DirectExchange exchange1() {
        return new DirectExchange("delay.direct");
    }

    @Bean
    public Queue queue1() {
        return new Queue("delay.queue");
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(exchange1()).with("dead");
    }
}

定义延迟消息监听器:

    @RabbitListener(queues = "delay.queue")
    public void listen(String msg){
        log.info(LocalDateTime.now()+": "+msg);
    }

测试:

@Test
    void sendDeadMsg() {
        rabbitTemplate.convertAndSend("dead.direct", "dead", "我是死信", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
//                设置过期消息时间
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
        });
    }

结果:
在这里插入图片描述
在这里插入图片描述
消费者在十秒钟后成功消费延迟消息

延迟消息插件

我们在之前通过死信交换机来实现延迟队列,但是死信交换机是专门用来存放无法处理的消息,并且使用死信交换机实现过于复杂,我们需要手动定义两个交换机和队列,因而RabbitMQ提供了延迟消息插件来让我们更简单的实现延迟消息。
原理:给消息设置延迟时间,当将消息放入MQ时,MQ的交换机不会立即将消息放入队列,而是会在交换机中暂存延迟时间过后将消息路由到队列中,可以让队列处理延迟消息。
安装插件:插件安装可以借鉴这篇博客
代码实现
消费者:

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "delay.queue",durable = "true"),
            // 开启延迟交换机
            exchange = @Exchange(name = "delay.direct",delayed = "true"),
            key = "dead"
    ))
    public void listen(String msg){
        log.info(msg);
    }
  @Test
    void sendDeadMsg() {
        rabbitTemplate.convertAndSend("delay.direct", "delay", "我是死信", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
//                设置延迟消息时间
                message.getMessageProperties().setDelay(10000);
                return message;
            }
        });
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2256064.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringMvc完整知识点一

SpringMVC概述 定义 SpringMVC是一种基于Java实现MVC设计模型的轻量级Web框架 MVC设计模型:即将应用程序分为三个主要组件:模型(Model)、视图(View)和控制器(Controller)。这种分离…

强化学习新突破:情节记忆与奖励机制引领多智能体协作

简介 本推文介绍了韩国科学技术院发表在人工智能顶会ICLR 2024上的论文《Efficient Episodic Memory Utilization of Cooperative Multi-Agent Reinforcement Learning》。该论文提出创新性高效情节记忆利用(Efficient Episodic Memory Utilization,EMU…

密码翻译

密码翻译 C语言实现C实现Java实现Python实现 💐The Begin💐点点关注,收藏不迷路💐 在情报传递过程中,为了防止情报被截获,往往需要对情报用一定的方式加密,简单的加密算法虽然不足以完全避免情报…

【C++初阶】第7课—标准模版库STL(string_1)

文章目录 1. 什么是STL2. STL六大组件3. 标准库中string类3.1 auto关键字3.2 范围for3.3 string类的类型3.4 string类的常用接口(string类对象的常见构造)3.5 string的析构和赋值运算符重载3.6 string类对象的容量操作 1. 什么是STL STL(standard template library—标准模板库…

uniapp扭蛋机组件

做了一个uniapp的扭蛋机组件,可以前往下载地址下载 支持vue2、3、h5页面微信小程序,其余小程序未测试 示例图片

MyBatis的工作流程是怎样的?

大家好,我是锋哥。今天分享关于【MyBatis的工作流程是怎样的?】面试题。希望对大家有帮助; MyBatis的工作流程是怎样的? MyBatis 的工作流程可以分为几个主要步骤:从配置、映射到执行 SQL,最终获取数据库结…

MYSQL PARTITIONING分区操作和性能测试

PARTITION OR NOT PARTITION IN MYSQl Bill Karwin says “In most circumstances, you’re better off using indexes instead of partitioning as your main method of query optimization.” According to RICK JAMES: “It is so tempting to believe that PARTITIONing wi…

[软件工程]九.可依赖系统(Dependable Systems)

9.1什么是系统的可靠性(reliability) 系统的可靠性反映了用户对系统的信任程度。它反映了用户对其能够按照预期运行且正常使用中不会失效的信心程度。 9.2什么是可依赖性(dependablity)的目的 其目的是覆盖系统的可用性&#x…

vue3中使用watchEffect和watch函数时应当防止内存泄漏

官方文档:https://cn.vuejs.org/api/reactivity-core.html#watcheffect 也就是说当使用他们两个时候,使用完成之后要及时停止他们,防止一直在运行,停止他们之后,也可以再次开启。 watchEffect()​ 立即运行一个函数…

Wwise SoundBanks内存优化

1.更换音频格式为Vorbis 2.停用多余的音频,如Random Container的随机脚步声数量降为2个 3.背景音乐勾选“Stream”。这样就让音频从硬盘流送到Wwise,而不是保存在内存当中,也就节省了内存 4.设置最大发声数Max Voice Instances 5.设置音频…

Windows宝塔面板下IIS环境如何部署SSL证书?

Windows宝塔面板下IIS环境如何部署SSL证书? 平时服务器linux宝塔用的较多,所以linux系统宝塔,如何部署SSL证书还是比较熟悉,今天遇到一个windows的部署SSL证书,还是头一次,所以记录一下,以防忘…

【计算机视觉】图像的几何变换

最常见的几何变换有仿射变换和单应性变换两种,最常用的仿射变换有缩放、翻转、旋转、平移。 1. 缩放 将图像放大或缩小会得到新的图像,但是多出的像素点如何实现----插值 1.1 插值方法 最近邻插值 双线性插值 cv2.resize() 是 OpenCV 中用于调整图像…

深入浅出 Go 语言:数组与切片

深入浅出 Go 语言:数组与切片 引言 在 Go 语言中,数组和切片是两种非常重要的数据结构,用于存储和操作一组相同类型的元素。虽然它们看起来相似,但在使用上有很大的区别。理解数组和切片的区别以及如何正确使用它们,…

基于超级电容和电池的新能源汽车能量管理系统simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 4.1 超级电容特性 4.2 电池特性 5.完整工程文件 1.课题概述 基于超级电容和电池的新能源汽车能量管理系统simulink建模与仿真。分析不同车速对应的电池,超级电容充放电变化情况。 2.系统仿…

y3编辑器文档3:物体编辑器

文章目录 一、物体编辑器简介1.1 界面介绍1.2 复用(导入导出)1.3 收藏夹(项目资源管理)1.4 对象池二、单位2.1 数据设置2.2 表现设置2.3 单位势力和掉率设置2.4 技能添加和技能参数修改2.5 商店2.5.1 商店属性设置2.5.2 商店物品设置三、装饰物3.1 属性编辑3.2 碰撞体积四、…

「嵌入式系统设计与实现」书评:学习一个STM32的案例

本文最早发表于电子发烧友论坛:【新提醒】【「嵌入式系统设计与实现」阅读体验】 学习一个STM32的案例 - 发烧友官方/活动 - 电子技术论坛 - 广受欢迎的专业电子论坛!https://bbs.elecfans.com/jishu_2467617_1_1.html 感谢电子发烧友论坛和电子工业出版社的赠书。 …

Qt Designer Ui设计 功能增加

效果展示 输入密码,密码错误,弹出提示 密码正确,弹出提示并且关闭原窗口 代码(只提供重要关键主代码)lxh_log.py代码: import sysfrom PySide6.QtWidgets import QApplication, QWidget, QPushButtonfrom …

RT Thread Studio新建STM32F407IG工程文件编译提示错误

编译提示错误 原因: RT 源码使用4.0.3的话,请用STM32F4支持包的0.2.2版本,就不会出错了。 如果支持包用0.2.3版本的话,需要用RT内核4.1.0版本。0.2.3 版本更新了一些针对内核4.1.0的驱动代码,这几个定义都是4.1.0里的。

智能制造标准体系建设指南

一、智能制造系统架构总览 智能制造作为当今制造业转型升级的核心,深度整合了新一代信息技术与传统制造工艺,催生出一个横跨产品全生命周期、纵贯多层级组织架构,并彰显多元智能特性的复杂系统。这一架构从生命周期、系统层级、智能特征三个…