Day845.Fork/Join -Java 并发编程实战

news2025/1/12 9:41:30

Fork/Join

Hi,我是阿昌,今天学习记录的是关于Fork/Join的内容。

线程池、Future、CompletableFuture 和 CompletionService,仔细观察会发现这些工具类都是在帮助站在任务的视角来解决并发问题,而不是让纠缠在线程之间如何协作的细节上(比如线程之间如何实现等待、通知等)。

对于简单的并行任务,可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;批量的并行任务,则可以通过 CompletionService 来解决。

并发编程可以分为三个层面的问题,分别是:

  • 分工
  • 协作
  • 互斥

当关注于任务的时候,会发现视角已经从并发编程的细节中跳出来了,应用的更多的是现实世界的思维模式,类比的往往是现实世界里的分工,所以把线程池、Future、CompletableFuture 和 CompletionService 都列到了分工里面。

下面用现实世界里的工作流程图描述了并发编程领域的简单并行任务、聚合任务和批量并行任务,辅以这些流程图。

从上到下,依次为简单并行任务、聚合任务和批量并行任务示意图

上面提到的简单并行、聚合、批量并行这三种任务模型,基本上能够覆盖日常工作中的并发场景了,但还是不够全面,因为还有一种“分治”的任务模型没有覆盖到。


分治,顾名思义,即分而治之,是一种解决复杂问题的思维方法和模式;具体来讲,指的是把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解。

理论上来讲,解决每一个问题都对应着一个任务,所以对于问题的分治,实际上就是对于任务的分治。

分治思想在很多领域都有广泛的应用,例如算法领域有分治算法(归并排序、快速排序都属于分治算法,二分法查找也是一种分治算法);大数据领域知名的计算框架 MapReduce 背后的思想也是分治。

既然分治这种任务模型如此普遍,那 Java 显然也需要支持,Java 并发包里提供了一种叫做 Fork/Join 的并行计算框架,就是用来支持分治这种任务模型的。


一、分治任务模型

分治任务模型可分为两个阶段:

  • 一个阶段是任务分解,也就是将任务迭代地分解为子任务,直至子任务可以直接计算出结果;
  • 另一个阶段是结果合并,即逐层合并子任务的执行结果,直至获得最终结果。下图是一个简化的分治任务模型图,你可以对照着理解。

简版分治任务模型图

在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。

具备这种相似性的问题,往往都采用递归算法


二、Fork/Join 的使用

Fork/Join 是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的 Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。

Fork/Join 计算框架主要包含两部分:

  • 一部分是分治任务的线程池 ForkJoinPool
  • 另一部分是分治任务 ForkJoinTask。

这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。

ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,其中 fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的执行结果。

ForkJoinTask 有两个子类——RecursiveAction 和 RecursiveTask,通过名字你就应该能知道,它们都是用递归的方式来处理分治任务的。

这两个子类都定义了抽象方法 compute(),不过区别是 RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的。

这两个子类也是抽象类,在使用的时候,需要你定义子类去扩展。

接下来就来实现一下,看看如何用 Fork/Join 这个并行计算框架计算斐波那契数列(下面的代码源自 Java 官方示例)。

首先需要创建一个分治任务线程池以及计算斐波那契数列的分治任务,之后通过调用分治任务线程池的 invoke() 方法来启动分治任务。

由于计算斐波那契数列需要有返回值,所以 Fibonacci 继承自 RecursiveTask。

分治任务 Fibonacci 需要实现 compute() 方法,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算 Fibonacci(n - 1) 使用了异步子任务,这是通过 f1.fork() 这条语句实现的。


static void main(String[] args){
  //创建分治任务线程池  
  ForkJoinPool fjp = 
    new ForkJoinPool(4);
  //创建分治任务
  Fibonacci fib = 
    new Fibonacci(30);   
  //启动分治任务  
  Integer result = 
    fjp.invoke(fib);
  //输出结果  
  System.out.println(result);
}
//递归任务
static class Fibonacci extends 
    RecursiveTask<Integer>{
  final int n;
  Fibonacci(int n){this.n = n;}
  protected Integer compute(){
    if (n <= 1)
      return n;
    Fibonacci f1 = 
      new Fibonacci(n - 1);
    //创建子任务  
    f1.fork();
    Fibonacci f2 = 
      new Fibonacci(n - 2);
    //等待子任务结果,并合并结果  
    return f2.compute() + f1.join();
  }
}

三、ForkJoinPool 工作原理

Fork/Join 并行计算的核心组件是 ForkJoinPool,所以下面就来简单介绍一下 ForkJoinPool 的工作原理。

ThreadPoolExecutor 本质上是一个生产者 - 消费者模式的实现,内部有一个任务队列,这个任务队列是生产者和消费者通信的媒介;

ThreadPoolExecutor 可以有多个工作线程,但是这些工作线程都共享一个任务队列。

ForkJoinPool 本质上也是一个生产者 - 消费者的实现,但是更加智能,可以参考下面的 ForkJoinPool 工作原理图来理解其原理。

ThreadPoolExecutor 内部只有一个任务队列,而 ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。如果工作线程对应的任务队列空了,是不是就没活儿干了呢?

不是的,ForkJoinPool 支持一种叫做“任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务,例如下图中,线程 T2 对应的任务队列已经空了,它可以“窃取”线程 T1 对应的任务队列的任务。

如此一来,所有的工作线程都不会闲下来了。ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争。

这里介绍的仅仅是简化后的原理,ForkJoinPool 的实现远比这里介绍的复杂,如果感兴趣,建议去看它的源码。

ForkJoinPool 工作原理图


四、模拟 MapReduce 统计单词数量

MapReduce 有一个入门程序,统计一个文件里面每个单词的数量,下面来看看如何用 Fork/Join 并行计算框架来实现。

可以先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果,可以对照前面的简版分治任务模型图来理解这个过程。

下面的示例程序用一个字符串数组 String[] fc 来模拟文件内容,fc 里面的元素与文件里面的行数据一一对应。

关键的代码在 compute() 这个方法里面,这是一个递归方法,前半部分数据 fork 一个递归任务去处理(关键代码 mr1.fork()),后半部分数据则在当前任务中递归处理(mr2.compute())。


static void main(String[] args){
  String[] fc = {"hello world",
          "hello me",
          "hello fork",
          "hello join",
          "fork join in world"};
  //创建ForkJoin线程池    
  ForkJoinPool fjp = 
      new ForkJoinPool(3);
  //创建任务    
  MR mr = new MR(
      fc, 0, fc.length);  
  //启动任务    
  Map<String, Long> result = 
      fjp.invoke(mr);
  //输出结果    
  result.forEach((k, v)->
    System.out.println(k+":"+v));
}
//MR模拟类
static class MR extends 
  RecursiveTask<Map<String, Long>> {
  private String[] fc;
  private int start, end;
  //构造函数
  MR(String[] fc, int fr, int to){
    this.fc = fc;
    this.start = fr;
    this.end = to;
  }
  @Override 
  protected Map<String, Long> compute(){
    if (end - start == 1) {
      return calc(fc[start]);
    } else {
      int mid = (start+end)/2;
      MR mr1 = new MR(
          fc, start, mid);
      mr1.fork();
      MR mr2 = new MR(
          fc, mid, end);
      //计算子任务,并返回合并的结果    
      return merge(mr2.compute(),
          mr1.join());
    }
  }
  //合并结果
  private Map<String, Long> merge(
      Map<String, Long> r1, 
      Map<String, Long> r2) {
    Map<String, Long> result = 
        new HashMap<>();
    result.putAll(r1);
    //合并结果
    r2.forEach((k, v) -> {
      Long c = result.get(k);
      if (c != null)
        result.put(k, c+v);
      else 
        result.put(k, v);
    });
    return result;
  }
  //统计单词数量
  private Map<String, Long> calc(String line) {
    Map<String, Long> result = new HashMap<>();
    //分割单词    
    String [] words = line.split("\\s+");
    //统计单词数量    
    for (String w : words) {
      Long v = result.get(w);
      if (v != null) 
        result.put(w, v+1);
      else
        result.put(w, 1L);
    }
    return result;
  }
}

五、总结

Fork/Join 并行计算框架主要解决的是分治任务。

分治的核心思想是“分而治之”:

将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。


这个过程非常类似于大数据处理中的 MapReduce,所以你可以把 Fork/Join 看作单机版的 MapReduce。Fork/Join 并行计算框架的核心组件是 ForkJoinPool。

ForkJoinPool 支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。

Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。

不过需要注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;

如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。

所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。

如果对 ForkJoinPool 详细的实现细节感兴趣,也可以参考Doug Lea 的论文。


对于一个 CPU 密集型计算程序,在单核 CPU 上,使用 Fork/Join 并行计算框架是否能够提高性能呢?

CPU同一时间只能处理一个线程,所以理论上,纯cpu密集型计算任务单线程就够了。

多线程的话,线程上下文切换带来的线程现场保存和恢复也会带来额外开销。

但实际上可能要经过测试才知道。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/131571.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度学习:05 卷积神经网络介绍(CNN)

目录 卷积神经网络简介 为什么要用卷积神经网络 网络结构组成 卷积层 卷积计算 卷积核大小 f 边界填充 (p)adding 步长 (s)tride 计算公式 卷积层 激活函数 池化层&#xff08;pooling&#xff09; dropout层 全连接层 卷积神经网络简介 卷积神经网络由一个或多个…

DoIP协议从入门到精通系列——车载网络安全

现代社会慢慢步入数字时代,在这个时代,网络安全已经成为最重要的关注点。自从1980年第一次出现电脑病毒,网络威胁和攻击持续不断,给社会(经济)带来巨大影响。随着汽车的数字化和互联化发展,自然而然会联想到汽车也将为成为黑客攻击的目标。导致的问题除了单纯的不便(攻…

数据结构 | 十大排序超硬核八万字详解【附动图演示、算法复杂度性能分析】

写在前面 2023年的第一篇博客&#xff0c;在这里先祝大家兔年快乐&#x1f430; 本文从学习到搜寻各种资料&#xff0c;整理成博客的形式展现足足花了一个月的时间&#xff0c;慢工出细活&#xff0c;希望本篇文章可以真正带你学懂排序&#xff0c;不再为写排序算法而苦恼 博主…

MQTT协议的工作原理

一、MQTT概述 MQTT由IBM的Andy Stanford-Clark博士和Arcom&#xff08;现为Eurotech&#xff09;的Arlen Nipper于1999年发明。 MQTT 是物联网 &#xff08;IoT&#xff09; 最常用的消息传递协议。MQTT 代表 MQ 遥测传输。该协议是一组规则&#xff0c;用于定义物联网设备如何…

二叉树16:找树左下角的值

主要是我自己刷题的一些记录过程。如果有错可以指出哦&#xff0c;大家一起进步。 转载代码随想录 原文链接&#xff1a; 代码随想录 leetcode链接&#xff1a;513.找树左下角的值 题目&#xff1a; 给定一个二叉树的 根节点 root&#xff0c;请找出该二叉树的 最底层 最左边…

Android每周一轮子:Nvwa(热修复)

前言 &#xff08;废话&#xff09; 最近发现了一个问题&#xff0c;一些平时博客写的很多的程序员&#xff0c;反倒在日常的工作中&#xff0c;却是业务写的很一般&#xff0c;只会摆理论的人&#xff0c;甚至还跑出来教别人如何找工作&#xff0c;如何做架构&#xff0c;其实…

2022年终总结:少年不惧岁月长,彼方尚有荣光在。

2022年终总结&#xff1a;少年不惧岁月长&#xff0c;彼方尚有荣光在。 &#x1f3ac; 博客主页&#xff1a;王同学要努力 &#x1f3ac;个人简介&#xff1a;大三小白&#xff0c;喜欢前端 &#xff0c;热爱分享 &#x1f3a5; 本文由 王同学要努力 原创&#xff0c;首发于…

Eureka 注册中心

Eureka 注册中心目录概述需求&#xff1a;设计思路实现思路分析1.快速上手2.增加 Maven 依赖3.Client端配置注册中心Server端配置注册中心参考资料和推荐阅读Survive by day and develop by night. talk for import biz , show your perfect code,full busy&#xff0c;skip ha…

R 语言 管道操作符

背景 关于代码的简洁性,就是你使用了比较简化的高级操作符,但是有时候会增加代码的可读性。这种可读性在于你是否真正的去了解R的高级语法。你了解高级语法,他就不难,你不了解,他就难,可读性差。 这里我们来讲解一下,关于管道操作符,使R语言编程简化一些。 管道操作…

【登录流程执行逻辑】

1)整体流程图 2.main.js是入口&#xff0c;里面require("permission")时会触发方法体 在进行require("权限时")&#xff0c;会进行一系列初始化。 重定向。 接着走login方法。 vuex其实就是store。 触发vuex里面的方法: await store.dispatch(user/getInf…

CDN消耗速度太快,解决办法

前段时间发现我网站的CDN消耗速度太快了&#xff0c;20多块钱100G的流量&#xff0c;半个月甚至十几天左右都消耗完了。 于是我看CDN的访问ip并不多&#xff0c;发现大部分消耗的都是静态资源&#xff0c;js等文件。 后来找到了解决办法&#xff0c;目前100G还有这么多 方法 …

基于MVC的网上汽车城网站平台开发

摘要 随着现代都市生活节奏的不断加快、网络覆盖面的日益扩大&#xff0c;越来越多的人们加入了网上购物的行列。如今&#xff0c;网购已经成为人们生活的一部分。本系统主要是使用 B/S架构开发出的一个基于 ASP.NET的网上汽车城网站平台开发。前台页面使用CSSDIV&#xff0c;后…

Torch包学习

创建 torch.from_numpy(ndarray) → Tensor&#xff1a;将numpy.ndarray 转换为pytorch的 Tensor。两者共享内存。返回的张量不能改变大小。orch.linspace(start, end, steps100, outNone) → Tensor&#xff1a;生成一个 从start 到 end 的tensor。tensor的长度为steps。包括…

JVM【字节码与类的加载篇】

Class文件结构 字节码文件的跨平台性(了解) 1.Java语言&#xff1a;跨平台的语言 当Java源代码成功编译为字节码后&#xff0c;如果想在不同的平台上运行&#xff0c;则无须再次编译这个优势不再那么吸引人了。Python PHP perl ruby lisp等有强大的解释器跨平台似乎已经快成…

二叉树的遍历大全(前序,中序,后序)+(递归,迭代,Morris方法)

文章目录二叉树的前序遍历递归迭代Morris遍历二叉树的中序遍历递归迭代Morris遍历二叉树的后序遍历递归迭代Morris遍历二叉树的前序遍历 力扣传送门&#xff1a; https://leetcode.cn/problems/binary-tree-preorder-traversal/description/ 递归 递归的解法非常简单&#x…

快过年了,Python实现12306查票以及自动购票....

嗨害大家好鸭&#xff01;我是小熊猫~ 明天就是2023年啦~ 还有谁像我小熊猫一样没有回家的&#xff1f; 这次康康能不能12306抢票回家&#xff01;&#xff01;&#xff01; Python实现12306查票以及自动购票 [代码来源]: 青灯教育-自游老师 [环境使用]: Python 3.8Pycha…

构造函数和原型

1、概述 在典型的 OOP 的语言中&#xff08;如 Java&#xff09;&#xff0c;都存在类的概念&#xff0c;类就是对象的模板&#xff0c;对象就是类的实例&#xff0c;但在 ES6之前&#xff0c; JS 中并没用引入类的概念。ES6&#xff0c; 全称 ECMAScript 6.0 &#xff0c;201…

计算机组成原理实验——三、存储器实验

一、实验目的 1.掌握存储器的工作原理和接口。 2.掌握存储器的实现方法和初始化方法。 3.掌握RISC-V中存储器的存取方式。 二、实验内容 1.利用vivado IP核创建6432的ROM&#xff0c;并在 系数文件中设置数据为123489ab&#xff1b; 2.利用vivado IP核创建6432的RAM&#xf…

猿如意---初学者的一盏明灯---程序员的宝藏app

&#x1f680;write in front&#x1f680; &#x1f4dd;个人主页&#xff1a;认真写博客的夏目浅石. &#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐️ 留言&#x1f4dd;​ &#x1f4e3;系列专栏&#xff1a;我的CSDN活动之旅 &#x1f4ac;总结&#xff1a;希望你看完…

Rabbit客户端操作不同交换机[包含延迟类型]

文章目录一&#xff1a;direct-直投交换机0.0: 说明 --- 只有队列和交换机绑定&#xff0c;且routing key路由键一致才会收到消息1.1: 先后创建两个队列1.2: 队列绑定Direct交换机&#xff0c;和routing key1.3: 未指明路由键&#xff1a;1.4: 指明路由键1.5: 两个队列绑定一个…