Spring Batch
Job instance(作业实例)
当作业运行时,会创建一个Job Instance(作业实例),它代表作业的一次逻辑运行,可通过作业名称与作业标识参数进行区分。
比如一个业务需求: 每天定期数据同步,作业名称-daily-sync-job 作业标记参数-当天时间
Job Execution(作业执行对象)
当作业运行时,也会创建一个Job Execution(作业执行器),负责记录Job执行情况(比如:开始执行时间,结束时间,处理状态等)。
Job Instance = Job名称 + 识别参数
Job Instance 一次执行创建一个 Job Execution对象
完整的一次Job Instance 执行可能创建一个Job Execution对象,也可能创建多个Job Execution对象
入门案例-H2版
导入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.3</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--内存版-->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
测试方法
//之前使用@Component导致jobParameters获取失败
@Configuration
public class JobDemo1 {
//作业启动器
@Autowired
private JobLauncher jobLauncher;
//Job构造工厂(构建job对象)
@Autowired
private JobBuilderFactory jobBuilderFactory;
//step构造工厂(构建step对象)
@Autowired
private StepBuilderFactory stepBuilderFactory;
//构造一个step对象执行的任务
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
//要执行逻辑 step步骤执行逻辑
System.out.println("hello spring batch");
return RepeatStatus.FINISHED;
}
};
}
//构造一个step对象
@Bean
public Step step1(){
//tasklet指step执行逻辑
return stepBuilderFactory.get("step1").tasklet(tasklet()).build();
}
//构造job对象
//start(step1).next(step2)
@Bean
public Job job(){
return jobBuilderFactory.get("hello-job").start(step1()).build();
}
}
启动类
//启动springBatch
@EnableBatchProcessing
@SpringBootApplication
public class SpringbootApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootApplication.class, args);
}
}
入门案例-MySQL版
依赖
<!-- <dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency> -->
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.7.2</version>
</dependency>
application-dev.yaml
此处测试时并没有自动创建表
server:
port: 8896
spring:
datasource:
username: root
password: root
url: jdbc:mariadb://ip地址:3306/springbatch?serverTimezone=GMT%2B8&useSSL=false&allowPublicKeyRetrieval=true
driver-class-name: org.mariadb.jdbc.Driver
# 初始化数据库,文件在依赖jar包中
sql:
init:
schema-locations: classpath:org/springframework/batch/core/schema-mysql.sql
#mode: always
mode: never
其余代码与H2版本相同
作业参数获取
在application中设置program arguments比如,name=zs
方案1:使用ChunkContext类
方案2:使用@Value 延时获取
//之前使用@Component导致jobParameters获取失败
@Configuration
public class JobDemo1 {
//作业启动器
@Autowired
private JobLauncher jobLauncher;
//Job构造工厂(构建job对象)
@Autowired
private JobBuilderFactory jobBuilderFactory;
//step构造工厂(构建step对象)
@Autowired
private StepBuilderFactory stepBuilderFactory;
//构造一个step对象执行的任务
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
//方案1:使用ChunkContext类获取参数
Map<String, Object> jobParameters = chunkContext.getStepContext().getJobParameters();
System.out.println("job param name's value is "+jobParameters.get("name"));
System.out.println("hello spring batch");
return RepeatStatus.FINISHED;
}
};
}
//构造一个step对象执行的任务
//@StepScope在启动项目的时候,不加载该Step步骤bean,等step1()被调用时才加载。这就是所谓延时获取
@StepScope
@Bean
/方案2:使用@Value 延时获取
public Tasklet tasklet2(@Value("#{jobParameters['name']}")String name){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
//获取参数中设置的param的value
System.out.println("tasklet2: params---name is " + name);
return RepeatStatus.FINISHED;
}
};
}
//构造一个step对象
@Bean
public Step step1(){
//tasklet指step执行逻辑
return stepBuilderFactory
.get("step1")
.tasklet(tasklet())
.build();
}
//构造一个step对象
@Bean
public Step step2(){
//tasklet指step执行逻辑
return stepBuilderFactory
.get("step2")
.tasklet(tasklet2(null))
.build();
}
//构造job对象
//start(step1).next(step2)
@Bean
public Job job(){
return jobBuilderFactory
.get("param-job12")
.start(step1())
.next(step2())
.build();
}
}
作业参数校验
定制参数校验器
定制参数校验器
JobParametersInvalidException 异常是Spring Batch 专门提供参数校验失败异常
public class NameParamValidator implements JobParametersValidator {
@Override
public void validate(JobParameters parameters) throws JobParametersInvalidException {
String name = parameters.getString("name");
if(!StringUtils.hasText(name)){
throw new JobParametersInvalidException("name 参数不能为空");
}
}
}
validator()实例方法,将定制的参数解析器加到Spring容器中,修改job()实例方法,加上.validator(validator()) 校验逻辑。
@Configuration
public class JobDemo2 {
//作业启动器
@Autowired
private JobLauncher jobLauncher;
//Job构造工厂(构建job对象)
@Autowired
private JobBuilderFactory jobBuilderFactory;
//step构造工厂(构建step对象)
@Autowired
private StepBuilderFactory stepBuilderFactory;
//构造一个step对象执行的任务
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
//使用ChunkContext类获取参数
Map<String, Object> jobParameters = chunkContext.getStepContext().getJobParameters();
System.out.println("job param name's value is "+jobParameters.get("name"));
System.out.println("hello spring batch");
return RepeatStatus.FINISHED;
}
};
}
//构造一个step对象
@Bean
public Step step1(){
//tasklet指step执行逻辑
return stepBuilderFactory
.get("step1")
.tasklet(tasklet())
.build();
}
//注入自定义校验器
@Bean
public NameParamValidator nameParamValidator(){
return new NameParamValidator();
}
//构造job对象
//start(step1).next(step2)
@Bean
public Job job(){
return jobBuilderFactory
.get("param-valid")
.start(step1())
.validator(nameParamValidator())//参数校验器
.build();
}
}
默认参数校验器
在application中设置program arguments比如,name=zs age=18
@Configuration
public class JobDemo3 {
//作业启动器
@Autowired
private JobLauncher jobLauncher;
//Job构造工厂(构建job对象)
@Autowired
private JobBuilderFactory jobBuilderFactory;
//step构造工厂(构建step对象)
@Autowired
private StepBuilderFactory stepBuilderFactory;
//构造一个step对象执行的任务
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
//方式1:获取参数中设置的param的value
Map<String, Object> parameters = chunkContext.getStepContext().getJobParameters();
System.out.println("params---name: " + parameters.get("name"));
System.out.println("params---age: " + parameters.get("age"));
return RepeatStatus.FINISHED;
}
};
}
//构造一个step对象
@Bean
public Step step1(){
//tasklet指step执行逻辑
return stepBuilderFactory
.get("step1")
.tasklet(tasklet())
.build();
}
//配置默认参数校验器
@Bean
public DefaultJobParametersValidator customDefaultValidator(){
DefaultJobParametersValidator defaultValidator = new DefaultJobParametersValidator();
defaultValidator.setRequiredKeys(new String[]{"name"}); //必填
defaultValidator.setOptionalKeys(new String[]{"age"}); //可选
return defaultValidator;
}
//构造job对象
@Bean
public Job job(){
return jobBuilderFactory
.get("default-param-valid")
.start(step1())
.validator(customDefaultValidator()) //默认参数校验器
.build();
}
}
组合参数校验器
@Configuration
public class JobDemo4 {
//作业启动器
@Autowired
private JobLauncher jobLauncher;
//Job构造工厂(构建job对象)
@Autowired
private JobBuilderFactory jobBuilderFactory;
//step构造工厂(构建step对象)
@Autowired
private StepBuilderFactory stepBuilderFactory;
//构造一个step对象执行的任务
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
//方式1:获取参数中设置的param的value
Map<String, Object> parameters = chunkContext.getStepContext().getJobParameters();
System.out.println("params---name: " + parameters.get("name"));
System.out.println("params---age: " + parameters.get("age"));
return RepeatStatus.FINISHED;
}
};
}
//构造一个step对象
@Bean
public Step step1(){
//tasklet指step执行逻辑
return stepBuilderFactory
.get("step1")
.tasklet(tasklet())
.build();
}
//配置组合参数校验器
@Bean
public CompositeJobParametersValidator compositeValidator(){
DefaultJobParametersValidator defaultValidator = new DefaultJobParametersValidator();
defaultValidator.setRequiredKeys(new String[]{"name"}); //name必填
defaultValidator.setOptionalKeys(new String[]{"age"}); //age可选
NameParamValidator nameParamValidator = new NameParamValidator(); //name 不能为空
CompositeJobParametersValidator compositeValidator = new CompositeJobParametersValidator();
//按照传入的顺序,先执行defaultValidator 后执行nameParamValidator
compositeValidator.setValidators(Arrays.asList(defaultValidator, nameParamValidator));
try {
compositeValidator.afterPropertiesSet(); //判断校验器是否为null
} catch (Exception e) {
e.printStackTrace();
}
return compositeValidator;
}
//构造job对象
//start(step1).next(step2)
@Bean
public Job job(){
return jobBuilderFactory
.get("default-param-valid3")
.start(step1())
.validator(compositeValidator()) //默认参数校验器
.build();
}
}
作业增量参数
作业递增run.id参数
Spring Batch 提供一个run.id自增参数增量器:RunIdIncrementer,每次启动时,里面维护名为run.id 标识参数,每次启动让其自增 1。
@Configuration
public class IncrementParamJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Map<String, Object> parameters = chunkContext.getStepContext().getJobParameters();
System.out.println("params---run.id:" + parameters.get("run.id"));
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("incr-params-job")
.start(step1())
.incrementer(new RunIdIncrementer()) //参数增量器(run.id自增)
.build();
}
}
作业时间戳参数
自定义时间戳增量器
//时间戳作业参数增量器
public class DailyTimestampParamIncrementer implements JobParametersIncrementer {
@Override
public JobParameters getNext(JobParameters parameters) {
return new JobParametersBuilder(parameters)
.addLong("daily", new Date().getTime()) //添加时间戳
.toJobParameters();
}
}
修改tasklet() 方法,获取 daily参数
@Configuration
public class IncrementParamJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Map<String, Object> parameters = chunkContext.getStepContext().getJobParameters();
//System.out.println("params---run.id:" + parameters.get("run.id"));
System.out.println("params---daily:" + parameters.get("daily"));
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet())
.build();
}
//时间戳增量器
@Bean
public DailyTimestampParamIncrementer dailyTimestampParamIncrementer(){
return new DailyTimestampParamIncrementer();
}
@Bean
public Job job(){
return jobBuilderFactory.get("incr-params-job2")
.start(step1())
//.incrementer(new RunIdIncrementer()) //参数增量器(run.id自增)
.incrementer(dailyTimestampParamIncrementer()) //时间戳增量器
.build();
}
}
执行上下文ExecutionContext
Job ExecutionContext(作业执行上下文)
作用域:一次作业运行,所有Step步骤间数据共享。
Step ExecutionContext(步骤执行上下文)
作用域:一次步骤运行,单个Step步骤间(ItemReader/ItemProcessor/ItemWrite组件间)数据共享。
作业线
Job—JobInstance—JobContext—JobExecution–ExecutionContext
步骤线
Step–StepContext --StepExecution–ExecutionContext
小案例
在stepContext设置的参数作用域仅在StepExecution执行范围有效,而JobContext设置参数作用与在所有StepExcution有效
//作业-ExecutionContext 添加: key-step1-job value-step1-job
//步骤-ExecutionContext 添加: key-step1-step value-step1-step
@Configuration
public class ExecutionContextJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
//步骤
ExecutionContext stepEC = chunkContext.getStepContext().getStepExecution().getExecutionContext();
stepEC.put("key-step1-step","value-step1-step");
System.out.println("------------------1---------------------------");
//作业
ExecutionContext jobEC = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
jobEC.put("key-step1-job","value-step1-job");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
//步骤
ExecutionContext stepEC = chunkContext.getStepContext().getStepExecution().getExecutionContext();
System.err.println(stepEC.get("key-step1-step"));//null
System.out.println("------------------2---------------------------");
//作业
ExecutionContext jobEC = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
System.err.println(jobEC.get("key-step1-job"));//value-step1-job
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet1())
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet(tasklet2())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("execution-context-job")
.start(step1())
.next(step2())
.incrementer(new RunIdIncrementer())//作业递增run.id参数
.build();
}
}
步骤数据保存在Step ExecutionContext,只能在Step中使用,作业数据保存在Job ExecutionContext,可以在所有Step中共享
Tasklet
简单SimpleTasklet
RepeatStatus.FINISHED:当前步骤结束,可以为成功也可以表示不成,仅代表当前step执行结束了
RepeatStatus.CONTINUABLE:当前步骤依然可以执行,如果步骤返回该值,会一直循环执行
@Configuration
public class SimpleTaskletJob {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("------>" + System.currentTimeMillis());
//return RepeatStatus.CONTINUABLE; //循环执行
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("step-simple-tasklet-job")
.start(step1())
.build();
}
}
块状ChunkTasklet
ChunkTasklet 执行特点,ItemReader会一直循环读,直到返回null,才停止。而processor也是一样,itemReader读多少次,它处理多少次, itemWriter 一次性输出当前次输入的所有数据
chunkSize 表示: 一趟需要ItemReader读多少次,ItemProcessor要处理多少次
@Configuration
public class JobChunk {
//作业启动器
@Autowired
private JobLauncher jobLauncher;
//Job构造工厂(构建job对象)
@Autowired
private JobBuilderFactory jobBuilderFactory;
//step构造工厂(构建step对象)
@Autowired
private StepBuilderFactory stepBuilderFactory;
int timer = 10;
@Bean
public ItemReader<String> itemReader(){
return new ItemReader<String>() {
@Override
public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if(timer > 0){
System.out.println("-------------read------------");
return "read-ret-->" + timer--;
}else{
return null;
}
}
};
}
//处理操作
@Bean
public ItemProcessor<String, String> itemProcessor(){
return new ItemProcessor<String, String>() {
@Override
public String process(String item) throws Exception {
System.out.println("-------------process------------>" + item);
return "process-ret->" + item;
}
};
}
//写操作
@Bean
public ItemWriter<String> itemWriter(){
return new ItemWriter<String>() {
@Override
public void write(List<? extends String> items) throws Exception {
System.out.println(items);
}
};
}
//构造一个step对象
@Bean
public Step step1(){
//tasklet指step执行逻辑
return stepBuilderFactory
.get("step1")
//1个批次,itemReader读3次,itemProcessor处理3次
.<String, String>chunk(3)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
//构造job对象
@Bean
public Job job(){
return jobBuilderFactory
.get("chunk-job")
.start(step1())
.incrementer(new RunIdIncrementer())//作业递增run.id参数
.build();
}
}
执行结果
-------------read------------
-------------read------------
-------------read------------
-------------process------------>read-ret-10
-------------process------------>read-ret-9
-------------process------------>read-ret-8
itemWriter()
[process-ret->read-ret-10, process-ret->read-ret-9, process-ret->read-ret-8]
-------------read------------
-------------read------------
-------------read------------
-------------process------------>read-ret-7
-------------process------------>read-ret-6
-------------process------------>read-ret-5
itemWriter()
[process-ret->read-ret-7, process-ret->read-ret-6, process-ret->read-ret-5]
-------------read------------
-------------read------------
-------------read------------
-------------process------------>read-ret-4
-------------process------------>read-ret-3
-------------process------------>read-ret-2
itemWriter()
[process-ret->read-ret-4, process-ret->read-ret-3, process-ret->read-ret-2]
-------------read------------
-------------process------------>read-ret-1
itemWriter()
[process-ret->read-ret-1]
监听器
作业监听器JobExecutionListener
记录作业执行前,执行中,与执行后的状态
方式1:接口方式
实现JobExecutionListener 接口,重写beforeJob,afterJob 2个方法
/**
* 自定义作业状态监听器
*/
public class JobStateListener implements JobExecutionListener {
//作业执行前
@Override
public void beforeJob(JobExecution jobExecution) {
System.err.println("执行前-status:" + jobExecution.getStatus());
}
//作业执行后
@Override
public void afterJob(JobExecution jobExecution) {
System.err.println("执行后-status:" + jobExecution.getStatus());
}
}
修改job()方法,添加.listener(jobStateListener()) 状态监听器
@Configuration
public class StatusListenerJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
JobExecution jobExecution = contribution.getStepExecution().getJobExecution();
System.err.println("执行中-status:" + jobExecution.getStatus());
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet())
.build();
}
//状态监听器
@Bean
public JobStateListener jobStateListener(){
return new JobStateListener();
}
@Bean
public Job job(){
return jobBuilderFactory.get("status-job2")
.start(step1())
.listener(jobStateListener())
.build();
}
}
执行结果
执行前-status:STARTED
执行中-status:STARTED
执行后-status:COMPLETED
方式2:注解方式
使用@BeforeJob @AfterJob 2个注解
//作业状态--注解方式
public class JobStateAnnoListener {
@BeforeJob
public void beforeJob(JobExecution jobExecution) {
System.err.println("执行前-anno-status:" + jobExecution.getStatus());
}
@AfterJob
public void afterJob(JobExecution jobExecution) {
System.err.println("执行后-anno-status:" + jobExecution.getStatus());
}
}
job中添加监听器
@Configuration
public class StatusListenerJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
JobExecution jobExecution = contribution.getStepExecution().getJobExecution();
System.err.println("执行中-status:" + jobExecution.getStatus());
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet())
.build();
}
//状态监听器
@Bean
public JobStateListener jobStateListener(){
return new JobStateListener();
}
@Bean
public Job job(){
return jobBuilderFactory.get("status-job3")
.start(step1())
//.listener(jobStateListener())
.listener(JobListenerFactoryBean.getListener(new JobStateAnnoListener()))
.build();
}
}
步骤监听器StepExecutionListener
自定义监听接口
public class MyStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("-----------beforeStep--------->");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println("-----------afterStep--------->");
return stepExecution.getExitStatus(); //不改动返回状态
}
}
在step方法中加入stepListener即可
@Configuration
public class JobForListener {
//作业启动器
@Autowired
private JobLauncher jobLauncher;
//Job构造工厂(构建job对象)
@Autowired
private JobBuilderFactory jobBuilderFactory;
//step构造工厂(构建step对象)
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public MyStepListener stepListener(){
return new MyStepListener();
}
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("------>" + System.currentTimeMillis());
return RepeatStatus.FINISHED;
}
};
}
//构造一个step对象
@Bean
public Step step1(){
//tasklet指step执行逻辑
return stepBuilderFactory
.get("step1")
.tasklet(tasklet())
.listener(stepListener())
.build();
}
//构造job对象
@Bean
public Job job(){
return jobBuilderFactory
.get("step-listener-job")
.start(step1())
.incrementer(new RunIdIncrementer())//作业递增run.id参数
.build();
}
}
执行结果
-----------beforeStep--------->
------>1725852807773
-----------afterStep--------->
chunk块监听器ChunkListener
实现ChunkListener接口
与StepListener相比,唯一的区别是多了一个afterChunkError 方法,表示当chunk执行失败后回调
多步骤
多步骤执行
使用next 执行下一步步骤,如果还有第三个step,再加一个next(step3)即可
@Configuration
class MultiStepJob {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
protected StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("--------------tasklet1---------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("--------------tasklet2---------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet1())
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet(tasklet2())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("step-multi-job1")
.start(step1())
.next(step2()) //job 使用next 执行下一步骤
.incrementer(new RunIdIncrementer())
.build();
}
}
多步骤控制
1> on 方法表示条件, 上一个步骤返回值,匹配指定的字符串,满足后执行后续 to 步骤
2> * 为通配符,表示能匹配任意返回值
3> from 表示从某个步骤开始进行条件判断
4> 分支判断结束,流程以end方法结束,表示if/else逻辑结束
5> on 方法中字符串取值于 ExitStatus 类常量,当然也可以自定义。
@Configuration
public class ConditionStepJob {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet firstTasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("--------------firstTasklet---------------");
int a = 1/0;
return RepeatStatus.FINISHED;
//throw new RuntimeException("测试fail结果");
}
};
}
@Bean
public Tasklet successTasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("--------------successTasklet---------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet failTasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("--------------failTasklet---------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step firstStep(){
return stepBuilderFactory.get("step1")
.tasklet(firstTasklet())
.build();
}
@Bean
public Step successStep(){
return stepBuilderFactory.get("successStep")
.tasklet(successTasklet())
.build();
}
@Bean
public Step failStep(){
return stepBuilderFactory.get("failStep")
.tasklet(failTasklet())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("condition-multi-job")
//if success
.start(firstStep())
.on("FAILED").to(failStep())
//else
.from(firstStep()).on("*").to(successStep())
.end()
.incrementer(new RunIdIncrementer())
.build();
}
}
执行结果
--------------firstTasklet---------------
java.lang.ArithmeticException: / by zero
--------------failTasklet---------------
条件分支控制-JobExecutionDecider决策器
on条件的值取值于ExitStatus 类常量,具体值有:UNKNOWN,EXECUTING,COMPLETED,NOOP,FAILED,STOPPED等,也可以实现JobExecutionDecider接口实现状态值定制
Spring Batch 提供 3个方法决定作业流程走向:
end():作业流程直接成功结束,返回状态为:COMPLETED
fail():作业流程直接失败结束,返回状态为:FAILED
stopAndRestart(step) :作业流程中断结束,返回状态:STOPPED 再次启动时,从step位置开始执行 (注意:前提是参数与Job Name一样)
定义一个决策器,随机决定返回A / B / C
//定义一个决策器,随机决定返回A / B / C
public class MyStatusDecider implements JobExecutionDecider {
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
long ret = new Random().nextInt(3);
if(ret == 0){
return new FlowExecutionStatus("A");
}else if(ret == 1){
return new FlowExecutionStatus("B");
}else{
return new FlowExecutionStatus("C");
}
}
}
@Configuration
public class CustomizeStatusStepJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet taskletFirst(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("--------------taskletFirst---------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet taskletA(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("--------------taskletA---------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet taskletB(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("--------------taskletB---------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet taskletDefault(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("--------------taskletDefault---------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step firstStep(){
return stepBuilderFactory.get("firstStep")
.tasklet(taskletFirst())
.build();
}
@Bean
public Step stepA(){
return stepBuilderFactory.get("stepA")
.tasklet(taskletA())
.build();
}
@Bean
public Step stepB(){
return stepBuilderFactory.get("stepB")
.tasklet(taskletB())
.build();
}
@Bean
public Step defaultStep(){
return stepBuilderFactory.get("defaultStep")
.tasklet(taskletDefault())
.build();
}
//决策器
@Bean
public MyStatusDecider statusDecider(){
return new MyStatusDecider();
}
//定义作业
//先执行firstStep,如果返回值为A,执行stepA, 返回值为B,执行stepB, 其他执行defaultStep
@Bean
public Job job(){
return jobBuilderFactory.get("custom-step-job")
.start(firstStep())
//决策器控制下一步
.next(statusDecider())
.from(statusDecider()).on("A").to(stepA())
.from(statusDecider()).on("B").to(stepB())
.from(statusDecider()).on("*").to(defaultStep())
.end()//作业流程直接成功结束,返回状态为:COMPLETED
.incrementer(new RunIdIncrementer())
.build();
}
}
流式步骤FlowStep
使用FlowStep的好处在于,在处理复杂额批处理逻辑中,flowStep可以单独实现一个子步骤流程,为批处理提供更高的灵活性
先后执行stepA,stepB,stepC, 其中stepB中包含stepB1, stepB2,stepB3
@Configuration
public class FlowStepJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet taskletA(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("------------stepA--taskletA---------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet taskletB1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("------------stepB--taskletB1---------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet taskletB2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("------------stepB--taskletB2---------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet taskletB3(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("------------stepB--taskletB3---------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet taskletC(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.out.println("------------stepC--taskletC---------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step stepA(){
return stepBuilderFactory.get("stepA")
.tasklet(taskletA())
.build();
}
@Bean
public Step stepB1(){
return stepBuilderFactory.get("stepB1")
.tasklet(taskletB1())
.build();
}
@Bean
public Step stepB2(){
return stepBuilderFactory.get("stepB2")
.tasklet(taskletB2())
.build();
}
@Bean
public Step stepB3(){
return stepBuilderFactory.get("stepB3")
.tasklet(taskletB3())
.build();
}
//流式步骤
@Bean
public Flow flowB(){
return new FlowBuilder<Flow>("flowB")
.start(stepB1())
.next(stepB2())
.next(stepB3())
.build();
}
@Bean
public Step stepB(){
return stepBuilderFactory.get("stepB")
.flow(flowB())
.build();
}
@Bean
public Step stepC(){
return stepBuilderFactory.get("stepC")
.tasklet(taskletC())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("flow-step-job")
.start(stepA())
.next(stepB())
.next(stepC())
.incrementer(new RunIdIncrementer())
.build();
}
}
执行结果
------------stepA--taskletA---------------
------------stepB--taskletB1---------------
------------stepB--taskletB2---------------
------------stepB--taskletB3---------------
------------stepC--taskletC---------------
批处理数据表
batch_job_instance表
当作业第一次执行时,会根据作业名,标识参数生成一个唯一JobInstance对象
batch_job_execution表
每次启动作业时,都会创建一个JobExecution作业执行对象,代表一次作业执行,该对象记录存放于batch_job_execution 表。
batch_job_execution_context表
batch_job_execution_context用于保存JobContext对应的ExecutionContext对象数据。保存job执行对象中所有step的上下文。
batch_job_execution_params表
作业启动时使用标识性参数保存的位置:batch_job_execution_params, 一个参数一个记录
batch_step_execution表
作业启动,执行步骤,step中tasklet和块状ChunkTasklet执行信息
batch_step_execution_context表
步骤执行对象上下文信息
作业控制
作业的运行指的是对作业的控制,包括作业启动,作业停止,作业异常处理,作业重启处理等
作业启动
SpringBoot 启动
spring:
batch:
job:
enabled: true #false表示不启动,true表示启动
RESTful API 启动
application.yml关闭自动启动
spring:
batch:
job:
enabled: false #false表示不启动,true表示启动
restful api启动
@RestController
public class HelloController {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
@GetMapping("/job/start")
public ExitStatus start() throws Exception {
//启动job作业
JobExecution jobExet = jobLauncher.run(job, new JobParameters());
return jobExet.getExitStatus();
}
}
执行结果
http://localhost:8896/job/start
{"exitCode":"COMPLETED","exitDescription":"","running":false}
RESTful API 启动 带请求参数
application.yml关闭自动启动
spring:
batch:
job:
enabled: false #false表示不启动,true表示启动
作业使用run.id自增,自增run.id参数方便每次启动
//构造一个job对象
@Bean
public Job job(){
return jobBuilderFactory.get("hello-restful-job")
.start(step1())
.incrementer(new RunIdIncrementer())
.build();
}
通过job展示对象,JobParameters对象,指定需要启动的job和参数
@RestController
public class HelloController {
@Autowired
private JobLauncher launcher;
@Autowired
private Job job;
@Autowired
private JobExplorer jobExplorer; //job 展示对象
@GetMapping("/job/start")
public ExitStatus startJob(String name) throws Exception {
//启动job作业
JobParameters jp = new JobParametersBuilder(jobExplorer)
.getNextJobParameters(job)
.addString("name", name)
.toJobParameters();
JobExecution jobExet = launcher.run(job, jp);
return jobExet.getExitStatus();
}
}
作业停止
一种自然结束
作业成功执行,正常停止,此时作业返回状态为:COMPLETED
一种异常结束
作业执行过程因为各种意外导致作业中断而停止,大多数作业返回状态为:FAILED
一种编程结束
方案1:Step 步骤监听器方式
监听器
public class StopStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
//不满足
if(ResouceCount.totalCount != ResouceCount.readCount){
return ExitStatus.STOPPED; //手动停止,后续可以重启
}
return stepExecution.getExitStatus();
}
}
public class ResouceCount {
public static int totalCount = 100; //总数
public static int readCount = 0; //读取数
}
Step步骤监听器控制流程
@Configuration
public class ListenerJobStopJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
private int readCount = 50; //模拟只读取50个
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
for (int i = 1; i <= readCount; i++) {
System.out.println("---------------step1执行-"+i+"------------------");
ResouceCount.readCount ++;
}
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("step2执行了.......");
System.err.println("readCount:" + ResouceCount.readCount + ", totalCount:" + ResouceCount.totalCount);
return RepeatStatus.FINISHED;
}
};
}
@Bean
public StopStepListener stopStepListener(){
return new StopStepListener();
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet1())
.listener(stopStepListener())
//因为第一次执行step1步骤,虽然不满足条件,但是它仍属于正常结束(正常执行完tasklet1的流程),状态码:COMPLETED
.allowStartIfComplete(true) //执行完后,运行重启
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet(tasklet2())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("job-stop-job")
.start(step1())
//停止并允许重启--下次重启,从step1步骤开始执行
.on("STOPPED").stopAndRestart(step1())
.from(step1()).on("*").to(step2()).end()
.build();
}
}
方案2:StepExecution停止标记
StepExecution#setTerminateOnly() 给运行中的stepExecution设置停止标记,Spring Batch 识别后直接停止步骤,进而停止流程
@Configuration
public class SignJobStopJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
private int readCount = 50; //模拟只读取50个
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
for (int i = 1; i <= readCount; i++) {
System.out.println("---------------step1执行-"+i+"------------------");
ResouceCount.readCount ++;
}
if(ResouceCount.readCount != ResouceCount.totalCount){
// 设置停止标记
chunkContext.getStepContext().getStepExecution().setTerminateOnly();
}
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("step2执行了.......");
System.err.println("readCount:" + ResouceCount.readCount + ", totalCount:" + ResouceCount.totalCount);
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet1())
.allowStartIfComplete(true)
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet(tasklet2())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("job-stop-job")
.start(step1())
//.on("STOPPED").stopAndRestart(step1())
//.from(step1()).on("*").to(step2()).end()
.next(step2())
.build();
}
}
作业重启
禁止重启
这种适用一次性执行场景,如果执行失败,就不允许再次执行。可以使用作业的禁止重启逻辑
//tasklet1让状态处于停止状态
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("-------------tasklet1-------------");
//停止步骤
chunkContext.getStepContext().getStepExecution().setTerminateOnly();
return RepeatStatus.FINISHED;
}
};
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("job-forbid-restart-job")
//重启时报错,因为禁止重启,只允许启动一次
//JobInstance already exists and is not restartable
.preventRestart() //禁止重启
.start(step1())
.next(step2())
.build();
}
限制重启次数
适用于重启次数有限的场景,比如下载/读取操作,可能因为网络原因导致下载/读取失败,运行重试几次,但是不能无限重试。
startLimit(2) 表示运行重启2次,注意,第一次启动也算一次
@Configuration
public class JobLimitRestartJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("-------------tasklet1-------------");
//停止步骤
chunkContext.getStepContext().getStepExecution().setTerminateOnly();
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("-------------tasklet2-------------");
return RepeatStatus.FINISHED;
}
};
}
//限制启动次数最多2次
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.startLimit(2)
.tasklet(tasklet1())
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.tasklet(tasklet2())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("job-restart-limit-job")
.start(step1())
.next(step2())
.build();
}
}
无限重启
Spring Batch 限制同job名跟同标识参数作业只能成功执行一次,这是Spring Batch 定理,无法改变的。但是,对于步骤不一定适用,可以通过步骤的allowStartIfComplete(true) 实现步骤的无限重启。
public class JobAllowRestartJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet1(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("-------------tasklet1-------------");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet2(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
System.err.println("-------------tasklet2-------------");
return RepeatStatus.FINISHED;
}
};
}
//给step1 step2 添加上allowStartIfComplete(true), 可以无限启动
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.allowStartIfComplete(true)
.tasklet(tasklet1())
.build();
}
@Bean
public Step step2(){
return stepBuilderFactory.get("step2")
.allowStartIfComplete(true)
.tasklet(tasklet2())
.build();
}
//定义作业
@Bean
public Job job(){
return jobBuilderFactory.get("job-allow-restart-job")
.start(step1())
.next(step2())
.build();
}
}
ItemWriter读数据
读平面文件
平面文件一般指的都是简单行/多行结构的纯文本文件,比如记事本记录文件。与xml这种区别在于没有结构,没有标签的限制。Spring Batch默认使用 FlatFileItemReader 实现平面文件的输入。
方式1:delimited–字符串截取
resources目录下新建users.txt
1#dafei#18
2#xiaofei#16
3#laofei#20
4#zhongfei#19
5#feifei#15
实体类
@Data
public class User {
private Long id;
private String name;
private int age;
}
核心在userItemReader() 实例方法
@Configuration
public class FlatReaderJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
//FlatFileItemReader spring batch 平面文件读入类
//这个类操作特点:一行一行的读数据
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
//指定读取的文件
.resource(new ClassPathResource("users.txt"))
//读取出一行数据,该如何分割数据,默认以,分割,当前使用#号分割
.delimited().delimiter("#")
//给分割后数据打name标记,后续跟User对象属性进行映射
.names("id", "name", "age")
//读取出一行数据封装成什么对象
.targetType(User.class)
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("flat-reader-job")
.start(step())
.build();
}
}
测试结果
User(id=1, name=dafei, age=18)
User(id=2, name=xiaofei, age=16)
User(id=3, name=laofei, age=20)
User(id=4, name=zhongfei, age=19)
User(id=5, name=feifei, age=15)
方式2:FieldSetMapper–字段映射
FlatFileItemReaderBuilder 提供的方法,用于字段映射,方法参数是一个FieldSetMapper接口对象
源文件users2.txt
1#dafei#18#广东#广州#天河区
2#xiaofei#16#四川#成都#武侯区
3#laofei#20#广西#桂林#雁山区
4#zhongfei#19#广东#广州#白云区
5#feifei#15#广东#广州#越秀区
实体类
@Data
public class User {
private Long id;
private String name;
private int age;
private String address;
}
自定义的字段映射器
public class UserFieldMapper implements FieldSetMapper<User> {
@Override
public User mapFieldSet(FieldSet fieldSet) throws BindException {
//自己定义映射逻辑
User User = new User();
User.setId(fieldSet.readLong("id"));
User.setAge(fieldSet.readInt("age"));
User.setName(fieldSet.readString("name"));
String addr = fieldSet.readString("province") + " "
+ fieldSet.readString("city") + " " + fieldSet.readString("area");
User.setAddress(addr);
return User;
}
}
fieldSetMapper(userFieldMapper()) 用上自定义的字段映射器
@Configuration
public class MapperFlatReaderJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public UserFieldMapper userFieldMapper(){
return new UserFieldMapper();
}
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userMapperItemReader")
.resource(new ClassPathResource("users2.txt"))
.delimited().delimiter("#")
.names("id", "name", "age", "province", "city", "area")
.fieldSetMapper(userFieldMapper())
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("mapper-flat-reader-job")
.start(step())
.build();
}
}
测试结果
User(id=1, name=dafei, age=18, address=广东 广州 天河区)
User(id=2, name=xiaofei, age=16, address=四川 成都 武侯区)
User(id=3, name=laofei, age=20, address=广西 桂林 雁山区)
User(id=4, name=zhongfei, age=19, address=广东 广州 白云区)
User(id=5, name=feifei, age=15, address=广东 广州 越秀区)
读JSON文件
json格式文档:users.json
[
{"id":1, "name":"dafei", "age":18},
{"id":2, "name":"xiaofei", "age":17},
{"id":3, "name":"zhongfei", "age":16},
{"id":4, "name":"laofei", "age":15},
{"id":5, "name":"feifei", "age":14}
]
实体类
@Data
public class User {
private Long id;
private String name;
private int age;
}
userItemReader() 实例方法,明确指定转换成json格式需要使用转换器,本次使用的Jackson
@Configuration
public class JsonFlatReaderJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public JsonItemReader<User> userItemReader(){
ObjectMapper objectMapper = new ObjectMapper();
JacksonJsonObjectReader<User> jsonObjectReader = new JacksonJsonObjectReader<>(User.class);
jsonObjectReader.setMapper(objectMapper);
return new JsonItemReaderBuilder<User>()
.name("userJsonItemReader")
.jsonObjectReader(jsonObjectReader)
.resource(new ClassPathResource("users.json"))
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("json-flat-reader-job")
.start(step())
.build();
}
}
读数据库
基于游标方式
user表数据---->User对象
RowMapper:表与实体对象映射实现类
public class UserRowMapper implements RowMapper<User> {
@Override
//rs就是游标
public User mapRow(ResultSet rs, int rowNum) throws SQLException {
User user = new User();
user.setId(rs.getLong("id"));
user.setName(rs.getString("name"));
user.setAge(rs.getInt("age"));
return user;
}
}
需要明确指定操作数据库sql
需要明确指定游标回来之后,数据映射规则rowMapper
@Configuration
public class CursorDBReaderJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Bean
//将列数据与对象属性一一映射
public UserRowMapper userRowMapper(){
return new UserRowMapper();
}
@Bean
public JdbcCursorItemReader<User> userItemReader(){
return new JdbcCursorItemReaderBuilder<User>()
.name("userCursorItemReader")
//连接数据库,spring容器自己实现
.dataSource(dataSource)
//执行sql查询,将返回数据以游标形式一条条读
.sql("select * from user where age > ?")
//数据库读出数据跟用户对象属性一一映射
.rowMapper(userRowMapper())
//拼接参数
.preparedStatementSetter(new ArgumentPreparedStatementSetter(new Object[]{16}))
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("cursor-db-reader-job")
.start(step())
.build();
}
}
基于分页方式
RowMapper:表与实体对象映射实现类
public class UserRowMapper implements RowMapper<User> {
@Override
//rs就是游标
public User mapRow(ResultSet rs, int rowNum) throws SQLException {
User user = new User();
user.setId(rs.getLong("id"));
user.setName(rs.getString("name"));
user.setAge(rs.getInt("age"));
return user;
}
}
pagingQueryProvider 用于拼接分页SQL
userItemReader() 组装分页查询逻辑
@Configuration
public class PageDBReaderJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Bean
public UserRowMapper userRowMapper(){
return new UserRowMapper();
}
@Bean
public PagingQueryProvider pagingQueryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean factoryBean = new SqlPagingQueryProviderFactoryBean();
factoryBean.setDataSource(dataSource);
factoryBean.setSelectClause("select *"); //查询列
factoryBean.setFromClause("from user"); //查询的表
factoryBean.setWhereClause("where age > :age"); //where 条件 :age占位符
factoryBean.setSortKey("id"); //结果排序
return factoryBean.getObject();
}
@Bean
public JdbcPagingItemReader<User> userItemReader() throws Exception {
HashMap<String, Object> param = new HashMap<>();
param.put("age", 16);
return new JdbcPagingItemReaderBuilder<User>()
.name("userPagingItemReader")
.dataSource(dataSource) //数据源
.queryProvider(pagingQueryProvider()) //分页逻辑
.parameterValues(param) //条件
.pageSize(10) //每页显示条数
.rowMapper(userRowMapper()) //映射规则
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
public Step step() throws Exception {
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.writer(itemWriter())
.build();
}
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("page-db-reader-job1")
.start(step())
.build();
}
}
读取异常
跳过异常记录
@Bean
public Step step() throws Exception {
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.writer(itemWriter())
.faultTolerant() //容错
.skip(Exception.class) //跳过啥异常
.noSkip(RuntimeException.class) //不能跳过啥异常
.skipLimit(10) //跳过异常次数
.skipPolicy(new SkipPolicy() {
@Override
public boolean shouldSkip(Throwable t, int skipCount) throws SkipLimitExceededException {
//定制跳过异常与异常次数
return false;
}
})
.build();
}
异常记录记日志
public class ErrorItemReaderListener implements ItemReadListener {
@Override
public void beforeRead() {
}
@Override
public void afterRead(Object item) {
}
@Override
public void onReadError(Exception ex) {
System.out.println("记录读数据相关信息...");
}
}
ItemProcessor处理数据
ValidatingItemProcessor:校验处理器
数据文件users-validate.txt
1##18
2##16
3#laofei#20
4#zhongfei#19
5#feifei#15
导入校验依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
实体类
@Data
public class User {
private Long id;
@NotBlank(message = "用户名不能为null或空串")
private String name;
private int age;
}
BeanValidatingItemProcessor 类是Spring Batch 提供现成的Validator校验类
@Configuration
public class ValidationProcessorJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users-validate.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
//Validator校验类
public BeanValidatingItemProcessor<User> beanValidatingItemProcessor(){
BeanValidatingItemProcessor<User> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true); //不满足条件丢弃数据
return beanValidatingItemProcessor;
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
//处理时加上校验
.processor(beanValidatingItemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("validate-processor-job")
.start(step())
.build();
}
}
User(id=3, name=laofei, age=20)
User(id=4, name=zhongfei, age=19)
User(id=5, name=feifei, age=15)
ItemProcessorAdapter:适配器处理器
users-adapter.txt
1#dafei#18
2#xiaofei#16
3#laofei#20
4#zhongfei#19
5#feifei#15
自定义的大小写处理类
public class UserServiceImpl {
public User toUppeCase(User user){
user.setName(user.getName().toUpperCase());
return user;
}
}
@Configuration
public class AdapterProcessorJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users-adapter.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
//用户名转换类
public UserServiceImpl userService(){
return new UserServiceImpl();
}
//处理器适配器
@Bean
public ItemProcessorAdapter<User, User> itemProcessorAdapter(){
//适配器类,绑定自定义的UserServiceImpl类与toUppeCase方法
ItemProcessorAdapter<User, User> adapter = new ItemProcessorAdapter<>();
adapter.setTargetObject(userService());
adapter.setTargetMethod("toUppeCase");
return adapter;
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.processor(itemProcessorAdapter())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("adapter-processor-job")
.start(step())
.build();
}
}
处理结果
User(id=1, name=DAFEI, age=18)
User(id=2, name=XIAOFEI, age=16)
User(id=3, name=LAOFEI, age=20)
User(id=4, name=ZHONGFEI, age=19)
User(id=5, name=FEIFEI, age=15)
ScriptItemProcessor:脚本处理器
userScript.js
item.setName(item.getName().toUpperCase());
item;
ScriptItemProcessor 类用于加载js 脚本并处理js脚本
@Configuration
public class ScriptProcessorJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users-adapter.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
//ScriptItemProcessor 类用于加载js 脚本并处理js脚本
@Bean
public ScriptItemProcessor<User, User> scriptItemProcessor(){
ScriptItemProcessor<User, User> scriptItemProcessor = new ScriptItemProcessor();
scriptItemProcessor.setScript(new ClassPathResource("userScript.js"));
return scriptItemProcessor;
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.processor(scriptItemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("script-processor-job")
.start(step())
.build();
}
}
CompositeItemProcessor:组合处理器
读取文件:users-validate.txt
1##18
2##16
3#laofei#20
4#zhongfei#19
5#feifei#15
封装的实体对象
@Data
public class User {
private Long id;
@NotBlank(message = "用户名不能为null或空串")
private String name;
private int age;
}
用于转换大写工具类
public class UserServiceImpl {
public User toUppeCase(User user){
user.setName(user.getName().toUpperCase());
return user;
}
}
使用setDelegates 操作将其他ItemProcessor 处理合并成一个
//组合多个处理器
@Configuration
public class CompositeProcessorJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users-validate.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
public UserServiceImpl userService(){
return new UserServiceImpl();
}
//校验name是否为空
@Bean
public BeanValidatingItemProcessor<User> beanValidatingItemProcessor(){
BeanValidatingItemProcessor<User> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true); //不满足条件丢弃数据
return beanValidatingItemProcessor;
}
//name大小写转换
@Bean
public ItemProcessorAdapter<User, User> itemProcessorAdapter(){
ItemProcessorAdapter<User, User> adapter = new ItemProcessorAdapter<>();
adapter.setTargetObject(userService());
adapter.setTargetMethod("toUppeCase");
return adapter;
}
//组合多个处理器:校验name是否为空 + name大小写转换
@Bean
public CompositeItemProcessor<User, User> compositeItemProcessor(){
CompositeItemProcessor<User, User> compositeItemProcessor = new CompositeItemProcessor<>();
compositeItemProcessor.setDelegates(Arrays.asList(
beanValidatingItemProcessor(), itemProcessorAdapter()
));
return compositeItemProcessor;
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.processor(compositeItemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("composite-processor-job1")
.start(step())
.build();
}
}
处理结果
User(id=3, name=LAOFEI, age=20)
User(id=4, name=ZHONGFEI, age=19)
User(id=5, name=FEIFEI, age=15)
自定义ItemProcessor处理器
自定义处理器
public class CustomizeItemProcessor implements ItemProcessor<User,User> {
@Override
public User process(User item) throws Exception {
//id 为偶数的用户放弃
//返回null时候 读入的item会被放弃,不会进入itemwriter
return item.getId() % 2 != 0 ? item : null;
}
}
processor(customizeItemProcessor())处理时配置自定义处理器
@Configuration
public class CustomizeProcessorJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
@Bean
public ItemWriter<User> itemWriter(){
return new ItemWriter<User>() {
@Override
public void write(List<? extends User> items) throws Exception {
items.forEach(System.err::println);
}
};
}
@Bean
public CustomizeItemProcessor customizeItemProcessor(){
return new CustomizeItemProcessor();
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.processor(customizeItemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("customize-processor-job")
.start(step())
.build();
}
}
ItemWriter写数据
平面文件txt输出
user.txt读文件
1#dafei#18
2#xiaofei#16
3#laofei#20
4#zhongfei#19
5#feifei#15
实体对象
@Data
public class User {
private Long id;
private String name;
private int age;
}
itemWriter() 方法指定格式和输出文件
@Configuration
public class FlatWriteJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
@Bean
public FlatFileItemWriter<User> itemWriter(){
return new FlatFileItemWriterBuilder<User>()
.name("userItemWriter")
.resource(new PathResource("c:/outUser.txt")) //输出的文件
.formatted() //数据格式指定
.format("id: %s,姓名:%s,年龄:%s") //输出数据格式
.names("id", "name", "age") //需要输出属性
.shouldDeleteIfEmpty(true) //如果读入数据为空,输出时创建文件直接删除
//.shouldDeleteIfExists(true) //如果输出文件已经存在,则删除
.append(true) //如果输出文件已经存在, 不删除,直接追加到现有文件中
.build();
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("flat-writer-job")
.start(step())
.build();
}
}
输出文件:“c:/outUser.txt”
id: 1,姓名:dafei,年龄:18
id: 2,姓名:xiaofei,年龄:16
id: 3,姓名:laofei,年龄:20
id: 4,姓名:zhongfei,年龄:19
id: 5,姓名:feifei,年龄:15
Json文件输出
注意:json格式对象调度器
默认提供调度器有2个:JacksonJsonObjectMarshaller GsonJsonObjectMarshaller 分别对应Jackson 跟 Gson 2种json格式解析逻辑
@Configuration
public class JsonWriteJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
//json格式对象调度器
@Bean
public JacksonJsonObjectMarshaller<User> objectMarshaller(){
JacksonJsonObjectMarshaller marshaller = new JacksonJsonObjectMarshaller();
return marshaller;
}
@Bean
public JsonFileItemWriter<User> itemWriter(){
return new JsonFileItemWriterBuilder<User>()
.name("jsonUserItemWriter")
.resource(new PathResource("c:/outUser.json"))
.jsonObjectMarshaller(objectMarshaller())
.build();
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("json-writer-job")
.start(step())
.build();
}
}
输出文件:“c:/outUser.json”
[
{"id":1,"name":"dafei","age":18},
{"id":2,"name":"xiaofei","age":16},
{"id":3,"name":"laofei","age":20},
{"id":4,"name":"zhongfei","age":19},
{"id":5,"name":"feifei","age":15}
]
数据库输出
定义操作数据库预编译类
//写入数据库需要操作insert sql, 使用预编译就需要明确指定参数值
public class UserPreStatementSetter implements ItemPreparedStatementSetter<User> {
@Override
public void setValues(User item, PreparedStatement ps) throws SQLException {
ps.setLong(1, item.getId());
ps.setString(2, item.getName());
ps.setInt(3, item.getAge());
}
}
写入数据库
@Configuration
public class JdbcWriteJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
//操作数据库预编译类
@Bean
public UserPreStatementSetter preStatementSetter(){
return new UserPreStatementSetter();
}
@Bean
public JdbcBatchItemWriter<User> itemWriter(){
return new JdbcBatchItemWriterBuilder<User>()
//数据源
.dataSource(dataSource)
//准备sql语句
.sql("insert into user(id, name, age) values(?,?,?)")
//准备参数绑定器
.itemPreparedStatementSetter(preStatementSetter())
.build();
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.writer(itemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("jdbc-writer-job")
.start(step())
.build();
}
}
执行结果
多终端输出
平面txt + json +数据库 --> 输出
//组合输出
@Configuration
public class CompositeWriteJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
public DataSource dataSource;
@Bean
public FlatFileItemReader<User> userItemReader(){
return new FlatFileItemReaderBuilder<User>()
.name("userItemReader")
.resource(new ClassPathResource("users.txt"))
.delimited().delimiter("#")
.names("id", "name", "age")
.targetType(User.class)
.build();
}
//平面txt输出
@Bean
public FlatFileItemWriter<User> flatFileItemWriter(){
return new FlatFileItemWriterBuilder<User>()
.name("userItemWriter")
.resource(new PathResource("c:/outUser.txt"))
.formatted() //数据格式指定
.format("id: %s,姓名:%s,年龄:%s") //输出数据格式
.names("id", "name", "age") //需要输出属性
.build();
}
//json输出
@Bean
public JacksonJsonObjectMarshaller<User> objectMarshaller(){
JacksonJsonObjectMarshaller marshaller = new JacksonJsonObjectMarshaller();
return marshaller;
}
@Bean
public JsonFileItemWriter<User> jsonFileItemWriter(){
return new JsonFileItemWriterBuilder<User>()
.name("jsonUserItemWriter")
.resource(new PathResource("c:/outUser.json"))
.jsonObjectMarshaller(objectMarshaller())
.build();
}
//数据库输出
@Bean
public UserPreStatementSetter preStatementSetter(){
return new UserPreStatementSetter();
}
@Bean
public JdbcBatchItemWriter<User> jdbcBatchItemWriter(){
return new JdbcBatchItemWriterBuilder<User>()
.dataSource(dataSource)
.sql("insert into user(id, name, age) values(?,?,?)")
.itemPreparedStatementSetter(preStatementSetter())
.build();
}
//组合三种输出
@Bean
public CompositeItemWriter<User> compositeItemWriter(){
return new CompositeItemWriterBuilder<User>()
.delegates(Arrays.asList(flatFileItemWriter(), jsonFileItemWriter(), jdbcBatchItemWriter()))
.build();
}
@Bean
public Step step(){
return stepBuilderFactory.get("step1")
.<User, User>chunk(1)
.reader(userItemReader())
.writer(compositeItemWriter())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("composite-writer-job")
.start(step())
.build();
}
}