Java8并行流---并行数据处理与性能
- 0.主要内容
- 1.并行流
- 1.1将顺序流转换为并行流
- 附录
- 附录.1
0.主要内容
主要内容
用并行流并行处理数据
并行流的性能分析
分支/合并框架
使用Spliterator分割流
1.并行流
调用parallelStream方法来把集合转换为并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样一来,你就可以自动把给定操作的工作负荷分配给多核处理器的所有内核,让它们都忙起来。让我们用一个简单的例子来试验一下这个思想。
假设你需要写一个方法,接受数字n作为参数,并返回从1到给定参数的所有数字的和。一个直接(也许有点土)的方法是生成一个无穷大的数字流,把它限制到给定的数目,然后用对两个数字求和的BinaryOperator来归约这个流,如下所示:
这似乎是利用并行处理的好机会,特别是n很大的时候。那怎么入手呢?你要对结果变量进
行同步吗?用多少个线程呢?谁负责生成数呢?谁来做加法呢?
根本用不着担心啦。用并行流的话,这问题就简单多了!
1.1将顺序流转换为并行流
你可以把流转换成并行流,从而让前面的函数归约过程(也就是求和)并行运行——对顺序
流调用parallel方法:
Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
请注意,在现实中,对顺序流调用parallel方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个boolean标志,表示你想让调用parallel之后进行的所有操作都并行执行。类似地,你只需要对并行流调用sequential方法就可以把它变成顺序流。请注意,你可能以为把这两个方法结合起来,就可以更细化地控制在遍历流时哪些操作要并行执行,哪些要顺序执行。例如,你可以这样做:
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.reduce();
但最后一次parallel或sequential调用会影响整个流水线。在本例中,流水线会并行执行,因为最后调用的是它。
配置并行流使用的线程池
看看流的parallel方法,你可能会想,并行流用的线程是从哪儿来的?有多少个?怎么自定义这个过程呢?
并行流内部使用了默认的ForkJoinPool,它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().availableProcessors()得到的。
但是你可以通过系统属性java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池大小,如下所示:
System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”,“12”);
这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则我们强烈建议你不要修改它。
但是使用并行流并不是会绝对的比循序流快,必须要使用更有针对性的方法
附录
附录.1
参考书籍《Java8实战》 百度网盘