引言
CompletableFuture 是 Java 8 引入的一个强大的异步编程工具,用于处理异步操作和处理结果。它实现了 Future 和 CompletionStage 接口,提供了丰富的方法来处理异步任务的完成、组合和异常处理。
CompletableFuture本质是对异步线程的返回值的处理,所以要有线程池和对异步结果的处理
方法使用
使用 runAsync 执行无返回值的异步任务
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureExample {
public static void main(String[] args) {
// 创建一个线程池
ExecutorService executor = Executors.newSingleThreadExecutor();
// 创建一个无返回值的异步任务
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
System.out.println("异步任务执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executor);
// 主线程继续执行其他任务
System.out.println("主线程继续执行");
// 等待异步任务完成
future.join();
// 关闭线程池
executor.shutdown();
}
}
使用 supplyAsync 执行有返回值的异步任务
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureSupplyExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
// 创建一个有返回值的异步任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
return "异步任务的结果";
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
}, executor);
System.out.println("主线程继续执行");
// 获取异步任务的结果
String result = future.join();
System.out.println("异步任务的结果是: " + result);
executor.shutdown();
}
}
和runAsync区别是join方法有没有返回结果
使用 thenApply 对结果进行转换
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThenApplyExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
return 10;
} catch (InterruptedException e) {
e.printStackTrace();
return 0;
}
}, executor).thenApply(result -> result * 2);
Integer finalResult = future.join();
System.out.println("最终结果是: " + finalResult);
executor.shutdown();
}
}
使用 thenAccept 消费结果
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThenAcceptExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
return 20;
} catch (InterruptedException e) {
e.printStackTrace();
return 0;
}
}, executor).thenAccept(result -> System.out.println("消费结果: " + result));
future.join();
executor.shutdown();
}
}
源码解读
CompletableFuture 内部有一个 volatile 类型的 state 变量来表示任务的状态,常见的状态有:
- NEW:初始状态,表示任务还未开始执行。
- COMPLETING:正在完成状态,表示任务正在执行完成操作。
- NORMAL:正常完成状态,表示任务正常执行并返回结果。
- EXCEPTIONAL:异常完成状态,表示任务执行过程中抛出了异常。
runAsync
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
private static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
e.execute(new AsyncRun(d, f));
return d;
}
runAsync 方法接收一个 Runnable 任务和一个 Executor 线程池,它会创建一个新的 CompletableFuture 对象,并将任务封装成 AsyncRun 对象提交给线程池执行。
supplyAsync 方法源码
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
private static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
supplyAsync 方法与 runAsync 类似,只不过它接收的是一个 Supplier 任务,该任务有返回值。它会创建一个新的 CompletableFuture 对象,并将任务封装成 AsyncSupply 对象提交给线程池执行。
thenApply 方法源码
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
if (e != null || !d.uniApply(this, f, null)) {
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
thenApply 方法源码thenApply 方法接收一个 Function 函数,用于对 CompletableFuture 的结果进行转换。它会创建一个新的 CompletableFuture 对象,并将转换操作封装成 UniApply 对象。如果当前任务已经完成,则直接执行转换操作;否则,将 UniApply 对象添加到等待队列中,等待当前任务完成后再执行转换操作。