1.资源下载
官网:下载 | RocketMQ
这里选择使用编译后可以直接用的
下载后解压:略
2.更改配置
主要是更改 conf/broker.conf 的配置,记得添加上下面这几行,否则消息发送失败
autoCreateTopicEnable=true # 支持自动创建topic
namesrvAddr=127.0.0.1:9876 # namespace地址
brokerIP1=192.168.0.179 # 这里是broker地址
不添加这几行可能出现以下问题
原文参考:RocketMQ连接报错RemotingConnectException: connect to <172.17.0.1:10:109011>解决_org.apache.rocketmq.remoting.exception.remotingcon_zhangzengxiu的博客-CSDN博客
3.配置环境变量
需要配置 java,maven,rocketmq的环境变量,可以参照这里配置
# java环境变量配置
export JAVA_HOME=/usr/lib/java-1.8/jdk1.8.0_321
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
# rocketMq环境变量配置
export ROCKETMQ_HOME=/data/rocketMq/rocketmq-all-5.1.0-bin-release
export PATH=${PATH}:${ROCKETMQ_HOME}/bin
MAVEN_HOME=/data/maven/apache-maven-3.8.4
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin
配置好之后刷新一下才会生效
source /etc/profile
5.启动Namesrv
一定要先启动namesrv,因为消息服务器是注册到命名服务器上的
nohup mqnamesrv &
6.启动Broker
启动消息服务器,同时指定刚刚修改过的conf文件,不然还是会读取原本默认的阿里外网IP,还是会报错。
nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &
7.部署可视化控制台
1.下载文件
通过下面链接进入下载
GitHub - apache/rocketmq-dashboard: The state-of-the-art Dashboard of Apache RoccketMQ provides excellent monitoring capability. Various graphs and statistics of events, performance and system information of clients and application is evidently made available to the user.
2.构建与运行
先构建,后运行
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
运行后访问部署那台机器的ip加上端口号即可进入,端口号可以更改配置文件
8.测试消息生产与消费
1.生产消息
在rocketmq的bin目录下运行
tools.shorg.apache.rocketmq.example.quickstart.Producer
会发送1000条测试数据
2.消费消息
在rocketmq的bin目录下运行
tools.shorg.apache.rocketmq.example.quickstart.Consumer
9.整合springboot
1.引入依赖(注意版本)
<rocketmq-starter.version>2.2.3</rocketmq-starter.version>
<rocketmq-client.version>4.8.0</rocketmq-client.version>
<!-- rocketmq starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-starter.version}</version>
</dependency>
<!-- rocketmq client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq-client.version}</version>
</dependency>
2.新增配置
rocketmq:
name-server: 139.9.178.38:9876
producer:
group: springBootGroup # 生产者组别
send-message-timeout: 30000 # 消息发送的超时时间
retry-times-when-send-async-failed: 2 # 异步消息发送失败重试次数
max-message-size: 4194304 # 消息的最大长度
consumer:
group: MyConsumerGroup
3.生产者
package com.hhmt.delivery.mq.rocket;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author huachun
* @version 1.0
* @description: TODO
* @email huachun_w@163.com
* @date 2023-04-20 15:10
*/
@Component
public class MyProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
//发送普通消息的示例
public void sendMessage(String topic, String msg) {
this.rocketMQTemplate.convertAndSend(topic, msg);
}
}
4.消费者
package com.hhmt.delivery.mq.rocket;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author huachun
* @version 1.0
* @description: TODO
* MessageModel.BROADCASTING 广播模式,MessageModel.CLUSTERING集群模式
* @email huachun_w@163.com
* @date 2023-04-20 15:12
*/
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic", messageModel = MessageModel.BROADCASTING)
public class MyConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Receivedmessage:" + message);
}
}
5.测试
编写一个控制器发送消息并接受
package com.hhmt.delivery.mq.rocket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author huachun
* @version 1.0
* @description: TODO
* @email huachun_w@163.com
* @date 2023-04-19 18:33
*/
@RestController
@RequestMapping("/v1/mq")
public class MessageController {
@Autowired
private MyProducer myProducer;
@GetMapping("/send/{str}")
public void send(@PathVariable("str") String str) {
myProducer.sendMessage("TestTopic",str);
}
}
测试效果
说明:通过控制器调用消息生产服务向 TestTopic 这个主题发送消息 ,同时消费者监听这个主题并接受到消息
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic", messageModel = MessageModel.BROADCASTING)messageModel说明:
MessageModel.BROADCASTING 广播模式,所有的消费者都可以收到同样的消息,并且都可以消费 MessageModel.CLUSTERING 集群模式,同一个消费者消费一组消息,不能重复,即A消息被x消费者消费了,其他消费者就不能消费到这个消息了
原文参考:SpringBoot+RocketMq入门_舌尖上的蛋炒饭的博客-CSDN博客