浅谈 parallelStream和Stream 源码及其应用场景

news2024/11/9 6:10:53

上篇讲述了list.forEach()和list.stream().forEach() 异同点
谈到了并行流的概念,本篇则从源码出发,了解一下其原理。

一、流的初始操作流程

jdk8中 将Collection中加入了转换流的概念。

default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

default Stream<E> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }

在这里插入图片描述
目前看到的两者是一个参数的区别。

//boolean parallel 是否为并行流
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
		//用于检查传入的spliterator是否为空
        Objects.requireNonNull(spliterator);
        //ReferencePipeline.Head 表示流的开始,根据spliterator以及parallel创建对应的流操作链
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
//该构造方法用于初始化Head类的实例
Head(Spliterator<?> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }
ReferencePipeline(Spliterator<?> source,
                      int sourceFlags, boolean parallel) {
        //super关键字调用父类的构造方法,完成对父类的初始化工作
        super(source, sourceFlags, parallel);
    }
//按照给定的参数初始化AbstractPipeline类的实例
AbstractPipeline(Spliterator<?> source,
                     int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;  
        this.sourceStage = this;  //当前阶段作为源操作
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;  //左移一位取反操作
        this.depth = 0;
        this.parallel = parallel; //当前流水线是否支持并行操作
    }

二、forEach操作

在这里插入图片描述

@Override
        public void forEach(Consumer<? super E_OUT> action) {
            if (!isParallel()) {
            	//串行流执行
                sourceStageSpliterator().forEachRemaining(action);
            }
            else {
            	//并行流执行
                super.forEach(action);
            }
        }

在这里插入图片描述

1)串行流

demo debug add操作,看为何会报错?

public static void main(String[] args) {
        List<String> list= new ArrayList<>();
        list.add("Sunday");
        list.add("Monday");
        list.add("Tuesday");
        list.add("Wednesday");
        list.add("Thursday");
        list.add("Friday");
        list.add("Saturday");
        list.stream().forEach(d->{
            System.out.println("value="+d);
            if (d.equals("Thursday")){
                list.add(d);
            }
        });
    }
//如果此管道截断是源阶段,则获取源阶段拆分器。调用此方法并成功返回后,将消耗管道
final Spliterator<E_OUT> sourceStageSpliterator() {
        if (this != sourceStage)
            throw new IllegalStateException();

        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        if (sourceStage.sourceSpliterator != null) {
            @SuppressWarnings("unchecked")
            Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
            return s;
        }
        else if (sourceStage.sourceSupplier != null) {
            @SuppressWarnings("unchecked")
            Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
            return s;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }
    }

forEachRemaining实现方法
在这里插入图片描述
在这里插入图片描述
return未走,直接走了异常返回 throw new ConcurrentModificationException();

public void forEachRemaining(Consumer<? super E> action) {
            int i, hi, mc; // hoist accesses and checks from loop
            ArrayList<E> lst; Object[] a;
            if (action == null)
                throw new NullPointerException();
            if ((lst = list) != null && (a = lst.elementData) != null) {
                if ((hi = fence) < 0) {
                    mc = lst.modCount;
                    hi = lst.size;
                }
                else
                    mc = expectedModCount;
                //i表示开始迭代的位置
                //i=index index表示上次迭代的位置,将上次迭代器正在迭代的位置复制给i
                //(i=index)>=0 保证当前迭代的下标大于等于0
                //表示最大迭代到hi,设置最大的hi=a.length
                //(index = hi) <= a.length保证数组不跨界
                if ((i = index) >= 0 && (index = hi) <= a.length) {
                    for (; i < hi; ++i) {
                        @SuppressWarnings("unchecked") E e = (E) a[i];
                        //执行具体的迭代
                        action.accept(e);
                    }
                    if (lst.modCount == mc)
                        return;
                }
            }
            throw new ConcurrentModificationException();
        }

2)并行流

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

@Override
    public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }
//构造一个 {@code TerminalOp},用于对流的每个元素执行操作。
//action,接收流所有元素的 {@code Consumer} 
//ordered,是否请求有序遍历,因为是并行流,所以ordered未false
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                                  boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
    }
//使用终端操作评估管道以产生结果。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

三、核心原理-ForkJoinPool

其核心原理则-ForkJoinPool

1)Diagrams

在这里插入图片描述

2)compute()

ForkJoin进行计算任务时,计算类是要继承ForkJoinTask并且重写compute方法的。
我们看一下ForkJoinTask内部类是如何重写compute()方法的。

//类似于 AbstractTask,但不需要跟踪子任务
public void compute() {
            Spliterator<S> rightSplit = spliterator, leftSplit;
            //先调用当前splititerator 方法的estimateSize 方法,预估这个分片中的数据量
            long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
            if ((sizeThreshold = targetSize) == 0L)
                targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
            boolean forkRight = false;
            Sink<S> taskSink = sink;
            ForEachTask<S, T> task = this;
            while (!isShortCircuit || !taskSink.cancellationRequested()) {
                if (sizeEstimate <= sizeThreshold ||
                    (leftSplit = rightSplit.trySplit()) == null) {
                    task.helper.copyInto(taskSink, rightSplit);
                    break;
                }
                ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
                task.addToPendingCount(1);
                ForEachTask<S, T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    taskToFork = task;
                    task = leftTask;
                }
                else {
                    forkRight = true;
                    taskToFork = leftTask;
                }
                //根据预估的数据量获取最小处理单元的大小阈值,即当数据量已经小于这个阈值的时候进行计算,否则进行fork.fork方法中调用了ForkJoinPool线程池并行计算
                taskToFork.fork();
                //将任务划分成更小的数据块,进行求解
                sizeEstimate = rightSplit.estimateSize();
            }
            task.spliterator = null;
            task.propagateCompletion();
        }

重写的compute()方法,当进行fork方法时,实际就是调用了ForkJoinPool线程池进行计算了,那么线程池本身是无顺序的,谁先计算完谁展示。

3)ForkJoinPool核心算法

在这里插入图片描述

“工作窃取”(work-stealing)算法

ForkJoinPool的基本原理是基于“工作窃取”(work-stealing)算法。它维护着一个工作队列(WorkQueue)的数组,每个工作队列对应一个工作线程(WorkerThread)。当一个线程需要执行一个任务时,它会将任务添加到自己的工作队列中。当一个线程的工作队列为空时,它会从其他线程的工作队列中“窃取”一个任务来执行。这个“窃取”操作可以在不同的线程间实现任务的负载均衡。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

“分治法”(Divide-and-Conquer Algorithm)

分治法-典型的应用比如快速排序算法。使用分治法来实现任务的并行执行。分治法是一种将大问题划分成小问题,并通过递归地解决小问题来解决大问题的方法。

四、结果无顺序

在这里插入图片描述

1)若想并行且有顺序,用.forEachOrdered替代

在这里插入图片描述

2).forEachOrdered是如何保证有序的?

private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) {
            Spliterator<S> rightSplit = task.spliterator, leftSplit;
            long sizeThreshold = task.targetSize;
            boolean forkRight = false;
            while (rightSplit.estimateSize() > sizeThreshold &&
                   (leftSplit = rightSplit.trySplit()) != null) {
                ForEachOrderedTask<S, T> leftChild =
                    new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
                ForEachOrderedTask<S, T> rightChild =
                    new ForEachOrderedTask<>(task, rightSplit, leftChild);

                // 分叉父任务 完成左右子项 “发生在”父项完成之前
                task.addToPendingCount(1);
                // 完成左边的孩子“发生在”完成右边的孩子之前
                rightChild.addToPendingCount(1);
                task.completionMap.put(leftChild, rightChild);

                // If task is not on the left spine
                if (task.leftPredecessor != null) {
                    /*
                     * Completion of left-predecessor, or left subtree,
                     * "happens-before" completion of left-most leaf node of
                     * right subtree.
                     * The left child's pending count needs to be updated before
                     * it is associated in the completion map, otherwise the
                     * left child can complete prematurely and violate the
                     * "happens-before" constraint.
                     */
                    leftChild.addToPendingCount(1);
                    // Update association of left-predecessor to left-most
                    // leaf node of right subtree
                    if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) {
                        // If replaced, adjust the pending count of the parent
                        // to complete when its children complete
                        task.addToPendingCount(-1);
                    } else {
                        // Left-predecessor has already completed, parent's
                        // pending count is adjusted by left-predecessor;
                        // left child is ready to complete
                        leftChild.addToPendingCount(-1);
                    }
                }

                ForEachOrderedTask<S, T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    task = leftChild;
                    taskToFork = rightChild;
                }
                else {
                    forkRight = true;
                    task = rightChild;
                    taskToFork = leftChild;
                }
                taskToFork.fork();
            }

            /*
             * Task's pending count is either 0 or 1.  If 1 then the completion
             * map will contain a value that is task, and two calls to
             * tryComplete are required for completion, one below and one
             * triggered by the completion of task's left-predecessor in
             * onCompletion.  Therefore there is no data race within the if
             * block.
             */
            if (task.getPendingCount() > 0) {
                // Cannot complete just yet so buffer elements into a Node
                // for use when completion occurs
                @SuppressWarnings("unchecked")
                IntFunction<T[]> generator = size -> (T[]) new Object[size];
                Node.Builder<T> nb = task.helper.makeNodeBuilder(
                        task.helper.exactOutputSizeIfKnown(rightSplit),
                        generator);
                task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
                task.spliterator = null;
            }
            task.tryComplete();
        }

看代码会大概得理解,此采用了“happens-before”原则,左子树需右子树之前完成,通过计数策略保证前后顺序的完成,继而保证了其有序性。最终执行依旧是ForkJoinPool线程池执行。
在这里插入图片描述

五、应用场景

1)并行的前提是需要硬件支持

前提是硬件支持, 如果单核 CPU, 只会存在并发处理, 而不会并行

2)demo 测试性能(本机测试)

测试配置:16 GB 2667 MHz DDR4

@Test
    public void dateTest() {
        System.out.println("数据汇总开始");
        Long startTime = System.currentTimeMillis();
        int count1 = adminTaskReceiveService.receiveCount();
        int count2 = adminTaskReceiveService.inspectCount();
        int count3=adminTaskReceiveService.constructCount();
        int count4=adminTaskReceiveService.appointmentCount();
        TestResult testResult = new TestResult();
        testResult.setReceiveCount(count1);
        testResult.setInspectCount(count2);
        testResult.setConstructCount(count3);
        testResult.setAppointmentCount(count4);

        int count11 = adminTaskReceiveService.receiveCount1();
        int count22 = adminTaskReceiveService.inspectCount1();
        int count33=adminTaskReceiveService.constructCount1();
        int count44=adminTaskReceiveService.appointmentCount1();
        testResult.setReceiveCount1(count11);
        testResult.setInspectCount1(count22);
        testResult.setConstructCount1(count33);
        testResult.setAppointmentCount1(count44);

        System.out.println("数据汇总结束,result=" + testResult);
        Long endTime = System.currentTimeMillis();
        System.out.println("time=" + (endTime - startTime) + "毫秒");
    }

    @Test
    public void dateTest1() {
        System.out.println("数据汇总开始");
        Long startTime = System.currentTimeMillis();
        TestResult testResult = new TestResult();
        List<Runnable> taskList = new ArrayList<Runnable>() {
            {
                add(() -> testResult.setReceiveCount(adminTaskReceiveService.receiveCount()));
                add(() -> testResult.setInspectCount(adminTaskReceiveService.inspectCount()));
                add(() -> testResult.setConstructCount(adminTaskReceiveService.constructCount()));
                add(() -> testResult.setAppointmentCount(adminTaskReceiveService.appointmentCount()));

                add(() -> testResult.setReceiveCount1(adminTaskReceiveService.receiveCount1()));
                add(() -> testResult.setInspectCount1(adminTaskReceiveService.inspectCount1()));
                add(() -> testResult.setConstructCount1(adminTaskReceiveService.constructCount1()));
                add(() -> testResult.setAppointmentCount1(adminTaskReceiveService.appointmentCount1()));
            }
        };
        taskList.parallelStream().forEach(Runnable::run);
        System.out.println("数据汇总结束,result=" + testResult);
        Long endTime = System.currentTimeMillis();
        System.out.println("time=" + (endTime - startTime) + "毫秒");
    }

一个单线程,一个并行,看结果:测试demo我是需要统计8个数量,由结果可见性能并没什么大区别。
由结果可知:并行处理并不总是能提高性能,特别是当任务规模较小或者任务之间依赖性较强时。此外,在使用并行流时,应该避免使用会修改原始集合的操作,因为这些操作可能会导致不可预测的结果。由于’foreach`操作是终端操作,它会阻塞主线程直到所有元素都被处理完毕,因此即使操作是并行的,它们仍然是按照顺序完成的。
在这里插入图片描述
在这里插入图片描述

3)最后总结

在数据量比较大的情况下,CPU负载本身不是很高,不要求顺序执行的时候,可以使用并行流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1714555.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

小红书引流获客软件,轻松成为爆款达人

在这个信息爆炸的时代&#xff0c;小红书凭借其独特的内容分享社区模式&#xff0c;迅速成为了品牌和个体创业者不可忽视的营销宝地。作为一个集生活方式分享、购物心得、美妆教程、旅行攻略等于一身的平台&#xff0c;小红书聚集了大量追求品质生活的年轻用户群体。对于想要在…

学业辅导导师:文心一言智能体详细介绍和开发

一、前言 本期题目 开发方向&#xff1a;学习成长类 解读&#xff1a; AI技术在学习成长方向的应用正日益增多&#xff0c;本期赛题需围绕该方向开发智能体包括但不限于:作文辅导助手、个性化学习助手、考试助手、各垂类教育内容专家等 二、我的智能体&#xff1a;学业辅导…

CNAS软件测试公司作用分享,如何获取CNAS软件测试报告?

在软件测试行业&#xff0c;CNAS认可和CNAS软件测试公司是不可忽视的关键词。CNAS认可是指中国合格评定国家认可委员会对特定领域组织、机构或公司的能力和资质进行的认可过程。该认可遵循国际标准及相关法律法规&#xff0c;是评定组织或实验室技术能力和专业水平的权威认可&a…

97.网络游戏逆向分析与漏洞攻防-ui界面的设计-通过逆向分析确认角色信息

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 如果看不懂、不知道现在做的什么&#xff0c;那就跟着做完看效果&#xff0c;代码看不懂是正常的&#xff0c;只要会抄就行&#xff0c;抄着抄着就能懂了 内容…

今日选题.

诱导读者点开文章的9引真经&#xff08;二&#xff09; 标题重要么&#xff1f;新媒体、博客文通常在手机上阅读。首先所有的内容不同于纸媒&#xff0c;手机只展现标题&#xff0c;而内容都是折叠。其次读者能像看内容一样看4、5条或者7、8条标题&#xff08;区别于不同的主流…

基于Paraformer的alpha-token强制对齐

1. 基本原理 CIF 作为Parafoemr的核心模块&#xff0c;用于预测字数和生成声学向量&#xff0c;从而实现了单轮非自回归解码。其中字数的预测主要通过encoder输出系数alpha的累计得分&#xff0c;满足通关阈值β1.0即可产生一个token&#xff0c;其中alpha曲线在一定程度上呈现…

gitlab 创建 ssh 和 token

文章目录 一、创建ssh key二、将密钥内容复制到gitlab三、创建token 一、创建ssh key 打开控制台cmd&#xff0c;执行命令 ssh-keygen -t rsa -C xxxxx xxxxx是你自己的邮箱 C:\Users\xx\.ssh 目录下会创建一个名为id_rsa.pub的文件&#xff0c;用记事本打开&#xff0c;并…

哪些公司防泄密软件最受欢迎?2024年防泄密软件排行榜 |

在数字化时代&#xff0c;数据的安全性和保密性已成为企业运营和发展的关键要素。随着技术的不断进步&#xff0c;防泄密软件逐渐成为了企业保护核心数据和知识产权的重要工具。在2024年&#xff0c;市场上涌现出了众多防泄密软件&#xff0c;它们各具特色&#xff0c;为企业的…

Scikit-Learn随机森林回归

Scikit-Learn随机森林回归 1、随机森林1.1、集成学习1.2、Bagging方法1.3、随机森林算法1.4、随机森林的优缺点2、Scikit-Learn随机森林回归2.1、Scikit-Learn随机森林回归API2.2、随机森林回归实践(加州房价预测)1、随机森林 随机森林是一种由决策树构成的集成算法,它在大多…

JAVA-->方法的使用详解

JAVA–>方法的使用详解 1.方法的概念及使用 1.1 什么是方法 : 方法就是一个代码片段. 类似于 C 语言中的 “函数”。 1.2 方法定义 / 方法定义 修饰符 返回值类型 方法名称([参数类型 形参 ...]){方法体代码;[return 返回值]; }判断是否为闰年 public class Method{ //…

为什么工控现场会用到Profinet转Modbus网关设备

一、背景&#xff1a; 工控现场之所以需要使用Profinet转Modbus网关&#xff0c;是因为工控系统中常常存在不同厂家设备之间通讯协议不一致的问题。而Modbus和Profinet分别代表着两种不同的通信协议&#xff0c;Profinet通常用于较新的设备&#xff0c;而Modbus则是比较老的通…

medsam ,数入xml +img, 根据检测框,原图显示分割效果,加上点的减少处理

1、输入每张图片的多个检测框&#xff0c;得到这张图片的sam 分割结果 import numpy as np import matplotlib.pyplot as plt import osjoin os.path.join import torch from segment_anything import sam_model_registry from skimage import io, transform import torch.nn…

透视AI技术:探索折射技术在去衣应用中的奥秘

引言&#xff1a; 随着人工智能技术的飞速发展&#xff0c;其在图像处理和计算机视觉领域的应用日益广泛。其中&#xff0c;AI去衣技术作为一种颇具争议的应用&#xff0c;引发了广泛的讨论和关注。本文将深入探讨折射技术在AI去衣中的应用及其背后的原理。 一、AI去衣技术简介…

AI Agent智能体概述及原理

AI Agent概述 AI Agent旨在理解、分析和响应人类输入&#xff0c;像人类一样执行任务、做出决策并与环境互动。它们可以是遵循预定义规则的简单系统&#xff0c;也可以是根据经验学习和适应的复杂、自主的实体&#xff1b;可以是基于软件的实体&#xff0c;也可以是物理实体。…

深入理解统计学中的最大值与最小值

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、统计学中的基础概念&#xff1a;最大值与最小值 1. 创建数组与数据导入 2. 求解整体数…

重磅发布,2024精选《制造业商业智能BI最佳实践合集 》

在数字时代&#xff0c;中国制造业正面临着前所未有的深刻变革。 商业环境的复杂性与多变性、全球化竞争的激烈程度、消费需求的快速演变&#xff0c;以及新技术的持续进步等多种因素共同推动着制造企业积极加入数字化转型的潮流。 在这个转型的过程中&#xff0c;转型的速度…

yq—2024/5/29—零钱兑换

代码实现&#xff1a; #define min(a, b) ((a) > (b) ? (b) : (a))int coinChange(int *coins, int coinsSize, int amount) {int dp[amount 1];// 初始化for (int i 0; i < amount 1; i) {dp[i] INT32_MAX;}dp[0] 0;// 01背包 -----先遍历物品&#xff0c;再遍历背…

oracle数据回显时候递归实战

太简单的两篇递归循环 orcale 在项目里递归循环实战 先看资产表T_ATOM_ASSET结构 看业务类别表T_ATOM_BUSI_CATEGORY结构 问题出现 页面显示 实际对应的归属业务分类 涉及到oracle递归实战(这里不会如何直接在atomAsset的seelct里面处理递归回显) 直接在实现层看atomAs…

CTF_RE典例

PZCTF Xor 分组异或 0&#xff0c;1&#xff0c;2&#xff0c;3 不变, 4 , 5 &#xff0c;6&#xff0c;7只异或Str[0], 8,9,10,11要先后异或Str[0],Str[1] s [0x50, 0x5a, 0x43, 0x54, 0x16, 0x2b, 0x11, 0xf, 0x3b, 0x63,0x7e, 0x7e, 0x78, 0x2c, 0x16, 0x3a, 0x71, 0x2e…

The First项目报告:一场由社区驱动的去中心化加密冒险—Turbo

2023年3月14日&#xff0c;由OpenAI公司开发自回归语言模型GPT-4发布上线&#xff0c;一时之间引发AI智能领域的轩然大波&#xff0c;同时受到影响的还有加密行业&#xff0c;一众AI代币纷纷出现大幅度拉升。与此同时&#xff0c;一款名为Turbo的Meme代币出现在市场中&#xff…