初次使用rocketmq,记录一下全流程步骤。
1、下载安装包
首先在官网,下载安装包,可也根据官方文档进行部署,但有一些细节没说明,可能会有坑,本文会尽量详细的描述每个步骤,把我踩过的坑填补上。
下载完安装包后,解压到服务器指定目录下,我这里下载的是二进制包,解压放在/usr/local/rocketmq-all-5.2.0-bin-release。
2、配置环境变量(不配也行,这步是为了执行命令方便)
找到文件/etc/profile
文件末尾追加:
export ROCKETMQ_HOME=/usr/local/rocketmq-all-5.2.0-bin-release
export PATH=$PATH:$ROCKETMQ_HOME/bin
执行命令,让环境变量立即生效
source /etc/profile
3、修改防火墙设置
rocketmq的通信会用到多个端口, 为了方便测试,我们关闭防火墙:
# 关闭防火墙
systemctl stop firewalld.service
# 禁止防火墙开机启动
systemctl disable firewalld.service
# 查看防火墙状态
firewall-cmd --state
4、修改rocketmq默认jvm内存配置
rocketmq默认jvm内存配置过大,会导致启动失败或运行异常,在安装目录下的bin目录里,找到runserver.sh文件,搜索“JAVA_OPT="${JAVA_OPT} -server”,修改前:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改后:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
还有runbroker.sh文件,修改前:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
修改后:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
5、配置broker.conf文件
新增brokerIP1,namesrvAddr
#当前 broker 监听的 IP,默认值 网卡的 InetAddress
brokerIP1=192.168.210.100
#name server服务器地址及端口,可以是多个,分号隔开
namesrvAddr=192.168.210.100:9876
文章末尾会补充broker.conf文件的其他配置项及说明,可根据项目情况选择性配置。
6、启动NameServer
### 启动namesrv
$ nohup sh bin/mqnamesrv &
### 验证namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
我们可以在namesrv.log 中看到 'The Name Server boot success..', 表示NameServer 已成功启动。
7、启动Broker+Proxy
NameServer成功启动后,我们启动Broker和Proxy,5.x 版本下我们建议使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。
### 先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
$ tail -f ~/logs/rocketmqlogs/proxy.log
The broker[broker-a,192.169.1.2:10911] boot success...
我们可以在 proxy.log 中看到“The broker[brokerName,ip:port] boot success..”,这表明 broker 已成功启动。
8、部署完成,测试消息收发
至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们可以利用脚本进行简单的消息收发。
在进行工具测试消息收发之前,我们需要告诉客户端NameServer的地址,RocketMQ有多种方式在客户端中设置NameServer地址,这里我们利用环境变量NAMESRV_ADDR
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
后续会补充SDK版实现消息收发。。。
附录 broker.conf文件配置说明
#接受客户端连接的监听端口,默认10911
listenPort=10911
#name server服务器地址及端口,可以是多个,分号隔开
namesrvAddr=192.168.1.100:9876
#当前 broker 监听的 IP,默认值 网卡的 InetAddress
brokerIP1=
#存在主从 broker 时,如果在 broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步
brokerIP2=跟 brokerIP1 一样
#broker名称,用于主从配对,相同名称的broker才能做主从设置
brokerName=mq_broker_1
#broker集群名称,用于划分broker
brokerClusterName=MQCluster001
#用于标识主从关系,0为主,其他大于0的为从(不能小于0)master设置0,slave设置1。Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是Producer只能和Master角色的Broker连接写人消息:Consumer可以连接Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
#Master节点设置
brokerId=0
#Slave节点设置
#brokerId=1
#创建topic时,若未指定topic下的队列数,则取该默认值作为默认队列数
defaultTopicQueueNums=8
#是否自动创建默认topic,生产需保持关闭
autoCreateTopicEnable=true
#是否自动创建topic的订阅组,默认开启
autoCreateSubscriptionGroup=true
#未消费的持久化消息清理时间点,在每天的什么时间删除已经超过文件保留时间的 commit log
deleteWhen=04
#持久化消息保存周期(单位:小时),超过该周期将被清理
fileReservedTime=24
#单个commitLog文件的大小限制(单位:字节)
mapedFileSizeCommitLog=1073741824
#单个consumeQueue大小限制(存储的消息条数 * 每条消息的索引大小20)
mapedFileSizeConsumeQueue=8000000
#存储使用率阀值,当使用率超过阀值时,将拒绝发送消息请求
diskMaxUsedSpaceRatio=88
#持久化消息存储根路径,默认值 $HOME/store/
storePathRootDir=/data/store
#commitLog文件存储路径 默认值$HOME/store/commitlog/
storePathCommitLog=/data/store/commitlog
#存储 consume queue 的路径 默认值 $HOME/store/consumequeue/
storePathConsumerQueue=/data/store/consumequeue
#最大消息大小限制(单位:字节)
maxMessageSize=65536
#commitLog最少刷盘page数
flushCommitLogLeastPages=4
#consumeQueue最少刷盘page数
flushConsumeQueueLeastPages=2
#commitLog刷盘间隔时间
flushCommitLogThoroughInterval=10000
#consumeQueue刷盘间隔时间
flushConsumeQueueThoroughInterval=60000
#处理消息发送线程池大小
sendMessageThreadPoolNums=128
#处理消息拉取线程池大小
pullMessageThreadPoolNums=128
#broker角色(SYNC_MASTER:同步双写Master、ASYNC_MASTER:异步复制Master、SLAVE:Slave)
brokerRole=ASYNC_MASTER
#Slave节点设置
#brokerRole=SLAVE
#刷盘方式 SYNC_FLUSH/ASYNC_FLUSH
#SYNC_FLUSH 模式下的 broker 保证在收到确认生产者之前将消息刷盘。
#ASYNC_FLUSH 模式下的 broker 则利用刷盘一组消息的模式,可以取得更好的性能。
flushDiskType=ASYNC_FLUSH