CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。
一、概述
1.CompletableFuture和Future的区别?
CompletableFuture和Future出现的原因是继承Thread或者实现Runnable接口的异步线程没有返回值,需要返回值的异步线程可以通过CompletableFuture和Future来创建。
CompletableFuture和Future都可以获取到异步线程的返回值,但是Future只能通过get()方法阻塞式获取,CompletableFuture由于实现了CompletionStage接口,可以通过丰富的异步回调方式来执行后续的操作,同时还能对多个任务按照先后顺序进行任务编排。
2.特性
CompletableFuture的作用主要体现在:(1)异步回调;(2)任务编排;
3.使用场景
- 执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度
- 使用CompletableFuture类,它提供了异常管理的机制,让你有机会抛出、管理异步任务执行种发生的异常
- 如果这些异步任务之间相互独立,或者他们之间的的某一些的结果是另一些的输入,你可以讲这些异步任务构造或合并成一个
二、原理
CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
Future接口主要提供get()和join()方法,可以获取任务的返回值,同时也会阻塞调用线程。
CompletionStage接口提供了丰富的执行异步调用和任务处理的接口,任务之间可以分为存在时序关系,包括串行关系、并行关系和汇聚关系等。
三、实践
1.创建任务和获取结果
(1)runAsync创建任务
通过runAsync()方法创建的异步任务没有返回值,其中有有runAsync(Runnable runnable)和runAsync(Runnable runnable, Executor executor)两个重载方法,后者比前者多一个Executor executor,即可以传入自定义的线程池,如果没传即用默认线程池(ForkJoinPool.commonPool())。
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture<Void> f1 = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1 start");
}
}, executor);
(2)supplyAsync创建任务
通过supplyAsync()方法创建的异步任务有返回值,其中有有supplyAsync(Runnable runnable)和supplyAsync(Runnable runnable, Executor executor)两个重载方法,后者比前者多一个Executor executor,即可以传入自定义的线程池,如果没传即用默认线程池(ForkJoinPool.commonPool())。
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
System.out.println("f2 start");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "f2 return";
}
}, executor);
总结:
runAsync创建的异步任务没有返回值,supplyAsync创建的异步任务有返回值。
(3)获取结果和结束任务
// 如果完成则返回结果,否则就抛出具体的异常
public T get()
// 最大时间等待返回结果,否则就抛出具体异常
public T get(long timeout, TimeUnit unit)
// 如果完成则返回结果值(或抛出任何遇到的异常),否则返回默认值。
public T getNow(T valueIfAbsent)
// 完成时返回结果值,否则抛出unchecked异常。为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture所涉及的计算引发异常,则此方法将引发unchecked异常并将底层异常作为其原因
public T join()
//如果任务还未完成,直接给他返回值置为value
public boolean complete(T value)
2.异步回调
(4)whenComplete和whenCompleteAsync
whenComplete()方法是当某个任务执行完成后执行的回调方法。该方法会将该任务的执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和是该任务的返回值,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。该任务抛出的异常,一般用exceptionally()处理,其入参是异常Throwable throwable,可以有返回值。
whenComplete和whenCompleteAsync和区别在于whenComplete是在当前线程中执行该回调任务,whenCompleteAsync是会另启动一个线程来执行该回调任务,默认情况下是使用ForkJoinPool.commonPool()线程池中的线程,也可以设置自定义线程池。后面以-Async为后缀的方法也都是这样的区别。
CompletableFuture<String> f7 = f2.whenCompleteAsync((result, e) -> {
System.out.println("f7 start");
if (1 == 1) {
throw new IllegalStateException("f7 IllegalStateException");
}
}
).exceptionally(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) {
System.out.println("f8 exception: " + throwable);
return "f8 return";
}
});
(5)handle和handleAsync
handle方法也是当某个任务执行完成后执行的回调方法,整体功能和whenComplete方法差不多,但是handleAsync方法会有返回值,handleAsync()可以传入自定义线程池。
CompletableFuture<String> f5 = f2.handleAsync(new BiFunction<String, Throwable, String>() {
@Override
public String apply(String s, Throwable throwable) {
System.out.println("f5 start");
if (1 == 1) {
throw new IllegalStateException("f5 IllegalStateException");
}
return "f5 return";
}
}, executor).exceptionally(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) {
System.out.println("f6 exception: " + throwable);
return "f6 return";
}
});
总结:
whenComplete和handle的区别?
whenComplete的回调方法没有返回值,handle方法有返回值?
(6)thenApply和thenApplyAsync
thenApply 表示某个任务执行完成后执行回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,带有返回值。其中
CompletableFuture<String> f3 = f2.thenApplyAsync(new Function<String, String>() {
@Override
public String apply(String s) {
System.out.println(s + ",f3 start");
return "f3 return";
}
}, executor);
(7)thenAccept和thenAcceptAsync
thenAccep表示某个任务执行完成后执行的回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,无返回值。
CompletableFuture<Void> f3 = f2.thenAcceptAsync(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println("f3 start");
}
}, executor);
(8)thenRun和thenRunAsync
thenRun表示某个任务执行完成后执行的动作,即回调方法,无入参,无返回值。
CompletableFuture<Void> f4 = f2.thenRunAsync(new Runnable() {
@Override
public void run() {
System.out.println("f4 start");
}
}, executor);
异步回调方法总结:
方法 | 入参 | 返回值 | 异常处理 |
---|---|---|---|
whenComplete | 有入参 | 无返回值 | 能抛出异常 |
handle | 有入参 | 有返回值 | 能抛出异常 |
thenApply | 有入参 | 有返回值 | 不能抛出异常 |
thenAccept | 有入参 | 无返回值 | 不能抛出异常 |
thenRun | 无入参 | 无返回值 | 不能抛出异常 |
3.任务编排
(9)thenCombine、thenAcceptBoth 和runAfterBoth
这三个方法都是将两个CompletableFuture组合起来处理,只有两个任务都正常完成时,才进行下阶段任务。可以理解为为:两个任务AND汇聚后才能执行。
区别:thenCombine会将两个任务的执行结果作为所提供函数的参数,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
System.out.println("f1 running");
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "f1 return";
}
}, executor);
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
System.out.println("f2 running");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "f2 return";
}
}, executor);
CompletableFuture<String> f30 = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
System.out.println("f30 running");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "f30 return";
}
}, executor);
CompletableFuture<String> f3 = f1.thenCombine(f2, new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(s + "," + s2 + "," + "f3 running");
return "f3 ruturn";
}
});
CompletableFuture<Void> f4 = f1.thenAcceptBoth(f2, new BiConsumer<String, String>() {
@Override
public void accept(String s, String s2) {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(s + "," + s2 + "," + "f4 running");
}
});
(10)applyToEither、acceptEither和runAfterEither
这三个方法和上面一样也是将两个CompletableFuture组合起来处理,当有一个任务正常完成时,就会进行下阶段任务。可以理解为:两个任务OR汇聚后才能执行。
区别:applyToEither会将已经完成任务的执行结果作为所提供函数的参数,且该方法有返回值;acceptEither同样将已经完成任务的执行结果作为方法入参,但是无返回值;runAfterEither没有入参,也没有返回值。
CompletableFuture<String> f6 = f1.applyToEither(f2, new Function<String, String>() {
@Override
public String apply(String s) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(s + "," + "f6 running");
return "return f6";
}
});
CompletableFuture<Void> f7 = f1.acceptEither(f2, new Consumer<String>() {
@Override
public void accept(String s) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(s + "," + "f7 running");
}
});
CompletableFuture<Void> f8 = f1.runAfterEither(f2, new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f7 running");
}
});
(11)allOf / anyOf
allOf和anyOf都是CompletableFuture的方法,他们针对的都是多任务汇聚后才能执行的逻辑,可以理解为多任务AND/OR汇聚后模式。
allOf:CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
anyOf :CompletableFuture是多个任务只要有一个任务执行完成,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回执行完成任务的结果。
CompletableFuture<Void> f9 = CompletableFuture.allOf(f1, f2, f30);
CompletableFuture<Object> f10 = CompletableFuture.anyOf(f1, f2, f30);
任务编排方法总结:
类型 | 方法 | 入参 | 返回值 | 描述 |
---|---|---|---|---|
两个任务AND汇聚 | thenCombine | 有入参 | 有返回值 | |
两个任务AND汇聚 | thenAcceptBoth | 有入数 | 无返回值 | |
两个任务AND汇聚 | runAfterBoth | 无入参 | 无返回值 | |
两个任务OR汇聚 | applyToEither | 有入参 | 有返回值 | |
两个任务OR汇聚 | acceptEither | 有入参 | 无返回值 | |
两个任务OR汇聚 | runAfterEither | 无入参 | 无返回值 | |
多任务AND汇聚后模式 | allOf | 全部执行 | ||
多任务OR汇聚后模式 | anyOf | 至少一个执行 |
总结以上方法:
- 以Async结尾的方法,都是异步方法,对应的没有Async则是同步方法,一般都是一个异步方法对应一个同步方法;以Async后缀结尾的方法,都有两个重载的方法,一个是使用内容的forkjoin线程池,一种是使用自定义线程池;
- 以Apply开头或者结尾的方法,入口有参数,有返回值;
- 以supply开头的方法,入口也是没有参数的,但是有返回值;
- 以Accept开头或者结尾的方法,入口参数是有参数,但是没有返回值;
- 以run开头的方法,其入口参数一定是无参的,并且没有返回值,类似于执行Runnable方法。
- 和异步方法相比,任务编排方法多数带有-Both或-Either的后缀,-Both表示需要执行全部任务,-Either表示至少执行一个任务。
4.代码实现
参考资料
CompletableFuture使用详解(全网看这一篇就行):https://blog.csdn.net/zsx_xiaoxin/article/details/123898171 (主要参考)
CompletableFuture使用大全,简单易懂:https://juejin.cn/post/6844904195162636295
CompletableFuture用法详解:https://zhuanlan.zhihu.com/p/344431341
并发编程系列-CompletableFuture:https://zhuanlan.zhihu.com/p/650700731
Java CompletableFuture实现多线程异步编排:https://juejin.cn/post/7140244126679138312
使用CompletableFuture:https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
并发编程 - CompletableFuture 解析 | 京东物流技术团队:https://zhuanlan.zhihu.com/p/646472720
本文由博客一文多发平台 OpenWrite 发布!