纯技术方案水文特此记录
MQ本地消息事务表解决了什么问题?
MQ本地事务表方案解决了本地事务与消息发送的原子性问题,即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。实现本地事务和消息发送的原子性,要么都成功,要么都失败。 简而言之就是保证了生产者发送消息的可靠性
实现流程
整体实现思路并不复杂,代码看起来绕,其实就是新增了一张本地消息表,记录消息发送失败的日志,且随当前业务事务一块提交。等到业务事务执行完毕后,在执行发送MQ逻辑,此时如果MQ发送失败了也不打紧,本地表兜着底呢,后面可以通过定时扫表的方式进行MQ消息的重新发送
从代码实现上来看用到了一些小众的 API ,前提知识储备:事务相关组件 TransactionSynchronizationManager 。通过 TransactionSynchronizationManager 我们可以拿到当前线程是否是事务线程,毕竟事务和线程挂钩。我们还可以通过 TransactionSynchronizationManager 为当前事务线程注册一些事件,例如 afterCommit 事件。当事务执行后触发我们的 afterCommit 事件进行真正的 MQ 消息的发送。
手把手带你debug流程
发来一个请求执行事务方法,事务方法里面处理好业务逻辑后,进行MQ可靠消息发送
由于可靠MQ消息发送被我们的自定义注解做过 @Around 切面处理,因此先走切面逻辑
来到切面后,由于此时的所有逻辑还在事务里面,因此并不会触发 joinPoint.proceed() 方法《此方法调用会触发 rocketMQTemplate.send(topic, build) 的执行》。开始调用 secureInvokeService.invoke(record, async) 进行MQ消息发送日志入库到本地表。
入库完了之后呢?开始注册一个 afterCommit 事件,注意这里仅仅是注册事件,并没有触发执行
注册完成后直接 return null 。其实这个切面针对事务线程就是一个空壳方法,作用仅仅是保存了一条本地MQ消息发送日志而已
到这一步已经完成了,本地事务与本地MQ消息表的数据原子性了,此时如果如下代码出现业务逻辑异常,不会触发MQ消息发送逻辑。是正常的,如果MQ本地表入库失败的话,业务逻辑会进行回滚。
@Transactional
public void sendSecureMsg(String testRocketMQ, String message, String ccc) {
log.info("执行其他的业务逻辑");
mqProducer.sendSecureMsg(testRocketMQ, message, ccc);
}
等待 sendSecureMsg 方法执行完成我们的自定义切面后,事务切面最终会进行事务的 commit 操作。commit 完成后调用之前注册过的 afterCommit 事件,具体事务源码可以看我主页的这篇文章:
全面解读spring注解事务失效场景,伪代码+图文深度解析spring源码运行过程
当事务提交后开始发送MQ消息
最终发MQ。发送失败重试几次,重试还是发送失败改本地MQ表的数据状态等待,定时任务扫描重新发即可。
通过 method.invoke(bean, args); 触发事务方法,事务会失效,即使此方法被自定义切面加强,由于切面中有判断如果当前线程不在事务内,执行的是方法本身的逻辑,因此走的是如下图圈红的地方
最终调用如下代码发送MQ
@SecureInvoke
public void sendSecureMsg(String topic, Object body, Object key) {
Message<Object> build = MessageBuilder
.withPayload(body)
.setHeader("KEYS", key)
.build();
else rocketMQTemplate.send(topic, build);
}
本文总结
这个方案就是新增了一张本地消息表,记录消息发送失败的日志,且随当前业务事务一块提交。等到业务事务执行完毕后,在执行发送MQ逻辑,此时如果MQ发送失败了也不打紧,本地表兜着底呢,后面可以通过定时扫表的方式进行MQ消息的重新发送
如果还不懂这套方案童鞋可以先去搞懂一下,再来阅读本文效果更佳
- 事务方法失效场景。即必须通过代理对象进行的方法调用事务才会生效。
- 声明式事务整个的代理对象是如何创建、事务运行的整套流程
全面解读spring注解事务失效场景,伪代码+图文深度解析spring源码运行过程