ForkJoin框架

news2024/12/23 4:45:05

1. ForkJoin框架概述

ForkJoin模式先把一个大任务分解成许多个独立的子任务,然后开启多个线程并行去处理这些子任务。有可能子任务还是很大而需要进一步分解,最终得到足够小的任务。ForkJoin模式的任务分解和执行过程大致如下图所示。
在这里插入图片描述
ForkJoin模式借助于现代计算机多核的优势并行处理数据。通常情况下,ForkJoin模式将分解出来的子任务放入双端队列中,然后几个启动线程从双端队列中获取任务并执行。子任务执行的结果放到一个队列中,各个线程从队列中获取数据,然后进行局部结果的合并,得到最终结果。

2. ForkJoin框架

JUC包提供了一套ForkJoin框架的实现。具体以ForkJoinPool线程池的形式提供,并且该线程池在Java 8的Lambda并行流框架中充当着底层框架的角色。JUC包的ForkJoin框架包含如下组件:

  • ForkJoinPool:执行任务的线程池,继承了AbstractExecutorService类
  • ForkJoinWorkerThread:执行任务的工作线程(ForkJoinPool线程池中的线程),每个线程都维护者一个内部队列,用于存放“内部任务”该类继承了Thread类
  • ForkJoinTask:用于ForkJoinPool的任务抽象类,实现了Future接口
  • WorkQueue:在ForkJoin框架里面,有一个WorkQueue[]
    分为两种:1.有工作线程绑定的任务队列:通常在WorkQueue数组的奇数位索引上,里面的任务,由工作线程产生,比如:工作线程执行过程中,fork出的新任务;2.没有工作线程绑定的任务队列:通常在WorkQueue数组的偶数位索引上,这些队列里的任务,通常都是由其它线程提交的
  • RecurisveTask:带返回结果的递归执行任务,是ForkJoinTask的子类,在子任务带返回结果时使用
  • RecurisveAction:不返回结果的递归执行任务,是ForkJoinTask的子类,在子任务不带返回结果时使用

因为ForkJoinTask比较复杂,并且其抽象方法比较多,故在日常使用时一般不会直接继承ForkJoinTask来实现自定义的任务类,而是通过继承ForkJoinTask两个子类RecurisveTask或者RecurisveAction之一去实现自定义任务类,自定义任务类需要实现这些子类的compute()方法,该方法的执行流程一般如下:

if 任务足够小
	直接返回结果
else
	分割成n个子任务
	依次调用每个子任务的fork方法执行子任务
	依次调用每个子任务的join方法,等待子任务完成,然后合并执行结果

3. ForkJoin框架使用实践

假设需要计算0~10000的累加求和,可以使用ForkJoin框架完成,首先需要设计一个可以递归执行的异步任务子类。

3.1 可递归执行的异步任务类AccumulateTask

public class AccumulateTask extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 100;
    // 累加的起始编号
    private int start;
    // 累加的结束编号
    private int end;

    public AccumulateTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        // 判断任务的规模:若规模小则可以直接计算
        boolean canCompute = (end - sum) <= THRESHOLD;
        // 若任务足够小,则可以直接计算
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            System.out.println(Thread.currentThread().getName() + ":执行任务,计算 " + start + " 到 " + end + " 的和,结果是:" + sum);
        } else {
            // 任务过大,需要切割,Recursive递归计算
            System.out.println(Thread.currentThread().getName() + ":切割任务:将 " + start + " 到 " + end + " 的和一分为二");
            int middle = (start + end) / 2;
            // 切割成两个任务
            AccumulateTask task1 = new AccumulateTask(start, middle);
            AccumulateTask task2 = new AccumulateTask(middle + 1, end);
            // 依次调用每个子任务的fork()方法执行子任务
            task1.fork();
            task2.fork();
            // 依次调用每个子任务的join()方法合并执行结果
            Integer leftResult = task1.join();
            Integer rightResult = task2.join();
            // 合并子任务执行结果
            sum = leftResult + rightResult;
        }
        return sum;
    }
}

自定义的异步任务子类AccumulateTask继承自RecursiveTask,每一次执行可以携带返回值。AccumulateTask通过THRESHOLD常量设置子任务分割的阈值。并在它compute()方法中进行阈值判断,判断的逻辑如下:

  • 若当前的计算规模(这里为求和的数字个数)大于THRESHOLD,则当前子任务需要进一步分解,若当前的计算规模没有大于THRESHOLD,则直接计算(这里为求和)
  • 如果子任务可以直接执行,就进行求和操作,并返回结果。如果任务进行了分解,就需要等待所以的子任务执行完毕,然后对各个分割结果求和。如果一个任务分解为多个子任务(含两个),就依次调用每个子任务的fork()方法执行子任务,然后依次调用每个子任务的join()方法合并执行结果

3.2 使用ForkJoinPool调度AccumulateTask()

使用ForkJoinPool调度AccumulateTask()的示例代码如下:

public class ForkJoinTest {
    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        // 创建一个累加任务,计算由1到1000的和
        AccumulateTask accumulateTask = new AccumulateTask(1, 1000);
        ForkJoinTask<Integer> future = forkJoinPool.submit(accumulateTask);
        Integer sum = null;
        try {
            sum = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            forkJoinPool.shutdown();
        }
        System.out.println(Thread.currentThread().getName() + ":最终的计算结果:" + sum);
    }
}

执行以上代码,部分结果如下:

ForkJoinPool-1-worker-3:切割任务:将 11000 的和一分为二
ForkJoinPool-1-worker-3:切割任务:将 1500 的和一分为二
ForkJoinPool-1-worker-5:切割任务:将 5011000 的和一分为二
ForkJoinPool-1-worker-7:切割任务:将 1250 的和一分为二
ForkJoinPool-1-worker-5:切割任务:将 501750 的和一分为二
ForkJoinPool-1-worker-7:切割任务:将 1125 的和一分为二
ForkJoinPool-1-worker-5:切割任务:将 501625 的和一分为二
ForkJoinPool-1-worker-1:切割任务:将 126250 的和一分为二
ForkJoinPool-1-worker-5:执行任务,计算 501563 的和,结果是:33516
ForkJoinPool-1-worker-1:执行任务,计算 126188 的和,结果是:9891
ForkJoinPool-1-worker-7:执行任务,计算 163 的和,结果是:2016
ForkJoinPool-1-worker-5:执行任务,计算 564625 的和,结果是:36859
ForkJoinPool-1-worker-1:执行任务,计算 189250 的和,结果是:13609
ForkJoinPool-1-worker-7:执行任务,计算 64125 的和,结果是:5859
ForkJoinPool-1-worker-5:切割任务:将 626750 的和一分为二
ForkJoinPool-1-worker-7:切割任务:将 7511000 的和一分为二
ForkJoinPool-1-worker-5:执行任务,计算 626688 的和,结果是:41391
ForkJoinPool-1-worker-1:切割任务:将 251500 的和一分为二
ForkJoinPool-1-worker-5:执行任务,计算 689750 的和,结果是:44609
ForkJoinPool-1-worker-7:切割任务:将 751875 的和一分为二
ForkJoinPool-1-worker-5:切割任务:将 8761000 的和一分为二
ForkJoinPool-1-worker-7:执行任务,计算 751813 的和,结果是:49266
ForkJoinPool-1-worker-5:执行任务,计算 876938 的和,结果是:57141
ForkJoinPool-1-worker-1:切割任务:将 251375 的和一分为二
ForkJoinPool-1-worker-5:执行任务,计算 9391000 的和,结果是:60109
ForkJoinPool-1-worker-7:执行任务,计算 814875 的和,结果是:52359
ForkJoinPool-1-worker-1:执行任务,计算 251313 的和,结果是:17766
ForkJoinPool-1-worker-5:执行任务,计算 314375 的和,结果是:21359
ForkJoinPool-1-worker-7:切割任务:将 376500 的和一分为二
ForkJoinPool-1-worker-7:执行任务,计算 376438 的和,结果是:25641
ForkJoinPool-1-worker-5:执行任务,计算 439500 的和,结果是:29109
main:最终的计算结果:500500

Process finished with exit code 0

4. ForkJoin框架的核心API

ForkJoin框架的核心是ForkJoinPool线程池。该线程池使用一个无锁的栈来管理空闲线程,如果一个工作线程暂时取不到可用的任务,则可能被挂起,而挂起的线程将被压入由ForkJoinPool维护的栈中,将有新任务到来时,再从栈中唤醒这些线程。

4.1 ForkJoinPool的构造器

public ForkJoinPool(int parallelism, // 并行度,默认为CPU核心数,最小为1
                        ForkJoinWorkerThreadFactory factory, // 线程创建工厂
                        UncaughtExceptionHandler handler, // 异常处理程序
                        boolean asyncMode) // 是否为异步模式 
                        {
        this(parallelism, factory, handler, asyncMode,
             0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }

对以上构造器的4个参数具体介绍如下:

  • parallelism:可并行级别
    ForkJoin框架将依赖 parallelism 设定的级别决定框架内并行执行的线程数量。并行的每一个任务都会有一个线程进行处理。但 parallelism 属性并不是ForkJoin框架中最大的线程数量。该属性和ThreadPoolExecutor线程池中的corePoolSize、maxmumPoolSize属性有区别,因为ForkJoinPool的结构和工作方式与ThreadPoolExecutor完全不一样。ForkJoin框架中可存在的线程数量和 parallelism 参数值并不是绝对关联的
  • factory:线程创建工厂
    当ForkJoin框架创建一个新的线程时,同样会用到线程创建工厂。只不过这个线程工厂不再需要实现ThreadFactory接口,而是需要实现ForkJoinWorkerThreadFactory接口。后者是一个函数式接口,只需要实现一个名叫newThread()的方法。在ForkJoin()框架中有一个默认的ForkJoinWorkerThreadFactory接口实现DefaultForkJoinWorkerThreadFactory
  • handler:异常 捕获处理程序
    当执行的任务中出现异常,并从任务中被抛出时,就会被handler捕获
  • asyncMode:异步模式
    asyncMode参数表示任务是否为异步模式,其默认值为false。如果asyncMode为true,就表示子任务的执行遵循FIFO(先进先出)顺序,并且子任务不能被合并;如果asyncMode为false,就表示子任务的执行遵循LIFO(后进先出)顺序,并且子任务可以被合并。虽然从字面意思来看asyncMode是指异步模式,它并不是指ForkJoin框架的调度模式采用是同步模式还是异步模式工作,仅仅指任务的调度方式。ForkJoin框架中为每一个独立工作的线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。asyncMode模式的主要意思指的是待执行任务可以使用FIFO(先进先出)的工作模式,也可以使用LIFO(后进先出)的工作模式,工作模式为FIFO(先进先出)的任务适用于工作线程只负责运行异步任务,不需要合并结果的异步任务。

ForkJoinPool无参数的,默认构造器如下:

public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }

该构造器的parallelism值为CPU核数;factory为defaultForkJoinWorkerThreadFactory默认的线程工厂;异常捕获处理程序handler为null,表示不进行异常处理;异步模式asyncMode值为false,使用LIFO(后进先出)的,可以合并子任务的模式。

4.2 向ForkJoinPool綫程池提交任务的方式

可以向ForkJoinPool线程池提交以下两类任务:

  • 外部任务(External/Submissions Task)提交
    向ForkJoinPool提交外部任务有三种方式:方式一是调用invoke()方法,该方法提交任务后线程会等待,等待任务计算完成返回结果;方式二是调用execute()方法提交一个任务来异步执行,无返回结果;方式三是调用submit()方法提交一个任务,并且会返回一个ForkJoinTask实现,之后适当的时候可以通过ForkJoinTask实例获取执行结果。
  • 子任务(Worker Task)提交
    向ForkJoinPool提交子任务的方法相对比较简单,由任务实例的fork()方法完成。当任务被分割之后,内部会调用ForkJoinPool.WorkQueue.push()方法直接把任务放到内部队列中等待被执行。

4.3 工作窃取算法

ForkJoinPool线程池的任务分为“外部任务”和“内部任务”,两种任务的存放为止不同:

  • 外部任务存放在ForkJoinPool的全局队列中
  • 子任务会作为“内部任务”放到内部队列中,ForkJoinPool线程池中的每个线程都维护着一个内部队列,用于存放这些“内部任务”

由于ForkJoinPool线程池通常有多个工作线程,与之相对应的就会有多个任务队列,这就会出现任务分配不均衡的问题:有的队列任务多、有的队列没有任务,一直空闲。那么有没有一种机制帮忙将任务从繁忙的线程分摊给空闲的线程呢?答案是使用工作窃取算法。

工作窃取算法的核心思想是:工作线程自己的活干完之后,会去看别人有没有没干完的活,如果有就拿过来帮忙干。工作窃取算法的主要逻辑:每个线程拥有一个双端队列(本地队列),用于存放需要执行的任务,当自己的队列没有任务时,可以从其他线程的任务队列中获得一个任务继续执行。
在这里插入图片描述
在实际进行任务窃取操作的时候,操作线程会进行其他线程的任务队列的扫描和任务出队尝试。为什么说是尝试?因为完全有可能操作失败,主要原因是并行执行肯定设计线程安全的问题,假如在窃取过程中该任务已经开始执行,那么任务的窃取操作就会失败。

如何尽可能避免在任务窃取中发生的线程安全问题?一种简单的优化方法是:在线程自己的本地队列采用LIFO(后进先出)策略,窃取其他任务队列的任务时采用FIFO(先进先出)策略。简单来说,获取自己队列的任务时从头开始,窃取其他队列的任务时从尾开始。由于窃取的动作十分快速,会大量降低这种冲突。也是一种优化方式。

4.4 ForkJoin框架原理

ForkJoin框架的核心原理大致如下:

  • ForkJoin框架的线程池ForkJoinPool的任务分为“外部任务”和“内部任务”
  • “外部任务”放在ForkJoinPool的全局队列中
  • ForkJoinPool池中的每个线程都维护着一个任务队列,用于存放“内部任务”,线程切割任务得到的子任务会作为“内部任务”放到内部队列中
  • 当工作线程想要拿到子任务的计算结果时,先判断子任务有没有完成,如果没有完成,再判断子任务有没有被其他线程“窃取”,如果子任务没有被窃取,就由本线程来完成;一旦子任务被窃取了,就去执行本线程“内部队列”的其他任务,或者扫描其他的任务队列并窃取任务
  • 当工作线程完成其“内部任务”,处于空闲状态时,就会扫描其他的任务队列窃取任务,尽可能不会阻塞等待

总之,ForkJoin线程在等待一个任务完成时,要么自己来完成这个任务,要么其他线程窃取这个任务的情况下,去执行其他任务,是不会阻塞等待的。从而避免资源浪费,除非所有任务队列都为空。

工作窃取算法的优点如下:

  • 线程是不会因为等待某个子任务的执行或者没有内部任务要执行而被阻塞等待,挂起的,而是会扫描所有的队列窃取任务,直到所有队列都为空时才会被挂起
  • ForkJoin框架为每个线程维护着一个内部任务以及一个全局的任务队列,而且任务队列都是双向队列,可从首尾两端来获取任务,极大地减少了竞争的可能性,提高并行的性能。

ForkJoinPool适合需要“分而治之”的场景,特别是分治之后递归调用的函数。例如快速排序、二分搜索、大整数除法、矩阵乘法、棋盘覆盖、归并排序、线性时间选择、汉诺塔问题等。ForkJoinPool适合调度的任务为CPU密集型任务,如果任务存在I/O操作、线程同步操作、sleep()睡眠等较长时间阻塞的情况,最好配置使用ManagedBlocker进行阻塞管理。总体来说,ForkJoinPool不适合进行IO密集型、混合型的任务调用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/903230.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NSS [CISCN 2019初赛]Love Math

NSS [CISCN 2019初赛]Love Math 开题直接给源码 <?php error_reporting(0); //听说你很喜欢数学&#xff0c;不知道你是否爱它胜过爱flag if(!isset($_GET[c])){show_source(__FILE__); }else{//例子 c20-1$content $_GET[c];if (strlen($content) > 80) {die("…

差值结构的复合底部

( A, B )---3*30*2---( 1, 0 )( 0, 1 ) 让网络的输入只有3个节点&#xff0c;AB训练集各由6张二值化的图片组成&#xff0c;让A 中有3个点&#xff0c;B中有1个点&#xff0c;且不重合&#xff0c;统计迭代次数并排序。 其中有20组数据 让迭代次数与排斥能成反比&#xff0c;排…

1、Spring_IOC

IOC 1.概述 IOC&#xff1a;Inversion of Control 控制反转&#xff0c;可以让容器负责对象的创建以及销毁操作&#xff0c;对象在容器中叫 bean 2.回顾问题 问题&#xff1a;写了太多与业务无关的代码 耦合度非常高&#xff0c;写了很多和业务无关的代码不利于项目的升级迭…

分类预测 | MATLAB实现S4VM半监督支持向量机二分类预测

分类预测 | MATLAB实现S4VM半监督支持向量机二分类预测 目录 分类预测 | MATLAB实现S4VM半监督支持向量机二分类预测分类效果基本介绍程序设计参考资料 分类效果 基本介绍 分类预测 | MATLAB实现S4VM半监督支持向量机二分类预测 程序设计 完整源码和数据获取方式&#xff1a; …

高级产品经理如何以不同的方式应对挑战

我经常被问到产品经理如何晋升到更高级别。事实上&#xff0c;获得晋升往往是一场复杂的游戏。是的&#xff0c;你的技能和成就很重要&#xff0c;但其他因素也很重要&#xff0c;比如你的经理对人才培养的关心程度、你的同事有多优秀、任期有多长、公司的政治氛围如何等等。 所…

TCP编程流程(补充)

目录 1、listen&#xff1a; 2、listen、tcp三次握手 3、 发送缓冲区和接收缓冲区&#xff1a; 4、tcp编程启用多线程 1、listen&#xff1a; 执行listen会创建一个监听队列 listen(sockfd,5) 2、listen、tcp三次握手 三次握手 3、 发送缓冲区和接收缓冲区&#xff1a;…

【深入探究人工智能】:常见机器学习算法总结

文章目录 1、前言1.1 机器学习算法的两步骤1.2 机器学习算法分类 2、逻辑回归算法2.1 逻辑函数2.2 逻辑回归可以用于多类分类2.3 逻辑回归中的系数 3、线性回归算法3.1 线性回归的假设3.2 确定线性回归模型的拟合优度3.3线性回归中的异常值处理 4、支持向量机&#xff08;SVM&a…

Linux的热拔插UDEV机制

文章目录 UDEV简介守护进程基本特点 守护进程和后台进程的区别开发守护进程结束 UDEV简介 udev是一个设备管理工具&#xff0c;udev以守护进程的形式运行&#xff0c;通过侦听内核发出来的uevent来管理/dev目录下的设备文件。 udev在用户空间运行&#xff0c;而不在内核空间 …

⛳ Java 网络编程

目录 ⛳ Java 网络编程&#x1f3a8; 一、TCP / IP 协议&#x1f463; 二、IP 和 端口号&#x1f381; 三、TCP 网络层编程&#x1f3a8; 3.1、Socket⭐ 3.2、基于Socket的TCP编程 &#x1f3ed; 四、UDP网络编程&#x1f43e; 五、URL编程 ⛳ Java 网络编程 &#x1f3a8; 一…

语法篇--XML数据传输格式

一、XML概述 1.1简介 XML&#xff0c;全称为Extensible Markup Language&#xff0c;即可扩展标记语言&#xff0c;是一种用于存储和传输数据的文本格式。它是由W3C&#xff08;万维网联盟&#xff09;推荐的标准&#xff0c;广泛应用于各种系统中&#xff0c;如Web服务、数据…

Handler机制(二)

在上一篇文章中&#xff0c;我们分析了Handler基本流程&#xff0c;下面分析一些上层开发很少接触的部分。 IdleHandler 从命名可以看出IdleHandler 是Handler出现空闲时的一种机制&#xff0c;IdleHandler是一种只有当消息队列没有消息时或者当前队列中的消息还没有到执行时…

linux 上安装es

首先 到官网 https://www.elastic.co/cn/downloads/elasticsearch 下载对应的安装包&#xff0c;我这里下载的是 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.9.1-linux-x86_64.tar.gz 然后讲该压缩包上传到 linux 的/usr/local 目录下执行 tar -z…

Go语言入门指南:基础语法和常用特性解析(上)

一、Go语言前言 Go是一种静态类型的编译语言&#xff0c;常常被称作是21世纪的C语言。Go语言是一个开源项目&#xff0c;可以免费获取编译器、库、配套工具的源代码&#xff0c;也是高性能服务器和应用程序的热门选择。 Go语言可以运行在类UNIX系统——比如Linux、OpenBSD、M…

基于ChatYuan-large-v2 微调训练 医疗问答 任务

一、ChatYuan-large-v2 上篇基于ChatYuan-large-v2 语言模型 Fine-tuning 微调训练了广告生成任务&#xff0c;总体生成效果还可以&#xff0c;但上篇文章的训练是微调的模型全部的参数&#xff0c;本篇文章还是以 ChatYuan-large-v2 作为基础模型&#xff0c;继续探索仅训练解…

【100天精通python】Day38:GUI界面编程_PyQt 从入门到实战(中)_数据库操作与多线程编程

目录 专栏导读 4 数据库操作 4.1 连接数据库 4.2 执行 SQL 查询和更新&#xff1a; 4.3 使用模型和视图显示数据 5 多线程编程 5.1 多线程编程的概念和优势 5.2 在 PyQt 中使用多线程 5.3 处理多线程间的同步和通信问题 5.3.1 信号槽机制 5.3.2 线程安全的数据访问 Q…

Spring Boot整合RabbitMQ之发布与订阅模式

RabbitMQ的模式中&#xff0c;常用的模式有&#xff1a;简单模式&#xff0c;发布与订阅模式&#xff0c;工作模式&#xff0c;路由模式&#xff0c;主题模式。简单模式不太会运用到工作中&#xff0c;我们可以使用 RabbitMQ 的发布订阅模式&#xff0c;实现&#xff1a; 用户…

KUST_LI计算机视觉实验室服务器安装与管理

第一步&#xff1a;安装 Linux-Ubuntu系统 系统语言设置为英文 ENGLISH&#xff0c;防止系统 BUG&#xff1b;选择-清除整个磁盘并安装系统&#xff1b;设置用户名和密码&#xff0c;实验室统一其余全部默认设置 开机后设置磁盘挂载 在系统设置中找到 desk 打开&#xff0c;…

YOLOv7训练结果解析

前言&#xff1a; 已训练完模型&#xff0c;且把结果下载下来&#xff0c;以下某一次id识别训练结果为例&#xff0c;如下图所示。 注&#xff1a;YOLOv7每次train完成&#xff08;如果没有中途退出&#xff09;都会在run目录下生成expX目录&#xff08;X代表生成结果次数 第一…

CentOS7.9手工配置静态网络流程

进入网卡配置文件 vim /etc/sysconfig/network-scripts/ifcfg-ens33 配置 TYPE"Ethernet" PROXY_METHOD"none" BROWSER_ONLY"no" BOOTPROTO"static" //static 配置静态网络 DEFROUTE"yes" IPV4_FAILURE_FATAL"no…