全面解剖 消息中间件 RocketMQ-(2)
一、RocketMQ – RocketMQ 各角色介绍
1、RocketMQ 各角色介绍
- Producer : 消息的发送者; 举例:发信者。
- Consumer : 消息接收者; 举例:收信者。
- Broker : 暂存和传输消息; 举例:邮局。
- NameServer : 管理 Broker; 举例:各个邮局的管理机构
- Topic : 区分消息的种类; 一个发送者可以发送消息给一个或者多个 Topic; 一个消息的接收者可以订阅一个或者多个 Topic 消息。
- Message Queue : 相当于是 Topic 的分区;用于并行发送和接收消息。
2、RocketMQ 各角色 示例图:
二、RocketMQ – RocketMQ 集群特点
1、NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
2、Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master。Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 Brokerld 来定义,Brokerld 为 0 表示 Master,非 0 表示 Slave。
3、Master 也可以部署多个。每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
4、Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。
5、Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
三、RocketMQ – RocketMQ 各种集群模式介绍
1、单 Master 模式
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
2、多 Master 模式
一个集群无 Slave,全是 Master,例如2个 Master 或者3个 Master,这种模式的优缺点如下:
-
优点 : 配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
-
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
3、多 Master 多 Slave 模式(异步)
每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
-
优点 :
即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样; -
缺点 :
Master宕机,磁盘损坏情况下会丢失少量消息。
4、多 Master 多 Slave 模式(同步)
每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优
缺点如下:
-
优点 :
数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高; -
缺点 :
性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
四、RocketMQ – 双主双从(2m-2s)集群介绍和工作流程说明
1、RocketMQ – 双主双从(2m-2s)集群介绍总体架构:消息高可用采用 2m-2s(同步双写)方式。
2、RocketMQ – 双主双从(2m-2s)集群工作流程说明
-
1)启动 NameServer,NameServer 起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。
-
2)Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker信息(IP+端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
-
3)收发消息前,先创建 Topic,创建Topic时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
-
4) Producer 发送消息,启动时先跟 NameSErver 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker 发消息。
-
5) Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker上,然后直接跟 Broker 建立连接通道,开始消费消息。
五、RocketMQ – 集群搭建1
1、RocketMQ–集群搭建–服务器环境
序号 | IP | 角色 | 架构模式 |
---|---|---|---|
1 | 192.168.25.135 | nameserver、brokerserver | Master1、Slave2 |
2 | 192.168.25.138 | nameserver、brokerserver | Master2、Slave1 |
2、RocketMQ–集群搭建–Host 添加信息
(在两个虚拟机上都要添加配置:192.168.25.135 和 192.168.25.138)
vim /etc/hosts
配置如下:
# nameserver
192.168.25.135 rocketmq-nameserver1
192.168.25.138 rocketmq-nameserver2
# broker
192.168.25.135 rocketmq-master1
192.168.25.135 rocketmq-slave2
192.168.25.138 rocketmq-master2
192.168.25.138 rocketmq-slave1
# 配置完成后,重启网卡
systemctl restart network
3、RocketMQ–集群搭建–防火墙配置
(在两个虚拟机上都要添加配置:192.168.25.135 和 192.168.25.138)
1)宿主机需要远程访问虚拟机的 rocketmq 服务和 web 服务,需要开放相关端口号,简单粗暴的方式是直接关闭防火墙。
# 关闭防火墙
systemctl stop firewalld.service
# 查看防火墙状态
firewall -cmd --state
# 禁止 firewall 开机启动
sysemctl disable firewalld.service
2)或者为了安全,只开放特定的端口号,RocketMQ 默认使用3个端口:9876,10911,11011.如果防火墙没有关闭,那么防火墙就必须开放这几个端口。
- nameserver 默认使用 9876 端口
- master 默认使用 10911 端口
- slave 默认使用 11011 端口
执行以下命令
# 开放 nameserver 默认使用 9876 端口
firewall -cmd --remove-port=9876/tcp --permanent
# 开放 master 默认使用 10911 端口
firewall -cmd --remove-port=10911/tcp --permanent
# 开放 slave 默认使用 11011 端口
firewall -cmd --remove-port=11011/tcp --permanent
# 重启防火墙
firewall -cmd --reload
4、RocketMQ–集群搭建–环境变量配置
(在两个虚拟机上都要添加配置:192.168.25.135 和 192.168.25.138)
vim /etc/profile
# 在profile文件的末尾加入如下命令
#set rocketmq
ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.5.1-bin-release
PATH=$PATH: $ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
# 输入: wq! 保存并退出,并使得配置立刻生效:
source /etc/profile
5、RocketMQ–集群搭建–创建消息存储路径
(在两个虚拟机上都要添加配置:192.168.25.135 和 192.168.25.138)
mkdir /usr/loca1/rocketmq/store
mkdir /usr/loca1/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index
六、RocketMQ – 集群搭建2
1、RocketMQ–集群搭建2–broker 配置文件
修改 服务器 192.168.25.135 上的 broker 配置文件 配置 master1
vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-a.properties
# 修改配置如下
# 所属集群名字
brokerclusterName=rocketmq-cluster
# broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0 表示 Master, >0 表示 slave
brokerid=0
# nameserver 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autocreatesubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deletewhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue 每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker 的角色
#- ASYNC MASTER异步复制Master
#- SYNC MASTER同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
# 刷盘方式
#- ASYNC FLUSH 异步刷盘
#- SYNC FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
# checkTransactionMessageEnable=false
# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
2、RocketMQ–集群搭建2–broker 配置文件
修改 服务器 192.168.25.135 上的 broker 配置文件 配置 slave2
vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-b-s.properties
# 修改配置如下
# 所属集群名字
brokerclusterName=rocketmq-cluster
# broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
# 0 表示 Master, >0 表示 slave
brokerid=1
# nameserver 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autocreatesubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=11011
# 删除文件时间点,默认凌晨4点
deletewhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue 每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker 的角色
#- ASYNC MASTER异步复制Master
#- SYNC MASTER同步双写Master
#- SLAVE
brokerRole=SLAVE
# 刷盘方式
#- ASYNC FLUSH 异步刷盘
#- SYNC FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# checkTransactionMessageEnable=false
# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
3、RocketMQ–集群搭建2–broker 配置文件
修改 服务器 192.168.25.138 上的 broker 配置文件 配置 master2
vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-b.properties
# 修改配置如下
# 所属集群名字
brokerclusterName=rocketmq-cluster
# broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
# 0 表示 Master, >0 表示 slave
brokerid=0
# nameserver 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autocreatesubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deletewhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue 每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker 的角色
#- ASYNC MASTER异步复制Master
#- SYNC MASTER同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
# 刷盘方式
#- ASYNC FLUSH 异步刷盘
#- SYNC FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
# checkTransactionMessageEnable=false
# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
4、RocketMQ–集群搭建2–broker 配置文件
修改 服务器 192.168.25.138 上的 broker 配置文件 配置 slave1
vim /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-a-s.properties
# 修改配置如下
# 所属集群名字
brokerclusterName=rocketmq-cluster
# broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0 表示 Master, >0 表示 slave
brokerid=1
# nameserver 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autocreatesubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=11011
# 删除文件时间点,默认凌晨4点
deletewhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue 每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketmq/store
# commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker 的角色
#- ASYNC MASTER异步复制Master
#- SYNC MASTER同步双写Master
#- SLAVE
brokerRole=SLAVE
# 刷盘方式
#- ASYNC FLUSH 异步刷盘
#- SYNC FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# checkTransactionMessageEnable=false
# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
5、RocketMQ–集群搭建2–broker 配置文件 注意事项:
# 1)进入 broker 配置文件 目录:
cd /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/
# 2)注意配置端口号,不要冲突
- nameserver 默认使用 9876 端口
- master 默认使用 10911 端口
- slave 默认使用 11011 端口
# 3)注意修改的 broker 配置文件名和端口号(同一虚拟机服务器端口号不能重复)
192.168.25.135 rocketmq-master1 vim broker-a.properties 端口:10911
192.168.25.135 rocketmq-slave2 vim broker-b-s.properties 端口:11011
192.168.25.138 rocketmq-master2 vim broker-b.properties 端口:10911
192.168.25.138 rocketmq-slave1 vim broker-a-s.properties 端口:11011
七、RocketMQ – 集群搭建3
1、RocketMQ–集群搭建3–修改启动脚本文件
(注意:在两个虚拟机上都要修改配置:192.168.25.135 和 192.168.25.138)
# 1)修改 runbroker.sh
vi /usr/1oca1/rocketmq/rocketmq-all-4.5.1-bin-release/bin/runbrokeF.sh
# 需要根据内存大小进行适当的对JVM参数进行调整:
# 开发环境配置 JVM Configuration
JAVA_OPT="${JAVA_OPT} -server -xms256m -Xmx256m -Xmn128m"
# 2) 修改 runserver.sh
vim /usr/loca1/rocketmq/rocketmq-all-4.5.1-bin-release/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -xms256m -xmx256m -Xmn128m-XX:Metaspacesize=128m-
XX:MaxMetaspacesize=320m"
2、RocketMQ–集群搭建3–服务启动
# 1)启动 NameServe 集群
# 分别在 192.168.25.135 和 192.168.25.138 启动 NameServer
# 切换目录
cd /usr/loca1/rocketmq/rocketmq-all-4.5.1-bin-release/bin/
# 启动 NameServe
nohup sh mqnamesrv &
# 查看启动结果
jps
# 2)启动 Broker 集群--# 在 192.168.25.135 上启动 master1 和 slave2
# master1 切换目录
cd /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/bin/
# 启动 master1
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-a.properties &
# slave2: 切换目录
cd /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/bin/
# 启动 slave2
nohup sh mqbroker -c /usr/loca1/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-b-s.properties &
# 查看启动结果
jps
# 3)启动 Broker 集群--# 在 192.168.25.138 上启动 master2 和 slave1
# master2 切换目录
cd /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/bin/
# 启动 master2
nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-b.properties &
# slave1: 切换目录
cd /usr/local/rocketmq/rocketmq-all-4.5.1-bin-release/bin/
# 启动 slave1
nohup sh mqbroker -c /usr/loca1/rocketmq/rocketmq-all-4.5.1-bin-release/conf/2m-2s-sync/broker-a-s.properties &
# 查看启动结果
jps
3、RocketMQ–集群搭建3–查看进程状态和查看日志
# 1)启动后通过 jps 查看启动进程
jps
# 2)查看日志
# 查看 nameServer 日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 查看 broker 日志
tail -500f ~/logs/rocketmqlogs/broker.log
八、RocketMQ – 双主双从集群搭建小结
1、总体架构
2、集群搭建工作流程
3、服务器环境
4、Host 添加信息
5、防火墙配置
6、环境变量配置
7、创建消息存储路径
8、broker 配置文件
9、修改启动脚本文件
10、服务启动
11、查看进程状态
12、查看日志。
上一节关联链接请点击:
# 全面解剖 消息中间件 RocketMQ-(1)