可以看到CompletableFuture实现了CompletionStage 和Future的两个接口。CompletionStage提供了任务之间的衔接能力,而Future则是经常用于阻塞获取结果。
CompletableFuture 的内部使用了基于 ForkJoinPool 的线程池,这种线程池可以高效地调度和执行任务。CompletableFuture 的非阻塞特性得益于其对任务完成的监听机制。当任务完成时,它会遍历所有注册的回调函数,并在合适的线程中执行这些回调。通过这种机制,CompletableFuture 能够在任务完成后及时返回结果或触发后续处理逻辑,而不会阻塞主线程的执行。
使用场景
场景一 并行执行多个任务
public class FutureDemo {
public static void main(String[] args) {
long start = System.currentTimeMillis();
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
task1();
});
CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> {
task2();
});
CompletableFuture<Void> f3 = CompletableFuture.runAsync(() -> {
task3();
});
CompletableFuture<Void> allOf = CompletableFuture.allOf(f1, f2, f3);
try {
// 所有任务一起并行
allOf.get();
System.out.println("all task finish cost:" + (System.currentTimeMillis() - start));
} catch (InterruptedException e) {
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
System.out.println("main thread finsh!!!");
}
public static void task1() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("task1 finish cost 3000!!!");
}
public static void task2() {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("task2 finish cost 6000!!!");
}
public static void task3() {
try {
Thread.sleep(9000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("task3 finish cost 9000!!!");
}
}
可以看到运行结果,多个任务并行执行,总任务完成时间是耗时最长的任务:
场景一 父任务执行完毕以后 开启多个子任务
public class FutureDemo {
public static void main(String[] args) {
long start = System.currentTimeMillis();
try {
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
task1();
});
// 任务一执行完毕
f1.get();
CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> {
task2();
});
CompletableFuture<Void> f3 = CompletableFuture.runAsync(() -> {
task3();
});
CompletableFuture<Void> allOf = CompletableFuture.allOf(f2, f3);
// 任务2,3并行
allOf.get();
System.out.println("all task finish cost:" + (System.currentTimeMillis() - start));
} catch (InterruptedException e) {
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
System.out.println("main thread finsh!!!");
}
public static void task1() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("task1 finish cost 3000!!!");
}
public static void task2() {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("task2 finish cost 6000!!!");
}
public static void task3() {
try {
Thread.sleep(9000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("task3 finish cost 9000!!!");
}
}
由于第一个任务阻塞执行完毕,2,3任务并行执行,时间最长的执行了9秒所以总时间是12左右。
场景三 场景一的变种
A B
C
我们希望A和C都执行完以后继续往后执行,并且C的执行顺序严格在B以后。
public class FutureDemo {
public static void main(String[] args) {
long start = System.currentTimeMillis();
try {
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
task1();
});
CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> {
task2();
});
CompletableFuture<Void> f3 = f2.thenRunAsync(() -> {
task3();
});
CompletableFuture<Void> allOf = CompletableFuture.allOf(f1, f3);
allOf.get();
System.out.println("all task finish cost:" + (System.currentTimeMillis() - start));
} catch (InterruptedException e) {
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
System.out.println("main thread finsh!!!");
}
public static void task1() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("task1 finish cost 3000!!!");
}
public static void task2() {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("task2 finish cost 6000!!!");
}
public static void task3() {
try {
Thread.sleep(9000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("task3 finish cost 9000!!!");
}
}
由于任务一是一个任务,而任务2和3是一起执行,所以相当于是两个任务并行,时间最长是任务2,3加在一起。
场景四 任取其一
这种场景什么时候使用呢?比如我们对数处理做了不同的策略,我们不知道哪个计算的速度更快,我们希望谁先出结果就优先使用。
public class FutureDemo {
public static void main(String[] args) {
long start = System.currentTimeMillis();
try {
CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
task1();
});
CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> {
task2();
});
CompletableFuture<Void> f3 = CompletableFuture.runAsync(() -> {
task3();
});
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(f1, f2, f3);
// 任务2,3并行
anyOf.get();
System.out.println("all task finish cost:" + (System.currentTimeMillis() - start));
} catch (InterruptedException e) {
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
System.out.println("main thread finsh!!!");
}
public static void task1() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("task1 finish cost 3000!!!");
}
public static void task2() {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("task2 finish cost 6000!!!");
}
public static void task3() {
try {
Thread.sleep(9000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("task3 finish cost 9000!!!");
}
}
可以看到这里肯定是耗时最短的任务1先执行完毕,所以 总耗时为3秒。
参考资料:
https://www.yuque.com/itlaoqi/uprrn9/nakraehf13w4xdy4