目录
CompletableFuture引言
CompletableFuture本质
CompletableFuture与Future的关系
CompletableFuture创建
同步方法
异步方法
CompletableFuture执行结果
一元依赖
二元依赖
多元依赖
CompletableFuture异常处理
CompletableFuture实现原理
Java 8中引入了CompletableFuture类,它是一种方便的异步编程工具,其可以对多数据源之间的请求的流程编排,可以处理各种异步操作,如网络请求、文件IO和数据库操作等。它是Java的Future接口的扩展,提供了一些有用的方法来创建、操作和组合异步操作。本文将详细介绍CompletableFuture的使用方式。
CompletableFuture引言
把Runnable理解为最基本的线程任务,只具备在线程下执行一段逻辑的能力。为了获取执行的返回值,创造了Callable和与其配合使用的Future。为了将任务之间进行逻辑编排,就诞生了CompletableFuture。关于如何理解任务的逻辑编排,举一个简单的例子:
某天上班,我们需要打开电脑,等待电脑启动的时候,但电脑却需要更新系统,可以去泡杯咖啡。这时我们发现打开电脑-更新系统有先后顺序,但和泡咖啡没有先后顺序。打开电脑-更新系统是串行,但泡咖啡就是并行,此时这就形成了任务编排的执行链。
在IO密集型系统中,类似的场景有很多。不同数据集的查询依赖主键不同,A数据集的查询主键是B数据集的一个字段这种情况很常见,通常还需要并发查询多个数据集的数据,所以对于多线程的执行编排是有需求的。
一种解决办法是CountDownLatch,让线程执行到某个地方后进行等待,直到依赖的任务执行结束。对于一些简单的执行链是可以满足的,但是当编排逻辑复杂起来,CountDownLatch会导致代码难以维护和调试。所以诞生了CompletableFuture用来描述和维护任务之间的依赖关系以进行任务编排。在实际应用中,有以下两类场景是适合使用任务编排的:
-
多数据源请求的流程编排
-
非阻塞化网关等NIO场景
CompletableFuture本质
CompletableFuture是Java中的一个异步编程工具,它提供了一种简洁而强大的方式来处理异步任务和操作的结果。CompletableFuture本质上是一个Future的扩展,同时结合了回调和函数式编程的思想。CompletableFuture的本质可以总结如下:
-
异步执行:CompletableFuture允许在后台线程中执行任务,以避免阻塞主线程。通过使用线程池或ForkJoinPool来管理任务的执行。
-
链式操作:CompletableFuture提供了一系列的方法(例如thenApply、thenAccept、thenCompose等),可以对异步操作进行链式组合,形成一个操作流水线。这些方法可以接收前一个操作的结果,并返回一个新的CompletableFuture对象。
-
异步结果处理:CompletableFuture提供了一系列的方法(例如join、get、whenComplete等)来获取异步操作的结果或处理异常。它们支持阻塞和非阻塞两种方式,可以根据实际需求选择合适的方法。
-
异常处理:CompletableFuture允许对异步操作中的异常进行处理,可以通过exceptionally、handle、whenComplete等方法来定义异常处理逻辑。
-
结果组合:CompletableFuture提供了一些静态方法(例如allOf、anyOf等)来组合多个CompletableFuture的结果。这些方法可以用于并行执行多个异步操作,并在所有操作完成或任意一个操作完成时获取结果。
-
可编程性:CompletableFuture利用了Java 8中引入的函数式编程特性,可以通过Lambda表达式或方法引用来定义异步操作和回调。
CompletableFuture本质上是一个基于Future的并发编程工具,它通过异步执行、链式操作和异常处理等特性,提供了一种简单而强大的方式来处理异步任务和操作的结果。它在Java中广泛应用于并发编程、网络编程、异步IO等场景。
CompletableFuture与Future的关系
CompletableFuture是对Future的功能扩展和增强,它提供了更多的方法来处理异步任务和操作的结果。可以说CompletableFuture是Future的增强版。CompletableFuture与Future之间的关系可以总结如下:
-
继承关系:CompletableFuture是Future的子接口,它继承了Future接口的方法,并扩展了更多的方法。
-
异步操作:Future接口表示一个异步计算的结果,提供了获取结果的方法(如get())和判断任务是否完成的方法(如isDone())。但是,Future的方法是阻塞的,这意味着调用get()方法会一直等待任务的完成,如果任务没有完成,会导致主线程被阻塞。
-
异步回调:CompletableFuture引入了回调的概念,通过一系列的方法(如thenApply、thenAccept、thenCompose等),可以将一个任务的结果传递给下一个任务,并在任务完成时触发相应的回调操作。这使得异步操作变得更加灵活和可编程。
-
异常处理:CompletableFuture提供了更丰富的异常处理机制,可以通过exceptionally、handle、whenComplete等方法来处理异步操作中的异常情况。而Future只能通过捕获InterruptedException和ExecutionException来处理异常。
-
链式操作:CompletableFuture支持链式操作,可以使用一系列的方法将多个异步任务串联起来,形成一个操作流水线。而Future则需要手动编写逻辑来处理多个异步任务的结果。
CompletableFuture创建
CompletableFuture提供了多种方法来创建CompletableFuture对象。大致分为同步创建和异步创建。
同步方法
使用CompletableFuture的构造方法创建CompletableFuture对象。
CompletableFuture<String> future = new CompletableFuture<>();
// 创建方式1
CompletableFuture<String> demo = new CompletableFuture<>();
demo.complete("success");
System.out.println(demo.get());
// 创建方式2
CompletableFuture<String> demo = CompletableFuture.completedFuture("success");
System.out.println(demo.get());
与FutureTask类似,CompletableFuture也通过get()方法获取执行结果。但是不同的是,CompletableFuture本身可以不承载可执行的任务(相比FutureTask则必须承载一个可执行的任务Callable),通过一个用于标记执行成功并设置返回值的函数,在使用上也更为灵活,即方式1。和Future类似,get()函数也是同步阻塞的,调用get函数后线程会阻塞直到调用complete方法标记任务已经执行成功。
除了手动触发任务的完成,也可以让创建对象的同时就标记任务完成。即方式2。
异步方法
相比于同步方法,异步执行更为常见。比如下面这个例子:
// 方式3,需要返回结果
CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> {
System.out.println("do something by thread" + Thread.currentThread().getName());
return "success";
});
System.out.println(demo.get());
// 方式4,需要返回结果
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> demo = CompletableFuture.supplyAsync(() -> {
System.out.println("do something by thread" + Thread.currentThread().getName());
return "success";
}, executor);
System.out.println(demo.get());
// 方式5,不需要返回结果
CompletableFuture.runAsync(() -> {
System.out.println("do something by thread" + Thread.currentThread().getName());
});
使用CompletableFuture.supplyAsync()方法创建异步执行的Supplier,Supplier中的代码会在异步线程中执行,代码执行完毕后,CompletableFuture将会得到执行结果。即方式3,方式4。方式3默认会使用ForkJoinPool的公共线程池来执行代码(不推荐),当然也可以指定线程池,即方式4。
使用CompletableFuture.runAsync()方法创建异步执行的Runnable,Runnable中的代码会在异步线程中执行,代码执行完毕后,CompletableFuture将会得到null作为执行结果。即方式5。
CompletableFuture执行结果
多任务编排是CompletableFuture的核心,这里列举不同的场景来进行说明
一元依赖
步骤2依赖步骤1,步骤1执行完毕才能执行步骤2,类似主线程的顺序执行
package com.common.demo.completableFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Evan Walker
* @version 1.0
* @desc 《一元依赖》步骤2依赖步骤1,步骤1执行完毕才能执行步骤2,类似主线程的顺序执行
* @date 2023/10/08 09:40:30
*/
public class Demo1 {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行【步骤1】");
return "【步骤1的执行结果】";
}, executor);
CompletableFuture<String> step2 = step1.thenApply(result -> {
System.out.println("上一步操作结果为:" + result);
return "【步骤2的执行结果】";
});
System.out.println("步骤2的执行结果:" + step2.get());
}
}
运行结果如下:
二元依赖
步骤1和步骤2是并行的,而步骤3需要等步骤1和步骤2执行完之后才能执行
package com.common.demo.completableFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Evan Walker
* @version 1.0
* @desc 《二元依赖》 步骤1和步骤2是并行的,而步骤3需要等步骤1和步骤2执行完之后才能执行
* @date 2023/10/08 09:45:17
*/
public class Demo2 {
public static void main(String[] args) throws Exception{
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行【步骤1】");
return "【步骤1的执行结果】";
}, executor);
CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行【步骤2】");
return "【步骤2的执行结果】";
}, executor);
CompletableFuture<String> step3 = step1.thenCombine(step2, (result1, result2) -> {
System.out.println("前两步操作结果分别为:" + result1 + result2);
return "【步骤3的执行结果】";
});
System.out.println("步骤3的执行结果:" + step3.get());
}
}
运行结果如下:
多元依赖
步骤N 需要依赖1-M的M个前置节点
package com.common.demo.completableFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Evan Walker
* @version 1.0
* @desc 《多元依赖》 步骤N 需要依赖1-M的M个前置节点
* @date 2023/10/08 09:50:21
*/
public class Demo3 {
public static void main(String[] args) throws Exception{
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行【步骤1】");
return "【步骤1的执行结果】";
}, executor);
CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行【步骤2】");
return "【步骤2的执行结果】";
}, executor);
CompletableFuture<String> step3 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行【步骤3】");
return "【步骤3的执行结果】";
}, executor);
CompletableFuture<Void> stepN = CompletableFuture.allOf(step1, step2, step3);
CompletableFuture<String> stepNResult = stepN.thenApply(res -> {
// 通过join函数获取返回值
String result1 = step1.join();
String result2 = step2.join();
String result3 = step3.join();
return result1 + result2 + result3;
});
System.out.println("步骤N的结果:" + stepNResult.get());
}
}
运行结果如下:
通过allOf函数声明当参数中的所有任务执行完毕后,才会执行下一步操作,但是要注意,allOf本身只是定义节点,真正阻塞的位置是thenApply函数。
和之前的方式不同,由于采用了不定变量,所以要通过CompletableFuture#join来获取每个任务的返回值。
除了allOf之外,如果我们需要任意一个任务完成后就执行下一步操作,可以使用anyOf方法
package com.common.demo.completableFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Evan Walker
* @version 1.0
* @desc 《多元依赖》 步骤N 需要依赖1-M的M个前置任意一个节点
* @date 2023/10/08 09:50:21
*/
public class Demo4 {
public static void main(String[] args) throws Exception{
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> step1 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行【步骤1】");
return "【步骤1的执行结果】";
}, executor);
CompletableFuture<String> step2 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行【步骤2】");
return "【步骤2的执行结果】";
}, executor);
CompletableFuture<String> step3 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行【步骤3】");
return "【步骤3的执行结果】";
}, executor);
CompletableFuture<Object> stepN = CompletableFuture.anyOf(step1, step2, step3);
System.out.println("步骤N的结果:" + stepN.get());
}
}
运行结果如下:
CompletableFuture异常处理
当CompletableFuture执行过程中出现异常时,我们需要使用exceptionally()方法来处理异常。
package com.common.demo.completableFuture;
import java.util.concurrent.CompletableFuture;
/**
* @author Evan Walker
* @version 1.0
* @desc
* @date 2023/10/08 10:00:32
*/
public class Demo5 {
public static void main(String[] args) throws Exception {
System.out.println("success:\t" + Demo5.exceptionally(6, 3).get());
System.out.println("exception:\t" + Demo5.exceptionally(6, 0).get());
}
public static CompletableFuture exceptionally(int a, int b) {
return CompletableFuture.supplyAsync(() -> a / b)
.exceptionally(ex -> {
System.out.println("exceptionally exception:\t" + ex.getMessage());
return 0;
});
}
}
在这个例子中,我们使用exceptionally()方法来处理CompletableFuture的异常。它接受一个Function函数,用于处理异常并返回一个默认值。
运行结果如下:
CompletableFuture实现原理
CompletableFuture的实现原理涉及到Java中的线程池、Future接口以及回调机制等。
-
线程池:CompletableFuture利用线程池来执行异步任务。在创建CompletableFuture对象时,可以使用Executors类提供的静态方法指定一个线程池,也可以使用ForkJoinPool来执行任务。通过线程池,CompletableFuture可以灵活地控制并发度和线程资源的分配。
-
Future接口:CompletableFuture是Future的实现类,它继承了Future接口,具备了获取异步操作结果、判断任务是否完成等基本功能。通过调用Future接口的方法,可以获得异步任务的执行结果,并对其进行处理。
-
回调机制:CompletableFuture引入了回调机制,通过一系列的方法(如thenApply、thenAccept、thenCompose等),可以将一个任务的结果传递给下一个任务,并在任务完成时触发相应的回调操作。这种链式的回调机制使得异步操作可以按照预期的顺序进行组合和处理。
-
异步任务执行过程:当调用CompletableFuture的某个方法创建异步任务时,任务会被提交给线程池进行执行。异步任务的执行过程中会将结果保存在CompletableFuture对象中,当任务完成时,可以通过CompletableFuture对象获取结果。
-
异常处理:CompletableFuture提供了更丰富的异常处理机制。可以通过exceptionally、handle、whenComplete等方法来处理异步操作中的异常情况。异常处理方法可以捕获到异步任务执行过程中抛出的异常,并定义相应的处理逻辑,保证程序的稳定性和健壮性。
总的来说,CompletableFuture在底层利用线程池、Future接口和回调机制等来实现异步操作。它通过链式的回调方式,使得异步任务的处理变得更加灵活和可编程,同时提供了更丰富的异常处理机制,方便开发者处理异步任务中的异常情况。这些特性使得CompletableFuture成为Java中处理异步编程的强大工具。
更多消息资讯,请访问昂焱数据(https://www.ayshuju.com)