JAVA通过ThreadPoolTaskExecutor批量插入百万级数据
文章目录
- JAVA通过ThreadPoolTaskExecutor批量插入百万级数据
- 一、前言
- 二、实现步骤
- 1、application.yml添加线程池配置信息
- 2、业务类,创建多线程批量插入具体业务方法
- 3、spring容器注入线程池bean对象
- 4、测试
- 三、总结
一、前言
百万级数据批量插入数据库,如何提高效率?
采取方案:利用ThreadPoolTaskExecutor多线程批量插入。
采用技术:springboot+mybatisPlus+mysql+ThreadPoolTaskExecutor。
二、实现步骤
直接上代码,代码如下:
1、application.yml添加线程池配置信息
# 异步线程配置
# 配置核心线程数
async:
executor:
thread:
core_pool_size: 4
max_pool_size: 9
queue_capacity: 99988
name:
prefix: async-importDB-
2、业务类,创建多线程批量插入具体业务方法
@Service
@Slf4j
public class AsyncServiceImpl {
@Resource
private PostMapper postMapper;
/**
* 生成虚拟数据
*/
public List<Post> getPostList(){
List<Post> postList=new ArrayList<>();
for(int i=1;i<=2000003;i++){
Post post=new Post();
post.setPostId((long) i);
post.setPostCode("code_"+i);
post.setPostName("name_"+i);
post.setPostSort(String.valueOf(i));
post.setStatus("0");
postList.add(post);
}
return postList;
}
/**
* 开启多线程
* @param list 每个批次的数据
* @param countDownLatch Java 中的一个并发工具类,用于协调多个线程之间的同步
*/
@Async("asyncServiceExecutor")
public void executeAsync(List<Post> list,CountDownLatch countDownLatch) {
try{
log.warn("start executeAsync");
//异步线程要做的事情
//批量新增插入数据库
postMapper.batchPost(list);
log.warn("end executeAsync");
}finally {
countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放
}
}
/**
* 执行业务
*/
public void testMultiThread() {
List<Post> logOutputResults = getPostList();
Date start=DateUtils.getNowDate();
//测试每1000条数据插入开一个线程
List<List<Post>> lists = groupList(logOutputResults, 1000);
CountDownLatch countDownLatch = new CountDownLatch(lists.size());
for (List<Post> listSub:lists) {
//分批次执行数据
executeAsync(listSub,countDownLatch);
}
try {
countDownLatch.await(); //保证之前的所有的线程都执行完成,才会走下面的;
// 这样就可以在下面拿到所有线程执行完的集合结果
} catch (Exception e) {
log.error("阻塞异常:"+e.getMessage());
}
Date end=DateUtils.getNowDate();
System.out.println("导入消耗时间"+DateUtils.getDatePoor(end,start));
}
/**
* 根据每个线程执行的数量拆分成多个数组
* @param list 所有数据
* @param size 拆分每个批次数量
*/
public List<List<Post>> groupList(List<Post> list, int size) {
List<List<Post>> resList=new ArrayList<>();
int totalBatches = (int) Math.ceil((double) list.size()/ size);
for (int i = 0; i < totalBatches; i++) {
int startIndex = i * size;
int endIndex = Math.min((i + 1) * size, list.size());
List<Post> batch = list.subList(startIndex, endIndex);
resList.add(batch);
}
return resList;
}
/**
* 计算两个时间差
*/
public static String getDatePoor(Date endDate, Date nowDate) {
long nd = 1000 * 24 * 60 * 60;
long nh = 1000 * 60 * 60;
long nm = 1000 * 60;
long ns = 1000;
// 获得两个时间的毫秒时间差异
long diff = endDate.getTime() - nowDate.getTime();
// 计算差多少天
long day = diff / nd;
// 计算差多少小时
long hour = diff % nd / nh;
// 计算差多少分钟
long min = diff % nd % nh / nm;
// 计算差多少秒//输出结果
long sec = diff % nd % nh % nm / ns;
return day + "天" + hour + "小时" + min + "分钟" + sec + "秒";
}
public static void main(String[] args) {
testMultiThread();
}
}
3、spring容器注入线程池bean对象
代码如下(示例):
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
@Value("${async.executor.thread.core_pool_size}")
private int corePoolSize;
@Value("${async.executor.thread.max_pool_size}")
private int maxPoolSize;
@Value("${async.executor.thread.queue_capacity}")
private int queueCapacity;
@Value("${async.executor.thread.name.prefix}")
private String namePrefix;
@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
log.warn("start asyncServiceExecutor");
//在这里修改
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePoolSize);
//配置最大线程数
executor.setMaxPoolSize(maxPoolSize);
//配置队列大小
executor.setQueueCapacity(queueCapacity);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(namePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
4、测试
模拟2000003 条数据进行测试。
检查多线程入库的数据,检查是否存在重复入库的问题:
根据id分组,查看是否有id重复的数据,通过sql语句检查,没有发现重复入库的问题
三、总结
通过以上测试案列,同样是导入2000003 条数据,多线程耗时1.67分钟,单线程耗时15.38分钟。
这里使用线程数量计算公式:CPU核心数量*2 +1个线程。我的电脑是4核16G,所以设定线程数是9个。