从一个编程式事务开始
Spring的声明式事务涉及到Bean的注入还有动态代理相关的知识,门槛会相对高一些。为了更容易理解事务,我们先从编程式事务的例子开始,逐步揭开Spring事务神秘的面纱。
//一个简单的编程式事务的例子
@Autowired
private PlatformTransactionManager transactionManager;
public void testTransaction() {
//创建一个默认的事务定义
TransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
//通过事务管理器和事务定义创建一个TransactionTemplate
TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager, transactionDefinition);
//使用事务模板类实例执行事务逻辑
transactionTemplate.execute(() -> {
//在事务中执行的逻辑
return r;
});
}
从上面的例子我们可以看出创建一个事务首先需要一个事务管理器PlatformTransactionManager
,然后是一个事务的定义TransactionDefinition
。通过这两兄弟我们可以得到一个事务模板类TransactionTemplate
,它存在的意义就是省去手动写commit
和rollback
的逻辑。
通过上面的例子我们可以先从PlatformTransactionManager
和TransactionDefinition
作为支点,来一步一步的翘起Spring事务底层实现。
没有也不要紧的事务定义TransactionDefinition
TransactionDefinition
会定义事务的隔离级别,传播机制,超时时间,是否为只读事务,事务名称。它是一个接口,其中一个实现是DefaultTransactionDefinition
,DefaultTransactionDefinition
上面的隔离级别等需要设置的信息都会有默认值
- 传播机制默认为
PROPAGATION_REQUIRED
- 隔离级别默认为
ISOLATION_DEFAULT
,数据库设置的隔离级别 - 默认没有超时时间
- 默认为非只读事务
由于有了以上的默认值,所以说TransactionDefinition
没有也不要紧。
关于更多关于事务隔离级别和传播机制的信息,可以另外从其他渠道获取,本文更多是介绍事务实现原理。
事务行为抽象PlatformTransactionManager
PlatformTransactionManager
将事务的动作(获取事务,提交事务,事务回滚)抽象成了三个方法getTransaction
,commit
,rollback
。PlatformTransactionManager
是一个接口,真实的实现交给了实现者。下面截图是其中的一些实现者。
不过在PlatformTransactionManager
与这些实现并不是直接关系,中间还有一层AbstractPlatformTransactionManager
,显而易见这里用到了父类模板方法模式。在AbstractPlatformTransactionManager
中会有一些通用逻辑的实现,有差异的地方由各个ORM框架根据自己的特性做不同的实现。这不仅是Spring事务有这种这种抽象,这种抽象在Spring Cloud也是随处可见,Spring Cloud封装了顶层的一些抽象,而Alibaba等将自己的组件(Nacos等)实现对应的接口整合到Spring Cloud生态。
事务行为实现AbstractPlatformTransactionManager源码分析
AbstractPlatformTransactionManager
继承了PlatformTransactionManager
,将事务的行为进行了实现。下面从AbstractPlatformTransactionManager
定义的获取事务,提交事务,事务回滚三个动作来进行源码分析。
在开始AbstractPlatformTransactionManager
的分析前我们先看看PlatformTransactionManager
接口的定义。
获取事务
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
获取事务需要一个事务的定义,返回的是一个TransactionStatus
。TransactionStatus
表示事务的状态,可以从TransactionStatus
中获取到事务是否提交,事务是否有保存点等状态相关的信息。为什么这里Spring要这么设计?按常理来说,这里应该返回的就是一个事务,而不是事务的状态。
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
//如果是一个新事务,不会从上下文中取到数据库连接
//注意这里怎么也会创建一个新的DataSourceTransactionObject,
//不同的实现有不同的事务对象
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
//有不同的实现,DataSource是判断当前线程上下文中是否存在数据库连接,并且连接是活跃的
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
//如果存在事务,则要判断事务传播机制。
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
//不存在事务则判断是否需要在事务环境下执行
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
} else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
} catch (RuntimeException | Error ex) {
//如果发生异常则恢复事务
resume(null, suspendedResources);
throw ex;
}
} else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
//表示不会创建事务的信息绑定到上下文
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
- 如果给定的事务定义为空就是用默认的事务定义。
- 获取一个事务对象
- 判读是否存在事务
- 存在事务则走事务传播的逻辑处理
- 不存在事务则判断事务传播是否需要创建事务
- 不需要事务,则会在空事务下执行
获取事务操作在大的逻辑上没有难以理解的地方,但是细节上其实有很多可以关注的地方。
- 在上面提到的为什么返回的一个事务状态,而不是事务本身?
这种我理解是对事务的一种抽象,因为不同实现的事务的事物对象会有不同的实现,从而处理各自一些特殊的逻辑。避免直接返回事务会将不同的实现的细节泄露出去。 - 每一次调用获取事务的方法都会创建一个事务,可能根本不需要事务,这样是不是浪费?
这个问题其实比较复杂,涉及到事务传播机制。在一个多层事务调用的时候,如果存在不需要事务的方法就需要将保存当前事务上下文后挂起当前事务,然后执行需要事务的方法,执行完成后再进行恢复事务上下文。
以上只是笔者个人理解。
下面会根据上面事务是否存在的几种情况进行详细的说明。
不存在事务需要在事务中执行
这种情况对应getTransaction
中下面的这段代码。
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
} catch (RuntimeException | Error ex) {
//如果发生异常则恢复事务
resume(null, suspendedResources);
throw ex;
}
1.执行挂起当前事务的操作。这个时候就会有同学有疑惑了,不是都没有事务了,为啥还要执行事务挂起操作。在前面判断是否有事务,是通过判断txObject是否有连接并且连接的事务是活跃的判断存在事务。这里挂起事务传入的事务对象为null,在挂起的方法中会判断同步器是否为活跃的。主要是因为Spring的事务传播机制以及多容器情况的存在,确保当前线程上下文中是不存在事务的,存在事务就需要挂起。
2.开启事务。
要注意的地方就是newSynchronization
这个变量很关键。
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
//这个标志是用来判断是否要创建synchronization,用来存储Transaction的信息到线程。
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//这里是否为新事务的标识位为true
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
//生成synchronization,设置当前事务信息到当前线程
prepareSynchronization(status, definition);
return status;
}
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
//这里的isNewSynchronization判断很关键,是区分是否在事务上下文下执行以及是否是新创建的事务的关键。
if (status.isNewSynchronization()) {
//是否存在事务
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
//如果不是默认事务隔离级别则设置,是默认就不用设置
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
//设置是否为只读事务
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
在获取事务对象的时候下面这段代码可能会有同学有疑惑。为什么txObject
设置newConnectionHolder
的时候直接就给了一个false。如果获取的连接是空的呢?在哪里会进行重新设置呢?答案就在下面。
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
//从ThreadLocal中获取当前线程的连接
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
//直接设置为false
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
下面执行的主要逻辑:
- 从DataSource中获取数据库连接
- 关闭当前数据库连接事务自动提交
- 将当前连接的属性保存到txObject中,用于事务结束后恢复
- 设置超时时间以及其他属性的初始化
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
//设置新的连接
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
存在事务且需要在事物中执行
这种情况有需要分两种情况讨论。一种是存在事务直接在当前事务中执行,一种是需要另外创建新的事务。
第一种直接在当前事务中执行的需要额外的操作,也不会创建新的同步器。
第二种需要挂起当前事务,然后创建新的事务同步器以及获取新的数据库连接。注意:此时之前的事务是没有提交的,数据库连接也未释放。
这里就不贴代码了,前面如果都看懂了,这里没有啥难理解的,感兴趣可以直接去看Spring源码。
不需要在事务中执行
这种就和上面的有点类似,如果存在事务则会挂起事务,不会创建事务同步器。
这里就不贴代码了,前面如果都看懂了,这里没有啥难理解的,感兴趣可以直接去看Spring源码。
事务挂起
前面提到的事务挂起逻辑如下:
如果存在同步器,则会将同步器一一挂起,并且将信息保存到suspendedResources
,最后这个对象会保存到txObject,用于事务恢复。
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
//如果当前线程的事务为活跃的。
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
//将事务的信息封装起来,并将当前线程的事务信息清空。
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
} catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
} else if (transaction != null) {
// Transaction active but no synchronization active.
//事务是活跃的但是没有同步器活跃
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
} else {
// Neither transaction nor synchronization active.
return null;
}
}
提交事务
void commit(TransactionStatus status) throws TransactionException;
从接口定义可以看出提交事务需要的对象就是获取事务接口返回的TransactionStatus
。
下面同样使用DataSource的实现来进行源码分析。当然它同样也有AbstractPlatformTransactionManager
中的抽象实现。
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
processCommit(defStatus);
}
AbstractPlatformTransactionManager
中的commit
同样可以理解为一个父类模板方法,它将主要的骨架在这个方法中做了定义,具体的实现由子类去承接。
AbstractPlatformTransactionManager
中的commit
逻辑很清晰。
- 先判断是否已经完成,重复提交事务抛出异常。
- 判断当前执行是否已抛出异常需要回滚,如果是则会执行回滚。
- 判断全局的事务是否已抛出异常需要回滚,这个主要是嵌套事务,如果是则会执行回滚。
- 最后才会提交事务。
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
//存在保存点,只处理保存点
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
//当前事务是新事务,则会进行事务提交,否则不会处理
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
} else if (isFailEarlyOnGlobalRollbackOnly()) {
//需要回滚的事务,也就是标记发生了异常的事务,提交会抛出异常
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
//这一步很重要,里面涉及到资源的释放,以及线程上下文的恢复
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
} catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
} else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
} catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
} finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
} finally {
cleanupAfterCompletion(status);
}
}
ConnectionSynchronization中处理连接的释放。
回滚事务
void rollback(TransactionStatus status) throws TransactionException;
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
triggerBeforeCompletion(status);
//保存点回滚
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
} else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
//真实的回滚
doRollback(status);
} else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
} else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
} else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
} catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
} finally {
cleanupAfterCompletion(status);
}
}
逻辑和提交事务差不多,都是会判断一些是否为新事务,以及是否为保存点等来进行回滚处理。
总结
花了主要的精力对事务的获取进行了详细的说明,对于事务的提交和回滚描述比较粗,不过如果对获取事务的细节都了解后,对提交和回滚也很好理解。只要关键的点知道了,很容易就知道他的原理了。不过本篇没有细节到怎么去获取的连接以及如何进行的释放,已经它怎么和JDBC进行联动的,后续又时间会进行补充。
参考资料
Spring 5源码