本文档是RocketMQ4.8两主两从异步复制的搭建过程(也可单机部署,更简单一点),包括监控台界面.
写在前面:本文档适用于4.8版本,其它版本的坑没有踩过不清楚。我是用VMware启了两台虚拟机,环境:CentOS7,JDK1.8,Maven3.6.3,提前设置好PATH
RocketMQ支持多种集群策略
2m-2s-async(本文采用模式)-2主2从异步刷盘(吞吐量较大,但是消息可能丢失)
2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全)
2m-noslave :2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置
dledger:用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,
其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。
示意图如下:
两主两从的服务分布如下:
这样的分布让主从节点不在一台机器上,一台机器挂了消费者可以继续消费(开源版好像没有自动的主从切换,且只有brokerId=1的slave节点提供读服务)
192.168.198.129 namesrv1
192.168.198.130 namesrv2
192.168.198.129 broker-a master
192.168.198.129 broker-b-s slave
192.168.198.130 broker-b master
192.168.198.130 broker-a-s slave
1.下载和编译
#进入想要放的文件夹
cd /usr/local
#下载压缩包
wget https://archive.apache.org/dist/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
#解压
unzip rocketmq-all-4.8.0-bin-release.zip
#进入解压好的目录
cd rocketmq-all-4.8.0-source-release
#编译
mvn -Prelease-all -DskipTests clean install -U
#进入编译好的目录
cd distribution/target
#下面有rocketmq-4.8.0、rocketmq-4.8.0.tar.gz、rocketmq-4.8.0.zip,我们用rocketmq-4.8.0.tar.gz即可(下面有),将其放到/user/local/解压)
cd /usr/local/rocketmq-4.8.0
2.修改配置
此处有两个坑,一个是:JAVA_HOME的配置(配置为自己的要用的JDK目录,不一定是PATH中的JAVA_HOME),一个是:JVM内存配置(根据机器内存,rocketmq默认设置的比较大,可能启动不起来)
#进入bin目录
cd /usr/local/rocketmq-4.8.0
#修改server的启动脚本
vim bin/runserver.sh
#内容如下
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/local/jdk1.8.0_161
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
#[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
#修改broker的启动脚本
vim bin/runbroker.sh
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/local/jdk1.8.0_161
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
#[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=256m"
#修改tools的启动脚本
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/local/jdk1.8.0_161
#[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
#[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
JAVA_OPT="${JAVA_OPT} -server -Xms64m -Xmx64m -Xmn32m -XX:MetaspaceSize=32m -XX:MaxMetaspaceSize=32m"
3.创建存储路径
这里的存储路径可在4中的broker-*.properties中看到什么含义
#机器1上,我的是192.168.198.129
mkdir -p /usr/local/rocketmq/store/broker-a /usr/local/rocketmq/store/broker-a/nsumequeue /usr/local/rocketmq/store/broker-a/commitlog /usr/local/rocketmq/store/broker-a/index /usr/local/rocketmq/logs /usr/local/rocketmq/store/broker-b-s /usr/local/rocketmq/store/broker-b-s/nsumequeue /usr/local/rocketmq/store/broker-b-s/commitlog /usr/local/rocketmq/store/broker-b-s/index
#机器2上,我的是192.168.198.130
mkdir -p /usr/local/rocketmq/store/broker-a-s /usr/local/rocketmq/store/broker-a-s/nsumequeue /usr/local/rocketmq/store/broker-a-s/commitlog /usr/local/rocketmq/store/broker-a-s/index /usr/local/rocketmq/logs /usr/local/rocketmq/store/broker-b /usr/local/rocketmq/store/broker-b/nsumequeue /usr/local/rocketmq/store/broker-b/commitlog /usr/local/rocketmq/store/broker-b/index
4.修改broker的配置文件
两台机器都要修改,进入/conf/2m-2s-async目录,对于节点1(192.168.198.129),分别修改broker-a.properties,broker-b-s.properties;对于节点2(192.168.198.130),分别修改broker-b.properties,broker-a-s.properties,修改下面的ip可以直接粘贴过去
节点1 broker-a.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,eg:Amaster叫broker-a,他的slave也叫broker-a
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=192.168.198.129:9876;192.168.198.130:9876
brokerIP1=192.168.198.129
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口,
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/broker-a
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/broker-a/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/broker-a/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/broker-a/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
节点1 broker-b-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
brokerName=broker-b
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,eg:Amaster叫broker-b,他的slave也叫broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=192.168.198.129:9876;192.168.198.130:9876
brokerIP1=192.168.198.129
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口,
listenPort=10920
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/broker-b-s
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/broker-b-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/broker-b-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/broker-b-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
节点2 broker-b.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,eg:Amaster叫broker-a,他的slave也叫broker-a
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=192.168.198.129:9876;192.168.198.130:9876
brokerIP1=192.168.198.130
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口,
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/broker-b
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/broker-b/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/broker-b/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/broker-b/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
节点2 broker-a-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
brokerName=broker-a
#broker名字,名字可重复,为了管理,每个master起一个名字,他的slave同他,eg:Amaster叫broker-b,他的slave也叫broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=192.168.198.129:9876;192.168.198.130:9876
#brokerIP1=192.168.198.130
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口,
listenPort=10920
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store/broker-a-s
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/broker-a-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/broker-a-s/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/broker-a-s/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
5.启动
几主几从,取决于启动多少broker,如果只启动broker-a,就是一主,如果启动broker-a和broker-b就是两主,如果启动broker-a和borker-a-s就是一主一从
分别启动两台机器上的nameServer和broker,启动后可以通过jps -l 查看有没有进程(有进程不一定启成功了,具体看日志文件),日志文件的位置可以看/conf/logback_*.xml中的配置
节点1
#启动nameserver
cd /usr/local/rocketmq-4.8.0
nohup sh bin/mqnamesrv >/usr/local/rocketmq/logs/mqnamesrv.log 2>&1 &
#启动broker-a
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties > /usr/local/rocketmq/logs/broker-a.log 2>&1 &
#启动broker-b-s
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties > /usr/local/rocketmq/logs/broker-b-s.log 2>&1 &
节点2
#启动nameserver
cd /usr/local/rocketmq-4.8.0
nohup sh bin/mqnamesrv >/usr/local/rocketmq/logs/mqnamesrv.log 2>&1 &
#启动broker-b
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties > /usr/local/rocketmq/logs/broker-b.log 2>&1 &
#启动broker-a-s
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties > /usr/local/rocketmq/logs/broker-a-s.log 2>&1 &
6.监控台启动
命令如下:
#启动监控台,过程可能较慢,耐心等待,可以在任意机器上启动,只要能telnet通nameserver对应的ip和端口
java -jar rocketmq-console-ng-1.0.0.jar --server.port=8000 --rocketmq.config.namesrvAddr=192.168.198.129:9876;192.168.198.130:9876
#查看监控台,我是在192.168.198.130上启的
浏览器192.168.198.130:8000
7.停止RocketMQ
先停止broker再停止namesrv,或者使用jps-l查看进程再kill也可以
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
8.模拟使用
8.1 添加依赖
<!--注意: 这里的版本,要和部署在服务器上的版本号一致-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
8.2配置类
配置类我写死了host,可以写到配置文件
public class JmsConfig {
/**
* Name Server 地址,因为是集群部署 所以有多个用 分号 隔开
*/
public static final String NAME_SERVER = "192.168.198.129:9876;192.168.198.130:9877";
/**
* 主题名称 主题一般是服务器设置好 而不能在代码里去新建topic( 如果没有创建好,生产者往该主题发送消息 会报找不到topic错误)
*/
public static final String TOPIC = "topic_Dragon";
}
8.3Producer(生产者)
@Slf4j
@Component
public class Producer {
private String producerGroup = "test_producer";
private DefaultMQProducer producer;
public Producer(){
//示例生产者
producer = new DefaultMQProducer(producerGroup);
//不开启vip通道 开通口端口会减2
producer.setVipChannelEnabled(false);
//绑定name server
producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
start();
}
/**
* 对象在使用之前必须要调用一次,只能初始化一次
*/
public void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public DefaultMQProducer getProducer(){
return this.producer;
}
/**
* 一般在应用上下文,使用上下文监听器,进行关闭
*/
public void shutdown(){
this.producer.shutdown();
}
}
8.4Consumer(消费者)
@Slf4j
@Component
public class Consumer {
/**
* 消费者实体对象
*/
private DefaultMQPushConsumer consumer;
/**
* 消费者组
*/
public static final String CONSUMER_GROUP = "test_consumer";
/**
* 通过构造函数 实例化对象
*/
public Consumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
//消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//订阅主题和 标签( * 代表所有标签)下信息
consumer.subscribe(JmsConfig.TOPIC, "*");
// //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// msgs中只收集同一个topic,同一个tag,并且key相同的message
// 会把不同的消息分别放置到不同的队列中
try {
for (Message msg : msgs) {
//消费者获取消息 这里只输出 不做后面逻辑处理
String body = new String(msg.getBody(), "utf-8");
log.info("Consumer-获取消息-主题topic为={}, 消费消息为={}", msg.getTopic(), body);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("消费者 启动成功=======");
}
}
8.5Controller(检验)
@Slf4j
@RestController
public class RocketMQController {
@Autowired
private Producer producer;
private List<String> mesList;
/**
* 初始化消息
*/
public RocketMQController() {
mesList = new ArrayList<>();
mesList.add("西门吹雪");
mesList.add("叶孤城");
mesList.add("陆小凤");
mesList.add("李寻欢");
mesList.add("楚留香");
}
@RequestMapping("/text/rocketmq")
public Object callback() throws Exception {
//总共发送五次消息
for (String s : mesList) {
//创建生产信息
Message message = new Message(JmsConfig.TOPIC, "testtag", ("传说中的大侠:" + s).getBytes());
//发送
SendResult sendResult = producer.getProducer().send(message);
log.info("输出生产者信息={}",sendResult);
}
return "成功";
}
}
8.6访问,打印日志和监控台变化
http://localhost/text/rocketmq
日志:
监控台: