记录一次使用动态数据源+java8的ParallelStream并行流导致的数据源切换失效问题,先看一下异常记录:
代码如下:
@Service
@DS(DataSourceConst.ORDER)
public class OrderService {
@Resource
private VendorService vendorService;
public void getVendor() {
vendorService.getVendorInfo(Arrays.asList(1L));
}
}
@Service
@DS(DataSourceConst.VENDOR)
public class VendorService {
@Resource
private VendorInfoRepository vendorInfoRepository;
public List<VendorInfo> getVendorInfo(List<Long> vendorIdList) {
List<List<Long>> partition = Lists.partition(vendorIdList, 500);
return partition.parallelStream().flatMap(idList -> {
LambdaQueryWrapper<VendorInfo> wrapper = new LambdaQueryWrapper<VendorInfo>()
.in(VendorInfo::getId, idList);
return vendorInfoRepository.select(wrapper).stream();
}).collect(toList());
}
}
分析问题:因为dynamic-datasource是通过AOP的方式,在调用方法前切换数据源的,而java8的ParallelStream需要新开线程,这些新的线程是独立于原来的线程的,原来线程中的数据源切换并不能传递到新的线程中,这就可能导致在新的线程中对数据库的操作还是使用的原来线程的数据源,不能正确地进行数据源切换。
解决问题:把@DS放在方法上,代码如下:
@Service
@DS(DataSourceConst.VENDOR)
public class VendorService {
@Resource
private VendorInfoRepository vendorInfoRepository;
@DS(DataSourceConst.VENDOR)
public List<VendorInfo> getVendorInfo(List<Long> vendorIdList) {
List<List<Long>> partition = Lists.partition(vendorIdList, 500);
return partition.parallelStream().flatMap(idList -> {
LambdaQueryWrapper<VendorInfo> wrapper = new LambdaQueryWrapper<VendorInfo>()
.in(VendorInfo::getId, idList);
return vendorInfoRepository.select(wrapper).stream();
}).collect(toList());
}
}
以上是问题的一次简单记录,并为深挖如有不对的地方请评论指导。