CountDownLatch实战应用——批量数据多线程协调异步处理(主线程执行事务回滚)

news2024/11/27 16:32:40
😊 @ 作者: 一恍过去
💖 @ 主页: https://blog.csdn.net/zhuocailing3390
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: CountDownLatch实战应用——批量数据多线程协调异步处理(主线程执行事务回滚)
⏱️ @ 创作时间: 2023年11月26日

在这里插入图片描述

目录

  • 前言
  • 1、概述
  • 2、实现
  • 3、方法说明:
  • 4、代码实例

前言

通过CountDownLatch开启多个子线程,由子线程完成数据的处理,最后由主线程进行数据库操作,由主线程进行事务的提交或者回滚;
如果需要由子线程处理完数据,并且由子线程进行事务提交或者回滚,参考:https://lhz1219.blog.csdn.net/article/details/134630509

1、概述

CountDownLatch是一个同步器工具类,用来协调多个线程之间的同步,能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行,不可重置使用。

2、实现

使用一个计数器进行实现,计数器初始值为线程的数量,当每一个线程完成自己任务后,计数器的值就会减一,当计数器的值为0时,在CountDownLatch上等待的线程就可以恢复执行接下来的任务。

3、方法说明:

  • public void countDown():递减锁存器的计数,如果计数到达零,则释放所有等待的线程。如果当前计数大于零,则将计数减少.
  • public viod await() /boolean await(long timeout,TimeUnit unit) :使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。如果当前计数为零,则此方法立刻返回true值。当线程调用了CountDownLatch对象的该方法后,当前线程会被阻塞,直到下面的情况之一发生才会返回:
    • 如果计数到达零,则该方法返回true值。
    • 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态。
    • 如果超出了指定的等待时间,则返回值为false。如果该时间小于等于零,则该方法根本不会等待。参数:timeout-要等待的最长时间、unit-timeout 参数的时间单位

4、代码实例

有用到hutool的工具包,pom如下:

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.0.7</version>
        </dependency>

Controller:

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {

    @Resource
    private CountDownService countDownService;
    
    /**
     * CountDownLatch实现多线程(多个子线程)异步处理数据,再主线程回归处理
     *
     * @return
     */
    @ApiOperation(value = "测试CountDownLatch", notes = "测试")
    @GetMapping("/countDown/handleData")
    public String countDownHandleData() {
        countDownService.handleData();
        return "success";
    }

Sevice:

@Service
@Slf4j
public class CountDownService {
    @Resource
    private TestMapper testMapper;

    @Resource
    private ApplicationContext applicationContext;

    /**
     * 实现多线程(多个子线程)异步处理数据,再主线程回归处理
     */
    @Transactional(rollbackFor = Exception.class)
    public void handleData() {
        List<TestEntity> testList = getData();
        AtomicBoolean errorTag = new AtomicBoolean(false);
        long start = System.currentTimeMillis();
        // 使用多线程对list集合进行分批次处理,实际情况可以根据具体耗时来决定
        // 比如:一万条数据,每条单独处理需要200ms,按批次一个线程处理200条数据,分为50个批次,实际情况根据业务来定
        // 需要使用hutool工具类
        List<List<TestEntity>> splitList = CollUtil.split(testList, 200);
        // 设置countDown大小
        CountDownLatch countDownLatch = new CountDownLatch(splitList.size());
        // 简单创建一个线程池,这里的线程池可以自定义,为了方便直接使用
        ExecutorService executorService = Executors.newCachedThreadPool();
        splitList.forEach(list -> {
            // 线程处理
            executorService.execute(() -> {
                try {
                    for (TestEntity entity : list) {
                        if (errorTag.get()) {
                            break;
                        }

                        // 对实体类的业务处理,此处模拟业务处理,耗时50ms
                        ThreadUtil.sleep(50);

                        // 模拟数据处理中,出现了异常
                        if (entity.getCount().equals(2000)) {
                            throw new RuntimeException("子线程执行异常");
                        }
                    }
                } catch (Exception e) {
                    log.error("子线程异常:{}", e.getMessage(), e);
                    errorTag.set(true);
                } finally {
                    // 子线程中,业务处理完成后,利用countDown的特性,计数器减一操作
                    countDownLatch.countDown();
                }

                log.info("子线程执行完成");
            });
        });
        executorService.shutdown();
        try {
            // 主线程阻塞
            countDownLatch.await();
            // 可以设置最大阻塞时间,防止线程一直挂起
          /*  boolean await = countDownLatch.await(5, TimeUnit.SECONDS);
            if (!await) {
                // 超过时间子线程都还没有结束,直接都回滚
                errorTag.set(true);
            }*/

            // 模拟执行主线程业务逻辑,比如insert、update等
            ThreadUtil.sleep(20);
        } catch (Exception e) {
            errorTag.set(true);
        }
        long end = System.currentTimeMillis();
        log.info("数据处理完成,耗时:{}", (end - start) / 1000);
        // 如果出现异常
        if (errorTag.get()) {
            throw new RuntimeException("异步业务执行出现异常");
        }
        log.info("主线程执行完成");
    }


    /**
     * 模拟解析的excel等文件的数据
     */
    private List<TestEntity> getData() {
        List<TestEntity> list = new ArrayList<>();
        // 此处模拟一万条数据
        for (int i = 1; i <= 10000; i++) {
            TestEntity entity = new TestEntity();
            entity.setId(new Random().nextInt(999999999));
            entity.setCount(i);
            entity.setCommodityCode("code-" + i);
            entity.setMoney(new Random().nextInt(1000000));
            entity.setUserId("user-" + i);

            list.add(entity);
        }
        return list;
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1257196.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MIT 6.824 -- MapReduce Lab

MIT 6.824 -- MapReduce Lab 环境准备实验背景实验要求测试说明流程说明 实验实现GoLand 配置代码实现对象介绍协调器启动工作线程启动Map阶段分配任务执行任务 Reduce 阶段分配任务执行任务 终止阶段 崩溃恢复 注意事项并发安全文件转换golang 知识点 测试 环境准备 从官方gi…

nginx配置文件的简单结构

nginx的配置文件&#xff08;nginx.conf&#xff09;整体上可分为三个部分&#xff1a;全局块、events块、http块 区域职责全局块配置和nginx运行相关的全局配置events块配置和网络连接相关的配置http块配置代理、缓存、日志记录、虚拟主机等配置在http块中&#xff0c;可以包含…

Linux的基本指令(四)

目录 前言 时间相关的指令 date指令 时间戳 日志 时间戳转化为具体的时间 cal指令 find指令&#xff08;十分重要&#xff09; grep指令&#xff08;行文本过滤工具&#xff09; 学前补充 什么是打包和压缩&#xff1f; 为什么要打包和压缩&#xff1f; 怎么打包和…

【洛谷算法题】P5715-三位数排序【入门2分支结构】

&#x1f468;‍&#x1f4bb;博客主页&#xff1a;花无缺 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 花无缺 原创 收录于专栏 【洛谷算法题】 文章目录 【洛谷算法题】P5715-三位数排序【入门2分支结构】&#x1f30f;题目描述&#x1f30f;输入格式…

基于OGG实现MySQL实时同步

&#x1f4e2;&#x1f4e2;&#x1f4e2;&#x1f4e3;&#x1f4e3;&#x1f4e3; 哈喽&#xff01;大家好&#xff0c;我是【IT邦德】&#xff0c;江湖人称jeames007&#xff0c;10余年DBA及大数据工作经验 一位上进心十足的【大数据领域博主】&#xff01;&#x1f61c;&am…

常见树种(贵州省):020女贞、异叶梁王茶、掌叶梁王茶、鹅掌柴、楤木、柞木、华重阳木、马蹄荷、山桐子、刺楸

摘要&#xff1a;本专栏树种介绍图片来源于PPBC中国植物图像库&#xff08;下附网址&#xff09;&#xff0c;本文整理仅做交流学习使用&#xff0c;同时便于查找&#xff0c;如有侵权请联系删除。 图片网址&#xff1a;PPBC中国植物图像库——最大的植物分类图片库 一、女贞 …

我的创作纪念日-五周年

机缘 5年前&#xff0c;作为一名技术人员&#xff0c;平时利用CSDN作为学习平台工具&#xff0c;帮助解决工作中遇到的问题。随着30、35中年危机渐行渐近&#xff0c;回过头来发现平时虽然也有记录整理学习笔记的习惯&#xff0c;但还没有一个可以持续鞭笞自己和记录自己学习的…

C#,数值计算——插值和外推,RBF_fn 与 RBF_gauss 的计算方法与源程序

1 文本格式 using System; namespace Legalsoft.Truffer { public interface RBF_fn { double rbf(double r); } } ---------------------------------------------- using System; namespace Legalsoft.Truffer { public class RBF_gauss : RBF…

transformers pipeline出现ConnectionResetError的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

上海亚商投顾:北证50指数大涨 逾百只北交所个股涨超10%

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 沪指11月24日震荡调整&#xff0c;深成指、创业板指盘中跌超1%。北证50指数大涨超6%&#xff0c;北交所个股持…

[架构之路-253]:目标系统 - 设计方法 - 软件工程 - 软件设计 - 结构化设计的主要评估指标:高内聚(模块内部)、低耦合(模块之间)的含义

目录 前言&#xff1a; 一、软件工程中的软件设计种类&#xff1a;根据宏观到微观分 &#xff08;1&#xff09;软件架构设计&#xff08;层次划分、模块划分、职责分工&#xff09;&#xff1a; &#xff08;2&#xff09;软件高层设计、概要设计&#xff08;功能模块的接…

[设计模式] 常见的设计模式

文章目录 设计模式的 6 大设计原则设计模式的三大分类常见的设计模式有哪几种1. 单例模式&#xff1a;保证一个类仅有一个实例&#xff0c;并提供一个访问它的全局访问点。&#xff08;连接池&#xff09;1. 饿汉式2. 懒汉式3. 双重检测 2. 工厂模式3. 观察者模式● 推模型● 拉…

Windows上常用的dos命令

cd更改目录 cd c:\users从其他目录改成c:\users。 type显示文件内容 type good.txt显示文件good.txt里边的内容。 del删除文件 del good.txt删除文件。

(C++)string类的模拟实现

愿所有美好如期而遇 前言 我们模拟实现string类不是为了去实现他&#xff0c;而是为了了解他内部成员函数的一些运行原理和时间复杂度&#xff0c;在将来我们使用时能够合理地去使用他们。 为了避免我们模拟实现的string类与全局上的string类冲突(string类也在std命名空间中)&…

Redis序列化操作

目录 1.protostuff 的 Maven 依赖 2.定义实体类 3.序列化工具类 ProtostuffSerializer 提供了序列化和反序列化方法 4.测试 利用 Jedis 提供的字节数组参数方法&#xff0c;如&#xff1a; public String set(String key, String value) public String set(byte[] key…

【Springboot系列】SpringBoot整合WebSocket,既然如此简单(含源码)

文章目录 前言&#xff1a;什么是WebSocket&#xff1f;Spring Boot中的WebSocket支持WebSocket和HTTP优劣势WebSocket的优势&#xff1a;1.实时性&#xff1a;2.较低的延迟&#xff1a;3.较小的数据传输量&#xff1a;4.更好的兼容性&#xff1a; HTTP的优势&#xff1a;1.简单…

HDMI接口信号流向及原理图分析

1、HDMI的来源及发展 如今显示器上最常用的接口无非HDMI&#xff08;High Definition Multimedia Interface&#xff09;与DP&#xff08;DisplayPort&#xff09;两种&#xff0c;VGA与DVI已经很少使用&#xff0c;原因在于VGA传输的是模拟信号&#xff0c;在发送端需要将数字…

如果客户端同时有ipv4和ipv6,浏览器是如何选择用哪种ip

在互联网协议&#xff08;IP&#xff09;的发展历程中&#xff0c;IPv4和IPv6是两种主要的版本。对于一个客户端来说&#xff0c;同时拥有IPv4和IPv6的能力是常见的情况。那么&#xff0c;当一个客户端同时具有IPv4和IPv6的能力时&#xff0c;浏览器是如何选择使用哪种IP进行通…

数组栈的实现

1.栈的概念及结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作 进行数据插入和删除操作的一端称为栈顶&#xff0c;另一端称为栈底 栈中的数据元素遵守后进先出LIFO,&#xff08;Last In First Out&#xff09;的原则 压栈&…

Redis-Redis多级缓存架构

多级缓存架构 缓存设计 缓存穿透 缓存穿透是指查询一个根本不存在的数据&#xff0c; 缓存层和存储层都不会命中&#xff0c; 通常出于容错的考虑&#xff0c; 如果从存储层查不到数据则不写入缓存层。 缓存穿透将导致不存在的数据每次请求都要到存储层去查询&#xff0c; 失…