文章目录
- 添加 RocketMQ 依赖
- 消费者 Consumer
- YAML 配置
- 创建监听器
- 消息过滤
- Tag 过滤
- 生产者 Producer
- YAML 配置
- 发送同步消息
- 发送异步消息
- 发送单向消息
- 发送延迟消息
- 发送顺序消息
- 发送批量消息
- 发送集合消息
添加 RocketMQ 依赖
-
在 Maven 仓库【https://mvnrepository.com/】中搜索 RocketMQ 依赖:
-
在 SpringBoot 项目的 Pom.xml 文件中添加对应 MQ 版本的依赖:
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.2</version> </dependency>
消费者 Consumer
YAML 配置
在 SpringBoot 项目的 yml 配置文件中添加以下配置:
rocketmq:
name-server: 192.168.68.121:9876 # rocketMq的nameServer地址
创建监听器
创建一个 MQMsgListener 类用于监听 RocketMQ 的消息,类上标注注解:@Component
、@RocketMQMessageListener
,该类需要实现 RocketMQListener 接口,并使用泛型指定接收的消息类型:
@Component
@RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
String msgId = message.getMsgId();
String msg = new String(message.getBody());
System.out.println("消息id:"+msgId+"消息内容:"+msg);
}
}
@RocketMQMessageListener
注解参数如下:
参数 | 描述 |
---|---|
topic | 消费者订阅的主题 |
consumerGroup | 消费者组 |
consumeMode | 消费模式:并发接收消息 | 有序接收消息【ConsumeMode.CONCURRENTLY or ConsumeMode.ORDERLY 】 |
messageModel | 消息模式:集群模式 | 广播模式【MessageModel.CLUSTERING or MessageModel.BROADCASTING 】 |
selectorType | 过滤消息的方式:Tag | SQL92【SelectorType.TAG or SelectorType.SQL92 】 |
selectorExpression | 过滤消息的表达式:Tag | SQL92【`tag1 |
maxReconsumeTimes | 消息消费失败后,可被重复投递的最大次数。消息重试只针对集群消费模式生效。 |
delayLevelWhenNextConsume | 并发模式的消息重试策略。-1,无需重试,直接放入死信队列(%DLQ%+消费组) |
消息过滤
Tag 过滤
消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。
编写并启动消费者项目订阅 tagTopic 主题:
@Component
@RocketMQMessageListener(topic = "tagTopic",
consumerGroup = "boot-mq-group-consumer",
selectorType = SelectorType.TAG,
selectorExpression = "java")
public class MQMsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println(message);
}
}
编写生产者 Controller,使用 RocketMQTemplate 的 syncSend()
方法发送一个带 Tag 的同步消息:
@RestController
public class ProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send/tag")
public String sendSyncMessage() {
SendResult result = rocketMQTemplate.syncSend("tagTopic:java", "这是一个带有 java tag 的消息");
return "发送状态:" + result.getSendStatus() + "<br>消息id:" + result.getMsgId();
}
}
运行项目,访问接口:http://localhost:8080/send/tag
查看 RocketMQ 控制台,可以看到消息带有 java tag:
查看消费者项目的 IDEA 控制台:
生产者 Producer
YAML 配置
在 SpringBoot 项目的 yml 配置文件中添加以下配置:
rocketmq:
name-server: 192.168.68.121:9876 # rocketMq的nameServer地址
producer:
group: boot-mq-group-producer # 生产者组名
注:生产者需要标注生产者组名,否则会报异常:
'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.
发送同步消息
编写 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息,并将消息发送的结果进行打印:
@RestController
public class ProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send/sync/{msg}")
public String sendSyncMessage(@PathVariable String msg){
SendResult result = rocketMQTemplate.syncSend("syncTopic", msg);
return "发送状态:"+result.getSendStatus()+"<br>消息id:"+result.getMsgId();
}
}
运行项目,访问接口:http://localhost:8080/send/sync/同步消息
访问控制台,查看【syncTopic】主题,可以看到队列中存在一条消息:
发送异步消息
不同于同步消息,异步消息在发出后,并不会等待服务端返回响应,直接继续向下执行,发送方通过回调接口接收服务端响应,并处理响应结果。
编写 Controller,使用 RocketMQTemplate 的 asyncSend()
方法发送异步消息,并使用回调接口打印发送的结果:
@RestController
public class ProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send/async/{msg}")
public String sendAsyncMessage(@PathVariable String msg) {
rocketMQTemplate.asyncSend("asyncTopic", msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("异步消息发送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("异步消息发送失败");
}
});
System.out.println("异步消息已发送完成");
return "发送异步消息";
}
}
运行项目,访问接口:http://localhost:8080/send/async/异步消息,查看 IDEA 控制台:
访问控制台,查看【asyncTopic】主题,可以看到队列中存在一条消息:
发送单向消息
编写 Controller,使用 RocketMQTemplate 的 sendOneWay()
方法发送单向消息:
@RestController
public class ProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send/oneWay/{msg}")
public String sendOneWayMessage(@PathVariable String msg) {
rocketMQTemplate.sendOneWay("oneWayTopic",msg);
return "单向消息发送成功";
}
}
运行项目,访问接口:http://localhost:8080/send/oneWay/单向消息
访问控制台,查看【oneWayTopic】主题,可以看到队列中存在一条消息:
发送延迟消息
编写并启动消费者项目订阅 delayTopic 主题:
@Component
@RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
String msgId = message.getMsgId();
String msg = new String(message.getBody());
System.out.println("消息id:"+msgId+"\n消息内容:"+msg+"\n消息收到时间:"+new Date());
}
}
编写生产者 Controller,使用 RocketMQTemplate 的 syncSend()
方法发送同步消息:
@RestController
public class ProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send/delay/{msg}")
public String sendDelayMessage(@PathVariable String msg) {
Message<String> message = MessageBuilder.withPayload(msg).build();
// 延迟级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
SendResult result = rocketMQTemplate.syncSend("delayTopic", message, 2000, 3);
return "发送状态:" + result.getSendStatus() + "<br>消息id:" + result.getMsgId()+"<br>消息发送时间:"+new Date();
}
}
运行项目,访问接口:http://localhost:8080/send/delay/延迟消息
查看消费者项目的 IDEA 控制台,可以看到过去了10s,对应我们设置的延迟级别。
发送顺序消息
编写订单类,用于模拟【下订单->发短信->物流->签收】的顺序流程:
public class Order {
//订单号
private String orderId;
//订单名称
private String orderName;
//订单的流程顺序
private String seq;
}
编写并启动两个消费者项目订阅 orderlyTopic 主题,并将消费模式设置为顺序消费模式:
@Component
@RocketMQMessageListener(topic = "orderlyTopic",
consumerGroup="boot-mq-group-consumer",
consumeMode = ConsumeMode.ORDERLY)
public class MQMsgListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order message) {
System.out.println("消费者:"+message);
}
}
编写生产者 Controller,使用 RocketMQTemplate 的 syncSendOrderly()
方法发送同步顺序消息:
@RestController
public class ProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send/orderly")
public String sendOrderlyMessage() {
List<Order> orders = Arrays.asList(
new Order(UUID.randomUUID().toString(), "下订单", "1"),
new Order(UUID.randomUUID().toString(), "发短信", "1"),
new Order(UUID.randomUUID().toString(), "物流", "1"),
new Order(UUID.randomUUID().toString(), "签收", "1"),
new Order(UUID.randomUUID().toString(), "下订单", "2"),
new Order(UUID.randomUUID().toString(), "发短信", "2"),
new Order(UUID.randomUUID().toString(), "物流", "2"),
new Order(UUID.randomUUID().toString(), "签收", "2")
);
//控制流程:下订单->发短信->物流->签收
//将 seq 作为 hashKey,这样 seq 相同的会放在同一个队列里面,顺序消费
orders.forEach(order -> {
rocketMQTemplate.syncSendOrderly("orderlyTopic",order,order.getSeq());
});
return "发送成功";
}
}
运行项目,访问接口:http:localhost:8080/send/orderly
查看 RocketMQ 控制台,可以看到我们的消息分别存储在两个队列中:
查看消费者项目的 IDEA 控制台,按照消息的顺序进行消费:
发送批量消息
编写并启动消费者项目订阅 batchOrderly 主题:
@Component
@RocketMQMessageListener(topic = "batchOrderly",
consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order message) {
System.out.println(Thread.currentThread().getName()+":"+message);
}
}
编写生产者 Controller,将消息打包成 Collection<Message> msgs
传入 syncSend()
方法中发送:
@RestController
public class ProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send/batch")
public String sendOrderlyMessage() {
List<Message> messages = Arrays.asList(
MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),
MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build()
);
return rocketMQTemplate.syncSend("batchOrderly", messages).getSendStatus().toString();
}
}
运行项目,访问接口:http:localhost:8080/send/batch
查看 RocketMQ 控制台,可以看到队列中一次传入4条消息:
查看消费者项目的 IDEA 控制台,多个线程并发进行消费:
发送集合消息
编写并启动消费者项目订阅 listTopic 主题:
@Component
@RocketMQMessageListener(topic = "listTopic",
consumerGroup="boot-mq-group-consumer")
public class MQMsgListener implements RocketMQListener<List<Order>> {
@Override
public void onMessage(List<Order> orders) {
orders.forEach(o -> {
System.out.println(Thread.currentThread().getName()+":"+o);
});
}
}
编写生产者 Controller,将集合传入 syncSend()
方法中发送:
@RestController
public class ProducerController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send/list")
public String sendOrderlyMessage() {
List<Order> orders = Arrays.asList(
new Order(UUID.randomUUID().toString(), "下订单", "1"),
new Order(UUID.randomUUID().toString(), "下订单", "1"),
new Order(UUID.randomUUID().toString(), "下订单", "1"),
new Order(UUID.randomUUID().toString(), "下订单", "1")
);
rocketMQTemplate.syncSend("listTopic",orders);
return "发送成功";
}
}
运行项目,访问接口:http:localhost:8080/send/list
查看 RocketMQ 控制台,可以看到队列中一条消息:
查看消费者项目的 IDEA 控制台,进行消费: