文章目录
- 1.简介、安装部署
- 2.Springboot集成RocketMQ
- 2.1.添加maven依赖:
- 2.2.RocketMQ配置
- 生产者配置
- 消费者配置
- 2.3.生产者(发送消息)
- 2.4.消费者(接收消息)
- 3.实战结果
- 3.1.消费者服务
- 3.2.生产者服务
- 3.3.运行日志
- 生产日志
- 消费日志
1.简介、安装部署
此部分不做赘述,可从官网查阅
说明文档 https://rocketmq.apache.org/zh/docs/4.x/
下载地址 https://rocketmq.apache.org/zh/download (建议下载二进制包)
安装和启动教程 https://rocketmq.apache.org/zh/docs/4.x/introduction/02quickstart
其实官网讲解已经很详细了,所以这里只是详细描述一下启动和停止命令
### 启动namesrv, <mqnamesrv.path> 表示安装好的二进制包中bin目录下的 mqnamesrv 文件地址,使用命令时替换为实际地址即可
$ nohup sh <mqnamesrv.path> &
### 先启动broker <mqbroker.path> 表示安装好的二进制包中bin目录下的 mqbroker 文件地址
### -n NameServer地址
$ nohup sh <mqbroker.path> -n localhost:9876 &
### 关闭broker <mqbroker.path> 表示安装好的二进制包中bin目录下的 mqshutdown 文件地址
sh <mqshutdown.path> broker
### 关闭namesrv
sh <mqshutdown.path> namesrv
2.Springboot集成RocketMQ
点击前往 官方案例
2.1.添加maven依赖:
<!--在pom.xml中添加依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${RELEASE.VERSION}</version>
</dependency>
2.2.RocketMQ配置
生产者配置
# rocketmq配置
rocketmq:
# NameServer 服务器地址
name-server: localhost:9876
# 见源码 org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Producer
producer:
# 发送同一类消息的生产者设置为同一个group,保证唯一
group: my_producer_group
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 异步消息重试此处,默认2
retryTimesWhenSendAsyncFailed: 2
# 发送消息超时时间,默认3000
sendMessageTimeout: 3000
# 消息最大长度,默认1024 * 1024 * 4(默认4M)
maxMessageSize: 4096
# 压缩消息阈值,默认4k(1024 * 4)
compressMessageBodyThreshold: 4096
# 是否在内部发送失败时重试另一个broker,默认false
retryNextServer: false
# access-key
#accessKey: xxx
# secret-key
#secretKey: xxx
# 是否启用消息跟踪,默认false
enableMsgTrace: false
# 消息跟踪主题的名称值。如果不进行配置,可以使用默认的跟踪主题名称
customizedTraceTopic: RMQ_SYS_TRACE_TOPIC
消费者配置
# rocketmq配置
rocketmq:
# NameServer 服务器地址
name-server: localhost:9876
#Push模式, 对应name为rocketMQTemplate的RocketMQTemplate
# 见源码 org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.Consumer
consumer:
# 配置指定group是否启动监听器 group.topic = false
listeners:
# key:group名称。 value:{key: topic名称: value: true/false}
my_consumer_group:
mytopic: true
2.3.生产者(发送消息)
通过 RocketMQTemplate
模版类,我们进行了二次封装,构建了一个统一的消息发送工具类 MessageHelper
,目的在于核心代码抽离实现解耦合。哪怕后续如果更换了新的消息中间件,只需统一更改该工具类的结构即可。
重点方法 selectSendWay
根据处理好的 destination 和消息方式方式 way 选择 RocketMQTemplate
中对应的方法版本。
/**
* @Name: MessageHelper
* @Description: 消息发送工具
* @Author: ahao
* @Date: 2024/4/12 7:22 PM
*/
@Component
public class MessageHelper {
/**
* 默认标签
*/
public static final String DEFAULT_TAG = "none";
public static final int SYNCHRONOUSLY = 1;
public static final int ASYNCHRONOUSLY = 2;
public static final int ORDERLY_SYNCHRONOUSLY = 3;
public static final int ORDERLY_ASYNCHRONOUSLY = 4;
/**
* 导入RocketMQ模版工具
*/
@Resource
private RocketMQTemplate rocketMQTemplate;
/**
* 同步发送消息,会阻塞等待消息发送结果
*
* @param destination 消息发送目的地(主题:标签)
* @param msg 消息内容
*/
public MsgSendResult send(@NotNull String destination, Object msg) {
return selectSendWay(destination, msg, null, null, SYNCHRONOUSLY);
}
/**
* 同步发送消息,会阻塞等待消息发送结果
*
* @param topic 主题
* @param tag 标签
* @param msg 消息内容
* @return
*/
public MsgSendResult send(@NotNull String topic, @Nullable String tag, Object msg) {
return selectSendWay(topic, tag, msg, null, null, SYNCHRONOUSLY);
}
/**
* 异步发送消息
*
* @param destination 消息发送目的地
* @param msg 消息内容
* @param callback 消息发送回调通知 {@link SendCallback}
*/
public void asyncSend(@NotNull String destination, Object msg, SendCallback callback) {
selectSendWay(destination, msg, callback, null, ASYNCHRONOUSLY);
}
/**
* 异步发送消息
*
* @param topic 主题
* @param tag 标签
* @param msg 消息内容
* @param callback 消息发送回调通知 {@link SendCallback}
*/
public void asyncSend(@NotNull String topic, @Nullable String tag, Object msg, SendCallback callback) {
selectSendWay(topic, tag, msg, callback, null, ASYNCHRONOUSLY);
}
/**
* 同步发送顺序消息,会阻塞等待消息发送结果
*
* @param destination 消息发送目的地(主题:标签)
* @param msg 消息内容
* @param key 分区键关键字,相同key的消息发送到同一个队列中
*/
public MsgSendResult sendOrderly(@NotNull String destination, Object msg, String key) {
return selectSendWay(destination, msg, null, key, ORDERLY_SYNCHRONOUSLY);
}
/**
* 异步发送顺序消息,会阻塞等待消息发送结果
*
* @param destination 消息发送目的地(主题:标签)
* @param msg 消息内容
* @param key 分区键关键字,相同key的消息发送到同一个队列中
*/
public void asyncSendOrderly(@NotNull String destination, Object msg, String key, SendCallback callback) {
selectSendWay(destination, msg, callback, key, ORDERLY_ASYNCHRONOUSLY);
}
private MsgSendResult selectSendWay(String destination, Object msg, SendCallback callback, String key, int way) {
if (!destination.contains(":")) {
destination = destination + ":" + DEFAULT_TAG;
}
if (way == SYNCHRONOUSLY) {
return new MsgSendResult(rocketMQTemplate.syncSend(destination, msg));
} else if (way == ASYNCHRONOUSLY) {
rocketMQTemplate.asyncSend(destination, msg, callback);
} else if (way == ORDERLY_SYNCHRONOUSLY) {
return new MsgSendResult(rocketMQTemplate.syncSendOrderly(destination, msg, key));
} else if (way == ORDERLY_ASYNCHRONOUSLY) {
rocketMQTemplate.asyncSendOrderly(destination, msg, key, callback);
}
return null;
}
private MsgSendResult selectSendWay(String topic, String tag, Object msg, SendCallback callback, String key, int way) {
if (topic.contains(":")) {
// 这里其实也可以抛异常,提示topic不合法,包含':'
int i = topic.indexOf(':');
topic = topic.substring(0, i);
}
if (tag == null) {
tag = DEFAULT_TAG;
}
String destination = topic + ":" + tag;
return selectSendWay(destination, msg, callback, key, way);
}
}
消息发送结果
// 内部实现可以自定义,这里偷懒,作者只是继承RocketMQ的SendResult
public class MsgSendResult extends SendResult {
private SendResult result;
public MsgSendResult(SendResult result){}
}
2.4.消费者(接收消息)
/**
* @Name: ConsumerDemo
* @Description: 消费样例
* @Author: ahao
* @Date: 2024/4/12 7:35 PM
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = "my_topic",
consumerGroup = "my_consumer_group",
selectorType = SelectorType.TAG,
// * 表示匹配所有
selectorExpression = "*",
consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING
)
public class ConsumerDemo implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("接收到的消息:{}",message);
}
}
@RocketMQMessageListener
相关属性解析:
- topic:主题
- consumerGroup:消费分组
- selectorType:筛选方式
- SelectorType.TAG:根据TAG选择。仅支持表达式格式如:“tag1 || tag2 || tag3”,如果表达式为null或者“*”标识订阅所有消息
- SelectorType.SQL92:根据SQL92表达式选择。支持类似SQL的关键词语法 AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS,NULL
- selectorExpression:筛选条件,与selectorType关联
- consumeMode:消费模式
- ConsumeMode.CONCURRENTLY:并行处理
- ConsumeMode.ORDERLY:顺序处理
- messageModel:消息模式
- MessageModel.CLUSTERING:集群模式即负载均衡模式,每一个消息只会被某一个消费者消费一次
- MessageModel.BROADCASTING:广播模式,每个消费者都会消费消息
- consumeThreadMax:最大线程数
- consumeTimeout:消息阻塞消费线程的最长时间(以分钟为单位)
- enableMsgTrace:是否启用消息轨迹
- customizedTraceTopic:自定义的消息轨迹主题
- nameServer:命名服务器地址
- accessKey:标识用户身份的字符串(access-key)
- secretKey:密钥(secret-key)
3.实战结果
3.1.消费者服务
启动类
@Slf4j
@SpringBootApplication
public class MyApplication implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
TimeUnit.SECONDS.sleep(10000);
}
}
目录结构
3.2.生产者服务
启动类
@Slf4j
@SpringBootApplication
public class MyApplication implements ApplicationRunner {
@Resource
private MessageHelper messageHelper;
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
new Thread(() -> {
SimpleDateFormat format = new SimpleDateFormat("hh:MM:ss");
while (true){
String curDate = format.format(new Date());
try {
log.info("发送消息:{}",curDate);
messageHelper.send("my_topic", curDate);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
},"producer_test").start();
TimeUnit.SECONDS.sleep(10000);
}
}
目录结构