MQ四兄弟:如何实现延时消息

news2024/11/13 12:32:25

RabbitMQ延时消息

RabbitMQ 本身并没有直接支持延时消息的功能,但是可以通过使用 RabbitMQ 插件或构建消息死信队列(Dead Letter Exchange, DLX)的方式来实现延时消息。以下是两种实现延时消息的方法:

1、死信队列 (Dead-Letter Queue):

  • 当消息被拒绝接收,或者在队列中的存活时间超过了设置的TTL(Time-To-Live),或者队列达到最大长度时,消息会变成死信。
  • 死信会被重新发布到另一个交换机上,这个交换机就是DLX(Dead-Letter Exchange)。
  • 在声明业务队列时,可以添加一个x-dead-letter-exchange参数,值为死信交换机,这样死信就会被RabbitMQ重新发布到配置的这个交换机上.

2、延时插件 (Delayed Message Plugin):

需要先安装 RabbitMQ Delayed Message 插件

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
mv rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-<version>/plugins/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 声明一个类型为x-delayed-message的交换机,并添加一个x-delayed-type参数,值为交换机的类型,用于路由键的映射。
  • 这种方式允许消息在交换机中延迟一定时间,然后再根据路由键发送到相应的队列.

RocketMQ延时消息

RocketMQ把持 实现延时消息相对简单,因为它内置了对延时消息的支持。RocketMQ 通过设置消息的 定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。

比如,在分布式定时调度场景下,需要实现各类精度的定时任务,例如每天5点执行文件清理,每隔2分钟触发一次消息推送等需求。

以下是实现延时消息的详细步骤:

1、Producer发送延时消息

RocketMQ 通过设置消息的 delayTimeLevel 属性来实现延时投递。

在 Producer 端,通过设置消息的 delayTimeLevel 属性来发送延时消息。RocketMQ 内置了一组延迟级别,可以通过 delayTimeLevel 来指定延迟时间。


// 创建消息
Message message = new Message("TopicTest", "TagA", "delayed message".getBytes());

// 设置延时级别
message.setDelayTimeLevel(3);  // 延时级别 3,对应于延时 10 秒

2、延时级别对照表

RocketMQ 预定义了多个延时级别,每个级别对应不同的延时时间。以下是默认的延时级别对照表:

可以根据需要选择合适的延时级别,并设置到 message.setDelayTimeLevel(level) 方法中。

3、Consumer 接收消息

在 Consumer 端,接收和处理延时消息与普通消息相同。

4、调整配置

如果默认的延时级别不满足需求,可以通过修改 RocketMQ 配置文件 broker.conf 来调整延时级别和对应的延时时间:

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

修改完配置文件后,重启 Broker 生效。

小结

可以看到RocketMQ 5.x已经更新了延时消息的实现,在官方文档可以看到已经改成定时、延时消息,原本4.x文档中的延时级别对照表已经去掉了,统一成时间戳的实现。

RocketMQ 5.x 版本对延时消息的实现进行了重大更新。与之前版本相比,5.x 版本可以更灵活地处理消息的延迟时间,允许用户指定准确的延迟时间而不仅仅是预设的延迟级别。

下面来具体看看,实时/延时消息概念:

定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。

  • 定时消息:例如,当前系统时间为2022-06-09 17:30:00,您希望消息在下午19:20:00定时投递,则定时时间为2022-06-09 19:20:00,转换成时间戳格式为1654773600000。
  • 延时消息:例如,当前系统时间为2022-06-09 17:30:00,您希望延时1个小时后投递消息,则您需要根据当前时间和延时时长换算成定时时刻,即消息投递时间为2022-06-09 18:30:00,转换为时间戳格式为1654770600000。

Kafka延时消息

在Apache Kafka中,延时消息的处理也不是内置功能,但可以通过一些设计模式和技术手段来实现。以下是几种常见的方法来处理Kafka中的延时消息:

1、基于时间戳的延时消息

  • 生产者在发送消息时,可以在消息的头部添加一个时间戳字段,表示消息应该被消费的时间。
  • 消费者在接收到消息后,检查时间戳,如果未到处理时间,则暂时不处理此消息,直到达到指定时间。

2、基于单独的延时主题(Topic)

  • 创建一个专门的延时Topic
  • 生产者先将延时消息发送到延时Topic
  • 消费者从延时Topic拉取未到期的消息放入延时队列
  • 延时消息到期后,再发送到目标Topic供实际消费

3、利用Kafka Stream做中间处理

  • 创建一个Kafka Streams应用程序,用于处理延时消息。
  • 定义输入Topic,用于接收原始延时消息。同时定义输出Topic,用于发送到期的延时消息。
  • 使用Kafka Streams DSL定义Topology,对输入消息进行处理。
  • 使用自定义的Punctuator定期从State Store中读取到期的延时消息,发送到输出Topic。

这种方式的优点是延迟时间计算由Kafka Streams完成,不需要额外线程控制。缺点是需要应用程序支持并维护Kafka Streams,运维成本较高。

4、基于第三方中间件或工具

  • 利用redis、rabbitmq等其它中间件,构建一个延时消息系统
  • 延时消息从外部系统发往Kafka时已经延时完成
  • 例如利用RabbitMQ的延时队列特性实现

Pulsar延时消息

Pulsar自带了延时消息功能,可以在发送消息时设置消息的deliverAt或deliverAfter属性。

Apache Pulsar实现延时消息的方案:

  • deliverAfter方法:通过指定一个延时时长来发送消息,消息将在该时长后被投递。
  • deliverAt方法:通过指定一个具体的未来时间戳来发送消息,消息将在该时间点被投递。

这两种方法都可以通过Pulsar的客户端API实现,简单且直接。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1931442.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

kubernetes集群部署elasticsearch集群,包含无认证和有认证模式

1、背景&#xff1a; 因公司业务需要&#xff0c;需要在测试、生产kubernetes集群中部署elasticsearch集群&#xff0c;因不同环境要求&#xff0c;需要部署不同模式的elasticsearch集群&#xff0c; 1、测试环境因安全性要求不高&#xff0c;是部署一套默认配置&#xff1b; 2…

【LeetCode】十七、并查集

文章目录 1、并查集Union Find2、并查集find的优化&#xff1a;路径压缩 Quick find3、并查集union的优化&#xff1a;权重标记 1、并查集Union Find 并查集&#xff0c;一种树形的数据结构&#xff0c;处理不相交的两个集合的合并与查询问题。 【参考&#xff1a;&#x1f4…

Linux·基本指令(下)

1. mv 指令 (move) 语法&#xff1a;mv[选项] 源文件或目录 目标文件或目录 功能&#xff1a;将源文件或目录剪贴到一个新位置&#xff0c;或给源文件或目录改名但不会改变其内容 常用选项&#xff1a; -f &#xff1a;force 强制&#xff0c;如果目标文件已经存在&#xff0c;…

Unty 崩溃问题(Burst 1.8.2)

错误代码&#xff1a; Assertion failed on expression: exception SCRIPTING_NULL UnityEngine.StackTraceUtility:ExtractStackTrace () Unity.Burst.BurstCompiler:SendRawCommandToCompiler (string Unity版本&#xff1a;2021.3.17F1&#xff0c;Burst 1.8.2 表现&…

openstack设置IP直接登录,不需要加dashboard后缀

openstack 实验环境&#xff0c;openstack-t版&#xff0c;centos2009 修改配置文件 [rootcontroller ~]# vim /WEBROOT /etc/openstack-dashboard/local_settings #将dashboard去掉 WEBROOT /dashboard/ #改为 WEBROOT /[rootcontroller ~]# vim /etc/httpd/conf.d/openst…

pytorch学习(七):池化层的使用

MaxPool2d&#xff1a; 参数详解&#xff1a; kernel_size: int or tuple。 stride&#xff1a;窗口的步长&#xff0c;默认值是kernel_size的值。&#xff08;卷积层默认值为1&#xff09; dilation&#xff1a;如下图&#xff0c;控制窗口内内元素之间的距离。学名空洞卷积…

浅析stm32启动文件

浅析stm32启动文件 文章目录 浅析stm32启动文件1.什么是启动文件&#xff1f;2.启动文件的命名规则3.stm32芯片的命名规则 1.什么是启动文件&#xff1f; 我们来看gpt给出的答案&#xff1a; STM32的启动文件是一个关键的汇编语言源文件&#xff0c;它负责在微控制器上电或复位…

持续集成05--Gogs的安装与使用

前言 在持续集成/持续部署&#xff08;CI/CD&#xff09;的旅程中&#xff0c;版本控制系统是不可或缺的一环。当我们在使用jenkins&#xff0c;想要达到测试脚本有更新&#xff0c;就让项目自动去进行构建&#xff0c;或者当开发脚本有更新&#xff0c;也可以自动去构建的效果…

graham 算法计算平面投影点集的凸包

文章目录 向量的内积&#xff08;点乘&#xff09;、外积&#xff08;叉乘&#xff09;确定旋转方向numpy 的 cross 和 outernp.inner 向量与矩阵计算示例np.outer 向量与矩阵计算示例 python 示例生成样例散点数据图显示按极角排序的结果根据排序点计算向量转向并连成凸包 基本…

linux中关于环境变量的常用的设置方法

一. linux中设置环境变量的方式 1.使用/etc/environment, 是一个全局的环境变量设置文件&#xff0c;它会影响到所有用户和所有进程。当你需要设置一个全局的环境变量时&#xff0c;应该使用这个文件。这个文件的格式是 KEYvalue&#xff0c;每行一个环境变量。 2. 使用/etc/…

Spring Data Redis + Redis数据缓存学习笔记

文章目录 1 Redis 入门1.1 简介1.2 Redis服务启动与停止&#xff08;Windows&#xff09;1.2.1 服务启动命令1.2.2 客户端连接命令1.2.3 修改Redis配置文件1.2.4 Redis客户端图形工具 2. Redis数据类型2.1 五种常用数据类型介绍 3. Redis常用命令3.1 字符串操作命令3.2 哈希操作…

【学习笔记】虚幻SkeletalMesh学习(一)基础介绍

文章目录 零、前言一、资源介绍1.1 骨架资源1.2 骨架网格体资源 二、UE4中的定义2.1 骨骼数据2.2 模型网格数据 三、渲染3.1 RenderData的初始化3.2 渲染对象的创建3.3 渲染对象的更新3.3.1 游戏线程的更新&#xff08;*FSkeletalMeshObjectGPUSkin::Update*&#xff09;3.3.2 …

大模型“重构”教育:解构学习奥秘,推动教育普惠

大模型“重构”千行百业系列选题 生成式人工智能的热潮&#xff0c;为AI领域的发展注入新的活力&#xff0c;而“赋能千行百业”已经成为人们普遍对于人工智能和大模型的全新理解。 人工智能和大模型技术的迅猛发展正在以前所未有的速度深刻改变着各个行业。正如专家所预测&a…

《昇思25天学习打卡营第23天|onereal》

第23天学习内容简介&#xff1a; ----------------------------------------------------------------------------- 本案例基于MindNLP和ChatGLM-6B实现一个聊天应用。 1 环境配置 配置网络线路 2 代码开发 下载权重大约需要10分钟 ------------------------------- 运…

汇总国内镜像提供了Redis的下载地址

文章目录 1. 清华大学开源软件镜像站&#xff1a;2. 中国科技大学开源软件镜像&#xff1a;3. 阿里云镜像&#xff1a;4. 华为云镜像&#xff1a;5. 腾讯云镜像&#xff1a;5. 官方GitHub仓库&#xff08;虽然不是镜像&#xff0c;但也是一个可靠的下载源&#xff09;&#xff…

XX2104 培训【C++解决】

描述 某培训机构的学员有如下信息&#xff1a; 姓名&#xff08;字符串&#xff09; 年龄&#xff08;周岁&#xff0c;整数&#xff09; 去年 NOIP 成绩&#xff08;整数&#xff0c;且保证是 5 的倍数&#xff09; 经过为期一年的培训&#xff0c;所有同学的成绩都有所提高&…

【数据结构与算法】数据结构(Data Structure)的基本概念及其研究对象

什么是程序 算法数据结构程序 —— Nicklaus Wirth(尼古拉斯沃斯) Niklaus Wirth是一位著名的计算机科学家&#xff0c;他提出了"程序算法数据结构"的观点。他认为&#xff0c;程序不仅仅是执行特定任务的一段代码&#xff0c;而是由算法和数据结构两部分组成的。算法…

Linux--线程同步

目录 0.上篇 1. 线程同步概念 2.认识条件变量 2.1条件变量的概念 2.2认识接口 2.3写一个测试代码 3.生产者消费者模型 3.1概念部分 1.基本概念 2.主要问题 3.优点 4.思考切入点&#xff08;321原则&#xff09; 3.2编写基于BlockingQueue的生产者消费者模型&…

js执行机制----事件循环

前言 问题 一般情况下,我们都认为js是顺序执行的 但是遇到下列情况 setTimeout(function(){console.log(定时器开始啦) });new Promise(function(resolve){console.log(马上执行for循环啦);for(var i 0; i < 10000; i){i 99 && resolve();} }).then(function(…

MySQL数据库查询索引失效场景

在连表情况下,如果排序字段涉及到了两个表,排序字段将无法走索引. 加上第二个排序字段之后,走全表扫描了. 或者尽量让两次排序都用同一个表的字段,这样可以建联合索引让排序也能走索引.&#xff08;不想建联合索引的话&#xff0c;可以第二次排序用表id&#xff0c;这样单个的…