Spring Cloud 微服务系列文章,点击上方合集↑
1. 简介
RocketMQ是一款开源的分布式消息中间件,它具有高可靠性、高性能和可伸缩性,被广泛用于构建分布式系统中的可靠消息传递服务。
官网地址: https://rocketmq.apache.org/
2. 工作流程
- 启动 NameServer
- 启动 Broker
- 创建 Topic
- 生产者发送消息
- 消费者接受消息
3. 下载安装
3.1 直接下载(推荐)
官网下载(4.9.4版本):https://github.com/apache/rocketmq/releases
官网下载很慢,网盘下载(推荐):「rocketmq-4.9.4.zip」来自UC网盘分享
https://drive.uc.cn/s/c53ad060dbc14
3.2 编译安装
git clone https://gitee.com/apache/rocketmq.git
cd rocketmq
git checkout rocketmq-all-4.9.4
mvn -Prelease-all -DskipTests clean install -U
打包后的文件在distribution/target/rocketmq-4.9.4
目录下。
4. 启动服务
进入到bin目录。
4.1 启动NameServer
NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
# windows 启动
mqnamesrv
# mac/linux 启动
sh mqnamesrv
# 后台启动
nohup sh mqnamesrv &
# 查看日志
tail -f logs/rocketmqlogs/namesrv.log
# 关闭
sh mqshutdown namesrv
4.2 启动Broker
Broker主要负责消息的存储、投递和查询以及服务高可用保证。
# windows 启动
mqbroker -n localhost:9876
# mac/linux 启动
sh mqbroker -n localhost:9876
# 后台启动
nohup sh mqbroker -n localhost:9876 &
# 查看日志
tail -f logs/rocketmqlogs/Broker.log
# 关闭
sh mqshutdown broker
当JDK安装位置路径有空格(比如在Program Files目录下),启动Broker可能会出现如下问题:
找不到或无法加载主类 Files\Java\jdk1.8
解决办法就是把JDK挪一下位置,重新配置环境变量。
5. 运行监控台
rocketmq-dashboard是可视化管理控制台,可以在页面上查看RocketMQ的运行情况,可以对主题、生产者、消费者等进行可视化管理。
官网地址:https://github.com/apache/rocketmq-dashboard
5.1 直接下载(推荐)
这里把打包好的jar放在了云盘上,方便下载直接运行。
网盘下载jar包:「rocketmq-dashboard-1.0.1.jar」来自UC网盘分享
https://drive.uc.cn/s/f27758e0a0744
# 运行jar包
java -jar rocketmq-dashboard-1.0.1.jar
访问dashboard地址:http://localhost:8200/#/
- 源码默认的端口是8080,我这里是把源码端口改为了8200后生成的jar包。
5.2 编译安装
通过源码编译生成jar包的方式如下:
git clone https://github.com/apache/rocketmq-dashboard.git
cd rocketmq-dashboard
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
默认端口8080,修改端口的文件是\rocketmq-dashboard\src\main\resources\application.yml
。
这里改成了8200。
6. 生产者 rocketmq-producer
6.1 pom.xml
添加rocketmq-spring-boot-starter
包依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
6.2 application.properties
配置rocketmq的服务地址和生产者的producer-group
。
# rocketmq 相关
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=producer-group
6.3 ProducerController
一个get
请求接口,路径是producer/send/{msg}
,通过rocketMQTemplate.send("test-topic", message)
发送数据,指定发送的主题为test-topic
。
@RestController
@RequestMapping("producer")
public class ProducerController {
@Resource
private RocketMQTemplate rocketMQTemplate;
@GetMapping("send/{msg}")
public String send(@PathVariable String msg) {
MessageBuilder<String> builder = MessageBuilder.withPayload(msg);
Message<String> message = builder.build();
rocketMQTemplate.send("test-topic", message);
System.out.println("生产者发送:" + msg);
return msg;
}
}
7. 消费者 rocketmq-consumer
7.1 pom.xml
添加rocketmq-spring-boot-starter
包依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
7.2 application.properties
配置rocketmq的服务地址和消费者的consumer-group
。
# rocketmq 相关
rocketmq.name-server=127.0.0.1:9876
rocketmq.consumer.group=consumer-group
7.3 TestMQListener
定义消息接收器,用于接收生产者产生的消息,并打印出来。
@Component
@RocketMQMessageListener(consumerGroup = "consumer-group", topic = "test-topic")
public class TestMQListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("消费者收到:" + s);
}
}
- 注意这里的主题
test-topic
跟生产者的一致。
8. 测试
访问接口地址:
http://localhost:8111/producer/send/hello
# 生产者打印
生产者发送:hello
生产者发送:hello
# 消费者打印
消费者收到:hello
消费者收到:hello
9. 总结
本文介绍了使用Spring Boot集成RocketMQ的方法,创建了一个生产者服务rocketmq-producer
和一个消费者服务rocketmq-consumer
。生产者可以将数据发送到RocketMQ服务器中,消费者可以订阅主题并接收、处理数据。这种集成方式方便快捷,可以快速实现RocketMQ的生产和消费功能。
RocketMQ可以用在电商订单系统、日志收集系统、实时数据处理系统等场景。
Spring Cloud 微服务系列 完整的代码在仓库的sourcecode/spring-cloud-demo
目录下。
gitee(推荐):https://gitee.com/cunzaizhe/xiaohuge-blog
github:https://github.com/tigerleeli/xiaohuge-blog
关注微信公众号:“小虎哥的技术博客”,让我们一起成为更优秀的程序员❤️!