线程系列3-关于 CompletableFuture
- 1、从 Future 接口说起
- 2、CompletableFuture 对 Future 的改进
- 2.1、CompletionStage 接口类
- 2.2、runAsync 和 supplyAsync 创建子任务
- 2.3、 whenComplete 和 exceptionally 异步任务回调钩子
- 2.4、调用 handle() 方法统一处理异常和结果
- 2.5、异步任务的串行执行
- 2.5.1、thenApply() 方法
- 2.5.2、thenRun() 方法
- 2.5.3、thenAccept() 方法
- 2.5.4、thenCompose() 方法
- 2.6、异步任务的合并执行
- 2.6.1、thenCombine() 方法
- 2.6.2、runAfterBoth() 方法
- 2.6.3、thenAcceptBoth() 方法
- 2.6.4、allOf() 方法等待所有的任务结束
- 2.7、异步任务的选择执行
- 2.7.1、applyToEither() 方法
- 2.7.2、runAfterEither() 方法
- 2.7.3、acceptEither() 方法
- 2.8、CompletableFuture 的基础方法
1、从 Future 接口说起
Future 接口(常用实现类:FutureTask
)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。Future 接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。其有三个特点:多线程、有返回、异步任务,总结就是异步多线程任务执行且返回有结果。
所有已知实现类:
CompletableFuture
, CountedCompleter, ForkJoinTask, FutureTask
, RecursiveAction, RecursiveTask, SwingWorker
接口方法:
方法 | 返回值 | 描述 |
---|---|---|
cancel(boolean mayInterruptIfRunning) | boolean | 试图取消此任务的执行 |
get() | V | 如果计算完成则获取其结果,若计算未完成则阻塞直到获取其结果 |
get(long timeout, TimeUnit unit) | V | 等待固定时长,如果在这个时长内程序还是没有运行完成,就会出现 juc.TimeOutException 异常。如果完成则返回其结果 |
isCancelled() | boolean | 如果这个任务完成之前取消则返回 true |
isDone() | boolean | 如果完成这个任务则返回true |
FutureTask类图:
代码示例 1-1 :
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class FutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(4);
FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
log.info("hello ,this is futureTask1 call().");
TimeUnit.SECONDS.sleep(3);
return "futureTask1 call result";
});
pool.submit(futureTask1);
// 这里会阻塞
String futureTaskResult1= futureTask1.get();
log.info("print the futureTaskResult1 = {}", futureTaskResult1);
FutureTask<String> futureTask2 = new FutureTask<String>(() -> {
log.info("hello ,this is futureTask2 call().");
// 这里睡眠 10 秒
TimeUnit.SECONDS.sleep(10);
return "futureTask2 call result";
});
pool.submit(futureTask2);
String futureTaskResult2 = null;
try {
// 这里只等待两秒,如果两秒后futureTask2还没执行出结果,那就抛出异常
futureTaskResult2= futureTask2.get(2, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.error("futureTaskResult2 获取失败,设置的2秒等待超时,过期不等候");
e.printStackTrace();
}
log.info("print the futureTaskResult2 = {}", futureTaskResult1);
FutureTask<String> futureTask3 = new FutureTask<String>(() -> {
log.info("hello ,this is futureTask3 call().");
// 这里睡眠 10 秒
TimeUnit.SECONDS.sleep(10);
return "futureTask3 call result";
});
pool.submit(futureTask3);
while (true) {
// 使用 isDone() 方法轮询的模式,巨耗费CPU
if (futureTask3.isDone()) {
String futureTaskResult3 = futureTask3.get();
log.info("print the futureTaskResult3 = {}", futureTaskResult3);
break;
} else {
try {
// 每隔1秒 进行轮询
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.error("futureTask3 处理中,不要催,越催越慢");
}
}
log.info("主线程的逻辑结束");
pool.shutdown();
}
}
打印结果 1-1 :
22:42:16.626 [pool-1-thread-1] INFO com.FutureDemo - hello ,this is futureTask1 call().
22:42:19.645 [main] INFO com.FutureDemo - print the futureTaskResult1 = futureTask1 call result
22:42:19.651 [pool-1-thread-2] INFO com.FutureDemo - hello ,this is futureTask2 call().
22:42:21.660 [main] ERROR com.FutureDemo - futureTaskResult2 获取失败,设置的2秒等待超时,过期不等候
22:42:21.662 [main] INFO com.FutureDemo - print the futureTaskResult2 = futureTask1 call result
22:42:21.664 [pool-1-thread-3] INFO com.FutureDemo - hello ,this is futureTask3 call().
java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:204)
at com.demo.future.FutureDemo.main(FutureDemo.java:30)
22:42:22.676 [main] ERROR com.FutureDemo - futureTask3 正在处理中,不要催,越催越慢
22:42:23.677 [main] ERROR com.FutureDemo - futureTask3 正在处理中,不要催,越催越慢
22:42:24.679 [main] ERROR com.FutureDemo - futureTask3 正在处理中,不要催,越催越慢
22:42:25.690 [main] ERROR com.FutureDemo - futureTask3 正在处理中,不要催,越催越慢
22:42:26.705 [main] ERROR com.FutureDemo - futureTask3 正在处理中,不要催,越催越慢
22:42:27.712 [main] ERROR com.FutureDemo - futureTask3 正在处理中,不要催,越催越慢
22:42:28.722 [main] ERROR com.FutureDemo - futureTask3 正在处理中,不要催,越催越慢
22:42:29.736 [main] ERROR com.FutureDemo - futureTask3 正在处理中,不要催,越催越慢
22:42:30.744 [main] ERROR com.FutureDemo - futureTask3 正在处理中,不要催,越催越慢
22:42:31.760 [main] ERROR com.FutureDemo - futureTask3 正在处理中,不要催,越催越慢
22:42:31.761 [main] INFO com.FutureDemo - print the futureTaskResult3 = futureTask3 call result
22:42:31.761 [main] INFO com.FutureDemo - 主线程的逻辑结束
Process finished with exit code 0
2、CompletableFuture 对 Future 的改进
上面 FutureTask 处理任务后,获取结果时有些弊端也不够优雅,而且其提供的API对复杂场景,无法提供很好的支持,所以引出了 CompletableFuture,以声明式的方式优雅的处理各种复杂的需求。
CompletableFuture 是JDK1.8 设计出来的,其提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。CompletableFuture 是 Future 的功能增强版,减少阻塞和轮询,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
CompletableFuture 优点:
- 异步任务结束时,会自动回调某个对象的方法。
- 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行。
- 异步任务出错时,会自动回调某个对象的方法。
CompletableFuture 类图:
2.1、CompletionStage 接口类
CompletableFuture 的功能之所以强大,很大一方面原因是实现了CompletionStage 接口,所以我们需要了解CompletionStage 接口类相关的一些方法。CompletionStage 代表某个同步或者异步计算的一个阶段,或者一系列异步任务中的一个子任务(或者阶段性任务)。每个CompletionStage 子任务所包装的可以是一个 Function、Consumer 或者 Runnable 函数式接口实例。
用表格的形式对比说明,这三个常用的函数式接口的特点:
函数式接口 | 入参 | 出参 |
---|---|---|
Function | 有 | 有 |
Consumer | 有 | 无 |
Runnable | 无 | 无 |
2.2、runAsync 和 supplyAsync 创建子任务
java.util.concurrent.CompletableFuture<T>
中的 runAsync
方法和 supplyAsync
方法。
// 子任务包装一个Runnable实例,并调用ForkJoinPool.commonPool()线程池来执行
// 创建的子任务无返回值CompletableFuture<Void>,默认线程值为ForkJoinPool.commonPool()
public static CompletableFuture<Void> runAsync(Runnable runnable){
return asyncRunStage(ASYNC_POOL, runnable);
};
// 子任务包装一个Runnable实例,并调用指定的executor线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor){
return asyncRunStage(screenExecutor(executor), runnable);
};
// 子任务包装一个Supplier实例,并调用ForkJoinPool.commonPool()线程池来执行
// 创建的子任务有返回值CompletableFuture<U>,默认线程值为ForkJoinPool.commonPool()
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(ASYNC_POOL, supplier);
}
// 子任务包装一个Supplier实例,并使用指定的executor线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
代码示例 2-2-1 :
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* @author pzf
* @ClassName CompletableFutureDemo
* @description: TODO
* @date 2023-07-16
*/
@Slf4j
public class CompletableFutureDemo {
public static void main(String[] args) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
log.info("{} -> 执行future1", Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
ExecutorService pool = Executors.newFixedThreadPool(3);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
log.info("{} -> 执行future2", Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, pool);
try {
log.info("future2.get() -> {}", future2.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
log.info("{} -> 执行future3", Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "this future3 result";
});
try {
log.info("future3.get() -> {}", future3.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {
log.info("{} -> 执行future4", Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "this future4 result";
}, pool);
try {
log.info("future4.get() -> {}", future4.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
pool.shutdown();
}
}
打印结果 2-2-1 :
21:07:20.411 [ForkJoinPool.commonPool-worker-3] INFO com.CompletableFutureDemo - ForkJoinPool.commonPool-worker-3 -> 执行future1
21:07:20.411 [pool-1-thread-1] INFO com.CompletableFutureDemo - pool-1-thread-1 -> 执行future2
21:07:22.424 [main] INFO com.CompletableFutureDemo - future2.get() -> null
21:07:22.426 [ForkJoinPool.commonPool-worker-3] INFO com.CompletableFutureDemo - ForkJoinPool.commonPool-worker-3 -> 执行future3
21:07:24.437 [main] INFO com.CompletableFutureDemo - future3.get() -> this future3 result
21:07:24.438 [pool-1-thread-2] INFO com.CompletableFutureDemo - pool-1-thread-2 -> 执行future4
21:07:26.447 [main] INFO com.CompletableFutureDemo - future4.get() -> this future4 result
2.3、 whenComplete 和 exceptionally 异步任务回调钩子
当异步任务计算结果完成或者抛出异常的时候,可执行 whenComplete 和 exceptionally 等这些特定的回调钩子,避免了使用 get() 方法可能出现阻塞的情况。
java.util.concurrent.CompletableFuture<T>
中的回调钩子方法
// 设置异步任务完成时的回调钩子
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
// 设置异步任务完成时的回调钩子,可能不在同一线程执行
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(defaultExecutor(), action);
}
// 设置异步任务完成时的回调钩子,提交给线程池executor执行
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
// 设置异常处理的回调钩子
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}
需要注意的点:
调用 cancel()方法取消 CompletableFuture时,任务被视为异常完成,completeExceptionally()方法所设置的异常回调钩子也会被执行。
如果没有设置异常回调钩子,发生内部异常时会有两种情况发生:
(1)在调用get()和get(long,TimeUnit)方法启动任务时,如果遇到内部异常,get()方法就会抛出ExecutionException(执行异常)。
(2)在调用join()和getNow(T)启动任务时(大多数情况下都是如此),如果遇到内部异常,join()和getNow(T)方法就会抛出CompletionException。
代码示例 2-3-1 :
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@Slf4j
public class WhenCompleteDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
log.info("{} --> future 第一阶段 coming", Thread.currentThread().getName());
// 生成随机数
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("{} --> future 第一阶段 result = {}",
Thread.currentThread().getName(), result);
return result;
// 子任务完成时的回调钩子, v是第一阶段的结果result, e是第一阶段的异常
}).whenComplete((v, e) -> {
// 第一阶段没有异常
if (e == null) {
log.info("{} --> 进入回调钩子, 其中第一阶段的结果v = {}",
Thread.currentThread().getName(), v);
}
// exceptionally -> error 回调 不管是第一阶段还是回调钩子,出现异常都会触发该回调
}).exceptionally(e -> {
log.error("{} --> 异常情况 --> {}", Thread.currentThread().getName(), e);
return null;
});
log.info("{} --> 主线程执行逻辑", Thread.currentThread().getName());
Integer result = future.get();
log.info("{} --> result = {}", Thread.currentThread().getName(), result);
}
}
打印结果 2-3-1 :
21:29:33.380 [ForkJoinPool.commonPool-worker-3] INFO com.WhenCompleteDemo - ForkJoinPool.commonPool-worker-3 --> future 第一阶段 coming
21:29:33.380 [main] INFO com.WhenCompleteDemo - main --> 主线程执行逻辑
21:29:35.399 [ForkJoinPool.commonPool-worker-3] INFO com.WhenCompleteDemo - ForkJoinPool.commonPool-worker-3 --> future 第一阶段 result = 5
21:29:35.399 [ForkJoinPool.commonPool-worker-3] INFO com.WhenCompleteDemo - ForkJoinPool.commonPool-worker-3 --> 进入回调钩子, 其中第一阶段的结果v = 5
21:29:35.399 [main] INFO com.WhenCompleteDemo - main --> result = 5
代码示例 2-3-2 :
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class WhenCompleteDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
log.info("{} --> future 第一阶段 coming", Thread.currentThread().getName());
// 生成随机数
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer.valueOf("happen error");
log.info("{} --> future 第一阶段 result = {}",
Thread.currentThread().getName(), result);
return result;
// 第一阶段结束后 进入回调钩子, v是第一阶段的结果result, e是第一阶段的异常
}, pool).whenComplete((v, e) -> {
// 第一阶段没有异常
if (e == null) {
log.info("{} --> 进入回调钩子, 其中第一阶段的结果v = {}",
Thread.currentThread().getName(), v);
} else {
log.info("{} --> 进入回调钩子, 出现异常 = {}",
Thread.currentThread().getName(), e.getMessage());
}
}).exceptionally(e -> {
log.error("{} --> 异常情况 --> message = {}", Thread.currentThread().getName(),
e.getMessage());
return null;
});
log.info("{} --> 主线程执行逻辑", Thread.currentThread().getName());
Integer result = future.get();
log.info("{} --> result = {}", Thread.currentThread().getName(), result);
pool.shutdown();
}
}
打印结果 2-3-2 :
21:42:14.189 [pool-1-thread-1] INFO com.WhenCompleteDemo - pool-1-thread-1 --> future 第一阶段 coming
21:42:14.189 [main] INFO com.WhenCompleteDemo - main --> 主线程执行逻辑
21:42:16.208 [pool-1-thread-1] INFO com.WhenCompleteDemo - pool-1-thread-1 --> 进入回调钩子, 出现异常 = java.lang.NumberFormatException: For input string: "happen error"
21:42:16.210 [pool-1-thread-1] ERROR com.WhenCompleteDemo - pool-1-thread-1 --> 异常情况 --> message = java.lang.NumberFormatException: For input string: "happen error"
21:42:16.210 [main] INFO com.WhenCompleteDemo - main --> result = null
Process finished with exit code 0
2.4、调用 handle() 方法统一处理异常和结果
除了通过whenComplete、exceptionally 分别设置完成钩子、异常钩子之外,还可以调用handle()方法统一处理结果和异常。计算结果存在依赖关系,使两个线程串行化。由于存在依赖关系,当前步骤异常,就不走下一阶段,直接叫停。
java.util.concurrent.CompletableFuture<T>
中的 handle
方法
// 在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(null, fn);
}
// 可能不在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(defaultExecutor(), fn);
}
// 在指定线程池executor中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
return uniHandleStage(screenExecutor(executor), fn);
}
示例代码 2-4-1 :
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class HandleDemo {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
log.info("hello, 进入到 future1 子任务一阶段执行");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = ThreadLocalRandom.current().nextInt(10);
log.info("hi, future1 子任务一阶段执行完成,result = {}", result);
return result;
}, pool);
future1.handle((v, e) -> {
log.info("hello, 进入到 future1 子任务 handle, v = {}", v);
if (e == null) {
v = v * 2;
log.info("hi, 进入到 future1 子任务 handle 无异常,v = {}", v);
} else {
log.info("hi, 进入到 future1 子任务 handle 有异常,v = {}", v);
}
return v;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
log.info("hello, 进入到 future2 子任务一阶段执行");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = ThreadLocalRandom.current().nextInt(10);
int count = 10/0;
log.info("hi, future2 子任务一阶段执行完成,result = {}", result);
return result;
}, pool).handleAsync((v, e) -> {
log.info("hello, 进入到 future2 子任务 handle, v = {}", v);
if (e == null) {
v = v * 2;
log.info("hi, 进入到 future2 子任务 handle 无异常,v = {}", v);
} else {
log.info("hi, 进入到 future2 子任务 handle 有异常,v = {}", v);
}
return v;
});
log.info("执行主线程任务完成");
pool.shutdown();
}
}
打印结果 2-4-1 :
14:52:12.610 [main] INFO com.HandleDemo - 执行主线程任务完成
14:52:12.610 [pool-1-thread-2] INFO com.HandleDemo - hello, 进入到 future2 子任务一阶段执行
14:52:12.610 [pool-1-thread-1] INFO com.HandleDemo - hello, 进入到 future1 子任务一阶段执行
14:52:15.618 [pool-1-thread-1] INFO com.HandleDemo - hi, future1 子任务一阶段执行完成,result = 8
14:52:15.623 [ForkJoinPool.commonPool-worker-3] INFO com.HandleDemo - hello, 进入到 future2 子任务 handle, v = null
14:52:15.625 [pool-1-thread-1] INFO com.HandleDemo - hello, 进入到 future1 子任务 handle, v = 8
14:52:15.625 [ForkJoinPool.commonPool-worker-3] INFO com.HandleDemo - hi, 进入到 future2 子任务 handle 有异常,v = null
14:52:15.626 [pool-1-thread-1] INFO com.HandleDemo - hi, 进入到 future1 子任务 handle 无异常,v = 16
Process finished with exit code 0
2.5、异步任务的串行执行
如果两个异步任务需要串行(一个任务依赖另一个任务)执行,可以通过 CompletionStage 接口的 thenApply()
、thenAccept()
、thenRun()
和 thenCompose()
四个方法来实现。
方法 | 入参 | 出参 |
---|---|---|
thenApply() | 上一个任务所返回结果 | 有 |
thenRun() | 无 | 无 |
thenAccept() | 上一个任务所返回结果 | 无 |
thenCompose() | 上一个任务所返回结果 | CompletionStage异步实例 |
2.5.1、thenApply() 方法
计算结果存在依赖关系,使两个线程串行化。由于存在依赖关系,当前步骤异常,就不走下一阶段,直接叫停。
java.util.concurrent.CompletableFuture<T>
中的 thenApply
方法
/**
* 后一个任务与前一个任务在同一个线程中执行
* 泛型参数 T:上一个任务所返回结果的类型。
* 泛型参数 U:当前任务的返回值类型。
**/
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
/**
* 后一个任务与前一个任务不在同一个线程中执行
* 泛型参数 T:上一个任务所返回结果的类型。
* 泛型参数 U:当前任务的返回值类型。
**/
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(defaultExecutor(), fn);
}
/**
* 后一个任务在指定的executor线程池中执行
* 泛型参数 T:上一个任务所返回结果的类型。
* 泛型参数 U:当前任务的返回值类型。
**/
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor) {
return uniApplyStage(screenExecutor(executor), fn);
}
代码示例 2-5-1-1 :
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class ThenApplyDemo {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
log.info("hello, 进入到 future 任务第一阶段执行");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = ThreadLocalRandom.current().nextInt(10);
log.info("hi, future 任务第一阶段执行完成,result = {}", result);
return result;
}, pool);
future.thenApply((firstValue) -> {
log.info("hello, 进入到 future 任务第二阶段执行 firstValue = {}", firstValue);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = ThreadLocalRandom.current().nextInt(10) + firstValue;
log.info("hi, future 任务第二阶段执行完成,result = {}", result);
return "future=" +result;
}).handle((v, e) -> {
if (e == null) {
log.info("hi, 进入到 future 任务 handle 无异常,v = {}", v);
} else {
log.info("hi, 进入到 future 任务 handle 有异常,v = {}", v);
}
return v;
});
log.info("主线程执行逻辑");
pool.shutdown();
}
}
执行结果 2-5-1-1 :
15:31:20.879 [main] INFO com.ThenApplyDemo - 主线程执行逻辑
15:31:20.879 [pool-1-thread-1] INFO com.ThenApplyDemo - hello, 进入到 future 任务第一阶段执行
15:31:23.891 [pool-1-thread-1] INFO com.ThenApplyDemo - hi, future 任务第一阶段执行完成,result = 7
15:31:23.894 [pool-1-thread-1] INFO com.ThenApplyDemo - hello, 进入到 future 任务第二阶段执行 firstValue = 7
15:31:26.896 [pool-1-thread-1] INFO com.ThenApplyDemo - hi, future 任务第二阶段执行完成,result = 11
15:31:26.897 [pool-1-thread-1] INFO com.ThenApplyDemo - hi, 进入到 future 任务 handle 无异常,v = future=11
Process finished with exit code 0
2.5.2、thenRun() 方法
thenRun()方法与thenApply()方法不同的是,不关心任务的处理结果。只要前一个任务执行完成,就开始执行后一个串行任务。thenRun()既不能接收参数又不支持返回值。异步任务A执行完执行B,并且B不需要A的结果。
java.util.concurrent.CompletableFuture<T>
中的 thenRun
方法
// 后一个任务与前一个任务在同一个线程中执行
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
// 后一个任务与前一个任务不在同一个线程中执行
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(defaultExecutor(), action);
}
// 后一个任务在executor线程池中执行
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
2.5.3、thenAccept() 方法
调用thenAccept()方法时,后一个任务可以接收(或消费)前一个任务的处理结果,但是后一个任务没有结果输出。接收任务的处理结果,并消费处理,无返回结果。
java.util.concurrent.CompletableFuture 中的 thenAccept 方法
// 后一个任务与前一个任务在同一个线程中执行
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
return uniAcceptStage(null, action);
}
// 后一个任务与前一个任务不在同一个线程中执行
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
return uniAcceptStage(defaultExecutor(), action);
}
// 后一个任务在指定的executor线程池中执行
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor) {
return uniAcceptStage(screenExecutor(executor), action);
}
2.5.4、thenCompose() 方法
thenCompose() 方法,要求第二个任务的返回值是一个 CompletionStage 异步实例。因此,可以调用 CompletableFuture.supplyAsync() 方法将第二个任务所要调用的普通异步方法包装成一个 CompletionStage 异步实例。通过该实例,还可以进行下一轮CompletionStage任务的调度和执行,比如可以持续进行CompletionStage链式(或者流式)调用。(个人感觉这个方法用的比较少)
java.util.concurrent.CompletableFuture<T>
中的 thenCompose
方法
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(defaultExecutor(), fn);
}
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor) {
return uniComposeStage(screenExecutor(executor), fn);
}
代码示例 2-5-4-1 :
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class ThenComposeDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
log.info("hello, 进入到 future 任务第一阶段执行");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = ThreadLocalRandom.current().nextInt(10);
log.info("hi, future 任务第一阶段执行完成,result = {}", result);
return result;
}, pool).thenCompose((firstValue) -> {
log.info("hello, thenCompose --> firstValue = {}", firstValue);
return CompletableFuture.supplyAsync(() -> {
log.info("hello, 进入到 future 任务第二阶段执行");
int result = ThreadLocalRandom.current().nextInt(10);
result = result + firstValue;
log.info("hi, future 任务第二阶段执行完成,result = {}", result);
return result;
});
});
Integer result = future.get();
log.info("result -------> {}", result);
log.info("主线程执行逻辑");
pool.shutdown();
}
}
打印结果 2-5-4-1 :
16:02:12.403 [pool-1-thread-1] INFO com.ThenComposeDemo - hello, 进入到 future 任务第一阶段执行
16:02:15.413 [pool-1-thread-1] INFO com.ThenComposeDemo - hi, future 任务第一阶段执行完成,result = 8
16:02:15.415 [pool-1-thread-1] INFO com.ThenComposeDemo - hello, thenCompose --> firstValue = 8
16:02:15.416 [ForkJoinPool.commonPool-worker-3] INFO com.ThenComposeDemo - hello, 进入到 future 任务第二阶段执行
16:02:15.416 [ForkJoinPool.commonPool-worker-3] INFO com.ThenComposeDemo - hi, future 任务第二阶段执行完成,result = 15
16:02:15.416 [main] INFO com.ThenComposeDemo - result -------> 15
16:02:15.416 [main] INFO com.ThenComposeDemo - 主线程执行逻辑
Process finished with exit code 0
2.6、异步任务的合并执行
2.6.1、thenCombine() 方法
thenCombine() 会在两个 CompletionStage 任务都执行完成后,把两个任务的结果一起交给 thenCombine() 来处理。两个 CompletionStage 任务都完成后,最终能把两个任务的结果一起交给 thenCombine 来处理。先完成的先等着,等待其他分支任务。
java.util.concurrent.CompletableFuture<T>
中的 thenCombine
方法:
/**
* 合并代表第二步任务(参数other)的CompletionStage实例,返回第三步任务的CompletionStage
* @param other 表示待合并的第二步任务的CompletionStage实例
* @param fn 表示第一个任务和第二个任务执行完成后,第三步需要执行的逻辑
* @patam <T> 表示第一个任务所返回结果的类型
* @param <U> 表示第二个任务所返回结果的类型
* @param <V> 表示第三个任务所返回结果的类型
* @return
*/
public <U, V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn) {
return biApplyStage(null, other, fn);
}
/**
* 不一定在同一个线程中执行第三步任务的CompletionStage实例
* @param other 表示待合并的第二步任务的CompletionStage实例
* @param fn 表示第一个任务和第二个任务执行完成后,第三步需要执行的逻辑
* @patam <T> 表示第一个任务所返回结果的类型
* @param <U> 表示第二个任务所返回结果的类型
* @param <V> 表示第三个任务所返回结果的类型
* @return
*/
public <U, V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn) {
return biApplyStage(defaultExecutor(), other, fn);
}
/**
* 第三步任务的CompletionStage实例在指定的executor线程池中执行
* @param other 表示待合并的第二步任务的CompletionStage实例
* @param fn 表示第一个任务和第二个任务执行完成后,第三步需要执行的逻辑
* @patam <T> 表示第一个任务所返回结果的类型
* @param <U> 表示第二个任务所返回结果的类型
* @param <V> 表示第三个任务所返回结果的类型
* @return
*/
public <U, V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn, Executor executor) {
return biApplyStage(screenExecutor(executor), other, fn);
}
代码示例 2-6-1-1 :
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class ThenCombineDemo {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
log.info("hello, 进入到 future1 任务执行开始");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = ThreadLocalRandom.current().nextInt(10);
log.info("hi, future1 任务执行完成,result = {}", result);
return result;
}, pool);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
log.info("hello, 进入到 future2 任务执行开始");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = ThreadLocalRandom.current().nextInt(10);
log.info("hi, future2 任务执行完成,result = {}", result);
return result;
}, pool);
CompletableFuture<Integer> future3 = future1.thenCombine(future2,(firstValue, secondValue) -> {
log.info("hello, 进入到 future3 任务执行开始");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = ThreadLocalRandom.current().nextInt(10);
result = result + firstValue + secondValue;
log.info("hi, future3 任务执行完成,result = {}", result);
return result;
}).whenComplete((v, e) -> {
if (e == null) {
log.info("进入whenComplete, v = {}", v);
} else {
log.info("进入whenComplete 出现异常, v = {}, e", v, e.getMessage());
}
});
log.info("主线程执行逻辑");
pool.shutdown();
}
}
执行结果 2-6-1-1 :
16:26:54.758 [main] INFO com.ThenCombineDemo - 主线程执行逻辑
16:26:54.758 [pool-1-thread-2] INFO com.ThenCombineDemo - hello, 进入到 future2 任务执行开始
16:26:54.758 [pool-1-thread-1] INFO com.ThenCombineDemo - hello, 进入到 future1 任务执行开始
16:26:57.778 [pool-1-thread-2] INFO com.ThenCombineDemo - hi, future2 任务执行完成,result = 0
16:26:57.778 [pool-1-thread-1] INFO com.ThenCombineDemo - hi, future1 任务执行完成,result = 5
16:26:57.787 [pool-1-thread-1] INFO com.ThenCombineDemo - hello, 进入到 future3 任务执行开始
16:27:00.795 [pool-1-thread-1] INFO com.ThenCombineDemo - hi, future3 任务执行完成,result = 9
16:27:00.795 [pool-1-thread-1] INFO com.ThenCombineDemo - 进入whenComplete, v = 9
Process finished with exit code 0
2.6.2、runAfterBoth() 方法
runAfterBoth() 方法不关心每一步任务的输入参数和处理结果.
java.util.concurrent.CompletableFuture<T>
中的 runAfterBoth
方法
// 合并第二步任务的CompletionStage实例,返回第三步任务的
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action) {
return biRunStage(null, other, action);
}
// 不一定在同一个线程中执行第三步任务的CompletionStage实例
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action) {
return biRunStage(defaultExecutor(), other, action);
}
// 第三步任务的CompletionStage实例在指定的executor线程池中执行
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return biRunStage(screenExecutor(executor), other, action);
}
2.6.3、thenAcceptBoth() 方法
调用thenAcceptBoth()方法,第三个任务可以接收其合并过来的第一个任务、第二个任务的处理结果,但是第三个任务(合并任务)却不能返回结果。
java.util.concurrent.CompletableFuture<T>
中的 thenAcceptBoth
方法
// 合并第二步任务的CompletionStage实例,返回第三步任务的CompletionStage
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
// 功能与上一个方法相同,不一定在同一个线程执行第三步任务
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(defaultExecutor(), other, action);
}
// 功能与上一个方法相同,在指定的executor线程池中执行第三步任务
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor) {
return biAcceptStage(screenExecutor(executor), other, action);
}
2.6.4、allOf() 方法等待所有的任务结束
allOf()会等待所有的任务结束,以合并所有的任务。thenCombine()只能合并两个任务,如果需要合并多个异步任务,那么可以调用allOf()
java.util.concurrent.CompletableFuture<T>
中的 allOf
方法
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
代码示例 2-6-4-1 :
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
@Slf4j
public class AllOfDemo {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
log.info("hello, 进入到 future1 任务执行开始");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = ThreadLocalRandom.current().nextInt(10);
log.info("hi, future1 任务执行完成,result = {}", result);
return result;
}, pool);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
log.info("hello, 进入到 future2 任务执行开始");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = ThreadLocalRandom.current().nextInt(10);
log.info("hi, future2 任务执行完成,result = {}", result);
return result;
}, pool);
CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> {
log.info("hello, 进入到 future3 任务执行开始");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = ThreadLocalRandom.current().nextInt(10);
log.info("hi, future3 任务执行完成,result = {}", result);
return result;
}, pool);
CompletableFuture<Void> futureAll = CompletableFuture.allOf(future1,future2,future3).whenComplete((v, e) -> {
if (e == null) {
log.info("进入whenComplete, v = {}", v);
} else {
log.info("进入whenComplete 出现异常, v = {}, e", v, e.getMessage());
}
});
log.info("主线程执行逻辑");
pool.shutdown();
}
}
打印结果 2-6-4-1 :
16:42:51.635 [main] INFO com.AllOfDemo - 主线程执行逻辑
16:42:51.635 [pool-1-thread-1] INFO com.AllOfDemo - hello, 进入到 future1 任务执行开始
16:42:51.635 [pool-1-thread-2] INFO com.AllOfDemo - hello, 进入到 future2 任务执行开始
16:42:51.635 [pool-1-thread-3] INFO com.AllOfDemo - hello, 进入到 future3 任务执行开始
16:42:54.645 [pool-1-thread-3] INFO com.AllOfDemo - hi, future3 任务执行完成,result = 7
16:42:54.645 [pool-1-thread-1] INFO com.AllOfDemo - hi, future1 任务执行完成,result = 3
16:42:54.645 [pool-1-thread-2] INFO com.AllOfDemo - hi, future2 任务执行完成,result = 2
16:42:54.651 [pool-1-thread-3] INFO com.AllOfDemo - 进入whenComplete, v = null
Process finished with exit code 0
2.7、异步任务的选择执行
所谓选择执行,就是看谁的速度快,前面两个并行任务,谁的结果返回速度快,谁的结果将作为第三步任务的输入。对两个异步任务的选择可以通过 CompletionStage 接口的 applyToEither()
、 runAfterEither()
和 acceptEither()
三个方法来实现。这三个方法的不同之处在于它的核心参数的类型不同,分别为 Function<T,R>
、Runnable
、Consumer<? superT>
类型。
2.7.1、applyToEither() 方法
两个 CompletionStage 谁返回结果的速度快,applyToEither()
方法就用这个最快的 CompletionStage 的结果,进行下一步(第三步)的回调操作。
java.util.concurrent.CompletableFuture<T>
中的 applyToEither
方法
// 和other任务进行速度PK,最快返回的结果用于执行fn回调函数
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(null, other, fn);
}
// 功能与上一个方法相同,不一定在同一个线程中执行fn回调函数
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn) {
return orApplyStage(defaultExecutor(), other, fn);
}
// 功能与上一个方法相同,在指定线程执行fn回调函数
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor) {
return orApplyStage(screenExecutor(executor), other, fn);
}
2.7.2、runAfterEither() 方法
runAfterEither() 方法的功能为:前面两个 CompletionStage 实例,任何一个完成了都会执行第三步回调操作。三个任务的回调函数都是 Runnable 类型的。
java.util.concurrent.CompletableFuture<T>
中的 runAfterEither
方法
// 和other任务进行速度PK,只要一个执行完成,就开始执行action回调函数
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action) {
return orRunStage(null, other, action);
}
// 功能与上一个方法相同,不一定在同一个线程中执行action回调函数
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action) {
return orRunStage(defaultExecutor(), other, action);
}
// 功能与上一个方法相同,在指定线程执行action回调函数
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor) {
return orRunStage(screenExecutor(executor), other, action);
}
2.7.3、acceptEither() 方法
调用 acceptEither() 方法,两个 CompletionStage 谁返回结果的速度快,acceptEither() 就用那个最快的 CompletionStage 的结果作为下一步(第三步)的输入,但是第三步没有输出。
java.util.concurrent.CompletableFuture<T>
中的 acceptEither
方法
// 和other任务进行速度PK,最快返回的结果用于执行action回调函数
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(null, other, action);
}
// 功能与上一个方法相同,不一定在同一个线程中执行action回调函数
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action) {
return orAcceptStage(defaultExecutor(), other, action);
}
// /功能与上一个方法相同,在指定的executor线程池中执行第三步任务
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor) {
return orAcceptStage(screenExecutor(executor), other, action);
}
2.8、CompletableFuture 的基础方法
说明 | 方法 | 备注 |
---|---|---|
获得结果 | get() | 不见不散(阻塞),get() 方法在编译期间会去做检查异常的工作,需要捕获/抛出异常 |
获得结果 | get(long timeout, TimeUnit unit) | 过时不候 |
获得结果 | join() | join() 方法在编译期间不会去做检查异常的工作 |
获得结果 | getNow(T valueIfAbsent) | 异步任务计算完,返回异步任务的计算结果。异步任务没有计算完,返回设定的 valueIfAbsent |
主动触发计算 | boolean complete(T value) | 是否打断get() 方法,立即返回入参中的值 |
.