RocketMQ的几种消息发送方式应用
- 一:普通消息
- 1)发送同步消息
- 2)发送异步消息
- 3)单向发送消息
- 4)消费消息-负载均衡模式
- 5)消费消息-广播模式
- 二:顺序消息
- 1.顺序消息指的是:严格按照消息的发送顺序进行消费的消息(FIFO)。
- 2.为什么需要顺序消息?
- 3.有序性分类
- 4.代码示例
- 三:延时消息
- 1.延时消息概览及适用场景
- 2.延时等级
- 3.代码示例
- 四:批量消息
- 1.批量发送消息
- 1.1 发送限制
- 1.2 生产者发送的消息大小
- 2. 批量消费消息
- 2.1 批量消费配置
- 2.2 存在的问题
- 3.代码示例
- 3.1 定义消息列表分割器
- 3.2 发送消息:
- 3.3 消费消息:
- 3.4 结果:
- 五:过滤消息
- 1.Tag过滤
- 1.1 代码实现
- 2.SQL过滤
- 2.1 代码实现
- 六:事务消息
- 1.问题引入
- 2.解决思路
- 3.注意
- 4.代码实现
- 4.1生产者:
- 4.2 消费者:
- 4.3事务监听实现:
- 4.4结果:
导入MQ启动依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
yml配置:
rocketmq:
name-server: 192.168.31.30:9876 # 访问地址
producer:
group: rocket-producer # 必须指定group
send-message-timeout: 3000 # 消息发送超时时长,默认3s
retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
工具类:
package com.lmy.config.rocketmq;
import com.lmy.dto.rsp.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import com.lmy.utils.JsonUtil;
/**
* @author : lmy
* @date : 2023/12/9 下午 12:45
* 生产者
*/
@Slf4j
@Component
public class MQProducerService {
@Value("${rocketmq.producer.send-message-timeout}")
private Integer messageTimeOut;
// 建议正常规模项目统一用一个TOPIC
private static final String topic = "RLT_TEST_TOPIC";
@Autowired
private RocketMQTemplate rocketMQTemplate;
private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new
ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
/**
* 普通发送(这里的参数可以随意定义,可以发送个对象,也可以是字符串等)
*/
public void send(String topic,String tag,Object obj) {
// rocketMQTemplate.convertAndSend(topic + ":"+tag, obj);
if (StringUtils.isNotEmpty(tag)) {
tag = ":"+tag;
}
rocketMQTemplate.send(topic + tag, MessageBuilder.withPayload(obj).build()); // 等价于上面一行
}
/**
* 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
* (msgBody也可以是对象,sendResult为返回的发送结果)
* setHeader: 在消息发送到RocketMQ时,这个键值对会被添加到消息的头部,以便在消息接收端进行识别和处理
*/
public SendResult sendSyncMsg(String topic,String tag,Object msgBody) {
if (StringUtils.isNotEmpty(tag)) {
tag = ":"+tag;
}
SendResult sendResult = rocketMQTemplate.syncSend(topic+tag, MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.KEYS,"myKey").build());
log.info("【sendMsg】sendResult={}", JsonUtil.objectToJson(sendResult));
return sendResult;
}
/**
* 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
* (适合对响应时间敏感的业务场景)
*/
public void sendAsyncMsg(String topic,String tag,String msgBody) {
if (StringUtils.isNotEmpty(tag)) {
tag = ":"+tag;
}
rocketMQTemplate.asyncSend(topic+tag, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 处理消息发送成功逻辑
log.info("【sendAsyncMsg】sendResult={}", JsonUtil.objectToJson(sendResult));
}
@Override
public void onException(Throwable throwable) {
// 处理消息发送异常逻辑
log.info("【sendAsyncMsg】发送失败:sendResult={}",throwable.getMessage());
}
});
}
/**
* 发送顺序消息
* @param topic
* @param msgBody
* 该方法的hashkey参数,RocketMQ会根据这个key来决定消息发送到哪个队列,具有相同hashkey的消息会发送到同一个队列。
*/
public void syncSendOrderly(String topic,String tag,String msgBody,String hashKey) {
if (StringUtils.isNotEmpty(tag)) {
tag = ":"+tag;
}
SendResult sendResult = rocketMQTemplate.syncSendOrderly(topic + tag, MessageBuilder.withPayload(msgBody).build(), hashKey);
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
msgBody));
}
/**
* 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)
* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
public void sendDelayMsg(String topic,String tag,String msgBody, int delayLevel) {
if (StringUtils.isNotEmpty(tag)) {
tag = ":"+tag;
}
rocketMQTemplate.syncSend(topic+tag, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);
}
/**
* 发送同步批量消息
* @param topic
* @param tag
* @param messageList
*/
public void sendBatchMsg(String topic, String tag, List<String> messageList) {
List<Message> messages = new ArrayList<>();
for (String message : messageList) {
messages.add(new Message(topic,tag, message.getBytes()));
}
DefaultMQProducer producer = rocketMQTemplate.getProducer();
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
SendResult sendResult = producer.send(listItem);
log.info("【sendMsg】sendResult={}", sendResult.getRawRespBody()+","+sendResult.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
}
/**
* 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)
*/
public void sendOneWayMsg(String topic,String tag,String msgBody) {
if (StringUtils.isNotEmpty(tag)) {
tag = ":"+tag;
}
rocketMQTemplate.sendOneWay(topic+tag, MessageBuilder.withPayload(msgBody).build());
}
/**
* 发送带tag的消息,直接在topic后面加上":tag"
*/
public SendResult sendTagMsg(String topic,String tag,Object msgBody) {
if (StringUtils.isNotEmpty(tag)) {
tag = ":"+tag;
}
return rocketMQTemplate.syncSend(topic + tag, MessageBuilder.withPayload(msgBody).build());
}
/**
* 发送sql过滤的消息"
*/
public SendResult sendSqlMsg(String topic, String tags, Object msgBody, Map<String,Object> propMap) {
SendResult sendResult = new SendResult();
try {
DefaultMQProducer producer = rocketMQTemplate.getProducer();
Message msg = new Message(topic, tags, msgBody.toString().getBytes());
Set<Map.Entry<String, Object>> entries = propMap.entrySet();
for (Map.Entry<String, Object> entry : entries) {
String key = entry.getKey();
Object value = entry.getValue();
msg.putUserProperty(key, value + "");
}
sendResult = producer.send(msg);
System.out.println(sendResult);
}catch (Exception e) {
e.printStackTrace();
}
return sendResult;
}
/**
* 发送事务消息
* @param topic
* @param tags
* @param msgBody
*/
public TransactionSendResult TransactionMsg(String topic, String tags, Object msgBody) {
TransactionMQProducer producer = (TransactionMQProducer)rocketMQTemplate.getProducer();
// 为生产者指定一个线程池
producer.setExecutorService(executorService);
// 设置事务监听器
producer.setTransactionListener(new TransactionListenerImpl());
// 设置生产者组
producer.setProducerGroup("Con_Group_9");
// 生成生产事务id
String transactionId = UUID.randomUUID().toString().replace("-", "");
// 构建消息体
org.springframework.messaging.Message<Object> message = MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).build();
// 第三个参数用于指定在执行本地事务时要使用的业务参数
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, message, "业务参数:" + msgBody);
return transactionSendResult;
}
}
package com.lmy.controller.rocketmq.consumer;
import com.lmy.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
* @author : lmy
* @date : 2023/12/9 下午 12:51
* 消费者
*
* consumerGroup:是消费者的逻辑分组,用于使同一个 consumerGroup 下的消费者消费同一类消息。
* consumerGroup 的作用主要有以下 2 点:
* 1. 实现负载均衡。同一个 consumerGroup 下的消费者会均匀地消费同一类消息,不会重复消费。
* 2. 实现消息重试。当某个消费者挂掉时,其它消费者会继续消费该消费者未消费的消息。
*
* topic:是消息的逻辑分类,用于将消息分发到不同的消费者
* topic 的作用主要有以下 2 点:
* 1. 实现消息分发。当消息发送到 RocketMQ 服务器时,会根据 topic 将消息分发到不同的消费者。
* 2. 实现消息过滤。消费者可以通过指定 topic 来过滤消息。
*
* selectorType:是用于指定消息选择器的类型的属性。
* selectorType 的作用:
* 根据消息的属性或标签进行消息过滤,以便只有符合特定条件的消息才会被消费者消费。
* MessageSelectorType.TAG:表示消息选择器的类型是基于标签进行过滤。只有标签匹配的消息才会被消费者消费。
* MessageSelectorType.SQL92:它可以使用SQL92语法作为过滤规则表达式
*
* selectorExpression:是用于指定消息选择器的表达式的属性
* selectorExpression 的作用:
* 根据消息的属性或标签进行消息过滤,以便只有符合特定条件的消息才会被消费者消费。
* 在使用@RocketMQMessageListener 注解标注消费者类时,通过 selectorExpression 属性指定消息选择器的表达式
* 例:selectorExpression = "tag1 || tag2"
*
* consumeMode:用于指定消费者消费模式的属性
* consumeMode的作用:
* ConsumeMode.ORDERLY:有序消费。在这种模式下,消费者会按照消息发送的顺序来消费消息。
* ConsumeMode.CONCURRENTLY:并发消费。在这种模式下,消费者可以并发地消费消息
*
* messageModel:用于指定消息模型的属性
* messageModel的作用:
* MessageModel.CLUSTERING:负载均衡模式。在这种模式下,消息会被发送给其中一个订阅该主题的消费者。
* MessageModel.BROADCASTING:广播模式。在这种模式下,消息会被发送给所有订阅该主题的消费者。
*
* consumeThreadNumber:用于指定消费者线程数的属性
* RocketMQ中consumeThreadNumber的默认值是1,表示使用单线程消费消息。如果消息量很大,可以通过增加 consumeThreadNumber 来提高消费性能。
*
* maxReconsumeTimes:用于指定消息最大重试次数的属性
* RocketMQ中maxReconsumeTimes 的默认值是 3,表示消息最多可以重试 3 次。如果消息在重试 3 次后仍然没有被消费,那么该消息将被丢弃。
* 注意:maxReconsumeTimes 的设置会影响消费者的消费性能。如果消息量很大,建议减少 maxReconsumeTimes 的值,以提高消费性能。但是,如果消息量不大,减少 maxReconsumeTimes 的值可能会导致消息丢失。
*
* consumeTimeout:用于指定消息消费超时时间的属性
* RocketMQ中consumeTimeout 的默认值是 10000 毫秒,表示消息消费超时时间为 10 秒。如果消息在 10 秒内没有被消费,那么该消息将被重新投递。
* 注意:consumeTimeout 的设置会影响消费者的消费性能。如果消息量很大,建议减少 consumeTimeout 的值,以提高消费性能。但是,如果消息量不大,减少 consumeTimeout 的值可能会导致消息丢失。
*
* replyTimeout:用于指定消息回复超时时间的属性。
* RocketMQ中replyTimeout 的默认值是 3000 毫秒,表示消息回复超时时间为 3 秒。如果消息在 3 秒内没有收到回复,那么该消息将被重新投递。
* 注意:replyTimeout 的设置会影响消费者的消费性能。如果消息量很大,建议减少 replyTimeout 的值,以提高消费性能。但是,如果消息量不大,减少 replyTimeout 的值可能会导致消息丢失。
*
* enableMsgTrace:用于指定是否开启消息跟踪的属性
* enableMsgTrace的默认值是 false,表示不开启消息跟踪。如果开启消息跟踪,那么消费者可以通过消息 ID 查询消息的消费状态。
* 注意:enableMsgTrace 的设置会影响消费者的消费性能。如果开启消息跟踪,那么消费者需要额外消耗资源来记录消息的消费状态。如果消息量很大,建议关闭消息跟踪。
*
* tlsEnable:用于指定是否开启 TLS 加密的属性
* tlsEnable 的默认值是 false,表示不开启 TLS 加密。如果开启 TLS 加密,那么消费者和生产者之间的通信将使用 TLS 协议进行加密。
* 注意:tlsEnable 的设置会影响消费者的消费性能。如果开启 TLS 加密,那么消费者需要额外消耗资源来进行 TLS 加密和解密。如果消息量很大,建议关闭 TLS 加密。
*
* namespace:用于指定消息的命名空间的属性
* namespace 的默认值是 default,表示使用默认的命名空间。如果需要使用自定义的命名空间,可以通过 namespace 属性指定。
* 注意:namespace 的设置会影响消息的消费路由。如果 namespace 设置不正确,那么消息可能会被错误地消费。
*
* delayLevelWhenNextConsume:用于指定消息下次消费的延迟级别的属性
* delayLevelWhenNextConsume 的默认值是 0,表示消息下次消费的延迟级别为 0 级。如果需要设置消息下次消费的延迟级别,可以通过 delayLevelWhenNextConsume 属性指定
* 注意:delayLevelWhenNextConsume 的设置会影响消息的消费时间。如果 delayLevelWhenNextConsume 设置过高,那么消息可能会被延迟很长时间才能被消费。
*
* suspendCurrentQueueTimeMillis:用于指定暂停当前队列的毫秒数的属性
* suspendCurrentQueueTimeMillis 的默认值是 -1,表示不暂停当前队列。如果需要暂停当前队列,可以通过 suspendCurrentQueueTimeMillis 属性指定暂停的时间。
* 注意:suspendCurrentQueueTimeMillis 的设置会影响消息的消费时间。如果 suspendCurrentQueueTimeMillis 设置过高,那么消息可能会被延迟很长时间才能被消费。
*
* awaitTerminationMillisWhenShutdown:用于指定消费者在关闭时等待终止的时间的属性。
* awaitTerminationMillisWhenShutdown 的默认值是 0,表示消费者在关闭时不等待终止。如果需要消费者在关闭时等待一段时间后终止,可以通过 awaitTerminationMillisWhenShutdown 属性指定等待的时间。
* 注意:awaitTerminationMillisWhenShutdown 的设置会影响消费者的关闭时间。如果设置的等待时间过长,消费者可能需要等待较长时间才能完全关闭。
*
* instanceName:用于指定消费者实例名称的属性
* instanceName 的作用是为了在一个进程中创建多个消费者实例,每个实例可以独立运行和管理。
* 这对于需要同时消费多个主题或者在不同的消费者组中使用相同的消费者类来处理消息时非常有用。
*
*/
@Slf4j
@Component
public class MQConsumerService {
/**
*
* topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
* selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
* messageModel可设置消费者模式
* 1.CLUSTERING:消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
* 2.BROADCASTING:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
*/
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.CLUSTERING,consumerGroup = "Con_Group_One")
public class ConsumerSend implements RocketMQListener<Object> {
// 监听到消息就会执行此方法
@Override
public void onMessage(Object obj) {
log.info("监听到消息:Object={}", JsonUtil.objectToJson(obj));
}
}
// 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存,
// 不然生产者发送一条消息,这两个都会去消费,如果类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行
// @Service
// @RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two")
// public class ConsumerSend2 implements RocketMQListener<String> {
// @Override
// public void onMessage(String str) {
// log.info("监听到消息:str={}", str);
// }
// }
// MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2",messageModel = MessageModel.CLUSTERING, consumerGroup = "Con_Group_Three")
public class Consumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到消息:msg={},Key:{}", msg,messageExt.getKeys());
}
}
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag3",messageModel = MessageModel.CLUSTERING,consumeMode= ConsumeMode.ORDERLY, consumerGroup = "Con_Group_Four")
public class orderConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到顺序消息:msg={},Key:{}", msg,messageExt.getKeys());
}
}
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag4",messageModel = MessageModel.CLUSTERING,consumerGroup = "Con_Group_Five")
public class delayMsgConsumer implements RocketMQListener<Object> {
// 监听到消息就会执行此方法
@Override
public void onMessage(Object obj) {
log.info("监听到延迟消息:Object={},消费时间={}", JsonUtil.objectToJson(obj),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}
}
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag5", consumerGroup = "Con_Group_Sex")
public class batchConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到批量消息:msg={},Key:{}", msg,messageExt.getKeys());
}
}
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag5 || tag6", consumerGroup = "Con_Group_7")
public class tagConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到过滤消息:msg={},Key:{}", msg,messageExt.getKeys());
}
}
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC",selectorType = SelectorType.SQL92, selectorExpression = "age between 0 and 6", consumerGroup = "Con_Group_8")
public class SqlConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到sql过滤消息:msg={},Key:{}", msg,messageExt.getKeys());
}
}
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag8 || tag9 || tag10", consumerGroup = "Con_Group_9")
public class TransactionConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到事务消息:msg={},Key:{}", msg,messageExt.getKeys());
}
}
}
消息发送者步骤:
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
消息消费者步骤:
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
一:普通消息
1)发送同步消息
同步发送消息是指,Producer发出⼀条消息后,会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息可靠性最高,但消息发送效率太低。
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
@ApiOperation("发送同步消息")
@GetMapping("/syncMessageProducer")
public void syncMessageProducer() {
String msg = "同步消息体";
for (int i = 0; i < 100; i++) {
SendResult sendResult = mqProducerService.sendSyncMsg("RLT_TEST_TOPIC","tag1",msg + i);
System.out.printf("%s%n", sendResult);
}
}
2)发送异步消息
异步发送消息是指,Producer发出消息后无需等待MQ返回ACK,直接发送下⼀条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
@ApiOperation("发送异步消息")
@GetMapping("/sendAsyncMsg")
public void sendAsyncMsg() {
String msg = "异步消息体";
for (int i = 0; i < 100; i++) {
mqProducerService.sendAsyncMsg("RLT_TEST_TOPIC","tag2",msg + i);
}
}
3)单向发送消息
单向发送消息是指,Producer仅负责发送消息,不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK。该方式的消息发送效率最高,但消息可靠性较差。
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
@ApiOperation("发送单向消息")
@GetMapping("/sendOneWayMsg")
public void sendOneWayMsg() {
String msg = "单向消息体";
for (int i = 0; i < 100; i++) {
mqProducerService.sendOneWayMsg("RLT_TEST_TOPIC","tag2",msg + i);
}
}
4)消费消息-负载均衡模式
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
生产消息:
@ApiOperation("发送同步消息")
@GetMapping("/syncMessageProducer")
public void syncMessageProducer() {
String msg = "同步消息体";
for (int i = 0; i < 100; i++) {
SendResult sendResult = mqProducerService.sendSyncMsg("RLT_TEST_TOPIC","tag1",msg + i);
System.out.printf("%s%n", sendResult);
}
}
消费消息:
package com.lmy.config.rocketmq;
import com.lmy.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
* @author : lmy
* @date : 2023/12/9 下午 12:51
* 消费者
*/
@Slf4j
@Component
public class MQConsumerService {
/**
* topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
* selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
* messageModel可设置消费者模式
* 1.CLUSTERING:消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
* 2.BROADCASTING:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
*/
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.CLUSTERING,consumerGroup = "Con_Group_One")
public class ConsumerSend implements RocketMQListener<Object> {
// 监听到消息就会执行此方法
@Override
public void onMessage(Object obj) {
log.info("监听到消息:Object={}", JsonUtil.objectToJson(obj));
}
}
// MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.CLUSTERING, consumerGroup = "Con_Group_One")
public class Consumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到消息:msg={},Key:{}", msg,messageExt.getKeys());
}
}
}
响应结果:
5)消费消息-广播模式
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
生产消息:
@ApiOperation("发送同步消息")
@GetMapping("/syncMessageProducer")
public void syncMessageProducer() {
String msg = "同步消息体";
for (int i = 0; i < 100; i++) {
SendResult sendResult = mqProducerService.sendSyncMsg("RLT_TEST_TOPIC","tag1",msg + i);
System.out.printf("%s%n", sendResult);
}
}
消费消息:
package com.lmy.config.rocketmq;
import com.lmy.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
* @author : lmy
* @date : 2023/12/9 下午 12:51
* 消费者
*/
@Slf4j
@Component
public class MQConsumerService {
/**
* topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
* selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
* messageModel可设置消费者模式
* 1.CLUSTERING:消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
* 2.BROADCASTING:消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
*/
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.BROADCASTING,consumerGroup = "Con_Group_One")
public class ConsumerSend implements RocketMQListener<Object> {
// 监听到消息就会执行此方法
@Override
public void onMessage(Object obj) {
log.info("监听到消息:Object={}", JsonUtil.objectToJson(obj));
}
}
// MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1",messageModel = MessageModel.BROADCASTING, consumerGroup = "Con_Group_Three")
public class Consumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到消息:msg={},Key:{}", msg,messageExt.getKeys());
}
}
}
响应结果:
二:顺序消息
1.顺序消息指的是:严格按照消息的发送顺序进行消费的消息(FIFO)。
默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列;而消费消息时会从多个Queue上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个Queue中,消费时也只从这个Queue上拉取消息,就严格保证了消息的顺序性。
2.为什么需要顺序消息?
例如,现在有TOPIC ORDER_STATUS (订单状态),其下有4个Queue队列,该Topic中的不同消息用于描述当前订单的不同状态。假设订单有状态:未支付、已支付、发货中、发货成功、发货失败。
根据以上订单状态,生产者从时序上可以生成如下几个消息:
订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中 --> 订单T0000001:发货失败
消息发送到MQ中之后,Queue的选择如果采用轮询策略,消息在MQ的存储可能如下:
这种情况下,我们希望Consumer消费消息的顺序和我们发送是一致的,然而上述MQ的投递和消费方式,我们无法保证顺序是正确的。对于顺序异常的消息,Consumer即使设置有一定的状态容错,也不能完全处理好这么多种随机出现组合情况。
基于上述的情况,可以设计如下方案:对于相同订单号的消息,通过一定的策略,将其放置在一个Queue中,然后消费者再采用一定的策略(例如,一个线程独立处理一个queue,保证处理消息的顺序性),能够保证消费的顺序性。
3.有序性分类
根据有序范围的不同,RocketMQ可以严格地保证两种消息的有序性:分区有序与全局有序。
1)全局有序
当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序, 称为全局有序。
在创建Topic时指定Queue的数量。有三种指定方式:
- 在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量
- 在RocketMQ可视化控制台中手动创建Topic时指定Queue数量
- 使用mqadmin命令手动创建Topic时指定Queue数量
2)分区有序
如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,则称为分区有序。
如何实现Queue的选择:
一般我们给出唯一且不重复的key(例如订单号),让key(或其hash值)与该Topic所包含的Queue的数量取模,其结果即为选择出的Queue的QueueId。以下是源码:
取模算法存在一个问题:不同选择key与Queue数量取模结果可能会是相同的,即不同选择key的消息可能会出现在相同的Queue,即同一个Consuemr可能会消费到不同选择key的消息。这个问题如何解决?一般性的作法是,从消息中获取到选择key,对其进行判断。
但是不属于那个Consumer的消息被拉取走了,那么应该消费该消息的Consumer是否还能再消费该消息呢?同一个Queue中的消息不可能被同一个Group中的不同Consumer同时消费。所以,消费现一个Queue的不同选择key的消息的Consumer一定属于不同的Group。而不同的Group中的Consumer间的消费是相互隔离的,互不影响的。
4.代码示例
/**
* 订单的步骤
*/
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("未支付");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("未支付");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("未支付");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("已支付");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("已支付");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("已支付");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("发货中");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("发货中");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("发货中");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("发货失败");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("发货成功");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("发货成功");
orderList.add(orderDemo);
return orderList;
}
生产消息:
@ApiOperation("发送顺序消息")
@GetMapping("/syncSendOrderly")
public void syncSendOrderly() {
String msg = "顺序消息体";
for (int i = 0; i < 12; i++) {
// 订单列表
List<OrderStep> orderList = new RocketMQController().buildOrders();
mqProducerService.syncSendOrderly("RLT_TEST_TOPIC","tag3",msg +orderList.get(i).toString() ,orderList.get(i).getOrderId()+"");
}
}
消息消费:
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag3",messageModel = MessageModel.CLUSTERING,consumeMode= ConsumeMode.ORDERLY, consumerGroup = "Con_Group_Four")
public class orderConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到顺序消息:msg={},Key:{}", msg,messageExt.getKeys());
}
}
响应结果:
三:延时消息
1.延时消息概览及适用场景
当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。
采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,提交一个订单时可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
2.延时等级
延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。
延时等级定义在RocketMQ服务端的MessageStoreConfig类中的如下变量中:
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
即,若指定的延时等级为3,则表示延迟时长为10s,即延迟等级是从1开始计数的。
如果需要自定义的延时等级,可以通过在broker加载的配置中新增如下配置(例如下面增加了1天这个等级1d)。配置文件在RocketMQ安装目录下的conf目录中。
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m
1h 2h 1d
3.代码示例
延迟消息生产者:
@ApiOperation("发送延时消息")
@GetMapping("/sendDelayMsg")
public void sendDelayMsg() {
String msg = "延时消息体发出时间";
for (int i = 0; i < 12; i++) {
String msgBody = msg + i + ":" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
mqProducerService.sendDelayMsg("RLT_TEST_TOPIC","tag4",msgBody ,3);
}
}
延迟消息消费者:
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag4",messageModel = MessageModel.CLUSTERING,consumerGroup = "Con_Group_Five")
public class delayMsgConsumer implements RocketMQListener<Object> {
// 监听到消息就会执行此方法
@Override
public void onMessage(Object obj) {
log.info("监听到延迟消息:Object={},消费时间={}", JsonUtil.objectToJson(obj),new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}
}
结果:
四:批量消息
生产者批量发送消息能显著提高传递小消息的性能。
1.批量发送消息
1.1 发送限制
- 批量发送的消息必须具有相同的Topic
- 批量发送的消息必须具有相同的刷盘策略
- 批量发送的消息不能是延时消息与事务消息
- 默认情况下,一批发送的消息总大小不能超过4MB字节,若想调整大小方法如下
1)将批量消息进行拆分,拆分为若干不大于4M的消息集合分多次批量发送
2)在Producer端与Broker端修改属性
Producer端需要在发送之前设置Producer的maxMessageSize属性
Broker端需要修改其加载的配置文件中的maxMessageSize属性
1.2 生产者发送的消息大小
生产者通过send()方法发送的Message,并不是直接将Message序列化后发送到网络上的,而是通过这个Message生成了一个字符串发送出去的。这个字符串由四部分构成:Topic、消息Body、消息日志(占20字节),及用于描述消息的一堆属性key-value。
2. 批量消费消息
2.1 批量消费配置
Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列表,但默认情况下每次只能消费一条消息。若要使其一次可以消费多条消息,则可以通过修改Consumer的consumeMessageBatchMaxSize属性来指定。但是,该值不能超过32。因为默认情况下消费者每次可以拉取的消息最多是32条。若要修改一次拉取的最大值,则可通过修改Consumer的pullBatchSize属性来指定。
2.2 存在的问题
- pullBatchSize值设置的越大,Consumer每拉取一次需要的时间就会越长,且在网络上传输出现问题的可能性就越高。若在拉取过程中若出现了问题,那么本批次所有消息都需要全部重新拉取。
- consumeMessageBatchMaxSize值设置的越大,Consumer的消息并发消费能力越低,且这批被消费的消息具有相同的消费结果。因为consumeMessageBatchMaxSize指定的一批消息只会使用一个线程进行处理,且在处理过程中只要有一个消息处理异常,则这批消息需要全部重新再次消费处理。
3.代码示例
3.1 定义消息列表分割器
package com.lmy.utils.rocketMq;
import org.apache.rocketmq.common.message.Message;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* @author : lmy
* @date : 2023/12/24 上午 11:58
* 定义消息列表分割器,将消息列表分割为多个不超出4M大小的小列表
* 消息列表分割器:其只会处理每条消息的大小不超4M的情况。
* 若存在某条消息,其本身大小大于4M,这个分割器无法处理,
* 其直接将这条消息构成一个子列表返回。并没有再进行分割
*/
public class ListSplitter implements Iterator<List<Message>> {
// 指定极限值为4M
private int SIZE_LIMIT = 1024 * 1024 * 4;
// 存放所有要发送的消息
private final List<Message> messages;
// 要进行批量发送消息的小集合起始索引
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
// 判断当前开始遍历的消息索引要小于消息总数
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
// 记录当前要发送的这一小批次消息列表的大小
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
// 获取当前遍历的消息
Message message = messages.get(nextIndex);
// 统计当前遍历的message的大小
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
// 判断当前消息本身是否大于4M
if (tmpSize > SIZE_LIMIT) {
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
// 获取当前messages列表的子集合[currIndex, nextIndex)
List<Message> subList = messages.subList(currIndex, nextIndex);
// 下次遍历的开始索引
currIndex = nextIndex;
return subList;
}
}
3.2 发送消息:
@ApiOperation("发送批量消息")
@GetMapping("/sendBatchMsg")
public void sendBatchMsg() {
String msg = "批量消息体发出时间";
List<String> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
String msgBody = msg + i + ":" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
list.add(msgBody);
}
mqProducerService.sendBatchMsg("RLT_TEST_TOPIC","tag5",list);
}
3.3 消费消息:
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag5", consumerGroup = "Con_Group_Sex")
public class batchConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到批量消息:msg={},Key:{}", msg,messageExt.getKeys());
}
}
3.4 结果:
可见,100条信息一次批量发送
五:过滤消息
消费者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。
1.Tag过滤
以一个标签标识信息进行过滤
1.1 代码实现
生产者:
@ApiOperation("发送过滤消息")
@GetMapping("/sendTagMsg")
public void sendTagMsg() {
String msg = "过滤消息体";
for (int i = 0; i < 100; i++) {
mqProducerService.sendTagMsg("RLT_TEST_TOPIC","tag6",msg + i);
}
}
消费者:
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag5 || tag6", consumerGroup = "Con_Group_7")
public class tagConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到过滤消息:msg={},Key:{}", msg,messageExt.getKeys());
}
}
结果:
2.SQL过滤
相对于tag(一个消息只能有一个标签)不能应对复杂的场景,可以使用SQL表达式过滤消息。
SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。但是,只有使用PUSH模式的消费者才能使用SQL过滤。
SQL过滤表达式中支持多种常量类型与运算符:
支持的常量类型:
- 数值:比如:123,3.1415
- 字符:必须用单引号包裹起来,比如:‘abc’
- 布尔:TRUE 或 FALSE
- NULL:特殊的常量,表示空
支持的运算符有:
- 数值比较:>,>=,<,<=,BETWEEN,=
- 字符比较:=,<>,IN
- 逻辑运算 :AND,OR,NOT
- NULL判断:IS NULL 或者 IS NOT NULL
默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:
#默认情况下Broker没有开启消息的SQL过滤功能,以下设置开启
enablePropertyFilter=true
然后重启生效即可!
2.1 代码实现
定义SQL过滤生产者:
/**
* 发送sql过滤的消息"
*/
public SendResult sendSqlMsg(String topic, String tags, Object msgBody, Map<String,Object> propMap) {
SendResult sendResult = new SendResult();
try {
DefaultMQProducer producer = rocketMQTemplate.getProducer();
Message msg = new Message(topic, tags, msgBody.toString().getBytes());
Set<Map.Entry<String, Object>> entries = propMap.entrySet();
for (Map.Entry<String, Object> entry : entries) {
String key = entry.getKey();
Object value = entry.getValue();
msg.putUserProperty(key, value + "");
}
sendResult = producer.send(msg);
System.out.println(sendResult);
}catch (Exception e) {
e.printStackTrace();
}
return sendResult;
}
@ApiOperation("发送sql过滤消息")
@GetMapping("/sendSqlMsg")
public void sendSqlMsg() {
String msg = "sql过滤消息体";
for (int i = 0; i < 100; i++) {
Map<String, Object> pop = new HashMap<>();
pop.put("age",i);
mqProducerService.sendSqlMsg("RLT_TEST_TOPIC","tag7",msg + i,pop);
}
}
消费者:
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC",selectorType = SelectorType.SQL92, selectorExpression = "age between 0 and 6", consumerGroup = "Con_Group_8")
public class SqlConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到sql过滤消息:msg={},Key:{}", msg,messageExt.getKeys());
}
}
结果:
六:事务消息
1.问题引入
需求场景:工行用户A向建行用户B转账1万元
可以使用同步消息来处理该需求场景:
存在的问题:
若第3步中的扣款操作失败,但消息已经成功发送到了Broker。对于MQ来说,只要消息写入成功,那么这个消息就可以被消费。此时建行系统中用户B增加了1万元。出现了数据不一致问题。
2.解决思路
让第1、2、3步具有原子性,要么全部成功,要么全部失败。即消息发送成功后,必须要保证扣款成功。如果扣款失败,则回滚发送成功的消息。而该思路即使用事务消息。这里要使用分布式事务解决方案。
预扣款执行结果存在三种可能性:
// 描述本地事务执行状态
public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定,表示需要进行回查以确定本地事务的执行结果
}
消息回查,即重新查询本地事务的执行状态。
关于消息回查,有三个常见的属性设置。它们都在broker加载的配置文件中设置,例如:
- transactionTimeout=20,指定TM在20秒内应将最终确认状态发送给TC,否则引发消息回查。默认为60秒
- transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志。默认15次。
- transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒。默认为60秒。
rocketMQ中使用的分布式事务解决方案是XA处理模式
XA模式中有三个重要组件:TC、TM、RM。
TC:事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。(RocketMQ中Broker充当着TC。)
TM:事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。它实际是全局事务的发起者。(RocketMQ中事务消息的Producer充当着TM)
RM:资源管理器。管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚(RocketMQ中事务消息的Producer及Broker均是RM)
3.注意
- 事务消息不支持延时消息和批量消息。
- 对于事务消息要做好幂等性检查,因为事务消息可能不止一次被消费(因为存在回滚后再提交的情况)
- 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
4.代码实现
4.1生产者:
@ApiOperation("发送事务消息")
@GetMapping("/sendTransactionMsg")
public void TransactionMsg() {
for (int i = 8; i < 11; i++) {
mqProducerService.TransactionMsg("RLT_TEST_TOPIC", "tag" + i, "事务消息" + i);
}
}
/**
* 发送事务消息
* @param topic
* @param tags
* @param msgBody
*/
public TransactionSendResult TransactionMsg(String topic, String tags, Object msgBody) {
TransactionMQProducer producer = (TransactionMQProducer)rocketMQTemplate.getProducer();
// 为生产者指定一个线程池
producer.setExecutorService(executorService);
// 设置事务监听器
producer.setTransactionListener(new TransactionListenerImpl());
// 设置生产者组
producer.setProducerGroup("Con_Group_9");
// 生成生产事务id
String transactionId = UUID.randomUUID().toString().replace("-", "");
// 构建消息体
org.springframework.messaging.Message<Object> message = MessageBuilder.withPayload(msgBody).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).build();
// 第三个参数用于指定在执行本地事务时要使用的业务参数
TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, message, "业务参数:" + msgBody);
return transactionSendResult;
}
4.2 消费者:
@Service
@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag8 || tag9 || tag10", consumerGroup = "Con_Group_9")
public class TransactionConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("监听到事务消息:msg={},Key:{}", msg,messageExt.getKeys());
}
}
4.3事务监听实现:
package com.lmy.config.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
/**
* @author : liu ming yong
* @date : 2024/4/21 下午 6:30
* @description : 事务监听器
*/
@Slf4j
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 回调操作方法,消息预提交成功就会触发该方法的执行,用于完成本地事务
log.info("预提交消息成功:" + msg+";业务参数:"+arg);
// 假设接收到tag8的消息就表示操作成功,tag9的消息表示操作失败,tag10表示操作结果不清楚,需要执行消息回查
if (StringUtils.equals("tag8", msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("tag9", msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals("tag10", msg.getTags())) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 消息回查方法
// 引发消息回查的原因最常见的有两个:1)回调操作返回UNKNWON 2)TC没有接收到TM的最终全局事务确认指令
log.info("执行消息回查" + msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
}