CompletableFuture
- 一、介绍
- 1、概述
- 2、常用方法
- 二、方法使用
- 1、异步操作
- 1.1、创建任务(runAsync | supplyAsync)
- runAsync
- supplyAsync
- 1.2、获取结果(get | join)
- 1.3、异常处理(whenComplete | exceptionally)
- whenComplete
- exceptionally
- 2、场景使用
- 2.1、结果转换(thenApply | thenCompose)
- thenApply
- thenCompose
- 2.2、结果消费(thenAccept | thenAcceptBoth | thenRun)
- thenAccept
- thenAcceptBothAsync
- thenRun
- 2.3、任务组合(thenCombine)
- thenCombine
- 2.4、任务对比(applyToEither | acceptEither | runAfterEither)
- applyToEither
- acceptEither
- runAfterEither
- 2.4、批量处理任务(allOf | anyOf)
- allOf
- anyOf
一、介绍
1、概述
CompletableFuture
是 Java 8 引入的一种异步编程工具,位于 java.util.concurrent
包中。它提供了一种方便的方式来执行异步任务,并在任务完成时执行相应的操作。
CompletableFuture
是对Future
的扩展和增强;CompletableFuture
是一种可完成或失败的Future
。它提供了一种通用的机制来处理异步操作的结果,包括处理异步任务的完成、异常处理和组合多个异步任务的结果。- 并且通过实现CompletionStage实现了对任务编排的能力,执行某一阶段,可以向下执行后续阶段。
- 异步执行的时候,如果未定义线程池,默认线程池是
ForkJoinPool.commonPool()
,但一般实际开发中还是尽量自己定义线程池,避免出现因为默认线程池线程被占满造成阻塞问题。
2、常用方法
CompletableFuture
的方法中不以Async
结尾,意味着Action使用相同的线程执行;而Async
可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
方法名 | 描述 | 返回值 | 静态方法 |
---|---|---|---|
supplyAsync | 异步执行一个有返回值的任务CompletableFuture | 有 | 是 |
runAsync | 异步执行一个无返回值的任务CompletableFuture | 无 | 是 |
join | 获取CompletableFuture异步之后的返回值,抛出的是未经检测异常 | 有 | 否 |
get | 获取CompletableFuture异步之后的返回值,抛出的是经过检查的异常,需要手动处理(抛出或者 try catch) | 有 | 否 |
whenComplete | 用于在 CompletableFuture 完成(正常或异常完成)后执行指定的操作 | 有 | 否 |
exceptionally | 当CompletableFuture任务发生异常时执行,不发生异常时不执行 | 有 | 否 |
thenApply | 当前任务完成时,执行一个函数并返回新的CompletableFuture | 有 | 否 |
thenCompose | 当前任务完成时,执行一个函数并返回新的CompletableFuture(同thenApply入参不同) | 有 | 否 |
thenAccept | 当前一个任务完成时,执行一个消费者函数,没有返回值 | 无 | 否 |
thenAcceptBoth | 当两个CompletableFutures都执行完毕后,执行指定的操作,没有返回值 | 无 | 否 |
thenRun | 当上一阶段任务执行完成后异步执行一个不带任何输入参数的任务; | 无 | 否 |
thenCombine | 组合两个CompletableFuture的结果,返回新的CompletableFuture | 有 | 否 |
applyToEither | 对比并向处理的两个任务哪个先执行完就采用哪一个任务的结果进行处理并返回新的结果 | 有 | 否 |
acceptEither | 对比并向处理的两个任务哪个先执行完就采用哪一个任务的结果进行处理直接消费结果,不会返回新的结果 | 无 | 否 |
runAfterEither | 对比并向处理的两个任务哪个先执行完,直接进行下一步操作,不关心上一步的运行结果也无返回 | 无 | 否 |
allOf | 接收future任务对象的数组,当全部任务执行完成后,返回一个新的任务 | 无 | 是 |
anyOf | 接收future任务对象的数组,当有一个任务执行完成后,返回该任务 | 有 | 是 |
这些方法用于创建、组合和处理CompletableFuture实例,以实现异步编程。每个方法都具有不同的功能和用途,可以根据具体的需求选择适合的方法来构建
二、方法使用
1、异步操作
1.1、创建任务(runAsync | supplyAsync)
runAsync
runAsync()
无返回值,参数为一个 Runnable函数类型,可以指定线程池;如果没有指定会内部使用默认的线程池ForkJoinPool.commonPool()
Runnable runnable = () -> {
System.out.println("无返回值的异步任务"+Thread.currentThread().getName());
};
// 非简写方式
CompletableFuture.runAsync(runnable).join();
supplyAsync
supplyAsync
有返回值,其他的使用和runAsync()基本一致,返回时可以指定类型
// 简写方式
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("有返回值的异步任务"+Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello World!";
}, Executors.newFixedThreadPool(1)); //指定线程池
String join = future.join();
System.out.println(Thread.currentThread().getName()+"结果"+join);
1.2、获取结果(get | join)
get()和join都是用于获取CompletableFuture异步之后的返回值,join()方法抛出的是uncheck异常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException, InterruptedException 需要用户手动处理(抛出或者 try catch)
1.3、异常处理(whenComplete | exceptionally)
whenComplete
用于在
CompletableFuture
完成(正常或异常完成)后执行指定的操作; 用于在CompletableFuture
完成时执行一些附加操作的有用工具,无论是成功完成还是出现异常,(相当于finally部分) ,异常和结果之会返回一个,另外一个会为null。
- 场景:记录日志、清理资源
- 代码实现
CompletableFuture.supplyAsync(() -> {
System.out.println("无返回值的异步任务线程===="+Thread.currentThread().getName());
return "----德玛西亚!-----";
}).whenComplete((u, e) -> { //(u:没有异常时返回的结果,e:有异常时返回的结果)
System.out.println("future线程======" + Thread.currentThread().getName());
System.out.println("执行结果:"+u);
System.out.println("异常输出:" + e);
}).join();
- 日志输出
exceptionally
exceptionally可用于搭配一起使用:只有当发生异常才执行exceptionally里的任务(相当于catch部分)
- 代码实现:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer i= 10/0; //模拟一个异常
return "Hello World!";
}).whenCompleteAsync((u, e) -> { //(u:没有异常时返回的结果,e:有异常时返回的结果)
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println("执行结果:"+u);
System.out.println("异常输出:" + e);
}).exceptionally(t -> { //进行异常处理,无异常时不会执行下面的处理
System.out.println("执行失败进行异常处理!");
return "异常XXXX--" + t;
});
System.out.println("结果:"+future.join());
- 日志输出:
- 有异常时输出:
- 无异常时输出:
2、场景使用
2.1、结果转换(thenApply | thenCompose)
thenApply
和thenCompose
都是用于链式操作,只是入参方式不同而已。
作用:一般用于处理一些需要异步处理的结果该方法不会改变Future的结果类型,执行方式就比如流水线一样,一步一步往下执行,用法和stream流一样,用于做一些中间操作;
比如:你可以将一个字符串结果转换为大写或小写,或者对数字进行一些计算。如: future1->future2->future3
thenApply
thenApply
: 接收一个函数作为参数,使用该函数处理上一个CompletableFuture
调用的结果,并返回一个具有处理结果的Future对象
- 代码实现:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int result = new Random().nextInt(5) ;
try {
TimeUnit.SECONDS.sleep(result);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("建立人物模型,耗时:"+result);
return result;
}, Executors.newFixedThreadPool(5));
// thenApply: 接收一个函数作为参数,使用该函数处理上一个CompletableFuture调用的结果,并返回一个具有处理结果的Future对象
CompletableFuture<Integer> future2 = future1.thenApply(number -> { //把future1的结果作为入参
int result = new Random().nextInt(5) ;
try {
TimeUnit.SECONDS.sleep(result);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("加载模型骨骼,耗时:"+result);
return result+number;
});
System.out.println("全部加载完成,耗时:"+future2.join()+"y");
- 日志输出:
thenCompose
thenCompose
:的参数为一个返回CompletableFuture
实例的函数(而这个实例的参数是先前计算步骤的结果),其他的使用上和thenApply
没有区别。
- 代码实现:
// 拼接前面thenApply部分的代码
CompletableFuture<Integer> future3 = future2.thenCompose(param ->
CompletableFuture.supplyAsync(() -> {
int result = new Random().nextInt(5) ;
try {
TimeUnit.SECONDS.sleep(result);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("加载模型皮肤,耗时:"+result);
return result+param;
})
);
不简写的方式
// CompletableFuture<Integer> future4 = future2.thenCompose(new Function<Integer, CompletionStage<Integer>>() {
// @Override
// public CompletionStage<Integer> apply(Integer param) {
// return CompletableFuture.supplyAsync(new Supplier<Integer>() {
// @Override
// public Integer get() {
// int result = new Random().nextInt(5);
// try {
// TimeUnit.SECONDS.sleep(result);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("加载模型皮肤,耗时:" + result);
// return result + param;
// }
// });
// }
// });
System.out.println("全部加载完成,耗时:"+future3.join()+"y");
- 日志输出:
2.2、结果消费(thenAccept | thenAcceptBoth | thenRun)
thenAccept
thenAccept
用于异步处理上一个Future任务的结果,并且会把结果消费掉,无返回值
作用:比如完成一些操作需要一个前置的条件,并且通过异步执行不需要阻塞住当前线程,可以通过异步同时处理多个任务
public static void main(String[] args) {
// 消费之前,有返回Integer
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int number = new Random().nextInt(10);
System.out.println("加载商城页面" + number);
return number;
});
System.out.println("商城页面加载成功:==="+future1.join()+"===继续加载商城内置页面!");
CompletableFuture<Void> future2 = future1.thenAcceptAsync(number -> { //结果消费掉之后会没有返回值
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int n2 = number * 2;
System.out.println("加载商城英雄页面:" + n2);
});
CompletableFuture<Void> future3 = future1.thenAcceptAsync(number -> { //结果消费掉之后会没有返回值
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int n2 = number * 3;
System.out.println("加载商城皮肤页面:" + n2);
});
future2.join();
future3.join();
System.out.println("商城内置页面也加载成功:");
}
thenAcceptBothAsync
thenAcceptBothAsync
会消费两个future的结果,x当前的,y入参的,无返回值
作用:主要用于当需要两个前置任务完成后,再消费两个前置任务的结果去执行thenAcceptBoth的任务(需要注意的是该方法的前置任务的执行顺序是无法确认的)
- 代码实现:
// 消费之前,有返回Integer
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(10);
System.out.println("工商银行储蓄卡余额:" + number);
return number;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(10);
System.out.println("工商银行信用卡余额:" + number);
return number;
});
// thenAcceptBothAsync会消费两个future的结果,x当前的,y入参的,无返回值
// 作用:主要用于当需要两个前置任务完成后,
// 再消费两个前置任务的结果去执行thenAcceptBoth的任务(需要注意的是该方法的前置任务的执行顺序是无法确认的)
future1.thenAcceptBothAsync(future2, (x, y) -> //会消费掉两个前置任务的结果
System.out.println("工商银行总余额:" + (x + y))
);
- 日志输出:
thenRun
thenRun
主要在前任务执行完成后异步执行一个不带任何输入参数的任务;一般用于某些清理,日志记录,关闭资源、释放锁等
- 代码实现:
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
int number = new Random().nextInt(10);
System.out.println("初始值:" + number);
return number;
});
// thenRun:主要在前任务执行完成后异步执行一个不带任何输入参数的任务;一般用于某些清理,日志记录,关闭资源、释放锁等
future1.thenRun(()->{
System.out.println(Thread.currentThread().getName()+"执行run关闭资源");
});
- 日志输出:
2.3、任务组合(thenCombine)
thenCombine
thenCombine
:和thenAcceptBoth
使用类似,都是合并使用两个前置任务future的结果,进行最终处理,但thenCombine
会把组合处理然后返回一个新的值
场景:一般用于并行数据获取与组合和一些数据转换与组合
- 代码实现
int i = 10;
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(10) * i;
System.out.println("获取用户的信息:" + number);
return number;
});
CompletableFuture<Integer> future2 =CompletableFuture.supplyAsync(()->{
int number = new Random().nextInt(10) * i;
System.out.println("获取用户的订单信息:"+number);
return number;
});
//不简写写法
// CompletableFuture<Integer> result = future1
// .thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {
// @Override
// public Integer apply(Integer x, Integer y) {
// return x + y;
// }
// });
//
CompletableFuture<Integer> future3 = future1.thenCombine(future2, (x, y) -> {
Integer sum = x + y;
System.out.println("进行合并处理:" + sum);
return sum;
});
System.out.println("最终处理结果信息:"+future3.get());
- 日志输出:
2.4、任务对比(applyToEither | acceptEither | runAfterEither)
applyToEither
applyToEither
: 主要用于对比并向处理的两个任务哪个先执行完就采用哪一个任务的结果进行处理。
场景:假设我们需要从两个不同的远程服务器获取数),或者采用两种通讯协议(SSH、HTTP)获取数据,可以异步通过applyToEither
择优选择最快方式的得到结果进行处理
- 代码实现:
int i= 250;
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("使用银河计算机计算结果为:" + i+",耗时:"+number+"秒");
return number;
});
CompletableFuture<Integer> future2 =CompletableFuture.supplyAsync(()->{
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("使用长城计算机计算结果为:" + i+",耗时:"+number+"秒");
return number;
});
//applyToEither会得到最先执行完成的结果,进行处理,并返回一个新的结果
CompletableFuture<String> future3 = future1.applyToEither(future2, (x) -> {
System.out.println("结果计算成功" );
return "最终结果执行耗时:"+x ;
});
System.out.println(future3.join());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
- 日志输出:
acceptEither
acceptEitherAsync
:和applyToEither
意思和使用都一样,只不过acceptEither会把前置任务的结果直接消费掉,不会有返回值,
场景:适用于在两个异步操作之一完成时执行一些副作用的情况
- 代码实现:
int i= 10;
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("使用银河计算机计算结果为:" + i+",耗时:"+number+"秒");
return "银河计算机";
});
CompletableFuture<String> future2 =CompletableFuture.supplyAsync(()->{
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("使用长城计算机计算结果为:" + i+",耗时:"+number+"秒");
return "长城计算机";
});
// acceptEitherAsync:和applyToEither意思和使用都一样,只不过acceptEither会把前置任务的结果直接消费掉,不会有返回值
future1.acceptEitherAsync(future2, (x) -> {
System.out.println("最终处理结果采用:" + x);
}).join();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
- 日志输出:
runAfterEither
runAfterEither
:和前面俩一样,也是两个前置任务进行比较,看哪个先执行完成,就进行下一步操作,但runAfterEither
不会关心前置任务的处理结果(无入参),也没返回值
适用于在两个异步操作之一完成后执行一些无需依赖结果的操作。
- 代码实现:
int i= 10;
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("通过http传输数据:" + i+",耗时:"+number+"秒");
return number;
});
CompletableFuture<Integer> future2 =CompletableFuture.supplyAsync(()->{
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("通过web socket传输数据:" + i+",耗时:"+number+"秒");
return number;
});
//没有参数也没有返回值
future1.runAfterEither(future2,()->{
System.out.println("任务传输成功!");
}).join();
- 日志输出:
2.4、批量处理任务(allOf | anyOf)
allOf
allOf
无返回值,接收future任务对象的数组作为参数并且返回一个新的CompletableFuture
。这个新的
CompletableFuture<Void>
在所有传入的CompletableFuture
对象都完成后才会完成,无返回值。
场景:并行执行一组独立的异步任务,并在所有任务完成后进行下一步操作;等待多个异步操作完成后进行某种聚合操作等
-代码实现:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i+"写入mysql成功,耗时:"+number);
return "mysql";
});
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i+"写入sqlserver成功,耗时:"+number);
});
CompletableFuture.allOf(future1, future2).join();
System.out.println("写入数据库成功,执行后续。。。。");
- 日志输出:
anyOf
anyOf
是CompletableFuture
的静态方法,主要用于接收多个Future任务,然后从中选取到最先执行完的任务的结果进行使用;anyOf
是静态方法,并且可以接收多个Future任务;
场景:同样适用于当并发执行多个独立任务时,从中选取最快的结果进行使用的场景;或者是超时处理的场景,当你希望在一段时间内完成多个任务中的任何一个,并在超时发生时做一些特定操作时使用。
- 代码实现:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("乌龟到达终点,耗时:"+number+"秒");
return "乌龟";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("兔子到达终点,耗时:"+number+"秒");
return "兔子";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
int number = new Random().nextInt(5);
try {
TimeUnit.SECONDS.sleep(number);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("蜗牛到达终点,耗时:"+number+"秒");
return "蜗牛";
});
CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2 ,future3);
System.out.println("获得第一名是:"+result.join());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
- 日志输出:
—部分介绍引用 https://blog.csdn.net/sermonlizhi/article/details/