并发编程系列-分而治之思想Forkjoin

news2025/1/10 12:50:11

我们介绍过一些有关并发编程的工具和概念,包括线程池、Future、CompletableFuture和CompletionService。如果仔细观察,你会发现这些工具实际上是帮助我们从任务的角度来解决并发问题的,而不是让我们陷入线程之间如何协作的繁琐细节(比如等待和通知等)。

对于简单的并行任务,你可以使用“线程池+Future”的方式来处理。而对于任务之间存在聚合关系的情况,无论是AND聚合还是OR聚合,你都可以利用CompletableFuture来解决。至于批量的并行任务,则可以借助CompletionService来实现。

我们一直强调,并发编程可以从三个层面来思考,分别是分工、协作和互斥。当你关注于任务本身时,你会发现自己的思维模式已经超越了并发编程的技术细节,更加贴近了现实世界中的工作方式。因此,我将线程池、Future、CompletableFuture和CompletionService都归类到了“分工”这个层面。

下面我将通过现实世界里的工作流程图来描述并发编程领域中的简单并行任务、聚合任务和批量并行任务。相信通过这些图示,你能够更好地将自己的思维模式与现实世界联系起来。

alt

在前面提到的简单并行任务、聚合任务和批量并行任务模型之外,还有一种任务模型被称为“分治”。如字面意义所示,分治是一种解决复杂问题的思维方法和模式;具体而言,它将一个复杂的问题分解成多个相似的子问题,然后再将这些子问题进一步分解成更小的子问题,直到每个子问题变得足够简单从而可以直接求解。

从理论上讲,每个问题都对应着一个任务,因此分治实际上就是对任务的划分和组织。分治思想在许多领域都有广泛的应用。例如,在算法领域,我们经常使用分治算法来解决问题(如归并排序和快速排序都属于分治算法,二分查找也是一种分治算法)。在大数据领域,MapReduce计算框架背后的思想也是基于分治。

由于分治这种任务模型的普遍性,Java并发包提供了一种名为Fork/Join的并行计算框架,专门用于支持分治任务模型的应用。

分治任务模型

这里你需要先深入了解一下分治任务模型,分治任务模型可分为两个阶段:一个阶段是 任务分解,也就是将任务迭代地分解为子任务,直至子任务可以直接计算出结果;另一个阶段是 结果合并,即逐层合并子任务的执行结果,直至获得最终结果。下图是一个简化的分治任务模型图,你可以对照着理解。

alt

简版分治任务模型图

在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,我们往往都采用递归算法。

Fork/Join的使用

Fork/Join是一个并行计算框架,主要用于支持分治任务模型。在这个计算框架中,Fork代表任务的分解,而Join代表结果的合并。Fork/Join计算框架主要由两部分组成:分治任务的线程池ForkJoinPool和分治任务ForkJoinTask。这两部分的关系类似于ThreadPoolExecutor和Runnable之间的关系,都是用于提交任务到线程池的,只不过分治任务有自己独特的类型ForkJoinTask。

ForkJoinTask是一个抽象类,其中有许多方法,其中最核心的是fork()方法和join()方法。fork()方法用于异步执行一个子任务,而join()方法通过阻塞当前线程来等待子任务的执行结果。ForkJoinTask有两个子类:RecursiveAction和RecursiveTask。从它们的名字就可以看出,它们都使用递归的方式处理分治任务。这两个子类都定义了一个抽象方法compute(),不同之处在于RecursiveAction的compute()方法没有返回值,而RecursiveTask的compute()方法有返回值。这两个子类也都是抽象类,在使用时需要创建自定义的子类来扩展功能。

接下来,让我们来实现一下如何使用Fork/Join并行计算框架来计算斐波那契数列(下面的代码示例源自Java官方示例)。首先,我们需要创建一个ForkJoinPool线程池以及一个用于计算斐波那契数列的Fibonacci分治任务。然后,通过调用ForkJoinPool线程池的invoke()方法来启动分治任务。由于计算斐波那契数列需要返回结果,所以我们的Fibonacci类继承自RecursiveTask。Fibonacci分治任务需要实现compute()方法,在这个方法中,逻辑与普通计算斐波那契数列的方法非常相似,只是在计算Fibonacci(n - 1)时使用了异步子任务,这通过f1.fork()语句来实现。

static void main(String[] args){
  //创建分治任务线程池
  ForkJoinPool fjp =
    new ForkJoinPool(4);
  //创建分治任务
  Fibonacci fib =
    new Fibonacci(30);
  //启动分治任务
  Integer result =
    fjp.invoke(fib);
  //输出结果
  System.out.println(result);
}
//递归任务
static class Fibonacci extends
    RecursiveTask<Integer>{
  final int n;
  Fibonacci(int n){this.n = n;}
  protected Integer compute(){
    if (n <= 1)
      return n;
    Fibonacci f1 =
      new Fibonacci(n - 1);
    //创建子任务
    f1.fork();
    Fibonacci f2 =
      new Fibonacci(n - 2);
    //等待子任务结果,并合并结果
    return f2.compute() + f1.join();
  }
}

ForkJoinPool工作原理

Fork/Join并行计算的核心组件是ForkJoinPool。下面简单介绍一下ForkJoinPool的工作原理。

与ThreadPoolExecutor类似,ForkJoinPool现的。不同之处在部有多个任务队列,用于生产者和消费者之间的通信。当我们通过ForkJoinPool的invoke()或submit()方法提交任务时,ForkJoinPool根据一定的路由规则将任务分配到一个任务队列中。如果任务执行过程中创建了子任务,那么子任务会被提交到对应工作线程的任务队列中。

当工作线程的任务队列为空时,它是否无事可做呢?不是的。ForkJoinPool引入了一种称为"任务窃取"的机制。当工作线程空闲时,它可以从其他工作线程的任务队列中"窃取"任务。例如,在下图中线程T2的任务队列已经为空,它可以窃取线程T1的任务队列中的任务。这样,所有的工作线程都能保持忙碌状态。

ForkJoinPool中的任务队列采用双端队列的形式。工作线程从任务队列的一个端获取任务,而"窃取任务"则从另一端进行消费。这种设计能够避免许多不必要的数据竞争。我们介绍的是ForkJoinPool的简化原理,实际上它的实现比我们介绍的要复杂得多。如果你对此感兴趣,建议阅读其源码。

alt

ForkJoinPool工作原理图

模拟MapReduce统计单词数量

Fork/Join并行计算框架被用来实现学习MapReduce的入门程序,该程序用于统计文件中每个单词的数量。以下是如何使用Fork/Join并行计算框架实现此功能。

首先,我们可以使用二分法递归地将文件拆分为更小的部分,直到每个部分只有一行数据。然后,在每个部分中统计单词的数量,并逐级汇总结果。你可以参考之前提到的简化版分治任务模型图以理解该过程。

现在,让我们开始实现。下面的示例程序使用字符串数组String[] fc来模拟文件内容,其中每个元素与文件中的行数据一一对应。关键代码位于compute()方法中,这是一个递归方法。它将前半部分数据fork一个递归任务进行处理(关键代码:mr1.fork()),而后半部分数据在当前任务中递归处理(mr2.compute())。

import java.util.concurrent.RecursiveTask;

public class WordCountTask extends RecursiveTask<Integer{
    private final String[] fc;
    private final int start, end;
    
    public WordCountTask(String[] fc, int start, int end) {
        this.fc = fc;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Integer compute() {
        if (end - start <= 1) {
            // 对单行数据进行统计
            return countWords(fc[start]);
        } else {
            int mid = (start + end) / 2;
            WordCountTask mr1 = new WordCountTask(fc, start, mid);
            mr1.fork();
            WordCountTask mr2 = new WordCountTask(fc, mid, end);
            int result2 = mr2.compute();
            int result1 = mr1.join();
            // 汇总结果
            return result1 + result2;
        }
    }
    
    private int countWords(String line) {
        String[] words = line.split(" ");
        return words.length;
    }
}

这个示例程序是对Fork/Join模型的简化,实际上在真正的MapReduce框架中,还涉及到数据划分、映射阶段、归约阶段等更多的步骤。但是通过此示例,你可以初步了解如何使用Fork/Join并行计算框架来处理类似的任务。

总结

Fork/Join并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的MapReduce,所以你可以把Fork/Join看作单机版的MapReduce。

Fork/Join并行计算框架的核心组件是ForkJoinPool。ForkJoinPool支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8提供的Stream API里面并行流也是以ForkJoinPool为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个ForkJoinPool,这个共享的ForkJoinPool默认的线程数是CPU的核数;如果所有的并行流计算都是CPU密集型计算的话,完全没有问题,但是如果存在I/O密集型的并行流计算,那么很可能会因为一个很慢的I/O计算而拖慢整个系统的性能。所以 建议用不同的ForkJoinPool执行不同类型的计算任务

如果你对ForkJoinPool详细的实现细节感兴趣,也可以参考 Doug Lea的论文。

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/890168.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux学习之基本指令一

在学习Linux下的基本指令之前首先大家要知道Linux下一切皆目录&#xff0c;我们的操作基本上也都是对目录的操作&#xff0c;这里我们可以联想我们是如何在windows上是如何操作的&#xff0c;只是形式上不同&#xff0c;类比学习更容易理解。 目录 01.ls指令 02. pwd命令 0…

Android Ble蓝牙App(六)请求MTU与显示设备信息

前言 在上一篇文章中已经了解了数据操作的方式&#xff0c;而数据交互的字节长度取决于我们手机与蓝牙设备的最大支持长度。 目录 Ble蓝牙App&#xff08;一&#xff09;扫描Ble蓝牙App&#xff08;二&#xff09;连接与发现服务Ble蓝牙App&#xff08;三&#xff09;特性和属…

财报解读:多元化布局,亲亲食品欲摘“果冻之王”头衔?

提及亲亲食品&#xff0c;大多数消费者的印象是其果冻、虾条等产品&#xff0c;而随着消费升级、休闲零食行业竞争的加剧&#xff0c;亲亲食品近年来也想“多条腿走路”&#xff0c;持续进行着产品创新。 不过&#xff0c;在市场看来&#xff0c;承载其转型愿景的新品并未带来…

ORB-SLAM2学习笔记8之特征点提取、生成描述子的ORBextractor

文章目录 0 引言1 特征点提取1.1 提取流程1.2 ORBextractor.cc1.2.1 成员函数1.2.2 成员变量 1.3 构建图像金字塔1.3.1 为什么要构建图像金字塔1.3.2 金字塔参数设置 1.4 提取ORB特征点1.4.1 Fast角点检测1.4.2 特征点提取流程1.4.3 八叉树筛选及非极大值抑制 2 描述子生成2.1 …

【数据集】GRNWRZ V2.0(全球河流网络和相应的水资源分区)介绍及下载

数据介绍 论文&#xff1a;J2022-A data set of global river networks and corresponding water resources zones divisions v2 论文下载地址 GRNWRZV1.0由严登华教授&#xff08;中国水利水电科学研究院&#xff09;团队开发&#xff0c;以全球90mDEM数据为基础&#xff0c;结…

SpringBoot-Mybatis 入门(数据库增删改查)

创建SpringBoot-Mybatis项目 创建新项目&#xff0c;注意Type要选择Maven Spring Boot的版本没啥硬性要求&#xff0c;一般开开发环境 依赖必选MySQL Driver、MyBatis Framework&#xff1b; 选Spring Web是为了辅助未来的web开发&#xff1b; Lombok是个人开发习惯。 配置 …

【C++精华铺】8.C++模板初阶

目录 1. 泛型编程 2. 函数模板 2.1 函数模板的概念及格式 2.2 函数模板的原理 2.3 模板的实例化 2.4 模板参数的匹配原则 3. 类模板 3.1 类模板格式 3.2 类模板的实例化 1. 泛型编程 什么是泛型编程&#xff1f;泛型编程是避免使用某种具体类型而去使用某种通用类型来进行…

FT2000+低温情况下RTC守时问题

1、背景介绍 飞腾2000芯片通过I2C连接一块RTC时钟芯片&#xff08;BellingBL5372&#xff09;来实现麒麟信安系统下后的守时功能。目前BIOS支持UEFI功能&#xff0c;BIOS上电后能获取RTC时间&#xff0c;并将时间写入相应的UEFI变量或内存区域&#xff0c;操作系统上电后使用U…

大模型基础02:GPT家族与提示学习

大模型基础&#xff1a;GPT 家族与提示学习 从 GPT-1 到 GPT-3.5 GPT(Generative Pre-trained Transformer)是 Google 于2018年提出的一种基于 Transformer 的预训练语言模型。它标志着自然语言处理领域从 RNN 时代进入 Transformer 时代。GPT 的发展历史和技术特点如下: GP…

产业园区数字孪生3d可视化全景展示方案

随着数字经济的发展&#xff0c;数字技术给企业发展带来了机遇的同时&#xff0c;也为企业管理带来挑战。比如园区运维&#xff0c;不仅体量大&#xff0c;复杂的运维管理系统&#xff0c;落地难度也较高。那么如何通过数字化手段重塑园区运营&#xff0c;打通园区各业务数据孤…

LimeReport设置当前打印时间

拉一个文本控件&#xff0c;然后填入下面信息&#xff1a; $S{ var curDate new Date(); var strYear curDate.getFullYear().toString(); var strMonth (curDate.getMonth() 1).toString(); var strDay curDate.getDate().toString(); var strHour curDate.getHours().…

句子变形金刚:变相的含义

一、说明 变形金刚完全重建了自然语言处理&#xff08;NLP&#xff09;的格局。在变形金刚出现之前&#xff0c;由于循环神经网络&#xff08;RNN&#xff09;&#xff0c;我们的翻译和语言分类很好——它们的语言理解能力有限&#xff0c;导致许多小错误&#xff0c;而且在大块…

mybatis-x插件的使用

mybatis-x能够帮助我们快速通过数据库生成实体类&#xff0c;mapper层&#xff0c;service层等 下面就带大家如何使用 首先我们去idea里面下载mybatis-x&#xff0c;如下图 我们还需要使用idea连接mysql数据库。效果如下图 接下来我们右键选中表&#xff0c;再选择第一项这个mm…

Panorama SCADA软件在无人值守水泵房的应用

应用背景 城市自来水公司、工矿企业的水泵房普遍数量较多&#xff0c;分布也较分散&#xff0c;在管理上通常会存在需要不定时开关泵、水泵故障不能及时发现、操作人员需24小时轮班值守、人员管理成本高等问题。 虹科Panorama是一个用于构建数据采集、SCADA 和历史解决方案的软…

华为云零代码新手教学-体验通过Astro Zero快速搭建微信小程序

您将会学到 您将学会如何基于Astro零代码能力&#xff0c;DIY开发&#xff0c;完成问卷、投票、信息收集、流程处理等工作&#xff0c;还能够在线筛选、分析数据。实现一站式快速开发个性化应用&#xff0c;体验轻松拖拽开发的乐趣。 您需要什么 环境准备 注册华为云账号、实…

【socket编程简述】TCP UDP 通信总结、TCP连接的三次握手、TCP断开的四次挥手

Socket&#xff1a;Socket被称做 套接字&#xff0c;是网络通信中的一种约定。 Socket编程的应用无处不在&#xff0c;我们平时用的QQ、微信、浏览器等程序.都与Socket编程有关。 三次握手 四次断开 面试可…

Android Selector 的使用

什么是 Selector&#xff1f; Selector 和 Shape 相似&#xff0c;是Drawable资源的一种&#xff0c;可以根据不同的状态&#xff0c;设置不同的图片效果&#xff0c;关键节点 < selector > &#xff0c;例如&#xff1a;我们只需要将Button的 background 属性设置为该dr…

k8s v1.27.4二进制部署记录

记录二进制部署过程 #!/bin/bash#升级内核 update_kernel() {rpm --import https://www.elrepo.org/RPM-GPG-KEY-elrepo.orgyum -y install https://www.elrepo.org/elrepo-release-7.el7.elrepo.noarch.rpmyum --disablerepo"*" --enablerepo"elrepo-kernel&q…

男装已成越南电商红海赛道,品牌如何突围?

据Metric最新数据&#xff0c;在越南电商市场&#xff0c;男装类目竞争相对激烈&#xff0c;在各大电商平台都已出现饱和迹象。然而&#xff0c;在这片竞争激烈的红海中&#xff0c;仍有品牌找准机会成功突围&#xff0c;为其他品牌提供经验借鉴。 越南男装电商竞争激烈&#…

每天一道leetcode:433. 最小基因变化(图论中等广度优先遍历)

今日份题目&#xff1a; 基因序列可以表示为一条由 8 个字符组成的字符串&#xff0c;其中每个字符都是 A、C、G 和 T 之一。 假设我们需要调查从基因序列 start 变为 end 所发生的基因变化。一次基因变化就意味着这个基因序列中的一个字符发生了变化。 例如&#xff0c;&quo…