七、延时队列

news2025/1/20 3:46:13

1、延时队列的概念

队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素希望在指定时间到了以后被取出处理

延时队列就是用来存放需要在指定时间被处理的元素的队列

2、延时队列使用的场景

订单在十分钟之内未支付则自动取消
在这里插入图片描述
在这里插入图片描述

3、RabbitMQ 中的 TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间

如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用

3.1 消息设置TTL

发送消息时指定消息的过期属性:
在这里插入图片描述

3.2 队列设置TTL属性

在创建队列的时候设置队列的“x-message-ttl”属性
在这里插入图片描述

3.3 两者之间的区别

  • 如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中)
  • 如果设置了消息的TTL属性,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;

前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息

4、整合SpringBoot

1)添加依赖

<dependencies>
        <!--RabbitMQ依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

2) 添加最基本的 RabbitMQ 的配置

spring:
  rabbitmq:
    host: 192.168.126.10
    port: 5672
    username: admin
    password: 123

5、队列TTL实现延时队列

5.1 代码架构图

在这里插入图片描述
队列QA : 队列TTL属性设置为10s
队列QB:队列TTL属性设置为40s
正常交换机 X
死信交换机 Y
死信队列 QD

5.2 配置文件

创建交换机、队列、绑定关系等

/**
 * @author houChen
 * @date 2022/11/13 22:58
 * @Description: 配置交换机
 */
@Configuration
public class TtlQueueConfig {

    private static final String X_EXCHANGE = "X";
    private static final String QA_QUEUE = "QA";
    private static final String QB_QUEUE = "QB";

    //死信交换机
    private static final String DEAD_EXCHANGE = "Y";
    private static final String DEAD_QUEUE = "QD";

    //创建普通交换机
    @Bean
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    //创建死信交换机
    @Bean
    public DirectExchange yExchange() {
        return new DirectExchange(DEAD_EXCHANGE);
    }

    //创建队列QA, ttl为10秒,绑定到对应的死信交互机
    @Bean
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>();
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //声明当前队列的死信路由键
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QA_QUEUE).withArguments(args).build();
    }

    //队列QA绑定 X交换机
    @Bean
    public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //创建队列QB, ttl为40秒,绑定到对应的死信交互机
    @Bean
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>();
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //声明当前队列的死信路由键
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的TTL
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QB_QUEUE).withArguments(args).build();
    }

    //队列QB绑定 X交换机
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    //声明死信队列 QD
    @Bean
    public Queue queueD() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    //死信队列和死信交换机进行绑定
    @Bean
    public Binding queuedBindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
 }

5.3 消息生产者

/**
 * @author houChen
 * @date 2022/11/14 22:39
 * @Description: 消息生产者代码
 */
@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条消息给两个TTL队列:{}", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列:" + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列:" + message);
    }

}

5.4 消费者代码

/**
 * @author houChen
 * @date 2022/11/14 23:34
 * @Description: 消费者代码
 */
@Slf4j
@Component
public class DelayQueueConsumer {

    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws UnsupportedEncodingException {
        String msg = new String(message.getBody(), "utf-8");
        log.info("当前时间:{},收到死信队列消息{}", new Date(), msg);
    }
}

5.5 测试

发送请求 :http://localhost:8080/ttl/sendMessage/aaaa
在这里插入图片描述
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了

缺点
使用队列的 x-message-ttl 属性的话,每增加一个新的时间需求,就需要新增一个队列,不太好扩展

6、延时队列优化

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/397887.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SAP Cloud Platform会抛弃ABAP吗

很早之前自己写的文章&#xff0c;重新发布一下。 别担心&#xff0c;该来的总会来&#xff0c;该走的也留不住&#xff01; - 剧情概要 - SAP Cloud Platform发布已经有一段时间了&#xff0c;自SAP云平台发布以来&#xff0c;很多人担心的一个问题是&#xff1a;在SAP云平…

【测试】HD-G2L-IO评估板测试结果表

1. 测试对象HD-G2L-IOT基于HD-G2L-CORE V2.0工业级核心板设计&#xff0c;双路千兆网口、双路CAN-bus、2路RS-232、2路RS-485、DSI、LCD、4G/5G、WiFi、CSI摄像头接口等&#xff0c;接口丰富&#xff0c;适用于工业现场应用需求&#xff0c;亦方便用户评估核心板及CPU的性能。H…

数据分析师常见问题(1)

1&#xff09;.sql三种排序的区别 2&#xff09;.几种连接方式 3&#xff09;.union和union all的区别 4) .drop和delete的区别 5&#xff09;.有关机器学习random forest 和xgboost的区别 6) .SVM原理 SVM是在特征空间上找到最佳的分离超平面&#xff0c;使得训练集上的正负样…

Win10使用ssh root用户登录centos7主机

1 、用SSH root用户登录Centos主机&#xff1b; 2 、检查centos是否装了epel库 执行命令 rpm -qa|grep epel 没有&#xff0c;需要安装 yum install epel-release 3 、安装xrdp yum install xrdp 4 、安装tigervnc-server yum install tigervnc-server 5 、为用户root…

如何通过SWTO分析法,加强项目风险管理?

1、什么是SWTO分析法 SWTO分析法是态势分析法&#xff0c;是根据企业自身的既定内在条件&#xff0c;对其优势、劣势、外部机会和危险进行分析&#xff0c;依照矩阵形式排列&#xff0c;将各种因素相互匹配分析的企业战略分析方法。 通过SWTO分析法 加强项目风险管理​ …

数学小课堂:数学和哲学的互动关系(自洽的哲学思想受益于数学思维)

文章目录 引言I 数学是“有底”的学问(止于公理)II 数学对哲学的影响2.1 哲学思想受益于数学思维2.2 笛卡尔的贡献2.3 莱布尼茨的哲学思想III 哲学对数学的影响引言 数学和科学各个分支之间在方法上却具有相通性和普适性,这些通用的方法常常让很多学科同时受益,依靠数学逻…

Maven 创建项目

在我们 maven 项目中的结构为 src/main/java —— 存放项目的.java 文件 src/main/resources —— 存放项目资源文件&#xff0c;如 spring, hibernate 配置文件 src/test/java —— 存放所有单元测试ava 文件&#xff0c;如 JUnit 测试类 src/test/resources —— 测试资源文件…

跑步用入耳的好还是挂耳的、最好用的运动耳机分享

健身房经常会播放一些节奏较快的歌曲&#xff0c;这样能够激发大家在运动过程中的动力&#xff0c;所以运动时聆听音乐确实比较有效果&#xff0c;居家运动、室外跑步时选择运动耳机就变成了刚需&#xff0c;不过一款适合自己的运动耳机确实是比较难找的&#xff0c;首先不能影…

算法刷题总结 (四) 动态规划

算法总结4 动态规划一、动态规划1.1、基础问题11.1.1、509. 斐波那契数列1.1.2、70. 爬楼梯1.1.3、746. 使用最小花费爬楼梯1.2、基础问题21.2.1、62. 不同路径1.2.2、63. 不同路径Ⅱ1.2.3、343. 整数拆分1.2.4、96. 不同的二叉搜索树1.3、背包问题1.3.1、01背包1.3.1.1、单次选…

现代卷积神经网络之稠密连接网络(DenseNet),并对CFIAR10训练

专栏&#xff1a;神经网络复现目录 本章介绍的是现代神经网络的结构和复现&#xff0c;包括深度卷积神经网络&#xff08;AlexNet&#xff09;&#xff0c;VGG&#xff0c;NiN&#xff0c;GoogleNet&#xff0c;残差网络&#xff08;ResNet&#xff09;&#xff0c;稠密连接网络…

pikachu靶场CSRF之TOKEN绕过

简介 Pikachu靶场中的CSRF漏洞环节里面有一关CSRF TOKEN&#xff0c;这个关卡和其余关卡稍微有点不一样&#xff0c;因为表单里面存在一个刷新就会变化的token&#xff0c;那么这个token是否能绕过呢&#xff1f;接下来我们来仔细分析分析 实战过程 简单尝试 先利用任意一个…

CNCF x Alibaba云原生技术公开课 第三章 kubernetes核心概念

1、Kubernetes概念 核心功能 服务的发现与负载的均衡容器的自动装箱&#xff0c;我们也会把它叫做 scheduling&#xff0c;就是“调度”&#xff0c;把一个容器放到一个集群的某一个机器上Kubernetes 会帮助我们去做存储的编排&#xff0c;让存储的声明周期与容器的生命周期能…

SpringCloud-高级篇(一)

目录&#xff1a; &#xff08;1&#xff09;初识Sentinel-雪崩问题的解决方案 &#xff08;2&#xff09;服务保护Sentinel和Hystrix对比 &#xff08;3&#xff09;Sentinel初始-安转控制台 &#xff08;4&#xff09;整合微服务和Sentinel 微服务高级篇 &#xff08;1&…

unity开发知识点小结04

混合动画 在动画器控制器中创建从新混合树&#xff0c;也就是创建混合动画 然后进入混合动画&#xff0c;选择混合类型为1D&#xff08;表示传递参数只有一个&#xff09;&#xff0c;并且为此混合状态添加两个动画&#xff0c;并且设定混合状态参数为何值得时候启用相应动画…

Python中函数的分类、创建和调用,你真的懂了吗

文章目录前言一、函数分类二、创建函数三、调用函数前言 在前面的博客中&#xff0c;所有编写的代码都是从上到下依次执行的&#xff0c;如果某段代码需要多次使用&#xff0c;那么需要将该段代码复制多次&#xff0c;这种做法势必会影响开发效率&#xff0c;在实际项目开发中是…

特权级那些事儿-实模式下分段机制首次出现的原因

前言&#xff1a; 操作系统的特权级模块在整个操作系统的学习中应该算的上是最难啃的了&#xff0c;提到特权级就要绕不开保护模式下的分段机制&#xff1b;如果想要彻底弄明白就要对比实模式下的分段机制有什么缺陷。这就衍生出很多问题如&#xff1a;什么是实模式&#xff1f…

Nacos 注册中心核心能力以及现实原理解析

Nacos注册中心主要分两方面解析&#xff1a;动态服务发现和Nacos实现动态服务发现的原理&#xff1b; 动态服务发现 服务发现是指使用一个注册中心来记录分布式系统中的全部服务的信息&#xff0c;以便其他服务能够快速的找到这些已注册的服务。 在单体应用中&#xff0c;DNS…

MINE: Towards Continuous Depth MPI with NeRF for Novel View Synthesis

MINE: Towards Continuous Depth MPI with NeRF for Novel View Synthesis&#xff1a;利用NeRF实现新视图合成的连续深度MPI 摘要&#xff1a;在论文中&#xff0c;提出了MINE&#xff0c;通过从单个图像进行密集3D重建来执行新的视图合成和深度估计。通过引入神经辐射场&…

05-Oracle中的对象(视图,索引,同义词,系列)

本章主要内容&#xff1a; 1.视图管理&#xff1a;视图新增&#xff0c;修改&#xff0c;删除&#xff1b; 2.索引管理&#xff1a;索引目的&#xff0c;创建&#xff0c;修改&#xff0c;删除&#xff1b; 3.同义词管理&#xff1a;同义词的作用&#xff0c;创建&#xff0…

如何通过websoket实现即时通讯+断线重连?

本篇博客只是一个demo&#xff0c;具体应用还要结合项目实际情况&#xff0c;以下是目录结构&#xff1a; 1.首先通过express搭建一个本地服务器 npm install express 2.在serve.js中自定义测试数据 const express require(express); const app express(); const http req…