RabbitMQ延时队列

news2025/1/11 14:17:42

延时队列内部是有序的,最重要的特性就是延时,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

使用场景

  1. 订单在十分钟之内未支付则自动取消
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议。

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理,如果数据量比较少,可以这样做,但对于数据量比较大,并且时效性较强的场景,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量使用轮询的方式是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

image

TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。

如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,这条消息如果在TTL设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL和消息的TTL,会使用较小的值,有两种方式设置 TTL。

队列设置TTL

在创建队列的时候设置队列的x-message-ttl属性,使用SpringBoot整合设置:

/**
 * 声明 A 队列,并设置TTL时间为 10 秒
 */
@Bean("aQueue")
public Queue aQueue() {
    Map<String, Object> args = new HashMap<>(3);
    // 声明队列的 TTL
    args.put("x-message-ttl", 10000);
    return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下:

image

创建配置文件类

import org.springframework.amqp.core.*;

@Configuration
public class TtlQueueConfig {

    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "A";
    public static final String QUEUE_B = "B";
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    public static final String DEAD_LETTER_QUEUE = "D";

    /**
     * 声明 X 交换机
     */
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    /**
     * 声明 Y 死信交换机
     */
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明 A 队列,绑定死信交换机,并设置TTL时间为 10 秒
     */
    @Bean("aQueue")
    public Queue aQueue() {
        Map<String, Object> args = new HashMap<>(3);
        // 声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        // 声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        // 声明队列的 TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }

    /**
     * 声明 A 队列与 X 交换机绑定
     */
    @Bean
    public Binding aQueueBindingX(@Qualifier("aQueue") Queue aQueue,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(aQueue).to(xExchange).with("XA");
    }

    /**
     * 声明 B 队列,绑定死信交换机,并设置TTL时间为 40 秒
     */
    @Bean("bQueue")
    public Queue bQueue() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    /**
     * 声明队列A与交换机X绑定
     */
    @Bean
    public Binding bQueueBindingX(@Qualifier("bQueue") Queue bQueue,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(bQueue).to(xExchange).with("XB");
    }

    /**
     * 声明死信队列 D
     */
    @Bean("dQueue")
    public Queue dQueue() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    /**
     * 声明死信队列 D 与死信交换机 Y 绑定
     */
    @Bean
    public Binding dDeadLetterQueueBindingY(@Qualifier("dQueue") Queue queueD,
                                            @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

消息生产者

@Slf4j
@RequestMapping("/ttl")
@RestController
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", LocalDateTime.now(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message);
    }
}

消息消费者

@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = TtlQueueConfig.DEAD_LETTER_QUEUE)
    public void receiveD(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列信息{}", LocalDateTime.now(), msg);
    }
}

测试

启动项目,发起一个请求 http://localhost:8080/ttl/sendMsg/124123

image

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

消息设置TTL

对每条消息设置TTL,使用SpringBoot整合设置:

@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
    log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", LocalDateTime.now(), message);
    rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
    rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message,
            // 消息来自设置了40S的队列,但消息的TTL是2S,如果都设置了,则以少的为准
            correlationData -> {
                correlationData.getMessageProperties().setExpiration("2000");
                return correlationData;
            });
}

在上面的案例中新增一个C队列,绑定关系如下,该队列不设置TTL 时间:

image

创建一个新的配置类

@Component
public class MsgQueueConfig {

    public static final String QUEUE_C = "C";
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";

    /**
     * 声明 C 队列,绑定死信交换机
     */
    @Bean("cQueue")
    public Queue cQueue(){
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //没有声明 TTL 属性
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }

    /**
     * 声明队列C与交换机X绑定
     */
    @Bean
    public Binding cQueueBindingX(@Qualifier("cQueue") Queue bQueue,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(bQueue).to(xExchange).with("XC");
    }
}

消息生产者

@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
    log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", LocalDateTime.now(), message);
    rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
    rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl 为 40S 的队列: " + message,
            // 消息来自设置了40S的队列,但消息的TTL是2S,如果都设置了,则以少的为准
            correlationData -> {
                correlationData.getMessageProperties().setExpiration("2000");
                return correlationData;
            });
}

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
    rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
        correlationData.getMessageProperties().setExpiration(ttlTime);
        return correlationData;
    });
    log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C: {}", LocalDateTime.now(), ttlTime, message);
}

消息消费者

和上面的保持一致

测试

启动项目,发起两个请求

  • http://localhost:8080/ttl/sendExpirationMsg/你好2/20000
  • http://localhost:8080/ttl/sendExpirationMsg/你好/2000

image

执行成功。

两者的区别

设置队列的 TTL 属性,一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中)。

设置消息的 TTL 属性,消息过期,不一定会被马上丢弃,消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

延时队列插件

上面消息TTL设置时,看起来没什么问题,但如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

使用延时队列插件解决:

安装

官网下载地址:https://www.rabbitmq.com/community-plugins.html

找到 rabbitmq_delayed_message_exchange 插件,点击Releases,进入github页面点击下载ez格式的文件,并将该文件上传到 RabbitMQ 的插件目录/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

下载合适的版本,mq是3.8.8,插件就下载支持该版本的,否则执行下面命令会报错。

image

执行命令:

 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如果执行了没反应,可以进入rabbitmq的安装目录执行,然后重启试试。

image

image

新增一个队列delayed.queue,一个交换机delayed.exchange,绑定关系如下:

image

配置文件类

@Configuration
public class DelayedQueueConfig {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    /**
     * 声明 delayed.queue 队列
     */
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    /**
     * 声明 delayed.exchange 交换机
     * 交换机使用新类型的交换机:x-delayed-message
     */
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        // 自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    /**
     * 声明队列与交换机绑定
     */
    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

消息生产者

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
    rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData -> {
                correlationData.getMessageProperties().setDelay(delayTime);
                return correlationData;
            });
    log.info("当前时间:{},发送一条延时 {} 毫秒的信息给队列 delayed.queue:{}", 
        LocalDateTime.now(), delayTime, message);
}

消息消费者

@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
    String msg = new String(message.getBody());
    log.info("当前时间:{},收到延时队列的消息:{}", LocalDateTime.now(), msg);
}

测试

启动项目,发起两个请求

http://localhost:8080/ttl/sendDelayMsg/{message}/{delayTime}

  • http://localhost:8080/ttl/sendDelayMsg/第1条消息/20000
  • http://localhost:8080/ttl/sendDelayMsg/第2条消息/2000

image

第二个消息被先消费掉,符合预期。

总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用其特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。

另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/54011.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

新上线软件需不需要防御?

导语&#xff1a;随着5G时代到来和ipv6的普及&#xff0c;攻击者手段层出不穷&#xff0c;从一开始简单的DDOS分布式拒绝服务&#xff0c;后到蔓延ACK 从不同协议通讯层面发起的攻击&#xff0c;现在CC请求类型攻击&#xff0c;已经可以绕过域名验证&#xff0c;以及模拟正常用…

css实现价格降价线

比较简单&#xff0c;直接上代码 <div class"container"><div>今日价格&#xff1a;$9.99</div><div>商品原价&#xff1a;<span class"price">$49.99</span></div> </div>.price {text-decoration: lin…

GJB 5000B二级-II实施基础

本实践域为新增实践域   思想:以GJB5000A的共用过程域中不乏实践为基础进行提炼并提升,结合各个行业的优秀实践和行业特点,坚持问题导向,使标准更具有指导性和可操作性;充分借鉴GJB9001C中:“4组织环境”、“7支持”的相关内容,形成实施基础实践域。本实践域强调突出重…

让你真实的看见 TCP 三次握手和四次挥手到底是什么样!

前言 TCP 建立连接是三次握手&#xff0c;而断开连接是四次挥手。 但事实上从你打开这篇文章&#xff0c;到关掉这篇文章&#xff0c;你是看不见这个过程的。 那 TCP 建立连接和断开连接的过程是不是真的如大多数文章所描绘的一样&#xff1f; 带着这些疑问&#xff0c;那就…

揭晓:一条SQL语句的执行过程是怎么样的?

数据库系统能够接受 SQL 语句&#xff0c;并返回数据查询的结果&#xff0c;或者对数据库中的数据进行修改&#xff0c;可以说几乎每个程序员都使用过它。 而 MySQL 又是目前使用最广泛的数据库。所以&#xff0c;解析一下 MySQL 编译并执行 SQL 语句的过程&#xff0c;一方面…

seata在nacos上注册IP为内网,启动时加了 -h 外网ip还是显示内网?

版本&#xff1a; 部署位置&#xff1a;Linux seata版本&#xff1a;1.5.1 问题&#xff1a; seata在nacos上注册IP为内网&#xff0c;启动时加了 -h 外网ip还是显示内网? 解决&#xff1a; 该版本存在-h失效问题&#xff0c;后面1.5.2就修掉-h失效的问题了。 可以在sea…

Web前端大作业——城旅游景点介绍(HTML+CSS+JavaScript) html旅游网站设计与实现

&#x1f468;‍&#x1f393;学生HTML静态网页基础水平制作&#x1f469;‍&#x1f393;&#xff0c;页面排版干净简洁。使用HTMLCSS页面布局设计,web大学生网页设计作业源码&#xff0c;这是一个不错的旅游网页制作&#xff0c;画面精明&#xff0c;排版整洁&#xff0c;内容…

更新UpdatePanel外部控件

目前处理项目问题的时候&#xff0c;发现有个功能有问题。 界面大致如下 版本radiobuttonlist&#xff08;在UpdatePanel外&#xff09; UpdatePanel 上传按钮 文件列表 UpdatePanel 正常逻辑&#xff1a; 上传文件后&#xff0c;文件列表会刷新。&#xff08;这块没问…

[附源码]Python计算机毕业设计Django家庭教育app

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;我…

[附源码]Python计算机毕业设计Django惠农微信小程序论文

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

B树(BTree)与B+树(B+Tree)

B树是什么&#xff1f; B树是一种多路平衡查找树 平衡&#xff0c;指的是子树高度相同&#xff08;即所有叶子结点均在同一层&#xff09;&#xff0c;即每个结点的平衡因子均等于0 多路&#xff0c;就是它除了根结点外&#xff08;之所以根结点的分叉数不限定&#xff0c;是…

【java】多线程

文章目录进程和线程继承Thread类的方式实现多线程设置和获取线程的名称线程优先级 线程调度控制线程线程的生命周期多线程的实现方式案例--卖票同步方法解决数据安全问题线程安全的类Lock锁生产者消费者模式概述案例进程和线程 继承Thread类的方式实现多线程 MyThread.java pa…

懵了,阿里一面就被虐了,幸获内推华为技术四面,成功拿到offer

上个月&#xff0c;哥们从某小厂离职&#xff0c;转投阿里云&#xff0c;简历优秀&#xff0c;很顺利地拿到了面试通知&#xff0c;但之后的进展却让哥们怀疑人生了&#xff0c;或者说让哥们懵逼的是&#xff0c;面试阿里云居然第一面就被吊打&#xff1f;让哥们开始怀疑自己&a…

【OpenCV-Python】教程:3-12 模板匹配

OpenCV Python 模板匹配 【目标】 利用模板匹配的方法寻找目标cv2.matchTemplate(), cv2.minMaxLoc() 【理论】 模板匹配是一个寻找大图像中目标位置的方法。OpenCV提供了函数 cv2.matchTemplate() 函数&#xff0c;通过在输入图像上滑动模板&#xff0c;将目标与滑动处的图…

[附源码]计算机毕业设计JAVA校园淘宝节系统

[附源码]计算机毕业设计JAVA校园淘宝节系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis …

【c++】STL--string

前言 最开始我们学习c语言的时候&#xff0c;我们发现刷题或者写代码都是比较麻烦的&#xff0c;如果说用c语言造一辆车&#xff0c;那么我需要用c语言先把轮子造好--各个零件&#xff0c;当我们造好之后再组装。那么c则是造好了轮子&#xff0c;只需要我们组装就好了。这里的的…

岩藻多糖-聚乙二醇-过氧化氢酶,Catalase-PEG-Fucoidan,过氧化氢酶-PEG-岩藻多糖

岩藻多糖-聚乙二醇-过氧化氢酶&#xff0c;Catalase-PEG-Fucoidan&#xff0c;过氧化氢酶-PEG-岩藻多糖 中文名称&#xff1a;岩藻多糖-过氧化氢酶 英文名称&#xff1a;Fucoidan-Catalase 别称&#xff1a;过氧化氢酶修饰岩藻多糖&#xff0c;过氧化氢酶-岩藻多糖 过氧化氢…

LiteFlow v2.9.4发布!一款能让你系统支持热更新,编排,脚本编写逻辑的国产规则引擎框架

前言 上海的天气降温让人猝不及防&#xff0c;但是我们的迭代速度却井然有序。 今天我们带来了LiteFlow v2.9.4版本。 我们每次的发布的issue有很大一部分依托于我们的使用者社区&#xff0c;社区人越来越多。我看到了使用者在使用过程中遇到的问题&#xff0c;也收集了很多…

【Java实战】这样写SQL语句性能嘎嘎好

目录 一、前言 二、SQL语句 1.【强制】不要使用 count(列名) 或 count(常量) 来替代 count(*)&#xff0c;count(*) 是 SQL92 定义的标准统计行数的语法&#xff0c;跟数据库无关&#xff0c;跟 NULL 和非 NULL 无关。 2.【强制】count(distinct col) 计算该列除 NULL 之外的…

如何实现网站首页变为黑白色?

某些时候&#xff0c;网站会根据要求将页面调成黑白色&#xff0c;一开始我还以为是将连夜把图片和文字都搞成黑白色&#xff0c;但是转念一想&#xff0c;像推送产品的京东、淘宝&#xff0c;以及展示up内容的B站、CSDN等&#xff0c;刷新之后可能展示的内容均不同&#xff0c…