Contents
Introduction
Data Preparation
Cursor-Based Reading
Paging-Based Reading
Video Version
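Introduction
This post follows on from the previous one, "Spring Batch ItemReader Component: JSON Files". Having seen how Spring Batch reads JSON files, let's now look at how it reads data from a database.
Data Preparation
Below is a user table named user. If the data is stored in a database, how do we read it?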
CREATE TABLE `user` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key',
  `name` varchar(255) DEFAULT NULL COMMENT 'user name',
  `age` int DEFAULT NULL COMMENT 'age',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb3;
INSERT INTO `user` VALUES (1, 'dafei', 18);
INSERT INTO `user` VALUES (2, 'xiaofei', 17);
INSERT INTO `user` VALUES (3, 'zhongfei', 16);
INSERT INTO `user` VALUES (4, 'laofei', 15);
INSERT INTO `user` VALUES (5, 'feifei', 14);
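Spring Batch provides two ways to read data from a database: cursor-based and paging-based.
Cursor-Based Reading
A cursor is a database concept. You can think of it simply as a pointer to the current row of a query result.
As the cursor advances, it fetches one row of the table at a time. With JDBC, the row the cursor currently points to is exposed through a ResultSet. To get the data out of the ResultSet, Spring Batch relies on a RowMapper (an interface from Spring JDBC) that maps table rows to entity objects.
user table row ----> User object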
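The row-to-object step can be pictured without a database. The following is a minimal sketch, not Spring Batch code: a plain Iterator over Object[] stands in for the JDBC ResultSet cursor, and SimpleRowMapper and SimpleUser are hypothetical stand-ins for RowMapper and the User entity. The sample rows are copied from the INSERT statements above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class CursorReadSketch {

    // Hypothetical stand-in for RowMapper: converts one raw row into an object.
    interface SimpleRowMapper<T> {
        T mapRow(Object[] row, int rowNum);
    }

    // Hypothetical stand-in for the User entity (id, name, age).
    static class SimpleUser {
        final long id;
        final String name;
        final int age;

        SimpleUser(long id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return "User(id=" + id + ", name=" + name + ", age=" + age + ")";
        }
    }

    // In-memory copy of the five rows in the user table.
    static List<Object[]> sampleRows() {
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[]{1L, "dafei", 18});
        rows.add(new Object[]{2L, "xiaofei", 17});
        rows.add(new Object[]{3L, "zhongfei", 16});
        rows.add(new Object[]{4L, "laofei", 15});
        rows.add(new Object[]{5L, "feifei", 14});
        return rows;
    }

    // Advances the "cursor" one row at a time and maps each row to an object.
    static <T> List<T> readAll(Iterator<Object[]> cursor, SimpleRowMapper<T> mapper) {
        List<T> items = new ArrayList<>();
        int rowNum = 0;
        while (cursor.hasNext()) {
            items.add(mapper.mapRow(cursor.next(), rowNum++));
        }
        return items;
    }

    // Maps every sample row to a SimpleUser, column by column.
    static List<SimpleUser> demo() {
        return readAll(sampleRows().iterator(),
                (row, rowNum) -> new SimpleUser((Long) row[0], (String) row[1], (Integer) row[2]));
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The mapper never sees more than one row at a time, which is the property the cursor approach depends on.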
Reading a table with Spring Batch over JDBC requires a few pieces:
1> The User entity
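import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

// Lombok generates the getters, setters and toString() for this entity.
@Getter
@Setter
@ToString
public class User {
    private Long id;
    private String name;
    private int age;
}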
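2> A RowMapper implementation that maps table rows to the entity
import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;

public class UserRowMapper implements RowMapper<User> {
    // Called once per row: copy each column of the current row into a new User.
    @Override
    public User mapRow(ResultSet rs, int rowNum) throws SQLException {
        User user = new User();
        user.setId(rs.getLong("id"));
        user.setName(rs.getString("name"));
        user.setAge(rs.getInt("age"));
        return user;
    }
}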
3> Write the JdbcCursorItemReader
package com.langfeiyes.batch._24_itemreader_db_cursor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import javax.sql.DataSource;
import java.util.List;
@SpringBootApplication
@EnableBatchProcessing
public class CursorDBReaderJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DataSource dataSource;

    @Bean
    public UserRowMapper userRowMapper(){
        return new UserRowMapper();
    }

    @Bean
    public JdbcCursorItemReader<User> userItemReader(){
        return new JdbcCursorItemReaderBuilder<User>()
                .name("userCursorItemReader")
                .dataSource(dataSource)
                .sql("select * from user")
                .rowMapper(userRowMapper())
                .build();
    }

    @Bean
    public ItemWriter<User> itemWriter(){
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                items.forEach(System.err::println);
            }
        };
    }

    @Bean
    public Step step(){
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public Job job(){
        return jobBuilderFactory.get("cursor-db-reader-job")
                .start(step())
                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(CursorDBReaderJob.class, args);
    }
}
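Notes:
1> Working with the database requires injecting a DataSource.
2> In userItemReader(), the SQL to execute must be specified explicitly.
3> In userItemReader(), the rule for mapping each row the cursor returns must be specified explicitly: rowMapper.
Note that if the SQL has a where condition with parameters, the parameter values must be supplied separately.
For example, to query users with age > 16: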
// Requires: import org.springframework.jdbc.core.ArgumentPreparedStatementSetter;
@Bean
public JdbcCursorItemReader<User> userItemReader(){
    return new JdbcCursorItemReaderBuilder<User>()
            .name("userCursorItemReader")
            .dataSource(dataSource)
            .sql("select * from user where age > ?")
            .rowMapper(userRowMapper())
            // bind the value for the ? placeholder
            .preparedStatementSetter(new ArgumentPreparedStatementSetter(new Object[]{16}))
            .build();
}
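Paging-Based Reading
The cursor approach runs a single query for all matching rows and then reads them one at a time, whereas the paging approach issues a separate query per page and reads up to pageSize rows each time.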
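To make the page-by-page idea concrete, here is a minimal in-memory sketch. It assumes the reader remembers the last sort-key value it saw and asks for the next pageSize rows after it. Row, fetchPage and readAllPages are hypothetical names for illustration, and the real reader's stopping rule and generated SQL differ in detail.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class KeysetPagingSketch {

    // Hypothetical row type holding only the columns this sketch needs.
    static class Row {
        final long id;
        final int age;

        Row(long id, int age) {
            this.id = id;
            this.age = age;
        }
    }

    // In-memory copy of the (id, age) pairs in the user table.
    static List<Row> sampleTable() {
        List<Row> table = new ArrayList<>();
        table.add(new Row(1, 18));
        table.add(new Row(2, 17));
        table.add(new Row(3, 16));
        table.add(new Row(4, 15));
        table.add(new Row(5, 14));
        return table;
    }

    // One "query": rows with age > minAge and id > lastId, ordered by id,
    // limited to at most pageSize rows.
    static List<Row> fetchPage(List<Row> table, int minAge, long lastId, int pageSize) {
        return table.stream()
                .filter(r -> r.age > minAge && r.id > lastId)
                .sorted(Comparator.comparingLong((Row r) -> r.id))
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    // Fetches page after page until a query comes back empty,
    // returning the ids grouped by page.
    static List<List<Long>> readAllPages(List<Row> table, int minAge, int pageSize) {
        List<List<Long>> pages = new ArrayList<>();
        long lastId = Long.MIN_VALUE;
        List<Row> page = fetchPage(table, minAge, lastId, pageSize);
        while (!page.isEmpty()) {
            pages.add(page.stream().map(r -> r.id).collect(Collectors.toList()));
            lastId = page.get(page.size() - 1).id; // remember where this page ended
            page = fetchPage(table, minAge, lastId, pageSize);
        }
        return pages;
    }

    public static void main(String[] args) {
        System.out.println(readAllPages(sampleTable(), 16, 1));
        System.out.println(readAllPages(sampleTable(), 13, 2));
    }
}
```

Because each page is a fresh query keyed on the last id, the sort key needs to be unique; otherwise rows sharing a key value could be skipped or repeated across pages.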
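Paging-based reading needs the following pieces:
1> The entity object, same as in the cursor approach
2> The RowMapper, same as in the cursor approach
3> The data source, same as in the cursor approach
4> A PagingQueryProvider, which supplies the paging logic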
package com.langfeiyes.batch._25_itemreader_db_page;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.List;
@SpringBootApplication
@EnableBatchProcessing
public class PageDBReaderJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DataSource dataSource;

    @Bean
    public UserRowMapper userRowMapper(){
        return new UserRowMapper();
    }

    @Bean
    public PagingQueryProvider pagingQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean factoryBean = new SqlPagingQueryProviderFactoryBean();
        factoryBean.setDataSource(dataSource);
        factoryBean.setSelectClause("select *");         // columns to select
        factoryBean.setFromClause("from user");          // table to query
        factoryBean.setWhereClause("where age > :age");  // where condition with a named parameter
        factoryBean.setSortKey("id");                    // sort key for paging (should be unique)
        return factoryBean.getObject();
    }

    @Bean
    public JdbcPagingItemReader<User> userItemReader() throws Exception {
        // value for the :age named parameter
        HashMap<String, Object> param = new HashMap<>();
        param.put("age", 16);
        return new JdbcPagingItemReaderBuilder<User>()
                .name("userPagingItemReader")
                .dataSource(dataSource)                 // data source
                .queryProvider(pagingQueryProvider())   // paging logic
                .parameterValues(param)                 // query parameters
                .pageSize(10)                           // rows fetched per page
                .rowMapper(userRowMapper())             // mapping rule
                .build();
    }

    @Bean
    public ItemWriter<User> itemWriter(){
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                items.forEach(System.err::println);
            }
        };
    }

    @Bean
    public Step step() throws Exception {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(itemWriter())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("page-db-reader-job1")
                .start(step())
                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(PageDBReaderJob.class, args);
    }
}
Notes:
1> A pagingQueryProvider must be supplied; it assembles the paging SQL.
2> userItemReader() wires the paging query logic together.
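As a rough picture of what "assembling the paging SQL" means, the sketch below concatenates the same clauses passed to pagingQueryProvider() into a MySQL-style LIMIT query. This is an illustration under assumptions, not Spring Batch's implementation: PagingSqlSketch, firstPageSql, nextPageSql and the :lastKey parameter name are all hypothetical, and the SQL that the framework actually generates varies by database.

```java
class PagingSqlSketch {

    // First page: no lower bound on the sort key yet.
    static String firstPageSql(String select, String from, String where,
                               String sortKey, int pageSize) {
        return select + " " + from + " " + where
                + " ORDER BY " + sortKey + " ASC LIMIT " + pageSize;
    }

    // Later pages: continue after the last sort-key value already read.
    // ":lastKey" is a made-up parameter name for this sketch.
    static String nextPageSql(String select, String from, String where,
                              String sortKey, int pageSize) {
        return select + " " + from + " " + where
                + " AND " + sortKey + " > :lastKey"
                + " ORDER BY " + sortKey + " ASC LIMIT " + pageSize;
    }

    public static void main(String[] args) {
        System.out.println(firstPageSql("select *", "from user", "where age > :age", "id", 10));
        System.out.println(nextPageSql("select *", "from user", "where age > :age", "id", 10));
    }
}
```

Keeping the select, from, where and sort-key parts separate is what lets a provider reorder and extend them per database dialect, which a single hand-written SQL string would not allow.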
That wraps up this post. Stay tuned for the next installment.
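Video Version
If reading isn't enough, you can switch to the video version: Spring Batch高效批处理框架实战 (Spring Batch: An Efficient Batch Processing Framework in Practice)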