ForkJoin框架与工作窃取算法详解

news2024/10/6 18:30:52

文章目录

  • 一、ForkJoin框架概述
    • 1_核心概念
    • 2_主要类和方法
      • 1_ForkJoinPool
      • 2_ForkJoinTask
  • 二、启用异步模式与否的区别
  • 三、ForkJoinPool的三种任务提交方式
  • 四、执行逻辑及使用示例
    • 1_示例:并行计算数组元素和
    • 2_`forkJoinPool.submit`
    • 3_`ForkJoinTask<?>`中任务的执行逻辑
    • 4_fork\join中的逻辑
    • 5_窃取任务的实现
  • 五、线程池的大小对性能的影响
  • 六、ForkJoin与传统线程池的比较
  • 七、同步与异步线程池
    • 1_Executors.newWorkStealingPool
    • 2_ForkJoinPool.commonPool
    • 3_对比与总结
      • 1_共同点
      • 2_不同点
      • 3_窃取工作进行计算
  • 八、总结

在现代计算中,高效并行处理是提升应用程序性能的关键之一。Java 7 引入的 ForkJoinPool 框架是实现高并行度和递归任务处理的强大工具。本文将详细介绍 ForkJoinPool 框架及其核心的工作窃取算法。


一、ForkJoin框架概述

ForkJoinPool 是 Java 并发包中的并行计算框架,旨在有效执行递归任务和大规模并行计算。其核心思想是通过任务拆分(fork)和任务合并(join)来实现高并行度,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算。并利用工作窃取算法来平衡负载。

所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解。

Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。

ForkJoinPool是ForkJoin框架执行任务的核心组件。它是工作窃取算法的执行地,负责管理工作线程和提供任务执行环境。

1_核心概念

  1. ForkJoinTask

    • ForkJoinTask 是一个抽象类,表示一个可以并行执行的任务,有两个子类:
      • RecursiveTask<V>:有返回值的任务。
      • RecursiveAction:没有返回值的任务。
    • 也就是说提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值)
  2. 工作窃取算法

    • 每个工作线程维护一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
    • 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
    • 当一个线程的任务队列为空时或当前任务(join),它可以从其他忙碌线程的队列“窃取”任务来执行,极大提高了并行计算的效率。
    • 默认情况下,从自己队列中获取任务执行和其他线程窃取自己队列中的任务执行是从队列相反的两端(头部、尾部)来进行的,极大的减少了竞争的情况出现。
      在这里插入图片描述

2_主要类和方法

1_ForkJoinPool

ForkJoinPool 是核心类,负责管理和调度 ForkJoinTask 任务。

  • 构造函数
    Fork/Join 默认会创建与 cpu 核心数大小相同的线程池,也就是第一个构造方法。

    ForkJoinPool()
    ForkJoinPool(int parallelism)
    ForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode)
    
    • parallelism:并行度,通常设置为可用处理器的数量。
    • factory:用于创建工作线程的工厂。
    • handler:未捕获异常处理器。
    • asyncMode:是否启用异步模型。
  • 常用方法

    <T> T invoke(ForkJoinTask<T> task)
    void execute(ForkJoinTask<?> task)
    <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
    

2_ForkJoinTask

ForkJoinTask 是一个抽象类,表示一个可以并行执行的任务。

  • 常用方法
    final ForkJoinTask<V> fork() // 异步执行任务 开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。
    final V join() // 等待任务完成并获取结果
    final V invoke() // 同步执行任务并返回结果
    final V get() // 获取任务结果(可能阻塞)
    

这里并不会每个 fork 都会创建新线程, 也不是每个 join 都会造成线程被阻塞, 而是采取了 work-stealing 原理


二、启用异步模式与否的区别

ForkJoinPool 构造函数中的 asyncMode 参数决定线程池是否启用异步模型。这一选项主要影响本地任务的调度和执行策略,窃取任务无论在哪种模式下都是 FIFO 是没有区别的,也就是从其他线程的工作队列头部窃取任务,先创建的任务会被优先处理。

  • 不启用异步模型 (asyncMode = false) 默认:

    • 工作线程从自己队列的尾部弹出(pop)任务(LIFO)执行,等同于”栈操作”,后续创建的任务会被优先处理,适合需要深度优先执行的任务。
    • 更快地触及递归的底部,使得子任务尽快完成和合并,适合小规模和短生命周期的任务。
      在这里插入图片描述
  • 启用异步模型 (asyncMode = true)

    • 工作线程从自己队列的头部弹出(poll)任务(FIFO)执行,等同于”队列操作”,先创建的任务会被优先处理,适合需要广度优先执行的任务。
    • 降低某些情况下的任务窃取成本,更适合较大批量和较长生命周期的任务。

根据 JDK 官方文档,将 asyncMode 设置为 true从未加入的分叉任务建立本地先进先出的调度模式。在工作线程仅处理事件样式异步任务的应用程序中,这个模式可能比默认的基于本地堆栈的模式更合适。

在这里插入图片描述

asyncMode ? FIFO_QUEUE : LIFO_QUEUE,

可以从图中看到 asyncMode 参数的值决定了使用的队列。

根据Doug Lea的说法,FIFO 方法比 LIFO 具有以下优势:

Doug Lea 是 JDK7 中 fork/join 框架的实现者,也是 JDK 中 JUC 的核心开发者,真 Java 大神。

  • 它通过让"窃贼"作为所有者在 deque 的另一侧操作来减少竞争。
  • 它利用了早期生成“大型”任务的递归分治算法的性质。

上面的第二点意味着可以通过一个被盗任务的线程可以进一步分解旧的被盗任务。

三、ForkJoinPool的三种任务提交方式

ForkJoinPool 提供了三种提交任务的方式:invokeexecutesubmit。这三种方式都利用了工作窃取算法来优化任务调度和执行。

  1. invoke 方法

    • 同步执行任务,并返回其结果。
    • 阻塞调用线程,直到任务完成。
    ForkJoinPool pool = new ForkJoinPool();
    SumTask task = new SumTask(array, 0, array.length);
    Integer result = pool.invoke(task);
    
  2. execute 方法

    • 异步执行任务,不返回结果。
    • 常用于提交不需要返回结果的任务。
    ForkJoinPool pool = new ForkJoinPool();
    SumTask task = new SumTask(array, 0, array.length);
    pool.execute(task);
    
  3. submit 方法

    • 异步执行任务,并返回一个 Future 对象,用于获取任务结果或检查任务状态。
    ForkJoinPool pool = new ForkJoinPool();
    SumTask task = new SumTask(array, 0, array.length);
    Future<Integer> future = pool.submit(task);
    Integer result = future.get(); // 阻塞,直到任务完成
    

四、执行逻辑及使用示例

1_示例:并行计算数组元素和

下面是一个使用 ForkJoinPool 进行并行计算的示例,我们将实现一个计算数组元素和的任务,并展示如何使用 RecursiveTask 来实现任务拆分和合并。

import java.util.concurrent.*;

public class ForkJoinPoolExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        int[] array = new int[100];
        for (int i = 0; i < array.length; i++) {
            array[i] = i + 1; // 初始化数组
        }

        SumTask task = new SumTask(array, 0, array.length);
        Future<Integer> result = forkJoinPool.submit(task);

        System.out.println("Sum: " + result.get());

        forkJoinPool.shutdown();
    }

    static class SumTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 10; // 阈值
        private int[] array;
        private int start;
        private int end;

        SumTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            if (end - start <= THRESHOLD) {
                int sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                return sum;
            } else {
                int mid = (start + end) / 2;
                SumTask leftTask = new SumTask(array, start, mid);
                SumTask rightTask = new SumTask(array, mid, end);
                
                leftTask.fork(); // 异步执行左任务
                int rightResult = rightTask.compute(); // 当前线程执行右任务
                int leftResult = leftTask.join(); // 等待左任务完成并获取结果
                
                return leftResult + rightResult;
            }
        }
    }
}

2_forkJoinPool.submit

提交任务后的执行流程分析:

1、可以看到submit方法进行校验后,会执行到externalPush中,真正处理的方法是externalPush

在这里插入图片描述

2、externalPush执行流程分析如下:

  1. 判断工作队列数组是否为空

  2. 通过按位与得到当前任务分配的工作队列

  3. 尝试通过CAS锁定该队列

  4. 判断工作队列中的任务数组是否为空

  5. 释放锁

  6. 提交任务(如果前面的判断满足会直接返回,不会执行到这里)

final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; WorkQueue q; int m;
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&// 判断工作队列数组是否为空
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&// 通过按位与得到当前任务分配的工作队列
        U.compareAndSwapInt(q, QLOCK, 0, 1)) { // 尝试通过CAS锁定该队列
        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&// 判断工作队列中的任务数组是否为空
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {// 尝试添加任务
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            U.putIntVolatile(q, QLOCK, 0);
            if (n <= 1)// 如果就只有一个任务
                signalWork(ws, q);// 调用单个任务执行
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);// 释放锁
    }
    externalSubmit(task); // 提交任务
}

在这里插入图片描述

根据 debug 由于我们是第一次执行任务提交,workQueues参数为 null,所以直接进入到externalSubmit方法中。

3、externalSubmit(task);方法流程分析。主要流程是在for (;;) { 中分三次循环完成的。

  1. 创建工作队列数组。workQueues = new WorkQueue[n];
  2. 创建一个具体的队列 q = new WorkQueue(this, null); ws[k] = q;
  3. 大任务存放到队列中 U.putOrderedObject(a, j, task);
  4. signalWork(ws, q); 这个方法有两个参数,一个是工作队列数组,还有一个是工作队列。
private void externalSubmit(ForkJoinTask<?> task) {
    int r;                                    // initialize caller's probe
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();
    }
    for (;;) {//死循环中
        WorkQueue[] ws; WorkQueue q; int rs, m, k;
        boolean move = false;
        if ((rs = runState) < 0) {
            tryTerminate(false, false);     // help terminate
            throw new RejectedExecutionException();
        }
        else if ((rs & STARTED) == 0 ||     // initialize
                 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            rs = lockRunState();
            try {
                if ((rs & STARTED) == 0) {
                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                                               new AtomicLong());
                    // create workQueues array with size a power of two
                    int p = config & SMASK; // ensure at least 2 slots
                    int n = (p > 1) ? p - 1 : 1;
                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                    workQueues = new WorkQueue[n];//1.创建了一个工作队列数组 此时还没有东西
                    ns = STARTED;
                }
            } finally {
                unlockRunState(rs, (rs & ~RSLOCK) | ns);
            }
        }
        else if ((q = ws[k = r & m & SQMASK]) != null) {
            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                ForkJoinTask<?>[] a = q.array;
                int s = q.top;
                boolean submitted = false; // initial submission or resizing
                try {                      // locked version of push
                    if ((a != null && a.length > s + 1 - q.base) ||
                        (a = q.growArray()) != null) {
                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                        U.putOrderedObject(a, j, task);//3.将任务存到队列中的位置
                        U.putOrderedInt(q, QTOP, s + 1);
                        submitted = true;
                    }
                } finally {
                    U.compareAndSwapInt(q, QLOCK, 1, 0);
                }
                if (submitted) {
                    signalWork(ws, q);
                    return;
                }
            }
            move = true;                   // move on failure
        }
        else if (((rs = runState) & RSLOCK) == 0) { // create new queue
            q = new WorkQueue(this, null);//2.构造出一个队列
            q.hint = r;
            q.config = k | SHARED_QUEUE;
            q.scanState = INACTIVE;
            rs = lockRunState();           // publish index
            if (rs > 0 &&  (ws = workQueues) != null &&
                k < ws.length && ws[k] == null)
                ws[k] = q; 			//2.将队列存到某一个位置                // else terminated
            unlockRunState(rs, rs & ~RSLOCK);
        }
        else
            move = true;                   // move if busy
        if (move)
            r = ThreadLocalRandom.advanceProbe(r);
    }
}

4、signalWork(ws, q);之后的流程分析

  1. signalWork中有一个主要的方法就是tryAddWorker

    在这里插入图片描述

  2. 而在tryAddWorker中又主要看createWorker

    在这里插入图片描述

  3. createWorker中利用 ForkJoinWorkerThreadFactory 的方法新建了一个线程并启动,然后执行任务。

    private boolean createWorker() {
    	ForkJoinWorkerThreadFactory fac = factory;
    	Throwable ex = null;
    	ForkJoinWorkerThread wt = null;
     	try {
        	if (fac != null && (wt = fac.newThread(this)) != null) {//新建一个线程
            	wt.start();//启动线程
           		 return true;
        	}
    	} catch (Throwable rex) {
        	ex = rex;
     }
    	deregisterWorker(wt, ex);
    	return false;
    }
    
  4. 新建的线程是ForkJoinWorkerThread是 Thread 的子类。

    public static interface ForkJoinWorkerThreadFactory {
        /**
         * Returns a new worker thread operating in the given pool.
         *
         * @param pool the pool this thread works in
         * @return the new worker thread
         * @throws NullPointerException if the pool is null
         */
        public ForkJoinWorkerThread newThread(ForkJoinPool pool);
    }
    public class ForkJoinWorkerThread extends Thread//Thread 的子类
    

到这里forkJoinPool.submit提交任务后的流程就结束了,主要做的事情就是将task任务放入到创建的workQueue中,并创建一个线程执行任务,此时也会将队列和工作线程相关联,剩下执行任务的逻辑就属于ForkJoinTask<?>的了。

在这里插入图片描述

3_ForkJoinTask<?>中任务的执行逻辑

经过上面的分析我们可以知道,最后新建了一个ForkJoinWorkerThread线程,然后调用了start方法。那么我们看看这个线程类的run方法吧~

在这里插入图片描述

1、在run方法中我们可以看到主要的代码逻辑在pool.runWorker(workQueue);中,ForkJoinPool执行任务的过程如下:

  1. 判断工作队列是否需要扩容

  2. 阻塞调用对应WorkQueue的runTask方法

  3. 如果无法获取到执行资源,就等待任务调用

final void runWorker(WorkQueue w) {
    w.growArray();                   // allocate queue
    int seed = w.hint;               // initially holds randomization hint
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
    for (ForkJoinTask<?> t;;) {
        if ((t = scan(w, r)) != null)
            w.runTask(t);
        else if (!awaitWork(w, r))
            break;
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
    }
}

2、WorkQueue的rukTask

  1. 标记当前状态为忙碌

  2. 调用当前任务执行

  3. 继续执行当前队列中等待执行的任务

  4. 尝试从其他WorkQueue中窃取任务执行

final void runTask(ForkJoinTask<?> task) {
            if (task != null) {
                scanState &= ~SCANNING; // mark as busy
                (currentSteal = task).doExec();//执行任务
                U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
                execLocalTasks();// *** 重点
                ForkJoinWorkerThread thread = owner;
                if (++nsteals < 0)      // collect on overflow
                    transferStealCount(pool);
                scanState |= SCANNING;
                if (thread != null)
                    thread.afterTopLevelExec();
            }
        }

注意:doExec()方法无论在runTask还是execLocalTasks中都是真正执行任务的方法,最终会调用ForkJoinTask<?>实现类的compute()方法。

final void execLocalTasks() {
            int b = base, m, s;
            ForkJoinTask<?>[] a = array;
            if (b - (s = top - 1) <= 0 && a != null &&
                (m = a.length - 1) >= 0) {
                if ((config & FIFO_QUEUE) == 0) {//由asyncMode决定
                    for (ForkJoinTask<?> t;;) {
                        if ((t = (ForkJoinTask<?>)U.getAndSetObject
                             (a, ((m & s) << ASHIFT) + ABASE, null)) == null)
                            break;
                        U.putOrderedInt(this, QTOP, s);
                        t.doExec();//执行任务
                        if (base - (s = top - 1) > 0)
                            break;
                    }
                }
                else
                    pollAndExecAll();
            }
        }

4_fork\join中的逻辑

我们知道,如果在compute中使用fork会将大任务拆分成较小的任务来执行。那么流程是怎么样的呢?

1、将当前线程的工作队列取到,并将拆分的小任务放入到当前线程的工作队列中。

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)//判断
        ((ForkJoinWorkerThread)t).workQueue.push(this);//this是拆分的小任务
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

2、注意看放入队列的push方法,其中最主要的是p.signalWork(p.workQueues, this);这行代码。

final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);//这里的this是workQueue
        }
        else if (n >= m)
            growArray();
    }
}

3、根据之前的分析流程(2_forkJoinPool.submit4、signalWork(ws, q))可知,signalWork 方法会新创建一个ForkJoinWorkerThread去执行任务。

4、总结,fork会将当前拆分的任务放入当前线程的工作队列中,并且还会创建新的线程(在没达到设定的上限时)执行任务,由于新建的线程任务队列中为空,多半会去窃取当前线程工作队列中的任务。之后将循环这个流程。

大致流程如下所示:

在这里插入图片描述

5、当 fork执行完了之后到达图中箭头位置的点,我们发现 线程0、线程1 阻塞住,但是任务4、任务5、任务6…这些拆分的小任务都还没有执行,现在没有线程执行任务那么怎么办呢?这就要我们去了解join方法中的逻辑了。

在这里插入图片描述

注意:这里的join方法与线程中的是不一样的。这里的 join 方法主要是调用 doJoin 方法:

  • status表示任务的状态,如果小于 0 表示任务已经结束了。
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?//判断当前线程是否是工作线程
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}

6、那么这个方法主要是干了啥呢?主要去看awaitJoin方法。

从下面源码发现,主要部分还是一个死循环。

  1. 首先还是检查当前任务是否执行完: if ((s = task.status) < 0)
  2. 然后可以发现如果当前任务没有执行完则可能会进入helpStealer(w, task)helpComplete(w, cc, 0)
    • helpStealer:先定位的偷取者的任务队列;从偷取者的base索引开始,每次偷取一个任务执行。
    • tryCompensate: tryCompensate主要用来补偿工作线程因为阻塞而导致的算力损失,当工作线程自身的队列不为空,且还有其它空闲工作线程时,如果自己阻塞了,则在此之前会唤醒一个工作线程。
  3. 根据这两个方法的方法名可以大致知晓:这两个方法是让当前线程帮助其他线程执行任务。
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
    int s = 0;
    if (task != null && w != null) {
        ForkJoinTask<?> prevJoin = w.currentJoin;   // 获取给定Worker的join任务
        U.putOrderedObject(w, QCURRENTJOIN, task);  // 把currentJoin替换为给定任务
        
        // 判断是否为CountedCompleter类型的任务
        CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
            (CountedCompleter<?>) task : null;
        for (; ; ) {
            if ((s = task.status) < 0)              // 已经完成|取消|异常 跳出循环
                break;

            if (cc != null)                         // CountedCompleter任务由helpComplete来完成join
                helpComplete(w, cc, 0);
            else if (w.base == w.top || w.tryRemoveAndExec(task))  //尝试执行
                helpStealer(w, task);               // 队列为空或执行失败,任务可能被偷,帮助偷取者执行该任务

            if ((s = task.status) < 0)              // 已经完成|取消|异常,跳出循环
                break;
            
            // 计算任务等待时间
            long ms, ns;
            if (deadline == 0L)
                ms = 0L;
            else if ((ns = deadline - System.nanoTime()) <= 0L)
                break;
            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                ms = 1L;

            if (tryCompensate(w)) {                         // 执行补偿操作
                task.internalWait(ms);                      // 补偿执行成功,任务等待指定时间
                U.getAndAddLong(this, CTL, AC_UNIT);     // 更新活跃线程数
            }
        }
        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);      // 循环结束,替换为原来的join任务
    }
    return s;
}

总结:假设当前线程是 线程0 ,如上图执行到 任务1.join。当前任务1没有执行完,但是线程会帮助其他线程执行拆分后的小任务,不断循环,直到任务1执行完成 也就是status<0。

在这里插入图片描述

5_窃取任务的实现

final class WorkQueue {

    // 省略其他代码...

    // 偷取任务的方法
    final ForkJoinTask<?> pollAt(int b) {
        ForkJoinTask<?>[] a; int i; ForkJoinTask<?> t;
        if ((a = array) != null && (i = a.length - 1) >= 0 &&
            (t = a[b & i]) != null && t != a[b & (i >>>= 1)]) {
            if (t instanceof CountedCompleter) // 偷取CountedCompleter任务
                ((CountedCompleter<?>)t).propagateCompletion();
            return t;
        }
        return null;
    }

    // 从其他队列中偷取任务
    final ForkJoinTask<?> poll() {
        WorkQueue[] ws; int m;
        int r = ThreadLocalRandom.current().nextInt();
        if ((ws = pool.workQueues) != null && (m = ws.length - 1) >= 0 &&
            (ws = ws[m & r & SQMASK]) != null) {
            int j = r & SMASK; // 随机选择一个队列进行偷取
            WorkQueue w; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
            if ((w = ws[j]) != null && w.base - w.top < 0 &&
                (a = w.array) != null) { // 如果队列不为空,且还有任务未执行
                for (int b = r;;) { // 循环偷取任务
                    int n = a.length;
                    if (n >= NCPU || (t = w.pollAt(b & (n - 1))) == null) // 如果当前队列中没有可偷取的任务
                        break;
                    if (w.base == (b + 1)) // 如果队列被其他线程修改
                        return null;
                    if (UNSAFE.compareAndSwapObject(a, ((n - 1) & b) << ASHIFT, t, null))
                        return t;
                    if (n <= 1 || w.currentSteal != b)
                        break;
                }
            }
        }
        return null;
    }

    // 省略其他代码...
}

五、线程池的大小对性能的影响

线程池的大小直接影响到程序的性能。在ForkJoin框架中,理想的情况是线程数等于处理器核心数。

int processors = Runtime.getRuntime().availableProcessors();
ForkJoinPool pool = new ForkJoinPool(processors);

通常来讲,对于计算密集型任务,设置线程池大小为处理器核心数可获得最佳性能。对于包含IO操作或是响应中断的任务,可能需要更多的线程来维持CPU的利用率。

六、ForkJoin与传统线程池的比较

传统的线程池如Executors.newFixedThreadPool()对所有任务采用一个共享的工作队列,而ForkJoin采用工作窃取算法和每个线程一个工作队列的设计,这提高了处理并行任务的效率。

在这里插入图片描述

// 传统线程池使用示例
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.submit(new Task());

// ForkJoin线程池使用示例
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.submit(new ForkJoinTask<Void>() { /* ... */ });

对于计算密集型任务,尤其是可以进行合理拆分的任务,ForkJoin往往比传统线程池表现更好。

七、同步与异步线程池

ForkJoinPool 除了直接使用 new关键字直接创建,还可以使用 ForkJoinPool.commonPoolExecutors.newWorkStealingPool 的方式获得 ForkJoinPool 对象。

1_Executors.newWorkStealingPool

newWorkStealingPool 简单翻译是任务窃取线程池。

newWorkStealingPool 是Java8添加的线程池。和别的Executors创建的其他4种不同,它用的是ForkJoinPool

使用ForkJoinPool的好处是,把1个任务拆分成多个“小任务”,把这些“小任务”分发到多个线程上执行。这些“小任务”都执行完成后,再将结果合并。

之前的线程池中,多个线程共有一个阻塞队列,而newWorkStealingPool 中每一个线程都有一个自己的队列。

当线程发现自己的队列没有任务了,就会到别的线程的队列里窃取任务执行。

一般是自己的本地队列采取LIFO(后进先出),窃取时采用FIFO(先进先出),一个从尾部开始执行,一个从头开始执行,由于偷取的动作十分快速,会大量降低这种冲突,也是一种优化方式。

它有2种实现,无参:

public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

有参:

就一个参数parallelism,可以自定义并行度。

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

特点

  1. 独立实例:每次调用 newWorkStealingPool 都会创建一个新的 ForkJoinPool 实例,不是共享的单例。
  2. 默认线程数量:默认情况下,线程池的并行度是 CPU 内核数,可以通过传入一个整数参数来指定并行度。
  3. 灵活性高:适用于需要单独管理并行度或者需要隔离不同并行任务的应用场景。

根据上面的分析我们可以看到,无论是无参构造还是有参,asyncMode 的参数都是true。这证实了我们将使用 FIFO 的队列配置。

2_ForkJoinPool.commonPool

ForkJoinPool.commonPool 是一个全局共享的 ForkJoinPool 实例(内部 static final ForkJoinPool common的变量),Java 系统在启动时就会初始化这个线程池。

特点

  1. 全局共享:commonPool 是一个单例,适用于应用程序中的并行计算任务。
  2. 默认线程数量:默认情况下,线程池的并行度(parallelism level)是 CPU 内核数减一。可以通过设置系统属性 java.util.concurrent.ForkJoinPool.common.parallelism 来修改默认并行度。
  3. 适用范围广:适用于相对轻量级的并行任务,比如并行流操作、递归任务等。

类初始化时会使用 makeCommonPool() 方法中完成,为 ForkJoinPool 中的 common 参数赋值。该方法根据可用的处理器数量计算并行度,并创建一个新的 ForkJoinPool 实例。

private static ForkJoinPool makeCommonPool() {

        final ForkJoinWorkerThreadFactory commonPoolForkJoinWorkerThreadFactory =
                new CommonPoolForkJoinWorkerThreadFactory();
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = commonPoolForkJoinWorkerThreadFactory;
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)//这一步会将并行度设置为 CPU 内核数减一
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,//使用LIFO的队列模式
                                "ForkJoinPool.commonPool-worker-");
    }

ForkJoinPool.commonPool使用后进先出(LIFO)队列配置,而Executors.newWorkStealingPool使用先进先出的方法(FIFO)配置。

3_对比与总结

1_共同点

  1. 工作窃取算法:二者都使用工作窃取算法来优化并行任务处理。
  2. 任务类型:都适用于 ForkJoinTask 及其子类,如 RecursiveTaskRecursiveAction

2_不同点

  1. 实例共享

    • ForkJoinPool.commonPool 是全局共享单例,适合对所有并行计算任务使用统一的线程池。
    • Executors.newWorkStealingPool 每次调用都会创建一个新的实例,适合需要单独管理不同任务群或不同并行度的场景。
  2. 并行度设置

    • ForkJoinPool.commonPool 的并行度可以通过系统属性来更改,但所有使用 commonPool 的任务共享同一个并行度配置。
    • Executors.newWorkStealingPool 可以灵活指定并行度,每个创建的线程池实例可以有不同的并行度。
  3. 使用场景

    • ForkJoinPool.commonPool 适合全局共享并行线程池的轻量并行任务,比如并行流操作。
    • Executors.newWorkStealingPool 适合需要隔离不同任务群的场景,例如不同模块有不同的并行处理需求,或者某些任务需要特定的并行度配置。

选择使用哪种线程池应根据具体的应用场景和需要,并考虑共享性、灵活性和并行度的需求。通过合理选择适合的线程池,可以有效提升应用程序的并行处理能力和整体性能。

3_窃取工作进行计算

有了同步线程池,ForkJoinPool.commonPool只要任务仍在进行中,就会将线程放入池中。

因此,工作窃取的程度并不取决于任务粒度水平。

异步Executors.newWorkStealingPool的管理更强,允许工作窃取级别取决于任务粒度级别。

我们使用ForkJoinPool类的 getStealCount获得工作窃取级别:

long steals = forkJoinPool.getStealCount();

确定Executors.newWorkStealingPoolForkJoinPool.commonPool的工作窃取计数给我们带来了不同的行为:

Executors.newWorkStealingPool ->
Granularity: [1], Steals: [6564]
Granularity: [10], Steals: [572]
Granularity: [100], Steals: [56]
Granularity: [1000], Steals: [60]
Granularity: [10000], Steals: [1]

ForkJoinPool.commonPool ->
Granularity: [1], Steals: [6923]
Granularity: [10], Steals: [7540]
Granularity: [100], Steals: [7605]
Granularity: [1000], Steals: [7681]
Granularity: [10000], Steals: [7681]

Executors.newWorkStealingPool的粒度从精细变为粗(1到10,000)时,工作窃取水平就会降低。

因此,当任务没有分解时,窃取计数为1(粒度[granularity]为10000)。

ForkJoinPool.commonPool有不同的行为。

工作窃取水平总是很高,受到任务粒度变化的影响并不大。

从技术上讲,我们的素数示例支持异步处理事件式任务。

这是因为我们的实现不会强制结果的加入。

可以证明,Executors.newWorkStealingPool在解决问题时提供了最佳资源利用。

八、总结

ForkJoinPool 通过任务拆分和工作窃取机制,实现了高效的并行计算,非常适合需要递归计算和大规模并行处理的场景。通过合理选择是否启用异步模式,以及合适的任务提交方式,ForkJoinPool 能显著提升应用程序的并行处理能力和整体性能。

希望本文能帮助您更好地理解 ForkJoinPool 框架及其工作窃取算法,并能在实际项目中有效应用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1871088.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

支持向量回归原理详解及Python代码示例

支持向量回归原理详解 支持向量回归&#xff08;Support Vector Regression, SVR&#xff09;是支持向量机&#xff08;SVM&#xff09;的一种扩展&#xff0c;用于回归问题。SVR通过寻找一个最佳的回归超平面&#xff0c;使得尽可能多的数据点落在超平面附近的ε-管内&#xf…

ubuntu16安装DHCP

一、安装dns server软件包 $ apt-get install bind9 二、配置BIND9 配置文件的目录默认为/etc/bind cd /etc/bind 进入该目录。。。 1、vi /etc/bind/named.conf.local zone "xuehai.com" {type master;file "/etc/bind/db.xuehai.com"; }; image.png …

win11记事本错误打开一次非常多的文件再次使用时造成卡住

错误地不小心一次性打开数十个数百个文件造成再次使用时&#xff0c;文件卡住。 亲测有效。

ISP IC/FPGA设计-第一部分-SC130GS摄像头分析-IIC通信(1)

1.摄像头模组 SC130GS通过一个引脚&#xff08;SPI_I2C_MODE&#xff09;选择使用IIC或SPI配置接口&#xff0c;通过查看摄像头模组的原理图&#xff0c;可知是使用IIC接口&#xff1b; 通过手册可知IIC设备地址通过一个引脚控制&#xff0c;查看摄像头模组的原理图&#xff…

离散傅里叶变化

傅里叶变换 对傅里叶变换了解不是很清楚的朋友推荐一下这个帖子&#xff0c;讲得很详细 傅里叶变换 源码 先看源码链接 #include "opencv2/core.hpp" #include "opencv2/imgproc.hpp" #include "opencv2/imgcodecs.hpp" #include "open…

Yolov8可视化界面使用说明,含代码

⭐⭐ YOLOv8改进专栏|包含主干、模块、注意力机制、检测头等前沿创新 ​ ⭐⭐ YOLOv8可视化界面如下 使用需要安装opencv-python、torch、numpy及PySide6(python版本>3.9) pip install PySide6 pip install numpy pip install opencv-python 使用说明 运行下方代码&#xf…

《software architecture patterns》学习笔记

了解通用的架构模式并知道什么时候使用它们。 软件架构定义了软件的基本特点和行为。比如&#xff0c;有些软件架构会让软件变得可扩展&#xff0c;而有些软件架构会让软件变得易于修改。 知道每一种软件架构的特点、优缺点是非常有必要的&#xff0c;因为它们能帮助你选择一种…

LeetCode刷题之HOT100之课程表

吃完普通的食堂饭菜&#xff0c;回到实验室&#xff0c;继续做一道题&#xff01; 1、题目描述 2、逻辑分析 这道题涉及到图相关知识&#xff0c;应用到了拓扑排序。 题意解释 一共有 n 门课要上&#xff0c;编号为 0 ~ n-1。先决条件 [1, 0]&#xff0c;意思是必须先上课 0…

湖北大学2024年成人高考函授报名专升本法学专业介绍

湖北大学&#xff0c;这所承载着深厚文化底蕴和学术积淀的高等学府&#xff0c;始终致力于为广大有志之士提供多元化的学习机会。在时代的浪潮中&#xff0c;为了满足社会对于高层次法律人才的需求&#xff0c;湖北大学特别推出了成人高等继续教育项目&#xff0c;为广大在职人…

双击跳转到 BP 事务代码 CALL TRANSACTION BP AND SKIP FIRST SCREEN

BP 维护业务伙伴 BP事务代码不能像普通的VA03 这样跳转&#xff0c;下面介绍3种方法。 1. 单纯跳转到BP FORM FRM_SHOW_BP USING LV_BP TYPE BU_PARTNER.CALL METHOD CL_RMPS_ADDRESS>SHOW_BPEXPORTINGIM_BP_NUMBER LV_BP. ENDFORM. 2. 带业务伙伴角色跳转到BP&#xff…

NAND闪存巨头铠侠(Kioxia)计划最迟于10月下旬通过首次公开募股IPO

据路透社于6月26日引用消息来源的报道&#xff0c;在半导体市场条件反弹及财务业绩迅速改善的背景下&#xff0c;NAND闪存巨头铠侠&#xff08;Kioxia&#xff09;正准备尽快提交初步申请&#xff0c;并计划最迟于10月下旬通过首次公开募股&#xff08;IPO&#xff09;在东京证…

Kubernetes之Scheduler详解

本文尝试从Kubernetes Scheduler的功能介绍、交互逻辑、伪代码实现、最佳实践、自定义Scheduler举例及其历史演进6个方面进行详细阐述。希望对您有所帮助&#xff01; 一、Kubernetes Scheduler 功能 Kubernetes Scheduler 是 Kubernetes 集群的核心组件之一&#xff0c;负责…

使用jupyter打开本地ipynb文件的方法

常用方法&#xff1a; 先启动jupyter&#xff0c;然后在打开的页面点击upload&#xff0c;选择想要打开的文件上传然后打开&#xff0c;但是这样其实是先复制了一份到jupyter中&#xff0c;然后打开运行。而我不想复制。 方法二 先打开项目文件所在文件夹&#xff0c;文件夹…

M芯片 Parallels Desktop 19虚拟机安装Windows11教程

Parallels Desktop 19 for Mac 乃是一款适配于 Mac 的虚拟化软件。它能让您在 Mac 计算机上同时运行多个操作系统。您可借此创建虚拟机&#xff0c;并于其中装设不同的操作系统&#xff0c;如 Windows、Linux 或 macOS。使用 Parallels Desktop 19 mac 版时&#xff0c;您可在 …

情感分析方法与实践

第1关&#xff1a;情感分析的基本方法 情感分析简介 情感分析&#xff0c;又称意见挖掘、倾向性分析等。简单而言&#xff0c;是对带有情感色彩的主观性文本进行分析、处理、归纳和推理的过程。在日常生活中&#xff0c;情感分析的应用非常普遍&#xff0c;下面列举几种常见的…

数据库和程序 按照层级进行排序

文章目录 先上效果图(四种方式实现)前期工作创建表添加表数据 第一种方式: 具体执行SQL更深层次的sql案例 第二种方式: 使用java程序动态的生成SQL进行执行单元测试注意事项 第三种方式: 使用java程序进行排序[单字段排序]第四种方式: 使用lambda方式进行排序[多字段排序]最后的…

一个最简单的MySQL事务模拟测试

这里只是简单写了一个转账的小事务&#xff0c;模拟一下事务的过程 代码&#xff1a; 初始数据&#xff1a; 当你关闭自动提交 并且开启一个事务执行了下面的更新语句 但是没有提交时&#xff1a; 此时虽然你运行查询语句会发现他的值发生了变化 &#xff0c;但是当你运行回滚…

数据结构速成--树和二叉树

由于是速成专题&#xff0c;因此内容不会十分全面&#xff0c;只会涵盖考试重点&#xff0c;各学校课程要求不同 &#xff0c;大家可以按照考纲复习&#xff0c;不全面的内容&#xff0c;可以看一下小编主页数据结构初阶的内容&#xff0c;找到对应专题详细学习一下。 气死了…

C++ ─── vector模拟实现的扩容拷贝问题

扩容拷贝问题 源代码使用memcpy拷贝&#xff0c;在使用vector<int>存储内置类型时没有问题&#xff0c; 但是如果存储的是含有指针的类型&#xff0c;如string&#xff0c;就会发生浅拷贝问题 //3、容量相关void reserve(size_t n){if (n > capacity()){size_t old_si…

【PWN · ret2libc | protobuf】[2024CISCN · 华中赛区]protoverflow

套了一层protobuf壳&#xff0c;然后就是简单的ret2libc 参考速递&#xff1a;深入二进制安全&#xff1a;全面解析Protobuf-CSDN博客 前言 第一次遇到protobuf&#xff0c;如果没有了解过&#xff0c;是显然做不出来的。此次复现&#xff0c;也算是点亮了一个技能点 一、什么…