目录
引言
概念
默认ItemProcessor
ValidatingItemProcessor:校验处理器
ItemProcessorAdapter:适配器处理器
ScriptItemProcessor:脚本处理器
CompositeItemProcessor:组合处理器
自定义ItemProcessor处理器
转视频版
引言
接着上篇:Spring Batch ItemReader组件-读数据库 了解Spring Batch ItemReader读组件后,接下来一起学习一下Spring Batch 数据处理组件:ItemProcessor
概念
前面我们多次讲过,居于块的读与写,中间还夹着一个ItemProcessor 条目处理。当我们通过ItemReader 将数据读取出来之后,你面临2个选择:
1>直接将数据转向输出
2>对读入的数据进行再加工。
如果选择第一种,那ItemProcessor 可以不用出现,如果选择第二种,就需要引入ItemProcessor 条目处理组件啦。
Spring Batch 为ItemProcessor提供默认的处理器与自定义处理器2种模式以满足各种需求。
默认ItemProcessor
Spring Batch 提供现成的ItemProcessor 组件有4种:
-
ValidatingItemProcessor:校验处理器
-
ItemProcessorAdapter:适配器处理器
-
ScriptItemProcessor:脚本处理器
-
CompositeItemProcessor:组合处理器
ValidatingItemProcessor:校验处理器
这个好理解,很多时候ItemReader读出来的数据是相对原始的数据,并没有做过多的校验
数据文件users-validate.txt
1##18
2##16
3#laofei#20
4#zhongfei#19
5#feifei#15
比如上面文本数据,第一条,第二条name数值没有指定,在ItemReader 读取之后,必定将 "" 空串封装到User name属性中,语法上没有错,但逻辑上可以做文章,比如:用户名不为空。
解决上述问题,可以使用Spring Batch 提供ValidatingItemProcessor 校验器处理。
接下来我们看下ValidatingItemProcessor 怎么实现
1>导入校验依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
2>定义实体对象
@Getter
@Setter
@ToString
public class User {
private Long id;
@NotBlank(message = "用户名不能为null或空串")
private String name;
private int age;
}
3>实现
package com.langfeiyes.batch._26_itemprocessor_validate;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.validator.BeanValidatingItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.util.StringUtils;
import java.util.List;
@SpringBootApplication
@EnableBatchProcessing
public class ValidationProcessorJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users-validate.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
public BeanValidatingItemProcessor<User> beanValidatingItemProcessor(){
BeanValidatingItemProcessor<User> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true); //不满足条件丢弃数据
return beanValidatingItemProcessor;
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.processor(beanValidatingItemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("validate-processor-job4")
.start(step())
.build();
}
public static void main(String[] args) {
SpringApplication.run(ValidationProcessorJob.class, args);
}
}
解析:
1>核心是beanValidatingItemProcessor() 实例方法,核心BeanValidatingItemProcessor 类是Spring Batch 提供现成的Validator校验类,这里直接使用即可。BeanValidatingItemProcessor 是 ValidatingItemProcessor 子类
2> step()实例方法,多了.processor(beanValidatingItemProcessor()) 操作,引入ItemProcessor 组件。
ItemProcessorAdapter:适配器处理器
开发中,很多的校验逻辑已经有现成的啦,那做ItemProcessor处理时候,是否能使用现成逻辑呢?答案 是:yes
比如:现有处理逻辑:将User对象中name转换成大写
public class UserServiceImpl{
public User toUppeCase(User user){
user.setName(user.getName().toUpperCase());
return user;
}
}
新建users-adapter.txt 文件,用于测试
1#dafei#18
2#xiaofei#16
3#laofei#20
4#zhongfei#19
5#feifei#15
完整的逻辑
package com.langfeiyes.batch._27_itemprocessor_adapter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.adapter.ItemProcessorAdapter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import java.util.List;
@SpringBootApplication
@EnableBatchProcessing
public class AdapterProcessorJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users-adapter.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
public UserServiceImpl userService(){
return new UserServiceImpl();
}
@Bean
public ItemProcessorAdapter<User, User> itemProcessorAdapter(){
ItemProcessorAdapter<User, User> adapter = new ItemProcessorAdapter<>();
adapter.setTargetObject(userService());
adapter.setTargetMethod("toUppeCase");
return adapter;
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.processor(itemProcessorAdapter())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("adapter-processor-job")
.start(step())
.build();
}
public static void main(String[] args) {
SpringApplication.run(AdapterProcessorJob.class, args);
}
}
ScriptItemProcessor:脚本处理器
前面要实现User name 变大写,需要大费周折,又定义类,又是定义方法,能不能简化一点。答案也是yes, Spring Batch 提供js脚本的形式,将上面逻辑写到js文件中,加载这文件,就可以实现,省去定义类,定义方法的麻烦。
需求:使用js脚本方式实现用户名大写处理
userScript.js
item.setName(item.getName().toUpperCase());
item;
这里注意:
1>item是约定的单词,表示ItemReader读除来每个条目
2>userScript.js文件放置到resource资源文件中
完整代码
package com.langfeiyes.batch._28_itemprocessor_script;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.support.ScriptItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import java.util.List;
@SpringBootApplication
@EnableBatchProcessing
public class ScriptProcessorJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users-adapter.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
public ScriptItemProcessor<User, User> scriptItemProcessor(){
ScriptItemProcessor<User, User> scriptItemProcessor = new ScriptItemProcessor();
scriptItemProcessor.setScript(new ClassPathResource("userScript.js"));
return scriptItemProcessor;
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.processor(scriptItemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("script-processor-job")
.start(step())
.build();
}
public static void main(String[] args) {
SpringApplication.run(ScriptProcessorJob.class, args);
}
}
解析:
核心还是scriptItemProcessor() 实例方法,ScriptItemProcessor 类用于加载js 脚本并处理js脚本。
CompositeItemProcessor:组合处理器
CompositeItemProcessor是一个ItemProcessor处理组合,类似于过滤器链,数据先经过第一个处理器,然后再经过第二个处理器,直到最后。前一个处理器处理的结果,是后一个处理器的输出。
需求:将解析出来用户name进行判空处理,并将name属性转换成大写
1>读取文件:users-validate.txt
1##18
2##16
3#laofei#20
4#zhongfei#19
5#feifei#15
2>封装的实体对象
@Getter
@Setter
@ToString
public class User {
private Long id;
@NotBlank(message = "用户名不能为null或空串")
private String name;
private int age;
}
3>用于转换大写工具类
public class UserServiceImpl {
public User toUppeCase(User user){
user.setName(user.getName().toUpperCase());
return user;
}
}
4>完整代码
package com.langfeiyes.batch._29_itemprocessor_composite;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.adapter.ItemProcessorAdapter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.validator.BeanValidatingItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import java.util.Arrays;
import java.util.List;
@SpringBootApplication
@EnableBatchProcessing
public class CompositeProcessorJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users-validate.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
public UserServiceImpl userService(){
return new UserServiceImpl();
}
@Bean
public BeanValidatingItemProcessor<User> beanValidatingItemProcessor(){
BeanValidatingItemProcessor<User> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true); //不满足条件丢弃数据
return beanValidatingItemProcessor;
}
@Bean
public ItemProcessorAdapter<User, User> itemProcessorAdapter(){
ItemProcessorAdapter<User, User> adapter = new ItemProcessorAdapter<>();
adapter.setTargetObject(userService());
adapter.setTargetMethod("toUppeCase");
return adapter;
}
@Bean
public CompositeItemProcessor<User, User> compositeItemProcessor(){
CompositeItemProcessor<User, User> compositeItemProcessor = new CompositeItemProcessor<>();
compositeItemProcessor.setDelegates(Arrays.asList(
beanValidatingItemProcessor(), itemProcessorAdapter()
));
return compositeItemProcessor;
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.processor(compositeItemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("composite-processor-job")
.start(step())
.build();
}
public static void main(String[] args) {
SpringApplication.run(CompositeProcessorJob.class, args);
}
}
解析:
核心代码:compositeItemProcessor() 实例方法,使用setDelegates 操作将其他ItemProcessor 处理合并成一个。
自定义ItemProcessor处理器
除去上面默认的几种处理器外,Spring Batch 也允许我们自定义,具体做法只需要实现ItemProcessor接口即可
需求:自定义处理器,筛选出id为偶数的用户
1>定义读取文件user.txt
1#dafei#18
2#xiaofei#16
3#laofei#20
4#zhongfei#19
5#feifei#15
2>定义实体对象
@Getter
@Setter
@ToString
public class User {
private Long id;
private String name;
private int age;
}
3>自定义处理器
//自定义
public class CustomizeItemProcessor implements ItemProcessor<User,User> {
@Override
public User process(User item) throws Exception {
//id 为偶数的用户放弃
//返回null时候 读入的item会被放弃,不会进入itemwriter
return item.getId() % 2 != 0 ? item : null;
}
}
4>完整代码
package com.langfeiyes.batch._30_itemprocessor_customize;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import java.util.List;
@SpringBootApplication
@EnableBatchProcessing
public class CustomizeProcessorJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
public CustomizeItemProcessor customizeItemProcessor(){
return new CustomizeItemProcessor();
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.processor(customizeItemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("customize-processor-job")
.start(step())
.build();
}
public static void main(String[] args) {
SpringApplication.run(CustomizeProcessorJob.class, args);
}
}
到这,本篇就结束了,欲知后事如何,请听下回分解~
转视频版
看文字不过瘾可以切换视频版:Spring Batch高效批处理框架实战