项目场景:
商品改价:商品改价中通过多线程批量处理经过 Lists.partition拆分的集合对象
问题描述
商品改价中通过多线程批量处理经过 Lists.partition拆分的集合对象,发现偶尔会报
java.util.ConcurrentModificationException: null
at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1231)
at java.util.ArrayList$SubList.spliterator(ArrayList.java:1235)
at java.util.Collection.stream(Collection.java:581)
代码实现逻辑
for (Map.Entry<Integer, List<PriceHandle>> partnerIdProductId : priceHandleGroup.entrySet()) {
Integer partnerId = partnerIdProductId.getKey();
List<PriceHandle> priceHandles = partnerIdProductId.getValue();
log.info("当前分片={},开始同步partnerId={} size={}", shardingItem, partnerId, priceHandles.size());
List<List<PriceHandle>> pidsGroup = Lists.partition(priceHandles, splitSize);
log.info("partnerId={},分为{}组,每组{}条", partnerId, pidsGroup.size(), splitSize);
CountDownLatch countDownLatch = new CountDownLatch(pidsGroup.size());
for (List<PriceHandle> priceHandlesLittle : pidsGroup) {
executorService.submit(() -> {
try {
List<Long> productIds = priceHandlesLittle.stream().map(PriceHandle::getProductId).collect(Collectors.toList());
List<ProductMapEntity> productMapEntities = productMappingAllPartnerService.getProductMapListByPartnerIdAndProductIds(partnerId, productIds);
//没有映射的失败不处理
List<Long> mapProductIds = productMapEntities.stream().map(ProductMapEntity::getProductId).collect(Collectors.toList());
List<PriceHandle> noMapPriceHandles = priceHandlesLittle.stream().filter(e -> !mapProductIds.contains(e.getProductId())
).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(noMapPriceHandles)) {
jingdongPriceSyncService.updateBatch(noMapPriceHandles, EventStatus.HANDLED_NO_NEED.eventStatus, "商品无映射");
}
priceHandlesLittle.removeAll(noMapPriceHandles);
// 过滤productPool中不存在的重复品
filterRepeatProductId(productMapEntities, priceHandlesLittle);
if (CollectionUtils.isEmpty(productMapEntities) || CollectionUtils.isEmpty(priceHandlesLittle)) {
log.info("过滤productPool中不存在的重复品");
return;
}
PriceSyncService.synPrice(partnerId, productMapEntities, false, priceHandlesLittle);
} catch (Exception e) {
log.error("同步价格异常,当前处理的店铺:{}", partnerId, e);
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error("countDownLatch.await error", e);
Thread.currentThread().interrupt();
}
}
原因分析:
这里需要注意的是在使用Google Guava库中的Lists.partition方法时,如果对原始列表进行了修改,则可能会导致ConcurrentModificationException异常的抛出。这是因为Lists.partition方法返回的是原始列表的视图,而不是副本。因此,如果在对视图进行操作时修改了原始列表,则会导致ConcurrentModificationException异常的抛出。
因此在我们的这段代码中 问题在于又通过拆分后的subList做了removeAll的操作
具体复现
解决方案:
方法1不要再for循环中基于视图 修改修改原始列表
方法2 在for循环中新建一个list对象来包装原来的list
具体实现代码
for (Map.Entry<Integer, List<PriceHandle>> partnerIdProductId : priceHandleGroup.entrySet()) {
Integer partnerId = partnerIdProductId.getKey();
List<PriceHandle> priceHandles = partnerIdProductId.getValue();
log.info("当前分片={},开始同步partnerId={} size={}", shardingItem, partnerId, priceHandles.size());
List<List<PriceHandle>> pidsGroup = Lists.partition(priceHandles, splitSize);
log.info("partnerId={},分为{}组,每组{}条", partnerId, pidsGroup.size(), splitSize);
CountDownLatch countDownLatch = new CountDownLatch(pidsGroup.size());
for (List<PriceHandle> priceHandlesLittle : pidsGroup) {
// 由于存在并发remove,需要重新赋值新ArrayList,防止ArrayList$SubList引起的ConcurrentModificationException
List<PriceHandle> copyPriceHandlesLittle = new ArrayList<>(priceHandlesLittle);
executorService.submit(() -> {
try {
List<Long> productIds = copyPriceHandlesLittle.stream().map(PriceHandle::getProductId).collect(Collectors.toList());
List<ProductMapEntity> productMapEntities = productMappingAllPartnerService.getProductMapListByPartnerIdAndProductIds(partnerId, productIds);
//没有映射的失败不处理
List<Long> mapProductIds = productMapEntities.stream().map(ProductMapEntity::getProductId).collect(Collectors.toList());
List<PriceHandle> noMapPriceHandles = copyPriceHandlesLittle.stream().filter(e -> !mapProductIds.contains(e.getProductId())
).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(noMapPriceHandles)) {
PriceSyncService.updateBatch(noMapPriceHandles, EventStatus.HANDLED_NO_NEED.eventStatus, "商品无映射");
}
copyPriceHandlesLittle.removeAll(noMapPriceHandles);
// 过滤productPool中不存在的重复品
filterRepeatProductId(productMapEntities, copyPriceHandlesLittle);
if (CollectionUtils.isEmpty(productMapEntities) || CollectionUtils.isEmpty(copyPriceHandlesLittle)) {
log.info("过滤productPool中不存在的重复品");
return;
}
PriceSyncService.synJPrice(partnerId, productMapEntities, false, copyPriceHandlesLittle);
} catch (Exception e) {
log.error("同步价格异常,当前处理的店铺:{}", partnerId, e);
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error("countDownLatch.await error", e);
Thread.currentThread().interrupt();
}
}
参考
https://blog.csdn.net/weixin_48321825/article/details/121012733?utm_medium=distribute.pc_relevant.none-task-blog-2defaultbaidujs_baidulandingword~default-0-121012733-blog-102456357.235v38pc_relevant_default_base3&spm=1001.2101.3001.4242.1&utm_relevant_index=3