Java Fork-Join框架学习

news2025/1/17 0:52:52

概述

Fork/Join是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork负责把一个大任务切分为若干并行执行的子任务,Join负责合并这些子任务的执行结果,最后得到这个大任务的结果。

使用类似MapReduce的分治思想,先分割再合并,故而包括两个关键步骤:

  1. 分割任务。需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。
  2. 执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join使用两个类来完成以上两件事情:

  1. ForkJoinTask:要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供以下两个子类:
    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask:用于有返回结果的任务。
  2. ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

ForkJoinTask与一般的任务的主要区别在于它需要实现compute方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。

实例

public class ForkJoinMergeSort {
	
	public static void main(String[] args) {
		int length = 200000000;
		int[] array = new int[length];
		Random random = new Random();
		for (int i = 0; i < length; i++) {
			int x = random.nextInt(1000000000);
			array[i] = x;
		}
		long start = System.currentTimeMillis();
		int[] tmp = new int[length];
		MergeSort task = new MergeSort(array, tmp, 0, length - 1);
		// fork/join框架测试
		ForkJoinPool pool = new ForkJoinPool(8);
		pool.invoke(task);
		if (task.isCompletedNormally()) {
			System.out.println("fork/join timing: " + (System.currentTimeMillis() - start) + "ms");
		}
		// 单线程测试
		task.sort(array, tmp, 0, length-1);
		System.out.println("single thread timing: " + (System.currentTimeMillis() - start) + "ms");
	}
}

@AllArgsConstructor
class MergeSort extends RecursiveAction {
	private final int[] array;
	private final int[] tmp;
	private final int first;
	private final int last;
	
	@Override
	protected void compute() {
		// 当排序项分解成少于1000时直接执行归并排序算法
		if (last - first < 1000) {
			sort(array, tmp, first, last);
		} else {
			// 当排序项大于1000时,将数组分成两部分(由框架根据条件自动递归分解,直到项数少于1000为止)
			int middle = (first + last) / 2;
			MergeSort t1 = new MergeSort(array, tmp, first, middle);
			MergeSort t2 = new MergeSort(array, tmp, middle + 1, last);
			invokeAll(t1, t2);
			// 递归归并排序被分解的两组数字
			merge(array, tmp, first, middle + 1, last);
		}
	}
	
	public void sort(int[] array, int[] tmp, int first, int last) {
		if (first < last) {
			int middle = (first + last) / 2;
			sort(array, tmp, first, middle);
			sort(array, tmp, middle + 1, last);
			merge(array, tmp, first, middle + 1, last);
		}
	}
	
	private void merge(int[] array, int[] tmp, int leftStart, int rightStart, int rightEnd) {
		int leftEnd = rightStart - 1;
		int tmpPos = leftStart;
		int total = rightEnd - leftStart + 1;
		while (leftStart <= leftEnd && rightStart <= rightEnd) {
		    if (array[leftStart] <= array[rightStart]) {
		        tmp[tmpPos++] = array[leftStart++];
		    } else {
		        tmp[tmpPos++] = array[rightStart++];
		    }
		}
		while (leftStart <= leftEnd) {
		    tmp[tmpPos++] = array[leftStart++];
		}
		while (rightStart <= rightEnd) {
		    tmp[tmpPos++] = array[rightStart++];
		}
		for (int i = 0; i < total; i++, rightEnd--) {
		    array[rightEnd] = tmp[rightEnd];
		}
	}
}

输出:

fork/join timing: 19109ms
single thread timing: 38884ms

原理

并行度

即parallelism参数,也就是初始化ForkJoinPool时传入的参数,对应的构造函数如下:

public ForkJoinPool(int parallelism) {
	this(parallelism, defaultForkJoinWorkerThreadFactory, null, false, 0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
}

parallelism参数指定线程池中可用的最大并行线程数,基本上就是允许同时运行的线程数量,会影响任务的并行度和性能。

提高parallelism的直观预期是提高并行度,从而减少任务执行时间,但实际情况可能并非如此,原因有以下几点:

  • 任务分解粒度:如果任务过于简单,分解为多个子任务时可能不值得产生额外的线程开销。每个线程的上下文切换和管理都有成本,线程过多反而会导致性能下降;
  • 任务间依赖:如果任务之间有依赖关系,增加并行度可能导致线程等待其他线程完成,从而减少并行执行的效果;
  • 资源竞争:如果任务频繁访问共享资源(如内存、文件系统等),会导致线程间的竞争和锁争用,降低整体性能;
  • CPU核心数限制:parallelism不应超过可用CPU核心数。过多的线程会导致线程上下文切换,降低效率。理想情况下,parallelism设置为核心数的1到2倍通常能获得最佳性能;
  • 工作窃取机制:ForkJoinPool使用工作窃取算法来分配任务。增加parallelism可能会导致一些线程空闲,而其他线程忙于执行任务。

工作窃取

work-stealing,采用工作窃取算法来实现,核心是指某个线程从其他队列里窃取任务来执行。为减少线程间的竞争,会把这些子任务分别放到不同的队列里,然后为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。假设此时A线程已经把自己A队列里的所有子任务执行完毕,而B线程还没将对应的B队列里子任务执行完毕,此时A线程会去B线程的队列里窃取一个子任务来执行。在窃取操作中由于A、B线程同时访问B线程对应的子任务队列,为减少AB之间的竞争,通常使用双端队列,B永远从双端队列的头部获取任务执行,而A从尾部获取任务执行。

优势:充分利用线程进行并行计算,提高执行效率,以空间换时间。

缺点:

  1. 某些情况下存在竞争,如双端队列只剩一个任务时;
  2. 消耗更多的系统资源(创建多个线程和多个双端队列)。

核心类

  1. ForkJoinPool
    ForkJoinPool自动计算线程池参数,且提供工作窃取算法来管理这些任务。如果有线程空闲,则会从其它线程的队列尾中窃取一个任务给空闲线程来运行。ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
  2. ForkJoinTask:抽象泛型类。是Fork/Join任务的一个抽象,你需要继承此类,然后定义自己的计算逻辑。任务的创建就是通过此类中的fork()方法来实现的。这里说的任务几乎类似Thread类创建的那些普通线程,但更轻量级。因为它可以使用ForkJoinPool中少量有限的线程来管理大量的任务,所以它要比Thread类创建的线程更轻量。fork()方法异步执行任务,join方法可以一直等待到任务执行完毕。invoke()方法把fork和join两个操作合二为一成一个单独的调用。代表fork/join里面任务类型,一般用它的两个子类RecursiveTask、RecursiveAction。任务的处理逻辑包括任务的切分都集中在compute()方法里面。
  3. RecursiveTask:ForkJoinTask的子类,也是抽象泛型类。通过重载RecursiveTask类的compute方法来实现Fork-Join的逻辑。在compute方法里,要实现两件事,Fork-Join就是要先fork出RecursiveTask对象的子任务,然后将它们join在一起。有返回值。
  4. RecursiveAction:没有返回值
  5. ForkJoinPool.WorkQueue:任务队列
  6. ForkJoinWorkerThread:fork/join里面真正干活的线程,里面有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue。然后就从workQueue里面拿任务出来处理。依附于ForkJoinPool而存活,如果ForkJoinPool销毁,它也会跟着结束。

在这里插入图片描述

源码分析

源码基于JDK22。

任务状态

任务状态有四种:DONE(已完成,不一定是成功),ABNORMAL(不正常),和THROWN(出现异常),HAVE_EXCEPTION(异常)。对应源码如下:

static final int DONE           = 1 << 31; // must be negative
static final int ABNORMAL       = 1 << 16;
static final int THROWN         = 1 << 17;
static final int HAVE_EXCEPTION = DONE | ABNORMAL | THROWN;

fork

ForkJoinTask的fork方法源码如下:

public final ForkJoinTask<V> fork() {
	Thread t;ForkJoinWorkerThread wt;
	ForkJoinPool p; ForkJoinPool.WorkQueue q; boolean internal;
	// 先判断当前线程是否是ForkJoinWorkerThread的实例,是则将任务push到当前线程所维护的双端队列中
	if (internal = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
		q = (wt = (ForkJoinWorkerThread)t).workQueue;
		p = wt.pool;
	}
	else
		q = (p = ForkJoinPool.common).externalSubmissionQueue();
	q.push(this, p, internal);
	return this;
}

解析:调用fork方法时,会调用ForkJoinPool.WorkQueue的push方法将任务放进队列,然后立即返回结果。push方法源码如下:

final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
    int s = top, b = base, cap, m, p, room, newCap; ForkJoinTask<?>[] a;
    if ((a = array) == null || (cap = a.length) <= 0 || (room = (m = cap - 1) - (s - b)) < 0) {
        // could not resize
        if (!internal)
            unlockPhase();
        throw new RejectedExecutionException("Queue capacity exceeded");
    }
    top = s + 1;
    long pos = slotOffset(p = m & s);
    if (!internal)
        U.putReference(a, pos, task);         // inside lock
    else
        U.getAndSetReference(a, pos, task);   // fully fenced
    if (room == 0 && (newCap = cap << 1) > 0) {
        ForkJoinTask<?>[] newArray = null;
        try {                                 // resize for next time
            newArray = new ForkJoinTask<?>[newCap];
        } catch (OutOfMemoryError ex) {
        }
        if (newArray != null) {               // else throw on next push
            int newMask = newCap - 1;         // poll old, push to new
            for (int k = s, j = cap; j > 0; --j, --k) {
                ForkJoinTask<?> u;
                if ((u = (ForkJoinTask<?>)U.getAndSetReference(a, slotOffset(k & m), null)) == null)
                    break;                    // lost to pollers
                newArray[k & newMask] = u;
            }
            updateArray(newArray);            // fully fenced
        }
        a = null;                             // always signal
    }
    if (!internal)
        unlockPhase();
    if ((a == null || a[m & (s - 1)] == null) && pool != null)
        pool.signalWork(a, p);
}

push方法把当前任务存放在ForkJoinPool.WorkQueue里的ForkJoinTask<?>数组,然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。ForkJoinPool.signalWork()方法如下:

final void signalWork(ForkJoinTask<?>[] a, int k) {
	int pc = parallelism;
	for (long c = ctl;;) {
	    WorkQueue[] qs = queues;
	    long ac = (c + RC_UNIT) & RC_MASK, nc;
	    int sp = (int)c, i = sp & SMASK;
	    if (qs == null || qs.length <= i)
	        break;
	    WorkQueue w = qs[i], v = null;
	    if (sp == 0) {
	        if ((short)(c >>> TC_SHIFT) >= pc)
	            break;
	        nc = ((c + TC_UNIT) & TC_MASK);
	    }
	    else if ((short)(c >>> RC_SHIFT) >= pc || (v = w) == null)
	        break;
	    else
	        nc = (v.stackPred & LMASK) | (c & TC_MASK);
	    if (c == (c = compareAndExchangeCtl(c, nc | ac))) {
	        if (v == null)
	            createWorker();
	        else {
	            v.phase = sp;
	            if (v.parking != 0)
	                U.unpark(v.owner);
	        }
	        break;
	    }
	    if (a != null && k >= 0 && k < a.length && a[k] == null)
	        break;
	}
}

join

ForkJoinTask的join方法源码如下:

public final V join() {
	int s;
	if ((((s = status) < 0 ? s : awaitDone(false, 0L)) & ABNORMAL) != 0)
		reportException(false);
	return getRawResult();
}

就一行代码,如果状态不为0,表示异常,调用方法reportException。awaitDone表示阻塞当前线程并等待获取结果。

private int awaitDone(boolean interruptible, long deadline) {
	ForkJoinWorkerThread wt; ForkJoinPool p; ForkJoinPool.WorkQueue q;
	Thread t; boolean internal; int s;
	if (internal = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
		p = (wt = (ForkJoinWorkerThread)t).pool;
		q = wt.workQueue;
	}
	else
		q = ForkJoinPool.externalQueue(p = ForkJoinPool.common);
	return (((s = (p == null) ? 0 :
	          ((this instanceof CountedCompleter) ?
	           p.helpComplete(this, q, internal) :
	           (this instanceof InterruptibleTask) && !internal ? status :
	           p.helpJoin(this, q, internal))) < 0)) ? s :
	    awaitDone(internal ? p : null, s, interruptible, deadline);
}

ForkJoinPool的helpComplete和helpJoin两个方法可以看到工作窃取算法的思想,源码优点长,略。大致思路是通过for循环加if条件判断来分担任务。

异常处理

ForkJoinTask在执行时可能会抛出异常,但是没办法在主线程里直接捕获异常,ForkJoinTask提供isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消,可通过ForkJoinTask.getException()方法获取异常:

if(task.isCompletedAbnormally()) {
	System.out.println(task.getException());
}

getException方法返回Throwable对象,如果任务被取消则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

适用场景

分治法非常适合解决以下问题:

  • 二分搜索
  • 大整数乘法
  • Strassen矩阵乘法
  • 棋盘覆盖
  • 合并排序
  • 快速排序
  • 线性时间选择
  • 汉诺塔

在真实业务开发中,也有很多场景:

  • 做报表导出时,大量数据的导出处理;
  • 做BI时,大量的数据迁移清洗作业等。

参考

  • fork/join全面剖析
  • how-to-specify-forkjoinpool-for-java-8-parallel-stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2214519.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ubuntu系衍生版手动修改配置网卡的配置总结

一、Ubuntu系的IP地址配置文件的目录&#xff1a; sudo vim /etc/network/interfaces 二、以DHCP方式配置网卡&#xff1a; 在以上配置文件中添加以下两行&#xff1a; auto enp3s0 iface enp3s0 inet dhcp 三、为网卡配置静态IP地址&#xff1a; 在以上配置文件中添…

实验3,网络地址转换

实验3&#xff1a;网络地址转换 实验目的及要求&#xff1a; 通过实验&#xff0c;掌握NAT技术的工作原理&#xff0c;了解三种不同类型NAT技术的主要作用以及各自的主要应用环境。能够完成静态NAT和复用NAT技术的应用&#xff0c;并熟练掌握NAT技术相关的配置命令。 实验设…

el-date-picker选择时间后标准时间少1小时问题

问题 前端开发中发现Element的时间组件el-date-picker在选择选择部分时间后js对象的标准时间少1小时&#xff0c;如果选择的小时为0&#xff0c;会导致部分转换条件下结果少1天。 比如组件中选择的本地时间为&#xff1a; 1988-08-01 00:00:00 而js对象获取到是标准时间是&am…

ubuntu 安装kali命令补全功能

输入命令时&#xff0c;之前的命令会以阴影显示&#xff0c;按下右键或 Tab 键可以直接补全 安装zsh-autosuggestions sudo apt install zsh-autosuggestions编辑 ~/.zshrc环境变量 if [ -f /usr/share/zsh-autosuggestions/zsh-autosuggestions.zsh ]; then. /usr/share/zs…

【从零开始的LeetCode-算法】945. 使数组唯一的最小增量

给你一个整数数组 nums 。每次 move 操作将会选择任意一个满足 0 < i < nums.length 的下标 i&#xff0c;并将 nums[i] 递增 1。 返回使 nums 中的每个值都变成唯一的所需要的最少操作次数。 生成的测试用例保证答案在 32 位整数范围内。 示例 1&#xff1a; 输入&am…

【Hadoop】HDFS基本操作

参考&#xff1a;3.HDFS基本操作_哔哩哔哩_bilibili 创建目录 hadoop fs -mkdir -p /training/qiang查看当前根目录下文件 hadoop fs -ls /hadoop fs -ls /training/目录授权 hadoop fs -chmod -R 777 /training/qm777是最大权限&#xff0c;读写 4、2、1 上传文件 先创…

YZ系列工具之YZ09:VBA_Excel之读心术

我给VBA下的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的工作效率&#xff0c;而且可以提高数据的准确度。我的教程一共九套一部VBA手册&#xff0c;教程分为初级、中级、高级三大部分。是对VBA的系统讲解&#xff0c;从简单的…

Python自动化脚本裁剪图片为1:1比例

一、创建输入文件夹&#xff08;in&#xff09;和输出文件夹&#xff08;out&#xff09;&#xff0c;将原始图片放在输入文件夹&#xff08;in&#xff09;里 二、 安装对应的Python库 pip install Pillow 三、编写自动化脚本代码 import os from PIL import Imagedef crop…

Axure RP电商系统商城PC+app+后台买家卖端高保真原型模板及元件库

AxureRP电商商城PCapp后台买家卖端高保真原型模板本套包含三份原型图素材 APP买家端原型简介&#xff1a; 包含了用户中心、会员成长、优惠券、积分、互动社区、运营推广、内容推荐、商品展示、订单流程、订单管理、售后及服务等完整的电商体系功能架构和业务流程。 本模板由…

Spring中ApplicationEvent事件的实现

1&#xff09;简介 Spring Event (接口名为Aplication Event) 观察者设计模式&#xff0c;由事件发起者publisher发布事件&#xff08;指定事件名&#xff09;&#xff0c;事件监听者监听事件&#xff08;指定事件名&#xff09;。 好比&#xff0c; A 说了一句话&#xff0c…

安卓14无法安装应用解决历程

客户手机基本情况&#xff1a; 安卓14&#xff0c;对应的 targetSdkVersion 34 前天遇到了安卓14适配问题&#xff0c;客户发来的截图是这样的 描述&#xff1a;无法安装我们公司的B应用。 型号&#xff1a;三星google美版 解决步骤&#xff1a; 1、寻找其他安卓14手机测试…

从源码到平台:使用视频美颜SDK构建高性能直播美颜系统详解

本文将深入探讨如何从源码出发&#xff0c;借助视频美颜SDK构建一套高性能的直播美颜系统&#xff0c;涵盖技术架构、核心功能的实现以及性能优化等方面的详解。 一、视频美颜SDK的作用与选择 视频美颜SDK是开发直播美颜系统的基础&#xff0c;它能够提供实时美颜处理功能&am…

STM32的GPIO片上外设

一、STM32的片上外设 片上外设是集成在 MCU 芯片内部的硬件模块&#xff0c;它们通过片内总线与 CPU 直接通信&#xff0c;而不是通过外部引脚或接口连接的设备。 片上外设&#xff1a; 包括 UART、I2C、SPI、ADC、定时器、GPIO、DMA、RTC 等集成在 MCU 内部的模块。这些外设…

《Linux从小白到高手》综合应用篇:深入理解Linux磁盘及IO优化

1. 前言 其实磁盘优化和IO优化&#xff0c;我在前面的其他Linux调优博文中已经讲述过或者涉及过了&#xff0c;但是太过零碎&#xff0c;所以本篇就来集中深入讨论下Linux磁盘和IO调优。 2.磁盘调优 结合我多年的经验&#xff0c;本人认为磁盘调优最重要的是读写性能的提升和…

Unity 实战案例全解析 实现时间停止效果+世界变灰

画面里运动的那个小玩意这么写 using System.Collections; using System.Collections.Generic; using UnityEngine;public class Partol : MonoBehaviour {public Transform pos1;public Transform pos2;public float speed;private Transform target;void Start() {target p…

SpringBoot智能推荐:健康生活新选择

3系统分析 3.1可行性分析 通过对本基于智能推荐的卫生健康系统实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本基于智能推荐的卫生健康系统采用SSM框架&#…

二叉查找树(Binary Search Tree)Java语言实现

一、二叉查找树 二叉查找树&#xff08;Binary Search Tree&#xff09;&#xff0c;也称为二叉搜索树、有序二叉树&#xff08;Ordered Binary Tree&#xff09;或排序二叉树&#xff08;Sorted Binary Tree&#xff09;。 是指一棵空树或者具有下列性质的二叉树&#xff1a…

Unity DOTS中的Archetype与Chunk

Unity DOTS中的Archetype与Chunk 在Unity中&#xff0c;archetype&#xff08;原型&#xff09;用来表示一个world里具有相同component类型组合的entity。也就是说&#xff0c;相同component类型的entity在Unity内部会存储到一起&#xff0c;共享同一个archetype。 使用这样的设…

JAVA毕业设计191—基于Java+Springboot+vue的电子产品商城管理系统(源代码+数据库)

毕设所有选题&#xff1a; https://blog.csdn.net/2303_76227485/article/details/131104075 基于JavaSpringbootvue的电子产品商城管理系统(源代码数据库)191 一、系统介绍 本项目前后端不分离&#xff0c;分为用户、管理员两种角色 1、用户&#xff1a; 注册、登录、商品…

C++在vscode中的code runner配置/环境配置

C在vscode中快捷运行&#xff08;code runner&#xff09; 一、配置tasks.json 在vscode中创建文件夹或打开文件夹&#xff0c;会发现文件夹下多了一个.vscode文件夹&#xff0c;在该文件夹下创建tasks.json文件&#xff0c;并添加一下内容 {"version": "2.0…