一、基本定义
Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
- Topic:消息主题,用于将一类的消息进行归类,比如订单主题,就是所有订单相关的消息都可以由这个主题去承载,生产者向这个主题发送消息。
- 生产者:负责生产消息并发送消息到 Topic 的角色。
- 消费者:负责从 Topic 接收并消费消息的角色。
- 消息:生产者向 Topic 发送的内容,会被消费者消费。
- 消息属性:生产者发送的时候可以为消息自定义一些业务相关的属性,比如 Mesage Key 和 Tag 等。
- Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。
Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。
二、消息存储
1、基础
分布式队列因为有可靠性的要求,一般要对消息进行持久化的处理。对于存储介质的选择:
1、关系型数据库DB:如Avtivemq,关系型数据库在数据量变大之后性能会显著下降,同时还需要进行关系型数据库的连接,数据库持久化的操作,最终还是数据还是进入的文件系统。
2、文件系统:如RocketMQ、Kafka、RabbitMQ。直接把数据保存在文件系统中,可靠快速。
2、消息存储和发送性能保证
如果磁盘配置得当,磁盘的顺序写入速度可以达到600MB/s,这超过了一般网卡的传输速度。但是随机写的速度只有大概100KB/s,和顺序写入相比相差了大概6000倍。所以RocketMQ就使用了顺序写来保证消息的存储速度。
默认情况下,从磁盘中读取文件或者通过网络向文件中写入数据,需要内核态与用户态的多次拷贝操作。RocketMQ使用了零拷贝的技术,省去了向用户态拷贝的步骤,提高了消息存盘和网络发送的速度。零拷贝机制在Java中有一个MappedByteBuffer来实现
采用MappedByteBuffer技术对文件有大小的要求,故RocketMQ的默认的一个CommitLog日志数据文件大小为1g。
3、消息存储结构
消息存储架构图中主要有下面三个跟消息存储相关的文件构成。
- commitLog: 存储消息的元数据,可能会有多个文件,每个文件默认大小1g。
- consumequeue:存储消息在CommitLog的索引,这个消费逻辑队列。为了加快commitLog的读取速度。当我们创建了一个消息队列就会对应产生一个对应的consumequeue。
- indexFile: 为了消息查询提供了一种通过key或者时间区间来查询消息的方法,这种提供indexFile来查找消息的方法不影响发送和消费消息的主流程
消息发送过来先存储到commitLog,为了加快commitLog读取速度,就出现了一个consumequeue,相当于消息的主键索引,而indexFile其他索引例如key或者时间
4、刷盘机制
同步和异步刷屏如下图所示
(1) 同步刷盘:如上图所示,只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
(2) 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache(内存)即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
三、高可用机制
通信机制
RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:
1、Broker启动后,向NameServer注册并每隔30s时间定时向NameServer上报Topic路由信息;
2、消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30s向NameServer拉取一次路由信息;
3、消息生产者Producer根据第二步获得路由信息,选择一个消息队列(MessageQueue)进行消息的投递,消息的接收方是Broker,Broker接收消息并落盘;
4、消息的消费者Consumer也根据第二步获得的路由信息,先进行客户端的负载均衡,然后选择一个或多个消息队列中的信息进行消费;
高可用集群架构
如上图所示,生产者有生产者集群,消费者有消费者集群,Name Server也就Name Server集群。Broker集群一般采用多主多从的机制。一组主从节点的broker名称相同,其中master节点的brokerId为0,从节点的brokerId大于0;从节点只负责读的工作
当消费者读取消息时,出现master节点故障,broker集群会立即把读取的目标转到slave节点中,保证消费的高可用;
生产者发送消息时,即创建topic的时候,把topic中的多个messagequeue创建在多个broker组上,如果其中一个主节点挂了,还有一个主节点提供服务,此时保证生产者的高可用;
主从复制
主从复制指的是broker集群中的主从节点的数据复制,包括同步复制和异步复制。同步复制牺牲了一部分的性能但是数据可靠性高基本不会都丢失数据;
生产环境中,建议刷盘方式配置成异步SYNC_FLUSH,保证一个吞吐量,然后主从复制使用同步复制ASYNC_FLUSH组合来保证数据的安全性;
四、消息投递与消费
负载均衡
生产者负载均衡
生产者在发送消息的时候,要根据topic的路由来完成消息的投递,投递的目的地是broker。一个topic关联了多个messageQueue,客户端正常会轮询的方式依次投递到不同的消息队列中,由于消息队列配置在不同的broker上,这样就完成了消息投递的负载均衡,我们看下官网中给出的解释:
Producer端在发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。
具体的容错策略均在MQFaultStrategy这个类中定义。这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550L ms,就退避30000L ms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。
消费者负载均衡
在RocketMQ中,Consumer端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端知道从Broker端的哪一个消息队列中去获取消息。因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。
在Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量中;
在Consumer实例的启动后,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。这个线程会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。
集群模式
1)获取该Topic主题下的消息消费队列集合;
2)向Broker端发送获取该消费组下消费者Id列表;
3)先对Topic下的消息消费队列、消费者Id排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。
广播模式
Consumer实例把所有的消息队列中的消息都拉取过来
五、事务消息
Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。
事务消息流程概要
分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程
1.事务消息发送及提交:
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2.补偿流程:
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
事务消息设计
我们看到上图中有一个halfmsg,这就是事务消息的关键;
1、事务消息在一阶段对用户不可见
如果消息是half消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故消费端无法消费half类型的消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
2、Commit和Rollback操作以及Op消息的引入
在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。
Rollback
本身一阶段的消息对用户就是不可见的, 也就是无法消费的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的);但还是需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。
OP消息
RocketMQ将Op消息写入到全局一个特定的Topic中,这个Topic是一个内部的Topic(像Half消息的Topic一样),不会被用户消费。Op消息的内容为对应的Half消息的存储的Offset,这样通过Op消息能索引到Half消息进行后续的回查操作。
Commit
在执行二阶段Commit操作时,需要构建出Half消息的索引。一阶段的Half消息由于是写到一个特殊的Topic,所以二阶段构建索引时需要读取出Half消息,并将Topic和Queue替换成真正的目标的Topic和Queue,之后通过一次普通消息的写入操作来生成一条对用户可见的消息。所以RocketMQ事务消息二阶段其实是利用了一阶段存储的消息的内容,在二阶段时恢复出一条完整的普通消息,然后走一遍消息写入流程。
六、消息消费幂等性
必要性
Rocketmq中的消息是会出现重复的,主要有以下三个内容
1、发送时消息重复:生产者在成功发送了消息之后,此时由于网络抖动等问题造成服务端没有对客户端应答失败,故而生产者会重复投递一个messageId相同的消息;
2、投递时消息重复:消费者成功消费之后,由于网络抖动等问题造成客户端没有给服务端应答,此时服务端基于投递策略(比如至少成功一次),会再次尝试投递之前已经成功的消息;
3、扩容时rebalance造成的消息重复投递:当客户端重启,扩缩容时会造成rebanlance就是重新负载均衡,也会造成消息的重复投递;
解决方案
首先不建议使用messageId来做幂等性检查,但是rocketmq不保证默认消息id的唯一性;
通常在消息中设置一个业务key,在消费者端通过判定业务key是否已经消费过了来判定幂等性;这个业务key可以通过数据库来存储,也可以通过redis等缓存工具来存储;
参考:
3.RocketMQ消息存储结构_哔哩哔哩_bilibili
分布式事务-阿里云MQ事务消息踩坑记录_localtransactionchecker-CSDN博客