Future接口理论知识复习
Future接口(FutureTask实现类)定义了操作异步任务执行的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
找到java.util.concurrent.Future
,看到里面定义的方法,这些方法就是我们需要关注的方法。
Future接口可以为主线程开一个分支任务,专门为主线程处理耗时和费力的复杂业务。
Future接口常用实现类FutureTask异步任务
Future接口能干什么
Future是Java5新加的一个接口,它提供了一个异步并行计算的功能,如果主线程需要执行一个很耗时的计算任务,我们就可以通过Future把这个任务放到异步线程中执行,主线程继续处理其他任务或者先行结束,再通过Future获取计算结果。
目的:异步多线程任务执行且返回有结果,三个特点:多线程、有返回、异步任务。
本源的Future接口相关架构
可以看到FutureTask实现了Runnable、Future接口,而且它的构造参数还支持传入Callable,所以FutureTask现在就具有多线程(Runnable)、有返回(Callable)、异步任务(Future)这3个特点了。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<>(new MyThread());// 将一个耗时的操作封装到FutureTask里
new Thread(futureTask, "threadName").start();// 启动一个子线程执行FutureTask
System.out.println(futureTask.get());// 获取FutureTask的返回值
}
}
class MyThread implements Callable<String> {
@Override
public String call() {
System.out.println("MyThread.call");
return "Hello Callable";
}
}
Future编码实战和优缺点分析
优点
Future结合线程池,可以显著提高程序的执行效率。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
public class FutureThreadPoolDemo {
/**
* 有3个任务,分别耗时500ms,300ms,300ms
* fun1():3个任务由主线程依次执行
* fun2():将3个任务放到线程池执行
*/
public static void main(String[] args) throws InterruptedException, ExecutionException {
fun1();
fun2();
}
private static void fun2() throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();
ExecutorService executorService = Executors.newFixedThreadPool(3);// 创建一个线程池
FutureTask<String> futureTask1 = new FutureTask<>(() -> {// 任务1
Thread.sleep(500);
return "task1 over";
});
executorService.submit(futureTask1);
FutureTask<String> futureTask2 = new FutureTask<>(() -> {// 任务2
Thread.sleep(300);
return "task2 over";
});
executorService.submit(futureTask2);
FutureTask<String> futureTask3 = new FutureTask<>(() -> {// 任务3
Thread.sleep(300);
return "task3 over";
});
executorService.submit(futureTask3);
// 这里获取返回值,主线程会阻塞等待直到拿到返回值,如果把获取结果注释掉,fun2()的执行时间会很短,因为主线程执行完毕,但是子线程依旧在跑
System.out.println(futureTask1.get());
System.out.println(futureTask2.get());
System.out.println(futureTask3.get());
long endTime = System.currentTimeMillis();
System.out.println("fun()2总耗时:" + (endTime - startTime));
executorService.shutdown();// 关闭线程池
}
private static void fun1() throws InterruptedException {
long startTime = System.currentTimeMillis();
Thread.sleep(500);// 任务1
Thread.sleep(300);// 任务2
Thread.sleep(300);// 任务3
long endTime = System.currentTimeMillis();
System.out.println("fun1()总耗时:" + (endTime - startTime));
}
}
缺点
get()阻塞
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> futureTask = new FutureTask<>(()->{
Thread.sleep(5000);
return "task over";
});
new Thread(futureTask).start();
System.out.println("主线程正在运行");
// System.out.println(futureTask.get());// 阻塞,直到获取到返回结果
System.out.println(futureTask.get(2000, TimeUnit.MILLISECONDS));// 只等待2000ms,超时未返回直接抛异常
}
}
get()
方法会有阻塞问题,导致主线程无法继续执行,另外提供了一个带参的get()
方法,超时自动放弃等待。
isDone()轮询
有时候,在get()
阻塞期间,我们希望看到进度或者提示信息,而不是一味地等待,可以将get()
改成轮询,在轮询方法里通过isDone()
判断任务是否执行完毕,只是多了一些提示信息。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeoutException;
public class FutureAPIDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
FutureTask<String> futureTask = new FutureTask<>(() -> {
Thread.sleep(5000);
return "task over";
});
new Thread(futureTask).start();
System.out.println("主线程正在运行");
// System.out.println(futureTask.get());// 阻塞,直到获取到返回结果
// System.out.println(futureTask.get(2000, TimeUnit.MILLISECONDS));// 只等待2000ms,超时未返回直接抛异常
while (true) {
if (futureTask.isDone()) {
System.out.println(futureTask.get());
break;
} else {
Thread.sleep(1000);
System.out.println("正在处理中,请稍等……");
}
}
}
}
这样的缺点就是:频繁的调用isDone()
方法,对CPU来说是浪费资源。
结论
Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式获取任务结果。
想完成一些复杂的任务
对于简单的业务场景,Future是完全可以胜任的。
回调通知
对于Future的完成时间,我们希望,完成之后可以通知主线程。
创建异步任务
Future结合线程池。
多个任务前后依赖可以组合处理(水煮鱼)
希望将多个异步任务的结果组合起来,后一个异步任务的计算需要前一个任务的值。
希望将多个异步计算合成成一个异步计算,这几个异步计算相互独立,同时,后一个的计算依赖前一个的计算结果。
对计算速度选最快
当Future集合中有多个任务的时候,处理最快的一个完成,返回第一个处理的结果。
CompletableFuture对Future的改进
CompletableFuture为什么会出现
Future中get()
方法和isDone()
方法都存在问题,对于真正的异步处理,我们希望可以通过回调函数,在Future结束之后,自动调用回调函数,这样就不用等待返回结果了。
阻塞的方式和异步编程的设计理念相违背,轮询会额外耗费CPU资源,因此JDK8设计出了CompletableFuture,它提供了一个类似观察者模式的机制,任务完成之后,通知监听的一方。
CompletableFuture和CompletionStage源码分别介绍
类架构说明
接口CompletionStage
- CompletionStage代表异步计算过程中的某个阶段,一个阶段完成后可能会触发另一个阶段,有些类似Linux系统管道分隔传参数
- 一个阶段的执行可以是一个Function,Consumer或者Runnable。比如:
stage.thenApply(x -> square(x)).thenAccept(x -> System.out.println(x)).thenRun(() -> System.out.println())
- 一个阶段的执行可能被单个阶段的完成触发,有可能是由多个阶段一起触发
类CompletableFuture
- 在Java8中,CompletableFuture提供了非常强大的Future扩展功能,可以帮助我们简化异步编程复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,提供了转换和组合CompletableFuture的方法
- 它可能代表一个明确完成的Future,也可能带一个完成阶段(CompletionStage),它支持在计算完成后触发一些函数或者执行某些动作
- 它实现了Future和CompletionStage接口
核心的四个静态方法,创建一个异步任务
runAsync:无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
supplyAsync:有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
上述Executor参数说明
没有指定线程池executor的时候,使用的是默认的ForkJoinPool.commonPool(),如果指定了线程池,则使用自定义的或指定的线程池。
Code
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureBuildDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
fun1();
System.out.println("----------------------------------------");
fun2();
System.out.println("----------------------------------------");
fun3();
System.out.println("----------------------------------------");
fun4();
}
private static void fun1() throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(completableFuture.get());
}
private static void fun2() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executorService);
System.out.println(completableFuture.get());
executorService.shutdown();
}
private static void fun3() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "CompletableFutureBuildDemo.fun3";
});
System.out.println(completableFuture.get());
}
private static void fun4() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "CompletableFutureBuildDemo.fun4";
}, executorService);
System.out.println(completableFuture.get());
executorService.shutdown();
}
}
通用演示,减少阻塞和轮询
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
public class CompletableFutureUserDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
int result = ThreadLocalRandom.current().nextInt();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}, executorService)
.whenComplete((v, e) -> {
if (e == null) {
System.out.println("计算完成,计算结果:" + v);
}
})
.exceptionally((e) -> {
e.printStackTrace();
System.out.println("出现异常:" + e.getCause() + "\t" + e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName() + "正在运行");
executorService.shutdown();
}
}
注意这里的一个坑,如果使用的是默认的线程池,主线程执行完毕后,CompletableFuture使用的默认线程池会立刻关闭,就会导致whenComplete
方法不能被执行到,所以这里,还是推荐使用自定义线程池。
如果在supplyAsync
方法中,出现了异常,也会走whenComplte
方法,而且也走execptionally
方法。
CompletableFuture的优点
- 异步任务结束时,自动调用某个对象的方法
- 主线程设置好回调后,不需要再关心异步任务的执行,异步任务之间可以顺序执行
- 异步任务出错时,会自动回调某个对象的方法
案例精讲-从电商网站的比价需求说开去
函数式编程已经主流
Lambda表达式、Stream流式调用、Chain链式调用、Java8函数式编程。
函数式编程:
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
}
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
}
@FunctionalInterface
public interface BiConsumer<T, U> {
void accept(T t, U u);
}
@FunctionalInterface
public interface Supplier<T> {
T get();
}
函数式接口名称 | 方法名称 | 参数个数 | 返回值 |
---|---|---|---|
Runnable | run | 0 | 无 |
Function | apply | 1 | 有 |
Consumer | accept | 1 | 无 |
BiConsumer | accept | 2 | 无 |
Supplier | get | 0 | 有 |
还有一个常用的函数式接口,这里视频并没有提及,它是Predicate
。
先说说join和get对比
CompletableFuture的get()
方法和join()
方法相比,作用是一样的,区别是get()
在编译阶段,会抛出checkedException,而join()
不会。
这里还学到一个Lombok的新知识:在类上添加这个注解:@Accessors(chain = true)// 开启链式写法
,可以开启对象的链式写法,比如student.setId(1).setName("xxx").setMajor("yyy");
,把原来竖着写的set方法,扭转成横着写了,算是个语法糖吧。
大厂业务需求说明
需求说明:
同一款产品,同时搜索出同款产品在各大电商平台的售价
同一款产品,同时搜索出本产品在同一电商平台下,各个入驻卖家售价
输出返回:
希望查询结果是这款产品在不同地方的价格清单列表,返回一个List<String>
解决方案:
一步一步的查询,最后汇总,效率上会慢
多线程异步任务同时查询 ,返回结果汇总,效率上很高
一波流Java8函数式编程带走-比价案例实战Case
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
public class CompletableFutureMallDmo {
static List<Mall> mallList = new ArrayList() {{
add(new Mall("jd", "mysql"));
add(new Mall("tb", "mysql"));
add(new Mall("dd", "mysql"));
}};
public static void main(String[] args) {
long begin1 = System.currentTimeMillis();
List<String> list1 = fun1(mallList, "mysql");
list1.forEach(System.out::println);
long end1 = System.currentTimeMillis();
System.out.println("fun1用时:" + (end1 - begin1));
System.out.println("---------------------------------------------");
long begin2 = System.currentTimeMillis();
List<String> list2 = fun2(mallList, "mysql");
list2.forEach(System.out::println);
long end2 = System.currentTimeMillis();
System.out.println("fun2用时:" + (end2 - begin2));
}
private static List<String> fun1(List<Mall> mallList, String productName) {
return mallList.stream().map(mall -> String.format("%s in %s price is %s", productName, mall.getMallName(), mall.calculatePrice())).collect(Collectors.toList());
}
private static List<String> fun2(List<Mall> mallList, String productName) {
return mallList.stream().map(mall -> CompletableFuture.supplyAsync(() -> String.format("%s in %s price is %s", productName, mall.getMallName(), mall.calculatePrice())))
.collect(Collectors.toList()).stream()// 这里collect一下,是为了让前一个stream流完成,这样stream流里的线程就可以开始运算,如果没有这一行,就起不到并行作用,因为stream有懒惰的特性,只有执行终端操作时候,才会真正执行运算
.map(CompletableFuture::join).collect(Collectors.toList());
}
}
@AllArgsConstructor
@NoArgsConstructor
@Data
@Accessors(chain = true)
class Mall {
private String mallName;
private String productName;
public Double calculatePrice() {
try {
Thread.sleep(1000);// 模拟查询时间
} catch (InterruptedException e) {
e.printStackTrace();
}
return ThreadLocalRandom.current().nextDouble();
}
}
CompletableFuture常用方法
获取结果和触发计算
获取结果
public T get()
public T get(long timeout, TimeUnit unit)
public T join()
public T getNow(T valueIfAbsent)
:如果在get的时候,还没有返回结果,就将valueIfAbsent的值作为返回值返回
主动触发计算
public boolean complete(T value)
:方法首先判断进程有没有执行完,如果没有执行完,进行打断,并将value赋值为线程的返回值(通过get()
或join()
获取),如果执行完了,就不需要打断,线程的返回值就是线程里正常的返回值。
关于complete()
方法的返回值,有点不好理解,如果线程是被complete()
方法触发结束的,返回true,如果线程在执行complete()
方法的时候,已经结束,返回false。
public static void main(String[] args) throws InterruptedException {
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
Thread.sleep(2000);
System.out.println(completableFuture.complete("default") + "\t" + completableFuture.join());
}
修改主线程里的sleep为1000,子线程里的sleep为2000,查看结果对比。
对计算结果进行处理
计算结果存在依赖关系,这两个线程串行化。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureAPIDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一步");
return 1;
}, executorService).thenApply(v -> {
System.out.println("第二步");
int a = 1 / 0;
return v + 1;
}).thenApply(v -> {
System.out.println("第三步");
return v + 1;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("最终结果为:" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});
executorService.shutdown();
System.out.println(completableFuture.join());
}
}
把thenApply
换成handle
,对比区别。可以看到,使用handle
后第二步报错的情况下,第三步依旧执行了,而且会带着异常参数,可以根据异常参数做一些判断处理,使用thenApply
的话,如果线程内部报错,后续的thenApply
就不会执行了。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureAPIDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("第一步");
return 1;
}, executorService).handle((v, e) -> {
System.out.println("第二步");
int a = 1 / 0;
return v + 1;
}).handle((v, e) -> {
System.out.println("第三步");
return v + 1;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("最终结果为:" + v);
}
}).exceptionally(e -> {
e.printStackTrace();
System.out.println(e.getMessage());
return null;
});
executorService.shutdown();
System.out.println(completableFuture.join());
}
}
对计算结果进行消费
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
把线程的运算结果消费掉,没有返回值。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureAPIDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}, executorService).thenAccept(System.out::println);
executorService.shutdown();
}
}
任务之间的执行顺序对比:
public CompletableFuture<Void> thenRun(Runnable action)
:任务A执行完执行任务B,任务B不需要任务A的结果,也无返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
:任务A执行完执行任务B,任务B需要任务A的结果,但是任务B无返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
:任务A执行完执行任务B,任务B需要任务A的结果,但是任务B有返回值
CompletableFuture和线程池的说明:
如果不指定线程池,默认使用ForkJoinPool,如果指定了线程池,使用指定的线程池,在调用then*()
方法的时候,还有一个then*Async()
方法,如果使用了then*Async()
方法,这个方法内的任务和这个方法后的任务都会使用ForkJoinPool来执行,除非你在then*Async()
方法里又传了自定义线程池。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureAPIDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println("任务1" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "";
}, executorService).thenRun(() -> {
System.out.println("任务2" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).thenRun(() -> {
System.out.println("任务3" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
executorService.shutdown();
}
}
如果线程内执行的太快,Thread.currentThread().getName()
的结果可能是main线程,比方说,把Thread.sleep()
去掉,就可以复现。
查看thenRunAsync()
方法,里面有一个asyncPool
变量,查看它的赋值过程,判断useCommonPool
,如果为真,使用ForkJoinPool,如果为假,新建一个线程池。useCommonPool
的值又取决于ForkJoinPool.getCommonPoolParallelism() > 1
,通过调试发现ForkJoinPool.getCommonPoolParallelism()
的值是7,所以默认情况下,使用的是ForkJoinPool。
对计算速度进行选用
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
应用场景:一个方法需要实现多个查询服务,多个服务之间相互独立,只要有一个能返回结果,自动放弃等待其他未执行完的查询。
又找了一个例子:我计划从A到B去,有1路车和2路车,都可以到达,而且它们路线一致,我坐哪个呢?哪个车先来坐哪个呗,这里比较的是等待时间,可以把等待时间抽象成程序处理时间,哪个快走哪个,而且放弃其他所有的,也是这个道理。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureAPIDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result1";
}, executorService);
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result2";
}, executorService);
CompletableFuture<String> completableFuture = completableFuture1.applyToEither(completableFuture2, v -> v + " is winner");
System.out.println(completableFuture.join());
executorService.shutdown();
}
}
对计算结果进行合并
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
两个CompletionStage任务都完成后,将两个任务的结果一起提交给thenCombine
来处理,先完成的任务需要等待另一个任务完成。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureAPIDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
}, executorService);
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
}, executorService);
CompletableFuture<Integer> completableFuture = completableFuture1.thenCombine(completableFuture2, (v1, v2) -> v1 * v2);
System.out.println(completableFuture.join());
executorService.shutdown();
}
}
看到这里,发现弹幕有人提到public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
和public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
老师没讲,我就自己搜索了下。
allOf
接收若干个CompletableFuture,当所有的CompletableFuture都完成后,才会执行返回CompletableFuture。
anyOf
接收若干个CompletableFuture,当任意一个任务执行完成,就返回CompletableFuture。