RabbitMQ之延迟队列解读

news2025/1/15 7:27:14

目录

基本介绍

概述

 为什么需要引进RabbitMQ延迟队列

应用场景

 springboot代码实战

实战架构

工程概述

RabbitConfigDeal 配置类:创建队列及交换机并进行绑定 

MessageService业务类:发送消息及接收消息

主启动类RabbitMq01Application:实现ApplicationRunner接口


基本介绍

概述

延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。 其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。

RabbitMQ 中并没有延时队列的概念,是通过 延时交换机与 死信队列实现。

需要注意:如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡”,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先被执行。 

 为什么需要引进RabbitMQ延迟队列

场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了,这类实现延迟任务的场景就可以采用延迟队列来实现,当然除了延迟队列来实现,也可以有一些其他办法实现;

定时任务方式

每隔5秒扫描一次数据库,查询过期的订单然后进行处理;

优点:

简单,容易实现;

缺点:

  • 存在延迟(延迟时间不准确),如果你每隔1分钟扫一次,那么就有可能延迟1分钟;
  • 性能较差,每次扫描数据库,如果订单量很大

被动取消 

当用户查询订单的时候,判断订单是否超时,超时了就取消(交易关闭);

优点:

对服务器而言,压力小;

缺点:

  • 用户不查询订单,将永远处于待支付状态,会对数据统计等功能造成影响;
  • 用户打开订单页面,有可能比较慢,因为要处理大量订单,用户体验少稍差;
  • JDK延迟队列(单体应用,不能分布式下)

DelayedQueue

无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素 

优点:

实现简单,任务延迟低;

缺点:

  • 服务重启、宕机,数据丢失;
  • 只适合单机版,不适合集群;
  • 订单量大,可能内存不足而发生异常; oom

采用消息中间件(rabbitmq)

RabbitMQ本身不支持延迟队列,可以使用TTL结合DLX的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。

应用场景

  •         1. 订单超时处理:当用户下单后,可以将订单放入一个延迟队列中,在一定的时间后检查订单是否完成支付。如果订单未及时支付,可以触发相应的超时处理逻辑,如取消订单、释放库存等操作。
  •         2. 消息重试机制:当某个消息无法被立即处理时,可以将该消息放入延迟队列,并设置延迟时间。在延迟时间到达后,将消息重新发送到原始队列,供消费者重新处理。
  •         3. 定时任务调度:延迟队列可以用于实现定时任务的调度。将需要执行的任务放入延迟队列,并设置合适的延迟时间,当延迟时间到达时,任务会被获取并执行。
  •         4. 优惠券过期提醒:在发放优惠券时,可以同时将过期时间放入延迟队列。当优惠券过期时间到达时,系统可以发送提醒消息给用户,以提醒其使用优惠券。
  •         5. 延迟通知和提醒:在需要延迟通知或提醒的场景中,可以将通知信息放入延迟队列,并设置适当的延迟时间。当延迟时间到达后,系统会从队列中获取通知信息并发送给相应的用户

 springboot代码实战

实战架构

如上图,消息到达正常的交换机exchange.nomal.a,通过与正常的队列queue.noaml.a绑定,消息会到达正常队列,如果消息到期变为死消息以后会重新转发到正常的交换机exchange.nomal.a中,最后会转发延迟队列queue.delayed.a中进行消化。

工程概述

 工程采用springboot架构,主要用到的依赖为:

<!--        rabbit的依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

application.yml配置文件如下:

server:
  port: 8080
spring:
  rabbitmq:
    host: 123.249.70.148
    port: 5673
    username: admin
    password: 123456
    virtual-host: /

RabbitConfigDeal 配置类:创建队列及交换机并进行绑定 

@Configuration
public class RabbitConfigDeal {


}

创建正常交换机

   @Bean
    public DirectExchange normalExchange(){
        return ExchangeBuilder.directExchange("exchange.normal.a").build();
    }

 创建延迟队列

    @Bean
    public Queue delayedQueue(){
        return new Queue("queue.delayed.a");
    }

 创建正常队列,设置他的绑定死信交换机,以及对应绑定的路由key为order

    @Bean
    public Queue normalQueue(){
        Map<String, Object> arguments =new HashMap<>();
        arguments.put("x-dead-letter-exchange","exchange.normal.a");
        arguments.put("x-dead-letter-routing-key","error");
        return QueueBuilder.durable("queue.normal.a")
                .withArguments(arguments).build();
    }

绑定正常交换机和正常队列

    @Bean
    public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with("info");
    }

 绑定正常交换机和延迟队列

    @Bean
    public Binding bindingDelayed(DirectExchange normalExchange,Queue delayedQueue){
        return BindingBuilder.bind(delayedQueue).to(normalExchange).with("error");
    }

MessageService业务类:发送消息及接收消息

@Component
@Slf4j
public class MessageService {
    @Resource
    private RabbitTemplate rabbitTemplate;

}

 发送消息方法

    public void sendMsg(){
        MessageProperties messageProperties = new MessageProperties();
        // 设置过期时间,单位:毫秒
        messageProperties.setExpiration("15000");
        Message message1 = new Message("hello word 15s".getBytes(), messageProperties);
        //发送消息
        rabbitTemplate.convertAndSend("exchange.normal.a", "info", message1);
        System.out.println("发送完毕:" + new Date());
        // 设置过期时间,单位:毫秒
        messageProperties.setExpiration("15000");
        Message message2 = new Message("hello word 5s".getBytes(), messageProperties);
        //发送消息
        rabbitTemplate.convertAndSend("exchange.normal.a", "info", message2);
        System.out.println("发送完毕:" + new Date());
    }

这里发送了俩条消息,路由key为info,第一条消息的过期时间为15s,第二条消息的过期的时间为5s,按照分析,虽然第二条消息先过期,但是第一条消息过期以后再会对第二条处理,也就意味着:到达延迟队列的顺序为:hello word 15s       hello word 5s

MessageConvert

  • 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
  • RabbitMQ 的序列化是指 Messagebody 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter默认)、Jackson2JsonMessageConverter

  接受消息

    @RabbitListener(queues = {"queue.delayed.a"})
    public void receiveMsg(Message message){
        byte[] body = message.getBody();
        String queue = message.getMessageProperties().getConsumerQueue();
        String msg=new String(body);
        log.info("{}接收到消息时间:{},消息为{}",queue,new Date(),msg);
    }

 Message

在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:

  1. MessageProperties // 消息属性

  2. byte[] body // 消息内容

@RabbitListener

使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理

  • 消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

  • 消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

    • application/octet-stream:二进制字节数组存储,使用 byte[]
    • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
    • text/plain:文本数据类型存储,使用 String
    • application/json:JSON 格式,使用 Object、相应类型

主启动类RabbitMq01Application:实现ApplicationRunner接口

/**
 * @author 风轻云淡
 */
@SpringBootApplication
public class RabbitMq01Application implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMq01Application.class, args);
    }

    @Resource
    private MessageService messageService;

    /**
     * 程序一启动就会调用该方法
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.sendMsg();

    }
}

在SpringBoot中,提供了一个接口:ApplicationRunner。 该接口中,只有一个run方法,他执行的时机是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。

由于该方法是在容器启动完成之后,才执行的,所以,这里可以从spring容器中拿到其他已经注入的bean。

启动主启动类后查看控制台:

发送完毕:Fri Sep 29 17:12:35 CST 2023
发送完毕:Fri Sep 29 17:12:35 CST 2023
2023-09-29 17:12:50.165  INFO 89972 --- [ntContainer#0-1] 
c.e.rabbitmq01.service.MessageService    : 
queue.delayed.a接收到消息时间:Fri Sep 29 17:12:50 CST 2023,消息为hello word 15s
2023-09-29 17:12:50.166  INFO 89972 --- [ntContainer#0-1] 
c.e.rabbitmq01.service.MessageService    : 
queue.delayed.a接收到消息时间:Fri Sep 29 17:12:50 CST 2023,消息为hello word 5s

我们在这里可以看见到达延迟队列的顺序是hello word 15s    hello word 5s

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1080101.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年中国医院信息系统发展现状及行业市场规模分析[图]

医院信息系统&#xff0c;亦称“医院管理信息系统”&#xff08;简称HIS&#xff09;&#xff0c;是指利用计算机软硬件技术、网络通信技术等现代化手段&#xff0c;对医院及其所属各部门的人流、物流、财流进行综合管理&#xff0c;对在医疗活动各阶段产生的数据进行采集储存、…

Lab 1: Unix utilities汇总

这个实验主要学习了常用的一些系统调用。 Lab 1: Unix utilities Boot xv6 (easy) git克隆&#xff0c;切换分支&#xff0c;qemu。根据要求进行操作即可。 $ git clone git://g.csail.mit.edu/xv6-labs-2020 $ cd xv6-labs-2020 $ git checkout util $ make qemusleep (ea…

分享一下花店制作微信小程序的步骤是什么

一、准备阶段 在准备阶段&#xff0c;花店需要完成以下任务&#xff1a; 注册微信公众平台账号&#xff1a;首先&#xff0c;花店需要注册一个微信公众平台账号&#xff0c;这个账号将用于创建和管理微信小程序。 确定小程序的功能和需求&#xff1a;花店需要根据自身的业务需…

RISC-V架构 | 飞凌嵌入式FET7110-C国产高性能核心板现货发售!

RISC-V凭借其完全开源免费且可自由修改的特性而备受国内厂商的追捧&#xff0c;在此背景下&#xff0c;飞凌嵌入式联合RISC-V国产处理器厂商赛昉科技(StarFive)基于昉惊鸿7110处理器共同推出FET7110-C核心板。 现在&#xff0c;飞凌嵌入式FET7110-C核心板&#xff08;商业级&a…

2023.10月网络优化项目实战

基础配置 sw2 <Huawei>sy Enter system view, return user view with Ctrl+Z. [Huawei]sy sw2 [sw2]vlan batch 10 20 Info: This operation may take a few seconds. Please wait for a moment...done.[sw2]int e0/0/1 [sw2-Ethernet0/0/1]port link-type access [s…

第十章-输入输出系统

Ⅰ.锁 本质是互斥操作 原因&#xff1a;针对公共资源访问时&#xff0c;临界区若不加以互斥限制&#xff0c;可能导致执行过程中突然的中断导致出现异常。 1.互斥过程 设定互斥量M为二值信号量&#xff0c;0/1&#xff0c;P-&#xff0c;V&#xff0c;现有两个进程A、B共同…

大数据flink篇之三-flink运行环境安装(一)单机Standalone安装

一、安装包下载地址 https://archive.apache.org/dist/flink/flink-1.15.0/ 二、安装配置流程 前提基础&#xff1a;Centos环境&#xff08;建议7以上&#xff09; 安装命令&#xff1a; 解压&#xff1a;tar -zxvf flink-xxxx.tar.gz 修改配置conf/flink-conf.yaml&#xff1…

最新AI创作系统源码ChatGPT网站源码/支持Midjourney,AI绘画/支持OpenAI GPT全模型+国内AI全模型

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如…

RxJava介绍及基本原理

随着互联网的迅猛发展&#xff0c;Java已成为最广泛应用于后端开发的语言之一。而在处理异步操作和事件驱动编程方面&#xff0c;传统的Java多线程并不总是最佳选择。这时候&#xff0c;RxJava作为一个基于观察者模式、函数式编程和响应式编程理念的库&#xff0c;为我们提供了…

【Nuget】程序包源

程序包源地址(部分) Azure 中国区的官方 NuGet 程序包源地址 https://nuget.cdn.azure.cn/v3/index.json 官方 NuGet 程序包源地址 V2 https://www.nuget.org/api/v2 官方 NuGet 程序包源地址 V3 https://api.nuget.org/v3/index.json MyGet 上 Eto.Forms 框架的程序包源地址 h…

杨冰:分布式数据库助力企业数实融合,跨越数字化转型深水区

近日&#xff0c;2023 inclusion外滩大会在上海黄浦世博园区举办。由赛迪顾问与 OceanBase 联合主办的外滩大会“分布式数据库助力数实融合”见解论坛圆满落幕。 会上&#xff0c;OceanBase CEO 杨冰发表了《分布式数据库助力企业数实融合&#xff0c;跨越数字化转型深水区》的…

一个完整的初学者指南Django-part1

源自&#xff1a;https://simpleisbetterthancomplex.com/series/2017/09/04/a-complete-beginners-guide-to-django-part-1.html 一个完整的初学者指南Django - 第1部分 介绍 今天我将开始一个关于 Django 基础知识的新系列教程。这是一个完整的 Django 初学者指南。材料分为七…

mysql面试题38:count(1)、count(*) 与 count(列名) 的区别

该文章专注于面试&#xff0c;面试只要回答关键点即可&#xff0c;不需要对框架有非常深入的回答&#xff0c;如果你想应付面试&#xff0c;是足够了&#xff0c;抓住关键点 面试官&#xff1a; count(1)、count(*) 与 count(列名) 的区别 当使用COUNT函数进行数据统计时&…

echarts折线图(其他图也是一样)设置tooltip自动滚动

按顺序自动滚动效果 <div class"leftComp-charts" id"chartsBox"></div>chartsData: {roadNorm: [],time: []},eChartsTimer: nullinitChartsBox() {this.option {tooltip: {trigger: "axis",axisPointer: {// 方法一type: "s…

2023年中国稻谷加工机械分类、市场规模及发展前景分析[图]

稻谷加工机械设备主要包括砻谷机、碾米机、抛光机、碎米机和砻糠机&#xff1b;通过物理和机械方式将稻谷加工成可供人们食用的大米&#xff0c;同时还可以提取出一些有价值的副产品&#xff0c;如砻糠可以用作饲料。 稻谷加工机械制造行业分类 资料来源&#xff1a;共研产业咨…

设计模式-相关内容

文章目录 一、设计模式概述二、UML图1.类的表示方法2.类与类之间关系的表示方法(1)关联关系(2)聚合关系(3)组合关系(4)依赖关系(5)继承关系(6)实现关系 三、软件设计原则1.开闭原则2.里氏代换原则3.依赖倒转原则4.接口隔离原则5.合成复用原则6.迪米特法则 一、设计模式概述 创…

集群分发脚本xysnc

一、scp&#xff08;secure copy&#xff09; 安全拷贝 1.定义 scp&#xff08;Secure Copy&#xff09;是一个用于在不同计算机之间安全地复制文件和目录的命令行工具。它使用 SSH 协议进行连接和文件传输&#xff0c;提供了加密和身份验证机制&#xff0c;确保数据传输的安…

Android 项目增加 res配置

main.res.srcDirs "src/main/res_test" build->android->sourceSets

从裸机启动开始运行一个C++程序(七)

前序文章请看&#xff1a; 从裸机启动开始运行一个C程序&#xff08;六&#xff09; 从裸机启动开始运行一个C程序&#xff08;五&#xff09; 从裸机启动开始运行一个C程序&#xff08;四&#xff09; 从裸机启动开始运行一个C程序&#xff08;三&#xff09; 从裸机启动开始运…

Mall脚手架总结(四) —— SpringBoot整合RabbitMQ实现超时订单处理

前言 在电商项目中&#xff0c;订单因为某种特殊情况被取消或者超时未支付都是比较常规的用户行为&#xff0c;而实现该功能我们就要借助消息中间件来为我们维护这么一个消息队列。在mall脚手架中选择了RabbitMQ消息中间件&#xff0c;接下来荔枝就会根据功能需求来梳理一下超时…