多线程事务?拿捏!

news2025/4/25 11:48:52

场景:有一批1万或者10万数据,插入数据库,怎么做

事务中进行批量提交

publ
    List<List<OrderPo>> partition = Lists.partition(list, 450);
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    // 顺序插入
    for (List<OrderPo> sub : partition) {
        orderMapper.batchSave(sub);
    }
    stopWatch.stop();
    log.info("耗时:" + stopWatch.getTotalTimeSeconds());
}

得出来的结果是 1万数据大概在5-6秒,10万数据在53-58秒

线程池并行插入

// 线程池
private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), new ThreadFactory() {
    // 线程名字
    private final String PREFIX = "BATCH_INSERT_";
    // 计数器
    private AtomicLong atomicLong = new AtomicLong();
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(null, r, PREFIX + atomicLong.incrementAndGet());
        return thread;
    }
});
@SneakyThrows
@Transactional(rollbackFor = Exception.class)
public void batchSave() {
    // 分批
    // 至于分多少批:PgSQL 的占位符个数是有限制的 不能超过 Short.MAX(32767)
    // 所以一批最多 = 32767 / 你的一行字段个数
    // 比如我这里 = 32767 / 66个字段 = 496 也就是一批最多496个数据
    List<List<OrderPo>> partition = Lists.partition(list, 450);
    CountDownLatch countDownLatch = new CountDownLatch(partition.size());
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    // 顺序插入
    for (List<OrderPo> sub : partition) {
        THREAD_POOL_EXECUTOR.execute(() -> {
            try {
                log.info("线程:{}开始处理", Thread.currentThread().getName());
                orderMapper.batchSave(sub);
            } finally {
                countDownLatch.countDown();
            }
        });
    }
    // 等待插入完毕
    countDownLatch.await();
    stopWatch.stop();
    log.info("耗时:" + stopWatch.getTotalTimeSeconds());
}

这种方式会导致事务失效从而导致部分数据的丢失,因为内部通过线程池提交了多个子任务,这些子任务是异步执行的,事务的传播机制和线程的隔离性导致事务上下文不会传播到这些异步线程中

原因刨析:

  1. @Transactional是作用在当前线程的,事务上下文不会传播到其他线程去
  2. 子线程的批量保存操作是独立执行的,不受主线程事务控制

线程池并行插入但共用一个事务

实际上就是通过编程式事务来解决事务上下文不传播的问题,这种方式灵活性就高很多了,毕竟是在代码里直接编码,但是可扩展性一般

// 线程池
private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), new ThreadFactory() {
    // 线程名字
    private final String PREFIX = "BATCH_INSERT_";
    // 计数器
    private AtomicLong atomicLong = new AtomicLong();
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(null, r, PREFIX + atomicLong.incrementAndGet());
        return thread;
    }
});
@SneakyThrows
public void batchSave() {
    // 分批
    // 至于分多少批:PgSQL 的占位符个数是有限制的 不能超过 Short.MAX(32767)
    // 所以一批最多 = 32767 / 你的一行字段个数
    // 比如我这里 = 32767 / 66个字段 = 496 也就是一批最多496个数据
    List<List<OrderPo>> partition = Lists.partition(list, 450);
    // 手动事务提前创建出来
    DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
    transactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    // 提前获取连接
    TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
    // 获取数据源以及连接 供多线程使用
    DataSource dataSource = dataSourceTransactionManager.getDataSource();
    Object resource = TransactionSynchronizationManager.getResource(dataSource);
    // 异常标志
    AtomicBoolean exceptionFlag = new AtomicBoolean(false);
    boolean poolExceptionFlag = false;
    // 计数器等待执行完毕
    CountDownLatch countDownLatch = new CountDownLatch(partition.size());
    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    // 顺序插入
    for (List<OrderPo> sub : partition) {
        try {
            THREAD_POOL_EXECUTOR.execute(() -> {
                try {
                    // 如果没有发生异常
                    if (exceptionFlag.get()) {
                        log.info("有其他线程执行失败,后续无需执行,因为最终会回滚");
                        return;
                    }
                    // 释放上次绑定的数据源连接
                    try {
                        TransactionSynchronizationManager.unbindResource(dataSource);
                    } catch (Exception ignored){
                    }
                    // 装上本次使用的连接
                    TransactionSynchronizationManager.bindResource(dataSource, resource);
                    log.info("线程:{}开始处理", Thread.currentThread().getName());
                    // 执行插入
                    orderMapper.batchSave(sub);
                    // 模拟异常
                    if (ThreadLocalRandom.current().nextInt(3) == 1) {
                        int i = 1/0;
                    }
                } catch (Exception e) {
                    // 发生异常设置异常标志
                    log.error(String.format("线程:%s我发生了异常,e:%s", Thread.currentThread().getName(), e.getMessage()), e);
                    exceptionFlag.set(true);
                } finally {
                    // 不管是成功还是失败 都要计数器 -1
                    countDownLatch.countDown();
                }
            });
        } catch (Exception e) {
            // 提交任务失败 那就是失败了
            exceptionFlag.set(true);
            log.info("当前线程池繁忙,请稍后重试");
            dataSourceTransactionManager.rollback(transactionStatus);
            poolExceptionFlag = true;
            break;
        }
    }
    // 等待执行完毕  这里有个隐患  等待多长时间呢? 线程池任务过多的话最严重的情况 就是一直要在这里阻塞
    // 因为事务的提交还是回滚都交给了 主任务线程
    // 如果提交到线程池都成功了的话 就等待都执行完
    if (!poolExceptionFlag) {
        countDownLatch.await();
    }
    // 异常标志来做提交还是回滚
    if (exceptionFlag.get()) {
        // 发生异常 回滚
        dataSourceTransactionManager.rollback(transactionStatus);
    } else {
        // 未发生异常 可以提交
        dataSourceTransactionManager.commit(transactionStatus);
    }
    stopWatch.stop();
    log.info("耗时:" + stopWatch.getTotalTimeSeconds());
}

@Async + @Transactional结合实现

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;

    // 主方法,负责分批并启动异步任务
    @Transactional(rollbackFor = Exception.class)
    public void batchSave(List<OrderPo> list) throws InterruptedException {
        // 分批
        List<List<OrderPo>> partition = Lists.partition(list, 450);
        CountDownLatch countDownLatch = new CountDownLatch(partition.size());
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        // 启动异步任务
        List<CompletableFuture<Void>> futures = partition.stream()
                .map(subList -> CompletableFuture.runAsync(() -> batchSaveAsync(subList, countDownLatch)))
                .collect(Collectors.toList());

        // 等待所有异步任务完成
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0]));

        try {
            // 等待所有异步任务完成,设置超时时间避免无限等待
            allFutures.get(60, TimeUnit.SECONDS); // 设置合理的超时时间
        } catch (Exception e) {
            throw new RuntimeException("Batch save failed", e);
        }

        stopWatch.stop();
        System.out.println("Total time taken: " + stopWatch.getTotalTimeSeconds());
    }

    // 异步执行的子任务
    @Async
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    public void batchSaveAsync(List<OrderPo> subList, CountDownLatch countDownLatch) {
        try {
            System.out.println("Thread: " + Thread.currentThread().getName() + " is processing batch");
            orderMapper.batchSave(subList);
            // 模拟异常
            if (ThreadLocalRandom.current().nextInt(3) == 1) {
                throw new RuntimeException("Simulated exception in batch save");
            }
        } catch (Exception e) {
            System.err.println("Exception occurred in thread: " + Thread.currentThread().getName() + ", " + e.getMessage());
            throw e; // 异常会触发事务回滚
        } finally {
            countDownLatch.countDown();
        }
    }
}
  • 事务传播机制:是指当一个事务方法被另一个事务方法调用时,这个事务方法应该如何进行事务控制。例如,常见的事务传播行为有 REQUIRED(如果当前没有事务,就新建一个事务;如果已经存在一个事务,就加入到这个事务中)、REQUIRES_NEW(新建事务,如果当前存在事务,就把当前事务挂起)等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2342393.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity InputSystem触摸屏问题

最近把Unity打包后的windows软件放到windows触摸屏一体机上测试&#xff0c;发现部分屏幕触摸点击不了按钮&#xff0c;测试了其他应用程序都正常。 这个一体机是这样的&#xff0c;一个电脑机箱&#xff0c;外接一个可以触摸的显示屏&#xff0c;然后UGUI的按钮就间歇性点不了…

Linux Awk 深度解析:10个生产级自动化与云原生场景

看图猜诗&#xff0c;你有任何想法都可以在评论区留言哦~ 摘要 Awk 作为 Linux 文本处理三剑客中的“数据工程师”&#xff0c;凭借字段分割、模式匹配和数学运算三位一体的能力&#xff0c;成为处理结构化文本&#xff08;日志、CSV、配置文件&#xff09;的终极工具。本文聚…

免费版还是专业版?Dynadot 域名邮箱服务选择指南

关于Dynadot Dynadot是通过ICANN认证的域名注册商&#xff0c;自2002年成立以来&#xff0c;服务于全球108个国家和地区的客户&#xff0c;为数以万计的客户提供简洁&#xff0c;优惠&#xff0c;安全的域名注册以及管理服务。 Dynadot平台操作教程索引&#xff08;包括域名邮…

旋转磁体产生的场-对导航姿态的影响

pitch、yaw、roll是描述物体在空间中旋转的术语&#xff0c;通常用于计算机图形学或航空航天领域中。这些术语描述了物体绕不同轴旋转的方式&#xff1a; Pitch&#xff08;俯仰&#xff09;&#xff1a;绕横轴旋转&#xff0c;使物体向前或向后倾斜。俯仰角度通常用来描述物体…

Day11(回溯法)——LeetCode79.单词搜索

1 前言 今天主要刷了一道热题榜中回溯法的题&#xff0c;现在的计划是先刷热题榜专题吧&#xff0c;感觉还是这样见效比较快。因此本文主要介绍LeetCode79。 2 LeetCode79.单词搜索(LeetCode79) OK题目描述及相关示例如下&#xff1a; 2.1 题目分析解决及优化 感觉回溯的方…

PostgreSQL 分区表——范围分区SQL实践

PostgreSQL 分区表——范围分区SQL实践 1、环境准备1-1、新增原始表1-2、执行脚本新增2400w行1-3、创建pg分区表-分区键为创建时间1-4、创建24年所有分区1-5、设置默认分区&#xff08;兜底用&#xff09;1-6、迁移数据1-7、创建分区表索引 2、SQL增删改查测试2-1、查询速度对比…

SpringCloud 微服务复习笔记

文章目录 微服务概述单体架构微服务架构 微服务拆分微服务拆分原则拆分实战第一步&#xff1a;创建一个新工程第二步&#xff1a;创建对应模块第三步&#xff1a;引入依赖第四步&#xff1a;被配置文件拷贝过来第五步&#xff1a;把对应的东西全部拷过来第六步&#xff1a;创建…

【Python爬虫基础篇】--4.Selenium入门详细教程

先解释&#xff1a;Selenium&#xff1a;n.硒&#xff1b;硒元素 目录 1.Selenium--简介 2.Selenium--原理 3.Selenium--环境搭建 4.Selenium--简单案例 5.Selenium--定位方式 6.Selenium--常用方法 6.1.控制操作 6.2.鼠标操作 6.3.键盘操作 6.4.获取断言信息 6.5.…

Langchain检索YouTube字幕

创建一个简单搜索引擎&#xff0c;将用户原始问题传递该搜索系统 本文重点&#xff1a;获取保存文档——保存向量数据库——加载向量数据库 专注于youtube的字幕&#xff0c;利用youtube的公开接口&#xff0c;获取元数据 pip install youtube-transscript-api pytube 初始化 …

【Linux网络】应用层自定义协议与序列化及Socket模拟封装

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;博客仓库&#xff1a;https://gitee.com/JohnKingW/linux_test/tree/master/lesson &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &…

客户案例:西范优选通过日事清实现流程与项目管理的优化

近几年来&#xff0c;新零售行业返璞归真&#xff0c;从线上销售重返线下发展&#xff0c;满足消费者更加多元化的需求&#xff0c;国内家居集合店如井喷式崛起。为在激烈的市场竞争中立于不败之地&#xff0c;西范优选专注于加强管理能力、优化协作效率的“内功修炼”&#xf…

LabVIEW实现Voronoi图绘制功能

该 LabVIEW 虚拟仪器&#xff08;VI&#xff09;借助 MathScript 节点&#xff0c;实现基于手机信号塔位置计算 Voronoi 图的功能。通过操作演示&#xff0c;能直观展示 Voronoi 图在空间划分上的应用。 各部分功能详细说明 随机地形创建部分 功能&#xff1a;根据 “Maximum a…

爬虫学习——获取动态网页信息

对于静态网页可以直接研究html网页代码实现内容获取&#xff0c;对于动态网页绝大多数都是页面内容是通过JavaScript脚本动态生成(也就是json数据格式)&#xff0c;而不是静态的&#xff0c;故需要使用一些新方法对其进行内容获取。凡是通过静态方法获取不到的内容&#xff0c;…

创新项目实训开发日志4

一、开发简介 核心工作内容&#xff1a;logo实现、注册实现、登录实现、上传gitee 工作时间&#xff1a;第十周 二、logo实现 1.设计logo 2.添加logo const logoUrl new URL(/assets/images/logo.png, import.meta.url).href <div class"aside-first">…

常见接口测试常见面试题(JMeter)

JMeter 是 Apache 提供的开源性能测试工具&#xff0c;主要用于对 Web 应用、REST API、数据库、FTP 等进行性能、负载和功能测试。​它支持多种协议&#xff0c;如 HTTP、HTTPS、JDBC、SOAP、FTP 等。 在一个线程组中&#xff0c;JMeter 的执行顺序通常为&#xff1a;配置元件…

计算机组成与体系结构:缓存(Cache)

目录 为什么需要 Cache&#xff1f; &#x1f9f1; Cache 的分层设计 &#x1f539; Level 1 Cache&#xff08;L1 Cache&#xff09;一级缓存 &#x1f539; Level 2 Cache&#xff08;L2 Cache&#xff09;二级缓存 &#x1f539; Level 3 Cache&#xff08;L3 Cache&am…

Flutter 在全新 Platform 和 UI 线程合并后,出现了什么大坑和变化?

Flutter 在全新 Platform 和 UI 线程合并后&#xff0c;出现了什么大坑和变化&#xff1f; 在两个月前&#xff0c;我们就聊过 3.29 上《Platform 和 UI 线程合并》的具体原因和实现方式&#xff0c;而事实上 Platform 和 UI 线程合并&#xff0c;确实为后续原生语言和 Dart 的…

stm32之GPIO函数详解和上机实验

目录 1.LED和蜂鸣器1.1 LED1.2 蜂鸣器 2.实验2.1 库函数&#xff1a;RCC和GPIO2.1.1 RCC函数1. RCC_AHBPeriphClockCmd2. RCC_APB2PeriphClockCmd3. RCC_APB1PeriphClockCmd 2.1.2 GPIO函数1. GPIO_DeInit2. GPIO_AFIODeInit3. GPIO_Init4. GPIO_StructInit5. GPIO_ReadInputDa…

用 PyQt5 和 asyncio 打造接口并发测试 GUI 工具

接口并发测试是测试工程师日常工作中的重要一环&#xff0c;而一个直观的 GUI 工具能有效提升工作效率和体验。本篇文章将带你用 PyQt5 和 asyncio 从零实现一个美观且功能实用的接口并发测试工具。 我们将实现以下功能&#xff1a; 请求方法选择器 添加了一个下拉框 QComboBo…

Qt实战之将自定义插件(minGW)显示到Qt Creator列表的方法

Qt以其强大的跨平台特性和丰富的功能&#xff0c;成为众多开发者构建图形用户界面&#xff08;GUI&#xff09;应用程序的首选框架。而在Qt开发的过程中&#xff0c;自定义插件能够极大地拓展应用程序的功能边界&#xff0c;让开发者实现各种独特的、个性化的交互效果。想象一下…