本文尝试从Apache RocketMQ的简介、主要组件及其作用、3种部署模式、Controller集群模式工作流程、最佳实践等方面对其进行详细分析。希望对您有所帮助!
一、Apache RocketMQ 简介
Apache RocketMQ 是一个开源的分布式消息中间件,由阿里巴巴集团开发并贡献给 Apache 软件基金会。它旨在提供高吞吐量、低延迟和高可靠性的消息传递和流处理服务。广泛应用于金融、互联网、物联网等领域,支持多种应用场景。
核心特性
1. 高性能
- 高吞吐量:支持每秒处理数百万条消息,适用于高并发的业务场景。
- 低延迟:具备低于毫秒级的延迟,确保消息能够快速传递。
2. 高可靠性
- 数据一致性:通过消息确认和重新传递机制,确保消息的可靠传递。
- 消息持久化:支持消息持久化,保证在系统故障时消息不丢失。
3. 可扩展性
- 水平扩展:支持通过增加节点的方式水平扩展,满足业务增长需求。
- 弹性伸缩:根据业务负载动态调整资源,提升资源利用效率。
4. 灵活性
- 多种消息模型:支持点对点、发布/订阅、流处理等多种消息传递模型,满足不同应用需求。
- 丰富的接口:提供Java、C++、Python等多种语言的客户端接口,便于集成和使用。
关键功能和支持特性
- Producer(生产者):负责发送消息到 RocketMQ 集群。生产者可以指定消息的主题(Topic)和标签(Tag),便于消息分类和筛选。
- Consumer(消费者):负责从 RocketMQ 集群接收和处理消息。消费者分为集群消费和广播消费两种模式。
- NameServer(命名服务器):提供轻量级的路由服务,存储生产者和消费者与 Broker 之间的路由信息。支持动态扩展,确保系统的高可用性。
- Broker(代理服务器):负责存储和转发消息。Broker 将消息存储在磁盘上,并根据需要将消息传递给消费者。支持主从架构,提升系统的容错能力和数据可靠性。
- 事务消息:支持分布式事务消息,确保在分布式系统中实现最终一致性。事务消息可以确保消息在事务成功时被发送,事务失败时被回滚。
- 顺序消息:支持顺序消息传递,确保消息按照发送顺序被消费。
- 定时消息和延时消息:支持消息定时发送和延时发送,满足特定的业务需求。
- 批量消息:支持批量消息发送,提高消息传递效率。
- 消息过滤:支持基于标签的消息过滤,消费者可以根据标签选择性地接收消息。
- 消息重试:支持消息重试机制,确保消息在消费失败时可以重新消费。
- 死信队列(DLQ):支持死信队列,确保处理失败的消息不会丢失。
- 流量控制:支持流量控制,防止系统过载。
- 分布式架构:支持多数据中心部署和跨地域部署,提升系统的高可用性和容灾能力。
- 监控和管理:提供完善的监控和管理工具,支持消息统计、系统监控和运维管理。
- 安全性:支持基于权限的访问控制,确保消息传递的安全性。
使用场景
- 异步通信:在分布式系统中实现异步消息传递,解耦系统组件,提升系统的响应速度和可靠性。
- 事件驱动架构:构建基于事件的系统,通过消息队列实现事件驱动的业务逻辑,提升系统的灵活性和可维护性。
- 日志收集:集中收集和处理分布式系统中的日志信息,实现统一的日志管理和分析。
- 流处理:实时处理和分析数据流,支持大数据分析、实时监控等应用场景。
优势
- 高性能、高吞吐量:适用于大规模、高并发的业务场景。
- 强一致性:通过严格的消息确认机制,确保数据一致性。
- 灵活的扩展性:支持弹性伸缩,能够适应业务的快速变化。
- 多语言支持:提供多种语言的客户端接口,便于开发和集成。
Apache RocketMQ 以其高性能、高可靠性和灵活性,成为众多企业实现分布式消息传递和流处理的首选解决方案。
二、RocketMQ 主要组件及其作用
Apache RocketMQ 主要组件及其作用如下:
1. NameServer(命名服务器)
作用:提供轻量级的路由服务,存储生产者和消费者与 Broker 之间的路由信息。
- 路由管理:维护 Broker 的地址列表和路由信息,供生产者和消费者查询。
- 动态注册:Broker 启动时向 NameServer 注册,定期发送心跳以保持连接。
- 高可用性:可以部署多个 NameServer,实现高可用。
2. Broker(代理服务器)
作用:负责存储和转发消息,是 RocketMQ 的核心组件。
- 消息存储:持久化存储消息,确保数据的可靠性和持久性。
- 消息转发:将消息从生产者传递到消费者。
- 主从架构:支持主从模式,主 Broker 负责消息处理,从 Broker 备份数据,提供容错能力。
- 消息索引:通过建立消息索引,提高消息检索效率。
3. Producer(生产者)
作用:负责发送消息到 RocketMQ 集群。
- 消息生成:创建并发送消息到指定的主题(Topic)。
- 异步发送:支持异步消息发送,提高发送性能。
- 路由获取:通过 NameServer 获取 Broker 路由信息,将消息发送到合适的 Broker。
4. Consumer(消费者)
作用:负责从 RocketMQ 集群接收和处理消息。
- 消息消费:从指定的主题(Topic)接收并处理消息。
- 消费模式:支持集群消费和广播消费两种模式。
- 集群消费:同一消费组内的多个消费者负载均衡地处理消息,每条消息只会被一个消费者处理一次。
- 广播消费:每个消费者都会处理所有的消息,每条消息会被所有消费者处理一次。
- 消息过滤:可以根据标签(Tag)进行消息过滤,选择性接收和处理消息。
5. Controller(控制器)
作用:管理主从 Broker 的自动故障转移和高可用性。
- 事务管理:负责分布式事务的管理,确保消息在事务成功时被发送,事务失败时被回滚。
- 状态监控:监控 Broker 的状态,在主 Broker 发生故障时自动将从 Broker 提升为新的主 Broker。
- 协调主从:在 Broker 间协调主从关系,确保系统的高可用性和数据一致性。
6. RocketMQ Console(管理控制台)
作用:提供可视化的运维管理工具。
- 集群监控:实时监控 RocketMQ 集群的运行状态,包括 Broker、Topic、Consumer 等信息。
- 消息查询:支持消息的精确查询和模糊查询,方便运维人员排查问题。
- 配置管理:提供 Topic、Consumer Group、Broker 配置的管理功能。
7. Store(消息存储)
作用:实现消息的持久化存储。
- CommitLog:存储所有消息的物理文件,按顺序写入,支持快速写入操作。
- ConsumeQueue:消息的逻辑队列,记录消息在 CommitLog 中的位置,便于消费者快速检索。
- IndexFile:消息索引文件,通过消息的属性(如消息键)建立索引,支持快速查询。
8. Client(客户端)
作用:包含 Producer 和 Consumer 的客户端 SDK。
- API 提供:提供 Java、C++、Python 等多种语言的客户端接口,便于集成和使用。
- 消息操作:支持消息的发送、接收、过滤、批处理等操作。
总结
最新版本的 Apache RocketMQ 通过这些组件的协同工作,提供了高性能、高可靠性和高可扩展性的分布式消息传递和流处理服务。各个组件分工明确,确保系统能够高效、稳定地运行,并满足各种复杂业务场景的需求。
三、RocketMQ 部署方式介绍
Apache RocketMQ 5.0 版本完成基本消息收发,包括 NameServer、Broker、Proxy 组件。 在 5.0 版本中 Proxy 和 Broker 根据实际诉求可以分为 Local 模式和 Cluster 模式,一般情况下如果没有特殊需求,或者遵循从早期版本平滑升级的思路,可以选用Local模式。
Local 模式
- 在 Local 模式下,Broker 和 Proxy 是同进程部署,只是在原有 Broker 的配置基础上新增 Proxy 的简易配置就可以运行。
Cluster 模式
- 在 Cluster 模式下,Broker 和 Proxy 分别部署,即在原有的集群基础上,额外再部署 Proxy 即可。
主备自动切换模式
- 主备自动切换模式部署方式下,主要增加支持自动主从切换的 Controller 组件,它可以独立部署也可以内嵌在 NameServer 中。
下文分别介绍三种部署方式:
四、Local模式部署
由于 Local 模式下 Proxy 和 Broker 是同进程部署,Proxy本身无状态,因此主要的集群配置仍然以 Broker 为基础进行即可。
启动 NameServer
NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同,如下:
### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
启动Broker+Proxy
单组节点单副本模式
警告
这种方式风险较大,因为 Broker 只有一个节点,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用, 可以用于本地测试。
启动 Broker+Proxy
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
### 验证Broker 是否启动成功,例如Broker的IP为:192.168.1.2,且名称为broker-a
$ tail -f ~/logs/rocketmqlogs/broker_default.log
The broker[xxx, 192.169.1.2:10911] boot success...
多组节点(集群)单副本模式
一个集群内全部部署 Master 角色,不部署Slave 副本,例如2个Master或者3个Master,这种模式的优缺点如下:
-
优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
-
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
启动Broker+Proxy集群
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties --enable-proxy &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties --enable-proxy &
...
备注
如上启动命令是在单个NameServer情况下使用的。对于多个NameServer的集群,Broker启动命令中-n
后面的地址列表用分号隔开即可,例如 192.168.1.1:9876;192.161.2:9876
。
多节点(集群)多副本模式-异步复制
每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
-
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
-
缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
启动Broker+Proxy集群
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties --enable-proxy &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties --enable-proxy &
### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties --enable-proxy &
### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties --enable-proxy &
多节点(集群)多副本模式-同步双写
每个Master配置一个Slave,有多对 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
-
优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
-
缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
启动 Broker+Proxy 集群
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties --enable-proxy &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties --enable-proxy &
### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties --enable-proxy &
### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties --enable-proxy &
提示
以上 Broker 与 Slave 配对是通过指定相同的 BrokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave 的 BrokerId 必须是大于 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。$ROCKETMQ_HOME指的RocketMQ安装目录,需要用户自己设置此环境变量。
5.0 HA新模式
提供更具灵活性的HA机制,让用户更好的平衡成本、服务可用性、数据可靠性,同时支持业务消息和流存储的场景。详见
五、Cluster模式部署
在 Cluster 模式下,Broker 与 Proxy分别部署,我可以在 NameServer和 Broker都启动完成之后再部署 Proxy。
在 Cluster模式下,一个 Proxy集群和 Broker集群为一一对应的关系,可以在 Proxy的配置文件 rmq-proxy.json
中使用 rocketMQClusterName
进行配置
启动 NameServer
### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
单组节点单副本模式
警告
这种方式风险较大,因为 Broker 只有一个节点,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用, 可以用于本地测试。
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 &
多组节点(集群)单副本模式
一个集群内全部部署 Master 角色,不部署Slave 副本,例如2个Master或者3个Master,这种模式的优缺点如下:
-
优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
-
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &
...
如上启动命令是在单个NameServer情况下使用的。对于多个NameServer的集群,Broker启动命令中-n
后面的地址列表用分号隔开即可,例如 192.168.1.1:9876;192.161.2:9876
。
多节点(集群)多副本模式-异步复制
每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
-
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
-
缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &
多节点(集群)多副本模式-同步双写
每个Master配置一个Slave,有多对 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
-
优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
-
缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &
提示
以上 Broker 与 Slave 配对是通过指定相同的 BrokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave 的 BrokerId 必须是大于 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。$ROCKETMQ_HOME指的RocketMQ安装目录,需要用户自己设置此环境变量。
5.0 HA新模式
提供更具灵活性的HA机制,让用户更好的平衡成本、服务可用性、数据可靠性,同时支持业务消息和流存储的场景。详见
启动 Proxy
可以在多台机器启动多个Proxy
### 在机器A,启动第一个Proxy,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqproxy -n 192.168.1.1:9876 &
### 在机器B,启动第二个Proxy,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqproxy -n 192.168.1.1:9876 &
### 在机器C,启动第三个Proxy,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqproxy -n 192.168.1.1:9876 &
若需要指定配置文件,可以使用 -pc
或者 --proxyConfigPath
进行指定
### 自定义配置文件
$ nohup sh bin/mqproxy -n 192.168.1.1:9876 -pc /path/to/proxyConfig.json &
六、主备自动切换模式部署
如何部署支持自动主从切换的 RocketMQ 集群? 其架构如上图所示,主要增加支持自动主从切换的 Controller 组件,其可以独立部署也可以内嵌在 NameServer 中。
Controller 部署
Controller 组件提供选主能力,若需要保证 Controller 具备容错能力,Controller 部署需要三副本及以上(遵循 Raft 的多数派协议)。
注意
Controller 若只部署单副本也能完成 Broker Failover,但若该单点 Controller 故障,会影响切换能力,但不会影响存量集群的正常收发。
Controller 部署有两种方式。一种是嵌入于 NameServer 进行部署,可以通过配置 enableControllerInNamesrv 打开(可以选择性打开,并不强制要求每一台 NameServer 都打开),在该模式下,NameServer 本身能力仍然是无状态的,也就是内嵌模式下若 NameServer 挂掉多数派,只影响切换能力,不影响原来路由获取等功能。另一种是独立部署,需要单独部署 Controller 组件。
Controller 嵌入 NameServer 部署
嵌入 NameServer 部署时只需要在 NameServer 的配置文件中设置 enableControllerInNamesrv=true,并填上 Controller 的配置即可。
enableControllerInNamesrv = true
controllerDLegerGroup = group1
controllerDLegerPeers = n0-127.0.0.1:9877;n1-127.0.0.1:9878;n2-127.0.0.1:9879
controllerDLegerSelfId = n0
controllerStorePath = /home/admin/DledgerController
enableElectUncleanMaster = false
notifyBrokerRoleChanged = true
参数解释:
- enableControllerInNamesrv:Nameserver 中是否开启 controller,默认 false。
- controllerDLegerGroup:DLedger Raft Group 的名字,同一个 DLedger Raft Group 保持一致即可。
- controllerDLegerPeers:DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致。
- controllerDLegerSelfId:节点 id,必须属于 controllerDLegerPeers 中的一个;同 Group 内各个节点要唯一。
- controllerStorePath:controller 日志存储位置。controller 是有状态的,controller 重启或宕机需要依靠日志来恢复数据,该目录非常重要,不可以轻易删除。
- enableElectUncleanMaster:是否可以从 SyncStateSet 以外选举 Master,若为 true,可能会选取数据落后的副本作为 Master 而丢失消息,默认为 false。
- notifyBrokerRoleChanged:当 Broker 副本组上角色发生变化时是否主动通知,默认为 true。
参数设置完成后,指定配置文件启动 Nameserver 即可。
$ nohup sh bin/mqnamesrv -c namesrv.conf &
Controller 独立部署
独立部署执行以下脚本即可
$ nohup sh bin/mqcontroller -c controller.conf &
mqcontroller 脚本在源码包 distribution/bin/mqcontroller,配置参数与内嵌模式相同。
注意
独立部署Controller后,仍然需要单独部署NameServer提供路由发现能力
Broker 部署
Broker 启动方法与之前相同,增加以下参数
- enableControllerMode:Broker controller 模式的总开关,只有该值为 true,自动主从切换模式才会打开。默认为 false。
- controllerAddr:controller 的地址,多个 controller 中间用分号隔开。例如
controllerAddr = 127.0.0.1:9877;127.0.0.1:9878;127.0.0.1:9879
- syncBrokerMetadataPeriod:向 controller 同步 Broker 副本信息的时间间隔。默认 5000(5s)。
- checkSyncStateSetPeriod:检查 SyncStateSet 的时间间隔,检查 SyncStateSet 可能会 shrink SyncState。默认5000(5s)。
- syncControllerMetadataPeriod:同步 controller 元数据的时间间隔,主要是获取 active controller 的地址。默认10000(10s)。
- haMaxTimeSlaveNotCatchup:表示 Slave 没有跟上 Master 的最大时间间隔,若在 SyncStateSet 中的 slave 超过该时间间隔会将其从 SyncStateSet 移除。默认为 15000(15s)。
- storePathEpochFile:存储 epoch 文件的位置。epoch 文件非常重要,不可以随意删除。默认在 store 目录下。
- allAckInSyncStateSet:若该值为 true,则一条消息需要复制到 SyncStateSet 中的每一个副本才会向客户端返回成功,可以保证消息不丢失。默认为 false。
- syncFromLastFile:若 slave 是空盘启动,是否从最后一个文件进行复制。默认为 false。
- asyncLearner:若该值为 true,则该副本不会进入 SyncStateSet,也就是不会被选举成 Master,而是一直作为一个 learner 副本进行异步复制。默认为false。
- inSyncReplicas:需保持同步的副本组数量,默认为1,allAckInSyncStateSet=true 时该参数无效。
- minInSyncReplicas:最小需保持同步的副本组数量,若 SyncStateSet 中副本个数小于 minInSyncReplicas 则 putMessage 直接返回 PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH,默认为1。
在Controller模式下,Broker配置必须设置 enableControllerMode=true,并填写 controllerAddr,并以下面命令启动:
$ nohup sh bin/mqbroker -c broker.conf &
注意
自动主备切换模式下Broker无需指定brokerId和brokerRole,其由Controller组件进行分配
兼容性
该模式未对任何客户端层面 API 进行新增或修改,不存在客户端的兼容性问题。
Nameserver 本身能力未做任何修改,Nameserver 不存在兼容性问题。如开启 enableControllerInNamesrv 且 controller 参数配置正确,则开启 controller 功能。
Broker若设置 enableControllerMode=false,则仍然以之前方式运行。若设置 enableControllerMode=true,则需要部署 controller 且参数配置正确才能正常运行。
具体行为如下表所示:
旧版 Nameserver | 旧版 Nameserver+独立部署 Controller | 新版 Nameserver 开启 controller功能 | 新版 Nameserver 关闭 controller 功能 | |
---|---|---|---|---|
旧版 Broker | 正常运行,无法切换 | 正常运行,无法切换 | 正常运行,无法切换 | 正常运行,无法切换 |
新版 Broker 开启 Controller 模式 | 无法正常上线 | 正常运行,可以切换 | 正常运行,可以切换 | 无法正常上线 |
新版 Broker 不开启 Controller 模式 | 正常运行,无法切换 | 正常运行,无法切换 | 正常运行,无法切换 | 正常运行,无法切换 |
升级注意事项
从上述兼容性表述可以看出,NameServer 正常升级即可,无兼容性问题。在不想升级 Nameserver 情况,可以独立部署 Controller 组件来获得切换能力。
针对 Broker 升级,分为两种情况:
(1)Master-Slave 部署升级成 Controller 切换架构
可以带数据进行原地升级,对于每组 Broker,停机主、备 Broker,保证主、备的 CommitLog 对齐(可以在升级前禁写该组 Broker 一段时间,或则通过拷贝方式保证一致),升级包后重新启动即可。
注意
若主备 CommitLog 不对齐,需要保证主上线以后再上线备,否则可能会因为数据截断而丢失消息。
(2)原 DLedger 模式升级到 Controller 切换架构
由于原 DLedger 模式消息数据格式与 Master-Slave 下数据格式存在区别,不提供带数据原地升级的路径。在部署多组 Broker 的情况下,可以禁写某一组 Broker 一段时间(只要确认存量消息被全部消费即可,比如根据消息的保存时间来决定),然后清空 store 目录下除 config/topics.json、subscriptionGroup.json 下(保留 topic 和订阅关系的元数据)的其他文件后,进行空盘升级。
七、Rocketmq Controller集群模式的工作流程
在 RocketMQ 的集群模式下,Controller 负责管理多个 Broker 的元数据和协调它们的操作。下面是集群模式下的工作流程和 Mermaind 图示例。
工作流程
-
启动 Controller
- 启动多个 Controller 节点,形成一个高可用的 Controller 集群。
- Controller 节点通过 Raft 协议选举出一个 Leader,其他节点为 Follower。
-
启动 Broker
- Broker 启动时,会向 Controller 集群注册,获取元数据和配置信息。
- Broker 定期向 Controller 报告其状态和心跳信息。
-
元数据管理
- Controller 负责维护 Broker 的元数据,包括 Broker 地址、Topic 分配信息等。
- Controller 处理 Broker 的注册、注销和状态变更,并将这些变更通知给其他 Broker。
-
协调操作
- 当有新的 Topic 或者 Queue 需要创建时,客户端向 Controller 发送请求。
- Controller 负责选择合适的 Broker 并执行创建操作。
- Controller 负责处理 Broker 故障,重新分配 Topic 和 Queue。
-
数据同步
- 主 Broker 负责处理写请求,从 Broker 负责处理读请求。
- 主从 Broker 之间通过同步机制保持数据一致性。
Mermaind 图
以下是表示 RocketMQ 集群模式下 Controller 工作流程的 Mermaind 图示例:
这张图展示了 Controller 和 Broker 之间的交互流程,Controller 负责管理元数据、协调 Broker 操作,并在 Broker 之间分配任务和同步数据。
八、RocketMQ最佳实践
1. 集群部署
主备部署
- 多主多备:在生产环境中,推荐使用多主多备的部署方式。多个主节点(Master)和备份节点(Slave)可以确保在主节点发生故障时,备份节点能够快速接管,保证系统的高可用性。
- 跨机房部署:将 Broker 分布在不同的机房,以防止单个机房故障导致服务不可用。
NameServer 高可用
- 多 NameServer 部署:至少部署三个 NameServer 节点,以确保高可用性。NameServer 是无状态的,客户端可以随机选择一个 NameServer 进行连接。
- 定期检查:定期检查 NameServer 的状态,确保其处于正常工作状态。
Broker 分片
- 分片部署:将 Broker 分成多个分片,分布在不同的物理服务器上。这样即使某个 Broker 出现故障,也不会影响到整个系统的运行。
- 合理分配 Topic:将 Topic 分配到不同的分片中,避免单个分片承载过多的消息流量。
2. 消息存储
消息持久化
- 开启持久化:确保消息的持久化功能开启,防止因 Broker 异常重启导致的消息丢失。可以在 Broker 配置文件中设置
flushDiskType=SYNC_FLUSH
来开启同步刷盘。
存储路径
- 选择高性能磁盘:将消息存储路径设置在性能较好的 SSD 磁盘上,提升存储和读取效率。
- 分开存储日志和数据:将 CommitLog 和 ConsumerQueue 存储在不同的磁盘上,减少磁盘 I/O 的竞争。
清理过期消息
- 设置消息过期时间:根据业务需求,在 Broker 配置文件中设置消息过期时间(默认是72小时)。过期的消息会被定期清理,释放存储空间。
- 手动清理:在必要时,可以手动清理过期消息,确保存储空间的充足。
3. 性能优化
批量发送
- 批量发送配置:在 Producer 端配置批量发送,减少网络 I/O 次数。例如,可以通过
sendBatchMessage
方法发送批量消息,提高发送效率。 - 批量大小控制:合理设置批量消息的大小,避免单次发送的数据量过大,导致网络拥堵。
消费端并发
- 并发消费:在 Consumer 端设置合理的并发线程数,以提高消息的消费能力。可以通过
consumeThreadMin
和consumeThreadMax
参数来调整消费线程数。 - 顺序消费:对于需要保证顺序的消息,使用顺序消费模式(Orderly),避免消息乱序。
异步发送与回调
- 异步发送:使用异步发送方式,通过
sendAsync
方法发送消息,避免同步等待,提高发送性能。 - 回调处理:结合回调函数处理发送结果,及时处理发送失败的情况。
4. 消息重试和补偿
消息重试机制
- 消费重试:利用 RocketMQ 内置的重试机制,确保消息消费成功。可以通过
maxReconsumeTimes
参数设置最大重试次数。 - 重试间隔:合理设置重试间隔,避免频繁重试导致的资源浪费。
业务补偿机制
- 幂等性设计:在关键业务场景中,设计幂等性操作,确保多次处理同一消息不会产生副作用。
- 补偿事务:对于分布式事务场景,设计补偿机制,确保事务的一致性。
5. 监控和报警
系统监控
- RocketMQ Console:使用 RocketMQ Console 监控系统的运行状态,包括 Broker、Producer 和 Consumer 的状态。
- Prometheus 和 Grafana:将 RocketMQ 的监控数据导入 Prometheus,并使用 Grafana 可视化监控数据。
日志记录
- 详细日志:启用详细的日志记录功能,记录消息的发送、接收和消费的详细信息。可以通过调整
logLevel
参数设置日志级别。 - 日志轮转:定期轮转日志文件,避免日志文件过大,影响系统性能。
报警设置
- 阈值报警:设置合理的报警阈值,当系统指标(如消息堆积、发送失败率等)超过阈值时,触发报警。
- 邮件和短信通知:配置报警通知,通过邮件和短信及时通知运维人员,进行处理。
6. 安全性
权限控制
- ACL 配置:启用 RocketMQ 的访问控制列表(ACL)功能,对 Producer 和 Consumer 进行权限管理,防止非法访问。可以在配置文件中设置 ACL 规则。
- 用户认证:对接入 RocketMQ 的用户进行认证,确保只有合法用户才能访问消息系统。
数据加密
- 传输加密:对消息传输过程进行加密,使用 SSL/TLS 协议保护数据的安全。
- 存储加密:对敏感数据进行加密存储,确保数据在磁盘上的安全。
网络隔离
- 内网部署:将 RocketMQ 部署在内网环境中,通过防火墙和 VPN 等手段进行网络隔离,防止外部攻击。
- IP 白名单:配置 IP 白名单,只允许特定 IP 地址访问 RocketMQ 服务。
7. 灾备和容灾
跨地域部署
- 多地域部署:在多个地域部署 Broker 集群,确保在一个地域发生故障时,能够自动切换到其他地域继续提供服务。
- 数据同步:利用 RocketMQ 的数据同步功能,在不同地域之间进行数据同步,确保数据的一致性。
数据备份
- 定期备份:定期进行数据备份,确保在发生数据丢失时,能够快速恢复数据。可以通过定期备份 CommitLog 和 ConsumerQueue 数据来实现。
- 异地备份:将备份数据存储在异地,防止同一地点的灾难导致备份数据丢失。
总结
通过遵循以上详细的最佳实践,可以有效提升 RocketMQ 的稳定性、性能和安全性,确保消息系统能够高效、可靠地运行。结合具体的业务需求和系统环境,合理配置和优化 RocketMQ,可以实现最佳的使用效果。
九、Go语言实践RocketMQ样例
在 Go 语言中使用 RocketMQ,你需要使用 RocketMQ 的 Go 客户端库来进行消息的生产和消费。下面是一个简单的实践示例,包括如何配置 RocketMQ Go 客户端、发送消息和消费消息的步骤。
1. 安装 RocketMQ Go 客户端库
首先,你需要安装 RocketMQ 的 Go 客户端库。可以通过 Go 的包管理工具 go get
来安装:
go get github.com/apache/rocketmq-client-go/v2
2. 发送消息
以下是一个简单的 Go 语言程序,用于向 RocketMQ 发送消息:
package main
import (
"fmt"
"log"
"github.com/apache/rocketmq-client-go/v2/producer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
// 创建一个 RocketMQ 生产者实例
p, err := producer.NewProducer(producer.WithNameServer([]string{"localhost:9876"}))
if err != nil {
log.Fatalf("create producer failed: %s", err.Error())
}
// 启动生产者
err = p.Start()
if err != nil {
log.Fatalf("start producer failed: %s", err.Error())
}
defer p.Shutdown()
// 创建消息
msg := &primitive.Message{
Topic: "TestTopic",
Body: []byte("Hello RocketMQ!"),
}
// 发送消息
res, err := p.SendSync(context.Background(), msg)
if err != nil {
log.Fatalf("send message failed: %s", err.Error())
}
fmt.Printf("Send message success: %s\n", res.String())
}
3. 接收消息
以下是一个简单的 Go 语言程序,用于从 RocketMQ 消费消息:
package main
import (
"context"
"fmt"
"log"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
// 创建一个 RocketMQ 消费者实例
c, err := consumer.NewPushConsumer(
consumer.WithNameServer([]string{"localhost:9876"}),
consumer.WithGroupName("TestConsumerGroup"),
)
if err != nil {
log.Fatalf("create consumer failed: %s", err.Error())
}
// 定义消息处理函数
c.RegisterMessageListener(consumer.MessageListenerFunc(func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for _, msg := range msgs {
fmt.Printf("Received message: %s\n", string(msg.Body))
}
return consumer.ConsumeSuccess, nil
}))
// 订阅主题
err = c.Subscribe("TestTopic", consumer.MessageSelector{}, nil)
if err != nil {
log.Fatalf("subscribe topic failed: %s", err.Error())
}
// 启动消费者
err = c.Start()
if err != nil {
log.Fatalf("start consumer failed: %s", err.Error())
}
defer c.Shutdown()
// 阻塞主线程,保持消费者运行
select {}
}
4. 运行示例
-
确保你已经启动了 RocketMQ 服务,并且
localhost:9876
是你的 NameServer 地址。 -
编译并运行发送消息程序:
go run producer.go
-
编译并运行接收消息程序:
go run consumer.go
-
你应该能看到发送的消息被消费者接收到并输出。
注意事项
- NameServer 地址:确保你在创建生产者和消费者时指定的 NameServer 地址是正确的,并且 RocketMQ 服务正在运行。
- 主题和消费者组:确保在 RocketMQ 中已经创建了相应的主题(Topic),并且消费者组(Consumer Group)设置正确。
- 错误处理:在生产环境中,你应该实现更详细的错误处理和日志记录,以便调试和监控。
以上就是如何在 Go 语言中实践 RocketMQ 的基本步骤。
十、RocketMQ历史演进
RocketMQ 是一个高性能、高可用的分布式消息中间件,最初由阿里巴巴开源,并在社区中逐渐发展和演进。以下是 RocketMQ 的主要历史演进过程:
1. 初始版本 (2012-2015)
-
2012年:RocketMQ 的前身被阿里巴巴开发为一个内部消息中间件,用于支持大规模的电商平台和业务系统。
-
2013年:阿里巴巴开始开源 RocketMQ,最初以阿里巴巴内部的需求和经验为基础开发。早期版本主要用于阿里巴巴内部系统,支持基本的消息队列功能。
-
2015年:RocketMQ 被正式开源,并成为 Apache 顶级项目。Apache RocketMQ 提供了更加成熟的消息中间件功能,满足了高吞吐量、低延迟和高可靠性的需求。
2. 成为 Apache 顶级项目 (2016-2019)
-
2016年:RocketMQ 成为 Apache 顶级项目,标志着其在开源社区中的认可和稳定性。Apache RocketMQ 提供了生产环境所需的多种特性,包括事务消息、消息过滤、消息顺序消费等。
-
2017年:引入了支持多种语言的客户端 SDK,包括 Java、C++、Python 和 Go 等。
-
2018年:引入了新的特性和改进,包括改进的消息存储机制、增强的高可用性和容错能力、以及更多的管理和监控工具。
-
2019年:发布了 RocketMQ 4.x 版本,引入了更高的性能优化和新特性,如消息轨迹(Message Track)、增强的客户端性能等。
3. 云原生和微服务支持 (2020-2023)
-
2020年:RocketMQ 5.0 版本发布,引入了云原生支持和对微服务架构的支持,包括对 Kubernetes 的支持、集成了 Prometheus 监控、增强的分布式事务能力等。RocketMQ 5.0 还引入了新的存储引擎和更加灵活的消息路由策略。
-
2021年:进一步改进了 RocketMQ 的高可用性、可扩展性和性能,提供了更多的集群管理工具和性能监控功能。
-
2022年:引入了 RocketMQ 5.1 版本,改进了消息顺序消费的可靠性和性能,同时增强了对云平台的支持,优化了与大数据平台的集成能力。
-
2023年:发布了 RocketMQ 5.2 版本,增加了新的功能和改进,包括更好的分布式事务支持、消息历史追踪功能和更高效的消息存储机制。
4. 未来发展
- 未来计划:RocketMQ 继续关注云原生架构的演进、微服务架构的支持以及与其他大数据和流处理平台的集成。未来的版本可能会引入更多对多租户和弹性伸缩的支持,以及对新的消息通信协议的支持。
总结
RocketMQ 从一个内部使用的消息中间件发展成为一个成熟的开源项目,经过不断的改进和演进,逐步成为 Apache 顶级项目,并支持现代的云原生和微服务架构。通过持续的开发和社区贡献,RocketMQ 不断适应新的技术需求和业务场景。
完。
希望对您有所帮助!关注锅总,及时获得更多花里胡哨的运维实用操作!
十一、一个秘密
锅总个人博客
https://gentlewok.blog.csdn.net/
锅总微信公众号