并发编程-CompletableFuture
本篇主要讲述 JDK1.8 里面 CompletableFuture 的原理与源码分析。这一篇暂且作为整个章节的最后一篇(若有时间继承增加关于并发编程的其他内容)。闲话少叙,进入正题。在深入了解 CompletableFuture 之前我们先要看一下 Future&Callable
Future&Callable
众所周知,我们在异步执行一个任务的时候通常是继承 Thread 类或者实现 Runnable 接口,而最终都会去回调 run() 方法去执行任务。可 run() 方法是没有返回值的。那如何在线程执行完之后拿到返回结果呢?Future&Callable 就派上了用场,它提供了一个异步执行并带有返回的功能,通常它俩需要一起使用。下面我们通过一段代码示例来感受一下。
使用
public class FutureCallableExample {
public static void main(String[] args) {
// 创建一个单线程的ExecutorService
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交一个Callable任务
Callable<String> task = () -> {
// 模拟耗时操作
TimeUnit.SECONDS.sleep(1);
return "Hello, Future!";
};
// 提交Callable任务并获取Future对象
Future<String> future = executor.submit(task);
try {
// 检查任务是否完成
// 可能会立即返回false,因为任务可能还在执行
System.out.println("任务是否完成: " + future.isDone());
// 等待任务完成并获取结果
// 这里会阻塞,直到任务完成
String result = future.get();
System.out.println("任务结果: " + result);
// 再次检查任务是否完成
System.out.println("任务是否完成: " + future.isDone());
} catch (InterruptedException | ExecutionException e) {
// 处理异常
e.printStackTrace();
}
// 关闭线程池
executor.shutdown();
}
}
在上述代码中,我们创建了一个实现了 Callable 接口的匿名类,它的 call() 方法模拟了一个耗时操作,并返回了一个字符串结果。然后,我们将这个 Callable 任务提交给了 ExecutorService ,并获得了一个 Future 对象。
我们调用了 future.get() 方法来等待任务完成并获取结果。这个方法会阻塞调用它的线程,直到任务完成。任务完成就可以通过future.get() 获取到结果,并打印出来。
类关系图
在上面的代码示例中有的小伙伴可能觉得多少有点混乱,又得提交Callable 任务,还要得到 Future 对象,最后又传递给线程去执行。那它们之间的关系如何,我们通过类关系图来看一下。
通过类关系图我们就比较清晰的看出它们之间的联系。
-
Future 提供了获取线程执行结果的方法。
public interface Future<V> { //......代码省略...... V get() throws InterruptedException, ExecutionException; //......代码省略...... }
-
RunnableFuture 接口继承了 Runnable、Future,并提供一个抽象的 run() 方法。
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
-
而 FutureTask又实现了 RunnableFuture 接口,重写了 Future 的 get() 方法和 RunnableFuture 的 run() 方法
public class FutureTask<V> implements RunnableFuture<V> { public V get() throws InterruptedException, ExecutionException { //......代码省略...... } public void run() { //......代码省略...... } }
-
FutureTask 的构造方法中传递 Callable 接口
public FutureTask(Callable<V> callable) { //......代码省略...... }
原理分析
熟悉了 Future&Callable 的使用以及它们的类关系图之后我们来逐步推测一下它的核心原理。
- 我们都知道线程执行完之后是无法获得返回值的(因为 run() 方法就不带返回值),而 Future 之所以能够获得返回值一定是在执行过程中或者执行结束的时候做了相关处理,FutureTask 就是用来获得线程执行结果的。
- FutureTask 实现了 RunnableFuture 接口而 RunnableFuture 又继承了 Runnable 接口,所以线程执行的时候肯定会执行 run() 方法,但 run() 并没有返回值。这里有个很关键的地方,在 FutureTask 的构造方法中把实现了 Callable 接口的任务作为参数进行传递。因此我们可以推测出返回值应该是 call() 方法提供的。那这就相当于调用 FutureTask.run(),实际还会再调用 call() 方法。
- 当 run() 执行并调用 call() 方法时,假设 call() 方法执行需要很长的时间。那么 future.get() 获取结果会处于阻塞状态直到 call() 方法执行结束。如果在多线程环境下就会必然涉及到线程的阻塞和唤醒。这个我们在之前的文章中多次碰到过,常规套路就是需要有一个队列来存储阻塞的线程,基于 LockSupport.park() 和 LockSupport.unpark() 来实现线程阻塞和唤醒。
至于以上的推导分析正确性如何,我们还是通过源码来对照一下,看是不是和我们想的一样
源码分析
我们通过之前的分析已经知道了核心逻辑都在 FutureTask 这个类中。在进入源码逻辑分析之前,我们先来看一下 FutureTask 的核心属性。
public class FutureTask<V> implements RunnableFuture<V> {
/**
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
// 任务的状态
private volatile int state;
//初始状态,任务还没有开始执行
private static final int NEW = 0;
//任务正在执行过程中,此时还没有结果
private static final int COMPLETING = 1;
//任务正常完成,并设置了结果
private static final int NORMAL = 2;
//任务执行过程中抛出异常
private static final int EXCEPTIONAL = 3;
//任务被取消
private static final int CANCELLED = 4;
//任务正在被中断的过程中,但还未完全中断。这是一个过渡状态
private static final int INTERRUPTING = 5;
//任务已经被中断
private static final int INTERRUPTED = 6;
//提交的任务
private Callable<V> callable;
// 非volatile 字段,用于存储任务执行的结果或抛出的异常
private Object outcome;
//正在执行任务的线程
private volatile Thread runner;
//单向链表,用来存储等待执行任务结束的线程
private volatile WaitNode waiters;
}
我们看到在注释中还标注了几种状态的流转情况
-
NEW(初始状态) -> COMPLETING(正在执行任务) -> NORMAL(任务执行完成并设置了结果)
这是任务正常执行完成的状态流转
-
NEW(初始状态) -> COMPLETING(正在执行任务) -> EXCEPTIONAL(任务执行过程中发生异常)
-
NEW(初始状态) -> CANCELLED(任务被取消)
通常是调用 FutureTask 中的cancel() 方法
-
NEW(初始状态) -> INTERRUPTING(任务正在被中断) -> INTERRUPTED(任务已经被中断)
接下来就正式进入源码的分析了,我们就以 FutureTask 的 run() 方法作为入口进行分析
run()
public void run() {
/**
*判断 state 是不是初始状态或者通过cas操作尝试把runner设置为当前线程
*如果不为初始状态或者cas失败证明当前有任务在执行直接返回
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
//提交的 callable 任务
Callable<V> c = callable;
//任务不为空且 state是初始状态
if (c != null && state == NEW) {
//任务执行的结果
V result;
boolean ran;
try {
//这里就是回调 callable 的call()
result = c.call();
ran = true;
} catch (Throwable ex) {
//出现异常的情况 任务执行结果设置为空
result = null;
ran = false;
//任务执行异常处理
setException(ex);
}
if (ran)
//设置任务执行结果
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
//处理任务被中断的逻辑
handlePossibleCancellationInterrupt(s);
}
}
整个 run() 方法并不复杂,我们先按 NEW(初始状态) -> COMPLETING(正在执行任务) -> NORMAL 这个流转状态来看,如果任务正常执行那么必然会调用 set(result) 方法
set()
protected void set(V v) {
//cas操作尝试把状态改为COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//修改状态成功把结果 v 赋值给 outcome
outcome = v;
//结果赋值完后 COMPLETING(正在执行任务) -> NORMAL 修改状态为 NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
//唤醒调用 get()方法时阻塞的线程(这个方法我们先不详细展开,先顺着思路往下看)
finishCompletion();
}
}
紧接着我们来看 get() 方法
get()
public V get() throws InterruptedException, ExecutionException {
int s = state;
//如果当前状态是初始化状态或者是 COMPLETING 状态
if (s <= COMPLETING)
//让当前调用 get() 方法的线程等待。注:这里 awaitDone() 方法是返回的任务状态
s = awaitDone(false, 0L);
return report(s);
}
// report() 方法是根据状态类型来决定是返回运行结果还是抛出异常
private V report(int s) throws ExecutionException {
Object x = outcome;
//如果状态是 NORMAL 返回结果
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
既然前面说到了当调用 get() 方法时如果 call() 方法还没执行完则需要阻塞线程,那么我们来看一下 awaitDone(false, 0L) 方法做了什么
awaitDone()
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//deadline 表示阻塞超时时间,timed表示是否有阻塞时间。若为true则计算当前时间+nanos 得出超时截止时间反之为0
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//看到Node就很眼熟了,肯定是构建队列用的
WaitNode q = null;
boolean queued = false;
for (;;) {
//如果检测到线程被中断了则从等待队列中移除该节点,并抛出中断异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
//得到当前的任务状态
int s = state;
//如果当前状态大于 COMPLETING,说明任务可能已经完成
if (s > COMPLETING) {
if (q != null)
//把执行任务的线程设置为空
q.thread = null;
//返回当前的任务状态
return s;
}
//如果任务正在执行中,通过Thread.yield()让出当前CPU资源
else if (s == COMPLETING)
Thread.yield();
//q为null则证明链表还未构建则新建一个节点
else if (q == null)
q = new WaitNode();
//这里有cas操作,又在for循环里边所以可以判定这个个自旋锁操作,把q加入到链表中
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//下面这段逻辑是如果加入队列之后的任务还没有完成
//如果设置了超时
else if (timed) {
//计算剩余的时间
nanos = deadline - System.nanoTime();
//如果已经超时则从队列中移除当前的q节点并返回任务状态
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//挂起当前线程并等待执行的时间
LockSupport.parkNanos(this, nanos);
}
else
//没有设置超时则直接挂起阻塞
LockSupport.park(this);
}
}
这个方法整体上并不复杂,我们来重点关注它的核心点
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
先来看 WaitNode 包含了哪些信息
static final class WaitNode {
volatile Thread thread;
//只有一个next指向证明是个单向链表
volatile WaitNode next;
//调用get() 方法阻塞的线程都赋值给 thread
WaitNode() { thread = Thread.currentThread(); }
}
这里构建链表的情况有些特殊我们逐步来分析
-
当 q == null 此时还没有链表所以通过 new WaitNode() 创建一个节点(此时的q是头节点)。
-
当 !queued == true 时执行一个cas操作UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q) 。关键点来了,这个操作是把新的q节点加入到头节点之前。我们把这个操作分开理解即
-
第一步 设置新节点q 的next指向 q.next = waiters 新节点 q 的 next 指针指向当前的头节点 waiters
-
第二步 更新头节点也就是执行cas操作 waiters = q 。这样 q 就成了新的头节点
既然等待执行任务的线程被阻塞,那肯定还会去唤醒,那是什么时候被唤醒。这里有两种情况,一是在任务执行完成会调用 set() 方法中的finishCompletion() 方法唤醒阻塞的线程。二是如果线程被中断,在awaitDone() 方法中会判断 if (Thread.interrupted()) ,我们主要关注 finishCompletion() 这个方法。
finishCompletion()
private void finishCompletion() {
//遍历整个链表,这里要唤醒所有阻塞的线程
for (WaitNode q; (q = waiters) != null;) {
/**通过cas操作尝试将 waiters 设置为null
* 这里小伙伴可能有疑惑为什么要设置为null,其实我们可以这么理解
* 当执行finishCompletion(),此时outcome已经赋值完成就不需要对线程进行阻塞了
*所以将 waiters 设置为 null 的目的就是不再让新的线程加入到等待队列中。
*/
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
//遍历获取每个节点的线程
Thread t = q.thread;
if (t != null) {
q.thread = null;
//唤醒
LockSupport.unpark(t);
}
WaitNode next = q.next;
//next 为空则证明链表遍历完成
if (next == null)
break;
//已经唤醒的节点从链表中移除,等待jvm回收
q.next = null;
q = next;
}
break;
}
}
done();
//最后把callable 设置为空
callable = null;
}
小结
我们简单的梳理了一下 Future&Callable 的原理以及源码,其中 FutureTask 实现了 Runnable 和 Future接口。通过传入一个 Callable 任务,实现一个返回值的执行任务,也就是 call() 方法的返回值,call() 方法没有执行完之前,获取返回值则是通过阻塞的方式进行获取(future.get() 方法) 。阻塞的线程则存储在一个 Treiber stack 数据结构中等待任务执行完成并唤醒。
对于 Future&Callable来说它是存在缺陷的,一是在获取任务执行结果的时候,只能通过 future.get() 阻塞的方式获取或者不断调用 future.isDone() 判断任务是否执行完成非常的不方便。二是任务执行完成后没有通知机制。那有没有更好的获取异步执行结果的方式呢?因此我们引出文章下半部分的主要内容也是全篇的核心——CompletableFuture
CompletableFuture
概念
CompletableFuture
是 Java 8 引入的一个类,它实现了 Future
和 CompletionStage
接口,提供了非阻塞的方式来处理异步计算的结果。CompletableFuture
的主要目的是简化异步编程的复杂性,使得异步代码更加清晰和易于编写。CompletableFuture
在 Future
的基础上增加了在异步操作完成之后,主线程如果需要以来该任务的执行结果继续后面的操作不需要等待,可直接传入一个回调对象。当异步任务执行完之后自动调用传入的回调对象。
类关系图
它们之间的类关系图就比较简单
从图中我们可以很明显的看到 CompletableFuture 实现了 Future于 CompletionStage 接口。Futrue我们在上半章节已经说过了它提供了获取任务执行结果和任务执行状态的方法,而 CompletionStage 则表示任务执行的阶段,它定义了很多的方法如 thenApply()、thenAccept() 等。通过这一系列的方法可以实现多个任务之间的执行关系如串行、并行、聚合等。
使用
由于 CompletableFuture 里面涉及了各种异步非阻塞方式的处理方法且方法比较多我们就不挨个进行模拟使用了,这里我们把方法进行分类着重点演示如何使用。这里我们可以分为两大类,异步执行和异步执行完回调。我们先来看异步执行
异步执行
CompletableFuture 提供了四种静态方法来构建一个异步事件
-
supplyAsync(Supplier supplier)
带返回值的异步执行方法,传入一个函数式接口返回 CompletableFuture 对象。默认使用ForkJoinPool.commonPool() 线程池执行异步任务
public class SupplyAsyncExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture future = CompletableFuture.supplyAsync(()->{ return 2*5; }); System.out.println(future.get()); } }
-
supplyAsync(Supplier supplier,Executor executor)
带返回值的异步执行方法,传入一个函数式接口与线程池返回CompletableFuture 对象,可自定义线程池来执行异步任务
public class SupplyAsyncExample { public static void main(String[] args) throws ExecutionException, InterruptedException { // 创建一个自定义的线程池 ExecutorService executor = Executors.newSingleThreadExecutor(); // 使用 supplyAsync 异步地执行 2*5 的任务,并传入自定义的线程池 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{ System.out.println("执行异步计算的线程:" + Thread.currentThread().getName()); return 2*5; }, executor); // 打印当前线程名称,表明 main 线程不会被阻塞 System.out.println("主线程继续执行,线程名:" + Thread.currentThread().getName()); // 使用 thenAccept 非阻塞地处理异步操作的结果 future.thenAccept(result -> System.out.println("异步计算的结果是:" + result)); //关闭线程池 executor.shutdown(); } }
执行结果如下
-
runAsync(Runnable runnable)
不带返回值的异步执行方法,传入的是一个Runnable,返回的同样是CompletableFuture对象
public class RunAsyncExample { public static void main(String[] args) throws ExecutionException, InterruptedException { AtomicInteger i= new AtomicInteger(2); CompletableFuture future = CompletableFuture.runAsync(()->{ System.out.println(Thread.currentThread().getName()+" 异步执行任务"); i.set(i.get() * 5); }); System.out.println("异步执行结果 "+ future.get()); System.out.println("i 的值"+ i); } }
执行结果如下
由于 runAsync() 方法是没有返回值的,所以get() 方法返回的是null 但计算 2*5 的任务已经执行了,i 变为了10。
-
runAsync(Runnable runnable,Executor executor)
不带返回值的异步执行方法,传入的是Runnable和 Executor ,返回的同样是CompletableFuture对象,可自定义线程池来执行异步任务。
public class RunAsyncExample { public static void main(String[] args) throws ExecutionException, InterruptedException { // 创建一个自定义的线程池 ExecutorService executor = Executors.newSingleThreadExecutor(); AtomicInteger i= new AtomicInteger(2); // 使用 runAsync 异步地执行 2*5 的任务,并传入自定义的线程池 CompletableFuture future = CompletableFuture.runAsync(()->{ System.out.println(Thread.currentThread().getName()+" 异步执行任务"); i.set(i.get() * 5); },executor); // 打印当前线程名称,表明 main 线程不会被阻塞 System.out.println("主线程继续执行,线程名:" + Thread.currentThread().getName()); future.get(); // 获取结果 System.out.println("异步计算的结果是:" + i); //关闭线程池 executor.shutdown(); } }
执行结果如下
CompletableFuture 中还有两个特殊的静态方法 allOf() 和 anyOf()
-
allOf(CompletableFuture<?>… cfs)
它的含义是接收多个 CompletableFuture 无返回值任务,当所有的 CompletableFuture 任务执行完,返回一个新的 CompletableFuture 对象。
public class AllOfExample { public static void main(String[] args) { // 创建三个异步任务 CompletableFuture future1 = CompletableFuture.runAsync(() -> { try { // 模拟耗时操作 Thread.sleep(1000); System.out.println("任务1完成"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); CompletableFuture future2 = CompletableFuture.runAsync(() -> { try { // 模拟耗时操作 Thread.sleep(2000); System.out.println("任务2完成"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); CompletableFuture future3 = CompletableFuture.runAsync(() -> { try { // 模拟耗时操作 Thread.sleep(500); System.out.println("任务3完成"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); // 使用CompletableFuture.allOf等待所有任务完成 CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3); // 当所有任务完成时,可以执行一些后续操作 try { // 阻塞等待所有任务完成 allFutures.get(); System.out.println("所有任务完成"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
执行结果如下
在上述代码中有个需要注意的点,如果其中任何一个任务抛出异常,那么get() 方法将抛出 ExecutionException 异常,我们可以对这个异常进行捕获或处理。
-
anyOf(CompletableFuture<?>… cfs)
接收多个 CompletableFuture 带返回值任务,当其中任何一个 CompletableFuture 任务执行完成之后,返回一个新的CompletableFuture 对象。
public class AnyOfExample { public static void main(String[] args) { // 创建两个异步任务,它们返回不同的计算结果 CompletableFuture future1 = CompletableFuture.supplyAsync(() -> { try { // 模拟耗时操作 Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } // 任务1的结果 return 2*5; }); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { try { // 模拟耗时操作 Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } // 任务2的结果 return 3*5; }); CompletableFuture anyFuture = CompletableFuture.anyOf( future1.thenApply(result -> result), future2.thenApply(result -> result) ); // 等待任意一个任务完成 try { // 阻塞等待任意一个任务完成 Object result = anyFuture.get(); // 注意:这里无法得知是哪个任务完成了,只知道结果 System.out.println("任意一个任务完成,结果是:" + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
执行结果如下
异步回调
到目前为止我们了解了 CompletableFuture 的异步执行的基本使用,但我们还没体会到 CompletableFuture 的异步回调的特性,这就用到了 CompletableFuture 实现的另一个接口 CompletionStage,而 CompletionStage 表示任务执行的一个阶段,每个异步任务都会返回一个新的 CompletionStage 对象,我们可以针对多个 CompletionStage 对象进行串行、并行、聚合的操作,这就相当于异步任务执行后的回调操作。并且我们在之前的代码示例中用到了 thenAccept() ,这个方法的特点就是传入的回调对象是上一个异步任务执行完后自动触发的,不需要通过get() 方法阻塞才拿到结果。这只是其中一种方法,接下来我们就来看看 CompletionStage 中的方法。整个 CompletionStage 提供的方法非常多,如下图所示
这里我们无法挨个进行模拟代码演示,不过我们可以将这些方法按功能进行大致的分类
-
纯消费类型的方法
纯消费类型方法主要是指那些不返回新的计算值,而是依赖上一个异步任务的结果作为当前CompletionStage的参数进行下一步计算的方法。这些方法都包含“Accept”关键字,并且主要关注的是对任务结果的处理,而不是产生新的结果。包含“Accept”关键字又分3类,如下图所示
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { return uniAcceptStage(null, action); } public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { return uniAcceptStage(asyncPool, action); } public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) { return uniAcceptStage(screenExecutor(executor), action); }
我们通过示例来感受一下代码的含义
public class AcceptExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { // 模拟耗时操作 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //计算 2*5 return 2*5; }); // 使用thenAccept同步处理结果 future.thenAccept(result -> { System.out.println("同步接收到结果: "+Thread.currentThread().getName()+" " + result); }); future.thenAcceptAsync(result->{ System.out.println("异步接收到结果: "+Thread.currentThread().getName()+" " + result); }); // 等待Future完成,因为thenAccept没有返回值,为了体现打印结果调用get() 方法 future.get(); } }
执行结果如下
在上述代码示例中 thenAccept 与 thenAcceptAsync 方法接收的参数是 Consumer 函数式接口 ,使用上一个任务的执行结果作为参数进行传递用来执行后续的操作。Consumer 表示一个待执行的任务。
我们接着往下看 thenAcceptBoth() 方法
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);
下面是 thenAcceptBoth() 相关的代码示例
public class BothExample { public static void main(String[] args) { // 创建两个异步任务 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() ->{ try { //模拟操作耗时 Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行任务1的线程是 "+Thread.currentThread().getName()); return 2*5; }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() ->{ try { //模拟操作耗时 Thread.sleep(800); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行任务2的线程是 "+Thread.currentThread().getName()); return 3*5; }); try { // 将main线程阻塞 假设这足够等待两个任务完成 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } // 当两个任务都完成时,执行的操作 future1.thenAcceptBoth(future2, (result1, result2) -> { System.out.println("任务1的结果是:" + result1 + ",任务2的结果是:" + result2); }); System.out.println(Thread.currentThread().getName()+" 线程继续执行"); } }
执行结果如下
thenAcceptBoth() 与 thenAcceptBothAsync() 的最主要区别在于执行方式的不同。thenAcceptBoth()是同步执行的,而thenAcceptBothAsync() 是异步执行的,我们将上述代码稍作改动
// 当两个任务都完成时,执行的操作 future1.thenAcceptBothAsync(future2, (result1, result2) -> { System.out.println("任务1的结果是:" + result1 + ",任务2的结果是:" + result2); });
再次执行结果如下
最后我们来看 acceptEither() 相关的方法,acceptEither() 方法与 thenAcceptBoth() 方法差不多。同样接收两个任务,两个任务中谁先完成就先获取谁的返回值,并作为参数传递后续执行。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync (CompletionStage<? extends T> other, Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync (CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);
下面是 acceptEither() 的代码示例
public class AcceptEitherExample { public static void main(String[] args) { // 创建两个 CompletableFuture 实例 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { try { // 模拟操作耗时 Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("执行任务1的线程是 "+Thread.currentThread().getName()); return 2*5; }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { try { // 模拟操作耗时 Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("执行任务2的线程是 "+Thread.currentThread().getName()); return 3*5; }); // 使用 acceptEither 方法 // 当 future1 或 future2 中的任何一个完成时,就会调用 consumer future1.acceptEither(future2, result -> { System.out.println("一个 CompletableFuture 完成了,结果是:" + result); }); // 为了看到输出结果,让主线程等待一会儿 try { // 等待足够长的时间以确保异步操作完成 Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
执行结果如下所示
-
有返回值类型的方法
我们了解了纯消费类型的方法(或者说无返回值的方法)后,那么有返回值类型的方法就很好理解了。无非就是用上一个异步任务执行的结果执行下一步的操作并返回以一个带返回值的 CompletionStage 对象。同样的,有返回值类型的方法也分了3类
我们先来看依赖单个任务完成的方法 thenApply() 相关方法,下面是关于 thenApply() 的代码示例
public class ThenApplyExample { public static void main(String[] args) { // 创建一个 CompletableFuture 实例,它将在某个异步操作中完成 // 使用 thenApply() 方法对异步操作的结果进行处理。这里我们改变一下以往的写法,采用链式调用的形式 CompletableFuture<String> processedFuture = CompletableFuture.supplyAsync(() -> { // 模拟异步操作耗时 try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return 2*5; }).thenApply(result -> { // 这里可以对 result 进行处理 Integer newResult = result+5; return "处理后的数据: " + newResult; }); // 等待处理后的 CompletableFuture 完成,并获取结果 try { // 为了获取输出结果,调用get() 方法 String finalResult = processedFuture.get(); // 输出处理后的结果 System.out.println(finalResult); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
执行结果如下
thenApply() 方法是等待上一个任务执行完成后,将执行结果传递给给函数fn,而函数fn又作为一新的任务执行并返回一个新的带有返回值的 CpmpletionStage。整体上比较简单就不过多描述了。
再来看 thenCombine() 方法
public <U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);
thenCombine() 方法就跟 thenAcceptBoth() 方法很类似,无非就多了返回值。我们来看一下代码示例
public class ThenCombineExample { public static void main(String[] args) { // 创建第一个 CompletableFuture 实例 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(800); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return 2*5; // 假设这是第一个异步操作的结果 }); // 创建第二个 CompletableFuture 实例 CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return 3*5; }); // 使用 thenCombine() 来组合两个 CompletableFuture 的结果,并链式调用 thenAccept 来处理结果 future1.thenCombine(future2, (result1, result2) -> { // 两个结果相加 return result1 + result2; }).thenAccept(sum -> { // 非阻塞地处理组合后的结果 System.out.println("两个任务的最终结果: " + sum); }); // 等待足够的时间让异步操作完成 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果如下
最后我们看一下 applyToEither() 它表示两个任务中任意一个任务完成后都会执行候面的函数式接口 fn。 而 fn 的返回值会作为新任务(CompletionStage)的计算结果
public <U> CompletionStage<U> applyToEither (CompletionStage<? extends T> other, Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync (CompletionStage<? extends T> other, Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync (CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor);
下面是 applyToEither() 的使用示例
public class EitherExample { public static void main(String[] args) { // 创建两个异步操作 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { try { // 模拟长时间运行的任务 Thread.sleep(800); } catch (InterruptedException e) { e.printStackTrace(); } return "计算任务1:" +2*5; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { // 模拟长时间运行的任务 Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return "计算任务2:" +3*5; }); // 使用applyToEither等待任意一个完成,并打印结果 future1.applyToEither(future2, result -> { // 当任意一个future完成时,这里将打印结果 System.out.println(result); return result; }); // 这里体现打印结果让主线程等待一段时间。 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
运行结果如下
-
不消费也不返回新值的方法
不消费也不返回新值的方法是指不依赖上一个任务的执行结果,只要上一个任务完成,就执行指定的后续任务,并且也不返回新的结果。这类的方法名中大都包含 Run 关键字。同样的 CompletionStage 也分为3类 ,具体如下图所示
public CompletionStage<Void> thenRunAsync(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor); public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
先来看 thenRun() 方法,对于 thenRun() 方法它只需要上一个阶段的任务执行完成后,就立即执行后续任务,而不关心上一个任务的执行结果,并且后续任务执行也没有返回值。下面是 thenRun() 方法的使用示例
public class ThenRunExample { public static void main(String[] args) { AtomicInteger i = new AtomicInteger(2); // 创建一个异步操作计算 2*5 的值,当 future 执行完成,再原来的基础上继续计算*5 的操作 CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName()+" 异步执行任务"); i.set(i.get() * 5); }).thenRun(() -> { // 使用thenRun在当前CompletableFuture完成时执行另一个任务 System.out.println("thenRun中的任务执行"); i.set(i.get() * 5); });; //为了体现打印效果,使main线程阻塞 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("最后的计算结果是:"+i.get()); } }
执行结果如下图所示
继续看 runAfterBoth() 方法。该方法接收一个 CompletionStage 任务,这个方法要保证两个任务都完成才能执行后续任务
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action, Executor executor);
下面是 runAfterBoth() 方法的使用示例
public class RunAfterBothExample { public static void main(String[] args) { // 创建两个异步操作 CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { try { // 模拟长时间运行的任务 Thread.sleep(800); System.out.println("任务1完成"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { try { Thread.sleep(500); System.out.println("任务2完成"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); // 使用runAfterBoth在两个CompletableFuture都完成时执行一个任务 CompletableFuture<Void> bothCompletedFuture = future1.runAfterBoth(future2, () -> { System.out.println("两个任务都完成了"); }); // 这里体现打印结果让主线程等待一段时间。 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果如下
最后我们来看一下 runAfterEither(),它只需要保证两个任务中有一个任务执行完成就可以执行后续操作,该方法使用如下
public class RunAfterEitherExample { public static void main(String[] args) { // 创建两个异步操作 CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { try { // 模拟长时间运行的任务 Thread.sleep(800); System.out.println("任务1完成"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> { try { Thread.sleep(500); System.out.println("任务2完成"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); // 使用runAfterEither在一个任务完成就执行后续任务 future1.runAfterEither(future2, () -> { System.out.println("有一个任务完成就执行后续任务"); }); // 这里体现打印结果让主线程等待一段时间。 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果如下
-
组合类型的方法
public <U> CompletionStage<U> thenCompose (Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);
我们先来看一下 thenCompose() 方法的使用示例
public class ThenComposeExample { /** * 获取用户信息的异步方法 */ private static CompletableFuture<User> fetchUserInfoAsync(String userId) { // 这里使用CompletableFuture.supplyAsync来模拟异步操作 return CompletableFuture.supplyAsync(() -> { // 模拟耗时操作 try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } // 假设这是从远程服务获取到的用户信息 User user = null; if(userId.equals("123")){ user = new User(userId, "张三"); System.out.println("查询到的用户信息是:"+user.toString()); }else { System.out.println("暂无此用户信息"); } return user; }); } /** * 基于用户信息获取订单信息的异步方法 */ private static CompletableFuture<List<Order>> fetchOrdersAsync(User user) { // 同样使用CompletableFuture.supplyAsync来模拟异步操作 return CompletableFuture.supplyAsync(() -> { // 模拟查询订单耗时操作 try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } if(null != user){ // 根据用户id获取到订单信息 if(user.getUserId().equals("123")){ System.out.println("查询到的订单是:"); return Arrays.asList(new Order("Order1"), new Order("Order2")); } } return new ArrayList<>(); }); } public static void main(String[] args) { //定义一个用户ID String userId = "123"; // 首先异步获取用户信息 CompletableFuture<User> userFuture = fetchUserInfoAsync(userId); // 基于用户信息异步获取订单信息 // 使用thenCompose来链接两个异步操作 CompletableFuture<List<Order>> ordersFuture = userFuture.thenCompose(ThenComposeExample::fetchOrdersAsync); // 等待订单信息获取完成,并打印结果 ordersFuture.thenAccept(orders -> orders.forEach(order -> System.out.println(order.getOrderId()))).join(); } /** * 建议的用户User类 */ static class User { String userId; String name; User(String userId, String name) { this.userId = userId; this.name = name; } String getUserId(){ return userId; } @Override public String toString() { return "用户{userId='"+userId+"',name='"+name+"'}"; } } /** * 简易的订单Order类 */ static class Order { String orderId; Order(String orderId) { this.orderId = orderId; } String getOrderId() { return orderId; } } }
执行结果如下
在上述代码中我们模拟了常见的根据用户信息获取其订单的操作。在这个过程中我们通过 thenCompose() 方法将这两个异步操作链接起来。thenCompose() 是多任务组合方法,它的作用是把两个 CompletionStage 任务进行组合达到串行执行的目的。它与 thenCombine() 方法的区别是 thenCompose() 中的任务执行存在先后顺序(串行),而 thenCombine() 中的任务是并行执行的。
异常处理
在之前我们所有的模拟异步回调方式的示例中都是按照所有任务都能正常执行完成来进行的。那如果当前任务所依赖的上一个任务没有执行成功出现异常的情况该怎么处理呢?CompletionStage 提供了3类异常处理的办法
-
whenComplete 相关的方法
以 whenComplete 开头的方法表示当任务执行完成后会触发的方法,它的特点是无论上一个任务是执行成功或出现异常都会触发特定的 action 主要的方法如下
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);
我们还是通过示例来了解 whenComplete()
public class WhenCompleteExample { public static void main(String[] args) { // 创建一个异步操作 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 通过比较随机数来模拟是否发生异常 Double d = Math.random(); if (d > 0.5) { return "获取随机数为: "+d; } else { throw new RuntimeException("获取随机数失败"); } }); // 使用whenComplete来处理异步操作完成后的逻辑 future.whenComplete((result, throwable) -> { if (throwable != null) { // 如果throwable不为null,说明操作失败 System.err.println("前置任务出现异常: " + throwable.getMessage()); } else { // 否则,操作成功,打印结果 System.out.println("前置任务执行成功: " + result); } }); //等待异步操作执行完成 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果如下
在上述代码中我们可以看到,whenComplete() 接收的是一个函数式接口,而函数式接口有两个参数一个是前置任务的执行结果,另一个是前置任务抛出的异常对象,如果出现异常,那么第二个参数(throwable)就不为空。
-
handle 相关的方法
以 handle 开头的方法表示前置任务执行完成后,不管这个任务执行成功还是不成功也会触发后续的操作跟 whenComplete() 的作用一样。不同点在于 handle() 是有返回值的。下面是 handle() 的使用示例
public class HandleExample { public static void main(String[] args) { // 创建一个异步操作 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 通过比较随机数来模拟是否发生异常 Double d = Math.random(); if (d > 0.5) { return "获取随机数为: "+d; } else { throw new RuntimeException("获取随机数失败"); } }); // 使用handle来处理异步操作完成后的逻辑 CompletableFuture<String> handledFuture = future.handle((result, throwable) -> { if (throwable != null) { // 如果throwable不为null,说明操作失败 return "前置任务出现异常: " + throwable.getMessage(); } else { // 否则,操作成功,返回结果 return "前置任务执行成功: " + result; } }); //等待异步操作执行完成 try { String result = handledFuture.get(); System.out.println("最终的结果为:"+result); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果如下
-
exceptionally()
exceptionally() 是一个只针对前一个任务出现异常时的处理方法。定义如下
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
我们来看一下具体的使用示例
public class ExceptionallyExample { public static void main(String[] args) { // 创建一个可能会失败的CompletableFuture CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟一个可能抛出异常的异步操作 Double d = Math.random(); if (d > 0.5) { throw new RuntimeException("获取小于0.5的随机数失败"); } return "获取小于0.5的随机数成功: "+d; }); // 使用exceptionally来处理异常 CompletableFuture<String> futureWithExceptionHandling = future.exceptionally(throwable -> { // 处理异常,这里简单地返回一个错误消息 return "Error: " + throwable.getMessage(); }); // 等待异步操作完成并获取结果 String result = futureWithExceptionHandling.join(); System.out.println(result); } }
执行结果如下
原理分析
在前面我们花了不少的篇幅来讲述了 CompletableFuture 的使用,本质上都是在说 CompletableFuture 是如何实现任务完成后的主动通知功能即异步回调,那它的核心原理是怎样的呢?我们不妨先来梳理下对它的初步猜想,看能不能找到点蛛丝马迹。
猜想1:多个任务间是有串行执行的关系,假设有任务A和任务B,那么任务B相当于任务A执行结束后的异步回调,它们之间可以用链式风格串联。
猜想2:在 CompletableFuture 中肯定要存储任务的执行结果及多个任务节点的链接关系。
为了验证我们的初步猜想我们先来看一下 CompletableFuture 的定义。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
volatile Object result; // Either the result or boxed AltResult
volatile Completion stack;
//。。。。。代码省略。。。。。。
}
在上述代码中我们可以看到 CompletableFuture 的成员变量有两个。 其中 result 表示 CompletableFuture 的返回结果或者是异常的封装对象 AltResult,result 就可以理解为任务的执行结果 。而 stack 表示的是栈,简单的说多个任务的链式关系就是通过栈的形式实现的。 接下来我们需要着重看一下 Completion
abstract static class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
// Treiber stack 的指向
volatile Completion next;
/**
* Performs completion action if triggered, returning a
* dependent that may need propagation, if one exists.
*
* @param mode SYNC, ASYNC, or NESTED
*/
abstract CompletableFuture<?> tryFire(int mode);
/** Returns true if possibly still triggerable. Used by cleanStack. */
abstract boolean isLive();
public final void run() { tryFire(ASYNC); }
public final boolean exec() { tryFire(ASYNC); return true; }
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
}
由 Completion 对象组成的栈是一个 Treiber Stack 结构( Treiber Stack 是一种基于CAS机制实现的无锁并发栈)进栈和出栈都是从栈顶开始(划重点,这里先初步留个印象后续的原理分析中会用到)。Completion 组成的 Treiber Stack 就构成的了任务间的链式关系,Completion 则表示一个具体的任务,每个任务都会封装成 Completion 压入 Treiber Stack中,而 Completion 中的next则指向的是下一个任务。
Completion 还有三个子类实现,是为了满足 CompletionStage中不同的方法类型。它的类关系图如下
UniCompletion 表示单个 CompletionStage 任务输入的实现
CoCompletion 表示两个 CompletionStage 中第二个任务输入实现
Signaller 是针对不输入和不返回的操作提供的信号器,如get() 、join() 等方法
UniCompletion、CoCompletion、Signaller 又有很多不同的子类实现这里就不一一列举了。 到目前为止,我们基本了解了 CompletableFuture 的大致实现思路,我们先根据一段代码示例为入口来了解CompletableFuture 中是如何基于任务链路构建 Completion 栈结构。假设有这样一段代码
public class CompletionExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("总任务是 f1");
sleep(1);
return "f1";
});
CompletableFuture<String> f2 = f1.thenApply(r -> {
System.out.println("当前任务是f2,上一个任务是:"+r);
sleep(2);
return "f2";
});
CompletableFuture<String> f3 = f2.thenApply(r -> {
System.out.println("当前任务是f3,上一个任务是:"+r);
sleep(2);
return "f3";
});
CompletableFuture<String> f4 = f1.thenApply(r -> {
System.out.println("当前任务是f4,上一个任务是:"+r);
sleep(1);
return "f4";
});
CompletableFuture<String> f5 = f4.thenApply(r -> {
System.out.println("当前任务是f5,上一个任务是:"+r);
sleep(1);
return "f5";
});
CompletableFuture<String> f6 = f5.thenApply(r -> {
System.out.println("当前任务是f6,上一个任务是:"+r);
sleep(1);
return "f6";
});
try {
System.out.println(f1.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
static void sleep(int seconds){
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
执行结果如下
注:这里的执行结果的顺序受默认线程池 fork join的影响,其任务回调顺序可能会跟理想情况不一致。总的回调顺序为 f1->f4->f5->f6->f2->f3
在上述示例代码以及运行结果中我们可以看出,整个回调过程又可以分为两条线,分别是 f1->f4->f5->f6 与 f1->f2->f3。为了便于理解我们将这个回调过程汇总成图形大致是这样的
结合之前提到过的栈结构,我们发现整个回调链路就是一个栈结构。那么链路中每个节点靠什么来关联的呢?在源码中是这样写的
// thenApply 单个任务 所以这里我们以 UniCompletion 为例
abstract static class UniCompletion<T,V> extends Completion {
Executor executor; // executor to use (null if none)
//指当前任务构建的 CompletableFuture 对象,通过它来连接当前任务后置的回调方法
CompletableFuture<V> dep;
//指向源 CompletableFuture 任务
CompletableFuture<T> src;
UniCompletion(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src) {
this.executor = executor; this.dep = dep; this.src = src;
}
final boolean claim() {
Executor e = executor;
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
if (e == null)
return true;
executor = null; // disable
e.execute(this);
}
return false;
}
final boolean isLive() { return dep != null; }
}
接下来我们来逐步分析,首先我们分析如下代码
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("总任务是 f1");
sleep(1);
return "f1";
});
CompletableFuture<String> f2 = f1.thenApply(r -> {
System.out.println("当前任务是f2,上一个任务是:"+r);
sleep(2);
return "f2";
});
因为 f1 调用的是 thenApply() 方法只接收一个任务,所以此时 Completion的对象类型是 UniCompletion ,这段代码执行后会创建如下的结构
我们接着往下走,f3 是基于 f2 的一个回调
CompletableFuture<String> f3 = f2.thenApply(r -> {
System.out.println("当前任务是f3,上一个任务是:"+r);
sleep(2);
return "f3";
});
该代码执行后,整个栈结构发生如下的变化
这里解释说明一下,由于每一个任务的回调都会生成一个新的 CompletableFuture ,而这个 CompletableFuture 是在前置的 Completion 中产生,所以在 Completion 中会有一个 dep 属性来指向新的实例。
紧接着下一步,这里我们就不挨个拆分了,原理跟上一步一样,把 f4->f5->f6 压入栈中
CompletableFuture<String> f4 = f1.thenApply(r -> {
System.out.println("当前任务是f4,上一个任务是:"+r);
sleep(1);
return "f4";
});
CompletableFuture<String> f5 = f4.thenApply(r -> {
System.out.println("当前任务是f5,上一个任务是:"+r);
sleep(1);
return "f5";
});
CompletableFuture<String> f6 = f5.thenApply(r -> {
System.out.println("当前任务是f6,上一个任务是:"+r);
sleep(1);
return "f6";
});
此时栈结构又发生了变化
f4 同样是在 f1 的基础上增加的一个回调,根据 Treiber Stack 栈结构的后进先出的原则,把 f4 压入栈顶。同样的 f5 作为 f4 的回调仍旧会生成一个新的 CompletableFuture,dep 属性指向新实例。f6 也一样。f4->f5->f6 就构建起了一条链的关系。
当我们调用 f1.get() 获取 f1 的执行结果后,整个栈结构就会逐层执行,大致执行过程如下
- 从 f1 中取出第一个 Completion 对象,根据后进先出的规则,执行出栈的是 f4。
- 取出 f4 中 dep 属性指向的 CompletableFuture 对象存储的 stack 栈结构,继续执行出栈。依次类推,直到 f6 执行完成,并回到 f1 的位置
- 根据栈中的next 指向,取出下一个 Completion 对象也就是 f2,执行并出栈。此时 f2 中 dep 属性指向一个新的 CompletableFuture 实例,因此继续执行该实例中存储的 stack栈结构中的任务,也就是 f3。
至此,整个栈结构中的所有任务都执行完成,总体来看 CompletableFuture 中的回调任务是基于 Completion 来实现的,并且根据不同的方法类型,Completion 也有对应的不同子类来处理。另外 Completion 用栈结构来存储,并且每个节点又可以产生新的 CompletableFuture 也就是每个节点下的任务又构成了一条链。如果当前出栈的 Completion 中存在一个 CompletableFuture stack 结构的链,那么则会先执行该链路上的任务。
源码分析
在了解了 CompletableFuture 的原理之后,我们正式进入源码分析的环节,由于源码中包含了很多适应不同类型的子类方法我们无法每个方法都涉及到。这里我们从原理分析中的代码示例入手,对主要逻辑进行分析。
1.从创建一个 CompletableFuture 任务开始
我们来看一下当调用 supplyAsync() 方法时,CompletableFuture 是如何创建的
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
//这个地方的 asyncPool 使用的就是默认的ForkJion 线程池
//supplier则是传入的函数式接口
return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {
if (f == null) throw new NullPointerException();
//创建一个新的 CompletableFuture,用于存储异步任务的结果
CompletableFuture<U> d = new CompletableFuture<U>();
//通过线程池执行异步任务,并将结果赋值到CompletableFuture中
e.execute(new AsyncSupply<U>(d, f));
return d;
}
在上述代码片段中我们可以看出在 asyncSupplyStage() 方法里面通过默认的 ForkJoinPool.commonPool() 来执行 AsyncSupply()构建的任务。我们接着来看 AsyncSupply()
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
//参数以及构造方法部分就不做过多解释了,比较清晰明了
//dep就是 asyncSupplyStage()方法中的 new CompletableFuture()
//fn 代表要执行的任务
CompletableFuture<T> dep; Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }
//看到这个 run() 方法就很熟悉了,线程池执行任务本质还是线程执行run() 方法
public void run() {
CompletableFuture<T> d; Supplier<T> f;
//赋值并判断是否为空
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
//f.get() 获取任务的执行结果
//通过 CAS 操作将结果赋值给CompletableFuture 中的result
d.completeValue(f.get());
} catch (Throwable ex) {
//如果赋值失败则将异常封装成 AltResult 也赋值到 CompletableFuture 中的result
d.completeThrowable(ex);
}
}
//表示任务执行完成,本质会执行 stack栈结构中所有的回调任务,后面我们会详细解读这个方法
d.postComplete();
}
}
}
//completeValue()
final boolean completeValue(T t) {
return UNSAFE.compareAndSwapObject(this, RESULT, null,(t == null) ? NIL : t);
}
//completeThrowable()
final boolean completeThrowable(Throwable x) {
return UNSAFE.compareAndSwapObject(this, RESULT, null,encodeThrowable(x));
}
static AltResult encodeThrowable(Throwable x) {
return new AltResult((x instanceof CompletionException) ? x :new CompletionException(x));
}
创建一个 CompletableFuture 任务我们就分析完了,那后续那些回调任务又是如何加入到 stack 栈结构里的,接下来我们来看栈结构的构建
2. 构建 Completion stack 结构
当我们执行 f1.thenApply()、f2.thenApply()、f4.thenApply() 等方法的时候,会构建一个 Completion stack 栈,把这些任务包装成Completion 压入到栈中。那这是如何做的呢?我们从源码中来找答案
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
//创建一个新的 CompletableFuture 对象
CompletableFuture<V> d = new CompletableFuture<V>();
//d.uniApply(this, f, null) 是判断源CompletableFuture任务是否完成
//如果已经完成,则不需要压入栈中
if (e != null || !d.uniApply(this, f, null)) {
//将回调任务,源任务,新的CompletableFuture包装成一个 UniApply 对象
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
//压入回调方法对应源任务的中stack栈的栈顶
push(c);
//尝试执行任务
c.tryFire(SYNC);
}
return d;
}
我们接着来看 uniApplyStage() 中一些主要方法的具体实现,首先来看 uniApply()
//这个方法的主要做用就是判断源任务是否完成
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
//如果a.result == null 表示源任务未完成,直接返回false
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
//判断result 是否是异常的AltResult,如果是调用 completeThrowable 重新封装成一个新的AltResult
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
// 代码执行到这里,证明 a.result !=null 并且源任务执行成功
try {
//这里的 c 是代表的传入的自定义线程池
//c.claim() 表示回调任务是否需要异步执行
if (c != null && !c.claim())
return false;
//把源CompletableFuture 任务的执行结果s传递给回调任务
@SuppressWarnings("unchecked") S s = (S) r;
//f.apply是一个函数接口,它定义了一个接收一个参数并返回结果的函数
//执行回调任务并将结果赋值当前 CompletableFuture 中的result
//这个 CompletableFuture 是uniApplyStage()方法中的 new CompletableFuture
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
我们再把目光拉回到 uniApplyStage() 中来,if (e != null || !d.uniApply(this, f, null)) 在这个条件中当 !d.uniApply(this, f, null) == true 即 d.uniApply(this, f, null) == false 时条件成立,也就是源任务没有执行完,所以需要把当前回调任务包装成 UniApply 对象压入栈中。我们来看 UniApply()
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
Function<? super T,? extends V> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}
在上述代码中 UniApply 继承自 UniCompletion,而 UniCompletion 又继承自 Completion
static final class UniApply<T,V> extends UniCompletion<T,V> {}
abstract static class UniCompletion<T,V> extends Completion {}
所以将包装的 UniApply类压入栈中就跟我们之前说的 Completion 栈结构对应了起来。我们再来看压入栈的方法 push()
final void push(UniCompletion<?,?> c) {
if (c != null) {
//尝试将 UniApply 也就是参数 c 压入栈中
while (result == null && !tryPushStack(c))
//如果失败 tryPushStack(c) == false ,将c的next指向设为null
lazySetNext(c, null); // clear on failure
}
}
final boolean tryPushStack(Completion c) {
//获取到当前的 stack栈
Completion h = stack;
//将 UniApply 的next 指向栈顶 因为是从栈顶进栈
lazySetNext(c, h);
//通过 CAS 操作把c设置为栈顶,入栈成功
return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}
到目前为止,栈结构就构建起来了,那么要如何执行栈结构中的任务呢?
3.执行 Completion stack 结构
在 UniApply() 中的 tryFire() 方法尝试执行任务
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
//dep 为空或者 d.uniApply == false 也就是源任务执行完成直接返回null,注意这里源任务已经赋值于a
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
//如果uniApply方法成功执行,则清除依赖(dep)、源(src)和函数(fn)的引用,以避免内存泄漏。
dep = null; src = null; fn = null;
//d.postFire()的参数mode SYNC、ASYNC、NESTED
//SYNC代表同步执行意味着要等待上一个任务完成,然后才能继续执行后续的代码
//ASYNC 代表异步执行,即它被提交到一个线程池(如ForkJoinPool)中,并在一个单独的线程中执行。
//NESTED 通常与CompletableFuture的嵌套执行或组合有关
return d.postFire(a, mode);
}
我们来看一下 postFire() 方法
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
if (a != null && a.stack != null) {
//判断任务异常或者被取消进行 cleanStack() 遍历栈清理无效的 Completion
if (mode < 0 || a.result == null)
a.cleanStack();
else
//执行当前 CompletableFuture中的对应的 stack栈中存储的 Completion任务
a.postComplete();
}
if (result != null && stack != null) {
if (mode < 0)
return this;
else
postComplete();
}
return null;
}
在上述代码片段中我们可以看到核心方法是 postComplete()。通过调用 postComplete() 方法来执行当前 CompletableFuture中 stack栈的所有任务。详细看一下 postComplete()
final void postComplete() {
CompletableFuture<?> f = this; Completion h;
//stack不为空,不断从stack中出栈
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
//通过 CAS操作从stack中逐个取出任务并重置stack
if (f.casStack(h, t = h.next)) {
//t不为空则还有任务需要处理
if (t != null) {
//如果此时f不等于this(意味着在循环中f已经被改变为其他CompletableFuture),则将当前的h重新压入栈中
if (f != this) {
//这个地方的意思可以大致理解位f1->f4->f2,当f4的子链路执行完成,f2还未执行,那么就把f1指向f2
//f1->f2
pushStack(h);
continue;
}
//f仍然等于this,则证明没有后续任务了
h.next = null;
}
//如果t为空,尝试通过h.tryFire(NESTED)方法触发依赖任务h。
//如果tryFire返回非空值d,则f被更新为d(表示d是下一个需要处理的CompletableFuture)
//如果tryFire返回空,f则保持为this,继续处理当前对象的剩余依赖任务。
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
总体来说 postComplete() 方法用来触发 stack 栈中所有可执行的回调任务,主要就是遍历整个 stack,并通过当前的 UniApply任务的 tryFire() 方法来尝试执行。
总结
本篇我们先通过 Future&Callable 来了解了异步执行,但是它并不完善缺少回调机制。由此我们引出了 CompletableFutrue,并通过一系列的使用示例来体会异步回调机制。再通过原理分析和源码分析明白了它的链式执行逻辑是通过 Treiber Stack 结构来实现的。由于篇幅有限,有些方法并没有在文章中体现。文章中错误之处还请广大小伙伴批评指正,共勉之。