RocketMQ 5.x延时消息源码分析(不包含时间轮)

news2024/9/21 5:40:46

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

RocketMQ版本

  • 5.1.0

背景

首先说明本次源码分析仅分析时间轮之前的延时消息设计

现在的RocketMQ已经支持基于时间轮的任意级别延时消息

延时消息基础知识

默认RocketMQ延时消息共有18个等级

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

延时消息的简单使用

发送延时消息

        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        try {
            producer.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }

        Message msg = new Message(TOPIC /* Topic */,
                TAG /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );

        msg.setDelayTimeLevel(2);
        producer.send(msg);

和普通消息不同的我们新增了延时消息等级的设置

        msg.setDelayTimeLevel(2);

源码分析

消息发送本身和普通消息发送没有太大差别,但是新增了一个DELAY属性

public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";

消息发送的请求code是

public static final int SEND_BATCH_MESSAGE = 320;

源码入口

这里的我们先从消息的发送开始

消息发送最终的请求状态码是

public static final int SEND_MESSAGE_V2 = 310;

可以看到实际消息发送和普通消息没有什么区别,所以我们还是要去broker那边看看有没有什么特殊处理

broker处理请求

消息处理方法
org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest

实际的消息发送逻辑在方法

this.sendMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
                        (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));

这里我们进去这个方法看看

由于是分析延时消息,所以我们不关注消息发送的一些其他细节。主要关注延时消息和普通消息的处理区别
我们看方法

org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage

这里可以看到现在的topic还是我们正常指定的xiao-zou-topic

但是这里会执行多个消息的后置处理器putMessageHookList

我们看看putMessageHookList有哪些

    public void registerMessageStoreHook() {
        List<PutMessageHook> putMessageHookList = messageStore.getPutMessageHookList();

        putMessageHookList.add(new PutMessageHook() {
            @Override
            public String hookName() {
                return "checkBeforePutMessage";
            }

            @Override
            public PutMessageResult executeBeforePutMessage(MessageExt msg) {
                return HookUtils.checkBeforePutMessage(BrokerController.this, msg);
            }
        });

        putMessageHookList.add(new PutMessageHook() {
            @Override
            public String hookName() {
                return "innerBatchChecker";
            }

            @Override
            public PutMessageResult executeBeforePutMessage(MessageExt msg) {
                if (msg instanceof MessageExtBrokerInner) {
                    return HookUtils.checkInnerBatch(BrokerController.this, msg);
                }
                return null;
            }
        });

        putMessageHookList.add(new PutMessageHook() {
            @Override
            public String hookName() {
                return "handleScheduleMessage";
            }

            @Override
            public PutMessageResult executeBeforePutMessage(MessageExt msg) {
                if (msg instanceof MessageExtBrokerInner) {
                    return HookUtils.handleScheduleMessage(BrokerController.this, (MessageExtBrokerInner) msg);
                }
                return null;
            }
        });

        SendMessageBackHook sendMessageBackHook = new SendMessageBackHook() {
            @Override
            public boolean executeSendMessageBack(List<MessageExt> msgList, String brokerName, String brokerAddr) {
                return HookUtils.sendMessageBack(BrokerController.this, msgList, brokerName, brokerAddr);
            }
        };

        if (messageStore != null) {
            messageStore.setSendMessageBackHook(sendMessageBackHook);
        }
    }

这里可以看到主要是三个

  1. checkBeforePutMessage
  2. innerBatchChecker
  3. handleScheduleMessage

通过方法名我们可以很确定的看到主要是handleScheduleMessage这个方法出处理延时消息,所以我们重点看看handleScheduleMessage

handleScheduleMessage

  • org.apache.rocketmq.broker.util.HookUtils#handleScheduleMessage
 public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
        final MessageExtBrokerInner msg) {
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        // 事务消息 暂时不管
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            if (!isRolledTimerMessage(msg)) {
                if (checkIfTimerMessage(msg)) {
                    if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
                        //wheel timer is not enabled, reject the message
                        return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null);
                    }
                    PutMessageResult transformRes = transformTimerMessage(brokerController, msg);
                    if (null != transformRes) {
                        return transformRes;
                    }
                }
            }
            // Delay Delivery 延时消息
            if (msg.getDelayTimeLevel() > 0) {
                transformDelayLevelMessage(brokerController, msg);
            }
        }
        return null;
    }

这里的核心逻辑还是

if (msg.getDelayTimeLevel() > 0) {
                transformDelayLevelMessage(brokerController, msg);
            }

我们看看transformDelayLevelMessage方法

public static void transformDelayLevelMessage(BrokerController brokerController, MessageExtBrokerInner msg) {
        // 判断是否超过最大延时级别18 如果超过则设置为最大延时等级
        if (msg.getDelayTimeLevel() > brokerController.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(brokerController.getScheduleMessageService().getMaxDelayLevel());
        }

        // Backup real topic, queueId 备份真实的topic queueId
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        // 设置延时消息为SCHEDULE_TOPIC_XXXX 这个固定的topic
        msg.setTopic(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC);    
        // 设置延时消息的queueID delayLevel - 1 
        msg.setQueueId(ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()));
    }

备份前的message信息

备份后

总结就是

  1. 将原始topic替换为延迟消息固定的topic:SCHEDULE_TOPIC_XXXX
  2. 将原始queueid替换为delayLevel - 1
  3. 备份原始topic/queueid, 保存到原始消息的properties属性中

延时消息消费

我们通过查看SCHEDULE_TOPIC_XXXX的调用发现有一个方法

  • org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

我们查看调用链

发现很明显应该是使用了一个定时器去执行的,我们可以看看

这里可以看到把18个延时等级的任务都加进去了

deliverExecutorService的核心线程数也是18个

            this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));

我们这里看看丢进去的DeliverDelayedMessageTimerTask的任务里面里面的逻辑

  • org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup

核心逻辑都被封装在executeOnTimeup

我们进去看看executeOnTimeup

代码逻辑有点长我们,慢慢看

public void executeOnTimeup() {
            // 根据延迟topic和延迟queueid 去获取Consumequeue
            ConsumeQueueInterface cq =
                ScheduleMessageService.this.brokerController.getMessageStore().getConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));
            // 如果 Consumequeue 为空则新增一个 DeliverDelayedMessageTimerTask 丢到 deliverExecutorService再定时执行,延时时间默认为100毫秒
            // Consumequeue存在但是没有消费文件 一样延时100毫秒后继续重复执行
            if (cq == null) {
                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
                return;
            }

            ReferredIterator<CqUnit> bufferCQ = cq.iterateFrom(this.offset);
            if (bufferCQ == null) {
                long resetOffset;
                if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
                        this.offset, resetOffset, cq.getQueueId());
                } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
                        this.offset, resetOffset, cq.getQueueId());
                } else {
                    resetOffset = this.offset;
                }

                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
                return;
            }

            long nextOffset = this.offset;
            try {
                while (bufferCQ.hasNext() && isStarted()) {
                    CqUnit cqUnit = bufferCQ.next();
                    long offsetPy = cqUnit.getPos();
                    int sizePy = cqUnit.getSize();
                    // 这里的 tagCode实际是投递时间
                    long tagsCode = cqUnit.getTagsCode();

                    if (!cqUnit.isTagsCodeValid()) {
                        //can't find ext content.So re compute tags code.
                        log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                            tagsCode, offsetPy, sizePy);
                        long msgStoreTime = ScheduleMessageService.this.brokerController.getMessageStore().getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                        tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                    }

                    long now = System.currentTimeMillis();
                    // 延时消息时间校验 如果超过deliverTimestamp now+delayLevel 时间则矫正为 now+delayLevel
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                    long currOffset = cqUnit.getQueueOffset();
                    assert cqUnit.getBatchNum() == 1;
                    nextOffset = currOffset + cqUnit.getBatchNum();

                    long countdown = deliverTimestamp - now;
                    // 如果延时时间超过当前时间,证明还未到消息处理时间
                    if (countdown > 0) {
                        this.scheduleNextTimerTask(currOffset, DELAY_FOR_A_WHILE);      
                        ScheduleMessageService.this.updateOffset(this.delayLevel, currOffset);
                        return;
                    }
                    // 加载出延时消息
                    MessageExt msgExt = ScheduleMessageService.this.brokerController.getMessageStore().lookMessageByOffset(offsetPy, sizePy);
                    if (msgExt == null) {
                        continue;
                    }
                    // 构建新的消息体,将原来的消息信息设置到这里,并将topic和queueid设置为原始的topic和queueid(前面备份过)
                    MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
                    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                            msgInner.getTopic(), msgInner);
                        continue;
                    }

                    boolean deliverSuc;
                    if (ScheduleMessageService.this.enableAsyncDeliver) {
                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
                    } else {
                        // 将消息写入到 commitlog中
                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
                    }

                    if (!deliverSuc) {
                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                        return;
                    }
                }
            } catch (Exception e) {
                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
            } finally {
                bufferCQ.release();
            }

            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
        }

上面的代码我们将核心逻辑做了注释,我们总结一下大致流程

  1. 校验对应的延时等级消息是否存在ConsumeQueue,如果不存在则延时100ms继续轮训
  2. 校验ConsumeQueue中的索引是否存在,如果不存在则延时100ms继续轮训
  3. 校验索引里面的tagsCode 实际是投递时间是否到了需要投递的时间,如果是则取出延时消息
  4. 将延时消息还原为原始消息,投递到commitLog中继续消费

这里我们通过debug看看消息转换前后的参数

转换前的延时消息

转换后的原始消息

消息的重新投递是通过syncDeliver方法

deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), currOffset, offsetPy, sizePy);
private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
            int sizePy) {
            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
            PutMessageResult result = resultProcess.get();
            boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
            if (sendStatus) {
                ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
            }
            return sendStatus;
        }

如果成功则更新延时消息的消费进度,注意延时消息的消费进度是存储在

    private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
        new ConcurrentHashMap<>(32);

实际的配置文件是delayOffset.json

自此RocketMQ的延时消息我们就分析完了

总结

总的来说延时消息就是默认分为18个队列,然后启动18个线程一直去扫描这18个队列。如果没有消息就过100毫秒继续重复扫描,周而复始
如果扫描到消息则将延时消息TopicSCHEDULE_TOPIC_XXXX中的消息取出来,然后重新转换为原始消息,包括原始消息的TopicqueueId,然后重新写入到commitLog

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/824733.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023财务ERP-业财一体化ERP系统功能应用,实现河北企业报表专业管理|亿发

信息技术迅速发展&#xff0c;传统的人工管理会导致财务与业务数据割裂&#xff0c;繁琐的数据处理和低效的财务管理已经不能满足河北省企业的发展需要&#xff0c;需要通过智能化手段持续推动河北企业的组织和管理变革。而在数字化时代&#xff0c;业财一体化建设与应用正逐渐…

PySpark-RDD编程入门

文章目录 2.PySpark——RDD编程入门2.1 程序执行入口SparkContext对象2.2 RDD的创建2.2.1 并行化创建2.2.2 获取RDD分区数2.2.3 读取文件创建 2.3 RDD算子2.4 常用Transformation算子2.4.1 map算子2.4.2 flatMap算子2.4.3 reduceByKey算子2.4.4 WordCount回顾2.4.5 groupBy算子…

Connection is read-only. Queries leading to data modification are not allowe解决

场景&#xff1a;用动态代理类调用本类中的事务方法后&#xff0c;又调用其他service的方法时&#xff0c;会抛此异常 解决方法:事务注解放在本类方法上&#xff0c;让处于同一事务下即可 初步认为是动态代理类影响了事务aop。

vsphere之vmotion精华 虚拟机迁移

概念 vmotion是vsphere高级功能的基础&#xff0c;DRS、HA、FT等功能都依赖于vmotion。简单来说&#xff0c;vmotion指虚拟机漂移。比如有一台VM原来在exsi1上运行&#xff0c;我们可以通过vmotion技术把VM移动到exsi2上。 那为什么要用到vmotion呢&#xff1f;显而易见&…

隔断玻璃内部的雕花工艺有哪些特点

隔断玻璃内部的雕花工艺具有以下几个特点&#xff1a; 1. 精细细腻&#xff1a;隔断玻璃内部的雕花工艺通常需要经过精细的雕刻和打磨&#xff0c;以展现出细腻的纹理和图案。因此&#xff0c;这种工艺在细节上非常令人印象深刻。 2. 独特个性&#xff1a;隔断玻璃内部的雕花工…

华中科技大学成功验证LK-99,美国实验室证实常温常压超导理论可行!

原创 | 文 BFT机器人 上周&#xff0c;全球物理学界迎来了一则令人震惊的消息&#xff1a;一支韩国科学家团队宣布他们发现了全球首个室温超导材料。这种材料被称为“改性铅磷灰石晶体结构&#xff08;下称LK-99&#xff0c;一种掺杂铜的铅磷灰石&#xff09;”。这项研究引起…

炒股杠杆途乐证券;股票买入卖出时间规则?

股票买入卖出时刻规则是指出资者在股票商场上进行生意交易时需求遵循的一系列时刻规定。正确的买入和卖出时刻能够协助出资者最大化出资回报&#xff0c;一起降低风险。但是&#xff0c;在股票商场上&#xff0c;生意时刻的挑选是一个复杂的问题&#xff0c;需求从多个角度剖析…

angr学习-入门篇

前言&#xff1a; 资源链接&#xff1a;GitHub - jakespringer/angr_ctf&#xff08;题库仓库&#xff0c;里面有个讲解angr的PPT&#xff0c;里面有官方的题解很详细&#xff09; GitHub - Hustcw/Angr_Tutorial_For_CTF: angr tutorial for ctf 安装&#xff1a; 关于angr…

JAVA学习(十)

1. 线程上下文切换 巧妙地利用了时间片轮转的方式, CPU 给每个任务都服务一定的时间&#xff0c;然后把当前任务的状态保存 下来&#xff0c;在加载下一任务的状态后&#xff0c;继续服务下一任务&#xff0c;任务的状态保存及再加载, 这段过程就叫做 上下文切换。时间片轮转的…

【Linux】线程同步 -- 条件变量 | 生产者消费者模型 | 自旋锁 |读写锁

初识生产者消费者模型同步条件变量初步使用 POSIX信号量其他常见的各种锁自旋锁读写锁 初识生产者消费者模型 举一个例子&#xff1a; 学生去超市消费的时候&#xff0c;与厂家生产的时候&#xff0c;两者互不相冲突。 生产的过程与消费的过程 – 解耦 临时的保存产品的场所(…

APP外包开发的android开发框架

Android的开发框架有很多&#xff0c;每个框架的特点不同&#xff0c;选择哪种框架取决于特定的开发需求和项目目标。今天和大家分享这方面的知识&#xff0c;以下是一些比较常见且重要的开发框架及其特点&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c…

【JavaEE】深入了解Spring中Bean的可见范围(作用域)以及前世今生(生命周期)

【JavaEE】Spring的开发要点总结&#xff08;4&#xff09; 文章目录 【JavaEE】Spring的开发要点总结&#xff08;4&#xff09;1. Bean的作用域1.1 一个例子感受作用域的存在1.2 通过例子说明作用域的定义1.3 六种不同的作用域1.3.1 singleton单例模式&#xff08;默认作用域…

Shell脚本学习-应用启动脚本

利用系统函数模拟实现应用脚本启动的特殊颜色效果&#xff1a; /server/scripts/start_nginx.sh {start|stop|restart}&#xff0c;用if语句实现。 [rootvm1 scripts]# cat start_nginx.sh #!/bin/bash. /etc/init.d/functionsUSAGE(){echo "USAGE: $0 {start|stop|resta…

DBeaver开源数据库管理工具发布23.1.3版本

导读DBeaver开源数据库管理软件近日发布了v23.1.3版本,该版本在空间数据查看器、数据传输、数据编辑器等多个模块进行了优化,提升了软件的可用性和兼容性。 具体来看,空间数据查看器新增了地图对象标记和曲线渲染支持,也实现了坐标复制等功能。数据传输模块增强了XLSX文件导入和…

探究LCS透明屏的工作原理

LCS透明屏是一种新型的显示技术&#xff0c;它能够在显示屏上实现透明效果&#xff0c;使得用户可以同时看到屏幕上的内容和背后的物体。这种技术在商业广告、展览、零售等领域有着广泛的应用前景。 LCS透明屏的工作原理是利用液晶分子的特性来控制光的透过与阻挡。 液晶分子可…

WebGPU重塑Web开发的未来

一、 WebGL 1.1 什么是WebGL 说到 WebGL,就不得不说说 OpenGL。在早期的个人电脑中,使用最广泛的 3D 图形渲染技术是 Direct3D 和 OpenGL。Direct3D 是微软 DirectX 技术的一部分,主要用于 Windows 平台。 OpenGL 作为一种开源的跨平台技术,赢得了众多开发者的青睐。 后…

ES6基础知识十:你是怎么理解ES6中 Decorator 的?使用场景?

一、介绍 Decorator&#xff0c;即装饰器&#xff0c;从名字上很容易让我们联想到装饰者模式 简单来讲&#xff0c;装饰者模式就是一种在不改变原类和使用继承的情况下&#xff0c;动态地扩展对象功能的设计理论。 ES6中Decorator功能亦如此&#xff0c;其本质也不是什么高大…

JavaData:JDK8之前传统的日期和时间

Data JDK8之前传统的日期和时间 //目标:掌握Date日期类的使用。 //1、创建一个Date的对象:代表系统当前时间信息的。 Date d new Date(); system.out.println(d);//2、拿到时间毫秒值。 long time d.getTime(); system.out.println(time);//3、把时间毫秒值转换成日期对象:2…

僵尸进程(Zombie process )及孤儿进程简介

1.僵尸进程如何产生&#xff1f; 僵尸进程是当子进程比父进程先结束&#xff0c;而父进程又没有回收子进程&#xff0c;释放子进程占用的资源&#xff0c;此时子进程将成为一个僵尸进程。如果父进程先退出 &#xff0c;子进程被init接管&#xff0c;子进程退出后init会回收其占…