Rocketmq 定时消息源码分析

news2025/1/12 18:07:32

定时消息定义

生产者将消息投放到broker后,不会马上被消费者消费。需要等待到特定时间才会被消费。

调用链路

  1.  producer 将定时消息写入commitLog
  2. 线程ReputThead 休息1毫秒,读取一次commitlog数据,写入ConsumeQueue和IndexFile
  3. 线程ScheduledService 首次延时1秒执行,以后延迟100毫秒执行。职责是将到期的延时消息投放普通消息

定时消息详细介绍

核心类

ScheduedMessageService

rocketmq 不支持任意精度的定时消息,仅支持指定级别的定时消息。

核心配置

delayLevelTable

1s 5s 10s 30s 1m 2m ..... 1h 2h 每个级别对应一个TimerTask

offsetTable

消息消费进度维护

具体逻辑

定时消息写入commitlog

检查msg的DelayTimeLevel ,若大于0.重写消息主题改成SCHEDUELED_TOPIC_XXXX

按延迟级别改写队列id。

CommitLog#putMessage

        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }

ScheduledMessageService#delayLevel2QueueId

    public static int delayLevel2QueueId(final int delayLevel) {
        return delayLevel - 1;
    }

时序图

定时消息写入ConsumeQueue

DefaultMessageStore.ReputMessageService#run

休息1毫秒执行一次

        @Override
        public void run() {
            DefaultMessageStore.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    Thread.sleep(1);
                    this.doReput();
                } catch (Exception e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            DefaultMessageStore.log.info(this.getServiceName() + " service end");
        }

DefaultMessageStore.ReputMessageService#run 

从commitLog取出数据,

                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

若消息主题是 SCHEDULED_TOPIC_XXXX  ,则设置consumeQueue的tagsCode是延时消息的到期时间

CommitLog#checkMessageAndReturnSize

                    String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                    if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {
                        int delayLevel = Integer.parseInt(t);

                        if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                            delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                        }

                        if (delayLevel > 0) {
                            tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                                storeTimestamp);
                        }
                    }

返回对象DispatchRquest

         int readLength = calMsgLength(sysFlag, bodyLen, topicLen, propertiesLength);
            if (totalSize != readLength) {
                doNothingForDeadCode(reconsumeTimes);
                doNothingForDeadCode(flag);
                doNothingForDeadCode(bornTimeStamp);
                doNothingForDeadCode(byteBuffer1);
                doNothingForDeadCode(byteBuffer2);
                log.error(
                    "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
                    totalSize, readLength, bodyLen, topicLen, propertiesLength);
                return new DispatchRequest(totalSize, false/* success */);
            }

            return new DispatchRequest(
                topic,
                queueId,
                physicOffset,
                totalSize,
                tagsCode,
                storeTimestamp,
                queueOffset,
                keys,
                uniqKey,
                sysFlag,
                preparedTransactionOffset,
                propertiesMap
            );
        } catch (Exception e) {
        }

        return new DispatchRequest(-1, false /* success */);
    }

追加到ConsumeQueue和IndexFile

                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

public void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
}

时序图

image.png

检查定时消息线程配置

首次执行延迟1秒

            this.timer = new Timer("ScheduleMessageTimerThread", true);
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }

                if (timeDelay != null) {
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }

以后延迟100毫秒执行

            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);

线程检查ConsumeQueue

取出指定主题和队列的数据。

    public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);

检查延迟消息是否到期

correctDeliverTimestamp 检查是否到期

       ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    //can't find ext content.So re compute tags code.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                        tagsCode, offsetPy, sizePy);
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }

                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

        private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {

            long result = deliverTimestamp;

            long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
            if (deliverTimestamp > maxTimestamp) {
                result = now;
            }

            return result;
        }

投放普通消息到CommitLog

创建普通消息


        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setBody(msgExt.getBody());
            msgInner.setFlag(msgExt.getFlag());
            MessageAccessor.setProperties(msgInner, msgExt.getProperties());

            TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
            long tagsCodeValue =
                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
            msgInner.setTagsCode(tagsCodeValue);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

            msgInner.setSysFlag(msgExt.getSysFlag());
            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
            msgInner.setBornHost(msgExt.getBornHost());
            msgInner.setStoreHost(msgExt.getStoreHost());
            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());

            msgInner.setWaitStoreMsgOK(false);
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

//原消息主题            

msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
            //原消息队列id
            String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
            int queueId = Integer.parseInt(queueIdStr);
            msgInner.setQueueId(queueId);

            return msgInner;
        }

写入消息

                     PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/830636.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

所学即所用:方飞将AI技术运用于反偷猎领域

原创 | 文 BFT机器人 方飞&#xff0c;高中毕业于江苏省常州高级中学&#xff0c;于2007年进入清华大学电子工程系攻读学士学位&#xff0c;2011年本科毕业后赴美国南加州大学计算机系攻读博士&#xff0c;主要从事安全博弈研究&#xff0c;师从安全博弈领域的权威专家 Milind…

vxworks文件系统分析

参考https://www.freebuf.com/articles/endpoint/335030.html 测试固件 https://service.tp-link.com.cn/detail_download_7989.html 固件提取 binwalk解压固件&#xff0c;在第一部分即为要分析的二进制文件&#xff0c;可以拖进ida分析 设置为arm小端字节序&#xff0c;点…

爆火的“为i做e”梗,小红书如何成为年轻人的社交货币?

话题浏览超13亿&#xff0c;“新社交密码”抢占用户心智 2023-08-03 草稿临时预览&#xff0c;有效期剩余59分59秒 请勿包含诱导分享&#xff0c;虚假中奖&#xff0c;违法违纪等信息。 爆火的“为i做e”梗、将MBTI写进个人简介、花样百出的MBI梗图 ...... 从去年5月到现在&…

手把手教你安装Eclipse最新版本的详细教程 (非常详细,非常实用)

简介 首先声明此篇文章主要是针对测试菜鸟或者刚刚入门的小伙们或者童鞋们&#xff0c;大佬就没有必要往下看了。 写这篇文章的由来是因为后边要用这个工具&#xff0c;但是由于某些原因有部分小伙伴和童鞋们可能不会安装此工具&#xff0c;为了方便小伙伴们和童鞋们的后续学习…

第五届宁波市卫生健康系统信息化技能竞赛暨赛前培训成功举办 平凯星辰受邀授课

近日&#xff0c; 第五届宁波市卫生健康系统第五届信息化技能竞赛暨赛前培训在宁波饭店成功举办 。本次培训吸引了来自区、县、市属各级医疗单位的信息化相关负责人参与。宁波市卫生信息中心副主任唐玲作主题发言&#xff0c; 平凯星辰作为中国数据库代表厂商&#xff0c;受邀进…

企业级,Pytest自动化测试框架脚本编写总结,看这篇就够了...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 用到的知识点&…

web爬虫第四弹 - JS逆向入门(猿人学第一题)

0- 前言 爬虫是一门需要实战的学问。 而对于初学者来说&#xff0c;要想学好反爬&#xff0c;js逆向则是敲门砖。今天给大家带来一个js逆向入门实例&#xff0c;接下来我们一步一步来感受下入门的逆向是什么样的。该案例选自猿人学练习题。猿人学第一题 1- 拿到需求 进入页面…

矩阵怎么求导数(学习笔记)

当标量 拓展到向量的时候 需要弄清楚形状 这里 看图大概是不清晰的 先要看清楚谁是向量 y 是向量 x 是标量 求导之后 仍然还是向量 y 是标量 x 是向量 求导之后 仍然还是向量 两个都是向量 求导之后 是矩阵 标量大家都会的 啊 求导 的意义很重要 如图所示 梯度一定指…

51单片机(普中HC6800-EM3 V3.0)实验例程软件分析 实验一 点亮第一个LED

目录 前言 一、原理图及知识点介绍 1.1、LED原理图 1.2、MCU51原理图 二、代码分析 知识点一&#xff1a;#include "reg52.h" //此文件中定义了单片机的一些特殊功能寄存器 知识点二&#xff1a;你知道sfr P0 0x80;是怎么来的呢为什么要赋值0x80&#xff…

Java基础面试题2

Java基础面试题 一、IO和多线程专题 1.介绍下进程和线程的关系 进程&#xff1a;一个独立的正在执行的程序 线程&#xff1a;一个进程的最基本的执行单位&#xff0c;执行路径 多进程&#xff1a;在操作系统中&#xff0c;同时运行多个程序 多进程的好处&#xff1a;可以充…

7种有效安全的网页抓取方法,如何避免被禁止?

网页抓取是一种从互联网上抓取网页内容的过程&#xff0c;但在网络抓取种相信您也经常遇到障碍&#xff1f;尤其是做跨境业务的&#xff0c;在抓取国外的网站时更有难度。但我们站在您的立场上&#xff0c;提供七种有效的方法来进行网页抓取而不被阻止&#xff0c;最大限度地降…

JVM面试突击班2

JVM面试突击班2 对象被判定为不可达对象之后就“死”了吗 对象的生命周期 创建阶段 &#xff08;1&#xff09;为对象分配存储空间 &#xff08;2&#xff09;开始构造对象 &#xff08;3&#xff09;从超类到子类对static成员进行初始化 &#xff08;4&#xff09;超类成…

如何搭建WordPress博客网站,并且发布至公网上?

如何搭建WordPress博客网站&#xff0c;并且发布至公网上&#xff1f; 文章目录 如何搭建WordPress博客网站&#xff0c;并且发布至公网上&#xff1f;概述前置准备1 安装数据库管理工具1.1 安装图形图数据库管理工具&#xff0c;SQL_Front 2 创建一个新数据库2.1 创建数据库2.…

基于DiscordMidjourney API接口实现文生图

https://discord.com/api/v9/interactions 请求头&#xff1a; authorization:取自 浏览器中discord 文生图请求头中的 authorization 的值 Content-Type:application/json 请求体&#xff1a; {“type”:2,“application_id”:“93692956130267xxxx”,“guild_id”:“1135900…

Error message “error:0308010C:digital envelope routines::unsupported“

https://stackoverflow.com/questions/69692842/error-message-error0308010cdigital-envelope-routinesunsupported nvm install 16即可解决

Python接口自动化之使用requests库发送http请求

requests库 ​ 什么是Requests &#xff1f;Requests 是⽤Python语⾔编写&#xff0c;基于urllib&#xff0c;采⽤Apache2 Licensed开源协议的 HTTP 库。它⽐ urllib 更加⽅便&#xff0c;可以节约我们⼤量的⼯作&#xff0c;完全满⾜HTTP测试需求。 ​ 安装&#xff1a;cmd命…

Vivado常见critical warning 、error

综合 Designutils 20-1281 如下图所示 可能原因 在复制Vivado工程时&#xff0c;IP文件缺少导致 解决方法 重新生成IP即可

【雕爷学编程】MicroPython动手做(29)——物联网之SIoT 2

知识点&#xff1a;什么是掌控板&#xff1f; 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片&#xff0c;支持WiFi和蓝牙双模通信&#xff0c;可作为物联网节点&#xff0c;实现物联网应用。同时掌控板上集成了OLED…

Chat模块封装

封装保存用户类 utils/chat.js class Chat{constructor(){// 当前登录的用户this._user null;// 会话数组 和多个人this._sessions []; //user message// 当前会话 &#xff08;和谁在聊天&#xff09;this._current_session null;}setUser(user){this._user user} }exp…

百度贴吧视频推送排名软件怎么卖

百度贴吧视频上传工具 软件有月卡、季卡、半年卡、年卡 【有时软件个别卡种售空&#xff0c;价格有上涨下降不定&#xff0c;需要的话联系客服获取当日价格】 视频教程&#xff1a; 软件功能&#xff1a; 1.软件不限制账号&#xff0c;可以批量循环发布 2.贴吧同步功能&am…