RocketMQ定时/延时消息实现机制

news2025/3/4 8:40:47

RocketMQ 的延迟消息是其核心特性之一,允许消息在指定延迟时间后才被消费者消费。
定时消息生命周期
在这里插入图片描述


一、延迟消息的核心机制

  • RocketMQ(5.0之前不支持任意时间精度的延迟,而是通过预定义的 延迟级别(Delay Level) 实现。默认支持 18 个延迟级别,对应延迟时间如下:
1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h

消息发送时指定延迟级别,Broker 会将消息暂存到内部 Topic(SCHEDULE_TOPIC_XXXX),这个topic下对应着这18个延迟级别的队列,到期后投递到目标 Topic。

  • RocketMQ5.0之后支持任意时间精度的延迟,内部是通过时间轮算法实现的消息延迟功能。
    定时消息仅支持在 MessageType为Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。
    RocketMQ 定时消息的定时时长参数精确到毫秒级,但是默认精度为1000ms,即定时消息为秒级精度。

二、使用方式

  1. 发送消息时设置延迟级别:
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
// 设置延迟级别为 2(对应 5s)
msg.setDelayTimeLevel(2);
SendResult sendResult = producer.send(msg);
  1. 发送消息时设置延迟时间
		//定时/延时消息发送
        MessageBuilder messageBuilder = new MessageBuilderImpl();;
        //以下示例表示:延迟时间为10分钟之后的Unix时间戳。
        Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
        Message message = messageBuilder.setTopic("topic")
                //设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                //设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                .setDeliveryTimestamp(deliverTimeStamp)
                //消息体
                .setBody("messageBody".getBytes())
                .build();
        try {
            //发送消息,需要关注发送结果,并捕获失败等异常。
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }

三、源码实现分析

1. 消息存储阶段
  • CommitLog 存储
    消息写入 CommitLog 前,Broker 检测 delayTimeLevel

    • 若为延迟消息,替换 TopicSCHEDULE_TOPIC_XXXX队列 IDdelayLevel - 1
    • 源码位置:org.apache.rocketmq.store.CommitLog#putMessage
  • 延迟队列 Topic
    每个延迟级别对应一个队列,例如 Level 2 存储在 SCHEDULE_TOPIC_XXXX 的队列 1 中。

2. 延迟消息调度
  • ScheduleMessageService
    核心调度服务,启动时初始化定时任务:

    public void start() {
        for (int i = 0; i < this.delayLevelTable.size(); i++) {
            int delayLevel = i + 1;
            this.scheduleExecutorService.schedule(
                new DeliverDelayedMessageTimerTask(delayLevel), 
                this.firstDelayTime, 
                TimeUnit.MILLISECONDS
            );
        }
    }
    
    • 每个延迟级别对应一个 DeliverDelayedMessageTimerTask 定时任务。
    • 定时扫描对应队列,将到期消息投递到目标 Topic。
  • 消息投递逻辑

    • 计算消息的 到期时间(存储时间 + 延迟时间)。
    • 若当前时间 >= 到期时间,从延迟队列拉取消息,恢复原始 Topic/QueueId,重新存入 CommitLog。
    • 源码位置:DeliverDelayedMessageTimerTask#executeOnTime
3. 消费者可见性
  • 消息在延迟期间对消费者不可见,到期投递到原 Topic 后,消费者才能拉取。

四、关键配置

  • Broker 配置
    修改 messageDelayLevel 参数可自定义延迟级别(需重启 Broker):
    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    

五、设计思考

  • 固定延迟级别 vs 任意延迟
    RocketMQ 采用固定级别避免为每个消息创建 Timer,减少资源消耗。定时任务按队列批量处理,效率更高。

  • 性能优化
    延迟消息的存储和普通消息共用 CommitLog,通过 Topic 和队列切换实现逻辑隔离,保证写入性能。


六、注意事项

  • 延迟精度:受定时任务扫描间隔影响,可能存在微小误差。
  • 消息重试:若消费失败,消息进入重试队列(非延迟队列)。
  • 避免大量相同定时时刻的消息:定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。

通过上述机制,RocketMQ 以较低成本实现了高吞吐量的延迟消息功能,适用于订单超时、定时任务等场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2309389.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于SpringBoot的校园二手交易平台(源码+论文+部署教程)

运行环境 校园二手交易平台运行环境如下&#xff1a; • 前端&#xff1a;Vue • 后端&#xff1a;Java • IDE工具&#xff1a;IntelliJ IDEA&#xff08;可自行更换&#xff09; • 技术栈&#xff1a;SpringBoot Vue MySQL 主要功能 校园二手交易平台主要包含前台和…

利用 LangChain 和一个大语言模型(LLM)构建一个链条,自动从用户输入的问题中提取相关的 SQL 表信息,再生成对应的 SQL 查询

示例代码&#xff1a; from langchain_core.runnables import RunnablePassthrough from langchain.chains import create_sql_query_chain from operator import itemgetter from langchain.chains.openai_tools import create_extraction_chain_pydantic# 系统消息&#xff…

力扣hot 100之矩阵四题解法总结

本期总结hot100 中二维矩阵的题&#xff0c;时空复杂度就不分析了 1.矩阵置零 原地标记&#xff0c;用第一行和第一列作为当前行列是否为0的标记&#xff0c;同时用两个标签分别记录0行、0列的标记空间中原本是否有0 class Solution:def setZeroes(self, matrix: List[List[…

在Linux上使用APT安装Sniffnet的详细步骤

一、引言 Sniffnet 是一款开源的网络流量监控工具&#xff0c;适用于多种Linux发行版。如果你的Linux系统使用APT&#xff08;Advanced Package Tool&#xff09;作为包管理器&#xff0c;以下是如何通过APT安装Sniffnet的详细步骤。 二、系统要求 在开始安装之前&#xff0…

zookeeper-docker版

Zookeeper-docker版 1 zookeeper概述 1.1 什么是zookeeper Zookeeper是一个分布式的、高性能的、开源的分布式系统的协调&#xff08;Coordination&#xff09;服务&#xff0c;它是一个为分布式应用提供一致性服务的软件。 1.2 zookeeper应用场景 zookeeper是一个经典的分…

StableDiffusion本地部署 3 整合包猜想

本地部署和整合包制作猜测 文章目录 本地部署和整合包制作猜测官方部署第一种第二种 StabilityMatrix下载整合包制作流程猜测 写了这么多python打包和本地部署的文章&#xff0c;目的是向做一个小整合包出来&#xff0c;不要求有图形界面&#xff0c;只是希望一键就能运行。 但…

数据结构(初阶)(七)----树和二叉树(前中后序遍历)

实现链式结构的二叉树 实现链式结构的二叉树遍历前序遍历中序遍历后序遍历 节点个数叶子节点个数⼆叉树第k层结点个数⼆叉树的深度/⾼度查找值为X的节点二叉树的销毁 层序遍历判断二叉树是否为完全二叉树 ⽤链表来表⽰⼀棵⼆叉树&#xff0c;即⽤链来指⽰元素的逻辑关系。 通常…

科技赋能筑未来 中建海龙MiC建筑技术打造保障房建设新标杆

近日&#xff0c;深圳梅林路6号保障房项目顺利封顶&#xff0c;标志着国内装配式建筑领域又一里程碑式突破。中建海龙科技有限公司&#xff08;以下简称“中建海龙”&#xff09;以模块化集成建筑&#xff08;MiC&#xff09;技术为核心&#xff0c;通过科技创新与工业化建造深…

json介绍、python数据和json数据的相互转换

目录 一 json介绍 json是什么&#xff1f; 用处 Json 和 XML 对比 各语言对Json的支持情况 Json规范详解 二 python数据和json数据的相互转换 dumps() : 转换成json loads(): 转换成python数据 总结 一 json介绍 json是什么&#xff1f; 实质上是一条字符串 是一种…

计算机毕设JAVA——某高校宿舍管理系统(基于SpringBoot+Vue前后端分离的项目)

文章目录 概要项目演示图片系统架构技术运行环境系统功能简介 概要 网络上许多计算机毕设项目开发前端界面设计复杂、不美观&#xff0c;而且功能结构十分单一&#xff0c;存在很多雷同的项目&#xff1a;不同的项目基本上就是套用固定模板&#xff0c;换个颜色、改个文字&…

Spring Boot 测试:单元、集成与契约测试全解析

一、Spring Boot 分层测试策略 Spring Boot 应用采用经典的分层架构&#xff0c;不同层级的功能模块对应不同的测试策略&#xff0c;以确保代码质量和系统稳定性。 Spring Boot 分层架构&#xff1a; Spring Boot分层架构 A[客户端] -->|HTTP 请求| B[Controller 层] …

Oracle 数据库基础入门(四):分组与联表查询的深度探索(上)

在 Oracle 数据库的学习进程中&#xff0c;分组查询与联表查询是进阶阶段的重要知识点&#xff0c;它们如同数据库操作的魔法棒&#xff0c;能够从复杂的数据中挖掘出有价值的信息。对于 Java 全栈开发者而言&#xff0c;掌握这些技能不仅有助于高效地处理数据库数据&#xff0…

机器学习的起点:线性回归Linear Regression

机器学习的起点&#xff1a;线性回归Linear Regression 作为机器学习的起点&#xff0c;线性回归是理解算法逻辑的绝佳入口。我们从定义、评估方法、应用场景到局限性&#xff0c;用生活化的案例和数学直觉为你构建知识框架。 回归算法 一、线性回归的定义与核心原理 定义&a…

17、什么是智能指针,C++有哪几种智能指针【高频】

智能指针其实不是指针&#xff0c;而是一个&#xff08;模板&#xff09;类&#xff0c;用来存储指向某块资源的指针&#xff0c;并自动释放这块资源&#xff0c;从而解决内存泄漏问题。主要有以下四种&#xff1a; auto_ptr 它的思想就是当当一个指针对象赋值给另一个指针对…

PyCharm接入本地部署DeepSeek 实现AI编程!【支持windows与linux】

今天尝试在pycharm上接入了本地部署的deepseek&#xff0c;实现了AI编程&#xff0c;体验还是很棒的。下面详细叙述整个安装过程。 本次搭建的框架组合是 DeepSeek-r1:1.5b/7b Pycharm专业版或者社区版 Proxy AI&#xff08;CodeGPT&#xff09; 首先了解不同版本的deepsee…

PyCharm怎么集成DeepSeek

PyCharm怎么集成DeepSeek 在PyCharm中集成DeepSeek等大语言模型(LLM)可以借助一些插件或通过代码调用API的方式实现,以下为你详细介绍两种方法: 方法一:使用JetBrains AI插件(若支持DeepSeek) JetBrains推出了AI插件来集成大语言模型,不过截至2024年7月,官方插件主要…

【定昌Linux系统】部署了java程序,设置开启启动

将代码上传到相应的目录&#xff0c;并且配置了一个.sh的启动脚本文件 文件内容&#xff1a; #!/bin/bash# 指定JAR文件的路径&#xff08;如果JAR文件在当前目录&#xff0c;可以直接使用文件名&#xff09; JAR_FILE"/usr/local/java/xs_luruan_client/lib/xs_luruan_…

Java零基础入门笔记:(7)异常

前言 本笔记是学习狂神的java教程&#xff0c;建议配合视频&#xff0c;学习体验更佳。 【狂神说Java】Java零基础学习视频通俗易懂_哔哩哔哩_bilibili 第1-2章&#xff1a;Java零基础入门笔记&#xff1a;(1-2)入门&#xff08;简介、基础知识&#xff09;-CSDN博客 第3章…

【字符串】最长公共前缀 最长回文子串

文章目录 14. 最长公共前缀解题思路&#xff1a;模拟5. 最长回文子串解题思路一&#xff1a;动态规划解题思路二&#xff1a;中心扩散法 14. 最长公共前缀 14. 最长公共前缀 ​ 编写一个函数来查找字符串数组中的最长公共前缀。 ​ 如果不存在公共前缀&#xff0c;返回空字符…

react 中,使用antd layout布局中的sider 做sider的展开和收起功能

一 话不多说&#xff0c;先展示效果&#xff1a; 展开时&#xff1a; 收起时&#xff1a; 二、实现代码如下 react 文件 import React, {useState} from react; import {Layout} from antd; import styles from "./index.module.less"; // 这个是样式文件&#…