RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
「RocketMQ本质上的设计思路和Kafka类似」,但是和Kafka不同的是其使用Java进行开发,由于在国内的Java受众群体远远多于Scala、Erlang,所以RocketMQ是很多以Java语言为主的公司的首选。同样的RocketMQ和Kafka都是Apache基金会中的顶级项目,他们社区的活跃度都非常高,项目更新迭代也非常快。
RocketMQ是阿里review kafka的java版,如果消息性能要求高,用 RocketMQ 与 Kafka 可以更优
消息队列在实际应用中常用的使用场景,包含「应用解耦、异步处理、流量削锋、消息通讯、日志处理」等。
RocketMQ运行原理
上面是RocketMQ运行的一个大致流程图。
在对Apache RocketMQ进行深入探索的过程中,我们首先需要理解其核心组件的作用:
NameServer:充当注册中心的角色,主要任务是管理Broker节点.
Broker:它是RocketMQ系统的核心部分,主要负责消息的存储。
Producer:这是消息的生成者,它创建消息并将其写入Broker。
Consumer:作为消息的接收者,负责从Broker读取消息并进行处理。
下面,让我们一起深入了解RocketMQ的运行流程:
1. 首先,Broker在启动后会向根据其配置向NameServer注册。
2. NameServer作为注册中心,管理着我们的Broker集群信息以及Topic路由信息。例如,一个特定的Topic具有哪些Broker主机以及队列信息。
3. 接着,由特定的业务系统中的生产者(Producer)生成消息,并发送到Broker的主节点。
4. 在Broker节点中,这些消息将被保存到本地磁盘的CommitLog中,以确保消息不会丢失。
5. 接下来,主节点Broker将这些消息同步到从节点Broker,这样可以实现负载均衡并增强系统的鲁棒性。
6. 最后,业务系统中的消费者(Consumer)会从Broker中取出消息并进行处理,这就完成了数据的完整生命周期。
我们的Producer写入消息前需要先选择Broker,那Producer是如何选择Broker的呢?
上面提到,NameServer 在 RocketMQ 架构中起到了注册中心的作用,它负责管理所有的 Broker 节点。每当 Broker 启动后,它就会自动的注册到 NameServer 中,并且会每隔30秒向 NameServer 发送一次心跳,以证明它依然在运行。NameServer 则会每隔10秒检查一次各个 Broker 节点是否还在线,如果有 Broker 在120秒内未发送心跳,那么 NameServer 就会判断该 Broker 已经宕机,进而将其从注册列表中移除。
在业务系统中,Producer 在发送消息之前,会先从 NameServer 中拉取需要的 topic 路由信息,这些信息将包含目标 topic 各个 queue 的详细信息,以及各 queue 分别存储在哪个 Broker 节点上。Producer 会将这些信息缓存到本地,并依此信息,通过一种负载均衡算法,选择从哪个 queue 中读取数据,以及找到该 queue 对应的 Broker 节点。
那么,如果某个 Broker 在 Producer 准备写入数据的时候突然宕机了,又该如何处理呢?
RocketMQ 设计了一套故障探测与处理机制。如果某个 Broker 宕机了,那么 Producer 进行写入操作时将会失败,此时,它会发起重试操作,并从可用的 Broker 列表中重新选择一个进行写入。并且,为避免持续向故障节点写入数据,Producer 会采取一种称为"故障退避"的策略,即在一段时间内停止向该 Broker 发送数据。值得注意的是,Broker 的故障并不会立即被 Producer 和 NameServer 感知,这样做是为了降低 NameServer 处理逻辑的复杂性。当 Broker 宕机后,由于本地的 topic 路由缓存并未更新,Producer 仍可能尝试向故障的 Broker 发送数据,然后备受失败并重试。只有当 NameServer 在检查心跳时发现该 Broker 已宕机,并从注册列表中移除后,Producer 在刷新本地缓存时,才会真正地感知到该 Broker 的宕机。
当我们的Producer基于负载均衡选择了Broker节点,它的消息是如何写入的呢?
在深入理解 RocketMQ 的存储机制时,我们需要知道,Producer 在写入消息时,默认会优先写到操作系统管理的 pageCache,这个过程是异步的,只要消息被写入 pageCache,写入操作就被认为是成功的。这种异步的处理方式极大地提高了 RocketMQ 的写入效率。
当消息被写入 pageCache 后,将有一个后台线程异步地将这些消息从 pageCache 刷入到磁盘文件 CommitLog 中,CommitLog 是消息的实际存储位置。同时,还会有一个专门的线程负责将 CommitLog 中的消息位置(物理偏移量)写入到 ConsumeQueue 中。
那么客户端如何读取存储在 CommitLog 中的消息呢?
当 Consumer 端的消费者需要读取消息时,它会先到ConsumeQueue,然后根据在 ConsumeQueue 中存储的 offset 信息找到 CommitLog 中的实际数据进行读取。
这样的存储方法是否可以支持高并发模式的写入呢?
当系统面临大量同时写入和读取的请求时,可能会遇到一种情况,即大量的读取请求通过 ConsumeQueue 去找 CommitLog 中的数据,但是此时数据可能还在 pageCache 中并未完成异步写入。这时,系统会通过 CommitLog 和 PageCache 的映射,找到 pageCache 中的消息进行读取。也就是说,大量的读取和写入请求都对 pageCache 进行操作。但是当并发量过高时,可能会出现 "Broker busy" 的异常,这是因为在极高的并发场景下,持续大量的读写操作可能会对系统性能造成影响。
简而言之,RocketMQ 的存储机制旨在为高并发高效的读写提供支持,但是在一些极端情况下,仍然需要额外的优化措施以提高稳定性和性能。
当并发量非常高时,出现Broker busy异常了,如何解决?
RocketMQ 在面对高并发场景时,为了改善 "Broker busy" 异常和提高吞吐量,可以启用 transientStorePool 机制。这种机制的实现方式是,Broker 在写入消息时,将消息直接写入由 JVM 管理的 offheap 堆外内存,这样的设计能有效提升并发性能。
那么,为什么启用 transientStorePool 可以提高并发处理能力呢?
当开启该机制后,消息首先写入 JVM 的 offheap内存,然后异步刷新到 pageCache,最终由 pageCache 异步刷新到 CommitLog。大量的写请求将向 JVM 的 heap 内存进行,而大量的读请求仍然从 pageCache 进行,这种读写分离的机制极大地提高了 RocketMQ 的并发性能。
但是为什么 transientStorePool 机制不作为默认机制呢?
虽然 transientStorePool 能显著提升并发性能,但其也存在风险。当消息写入到 JVM 管理的 offheap 堆外内存后,如果 JVM 进程重启或者宕机,那些尚未被及时落盘的消息就会丢失。但如果采用默认的写入方法,即先写入操作系统管理的 pageCache,那么在 JVM 进程重启后,那些保存在 pageCache 中的信息不会丢失,只有当整个服务器宕机重启时,pageCache 中的消息才有可能丢失。因此,数据最安全的处理方式是,将其直接写入到 CommitLog。
总结:开启 transientStorePool 机制可以极大地提高 RocketMQ 的并发处理能力,然而这可能会带来数据的丢失。因此,它更适合那些并发处理能力要求高、且可以接受部分数据丢失的场景。
如果我们想要写入数据不丢失,应该怎么处理?
在设计与金融场景以及其他要求数据不能丢失的环境中,我们会采用同步方式将数据写入CommitLog。成功执行写入操作后才返回,确保了数据的完整性和安全性。只有在broker的物理存储设备出现故障的情况下,才有可能导致数据丢失。为了提供进一步提高数据的安全性,也可以通过多台服务器进行数据备份。
但值得注意的是, 尽管实现了对数据的安全性提升, 使用同步写入CommitLog方式会降低系统的性能到几个数量级。
Rocketmq如何支持分布式事务消息
场景
A(存在DB操作)、B(存在DB操作)两方需要保证分布式事务一致性,通过引入中间层MQ,
A和MQ保持事务一致性(异常情况下通过MQ反查A接口实现check),
B和MQ保证事务一致(通过重试),从而达到最终事务一致性。
「原理:大事务 = 小事务 + 异步」
「1.MQ与DB一致性原理(两方事务)」
流程图
上图是RocketMQ提供的保证MQ消息、DB事务一致性的方案。
「MQ消息、DB操作一致性方案:」
1)发送消息到MQ服务器,此时消息状态为SEND_OK。此消息为consumer不可见。
2)执行DB操作;DB执行成功Commit DB操作,DB执行失败Rollback DB操作。
3)如果DB执行成功,回复MQ服务器,将状态为COMMIT_MESSAGE;如果DB执行失败,回复MQ服务器,将状态改为ROLLBACK_MESSAGE。注意此过程有可能失败。
4)MQ内部提供一个名为“事务状态服务”的服务,此服务会检查事务消息的状态,如果发现消息未COMMIT,则通过Producer启动时注册的TransactionCheckListener来回调业务系统,业务系统在checkLocalTransactionState方法中检查DB事务状态,如果成功,则回复COMMIT_MESSAGE,否则回复ROLLBACK_MESSAGE。
「说明:」
上面以DB为例,其实此处可以是任何业务或者数据源。
以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE 均是client jar提供的状态,在MQ服务器内部是一个数字。
TransactionCheckListener 是在消息的commit或者rollback消息丢失的情况下才会回调(上图中灰色部分)。这种消息丢失只存在于断网或者RocketMQ集群挂了的情况下。当RocketMQ集群挂了,如果采用异步刷盘,存在1s内数据丢失风险,异步刷盘场景下保障事务没有意义。所以如果要核心业务用RocketMQ解决分布式事务问题,建议选择同步刷盘模式。
「2.多系统之间数据一致性(多方事务)」
当需要保证多方(超过2方)的分布式一致性,上面的两方事务一致性(通过RocketMQ的事务性消息解决)已经无法支持。这个时候需要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。
「以上图交易系统为例:」
1)交易系统创建订单(往DB插入一条记录),同时发送订单创建消息。通过RocketMQ事务性消息保证一致性
2)接着执行完成订单所需的同步核心RPC服务(非核心的系统通过监听MQ消息自行处理,处理结果不会影响交易状态)。执行成功更改订单状态,同时发送MQ消息。
3)交易系统接受自己发送的订单创建消息,通过定时调度系统创建延时回滚任务(或者使用RocketMQ的重试功能,设置第二次发送时间为定时任务的延迟创建时间。在非消息堵塞的情况下,消息第一次到达延迟为1ms左右,这时可能RPC还未执行完,订单状态还未设置为完成,第二次消费时间可以指定)。延迟任务先通过查询订单状态判断订单是否完成,完成则不创建回滚任务,否则创建。PS:多个RPC可以创建一个回滚任务,通过一个消费组接受一次消息就可以;也可以通过创建多个消费组,一个消息消费多次,每次消费创建一个RPC的回滚任务。回滚任务失败,通过MQ的重发来重试。
以上是交易系统和其他系统之间保持最终一致性的解决方案。
「3.案例分析」
「1) 单机环境下的事务示意图」
如下为A给B转账的例子。如下为A给B转账的例子。
以上过程在代码层面,甚至可以简化到在一个事物中,执行两条sql语句。
「2) 分布式环境下事务」
和单机事务不同,A、B账户可能不在同一个DB中,此时无法像在单机情况下使用事务来实现。
此时可以通过以下方式实现,将转账操作分成两个操作。
a) A账户
b) MQ消息
A账户数据发生变化时,发送MQ消息,MQ服务器将消息推送给转账系统,转账系统来给B账号加钱。
c) B账户
顺序消息
RocketMq有3种消息类型
-
普通消费
-
顺序消费
-
事务消费
顺序消费场景:在网购的时候,我们需要下单,那么下单需要假如有三个顺序:
第一:创建订单 第二:订单付款 第三:订单完成
也就是这三个环节要有顺序,这个订单才有意义,RocketMQ可以保证顺序消费。
「RocketMQ 实现顺序消费的原理」:produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息
「注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue」
「1. 顺序消息缺陷」
发送顺序消息,无法利用集群Fail Over特性,消费顺序消息的并行度依赖于队列数量队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题遇到消息失败的消息,无法跳过,当前队列消费暂停。
「2. 原理」
produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息。
「3. 扩展」
可以通过实现发送消息的队列选择器方法,实现部分顺序消息。
举例:比如一个数据库通过MQ来同步,只需要保证每个表的数据是同步的就可以。解析binlog,将表名作为队列选择器的参数,这样就可以保证每个表的数据到同一个对列里面,从而保证表数据的顺序消费