【RocketMQ】RocketMQ 5.0版本任意时刻延迟消息的实现原理浅析

news2025/3/1 18:11:03

文章目录

  • 意外发现
  • 设计方案
    • 时间轮
    • 定时消息存储
  • 具体实现
    • 流程图
    • 流程步骤

意外发现

无意间从官方的最新的客户端代码中看到下面的Example:

感兴趣的可以看看这个介绍:https://rocketmq.apache.org/docs/featureBehavior/02delaymessage

生产者:

        // Send delay messages.
        MessageBuilder messageBuilder = null;
        // Specify a millisecond-level Unix timestamp. In this example, the specified timestamp indicates that the message will be delivered in 10 minutes from the current time. 
        Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
        Message message = messageBuilder.setTopic("topic")
                // Specify the message index key. The system uses the key to locate the message. 
                .setKeys("messageKey")
                // Specify the message tag. The consumer can use the tag to filter messages. 
                .setTag("messageTag")
                .setDeliveryTimestamp(deliverTimeStamp)
                // Configure the message body.
                .setBody("messageBody".getBytes())
                .build();
        try {
            // Send the messages. Focus on the result of message sending and exceptions such as failures. 
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }

可以看到一句:setDeliveryTimestamp(deliverTimeStamp),也就是说可以支持任意时刻的延迟消息了???

消费者:

        // Consumption example 1: If a scheduled message is consumed by a push consumer, the consumer needs to process the message only in the message listener. 
        MessageListener messageListener = new MessageListener() {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                System.out.println(messageView.getDeliveryTimestamp());
                // Return the status based on the consumption result. 
                return ConsumeResult.SUCCESS;
            }
        };
        // Consumption example 2: If a scheduled message is consumed by a simple consumer, the consumer must obtain the message for consumption and submit the consumption result. 
        List<MessageView> messageViewList = null;
        try {
            messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
            messageViewList.forEach(messageView -> {
                System.out.println(messageView);
                // After consumption is complete, the consumer must invoke ACK to submit the consumption result. 
                try {
                    simpleConsumer.ack(messageView);
                } catch (ClientException e) {
                    e.printStackTrace();
                }
            });
        } catch (ClientException e) {
            // If the pull fails due to system traffic throttling or other reasons, you must re-initiate the request to obtain the message. 
            e.printStackTrace();
        }

RocketMQ5.X版本新增了Proxy模块,从配置中可以看到,延迟消息方案目前默认还是通过按level来指定的,也就是说,可以选择不按level来执行了!!!

下面一起看看新版本是怎么实现任意时刻延迟消息的。

设计方案

时间轮

首先,RocketMQ对任意时刻延迟消息的支持,是基于主流的方案——时间轮做的,时间轮,对时刻表的一种抽象,通常使用数组实现。时刻表上的每一秒,顺序对应到数组中的位置,然后数组循环使用。时间轮的每一格,指向了TimerLog中的对应位置,如果这一格的时间到了,则按TimerLog中的对应位置以及prev_pos位置依次读出每条消息。时间轮一格一格向前推进,配合TimerLog,依次读出到期的消息,从而达到定时消息的目的。

时间轮的每一格设计如下:

定时消息存储

定时消息的记录文件,Append Only。每条记录包含一个prev_pos,指向前一条定时到同样时刻的记录。每条记录的内容可以包含定时消息本身,也可以只包含定时消息的位置信息。每一条记录包含如下信息:

名称大小备注
size4B保存记录的大小
prev_pos8B前一条记录的位置
current_time8B当前时间
magic4Bmagic value
delayed_time4B该条记录的定时时间
offset_real8B该条消息在commitLog中的位置
size_real4B该条消息在commitLog中的大小
hash_topic4B该条消息topic的hash code
varbody8B存储可变的body,暂时没有为空

具体实现

流程图

流程步骤

从图中可以看出,共有五个Service分别处理定时消息的放置和存储。工作流如下:

  1. 针对放置定时消息的service,每50ms从commitLog读取指定主题(rmq_sys_wheel_timer)的定时消息。

    a. TimerEnqueueGetService从commitLog读取得到定时主题的消息,并先将其放入enqueuePutQueue。

    org.apache.rocketmq.store.timer.TimerMessageStore.TimerEnqueueGetService#run

    org.apache.rocketmq.store.timer.TimerMessageStore#enqueue

    b. 另一个线程TimerEnqueuePutService将其放入timerLog,更新时间轮的存储内容。将该任务放进时间轮的指定位置。

    org.apache.rocketmq.store.timer.TimerMessageStore.TimerEnqueuePutService#run

    org.apache.rocketmq.store.timer.TimerMessageStore#doEnqueue

  2. 针对取出定时消息的service,每50ms读取下一秒的slot。有三个线程将读取到的消息重新放回commitLog。

    a. 首先,TimerDequeueGetService每50ms读一次下一秒的slot,从timerLog中得到指定的msgs,并放进dequeueGetQueue。

    org.apache.rocketmq.store.timer.TimerMessageStore.TimerDequeueGetService#run

    org.apache.rocketmq.store.timer.TimerMessageStore#dequeue


    b. 而后TimerDequeueGetMessageService从dequeueGetQueue中取出msg,并将其放入队列中。该队列为待写入commitLog的队列,dequeuePutQueue。

    org.apache.rocketmq.store.timer.TimerMessageStore.TimerDequeueGetMessageService#run

    c. 最后TimerDequeuePutMessageService将这个queue中的消息取出,若已到期则修改topic,放回commitlog,否则继续按原topic写回CommitLog滚动。

    org.apache.rocketmq.store.timer.TimerMessageStore.TimerDequeuePutMessageService#run

    org.apache.rocketmq.store.timer.TimerMessageStore#convertMessage

    消息转换,更改真实Topic

    投递消息

    org.apache.rocketmq.store.timer.TimerMessageStore#doPut

消息投递到真实Topic后,其实就变成了一条“正常的消息”了,消费者就能正常消费了,以上就是对RocketMQ 5.0中延迟消息的变更做的分析,参考了部分官方的资料,后续会使用5.0版本,实际做一些演练,目前对于这个新特性,官方并没有大肆宣扬,也不知道具体有哪些限制,所以还需要做一些实践,踩踩坑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/378921.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【ES】Elasticsearch之数据类型

文章目录1、Mapping1.1 Mapping的作用1.2 Dynamic Mapping1.3 字段控制参数1.3.1 index1.3.2 Index Options1.3.3 null_value1.3.4 copy_to2、数据类型2.1 核心数据类型2.1.1 字符串类型2.1.2 数字类型2.1.3 日期类型2.1.3.1 date2.1.3.2 date_nanos2.1.4 布尔类型2.1.5 二进制…

java中使用protobuf总结

基本没怎么接触过java编程&#xff0c;别的团队发过来一个用java编写的存储pb的文件&#xff0c;让拆分和解析&#xff0c;硬着头皮做一下&#xff0c;在此将步骤做个记录&#xff1a;下载安装protobufhttps://github.com/protocolbuffers/protobuf/tags?afterv3.6.1.2编译pro…

Python大数据培训班特色优势及工作方向

Python大数据培训班有多个大数据培训班类型&#xff0c;同时也包括训练营、学徒班、就业班等。 具体班型&#xff1a; 大数据挖掘与人工智能&#xff08;大数据分析&#xff09;学徒班、大数据应用开发学徒班 大数据挖掘与人工智能&#xff08;大数据分析&…

网络安全从入门到精通:30天速成教程到底有多狠?你能坚持下来么?

毫无疑问&#xff0c;网络安全是当下最具潜力的编程方向之一。对于许多未曾涉足计算机编程的领域「小白」来说&#xff0c;深入地掌握网络安全看似是一件十分困难的事。至于一个月能不能学会网络安全&#xff0c;这个要看个人&#xff0c;对于时间管理不是很高的&#xff0c;肯…

DDOS攻击和CC攻击分别是什么?

CC攻击其实是DDOS攻击的一种。CC攻击的前身CC攻击的前身是一个名为Fatboy攻击程序&#xff0c;而之所以后来人们会成为CC&#xff0c;是因为DDOS攻击发展的初期阶段&#xff0c;绝大部分DDOS攻击都能被业界熟知的“黑洞”&#xff08;collapsar&#xff0c;一种安全防护产品&am…

生成和查看dump文件

在日常开发中&#xff0c;即使代码写得有多谨慎&#xff0c;免不了还是会发生各种意外的事件&#xff0c;比如服务器内存突然飙高&#xff0c;又或者发生内存溢出(OOM)。当发生这种情况时&#xff0c;我们怎么去排查&#xff0c;怎么去分析原因呢&#xff1f; 1. 什么是dump文…

云服务器迁移 (全网最省钱最详细攻略)

0x00 背景服务器续费比较贵&#xff0c;由于旧的云服务器用的时间比较长&#xff0c;上面部署的应用&#xff0c;环境复杂、数据多&#xff0c;在新的服务器部署比较麻烦&#xff0c;所以想到把服务器环境制作成镜像。新服务器的选择上&#xff0c;我这里选择的天翼云服务器&am…

IGKBoard(imx6ull)-PWM编程蜂鸣器编程控制

文章目录1- PWM介绍2- PWM使能&#xff08;1&#xff09;添加配置&#xff08;2&#xff09;export、unexport与npwm属性文件&#xff08;3&#xff09;duty_cycle、enable和period属性文件3- PWM测试编程&#xff08;1&#xff09;PWM8管脚连接&#xff08;2&#xff09;流程介…

京东JDBook笔记本怎么安装Win10系统使用?

京东JDBook笔记本怎么安装Win10系统使用&#xff1f;有用户使用的京东JDBook笔记本电脑系统是Win7系统的&#xff0c;最近想要去将系统升级到Win10来使用。那么如何将Win7的系统重装Win10呢&#xff1f;以下为大家带来详细的操作步骤图文教程。 准备工作&#xff1a; 1、U盘一个…

视频和视频帧:FFMPEG CPU解码API介绍

写在前面本文将介绍的如何用FFMPEG API做视频解码。视频解码&#xff0c;是将压缩后的视频&#xff08;压缩格式如H264&#xff09;通过对应解码算法还原为YUV视频流的过程&#xff1b;在计算机看来&#xff0c;首先输入一段01串&#xff08;压缩的视频&#xff09;&#xff0c…

用 @types 前缀的包是什么?有什么用?

前言 解决过 TypeScript 的项目大概都是从两个方向&#xff0c;Vue3 方向和 React Native 方向&#xff0c;而在 React Native 方向上我经常会遇到一个烦人的错误 Could not find a declaration file for module ‘juejin-type-study’. ‘d:/fe-project/nodejs/types-study/n…

看懂这篇文章-你就懂了信息安全的密码学

一、前言一个信息系统缺少不了信息安全模块&#xff0c;今天就带着大家全面了解并学习一下信息安全中的密码学知识&#xff0c;本文将会通过案例展示让你了解抽象的密码学知识&#xff0c;阅读本文你将会有如下收获&#xff1a; 熟悉现代密码学体系包含的主流密码技术 掌握Base…

SignalR 实时通讯

SignalR 实时通讯1.SignalR1.1.SignalR 简介1.2.SignalR 功能1.3.传输1.4.中心2.服务器2.1.配置中心2.2.上下文对象2.3.客户端对象2.4.创建2.5.中心功能实现4.客户端6.案例演示&#xff08;DotNet 客户端&#xff09;1.SignalR 1.1.SignalR 简介 SignalR 是一个开放源代码库&a…

内容感知、AI融合:让实景三维看山是山,看水是水

实景三维具备还原客观物理世界的优势性&#xff0c;但也正由于部分真实性的欠缺备受争议。这是因为传统的三维建模软件大多基于像元的匹配与计算的逻辑&#xff0c;对地物进行无差别的重建处理&#xff0c;最终生成的模型看起来扭曲怪异、残缺变形。常见的模型缺陷有&#xff1…

2022 OpenCV Spatial AI大赛前三名项目分享,开源、上手即用,优化了OAK智能双目相机的深度效果。

编辑&#xff1a;OAK中国 首发&#xff1a;oakchina.cn 喜欢的话&#xff0c;请多多&#x1f44d;⭐️✍ 内容可能会不定期更新&#xff0c;官网内容都是最新的&#xff0c;请查看首发地址链接。 ▌前言 Hello&#xff0c;大家好&#xff0c;这里是OAK中国&#xff0c;我是助手…

深圳居住证申领指南

打开广东政务服务网&#xff0c;在首页搜索【深圳经济特区居住证申领】在搜索结果中可以发现有如下链接&#xff0c;点击在线办理 会转到登陆界面&#xff0c;直接使用个人登录并用微信扫描登录 根据提示进行手机登录验证。 完成登录认证之后会自转到深圳经济特区居住证申领界…

二分查找由浅入深--算法--java

二分查找写在开头算法前提&#xff1a;算法逻辑算法实现简单实现leftright可能超过int表示的最大限度代码分析和变换更多需求&#xff1a;求索引最小的值java二分API应用基础题思考难度方法写在开头 二分查找应该是算比较简单的这种算法了&#xff0c;我本以为还可以。但有时候…

Word处理控件Aspose.Words功能演示:使用 Java 比较 MS Word 文档

Aspose.Words 是一种高级Word文档处理API&#xff0c;用于执行各种文档管理和操作任务。API支持生成&#xff0c;修改&#xff0c;转换&#xff0c;呈现和打印文档&#xff0c;而无需在跨平台应用程序中直接使用Microsoft Word。此外&#xff0c; Aspose API支持流行文件格式处…

动态规划初阶-爬楼梯问题

示例1&#xff1a; 输入&#xff1a;cost [10,15,20] 输出&#xff1a;15 解释&#xff1a;你将从下标为 1 的台阶开始。 - 支付 15 &#xff0c;向上爬两个台阶&#xff0c;到达楼梯顶部。 总花费为 15 。示例2&#xff1a; 输入&#xff1a;cost [1,100,1,1,1,100,1,1,10…

使用Docker安装MongoDB,整合SpringBoot

使用Docker安装MongoDB MongoDB 和 MySQL 都是常用的数据库管理系统&#xff0c;但它们的设计目标不同&#xff0c;因此在某些方面的性能表现也有所不同。 MongoDB 是一个文档型数据库&#xff0c;它采用了面向文档的数据模型&#xff0c;支持动态查询和索引&#xff0c;适合…