Spring 动态数据源事务处理

news2025/1/10 17:11:27

在一般的 Spring 应用中,如果底层数据库访问采用的是 MyBatis,那么在大多数情况下,只使用一个单独的数据源,Spring 的事务管理在大多数情况下都是有效的。然而,在一些复杂的业务场景下,如需要在某一时刻访问不同的数据库,由于 Spring 对于事务管理实现的方式,可能不能达到预期的效果。本文将简要介绍 Spring 中事务的实现方式,并对以 MyBatis 为底层数据库访问的系统为例,提供多数据源事务处理的解决方案

Spring 事务的实现原理

常见地,在 Spring 中添加事务的方式通常都是在对应的方法或类上加上 @Transactional 注解显式地将这部分处理加上事务,对于 @Transactional 注解,Spring 会在 org.springframework.transaction.annotation.AnnotationTransactionAttributeSource 定义方法拦截的匹配规则(即 AOP 部分中的 PointCut),而具体的处理逻辑(即 AOP 中的 Advice)则是在 org.springframework.transaction.interceptor.TransactionInterceptor 中定义

具体事务执行的调用链路如下

spring-transaction-flow.png

Spring 对于事务切面采取的具体行为实现如下:

public class TransactionInterceptor 
    extends TransactionAspectSupport 
    implements MethodInterceptor, Serializable {
    
    // 这里的方法定义为 MethodInterceptor,即 AOP 实际调用点
    @Override
	@Nullable
	public Object invoke(MethodInvocation invocation) throws Throwable {
		// Work out the target class: may be {@code null}.
		// The TransactionAttributeSource should be passed the target class
		// as well as the method, which may be from an interface.
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

		// Adapt to TransactionAspectSupport's invokeWithinTransaction...
        // invokeWithinTransaction 为父类 TransactionAspectSupport 定义的方法
		return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
			@Override
			@Nullable
			public Object proceedWithInvocation() throws Throwable {
				return invocation.proceed();
			}
			@Override
			public Object getTarget() {
				return invocation.getThis();
			}
			@Override
			public Object[] getArguments() {
				return invocation.getArguments();
			}
		});
	}
}

继续进入 TransactionAspectSupport 的 invokeWithinTransaction 方法:

public abstract class TransactionAspectSupport 
    implements BeanFactoryAware, InitializingBean {
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {
        // 省略响应式事务和编程式事务的处理逻辑

        // 当前事务管理的实际
		PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
		final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

		if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
			// Standard transaction demarcation with getTransaction and commit/rollback calls.
            /*
            	检查在当前的执行上下文中,是否需要创建新的事务,这是因为当前执行的业务处理可能在上一个已经开始
            	的事务处理中
            */
			TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

			Object retVal;
			try {
				// This is an around advice: Invoke the next interceptor in the chain.
				// This will normally result in a target object being invoked.
				retVal = invocation.proceedWithInvocation(); // 实际业务代码的业务处理
			}
			catch (Throwable ex) {
				// target invocation exception
				completeTransactionAfterThrowing(txInfo, ex); // 出现异常的回滚处理
				throw ex;
			}
			finally {
				cleanupTransactionInfo(txInfo);
			}

			if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
				// Set rollback-only in case of Vavr failure matching our rollback rules...
				TransactionStatus status = txInfo.getTransactionStatus();
				if (status != null && txAttr != null) {
					retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
				}
			}

            // 如果没有出现异常,则提交本次事务
			commitTransactionAfterReturning(txInfo);
			return retVal;
		}
	}
}

在获取事务信息对象时,首先需要获取到对应的事务状态对象 TransactionStatus,这个状态对象决定了 Spring 后续要对当前事务采取的何种行为,具体代码在 org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction

// 这里的 definition 是通过解析 @Transactional 注解中的属性得到的配置对象
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
    throws TransactionException {

    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    /*
    	这里获取事务相关的对象(如持有的数据库连接等),具体由子类来定义相关的实现
    */
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // 如果当前已经在一个事务中,那么需要按照定义的属性采取对应的行为
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
            "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // 需要重新开启一个新的事务的情况,具体在 org.springframework.transaction.TransactionDefinition 有相关的定义
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
             def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
             def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }
        try {
            // 开启一个新的事务
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + def);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

在 AbstractPlatformTransactionManager 中已经定义了事务处理的大体框架,而实际的事务实现则交由具体的子类实现,在一般情况下,由 org.springframework.jdbc.datasource.DataSourceTransactionManager 采取具体的实现

主要关注的点在于对于事务信息对象的创建,事务的开启、提交回滚操作,具体对应的代码如下:

事务信息对象的创建代码:

protected Object doGetTransaction() {
    /*
    	简单地理解,DataSourceTransactionObject 就是一个持有数据库连接的资源对象
    */
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    /*
    	TransactionSynchronizationManager 是用于管理在事务执行过程相关的信息对象的一个工具类,基本上
    	这个类持有的事务信息贯穿了整个 Spring 事务管理
    */
    ConnectionHolder conHolder =
        (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

开启事务对应的源代码:

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        /*
        	如果当前事务对象没有持有数据库连接,则需要从对应的 DataSource 中获取对应的连接
        */
        if (!txObject.hasConnectionHolder() ||
            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        txObject.setReadOnly(definition.isReadOnly());

        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        
        /*
        	由于当前的事务已经交由 Spring 进行管理,那么在这种情况下,原有数据库连接的自动提交
        	必须是关闭的,因为如果开启了自动提交,那么实际上就相当于每一次的 SQL 都会执行一次事务的提交,
        	这种情况下事务的管理没有意义
        */
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }

        prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);

        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // Bind the connection holder to the thread.
        
        /*
        	如果是新创建的事务,那么需要绑定这个数据库连接对象到这个事务中,使得后续再进来的业务处理
        	能够顺利地进入原有的事务中
        */
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }

    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1367514.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于YOLOv8全系列【n/s/m/l/x】开发构建道路交通场景下CCTSDB2021交通标识检测识别系统

交通标志检测是交通标志识别系统中的一项重要任务。与其他国家的交通标志相比&#xff0c;中国的交通标志有其独特的特点。卷积神经网络&#xff08;CNN&#xff09;在计算机视觉任务中取得了突破性进展&#xff0c;在交通标志分类方面取得了巨大的成功。CCTSDB 数据集是由长沙…

SkyWalking介绍和Docker环境下部署

一、Skywalking概述 1、Skywalking介绍 Skywalking是分布式系统的应用程序性能监视工具&#xff0c;专为微服务&#xff0c;云原生架构和基于容器&#xff08;Docker&#xff0c;K8S,Mesos&#xff09;架构而设计&#xff0c;它是一款优秀的APM&#xff08;Application Perfo…

CentOS 7 安装私有平台OpenNebula

目录 一、配置yum源 二、配置数据库MySQL 2.1 安装MySQL 2.2 修改MySQL密码 2.3 创建项目用户和库 三、安装配置前端包 四、设置oneadmin账号密码 五、验证安装 5.1 命令行验证安装 5.2 数据存放位置 5.3 端口介绍 5.4 命令介绍 六、访问 6.1 设置语言 6.2 创建主…

【Python学习】Python学习8-Number

目录 【Python学习】Python学习8-Number 前言在变量赋值时被创建Python支持四种不同的数据类型整型(Int)长整型(long integers&#xff09;浮点型(loating point real values)复数(complex numbers) Python Number 类型转换Python math 模块、cmath 模块Python数学函数Python随…

每日一题——LeetCode1021删除最外层括号1047消除字符串相邻重复字符串

这两道题基本上来说是差不多的&#xff0c;一个匹配并删除字符串中的( ) 一个匹配并删除字符串中相邻重复的元素&#xff0c;其实都是用到栈这种数据结构&#xff0c;通过匹配不同的条件使用入栈出栈操作保存或删除目标元素来实现。 1021.删除最外层括号 var removeOuterParent…

零售EDI:Petco EDI对接指南

Petco 始于1965年&#xff0c;是一家美国宠物零售商&#xff0c;提供各种宠物产品和服务以及某些类型的活体小动物。起初Petco只是一家邮购兽医用品公司&#xff0c;后发展为一家成熟的宠物食品和供应链的公司。Petco与其供应商之间是如何传输业务数据的呢&#xff1f; 通过EDI…

Qt QLineEdit文本框控件

文章目录 1 属性和方法1.1 占位字符串1.2 对齐方式1.3 回显模式1.4 读写控制1.5 格式控制1.6 信号和槽 2 实例2. 布局2.2 代码实现 QLineEdit 是Qt 中的文本框&#xff0c;准确地说是单行文本框&#xff0c;通常用于接受用户的输入。 比如用户输入用户名、密码等&#xff0c;都…

Windows安装Docker运行中间件(详细)

1、Docker Docker是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的镜像中&#xff0c;然后发布到任何流行的 Linux或Windows操作系统的机器上&#xff0c;也可以实现虚拟化。容器是完全使用沙箱机制&#xff0c;相互之间不会有任何接…

人类的失误、错误与机器的失误、错误

人类的失误和错误是指人类在认知、判断、决策和行动过程中出现的错误或差错。这些错误可能是由于认知偏差、信息不完全、判断错误、行为失控等原因造成的。人类的失误和错误是不可避免的&#xff0c;而且在很多领域都有广泛的存在&#xff0c;包括工作、学习、社交、交通等方面…

深入理解Java源码:提升技术功底,深度掌握技术框架,快速定位线上问题

为什么要看源码&#xff1a; 1、提升技术功底&#xff1a; 学习源码里的优秀设计思想&#xff0c;比如一些疑难问题的解决思路&#xff0c;还有一些优秀的设计模式&#xff0c;整体提升自己的技术功底 2、深度掌握技术框架&#xff1a; 源码看多了&#xff0c;对于一个新技术…

线性代数 --- 为什么LU分解中的下三角矩阵L的主对角线上都是1?

为什么LU分解中的下三角矩阵L的主对角线上都是1? 笔者的一些话&#xff1a; 为什么LU分解中L矩阵的主对角线上都是1&#xff1f;因为最近一段时间在研究LU分解的编程实现&#xff0c;这个问题也就时不时的从我脑子里面冒出来。但大多时候都是一闪而过&#xff0c;没有太在意。…

IIC Master 设计实现

写个IIC的主机来玩一玩。 仅100M时钟输入SCL波形工整&#xff0c;任意两个上升沿之间均为整数倍周期&#xff0c;占空比50%发送数据时SDA严格对其到SCL低电平正中间尽可能少的状态机不浪费资源数据逻辑和时序逻辑分离 接口设计中&#xff0c;我的思路是将数据与时序分离开&am…

数据结构-测试5

一、判断题 1.二叉树只能用二叉链表表示&#xff08;F&#xff09; 二叉树的存储结构有两种&#xff0c;顺序存储结构和链式存储结构 2. 装填因子是散列表的一个重要参数&#xff0c;它反映散列表的装满程度。(T) 装填因子越小&#xff0c;发生冲突的可能性越小 3. 在任何情况…

损失函数 - Focal Loss

b站账号 : Enzo_Mi 知识星球 : Enzo AI学习小组 | 小白分会 欢迎加入我的知识星球,一起来学习吧 ~ Focal Loss 1、Focal Loss 提出的背景2、正负样本数量不均衡问题 的解决 : baseline3、难分类样本/易分类样本 数量不均衡问题 的解决 : Focal Loss3、类别加权 Focal L…

1.7数算PPT选择汇总,PTA选择汇总,计算后缀表达式,中缀转后缀、前缀、快速排序

PTA选择汇总 在第一个位置后插入&#xff0c;注意是在后面插入&#xff0c;而不是前面&#xff1b;要移动49&#xff0c;为50-I&#xff0c;第25个的话&#xff0c;移25个 如果是插在前面&#xff0c;就移动50&#xff0c;N-I1&#xff0c;注意是插在前面还是后面 删第一个&a…

今日实践 — 附加数据库/重定向失败如何解决?

WMS数据库与重定向 前言正文如何建立数据库连接&#xff1f;第一步&#xff1a;打开SSMS&#xff0c;右击数据库&#xff0c;点击附加第二步&#xff1a;点击添加第三步&#xff1a;找到自己的数据库文件&#xff0c;点击确定按钮第四步&#xff1a;若有多个数据库&#xff0c;…

Hyperledger Fabric 管理链码 peer lifecycle chaincode 指令使用

链上代码&#xff08;Chaincode&#xff09;简称链码&#xff0c;包括系统链码和用户链码。系统链码&#xff08;System Chaincode&#xff09;指的是 Fabric Peer 中负责系统配置、查询、背书、验证等平台功能的代码逻辑&#xff0c;运行在 Peer 进程内&#xff0c;将在第 14 …

如何查找native服务的接口实现

以Netd为例&#xff1a; 首先adb看一下服务的接口&#xff1a; 接口文件是INetd&#xff0c;去源码找一下INetd.aidl 已经确定了接口API&#xff0c;对于native服务端的实现&#xff0c;一般的继承顺序为&#xff1a; 根据继承关系&#xff0c;对于BnXxx/XxxService 对象&…

IO类day02

JAVA IO java io可以让我们用标准的读写操作来完成对不同设备的读写数据工作. java将IO按照方向划分为输入与输出,参照点是我们写的程序. 输入:用来读取数据的,是从外界到程序的方向,用于获取数据. 输出:用来写出数据的,是从程序到外界的方向,用于发送数据. java将IO比喻为…

15个等轴视图设计的电动车汽车无人机等PR剪辑素材视频制作元素

包含15个等轴视图、等距视角电动车、汽车、无人机、沙漏、飞机等PR剪辑素材视频制作元素mogrt动画模板。 特征&#xff1a; 等距设计&#xff1b; 可以更改颜色&#xff1b; 分辨率&#xff1a;全高清&#xff08;19201080&#xff09;&#xff1b; 持续时间&#xff1a;15秒&a…