【JUC基础】16. Fork Join

news2025/1/9 1:18:25

1、前言

“分而治之”一直是一个非常有效的处理大量数据的方法。著名的MapReduce也是采取了分而治之的思想。。简单地说,就是如果你要处理 1000 个数据,但是你并不具备处理 1000个数据的能力,那么你可以只处理其中的 10 个,然后分阶段处理 100 次,将 100 次的结进行合成,就是最终想要的对原始 1000 个数据的处理结果。而这就是Fork Join的基本思想。

2、Fork/Join框架

Fork 一词的原始含义是吃饭用的叉子,也有分叉的意思。在 Linux 平台中,方法 fork()用来创建子进程,使得系统进程可以多一个执行分支。在 Java 中也沿用了类似的命名方式。

而 join()方法的含义在之前的章节中已经解释过,这里表示等待。也就是使用 fork()方法后系统多了一个执行分支(线程),所以需要等待这个执行分支执行完毕,才有可能得到最终的结果,因此join()方法就表示等待。

3、JUC中的Fork/Join

在实际使用中,如果毫无顾忌地使用 fork()方法开启线程进行处理,那么很有可能导致系统开启过多的线程而严重影响性能。所以,在JDK 中,给出了一个 ForkJoinPool线程池对于fork()方法并不急着开启线程,而是提交给 ForkJoinPool线程池进行处理,以节省系统资源。

由于线程池的优化,提交的任务和线程数量并不是一对一的关系。在绝大多数情况下一个物理线程实际上是需要处理多个逻辑任务的。因此,每个线程必然需要拥有一个任务队列。因此,在实际执行过程中,可能遇到这么一种情况: 线程 A 已经把自己的任务都执行完了,而线程 B 还有一堆任务等着处理,此时,线程A 就会“帮助”线程 B,从线程 B的任务队列中拿一个任务过来处理,尽可能地达到平衡。也就是所谓的工作窃取。

3.1、实现1累计到1亿

这个是高频面试题,这时候当你回答用for循环去累加的时候,你就已经输了。正儿八经的,你高低说个fork join,面试官还能微微一笑。

下面我们简单写个示例实现这个场景,也更好的理解以下fork join。

public class ForkJoinTest {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(1, 100000000L);
        // 线程池调用方式一
        long result = forkJoinPool.invoke(task);
        
        long endTime = System.currentTimeMillis()
        System.out.println("Sum: " + result + ", 计算耗时:" + (endTime - startTime) + "ms");
        
        // 线程池调用方式二
        // ForkJoinTask<Long> forkJoinTask = forkJoinPool.submit(task);
        // System.out.println("Sum: " + result + ", 计算耗时:" + (endTime - startTime) + "ms");
    }
}

class CountTask extends RecursiveTask<Long> {

    // 批次数量,当数量达到10000,就继续分解
    private static final int THRESHOLD = 10000;
    private final long start;
    private final long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // 如果任务足够小,直接计算结果
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            // 如果任务较大,将任务拆分为更小的子任务
            long mid = (start + end) / 2;
            CountTask leftTask = new CountTask(start, mid);
            CountTask rightTask = new CountTask(mid + 1, end);

            leftTask.fork();
            rightTask.fork();

            long leftSum = leftTask.join();
            long rightSum = rightTask.join();

            return leftSum + rightSum;
        }
    }
}

我们重点看CountTask中的compute()方法。首先我们定义了要计算的规模大小THRESHOLD=10000。意味着我们会把累计1亿(因为我们要从1累加到1亿)个任务,按照10000个分解成子任务。并使用fork()方法提交子任务,最终join()方法等待各个子任务结束,并将结果再次求和。

来看下执行结果:

 

从代码中,还有几个和平时使用不一样的地方:

  1. CountTask继承了RecursiveTask
  2. main中线程池使用了ForkJoinPool

3.2、RecursiveTask

Recursive翻译过来就是递归,RecursiveTask也就是递归任务。没错,fork join的思想其实就是分批递归做同样的事情,所以也不难理解。

RecursiveTask是一个抽象类,用于支持Fork/Join框架的任务并行执行。他继承自ForkJoinTask。有如下特性:

  1. 继承关系:RecursiveTask是ForkJoinTask的子类,它继承了ForkJoinTask的一些方法和特性,比如执行任务、取消任务、等待任务完成等。
  2. 泛型类型:RecursiveTask是一个泛型类,通过类型参数V表示任务执行的结果类型。
  3. 抽象方法:RecursiveTask是一个抽象类,需要子类实现其唯一的抽象方法compute(),用于定义具体的任务逻辑。
  4. 任务拆分:RecursiveTask通常用于需要将大任务划分为小任务并以递归的方式执行的场景。在compute()方法中,可以通过判断任务的规模或条件,将任务拆分为更小的子任务,并在子任务中调用fork()方法提交并行执行。
  5. 任务合并:在子任务完成后,可以通过调用join()方法获取子任务的结果,并进行合并。这样,可以逐级向上合并子任务的结果,直到最终得到整个任务的结果。
  6. 返回结果:RecursiveTask的compute()方法必须返回一个结果,类型与泛型参数V一致。任务执行完成后,可以通过get()方法或join()方法获取任务的结果。

 

3.3、RecursiveAction

而RecursiveAction与RecursiveTask相似,RecursiveTask是带有返回值类型;而RecursiveAction是不带有返回值的任务。RecursiveAction不具备上面说到的RecursiveTask泛型的特性,以及无返回结果。

3.4、ForkJoinPool

上面提到的不管是RecursiveTask还是RecursiveAction,都无法独立使用,都是需要配合ForkJoinPool来执行任务,ForkJoinPool是一个线程池,同时也是一个任务调度机制。

主要有如下一些特性,包括我们前面提到的工作窃取也是他:

  1. 工作窃取算法:ForkJoinPool使用一种称为"工作窃取"(work-stealing)的算法来实现任务的调度和执行。每个线程都有一个自己的工作队列,当一个线程完成自己的任务后,它可以从其他线程的工作队列中窃取任务来执行。这种方式使得任务能够自动地在多个线程之间动态平衡,提高了并行执行的效率。
  2. 并行度控制:ForkJoinPool允许控制并行度,即同时执行的线程数量。可以通过构造函数或者ForkJoinPool.commonPool()方法来创建一个线程池实例,并指定最大的并行度。默认情况下,ForkJoinPool使用可用处理器的数量作为默认的并行度。
  3. 分治任务的执行:ForkJoinPool最适合执行可以通过递归的方式拆分成更小子任务的分治任务。通过提交RecursiveTask或RecursiveAction的实例给ForkJoinPool执行,框架会自动将任务拆分为更小的子任务,并提交给线程池中的线程进行执行。
  4. invoke()方法:ForkJoinPool提供了invoke()方法用于提交一个任务并等待其执行完成。该方法会阻塞直到任务执行完成并返回结果。
  5. submit()方法:除了invoke()方法,ForkJoinPool还提供了submit()方法用于异步提交一个任务,并返回一个ForkJoinTask的实例,可以通过该实例获取任务的结果。
  6. 任务的取消和异常处理:ForkJoinPool提供了一些方法用于任务的取消和异常处理,比如cancel()用于取消任务,isCancelled()用于判断任务是否被取消,getException()用于获取任务执行过程中的异常。

注:ForkJoinPool其实就是个CPU密集型的线程池。因此给定的线程个数最好是CPU的核心数+1。

使用ForkJoinPool可以充分利用多核处理器的性能,提高任务执行的效率。

4、进阶实现

到此,fork join基本的思想以及基础介绍也差不多了。但是如果3.1的问题,只回答到fork join可能只能得60分。从上面的代码上看有没有更高效的方法?答案是有的,高低我们现在用的开始JDK8啊,我们知道JDK8里面的stream也相应提供了并行流的计算。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    long startTime = System.currentTimeMillis();
    long sum = LongStream.rangeClosed(0, 100000000L).parallel().reduce(0, Long::sum);
    long endTime = System.currentTimeMillis();
    System.out.println("Sum: " + sum + ", 计算耗时:" + (endTime - startTime) + "ms");
}

e) + "ms"); }

执行结果:

效率高得离谱。这里我们干脆把三种方式都实现一遍对比下结果:

package forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

/**
 * @author Shamee loop
 * @date 2023/6/4
 */
public class ForkJoinTest {
    public static void main(String[] args) {

        System.out.println("==============传统实现方式================");
        long start0 = System.currentTimeMillis();
        long sum0 = 0;
        for (int i = 0; i <= 1000000000L; i++) {
            sum0 += i;
        }
        long end0 = System.currentTimeMillis();

        System.out.println("Sum: " + sum0 + ", 计算耗时:" + (end0 - start0) + "ms");
        System.out.println("==============传统实现方式================");
        System.out.println();
        System.out.println();

        System.out.println("==============Fork Join 实现方式================");
        long start1 = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0, 1000000000L);
        long sum1 = forkJoinPool.invoke(task);
        long end1 = System.currentTimeMillis();

        System.out.println("Sum: " + sum1 + ", 计算耗时:" + (end1 - start1) + "ms");
        System.out.println("==============Fork Join 实现方式================");
        System.out.println();
        System.out.println();


        System.out.println("==============JDK8 Stream 实现方式================");
        long start2 = System.currentTimeMillis();
        long sum2 = LongStream.range(0, 1000000000L).parallel().reduce(0, Long::sum);
        long end2 = System.currentTimeMillis();
        System.out.println("Sum: " + sum2 + ", 计算耗时:" + (end2 - start2) + "ms");
        System.out.println("==============JDK8 Stream 实现方式================");
    }

}

class CountTask extends RecursiveTask<Long> {

    // 批次数量,当数量达到10000,就继续分解
    private static final int THRESHOLD = 10000;
    private final long start;
    private final long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // 如果任务足够小,直接计算结果
            long sum = 0;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            // 如果任务较大,将任务拆分为更小的子任务
            long mid = (start + end) / 2;
            CountTask leftTask = new CountTask(start, mid);
            CountTask rightTask = new CountTask(mid + 1, end);

            leftTask.fork();
            rightTask.fork();

            long leftSum = leftTask.join();
            long rightSum = rightTask.join();

            return leftSum + rightSum;
        }
    }
}

执行结果:

5、小结

在使用 Fork/Join 框架时需要注意: 如果任务的划分层次很多,一直得不到返回,可能出现两种情况。第一,系统内的线程数量越积越多,导致性能严重下降。第二.,医的调用层次变多,最终导致栈溢出。不同版本的 JDK 内部实现机制可能有差异,从而导其表现不同。

此外,ForkJoin 线程池使用一个无锁的栈来管理空闲线程。如果一个工作线程暂时取不到可用的任务,则可能会被挂起,挂起的线程将会被压入由线程池维护的栈中。待将来有任务可用时,再从栈中唤醒这些线程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/622173.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用JSAPl来做一个倒计时的效果

今天的小案例需要做一个倒计时的效果 我们的时分秒需要一直进行倒计时&#xff0c;然后我们的页面颜色需要根据定时器的操作来进行更换&#xff0c;首先我们还是可以来分析一下我们的HTML步骤 <div class"countdown"><p class"next">今天是22…

HCIE-Cloud Computing LAB备考第二步:实战解题–第五题:论述二--跨数据中心部署问答--缩写法

跨数据中心部署 通常部署在同城或相近城市存在的两个数据中心&#xff0c;其物理距离在300km以内&#xff0c;两个数据中心均处于运行状态&#xff0c;可同时承担相同业务&#xff0c;提高数据中心的整体服务能力和系统资源利用率&#xff0c;当单数据中心故障时&#xff0c;业…

学会使用perf性能分析工具(含移植到arm-linux开发板)

文章目录 一、在ubuntu中使用apt包下载Perf二、使用源码安装Perf&#xff0c;并移植到arm-linux环境下三、使用perf四、Perf的功能介绍 系统&#xff1a;Ubuntu18.04系统 内核版本&#xff1a;5.4.0-150-generic&#xff08;通过uname -r查看&#xff09; 一、在ubuntu中使用ap…

针对大屏设备优化 Android 应用的方式及相关注意事项

作者 / Android 团队 近年来&#xff0c;包括大型可折叠设备、平板电脑以及 Chromebook 等大屏 Android 设备的数量与日俱增。确保应用可以在大屏设备上为用户提供无缝体验比以往任何时候都更加重要。例如&#xff0c;用户希望应用能够更充分利用这些设备的更大屏幕空间。我们发…

数据库信息速递 MONGODB CTO 看数据库发展趋势 与 不使用MONGODB你就要交“创新税”...

开头还是介绍一下群&#xff0c;如果感兴趣polardb ,mongodb ,mysql ,postgresql ,redis 等有问题&#xff0c;有需求都可以加群群内有各大数据库行业大咖&#xff0c;CTO&#xff0c;可以解决你的问题。加群请联系 liuaustin3 &#xff0c;在新加的朋友会分到2群 3群&#xff…

为了女神,我拼了!

大家注意&#xff1a;因为微信最近又改了推送机制&#xff0c;经常有小伙伴说错过了之前被删的文章&#xff0c;比如前阵子冒着风险写的爬虫&#xff0c;再比如一些限时福利&#xff0c;错过了就是错过了。 所以建议大家加个星标&#xff0c;就能第一时间收到推送。&#x1f44…

第二十八章 开发Productions - ObjectScript Productions - 定义业务操作

文章目录 第二十八章 开发Productions - ObjectScript Productions - 定义业务操作介绍关键原则定义业务操作类 第二十八章 开发Productions - ObjectScript Productions - 定义业务操作 本页介绍如何定义业务操作类。 提示&#xff1a; IRIS 提供使用特定出站适配器的专用业…

解读大模型的微调

在快速发展的人工智能领域中&#xff0c;有效地利用大型语言模型&#xff08;LLM&#xff09;变得越来越重要。然而&#xff0c;有许多不同的方式可以使用大型语言模型&#xff0c;这可能会让我们感到困惑。实际上&#xff0c;可以使用预训练的大型语言模型进行新任务的上下文学…

【移动架构】Flutter和React Native:最后的PK

首先&#xff0c;有点离题。做出决定的最简单方法是回顾历史。让我们沿着怀旧之路走一趟。早在2000年初&#xff0c;JAVA就有两个UI框架。一个是AWT&#xff0c;它是一种为多个操作系统构建UI的方法&#xff0c;同时仍然保持操作系统的外观。 每个操作系统都有自己的组件&#…

驱动开发--创建设备文件--控制LED灯

目录 1、手动创建设备文件 2、应用程序如何将数据传递给驱动 3、控制LED灯&#xff1a; 4、应用层控制灯 5、自动创建设备节点 1、手动创建设备文件 cat /proc/devices 查看主设备号 sudo mknod hello(路径&#xff1a;任意的) c/b&#xff08;C代表字符设备 b代表块设备&a…

华为无线AC双机热备三层组网配置案例

核心交换机: dis current-configuration sysname hx undo info-center enable vlan batch 10 66 88 99 to 100 ip pool vlan10 gateway-list 192.168.10.254 network 192.168.10.0 mask 255.255.255.0 dns-list 8.8.8.8 ip pool vlan100 gateway-list 172.16.100.254 network …

Qt在MySQL中存储音频文件

一、在存储音频视频等大文件时需要以二进制文件进行存储&#xff0c;首先需要了解mysql存储二进制文件的字段类型以及大小&#xff1a; 需要创建数据库中的图片类型为&#xff1a;二进制mediumblob类型&#xff0c;&#xff08; TinyBlob 最大 255 Blob 最大 65K MediumBlob …

苹果发布会,卧槽,卧槽,卧槽

今天跟二哥在群里聊到苹果的发布会&#xff0c;二哥完整的看了发布会&#xff0c;我随口问一句二哥看完后什么感受。 二哥说「苹果的工业设计还是遥遥领先&#xff0c;交互设计也是一流水准&#xff0c;然后价格也是遥遥领先」。 然后&#xff0c;我今天也抽空看了关于苹果新发…

【算法与数据结构】203、LeetCode移除链表元素

文章目录 题目一、解题思路完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 题目 一、解题思路 思路分析&#xff1a;这道题需要注意一个特殊情况&#xff0c;当删除的是头结点时&#xff0c;直接删除就找不到整个链表。因此我…

2023LRC软件、Adobe Lightroom Classic下载、安装教程

最后附下载地址 LRC简介&#xff1a; Adobe Lightroom Classic&#xff08;简称LR&#xff09;是Adobe Creative Cloud大家庭中的一款专业的图片管理和编辑工具&#xff0c;用于专业摄影师、摄影爱好者以及所有不断优化数码影像的人等。其目标是以丰富的功能提供高效、一致的…

03【WebStorm开发工具】

上一篇&#xff1a;02【HTML快速入门】 下一篇&#xff1a;04【】 目录&#xff1a;【HTML5系列教程】 文章目录 三、WebStorm开发工具3.1 WebStorm简介3.2 WebStorm安装3.3 WebStorm基本使用3.3.1 创建项目3.3.2 调整字体大小3.3.3 代码自动补全3.3.4 WebStorm常用快捷键 三…

在Anaconda的虚拟环境中添加环境变量并通过python访问(win/mac/linux)

一、前言 有的时候密码登比较敏感的信息&#xff0c;不方便直接写在代码里有很多变量我想很多project都可以访问到 那这时候使用环境变量是非常合适的了。 二、设置环境变量 以linux为例 直接在internal执行命令 export 变量值通过更改bashc文件 vim ~/.bashrc # 在最后一行加上…

【双向链表】

双向链表 带头双向循环链表的实现1. 函数的声明2. 函数的实现3. 主函数测试 带头双向循环链表的实现 今天我们来实现一下带头双向循环链表&#xff0c;顾名思义&#xff0c;带头就是有哨兵位&#xff0c;哨兵位不是链表的头&#xff0c;它是连接头节点的一个节点&#xff0c;方…

ChatGPT提示词攻略之迭代提示词

当我们在调试程序时&#xff0c;通常很难一次就把程序正常跑起来。这是普遍现象。但我们会借助一些工具和手段&#xff0c;有步骤有流程地去调整程序&#xff0c;最终让程序按照我们想要的样子正常执行。 对于提示词来说也是一样的。当我们向ChatGPT提问时&#xff0c;一开始它…

从操作系统角度了解内存管理

一.内存管理 1.主要功能 内存管理的主要功能有: 内存空间的分配与回收。由操作系统完成主存储器空间的分配和管理&#xff0c;使程序员摆脱存储分配的麻烦&#xff0c;提高编程效率。地址转换。在多道程序环境下&#xff0c;程序中的逻辑地址与内存中的物理地址不可能一致, …