文章目录
- Pre
- 概述
- Future
- Future的缺陷
- 类继承关系
- 功能概述
- API
- 提交任务的相关API
- 结果转换的相关API
- thenApply
- handle
- thenRun
- thenAccept
- thenAcceptBoth
- thenCombine
- thenCompose
- 回调方法的相关API
- 异常处理的相关API
- 获取结果的相关API
- DEMO
- 实战
- 注意事项

Pre
每日一博 - Java 异步编程的 Promise 模式 CompletableFuture的前世今生 (上)
Java8 - 使用CompletableFuture 构建异步应用
概述
常见的线程创建方式有两种,一是直接继承Thread,另一种是实现Runnable接口。但这两种方式有个缺点,不支持获取线程执行结果。
所以在JDK1.5之后,提供了Callable和Future,可以在任务执行后获取执行结果。
Future
Future类位于java.util.concurrent包下,从下面的源码可以看出,Future主要提供了三种能力:
- 关闭执行中的任务
- 判断任务是否执行完成
- 获取任务执行的结果
package java.util.concurrent;
public interface Future<V> {
// 取消执行中的任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否被取消成功
boolean isCancelled();
// 判断任务是否执行完成
boolean isDone();
// 获取任务执行结果
V get() throws InterruptedException, ExecutionException;
// 在规定时间内获取任务执行结果,若规定时间任务还没执行完,则返回null,而非抛异常
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
Future的缺陷
Future通过isDone()判断任务是否执行完成,get()获取任务执行的结果,解决了创建线程异步执行任务不能获取执行结果的问题。
在任务异步执行中,主线程在等待过程中可以做其他事,但其本身也存在一定的局限性
- 并行执行多任务获取结果主线程长时间阻塞:当需要将多个模块的任务异步执行时,使用for循环遍历任务列表,通过isDone()轮询判断任务是否执行完成,通过get()方法获取任务执行结果,且结果获取依赖任务执行顺序。但因Future的get()方法是主线程阻塞等待获取执行结果,所以在结果返回前,主线程不能处理其他任务,长时间阻塞,可能会产生block,在使用时考虑用超时时间的get()方法。
- 不能链式执行任务:如上述场景,希望在获取商品基础信息后执行获取优惠信息任务,Future没有提供这种能力,不能实现链式调用。
- 不能将多个任务执行的结果组合:在上述场景中,希望获取商详所需的各个模块信息后,组合成调用方需要的结果,但Future不支持。
- 不能处理异常:Future没有异常处理的能力。
综上所述,阻塞主线程获取结果的方式与异步编程的初衷相违背,轮询判断任务是否执行完成会耗费不必要的CPU资源,为优化上述问题,在JDK1.8时引入了CompletableFuture实现类,提供异步链式编程的能力。
类继承关系
CompletableFuture
扩展了Future接口,实现了 CompletionStage
,提供了函数式编程的能力,通过回调的方式处理计算结果,并提供了转换和组合CompletableFuture
的方法,从而简化异步编程的复杂性
CompletableFuture对象是JDK1.8版本新引入的类,这个类实现了两个接口,
- 一个是Future接口
- 一个是CompletionStage接口
CompletionStage
接口是JDK1.8版本提供的接口,用于异步执行中的阶段处理,CompletionStage
定义了一组接口用于在一个阶段执行结束之后,要么继续执行下一个阶段,要么对结果进行转换产生新的结果等,一般来说要执行下一个阶段都需要上一个阶段正常完成,这个类也提供了对异常结果的处理接口。
功能概述
CompletableFuture
是一个实现了 Java 8 中 Completable
接口的类,它代表一个异步计算的结果。 它提供了异步编程的能力,可以让开发者在编写代码时更加方便地处理异步操作。
CompletableFuture具有以下主要特征:
-
异步编程能力
可以通过supplyAsync、runAsync
等方法异步执行任务,不会阻塞当前线程。 -
组合式编程
支持thenApply、thenAccept、thenCompose
等方法将多个CompletableFuture
进行组合,实现复杂的异步流水线计算。 -
异常处理
可以通过whenComplete、exceptionally
等方法设置异常处理,方便异步任务链的异常传播与处理。 -
结果获取
可以通过join()
等待获取结果,也可以通过回调注册方式获取结果。 -
取消任务
可以通过cancel()
取消正在执行的CompletableFuture任务。 -
依赖管理
可以通过thenCompose
等方法明确定义任务间依赖关系。
综上,CompletableFuture为Java异步编程提供了强大支持,可以帮助构建高效、可靠的异步应用程序,是Java 8非常重要的新特性之一。它极大地简化并丰富了Java的异步编程模型。
API
提交任务的相关API
CompletableFuture提供了四种创建异步对象的方法
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
- supplyAsync提交的任务有返回值
- runAsync提交的任务没有返回值
两个接口都有一个重载的方法,第二个入参为指定的线程池,如果不指定,则默认使用ForkJoinPool.commonPool()
线程池。在使用的过程中尽量根据不同的业务来指定不同的线程池,方便对不同线程池进行监控,同时避免业务共用线程池相互影响。
结果转换的相关API
thenApply
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
入参是Function,意思是将上一个CompletableFuture执行结果作为入参,再次进行转换或者计算,重新返回一个新的值。
handle
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
入参是BiFunction,该函数式接口有两个入参一个返回值,意思是处理上一个CompletableFuture的处理结果,同时如果有异常,需要手动处理异常。
thenRun
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
入参是Runnable函数式接口,该接口无需入参和出参,这一组函数是在上一个CompletableFuture任务执行完成后,在执行另外一个接口,不需要上一个任务的结果,也不需要返回值,只需要在上一个任务执行完成后执行即可。
thenAccept
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
入参是Consumer,该函数式接口有一个入参,没有返回值,所以这一组接口的意思是处理上一个CompletableFuture的处理结果,但是不返回结果。
thenAcceptBoth
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
入参包括CompletionStage以及BiConsumer,
-
CompletionStage是JDK1.8新增的接口,在JDK中只有一个实现类:CompletableFuture,所以第一个入参就是CompletableFuture,这一组函数是用来接受两个CompletableFuture的返回值,并将其组合到一起。
-
BiConsumer这个函数式接口有两个入参,并且没有返回值,BiConsumer的第一个入参就是调用方CompletableFuture的执行结果,第二个入参就是thenAcceptBoth接口入参的CompletableFuture的执行结果。
所以这一组函数意思是将两个CompletableFuture执行结果合并到一起。
thenCombine
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
和thenAcceptBoth类似,入参都包含一个CompletionStage,也就是CompletableFuture对象,意思也是组合两个CompletableFuture的执行结果,不同的是thenCombine的第二个入参为BiFunction,该函数式接口有两个入参,同时有一个返回值。所以与thenAcceptBoth不同的是,thenCombine将两个任务结果合并后会返回一个全新的值作为出参。
thenCompose
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
意思是将调用方的执行结果作为Function函数的入参,同时返回一个新的CompletableFuture对象。
回调方法的相关API
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
意思是当上一个CompletableFuture对象任务执行完成后执行该方法。BiConsumer函数式接口有两个入参没有返回值,这两个入参第一个是CompletableFuture任务的执行结果,第二个是异常信息。表示处理上一个任务的结果,如果有异常,则需要手动处理异常,与handle方法的区别在于,handle方法的BiFunction是有返回值的,而BiConsumer是没有返回值的。
以上方法都有一个带有Async的方法,带有Async的方法表示是异步执行的,会将该任务放到线程池中执行,同时该方法会有一个重载的方法,最后一个参数为Executor,表示异步执行可以指定线程池执行。为了方便进行控制,最好在使用CompletableFuture时手动指定我们的线程池。
异常处理的相关API
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
是用来处理异常的,当任务抛出异常后,可以通过exceptionally来进行处理,也可以选择使用handle来进行处理,不过两者有些不同,hand是用来处理上一个任务的结果,如果有异常情况,就处理异常。而exceptionally可以放在CompletableFuture处理的最后,作为兜底逻辑来处理未知异常。
获取结果的相关API
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf是需要入参中所有的CompletableFuture任务执行完成,才会进行下一步;
anyOf是入参中任何一个CompletableFuture任务执行完成都可以执行下一步。
public T get() throws InterruptedException, ExecutionException
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
public T getNow(T valueIfAbsent)
public T join()
get方法一个是不带超时时间的,一个是带有超时时间的。
getNow方法则是立即返回结果,如果还没有结果,则返回默认值,也就是该方法的入参。
join方法是不带超时时间的等待任务完成。
DEMO
通过CompletableFuture
来实现一个多线程处理异步任务的例子。
创建10个任务提交到我们指定的线程池中执行,并等待这10个任务全部执行完毕。
每个任务的执行流程为第一次先执行加法,第二次执行乘法,如果发生异常则返回默认值,当10个任务执行完成后依次打印每个任务的结果。
public void demo() throws InterruptedException, ExecutionException, TimeoutException {
// 1、自定义线程池
ExecutorService executorService = new ThreadPoolExecutor(5, 10,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100));
// 2、集合保存future对象
List<CompletableFuture<Integer>> futures = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
int finalI = i;
CompletableFuture<Integer> future = CompletableFuture
// 提交任务到指定线程池
.supplyAsync(() -> this.addValue(finalI), executorService)
// 第一个任务执行结果在此处进行处理
.thenApplyAsync(k -> this.plusValue(finalI, k), executorService)
// 任务执行异常时处理异常并返回默认值
.exceptionally(e -> this.defaultValue(finalI, e));
// future对象添加到集合中
futures.add(future);
}
// 3、等待所有任务执行完成,此处最好加超时时间
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.MINUTES);
for (CompletableFuture<Integer> future : futures) {
Integer num = future.get();
System.out.println("任务执行结果为:" + num);
}
System.out.println("任务全部执行完成!");
}
private Integer addValue(Integer index) {
System.out.println("第" + index + "个任务第一次执行");
if (index == 3) {
int value = index / 0;
}
return index + 3;
}
private Integer plusValue(Integer index, Integer num) {
System.out.println("第" + index + "个任务第二次执行,上次执行结果:" + num);
return num * 10;
}
private Integer defaultValue(Integer index, Throwable e) {
System.out.println("第" + index + "个任务执行异常!" + e.getMessage());
e.printStackTrace();
return 10;
}
实战
举个例子:
假设详页有以下几个模块,每个模块需要如下时间才能完成:
获取商品基础信息 0.5s
获取优惠信息 1s
获取门店信息 1s
获取验机评估报告信息 1s
获取标品参数信息 1s
组装返回商品信息 0.5s
串行执行每个模块,那么一共需要5s才能返回给调用方,如果接口产生超时等,会比5s还要长,显然是不能接受的
如果有多个线程并行完成各个模块,可能2s内就能返回信息。
可以将获取商品详情页的步骤分为三步,分别为:
- 获取商品基础信息、商品验机评估报告信息、商品标品参数信息、门店信息;
- 获取商品优惠信息,需要等1结束;
- 将上述信息RPC结果组装返回,需要等1,2结束。
因为多任务异步并行执行,最终耗时将取决于耗时最长的链路。如下图所示
代码示例:
ExecutorService testThreadPool = Executors.newFixedThreadPool(10);
ResultDTO resultDTO = new ResultDTO();
//基础信息
CompletableFuture<Void> productBaseInfoFuture = CompletableFuture.runAsync(() -> {
BaseInfoDTO baseInfoDTO = rpcxx;
resultDTO.setBaseInfoDTO(baseInfoDTO);
}, testThreadPool);
//优惠信息
CompletableFuture<Void> couponInfoFuture = productBaseInfoFuture.thenAcceptAsync(() -> {
CouponInfoDTO couponInfoDTO = rpcxx;
resultDTO.setCouponInfoDTO(couponInfoDTO);
}, testThreadPool);
//验机评估报告信息
CompletableFuture<Void> qcInfoFuture = CompletableFuture.runAsync(() -> {
QcInfoDTO qcInfoDTO = rpcxx;
resultDTO.setQcInfoDTO(qcInfoDTO);
}, testThreadPool);
//门店信息
CompletableFuture<Void> storeInfoFuture = CompletableFuture.runAsync(() -> {
StoreInfoDTO storeInfoDTO = rpcxx;
resultDTO.setStoreInfoDTO(storeInfoDTO);
}, testThreadPool);
//标品参数信息
CompletableFuture<Void> spuInfoFuture = CompletableFuture.runAsync(() -> {
SpuInfoDTO spuInfoDTO = rpcxx;
resultDTO.setSpuInfoDTO(spuInfoDTO);
}, testThreadPool);
//组装结果
CompletableFuture<Void> allQuery = CompletableFuture.allOf(couponInfoFuture, qcInfoFuture, storeInfoFuture, spuInfoFuture);
CompletableFuture<Void> buildFuture = allQuery.thenAcceptAsync((result) -> {
//组装逻辑
return null;
}).join();
以上即为获取门店商品详情页异步编排的实现逻辑,但也发现,该方案创建多个异步任务,执行逻辑不一样但流程大致相同,类似的代码重复写多遍,不便于扩展和阅读。
在此基础上可以优化为使用CompletableFuture+简单工厂+策略模式,将上述步骤中的每个模块都作为策略handler,且策略之间有权重依赖关系,模块类型作为工厂类型,将模块类型放进列表中,使用CompletableFuture.allOf()异步执行列表中的任务。
伪代码如下:
List<String> eventList = Arrays.asList("xx", "xxx");
CompletableFuture.allOf(eventList.stream().map(event ->
CompletableFuture.runAsync(() -> {
//通过工厂类型获取策略实现handler
if (Objects.nonNull(handler)) {
//如果存在则执行
}
}, testThreadPool)).toArray(CompletableFuture[]::new)).join();
注意事项
- 当有多个任务可以异步并行执行时,使用CompletableFuture,任务越多效果越明显;
- 使用CompletableFuture可以将多个任务串联执行,也可以利用组合方式将任务排列由列表变成树结构;
- 在使用集合接收多线程处理任务的结果时,需要考虑线程安全问题;
- 当任务执行有相互依赖关系时,需考虑任务超时主动结束,避免系统block。