parallelStream
parallelStream
是一种并行流, 意为处理任务时并行处理。
parallelStream
底层使用的是ForkJoinPool
。ForkJoinPool
是一种工作窃取算法线程池,和分治法的概念一致,可以充分利用多 CPU 的优势,把一个任务拆分成多个"小任务", 把多个"小任务"放到多个处理器核心上并行执行; 当多个"小任务"执行完成之后, 再将这些执行结果合并起来
前提是硬件支持, 如果单核 CPU, 只会存在并发处理, 而不会并行
核心概念
调用并行流的API
- stringList.parallelStream()
- stringList.stream().parallel()
- Stream.of(stringList).parallel()
虽然 API 的调用方式不同, 但是底层都是将AbstractPipeline
中的parallel
标识设置为true
public final S parallel() {
sourceStage.parallel = true;
return (S) this;
}
并行流parallerStream的底层都是使用同一个ForkJoinPool,而ForkJoinPool线程数默认为cpu的核心数-1
// 查看内核的可用核数
Runtime.getRuntime().availableProcessors()
// ForkJoinPool线程数
ForkJoinPool.commonPool().getParallelism()
修改默认的线程数量
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16");
不建议直接修改commonPool线程数,自行创建一个ForkJoinPool更优
注意点
- parallelStream线程不安全问题(加锁/使用线程安全的集合(如
ConcurrentHashMap
)/集合采用collect()/reduce()
操作) - parallelStream 适用的场景是计算密集型的,假如服务器CPU的负载很大,那并不能起到并行计算的作用(尽量不要在paralelSreram操作中使用IO流)
- 不要在多线程中使用parallelStream,原因同上,当线程都在争抢CPU时不但没有提升效果,反而还会加大线程切换的开销
ForkJoinPool
ForkJoinPool
的每个ForkJoinWorkerThread
下都维护着一个工作队列(WorkQueue
),这是一个双端队列,里面存放的对象是任务ForkJoinTask
ForkJoinWorkerThread
在运行中产生新的任务(通常是因为调用了fork()
)时,会放入工作队列的队尾,并且会在队尾取出任务(LIFO
)- ForkJoinWorkerThread在处理自己的工作队列同时,会尝试窃取
steal
一个任务(来自于刚刚提交到 pool的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首(FIFO
) - 在遇到
join()
时,如果需要join
的任务尚未完成,则会先处理其他任务,直到目标的任务方法被告知已经结束(isDone()
),所有的任务都是无阻塞的完成
自定义ForkJoinPool并行流
public void mission() {
try {
List<Long> idList = Lists.newArrayList(1L, 2L, 3L);
ForkJoinPool pool = new ForkJoinPool(20);
pool.submit(() -> {
idList.parallelStream().forEach(uid -> {
//do something
});
});
// 等待任务执行完毕
pool.awaitTermination(2, TimeUnit.SECONDS);
log.info("mission done");
} catch (InterruptedException e) {
}
若需要等待所有任务执行完毕,可使用下述方式
- ForkJoinPool#awaitTermination()
- 显式submit任务,使用ForkJoinTask#get()阻塞
- CountDownLatch
public void mission() {
try {
List<Long> idList = Lists.newArrayList(1L, 2L, 3L);
CountDownLatch countDownLatch = new CountDownLatch(1);
ForkJoinPool pool = new ForkJoinPool(20);
pool.submit(() -> {
idList.parallelStream().forEach(uid -> {
//do something
});
countDownLatch.countDown();
});
// 等待任务执行完毕
countDownLatch.await();
log.info("mission done");
} catch (InterruptedException e) {
}
参考资料:
- 谨慎使用 Java8 新特性 parallelStream
- ParallelStream的陷阱
- ForkJoinPool线程池—独门专访