简单的 CompletableFuture学习笔记
这里记录一下自己学习的内容,简单记录一下方便后续学习,内容部分参考 CompletableFuture学习博客
1. CompletableFuture简介
在api接口调用时间过长,调用过多外围接口时,为了提升性能,我们采用线程池来负责多线程的处理操作,因为我们需要得到各个子线程处理的结果,所以我们需要使用线程池+Future的方式进行接口优化,但Future在应对并行结果组合以及后续处理等方面显得力不从心,弊端明显,然后便引入了本次学习的CompletableFuture。
CompletableFuture
是 Java 8 中引入的一个新的并发编程工具,它为开发者提供了一种简单、高效的方式来处理异步操作和并发任务。CompletableFuture
可以看作是 Future 的增强版,它提供了更丰富的功能和更方便的使用方式。
2. 简单示例
以下记录一下简单的代码示例
2.1 CompletableFuture初识
当我们需要进行异步处理的时候,我们可以通过CompletableFuture.supplyAsync
方法,传入一个具体的要执行的处理逻辑函数,这样就轻松的完成了CompletableFuture的创建与触发执行
方法名称 | 作用描述 |
---|---|
supplyAsync | 静态方法,用于构建一个CompletableFuture<T> 对象,并异步执行传入的参数,允许执行函数有返回值 |
runAsync | 静态方法,用于构建一个CompletableFuture<Void> 对象,并异步执行传入函数,与supplyAsync 的区别在于此方法传入的是Callable类型,仅执行,没有返回值 |
示例代码如下:
package cn.git.future;
import java.util.concurrent.*;
/**
* @description: 初始了解completableFuture
* @program: bank-credit-sy
* @author: lixuchun
* @create: 2024-08-07
*/
public class CompletableFutureDemo01 {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,
50,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程start ......");
// 没有返回结果,可以执行任务
CompletableFuture.runAsync(() -> {
System.out.println("子线程执行了:" + Thread.currentThread().getName());
}, executor);
// 带返回结果
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程2执行了:" + Thread.currentThread().getName());
return 10;
}, executor);
Integer result = future2.get();
System.out.println("result = " + result);
System.out.println("主线程end ......");
}
}
supplyAsync
或者runAsync
创建后便会立即执行,无需手动调用触发。
2.2 环环相扣处理
在流水线处理场景中,往往都是一个任务环节处理完成后,下一个任务环节接着上一环节处理结果继续处理。
CompletableFuture
用于这种流水线环节驱动类的方法有很多,相互之间主要是在返回值或者给到下一环节的入参上有些许差异,使用时需要注意区分:
具体的方法的描述归纳如下:
方法名称 | 作用描述 |
---|---|
thenApply | 对 CompletableFuture 的执行后的结果进行追加处理,并将当前的CompletableFuture 泛型对象更改为处理后新的对象类型,返回当前CompletableFuture 对象 |
thenCompose | 与thenApply 类似,区别点在于:此方法的入参函数返回一个CompletableFuture 类型对象 |
thenAccept | 在所有异步任务完成后执行一系列操作,与thenApply 类似,区别点在于thenApply 返回void类型,没有具体结果输出,适合无需返回值的场景 |
thenRun | 与thenAccept 类似,区别点在于thenAccept 可以将前面CompletableFuture 执行的实际结果作为参数进行传入并使用,但是thenRun 方法没有任何入参,只能执行一个Runnable函数,并且返回void类型 |
期望总是美好的,但是实际情况却总不尽如人意。在我们编排流水线的时候,如果某一个环节执行抛出异常了,会导致整个流水线后续的环节就没法再继续下去了,这时候需要使用 handle或者whenComplete来处理,具体比较如下
方法名称 | 方法描述 |
---|---|
handle | 与thenApply 类似,区别点在于handle执行函数的入参有两个,一个是CompletableFuture 执行的实际结果,一个是是Throwable对象,这样如果前面执行出现异常的时候,可以通过handle获取到异常并进行处理。 |
whenComplete | 与handle类似,区别点在于whenComplete 执行后无返回值。 |
-
whenComplete代码示例:
package cn.git.future; import java.util.concurrent.*; /** * @description: 回调方法以及发生异常处理 * whenCompleteAsync作用为异步执行 * exceptionally作用为异常处理 * @program: bank-credit-sy * @author: lixuchun * @create: 2024-08-07 */ public class CompletableFutureDemo02 { static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) throws ExecutionException, InterruptedException { System.out.println("主线程start ......"); // 没有返回结果,可以执行任务 CompletableFuture.runAsync(() -> { System.out.println("子线程执行了:" + Thread.currentThread().getName()); int i = 10 / 0; }, executor).whenCompleteAsync((res, exec) -> { System.out.println("whenCompleteAsync1"); System.out.println("res = " + res); System.out.println("exec = " + exec); }).exceptionally((exec) -> { // 发生异常处理 System.out.println("exec1 = " + exec); return null; }); // 带返回结果 CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("子线程2执行了:" + Thread.currentThread().getName()); int i = 10 / 0; return 10; }, executor).whenCompleteAsync((res, exec) -> { System.out.println("whenCompleteAsync2"); System.out.println("res = " + res); System.out.println("exec = " + exec); }).exceptionally((exec) -> { // 发生异常处理,带返回值 System.out.println("exec2" + exec); return -1; }); Integer integer = integerCompletableFuture.get(); System.out.println(integer); System.out.println("主线程end ......"); executor.shutdown(); } }
执行结果
-
handle代码示例
package cn.git.future; import java.util.concurrent.*; /** * @description: handleAsync任务执行后自定义执行器 * @program: bank-credit-sy * @author: lixuchun * @create: 2024-08-07 */ public class CompletableFutureDemo03 { static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) throws ExecutionException, InterruptedException { // 没有返回结果,可以执行任务 CompletableFuture.runAsync(() -> { System.out.println("子线程执行了:" + Thread.currentThread().getName()); }, executor).handleAsync((result, exec) -> { System.out.println("exec = " + exec); return null; }); // 带返回结果 CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("子线程2执行了:" + Thread.currentThread().getName()); int i = 1 / 0; return 10; }, executor).handleAsync((result, exec) -> { System.out.println("exec = " + exec); return 55; }); Integer integer = integerCompletableFuture.get(); System.out.println(integer); executor.shutdown(); } }
执行结果:
2.3 多个CompletableFuture组合操作
前面一直在介绍流水线式的处理场景,但是很多时候,流水线处理场景也不会是一个链路顺序往下走的情况,很多时候为了提升并行效率,一些没有依赖的环节我们会让他们同时去执行,然后在某些环节需要依赖的时候,进行结果的依赖合并处理。
CompletableFuture相比于Future的一大优势,就是可以方便的实现多个并行环节的合并处理。相关涉及方法介绍归纳如下:
方法名称 | 方法描述 |
---|---|
thenCombine | 将两个CompletableFuture 对象组合起来进行下一步处理,可以拿到两个执行结果,并传给自己的执行函数进行下一步处理,最后返回一个新的CompletableFuture 对象。 |
thenAcceptBoth | 与thenCombine 类似,区别点在于thenAcceptBoth 传入的执行函数没有返回值,即thenAcceptBoth 返回值为CompletableFuture<void> 。 |
runAfterBoth | 等待两个CompletableFuture 都执行完成后再执行某个Runnable 对象,再执行下一个的逻辑,类似thenRun 。 |
applyToEither | 两个CompletableFuture 中任意一个完成的时候,继续执行后面给定的新的函数处理。再执行后面给定函数的逻辑,类似thenApply 。 |
acceptEither | 两个CompletableFuture 中任意一个完成的时候,继续执行后面给定的新的函数处理。再执行后面给定函数的逻辑,类似thenAccept 。 |
runAfterEither | 等待两个CompletableFuture 中任意一个执行完成后再执行某个Runnable 对象,可以理解为thenRun 的升级版,注意与runAfterBoth 对比理解。 |
allOf | 静态方法,阻塞等待所有给定的CompletableFuture 执行结束后,返回一个CompletableFuture<Void> 结果。 |
anyOf | 静态方法,阻塞等待任意一个给定的CompletableFuture 对象执行结束后,返回一个CompletableFuture<Void> 结果。 |
-
thenAcceptAsync 串行任务编排,任务有先后顺序
package cn.git.future; import java.util.concurrent.*; /** * @description: 串行任务编排,任务有先后顺序 * @program: bank-credit-sy * @author: lixuchun * @create: 2024-08-07 */ public class CompletableFutureDemo04 { static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) throws ExecutionException, InterruptedException { // 没有返回结果,可以执行任务 线程1结束执行线程2 CompletableFuture.runAsync(() -> { System.out.println("子线程1执行了:" + Thread.currentThread().getName()); }, executor).thenRunAsync(()-> { System.out.println("子线程2执行了:" + Thread.currentThread().getName()); }, executor); // 有返回结果,可以执行任务 线程1结束执行线程4 CompletableFuture.supplyAsync(() -> { System.out.println("子线程3执行了:" + Thread.currentThread().getName()); return 100; }, executor).thenAcceptAsync((param) -> { // 线程3执行结果作为线程4入参参数 System.out.println("子线程4执行了:" + Thread.currentThread().getName()); System.out.println("线程4参数 = " + param); }, executor); // 有返回结果,获取处理 CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println("子线程5执行了:" + Thread.currentThread().getName()); return 100; }, executor).thenApplyAsync((param) -> { // 线程3执行结果作为线程4入参参数 System.out.println("子线程6执行了:" + Thread.currentThread().getName()); System.out.println("线程6参数 = " + param); return param * 100; }, executor); System.out.println("thread6 result = " + integerCompletableFuture.get()); executor.shutdown(); } }
执行结果:
-
runAfterBothAsync两个线程,都完成再执行
package cn.git.future; import java.util.concurrent.*; /** * @description: 多个线程,都完成再执行 * @program: bank-credit-sy * @author: lixuchun * @create: 2024-08-07 */ public class CompletableFutureDemo05 { static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) throws ExecutionException, InterruptedException { // 没有返回结果,串行执行任务 final CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { System.out.println("子线程1执行了:" + Thread.currentThread().getName()); }, executor).thenRunAsync(() -> { System.out.println("子线程2执行了:" + Thread.currentThread().getName()); }, executor); // 串行执行任务 CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("子线程3执行了:" + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 100; }, executor).thenAcceptAsync((param) -> { // 线程3执行结果作为线程4入参参数 System.out.println("子线程4执行了:" + Thread.currentThread().getName()); System.out.println("线程4参数 = " + param); }, executor); // 两个任务都执行完成后,执行线程5 future1.runAfterBothAsync(future2, () -> { System.out.println("子线程5执行了:" + Thread.currentThread().getName()); }, executor); } }
执行结果:
-
thenAcceptBothAsync 两个线程都完成再处理带参数
package cn.git.future; import java.util.concurrent.*; /** * @description: 多个线程,都完成再处理带参数 * @program: bank-credit-sy * @author: lixuchun * @create: 2024-08-07 */ public class CompletableFutureDemo06 { static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) throws ExecutionException, InterruptedException { // 没有返回结果,串行执行任务 final CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { System.out.println("子线程1执行了:" + Thread.currentThread().getName()); }, executor).thenRunAsync(() -> { System.out.println("子线程2执行了:" + Thread.currentThread().getName()); }, executor); // 串行执行任务 CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("子线程3执行了:" + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 100; }, executor).exceptionally((e) -> { // 线程3执行结果作为线程4入参参数 System.out.println("子线程4执行了:" + Thread.currentThread().getName()); System.out.println("子线程4执行异常了:" + e.getMessage()); return -1; }); // 两个任务都执行完成后,执行线程5,带有返回参数 future1.thenAcceptBothAsync(future2, (param1, param2) -> { // param1 = 线程3执行结果, param2 = 线程4执行结果 System.out.println("线程5参数1 = " + param1); System.out.println("线程5参数2 = " + param2); System.out.println("子线程5执行了:" + Thread.currentThread().getName()); }); } }
执行结果:
-
thenCombineAsync 两个线程结果进行统一处理,返回新的返回结果
package cn.git.future; import java.util.concurrent.*; /** * @description: 获取两个线程结果进行统一处理,返回新的返回结果 * @program: bank-credit-sy * @author: lixuchun * @create: 2024-08-07 */ public class CompletableFutureDemo07 { static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) throws ExecutionException, InterruptedException { // 没有返回结果,串行执行任务 final CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("子线程1执行了:" + Thread.currentThread().getName()); return 50; }, executor); // 串行执行任务 CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("子线程3执行了:" + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 100; }, executor).exceptionally((e) -> { // 线程3执行结果作为线程4入参参数 System.out.println("子线程4执行了:" + Thread.currentThread().getName()); System.out.println("子线程4执行异常了:" + e.getMessage()); return -1; }); // 获取两个线程结果进行统一处理,返回新的返回结果 final CompletableFuture<Integer> future3 = future1.thenCombineAsync(future2, (f1, f2) -> { System.out.println("f1 = " + f1); System.out.println("f2 = " + f2); return f1 + f2; }, executor); System.out.println("future3 = " + future3.get()); } }
执行结果:
-
runAfterEitherAsync两个任务完成其中任意一个任务
package cn.git.future; import java.util.concurrent.*; /** * @description: 两个任务完成其中任意一个任务 * @program: bank-credit-sy * @author: lixuchun * @create: 2024-08-07 */ public class CompletableFutureDemo08 { static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) throws ExecutionException, InterruptedException { // 没有返回结果,串行执行任务 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("子线程1执行了:" + Thread.currentThread().getName()); return 50; }, executor); // 串行执行任务 CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程3执行了:" + Thread.currentThread().getName()); return 100; }, executor).exceptionally((e) -> { // 线程3执行结果作为线程4入参参数 System.out.println("子线程4执行了:" + Thread.currentThread().getName()); System.out.println("子线程4执行异常了:" + e.getMessage()); return -1; }); // 两个任务完成其中任意一个任务,便开始执行 future1.runAfterEitherAsync(future2, () -> { System.out.println("子线程5执行了:" + Thread.currentThread().getName()); }, executor); } }
执行结果:
-
acceptEitherAsync 带有任意一个执行完毕参数信息
package cn.git.future; import java.util.concurrent.*; /** * @description: 两个任务完成其中任意一个任务 * @program: bank-credit-sy * @author: lixuchun * @create: 2024-08-07 */ public class CompletableFutureDemo09 { static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) throws ExecutionException, InterruptedException { // 没有返回结果,串行执行任务 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程1执行了:" + Thread.currentThread().getName()); return 50; }, executor); // 串行执行任务 CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("子线程3执行了:" + Thread.currentThread().getName()); return 100; }, executor).exceptionally((e) -> { // 线程3执行结果作为线程4入参参数 System.out.println("子线程4执行了:" + Thread.currentThread().getName()); System.out.println("子线程4执行异常了:" + e.getMessage()); return -1; }); // 两个任务完成其中任意一个任务,便开始执行,带有任意一个执行完毕参数信息 future1.acceptEitherAsync(future2, (res) -> { System.out.println("新线程:" + Thread.currentThread().getName()); System.out.println("获取任意结果:" + res); }, executor); } }
执行结果:
-
applyToEitherAsync 两个任务完成其中任意一个任务,便开始执行,并且获取返回结果
package cn.git.future; import java.util.concurrent.*; /** * @description: 两个任务完成其中任意一个任务 * @program: bank-credit-sy * @author: lixuchun * @create: 2024-08-07 */ public class CompletableFutureDemo10 { static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) throws ExecutionException, InterruptedException { // 没有返回结果,串行执行任务 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程1执行了:" + Thread.currentThread().getName()); return 50; }, executor); // 串行执行任务 CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("子线程3执行了:" + Thread.currentThread().getName()); return 100; }, executor).exceptionally((e) -> { // 线程3执行结果作为线程4入参参数 System.out.println("子线程4执行了:" + Thread.currentThread().getName()); System.out.println("子线程4执行异常了:" + e.getMessage()); return -1; }); // 两个任务完成其中任意一个任务,便开始执行,并且获取返回结果 final CompletableFuture<Integer> future3 = future1.applyToEitherAsync(future2, (res) -> { System.out.println("子线程2执行了:" + Thread.currentThread().getName()); System.out.println("子线程2执行结果:" + res); return res; }, executor); System.out.println("future3 = " + future3.get()); } }
执行结果:
-
allOf / anyOf 多任务组合处理
package cn.git.future; import java.util.concurrent.*; /** * @description: 多任务组合处理 * @program: bank-credit-sy * @author: lixuchun * @create: 2024-08-07 */ public class CompletableFutureDemo11 { static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) throws ExecutionException, InterruptedException { // 没有返回结果,串行执行任务 CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("子线程1执行了:" + Thread.currentThread().getName()); return 50; }, executor); // 串行执行任务 CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("子线程3执行了:" + Thread.currentThread().getName()); return 100; }, executor).exceptionally((e) -> { // 线程3执行结果作为线程4入参参数 System.out.println("子线程4执行了:" + Thread.currentThread().getName()); System.out.println("子线程4执行异常了:" + e.getMessage()); return -1; }); // 串行执行任务 CompletableFuture<Integer> future3 = CompletableFuture.supplyAsync(() -> { System.out.println("子线程5执行了:" + Thread.currentThread().getName()); return 200; }, executor).exceptionally((e) -> { // 线程3执行结果作为线程4入参参数 System.out.println("子线程6执行了:" + Thread.currentThread().getName()); System.out.println("子线程6执行异常了:" + e.getMessage()); return -1; }); // 等待三个任务执行完成 才开始进行后续处理操作,需要進行阻塞操作 final CompletableFuture<Void> finalFuture = CompletableFuture.allOf(future1, future2, future3); finalFuture.get(); // 任意一个任务执行完成 才开始进行后续处理操作,需要進行阻塞操作 final CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2, future3); finalFuture.get(); System.out.println("三个任务全部执行完毕啦。。。。。。"); } }
执行结果:
-
自定义循环异步调用例子
package cn.git.future; import cn.git.entity.Person; import java.util.Arrays; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; /** * @description: 自定义循环异步调用例子 * @program: bank-credit-sy * @author: lixuchun * @create: 2024-08-07 */ public class TestDemo { static ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); public static void main(String[] args) throws ExecutionException, InterruptedException { // 设定参数信息,循环多少次也是入参参数 int [] params = {1,2,3,4,5,6,7,8,9,10}; CompletableFuture<Person>[] futures = IntStream.of(params) .mapToObj(param -> { CompletableFuture<Person> future = CompletableFuture.supplyAsync(() -> { if (param == 5) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("线程:" + Thread.currentThread().getName() + "正在执行任务:" + param); Person person = new Person(); person.setSex("男"); person.setAge(param); person.setName("张三"); return person; }, executor); return future; }).toArray(CompletableFuture[]::new); // 等待所有CompletableFuture完成 CompletableFuture.allOf(futures).join(); // 获取所有结果 Object[] results = Stream.of(futures) .map(future -> { try { return future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); return null; } }).toArray(); // 打印结果 System.out.println(Arrays.stream(results).map(Object::toString).collect(Collectors.joining("\n"))); } }
执行结果:
2.3 结果等待与获取
在执行线程中将任务放到工作线程中进行处理的时候,执行线程与工作线程之间是异步执行的模式,如果执行线程需要获取到共工作线程的执行结果,则可以通过get
或者join
方法,阻塞等待并从CompletableFuture
中获取对应的值。
对get和join的方法功能含义说明归纳如下:
方法名称 | 作用描述 |
---|---|
get() | 等待CompletableFuture 执行完成并获取其具体执行结果,可能会抛出异常,需要代码调用的地方手动try…catch进行处理。 |
get(long, TimeUnit) | 与get() 相同,只是允许设定阻塞等待超时时间,如果等待超过设定时间,则会抛出异常终止阻塞等待。 |
join() | 等待CompletableFuture 执行完成并获取其具体执行结果,可能会抛出运行时异常,无需代码调用的地方手动try…catch进行处理。 |
从介绍上可以看出,两者的区别就在于是否需要调用方显式的进行try…catch处理逻辑,使用代码示例如下:
public void testGetAndJoin(String product) {
// join无需显式try...catch...
PriceResult joinResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
.join();
try {
// get显式try...catch...
PriceResult getResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
.get(5L, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}