Java并发工具Fork/Join原理

news2024/11/18 19:49:20

我们一直讲,并发编程可以分为三个层面的问题,分别是分工、协作和互斥,当你关注于任务的时候,你会发现你的视角已经从并发编程的细节中跳出来了,你应用的更多的是现实世界的思维模式,类比的往往是现实世界里的分工,所以我把线程池、Future、CompletableFuture和CompletionService都列到了分工里面。

下面我用现实世界里的工作流程图描述了并发编程领域的简单并行任务、聚合任务和批量并行任务,辅以这些流程图,相信你一定能将你的思维模式转换到现实世界里来。

alt

从上到下,依次为简单并行任务、聚合任务和批量并行任务示意图

上面提到的简单并行、聚合、批量并行这三种任务模型,基本上能够覆盖日常工作中的并发场景了,但还是不够全面,因为还有一种“分治”的任务模型没有覆盖到。 分治,顾名思义,即分而治之,是一种解决复杂问题的思维方法和模式;具体来讲,指的是 把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解。理论上来讲,解决每一个问题都对应着一个任务,所以对于问题的分治,实际上就是对于任务的分治。

分治思想在很多领域都有广泛的应用,例如算法领域有分治算法(归并排序、快速排序都属于分治算法,二分法查找也是一种分治算法);大数据领域知名的计算框架MapReduce背后的思想也是分治。既然分治这种任务模型如此普遍,那Java显然也需要支持,Java并发包里提供了一种叫做Fork/Join的并行计算框架,就是用来支持分治这种任务模型的。

分治任务模型

这里你需要先深入了解一下分治任务模型,分治任务模型可分为两个阶段:一个阶段是 任务分解,也就是将任务迭代地分解为子任务,直至子任务可以直接计算出结果;另一个阶段是 结果合并,即逐层合并子任务的执行结果,直至获得最终结果。下图是一个简化的分治任务模型图,你可以对照着理解。

alt

简版分治任务模型图

在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,我们往往都采用递归算法。

Fork/Join的使用

Fork/Join是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的 Fork对应的是分治任务模型里的任务分解,Join对应的是结果合并。Fork/Join计算框架主要包含两部分,一部分是 分治任务的线程池ForkJoinPool,另一部分是 分治任务ForkJoinTask。这两部分的关系类似于ThreadPoolExecutor和Runnable的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型ForkJoinTask。

ForkJoinTask是一个抽象类,它的方法有很多,最核心的是fork()方法和join()方法,其中fork()方法会异步地执行一个子任务,而join()方法则会阻塞当前线程来等待子任务的执行结果。ForkJoinTask有两个子类——RecursiveAction和RecursiveTask,通过名字你就应该能知道,它们都是用递归的方式来处理分治任务的。这两个子类都定义了抽象方法compute(),不过区别是RecursiveAction定义的compute()没有返回值,而RecursiveTask定义的compute()方法是有返回值的。这两个子类也是抽象类,在使用的时候,需要你定义子类去扩展。

接下来我们就来实现一下,看看如何用Fork/Join这个并行计算框架计算斐波那契数列(下面的代码源自Java官方示例)。首先我们需要创建一个分治任务线程池以及计算斐波那契数列的分治任务,之后通过调用分治任务线程池的 invoke() 方法来启动分治任务。由于计算斐波那契数列需要有返回值,所以Fibonacci 继承自RecursiveTask。分治任务Fibonacci 需要实现compute()方法,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算 Fibonacci(n - 1) 使用了异步子任务,这是通过 f1.fork() 这条语句实现的。

static void main(String[] args){
  //创建分治任务线程池
  ForkJoinPool fjp =
    new ForkJoinPool(4);
  //创建分治任务
  Fibonacci fib =
    new Fibonacci(30);
  //启动分治任务
  Integer result =
    fjp.invoke(fib);
  //输出结果
  System.out.println(result);
}
//递归任务
static class Fibonacci extends
    RecursiveTask<Integer>{
  final int n;
  Fibonacci(int n){this.n = n;}
  protected Integer compute(){
    if (n <= 1)
      return n;
    Fibonacci f1 =
      new Fibonacci(n - 1);
    //创建子任务
    f1.fork();
    Fibonacci f2 =
      new Fibonacci(n - 2);
    //等待子任务结果,并合并结果
    return f2.compute() + f1.join();
  }
}

ForkJoinPool工作原理

Fork/Join并行计算的核心组件是ForkJoinPool,所以下面我们就来简单介绍一下ForkJoinPool的工作原理。

ThreadPoolExecutor本质上是一个生产者-消费者模式的实现,内部有一个任务队列,这个任务队列是生产者和消费者通信的媒介;ThreadPoolExecutor可以有多个工作线程,但是这些工作线程都共享一个任务队列。

ForkJoinPool本质上也是一个生产者-消费者的实现,但是更加智能,你可以参考下面的ForkJoinPool工作原理图来理解其原理。ThreadPoolExecutor内部只有一个任务队列,而ForkJoinPool内部有多个任务队列,当我们通过ForkJoinPool的invoke()或者submit()方法提交任务时,ForkJoinPool根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。

如果工作线程对应的任务队列空了,是不是就没活儿干了呢?不是的,ForkJoinPool支持一种叫做“ 任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务,例如下图中,线程T2对应的任务队列已经空了,它可以“窃取”线程T1对应的任务队列的任务。如此一来,所有的工作线程都不会闲下来了。

ForkJoinPool中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争。我们这里介绍的仅仅是简化后的原理,ForkJoinPool的实现远比我们这里介绍的复杂,如果你感兴趣,建议去看它的源码。

alt

ForkJoinPool工作原理图

模拟MapReduce统计单词数量

学习MapReduce有一个入门程序,统计一个文件里面每个单词的数量,下面我们来看看如何用Fork/Join并行计算框架来实现。

我们可以先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果,你可以对照前面的简版分治任务模型图来理解这个过程。

思路有了,我们马上来实现。下面的示例程序用一个字符串数组 String[] fc 来模拟文件内容,fc里面的元素与文件里面的行数据一一对应。关键的代码在 compute() 这个方法里面,这是一个递归方法,前半部分数据fork一个递归任务去处理(关键代码mr1.fork()),后半部分数据则在当前任务中递归处理(mr2.compute())。

static void main(String[] args){
  String[] fc = {"hello world",
          "hello me",
          "hello fork",
          "hello join",
          "fork join in world"};
  //创建ForkJoin线程池
  ForkJoinPool fjp =
      new ForkJoinPool(3);
  //创建任务
  MR mr = new MR(
      fc, 0, fc.length);
  //启动任务
  Map<String, Long> result =
      fjp.invoke(mr);
  //输出结果
  result.forEach((k, v)->
    System.out.println(k+":"+v));
}
//MR模拟类
static class MR extends
  RecursiveTask<Map<String, Long>> {
  private String[] fc;
  private int start, end;
  //构造函数
  MR(String[] fc, int fr, int to){
    this.fc = fc;
    this.start = fr;
    this.end = to;
  }
  @Override protected
  Map<String, Long> compute(){
    if (end - start == 1) {
      return calc(fc[start]);
    } else {
      int mid = (start+end)/2;
      MR mr1 = new MR(
          fc, start, mid);
      mr1.fork();
      MR mr2 = new MR(
          fc, mid, end);
      //计算子任务,并返回合并的结果
      return merge(mr2.compute(),
          mr1.join());
    }
  }
  //合并结果
  private Map<String, Long> merge(
      Map<String, Long> r1,
      Map<String, Long> r2) {
    Map<String, Long> result =
        new HashMap<>();
    result.putAll(r1);
    //合并结果
    r2.forEach((k, v) -> {
      Long c = result.get(k);
      if (c != null)
        result.put(k, c+v);
      else
        result.put(k, v);
    });
    return result;
  }
  //统计单词数量
  private Map<String, Long>
      calc(String line) {
    Map<String, Long> result =
        new HashMap<>();
    //分割单词
    String [] words =
        line.split("\\s+");
    //统计单词数量
    for (String w : words) {
      Long v = result.get(w);
      if (v != null)
        result.put(w, v+1);
      else
        result.put(w, 1L);
    }
    return result;
  }
}

总结

Fork/Join并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的MapReduce,所以你可以把Fork/Join看作单机版的MapReduce。

Fork/Join并行计算框架的核心组件是ForkJoinPool。ForkJoinPool支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8提供的Stream API里面并行流也是以ForkJoinPool为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个ForkJoinPool,这个共享的ForkJoinPool默认的线程数是CPU的核数;如果所有的并行流计算都是CPU密集型计算的话,完全没有问题,但是如果存在I/O密集型的并行流计算,那么很可能会因为一个很慢的I/O计算而拖慢整个系统的性能。所以 建议用不同的ForkJoinPool执行不同类型的计算任务

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/681662.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Golang每日一练(leetDay0107) 去除重复字母、最大单词长度乘积

目录 316. 去除重复字母 Remove Duplicate Letters &#x1f31f;&#x1f31f; 318. 最大单词长度乘积 Maximum-product-of-word-lengths &#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Rust每日一练 专栏 Golang每日一练 专栏 Python每日…

【Less】四则运算

Less四则运算特点 对于两个不同单位值之间的运算&#xff0c;不要求你进行运算操作的几个值必须带单位&#xff0c;只要其中有一个有单位就可以了&#xff0c;运算结果的值会优先取第一个值的单位为准。如&#xff1a; 2030px-10em编译成40px。border:10em; border:(border2px)…

数据挖掘——宁县(区、市)农村居民人均可支配收入影响因子分析(论文)

《数据挖掘与分析》课程论文 题目&#xff1a;宁县&#xff08;区、市&#xff09;农村居民人均可支配收入影响因子分析 xx学院xx班&#xff1a;xxx 2022年6月 摘要&#xff1a;农村居民人均可支配收入可能被农作物产量、牲畜存栏、农作物播种数量等诸多因素影响。为此&#…

JavaSE基础语法--封装

Java是一门面向对象的语言。面向对象的三大特性&#xff1a;封装&#xff0c;继承&#xff0c;多态。封装到底是什么含义呢&#xff1f;通俗来讲就是屏蔽掉类的实现细节&#xff0c;对外提供接口让你调用。举个现实生活中的例子&#xff1a; 刚好618刚过&#xff0c;我因为需求…

chatgpt赋能python:Python算和的重要性及优势

Python算和的重要性及优势 在现代科技时代&#xff0c;计算机的应用范围越来越广泛&#xff0c;Python算和作为一种高效而强大的计算工具&#xff0c;已经成为了无数科学家和工程师的必备技能。Python算和不仅仅在各类科学实验中有着重要的应用&#xff0c;也在企业开发、数据…

chatgpt赋能python:Python程序一直运行怎么停止?

Python程序一直运行怎么停止&#xff1f; 在开发软件时&#xff0c;有时候我们会遇到Python程序一直运行不停止的情况&#xff0c;这时候我们该如何解决呢&#xff1f;本文将介绍一些常见的方法帮助您停止Python程序。 常见的停止Python程序的方法 1. KeyboardInterrupt&…

【硬件5】vr电源芯片驱动

文章目录 1.读MPS5023芯片&#xff1a;0x03ff即将前6位屏蔽2.读PXE1410CDM电压和电流&#xff1a;一个数&0x7ff&#xff0c;将这个数前5位全变为0&#xff0c;其余位不变2.1 1ine11&#xff1a;先看第15和10位&#xff0c;e9b6是上面读出的值2.2 1ine16&#xff1a;PMBUS协…

chatgpt赋能python:Python空循环:提高代码效率的神器

Python空循环&#xff1a;提高代码效率的神器 Python作为一门高效、易学的编程语言&#xff0c;广泛应用于各行各业。在编写Python代码时&#xff0c;循环结构是经常使用的。但是&#xff0c;有时候我们需要使用循环结构&#xff0c;但并不需要执行任何操作。这时候&#xff0…

chatgpt赋能python:Python中的空格:一种重要的编程元素

Python中的空格&#xff1a;一种重要的编程元素 在Python编程中&#xff0c;空格是被广泛使用的重要元素之一。本文将介绍Python中空格的重要性&#xff0c;并探讨空格在编程中的不同应用。 为什么空格在Python编程中如此重要&#xff1f; Python对空格敏感&#xff0c;意味…

CVPR2023 多目标跟踪(MOT)汇总

一、《OVTrack: Open-Vocabulary Multiple Object Tracking》 作者:Siyuan Li* Tobias Fischer* Lei Ke Henghui Ding Martin Danelljan Fisher Yu Computer Vision Lab, ETH Zurich 论文链接 &#xff1a;https://openaccess.thecvf.com/content/CVPR2023/papers/Li_OVTrack…

[Selenium] 通过Java+Selenium查询某个博主的Top40文章质量分

系列文章目录 通过JavaSelenium查询文章质量分 通过JavaSelenium查询某个博主的Top40文章质量分 文章目录 系列文章目录前言一、环境准备二、查询某个博主的Top40文章2.1、修改pom.xml配置2.2、配置Chrome驱动2.3、引入浏览器配置2.4、设置无头模式2.5、启动浏览器实例&#x…

【瑞萨RA_FSP】WiFi——ESP8266模块通讯

文章目录 一、Wifi模块简介二、ESP8266功能介绍1. 通用输入/输出接口&#xff08;GPIO&#xff09;2. 使用UART与WIFI通讯3. ESP8266工作模式介绍 三、AT指令四、实验&#xff1a;STA模式测试1. 文件结构2. 宏定义函数3. ESP8266-STA功能函数4. 中断回调函数5. hal_entry入口函…

chatgpt赋能python:Python中的空值

Python中的空值 在Python编程中&#xff0c;空值指的是没有任何值的对象。在其他编程语言中&#xff0c;空值常常被称为null、nil、None或者undefined。Python中的空值用None关键字表示。 None 在Python语言中&#xff0c;None用于表示没有任何值。当用户要定义一个值却不想…

chatgpt赋能python:Python组合框(Combobox)介绍

Python组合框&#xff08;Combobox&#xff09;介绍 Python组合框&#xff08;Combobox&#xff09;是GUI编程中常用的一个组件&#xff0c;它可以让你在一个下拉框中选择一个或多个选项。Python组合框的特点是可以让用户自定义选项&#xff0c;也可以在选项中加入数据&#x…

红黑树-迭代器实现

目录 迭代器自增 当前结点存在右子树 当前结点没有存在右子树 迭代器自增完整代码 迭代器自减 迭代器自减代码&#xff1a; 迭代器自增 红黑树的迭代器应该怎么实现呢&#xff1f;现在我模仿大佬的实现逻辑。 我们迭代器最重要是可以允许自增与自减的实现的。 会发现我们…

chatgpt赋能python:Python程序运行速度问题

Python程序运行速度问题 Python是一种高级语言&#xff0c;其语法简洁、易于学习&#xff0c;在科学计算、数据分析、web开发等领域有着广泛的应用。然而&#xff0c;Python程序在运行速度方面却存在一定的瓶颈&#xff0c;这也是许多开发者关注的问题。本文将从多个方面探讨影…

【深度学习笔记】二分类问题与 Logistic 回归

本专栏是网易云课堂人工智能课程《神经网络与深度学习》的学习笔记&#xff0c;视频由网易云课堂与 deeplearning.ai 联合出品&#xff0c;主讲人是吴恩达 Andrew Ng 教授。刚兴趣的网友可以观看网易云课堂的视频进行深入学习&#xff0c;视频的链接如下&#xff1a; https://m…

chatgpt赋能python:Python等待一秒:介绍和用法全解析

Python等待一秒&#xff1a;介绍和用法全解析 什么是Python等待一秒&#xff1f; 在编写Python脚本时&#xff0c;我们通常要让程序暂停一段时间&#xff0c;这可以通过让程序等待一定的时间来实现。等待时间可以是任意长度的时间&#xff0c;最常见的时间单位是秒。Python中…

chatgpt赋能python:三种常见的Python程序错误及解决方案

三种常见的Python程序错误及解决方案 Python是一种高级编程语言&#xff0c;具有易读、易学、易维护等特点&#xff0c;被广泛应用于Web开发、数据分析、人工智能等领域。但是&#xff0c;即使是有10年Python编程经验的工程师也难免会犯错误。这篇文章将介绍Python程序中的三种…

Unity3D:添加设备

Unity3D&#xff1a;添加设备 推荐&#xff1a;将NSDT场景编辑器加入你的3D工具链 3D工具集&#xff1a;NSDT简石数字孪生 添加设备 若要将新设备添加到设备模拟器&#xff0c;请创建设备定义和设备覆盖。 设备定义是 Unity 项目中扩展名的文本文件。它包含描述设备属性的 …