目录
一、引言
二、简介
三、Linux环境搭建&安装包下载
四、RocketMQ部署服务的简述
五、RocketMQ部署集中集群方式和配置
六、配置(注意对应版本)
一、引言
适用读者:一切使用RocketMQ的人员。
文章目的:主要介绍RocketMQ的集中部署方式和方法、使开发人员快速搭建RocketQM服务器。
二、简介
RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:
-
能够保证严格的消息顺序
-
提供丰富的消息拉取模式
-
提供丰富的消息拉取模式
-
高效的订阅者水平扩展能力
-
实时的消息订阅机制
-
亿级消息堆积能力
三、Linux环境搭建&安装包下载
- Java环境:RocketMQ依赖于宿主机Java环境,所以首先确保Java环境安装到位,如有不懂Java安装,问度娘。
- Rocket下载:https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz 具体版本根据需求下载,此处以3.2.6作为案例版本。
- 复制、解压安装包:将安装包上传至服务器目标目录,例:opt/tpapp/。解压安装包即可: tar -zxvf alibaba-rocketmq-3.2.6.tar.gz opt/tpapp/rocketmq 。
四、RocketMQ部署服务的简述
我们需要部署NameSever和Broker服务器,这两个服务有什么作用和用途请看下图:
NameServer:
RocketMQ-nameSrv用于管理所有broker的信息,以便于Producer和Consumer能够获取到正确的Broker信息,进行业务处理;
可以看到NameSrv的主要管理内容如下:
1. 接收Broker的注册,注销请求。
2. Producer获取topic下的所有BrokerQueue,put消息。
3. Consumer获取topic下所有的BrokerQueue,get消息。
Broker:
负责存储消息,提供读写功能。
RocketMQ依赖于NameServer服务器和Broker服务器,所以我们需要分别部署NameServer和Broker(实际上只是用同一个软件文件,以不同的命令和配置区分别启动NameServer服务和Broker服务的)。
五、RocketMQ部署集中集群方式和配置
NameServer一般作多台集群部署(2台:namesrv1、namesrv2):
启动:namesrv1 : nohup opt/tpapp/rocketmq/bin/mqnamesrv &
namesrv2 : nohup opt/tpapp/rocketmq/bin/mqnamesrv &
停止:sh mqshutdown namesrv
Broker部署集群方式有以下几种(Master和Slave都是指Broker服务器):
1.单Master(一台服务器)
优点:除了配置简单没什么优点
缺点:不可靠,该机器重启或宕机,将导致整个服务不可用
启动:
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-noslave/broker-a.properties)
2.多Master(两台或多台服务器)
优点:配置简单,性能最高
缺点:可能会有少量消息丢失(配置相关),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性
启动:
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-noslave/broker-a.properties)
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-noslave/broker-b.properties)
3.多Master多Slave,每个Master配一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级()
优点:性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预
缺点:Master宕机或磁盘损坏时会有少量消息丢失
启动:
Master:
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-async/broker-a.properties)
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-async/broker-a-s.properties)
Slave:
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-async/broker-b.properties)
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-async/broker-b-s.properties)
4.多Master多Slave,每个Master配一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写成功,向应用返回成功
优点:服务可用性与数据可用性非常高
缺点:性能比一部HA略低,当前版本主宕备不能自动切换为主
Master和Slave的配置文件参考conf目录下的配置文件
Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数
一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分
Master:
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-sync/broker-a.properties)
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-sync/broker-a-s.properties)
Slave:
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-sync/broker-b.properties)
nohup opt/tpapp/rocketmq/bin/mqbroker -n namesrv1IP;namesrv2IP -c 配置文件(../conf/2m-2s-sync/broker-b-s.properties)
5. 停止broker
sh mqshutdown broker
六、配置(注意对应版本)
1.Broker配置
参数名 |
默认值 |
说明 |
consumerGroup | Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 | |
listenPort | 10911 | Broker对外服务的监听端口 |
namesrvAddr | Null | Name Server地址 |
brokerIP1 | 本机IP | 本机IP地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,这种情况下可以人工配置。 |
brokerName | 本机主机名 | |
brokerClusterName | DefaultCluster | Broker所属哪个集群 |
brokerId | 0 | BrokerId,必须是大等于0的整数,0表示Master,>0表示Slave,一个Master可以挂多个Slave,Master和Slave通过BrokerName来配对 |
storePathCommitLog | $HOME/store/commitlog | commitLog存储路径 |
storePathConsumeQueue | $HOME/store/consumequeue | 消费队列存储路径 |
storePathIndex | $HOME/store/index | 消息索引存储队列 |
deleteWhen | 4 | 删除时间时间点,默认凌晨4点 |
fileReservedTime | 48 | 文件保留时间,默认48小时 |
maxTransferBytesOnMessageInMemory | 262144 | 单次pull消息(内存)传输的最大字节数 |
maxTransferCountOnMessageInMemory | 32 | 单次pull消息(内存)传输的最大条数 |
maxTransferBytesOnMessageInMemory | 65535 | 单次pull消息(磁盘)传输的最大字节数 |
maxTransferCountOnMessageInDisk | 8 | 单次pull消息(磁盘)传输的最大条数 |
messageIndexEnable | TRUE | 是否开启消息索引功能 |
messageIndexSafe | FALSE | 是否提供安全的消息索引机制,索引保证不丢 |
brokerRole | ASYNC_MASTER | Broker的角色 -ASYNC_MASTER异步复制Master -SYNC_MASTER同步双写Master -SLAVE |
flushDiskType | ASYNC_FLUSH | 刷盘方式 -ASYNC_FLUSH异步刷盘 -SYNC_FLUSH同步刷盘 |
cleanFileForciblyEnable | TRUE | 磁盘满,且无过期文件情况下TRUE表示强制删除文件,优先保证服务可用 FALSE标记服务不可用,文件不删除 |
2.客户端公共配置类:clientconfig
参数名 | 默认值 | 说明 |
NamesrvAddr | NameServer地址列表,多个nameServer地址用分号隔开 | |
clientIP | 本机IP | 客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 |
instanceName | DEFAULT | 客户端实例名称,客户端创建的多个Producer,Consumer实际是共用一个内部实例(这个实例包含网络连接,线程资源等) |
clientCallbackExecutorThreads | 4 | 通信层异步回调线程数 |
pollNameServerInteval | 30000 | 轮训Name Server 间隔时间,单位毫秒 |
heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 |
persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 |
3.producer配置
参数名 | 默认值 | 说明 |
producerGroup | DEFAULT_PRODUCER | Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。 |
createTopicKey | TBW102 | 在发送消息时,自动创建服务器不存在的topic,需要指定key |
defaultTopicQueueNums | 4 | 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 |
sendMsgTimeout | 10000 | 发送消息超时时间,单位毫秒 |
compressMsgBodyOverHowmuch | 4096 | 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 |
retryAnotherBrokerWhenNotStoreOK | FALSE | 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 |
maxMessageSize | 131072 | 客户端限制的消息大小,超过报错,同时服务端也会限制(默认128K) |
transactionCheckListener | 事物消息回查监听器,如果发送事务消息,必须设置 | |
checkThreadPoolMinSize | 1 | Broker回查Producer事务状态时,线程池大小 |
checkThreadPoolMaxSize | 1 | Broker回查Producer事务状态时,线程池大小 |
checkRequestHoldMax | 2000 | Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 |
4.pushConsumer配置
参数名 |
默认值 |
说明 |
consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应将它们归为同一组 |
messageModel | CLUSTERING | 消息模型,支持以下两种1.集群消费2.广播消费 |
consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer启动后,默认从什么位置开始消费 |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
Subscription | {} | 订阅关系 |
messageListener | 消息监听器 | |
offsetStore |
| 消费进度存储 |
consumeThreadMin | 10 | 消费线程池数量 |
consumeThreadMax | 20 | 消费线程池数量 |
consumeConcurrentlyMaxSpan | 2000 | 单队列并行消费允许的最大跨度 |
pullThresholdForQueue | 1000 | 拉消息本地队列缓存消息最大数 |
Pullinterval | 0 | 拉消息间隔,由于是长轮询,所以为0,但是如果应用了流控,也可以设置大于0的值,单位毫秒 |
consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 |
pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 |
5.pullConsumer
参数名 |
默认值 |
说明 |
consumerGroup | Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 | |
brokerSuspendMaxTimeMillis | 20000 | 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒 |
consumerPullTimeoutMillis | 10000 | 非长轮询,拉消息超时时间,单位毫秒 |
consumerTimeoutMillisWhenSuspend | 30000 | 长轮询,Consumer拉消息请求咋broker挂起超过指定时间,客户端认为超时,单位毫秒 |
messageModel | BROADCASTING | 消息模型,支持以下两种:1集群消费 2广播模式 |
messageQueueListener | 监听队列变化 | |
offsetStore | 消费进度存储 | |
registerTopics | 注册的topic集合 | |
allocateMessageQueueStrategy | Rebalance算法实现策略 |