在现代分布式架构的开发中,消息队列扮演着至关重要的角色,用于解耦系统组件、保障可靠性以及实现异步通信。RocketMQ作为一款开源的分布式消息中间件,凭借其高性能、高可用性和良好的扩展性,成为了众多企业在构建高可靠性、高吞吐量应用系统时的首选。
对于Spring Cloud Alibaba的用户来说,集成RocketMQ并进行消息的发送与消费是常见的任务。本篇博客将深入介绍RocketMQ的基础使用方法,带大家一步步学习如何在Spring Cloud Alibaba中发送和消费消息。
在开始之前,确保已经完成了以下准备工作:
- 安装RocketMQ:确保已经在系统中成功安装了RocketMQ,并启动了相关服务。教程可以查看我上一篇博客
【Spring Cloud Alibaba】Linux安装RocketMQ以及RocketMQ Dashboard可视化工具 - JDK:安装了JDK 1.8及以上版本,以便于运行Java应用程序。
文章目录
- 🎺 第一步,搭建rocketmq项目环境
- 🎺 第二步,生产者代码
- 🎺普通消息
- 🎺普通消息发送
- 🎺同步发送
- 🎺异步发送
- 🎺单向模式发送
- 🎺 普通消息接收
- 🎺顺序消息
- 🎺顺序消息发送
- 🎺顺序消息接收
- 🎺 延迟消息
- 🎺 延迟消息发送
- 🎺延时消息接收
- 🎺批量消息
- 🎺批量消息发送
- 🎺批量消息接收
- 🎺 事务消息
- 🎺 事务消息发送
- 在实际中遇到的问题
rocketmq官网地址:
https://rocketmq.apache.org/zh/docs/4.x/
这里我们讲的是4.x的版本
🎺 第一步,搭建rocketmq项目环境
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
我们使用的是rocketmq-spring-boot-starter2.2.1,其中的rocketmq版本为4.9.1
🎺 第二步,生产者代码
🎺普通消息
🎺普通消息发送
🎺同步发送
同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
/**
* 普通消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/02message1#31-%E5%90%8C%E6%AD%A5%E5%8F%91%E9%80%81
*
* @return
*/
@GetMapping("/syncSend")
public SendResult syncSend(String message) {
Message<String> stringMessage = MessageBuilder.createMessage(message, new MessageHeaders(null));
return rocketMQTemplate.syncSend("my-topic:*", stringMessage);
}
🎺异步发送
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
/**
* 普通消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/02message1#32-%E5%BC%82%E6%AD%A5%E5%8F%91%E9%80%81
*
* @return
*/
@GetMapping("/asyncSend")
public String asyncSend(String message) {
Message<String> stringMessage = MessageBuilder.createMessage(message, new MessageHeaders(null));
rocketMQTemplate.asyncSend("my-topic:*", stringMessage, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步发送服务器返回信息成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("异步发送服务器返回信息失败");
}
});
return "异步发送成功";
}
🎺单向模式发送
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
/**
* 普通消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/02message1#33-%E5%8D%95%E5%90%91%E6%A8%A1%E5%BC%8F%E5%8F%91%E9%80%81
*
* @return
*/
@GetMapping("/sendOneway")
public String sendOneway(String message) {
Message<String> stringMessage = MessageBuilder.createMessage(message, new MessageHeaders(null));
rocketMQTemplate.sendOneWay("my-topic:*", stringMessage);
return "发送成功";
}
🎺 普通消息接收
MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。
Push
是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
Pull
是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
Apache RocketMQ既提供了Push模式也提供了Pull模式。
该博客中所有消费者都使用默认的push模式
@Component
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-group", consumeMode = ConsumeMode.CONCURRENTLY)
public class NormalRocketMQListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message:"+message);
}
}
messageModel = MessageModel.BROADCASTING
即广播消息,每个消费者都会去消费
但是即使都消费了,但是trackType都会显示NOT_CONSUME_YET
🎺顺序消息
🎺顺序消息发送
顺序消息是一种对消息发送和消费顺序有严格要求的消息。
对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费
/**
* 顺序消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/03message2
*
* @return
*/
@GetMapping("/syncSendOrderly")
public SendResult syncSendOrderly(String message) {
String[] split = message.split(",");
List<Message<String>> list = new ArrayList<>();
for (String mes : split) {
list.add(MessageBuilder.createMessage(mes, new MessageHeaders(null)));
}
return rocketMQTemplate.syncSendOrderly("order-topic:*", list, String.valueOf(System.currentTimeMillis()));
}
🎺顺序消息接收
消费者这里要设置consumeMode = ConsumeMode.ORDERLY
才能实现顺序接收
@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-topic", consumeMode = ConsumeMode.ORDERLY)
public class OrderRocketMQListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message:"+message);
}
}
🎺 延迟消息
🎺 延迟消息发送
延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。
/**
* 延迟消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/04message3
*
* @return
*/
@GetMapping("/send")
public SendResult send(String message) {
Message<String> stringMessage = MessageBuilder.createMessage(message, new MessageHeaders(null));
return rocketMQTemplate.syncSend("delay-topic:*", stringMessage,1000,2);
}
🎺延时消息接收
@Component
@RocketMQMessageListener(topic = "delay-topic", consumerGroup = "delay-topic")
public class DelayRocketMQListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message:"+message);
}
}
🎺批量消息
🎺批量消息发送
在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。
/**
* 批量消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/05message4
*
* @return
*/
@GetMapping("/send")
public SendResult send(String message) {
String[] split = message.split(",");
List<Message<String>> list = new ArrayList<>();
for (String mes : split) {
list.add(MessageBuilder.createMessage(mes, new MessageHeaders(null)));
}
return rocketMQTemplate.syncSend("list-topic:*", list);
}
🎺批量消息接收
@Component
@RocketMQMessageListener(topic = "list-topic", consumerGroup = "list-topic")
public class ListRocketMQListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Received message:"+message);
}
}
🎺 事务消息
🎺 事务消息发送
/**
* 事务消息发送 https://rocketmq.apache.org/zh/docs/4.x/producer/06message5
*
* @return
*/
@GetMapping("/send")
public SendResult send(String message) {
Message<String> message1 = MessageBuilder.createMessage(message, new MessageHeaders(null));
return rocketMQTemplate.sendMessageInTransaction("transA-topic:*", message1, null);
}
rocketmq-springboot 提供了一个注解@RocketMQTransactionListener
使用方法:实现RocketMQLocalTransactionListener
接口,并且类上加注解@RocketMQTransactionListener
@RocketMQTransactionListener
public class TopicATransactionalMessageService implements RocketMQLocalTransactionListener {
private Map<String, RocketMQTransactionStrategy> strategyMap;
//策略模式
@Autowired
public TopicATransactionalMessageService(TopicAStrategy topicAStrategy,TopicBStrategy topicBStrategy) {
strategyMap = new HashMap<>();
strategyMap.put("transA-topic", topicAStrategy);
strategyMap.put("TopicB", topicBStrategy);
// ...
}
/**
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String topic = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TOPIC).toString();
RocketMQTransactionStrategy strategy = strategyMap.get(topic);
if (strategy == null) {
// 如果没有对应的策略,可以抛出异常或者返回一个默认的事务状态
}
return strategy.executeLocalTransaction(msg, arg);
}
/**
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String topic = msg.getHeaders().get(RocketMQHeaders.PREFIX+RocketMQHeaders.TOPIC).toString();
RocketMQTransactionStrategy strategy = strategyMap.get(topic);
if (strategy == null) {
// 如果没有对应的策略,可以抛出异常或者返回一个默认的事务状态
}
return strategy.checkLocalTransaction(msg);
}
}
这里可以使用策略模式来实现对每个topic的自定义策略
每个topic处理类需要实现RocketMQTransactionStrategy
接口
@Service
public class TopicAStrategy implements RocketMQTransactionStrategy {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TOPIC);
Object payload = msg.getPayload();
String mes = new String((byte[]) payload);
if (mes.equals("1")) {
return RocketMQLocalTransactionState.COMMIT;
} else if (mes.equals("2")) {
return RocketMQLocalTransactionState.ROLLBACK;
} else {
return RocketMQLocalTransactionState.UNKNOWN;
}
// ...
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
return RocketMQLocalTransactionState.COMMIT;
// ...
}
}
这样发送不同的topic都会有不同的处理策略
更多消息可查看官网
https://rocketmq.apache.org/zh/docs/4.x/introduction/02quickstart/
在实际中遇到的问题
-
在
conf/broker.conf
中配置autoCreateTopicEnable=true
,如果没有对应的topic,则会在生产者有消息发送到mq的时候自动创建对应的topic
如果不配置该属性,且开始没有topic的时候生产者发送消息到topic会报错org.apache.rocketmq.client.exception.MQBrokerException: CODE: 17 DESC: topic[delay-topic] not exist, apply first please!
-
不管有没有配置autoCreateTopicEnable=true,都会出现以下的情况:
添加了消费者注解,如@RocketMQMessageListener(topic = "delay-topic", consumerGroup = "delay-topic")
,程序会自动创建一个名称为%RETRY%delay-topic的topic
如果没有对应的topic,则会一直报错org.apache.rocketmq.client.exception.MQClientException: CODE: 17 DESC: No topic route info in name server for the topic: delay-topic
解决方法:
在mq中手动添加对应的topic即可 -
如果你修改了autoCreateTopicEnable=true没有效果,删除rocketmq存储数据的文件夹store即可,默认存放位置/root/store
博客中涉及到的仓库地址:
https://gitee.com/WangFuGui-Ma/spring-cloud-alibaba