前言
最近在项目上遇到了批量插入的场景问题,由于每次需要插入超过 10w+ 的数据量并且字段也蛮多的导致如果使用循环单次插入的方式插入数据插入的效率不高。相信读者们在实际开发中也遇到过这样类似的场景,那么批量插入如何实现呢?
其实我也是一知半解,之前只见过别人博客上的批量插入实现,对于实际优化上的细节以及优化的程度并不了解。所以正好借此机会,在这里认真地把批量插入的实现及优化过程实操一遍并记录下来,有兴趣的读者们可以接着往下观看,有不对的地方还希望能在评论里指出来。
既然涉及到了数据库层面的操作,我想从 JDBC 和 MyBatis / MyBatis Plus 两个层面分别实现一下批量插入,下面将依次讲解实现及优化过程。
JDBC 实现批量插入
在编写代码前,先准备一下 JDBC 批量插入需要的测试环境。
JDBC 测试环境
建表语句(使用数据库版本 mysql 5.7.19)
DROP TABLE IF EXISTS `fee`;
CREATE TABLE `fee` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
`owner` varchar(64) CHARACTER SET utf8 COLLATE utf8_unicode_ci NOT NULL COMMENT '归属人',
`fee1` decimal(30, 5) NULL DEFAULT NULL COMMENT '费用1',
`fee2` decimal(30, 5) NULL DEFAULT NULL COMMENT '费用2',
`fee3` decimal(30, 5) NULL DEFAULT NULL COMMENT '费用3',
`fee4` decimal(30, 5) NULL DEFAULT NULL COMMENT '费用4',
`fee5` decimal(30, 5) NULL DEFAULT NULL COMMENT '费用5',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_unicode_ci COMMENT = '费用表' ROW_FORMAT = Dynamic;
maven 坐标(使用 Java 版本 JDK 1.8)
!-- MySQL 驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.9</version>
</dependency>
普通插入
在实现批量插入之前呢,为了更明显的看到普通插入方式和批量插入方式的不同,先来写一遍普通插入(循环插入)的实现并记录一下插入所需时间。
使用 JDBC 不需要添加额外的配置文件,直接上代码:
/**
* JDBC - 普通插入(循环遍历一条一条插入)
* @author 单程车票
*/
public class JDBCDemo {
public static void main(String[] args) {
String url = "jdbc:mysql://localhost:3306/test";
String user = "root";
String password = "123456";
String driver = "com.mysql.jdbc.Driver";
// sql语句
String sql = "INSERT INTO fee(`owner`,`fee1`,`fee2`,`fee3`,`fee4`,`fee5`) VALUES (?,?,?,?,?,?);";
Connection conn = null;
PreparedStatement ps = null;
// 开始时间
long start = System.currentTimeMillis();
try {
Class.forName(driver);
conn = DriverManager.getConnection(url, user, password);
ps = conn.prepareStatement(sql);
// 循环遍历插入数据
for (int i = 1; i <= 100000; i++) {
ps.setString(1, "o"+i);
ps.setBigDecimal(2, new BigDecimal("11111.111"));
ps.setBigDecimal(3, new BigDecimal("11111.111"));
ps.setBigDecimal(4, new BigDecimal("11111.111"));
ps.setBigDecimal(5, new BigDecimal("11111.111"));
ps.setBigDecimal(6, new BigDecimal("11111.111"));
ps.executeUpdate();
}
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (ps != null) {
try {
ps.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
// 结束时间
long end = System.currentTimeMillis();
System.out.println("十万条数据插入时间(普通插入方式):" + (end - start) + " ms");
}
}
执行结果:
可以看到使用普通插入的方式插入 10w 条数据需要的时间大概在 80 s 左右,接下来看看使用批量插入的方式优化了多少。
批处理插入
下面就是 JDBC 批量插入的实现方式:批处理插入方式 + 手动事务提交。
代码:
/**
* JDBC - 批处理插入
* @author 单程车票
*/
public class JDBCPlusDemo {
public static void main(String[] args) {
// url 设置允许重写批量提交 rewriteBatchedStatements=true
String url = "jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true";
String user = "root";
String password = "123456";
String driver = "com.mysql.jdbc.Driver";
// sql语句(注意url设置为rewriteBatchedStatements=true时,不允许sql语句带有;号,否则会抛出BatchUpdateException异常)
String sql = "INSERT INTO fee(`owner`,`fee1`,`fee2`,`fee3`,`fee4`,`fee5`) VALUES (?,?,?,?,?,?)";
Connection conn = null;
PreparedStatement ps = null;
// 开始时间
long start = System.currentTimeMillis();
try {
Class.forName(driver);
conn = DriverManager.getConnection(url, user, password);
ps = conn.prepareStatement(sql);
// 关闭自动提交
conn.setAutoCommit(false);
for (int i = 1; i <= 100000; i++) {
ps.setString(1, "o"+i);
ps.setBigDecimal(2, new BigDecimal("11111.111"));
ps.setBigDecimal(3, new BigDecimal("11111.111"));
ps.setBigDecimal(4, new BigDecimal("11111.111"));
ps.setBigDecimal(5, new BigDecimal("11111.111"));
ps.setBigDecimal(6, new BigDecimal("11111.111"));
// 加入批处理(将当前sql加入缓存)
ps.addBatch();
// 以 1000 条数据作为分片
if (i % 1000 == 0) {
// 执行缓存中的sql语句
ps.executeBatch();
// 清空缓存
ps.clearBatch();
}
}
ps.executeBatch();
ps.clearBatch();
// 事务提交(实际开发中需要判断有插入失败的需要在 finally 中做好事务回滚操作)
conn.commit();
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
} finally {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (ps != null) {
try {
ps.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
// 结束时间
long end = System.currentTimeMillis();
System.out.println("十万条数据插入时间(批处理插入):" + (end - start) + " ms");
}
}
执行结果:
可以看到使用批处理+手动提交的方式插入 10w 条数据的执行时间大概在 1s 左右,速度明显提高了很多。接下来看看批处理方式需要注意的细节和重点有哪些:
-
利用
PreparedStatement
批量处理的三个方法来实现批量操作,分别是: -
addBatch()
:该方法用于向批处理中添加一批参数。通常在执行批量操作之前,通过多次调用该方法,将不同的参数添加到批处理中,然后一次性将这些参数一起提交给数据库执行。executeBatch()
:该方法用于执行当前的批处理。一旦添加完所有参数到批处理后,可以调用该方法将这些参数提交给数据库执行。该方法会返回一个整数数组,表示批处理中每个操作所影响的行数。clearBatch()
:该方法用于清空当前的批处理。在执行完当前批处理后需要清空当前批处理,可以调用该方法来清空之前添加的所有参数。
-
在配置 MySQL 的 url 时需要加上
rewriteBatchedStatements=true
才能达到真正意义上的批处理效果。 -
- 这个设置是为了把允许重写批量提交(
rewriteBatchedStatements
)开启。 - 在默认不开启的情况下,会无视
executeBatch()
方法,将原本应该批量执行的 sql 语句又拆散成单条语句执行。也就是说如果不开启允许重写批量提交,实际上批处理操作和原本的单条语句循环插入的效果一样。
- 这个设置是为了把允许重写批量提交(
-
使用 JDBC 时需要注意插入的 sql 语句结尾不能带
;
号,否则会抛出BatchUpdateException
异常。 -
- 如图:
- 这是因为使用批处理是会在结尾处进行拼接,如果结尾有
;
号会导致插入语句变成INSERT INTO TABLE(X,X,X,X) VALUES (X,X,X,X);,(X,X,X,X);,(X,X,X,X);,
这样自然会出现 sql 语法错误。
-
需要注意批量处理时的分片操作,上面代码的分片大小为 1000(这是参考了后面 MP 框架的默认分片大小),分片操作可以避免一次性提交的数据量过大从而导致数据库在处理时出现的性能问题和内存占用过高问题,有效的分片可以减轻数据库的负担。
-
使用手动事务提交可以提高插入速度,在批量插入大量数据时,手动事务提交相对于自动提交事务来说可以减少磁盘的写入次数,减少锁竞争,从而提高插入的性能。
-
- 可以通过
setAutoCommit(false)
来关闭自动提交事务,等全部批量插入完成后再通过commit()
手动提交事务。
- 可以通过
MyBatis / MyBatis Plus 实现批量插入
由于 MyBatis Plus 相对于 MyBatis 来说只做了增强并没有改变 MyBatis 的功能,所以接下来将以 MyBatis Plus 来实现批量插入,其中有些方式是两个框架都可以使用的,有些则是 MP 独有的,会在后续讲解中标注出来。
MyBatis Plus 测试环境
先来配一下 MP 需要的测试环境,继续利用上文的 JDBC 测试环境并补充 MP 需要的测试环境:
maven 坐标:
<!-- MyBatis Plus 依赖 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.3.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
application.properties
:
# 配置数据库
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true
spring.datasource.username=root
spring.datasource.password=123456
实现代码
由于 MyBatis / MyBatis Plus 的测试代码过多,所以在这里统一展示实体类、service、mapper 的实现代码,后续只给出测试代码。
Fee.java - 实体类
/**
* Fee 实体类
* @author 单程车票
*/
@TableName("fee")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Fee {
@TableId(type = IdType.AUTO)
private Long id;
private String owner;
private BigDecimal fee1;
private BigDecimal fee2;
private BigDecimal fee3;
private BigDecimal fee4;
private BigDecimal fee5;
}
FeeMapper.java - Mapper 接口
/**
* Fee Mapper接口
* @author 单程车票
*/
@Mapper
public interface FeeMapper extends BaseMapper<Fee> {
/**
* 单条数据插入
* @param fee 实体类
* @return 插入结果
*/
int insertByOne(Fee fee);
/**
* foreach动态拼接sql插入
* @param feeList 实体类集合
* @return 插入结果
*/
int insertByForeach(List<Fee> feeList);
}
这里继承 BaseMapper
只是为了使用最后的 MP 自带的批处理插入方法。如果不使用那种方式则可以不继承。
FeeMapper.xml - Mapper 映射文件
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="db.review.mapper.FeeMapper">
<insert id="insertByOne">
INSERT INTO fee(`owner`,`fee1`,`fee2`,`fee3`,`fee4`,`fee5`)
VALUES (#{owner}, #{fee1}, #{fee2}, #{fee3}, #{fee4}, #{fee5})
</insert>
<insert id="insertByForeach">
INSERT INTO fee(`owner`,`fee1`,`fee2`,`fee3`,`fee4`,`fee5`)
VALUES
<foreach collection="feeList" item="fee" separator=",">
(#{fee.owner}, #{fee.fee1}, #{fee.fee2}, #{fee.fee3}, #{fee.fee4}, #{fee.fee5})
</foreach>
</insert>
</mapper>
FeeService.java - Service 接口
/**
* Fee Service 接口
* @author 单程车票
*/
public interface FeeService extends IService<Fee> {
/**
* 普通插入
* @param feeList 实体类列表
* @return 插入结果
*/
int saveByFor(List<Fee> feeList);
/**
* foreach 动态拼接插入
* @param feeList 实体类列表
* @return 插入结果
*/
int saveByForeach(List<Fee> feeList);
/**
* 批处理插入
* @param feeList 实体类列表
* @return 插入结果
*/
int saveByBatch(List<Fee> feeList);
}
同样这里继承 IService
也只是为了使用最后的 MP 自带的批处理插入方法。如果不使用那种方式则可以不继承。
FeeServiceImpl.java - Service 实现类
/**
* Fee Service 实现类
* @author 单程车票
*/
@Service
public class FeeServiceImpl extends ServiceImpl<FeeMapper, Fee> implements FeeService {
@Resource
private FeeMapper feeMapper;
@Resource
private SqlSessionFactory sqlSessionFactory;
@Override
public int saveByFor(List<Fee> feeList) {
// 记录结果(影响行数)
int res = 0;
// 循环插入
for (Fee fee : feeList) {
res += feeMapper.insertByOne(fee);
}
return res;
}
@Override
public int saveByForeach(List<Fee> feeList) {
// 通过mapper的foreach动态拼接sql插入
return feeMapper.insertByForeach(feeList);
}
@Transactional
@Override
public int saveByBatch(List<Fee> feeList) {
// 记录结果(影响行数)
int res = 0;
// 开启批处理模式
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
FeeMapper feeMapper = sqlSession.getMapper(FeeMapper.class);
for (int i = 1; i <= feeList.size(); i++) {
// 利用mapper的单条插入方法插入
res += feeMapper.insertByOne(feeList.get(i-1));
// 进行分片类似 JDBC 的批处理
if (i % 100000 == 0) {
sqlSession.commit();
sqlSession.clearCache();
}
}
sqlSession.commit();
sqlSession.clearCache();
return res;
}
}
同样这里继承 ServiceImpl
也只是为了使用最后的 MP 自带的批处理插入方法。如果不使用那种方式则可以不继承。
普通插入
同样为了形成对比,先来看看循环单条插入所需要的执行时间,通过 SpringBootTest 进行测试:
测试代码(代码没有使用 MyBatis Plus 的增强功能,所以这种方式在 MyBatis 和 MyBatis Plus 两个框架都适用):
/**
* MP 测试类
*
* @author 单程车票
*/
@SpringBootTest
public class MPDemo {
@Resource
private FeeService feeService;
@Test
public void mpDemo1() {
// 获取 10w 条测试数据
List<Fee> feeList = getFeeList();
// 开始时间
long start = System.currentTimeMillis();
// 普通插入
feeService.saveByFor(feeList);
// 结束时间
long end = System.currentTimeMillis();
System.out.println("十万条数据插入时间(普通插入方式):" + (end - start) + " ms");
}
private List<Fee> getFeeList() {
List<Fee> list = new ArrayList<>();
for (int i = 1; i <= 100000; i++) {
list.add(new Fee(null, "o" + i,
new BigDecimal("11111.111"),
new BigDecimal("11111.111"),
new BigDecimal("11111.111"),
new BigDecimal("11111.111"),
new BigDecimal("11111.111")));
}
return list;
}
}
测试结果:
可以看到花费时间大致和 JDBC 的普通插入方式一致都在 80s 左右。
foreach 动态拼接插入
接下来,看看使用 foreach 动态 sql 来实现拼接 sql 的方式进行插入的执行时间是多少。
测试代码(代码没有使用 MyBatis Plus 的增强功能,所以这种方式在 MyBatis 和 MyBatis Plus 两个框架都适用):
/**
* MP 测试类
*
* @author 单程车票
*/
@SpringBootTest
public class MPDemo {
@Resource
private FeeService feeService;
@Test
public void mpDemo2() {
// 获取 10w 条测试数据
List<Fee> feeList = getFeeList();
// 开始时间
long start = System.currentTimeMillis();
// foreach动态拼接插入
feeService.saveByForeach(feeList);
// 结束时间
long end = System.currentTimeMillis();
System.out.println("十万条数据插入时间(foreach动态拼接插入方式):" + (end - start) + " ms");
}
private List<Fee> getFeeList() {
List<Fee> list = new ArrayList<>();
for (int i = 1; i <= 100000; i++) {
list.add(new Fee(null, "o" + i,
new BigDecimal("11111.111"),
new BigDecimal("11111.111"),
new BigDecimal("11111.111"),
new BigDecimal("11111.111"),
new BigDecimal("11111.111")));
}
return list;
}
}
测试结果:
可以看到当数据量为 10w 条时,测试结果报错,这是因为默认情况下 MySQL 可执行的最大 SQL 语句大小为 4194304 即 4MB,这里使用动态 SQL 拼接后的大小远大于默认值,故报错。
可以通过设置 MySQL 的默认 sql 大小来解决此问题(这里设置为 10MB):
sql
复制代码set global max_allowed_packet=10*1024*1024;
重新运行后的测试结果:
可以看到增大默认是 SQL 大小后插入的时间在 3s 左右,相对于 JDBC 的批处理来说速度要稍微慢一点,但比起普通插入来说已经优化很多了。但是这种方式的弊端也很明显,就是无法确定 SQL 究竟多大,不能总是更改默认的 SQL 大小,不实用。
批处理插入
接下来,来看看 JDBC 的批处理插入方式在 MyBatis / MyBatis Plus 框架中是如何实现的。
测试代码(代码没有使用 MyBatis Plus 的增强功能,所以这种方式在 MyBatis 和 MyBatis Plus 两个框架都适用):
/**
* MP 测试类
*
* @author 单程车票
*/
@SpringBootTest
public class MPDemo {
@Resource
private FeeService feeService;
@Test
public void mpDemo3() {
// 获取 10w 条测试数据
List<Fee> feeList = getFeeList();
// 开始时间
long start = System.currentTimeMillis();
// 批处理插入
feeService.saveByBatch(feeList);
// 结束时间
long end = System.currentTimeMillis();
System.out.println("十万条数据插入时间(批处理插入方式):" + (end - start) + " ms");
}
private List<Fee> getFeeList() {
List<Fee> list = new ArrayList<>();
for (int i = 0; i < 100000; i++) {
list.add(new Fee(null, "o" + i,
new BigDecimal("11111.111"),
new BigDecimal("11111.111"),
new BigDecimal("11111.111"),
new BigDecimal("11111.111"),
new BigDecimal("11111.111")));
}
return list;
}
}
测试结果:
可以看到使用 MyBatis / MyBatis Plus 框架实现的批处理插入方式和 JDBC 的批处理插入方式的执行时间都在 1s 左右。
实现的核心代码如下:
@Transactional
@Override
public int saveByBatch(List<Fee> feeList) {
// 记录结果(影响行数)
int res = 0;
// 开启批处理模式
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH);
FeeMapper feeMapper = sqlSession.getMapper(FeeMapper.class);
for (int i = 1; i <= feeList.size(); i++) {
// 利用mapper的单条插入方法插入
res += feeMapper.insertByOne(feeList.get(i-1));
// 进行分片类似 JDBC 的批处理
if (i % 100000 == 0) {
sqlSession.commit();
sqlSession.clearCache();
}
}
sqlSession.commit();
sqlSession.clearCache();
return res;
}
需要注意的是:
- 和 JDBC 一样都需要开启允许重写批量处理提交(即在配置文件的数据库配置 url 中加上
rewriteBatchedStatements=true
)。 - 代码中需要使用批处理模式(利用
SqlSessionFactory
设置批处理模式并获取对应的 Mapper 接口) - 代码中同样进行了分片操作,目的是为了减轻数据库的负担避免在处理时内存占用过高。
- 可以在实现方法中加上
@Transactional
注解来起到手动提交事务的效果(好处和 JDBC 一样)。
MP 自带的批处理插入
接下来,看看 MyBatis Plus 自带的批处理方法的执行效率如何。
测试代码(注意代码中没有使用到上面的任何实现代码,而是依靠 MP 自带的 saveBatch()
方法完成批量插入):
/**
* MP 测试类
*
* @author 单程车票
*/
@SpringBootTest
public class MPDemo {
@Resource
private FeeService feeService;
@Test
public void mpDemo4() {
// 获取 10w 条测试数据
List<Fee> feeList = getFeeList();
// 开始时间
long start = System.currentTimeMillis();
// MP 自带的批处理插入
feeService.saveBatch(feeList);
// 结束时间
long end = System.currentTimeMillis();
System.out.println("十万条数据插入时间(MP 自带的批处理插入方式):" + (end - start) + " ms");
}
private List<Fee> getFeeList() {
List<Fee> list = new ArrayList<>();
for (int i = 0; i < 100000; i++) {
list.add(new Fee(null, "o" + i,
new BigDecimal("11111.111"),
new BigDecimal("11111.111"),
new BigDecimal("11111.111"),
new BigDecimal("11111.111"),
new BigDecimal("11111.111")));
}
return list;
}
}
测试结果:
可以看到使用 MP 自带的批处理方法执行时间在 2s 左右,虽然比自己实现的批处理方法差了一点点,但是架不住它可以拿来就用,所以这也是一种好的选择。
注意:这里依旧需要开启允许重写批量处理提交(即在配置文件的数据库配置 url 中加上rewriteBatchedStatements=true
)。这个很关键,否则效率上会大打折扣的。
放一张没有开启允许重写批量处理的执行结果:
MP 自带的
saveBatch()
方法源码分析
相信大家不只是为了使用 MP 的批量处理方法,应该都好奇 MP 自带的 saveBatch()
方法是如何实现的,那么接下来我想深入源码一起来看看。
进入第一层源码:
可以看到这里带上了一个参数 batchSize = 1000
(这里其实就是分片大小 1000,也是我上述代码借鉴的分片大小),接着往下进入 executeBatch()
方法:
可以看到 Lambda 表达式其实跟上面的实现批处理插入方式类似,先一条一条插入数据,当达到分片大小后,提交并刷新,从而达到批处理的效果。再深入到下一个 executeBatch()
方法会看到底层使用的也是批处理模式。
所以其实 MP 自带的批处理方法和上文中实现的批处理方法类似。
总要有总结
以上就是这次批量插入场景问题下如何通过 JDBC 和 MyBatis / MyBatis Plus 框架实现批量插入的整个优化过程了。
通过上面的讲解,相信大家应该也可以看出来哪些实现方式是有着良好的效率和性能的。
- 使用 JDBC 推荐使用自己实现批处理方式。
- 使用 MyBatis / MyBaits Plus 推荐使用自己实现的批处理方式或 MP 自带的批处理方法。
记得使用批处理方式进行批量插入一定要带上 rewriteBatchedStatements=true
,这点很重要。