前提
本文章是在项目中发现一些同学对并行编程的思想理解,或者说对代码的执行逻辑有些理解偏颇的地方。特整理此文章进行分享,希望有同样困惑的小伙伴能够对此类问题有一个清晰的理解。
在此不会介绍CompletableFuture对库函数用法,因为库函数的用法,有很多人已经总结的很清楚了,大家可以自行搜索。
背景
最近在公司的一次代码评审的时候,发现一位同学的代码中,出现了在使用CompletableFuture的一个问题。
场景
本地方法,需要同时调用多家自营商户,获取商品的价格信息。如果哪家平台出现异常(超时、业务异常)场景,则本地方法不向下游返回该商品数据。
问题
在调用三个不同的自营平台接口时,诉求时并行执行,总时长为三个平台中交互时间最长的那个,且不可超过500ms,如果超过500ms则丢弃该任务,直接返回其他调用结果。但发现如果调用超过设置的超时阈值500ms时,并没有超时丢弃,而是等待着时间较长的任务结束后一并返回。业务代码如下:
- 原代码逻辑
public class Parallel_3 {
private static final Map<Integer, Long> TENANT_SLEEP_CONFIG = new HashMap<>(8);
static {
TENANT_SLEEP_CONFIG.put(1, 100L);
TENANT_SLEEP_CONFIG.put(2, 150L);
TENANT_SLEEP_CONFIG.put(3, 800L);
}
private static List<String> parallelExecuteTask(Set<Integer> tenantIdSet) throws Exception {
CompletableFuture<String>[] completableFutures = new CompletableFuture[3];
Iterator<Integer> tenantIdIterator = tenantIdSet.iterator();
int index = 0;
while (tenantIdIterator.hasNext()) {
Integer tenantId = tenantIdIterator.next();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> Parallel_3.executeTask(tenantId));
completableFutures[index++] = completableFuture;
}
try {
CompletableFuture.allOf(completableFutures).get(500, TimeUnit.MILLISECONDS);
} catch (Exception e) {
System.err.println("调用任务超时, 不再等待, 直接处理后序逻辑.");
}
List<String> resultList = new ArrayList<>();
for (CompletableFuture<String> completableFuture : completableFutures) {
resultList.add(completableFuture.get());
}
return resultList;
}
private static String executeTask(int tenantId) {
try {
Thread.sleep(TENANT_SLEEP_CONFIG.get(tenantId));
System.out.println("current index : " + tenantId);
return "任务返回值 : " + tenantId;
} catch (InterruptedException e) {
e.printStackTrace();
}
return "";
}
private static Set<Integer> getTenantIdSet() {
Set<Integer> tenantIdSet = new HashSet<>();
tenantIdSet.add(1);
tenantIdSet.add(2);
tenantIdSet.add(3);
return tenantIdSet;
}
public static void main(String[] args) throws Exception {
// 参数集
Set<Integer> tenantIdSet = getTenantIdSet();
// 并行执行任务
List<String> resultList = parallelExecuteTask(tenantIdSet);
// 打印结果集合
System.out.println("result data: " + resultList);
// 主线程结束
System.out.println("main thead end ...");
}
}
- 打印结果
current index : 1
current index : 2
调用任务超时, 不再等待, 直接处理后序逻辑.
current index : 3
result data: [任务返回值 : 1, 任务返回值 : 2, 任务返回值 : 3]
main thead end ...
我们发现result data中居然还有“任务返回值 :3”,该任务按照预想已经超时,并不应该出现在返回值列表中。
理解误区
- 当多任务初始化完成,执行后,调用如下代码后,即等待全部任务执行完毕,如果有超时任务不在等待。
CompletableFuture.allOf(completableFutures).get(500, TimeUnit.MILLISECONDS);
- 执行完等待全部任务结束,并设置超时时间,再执行如下代码,只会取到未超时的任务的执行结果。
for (CompletableFuture<String> completableFuture : completableFutures) { resultList.add(completableFuture.get()); }
- 问题原因
第一步,并没有问题,当调用了allOf方法并设置结束时间,如果等到了超时时间,并没有全部任务都执行完毕,则不会继续等待,而是继续执行后续逻辑。
当执行到第二步的代码逻辑时,循环任务内容并调用get()方法时,则会等待超时的任务执行完毕,并返回任务执行结果,导致外层调用整体超时。
修复方案
- 上下文对象
可以声明一个方法的上下文对象,将每个任务的处理结果设置到上下文对象中,在调用allOf方法后,直接返回上下文对象的内容,不在循环调用get方法。
- getNow(T valueIfAbsent)
使用getNow方法替换get方法,并设置返回值,如果超时情况,则直接使用默认值。