Apache RocketMq 在4.3.0版本中已经支持分布式事物消息,采用了2PC的的思想实现提交事物消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。
一、事物消息生产者:TransactionMQProducer
发送事物消息
TransactionMQProducer#sendMessageInTransaction
1、获取事物监听器,一个消息生产者一个事物监听器
//事务监听器
TransactionListener transactionListener = getCheckListener();
2、忽略延迟级别
3、在消息属性中添加TRAN_MSG=true 标识事物消息;PGROUP=消息所属发送者组,然后以同步方式发送消息
4、判断消息发送状态
1、消息发送成功,回调TransactionListener#executeLocalTransaction方法,执行本地事物,返回本地
事物状态 localTransactionState
2、消息发送失败,不执行本地方法,localTransactionState 事物状态设置为回滚
5、结束事物方法
this.endTransaction(sendResult, localTransactionState, localException);
1、获取消息id
2、获取事物id
3、获取brokerAddr
4、构建结束事物消息头
5、根据事物状态设置回滚或者提交标识
6、设置生产者组名
7、设置事物消息队列偏移量
8、通过同步单向发送结束事物消息
二、事务消息事件监听器:TransactionListener
TransactionListener 的实现
executeLocalTransaction:方法,记录mq half消息同步发送broker成功后,执行本地事物,
记录事物状态,
checkLocalTransaction:本地事物状态回查方法,broker端接收事物消息后定时回查消息,
根据返回的事物状态,0-UNKNOW,1-COMMIT_MESSAGE,2-ROLLBACK_MESSAGE ,进行事物消息的继续回查,
提交、或者回滚。
三、broker接收处理事物消息
事物消息相对普通消息最大的特点就是一阶段发送的消息对用户不可见,RocketMq 一阶段发送的消息时half消息。
SendMessageProcessor#asyncSendMessage收到处理的消息,通过TRAN_MSG 标识获取事物标识
如果是事物消息通过this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);进行事物消息处理
1、备份消息的原主题名称与原队列ID
2、然后取消是事务消息的消息标签,重新设置消息的主题为:RMQ_SYS_TRANS_HALF_TOPIC
3、队列ID固定为0
调用org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage 存储替换后的half事物消息。
四、Commit和Rollback操作
在完成一阶段half消息写入,二阶段如果是commit操作则需要让消息对用户可见;如果是Rollback操作则需要撤销一阶段的消息
1、EndTransactionProcessor broker结束事物处理器
1、提交消息
1、根据commitLogOffset从commitlog文件中查找消息返回OperationResult
2、如果成功找到消息则继续处理,否则返回客户端未找到消息错误
3、检查消息必要字段
4、设置消息的相关属性,恢复原消息的数量,取消事物消息的相关系统标记
5、通过MessageStore将消息存储在CommitLog中,此时消息被转发到原消息主题对应的消费队列,消费端可以消费到
6、删除half预处理消息,其实是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理提交
2、回滚消息
1、根据commitlogOffset查找消息
2、消息存储在RMQ_SYS_TRANS_OP_HALF_TOPIC中,代表该消息已被处理,与提交事务消息不同的是
提交事务消息会将消息恢复原主题与队列,再次存储在commitlog文件中
3、删除half预处理消息,其实是将消息存储在主题为:RMQ_SYS_TRANS_OP_HALF_TOPIC的主题中,代表这些消息已经被处理提交
五、事物消息定时回查
RocketMQ使用TransactionalMessageCheckService线程定时去检测RMQ_SYS_TRANS_HALF_TOPIC主题中的消息,回查消息的事物状态
TransactionalMessageCheckService检测频率默认1分钟,可以通过broker.conf中设置transactionCheckInterval 来改变单位为毫秒
1、onWaitEnd 事物回查处理逻辑
2、从broker配置文件中获取transactionTimeOut参数值默认6秒,表示事务的过期时间
3、消息的存储时间 + 该值 大于系统当前时间,才对该消息执行事务状态会查
4、消息默认回查次数15,超过消息会默认为丢弃,即rollback消息
5、this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener())
1、事物消息的两个topic
1、RMQ_SYS_TRANS_HALF_TOPIC:prepare消息的主题,事务消息首先先进入到该主题。
2、RMQ_SYS_TRANS_OP_HALF_TOPIC:当消息服务器收到事务消息的提交或回滚请求后,会将消息存储在该主题
2、RMQ_SYS_TRANS_HALF_TOPIC 获取这个topic的所有消费队列,循环遍历消息队列,从单个消息消费队列去获取消息
3、获取操作队列的消费进度,待操作的消费进度,如果任意一个小于0,忽略该消息队列,继续处理下一个队列
4、调用fillOpRemoveMap方法填充removeMap、doneOpOffset,主要目的是避免重复调用事物回查接口
5、getMessageNullCount 获取空消息的次数, newOffset 前处理RMQ_SYS_TRANS_HALF_TOPIC#queueId的最新进度
6、MAX_PROCESS_TIME_LIMIT 限制每次最多处理的时间,一次最多不超过60秒
7、如果removeMap中包含当前处理的消息,则继续下一条,removeMap中 的值是通过RMQ_SYS_TRANS_HALF_TOPIC#queueId当前的处理进度时,会添加到removeMap中,表示已处理过
8、根据消息队列偏移量i从消费队列中获取消息,如果消息为空,则根据允许重复次数进行操作,默认重试一次,目前不可配置。
9、如果超过重试次数,直接跳出,结束该消息队列的事务状态回查
10、如果是由于没有新的消息而返回为空(拉取状态为:PullStatus.NO_NEW_MSG),则结束该消息队列的事务状态回查
11、其他原因,则将偏移量i设置为: getResult.getPullResult().getNextBeginOffset(),重新拉取
12、判断该消息是否需要discard(吞没,丢弃,不处理)、或skip(跳过)
1、如果该消息回查的次数超过允许的最大回查次数,则该消息将被丢弃,即事务消息提交失败,
2、不能被消费者消费,其做法,主要是每回查一次,在消息属性TRANSACTION_CHECK_TIMES中增1,默认最大回查次数为5次
3、如果事务消息超过文件的过期时间,默认72小时(具体请查看RocketMQ过期文件相关内容),则跳过该消息
13、valueOfCurrentMinusBorn 该消息已存储的时间,等于系统当前时间减去消息存储的时间戳
14、checkImmunityTime 立即检测事务消息的时间,其意义是应用程序发送事物消息后,事物不会马上提交
该时间是假设事物消息发送成功后,应用提交事物的时间,这个时间内RocketMq任务事物未提交,因此这个时间不应该发送
事物回查请求。
15、checkImmunityTimeStr 事物消息过期时间, 如果立即检测事务消息的时间超过已存储时间,就算时间小于checkImmunityTime时间,也发送事务回查指令
16、如果当前时间还未过(应用程序事务结束时间),则跳出本次回查处理的,等下一次再试
17、从op队列获取opMsg集合,判断是否需要发送事物消息回查
1、如果从操作队列(RMQ_SYS_TRANS_OP_HALF_TOPIC)中没有已处理消息并且已经超过(应用程序事务结束时间)
2、如果操作队列不为空,并且最后一天条消息的存储时间已经超过transactionTimeOut值。
18、如果需要发送事务状态回查消息,则先将消息再次发送到RMQ_SYS_TRANS_HALF_TOPIC主题中发送成功则返回true,否则返回false
19、如果发送成功,会将该消息的queueOffset、commitLogOffset设置为重新存入的偏移量,为什么需要这样呢
答案在listener.resolveHalfMsg(msgExt)中
1、AbstractTransactionalMessageCheckListener#resolveHalfMsg 发送具体的事务回查机制,这里用一个线程池来异步发送回查消息
为了回查进度保存的简化,这里只要发送了回查消息,当前回查进度会向前推动,如果回查失败,上一步骤新增的消息将可以再次发送回查消息
如果回查消息发送成功,那会不会下一次又重复发送回查消息呢?这个可以根据OP队列中的消息来判断是否重复
如果回查消息发送成功并且消息服务器完成提交或回滚操作,这条消息会发送到OP队列中,然后首先会通过fillOpRemoveMap根据
处理进度获取一批已处理的消息,来与消息判断是否重复,由于fillopRemoveMap一次只拉32条消息,那又如何保证一定能拉取到与
当前消息的处理记录呢?其实就是通过第17步,如果此批消息最后一条未超过事务延迟消息,
则20步继续拉取更多消息进行判断
2、通过异步方式发送消息回查的实现过程
1、构建检查事物状态请求消息头
2、设置消息offsetId、消息事物id、事务消息队列中的偏移量(RMQ_SYS_TRANS_HALF_TOPIC)
3、恢复原消息的主题、队列,并设置storeSize为0
4、获取生产者组名称
5、根据生产者组获取任意一个生产者,通过与其连接发送事务回查消息,回查消息的请求者为【Broker服务器】,接收者为(client,具体为消息生产者)
6、发送回查消息 brokerController.getBroker2Client().checkProducerTransactionState RequestCode.CHECK_TRANSACTION_STATE
3、org.apache.rocketmq.client.impl.ClientRemotingProcessor#checkTransactionState 生产者接收回查消息
1、使用一个匿名类( Runnable )构建一个运行任务
1、执行TransactionListener#checkLocalTransaction,检测本地事务状态,也就是应用程序需要实现
2、然后返回COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW中的一个,然后向Broker发送END_TRANSACTION
2、然后提交到checkExecutor线程池中执行
20、 fillOpRemoveMap拉取更多op消息继续进行判断
21、transactionalMessageBridge.updateConsumeOffset 保存(Prepare)消息队列的回查进度。
22、transactionalMessageBridge.updateConsumeOffset 保存处理队列(op)的进度