多线程事务怎么回滚

news2024/11/16 15:35:08

背景介绍


1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。

2,在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。

3,下面用一个简单示例演示多线程事务。

公用的类和方法


/**
 * 平均拆分list方法.
 * @param source
 * @param n
 * @param <T>
 * @return
 */
public static <T> List<List<T>> averageAssign(List<T> source,int n){
    List<List<T>> result=new ArrayList<List<T>>();
    int remaider=source.size()%n;
    int number=source.size()/n;
    int offset=0;//偏移量
    for(int i=0;i<n;i++){
        List<T> value=null;
        if(remaider>0){
            value=source.subList(i*number+offset, (i+1)*number+offset+1);
            remaider--;
            offset++;
        }else{
            value=source.subList(i*number+offset, (i+1)*number+offset);
        }
        result.add(value);
    }
    return result;
}
/**  线程池配置
 * @version V1.0
 */
public class ExecutorConfig {
    private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
    private volatile static ExecutorService executorService;
    public static ExecutorService getThreadPool() {
        if (executorService == null){
            synchronized (ExecutorConfig.class){
                if (executorService == null){
                    executorService =  newThreadPool();
                }
            }
        }
        return executorService;
    }

    private static  ExecutorService newThreadPool(){
        int queueSize = 500;
        int corePool = Math.min(5, maxPoolSize);
        return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
    }
    private ExecutorConfig(){}
}
/** 获取sqlSession
 * @author 86182
 * @version V1.0
 */
@Component
public class SqlContext {
    @Resource
    private SqlSessionTemplate sqlSessionTemplate;

    public SqlSession getSqlSession(){
        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        return sqlSessionFactory.openSession();
    }
}

示例事务不成功操作


 /**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
@Transactional
public void saveThread(List<EmployeeDO> employeeDOList) {
    try {
        //先做删除操作,如果子线程出现异常,此操作不会回滚
        this.getBaseMapper().delete(null);
        //获取线程池
        ExecutorService service = ExecutorConfig.getThreadPool();
        //拆分数据,拆分5份
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        //执行的线程
        Thread []threadArray = new Thread[lists.size()];
        //监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭
        CountDownLatch countDownLatch = new CountDownLatch(lists.size());
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        for (int i =0;i<lists.size();i++){
            if (i==lists.size()-1){
                atomicBoolean.set(false);
            }
            List<EmployeeDO> list  = lists.get(i);
            threadArray[i] =  new Thread(() -> {
                try {
                 //最后一个线程抛出异常
                    if (!atomicBoolean.get()){
                        throw new ServiceException("001","出现异常");
                    }
                    //批量添加,mybatisPlus中自带的batch方法
                    this.saveBatch(list);
                }finally {
                    countDownLatch.countDown();
                }

            });
        }
        for (int i = 0; i <lists.size(); i++){
            service.execute(threadArray[i]);
        }
        //当子线程执行完毕时,主线程再往下执行
        countDownLatch.await();
        System.out.println("添加完毕");
    }catch (Exception e){
        log.info("error",e);
        throw new ServiceException("002","出现异常");
    }finally {
         connection.close();
     }
}

数据库中存在一条数据:

Spring Boot 基础就不介绍了,推荐下这个实战教程:https://github.com/javastacks/spring-boot-best-practice

//测试用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
public class ThreadTest01 {

    @Resource
    private EmployeeBO employeeBO;

    /**
     *   测试多线程事务.
     * @throws InterruptedException
     */
    @Test
    public  void MoreThreadTest2() throws InterruptedException {
        int size = 10;
        List<EmployeeDO> employeeDOList = new ArrayList<>(size);
        for (int i = 0; i<size;i++){
            EmployeeDO employeeDO = new EmployeeDO();
            employeeDO.setEmployeeName("lol"+i);
            employeeDO.setAge(18);
            employeeDO.setGender(1);
            employeeDO.setIdNumber(i+"XX");
            employeeDO.setCreatTime(Calendar.getInstance().getTime());
            employeeDOList.add(employeeDO);
        }
        try {
            employeeBO.saveThread(employeeDOList);
            System.out.println("添加成功");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

测试结果:

可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,@Transactional注解没有生效。

使用sqlSession控制手动提交事务

 @Resource
  SqlContext sqlContext;
 /**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
    // 获取数据库连接,获取会话(内部自有事务)
    SqlSession sqlSession = sqlContext.getSqlSession();
    Connection connection = sqlSession.getConnection();
    try {
        // 设置手动提交
        connection.setAutoCommit(false);
        //获取mapper
        EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
        //先做删除操作
        employeeMapper.delete(null);
        //获取执行器
        ExecutorService service = ExecutorConfig.getThreadPool();
        List<Callable<Integer>> callableList  = new ArrayList<>();
        //拆分list
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        for (int i =0;i<lists.size();i++){
            if (i==lists.size()-1){
                atomicBoolean.set(false);
            }
            List<EmployeeDO> list  = lists.get(i);
            //使用返回结果的callable去执行,
            Callable<Integer> callable = () -> {
                //让最后一个线程抛出异常
                if (!atomicBoolean.get()){
                    throw new ServiceException("001","出现异常");
                }
              return employeeMapper.saveBatch(list);
            };
            callableList.add(callable);
        }
        //执行子线程
       List<Future<Integer>> futures = service.invokeAll(callableList);
        for (Future<Integer> future:futures) {
        //如果有一个执行不成功,则全部回滚
            if (future.get()<=0){
                connection.rollback();
                 return;
            }
        }
        connection.commit();
        System.out.println("添加完毕");
    }catch (Exception e){
        connection.rollback();
        log.info("error",e);
        throw new ServiceException("002","出现异常");
    }finally {
         connection.close();
     }
}
// sql
<insert id="saveBatch" parameterType="List">
 INSERT INTO
 employee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)
 values
     <foreach collection="list" item="item" index="index" separator=",">
     (
     #{item.employeeId},
     #{item.age},
     #{item.employeeName},
     #{item.birthDate},
     #{item.gender},
     #{item.idNumber},
     #{item.creatTime},
     #{item.updateTime},
     #{item.status}
         )
     </foreach>
 </insert>

数据库中一条数据:

测试结果:抛出异常,

删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。

另外,如果你近期准备面试跳槽,建议在Java面试库小程序在线刷题,涵盖 2000+ 道 Java 面试题,几乎覆盖了所有主流技术面试题。

成功操作示例:

 @Resource
SqlContext sqlContext;
/**
 * 测试多线程事务.
 * @param employeeDOList
 */
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
    // 获取数据库连接,获取会话(内部自有事务)
    SqlSession sqlSession = sqlContext.getSqlSession();
    Connection connection = sqlSession.getConnection();
    try {
        // 设置手动提交
        connection.setAutoCommit(false);
        EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
        //先做删除操作
        employeeMapper.delete(null);
        ExecutorService service = ExecutorConfig.getThreadPool();
        List<Callable<Integer>> callableList  = new ArrayList<>();
        List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
        for (int i =0;i<lists.size();i++){
            List<EmployeeDO> list  = lists.get(i);
            Callable<Integer> callable = () -> employeeMapper.saveBatch(list);
            callableList.add(callable);
        }
        //执行子线程
       List<Future<Integer>> futures = service.invokeAll(callableList);
        for (Future<Integer> future:futures) {
            if (future.get()<=0){
                connection.rollback();
                 return;
            }
        }
        connection.commit();
        System.out.println("添加完毕");
    }catch (Exception e){
        connection.rollback();
        log.info("error",e);
        throw new ServiceException("002","出现异常");
       // throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);
    }
}

测试结果:

数据库中数据:

删除的删除了,添加的添加成功了,测试成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/350454.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

返回数组的上三角和下三角np.triu()和np.tril()

【小白从小学Python、C、Java】 【计算机等级考试500强双证书】 【Python-数据分析】 返回数组的上三角和下三角 np.triu()和np.tril() 选择题 以下说法错误的是? import numpy as np anp.array([[1,2,3],[4,5,6],[7,8,9]]) print("【显示】a&#xff1a;\n",a) pri…

一、Python时间序列小波分析——实例分析

小波分析是在Fourier分析基础上发展起来的一种新的时频局部化分析方法。小波分析的基本思想是用一簇小波函数系来表示或逼近某一信号或函数。 小波分析原理涉及到傅里叶变换&#xff0c;并有多种小波变换&#xff0c;有点点小复杂。但是不会原理没关系&#xff0c;只要会应用并…

单臂路由配置

单臂路由实现不同VLAN间通信 链路类型 交换机连接主机的端口为access链路 交换机连接路由器的端口为Trunk链路 子接口 路由器的物理接口可以被划分成多个逻辑接口 每个子接口对应一个VLAN网段的网关 我们需要知道的是单臂路由和虚拟机软件原理比较相似。他们都是依托于物理设备…

按键70秒,Root轻松得:Linux惊现高危漏洞

导读E安全11月16日讯 Linux被发现高危漏洞(CVE-2016-4484)&#xff0c;攻击者可以通过持续按下Enter键70秒钟来获取 root initramfs shell&#xff0c;进而破坏Linux boxes。漏洞存在于Linux流行变体中的统一密钥设置(LUKS)。通过访问shell&#xff0c;攻击者可以解密Linux机…

标签管理系统

电子墨水屏最大的不便利之处在于它是单色屏&#xff0c;显示速度不够快&#xff0c;因为显示内容的单色性&#xff0c;要求需要显示的数据处理好后&#xff0c;以单色位图的方式把数据信息发送到屏幕上才可以正常显示。 我们把这个制作显示内容的方式叫做模版&#xff0c;有了…

Zynq 裸机 PS + PL 双网口实现之 lwip 库文件修改

基于 xilinx vivado 2017.4 库文件 lwip141_v2_0 的修改&#xff1a; 添加对 PHY 芯片 ksz9031 的支持&#xff1b; 添加 SDK 中 LWIP 参数设置对话框 emio_options 选项&#xff1b; 添加 XPAR_GMII2RGMIICON_0N_ETH0_ADDR 和 XPAR_GMII2RGMIICON_0N_ETH1_ADDR 宏配置&#…

布隆过滤器和布谷鸟过滤器详解

今天和大家分享下布隆过滤器和布谷鸟过滤器 一.布隆过滤器 1.简单介绍 布隆过滤器是用于检索一个元素是否在一个集合中的算法&#xff0c;是一种用空间换时间的查询算法。 2.实现原理 布隆过滤器的存储结构是一个bitmap结构&#xff0c;初始值都是0&#xff0c;如下图所示&am…

内核模块(传参和依赖)

目录 一、模块传参 二、模块依赖 三、内核空间和用户空间 四、执行流 五、模块编程与应用编程的比较 六、内核接口头文件查询 七、小作业 一、模块传参 module_param(name,type,perm);//将指定的全局变量设置成模块参数 name:全局变量名 type&#xff1a; 使用符号 …

MFC消息机制

1.消息映射消息映射是一个将消息和成员函数相互关联的表。比如&#xff0c;框架窗口接收到一个鼠标左击消息&#xff0c;MFC将搜索该窗口的消息映射&#xff0c;如果存在一个处理WM_LBUTTTONDOWN消息的处理程序&#xff0c;然后就调用OnButtonDown。2.消息映射机制2.1 声明宏 写…

教务查询系统简介

教务查询系统简介 项目核心代码展示 service层如下&#xff1a; Teacher老师Service层&#xff1a; public interface TeacherService {//根据id更新老师信息void updateById(Integer id, TeacherCustom teacherCustom) throws Exception;//根据id删除老师信息void removeB…

SheetJS的通用电子表格对象简介和使用

简言 “通用电子表格格式”&#xff08;CSF&#xff09;是SheetJS使用的对象模型。 例如使用xlsx插件时&#xff0c;获得的excel文件数据对象就是依据这个模型设计的。 SheetJs通用电子表格对象 介绍 cdn导入&#xff1a; <script lang"javascript" src"ht…

uprobe 实战

观测数据源 目前按照我的理解&#xff0c;和trace相关的常用数据源–探针 大致分为四类。 内核 Trace point kprobe 用户程序 USDT uprobe 在用户程序中&#xff0c;USDT是所谓的静态Tracepoint。和内核代码中的Trace point类似。实现方式是在代码开发时&#xff0c;使用USDT…

Visual Studio开启clang-tidy代码检查

在CLion中有针对C的静态代码检查工具clang-tidy&#xff0c;感觉非常好用&#xff0c;能养成好的编码习惯&#xff0c;后来写Qt转入了VS平台&#xff0c;想要继续使用功能一致的clang-tidy体验&#xff0c;所以研究出来在VS中开启clang-tidy的方法。 版本&#xff1a;Visual S…

XML 基础知识 XXE 漏洞原理解析及实验(基础篇)

XML 介绍 XXE全称XML外部实体注入&#xff0c;所以在介绍XXE漏洞之前&#xff0c;先来说一说什么是XML以及为什么使用XML进而再介绍一下XML的结构。 XML全称 可拓展标记语言&#xff0c;与HTML相互配合后&#xff1a; HMTL用来显示数据 HTML的焦点在于数据的外观&#xff08;…

双网卡(有线和wifi)同时连接内网和外网

双网卡&#xff08;有线和wifi&#xff09;同时连接内网和外网 Win10技巧&#xff1a;如何修改有线/WiFi网络优先级&#xff1a;https://www.ithome.com/html/win10/253612.htm双网卡实现两个网络的自由访问&#xff1a;https://blog.51cto.com/ghostlan/1299090Linux服务器安…

【Linux】网络编程 - 基础概念

目录 一.OSI七层模型vsTCP/IP五层模型 1.一些周边概念 2.OSI七层模型 3.TCP/IP五层模型 4.网络传输流程图 二.什么是MAC地址 三.什么是IP/IP地址 1.什么是IP 2.什么是IP地址 四.什么是端口号 一.OSI七层模型vsTCP/IP五层模型 1.一些周边概念 局域网vs广域网 网络互…

LeetCode——2341. 数组能形成多少数对

一、题目 给你一个下标从 0 开始的整数数组 nums 。在一步操作中&#xff0c;你可以执行以下步骤&#xff1a; 从 nums 选出 两个 相等的 整数 从 nums 中移除这两个整数&#xff0c;形成一个 数对 请你在 nums 上多次执行此操作直到无法继续执行。 返回一个下标从 0 开始、…

亚马逊云科技重磅发布《亚马逊云科技汽车行业解决方案》

当今&#xff0c;随着万物智联、云计算等领域的高速发展&#xff0c;创新智能网联汽车和车路协同技术正在成为车企加速发展的关键途径&#xff0c;推动着汽车产品从出行代步工具向着“超级智能移动终端”快速转变。挑战无处不在&#xff0c;如何抢先预判&#xff1f;随着近年来…

31-Golang中的二维数组

二维数组的使用方式 使用方式一&#xff1a;先声明/定义再赋值 1.语法&#xff1a;var数组名 [大小] [大小]类型2.比如&#xff1a;var arr [2] [3]int,再赋值 package main import ("fmt" )func main() {//定义/声明数组var arr [4][6]int//赋初值arr[1][2] 1ar…

volatile,内存屏障

volatile的特性可见性: 对于其他线程是可见,假设线程1修改了volatile修饰的变量,那么线程2是可见的,并且是线程安全的重排序: 由于CPU执行的时候,指令在后面的会先执行,在指令层级的时候我们晓得volatile的特性后,我们就要去volatile是如何实现的,这个很重要&#xff01;&#…