RocketMQ~消息的种类与生命周期(普通消息、延时定时消息、事务消息)

news2024/11/13 11:22:38

普通消息

普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。

以在线的电商交易场景为例,上游订单系统将用户下单支付这一业务事件封装成独立的普通消息并发送至 RocketMQ 服务端,下游按需从服务端订阅消息并按照本地消费逻辑处理下游任务。

每个消息之间都是相互独立的,且不需要产生关联。

另外还有日志系统,以离线的日志收集场景为例,通过埋点组件收集前端应用的相关操作日志,并转发到 RocketMQ 。

普通消息生命周期

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

延时&定时消息

在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。

使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

定时消息仅支持在 MessageType 为 Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。

在 4.x 版本中,只支持延时消息,默认分为 18 个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,也可以在配置文件中增加自定义的延时等级和时长。

在 5.x 版本中,开始支持定时消息,在构造消息时提供了 3 个 API 来指定延迟时间或定时时间。

基于定时消息的超时任务处理具备如下优势:

  • 精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。
  • 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。RocketMQ 的定时消息具有高并发和水平扩展的能力

如何实现延迟&定时消息

当消息发送到Broker后,Broker会将消息根据延迟级别进行存储

RocketMQ的延迟消息实现方式是:将消息先存储在内存中,然后使用Timer定时器进行消息的延迟,到达指定的时间后再存储到磁盘中,最后投递给消费者。

并使用Timer定时器来实现延迟投递。但是,由于Timer定时器有一定的缺陷,比如在定时器中有大量任务时,会导致定时器的性能下降,从而影响消息投递。

因此,在RocketMQ5.0中,采用了一种新的实现方式:基于时间轮的定时消息。时间轮是一种高效的定时器算法,能够处理大量的定时任务,并且能够在O(1)时间内找到下一个即将要执行的任务,因此能够提高消息的投递性能。

并且,基于时间轮的定时消息能够支持更高的消息精度,可以实现秒级、毫秒级甚至更小时间粒度的定时消息。

具体实现方式如下:

  1. RocketMQ在Broker端使用一个时间轮来管理定时消息,将消息按照过期时间放置在不同的槽位中,这样可以大幅减少定时器任务的数量。
  2. 时间轮的每个槽位对应一个时间间隔,比如1秒、5秒、10秒等,每次时间轮的滴答,槽位向前移动一个时间间隔。
  3. 当Broker接收到定时消息时,根据消息的过期时间计算出需要投递的槽位,并将消息放置到对应的槽位中。
  4. 当时间轮的滴答到达消息的过期时间时,时间轮会将该槽位中的所有消息投递给消费者。

延时&定时消息生命周期

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。
  • 待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:Apache RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度

顺序消息

Kafka只支持同一个Partition内消息的顺序性一样,RocketMQ中也提供了基于队列(分区)的顺序消费。即同一个队列内的消息可以做到有序,但是不同队列内的消息是无序的。

如果要使用顺序消息仅支持使用 MessageType 为 FIFO 的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。

如何实现顺序的消息

和普通消息发送相比,顺序消息发送必须要设置消息组。要保证消息的顺序性需要生产者串行发送,需要在send方法中,传入一个MessageQueueSelector,MessageQueueSelector中需要实现一个select方法,这个方法去就是用来定义要把消息发送到哪个MessageQueue。通常可以使用取模法进行路由。

通过以上形式就可以将需要有序的消息发送到同一个队列中。需要注意的时候,这里需要使用同步发送的方式!

消息按照顺序发送的消息队列中之后。消费者如何按照发送顺序进行消费呢?

RocketMQ的MessageListener回调函数提供了两种消费模式:

  • 有序消费模式MessageListenerOrderly
  • 并发消费模式MessageListenerConcurrently

所以,为了保证同一个队列中的的有序消息可以被顺序消费,就要保证RocketMQ的Broker只会把消息发送到同一个消费者上,单线程使用 MessageListenerConcurrently 即可可以顺序消费、多线程环境下需要使用 MessageListenerOrderly 才能顺序消费。

如果是多线程环境下,为了保证同一个队列中!的有序消息可以被顺序消费,就要保证RocketMQ的Broker只会把消息发送到同一个消费者上,这时候就需要加锁了。

在实现中,ConsumeMessageOrderlyService初始化的时候,会启动一个定时任务,会尝试向Broker为当前消费者客户端申请分布式锁。如果获取成功,那么后续消息将会只发给这个Consumer。

接下来在消息拉取的过程中,消费者会一次性拉取多条消息的,并且会将拉取到的消息放入ProcessQueue,同时将消息提交到消费线程池进行执行。

那么拉取之后的消费过程,怎么保证顺序消费呢?这里就需要更多的锁了。

RocketMQ在消费的过程中,需要申请MessageQueue 锁,确保存在同一时间,一个队列中只有一个线程能处理队列中的消息。获取到MessageQueue的锁后,就可以从ProcessQueue中依次拉取一批消息处理了,但是这个过程中,为了保证消息不会出现重复消费,还需要对ProcessQueue进行加锁。然后就可以开始处理业务逻辑了。

总结下来就是三次加锁,先锁定Broker上的MessageQueue,确保消息只会投递到唯一的消费者,对本地的MessageQueue加锁,确保只有一个线程能处理这个消息队列。对存储消息的ProcessQueue加锁,确保在重平衡的过程中不会出现消息的重复消费。

  • Broker 上的 MessageQueue
    消息存储与分发:Broker 上的 MessageQueue 主要用于存储和管理接收到的消息。它作为消息的中间存储介质,接收生产者发送的消息,并等待消费者来获取和处理这些消息。
    负载均衡:Broker 可以将消息均匀地分配到不同的 MessageQueue 中,实现负载均衡,避免单个队列承载过多的消息压力,提高系统的整体性能和稳定性。
    消息路由:确定消息的传递路径和目标,帮助将消息准确地传递到相应的消费者或其他 Broker 节点,确保消息在分布式系统中的正确流转。
  • 本地的 MessageQueue
    缓存与临时存储:本地的 MessageQueue 可以作为本地应用或进程的消息缓存区域。当网络延迟或 Broker 处理繁忙时,本地的 MessageQueue 可以暂时存储即将发送或接收的消息,防止数据丢失或消息积压。
    提高性能与响应速度:本地缓存的消息可以快速被本地应用访问和处理,减少了与远程 Broker 之间的网络通信开销,提高了消息处理的实时性和响应速度。
    离线处理支持:在网络连接中断或系统处于离线状态时,本地的 MessageQueue 可以存储尚未处理的消息,待网络恢复或系统重新上线后,再进行消息的发送和处理,保证消息处理的连续性。
  • 存储消息的 ProcessQueue
    消息排序与顺序处理:ProcessQueue 负责对存储的消息进行排序和组织,确保消息按照一定的顺序被处理。这对于一些对消息处理顺序有严格要求的业务场景非常重要,例如事务性消息处理或基于序列的业务流程。
    批量处理与优化:可以将多个相关的消息聚集在一起,进行批量处理,提高消息处理的效率和资源利用率。例如,将一批具有相同业务逻辑或操作类型的消息集中处理,减少处理过程中的上下文切换和重复操作。
    消息过滤与转换:在 ProcessQueue 中,可以对消息进行过滤、转换和预处理操作,例如根据消息的内容、类型、优先级等条件进行筛选和转换,以满足后续业务处理的需求。
    在这里插入图片描述

前面介绍加锁过程中,一共加了三把锁,那么,第三把锁如果不加的话,是不是也没问题?因为我们已经对MessageQueue加锁了,为啥还需要对ProcessQueue再次加锁呢?这里其实主要考虑的是重平衡的问题。

当我们的消费者集群,新增了一些消费者,发生重平衡的时候。某个队列可能会原来属于客户端A消费的,但是现在要重新分配给客户端B了。这时候客户端A就需要把自己加在Broker上的锁解掉,而在这个解锁的过程中,就需要确保消息不能在消费过程中就被移除了,因为如果客户端A可能正在处理一部分消息,但是位点信息还没有提交,如果客户端B立马去消费队列中的消息,那存在一部分数据会被重复消费。

那么如何判断消息是否正在消费中呢,就需要通过这个ProcesssQueue上面的锁来判断了,也就是说在解锁的线程也需要尝试对ProcessQueue进行加锁,加锁成功才能进行解锁操作! 以避免过程中有消息消费。

顺序消费存在的问题

通过上面的介绍,我们知道了RocketMQ的顺序消费是通过在消费者上多次加锁实现的,这种方式带来的问题就是会降低吞吐量,并且如果前面的消息阻塞,会导致更多消息阻塞。所以,顺序消息需要慎用。

事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性

简单来讲,就是将本地事务(数据库的 DML 操作)与发送消息合并在同一个事务中

例如,新增一个订单。在事务未提交之前,不发送订阅的消息。发送消息的动作随着事务的成功提交而发送,随着事务的回滚而取消。当然真正地处理过程不止这么简单,包含了半消息、事务监听和事务回查等概念。

如何实现事务消息

在发送事务消息时,首先向RocketMQ Broker发送一条"half消消息"(即半消息),半消息将被存储在Broker端的事务消息日志中,但是这个消息还不能被消费者消费。

接下来,在半消息发送成功后,应用程序通过执行本地事务来确定是否要提交该事务消息。如果本地事务执行成功,就会通知RocketMQ Broker提交该事务消息,使得该消息可以被消费者消费,否则,就会通知RocketMQ Broker回滚该事务消息,该消息将被删除,从而保证消息不会被消费者消费。
在这里插入图片描述
拆解下来的话,主要有以下4个步骤:

  1. 发送半消息:应用程序向RocketMQ Broker发送一条半消息,该消息在Broker端的事务消息日志中被标记为"prepared"状态。
  2. 执行本地事务:RocketMQ会通知应用程序执行本地事务。如果本地事务执行成功,应用程序通知RocketMQBroker提交该事务消息。
  3. 提交事务消息:RocketMQ收到提交消息以后,会将该消息的状态从"prepared"改为"committed",并使该消息可以被消费者消费。
  4. 回滚事务消息:如果本地事务执行失败,应用程序通知RcocketMQ Broker回滚该事务消息,RocketMQ将该消息的状态从"prepared"改为"rollback",并将该消息从事务消息日志中删除,从而保证该消息不会被消费者消费。
  • 如果一直没收到COMMIT或者ROLLBACK怎么办?

在RocketMQ的事务消息中,如果半消息发送成功后,RocketMQ的事务消息中,Broker在规定时间内没有收到COMMIT或者ROLLBACK消息。
RocketMQ会向应用程序发送一条检查请求,应用程序可以通过回调方法返回是否要提交或回滚该事务消息。如果应用程序在规定时间内未能返回响应,RocketMQ会将该消息标记为"UNKNOW"状态。

在标记为"UNKNOW"状态的事务消息中,如果应用程序有了日确的结果,还可以向MQ发送COMMIT或者ROLLBACK。但是MQ不会一直等下去,如果过期时间已到,RocketMQ会自自动回滚该事务消息,将其从事务消息日志中删除。

  • 第一次发送半消息失败了怎么办?

在事务消息的一致性方案中,我们是先发半消息,再做业务操作的。所以,如果半消息发失败了,那么业务操作也不会进行,不会有不一致的问题。遇到这种情况重试就行了。

  • 为什么要用事务消息?

本地事务执行完成之后再发送消息有什么区别?为什么要有事务消息呢?

主要是因为:本地事务执行完成之后再发送消息可能会发消息失败。一旦发送消息失败了,那么本地事务提交了,但是消息没成功,那么监听者就收不到消息,那么就产生数据不一致了。

那如果用事务消息。先提交一个半消息,然后执行本地事务,再发送一个commit的半消息。如果后面这个commit半消息失败了,MQ是可以基于第一个半消息不断反查来推进状态态的。这样只要本地事务提交成功,最终MQ也会成功。如果本地事务rolliback,那么MQ的消息也会rollback。保证了一致性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1925838.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

im即时通讯系统有哪些?

IM即时通讯系统是一种通过互联网和移动通信网络实现实时通信的系统。在众多IM即时通讯系统中,WorkPlus作为企业级IM即时通讯系统,提供了全面的通讯和协作解决方案。本文将介绍几种常见的IM即时通讯系统,以及WorkPlus作为企业级IM即时通讯系统…

第三方配件也能适配苹果了,iOS 18与iPadOS 18将支持快速配对

苹果公司以其对用户体验的不懈追求和对创新技术的不断探索而闻名。随着iOS 18和iPadOS 18的发布,苹果再次证明了其在移动操作系统领域的领先地位。 最新系统版本中的一项引人注目的功能,便是对蓝牙和Wi-Fi配件的配对方式进行了重大改进,不仅…

【自动驾驶汽车通讯协议】UART通信详解:理解串行数据传输的基石

文章目录 0. 前言1. 同步通讯与异步通讯1.1 同步通信1.2 异步通信 2. UART的数据格式3. 工作原理3.1 波特率和比特率3.2 UART的关键特性 4. UART在自动驾驶汽车中的典型应用4.1 UART特性4.2应用示例 5. 结语 0. 前言 按照国际惯例,首先声明:本文只是我自…

html5——列表、表格

目录 列表 无序列表 有序列表 自定义列表 表格 基本结构 示例 表格的跨列 表格的跨行 列表 无序列表 <ul>【声明无序列表】 <li>河间驴肉火烧</li>【声明列表项】 <li>唐山棋子烧饼</li> <li>邯郸豆沫</li> <l…

pyinstaller教程(二)-快速使用(打包python程序为exe)

1.介绍 PyInstaller 是一个强大的 Python 打包工具&#xff0c;可以将 Python 程序打包成独立的可执行文件。以下会基于如何在win系统上将python程序打包为exe可执行程序为例&#xff0c;介绍安装方式、快速使用、注意事项以及特别用法。 2.安装方式 通过 pip 安装 PyInstal…

随笔-不是来养老的吗

来了有一个多月了&#xff0c;日子过得飞快。都以为我来养老的&#xff0c;一开始我也这么认为&#xff0c;结果6月份的日均工时&#xff0c;排在了部门第一。一个月做的需求比之前的三个月都多。 来之前&#xff0c;老徐让我多承担点&#xff0c;想着能有多少活嘛&#xff0c…

QT TCP多线程网络通信

学习目标&#xff1a; TCP网络通信编程 学习前置环境 运行环境:qt creator 4.12 QT TCP网络通信编程-CSDN博客 Qt 线程 QThread类详解-CSDN博客 学习内容 使用多线程技术实现服务端计数器 核心代码 客户端 客户端&#xff1a;负责连接服务端&#xff0c;每次连接次数1。…

sklearn之神经网络学习算法

文章目录 什么是神经网络人工神经网络的结构输入层输出层隐含层神经元的链接 近几年深度学习还是比较火的&#xff0c;尤其是在大语言模型之后&#xff0c;在本质上深度学习网络就是层数比较多的神经网络。sklearn并不支持深度学习&#xff0c;但是支持多层感知机&#xff08;浅…

安全测试理论

安全测试理论 什么是安全测试&#xff1f; 安全测试&#xff1a;发现系统安全隐患的过程安全测试与传统测试区别 传统测试&#xff1a;发现bug为目的 安全测试&#xff1a;发现系统安全隐患什么是渗透测试 渗透测试&#xff1a;已成功入侵系统为目标的的攻击过程渗透测试与安全…

自动驾驶事故频发,安全痛点在哪里?

大数据产业创新服务媒体 ——聚焦数据 改变商业 近日&#xff0c;武汉城市留言板上出现了多条关于萝卜快跑的投诉&#xff0c;多名市民反映萝卜快跑出现无故停在马路中间、高架上占最左道低速行驶、转弯卡着不动等情况&#xff0c;导致早晚高峰时段出现拥堵。萝卜快跑是百度 A…

配置与管理Samba服务器(详细教程)

目录 一、基础理论 二、samba工作流程 三、项目实训 3.1目的 3.2准备工作 3.2.1服务器安装samba服务软件包 3.2.2客户端安装软件包 3.3配置Samba服务 3.3.1开启Samba服务&#xff0c;并设置开启自启动 3.3.2创建共享文件夹 3.3.3创建群组 3.3.4修改文件用户权限 3.3.5修改…

LabVIEW人工模拟肺控制系统开发

开发了一种创新的主被动一体式人工模拟肺模型&#xff0c;通过LabVIEW开发的上位机软件&#xff0c;实现了步进电机驱动系统的精确控制和多种呼吸模式的模拟。该系统不仅能够在主动呼吸模式下精确模拟快速呼吸、平静呼吸和深度呼吸&#xff0c;还能在被动模式下通过PID控制实现…

训练CDN基础代码

文章目录 时间整体流程训练细节小结 时间 从开始在平台上搭建到现在可以在平台上训练已经4天了 有GPU平台一般是autoDL平台&#xff0c;白嫖200元平台是&#xff1a;https://cloud.lanyun.net/ 整体流程 1.注册平台&#xff0c;以蓝耘为例子 卡从好变坏依次是&#xff1a;…

C语言:指针详解(5)

目录 一、sizeof()函数和strlen()函数的对比 1.sizeof()函数 2.strlen()函数 3.sizeof()函数和strlen()函数的对比 二、数组和指针笔试试题解析 1.一维数组 2.字符数组 &#xff08;1&#xff09;代码1 &#xff08;2&#xff09;代码2 &#xff08;3&#xff09;代码…

【数组、特殊矩阵的压缩存储】

目录 一、数组1.1、一维数组1.1.1 、一维数组的定义方式1.1.2、一维数组的数组名 1.2、二维数组1.2.1、二维数组的定义方式1.2.2、二维数组的数组名 二、对称矩阵的压缩存储三、三角矩阵的压缩存储四、三对角矩阵的压缩存储五、稀疏矩阵的压缩存储 一、数组 概述&#xff1a;数…

香橙派AIpro:体验强劲算力,运行ROS系统

文章目录 前言一、香橙派AIpro开箱及功能介绍1.1香橙派AIpro开箱1.2香橙派AIpro功能介绍 二、香橙派AIpro资料下载及环境搭建2.1资料下载2.2环境搭建2.3使用串口启动进入开发板2.4使用HDMI线接入屏幕启动 三、部署ROS系统四、香橙派AIpro的使用和体验感受 前言 本篇文章将带体…

sip协议栈简介

SIP协议栈简介 SIP协议栈流程 数据链路层&#xff1a;当SIP消息从网络中传输到达TCP/IP协议栈时&#xff0c;首先被接收到的是数据链路层的数据帧。数据链路层会对数据帧进行解封装&#xff0c;得到网络层的IP数据报。 网络层&#xff1a;网络层会对IP数据报进行解析&#xf…

js实现 JSON数据格式化的两种方法

本次方法不使用JS库直接采用原生JS 完整HTML代码如下&#xff0c;您可以复制代码然后&#xff0c;新建一个.html的网页进行保存即可体验 <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title></head><b…

Xcode 16 beta3 真机调试找不到 Apple Watch 的尝试解决

很多小伙伴们想用 Xcode 在 Apple Watch 真机上调试运行 App 时却发现&#xff1a;在 Xcode 设备管理器中压根找不到对应的 Apple Watch 设备。 大家是否已将 Apple Watch 和 Mac 都重启一万多遍了&#xff0c;还是束手无策。 Apple Watch not showing in XCodeApple Watch wo…

android13 文件管理器无法安装apk 奔溃问题

总纲 android13 rom 开发总纲说明 目录 1.前言 2.我们简单写个apk测试下 3.排查客户apk 4.frameworks源码排查 5.编译验证 6.彩蛋 1.前言 客户提供的文件管理apk不能安装apk文件,一点击就奔溃。 2.我们简单写个apk测试下 private void installApk(File apkFile) {i…