首先注入
private final SqlSessionFactory sqlSessionFactory;
private final static int BATCH_SIZE = 200; //保存数据条数
private final static int THREAD_POOL_SIZE = 15; // 线程池大小
然后把保存的数据根据BATCH_SIZE 切割成多个批次封装起来:
/**
* 将数据分成多个批次
*
* @param data 原始数据
* @param batchSize 每批次的大小
* @return 分片后的数据列表
*/
public static <T> List<List<T>> splitData(List<T> data, int batchSize) {
List<List<T>> batches = new ArrayList<>();
for (int i = 0; i < data.size(); i += batchSize) {
int end = Math.min(i + batchSize, data.size());
batches.add(data.subList(i, end));
}
return batches;
}
然后使用多线程方式实现保存数据,此数据不能使用@Transactional方式,只能手动提交
public void minusTwoFloor(List<DtsDigitalTwinModel> assetsDetailsInfos) {
List<List<DtsDigitalTwinModel>> twinModelList = splitData(assetsDetailsInfos, BATCH_SIZE);
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
// 使用 CountDownLatch 确保所有线程完成后主线程才继续
CountDownLatch latch = new CountDownLatch(twinModelList.size());
// 提交任务到线程池
for (List<DtsDigitalTwinModel> batch : twinModelList) {
executorService.submit(() -> {
try (SqlSession sqlSession = sqlSessionFactory.openSession()){
//todo 此处写批量保存
this.saveBatch(batch);
sqlSession.commit();
} finally {
latch.countDown(); // 任务完成,计数器减一
}
});
}
// 等待所有任务完成
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 关闭线程池
executorService.shutdown();
log.info("数据保存完成!");
}