RocketMQ 延迟消息

news2025/1/16 3:59:42

RocketMQ 延迟消息

RocketMQ 消费者启动流程

什么是延迟消息

RocketMQ 延迟消息是指,生产者发送消息给消费者消息,消费者需要等待一段时间后才能消费到。

使用场景

用户下单之后,15分钟未支付,对支付账单进行提醒或者关单处理。

RocketMQ 开源版本的消息不支持任意时间精度,只支持5s 10s 1m等等。

Broker 如何处理延迟消息

消息投递如下:

  1. 生产者发送一个延迟消息到一个 topic
  2. Broker 判断是个延迟消息后,将消息暂存
  3. Broker 通过延迟服务, 先检查消息是否过期,如果到期将消息投递到目标 topic
  4. 消费者消费topic中的投递延迟消息。

开源RocketMQ 的消息不支持任意精度,默认支持 18个 level:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Broker 在启动的时候,会创建一个内部 topic:“SCHEDULE_TOPIC_XXXX” 根据延迟 level 数量,创建对应数量的 队列。 也就是说 18 level 对应了18 个队列。

具体可以在 代码TopicConfigManager.java 中 看到:

private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;

要注意的是,Broker 一般是集群模式
部署,也就是说,每个Broker 都会有18个队列。

TopicConfigManager#TopicConfigManager(BrokerController brokerController)

生产者消息延迟发送

代码示例如下:

Message msg=new Message();
msg.setTopic("TopicA");
msg.setTags("Tag");
msg.setBody("this is a delay message".getBytes());
//设置延迟level为5,对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);

Broker 存储延迟消息

上一篇文章已经谈到,Broker 收到消费者消息后,会进行消息存储,然后再转发到消费队列(ConsumerQueue),然后再推给消费者。

其实一旦消息转发到

存储延迟消息的流程也类似

  1. 确定延迟消息投递到topic 哪个队列。存储生产者写入的消息时,将消息转发到 ConsumeQueue 中,消费者就能消费到。 延迟消息不能立即消息到,于是将 topic 名称修改为 SCHEDULE_TOPIC_XXX,并根据延迟消息级别,确定投递到哪个队列上。同时还会将原来消息要发送到的目标 topic 和队列记录投递到哪个队列。

代码在CommitLog#asyncPutMessage 中

设置延迟消息的投递队列信息代码如下:

 // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
            // 如果设置的级别超过了最大级别,重置延迟级别
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
// 计算延迟消息应该投递到 SCHEDULE_TOPIC_XXXX 到哪个队列。
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                // 记录原始 topic ,queueid,方便后期投递到目标 topic
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 更新消息投递目标为 SCHEDULE_TOPIC_XXX,queueId
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }

消息转发

消息转发过程其实中会对延迟消息做一些特殊处理

CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/858239.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++中的typeid

2023年8月10日,周四下午 目录 概述typeid的用法用法1用法2用法3举例说明 概述 typeid是 C 中的运算符,用于获取表达式或类型的运行时类型信息。 它返回一个std::type_info对象,该对象包含有关类型的信息,例如类型的名称。 type…

怎样学会单片机

0、学单片机首先要明白,一个单片机啥也干不了,学单片机的目的是学习怎么用单片机驱动外部设备,比如数码管,电机,液晶屏等,这个需要外围电路的配合,所以学习单片机在这个层面上可以等同为学习单片…

南大实验pa0:安装环境

安装untubu没问题,但是切到清华软件园之后,问题百出。记录一下 问题1 如上图所示,在安装build-essential的时候出现了问题 The following packages have unmet dependencies:g-11 : Depends: gcc-11-base ( 11.2.0-19ubuntu1) but 11.4.0-1…

杭州企业可以用DV https证书吗

DV https证书是入门级的https证书,也可以叫DV基础型https证书,这款https证书企业是可以用的,甚至商城网站、金融网站都可以使用,不限制申请者,个人或者企事业单位都可以申请。DV基础型https证书虽然只是入门级的https证…

OpenHarmony携千行百业创新成果亮相HDC.Together 2023

8月4日-6日,华为开发者大会2023(以下简称“大会”)在中国松山湖举办,OpenAtom OpenHarmony(简称“OpenHarmony”)隆重参会,在大会互动体验区设置了“行业创新展区”,成为大会展区中的…

Debian10:安装PHPVirtualBox

PHPVirtualBox 是一个用 PHP 编写,用于管理 VirtualBox 的 Web 前端(由AJAX实现)。 参考文章:VirtualBoxPHPVirtualBox部署_骡子先生的博客-CSDN博客php virualbox,浏览器远程控制VBox 虚拟机phpVirtualBox_weixin_39815879的博客…

ctypes使用浅谈

什么是ctypes: ctypes 是 Python 的一个标准库,用于与 C 语言进行交互。它提供了一组工具和函数,可以方便地调用动态链接库(DLL)或共享对象(SO)中的 C 函数,并处理 C 数据类型的转换…

探索自动化网页交互的魔力:学习 Selenium 之旅【超详细】

"在当今数字化的世界中,网页自动化已经成为了不可或缺的技能。想象一下,您可以通过编写代码,让浏览器自动执行各种操作,从点击按钮到填写表单,从网页抓取数据到进行自动化测试。学习 Selenium,这一功能…

用普通用户sudo执行ansibe命令

1、编辑/etc/ansible/ansible.cfg 2、hosts文件 3、执行命令 必须加-b选项

无涯教程-Perl - goto函数

描述 该函数具有三种形式,第一种形式使当前执行点跳到称为LABEL的点。这种形式的goto不能用于跳入循环或外部函数。您只能跳至同一范围内的一点。 第二种形式期望EXPR判断为可识别的LABEL。通常,您应该能够使用普通的条件语句或函数来控制程序的执行,因此不建议使用该程序。 …

配置service启动nginx

一.以源码形式安装的nginx,没有nginx.service 二.切换到service配置目录 三.编辑nginx.service文件 四.启动测试 1.我的开始报了这个问题,说没有这个/var/cache/nginx/client_temp目录,直接创建一个就好了 2.开启/关闭 一.以源码形式安装…

【Flutter】【基础】CustomPaint 绘画功能(一)

功能:CustomPaint 相当于在一个画布上面画画,可以自己绘制不同的颜色形状等 在各种widget 或者是插件不能满足到需求的时候,可以自己定义一些形状 使用实例和代码: CustomPaint: 能使你绘制的东西显示在你的ui 上面&a…

Mapbox加载天地图CGCS2000矢量瓦片地图

1.背景 最近在做天地图的项目,要基于MapBox添加CGCS2000矢量切片数据,但是 Mapbox 只支持web 墨卡托(3857)坐标系的数据。Github有专业用户修改了mapbox-gl的相关代码,支持CGCS2000的切片数据加载,并且修改…

idea添加作者信息

idea添加作者信息 在idea中,经常会有这些波浪纹提示,放在上面之后会提示添加作者信息,点击添加作者信息后,但是不是自己想要的 这里提取的话好像没什么办法修改,可能修改电脑的名称。这样提取出来才是自己需要的 自定义作者信息…

基于Java+SpringBoot+Vue的网上书城管理系统设计与实现(源码+LW+部署文档等)

博主介绍: 大家好,我是一名在Java圈混迹十余年的程序员,精通Java编程语言,同时也熟练掌握微信小程序、Python和Android等技术,能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架…

CSS:background 复合属性详解(用法 + 例子 + 效果)

目录 background 复合属性background-color 背景颜色(纯)background-image 背景图片 或者 渐变颜色background-repeat 背景是否重复background-size 设置图片大小background-position 设置背景图片显示位置background-attachment 设置背景图片是否随页面…

软件包管理

一、rpm管理软件包 1、获得rpm的软件包 1)去官网安装不推荐 找自己光盘有没有这个包 装好需要的包之后装qq 2)去镜像站点,推荐 二、yum/dnf管理软件包 解决软件的依赖关系,可以自动的去服务器下载软件包 1、使用yum软件包 使用…

clion run qt 问题汇总

一、Error copying file “D:/soft/QT/5.15.2/mingw81_64/bin/Qt5Cored.dll” to “D:/work/Ccode/qtproject/cmake-build-debug-qtmingw”.报错 查看路径下确实没有Qt5Cored.dll,只有Qt5Core.dll 注释掉cmakelist中的这三行 重新执行后成功 二、使用CLion编辑u…

【Autolayout案例02-距离四周边距 Objective-C语言】

一、好,来看第二个案例 1.第二个案例,是什么意思呢,第二个案例,要求屏幕中间,有一个UIView UIView,是个红色的UIView UIView的大小,我不限定 但是无论你是什么屏幕下 这个UIView距离上边,始终是50 距离右边,始终是50, 距离下边,始终是50, 距离左边,始终是5…

【运维工程师学习八】代理及安装配置Nginx反向代理

【运维工程师学习八】代理 正向代理一、使用正向代理的主要作用有:二、反向代理三、使用反向代理的主要作用有:四、透明代理五、各种代理的主要区别六、Nginx的安装七、了解nginx的文件位置八、了解nginx程序的命令行参数九、开启nginx反向代理十、解读n…