文章目录
- 事务消息
- 概念介绍
- 交互流程
- 事务消息原理
- TransactionListener接⼝
- TransactionProducer.java
- TransactionConsumer.java
事务消息
内置topic中的消息对消费者不可见
本地事务+mq消息=事务消息
消息队列 RocketMQ 版提供的分布式事务消息适⽤于所有对数据最终⼀致性有强需求的场景。
概念介绍
事务消息:消息队列 RocketMQ 版提供类似 X/Open XA 的分布式事务功能,通过消息队列RocketMQ 事务消息能达到分布式事务的最终⼀致。
半事务消息:暂不能投递的消息,发送⽅已经成功地将消息发送到了消息队列 RocketMQ 版服务端,但是服务端未收到⽣产者对该消息的⼆次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
消息回查:由于⽹络闪断、⽣产者应⽤重启等原因,导致某条事务消息的⼆次确认丢失,消息队列RocketMQ 版服务端通过扫描发现某条消息⻓期处于“半事务消息”时,需要主动向消息⽣产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。
交互流程
事务消息交互流程如下图所示。
事务消息发送步骤如下:
- 发送⽅将半事务消息发送⾄消息队列 RocketMQ 版服务端。
- 消息队列 RocketMQ 版服务端将消息持久化成功之后,向发送⽅返回 Ack 确认消息已经发送成功,此时消息为半事务消息。
- 发送⽅开始执⾏本地事务逻辑。
- 发送⽅根据本地事务执⾏结果向服务端提交⼆次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅⽅最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅⽅将不会接受该消息。
事务消息回查步骤如下:
- 在断⽹或者是应⽤重启的特殊情况下,上述步骤 4 提交的⼆次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
- 发送⽅收到消息回查后,需要检查对应消息的本地事务执⾏的最终结果。
- 发送⽅根据检查得到的本地事务的最终状态再次提交⼆次确认,服务端仍按照步骤 4 对半事务消息进⾏操作。
注意事项
- 事务消息不⽀持延时消息和批量消息。
- 为了避免单个消息被检查太多次⽽导致半队列消息累积,我们默认将单个消息的检查次数限制为15 次,但是⽤户可以通过 Broker 配置⽂件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误⽇志。⽤户可以通过重写AbstractTransactionalMessageCheckListener 类来修改这个⾏为。
- 事务消息将在 Broker 配置⽂件中的参数 transactionTimeout 这样的特定时间⻓度之后被检查。当发送事务消息时,⽤户还可以通过设置⽤户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
- 事务性消息可能不⽌⼀次被检查或消费。做好幂等性的检查
- 提交给⽤户的⽬标主题消息可能会失败,⽬前这依⽇志的记录⽽定。它的⾼可⽤性通过 RocketMQ本身的⾼可⽤性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使⽤同步的双重写⼊机制。
- 事务消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的⽣产者 ID 查询到消费者。
事务消息原理
HALF消息:RMQ_SYS_TRANS_HALF_TOPIC(临时存放消息信息)
事务消息替换主题,保存原主题和队列信息
半消息对Consumer不可⻅,不会被投递
OP消息: RMQ_SYS_TRANS_OP_HALF_TOPIC(记录⼆阶段操作)
Rollback:只做记录
Commit:根据备份信息重新构造消息并投递
回查:
对⽐HALF消息和OP消息进⾏回查
TransactionListener接⼝
要使⽤RocketMQ的事务消息,要实现⼀个TransactionListener的接⼝,这个接⼝中有两个⽅法,如下:
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public interface TransactionListener {
/**
* When send transactional prepare(half) message succeed, this method will
* be invoked to execute local transaction.
* 执⾏本地事务
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final
Object arg);
/**
* When no response to prepare(half) message. broker will send check
* message to check the transaction status, and this
* method will be invoked to get local transaction status.
* 消息回查后,需要检查对应消息的本地事务执⾏的最终结果
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
RocketMQ的事务消息是基于两阶段提交实现的,也就是说消息有两个状态,prepared和commited。当消息执⾏完send⽅法后,进⼊的prepared状态,进⼊prepared状态以后,就要执⾏executeLocalTransaction⽅法,这个⽅法的返回值有3个,也决定着这个消息的命运,1.LocalTransactionState.COMMIT_MESSAGE:提交消息,这个消息由prepared状态进⼊到commited状态,消费者可以消费这个消息;
2.LocalTransactionState.ROLLBACK_MESSAGE:回滚,这个消息将被删除,消费者不能消费这个消息;
3.LocalTransactionState.UNKNOW:未知,这个状态有点意思,如果返回这个状态,这个消息既不提交,也不回滚,还是保持prepared状态,⽽最终决定这个消息命运的,是checkLocalTransaction这个⽅法。
当executeLocalTransaction⽅法返回UNKNOW以后,RocketMQ会每隔⼀段时间调⽤⼀次checkLocalTransaction,这个⽅法的返回值决定着这个消息的最终归宿。那么checkLocalTransaction这个⽅法多⻓时间调⽤⼀次呢?我们在BrokerConfig类中可以找到,
/**
* Transaction message check interval.
*/
@ImportantField
private long transactionCheckInterval = 60 * 1000;
这个值是在brokder.conf中配置的,默认值是60*1000,也就是1分钟。那么会检查多少次呢?如果每次都返回UNKNOW,也不能⽆休⽌的检查吧,我们在BrokerConfig类中可以找到
/**
* The maximum number of times the message was checked, if exceed this
value, this message will be discarded.
*/
@ImportantField
private int transactionCheckMax = 15;
这个是检查的最⼤次数,超过这个次数,如果还返回UNKNOW,这个消息将被删除。
TransactionProducer.java
package com.example.rocketmq.demo.transaction;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.TimeUnit;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
TransactionMQProducer producer = new TransactionMQProducer("GroupTransaction");
//2.指定nameserver地址
producer.setNamesrvAddr("localhost:9876");
//3.添加事务监听器
producer.setTransactionListener(new TransactionListener() {
/**
* 在该方法执行本地事务
* @param msg
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
//db ,本地事务 + mq消息 = 事务消息
System.out.println("executeLocal:"+msg.getTags());
if(StringUtils.equals("TAGA",msg.getTags())){
return LocalTransactionState.COMMIT_MESSAGE;
}else if(StringUtils.equals("TAGB",msg.getTags())){
//B消息本地事务返回rollback
return LocalTransactionState.ROLLBACK_MESSAGE;
}else if(StringUtils.equals("TAGC",msg.getTags())){
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
/**
* 该方法用于MQ进行消息的回查
* @param msg
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("checkLocalTransaction:"+msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
});
//4.启动producer
producer.start();
String[] tags = {"TAGA","TAGB","TAGC"};
//发送三条消息
for (int i = 0; i < 3; i++) {
// 发送A、B、C三条
Message msg = new Message("TransactionTopic", tags[i],
("Hello RocketMQ: " + tags[i]).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//6.发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg,null);
//7.获取发送状态
SendStatus sendStatus = sendResult.getSendStatus();
System.out.printf("发送结果:%s%n", sendStatus);
TimeUnit.SECONDS.sleep(1);
}
//8.关闭生产者producer
// producer.shutdown();
}
}
TransactionConsumer.java
package com.example.rocketmq.demo.transaction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TransactionConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 1.创建消费者consumer、制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupTransaction");
// 2.指定nameserver地址
consumer.setNamesrvAddr("localhost:9876");
// 3.订阅主题Topic和Tag
consumer.subscribe("TransactionTopic", "*");
// 4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for(MessageExt msg:msgs){
String key = msg.getKeys();
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
}
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}