MQ 延迟队列

news2025/1/11 7:45:21

MQ 延迟队列

1. 前言

延迟队列是我们日常开发过程中,经常接触并需要使用到的一种技术方案。前些时间在开发业务需求时,我也遇到了一个需要使用到延迟消息队列的需求场景,因此我也在网上调研了一系列不同的延迟队列的实现方案,在此进行了一个总结并且给大家进行分享。

2. 延迟队列定义

首先,队列这种数据结构相信大家都不陌生,它是一种先进先出的数据结构。普通队列中的元素是有序的,先进入队列中的元素会被优先取出进行消费;

延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。从某种意义上来讲,延迟队列的结构并不像一个队列,而更像是一种以时间为权重的有序堆结构。

3. 应用场景

我在开发业务需求时遇到的使用场景是这样的,用户可以在小程序中订阅不同的微信或者 QQ 的模板消息,产品同学可以在小程序的管理端新建消息推送计划,当到达指定的时间节点的时候给所有订阅模板消息的用户进行消息推送。

如果仅仅是服务单一的小程序,那也许起个定时任务,或者甚至人工的定时去执行能够最便捷最快速的去完成这项需求,但我们希望能够抽象出一个消息订阅的模块服务出来给所有业务使用,这时候就需要一种通用的系统的解决方案,这时候便需要使用到延迟队列了。

除了上述我所遇到的这样的典型的需求以外,延迟队列的应用场景其实也非常的广泛,比如说以下的场景:

  1. 新建的订单,如果用户在 15 分钟内未支付,则自动取消。
  2. 公司的会议预定系统,在会议预定成功后,会在会议开始前半小时通知所有预定该会议的用户。
  3. 安全工单超过 24 小时未处理,则自动拉企业微信群提醒相关责任人。
  4. 用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。

对于数据量比较少并且时效性要求不那么高的场景,一种比较简单的方式是轮询数据库,比如每秒轮询一下数据库中所有数据,处理所有到期的数据,比如如果我是公司内部的会议预定系统的开发者,我可能就会采用这种方案,因为整个系统的数据量必然不会很大并且会议开始前提前 30 分钟提醒与提前 29 分钟提醒的差别并不大。

但是如果需要处理的数据量比较大实时性要求比较高,比如淘宝每天的所有新建订单 15 分钟内未支付的自动超时,数量级高达百万甚至千万,这时候如果你还敢轮询数据库怕是要被你老板打死,不被老板打死估计也要被运维同学打死。

这种场景下,就需要使用到我们今天的主角 —— 延迟队列了。延迟队列为我们提供了一种高效的处理大量需要延迟消费消息的解决方案。那么话不多说,下面我们就来看一下几种常见的延迟队列的解决方案以及他们各自的优缺点。

4. 实现方案 Redis ZSet

我们知道 Redis 有一个有序集合的数据结构 ZSet,ZSet 中每个元素都有一个对应 Score,ZSet 中所有元素是按照其 Score 进行排序的。

在这里插入图片描述

那么我们可以通过以下这几个操作使用 Redis 的 ZSet 来实现一个延迟队列:

  1. 入队操作:ZADD KEY timestamp task, 我们将需要处理的任务,按其需要延迟处理时间作为 Score 加入到 ZSet 中。Redis 的 ZAdd 的时间复杂度是O(logN),N是 ZSet 中元素个数,因此我们能相对比较高效的进行入队操作。

  2. 起一个进程定时(比如每隔一秒)通过ZREANGEBYSCORE方法查询 ZSet 中 Score 最小的元素,具体操作为:ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES。查询结果有两种情况:

  • a. 查询出的分数小于等于当前时间戳,说明到这个任务需要执行的时间了,则去异步处理该任务;
  • b. 查询出的分数大于当前时间戳,由于刚刚的查询操作取出来的是分数最小的元素,所以说明 ZSet 中所有的任务都还没有到需要执行的时间,则休眠一秒后继续查询;

同样的,ZRANGEBYSCORE操作的时间复杂度为 O(logN + M) ,其中N为 ZSet 中元素个数,M为查询的元素个数,因此我们定时查询操作也是比较高效的。

这里从网上搬运了一套 Redis 实现延迟队列的后端架构,其在原来 Redis 的 ZSet 实现上进行了一系列的优化,使得整个系统更稳定、更健壮,能够应对高并发场景,并且具有更好的可扩展性,是一个挺不错的架构设计,其整体架构图如下:

在这里插入图片描述

其核心设计思路:

  1. 将延迟的消息任务通过 hash 算法路由至不同的 Redis Key 上,这样做有两大好处:

    • a. 避免了当一个 KEY 在存储了较多的延时消息后,入队操作以及查询操作速度变慢的问题(两个操作的时间复杂度均为O(logN))。
    • b. 系统具有了更好的横向可扩展性,当数据量激增时,我们可以通过增加 Redis Key 的数量来快速的扩展整个系统,来抗住数据量的增长。
  2. 每个 Redis Key 都对应建立一个处理进程,称为 Event 进程,通过上述步骤 2 中所述的 ZRANGEBYSCORE 方法轮询 Key,查询是否有待处理的延迟消息。

  3. 所有的 Event 进程只负责分发消息,具体的业务逻辑通过一个额外的消息队列异步处理,这么做的好处也是显而易见的:

    • a. 一方面,Event 进程只负责分发消息,那么其处理消息的速度就会非常快,就不太会出现因为业务逻辑复杂而导致消息堆积的情况。
    • b. 另一方面,采用一个额外的消息队列后,消息处理的可扩展性也会更好,我们可以通过增加消费者进程数量来扩展整个系统的消息处理能力。
  4. Event 进程采用 Zookeeper 选主单进程部署的方式,避免 Event 进程宕机后,Redis Key 中消息堆积的情况。一旦 Zookeeper 的 leader 主机宕机,Zookeeper 会自动选择新的 leader 主机来处理 Redis Key 中的消息。

从上述的讨论中我们可以看到,通过 Redis Zset 实现延迟队列是一种理解起来较为直观,可以快速落地的方案。并且我们可以依赖 Redis 自身的持久化来实现持久化,使用 Redis 集群来支持高并发和高可用,是一种不错的延迟队列的实现方案。

5. 实现方案 RabbitMQ

RabbitMQ 本身并不直接提供对延迟队列的支持,我们依靠 RabbitMQ 的 TTL 以及 死信队列 功能,来实现延迟队列的效果。那就让我们首先来了解一下,RabbitMQ 的死信队列以及 TTL 功能。

5.1 死信队列

死信队列实际上是一种 RabbitMQ 的消息处理机制,当 RabbmitMQ 在生产和消费消息的时候,消息遇到如下的情况,就会变成“死信”:

  1. 消息被拒绝basic.reject/ basic.nack 并且不再重新投递 requeue=false
  2. 消息超时未消费,也就是 TTL 过期了
  3. 消息队列到达最大长度

消息一旦变成一条死信,便会被重新投递到死信交换机(Dead-Letter-Exchange),然后死信交换机根据绑定规则转发到对应的死信队列上,监听该队列就可以让消息被重新消费。

5.2 消息生存时间 TTL

TTL(Time-To-Live)是 RabbitMQ 的一种高级特性,表示了一条消息的最大生存时间,单位为毫秒。如果一条消息在 TTL 设置的时间内没有被消费,那么它就会变成一条死信,进入我们上面所说的死信队列。

有两种不同的方式可以设置消息的 TTL 属性,一种方式是直接在创建队列的时候设置整个队列的 TTL 过期时间,所有进入队列的消息,都被设置成了统一的过期时间,一旦消息过期,马上就会被丢弃,进入死信队列,参考代码如下:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

在延迟队列的延迟时间为固定值的时候,比较适合使用这种方式。

另一种方式是针对单条消息设置,参考代码如下,该消息被设置了 6 秒的过期时间:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg content".getBytes());

如果需要不同的消息设置不同的延迟时间,上面针对队列的 TTL 设置便无法满足我们的需求,需要使用这种针对单个消息的 TTL 设置。

不过需要注意的是,使用这种方式设置的 TTL,消息可能不会按时死亡,因为 RabbitMQ 只会检查第一个消息是否过期。比如这种情况,第一个消息设置了 20s 的 TTL,第二个消息设置了 10s 的 TTL,那么 RabbitMQ 会等到第一个消息过期之后,才会让第二个消息过期。

解决这个问题的方法也很简单,只需要安装 RabbitMQ 的一个插件即可:

https://www.rabbitmq.com/community-plugins.html

安装好这个插件后,所有的消息就都能按照被设置的 TTL 过期了。

5.3 RabbitMQ 实现延迟队列

好了,介绍完 RabbitMQ 的死信队列以及 TTL 这两种特性之后,我们离实现延迟队列就只差一步之遥了。

聪明的读者可能已经发现了,TTL 不就是延迟队列中消息要延迟的时间么?如果我们把需要延迟的消息,将 TTL 设置为其延迟时间,投递到 RabbitMQ 的普通队列中,一直不去消费它,那么经过 TTL 的时间后,消息就会自动被投递到死信队列,这时候我们使用消费者进程实时地去消费死信队列中的消息,不就实现了延迟队列的效果。

从下图可以直观的看出使用 RabbitMQ 实现延迟队列的整体流程:

在这里插入图片描述

使用 RabbitMQ 来实现延迟队列,我们可以很好的利用一些 RabbitMQ 的特性,比如消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延迟队列不可用或者消息丢失。

6. TimeWheel

TimeWheel 时间轮算法,是一种实现延迟队列的巧妙且高效的算法,被应用在 Netty,Zookeeper,Kafka 等各种框架中。

6.1 时间轮

在这里插入图片描述

如上图所示,时间轮是一个存储延迟消息的环形队列,其底层采用数组实现,可以高效循环遍历。这个环形队列中的每个元素对应一个延迟任务列表,这个列表是一个双向环形链表,链表中每一项都代表一个需要执行的延迟任务。

时间轮会有表盘指针,表示时间轮当前所指时间,随着时间推移,该指针会不断前进,并处理对应位置上的延迟任务列表。

6.2 添加延迟任务

由于时间轮的大小固定,并且时间轮中每个元素都是一个双向环形链表,我们可以在 O(1) 的时间复杂度下向时间轮中添加延迟任务。

如下图,例如我们有一个这样的时间轮,在表盘指针指向当前时间为 2 时,我们需要新添加一个延迟 3 秒的任务,我们可以快速计算出延迟任务在时间轮中所对应的位置为 5,并添加到位置 5 上任务列表尾部。

在这里插入图片描述

6.3 多层时间轮

到现在为止一切都非常棒,但是细心的同学可能发现了,上面的时间轮的大小是固定的,只有 12 秒。如果此时我们有一个需要延迟 200 秒的任务,我们应该怎么处理呢?直接扩充整个时间轮的大小吗?这显然不可取,因为这样做的话我们就需要维护一个非常非常大的时间轮,内存是不可接受的,而且底层数组大了之后寻址效率也会降低,影响性能。

为此,Kafka 引入了多层时间轮的概念。其实多层时间轮的概念和我们的机械表上时针、分针、秒针的概念非常类似,当仅使用秒针无法表示当前时间时,就使用分针结合秒针一起表示。同样的,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中,如下图所示:

在这里插入图片描述

第一层时间轮整个时间轮所表示时间范围是 0-12 秒,第二层时间轮每格能表示的时间范围是整个第一层时间轮所表示的范围也就是 12 秒,所以整个第二层时间轮能表示的时间范围即 12*12=144 秒,依次类推第三层时间轮能表示的范围是 1728 秒,第四层为 20736 秒等等。

比如现在我们需要添加一个延时为 200 秒的延迟消息,我们发现其已经超过了第一层时间轮能表示的时间范围,我们就需要继续往上层时间轮看,将其添加在第二层时间轮 200/12 = 17 的位置,然后我们发现 17 也超过了第二次时间轮的表示范围,那么我们就需要继续往上层看,将其添加在第三层时间轮的 17/12 = 2 的位置。

Kafka 中时间轮算法添加延迟任务以及推动时间轮滚动的核心流程如下,其中 Bucket 即时间轮中的延迟任务队列,并且 Kafka 引入的 DelayQueue 解决了多数 Bucket 为空导致的时间轮滚动效率低下的问题:

在这里插入图片描述

使用时间轮实现的延迟队列,能够支持大量任务的高效触发。并且在 Kafka 的时间轮算法的实现方案中,还引入了 DelayQueue,使用 DelayQueue 来推送时间轮滚动,而延迟任务的添加与删除操作都放在时间轮中,这样的设计大幅提升了整个延迟队列的执行效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1522831.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计算机网络----计算机网络的基础

目录 一.计算机网络的相关概念 二.计算机网络的功能 三.计算机网络的发展 四.计算机网络的组成 五.计算机网络的分类 六.计算机的性能指标 1.速率 2.带宽 3.吞吐量 4.时延 5.时延带宽积 6.往返时延RTT 7.利用率 七.计算机的分层结构 八.ISO/OSI参考模型 九.OSI…

Word粘贴时出现“运行时错误53,文件未找到:MathPage.WLL“的解决方案

在安装完MathType后&#xff0c;打开word复制粘贴时报错“运行时错误53,文件未找到&#xff1a;MathPage.WLL” 首先确定自己电脑的位数&#xff08;这里默认32位&#xff09; 右击MathType桌面图标&#xff0c;点击“打开文件所在位置”&#xff0c; 然后分别找到MathPage.W…

RabbitMQ高级-高级特性

1.消息可靠性传递 在使用RabbitMQ的时候&#xff0c;作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式来控制消息的投递可靠性模式 1.confirm 确认模式 确认模式是由exchange决定的 2.return 退回模式 回退模式是由routing…

Webapi(.net6) 批量服务注册

如果不考虑第三方库&#xff0c;如Autofac这种进行服务注入&#xff0c;通过本身的.Core Weabpi实现的&#xff0c;总结了两种实现方法&#xff0c; 1.一种是参考abp框架里面的形式; 1.1 新建个生命周期的文件夹: 三个接口分别为: public interface IScopedDependency { }pu…

机器学习周报第33周

目录 摘要Abstract一、文献阅读1.1 论文标题1.2 论文摘要1.3 论文背景1.4 过去研究1.5 论文介绍1.5.1 论文模型1.5.2 时空交互学习模块&#xff08;Spatiotemporal Interactive Learning Module&#xff09;1.5.3 动态图推理模块&#xff08;Dynamic Graph Inference Module&am…

Uniapp有奖猜歌游戏系统源码,附带流量主

有奖猜歌游戏是一款基于uni-app、uniCloud、uniAD 开发的小游戏&#xff0c;通过猜歌曲、观看广告赚取现金奖励。 游戏基本特征 玩家可以通过猜歌、做任务等方式直接获取现金奖励 玩家可以通过猜歌、拆红包、做任务等方式获取金币奖励&#xff0c;当金币累积到一定数量可以兑…

C++之类和对象(3)

目录 1. 再谈构造函数 1.1 构造函数体赋值 1.2 初始化列表 1.3 explicit 2. static成员 2.1 概念 3. 友元 3.1 友元函数 3.2 友元类 4. 内部类 5. 匿名对象 6. 拷贝对象时编译器做出的优化 1. 再谈构造函数 1.1 构造函数体赋值 class Date { public:Date(int year2024…

实现界面跳转及注册界面编写(AndroidStudio)

目录 一、代码 二、最后效果 一、代码 1.先新建一个activity文件 2.注册界面的代码如下&#xff1a; <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"android:la…

(附数据集)基于lora参数微调Qwen1.8chat模型的实战教程

基于lora微调Qwen1.8chat的实战教程 日期&#xff1a;2024-3-16作者&#xff1a;小知运行环境&#xff1a;jupyterLab描述&#xff1a;基于lora参数微调Qwen1.8chat模型。 样例数据集 - qwen_chat.json&#xff08;小份数据&#xff09; - chat.json&#xff08;中份数据&…

Tuxera NTFS 2023安装使用教程 Tuxera NTFS破解版 Tuxera NTFS for Mac优惠

对于必须在Windows电脑和Mac电脑之间来回切换的Mac朋友来说&#xff0c;跨平台不兼容一直是一个巨大的障碍&#xff0c;尤其是当我们需要使用NTFS格式的硬盘在Windows和macOS之间共享文件时。因为Mac默认不支持写入NTFS磁盘。 为了解决这一问题&#xff0c;很多朋友会选择很便捷…

vscode插件开发-发布插件

安装vsce vsce是“Visual Studio Code Extensions”的缩写&#xff0c;是一个用于打包、发布和管理VS Code扩展的命令行工具。 确保您安装了Node.js。然后运行&#xff1a; npm install -g vscode/vsce 您可以使用vsce轻松打包和发布扩展&#xff1a; // 打包插件生成name…

RansomwareSim:一款功能强大的勒索软件模拟研究学习工具

关于RansomwareSim RansomwareSim是一款功能强大的勒索软件模拟研究学习工具&#xff0c;该工具是为网络安全教育和培训目的开发的模拟勒索软件应用程序&#xff0c;它旨在为广大研究人员演示勒索软件如何加密系统上的文件并与命令和控制服务器通信&#xff0c;以更好地了解勒…

“一键解锁复古魅力:底片效果瞬间生成!“

时光荏苒&#xff0c;岁月如梭。你是否曾怀念那些旧时光里&#xff0c;老照片所散发出的独特韵味&#xff1f;那种历经岁月沉淀的底片效果&#xff0c;仿佛能带我们回到那些被遗忘的角落&#xff0c;重温那些温馨的瞬间。 首先第一步&#xff0c;我们要进入视频剪辑高手&#…

java数据结构与算法刷题-----LeetCode376. 摆动序列

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 1. 贪心2. 动态规划3. 优化版动态规划 1. 贪心 解题思路&#x…

【强化学习笔记一】初识强化学习(定义、应用、分类、性能指标、小车上山案例及代码)

文章目录 第1章 初识强化学习1.1 强化学习及其关键元素1.2 强化学习的应用1.3 强化学习的分类1.3.1 按任务分类1.3.2 按算法分类 1.4 强化学习算法的性能指标1.5 案例&#xff1a;基于Gym库的智能体/环境接口1.5.1 安装Gym库1.5.2 使用Gym库1.5.3 小车上山1.5.3.1 有限动作空间…

软考80-上午题-【面向对象技术3-设计模式】-结构型设计模式03

一、外观模式 1-1、意图 为子系统中的一组接口提供一个一致的界面。 Facade 模式定义了一个高层接口&#xff0c;这个接口使得这一子系统更加容易使用。 1-2、结构 Facade 知道哪些子系统类负责处理请求&#xff1a;将客户的请求代理给适当的子系统对象。Subsvstem classes …

Mock.js了解(Mock就是模拟一个后端,Postman模拟前端)

JSON5 Node.js Vue CLI与Mock.js Jquery与Mock.js Mock与分页

Linux - 线程互斥和互斥锁

文章目录 前言一、为什么要线程互斥原子性 二、互斥锁互斥锁的创建与销毁互斥锁进行互斥 前言 前几节课&#xff0c;我们学习了多线程的基础概念&#xff0c;这节课&#xff0c;我们来对线程互斥和互斥锁的内容进行学习。 一、为什么要线程互斥 首先我们要明白&#xff0c;对…

openGauss学习笔记-244 openGauss性能调优-SQL调优-典型SQL调优点-统计信息调优

文章目录 openGauss学习笔记-244 openGauss性能调优-SQL调优-典型SQL调优点-统计信息调优244.1 统计信息调优244.1.1 统计信息调优介绍244.1.2 实例分析&#xff1a;未收集统计信息导致查询性能差 openGauss学习笔记-244 openGauss性能调优-SQL调优-典型SQL调优点-统计信息调优…

JVM学习-底层字节码的执行过程

目录 1.一个简单的程序分析 2. a&#xff0c;a&#xff0c;a--在JVM中的执行过程 3. 一个好玩的xx 4.方法调用的字节码分析、多态的实现、对象头 5. try-catch-finally的字节码分析 5.1 try-catch 5.2 try-catch-finally 5.3特殊情况 5.3.1 try和finally块中都出现了re…