7. TTL 延迟队列

news2025/1/15 17:43:14

二八佳人体似酥,腰间仗剑斩愚夫。虽然不见人头落,暗里教君骨髓枯。

创建两个队列 QA和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X和死信交
换机 Y,它们的类型都是 direct,创建一个死信队列QD,它们的绑定关系如下:

image.png

死信队列

ttl 配置 TtlConfig

package com.yjl.amqp.config.ttl;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * 列信队列配置
 *
 * @author yuejianli
 * @date 2022-11-23
 */
@Component
public class TtlConfig {
    @Value("${rabbit.ttl.queue_a}")
    private String queuea;
    @Value("${rabbit.ttl.queue_b}")
    private String queueb;

    @Value("${rabbit.ttl.x_exchange}")
    private String xexchange;


    @Value("${rabbit.ttl.y_dead_queue_d}")
    private String deadQueued;

    @Value("${rabbit.ttl.y_dead_exchange}")
    private String ydeadExchange;


    @Value("${rabbit.ttl.queue_c}")
    private String queuec;


    // 定义队列和 交换机,并进行绑定


    @Bean(value = "direct_exchange")
    DirectExchange directExchange() {
        return new DirectExchange(xexchange);
    }


    @Bean(value = "direct_queuea")
    public Queue queuea() {
        //return new Queue(queuea);

        Map<String, Object> args = new HashMap<>();

        args.put("x-dead-letter-exchange", ydeadExchange);

        args.put("x-dead-letter-routing-key", "YD");

        // 声明 ttl  10s
        args.put("x-message-ttl", 10000);

        return QueueBuilder.durable(queuea).withArguments(args).build();
    }


    //进行绑定
    @Bean
    Binding bindingDirectExchangea(@Qualifier("direct_queuea") Queue queue,
                                   @Qualifier("direct_exchange") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with("XA");
    }


    @Bean(value = "direct_queueb")
    public Queue queueb() {
        // return new Queue(queueb);

        Map<String, Object> args = new HashMap<>();

        args.put("x-dead-letter-exchange", ydeadExchange);

        args.put("x-dead-letter-routing-key", "YD");

        // 声明 ttl  40s
        args.put("x-message-ttl", 40000);

        return QueueBuilder.durable(queueb).withArguments(args).build();
    }


    @Bean
    Binding bindingDirectExchangeb(@Qualifier("direct_queueb") Queue queue,
                                   @Qualifier("direct_exchange") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with("XB");
    }


    @Bean(value = "direct_dead_exchange")
    DirectExchange directDeadExchange() {
        return new DirectExchange(ydeadExchange);
    }


    @Bean(value = "direct_dead_queued")
    public Queue deadQueued() {
        return new Queue(deadQueued);
    }

    // 进行绑定
    @Bean
    Binding bindingDeadDirectExchange(@Qualifier("direct_dead_queued") Queue queue,
                                      @Qualifier("direct_dead_exchange") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with("YD");
    }
}

死信消费者 TtlConsumer

@Component
@Slf4j
public class TtlConsumer {

    /**
     * 死信队列
     */
    @RabbitListener(queues = {"${rabbit.ttl.y_dead_queue_d}"})
    public void deadQueueListener(String message) {
        log.info("死信队列获取消息:" + message);
        // 执行具体的业务操作
    }
}

生产者 TtlProduer

@RestController
@Slf4j
public class TtlProduer {

    @Value("${rabbit.ttl.x_exchange}")
    private String xexchange;


    @Value("${rabbit.ttl.delayed_exchange}")
    private String deadExchange;

    @Value("${rabbit.ttl.delayed_routing_key}")
    private String deadRoutingKey;


    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public String sendMessage(@PathVariable("message") String message) {
        log.info("接收到消息:{}", message);
        // 发送到 A 队列
        rabbitTemplate.convertAndSend(xexchange, "XA", "消息来自ttl 10s 的队列消息:" + message);
        //发送到B队列
        rabbitTemplate.convertAndSend(xexchange, "XB", "消息来自ttl 40s 的队列消息:" + message);
        return message;
    }
}

验证

输入网址, 发送消息

http://localhost:8088/Server/sendMessage/hello,RabbitMQ

image.png

死信队列 A 10s 接到消息, 队列B 40s 后接收到消息。

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,
然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S
两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

延时队列优化

在这里新增了一个队列QC,绑定关系如下,该队列不设置 TTL 时间

image.png

ttl 配置QC 队列 TtlConfig

    // 声明一个队列 C, 没有过期时间的。


    @Bean(value = "direct_queuec")
    public Queue queuec() {
        // return new Queue(queueb);

        Map<String, Object> args = new HashMap<>();

        args.put("x-dead-letter-exchange", ydeadExchange);

        args.put("x-dead-letter-routing-key", "YD");


        return QueueBuilder.durable(queuec).withArguments(args).build();
    }


    @Bean
    Binding bindingDirectExchangec(@Qualifier("direct_queuec") Queue queue,
                                   @Qualifier("direct_exchange") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with("XC");
    }

死信消费者 TtlConsumer 保持不变

生产者 TtlProducer

时间由前端传入, 设置时间往 C 队列里面放置。

    @GetMapping("/sendMessage/{message}/{ttl}")
    public String sendMessageWithTtl(@PathVariable("message") String message, @PathVariable("ttl") String ttl) {
        log.info("接收到消息:{}, 发送的时长 是{} ms", message, ttl);
        //
//        // 发送到 A 队列
//        rabbitTemplate.convertAndSend(xexchange,"XA","消息来自ttl 10s 的队列消息:"+message);
//        //发送到B队列
//        rabbitTemplate.convertAndSend(xexchange,"XB","消息来自ttl 40s 的队列消息:"+message);


        // 发送给 C 队列
        rabbitTemplate.convertAndSend(xexchange, "XC", message, correlationData -> {
            correlationData.getMessageProperties().setExpiration(ttl);
            return correlationData;
        });
        return message;
    }

验证

发送消息,携带着时间:

http://localhost:8088/Server/sendMessage/sendA5s/5000

http://localhost:8088/Server/sendMessage/sendB20s/20000

image.png

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消
息可能并不会按时“死亡“,因为 RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

如,先执行发送 20s 的,再执行发送 5s的

http://localhost:8088/Server/sendMessage/sendB20s/20000
http://localhost:8088/Server/sendMessage/sendA5s/5000

image.png

消息消费时有顺序了。

Rabbitmq 插件实现延迟队列

上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间
及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题

在官网上下载 https://www.rabbitmq.com/community-plugins.html,
下载 rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ的插件目录。
进入 RabbitMQ的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

image.png

我本地可能是由于版本的原因,一直不成功

D:\job\RabbitMQ Server\rabbitmq_server-3.10.11\sbin>rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange

image.png

image.png

虽然有这个 x-delayed-message

架构优化

image.png

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并
不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才 投递到目标队列中

配置 DelayedQueueConfig

package com.yjl.amqp.config.ttl;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * 延迟队列处理
 *
 * @author yuejianli
 * @date 2022-11-23
 */
@Component
public class DelayedQueueConfig {
    @Value("${rabbit.ttl.delayed_queue}")
    private String deadQueued;

    @Value("${rabbit.ttl.delayed_exchange}")
    private String deadExchange;

    @Value("${rabbit.ttl.delayed_routing_key}")
    private String deadRoutingKey;


    // 自定义交换机, 定义延迟交换机
    @Bean("delayedExchange")
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        // 自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(deadExchange, "x-delayed-message", true, false, args);
    }

    @Bean("delayedQueue")
    public Queue delayedQueue() {
        return new Queue(deadQueued);
    }


    //进行绑定
    @Bean
    Binding bindingDelayedExchange(@Qualifier("delayedQueue") Queue queue,
                                   @Qualifier("delayedExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(deadRoutingKey).noargs();
    }
}

延迟消费者扩展 TtlConsumer

    /**
     * 延迟队列
     */
    @RabbitListener(queues = {"${rabbit.ttl.delayed_queue}"})
    public void delayedQueueListener(String message) {
        log.info("延迟队列获取消息:" + message);
        // 执行具体的业务操作
    }

生产者 TtlProducer

 @GetMapping("/sendDelayMessage/{message}/{ttl}")
    public String sendDelayMessageWithTtl(@PathVariable("message") String message, @PathVariable("ttl") String ttl) {
        log.info("接收到消息:{}, 发送的时长 是{} ms", message, ttl);

        // 发送给 C 队列
        rabbitTemplate.convertAndSend(deadExchange, deadRoutingKey, message, correlationData -> {
            correlationData.getMessageProperties().setExpiration(ttl);
            return correlationData;
        });
        return message;
    }

验证

http://localhost:8088/Server/sendDelayMessage/sendB20s/20000
http://localhost:8088/Server/sendDelayMessage/sendA5s/5000

我本地是不行, 有发现问题的,可以告诉我。
image.png

想要的效果是这样:

image.png

第二个消息被先消费掉了,符合预期

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ来实现延时队列可以很好的利用
RabbitMQ的特性,如:
消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。
另外,通过 RabbitMQ集群的特性,可以很好的解决单点故障问题,
不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,
比如利用 Java 的DelayQueue,利用 Redis 的 zset,
利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/59812.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot_项目打包部署

SpringBoot项目可以是jar类型的maven项目&#xff0c;也可以是一个war类型的maven项目&#xff0c;取决于我们要不要整合jsp使用。但是不管是哪种项目类型&#xff0c;已经不是我们传统意义上的项目结构了 在本地使用SpringBoot的启动器即可访问我们开发的项目。如果我们将项目…

尝试 vue 实现 SEO

背景: 官网使用 VUE 写的, 且 使用 <component /> 动态创建组件, 通过 手动配置的组件, 动态生成页面内容 然后收到通知, 需要实现 SEO , 于是就开始了 VUE SEO 的拉锯战..... 第一种尝试 VUEphantomjs 首先说下原理 phantomjs 是可以部署在服务端的 无头浏览器,…

最强大脑记忆曲线(12)-- 录入数据修改

录入数据修改一、设计思路二、解决过程2.1 设计修改窗口2.2 转成py文件2.3 写业务逻辑1、先显示一下基础页面2、配合适配器&#xff0c;自动调整窗口大小3、在数据录入窗口或背记窗口双击某条记录3.1 增加信号3.2 在槽函数中打开修改页面**3.3 两个页面之间传递信号**3.4 在子窗…

24点问题(带输出构造方式)

问题描述&#xff1a; 在屏幕上输入1〜10范围内的4个整数&#xff08;可以有重复&#xff09;&#xff0c;对它们进行加、减、乘、除四则运算后&#xff08;可以任意的加括号限定计算的优先级&#xff09;&#xff0c;寻找计算结果等于24的表达式。 例如输入4个整数4、5、6、7…

1. RabbitMq 的基本概念

参考使用: 尚硅谷 消息中间件 RabbitMQ 课件 MQ 的概念 什么是 MQ MQ(message queue)&#xff0c;从字面意思上看&#xff0c;本质是个队列&#xff0c;FIFO 先入先出&#xff0c;只不过队列中存放的内容是 message 而已&#xff0c;还是一种跨进程的通信机制&#xff0c;用…

[附源码]计算机毕业设计毕业生就业管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Minecraft 1.19.2 Forge模组开发 05.矿石生成

我们本次尝试在主世界生成模组中自定义的矿石 1.由于1.19的版本出现了深板岩层的矿石&#xff0c;我们要在BlockInit类中声明一个矿石的两种岩层形态&#xff1a; BlockInit.java package com.joy187.re8joymod.init;import com.joy187.re8joymod.Main; import net.minecraf…

微服务框架 SpringCloud微服务架构 10 使用Docker 10.8 数据卷挂载案例1

微服务框架 【SpringCloudRabbitMQDockerRedis搜索分布式&#xff0c;系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】 SpringCloud微服务架构 文章目录微服务框架SpringCloud微服务架构10 使用Docker10.8 数据卷挂载案例110.8.1 挂载数据卷10.8.2 案例10.8.3 总…

Pr:导出设置之高级设置及 VR 视频

视频 VIDEO设置因所选导出格式而异。每种格式都有独特的要求&#xff0c;这些要求决定了哪些设置可用。以导出文件格式为 H.264 为例&#xff0c;下面给出有关高级设置 Advanced Settings以及 VR 视频 VR Video的选项及说明。高级设置 Advanced Settings关键帧距离Key Frame Di…

期末复习-软件体系结构

软件体系结构一、软件重用与构件技术软件重用的定义重用驱动的软件的开发过程构件的三种描述模型三种构件分类方法的组织方式&#xff0c;检索方式&#xff0c;刻面分类法二、软件体系结构概述软件体系结构 构件 连接件 约束软件体系结构的四个发展阶段三、软件体系结构风格…

塔望3W消费战略全案丨牛小范低脂即食肉蛋白 行走的米其林牛排

牛小范 客户&#xff1a;山东如康集团 品牌&#xff1a;牛小范 服务&#xff1a;3W消费战略 品牌全案 项目背景 山东如康集团是一家集牛羊肉生产、加工与销售等为一体的大型综合性集团企业&#xff0c;是山东省级“专精特新”企业、农业产业化市级龙头企业和山东省"十三…

Linux命令:scp

目录 简介 一、语法 二、示例 2.1 将本地文件复制到远程主机目录 2.2 将本地目录复制到远程主机目录 2.3 将远程主机的文件复制到本机 2.4 复制远程主机目录到本机 简介 今天我们来介绍一个Linux命令&#xff1a;scp scp — secure copy (remote file copy program)&am…

Vue实现手机端界面的购物车案例

目录 前期准备 Header Goods Footer Counter 今天用Vue实现的一个手机端的购物车案例&#xff0c;着重阐述组件化思想的优势&#xff0c;将页面的内容分成各个模块进行书写&#xff0c;然后数据传输&#xff0c;父传子、子传父、兄弟数据共享等&#xff0c;这方面知识不牢…

[附源码]计算机毕业设计springboot游戏商城平台论文

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

PHP 经纬度坐标相关计算方法

1. 前言 2. 计算经纬度坐标间的距离 3. 根据经纬度坐标距离排序 4. 经纬度范围查询 1. 前言 想要测试本文提供的几个功能函数&#xff0c;可以使用下面这个数据表结构及其数据 CREATE TABLE user ( id int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT 用户id, name v…

html静态网站基于游戏网站设计与实现共计10个页面 (仿地下城与勇士游戏网页)

&#x1f389;精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

UDS(83服务-AccessTimingParameter)

诊断协议那些事儿 诊断协议那些事儿专栏系列文章,本文介绍诊断和通讯管理功能单元下的83服务AccessTimingParameter,该服务的目的是读取/修改有效通信的计时参数。 文章目录 诊断协议那些事儿一、83服务-AccessTimingParameter二、请求格式子功能参数定义-timingParameterA…

Java并发编程—死锁

文章目录死锁什么叫做加锁&#xff1f;死锁代码理解&#xff1a;如何避免死锁&#xff1f;资源限制的挑战什么是资源限制&#xff1f;资源限制引发的问题&#xff1f;如何解决资源限制的问题&#xff1f;在资源限制情况下进行并发编程————————————————————…

物联网 (IoT) 为何如此重要?哪些技术让物联网成为了可能?

随着社会的进步和科技的发展&#xff0c;定位技术在技术手段、定位精度、可用性等方面均取得质的飞越&#xff0c;并且逐步从航海、航天、航空、测绘、军事、自然灾害预防等“高大上”的领域逐步渗透社会生活的方方面面&#xff0c;成为人们日常中不可或缺的重要应用——比如人…

[附源码]计算机毕业设计基于SpringBoot的剧本杀管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…