JUC之十一:CompletableFuture用法详解
文章目录
- JUC之十一:CompletableFuture用法详解
- 一、前言
- 二、CompletableFuture类简介
- 三、功能分类
- 3.1、获取结果
- 3.2、依赖关系
- 3.3、and聚合关系
- 3.4、or聚合关系
- 3.5、并行执行
- 3.6、结果处理
- 四、方法详解
- 4.1、结果转换
- 4.2、结果消费
- 4.3、任务组合交互
- 五、使用案例
一、前言
前面介绍了FutureTask
,获取异步执行结果通过get的形式,而且会阻塞主线程,随着开发的越来越越复杂已经无法满足真正开发场景,我们试想一个例子:
我们再炒菜的时候,先洗菜,切菜,炒菜,盛菜,然后吃。。。。。。
如果还用原来的future
来操作的话,需要每一个步骤get出前一个线程的结果之后才能继续执行,导致主线程阻塞,无法去干别的事情,如蒸米饭等,为了解决这一问题提出了CompletableFuture
的异步编程方式。
FutureTask与CompletableFuture的区别:
1、FutureTask获取异步执行的结果通过get实现,会阻塞主线程 ,CompletableFuture执行完一个阶段之后自动通知下一个阶段开始执行,主线程可继续处理其他的事情。
2、FutureTask
没有处理异常的方法,CompletableFuture
可处理异步线程内的异常。
3、CompletableFuture
可对多个任务进行任意的组合、并行、串行使用。
二、CompletableFuture类简介
继承关系如下所示:
继承自Future
接口以及CompletionStage
接口,Future前面介绍FutureTask
的时候已经介绍过。
CompletionStage
接口定义了具体的异步编程的方法,执行某一阶段之后返回一个新的CompletionStage
可继续执行下一阶段,CompletableFuture
默认使用的线程池是ForkJoinPool.commonPool()
,前面介绍过ForkJoinPool
使用的是守护线程,所以注意主线程何时结束避免异步执行失败。
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
三、功能分类
3.1、获取结果
get:
实现自Future接口,获取结果,会阻塞主线程。getTime:
实现自Future接口,在指定时间内线程完成获取结果,若未完成则抛出异常。getNow:
立即获取结果,若任务完成则返回任务值,否则返回指定的值。join:
与get相同作用,区别在于返回的是非检查异常,外部可处理也可不处理,而get必须处理。complete:
任务是否完成,若完成返回true,未完成则直接将任务结果设置为指定的值。
//如果已经完成了返回true,否则返回false,并设置为该值
boolean complete = future.complete(10);
System.out.println(complete);
// 获取结果
Integer get = future.get();
// 获取结果,如果超时直接抛出异常
Integer getTime = future.get(1, TimeUnit.DAYS);
// 直接获取结果,如果任务没完成,则返回指定的值
Integer getNow = future.getNow(20);
// 获取结果,与get区别是返回一个(unchecked)CompletionException异常,可处理也可不处理,而get返回检查异常,需要具体的处理
Integer join = future.join();
3.2、依赖关系
thenApply:
拿着上一阶段的执行结果,执行函数,返回结果。thenCompose:
拿着上一阶段的结果,执行新的任务,并返回第二个任务的结果。thenAccept:
消费函数,拿着上一阶段的结果,执行函数。thenRun:
任务执行完直接执行函数,不关心任务结果,无返回值。
3.3、and聚合关系
-
thenCombine:
合并两个任务,两个任务的结果执行函数,返回函数执行后的结果。 -
thenAcceptBoth:
合并两个任务,两个任务的结果执行函数,无返回值。 -
runAfterBoth:
执行完两个任务之后,执行函数,不关心前两个任务的结果。
3.4、or聚合关系
-
applyToEither:
两个线程任务相比较,先获得执行结果的,执行后续的函数返回结果。 -
acceptEither:
两个线程任务相比较,先获得执行结果的,执行后续的函数,无返回值。 -
runAfterEither:
两个线程任务任何一个执行完,执行函数,不关心任务结果,无返回值。
3.5、并行执行
-
anyOf:
任何一个线程任务执行结束,返回结果。 -
allOf:
等待所有线程执行结束,无返回值。
3.6、结果处理
-
whenComplete:
任务执行结束,对结果以及异常进行处理,返回同一种类型。 -
exceptionally:
任务执行结束,对异常进行处理。 -
handle:
任务执行结束,对结果以及异常进行处理,返回任意类型的结果。
四、方法详解
CompletionStage
接口同一种类型的方法一般是三个
如:
//同步方法
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
//异步方法
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
//异步方法,不使用默认的线程池,指定线程池。
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
后面只做方法介绍,不区分同步以及异步。
4.1、结果转换
thenApply:
获取上一任务的执行结果,执行当前函数,返回函数执行结果。
入参:
Function<? super T,? extends U> fn:
函数的入参是前一个任务的结果, 出参是函数的执行结果。
方法:
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
使用示例:
CompletableFuture<String> thenApply = CompletableFuture
.supplyAsync(() -> 10)
.thenApply(x -> "衬衫的价格是:" + x);
thenCompose:
获取上一任务的执行结果,开启新的 CompletableFuture
进行计算。
方法:
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
使用实例:
CompletableFuture<String> thenCompose = CompletableFuture
.supplyAsync(() -> 10)
.thenCompose(x -> CompletableFuture.supplyAsync(() -> "衬衫的价格是:" + x));
thenApply
和 thenCompose
的区别:
thenApply
转换的是泛型中的类型,返回的是同一个CompletableFuture
;thenCompose
将内部的CompletableFuture
调用展开来并使用上一个CompletableFutre
调用的结果在下一步的CompletableFuture
调用中进行运算,是生成一个新的CompletableFuture
。
thenCombine:
组合两个任务,对两个任务结果进行计算,并返回计算结果。
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
使用实例:
CompletableFuture<String> thenCombine = CompletableFuture
.supplyAsync(() ->10)
.thenCombine(CompletableFuture.supplyAsync(() -> 0.8),
(r1, r2) -> "衬衫打八折的价格是:" + r1 * r2);
4.2、结果消费
thenAccept:
获取上一任务的执行结果,进行消费,无返回值。
方法:
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
使用实例:
CompletableFuture<Void> thenAccept = CompletableFuture
.supplyAsync(() -> 10)
.thenAccept(x -> System.out.println("衬衫的价格是" + x));
thenRun:
不考虑上一任务的执行结果,执行Runnable
函数,无返回值。
方法:
public CompletableFuture<Void> thenRun(Runnable action);
使用实例:
CompletableFuture<Void> thenRun = CompletableFuture
.supplyAsync(() -> 10)
.thenRun(() -> System.out.println("执行thenRun"));
thenAcceptBoth:
获取两个任务的执行结果,进行消费。
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
使用实例:
CompletableFuture<Void> thenAcceptBoth = CompletableFuture
.supplyAsync(() -> 10)
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> 0.8), (r1, r2) -> {
System.out.println("衬衫打八折的价格是:" + r1 * r2);
});
runAfterBoth:
执行完两个任务后,执行runnable
函数,不关心前两个任务的执行结果,无返回值。
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)
使用实例:
CompletableFuture<Void> runAfterBoth = CompletableFuture
.supplyAsync(() -> 10)
.runAfterBoth(CompletableFuture.supplyAsync(() -> 0.8), () -> {
System.out.println("runAfterBoth正在执行计算函数");
});
4.3、任务组合交互
applyToEither:
两个任务谁先执行结束,就做为后续Function
函数的计算入参。
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other,
Function<? super T, U> fn)
使用实例:
CompletableFuture<Integer> applyToEither = CompletableFuture
.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
})
.applyToEither(CompletableFuture.supplyAsync(() -> {
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
return 20;
}), r -> r);
acceptEither:
两个任务谁先执行结束,就进行后续的消费操作。
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other,
Consumer<? super T> action)
使用实例:
CompletableFuture<Void> acceptEither = CompletableFuture
.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
})
.acceptEither(CompletableFuture.supplyAsync(() -> {
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
return 20;
}), () -> System.out.println("acceptEither执行结束啦"));
runAfterEither:
两个任务任意一个执行结束,执行Runnable
函数。
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
CompletableFuture<Void> runAfterEither = CompletableFuture
.supplyAsync(() -> 10)
.runAfterEither(CompletableFuture.supplyAsync(() -> 20),
() -> System.out.println("runAfterEither执行结束啦"));
anyOf:
任何一个线程任务执行结束,返回结果。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
CompletableFuture<Object> anyOf = CompletableFuture
.anyOf(CompletableFuture.supplyAsync(() -> 100),
CompletableFuture.supplyAsync(() -> "200"));
allOf:
等待所有线程执行结束,无返回值。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
CompletableFuture<Void> allOf = CompletableFuture
.allOf(CompletableFuture.supplyAsync(() -> 100),
CompletableFuture.supplyAsync(() -> "200"));
五、使用案例
实现最优的“烧水泡茶”程序
对于烧水泡茶这个程序,一种最优的分工方案:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶、烧开水、泡茶这三道工序,T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序。
public class CompletableFutureTest {
public static void main(String[] args) {
//任务1:洗水壶->烧开水
CompletableFuture<Void> f1 = CompletableFuture
.runAsync(() -> {
System.out.println("T1:洗水壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T1:烧开水...");
sleep(15, TimeUnit.SECONDS);
});
//任务2:洗茶壶->洗茶杯->拿茶叶
CompletableFuture<String> f2 = CompletableFuture
.supplyAsync(() -> {
System.out.println("T2:洗茶壶...");
sleep(1, TimeUnit.SECONDS);
System.out.println("T2:洗茶杯...");
sleep(2, TimeUnit.SECONDS);
System.out.println("T2:拿茶叶...");
sleep(1, TimeUnit.SECONDS);
return "龙井";
});
//任务3:任务1和任务2完成后执行:泡茶
CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {
System.out.println("T1:拿到茶叶:" + tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
});
//等待任务3执行结果
System.out.println(f3.join());
}
static void sleep(int t, TimeUnit u){
try {
u.sleep(t);
} catch (InterruptedException e) {
}
}
}
参考链接:
https://blog.csdn.net/sermonlizhi/article/details/123356877