【RocketMQ】004-Spring Boot 集成 RocketMQ
文章目录
- 【RocketMQ】004-Spring Boot 集成 RocketMQ
- 一、基本使用
- 1、创建 Spring Boot 项目,并引入 `RocketMQ` 依赖
- 2、`application.yml` 配置
- 3、消息生产者
- 4、消息消费者
- 5、消息调用接口
- 6、启动 `RocketMQ`
- 7、启动项目,并访问
- 8、启动 `rocketmq-dashboard` ,并查看
- 主题
- 消费者
- 消息
- 二、常用消息种类
- 1、常用消息种类
- 2、普通消息
- 代码示例
- 代码解释
- 单向消息
- 同步消息
- 异步消息
- 3、延时消息
- 4、顺序消息
- 5、事务消息
- 6、批量消息
- 三、参考文章
一、基本使用
1、创建 Spring Boot 项目,并引入 RocketMQ
依赖
<!--Rocket MQ-->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
2、application.yml
配置
rocketmq:
# NameServer地址
name-server: localhost:9876
# 生产者
producer:
# 发送同一类消息的设置为同一个group,保证唯一
group: springboot_producer_group
# 发送消息超时时间,默认3000
sendMessageTimeout: 10000
# 发送消息失败重试次数,默认2
retryTimesWhenSendFailed: 2
# 异步消息重试此处,默认2
retryTimesWhenSendAsyncFailed: 2
# 消息最大长度,默认1024 * 1024 * 4(默认4M)
maxMessageSize: 4096
# 压缩消息阈值,默认4k(1024 * 4)
compressMessageBodyThreshold: 4096
# 是否在内部发送失败时重试另一个broker,默认false
retryNextServer: false
# 消费者
consumer:
group: springboot_consumer_group
# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值
pull-batch-size: 10
3、消息生产者
package com.example.mq.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* mq 生产者
*
* @author zibo
* @date 2023/5/17 15:48
* @slogan 慢慢学,不要停。
*/
@Slf4j
@Service
public class MQProducerService {
// 直接注入使用,用于发送消息到 broker 服务器
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
*
* @param msg 消息可以是任何对象,如:String、Map、对象等
*/
public void send(String msg) {
// 写法一
rocketMQTemplate.convertAndSend("springboot_topic:test", msg);
// 写法二
// rocketMQTemplate.send("springboot_topic:test", MessageBuilder.withPayload(msg).build());
}
}
4、消息消费者
package com.example.mq.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* mq 消费者
*
* @author zibo
* @date 2023/5/17 15:48
* @slogan 慢慢学,不要停。
*/
@Slf4j
@Service
@RocketMQMessageListener(topic = "springboot_topic", selectorExpression = "test", consumerGroup = "springboot_consumer_group")
public class MQConsumerService implements RocketMQListener<String> {
// 监听到消息就会执行此方法
@Override
public void onMessage(String msg) {
log.info("监听到消息:msg={}", msg);
}
}
5、消息调用接口
package com.example.mq.controller;
import com.example.mq.service.MQProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* mq 接口
*
* @author zibo
* @date 2023/5/17 15:48
* @slogan 慢慢学,不要停。
*/
@RestController
@RequestMapping("/rocketmq")
public class RocketMQController {
@Autowired
private MQProducerService mqProducerService;
@GetMapping("/send")
public void send() {
mqProducerService.send("测试消息");
}
}
6、启动 RocketMQ
- 启动 NameServer :双击
mqnamesrv.cmd
启动即可! - 启动 Broker :双击
mqbroker.cmd
启动即可!
7、启动项目,并访问
http://localhost:8080/rocketmq/send
控制台日志
2023-05-17 16:04:54.126 INFO 19772 --- [onsumer_group_1] c.example.mq.service.MQConsumerService : 监听到消息:msg=测试消息
8、启动 rocketmq-dashboard
,并查看
主题
消费者
消息
二、常用消息种类
1、常用消息种类
- 普通消息(Normal Message):普通消息是最常用的消息类型,一旦发送就会立即被投递给消费者进行消费。
- 延时消息(Delay Message):延时消息是指发送后延迟一段时间后再投递给消费者。你可以指定延时级别,例如延迟 10 秒、1 分钟、1 小时等。
- 顺序消息(Orderly Message):顺序消息是指保证消息按照发送的顺序被消费的消息类型。你可以为同一消息队列中的消息指定相同的消息队列选择器(Message Queue Selector),从而保证消息按照发送顺序被消费。
- 事务消息(Transaction Message):事务消息是指将消息发送与本地事务操作相结合,可以保证消息和本地事务的最终一致性。发送事务消息时,你需要实现事务监听器(Transaction Listener)来执行本地事务和提交事务状态。
- 批量消息(Batch Message):批量消息是一次发送多条消息的方式,可以减少网络开销和提高消息吞吐量。你可以将多个消息封装成一个消息列表,然后使用批量发送方法一次性发送。
2、普通消息
代码示例
rocketMQTemplate.convertAndSend("springboot_topic:test", msg);
代码解释
rocketMQTemplate
:是 RocketMQ 提供的用于发送消息的模板类,需要在 Spring Boot 中配置和注入。- convertAndSend方法:是 RocketMQTemplate 类的方法,用于将消息对象转换并发送消息。它接受两个参数:
"springboot_topic:test"
:是要发送消息的目标主题和标签。在这个示例中,主题是"springboot_topic"
,标签是"test"
。你可以根据实际情况修改主题和标签。msg
:是要发送的消息内容。它可以是字符串、对象或其他数据类型。RocketMQTemplate
会根据消息内容的类型进行转换。
单向消息
/**
* 发送单向消息
*
* @param msg 消息可以是任何对象,如:String、Map、对象等
*/
public void sendOneWay(String msg) {
rocketMQTemplate.sendOneWay("springboot_topic:test", msg);
}
同步消息
/**
* 发送同步消息
*
* @param msg 消息可以是任何对象,如:String、Map、对象等
*/
public SendResult sendSync(String msg) {
SendResult result = rocketMQTemplate.syncSend("springboot_topic:test", msg);
log.info("发送结果:{}", result);
return result;
}
异步消息
/**
* 发送异步消息
*
* @param msg 消息可以是任何对象,如:String、Map、对象等
*/
public void sendAsync(String msg) {
rocketMQTemplate.asyncSend("springboot_topic:test", msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送成功");
}
@Override
public void onException(Throwable throwable) {
log.info("发送失败");
}
});
}
3、延时消息
/**
* 发送延时消息
*
* @param msg 消息可以是任何对象,如:String、Map、对象等
*/
public void sendDelay(String msg) {
rocketMQTemplate.syncSendDelayTimeSeconds("springboot_topic:test", msg, 5);
}
4、顺序消息
/**
* 发送顺序消息
*
* @param msg 消息可以是任何对象,如:String、Map、对象等
*/
public void sendOrderly(String msg) {
// 第一条
rocketMQTemplate.syncSendOrderly("springboot_topic:test", msg, "1");
// 第二条
rocketMQTemplate.syncSendOrderly("springboot_topic:test", msg, "2");
// 第三条
rocketMQTemplate.syncSendOrderly("springboot_topic:test", msg, "3");
}
5、事务消息
略
6、批量消息
/**
* 发送批量消息
*
* @param msgList 消息列表
*/
public void sendBatch(List<String> msgList) {
List<Message<String>> rocketMQMessages = new ArrayList<>();
for (String msg : msgList) {
rocketMQMessages.add(MessageBuilder.withPayload(msg).build());
}
rocketMQTemplate.syncSend("springboot_topic:test", rocketMQMessages);
}
三、参考文章
SpringBoot整合RocketMQ,老鸟们都是这么玩的!
https://juejin.cn/post/7220075270664405052