线程池ForkJoinPool详解

news2025/1/24 17:46:49

由一道算法题引发的思考

算法题:如何充分利用多核CPU的性能,快速对一个2千万大小的数组进行排序?

这道算法题可以拆解来看:

1)首先这是一道排序的算法题,而且是需要使用高效的排序算法对2千万大小的数组进行排序,可以考虑使用快速排序或者归并排序。

2)可以使用多线程并行排序算法来充分利用多核CPU的性能。

基于归并排序算法实现

快速对一个大小为2千万的数组进行排序,可以使用高效的归并排序算法来实现。

什么是归并排序

归并排序(Merge Sort)是一种基于分治思想的排序算法。归并排序的基本思想是将一个大数组分成两个相等大小的子数组,对每个子数组分别进行排序,然后将两个子数组合并成一个有序的大数组。因为常常使用递归实现(由先拆分后合并的性质决定的),所以我们称其为归并排序。

归并排序的步骤包括以下几个方面:

  • 将数组分成两个子数组
  • 对每个子数组进行排序
  • 合并两个有序的子数组

归并排序的时间复杂度为O(nlogn),空间复杂度为O(n),其中n为数组的长度。

分治思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立且与原问题性质相同。求出子问题的解,就可得到原问题的解。

分治思想的步骤如下:

分解:将要解决的问题划分成若干规模较小的同类问题;

求解:当子问题划分得足够小时,用较简单的方法解决;

合并:按原问题的要求,将子问题的解逐层合并构成原问题的解。

计算机十大经典算法中的归并排序、快速排序、二分查找都是基于分治思想实现的算法。

分治任务模型图如下:

归并排序演示地址:

https://www.cs.usfca.edu/~galles/visualization/ComparisonSort.html

使用归并排序实现上面的算法题

单线程实现归并排序

单线程归并算法的实现,它的基本思路是将序列分成两个部分,分别进行递归排序,然后将排序好的子序列合并起来。

Fork/Join并行归并排序

并行归并排序是一种利用多线程实现的归并排序算法。它的基本思路是将数据分成若干部分,然后在不同线程上对这些部分进行归并排序,最后将排好序的部分合并成有序数组。在多核CPU上,这种算法也能够有效提高排序速度。

可以使用Java的Fork/Join框架来实现归并排序的并行化

代码示例

import java.util.Arrays;
import java.util.concurrent.RecursiveAction;

/**
 * 利用fork-join实现数组排序
 */
public class MergeSortTask extends RecursiveAction {

  // 数组是否继续拆分的阈值,数组长度低于此阈值就不再进行拆分
  private final int threshold;
  // 要排序的数组
  private int[] arrayToSort;

  public MergeSortTask(final int[] arrayToSort, final int threshold) {
    this.arrayToSort = arrayToSort;
    this.threshold = threshold;
  }

  @Override
  protected void compute() {
    // 拆分后的数组长度小于阈值,直接进行排序
    if (arrayToSort.length <= threshold) {
      // 调用jdk提供的排序方法
      Arrays.sort(arrayToSort);
      return;
    }
    // 对数组进行拆分
    int midpoint = arrayToSort.length / 2;
    int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);
    int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length);
    MergeSortTask leftTask = new MergeSortTask(leftArray, threshold);
    MergeSortTask rightTask = new MergeSortTask(rightArray, threshold);
    //提交任务
//    leftTask.fork();
//    rightTask.fork();
//    // 阻塞当前线程,直到获取任务的执行结果
//    leftTask.join();
//    rightTask.join();
    // 调用任务,阻塞当前线程,直到所有子任务执行完成
    invokeAll(leftTask, rightTask);
    // 合并排序结果
    arrayToSort = MergeSort.merge(leftTask.getSortedArray(), rightTask.getSortedArray());
  }

  public int[] getSortedArray() {
    return arrayToSort;
  }
}
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;

public class MergeSort {

  // 要排序的数组
  private final int[] arrayToSort;
  // 拆分的阈值,低于此阈值就不再进行拆分
  private final int threshold;

  public MergeSort(final int[] arrayToSort, final int threshold) {
    this.arrayToSort = arrayToSort;
    this.threshold = threshold;
  }

  /**
   * 排序
   *
   * @return
   */
  public int[] mergeSort() {
    return mergeSort(arrayToSort, threshold);
  }

  public static int[] mergeSort(final int[] arrayToSort, int threshold) {
    // 拆分后的数组长度小于阈值,直接进行排序
    if (arrayToSort.length <= threshold) {
      // 调用jdk提供的排序方法
      Arrays.sort(arrayToSort);
      return arrayToSort;
    }
    int midpoint = arrayToSort.length / 2;
    // 对数组进行拆分
    int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);
    int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length);
    // 递归调用
    leftArray = mergeSort(leftArray, threshold);
    rightArray = mergeSort(rightArray, threshold);
    // 合并排序结果
    return merge(leftArray, rightArray);
  }

  public static int[] merge(final int[] leftArray, final int[] rightArray) {
    // 定义用于合并结果的数组
    int[] mergedArray = new int[leftArray.length + rightArray.length];
    int mergedArrayPos = 0;
    int leftArrayPos = 0;
    int rightArrayPos = 0;
    while (leftArrayPos < leftArray.length && rightArrayPos < rightArray.length) {
      if (leftArray[leftArrayPos] <= rightArray[rightArrayPos]) {
        mergedArray[mergedArrayPos] = leftArray[leftArrayPos];
        leftArrayPos++;
      } else {
        mergedArray[mergedArrayPos] = rightArray[rightArrayPos];
        rightArrayPos++;
      }
      mergedArrayPos++;
    }
    while (leftArrayPos < leftArray.length) {
      mergedArray[mergedArrayPos] = leftArray[leftArrayPos];
      leftArrayPos++;
      mergedArrayPos++;
    }
    while (rightArrayPos < rightArray.length) {
      mergedArray[mergedArrayPos] = rightArray[rightArrayPos];
      rightArrayPos++;
      mergedArrayPos++;
    }
    return mergedArray;
  }

  /**
   * 随机生成数组
   *
   * @param size 数组的大小
   * @return
   */
  public static int[] buildRandomIntArray(final int size) {
    int[] arrayToCalculateSumOf = new int[size];
    Random generator = new Random();
    for (int i = 0; i < arrayToCalculateSumOf.length; i++) {
      arrayToCalculateSumOf[i] = generator.nextInt(100000000);
    }
    return arrayToCalculateSumOf;
  }

  public static void main(String[] args) {
//    int[] arrayToSortByMergeSort = {6, 5, 3, 1, 8, 7, 2, 4};
//    int threshold = 2;
    int[] arrayToSortByMergeSort = buildRandomIntArray(20000000);
    int threshold = 10000;
    MergeSort mergeSort = new MergeSort(arrayToSortByMergeSort, threshold);
    long startTime = System.nanoTime();
    int[] sort = mergeSort.mergeSort();
    long duration = System.nanoTime() - startTime;
//    System.out.println("执行结果:" + Arrays.toString(sort) + ",单线程归并排序时间:" + (duration / (1000f * 1000f)) + "毫秒");
    System.out.println("单线程归并排序时间:" + (duration / (1000f * 1000f)) + "毫秒");

    // 生成测试数组  用于ForkJoin排序
    int[] arrayToSortByForkJoin = Arrays.copyOf(arrayToSortByMergeSort, arrayToSortByMergeSort.length);
    // 获取处理器数量,用于配置ForkJoin线程池中的核心线程数
    int processors = Runtime.getRuntime().availableProcessors();
    // 利用ForkJoin排序
    MergeSortTask mergeSortTask = new MergeSortTask(arrayToSortByForkJoin, threshold);
    // 构建ForkJoin线程池,传入核心线程数
    ForkJoinPool forkJoinPool = new ForkJoinPool(processors);
    startTime = System.nanoTime();
    // 执行排序任务
    forkJoinPool.invoke(mergeSortTask);
    duration = System.nanoTime() - startTime;
    System.out.println("ForkJoin排序时间: " + (duration / (1000f * 1000f)) + "毫秒");
  }
}  

执行结果对比

单线程归并排序时间:3302.956毫秒
ForkJoin排序时间: 1592.1223毫秒

根据测试结果可以看出,数组越大,利用Fork/Join框架实现的并行化归并排序比单线程归并排序的效率更高。 

在这个示例中,使用Fork/Join框架实现了归并排序算法,并通过递归调用实现了并行化。使用Fork/Join框架实现归并排序算法的关键在于将排序任务分解成小的任务,使用Fork/Join框架将这些小任务提交给线程池中的不同线程并行执行,并在最后将排序后的结果进行合并。这样可以充分利用多核CPU的并行处理能力,提高程序的执行效率。 

并行实现归并排序的优化和注意事项

在实际应用中,我们需要考虑数据分布的均匀性、内存使用情况、线程切换开销等因素,以充分利用多核CPU并保证算法的正确性和效率。

以下是并行实现归并排序的一些优化和注意事项:

  • 任务的大小:任务大小的选择会影响并行算法的效率和负载均衡,如果任务太小,会造成任务划分和合并的开销过大;如果任务太大,会导致任务无法充分利用多核CPU并行处理能力。因此,在实际应用中需要根据数据量、CPU核心数等因素选择合适的任务大小。
  • 负载均衡:并行算法需要保证负载均衡,即各个线程执行的任务大小和时间应该尽可能相等,否则会导致某些线程负载过重,而其他线程负载过轻的情况。在归并排序中,可以通过递归调用实现负载均衡,但是需要注意递归的层数不能太深,否则会导致任务划分和合并的开销过大。
  • 数据分布:数据分布的均匀性也会影响并行算法的效率和负载均衡。在归并排序中,如果数据分布不均匀,会导致某些线程处理的数据量过大,而其他线程处理的数据量过小的情况。因此,在实际应用中需要考虑数据的分布情况,尽可能将数据分成大小相等的子数组。
  • 内存使用:并行算法需要考虑内存的使用情况,特别是在处理大规模数据时,内存的使用情况会对算法的执行效率产生重要影响。在归并排序中,可以通过对数据进行原地归并实现内存的节约,但是需要注意归并的实现方式,以避免数据的覆盖和不稳定排序等问题。
  • 线程切换:线程切换是并行算法的一个重要开销,需要尽量减少线程的切换次数,以提高算法的执行效率。在归并排序中,可以通过设置线程池的大小和调整任务大小等方式控制线程的数量和切换开销,以实现算法的最优性能。

Fork/Join框架介绍

什么是Fork/Join

Fork/Join是一个是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的 Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。它的核心思想是将一个大任务分成许多小任务,然后并行执行这些小任务,最终将它们的结果合并成一个大的结果。它适用于可以采用分治策略的计算密集型任务,例如大规模数组的排序、图形的渲染、复杂算法的求解等。

应用场景

1、并行计算:

ForkJoinPool 提供了一种方便的方式来执行大规模的计算任务,并充分利用多核处理器的性能优势。通过将大任务分解成小任务,并通过工作窃取算法实现任务的并行执行,可以提高计算效率。

2、递归任务处理:

ForkJoinPool 特别适用于递归式的任务分解和执行。它可以将一个大任务递归地分解成许多小任务,并通过工作窃取算法动态地将这些小任务分配给工作线程执行。

3、并行流操作:

Java 8 引入了 Stream API,用于对集合进行函数式编程风格的操作。ForkJoinPool 通常用于执行并行流操作中的并行计算部分,例如对流中的元素进行过滤、映射、聚合等操作。

4、高性能任务执行:

ForkJoinPool 提供了一种高性能的任务执行机制,通过对任务进行动态调度和线程池管理,可以有效地利用系统资源,并在多核处理器上实现任务的并行执行。

总的来说,ForkJoinPool 类在 Java 中具有广泛的应用场景,特别适用于大规模的并行计算任务和递归式的任务处理。它通过工作窃取算法和任务分割合并机制,提供了一种高效的并行计算方式,可以显著提高计算效率和性能。

Fork/Join使用

Fork/Join框架的主要组成部分是ForkJoinPool、ForkJoinTask。ForkJoinPool是一个线程池,它用于管理ForkJoin任务的执行。ForkJoinTask是一个抽象类,用于表示可以被分割成更小部分的任务。

ForkJoinPool

ForkJoinPool是Fork/Join框架中的线程池类,它用于管理Fork/Join任务的线程。ForkJoinPool类包括一些重要的方法,例如submit()、invoke()、shutdown()、awaitTermination()等,用于提交任务、执行任务、关闭线程池和等待任务的执行结果。ForkJoinPool类中还包括一些参数,例如线程池的大小、工作线程的优先级、任务队列的容量等,可以根据具体的应用场景进行设置。

构造器

ForkJoinPool中有四个核心参数,用于控制线程池的并行数、工作线程的创建、异常处理和模式指定等。各参数解释如下:

  • int parallelism:指定并行级别(parallelism level)。ForkJoinPool将根据这个设定,决定工作线程的数量。如果未设置的话,将使用Runtime.getRuntime().availableProcessors()来设置并行级别。
  • ForkJoinWorkerThreadFactory factory:ForkJoinPool在创建线程时,会通过factory来创建。注意,这里需要实现的是ForkJoinWorkerThreadFactory,而不是ThreadFactory。如果你不指定factory,那么将由默认的DefaultForkJoinWorkerThreadFactory负责线程的创建工作。
  • UncaughtExceptionHandler handler:指定异常处理器,当任务在运行中出错时,将由设定的处理器处理。
  • boolean asyncMode:设置队列的工作模式。当asyncMode为true时,将使用先进先出队列,而为false时则使用后进先出的模式。
// 获取处理器数量
int processors = Runtime.getRuntime().availableProcessors();
// 构建forkjoin线程池
ForkJoinPool forkJoinPool = new ForkJoinPool(processors);

任务提交方式

任务提交是ForkJoinPool的核心能力之一,提交任务有三种方式:

返回值

方法

提交异步执行

void

execute(ForkJoinTask task)

execute(Runnable task)

等待并获取结果

T

invoke(ForkJoinTask task)

提交执行获取Future结果

ForkJoinTask

submit(ForkJoinTask task)

submit(Callable task)

submit(Runnable task)

submit(Runnable task, T result)

ForkJoinTask

ForkJoinTask是Fork/Join框架中的抽象类,它定义了执行任务的基本接口。用户可以通过继承ForkJoinTask类来实现自己的任务类,并重写其中的compute()方法来定义任务的执行逻辑。通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下三个子类:

  • RecursiveAction:用于递归执行但不需要返回结果的任务。
  • RecursiveTask :用于递归执行需要返回结果的任务。
  • CountedCompleter :在任务完成执行后会触发执行一个自定义的钩子函数

调用方法

ForkJoinTask 最核心的是 fork() 方法和 join() 方法,承载着主要的任务协调作用,一个用于任务提交,一个用于结果获取。

  • fork()——提交任务

fork()方法用于向当前任务所运行的线程池中提交任务。如果当前线程是ForkJoinWorkerThread类型,将会放入该线程的工作队列,否则放入common线程池的工作队列中。

  • join()——获取任务执行结果

join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2263178.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于多尺度动态卷积的图像分类

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

[Linux] 信号保存与处理

&#x1fa90;&#x1fa90;&#x1fa90;欢迎来到程序员餐厅&#x1f4ab;&#x1f4ab;&#x1f4ab; 主厨&#xff1a;邪王真眼 主厨的主页&#xff1a;Chef‘s blog 所属专栏&#xff1a;青果大战linux 总有光环在陨落&#xff0c;总有新星在闪烁 信号的保存 下面的概…

计算机网络-GRE Over IPSec实验

一、概述 前情回顾&#xff1a;上次基于IPsec VPN的主模式进行了基础实验&#xff0c;但是很多高级特性没有涉及&#xff0c;如ike v2、不同传输模式、DPD检测、路由方式引入路由、野蛮模式等等&#xff0c;以后继续学习吧。 前面我们已经学习了GRE可以基于隧道口实现分支互联&…

进网许可认证、交换路由设备检测项目更新25年1月起

实施时间 2025年1月1日起实施 涉及设备范围 核心路由器、边缘路由器、以太网交换机、三层交换机、宽带网络接入服务器&#xff08;BNAS&#xff09; 新增检测依据 GBT41266-2022网络关键设备安全检测方法交换机设备 GBT41267-2022网络关键设备安全技术要求交换机设备 GB/…

用C#(.NET8)开发一个NTP(SNTP)服务

完整源码&#xff0c;附工程下载&#xff0c;工程其实也就下面两个代码。 想在不能上网的服务器局域网中部署一个时间服务NTP&#xff0c;当然系统自带该服务&#xff0c;可以开启&#xff0c;本文只是分享一下该协议报文和能跑的源码。网上作为服务的源码不太常见&#xff0c;…

Connection lease request time out 问题分析

Connection lease request time out 问题分析 问题背景 使用apache的HttpClient&#xff0c;我们知道可以通过setConnectionRequestTimeout()配置从连接池获取链接的超时时间&#xff0c;而Connection lease request time out正是从连接池获取链接超时的报错&#xff0c;这通常…

【课程论文系列实战】:随机对照实验驱动的电商落地页优化

数据与代码见文末 摘要 随机对照试验&#xff08;Randomized Controlled Trial&#xff0c;RCT&#xff09;被认为是因果推断的“金标准”方法。通过随机分配实验参与者至不同组别&#xff0c;确保了组间可比性&#xff0c;RCT能够有效地消除样本选择偏差和混杂变量问题。本文…

UML 建模实验

文章目录 实验一 用例图一、安装并熟悉软件EnterpriseArchitect16二、用例图建模 实验二 类图、包图、对象图类图第一题第二题 包图对象图第一题第二题 实验三 顺序图、通信图顺序图银行系统学生指纹考勤系统饮料自动销售系统“买到饮料”“饮料已售完”“无法找零”完整版 通信…

高质量翻译如何影响软件用户体验 (UX)

在软件开发领域&#xff0c;用户体验 (UX) 是决定产品成败的关键因素之一。一个流畅、吸引人且直观的用户体验可以决定一款软件的成功与否。在影响优秀用户体验的众多因素中&#xff0c;高质量翻译尤为重要&#xff0c;尤其是在当今全球化的市场环境中。确保软件为不同语言和文…

ArcGIS Pro 3.4新功能2:Spatial Analyst新特性,密度、距离、水文、太阳能、表面、区域分析

Spatial Analyst 扩展模块在 ArcGIS Pro 3.4 中引入了新功能和增强功能。此版本为您提供了用于表面和区域分析的新工具以及改进的密度和距离分析功能&#xff0c;多种用于水文分析的工具性能的提高&#xff0c;一些新的太阳能分析功能。 目录 1.密度分析 2.距离分析 3.水文…

Linux C 程序 【05】异步写文件

1.开发背景 Linux 系统提供了各种外设的控制方式&#xff0c;其中包括文件的读写&#xff0c;存储文件的介质可以是 SSD 固态硬盘或者是 EMMC 等。 其中常用的写文件方式是同步写操作&#xff0c;但是如果是写大文件会对 CPU 造成比较大的负荷&#xff0c;采用异步写的方式比较…

凯酷全科技抖音电商服务的卓越践行者

在数字经济蓬勃发展的今天&#xff0c;电子商务已成为企业增长的新引擎。随着短视频平台的崛起&#xff0c;抖音作为全球领先的短视频社交平台&#xff0c;不仅改变了人们的娱乐方式&#xff0c;也为品牌和商家提供了全新的营销渠道。厦门凯酷全科技有限公司&#xff08;以下简…

精准提升:从94.5%到99.4%——目标检测调优全纪录

&#x1f680; 目标检测模型调优过程记录 在进行目标检测模型的训练过程中&#xff0c;我们面对了许多挑战与迭代。从初始模型的训练结果到最终的调优优化&#xff0c;每一步的实验和调整都有其独特的思路和收获。本文记录了我在优化目标检测模型的过程中进行的几次尝试&#…

STM8单片机学习笔记·GPIO的片上外设寄存器

目录 前言 IC基本定义 三极管基础知识 单片机引脚电路作用 STM8GPIO工作模式 GPIO外设寄存器 寄存器含义用法 CR1&#xff1a;Control Register 1 CR2&#xff1a;Control Register 2 ODR&#xff1a;Output Data Register IDR&#xff1a;Input Data Register 赋值…

国标GB28181平台EasyGBS在安防视频监控中的信号传输(电源/视频/音频)特性及差异

在现代安防视频监控系统中&#xff0c;国标GB28181协议作为公共安全视频监控联网系统的国家标准&#xff0c;该协议不仅规范了视频监控系统的信息传输、交换和控制技术要求&#xff0c;还为不同厂商设备之间的互联互通提供了统一的框架。EasyGBS平台基于GB28181协议&#xff0c…

如何使用checkBox组件实现复选框

文章目录 概念介绍使用方法示例代码我们在上一章回中介绍了DatePickerDialog Widget相关的内容,本章回中将介绍Checkbox Widget.闲话休提,让我们一起Talk Flutter吧。 概念介绍 我们在这里说的Checkbox也是叫复选框,没有选中时是一个正方形边框,边框内容是空白的,选中时会…

基于“2+1 链动模式商城小程序”的微商服务营销策略探究

摘要&#xff1a;本文探讨在竞争激烈的市场经济与移动互联网时代背景下&#xff0c;微商面临的机遇与挑战。着重分析“21 链动模式商城小程序”如何助力微商改变思路&#xff0c;通过重视服务、提升服务质量&#xff0c;以服务营销放大利润&#xff0c;实现从传统微商模式向更具…

1-1 STM32-0.96寸OLED显示与控制

1.0 模块原理图 2.0 0.96OLED简介 资料下载&#xff1a;https://jiangxiekeji.com/download.html 程序介绍&#xff1a;https://jiangxiekeji.com/tutorial/oled.html SSD1306是一款OLED/PLED点阵显示屏的控制器&#xff0c;可以嵌入在屏幕中&#xff0c;用于执行接收数据、显…

在Visual Studio 2022中配置C++计算机视觉库Opencv

本文主要介绍下载OpenCV库以及在Visual Studio 2022中配置、编译C计算机视觉库OpenCv的方法 1.Opencv库安装 ​ 首先&#xff0c;我们需要安装OpenCV库&#xff0c;作为一个开源库&#xff0c;我们可以直接在其官网下载Releases - OpenCV&#xff0c;如果官网下载过慢&#x…

QT:QDEBUG输出重定向和命令行参数QCommandLineParser

qInstallMessageHandler函数简介 QtMessageHandler qInstallMessageHandler(QtMessageHandler handler) qInstallMessageHandler 是 Qt 框架中的一个函数&#xff0c;用于安装一个全局的消息处理函数&#xff0c;以替代默认的消息输出机制。这个函数允许开发者自定义 Qt 应用…