目录
一、MQ简介
(一)什么是MQ
(二)MQ的应用场景
1、异步解耦
2、流量削峰
(三)常见的MQ产品
二、RocketMQ入门
(一)RocketMQ安装部署
1、环境要求
2、下载RocketMQ
3、安装RocketMQ
4、启动RocketMQ
5、测试RocketMQ
6、关闭RocketMQ
(二)RocketMQ控制台安装与启动
下载并解压
三、springcloud集成rocketmq
(一)产品微服务-发送消息
1、pom添加依赖
2、application.yml配置
3、controller发送消息
(二)用户微服务-订阅消息
1、pom添加依赖
2、application.yml配置
3、消息接收服务
4、测试
(三)控制台
四、不同类型的消息发送与接收
(一)普通消息
1、可靠同步发送(sync)
2、可靠异步发送(async)
3、单向发送(oneway)
4、三种发送方式的对比
5、发消息代码案例
(二)顺序消息
(三)广播模式
(四)延时消息
(五)批量消息
(六)过滤消息
(七)事务消息
(八)消息消费要注意的细节
一、MQ简介
(一)什么是MQ
MQ(Message Queue <消息队列>)是一种跨进程的通信机制,用于传递消息。通俗点说,就是一个先进先出的数据结构。
(二)MQ的应用场景
1、异步解耦
最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法如下:
此架构下注册、邮件、短信三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。 但是对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,而后续的注册短信和邮件不是即时需要关注的步骤。
所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的消息队列 MQ 中然后马上返回用户结果,由消息队列 MQ 异步地进行这些操作。架构图如下:
异步解耦是消息队列 MQ 的主要特点,主要目的是减少请求响应时间和解耦。主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时,由于使用了消息队列MQ,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合。
2、流量削峰
流量削峰也是消息队列 MQ 的常用场景,一般在秒杀或团队抢购(高并发)活动中使用广泛。在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入消息队列 MQ。
秒杀处理流程如下所述:
-
用户发起海量秒杀请求到秒杀业务处理系统。
-
秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送至消息队列 MQ。
-
下游的通知系统订阅消息队列 MQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
-
用户收到秒杀成功的通知。
(三)常见的MQ产品
ZeroMQ、RabbitMQ、ActiveMQ、RocketMQ、Kafka
二、RocketMQ入门
RocketMQ是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转。
(一)RocketMQ安装部署
接下来我们先在linux平台下安装一个RocketMQ的服务
1、环境要求
-
Linux 64位操作系统
-
64bit JDK 1.8+
2、下载RocketMQ
Release Notes - Apache RocketMQ - Version 4.9.5 | RocketMQ
下载时要注意与springcloudalibaba版本匹配
版本说名链接地址
3、安装RocketMQ
-
上传文件到Linux系统
-
解压到安装目录
[root@bogon RocketMQ]# unzip rocketmq-all-4.9.5-bin-release.zip [root@bogon RocketMQ]# ll total 32136 drwxr-xr-x. 6 root root 103 Mar 27 14:47 rocketmq-all-4.9.5-bin-release -rw-r--r--. 1 root root 32906177 May 24 10:22 rocketmq-all-4.9.5-bin-release.zip
-
修改RocketMQ启动配置
bin 下的 3 个配置文件不然会报insufficient memory:
1)runserver.sh
vi runserver.sh # JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
2)runbroker.sh
vi runbroker.sh # JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g" JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
3)tools.sh
vi tools.sh # JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m" JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
-
开启自动动创建Topic功能 在conf/broker.conf⽂件中加⼊如下配置,开启自动动创建Topic功能。
autoCreateTopicEnable=true
4、启动RocketMQ
-
启动NameServer 执行命令启动NameServer
## 创建日志目录 cd bin mkdir logs # nohup ./mqnamesrv &:属于后台以静默⽅式启动 # ./mqnamesrv:属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出 nohup ./mqnamesrv > logs/mqnamesrv.out 2>1 &
查看启动状态,在当前目录下会有一个nohup.out的日志文件,可以打开查看。
## 查看日志 tail -f logs/mqnamesrv.out ## 看到以下表示启动成功 The Name Server boot success. serializeType=JSON
解决报错
## 报错 ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !! ## 解决 配置jdk环境变量 # 需要 export 环境变量
-
启动Broker
同样进入 RocketMQ 安装目录下的 /bin目录进行操作 执行启动命令,并且常驻内存,注意ip地址要配置成为服务的ip地址,保证地址以及端口能够访问。
# 启动命令,并且常驻内存:注意ip地址要配置成为服务的ip地址,保证地址以及端口能够访问 # nohup ./mqbroker -n 192.168.109.149:9876 & :属于后台以静默⽅式启动 # sh ./mqbroker -n 92.168.109.149:9876 :属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出 nohup ./mqbroker -n 192.168.109.149:9876 > logs/mqbroker.out 2>1 &
查看启动状态,启动之后同样提示将日志信息追加到了当前目录下的nohup.out文件中。
## 查看日志 tail -f logs/mqbroker.out ## 看到以下表示启动成功 The broker[linux1, 192.168.109.149:10911] boot success. serializeType=JSON and name server is 192.168.109.149:9876
解决没反应:
#删除/root/store/* cd /root/store rm -rf * # 重新启动broker nohup ./mqbroker -n 192.168.109.149:9876 > logs/mqbroker.out 2>1 &
5、测试RocketMQ
发送/接收消息之前,需要告诉客户端(Producer、Consumer)名称服务器的位置,RocketMQ 提供了多种方法来实现这一点.
编程方式,如:producer.setNamesrvAddr(“ip:port”) Java 选项,如:rocketmq.namesrv.addr 环境变量,如:NAMESRV_ADDR HTTP 端点
-
测试消息发送
[root@linux1 rocketmq]# export NAMESRV_ADDR=localhost:9876 [root@linux1 rocketmq]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer OpenJDK 64-Bit Server VM warning: MaxNewSize (262144k) is equal to or greater than the entire heap (262144k). A new max generation size of 261632k will be used. 16:19:05.806 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. SendResult [sendStatus=SEND_OK, msgId=AC11000176396FF3C5B512F379FA0000, offsetMsgId=AC11000100002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=linux1, queueId=3], queueOffset=0] ...... SendResult [sendStatus=SEND_OK, msgId=AC11000176396FF3C5B512F382B603E7, offsetMsgId=AC11000100002A9F00000000000317BF, messageQueue=MessageQueue [topic=TopicTest, brokerName=linux1, queueId=2], queueOffset=249] 16:19:08.609 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[172.17.0.1:10911] result: true 16:19:08.631 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
-
测试消息接收
[root@linux1 rocketmq]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer OpenJDK 64-Bit Server VM warning: MaxNewSize (262144k) is equal to or greater than the entire heap (262144k). A new max generation size of 261632k will be used. 16:21:15.395 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework Consumer Started. ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=linux1, queueId=2, storeSize=201, queueOffset=1, sysFlag=0, bornTimestamp=1659601146477, bornHost=/192.168.0.101:48216, storeTimestamp=1659601146478, storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F000000000000057F, commitLogOffset=1407, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1659601275866, UNIQ_KEY=AC11000176396FF3C5B512F37A6D0007, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='null'}]] ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=linux1, queueId=2, storeSize=202, queueOffset=2, sysFlag=0, bornTimestamp=1659601146500, bornHost=/192.168.0.101:48216, storeTimestamp=1659601146501, storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F00000000000008A4, commitLogOffset=2212, bodyCRC=2088767104, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1659601275867, UNIQ_KEY=AC11000176396FF3C5B512F37A84000B, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 49], transactionId='null'}]]
消息发送完毕之后就会退出,在同一窗口中可以使用消费者类来进行接收消息,消费是多线程的。
6、关闭RocketMQ
与启动顺序相反进行关闭,先关闭 broker、在关闭 nameserv
./mqshutdown broker
./mqshutdown namesrv
(二)RocketMQ控制台安装与启动
下载并解压
下载地址:Tags · apache/rocketmq-externals · GitHub
下载rocketmq-console-1.0.0:https://codeload.github.com/apache/rocketmq-externals/zip/refs/tags/rocketmq-console-1.0.0
2、将压缩包放到你平时放项目的文件夹中,解压到当前文件夹
3、点进去,选中rocketmq-console
4、进入idea,找到解压的文件夹
5、选中下面 rocketmq-console ,点击ok,打开
6、然后点击 Trust Object(可信的实体),再点击建一个新的窗口,进去之后找到配置文件:按图修改
7、运行启动类,访问访问控制台 http://127.0.0.1:8080
三、springcloud集成rocketmq
接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:
(一)产品微服务-发送消息
1、pom添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
2、application.yml配置
rocketmq:
name-server: 虚拟机ip.XXX.XXX.XXX:9876
producer:
group: product-server # 生产者消息分组
3、controller发送消息
@RestController
@RequestMapping("/mq")
public class RocketMQController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send")
public void send(){
rocketMQTemplate.convertAndSend("product-topic", "hello world from repository!!");
}
}
(二)用户微服务-订阅消息
1、pom添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
2、application.yml配置
rocketmq:
name-server: 虚拟机ip.XXX.XXX.XXX:9876
3、消息接收服务
//@RocketMQMessageListener 注解用于配置一个消息监听器,它包含以下两个重要的参数
//topic:指定监听器要消费哪个主题的消息。一个主题可以有多个消费者组订阅。
//consumerGroup:指定监听器属于哪个消费者组。一个消费者组内的消费者会负责消费主题的一部分消息,实现负载均衡。
@Service
@RocketMQMessageListener(consumerGroup = "product-server", topic = "product-topic")
public class SmsService implements RocketMQListener<String> {
private Logger logger = LoggerFactory.getLogger("user-server");
@Override
public void onMessage(String s) {
logger.info("收到一个订单信息{},接下来发送短信"+s);
}
}
4、测试
调用产品微服务的接口发送消息,在用户微服务的控制台查看接收到的消息即可。
(三)控制台
在控制台可以查看到主题,可以在控制台发送消息:
然后idea中user模块控制台可以接收到消息。
四、不同类型的消息发送与接收
(一)普通消息
RocketMQ提供三种方式来发送普通消息:可靠同步发送、可靠异步发送和单向发送。
1、可靠同步发送(sync)
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。 此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等可靠性要求高的的数据或者需要实时响应的数据。
这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。 发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。
2、可靠异步发送(async)
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
同同步发送一样,异步模式也在内部实现了重试机制,默认次数为2次(
DefaultMQProducer#getRetryTimesWhenSendAsyncFailed}
)。发送的结果同样存在同一个消息可能被多次发送给给broker,需要应用的开发者自己在消费端处理幂等性问题。
3、单向发送(oneway)
单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
4、三种发送方式的对比
在实际使用场景中,利用何种发送方式,可以总结如下:
-
当发送的消息很重要,且对响应时间不敏感的时候采用
sync
方式; -
当发送的消息很重要,且对响应时间非常敏感的时候采用
async
方式; -
当发送的消息不重要,采用
one-way
方式,以提高吞吐量;
5、发消息代码案例
product模块
@Slf4j
@RestController
@RequestMapping("/mq")
public class RocketMQController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 同步消息
@GetMapping("/testSyncSend")
public void testSyncSend(){
//参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
//参数二: 消息内容
SendResult sendResult = rocketMQTemplate.syncSend("product-topic", "这是一条同步消息");
log.info(sendResult.toString());
}
// 异步消息
@GetMapping("/testASyncSend")
public void testASyncSend(){
//参数一: topic, 如果想添加tag 可以使用"topic:tag"的写法
//参数二: 消息内容
//参数三: 回调函数, 处理返回结果
rocketMQTemplate.asyncSend("product-topic", "这是一条异步消息", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info(sendResult.toString());
}
@Override
public void onException(Throwable throwable) {
log.info(throwable.toString());
}
});
//因为是异步发送,所以让线程不要终止以便测试
try {
Thread.sleep(60000);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// 单向消息
@GetMapping("/testOneWay")
public void testOneWay(){
rocketMQTemplate.sendOneWay("product-topic", "这是一条单向消息");
}
}
接收异步、同步消息可以使用RocketMQ的消息监听器。通过实现RocketMQListener接口来监听指定Topic上的消息,异步处理消息时不需要等待,示例代码如下:
user模块
@Service
@RocketMQMessageListener(consumerGroup = "product-server", topic = "product-topic")
public class SmsService implements RocketMQListener<String> {
private Logger logger = LoggerFactory.getLogger("user-server");
@Override
public void onMessage(String s) {
logger.info("收到一个产品信息{},接下来发送短信"+s);
}
}
在上面的示例中,通过使用@RocketMQMessageListener注解指定了Topic和Consumer Group,消息被接收到后会自动调用onMessage方法进行处理。
需要注意的是,在异步接收消息时,RocketMQ会启动多个消费线程来处理消息,需要确保消息处理的线程安全性。
(二)顺序消息
顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型
//同步顺序消息[异步顺序 单向顺序写法类似]
// 顺序消息
@GetMapping("/testSyncSendOrderly")
public void testSyncSendOrderly(){
//第三个参数用于队列的选择
rocketMQTemplate.syncSendOrderly("product-topic", "这是一条异步顺序消息", "xxxx");
}
(三)广播模式
在RocketMQ中,消息可以以广播模式发送,广播模式的消息会被所有订阅了该topic的消费者都接收到。下面是一个简单的广播模式下消息的发送和接收的示例:
-
发送广播模式的消息 (一个服务发,两个服务收)
@GetMapping("/sendBroadcastMsg") public void sendBroadcastMsg(@RequestParam("msg") String msg){ Message<String> message = MessageBuilder.withPayload(msg).build(); SendResult sendResult = rocketMQTemplate.syncSend("product-topic", message); System.out.printf("广播消息发送结果:MsgId:%s SendStatus:%s%n", sendResult.getMsgId(), sendResult.getSendStatus()); }
这个消息发给一个主题(product-topic),只要是这个主题都可以收到消息,但消费者的组名得不同。
-
接收广播模式的消息
order-server
@Service // consumerGroup 消费者消息分组 topic 主题 @RocketMQMessageListener(consumerGroup = "order-server", topic = "product-topic") public class MessageListener implements RocketMQListener<String> { private Logger logger = LoggerFactory.getLogger("order-server"); @Override public void onMessage(String message) { logger.info("收到一个短信:"+message); } }
user-server
@Service @RocketMQMessageListener(consumerGroup = "user-server", topic = "product-topic") public class MessageListener implements RocketMQListener<String> { private Logger logger = LoggerFactory.getLogger("user-server"); @Override public void onMessage(String s) { logger.info("收到一个短信: "+s); } }
生产者发送消息,两个消费者都收到消息了。
上述示例代码中,通过使用@RocketMQMessageListener注解指定了订阅的Topic和Consumer Group,实现对广播消息的接收。接收广播模式的消息时,消息广播到所有订阅该topic的消费者,每个消费者都会独立接收到广播的消息。
需要注意的是,广播消息可能会被重复消费,消费者收到广播消息后会自动提交offset,若后续有新的消费者加入,则会从消费组最早的消费offset位置重新消费。
(四)延时消息
可以使用Spring Boot提供的RocketMQTemplate来发送延时消息,具体代码如下:
当使用 syncSend 发送延迟消息时,必须注意以下两个点:
-
必须设置 timeout 超时时间,防止方法堵塞
-
timeout 时间建议设置稍大于 delayLevel 对应的延迟时间
如果不设置 timeout 时间,方法会永久堵塞,直到消息被消费后才返回。
@GetMapping("/sendMsg")
public void sendMsg(String msg) {
// 发送延时消息,设置延时10秒(即该消息将在10秒后被消费)
Message<String> message = MessageBuilder.withPayload(msg).build();
// 这里设置 timeout = 10000 毫秒,即 10 秒。同时设置 delayLevel = 3,对应 10 秒的延迟。
// 这样可以确保,即使消息在 broker 延迟 10 秒未消费,syncSend 方法也不会永久堵塞,它会在超过 10 秒后返回。
rocketMQTemplate.syncSend("product-topic", message, 10000, 3);
}
RocketMQTemplate提供了syncSend方法,第三个参数是超时时间,第四个参数是延时时间。以上示例中延时级别为3,即延时10秒。
如果需要在Spring Boot应用中接收延时消息,推荐使用@RocketMQMessageListener注解,具体代码如下:
@Service
@RocketMQMessageListener(consumerGroup = "user-server", topic = "product-topic")
public class MessageListener implements RocketMQListener<String> {
private Logger logger = LoggerFactory.getLogger("user-server");
@Override
public void onMessage(String s) {
logger.info("收到一个短信: "+s);
}
}
等待十秒钟,订阅主题的消费者们会收到消息。当收到延时消息后,Spring会自动调用onMessage
方法来处理消息。
RocketMQ支持通过延时级别来控制消息的延时时间,延时级别设置在消息发送时,具体的延时时间由该延时级别对应的配置参数决定。RocketMQ定义了18个延时级别,级别从1到18,延时时间从1秒到2小时不等,各级别对应的延时时间和配置参数如下:
例如,若要设置一个延时30秒的消息,可以将延时级别设为4,参数设置为delayLevel=4。
需要注意的是,这里提到的延时时间是指消息发送后到达Broker存储的时间,与消息从Broker发送到消费者端的时间无关。在实际使用过程中,考虑到网络延迟等因素,消息的最终消费时间可能会比预设的延时时间略晚。
(五)批量消息
下面是一个使用SpringBoot整合RocketMQ的示例代码,演示如何发送和接收批量消息。
发送批量消息:
// 批量消息
@GetMapping("/sendBatchMsg")
public void sendBatchMsg() {
List<String> messages = new ArrayList<>();
messages.add("Message 1");
messages.add("Message 2");
messages.add("Message 3");
messages.add("Message 4");
messages.add("Message 5");
try {
SendResult result = rocketMQTemplate.syncSend("product-topic", messages);
System.out.println("Batch message send result: " + result);
} catch (MessagingException e) {
e.printStackTrace();
}
}
在以上示例中,使用了RocketMQTemplate的syncSend()方法,来发送批量消息。syncSend()方法的第二个参数可以是一个List集合,每个元素表示一条消息。具体使用时,可以将多条消息打包成一个List集合,然后将集合作为syncSend()方法的第二个参数。最后通过获取SendResult对象,来获取消息发送的结果信息。
接收批量消息的RocketMQListener:(user-server接收代码,order-server和下面差不多,只是消费消息分组不同)
@Service
@RocketMQMessageListener(consumerGroup = "user-server", topic = "product-topic")
public class MessageListener implements RocketMQListener<List<String>> {
private Logger logger = LoggerFactory.getLogger("user-server");
@Override
public void onMessage(List<String> s) {
logger.info("收到一个短信: "+s);
}
}
在以上示例中,通过实现RocketMQListener<List>接口,来接收List类型的批量消息。在重写的onMessage()方法中,处理接收到的批量消息。
需要特别注意的是,RocketMQListener的泛型需要根据发送的消息类型进行设置。如果发送的是String类型的批量消息,那么RocketMQListener的泛型就应该设置为List。如果发送的是其他类型的批量消息,比如自定义的对象,那么需要自定义RocketMQMessageConverter实现类,来将消息转换为指定的类型。
(六)过滤消息
下面是一个使用SpringBoot整合RocketMQ的示例代码,演示如何发送和接收过滤消息。
@GetMapping("/sendFilterMsg")
public void sendMsg(@RequestParam("msg") String msg, @RequestParam("tag") String tag) {
rocketMQTemplate.convertAndSend("product-topic" + ":" + tag, message);
}
在以上示例中,使用Autowired注解自动注入RocketMQTemplate实例,并通过setMessageProperty()方法和setTags()方法指定消息的属性和标签。最后调用convertAndSend()方法,将消息发送到名为product-topic的主题。
接收过滤消息的RocketMQListener:
@Slf4j
@Service
@RocketMQMessageListener(
topic = "product-topic",
consumerGroup = "filter-user-server",
selectorExpression = "tagA || tagB",
selectorType = SelectorType.TAG
)
public class FilterMsgListener implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
log.info("收到一个过滤短信:"+ msg);
}
}
在以上示例中,通过@RocketMQMessageListener注解指定了消费组、主题和选择器表达式(相当于过滤表达式)。在选择器表达式中使用了消息的属性prop1和prop2,以及消息的标签tag1来过滤消息。最后通过实现RocketMQListener接口,来处理接收到的过滤消息。
(七)事务消息
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。 事务消息交互流程:
两个概念:
-
半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
-
消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。
事务消息发送步骤:
-
发送方将半事务消息发送至RocketMQ服务端。
-
RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功,此时消息为半事务消息。
-
发送方开始执行本地事务逻辑。
-
发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤:
-
在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
-
发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
-
发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
(八)消息消费要注意的细节
@RocketMQMessageListener(
consumerGroup = "shop",//消费者分组
topic = "order-topic",//要消费的主题
consumeMode = ConsumeMode.CONCURRENTLY, //消费模式:无序和有序
messageModel = MessageModel.CLUSTERING, //消息模式:广播和集群,默认是集群
)
public class SmsService implements RocketMQListener<Order> {
}
RocketMQ支持两种消息模式:
-
广播消费: 每个消费者实例都会收到消息,也就是一条消息可以被每个消费者实例处理;
-
集群消费: 一条消息只能被一个消费者实例消费