java并发编程:Fork/Join并发框架介绍

news2025/1/16 1:44:41

文章目录

  • Fork/Join简介
  • 工作窃取算法
  • Fork/Join的具体实现
    • ForkJoinTask
      • fork()方法
      • join()方法
    • ForkJoinPool
      • WorkQueue
      • runState
  • Fork/Join的异常处理
  • Fork/Join的使用


Fork/Join简介

Fork/Join框架是一个实现了ExecutorService接口的多线程处理器,它专为那些可以通过递归分解成更细小的任务而设计,最大化的利用多核处理器来提高应用程序的性能。

与其他ExecutorService相关的实现相同的是,Fork/Join框架会将任务分配给线程池中的线程。而与之不同的是,Fork/Join框架在执行任务时使用了工作窃取算法

fork在英文里有分叉的意思,join在英文里连接、结合的意思。顾名思义,fork就是要使一个大任务分解成若干个小任务,而join就是最后将各个小任务的结果结合起来得到大任务的结果。

Fork/Join的运行流程大致如下所示:

image.png

需要注意的是,图里的次级子任务可以一直分下去,一直分到子任务足够小为止。用伪代码来表示如下:

 solve(任务):
     if(任务已经划分到足够小):
         顺序执行任务
     else:
         for(划分任务得到子任务)
             solve(子任务)
         结合所有子任务的结果到上一层循环
         return 最终结合的结果

通过上面伪代码可以看出,我们通过递归嵌套的计算得到最终结果,这里有体现分而治之(divide and conquer) 的算法思想。

工作窃取算法

工作窃取算法指的是在多线程执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行。

工作窃取流程如下图所示:

image.png

值得注意的是,当一个线程窃取另一个线程的时候,为了减少两个任务线程之间的竞争,我们通常使用双端队列来存储任务。被窃取的任务线程都从双端队列的头部拿任务执行,而窃取其他任务的线程从双端队列的尾部执行任务。

另外,当一个线程在窃取任务时要是没有其他可用的任务了,这个线程会进入阻塞状态以等待再次“工作”。

Fork/Join的具体实现

Fork/Join框架简单来讲就是对任务的分割与子任务的合并,所以要实现这个框架,先得有任务

在Fork/Join框架里提供了抽象类ForkJoinTask来实现任务。

ForkJoinTask

ForkJoinTask是一个类似普通线程的实体,但是比普通线程轻量得多。

fork()方法

其实fork()只做了一件事,那就是把任务推入当前工作线程的工作队列里

来看下fork()的源码:

 public final ForkJoinTask<V> fork() {
     Thread t;
     // ForkJoinWorkerThread是执行ForkJoinTask的专有线程,由ForkJoinPool管理
     // 先判断当前线程是否是ForkJoin专有线程,如果是,则将任务push到当前线程所负责的队列里去
     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
         ((ForkJoinWorkerThread)t).workQueue.push(this);
     else
          // 如果不是则将线程加入队列
         // 没有显式创建ForkJoinPool的时候走这里,提交任务到默认的common线程池中
         ForkJoinPool.common.externalPush(this);
     return this;
 }

join()方法

Join() 的主要作用是阻塞当前线程并等待获取结果。

来看下join()的源码:

 public final V join() {
     int s;
     // doJoin()方法来获取当前任务的执行状态
     if ((s = doJoin() & DONE_MASK) != NORMAL)
         // 任务异常,抛出异常
         reportException(s);
     // 任务正常完成,获取返回值
     return getRawResult();
 }

 /**
  * doJoin()方法用来返回当前任务的执行状态
  **/
 private int doJoin() {
     int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
     // 先判断任务是否执行完毕,执行完毕直接返回结果(执行状态)
     return (s = status) < 0 ? s :
     // 如果没有执行完毕,先判断是否是ForkJoinWorkThread线程
     ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
         // 如果是,先判断任务是否处于工作队列顶端(意味着下一个就执行它)
         // tryUnpush()方法判断任务是否处于当前工作队列顶端,是返回true
         // doExec()方法执行任务
         (w = (wt = (ForkJoinWorkerThread)t).workQueue).
         // 如果是处于顶端并且任务执行完毕,返回结果
         tryUnpush(this) && (s = doExec()) < 0 ? s :
         // 如果不在顶端或者在顶端却没未执行完毕,那就调用awitJoin()执行任务
         // awaitJoin():使用自旋使任务执行完成,返回结果
         wt.pool.awaitJoin(w, this, 0L) :
     // 如果不是ForkJoinWorkThread线程,执行externalAwaitDone()返回任务结果
     externalAwaitDone();
 }

下面是ForkJoinPool.join()的流程图:

image.png

通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:

  • RecursiveAction:用于没有返回结果的任务。
  • RecursiveTask :用于有返回结果的任务。

ForkJoinPool

ForkJoinPool是用于执行ForkJoinTask任务的执行(线程)池。

ForkJoinPool管理着执行池中的线程和任务队列,此外,执行池是否还接受任务,显示线程的运行状态也是在这里处理。

ForkJoinPool的源码如下:

 @sun.misc.Contended
 public class ForkJoinPool extends AbstractExecutorService {
     // 任务队列
     volatile WorkQueue[] workQueues;// 线程的运行状态
     volatile int runState;// 创建ForkJoinWorkerThread的默认工厂,可以通过构造函数重写
     public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;// 公用的线程池,其运行状态不受shutdown()和shutdownNow()的影响
     static final ForkJoinPool common;// 私有构造方法,没有任何安全检查和参数校验,由makeCommonPool直接调用
     // 其他构造方法都是源自于此方法
     // parallelism: 并行度,
     // 默认调用java.lang.Runtime.availableProcessors() 方法返回可用处理器的数量
     private ForkJoinPool(int parallelism,
                          ForkJoinWorkerThreadFactory factory, // 工作线程工厂
                          UncaughtExceptionHandler handler, // 拒绝任务的handler
                          int mode, // 同步模式
                          String workerNamePrefix) { // 线程名prefix
         this.workerNamePrefix = workerNamePrefix;
         this.factory = factory;
         this.ueh = handler;
         this.config = (parallelism & SMASK) | mode;
         long np = (long)(-parallelism); // offset ctl counts
         this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
     }
 }

WorkQueue

双端队列,ForkJoinTask存放在这里。

当工作线程在处理自己的工作队列时,会从队列首取任务来执行(FIFO);如果是窃取其他队列的任务时,窃取的任务位于所属任务队列的队尾(LIFO)。

ForkJoinPool与传统线程池最显著的区别就是它维护了一个工作队列数组

runState

ForkJoinPool的运行状态。

Fork/Join的异常处理

ForkJoinTask 在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally() 方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的 getException 方法获取异常。使用如下代码:

if(task.isCompletedAbnormally()){
   System.out.println(task.getException());
}

getException 方法返回 Throwable 对象,如果任务被取消了则返回 CancellationException。如果任务没有完成或者没有抛出异常则返回 null。

Fork/Join的使用

上面我们说ForkJoinPool负责管理线程和任务,ForkJoinTask实现fork和join操作,所以要使用Fork/Join框架就离不开这两个类了,只是在实际开发中我们常用ForkJoinTask的子类RecursiveTask 和RecursiveAction来替代ForkJoinTask。

下面我们用一个计算斐波那契数列第n项的例子来看一下Fork/Join的使用:

斐波那契数列数列是一个线性递推数列,从第三项开始,每一项的值都等于前两项之和:

1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89······

如果设f(n)为该数列的第n项(n∈N*),那么有:f(n) = f(n-1) + f(n-2)。

 public class FibonacciTest {static class Fibonacci extends RecursiveTask<Integer> {int n;public Fibonacci(int n) {
             this.n = n;
         }// 主要的实现逻辑都在compute()里
         @Override
         protected Integer compute() {
             // 这里先假设 n >= 0
             if (n <= 1) {
                 return n;
             } else {
                 // f(n-1)
                 Fibonacci f1 = new Fibonacci(n - 1);
                 f1.fork();
                 // f(n-2)
                 Fibonacci f2 = new Fibonacci(n - 2);
                 f2.fork();
                 // f(n) = f(n-1) + f(n-2)
                 return f1.join() + f2.join();
             }
         }
     }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors());
        long start = System.currentTimeMillis();
        Fibonacci fibonacci = new Fibonacci(40);
        Future<Integer> future = forkJoinPool.submit(fibonacci);
        System.out.println(future.get());
        long end = System.currentTimeMillis();
        System.out.println(String.format("耗时:%d millis", end - start));
    }
}

上面例子在本机的输出:

CPU核数:6
102334155
耗时:5222 millis

需要注意的是,上述计算时间复杂度为O(2^n),随着n的增长计算效率会越来越低,这也是上面的例子中n不敢取太大的原因。

此外,也并不是所有的任务都适合Fork/Join框架,比如上面的例子任务划分过于细小反而体现不出效率,下面我们试试用普通的递归来求f(n)的值,看看是不是要比使用Fork/Join快:

 // 普通递归,复杂度为O(2^n)
 public int plainRecursion(int n) {
     if (n == 1 || n == 2) {
         return 1;
     } else {
         return plainRecursion(n -1) + plainRecursion(n - 2);
     }
 }@Test
 public void testPlain() {
     long start = System.currentTimeMillis();
     int result = plainRecursion(40);
     long end = System.currentTimeMillis();
     System.out.println("计算结果:" + result);
     System.out.println(String.format("耗时:%d millis",  end -start));
 }

普通递归的例子输出:

 计算结果:102334155
 耗时:436 millis

通过输出可以很明显的看出来,使用普通递归的效率都要比使用Fork/Join框架要高很多。

这里我们再用另一种思路来计算:

// 通过循环来计算,复杂度为O(n)
private static int computeFibonacci(int n) {
    // 假设n >= 0
    if (n <= 1) {
        return n;
    } else {
        int first = 1;
        int second = 1;
        int third = 0;
        for (int i = 3; i <= n; i ++) {
            // 第三个数是前两个数之和
            third = first + second;
            // 前两个数右移
            first = second;
            second = third;
        }
        return third;
    }
}

public static void main(String[] args) {
    long start = System.currentTimeMillis();
    int result = computeFibonacci(40);
    long end = System.currentTimeMillis();
    System.out.println("计算结果:" + result);
    System.out.println(String.format("耗时:%d millis",  end -start));
}

上面例子在笔者所用电脑的输出为:

计算结果:102334155
耗时:0 millis

这里耗时为0不代表没有耗时,是表明这里计算的耗时几乎可以忽略不计,大家可以在自己的电脑试试,即使是n取大很多量级的数据(注意int溢出的问题)耗时也是很短的。

为什么在这里普通的递归或循环效率更快呢?因为Fork/Join是使用多个线程协作来计算的,所以会有线程通信和线程切换的开销。

如果要计算的任务比较简单(比如我们案例中的斐波那契数列),那当然是直接使用单线程会更快一些。但如果要计算的东西比较复杂,计算机又是多核的情况下,就可以充分利用多核CPU来提高计算速度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/628636.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java的内部类

1.内部类的概念 内部类表示的事物是外部类的一部分&#xff0c;内部类单独出现没有任何意义。如发动机是汽车的一部分。 内部类的访问特点&#xff1a; &#xff08;1&#xff09;内部类可以直接访问外部类的成员&#xff0c;包括私有&#xff1b; &#xff08;2&#xff09;外…

11.无监督学习之主成分分析

11.1 降维 降维的两种应用&#xff1a;一是数据压缩&#xff1b;二是可视化数据。 11.1.1 数据压缩 将相关性强的两个特征导致冗余&#xff0c;可以直接去掉其中一个特征&#xff0c;或者将两个特征进行某种转换&#xff0c;得到一个特征。 11.1.2 可视化数据 直接看数据可…

设计模式:提升软件设计质量的利器,适合入门者的指南

目录 导言&#xff1a;设计模式的概念常见的设计模式2.1. 单例模式&#xff08;Singleton Pattern&#xff09;2.2. 工厂模式&#xff08;Factory Pattern&#xff09;2.3. 观察者模式&#xff08;Observer Pattern&#xff09;2.4. 策略模式&#xff08;Strategy Pattern&…

OpenGL 摄像机

1.简介 OpenGL本身没有摄像机(Camera)的概念&#xff0c;但我们可以通过把场景中的所有物体往相反方向移动的方式来模拟出摄像机&#xff0c;产生一种我们在移动的感觉&#xff0c;而不是场景在移动。 要定义一个摄像机&#xff0c;我们需要它在世界空间中的位置、观察的方向…

Java 实现在顺序表中获取 pos 元素的位置

一、思路 1.顺序表不能是空的 2.pos位置要合法 3.直接返回当前的pos位置的下标 二、图解 返回的要是当前 pos 位置的下标&#xff0c;因为下标是没有负数的&#xff0c;由此就可以得出如果下标是负数的话&#xff0c; 这就是一种不合法的情况。 pos 位置的下标也不会超过顺序…

星空特效,截图不太完美

先上效果&#xff1a; 再上代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>星空</title><meta name"viewport" content"widthdevice-width, user-scalable…

【立体视觉(三)】之张正友标定法原理

【立体视觉&#xff08;三&#xff09;】之张正友标定法原理 一、相机标定二、参数求解一&#xff09;闭合解二&#xff09;极大似然解三&#xff09;考虑相机畸变 三、实验流程 此为个人学习笔记&#xff0c;在各处借鉴了不少好图好文&#xff08;参考文献在文末&#xff09;&…

项目中常用的linux命令总结大全

哈喽 大家好啊&#xff0c;相信大家在项目中都会常常部署服务器&#xff0c;就涉及到一些常见的linux命令了 1.命令提示符表示命令输入 ps aux | grep nginx&#xff08;查看nginx进程运行状态&#xff09;whereis nginx 查找哪里有nginxlsof -i:5300 查看端口状态kill -9 xxx进…

攻防世界-Crypto-转轮机加密

1. 题目如下&#xff1a; 1: < ZWAXJGDLUBVIQHKYPNTCRMOSFE < 2: < KPBELNACZDTRXMJQOYHGVSFUWI < 3: < BDMAIZVRNSJUWFHTEQGYXPLOCK < 4: < RPLNDVHGFCUKTEBSXQYIZMJWAO < 5: < IHFRLABEUOTSGJVDKCPMNZQWXY < 6: < AMKGHIWPNYCJBFZDR…

大数据需要学习哪些内容?

大数据技术的体系庞大且复杂&#xff0c;每年都会涌现出大量新的技术&#xff0c;目前大数据行业所涉及到的核心技术主要就是&#xff1a;数据采集、数据存储、数据清洗、数据查询分析和数据可视化。 Python 已成利器 在大数据领域中大放异彩 Python&#xff0c;成为职场人追求…

论文阅读和分析:Binary CorNET Accelerator for HR Estimation From Wrist-PPG

主要贡献&#xff1a; 一种完全二值化网络(bCorNET)拓扑结构及其相应的算法-架构映射和高效实现。对CorNET进行量化后&#xff0c;减少计算量&#xff0c;又能实现减轻运动伪影的效果。 该框架在22个IEEE SPC受试者上的MAE为6.675.49 bpm。该设计采用ST65 nm技术框架&#xff…

04_两种常见的网页反爬措施及应对方法

一、封禁IP地址反爬 1、应对思路: 理解这种反爬方法的含义:当我们用自己电脑的ip地址短时间,高频率访问某个具有此类反爬设置的网站,这种网站就会把我们的ip地址封禁,一般都是封24小时或者其他时间。解决方案:通过代理ip访问,这种方式只不过就是让你有了重新访问网页的…

宝塔面板搭建thinkphp后请求中去除index.php后缀

宝塔面板搭建thinkphp后请求中去除index.php后缀 nginx配置 在宝塔面板网站中绑定thinkphp的public&#xff0c;添加站点 点击站点设置按钮打开项目设置页面 找到配置文件 选项&#xff0c;然后在 22行 后添加一下代码 location / {if (!-f $request_filename) {rewrite …

一学就会----链表的中间节点

文章目录 题目描述思路代码示例在原题上增加难度思路代码示例 题目描述 给定一个头结点为 head 的非空单链表&#xff0c;返回链表的中间结点。 如果有两个中间结点&#xff0c;则返回第二个中间结点。 图片示例&#xff1a; 思路 因为这道题目并没有时间复杂度的规定&#xf…

iOS17beta有哪些Bug?iOS17值得升级吗?iOS17Bug大汇总!

iOS17Beta已上线几天&#xff0c;带来了新增横屏待机、“嘿Siri”去除了“嘿”、联系人海报、NameDrop等新功能。 做为第一个beta版本&#xff0c;避免不了许多Bug的出现。 小编收集了目前体验遇到和网上反馈的所有BUG&#xff0c;还没更新iOS17的小伙伴们可以看看截止目前升级…

C++ 类型转换:类型萃取器进行类型转换和cast类操作符进行转换的区别?

区别 类型萃取器和cast类操作符都可以用于类型转换&#xff0c;但它们的用途和工作方式有所不同。 类型萃取器&#xff0c;如 std::remove_reference<T>、std::remove_const<T>、std::add_pointer<T> 等&#xff0c;主要用于在编译时修改类型&#xff0c;它…

# WGCNA | 不止一个组的WGCNA怎么分析嘞!?~(四)(共识网络分析-第四步-共识模块与性状相关联)

1写在前面 最近稍微没有那么忙了&#xff0c;好好搞一下公众号吧&#xff0c;好久没怎么认真做了。&#x1f637; 有的时候你会发现坏事不一定是坏事&#xff0c;塞翁失马&#xff0c;焉知非福啊&#xff01;~&#x1f643; "我只担心一件事,我怕我配不上自己所受的苦难。…

springcloud-alibaba (06)RocketMQ下载安装和单机启动个人笔记

RocketMQ 01 下载RocketMQ02 安装RocketMQ03 启动RocketMQ1. 内存分配1.1 第一步1.2 第二步1.3 第三步 2. 启动RocketMQ2.1 启动NameServer2.2 启动Broker 04 测试RocketMQ05 关闭RocketMQ 01 下载RocketMQ 下载 RocketMQ 即可以从 Apache 官网下载&#xff0c;也可以从 gitHu…

C#调用C++的动态链接库

C#调用C的动态链接库 问题所在使用VS编辑所需要调用的函数&#xff08;c&#xff09;创建c#项目&#xff0c;调用c动态库 这是一个测试代码。 问题所在 c# 调用c的dll库不能引用 解决办法是 1&#xff1a;在写c代码的时候&#xff0c;把他加入到dll中 2&#xff1a;将c的dll库…

Selenium自动化测试框架工作原理你明白了吗?

一、Selenium是什么&#xff1f; 用官网的一句话来讲&#xff1a;Selenium automates browsers. Thats it&#xff01;简单来讲&#xff0c;Selenium是一个用于Web应用程序自动化测试工具。Selenium测试直接运行在浏览器中&#xff0c;就像真正的用户在操作浏览器一样。支持的…