ForkJoinPool 是什么?
ForkJoinPool 是一个 Java 并发编程框架,用于解决分治算法中的任务拆分、执行、合并等问题,是 Java 7 引入的一个新的工具类。
ForkJoinPool 的基本思想是将一个大任务划分成若干个小任务,然后并行执行这些小任务,最后将它们的结果合并起来得到最终结果。ForkJoinPool 的实现采用了工作窃取算法,即当某个线程完成自己的任务后,会主动从其他线程的任务队列中“窃取”任务执行,以充分利用 CPU 的计算资源,提高程序的并行度和性能。
使用 ForkJoinPool 可以通过简单的几行代码就实现高效的并发任务执行,尤其适合处理递归式的任务。在 Java 8 中,Stream API 内部就是基于 ForkJoinPool 实现的。
什么是RecursiveTask ?
RecursiveTask是Java中Fork/Join框架提供的一个类,用于实现可递归地将一个任务拆分成多个小任务的任务。它是ForkJoinTask的一个子类,表示一个可分解的异步任务,通常用于处理需要分治的问题,如归并排序、快速排序等。
实现RecursiveTask需要重写它的compute()方法,该方法将问题分解成较小的子问题,每个子问题都会被分配到一个线程池中执行,并返回一个子问题的结果。当所有子问题的结果都被收集后,compute()方法会将它们合并成一个完整的结果,这个结果就是该任务的最终结果。
RecursiveTask与普通的ForkJoinTask的区别在于,它的compute()方法返回值是一个泛型类型的结果,而不是void类型。在处理可分解的问题时,RecursiveTask会递归地将问题拆分成多个小问题,并使用fork()方法将它们提交到线程池中执行,然后使用join()方法等待子任务的结果,最后将所有子任务的结果合并成一个大的结果。因此,RecursiveTask常常被用于处理有返回结果的可分解问题,而ForkJoinTask则常常被用于处理无返回结果的可分解问题。
为什么有了ForkJoinPool 还要RecursiveTask?
ForkJoinPool 和 RecursiveTask 是 Java 并发编程中常用的工具,它们都用于实现并行化的任务执行,但它们的应用场景略有不同。
ForkJoinPool 是一个线程池,用于执行可以被拆分为更小任务的大型任务。在一个 ForkJoinPool 中,任务被分成多个子任务并由多个工作线程并行执行。每个子任务都是 ForkJoinTask 类的实例,它们可以是 ForkJoinTask 的子类或者是它的两个子类之一:RecursiveAction 和 RecursiveTask。
RecursiveTask 是 ForkJoinTask 的一个子类,它代表一个可以被拆分为更小任务并返回结果的任务。相比 RecursiveAction,RecursiveTask 可以返回一个结果,适用于需要在任务中返回结果的场景。
虽然 ForkJoinPool 可以执行任何类型的 ForkJoinTask,但是 RecursiveTask 是专门用于返回结果的任务,因此在需要返回结果的任务中应该使用 RecursiveTask 而不是其他类型的 ForkJoinTask。同时,RecursiveTask 中也可以拆分任务,利用 ForkJoinPool 的线程池机制实现更高效的并行化任务执行。
RecursiveAction 是 ForkJoinTask 的一个子类,用于代表一个可以被拆分为更小任务但不返回结果的任务。相比 RecursiveTask,RecursiveAction 不会返回一个结果,适用于不需要返回结果的场景。
适用于 RecursiveAction 的任务通常涉及对数据结构的修改或者执行某些副作用,例如对一个列表进行排序或者对一个数组进行求和。这些任务可以被拆分为多个子任务,并由 ForkJoinPool 中的多个工作线程并行执行,以提高任务执行的效率。
ForkJoinTask 的子类或者是它的两个子类之一:RecursiveAction 和 RecursiveTask。区别:
RecursiveAction :RecursiveAction 不会返回一个结果,适用于不需要返回结果的场景。用于代表一个可以被拆分为更小任务但不返回结果的任务。
RecursiveTask:RecursiveTask 可以返回一个结果,适用于需要在任务中返回结果的场景。用于代表一个可以被拆分为更小任务并返回结果的任务。
第一步:定义一个RecursiveTask 子类以重写 compute () 方法
package com.lfsun.main.point.democoncurrent;
import java.util.concurrent.RecursiveTask;
/**
* 计算数组元素和的任务
* <p>
* RecursiveTask 是一个抽象类,它表示一个可以通过递归地将任务拆分为更小的子任务来并行执行的任务。
* <p>
* 具体来说,这个 SumTask 类表示一个计算长整型数列元素和的任务。
* 它需要实现 RecursiveTask 类的 compute() 方法来完成计算任务,并返回计算结果。
* 在实现 compute() 方法时,这个类可能会将大的数列拆分为多个子任务,并通过调用 fork() 方法来提交这些子任务,
* 然后通过调用 join() 方法来等待子任务的完成并合并它们的计算结果。
* <p>
* 在这个 SumTask 类中,递归的拆分过程可能会继续下去,直到数列被拆分成足够小的子问题,
* 这些子问题可以直接计算出其结果,而不需要再进行拆分。
* 这些直接可计算的子问题的计算结果将被累加到最终的计算结果中。
*
* @author Administrator
*/
public class SumTask extends RecursiveTask<Long> {
/**
* 小任务阈值
*/
private static final int THRESHOLD = 5000000;
/**
* 待计算的数组
*/
private final long[] array;
/**
* 待计算的数组下标区间 [low, high]
*/
private final int low;
private final int high;
public SumTask(long[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
/**
* 计算任务
*/
@Override
protected Long compute() {
long sum = 0L;
// 如果阈值设置得过小,会导致过多的任务拆分,而拆分后的子任务过小,从而导致线程调度和任务切换的开销过大,反而降低了程序的效率。
// 如果阈值设置得过大,会导致任务过于庞大,无法充分利用系统资源进行并行计算,也会降低程序的效率。
if (high - low <= THRESHOLD) {
// 如果区间长度小于等于阈值,则直接计算
for (int i = low; i <= high; i++) {
sum += array[i];
}
} else {
// 否则,将任务分解成两个子任务,递归计算
// >> 1 是右移一位,等价于除以2,不过它是有符号右移,也就是如果原来的数是负数,那么移位之后仍然是负数,会在高位补上符号位。
//比如:-3 的二进制表示为 1111 1101,在使用 >> 1 右移一位后变成 1111 1110,对应的十进制数是 -2。
//而无符号右移 >>> 1 则不考虑符号位,直接右移,高位补 0,因此对于正数的结果和有符号右移一致,对于负数则会得到一个很大的正数,因为最高位变成了0。
int mid = (low + high) >>> 1;
SumTask left = new SumTask(array, low, mid);
SumTask right = new SumTask(array, mid + 1, high);
// left.fork() 方法和 right.fork() 方法都是用来提交子任务的,
// 它们的作用是将当前任务分成若干个子任务,并将这些子任务提交到 Fork/Join 框架中进行执行。
// 当一个任务被提交到 Fork/Join 框架中时,框架会根据任务的大小和复杂度来决定是否将任务继续拆分,
// 以及如何分配任务给线程池中的不同线程来执行。
left.fork();
right.fork();
// 使用 join() 方法获取子任务的计算结果,并将这些结果合并成整个任务的计算结果。
// 在 Fork/Join 框架中,join() 方法用于等待子任务完成并返回子任务的计算结果。
// 在执行 sum = left.join() + right.join() 时,程序会阻塞等待子任务的完成,
// 并在所有子任务完成后才会继续执行计算结果的合并。这样,就可以保证整个数列的和的计算是正确的。
sum = left.join() + right.join();
}
return sum;
}
}
第二步:测试
import com.google.common.base.Stopwatch;
import com.lfsun.common.util.MyArraysUtils;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
class SumTaskTest {
@Test
void compute() throws ExecutionException, InterruptedException {
int size = 10000000;
long[] array = MyArraysUtils.genArray(size);
// 创建计时器\StopWatch 是一个计时器类,用于测量代码块的执行时间。
Stopwatch stopWatch = Stopwatch.createStarted();
long sumFor = 0;
for (int i = 0; i < size; i++) {
sumFor += array[i];
}
// elapsed() 方法是 StopWatch 类的一个公共方法,用于返回计时器已经过去的时间,以指定的时间单位为单位。
System.out.printf("结果 %s 耗时 %sms%n", sumFor, stopWatch.elapsed(TimeUnit.MILLISECONDS));
// reset() 方法是 StopWatch 类的一个公共方法,用于将计时器重置为初始状态,即清零计时器的计时值和状态。
stopWatch.reset();
// 创建任务
SumTask sumTask = new SumTask(array, 0, size - 1);
// 创建线程池 ForkJoinPool 是一个基于工作窃取算法的线程池实现,用于执行可以被拆分成更小任务并在多个线程中并行执行的任务。
// ForkJoinPool.commonPool() 是一个静态方法,用于获取一个全局共享的 ForkJoinPool 线程池实例,这个线程池实例会被多个任务共享。
// 工作窃取算法:一种用于并行计算的调度算法:让每个线程都有一个私有的任务队列,当线程完成自己的任务时,
// 它可以从其他线程的队列中“窃取”任务并执行。
// 在这种方式下,线程之间的任务负载可以自适应地平衡,从而更好地利用系统资源,提高程序的并行性能。
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
// 提交任务到线程池,ForkJoinPool 线程池会在后台异步地执行这个任务。
forkJoinPool.submit(sumTask);
// 获取结果
Long result = sumTask.get();
System.out.printf("结果 %s 耗时 %sms%n", result, stopWatch.elapsed(TimeUnit.MILLISECONDS));
}
}