Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源 分布式事务

news2025/1/21 15:46:39

Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源事务,分布式事务

文章目录

  • Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源事务,分布式事务
  • 0.前言
  • 1. 基础介绍
      • ConnectionFactory
      • AbstractRoutingDataSource 动态路由数据源的抽象类
    • DynamicLocalTransactionInterceptor 动态的本地事务拦截器
  • 3. 使用步骤示例
  • 4. 官方源码分析
  • 5. 参考资料

0.前言

背景
处理多数据源事务一直是一个复杂而棘手的问题,通常我们有两种主流的解决方法。

第一种是通过Atomikos手动创建多数据源事务,这种方法更适合数据源数量较少,参数配置不复杂,对性能要求不高的项目。然而,这种方法的最大困难在于需要手动配置大量设置,这可能会消耗大量时间。


第二种是通过使用Seata等分布式事务解决方案。这种方法的难点在于需要建立并维护像Seata-server这样的统一管理中心。

今天我们使用Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源 实现分布式事务和本地多数据源事务。
每种解决方案都有其适用的场景,然而在实际操作中,我经常接到如下的问题:
“我为什么在添加了事务注解之后,数据源切换还是失败了?”
“我了解到这涉及到分布式事务,但我并不想使用Seata。我的场景比较简单,有没有不需要依赖第三方的解决方案?”
这些问题突显出在现实工作中,我们可能需要更灵活、更简便的解决方案来处理多数据源事务问题。
在这里插入图片描述
在这里插入图片描述

1. 基础介绍

自从3.3.0开始,由seata的核心贡献者https://github.com/a364176773 贡献了基于connection代理的方案。
完整代码 https://github.com/baomidou/dynamic-datasource-spring-boot-starter/commit/f0cbad193528296eeb64faa76c79743afbdd811d
建议从3.4.0版本开始使用,其修复了一个功能,老版本不加@DS只加@DSTransactional会报错。
在这里插入图片描述
核心的几处代码

    @Role(value = BeanDefinition.ROLE_INFRASTRUCTURE)
    @ConditionalOnProperty(prefix = DynamicDataSourceProperties.PREFIX, name = "seata", havingValue = "false",
        matchIfMissing = true)
    @Bean
    public Advisor localTransactionAdvisor() {
        AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
        pointcut.setExpression("@annotation(com.baomidou.dynamic.datasource.annotation.DSTransactional)");
        return new DefaultPointcutAdvisor(pointcut, new DynamicTransactionAdvisor());
    }
 

我们可以看到通过spring.datasource.dynamic.seata=true配置来启用条件注解。这个是dynamic-datasource支持seata事务的开发和入口。

ConnectionFactory

ConnectionFactory 是一个工厂类,主要的作用是管理数据库连接,并提供获取和存储数据库连接的功能。

  1. 存储每个线程独立的数据库连接:ConnectionFactory 使用 ThreadLocal 为每个线程提供其自己的数据库连接池,这样可以防止在多线程环境中数据库连接的混乱。

  2. 提供获取数据库连接的方法:ConnectionFactory 提供 getConnection 方法,使得在同一个线程中的多个模块可以共享同一个数据库连接。

  3. 提供存储数据库连接的方法:ConnectionFactory 提供 putConnection 方法,可以存储新的数据库连接到当前线程的数据库连接池中。

  4. 提供通知数据库连接的方法:ConnectionFactory 提供 notify 方法,可以对当前线程的所有数据库连接进行统一的操作,比如提交或者回滚事务。

通过这些功能,ConnectionFactory 实现了数据库连接的有效管理,保证了在同一线程中对多个数据库进行操作时,可以共享同一连接,实现事务管理。核心代码如下。大家可以借鉴

package com.baomidou.dynamic.datasource.tx;

import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author funkye
 */
public class ConnectionFactory {

    // 使用ThreadLocal来保存与当前线程相关的数据库连接信息,以Map形式存储,Map中的key为数据源名称,value为对应的数据库连接代理类
    private static final ThreadLocal<Map<String, ConnectionProxy>> CONNECTION_HOLDER =
            new ThreadLocal<Map<String, ConnectionProxy>>() {
                @Override
                protected Map<String, ConnectionProxy> initialValue() {
                    return new ConcurrentHashMap<>(8);
                }
            };

    // 存储数据库连接到当前线程的连接池中,如果当前线程的连接池中没有该数据源的连接,则新建一个并放入
    public static void putConnection(String ds, ConnectionProxy connection) {
        Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
        if (!concurrentHashMap.containsKey(ds)) {
            try {
                connection.setAutoCommit(false);
            } catch (SQLException e) {
                e.printStackTrace();
            }
            concurrentHashMap.put(ds, connection);
        }
    }

    // 从当前线程的连接池中获取指定数据源的数据库连接
    public static ConnectionProxy getConnection(String ds) {
        return CONNECTION_HOLDER.get().get(ds);
    }

    // 对当前线程的所有数据库连接执行通知操作,根据参数state决定是提交还是回滚,如果在执行过程中发生错误,则在所有连接处理完后抛出
    public static void notify(Boolean state) throws Exception {
        Exception exception = null;
        try {
            Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();
            for (ConnectionProxy connectionProxy : concurrentHashMap.values()) {
                try {
                    connectionProxy.notify(state);
                } catch (SQLException e) {
                    exception = e;
                }
            }
        } finally {
            CONNECTION_HOLDER.remove(); //清除当前线程的连接池
            if (exception != null) {
                throw exception;
            }
        }
    }

}

AbstractRoutingDataSource 动态路由数据源的抽象类

动态路由数据源的抽象类,用于根据不同的业务需要,动态地选择需要使用的数据源。关键的方法是getConnection()getConnection(String username, String password),这两个方法会根据当前是否存在全局事务来动态地选择获取原始的数据库连接还是数据库连接代理。

public abstract class AbstractRoutingDataSource extends AbstractDataSource {

    // 抽象方法,子类需要实现该方法以确定数据源
    protected abstract DataSource determineDataSource();

    // 抽象方法,子类需要实现该方法以确定默认的数据源名称
    protected abstract String getPrimary();

    // 获取数据库连接,根据事务上下文中是否有XID来判断是否需要获取代理连接
    @Override
    public Connection getConnection() throws SQLException {
        String xid = TransactionContext.getXID();
        if (StringUtils.isEmpty(xid)) {
            // 如果没有XID,说明当前不处于全局事务中,直接获取原始连接
            return determineDataSource().getConnection();
        } else {
            // 如果有XID,说明当前处于全局事务中,需要获取代理连接
            String ds = DynamicDataSourceContextHolder.peek();
            ds = StringUtils.isEmpty(ds) ? getPrimary() : ds;
            ConnectionProxy connection = ConnectionFactory.getConnection(ds);
            return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;
        }
    }

    // 与上面的方法类似,只不过这个方法可以传入用户名和密码来获取数据库连接
    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        String xid = TransactionContext.getXID();
        if (StringUtils.isEmpty(xid)) {
            return determineDataSource().getConnection(username, password);
        } else {
            String ds = DynamicDataSourceContextHolder.peek();
            ds = StringUtils.isEmpty(ds) ? getPrimary() : ds;
            ConnectionProxy connection = ConnectionFactory.getConnection(ds);
            return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection(username, password))
                    : connection;
        }
    }

    // 创建数据库连接代理,并将代理连接放入连接工厂
    private Connection getConnectionProxy(String ds, Connection connection) {
        ConnectionProxy connectionProxy = new ConnectionProxy(connection, ds);
        ConnectionFactory.putConnection(ds, connectionProxy);
        return connectionProxy;
    }

    // 获取指定类型的代理对象
    @Override
    @SuppressWarnings("unchecked")
    public <T> T unwrap(Class<T> iface) throws SQLException {
        if (iface.isInstance(this)) {
            return (T) this;
        }
        return determineDataSource().unwrap(iface);
    }

    // 判断是否是指定类型的代理对象
    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return (iface.isInstance(this) || determineDataSource().isWrapperFor(iface));
    }
}

DynamicLocalTransactionInterceptor 动态的本地事务拦截器

动态的本地事务拦截器。基本思想是在方法调用前后添加事务处理的逻辑。当这个拦截器被应用到某个方法时,那么在调用这个方法时,会首先检查当前是否已经存在事务,如果存在则直接调用原始方法。如果不存在,则会先开启一个新的事务,然后调用原始方法,方法结束后根据方法执行的结果来提交或回滚事务。入口在这,看一眼就懂了。
在这里插入图片描述

// 实现MethodInterceptor接口定义拦截器
public class DynamicLocalTransactionInterceptor implements MethodInterceptor {

    @Override
    // invoke方法会在原方法执行前后进行拦截
    public Object invoke(MethodInvocation methodInvocation) throws Throwable {
        // 如果当前上下文中已存在事务,则直接调用原方法,不进行拦截处理
        if (!StringUtils.isEmpty(TransactionContext.getXID())) {
            return methodInvocation.proceed();
        }
        // 定义一个状态标志,标记事务是否执行成功
        boolean state = true;
        Object o;
        // 开启一个新的事务
        LocalTxUtil.startTransaction();
        try {
            // 调用原始方法
            o = methodInvocation.proceed();
        } catch (Exception e) {
            // 如果原方法执行抛出异常,则标记事务执行失败
            state = false;
            throw e;
        } finally {
            // 根据事务执行状态,提交或回滚事务
            if (state) {
                LocalTxUtil.commit();
            } else {
                LocalTxUtil.rollback();
            }
        }
        // 返回原方法的执行结果
        return o;
    }
}

3. 使用步骤示例

官方示例:https://github.com/dynamic-datasource/dynamic-datasource-samples/tree/master/tx-samples/tx-local-sample

完整示例项目 数据库都已准备好,可以直接运行测试。http://localhost:8080/doc.html

示例项目A,B,C分别对应OrderService,ProductService,AccountService。分别是独立的数据库。

用户下单分别调用产品库扣库存,账户库扣余额。
如果库存不足,或用户余额不足都抛出RuntimeException,触发整体回滚。

@Slf4j
@Service
@AllArgsConstructor
public class OrderService {

    private final OrderMapper orderMapper;
    private final AccountService accountService;
    private final ProductService productService;
    
    //@DS("order") 这里不需要,因为order是默认库,如果开启事务的不是默认库则必须加
    @DSTransactional //注意这里开启事务
    public void placeOrder(PlaceOrderRequest request) {
        log.info("=============ORDER START=================");
        Long userId = request.getUserId();
        Long productId = request.getProductId();
        Integer amount = request.getAmount();
        log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount);

        log.info("当前 XID: {}", TransactionContext.getXID());

        Order order = Order.builder()
                .userId(userId)
                .productId(productId)
                .status(OrderStatus.INIT)
                .amount(amount)
                .build();

        orderMapper.insert(order);
        log.info("订单一阶段生成,等待扣库存付款中");
        // 扣减库存并计算总价
        Double totalPrice = productService.reduceStock(productId, amount);
        // 扣减余额
        accountService.reduceBalance(userId, totalPrice);

        order.setStatus(OrderStatus.SUCCESS);
        order.setTotalPrice(totalPrice);
        orderMapper.updateById(order);
        log.info("订单已成功下单");
        log.info("=============ORDER END=================");
    }
}
@Slf4j
@Service
@RequiredArgsConstructor
public class ProductService {

    private final ProductMapper productMapper;

    @DS("product")
    public Double reduceStock(Long productId, Integer amount) {
        log.info("=============PRODUCT START=================");
        log.info("当前 XID: {}", TransactionContext.getXID());

        // 检查库存
        Product product = productMapper.selectById(productId);
        Assert.notNull(product, "商品不存在");
        Integer stock = product.getStock();
        log.info("商品编号为 {} 的库存为{},订单商品数量为{}", productId, stock, amount);

        if (stock < amount) {
            log.warn("商品编号为{} 库存不足,当前库存:{}", productId, stock);
            throw new RuntimeException("库存不足");
        }
        log.info("开始扣减商品编号为 {} 库存,单价商品价格为{}", productId, product.getPrice());
        // 扣减库存
        int currentStock = stock - amount;
        product.setStock(currentStock);
        productMapper.updateById(product);
        double totalPrice = product.getPrice() * amount;
        log.info("扣减商品编号为 {} 库存成功,扣减后库存为{}, {} 件商品总价为 {} ", productId, currentStock, amount, totalPrice);
        log.info("=============PRODUCT END=================");
        return totalPrice;
    }
}
@Slf4j
@Service
@RequiredArgsConstructor
public class AccountService {

    private final AccountMapper accountMapper;

    @DS("account")
    public void reduceBalance(Long userId, Double price) {
        log.info("=============ACCOUNT START=================");
        log.info("当前 XID: {}", TransactionContext.getXID());

        Account account = accountMapper.selectById(userId);
        Assert.notNull(account, "用户不存在");
        Double balance = account.getBalance();
        log.info("下单用户{}余额为 {},商品总价为{}", userId, balance, price);

        if (balance < price) {
            log.warn("用户 {} 余额不足,当前余额:{}", userId, balance);
            throw new RuntimeException("余额不足");
        }
        log.info("开始扣减用户 {} 余额", userId);
        double currentBalance = account.getBalance() - price;
        account.setBalance(currentBalance);
        accountMapper.updateById(account);
        log.info("扣减用户 {} 余额成功,扣减后用户账户余额为{}", userId, currentBalance);
        log.info("=============ACCOUNT END=================");
    }
}

4. 官方源码分析

5. 参考资料

  1. dynamic-datasource GitHub 仓库 ↗:dynamic-datasource 的官方 GitHub 仓库,包含源代码、文档和示例等资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/925207.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mac地址、ip地址、子网掩码、端口

1. mac地址 又称为网络适配器或者网络接口卡NIC&#xff0c;但是现在更多人原因使用更简单的名称"网卡"&#xff0c;通过网卡能够是不同的计算机之间相互连接&#xff0c;从而完成数据通信的功能 每一个网卡在出厂的时候 都会给分配到一个编号&#xff0c;类似与身份…

信创测试:创意与创新的评估之路

在当今竞争激烈的商业环境中&#xff0c;创意和创新成为企业脱颖而出的关键。为了有效评估和提升创意与创新的水平&#xff0c;信创测试被引入作为一个重要的工具。本文将介绍信创测试的概念、意义以及应用&#xff0c;探讨它在推动企业创新发展中的重要作用。 一、什么是信创测…

DOS常见命令

DOS常见命令 DOS是什么如何打开DOScmd常见的命令集合 DOS是什么 DOC命令是我们浏览器中的终端 &#xff0c;但不同的是我们打开软件的方式 使用的是点击文件图标&#xff0c;点击图标的同时 我们也相当于使用一个命令 只是我们看不见而已 在电脑上操作的时候 通常都是使用命令…

Docker微服务实战

文章目录 业务需求IDEA编写代码编写Dockerfile构建镜像运行容器网页端访问测试 业务需求 利用Docker部署应用服务&#xff0c;实现在网页端通过输入地址 ip:端口/hello/docker&#xff0c;页面显示hello docker ! IDEA编写代码 创建springboot项目 网上很多教程,此步骤省略……

Mybatis动态之灵活使用下

目录 ​编辑 前言&#xff1a; 1.mybatis的分页 1.1分页的应用场景 1.2分页的使用方式 2.mybatis中特殊字符处理 2.1mybatis中特殊字符介绍 2.2mybatis中特殊字符的使用方式 前言&#xff1a; 上篇我已经写了Mybatis动态之灵活使用&#xff0c;接着上篇写mybatis的分页…

QT6串口模块QSerialport的安装,主要是“编译器”版本问题

参考文档 https://blog.csdn.net/lidandan2016/article/details/85929069 https://blog.csdn.net/qq_42968012/article/details/126020554 https://blog.csdn.net/weixin_48467622/article/details/119982667 整体测试解决步骤总结 首先&#xff0c;QT6都不能进行离线安装&a…

Cocos独立游戏开发框架中的计时器管理器

引言 本系列是《8年主程手把手打造Cocos独立游戏开发框架》&#xff0c;欢迎大家关注分享收藏订阅。在独立游戏开发中&#xff0c;计时器是一个至关重要的组件&#xff0c;用于管理时间相关的操作&#xff0c;如动画效果、技能冷却、任务进度等。然而&#xff0c;随着游戏变得…

【王道-绪论-计算机系统概述】

#pic_center R 1 R_1 R1​ R 2 R^2 R2 目录 知识框架No.1 操作系统的概念功能和定义一、操作系统的概念和定义二、操作系统的功能和目标1、资源的管理者2、向用户提供服务2.1图形化用户接口2.2联机命令接口2.3脱机命令接口2.4程序接口 3、对硬件机器的拓展 三、总结 No.2 操作系…

QT6新建工程方式导致pro文件和无pro文件,

QT6创建工程文件的不同 cmake创建的无.pro工程 qmake创建的有.pro文件的工程

VMware Workstation 不支持在此主机上使用虚拟化性能计数器。

目录 问题描述&#xff1a;VMware Workstation 不支持在此主机上使用虚拟化性能计数器。解决办法&#xff1a; 问题描述&#xff1a;VMware Workstation 不支持在此主机上使用虚拟化性能计数器。 打开其他地方移植过来的虚拟机失败 解决办法&#xff1a; 编辑虚拟机设置&am…

配置门槛这么低,Fooocus你值得拥有!

简介&#xff1a; Fooocus是一个图像生成软件,可以用于生成各种图像。 其主要特点是: 离线使用,不依赖云服务开源免费,代码开放自动化了许多内部优化,简化用户操作只需要关注提示和图像,实现人机交互安装简单,兼容性强硬件需求低,可以在普通笔记本上运行性能优异,速度很快内…

Linux socket网络编程概述 和 相关API讲解

socket网络编程的步骤 大体上&#xff0c;连接的建立过程就是&#xff1a;服务器在确定协议类型后&#xff0c;向外广播IP地址和端口号&#xff0c;并监听等待&#xff0c;直到客户端获取了IP地址和端口号并成功连接&#xff1a; 使用socket来进行tcp协议的网络编程的大体步骤…

53 个 CSS 特效 2

53 个 CSS 特效 2 这里是第 17 到 32 个&#xff0c;跟上一部分比起来多了两个稍微大一点的首页布局&#xff0c;上篇&#xff1a;53 个 CSS 特效 1&#xff0c;依旧&#xff0c;预览地址在 http://www.goldenaarcher.com/html-css-js-proj/&#xff0c;git 地址&#xff1a; …

Redis数据结构之Set

Set 类型是一个无序并唯一的键值集合&#xff0c;它的存储顺序不会按照插入的先后顺序进行存储。Redis 中集合是通过哈希表实现的&#xff0c;所以添加&#xff0c;删除&#xff0c;查找的复杂度都是 O(1)。相对于列表&#xff0c;集合也有两个特点&#xff1a;无序、不可重复 …

docker安装redis并持久化数据

1. 创建挂载目录 sudo mkdir -p /home/redis/conf sudo mkdir -p /home/redis/data sudo touch /home/redis/conf/redis.confcat > /home/redis/conf/redis.conf << EOF appendonly yes EOFrootk8s-master:/home/redis# ls conf data rootk8s-master:/home/redis# t…

基于SSM的小说网站的设计与实现(论文+源码)

目 录 1 绪论................................................................................................... 1 1.1 项目背景................................................................................................................ 1 1.2 发展历程..…

ssm+vue毕业论文管理系统源码和论文

ssmvue毕业论文管理系统053 开发工具&#xff1a;idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 技术&#xff1a;ssm 摘 要 高校规模越来越大&#xff0c;学生越来越多&#xff0c;每年都有大批的大学生完成学业。毕业之前&#xff0c;各大高校设立…

【经验贴】大型复杂项目的风险管理如何做?

同事小李在年初的时候&#xff0c;接了一个大型项目&#xff0c;以为今年的年终奖都靠它了&#xff0c;结果现在面临着超支、超期的风险&#xff0c;各种风险及问题频发。前段时间经常跟我们沟通讨论&#xff0c;在大家的建议下&#xff0c;项目才逐渐步入正轨&#xff0c;这次…

Linux_4_文本处理工具和正则表达式

目录 1文本编辑工具之神VIM1.1 vi和vim简介1.2使用vim1.2.1 vim 命令格式1.2.2三种主要模式和转换 1.3扩展命令模式1.3.1扩展命令模式基本命令1.3.2 地址定界1.3.3查找并替换1.3.4定制vim的工作特性1.3.4.1行号1.3.4.2忽略字符的大小写1.3.4.3白动缩进1.3.4.4复制粘贴保留格式1…

如何使用IDEA链接数据库并自动生成POJO类?

如何使用IDEA链接数据库并自动生成POJO类&#xff1f; 在 IntelliJ IDEA 中使用数据库连接和自动生成 POJO 类的步骤如下&#xff1a; 1.打开 IntelliJ IDEA 并选择要打开的项目。 2.在顶部菜单中选择 “View” > “Tool Windows” > “Database”&#xff0c;打开数据…