1 分治思想
分治思想:规模为N的问题分解为K个规模的子问题,子问题相互独立且与原问题性质相同,求出子问题的解,就能得到原问题的解
分治思想的步骤:
分解
求解
合并
2 Fork/Join
2.1 介绍
并行计算框架,用来支持分治任务模型的,Fork对应的是分治任务模型里的任务分解,Join对应的是结果合并
2.2 应用场景
1 递归分解型任务
排序、归并、遍历等,通常可以将大的任务分解成若干子任务
2 数组处理
大型数组的排序、查找、统计等,拆成若干子数组,并行地处理每个子数组,最后合并成一个大的有序数组
3 并行化算法
并行化图像处理算法、并行化机器学习算法等,将任务拆分成若干子问题
4 大数据处理
大型日志文件处理、大型数据库查询等,将数据分成若干分片,并行处理每个分片
2.3 使用
主要组成:ForkJoinPool、ForkJoinTask
ForkJoinPool:用于管理任务的执行
ForkJoinTask:任务可以被分为得更小
使用步骤:
1 构建一个任务,需要继承RecursiveAction(无返回值)或RecursiveTask(有返回值),重写compute()方法来实现任务的执行逻辑,在方法中最后调用invokeAll开始执行任务
2 构建forkJoin线程池,调用forkJoin.invoke()来提交任务
2.3.1 ForkJoinPool
用于管理Fork/Join任务的线程
构造器
int parallelism:指定并行级别,决定工作线程的数量,未设置则使用Runtime.getRuntime().availableProcessors()来设置并行级别
ForkJoinWorkerThreadFactory:在创建线程时,通过该factory创建,未设置使用默认的DefaultForkJoinWorkerThreadFactory负责线程的创建
UncaughtExceptionHandler:指定异常处理器,运气出错时会由设定的处理器处理
asyncMode:队列的工作模式,true=先进先出,false=后进先出
任务提交方式
与普通线程池对比
工作窃取算法:普通线程池采用任务队列实现;FockJoinPool中的线程在执行完任务后,可以从其它线程的队列中获取任务并执行
任务的分解和合并:ForkJoinPool可将一个大任务分解为多个小任务,并行地执行这些小任务,最终将其结果合并;而普通线程只能按提交的任务顺序一个一个地执行
工作线程的数量:ForkJoinPool根据当前系统的CPU核心数来自动设置工作线程的数量,以最大限度地发挥CPU性能优势;普通线程需要手动设置线程池大小,且要考虑其合理性
任务类型:ForkJoinPool适用于执行大规模任务并行化;普通线程池适用于执行一些短小的任务,如处理请求
2.3.2 ForkJoinTask
定义执行任务的基本接口
通过继承ForkJoinTask类来实现自己的任务类,重写其中的compute()方法来定义任务的执行逻辑,实现时只需继承其子类:
RecursiveAction:递归执行但不需要返回结果
RecursiveTask:递归执行需要返回的结果
CountedCompleter:任务完成执行后,触发的自定义钩子函数
调用方法
fork() ---- 提交任务
向当前任务所运行的线程池中提交任务;当前线程是ForkJoinWorkerThread类型,会放入该线程的工作队列,否则放入common线程池的工作队列中
join() ---- 获取任务执行结果
用于获取任务的执行结果;调用方法时,会阻塞当前线程直到对应的子任务完成运行并返回结果
2.3.3 处理递归任务
public class Fibonacci extends RecursiveTask<Integer> {
final int n;
public Fibonacci(int n) {
this.n = n;
}
@Override
protected Integer compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 2);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 1);
return f2.compute() + f1.join();
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
Fibonacci fibonacci = new Fibonacci(100);
Integer result = pool.invoke(fibonacci);
System.out.println(result);
}
}
上述代码存在的问题:会导致程序运行时间长,递归深度过大时栈溢出等
在使用ForkJoinPool处理递归任务时,特别要考虑递归深度和任务粒度,避免调度带来的内存消耗