多线程模式下保证事物的一致性

news2024/12/24 10:25:31

目录

  • 前置
  • InsertBatchSuccessServiceImpl.java
  • InsertBatchErrorServiceImpl.java
  • 效果图


前置

在一些特殊的场景下, 我们需要一些特定的操作.
比如我有一个接口, 做如下操作, 需要保持事物的一致性, 即: 全部成功则提交, 一个异常则全部回滚:
1.insert订单、(耗时1秒)
2.insert订单商品、(耗时1秒)
3.insert子订单、(耗时1秒)
4.insert操作记录、(耗时1秒)
在这波insert操作下来, 就需要花费4秒钟, 那么我们是否可以采用异步的方式进行保存, 将时间保持在1秒钟, 并保持事物一致性
故有了下面的方法

项目地址: https://gitee.com/xmaxm/test-code/tree/master/chaim-mybatis-plus

大致思路

要做异步操作, 就得做多线程, 但是事物是和线程是绑定在一起的,
同时我们知道, commit和rollback是和DML语句一起使用的, 也就是我们能知道这条SQL是成功还是失败
通过上面, 我们就可以进行线程等待, 在所有的DML语句执行之后, 统一进行commit还是rollback, 执行快的等待执行慢的, 当同时OK就进行统一操作
就可采用下面所列的方式, 当然还有很多别的方式也可以进行

强调:

多测, 做线程循环测试跑. 比如AB, JMeter, apifox, 下面列举的InsertBatchErrorServiceImpl在普通测试过程中不会出现问题, 但是当次数过多就会出现无法唤醒的情况. 一定要测试, 有些场景需要多跑几遍才能够进行重现

代码部分

InsertBatchSuccessServiceImpl:
该实现采用的是CountDownLatch, countDown()递减锁的数量, await()等待直到当前计数器数量为0, 释放所有等待线程

InsertBatchErrorServiceImpl:
该实现采用的 LockSupport.park()悬停、LockSupport.unpark(thread)唤醒. 但在实际使用过程中发现会出现无法唤醒的情况, 我发布了问题(可供参考 ), 但是目前还没有得到解决, 不得不暂时放弃


InsertBatchSuccessServiceImpl.java

package com.chaim.mybatis.service.impl;

import com.chaim.mybatis.converter.SysUserConverter;
import com.chaim.mybatis.dto.SysUserDTO;
import com.chaim.mybatis.entitys.SysUser;
import com.chaim.mybatis.mappers.SysUserMapper;
import com.chaim.mybatis.service.InsertBatchService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @author Chaim
 * @date 2022/12/24 16:06
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class InsertBatchSuccessServiceImpl implements InsertBatchService {
    private final SysUserMapper sysUserMapper;
    private final SysUserConverter sysUserConverter;
    private final DataSourceTransactionManager dataSourceTransactionManager;

    @Override
    public Object insertBatch(SysUserDTO.InsertSysUserDTO insertSysUserDTO) {
        // 定义开启的线程数
        final int i = 3;
        CountDownLatch latch = new CountDownLatch(i);
        // 事务定义
        DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
        // 所有开启事务的线程中, 是否存在异常
        AtomicBoolean isException = new AtomicBoolean(Boolean.FALSE);

        List<CompletableFuture<Void>> list = new ArrayList<>();
        list.add(this.save(insertSysUserDTO, latch, definition, isException));
        list.add(this.save1(insertSysUserDTO, latch, definition, isException));
        list.add(this.save2(insertSysUserDTO, latch, definition, isException));
        // 调用Future的阻塞接口, 等待全部future实例异步执行结束
        CompletableFuture.allOf(list.toArray(new CompletableFuture[i])).join();

        return "SUCCESS";
    }


    private CompletableFuture<Void> save(SysUserDTO.InsertSysUserDTO insertSysUserDTO, CountDownLatch latch, DefaultTransactionDefinition definition, AtomicBoolean isException) {
        return CompletableFuture.runAsync(() -> {
            // 获得事务状态
            TransactionStatus status = dataSourceTransactionManager.getTransaction(definition);
            SysUser sysUser = sysUserConverter.insertUserDTOToSysUser(insertSysUserDTO);
            try {
                sysUser.setPassword("123456");
                sysUser.setSalt(1234);
                sysUserMapper.insert(sysUser);
                this.threadBlocking(latch, status, isException);
            } catch (Exception exception) {
                log.error("方法: [save] 异常: {}", exception.getMessage());
                this.errorRollback(latch, status, isException);
            }
        });

    }

    private CompletableFuture<Void> save1(SysUserDTO.InsertSysUserDTO insertSysUserDTO, CountDownLatch latch, DefaultTransactionDefinition definition, AtomicBoolean isException) {
        return CompletableFuture.runAsync(() -> {
            TransactionStatus status = dataSourceTransactionManager.getTransaction(definition);
            SysUser sysUser = sysUserConverter.insertUserDTOToSysUser(insertSysUserDTO);
            try {
                sysUser.setPassword("123456");
                sysUser.setSalt(9876);
                sysUser.setUsername(sysUser.getUsername().concat(": 子线程1"));
                sysUserMapper.insert(sysUser);
                this.threadBlocking(latch, status, isException);
            } catch (Exception exception) {
                log.error("方法: [save1] 异常: {}", exception.getMessage());
                this.errorRollback(latch, status, isException);
            }
        });
    }

    private CompletableFuture<Void> save2(SysUserDTO.InsertSysUserDTO insertSysUserDTO, CountDownLatch latch, DefaultTransactionDefinition definition, AtomicBoolean isException) {
        return CompletableFuture.runAsync(() -> {
            TransactionStatus status = dataSourceTransactionManager.getTransaction(definition);
            SysUser sysUser = sysUserConverter.insertUserDTOToSysUser(insertSysUserDTO);
            try {
                sysUser.setPassword("123456");
                sysUser.setSalt(9876);
                sysUser.setUsername(sysUser.getUsername().concat(": 子线程2"));
                sysUserMapper.insert(sysUser);
                this.threadBlocking(latch, status, isException);
            } catch (Exception exception) {
                log.error("方法: [save2] 异常: {}", exception.getMessage());
                this.errorRollback(latch, status, isException);
            }
        });
    }

    /**
     * 进行线程阻塞操作
     *
     * @param latch
     * @param status
     * @param isException
     */
    private void threadBlocking(CountDownLatch latch, TransactionStatus status, AtomicBoolean isException) throws InterruptedException {
        log.info("计数器递减");
        latch.countDown();
        log.info("开始悬停, 剩余计数数量: {}", latch.getCount());
        latch.await();

        // 以下步骤抛出异常进入catch, 做countDown操作, 不会影响. 走到这一步已经没有阻塞了
        if (isException.get()) {
            log.info("开始回滚");
            dataSourceTransactionManager.rollback(status);
        } else {
            log.info("开始提交");
            dataSourceTransactionManager.commit(status);
        }
    }

    /**
     * 程序异常, 进行回滚, 线程唤醒
     *
     * @param latch
     * @param status
     * @param isException
     */
    private void errorRollback(CountDownLatch latch, TransactionStatus status, AtomicBoolean isException) {
        // 设定线程中存在异常信息
        isException.set(Boolean.TRUE);
        latch.countDown();
        log.info("开始回滚, 程序异常, 计数器递减, 剩余数量: {}", latch.getCount());
        // 本线程回滚
        dataSourceTransactionManager.rollback(status);
    }
}


InsertBatchErrorServiceImpl.java

关于该方式存在的问题, 我已经在问答区提出疑问, 可供参考

package com.chaim.mybatis.service.impl;

import com.chaim.mybatis.converter.SysUserConverter;
import com.chaim.mybatis.dto.SysUserDTO;
import com.chaim.mybatis.entitys.SysUser;
import com.chaim.mybatis.mappers.SysUserMapper;
import com.chaim.mybatis.service.InsertBatchService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/**
 * @author Chaim
 * @date 2022/12/24 22:06
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class InsertBatchErrorServiceImpl implements InsertBatchService {
    private final SysUserMapper sysUserMapper;
    private final SysUserConverter sysUserConverter;
    private final DataSourceTransactionManager dataSourceTransactionManager;

    @Override
    public Object insertBatch(SysUserDTO.InsertSysUserDTO insertSysUserDTO) {
        // 定义开启的线程数
        AtomicInteger totalThreadCount = new AtomicInteger(3);
        // 事务定义
        DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
        // 所有开启事务的线程中, 是否存在异常
        AtomicBoolean isException = new AtomicBoolean(Boolean.FALSE);
        // 定义没有执行结束的线程集合
        List<Thread> unFinishedThread = Collections.synchronizedList(new ArrayList<>());

        List<CompletableFuture<Void>> list = new ArrayList<>();
        list.add(this.save(insertSysUserDTO, unFinishedThread, totalThreadCount, definition, isException));
        list.add(this.save1(insertSysUserDTO, unFinishedThread, totalThreadCount, definition, isException));
        list.add(this.save2(insertSysUserDTO, unFinishedThread, totalThreadCount, definition, isException));
        // 调用Future的阻塞接口, 等待全部future实例异步执行结束
        CompletableFuture.allOf(list.toArray(new CompletableFuture[totalThreadCount.get()])).join();

        return "SUCCESS";
    }


    private CompletableFuture<Void> save(SysUserDTO.InsertSysUserDTO insertSysUserDTO, List<Thread> unFinishedThread, AtomicInteger totalThreadCount, DefaultTransactionDefinition definition, AtomicBoolean isException) {
        return CompletableFuture.runAsync(() -> {
            // 获得事务状态
            TransactionStatus status = dataSourceTransactionManager.getTransaction(definition);
            SysUser sysUser = sysUserConverter.insertUserDTOToSysUser(insertSysUserDTO);
            try {
                sysUser.setPassword("123456");
                sysUser.setSalt(1234);
                sysUserMapper.insert(sysUser);
                this.threadBlocking(Thread.currentThread(), unFinishedThread, totalThreadCount, status, isException);
            } catch (Exception exception) {
                log.error("方法: [save] 异常: {}", exception.getMessage());
                this.errorRollback(unFinishedThread, totalThreadCount, status, isException);
            }
        });

    }

    private CompletableFuture<Void> save1(SysUserDTO.InsertSysUserDTO insertSysUserDTO, List<Thread> unFinishedThread, AtomicInteger totalThreadCount, DefaultTransactionDefinition definition, AtomicBoolean isException) {
        return CompletableFuture.runAsync(() -> {
            TransactionStatus status = dataSourceTransactionManager.getTransaction(definition);
            SysUser sysUser = sysUserConverter.insertUserDTOToSysUser(insertSysUserDTO);
            try {
                sysUser.setPassword("123456");
                sysUser.setSalt(9876);
                sysUser.setUsername(sysUser.getUsername().concat(": 子线程1"));
                sysUserMapper.insert(sysUser);
                this.threadBlocking(Thread.currentThread(), unFinishedThread, totalThreadCount, status, isException);
            } catch (Exception exception) {
                log.error("方法: [save1] 异常: {}", exception.getMessage());
                this.errorRollback(unFinishedThread, totalThreadCount, status, isException);
            }
        });
    }

    private CompletableFuture<Void> save2(SysUserDTO.InsertSysUserDTO insertSysUserDTO, List<Thread> unFinishedThread, AtomicInteger totalThreadCount, DefaultTransactionDefinition definition, AtomicBoolean isException) {
        return CompletableFuture.runAsync(() -> {
            TransactionStatus status = dataSourceTransactionManager.getTransaction(definition);
            SysUser sysUser = sysUserConverter.insertUserDTOToSysUser(insertSysUserDTO);
            try {
                sysUser.setPassword("123456");
                sysUser.setSalt(9876);
                sysUser.setUsername(sysUser.getUsername().concat(": 子线程2"));
                sysUserMapper.insert(sysUser);
                this.threadBlocking(Thread.currentThread(), unFinishedThread, totalThreadCount, status, isException);
            } catch (Exception exception) {
                log.error("方法: [save2] 异常: {}", exception.getMessage());
                this.errorRollback(unFinishedThread, totalThreadCount, status, isException);
            }
        });
    }

    /**
     * 进行线程阻塞操作
     *
     * @param thread
     * @param unFinishedThread
     * @param totalThreadCount
     * @param status
     * @param isException
     */
    private void threadBlocking(Thread thread, List<Thread> unFinishedThread, AtomicInteger totalThreadCount, TransactionStatus status, AtomicBoolean isException) {
        // 添加到没有执行结束的线程集合
        unFinishedThread.add(thread);
        // 每个线程都在悬停前开启唤醒检查
        this.notifyAllThread(unFinishedThread, totalThreadCount, false);

        if (isException.get()) {
            log.info("已存在异步任务发生回滚, 当前线程: {}", thread.getName());
            dataSourceTransactionManager.rollback(status);
        } else {
            log.info("线程: {}, 开始悬停", thread.getName());
            LockSupport.park();

            if (isException.get()) {
                log.info("线程: {}, 开始回滚", thread.getName());
                dataSourceTransactionManager.rollback(status);
            } else {
                log.info("线程: {}, 开始提交", thread.getName());
                dataSourceTransactionManager.commit(status);
            }
        }
    }

    /**
     * 程序异常, 进行回滚, 线程唤醒
     *
     * @param unFinishedThread
     * @param totalThreadCount
     * @param status
     * @param isException
     */
    private void errorRollback(List<Thread> unFinishedThread, AtomicInteger totalThreadCount, TransactionStatus status, AtomicBoolean isException) {
        // 设定线程中存在异常信息
        isException.set(Boolean.TRUE);
        // 本线程回滚
        dataSourceTransactionManager.rollback(status);
        // 发生异常, 全部线程进行唤醒
        this.notifyAllThread(unFinishedThread, totalThreadCount, true);
        log.info("异常回滚, 开始全部线程唤醒, 当前线程数量: {}", unFinishedThread.size());
    }

    /**
     * 唤醒全部悬停的线程
     *
     * @param unFinishedThread 手动悬停的线程
     * @param totalThreadCount 全部开启的线程数
     * @param isForce          是否强行操作集合中全部线程
     */
    private void notifyAllThread(List<Thread> unFinishedThread, AtomicInteger totalThreadCount, boolean isForce) {
        if (isForce || unFinishedThread.size() == totalThreadCount.get()) {
            for (Thread thread : unFinishedThread) {
                LockSupport.unpark(thread);
                log.info("线程: [{}]被唤醒", thread.getName());
            }
        }
    }
}


效果图

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/124271.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

4、MYSQL常用函数(字符串函数)

目录 1、concat函数&#xff1a; 2、insert(str,x,y,instr)函数&#xff1a; 3、lower(str)和upper(str)函数&#xff1a; 4、left(str,x)和right(str,x)函数&#xff1a; 5、lpad(str,n,pad) 和rpad(str,n,pad) 函数&#xff1a; 6、ltrim(str)和rtrim(str)函数&#xff…

【大厂高频真题100题】《除自身以外数组的乘积》 真题练习第19题 持续更新~

除自身以外数组的乘积 给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请不要使用除法,且在 O(n) 时间复杂度内完成…

微软补丁包下载

Microsoft Update Cataloghttps://www.catalog.update.microsoft.com/Home.aspxMicrosoft Update CatalogMicrosoft Update Cataloghttps://www.catalog.update.microsoft.com/Home.aspx

城市通信管线资源管理解决方案

01 背景 随着新基建发展&#xff0c;智慧项目如火如荼&#xff0c;网络畅通、高速成为了刚需&#xff0c;城市通信网络管线资源重要性不言而喻&#xff0c;在实际项目中&#xff0c;我们也不难发现有关于“通信网络管线管理”“光缆资源管理”项目愈发增多&#xff0c;大多客户…

(五)JSP

一、JSP 概述 JSP&#xff08;全称&#xff1a;Java Server Pages&#xff09;&#xff1a;Java 服务端页面。是一种动态的网页技术&#xff0c;其中既可以定义HTML、JS、CSS等静态内容&#xff0c;还可以定义Java代码的动态内容&#xff0c;也就是 JSP HTML Java 。二、JSP …

基于注解的AOP之准备工作以及各种通知

目录 基于注解的AOP之准备工作以及各种通知 1. 技术说明 2. 准备工作 ①添加依赖 ②准备被代理的目标资源 3. 创建切面类并配置 在Spring的配置文件中配置&#xff1a; ​​​​​​4. 各种通知 各种通知的执行顺序&#xff1a; 基于注解的AOP之准备工作以及各种通知 …

C++【B树】【B+树】

文章目录一、什么是B树1.为什么要存在B树&#xff1f;2.B树的规则二、B树的插入三、B树的实现时间复杂度四、B树1.B树的分类过程五、B*树六、B树系列的应用1.MyISAM2.InnoDB一、什么是B树 相比于我们别的数据结构&#xff0c;我们的B树更加适合进行外查找 B树也可以进行内查找…

元启发式算法-模拟退火算法MATLAB实现

元启发式算法-模拟退火算法MATLAB实现 模拟退火介绍 模拟退火算法来源于固体退火原理&#xff0c;是一种基于概率的算法&#xff0c;将固体加温至充分高&#xff0c;再让其徐徐冷却&#xff0c;加温时&#xff0c;固体内部粒子随温升变为无序状&#xff0c;内能增大&#xff0…

SpringBoot系列教程之定义接口返回类型的几种方式

本文节选自 《Spring WEB专栏》 WEB系列】 定义接口返回类型的几种方式 实现一个 web 接口返回 json 数据&#xff0c;基本上是每一个 javaer 非常熟悉的事情了&#xff1b;那么问题来了&#xff0c;如果我有一个接口&#xff0c;除了希望返回 json 格式的数据之外&#xff0c…

Linux内核学习笔记——内核页表隔离KPTI机制

接前文。 一步一步理解CPU芯片漏洞&#xff1a;Meltdown与Spectre ARM系列之MMU TLB和ASID基础概念介绍。 一、Meltdown & Spectre 漏洞 Meltdown 和 Spectre 这两个漏洞厉害的地方就在于&#xff0c;利用现代CPU speculative execution (预测执行)的漏洞&#xff0c;在…

退役记——破铜烂铁的一生

写在前面 今天刚刚结束大三上的所有课程。我慢慢鼓起勇气去整理这段零碎的竞赛记忆&#xff0c;或许是最终也没拿到一个满意的奖项&#xff0c;来给我的竞赛生涯画上一个圆满的句号。 我该怎么回忆这破铜烂铁的一生&#xff0c;上万次尝试提交、数以千计的习题、上百次练习赛…

01、Java 数据结构:数据结构和算法的概述

数据结构和算法的概述1 参考教材2 数据结构2.1 数据的逻辑结构2.2 数据的存储结构2.3 数据的运算3 基本的数学概念的复习3.1 函数的定义3.2 极限3.3 对数4 算法4.1 算法的基本特性4.2 算法设计的要求4.3 时间复杂度和空间复杂度是衡量算法优劣的重要指标1 参考教材 主要参考的…

wy的leetcode刷题记录_Day62——二叉树结束

wy的leetcode刷题记录_Day62 声明 本文章的所有题目信息都来源于leetcode 如有侵权请联系我删掉! 时间&#xff1a;2022-12-27 前言 目录wy的leetcode刷题记录_Day62声明前言1750. 删除字符串两端相同字符后的最短长度题目介绍思路代码收获108. 将有序数组转换为二叉搜索树题…

各类遥测终端机RTU/水文遥测终端机简介

平升电子测遥测终端机RTU/水文遥测终端机基于4G、5G、NB-IoT、光纤、北斗三号卫星等通信网络&#xff0c;实现数据采集、存储、处理分析、传输&#xff0c;远程/自动控制现场泵、闸、阀等设备运行。它广泛应用于智慧水利领域的灌区信息化、水库安全监测、山洪灾害预警、水资源税…

泛型学习(java)

1.泛型的理解和好处 1.1看一个需求 1)请编写程序&#xff0c;在ArrayList中&#xff0c;添加3个Dog对象 Dog对象含有name和age,并输出name和age(要求使用getXxx()) 先使用传统的方法来解决->引出泛型 import java.util.ArrayList;public class Generic01 {public stati…

前端性能优化(一):指标和工具

目录 一&#xff1a;性能指标和优化目标 1.1.网络加载性能 1.2.用户交互体验 二&#xff1a;RAIL测量模型 2.1.Response&#xff08;响应&#xff09;: 处理事件应在在50ms内完成 2.2.Animation&#xff08;动画&#xff09;: 每10ms产生一帧 2.3.Idle&#xff08;空闲&…

SpringBoot — 初始创建项目小白教程

这里写目录标题前言SpringBoot简介重要策略Spring Boot 项目约定IntelliJ IDEA 直接创建Maven项目改造创建常见项目结构代码层资源文件结构主要文件说明SpringBootApplication 注解分析总结前言 使用 Servlet/JSP 开发 JavaWeb 时&#xff0c;一个接口对应一个Servlet&#xf…

mysql分区之RANGE类型

目录 首先查看MySQL是否支持分区 在实际操作分区前我们得了解下分区的几点限制&#xff1a; RANGE分区实操 SQL如何查询分区数据 首先查看MySQL是否支持分区 show plugins; 当查询结果显示partition的状态为active则表示当前MySQL版本支持分区。分区方案一般有四种&#…

CVPR 2017|SfMLearner:单目视频中深度和姿态估计的无监督算法

&#x1f3c6;作者提出了一个单目相机的视频序列进行深度估计与运动估计&#xff0c;作者的方法是完全无监督的&#xff0c;端到端的学习&#xff0c;作者使用了单视角深度网络和多姿态网络&#xff0c;提出了一个图像&#xff08;predict&#xff09;与真实的下一帧&#xff0…

09---Vue使用路由

由于之前数据、主页全部放在Home.vue中&#xff0c;不能够实现复用&#xff0c;于是&#xff0c;现在进行拆分&#xff0c;拆分出数据主体&#xff08;user.vue&#xff09;&#xff0c;侧边栏&#xff08;aside&#xff09;&#xff0c;顶部栏&#xff08;Header&#xff09;&…