RocketMQ 基于时间轮算法实现指定时间点的定时消息原理解析

news2025/1/16 3:01:45

在 RocketMQ 4.x 版本,使用延时消息来实现消息的定时消费。延时消息可以一定程度上实现定时发送,但是有一些局限。

RocketMQ 新版本基于时间轮算法引入了定时消息,目前,精确到秒级的定时消息实现的 pr 已经提交到社区,今天来介绍一下。

1 延时消息

1.1 简介

RocketMQ 的延时消息是指 Producer 发送消息后,Consumer 不会立即消费,而是需要等待固定的时间才能消费。在一些场景下,延时消息是很有用的,比如电商场景下关闭 30 分钟内未支付的订单。

使用延时消息非常简单,只需要给消息的 delayTimeLevel 属性赋值就可以。参考下面代码:

Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
//第 3 个级别,10s
message.setDelayTimeLevel(3);
producer.send(message);

延时消息有 18 个级别,如下:

org.apache.rocketmq.common.message.Message messageExt = this.sendMessageActivity.buildMessage(null,
 Lists.newArrayList(
  Message.newBuilder()
   .setTopic(Resource.newBuilder()
    .setName(TOPIC)
    .build())
   .setSystemProperties(SystemProperties.newBuilder()
    .setMessageId(msgId)
    .setQueueId(0)
    .setMessageType(MessageType.DELAY)
    .setDeliveryTimestamp(Timestamps.fromMillis(deliveryTime))
    //定义消息投递时间
    .setBornTimestamp(Timestamps.fromMillis(System.currentTimeMillis()))
    .setBornHost(StringUtils.defaultString(RemotingUtil.getLocalAddress(), "127.0.0.1:1234"))
    .build())
   .setBody(ByteString.copyFromUtf8("123"))
   .build()
 ),
Resource.newBuilder().setName(TOPIC).build()).get(0);
1.2 实现原理

延时消息的实现原理如下图:

Producer 把消息发送到 Broker 后,Broker 判断到是延时消息,首先会把消息投递到延时队列(Topic = SCHEDULE_TOPIC_XXXX,queueId = delayTimeLevel - 1)。定时任务线程池会有 18 个线程来对延时队列进行调度,每个线程调度一个延时级别,调度任务把延时消息再投递到原始队列,这样 Consumer 就可以拉取到了。

1.3 存在不足

延时消息存在着一些不足:

  1. 延时级别只有 18 个,并不能满足所有场景;

  2. 如果通过修改 messageDelayLevel 配置来自定义延时级别,并不灵活,比如一个在大规模的平台上,延时级别成百上千,而且随时可能增加新的延时时间;

  3. 延时时间不准确,后台的定时线程可能会因为处理消息量大导致延时误差大。

2 定时消息

为了弥补延时消息的不足,RocketMQ 5.0 引入了定时消息。

2.1 时间轮算法

为了解决定时任务队列遍历任务导致的性能开销,RocketMQ 定时消息引入了秒级的时间轮算法。如下图:

图中是一个 60s 的时间轮,时间轮上会有一个指向当前时间的指针定时地移动到下一个时间(秒级)。

时间轮算法的优势是不用去遍历所有的任务,每一个时间节点上的任务用链表串起来,当时间轮上的指针移动到当前的时间时,这个时间节点上的全部任务都执行。

虽然上面只是一个 60s 的时间轮,但是对于所有的时间延时,都是支持的。可以在每个时间节点增加一个 round 字段,记录时间轮转动的圈数,比如对于延时 130s 的任务,round 就是 2,放在第 10 个时间刻度的链表中。这样当时间轮转到一个节点,执行节点上的任务时,首先判断 round 是否等于 0,如果等于 0,则把这个任务从任务链表中移出交给异步线程执行,否则将 round 减 1 继续检查后面的任务。

2.2 使用方式

基于时间轮算法的思想,RocketMQ 实现了精准的定时消息。使用 RocketMQ 定时消息时,客户端定义消息的示例代码如下:

protected void fillDelayMessageProperty(apache.rocketmq.v2.Message message, org.apache.rocketmq.common.message.Message messageWithHeader) {
 if (message.getSystemProperties().hasDeliveryTimestamp()) {
  Timestamp deliveryTimestamp = message.getSystemProperties().getDeliveryTimestamp();
  //delayTime 这个延时时间默认不能超过 1 天,可以配置
  long deliveryTimestampMs = Timestamps.toMillis(deliveryTimestamp);
  validateDelayTime(deliveryTimestampMs);
        //...
  String timestampString = String.valueOf(deliveryTimestampMs);
  //MessageConst.PROPERTY_TIMER_DELIVER_MS="TIMER_DELIVER_MS"
  MessageAccessor.putProperty(messageWithHeader, MessageConst.PROPERTY_TIMER_DELIVER_MS, timestampString);
 }
}
2.3 实现原理
2.3.1 消息投递

上面的代码构中,Producer 创建消息时给消息传了一个系统属性 deliveryTimestamp,这个属性指定了消息投递的时间,并且封装到消息的 TIMER_DELIVER_MS 属性,代码如下:

//TimerMessageStore类
ByteBuffer tmpBuffer = timerLogBuffer;
tmpBuffer.clear();
tmpBuffer.putInt(TimerLog.UNIT_SIZE); //size
tmpBuffer.putLong(slot.lastPos); //prev pos
tmpBuffer.putInt(magic); //magic
tmpBuffer.putLong(tmpWriteTimeMs); //currWriteTime
tmpBuffer.putInt((int) (delayedTime - tmpWriteTimeMs)); //delayTime
tmpBuffer.putLong(offsetPy); //offset
tmpBuffer.putInt(sizePy); //size
tmpBuffer.putInt(hashTopicForMetrics(realTopic)); //hashcode of real topic
tmpBuffer.putLong(0); //reserved value, just set to 0 now
long ret = timerLog.append(tmpBuffer.array(), 0, TimerLog.UNIT_SIZE);
if (-1 != ret) {
 // If it's a delete message, then slot's total num -1
 // TODO: check if the delete msg is in the same slot with "the msg to be deleted".
 timerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret : slot.firstPos, ret,
  isDelete ? slot.num - 1 : slot.num + 1, slot.magic);
 
}

Broker 收到这个消息后,如果判断到 TIMER_DELIVER_MS 这个属性有值,就会把这个消息投递到 Topic 是 rmq_sys_wheel_timer 的队列中,queueId 是 0,同时会保存原始消息的 Topic、queueId、投递时间(TIMER_OUT_MS)。

TimerMessageStore 中有个定时任务 TimerEnqueueGetService 会从 rmq_sys_wheel_timer 这个 Topic 中读取消息,然后封装 TimerRequest 请求并放到队列 enqueuePutQueue。

2.3.2 绑定时间轮

RocketMQ 使用 TimerLog 来保存消息的原始数据绑定到时间轮上。首先看一下 TimerLog 保存的数据结构,如下图:

参考下面代码:

//类 TimerWheel
public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic) {
 localBuffer.get().position(getSlotIndex(timeMs) * Slot.SIZE);
 localBuffer.get().putLong(timeMs / precisionMs);
 localBuffer.get().putLong(firstPos);
 localBuffer.get().putLong(lastPos);
 localBuffer.get().putInt(num);
 localBuffer.get().putInt(magic);
}

TimerEnqueuePutService 这个定时任务从上面的 enqueuePutQueue(2.3.1 节) 取出 TimerRequest 然后封装成  TimerLog。

那时间轮是怎么跟 TimerLog 关联起来的呢?RocketMQ 使用 TimerWheel 来描述时间轮,TimerWheel 中每一个时间节点是一个 Slot,Slot 保存了这个延时时间的 TimerLog 信息。数据结构如下图:

参考下面代码:

//类 TimerWheel
public void putSlot(long timeMs, long firstPos, long lastPos, int num, int magic) {
 localBuffer.get().position(getSlotIndex(timeMs) * Slot.SIZE);
 localBuffer.get().putLong(timeMs / precisionMs);
 localBuffer.get().putLong(firstPos);
 localBuffer.get().putLong(lastPos);
 localBuffer.get().putInt(num);
 localBuffer.get().putInt(magic);
}

这样时间轮跟 TimerLog 就关联起来了,见下图:

如果时间轮的一个时间节点(Slot)上有一条新的消息到来,那只要新建一个 TimerLog,然后把它的指针指向该时间节点的最后一个 TimerLog,然后把 Slot 的 lastPos 属性指向新建的这个 TimerLog,如下图:

从源码上看,RocketMQ 定义了一个 7 天的以秒为单位的时间轮。

2.3.3 时间轮转动

转动时间轮时,TimerDequeueGetService 这个定时任务从当前时间节点(Slot)对应的 TimerLog 中取出数据,封装成 TimerRequest 放入 dequeueGetQueue 队列。

2.3.4 CommitLog 中读取消息

定时任务 TimerDequeueGetMessageService 从队列 dequeueGetQueue 中拉取 TimerRequest 请求,然后根据 TimerRequest 中的参数去 CommitLog(MessageExt) 中查找消息,查出后把消息封装到 TimerRequest 中,然后把 TimerRequest 写入 dequeuePutQueue 这个队列。

2.3.5 写入原队列

定时任务 TimerDequeuePutMessageService 从 dequeuePutQueue 队列中获取消息,把消息转换成原始消息,投入到原始队列中,这样消费者就可以拉取到了。

3 总结

RocketMQ 4.x 版本只支持延时消息,有一些局限性。而 RocketMQ 新版本引入了定时消息,弥补了延时消息的不足。定时消息的处理流程如下图:

可以看到,RocketMQ 的定时消息的实现还是有一定复杂度的,这里用到 5 个定时任务和 3 个队列来实现。

最后,对于定时时间的定义,客户端、Broker 和时间轮的默认最大延时时间定义是不同的,使用的时候需要注意。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1065127.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

unity中绑定动画的行为系统

主要代码逻辑是创建一个action队列,当动画播放结束时就移除队头,执行后面的事件 public class Enemy : MonoBehaviour {public event Action E_AnimatorFin;//当动画播放完毕时public Action DefaultAction;//默认事件public Dictionary<Action, string> EventAnimator n…

数据科学最佳实践:Kedro 的工程化解决方案 | 开源日报 No.47

leonardomso/33-js-concepts Stars: 58.4k License: MIT 这个项目是一个帮助开发者掌握 JavaScript 概念的资源库。该项目基于 Stephen Curtis 撰写的一篇文章&#xff0c;包含了对 33 个重要 JavaScript 概念全面深入地讲解&#xff0c;并被 GitHub 评为 2018 年最佳开源项目…

Python Random模块详解

Random模块详解 随机数 random模块 randint(a, b) 返回[a, b]之间的整数randrange ([start,] stop [,step]) 从指定范围内&#xff0c;按指定基数递增的集合中获取一个随机数&#xff0c;基数 缺省值为1。random.randrange(1,7,2)choice(seq) 从非空序列的元素中随机挑选一个…

驱动器类产品的接口EMC拓扑方案

驱动器类产品的接口EMC拓扑方案 1. 概述 本文以高压伺服驱动器和变频器类产品为例&#xff0c;对常用端口滤波拓扑方案进行总结&#xff0c;后续根据不同的应用场景可进行适当删减&#xff0c;希望对大家有帮助。 2. 驱动器验证等级 本文推荐拓扑的实验结果&#xff0c;满足…

Ps:选择并遮住

选择并遮住 Select and Mask主要用来提高选区边缘的品质&#xff0c;尤其在毛发等复杂边缘的抠图上发挥强大的作用。 Ps菜单&#xff1a;选择/选择并遮住 Select and Mask 快捷键&#xff1a;Ctrl Alt R 在所有选区工具的工具选项栏上以及图层蒙版的属性面板中都可以看到“选…

NAT+ACL+mstp小综合

三、实验一相关知识点 1&#xff0c;实验&#xff1a;NAT 综合实验 2&#xff0c;拓扑&#xff1a; 3&#xff0c;需求: 1&#xff09;&#xff0c;实现VLAN20 的除了20这台主机以外所有主机上网访问外网 2&#xff09;&#xff0c;实现VLAN30 的主机为奇数电脑上网 3&#…

详解C语言—预处理

目录 1、预处理 (1)预定义符号介绍 (2)预处理指令 #define #define 定义标识符&#xff1a; #define 定义宏&#xff1a; #define 替换规则 (3)预处理操作符# (4)预处理操作符## (5)带副作用的宏参数 (6)宏和函数对比 2、命名约定 3、预处理指令 #undef 4、命令行定…

用 Pytorch 自己构建一个Transformer

一、说明 用pytorch自己构建一个transformer并不是难事,本篇使用pytorch随机生成五千个32位数的词向量做为源语言词表,再生成五千个32位数的词向量做为目标语言词表,让它们模拟翻译过程,transformer全部用pytorch实现,具备一定实战意义。 二、论文和概要 …

mac连接easyconnnect显示“本地环境出现异常”

mac连接easyconnnect显示“本地环境出现异常” 解决方法&#xff1a; 终端下输入&#xff1a;vim ~/.zprofile文件内加入如下内容&#xff0c;如下图&#xff1a; ####解决连接easyconnnect显示“本地环境出现异常问题 function EC_start(){/Applications/EasyConnect.app/Co…

学信息系统项目管理师第4版系列19_质量管理

1. 公差 1.1. 质量测量中公差是测量指标的可允许变动范围&#xff0c;而不是实际测量值与预期值的差 1.1.1. 【高22下选35】 1.2. 结果的的可接受范围 2. 控制界限 2.1. 统计意义上稳定的过程或过程绩效的普通偏差的边界 3. 3版 3.1. 质量控制新七工具 3.1.1. 【高19下…

cpp primer笔记070-算法函数

accumulate的第三个参数的类型决定了函数中使用哪个加法运算符以及返回值的类型&#xff0c;如果返回值是自定义类型&#xff0c;需要使用accumlate&#xff0c;则需要重载运算符&#xff0c;该接口的第三个参数返回的是一个需要处理的数据类型的一个变量。 std::vector<std…

蓝桥等考Python组别十四级001

第一部分&#xff1a;选择题 1、Python L14 &#xff08;15分&#xff09; 运行下面程序&#xff0c;输出的结果是&#xff08; &#xff09;。 d {A: 501, B: 602, C: 703, D: 804} print(d[B]) 501602703804 正确答案&#xff1a;B 2、Python L14 &#xff08;15分…

吃鸡高手必备工具大揭秘!提高战斗力,分享干货,一站满足!

大家好&#xff01;你是否想提高吃鸡游戏的战斗力&#xff0c;分享顶级的游戏作战干货&#xff0c;方便进行吃鸡作图和查询装备皮肤库存&#xff1f;是否也担心被骗&#xff0c;希望查询游戏账号是否在黑名单上&#xff0c;或者查询失信人和VAC封禁情况&#xff1f;在这段视频中…

System Generator学习——使用 AXI 接口和 IP 集成器

文章目录 前言一、目标二、步骤1、检查 AXI 接口2、使用 System Generator IP 创建一个 Vivado 项目3、创建 IP 集成设计&#xff08;IPI&#xff09;4、实现设计 总结 前言 在本节中&#xff0c;将学习如何使用 System Generator 实现 AXI 接口。将以 IP 目录格式保存设计&am…

「专题速递」回声消除算法、低功耗音频、座舱音频系统、智能音频技术、低延时音效算法、手机外放增强算法...

随着多媒体和通信网络技术的持续升级&#xff0c;以及新型音视频应用场景的不断涌现&#xff0c;音频处理技术正朝着更加智能化和沉浸化的方向迅猛发展。人们对音频听觉体验的要求也逐渐提高&#xff0c;无论是在何种场景下&#xff0c;都期望获得更加清晰的声音&#xff0c;并…

吃鸡高手必备!这些技巧帮你提高战斗力!

大家好&#xff01;作为一名吃鸡玩家&#xff0c;我们都想提高自己的战斗力&#xff0c;享受顶级游戏作战干货&#xff0c;装备皮肤库存展示和查询&#xff0c;并避免被骗游戏账号。在这里&#xff0c;我将为大家介绍一些实用的技巧和工具&#xff0c;让你成为吃鸡高手&#xf…

三相逆变器下垂控制双机

下垂控制的原理推荐看这篇知乎&#xff08;形象又生动&#xff09;&#xff1a;https://www.zhihu.com/question/41003509/answer/518837491 主拓扑图 控制主要模块 Droop子模块 监控有功结果 1、从两台逆变器输出的有功功率波形可以看到&#xff0c;在负载突变的时候&#xf…

卷积网络的发展历史-AlexNet

简介 2012 年&#xff0c;Krizhevsky 与 Hinton 推出了 AlexNet&#xff0c;引起了许多学者对深度学习的研究&#xff0c;可以算是深度学习的热潮的起始标志。在图像分类领域不得不提的就是ImageNet大规模视觉挑战赛(ILSVRC)&#xff0c;它被称为深度学习在图像分类任务研究方…

《Spring框架原理》

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

杂记 define,typedef,static,memset,ifndef,递归,逻辑与,整型提升,算术转换

目录 常量&#xff0c;define typedef static ​编辑​编辑 #define定义常量和宏 指针 ​编辑 操作系统&#xff0c;网络 system执行系统命令 memset ifndef 递归 冒泡排序 单目操作符 逻辑与&& 隐式类型转换 整型提升 算术转换 有符号无符号所占的…