多线程事务怎么回滚?

news2024/11/26 22:41:40

项目中用到了多线程去批量处理一些数据,当时想当然认为只要方法上加上@Transactional注解就好了,实际并未达到想要的处理效果。特此去学习了下关于多线程事务回滚相关方案,参考了网上其他资料,这里整理并记录下学习历程。
站在巨人的肩膀上,我们可以看的更远!

多线程事务怎么回滚?

  • 一、准备相关基础方法
    • 1.线程池配置
    • 2.list切分工具类
    • 3.SqlSession工具类
    • 4.员工实体类
    • 5.员工EmployeeMapper
    • 6.员工对应EmployeeMapper.xml
  • 二、业务处理
    • 1.EmployeeService接口
    • 2.测试多线程事务实现类
    • 3.员工Controller
  • 三、方案验证
    • 1.数据库表Employee存储1条原始数据,用于验证数据删除后是否被回滚。![在这里插入图片描述](https://img-blog.csdnimg.cn/cb341ba2f3e146e69dc3e913f1b411f8.png)
    • 2.EmployeeServiceImpl的saveThreadByTransactional方法
    • 3.EmployeeServiceImpl的saveThreadRollBack方法
  • 四、方案总结
    • 1.方案总结
  • 五.项目结构及下载

一、准备相关基础方法

这里以多线程、分批次插入数据库employee表为例子进行演示。

1.线程池配置

/**
 * 线程池配置
 */
@Component
public class ExecutorConfig {
    private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
    private volatile static ExecutorService executorService;

    public static ExecutorService getThreadPool() {
        if (executorService == null){
            synchronized (ExecutorConfig.class){
                if (executorService == null){
                    executorService =  newThreadPool();
                }
            }
        }
        return executorService;
    }

    private static ExecutorService newThreadPool(){
        int queueSize = 1000;
        int corePool = Math.min(10, maxPoolSize);
        return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
    }
    private ExecutorConfig(){}
}

2.list切分工具类

/**
 * list切分工具类
 */
public class ListUtil {
    /**
     * 平均拆分list
     *
     * @param source
     * @param n
     * @param <T>
     * @return
     */
    public static <T> List<List<T>> AverageList(List<T> source, int n) {
        List<List<T>> result = new ArrayList<>();
        int remaider = source.size() % n;
        int number = source.size() / n;
        //偏移量
        int offset = 0;
        for (int i = 0; i < n; i++) {
            List<T> value;
            if (remaider > 0) {
                value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
                remaider--;
                offset++;
            } else {
                value = source.subList(i * number + offset, (i + 1) * number + offset);
            }
            result.add(value);
        }
        return result;
    }
}

3.SqlSession工具类

/**
 * SqlSession工具类
 */
@Component
public class SqlContext {
    @Resource
    private SqlSessionTemplate sqlSessionTemplate;

    public SqlSession getSqlSession(){
        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        return sqlSessionFactory.openSession();
    }
}

4.员工实体类

/**
 * 员工
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "employee")
public class Employee {
    @TableField(value = "employee_id")
    private Integer employeeId;

    @TableField(value = "employee_name")
    private String employeeName;

    @TableField(value = "age")
    private Integer age;
}

5.员工EmployeeMapper

@Repository
public interface EmployeeMapper extends BaseMapper<Employee> {
   int saveBatchRollBack(List Employee);
}

6.员工对应EmployeeMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.it.mapper.EmployeeMapper">
    <resultMap id="BaseResultMap" type="com.it.entity.Employee">
        <!--@Table `Employee`-->
        <result column="employee_id" jdbcType="INTEGER" property="employee_id" />
        <result column="employee_name" jdbcType="VARCHAR" property="employee_name" />
        <result column="age" jdbcType="INTEGER" property="age" />
    </resultMap>

    <sql id="Base_Column_List">
        employee_id, employee_name, age
    </sql>
    <insert id="saveBatchRollBack">
        insert into
        employee (employee_id,age,employee_name)
        values
        <foreach collection="list" item="item" index="index" separator=",">
            (
            #{item.employeeId},
            #{item.age},
            #{item.employeeName}
            )
        </foreach>
    </insert>
</mapper>

二、业务处理

1.EmployeeService接口

public interface EmployeeService extends IService<Employee> {
    /**
     * 使用@Transactional测试多线程回滚失败
     */
    void saveThreadByTransactional(List<Employee> employeeList);

    /**
     * 使用手动操作事务测试多线程回滚成功
     */
    void saveThreadRollBack(List<Employee> employeeList) throws SQLException;
}

2.测试多线程事务实现类

/**
 * 测试多线程事务
 */
@Service
@Slf4j
public class EmployeeServiceImpl extends ServiceImpl<EmployeeMapper, Employee> implements EmployeeService {

    @Resource
    SqlContext sqlContext;

    /**
     * 多线程环境下Transactional失效场景
     *
     * @param employeeList
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void saveThreadByTransactional(List<Employee> employeeList) {
        try {
            // 先做删除操作,如果子线程出现异常,此操作不会回滚
            this.getBaseMapper().delete(null);
            // 获取线程池
            ExecutorService executorService = ExecutorConfig.getThreadPool();
            // 拆分数据,拆分6份
            List<List<Employee>> lists = ListUtil.AverageList(employeeList, 6);
            // 执行的线程
            Thread[] threadArray = new Thread[lists.size()];
            // 监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭
            CountDownLatch countDownLatch = new CountDownLatch(lists.size());
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            for (int i = 0; i < lists.size(); i++) {
                if (i == lists.size() - 1) {
                    // 最后一个atomicBoolean设置为false
                    atomicBoolean.set(false);
                }
                List<Employee> list = lists.get(i);
                threadArray[i] = new Thread(() -> {
                    try {
                        // 最后一个线程抛出异常
                        if (!atomicBoolean.get()) {
                            throw new RuntimeException("最后一个线程添加时抛出异常");
                        }
                        //批量添加,mybatisPlus中自带的batch方法
                        this.saveBatch(list);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
            for (int i = 0; i < lists.size(); i++) {
                executorService.execute(threadArray[i]);
            }
            // 当子线程执行完毕时,主线程再往下执行
            countDownLatch.await();
            System.out.println("employee列表添加完成");
        } catch (Exception e) {
            log.info("error", e);
            throw new RuntimeException("employee列表添加过程出现异常");
        }
    }

    /**
     * 使用sqlSession控制手动提交事务
     *
     * @param employeeList
     */
    @Override
    public void saveThreadRollBack(List<Employee> employeeList) throws SQLException {
        {
        // 获取数据库连接,获取会话(内部自有事务)
        SqlSession sqlSession = sqlContext.getSqlSession();
        Connection connection = sqlSession.getConnection();
        try {
            // 设置手动提交
            connection.setAutoCommit(false);
            //获取mapper
            EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
            //先做删除操作
            employeeMapper.delete(null);
            //获取执行器
            ExecutorService service = ExecutorConfig.getThreadPool();
            List<Callable<Integer>> callableList = new ArrayList<>();
            //拆分list
            List<List<Employee>> lists = ListUtil.AverageList(employeeList, 6);
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            for (int i = 0; i < lists.size(); i++) {
                if (i == lists.size() - 1) {
                    atomicBoolean.set(false);
                }
                List<Employee> list = lists.get(i);
                //使用返回结果的callable去执行,
                Callable<Integer> callable = () -> {
                    //让最后一个线程抛出异常
                    if (!atomicBoolean.get()) {
                        throw new Exception("出现异常");
                    }
                    return employeeMapper.saveBatchRollBack(list);
                };
                callableList.add(callable);
            }
            //执行子线程
            List<Future<Integer>> futures = service.invokeAll(callableList);
            for (Future<Integer> future : futures) {
                //如果有一个执行不成功,则全部回滚
                if (future.get() <= 0) {
                    connection.rollback();
                    return;
                }
            }
            connection.commit();
            System.out.println("添加完毕");
        } catch (Exception e) {
            connection.rollback();
            log.info("error", e);
        } finally {
            connection.close();
        }
    }
}

3.员工Controller

@RestController
@RequestMapping(value = "/employee")
public class EmployeeController {

    @Autowired
    EmployeeService employeeService;

    @PostMapping("/saveThreadByTransactional")
    public ResponseEntity saveThreadByTransactional() {
        // 模拟需要插入12名员工到数据库
        List<Employee> list = IntStream.range(0, 12)
                .mapToObj(i -> {
                    Employee employee = new Employee();
                    employee.setEmployeeId(i);
                    employee.setEmployeeName("三丰" + i);
                    employee.setAge(i + 100);
                    return employee;
                })
                .collect(Collectors.toList());
        employeeService.saveThreadByTransactional(list);
        return new ResponseEntity<>(HttpStatus.OK);
    }

    @PostMapping("/saveThreadRollBack")
    public ResponseEntity saveThreadRollBack() throws SQLException {
        // 模拟需要插入12名员工到数据库
        List<Employee> list = IntStream.range(0, 12)
                .mapToObj(i -> {
                    Employee employee = new Employee();
                    employee.setEmployeeId(i);
                    employee.setEmployeeName("三丰" + i);
                    employee.setAge(i + 100);
                    return employee;
                })
                .collect(Collectors.toList());
        employeeService.saveThreadRollBack(list);
        return new ResponseEntity<>(HttpStatus.OK);
    }
}

三、方案验证

1.数据库表Employee存储1条原始数据,用于验证数据删除后是否被回滚。在这里插入图片描述

2.EmployeeServiceImpl的saveThreadByTransactional方法

该方法通过使用@Transactional注解尝试处理多线程事务回滚。
利用postman测试saveThreadByTransactional接口
在这里插入图片描述
发现控制台显示我们自定义的线程报错
在这里插入图片描述
在这里插入图片描述
查询数据库Employee表,发现代码中this.getBaseMapper().delete(null);
可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚(数据库中表数据也已经被删除完成),则证明@Transactional注解并不能在多线程下进行事务回滚!
在这里插入图片描述
在这里插入图片描述

3.EmployeeServiceImpl的saveThreadRollBack方法

该方法通过使用sqlSession控制,手动提交事务,在多线程下进行事务回滚。
利用postman测试saveThreadRollBack接口。
在这里插入图片描述
发现控制台显示我们自定义的线程报错。
在这里插入图片描述
在这里插入图片描述
查询数据库Employee表,发现数据并未被删除,证明多线程执行过程中失败了,事务被回滚了。
在这里插入图片描述

四、方案总结

1.方案总结

在Spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效。
如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。
通过使用sqlSession控制手动提交事务,可以达到主线程和子线程数据事务回滚。

五.项目结构及下载

在这里插入图片描述
源码地址springboot-cacheable,创作不易,欢迎star哦~

参考资料
支付宝一面:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!
多线程事务怎么回滚?
多线程如何实现事务回滚?一招帮你搞定!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/872160.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在 IntelliJ IDEA 中使用 Docker 开发指南

目录 一、IDEA安装Docker插件 二、IDEA连接Docker 1、Docker for Windows 连接 2、SSH 连接 3、Connection successful 连接成功 三、查看Docker面板 四、使用插件生成镜像 一、IDEA安装Docker插件 打开 IntelliJ IDEA&#xff0c;点击菜单栏中的 "File" -&g…

山西电力市场日前价格预测【2023-08-14】

日前价格预测 预测明日&#xff08;2023-08-14&#xff09;山西电力市场全天平均日前电价为322.03元/MWh。其中&#xff0c;最高日前电价为366.98元/MWh&#xff0c;预计出现在19: 30。最低日前电价为286.57元/MWh&#xff0c;预计出现在13: 15。 价差方向预测 1&#xff1a; 实…

腾讯出了一个新聊天软件M8

众所周知&#xff0c;如今国内互联网&#xff0c;微信和QQ无疑是社交领域的霸主。 下载:https://www.123pan.com/s/BP5A-RW4xh.html 不过&#xff0c;它们也有各自局限性&#xff0c;比如难以结识新朋友、功能过于复杂等。 这让用户产生厌倦&#xff0c;再加上近几年AI、元宇…

标记垃圾,有三种色彩:四千长文带你深入了解三色标记算法

&#x1f52d; 嗨&#xff0c;您好 &#x1f44b; 我是 vnjohn&#xff0c;在互联网企业担任 Java 开发&#xff0c;CSDN 优质创作者 &#x1f4d6; 推荐专栏&#xff1a;Spring、MySQL、Nacos、Java&#xff0c;后续其他专栏会持续优化更新迭代 &#x1f332;文章所在专栏&…

SCSS的基本用法

1、声明变量 $ 声明变量的符号 $ 下面这张图左半部分是scss的语法&#xff0c;右半部分是编译后的css。&#xff08;整篇文章皆是如此&#xff09; 2、默认变量 !default sass 的默认变量仅需要在值后面加上 !default 即可。 如果分配给变量的值后面添加了 !default 标志…

jmeter通过BeanShell对接口参数进行MD5和HmacSHA256加密【杭州多测师_王sir】

一、在eclipse里面编写MD5加密算法 package com.Base64;import java.security.MessageDigest; import java.security.NoSuchAlgorithmException;public class Md5Utils {public static String md5(String sourceStr) {String result "";try {MessageDigest md Mess…

教你10分钟内学习如何CSS 设置网页打印时的样式

本文将教您开始为要打印的页面编写CSS所需要的一切提供帮助。 media 规则 If you’ve done any responsive design, you’ll already know about the media rule. As well as different screen sizes, media also lets you target “print” media. Here’s an example: 如果…

【CTF-web】修改请求头(XFF)

题目链接&#xff1a;https://ctf.bugku.com/challenges/detail/id/79.html 随意输入后可以看到需要本地管理员登录&#xff0c;得知这是一道需要修改XFF头的题。 XFF即X-Forwarded-For&#xff0c;该请求标头是一个事实上的用于标识通过代理服务器连接到 web 服务器的客户端的…

腾讯轻量云服务器搭建Node.js开发环境

1.购买腾讯云轻量应用服务器&#xff0c;登录 轻量应用服务器控制台&#xff0c;在 服务器 页面单击 新建。安装运行环境&#xff0c;选择为应用模板 > Web 开发场景 > Node.js 应用模板。 Node.js 是一个事件驱动 I/O 服务端 JavaScript 环境&#xff0c;基于 Chrome V…

分布式文件存储系统-FastDFS

前言&#xff1a;FastDFS 是一个分布式文件存储系统&#xff0c;主要用于存储和管理大规模的文件数据&#xff0c;如图片、视频、文档等&#xff0c;是淘宝前架构师为了存储图片用C语言开发出来的系统。 服务端有两个组件 Tracker Server 与 Storage Server &#xff0c;对应两…

​API网关类型与区别​

什么是API网关&#xff1f; 在现代软件架构中&#xff0c;API&#xff08;应用程序编程接口&#xff09;网关起着重要的作用。它是一个中间层&#xff0c;用于管理和控制应用程序之间的通信。API网关可以提供一些关键功能&#xff0c;如流量控制&#xff0c;安全认证&#xff…

java项目打包运行报异常:Demo-1.0-SNAPSHOT.jar中没有主清单属性

检查后发现pom文件中有错误&#xff0c;需要添加build内容才能恢复正常。 添加下面文件后再次启动恢复正常。 <build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactI…

Docker安装nacos v2.1.1

目录 前言安装nacos安装步骤1&#xff1a;准备1. 安装docker2. 搜索可以使用的镜像。3. 选择合适的redis镜像。3. 也可从docker hub上搜索镜像。 安装步骤2&#xff1a;拉取镜像拉取镜像查看已拉取的镜像 安装步骤3&#xff1a;创建容器创建容器方式1&#xff1a;快速创建容器创…

C语言实现扫雷游戏(附完整代码)

大家好&#xff0c;欢迎来到Mr.kanglong的CSDN博客&#xff0c;这篇博客来讨论一下如何使用C语言实现扫雷游戏&#xff0c;其实扫雷游戏和我之前写的三子棋游戏大体实现框架一样&#xff0c;只是逻辑有所不同。 目录 扫雷游戏介绍 游戏效果 实现代码 game.c game.h test.c 扫…

React学习之路 - 上传代码到GitCode

Git 全局设置 git config --global user.name "AnyaPapa" git config --global user.email "fangtaihongqq.com" 添加SSH密钥 Mac终端输入命令 cd existing_folder git init git remote add origin gitgitcode.net:Java_1710/test.git git add . git com…

msvcr120.dll丢失怎样修复?总结三个dll修复方法

当我遇到msvcr120.dll丢失的问题时&#xff0c;我感到有些困惑和焦虑。因为这个问题会导致我无法运行依赖这个文件的应用程序。msvcr120.dll是运行时库文件的一部分&#xff0c;为应用程序提供了必要的运行时支持。它的丢失会导致应用程序无法正常运行&#xff0c;这让我意识到…

【单片机毕业设计3-基于stm32c8t6的智能家居系统】

【单片机毕业设计3-基于stm32c8t6的智能家居系统】 前言一、功能介绍二、硬件部分三、软件部分总结 前言 &#x1f525;这里是小殷学长&#xff0c;单片机毕业设计篇3 基于stm32的智能家居控制系统 &#x1f9ff;创作不易&#xff0c;拒绝白嫖&#xff08;有需可点击最后链接&a…

[C++] 一篇带你了解C++中动态内存管理,new让大家都有对象

目录 1、C/C内存分布 2.、C语言中动态内存管理方式&#xff1a;malloc、calloc、realloc 3、C内存管理方式 3.1 new/delete操作内置类型 3.2 new和delete操作自定义类型 3.3 malloc与new的异常处理机制 4、operator new与operator delete函数 4.1 operator new与operat…

寻找适合你的在线客服系统?这里有8款推荐(2023年8月更新)

近年来&#xff0c;随着网站交互性的提升&#xff0c;越来越多的企业开始关注并采用在线客服系统&#xff0c;以便更好地与访客互动和沟通。尤其对于外贸网站等需要频繁沟通的行业来说&#xff0c;选择一个合适的在线客服系统显得尤为重要。在这篇文章中&#xff0c;我们将为您…

小白带你学习linux的Redis3.2集群(三十三)

目录 一、Redis主从复制 1、概念 2、作用 3、缺点 4、流程 5、搭建 6、验证 二、Reids哨兵模式 1、概念 2、作用 3、缺点 4、结构 5、搭建 6、验证 三、Redis集群 1、概述 2、原理 3、架构细节 4、选举过程 四、搭建 1、第一步现在外部使用finalshell 9.9…