RabbitMQ 之 延迟队列

news2024/11/17 16:44:51

目录

​编辑一、延迟队列概念

二、延迟队列使用场景

三、整合 SpringBoot

1、创建项目

2、添加依赖

3、修改配置文件

4、添加 Swagger 配置类

四、队列 TTL

1、代码架构图

2、配置文件代码类

3、生产者

4、消费者

5、结果展示

五、延时队列优化

1、代码架构图

2、配置文件

3、生产者​编辑

4、结果展示

六、RabbitMQ 插件实现延迟队列

1、安装延时队列插件

2、代码架构图

 3、配置文件类代码

4、生产者

5、消费者

6、结果展示


一、延迟队列概念

延时队列 , 队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望
在指定时间到了以后或之前取出和处理,简单来说, 延时队列就是用来存放需要在指定时间被处理的元素的队列。

二、延迟队列使用场景

1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:
发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?

如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。

但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。


三、整合 SpringBoot

1、创建项目

2、添加依赖

<dependencies>
 <!--RabbitMQ 依赖-->
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.47</version>
 </dependency>
 <dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 </dependency>
 <!--swagger-->
 <dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger2</artifactId>
 <version>2.9.2</version>
 </dependency>
 <dependency>
 <groupId>io.springfox</groupId>
 <artifactId>springfox-swagger-ui</artifactId>
 <version>2.9.2</version>
 </dependency>
 <!--RabbitMQ 测试依赖-->
 <dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit-test</artifactId>
 <scope>test</scope>
 </dependency>
</dependencies>

3、修改配置文件

spring.rabbitmq.host=111.229.153.16
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.mvc.pathmatch.matching-strategy=ant_path_matcher

4、添加 Swagger 配置类

@Configuration
@EnableSwagger2
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig(){
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }
    private ApiInfo webApiInfo(){
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("馒头警告", "www.baidu.com","2714858327@qq.com"))
                .build();
    }
}

四、队列 TTL

1、代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:


2、配置文件代码类

// TTL 队列,配置文件类代码
@Configuration
public class TTLQueueConfig {

    // 普通交换机的名称
    public static final String X_EXCHANGE = "X";
    // 死信交换机名称
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    // 普通队列名称
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    // 死信队列名称
    public static final String DEAD_LETTER_QUEUE_D = "QD";

    // 声明 X 交换机
    @Bean(value = "xExchange") // 相当于别名
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    // 声明 Y 交换机
    @Bean(value = "yExchange") // 相当于别名
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    // 声明普通队列 TTL 为 10s
    @Bean(value = "queueA") // 相当于别名
    public Queue queueA(){
        Map<String, Object> arguments = new HashMap<>(3);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        // 设置死信 RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
        // 设置 TTL
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }

    // 声明普通队列 TTL 为 10s
    @Bean(value = "queueB") // 相当于别名
    public Queue queueB(){
        Map<String, Object> arguments = new HashMap<>(3);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        // 设置死信 RoutingKey
        arguments.put("x-dead-letter-routing-key","YD");
        // 设置 TTL
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    // 声明死信队列
    @Bean(value = "queueD")
    public Queue queueD(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build();
    }

    // 绑定
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA ,
                                  @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    // 绑定
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB ,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }
    // 绑定
    @Bean
    public Binding queueDBindingY(@Qualifier("queueD") Queue queueD ,
                                  @Qualifier("yExchange") DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }

}

3、生产者

// 发送延迟消息
@Slf4j
@Configuration
@RequestMapping("/ttl")
public class SendMessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 开始发消息
    @GetMapping("/sendmsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date().toString(),message);
        rabbitTemplate.convertAndSend("X","XA","消息来自 TTL 为 10s 的队列" + message);
        rabbitTemplate.convertAndSend("X","XB","消息来自 TTL 为 40s 的队列" + message);
    }
}

4、消费者

// 队列 TTL
@Slf4j
@Component
public class DeadLetterQueueConsumer {

    // 接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
    }
}

5、结果展示


五、延时队列优化

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?


1、代码架构图

在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间


2、配置文件

 


3、生产者


4、结果展示

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“

因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。


六、RabbitMQ 插件实现延迟队列

1、安装延时队列插件

大家可以参考网上的教程自行去下载安装,这里就不再进行描述了

如果安装好了,就会有一个叫做  x-delayed-message 的类型

一旦装完插件之后,消息延迟的位置就换到交换机了,相当于生产者发消息,这个消息停留在交换机这个位置,再将消息发给队列


2、代码架构图

在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:

一个生产者,要走延迟的交换机,延迟的交换机根据 RoutingKey ,路由到延迟的队列,而延迟的位置是在交换机的位置,队列被消费者所消费


 3、配置文件类代码

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并
不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。

@Configuration
public class DelayedQueueConfig {

    // 交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    // 队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    // RoutingKey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    // 声明交换机 基于插件的
    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type","direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",
                true,false,arguments);
    }

    // 声明队列
    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE_NAME);
    }

    // 绑定
    @Bean
    public Binding delayedQueueBindingDelayedExchange(
            @Qualifier("delayedQueue") Queue delayedQueue,
            @Qualifier("delayedExchange")CustomExchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

4、生产者

    // 开始发消息  基于插件的  消息及延迟的时间
    @GetMapping("/senddelaymsg/{message}/{delaytime}")
    public void sendMsg(@PathVariable String message,@PathVariable Integer delaytime){
        log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",
                new Date().toString(),delaytime,message);
        rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingkey",message, msg ->{
            // 设置发送消息时候的延迟时长
            msg.getMessageProperties().setDelay(delaytime);
            return msg;
        });
    }

5、消费者

// 消费者 基于插件的延迟消息
@Slf4j
@Component
public class DelayQueueConsumer {
    // 监听消息
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiverDelayQueue(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
    }
}

6、结果展示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1884476.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

鸿蒙生态应用开发白皮书V3.0

来源&#xff1a;华为&#xff1a; 近期历史回顾&#xff1a;

yolov8对新的数据集自动标注

项目地址 https://github.com/ultralytics/ultralytics 极简运行效果 获取模型bbox的极简demo 有时候是想要获取yolo检测的bbox框。 import random import cv2 as cv from ultralytics import YOLO# model YOLO("yolov8m.yaml") # model YOLO("yolov8m.pt…

【FPGA】Verilog:全减器与半减器 | Full Subtractor | Half Subtractor

0x00 全减器(Full Subtractor) 减法器是用于减法运算的逻辑电路,与不包含借位的半减法器不同。 全减法器因为包含借位的产生与否,所以具备完整的减法功能。 输出由差 和借位 组成:

开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)

一、前言 使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。 FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,F…

物联网工业级网关解决方案 工业4G路由器助力智慧生活

随着科技的飞速发展&#xff0c;无线通信技术正逐步改变我们的工作与生活。在这个智能互联的时代&#xff0c;一款高性能、稳定可靠的工业4G路由器成为了众多行业不可或缺的装备。工业4G路由器以其卓越的性能和多样化的功能&#xff0c;助力我们步入智慧新纪元。 一、快速转化&…

SpringBoot+ELK 收集日志的两种方式

方式一、FileBeatlogstash 7.5.1(docker)ES(docker)springboot 日志文件 应用方式 我们采用ELFK 架构采集日志&#xff0c;直接读取日志生成的文件&#xff0c;不对Springboot的日志任何的修改。也就是FileBeat 通过读取日志文件位置获取日志内容&#xff0c;然后发送至logsta…

综合项目实战--jenkins流水线

一、流水线定义 软件生产环节,如:需求调研、需求设计、概要设计、详细设计、编码、单元测试、集成测试、系统测试、用户验收测试、交付等,这些流程就组成一条完整的流水线。脚本式流水线(pipeline)的出现代表企业人员可以更自由的通过代码来实现不同的工作流程。 二、pi…

Flink 运行时架构

Flink 运行时的组件 作业管理器&#xff08;JobManager&#xff09;资源管理器&#xff08;ResourceManager&#xff09;任务管理器&#xff08;TaskManager&#xff09;分发器&#xff08;Dispatch&#xff09; JobManager 控制一个应用程序执行的主进程&#xff0c;也就是说…

IDEA 编译单个Java文件

文章目录 一、class文件的生成位置二、编译单个文件编译项目报错Error:java: 无效的源发行版: 8 一、class文件的生成位置 file->project structure->Modules 二、编译单个文件 选中文件&#xff0c;点击recompile 编译项目报错 Error:java: 无效的源发行版: 8 Fi…

从GPT到AGI:ChatGPT如何改变人机交互

在人工智能&#xff08;AI&#xff09;领域&#xff0c;ChatGPT等大语言模型&#xff08;LLM&#xff09;的出现&#xff0c;标志着一个新的时代。本文将深入探讨ChatGPT的技术原理、误解、潜在问题以及未来的发展方向和应用场景&#xff0c;并分析其对社会和商业领域的影响。 …

【Python数据分析及环境搭建】:教程详解1(第23天)

系列文章目录 Python进行数据分析的优势常用Python数据分析开源库介绍启动Jupyter服务Jupyter Notebook的使用 文章目录 系列文章目录前言学习目标1. Python进行数据分析的优势2. 常用Python数据分析开源库介绍2.1 NumPy2.2 Pandas2.3 Matplotlib2.4 Seaborn2.5 Sklearn2.6 Ju…

python 分析nginx的error.log日志 然后写入到 mongodb当中 并且解决mongodb无法根据id删除数据的问题

废话不多说 直接上代码 import re import os import pymongo import uuid import bson def extract_unresolved_info(log_path):unresolved_info []with open(log_path, r) as file:log_text file.read()lines log_text.split("\n")for line in lines:# 这种属于主…

汽车内饰塑料件光照老化实验箱

塑料件光照老化实验箱概述 塑料件光照老化实验箱&#xff0c;又称为氙灯老化试验箱&#xff0c;是一种模拟自然光照条件下塑料材料老化情况的实验设备。它通过内置的氙灯或其他光源&#xff0c;产生接近自然光的紫外线辐射&#xff0c;以此来加速塑料及其他材料的光老化过程。…

Open3D 点云CPD算法配准(粗配准)

目录 一、概述 二、代码实现 2.1关键函数 2.2完整代码 三、实现效果 3.1原始点云 3.2配准后点云 一、概述 在Open3D中&#xff0c;CPD&#xff08;Coherent Point Drift&#xff0c;一致性点漂移&#xff09;算法是一种经典的点云配准方法&#xff0c;适用于无序点云的非…

Python番外篇之责任转移:有关于虚拟机编程语言的往事

编程之痛 如果&#xff0c;你像笔者一样&#xff0c;有过学习或者使用汇编语言与C、C等语言的经历&#xff0c;一定对下面所说的痛苦感同身受。 汇编语言 将以二进制表示的一条条CPU的机器指令&#xff0c;以人类可读的方式进行表示。虽然&#xff0c;人类可读了&#xff0c…

Android Studio 2023版本切换DNK版本

选择自己需要的版本下载 根目录下的配置路劲注意切换 build.gradle文件下的ndkVersion也要配好对应版本

【web APIs】快速上手Day03

目录 Web APIs - 第3天全选文本框案例事件流事件捕获事件冒泡阻止冒泡解绑事件on事件方式解绑addEventListener方式解绑 注意事项-鼠标经过事件的区别两种注册事件的区别 事件委托综合案例-tab栏切换改造 其他事件页面加载事件元素滚动事件页面滚动事件-获取位置页面滚动事件-滚…

【java高级】【算法】通过子节点 反向获取 树路径父节点 且不获取无关节点

有一个奇葩需求 要求 用户配置在某选择框的选项 例如 然后在选择时显示 用户配置的选项 依旧是返回树,但是只包含 选择的子节点。 以及涉及的父节点,树路径 不返回无关节点 【一般】我们开发中都是直接通过 树节点 返回 其下子节点 这个需求的确很奇葩。 而且还要考…

生命在于学习——Python人工智能原理(3.1.1)

Python部分结束了&#xff0c;开始概率论部分 一、概率基本知识 1.1 事件与概率 1.1.1 事件的运算与关系 &#xff08;一&#xff09;基本概念 定义1 随机试验 如果一个试验满足如下条件&#xff1a; 在试验前不能断定其将发生什么结果&#xff0c;但可明确指出或说明试验…

Python系统教程01

Python 是一门解释性语言&#xff0c;相对更简单、易学&#xff0c;它可以用于解决数学问题、获取与分 析数据、爬虫爬取网络数据、实现复制数学算法等等。 1、print()函数&#xff1a; print()书写时注意所有的符号都是英文符号。print()输出内容时&#xff0c;若要输出字符…