1: ListItemReader
ListItemReader reads items from an in-memory list; it is mainly useful for simple development and testing.
@Bean
public ItemReader<String> listItemReader() {
    return new ListItemReader<>(Arrays.asList("a", "b", "c"));
}
2: FlatFileItemReader
2.1 Full mapping with targetType
When the columns in the file line up exactly with the entity's properties, targetType(Class) is enough to complete the mapping. The comma is the most common delimiter; a control character such as "\u001B" (ESC) can also be used, as in section 2.3 below. Sample user.csv:
1,monday,10,上海市,浦东新区
2,zhangsan,20,北京市,朝阳区
3,lisi,30,深圳市,宝安区
4,wangwu,31,上海市,浦东新区
5,huihui,32,上海市,浦东新区
@Getter
@Setter
@ToString
public class UserInfo {
    private Long id;
    private String username;
    private Integer age;
    private String city;
    private String area;
}
@Configuration
public class HelloWorldChunkJobConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job helloWorldChunkJob() {
        return jobBuilderFactory.get("helloWorldChunkJob")
                .start(step1())
                .build();
    }
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<UserInfo, UserInfo>chunk(3)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }
    @Bean
    public ItemReader<UserInfo> itemReader() {
        return new FlatFileItemReaderBuilder<UserInfo>()
                .encoding("UTF-8")
                .name("userItemReader")
                .resource(new ClassPathResource("static/user.csv"))
                //.resource(new PathResource("/a/b/c/user.csv"))
                .delimited().delimiter(",")
                .names("id", "username", "age", "city", "area")
                .targetType(UserInfo.class)
                .build();
    }
    @Bean
    public ItemWriter<UserInfo> itemWriter() {
        return new ItemWriter<UserInfo>() {
            @Override
            public void write(List<? extends UserInfo> items) throws Exception {
                System.out.println("itemWriter=" + items);
            }
        };
    }
}
2.2 Custom field mapping with fieldSetMapper
@Getter
@Setter
@ToString
public class User {
    private Long id;
    private String username;
    private Integer age;
    private String address;
}
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<User, User>chunk(3)
            .reader(itemReader())
            .writer(itemWriter())
            .build();
}
@Bean
public ItemReader<User> itemReader() {
    return new FlatFileItemReaderBuilder<User>()
            .encoding("UTF-8")
            .name("userItemReader")
            .resource(new ClassPathResource("static/user.csv"))
            //.resource(new PathResource("/a/b/c/user.csv"))
            .delimited().delimiter(",")
            .names("id", "username", "age", "city", "area")
            .fieldSetMapper(fieldSetMapper())
            .build();
}
@Bean
public FieldSetMapper<User> fieldSetMapper() {
    return new UserFieldSetMapper();
}
public class UserFieldSetMapper implements FieldSetMapper<User> {
    @Override
    public User mapFieldSet(FieldSet fieldSet) throws BindException {
        User user = new User();
        user.setId(fieldSet.readLong("id"));
        user.setUsername(fieldSet.readString("username"));
        user.setAge(fieldSet.readInt("age"));
        // combine city and area into a single address field
        user.setAddress(fieldSet.readString("city") + fieldSet.readString("area"));
        return user;
    }
}
2.3 Line mapping with lineMapper
@Bean
public ItemReader<UserInfo> itemReader() {
    // fields in user.dat are separated by the ESC character (\u001B)
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer("\u001B");
    // quote character (SUB, \u001A) used to wrap field values that contain the delimiter
    tokenizer.setQuoteCharacter('\u001A');
    tokenizer.setFieldSetFactory(new DefaultFieldSetFactory());
    tokenizer.setNames("id", "username", "age", "city", "area");
    BeanWrapperFieldSetMapper<UserInfo> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(UserInfo.class);
    DefaultLineMapper<UserInfo> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);
    return new FlatFileItemReaderBuilder<UserInfo>()
            .encoding("UTF-8")
            .name("userItemReader")
            .resource(new ClassPathResource("static/user.dat"))
            .lineMapper(lineMapper)
            .build();
}
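Because ESC is not printable, it helps to see what the tokenizer does to a single record. A quick sketch (not from the original article; the sample line and its values are invented, with ESC written as the \u001B escape):
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer("\u001B");
tokenizer.setNames("id", "username", "age", "city", "area");
// one record of user.dat, with fields joined by the ESC character
FieldSet fieldSet = tokenizer.tokenize("1\u001Bmonday\u001B10\u001B上海市\u001B浦东新区");
System.out.println(fieldSet.readString("username")); // prints: monday
System.out.println(fieldSet.readInt("age"));         // prints: 10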
3: JsonItemReader
JsonItemReader reads items one by one from a JSON array. Sample user.json:
[
  {"id":1, "username":"a", "age":18},
  {"id":2, "username":"b", "age":17},
  {"id":3, "username":"c", "age":16},
  {"id":4, "username":"d", "age":15},
  {"id":5, "username":"e", "age":14}
]
@Bean
public JsonItemReader<UserInfo> itemReader() {
    ObjectMapper objectMapper = new ObjectMapper();
    JacksonJsonObjectReader<UserInfo> jsonObjectReader = new JacksonJsonObjectReader<>(UserInfo.class);
    jsonObjectReader.setMapper(objectMapper);
    return new JsonItemReaderBuilder<UserInfo>()
            .name("jsonItemReader")
            .resource(new ClassPathResource("static/user.json"))
            .jsonObjectReader(jsonObjectReader)
            .build();
}
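Wiring this reader into a chunk step follows the same pattern as section 2.1; a minimal sketch, assuming the same println-style itemWriter() bean used there:
@Bean
public Step jsonStep1() {
    return stepBuilderFactory.get("jsonStep1")
            .<UserInfo, UserInfo>chunk(3)
            .reader(itemReader())
            .writer(itemWriter())
            .build();
}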
4: Database readers
4.1 JdbcCursorItemReader
Opens a database cursor and reads one row at a time.
@Getter
@Setter
@ToString
public class User {
    private Long id;
    private String name;
    private int age;
}
public class UserRowMapper implements RowMapper<User> {
    @Override
    public User mapRow(ResultSet rs, int rowNum) throws SQLException {
        User user = new User();
        user.setId(rs.getLong("id"));
        user.setName(rs.getString("name"));
        user.setAge(rs.getInt("age"));
        return user;
    }
}
@Configuration
public class CursorDBReaderJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DataSource dataSource;
    @Bean
    public UserRowMapper userRowMapper(){
        return new UserRowMapper();
    }
    @Bean
    public JdbcCursorItemReader<User> userItemReader(){
        return new JdbcCursorItemReaderBuilder<User>()
                .name("userCursorItemReader")
                .dataSource(dataSource)
                .sql("select * from user where age > ?")
                .rowMapper(userRowMapper())
                // bind the positional "?" parameter: age > 16
                .preparedStatementSetter(new ArgumentPreparedStatementSetter(new Object[]{16}))
                .build();
    }
    @Bean
    public ItemWriter<User> itemWriter(){
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                items.forEach(System.err::println);
            }
        };
    }
    @Bean
    public Step step(){
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(itemWriter())
                .build();
    }
    @Bean
    public Job job(){
        return jobBuilderFactory.get("cursor-db-reader-job")
                .start(step())
                .build();
    }
}
4.2 JdbcPagingItemReader (paging)
Reads one page of rows per query.

@Configuration
public class PageDBReaderJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DataSource dataSource;
    @Bean
    public UserRowMapper userRowMapper(){
        return new UserRowMapper();
    }
    @Bean
    public PagingQueryProvider pagingQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean factoryBean = new SqlPagingQueryProviderFactoryBean();
        factoryBean.setDataSource(dataSource);
        factoryBean.setSelectClause("select *");   // columns to select
        factoryBean.setFromClause("from user");    // table to query
        factoryBean.setWhereClause("where age > :age"); // where clause with a named parameter
        factoryBean.setSortKey("id");   // sort key; paging needs a unique, ordered column
        return factoryBean.getObject();
    }
    @Bean
    public JdbcPagingItemReader<User> userItemReader() throws Exception {
        HashMap<String, Object> param = new HashMap<>();
        param.put("age", 16);
        return new JdbcPagingItemReaderBuilder<User>()
                .name("userPagingItemReader")
                .dataSource(dataSource)  // data source
                .queryProvider(pagingQueryProvider())  // paging query logic
                .parameterValues(param)   // values for the named parameters (:age)
                .pageSize(10) // rows per page
                .rowMapper(userRowMapper())  // row-to-object mapping
                .build();
    }
    @Bean
    public ItemWriter<User> itemWriter(){
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                items.forEach(System.err::println);
            }
        };
    }
    @Bean
    public Step step() throws Exception {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(1)
                .reader(userItemReader())
                .writer(itemWriter())
                .build();
    }
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("page-db-reader-job1")
                .start(step())
                .build();
    }
}
4.3 MyBatisPagingItemReader
@Configuration
public class HelloWorldChunkJobConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Autowired
    private SqlSessionFactory sqlSessionFactory;
    private static final int PAGE_SIZE = 3;
    @Bean
    public Job helloWorldChunkJob() {
        return jobBuilderFactory.get("helloWorldChunkJob")
                .start(step1())
                .build();
    }
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(PAGE_SIZE)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }
    @Bean
    public MyBatisPagingItemReader<User> itemReader() {
        Map<String, Object> map = new HashMap<>();
        map.put("id", 1);
        MyBatisPagingItemReader<User> itemReader = new MyBatisPagingItemReader<>();
        itemReader.setSqlSessionFactory(sqlSessionFactory);
        itemReader.setQueryId("com.example.batch.mapper.UserMapper.selectUserList");
        itemReader.setPageSize(PAGE_SIZE);
        itemReader.setParameterValues(map);
        return itemReader;
    }
    @Bean
    public ItemWriter<User> itemWriter() {
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                System.out.println("itemWriter=" + items);
            }
        };
    }
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.example.batch.mapper.UserMapper">
    <select id="selectUserList" resultType="com.example.batch.entity.User">
        select * from tbl_user
        where id > #{id}
        order by id
        <!-- _pagesize and _skiprows are supplied automatically by MyBatisPagingItemReader -->
        limit #{_pagesize} offset #{_skiprows}
    </select>
</mapper>
5: Multi-threaded reading
-  Add saveState(false) to userItemReader(). Most of the ItemReaders that Spring Batch provides are stateful, and a restarted job relies on that state to work out where it previously stopped. In a multi-threaded step the same state object is accessed by several threads, so threads can overwrite each other's position. saveState(false) switches the state off, which avoids the problem but also means the job can no longer be restarted from the failure point.
-  Add .taskExecutor(new SimpleAsyncTaskExecutor()) to step(). This gives the step multi-threaded processing at chunk granularity, one thread per chunk, so the output order is visibly scrambled. Change the job name and run it again and the output differs from run to run.
@Bean
public FlatFileItemReader<User> userItemReader(){
    System.out.println(Thread.currentThread());
    FlatFileItemReader<User> reader = new FlatFileItemReaderBuilder<User>()
            .name("userItemReader")
            .saveState(false) // stateless reader: prevents threads from overwriting each other's state (job is not restartable)
            .resource(new ClassPathResource("static/user.csv"))
            .delimited().delimiter("#")
            .names("id", "username", "age")
            .targetType(User.class)
            .build();
    return reader;
}
@Bean
public Step step(){
    return stepBuilderFactory.get("step1")
            .<User, User>chunk(2)
            .reader(userItemReader())
            .writer(itemWriter())
            .taskExecutor(new SimpleAsyncTaskExecutor())
            .build();
}
6: Running multiple steps in parallel
@Bean
public Job parallelJob(){
    // flow 1 (runs on its own thread): reads user-parallel.txt
    Flow parallelFlow1 = new FlowBuilder<Flow>("parallelFlow1")
            .start(flatStep())
            .build();
    // flow 2: reads user-parallel.json
    Flow parallelFlow2 = new FlowBuilder<Flow>("parallelFlow2")
            .start(jsonStep())
            .split(new SimpleAsyncTaskExecutor())
            .add(parallelFlow1)
            .build();
    return jobBuilderFactory.get("parallel-step-job")
            .start(parallelFlow2)
            .end()
            .build();
}
parallelJob() wires the two flows into the job. parallelFlow2 starts with jsonStep() and attaches parallelFlow1 via .split(new SimpleAsyncTaskExecutor()).add(parallelFlow1); the split hands each flow to its own thread from the task executor, so parallelFlow1 and parallelFlow2 execute concurrently rather than one after the other. A sketch of the flatStep() and jsonStep() beans referenced here follows below.
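flatStep() and jsonStep() are not shown in the original; a minimal sketch of the two steps, assuming a "#"-delimited user-parallel.txt and a user-parallel.json array with the same id/username/age fields as the multi-threaded example's User entity:
@Bean
public Step flatStep() {
    // flat-file reader for user-parallel.txt
    FlatFileItemReader<User> reader = new FlatFileItemReaderBuilder<User>()
            .name("flatParallelItemReader")
            .resource(new ClassPathResource("static/user-parallel.txt"))
            .delimited().delimiter("#")
            .names("id", "username", "age")
            .targetType(User.class)
            .build();
    return stepBuilderFactory.get("flatStep")
            .<User, User>chunk(2)
            .reader(reader)
            .writer(items -> items.forEach(System.out::println))
            .build();
}
@Bean
public Step jsonStep() {
    // JSON reader for user-parallel.json
    JacksonJsonObjectReader<User> jsonObjectReader = new JacksonJsonObjectReader<>(User.class);
    jsonObjectReader.setMapper(new ObjectMapper());
    JsonItemReader<User> reader = new JsonItemReaderBuilder<User>()
            .name("jsonParallelItemReader")
            .resource(new ClassPathResource("static/user-parallel.json"))
            .jsonObjectReader(jsonObjectReader)
            .build();
    return stepBuilderFactory.get("jsonStep")
            .<User, User>chunk(2)
            .reader(reader)
            .writer(items -> items.forEach(System.out::println))
            .build();
}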
7: Exception handling
Option 1: configure how many exceptions to skip
@Bean
public Step step() throws Exception {
    return stepBuilderFactory.get("step1")
        .<User, User>chunk(1)
        .reader(userItemReader())
        .writer(itemWriter())
        .faultTolerant() // enable fault tolerance
        .skip(Exception.class)  // which exceptions may be skipped
        .noSkip(RuntimeException.class)  // which exceptions must never be skipped
        .skipLimit(10)  // maximum number of skipped items
        .throttleLimit(10) // max concurrent chunks (only relevant with a taskExecutor)
        .skipPolicy(new SkipPolicy() {
            @Override
            public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {
                // custom rule for which exceptions to skip and how often; return true to skip
                return false;
            }
        })
        .build();
}
Option 2: record the error details with an ItemReadListener
public class ErrorItemReaderListener implements ItemReadListener<User> {
    @Override
    public void beforeRead() {
    }
    @Override
    public void afterRead(User item) {
    }
    @Override
    public void onReadError(Exception ex) {
        System.out.println("记录读数据相关信息...");
    }
}
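The listener only takes effect once it is registered on the step; a minimal sketch, reusing the fault-tolerant step from option 1:
@Bean
public Step step() throws Exception {
    return stepBuilderFactory.get("step1")
            .<User, User>chunk(1)
            .reader(userItemReader())
            .writer(itemWriter())
            .faultTolerant()
            .skip(Exception.class)
            .skipLimit(10)
            .listener(new ErrorItemReaderListener()) // onReadError() fires whenever the reader throws
            .build();
}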
Option 3: just skip, with no extra handling
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .<User, User>chunk(PAGE_SIZE)
            .reader(itemReader())
            .writer(itemWriter())
            .faultTolerant()
            .skip(Exception.class)
            .build();
}