前言
Java CompletableFuture 提供了一种异步编程的方式,可以在一个线程中执行长时间的任务,而不会堵塞主线程。
和Future相比,CompletableFuture不仅实现了Future接口,也实现了 CompletionStage接口。Future接口不用多说,CompletionStage接口将多个CompletionStage执行顺序依赖给抽象了出来。
有了CompletableFuture接口,就能将多个异步事件的结果进行执行顺序编排。
使用
可数操作
一般使用 CompletableFuture的场景是有一个 a 操作,一个 b操作,还有一个 c 操作依赖 a、b两个操作的返回结果。可以直接使用 allOf()接受一长串的入参,也可以使用thenCombine()针对两个操作的特定情况。
public static void main(String[] argv) {
CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(second * 20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "1";
});
CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(second * 20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "2";
});
CompletableFuture c9 = CompletableFuture.allOf(c1, c2);
c9.thenApply(v -> {
try {
c1.get();
c2.get();
System.out.println("Everything is all right");
} catch(Exception e) {
e.printStackTrace();
} finally {
System.out.println("Something error");
}
return v;
});
c9.join();
}
可变操作
当想要处理的 CompletableFuture 是可变的,比如说根据数据库查出的数据每个都需要执行一个 CompletableFuture 操作,也就是 n 个 CompletableFuture。
CompletableFuture<Void> allFuture = CompletableFuture
.allOf(completableFutureList.toArray(new CompletableFuture[0]));
CompletableFuture<List<T>> result = allFuture.thenApply(v ->
completableFutureList.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
List<T> tList = result.get(50, TimeUnit.SECONDS);
源码实现
CompletableFuture 类成员变量
CompletableFuture中有一个 volatile 关键词修饰的成员变量,result
,CompletableFuture.get()
函数中的返回的就是这个变量。它会先检查result变量是否为null,不为null则直接返回,为null则会根据是否可中断进行一个while循环等。
根据使用get()
或者 get(long timeout, TimeUnit unit)
函数的不同,最终等待result结果的函数也不同。get(long timeout, TimeUnit unit)
函数会是用 timedGet(long nanos) 函数进行等待。
/**
* Waits if necessary for this future to complete, and then
* returns its result.
*
* @return the result value
* @throws CancellationException if this future was cancelled
* @throws ExecutionException if this future completed exceptionally
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
除了代表结果的 result
之外,还有一个 Completion 类 的变量 stack
。从断点执行和代码的注解上看,这个stack
代表者从属当前CompletableFuture的操作。当前CompletableFuture操作执行完毕后(result里有结果),会引动其他Completion进行处理。
/*
* A CompletableFuture may have dependent completion actions,
* collected in a linked stack. It atomically completes by CASing
* a result field, and then pops off and runs those actions. This
* applies across normal vs exceptional outcomes, sync vs async
* actions, binary triggers, and various forms of completions.
*
/*
可以通过截图看出 在Idea 内存中有和没有 Completion stack的CompletableFuture相比,有比没有多了 1 dependents的标记
Completion stack里没有东西的CompletableFuture | Completion stack里有东西的CompletableFuture |
---|---|
CompletableFuture 多个操作组织结构
CompletableFuture类能够通过 CompletableFuture.allOf()
或者 CompletableFuture.anyOf()
将多个CompletableFuture 对象组合在一起,等到满足条件时,再触发之后操作的执行。
以allOf
方法为例,CompletableFuture.allOf(CompletableFuture<?>... cfs)
方法会整合作为入参的所有CompletableFuture,等到他们呢所有的都完成之后,才返回结果。
/* ------------- Arbitrary-arity constructions -------------- */
/**
* Returns a new CompletableFuture that is completed when all of
* the given CompletableFutures complete. If any of the given
* CompletableFutures complete exceptionally, then the returned
* CompletableFuture also does so, with a CompletionException
* holding this exception as its cause. Otherwise, the results,
* if any, of the given CompletableFutures are not reflected in
* the returned CompletableFuture, but may be obtained by
* inspecting them individually. If no CompletableFutures are
* provided, returns a CompletableFuture completed with the value
* {@code null}.
*
* <p>Among the applications of this method is to await completion
* of a set of independent CompletableFutures before continuing a
* program, as in: {@code CompletableFuture.allOf(c1, c2,
* c3).join();}.
*
* @param cfs the CompletableFutures
* @return a new CompletableFuture that is completed when all of the
* given CompletableFutures complete
* @throws NullPointerException if the array or any of its elements are
* {@code null}
*/
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
/** Recursively constructs a tree of completions. */
static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
int lo, int hi) {
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (lo > hi) // empty
d.result = NIL;
else {
CompletableFuture<?> a, b;
int mid = (lo + hi) >>> 1;
if ((a = (lo == mid ? cfs[lo] :
andTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
andTree(cfs, mid+1, hi))) == null)
throw new NullPointerException();
if (!d.biRelay(a, b)) {
BiRelay<?,?> c = new BiRelay<>(d, a, b);
a.bipush(b, c);
c.tryFire(SYNC);
}
}
return d;
}
/** Pushes completion to this and b unless both done. */
final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
if (c != null) {
Object r;
while ((r = result) == null && !tryPushStack(c))
lazySetNext(c, null); // clear on failure
if (b != null && b != this && b.result == null) {
Completion q = (r != null) ? c : new CoCompletion(c);
while (b.result == null && !b.tryPushStack(q))
lazySetNext(q, null); // clear on failure
}
}
}
/** Returns true if successfully pushed c onto stack. */
final boolean tryPushStack(Completion c) {
Completion h = stack;
lazySetNext(c, h);
return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}
boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
Object r, s; Throwable x;
if (a == null || (r = a.result) == null ||
b == null || (s = b.result) == null)
return false;
if (result == null) {
if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
completeThrowable(x, r);
else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
completeThrowable(x, s);
else
completeNull();
}
return true;
}
从源码上看是,是将整个CompletableFuture数组通过andTree()
方法划分成了一颗二叉树,这个二叉树的叶子节点是传入的CompletableFuture对象,非叶子节点代表了它的子节点CompletableFuture的完成情况。
然后检测根节点的CompletableFuture的两个子节点是否完成。
cfs1、cfs2、cfs3、cfs4 是allOf
的入参,四个CompletableFuture对象。
代码中通过a.bipush(b, c)
将 a、b串在一起。因为涉及到UNSAFE方法,不知道方法具体执行了什么操作。所以只能通过IDEA里内存里实际的值,去由结果推过程。
a.bipush(b,c)
前,内存各个变量实际值。
a.bipush(b,c)
后,内存各个变量实际值。
tryPushStack(Completion c) 方法前
tryPushStack(Completion c)
方法后 可以看到内存中 变量b 对应的内存地址为 75bd9247的 stack
被赋值了成为了Completion c。
tryFire(int mode)
方法执行前
可以看到 cfs 除了 cfs1 之外,其他的 cfs 中的 stack都被赋值了。通过观察IDEA中内存中对象实际值,可以发现stack中 的 src 是 自己的树上的兄弟节点, snd 是自己。
CompletableFuture 多个操作执行顺序控制
CompletableFuture 一个节点要开始执行的前提是他的子节点全部执行完毕之后,才能触发自己节点上的操作。
当调用CompletableFuture 异步执行方法 supplyAsync
会传递一个 Supplier
对象作为入参。这个Supplier
会被封装成为 一个Runnable
子类 AsyncSupply
对象,作为其抽象方法 run
中 执行的一部分。
CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(second * 20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "2";
});
---------------------------------------------------------------------------------
/**
* Returns a new CompletableFuture that is asynchronously completed
* by a task running in the {@link ForkJoinPool#commonPool()} with
* the value obtained by calling the given Supplier.
*
* @param supplier a function returning the value to be used
* to complete the returned CompletableFuture
* @param <U> the function's return type
* @return the new CompletableFuture
*/
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
-----------------------------------------------------------------------------------
public void run() {
// fn 就是 CompletableFuture.supplyAsync 传入的 Supplier
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
// 将 Supplier 处理结果赋值给 CompletableFuture 的 result
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
// Pops and tries to trigger all reachable dependents. Call only when known to be done.
d.postComplete();
}
}
从源码中可以看到,当执行了CompletableFuture.supplyAsync()
他的通知机制封装在实现Runnable
抽象方法run
里。当你传入的Supplier
有结果返回之后,会调用 CompletableFuture 中的 postComplete()
方法,通知 stack
中其他可达的 从属 Completion
,让他们各自完成自己的 action。
/**
* Pops and tries to trigger all reachable dependents. Call only
* when known to be done.
*/
final void postComplete() {
/*
* On each step, variable f holds current dependents to pop
* and run. It is extended along only one path at a time,
* pushing others to avoid unbounded recursion.
*/
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
// 将 下一个需要执行的 Completion 弹出来后 执行 tryFire
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
Function<? super T,? extends V> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
// uniApply 对封装的 Supplier 进行执行
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked") S s = (S) r;
// 这里实际执行 CompletableFuture 的 Supplier
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
从Idea里的 栈帧中可以看出来,是由 CompletableFuture 1 执行完后的 postComplete 引发了接下来的CompletableFuture