JDK8 新特性之并行的Stream流

news2024/11/27 4:23:41

目录

一:串行的Stream流

二:并行的Stream流

获取并行Stream流的两种方式

小结

三:并行和串行Stream流的效率对比

四:parallelStream线程安全问题

五:parallelStream背后的技术

Fork/Join框架介绍

   Fork/Join原理-分治法

Fork/Join原理-工作窃取算法

Fork/Join案例

小结


一:串行的Stream

   目前我们使用的Stream流是串行的,就是在一个线程上执行。

@Test
public void test0Serial() {
long count = Stream.of(4, 5, 3, 9, 1, 2, 6)
.filter(s -> {
System.out.println(Thread.currentThread() + ", s = " + s);
return true;
})
.count();
System.out.println("count = " + count);
}
效果:
   
Thread[main,5,main], s = 4
Thread[main,5,main], s = 5
Thread[main,5,main], s = 3
Thread[main,5,main], s = 9
Thread[main,5,main], s = 1
Thread[main,5,main], s = 2
Thread[main,5,main], s = 6

二:并行的Stream

           parallelStream 其实就是一个并行执行的流。它通过默认的 ForkJoinPool ,可能提高多线程任务的速度。

获取并行Stream流的两种方式

1. 直接获取并行的流
2. 将串行流转成并行流
@Test
public void testgetParallelStream() {
ArrayList<Integer> list = new ArrayList<>();
// 直接获取并行的流
// Stream<Integer> stream = list.parallelStream();
// 将串行流转成并行流
Stream<Integer> stream = list.stream().parallel();
}
并行操作代码:
@Test
public void test0Parallel() {
long count = Stream.of(4, 5, 3, 9, 1, 2, 6)
.parallel() // 将流转成并发流,Stream处理的时候将才去
.filter(s -> {
System.out.println(Thread.currentThread() + ", s = " + s);
return true;
})
.count();
System.out.println("count = " + count);
}
效果:
@Test
public void test0Parallel() {
long count = Stream.of(4, 5, 3, 9, 1, 2, 6)
.parallel() // 将流转成并发流,Stream处理的时候将才去
.filter(s -> {
System.out.println(Thread.currentThread() + ", s = " + s);
return true;
})
.count();
System.out.println("count = " + count);
}
效果:
Thread[ForkJoinPool.commonPool-worker-13,5,main], s = 3
Thread[ForkJoinPool.commonPool-worker-19,5,main], s = 6
Thread[main,5,main], s = 1
Thread[ForkJoinPool.commonPool-worker-5,5,main], s = 5
Thread[ForkJoinPool.commonPool-worker-23,5,main], s = 4
Thread[ForkJoinPool.commonPool-worker-27,5,main], s = 2
Thread[ForkJoinPool.commonPool-worker-9,5,main], s = 9
count = 7

小结

获取并行流有两种方式:
  •  直接获取并行流: parallelStream()
  •  将串行流转成并行流: parallel()

三:并行和串行Stream流的效率对比

使用 for 循环,串行 Stream 流,并行 Stream 流来对 5 亿个数字求和。看消耗的时间。
public class Demo06 {
private static long times = 50000000000L;
private long start;
@Before
public void init() {
start = System.currentTimeMillis();
}
@After
public void destory() {
long end = System.currentTimeMillis();
System.out.println("消耗时间: " + (end - start));
}
// 测试效率,parallelStream 120
@Test
public void parallelStream() {
System.out.println("serialStream");
LongStream.rangeClosed(0, times)
.parallel()
.reduce(0, Long::sum);
}
// 测试效率,普通Stream 342
@Test
public void serialStream() {
System.out.println("serialStream");
LongStream.rangeClosed(0, times)
.reduce(0, Long::sum);
}
// 测试效率,正常for循环 421
@Test
public void forAdd() {
System.out.println("forAdd");
long result = 0L;
for (long i = 1L; i < times; i++) {
result += i;
}
}
}
            我们可以看到 parallelStream 的效率是最高的。
            Stream 并行处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作。

四:parallelStream线程安全问题

// 并行流注意事项
@Test
public void parallelStreamNotice() {
ArrayList<Integer> list = new ArrayList<Integer>();
for (int i = 0; i < 1000; i++) {
list.add(i);
}
List<Integer> newList = new ArrayList<>();
// 使用并行的流往集合中添加数据
list.parallelStream()
.forEach(s -> {
newList.add(s);
});
System.out.println("newList = " + newList.size());
}

  运行效果:

newList = 903
            我们明明是往集合中添加 1000 个元素,而实际上只有 903 个元素。
            解决方法: 加锁、使用线程安全的集合或者调用 Stream toArray() / collect() 操作就是满足线程安全的了。
  // parallelStream线程安全问题
    @Test
    public void parallelStreamNotice() {
        ArrayList<Integer> list = new ArrayList<>();
        /*IntStream.rangeClosed(1, 1000)
                .parallel()
                .forEach(i -> {
                    list.add(i);
                });
        System.out.println("list = " + list.size());*/

        // 解决parallelStream线程安全问题方案一: 使用同步代码块
        /*Object obj = new Object();
        IntStream.rangeClosed(1, 1000)
                .parallel()
                .forEach(i -> {
                    synchronized (obj) {
                        list.add(i);
                    }
                });*/

        // 解决parallelStream线程安全问题方案二: 使用线程安全的集合
        // Vector<Integer> v = new Vector();
        /*List<Integer> synchronizedList = Collections.synchronizedList(list);
        IntStream.rangeClosed(1, 1000)
                .parallel()
                .forEach(i -> {
                    synchronizedList.add(i);
                });
        System.out.println("list = " + synchronizedList.size());*/

        // 解决parallelStream线程安全问题方案三: 调用Stream流的collect/toArray
        List<Integer> collect = IntStream.rangeClosed(1, 1000)
                .parallel()
                .boxed()
                .collect(Collectors.toList());
        System.out.println("collect.size = " + collect.size());
    }

五:parallelStream背后的技术

Fork/Join框架介绍

             parallelStream 使用的是 Fork/Join 框架。 Fork/Join 框架自 JDK 7 引入。 Fork/Join 框架可以将一个大任务拆分为很多小任务来异步执行。 Fork/Join 框架主要包含三个模块:
1. 线程池: ForkJoinPool
2. 任务对象: ForkJoinTask
3. 执行任务的线程: ForkJoinWorkerThread

   Fork/Join原理-分治法

                ForkJoinPool 主要用来使用分治法 (Divide-and-Conquer Algorithm) 来解决问题。典型的应用比如快速排序算法,ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对 1000 万个数据进行排序,那么会将这个任务分割成两个500 万的排序任务和一个针对这两组 500 万数据的合并任务。以此类推,对于 500 万的数据也会做出同样的分割处 理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10 时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+ 个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

 

Fork/Join原理-工作窃取算法

              Fork/Join 最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的 cpu ,那么如何利用好这个空闲的cpu就成了提高性能的关键,而这里我们要提到的工作窃取( work-stealing )算法就是整个 Fork/Join 框架的核心理念Fork/Join工作窃取( work-stealing )算法是指某个线程从其他队列里窃取任务来执行。

 

            那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来 执行队列里的任务,线程和队列一一对应,比如A 线程负责处理 A 队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行

            工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
            上文中已经提到了在 Java 8 引入了自动并行化的概念。它能够让一部分 Java 代码自动地以并行的方式执行,也就是我们使用了ForkJoinPool ParallelStream

                对于 ForkJoinPool 通用线程池的线程数量,通常使用默认值就可以了,即运行时计算机的处理器数量。可以通过设置系统属性:java.util.concurrent.ForkJoinPool.common.parallelism=N N 为线程数量),来调整 ForkJoinPool 的线程数量,可以尝试调整成不同的参数来观察每次的输出结果。

Fork/Join案例

                   需求:使用 Fork/Join 计算 1-10000 的和,当一个任务的计算数量大于 3000 时拆分任务,数量小于 3000 时计算。

mport java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class Demo08ForkJoin {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        ForkJoinPool pool = new ForkJoinPool();
        SumRecursiveTask task = new SumRecursiveTask(1, 99999999999L);
        Long result = pool.invoke(task);
        System.out.println("result = " + result);
        long end = System.currentTimeMillis();
        System.out.println("消耗时间: " + (end - start));
    }
}

// 1.创建一个求和的任务
// RecursiveTask: 一个任务
class SumRecursiveTask extends RecursiveTask<Long> {
    // 是否要拆分的临界值
    private static final long THRESHOLD = 3000L;
    // 起始值
    private final long start;
    // 结束值
    private final long end;

    public SumRecursiveTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long length = end - start;
        if (length < THRESHOLD) {
            // 计算
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            // 拆分
            long middle = (start + end) / 2;
            SumRecursiveTask left = new SumRecursiveTask(start, middle);
            left.fork();
            SumRecursiveTask right = new SumRecursiveTask(middle + 1, end);
            right.fork();

            return left.join() + right.join();
        }
    }
}

 

小结

1. parallelStream 是线程不安全的
2. parallelStream 适用的场景是 CPU 密集型的,只是做到别浪费 CPU ,假如本身电脑 CPU 的负载很大,那还到处用并行流,那并不能起到作用
3. I/O 密集型 磁盘 I/O 、网络 I/O 都属于 I/O 操作,这部分操作是较少消耗 CPU 资源,一般并行流中不适用于 I/O 密集型的操作,就比如使用并流行进行大批量的消息推送,涉及到了大量I/O ,使用并行流反而慢了很多
4. 在使用并行流的时候是无法保证元素的顺序的,也就是即使你用了同步集合也只能保证元素都正确但无法保证其中的顺序

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/177621.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RK3399平台开发系列讲解(内存篇)访问虚拟内存的物理内存过程

🚀返回专栏总目录 文章目录 一、虚拟地址的表示二、虚拟地址到物理地址的转换三、Linux页表沉淀、分享、成长,让自己和他人都能有所收获!😄 📢虚拟内存这一概念给进程带来错觉,使它认为内存大到几乎无限,有时甚至超过系统的实际内存。每次访问内存位置时,由CPU完成从…

static_cast,dynamic_cast,const_cast详解

目录 一.static_cast&#xff08;静态转换&#xff09; 二.dynamic_cast&#xff08;动态转换&#xff09; 三.const_cast 一.static_cast&#xff08;静态转换&#xff09; 1.语法&#xff1a; static_cast<new_type>(expression); newtype dataname static_cast…

分享133个ASP源码,总有一款适合您

ASP源码 分享133个ASP源码&#xff0c;总有一款适合您 下面是文件的名字&#xff0c;我放了一些图片&#xff0c;文章里不是所有的图主要是放不下...&#xff0c; 133个ASP源码下载链接&#xff1a;https://pan.baidu.com/s/1l_8UHQkosNF3HHTu8AFq5A?pwdyxvw 提取码&#x…

欧几里得与扩展欧几里得算法(含推导过程及代码)

文章目录前言一、欧几里得算法二、扩展欧几里得算法2.1、认识裴蜀定理2.2、推导axbygcd(a, b)得到x与y2.2.1、推导过程2.2.2、代码实现2.3、推导axbygcd(a, b)的所有解及a或者b的最小值&#xff08;结论验证&#xff09;参考文章前言 在学习Acwing c蓝桥杯辅导课第八讲数论-Ac…

Spark 常用算子02

常用Action算子 1、countByKey算子 功能&#xff1a;统计key出现的次数&#xff08;一般适用于KV型的RDD&#xff09; 用法&#xff1a; result rdd1.countByKey() print(result)代码示例&#xff1a; # coding:utf8from pyspark import SparkConf, SparkContextif __name…

破解五角大楼3.0漏洞赏金计划专注于设施控制系统

国防部正在计划其“黑掉五角大楼”计划的第三次迭代&#xff0c;重点是找出维持标志性建筑和地面运行的操作技术中的漏洞。 国防部于 2016 年启动了黑客入侵五角大楼计划&#xff0c;供应商 HackerOne 协调了该部门公共网站上的漏洞赏金计划。 超过 1,400 名黑客参加了第一轮…

绝对空前!!!互联网史上的最大ddos攻击惊艳登场

美国遭遇史上最大黑客攻击&#xff0c;知名网站全部瘫痪。全世界一半的网络被黑客攻陷&#xff0c;大网站无一幸免。就在&#xff08;10月22日&#xff09;&#xff0c;美国早上我们见证了互联网建立以来的最大ddos攻击&#xff0c;twitter、netflix、paypal、reddit、pinteres…

【MySQL】锁

文章目录基础MyISAM表锁并发插入锁调度策略InnoDB事务并发事务行锁行锁争用情况行锁实现方式恢复和复制对InnoDB锁机制的影响死锁MVCC底层实现和原理悲观锁和乐观锁基础 锁是计算机协调多个进程或线程并发访问某一资源的机制&#xff08;避免争抢&#xff09;。在数据库中&…

一文打通java线程

基本概念&#xff1a;程序、进程、线程 程序(program) 是为完成特定任务、用某种语言编写的一组指令的集合。即指一 段静态的代码&#xff0c;静态对象。 进程(process) 是程序的一次执行过程&#xff0c;或是正在运行的一个程序。是一个动态的过程&#xff1a;有它自身的产…

Linux常用命令——sort命令

在线Linux命令查询工具(http://www.lzltool.com/LinuxCommand) sort 将文件进行排序并输出 补充说明 sort命令是在Linux里非常有用&#xff0c;它将文件进行排序&#xff0c;并将排序结果标准输出。sort命令既可以从特定的文件&#xff0c;也可以从stdin中获取输入。 语法…

文本情感分类TextCNN原理+IMDB数据集实战

1.任务背景 情感分类&#xff1a; 发展历程&#xff1a; 2.数据集 本次使用IMDB数据集进行训练。 3.模型结构 3.1 CNN基础 卷积&#xff1a; 单通道卷积&#xff1a;每组卷积核只包含一个。 单通道输入 单输出&#xff1a;设置一组卷积核。 单通道输入 多输出&#xff1a;…

国企避坑:to B服务性质的业务线不要来!又卷又累,互联网和它比简直是小巫见大巫!...

国企好归好&#xff0c;但不是所有的国企都能闭眼入&#xff0c;一位网友友情提示大家&#xff1a;不管是国企还是央企&#xff0c;to b服务性质的业务线不要来&#xff0c;不要来&#xff0c;不要来&#xff01;又卷又累&#xff0c;苦哈哈&#xff0c;互联网和这个比&#xf…

在CSS世界的权力——权重

在CSS的世界中也存在着权力即CSS权重 1. 概念 CSS权重指的是样式的优先级&#xff0c;有两条或多条样式作用于一个元素&#xff0c;权重高的那条样式对元素起作用&#xff0c;权重相同的&#xff0c;后写的样式会覆盖前面写的样式 2. 以前的BUG 在实际开发中&#xff0c;我…

代码随想录--双指针章节总结

代码随想录–双指针章节总结 1.LeetCode27 移除元素 给你一个数组 nums 和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val 的元素&#xff0c;并返回移除后数组的新长度。 不要使用额外的数组空间&#xff0c;你必须仅使用 O(1) 额外空间并 原地 修改输入数组。 …

C++程序设计——动态内存管理

一、C/C内存分布 1.栈&#xff08;堆栈&#xff09; 存储非静态局部变量、函数参数、返回值等等&#xff0c;栈是向下增长。 2.内存映射段 是高效的I/O映射方式&#xff0c;用于转载一个共享的动态内存库。用户可使用系统接口创建共享内存&#xff0c;做进程间通信。 3.堆 用…

WPS的简单JS宏应用

有一阵子没写博客了&#xff0c;各种琐事忙碌&#xff1b;前段时间接触了下WPS的宏功能&#xff0c;抽点时间写个学习笔记吧。 案例背景简单说一下&#xff0c;主任让我统计OA后台在建工程项目的概况&#xff0c;后台数据导出一张表&#xff0c;再问隔壁经营部的同事要了一张中…

java类的初始化2023018

类的初始化&#xff1a; 第一次使用某个类&#xff0c;例如Person类&#xff0c;系统通常会在第一次使用Person类时加载这个类并初始化这个类。在类的准备阶段&#xff0c;系统将会为该类的类变量分配内存空间&#xff0c;并指定默认初始值。当Person类初始化完成后&#xff0c…

机器学习笔记之深度玻尔兹曼机(二)深度玻尔兹曼机的预训练过程

机器学习笔记之深度玻尔兹曼机——深度玻尔兹曼机的预训练过程引言深度信念网络预训练过程的问题深度玻尔兹曼机的预训练过程(2023/1/24)引言 上一节介绍了玻尔兹曼机系列的相关模型&#xff0c;本节将介绍深度玻尔兹曼机的预训练过程。 深度信念网络预训练过程的问题 在玻尔…

Escher 愛雪磁磚設計法則 - 高雄燕巢深水國小科展指導

“Talk is cheap. Show me the code.” ― Linus Torvalds 老子第41章 上德若谷 大白若辱 大方無隅 大器晚成 大音希聲 大象無形 道隱無名 拳打千遍, 身法自然 “There’s no shortage of remarkable ideas, what’s missing is the will to execute them.” – Seth Godin …

GreenPlum AOCO列存如何将数据刷写磁盘

GreenPlum AOCO列存如何将数据刷写磁盘AOCO列存表每个字段一个文件&#xff0c;前面我们介绍了列存表如何加载数据页&#xff0c;本文我们重点介绍AOCO表如何进行刷写。AOCO表进行insert、update、delete会产生脏数据&#xff0c;和heap表的异步脏页刷写不同&#xff0c;AOCO表…