深入浅出消息队列----【顺序消息的实现原理】

news2025/1/10 16:14:00

深入浅出消息队列----【顺序消息的实现原理】

  • 何为顺序
  • 发消息的顺序性
  • 存储消息的顺序性
  • 消费消息的顺序性
  • 顺序消息消费的三把锁
    • 第一把锁:分布式锁
    • 第二把锁:Synchronized
    • 第三把锁:ReentrantLock

本文仅是文章笔记,整理了原文章中重要的知识点、记录了个人的看法
文章来源:编程导航-鱼皮【yes哥深入浅出消息队列专栏】

何为顺序

  1. 因果顺序

请添加图片描述

  1. 时间顺序

请添加图片描述

发消息的顺序性

在明白什么是顺序后,我们需要保证消息在发送阶段的顺序就符合因果或时间顺序。

首先,我们需要保证单个生产者来发送顺序消息。

现在生产环境基本上都是多服务器部署,如果有多个生产者分布在不同的服务中,都往同一个 Topic 发送相关的顺序消息,那么我们压根无法保证消息的顺序性。

请添加图片描述

即使在因果上他们产生的顺序是对的,但是最终发送到 Broker 这个过程的顺序是无法把控的(可能产生消息的时间早,但是实际发送的时间晚,还有网络上的传输顺序也是不可预测的)。

保证单个生产者后,我们还需保证单个生产者内对顺序消息是单线程(串行)发送的

RocketMQ 的生产者是支持多线程发送消息的(线程安全),因此在使用上如果我们利用多线程提高发送顺序消息的速率,理论上就无法保证绝对的顺序。

比如消息-1和消息-2都发往顺序 Topic-A,消息-1比消息-2在顺序上领先,他也的确比消息-2更早产生,消息-1也更早被线程-A 处理,随后消息-2才产生并且被线程-B处理。

如图所示:

请添加图片描述

但是即使在这种情况下,都无法保证消息-1 一定比消息-2 早发送到 Broker 上,因为线程会被调度,可能线程-A 执行一半就停了,线程-B 还在执行,这样一来先启动的并不一定先执行完成,这就是多线程的不确定性

请添加图片描述

也就是多线程是无法保证顺序的,因此在发送的时候,需要单线程串行发送有关的顺序消息

所以对生产者来说,发送顺序消息需要保证两点:单个生产者串行发送

存储消息的顺序性

保证了发送的顺序性之后,我们来看看 Broker 存储消息的顺序性要如何保证。

在存储上我们知晓消息是按照时间顺序追加写入到 commitlog 中的,且会被分发到 consumeQueue 中。

请添加图片描述

同消费组内,一个 consumeQueue 仅会被一个消费者消费,且这个消费者会按照 consumeQueue 内存储的顺序来消费消息,因此我们仅需让相关的顺序消息都分配到同一个 consumeQueue 即可,这样存储上就是有序的。

好比之前举例过的订单情况,同一笔订单相关的创建、支付、发货都发往同一个队列即可:

请添加图片描述

那如何让相关的顺序消息投递到同一个 consumeQueu 中?

发送顺序消息的时候指定队列就行了

通过前面的学习可以知晓 RocketMQ 有队列的概念,对应到 Kafka 就是分区的概念,因此发消息的时候指定队列即可让相关的顺序消息被分配盗同一个 consumeQueu 中,以此来保证存储上的顺序。

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
	@Override
	public MessageQueue select(List mqs, Message msg, Object arg) {
		Integer id = (Integer)arg; // 订单号
		int index = id % mqs.size(); // 对队列数取余
		return mqs.get(index); // 选择队列
	}
}, orderId);

就像官方示例的代码,利用 MessageQueueSelector 来选择队列,select 方法里面的 mqs 就像 Topic 下所有队列,通过 orderId 取余队列数使得一样的订单都会被发往相同的队列,这样就保证分区顺序消息

当然,这样的操作部保证所有订单的顺序,仅保证同一笔订单相关消息的顺序。

如果我们要保证所有订单处理的顺序,那么直接写死选择一个队列即可,也就是让所有的订单相关消息都发往同一个队列,这样叫全局顺序消息

全局顺序消息的性能比较低,因为只能发往一个队列,这个并发度已经限制死了,像前面说的因为在集群模式下,统一消费组内,一个队列只能对应有一个消费者来消费

而分区顺序消息的并发度取决于 Topic 下的队列数,因此如果想提升性能仅需增加队列数即可,不同队列之间可以并发处理顺序消息,互不影响,性能较好。

一般在业务上,我们很少用到全局顺序消息,一般而言分区顺序消息就够用了。

消费消息的顺序性

存储上如果已经保证了顺序,那么消费者只要老老实实的按照拉取到的消息顺序一条一条消费,这样就保证消费的顺序性,但是实际上还是有很多细节的。

首先消费者需要保证单线程消费顺序消息,如果消费者是多线程消费消息,那么道理同生产者多线程发送消息一样,无法保证消息的顺序性。

并且还需要考虑异常的情况,也就是消费失败场景的处理

我们之前了解到正常消息如果消费失败会进行重试,重试 16 次后会进入死信队列,后续人工介入处理,不会堵着后面的消息。

而对于顺序消息来说,如果前置消息消费失败了,后续的消息能正常消费吗?

也就说重试多次后,能直接进入死信队列吗?跳过前置消息,后面的消息能正常消费吗?

很多情况下前面的消息消费失败,后续的消息是无法正常消费的,如果在逻辑上没有做处理很容易造成脏数据

还是拿订单来举例:比如创建订单的消息消费失败了,但是随后的支付消息消费成功,也就是成功把用户的钱给扣了,但订单实际上没生成,用户一看钱都扣了,订单却没有…

请添加图片描述

所以对于顺序消息来说,消费失败其实是比较难处理的,RocketMQ 对顺序消息的默认实现是重试次数时 Integer.MAX_VALUE 次。

这样不就一直重试堵着后面的消息?那也不行啊,一直堵着业务上不就阻塞了吗?

因此顺序消息的处理在业务上可能需要支持相关联的消息都直接失败,然后可以在另外的地方持久化保存这些消息,待后续修复可以重新消费

也就是不抛错,先记着不处理,不堵着后续无关的消息,后面可以人工介入或者其他方式去处理这些失败的消息。

还有重平衡机制需要考虑,简单来说就是消费者的负载均衡,反正在顺序消息场景下因为重平衡机制,可能会导致两个消费者消费同一个队列的消息,不仅导致重复消费,也可能使得顺序不一致。

因此需要一定的机制来避免这种情况的发送。

顺序消息消费的三把锁

在消费场景,RocketMQ 利用三把锁来尽可能地保证消息的消费顺序性。

很巧的是这三把锁,覆盖了我们常见的三把锁:分布式锁、Synchronized、ReentrantLock。

第一把锁:分布式锁

普通的消息消费用的是 RocketMQ 提供的 MessageListenerConcurrently,来实现并发消费。

而顺序消费用的是 MessageListenerOrderly 来保证顺序消费,RocketMQ 默认已经提供了一个实现类 ConsumeMessageOrderlyService。

这个 service 在启动的时候就会向 Broker 申请当前消费者负责的队列锁,会将自己的消费组、自己的客户端ID、以及负责的队列发往 Broker,Broker 就把对应的队列与这个消费者绑定,将这个关系存储在了本地。

请添加图片描述

这样一来,别的消费者如果想消费对应的队列也得来加分布式锁,如果发现已经被别的消费者绑定了,那么就无法拉取消息消费。

因此这个分布式锁保证了同一个消费组内,一个队列只会被分配给一个消费者。

对了,这个锁在 Broker 会过期,消费者会定时(默认每 20s)的去续这个锁来保证分布式锁的拥有。

第二把锁:Synchronized

第一把分布式锁保证了一个队列只会被分配给一个消费者,那么第二把本地锁保证了同一时刻只有一个线程去消费这个队列。

因为实际上 MessageListenerOrderly 拉取到消息后,也是丢给线程池并发消费的,因此需要有把锁来保证一个队列只会被一个线程消费。

第三把锁:ReentrantLock

线程获取到 Synchronized 锁后,要开始处理消费消息了。

但是在真正开始消费消息之前,会先获取 ProcessQueue 的 consumeLock,这个 lock 是一把 ReentrantLock。

那么为什么需要这把锁呢?从名字来看是表明正在消费消息的消费锁。

前面已经提到 RocketMQ 会有重平衡机制,当前的队列-1 此时属于消费者-1 负责,但重平衡后可能会分配给同一个消费组内的另一个消费者-2 负责。

而发生重平衡的时候,很可能当前的消费者-1 正在消费这个队列的数据,但是还没消费完,可能消费了一半且还未向 Broker 提交消费点位,这种情况不能直接将这个正在消费的队列重平衡给另一个消费者,因为会产生重复消费。

这把锁更像一个标记位,来表明当前这个队列还有消息在消费,无法重平衡,等待下一次重平衡

具体在真的发生队列平衡时,当前的消费者会先尝试获取这个队列对应的 ProcessQueue 的 consumeLock,如果获取失败说明该队列正在被消费,次队列重平衡失败,待下次重平衡再试。

如果获取 consumeLock 成功,表示当前没有正在消费消息,于是安心的去 Broker 解除分布式锁,这样一来新的消费者就能接手当前的队列了,消息也不会重复消费。

已经有一个 Synchronized 来保证一个队列只会被一个线程消费了吗,通过这个锁不能判断正在消费吗?为什么要加一个 consumeLock?

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2033108.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3仿飞书头像,根据不同名称生成不同的头像背景色

效果展示&#xff1a; 传递三个参数&#xff1a; name&#xff1a;要显示的名称&#xff1b;size&#xff1a;头像的大小&#xff1b;cutNum&#xff1a;分割当前名称的最后几位数&#xff1b; 代码如下&#xff1a; <template><div:style"{color: #fff,borde…

VMware虚拟机下安装Ubuntu22.04以及汉化配置保姆级教程

目录 一.VMware和Ubuntu下载 二.在VMware中创建Ubuntu 1.点击 创建新的虚拟机 2.选择典型 3.选择Ubuntu镜像包&#xff08;自定义存放的位置&#xff09; 4.创建个人信息&#xff08;密码一定要牢记&#xff09; 5.选择虚拟机的安装位置 6.其他配置项&#xff08;默认下…

在数字浪潮中扬帆远航,软件行业就业前景如何?

随着数字化转型的加速和信息技术的广泛应用&#xff0c;对于软件开发人员的需求持续增长。不仅传统IT企业需要大量的软件开发人才&#xff0c;各行各业的企业也普遍需要自主研发软件以满足其业务需求。对于具备较好的学习能力和适应能力的人来说&#xff0c;这个行业提供了更多…

jenkins一键推送到远程服务器并用docker容器启动

1.安装jenkins 我后端使用的是宝塔面板来安装的容器化jenkins,要选中允许外部访问&#xff0c;安装完之后没有那个选项了&#xff0c;一开始安装的时候要选中不使用域名和后面的允许外部访问。Jenkins 版本为&#xff1a; 2.462.1 2.配置Jenkins 2.1 Git plugin 安装完毕之…

江新安教授受邀引正基因进行《制药行业研发项目管理》培训

近日&#xff0c;科济管线创始人江新安教授应赛柏蓝邀请为北京引正基因科技有限公司&#xff08;简称引正基因&#xff09;进行《研发项目管理》授课。为提高项目管理水平&#xff0c;加强研发项目相关人员的管理能力&#xff0c;掌握研发项目管理技能与工具&#xff0c;江新安…

AI招聘在人才盘活中的作用:开启智慧人力新篇章

一、引言&#xff1a;AI赋能招聘新纪元 在21世纪的今天&#xff0c;随着科技的飞速发展&#xff0c;人工智能&#xff08;AI&#xff09;已经渗透到社会经济的各个角落&#xff0c;其中&#xff0c;人力资源管理领域也不例外。AI技术的引入&#xff0c;不仅颠覆了传统的招聘模…

代码规范 —— QMQ 开发规范

优质博文&#xff1a;IT-BLOG-CN 一、代码规范 【1】消费者必须以Consumer结尾&#xff0c;生产者必须以Producer结尾。 【2】选择合适的消费模式&#xff1a;根据业务判断消费模式是集群模式还是广播模式&#xff0c;具体为&#xff1a;MessageConsumerProvider.addListene…

R的行和列命名和类型的转换

下面内容摘录自&#xff1a; 4章8节&#xff1a;用R做数据重塑&#xff0c;行列命名和数据类型转换-CSDN博客 欢迎订阅我们专栏 一、行和列命名 在数据科学和统计分析中&#xff0c;命名是组织和管理数据的一个重要部分。尤其是在处理复杂的多维数据集时&#xff0c;为行和列命…

FPGA知识基础之--FIFO ip核的使用以及实例化clocking wizard ip产生一个异步FIFO,附RTL和仿真代码

目录 一、FIFO简介1.定义2.特点3.分类4.FIFO在FPGA中的应用 二、实验任务三、FIFO IP核1.接口2.写时序3.读时序1.Standara2 .FWFT 四、vivado设置五、程序设计1.模块2.时序3.异步信号传输4.RTL代码 五、仿真1.Testbench代码2.波形 一、FIFO简介 1.定义 FIFO是一种先进先出的数…

电能表在企业能源管理中的作用

电能表在企业能源管理中扮演着至关重要的角色&#xff0c;它不仅是能源计量的基础工具&#xff0c;更是企业实现高效能源管理、降低能源成本、提高竞争力的关键所在。 一、精确计量与实时监测 电能表作为能源计量的基础工具&#xff0c;其首要作用是实现电能的精确计量。相比…

PostgreSQL 练习 ---- psql 新增连接参数

目标 添加一个连接参数&#xff0c;默认为 false 。当 psql 连接时&#xff0c;若该连接参数非 “true” 时&#xff0c;用户 “u1“ 对表对象无操作权限&#xff0c;包括自己拥有的表。 连接机制简介 连接过程如下所述&#xff1a; 客户端初始化一个空连接&#xff0c;设置…

如何高效记录并整理编程学习笔记?

一&#xff1a;简介 在编程学习的过程中&#xff0c;建立一个高效的笔记记录和整理方法确实非常重要。下面是一些方法和建议&#xff0c;帮助你打造自己的编程学习“知识宝库”。 1&#xff09;. 选择合适的工具 选择一个适合自己的笔记工具非常重要。可以考虑以下几种&#…

SB3045LFCT-ASEMI无人机专用SB3045LFCT

编辑&#xff1a;ll SB3045LFCT-ASEMI无人机专用SB3045LFCT 型号&#xff1a;SB3045LFCT 品牌&#xff1a;ASEMI 封装&#xff1a;TO-220F 批号&#xff1a;最新 最大平均正向电流&#xff08;IF&#xff09;&#xff1a;30A 最大循环峰值反向电压&#xff08;VRRM&…

大模型系列8-Latex

大模型系列8-Latex 背景Latex符号符号加帽子、横线和波浪线求和连乘希腊字母等于约等于积分微分公式对齐算法矩阵 背景 目前正通过论文、博客、视频、文档等各种形式学习各种大模型知识。为了更好的记录&#xff0c;写了一些大模型的博客&#xff0c;不专业&#xff0c;只备忘…

Openlayers6 图形绘制和修改功能(结合React)

Openlayers常用的API了解的差不多了&#xff0c;就开始进入实战了&#xff0c;首先从绘制基本的图形开始&#xff0c;这里主要介绍一下绘制圆形、矩形和多边形。 通过使用openlayers的ol.interaction.Draw和ol.interaction.Modify模块实现地图上绘制圆形、矩形、多边形并修改编…

2024.8.12(LVS)

一、LVS 1、描述以及工作原理 1. 什么是LVS linux virtural server的简称,也就是linxu虚拟机服务器,这是一个由章文嵩博士发起的开源项目,官网是http://www.linuxvirtualserver.org,现在lvs已经是linux内核标准的一部分,使用lvs可以达到的技术目标是:通过linux达到负载均衡技…

mysql注入-字符编码技巧

一、环境搭建 创建数据表 CREATE TABLE mysql_Bian_Man (id int(10) unsigned NOT NULL AUTO_INCREMENT,username varchar(255) COLLATE latin1_general_ci NOT NULL,password varchar(255) COLLATE latin1_general_ci NOT NULL,PRIMARY KEY (id) ) ENGINEMyISAM AUTO_INCREME…

Python办公自动化:使用`xlutils` 修改Excel文档

在日常办公自动化中&#xff0c;除了读取Excel文件&#xff0c;我们还经常需要对文件进行修改或更新。在Python中&#xff0c;除了xlrd&#xff0c;还可以使用xlutils库来实现对Excel文件的修改操作。本文将继续以“巴黎奥运会奖牌榜.xlsx”文件为例&#xff0c;讲解如何使用xl…

OpenCV + CUDA + cuDNN模块编译

简介 在追求高端性能与资源优化并重的应用场景中&#xff0c;如边缘计算设备或资源受限的开发板上运行YOLO等复杂深度学习模型&#xff0c;采用C结合OpenCV与GPU加速技术相较于传统的Python环境展现出显著优势。这种策略不仅极大地提升了执行效率&#xff0c;还显著降低了运行时…

陶晶池串口屏数据存储区概述与使用

陶晶池串口屏的数据存储区大小&#xff1a;x系列是2k字节的&#xff0c;其他系列是1k字节的&#xff0c;超出了就会从头覆盖最先的字节 你可以在主动解析模式下调用u[x]来访问数据存储区内第x-1字节是什么&#xff0c;也可以读取usize看看记录大小 它的原理是&#xff0c;每接收…