文章目录
- 1. 理论阐述
- 2. 代码实现
- 2.1. 问题代码
- 2.2. 改进方案
本文参考:
事务回调编程
大事务问题
1. 理论阐述
最近在学习数据库事务的过程中,了解到了大事务的危害:
- 并发情况下,数据库连接资源容易耗尽
- 锁定数据较多,容易造成大量阻塞和锁超时,进而接口超时
- 执行时间长,容易造成主从延迟
- 回滚所需要的时间变长
那么大事务又是如何产生的呢?
- 单个事务操作数据库操作较多
- 事务中存在 RPC/MQ 等非 DB 耗时操作
- 大量的锁竞争
项目编程中我们经常会用到 Spring 的声明式事务 @Transactional 注解,我去反思了下项目中对事务的使用,还真的存在事务中嵌套 MQ 的用法,比方说本地数据库操作过程中穿插着 ES 写入消息(牺牲直连写入 ES 的时效性,中间加一层 MQ 可以提升容灾性),这就容易产生大事务,整体架构如下:
在分布式异常场景下这种模式也是有问题的:
比方说数据库操作执行报错,或者 MQ 消息超时,本地事务需要回滚,但是 MQ 消息已经发出去了,没法执行回滚操作,这就没法保证本地事务+MQ的原子性了。
想一下怎么尽可能避免发送MQ但又需要回滚的场景,其实就是把发MQ消息的时机往后放放,本地事务执行成功了,才发送 MQ 消息,这样子也避免大事务中嵌套 MQ,这在业务上也是可以接受的。
这种做法底层避免了数据库操作失败,MQ 需要回滚但是没法回滚的困境,但仍然有它的缺点,就是仍然没法保证 “数据库操作 + MQ” 的原子性,比方说下面,数据库事务提交了之后,App 重启或者宕机了,就不会发出 MQ 消息。
这其实涉及到了分布式事务的处理策略,我们当然可以用本地消息表或者其他分布式处理策略如TCC来解决这个问题。
所以这里谈论到的策略其实并不是一种分布式事务的处理方案,重点在于优化代码结构避免长事务,同时尽量保证“数据库操作 + MQ” 的原子性。
2. 代码实现
2.1. 问题代码
在@Transactional 声明式事务编程中,两个 insert 操作中穿插着发送MQ消息,典型的大事务问题。
@Transactional
public void doTransaction() {
log.info("start tx");
User user1 = new User();
user1.setId(9);
user1.setAge(2);
user1.setName("jxz");
user1.setEmail("111@qq.com");
userMapper.insert(user1);
log.info("insert user1...");
log.info("调用其他 RPC 或者发送 MQ 消息");
User user2 = new User();
user2.setId(10);
user2.setAge(3);
user2.setName("jxz");
user2.setEmail("111@qq.com");
userMapper.insert(user2);
log.info("insert user2...");
log.info("end tx");
}
那正如前面所说的,我们可以在数据库本地事务提交以后,再去调用 RPC 或者 MQ。这个时候代码结构是需要调整的,如果只是单纯把 RPC 或者 MQ 从 @Transactional 注解声明的方法中抽取出来,后置调用,伪代码如下:
public void doRpcAfterTransaction() {
// 原先 @Transactional 声明的数据库操作,事务失效
doTransaction();
log.info("调用其他 RPC 或者发送 MQ 消息");
}
@Transactional 注解也会失效,因为这属于方法内部调用 @Transactional 声明的方法,Spring 不是拿到的代理对象去调用。此外这种方式还增加了代码的复杂性,改动量太大。
2.2. 改进方案
那么是否存在一种代码改动量较小,能让人一眼看懂,最好在静态上还是内嵌在原来 @Transactional 声明式事务编程中;同时还能在当前事务执行完以后,能够及时回调 RPC/MQ 等第三方调用的。
仍然声明一下,这种方案是为了尽可能保证“本地事务+RPC/MQ”的原子性,并且代码结构简单,并不是分布式事务的解决方案。
Spring 提供这样的 SPI 扩展,TransactionSynchronization 就提供事务执行完成以后回调的接口。
其中包括多个事务回调的拓展点:
其中 TransactionSynchronization#afterCompletion(int status) 就会根据事务执行结果(成功 commit 或者回滚 rollback),status 入参数代表事务执行状态,其实现就会执行事务后置处理。
这一切都建立在当前方法上下文存在活跃的事务,Spring 也提供了静态方法来让我们调用判断 TransactionSynchronizationManager#isActualTransactionActive()
最终写了个工具类实现代码如下:
package com.jxz.util;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
/**
* @Author jiangxuzhao
* @Description
* @Date 2024/10/2
*/
public class TransactionUtils {
/**
* 事务后置处理 api,可以优化大事务提升数据库性能,尽量保证“本地事务 + RPC/MQ”的原子性
*
* @param runnable 事务后置处理任务
*/
public static void doAfterTransaction(Runnable runnable) {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(new DoTransactionCompletion(runnable));
}
}
/**
* 实现 TransactionSynchronization 接口,重写其中的 afterCompletion 方法
*/
public static class DoTransactionCompletion implements TransactionSynchronization {
// 待执行的任务
Runnable runnable;
public DoTransactionCompletion(Runnable runnable) {
this.runnable = runnable;
}
// 在事务 commit/rollback 以后回调
@Override
public void afterCompletion(int status) {
// 当事务状态是 COMMITTED 时
if (status == TransactionSynchronization.STATUS_COMMITTED) {
runnable.run();
}
}
}
}
在原先调用的地方修改也很简单:
内嵌在 @Transactional 声明式事务中,甚至连 RPC/MQ 调用的代码位置都不需要变动,内部实现的就是事务执行完成之后的后置回调。
@Transactional
public void doTransaction2() {
log.info("start tx");
User user1 = new User();
user1.setId(13);
user1.setAge(2);
user1.setName("jxz");
user1.setEmail("111@qq.com");
userMapper.insert(user1);
log.info("insert user1...");
TransactionUtils.doAfterTransaction(() ->
log.info("afterCommit, 调用其他 RPC 或者发送 MQ 消息"));
User user2 = new User();
user2.setId(14);
user2.setAge(3);
user2.setName("jxz");
user2.setEmail("111@qq.com");
userMapper.insert(user2);
log.info("insert user2...");
log.info("end tx");
}