CompletableFuture<Map<Menu, Map<IntentDetail, Double>>> xxx= CompletableFuture.supplyAsync(() -> {
Map<Menu, Map<IntentDetail, Double>> scores = new ConcurrentHashMap<>();
// 存储结果
scores.computeIfAbsent(menu, k -> new ConcurrentHashMap<>()).putAll(intentScoreMap);
});
}
return scores;
}, addTaskExecutor);
mergeScoreResults(cardIntentScore, xxx.join());
mergeScoreResults(cardIntentScore, xxx.join());
mergeScoreResults(cardIntentScore, xxx.join());
mergeScoreResults(cardIntentScore, xxx.join());
mergeScoreResults(cardIntentScore, xxx.join());
// 等待所有计算完成并合并结果
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
xxx,
xx,
xx,
xx,
xx
);
try {
allFutures.get(2000, TimeUnit.MILLISECONDS); // 设置超时时间
} catch (Exception e) {
log.error("并行计算意图打分超时", e);
break;
}
List<CompletableFuture<Void>> futures = new CopyOnWriteArrayList<>();
futuresCard.add(CompletableFuture.supplyAsync(() -> {
Map<Menu, Map<IntentDetail, Double>> scores = new ConcurrentHashMap<>();
// 逻辑处理
});
return scores;
}, addTaskExecutor));
Map<Menu, Map<IntentDetail, Double>> cardIntentScore = new ConcurrentHashMap<>();
try {
for (CompletableFuture<Map<Menu, Map<IntentDetail, Double>>> future : futuresCard) {
final Map<Menu, Map<IntentDetail, Double>> join = future.join();
mergeScoreResults(cardIntentScore, join);
}
} catch (Exception e) {
log.error("并行计算意图打分超时", e);
break;
}
问答环节
1.supplyAsync 和 runAsync 的区别
supplyAsync 和 runAsync 都是 CompletableFuture 类中的静态方法,用于异步执行任务。它们的主要区别在于返回值和使用场景:
runAsync
定义: public static CompletableFuture runAsync(Runnable runnable)
扩展形式: 还可以指定一个 Executor(线程池),如:public static CompletableFuture runAsync(Runnable runnable, Executor executor)
用途: 当你有一个不需要返回结果的任务时使用。这个方法接收一个 Runnable 作为参数,意味着它执行的操作不会返回任何结果(即返回类型为 Void)。
示例:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 执行一些不需要返回结果的操作
System.out.println("Running async operation...");
});
supplyAsync
定义: public static CompletableFuture supplyAsync(Supplier supplier)
扩展形式: 同样可以指定一个 Executor,如:public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
用途: 当你需要从异步操作中获取一个结果时使用。这个方法接收一个 Supplier 作为参数,意味着它可以返回一个结果(类型为 U),并且你可以通过 join() 或 get() 方法来获取这个结果。
示例:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 执行一些需要返回结果的操作
return 100;
});
try {
Integer result = future.get(); // 获取异步操作的结果
System.out.println("Result from async operation: " + result);
} catch (Exception e) {
e.printStackTrace();
}
总结
如果你的异步任务不需要返回任何结果,那么应该使用 runAsync。
如果你的异步任务需要返回一个结果以便后续处理,则应使用 supplyAsync。
两者都可以接受一个 Executor 参数来指定任务的执行器(比如自定义的线程池),如果没有提供执行器,则默认使用 ForkJoinPool.commonPool() 来执行异步任务。
2.什么是CompletableFuture的allOf方法?
当你有多个独立的异步任务需要执行,并且希望在所有这些任务都完成后进行一些操作时,就可以使用 allOf() 方法。需要注意的是,allOf() 方法返回的 CompletableFuture 不会提供各个 CompletableFuture 的结果。如果需要获取每个 CompletableFuture 的结果,则需要分别调用它们的 join() 或 get() 方法。
3.join() 和 get() 是 CompletableFuture 中用于获取异步计算结果的方法,但它们之间有一些重要的区别是什么?
下面是这两个方法的详细比较:
1.方法定义
get():
返回计算结果。
如果计算未完成,调用此方法会阻塞当前线程,直到计算完成。
抛出 InterruptedException 和 ExecutionException,后者用于指示计算过程中发生的异常。
join():
返回计算结果。
如果计算未完成,调用此方法也会阻塞当前线程,直到计算完成。
只抛出 CompletionException,而不是 InterruptedException 或 ExecutionException。如果计算过程中发生异常,join() 会将其封装在 CompletionException 中抛出。
2. 异常处理
get():
需要处理两种异常:
InterruptedException:当前线程在等待时被中断。
ExecutionException:当计算过程中发生异常。
join():
只需处理 CompletionException,这使得异常处理更简单。
3. 使用场景
get():
当你需要详细的异常信息时,可以使用 get(),因为它提供了更具体的异常类型。
join():
当你只关心是否成功完成,并且不需要关心中断状态时,使用 join() 更为简洁。
个人总结:复杂的组合类型的线程,使用CompletableFuture 是一个非常好的建议
在使用过程中对
1.不同的CompletableFuture ,不要求返回结果的时候 ,可以使用allOf + get/join 这样的话整个代码就会很简洁美丽
2.如果不同的CompletableFuture 嵌套在方法里面可以选择将CompletableFuture 放入一个List集合内,然后for 循环依次使用 get/join ,这样的方法同时显得格外的简洁美丽
区别于使用计数器和线程池处理的负责且独立,CompletableFuture 为多个复杂的组提供了简洁且美丽的解决方法。不过要注意的是,在这个过程中,CompletableFuture 存在时间损耗,比计数器和线程 耗时久一些,但是为了整体的可控和代码的可维护,在复杂组场景的线程池来讲使用它还是一个相对友好的方案。