1、概述
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架。ForkJoinPool是Java中提供了一个线程池,特点是用来执行分治任务。主题思想是将大任务分解为小任务,然后继续将小任务分解,直至能够直接解决为止,然后再依次将任务的结果合并。
2、原理详解
ForkJoinPool是自java7开始,jvm提供的一个用于并行执行的任务框架。其主旨是将大任务分成若干小任务,之后再并行对这些小任务进行计算,最终汇总这些任务的结果。得到最终的结果。其广泛用在java8的stream中。
这个描述实际上比较接近于单机版的map-reduce。都是采用了分治算法,将大的任务拆分到可执行的任务,之后并行执行,最终合并结果集。区别就在于ForkJoin机制可能只能在单个jvm上运行,而map-reduce则是在集群上执行。此外,ForkJoinPool采取工作窃取算法,以避免工作线程由于拆分了任务之后的join等待过程。这样处于空闲的工作线程将从其他工作线程的队列中主动去窃取任务来执行。这里涉及到的两个基本知识点是分治法和工作窃取。
2.1、分治法思想
分治法的基本思想是一个规模为N的问题分解为K个规模较小的子问题,这些子问题的相互独立且与原问题的性质相同,求出子问题的解之后,将这些解合并,就可以得到原有问题的解。是一种分目标完成的程序算法。简单的问题,可以用二分法来完成。
二分法,就是我们之前在检索的时候经常用到的Binary Search 。这样可以迅速将时间复杂度从O(n)降低到O(log n)。那么对应到ForkJoinPool对问题的处理也如此。基本原理如下图:
2.2、工作窃取思想
工作窃取是指当某个线程的任务队列中没有可执行任务的时候,从其他线程的任务队列中窃取任务来执行,以充分利用工作线程的计算能力,减少线程由于获取不到任务而造成的空闲浪费。在ForkJoinpool中,工作任务的队列都采用双端队列Deque容器。我们知道,在通常使用队列的过程中,我们都在队尾插入,而在队头消费以实现FIFO。而为了实现工作窃取。一般我们会改成工作线程在工作队列上LIFO,而窃取其他线程的任务的时候,从队列头部取获取。示意图如下:
3、ForkJoinPool实现
Java提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合成总的计算结果。
ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。ForkJoinPool提供了如下两个常用的构造器。
// 创建一个包含parallelism个并行线程的ForkJoinPool
public ForkJoinPool(int parallelism)
//以Runtime.getRuntime().availableProcessors()的返回值作为parallelism来创建ForkJoinPool
public ForkJoinPool()
创建ForkJoinPool实例后,可以调用ForkJoinPool的submit(ForkJoinTask task) 或者invoke(ForkJoinTask task) 来执行指定任务。其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它有两个抽象子类:RecursiveAction和RecursiveTask。
RecursiveTask代表有返回值的任务
RecursiveAction代表没有返回值的任务。
3.1、RecursiveTask实现
案例:计算1-10000累加之和。
使用RecursiveTask有返回值的任务,编写SumTask
import java.util.concurrent.RecursiveTask;
public class SumTask extends RecursiveTask<Long> {
//达到多少不在分段
private int MIN_SUB = 100;
private int start;
private int end;
/**
* 构造SumTask
*
* @param start 开始累加数
* @param end 结束累加数
*/
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if (end - start < MIN_SUB) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
//拆分逻辑
int middle = (end + start) / 2;
SumTask task1 = new SumTask(start, middle);
SumTask task2 = new SumTask(middle + 1, end);
task1.fork();
task2.fork();
//等到子任务做完
long sum1 = task1.join();
long sum2 = task2.join();
sum = sum1 + sum2;
}
return sum;
}
}
测试类
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
public class ForkJoinPoolTest {
/**
* 测试forkjoinpool
*
* @param args 参数
*/
public static void main(String[] args) {
ForkJoinPool excutor = new ForkJoinPool(10);
SumTask task = new SumTask(1, 10000);
try {
ForkJoinTask<Long> future = excutor.submit(task);
System.out.println("result:" + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}