并发编程之ForkJoin框架

news2025/1/24 2:26:15

什么是 Fork/Join 框架

Fork/Join 是从 java7 开始提供的并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务的结果,得到大任务结果的框架.

如下图:

Fork/Join 的特性

  1. ForJoinPool 不是为了替代 ExecutorService, 而是它的补充,在一些可分割的大任务场景下,性能会更好。
  2. ForJoinPool 主要利用“分而治之”的算法思想
  3. ForJoinPool 最适合计算型密集的任务

工作窃取算法

指的是某个线程从其他线程队列中获取任务来执行。这也是 Fork/Join 框架执行任务的核心机制

当我们需要做一比较大的任务时, 我们可以把这个任务分成若干个不互相依赖的子任务,把这些子任务放在不同的队列中,并为不同的队列生成一个单独的线程去执行任务,线程和队列是一一对应的。但是有的线程干活比较快,把自己队列中的任务执行完了,不会干等着,而是去帮其他线程干活,就是取其他队列中的任务来执行。 这样的话就会有两个线程同时访问一个队列,会发生资源抢占问题,于是,把这个队列设计成双端队列, 从队列尾部偷任务执行,不和土著线程抢队头。这样就减少了资源抢占的机会。

工作窃取算法优点是,充分利用多核 CPU 并行计算,并减少了线程间的竞争,缺点是,并没有完全避免竞争,例如队列中只有一个任务时,同时消耗了更多的系统资源。

ForkJoinPool 任务执行步骤

  1. ForJoinPool 中的每个工作线程都维护一个队列(WorkQueue), 这是一个双端队列 (Deque), 队列中只能存放 ForkJoinTask 子类型的任务
  2. 线程在工作中产生的新任务时(通常是调用了 fork() 方法), 会放入队尾,并且线程在处理任务时,使用的是 LIFO 方式,也就是从队尾取出任务执行。
  3. 每个线程在处理自己的工作队列的同时,会尝试窃取一个任务(刚提交到线程池的任务,或者其他线程队列中的任务), 窃取的任务位于其他线程队列的队头,也就是线程在窃取任务时,采取的是 FIFO 的方式。
  4. 遇到 join() 方法时,如果需要 join() 的任务尚未完成,则会先处理其他任务,并等待这个任务完成。
  5. 既没有自己的任务,也没有可窃取的任务时,进入休眠

ForkJoinPool 的使用

ForkJoin 框架要求任务必须是 ForkJoinTask 的子类,通常情况下不需要直接继承 ForkJoinTask, 而是继承它的子类,RecursiveActionRecursiveTask

RecursiveAction

用于没有返回值的任务,必须讲数据写到磁盘,可以把数据分块,多线程去写入

RecursiveTask

用于有返回值的任务

使用示例

数组中的数字累加

/**
 *有返回值类型的可拆分任务
 */
class SumTask extends RecursiveTask<Integer> {
    /**
     * 控制最小任务的粒度
     */
    private final static  int THRESHOLD = 20;
    private int[] arr;
    private int start;
    private int end;

    public SumTask(int[] arr, int start, int end){
        this.arr = arr;
        // 将数组分割成开始下表为 start, 结束下标为 end 的小数组
        this.start = start;
        this.end = end;
    }

    /**
     * 只计算某段的和
     */
    private int subTotal(){
        int sum = 0;
        for(int i = start; i < end; i ++){
            sum += arr[i];
        }
        return sum;
    }

    @Override
    protected Integer compute() {
        // 达到最小粒度时,开始计算
        if(end - start < THRESHOLD){
            return subTotal();
        }
        // 否则继续拆分成两个小任务
        int middle = (start + end) /2;
        SumTask leftSum = new SumTask(arr, start, middle);
        SumTask right = new SumTask(arr, middle, end);
        // 提交新任务
        leftSum.fork();
        right.fork();

        // 计算两个小任务的和
        return  leftSum.join() + right.join();

    }

}

public class ForkJoinTaskDemo {

    public static void main(String[] args) {
        int size = 1000000;
        int[] arr = new int[size];

        // 生成数组
        for(int i = 0; i< size ; i++){
            arr[i] = i + 1;
        }
        ForkJoinPool pool = new ForkJoinPool();
        // 提交一个大任务给线程池
        ForkJoinTask<Integer> result = pool.submit(new SumTask(arr,0, size));
        // 执行
        System.out.println("pool 结果:" + result.invoke());
        pool.shutdown();

    }
}

上面方法可以用下图说明:

重要方法解释

构造方法

完整的构造方法如下:

private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix)

参数解释:

  • parallelism 使用线程的个数,默认使用等同处理器个数的线程
  • factory 创建线程的工厂,默认情况下使用 ForkJoinWorkerThreadFactory
  • handler 线程异常时的处理器,默认 null
  • mode 表示工作线程内的任务队列是采用何种方式进行调度,可以是先进先出 FIFO,也可以是后进先出 LIFO。如果为 true,则线程池中的工作线程则使用先进先出方式进行任务调度,默认情况下是 false。
  • workerNamePrefix 线程名字前缀。

fork 方法

fork() 做的工作只有一件事:把任务推入当前工作线程的工作队列里,源码如下:

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

join 方法

join() 的工作则复杂得多,也是 join() 可以使得线程免于被阻塞的原因——不像同名的 Thread.join()

  1. 检查调用 join() 方法的线程是不是 ForkJoinThread 线程,如果不是,比如:main 线程,则阻塞当前线程,如果是,则不阻塞
  2. 查看任务完成状态,如果已经完成,则直接返回结果,如果没有完成,而且处于自己的工作队列内,则完成它
  3. 如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲
    join 的任务。
  4. 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务。
  5. 递归地执行第 4 步。

submit 方法

ForJoinPool 自认拥有工作队列,用来接收外部线程(非 ForkJoinThread)提交过来的任务,这个工作队列被称为 submitting queue, submit() 方法和 fork() 方法没有本质区别, 只不任务的目的地是 submitting queue. submitting queue 和 work queue 一样,也是被窃取的对象。因此当一个任务被成功窃取时,就意味着被提交的任务真正开始进入执行阶段。

invoke 方法

开始执行任务,如果必要,等待计算完成。

参考资料

  • https://www.cnblogs.com/cjsblog/p/9078341.html

  • https://note.youdao.com/ynoteshare/index.html?id=43491d79e1e5735d39b34b8f7a20c5c7&type=note&_time=1667033251690

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1748.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

毕业仅1年,干Python赚了50W!网友:不是吹的

前言 惊讶 ​Py现状&#xff1a;Python职位月薪5W起&#xff1f; 其他程序员&#xff1a;心态塌了&#xff01; 秒杀各行业薪资榜单&#xff0c;拿下编程语言排行榜的Python&#xff0c;工资真的如网上说的开挂了吗&#xff1f;有人在网上发现这样的一条评论信息&#xff1a…

公众号网课查题接口题库

公众号网课查题接口题库 本平台优点&#xff1a; 多题库查题、独立后台、响应速度快、全网平台可查、功能最全&#xff01; 1.想要给自己的公众号获得查题接口&#xff0c;只需要两步&#xff01; 2.题库&#xff1a; 题库&#xff1a;题库后台&#xff08;点击跳转&#xf…

Navicat 现已支持 OceanBase 企业版

近期&#xff0c;PremiumSoft CyberTech Limited 公司发布了 Navicat 16.1.3 版本&#xff0c;正式支持蚂蚁集团旗下的 OceanBase 企业版&#xff08;MySQL 兼容模式&#xff09;。此次合作旨在帮助用户通过 Navcicat 进行 OceanBase 企业版的数据库开发及管理&#xff0c;更大…

说说 Redis 事务

Redis 事务简介# Redis 只是提供了简单的事务功能。其本质是一组命令的集合&#xff0c;事务支持一次执行多个命令&#xff0c;在事务执行过程中&#xff0c;会顺序执行队列中的命令&#xff0c;其他客户端提交的命令请求不会插入到本事务执行命令序列中。命令的执行过程是顺序…

MPLS综合实验

目录 实验要求 划分IP地址 首先对MPLSVPN骨干网络进行配置 首先配置IP地址 启动IGP协议 激活MPLS和LDP VRF空间的创建 将接口划入到VRF空间中 R1和R5通过静态路由在CE和PE上配置 建立MP-BGP 对站点R1和R5进行配置 首先把IP给配置好 在VRF空间中发布路由信息 对站点…

2000-2020上市公司全要素生产率LP方法含原始数据和Stata代码

1、时间&#xff1a;2000-2020年 2、指标包括&#xff1a;stkcd、year、证券代码、固定资产净额、营业总收入、营业收入、营业成本、销售费用、管理费用、财务费用、支付给职工以及为职工支付的现金、员工人数、折旧摊销、行业代码、上市日期、AB股交叉码、退市日期、年末是否…

windows下用Java跑通spark官方文档的quick-start

这里写自定义目录标题前置环境官方示例三个小坑maven文件引用不明确未传递master url前置环境 见上一篇&#xff1a;https://blog.csdn.net/shuzip/article/details/115606522 官方示例 https://spark.apache.org/docs/3.1.1/quick-start.html /* SimpleApp.java */ import…

廊坊特色农业 国稻种芯·中国水稻节:河北复合农业促增收

廊坊特色农业 国稻种芯中国水稻节&#xff1a;河北复合农业促增收 新闻中国采编网 中国新闻采编网 谋定研究中国智库网 中国农民丰收节国际贸易促进会 国稻种芯中国水稻节 中国三农智库网-功能性农业农业大健康大会报道&#xff1a;河北廊坊安次区“稻蟹共作”新模式 特色农业…

Codeforces Round #773 (Div. 2)

A. Hard Way 题目链接&#xff1a;Problem - A - Codeforces 样例输入&#xff1a; 5 8 10 10 4 6 2 4 6 0 1 4 2 14 1 11 2 13 2 0 0 4 0 2 4 0 1 1 1 0 0样例输出&#xff1a; 0.0000000 0 2.0000 0.00 1题意&#xff1a;给定一个三角形的三个顶点&#xff0c;输入保证三角…

echarts画各种形状水波图

各种形状水波图 代码 用的是echarts绘制&#xff0c;echarts相关api可以参考echarts官网&#xff0c;形状修改series.shape即可修改形状&#xff0c;这里用的是SVG路径 <html><head><meta charset"utf-8"><link href"https://fonts.google…

jQuery网页开发案例:jQuery常用API--jQuery 尺寸、位置操作及 电梯导航案例和节流阀(互斥锁)

jQuery 尺寸 以上参数为空&#xff0c;则是获取相应值&#xff0c;返回的是数字型。如果参数为数字&#xff0c;则是修改相应值。参数可以不必写单位。这个width方法不包含边框 innerWidth()包含widthpadding 注意这个要大写 outerWidth()包含width padding border outerW…

(JavaSE) 数组

文章目录1. 数组的作用2. 数组的创建及初始化2.1 数组的创建2.2 数组的初始化3. 数组的使用3.1 数组中元素的访问3.2 遍历数组方法4. 数组是引用类型4.1 JVM中的内存有那些4.2 数组如何开辟空间4.3 数组 null 的意思4.4 引用不能同时指向多个对象4.5 数组作为方法返回值5. 二维…

【一键生成】3DMAX配景楼生成插件使用教程

3DMAX室外设计师常常需要创建各种场景配楼&#xff0c;为了解决大场景制作难的情况&#xff0c;3dMax配景楼生成插件是一款傻瓜式的插件或许更能快速让你从繁重的体力劳动中解脱出来&#xff01; 【安装方法】 方法一&#xff1a;拖动插件文件到3dMax窗口。 方法二&#xff1a;…

MySQL 主要线程

文章目录MySQL 主要线程1. Master thread2. io thread3. purge thread4. page Cleaner ThreadMySQL 主要线程 1. Master thread Master thread有四大循环&#xff0c;分别是loop,background loop&#xff0c;suspend loop&#xff0c;flush loop。且四大循环的作用如下: loop…

第二篇 基于自然语言处理的漏洞检测方法综述

杨伊等 来源&#xff1a;计算机研究与发展 目录 1 相关技术 1.1 自然语言处理 1.2 漏洞检测与分析 据2021年CVE漏洞趋势安全报告&#xff0c;当前漏洞类型占比最大的5类漏洞分别是代码执行、拒绝服务、溢出、跨站脚本以及信息获取。基于自然语言处理技术实现漏洞检测的研究…

STM32F103实现激光测距传感器测距WT-VL53L0 L1

目录 本博客将采用标准库和HAL库实现 所用设备选择 引脚说明 与单片机的接线表 标准库实现 HAL库实现 本博客将采用标准库和HAL库实现 所用设备选择 单片机型号&#xff1a;STM32F103C8T6 激光测距传感器型号&#xff1a;WT-VL53L0 L1 采用串口TTL电平输出&#xff0c;可…

CSDN云IDE初次测评体验

CSDN云IDE初次测评体验 文章目录CSDN云IDE初次测评体验一、前言二、云IDE产品介绍三、云IDE使用教程1、尝试编写Python爬虫代码2、尝试编写Python可视化代码3、尝试连接MySQL数据库四、最后我想说一、前言 最近一直有收到CSDN官方私信参加这个测评云IDE活动&#xff0c;刚好这…

YOLOv5剪枝 | 模型剪枝理论篇 | 1/2

文章目录 1. 前言2. 摘要精读3. 背景4. 本文提出的解决方式5. 通道层次稀疏性的优势6. 挑战7. 缩放因素和稀疏性惩罚8. 利用BN图层中的缩放因子9. 通道剪枝和微调10. 多通道方案11. 处理跨层连接和预激活结构12. 实验结果12.1 CIFAR-10数据集剪枝效果12.2 CIFAR-100数据集剪枝效…

Kafka图形管理界面Kafka-eagle安装配置详解

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点&#xff0c;重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等。 官方网址&#xff1a;EFAK 点击下载&#xff0c;将安装包下载到电脑本地&…

pytoch安装

1、安装 Anaconda 2、检查显卡&#xff0c;驱动&#xff08;检查是否较新&#xff09; winR——cmd&#xff0c;进入命令行&#xff0c;输入&#xff1a; nvidia-smi 如果版本不够&#xff0c;去官网下载Release Notes :: CUDA Toolkit Documentation (nvidia.com) 3、创建P…