普通消息
普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。
以在线的电商交易场景为例,上游订单系统将用户下单支付这一业务事件封装成独立的普通消息并发送至 RocketMQ 服务端,下游按需从服务端订阅消息并按照本地消费逻辑处理下游任务。
每个消息之间都是相互独立的,且不需要产生关联。
另外还有日志系统,以离线的日志收集场景为例,通过埋点组件收集前端应用的相关操作日志,并转发到 RocketMQ 。
普通消息生命周期
- 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
- 待消费:消息被发送到服务端,对消费者可见,等待消费者消费的状态。
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
- 消息删除:RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。
延时&定时消息
在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。
使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
定时消息仅支持在 MessageType 为 Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。
在 4.x 版本中,只支持延时消息,默认分为 18 个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,也可以在配置文件中增加自定义的延时等级和时长。
在 5.x 版本中,开始支持定时消息,在构造消息时提供了 3 个 API 来指定延迟时间或定时时间。
基于定时消息的超时任务处理具备如下优势:
- 精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。
- 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。RocketMQ 的定时消息具有高并发和水平扩展的能力。
如何实现延迟&定时消息
当消息发送到Broker后,Broker会将消息根据延迟级别进行存储。
RocketMQ的延迟消息实现方式是:将消息先存储在内存中,然后使用Timer定时器进行消息的延迟,到达指定的时间后再存储到磁盘中,最后投递给消费者。
并使用Timer定时器来实现延迟投递。但是,由于Timer定时器有一定的缺陷,比如在定时器中有大量任务时,会导致定时器的性能下降,从而影响消息投递。
因此,在RocketMQ5.0中,采用了一种新的实现方式:基于时间轮的定时消息。时间轮是一种高效的定时器算法,能够处理大量的定时任务,并且能够在O(1)时间内找到下一个即将要执行的任务,因此能够提高消息的投递性能。
并且,基于时间轮的定时消息能够支持更高的消息精度,可以实现秒级、毫秒级甚至更小时间粒度的定时消息。
具体实现方式如下:
- RocketMQ在Broker端使用一个时间轮来管理定时消息,将消息按照过期时间放置在不同的槽位中,这样可以大幅减少定时器任务的数量。
- 时间轮的每个槽位对应一个时间间隔,比如1秒、5秒、10秒等,每次时间轮的滴答,槽位向前移动一个时间间隔。
- 当Broker接收到定时消息时,根据消息的过期时间计算出需要投递的槽位,并将消息放置到对应的槽位中。
- 当时间轮的滴答到达消息的过期时间时,时间轮会将该槽位中的所有消息投递给消费者。
延时&定时消息生命周期
- 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
- 定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。
- 待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
- 消息删除:Apache RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。
定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。
顺序消息
Kafka只支持同一个Partition内消息的顺序性一样,RocketMQ中也提供了基于队列(分区)的顺序消费。即同一个队列内的消息可以做到有序,但是不同队列内的消息是无序的。
如果要使用顺序消息仅支持使用 MessageType 为 FIFO 的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。
如何实现顺序的消息
和普通消息发送相比,顺序消息发送必须要设置消息组。要保证消息的顺序性需要生产者串行发送,需要在send方法中,传入一个MessageQueueSelector,MessageQueueSelector中需要实现一个select方法,这个方法去就是用来定义要把消息发送到哪个MessageQueue。通常可以使用取模法进行路由。
通过以上形式就可以将需要有序的消息发送到同一个队列中。需要注意的时候,这里需要使用同步发送的方式!
消息按照顺序发送的消息队列中之后。消费者如何按照发送顺序进行消费呢?
RocketMQ的MessageListener回调函数提供了两种消费模式:
- 有序消费模式MessageListenerOrderly
- 并发消费模式MessageListenerConcurrently
所以,为了保证同一个队列中的的有序消息可以被顺序消费,就要保证RocketMQ的Broker只会把消息发送到同一个消费者上,单线程使用 MessageListenerConcurrently 即可可以顺序消费、多线程环境下需要使用 MessageListenerOrderly 才能顺序消费。
如果是多线程环境下,为了保证同一个队列中!的有序消息可以被顺序消费,就要保证RocketMQ的Broker只会把消息发送到同一个消费者上,这时候就需要加锁了。
在实现中,ConsumeMessageOrderlyService初始化的时候,会启动一个定时任务,会尝试向Broker为当前消费者客户端申请分布式锁。如果获取成功,那么后续消息将会只发给这个Consumer。
接下来在消息拉取的过程中,消费者会一次性拉取多条消息的,并且会将拉取到的消息放入ProcessQueue,同时将消息提交到消费线程池进行执行。
那么拉取之后的消费过程,怎么保证顺序消费呢?这里就需要更多的锁了。
RocketMQ在消费的过程中,需要申请MessageQueue 锁,确保存在同一时间,一个队列中只有一个线程能处理队列中的消息。获取到MessageQueue的锁后,就可以从ProcessQueue中依次拉取一批消息处理了,但是这个过程中,为了保证消息不会出现重复消费,还需要对ProcessQueue进行加锁。然后就可以开始处理业务逻辑了。
总结下来就是三次加锁,先锁定Broker上的MessageQueue,确保消息只会投递到唯一的消费者,对本地的MessageQueue加锁,确保只有一个线程能处理这个消息队列。对存储消息的ProcessQueue加锁,确保在重平衡的过程中不会出现消息的重复消费。
- Broker 上的 MessageQueue
消息存储与分发:Broker 上的 MessageQueue 主要用于存储和管理接收到的消息。它作为消息的中间存储介质,接收生产者发送的消息,并等待消费者来获取和处理这些消息。
负载均衡:Broker 可以将消息均匀地分配到不同的 MessageQueue 中,实现负载均衡,避免单个队列承载过多的消息压力,提高系统的整体性能和稳定性。
消息路由:确定消息的传递路径和目标,帮助将消息准确地传递到相应的消费者或其他 Broker 节点,确保消息在分布式系统中的正确流转。 - 本地的 MessageQueue
缓存与临时存储:本地的 MessageQueue 可以作为本地应用或进程的消息缓存区域。当网络延迟或 Broker 处理繁忙时,本地的 MessageQueue 可以暂时存储即将发送或接收的消息,防止数据丢失或消息积压。
提高性能与响应速度:本地缓存的消息可以快速被本地应用访问和处理,减少了与远程 Broker 之间的网络通信开销,提高了消息处理的实时性和响应速度。
离线处理支持:在网络连接中断或系统处于离线状态时,本地的 MessageQueue 可以存储尚未处理的消息,待网络恢复或系统重新上线后,再进行消息的发送和处理,保证消息处理的连续性。 - 存储消息的 ProcessQueue
消息排序与顺序处理:ProcessQueue 负责对存储的消息进行排序和组织,确保消息按照一定的顺序被处理。这对于一些对消息处理顺序有严格要求的业务场景非常重要,例如事务性消息处理或基于序列的业务流程。
批量处理与优化:可以将多个相关的消息聚集在一起,进行批量处理,提高消息处理的效率和资源利用率。例如,将一批具有相同业务逻辑或操作类型的消息集中处理,减少处理过程中的上下文切换和重复操作。
消息过滤与转换:在 ProcessQueue 中,可以对消息进行过滤、转换和预处理操作,例如根据消息的内容、类型、优先级等条件进行筛选和转换,以满足后续业务处理的需求。
前面介绍加锁过程中,一共加了三把锁,那么,第三把锁如果不加的话,是不是也没问题?因为我们已经对MessageQueue加锁了,为啥还需要对ProcessQueue再次加锁呢?这里其实主要考虑的是重平衡的问题。
当我们的消费者集群,新增了一些消费者,发生重平衡的时候。某个队列可能会原来属于客户端A消费的,但是现在要重新分配给客户端B了。这时候客户端A就需要把自己加在Broker上的锁解掉,而在这个解锁的过程中,就需要确保消息不能在消费过程中就被移除了,因为如果客户端A可能正在处理一部分消息,但是位点信息还没有提交,如果客户端B立马去消费队列中的消息,那存在一部分数据会被重复消费。
那么如何判断消息是否正在消费中呢,就需要通过这个ProcesssQueue上面的锁来判断了,也就是说在解锁的线程也需要尝试对ProcessQueue进行加锁,加锁成功才能进行解锁操作! 以避免过程中有消息消费。
顺序消费存在的问题
通过上面的介绍,我们知道了RocketMQ的顺序消费是通过在消费者上多次加锁实现的,这种方式带来的问题就是会降低吞吐量,并且如果前面的消息阻塞,会导致更多消息阻塞。所以,顺序消息需要慎用。
事务消息
事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
简单来讲,就是将本地事务(数据库的 DML 操作)与发送消息合并在同一个事务中。
例如,新增一个订单。在事务未提交之前,不发送订阅的消息。发送消息的动作随着事务的成功提交而发送,随着事务的回滚而取消。当然真正地处理过程不止这么简单,包含了半消息、事务监听和事务回查等概念。
如何实现事务消息
在发送事务消息时,首先向RocketMQ Broker发送一条"half消消息"(即半消息),半消息将被存储在Broker端的事务消息日志中,但是这个消息还不能被消费者消费。
接下来,在半消息发送成功后,应用程序通过执行本地事务来确定是否要提交该事务消息。如果本地事务执行成功,就会通知RocketMQ Broker提交该事务消息,使得该消息可以被消费者消费,否则,就会通知RocketMQ Broker回滚该事务消息,该消息将被删除,从而保证消息不会被消费者消费。
拆解下来的话,主要有以下4个步骤:
- 发送半消息:应用程序向RocketMQ Broker发送一条半消息,该消息在Broker端的事务消息日志中被标记为"prepared"状态。
- 执行本地事务:RocketMQ会通知应用程序执行本地事务。如果本地事务执行成功,应用程序通知RocketMQBroker提交该事务消息。
- 提交事务消息:RocketMQ收到提交消息以后,会将该消息的状态从"prepared"改为"committed",并使该消息可以被消费者消费。
- 回滚事务消息:如果本地事务执行失败,应用程序通知RcocketMQ Broker回滚该事务消息,RocketMQ将该消息的状态从"prepared"改为"rollback",并将该消息从事务消息日志中删除,从而保证该消息不会被消费者消费。
- 如果一直没收到COMMIT或者ROLLBACK怎么办?
在RocketMQ的事务消息中,如果半消息发送成功后,RocketMQ的事务消息中,Broker在规定时间内没有收到COMMIT或者ROLLBACK消息。
RocketMQ会向应用程序发送一条检查请求,应用程序可以通过回调方法返回是否要提交或回滚该事务消息。如果应用程序在规定时间内未能返回响应,RocketMQ会将该消息标记为"UNKNOW"状态。
在标记为"UNKNOW"状态的事务消息中,如果应用程序有了日确的结果,还可以向MQ发送COMMIT或者ROLLBACK。但是MQ不会一直等下去,如果过期时间已到,RocketMQ会自自动回滚该事务消息,将其从事务消息日志中删除。
- 第一次发送半消息失败了怎么办?
在事务消息的一致性方案中,我们是先发半消息,再做业务操作的。所以,如果半消息发失败了,那么业务操作也不会进行,不会有不一致的问题。遇到这种情况重试就行了。
- 为什么要用事务消息?
本地事务执行完成之后再发送消息有什么区别?为什么要有事务消息呢?
主要是因为:本地事务执行完成之后再发送消息可能会发消息失败。一旦发送消息失败了,那么本地事务提交了,但是消息没成功,那么监听者就收不到消息,那么就产生数据不一致了。
那如果用事务消息。先提交一个半消息,然后执行本地事务,再发送一个commit的半消息。如果后面这个commit半消息失败了,MQ是可以基于第一个半消息不断反查来推进状态态的。这样只要本地事务提交成功,最终MQ也会成功。如果本地事务rolliback,那么MQ的消息也会rollback。保证了一致性。