RocketMQ(二十四)整合SpringBoot
SpringBoot整合rabbitmq使用案例
- 一 SpringBoot整合RocketMQ实现消息发送和接收
- 消息生产者
- 1)添加依赖
- 2)配置文件
- 3)启动类
- 4)测试类
- 消息消费者
- 1)添加依赖
- 2)配置文件
- 3)启动类
- 4)测试类
- 5)RocketMQMessageListener参数
- 测试
- RocketMQ发送同步消息
- 同步消API介绍
- RocketMQ发送异步消息
- 异步消息API介绍
- RocketMQ发送单向消息
- RocketMQ消费者广播模式和负载均衡模式
- RocketMQ实现顺序消息
- RocketMQ实现延迟消息
一 SpringBoot整合RocketMQ实现消息发送和接收
消息生产者
1)添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
2)配置文件
server:
port: 8081
servlet:
context-path: /
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: producer-demo1
3)启动类
@SpringBootApplication
public class RocketmqProducerApplication {
public static void main(String[] args) {
ConfigurableApplicationContext run = SpringApplication.run(RocketmqProducerApplication.class, args);
}
}
4)测试类
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {RocketmqProducerApplication.class})
public class ProducerTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void ConvertAndSendTest() {
rocketMQTemplate.convertAndSend("springboot-mq", "hello springboot rocketmq");
}
}
消息消费者
1)添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
2)配置文件
server:
port: 8084
servlet:
context-path: /
rocketmq:
name-server: 127.0.0.1:9876
consumer:
group: consumer-demo1
3)启动类
@SpringBootApplication
public class RocketmqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqConsumerApplication.class, args);
}
}
4)测试类
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "${rocketmq.consumer.group}")
@Component
public class ConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到消息内容:"+message);
}
}
5)RocketMQMessageListener参数
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
/**
* 消费者分组
*
* @return
*/
String consumerGroup();
/**
* 主题
*/
String topic();
/**
* selectorType:消息选择器类型
* - SelectorType.TAG:默认值,根据TAG选择,仅支持表达式格式如:“tag1 || tag2 || tag3”,如果表达式为null或者“*”标识订阅所有消息
* - SelectorType.SQL92:根据SQL92表达式选择
*/
SelectorType selectorType() default SelectorType.TAG;
/**
* selectorType 对应的表达式
*/
String selectorExpression() default "*";
/**
* consumeMode:消费模式
* - ConsumeMode.CONCURRENTLY:默认值,并行处理
* - ConsumeMode.ORDERLY:按顺序处理
*/
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
/**
* messageMode:消息模型
* - MessageModel.CLUSTERING:默认值,集群
* - MessageModel.BROADCASTING:广播
*/
MessageModel messageModel() default MessageModel.CLUSTERING;
/**
* 最大线程数,默认值 64
*/
int consumeThreadMax() default 64;
/**
* 消费失败,最大重试次数
* <p>
* - 在并发模式中,-1表示16
* - 在有序模式中,-1表示整数最大值
*/
int maxReconsumeTimes() default -1;
/**
* 消息可能阻止使用线程的最长时间(分钟)
*/
long consumeTimeout() default 15L;
/**
* 发送回复消息超时
*/
int replyTimeout() default 3000;
/**
* 默认值 ${rocketmq.consumer.access-key:}
*/
String accessKey() default ACCESS_KEY_PLACEHOLDER;
/**
* 默认值 ${rocketmq.consumer.secret-key:}
*/
String secretKey() default SECRET_KEY_PLACEHOLDER;
/**
* 启用消息轨迹,默认值 false
*/
boolean enableMsgTrace() default false;
/**
* 自定义的消息轨迹主题,默认值${rocketmq.consumer.customized-trace-topic:}
* 没有配置此配置项则使用默认的主题
*/
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
/**
* 命名服务器地址,默认值${rocketmq.name-server:}
*/
String nameServer() default NAME_SERVER_PLACEHOLDER;
/**
* 默认值${rocketmq.access-channel:}
*/
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
测试
运行生产者中ConvertAndSendTest测试方式,观察消费者监听器日志
RocketMQ发送同步消息
发送同步消息是指producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结
果;
相对异步发送消息,同步会阻塞线程,性能相对差点,但是可靠性高,这种方式得到广泛使用,比如:
短信通知,邮件通知,站内重要信息通知等。
RocketMQTemplate 给我们提供了syncSend方法(有多个重载),来实现发送同步消息;
下面给一个实例:
/**
* 发送同步消息
*/
@Test
public void sendSyncMessageTest() {
for (int i = 0; i < 10; i++) {
SendResult sendResult = rocketMQTemplate.syncSend("springboot-mq", "rocketmq同步消息!" + i);
System.out.println(sendResult);
}
}
这里执行完发送同步消息返回执行结果 SendResult ;
同步消API介绍
//发送普通同步消息-Object
syncSend(String destination, Object payload)
//发送普通同步消息-Message
syncSend(String destination, Message<?> message)
//发送批量普通同步消息
syncSend(String destination, Collection<T> messages)
//发送普通同步消息-Object,并设置发送超时时间
syncSend(String destination, Object payload, long timeout)
//发送普通同步消息-Message,并设置发送超时时间
syncSend(String destination, Message<?> message, long timeout)
//发送批量普通同步消息,并设置发送超时时间
syncSend(String destination, Collection<T> messages, long timeout)
//发送普通同步延迟消息,并设置超时
syncSend(String destination, Message<?> message, long timeout, int delayLevel)
/**
* 批量消息
*/
@Test
void asyncSendBatch() {
Message<String> msg = MessageBuilder.withPayload("hello world test1").build();
List<Message> msgList = Arrays.asList(msg,msg,msg,msg,msg);
SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", msgList);
log.info("批量消息");
}
RocketMQ发送异步消息
发送异步消息是指producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API
后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行
。
相对发送同步消息,异步消息性能更高,可靠性略差。适合对响应时间要求高的业务场景。
RocketMQTemplate 给我们提供了asyncSend方法(有多个重载),来实现发送异步消息;
/**
* 异步消息-String
* 指发送方发出数据后,不等接收方发回响应,接着发送下个数据包
* 关键实现异步发送回调接口(SendCallback)
* 在执行消息的异步发送时应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理
* 这种方式任然需要返回发送消息任务的执行结果,异步不影响后续任务,不会造成阻塞
*/
@Test
public void sendAsyncMessage() {
for (int i = 0; i < 10; i++) {
rocketMQTemplate.asyncSend("springboot-mq", "rocketmg异步消息!" + i,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功!");
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败!");
}
});
}
}
类似发送同步消息,多了一个SendCallback回调接口参数,实现onSuccess和onException方法,分别
表示异步发送成功和失败;
异步消息API介绍
//发送普通异步消息-Object
asyncSend(String destination, Object payload, SendCallback sendCallback)
//发送普通异步消息-Message
asyncSend(String destination, Message<?> message, SendCallback sendCallback)
//发送普通异步消息-Object,并设置发送超时时间
asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)
//发送普通异步消息-Message,并设置发送超时时间
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout)
//发送普通异步延迟消息,并设置超时
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel)
RocketMQ发送单向消息
发送单向消息是 指producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果
这种方式主要用在不特别关心发送结果的场景,举例:日志发送;
RocketMQTemplate 给我们提供了sendOneWay方法(有多个重载),来实现发送单向消息;
下面给一个实例:
/**
* 单向消息
* 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
* 此方式发送消息的过程耗时非常短,一般在微秒级别
* 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
*/
@Test
public void sendOneWayMessage() {
for (int i = 0; i < 10; i++) {
rocketMQTemplate.sendOneWay("springboot-mq", "rocketmg单向消息!" + i);
}
}
RocketMQ消费者广播模式和负载均衡模式
如上图,假如我们有多个消费者,消息生产者发送的消息,是每一个消费者都消费一次呢?还是通过一
些机制,比如轮询机制,每个消息只被某一个消费者消费一次呢?
这里涉及到消费者的消费模式,一种是广播模式,还有一种是负载均衡模式;
广播模式
是每个消费者,都会消费消息;负载均衡模式
是每一个消息只会被某一个消费者消费一次;
我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮
箱,手机,站内提示;
我们可以通过 @RocketMQMessageListener
的 messageModel
属性值来设置,
MessageModel.BROADCASTING 是广播模式
, MessageModel.CLUSTERING 是默认集群负载均衡模式
;
我们先集群负载均衡测试,加上 messageModel=MessageModel.CLUSTERING
/**
* @ClassName: ConsumerService
* @Description: 消息消费者
* @Author wxl
* @Date 2024-04-22
* @Version 1.0.0
**/
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "${rocketmq.consumer.group}",messageModel = MessageModel.CLUSTERING)
@Component
public class ConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到消息内容:"+message);
}
}
RocketMQ实现顺序消息
rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;
有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完成等操作,需要顺序执行;
RocketMQTemplate 给我们提供了SendOrderly方法(有多个重载),来实现发送顺序消息;包括以下:
syncSendOrderly
,发送同步顺序消息;
asyncSendOrderly
,发送异步顺序消息;
sendOneWayOrderly
,发送单向顺序消息;
一般我们用第一种发送同步顺序消息;
第三个参数hashKe
,方法点进去:
因为broker会管理多个消息队列,这个hashKey参数,主要用来计算选择队列的,一般可以把订单ID,
产品ID作为参数值;
发送到一个队列,这样方便搞顺序队列;
以及消费端接收的时候,默认是并发多线程去接收消息。RocketMQMessageListener有个属性
consumeMode,默认是ConsumeMode.CONCURRENTLY ,我们要改成ConsumeMode.ORDERLY,
单线程顺序接收消息;
下面给具体事例,模拟两个订单发送消息;
消息生产者端:
/**
* 发送同步顺序消息
*/
public void sendOrderlyMessage() {
// hashKey用来计算决定消息发送到哪个消息队列 一般是订单ID,产品ID等
rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,创建", "98456231");
rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,支付", "98456231");
rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,完成", "98456231");
rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,创建", "98456232");
rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,支付", "98456232");
rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,完成", "98456232");
}
消费者端:
@RocketMQMessageListener(topic = "java1234-rocketmq-orderly",
consumerGroup = "${rocketmq.consumer.group}", consumeMode = ConsumeMode.ORDERLY)
@Component
public class ConsumerService implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("消费者:收到消息内容:" + s);
}
}
运行测试:
RocketMQ实现延迟消息
延迟消息
对于消息中间件来说,producer 将消息发送到mq的服务器上,但并不希望这条消息马上被消费,
而是推迟到当前时间节点之后的某个时间点,再将消息投递到 queue 中让 consumer 进行消费。
延迟消息的使用场景很多,一种比较常见的场景就是在电商系统中,订单创建后,会有一个等待用
户支付的时间窗口,一般为30分钟,30分钟后 customer 会收到这条订单消息,然后程序去订单表中检
查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉,这样就不需要使用定时任务的方式
去处理了。
Rocket的延迟消息
RocketMQ 支持定时的延迟消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s,
10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类
推。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
我们会发现,所有消息发送方法都有一个带int类型的delayLevel参数重载方法,这个就是设置延迟消息
级别的参数。
同时注意,每个带delayLevel参数的方法,也同时带有long类型的timeout参数,这个是设置消息发送超
时时间,默认是3秒,我们也可以自行设置;
同时还有 Message参数,如果发送这种类型的消息,可以携带具体的消息数据;
/**
* 发送延迟消息
*/
@Test
public void sendDelayMessage() {
rocketMQTemplate.syncSend("java1234-rocketmq", MessageBuilder.withPayload("rocketmq延迟1秒消息").build(), 3000, 1);
rocketMQTemplate.syncSend("java1234-rocketmq", MessageBuilder.withPayload("rocketmq延迟5秒消息").build(), 3000, 2);
rocketMQTemplate.syncSend("java1234-rocketmq", MessageBuilder.withPayload("rocketmq延迟10秒消息").build(), 3000, 3);
}
运行测试: