【SpringBoot应用篇】SpringBoot集成atomikos实现多数据源配置和分布式事务管理

news2024/12/29 10:59:56

【SpringBoot应用篇】SpringBoot集成atomikos实现多数据源配置和分布式事务管理

  • 分布式事务概念
  • XA和JTA概述
  • SpringBoot集成atomikos
    • 数据库结构
    • pom
    • 通用工具类
      • R
      • BaseController
      • BaseExceptionCode
      • ExceptionCode
      • BaseException
      • BaseUncheckedException
      • BizException
    • application.yml
    • 数据源配置类
      • OrderXADataSourceConfig
      • UserXADataSourceConfig
    • 实体类
      • Order
      • User
    • Mapper
      • OrderMapper
      • UserMapper
    • Service
      • OrderService
        • 测试分布式事务
      • UserService
        • 测试分布式事务
    • Controller
      • OrderController
      • UserController
    • 启动类

分布式事务概念

讨论分布式事务之前我们分清两个概念:本地事务分布式事务

本地事务是解决单个数据源上的数据操作的一致性问题的话,而分布式事务则是为了解决跨越多个数据源上数据操作的一致性问题。

百度官方对分布式事务的定义是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。

也就是说我们在操作一个业务逻辑过程中,涉及两个数据源(A、B),且很多时候A、B这两个数据源属于两个不同的物理环境。当我们操作A数据源过程中出现异常情况,那么必须让针对B数据源的操作回滚,同时A数据源的操作也回滚。

在Java开发过程中事务一般使用Spring为我们提供了方便的声明式事务方法@transactional。但是默认的Spring事务只支持单数据源,而实际上一个系统往往需要写多个数据源,这个时候我们就需要考虑如何通过Spring实现对分布式事务的支持。
SpringBoot官方提供推荐了Atomikos和 Bitronix两种无需服务器支持的分布式事务组件

JAVA领域中针对分布式事务的解决方案就是JTA(即Java Transaction API);

XA和JTA概述

XA 是由 X/Open 组织提出的分布式事务的一种协议(或者称之为分布式架构)。它主要定义了两部分的管理器,全局事务管理器及资源管理器。在 XA 的设计理念中,把不同资源纳入到一个事务管理器进行统一管理,例如数据库资源,消息中间件资源等,从而进行全部资源的事务提交或者取消,目前主流的数据库,消息中间件都支持 XA 协议。

JTA 叫做 Java Transaction API,它是 XA 协议的 JAVA 实现。目前在 JAVA 里面,关于 JTA 的定义主要是两部分

  • 事务管理器接口-----javax.transaction.TransactionManager
  • 资源管理器接口-----javax.transaction.xa.XAResource

在一般应用采用 JTA 接口实现事务,需要一个外置的 JTA 容器来存储这些事务,像 Tomcat。今天我们要讲的是 Atomikos,它是一个独立实现了 JTA 的框架,能够在我们的应用服务器中运行 JTA 事务。

SpringBoot集成atomikos

在这里插入图片描述

数据库结构

在这里插入图片描述

CREATE TABLE `tb_order` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单id',
  `user_id` bigint DEFAULT NULL COMMENT '用户id',
  `name` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '商品名称',
  `price` bigint DEFAULT NULL COMMENT '商品价格',
  `num` int DEFAULT '0' COMMENT '商品数量',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=137 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=COMPACT;

添加了username为唯一索引,方便后面测试多数据插库异常事务回滚

CREATE TABLE `tb_user` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `username` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '收件人',
  `address` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '地址',
  PRIMARY KEY (`id`),
  UNIQUE KEY `idx_uername` (`username`)
) ENGINE=InnoDB AUTO_INCREMENT=34 DEFAULT CHARSET=utf8mb3 ROW_FORMAT=COMPACT;

pom

技术栈版本号
springboot2.3.2.RELEASE
druid1.1.10
mysql驱动8.0.33
mybatis-plus3.1.1
<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>2.3.2.RELEASE</version>
</parent>

<dependencies>
    <!-- druid-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.10</version>
    </dependency>
    <!-- druid-->
    <!-- mysql-connector-->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
       <version>8.0.33</version>
    </dependency>
    <!-- mysql-connector-->
    <!-- mybatis-plus-->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.1.1</version>
    </dependency>
    <!-- mybatis-plus-->

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.25</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>32.0.0-jre</version>
    </dependency>
</dependencies>


<build>
    <resources>
        <resource>
            <directory>src/main/java</directory>
            <includes>
                <include>**/*.xml</include>
            </includes>
            <filtering>false</filtering>
        </resource>
    </resources>
</build>

通用工具类

R

/**
 * @ClassName: R
 * @Description: 统一返回实体
 */
@Getter
@Setter
@SuppressWarnings({"AlibabaClassNamingShouldBeCamel"})
@Accessors(chain = true)
public class R<T> {
    public static final String DEF_ERROR_MESSAGE = "系统繁忙,请稍候再试";
    public static final String HYSTRIX_ERROR_MESSAGE = "请求超时,请稍候再试";
    public static final int SUCCESS_CODE = 0;
    public static final int FAIL_CODE = -1;
    public static final int TIMEOUT_CODE = -2;
    /**
     * 统一参数验证异常
     */
    public static final int VALID_EX_CODE = -9;
    public static final int OPERATION_EX_CODE = -10;
    /**
     * 调用是否成功标识,0:成功,-1:系统繁忙,此时请开发者稍候再试 详情见[ExceptionCode]
     */
    private int code;

    /**
     * 调用结果
     */
    private T data;

    /**
     * 结果消息,如果调用成功,消息通常为空T
     */
    private String msg = "ok";


    private String path;
    /**
     * 附加数据
     */
    private Map<String, Object> extra;

    /**
     * 响应时间
     */
    private long timestamp = System.currentTimeMillis();

    private R() {
        super();
    }

    public R(int code, T data, String msg) {
        this.code = code;
        this.data = data;
        this.msg = msg;
    }

    public static <E> R<E> result(int code, E data, String msg) {
        return new R<>(code, data, msg);
    }

    /**
     * 请求成功消息
     *
     * @param data 结果
     * @return RPC调用结果
     */
    public static <E> R<E> success(E data) {
        return new R<>(SUCCESS_CODE, data, "ok");
    }

    public static R<Boolean> success() {
        return new R<>(SUCCESS_CODE, true, "ok");
    }

    /**
     * 请求成功方法 ,data返回值,msg提示信息
     *
     * @param data 结果
     * @param msg  消息
     * @return RPC调用结果
     */
    public static <E> R<E> success(E data, String msg) {
        return new R<>(SUCCESS_CODE, data, msg);
    }

    /**
     * 请求失败消息
     *
     * @param msg
     * @return
     */
    public static <E> R<E> fail(int code, String msg) {
        return new R<>(code, null, (msg == null || msg.isEmpty()) ? DEF_ERROR_MESSAGE : msg);
    }

    public static <E> R<E> fail(String msg) {
        return fail(OPERATION_EX_CODE, msg);
    }

    public static <E> R<E> fail(String msg, Object... args) {
        String message = (msg == null || msg.isEmpty()) ? DEF_ERROR_MESSAGE : msg;
        return new R<>(OPERATION_EX_CODE, null, String.format(message, args));
    }

    public static <E> R<E> fail(BaseExceptionCode exceptionCode) {
        return validFail(exceptionCode);
    }

    public static <E> R<E> fail(BizException exception) {
        if (exception == null) {
            return fail(DEF_ERROR_MESSAGE);
        }
        return new R<>(exception.getCode(), null, exception.getMessage());
    }

    /**
     * 请求失败消息,根据异常类型,获取不同的提供消息
     *
     * @param throwable 异常
     * @return RPC调用结果
     */
    public static <E> R<E> fail(Throwable throwable) {
        return fail(FAIL_CODE, throwable != null ? throwable.getMessage() : DEF_ERROR_MESSAGE);
    }

    public static <E> R<E> validFail(String msg) {
        return new R<>(VALID_EX_CODE, null, (msg == null || msg.isEmpty()) ? DEF_ERROR_MESSAGE : msg);
    }

    public static <E> R<E> validFail(String msg, Object... args) {
        String message = (msg == null || msg.isEmpty()) ? DEF_ERROR_MESSAGE : msg;
        return new R<>(VALID_EX_CODE, null, String.format(message, args));
    }

    public static <E> R<E> validFail(BaseExceptionCode exceptionCode) {
        return new R<>(exceptionCode.getCode(), null,
                (exceptionCode.getMsg() == null || exceptionCode.getMsg().isEmpty()) ? DEF_ERROR_MESSAGE : exceptionCode.getMsg());
    }

    public static <E> R<E> timeout() {
        return fail(TIMEOUT_CODE, HYSTRIX_ERROR_MESSAGE);
    }


    public R<T> put(String key, Object value) {
        if (this.extra == null) {
            this.extra = Maps.newHashMap();
        }
        this.extra.put(key, value);
        return this;
    }

    /**
     * 逻辑处理是否成功
     *
     * @return 是否成功
     */
    public Boolean getIsSuccess() {
        return this.code == SUCCESS_CODE || this.code == 200;
    }

    /**
     * 逻辑处理是否失败
     *
     * @return
     */
    public Boolean getIsError() {
        return !getIsSuccess();
    }

    @Override
    public String toString() {
        return JSONObject.toJSONString(this);
    }
}

BaseController

/**
 * @ClassName: BaseController
 * @Description: controller 抽象基类
 */
public abstract class BaseController {
    /**
     * 成功返回
     *
     * @param data
     * @return
     */
    public <T> R<T> success(T data) {
        return R.success(data);
    }

    public R<Boolean> success() {
        return R.success();
    }

    /**
     * 失败返回
     *
     * @param msg
     * @return
     */
    public <T> R<T> fail(String msg) {
        return R.fail(msg);
    }

    public <T> R<T> fail(String msg, Object... args) {
        return R.fail(msg, args);
    }

    /**
     * 失败返回
     *
     * @param code
     * @param msg
     * @return
     */
    public <T> R<T> fail(int code, String msg) {
        return R.fail(code, msg);
    }

    public <T> R<T> fail(BaseExceptionCode exceptionCode) {
        return R.fail(exceptionCode);
    }

    public <T> R<T> fail(BizException exception) {
        return R.fail(exception);
    }

    public <T> R<T> fail(Throwable throwable) {
        return R.fail(throwable);
    }

    public <T> R<T> validFail(String msg) {
        return R.validFail(msg);
    }

    public <T> R<T> validFail(String msg, Object... args) {
        return R.validFail(msg, args);
    }

    public <T> R<T> validFail(BaseExceptionCode exceptionCode) {
        return R.validFail(exceptionCode);
    }
}

BaseExceptionCode


/**
 * @ClassName: BaseExceptionCode
 * @Description: 公共异常编码类
 */
public interface BaseExceptionCode {
    /**
     * 异常编码
     *
     * @return
     */
    int getCode();

    /**
     * 异常消息
     * @return
     */
    String getMsg();
}

ExceptionCode

/**
 * 全局错误码 10000-15000
 * <p>
 * 预警异常编码    范围: 30000~34999
 * 标准服务异常编码 范围:35000~39999
 * 邮件服务异常编码 范围:40000~44999
 * 短信服务异常编码 范围:45000~49999
 * 权限服务异常编码 范围:50000-59999
 * 文件服务异常编码 范围:60000~64999
 * 日志服务异常编码 范围:65000~69999
 * 消息服务异常编码 范围:70000~74999
 * 开发者平台异常编码 范围:75000~79999
 * 搜索服务异常编码 范围:80000-84999
 * 共享交换异常编码 范围:85000-89999
 * 移动终端平台 异常码 范围:90000-94999
 * <p>
 * 安全保障平台    范围:        95000-99999
 * 软硬件平台 异常编码 范围:    100000-104999
 * 运维服务平台 异常编码 范围:  105000-109999
 * 统一监管平台异常 编码 范围:  110000-114999
 * 认证方面的异常编码  范围:115000-115999
 *
 */
public enum ExceptionCode implements BaseExceptionCode {

    //系统相关 start
    SUCCESS(0, "成功"),
    SYSTEM_BUSY(-1, "系统繁忙~请稍后再试~"),
    SYSTEM_TIMEOUT(-2, "系统维护中~请稍后再试~"),
    PARAM_EX(-3, "参数类型解析异常"),
    SQL_EX(-4, "运行SQL出现异常"),
    NULL_POINT_EX(-5, "空指针异常"),
    ILLEGALA_ARGUMENT_EX(-6, "无效参数异常"),
    MEDIA_TYPE_EX(-7, "请求类型异常"),
    LOAD_RESOURCES_ERROR(-8, "加载资源出错"),
    BASE_VALID_PARAM(-9, "统一验证参数异常"),
    OPERATION_EX(-10, "操作异常"),


    OK(200, "OK"),
    BAD_REQUEST(400, "错误的请求"),
    /**
     * {@code 401 Unauthorized}.
     *
     * @see <a href="http://tools.ietf.org/html/rfc7235#section-3.1">HTTP/1.1: Authentication, section 3.1</a>
     */
    UNAUTHORIZED(401, "未经授权"),
    /**
     * {@code 404 Not Found}.
     *
     * @see <a href="http://tools.ietf.org/html/rfc7231#section-6.5.4">HTTP/1.1: Semantics and Content, section 6.5.4</a>
     */
    NOT_FOUND(404, "没有找到资源"),
    METHOD_NOT_ALLOWED(405, "不支持当前请求类型"),

    TOO_MANY_REQUESTS(429, "请求超过次数限制"),
    INTERNAL_SERVER_ERROR(500, "内部服务错误"),
    BAD_GATEWAY(502, "网关错误"),
    GATEWAY_TIMEOUT(504, "网关超时"),
    //系统相关 end

    REQUIRED_FILE_PARAM_EX(1001, "请求中必须至少包含一个有效文件"),
    //jwt token 相关 start

    JWT_TOKEN_EXPIRED(40001, "会话超时,请重新登录"),
    JWT_SIGNATURE(40002, "不合法的token,请认真比对 token 的签名"),
    JWT_ILLEGAL_ARGUMENT(40003, "缺少token参数"),
    JWT_GEN_TOKEN_FAIL(40004, "生成token失败"),
    JWT_PARSER_TOKEN_FAIL(40005, "解析token失败"),
    JWT_USER_INVALID(40006, "用户名或密码错误"),
    JWT_USER_ENABLED(40007, "用户已经被禁用!"),
    //jwt token 相关 end

    ;

    private int code;
    private String msg;

    ExceptionCode(int code, String msg) {
        this.code = code;
        this.msg = msg;
    }

    @Override
    public int getCode() {
        return code;
    }

    @Override
    public String getMsg() {
        return msg;
    }


    public ExceptionCode build(String msg, Object... param) {
        this.msg = String.format(msg, param);
        return this;
    }

    public ExceptionCode param(Object... param) {
        msg = String.format(msg, param);
        return this;
    }
}


BaseException

/**
 * @ClassName: BaseException
 * @Description: 异常接口类
 */
public interface BaseException {

    /**
     * 统一参数验证异常码
     */
    int BASE_VALID_PARAM = -9;

    /**
     * 返回异常信息
     *
     * @return
     */
    String getMessage();

    /**
     * 返回异常编码
     *
     * @return
     */
    int getCode();
}

BaseUncheckedException

/**
 * @ClassName: BaseUncheckedException
 * @Description: 非运行期异常基类,所有自定义非运行时异常继承该类
 */
public class BaseUncheckedException extends RuntimeException implements BaseException {

    private static final long serialVersionUID = -778887391066124051L;

    /**
     * 异常信息
     */
    protected String message;

    /**
     * 具体异常码
     */
    protected int code;

    public BaseUncheckedException(int code, String message) {
        super(message);
        this.code = code;
        this.message = message;
    }

    public BaseUncheckedException(int code, String format, Object... args) {
        super(String.format(format, args));
        this.code = code;
        this.message = String.format(format, args);
    }


    @Override
    public String getMessage() {
        return message;
    }
    @Override
    public int getCode() {
        return code;
    }
}

BizException

/**
 * @ClassName: BizException
 * @Description: 业务异常 用于在处理业务逻辑时,进行抛出的异常。
 */
public class BizException extends BaseUncheckedException {

    private static final long serialVersionUID = -3843907364558373817L;

    public BizException(String message) {
        super(-1, message);
    }

    public BizException(int code, String message) {
        super(code, message);
    }

    public BizException(int code, String message, Object... args) {
        super(code, message, args);
    }

    /**
     * 实例化异常
     *
     * @param code    自定义异常编码
     * @param message 自定义异常消息
     * @param args    已定义异常参数
     * @return
     */
    public static BizException wrap(int code, String message, Object... args) {
        return new BizException(code, message, args);
    }

    public static BizException wrap(String message, Object... args) {
        return new BizException(-1, message, args);
    }

    public static BizException validFail(String message, Object... args) {
        return new BizException(-9, message, args);
    }

    public static BizException wrap(BaseExceptionCode ex) {
        return new BizException(ex.getCode(), ex.getMsg());
    }

    @Override
    public String toString() {
        return "BizException [message=" + message + ", code=" + code + "]";
    }
}

application.yml

spring:
  datasource:
    druid:
      order:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/cloud_order?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true
        user: root
        password: root
      user:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/cloud_user?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true
        user: root
        password: root



mybatis-plus:
  #mybatis日志
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

logging:
  level:
    cn.zysheep.dao: debug

数据源配置类

OrderXADataSourceConfig

/**
 * @ClassName: OrderXADataSourceConfig
 * @Description: mybatis配置类 Order
 */
@Configuration
@MapperScan(basePackages = OrderXADataSourceConfig.BASE_PACKAGES, sqlSessionTemplateRef = "orderSqlSessionTemplate")
public class OrderXADataSourceConfig {
    /**
     * 扫描mapper接口包
     */
    static final String BASE_PACKAGES = "cn.zysheep.dao.order";
    /**
     * 扫描的mapper配置文件路径
     */
    private static final String MAPPER_LOCATION = "classpath:/mapper/order/*Mapper.xml";

    /**
     * 将这个对象放入spring容器中(交给Spring管理)
     * @ConfigurationProperties 自动配置属性
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.druid.order")
    public XADataSource getDataSourceOrder(){
        // 创建XA连接池
        return new MysqlXADataSource();
    }

    /**
     * 创建Atomikos数据源
     * 注解@DependsOn("druidXADataSourcePre"),在名为druidXADataSourcePre的bean实例化后加载当前bean
     * @param xaDataSource
     * @return
     */
    @Bean
    @DependsOn("getDataSourceOrder")
    @Primary
    public DataSource dataSourceOrder(@Qualifier("getDataSourceOrder") XADataSource xaDataSource){
        //这里的AtomikosDataSourceBean使用的是spring提供的
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        // 必须为数据源指定唯一标识
        atomikosDataSourceBean.setUniqueResourceName("dataSourceOrder");
        atomikosDataSourceBean.setPoolSize(5);
        atomikosDataSourceBean.setTestQuery("select 1");
        atomikosDataSourceBean.setBorrowConnectionTimeout(3);
        atomikosDataSourceBean.setXaDataSource(xaDataSource);

        return atomikosDataSourceBean;
    }

    /**
     * 创建 SqlSessionFactory
     * @return
     * @throws Exception
     */
    @Bean
    @Primary
    public SqlSessionFactory orderSqlSessionFactory(@Qualifier("dataSourceOrder") DataSource dataSource) throws Exception{
        // 用来创建 SqlSessionFactory 等同于下面配置
//        <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
//            <property name="dataSource" ref="dataSource" />
//            <property name="mapperLocations" value="classpath:mybatis-mapper/*.xml"/>
//        </bean>

        // 在配置sqlSession工厂类的时候,创建的是MybatisSqlSessionFactoryBean,是为了能够正常使用Mybatis-Plus组件的基本功能,比如通用的crud语句绑定
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);
        // 设置mybatis的xml所在位置(扫描mybatis的相关xml文件,装配到容器中)
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATION));
        //手动设置session工厂时,需要手动添加分页插件
        Interceptor[] plugins = new Interceptor[1];
        plugins[0] = new PaginationInterceptor();
        sqlSessionFactoryBean.setPlugins(plugins);

        return sqlSessionFactoryBean.getObject();
    }

    /**
     * 通过 SqlSessionFactory 来创建 SqlSessionTemplate
     * @param sqlSessionFactory
     * @return
     */
    @Bean
    @Primary
    public SqlSessionTemplate orderSqlSessionTemplate(@Qualifier("orderSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        // SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

UserXADataSourceConfig

/**
 * @ClassName: UserXADataSourceConfig
 * @Description: mybatis配置类 User
 */
@Configuration
@MapperScan(basePackages = UserXADataSourceConfig.BASE_PACKAGES, sqlSessionTemplateRef = "userSqlSessionTemplate")
public class UserXADataSourceConfig {
    /**
     * 扫描mapper接口包
     */
    static final String BASE_PACKAGES = "cn.zysheep.dao.user";
    /**
     * 扫描的mapper配置文件路径
     */
    private static final String MAPPER_LOCATION = "classpath:/mapper/user/*Mapper.xml";


    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.druid.user")
    public XADataSource getDataSourceUser(){
        // 创建XA连接池
        return new MysqlXADataSource();
    }

    @Bean
    @DependsOn("getDataSourceUser")
    public DataSource dataSourceUser(@Qualifier("getDataSourceUser") XADataSource xaDataSource){
        //这里的AtomikosDataSourceBean使用的是spring提供的
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setUniqueResourceName("dataSourceUser");
        atomikosDataSourceBean.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        atomikosDataSourceBean.setTestQuery("select 1");
        atomikosDataSourceBean.setBorrowConnectionTimeout(3);
        atomikosDataSourceBean.setXaDataSource(xaDataSource);
        return atomikosDataSourceBean;
    }

    /**
     * 创建 SqlSessionFactory
     * @return
     * @throws Exception
     */
    @Bean
    @Primary
    public SqlSessionFactory userSqlSessionFactory(@Qualifier("dataSourceUser") DataSource dataSource) throws Exception{
        // 用来创建 SqlSessionFactory 等同于下面配置
//        <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
//            <property name="dataSource" ref="dataSource" />
//            <property name="mapperLocations" value="classpath:mybatis-mapper/*.xml"/>
//        </bean>

        // 在配置sqlSession工厂类的时候,创建的是MybatisSqlSessionFactoryBean,是为了能够正常使用Mybatis-Plus组件的基本功能,比如通用的crud语句绑定
        MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);
        // 设置mybatis的xml所在位置(扫描mybatis的相关xml文件,装配到容器中)
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATION));
        //手动设置session工厂时,需要手动添加分页插件
        Interceptor[] plugins = new Interceptor[1];
        plugins[0] = new PaginationInterceptor();
        sqlSessionFactoryBean.setPlugins(plugins);

        return sqlSessionFactoryBean.getObject();
    }

    /**
     * 通过 SqlSessionFactory 来创建 SqlSessionTemplate
     * @param sqlSessionFactory
     * @return
     */
    @Bean
    @Primary
    public SqlSessionTemplate userSqlSessionTemplate(@Qualifier("userSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        // SqlSessionTemplate是线程安全的,可以被多个DAO所共享使用
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

1、每个数据源对应一个配置类
2、每个配置类的@MapperScan注解不一样,各自对应自己mapper接口文件夹(这就是为什么要将不同数据源的mapper接口写在不同文件夹的原因了)
3、在配置sqlSession工厂类的时候,创建的是MybatisSqlSessionFactoryBean,是为了能够正常使用Mybatis-Plus组件的基本功能,比如通用的crud语句绑定。
4、配置工厂类的时候,需要指定各自mapper.xml存放的路径(这就是为什么要将不同数据源的mapper.xml写在不同文件夹的原因了)
5、配置工厂类的时候,需要手动将分页插件加进去。因为数据源相关的自动配置被我们关闭了,创建传统PaginationInterceptor类的方法已经不好使了

实体类

Order

@Builder
@Data
@TableName("tb_order")
public class Order {
    @TableId(type = IdType.AUTO)
    private Long id;
    @TableField("user_id")
    private Long userId;
    @TableField("name")
    private String name;
    @TableField("price")
    private Long price;
    @TableField("num")
    private Integer num;
}

User

@Builder
@Data
@TableName("tb_user")
public class User {
    @TableId(type = IdType.AUTO)
    private Long id;
    @TableField("username")
    private String username;
    @TableField("address")
    private String address;
}

Mapper

OrderMapper

在这里插入图片描述

public interface OrderMapper extends BaseMapper<Order> {
}

配置文件 OrderMapper.xml,Mapper类路径、和配置路径必须数据源配置类的路径一致否则会报错
在这里插入图片描述

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.zysheep.dao.order.OrderMapper">

</mapper>

UserMapper

public interface UserMapper extends BaseMapper<User> {
}

配置文件UserMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.zysheep.dao.user.UserMapper">

</mapper>

Service

OrderService

public interface OrderService extends IService<Order> {
    /**
     * 保存订单
     */
    void saveOrder() throws Exception;
}
@Service
@AllArgsConstructor
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {

    private final OrderMapper orderMapper;

    private final UserMapper userMapper;

    /**
     * 实现多数据库操作
     * @return
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void saveOrder() throws Exception {
        // order数据源
        Order order = Order.builder()
                .userId(1000L)
                .name("Apple 苹果 iPhone 12 ")
                .price(699900L)
                .num(1).build();
        orderMapper.insert(order);

        // user数据源
        User user = User.builder()
                .id(1001L)
                .address("长沙")
                .username("封于修").build();

        userMapper.insert(user);

        // throw new Exception("12312");
    }
}

测试分布式事务

atomikos多数据源分布式事务和Spring声明式事务使用方法一样,类或方法加@Transactional注解。

1、正常保存

2、order数据源保存成功,user数据源保存成功,方法其他地方抛出异常,方法事务回滚

3、order数据源保存成功,user数据源保存失败,方法事务回滚

4、order数据源保存失败,user数据源保存不执行,方法事务回滚

UserService

public interface UserService extends IService<User> {
    /**
     * 保存用户
     * @throws Exception
     */
    void saveUser() throws Exception;
}
@Service
@AllArgsConstructor
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService {

    private final OrderMapper orderMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void saveUser() throws Exception {
        // order数据源  抛出异常,方法事务回滚
        Order order = Order.builder()
                .userId(1000L)
                .name("Apple 苹果 iPhone 12 ")
                .price(699900L)
                .num(1).build();
        orderMapper.insert(order);


        // 2、user数据源 抛出异常,方法事务回滚
        User user = User.builder()
                .id(1001L)
                .address("长沙")
                .username("封于修").build();

        saveBatch(Collections.singletonList(user));

        // 1、主方法抛出异常,方法事务回滚
        // throw new Exception("12312");
    }
}

测试分布式事务

atomikos多数据源分布式事务和Spring声明式事务使用方法一样,类或方法加@Transactional注解。

这里主要测试Mybaits-Plus提供的批量新增是否支持 atomikos多数据源分布式事务,测试是方法内部其他数据源发生异常事务是可以回滚的;

Controller

OrderController

@RestController
@RequestMapping("/order")
@AllArgsConstructor
public class OrderController extends BaseController {

    private final OrderService orderService;

    @PostMapping("/save")
    public R save() throws Exception {
        orderService.saveOrder();
        return success();
    }
}

UserController

@RestController
@RequestMapping("/user")
@AllArgsConstructor
public class UserController extends BaseController {
    private final UserService userService;

    @PostMapping("/batchSave")
    public R batchSave() throws Exception {
        userService.saveUser();
        return success();
    }
}

启动类

@SpringBootApplication
public class AtomikosApplication {
    public static void main(String[] args) {
        SpringApplication.run(AtomikosApplication.class, args);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/758171.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C语言】扫雷----详解(扩展版)

&#x1f341; 博客主页:江池俊的博客_CSDN博客 &#x1f341; 如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏&#x1f31f; 三连支持一下博主&#x1f49e; ✉️每一次努力都是一次进步&#xff0c;每一次尝试都是一次机会。无论遇到什么困难&#xff0c;…

SpringBoot 使用前缀树实现敏感词过滤

文章目录 前缀树介绍节点初始化前缀树添加敏感词删除敏感词敏感词过滤代码实现 前缀树介绍 前缀树&#xff08;Trie&#xff09;&#xff0c;也称为字典树或前缀字典树&#xff0c;是一种特殊的多叉树数据结构。它用于高效地存储和检索字符串集合。以下是前缀树的常见数据结构…

verilog实现led闪烁

文章目录 verilog实现led闪烁一、介绍二、代码三、仿真代码四、仿真结果五、总结 verilog实现led闪烁 一、介绍 使用verilog实现代码&#xff0c;实现led闪烁&#xff0c;每间隔200ms进行切换led灯 二、代码 module led (input wire clk,input wire rstn,output wire[3:0] …

深入解析向量数据库:定义、原理和应用的全面指南

&#x1f337;&#x1f341; 博主 libin9iOak带您 Go to New World.✨&#x1f341; &#x1f984; 个人主页——libin9iOak的博客&#x1f390; &#x1f433; 《面试题大全》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33…

C++智能指针(3/3)

目录 上一节内容 share_ptr用法 share_ptr指针可以用于上一节所说的错误 例子&#xff08;类定义&#xff09; 主函数代码 执行的结果 解释说明 share_ptr 相关构造 空的share指针可以指向其他相同类型的变量来进行托管 可以shared_ptr< T > sp2(new T())也可以s…

RocketMQ高阶使用

RocketMQ高阶使用 1. 流程 2. 探讨功能点 RocketMQ的顺序消息消息投递策略消息保障 3. 顺序消息 3.1 顺序类型 3.1.1 无序消息 无序消息也指普通的消息&#xff0c;Producer 只管发送消息&#xff0c;Consumer 只管接收消息&#xff0c;至于消息和消息之间的顺序并没有保证…

macOS搭建C++开发环境CLion

首先我是一个java开发者&#xff0c;最近对C产生点兴趣。想开发点C程序玩一玩。 下载IDE 本人是java开发者&#xff0c;习惯使用IDEA了。所以也下载jetbrains的C开发工具:clion 下载地址&#xff1a; https://www.jetbrains.com/clion/download/#sectionmac Hello world Fi…

利用ArcGIS Pro制作三维效果图

1、新建工程 打开Arcgispro,新建工程,这里我们要用到的模板为全局场景。 2、添加数据 这里添加的数据需要有一个字段内容是数值的,这个字段也是接下来要进行拉伸的字段。 3、高度拉伸 数据添加进来后,如下图所示,这时图层处于2D图层里。 这时我们点中该图层,回到菜单栏…

微服务系列文章 之SpringBoot之定时任务详解

序言 使用SpringBoot创建定时任务非常简单&#xff0c;目前主要有以下三种创建方式&#xff1a; 一、基于注解(Scheduled)二、基于接口&#xff08;SchedulingConfigurer&#xff09; 前者相信大家都很熟悉&#xff0c;但是实际使用中我们往往想从数据库中读取指定时间来动态…

天眼使用指南--分析平台

#天眼分析平台 提供全面的溯源分析能力&#xff0c;涵盖图中模块。负责存储日志&#xff0c;分为三类&#xff0c;告警日志 告警日志&#xff1a;来自探针和沙箱的告警&#xff0c;探针的告警可以记录双向完整对话&#xff0c;如果网络流量中没有恶意信息&#xff0c;就会储存…

windows Server 2008 R2服务器IIS环境启用TLS 1.2

windows Server 2008 R2服务器IIS环境启用TLS 1.2&#xff0c;配置TLS1.2 分为2步, 添加TLS配置和禁用老的SSL版本&#xff0c;提供两种方法, 选择其中一种就行了&#xff0c;手动设置 打开注册表&#xff0c;运行regedit&#xff0c;找到 HKEY_LOCAL_MACHINE\SYSTEM\CurrentCo…

【hadoop】在linux上设置Hadoop的环境变量

设置Hadoop的环境变量 解压压缩包编辑环境变量激活环境变量 解压压缩包 使用下面命令对hadoop的压缩包进行解压 tar -zxvf hadoop-2.7.3.tar.gz -C ~/training/编辑环境变量 在linux中&#xff0c;~/.bash_profile文件是设置环境变量的文件&#xff0c;我们使用vi进行编辑。…

Verdi之波形展示nWave

6.nWave 6.1 添加波形文件 1.打开nWave界面&#xff0c;具体操作如下&#xff1a; 2.正式添加波形&#xff0c;使用快捷键G或者点击以下图标&#xff0c;选择需要的信号。 也可以在 n Trace中选中信号后&#xff0c;鼠标中键拖拽&#xff0c;或者ctrlw进行添加&#xff1b; 6…

Dreamweaver批量替换所有超链接替换成#

需求&#xff1a;想要将页面所有链接地址替换为#。 方法一 CTRLF打开“查找和替换”&#xff0c;勾选“使用正则表达式” 查找 href"([\s\S]*?)" 替换为 href"#" 副作用&#xff1a;样式表链接地址也会被替换为#&#xff0c;需提前备份。 方法二 也可以查…

CAN总线(二)CAN协议的帧格式(一文看懂CAN的报文结构)

如果只是使用CAN进行CAN通讯,可以粗略看下以下内容,主要了解下数据字段,但了解一下其他内容有助于使用CAN通讯。 一、CAN总线协议规范 CAN报文有两种不同的格式:标准格式和扩展格式,前者的标志符长度是11位,而后者的标志符长度可达29位。 CAN协议的2.0A版本规定CAN控制…

Git -> 创建第一个本地repo

创建一个本地仓库及提交文件 打开Git Bash执行以下命令 // 切换至d盘 cd d: // 新建文件夹 mkdir my_first_local_repo // 切换至新建文件夹 cd my_first_local_repo假设my_first_local_repo文件夹下有以下文件 初始化git仓库 // 在当前文件夹初始化git仓库 git init.gi…

【stable diffusion】保姆级入门课程-Stable diffusion(SD)介绍与安装

目录 0.学前准备 1.什么是AI绘画 2.当前主流的AI绘画工具 3.什么是SD(stable diffusion) 4.SD能做什么 1.文生图 2.图生图 3.AI换模特&#xff0c;背景 5.使用stable diffusion配置要求 6.环境配置与安装 需要注意的地方&#xff1a; 扩展知识&#xff1a; 1.pyth…

Linux学习之环境变量配置文件

配置文件的执行先后顺序如下&#xff1a; /etc/profile $HOME/.bash_profile $HOME/.bashrc /etc/bashrc vim /etc/profile&#xff0c;把echo "/etc/profile"写到第一行&#xff0c;head -n 1 /etc/profile看一下/etc/profile里边第一行内容。 vim $HOME/.bash_pr…

工作:三菱PLC之CC-Link IE Field Network通讯知识及应用

工作&#xff1a;三菱PLC之CC-Link IE Field Network通讯知识及应用 一、理论 1. 简介连接 CC-LINK-IE通讯分别有 CC-Link IE TSN&#xff0c;CC-Link IE Control Network&#xff0c;CC-Link IE Field Network&#xff0c;CC-Link IE Field Network Basic几种形式&#xff…

38译码器

文章目录 38译码器一、38译码器介绍二、项目代码三、仿真代码四、仿真结果 五、总结 38译码器 一、38译码器介绍 38译码器是一种常用的逻辑电路元件&#xff0c;用于将一个3位二进制输入编码转换成8个输出信号之一。它具有多个输入引脚和多个输出引脚。 通常&#xff0c;38译…