背景
日常的计算任务大部分都是串行来执行,但是如果一个复杂的任务需要进行拆分为多个小任务,那么以往是自行写一个递归或者循环计算等算法来实现,随着这类需求的提升,java7中引入了ForkJoin框架来支持这类计算,可以比较高效的解决大任务或需要合并计算的一些场景需要。
ForkJoin框架介绍
Java的Fork-Join框架是一种多线程并行处理任务的框架,通常被用于处理CPU密集型任务。它是Java SE 7及更高版本中提供的一种类库,旨在帮助开发者更轻松地编写可扩展的并行代码。
Fork-Join框架的核心思想是“分治算法”,将一个大的任务拆分成若干个小的子任务,然后通过递归的方式对这些子任务进行处理,最终将它们的结果合并起来得到大任务的结果。由于每个子任务都可以独立执行,因此可以利用多个处理器和多核心CPU的优势来并行处理这些子任务,从而提高整个任务的处理速度。
在使用Fork-Join框架时,需要定义一个继承自RecursiveTask或RecursiveAction的任务,然后使用ForkJoinPool.submit()方法将任务提交给Fork-Join线程池来执行。其中RecursiveTask返回任务的结果,RecursiveAction则不返回结果。
Fork-Join框架不仅使用方便、简单,而且可以自动化地利用多处理器和多核心CPU,充分发挥硬件资源的优势。同时,它也有许多其他的细节和性能优化措施,比如工作窃取算法等,使得它成为处理并行任务的一种强大工具。
建议参考:https://www.oracle.com/technical-resources/articles/java/fork-join.html
CPU密集型 与 IO密集型的区别?
CPU密集型和IO密集型指的是不同类型的计算任务,其区别主要表现在对系统资源的利用上。
CPU密集型任务(CPU-bound)是指需要大量CPU运算来完成的任务,例如复杂的数学计算、图像处理、科学模拟等。在这种情况下,系统的硬盘、内存性能相对CPU要好很多,此时往往是CPU Loading 100%,而I/O操作可以在很短时间内完成,所以CPU占用率很高,而I/O等待时间很短,因此CPU并不需要等待I/O操作的完成。
IO密集型任务(I/O-bound)则是指需要大量I/O操作来完成的任务,例如文件读写、网络传输、数据库查询等。在这种情况下,系统的CPU性能相对硬盘和内存要好很多,此时往往就是CPU在等待I/O操作(比如硬盘读写)的完成,而CPU Loading并不高。在I/O密集型任务中,大部分时间会用来等待I/O操作的完成,而CPU占用率则相对较低。在实际应用中,我们需要根据任务类型的不同选择合适的计算机配置和算法优化策略。如果是CPU密集型任务,就需要选用配置强劲的CPU,并尽可能缩短I/O等待时间;而如果是I/O密集型任务,就需要选用配置高速的硬盘和网络设备,并尽可能合理地利用CPU资源。
注意:python语言由于是解释型语言,对CPU密集型任务支持不是很好,因为python全局解释器(Global Interpreter Lock,GIL)同一时刻只有一个线程能够执行phton字节码,哪怕有多线程,同一时刻的执行也不能同时执行python字节码。(就是串行),可以参考:https://blog.csdn.net/qq_44993593/article/details/129120146
并发与并行有什么区别?
并行(Parallel):并行是在同一时刻执行多个任务,每个任务都有自己的处理器或核心来独立执行。
并发(Concurrent):并发是在同一时间段内执行多个任务,这些任务会交替使用CPU时间片来执行,让用户感觉它们在同时运行。
个人理解:例如每次过年,拜年的时候,同一天亲戚分批次来你家拜年中间互相没有遇到(并发),亲戚全部一起来到你家里拜年(并行)。
工作窃取算法是什么?主要解决什么问题?
工作窃取算法(work-stealing Algorithm)是一种用于实现任务调度的并发编程算法。该算法针对多线程环境下任务执行的负载均衡问题,让每个线程在自己的队列中执行任务,当自己的任务处理完后,会去其他线程的任务队列中获取任务来执行,从而充分利用了线程的资源,避免线程因等待某些任务而空闲。
原理:工作窃取算法是基于双端队列(deque)的,每个线程都有一个自己的工作队列,其中任务的执行顺序是先进先出(FIFO)。当线程需要执行任务时,会从自己的队列中取出最后加入的任务进行处理。如果线程的队列为空,那么它就会去其他线程的工作队列中窃取一个任务来执行,窃取的任务一般是其他线程队列的开头(队首)任务,这样可以有效减少线程之间的竞争和锁竞争,提高并行处理速度。
优点 :
能够高效地利用CPU资源;
充分发掘多核处理器的性能;
同时也能够避免线程因等待某些任务而空闲;
缺点:
任务窃取的次数越多,线程之间的负载均衡就越难以保证;
此外,在任务并行度较低的情况下,任务窃取可能会增加一些额外的开销,降低程序的性能。
工作窃取算法的实现逻辑主要分为以下几步:
每个线程都有一个工作队列,其中任务按照先进先出(FIFO)的顺序执行。
当线程需要执行任务时,它会从自己的队列末尾取出最后加入的任务进行处理。
如果线程的队列为空,那么它会去其他线程的工作队列中窃取一个任务来执行。这里需要注意的是,线程需要选择一个合适的窃取对象,以确保任务窃取的负载均衡性。
窃取任务一般是从其他线程队列的开头(队首)进行。当线程成功地从其他线程队列窃取了任务时,它会立即执行该任务。
在任务执行过程中,线程可能会生成新的子任务。这些子任务会被放到线程的本地工作队列中,而不是直接发送到其他线程的队列中,以避免锁竞争。
线程在执行任务过程中,会不断地检查自己的本地队列和其他线程的队列,以保证任务的高效执行和负载均衡性。
当所有任务都完成后,线程退出。
ForkJoin基本使用
类图
类说明:
ForkJoinTask 是 Java 并发编程中的一个类,它是基于 "分而治之" 的思想,并结合了 "工作窃取" 算法的一种并行处理框架。
在 java.util.concurrent 包中,ForkJoinTask 是一个抽象类,需要通过继承来创建具体的任务。它可以分割出子任务,利用多线程并行地执行这些任务,并将结果合并到一起,最终得到整个任务的结果。
其中,常用的子类包括:=
RecursiveAction:用于没有返回结果的任务。
RecursiveTask:用于有返回结果的任务。
在执行任务时,ForkJoinPool 会将任务拆分为多个小任务,每个线程执行其中的一个小任务,当线程执行完自己的任务后,它可以去 "窃取" 其他线程队列中的任务来执行,以此提高 CPU 使用率和并发效率。
相比较于传统的多线程编程,使用 ForkJoinTask 可以更好地利用 CPU,减少线程间的竞争,提高程序的性能。
/**
*
* 功能描述:
*
* @param: 通过fork/join 进行求合计算
* @return:
* @auther: csh
* @date: 2023/4/17 9:52 下午
*/
public class ArraySum extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000; // 阈值,当数组大小小于该值时不再进行拆分
private long[] arr;
private int start;
private int end;
public ArraySum(long[] arr, int start, int end) {
this.arr = arr;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) { // 如果数组大小小于阈值,直接计算
long sum = 0L;
for (int i = start; i < end; i++) {
sum += arr[i];
}
return sum;
} else { // 如果数组大小大于阈值,拆分为两个任务并执行
int mid = (start + end) / 2;
ArraySum left = new ArraySum(arr, start, mid);
ArraySum right = new ArraySum(arr, mid, end);
left.fork();
right.fork();
return left.join() + right.join();
}
}
public static void main(String[] args) {
int N = 100000;
long[] arr = new long[N];
for (int i = 0; i < N; i++) {
arr[i] = i;
}
ForkJoinPool pool = new ForkJoinPool();
long sum = pool.invoke(new ArraySum(arr, 0, arr.length));
System.out.println("累加起来的结果是: " + sum);
}
}
结果
相加起来的结果是: 4999950000
fork/join的源码学习
ForkJoinTask 主要利用了 Unsafe 类的 CAS 操作实现了对任务状态的更新。在执行完成任务时,调用 onCompletion(CountedCompleter caller) 方法通知该任务的依赖任务或向等待该任务的线程发送信号,以此实现对任务结果的合并和传递。
属性:
static final int REOPR_SIGNAL: 表示任务的初始状态,即等待被执行。
static final int DONE_MASK: 任务完成状态的标识。
static final int NORMAL: 任务正常完成。
static final int CANCELLED: 任务被取消。
static final int EXCEPTIONAL: 任务发生异常。
volatile int status: 表示任务的状态,取值可能为 REOPR_SIGNAL、NORMAL、CANCELLED 或 EXCEPTIONAL 中的一个。
volatile ForkJoinTask<?> next: 指向下一个等待执行的任务。
volatile Thread runner: 执行该任务的线程。
final short statusFlags: 表示任务的状态及其他一些控制信息。
final short mode: 表示任务的运行模式。
Throwable exception: 任务执行过程中发生异常时,保存该异常对象。
方法:
public final void fork(): 将该任务加入到当前工作线程队列中,等待被执行。
public final boolean isDone(): 判断该任务是否已完成。
public final boolean cancel(boolean mayInterruptIfRunning): 取消该任务的执行。
public final void completeExceptionally(Throwable ex): 异常完成该任务,并将发生的异常传递给等待该任务的线程。
protected abstract void compute(): 子类必须实现的计算方法,用于执行具体的任务逻辑。
public final void quietlyCompleteRoot(): 安静地完成该任务,并通知等待该任务的线程。如果该任务是根任务,则将结果放到 ForkJoinPool 中的队列中。
public final int getQueuedTaskCount(): 获取等待执行的任务个数。
public final boolean isCancelled(): 判断该任务是否已取消。
public final boolean isCompletedAbnormally(): 判断该任务是否发生异常。
public final boolean isCompletedNormally(): 判断该任务是否正常完成。
public final Throwable getException(): 获取该任务发生的异常。
public final ForkJoinTask<V> submit(): 将该任务提交到 ForkJoinPool 中执行,并返回该任务的结果。
public final V invoke(): 在当前线程中执行该任务,并返回该任务的结果。
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2): 执行给定的两个任务,并等待这两个任务都完成。
public static <T> void invokeAll(ForkJoinTask<T>... tasks): 执行指定的一组任务,并等待所有任务都完成。
protected static void reportException(Throwable ex): 抛出给定的异常。
私有方法:
final int setCompletion(int completion): 原子性地将该任务的状态修改为完成状态,同时返回原状态值。
final int doExec(): 执行当前任务的 compute() 方法,并返回任务的状态值。
final boolean trySetSignal(): 尝试将当前任务的状态从新建转换为信号状态 REOPR_SIGNAL。
static void internalPropagateException(Throwable ex): 尝试将给定的异常对象抛出到外层任务。
package java.util.concurrent;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.RandomAccess;
import java.lang.ref.WeakReference;
import java.lang.ref.ReferenceQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import java.lang.reflect.Constructor;
//为抽象类必须被实现,实现Future
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/** 状态:初始状态:status = SIGNAL
正常完成状态:status = NORMAL
取消状态:status = CANCELLED
异常状态:status = EXCEPTIONAL */
volatile int status; // accessed directly by pool and workers
static final int DONE_MASK = 0xf0000000; // 任务完成时的标识。
static final int NORMAL = 0xf0000000; // 正常任务状态。
static final int CANCELLED = 0xc0000000; // 取消状态。
static final int EXCEPTIONAL = 0x80000000; // 异常状态。
static final int SIGNAL = 0x00010000; // 初始状态 必须为 >= 1 << 16
static final int SMASK = 0x0000ffff; // 低位掩码,也是最大索引位
/**
* 原子性地将该任务的状态修改为完成状态,同时返回原状态值。
入参为状态
*/
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
if ((s >>> 16) != 0)
synchronized (this) { notifyAll(); }
return completion;
}
}
}
/**
*执行当前任务的 compute() 方法,并返回任务的状态值。
*/
final int doExec() {
int s; boolean completed;
//状态大于0
if ((s = status) >= 0) {
try {
//立即执行任务
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
//执行后状态判断
if (completed)
//设置状态为正常任务状态。
s = setCompletion(NORMAL);
}
//返回状态
return s;
}
/**
* 在等待该任务完成时,使用指定的超时时间来阻塞当前线程。
*/
final void internalWait(long timeout) {
int s;
//正常状态才继续
if ((s = status) >= 0 && // force completer to issue notify
//又是Unsafe 的cas来更改状态
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
//同步块
synchronized (this) {
//状态大于0 正常 等待timeout时间
if (status >= 0)
try { wait(timeout); } catch (InterruptedException ie) { }
else
//唤醒所有任务
notifyAll();
}
}
}
/**
* 阻止非工作线程,直到完成。
*/
private int externalAwaitDone() {
//判断类型获取池中的状态
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
//大于0证明不是初始化及已结束
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
do {
//通过循环方式进行cas设置锁
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll();
}
}
} while ((s = status) >= 0);
//中断状态为true(有可能结束或异常了)
if (interrupted)
//进行当前线程中断
Thread.currentThread().interrupt();
}
//反回状态
return s;
}
/**
* 等待该任务完成,并允许在等待的过程中中断当前线程。
该方法通过调用 LockSupport.park(this) 方法来实现线程等待,如果当前线程被中断,则会抛出 InterruptedException 异常。
*/
private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
//中断线程成功 直接抛出
if (Thread.interrupted())
throw new InterruptedException();
//如果状态大于0 且 尝试通过线程池进行执行所有任务 证明正常
if ((s = status) >= 0 &&
(s = ((this instanceof CountedCompleter) ?
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
0)) >= 0) {
//循环进行状态置为初始化
while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
wait(0L);
else
notifyAll();
}
}
}
}
return s;
}
/**
* 等待该任务完成,并返回任务的状态。
在等待任务完成的过程中,使用自旋锁的方式不断地检查任务状态。
如果任务状态为完成状态,则返回该任务的状态;否则,使用 LockSupport.park(this) 方法挂起当前线程,并等待任务完成。
注意:这个方法在等待过程中由于使用了自旋锁和线程挂起的方式,因此可能会消耗大量的 CPU 资源。
*
*/
private int doJoin() {
//参数声名 状态:s 线程:t 工作线程:wt 工作队列:w
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
//获取状态如果小于0 当前线程中断 否则进行结束
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
/**
* 在当前线程中执行该任务,并返回任务的状态值(同上类似)
*/
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
// Exception table support
//任务异常列表
private static final ExceptionNode[] exceptionTable;
//任务异常重入锁
private static final ReentrantLock exceptionTableLock;
//存放异常的实例
private static final ReferenceQueue<Object> exceptionTableRefQueue;
/**
* 异常表的固定容量。
*/
private static final int EXCEPTION_MAP_CAPACITY = 32;
//异常节点类实现
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
final Throwable ex;
ExceptionNode next;
final long thrower; // use id not ref to avoid weak cycles
final int hashCode; // store task hashCode before weak ref disappears
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
super(task, exceptionTableRefQueue);
this.ex = ex;
this.next = next;
this.thrower = Thread.currentThread().getId();
this.hashCode = System.identityHashCode(task);
}
}
/**
* 记录异常并设置状态。
*
* @return status on exit
*/
final int recordExceptionalCompletion(Throwable ex) {
int s;
//状态大于等于0
if ((s = status) >= 0) {
int h = System.identityHashCode(this);
//上锁
final ReentrantLock lock = exceptionTableLock;
lock.lock();
try {
expungeStaleExceptions();
ExceptionNode[] t = exceptionTable;
int i = h & (t.length - 1);
//循环创建
for (ExceptionNode e = t[i]; ; e = e.next) {
if (e == null) {
t[i] = new ExceptionNode(this, ex, t[i]);
break;
}
if (e.get() == this) // already present
break;
}
} finally {
lock.unlock();
}
s = setCompletion(EXCEPTIONAL);
}
return s;
}
/**
* 记录异常并可能传播。
*
* @return status on exit
*/
private int setExceptionalCompletion(Throwable ex) {
int s = recordExceptionalCompletion(ex);
if ((s & DONE_MASK) == EXCEPTIONAL)
internalPropagateException(ex);
return s;
}
/**
* Hook for exception propagation support for tasks with completers.
*/
void internalPropagateException(Throwable ex) {
}
/**
* 取消,忽略取消引发的任何异常。
*/
static final void cancelIgnoringExceptions(ForkJoinTask<?> t) {
if (t != null && t.status >= 0) {
try {
//取消
t.cancel(false);
} catch (Throwable ignore) {
}
}
}
/**
* 删除异常节点并清除状态。
*/
private void clearExceptionalCompletion() {
int h = System.identityHashCode(this);
final ReentrantLock lock = exceptionTableLock;
//上锁
lock.lock();
try {
ExceptionNode[] t = exceptionTable;
int i = h & (t.length - 1);
ExceptionNode e = t[i];
ExceptionNode pred = null;
//循环处理
while (e != null) {
ExceptionNode next = e.next;
if (e.get() == this) {
if (pred == null)
t[i] = next;
else
pred.next = next;
break;
}
pred = e;
e = next;
}
expungeStaleExceptions();
status = 0;
} finally {
//释放锁
lock.unlock();
}
}
//获取异常
private Throwable getThrowableException() {
//不是异常直接返回空
if ((status & DONE_MASK) != EXCEPTIONAL)
return null;
//获取哈希code
int h = System.identityHashCode(this);
ExceptionNode e;
//上锁
final ReentrantLock lock = exceptionTableLock;
lock.lock();
try {
//删除过期的引用
expungeStaleExceptions();
ExceptionNode[] t = exceptionTable;
e = t[h & (t.length - 1)];
while (e != null && e.get() != this)
e = e.next;
} finally {
lock.unlock();
}
Throwable ex;
if (e == null || (ex = e.ex) == null)
return null;
if (e.thrower != Thread.currentThread().getId()) {
Class<? extends Throwable> ec = ex.getClass();
try {
Constructor<?> noArgCtor = null;
Constructor<?>[] cs = ec.getConstructors();// public ctors only
for (int i = 0; i < cs.length; ++i) {
Constructor<?> c = cs[i];
Class<?>[] ps = c.getParameterTypes();
if (ps.length == 0)
noArgCtor = c;
else if (ps.length == 1 && ps[0] == Throwable.class) {
Throwable wx = (Throwable)c.newInstance(ex);
return (wx == null) ? ex : wx;
}
}
if (noArgCtor != null) {
Throwable wx = (Throwable)(noArgCtor.newInstance());
if (wx != null) {
wx.initCause(ex);
return wx;
}
}
} catch (Exception ignore) {
}
}
return ex;
}
/**
* 删除过期引用的实现方法
*/
private static void expungeStaleExceptions() {
//循环筛选
for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
if (x instanceof ExceptionNode) {
int hashCode = ((ExceptionNode)x).hashCode;
ExceptionNode[] t = exceptionTable;
int i = hashCode & (t.length - 1);
ExceptionNode e = t[i];
ExceptionNode pred = null;
while (e != null) {
ExceptionNode next = e.next;
if (e == x) {
if (pred == null)
t[i] = next;
else
pred.next = next;
break;
}
pred = e;
e = next;
}
}
}
}
/**
* 轮询过时的引用并将其删除(带锁)
*/
static final void helpExpungeStaleExceptions() {
final ReentrantLock lock = exceptionTableLock;
if (lock.tryLock()) {
try {
//调用上面的方法
expungeStaleExceptions();
} finally {
lock.unlock();
}
}
}
/**
* 重新抛出异常的方法
*/
static void rethrow(Throwable ex) {
if (ex != null)
ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
}
/**
* The sneaky part of sneaky throw, relying on generics
* limitations to evade compiler complaints about rethrowing
* unchecked exceptions
*/
@SuppressWarnings("unchecked") static <T extends Throwable>
void uncheckedThrow(Throwable t) throws T {
throw (T)t; // rely on vacuous cast
}
/**
* 根据s的不同抛出不同的异常
*/
private void reportException(int s) {
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
rethrow(getThrowableException());
}
// public methods
/**
* 将该任务加入到当前工作线程队列中,等待被执行。
*/
public final ForkJoinTask<V> fork() {
Thread t;
//获取当前线程 判断是否为ForkJoinWorkerThread类型,是的话加入工作队列
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
//不是则加入工作池队列
ForkJoinPool.common.externalPush(this);
return this;
}
/**
*返回计算的结果
注意:此方法异常会导致中断
*/
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
//返回计算结果
return getRawResult();
}
/**
* 在当前线程中执行该任务,并返回该任务的结果。
*
* 返回计算结果
*/
public final V invoke() {
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
/**
* 执行给定的两个任务,并等待这两个任务都完成。
*/
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
t2.fork();
if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
t1.reportException(s1);
if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
t2.reportException(s2);
}
/**
* 执行指定的一组任务,并等待所有任务都完成。
*/
public static void invokeAll(ForkJoinTask<?>... tasks) {
Throwable ex = null;
//获取最后一个节点下标
int last = tasks.length - 1;
//倒序
for (int i = last; i >= 0; --i) {
//获取对象
ForkJoinTask<?> t = tasks[i];
//为空 进行抛出空指针异常
if (t == null) {
if (ex == null)
ex = new NullPointerException();
}
//不为空 进行加入队列
else if (i != 0)
t.fork();
//执行并返回结果 如果状态为NORMAL 且 ex为空抛出异常
else if (t.doInvoke() < NORMAL && ex == null)
ex = t.getException();
}
//自增进行
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = tasks[i];
if (t != null) {
if (ex != null)
t.cancel(false);
//加入执行队列
else if (t.doJoin() < NORMAL)
ex = t.getException();
}
}
//异常不为空则抛出异常
if (ex != null)
rethrow(ex);
}
//同上类似
public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
return tasks;
}
@SuppressWarnings("unchecked")
List<? extends ForkJoinTask<?>> ts =
(List<? extends ForkJoinTask<?>>) tasks;
Throwable ex = null;
int last = ts.size() - 1;
for (int i = last; i >= 0; --i) {
ForkJoinTask<?> t = ts.get(i);
if (t == null) {
if (ex == null)
ex = new NullPointerException();
}
else if (i != 0)
t.fork();
else if (t.doInvoke() < NORMAL && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = ts.get(i);
if (t != null) {
if (ex != null)
t.cancel(false);
else if (t.doJoin() < NORMAL)
ex = t.getException();
}
}
if (ex != null)
rethrow(ex);
return tasks;
}
//尝试取消此任务的执行。
public boolean cancel(boolean mayInterruptIfRunning) {
return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}
//判断是否结束 true是 false否
public final boolean isDone() {
return status < 0;
}
// 判断该任务是否已取消。
public final boolean isCancelled() {
return (status & DONE_MASK) == CANCELLED;
}
/**
* 判断该任务是否发生异常。true是 false否
*/
public final boolean isCompletedAbnormally() {
return status < NORMAL;
}
/**
* 判断该任务是否正常完成。true是 false否
*/
public final boolean isCompletedNormally() {
return (status & DONE_MASK) == NORMAL;
}
/**
* 获取异常
public final Throwable getException() {
int s = status & DONE_MASK;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() :
getThrowableException());
}
/**
* 异常完成该任务,并将发生的异常传递给等待该任务的线程。
*/
public void completeExceptionally(Throwable ex) {
setExceptionalCompletion((ex instanceof RuntimeException) ||
(ex instanceof Error) ? ex :
new RuntimeException(ex));
}
/**
*用于执行具体的任务逻辑。
*/
public void complete(V value) {
try {
setRawResult(value);
} catch (Throwable rex) {
setExceptionalCompletion(rex);
return;
}
setCompletion(NORMAL);
}
/**
* 在不设置值的情况下正常完成此任务
*
* @since 1.8
*/
public final void quietlyComplete() {
setCompletion(NORMAL);
}
/**等待计算完成,然后检索其结果。
*/
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone();
Throwable ex;
if ((s &= DONE_MASK) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex);
return getRawResult();
}
/**
* 带超时的等待执行结束,获取结果
*/
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
int s;
long nanos = unit.toNanos(timeout);
if (Thread.interrupted())
throw new InterruptedException();
if ((s = status) >= 0 && nanos > 0L) {
long d = System.nanoTime() + nanos;
long deadline = (d == 0L) ? 1L : d; // avoid 0
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
}
else if ((s = ((this instanceof CountedCompleter) ?
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ?
doExec() : 0)) >= 0) {
long ns, ms; // measure in nanosecs, but wait in millisecs
while ((s = status) >= 0 &&
(ns = deadline - System.nanoTime()) > 0L) {
if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
wait(ms); // OK to throw InterruptedException
else
notifyAll();
}
}
}
}
}
if (s >= 0)
s = status;
if ((s &= DONE_MASK) != NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
if (s != EXCEPTIONAL)
throw new TimeoutException();
if ((ex = getThrowableException()) != null)
throw new ExecutionException(ex);
}
return getRawResult();
}
/**
* 加入此任务,而不返回其结果或引发其异常。
*/
public final void quietlyJoin() {
doJoin();
}
/**
*开始执行此任务,并在必要时等待其完成,而不返回其结果或引发其异常。
*/
public final void quietlyInvoke() {
doInvoke();
}
/**
* 用于等待所有正在执行的任务完成。
*/
public static void helpQuiesce() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
wt.pool.helpQuiescePool(wt.workQueue);
}
else
ForkJoinPool.quiesceCommonPool();
}
/**
* 重置此任务的内部簿记状态,允许后续
*/
public void reinitialize() {
if ((status & DONE_MASK) == EXCEPTIONAL)
clearExceptionalCompletion();
else
status = 0;
}
/**
* 获取任务线程池
*/
public static ForkJoinPool getPool() {
Thread t = Thread.currentThread();
return (t instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread) t).pool : null;
}
/**
*判断当前是否为线程池执行,如果是则返回true
*/
public static boolean inForkJoinPool() {
return Thread.currentThread() instanceof ForkJoinWorkerThread;
}
/**
* 尝试取消执行的任务
*/
public boolean tryUnfork() {
Thread t;
//获取当前线程 判断在池中还是队列,然后进行取消
return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
ForkJoinPool.common.tryExternalUnpush(this));
}
/**
* 返回当前任务的队列计数(排队数)
*/
public static int getQueuedTaskCount() {
Thread t; ForkJoinPool.WorkQueue q;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
q = ((ForkJoinWorkerThread)t).workQueue;
else
q = ForkJoinPool.commonSubmitterQueue();
return (q == null) ? 0 : q.queueSize();
}
/**
* 获取线程池中的排队数
*
* @return the surplus number of tasks, which may be negative
*/
public static int getSurplusQueuedTaskCount() {
return ForkJoinPool.getSurplusQueuedTaskCount();
}
// Extension methods
/**
* 返回的结果(即使此任务异常完成)或 {@code null}(如果此任务未知已完成)
*
* @return the result, or {@code null} if not completed
*/
public abstract V getRawResult();
/**
* Forces the given value to be returned as a result. This method
* is designed to support extensions, and should not in general be
* called otherwise.
*
* @param value the value
*/
protected abstract void setRawResult(V value);
/**
* 抽象的执行方法 (子类需要实现)
*/
protected abstract boolean exec();
/**
* 获取当前排队中的任务
*/
protected static ForkJoinTask<?> peekNextLocalTask() {
Thread t; ForkJoinPool.WorkQueue q;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
q = ((ForkJoinWorkerThread)t).workQueue;
else
q = ForkJoinPool.commonSubmitterQueue();
return (q == null) ? null : q.peek();
}
/**
* 用于从当前线程绑定的工作队列中获取下一个待执行的任务。
*/
protected static ForkJoinTask<?> pollNextLocalTask() {
Thread t;
return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :
null;
}
/**
* 用于从当前线程绑定的工作队列和共享队列中获取下一个待执行的任务。
*/
protected static ForkJoinTask<?> pollTask() {
Thread t; ForkJoinWorkerThread wt;
return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
null;
}
// tag operations
/**
* 获取当前状态
* @since 1.8
*/
public final short getForkJoinTaskTag() {
return (short)status;
}
/**
* 以原子方式设置此任务的标记值。
*/
public final short setForkJoinTaskTag(short tag) {
for (int s;;) {
if (U.compareAndSwapInt(this, STATUS, s = status,
(s & ~SMASK) | (tag & SMASK)))
return (short)s;
}
}
/**
*以原子方式有条件地设置此任务的标记值。
*/
public final boolean compareAndSetForkJoinTaskTag(short e, short tag) {
for (int s;;) {
if ((short)(s = status) != e)
return false;
if (U.compareAndSwapInt(this, STATUS, s,
(s & ~SMASK) | (tag & SMASK)))
return true;
}
}
/**
* 适配器类
*/
static final class AdaptedRunnable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
//用于存储被适配的 Runnable 对象。
final Runnable runnable;
T result;
//构造方法 线程和结果为入参
AdaptedRunnable(Runnable runnable, T result) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
this.result = result; // OK to set this even before completion
}
//获取执行结果方法
public final T getRawResult() { return result; }
//设置执行结果方法
public final void setRawResult(T v) { result = v; }
//执行返回true
public final boolean exec() { runnable.run(); return true; }
//运行无返回
public final void run() { invoke(); }
private static final long serialVersionUID = 5232453952276885070L;
}
/**
* 有返回的适配器类 与上面的类似,唯一的区别是没有返回
*/
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
implements RunnableFuture<Void> {
final Runnable runnable;
AdaptedRunnableAction(Runnable runnable) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) { }
public final boolean exec() { runnable.run(); return true; }
public final void run() { invoke(); }
private static final long serialVersionUID = 5232453952276885070L;
}
/**
* 同上类似(带异常)
*/
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
final Runnable runnable;
RunnableExecuteAction(Runnable runnable) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) { }
public final boolean exec() { runnable.run(); return true; }
void internalPropagateException(Throwable ex) {
rethrow(ex); // rethrow outside exec() catches.
}
private static final long serialVersionUID = 5232453952276885070L;
}
/**
* 适配器
*/
static final class AdaptedCallable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
final Callable<? extends T> callable;
T result;
AdaptedCallable(Callable<? extends T> callable) {
if (callable == null) throw new NullPointerException();
this.callable = callable;
}
public final T getRawResult() { return result; }
public final void setRawResult(T v) { result = v; }
public final boolean exec() {
try {
result = callable.call();
return true;
} catch (Error err) {
throw err;
} catch (RuntimeException rex) {
throw rex;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
public final void run() { invoke(); }
private static final long serialVersionUID = 2838392045355241008L;
}
/**
* 返回一个新的AdaptedRunnableAction适配器
*/
public static ForkJoinTask<?> adapt(Runnable runnable) {
return new AdaptedRunnableAction(runnable);
}
/**
* 返回一个AdaptedRunnable适配器
*/
public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
return new AdaptedRunnable<T>(runnable, result);
}
/**
* 返回一个AdaptedCallable适配器
*/
public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
return new AdaptedCallable<T>(callable);
}
// Serialization support
private static final long serialVersionUID = -7721805057305804111L;
/**
* 输入对象流
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
s.defaultWriteObject();
s.writeObject(getException());
}
/**
* 读文件
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
Object ex = s.readObject();
if (ex != null)
setExceptionalCompletion((Throwable)ex);
}
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long STATUS;
//初始化静态代码块
static {
//实例会异常锁
exceptionTableLock = new ReentrantLock();
//实例会引用对象
exceptionTableRefQueue = new ReferenceQueue<Object>();
//实例化异常节点对象
exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];
try {
//实例化unsafe
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ForkJoinTask.class;
STATUS = U.objectFieldOffset
(k.getDeclaredField("status"));
} catch (Exception e) {
throw new Error(e);
}
}
}
最后
非常可惜到目前为止,我在很多项目中及跟很多做开发的同事或朋友,这个ForkJoin用到的很少,大部分还是停留 在CURD,好可惜,但是其实数据库查询优化、图像处理、机器学习、大数据处理等这些场景都可以通过ForkJoin的并行处理能力来实现这些场景。当然这个ForkJoin还有非常非常多的用法,可以自行了解。