消息中间件之RocketMQ源码分析(二十八)

news2024/11/15 19:55:38

延迟消息存储机制

概述

什么是延迟消息呢?延迟消息也叫定时消息,一般地,生产者在发送消息后,消费者希望在指定的一段时间后再消费。常规做法是,把信息存储在数据库中,使用定时任务扫描,符合条件的数据再发送给消费者。典型的业务场景春节买票30分钟内完成订单支付。
RocketMQ延迟消息是通过ScheduleMessageService类实现的

核心属性

  • SCHEDULE_TOPIC:一个系统内置的Topic,用来保存所有定时消息。RocketMQ全部未执行的延迟消息保存在这个内部Topic中(现如今保存在TopicValidator中)
    在这里插入图片描述
  • FIRST_DELAY_TIME:第一次执行定时任务的延迟时间,默认为1000ms
    在这里插入图片描述
  • DELAY_FOR_A_WHILE:第二次及以后的定时任务检查间隔时间,默认为100ms
  • DELAY_FOR_A_PERIOD:如果延迟消息到时间投递时却失败了,会在DELAY_FOR_A_PERIOD中设置的ms后重新尝试投递,默认为10 000ms
  • delayLevelTable:保存延迟队列和延迟时间的映射关系
  • offsetTable:保存延迟级别及相应的消费位点
  • timer:用于执行定时任务,线程名叫ScheduleMessageTImerThread
    在这里插入图片描述
    在这里插入图片描述

核心方法

  • queueId2DelayLevel():将queueid转化为延迟级别
  • delayLevel2QueueId():将延迟级别转化为queueId

一个延迟级别保存在一个Queue中,延迟级别和Queue之间的转化关系为
queueId = delayLevel -1
在这里插入图片描述

  • updateOffset():更新延迟消息的Topic的消费位点
    在这里插入图片描述

  • computeDeliverTimestamp():根据延迟级别和消息的存储时间计算该延迟消息的投递时间
    在这里插入图片描述

  • start():启动延迟消息服务。启动第一次延迟消息投递的检查定时任务和持久化消费位点的定时任务
    在这里插入图片描述

  • shutdown():关闭start()方法中启动的timer任务
    在这里插入图片描述

  • load():加载延迟消息的消费位点信息和全部延迟级别信息,延迟级别可以通过messageDelayLevel字段进行设置,默认1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    在这里插入图片描述

  • parseDelayLevel();格式化所有延迟级别信息,并保存到内存中
    在这里插入图片描述

  • DeliverDelayedMessageTimerTask内部类用于检查延迟消息是否可以投递,DeliverDelayedMessageTImerTask是TimerTask的一个扩展实现
    在这里插入图片描述

延迟消息存储机制

在延迟消息的发送流程中,消息体中会设置一个delayTimeLevel,其他发送流程也是如此。Broker在接收延迟消息时会有几个地方单独处理再存储,其余过程和普通消息存储一致.

延迟消息在保存到CommitLog中的单独处理。CommitLog.putMessage()/asyncPutMessage
方法存储延迟消息的实现逻辑如图
在这里插入图片描述

  • msg.getDelayTimeLevel()是发送消息时可以设置的延迟级别,如果该值大于0,则表示当前处理的消息是一个延迟消息,将对该消息做如下修改:
    1.将原始Topic、queueId备份在消息的扩展字段中,全部的延迟消息都保存在SCHEDULE_TOPIC的Topic中
    2.备份原始Topic、queueId为延迟消息的Topic、queueId。备份的目的是当消息到达投递时间时会恢复原始的Topic和queueId,继而被消费者拉取并消费
    在这里插入图片描述
  • 经过处理后,该消息会被正常保存到CommitLog中,然后创建ConsumeQueue和IndexFile两个索引。在创建ConsumeQueue时,从CommitLog中获取的消息内容会单独进行处理,单独处理的逻辑方法是CommitLog.checkMessageAndReturnSize().
    有一个很精巧的设计:在CommitLog中查询出消息后,调用computeDeliverTimestamp()方法计算消息具体的投递时间,再将该时间保存在ConsumeQueue的tagCode中。
    这样设计的好处是,不需要检查CommitLog大文件,在定时任务检查消息是否需要投递时,只需要检查ConsumeQueue中的tagCode(不再是Tag的Hash值,而是消息可以投递的时间,单位是ms),如果满足条件再通过查询CommitLog将消息投递出去即可,如果每次都查询CommitLog,那么可想而知,效率会很低

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1486978.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

js字符串转json的3种方法

1.eval方式解析 function strToJson(str){var json eval("(" str ")");return json;}console.log(strToJson("{int:1, string:demo}")); 运行截图: 注: 记得别忘了str两旁的小括号。 永远不要使用 eval !!! eval() 是一…

最短路径(2.19)

目录 1.网络延迟时间 弗洛伊德算法 迪杰斯特拉算法 2. K 站中转内最便宜的航班 3.从第一个节点出发到最后一个节点的受限路径数 4.到达目的地的方案数 1.网络延迟时间 有 n 个网络节点,标记为 1 到 n。 给你一个列表 times,表示信号经过 有向 边的…

3、JavaWeb-Ajax/Axios-前端工程化-Element

P34 Ajax介绍 Ajax:Asynchroous JavaScript And XML,异步的JS和XML JS网页动作,XML一种标记语言,存储数据, 作用: 数据交换:通过Ajax给服务器发送请求, 并获取服务器响应的数据 异步交互&am…

C++ //练习 10.24 给定一个string,使用bind和check_size在一个int的vector中查找第一个大于string长度的值。

C Primer(第5版) 练习 10.24 练习 10.24 给定一个string,使用bind和check_size在一个int的vector中查找第一个大于string长度的值。。 环境:Linux Ubuntu(云服务器) 工具:vim 代码块 /*****…

VsCode配置PCL、Open3D自动补全

写在前面 本文内容 在VsCode上开发PCL、Open3D相关代码,代码自动补全 Open3D、PCL的安装使用见各个版本的Open3D、PCL的编译、使用教程 平台/环境 windows11(windows10): visual studio 2022;cmake 3.22; VsCode 通过cmake构建项目; 转载请…

基于Springboot免费搭载轻量级阿里云OSS数据存储库(将本地文本、照片、视频、音频等上传云服务保存)

一、注册阿里云账户 打开https://www.aliyun.com/,申请阿里云账户并完成实名认证(个人)。这种情况就是完成了: 二、开通OSS服务 点击立即开通即可。 三、创建Bucket 申请id和secert: 进去创建一个Accesskey就会出现以…

【软考】数据库的三级模式

目录 一、概念1.1 说明1.2 数据库系统体系结构图 二、外模式三、概念模式四、内模式 一、概念 1.1 说明 1.数据的存储结构各不相同,但体系结构基本上具有相同的特征,采用三级模式和两级镜像 2.数据库系统设计员可以在视图层、逻辑层和物理层对数据进行抽…

C语言:预处理

C语言:预处理 预定义符号#define定义常量定义宏宏与函数对比 #操作符##操作符条件编译头文件包含库文件包含本地文件包含嵌套文件包含 预定义符号 C语⾔设置了⼀些预定义符号,可以直接使⽤,预定义符号也是在预处理期间处理的。 __FILE__ //…

惠普GT5810打印机报错E9的处理方法

当打印机检测到供墨系统需要维护时,将会出现 E9 错误。 吴中函 打印出的带错误的供墨系统维护页包含解决该错误的说明。 出现 E9 警告时维持 HP Ink Tank 打印机的打印质量,出现 E9 警告时如何维持 HP Ink Tank 打印机的打印质量。 惠普5810报错E9通常…

UE 打包窗口及鼠标状态设置

UE 打包窗口及鼠标状态设置 打包后鼠标不锁定 显示鼠标图标 打包后设置窗口模式 找到打包路径下的配置文件GameUserSettings,设置相关项目 FullscreenMode0表示全屏模式,1表示窗口全屏模式,2表示窗口模式

Spring的Bean的生命周期 | 有图有案例

Spring的Bean的生命周期 Spring的Bean的生命周期整体过程实例化初始化服务销毁循环依赖问题 完整生命周期演示 Spring的Bean的生命周期 Spring Bean的生命周期:从Bean的实例化之后,通过反射创建出对象之后,到Bean称为一个完整的对象&#xf…

位运算---求n的二进制表示中第k位是1还是0 (lowbit)

操作: 先把第k位移到最后一位(右边第一位) 看个位是1还是0 lowbit(x):返回x的最右边的1。 原理: 其中 ,意思是 是 的补码。 就可以求出最右边的一位1。 应用: 当中 的个数。 int re…

案例介绍:汽车维修系统的信息抽取技术与数据治理应用(开源)

一、引言 在当今汽车产业的快速发展中,软件已经成为提升车辆性能、安全性和用户体验的关键因素。从车载操作系统到智能驾驶辅助系统,软件技术的进步正在重塑我们对汽车的传统认知。我有幸参与了一个创新项目,该项目专注于开发和集成先进的汽…

分布式ID生成算法|雪花算法 Snowflake | Go实现

写在前面 在分布式领域中,不可避免的需要生成一个全局唯一ID。而在近几年的发展中有许多分布式ID生成算法,比较经典的就是 Twitter 的雪花算法(Snowflake Algorithm)。当然国内也有美团的基于snowflake改进的Leaf算法。那么今天我们就来介绍一下雪花算法…

2024年智能驾驶年度策略:自动驾驶开始由创造型行业转向工程型行业

感知模块技术路径已趋于收敛,自动驾驶从创造型行业迈向工程型行业。在特斯拉的引领下,国内主机厂2022年以来纷纷跟随特斯拉相继提出“重感知、轻地图”技术方案,全球自动驾驶行业感知模块技术路径从百花齐放开始走向收敛。我们认为主机厂智能…

光学遥感卫星分辨率的奥秘 !!

文章目录 前言 1、光学遥感卫星分辨率的多维视角 (1)空间分辨率 (2)光谱分辨率 (3)辐射分辨率 (4)时间分辨率 2、光学遥感分辨率的重要性 3、遥感分辨率的挑战与进步 4、未来展望 总…

Git推送本地仓库至阿里云仓库

Git推送本地仓库至阿里云仓库 1.安装Git 参考Git安装详解 2.生成 SSH 密钥 基于RSA算法SSH 密钥 1.管理员权限运行Git Bash 2.输入生成密钥指令点击回车,选择 SSH 密钥生成路径。 $ ssh-keygen -t rsa -C "2267521563qq.com"3.以 RSA算法为例&…

ABAP - SALV教程17 弹窗ALV

SALV可以通过弹窗形式打开在生成SALV实例对象后调用set_screen_popup方法设置成弹出模式 "设置为弹窗模式 go_alv->set_screen_popup( start_column 10end_column 110start_line 5end_line 15). 显示效果 完整代码 SELECT *FROM ekkoINTO TABLE DATA(gt_dat…

C++自学精简实践教程

一、介绍 1.1 教程特点 一篇文章从入门到就业有图有真相,有测试用例,有作业;提供框架代码,作业只需要代码填空规范开发习惯,培养设计能力 1.2 参考书 唯一参考书《C Primer 第5版》​参考书下载: 蓝奏云…

如何自己系统的学python

学习Python是一项很好的投资,因为它是一种既强大又易于学习的编程语言,适用于多种应用,如数据分析、人工智能、网站开发等。下面是一个系统学习Python的步骤建议: 基础准备 安装Python: 访问Python官网下载最新版本的…