J.U.C Review - Stream并行计算原理源码分析

news2024/9/22 3:29:16

文章目录

  • Java 8 Stream简介
  • Stream单线程串行计算
  • Stream多线程并行计算
  • 源码分析Stream并行计算原理
  • Stream并行计算的性能提升

在这里插入图片描述

Java 8 Stream简介

自Java 8推出以来,开发者可以使用Stream接口和lambda表达式实现流式计算。这种编程风格不仅简化了对集合操作的代码,还提高了代码的可读性和性能。

Stream接口提供了多种集合操作方法,包括empty(判空)、filter(过滤)、max(求最大值)、findFirstfindAny(查找操作)等,使得对集合的操作更加灵活和直观。


Stream单线程串行计算

在默认情况下,Stream接口是以串行的方式运行的,这意味着所有的操作都在一个线程内执行。我们可以通过以下示例代码展示这一点:

public class StreamDemo {
    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .reduce((a, b) -> {
                    System.out.println(String.format("%s: %d + %d = %d",
                            Thread.currentThread().getName(), a, b, a + b));
                    return a + b;
                })
                .ifPresent(System.out::println);
    }
}

在这个例子中,我们通过Stream.of()方法创建了一个包含数字1到9的流。随后,调用reduce方法对这些数字进行累加操作。reduce方法的作用是从前两个元素开始,执行指定操作(在此示例中为加法),然后将结果与下一个元素进行相同的操作,直到处理完所有元素。

程序的输出如下:

main: 1 + 2 = 3  
main: 3 + 3 = 6  
main: 6 + 4 = 10  
main: 10 + 5 = 15  
main: 15 + 6 = 21  
main: 21 + 7 = 28  
main: 28 + 8 = 36  
main: 36 + 9 = 45  
45

从输出可以看出,所有计算均由main线程执行,并且操作是严格按照元素顺序串行完成的。


Stream多线程并行计算

然而,单线程串行执行并不是唯一的选择。在现代多核处理器的时代,我们可以通过并行计算来更高效地利用计算资源。例如,当计算1+2=3的同时,我们可以在另一个线程中计算3+4=7,最后将这些部分结果进行合并。这种思想与Fork/Join框架的设计理念非常类似。

通过以下代码,我们可以让Stream在多线程中并行执行:

public class StreamParallelDemo {
    public static void main(String[] args) {
        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                .parallel()
                .reduce((a, b) -> {
                    System.out.println(String.format("%s: %d + %d = %d",
                            Thread.currentThread().getName(), a, b, a + b));
                    return a + b;
                })
                .ifPresent(System.out::println);
    }
}

运行这段代码,输出如下:

ForkJoinPool.commonPool-worker-1: 3 + 4 = 7  
ForkJoinPool.commonPool-worker-4: 8 + 9 = 17  
ForkJoinPool.commonPool-worker-2: 5 + 6 = 11  
ForkJoinPool.commonPool-worker-3: 1 + 2 = 3  
ForkJoinPool.commonPool-worker-4: 7 + 17 = 24  
ForkJoinPool.commonPool-worker-4: 11 + 24 = 35  
ForkJoinPool.commonPool-worker-3: 3 + 7 = 10  
ForkJoinPool.commonPool-worker-3: 10 + 35 = 45  
45

从输出结果可以看出,这些计算是并行完成的,使用了ForkJoinPool中的commonPool线程池。尽管各个部分的计算是并行执行的,最终的结果仍然是正确的,因为Fork/Join框架负责协调这些并行任务。


源码分析Stream并行计算原理

通过以上的实践,我们知道Stream的并行计算底层是基于Fork/Join框架的。但具体是如何实现的?我们可以通过源码分析来探究。

首先,Stream.of()方法只是生成一个简单的流。接下来,我们查看parallel()方法的实现。由于这里的数据类型是int,因此调用的是BaseStream接口的parallel()方法。BaseStream接口的唯一实现类是AbstractPipeline类。以下是AbstractPipeline类的parallel()方法:

public final S parallel() {
    sourceStage.parallel = true;
    return (S) this;
}

这个方法的作用非常简单,仅仅是将sourceStage.parallel标志位设置为true,表示该流将以并行方式执行。

接下来,查看reduce方法的实现。Stream.reduce()方法的具体实现是通过ReferencePipeline这个抽象类,该类继承了AbstractPipeline类:

@Override
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
    return evaluate(ReduceOps.makeRef(accumulator));
}

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

@Override
public final boolean isParallel() {
    return sourceStage.parallel;
}

从源码可以看出,reduce方法调用了evaluate方法,而evaluate方法根据parallel标志位来决定是并行执行还是串行执行。如果paralleltrue,则调用evaluateParallel方法,否则调用evaluateSequential方法。

我们再来看evaluateParallel方法在ReduceOps.ReduceOp类中的具体实现:

@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                 Spliterator<P_IN> spliterator) {
    return new ReduceTask<>(this, helper, spliterator).invoke().get();
}

evaluateParallel方法创建了一个ReduceTask实例,并调用其invoke()方法来执行计算。ReduceTask类继承自AbstractTaskAbstractTask又继承自CountedCompleter,最终继承自ForkJoinTask。这就解释了为什么Stream的并行计算底层使用了Fork/Join框架。


Stream并行计算的性能提升

最后,我们通过一个简单的性能测试来验证Stream并行计算的优势。下面的代码演示了如何计算一千万个随机数的和,并比较串行计算和并行计算的时间开销:

public class StreamParallelDemo {
    public static void main(String[] args) {
        System.out.println(String.format("本计算机的核数:%d", Runtime.getRuntime().availableProcessors()));

        Random random = new Random();
        List<Integer> list = new ArrayList<>(1000_0000);

        for (int i = 0; i < 1000_0000; i++) {
            list.add(random.nextInt(100));
        }

        long prevTime = getCurrentTime();
        list.stream().reduce((a, b) -> a + b).ifPresent(System.out::println);
        System.out.println(String.format("单线程计算耗时:%d", getCurrentTime() - prevTime));

        prevTime = getCurrentTime();
        list.stream().parallel().reduce((a, b) -> a + b).ifPresent(System.out::println);
        System.out.println(String.format("多线程计算耗时:%d", getCurrentTime() - prevTime));
    }

    private static long getCurrentTime() {
        return System.currentTimeMillis();
    }
}

在一台8核计算机上的输出结果如下:

本计算机的核数:8  
495156156  
单线程计算耗时:223  
495156156  
多线程计算耗时:95  

结果表明,在多核环境下,Stream的并行计算相比串行计算确实能够显著提升性能。然而,性能提升的幅度并非线性增长,因为线程管理和上下文切换本身也会带来一定的开销。如果在单核环境中,串行计算反而可能会比并行计算更快。

总结而言,Java 8的Stream并行计算通过简化代码的方式,利用了底层的多核资源,大幅提升了复杂集合操作的性能。然而在实际应用中,开发者需要根据具体的硬件环境和任务特性来决定是否使用并行计算。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2111418.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

无线信道中ph和ph^2的场景

使用 p h ph ph的情况&#xff1a; Rayleigh 分布的随机变量可以通过两个独立且相同分布的零均值、高斯分布的随机变量表示。设两个高斯随机变量为 X ∼ N ( 0 , σ 2 ) X \sim \mathcal{N}(0, \sigma^2) X∼N(0,σ2)和 Y ∼ N ( 0 , σ 2 ) Y \sim \mathcal{N}(0, \sigma^2)…

枚举: C++和Python实现鸡兔同笼问题

作者制作不易&#xff0c;关注、点赞、收藏一下吧&#xff01; 目录 1.Python实现 2.C实现 1.Python实现 首先&#xff0c;我们需要输入头和脚的数量: head int(input("请输入头的数量: ")) feet int(input("请输入脚的数量: ")) input() 实现输入…

优质的产业园都在怎么做运营?

产业园区作为区域经济发展的重要载体&#xff0c;其运营模式和管理水平直接影响着产业集聚的成效和区域经济的竞争力。在一线城市与新一线城市中&#xff0c;已经涌现出了一批以高效运营、创新服务为特色的优质产业园&#xff0c;今天&#xff0c;我们就城市标杆产业园的案例和…

10款古方突破1800亿元,康缘药业发力,市场迎井喷式增长……

在政策东风与市场需求的双重驱动下&#xff0c;中药创新领域正迎来前所未有的发展机遇。特别是古代经典名方制剂的研发与注册&#xff0c;正以前所未有的速度推进&#xff0c;不仅激发了行业的活力&#xff0c;也为患者带来了更多治疗选择。本文将深入探讨这一领域的最新动态&a…

828华为云征文|Flexus云服务器X实例快速部署在线测评平台,适用各种信息学教学

文章目录 如何选配Flexus云服务器X实例服务器HydroOJHOJ 服务器资源的选取基础配置实例规格镜像、存储、网络弹性公网IP云服务器名称 部署HydroOJ1.设置安全组、开放端口2.部署HydroOJ回到控制中心&#xff0c;远程登录 部署HOJ安装docker# 安装docker-compose部署HOJ 本篇幅为…

Vatee万腾平台:赋能企业,共筑智慧经济新高地

在智慧经济时代的大潮中&#xff0c;企业如何把握机遇&#xff0c;实现转型升级&#xff0c;成为行业内的佼佼者&#xff1f;Vatee万腾平台以其卓越的技术实力、前瞻性的战略眼光和全方位的服务体系&#xff0c;正逐步成为企业数字化转型的坚实后盾&#xff0c;赋能企业&#x…

Day-06-QFileDialog文件操作

一、实现打开文件选择对话框 1、程序演示 2、相关代码 void Widget::on_QFileDialog_clicked() {QString fileName QFileDialog::getOpenFileName(this, tr("Open File"),"D:/QT project/", /*注意修改自己的文件打开地址*/tr("Text (*.txt *.d…

又一个新的开源AI项目!!【送源码】

好家伙&#xff0c;国内大模型发展太猛了&#xff01; 旗舰端侧模型面壁「小刚炮」系列进化为全新 MiniCPM 3.0 基座模型&#xff0c;再次以小博大&#xff0c;以 4B 参数&#xff0c;带来超越 GPT-3.5 的性能&#xff0c;强得不像端侧模型。 并且&#xff0c;量化后仅 2GB 内…

【Hadoop|HDFS篇】HDFS的读写流程

1. HDFS的写流程 1.1 剖析文件的写入 副本存储节点的选择问题&#xff1a; 第一个副本在Client所在的节点上&#xff0c;如果客户端在集群外&#xff0c;随机选一个。第二个副本在另一个机架的随机一个节点上。第三个副本在第二个副本所在的机架的随机节点上。 2. HDFS的写流…

Redis 集群:如何实现数据的高效分片与负载均衡

Redis 集群&#xff1a;如何实现数据的高效分片与负载均衡 一 . 基本概念二 . 数据分片算法2.1 哈希求余算法2.2 一致性哈希算法2.3 哈希槽分区算法核心思路Redis 集群中最多只能有 16384 个分片吗 ?为什么一定要是 16384 个槽位 ? 三 . 基于 docker 进行集群的搭建3.1 创建目…

《MaPLe: Multi-modal Prompt Learning》中文校对版

系列论文研读目录 文章目录 系列论文研读目录题目&#xff1a;《Maple&#xff1a;多模态提示学习》摘要1.简介2.相关工作视觉语言模型&#xff1a;提示学习&#xff1a;视觉语言模型中的提示学习&#xff1a; 3.方法3.1.回看CLIP编码图像&#xff1a;编码文本&#xff1a;Zero…

【vue、UI】使用 Vue2 和 Element UI 封装 CSV 文件上传组件,实现csv回显

文章目录 前言组件功能概述实现效果组件模板结构组件的核心逻辑1.数据属性定义2.方法拆解3.CSV 文件解析方法4. 错误处理方法 组件样式完整组件代码总结待优化的地方 前言 在 Vue2 项目中&#xff0c;我们经常需要封装一些可重用的组件来提升开发效率。本文将介绍如何使用 Vue…

Linux工程管理文件Makefile-入门篇

1.Makefile简介 Makefile是在Linux环境下 C/C 程序开发必须要掌握的一个工程管理文件。当你使用make命令去编译一个工程项目时&#xff0c;make工具会首先到这个项目的根目录下去寻找Makefile文件&#xff0c;然后才能根据这个文件去编译程序。那Makefile在编译过程中到底起了…

T7:咖啡豆识别

T7&#xff1a;咖啡豆识别 **一、前期工作**1.设置GPU,导入库2.导入数据3.查看数据 **二、数据预处理**1.加载数据2.可视化数据3.配置数据集 **三、构建CNN网络模型**1、手动搭建2、直接调用官方模型 **四、编译模型****五、训练模型****六、模型评估****七、预测**八、暂时总结…

Spring-容器:IOC-基于注解管理Bean

目录 一、基于注解管理Bean&#xff08;重点&#xff09;1.1、概述1.2、开启组件扫描1.2.1、指定要排除的组件1.2.2、仅扫描指定组件 1.3、使用注解定义Bean1.4、使用Autowired注入1.4.1、属性注入1.4.2、set注入1.4.3、构造方法注入1.4.4、形参注入1.4.5、无注解注入1.4.6、联…

自幂数判断c++

题目描述 样例输入 3 152 111 153样例输出 F F T 代码如下&#xff1a; #include<bits/stdc.h> using namespace std; long long m,a; int main(){cin>>m;for(int i1;i<m;i){cin>>a;long long ta,n[10],cc0,s0;while(t!0){//求位数与拆位n[cc]t%10;tt/…

报错:Reached the max session limit(DM8 达梦数据库)

报错:Reached the max session limit - - DM8 达梦数据库 1 环境介绍2 数据库启动SYSTEM IS READY后面日志3 数据库刚启动日志4 达梦数据库学习使用列表 1 环境介绍 某项目无法连接数据库,报错:超过最大会话数限制 , 检查 dmdba ulimit -a openfiles 已改检查 dm.ini 其中 MAX…

中间代码例题

答案&#xff1a;D 知识点&#xff1a; 中间代码是一种简单且含义明确的记号系统&#xff0c;可以有若干形式&#xff0c;它们的共同特征是与机器无关。 最常见的中间代码有&#xff1a;后缀式&#xff0c;语法树&#xff0c;三地址码&#xff0c;四元式 这些往往是数据&am…

迪普防火墙接口故障处理

一、防火墙故障初查 一台捷普防火墙&#xff0c;突然间业务不通&#xff0c;接口UP&#xff0c;策略正常&#xff0c;区域正常&#xff0c;互联地址就是不能ping通。 如上&#xff0c;接口状态很正常。 如上&#xff0c;互联设备ping不通。 二、锁定故障问题点 检查arp表&…

卷轴模式系统APP源码之产品分析:探索其设计精髓与市场潜力

在移动互联网的浪潮中&#xff0c;各类创新应用层出不穷&#xff0c;其中&#xff0c;“卷轴模式系统APP”作为一种融合了传统文化元素与现代交互设计的产品&#xff0c;正逐渐引起市场的关注。本文将从产品设计的角度&#xff0c;深入分析卷轴模式系统APP的源码特性、用户体验…