在实际项目开发过程中,大部分程序的执行顺序都是按照代码编写的先后顺序,依次从上往下挨个执行的,但是对于统计或者批量操作数据时,是否有更好的方案呢?这时候就可以考虑使用多线程编程,异步并行执行多个任务,从而提升用户使用体验,发挥多核cpu的性能。
更多关于CompletableFuture的说明,请参看如下文章:
Java多线程之CompletableFuture_java completablefuture-CSDN博客
Java中的CompletableFuture原理与用法_java_脚本之家
以下文案是基于数据统计方案去设计编写的,希望可以对各位有所帮助。
一.CompletableFuture是什么
CompletableFuture是由Java8引入的,用于异步编程,异步通常意味着非阻塞,运行任务单独的线程,与主线程隔离。并且通过回调可以在主线程中得到异步任务的执行状态,是否完成和异常等信息。通过这种方式编程主线程不会被阻塞,不需要等到多个子线程执行完成,它可以并行执行其他任务。使用这种并行编程方式,可以极大的提高程序的执行和运行效率,提升用户使用体验等。
与Future对比:
- CompletableFuture默认使用的是ForkJoinPool线程池;
- CompletableFuture 是 Future API的扩展;
- Future在实际使用过程中存在局限性,如不支持异步任务的编排组合,获取计算结果的get() 方法为阻塞调用;
- CompletableFuture除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程、异步任务编排组合等扩展功能。
二.如何创建CompletableFuture实例
1. supplyAsync()
通过该函数创建的CompletableFuture实例会异步执行当前传入的计算任务。在调用端,则可以通过get或join获取最终计算结果。
/**
* 有返回值的
* 如果不指定线程池,默认的构造方法使用ForkJoinPool
**/
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture<List<SysDept>> cfDeptList
= CompletableFuture.supplyAsync(() -> {
return remoteStationService.getAllStationInfo(0).getData();
},executorService);
List<SysDept> join = cfDeptList.join();
2.runAsync()
与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值。因此,runAsync适合创建不需要返回值的计算任务。
//无返回值
CompletableFuture cf = CompletableFuture.runAsync(()->{
getFireFightingData(params,resultAll);
},fjp);
3.Future.get()和CompletableFuture.join()对比
- 这两个方法都是阻塞方法,用于获取异步任务的结果。
- Future.get()方法在获取异步任务结果时更具灵活性,因为它必须声明抛出异常且需要手动处理,同时它会阻塞线程调用。
- CompletableFuture.join() 方法更适用于使用 ForkJoinPool 线程池执行任务的情况,它更方便用于不会阻塞 ForkJoinPool 线程池中的线程中。
三.CompletableFuture使用
//仅展示部分关键代码
ForkJoinPool fjp = new ForkJoinPool(8);
System.out.println("默认使用的线程为:" + Thread.currentThread().getName());
CompletableFuture cf1 = CompletableFuture.runAsync(()->{
System.out.println("cf1使用的线程为:" + Thread.currentThread().getName());
//站数据
List<StationDTO> stationAllVoList = staMapper.getAllStationVoList(null);
Map<String, StationDTO> stationAllVoMap = null;
if(CollectionUtils.isNotEmpty(stationAllVoList)){
stationAllVoMap = stationAllVoList.stream().collect(Collectors.toMap(StationDTO::getStationName, Function.identity()));
}else {
stationAllVoMap = new HashMap<>();
}
resultAll.put("stationAllVoMap",stationAllVoMap);
},fjp);
System.out.println("cf1使用完之后的线程为:" + Thread.currentThread().getName());
CompletableFuture cf2 = CompletableFuture.runAsync(()->{
System.out.println("cf2使用的线程为:" + Thread.currentThread().getName());
getSafetyCountData(params,resultAll);
},fjp);
CompletableFuture cf3 = CompletableFuture.runAsync(()->{
System.out.println("cf3使用的线程为:" + Thread.currentThread().getName());
getRectifyRateData(params,resultAll);
},fjp);
CompletableFuture cf4 = CompletableFuture.runAsync(()->{
System.out.println("cf3使用的线程为:" + Thread.currentThread().getName());
getAlarmARemindData(params,resultAll);
},fjp);
CompletableFuture.allOf(cf1,cf2,cf3,cf4).join();
fjp.shutdown();
需要注意的点:
- allOf()返回的CompletableFuture是多个任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
- anyOf()返回的CompletableFuture是多个任务只要其中一个执行完成就会执行,其get返回的是已经执行完成的任务的执行结果,如果该任务执行异常,则抛出异常。
四. 模拟测试验证
经过测试, 多个子线程执行并不会影响主线程的继续执行,也不存在线程阻塞问题,与设计效果保持一致,非常值得借鉴学习。