【并发编程】ForkJoinPool工作原理分析

news2025/1/16 2:03:24

目录

  • 前置内容
  • 课程内容
    • 一、由一道算法题引发的思考
      • 1.算法题
      • 2.什么是归并排序法
    • 二、什么是Fork/Join框架
      • 1.基本介绍
      • 2.ForkJoinPool
      • 2.ForkJoinPool构造函数及参数解读
      • 3.任务提交方式
      • 4.工作原理图
      • 5.工作窃取
      • 6.和普通线程池之间的区别
      • 7.ForkJoinTask
  • 学习总结

前置内容

Q1:在并发编程里面,通常我们遇到的任务类型都有哪些?
答:通常有:计算密集型(CPU密集型)、IO密集型

Q2:它们有什么区别?
答:计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。
IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

课程内容

一、由一道算法题引发的思考

1.算法题

有一道算法题,题目为:【如何充分利用多核CPU的性能,快速对一个2千万大小的数组进行排序?】
说到排序算法,我估计大家多少会有点印象的, 毕竟很多面试都会偶尔问到。比如有:冒泡排序啦,选择排序啦,快速排序等等。但是在这个2KW的体量下,显然是不适用的。或许更有经验一点的朋友们想到了一个办法了,那就是:归并排序法。
是的,这里就是我想引出的一个东西【归并排序法】。

2.什么是归并排序法

归并排序(Merge Sort)是一种基于分治思想的排序算法。归并排序的基本思想是将一个大数组分成
两个相等大小的子数组,对每个子数组分别进行排序,然后将两个子数组合并成一个有序的大数组。
因为常常使用递归实现(由先拆分后合并的性质决定的),所以我们称其为归并排序。
归并排序的步骤包括以下3个:

  • 将数组分成两个子数组
  • 对每个子数组进行排序
  • 合并两个有序的子数组

归并排序的时间复杂度为O(nlogn),空间复杂度为O(n),其中n为数组的长度。
当然,还有一个更学术一点的解释:

分治思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立且与原问题
性质相同
。求出子问题的解,就可得到原问题的解。
分治思想的步骤如下:

  1. 分解:将要解决的问题划分成若干规模较小的同类问题;
  2. 求解:当子问题划分得足够小时,用较简单的方法解决;
  3. 合并:按原问题的要求,将子问题的解逐层合并构成原问题的解。
    计算机十大经典算法中的归并排序、快速排序、二分查找都是基于分治思想实现的算法
    分治任务模型图如下:

这里用归并排序算法,简单实现了一下这个算法题,代码如下:

public class MergeSort {

    private final int[] arrayToSort; //要排序的数组
    private final int threshold;  //拆分的阈值,低于此阈值就不再进行拆分

    public MergeSort(final int[] arrayToSort, final int threshold) {
        this.arrayToSort = arrayToSort;
        this.threshold = threshold;
    }

    /**
     * 排序
     * @return
     */
    public int[] sequentialSort() {
        return sequentialSort(arrayToSort, threshold);
    }

    public static int[] sequentialSort(final int[] arrayToSort, int threshold) {
        //拆分后的数组长度小于阈值,直接进行排序
        if (arrayToSort.length < threshold) {
            //调用jdk提供的排序方法
            Arrays.sort(arrayToSort);
            return arrayToSort;
        }

        int midpoint = arrayToSort.length / 2;
        //对数组进行拆分
        int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);
        int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length);
        //递归调用
        leftArray = sequentialSort(leftArray, threshold);
        rightArray = sequentialSort(rightArray, threshold);
        //合并排序结果
        return merge(leftArray, rightArray);
    }

    public static int[] merge(final int[] leftArray, final int[] rightArray) {
        //定义用于合并结果的数组
        int[] mergedArray = new int[leftArray.length + rightArray.length];
        int mergedArrayPos = 0;
        int leftArrayPos = 0;
        int rightArrayPos = 0;
        while (leftArrayPos < leftArray.length && rightArrayPos < rightArray.length) {
            if (leftArray[leftArrayPos] <= rightArray[rightArrayPos]) {
                mergedArray[mergedArrayPos] = leftArray[leftArrayPos];
                leftArrayPos++;
            } else {
                mergedArray[mergedArrayPos] = rightArray[rightArrayPos];
                rightArrayPos++;
            }
            mergedArrayPos++;
        }

        while (leftArrayPos < leftArray.length) {
            mergedArray[mergedArrayPos] = leftArray[leftArrayPos];
            leftArrayPos++;
            mergedArrayPos++;
        }

        while (rightArrayPos < rightArray.length) {
            mergedArray[mergedArrayPos] = rightArray[rightArrayPos];
            rightArrayPos++;
            mergedArrayPos++;
        }

        return mergedArray;
    }

    public static void main(String[] args) {

        // 初始化一个2KW的数组
        Random random = new Random();
        int[] arrays = new int[20000000];
        for (int i = 0; i < 20000000; i++) {
            arrays[i] = random.nextInt(5);
        }


        long start1 = System.currentTimeMillis();

        // 开始拆分排序
        MergeSort mergeSort = new MergeSort(arrays, 100000);
        mergeSort.sequentialSort();
        System.out.println("任务总耗时:wasteTime=" + (System.currentTimeMillis() - start1));
    }
//    系统输出:
//    任务总耗时:wasteTime=921
} 

可以看出来,总共的耗时是:921ms(这还是我的电脑相对较好)
我想大家都看出来了,这里921ms看似不错的,那如果这个数据规模再大一点又会怎样呢?可以很负责的告诉大家,那时候就不是简单累加了,可能是指数型增长。而且细心的朋友可能已经看出来了,上面的代码完全是在单线程环境下运行(完全就是1核负重前行,其他核在岁月静好,不能忍),有没有可能,多线程环境下,运行效果会更好呢?
我们知道,我们无论做了什么优化,其实很多目的都是为了尽可能地压榨CPU资源的,像我们很多电脑一样,很多时候CPU使用率都是极低的,于是在一些大牛眼里,这样的行为非常浪费资源(/狗头/狗头)。如下图:13%的CPU使用率,资本家看了都在叹气,何其浪费啊
在这里插入图片描述
于是,大牛们为了解决上面说的【1核负重前行,其他核在岁月静好】的不公平现象,提出了一种新的线程池,ForkJoinPool(相信大家会有疑问:线程池?那为啥不用之前的ThreadPoolExecutor?别急,下面会给大家解释)

二、什么是Fork/Join框架

1.基本介绍

Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架(分治思想)。(Fork,刀叉的意思;join,连接的意思。 用叉子把某个东西分开,然后最后又连接起来)
Fork 就是把一个大任务切分为若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+ ...+20000000,可以分割成 10 个子任务,每n个子任务分别对 2000000 个数进行求和,最终汇总这 10 个子任务的结果。如下图所示:
在这里插入图片描述

2.ForkJoinPool

ForkJoinPool是Fork/Join框架中的线程池类,它用于管理Fork/Join任务的线程。跟ThreadPoolExecutor一样,它也是继承于AbstractExecutorService类,所以它跟ThreadPoolExecutor相同的行为,例如submit()、invoke()、shutdown()、awaitTermination()等,用于提交任务、执行任务、关闭线程池和等待任务的执行结果。ForkJoinPool类中还包括一些参数,例如线程池的大小、工作线程的优先级、任务队列的容量等,可以根据具体的应用场景进行设置。类图如下:
在这里插入图片描述

它具有以下特性:

  1. ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好;
  2. ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等;
  3. ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。

就像上面说的,ForkJoinPool是ThreadPoolExecutor的补充,前者适合于计算密集型任务,所以后者通常就是适合IO密集型任务了。

2.ForkJoinPool构造函数及参数解读

在这里插入图片描述

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode)

如上面代码所示,ForkJoinPool中有4个核心参数,分别用于:控制线程池的并行数、工作线程的创建、异常处理和指定队列模式等。各参数解释如下:

  1. int parallelism:指定并行级别(parallelism level)。ForkJoinPool将根据这个设定,决定工作线程的数量。如果未设置的话,将使用Runtime.getRuntime().availableProcessors()来设置并行级别;
  2. ForkJoinWorkerThreadFactory factory:ForkJoinPool在创建线程时,会通过factory来创建。注意,这里需要实现的是ForkJoinWorkerThreadFactory,而不是ThreadFactory。如果你不指定factory,那么将由默认的DefaultForkJoinWorkerThreadFactory负责线程的创建工作;
  3. UncaughtExceptionHandler handler:指定异常处理器,当任务在运行中出错时,将由设定的处理器处理;
  4. boolean asyncMode:设置队列的工作模式。当asyncMode为true时,将使用先进先出队列,而为false时则使用后进先出的模式。

3.任务提交方式

任务提交是ForkJoinPool的核心能力之一,提交任务有三种方式:
在这里插入图片描述

4.工作原理图

还记得下面这张图吗,ThreadPoolExecutor的工作原理图:
在这里插入图片描述
ForkJoinPool与ThreadPoolExecutor不一样,它为了适应更大的并行度,对工作队列的设计做了改造。它的原理图如下:
在这里插入图片描述

ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中(如上图中的Task-4,通过fork方法继续分出了Task-4.1和Task-4.2)。
如果工作线程对应的任务队列空了,是不是就没活儿干了呢?不是的,ForkJoinPool 支持一种叫做【任务窃取】的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务。如此一来,所有的工作线程都不会闲下来了。

5.工作窃取

ForkJoinPool与ThreadPoolExecutor有个很大的不同之处在于,ForkJoinPool存在引入了工作窃取设计,它是其性能保证的关键之一。工作窃取,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。这种方法,最大限度地减少了线程竞争任务的可能性。

ForkJoinPool的大部分操作都发生在工作窃取队列(work-stealing queues ) 中,该队列由内部类WorkQueue实现。它是Deques的特殊形式,但仅支持三种操作方式:push、pop和poll(也称为窃取)。在ForkJoinPool中,队列的读取有着严格的约束,push和pop仅能从其所属线程调用,而poll则可以从其他线程调用。
通过工作窃取,Fork/Join框架可以实现任务的自动负载均衡,以充分利用多核CPU的计算能力,同时也可以避免线程的饥饿和延迟问题
在这里插入图片描述

6.和普通线程池之间的区别

  • 工作窃取算法
    ForkJoinPool采用多任务队列,以及工作窃取算法来提高线程的利用率,而普通线程池则采用共享阻塞任务队列来管理任务。在工作窃取算法中,当一个线程完成自己的任务后,它可以从其它线程的队列中获取一个任务来执行,以此来提高线程的利用率。
  • 任务的分解和合并
    ForkJoinPool可以将一个大任务分解为多个小任务,并行地执行这些小任务,最终将它们的结果合并起来得到最终结果。而普通线程池只能按照提交的任务顺序一个一个地执行任务。
  • 工作线程的数量
    ForkJoinPool会根据当前系统的CPU核心数来自动设置工作线程的数量,以最大限度地发挥CPU的性能优势。而普通线程池需要手动设置线程池的大小,如果设置不合理,可能会导致线程过多或过少,从而影响程序的性能。
  • 任务类型
    ForkJoinPool适用于执行大规模任务并行化,而普通线程池适用于执行一些短小的任务,如处理请求等。

7.ForkJoinTask

ForkJoinTask是Fork/Join框架中的抽象类,它定义了执行任务的基本接口。用户可以通过继承ForkJoinTask类来实现自己的任务类,并重写其中的compute()方法来定义任务的执行逻辑。通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下三个子类:

RecursiveAction:用于递归执行但不需要返回结果的任务。
RecursiveTask :用于递归执行需要返回结果的任务。
CountedCompleter :在任务完成执行后会触发执行一个自定义的钩子函数

ForkJoinTask 最核心的是 fork() 方法和 join() 方法,承载着主要的任务协调作用,一个用于任务提交,一个用于结果获取。

fork()——提交任务
fork()方法用于向当前任务所运行的线程池中提交任务。如果当前线程是ForkJoinWorkerThread类型,将会放入该线程的工作队列,否则放入common线程池的工作队列中。

join()——获取任务执行结果
join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果。

学习总结

  1. 学习了ForkJoinPool,它的原理,以及工作队列设计
  2. 学习了ForkJoinPool与ThreadPoolExecutor的区别

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/810425.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数学建模学习(7):Matlab绘图

一、二维图像绘制 1.绘制曲线图 最基础的二维图形绘制方法&#xff1a;plot -plot命令自动打开一个图形窗口Figure&#xff1b; 用直线连接相邻两数据点来绘制图形 -根据图形坐标大小自动缩扩坐标轴&#xff0c;将数据标尺及单位标注自动加到两个坐标轴上&#xff0c;可自定…

类和对象|六个默认成员函数|const成员函数|运算符重载

文章目录 默认成员构造函数1. 构造函数1.1 概念1.2 特性 2. 析构函数2.1 概念2.2 特性 3. 拷贝构造函数3.1 概念3.2 特性 4. 运算符重载4.1 赋值重载4.2 自增自减重载4.3 取地址操作符重载 5. const成员函数6. 取地址重载 默认成员构造函数 上一节我们说过&#xff0c;空类的大…

行为型:发布订阅模式

定义   发布订阅模式是基于一个事件&#xff08;主题&#xff09;通道&#xff0c;希望接收通知的对象Subscriber&#xff08;订阅者&#xff09;通过自定义事件订阅主题&#xff0c;被激活事件的对象 Publisher &#xff08;发布者&#xff09;通过发布主题事件的方式通知订…

STM32F103利用CubeMX配置开启定时中断

1、外部晶振8MHz&#xff0c;下载方式SWD模式&#xff0c;需求配置定时器1&#xff0c;产生每100ms一次中断 新建工程、配置晶振、选择下载方式等略 2、查阅资料&#xff0c;STM32F103的时钟树分配 3、配置CubeMX的时钟树 4、配置定时器-开启定时中断 5、配置定时时间 &…

VoxPoser:使用大语言模型(GPT-4)来对机器人操作的可组合三维值图【论文解读】

这是最近斯坦福的李飞飞团队的一篇论文:VoxPoser: Composable 3D Value Maps for Robotic Manipulation with Language Models 主要是通过大语言模型LLM和视觉语言模型VLM结合&#xff0c;来对机器人做各种日常操作&#xff0c;我们可以先来看下实际效果&#xff1a;大语言模型…

使用LangChain构建问答聊天机器人案例实战(一)

使用LangChain构建问答聊天机器人案例实战 现场演示GPT-4代码生成 本节我们会通过一个综合案例,跟大家讲解LangChain,这个案例产生的代码会直接在浏览器中运行,并且会输出结果,如图14-1所示,用户问:“What was the highest close price of IBM?”(“IBM的最高收盘价是…

【Linux命令200例】mdel删除指定目录下的多个文件

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;全栈领域新星创作者✌&#xff0c;2023年6月csdn上海赛道top4。 &#x1f3c6;本文已收录于专栏&#xff1a;Linux命令大全。 &#x1f3c6;本专栏我们会通过具体的系统的命令讲解加上鲜活的实操案例对各个命令进行深入…

基于注解手写Spring的IOC(上)

一、思路 先要从当前类出发找到对应包下的所有类文件&#xff0c;再从这些类中筛选出类上有MyComponent注解的类&#xff1b;把它们都装入Map中&#xff0c;同时类属性完成MyValue的赋值操作。 二、具体实现 测试类结构&#xff1a; 测试类&#xff1a;myse、mycontor、BigSt…

hcq1-1300-d

禾川的产品&#xff1a;版本V3.22 网口1&#xff1a; IPV4&#xff1a;192.168.188.100 子网掩码&#xff1a;255.255.255.0 网口2&#xff1a; IPV4&#xff1a;192.168.88.100 子网掩码&#xff1a;255.255.255.0 功能按键&#xff1a; 旋转拨码0 切换 SYS\IN\OUT 指示灯及…

NLP From Scratch: 基于注意力机制的 seq2seq 神经网络翻译

NLP From Scratch: 基于注意力机制的 seq2seq 神经网络翻译 这是关于“从头开始进行 NLP”的第三篇也是最后一篇教程&#xff0c;我们在其中编写自己的类和函数来预处理数据以完成 NLP 建模任务。 我们希望在完成本教程后&#xff0c;您将继续学习紧接着本教程的三本教程&…

C#,数值计算——对数正态分布(logarithmic normal distribution)的计算方法与源程序

对数正态分布&#xff08;logarithmic normal distribution&#xff09;是指一个随机变量的对数服从正态分布&#xff0c;则该随机变量服从对数正态分布。对数正态分布从短期来看&#xff0c;与正态分布非常接近。但长期来看&#xff0c;对数正态分布向上分布的数值更多一些。 …

【机器学习】Gradient Descent

Gradient Descent for Linear Regression 1、梯度下降2、梯度下降算法的实现(1) 计算梯度(2) 梯度下降(3) 梯度下降的cost与迭代次数(4) 预测 3、绘图4、学习率 首先导入所需的库&#xff1a; import math, copy import numpy as np import matplotlib.pyplot as plt plt.styl…

Pytest学习教程_装饰器(二)

前言 pytest装饰器是在使用 pytest 测试框架时用于扩展测试功能的特殊注解或修饰符。使用装饰器可以为测试函数提供额外的功能或行为。   以下是 pytest 装饰器的一些常见用法和用途&#xff1a; 装饰器作用pytest.fixture用于定义测试用例的前置条件和后置操作。可以创建可重…

读发布!设计与部署稳定的分布式系统(第2版)笔记26_安全性上

1. 安全问题 1.1. 系统违规并不总是涉及数据获取&#xff0c;有时会出现植入假数据&#xff0c;例如假身份或假运输文件 1.2. 必须在整个开发过程中持续地把安全内建到系统里&#xff0c;而不是把安全像胡椒面那样在出锅前才撒到系统上 2. OWASP 2.1. Open Web Application…

DataStructure--Basic

程序设计数据结构算法 只谈数据结构不谈算法就跟去话剧院看梁山伯与祝英台结果只有梁山伯在演&#xff0c;祝英台生病了没来一样。 本文的所有内容都出自《大话数据结构》这本书中的代码实现部分&#xff0c;建议看书&#xff0c;书中比我本文写的全。 数据结构&#xff0c;直…

论文笔记——Influence Maximization in Undirected Networks

Influence Maximization in Undirected Networks ContributionMotivationPreliminariesNotations Main resultsReduction to Balanced Optimal InstancesProving Theorem 3.1 for Balanced Optimal Instances Contribution 好久没发paper笔记了&#xff0c;这篇比较偏理论&…

【笔试强训选择题】Day32.习题(错题)解析

作者简介&#xff1a;大家好&#xff0c;我是未央&#xff1b; 博客首页&#xff1a;未央.303 系列专栏&#xff1a;笔试强训选择题 每日一句&#xff1a;人的一生&#xff0c;可以有所作为的时机只有一次&#xff0c;那就是现在&#xff01;&#xff01; 文章目录 前言 一、Da…

线性代数(应用篇):第五章:特征值与特征向量、第六章:二次型

文章目录 第5章 特征值与特征向量、相似矩阵(一) 特征值与特征向量1.定义2.性质3.求解(1)具体型矩阵试根法、多项式带余除法&#xff1a;三阶多项式分解因式 (2)抽象型矩阵 (二) 相似1.矩阵相似(1)定义(2)性质 2.相似对角化(1)定义(2)相似对角化的条件&#xff08;n阶矩阵A可相…

自动化运维工具——Ansible

自动化运维工具——Ansible 一、Ansible概述二、ansible 环境安装部署1.管理端安装 ansible2.ansible 目录结构3.配置主机清单4.配置密钥对验证 三、ansible 命令行模块1.command 模块2.shell 模块3.cron 模块4.user 模块5.group 模块6.copy 模块7.file 模块8.hostname 模块9&a…

LeetCode102.Binary-Tree-Level-Order-Traversal<二叉树的层序遍历>

题目&#xff1a; 思路&#xff1a; 写过N叉树的层序遍历&#xff0c;(8条消息) LeetCode429.N-Ary-Tree-Level-Order-Traversal&#xff1c;N 叉树的层序遍历&#xff1e;_Eminste的博客-CSDN博客 使用栈保存每一层的结点。然后每次当前层结束。将这一层的值添加进去res中。…