目录
1 前言
2 常用方法
3 测试
3.1 runAsync:无返回值 和 SupplyAsync:有返回值
3.2 串行执行
3.3 任务3等待等任务1和任务2都执行完毕后执行
3. 4 任务3等待等任务1或者任务2执行完毕后执行
3.5 handleAsync
3.6 多任务执行
1 前言
CompletableFuture 是对 Future 的扩展和增强。CompletableFuture 实现了Future和CompletionStage接口,其中最重要的是CompletionStage,它定义了CompletableFuture 实现任务编排的一些基本方法。只能说非常好用。
其继承结构如下图所示,在IDEA中使用ctrl + alt + U 实现
CompletionStage
接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。异步执行的,默认线程池是ForkJoinPool.commonPool()
,但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池。
2 常用方法
主要有同步和异步两大类,这里只讲异步方法。同步方法可以类比。
创建类型的方法
- runAsync:无返回值
- SupplyAsync:有返回值
中间处理类型的方法
- 任务与任务之间串行执行
- thenRunAsync:不能获取上一次的返回值,本次没有返回值。
- thenAccpetAsync:能获取上一次的返回值,但是本次没有返回值。
- thenApplyAsync:能获取上一次的返回值,本次有返回值。
- 任务3等待等任务1和任务2都执行完毕后执行
- runAfterBothAsync:不能获取任务1和任务2的返回值,本次没有返回值。
- thenAcceptBothAsync:能获取任务1和任务2的返回值,但是本次没有返回值。
- thenCombineAsync:能获取任务1和任务2的返回值,本次也有返回值。
- 任务3等待任务1或者任务2其中一个执行完毕、
- runAfterEitherAsync:不能获取任务1或任务2的返回值,本次没有返回值
- acceptEitherAsync:能获取任务1或任务2的返回值,本次没有返回值
- applyToEither:能获取任务1或任务2的返回值,本次有返回值
- 多任务
- allof:当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
- anyof:当任何一个给定的CompletableFuture完成时,返回一个新的CompletableFuture
结果处理类型的方法
- whenCompleteAsync:获取结果和异常,但是没有返回值
- exceptionally:在whenCompleteAsync后时候,获取异常并且有返回值
- handleAsync:相当于whenCompleteAsync和exceptionally的结合体。
3 测试
3.1 runAsync:无返回值 和 SupplyAsync:有返回值
- runAsync 相当于开启一个异步线程执行任务,主线程不等待结果返回。
- supplyAsync,开启一个异步线程执行任务,主线程要等待结果返回。
package cn.itcast.n6.c3;
import java.util.concurrent.*;
/**
* @author : msf
* @date : 2022/12/4
* completable 讲解。
*/
public class CompletableFutureD1 {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(
5,50,10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),Executors.defaultThreadFactory()
,new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程 start ...");
CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
},executor);
CompletableFuture<Integer> futureInt = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2子线程执行..." + Thread.currentThread().getName());
return 10;
}, executor);
System.out.println(futureInt.get());
System.out.println("主线程 end ...");
}
}
上述代码执行结果:
3.2 串行执行
- thenRunAsync:不能获取上一次的返回值,本次没有返回值。
- thenAccpetAsync:能获取上一次的返回值,但是本次没有返回值。
- thenApplyAsync:能获取上一次的返回值,本次有返回值
package cn.itcast.n6.c3;
import java.util.concurrent.*;
/**
* @author : msf
* @date : 2022/12/4
* completable 讲解--串行编排
*/
public class CompletableFutureD4 {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 50, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory()
, new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程 start ...");
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executor).thenRunAsync(()->{
// 等任务1 执行完毕后,任务2执行
System.out.println("任务2执行了" +Thread.currentThread().getName());
},executor);
CompletableFuture.supplyAsync(() -> {
System.out.println("任务3子线程执行..." + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
}, executor).thenAcceptAsync((t)->{
System.out.println("任务4子线程执行..." + Thread.currentThread().getName());
System.out.println("t = " + t);
},executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务5子线程执行..." + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
}, executor).thenApplyAsync((t) -> {
System.out.println("任务6子线程执行..." + Thread.currentThread().getName());
System.out.println("t = " + t);
return 120;
}, executor);
System.out.println("主线程 end ..." + future1.get());
}
}
上述代码执行结果:
3.3 任务3等待等任务1和任务2都执行完毕后执行
开启一个任务1,然后开启任务2,我们需要的是,当任务1和2都执行结束后任务3再启动。
场景:例如两个人从不同地方去吃饭,都有两个人都到了然后开始点餐。
package cn.itcast.n6.c3;
import java.util.concurrent.*;
/**
* @author : msf
* @date : 2022/12/4
* completable 讲解-- 两个一起完成,或者两者其中之一完成
*/
public class CompletableFutureD5 {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 50, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory()
, new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程 start ...");
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2子线程执行..." + Thread.currentThread().getName());
try {
Thread.sleep(600);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
}, executor);
future.runAfterBothAsync(future1, () -> {
System.out.println("任务3..." + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务3.... end " + Thread.currentThread().getName());
}, executor);
System.out.println("主线程 end ..." + future1.get());
}
}
上述代码执行结果:
3. 4 任务3等待等任务1或者任务2执行完毕后执行
场景:当两个人都出去吃饭,其中一个人到了可以先点餐。
package cn.itcast.n6.c3;
import java.util.concurrent.*;
/**
* @author : msf
* @date : 2022/12/4
* completable 讲解-- 两个一起完成,或者两者其中之一完成
*/
public class CompletableFutureD6 {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 50, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory()
, new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程 start ...");
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1子线程执行...end " + Thread.currentThread().getName());
}, executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2子线程执行..." + Thread.currentThread().getName());
try {
Thread.sleep(600);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2子线程执行...end " + Thread.currentThread().getName());
return 100;
}, executor);
future.runAfterEitherAsync(future1, () -> {
System.out.println("任务3..." + Thread.currentThread().getName());
}, executor);
System.out.println("主线程 end ..." + future1.get());
}
}
上述代码执行如下:
3.5 handleAsync
handleAsync:相当于whenCompleteAsync和exceptionally的结合体,可以收集结果和异常信息然后进行下一步处理。
package cn.itcast.n6.c3;
import java.util.concurrent.*;
/**
* @author : msf
* @date : 2022/12/4
* completable 讲解。
*/
public class CompletableFutureD3 {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 50, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), Executors.defaultThreadFactory()
, new ThreadPoolExecutor.AbortPolicy()
);
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("主线程 start ...");
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("任务1子线程执行..." + Thread.currentThread().getName());
}, executor).handleAsync((res, exec) -> {
System.out.println("res1 = " + res);
System.out.println("exec1 = " + exec);
return null;
});
CompletableFuture<Integer> futureInt = CompletableFuture.supplyAsync(() -> {
System.out.println("任务2子线程执行..." + Thread.currentThread().getName());
int i = 10 / 0;
return 10;
}, executor).handleAsync((res, exec) -> {
System.out.println("res2 = " + res);
System.out.println("exec2 = " + exec);
return 55;
});
System.out.println(futureInt.get());
System.out.println("主线程 end ...");
}
}
上述代码执行结果:
3.6 多任务执行
模拟:三人吃饭,只要一个人到了就可以点餐,但是只有所有人到了,并且厨子把菜做好了才能上菜;
package cn.itcast.n6.c3;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @author : msf
* @date : 2022/12/5
* 模拟三人点餐,只要有一个人到了就可以点餐。然后交给厨子进行做饭
*/
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
long begin = System.currentTimeMillis();
CompletableFuture<Void> client1 = CompletableFuture.runAsync(() -> {
System.out.println("客户1开始出发了.....1小时后到");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("客户1到了");
});
CompletableFuture<Void> client2 = CompletableFuture.runAsync(()->{
System.out.println("客户2开始出发了.....半小时后到");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("客户2到了");
});
CompletableFuture<Void> client3 = CompletableFuture.runAsync(()->{
System.out.println("客户3开始出发了.....1.5小时后到");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("客户3到了");
});
CompletableFuture<Object> result = CompletableFuture.anyOf(client1, client2, client3);
System.out.println("已经有客户到了开始点餐" +result.get());
CompletableFuture<Void> cook = CompletableFuture.runAsync(() -> {
System.out.println("厨师开始做饭");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("厨师做完饭了");
});
CompletableFuture<Void> reuslt1 = CompletableFuture.allOf(client1, client2, client3, cook);
reuslt1.join();
System.out.println("人全部来齐了,厨师也做好饭了,上菜!");
long end = System.currentTimeMillis();
System.out.println("总共时间盲猜2.5秒, 真实时间" + (end - begin) );
}
}