多线程事务

news2024/11/25 11:52:47

一、业务场景

        我们在工作中经常会到往数据库里插入大量数据的工作,但是既需要保证数据的一致性,又要保证程序执行的效率。因此需要在多线程中使用事务,这样既可以保证数据的一致性,又能保证程序的执行效率。但是spring自带的@Transactional注解无法满足多线程间的事务一致性,因为这几个事务执行的线程不同,无法保持数据的一致性。

二、解决方案

        我的解决方案参考分布式事务2PC(Two-phase commit protocol),各个线程需要等待所有的线程执行完成后才能进行下一步操作,在使用线程池执行任务时,如果线程池的最大线程数小于任务列表的数量,就会发生“死锁”,即获取到线程的任务阻塞等待没有获取线程的任务执行完成,而没有获取线程的任务会在阻塞队列中等待空闲线程的调用。这种情况需要使用一阶段的超时机制来“解开”,超时机制会发送回滚命令,线程池收到后进行回滚,但这种情况任务始终无法提交,再次提交结果依然是等到超时再回滚。再使用中需要结合具体业务来对线程池参数以及数据库连接池参数进行合理的设置。如果这里听的优点迷,可以先看下面具体代码实现再来结合这段文字思考。

 

1、工具类代码: 

import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author poxiao
 * @create 2023-01-05 22:22
 * <p>
 * 多线程事务管理器
 * 基于分布式事务思想,采用2PC(Two-phase commit protocol)协议
 * 解决基于线程池的多线程事务一致性问题
 */
@Slf4j
public class MultiThreadingTransactionManager {

    /**
     * 事务管理器
     */
    private final PlatformTransactionManager transactionManager;

    /**
     * 超时时间
     */
    private final long timeout;

    /**
     * 时间单位
     */
    private final TimeUnit unit;

    /**
     * 一阶段门闩,(第一阶段的准备阶段),当所有子线程准备完成时(除“提交/回滚”操作以外的工作都完成),countDownLatch的值为0
     */
    private CountDownLatch oneStageLatch = null;

    /**
     * 二阶段门闩,(第二阶段的执行执行),主线程将不再等待子线程执行,直接判定总的任务执行失败,执行第二阶段让等待确认的线程进行回滚
     */
    private final CountDownLatch twoStageLatch = new CountDownLatch(1);

    /**
     * 是否提交事务,默认是true(当任一线程发生异常时,isSubmit会被设置为false,即回滚事务)
     */
    private final AtomicBoolean isSubmit = new AtomicBoolean(true);

    /**
     * 构造方法
     * @param transactionManager 事务管理器
     * @param timeout 超时时间
     * @param unit 时间单位
     */
    public MultiThreadingTransactionManager(PlatformTransactionManager transactionManager, long timeout, TimeUnit unit) {
        this.transactionManager = transactionManager;
        this.timeout = timeout;
        this.unit = unit;
    }

    /**
     * 线程池方式执行任务,可保证线程间的事务一致性
     * @param runnableList 任务列表
     * @param executor 线程池
     * @return
     */
    public boolean execute(List<Runnable> runnableList, Executor executor) {

        // 排除null值
        runnableList.removeAll(Collections.singleton(null));

        // 属性初始化
        innit(runnableList.size());

        // 遍历任务列表并放入线程池
        for (Runnable runnable : runnableList) {
            // 创建线程
            Thread thread = new Thread() {
                @Override
                public void run() {
                    // 如果别的线程执行失败,则该任务就不需要再执行了
                    if (!isSubmit.get()) {
                        log.info("当前子线程执行中止,因为线程事务中有子线程执行失败");
                        oneStageLatch.countDown();
                        return;
                    }
                    // 开启事务
                    TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());
                    try {
                        // 执行业务逻辑
                        runnable.run();
                    } catch (Exception e) {
                        // 执行体发生异常,设置回滚
                        isSubmit.set(false);
                        log.error("线程{}:业务发生异常,执行体:{}", Thread.currentThread().getName(), runnable);
                    }
                    // 计数器减一
                    oneStageLatch.countDown();
                    try {
                        //等待所有线程任务完成,监控是否有异常,有则统一回滚
                        twoStageLatch.await();
                        // 根据isSubmit值判断事务是否提交,可能是子线程出现异常,也有可能是子线程执行超时
                        if (isSubmit.get()) {
                            // 提交
                            transactionManager.commit(transactionStatus);
                            log.info("线程{}:事务提交成功,执行体:{}", Thread.currentThread().getName(), runnable);
                        } else {
                            // 回滚
                            transactionManager.rollback(transactionStatus);
                            log.info("线程{}:事务回滚成功,执行体:{}", Thread.currentThread().getName(), runnable);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            executor.execute(thread);
        }

        /**
         * 主线程担任协调者,当第一阶段所有参与者准备完成,oneStageLatch的计数为0
         * 主线程发起第二阶段,执行阶段(提交或回滚),根据
         */
        try {
            // 主线程等待所有线程执行完成,超时时间设置为五秒
            oneStageLatch.await(timeout, unit);
            long count = oneStageLatch.getCount();
            System.out.println("countDownLatch值:" + count);
            // 主线程等待超时,子线程可能发生长时间阻塞,死锁
            if (count > 0) {
                // 设置为回滚
                isSubmit.set(false);
                log.info("主线线程等待超时,任务即将全部回滚");
            }
            twoStageLatch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败
        return isSubmit.get();
    }

    /**
     * 初始化属性
     * @param size 任务数量
     */
    private void innit(int size) {
        oneStageLatch = new CountDownLatch(size);
    }
}

2、业务代码:

(1)线程池参数

我这里采用自定义线程池,线程池参数如下:

@Configuration
public class ThreadPoolConfig {
    // 获取服务器的cpu个数
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// 获取cpu个数
    private static final int COUR_SIZE = CPU_COUNT * 4;
    private static final int MAX_COUR_SIZE = CPU_COUNT * 8;

    // 接下来配置一个bean,配置线程池。
    @Bean
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);// 设置核心线程数
        threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);// 配置最大线程数
        threadPoolTaskExecutor.setQueueCapacity(MAX_COUR_SIZE * 4);// 配置队列容量(这里设置成最大线程数的四倍)
        threadPoolTaskExecutor.setThreadNamePrefix("thirdParty-thread");// 给线程池设置名称
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略
        return threadPoolTaskExecutor;
    }

}

(2)任务业务正常,无异常抛出时正常提交事务情况

public Result<?> testTransaction() throws SQLException {

        List<User> users = new LinkedList<>();
        User user = new User();
        user.setName("1111");
        users.add(user);
        User user1 = new User();
        user1.setName("2222");
        users.add(user1);
        MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);
        List<Runnable> runnableList = new ArrayList<>();
        users.forEach((x) -> {
            runnableList.add(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);
                    secondUserMapper.insertUser(x);
                }
            });
        });
        multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);

        return Result.success(1);
    }

执行时的日志: 

执行成功后数据库多次了两条数据 

 (3)展示出现异常任务时回滚事务情况

 public Result<?> testTransaction() throws SQLException {

        List<User> users = new LinkedList<>();
        User user = new User();
        user.setName("1111");
        users.add(user);
        User user1 = new User();
        user1.setName("2222");
        users.add(user1);
        MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);
        List<Runnable> runnableList = new ArrayList<>();
         //模拟任务出现异常
         runnableList.add(() -> {
             int a = 10 / 0;
         });
        users.forEach((x) -> {
            runnableList.add(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);
                    secondUserMapper.insertUser(x);
                }
            });
        });
        multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);

        return Result.success(1);
    }

执行时的日志:  

数据库没有新增的数据 

 

参考文章: 

Spring多线程事务解决方案-CSDN博客

 两阶段VS三阶段提交协议_两阶段提交-CSDN博客

详解Spring多线程下如何保证事务的一致性-51CTO.COM 

多线程结合sprongboot事务(完善)_springboot多线程事务-CSDN博客 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1694180.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【机器学习系列】使用高斯贝叶斯模型进行数据分类的完整流程

目录 一、导入数据 二、选择特征 三、十折交叉验证 四、划分训练集和测试集 五、训练高斯贝叶斯模型 六、预测测试集 七、查看训练集和测试集上的分数 八、查看混合矩阵 九、输出评估指标 一、导入数据 # 根据商户数据预测其是否续约案例 import pandas #读取数据到 da…

【PostgreSQL001】比较开发生产2个数据库结构方法

1.一直以来想写下基于PostgreSQL的系列文章&#xff0c;作为较火的数据ETL工具&#xff0c;也是日常项目开发中常用的一款工具&#xff0c;最近刚好挤时间梳理、总结下这块儿的知识体系。 2.熟悉、梳理、总结下PostgreSQL数据库相关知识体系。 3.欢迎批评指正&#xff0c;欢迎关…

《AI学习笔记》大模型-微调/训练区别以及流程

阿丹&#xff1a; 之前一直对于大模型的微调和训练这两个名词不是很清晰&#xff0c;所有找了一个时间来弄明白到底有什么区别以及到底要怎么去使用去做。并且上手实践一下。 大模型业务全流程&#xff1a; 大模型为啥要微调&#xff1f;有哪些微调方式&#xff1f; 模型参数…

【机器学习】大模型在机器学习中的应用:从深度学习到生成式人工智能的演进

&#x1f512;文章目录&#xff1a; &#x1f4a5;1.引言 ☔2.大模型概述 &#x1f6b2;3.大模型在深度学习中的应用 &#x1f6f4;4.大模型在生成式人工智能中的应用 &#x1f44a;5.大模型的挑战与未来展望 &#x1f4a5;1.引言 随着数据量的爆炸性增长和计算能力的提…

电信光猫的USB存储对外网开放访问

前提条件当然是要有公网IP地址了&#xff0c;没有的话去找电信索要&#xff0c;然后可以使用动态域名正常访问。 我的电信光猫发现共享访问速度还可以&#xff0c;会有31M/s左右的写入速度 但是有一个不方便的是&#xff0c;无法从外网提供访问&#xff0c;SMB协议所用的445端…

国产信创数据库:使用MySQL等开源产品能做信创替换吗?

随着信创关键行业替代加速推进&#xff0c;多数企业习惯原来标配即&#xff1a;centosmysql等开源产品&#xff0c;而大家讨论核心焦点在于“什么是信创数据库”&#xff0c;使用 MySQL 能做信创替换吗&#xff1f;基于开源二开的数据库算信创库吗&#xff1f;等等。想来这个问…

常见算法(3)

1.Arrays 它是一个工具类&#xff0c;主要掌握的其中一个方法是srot&#xff08;数组&#xff0c;排序规则&#xff09;。 o1-o2是升序排列&#xff0c;o2-o1是降序排列。 package test02; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparat…

WordPress 发布了独立的 SQLite 插件

之前 WordPress 在官方的 Performance Lab 插件实现 SQLite 模块&#xff0c;现在重构 SQLite 的实现&#xff0c;并且将其发布成一个独立的插件&#xff1a;SQLite Database Integration。 独立 SQLite 插件 最初的功能模块实现是基于 aaemnnosttv 的 wp-sqlite-db 插件修改实…

linux文件权限常用知识点,基于Linux(openEuler、CentOS8)

目录 知识点常用实例 知识点 真实环境文件显示 解读 常用实例 文件所有者 chown -R nginx:nginx /home/source目录权限(R选填必须大写<遍历子文件夹及文件>) chmod -R 755 /home/sourcechmod -R 777 /home/source

原生js实现拖拽改变元素顺序

代码展示如下&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title>…

基于HTML5和CSS3搭建一个Web网页(二)

倘若代码中有任何问题或疑问&#xff0c;欢迎留言交流~ 网页描述 创建一个包含导航栏、主内容区域和页脚的响应式网页。 需求: 导航栏: 在页面顶部创建一个导航栏&#xff0c;包含首页、关于我们、服务和联系我们等链接。 设置导航栏样式&#xff0c;包括字体、颜色和背景颜…

2024年上半年软件系统架构师论文【回忆版】

文章目录 考试时间考试地点案例分析1、微服务架构的优点和缺点2、质量属性的6个元素3、分布式锁 Redis的缺点4、MongoDB 存储矢量图的优势 论文回忆版论文一、论单元测试的设计与应用论文二、论大数据模型的设计与应用论文三、论模型驱动的架构设计及应用论文四、论云原生运维的…

第十节 SpringBoot Starter 实战之 redis 滑动窗口

使用 redis 实现滑动窗口&#xff0c;我们会基于这个场景&#xff0c;建立一个 Starter&#xff0c;在这之前&#xff0c;我们需要先。理解这个场景。 关键字&#xff1a;滑动窗口、流式计算、lua脚本、redis、zset、starter 概要&#xff1a;本文封装 redis 的API&#xff0c…

内网渗透(不出网上线CS)

目录 CS的概述 实验&#xff1a;不出网上线CS实验 一&#xff1a;给PC1种马 二&#xff1a;使用Beacon SMB去控制PC2。 三&#xff1a;将CS权限传递给MSF 四&#xff1a;将msf权限传递给CS CS的概述 cs是一款强大的控制windows木马的工具。是目前渗透中常使用的一个工具…

Pandas高效数据清洗与转换技巧指南【数据预处理】

三、数据处理 1.合并数据&#xff08;join、merge、concat函数&#xff0c;append函数&#xff09; Concat()函数使用 1.concat操作可以将两个pandas表在垂直方向上进行粘合或者堆叠。 join属性为outer&#xff0c;或默认时&#xff0c;返回列名并集&#xff0c;如&#xff…

Leetcode - 398周赛

目录 一&#xff0c;3151. 特殊数组 I 二&#xff0c;3152. 特殊数组 II 三&#xff0c;3153. 所有数对中数位不同之和 四&#xff0c;3154. 到达第 K 级台阶的方案数 一&#xff0c;3151. 特殊数组 I 本题就是判断一个数组是否是奇偶相间的&#xff0c;如果是&#xff0c;…

开源的在线JSON数据可视化编辑器jsoncrack本地部署与远程访问

文章目录 1. 在Linux上使用Docker安装JSONCrack2. 安装Cpolar内网穿透工具3. 配置JSON Crack界面公网地址4. 远程访问 JSONCrack 界面5. 固定 JSONCrack公网地址 JSON Crack 是一款免费的开源数据可视化应用程序&#xff0c;能够将 JSON、YAML、XML、CSV 等数据格式可视化为交互…

一篇文章讲透排序算法之希尔排序

希尔排序是对插入排序的优化&#xff0c;如果你不了解插入排序的话&#xff0c;可以先阅读这篇文章&#xff1a;插入排序 目录 1.插入排序的问题 2.希尔排序的思路 3.希尔排序的实现 4.希尔排序的优化 5.希尔排序的时间复杂度 1.插入排序的问题 如果用插入排序对一个逆序…

结构体;结构成员访问操作符

结构体&#xff1a; 虽然c语言已经提供了内置类型&#xff0c;比如&#xff1a;char、short、int、long等&#xff0c;但还是不够用&#xff0c;就好比我描述一个人&#xff0c;我需要描述他的身高&#xff0c;体重&#xff0c;年龄&#xff0c;名字等信息&#xff0c…

手把手一起学习Python NumPy

NumPy 是用于处理数组的 python 库&#xff0c;NumPy 中的数组对象称为 ndarray&#xff0c;它提供了许多支持函数&#xff0c;使得利用 ndarray 非常容易。Numpy官方网址 NumPy 安装 使用pip安装NumPy 模块&#xff1a; pip install numpyNumPy 入门 创建numpy数组&#x…