7.延时消息与原理探究

news2025/1/10 1:55:47

highlight: arduino-light

4.3 延时消息

延迟消息对应的Topic是SCHEDULETOPICXXXX,注意就是SCHEDULETOPICXXXX,XXXX不是某某某的意思。

image.png

SCHEDULETOPICXXXX的队列名称是从2开始到17,对应的delayLevel为3到18,3对应10s,18对应2h,在类MessageStoreConfig中这样定义延时时间:

String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"。

SCHEDULETOPICXXXX这个topic只对内部使用,对于consumer只能消费到自己所在的消费者组的重试topic的数据。

比如A是OrderConsumer组的一个消费者,OrderConsumer消费的主题Topic是Order。

那么消费者A在启动的时候,会订阅Order主题的同时,还会订阅%RETRY%_OrderConsumer。

即订阅Order主题的同时,还会订阅%RETRY%+消费者组的主题。

consumer消费失败的消息发回broker后总是先写到SCHEDULETOPICXXXX里面,然后schedule service在延迟时间到了以后会读取SCHEDULETOPICXXXX里面的数据然后重新发回到重试主题,consumer订阅了重试主题,所以会重新消费失败的数据,这样就完成了一个循环。

发送到重试消费Topic 是%RETRY% + 消费组名 注意是消费组名 我们思考一下为什么是消费者组名? A消费者组消费成功 B消费者组消费失败 如果发回原topic就有问题了,A又会消费一次

rocketmq 先将不同延时等级的消息存入内部对应延时队列中,然后不断的从延时队列中拉取消息判断是否到期,然后进行投递到对应的topic中。

从这个过程也能看到,一个消费失败的消息体每次发回broker需要在commitLog里面存储两份。

topic为SCHEDULETOPICXXXX的一份这个主要是为schedule service控制延时用的。

topic为%RETRY%groupName的一份。

通过固定延时等级的方式,同一个队列中的消息都是相同的延时等级,不需要对消息进行排序,只需要按顺序拉取消息判断是否可以投递就行了。但也限制了延时时间。

另外,因为只要延时消息存入延时队列中,就会写入commitlog文件中,然后rocketmq的高可用(同步复制或异步复制)就会将消息复制到slave中,从而保证延时消息的可靠性。

虽然rocketmq不支持任意延时时间,但相比于rabbitmq的死信消息,仍然提供了18个延时等级,基本也能覆盖很多场景了。

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化, 那么消息排序要不可避免的产生巨大性能开销。

阿里云 RocketMQ 提供了任意时刻的定时消息功能,Apache 的 RocketMQ 并没有,阿里并没有开源 发送延时消息时需要设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。

延迟消息是根据延迟队列的 level 来的,延迟队列默认是 msg.setDelayTimeLevel(5) 代表延迟一分钟 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" 是这 18 个等级(秒(s)、分(m)、小时(h)),level 为 1,表示延迟 1 秒后消费,level 为 5 表示延迟 1 分钟后消费,level 为 18 表示延迟 2 个 小时消费。

生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的level即可。消费消息跟普通的消费消息一致。

固定Level的含义是延迟是特定级别的,比如支持3秒、5秒的Level,那么用户只能发送3秒延迟或者5秒延迟,不能发送8秒延迟的消息。

消息队列RocketMQ的阿里云版本(收费版本)才支持到精确到秒级别的延迟消息(没有特定Level的限制)。

开源版本没有支持任意延迟的消息,我想可能有以下几个原因:

  1. 任意延迟的消息的需求不强烈
  2. 可能是一个比较有技术含量的点,不愿意开源

需求不强

对支持任意延迟的需求确实不强,因为:

  1. 延迟并不是MQ场景的核心功能,业务单独做一个替代方案的成本不大
  2. 业务上一般对延迟的需求都是固定的,比如下单后半小时check是否付款,发货后7天check是否收货

在我司,MQ上线一年多后才有业务方希望我能支持延迟消息,且不要求任意延迟,只要求和RocketMQ开源版本一致,支持一些业务上的级别即可。

不愿意开源

为了差异化(好云上卖钱),只能将开源版本的功能进行阉割,所以开源版本的RocketMQ变成了只支持特定Level的延迟。

既然业务有需求,我们肯定也要去支持。

任意延迟的消息难点在哪里?

首先,我们先划清楚定义和边界:在我们的系统范围内,支持任意延迟的消息指的是:

  1. 精度支持到秒级别
  2. 最大支持30天的延迟

本着对自己的高要求,我们并不满足于开源RocketMQ的18个Level的方案。那么,如果我们自己要去实现一个支持任意延迟的消息队列,难点在排序和消息存储。

消息要在服务端排序

任意延迟意味消息要在服务端排序

比如用户先发了一条延迟1分钟的消息,一秒后发了一条延迟3秒的消息,显然延迟3秒的消息需要先被投递出去。

那么服务端在收到消息后需要对消息进行排序后再投递出去。在MQ中,为了保证可靠性,消息是需要落盘的,且对性能和延

迟的要求,决定了在服务端对消息进行排序是完全不可接受的。

消息存储量太大

其次,目前MQ的方案中都是基于Write Ahead Log的方式实现的(RocketMQ、Kafka),日志文件会被过期删除,一般会保留最近一段时间的数据。

支持任意级别的延迟,那么需要保存最近30天的消息。阿里内部 1000+ 核心应用使用,每天流转几千亿条消息,经过双11交易、商品等核心链路真实场景的验证,稳定可靠。

考虑一下一天几千亿的消息,保存30天的话需要堆多少服务器,显然是无法做到的。

开源版本延迟消息如何做的?

虽然决定自己做,但是依旧需要先了解开源的实现,那么就只能看看RocketMQ开源版本中,支持18个Level是怎么实现的,希望能从中得到一些灵感。

上图是通过RocketMQ源码分析后简化一个实现原理方案示意图。

image.png

消息写入

  1. 在写入CommitLog之前,如果是延迟消息,替换掉消息的Topic和queueId(被替换为延迟消息特定的Topic,queueId则为延迟级别对应的id)

  2. 消息写入CommitLog之后,提交dispatchRequest到DispatchService

  3. 因为在第①步中Topic和QueueId被替换了,所以写入的ConsumeQueue实际上非真正消息应该所属的ConsumeQueue,而是写入到ScheduledConsumeQueue中(这个特定的Queue存放不会被消费)

Schedule过程中:

private String levelString = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; String[] levelArray = levelString.split(" "); for (int i = 0; i < levelArray.length; i++) { this.delayLevelTable.put(level, delayTimeMillis); }

每个level分配1个定时器,扫描所有延迟级别里面的延迟消息message。

如果存在延时消息。

如果当前消息不到消费的时间,则在countdown毫秒后再执行任务。countdown是消息消费的时间-当前时间now。

如果当前消息到消费的时间,根据消息的物理偏移量和大小获取消息。把延迟消息message,发送到真正的topic对应的某个队列。

RocketMQ延迟消息的代码实战及原理分析 - 万猫学社 - 博客园 (cnblogs.com)

回顾一下这个方案,最大的优点就是没有了排序:

  • 先发一条level是5s的消息,再发一条level是3s的消息,因为他们会属于不同的ScheduleQueue所以投递顺序能保持正确
  • 如果先后发两条level相同的消息,那么他们的处于同一个ConsumeQueue且保持发送顺序
  • 因为level数固定,每个level的有自己独立的定时器,开销也不会很大
  • ScheduledConsumeQueue其实是一个普通的ConsumeQueue,所以可靠性等都可以按照原系统的M-S结构等得到保障

但是这个方案也有一些问题:

  • 固定了Level,不够灵活,最多只能支持18个Level
  • 业务是会变的,但是Level需要提前划分,不支持修改
  • 如果要支持30天的延迟,CommitLog的量会很大,这块怎么处理没有看到

时间轮:TimeWheel

总结RocketMQ的方案,通过划分Level的方式,将排序操作转换为了O(1)的ConsumeQueue 的append操作。

我们去支持任意延迟的消息,必然也需要通过类似的方式避免掉排序。

此时我们想到了TimeWheel:Netty中也是用TimeWheel来优化I/O超时的操作。

4.3.1 启动消息消费者

java public class ScheduledMessageConsumer { public static void main(String[] args) throws Exception { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer"); // 订阅Topics consumer.subscribe("TestTopic", "*"); // 注册消息监听者 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); } }

4.3.2 发送延时消息

java public class ScheduledMessageProducer {   public static void main(String[] args) throws Exception {      // 实例化一个生产者来产生延时消息      DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");      // 启动生产者      producer.start();      int totalMessagesToSend = 100;      for (int i = 0; i < totalMessagesToSend; i++) {          Message message = new Message             ("TestTopic", ("Hello scheduled message " + i).getBytes());          // delayTimeLevel="1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";          // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间)          message.setDelayTimeLevel(3);          // 发送消息          producer.send(message);     }       // 关闭生产者      producer.shutdown(); } }

4.3.3 验证

您将会看到消息的消费比存储时间晚10秒

4.3.4 使用限制:1s-2h

md // org/apache/rocketmq/store/config/MessageStoreConfig.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18。

RocketMQ延时消息实现原理探究

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/673911.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

因为计算机中丢失mfc140.dll无法启动修复步骤分享

计算机报错提示mfc140.dll无法启动是怎么回事&#xff1f;mfc140.dll是什么文件&#xff0c;为什么会影响到软件程序的运行?相信你也有不少困惑&#xff0c;遇到这个情况不用慌&#xff0c;小编下面就分享关于mfc140.dll丢失的详细修复步骤以及mfc140.dll是什么。 mfc140.dll是…

java中集合类forEach删除元素报错:ConcurrentModificationException

如题所示&#xff0c;我们在java开发中&#xff0c;可能会有这样的一种情况&#xff0c;一个集合使用完了&#xff0c;我们想删除里面所有的元素&#xff0c;可能会遍历他们&#xff0c;然后依次调用删除操作。最简单的我们使用forEach遍历。 示例如下&#xff1a; public cla…

EasyCode代码生成插件-模板分享(基于数据表生成MyBatisPlus格式的dao,service,controller和vue组件)

目录 概述 使用演示 模板代码 实体类pojo 表现层controller 业务层service接口 业务层serviceImpl实现类 持久层dao Vue组件 概述 本片博客用于分享EasyCode的自定义模板&#xff08;模板在篇末&#xff09;&#xff0c;用于简化开发&#xff0c;免去重复性的工作。 …

SQL 基础语句

SQL 基础语句 DDL Data Definition Language 数据定义语言创建 create删除 drop修改 alter清空 truncate show tables ; --查看所有表&#xff1a; drop database db1; --删除数据库 create database db1 default character set utf8; --创建数据库 use databas…

【统信uos-server-20-1060e】-详细安装openGauss

【统信uos-server-20-1060e】-详细安装openGauss &#x1f53b; 前言&#x1f53b; 一、安装前准备&#x1f530; 1.1 openGauss安装包下载&#x1f530; 1.2 安装环境准备⛳ 1.2.1 硬件环境要求⛳ 1.2.2 软件环境要求⛳ 1.2.3 软件依赖要求⛳ 1.2.4 关闭操作系统防火墙、selin…

Redis 2023面试5题(四)

一、AOF 持久化&#xff08;Append Only File&#xff09;如何配置&#xff1f; AOF&#xff08;Append Only File&#xff09;持久化是 Redis 的一种持久化方式&#xff0c;它通过记录所有收到的写命令来保存数据。以下是一些关于如何配置 AOF 持久化的重要信息&#xff1a; …

Linux系统下使用移动硬盘或者U盘,如何挂载硬盘分区到Linux系统

本文目录 1、查看当前磁盘分区状态2、查看当前磁盘的挂载状态3、将磁盘挂载到指定目录下4、从文件系统里卸载磁盘 Linux系统里&#xff0c;除根目录以外&#xff0c;任何文件或者目录要想被访问&#xff0c;需要将其“关联”到根目录下的某个目录来实现&#xff0c;这种关联操作…

网络安全等级保护2.0 | 等保合规5件事

网络安全等级保护工作包括定级、备案、安全建设、等级测评、监督检查五个阶段。 1、定级 确认定级对象&#xff0c;参考《定级指南》等初步确认等级&#xff0c;组织专家评审&#xff0c;主管单位审核&#xff0c;公安机关备案审查。 备案 持定级报告和备案表等材料到公安机…

一文读懂openguass dcf网络模块

一文读懂openguass dcf网络模块 文章目录 一文读懂openguass dcf网络模块0. mec概要1. compress2. mec2.1 agent2.1.1 初始化agent2.1.2 agent执行 2.2 channel2.2.1 初始化channel2.2.2 连接channel 2.3 api2.4 func2.5 queue2.5.1 初始化2.5.2 运行2.5.1.1 接收消息入队2.5.1…

基于spss的多元统计分析 之 实例3(血压、胆固醇于心脏病关系的研究)(8/8)

血压、胆固醇于心脏病关系的研究 摘要 一般线性模型中的一种&#xff0c;即反应变量 (dependent variables)为二分类变量的回归分析&#xff0c;模型输出为变量取特定值的概率。 在进行二元Logistic回归分析时&#xff0c;通常会涉及3个步骤&#xff0c;分别是数据处理、卡方分…

自动化运维管理工具——Ansible

目录 一、概述 &#xff08;一&#xff09;特点 &#xff08;二&#xff09;工作特性 二、运行机制 三、安装 &#xff08;一&#xff09;配置源 &#xff08;二&#xff09;安装ansible &#xff08;三&#xff09;查看相关文件 &#xff08;四&#xff09;配置文件 …

如何统计网页访问量

目录 一、搭建Nginx服务 安装Nginx服务 第一步 关闭防火墙和安全机制 第二步 安装扩展包 第三步 安装Nginx和依赖环境 第四步 安装依赖包 第五步 创建一个用户和组 第六步 解包 第七步 进入Nginx目录下编译安装 第八步 进行编译 第九步 添加系统识别操作 第十步 检…

跟朋友撞offer怎么办?接了offer,下个月入职,结果老板面了我朋友,她已经入职了,我的offer还算数吗?...

职场上什么奇葩事都可能发生&#xff0c;跟朋友撞了offer是什么感受&#xff1f; 一位网友求助&#xff1a; 接了offer&#xff0c;正在和现公司谈判离职&#xff0c;下个月才能入职。结果老板面了其他人&#xff0c;正好是楼主认识的人&#xff0c;比楼主大十几岁。更尴尬的是…

浅谈C++|引用篇

目录 引入 一.引用的基本使用 (1)引用的概念&#xff1a; (2)引用的表示方法 (3)引用注意事项 (4)引用权限 二.引用的本质 三.引用与函数 (1)引用做函数参数 (2)引用做函数返回值 四.常量引用 五.引用与指针 引入 绰号&#xff0c;又称外号&#xff0c;是人的本名以外…

基于深度学习的目标检测的介绍(Introduction to object detection with deep learning)

物体检测的应用已经深入到我们的日常生活中&#xff0c;包括安全、自动车辆系统等。对象检测模型输入视觉效果(图像或视频)&#xff0c;并在每个相应对象周围输出带有标记的版本。这说起来容易做起来难&#xff0c;因为目标检测模型需要考虑复杂的算法和数据集&#xff0c;这些…

基于spss的多元统计分析 之 主成分分析(5/8)

实验目的&#xff1a; 1&#xff0e;掌握主成分分析的基本思想&#xff1b; 2&#xff0e;熟悉掌握SPSS软件进行主成分分析的基本操作&#xff1b; 3&#xff0e;利用实验指导的实例数据&#xff0c;上机熟悉主成分分析方法&#xff0e; 实验内容&#xff1a; 下表是我国2005年…

【C++篇】OOP中部分:继承和派生

友情链接&#xff1a;C/C系列系统学习目录 知识总结顺序参考C Primer Plus&#xff08;第六版&#xff09;和谭浩强老师的C程序设计&#xff08;第五版&#xff09;等&#xff0c;内容以书中为标准&#xff0c;同时参考其它各类书籍以及优质文章&#xff0c;以至减少知识点上的…

2023年05月份青少年软件编程Python等级考试试卷六级真题(含答案)

2023-05 Python六级真题 分数&#xff1a;100 题数&#xff1a;38 测试时长&#xff1a;60min 一、单选题(共25题&#xff0c;共50分) 1. 明明每天坚持背英语单词&#xff0c;他建立了英语单词错题本文件“mistakes.txt”&#xff0c;将每天记错的单词增加到该文件中&#x…

ROS:launch文件演示

目录 前言一、添加launch文件夹二、新建launch文件三、编辑launch内容四、 执行文件 前言 一个程序中可能需要启动多个节点&#xff0c;比如:ROS 内置的小乌龟案例&#xff0c;如果要控制乌龟运动&#xff0c;要启动多个窗口&#xff0c;分别启动 roscore、乌龟界面节点、键盘…

【数据分享】1929-2022年全球站点的逐日降雪深度数据(Shp\Excel\12000个站点)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、能见度等指标&#xff0c;说到气象数据&#xff0c;最详细的气象数据是具体到气象监测站点的数据&#xff01; 对于具体到监测站点的气象数据&#xff0c;之前我们分享过1929-2022年全球气象…