CyclicBarrier实战应用——批量数据多线程协调异步处理(主线程执行事务回滚)

news2025/1/22 19:49:17
😊 @ 作者: 一恍过去
💖 @ 主页: https://blog.csdn.net/zhuocailing3390
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: CCyclicBarrier实战应用——批量数据多线程协调异步处理(主线程执行事务回滚)
⏱️ @ 创作时间: 2023年12月03日

在这里插入图片描述

目录

  • 前言
  • 1、概述
  • 2、方法说明:
  • 3、代码实例

前言

通过CyclicBarrierCountDownLatch配合开启多个子线程,由子线程完成数据的处理,最后由主线程进行数据库操作,由主线程进行事务的提交或者回滚;
如果需要由子线程处理完数据,并且由子线程进行事务提交或者回滚,参考:https://lhz1219.blog.csdn.net/article/details/134630794

1、概述

CyclicBarrier是一个同步器工具类,用来协调多个线程之间的同步,通过await()进行阻塞,直到所有的线程都执行await()后,所有的线程再继续执行。

2、方法说明:

  • public viod await() /int await(long timeout,TimeUnit unit) :使当前线程一直等待,除非线程被中断或超出了指定的等待时间。
    当线程会被阻塞,直到下面的情况之一发生才会返回:
    • 如果每执行一次await() 计数加一,直到达到初始值。
    • 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态。
    • 如果超出了指定的等待时间,则该方法根本不会再进行阻塞。

3、代码实例

有用到hutool的工具包,pom如下:

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.0.7</version>
        </dependency>

Controller:

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {

    @Resource
    private CyclicService cyclicService;
    
    /**
     * CyclicBarrier实现多线程(多个子线程)异步处理数据,再主线程回归处理
     *
     * @return
     */
    @GetMapping("/cyclic/handleData")
    public String countDownHandleData() {
        cyclicService.handleData();
        return "success";
    }

Sevice:

@Service
@Slf4j
public class CountDownService {
    @Resource
    private TestMapper testMapper;

    @Resource
    private ApplicationContext applicationContext;
    
   /**
     * 实现多线程(多个子线程)异步处理数据,再主线程回归处理
     */
    @Transactional(rollbackFor = Exception.class)
    public void handleData() {
        List<TestEntity> testList = getData();
        AtomicBoolean errorTag = new AtomicBoolean(false);
        long start = System.currentTimeMillis();
        // 使用多线程对list集合进行分批次处理,实际情况可以根据具体耗时来决定
        // 比如:一万条数据,每条单独处理需要200ms,按批次一个线程处理200条数据,分为50个批次,实际情况根据业务来定
        // 需要使用hutool工具类
        List<List<TestEntity>> splitList = CollUtil.split(testList, 200);
        // 设置CyclicBarrier大小,需要比实际子线程+1,业务主线程需要进行阻塞
        CyclicBarrier cyclicBarrier = new CyclicBarrier(splitList.size() + 1);
        // 简单创建一个线程池,这里的线程池可以自定义,为了方便直接使用
        ExecutorService executorService = Executors.newCachedThreadPool();
        splitList.forEach(list -> {
            // 线程处理
            executorService.execute(() -> {
                try {
                    for (TestEntity entity : list) {
                        if (errorTag.get()) {
                            break;
                        }

                        // 对实体类的业务处理,此处模拟业务处理,耗时50ms
                        ThreadUtil.sleep(50);

                        // 模拟数据处理中,出现了异常
                        if (entity.getCount().equals(2000)) {
                            throw new RuntimeException("子线程执行异常");
                        }
                    }
                } catch (Exception e) {
                    log.error("子线程异常:{}", e.getMessage(), e);
                    errorTag.set(true);
                } finally {
                    // 子线程中,业务处理完成后,利用cyclicBarrier的特性,计数器加一操作
                    try {
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        errorTag.set(true);
                    }
                }
                log.info("子线程执行完成");
            });
        });
        executorService.shutdown();
        try {
            // 主线程阻塞,直到子线程执行完成
            cyclicBarrier.await();
            // 可以设置最大阻塞时间,防止线程一直挂起,当子线程时间大于当前时间后会抛出TimeOut异常
            // cyclicBarrier.await(5, TimeUnit.SECONDS);

            // 模拟执行主线程业务逻辑耗时,比如insert、update等
            ThreadUtil.sleep(20);
        } catch (Exception e) {
            errorTag.set(true);
        }
        long end = System.currentTimeMillis();
        log.info("数据处理完成,耗时:{}", (end - start) / 1000);
        // 如果出现异常
        if (errorTag.get()) {
            throw new RuntimeException("异步业务执行出现异常");
        }
        log.info("主线程执行完成");
    }


    /**
     * 模拟解析的excel等文件的数据
     */
    private List<TestEntity> getData() {
        List<TestEntity> list = new ArrayList<>();
        // 此处模拟一万条数据
        for (int i = 1; i <= 10000; i++) {
            TestEntity entity = new TestEntity();
            entity.setId(new Random().nextInt(999999999));
            entity.setCount(i);
            entity.setCommodityCode("code-" + i);
            entity.setMoney(new Random().nextInt(1000000));
            entity.setUserId("user-" + i);

            list.add(entity);
        }
        return list;
    }
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1279731.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于抓取明文密码的探究

基础知识 SSP&#xff08;Security Support Provider&#xff09;是windows操作系统安全机制的提供者。简单的说&#xff0c;SSP就是DLL文件&#xff0c;主要用于windows操作系统的身份认证功能&#xff0c;例如NTLM、Kerberos、Negotiate、Secure Channel&#xff08;Schanne…

倒计时 1 天,2023 IoTDB 用户大会期待与您相见!

终于&#xff01;就在明天&#xff0c;2023 IoTDB 用户大会即将在北京与大家见面&#xff01; 这场筹备已久的盛会&#xff0c;汇集了超 20 位大咖嘉宾带来的精彩议题&#xff0c;届时来自美国国家工程院、清华大学软件学院的产业大拿&#xff0c;与能源电力、钢铁冶炼、城轨运…

数据结构:堆的实现思路

我们之前写过堆的实现代码&#xff1a;数据结构&#xff1a;堆的实现-CSDN博客 这篇文章我们了解一下堆到底是如何实现的 1.堆向下调整算法 现在我们给出一个数组&#xff0c;逻辑上看做一颗完全二叉树。我们通过从根节点开始的向下调整算法可以把它调整成一个小堆 向下调…

CyclicBarrier实战应用——批量数据多线程协调异步处理(子线程执行事务回滚)

&#x1f60a; 作者&#xff1a; 一恍过去 &#x1f496; 主页&#xff1a; https://blog.csdn.net/zhuocailing3390 &#x1f38a; 社区&#xff1a; Java技术栈交流 &#x1f389; 主题&#xff1a; CountDownLatch实战应用——批量数据多线程协调异步处理(子线程执行事务…

时序预测 | Python实现LSTM长短期记忆神经网络时间序列预测(多图,多指标)

时序预测 | Python实现LSTM长短期记忆神经网络时间序列预测(多图,多指标) 目录 时序预测 | Python实现LSTM长短期记忆神经网络时间序列预测(多图,多指标)预测效果基本介绍环境准备程序设计参考资料预测效果 基本介绍 LSTM是一种递归神经网络(RNN)的变体

JVM之基本概念(一)

(1) 基本概念&#xff1a; JVM 是可运行 Java 代码的假想计算机 &#xff0c;包括一套字节码指令集、一组寄存器、一个栈、一个垃圾回收&#xff0c;堆 和 一个存储方法域。JVM 是运行在操作系统之上的&#xff0c;它与硬件没有直接的交互。 (2) 运行过程&#xff1a; 我们都…

2 文本分类入门:TextCNN

论文链接&#xff1a;https://arxiv.org/pdf/1408.5882.pdf TextCNN 是一种用于文本分类的卷积神经网络模型。它在卷积神经网络的基础上进行了一些修改&#xff0c;以适应文本数据的特点。 TextCNN 的主要思想是使用一维卷积层来提取文本中的局部特征&#xff0c;并通过池化操…

使用Python免费调用通义千问大模型

Qwen-72b开源模型 模型的主要用途是预测或描述一个系统或现象的行为模式。它可以帮助人们更好地理解这个系统或现象&#xff0c;例如预测股市变化、天气预报、地震预警、交通流量等。模型也常用于设计和优化产品和工艺。在科学研究中&#xff0c;模型也是一种方法&#xff0c;用…

Stm32F401RCT6内部FLASH数据擦除读写方法

Stm32F401RCT6内部FLASH数据的分区和F103的已经不一样了&#xff0c;读写格式化的方法网上内容不多&#xff0c;自己摸索了一下&#xff0c;基本可以&#xff0c;还存在一个问题 读取&#xff1a; uint16_t f[5];uint8_t tx[10];f[0] *(volatile uint16_t*)0x08020000; //ST…

同旺科技 USB TO SPI / I2C --- 调试W5500_读写网关地址

所需设备&#xff1a; 内附链接 1、USB转SPI_I2C适配器(专业版); 首先&#xff0c;连接W5500模块与同旺科技USB TO SPI / I2C适配器&#xff0c;如下图&#xff1a; 这里的网关地址设置为192.168.1.1 先将网关地址写入寄存器&#xff0c;然后再读取出来&#xff1a;

【SpringBoot】讲清楚日志文件lombok

文章目录 前言一、日志是什么&#xff1f;二、⽇志怎么⽤&#xff1f;三.自定义打印日志3.1在程序中得到日志对象3.2使用日志打印对象 四.⽇志级别4.1日志级别有什么用4.2 ⽇志级别的分类与使⽤ 五.日志持久化六.lombok6.1添加lobok依赖注意&#xff1a;使⽤ Slf4j 注解&#x…

Linux命令与shell脚本编程大全【读书笔记 + 思考总结】

Linux命令与shell脚本编程大全 第 1 章 初识Linux shellLinux的组成及关系结构图是什么&#xff1f;Linux系统内核的作用是什么&#xff1f;内核的主要功能是什么&#xff1f;&#xff08;4点&#xff09;物理内存和虚拟内存是什么关系&#xff1f;内核如何实现虚拟内存&#x…

idea不需安装插件,自动生成mybatis-plus对应的实体类entity,带注解@TableName、@TableId、@TableField

目录 1、修改Generate poJOs.groovy文件 2、idea中连接数据库 3、生成entity代码 4、查看生成的实体类 1、修改Generate poJOs.groovy文件 在项目下方点击Scratches and Consoles→ Extensions→ Database Tools and SQL箭头→schema→ Generate POJOs.groovy 替换为以下文…

ssl下载根证书和中间证书

为了保证客户端和服务端通过HTTPS成功通信&#xff0c;您在安装SSL证书时&#xff0c;也需要安装根证书和中间证书。本文介绍如何获取根证书和中间证书。 使用说明 如果您的业务用户通过浏览器访问您的Web业务&#xff0c;则您无需关注根证书和中间证书&#xff0c;因为根证书…

如何学习 Spring ?学习 Spring 前要学习什么?

整理了一下Spring的核心概念BeanDefinitionBeanDefinition表示Bean定义&#xff0c;BeanDefinition中存在很多属性用来描述一个Bean的特点。比如&#xff1a;class&#xff0c;表示Bean类型scope&#xff0c;表示Bean作用域&#xff0c;单例或原型等lazyInit&#xff1a;表示Be…

PyQt6 QDialogButtonBox组合按钮控件

锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计34条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话版…

【开源威胁情报挖掘3】开源威胁情报融合评价

基于开源信息平台的威胁情报挖掘综述 写在最前面5. 开源威胁情报关联分析5.1 开源威胁情报网络狩猎&#xff1a;技术、方法和最新研究应用实例和未来方向 5.2 开源威胁情报态势感知关键技术和方法应用实例和未来方向 5.3 开源威胁情报恶意检测关键技术和方法应用实例和未来方向…

【PTA题目】7-18 6翻了 分数 15

7-18 6翻了 分数 15 全屏浏览题目 切换布局 作者 陈越 单位 浙江大学 “666”是一种网络用语&#xff0c;大概是表示某人很厉害、我们很佩服的意思。最近又衍生出另一个数字“9”&#xff0c;意思是“6翻了”&#xff0c;实在太厉害的意思。如果你以为这就是厉害的最高境界&…

Mindspore实现手写数字识别

废话不多说&#xff0c;首先说一下我使用的环境&#xff1a; python3.9 mindspore 2.1 使用jupyter notebook Step1&#xff1a;导入相关依赖的包 import os from matplotlib import pyplot as plt import numpy as np import mindspore as ms import mindspore.context a…

【SpringMVC】Spring Web MVC入门(一)

文章目录 前言什么是Spring Web MVC&#xff1f;什么是MVC什么是Spring MVC&#xff1f; Spring Boot 和 Spring MVC 的区别什么是Spring Boot&#xff1f;关系和区别 Spring MVC 学习注解介绍1. SpringBootApplication2. RestController3. RequestMapping3.1 RequestMapping 使…