文章目录
- 消息队列解决事务存在的问题
- RocketMQ的分布式事务方案
- RocketMQ的分布式事务案例代码
- 分布式事务源码分析
- 消息发送源码分析
- 确认/回滚源码分析
- 回查源码分析
- 总结
消息队列解决事务存在的问题
如果使用消息队列解决事务在哪个阶段向MQ发送消息?
-
先扣款后再向RocketMQ发消息
先扣款再发送消息,万一发送消息超时了(MQ中有可能成功,有可能失败),那这个状态就很难判断了 -
先向RocketMQ发消息后再扣款
扣款成功消息发送成功,但是如果本地扣款业务失败了,那消息已经发给MQ了,第二阶段的加钱就会执行成功。
所以无论是哪种方案,处理起来都会有问题。
其实仔细分析下,问题的关键点,就是RocketMQ改变不了消息发送者的事务状态。所以RocketMQ的分布式事务方案进行了优化。
RocketMQ的分布式事务方案
RocketMQ在分布式事务中引入了半事务及事务回查机制。
半事务:
发一个消息到rocketmq,但该消息只储存在commitlog中,但consumeQueue中不可见,也就是消费端(订阅端)无法看到此消息。
事务回查:
RocketMq会定时遍历commitlog中的半事务消息,这个事务回查机制就可以站在 RocketMQ的角度参与消息发送者的事务中。
RocketMQ的分布式事务案例代码
这个是分布式事务的生产者,完成了半事务的发送。
通过事务回查,如果在TransactionListenerImpl类executeLocalTransaction方法中,如果本地事务执行成功,则提交commit_message,消费端即可消费消息
如果有一些比较耗时的操作导致,不能在这个步骤确认的话,可以提交UNKNOW,交给定时的任务回查来处理
分布式事务源码分析
从分布式事务的流程上,可以从消息发送,确认/回滚 ,回查三个方面分析。
消息发送源码分析
Producer
Broker
RocketMQ使用Netty处理网络,broker收到消息写入的请求就会进入SendMessageProcessor类中processRequest方法。
最终进入DefaultMessageStore类中asyncPutMessage方法进行消息的存储
结合图同时结合代码,我们可以看到,在事务消息发送时,消息实际存储的主题是一个系统主题:RMQ_SYS_TRANS_HALF_TOPIC
同时消息中保存着消息的原有主题相关的信息与队列
确认/回滚源码分析
Producer
DefaultMQProducerImpl类sendMessageInTransaction方法
Broker
EndTransactionProcessor类
回查源码分析
Producer
事务回查中,Producer是服务端,所以需要注册服务处理
DefaultMQProducerImpl类checkTransactionState方法
DefaultMQProducerImpl类processTransactionState方法
Broker
在Broker启动的时候,是要作为客户端,定期的访问客户端做事务回查。
事务回查是Broker发起的一次定时的网络调用(每隔60s),所以事务回查在客户端启动的时候第一次不一定是60s的间隔,一般会小于60s(因为事务回查是broker发起的,并不是client端定时发起)
总结
RocketMQ支持分布式事务,主要是通过两阶段提交协议实现。在第一阶段,系统发送一个prepared消息到MQ,如果这个prepared消息发送失败那么就直接取消操作别执行了。如果这个消息发送成功了,就接着执行本地事务(executeLocalTransaction),如果成功就告诉MQ发送确认消息,如果失败,就告诉MQ发送回滚消息。在第二阶段,如果发送了确认消息,那么B系统会接收到确认消息,然后执行本地事务。如果在发送确认或回滚消息失败的情况下,broker有轮询机制,根据唯一id查询本地事务状态,MQ会自动定时轮询所有prepared消息回调你的接口(checkLocalTransaction),询问这个消息是不是本地事务处理失败了,所有没有发送确认的消息,是继续重试还是回滚。