CountDownLatch实战应用——批量数据多线程协调异步处理(子线程执行事务回滚)

news2024/11/26 2:27:15
😊 @ 作者: 一恍过去
💖 @ 主页: https://blog.csdn.net/zhuocailing3390
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: CountDownLatch实战应用——批量数据多线程协调异步处理(子线程执行事务回滚)
⏱️ @ 创作时间: 2023年11月26日

在这里插入图片描述

目录

  • 前言
  • 1、概述
  • 2、实现
  • 3、方法说明:
  • 4、代码实例

前言

通过CountDownLatch开启多个子线程,由子线程完成数据的处理,子线程完成数据处理后进行等待,直到所有的子线程完成数据处理后,再判断是否进行回滚,如果需要回滚则所有线程执行回滚操作

如果需要由子线程处理完数据,但是由主线程进行事务提交或者回滚,参考:https://lhz1219.blog.csdn.net/article/details/134630258

1、概述

CountDownLatch是一个同步器工具类,用来协调多个线程之间的同步,能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行,不可重置使用。

2、实现

使用一个计数器进行实现,计数器初始值为线程的数量,当每一个线程完成自己任务后,计数器的值就会减一,当计数器的值为0时,在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

3、方法说明:

  • public void countDown():递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少.
  • public viod await() /boolean await(long timeout,TimeUnit unit) :使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回true值。当线程调用了CountDownLatch对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:
    • 如果计数到达零,则该方法返回true值。
    • 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态。
    • 如果超出了指定的等待时间,则返回值为false。如果该时间小于等于零,则该方法根本不会等待。参数:timeout-要等待的最长时间、unit-timeout 参数的时间单位

4、代码实例

有用到hutool的工具包,pom如下:

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.0.7</version>
        </dependency>

Controller:

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {

    @Resource
    private CountDownService countDownService;
   

    /**
     * CountDownLatch实现多线程(多个子线程)异步事务处理数据,全部子线程回滚数据
     *
     * @return
     */
    @ApiOperation(value = "测试CountDownLatch", notes = "测试")
    @ApiOperationSupport(order = 5)
    @GetMapping("/countDown/handleDataSonBack")
    public String handleDataSonBack() {
        countDownService.handleDataSonBack();
        return "success";
    }

Sevice:

@Service
@Slf4j
public class CountDownService {
    @Resource
    private TestMapper testMapper;

    @Resource
    private ApplicationContext applicationContext;
    
	/**
     * CountDownLatch实现多线程(多个子线程)异步事务处理数据,全部子线程回滚数据
     *
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public void handleDataSonBack() {
        List<TestEntity> testList = getData();
        AtomicBoolean errorTag = new AtomicBoolean(false);
        long start = System.currentTimeMillis();
        // 使用多线程对list集合进行分批次处理,实际情况可以根据具体耗时来决定
        // 比如:一万条数据,每条单独处理需要50ms,按批次一个线程处理200条数据,分为50个批次,实际情况根据业务来定
        // 需要使用hutool工具类进行分组
        // 由于开启了事务回滚,异步的线程数量要小于,dataSource中配置的maximum-pool-size数量
        List<List<TestEntity>> splitList = CollUtil.split(testList, 200);
        // 设置countDown大小
        CountDownLatch countDownLatch = new CountDownLatch(splitList.size());

        // 再创建一个CountDownLatch,大小固定为一,用于子线程相互等待,最后确定是否回滚
        CountDownLatch errorCountDown = new CountDownLatch(1);

        // 异步调用其他Service,执行业务处理
        CountDownService bean = applicationContext.getBean(CountDownService.class);
        // 简单创建一个线程池,这里的线程池可以自定义,为了方便直接使用
        ExecutorService executorService = Executors.newCachedThreadPool();
        splitList.forEach(list -> {
            // 线程处理
            executorService.execute(() -> {
                bean.handleDataAsyncSonBack(list, countDownLatch, errorCountDown, errorTag);
            });
        });

        executorService.shutdown();
        try {
            // 主线程阻塞
            countDownLatch.await();
            // 可以设置最大阻塞时间,防止线程一直挂起
            /*boolean await = countDownLatch.await(1, TimeUnit.SECONDS);
            if (!await) {
                // 超过时间子线程都还没有结束,直接都回滚
                errorTag.set(true);
            }*/
            log.info("继续执行主线程");

            // 继续执行后续的操作,比如insert、update等
            TestEntity entity = new TestEntity();
            entity.setId(new Random().nextInt(999999999));
            entity.setCount(1);
            entity.setCommodityCode("handleTestMain");
            entity.setMoney(new Random().nextInt(1000000));
            entity.setUserId("user-handleTestMain");
            testMapper.insert(entity);

        } catch (Exception e) {
            log.error("主线程业务执行异常");
            errorTag.set(true);
        } finally {
            // 主线程业务执行完成后,执行errorCountDown计时器减一,使得所有阻塞的子线程,继续执行进入到异常判断中
            errorCountDown.countDown();
        }

        long end = System.currentTimeMillis();
        log.info("数据处理完成,耗时:{}", (end - start) / 1000);
        // 如果出现异常
        if (errorTag.get()) {
            throw new RuntimeException("异步业务执行出现异常");
        }
        log.info("主线程执行完成");
    }

    /**
     * 子线程异步处理,并且实现回滚
     * 由于开启了事务回滚,异步的线程数量要小于,dataSource中配置的maximum-pool-size数量
     */
    @Transactional(rollbackFor = Exception.class)
    public void handleDataAsyncSonBack(List<TestEntity> list, CountDownLatch countDownLatch, CountDownLatch errorCountDown, AtomicBoolean errorTag) {
        try {
            log.info("开始执行子线程");
            for (TestEntity entity : list) {
                if (errorTag.get()) {
                    break;
                }

                // 对实体类的业务处理,此处模拟业务处理,耗时50ms
                ThreadUtil.sleep(50);
                // 数据库查询操作
                testMapper.insert(entity);

                // 模拟数据处理中,出现了异常
                if (entity.getCount().equals(2000)) {
                    throw new RuntimeException("子线程执行异常");
                }
            }
        } catch (Exception e) {
            log.error("子线程异常:{}", e.getMessage(), e);
            errorTag.set(true);
        } finally {
            // 子线程中,业务处理完成后,利用countDown的特性,计数器减一操作
            countDownLatch.countDown();
        }

        log.info("handleDataAsyncSonBack-业务处理完成从,等待其他子线程");
        // 子阻塞,直到其他子线程完成操作
        try {
            errorCountDown.await();
        } catch (Exception e) {
            errorTag.set(true);
        }
        log.info("handleDataAsyncSonBack-子线程执行完成");
        if (errorTag.get()) {
            // 抛出异常,回滚数据
            throw new RuntimeException("handleDataAsyncSonBack-子线程业务执行异常");
        }
    }

    /**
     * 模拟解析的excel等文件的数据
     */
    private List<TestEntity> getData() {
        List<TestEntity> list = new ArrayList<>();
        // 此处模拟一万条数据
        for (int i = 1; i <= 10000; i++) {
            TestEntity entity = new TestEntity();
            entity.setId(new Random().nextInt(999999999));
            entity.setCount(i);
            entity.setCommodityCode("code-" + i);
            entity.setMoney(new Random().nextInt(1000000));
            entity.setUserId("user-" + i);

            list.add(entity);
        }
        return list;
    }
}

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1255273.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据库基础教程之数据库的创建(二)

双击打开Navicat,点击:文件-》新建连接-》PostgreSQL 在下图新建连接中输入各参数,然后点击:连接测试,连接成功后再点击确定。 创建数据表   3.1 方法1   3.1.1.双击你的数据库-》双击public-》双击选中表-》右键-》新建表-》常规 3.1.2.设置字段信息   双击选中创建…

【李宏毅-元学习】

一、基本概念 1、元学习&#xff1a;学习如何学习&#xff0c;超参数调整 2、机器学习和元学习 机器学习&#xff1a;定义函数&#xff08;未知参数&#xff09;-定义损失函数-优化&#xff08;最小化损失函数&#xff09; 3、什么是元学习 机器学习通过三个步骤找到了学习算…

建造者模式-C语言实现

UML类图&#xff1a; 代码实现&#xff1a; #include <stdio.h> #include <stdlib.h>// 产品类 typedef struct {char* part1;char* part2;char* part3; } Product;// 抽象建造者类 typedef struct {void (*buildPart1)(void*, const char*);void (*buildPart2)(v…

Linux进程通信——信号(二)

信号处理函数的注册 信号处理函数的注册不只一种方法&#xff0c;分为入门版和高级版 1.入门版: 函数 signal 2.高级版:函数 sigection 信号处理发送函数 信号发送函数也不止一个&#xff0c;同样分为入门版和高级版 1.入门版: 函数 kill 2.高级版: 函数 sigqueue sigactio…

图解系列--Http

1.URI和URL 1.1.URL URL是统一资源定位符。URL正是使用 Web 浏览器等访问 Web 页面时需要输入的网页地址。比如&#xff0c;http://hackr.jp/就是 URL。 1.2.URI 统一资源标识符。 URI 用字符串标识某一互联网资源&#xff0c;而URL表示资源的地点&#xff08;互联网上所处的位…

Unity 自带的一些可以操控时间的属性或方法。

今天来总结下Unity自带的一些可以操控时间的方法。 1、Time.time。比较常用计算运行时间而触发特定事件。 public class Controller : MonoBehaviour {public float eventTime 5f; // 触发事件的时间private float startTime; // 游戏开始的时间private void Start(){startT…

使用凌鲨进行接口联调

接口联调是指在软件开发过程中&#xff0c;不同的团队或模块之间进行接口协作的一种技术手段。它是研发过程中必不可少的一个环节&#xff0c;旨在确保不同模块之间的数据交互和功能调用能够顺畅进行&#xff0c;从而提升整个系统的稳定性和性能。 凌鲨中支持了GRPC&#xff0…

Sentry介绍与使用 - Issues模块

这篇文章是我在公司做 Sentry 相关分享的演讲稿。 大家好&#xff0c;现在由我来讲解 Sentry 的 Issues &#xff08;问题&#xff09;模块。我会分为三个部分来讲&#xff0c;首先我会介绍 Sentry 一些重要的概念&#xff0c;然后讲一下 Issues 的基本使用方式&#xff0c;最后…

【Java】线程池的简单实用

1、什么是线程池 Java当中&#xff0c;为了规避频繁创建调度进程的开销&#xff0c;我们引入了线程。但是如果进一步提高创建销毁频率&#xff0c;线程的开销也不容忽视。 对此我们有两个解决方案 协程&#xff08;轻量级线程&#xff09;&#xff1a;相比线程&#xff0c;把…

Cache学习(4):Cache分配策略Cache更新策略Cache逐出策略

Cache的数据流 常用名词 Allocation 分配Eviction 驱逐分配策略和更新策略分别为当产生Cache miss和Cache hit的时候数据流的具体行为 1 Cache分配策略&#xff08;Cache Allocation Policy&#xff09; Cache的分配策略是指不同情况下为数据分配Cache Line的不同行为。Cac…

2018年8月28日 Go生态洞察:Go 2草案设计初探

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

基于springboot+mysql实现的小区物业管理系统

基于springbootmysql实现的小区物业管理系统,演示地址:登录 演示账号&#xff1a;用户名:744621980qq.com 密码:123456,主要包含房屋管理(楼栋管理&#xff0c;单元管理&#xff0c;房屋管理)&#xff0c;车位管理&#xff0c;缴费管理&#xff0c;社区服务( 公告管理&#xf…

2023年程序设计迎新赛(第二届个人程序设计大赛)

7-1 找规律 请从所给的四个选项中&#xff0c;选择最合适的一个填入问号处&#xff0c;使之呈现一定的规律性。 输入格式: 无 输出格式: 大写字母 输入样例: 输出样例: #include<stdio.h> int main(){printf("D");return 0; }7-2 蜡烛燃烧时间 有粗细不同…

使用HTML+CSS+JS网页设计与制作,酷炫动效科技农业网页

使用HTMLCSSJS网页设计与制作&#xff0c;酷炫动效科技农业网页。 可以用于家乡介绍、科技农业、图片画廊展示等个人网站的设计与制作。农业网站、家乡网站、农产品网站、旅游网站。 网站亮点 1、视觉设计&#xff1a;排版布局极简设计&#xff0c;优质的视觉体验等。 2、动…

java小游戏之【王者荣耀】

首先创建一个新的Java项目命名为“王者荣耀”&#xff0c;并在src下创建两个包分别命名为“com.sxt"、”com.stx.beast",在相应的包中创建所需的类。 代码 package com.sxt;import javax.swing.*; import java.awt.*;public class Background extends GameObject {p…

以非常规思维去做一个嵌入式音视频开发项目!

前言&#xff1a; 大家好&#xff0c;在上周的文章里面&#xff0c;给大家介绍了一个音视频项目&#xff0c;本周继续来分享音视频项目&#xff0c;之前说过&#xff0c;如果你不知道做什么功能开发嘛&#xff0c;因为接触的少&#xff1b;我突然想到&#xff0c;可以去参考市面…

win10下载Remix IDE桌面版以及空白页面的解决

文章目录 Remix IDE 的下载Remix IDE 空白页面的解决 Remix IDE 的下载 到 github 地址 https://github.com/ethereum/remix-desktop/releases 选择exe文件或根据自己电脑版本选择对应的zip文件进行下载&#xff0c;然后正常安装即可。 Remix IDE 空白页面的解决 有时打开Remix…

容器技术——Cgroup

目录 容器技术容器技术概述要区分好共享与隔离的概念容器技术的三大核心容器对比虚拟机 namespaceUnionFs容器操作系统的来源操作系统的来源完整操作系统的镜像docker image是什么&#xff1f;如何构成的 如何为容器安装操作系统UnionFS&#xff08;联合文件系统&#xff09;的…

Echart力引导依赖关系布局图

Echarts ECharts&#xff08;Enterprise Charts&#xff09;Apache ECharts是百度开发的一款开源的 JavaScript 数据可视化库。它提供了丰富的图表和图形&#xff0c;适用于在 Web 应用程序中创建各种交互式和动态的数据可视化图表。ECharts支持各种图表类型&#xff0c;包括折…

【LeetCode】挑战100天 Day17(热题+面试经典150题)

【LeetCode】挑战100天 Day17&#xff08;热题面试经典150题&#xff09; 一、LeetCode介绍二、LeetCode 热题 HOT 100-192.1 题目2.2 题解 三、面试经典 150 题-193.1 题目3.2 题解 一、LeetCode介绍 LeetCode是一个在线编程网站&#xff0c;提供各种算法和数据结构的题目&…