Spring源码解密--事务篇

news2024/11/25 20:12:21

文章目录

  • 一、事务的实现方式
    • 1、JDBC
    • 2、Spring
      • 基于xml配置
      • 编程式事务
      • 声明式事务
  • 二、源码设计
    • 1、TransactionManager
      • 1)TransactionManager
      • 2)PlatformTransactionManager
      • 3)ReactiveTransactionManager
    • 2、TransactionDefinition
    • 3、TransactionStatus
    • 4、 核心方法说明
      • 1)getTransaction
        • 关键方法1:doGetTransaction()
        • 关键方法2:suspend
        • 关键方法3:startTransaction
        • 关键方法4:handleExistingTransaction
        • (1)实验
          • a、执行m1()方法过程:
          • b、执行m2()方法过程如下:
        • (2)总结
      • 2) commit
        • 关键方法1:doCommit
        • 关键方法2:cleanupAfterCompletion
        • (1)实验
          • a、单事务
          • b、多事务
          • REQUIRED
          • NESTED
          • REQUIRES_NEW
          • NOT_SUPPORTED
        • (2)总结
      • 3)rollback
        • (1)实验
          • a、单事务
          • b、多事务
          • REQUIRED
          • NESTED
          • REQUIRES_NEW
          • NOT_SUPPORTED
        • (2)总结
  • 三、事务源码串联起飞
    • 1、通过debug模式来看看怎么调用事务
    • 2、TransactionInterceptor作为入口,如何注入容器中的?
  • 四、提炼
    • 1、@Transactional怎么实现事务的?
    • 2、怎么实现事务传播行为的?
      • 1)NOT_SUPPORTED
      • 2)REQUIRED,SUPPORTS,MANDATORY
      • 3)NESTED
      • 4)REQUIRES_NEW

本文相关源码包使用的是spring-tx-5.2.8.RELEASE.jar

一、事务的实现方式

1、JDBC

public static void main(String[] args) {
     Connection conn = null;
      Statement stmt = null;
      try {
          // 注册 JDBC 驱动
          Class.forName("com.mysql.cj.jdbc.Driver");
          // 打开连接
          conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC", "root", "xxx");
          // 执行查询
          stmt = conn.createStatement();
          // 关闭自动提交
          conn.setAutoCommit(false); 
		  // 执行更新
          String sql1 = "INSERT INTO test(id,user_name)values(1,'forlan1')";
          stmt.executeUpdate(sql1);
          String sql1 = "INSERT INTO test(id,user_name)values(2,'forlan2')";
          stmt.executeUpdate(sql2);

          // 事务提交
          conn.commit(); // 上面两个操作都没有问题就提交
      } catch (Exception e) {
          e.printStackTrace();
          // 事务回滚
          conn.rollback();
      } finally {
         // 关闭资源,stmt.close();conn.close();
      }
   }

其实关键就是3个动作:关闭自动提交、commit、rollback

2、Spring

基于xml配置

  • 创建事务管理器
  • 配置事务方法
  • 配置AOP
	 <bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager" id="transactionManager">
	 	<property name="dataSource" ref="dataSource"/>
	 </bean>
	 <tx:advice id="advice" transaction-manager="transactionManager">
	 	<tx:attributes>
	 		<tx:method name="forlan*" propagation="REQUIRED"/>
	 	</tx:attributes>
	 </tx:advice>
	 <!-- aop配置 -->
	 <aop:config>
		 <aop:pointcut expression="execution(* *..service.*.*(..))" id="tx"/>
	 	 <aop:advisor advice-ref="advice" pointcut-ref="tx"/>
	 </aop:config>

编程式事务

@Autowired
private PlatformTransactionManager txManager;

public void forlanTest() {
	// 1、创建事务定义
	DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
	// 2、根据定义开启事务
	TransactionStatus transaction = txManager.getTransaction(definition);
	try {
		//TODO 业务逻辑、SQL操作
		// 3、提交事务
		txManager.commit(transaction);
	} catch (Throwable e) {
		e.printStackTrace();
		// 4、异常了,回滚事务
		txManager.rollback(transaction);
	}
}

声明式事务

启动类加@EnableTransactionManagement,开启注解版本的事务
方法头部加@Transactional,开启事务

二、源码设计

通过编程式事务的代码我们可以发现,关键在于PlatformTransactionManager,TransactionDefinition,TransactionStatus这几个接口,下面我们就来深挖下这几个接口是怎么设计的

1、TransactionManager

在这里插入图片描述

1)TransactionManager

顶层接口,空代码

public interface TransactionManager {}

2)PlatformTransactionManager

这个是平台事务管理器,比较常用的,像我们一般的命令式编程就是用它

public interface PlatformTransactionManager extends TransactionManager {
    TransactionStatus getTransaction(@Nullable TransactionDefinition var1) throws TransactionException;

    void commit(TransactionStatus var1) throws TransactionException;

    void rollback(TransactionStatus var1) throws TransactionException;
}

PlatformTransactionManager有个核心的实现抽象类AbstractPlatformTransactionManager,实现了公共的方法
- JtaTransactionManager:支持分布式事务【本身服务中的多数据源】
- DataSourceTransactionManager:数据源事务管理器,提供了相关事务操作的方法

3)ReactiveTransactionManager

响应式编程的事务管理器,响应式编程会用,一般比较少使用

2、TransactionDefinition

事务定义,作为PlatformTransactionManager接口getTransactio()方法的入参,得到事务TransactionStatus;

public interface TransactionDefinition {
    int PROPAGATION_REQUIRED = 0;
    int PROPAGATION_SUPPORTS = 1;
    int PROPAGATION_MANDATORY = 2;
    int PROPAGATION_REQUIRES_NEW = 3;
    int PROPAGATION_NOT_SUPPORTED = 4;
    int PROPAGATION_NEVER = 5;
    int PROPAGATION_NESTED = 6;
    int ISOLATION_DEFAULT = -1;
    int ISOLATION_READ_UNCOMMITTED = 1;
    int ISOLATION_READ_COMMITTED = 2;
    int ISOLATION_REPEATABLE_READ = 4;
    int ISOLATION_SERIALIZABLE = 8;
    int TIMEOUT_DEFAULT = -1;

    default int getPropagationBehavior() {
        return PROPAGATION_REQUIRED;
    }
    default int getIsolationLevel() {
        return ISOLATION_DEFAULT;
    }
    default int getTimeout() {
        return TIMEOUT_DEFAULT;
    }
    default boolean isReadOnly() {
        return false;
    }
    @Nullable
    default String getName() {
        return null;
    }
    static TransactionDefinition withDefaults() {
        return StaticTransactionDefinition.INSTANCE;
    }
}

主要参数:传播行为、隔离级别,是否只读,超时时间
DefaultTransactionDefinition:是事务定义的默认实现

 private int propagationBehavior = 0;// PROPAGATION_REQUIRED
 private int isolationLevel = -1;// 跟随数据库
 private int timeout = -1;// 无超时时间
 private boolean readOnly = false;// 可读写

3、TransactionStatus

public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {
    boolean hasSavepoint(); //是否有保存点
    void flush(); // 刷新
}

4、 核心方法说明

1)getTransaction

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
		throws TransactionException {

	// 如果未给出事务定义,则使用默认值
	TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

	// 关键方法1:获取数据源事务对象DataSourceTransactionObject(是否允许保存点,ConnectionHolder)
	Object transaction = doGetTransaction();
	boolean debugEnabled = logger.isDebugEnabled();

	/**
	 * 判断当前线程是否存在事务:connectionHolder != null && transactionActive
	 * 第1次调用,connectionHolder为null
	 * 第2次调用,connectionHolder的transactionActive=true
	 */
	if (isExistingTransaction(transaction)) {
		// Existing transaction found -> check propagation behavior to find out how to behave.
		// 关键方法4:处理已经存在的事务
		return handleExistingTransaction(def, transaction, debugEnabled);
	}

	// 超时,抛出异常
	if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
		throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
	}

	// 传播行为是PROPAGATION_MANDATORY且当前线程不存在事务,就抛出异常
	if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
		throw new IllegalTransactionStateException(
				"No existing transaction found for transaction marked with propagation 'mandatory'");
	}
	else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
			def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
			def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
		// 关键方法2:挂起事务
		SuspendedResourcesHolder suspendedResources = suspend(null);
		if (debugEnabled) {
			logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
		}
		try {
			 // 关键方法3:创建新事务,transaction = xxx;newTransaction = true;newSynchronization = true
			return startTransaction(def, transaction, debugEnabled, suspendedResources);
		}
		catch (RuntimeException | Error ex) {
			// 恢复挂起的事务
			resume(null, suspendedResources);
			throw ex;
		}
	}
	else {// PROPAGATION_SUPPORTS、PROPAGATION_NOT_SUPPORTED、PROPAGATION_NEVER这些传播行为进到这里
		if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
			logger.warn("Custom isolation level specified but no actual transaction initiated; " +
					"isolation level will effectively be ignored: " + def);
		}
		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
		// 创建空事务,transaction = null;newTransaction = true;newSynchronization = true
		return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
	}
}

protected boolean isExistingTransaction(Object transaction) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
	return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}

关键方法1:doGetTransaction()

创建一个数据源事务对象DataSourceTransactionObject,设置SavepointAllowed和ConnectionHolder,ConnectionHolder里面会存放的是JDBC连接(第一次进来是null)

protected Object doGetTransaction() {
	// 创建一个数据源事务对象
    DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject();
    // 设置是否允许保存点,DataSourceTransactionManager里面为true
    txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
	/**
	 * TransactionSynchronizationManager是事务同步管理器对象,用来保存当前事务信息
	 * 第一次进来获取的话,因为是通过数据源去获取,但事务同步管理器中还没有存放,所以此时获取的conHolder为null
	 */
    ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.obtainDataSource());
    // 非新创建连接则写false,newConnectionHolder=false
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

关键方法2:suspend

挂起事务,暂停事务同步或解绑数据源的数据库连接对象,返回这些资源

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
	if (TransactionSynchronizationManager.isSynchronizationActive()) {// 事务和同步中有东西,说明事务和同步都激活
		// 暂停所有当前同步并停用当前线程的事务同步,返回挂起的事务同步对象列表
		List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
		try {
			Object suspendedResources = null;
			if (transaction != null) {
				// 解绑数据源的数据库连接,返回该连接
				suspendedResources = doSuspend(transaction);
			}
			String name = TransactionSynchronizationManager.getCurrentTransactionName();
			TransactionSynchronizationManager.setCurrentTransactionName(null);
			boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
			TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
			Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
			TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
			boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
			TransactionSynchronizationManager.setActualTransactionActive(false);
			// 返回挂起的ResourcesHolder
			return new SuspendedResourcesHolder(
					suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
		}
		catch (RuntimeException | Error ex) {
			// 挂起失败,重新激活当前线程的事务同步并恢复所有给定的同步
			doResumeSynchronization(suspendedSynchronizations);
			throw ex;
		}
	}
	else if (transaction != null) {// 事务激活,但没激活同步
		// 解绑数据源的数据库连接,返回该连接
		Object suspendedResources = doSuspend(transaction);
		// 返回挂起的ResourcesHolder
		return new SuspendedResourcesHolder(suspendedResources);
	}
	else {
		// 事务和同步都没激活
		return null;
	}
}

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
		new NamedThreadLocal<>("Transaction synchronizations");
public static boolean isSynchronizationActive() {
	return (synchronizations.get() != null);
}

关键方法3:startTransaction

创建TransactionStatus实例,填充属性值,获取数据库连接,关闭自动提交,绑定数据源和数据库连接到当前线程,激活事务+同步

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources) {
    // 这里的transactionSynchronization = 0,所以newSynchronization =true
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    // 创建指定参数的TransactionStatus实例,true指的是newTransaction,创建后,事务提交和回滚状态都为false
    DefaultTransactionStatus status = newTransactionStatus(
		definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    // 获取数据库连接,关闭自动提交,绑定数据源和数据库连接到当前线程,激活事务
    doBegin(transaction, definition);
    // newSynchronization=true,激活事务+同步
    prepareSynchronization(status, definition);
    return status;
}

1、newTransactionStatus方法:创建TransactionStatus实例,填充属性值

/**
 * status = {DefaultTransactionStatus@10391}
 *  completed = false
 *  debug = false
 *  newSynchronization = true
 *  newTransaction = true
 *  readOnly = false
 *  rollbackOnly = false
 *  savepoint = null
 *  suspendedResources = null
 *  transaction = {DataSourceTransactionManager$DataSourceTransactionObject@10383}
 *   connectionHolder = null
 *   mustRestoreAutoCommit = false
 *   newConnectionHolder = false
 *   previousIsolationLevel = null
 *   readOnly = false
 *   savepointAllowed = true
 */
protected DefaultTransactionStatus newTransactionStatus(
		TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
		boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
	// newSynchronization = true并且之前没激活同步(synchronizations线程对象没东西)
	boolean actualNewSynchronization = newSynchronization &&
			!TransactionSynchronizationManager.isSynchronizationActive();
	return new DefaultTransactionStatus(
			transaction, newTransaction, actualNewSynchronization,
			definition.isReadOnly(), debug, suspendedResources);
}

public DefaultTransactionStatus(
			@Nullable Object transaction, boolean newTransaction, boolean newSynchronization,
			boolean readOnly, boolean debug, @Nullable Object suspendedResources) {

		this.transaction = transaction;
		this.newTransaction = newTransaction;
		this.newSynchronization = newSynchronization;
		this.readOnly = readOnly;
		this.debug = debug;
		this.suspendedResources = suspendedResources;
	}

2、doBegin方法:获取数据库连接,关闭自动提交,绑定数据源和ConnectionHolder到当前线程,设置DataSourceTransactionObject 信息(激活事务、同步),
可能的操作(设置只读、隔离解绑)

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject
    Connection con = null;
    try {
    	// ConnectionHolder为空 或 已经激活事务同步
        if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // 通过数据源获取数据库连接对象
			Connection newCon = this.obtainDataSource().getConnection();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
			// 把数据库连接对象包装为ConnectionHolder对象,然后设置到txObject对象中
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }
		// 设置ConnectionHolder的synchronizedWithTransaction为true
        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        // 获取数据库连接对象
		con = txObject.getConnectionHolder().getConnection();
        // 如果有是否只读和隔离级别!=-1,设置con.setReadOnly(true);con.setTransactionIsolation(xxx);
		Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        // 设置txObject的隔离级别
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        // 设置txObject是否只读
		txObject.setReadOnly(definition.isReadOnly());
        if (con.getAutoCommit()) {// 如果是自动提交
            txObject.setMustRestoreAutoCommit(true);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);// 关闭自动提交
        }
        // 准备事务连接,如果“enforceReadOnly”标志设置为true,并且事务定义指示只读事务,则默认实现将执行“SET TRANSACTION IONLY”语句
        prepareTransactionalConnection(con, definition);
        // 设置ConnectionHolder的transactionActive为true,表示已经存在事务,后面就会进入有事务的逻辑
        txObject.getConnectionHolder().setTransactionActive(true);
        int timeout = this.determineTimeout(definition);
        if (timeout != -1) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);// 设置事务超时时间
        }
        if (txObject.isNewConnectionHolder()) {
			// 将资源绑定到当前线程,key=数据源,value=ConnectionHolder
            TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
        }
    } catch (Throwable var7) {
        if (txObject.isNewConnectionHolder()) {
			// 关闭数据库连接
            DataSourceUtils.releaseConnection(con, this.obtainDataSource());
            txObject.setConnectionHolder((ConnectionHolder)null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
    }
}

3、prepareSynchronization方法:如果newSynchronization = true,初始化事务同步管理器

protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
	if (status.isNewSynchronization()) {
		TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
		TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
				definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
						definition.getIsolationLevel() : null);
		TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
		TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
		TransactionSynchronizationManager.initSynchronization();
	}
}

关键方法4:handleExistingTransaction

处理存在事务的情况下,不同传播行为的逻辑,主要看是否抛出异常,是否挂起资源,是执行startTransaction
startTransaction和prepareTransactionStatus的区别,startTransaction多执行了doBegin()方法,开启了新事物,返回的参数为true(newSynchronization,newTransaction,mustRestoreAutoCommit,newConnectionHolder)
说明:newTransaction:true表示新开事务,false参与到当前事务
newSynchronization:后面提交和回滚有用

private TransactionStatus handleExistingTransaction(
		TransactionDefinition definition, Object transaction, boolean debugEnabled)
		throws TransactionException {
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
		// 当前存在事务,则抛出异常
		throw new IllegalTransactionStateException(
				"Existing transaction found for transaction marked with propagation 'never'");
	}
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
		if (debugEnabled) {
			logger.debug("Suspending current transaction");
		}
		// 挂起当前事务,返回挂起资源
		Object suspendedResources = suspend(transaction);
		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
		// 创建空事务,并携带挂起资源,transaction = null;suspendedResources=xxx; newTransaction = false;newSynchronization = true;
		return prepareTransactionStatus(
				definition, null, false, newSynchronization, debugEnabled, suspendedResources);
	}
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
		if (debugEnabled) {
			logger.debug("Suspending current transaction, creating new transaction with name [" +
					definition.getName() + "]");
		}
		// 挂起当前事务,返回挂起资源
		SuspendedResourcesHolder suspendedResources = suspend(transaction);
		try {
			// 创建新事务,并携带挂起资源,transaction = xxx;suspendedResources=xxx; newTransaction = true;newSynchronization = true;
			return startTransaction(definition, transaction, debugEnabled, suspendedResources);
		}
		catch (RuntimeException | Error beginEx) {
			resumeAfterBeginException(transaction, suspendedResources, beginEx);
			throw beginEx;
		}
	}
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
		if (!isNestedTransactionAllowed()) {// 不允许嵌套事务就报错
			throw new NestedTransactionNotSupportedException(
					"Transaction manager does not allow nested transactions by default - " +
					"specify 'nestedTransactionAllowed' property with value 'true'");
		}
		if (debugEnabled) {
			logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
		}
		if (useSavepointForNestedTransaction()) {
			// Create savepoint within existing Spring-managed transaction,
			// through the SavepointManager API implemented by TransactionStatus.
			// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
			
			// 创建新事务,不挂起资源,transaction = xxx;suspendedResources=null; newTransaction = false;newSynchronization = false;
			DefaultTransactionStatus status =
					prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
			// 创建保存点,savepoint=xxx
			status.createAndHoldSavepoint();
			return status;
		}
		else {
			// Nested transaction through nested begin and commit/rollback calls.
			// Usually only for JTA: Spring synchronization might get activated here
			// in case of a pre-existing JTA transaction.
			// 创建新事务
			return startTransaction(definition, transaction, debugEnabled, null);
		}
	}
	// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
	if (debugEnabled) {
		logger.debug("Participating in existing transaction");
	}
	if (isValidateExistingTransaction()) {
		if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
			Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
			if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
				Constants isoConstants = DefaultTransactionDefinition.constants;
				throw new IllegalTransactionStateException("Participating transaction with definition [" +
						definition + "] specifies isolation level which is incompatible with existing transaction: " +
						(currentIsolationLevel != null ?
								isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
								"(unknown)"));
			}
		}
		if (!definition.isReadOnly()) {
			if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
				throw new IllegalTransactionStateException("Participating transaction with definition [" +
						definition + "] is not marked as read-only but existing transaction is");
			}
		}
	}
	// 传播行为:PROPAGATION_REQUIRED,PROPAGATION_SUPPORTS,PROPAGATION_MANDATORY,会走到这里的逻辑
	boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
	// 创建新事务,不挂起资源,transaction = xxx;suspendedResources=null; newTransaction = false;newSynchronization = false;
	return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

(1)实验

我们通过以下代码来分析源码做了什么

@Transactional/*(propagation = Propagation.SUPPORTS)*/
public void m1(){
	forlanDao.update(xxx);
	forlanService.m2();
}

@Transactional(propagation = Propagation.REQUIRED)
// @Transactional(propagation = Propagation.NESTED)
// @Transactional(propagation = Propagation.REQUIRES_NEW)
// @Transactional(propagation = Propagation.NOT_SUPPORTED)
public void m2(){
	forlanDao.insert(xxx);
}
a、执行m1()方法过程:
  • 调用doGetTransaction()方法,创建DataSourceTransactionObjec
  • 调用isExistingTransaction()方法,判断当前线程是否存在事务,最外层方法,肯定不存在
    • REQUIRED
      • 调用suspend()方法,什么都没做
      • 调用startTransaction()方法,创建TransactionStatus对象,获取数据库连接,关闭自动提交,绑定数据源和数据库连接到当前线程,激活事务+同步
    • SUPPORTS
      • 调用prepareTransactionStatus()方法,创建TransactionStatus对象
        最终,返回的TransactionStatus对象,内容如下:
transactionStatus = {DefaultTransactionStatus@10569} 
 transaction = {DataSourceTransactionManager$DataSourceTransactionObject@10398} 
  newConnectionHolder = true
  mustRestoreAutoCommit = true
  connectionHolder = {ConnectionHolder@10577} 
  previousIsolationLevel = null
  readOnly = false
  savepointAllowed = true
 newTransaction = true
 newSynchronization = true
 readOnly = false
 debug = false
 suspendedResources = null
 rollbackOnly = false
 completed = false
 savepoint = null

放开注释的话,返回的TransactionStatus对象,内容如下:

transactionStatus = {DefaultTransactionStatus@10421} 
 transaction = null
 newTransaction = true
 newSynchronization = true
 readOnly = false
 debug = false
 suspendedResources = null
 rollbackOnly = false
 completed = false
 savepoint = null

主要区别在于transaction是否为空,为null表示没有事务,否则就是有事务

b、执行m2()方法过程如下:
  • 调用doGetTransaction()方法,创建DataSourceTransactionObjec
  • 调用isExistingTransaction()方法,判断当前线程是否存在事务,前面已经创建过事务了,肯定存在
  • 调用handleExistingTransaction()方法,这里就有区分逻辑了,看我们的传播行为是什么
    • PROPAGATION_NEVER,抛异常
    • PROPAGATION_NOT_SUPPORTED,挂起当前事务,创建空事务(带挂起资源)
    • PROPAGATION_REQUIRES_NEW,挂起当前事务,创建新事务(带挂起资源)
    • PROPAGATION_NESTED,加入当前事务(带保存点)
    • 其它行为,比如REQUIRED,SUPPORTS,MANDATORY,加入当前事务

1)REQUIRED的话,返回的TransactionStatus对象,内容如下:

transactionStatus = {DefaultTransactionStatus@10678} 
 transaction = {DataSourceTransactionManager$DataSourceTransactionObject@10652} 
  newConnectionHolder = false
  mustRestoreAutoCommit = false
  connectionHolder = {ConnectionHolder@10683} 
  previousIsolationLevel = null
  readOnly = false
  savepointAllowed = true
 suspendedResources = null
 newTransaction = false
 newSynchronization = false
 readOnly = false
 debug = false
 rollbackOnly = false
 completed = false
 savepoint = null

注:SUPPORTS,MANDATORY也是一样内容

2)NESTED的话,返回的TransactionStatus对象,内容如下:

transactionStatus = {DefaultTransactionStatus@10675} 
 transaction = {DataSourceTransactionManager$DataSourceTransactionObject@10652} 
  newConnectionHolder = false
  mustRestoreAutoCommit = false
  connectionHolder = {ConnectionHolder@10695} 
  previousIsolationLevel = null
  readOnly = false
  savepointAllowed = true
 suspendedResources = null
 newTransaction = false
 newSynchronization = false
 readOnly = false
 debug = false
 rollbackOnly = false
 completed = false
 savepoint = {MysqlSavepoint@10692} 
  savepointName = "SAVEPOINT_1"
  exceptionInterceptor = null

3)REQUIRES_NEW的话,返回的TransactionStatus对象,内容如下:

transactionStatus = {DefaultTransactionStatus@10672} 
 transaction = {DataSourceTransactionManager$DataSourceTransactionObject@10652} 
  newConnectionHolder = true
  mustRestoreAutoCommit = true
  connectionHolder = {ConnectionHolder@10678} 
  previousIsolationLevel = null
  readOnly = false
  savepointAllowed = true
 suspendedResources = {AbstractPlatformTransactionManager$SuspendedResourcesHolder@10677} 
  suspendedResources = {ConnectionHolder@10679} 
  suspendedSynchronizations = {Collections$UnmodifiableRandomAccessList@10680}  size = 1
  name = "cn.lw.service.impl.ShiwuServiceImpl.insert"
  readOnly = false
  isolationLevel = null
  wasActive = true
 newTransaction = true
 newSynchronization = true
 readOnly = false
 debug = false
 rollbackOnly = false
 completed = false
 savepoint = null

4)NOT_SUPPORTED的话,返回的TransactionStatus对象,内容如下:

transactionStatus = {DefaultTransactionStatus@10678} 
 transaction = null
 suspendedResources = {AbstractPlatformTransactionManager$SuspendedResourcesHolder@10666} 
  suspendedResources = {ConnectionHolder@10681} 
   connectionHandle = {SimpleConnectionHandle@10687} "SimpleConnectionHandle: HikariProxyConnection@961058126 wrapping com.mysql.jdbc.JDBC4Connection@4d0128fe"
   currentConnection = {HikariProxyConnection@10688} "HikariProxyConnection@961058126 wrapping com.mysql.jdbc.JDBC4Connection@4d0128fe"
   transactionActive = true
   savepointsSupported = null
   savepointCounter = 0
   synchronizedWithTransaction = true
   rollbackOnly = false
   deadline = null
   referenceCount = 1
   isVoid = false
  suspendedSynchronizations = {Collections$UnmodifiableRandomAccessList@10682}  size = 1
  name = "cn.lw.service.impl.ShiwuServiceImpl.insert"
  readOnly = false
  isolationLevel = null
  wasActive = true
 newTransaction = false
 newSynchronization = true
 readOnly = false
 debug = false
 rollbackOnly = false
 completed = false
 savepoint = null

(2)总结

总的区别:

  • transaction是否为null
  • suspendedResources是否为null
  • savepoint是否为null
  • newTransaction、newSynchronization布尔值不一样

属性值区别:

  • NOT_SUPPORTED:transaction=null,suspendedResources=挂起资源,savepoint=null,newTransaction=false,newSynchronization= true
  • REQUIRED,SUPPORTS,MANDATORY:transaction={newConnectionHolder = false,mustRestoreAutoCommit = false},suspendedResources=null,savepoint=null,newTransaction=false,newSynchronization= false
  • NESTED:transaction={newConnectionHolder = false,mustRestoreAutoCommit = false},suspendedResources=null,savepoint=保存点,newTransaction=false,newSynchronization= false
  • REQUIRES_NEW:transaction={newConnectionHolder = true,mustRestoreAutoCommit = true},suspendedResources=挂起资源,savepoint=null,newTransaction=true,newSynchronization= true

transaction表示是否以事务的方式执行,newConnectionHolder表示是否开启新事物

2) commit

public final void commit(TransactionStatus status) throws TransactionException {
	if (status.isCompleted()) {// completed=true,说明之前已经提交或回滚
		throw new IllegalTransactionStateException(
				"Transaction is already completed - do not call commit or rollback more than once per transaction");
	}
	DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
	if (defStatus.isLocalRollbackOnly()) {// rollbackOnly=true,请求回滚
		if (defStatus.isDebug()) {
			logger.debug("Transactional code has requested rollback");
		}
		processRollback(defStatus, false);
		return;
	}
	if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
		if (defStatus.isDebug()) {
			logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
		}
		processRollback(defStatus, true);
		return;
	}
	processCommit(defStatus);
}

处理提交

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
	try {
		boolean beforeCompletionInvoked = false;
		try {
			boolean unexpectedRollback = false;
			// 为提交做准备,空方法,交给子类去实现
			prepareForCommit(status);			
			// newSynchronization=true,执行TransactionSynchronization子类的beforeCommit方法,提供给我们扩展实现,actualTransactionActive有东西的话,提交Connection、Session
			triggerBeforeCommit(status);
			// newSynchronization=true,执行TransactionSynchronization子类的beforeCompletion方法,提供给我们扩展实现,不被其它资源引用,(referenceCount=0)的话,解绑本地线程的resources,holderActive=false,关闭Connection、Session
			triggerBeforeCompletion(status);
			beforeCompletionInvoked = true;
			if (status.hasSavepoint()) {
				if (status.isDebug()) {
					logger.debug("Releasing transaction savepoint");
				}
				unexpectedRollback = status.isGlobalRollbackOnly();
				// releaseSavepoint,设置savepoint=null
				status.releaseHeldSavepoint();
			}
			else if (status.isNewTransaction()) {// newTransaction=true
				if (status.isDebug()) {
					logger.debug("Initiating transaction commit");
				}
				unexpectedRollback = status.isGlobalRollbackOnly();// 是否需要回滚
				doCommit(status);// 执行commit方法
			}
			else if (isFailEarlyOnGlobalRollbackOnly()) {
				unexpectedRollback = status.isGlobalRollbackOnly();
			}
			// Throw UnexpectedRollbackException if we have a global rollback-only
			// marker but still didn't get a corresponding exception from commit.
			if (unexpectedRollback) {
				throw new UnexpectedRollbackException(
						"Transaction silently rolled back because it has been marked as rollback-only");
			}
		}
		catch (UnexpectedRollbackException ex) {
			// can only be caused by doCommit
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
			throw ex;
		}
		catch (TransactionException ex) {
			// can only be caused by doCommit
			if (isRollbackOnCommitFailure()) {
				doRollbackOnCommitException(status, ex);
			}
			else {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
			}
			throw ex;
		}
		catch (RuntimeException | Error ex) {
			if (!beforeCompletionInvoked) {
				triggerBeforeCompletion(status);
			}
			doRollbackOnCommitException(status, ex);
			throw ex;
		}
		// Trigger afterCommit callbacks, with an exception thrown there
		// propagated to callers but the transaction still considered as committed.
		try {
			// newSynchronization=true的话,在提交之后触发,空方法,交给子类去实现
			triggerAfterCommit(status);
		}
		finally {
			// newSynchronization=true,清除当前线程的synchronizations,TransactionSynchronization子类的afterCompletion方法,给我们提供的扩展
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
		}
	}
	finally {
		// 设置TransactionStatus的completed=true,可能需要清除本地线程信息,重置connection,关闭连接,恢复挂起资源
		cleanupAfterCompletion(status);
	}
}

关键方法1:doCommit

提交事务

protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)status.getTransaction();
    // 获取Connection对象
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        this.logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    }
    try {
    	// 执行提交
        con.commit();
    } catch (SQLException var5) {
        throw new TransactionSystemException("Could not commit JDBC transaction", var5);
    }
}

关键方法2:cleanupAfterCompletion

  • 设置TransactionStatus的completed=true
  • newSynchronization=true的话,清除当前线程的事务信息,仅保留事务资源ConnectionHolder
  • newTransaction=true的话,移除线程中的ConnectionHolder,重置connection,清空ConnectionHolder和ResourceHolder事务状态(newConnectionHolder=true,移除当前线程的connection,关闭连接)
  • suspendedResources有的话,恢复挂起的资源,绑定到本地线程resources;恢复挂起的TransactionSynchronization
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
	status.setCompleted();// 设置completed=true
	if (status.isNewSynchronization()) {
		// 清除当前线程的事务信息,仅保留事务资源ConnectionHolder
		TransactionSynchronizationManager.clear();
	}
	if (status.isNewTransaction()) {
		// 移除当前线程的ConnectionHolder,重置connection
		doCleanupAfterCompletion(status.getTransaction());
	}
	if (status.getSuspendedResources() != null) {// 有挂起资源
		if (status.isDebug()) {
			logger.debug("Resuming suspended transaction after completion of inner transaction");
		}
		Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
		// 恢复挂起的资源,绑定到本地线程resources;恢复挂起的TransactionSynchronization
		resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
	}
}

// TransactionSynchronizationManager.java
public static void clear() {
	synchronizations.remove();
	currentTransactionName.remove();
	currentTransactionReadOnly.remove();
	currentTransactionIsolationLevel.remove();
	actualTransactionActive.remove();
}

protected void doCleanupAfterCompletion(Object transaction) {
	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

	// Remove the connection holder from the thread, if exposed.
	if (txObject.isNewConnectionHolder()) {
		// 移除当前线程的ConnectionHolder
		TransactionSynchronizationManager.unbindResource(obtainDataSource());
	}

	// Reset connection.
	Connection con = txObject.getConnectionHolder().getConnection();
	try {
		if (txObject.isMustRestoreAutoCommit()) {// mustRestoreAutoCommit=true,恢复自动提交
			con.setAutoCommit(true);
		}
		// 重置隔离级别和是否只读,con.setTransactionIsolation(上一个隔离级别);con.setReadOnly(false);
		DataSourceUtils.resetConnectionAfterTransaction(
				con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
	}
	catch (Throwable ex) {
		logger.debug("Could not reset JDBC Connection after transaction", ex);
	}

	if (txObject.isNewConnectionHolder()) {
		if (logger.isDebugEnabled()) {
			logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
		}
		// 如果当前是事务连接,设置ConnectionHolder.currentConnection=null,否则关闭连接con.close()
		DataSourceUtils.releaseConnection(con, this.dataSource);
	}
	// 清空ConnectionHolder和ResourceHolder事务状态
	txObject.getConnectionHolder().clear();
}

public void clear() {
    this.synchronizedWithTransaction = false;
	this.rollbackOnly = false;
	this.deadline = null;
    this.transactionActive = false;
    this.savepointsSupported = null;
    this.savepointCounter = 0;
}

(1)实验

a、单事务

调用processCommit

  • 执行提交/完成前置方法
  • doCommit:提交事务,con.commit();
  • 执行提交/完成后置方法
  • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息以及资源,重置connection,关闭连接,清空ConnectionHolder和ResourceHolder事务状态
b、多事务

外层方法默认是REQUIRED,表示有事务,没有事务的情况下就不用演示了,就是走我们的单事务逻辑,接下来我们还是分析以下代码,来看看不同传播行为的提交方式

@Transactional
public void m1(){
	forlanDao.update(xxx);
	forlanService.m2();
}

@Transactional(propagation = Propagation.REQUIRED)
// @Transactional(propagation = Propagation.NESTED)
// @Transactional(propagation = Propagation.REQUIRES_NEW)
// @Transactional(propagation = Propagation.NOT_SUPPORTED)
public void m2(){
	forlanDao.insert(xxx);
}
REQUIRED

REQUIRED,SUPPORTS,MANDATORY三者是一样的,都是有事务就加入当前事务,调用getTransaction方法得到的TransactionStatus是一样的。
先调用m2()方法的commitTransactionAfterReturning,执行过程如下:

  • 调用processCommit
    • cleanupAfterCompletion:设置TransactionStatus的completed=true

再调用m1()方法的commitTransactionAfterReturning

  • 调用processCommit
    • 执行提交/完成前置方法
    • doCommit:提交事务,con.commit();
    • 执行提交/完成后置方法
    • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息,移除线程中的ConnectionHolder,重置connection,关闭连接,清空ConnectionHolder和ResourceHolder事务状态
NESTED

先调用m2()方法的commitTransactionAfterReturning,执行过程如下:

  • 调用processCommit
    • releaseHeldSavepoint:releaseSavepoint,设置savepoint=null
    • cleanupAfterCompletion:设置TransactionStatus的completed=true

再调用m1()方法的commitTransactionAfterReturning

  • 调用processCommit
    • 执行提交/完成前置方法
    • doCommit:提交事务,con.commit();
    • 执行提交/完成后置方法
    • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息,移除线程中的ConnectionHolder,重置connection,关闭连接,清空ConnectionHolder和ResourceHolder事务状态
REQUIRES_NEW

先调用m2()方法的commitTransactionAfterReturning,执行过程如下:

  • 调用processCommit
    • 执行提交/完成前置方法
    • doCommit:提交事务,con.commit();
    • 执行提交/完成后置方法
    • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息,移除线程中的ConnectionHolder,重置connection,关闭连接,清空ConnectionHolder和ResourceHolder事务状态,恢复挂起的资源,绑定到本地线程resources;恢复挂起的TransactionSynchronization

再调用m1()方法的commitTransactionAfterReturning

  • 调用processCommit
    • 执行提交/完成前置方法
    • doCommit:提交事务,con.commit();
    • 执行提交/完成后置方法
    • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息,移除线程中的ConnectionHolder,重置connection,关闭连接,清空ConnectionHolder和ResourceHolder事务状态
NOT_SUPPORTED

先调用m2()方法的commitTransactionAfterReturning,执行过程如下:

  • 调用processCommit
    • 执行提交/完成前置方法
    • 执行提交/完成后置方法
    • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息,恢复挂起的资源,绑定到本地线程resources;恢复挂起的TransactionSynchronization

再调用m1()方法的commitTransactionAfterReturning

  • 调用processCommit
    • 执行提交/完成前置方法
    • doCommit:提交事务,con.commit();
    • 执行提交/完成后置方法
    • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息,移除线程中的ConnectionHolder,重置connection,关闭连接,清空ConnectionHolder和ResourceHolder事务状态,恢复挂起的资源,绑定到本地线程resources;恢复挂起的TransactionSynchronization

(2)总结

  • transaction有没有值,表示是否以事务方式执行
  • newConnectionHolder=true,表示是否以新事务方式执行
  • mustRestoreAutoCommit=true,前面关闭了自动提交,需要恢复自动提交
  • suspendedResources表示是否有挂起资源,如果是加入当前事务,就不需要挂起
  • newTransaction=true,提交事务,con.commit();重置connection,清空ConnectionHolder和ResourceHolder事务状态(newConnectionHolder=true,移除当前线程的connection,关闭连接)
  • newSynchronization=true,执行提交/完成前置方法以及后置方法,清除当前线程的事务信息,可以理解为后面还有一个新事务要执行,所以需要事务同步

3)rollback

public final void rollback(TransactionStatus status) throws TransactionException {
	if (status.isCompleted()) {
		throw new IllegalTransactionStateException(
				"Transaction is already completed - do not call commit or rollback more than once per transaction");
	}
	DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
	processRollback(defStatus, false);
}

执行回滚

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
	try {
		boolean unexpectedRollback = unexpected;
		try {
			// newSynchronization=true,执行TransactionSynchronization子类的beforeCompletion方法,提供给我们扩展实现,不被其它资源引用,(referenceCount=0)的话,解绑本地线程的resources,holderActive=false,关闭Connection、Session
			triggerBeforeCompletion(status);
			if (status.hasSavepoint()) {
				if (status.isDebug()) {
					logger.debug("Rolling back transaction to savepoint");
				}
				status.rollbackToHeldSavepoint();
			}
			else if (status.isNewTransaction()) {
				if (status.isDebug()) {
					logger.debug("Initiating transaction rollback");
				}
				// 事务回滚,con.rollback();
				doRollback(status);
			}
			else {
				// Participating in larger transaction
				if (status.hasTransaction()) {
					if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
						}
						// 设置ResourceHolder的rollbackOnly=true
						doSetRollbackOnly(status);
					}
					else {
						if (status.isDebug()) {
							logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
						}
					}
				}
				else {
					logger.debug("Should roll back transaction but cannot - no transaction available");
				}
				// Unexpected rollback only matters here if we're asked to fail early
				if (!isFailEarlyOnGlobalRollbackOnly()) {
					unexpectedRollback = false;
				}
			}
		}
		catch (RuntimeException | Error ex) {
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
			throw ex;
		}
		// newSynchronization=true,清除当前线程的synchronizations,TransactionSynchronization子类的afterCompletion方法,给我们提供的扩展
		triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
		// Raise UnexpectedRollbackException if we had a global rollback-only marker
		if (unexpectedRollback) {
			throw new UnexpectedRollbackException(
					"Transaction rolled back because it has been marked as rollback-only");
		}
	}
	finally {
		// 设置TransactionStatus的completed=true,可能需要清除本地线程信息,重置connection,关闭连接,恢复挂起资源
		cleanupAfterCompletion(status);
	}
}

(1)实验

a、单事务

调用processRollback

  • 执行前置完成方法
  • doRollback:事务回滚,con.rollback();
  • 执行后置完成方法
  • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息以及资源,重置connection,关闭连接,清空ConnectionHolder和ResourceHolder事务状态
b、多事务

外层方法默认是REQUIRED,表示有事务,没有事务的情况下就不用演示了,就是走我们的单事务逻辑,接下来我们还是分析以下代码,来看看不同传播行为的回滚方式

@Transactional
public void m1(){
	forlanDao.update(xxx);
	forlanService.m2();
}

@Transactional(propagation = Propagation.REQUIRED)
// @Transactional(propagation = Propagation.NESTED)
// @Transactional(propagation = Propagation.REQUIRES_NEW)
// @Transactional(propagation = Propagation.NOT_SUPPORTED)
public void m2(){
	forlanDao.insert(xxx);
	int i = 1/0;
}
REQUIRED

REQUIRED,SUPPORTS,MANDATORY三者是一样的,都是有事务就加入当前事务,调用getTransaction方法得到的TransactionStatus是一样的。
先调用m2()方法的completeTransactionAfterThrowing,执行过程如下:

  • 调用processRollback
    • doSetRollbackOnly: 设置ResourceHolder的rollbackOnly=true
    • cleanupAfterCompletion:设置TransactionStatus的completed=true

再调用m1()方法的completeTransactionAfterThrowing

  • 调用processRollback
    • 执行完成前置方法
    • doRollback:事务回滚,con.rollback();
    • 执行完成后置方法
    • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息,移除线程中的ConnectionHolder,重置connection,关闭连接,清空ConnectionHolder和ResourceHolder事务状态
NESTED

先调用m2()方法的completeTransactionAfterThrowing,执行过程如下:

  • 调用processRollback
    • rollbackToHeldSavepoint:回滚到保存点,设置ResourceHolder的rollbackOnly=true,releaseSavepoint,设置savepoint=null
    • cleanupAfterCompletion:设置TransactionStatus的completed=true

再调用m1()方法的completeTransactionAfterThrowing

  • 调用processRollback
    • 执行完成前置方法
    • doRollback:事务回滚,con.rollback();
    • 执行完成后置方法
    • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息,移除线程中的ConnectionHolder,重置connection,关闭连接,清空ConnectionHolder和ResourceHolder事务状态

特殊,如果我们在m1()方法中捕获了异常,这时候m1()方法怎么执行?

@Transactional
public void insert(ForlanA forlanA, ForlanB forlanB) {
	forlanADao.insert(forlanA);
	try {
		forlanBService.insert(forlanB);
	} catch (Exception e) {
		System.out.println("报错了");
	}
}

在这里插入图片描述
这时候m1()方法会调用commitTransactionAfterReturning,执行processCommit提交事务,会不会把我们的m2()方法执行的一起提交?
答案肯定是不会的,什么叫嵌套事务,内部回滚是不会影响外部的,前面我们说到,有保存点,会调用rollbackToHeldSavepoint,就是在这里面回滚到了保存点,我们看下源码

public void rollbackToHeldSavepoint() throws TransactionException {
	Object savepoint = getSavepoint();
	if (savepoint == null) {
		throw new TransactionUsageException(
				"Cannot roll back to savepoint - no savepoint associated with current transaction");
	}
	// 关键就在这,会进行事务回滚到保存点,使用的
	getSavepointManager().rollbackToSavepoint(savepoint);
	getSavepointManager().releaseSavepoint(savepoint);
	setSavepoint(null);
}

/**
 * This implementation rolls back to the given JDBC 3.0 Savepoint.
 * @see java.sql.Connection#rollback(java.sql.Savepoint)
 */
public void rollbackToSavepoint(Object savepoint) throws TransactionException {
	ConnectionHolder conHolder = getConnectionHolderForSavepoint();
	try {
		// 回滚到保存点,调用了Connection.rollback(savepoint)
		conHolder.getConnection().rollback((Savepoint) savepoint);
		conHolder.resetRollbackOnly();
	}
	catch (Throwable ex) {
		throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
	}
}
REQUIRES_NEW

先调用m2()方法的completeTransactionAfterThrowing,执行过程如下:

  • 调用processRollback
    • 执行完成前置方法
    • doRollback:事务回滚,con.rollback();
    • 执行完成后置方法
    • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息,移除线程中的ConnectionHolder,重置connection,关闭连接,清空ConnectionHolder和ResourceHolder事务状态,恢复挂起的资源,绑定到本地线程resources;恢复挂起的TransactionSynchronization

再调用m1()方法的completeTransactionAfterThrowing

  • 调用processRollback
    • 执行完成前置方法
    • doRollback:事务回滚,con.rollback();
    • 执行完成后置方法
    • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息,移除线程中的ConnectionHolder,重置connection,关闭连接,清空ConnectionHolder和ResourceHolder事务状态

假设父方法中报错了,此时m2()方法已经执行了commitTransactionAfterReturning, 事务已经提交,不会回滚

NOT_SUPPORTED

先调用m2()方法的completeTransactionAfterThrowing,执行过程如下:

  • 调用processRollback
    • 执行完成前置方法
    • 执行完成后置方法
    • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息,恢复挂起的资源,绑定到本地线程resources;恢复挂起的TransactionSynchronization

再调用m1()方法的completeTransactionAfterThrowing

  • 调用processRollback
    • 执行完成前置方法
    • doRollback:事务回滚,con.rollback();
    • 执行完成后置方法
    • cleanupAfterCompletion:设置TransactionStatus的completed=true,清除当前线程的事务信息,移除线程中的ConnectionHolder,重置connection,关闭连接,清空ConnectionHolder和ResourceHolder事务状态,恢复挂起的资源,绑定到本地线程resources;恢复挂起的TransactionSynchronization

(2)总结

  • savepoint不为空,回滚到保存点(代码已经回滚),释放保存点,savepoint
  • newTransaction=true,回滚事务,con.rollback();重置connection,清空ConnectionHolder和ResourceHolder事务状态(newConnectionHolder=true,移除当前线程的connection,关闭连接)
  • transaction不为空,设置ResourceHolder的rollbackOnly=true
  • mustRestoreAutoCommit=true,前面关闭了自动提交,需要恢复自动提交
  • suspendedResources表示是否有挂起资源,如果是加入当前事务,就不需要挂起
  • newSynchronization=true,执行完成前置方法以及后置方法,清除当前线程的事务信息,可以理解为后面还有一个新事务要执行,所以需要事务同步

三、事务源码串联起飞

1、通过debug模式来看看怎么调用事务

调试代码,打断点
在这里插入图片描述
可以看到,调用了 TransactionInterceptor 的 invoke 方法,这里就是开始处理事务的起点
在这里插入图片描述

@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
    // 获取目标类
    Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
    // 获取目标方法
    Method var10001 = invocation.getMethod();
    invocation.getClass();
    return this.invokeWithinTransaction(var10001, targetClass, invocation::proceed);
}

调用了 TransactionAspectSupport 的 invokeWithinTransaction 方法

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
		final InvocationCallback invocation) throws Throwable {

	// 如果事务属性为空,则该方法是非事务性的
	TransactionAttributeSource tas = getTransactionAttributeSource();
	// 通过事务属性源对象拿到方法的事务属性信息:propagationBehavior,readOnly...
	final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
	// 获取事务管理器对象
	final TransactionManager tm = determineTransactionManager(txAttr);

	if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
		// 响应式编程逻辑
		....
	}
	// 将事务管理器强制转为平台事务管理器
	PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
	// 获取连接点唯一标识:目前方法具体路径
	final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

	if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
		// 创建事务
		TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

		Object retVal;
		try {
			// 调用目标对象方法
			retVal = invocation.proceedWithInvocation();
		}
		catch (Throwable ex) {
			// target invocation exception
			completeTransactionAfterThrowing(txInfo, ex);
			throw ex;
		}
		finally {
			// 清除事务信息
			cleanupTransactionInfo(txInfo);
		}

		if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
			// Set rollback-only in case of Vavr failure matching our rollback rules...
			TransactionStatus status = txInfo.getTransactionStatus();
			if (status != null && txAttr != null) {
				retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
			}
		}
		// // 执行commit操作
		commitTransactionAfterReturning(txInfo);
		return retVal;
	}

	else {
		// CallbackPreferringPlatformTransactionManager的实例,才执行的逻辑
		...
	}
}

调用了 TransactionAspectSupport的createTransactionIfNecessary方法,方法里面又调用了AbstractPlatformTransactionManager的 getTransaction方法,这个就是我们前面分析过的核心方法了,到这里已经关联上了

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
		@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
	// If no name specified, apply method identification as transaction name.
	if (txAttr != null && txAttr.getName() == null) {
		// 设置targetDefinition和targetAttribute属性值,和TransactionAttribute的一样,设置joinpointIdentification
		txAttr = new DelegatingTransactionAttribute(txAttr) {
			@Override
			public String getName() {
				return joinpointIdentification;
			}
		};
	}
	TransactionStatus status = null;
	if (txAttr != null) {
		if (tm != null) {
			status = tm.getTransaction(txAttr);
		}
		else {
			if (logger.isDebugEnabled()) {
				logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
						"] because no transaction manager has been configured");
			}
		}
	}
	// 创建TransactionInfo并填充属性,并绑定到transactionInfoHolder线程对象中
	return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
			new NamedThreadLocal<>("Current aspect-driven transaction");

TransactionAttribute继承TransactionDefinition,propagationBehavior、isolationLevel,readOnly等属性会因我们注解填写的不同而不同 TransactionStatus会根据是单事务还是双事务,设置不同的属性值

2、TransactionInterceptor作为入口,如何注入容器中的?

TransactionInterceptor 就是事务处理的 advice,AOP中的增强方法
1)我们点击开启注解事务的注解@EnableTransactionManagement,可以看到里面@的Import
在这里插入图片描述
2)进入到TransactionManagementConfigurationSelector类,可以看到里面注入了ProxyTransactionManagementConfiguration类
在这里插入图片描述
3)进入到ProxyTransactionManagementConfiguration类,可以看到里面把TransactionInterceptor设置给advisor
在这里插入图片描述

四、提炼

1、@Transactional怎么实现事务的?

@Transactional就是我们说的声明式事务,其实就是把编程式事务的公共代码抽取出来去AOP实现,要在方法前后增强,所以用环绕通知,出现异常,异常通知进行回滚

// 进行前置增强:获取数据库连接,关闭自动提交,绑定Map<数据源,数据库连接>到线程对象
// @Transactional的信息其实会被解析为TransactionDefinition,最终得到TransactionStatus对象
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

Object retVal;
try {
	// 调用目标对象方法
	retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
	// 出现异常,执行rollback操纵
	completeTransactionAfterThrowing(txInfo, ex);
	throw ex;
}
finally {
	// 设置之前的事务消息到当前线程transactionInfoHolder.set(this.oldTransactionInfo)
	cleanupTransactionInfo(txInfo);
}

// 执行commit操作
commitTransactionAfterReturning(txInfo);
return retVal;

2、怎么实现事务传播行为的?

如果需要了解不同传播行为的效果,可以查看这里

对于不同的传播行为,前面创建事务的时候,我们得到了TransactionStatus对象,关键在于里面的值不同,代码会去实现不同的效果
NEVER的话,是不会得到TransactionStatus,直接就抛异常了
其它情况得到的对象内部如下:

  • NOT_SUPPORTED:transaction=null,suspendedResources=挂起资源,savepoint=null,newTransaction=false,newSynchronization= true
  • REQUIRED,SUPPORTS,MANDATORY:transaction={newConnectionHolder = false,mustRestoreAutoCommit = false},suspendedResources=null,savepoint=null,newTransaction=false,newSynchronization= false
  • NESTED:transaction={newConnectionHolder = false,mustRestoreAutoCommit = false},suspendedResources=null,savepoint=保存点,newTransaction=false,newSynchronization= false
  • REQUIRES_NEW:transaction={newConnectionHolder = true,mustRestoreAutoCommit = true},suspendedResources=挂起资源,savepoint=null,newTransaction=true,newSynchronization= true

主要区别就在于上面这些值,我们来分析下是怎么提交和回滚的

1)NOT_SUPPORTED

  • transaction为null,表示不以事务的方式运行,不存在事务提交或回滚
  • newSynchronization= true,表示需要同步事务信息,suspendedResources存在挂起资源,说明后面会新起事务执行

2)REQUIRED,SUPPORTS,MANDATORY

这三者得到的TransactionStatus对象是一样的,执行情况也是一样,我们来分析下

  • transaction不为null,表示以事务的方式执行
  • newTransaction=false,说明不会提交事务,newSynchronization=false,表示不需要同步事务,mustRestoreAutoCommit = false,说明后面需要恢复自动提交,这就说明是加入外层事务一起提交或回滚,无论是内层方法还是外层方法,报错都会一起回滚,属于加入事务行为

3)NESTED

它存在一个特殊的值,那就是savepoint,其他值基本和REQUIRED的一样,说明加入外层事务一起提交或回滚,真的是这样?savepoint有什么用?
那就是出现异常时,内层事务会直接进行回滚,Connection.rollback(savepoint),说明内外层是不同事务,但是正常执行的话,是等最终才一起提交的,这种属于嵌套事务,比较特殊的是,内层事务异常的话,外层事务捕获了,外层事务是正常提交的

4)REQUIRES_NEW

首先transaction不为null,表示以事务的方式执行,newConnectionHolder=true,说明是新起事务执行,相对于内外层是两个不同的事务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/647381.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

压缩感知入门——基于总体最小二乘的扰动压缩感知

压缩感知系列博客&#xff1a;压缩感知入门①从零开始压缩感知压缩感知入门②信号的稀疏表示和约束等距性压缩感知入门③基于ADMM的全变分正则化的压缩感知重构算法 文章目录 1. Problem2. 仿真结果3. MATLAB算法4. 源码地址参考文献 1. Problem 一个经典的压缩感知重构问题可以…

Git的原理与使用

背景知识&#xff1a; 我们在编写各种文档时&#xff0c;为了防止文档丢失&#xff0c;更改失误&#xff0c;失误后能恢复到原来的版本&#xff0c;不得不复制出一个副本。每个版本有各自的内容&#xff0c;但最终会只有一份报告需要被我们使用 。但在此之前的工作都需要这些不…

day10_类中成员之变量

通过以前的学习&#xff0c;我们知道了成员变量是类的重要组成部分。对象的属性以变量形式存在&#xff0c;下面我们就来详解的学习一下类中的变量 成员变量 成员变量的分类 实例变量&#xff1a;没有static修饰&#xff0c;也叫对象属性&#xff0c;属于某个对象的&#xf…

9k字长文理解Transformer: Attention Is All You Need

作者&#xff1a;猛码Memmat 目录 Abstract1 Introduction2 Background3 Model Architecture3.1 Encoder and Decoder Stacks3.2 Attention3.2.1 Scaled Dot-Product Attention3.2.2 Multi-Head Attention3.2.3 Applications of Attention in our Model 3.3 Position-wise Feed…

Linux 数据库 MySQL

Linux系统分类 ^ 数据库的分类 Linux系统 Centos(获取软件包使用yum、dnf) Ubuntu(获取软件包使用apt-get) Suse 国产系统 华为 欧拉 阿里 龙蜥 腾讯 tencentOS 麒麟&#xff08;银河麒麟、中标麒麟->基于centos 优麒麟-> 基于Ubuntu &#xff09; 统…

流计算、Flink和图计算

流计算 流计算流计算概述静态数据和流数据批量计算和实时计算流计算概念流计算与Hadoop流计算框架 流计算处理流程数据处理流程数据实时采集数据实时计算实时查询服务 流计算的应用开源流计算框架StormStorm简介Storm的特点 FlinkFlink简介为什么选择Flink传统数据处理架构大数…

三年 Android 开发的技术人生,浅谈自身面试的感悟

文章素材来源于网友 本篇主要记录了一个 Android 菜瓜三年的面试之旅&#xff0c;希望对大家面试、跳槽有所帮助。 一些唠叨 从进入这行开始&#xff0c;就是听说Android端凉了&#xff0c;寒冬这又如何那又如何的事情&#xff0c;很多的风言风语缠绕着这个圈子。但是老弟觉得…

电商系统架构设计系列(三):关于「订单系统」有哪些问题是要特别考虑的?

订单系统是整个电商系统中最重要的一个子系统&#xff0c;订单数据也就是电商企业最重要的数据资产。 上篇文章中&#xff0c;我给你留了一个思考题&#xff1a;当系统在创建和更新订单时&#xff0c;如何保证数据准确无误呢&#xff1f; 今天这篇文章&#xff0c;主要聊一下&…

DM8:达梦数据库开启SQL日志sqllog

DM8:达梦数据库开启SQL日志sqllog 环境介绍1 修改配置文件sqllog.ini2 开启与关闭 DMSQLLOG2.1 开启 sql 日志记录功能2.2 查询 sql 日志记录是否开启&#xff1a;0 关闭&#xff0c;1/2/3开启);2.3 关闭 sql 日志记录功能 3 sqllog.ini 详细介绍4 更多达梦数据库使用经验总结 …

辅助驾驶功能开发-功能算法篇(3)-ACC-弯道速度辅助

1、功能架构:ACC弯道速度辅助(CSA) 2、CSA功能控制 2.1 要求 2.1.1 CSA ASM:弯道速度辅助 1. 模式管理器:驾驶员应能够激活/关闭功能 应存在处理 CSA 功能的模式管理器。模式管理器由驾驶员输入和系统状态控制。 模式管理器有两个由 CSAStatus 定义的状态。状态转换定义…

ubuntu 系统解决GitHub无法访问问题

先后试了网上两个教程,终于解决。通过修改host文件实现访问。 教程1: 1)终端输入: sudo gedit /etc/hosts 打开hosts文件; 2)使用ip查找工具查询 http://github.com和IP:http://github.global.ssl.fastly.net的ip地址并添加到hosts文件末尾; 3)关掉hosts文件,在终端…

HTTPS协议-保障数据安全【安全篇】

我们都知道由于HTTP是明文的&#xff0c;整个传输过程完全透明&#xff0c;任何人都能够在链路中监听、修改、伪造请求/响应报文。所以不能满足我们的安全要求。比我如我们上网的信息会被轻易的截获&#xff0c;所浏览的网站真实性也无法验证。黑客可以伪装成银行、购物网站来盗…

接口测试常见接口类型?

常见接口类型 1.根据协议区分 1、webService接口:是走soap协议通过http传输请求报文和返回报文都是xml格式的&#xff0c;我们在测试的时候都用通过工具才能进行调用&#xff0c;测试。可以使用的工具有Soapul、jmeter、loadrunner等; 2、http接口:是走http协议&#xff0c;…

Vue向pdf文件中添加二维码

这两天刚看到一个需求&#xff0c;简单描述一下&#xff0c;就是我们拿到一个pdf文件流&#xff08;文件流可以是后端返回的&#xff0c;也可以是从自己本地选的&#xff09;和一个url链接 &#xff0c;现在要将url链接生成二维码&#xff0c;并将这个二维码添加到这个pdf文件中…

一元线性回归分析

一元线性回归分析&#xff1a; &#xff08;1&#xff09;假设X与Y有线性相关关系&#xff0c;求Y与X样本回归直线方程&#xff0c;并求 的无偏估计&#xff1b; &#xff08;2&#xff09;检验Y和X之间的线性关系是否显著(α0.05)&#xff1b; &#xff08;3&#xff09;当Xx0…

通过电脑屏幕传输文件

简介 本文介绍一套用于通过屏幕传输文件信息的软件。 通过屏幕传输文件&#xff0c;即非接触式&#xff0c;非插入式传递文件&#xff0c;是除了常用的网络传输&#xff0c;硬盘类介质拷贝之外的文件信息传输方式&#xff0c;基本原理就是将待传输的文件转换为二维码&#xf…

SkipList(跳表)

基本概述 SkipList&#xff08;跳表&#xff09;首先是链表&#xff0c;但与传统链表相比有几点差异&#xff1a; 元素按照升序排列存储节点可能包含多个指针&#xff0c;指针跨度不同【根据元素个数不同&#xff0c;可以建立多级指针&#xff08;最多可以建立32级指针&#…

liunx优化命令之free命令

free 这里写目录标题 一、free命令描述&#xff1a;1.free命令的语法&#xff1a;2.free命令的选项&#xff1a;3.free命令的输出格式&#xff1a; 二、压力测试工具stress&#xff1a;1.工具简介&#xff1a;2.参数详解&#xff1a;3.下载压力测试工具&#xff1a; 三、模拟实…

基于51单片机设计的红外遥控器

一、项目介绍 遥控器是现代生活中必不可少的电子产品之一,目前市面上的遥控器种类繁多,应用范围广泛。而 NEC 红外遥控器协议则是目前应用最为广泛的一种协议之一,几乎所有的电视、空调等家用电器都支持该协议。 本项目是基于 51 单片机设计支持 NEC 协议的红外遥控器,实…

TPU-MLIR的环境搭建和使用

1、开发环境配置 Linux开发环境 一台安装了Ubuntu16.04/18.04/20.04的x86主机&#xff0c;运行内存建议12GB以上下载SophonSDK开发包(v23.03.01) &#xff08;1&#xff09;解压缩SDK包 sudo apt-get install p7zip sudo apt-get install p7zip-full 7z x Release_<date&…