Rocketmq 事务消息API使用
使用TransactionMQProducer类。 实现TransactionListener 接口覆盖其方法executeLocalTransaction和checkLocalTransaction 即可。
其中executeLocalTransaction 执行本地方法和checkLocalTransaction 事务状态回查。
玩法
- 简历一张本地事务表,字段大概有(rocketmq事务id,业务事务id),
- 于executeLoalTransaction,利用数据库事务特性,和业务数据同时持久化到数据库。
- checkLocalTransaction. 按rocketmq事务id查询数据库,是否有对应的数据。
为什么需要本地事务表
保证可靠性。当业务事务提交后节点宕机。rocketmq同样也能回查到数据。
流程分析
事务消息源码分析
实现原理是基于二阶段提交和定时事务状态回查实现的。
二阶段提交分析
涉及相关类
Producer
TransactionMQProducer
DefaultMQProducerImpl
TransactionListener
Broker
SendMessageProcessor
EndTransactionProcessor
分析流程
- 入口方法TransactionMQProducer.sendMessageInTransaction 投递事务消息
- 调用DefaultMQProducerImpl.sendMessageInTransaction
- 为消息头部增加事务消息标志,发送消息。
- Broker 入口方法 SendMessageProcessor#sendMessage检查消息头部是否有事务标记,有投递半消息。响应Producer 结果包括事务id
- Producer收到消息成功发送结果后,执行本地事务。并通知Broker 本地事务执行结果。
- Broker 入口方法EndTransactionProcessor#processRequest 。按收到结果做决定。若是事务提交则投递普通消息,删除半消息。若是事务回滚则删除半消息。
事务消息回查
RocketMQ 通过TramsactionalMessageCheckService 线程定时去检测RMQ_SYS_TRANS_HALF_TOPIC主题中的消息,回查消息的事务状态。TransactionalMessageCheckService 的检测频率默认为1分钟,可通过broker.conf文件中设置transactionCheckInterval 来改变默认值,单位为毫秒
public class TransactionalMessageCheckService extends ServiceThread {
private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private BrokerController brokerController;
public TransactionalMessageCheckService(BrokerController brokerController) {
this.brokerController = brokerController;
}
//.... 省略代码
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
}