使用多线程导入大量数据,多线程事物控制

news2025/1/20 5:46:24

本文主要讲述通过Spring Boot + MyBatis做大数据量数据插入的案例和结果

不分批次直接梭哈

MyBatis直接一次性批量插入30万条,代码如下:

@Test
public void testBatchInsertUser() throws IOException {
    InputStream resourceAsStream =
            Resources.getResourceAsStream("sqlMapConfig.xml");
    SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsStream);
    SqlSession session = sqlSessionFactory.openSession();
    System.out.println("===== 开始插入数据 =====");
    long startTime = System.currentTimeMillis();
    try {
        List<TestImport> entityList = new ArrayList<>();
      	for(int i =0; i< 300000; i++) {
      		TestImport testImport = new TestImport();
      		testImport.setName(RandomUtils.getLetters(6));
      		testImport.setPhone(RandomUtils.getNumbers(13));
      		entityList.add(testImport);
      	}
        session.insert("saveBatch", entityList); // 最后插入数据
        session.commit();

        long spendTime = System.currentTimeMillis()-startTime;
        System.out.println("成功插入 30 万条数据,耗时:"+spendTime+"毫秒");
    } finally {
        session.close();
    }
}

可以看到控制台输出:

超出最大数据包限制了,可以通过调整max_allowed_packet限制来提高可以传输的内容,不过由于30万条数据超出太多,这个不可取,梭哈看来是不行了 既然梭哈不行那我们就一条一条循环着插入行不行呢,这种方法我没有自己亲自尝试30w的数据,但是我平时尝试了一千多条,耗时都要分钟级别的,这种一条一条循环着插入不建议直接使用。

分批插入

1. MyBatis手写Sql分批插入

MyBatis直接手写batchInsert方法的Sql我就不写了,大家可以自己写

@Test
public void testBatchInsertUser() throws IOException {
    InputStream resourceAsStream =          Resources.getResourceAsStream("sqlMapConfig.xml");
    SqlSessionFactory sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsStream);
    SqlSession session = sqlSessionFactory.openSession();
    System.out.println("===== 开始插入数据 =====");
    long startTime = System.currentTimeMillis();
    int waitTime = 10;
    try {
        List<User> entityList = new ArrayList<>();
        for (int i = 1; i <= 300000; i++) {
            TestImport testImport = new TestImport();
      		testImport.setName(RandomUtils.getLetters(6));
      		testImport.setPhone(RandomUtils.getNumbers(13));
      		entityList.add(testImport);
            if (i % 1000 == 0) {
                session.insert("batchInsert", entityList);
                // 每 1000 条数据提交一次事务
                session.commit();
                entityList.clear();

                // 等待一段时间
                Thread.sleep(waitTime * 1000);
            }
        }
        // 最后插入剩余的数据
        if(!CollectionUtils.isEmpty(entityList)) {
            session.insert("batchInsert", entityList);
            session.commit();
        }

        long spendTime = System.currentTimeMillis()-startTime;
        System.out.println("成功插入 30 万条数据,耗时:"+spendTime+"毫秒");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        session.close();
    }
}

 2. MyBatis- Plus分批插入

这种方法可以详见我之前写的文章

解决Mybatis-plus 批量插入太慢的问题,提升插入性能_Crystalqy的博客-CSDN博客

使用多线程分批插入,并且控制事物

使用多线程分批插入,是基于MyBatis- Plus分批插入做了一个并行处理的优化,使用多线程同时处理数据的插入,这里重点是事物的控制

我的事物要求是多个线程同时插入,只有所有的线程都执行成功了,主线程最后才能提交,只要有一个线程失败了,所有的事物都要回滚,为了达到这种目的,可以使用TransactionTemplateCountDownLatch来实现这个目标。

1. TransactionTemplate + CountDownLatch

以下是一个示例代码,演示了如何在多个线程中同时插入不同的对象,然后在所有线程执行完成后进行事务的提交或回滚:

@Service
public class ConcurrentInsertService {

    @Autowired
    private TransactionTemplate transactionTemplate;

    @Autowired
    private YourRepository repository;

    public void performConcurrentInsert() throws InterruptedException {
        int threadCount = 3;
        CountDownLatch latch = new CountDownLatch(threadCount);

        AtomicBoolean allSuccessful = new AtomicBoolean(true); // 用于记录所有线程是否成功

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    transactionTemplate.execute(status -> {
                        try {
                            // 执行插入操作,例如:
                            YourObject object = new YourObject();
                            // 设置对象属性
                            repository.save(object);
                        } catch (Exception e) {
                            status.setRollbackOnly();
                            allSuccessful.set(false); // 标记为失败
                        }
                        return null;
                    });
                } finally {
                    latch.countDown();
                }
            }).start();
        }

        latch.await(); // 等待所有线程完成

        if (allSuccessful.get()) {
            // 所有线程都成功,进行事务提交操作
            transactionTemplate.execute(status -> {
                // 执行一些整体提交操作
                return null;
            });
        } else {
            // 至少一个线程失败,进行事务回滚操作
            transactionTemplate.execute(status -> {
                // 执行一些整体回滚操作
                return null;
            });
        }
    }
}

在这个示例中,我使用了AtomicBoolean来标记是否所有线程都成功执行。在等待所有线程完成后,根据allSuccessful的值,你可以决定是提交事务还是回滚事务。请根据你的实际业务需求,适当调整示例代码中的提交和回滚操作。

请注意,transactionTemplate.execute 方法会在事务作用域内执行给定的代码块,如果在代码块中抛出异常,会导致事务回滚。这就是为什么在上面的代码中,我使用了setRollbackOnly 来标记失败的事务。整体的事务提交或回滚操作也是在自己的事务作用域内执行的。

2. TransactionManager + TransactionStatus

这个是本文重点要介绍的方法

下面是我本次的代码:

API层的代码:

 @GetMapping(value = "/testImport")
    public void testImport() throws InterruptedException {
    	List<TestImport> entityList = new ArrayList<>();
      	for(int i =0; i< 10000; i++) {
      		TestImport testImport = new TestImport();
      		testImport.setName(RandomUtils.getLetters(6));
      		testImport.setPhone(RandomUtils.getNumbers(13));
      		entityList.add(testImport);
      	}
    	importService.performConcurrentImport(entityList);
    }

Service成核心代码:核心代码的注释都已经写的很清楚了


@Service
public class TestImportService {
	
	
	@Autowired
    private PlatformTransactionManager transactionManager; // 事务管理器
	
	@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
    public void performConcurrentImport(List<TestImport> entityList) throws InterruptedException {
        int threadCount = 3;
        if (entityList.size() < 1000) {
        	threadCount = 1;
		} else if (entityList.size() < 5000) {
			threadCount = 2;
		}
        
        // 创建多线程处理任务
        ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);
        AtomicBoolean allsuccessfull = new AtomicBoolean(true); // 用于记录所有线程是否成功
        TestImportService importService = SpringContextUtil.getApplicationContext().getBean(TestImportService.class);
        
        List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>()); 
        try {
        	List<FutureTask<TransactionStatus>> tasks = Lists.newArrayList();
        	List<List<TestImport>> subList = ListUtils.subWithNum(entityList, threadCount);
        	for (int i = 0; i < threadCount; i++) {
        		List<TestImport> insertList = subList.get(i);
        		FutureTask<TransactionStatus> importTask = new FutureTask<>(() -> importService.importTransaction(transactionManager, insertList));
        		tasks.add(importTask);
        	}
        	for (FutureTask<TransactionStatus> futureTask : tasks) {
        		threadPool.submit(futureTask);
			}
        	
        	try {
        		for (FutureTask<TransactionStatus> futureTask : tasks) {
        			transactionStatuses.add(futureTask.get());
    			}
			} catch (Exception e) {
				e.printStackTrace();
				allsuccessfull.set(false);
			} finally {
				
			}
		} catch (Exception e) {
			e.printStackTrace();
			allsuccessfull.set(false);
		}
        
        if (!transactionStatuses.isEmpty()) {
            if (allsuccessfull.get()) { //全部执行成功,提交事物
            	transactionStatuses.forEach(s -> transactionManager.commit(s));
                
            } else {// 只要有一个线程执行失败,就回滚事物
            	transactionStatuses.forEach(s -> transactionManager.rollback(s));
            }
        }
        System.out.println("主线程完成");
    }
	
	/**
	 * 使用这种方式将事务状态都放在同一个事务里面
	 * @param transactionManager
	 * @param entityList
	 * @return
	 */
	@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
	public TransactionStatus importTransaction(PlatformTransactionManager transactionManager, List<TestImport> entityList ) {
	    DefaultTransactionDefinition definition = new DefaultTransactionDefinition();// 事务定义接口:事务的一些基础信息,如超时时间、隔离级别、传播属性等
	    definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物传播属性,开启新事务,这样会比较安全些。
	    TransactionStatus status = transactionManager.getTransaction(definition); // 获得事务状态,事务的一些状态信息,如是否是一个新的事务、是否已被标记为回滚
	    
    	super.saveBatch(0, entityList);
    	System.out.println("线程名称:"+ Thread.currentThread().getName());
    	return status;
	}
}

3. TransactionManager和TransactionTemplate的简单介绍

当在Spring应用程序中处理数据库事务时,`TransactionManager`和`TransactionTemplate`是两个重要的组件。它们都用于管理和控制事务的开启、提交、回滚等操作。

1. **TransactionManager:**
   
   `TransactionManager`是Spring框架中用于管理事务的核心接口。它提供了在应用程序中执行事务相关操作的标准方法。具体来说,`TransactionManager`有助于以下几个方面:
   
   - **事务的开启和提交:** `TransactionManager`负责事务的开启和提交操作。当你在一个方法上使用`@Transactional`注解时,Spring会通过`TransactionManager`自动管理事务的开启和提交。
   
   - **事务的回滚:** 如果在事务处理过程中发生异常或出现错误,`TransactionManager`会将事务标记为回滚状态,确保事务不会提交,并将所有的更改都回滚到事务开始之前的状态。
   
   - **多数据源管理:** 对于多数据源的情况,不同数据源可以有不同的`TransactionManager`,以确保每个数据源的事务行为得以正确管理。
   
   `PlatformTransactionManager`是`TransactionManager`的一个常见实现,用于管理不同类型的事务(例如,JDBC、JPA、Hibernate等)。常见的`PlatformTransactionManager`实现包括`DataSourceTransactionManager`(用于JDBC事务)和`JpaTransactionManager`(用于JPA事务)。

2. **TransactionTemplate:**
   
   `TransactionTemplate`是Spring框架中的一个辅助类,它简化了在事务环境中执行操作的编程。`TransactionTemplate`封装了事务的开启、提交、回滚等操作,使你不必手动处理这些事务操作,从而减少了重复的模板代码。它的一些主要功能包括:
   
   - **自动事务处理:** 你可以将需要在事务环境中执行的操作传递给`TransactionTemplate`,它会自动为你管理事务的开启、提交和回滚。
   
   - **异常处理:** `TransactionTemplate`可以捕获操作中的异常,自动将事务标记为回滚状态,以确保在出现异常时事务不会提交,并会回滚到事务开始前的状态。
   
   - **线程绑定:** `TransactionTemplate`会自动处理线程和事务之间的绑定,确保在操作方法中不需要考虑事务的开启和关闭。
   
   通过使用`TransactionTemplate`,你可以更专注于业务逻辑的实现,而无需过多关注事务的细节。

综上所述,`TransactionManager`负责底层事务管理,而`TransactionTemplate`是在上层封装,为事务操作提供了更便捷的方式。你可以根据项目的需要,选择使用`TransactionManager`、`TransactionTemplate`或两者结合使用,以实现有效的事务管理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/966884.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows配置SonarQube代码审查工具详细步骤(附带IDEA SonarLint插件使用)

文章目录 环境说明以及准备一. SonarQube的下载与安装二. 添加SonarQube项目三. 使用Maven命令上传代码到SonarQube四. IDEA安装SonarLint插件 环境说明以及准备 本篇博客使用的SonarQube版本为9.8&#xff0c;注意JDK 1.8已经不能支持 NameVersionDownLoad LinkSonarQube9.8…

Redis缓存和持久化

目录 Redis缓存 什么是缓存 缓存更新策略​编辑 业务场景 缓存穿透 常见的解决方案 缓存雪崩 解决方案 缓存击穿 解决方案 Redis持久化 RDB持久化 执行时机 RDB方式bgsave的基本流程 AOF持久化 RDB和AOF的对比​编辑 Redis主从 数据同步原理 总结 Redis缓存 …

数学建模:灰色预测模型

&#x1f506; 文章首发于我的个人博客&#xff1a;欢迎大佬们来逛逛 数学建模&#xff1a;灰色预测模型 文章目录 数学建模&#xff1a;灰色预测模型灰色预测算法步骤代码实现 灰色预测 三个基本方法&#xff1a; 累加数列&#xff1a;计算一阶累加生成数列 x ( 1 ) ( k ) …

Python安装与Pycharm配置

Python与Pycharm安装 用了一年的Python最近被一个问题难倒了&#xff0c;pip安装一直不能用&#xff0c;报错说被另一个程序使用。被逼到只能重新安装python了&#xff0c;正好记录一下这个过程&#xff0c;写这篇笔记。&#xff08;突然想到可能是配Arcgis的python接口&#…

锐捷盒式交换机S5760C版本U盘升级

1.确认设备当前版本信息 2.将升级文件包放置U盘文件夹中&#xff0c; U盘名称123 &#xff0c; 文件夹名称A 3.查看到升级包后&#xff0c;进行U盘升级 #upgrade usb0:/A/S5760X_RGOS12.5(4)B0702P4_install.bin 4.升级成功后 reload交换机 5.等交换机重启完毕&#xff0c;再次…

零基础学Python:元组(Tuple)详细教程

前言 嗨喽&#xff0c;大家好呀~这里是爱看美女的茜茜呐 Python的元组与列表类似&#xff0c; 不同之处在于元组的元素不能修改, 元组使用小括号,列表使用方括号, 元组创建很简单,只需要在括号中添加元素,并使用逗号隔开即可 &#x1f447; &#x1f447; &#x1f447; 更…

CXL.mem M2S Message 释义

&#x1f525;点击查看精选 CXL 系列文章&#x1f525; &#x1f525;点击进入【芯片设计验证】社区&#xff0c;查看更多精彩内容&#x1f525; &#x1f4e2; 声明&#xff1a; &#x1f96d; 作者主页&#xff1a;【MangoPapa的CSDN主页】。⚠️ 本文首发于CSDN&#xff0c…

ubuntu22.04搭建verilator仿真环境

概述 操作系统为 Ubuntu(22.04.2 LTS)&#xff0c;本次安装verilator开源verilog仿真工具&#xff0c;进行RTL功能仿真。下面构建版本为5.008的verilator仿真环境。先看一下我系统的版本&#xff1a; 安装流程 安装依赖 sudo apt-get install git perl python3 make autoc…

UDP/TCP协议报头详细分析

文章目录 ————————预备知识————————数据段netstatpidof—————UDP协议报头即相关概念分析—————UDP协议端格式UDP 特点全双工send / rec 函数的本质UDP的缓冲区基于UDP的应用层协议—————TCP协议报头即相关概念分析—————TCP格式及解析32位序号…

C++ Primer 第3章 字符串、向量和数组

C Primer 第3章 字符串、向量和数组 3.1 命名空间的using声明一、每个名字都需要独立的using声明二、头文件不应包含using声明三、一点注意事项 3.2 标准库类型string3.2.1 定义和初始化string对象一、直接初始化和拷贝初始化 3.2.2 string对象上的操作一、读写string对象二、读…

微信 小程序 在电脑PC端无法加载的解决办法。电脑微信小程序打不开是怎么回事?电脑微信小程序不能打开解决方法教学

一、电脑微信小程序打不开或者一直在加载的原因&#xff1f; 1、电脑端微信版本未更新 微信版本未及时更新&#xff0c;也会影响小程序的正常打开&#xff0c;可以尝试更新版本。 2、缓存过多 如果电脑缓存文件过多&#xff0c;内存少&#xff0c;也可能导致小程序无法流畅…

qt day 5

1>实现闹钟功能 ---------------------------------------------------------------------- .pro ---------------------------------------------------------------------- QT core gui texttospeechgreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c11# T…

Tomcat 安装

1.关闭防火墙 2.安装JDK包 3. 4。添加环境变量 5.刷新配置文件 6.解压文件 7.启动tomcat 8. 9.编写tomcat.service文件 vim /etc/systemd/system/tomcat.service 10.刷新服务 11.打开浏览器访问&#xff1a;192.168.2.100:8080/&#xff0c;正常可以看到以下界面

虚拟世界指南:从零开始,一步步教你安装、配置和使用VMware,镜像ISO文件!

本章目录 CentOS简介镜像下载一、新建虚拟机&#xff08;自定义&#xff09;1、进入主页&#xff0c;在主页中点击“创建新的虚拟机”2、点击创建虚拟机创建自己的虚拟机。可以选择自定义3、在“硬件兼容性(H)中选择&#xff1a;Workststion 15.x” ->下一步4、选择“稍后安…

ARTS打卡第三周之有序链表的合并、gdb中run命令、数制建议、WOOP思维心理学分享

Algorithm 题目&#xff1a;两个有序链表的合并 自己的分析见博客《合并两个有序链表》 Review 《run command》是我这周读的英文文章。 在gdb中&#xff0c;run命令在不设置断点的前提下&#xff0c;能够直接把程序运行完成&#xff1b;要是设置断点的话&#xff0c;可以直…

知识图谱项目实践

目录 步骤 SpaCy Textacy——Text Analysis for Cybersecurity Networkx Dateparser 导入库 写出页面的名称 ​编辑 自然语言处理 词性标注 可能标记的完整列表 依存句法分析&#xff08;Dependency Parsing&#xff0c;DEP&#xff09; 可能的标签完整列表 实例理…

SEAN代码(1)

代码地址 首先定义一个trainer。 trainer Pix2PixTrainer(opt)在Pix2PixTrainer内部&#xff0c;首先定义Pix2PixModel模型。 self.pix2pix_model Pix2PixModel(opt)在Pix2PixModel内部定义生成器&#xff0c;判别器。 self.netG, self.netD, self.netE self.initialize_…

11.Redis的慢操作之rehash

Redis为什么快 它接收到一个键值对操作后&#xff0c;能以微秒级别的速度找到数据&#xff0c;并快速完成操作。 数据库这么多&#xff0c;为啥 Redis 能有这么突出的表现呢&#xff1f; 内存数据结构 一方面&#xff0c;这是因为它是内存数据库&#xff0c;所有操作都在内存上…

Redis—常用数据结构

Redis—常用数据结构 &#x1f50e;数据结构与内部编码 Redis 中常用的数据结构包括 Strings—字符串Hashes—哈希表Lists—列表Sets—集合Sorted sets—有序集合 Redis 底层在实现上述数据结构时, 会在源码层面针对上述实现进行特定优化, 以达到节省时间 / 节省空间的效果 …

卡片介绍、EMV卡组织、金融认证---安全行业基础篇2

一、卡片介绍 卡片是一种用于存储和传输数据的可携带式物品&#xff0c;通常由塑料或纸质材料制成。卡片通常具有特定的尺寸和形状&#xff0c;以适应各类读写设备。不同类型的卡片可以用于不同的应用&#xff0c;如身份验证、支付、门禁控制等。 接触卡 接触卡是一种需要与读…