【RocketMQ】深入剖析延迟消息核心实现原理

news2025/1/13 13:51:32

一、背景

电商相关业务的时候,有一个常见的需求场景是:用户下单之后,超过半小时不支付,就取消订单。现在我们在淘宝京东买东西,或者通过美团点外卖,下单之后,如果不在指定时间内支付,订单也会取消。 那么,如何实现这样的超时取消逻辑呢,通过消息队列的延时消息,是一个非常稳定的实现方案。

RocketMQ 就提供了这样的延时消息功能,producer 端在发送消息时,设置延迟级别,从秒级到分钟小时等等。消息在发送之后,会在消息队列的服务器进行存储。等过了设定的延迟时间之后,消息才会被 consumer 端消费到。

如果我们在下单的时候,发送一条设置延时30分钟的消息,这条消息会在30分钟之后被下游系统消费,然后判断订单有没有支付,如果没有支付,则取消订单。那么这样,通过消息队列就完成了一个延迟取消的逻辑了。

二、原理

2.1、设置延时

先来看一下如何设置消息的延时 消息体可以通过setDelayTimeLevel方法来设置延时级别

public void produce() {
    Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    msg.setDelayTimeLevel(1)
    SendResult sendResult = producer.send(msg);
}

public void consume() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

其实是将延迟信息存到 Message 的 property 中(property是一个保存meta信息的hashmap)

public void setDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}

void putProperty(final String name, final String value) {
    if (null == this.properties) {
        this.properties = new HashMap<String, String>();
    }

    this.properties.put(name, value);
}

之后,broker收到 message之后,会根据 message 中设置的延时级别进行处理 可以看看延时级别的具体情况: 一共分为18个级别(1-18),对应时间从1s到2h

public class MessageStoreConfig {
    
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

}

那么整个系统是怎么做到让consumer在设定的延时之后,开始消费指定消息的呢?

不得不说,RocketMQ 的设计还是挺巧妙的,我们接着往下看。

2.2、消息预存

对于broker收到的延时消息,并不是和普通消息一样,进入发送端指定的topic中, 而是进入专门的延时topic中,延时topic有18条队列(queueId 编号0-17),queueId 和 delayLevel 的关系是 queueId + 1 = delayLevel,是一一对应的。所以计算延时消息的待执行时间deliverTimestamp之后,会将消息存入对应延时级别的队列中。

// 如果是延迟消息
if (msg.getDelayTimeLevel() > 0) {
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }
    
    // 重设延迟消息的topic和queueId,topic为指定的RMQ_SYS_SCHEDULE_TOPIC
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    ...
    // 将实际的指定topic和queueId进行存入property,进行备份
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));        
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

之后,会由ScheduleMessageService来进行任务处理。ScheduleMessageService是broker启动时就开始执行的,用来处理延迟队列中的消息,处理的逻辑如下所示。


public class ScheduleMessageService extends ConfigManager {

    // key: delayLevel | value: delayTimeMillis 
    private final ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);

    public void start() {
        // 创建一个Timer,用于执行定时任务
        this.timer = new Timer("ScheduleMessageTimerThread", true);
            
        // 这里对每个delayLevel的queue都创建一个DeliverDelayedMessageTimerTask,
        // 用来处理对应queue中的消息
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
    }
    
}

ScheduleMessageService启动之后,会根据延时队列的数目创建一一对应的DeliverDelayedMessageTimerTask,然后周期执行。该类继承自TimerTask,是JDK的工具类,用于执行定时任务。

2.3、消息转投

可以看到DeliverDelayedMessageTimerTask实现的 run 方法,主要逻辑都在executeOnTimeup方法中,从对应的延迟队列中取出时间已到的 message,发送到 message 对应原始topic的队列中。只要队列没有发生消费积压,message 就会马上被消费了。(这部分的代码实现比较复杂,感兴趣可以去看对应的源码)

class DeliverDelayedMessageTimerTask extends TimerTask {
    private final int delayLevel;
    private final long offset;

    public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
        this.delayLevel = delayLevel;
        this.offset = offset;
    }

    @Override
    public void run() {
        try {
            if (isStarted()) {
                this.executeOnTimeup();
            }
        } catch (Exception e) {
            // XXX: warn and notify me
            log.error("ScheduleMessageService, executeOnTimeup exception", e);
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
        }
    }

    public void executeOnTimeup() {
        ConsumeQueue cq =
            ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                delayLevel2QueueId(delayLevel));

        long failScheduleOffset = offset;

        if (cq != null) {
            
            // 这部分是核心逻辑,实现的是 从延时消息队列中取出 deliverTimestamp - now <= 0 的消息,
            // 将消息从延时queue移到原本指定Topic的queue中,这些消息就马上会被consumer消费。
        } 

        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
            failScheduleOffset), DELAY_FOR_A_WHILE);
    }
}

总体的原理示意图,如下所示:

broker 在接收到延时消息的时候,会将延时消息存入到延时TOPIC的队列中,然后ScheduleMessageService中,每个 queue 对应的定时任务会不停地被执行,检查 queue 中哪些消息已到设定时间,然后转发到消息的原始TOPIC,这些消息就会被各自的 producer 消费了。

三、拓展-消费重试

平常在使用RocketMQ的时候,一般会依赖consumer的消费重试功能。 而consumer端的消费重试,其实也是通过这个和延时队列差不多的原理来实现的。

public void consume() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            // 这里如果返回RECONSUME_LATER,就会重试消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

RocketMQ规定,以下三种情况统一按照消费失败处理并会发起重试。

  • 业务消费方返回ConsumeConcurrentlyStatus.RECONSUME_LATER

  • 业务消费方返回null

  • 业务消费方主动/被动抛出异常

业务代码中,一般会利用重试功能去做下游逻辑的重试。而RocketMQ的重试并不是固定时间间隔重复进行,二是采取的退避式重试,重试的时间间隔会不断变长。 这个时间间隔,和设置delayLevel的延时类似。

Consumer客户端会通过processConsumeResult方法处理每一条消息的消费结果,如果判断需要进行重试,则会通过sendMessageBack方法将消息发送到broker,重试消息会带上已重试次数的信息。

broker收到消息之后,SendMessageProcessor会对重试消息进行处理,设置topic为RETRY_TOPIC,具体逻辑如下:

public class SendMessageProcessor
    extends AbstractSendMessageProcessor
    implements NettyRequestProcessor {

    private RemotingCommand asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {

        // 给重试消息设置新的topic
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

        // 根据已经发生重试的次数确定delayLevel
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);

        // 重试次数+1
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        // 存储消息
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

        // ...
    }   
}

存储消息的时候,CommitLog.putMessage方法内部判断如果设置了delayLevel,就会重设topic为SCHEDULE_TOPIC,然后将消息存储到延时队列中,后续就是和ScheduleMessageService的逻辑相同。

整个消息重试的逻辑示意图如下所示:

如图所示

  1. Consumer在消费的时候,都会订阅指定的TOPIC-NORMAL_TOPIC和该ConsumerGroup对应的重试TOPIC-RETRY_GROUP1_TOPIC,同时消费来自这两个topic中的消息。

  2. 当发生消费失败后,Client端会调用sendMessageBack方法将失败消息发送回broker。

  3. broker端的SendMessageProcessor会根据当前的重试次数确定延时级别,将消息存入延时队列-SCHEDULE_TOPIC中。

  4. ScheduleMessageService会将到期的消息重新发送到重试TOPIC-RETRY_GROUP1_TOPIC中,这个时候消息被Consumer消费,就完成了整个重试过程。

可以对比之前的延时消息流程去看,其实重试消息就是将失败的消息当作延时消息进行处理,只不过最后投入的是专门的重试消息队列中。

四、总结

延时消息都是非常日常业务使用中很重要的功能,而RocketMQ通过时间片分级+多队列+定时任务,就实现了这样的功能,设计上是很巧妙的。并且消费重试采用退避式的策略,重试时间的梯度刚好与延时消息策略一致,这样就可以直接利用延时队列去完成消息重试的功能,从策略上来说非常合理(消息消费重复失败,在短时间内重试成功的可能性比较低),并且复用了底层代码,这些是值得去学习和借鉴的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1183874.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

个人实用的街头防身自卫术,男女必学的防身实战技能

一、教程描述 本套教程&#xff0c;大小455.93M&#xff0c;共有17个文件。 二、教程目录 实战防身术01、街头防身自卫术示例.mp4 实战防身术02、街头防身自卫术序言.mp4 实战防身术03、腕部被抓解脱.mp4 实战防身术04、胸襟被抓解脱.mp4 实战防身术05、腰部被抓解脱.mp…

应用在全固态激光雷达中的ALS环境光传感芯片

全固态扫描式激光雷达系统这一创新性技术在多个领域都有着巨大的潜力&#xff0c;将改变未来科技格局。本文将探讨这一革命性的发明&#xff0c;以及它在自动驾驶、无人机、工业自动化、环境监测等领域的关键应用。 传统激光雷达系统通常使用复杂的机械装置&#xff0c;这些部…

如何上传自己的Jar到Maven中央仓库

在项目开发过程中&#xff0c;我们常常会使用 Maven 从仓库拉取开源的第三方 Jar 包。本文将带领大家将自己写好的代码或开源项目发布到 Maven中央仓库中&#xff0c;让其他人可以直接依赖你的 Jar 包&#xff0c;而不需要先下载你的代码后 install 到本地。 注册帐号 点击以…

基于Pymavlink协议的BlueROV开发

1 BlueROV概述 1.1 什么是ROV 维基百科遥控潜水器&#xff08;Remotely operated underwater vehicle&#xff0c;缩写ROV&#xff09;是一个无人的水下航行器&#xff0c;以电缆连接到母船的人员操作。常搭载水下光源和照相机、摄影机、机械手臂、声纳等。因为具有机械手臂&a…

华为OD机试 - 找朋友(Java 2023 B卷 100分)

目录 专栏导读一、题目描述二、输入描述三、输出描述大白话解释一下就是&#xff1a;1、输入&#xff1a;2、输出&#xff1a;3、说明 四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专…

安装node-sass安装失败(Failed at the node-sass@4.14.1 postinstall script.)

npm i安装依赖&#xff0c;安装node-sass失败 全局设置淘宝镜像&#xff0c;还是下载不下来。下载不下来可能是因为默认从github上去下载node-sass&#xff0c;而国内经常连不上或者网络不好。可以单独下载 npm i node-sass4.14.1 --sass_binary_sitehttps://npm.taobao.org/…

Maven多环境下 active: @profileActive@报错问题解决

1.报错&#xff1a; Caused by: org.yaml.snakeyaml.scanner.ScannerException: while scanning for the next token found character that cannot start any token.(Do not use for indentation) 2.解决办法&#xff1a; 在主pom的文件下&#xff0c;重新加载&#xff1a;

模型可解释性

模型可解释性 前言导读Background1、为什么需要可解释性&#xff1f;2、诞生背景3、研究现状4、常见的模型可解释性方法4.1 基于模型自身的可解释性1&#xff09;Explanation Generation2&#xff09;Prototype Network 4.2 基于结果的可解释性 5、应用前景6、面临挑战 前言导读…

基于ssm的校园快递物流管理系统(java+jsp+ssm+javabean+mysql+tomcat)

博主24h在线&#xff0c;想要源码文档部署视频直接私聊&#xff0c;9.9拿走&#xff01; 基于javawebmysql的ssm校园快递物流管理系统(javajspssmjavabeanmysqltomcat) 运行环境&#xff1a; Java≥8、MySQL≥5.7、Tomcat≥8 开发工具&#xff1a; eclipse/idea/myeclipse/s…

php实现普通和定时跳转的几种方式

一、普通跳转 1、使用header函数&#xff1a;通过设置HTTP头部信息实现页面跳转。可以使用Location头部指定跳转的URL。例如&#xff1a; header("Location: http://www.example.com"); exit(); 2、使用JavaScript&#xff1a;可以使用JavaScript的window.location…

倾斜摄影三维模型的根节点合并的并行处理技术分析

倾斜摄影三维模型的根节点合并的并行处理技术分析 倾斜摄影三维模型的根节点合并是指将多个倾斜摄影拍摄得到的三维模型中的根节点进行合并&#xff0c;以减少模型大小和复杂度。在处理大规模的倾斜摄影数据时&#xff0c;传统的串行处理方法效率较低&#xff0c;因此需要使用并…

Shiro安全框架

一、与SpringBoot整合 ①&#xff1a;框架整合 1. 创建SpringBoot项目 环境&#xff1a; jdk: 1.8SpringBoot: 2.5.14 2. 整合MyBatis根据实体类生成表 可查看文章&#xff1a;https://juejin.cn/post/7234324615015776315 按照以上笔记配置后在补充一下代码 依赖MyBatisP…

QML 中TextField输入框和下划线的设定

1.TextField的默认显示方式是输入框&#xff0c;如下所示: TextField { placeholderText: qsTr("Enter name") } 但是也有这样显示的,它变成了下划线: 在属性设置中是找不到相关设置&#xff0c;结果在mian.cpp中发现了一行代码会影响效果。这行代码是…

网工实验笔记:IPv6(配置6to4隧道)

1. 实验目的 熟悉6to4隧道的应用场景 掌握6to4隧道的配置方法 2. 实验拓扑 实验拓扑如图所示&#xff1a; 想要华为数通配套实验拓扑和配置笔记的朋友们点赞关注&#xff0c;评论区留下邮箱发给你! 3. 实验步骤 &#xff08;1&#xff09;配置IP地址 AR1的配置 …

ActiveMQ反序列化漏洞(CVE-2015-5254)复现

漏洞描述 Apache ActiveMQ 是由美国 Pachitea &#xff08;Apache&#xff09; 软件基金会开发的开源消息传递中间件&#xff0c;支持 Java 消息传递服务、集群、Spring 框架等。 Apache ActiveMQ 版本 5.x 之前的 5.13.0 安全漏洞&#xff0c;该漏洞由程序导致&#xff0c;不…

运动蓝牙耳机哪个品牌好?值得推荐的运动耳机分享

​对于我来说&#xff0c;运动和音乐是生活中不可或缺的元素。无论是在室内还是在户外锻炼&#xff0c;我都会选择一款适合的运动耳机&#xff0c;播放自己喜欢的音乐&#xff0c;让自己放松身心。在选择运动耳机时&#xff0c;我会考虑到它的舒适度、音质、耐用的性能以及防水…

led灯对眼睛有伤害吗?精选高品质的护眼台灯

在大家的认知中led灯最大的危害就是有蓝光辐射&#xff0c;其实在如今科技发达的时代&#xff0c;很多led灯对蓝光的处理技术都已经非常成熟的了&#xff0c;有些led灯具甚至做到了RG0无蓝光危害的。只要我们挑选一款光源合适、质量合格的产品&#xff0c;正确的使用基本都不会…

Kubernetes的介绍

目录 Kubernetes概述 1、作用 2、官网 3、K8S的主要功能 Kubernetes 集群架构与组件 1、核心组件 1&#xff09;Kube-apiserver 2&#xff09;Kube-controller-manager 3&#xff09;Kube-scheduler 4&#xff09;etcd 5&#xff09;Kubelet 6&#xff09;Kube-Pro…

FAN7391MX 高压600V 用于高压、高速驱动 MOSFET和IGBT 半桥栅极驱动器IC

FAN7391MX是单片高侧和低侧栅极驱动 IC&#xff0c;可驱动工作在高达 600 V 电压的高速 MOSFET 和 IGBT。它具有缓冲输出级&#xff0c;所有 NMOS 晶体管设计用于实现高脉冲电流驱动能力和最低交叉传导。Fairchild 的高压工艺和共模噪声消除技术可使高侧驱动器在高 dv/dt 噪声环…

amd Ubuntu opencl 安装

amd cpugpu 安装amd显卡驱动&#xff0c;下载地址&#xff1a; https://www.amd.com/en/support/linux-drivers //eg: sudo apt install ./amdgpu-install_5.4.50403-1_all.deb amdgpu-install安装成功之后可输入 glxinfo | grep rendering&#xff0c;显示 yes 则显卡驱动安…