【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

news2025/1/10 11:34:51

痛点背景

业务场景

假设有这么一个需求,用户下单后如果30分钟未支付,则该订单需要被关闭。你会怎么做?

之前方案

最简单的做法,可以服务端启动个定时器,隔个几秒扫描数据库中待支付的订单,如果(当前时间-订单创建时间)>30分钟,则关闭订单。

方案评估
  • 优点:是实现简单,缺点呢?
  • *缺点:定时扫描意味着隔个几秒就得查一次数据库,频率高的情况下,如果数据库中订单总量特别大,这种高频扫描会对数据库带来一定压力,待付款订单特别多时(做个爆品秒杀活动,或者啥促销活动),若一次性查到内存中,容易引起宕机,需要分页查询,多少也会有一定数据库层面压力。
延时队列出现
  • 能够在指定时间间隔后触发某个业务操作
  • 能够应对业务数据量特别大的特殊场景

RocketMQ延时消息能够完美的解决上述需求,正常的消息在投递后会立马被消费者所消费,而延时消息在投递时,需要设置指定的延时级别(不同延迟级别对应不同延迟时间),即等到特定的时间间隔后消息才会被消费者消费,这样就将数据库层面的压力转移到了MQ中,也不需要手写定时器,降低了业务复杂度,同时MQ自带削峰功能,能够很好的应对业务高峰。

功能特点

  • RocketMQ支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;
  • 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;
  • 在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;
  • *broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。

Broker处理延迟消息

延时队列生产者端:

延时消息的关键点在于Producer生产者需要给消息设置特定延时级别,消费端代码与正常消费者没有差别。

public class Producer {
	private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.setNamesrvAddr("111.231.110.149:9876");

        producer.start();

        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest" ,
                    "TagA" ,
                    ("test message" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );

                msg.setDelayTimeLevel(3);
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}
复制代码
初始化

DefaultMessageStore在启动时,会调用ScheduleMessageService#load()方法来加载消息消费进度和初始化延迟级别对应map,然后调用ScheduleMessageService#start()方法来启动类

load方法

public boolean load() {
        boolean result = super.load();
        result = result && this.parseDelayLevel();
        return result;
}
复制代码

ScheduleMessageService继承自ConfigManager类,super.load()方法对应

public boolean load() {
        String fileName = null;
        try {
            fileName = this.configFilePath();
            String jsonString = MixAll.file2String(fileName);

            if (null == jsonString || jsonString.length() == 0) {
                return this.loadBak();
            } else {
                this.decode(jsonString);
                log.info("load " + fileName + " OK");
                return true;
            }
        } catch (Exception e) {
            log.error("load " + fileName + " failed, and try to load backup file", e);
            return this.loadBak();
        }
}
复制代码

延时队列源码分析:

先从延时消息延迟级别设置与broker端消息持久化入手。

具体实现

RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早。

启动延迟消息定时任务

如果想要深入了解的可以看一下ScheduleMessageService这个类

内部变量含义

延时消息定时投递相关具体实现代码在ScheduleMessageService中,先看下变量定义

  • delayLevelTable定义了延迟级别和延迟时间的对应关系
  • *offsetTable存放延延迟级别对应的队列消费的offset
ScheduleMessageService.start()
复制代码

延迟消息投递

其中根据,delayLevel获取消费队列id的方法如下,即queueId = delayLevel-1

public static int delayLevel2QueueId(final int delayLevel) {
        return delayLevel - 1;
}
复制代码

核心逻辑就是取出tagCode(延时消息持久化时,tagsCode存储的是消息投递时间),解析成消息投递时间,与当前时间戳做差,判断是否应该进行消息投递,具体进行消息投递的方法,在if (countdown

分享资源

资源分享
获取以上资源请访问开源项目 点击跳转

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/855815.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

EasyPoi导出 导入(带校验)简单示例 EasyExcel

官方文档 : http://doc.wupaas.com/docs/easypoi pom的引入: <!-- easyPoi--><dependency><groupId>cn.afterturn</groupId><artifactId>easypoi-spring-boot-starter</artifactId><version>4.0.0</version></dep…

分布式协调组件Zookeeper

Zookeeper介绍 什么是Zookeeper ZooKeeper 是⼀种分布式协调组件&#xff0c;用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。ZooKeeper 通过其简单的架构和 API 解决了这个问题。ZooKeeper 允许开发人员专注于核心应用程序逻辑&#xff0c;而不必担心应用…

【Linux】多线程——线程引入 | 线程控制

文章目录 一、Linux多线程1. 线程概念2. 线程创建3. 线程和进程4. 线程的优缺点 二、线程控制1. 线程创建2. 线程终止3. 线程等待4. 线程分离5. 线程局部存储 三、线程封装 一、Linux多线程 一级页表和二级页表都是key/val模型&#xff0c;一级页表的key是第一份的10个比特位&a…

(统计学习方法|李航)第一章统计学习方法概论——四五六节模型评估与模型选择,正则化与交叉验证,泛化能力

一&#xff0c;模型评估与模型选择 1.训练误差与测试误差 假如我们有100个数据。80条记录给训练集&#xff0c;10条记录给测试集&#xff0c;10条记录给验证集 先在训练集中训练模型&#xff0c; 再在验证集上测试看哪种模型更拟合 最后用测试集算出成绩 表示决策函数 模型…

数据清理在数据科学中的重要性

什么是数据清理&#xff1f; 推荐&#xff1a;使用 NSDT场景编辑器 助你快速搭建可编辑的3D应用场景 在数据科学中&#xff0c;数据清理是识别不正确数据并修复错误的过程&#xff0c;以便最终数据集可供使用。错误可能包括重复字段、格式不正确、字段不完整、数据不相关或不准…

基于kettle实现pg数据定时转存mongodb

mogodb 待创建 基于kettle实现pg数据定时转存mongodb_kettle 实时迁移 mongodb_呆呆的私房菜的博客-CSDN博客

链表和哈希Set

1 LinkedList集合类 LinkedList集合类底层是使用双向链表实现的&#xff0c;相较于ArrayList&#xff0c;更方便进行增删操作。 在增删查改方面&#xff0c;新增了头尾操作&#xff0c;比如从头部插入、尾部插入、头部删除、尾部删除、头部查询和尾部查询等操作。由于有头尾的…

SpringCloud实用篇3----Docker

1.初识Docker 1.1 什么是Docker 微服务虽然具备各种各样的优势&#xff0c;但服务的拆分通用给部署带来了很大的麻烦。 分布式系统中&#xff0c;依赖的组件非常多&#xff0c;不同组件之间部署时往往会产生一些冲突。在数百上千台服务中重复部署&#xff0c;环境不一定一致…

gitblit windows部署

1.官网下载 往死慢&#xff0c;我是从百度找的1.9.1&#xff0c;几乎就是最新版 http://www.gitblit.com/ 2.解压 下载下来是一个zip压缩包&#xff0c;直接解压即可 3.配置 3.1.配置资源库路径 找到data文件下的gitblit.properties文件&#xff0c;用Notepad打开 **注意路…

云原生可观测框架 OpenTelemetry 基础知识(架构/分布式追踪/指标/日志/采样/收集器)...

什么是 OpenTelemetry&#xff1f; OpenTelemetry 是一个开源的可观测性框架&#xff0c;由云原生基金会(CNCF)托管。它是 OpenCensus 和 OpenTracing 项目的合并。旨在为所有类型的可观测信号(如跟踪、指标和日志)提供单一标准。 https://opentelemetry.iohttps://www.cncf.io…

微服务Eureka注册中心

目录 一、Eureka的结构和作用 二、搭建eureka-server 三、服务注册 四、服务发现 假如我们的服务提供者user-service部署了多个实例&#xff0c;如图&#xff1a; 存在的问题&#xff1a; order-service在发起远程调用的时候&#xff0c;该如何得知user-service实例的ip地址…

SpringCloud 尚硅谷 微服务简介以及Eureka使用

写在前面 该系列博客仅用于本人学习尚硅谷课程SpringCloud笔记&#xff0c;其中的错误在所难免&#xff0c;如有错误恳请指正。 官方源码地址&#xff1a;https://github.com/zzyybs/atguigu_spirngcloud2020 什么是SpringCloud Spring Cloud是微服务一站式服务解决方案&…

芒果 TV 基于 Flink 的实时数仓建设实践

公司简介&#xff1a;芒果 TV 作为湖南广电旗下互联网视频平台&#xff0c;在“一云多屏&#xff0c;多元一体”的战略指导下&#xff0c;通过内容自制&#xff0c;培植核心竞争力&#xff0c;从独播、独特走向独创&#xff0c;并通过市场化运作完成 A 轮、B 轮融资&#xff0c…

数据库活动监控(DAM)

在当今数据驱动的世界中&#xff0c;组织在保护存储在数据库中的机密数据并确保其完整性方面面临着越来越多的挑战。数据库审计通过提供全面的数据库活动监控方法&#xff0c;在应对这些挑战方面发挥着至关重要的作用。 数据库活动监控&#xff08;Database Activity Monitori…

【Redis】初学Redis

目录 使用Redisyum安装redis启动redis操作redis设置远程连接 Redis路线Redis 使用Redis yum安装redis 使用命令&#xff0c;直接将Redis安装到linux服务器&#xff1a; yum -y install redis启动redis redis-server /etc/redis.conf &操作redis redis-cli设置远程连接…

最新AI创作系统ChatGPT程序源码+详细搭建部署教程+微信公众号版+H5源码/支持GPT4.0+GPT联网提问/支持ai绘画+MJ以图生图+思维导图生成!

使用Nestjs和Vue3框架技术&#xff0c;持续集成AI能力到系统&#xff01; 新增 MJ 官方图片重新生成指令功能同步官方 Vary 指令 单张图片对比加强 Vary(Strong) | Vary(Subtle)同步官方 Zoom 指令 单张图片无限缩放 Zoom out 2x | Zoom out 1.5x新增GPT联网提问功能、手机号注…

集合Collection-List-ArrayList学习

一、集合 集合是数据容器。相较于数组集合具有以下几个特点&#xff1a; 数组一旦创建&#xff0c;长度不可改变。集合的长度会自动扩容。集合具有很多数组没有的功能函数API数组元素的存储特点单一&#xff0c;不同的集合有不同的存储特点。 1. Collection顶层接口 Collect…

Python-OpenCV中的图像处理-图像梯度

Python-OpenCV中的图像处理-图像梯度 图像梯度Sobel 算子和 Scharr 算子Laplacian 算子 图像梯度 图像梯度&#xff0c;图像边界等使用到的函数有&#xff1a; cv2.Sobel()&#xff0c; cv2.Scharr()&#xff0c; cv2.Laplacian() 等原理&#xff1a;梯度简单来说就是求导。Op…

Kotlin反射访问androidx.collection.LruCache类私有变量

Kotlin反射访问androidx.collection.LruCache类私有变量 androidx.collection.LruCache类中定义了一个名为map的LinkedHashMap&#xff0c;map存储了所有LruCache的数据&#xff0c;有时候需要遍历访问该LinkedHashMap&#xff0c;取出里面的值&#xff0c;但是LruCache代码实…

Jenkins+Docker+SpringCloud微服务持续集成

JenkinsDockerSpringCloud微服务持续集成 JenkinsDockerSpringCloud持续集成流程说明SpringCloud微服务源码概述本地运行微服务本地部署微服务 Docker安装和Dockerfile制作微服务镜像Harbor镜像仓库安装及使用在Harbor创建用户和项目上传镜像到Harbor从Harbor下载镜像 微服务持…