RabbitMQ、RocketMQ、Kafka延迟队列实现

news2024/11/16 15:40:28

延迟队列在实际项目中有非常多的应用场景,最常见的比如订单未支付,超时取消订单,在创建订单的时候发送一条延迟消息,达到延迟时间之后消费者收到消息,如果订单没有支付的话,那么就取消订单。

那么,今天我们需要来谈的问题就是RabbitMQ、RocketMQ、Kafka中分别是怎么实现延时队列的,以及他们对应的实现原理是什么?

RabbitMQ

RabbitMQ本身并不存在延迟队列的概念,在 RabbitMQ 中是通过 DLX 死信交换机和 TTL 消息过期来实现延迟队列的。

TTL(Time to Live)过期时间

有两种方式可以设置 TTL。

  1. 1. 通过队列属性设置,这样的话队列中的所有消息都会拥有相同的过期时间

  2. 2. 对消息单独设置过期时间,这样每条消息的过期时间都可以不同

那么如果同时设置呢?这样将会以两个时间中较小的值为准。

针对队列的方式通过参数x-message-ttl来设置。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

针对消息的方式通过setExpiration来设置。

AMQP.BasicProperties properties = new AMQP.BasicProperties();
Properties.setDeliveryMode(2);
properties.setExpiration("60000");
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "message".getBytes());

DLX(Dead Letter Exchange)死信交换机

一个消息要成为死信消息有 3 种情况:

  1. 1. 消息被拒绝,比如调用reject方法,并且需要设置requeuefalse

  2. 2. 消息过期

  3. 3. 队列达到最大长度

可以通过参数dead-letter-exchange设置死信交换机,也可以通过参数dead-letter- exchange指定 RoutingKey(未指定则使用原队列的 RoutingKey)。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "exchange.dlx");
args.put("x-dead-letter-routing-key", "routingkey");
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

原理

当我们对消息设置了 TTL 和 DLX 之后,当消息正常发送,通过 Exchange 到达 Queue 之后,由于设置了 TTL 过期时间,并且消息没有被消费(订阅的是死信队列),达到过期时间之后,消息就转移到与之绑定的 DLX 死信队列之中。

这样的话,就相当于通过 DLX 和 TTL 间接实现了延迟消息的功能,实际使用中我们可以根据不同的延迟级别绑定设置不同延迟时间的队列来达到实现不同延迟时间的效果。

RocketMQ

RocketMQ 和 RabbitMQ 不同,它本身就有延迟队列的功能,但是开源版本只能支持固定延迟时间的消息,不支持任意时间精度的消息(这个好像只有阿里云版本的可以)。

他的默认时间间隔分为 18 个级别,基本上也能满足大部分场景的需要了。

默认延迟级别:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h。

使用起来也非常的简单,直接通过setDelayTimeLevel设置延迟级别即可。

setDelayTimeLevel(level)

原理

实现原理说起来比较简单,Broker 会根据不同的延迟级别创建出多个不同级别的队列,当我们发送延迟消息的时候,根据不同的延迟级别发送到不同的队列中,同时在 Broker 内部通过一个定时器去轮询这些队列(RocketMQ 会为每个延迟级别分别创建一个定时任务),如果消息达到发送时间,那么就直接把消息发送到指 topic 队列中。

RocketMQ 这种实现方式是放在服务端去做的,同时有个好处就是相同延迟时间的消息是可以保证有序性的。

谈到这里就顺便提一下关于消息消费重试的原理,这个本质上来说其实是一样的,对于消费失败需要重试的消息实际上都会被丢到延迟队列的 topic 里,到期后再转发到真正的 topic 中。

Kafka

对于 Kafka 来说,原生并不支持延迟队列的功能,需要我们手动去实现,这里我根据 RocketMQ 的设计提供一个实现思路。

这个设计,我们也不支持任意时间精度的延迟消息,只支持固定级别的延迟,因为对于大部分延迟消息的场景来说足够使用了。

只创建一个 topic,但是针对该 topic 创建 18 个 partition,每个 partition 对应不同的延迟级别,这样做和 RocketMQ 一样有个好处就是能达到相同延迟时间的消息达到有序性。

原理

  • • 首先创建一个单独针对延迟队列的 topic,同时创建 18 个 partition 针对不同的延迟级别

  • • 发送消息的时候根据延迟参数发送到延迟 topic 对应的 partition,对应的key为延迟时间,同时把原 topic 保存到 header 中

ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("delay_topic", delayPartition, delayTime, data);
producerRecord.headers().add("origin_topic", topic.getBytes(StandardCharsets.UTF_8));
  • • 内嵌的consumer单独设置一个ConsumerGroup去消费延迟 topic 消息,消费到消息之后如果没有达到延迟时间那么就进行pause,然后seek到当前ConsumerRecordoffset位置,同时使用定时器去轮询延迟的TopicPartition,达到延迟时间之后进行resume

  • • 如果达到了延迟时间,那么就获取到header中的真实 topic ,直接转发

这里为什么要进行pauseresume呢?因为如果不这样的话,如果超时未消费达到max.poll.interval.ms 最大时间(默认300s),那么将会触发 Rebalance。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/107946.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Proteus8仿真:51单片机IrLink红外发送加接受模块的使用

51单片机IrLink红外的使用元器件原理图部分代码单片机1发送main.c单片机2接受main.c工程文件元器件 元器件名称51单片机AT89C51红外收发IRLINK按键BUTTON发光二极管LED-RED时钟激励源DCLOCK与门74LS08示波器 原理图部分 关于IRLINK的使用&#xff1a; 在Proteus上就是一个红外…

数据中台选型前必读(七):解读数据服务的四大关键技术

在前面的文章中&#xff0c;我们介绍了“数据服务”对于“数据中台”的重要性&#xff0c;并讲解了数据服务解决的问题及其核心功能&#xff0c;在这个系列的最终篇我们展开聊聊数据服务的四大关键技术&#xff0c;然后总结一下数据服务架构的三大关键点&#xff0c;希望对大家…

JSP ssh培训管理系统myeclipse开发mysql数据库MVC模式java编程计算机网页设计

一、源码特点 JSP ssh 培训管理系统是一套完善的web设计系统&#xff08;系统采用ssh框架进行设计开发&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Mye…

校园打架行为识别检测 yolov7

校园打架行为识别检测系统基于python基于yolov7深度学习框架边缘分析技术&#xff0c;自动对校园、广场等区域进行实时监测&#xff0c;当监测到有人打架斗殴时&#xff0c;系统立即抓拍存档语音提醒&#xff0c;并将打架行为回传给学校后台&#xff0c;提醒及时处理打架情况。…

Word控件Spire.Doc 【超链接】教程(7):在 C#、VB.NET 中的 Word 中创建图像超链接

Spire.Doc for .NET是一款专门对 Word 文档进行操作的 .NET 类库。在于帮助开发人员无需安装 Microsoft Word情况下&#xff0c;轻松快捷高效地创建、编辑、转换和打印 Microsoft Word 文档。拥有近10年专业开发经验Spire系列办公文档开发工具&#xff0c;专注于创建、编辑、转…

你一定要会的JavaFile

File对象就表示一个路径&#xff0c;可以是文件的路径&#xff0c;也可以是文件夹的路径这个路径可以是存在的&#xff0c;也允许是不存在的File的构造 方法名称说明public File(String pathname)根据文件路径创建文件对象public File(String parent,String child)根据父路径名…

第4章 角色Api控件器的实现与调试

1 自定义管道中间件 1.1 WebApi.Middleware.CorsMiddleware namespace WebApi.Middleware { /// <summary> /// 【跨域访问中间件--类】 /// <remarks> /// 摘要&#xff1a; /// 该管道中间件类主要为了解决在由vue/uni-app前端项目(Cors)访问当前后端项…

你的期待薪资是多少?为什么?

很多人去面试的时候&#xff0c;就像打游戏&#xff0c;过五关斩六将&#xff0c;终于到最后一关了&#xff0c;但是谈薪资的难度堪比打游戏中搞定终级 boss 的难度&#xff0c;真的是太「南」了&#xff0c;好多人都是因为这个问题让自己五味杂陈呀。报高了怕好 offer 失之交臂…

【Call for papers】SIGIR-2023(CCF-A/内容检索/2023年1月31日截稿)

The 46th International ACM SIGIR Conference on Research and Development in Information Retrieval will be held from 23-27 July, 2023 in Taipei. 文章目录1.会议信息2.时间节点3.论文主题1.会议信息 会议介绍&#xff1a; SIGIR是展示新研究成果和展示信息检索新系统和…

Postgresql INDEX HOT 原理与更好的 “玩转” INDEX

随着问问题的同学越来越多&#xff0c;公众号内部私信回答问题已经很困难了&#xff0c;所以建立了一个群&#xff0c;关于各种数据库的问题都可以&#xff0c;目前主要是 POSTGRESQL, MYSQL ,MONGODB ,POLARDB ,REDIS&#xff0c;SQL SERVER 等&#xff0c;期待你的加入&#…

[洛谷]P1996 约瑟夫问题

[洛谷]P1996 约瑟夫问题一、问题描述题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1提示二、思路分析1、算法标签&#xff1a;2、算法分析&#xff1a;三、代码实现1、环形链表2、队列一、问题描述 [洛谷]P1996 约瑟夫问题 题目描述 nnn 个人围成一圈&#xff0c;从…

3.Spark 操作

基于centos7 ,hadoop2.7.3, spark-2.4.4-bin-hadoop2.7.tgz 目录: 一.spark shell二. 读取hdfs文件三.Idea中编写wordcount一.spark shell 在spark shell中编写wordcount程序读取本地文件 1、准备数据源(创建目录,创建文件) 2.代码: --注意修改文件地址-- sc.textF…

【QT开发笔记-基础篇】| 第五章 绘图QPainter | 5.8 画刷设置

本节对应的视频讲解&#xff1a;B_站_视_频 https://www.bilibili.com/video/BV1A44y1Z7vz 本节讲解画刷的设置&#xff0c;包括画刷的颜色和样式 画刷设置完后&#xff0c;就可以把该画刷设置给 QPainter 了 1. 相关 API 1.1 画刷颜色 // 获取和设置画刷的颜色 const QCo…

新冠阳性的第三篇博客,使用Swagger管理API

新冠阳性的第三篇博客&#xff0c;使用Swagger管理API1.Swagger简介2.在项目中使用Swagger3.配置swagger4.swagger配置扫描接口5.配置API文档的分组6.swagger的实体类扫描7.给Controller加文档注释今天是新冠确诊的第二天&#xff0c;得了新冠也不要忘记学习啊&#xff01;&…

一文读懂自动驾驶汽车:软硬结合 造就未来出行体验(上篇)

在 GTC 2022 秋季大会上&#xff0c;NVIDIA 汽车部门营销经理 Katie Burke Washabaugh&#xff0c;面向想要了解自动驾驶汽车、并有志于投身自动驾驶行业的观众&#xff0c;介绍了自动驾驶汽车的历史、工作原理、相关技术以及发展前景。本文对此次分享的精华内容进行了汇总和整…

基于蒙特卡诺的电动汽车对电网影响(数据+Matlab代码)

目录 0 知识回顾 1 电网没考虑电动汽车时 1.1 案例1&#xff08;4kw&#xff09; 1.2 案例2&#xff08;7kw&#xff09; 31.3 案例3&#xff08;20kw&#xff09; 2 静态测试 2.1 收敛的最优结果 2.2 改变电动汽车数量的影响 2.3 收敛的最优结果 3 动态测试 4 一…

图结构

图结构 从哥尼斯堡的七桥问题开始 ▪ 18世纪初普鲁士的哥斯尼堡,有一条河穿过,河上有两个小岛,有七座桥把两个小岛与河岸联系起来 ▪ 问题:一个步行者怎样才能不重复、不遗漏地一次走完七座桥&#xff0c;最后回到出发点。 ▪ 难点&#xff1a;可能的走法----7&#xff01;5…

苹果给出 AirTag 固件更新日志,苹果Find My功能越来越完善

自 11 月以来&#xff0c;苹果已经为其 AirTag 物品追踪器发布了两个固件更新。然而&#xff0c;该公司此前并没有详细说明这些更新带来了什么变化。不过有网友发现&#xff0c;苹果终于分享了最新 AirTag 固件更新的更新内容。 以下是 AirTag 固件更新 2.0.24 和 2.0.36 带来…

[力扣c++实现]85. 最大矩形

85. 最大矩形 给定一个仅包含 0 和 1 、大小为 rows x cols 的二维二进制矩阵&#xff0c;找出只包含 1 的最大矩形&#xff0c;并返回其面积。 示例 1&#xff1a; 输入&#xff1a;matrix [[“1”,“0”,“1”,“0”,“0”],[“1”,“0”,“1”,“1”,“1”],[“1”,“1”…

2022最新最全的Java面试八股文小抄开源!带你摸熟 20+ 互联网公司面试考点

2022真是多变的一年&#xff0c;相对往年我们会发现今年猎头电话少了&#xff0c;大部分企业年终奖缩水&#xff0c;加薪幅度也不如往年&#xff0c;选择好 offer 就要趁早&#xff0c;现在开始准备吧&#xff0c;刷一波 Java 面试题&#xff0c;能回答 70%就去 BATJTMD 大胆试…