为什么使用Java8中的并行流运算耗时变长了?

news2025/1/16 5:02:03

写在文章开头

近期对迭代的功能进行压测检查,发现某些使用并发技术的线程任务耗时非常漫长,结合监控排查定位到的并行流使用上的不恰当,遂以此文分享一下笔者发现的问题。

在这里插入图片描述

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

问题复现

需求背景

这里笔者先简单介绍一下当前功能的使用背景,当前功能是一些大数据量的计算密集型任务定时执行,在常规优化效率有限的情况下,考虑到复用性,笔者通过JDK8底层内置的并行流完成这些任务的计算。

对应优化思路如下,可以看到针对每一批数据,笔者都是通过并行流采集出集合并将其写入文档:

在这里插入图片描述

常规串行计算

我们给出第一段代码示例,为了更专注于本文并行流问题的剖析,笔者对于两个并行线程所执行的数据采集和写入文档的操作通过原子类并发计算来模拟:

public static void main(String[] args) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();

        CountDownLatch countDownLatch = new CountDownLatch(2);

        long beginTime = System.currentTimeMillis();

        //模拟采集5000w数据并写入本地文档中
        new Thread(() -> {
            IntStream.range(0, 5000_0000)
                    .forEach(i -> atomicInteger.getAndIncrement());
            countDownLatch.countDown();
        }, "t1").start();

        //模拟采集5000w数据并写入本地文档中
        new Thread(() -> {
            IntStream.range(0, 5000_0000)
                    .forEach(i -> atomicInteger.getAndIncrement());
            countDownLatch.countDown();
        }, "t2").start();

        //等待两个线程结束
        countDownLatch.await();

        //输出耗时
        long endTime = System.currentTimeMillis();
        System.out.println("atomicInteger: " + atomicInteger.get());
        System.out.println("time: " + (endTime - beginTime) + " ms");


    }

输出结果如下,可以看到1e的数据耗时大约需要1.6s

atomicInteger: 100000000
time: 1620 ms

单任务并行流

我们再进行更进一步的优化,将某个线程的任务使用并行流进行原子运算(模拟业务操作):

public static void main(String[] args) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();

        CountDownLatch countDownLatch = new CountDownLatch(2);

        long beginTime = System.currentTimeMillis();

        //模拟并行流采集5000w数据并写入本地文档中
        new Thread(() -> {
            IntStream.range(0, 5000_0000)
                    .parallel()
                    .forEach(i -> atomicInteger.getAndIncrement());
            countDownLatch.countDown();
        }, "t1").start();

        //模拟采集5000w数据并写入本地文档中
        new Thread(() -> {
            IntStream.range(0, 5000_0000)
                    .forEach(i -> atomicInteger.getAndIncrement());
            countDownLatch.countDown();
        }, "t2").start();

        //等待两个线程结束
        countDownLatch.await();

        //输出耗时
        long endTime = System.currentTimeMillis();
        System.out.println("atomicInteger: " + atomicInteger.get());
        System.out.println("time: " + (endTime - beginTime) + " ms");


    }

从输出结果来看,性能表现提升了几毫秒,相对于最后生产上业务的数据量而言,可能会提升更多:

atomicInteger: 100000000
time: 1337 ms

双并行流运算

结合上述结果,我们大胆提出,是否所有任务都通过通过并行流进行运算,程序的执行性能是否会在此提升:

public static void main(String[] args) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();

        CountDownLatch countDownLatch = new CountDownLatch(2);

        long beginTime = System.currentTimeMillis();

        //模拟并行流采集5000w数据并写入本地文档中
        new Thread(() -> {
            IntStream.range(0, 5000_0000)
                    .parallel()
                    .forEach(i -> atomicInteger.getAndIncrement());
            countDownLatch.countDown();
        }, "t1").start();

        //模拟并行流采集5000w数据并写入本地文档中
        new Thread(() -> {
            IntStream.range(0, 5000_0000)
                    .parallel()
                    .forEach(i -> atomicInteger.getAndIncrement());
            countDownLatch.countDown();
        }, "t2").start();

        //等待两个线程结束
        countDownLatch.await();

        //输出耗时
        long endTime = System.currentTimeMillis();
        System.out.println("atomicInteger: " + atomicInteger.get());
        System.out.println("time: " + (endTime - beginTime) + " ms");


    }

很明显,从最终的耗时来看,执行时间不减反增了,这是为什么呢?

atomicInteger: 100000000
time: 1863 ms

详解多任务采用并行流导致执行低效的原因

实际上并行流底层所采用的线程池是一个在程序启动初始化期间就会创建的线程池common,程序初始化时它会检查用户的是否有配置java.util.concurrent.ForkJoinPool.common.parallelism这个参数,如果有则基于这个参数的数值为common创建定量的线程,后续的我们的并行流运算的执行都会提交到该线程池中。

这就意味着我们上述的操作中,所有线程中千万的执行子项都通过同一个线程池进行并行运算,这期间线程池的忙碌程度可想而知,这也就是为什么笔者在进行压测时明明某些数据量不是很大的任务耗时却非常大的本质原因:

在这里插入图片描述

对于该问题,笔者也通过StackOverflow看到并行流设计的思想,设计者认为对于计算密集型任务,默认情况下,它将通过一个初始化一个CPU核心数一致的线程池,让所有并行运算共享一个线程池,进行并行流运算时使用的线程永远在核心数以内,由此也会出现相同的缺点,所有并行运算依赖同一个线程池,可能会导致大量任务大耗时或者大阻塞:

This also means if you have nested parallel streams or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default (number of available processors). Disadvantage: you may not get “all the processors” assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)

这一点我们也可以在ForkJoinPool的静态代码块中

static {
        // initialize field offsets for CAS etc
        try {
           
		//......
		//调用makeCommonPool完成线程池创建和初始化
        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
        int par = common.config & SMASK; // report 1 even if threads disabled
        commonParallelism = par > 0 ? par : 1;
    }

对应的我们步入makeCommonPool方法即可看到线程池的创建逻辑,即判断用户是否有通过java.util.concurrent.ForkJoinPool.common.parallelism指定线程数,若没有则按照CPU核心数完成初始化:

private static ForkJoinPool makeCommonPool() {
       	//......
        try {  // ignore exceptions in accessing/parsing properties
        	//获取用户对于common线程池中线程数的配置
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
         //......
        } catch (Exception ignore) {
        }
       	//......
       	//若小于parallelism小于0则说明用户没有指定,则直接按照CPU核心数创建线程池
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
       
      	//基于CPU核心数创建 ForkJoinPool线程池
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

解决方案

很明显,对于该问题就是因为多个并行运算跑到了单个线程池中,我们的解决方式无非是以下几种:

  1. 提升线程池线程数量已处理更多的并发运算。
  2. 业务上避免大量并发运算去竞争common线程池。

结合业务场景,笔者对于并行流的使用更多是计算密集型任务,通过java.util.concurrent.ForkJoinPool.common.parallelism去提升线程数并不会带来提升,所以在笔者结合业务场景通过压测计算出每个定时任务的耗时,大约是5分钟,所以笔者通过调整定时任务的执行间隔由原来的3min改为5min保证任务错峰执行解决该问题:

在这里插入图片描述

小结

我是 sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

参考

Custom thread pool in Java 8 parallel stream:https://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream

ForkJoinPool的commonPool相关参数配置
:https://www.jianshu.com/p/1b5f4ea0074a

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1810520.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

虚假RE:Runtime Error

简单数据处理 筛法求素数 描述 用筛法求2——N之间的素数 输入描述 输入一个整数N&#xff0c;2<N<100000 输出描述 输出[2——N]区间内的素数 用例输入 1 20 用例输出 1 2 3 5 7 11 13 17 19 把判断素数写出一个函数&#xff0c;方便后续直接使用 #include<stdio.…

头部外伤怎么办?别大意,科学处理是关键

头部外伤是一种常见的伤害&#xff0c;它可能由跌倒、撞击或其他事故造成。虽然许多头部外伤看似轻微&#xff0c;但如果不妥善处理&#xff0c;可能会带来严重的后果。因此&#xff0c;了解头部外伤的处理方法至关重要。 一、初步判断伤势 头部外伤后&#xff0c;首先要观察伤…

自动驾驶---Perception之视觉点云雷达点云

1 前言 在自动驾驶领域&#xff0c;点云技术的发展历程可以追溯到自动驾驶技术的早期阶段&#xff0c;特别是在环境感知和地图构建方面。 在自动驾驶技术的早期技术研究中&#xff0c;视觉点云和和雷达点云都有出现。20世纪60年代&#xff0c;美国MIT的Roberts从2D图像中提取3D…

Vuepress 2从0-1保姆级进阶教程——标准化流程(Tailwindcss+autoprefixer+commitizen)

Vuepress 2 专栏目录【已完结】 1. 入门阶段 Vuepress 2从0-1保姆级入门教程——环境配置篇Vuepress 2从0-1保姆级入门教程——安装流程篇Vuepress 2从0-1保姆级入门教程——文档配置篇Vuepress 2从0-1保姆级入门教程——主题与部署 2.进阶阶段 Vuepress 2从0-1保姆级进阶教程—…

【递归+二叉树思想+搜索】 Alice and the Cake题解

Alice and the Cake题解 AC记录&#xff1a;记录-洛谷 题面翻译&#xff08;大概就是题目大意&#xff09; 执行恰好 n − 1 n-1 n−1 次操作&#xff0c;每次操作可以选择当前所有蛋糕中满足其重量 w ⩾ 2 w\geqslant 2 w⩾2 的一块&#xff0c;然后将其分为质量分别为 …

vue + vite 图标导入总结

vue vite 图标导入总结 SVG 的使用 在页面中我们会使用到各种图标&#xff0c;为了保证图标在放大缩小不失真&#xff0c;通常会采用 SVG 来作为图标。 SVG&#xff08;Scalable Vector Graphics&#xff09;是一种基于XML的矢量图像格式&#xff0c;它可以用来创建清晰的、…

计算机组成原理之计算机的性能指标

目录 计算机的性能指标 复习提示 1.计算机的主要性能指标 1.1机器字长 1.1.1与机器字长位数相同的部件 1.2数据通路带宽 1.3主存容量 1.4运算速度 1.4.1提高系统性能的综合措施 1.4.2时钟脉冲信号和时钟周期的相关概念 1.4.3主频和时钟周期的转换计算 1.4.4IPS的相关…

图神经网络实战(13)——经典链接预测算法

图神经网络实战&#xff08;13&#xff09;——经典链接预测算法 0. 前言1. 链接预测2. 启发式技术2.1 局部启发式技术2.2 全局启发式技术 3. 矩阵分解小结系列链接 0. 前言 链接预测 (Link prediction) 可以帮助我们理解和挖掘图中的关系&#xff0c;并在社交网络、推荐系统等…

javaweb请求响应+@ResponseBody注解

这里用了一款接口测试软件postman 简单参数 SpringBoot方式 简单参数:参数名与形参变量名相同&#xff0c;定义形参即可接收参数。 请求参数名与方法形参变量名相同 会自动进行类型转换 package com.example.springbootwebreqresp.Controller;import org.springframework.w…

Go微服务: 理解分布式锁

概述 我们先看一个场景&#xff0c;到了双11&#xff0c;我们的商户又开始卖商品啦但是&#xff0c;我们的库存是有限的&#xff0c;如果超卖了&#xff0c;可能平台就会涉及相关法律责任了所以&#xff0c;我们的库存扣除问题&#xff0c;一定是一个非常经典的问题 先看上图&…

《Vue》系列文章目录

Vue (发音为 /vjuː/&#xff0c;类似 view) 是一款用于构建用户界面的 JavaScript 框架。它基于标准 HTML、CSS 和 JavaScript 构建&#xff0c;并提供了一套声明式的、组件化的编程模型&#xff0c;帮助你高效地开发用户界面。无论是简单还是复杂的界面&#xff0c;Vue 都可以…

实现3-8译码器①

描述 下表是74HC138译码器的功能表. E3 E2_n E1_n A2 A1 A0 Y0_n Y1_n Y2_n Y3_n Y4_n Y5_n Y6_n Y7_n x 1 x x x x 1 1 1 1 1 1 1 1 x x 1 x x x 1 1 1 1 1 1 1 1 0 x x x x x 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 …

C语言之argc、argv与main函数的传参

一 &#xff1a;谁给main函数传参 &#xff08;1&#xff09;调用main函数所在的程序的它的父进程给main函数传参&#xff0c;并且接收main函数的返回值 二 &#xff1a;为什么需要给main函数传参 &#xff08;1&#xff09;首先mian函数不传承是可以的&#xff0c;也就是说它的…

01-Linux网络设置

1、查看及测试网络 查看及测试网络配置是管理Linux网络服务的第一步&#xff0c;其中的大多数命令以普通用户的权限就可以完成操作&#xff0c;但普通用户在执行/sbin目录中的命令时需要指定命令文件的决对路径。 1.1 查看网络接口地址 主机的网卡通常称为网络接口&#xff…

轻松驾驭视频节奏:灵活调整视频播放速度,让视频内容更出彩!

视频已经成为了我们生活中不可或缺的一部分。无论是观看电影、电视剧&#xff0c;还是浏览短视频、直播内容&#xff0c;我们都希望能够根据自己的喜好和需求来调整视频的播放速度&#xff0c;甚至精确控制每一秒的播放内容。那么&#xff0c;如何轻松实现这一愿望呢&#xff1…

torchmetrics,一个无敌的 Python 库!

更多Python学习内容&#xff1a;ipengtao.com 大家好&#xff0c;今天为大家分享一个无敌的 Python 库 - torchmetrics。 Github地址&#xff1a;https://github.com/Lightning-AI/torchmetrics 在深度学习和机器学习项目中&#xff0c;模型评估是一个至关重要的环节。为了准确…

Springboot结合redis实现关注推送

关注推送 Feed流的模式 Timeline:不做内容筛选&#xff0c;简单的按照内容发布时间排序。常用于好友与关注。例如朋友圈的时间发布排序。 优点:信息全面&#xff0c;不会有缺失。并且实现也相对简单 缺点:信息噪音较多&#xff0c;用户不一定感兴趣&#xff0c;内容获取效率…

打造精细化运维新玩法(三)

实践SLO&#xff0c;概括下就是在相对标准、统一的框架下指导和推动服务质量的数字化建设&#xff0c;形成对组织有价值的数据资产和流程规范。借用在人工智能和机器学习领域的观点&#xff0c;算法的上限受限于数据质量的好坏&#xff0c;所以从源头上建设高质量的数据非常重要…

【电赛】STM32-PID直流减速电机小车【寻迹+避障+跟随】【更新ing】

一.需求分析 1.主控&#xff1a;STM32C8T6&#xff08;没什么好说的哈哈&#xff09; 2.电机&#xff1a;JAG25-370电机 【问】为什么要用直流减速电机&#xff1f;&#xff1f; PID控制器需要依靠精确的反馈信号来调整其输出&#xff0c;确保电机按照预定的速度和位置运行…

独立游戏之路:Tap篇 -- Unity 集成 TapTap 广告详细步骤

Unity 集成 TapADN 广告详细步骤 前言一、TapTap 广告介绍二、集成 TapTap 广告的步骤2.1 进入广告后台2.2 创建广告计划2.3 选择广告类型三、代码集成3.1 下载SDK3.2 工程配置3.3 源码分享四、常见问题4.1 有展现量没有预估收益 /eCPM 波动大?4.2 新建正式媒体找不到预约游戏…