RocketMQ
一 引言
Message Queue(消息 队列),从字⾯上理解:⾸先它是⼀个队列。先进先出的数据结构——队列。消息队列就是所谓的存放消息的队列。
消息队列解决的不是存放消息的队列的⽬的,解决的是通信问题;⽐如以电商订单系统为例,如果各服务之间使⽤同步通信,不仅耗时较久,且过程中受到⽹络波动的影响,不能保证⾼成功率。因此可以使⽤异步的通信⽅式对架构进⾏改造;使⽤异步的通信⽅式对模块间的调⽤进⾏解耦,可以快速的提升系统的吞吐量。上游执⾏完消息的发送业务后⽴即获得结果,下游多个服务订阅到消息后各⾃消费。通过消息队列,屏蔽底层的通信协议,使得解藕和并⾏消费得以实现。
MQ的作用:异步、解耦、流量削峰
几种常见消息队列的对比
二 RocketMQ的基本概念
1 技术架构
RocketMQ架构上主要分为四部分,如上图所示:
Producer:消息发布的⻆⾊,⽀持分布式集群⽅式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进⾏消息投递,投递的过程⽀持快速失败并且低延迟
Consumer:消息消费的⻆⾊,⽀持分布式集群⽅式部署。⽀持以push推,pull拉两种模式对消息进⾏消费。同时也⽀持集群⽅式和⼴播⽅式的消费,它提供实时消息订阅机制,可以满⾜⼤多数⽤户的需求
NameServer:NameServer是⼀个⾮常简单的Topic路由注册中⼼,其⻆⾊类似Dubbo中的zookeeper,⽀持Broker的动态注册与发现。主要包括两个功能:
- Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据然后提供⼼跳检测机制,检查Broker是否还存活;
- 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和⽤于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从⽽进⾏消息的投递和消费。
NameServer通常也是集群的⽅式部署,各实例间相互不进⾏信息通讯。Broker是向每⼀台NameServer注册⾃⼰的路由信息,所以每⼀个NameServer实例上⾯都保存⼀份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息
BrokerServer:Broker主要负责消息的存储、投递和查询以及服务⾼可⽤保证,为了实现这些功能,Broker包含了以下⼏个重要⼦模块。
- Remoting Module:整个Broker的实体,负责处理来⾃clients端的请求。
- Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息Store Service:提供⽅便简单的API接⼝处理消息存储到物理硬盘和查询功能。
- HA Service:⾼可⽤服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
- Index Service:根据特定的Message key对投递到Broker的消息进⾏索引服务,以提供消息的快速查询。
2 部署架构
RocketMQ ⽹络部署特点
- NameServer是⼀个⼏乎⽆状态节点,可集群部署,节点之间⽆任何信息同步。
- Broker部署相对复杂,Broker分为Master与Slave,⼀个Master可以对应多个Slave,但是⼀个Slave只能对应⼀个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,⾮0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建⽴⻓连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上⽀持⼀Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
- Producer与NameServer集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建⽴⻓连接,且定时向Master发送⼼跳。Producer完全⽆状态,可集群部署。
- Consumer与NameServer集群中的其中⼀个节点(随机选择)建⽴⻓连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建⽴⻓连接,且定时向Master、Slave发送⼼跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最⼤偏移量的距离(判断是否读⽼消息,产⽣读I/O),以及从服务器是否可读等因素建议下⼀次是从Master还是Slave拉取。
结合部署架构图,描述集群⼯作流程:
- 启动NameServer,NameServer起来后监听端⼝,等待Broker、Producer、Consumer连上来,相当于⼀个路由控制中⼼。
- Broker启动,跟所有的NameServer保持⻓连接,定时发送⼼跳包。⼼跳包中包含当前Broker信息(IP+端⼝等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时⾃动创建Topic
- Producer发送消息,启动时先跟NameServer集群中的其中⼀台建⽴⻓连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择⼀个队列,然后与队列所在的Broker建⽴⻓连接从⽽向Broker发消息
- Consumer跟Producer类似,跟其中⼀台NameServer建⽴⻓连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建⽴连接通道,开始消费消息。
三 消息种类
1 简单消息
简单消息分成三种:同步消息、异步消息、单向消息
1.1 同步消息
⽣产者发送消息后,必须等待broker返回信息后才继续之后的业务逻辑,在broker返回信息之前,⽣产者阻塞等待。
同步消息的应⽤场景:如重要通知消息、短信通知、短信营销系统等。
1.2 异步消息
⽣产者发完消息后,不需要等待broker的回信,可以直接执⾏之后的业务逻辑。⽣产者提供⼀个回调函数供broker调⽤,体现了异步的⽅式。
异步传输⼀般⽤于响应时间敏感的业务场景。
1.3 单向消息
⽣产者发送完消息后不需要等待任何回复,直接进⾏之后的业务逻辑,单向传输
⽤于需要中等可靠性的情况,例如⽇志收集。
2 顺序消息
顺序消息指的是消费者消费消息的顺序按照发送者发送消息的顺序执⾏。顺序消息分成两种:局部顺序消息和全局顺序消息。
2.1 局部顺序消息
局部消息指的是消费者消费某个topic的某个队列中的消息是顺序的。
消费者使⽤MessageListenerOrderly类做消息监听,实现局部顺序。
2.2 全局顺序消息
消费者消费全部消息都是顺序的,只能通过某个topic只有⼀个队列才能实现,
这种应⽤场景较少,且性能较差。
2.3 乱序消费
消费者消费消息不需要关注消息的顺序。消费者使⽤MessageListenerConcurrently类做消息监听。
3 ⼴播消息
⼴播是向主题(topic)的所有订阅者发送消息。订阅同⼀个topic的多个消费者,能全量收到⽣产者发送的所有消息。
4 延迟消息
延迟消息与普通消息的不同之处在于,它们要等到指定的时间之后才会被传递。
延迟等级:RocketMQ设计了18个延迟等级,分别是
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1
等级3对应的是10s。系统为这18个等级配置了18个topic,⽤于实现延迟队列的效果。
在商业版RocketMQ中,不仅可以设置延迟等级,还可以设置具体的延迟时间,但是在社区版RocketMQ中,只能设置延迟等级。
5 批量消息
批量发送消息提⾼了传递⼩消息的性能。
官⽅建议批量消息的总⼤⼩不应超过1m,实际不应超过4m。如果超过4m的批量消息需要进⾏分批处理,同时设置broker的配置参数为4m(在broker的配置⽂件中修改: maxMessageSize=4194304 )
使⽤限制:同⼀批次的消息应该具有相同的主题、相同的 waitStoreMsgOK、并且不⽀持延迟消息和事务消息。
6 过滤消息
在⼤多数情况下,标签是⼀种简单⽽有⽤的设计,可以⽤来选择您想要的消息。
消费者将收到包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是⼀条消息只能有⼀个标签,这可能不适⽤于复杂的场景。在这种情况下,您可以使⽤ SQL 表达式来过滤掉消息。
使⽤限制:只有推模式的消费者可以使⽤SQL过滤。拉模式是⽤不了的
使⽤SQL过滤
SQL 功能可以通过您在发送消息时输⼊的属性进⾏⼀些计算。在 RocketMQ 定义的语法下,可以实现⼀些有趣的逻辑。这是⼀个例⼦:
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
RocketMQ 只定义了⼀些基本的语法来⽀持这个特性,也可以轻松扩展它,语法如下:
- 数值⽐较,如
>
,>=
,<
,<=
,BETWEEN
,=
;- 字符⽐较,如
=
,<>
,IN
;IS NULL
或IS NOT NULL
;- 逻辑
AND
,OR
,NOT
;常量类型有:
- 数字,如 123、3.1415;
- 字符,如’abc’,必须⽤单引号;
NULL
,特殊常数;- 布尔值,
TRUE
或FALSE
;
7 事务消息
事务消息的定义:它可以被认为是⼀个两阶段的提交消息实现,以确保分布式系统的最终⼀致性。事务性消息确保本地事务的执⾏和消息的发送可以原⼦地执⾏。
事务消息有三种状态:
- TransactionStatus.CommitTransaction:提交事务,表示允许消费者消费该消息。
- TransactionStatus.RollbackTransaction:回滚事务,表示该消息将被删除,不允许消费。
- TransactionStatus.Unknown:中间状态,表示需要MQ回查才能确定状态
事务消息的实现流程
使⽤限制
- 事务性消息没有调度和批处理⽀持
- 为避免单条消息被检查次数过多,导致半队列消息堆积,我们默认将单条消息的检查次数限制为15次,但⽤户可以通过更改“transactionCheckMax”来更改此限制”参数在broker的配置中,如果⼀条消息的检查次数超过“transactionCheckMax”次,broker默认会丢弃这条消息,同时打印错误⽇志。⽤户可以通过重写“AbstractTransactionCheckListener”类来改变这种⾏为
- 事务消息将在⼀定时间后检查,该时间由代理配置中的参数“transactionTimeout”确定。并且⽤户也可以在发送事务消息时通过设置⽤户属性“CHECK_IMMUNITY_TIME_IN_SECONDS”来改变这个限制,这个参数优先于“transactionMsgTimeout”参数。
- ⼀个事务性消息可能会被检查或消费不⽌⼀次
- 提交给⽤户⽬标主题的消息reput可能会失败。⽬前,它取决于⽇志记录。⾼可⽤是由 RocketMQ 本身的⾼可⽤机制来保证的。如果要保证事务消息不丢失,保证事务完整性,推荐使⽤同步双写机制。
- 事务性消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型的消息不同,事务性消息允许向后查询。MQ 服务器通过其⽣产者 ID 查询客户端。