SpringBatch 介绍
目前,Spring Batch是批处理框架界为数不多的优秀框架(Java语言开发)。
Spring Batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。
Spring Batch是Spring的一个子项目,使用Java语言并基于Spring框架为基础开发,使得已经使用 Spring 框架的开发者或者企业更容易访问和利用企业服务;
Spring Batch 提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。
通过 Spring Batch 能够支持简单的、复杂的和大数据量的批处理作业。同时它也提供了优化和分片技术用于实现高性能的批处理任务。
一个典型的批处理应用程序大致如下:
- 从数据库,文件或队列中读取大量记录。
- 以某种方式处理数据。
- 以修改之后的形式写回数据。
在SpringBatch 中 Job是step的运行框架 ,而具体的运行业务是由step进行完成
Step
Step
下图就是Step的简要构造
一个Step通常涵盖三个部分:读数据(Reader)、处理数据(Processor)和写数据(Writer)。但是并不是所有的Step都需要自身来完成数据的处理,比如存储过程等方式是通过外部功能来完成,因此Spring Batch提供了2种Step的处理方式:
1)面向分片的ChunkStep,
2)面向过程的TaskletStep。
一般使用ChunkStep
ChunkStep
在Step中数据是按记录(按行)处理的,但是每条记录处理完毕之后马上提交事物反而会导致IO的巨大压力。因此Spring Batch提供了数据处理的分片功能。设置了分片之后,一次工作会从Read开始,然后交由给Processor处理。处理完毕后会进行聚合,待聚合到一定的数量的数据之后一次性调用Write将数据提交到物理数据库。
如果在聚合数据期间出现任何错误,所有的这些数据都将不执行写入。
面向分片的处理过程图
@Bean
public Step testStep(PlatformTransactionManager transactionManager) {
return stepBuilderFactory.get("testStep")
.transactionManager(transactionManager)
.<String, String>chunk(int)
.reader(testReader())
.processor(testProcessor)
.writer(testWriter())
.build();
}
transactionManager:使用默认的 PlatformTransactionManager 对事物进行管理。当配置好事物之后Spring Batch会自动对事物进行管理,无需开发人员显示操作。
提交间隔
Step使用PlatformTransactionManager管理事物。每次事物提交的间隔根据chunk方法中配置的数据执行。提交间隔设置太小,那么会浪费需要多不必要的资源,提交间隔设置的太长,会导致事物链太长占用空间,并且出现失败会导致大量数据回滚。一般设为10到20。
stepBuilderFactory.get("testStep")
.<String, String>chunk(int)
.reader(testReader())
.processor(testProcessor)
.writer(testWriter())
.startLimit(1)
.build();
step重启次数
某些Step可能用于处理一些先决的任务,所以当Job再次重启时这Step就没必要再执行,可以通过设置startLimit来限定某个Step重启的次数。当设置为1时候表示仅仅运行一次,而出现重启时将不再执行。
stepBuilderFactory.get("testStep")
.<String, String>chunk(int)
.reader(testReader())
.processor(testProcessor)
.writer(testWriter())
.startLimit(1)
.build();
step每次重启都运行
可以通过设置allow-start-if-complete为true告知框架每次重启该Step都要执行
stepBuilderFactory.get("testStep")
.<String, String>chunk(int)
.reader(testReader())
.processor(testProcessor)
.writer(testWriter())
.allowStartIfComplete(true)
.build();
step失败后跳过
stepBuilderFactory.get("testStep")
.<String, String>chunk(int)
.reader(testReader())
.processor(testProcessor)
.writer(testWriter())
.skipLimit(10)
.skip(Exception.class)
.noSkip(FileNotFoundException.class)
.build();
skip-limit(skipLimit方法)配置的参数表示当跳过的次数超过数值时则会导致整个Step失败,从而停止继续运行
skip表示要当捕捉到Exception异常就跳过。但是Exception有很多继承类,此时可以使用noSkip方法指定某些异常不能跳过
step重试
stepBuilderFactory.get("testStep")
.<String, String>chunk(int)
.reader(testReader())
.processor(testProcessor)
.writer(testWriter())
.faultTolerant()
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
retry(DeadlockLoserDataAccessException.class)表示只有捕捉到该异常才会重试,retryLimit(3)表示最多重试3次,faultTolerant()表示启用对应的容错功能。
step控制不回滚
stepBuilderFactory.get("testStep")
.<String, String>chunk(int)
.reader(testReader())
.processor(testProcessor)
.writer(testWriter())
.faultTolerant()
.noRollback(ValidationException.class) //不必回滚的异常
.build();
noRollback属性为Step提供了不必进行事物回滚的异常配置
step数据重读
stepBuilderFactory.get("testStep")
.<String, String>chunk(int)
.reader(testReader())
.processor(testProcessor)
.writer(testWriter())
.readerIsTransactionalQueue() //数据重读
.build();
默认情况下如果错误不是发生在Reader阶段,那么没必要再去重新读取一次数据。但是某些场景下需要Reader部分也需要重新执行,比如Reader是从一个JMS队列中消费消息,当发生回滚的时候消息也会在队列上重放,因此也要将Reader纳入到回滚的事物中,根据这个场景可以使用readerIsTransactionalQueue来配置数据重读
step事务属性
//配置事物属性
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
attribute.setIsolationLevel(Isolation.DEFAULT.value());
attribute.setTimeout(30);
return this.stepBuilderFactory.get("testStep")
.<String, String>chunk(int)
.reader(testReader())
.processor(testProcessor)
.writer(testWriter())
.transactionAttribute(attribute) //设置事物属性
.build();
step顺序执行
jobBuilderFactory.get("job")
.start(stepA())
.next(stepB()) //顺序执行
.next(stepC())
.build();
step条件执行
jobBuilderFactory.get("job")
.start(stepA()) //启动时执行的step
.on("*").to(stepB()) //默认跳转到stepB
.from(stepA()).on("FAILED").to(stepC()) //当返回的ExitStatus为"FAILED"时,执行。
.end()
.build();
End退出
默认情况下(没有使用end、fail方法结束),Job要顺序执行直到退出,这个退出称为end。这个时候,BatchStatus=COMPLETED、ExitStatus=COMPLETED,表示成功执行。除了Step链式处理自然退出,也可以显示调用end来退出系统。
jobBuilderFactory.get("job")
.start(step1()) //启动
.next(step2()) //顺序执行
.on("FAILED").end()
.from(step2()).on("*").to(step3()) //条件执行
.end()
.build();
step1到step2是顺序执行,当step2的exitStatus返回"FAILED"时则直接End退出。其他情况执行Step3。
Fail退出
除了end还可以使用fail退出,这个时候,BatchStatus=FAILED、ExitStatus=EARLY TERMINATION,表示执行失败。这个状态与End最大的区别是Job会尝试重启执行新的JobExecution。看下面代码的例子:
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1()) //执行step1
.next(step2()).on("FAILED").fail() //step2的ExitStatus=FAILED 执行fail
.from(step2()).on("*").to(step3()) //否则执行step3
.end()
.build();
}
@StepScope和@JobScope
@StepScope
Spring Batch框架只有在批处理时才需要实例化job以及对应的最底层处理单位(ItemReader,ItemProcessor,ItemWriter,Tasklet)且在job启动后的运行参数一旦确定便无法修改。
为了使每一次启动job时使处理单位的参数可以动态修改(比如第一次job启动时参数birthDate=“20210101”,第二次job参数启动时参数改为birthDate=“20210102”)。 所以设计了@StepScope配合@Value(“#{jobParameters[‘birthDate’]}”) 从job的启动参数中获取所需参数。
@StepScope注解使用注意事项:
只能用在最底层处理单位(ItemReader,ItemProcessor,ItemWriter,Tasklet)的方法上,配合@Bean使用。
被@StepScope注解修饰的bean只会在Step被加载时进行初始化,Step处理完成后便会被销毁(也就是说被@StepScope注解所修饰bean的生命周期与Step的生命周期保持同步)
@JobScope
Job Scope的概念和 Step Scope类似,都是用于标识在到了某个执行时间段再添加和注入Bean。
@JobScope用于告知框架知道JobInstance存在时候才初始化对应的@Bean
@JobScope
@Bean
// 初始化获取 jobParameters中的参数
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@JobScope
@Bean
// 初始化获取jobExecutionContext中的参数
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
Listener监听器
Spring Batch提供了多种监听器Listener,用于在任务处理过程中触发我们的逻辑代码。具体可以参考下表:
监听器 | 具体说明 |
---|---|
JobExecutionListener | 在Job开始之前(beforeJob)和之后(afterJob)触发 |
StepExecutionListener | 在Step开始之前(beforeStep)和之后(afterStep)触发 |
ChunkListener | 在 Chunk 开始之前(beforeChunk),之后(afterChunk)和错误后(afterChunkError)触发 |
ItemReadListener | 在 Read 开始之前(beforeRead>,之后(afterRead)和错误后(onReadError)触发 |
ItemProcessListener | 在 Processor 开始之前(beforeProcess),之后(afterProcess)和错误后(onProcessError)触发 |
ItemWriteListener | 在 Writer 开始之前(beforeWrite),之后(afterWrite)和错误后(onWriteError)触发 |
SkipListener | 在 Skip(reder)时候,在 Skip(writer)时候,在 Skip(processor)时候 |
Listener 的执行顺序
- JobExecutionListener.beforeJob()
- StepExecutionListener.beforeStep()
- ChunkListener.beforeChunk()
- ItemReaderListener.beforeReader()
- ItemReaderListener.afterReader()
- ItemProcessListener.beforeProcess()
- ItemProcessListener.afterProcess()
- ItemWriterListener.beforeWriter()
- ItemWriterListener.afterWriter()
- ChunkListener.afterChunk()
- StepExecutionListener.afterStep()
- JobExecutionListener.afterJob()
需要注意的是如果在同一个级别定义多个 Listener,比如 Parent Step中定一个了一个 StepListener,自身又定义了一个 StepListener,如果需要两个都执行,那么需要在自身 listener 上加上merge 属性。
获取JobParameters参数
为了能在ItemReader, ItemWriter, ItemProcessor中读取JobParameters中的参数,有三个方法:
- 使用 @BeforeStep注解
@Component
@StepScope
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
JobParameters jobParameters;
@BeforeStep
public void beforeStep(final StepExecution stepExecution) {
jobParameters = stepExecution.getJobParameters();
log.info("jobParameters: {}", jobParameters);
}
- 实现StepExecutionListener接口
@Component
@StepScope
public class MyStepExecutionListener implements StepExecutionListener {
private StepExecution stepExecution;
@Override
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
jobParameters = stepExecution.getJobParameters();
}
}
- 把相应的Bean定义成@StepScope,再用@Value直接注入
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
@StepScope
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>()
.name("personItemReader")
.resource(new ClassPathResource("sample-data.csv"))
.delimited()
.names(new String[]{"firstName", "lastName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}})
.build();
}
@Bean
@StepScope
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
@Bean
@StepScope
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>()
.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("...")
.dataSource(dataSource)
.build();
}
@Bean("importUserJob") //这个不要定义StepScope
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(step1)
.end()
.build();
}
@Bean //这个不要定义StepScope
public Step step1(JdbcBatchItemWriter<Person> writer) {
}
重点注意的是,要把ItemReader, ItemWriter, ItemProcessor声明成StepScope,而Job和Step本身不能申明成StepScope
然后在ItemReader, ItemWriter, ItemProcessor中可以直接注入参数了:
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
@Value("#{jobParameters['dataUnitId']}")
private Long dataUnitId;
}
参考知识:
Spring Batch 教程之配置Step