小伙伴们,你们好呀,我是老寇,好久不见啦,甚是想念。
rocketmq和rocketmq-console安装包:https://pan.baidu.com/s/1swrV9ffJnmz4S0mfkuBbIw
提取码:1111
1.准备三台主机
192.168.1.1 | rocketmq,rocketmq-console(机器1 主) |
192.168.1.2 | rocketmq(机器2 从) |
192.168.1.3 | rocketmq(机器3 从) |
2.机器1操作
# 将文件移动到/opt/mq目录下
# 解压文件
unzip rocketmq-5.0.0.zip
mv rocketmq-all-5.0.0-bin-release /opt/mq/rocketmq-5.0.0
cd /opt/mq/rocketmq-5.0.0
# 修改配置文件
vim conf/dledger/broker-n0.conf
# 集群名称
brokerClusterName = RaftCluster
# broker组名,同一个RaftClusterGroup内,brokerName名要一样
brokerName=RaftNode00
# 监听端口
listenPort=30911
# 设置NaemServer地址和端口
# 是用分号隔开,不是逗号啊,不然无法集群
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
# n0 n1 n2 分别是broker1,broker2,broker3 的 DledgerSelfId
# DlegerPeers=n0-ip:40911...
dLegerPeers=n0-192.168.1.1:40911;n1-192.168.1.2:40912;n2-192.168.1.3:40913
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
修改runbroker.sh
修改runserver.sh
# 后台启动nameserver
nohup sh bin/mqnamesrv > nohubNameserv &
# 后台启动broker
nohup sh bin/mqbroker > nohubBroker -c conf/dledger/broker-n0.conf &
3.机器2操作
# 将文件移动到/opt/mq目录下
# 解压文件
unzip rocketmq-5.0.0.zip
mv rocketmq-all-5.0.0-bin-release /opt/mq/rocketmq-5.0.0
cd /opt/mq/rocketmq-5.0.0
# 修改配置文件
vim conf/dledger/broker-n1.conf
# 集群名称
brokerClusterName = RaftCluster
# broker组名,同一个RaftClusterGroup内,brokerName名要一样
brokerName=RaftNode00
# 监听端口
listenPort=30921
# 设置NaemServer地址和端口
# 是用分号隔开,不是逗号啊,不然无法集群
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876
storePathRootDir=/tmp/rmqstore/node01
storePathCommitLog=/tmp/rmqstore/node01/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
# n0 n1 n2 分别是broker1,broker2,broker3 的 DledgerSelfId
# DlegerPeers=n0-ip:40911...
dLegerPeers=n0-192.168.1.1:40911;n1-192.168.1.2:40912;n2-192.168.1.3:40913
## must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16
修改runbroker.sh
修改runserver.sh
# 后台启动nameserver
nohup sh bin/mqnamesrv > nohubNameserv &
# 后台启动broker
nohup sh bin/mqbroker > nohubBroker -c conf/dledger/broker-n1.conf &
4.机器2操作
# 将文件移动到/opt/mq目录下
# 解压文件
unzip rocketmq-5.0.0.zip
mv rocketmq-all-5.0.0-bin-release /opt/mq/rocketmq-5.0.0
cd /opt/mq/rocketmq-5.0.0
# 修改配置文件
vim conf/dledger/broker-n2.conf
# 集群名称
brokerClusterName = RaftCluster
# broker组名,同一个RaftClusterGroup内,brokerName名要一样
brokerName=RaftNode00
# 监听端口
listenPort=30931
# 设置NaemServer地址和端口
# 是用分号隔开,不是逗号啊,不然无法集群
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876
storePathRootDir=/tmp/rmqstore/node02
storePathCommitLog=/tmp/rmqstore/node02/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
# n0 n1 n2 分别是broker1,broker2,broker3 的 DledgerSelfId
# DlegerPeers=n0-ip:40911...
dLegerPeers=n0-192.168.1.1:40911;n1-192.168.1.2:40912;n2-192.168.1.3:40913
## must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16
修改runbroker.sh
修改runserver.sh
# 后台启动nameserver
nohup sh bin/mqnamesrv > nohubNameserv &
# 后台启动broker
nohup sh bin/mqbroker > nohubBroker -c conf/dledger/broker-n2.conf &
# 查看集群状态
sh bin/mqadmin clusterList -n 127.0.0.1:9876
5.启动rocketmq-console(机器1)
# 进入target目录
unzip rocketmq-console.zip
cd /opt/mq/rocketmq-console/target
java -jar rocketmq-console-ng-4.0.0.jar
6.集成到微服务
1.导入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
2.配置文件
rocketmq:
name-server: 192.168.1.1:9876
# 生产者
producer:
group: laokou-producer-group
3.代码编写
/**
* @author Kou Shenhai
*/
@RestController
@RequestMapping("/api")
@Slf4j
@RequiredArgsConstructor
@Api(value = "rocketmq消息API",protocols = "http",tags = "rocketmq消息API")
public class RocketmqSender {
private final RocketMQTemplate rocketMQTemplate;
@PostMapping("/send/{topic}")
@ApiOperation("rocketmq消息>同步发送")
public void sendMessage(@PathVariable("topic") String topic, @RequestBody RocketmqDTO dto) {
rocketMQTemplate.syncSend(topic,dto.getData(),3000);
}
@PostMapping("/sendAsync/{topic}")
@ApiOperation("rocketmq消息>异步发送")
public void sendAsyncMessage(@PathVariable("topic") String topic, @RequestBody RocketmqDTO dto) {
rocketMQTemplate.asyncSend(topic, dto.getData(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送成功");
}
@Override
public void onException(Throwable throwable) {
log.error("报错信息:{}",throwable.getMessage());
}
});
}
@PostMapping("/sendOne/{topic}")
@ApiOperation("rocketmq消息>单向发送")
public void sendOneMessage(@PathVariable("topic") String topic, @RequestBody RocketmqDTO dto) {
/**
* 单向发送,只负责发送消息,不会触发回调函数,即发送消息请求不等待
* 适用于耗时短,但对可靠性不高的场景,如日志收集
*/
rocketMQTemplate.sendOneWay(topic,dto.getData());
}
}
@Data
class RocketmqDTO implements Serializable {
private String data;
}
4.发送消息
rocketmqApiFeignClient.sendOneMessage(RocketmqConstant.LAOKOU_OPERATE_LOG_TOPIC, rocketmqDTO);
5.消费消息
@RocketMQMessageListener(consumerGroup = "laokou-consumer-group", topic = RocketmqConstant.LAOKOU_OPERATE_LOG_TOPIC)
@Component
@RequiredArgsConstructor
public class OperateLogConsumer implements RocketMQListener<String> {
private final LogApiFeignClient logApiFeignClient;
@Override
public void onMessage(String message) {
final OperateLogDTO operateLogDTO = JacksonUtil.toBean(message, OperateLogDTO.class);
logApiFeignClient.insertOperateLog(operateLogDTO);
}
}
后端地址:KCloud-Platform-Official
前端地址:KCloud-Antdv-Official
大功告成