一:ListItemReader
用于简单的开发测试。
/**
 * In-memory reader intended for quick development tests.
 *
 * @return a reader that hands out the fixed items "a", "b" and "c"
 */
@Bean
public ItemReader<String> listItemReader() {
    final String[] letters = {"a", "b", "c"};
    return new ListItemReader<String>(Arrays.asList(letters));
}
二:FlatFileItemReader
1.1 完全映射
当文件里的字段与实体类的属性一一对应(名称完全一致)时,可以直接使用 targetType(Class) 来完成映射。
常用的分隔符有逗号;“\u001B” 表示 ESC 字符。示例文件内容如下:
1,monday,10,上海市,浦东新区
2,zhangsan,20,北京市,朝阳区
3,lisi,30,深圳市,宝安区
4,wangwu,31,上海市,浦东新区
5,huihui,32,上海市,浦东新区
/**
 * Target type for one line of the user file. The property names are the same
 * as the column names handed to the reader ("id", "username", "age", "city",
 * "area"). Accessors and {@code toString()} are generated by Lombok.
 */
@ToString
@Getter
@Setter
public class UserInfo {
    /** Value of the "id" column. */
    private Long id;
    /** Value of the "username" column. */
    private String username;
    /** Value of the "age" column. */
    private Integer age;
    /** Value of the "city" column. */
    private String city;
    /** Value of the "area" column. */
    private String area;
}
/**
 * Job configuration that reads {@code static/user.csv} from the classpath,
 * binds every line to a {@link UserInfo} and prints each chunk.
 */
@Configuration
public class HelloWorldChunkJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * @return the job "helloWorldChunkJob", made of the single step {@link #step1()}
     */
    @Bean
    public Job helloWorldChunkJob() {
        Step onlyStep = step1();
        return jobBuilderFactory.get("helloWorldChunkJob")
                .start(onlyStep)
                .build();
    }

    /**
     * @return the step "step1": reader to writer, three items per chunk, no processor
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<UserInfo, UserInfo>chunk(3)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }

    /**
     * Comma-delimited flat-file reader. The five columns are bound by name to
     * the properties of {@link UserInfo} through {@code targetType}.
     *
     * @return the configured reader
     */
    @Bean
    public ItemReader<UserInfo> itemReader() {
        FlatFileItemReaderBuilder<UserInfo> builder = new FlatFileItemReaderBuilder<>();
        return builder
                .encoding("UTF-8")
                .name("userItemReader")
                // A file-system path can be used instead of a classpath resource:
                //.resource(new PathResource("/a/b/c/user.csv"))
                .resource(new ClassPathResource("static/user.csv"))
                .delimited().delimiter(",")
                .names("id", "username", "age", "city", "area")
                .targetType(UserInfo.class)
                .build();
    }

    /**
     * @return a writer that prints each chunk, prefixed with "itemWriter=", to standard output
     */
    @Bean
    public ItemWriter<UserInfo> itemWriter() {
        return items -> System.out.println("itemWriter=" + items);
    }
}
1.2 自定义字段映射 fieldSetMapper
/**
 * Item type built by the custom field-set mapper in this section: the "city"
 * and "area" columns of the file are merged into the single {@code address}
 * property. Accessors and {@code toString()} are generated by Lombok.
 */
@ToString
@Getter
@Setter
public class User {
    /** Value of the "id" column. */
    private Long id;
    /** Value of the "username" column. */
    private String username;
    /** Value of the "age" column. */
    private Integer age;
    /** Combined address text. */
    private String address;
}
/**
 * Chunk-oriented step "step1": reads {@link User} items and writes them in
 * chunks of three. There is no processor, so the input and output item types
 * are both {@link User}.
 *
 * @return the configured step
 */
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<User, User>chunk(3)
            .reader(itemReader())
            .writer(itemWriter())
            .build();
}

/**
 * Comma-delimited flat-file reader whose lines are converted by the custom
 * {@link #fieldSetMapper()}.
 *
 * <p>The reader is typed with {@link User} because that is what the mapper
 * creates. Declaring it as a reader of {@code UserInfo} only compiled because
 * the mapper was passed as a raw type.
 *
 * @return the configured reader
 */
@Bean
public ItemReader<User> itemReader() {
    return new FlatFileItemReaderBuilder<User>()
            .encoding("UTF-8")
            .name("userItemReader")
            .resource(new ClassPathResource("static/user.csv"))
            // A file-system path can be used instead of a classpath resource:
            //.resource(new PathResource("/a/b/c/user.csv"))
            .delimited().delimiter(",")
            .names("id", "username", "age", "city", "area")
            .fieldSetMapper(fieldSetMapper())
            .build();
}

/**
 * @return the mapper that turns one tokenized line into a {@link User}
 */
@Bean
public FieldSetMapper<User> fieldSetMapper() {
    return new UserFieldSetMapper();
}
/**
 * Converts one tokenized line into a {@link User}, reading the columns by
 * name and joining "city" and "area" into the address.
 */
public class UserFieldSetMapper implements FieldSetMapper<User> {

    /**
     * @param fieldSet the tokenized line; must expose the columns "id",
     *                 "username", "age", "city" and "area"
     * @return a new {@link User} filled from the line
     * @throws BindException declared by the mapper contract
     */
    @Override
    public User mapFieldSet(FieldSet fieldSet) throws BindException {
        User mapped = new User();
        mapped.setId(fieldSet.readLong("id"));
        mapped.setUsername(fieldSet.readString("username"));
        mapped.setAge(fieldSet.readInt("age"));

        // Derived field: the address is city followed directly by area.
        String city = fieldSet.readString("city");
        String area = fieldSet.readString("area");
        mapped.setAddress(city + area);
        return mapped;
    }
}
1.3 行映射 lineMapper
/**
 * Reader for {@code static/user.dat} that assembles its line mapper by hand:
 * a delimited tokenizer plus a bean-wrapper field-set mapper targeting
 * {@link UserInfo}.
 *
 * @return the configured reader
 */
public ItemReader<UserInfo> itemReader() {
    // Fields are separated by the ESC control character (U+001B).
    DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer("\u001B");
    // U+001A is installed as the tokenizer's quote character.
    // NOTE(review): the original note called this an end-of-line marker — confirm the intent.
    lineTokenizer.setQuoteCharacter('\u001A');
    lineTokenizer.setFieldSetFactory(new DefaultFieldSetFactory());
    lineTokenizer.setNames("id", "username", "age", "city", "area");

    // Columns are bound to UserInfo properties by name.
    BeanWrapperFieldSetMapper<UserInfo> beanMapper = new BeanWrapperFieldSetMapper<>();
    beanMapper.setTargetType(UserInfo.class);

    DefaultLineMapper<UserInfo> mapper = new DefaultLineMapper<>();
    mapper.setLineTokenizer(lineTokenizer);
    mapper.setFieldSetMapper(beanMapper);

    FlatFileItemReaderBuilder<UserInfo> builder = new FlatFileItemReaderBuilder<>();
    return builder
            .encoding("UTF-8")
            .name("userItemReader")
            .resource(new ClassPathResource("static/user.dat"))
            .lineMapper(mapper)
            .build();
}
三:JsonItemReader
[
{"id":1, "username":"a", "age":18},
{"id":2, "username":"b", "age":17},
{"id":3, "username":"c", "age":16},
{"id":4, "username":"d", "age":15},
{"id":5, "username":"e", "age":14}
]
/**
 * Reader for the JSON array stored in {@code static/user.json}; each array
 * element is bound to a {@link UserInfo} through Jackson.
 *
 * @return the configured reader
 */
@Bean
public JsonItemReader<UserInfo> itemReader() {
    JacksonJsonObjectReader<UserInfo> elementReader =
            new JacksonJsonObjectReader<>(UserInfo.class);
    elementReader.setMapper(new ObjectMapper());

    JsonItemReaderBuilder<UserInfo> builder = new JsonItemReaderBuilder<>();
    return builder
            .name("jsonItemReader")
            .resource(new ClassPathResource("static/user.json"))
            .jsonObjectReader(elementReader)
            .build();
}
四:数据库
3.1 JdbcCursorItemReader
游标一次读一条。
/**
 * Item type for the database readers; one instance per row of the queried
 * user table. Accessors and {@code toString()} are generated by Lombok.
 */
@ToString
@Getter
@Setter
public class User {
    /** Value of the "id" column. */
    private Long id;
    /** Value of the "name" column. */
    private String name;
    /** Value of the "age" column. */
    private int age;
}
/**
 * Maps the current row of a result set to a {@link User} using the columns
 * "id", "name" and "age".
 */
public class UserRowMapper implements RowMapper<User> {

    /**
     * @param rs     result set positioned on the row to convert
     * @param rowNum index of the row (not used)
     * @return a new {@link User} filled from the row
     * @throws SQLException if a column cannot be read
     */
    @Override
    public User mapRow(ResultSet rs, int rowNum) throws SQLException {
        User row = new User();
        row.setId(rs.getLong("id"));
        row.setName(rs.getString("name"));
        row.setAge(rs.getInt("age"));
        return row;
    }
}
/**
 * Job configuration that reads users through a JDBC cursor and prints them
 * to standard error.
 */
@Configuration
public class CursorDBReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /**
     * @return the row mapper shared by the reader
     */
    @Bean
    public UserRowMapper userRowMapper() {
        return new UserRowMapper();
    }

    /**
     * Cursor-based reader for {@code select * from user where age > ?}; the
     * single placeholder is bound to 16.
     *
     * @return the configured reader
     */
    @Bean
    public JdbcCursorItemReader<User> userItemReader() {
        // Values for the "?" placeholders, in order.
        Object[] queryArgs = {16};

        JdbcCursorItemReaderBuilder<User> builder = new JdbcCursorItemReaderBuilder<>();
        return builder
                .name("userCursorItemReader")
                .dataSource(dataSource)
                .sql("select * from user where age > ?")
                .rowMapper(userRowMapper())
                .preparedStatementSetter(new ArgumentPreparedStatementSetter(queryArgs))
                .build();
    }

    /**
     * @return a writer that prints every item of a chunk to standard error
     */
    @Bean
    public ItemWriter<User> itemWriter() {
        return items -> items.forEach(System.err::println);
    }

    /**
     * @return the step "step1": reader to writer, one item per chunk
     */
    @Bean
    public Step step() {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(itemWriter())
                .build();
    }

    /**
     * @return the job "cursor-db-reader-job", made of the single step {@link #step()}
     */
    @Bean
    public Job job() {
        Step onlyStep = step();
        return jobBuilderFactory.get("cursor-db-reader-job")
                .start(onlyStep)
                .build();
    }
}
3.2 JdbcPagingItemReader 分页
一次性读一页。
/**
 * Job configuration that reads users page by page over JDBC and prints them
 * to standard error.
 */
@Configuration
public class PageDBReaderJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /**
     * @return the row mapper shared by the reader
     */
    @Bean
    public UserRowMapper userRowMapper() {
        return new UserRowMapper();
    }

    /**
     * Builds the paging query from its parts: select clause, from clause,
     * where clause with the named parameter {@code :age}, and "id" as sort key.
     *
     * @return the query provider produced by the factory bean
     * @throws Exception if the factory bean cannot create the provider
     */
    @Bean
    public PagingQueryProvider pagingQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean providerFactory = new SqlPagingQueryProviderFactoryBean();
        providerFactory.setDataSource(dataSource);
        // Columns to select.
        providerFactory.setSelectClause("select *");
        // Table to query.
        providerFactory.setFromClause("from user");
        // Filter condition.
        providerFactory.setWhereClause("where age > :age");
        // Column the result is sorted by.
        providerFactory.setSortKey("id");
        return providerFactory.getObject();
    }

    /**
     * Paging reader with a page size of 10; the named parameter "age" is
     * bound to 16.
     *
     * @return the configured reader
     * @throws Exception if the query provider cannot be created
     */
    @Bean
    public JdbcPagingItemReader<User> userItemReader() throws Exception {
        HashMap<String, Object> queryParams = new HashMap<>();
        queryParams.put("age", 16);

        JdbcPagingItemReaderBuilder<User> builder = new JdbcPagingItemReaderBuilder<>();
        return builder
                .name("userPagingItemReader")
                .dataSource(dataSource)
                .queryProvider(pagingQueryProvider())
                .parameterValues(queryParams)
                .pageSize(10)
                .rowMapper(userRowMapper())
                .build();
    }

    /**
     * @return a writer that prints every item of a chunk to standard error
     */
    @Bean
    public ItemWriter<User> itemWriter() {
        return items -> items.forEach(System.err::println);
    }

    /**
     * @return the step "step1": reader to writer, one item per chunk
     * @throws Exception if the reader cannot be created
     */
    @Bean
    public Step step() throws Exception {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(itemWriter())
                .build();
    }

    /**
     * @return the job "page-db-reader-job1", made of the single step {@link #step()}
     * @throws Exception if the step cannot be created
     */
    @Bean
    public Job job() throws Exception {
        Step onlyStep = step();
        return jobBuilderFactory.get("page-db-reader-job1")
                .start(onlyStep)
                .build();
    }
}
3.3 MyBatisPagingItemReader
/**
 * Job configuration that reads users page by page through a MyBatis mapped
 * statement and prints each chunk.
 */
@Configuration
public class HelloWorldChunkJobConfig {

    /**
     * Used both as the reader's page size and as the step's chunk size.
     * Declared {@code final}: it is a constant and must not be reassigned.
     */
    private static final int PAGE_SIZE = 3;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /**
     * @return the job "helloWorldChunkJob", made of the single step {@link #step1()}
     */
    @Bean
    public Job helloWorldChunkJob() {
        return jobBuilderFactory.get("helloWorldChunkJob")
                .start(step1())
                .build();
    }

    /**
     * @return the step "step1": reader to writer, {@code PAGE_SIZE} items per chunk
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(PAGE_SIZE)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }

    /**
     * Paging reader that runs the mapped statement
     * {@code com.example.batch.mapper.UserMapper.selectUserList} with the
     * parameter "id" bound to 1.
     *
     * @return the configured reader
     */
    @Bean
    public MyBatisPagingItemReader<User> itemReader() {
        Map<String, Object> queryParams = new HashMap<>();
        queryParams.put("id", 1);

        MyBatisPagingItemReader<User> itemReader = new MyBatisPagingItemReader<>();
        itemReader.setSqlSessionFactory(sqlSessionFactory);
        itemReader.setQueryId("com.example.batch.mapper.UserMapper.selectUserList");
        itemReader.setPageSize(PAGE_SIZE);
        itemReader.setParameterValues(queryParams);
        return itemReader;
    }

    /**
     * @return a writer that prints each chunk, prefixed with "itemWriter=", to standard output
     */
    @Bean
    public ItemWriter<User> itemWriter() {
        return items -> System.out.println("itemWriter=" + items);
    }
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<!-- Mapper backing the MyBatis paging reader. -->
<mapper namespace="com.example.batch.mapper.UserMapper">
    <!--
      Returns one page of users whose id is greater than #{id}.
      #{_pagesize} and #{_skiprows} are the paging values referenced by the
      limit/offset clause. The explicit "order by id" keeps the row order
      stable between page queries; without it the database may return rows
      in any order, so pages could overlap or miss rows.
    -->
    <select id="selectUserList" resultType="com.example.batch.entity.User">
        select * from tbl_user
        where id > #{id}
        order by id
        limit #{_pagesize} offset #{_skiprows}
    </select>
</mapper>
五:多线程读
- userItemReader() 加上 saveState(false)
Spring Batch 提供的大部分 ItemReader 是有状态的,作业重启基本通过状态来确定作业停止的位置;而在多线程环境中,如果对象维护的状态被多个线程访问,可能存在线程间状态相互覆盖的问题。所以设置为 false 表示关闭状态保存,但这也意味着作业不能重启了。
- step() 方法加上 .taskExecutor(new SimpleAsyncTaskExecutor())
为作业步骤添加了多线程处理能力,以块为单位,一个块一个线程。观察运行结果,很明显能看出输出的顺序是乱序的。改变 job 的名字再执行,会发现每次的输出都不一样。
/**
 * Flat-file reader for the multi-threaded step: "#"-delimited lines of
 * {@code static/user.csv} bound to {@link User}. State saving is switched
 * off so that concurrent threads do not overwrite each other's saved state.
 *
 * <p>NOTE(review): turning off state saving does not by itself make the
 * reader safe for concurrent reads; consider a synchronized wrapper around
 * it — confirm against the Spring Batch version in use.
 *
 * @return the configured reader
 */
@Bean
public FlatFileItemReader<User> userItemReader() {
    // Shows which thread creates the reader bean.
    System.out.println(Thread.currentThread());

    FlatFileItemReaderBuilder<User> builder = new FlatFileItemReaderBuilder<>();
    return builder
            .name("userItemReader")
            .saveState(false)
            .resource(new ClassPathResource("static/user.csv"))
            .delimited().delimiter("#")
            .names("id", "username", "age")
            .targetType(User.class)
            .build();
}
/**
 * Step "step1" with a task executor attached, so chunks of two items are
 * handed to threads created by a {@code SimpleAsyncTaskExecutor}.
 *
 * @return the configured step
 */
@Bean
public Step step() {
    SimpleAsyncTaskExecutor chunkExecutor = new SimpleAsyncTaskExecutor();
    return stepBuilderFactory.get("step1")
            .<User, User>chunk(2)
            .reader(userItemReader())
            .writer(itemWriter())
            .taskExecutor(chunkExecutor)
            .build();
}
六:多步骤并行执行
/**
 * Job "parallel-step-job" that runs two flows side by side: one wrapping
 * {@code flatStep()} and one wrapping {@code jsonStep()}, joined by a split
 * that uses a {@code SimpleAsyncTaskExecutor}.
 *
 * @return the configured job
 */
@Bean
public Job parallelJob() {
    // First branch: the flat-file step (reads user-parallel.txt).
    FlowBuilder<Flow> flatFlowBuilder = new FlowBuilder<>("parallelFlow1");
    Flow flatFlow = flatFlowBuilder.start(flatStep()).build();

    // Second branch: the JSON step (reads user-parallel.json); the split
    // adds the first branch so both are executed by the async executor.
    FlowBuilder<Flow> jsonFlowBuilder = new FlowBuilder<>("parallelFlow2");
    Flow combinedFlow = jsonFlowBuilder
            .start(jsonStep())
            .split(new SimpleAsyncTaskExecutor())
            .add(flatFlow)
            .build();

    return jobBuilderFactory.get("parallel-step-job")
            .start(combinedFlow)
            .end()
            .build();
}
parallelJob() 配置 job 时,需要指定并行的 flow 步骤:先是 parallelFlow1,然后是 parallelFlow2,2 个步骤间使用 **.split(new SimpleAsyncTaskExecutor())** 隔开,表示线程池开启 2 个线程,分别处理 parallelFlow1 和 parallelFlow2 这 2 个步骤。
七:异常处理
方式一:设置跳过异常次数
/**
 * Fault-tolerant variant of step "step1" that shows the available skip
 * settings: skippable and non-skippable exception types, a skip limit, and a
 * custom skip policy.
 *
 * @return the configured step
 * @throws Exception if the reader cannot be created
 */
@Bean
public Step step() throws Exception {
    // Hook for custom skip decisions; this sample policy never skips.
    SkipPolicy customPolicy = (t, skipCount) -> false;

    return stepBuilderFactory.get("step1")
            .<User, User>chunk(1)
            .reader(userItemReader())
            .writer(itemWriter())
            // Enable fault tolerance.
            .faultTolerant()
            // Exception type that may be skipped.
            .skip(Exception.class)
            // Exception type that must not be skipped.
            .noSkip(RuntimeException.class)
            // Maximum number of skips.
            .skipLimit(10)
            // NOTE(review): no task executor is configured in this block —
            // confirm that the throttle limit is intended here.
            .throttleLimit(10)
            .skipPolicy(customPolicy)
            .build();
}
方式二:记录错误信息
/**
 * Read listener that reports read failures on standard output.
 *
 * <p>The listener is parameterized with {@code Object} instead of using the
 * raw interface type, so it can be used with any item type without
 * unchecked warnings.
 */
public class ErrorItemReaderListener implements ItemReadListener<Object> {

    /** Nothing to do before an item is read. */
    @Override
    public void beforeRead() {
    }

    /**
     * Nothing to do after a successful read.
     *
     * @param item the item that was read (not used)
     */
    @Override
    public void afterRead(Object item) {
    }

    /**
     * Prints a fixed message when reading fails.
     *
     * @param ex the read failure (not inspected by this sample)
     */
    @Override
    public void onReadError(Exception ex) {
        System.out.println("记录读数据相关信息...");
    }
}
方式三:直接跳过不处理
/**
 * Fault-tolerant variant of step "step1" that skips every failing item
 * without any further handling.
 *
 * @return the configured step
 */
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<User, User>chunk(PAGE_SIZE)
            .reader(itemReader())
            .writer(itemWriter())
            .faultTolerant()
            .skip(Exception.class)
            // An explicit limit is required for "skip everything".
            // NOTE(review): without it the library default applies, which on
            // Spring Batch 4.x is believed to be 0, so the first skippable
            // error would fail the step — confirm for the version in use.
            .skipLimit(Integer.MAX_VALUE)
            .build();
}