ForkJoinPool
是一个功能强大的 Java 类,用于处理计算密集型任务,使用 ForkJoinPool
分解计算密集型任务,并并行执行它们,能够产生更好的性能。它的工作原理是将任务分解成更小的子任务,使用分而治之的策略进行操作,使其能够并发地执行任务,从而提高吞吐量并减少处理时间。
ForkJoinPool 的独特特性之一是它用于优化性能的工作窃取算法。当工作线程完成分配的任务时,它将从其他线程窃取任务,确保所有线程都有效地工作,并且不会浪费计算机资源。
ForkJoinPool 在 Java 的并行流和 CompletableFutures
中广泛使用,允许开发人员轻松地并发执行任务。此外,其他 JVM 语言(如 Kotlin和 Akka)也使用这个框架来构建需要高并发性和弹性的消息驱动应用程序。
使用 ForkJoinPool 构建线程池
ForkJoinPool 存储着 worker,这些 worker 是在机器的每个 CPU 核心上运行的进程。这些进程中的每一个都存储在一个双端队列(Deque
)中。一旦工作线程的任务用完,它就开始从其他工作线程窃取任务。
首先,会有分岔任务的过程。这意味着一个大任务将被分解成可以并行执行的小任务。一旦所有子任务完成,它们就会重新加入。最后,ForkJoinPool 类通过 Join 的方式提供一个输出结果,如下图所示。
当任务在 ForkJoinPool 中提交时,该进程将被分成更小的进程并推送到共享队列中。
一旦 fork()
方法被调用,任务将被并行调用,直到基本条件为真。一旦处理被分叉,join()
方法会确保线程相互等待,直到进程完成。
所有任务最初都将提交给一个主队列,这个主队列将把任务推送给 work 线程。同时,与堆栈数据结构相同,任务是使用后进先出(LIFO
)策略插入的,如下图所示。
Work-stealing 窃取算法
Work-stealing 算法是一种用于实现并行计算和负载平衡的策略。它通常用于在分布式系统和多核处理器中,以高效地分配和平衡计算任务。
Work-stealing 算法的优点是它可以实现高效的负载平衡和并行计算,同时减少了任务的等待时间。当一个线程完成自己的任务并变得空闲时,它将尝试从另一个线程的队列末端“窃取”任务,与队列数据结构相同,它遵循 FIFO 策略。这种策略将允许空闲线程拾取等待时间最长的任务,从而减少了总体等待时间并提高了吞吐量。
在下面的图中,线程 2 通过轮询线程 1 的队列中的最后一个元素,从线程 1 窃取一个任务,然后执行该任务。被窃取的任务通常是队列中等待时间最长的的任务,这确保了工作负载在池中的所有线程之间均匀分布。
总的来说,ForkJoinPool 的工作窃取算法是一个强大的功能,可以通过确保所有可用的计算资源得到有效利用来显著提高并行程序的性能。
ForkJoinPool 主类
让我们快速浏览一下支持使用 ForkJoinPool 进行处理的主类。
- ForkJoinPool 创建一个线程池来使用
ForkJoin
:它的工作原理与其他线程池类似。这个类中最重要的方法是commonPool()
,它用于创建了 ForkJoin 线程池。 RecursiveAction
:该类的主要功能是计算递归操作。在compute()
方法中,我们没有返回值,这是因为递归发生在compute()
方法中。- RecursiveTask: 这个类的工作方式类似于
RecursiveAction
,不同之处在于compute()
方法将返回一个值。
使用 RecursiveAction
要使用 RecursiveAction 的功能,我们需要继承它并覆盖它的 compute()
方法。
在下面的代码示例中,我们将以并行和递归的方式计算数组中每个数字的两倍数。
我们看到在代码中,fork()
方法调用 compute()
方法。一旦整个数组得到了每个元素的和,递归调用就停止了。同时,一旦对数组的所有元素进行递归求和,我们就会显示结果。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class ForkJoinDoubleAction {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
int[] array = {1, 5, 10, 15, 20, 25, 50};
DoubleNumber doubleNumberTask = new DoubleNumber(array, 0, array.length);
// 调用compute方法
forkJoinPool.invoke(doubleNumberTask);
System.out.println(DoubleNumber.result);
}
}
class DoubleNumber extends RecursiveAction {
final int PROCESS_THRESHOLD = 2;
int[] array;
int startIndex, endIndex;
static int result;
DoubleNumber(int[] array, int startIndex, int endIndex) {
this.array = array;
this.startIndex = startIndex;
this.endIndex = endIndex;
}
@Override
protected void compute() {
if (endIndex - startIndex <= PROCESS_THRESHOLD) {
for (int i = startIndex; i < endIndex; i++) {
result += array[i] * 2;
}
} else {
int mid = (startIndex + endIndex) / 2;
DoubleNumber leftArray = new DoubleNumber(array, startIndex, mid);
DoubleNumber rightArray = new DoubleNumber(array, mid, endIndex);
// 递归地调用compute方法
leftArray.fork();
rightArray.fork();
// Joins
leftArray.join();
rightArray.join();
}
}
}
计算的结果输出是 252。
从 RecursiveAction 中要记住的重要一点是,它不返回值。还可以通过使用分而治之的策略来分解这个过程,从而提高性能。
同样要注意的是,当将 RecursiveAction 用于可以有效地分解为更小的子问题的任务时,它是最有效的。
因此,RecursiveAction 和 ForkJoinPool 应该用于计算密集型任务,在这些任务中,工作的并行化可以显著提高性能。否则,由于线程的创建和管理,性能会变得更差。
RecursiveTask
在这个示例中,我们使用 RecursiveTask 类看看有没有什么区别。
RecursiveAction 和 RecursiveTask 之间的区别在于,使用 RecursiveTask,我们可以在compute()
方法中返回一个值。
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ForkJoinSumArrayTask extends RecursiveTask<Integer> {
private final List<Integer> numbers;
public ForkJoinSumArrayTask(List<Integer> numbers) {
this.numbers = numbers;
}
@Override
protected Integer compute() {
if (numbers.size() <= 2) {
return numbers.stream().mapToInt(e -> e).sum();
} else {
int mid = numbers.size() / 2;
List<Integer> list1 = numbers.subList(0, mid);
List<Integer> list2 = numbers.subList(mid, numbers.size());
ForkJoinSumArrayTask task1 = new ForkJoinSumArrayTask(list1);
ForkJoinSumArrayTask task2 = new ForkJoinSumArrayTask(list2);
task1.fork();
return task1.join() + task2.compute();
}
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
List<Integer> numbers = List.of(1, 3, 5, 7, 9);
int output = forkJoinPool.invoke(new ForkJoinSumArrayTask(numbers));
System.out.println(output);
}
}
在上面的代码中,我们递归地分解数组,直到它达到基本条件。
一旦我们破坏了主数组,我们就会将 list1 和 list2 发送给 ForkJoinSumArrayTask
,然后我们分叉 task1
,它将并行执行 compute()
方法和数组的其他部分。
一旦递归过程达到基本条件,就会调用 join
方法,将结果连接起来。
最后输出结果为 25。
何时使用 ForkJoinPool
ForkJoinPool 不应该在所有情况下都使用。如前所述,最好将其用于高度密集的并发进程。让我们具体看看这些情况都有哪些:
- 递归任务: ForkJoinPool 非常适合执行递归算法,如快速排序、归并排序或二进制搜索。这些算法可以分解成更小的子问题并并行执行,显著提高性能。
- 并行问题:如果你的问题可以很容易地划分为独立的子任务,例如图像处理或数值模拟,那么可以使用 ForkJoinPool 并行执行子任务。
- 高并发场景:在高并发场景中,例如 web 服务器、数据处理管道或其他高性能应用程序,可以使用 ForkJoinPool 跨多个线程并行执行任务,这有助于提高性能和吞吐量。
结尾
在本文中,我们看到了如何使用最重要的 ForkJoinPool 功能在 CPU 内核中执行繁重的操作。最后让我们来总结本文的要点:
- ForkJoinPool 是一个线程池,它使用分而治之策略递归地执行任务。
- JVM 语言(如Kotlin和Akka)使用
ForkJoinPool
来构建消息驱动型的应用程序。 - ForkJoinPool 并行执行任务,从而有效地利用计算机资源。
- Work-stealing 窃取算法通过允许空闲线程从繁忙线程窃取任务来优化资源利用。
- 任务存储在双端队列中,存储采用后进先出策略,窃取采用先进先出策略。
- ForkJoinPool 框架中的主要类包括 ForkJoinPool、RecursiveAction 和RecursiveTask:
- RecursiveAction 用于计算递归操作,它不返回任何值。
- RecursiveTask 用于计算递归操作,但返回一个值。
- compute() 方法在两个类中被重写以实现自定义逻辑。
- fork() 方法调用 compute() 方法并将任务分解为更小的子任务。
- join() 方法等待子任务完成并合并它们的结果。
- ForkJoinPool 通常与并行流和 CompletableFuture 一起使用。