CompletableFuture
在Java8中推出,Java8中的异步编程就是依靠此类。
几种任务接口
四种任务 | 无参数 | 有一个参数 | 有两个参数 |
无返回值 | Runnable | Consumer | BiConsumer |
有返回值 | Supplier | Function | BiFunction |
CompletionStage接口
这个类中定义的许多能够链式调用的方法和组合方法时是CompletableFuture能实现异步编程的关键。所有方法的返回值都是CompletionStage类型,所以能够链式调用,例如以下方法:
以thenApply为例,thenApply接受一个Function。Function的参数是<?Super T >类型,也就是T或者T的父类型,而T是调用thenApply的CompletableFuture对象泛型的类型;返回值是<?Extends U>类型,也就是U或者U的子类型,而U恰好是thenApply的返回的CompletionStage对应的泛型类型。
其他函数也是类似。
public interface CompletionStage<T> {
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenRun(Runnable action);
public <U,V> CompletionStage<V> thenCombine
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U> CompletionStage<Void> thenAcceptBoth
(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<U> thenCompose
(Function<? super T, ? extends CompletionStage<U>> fn);
//......
}
使用
测试类
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import org.junit.Test;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class AsyncTest {
private void waitRandomTime(int bound) {
ThreadLocalRandom random = ThreadLocalRandom.current();
try {
TimeUnit.SECONDS.sleep(random.nextInt(bound));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void waitRandomTime5() {
waitRandomTime(5);
}
}
run
适用于执行没有返回值的任务。
@Test
public void testRun() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.runAsync(//Runnable
() -> {
System.out.println(LocalDateTime.now().toString() + ":1开始跑...");
waitRandomTime5();
System.out.println(LocalDateTime.now().toString() + ":1跑完啦...");
}
).thenRunAsync(//Runnable
() -> {
System.out.println(LocalDateTime.now().toString() + ":2开始跑...");
waitRandomTime5();
System.out.println(LocalDateTime.now().toString() + ":2跑完啦...");
}
).whenComplete(
(v, t) -> System.out.println("完成啦")
);
future.get();
}
supply
适用于执行有返回值的任务,可以把上个任务的返回值传到下个任务,适当组合可以实现reduce的效果。
@Test
public void testSupplier() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(//Supplier
() -> {
System.out.println(LocalDateTime.now().toString() + ":1开始跑...");
waitRandomTime5();
System.out.println(LocalDateTime.now().toString() + ":1跑完啦...");
return "hello,world";
}
).thenApply(//Function
(res) -> {
System.out.println("1的结果:" + res);
System.out.println(LocalDateTime.now().toString() + ":2开始跑...");
waitRandomTime5();
System.out.println(LocalDateTime.now().toString() + ":2跑完啦...");
return res + "你好,世界";
}
).thenAcceptAsync(//Consumer
(res)-> {
System.out.println("1,2合并的结果:" + res);
System.out.println(LocalDateTime.now().toString() + ":3开始跑...");
waitRandomTime5();
System.out.println(LocalDateTime.now().toString() + ":3跑完啦...");
}
).whenComplete(
(v, t) -> System.out.println("完成啦")
);
future.get();
}
compose
用来合并嵌套的CompletableFuture。如果在任务中又返回了CompletableFuture类型,则在最外层的返回值中泛型类型会嵌套,使用compose则可以消除这种嵌套。
@Test
public void testCompose() throws ExecutionException, InterruptedException {
//不使用compose,返回值嵌套
CompletableFuture<CompletableFuture<String>> future1 =
CompletableFuture
.supplyAsync(() -> "hello")
.thenApplyAsync(
(s) -> CompletableFuture.supplyAsync(() -> {
waitRandomTime5();
return s + ",world";
})
);
String s1 = future1.get().get();
System.out.println(s1);
//使用compose展开,返回值没有嵌套
CompletableFuture<String> future2 =
CompletableFuture
.supplyAsync(() -> "hello")
.thenComposeAsync(
(s) -> CompletableFuture.supplyAsync(() -> {
waitRandomTime5();
return s + ",world";
})
);
String s2 = future2.get();
System.out.println(s2);
}
combine
用来执行完两个CompletableFuture后再使用自定义的函数执行某些操作。
@Test
public void testCombine() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> num1 = CompletableFuture.supplyAsync(
() -> {
waitRandomTime5();
int n = ThreadLocalRandom.current().nextInt(10);
System.out.println("第一个数:"+n);
return n;
}
);
CompletableFuture<Integer> num2 = CompletableFuture.supplyAsync(
() -> {
waitRandomTime5();
int n = ThreadLocalRandom.current().nextInt(10);
System.out.println("第二个数:"+n);
return n;
}
);
CompletableFuture<Integer> future = num1.thenCombineAsync(num2, (n1, n2) -> n1 * n2);
System.out.println("乘积:"+future.get());
}
allOf
可以组合任意多个CompletableFuture,所有的CompletableFuture执行完成才算完成,是“与”的关系。
它的返回值是CompletableFuture<Void>类型,因为有的可能有返回值,有的可能没有返回值,所以用直接返回Void,想获取返回值用get的方式,如以下例子:
下面的例子模拟多个用户ID调用接口返回用户信息,最终在apply中收集结果到list中。
@Test
public void testAllOf() throws ExecutionException, InterruptedException {
List<String> userIds = new ArrayList<>();
userIds.add("zhangsan");
userIds.add("lisi");
userIds.add("wangwu");
List<CompletableFuture<User>> userInfos = userIds.stream().map((id) -> getUserInfo(id)).collect(Collectors.toList());
List<User> users = CompletableFuture
.allOf(userInfos.toArray(new CompletableFuture[userInfos.size()]))
.thenApplyAsync(
//收集结果
(v) -> userInfos.stream().map((future) -> {
try {
return future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}).collect(Collectors.toList())
).get();
users.forEach(System.out::println);
}
private CompletableFuture<User> getUserInfo(String userId) {
//模拟掉接口,根据用户id返回用户信息
return CompletableFuture.supplyAsync(
() -> {
waitRandomTime5();
User user = new User();
user.setId(userId);
user.setAge(ThreadLocalRandom.current().nextInt(20,30));
user.setMoney(BigDecimal.valueOf(ThreadLocalRandom.current().nextDouble(50, 100)));
return user;
}
);
}
@Getter
@Setter
@EqualsAndHashCode
static class User {
private String id;
private int age;
private BigDecimal money;
@Override
public String toString() {
return "[id="+id+",age="+age+",money="+money.setScale(2,BigDecimal.ROUND_HALF_UP).toString()+"]";
}
}
anyOf
也可以组合任意多个CompletableFuture,不过任意一个CompletableFuture执行完成就算完成,其余的结果会被丢弃,是“或”的关系。
返回值类型是CompletableFuture<Object>,因为最终只有一个任务真正执行完成,所以返回值只有一个,用Object可以适配所有返回值类型。
@Test
public void testAnyOf() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() ->
{
waitRandomTime5();
return 1;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() ->
{
waitRandomTime5();
return 2;
});
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() ->
{
waitRandomTime5();
return 3;
});
CompletableFuture<Object> f = CompletableFuture.anyOf(future1, future2, future3);
System.out.println("最终结果:"+f.get());
}
原理
任务适配
CompletableFuture的执行交给了ForkJoinPool(一般并行度一定大于1,否则forkjoin的价值就体现不出来了)。因为ForkJoinPool只能执行ForkJoinTask类型的任务,这就涉及到任务转换问题,CompletableFuture内部定义了一些适配器接口用来做任务的转换。
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
下图为CompletableFuture内部定义的一些适配器类
在supplierAsync(..)函数内部,会把一个Supplier 转换成一个AsyncSupply,然后提交给ForkJoinPool执行;
在runAsync(..)函数内部,会把一个Runnable转换成一个AsyncRun,然后提交给ForkJoinPool执行;
在thenRun/thenAccept/thenApply 内部,会分别把Runnable/Consumer/Function 转换成UniRun/UniAccept/UniApply对象,然后提交给ForkJoinPool执行;
在whenComplete内部,会将BiConsumer转换为UniWhenComplete,然后提交给ForkJoinPool执行;
除此之外,还有两种CompletableFuture 组合的情况(例如combine),分为“与”和“或”,所以有对应的Bi和Or类型的Completion类型。
链式调用
以下列代码为例,分析一下链式调用过程
@Test
public void testLink() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> "hello")
.thenApplyAsync((s) -> s + ",world")
.thenAcceptAsync((s) -> System.out.println(s))
.thenRunAsync(() -> System.out.println("完成"));
future.get();
}
Completion
abstract static class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
volatile Completion next;
/**
* 如果触发,则执行 完成操作,返回可能需要传播的依赖(如果存在)
*
* @param 模式 同步、异步或嵌套
*/
abstract CompletableFuture<?> tryFire(int mode);
abstract boolean isLive();
public final void run() { tryFire(ASYNC); }
public final boolean exec() { tryFire(ASYNC); return true; }
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
}
Completion是一个Treiber Stack,Treiber Stack是一个无锁的栈,由R.Kent Treiber在其于1986年发表的论文Systems Programming:Coping with Para llelism 中首次提出。它是一个单向链表,出栈、入栈都在链表头部,所以只需要一个head指针,而不需要tail指针。
- 执行supplyAsync
第1步的关键是构造了一个AsyncSupply对象,该对象有几个关键点:
(1)继承自ForkJoinTask,能够提交给ForkJoinPool来执行。
(2)封装了Supplier f
(3)封装了该任务的返回值,即变量d。
该任务的输入就是Supply,输出结果存放在新建的CompletableFuture的result字段中。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
//结果放在result中
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
- 执行thenApplyAsync
第2步必须等第1步执行完成后再执行。此处构建了一个UniApply对象,这个任务放入了第1个任务的栈当中。
每一个CompletableFuture都有一个Treiber Stack(stack字段,栈顶指针),存放着它后续依赖的任务。
在第1步的AsyncSupply类中run方法里调用了CompletableFuture的postComplete方法,此方法里会弹出本步骤压入的UniApply对象执行。
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
final void push(UniCompletion<?,?> c) {
if (c != null) {
// 压入Treiber Stack,如果有返回值则不压栈
while (result == null && !tryPushStack(c))
lazySetNext(c, null); // 失败进行清理,直到成功为止
}
}
static final class UniApply<T,V> extends UniCompletion<T,V> {
Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep,
CompletableFuture<T> src,
Function<? super T,? extends V> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<V> tryFire(int mode) {
CompletableFuture<V> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}
final void postComplete() {
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
//弹出栈
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null;
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
- 执行thenAcceptAsync
第三步必须等第二步执行完成后再执行。此处构建了一个UniAccept对象,这个任务放入了第2个任务的栈当中。当第2步的任务执行完成时,从栈中弹出UniAccept对象并执行。
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(asyncPool, action);
}
private CompletableFuture<Void> uniAcceptStage(Executor e,
Consumer<? super T> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (e != null || !d.uniAccept(this, f, null)) {
UniAccept<T> c = new UniAccept<T>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
static final class UniAccept<T> extends UniCompletion<T,Void> {
Consumer<? super T> fn;
UniAccept(Executor executor, CompletableFuture<Void> dep,
CompletableFuture<T> src, Consumer<? super T> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniAccept(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}
- 执行thenRunAsync
第4步和第3步的过程类似,构建了一个UniRun 对象,这个对象被压入第3步的CompletableFuture的栈中。第3步的任务执行完成时,从栈中弹出UniRun对象并执行。
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
if (e != null || !d.uniRun(this, f, null)) {
UniRun<T> c = new UniRun<T>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
static final class UniRun<T> extends UniCompletion<T,Void> {
Runnable fn;
UniRun(Executor executor, CompletableFuture<Void> dep,
CompletableFuture<T> src, Runnable fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<Void> tryFire(int mode) {
CompletableFuture<Void> d; CompletableFuture<T> a;
if ((d = dep) == null ||
!d.uniRun(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}
不带Async后缀的方法
除了supply,其他方法都有不带Async后缀的同名方法,例如thenApplyAsync和thenApply,他们的区别如下:
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
看来都是调用的同一个方法,只不过参数不一样,区别在于if (e != null || !d.uniApply(this, f, null))这一句:
thenApplyAsync 判断e!=null固定为true,进入if分支执行;
thenApply会调用d.uniApply(this, f, null),如果前一个任务还没执行完,则也会进入if分支,如果前一个任务执行完了,result有值,则不会入栈,直接返回。
其他类似方法同理。
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
final <S> boolean uniApply(CompletableFuture<S> a,
Function<? super S,? extends T> f,
UniApply<S,T> c) {
Object r; Throwable x;
//之前的任务还没执行完
if (a == null || (r = a.result) == null || f == null)
return false;
tryComplete: if (result == null) {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
try {
if (c != null && !c.claim())
return false;
@SuppressWarnings("unchecked") S s = (S) r;
completeValue(f.apply(s));
} catch (Throwable ex) {
completeThrowable(ex);
}
}
return true;
}
网状任务
任务执行时有可能不是单单链表式的调用,还可能形成一个网络结构,用图表示。例如下面的有向无环图:
执行过程
任务1执行完成之后,任务2、任务3可以并行,伪代码:
future1.thenApply(任务2),future1.thenApply(任务3);
任务4在任务2执行完成时可开始执行;
任务5要等待任务2、任务3都执行完成,才能开始,是and关系;
任务6在任务3执行完成时可以开始执行;
对于任务7,只要任务4、5、6中任意一个任务结束,就可以开始执行,是or的关系。
总之,任务依赖之间是多对多的关系:1个任务可以有n个依赖它的后继任务;1个任务也可以有n个它依赖的前驱任务。
(1)在每个任务的返回值里面,存储了依赖它的接下来要执行的任务。任务1的CompletableFuture的栈中存储了任务2、任务3;任务2的CompletableFuutre中存储了任务4、任务5;任务3的CompletableFuture中存储了任务5、任务6。也就是说,每个任务的CompletableFuture对象的栈里面,存储了该节点的 出边对应的任务集合。
(2)任务2、任务3的CompletableFuture里面,都存储了任务5,任务5会被触发2次,但它会判断任务2、任务3的结果是不是都完成,如果只完成其中一个,它就不会执行。
(3)任务7存在于任务4、任务5、任务6的CompletableFuture的栈里面,因此会被触发三次。但它只会执行一次,只要其中1个任务执行完成,就可以执行任务7。
(4)因为有and和or 两种不同的关系,因此对应BiApply和OrApply两个类,这两个对象的构造函数几乎一样,只是在内部执行的时候,一个是and的逻辑,一个是or的逻辑。
(5)BiApply和OrApply都是二元操作符,只能传入二个被依赖的任务。但是任何一个多元操作,都能被转换为多个二元操作的叠加。
上面的任务7同时依赖于任务4、任务5、任务6,就可以采用下图的转换方法,and转换类似。