一、概述
Spring事务执行的流程如下图,我们将按照下面的执行顺序,介绍Spring的事务。
二、代码分析
(一)核心流程
TransactionAspectSupport#invokeWithinTransaction 事务处理核心伪代码
// 通过事务调用
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 获取事务属性源对象
TransactionAttributeSource tas = getTransactionAttributeSource();
// 通过事务属性源对象获取到当前方法的事务属性信息
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取配置的事务管理器对象DataSourceTransactionManager
final TransactionManager tm = determineTransactionManager(txAttr);
// TransactionManager 转换为 PlatformTransactionManager
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
// 获取连接点的唯一标识 类名+方法名
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 创建事务信息TransactionInfo
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// 执行被增强方法,调用具体的处理逻辑
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 异常回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//清除事务信息,恢复线程私有的老的事务信息
cleanupTransactionInfo(txInfo);
}
// 成功后提交,会进行资源储量,连接释放,恢复挂起事务等操作
commitTransactionAfterReturning(txInfo);
return retVal;
}
1、 getTransactionAttributeSource() 获取事务属性源对象,获取的是在IOC容器启动过程中,创建的NameMatchTransactionAttributeSource对象,内含属性nameMap,可通过方法名称获取方法对应的事务属性。
// key: 方法名称;value:方法的事务属性
2 private Map<String, TransactionAttribute> nameMap = new HashMap<>();
2、determineTransactionManager(txAttr) 获取事务管理器,返回在IOC容器启动过程中,创建的DataSourceTransactionManager事务管理器对象。
3、asPlatformTransactionManager(txAttr) 将DataSourceTransactionManager转换为PlatformTransactionManager。
4、methodIdentification(method, targetClass, txAttr) 获取被增强方法的类名 + 方法名
5、获取连接、开启事务 (包含事务传播特性的处理,执行insert、update、delete等被增强方法前的处理)
6、执行被增强方法,数据库新增、删除、更新操作
7、是否异常
被增强方法执行异常,completeTransactionAfterThrowing(txInfo, ex) 事务回滚操作
被增强方法执行无异常, commitTransactionAfterReturning(txInfo) 事务提交操作
(二)创建事务信息
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
// 如果没有名称指定则使用方法唯一标识,并使用DelegatingTransactionAttribute封装txAttr
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
// 获取TransactionStatus事务状态信息,开启事务、获取jdbc连接
TransactionStatus status = tm.getTransaction(txAttr);
// 根据指定的属性与status准备TransactionInfo,
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
1、封装事务属性对象
用DelegatingTransactionAttribute对象封装事务属性对象TransactionAttribute,若事务属性对象的name属性为空,重写DelegatingTransactionAttribute的getName方法,将被增强方法的类名+方法名返回;
2、事务状态 - TransactionStatus
AbstractPlatformTransactionManager#getTransaction 获取事务状态信息核心伪代码
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// 若不指定事务定义,则使用默认的事务定义StaticTransactionDefinition
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 获取数据源对象 DataSourceTransactionObject
Object transaction = doGetTransaction();
// 若已存在事务,按已存在事务流程执行
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Mandatory支持当前事务,如果不存在就抛异常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 事务不存在,创建新事务
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
try {
// 开启事务(获取连接、开启事务)
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// 创建一个空事务信息,用作事务同步
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
1、获取数据源事务对象DataSourceTransactionObject,含是否允许当前事务设置保存点、jdbc连接持有对象,此处有一个复用JDBC连接逻辑。
TransactionSynchronizationManager.getResource(obtainDataSource()) 获取数据源对应的jdbc连接 核心伪代码
private static Object doGetResource(Object actualKey) {
// 获取线程私有事务资源
Map<Object, Object> map = resources.get();
// 第一次创建事务,线程私有事务资源为空,后续创建,返回null
if (map == null) {
return null;
}
// 获取线程私有事务资源中数据源对应的jdbc连接
Object value = map.get(actualKey);
// 如果value是ResourceHolder类型,并且已经解绑
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
// 将该数据源从线程私有事务资源中的元素map里移除
map.remove(actualKey);
// 线程私有事务资源的map为空,清空线程私有事务资源
if (map.isEmpty()) {
resources.remove();
}
// 返回null
value = null;
}
// 返回数据源对应的jdbc连接
return value;
}
2、isExistingTransaction(transaction) 若已存在事务,根据被增强方法的传播特性,判断是抛异常、新建事务、还是支持原有事务。
3、根据被增强方法的传播特性判断是抛异常、创建事务、按非事务方式执行。
当然我们最关心的还是按照事务的方式来执行被增强方法,下面来看看被增强方法的事务是如何处理的。
(三)开启事务
AbstractPlatformTransactionManager#startTransaction 开启事务核心伪代码
// 开启事务
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
// 是否需要新同步
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建新事务状态对象
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 获取连接、开启事务
doBegin(transaction, definition);
// 新同步事务的设置,针对于当前线程的设置
prepareSynchronization(status, definition);
return status;
}
1、获取事务是否需要同步标识
用作后续prepareSynchronization对线程私有变量做初始化操作的判断;
2、创建事务状态对象
AbstractPlatformTransactionManager#newTransactionStatus 创建事务状态对象
// 创建事务状态对象
protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
// 重置同步标识,只有当前事务需要同步,并且未被激活,同步标识为true
boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();
// 创建DefaultTransactionStatus事务状态对象
return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources);
}
3、获取连接、开启事务
DataSourceTransactionManager#doBegin 获取连接、开启事务核心伪代码:
// 获取连接、开启事务
protected void doBegin(Object transaction, TransactionDefinition definition) {
// 强制转化事务对象
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 判断事务对象没有数据库连接持有器
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 通过数据源获取一个数据库连接对象
Connection newCon = obtainDataSource().getConnection();
// 把数据库连接包装成一个ConnectionHolder对象 然后设置到txObject对象中去
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 标记当前的连接是一个同步事务
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
// 获取JDBC连接
con = txObject.getConnectionHolder().getConnection();
// 为当前的事务设置隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
// 设置先前隔离级别
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 设置是否只读
txObject.setReadOnly(definition.isReadOnly());
// 关闭自动提交
if (con.getAutoCommit()) {
//设置需要恢复自动提交
txObject.setMustRestoreAutoCommit(true);
// 关闭自动提交
con.setAutoCommit(false);
}
// 判断是否设置为只读事务
prepareTransactionalConnection(con, definition);
// 标记激活事务
txObject.getConnectionHolder().setTransactionActive(true);
// 设置事务超时时间
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 绑定数据源和连接到同步管理器上,把数据源作为key,数据库连接作为value 设置到线程变量中
if (txObject.isNewConnectionHolder()) {
// 将当前获取到的连接绑定到当前线程
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
// 释放数据库连接
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
3.1、获取jdbc连接
第一次创建事务,数据源属性对象中的数据库jdbc连接为空,通过数据源获取jdbc数据源连接。
3.2、填充数据源属性对象属性
1、设置连接持有器中的事务为同步事务,设置数据源属性对象之前的隔离级别、设置只读属性;
2、关闭jdbc连接的自动提交,开启事务;
3、标记当前事务为激活状态,可用作后续prepareSynchronization对线程私有变量做初始化操作的判断;
3.3、绑定新的jdbc连接至当前线程的事务资源中
若当前连接是一个新的jdbc连接,需要将当前jdbc连接绑定到线程当前线程私有的事务资源属性中TransactionSynchronizationManager#bindResource 核心伪代码如下:
// 绑定jdbc连接至当前线程的事务资源中
public static void bindResource(Object key, Object value) throws IllegalStateException {
// 获取数据源
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
// 获取线程私有事务资源中jdbc连接
Map<Object, Object> map = resources.get();
// 第一次创建事务,事务资源未创建,初始化线程私有事务资源
if (map == null) {
map = new HashMap<>();
resources.set(map);
}
// 将数据源作为key,jdbc连接作为value,绑定到线程私有的事务资源中
Object oldValue = map.put(actualKey, value);
// 若当前线程的事务资源中原有的value为ResourceHolder类型,并且已经解绑,将原有的事务资源value置空
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
oldValue = null;
}
// 若当前线程的事务资源已经被绑定,抛异常
if (oldValue != null) {
throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
}
当外层事务的传播特性为REQUIRED,内层的传播特性也为REQUIRED时,内层可复用外层创建的jdbc连接。通过数据源获取的jdbc连接是同一个。
(四)事务信息
事务信息准备核心伪代码:
// 准备事务信息
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
// 创建事务信息
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
// 设置新事务状态
txInfo.newTransactionStatus(status);
// 事务信息绑定到当前线程
txInfo.bindToThread();
// 返回事务信息
return txInfo;
}
1、创建事务信息
创建TransactionInfo对象,封装事务管理器,事务注解属性,被增强方法类名+方法名,事务状态对象
2、事务信息绑定到当前线程
// 持有当前被增强方法的事务状态,支持多个被增强方法间的事务处理
private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
new NamedThreadLocal<>("Current aspect-driven transaction");
// 将事务信息绑定到当前线程
private void bindToThread() {
// 获取事务信息持有器原来的事务信息,并设置到oldTransactionInfo属性
this.oldTransactionInfo = transactionInfoHolder.get();
// 将当前线程的事务属性设置进事务持有器中
transactionInfoHolder.set(this);
}
(五)事务信息
TransactionAspectSupport#commitTransactionAfterReturning 调用事务管理器提交事务概览伪代码
// 提交事务
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
// 事务信息不为空,并且事务信息对象持有事务状态不为空
if (txInfo != null && txInfo.getTransactionStatus() != null) {
// 调用事务信息对象中事务管理器的事务提交方法
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
1、提交处理
AbstractPlatformTransactionManager#commit 事务提交伪代码
public final void commit(TransactionStatus status) throws TransactionException {
// 获取事务状态对象
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 处理事务提交
processCommit(defStatus);
}
AbstractPlatformTransactionManager#processCommit 处理事务提交伪代码
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
//当前状态是新事务
if (status.isNewTransaction()) {
// 若当前事务为新事务,事务提交
doCommit(status);
}
} finally {
//根据条件,完成后数据清除,和线程的私有资源解绑,重置连接自动提交,隔离级别,是否只读,释放连接,恢复挂起事务等
cleanupAfterCompletion(status);
}
}
DataSourceTransactionManager#doCommit 事务提交核心伪代码
protected void doCommit(DefaultTransactionStatus status) {
// 获取事务状态中的数据源事务对象
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
// 获取数据源事务对象中连接持有器的jdbc连接
Connection con = txObject.getConnectionHolder().getConnection();
try {
// jdbc连接事务提交
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
AbstractPlatformTransactionManager#cleanupAfterCompletion 提交完成时的清理核心伪代码
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
// 将当前事务设置为完成状态
status.setCompleted();
// 新事务完成事务提交,清理线程私有事务属性资源
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
// 恢复jdbc连接相关操作
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
// 恢复挂起事务
if (status.getSuspendedResources() != null) {
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
TransactionSynchronizationManager#clear() 清除当前线程私有的事务同步信息核心代码
// 事务同步
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");
// 当前事务的名称
private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name");
// 当前事务是否只读
private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status");
// 当前事务的隔离级别
private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level");
// 实际事务是否激活
private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active");
public static void clear() {
synchronizations.remove();
currentTransactionName.remove();
currentTransactionReadOnly.remove();
currentTransactionIsolationLevel.remove();
actualTransactionActive.remove();
}
DataSourceTransactionManager#doCleanupAfterCompletion jdbc连接相关操作核心伪代码。 jdbc连接解绑、属性恢复、连接释放、连接关闭及数据源事务对象持有jdbc连接释放操作。
// 事务提交后jdbc连接相关处理
protected void doCleanupAfterCompletion(Object transaction) {
// 获取数据源事务对象
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 新事务,将数据库连接从当前线程中解除绑定
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
// 获取连接
Connection con = txObject.getConnectionHolder().getConnection();
// 关闭事务,恢复数据库连接的自动提交属性
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true);
}
// 重置数据库jdbc连接, 恢复原有隔离级别、是否只读属性
DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
// 当前事务是独立的新创建的事务,在事务完成时释放数据库连接并关闭连接
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
}
// 清除数据源事务对象中的连接持有器
txObject.getConnectionHolder().clear();
}
AbstractPlatformTransactionManager#resume 恢复外层原挂起事务核心代码
// 恢复外层挂起事务
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
throws TransactionException {
// 若存在挂起事务,恢复挂起事务
if (resourcesHolder != null) {
// 获取挂起事务的资源属性
Object suspendedResources = resourcesHolder.suspendedResources;
// 挂起事务的资源属性不为空,将事务属性资源绑定到线程私有的事务属性资源中
if (suspendedResources != null) {
doResume(transaction, suspendedResources);
}
}
}
2、回滚处理
TransactionAspectSupport#completeTransactionAfterThrowing 事务回滚概览伪代码:
// 异常时事务处理
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
// 事务信息中事务属性不为空,并且事务属性中回滚
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
// 进行回滚
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
else {
// 回滚标识rollBackOnly为 false,执行提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
}
DefaultTransactionAttribute#rollbackOn 是否回滚判断标识
// 未捕获异常的回滚操作
public boolean rollbackOn(Throwable ex) {
return (ex instanceof RuntimeException || ex instanceof Error);
}
AbstractPlatformTransactionManager#processRollback 回滚核心伪代码
// 处理回滚
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
// 当前事务是一个新事务
if (status.isNewTransaction()) {
// 进行回滚
doRollback(status);
}
}
finally {
// 回滚完成时的处理
cleanupAfterCompletion(status);
}
}
DataSourceTransactionManager#doRollback 事务回滚核心伪代码
@Override
protected void doRollback(DefaultTransactionStatus status) {
// 获取事务状态中的数据源事务对象
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
// 获取数据源事务对象中连接持有器的jdbc连接
Connection con = txObject.getConnectionHolder().getConnection();
try {
// jdbc的回滚
con.rollback();
}catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}
参考文章:
Spring 事务源码分析_spring事务源码分析_J3code的博客-CSDN博客
Spring事务源码分析_spring事务源码深度解析_程序员小潘的博客-CSDN博客
https://www.cnblogs.com/wt20/p/10957371.html
https://www.cnblogs.com/RunningSnails/p/17015808.html
Spring事务流程源码剖析+传播行为运用案例+心得 - 知乎
十八、spring 事务之事务执行流程 - 简书
https://www.cnblogs.com/RunningSnails/p/17015808.html
https://www.cnblogs.com/RunningSnails/p/17017176.html
https://www.cnblogs.com/RunningSnails/p/17020456.html
https://www.cnblogs.com/RunningSnails/p/17023323.html
https://www.cnblogs.com/RunningSnails/p/17025931.html