一、前言
前文介绍了线程的异步编排工具类 CompletableFuture 的使用,使用它能够很好的完成线程任务的编排工作,但同时,我们也注意到,其使用的默认线程池是 ForkJoinPool.commonPool() 的方法。则这个线程池是共用的,而为了业务之间互不影响,比如 A 业务秒杀并发量大,占用了大多数的线程,那 B 业务再使用这个线程池的话,就无法很好的推进下去。
观其源码也知在并行流时用到了 ForkJoinPool 的公共线程池。ForkJoinPool 是专门设计用于 Fork/Join 的线程池,什么是 Fork/Join 呢?这个 ForkJoinPool 这个线程池和我们学的 ThreadPoolExcutor 有啥区别呢?下图为ForkJoinPool 与我们熟知的线程池间的继承关系。下文,我们将解答这些疑惑,并学习相关概念并上手使用。
二、ForkJoinPool
2.1 概述
首先,查看该类介绍,该类是 Doug Lea 在JDK1.7版本中添加的新的线程池。Fork为分叉,Join 为连接,而其设计的思想也确实是先分开处理后合并处理的工作原理。核心思想就是把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务的结果。
Fork:把一个复杂任务进行分拆,大事化小
Join:把分拆任务的结果进行合并
复制代码
这个思想就很像 Hadoop 中 MapReduce 的工作原理,都是先组装成各个 Map ,然后在 Reduce 中完成合并。 ForkJoinPool 也是使用了分而治之的算法,用相对较少的线程处理大量的任务,将大任务一拆为二,以此类推,每个子任务再分成两半,知道达到阈值(自己设定)然后组成 N 个具有父子层级关系的任务。这时候 ForkJoinPool 就会安排合理的工作线程,例如 8个线程,让它们不断得去干活,先把它们的最下层的小任务干完,接着再去干小任务的父任务,一层层直到任务结束。
由此,可知既然它擅长干一些能拆分成父子任务的工作。所以例如快速排序、二叉查找等任务。最适合的是计算密集型的任务,比如是数据量很大的一个统计,如果是存在 IO交互,线程间同步,那就不太适合了。
2.2 累加案例
同样是线程池,为什么还要加个 ForkJoinPool 呢?你与我们老牌线程池有什么区别?就考虑一个场景,如果要计算 1-10000000 的复杂任务计算,如果使用一个原有的线程池 ThreadPool ,它只能有一个线程上场,而线程池里的其他线程就在那干瞪眼,这时候,为了进一步解决此类场景的问题,ForkJoinPool 提出了,下面我们通过一个计算案例简单了解下其是如何使用的。
public class ForkJoinPoolDemo {
public static void main(String[] args) {
singleDeal(1,100000000L);
System.out.println("==============");
ForkJoinDeal(1,100000000L);
}
private static void singleDeal(int start,long end) {
long startTime = System.currentTimeMillis();
long sum=0;
for (int i = start; i <= end; i++) {
// 累加
sum +=i;
}
System.out.println(sum);
System.out.println("单线程消耗的时间:"+(System.currentTimeMillis() - startTime));
}
private static void ForkJoinDeal(int start,long end) {
long startTime = System.currentTimeMillis();
//定义任务
TaskExample taskExample = new TaskExample(start, end);
//定义执行对象
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
//加入任务执行
ForkJoinTask<Long> result = forkJoinPool.submit(taskExample);
//输出结果
try {
System.out.println(result.get());
}catch (Exception e){
e.printStackTrace(
);
}finally {
forkJoinPool.shutdown();
System.out.println("ForkJoin 消耗的时间:"+(System.currentTimeMillis() - startTime));
}
}
}
复制代码
@Slf4j
public class TaskExample extends RecursiveTask<Long> {
private long start;
private long end;
private long sum;
/**
* 构造函数
* @param start
* @param end
*/
public TaskExample(long start, long end){
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 大于 100 个数相加切分,小于直接加
if(end -start <10000){
for (long i = start; i <= end; i++) {
// 累加
sum +=i;
}
}else{
// 切分为 2 块
long middle = (start +end)/2;
// 递归调用,切分为 2 个小任务
TaskExample taskExample = new TaskExample(start, middle);
TaskExample taskExample2 = new TaskExample(middle+1,end);
// 执行:异步
taskExample.fork();
taskExample2.fork();
// 同步阻塞获取执行结果
sum = taskExample.join() + taskExample2.join();
}
return sum;
}
}
复制代码
观察上述代码,使用 4个线程,去计算任务,将一个大的计算任务拆分为 0—10000 、10000-20000依次类推,然后最终计算结果被依次合并,得到最终的结果。
也看到 ForkJoin 比使用单线程计算会慢很多,但如果改变任务的切分粒度,比如提到10w 一个计算任务,那效率就会提示一些,所以,ForkJoin 在面对一些复杂的计算型任务时可以考虑,一般情况下也用不上。
2.3 ThreadPool 与 ForkJoinPool
事实上,它更多的是原有线程池的一个补充,在特定的场景下,使用它会更合适。下面对两者进行一个简单对比:
-
应用场景:
- ThreadPool :常用于线程并发,阻塞延时较长的任务,这张任务一般要求线程个数较多。
- ForkJoinPool:用于大任务可分解成小任务的情况下,一般是处理计算密集型任务。
-
基本原理对比:
- ThreadPool:所有线程共用一个队列,然后这些线程排着队去干活,来一个任务,派一个线程去,避免了线程的创建和销毁的开销。
- ForkJoinPool: 每个线程都是一个队列,可以用极少的线程干非常多的父子任务,然后将每个任务的结果合并进而得出最终结果。
工作窃取机制:
ForkJoinPool 提供了一个有效利用线程池机制的方法,就是当一个任务执行完成后,就是自动从队列尾部获取新的任务去执行,这样在任务量很大的时候,CPU 多的计算机会表现出很好的性能。
如图中的 A、B线程,分别从队列中读取任务,然后放入(push)进自己的队列中,同时也取出(pop)自己队列的任务进行消费,那为什么又放又取呢?不直接取最右边的任务?这主要是设计者为了CPU缓存的命中率,然后B 线程空闲时,可以偷取(poll) A 线程的未执行的任务。提高 线程工作效率。
三个API 方法:
方法名 | 说明 |
---|---|
invoke(ForkJoinTask) | 提交任务并一直阻塞直到任务执行完成返回合并结果 |
execute(ForkJoinTask) | 异步执行任务,无返回值。 |
submit(ForkJoinTask) | 异步执行任务,返回task本身,可以通过task.get()方法获取合并之后的结果。 |
2.4 ForkJoinTask
上面的计算案例中,我们创建出一个类,继承了 RecursiveTask ,然后交给 ForkJoinPool 提交任务。因为,ForkJoinPool 只会处理 ForkJoinTask 的任务。而我们用的 RecursiveTask 是其重要的实现类
我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务。该类提供了在任务中执行 fork 和 join 的机制。通常情况下我们不需要直接集成 ForkJoinTask 类,只需要继承它的子类,Fork/Join 框架提供了几个子类:
-
RecursiveAction:用于没有返回结果的任务
-
RecursiveTask : 用于有返回结果的任务
-
CountedCompleter : 无返回结果,完成任务后可以触发回调
FrokJoinTask 提供了两个重要的方法:
- fork:让 task 异步执行
- join:让 task 同步执行,可以获取返回值
ForkJoinPool 由 ForkJoinTask 数组和 ForkJoinWorkerThread 数组组成,ForkJoinTask 数组负责将存放以及将程序提交给 ForkJoinPool,而 ForkJoinWorkerThread 负责执行这些任务。
三、小结
对于 ForkJoinPool 相关的知识点需要记住以下几点:
- CompletableFuture 在使用并行流计算的时候会调用 ForkJoinPool 的commonPool 方法,但这个方法可能会被很多任务共同使用,所以要谨慎使用,使用自己创建的线程池。
- ForkJoinPool 是针对计算密集型的任务比较适合,如果数据量不大,没有使用 ForkJoinPool的必要,单线程也许更快。
- ForkJoinPool 的执行效率与任务分片的粒度和线程数和数据量都有关联,需要仔细评估使用
分类:
后端
标签:
后端Java面试