概念
Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件。
支持Broker和Consumer端消息过滤,支持发布订阅模型和点对点,支持拉pull和推push两种消息模式,单一队列百万消息、亿级消息堆积,支持单master节点,多master节点,多master多slave节点,任意一点都是高可用,水平拓展,Producer、Consumer、队列都可以分布式,消息失败重试机制、支持特定level的定时消息,新版本底层采用Netty,4.3.x支持分布式事务,适合金融类业务,高可用性跟踪和审计功能。
Broker
服务器上部署的RocketMq进程一般称之为Broker
,Broker会接收Producer的消息,持久化到本地,然后push给Consumer,通常使用集群部署。主从之间会有数据同步
NameServer
路由服务,类似与dubbo中的注册中心zk,它存储了Broker的路由信息,供Producer和Consumer使用,不然Producer怎么知道往哪个Broker发送消息。多个NameSever之间没有通信,每个NameSever都会保存所有路由信息。
Producer
生产者,即发送消息的一方,往Broker中写入数据。Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息。 再跟Topic涉及的所有Broker建立长连接,每隔30秒发一次心跳。在Broker端也会每10秒扫描一次当前注册的Producer,如果发现某个Producer超过2分钟都没有发心跳,则断开连接。Producer完全无状态,可集群部署。
Consumer
消费者,即消费消息的一方,从Broker中获取数据。Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息。 Consumer跟Broker是长连接,会每隔30秒发心跳信息到Broker。Broker端每10秒检查一次当前存活的Consumer,若发现某个Consumer 2分钟内没有心跳,就断开与该Consumer的连接,并且向该消费组的其他实例发送通知,触发该消费者集群的负载均衡。 Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
Topic
Topic
翻译过来就是主题的意思,但它其实是个抽象概念,我们可以理解成数据集合
,比如订单系统有一个Topic叫topic_order_info
,这个Topic里面就是订单系统投递的订单信息,如果其他系统想要获取订单信息,就可以从这个Topic中获取。
MessageQueue
MessageQueue即消息队列,在创建Topic的时候会让我们指定MessageQueue的数量,简单来说就是指定Topic中的队列数量。 那么MessageQueue到底是什么呢?这个问题要和Topic、Broker一起来看,大家想一想Topic在Broker中是如何存储的?要知道Broker是集群部署的,如果我们有2个Broker,那Topic中的数据哪些存储在这个Broker,哪些存储在另一个Broker呢?所以RocketMq引入了MessageQueue的概念,本质上是一个数据分片机制。 比如一个Topic指定了4个MessageQueue,该Topic有1W个消息,那么这1W个消息会均匀分配在4个MessageQueue中(实际是根据分配策略),而这4个MessageQueue又是放在Broker上的,一个Broker上存储2个MessageQueue。
死信队列
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列RocketMQ会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明Consumer在正常情况下无法正确地消费该消息。此时,消息队列RocketMQ不会立刻将消息丢弃,而是将这条消息发送到该Consumer对应的特殊队列中。
消息队列RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
应用场景
-
削峰填谷:诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列RocketMQ可提供削峰填谷的服务来解决该问题。
-
异步解耦:交易系统作为淘宝和天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列RocketMQ可实现异步通信和应用解耦,确保主站业务的连续性。
-
顺序收发:细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列RocketMQ提供的顺序消息即保证消息FIFO。
-
分布式事务一致性:交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列RocketMQ的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
-
大数据分析:数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列RocketMQ与流式计算引擎相结合,可以很方便的实现业务数据的实时分析。
-
分布式缓存同步:天猫双11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列RocketMQ构建分布式缓存,实时通知商品数据的变化。
普通消息处理
如上所述,注册系统和邮件通知系统之间通过消息队列进行异步处理。注册系统将注册信息写入注册系统之后,发送一条注册成功的消息到消息队列RocketMQ,邮件通知系统订阅消息队列RocketMQ的注册消息,做相应的业务处理,发送注册成功或者失败的邮件。
流程说明如下:
- 注册系统发起注册。
- 注册系统向消息队列RocketMQ发送注册消息成功与否的消息。
2.1. 消息发送成功,进入3。
2.2. 消息发送失败,导致邮件通知系统未收到消息队列RocketMQ发送的注册成功与否的消息,而无法发送邮件,最终邮件通知系统和注册系统之间的状态数据不一致。 - 邮件通知系统收到消息队列RocketMQ的注册成功消息。
- 邮件通知系统发送注册成功邮件给用户。
在这样的情况下,虽然实现了系统间的解耦,上游系统不需要关心下游系统的业务处理结果;但是数据一致性不好处理,如何保证邮件通知系统状态与注册系统状态的最终一致。
事务消息处理
此时,需要利用消息队列RocketMQ所提供的事务消息来实现系统间的状态数据一致性
流程说明如下:
-
注册系统向消息队列RocketMQ发送半事务消息。
1.1. 半事务消息发送成功,进入2。
1.2. 半事务消息发送失败,注册系统不进行注册,流程结束。(最终注册系统与邮件通知系统数据一致) -
注册系统开始注册。
2.1. 注册成功,进入3.1。
2.2. 注册失败,进入3.2。 -
注册系统向消息队列RocketMQ发送半消息状态。
3.1. 提交半事务消息,产生注册成功消息,进入4。
3.2. 回滚半事务消息,未产生注册成功消息,流程结束。
说明 最终注册系统与邮件通知系统数据一致。 -
邮件通知系统接收消息队列RocketMQ的注册成功消息。
-
邮件通知系统发送注册成功邮件。(最终注册系统与邮件通知系统数据一致)
关于分布式事务消息的更多详细内容,请参见事务消息。
消息的顺序收发
消息队列RocketMQ顺序消息分为两种情况:
- 全局顺序:对于指定的一个Topic,所有消息将按照严格的先入先出(FIFO)的顺序,进行顺序发布和顺序消费。
- 分区顺序:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区,同一个分区内的消息将按照严格的FIFO的顺序,进行顺序发布和顺序消费,可以保证一个消息被一个进程消费。
在注册场景中,可使用用户ID作为Sharding Key来进行分区,同一个分区下的新建、更新或删除注册信息的消息必须按照FIFO的顺序发布和消费。
削峰填谷
流量削峰也是消息队列RocketMQ的常用场景,一般在秒杀或团队抢购活动中使用广泛。
在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入消息队列RocketMQ。
秒杀处理流程如下所述:
- 用户发起海量秒杀请求到秒杀业务处理系统。
- 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列RocketMQ。
- 下游的通知系统订阅消息队列RocketMQ的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
- 用户收到秒杀成功的通知。
开发实例:
引入依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.3</version>
</dependency>
配置参数:
rocketmq.address=127.0.0.1:9876
rocketmq.producer.groupName=groupName
rocketmq.producer.sendMsgTimeout=10000
rocketmq.producer.retryWhenSendFailed=3
未完待续
参考:为什么选择RocketMQ | RocketMQ RocketMQ 简介-阿里云开发者社区