SpringBoot 多数据源及事务解决方案

news2025/1/22 19:07:50

1. 背景

一个主库和N个应用库的数据源,并且会同时操作主库和应用库的数据,需要解决以下两个问题:

  • 如何动态管理多个数据源以及切换?

  • 如何保证多数据源场景下的数据一致性(事务)?

本文主要探讨这两个问题的解决方案,希望能对读者有一定的启发。

2. 数据源切换原理

通过扩展Spring提供的抽象类AbstractRoutingDataSource,可以实现切换数据源。其类结构如下图所示:

 

  • targetDataSources&defaultTargetDataSource

项目上需要使用的所有数据源和默认数据源。

  • resolvedDataSources&resolvedDefaultDataSource

当Spring容器创建AbstractRoutingDataSource对象时,通过调用afterPropertiesSet复制上述目标数据源。由此可见,一旦数据源实例对象创建完毕,业务无法再添加新的数据源。

  • determineCurrentLookupKey

此方法为抽象方法,通过扩展这个方法来实现数据源的切换。目标数据源的结构为:Map<Object, DataSource>其key为lookup key

我们来看官方对这个方法的注释:

 

lookup key通常是绑定在线程上下文中,根据这个key去resolvedDataSources中取出DataSource。

根据目标数据源的管理方式不同,可以使用基于配置文件和数据库表两种方式。基于配置文件管理方案无法后续添加新的数据源,而基于数据库表方案管理,则更加灵活。

3. 配置文件解决方案

根据上面的分析,我们可以按照下面的步骤去实现:

  • 定义DynamicDataSource类继承AbstractRoutingDataSource,重写determineCurrentLookupKey()方法。

  • 配置多个数据源注入targetDataSourcesdefaultTargetDataSource,通过afterPropertiesSet()方法将数据源写入resolvedDataSourcesresolvedDefaultDataSource

  • 调用AbstractRoutingDataSourcegetConnection()方法时,determineTargetDataSource()方法返回DataSource执行底层的getConnection()

其流程如下图所示:

 

3.1 创建数据源

DynamicDataSource数据源的注入,目前业界主流实现步骤如下:

在配置文件中定义数据源

spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driverClassName=com.mysql.jdbc.Driver
# 主数据源
spring.datasource.druid.master.url=jdbcUrl
spring.datasource.druid.master.username=***
spring.datasource.druid.master.password=***
# 其他数据源
spring.datasource.druid.second.url=jdbcUrl
spring.datasource.druid.second.username=***
spring.datasource.druid.second.password=***

在代码中配置Bean

@Configuration
public class DynamicDataSourceConfig {
    @Bean
    @ConfigurationProperties("spring.datasource.druid.master")
    public DataSource firstDataSource(){
        return DruidDataSourceBuilder.create().build();
    }
 
    @Bean
    @ConfigurationProperties("spring.datasource.druid.second")
    public DataSource secondDataSource(){
        return DruidDataSourceBuilder.create().build();
    }
 
    @Bean
    @Primary
    public DynamicDataSource dataSource(DataSource firstDataSource, DataSource secondDataSource) {
        Map<Object, Object> targetDataSources = new HashMap<>(5);
        targetDataSources.put(DataSourceNames.FIRST, firstDataSource);
        targetDataSources.put(DataSourceNames.SECOND, secondDataSource);
        return new DynamicDataSource(firstDataSource, targetDataSources);
    }
}

3.2 AOP处理

通过DataSourceAspect切面技术来简化业务上的使用,只需要在业务方法添加@SwitchDataSource注解即可完成动态切换:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface SwitchDataSource {
    String value();
}

DataSourceAspect拦截业务方法,更新当前线程上下文DataSourceContextHolder中存储的key,即可实现数据源切换。

3.3 方案不足

基于AbstractRoutingDataSource的多数据源动态切换,有个明显的缺点,无法动态添加和删除数据源。在我们的产品中,不能把应用数据源写死在配置文件。接下来分享一下基于数据库表的实现方案。

4. 数据库表解决方案

我们需要实现可视化的数据源管理,并实时查看数据源的运行状态。所以我们不能把数据源全部配置在文件中,应该将数据源定义保存到数据库表。参考AbstractRoutingDataSource的设计思路,实现自定义数据源管理。

4.1 设计数据源表

主库的数据源信息仍然配置在项目配置文件中,应用库数据源配置参数,则设计对应的数据表。表结构如下所示:

 

这个表主要就是DataSource的相关配置参数,其相应的ORM操作代码在此不再赘述,主要是实现数据源的增删改查操作。

4.2 自定义数据源管理

4.2.1 定义管理接口

通过继承AbstractDataSource即可实现DynamicDataSource。为了方便对数据源进行操作,我们定义一个接口DataSourceManager,为业务提供操作数据源的统一接口。

public interface DataSourceManager {
    void put(String var1, DataSource var2);
 
    DataSource get(String var1);
 
    Boolean hasDataSource(String var1);
 
    void remove(String var1);
 
    void closeDataSource(String var1);
 
    Collection<DataSource> all();
}

该接口主要是对数据表中定义的数据源,提供基础管理功能。

4.2.2 自定义数据源

DynamicDataSource的实现如下图所示:

 

根据前面的分析,AbstractRoutingDataSource是在容器启动的时候,执行afterPropertiesSet注入数据源对象,完成之后无法对数据源进行修改。DynamicDataSource则实现DataSourceManager接口,可以将数据表中的数据源加载到dataSources。

4.2.3 切面处理

这一块的处理跟配置文件数据源方案处理方式相同,都是通过AOP技术切换lookup key。

public DataSource determineTargetDataSource() {
        String lookupKey = DataSourceContextHolder.getKey();
        DataSource dataSource = Optional.ofNullable(lookupKey)
                .map(dataSources::get)
                .orElse(defaultDataSource);
        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine DataSource for lookup key [" + lookupKey + "]");
        }
        return dataSource;
    }

4.2.4 管理数据源状态

在项目启动的时候,加载数据表中的所有数据源,并执行初始化。初始化操作主要是使用SpringBoot提供的DataSourceBuilder类,根据数据源表的定义创建DataSource。在项目运行过程中,可以使用定时任务对数据源进行保活,为了提升性能再添加一层缓存。

 

AbstractRoutingDataSource 只支持单库事务,切换数据源是在开启事务之前执行。 Spring使用 DataSourceTransactionManager进行事务管理。开启事务,会将数据源缓存到DataSourceTransactionObject对象中,后续的commit和 rollback事务操作实际上是使用的同一个数据源。

如何解决切库事务问题?借助Spring的声明式事务处理,我们可以在多次切库操作时强制开启新的事务:

@SwitchDataSource    
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)

这样的话,执行切库操作的时候强制启动新事务,便可实现多次切库而且事务能够生效。但是这种事务方式,存在数据一致性问题:

 

 

假若ServiceB正常执行提交事务,接着返回ServiceA执行并且发生异常。因为两次处理是不同的事务,ServiceA这个事务执行回滚,而ServiceA事务已经提交。这样的话,数据就不一致了。接下来,我们主要讨论如何解决多库的事务问题。

6. 多库事务处理

6.1 关于事务的理解

首先有必要理解事务的本质。

1.提到Spring事务,就离不开事务的四大特性和隔离级别、七大传播特性。

事务特性和离级别是属于数据库范畴。Spring事务的七大传播特性是什么呢?它是Spring在当前线程内,处理多个事务操作时的事务应用策略,数据库事务本身并不存在传播特性。

2.Spring事务的定义包括:begin、commit、rollback、close、suspend、resume等动作。

  • begin(事务开始): 可以认为存在于数据库的命令中,比如Mysql的start transaction命令,但是在JDBC编程方式中不存在。

  • close(事务关闭): Spring事务的close()方法,是把Connection对象归还给数据库连接池,与事务无关。

  • suspend(事务挂起): Spring中事务挂起的语义是:需要新事务时,将现有的Connection保存起来(还有尚未提交的事务),然后创建新的Connection2Connection2提交、回滚、关闭完毕后,再把Connection1取出来继续执行。

  • resume(事务恢复): 嵌套事务执行完毕,返回上层事务重新绑定连接对象到事务管理器的过程。

实际上,只有commit、rollback、close是在JDBC真实存在的,而其他动作都是应用的语意,而非JDBC事务的真实命令。因此,事务真实存在的方法是:setAutoCommit()commit()rollback()

close()语义为:

  • 关闭一个数据库连接,这已经不再是事务的方法了。

使用DataSource并不会执行物理关闭,只是归还给连接池。

6.2 自定义管理事务

为了保证在多个数据源中事务的一致性,我们可以手动管理Connetion的事务提交和回滚。考虑到不同ORM框架的事务管理实现差异,要求实现自定义事务管理不影响框架层的事务。

这可以通过使用装饰器设计模式,对Connection进行包装重写commit和rolllback屏蔽其默认行为,这样就不会影响到原生Connection和ORM框架的默认事务行为。其整体思路如下图所示:

 

这里并没有使用前面提到的@SwitchDataSource,这是因为我们在TransactionAop中已经执行了lookupKey的切换。

6.2.1 定义多事务注解

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MultiTransaction {
    String transactionManager() default "multiTransactionManager";
    // 默认数据隔离级别,随数据库本身默认值
    IsolationLevel isolationLevel() default IsolationLevel.DEFAULT;
    // 默认为主库数据源
    String datasourceId() default "default";
    // 只读事务,若有更新操作会抛出异常
    boolean readOnly() default false;

业务方法只需使用该注解即可开启事务,datasourceId指定事务用到的数据源,不指定默认为主库。

6.2.3 包装Connection

自定义事务我们使用包装过的Connection,屏蔽其中的commit&rollback方法。这样我们就可以在主事务里进行统一的事务提交和回滚操作。

public class ConnectionProxy implements Connection {
 
    private final Connection connection;
 
    public ConnectionProxy(Connection connection) {
        this.connection = connection;
    }
 
    @Override
    public void commit() throws SQLException {
        // connection.commit();
    }
 
    public void realCommit() throws SQLException {
        connection.commit();
    }
 
    @Override
    public void close() throws SQLException {
        //connection.close();
    }
 
    public void realClose() throws SQLException {
        if (!connection.getAutoCommit()) {
            connection.setAutoCommit(true);
        }
        connection.close();
    }
 
    @Override
    public void rollback() throws SQLException {
        if(!connection.isClosed())
            connection.rollback();
    }
    ...
}

这里commit&close方法不执行操作,rollback执行的前提是连接执行close才生效。这样不管是使用哪个ORM框架,其自身事务管理都将失效。事务的控制就交由MultiTransaction控制了。

6.2.4 事务上下文管理

public class TransactionHolder {
    // 是否开启了一个MultiTransaction
    private boolean isOpen;
    // 是否只读事务
    private boolean readOnly;
    // 事务隔离级别
    private IsolationLevel isolationLevel;
    // 维护当前线程事务ID和连接关系
    private ConcurrentHashMap<String, ConnectionProxy> connectionMap;
    // 事务执行栈
    private Stack<String> executeStack;
    // 数据源切换栈
    private Stack<String> datasourceKeyStack;
    // 主事务ID
    private String mainTransactionId;
    // 执行次数
    private AtomicInteger transCount;
 
    // 事务和数据源key关系
    private ConcurrentHashMap<String, String> executeIdDatasourceKeyMap;
 
}

每开启一个事物,生成一个事务ID并绑定一个ConnectionProxy。事务嵌套调用,保存事务ID和lookupKey至栈中,当内层事务执行完毕执行pop。这样的话,外层事务只需在栈中执行peek即可获取事务ID和lookupKey。

6.2.5 数据源兼容处理

为了不影响原生事务的使用,需要重写getConnection方法。当前线程没有启动自定义事务,则直接从数据源中返回连接。

@Override
    public Connection getConnection() throws SQLException {
        TransactionHolder transactionHolder = MultiTransactionManager.TRANSACTION_HOLDER_THREAD_LOCAL.get();
        if (Objects.isNull(transactionHolder)) {
            return determineTargetDataSource().getConnection();
        }
        ConnectionProxy ConnectionProxy = transactionHolder.getConnectionMap()
                .get(transactionHolder.getExecuteStack().peek());
        if (ConnectionProxy == null) {
            // 没开跨库事务,直接返回
            return determineTargetDataSource().getConnection();
        } else {
            transactionHolder.addCount();
            // 开了跨库事务,从当前线程中拿包装过的Connection
            return ConnectionProxy;
        }
    }

6.2.6 切面处理

切面处理的核心逻辑是:维护一个嵌套事务栈,当业务方法执行结束,或者发生异常时,判断当前栈顶事务ID是否为主事务ID。如果是的话这时候已经到了最外层事务,这时才执行提交和回滚。详细流程如下图所示:

 

package com.github.mtxn.transaction.aop;
 
import com.github.mtxn.application.Application;
import com.github.mtxn.transaction.MultiTransactionManager;
import com.github.mtxn.transaction.annotation.MultiTransaction;
import com.github.mtxn.transaction.context.DataSourceContextHolder;
import com.github.mtxn.transaction.support.IsolationLevel;
import com.github.mtxn.transaction.support.TransactionHolder;
import com.github.mtxn.utils.ExceptionUtils;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
 
import java.lang.reflect.Method;
 
 
@Aspect
@Component
@Slf4j
@Order(99999)
public class MultiTransactionAop {
 
    @Pointcut("@annotation(com.github.mtxn.transaction.annotation.MultiTransaction)")
    public void pointcut() {
        if (log.isDebugEnabled()) {
            log.debug("start in transaction pointcut...");
        }
    }
 
 
    @Around("pointcut()")
    public Object aroundTransaction(ProceedingJoinPoint point) throws Throwable {
        MethodSignature signature = (MethodSignature) point.getSignature();
        // 从切面中获取当前方法
        Method method = signature.getMethod();
        MultiTransaction multiTransaction = method.getAnnotation(MultiTransaction.class);
        if (multiTransaction == null) {
            return point.proceed();
        }
        IsolationLevel isolationLevel = multiTransaction.isolationLevel();
        boolean readOnly = multiTransaction.readOnly();
        String prevKey = DataSourceContextHolder.getKey();
        MultiTransactionManager multiTransactionManager = Application.resolve(multiTransaction.transactionManager());
        // 切数据源,如果失败使用默认库
        if (multiTransactionManager.switchDataSource(point, signature, multiTransaction)) return point.proceed();
        // 开启事务栈
        TransactionHolder transactionHolder = multiTransactionManager.startTransaction(prevKey, isolationLevel, readOnly, multiTransactionManager);
        Object proceed;
 
        try {
            proceed = point.proceed();
            multiTransactionManager.commit();
        } catch (Throwable ex) {
            log.error("execute method:{}#{},err:", method.getDeclaringClass(), method.getName(), ex);
            multiTransactionManager.rollback();
            throw ExceptionUtils.api(ex, "系统异常:%s", ex.getMessage());
        } finally {
            // 当前事务结束出栈
            String transId = multiTransactionManager.getTrans().getExecuteStack().pop();
            transactionHolder.getDatasourceKeyStack().pop();
            // 恢复上一层事务
            DataSourceContextHolder.setKey(transactionHolder.getDatasourceKeyStack().peek());
            // 最后回到主事务,关闭此次事务
            multiTransactionManager.close(transId);
        }
        return proceed;
 
    }
 
 
}

7.总结

本文主要介绍了多数据源管理的解决方案(应用层事务,而非XA二段提交保证),以及对多个库同时操作的事务管理。

需要注意的是,这种方式只适用于单体架构的应用。因为多个库的事务参与者都是运行在同一个JVM进行。如果是在微服务架构的应用中,则需要使用分布式事务管理(譬如:Seata)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/477554.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++】——C++基础知识点(C++和C语言的区别)

文章目录 1. 前言2. 命名空间2.1 命名空间定义2.2 命名空间使用 3. C的输入输出4. 缺省参数4.1 缺省参数概念4.2 缺省参数分类 5. 函数重载5.1 函数重载概念5.2 C支持函数重载的原理——名字修饰 6. 引用6.1 引用概念6.2 引用特性6.3 常引用6.4 引用的使用场景6.5 引用和指针的…

OtterCTF

五年前的老题了&#xff0c;但还是值得一做&#xff0c;内存取证yyds! What the password? 取电脑的密码 先看缓存在内存中的注册表的偏移量 volatility_2.6_win64_standalone -f 1.vmem --profileWin7SP1x64 hivelist关注到SAM(账户密码表)和system volatility_2.6_win6…

MRI k空间概念整理

以下内容为MRI期末复习笔记&#xff0c;仅供复习参考使用。 K空间概念 K空间为包含MR数据的阵列&#xff0c;也可定义为原始数据阵列相位编码轴和频率编码轴的交叉点 MR扫描得到的数据为谱空间数据&#xff0c;谱空间数据与空间数据位置无直接对应关系 k空间每一数据点或数据…

数组、链表专题

数组、链表专题 前缀和数组LeetCode 303. 区域和检索 - 数组不可变解题思路代码实现 LeetCode 304. 二维区域和检索 - 矩阵不可变解题思路代码实现 总结 不要纠结&#xff0c;干就完事了&#xff0c;熟练度很重要&#xff01;&#xff01;&#xff01;多练习&#xff0c;多总结…

4726.ACWing.第80场周赛寻找数字

ACWing.第80场周赛寻找数字 题目算法思想代码 题目 算法思想 时间复杂度 相当于01背包的搜索时间复杂度&#xff0c;每个位上要么是 4 要么是 7 &#xff0c;而且题意位数不超过10位&#xff0c;所以dfs可以直接过搜索算法 代码 #include<bits/stdc.h> using namespac…

【LeetCode】235. 二叉搜索树的最近公共祖先

1. 问题 给定一个二叉搜索树, 找到该树中两个指定节点的最近公共祖先。 百度百科中最近公共祖先的定义为&#xff1a;“对于有根树 T 的两个结点 p、q&#xff0c;最近公共祖先表示为一个结点 x&#xff0c;满足 x 是 p、q 的祖先且 x 的深度尽可能大&#xff08;一个节点也可…

项目骨架搭建

CSS样式补充 精灵图 CSS精灵图&#xff08;CSS Sprites&#xff09;是一种网页优化技术&#xff0c;通过将多个小图像合并成一个大图像&#xff0c;然后通过CSS的背景定位&#xff08;background-position&#xff09;属性来显示对应的图像部分。这种技术可以减少HTTP请求次数…

【Win11 | SSH】详细教你如何在Windows 11 下完成OpenSSH的安装(保姆级攻略)

文章目录 一、问题描述二、问题尝试解决2.1 ssh的问题2.2 如何手动安装2.3 install 脚本找不到2.4 power shell 开始报错2.5 继续安装ssh 三、输入 ssh 再报错 一、问题描述 最近在测试github的连接时&#xff0c;在终端或命令行窗口中&#xff0c;输入以下命令测试你的 SSH 连…

【五一创作】医院手术室麻醉管理系统概述和功能 SQLServer 2008 R2

医院手术室麻醉管理系统概述 1.系统功能概述 手术麻醉管理系统采用下拉式汉化菜单&#xff0c;界面友好&#xff0c;实用性强&#xff0c;设有与住院、病区、药房等系统的软件接口。 系统主要功能有&#xff1a; 手术管理&#xff1a;提供手术病人检索、手术申请、手术安排…

VSCode配置Arduino

综上所周知&#xff0c;Arduino IDE太不智能了&#xff0c;没有代码提示&#xff0c;不能代码跳转&#xff0c;于是乎找一下vscode有啥插件用 在网上找了些教程&#xff0c;也碰到了奇奇怪怪地坑&#xff0c;于是记录一下能成功配置步骤 准备 1.已安装好Arduino IDE 2.已安装好…

Clion开发STM32之串口封装(HAL库)

前提 在开发STM32过程中&#xff0c;芯片提供的串口引脚一般是不会发生变化的&#xff0c;所以为了方便移植&#xff0c;借助HAL提供的注册回调函数自定义&#xff0c;这边重新进行简要的封装此工程开发是以Clion为开发的IDE,用keil只需将对应的文件进行移植即可.文章末尾附带…

Photoshop如何使用选区之实例演示?

文章目录 0.引言1.利用快速选择工具抠图2.制作网店产品优惠券3.利用选区改变眼睛颜色4.抠取复杂的花束5.制作丁达尔光照效果6.利用选区调整图像局部颜色 0.引言 因科研等多场景需要进行绘图处理&#xff0c;笔者对PS进行了学习&#xff0c;本文通过《Photoshop2021入门教程》及…

STM32F103 系统架构

1、Cortex M3 内核 & 芯片 ARM公司提供内核&#xff08;如Cortex M3&#xff0c;简称CM3&#xff0c;下同&#xff09;授权&#xff0c;完整的MCU还需要很多其他 组件。芯片公司&#xff08;ST、NXP、TI、GD、华大等&#xff09;在得到CM3内核授权后&#xff0c;就可以把C…

Linux网络基础二

一.应用层 我们程序员写的一个个解决我们实际问题, 满足我们日常需求的网络程序, 都是在应用层。 二.再谈 "协议" 协议是一种 "约定". socket api的接口, 在读写数据时, 都是按 "字符串" 的方式来发送接收的. 如果我们要传输一些"结构化…

VBA替换中文文献引用出现的et al.和and

问题描述&#xff1a;Endnote是常用的文献管理工具&#xff0c;并提供国标模板Chinese Std GBT7714 (numeric).ens&#xff0c;但Endnote在中英文混排上略欠考虑。Chinese Std GBT7714使用序号的形式&#xff08;******1&#xff09;对文献进行引用&#xff0c;但有时我们需要以…

python毕业设计之django+vue医院医疗救助系统

&#xff08;1&#xff09; 信息发布 当有基金的申请审批通过时&#xff0c;慈善机构信息维护部门应与慈善机构进行对接&#xff0c;保证信息的真实性&#xff0c;信息发布之后患者可以进行相应的基金申请。 &#xff08;2&#xff09; 基金管理 此项功能是保证基金信息的动态刷…

HR员工管理的三重境界:管事、管人、管心

在一个公司里&#xff0c;员工来来往往是常态&#xff0c;虽说我们不能替他们决定&#xff0c;但是一定是与公司的管理者有一定的关系。马云曾经说过&#xff1a;“一个员工离职&#xff0c;不外乎两种原因&#xff0c;一是钱没给到位&#xff1b;二是心里委屈了”。一句话就是…

笔记:计算机网络体系结构(OSI七层模型、TCP/IP五层协议)

计算机网络体系结构 计算机网络是一个复杂的、具有综合性技术的系统&#xff0c;它由计算机系统、通信处理机、通信线路和通信设备、操作系统以及网络协议等组成。为了更好地描述计算机网络结构&#xff0c;使计算机网络系统有条不紊地处理工作&#xff0c;需要定义一种较好的…

2023.04.30 学习周报

文章目录 摘要文献阅读1.题目2.摘要3.介绍4.本文贡献5.数据处理6.模型6.1 look - up操作6.2 LSTM6.3 周期模拟及额外因素 7.实验7.1 数据集7.2 基线7.3 实验表现 8.结论 ISOMAP1.基本思想2.欧氏距离3.折线近似曲线4.计算折线长度5.Floyd-Warshall算法6.ISOMAP算法7.总结 数学建…

Educoder/头歌JAVA——Java Web:基于JSP的网上商城

目录 一、商品列表 本关任务 具体要求 结果输出 实现代码 二、商品详情 本关任务 JDBC查询方法封装 商品相关信息介绍 具体要求 结果输出 实现代码 三、商品搜索 编程要求 测试说明 实现代码 四、购物车列表 本关任务 JDBC查询方法封装 购物车相关信息介绍…