Java-ForkJoinPool(线程池-工作窃取算法)

news2025/1/18 15:00:00

文章目录

    • 概述
    • 工作窃取算法
    • 工作窃取算法的优缺点
    • 使用 ForkJoinPool 进行分叉和合并
    • ForkJoinPool使用
      • RecursiveAction
      • RecursiveTask
    • Fork/Join 案例Demo

概述

在这里插入图片描述

Fork 就是把一个大任务切分为若干个子任务并行地执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。Fork/Join 框架使用的是工作窃取算法。

工作窃取算法

工作窃取算法是指某个线程从其他队列里窃取任务来执行。对于一个比较大的任务,可以把它分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务需要处理,于是它就去其他线程的队列里窃取一个任务来执行。由于此时它们访问同一个队列,为了减小竞争,通常会使用双端队列。被窃取任务的线程永远从双端队列的头部获取任务,窃取任务的线程永远从双端队列的尾部获取任务。

在这里插入图片描述

工作窃取算法的优缺点

优点:充分利用线程进行并行计算,减少了线程间的竞争。
缺点:双端队列只存在一个任务时会导致竞争,会消耗更多的系统资源,因为需要创建多个线程和多个双端队列。

使用 ForkJoinPool 进行分叉和合并

ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一点不同。ForkJoinPool 让我们可以很方便地把任务分裂成几个更小的任务,这些分裂出来的任务也将会提交给 ForkJoinPool。任务可以继续分割成更小的子任务,只要它还能分割。可能听起来有些抽象,因此本节中我们将会解释 ForkJoinPool 是如何工作的,还有任务分割是如何进行的。

分叉和合并解释

在我们开始看 ForkJoinPool 之前我们先来简要解释一下分叉和合并的原理。
分叉和合并原理包含两个递归进行的步骤。两个步骤分别是分叉步骤和合并步骤。

分叉

一个使用了分叉和合并原理的任务可以将自己分叉(分割)为更小的子任务,这些子任务可以被并发执行。如下图所示:

在这里插入图片描述

通过把自己分割成多个子任务,每个子任务可以由不同的 CPU 并行执行,或者被同一个 CPU 上的不同线程执行。

只有当给的任务过大,把它分割成几个子任务才有意义。把任务分割成子任务有一定开销,因此对于小型任务,这个分割的消耗可能比每个子任务并发执行的消耗还要大。

什么时候把一个任务分割成子任务是有意义的,这个界限也称作一个阀值。这要看每个任务对有意义阀值的决定。很大程度上取决于它要做的工作的种类。

合并

当一个任务将自己分割成若干子任务之后,该任务将进入等待所有子任务的结束之中。一旦子任务执行结束,该任务可以把所有结果合并到同一个结果。图示如下:

在这里插入图片描述

当然,并非所有类型的任务都会返回一个结果。如果这个任务并不返回一个结果,它只需等待所有子任务执行完毕。也就不需要结果的合并啦。

ForkJoinPool使用

ForkJoinPool 是一个特殊的线程池,它的设计是为了更好的配合 分叉-和-合并 任务分割的工作。ForkJoinPool 也在 java.util.concurrent 包中,其完整类名为 java.util.concurrent.ForkJoinPool。

创建一个 ForkJoinPool

你可以通过其构造子创建一个 ForkJoinPool。作为传递给 ForkJoinPool 构造子的一个参数,你可以定义你期望的并行级别。并行级别表示你想要传递给 ForkJoinPool 的任务所需的线程或 CPU 数量。以下是一个 ForkJoinPool 示例:

// 这个示例创建了一个并行级别为 4 的 ForkJoinPool。   如果是默认构造会自动识别当前电脑的cup核数进行并行
ForkJoinPool forkJoinPool = new ForkJoinPool(4);

提交任务到 ForkJoinPool

就像提交任务到 ExecutorService 那样,把任务提交到 ForkJoinPool。你可以提交两种类型的任务。一种是没有任何返回值的(一个 “行动”),另一种是有返回值的(一个"任务")。这两种类型分别由 RecursiveAction 和 RecursiveTask 表示。接下来介绍如何使用这两种类型的任务,以及如何对它们进行提交。

RecursiveAction

RecursiveAction 是一种没有任何返回值的任务。它只是做一些工作,比如写数据到磁盘,然后就退出了。一个 RecursiveAction 可以把自己的工作分割成更小的几块,这样它们可以由独立的线程或者 CPU 执行。
你可以通过继承来实现一个 RecursiveAction。示例如下:

package com;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class MyRecursiveAction extends RecursiveAction {

    private long workLoad = 0;

    public MyRecursiveAction(long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected void compute() {

        //如果工作超出阈值,将任务分解成更小的任务
        if(this.workLoad > 10) {
            System.out.println("将工作负载 : " + this.workLoad);
            //将工作负载分成多个子任务
            List<MyRecursiveAction> subtasks =
                    new ArrayList<MyRecursiveAction>();
            subtasks.addAll(createSubtasks());
            //将子任务加入到任务队列中
            for(RecursiveAction subtask : subtasks){
                subtask.fork();
            }
        } else {
            System.out.println("自己完成工作量: " + this.workLoad);
        }
    }
    //将工作负载分成多个子任务
    private List<MyRecursiveAction> createSubtasks() {
        List<MyRecursiveAction> subtasks = new ArrayList<MyRecursiveAction>();
        //将工作负载分成两个子任务   24/2=12  12/2=6
        MyRecursiveAction subtask1 = new MyRecursiveAction(this.workLoad / 2);
        MyRecursiveAction subtask2 = new MyRecursiveAction(this.workLoad / 2);
        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        MyRecursiveAction myRecursiveAction = new MyRecursiveAction(24);
        forkJoinPool.invoke(myRecursiveAction);

    }

}

在这里插入图片描述

RecursiveTask

RecursiveTask 是一种会返回结果的任务。它可以将自己的工作分割为若干更小任务,并将这些子任务的执行结果合并到一个集体结果。用法和RecursiveAction一样唯一不同的就是可以返回值

以下是一个 RecursiveTask 示例:

package com;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

//配置RecursiveTask,返回值为Long
public class MyRecursiveTask  extends RecursiveTask<Long> {

    private long workLoad = 0;

    public MyRecursiveTask (long workLoad) {
        this.workLoad = workLoad;
    }

    @Override
    protected Long compute() {

        //如果工作超出阈值,将任务分解成更小的任务
        if(this.workLoad > 10) {
            System.out.println("将工作负载 : " + this.workLoad);
            //将工作负载分成多个子任务
            List<MyRecursiveTask > subtasks = new ArrayList<MyRecursiveTask >();
            subtasks.addAll(createSubtasks());
            //将子任务加入到任务队列中
            for(RecursiveTask subtask : subtasks){
                subtask.fork();
            }
            //等待子任务执行完,并得到其结果,并将结果相加
            long result = 0;
            for(MyRecursiveTask subtask : subtasks) {
                result += subtask.join();
            }
            return result;
        } else {
            System.out.println("自己完成工作量: " + this.workLoad);
            return 1L ;//返回计算结果
        }
    }
    //将工作负载分成多个子任务
    private List<MyRecursiveTask > createSubtasks() {
        List<MyRecursiveTask > subtasks = new ArrayList<MyRecursiveTask >();
        //将工作负载分成两个子任务   24/2=12  12/2=6
        MyRecursiveTask  subtask1 = new MyRecursiveTask (this.workLoad / 2);
        MyRecursiveTask  subtask2 = new MyRecursiveTask (this.workLoad / 2);
        subtasks.add(subtask1);
        subtasks.add(subtask2);
        return subtasks;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        MyRecursiveTask myRecursiveAction = new MyRecursiveTask(24);
        Long invoke = forkJoinPool.invoke(myRecursiveAction);
        System.out.println("最终结果: " + invoke);//4 从结果可以看出,任务被分成了4个子任务,每个子任务都是一个线程

    }

}

在这里插入图片描述

MyRecursiveTask 类继承自 RecursiveTask,这也就意味着它将返回一个 Long 类型的结果。MyRecursiveTask 示例也会将工作分割为子任务,并通过 fork() 方法对这些子任务计划执行。此外,本示例还通过调用每个子任务的 join() 方法收集它们返回的结果。子任务的结果随后被合并到一个更大的结果,并最终将其返回。对于不同级别的递归,这种子任务的结果合并可能会发生递归。

Fork/Join 案例Demo

需求:使用 Fork/Join 计算 1-10000的和,当一个任务的计算数量大于3000时拆分任务,数量小于3000时计算。

在这里插入图片描述

因为1~10000求和,耗时较少。下面我们将数据调大,求和1 ~ 59999999999(599亿),然后来对比一下使用 Fork/Join求和 和 普通求和之间的效率差异。

普通求和

    public static void main(String[] args) {
        //开始时间
        Long start = System.currentTimeMillis();
        long sum = 0l;
        for (long i = 1; i <= 59999999999L; i++) {
            sum+=i;
        }
        System.out.println(sum); //结果为负数,因为超出了long的最大值了   ,平均消耗时间:16秒
        //结束时间
        Long end = System.currentTimeMillis();
        System.out.println("消耗时间:"+(end-start));
    }

Fork/Join求和

package com;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

//配置RecursiveTask,返回值为Long
public class SumRecursiveTask   extends RecursiveTask<Long> {

    //大于3000要拆分(创建一个变量)
    //是否要拆分的临界值
    private static final long THRESHOLD = 3000L;

    //起始值
    private final long start;
    //结束值
    private final long end;

    //构造方法(传递起始值、结束值)
    public SumRecursiveTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    //任务编写完成
    @Override
    protected Long compute() {
        long length = end - start;
        //计算
        if(length < THRESHOLD){
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum +=i;
            }
            return sum;
        }else{
            //拆分
            long middle = (start + end) /2;
            SumRecursiveTask left = new SumRecursiveTask(start,middle);//从小到大
            left.fork();
            SumRecursiveTask right = new SumRecursiveTask(middle+1,end);//从大到小
            right.fork();
            return left.join() +right.join();
        }
    }

    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //放入线程池
        ForkJoinPool pool = new ForkJoinPool();
        SumRecursiveTask task = new SumRecursiveTask(1, 59999999999L);
        Long result = pool.invoke(task);
        System.out.println("result="+result); //结果为负数,因为超出了long的最大值了   ,平均消耗时间:4秒
        Long end = System.currentTimeMillis();
        System.out.println("消耗时间:"+(end-start));
    }

}

总结: 可以发现使用工作窃取算法能大大的提高我们计算的速度,理论上只要你电脑足够快这个提升是没有上限的 ,前提是任务是可拆分的

在这里插入图片描述

点赞 -收藏-关注-便于以后复习和收到最新内容
有其他问题在评论区讨论-或者私信我-收到会在第一时间回复
在本博客学习的技术不得以任何方式直接或者间接的从事违反中华人民共和国法律,内容仅供学习、交流与参考
免责声明:本文部分素材来源于网络,版权归原创者所有,如存在文章/图片/音视频等使用不当的情况,请随时私信联系我、以迅速采取适当措施,避免给双方造成不必要的经济损失。
感谢,配合,希望我的努力对你有帮助^_^

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/41406.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《精神与爱欲》爱源于母性,且超越性别

《精神与爱欲》爱源于母性&#xff0c;且超越性别 赫尔曼黑塞&#xff08;1877-1962&#xff09;&#xff0c;作家&#xff0c;诗人&#xff0c;画家。1877年生于德国&#xff0c;1924年入籍瑞士。1946年获诺贝尔文学奖。被誉为“德国浪漫派的最后一位骑士”。 文章目录《精神与…

扩展函数和运算符重载

扩展函数和运算符重载 扩展函数 扩展函数表示在不改变某个类的源代码的情况下,仍然可以打开这个类,向该类中添加新的函数为了能够更好的理解扩展函数的功能,先来思考一个问题:给定一个字符串,这个字符串由字母,数字,特殊符号组成,我们想要统计这个字符串当中字母的个数可以这…

第十章 开源许可证

软件是一种著作&#xff0c;天然是拥有版权的。很多人会认为放在 Github 上的就是开源软件&#xff0c;既然放了源代码&#xff0c;我就可以随便使用了。其实版权法规定著作是禁止共享的&#xff0c;也就是说没有许可证的软件等于保留版权。虽然源代码公开了&#xff0c;但并不…

GUI编程--PyQt5--QLabel

文章目录QLabel 文本展示QLabel 图片展示QLCDNumberQProgressBarQErrorMessageQProgressDialogQLabel 文本展示 展示文本、富文本、图片、动画。 # 实例化 label QLabel(self) # 设置文本 label.setText("666") # 设置图片 label.setPixmap(QPixmap) label.resize…

[BUG] runtime network not ready: NetworkReady=false reason:NetworkPluginNotRead

1 背景 执行kubectl get node是发现节点是NotReady状态&#xff0c;接着执行kubectl describe node 节点名 详细查看NotReady状态原因如下&#xff1a; runtime network not ready: NetworkReadyfalse reason:NetworkPluginNotReady message:docker: network plugin is not r…

数据结构之线性表中的双向循环链表【详解】

前言&#xff1a; 嗯&#xff01;昨天我们的无头单向非循环链表咱已经是可以顺利完成出来了的&#xff0c;今天我们就来看一下什么是有头双向循环链表&#xff0c;不要看着这个链表又双向又循环的就比单向不循环链表难&#xff0c;其实这个更加的简单哦&#xff01;前提是你有…

SpringBoot SpringBoot 原理篇 1 自动配置 1.17 自动配置原理【3】

SpringBoot 【黑马程序员SpringBoot2全套视频教程&#xff0c;springboot零基础到项目实战&#xff08;spring boot2完整版&#xff09;】 SpringBoot 原理篇 文章目录SpringBootSpringBoot 原理篇1 自动配置1.17 自动配置原理【3】1.17.1 看源码了1.17.2 小结1 自动配置 1.…

【STA】(1)引言

目录 1. 纳米级设计 2. 什么是STA 3. 为什么要进行STA 4. 设计流程 5. 不同阶段的STA 6. STA的局限性 1. 纳米级设计 在半导体器件中&#xff0c;金属互连线通常被用来连接电路中的各个部分&#xff0c;进而实现整个芯片。随着制造工艺的进一步缩小&#xff0c;这些互连线…

【电源专题】案例:不导这颗MOS管的原因是在电路上不通用?

本案例发生在MOS管替代料导入时。正常情况下在替代料导入、部品导入的时候,我们需要查看规格书。怎么查找规格书可以看文章【电子通识】芯片资料查询方法 对于一些关键的信息我们要做对比,一般来说要通过列表进行对比。但因为不同的供应商的测试标准不同,有很多是很难对比的…

信号与系统2——LTI

信号与系统2——LTI一、Introduction1. Representation of LTI systems2. Significance of unit impulse二、DT-LTI&#xff1a;Convolution Sum1. Output2. Impulse response of LTI system H3. Convolution sum4. Convolution Sum Evaluation Procedure5. Sequence Convoluti…

Python 数据容器(1) - list(列表)

文章目录什么是数据容器&#xff1f;Python中的数据容器数据容器&#xff1a;list&#xff08;列表&#xff09;基本语法案例演示列表的下标&#xff08;索引&#xff09;列表常用操作list容器操作总结什么是数据容器&#xff1f; 一种可以容纳多份数据的数据类型&#xff0c;容…

算法学习 | 回溯算法之深度优先搜索常见题型练习

目录 岛屿的最大面积 电话号码的字母组合 二进制手表 组合总数 活字印刷 岛屿的最大面积 题目链接&#xff1a;leetcode-695.岛屿的最大面积 示例 输入&#xff1a;grid [[0,0,1,0,0,0,0,1,0,0,0,0,0],[0,0,0,0,0,0,0,1,1,1,0,0,0],[0,1,1,0,1,0,0,0,0,0,0,0,0],[0,…

线程“八锁“ synchronized到底是对哪个对象加锁?

线程"八锁" synchronized到底是对哪个对象加锁? 习题一 class Number{public synchronized void a(){System.out.println("1");}public synchronized void b(){System.out.println("2");} } public class TestBlock {public static void main(…

从Zemax OpticStudio导入光学系统

摘要 ZemaxOpticStudio是一款广泛使用的光线追迹软件。VirtualLab Fusion可以从Zemax OpticStudio导入光学系统&#xff0c;包括完整3D位置信息和镜片玻璃。导入后&#xff0c;光学系统的结构数据将显示为单独的表面或可以组合成VirtualLab Fusion中的组件。VirtualLab Fusion可…

docker入门(一):在centOS虚拟机上安装docker

索引CentOS虚拟机安装1.下载CentOS镜像问题1-报错“您已输入用户名&#xff0c;客户机操作系统将保留此用户名”2.根据docker官方指导进行安装1.卸载旧版本&#xff08;初次安装可以忽略&#xff09;2.确保能联网后下载前置软件包3.设置镜像库&#xff08;阿里版&#xff09;4.…

CLIP后续--LSeg,GroupViT,ViLD

这个博客开了有两个月&#xff0c;一直没写成&#xff0c;最近封寝给它完成~躺平第三天 CLIP应用领域概览&#xff1a; 1. LSeg 原论文地址&#xff1a;https://arxiv.org/abs/2201.03546 代码&#xff1a;https://github.com/isl-org/lang-seg 这个图就很清楚的说明了zero…

mysql数据库管理

目录 一、MySQL数据库管理 1、库和表 2、常用的数据类型 3、char和varchar区别 二、查看数据库结构 三、SQL语句 1、SQL语句分类&#xff1a; 四、创建及删除数据库和表 五、管理表中的数据记录 六、修改表名和表结构 七、自增 八、填充 九、克隆表 十、清空表&am…

信号与系统1——Signals and Systems

信号与系统1——Signals and Systems一、Introduction1. Signals and Systems信号与系统(1) Signal信号(2) System系统2. Classification of Signals信号的分类(1) Continuous-time & discrete-time1) Continuous-Time signal连续时间信号2) Discrete-Time signal离散时间信…

【Hack The Box】linux练习-- Passage

HTB 学习笔记 【Hack The Box】linux练习-- Passage &#x1f525;系列专栏&#xff1a;Hack The Box &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; &#x1f4c6;首发时间&#xff1a;&#x1f334;2022年9月7日&#x1f334; &#x1f36…

浅析数据仓库和建模理论

第一章 认识数据仓库 1.1 数据仓库概念 数据仓库&#xff0c;英文名称为 Data Warehouse&#xff0c;可简写为 DW 或 DWH。数据仓库&#xff0c;是为企业所有级别的决策制定过程&#xff0c;提供所有类型数据支持的战略集合。它是单个数据存储&#xff0c;出于分析性报告和决…