【JUC】- CompletableFuture详细学习
😄生命不息,写作不止
🔥 继续踏上学习之路,学之分享笔记
👊 总有一天我也能像各位大佬一样
🏆 博客首页 @怒放吧德德 To记录领地
🌝分享学习心得,欢迎指正,大家一起学习成长!
转发请携带作者信息 @怒放吧德德 @一个有梦有戏的人
文章目录
- 【JUC】- CompletableFuture详细学习
- 前言
- Future的不足
- 缺乏灵活的回调机制
- 无法处理异常
- 无法组合多个Future
- CompletableFuture的演化
- 提供非阻塞的回调机制
- 支持异常处理
- 支持任务组合
- CompletableFuture的诞生
- 架构说明
- CompletionStage
- 什么是CompletionStage
- CompletionStage的主要方法
- CompletableFuture
- CompletableFuture基础使用
- runAsync无返回值方法
- supplyAsync有返回值方法
- 使用创建异步线程
- whenComplete
- get与join
- CompletableFuture高级应用
- 对计算进行处理
- thenApply的用法
- handle的用法
- 对计算进行消费
- thenAccept的用法
- thenRun的用法
- 使用线程池执行异步任务
- 任务组
- 总结
前言
上篇文章,我们学习了Future的基本使用,以及其优缺点,然而其缺点是更加突出的,这也就在jdk8的时候就引申出CompletableFuture,这个类更能够很好的解决了异步编程来使性能提升。然而这是如何从Future演变到CompletableFuture呢?这就是我们这章将要学习的内容。
Future的不足
经过上章节的编码演练,我们很清楚的看到,如果是简单的业务,那么使用Future是能够胜任的。但是,在现实中,我们更多的业务场景呢并不是独立存在的,我们需要的是将多个异步任务计算结果结合起来,并且是后一个的异步任务的计算需要前一个异步任务的计算结果来支持。
缺乏灵活的回调机制
Future接口主要用于表示异步计算的结果,但它缺乏一种直接获取计算结果的回调机制。开发者通常需要调用get()方法阻塞线程,直到结果可用。这种同步阻塞方式降低了系统的并发性能和响应能力。
无法处理异常
在处理异步任务时,异常处理是一个不可忽视的问题。然而,Future接口没有提供一种优雅的方式来处理计算过程中可能发生的异常,开发者必须通过显式捕获和处理异常,增加了代码的复杂性。
无法组合多个Future
在实际应用中,经常需要组合多个异步任务。然而,Future接口并没有提供直接的支持来组合多个Future结果,这使得开发者必须手动协调多个Future的执行和结果处理,增加了代码的复杂性和出错概率。
CompletableFuture的演化
为了克服Future接口的这些不足,Java 8引入了CompletableFuture类。它不仅实现了Future接口,还提供了丰富的API用于构建和管理异步任务。
提供非阻塞的回调机制
CompletableFuture支持各种回调函数,例如thenApply、thenAccept和thenRun,这些方法允许开发者在任务完成后异步地处理结果,而不需要显式地调用阻塞的get()方法。
支持异常处理
CompletableFuture提供了exceptionally、handle等方法,允许开发者在计算过程中发生异常时进行处理,使得代码更加简洁和易读。
支持任务组合
通过thenCombine、thenCompose等方法,CompletableFuture支持将多个异步任务组合起来,形成更复杂的异步操作链,大大简化了代码的编写和维护。
:::info
Future提供的API不足以解决我们复杂的需求,处理起来不够优雅,由此就诞生了CompletableFuture。
:::
CompletableFuture的诞生
CompletableFuture
是在Java 8中引入的一个类,它实现了 Future
接口并提供了更多的功能。与 Future
的传统用法相比,CompletableFuture
提供了更多灵活性,特别是在构建异步编程流水线和组合异步任务方面。CompletableFuture提供了观察者模式类似的机制,可以让任务执行完毕之后通知监听者。
我们通过源码来观察CompletableFuture,通过以下三个步骤来一步一步学习CompletableFuture。
架构说明
首先看一下CompletableFuture这个类,
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
// ...
}
他实现了Future以及CompletionStage,他是FutureTask的增强版,也就是不仅是完成了Future有的内容,还进行了一些列的扩充。
这是三个类的类图。
CompletionStage
CompletionStage
是Java 8为处理异步编程引入的一个接口,它作为Future接口的一个补充,允许你明确地处理异步操作的完成情况,让你能更好地掌控异步操作的复杂流程。比较重要的是,CompletionStage提供了链式操作,能够更好地组合与管理多个异步操作。
什么是CompletionStage
在Java 8之前,异步编程通常使用java.util.concurrent.Future接口。然而,这个接口有很多的局限,比如不能直接把两个异步操作的结果直接关联起来,操作完成后也无法触发某个行为等。所以CompletionStage就是来解决这个问题的。
CompletionStage代表异步计算过程中的某个阶段,一个阶段的完成能够触发另一个阶段。
一个阶段的计算执行可以是一个Function,Consumer,Runnable。
一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发。
CompletionStage的主要方法
我们从类图中可以看到,CompletionStage接口提供了许许多多用于处理异步操作的方法。以下简单介绍几个:
- thenApply:用于对之前异步操作的结果进行转换。
- thenAccept:用操作结果进行一些消费操作,比如输出结果。
- thenCombine:用于把两个CompletionStage的结果合并。
- thenCompose:用于创建一个新的stage,它的值由之前stage的结果推演得出。
CompletableFuture
CompletableFuture是Java 8引入的一个类,是对Future的强化,实现了Future和CompletionStage接口。它可以帮助我们以异步的方式执行任务,并且提供了大量的方法来处理和控制这些任务的结果。
以下是一个使用CompletableFuture进行异步计算的简单例子,这里先简单了解一下。
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
}).thenApply(s -> s + " World");
System.out.println(future.get()); //输出 Hello World
}
}
在这个例子中,我们首先使用supplyAsync()来创建一个异步任务,然后使用thenApply()来对这个任务的结果进行处理。最后通过future.get()方法获取最终的结果。
CompletableFuture不仅仅是Future接口的一个实现,更重要的是,CompletableFuture提供了一种新的编程模型,让我们能够使用函数式的风格来更加简洁、优雅地进行异步编程。
CompletableFuture基础使用
接下来介绍如何获取CompletableFuture,最简单的方式就是使用构造方式 CompletableFuture completableFuture = new CompletableFuture();
,但是并不建议这么做,为什么呢?我们通过API文档看一下。
上面就已经写着,创建一个不完整的CompletableFuture对象。既然不建议使用这种方式,那我们应该使用哪种呢?在CompletableFuture类中,提供了四个静态方法,这四个方法能够创建对象,先看一下API文档的描述。
分别是两个runAsync与两个supplyAsync方法,接下来就对这四个方法进行解析。
runAsync无返回值方法
runAsync()是Java 8中CompletableFuture的一个重要方法,用于异步执行没有返回值的任务。这个方法接受一个实现了Runnable接口的对象作为参数,返回一个新的CompletableFuture。runAsync()的工作原理是:在新的线程中执行传入的Runnable任务,然后在任务完成后,返回的CompletableFuture也就完成了。
通过源码中可以看到,提供了两个runAsunc方法
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
我们先来看他们的返回结果的类型是Void
,Void类是一个不可实例化的占位符类,用于保存对表示Java关键字Void的class对象的引用。
然后是这个第一个方法,需要传来一个Runnable接口,这里在掉用创建对象的时候,有个asyncPool,这就是默认的线程池-ForkJoinPool,这个是自己携带的线程池。
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
接着是第二个方法,多了个Executor executor参数,CompletableFuture是提供了可以让用户使用自己的线程池。
最后就是实例化对象了
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
e.execute(new AsyncRun(d, f));
return d;
}
supplyAsync有返回值方法
supplyAsync()方法接收一个实现了Supplier接口的对象作为参数,这个Supplier对象提供了一个get()方法用以生成结果。然后,supplyAsync()以异步的方式执行这个Supplier任务,并返回一个CompletableFuture对象,这个CompletableFuture对象以后可以用来获取任务的执行结果。
通过源码来看,supplyAsync也是提供了两种方法,区别也是可以传入自己定义的线程池,而不用默认的ForkJoinPool。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
最后创建对象的方法,具体与runAsync是大差不差的。就是这里的传参与runAsync不同的是,不再是传入Runnable接口,而是Supplier接口。
在Java中,Supplier是一个函数式接口,主要被用来代表一个无参数的函数,返回一个结果。Supplier接口在Java 8中被引入,主要为了支持Java的函数式编程。
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
使用创建异步线程
首先,我们用一个简单的例子来测试以下runAsync方法。
public class CompletionFutureBuildDemo {
@SneakyThrows
public static void main(String[] args) {
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " - 执行任务");
// 模拟一个长时间的计算任务
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 打印获取结果
System.out.println(completableFuture.get());
}
}
我们先传入一个Runnable接口,并且打印了线程名,以及睡眠1s来表示计算的时间,我们知道返回的是Void
,是无返回值的,我们通过get方法来查看是什么结果。
ForkJoinPool.commonPool-worker-25 - 执行任务
null
通过执行后,没有指定线程池的时候,就会默认使用ForkJoinPool线程池,并且无返回值,因此get得到的数据将会是null。
接着我们使用携带线程池的方法:
public class CompletionFutureBuildDemo {
@SneakyThrows
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " - 执行任务");
// 模拟一个长时间的计算任务
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, threadPool);
// 打印获取结果
System.out.println(completableFuture.get());
threadPool.shutdown();
}
}
我们就能够看到所输出的结果,用到了创建的线程池。
pool-1-thread-1 - 执行任务
null
接下来我们使用有返回值的supplyAsync方法。
也是和runAsync方法一样,有两个方法,一个携带自己定义的线程池,这里就直接使用携带创建的线程池的方法来创建异步任务。
@SneakyThrows
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " - 执行任务");
// 模拟一个长时间的计算任务
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "获得结果";
}, threadPool);
System.out.println(async.get());
threadPool.shutdown();
}
运行一下,就能看到得到了异步线程执行后的返回值。
pool-1-thread-1 - 执行任务
获得结果
whenComplete
异步线程创建就如上代码,那么如果我们是下个任务依赖上个异步任务呢?这又要怎么办呢?在CompletionFuture中,有个方法whenComplete,这个方法代表当第一个异步任务执行完毕,将执行这个whenComplete后面的任务,并且可以使用到任务一的结果。
我们看一下源码
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
里面是传入BiConsumer,他是个消费型函数接口,通过给定参数进行一系列的操作,但是没有返回值。
在Java中,BiConsumer是一个函数式接口,它代表了一个接受两个输入参数并且没有返回类型的方法。也就是说,它代表了可以接受两个参数并且进行某种操作,但不需要返回结果的行为。
这里我们通过案例来了解
public class CompletionFutureDemo {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println("任务一正在执行...");
int i = ThreadLocalRandom.current().nextInt();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("任务一执行完毕");
return i;
}, pool).whenComplete((v, e) -> {
if (e == null) {
System.out.println("没有异常,获得到任务一的返回值: " + v);
}
}).exceptionally((e) -> {
e.printStackTrace();
System.out.println("异常情况:" + e.getCause() + " - - - " + e.getMessage());
return null;
});
System.out.println("主线程继续执行其他方法...");
pool.shutdown();
}
}
简单啰嗦一下上面代码,首先是一样的使用supplyAsync创建异步任务,这里模拟两秒获得随即数,然后完成之后去做第二个任务,知道第二个任务也做完了就结束。
接着详细看一下,首先是whenComplete方法,如下图可以看到,需要的是一个携带参数的函数接口,这里需要携带两个,一个是任务一返回的结果,第二个是出现的异常。
获得之后,通过判断异常是否为空,如果为空就是没有异常,那就直接输出结果。那如果发生异常呢?那就跟如下图一样,需要对异常进行处理。
这里需要传入的参数就是异常,这里就是简单输出异常信息,并且返回null。我们看一下输出的结果:
任务一正在执行...
主线程继续执行其他方法...
任务一执行完毕
没有异常,获得到任务一的返回值: 729194893
注:这里使用了线程池,如果不使用线程池就会导致没有输出后面的内容,这就是因为主线程执行太快了已经结束了,导致守护线程也结束了。
当我们在获取随机数之后执行一段异常代码int n = 1 / 0
,我们再来看看最终的结果。
很明显就走了异常处理的方法,但是主线程还是继续执行。
get与join
接下来看看get与join方法有什么不同,这两个方法都是为了获取返回值。
我们通过源码来简单了解:
get():
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
join():
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}
两个的区别就是get需要抛出异常,而join不需要抛出异常。
转发请携带作者信息 @怒放吧德德 @一个有梦有戏的人
CompletableFuture高级应用
对计算进行处理
计算结果存在依赖,线程是串行化,通过上个任务执行完毕的回调给下个任务结果进行处理。
thenApply的用法
thenApply是能够获取上一个任务的执行结果,并且进行接下来的处理,只需要提供一个参数,这个参数就是上个任务执行的结果,如以下例子:
public class ThenApplyDemo {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println("计算第一步");
return 1;
}, pool).thenApply((v) -> {
System.out.println("计算第二步");
return v + 1;
}).thenApply(v -> {
System.out.println("计算第三步");
return v + 1;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("经过三个步骤计算出结果: " + v);
}
}).exceptionally(e -> {
System.out.println("出现异常" + e);
return null;
});
System.out.println("执行其他业务...");
pool.shutdown();
}
}
执行完成之后我们可以从控制台看到得到的结果是计算后的结果:3,whenComplete是为了等任务执行之后进行输出,具体用法本文上部分已经介绍了。
计算第一步
计算第二步
计算第三步
经过三个步骤计算出结果: 3
执行其他业务...
但是thenApply有个问题,就是当出现异常的时候,后续相关的任务将不会继续执行,我们如上例子,将第二步打印之前通过int i = 1/0;
抛出异常。
计算第一步
出现异常java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
执行其他业务...
得到的结果就是出现异常之后的任务不会继续执行,但是不会影响主线程的执行。
总的来说就是,thenApply的任务要等上个任务执行之后才会执行,也需要上个任务的返回值,并且需要返回结果。
handle的用法
使用thenApply会使出现异常之后的任务不在继续执行,那么使用handle就不会受这个限制,会继续将任务进行到底。
使用handle方法,是需要传递一个BiFunction,需要提供两个参数,一个是上个任务的结果,另一个是异常。
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println("计算第一步");
return 1;
}, pool).handle((v, e) -> {
System.out.println("计算第二步");
return v + 1;
}).handle((v, e) -> {
System.out.println("计算第三步");
return v + 1;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("经过三个步骤计算出结果: " + v);
}
}).exceptionally(e -> {
System.out.println("出现异常" + e);
return null;
});
System.out.println("执行其他业务...");
pool.shutdown();
}
以上是正常流程,输出和上文一样。
当我们一样在计算第二步之前抛出异常,就会得到以下结果:
计算第一步
计算第三步
出现异常java.util.concurrent.CompletionException: java.lang.NullPointerException
执行其他业务...
虽然会继续往下走,但是第三步获取到的数据就已经是null了。
对计算进行消费
接收任务的处理结果,并且消费处理,没有返回结果。
thenAccept的用法
对计算进行消费使用了thenAccept方法,通过下图,我们可以对thenApply与thenAccept进行对比,thenApply是个接受一个参数并产生结果的函数接口,而thenAccept是消费型函数接口,需要传递接口,并不需要返回值。
如下代码来看看案例
public class ThenAcceptDemo {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
return 1;
}, pool).thenApply(v -> {
return v + 100;
}).thenApply(v -> {
return v + 101;
}).thenAccept(a -> {
System.out.println("消费结果:" + a);
});
}
}
输出的结果就是1+100+101的结果202。
总的来说就是,thenAccept的任务要等上个任务执行之后才会执行,也需要上个任务的返回值,但是不需要返回任何结果。
thenRun的用法
这里简单介绍一下thenRun。thenRun方法让你能够在CompletableFuture完成后执行某一段代码或某个动作,但是它不关心CompletableFuture的结果。换句话说,它只是在Future完成后运行一个Runnable,并返回新的CompletableFuture。
通过他的定义
public CompletableFuture<Void> thenRun(Runnable action) {...}
他需要的是个Runnable接口,而这个接口是不需要任何参数,也不会返回任何结果。
以下是简单案例:
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenRun( () -> System.out.println("Computation finished.") );
说白了就是thenRun方法会等上一个任务执行完后执行此任务,但是不需要上个任务的结果,也不会返回任何结果,所以,当使用future.get()
,得到的将会是null。
使用线程池执行异步任务
在使用thenApply还是thenAccept等方法,CompletionStage接口都提供了Async的方法。
因为他们也是属于异步任务的一种,使用...Async
方法能够用新的线程来执行。我们以thenApply的来做比较。
CompletableFuture的thenApply和thenApplyAsync这两个方法看起来非常相似,它们都是用来处理CompletableFuture的结果并生成新的CompletableFuture。但它们在实现异步行为时,采用的线程模型是不同的。
- thenApply: thenApply方法中的函数是在原来的CompletableFuture完成后,在同一个线程中执行的。这意味着如果你的第一个操作完成后,立即对其返回的结果进行处理,那么处理结果的操作就会在同一个线程中执行。
- thenApplyAsync: thenApplyAsync方法中的函数是在一个新的线程中执行的,这个线程是由ForkJoinPool.commonPool()提供的,除非你显式提供了一个自定义的Executor。也就是说,你的结果处理函数将会在一个独立的线程中异步执行。
如果需要更高的并发性,或者如果结果处理函数可能需要执行长时间的操作,那么使用thenApplyAsync会更合适。否则,为了减少线程切换带来的开销,选择thenApply.
当线程混合使用thenApply与thenApplyAsync的时候,会发生一些情况。
1 如果一开始就没有使用自定义线程池,第二个任务是thenApplyAsync,那么两次都会是使用默认线程池。
2 如果第一个是使用了自定义线程池,那么第二个是thenApplyAsync的时候,第二个还是会使用默认线程池。
3 如果第一个使用了自定义线程池,第二个thenApply时候,将会和第一个一致的自定义线程池。
4 如果是以下情况
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " - 1");
return 1;
}, pool).thenApply(v -> {
System.out.println(Thread.currentThread().getName() + " - 2");
return 1+1;
}).thenApplyAsync(v -> {
System.out.println(Thread.currentThread().getName() + " - 3");
return 1+1;
})
输出的结果将会是第二个使用了main线程。
pool-1-thread-1 - 1
main - 2
ForkJoinPool.commonPool-worker-25 - 3
那么这是为什么呢?
这是因为处理太快了,系统优化切换原则,直接使用了main线程处理。
任务组
使用thenCompose组合多个异步任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1");
CompletableFuture<String> future2 = future1.thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " Task 2"));
System.out.println(future2.get());
使用thenCombine组合两个并行执行的任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2);
使用allof等待所有任务完成
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2");
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2);
allOf.thenRun(() -> {
System.out.println("All tasks completed.");
});
这是最常见的api,这个的处理最后会等待所有的异步任务都结束。
总结
本文详细介绍了Future的不足以及CompletableFuture的演化过程和使用方法。通过具体的案例分析,我们看到CompletableFuture在处理异步任务时提供了更灵活和强大的功能。它不仅解决了Future接口的局限性,还通过丰富的API极大地简化了异步编程的复杂度,使得代码更加简洁和易于维护。
通过掌握CompletableFuture的基本使用和高级用法,开发者可以更高效地处理并发任务,提高系统的响应性能和可靠性。在实际开发中,合理使用CompletableFuture,可以显著提升应用程序的并发处理能力和用户体验。
转发请携带作者信息 @怒放吧德德 @一个有梦有戏的人
持续创作很不容易,作者将以尽可能的详细把所学知识分享各位开发者,一起进步一起学习。
👍创作不易,如有错误请指正,感谢观看!记得点赞哦!👍
谢谢支持!