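Note: In scenarios such as data import, where a large number of rows must be inserted into the database, inserting them one by one on a single thread can take a long time, and spreading the work across a thread pool can shorten it considerably. This article shows how to use a thread pool in Spring Boot to insert data in bulk.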
Setting up the environment
First, create a Spring Boot project with the following pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
        <relativePath/>
    </parent>
    <groupId>com.hezy</groupId>
    <artifactId>thread_pool_demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.8</version>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.6</version>
        </dependency>
    </dependencies>
</project>
Write a Mapper method that inserts a single row
import com.hezy.pojo.User;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

@Mapper
public interface UserMapper {

    // Inserts one row; each call is a separate statement sent to the database.
    @Insert("insert into i_users (username, password) values (#{user.username}, #{user.password})")
    void insert(@Param("user") User user);
}
Write an endpoint that inserts 400,000 records, as follows:
import com.hezy.pojo.User;
import com.hezy.service.AsyncService;
import com.hezy.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
// List and CountDownLatch are needed by the insert2 method added to this class later.
import java.util.List;
import java.util.concurrent.CountDownLatch;

@RestController
@RequestMapping("user")
public class UserController {

    /**
     * Total number of records
     */
    private static final int SIZE = 40 * 10000;

    @Autowired
    private UserService userService;

    @Autowired
    private AsyncService asyncService;

    @GetMapping("insert1")
    public void insert1() {
        // Build the test data up front so it is not included in the timing.
        ArrayList<User> list = new ArrayList<>(SIZE);
        for (int i = 0; i < SIZE; i++) {
            User user = new User();
            user.setUsername("user" + i);
            user.setPassword("password" + i);
            list.add(user);
        }
        long startTime = System.currentTimeMillis();
        // Insert the records one at a time on the request thread
        for (User user : list) {
            userService.insert(user);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("Without thread pool === inserting 400,000 records took: " + ((endTime - startTime) / 1000) + "s");
    }
}
Start the project and call the endpoint to see how long it takes. The result: 11 minutes.
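Using a thread pool
Spring Boot can auto-configure a thread pool of type ThreadPoolTaskExecutor. Here we declare our own bean of that type, named threadPoolTaskExecutor, and set its properties by hand to suit our needs.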
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class ThreadPoolConfig {

    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Threads kept alive even when idle
        executor.setCorePoolSize(20);
        // Upper bound on threads, reached only once the queue is full
        executor.setMaxPoolSize(40);
        // Tasks that can wait while all core threads are busy
        executor.setQueueCapacity(500);
        // Idle time after which threads above the core size are released
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("hezy-");
        // When both the pool and the queue are full, run the task on the submitting thread
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}
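To make the effect of the queue capacity and CallerRunsPolicy concrete, here is a minimal sketch using the plain java.util.concurrent.ThreadPoolExecutor rather than the Spring wrapper. The class name CallerRunsDemo and the tiny pool (one thread, one queue slot) are assumptions chosen only so that the rejection path is easy to trigger. It is an illustration, not code from the original project.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original project.
class CallerRunsDemo {

    /**
     * Submits three tasks to a pool with one thread and one queue slot.
     * Returns labels recording which thread ran task 2 and task 3,
     * in the order in which they actually executed.
     */
    static List<String> run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        List<String> ranOn = new CopyOnWriteArrayList<>();

        // Task 1 occupies the only worker thread until we release it.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Task 2 takes the single queue slot and waits for the worker.
        pool.execute(() -> ranOn.add("queued:" + Thread.currentThread().getName()));
        // Task 3 finds the pool and queue full, so the policy runs it on the caller.
        pool.execute(() -> ranOn.add("rejected:" + Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }
}
```

With the configuration above the same thing happens at a larger scale: once the 500-slot queue is full and all 40 threads are busy, further submissions run on the submitting thread, which slows the producer down instead of dropping work.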
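Use the thread pool to perform the same insert as above, as follows:
    // Add this method to UserController; it needs imports of
    // java.util.List and java.util.concurrent.CountDownLatch.
    @GetMapping("insert2")
    public void insert2() {
        ArrayList<User> list = new ArrayList<>(SIZE);
        for (int i = 0; i < SIZE; i++) {
            User user = new User();
            user.setUsername("user" + i);
            user.setPassword("password" + i);
            list.add(user);
        }
        // Split the data into 4,000 batches of 100 records each
        List<List<User>> batchList = new ArrayList<>();
        for (int i = 0; i < list.size(); i += 100) {
            // Clamp the upper bound so a final partial batch cannot run past the end
            batchList.add(list.subList(i, Math.min(i + 100, list.size())));
        }
        long startTime = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(batchList.size());
        // Hand each batch to the thread pool
        for (List<User> batch : batchList) {
            asyncService.executeAsync(batch, userService, countDownLatch);
        }
        try {
            // Wait until every batch has finished
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("With thread pool === inserting 400,000 records took: " + ((endTime - startTime) / 1000) + "s");
    }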
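The batching step relies on List.subList, whose upper bound must never exceed the list size. With 400,000 records and batches of 100 the division is exact, but a reusable helper should clamp the last slice. The sketch below is one way to write it. The class name BatchPartitioner and its partition method are hypothetical and do not come from the original project.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original project.
class BatchPartitioner {

    /** Splits source into consecutive sublists of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < source.size(); i += batchSize) {
            // The last batch may be shorter than batchSize.
            int end = Math.min(i + batchSize, source.size());
            // subList returns a view, so no elements are copied here.
            batches.add(source.subList(i, end));
        }
        return batches;
    }
}
```

Because subList returns views backed by the source list, the source must not be structurally modified while the batches are still in use.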
The AsyncService implementation class
import com.hezy.pojo.User;
import com.hezy.service.AsyncService;
import com.hezy.service.UserService;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CountDownLatch;

@Service
public class AsyncServiceImpl implements AsyncService {

    // Runs on the executor bean named "threadPoolTaskExecutor"
    @Async("threadPoolTaskExecutor")
    @Override
    public void executeAsync(List<User> batch, UserService userService, CountDownLatch countDownLatch) {
        try {
            for (User user : batch) {
                userService.insert(user);
            }
        } finally {
            // Always count down, even if an insert throws, so the caller is never left waiting
            countDownLatch.countDown();
        }
    }
}
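The coordination between the controller and the async method follows a common pattern: create a CountDownLatch sized to the number of batches, count down in a finally block inside each task, and await on the submitting side. The sketch below reproduces that pattern with only the JDK so it can be run without Spring. The class LatchDemo is hypothetical, and the counter is an assumed stand-in for the real userService.insert(user) call.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the counter stands in for userService.insert(user).
class LatchDemo {

    /** Processes every batch on a fixed pool and returns how many items were handled. */
    static int processAll(List<List<String>> batches, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch latch = new CountDownLatch(batches.size());
        AtomicInteger processed = new AtomicInteger();
        try {
            for (List<String> batch : batches) {
                pool.execute(() -> {
                    try {
                        for (String item : batch) {
                            processed.incrementAndGet(); // placeholder for the real insert
                        }
                    } finally {
                        latch.countDown(); // always signal, even if the batch failed
                    }
                });
            }
            latch.await(); // block until every batch has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return processed.get();
    }
}
```

Note that the latch only reports that each batch has ended, not that it succeeded. If failures matter, each task would need to record them separately.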
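Start the project and test again: the speedup is obvious. Reworking the insert() method so that a single statement inserts multiple rows should make it faster still.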
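As a rough illustration of what "inserting multiple rows at once" means at the SQL level, the sketch below builds a single multi-row INSERT for the i_users table with JDBC-style ? placeholders. The class MultiRowInsertSql is hypothetical and is not taken from the original project. In MyBatis a statement of this shape is usually generated with a foreach element in dynamic SQL rather than by string concatenation.

```java
import java.util.Collections;

// Hypothetical helper, not part of the original project.
class MultiRowInsertSql {

    /** Builds "insert ... values (?, ?), (?, ?), ..." with one placeholder pair per row. */
    static String build(int rows) {
        if (rows <= 0) {
            throw new IllegalArgumentException("rows must be positive");
        }
        // One "(?, ?)" group per row, separated by commas.
        return "insert into i_users (username, password) values "
                + String.join(", ", Collections.nCopies(rows, "(?, ?)"));
    }
}
```

A batch of 100 users then costs one round trip to the database instead of 100, which is where the additional gain would be expected to come from.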
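Summary
This article showed how to use a thread pool bean managed by Spring Boot to bulk-insert a large volume of data and speed up execution.
Complete example code: https://github.com/HeZhongYing/thread_pool_demo
Reference: video by Bilibili uploader 孟哥说Java: https://www.bilibili.com/video/BV18r421F7CQ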