CyclicBarrier 多线程处理数据

news2024/11/26 20:48:24

文章目录

    • 前言
    • 需求
    • 环境准备
    • 单线程处理
    • 多线程处理
    • 总结

前言

开发中,我们经常会遇到处理批量数据,最后把处理成功和失败的数据结果记录下来。普通方法一个循环就可以搞定这个需求,但是面临大量数据单个线程去处理可能面临很大的瓶颈,同时也无法最大发挥CPU的性能。这时候你可能会说:多线程我也没用过 天天工作CRUD,我只会个 hello world。接下来我们模拟一个需求,看看并发编程中有那些需要注意的点,相信看完这篇文章你一定有所收获👊👊

需求

模拟2001条数据,对每条数据进行处理,并记录最后处理成功和失败的结果

环境准备

@Data
public class Person {
    // id
    private int id;
    // 性别
    private String gender;
    // 名称
    private String name;
}
/**
* 模拟导入数据库
* @param person 处理的对象
* @return 是否成功
*/
private boolean importData(Person person) {
    // 模拟处理耗时,20-29ms随机数
    int expend = (int) (Math.random() * 10) + 20;
    try {
        TimeUnit.MILLISECONDS.sleep(expend);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    // 性别为女的处理成功
    if ("女".equals(person.getGender())) {
        return true;
    }
    return false;
}
/**
* 获取初始化数据
*/
private List<Person> initData() {
    List<Person> list = new ArrayList<>();
    for (int i = 1; i <= 2001; i++) {
        Person obj = new Person();
        obj.setId(i);
        obj.setGender(i % 2 == 0 ? "男" : "女");
        obj.setName("老王-" + i);
        list.add(obj);
    }
    return list;
}

单线程处理

@Test
public void main() {
    List<Person> personList = initData();
    long startTime = System.currentTimeMillis();

    List<Person> successList = new ArrayList<>();
    List<Person> errorList = new ArrayList<>();
    for (Person person : personList) {
        boolean state = importData(person);
        if(state){
            successList.add(person);
            continue;
        }
        errorList.add(person);
    }
    long endTime = System.currentTimeMillis();
    System.out.println("耗时:" + (endTime - startTime));

    System.out.println(successList.size());
    System.out.println(errorList.size());
}

非常的easy,性能也十分堪忧,总耗时大概等于单条数据处理时间×数据量,相信屏幕前的你不至于看不懂吧

在这里插入图片描述

多线程处理

其中代码都加了很多注释,防止自己忘记也方便大家理解查看

/**
 * @description 定义线程池
 */
@Component
public class TaskExecutorConfig {
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        // 设置核心线程数
        executor.setCorePoolSize(50);
        // 设置最大线程数
        executor.setMaxPoolSize(200);
        // 设置队列容量
        executor.setQueueCapacity(200);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(800);
        // 设置默认线程名称
        executor.setThreadNamePrefix("yzs-");
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

CyclicBarrier 字面意思是循环栅栏,是一个同步的工具,能够允许一组线程去互相等待直到都到达了屏障,CyclicBarrier对于涉及到固定大小的线程是非常有用的,线程们必须相互等待。该屏障称之为循环屏障,是因为当等待屏障的线程被释放之后,该屏障能循环使用。

这里为什么采用CyclicBarrier来实现,因为多线程中它可以复用

@Resource
private TaskExecutor taskExecutor;
// 批次执行线程大小
int threadNum = 10;

@Test
public void main() throws InterruptedException {
    List<Person> personList = initData();
    if (CollectionUtils.isEmpty(personList)) {
        return;
    }
    long startTime = System.currentTimeMillis();
    AtomicInteger atomicInteger = new AtomicInteger(1);
    // 定义栅栏
    CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, () -> {
        System.out.println("第几次:" + atomicInteger.getAndIncrement());
    });
    // 处理成功的数据
    List<Person> successList = new ArrayList<>();
    // 处理错误的数据
    List<Person> errorList = new ArrayList<>();

    // 循环次数
    int forNumber = personList.size() % threadNum == 0 ? personList.size() / threadNum : (personList.size() / threadNum) + 1;
    // 计数器等于线程数,计时到每个线程都执行完成任务
    CountDownLatch countDownLatch = new CountDownLatch(threadNum);
    // 处理集合的索引,从0开始
    AtomicInteger listIndex = new AtomicInteger();
    for (int i = 0; i < threadNum; i++) {
        taskExecutor.execute(() -> {
            for (int j = 0; j < forNumber; j++) {
                try {
                    if (listIndex.get() < personList.size()) {
                        // 这里不要先 getAndIncrement 再 get,多线程下会导致 get的值可能不是当前线程 ++ 后的,就会导致同一个索引的数据处理了多次
                        Person person = personList.get(listIndex.getAndIncrement());
                        boolean state = importData(person);
                        if (state) {
                            successList.add(person);
                        } else {
                            errorList.add(person);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        // 确保每个线程都在屏障前等待
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            // 每个线程执行完成后,计数器 -1
            countDownLatch.countDown();
        });
    }
    countDownLatch.await();
    long endTime = System.currentTimeMillis();
    System.out.println("总耗时" + (endTime - startTime));
    System.out.println(successList.size());
    System.out.println(errorList.size());
}

运行结果:
在这里插入图片描述

总体性能是提高了,但是 最后记录的数据量不够2001条呢???
这是因为List是非线程安全的,多个线程操作会产生数据安全问题

修改之后:

@Resource
private TaskExecutor taskExecutor;
// 批次执行线程大小
int threadNum = 10;

@Test
public void main() throws InterruptedException {
    List<Person> personList = initData();
    if (CollectionUtils.isEmpty(personList)) {
        return;
    }
    long startTime = System.currentTimeMillis();
    AtomicInteger atomicInteger = new AtomicInteger(1);
    // 定义栅栏
    CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, () -> {
        System.out.println("第几次:" + atomicInteger.getAndIncrement());
    });
    
    // 处理成功的数据
    Vector<Person> successList = new Vector<>();
    // 处理错误的数据
    Vector<Person> errorList = new Vector<>();

    // 循环次数
    int forNumber = personList.size() % threadNum == 0 ? personList.size() / threadNum : (personList.size() / threadNum) + 1;
    // 计数器等于线程数,计时到每个线程都执行完成任务
    CountDownLatch countDownLatch = new CountDownLatch(threadNum);
    // 处理集合的索引,从0开始
    AtomicInteger listIndex = new AtomicInteger();
    for (int i = 0; i < threadNum; i++) {
        taskExecutor.execute(() -> {
            for (int j = 0; j < forNumber; j++) {
                try {
                    if (listIndex.get() < personList.size()) {
                        // 这里不要先 getAndIncrement 再 get,多线程下会导致 get的值可能不是当前线程 ++ 后的,就会导致同一个索引的数据处理了多次
                        Person person = personList.get(listIndex.getAndIncrement());
                        boolean state = importData(person);
                        if (state) {
                            successList.add(person);
                        } else {
                            errorList.add(person);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        // 确保每个线程都在屏障前等待
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println(Thread.currentThread().getName());
            // 每个线程执行完成后,计数器 -1
            countDownLatch.countDown();
        });
    }
    countDownLatch.await();
    long endTime = System.currentTimeMillis();
    System.out.println("总耗时" + (endTime - startTime));
    System.out.println(successList.size());
    System.out.println(errorList.size());
}

运行结果:

在这里插入图片描述

看看数据是否正确,按照数据处理的规则,id为奇数的性别为女,处理全部成功
在这里插入图片描述

id为偶数的性别为男,处理全部失败
在这里插入图片描述
最后结果也正确。

总结

从单线程处理到多线程处理,把总体耗时从 51s优化到了6s,虽然开了十个线程同时去处理,但最终结果不是 51s除以10,这是因为线程的切换和CPU的调度也需要消耗一定的时间,线程数量不是越多越好。线程数需要根据实际情况、具体服务器CPU的核数 具体分析和反复测试,最终选取一个比较合适的数量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/90124.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

怎么进行视频配音?建议收藏这些配音方法

最近我的朋友向我求助&#xff0c;他想要自己制作一个视频&#xff0c;但是视频里面有些片段需要配音&#xff0c;可是他又不想用自己的声音来配音。一方面担心容易NG&#xff0c;需要录制很多遍&#xff0c;会浪费较多的时间&#xff1b;另一方面是&#xff0c;如果视频录制和…

​单张图像三维人脸重建必备入门face3d—3DMM

作者&#xff1a;小灰灰 来源&#xff1a;投稿 编辑&#xff1a;学姐 本次的例子是将pipeline生成的图片作用于3DMM&#xff0c;重新拟合成新的图片。 load model 3DMM的表达式&#xff1a; &#x1d446;̅ ∈ &#x1d445;3&#x1d45b;是平均人脸形状&#xff0c;&#x…

国产网关apisix安装

1、安装docker 参考&#xff1a;centos7安装docker_代码手艺人老羊的博客-CSDN博客 2、下载包&#xff08;从github&#xff09; # Download the Docker image of Apache APISIX git clone https://github.com/apache/apisix-docker.git 3、安装 # Switch the current di…

单点登录设计

01 单系统登录机制 1、http无状态协议 web应用采用browser/server架构&#xff0c;http作为通信协议。http是无状态协议&#xff0c;浏览器的每一次请求&#xff0c;服务器会独立处理&#xff0c;不与之前或之后的请求产生关联&#xff0c;这个过程用下图说明&#xff0c;三…

JavaScript高级 |彻底搞懂原型对象

本文已收录于专栏⭐️ 《JavaScript》⭐️ 学习指南&#xff1a;对象的原型函数的原型new操作符将方法放原型里constructor总结梳理原型对象内存表现完结散花参考文献对象的原型 JavaScript 当中每个对象都有一个特殊的内置属性[[prototype ]] ,这个特殊的对象可以指向另外一个…

科技云报道:畅想无人化运维的AIOps,还有多远的路要走?

科技云报道原创。 在IT行业&#xff0c;运维人常常自我调侃“赚着5k的月薪&#xff0c;操着5千万的心&#xff0c;名下挂着5亿的资产”。 机房的暖通、网络、综合布线&#xff0c;系统的监控告警、故障响应等一大堆繁杂琐碎的工作&#xff0c;充斥着运维人的日常。 与开发和产…

自定义Feign的配置

SpringBoot虽然帮我们实现了自动装配&#xff0c;但是也是支持自定义配置的。 Feign运行自定义配置来覆盖默认配置&#xff0c;可以修改的默认配置如下&#xff1a; 配置Feign日志有两种方式 方式一&#xff1a;配置文件方式 1&#xff09;全局生效 feign:client:config:defa…

【愚公系列】2022年12月 Elasticsearch数据库-ELK添加SQL插件和浏览器插件(二)

文章目录前言一、ELK添加SQL插件和浏览器插件1.配置插件2.浏览器插件3.Elasticsearch术语介绍4.测试SQL插件和浏览器插件前言 下载SQL插件地址&#xff1a;https://github.com/NLPchina/elasticsearch-sql 我们选择7.15.2版本&#xff0c;ES页选择7.15.2版本把最后面的下载链…

车间调度|基于遗传算法的柔性车间调度(Matlab代码实现)

目录 1 概述 2 遗传优化算法 3 车间调度 4 运行结果 5 参考文献 6 Matlab代码实现 1 概述 调度通过合理安排生产资源,以缩短生产时间和提高资源利用率为目的,在生产系统中扮演着重要的角色。作业车间调度问题(Job-shop Schedu-ling Problem&#xff0c;JSP)是一类经典…

1996-2020年全国31省农村电力和农田水利建设相关数据

1996-2020年全国31省农村电力和农田水利建设相关数据 1、1996-2020年 2、范围&#xff1a;31省 3、指标包括&#xff1a; 乡村办水电站、装机容量、发电量、农村用电量、有效灌溉面积、旱涝保收面积、机电排灌面积、实际耕地灌溉面积、新增耕地灌溉面积、节水灌溉面积、新增…

2023年第六届先进控制,自动化与机器人国际会议(ICACAR 2023)

2023年第五届先进控制&#xff0c;自动化与机器人国际会议&#xff08;ICACAR 2023&#xff09; 重要信息 会议网址&#xff1a;www.icacar.org 会议时间&#xff1a;2023年4月14-16日 召开地点&#xff1a;中国北京 截稿时间&#xff1a;2023年2月28日 录用通知&#xf…

排序子序列

1 题目来源&#xff1a; 牛客网&#xff1a;排序子序列 2 题目描述  牛牛定义排序子序列为一个数组中一段连续的子序列,并且这段子序列是非递增或者非递减排序。牛牛有一个长度为n的整数数组A,他现在有一个任务是把数组A分为若干段排序子序列,牛牛想知道他最少可以把这个数组分…

Ashampoo Burning Studio创建可启动磁盘

Ashampoo Burning Studio创建可启动磁盘 Ashampoo的产品通常适合质量&#xff0c;但在其中&#xff0c;它是世界上最好的软件之一&#xff0c;名为Ashampoo Burning Studio。与著名的Nero程序相比&#xff0c;该软件几乎一无是处&#xff0c;所有用于制作、写入和复制光盘的软件…

Python Tutorial——模块

如果你从Python解释器中退出&#xff0c;并且再次进入&#xff0c;你会发现你以前定义的函数和变量都已经丢失了。所以&#xff0c;如果你想写一个在某种程度上更长的程序&#xff0c;使用一个文本编辑器来准备解释器的输入会使情况有所好转&#xff0c;并且使用文件代替输入来…

最简单的方式实现Zotero文件同步+坚果云在多台电脑设备之间

应用场景&#xff1a; 放假回家&#xff0c;只带了笔记本搞科研的好童靴&#xff0c;发现实验室台式机的zotero中的PDF没办法在笔记本上读取。于是探索了一下午如何不重新在网页上保存下载台式机中的PDF&#xff0c;轻松获取异地的文献。 方式一&#xff1a; 氪金付费zotero…

参数估计与假设检验

推断统计&#xff1a;研究如何利用样本数据来推断总体特征 描述统计&#xff1a;描述一组数据的特征 参数估计&#xff1a;利用样本信息估计总体特征 假设检验&#xff1a;利用样本信息判断对总体的假设是否成立 一.参数估计 就是对于总体指标的估计 估计&#xff1a;根据…

免费l2接口有多少种类型?

免费l2接口是一个预先定义的函数&#xff0c;它的目的是让开发人员和开发人员无需访问源代码&#xff0c;也无需访问源代码&#xff0c;也无需理解其内部工作。免费l2接口有多少种类型&#xff1f; 有四种类型的股票l2接口: RPC&#xff1a;通过处理(或任务)共享的数据缓冲区…

SpringBoot整合RabbitMQ实现死信队列

文章目录概念介绍什么是死信死信队列应用工程搭建环境说明搭建步骤实现死信准备Exchange&Queue监听死信队列方式一——消费者拒绝&否认方式二——超过消息TTL方式三——超过队列长度限制代码仓库前面一文通过 Java整合RabbitMQ实现生产消费&#xff08;7种通讯方式&…

Spark的运行模式介绍

Spark的运行模式 本地模式&#xff08;Local&#xff09; 一般用做测试&#xff0c;测试代码的逻辑是否正确 本地模式&#xff0c;只启动一个Driver进程&#xff0c;没有Executor进程的&#xff0c;所有Task都运行在Driver进程中 集群模式 &#xff08;Cluster&#xff09; 一…

医疗挂号网站

开发工具(eclipse/idea/vscode等)&#xff1a; 数据库(sqlite/mysql/sqlserver等)&#xff1a; 功能模块(请用文字描述&#xff0c;至少200字)&#xff1a; 管理员功能&#xff1a; 1、管理挂号须知、帮助信息 2、增删改查资讯类型、健康资讯信息 3、增删改查医生职称信息、医生…