目录
1. 单机搭建
2. 测试RocketMQ
3. 集群搭建
4. 集群启动
5. RocketMQ-DashBoard搭建
6. 不同类型消息发送
1.同步消息
2. 异步消息发送
3. 单向发送消息
7. 消费消息
1. 单机搭建
1. 先从rocketmq官网下载二进制包,ftp上传至linux服务器,unzip命令解压。
2. 启动NameServer
# 1.启动NameServer
nohup sh bin/mqnamesrv &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
3. 启动Broker
RocketMQ默认的虚拟机内存较大,启动Broker如果因为内存不足失败,需要编辑如下两个配置文件,修改JVM内存大小。
# 编辑runbroker.sh和runserver.sh修改默认JVM大小
vi runbroker.sh
vi runserver.sh
- 参考设置:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# 1.启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
2. 测试RocketMQ
1. 发送消息
# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
2. 接收消息
# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
3. 关闭RocketMQ
# 1.关闭NameServer
sh mqshutdown namesrv
# 2.关闭Broker
sh mqshutdown broker
3. 集群搭建
这里采用broker双主双从(同步模式),NameServer、Producer、Producer集群由于无状态性,搭建简单。(Master和Slave的brokerName相同,brokerId不同)
1. 服务器环境
序号 | IP | 角色 | 架构模式 |
---|---|---|---|
1 | 192.168.183.132 | nameserver、brokerserver | Master1、Slave2 |
2 | 192.168.183.128 | nameserver、brokerserver | Master2、Slave1 |
2. hosts配置(两台一样配置)
vi /etc/hosts
# 重启网卡
systemctl restart network
3. 防火墙关闭
# 关闭防火墙
systemctl stop firewalld.service
# 查看防火墙的状态
firewall-cmd --state
# 禁止firewall开机启动
systemctl disable firewalld.service
4. 创建消息存储路径
mkdir -p /root/store/master1
mkdir -p /root/store/master1/commitlog
mkdir -p /root/store/master1/consumequeue
mkdir -p /root/store/master1/index
mkdir -p /root/store/slave2
mkdir -p /root/store/slave2/commitlog
mkdir -p /root/store/slave2/consumequeue
mkdir -p /root/store/slave2/index
mkdir -p /root/store/master2
mkdir -p /root/store/master2/commitlog
mkdir -p /root/store/master2/consumequeue
mkdir -p /root/store/master2/index
mkdir -p /root/store/slave1
mkdir -p /root/store/slave1/commitlog
mkdir -p /root/store/slave1/consumequeue
mkdir -p /root/store/slave1/index
5. broker配置文件
1)master1
服务器:192.168.183.132
vi /root/rocketmq4.4/conf/2m-2s-sync/broker-a.properties
# 所属集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意:不同的broker名字必须不同
brokerName=broker-a
# brokerId, brokerId=0表示Master,大于0表示Slave
brokerId=0
brokerIP1=rocketmq-master1
# nameserver地址,分号分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defalutTopicQueueNums=4
# 是否允许自动创建Topic,建议线下环境开启,线上环境关闭
autoCreateTopicEnable=true
# 是否允许自动创建订阅组,建议线下环境开启,线上环境关闭
autoCreateSubscriptionGroup=true
# Broker 对外暴露端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时(测试环境建议设置120分钟)
fileReservedTime=120
# commitlog文件大小,默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
disMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/root/store/master1
#commitLog 存储路径
storePathCommitLog=/root/store/master1/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/store/master1/consumequeue
#消息索引存储路径
storePathIndex=/root/store/master1/index
#checkpoint 文件存储路径
storeCheckpoint=/root/store/master1/checkpoint
#abort 文件存储路径
abortFile=/root/store/master1/abort
# 限制消息的大小
maxMessageSize=65536
# Broker角色
brokerRole=SYNC_MASTER
# 刷盘方式
flushDiskType=SYNC_FLUSH
2)slave2
服务器:192.168.183.132
vi /root/rocketmq4.4/conf/2m-2s-sync/broker-b-s.properties
# 所属集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意:不同的broker名字必须不同
brokerName=broker-b
# brokerId, brokerId=0表示Master,大于0表示Slave
brokerId=1
brokerIP1=rocketmq-slave2
# nameserver地址,分号分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defalutTopicQueueNums=4
# 是否允许自动创建Topic,建议线下环境开启,线上环境关闭
autoCreateTopicEnable=true
# 是否允许自动创建订阅组,建议线下环境开启,线上环境关闭
autoCreateSubscriptionGroup=true
# Broker 对外暴露端口
listenPort=11011
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时(测试环境建议设置120分钟)
fileReservedTime=120
# commitlog文件大小,默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
disMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/root/store/slave2
#commitLog 存储路径
storePathCommitLog=/root/store/slave2/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/store/slave2/consumequeue
#消息索引存储路径
storePathIndex=/root/store/slave2/index
#checkpoint 文件存储路径
storeCheckpoint=/root/store/slave2/checkpoint
#abort 文件存储路径
abortFile=/root/store/slave2/abort
# 限制消息的大小
maxMessageSize=65536
# Broker角色
brokerRole=SLAVE
# 刷盘方式
flushDiskType=ASYNC_FLUSH
3)master2
服务器:192.168.183.128
vi /root/rocketmq4.4/conf/2m-2s-sync/broker-b.properties
# 所属集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意:不同的broker名字必须不同
brokerName=broker-b
# brokerId, brokerId=0表示Master,大于0表示Slave
brokerId=0
brokerIP1=rocketmq-master2
# nameserver地址,分号分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defalutTopicQueueNums=4
# 是否允许自动创建Topic,建议线下环境开启,线上环境关闭
autoCreateTopicEnable=true
# 是否允许自动创建订阅组,建议线下环境开启,线上环境关闭
autoCreateSubscriptionGroup=true
# Broker 对外暴露端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时(测试环境建议设置120分钟)
fileReservedTime=120
# commitlog文件大小,默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
disMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/root/store/master2
#commitLog 存储路径
storePathCommitLog=/root/store/master2/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/store/master2/consumequeue
#消息索引存储路径
storePathIndex=/root/store/master2/index
#checkpoint 文件存储路径
storeCheckpoint=/root/store/master2/checkpoint
#abort 文件存储路径
abortFile=/root/store/master2/abort
# 限制消息的大小
maxMessageSize=65536
# Broker角色
brokerRole=SYNC_MASTER
# 刷盘方式
flushDiskType=SYNC_FLUSH
4)slave1
服务器:192.168.183.128
vi /root/rocketmq4.4/conf/2m-2s-sync/broker-a-s.properties
# 所属集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意:不同的broker名字必须不同
brokerName=broker-a
# brokerId, brokerId=0表示Master,大于0表示Slave
brokerId=1
brokerIP1=rocketmq-slave1
# nameserver地址,分号分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defalutTopicQueueNums=4
# 是否允许自动创建Topic,建议线下环境开启,线上环境关闭
autoCreateTopicEnable=true
# 是否允许自动创建订阅组,建议线下环境开启,线上环境关闭
autoCreateSubscriptionGroup=true
# Broker 对外暴露端口
listenPort=11011
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时(测试环境建议设置120分钟)
fileReservedTime=120
# commitlog文件大小,默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 检测物理文件磁盘空间
disMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/root/store/slave1
#commitLog 存储路径
storePathCommitLog=/root/store/slave1/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/store/slave1/consumequeue
#消息索引存储路径
storePathIndex=/root/store/slave1/index
#checkpoint 文件存储路径
storeCheckpoint=/root/store/slave1/checkpoint
#abort 文件存储路径
abortFile=/root/store/slave1/abort
# 限制消息的大小
maxMessageSize=65536
# Broker角色
brokerRole=SLAVE
# 刷盘方式
flushDiskType=ASYNC_FLUSH
4. 集群启动
1)启动NameServe集群
分别在192.168.183.132和192.168.183.128启动NameServer
nohup sh mqnamesrv &
启动成功。
2)启动broker集群
- 在192.168.183.132上启动master1和slave2
# master1
nohup sh mqbroker -c /root/rocketmq4.4/conf/2m-2s-sync/broker-a.properties &
# slave2
nohup sh mqbroker -c /root/rocketmq4.4/conf/2m-2s-sync/broker-b-s.properties &
启动成功。
- 在192.168.183.128上启动master2和slave1
# slave1
nohup sh mqbroker -c /root/rocketmq4.4/conf/2m-2s-sync/broker-a-s.properties &
# master2
nohup sh mqbroker -c /root/rocketmq4.4/conf/2m-2s-sync/broker-b.properties &
3)日志查看
# 查看nameServer日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log
5. RocketMQ-DashBoard搭建
github上拉取项目后,修改yml的namesrvAddrs即可。
6. 不同类型消息发送
pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
1.同步消息
SyncProducer.java
/**
* 发送同步消息
*/
public class SyncProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 设置NameServer地址
producer.setNamesrvAddr("192.168.183.132:9876;192.168.183.128:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag,消息体
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送消息到broker
SendResult sendResult = producer.send(msg);
// 发送状态
SendStatus status = sendResult.getSendStatus();
// 消息ID
String msgId = sendResult.getMsgId();
// 消息接收队列ID
int queueId = sendResult.getMessageQueue().getQueueId();
System.out.println("发送状态: " + status + " 消息ID: " + msgId + " 消息接收队列ID: " + queueId);
TimeUnit.SECONDS.sleep(1);
}
// 如果不再发送消息,关闭Producer实例
producer.shutdown();
}
}
2. 异步消息发送
AsyncProducer1.java
public class AsyncProducer1 {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 设置NameServer地址
producer.setNamesrvAddr("192.168.183.132:9876;192.168.183.128:9876");
// 启动Producer实例
producer.start();
// 设置重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag,消息体
Message msg = new Message("TopicTest",
"TagB",
("Hello World " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// SendCallback 接收异步返回结果的回调
int finalI = i;
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送结果:" + sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送异常:" + throwable);
}
});
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}
3. 单向发送消息
不关心结果,比如日志发送
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("192.168.183.132:9876;192.168.183.128:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
7. 消费消息
public class Consumer1 {
public static void main(String[] args) throws MQClientException {
// 实例化消费者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 指定NameServer地址信息
consumer.setNamesrvAddr("192.168.183.132:9876;192.168.183.128:9876");
// 订阅Topic
consumer.subscribe("TopicTest", "*");
// 负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING); // 广播模式 MessageModel.BROADCASTING
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Consumer Started.");
}
}
负载均衡模式消费:多个消费者共同处理broker消息队列的消息。
广播模式消费:每个消费者都会收到订阅的Topic的消息。
持续更新中......