RabbitMQ系列【11】延迟队列

news2024/12/24 4:01:03

有道无术,术尚可求,有术无道,止于术。

文章目录

    • 前言
    • 1. 过期消息实现延迟队列
    • 2. 过期队列实现延迟队列
    • 3. 插件实现延迟队列
      • 3.1 安装插件
      • 3.2 代码实现
      • 3.3 测试

前言

延迟队列:即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求场景: 用户下单后,30分钟未支付,取消订单,回滚库存。

本文介绍了三种方式实现,前两种存在一定的局限性

1. 过期消息实现延迟队列

发送带有TTL过期属性的消息,到达过期时间后,投递到死信队列,实现延迟队列功能。

添加一个死信交换机、死信队列:

@Configuration
public class RabbitMqDeadQueueConfig {

    private static final String DEAD_QUEUE = "deadQueue";

    private static final String DEAD_EXCHANGE = "deadExchange";

    private static final String DEAD_ROUTE_KEY = "dead.key";

    /**
     * 死信队列
     */
    @Bean(DEAD_QUEUE)
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    /**
     * 死信交换机
     */
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange() {
        return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build();
    }

    /**
     * 创建死信队列和死信交换机的绑定关系
     */
    @Bean("deadBinding")
    public Binding deadBinding(@Qualifier(DEAD_QUEUE) Queue deadQueue, @Qualifier(DEAD_EXCHANGE) Exchange directExchange) {
        return BindingBuilder.bind(deadQueue).to(directExchange).with(DEAD_ROUTE_KEY).and(null);
    }
}

添加一个延迟交换机、延迟队列:

@Configuration
public class RabbitMqDelayQueueConfig {


    private static final String DEAD_EXCHANGE = "deadExchange";

    private static final String DEAD_ROUTE_KEY = "dead.key";

    /**
     * 使用 ExchangeBuilder 创建交换机
     */
    @Bean("delayExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.directExchange("delayExchange").durable(true).build();
    }

    /**
     * 创建队列:
     * 指定死信交换机、路由KEY
     */
    @Bean("delayQueue")
    public Queue bootQueue001() {
        return QueueBuilder.durable("delayQueue").deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
    }

    /**
     * 创建绑定关系
     */
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key").and(null);
    }
}

创建一个消费者,接收死信消息:

@Component
public class RabbitConsumer {

    @RabbitListener(queues = {"deadQueue"})
    public void deadQueue(Message message) {
        System.out.println("收到消息" + new String(message.getBody()));
        System.out.println("当前时间:"+ LocalDateTime.now());
        System.out.println("判断订单状态...." + new String(message.getBody()));
        System.out.println("未支付,回滚数据库....");
    }

}

发送TTL订单消息:

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("10000"); // 10秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message);

运行程序,可以看到,过了10秒,收到了订单信息~
在这里插入图片描述

但是该方式存在一个致命缺陷,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。 每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列。RabbitMQ是等消息到达队列顶部即将被消费时,才会判断其是否过期并删除。所以即使消息过期,也不会马上从队列中抹去。

首先发送一个过期时间为20秒的消息,再发送一个过期时间为10秒的消息:

        // 发送过期时间为20秒的消息,先到达队列
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("20000"); // 20秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message001 = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message001);
        // 发送过期时间为10秒的消息,后达队列
        messageProperties.setExpiration("10000"); // 20秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message002 = new Message("订单:ID:002 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message002);

测试发现,第二条过期时间为10秒的消息,虽然过期时间更短,但也需要等到第一条过期后,到达消息顶部,才会被扫描是否过期,由此可见, 过期消息实现延迟队列并不可取~~~
在这里插入图片描述

2. 过期队列实现延迟队列

实现思路:

  1. 用户下单后,生成订单,发送订单消息到延迟队列中,并设置过期时间为30分钟,该队列没有消费者
  2. 订单队列消息过期后,发送订单至死信队列
  3. 死信队列消费者接收到消息后,判断订单状态,进行后续操作
    在这里插入图片描述

给整个队列添加过期时间实现延迟队列。由于过期时间作用于整个队列,所以不是很灵活,比如设置30分钟需要一个队列,设置10分钟时,又需要创建一个队列。

创建一个过期时间队列:

    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable("delayQueue").withArgument("x-message-ttl", 10000).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
    }

参照上面的案例发送消息即可。

3. 插件实现延迟队列

上面我们讨论了两种实现延迟队列的方式,但是都存在一些问题,官网也提供了基于插件的方式来实现。

消息到达延迟交换机后,消息不会立即进入队列,先将消息保存至表中,插件将会尝试确认消息是否过期,如果消息过期则投递至目标队列。

在这里插入图片描述

3.1 安装插件

官网中提供了很多插件,以满足更多的功能需求。

在GitJHub中下载延迟插件:

在这里插入图片描述
将其放在RabbitMQ程序主目录的plugins下:

在这里插入图片描述
切换到sbin目录下,运行安装插件命令:

rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange

成功安装提示:

在这里插入图片描述

重启RabbitMQ,进入到交换机页面,添加交换机,可以看到一个新的类型为x-delayed-message,说明插件安装成功:
在这里插入图片描述

3.2 代码实现

首先创建x-delayed-message类型的交换机,并绑定队列:

@Configuration
public class RabbitMqDelayQueueConfig {

    @Bean("delayExchange")
    public CustomExchange delayExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange("delayExchange", "x-delayed-message", true, false, arguments);
    }

    /**
     * 创建队列:
     * 指定死信交换机、路由KEY
     */
    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable("delayQueue").build();
    }

    /**
     * 创建绑定关系
     */
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key").noargs();
    }
}

创建消费者,监听延迟队列:

    @RabbitListener(queues = {"delayQueue"})
    public void deadQueue(Message message) {
        System.out.println("收到消息" + new String(message.getBody()));
        System.out.println("当前时间:"+ LocalDateTime.now());
        System.out.println("判断订单状态...." + new String(message.getBody()));
        System.out.println("未支付,回滚数据库....");
    }
}

创建生产者,发送不同延迟时间的消息:

        // 发送延迟时间为20秒的消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDelay(20000); // 20秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message001 = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message001);
        // 发送延迟时间为10秒的消息
        messageProperties.setDelay(10000); // 10秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message002 = new Message("订单:ID:002 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message002);

3.3 测试

可以看到第一条消息和第二条消息,都是在各自指定的延迟时间后被消费,并没有出现时序问题,而且每个消息都具有不同的延迟时间,灵活性很高。

所以插件是实际应用中常用的一种方式,要实现延迟队列功能,当前其他MQ,甚至Redis也可以~~~
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/21125.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL事务隔离机制 -- 必须说透

文章目录前言一、什么是数据库事务二、事务并发带来的4类问题三、事务4种隔离级别四、Mysql演示4种隔离级别总结前言 如何控制并发是数据库领域中非常重要的问题之一&#xff0c;MySQL为了解决并发带来的问题&#xff0c;设计了事务隔离机制、锁机制、MVCC机制&#xff0c;用一…

c# 实验七 图像列表框及树形视图控件的使用

前言&#xff1a; &#x1f44f;作者简介&#xff1a;我是笑霸final&#xff0c;一名热爱技术的在校学生。 &#x1f4dd;个人主页&#xff1a;个人主页1 || 笑霸final的主页2 &#x1f4d5;系列专栏&#xff1a;《项目专栏》 &#x1f4e7;如果文章知识点有错误的地方&#xf…

[附源码]java毕业设计四六级考试管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

m基于matlab的wcdma软切换算法的研究分析和仿真

目录 1.算法概述 2.仿真效果预览 3.MATLAB部分代码预览 4.完整MATLAB程序 1.算法概述 软切换是WCDMA系统的关键技术之一&#xff0c;软切换算法和相关参数的设置直接影响着系统的容量和服务质量。通过WCDMA系统的软切换技术可以提高小区覆盖率和系统容量。所以软切换技术是…

【ASM】字节码操作 工具类与常用类 LocalVariablesSorter 类 简单介绍与使用

文章目录 1.概述2. LocalVariablesSorter#2.1 class info2.2 fields3.案例3.1 编码实现3.2 编码实现v21.概述 在上一节:【ASM】字节码操作 工具类与常用类 GeneratorAdapter 介绍 我们知道了对于GeneratorAdapter 类来说,它非常重要的一个特点:将一些visitXxx()方法封装成一…

Java面向对象详解(上)

Java面向对象详解&#xff08;上&#xff09;&#x1fa85;面向对象与面向过程的区分✨面向过程&#xff1a;✨面向对象&#xff1a;&#x1fa85;类是什么&#xff1f;&#x1fa85;对象是什么&#xff1f;&#x1fa85;类的结构&#x1fa85;类中方法&#xff1a;✨成员方法与…

实战讲解SpringBoot启动时自动加载数据库数据到内存:通过回调方法自动运行Bean(图+文+源码)

1 缘起 在补充SpringCloud网关&#xff08;Gateway&#xff09;配置白名单相关知识过程中&#xff0c; 有两种实现方案&#xff1a; &#xff08;1&#xff09;SpringBoot的启动配置文件application.yml进行配置&#xff1b; &#xff08;2&#xff09;自动加载MySQL数据库中的…

【人工智能】Mindspore框架中保存加载模型

前言 MindSpore着重提升易用性并降低AI开发者的开发门槛&#xff0c;MindSpore原生适应每个场景包括端、边缘和云&#xff0c;并能够在按需协同的基础上&#xff0c;通过实现AI算法即代码&#xff0c;使开发态变得更加友好&#xff0c;显著减少模型开发时间&#xff0c;降低模…

深度学习在图像处理中的应用学习笔记

这篇学习笔记用于记录本人在读研期间的学习内容 在刚入学不久&#xff0c;发现一个B站up主对这方面进行了一系列的整理总结&#xff0c;并上传了代码&#xff0c;并且非常成体系&#xff0c;因此本人打算跟着这位up主的步骤&#xff0c;对这方面进行学习并且做一个记录&#xf…

Vue安装并使用axios发送请求

前言 本文主要介绍的是使用在Vue项目中安装并使用axios发送请求 axios介绍 axios是一种Web数据交互方式 它是一个基于promise的网络请求库&#xff0c;作用于node.js和浏览器中&#xff0c;它是 isomorphic 的(即同一套代码可以运行在浏览器和node.js中) 本质是对原生XHRX…

NAND Flash原理

Flash 简介 Flash全名叫做Flash Memory&#xff0c;属于非易失性存储设备(Non-volatile Memory Device)&#xff0c;与此相对应的是易失性存储设备(Volatile Memory Device)。关于什么是非易失性/易失性&#xff0c;从名字中就可以看出&#xff0c;非易失性就是不容易丢失&…

BGP→→

BGP-4 提供了一套新的机制以支持无类域间路由。这些机制包括支持网络前缀的通告、取消 BGP 网络中 “ 类 ” 的概念。 BGP-4 也引入机制支持路由聚合&#xff0c;包括 AS 路径的集合。 特点 BGP属于外部或域间路由协议。BGP的主要目标是为处于不同AS中的路由器之间进行路由信…

Spring Cloud Config 配置中心

最简单的配置中心&#xff0c;就是启动一个服务作为服务方&#xff0c;之后各个需要获取配置的服务作为客户端来这个服务方获取配置。 Spring Cloud Config&#xff0c;可以用 git &#xff0c;还可以用数据库、svn、本地文件等作为存储。 1. Config Server 引入 config-ser…

Day08--自定义组件的properties属性

提纲挈领&#xff1a; 1.properties属性 我的操作&#xff1a; 1》 2》 3》输出看看效果是不是真的有接收到。 ********************************* ********************************* ********************************* ********************************* **************…

Spring 源码阅读

1. beanFactory The root interface for accessing a Spring bean container. 2. BeanFactoryPostProcessor 对bean定义进行后置处理&#xff0c;比如jdbc类读取配置的密码&#xff0c;用户名等。 3.BeanPostProcessor public interface BeanPostProcessor {Object postProce…

ctfshow--RCE极限挑战

本周ctfshow的挑战注重点为RCE&#xff0c;主要利用是&#xff1a;自增绕过RCE RCE挑战1 属于简单类型 源码 error_reporting(0); highlight_file(__FILE__); $code $_POST[code]; $code str_replace("(","括号",$code); $code str_replace(".&q…

Arcgis使用教程(十一)ARCGIS地图制图之经纬网格设置参数详解

Arcgis地图制图中&#xff0c;经纬网格的添加详细参见&#xff1a; Arcgis使用教程&#xff08;十&#xff09;ARCGIS地图制图之经纬网格添加_空中旋转篮球的博客-CSDN博客 1.加载数据 在Arcmap中加载中国矢量图层数据&#xff08;中国省级行政区图&#xff0c;国界线两个图层…

善于使用二阶思维

事情往往不是你想象的那样&#xff0c;有时候&#xff0c;看似解决了问题&#xff0c;却在不经意间&#xff0c;引发了更严重的后果。帮助我们思考、决策、解决问题的最有效方法是&#xff0c;运用二阶思维。 什么是二阶思维&#xff1f; 一阶思维是单纯而肤浅的&#xff0c;几…

34.nacos客户端读取共享配置文件实例(springcloud)

其他配置环境和上文相同&#xff0c;本实例不再演示 https://blog.csdn.net/weixin_59334478/article/details/127953755?spm1001.2014.3001.5501https://blog.csdn.net/weixin_59334478/article/details/127953755?spm1001.2014.3001.55011.新建共享配置文件&#xff0c;使…

高NA (数值孔径)物镜的分析

高NA(数值孔径)物镜常用于光学显微及光刻&#xff0c;并已广泛在其他应用中得以使用。众所周知&#xff0c;在高数值孔径物镜的使用中&#xff0c;电磁场矢量特性的影响是不可忽略的。一个众所周知的例子就是由高NA(数值孔径)物镜聚焦线性偏振圆光束时&#xff0c;焦斑的不对称…