RabbitMQ--延迟队列

news2025/1/22 16:13:05

(一)延迟队列

1.概念

 延迟队列是一种特殊的队列,消息被发送后,消费者并不会立刻拿到消息,而是等待一段时间后,消费者才可以从这个队列中拿到消息进行消费

2.应用场景

 延迟队列的应用场景很多,就比如大部分定时的场景,我们都可以利用延迟队列例如:闹钟定时,预约会议,空调定时开关等

 但是RabbitMQ是没有直接给我们提供延迟队列的,但是我们可以通过上一篇博客说的ttl和死信来达到延迟队列的效果,具体操作如下

 首先我们有一个交换机和一个队列,然后此队列又指定一个死信交换机,死信交换机绑定一个死信队列,然后我们消费者并不是从正常队列中获取消息,而是从死信队列中获取消息,通过给消息/队列设置过期时间来影响消息到达死信队列的时间,消费者拿到消息就会延迟,这样就可以模拟出延迟的效果。

那接下来就是我们的代码实现

首先我们通过设置队列ttl来实现

 @Bean("ttlExchange")
    public Exchange ttlExchange(){
        return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();
    }
    @Bean("ttlQueue")
    public Queue ttlQueue(){
        return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(5000)
                .deadLetterExchange(Constants.DEAD_EXCHANGE)
                .deadLetterRoutingKey("dead")
                .build();
    }
    @Bean("ttlBind")
    public Binding ttlBind(@Qualifier("ttlExchange") Exchange ackExchange,@Qualifier("ttlQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("ttl").noargs();
    }
    @Bean("deadExchange")
    public Exchange deadExchange(){
        return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();
    }
    @Bean("deadQueue")
    public Queue deadQueue(){
        return QueueBuilder.durable(Constants.DEAD_QUEUE).build();
    }
    @Bean("deadBind")
    public Binding deadBind(@Qualifier("deadExchange") Exchange ackExchange,@Qualifier("deadQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("dead").noargs();
    }

然后生产者代码没什么变化

 @RequestMapping("ttl")
    public String TTLPro(){
        String s1="ttl test";
        Message message=new Message(s1.getBytes(StandardCharsets.UTF_8));
//        message.getMessageProperties().setExpiration("10000");
        RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);
        return "发送成功";
    }

只不过消费者订阅的是死信队列

@RabbitListener(queues = Constants.DEAD_QUEUE)
    public void ListenerQueue2(Message message,Channel channel) throws IOException {
        long Tag=message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("接收到消息: "+ new String(message.getBody())+" TagID: "
                    +Tag);
            int num=3/0;     //模拟失败
            channel.basicAck(Tag,false);
            System.out.println("处理完成");
        }catch (Exception e){
            channel.basicReject(Tag,false);
        }
    }

这样过了5s后我们就可以从死信队列中获取到延迟消息了

那我们再来通过设置消息的ttl来看一下

首先我们要把队列的ttl给取消掉,记得要删队列

 @Bean("ttlExchange")
    public Exchange ttlExchange(){
        return ExchangeBuilder.directExchange(Constants.TTL_EXCHANGE).durable(true).build();
    }
    @Bean("ttlQueue")
    public Queue ttlQueue(){
        return QueueBuilder.durable(Constants.TTL_QUEUE)
                .deadLetterExchange(Constants.DEAD_EXCHANGE)
                .deadLetterRoutingKey("dead")
                .build();
    }
    @Bean("ttlBind")
    public Binding ttlBind(@Qualifier("ttlExchange") Exchange ackExchange,@Qualifier("ttlQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("ttl").noargs();
    }
    @Bean("deadExchange")
    public Exchange deadExchange(){
        return ExchangeBuilder.directExchange(Constants.DEAD_EXCHANGE).durable(true).build();
    }
    @Bean("deadQueue")
    public Queue deadQueue(){
        return QueueBuilder.durable(Constants.DEAD_QUEUE).build();
    }
    @Bean("deadBind")
    public Binding deadBind(@Qualifier("deadExchange") Exchange ackExchange,@Qualifier("deadQueue") Queue queue){
        return BindingBuilder.bind(queue).to(ackExchange).with("dead").noargs();
    }

  然后我们发送一个ttl时间为10s的,再发送一个5s的,我们知道这样两条数据是会发生错误的,因为我们设置消息过期时间,我们RabbitMQ(性能问题)并不会遍历整个消息队列看看谁过没过期,如果过期的消息不在队头,那么只有当使用的时候,才会真正的进行一些过期处理,比如传给死信交换机 

@RequestMapping("ttl")
    public String TTLPro(){
        String s1="ttl test";
        Message message=new Message(s1.getBytes(StandardCharsets.UTF_8));
        message.getMessageProperties().setExpiration("10000");
        RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);
        message.getMessageProperties().setExpiration("5000");
        RabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"ttl",message);
        return "发送成功";
    }

如果正常,会在5s后接收到第一个消息,在10s后接收到第二个消息,但是此时我们会同一时间(10s)接收到两条消息

  那这个问题在上一篇ttl的时候就说过了,这里依然是个问题,虽然设置队列的ttl不会有这个问题,但是设置队列ttl我们针对不同延迟时间就需要创建多个队列,这是不太合理的,所以针对这个问题,我们有一个延迟队列的插件可以使用

 3.延迟队列插件

延迟队列插件,会给我们提供一个特殊的交换机,来完成我们的延迟功能

这是我们插件的下载地址

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

  我们需要找到ez文件并下载,但是注意这里的版本要与你的RabbitMQ版本可以匹配,否则之后会出现问题

那插件下完后,我们要找到对应目录,下载插件

上面两个目录,我们可以任选一个下载即可,没有这个目录,我们手动创建

然后把下载的ez文件,copy到这个目录中即可,然后我们可以使用命令 rabbitmq-plugins list 来查看插件列表,看看我们有没有成功放进去,但是注意,即使我们成功放进去并成功显示了,也可能会出错,这就可能是你们下载的RabbitMQ版本与整个延迟插件的版本不匹配,重新下载其他版本即可

然后我们启动插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange

之后重启服务service rabbitmq-server restart

在没有发生错误的情况下,我们就发现我们会多了一个默认的交换机

此时我们代码中就不需要声明普通交换机了而是直接使用默认交换机即可

我们生产者代码是需要改一下的,我们需要调用一个方法来设置延迟时间

@RequestMapping("/delay2")
public String delay2() {
 //发送带ttl的消息 
 rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
"delayed test 20s..."+new Date(), messagePostProcessor -> {
 messagePostProcessor.getMessageProperties().setDelayLong(20000L); 
 return messagePostProcessor;
 });
 rabbitTemplate.convertAndSend(Constant.DELAYED_EXCHANGE_NAME, "delayed", 
"delayed test 10s..."+new Date(), messagePostProcessor -> {
 messagePostProcessor.getMessageProperties().setDelayLong(10000L); //设置延迟时间 
 return messagePostProcessor;
 });
 return "发送成功!";
}

此时我们就可以在10s正确接收一个消息,在20s正确接收另一个消息 

  注意我们使用TTL+死信时消息传递给交换机后映射之后一直在正常队列中的,等待TTL时间到了把消息给死信交换机再映射到死信队列再拿到消息,我们使用插件的时候,消息是在RabbitMQ给我们提供的那个特殊的交换机中的,等待时间到了,再映射给队列,然后从队列中拿消息

4.常见面试题

介绍下延迟队列

我们可以这样回答:

 延迟队列是一个特殊的队列,消息发送后,消费者并不会立刻拿到,而是等待一定延迟时间后才发送给消费者进行消费

 并且延迟队列的应用场景很多,比如订单支付,智能家电,以及定时邮箱

 但是延迟队列在RabbitMQ中并没有直接给我们提供,我们可以通过TTL+死信的方式或者使用延迟插件的方式来实现延迟功能

 两者的区别:

1.通过TTL+死信

 优点:比较灵活,不需要我们额外引入插件

 缺点:我们设置消息TTL的时候可能会出现顺序的问题,而且我们需要多创建死信队列和死信交换机,完成一些绑定,增加了系统的复杂性

2.基于插件实现的延迟队列

 优点:通过插件能够简化延迟消息的实现,并且避免了时序问题

 缺点:需要依赖插件,不同版本RabbitMQ需要不同版本插件,有运维工作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2280450.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

后端面试题分享第一弹(状态码、进程线程、TCPUDP)

后端面试题分享第一弹 1. 如何查看状态码,状态码含义 在Web开发和调试过程中,HTTP状态码是了解请求处理情况的重要工具。 查看状态码的步骤 打开开发者工具: 在大多数浏览器中,您可以通过按下 F12 键或右键单击页面并选择“检查…

网络通信---MCU移植LWIP

使用的MCU型号为STM32F429IGT6,PHY为LAN7820A 目标是通过MCU的ETH给LWIP提供输入输出从而实现基本的Ping应答 OK废话不多说我们直接开始 下载源码 LWIP包源码:lwip源码 -在这里下载 ST官方支持的ETH包:ST-ETH支持包 这里下载 创建工程 …

CSS笔记基础篇02——浮动、标准流、定位、CSS精灵、字体图标

黑马程序员视频地址: 前端Web开发HTML5CSS3移动web视频教程https://www.bilibili.com/video/BV1kM4y127Li?vd_source0a2d366696f87e241adc64419bf12cab&spm_id_from333.788.videopod.episodes&p70https://www.bilibili.com/video/BV1kM4y127Li?vd_source…

Linux:进程(三)

1. 进程创建补充 fork之后父子两个执行流分别执行,fork之后谁谁先执行由调度器来决定。 一般,父子代码共享。当父子不再写入时,数据也是共享的,但是当有一方要写入,就触发写时拷贝。 fork调用失败的原因 1. 系统中有…

2025年1月21日刷题记录

1.leetcode1768题目 链接:1768. 交替合并字符串 - 力扣(LeetCode) 代码: class Solution { public:string mergeAlternately(string word1, string word2) {string word3;int a word1.size(), b word2.size();int i 0, j 0…

Mysql触发器(学习自用)

一、介绍 二、触发器语法 注意:拿取新的数据时用new,旧数据用old。

wireshark工具简介

目录 1 wireshark介绍 2 wireshark抓包流程 2.1 选择网卡 2.2 停止抓包 2.3 保存数据 3 wireshark过滤器设置 3.1 显示过滤器的设置 3.2 抓包过滤器 4 wireshark的封包列表与封包详情 4.1 封包列表 4.2 封包详情 参考文献 1 wireshark介绍 wireshark是非常流行的网络…

「2024·我的成长之路」:年终反思与展望

文章目录 1. 前言2.创作历程2.1 摆烂期2.2 转变期3. 上升期 2. 个人收获3.经验分享4. 展望未来 1. 前言 2025年1月16日,2024年博客之星入围公布,很荣幸获得了这次入围的机会。2024年对我个人是里程碑的一年,是意义非凡的一年,是充…

【RAG落地利器】向量数据库Chroma入门教程

安装部署 官方有pip安装的方式,为了落地使用,我们还是采用Docker部署的方式,参考链接来自官方部署: https://cookbook.chromadb.dev/running/running-chroma/#docker-compose-cloned-repo 我们在命令终端运行: docker run -d --…

电阻电位器可调电阻信号隔离变送器典型应用

电阻电位器可调电阻信号隔离变送器典型应用 产品描述: 深圳鑫永硕科技的XYS-5587系列是一进一出线性电子尺(电阻/电位计信号及位移)信号隔离变送器,是将输入电阻,线性电子尺,角度位移传感器信号进行采集,隔离,放大并转换成模拟量信号的小型仪表设备,并以…

[创业之路-259]:《向流程设计要效率》-1-让成功成熟业务交给流程进行复制, 把创新产品新业务新客户交给精英和牛人进行探索与创造

标题:成功与创新的双轨并行:以流程复制成熟,以精英驱动新知 在当今这个日新月异的商业环境中,企业要想持续繁荣发展,就必须在稳定与创新之间找到完美的平衡点。一方面,成熟业务的稳定运营是企业生存和发展的…

模拟飞行入坑(五) P3D 多通道视角配置 viewgroup

背景: P3D进行多个屏幕显示的时候,如果使用英伟达自带的屏幕融合成一个屏,或者使用P3D单独拉伸窗口,会使得P3D的画面被整体拉伸,又或者,当使用Multichannel进行多个设备联动时,视角同步组合需要配置&#…

Java中的错误与异常详解

Java中的错误与异常详解 Java提供了一种机制来捕获和处理程序中的异常和错误。异常和错误都继承自 Throwable 类,但它们有着不同的用途和处理方式。 1. Error(错误) Error 是程序无法处理的严重问题,通常由 JVM(Java…

免费开源的三维建模软件Blender

软件介绍 Blender是一款功能强大且免费开源的三维建模、动画制作和渲染软件,广泛应用于影视制作、游戏开发、建筑可视化、教育及艺术创作等多个领域。 核心功能 Blender是一款全能型3D软件,涵盖了从建模、动画到渲染、后期合成的完整工作流程。 1、建…

ElasticSearch DSL查询之排序和分页

一、排序功能 1. 默认排序 在 Elasticsearch 中,默认情况下,查询结果是根据 相关度 评分(score)进行排序的。我们之前已经了解过,相关度评分是通过 Elasticsearch 根据查询条件与文档内容的匹配程度自动计算得出的。…

iOS 网络请求: Alamofire 结合 ObjectMapper 实现自动解析

引言 在 iOS 开发中,网络请求是常见且致其重要的功能之一。从获取资料到上传数据,出色的网络请求框架能夠大大提升开发效率。 Alamofire 是一个极具人气的 Swift 网络请求框架,提供了便据的 API 以完成网络请求和响应处理。它支持多种请求类…

面向对象编程——对象实例化

在python中,对象实例化是根据类的定义创建具体对象的过程。也就是将类当成模板,从而定义了对象的结构和行为,而实例化则是根据这个模板创建具体的对象实例。每个实例都有自己独立的状态,但是却共享类的结构和方法。 代码&#xff…

阿里云-银行核心系统转型之业务建模与技术建模

业务领域建模包括业务建模和技术建模,整体建模流程图如下: 业务建模包括业务流程建模和业务对象建模 业务流程建模:通过对业务流程现状分析,结合目标核心系统建设能力要求,参考行业建 模成果,形成结构化的…

Unreal Engine 5 C++ Advanced Action RPG 九章笔记

第九章 Hero Special Abilities 2-Challenges Ahead(前方的挑战) 本次章节主要解决三件问题 怒气能力特殊武器能力治疗石怒气能力 对于这个能力我们需要处理它的激活和持械状态,当没有怒气时应该取消该能力当这个能力激活时,我希望角色是进入无敌状态的,不会受到伤害怒气状…

cursor重构谷粒商城05——docker容器化技术快速入门【番外篇】

前言:这个系列将使用最前沿的cursor作为辅助编程工具,来快速开发一些基础的编程项目。目的是为了在真实项目中,帮助初级程序员快速进阶,以最快的速度,效率,快速进阶到中高阶程序员。 本项目将基于谷粒商城…