一:HelloWorld
1.1 配置Job、Step、Tasklet
@Configuration
public class HelloWorldJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job helloWorldJob() {
return jobBuilderFactory.get("helloWorldJob")
.start(hellWorldStep())
.build();
}
@Bean
public Step hellWorldStep() {
return stepBuilderFactory.get("hellWorldStep")
.tasklet(hellWorldTasklet())
.build();
}
@Bean
public Tasklet hellWorldTasklet() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("hello world spring batch");
return RepeatStatus.FINISHED;
}
};
}
}
RepeatStatus枚举有两个值:
- FINISHED:表示完成,一般都是返回FINISHED。
- CONTINUABLE:继续,如果返回该值,会死循环的执行execute()方法。
1.2 手动触发作业Job
@RestController
@RequestMapping("/job")
public class JobController {
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("helloWorldJob")
private Job job;
@RequestMapping("/start")
public ExitStatus start() throws Exception {
JobExecution jobExecution = jobLauncher.run(job, new JobParameters());
return jobExecution.getExitStatus();
}
}
1.3 基础概念
JobLauncher
:作业启动器,启动作业的入口。对应的实现类为SimpleJobLauncher。Job
:作业,用于配置作业的相关配置,一个作业可以配置多个步骤,步骤之间是有序的。Step
:步骤,作业具体执行的业务逻辑,一个Job可以配置多个Step。步骤有两种实现方式:Tasklet方式
:所有作业逻辑写在一个方法中。Chunk方式
:将一个完整的作业逻辑根据作用拆分到三个方法中ItemReader
:负责从数据源中读数据(如从文件、数据库等)。ItemProcessor
:负责对读出来的数据进行非法校验和对数据进行加工。ItemWriter
:将数据写到某个目标中(如文件、数据库等)。
JobBuilderFactory
:作业构建起工厂,用于构建作业Job对象。- get(String name):设置作业名称。
- start(Step step):设置作业启动的
第一个
步骤。 - build():构建Job对象。
StepBuilderFactory
:作业构建器工厂,用于构造步骤Step对象。- get(String name):设置步骤名称。
- tasklet(Tasklet tasklet):设置Tasklet。
- build():构建Step对象。
Tasklet
:用来封装批处理具体的业务逻辑。- JobRepository:作业持久化,在执行作业的过程中用于操作spring batch相关的表,记录作业的相关状态等。
1.4 JobInstance作业实例和JobExecution作业执行
- exitCode=
COMPLETED
, 表示一个执行实例,往BATCH_JOB_INSTANCE
表中插入一条数据。 - 每执行一次作业(无论exitCode是什么)都会往
BATCH_JOB_EXECUTION
表中插入一条数据。 - 当exitCode=COMPLETED时,一个作业中配置了多少个步骤就往
BATCH_STEP_EXECUTION
表插入多少条数据。
1.5 第二次启动作业
注意:exitCode: COMPLETED,表示作业真正完成。一旦成功执行了这次作业,就不允许执行第二次,如果再执行会返回NOOP
,表示无效的作业。 如果想重复执行可以改一下jobName或者改一下jobParameters。
每执行一次(无论exitCode是什么值)就会往BATCH_JOB_EXECUTION表插入一条记录,所以第二次执行作业也会插入一条数据,但是status=COMPLETED
,exit_code=NOOP
,exit_code表示此次的退出状态,status表示整个作业状态。
1.6 Job、JobInstance、JobExecution关系
- 同一个job_name在BATCH_JOB_INSTANCE表中有多少条数据取决于 (job_name + job_key) 作为联合主键,job_key又依赖于JobParameters,同一个Job可以有不同的JobParameters,所以Job和JobInstance是多对多的关系。
- 每执行一次作业就会生成一条JobExecution(无论exitCode是什么),当作业执行成功时有1条JobExecution,当执行失败时可以重新执行,JobExecution会大于1条,所以JobInstance和JobExecution是多对多的关系。
二:多步骤
一个Job可以配置多个Step,多个步骤按照定义的先后顺序执行:
- start(Step):定义第一个步骤。
- next(Step):定义其它后续步骤。
@Bean
public Job helloWorldJob() {
return jobBuilderFactory.get("helloWorldJob")
// 第一个步骤
.start(hellWorldStep())
// 其它步骤
.next(hellWorldStep2())
.build();
}
@Bean
public Step hellWorldStep() {
return stepBuilderFactory.get("hellWorldStep")
.tasklet(hellWorldTasklet())
.build();
}
@Bean
public Tasklet hellWorldTasklet() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println(" hello world spring batch");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step hellWorldStep2() {
return stepBuilderFactory.get("hellWorldStep2")
.tasklet(hellWorldTasklet2())
.build();
}
@Bean
public Tasklet hellWorldTasklet2() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
JobExecution jobExecution = stepContribution.getStepExecution().getJobExecution();
// STARTED
System.out.println(jobExecution.getStatus());
return RepeatStatus.FINISHED;
}
};
}
三:流式步骤 Flow
流式步骤是一系列有序子步骤的集合,将一个大的步骤拆分成多个有序子步骤。执行顺序:
- Step1: tasklet1
- Step2: Flow
- 2.1 Step21:tasklet21
- 2.2 Step22:tasklet22
- 2.3 Step23:tasklet23
- Step3: tasklet3
@Configuration
public class HelloWorldFlowJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job flowStepJob() {
return jobBuilderFactory.get("flowStepJob")
.start(step1())
.next(step2())
.next(step3())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(tasklet1())
.build();
}
@Bean
public Step step2() {
// 将Flow包装成Step
return stepBuilderFactory.get("step2")
.flow(flow2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<Flow>("flow2")
.start(step21())
.next(step22())
.next(step23())
.build();
}
@Bean
public Step step21() {
return stepBuilderFactory.get("step21")
.tasklet(tasklet21())
.build();
}
@Bean
public Step step22() {
return stepBuilderFactory.get("step22")
.tasklet(tasklet22())
.build();
}
@Bean
public Step step23() {
return stepBuilderFactory.get("step23")
.tasklet(tasklet23())
.build();
}
@Bean
public Step step3() {
return stepBuilderFactory.get("step3")
.tasklet(tasklet3())
.build();
}
@Bean
public Tasklet tasklet1() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("tasklet1");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet21() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("tasklet21");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet22() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("tasklet22");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet23() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("tasklet23");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet tasklet3() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("tasklet3");
return RepeatStatus.FINISHED;
}
};
}
}
四:步骤并行执行
默认步骤之间是使用同一个线程串行执行的,先执行第一个步骤,第一个步骤执行完后再执行第二个步骤,第二个步骤执行完后再执行第三个步骤。通过 split(TaskExecutor executor)
可以将步骤放在不同的线程中并行执行。并行执行哪个步骤先执行是不确定的。
@Bean
public Job helloWorldJob() {
return jobBuilderFactory.get("helloWorldJob")
.start(hellWorldStep())
.incrementer(new RunIdIncrementer())
.split(new SimpleAsyncTaskExecutor())
.add(hellWorldFlow())
.end()
.build();
}
@Bean
public Step hellWorldStep() {
return stepBuilderFactory.get("hellWorldStep")
.tasklet(hellWorldTasklet())
.build();
}
@Bean
public Step hellWorldStep2() {
return stepBuilderFactory.get("hellWorldStep2")
.tasklet(hellWorldTasklet2())
.build();
}
@Bean
public Flow hellWorldFlow() {
return new FlowBuilder<Flow>("hellWorldFlow")
.start(hellWorldStep2())
.build();
}
@Bean
public Tasklet hellWorldTasklet() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println(Thread.currentThread().getName() + " hellWorldTasklet");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Tasklet hellWorldTasklet2() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println(Thread.currentThread().getName() + " hellWorldTasklet2");
return RepeatStatus.FINISHED;
}
};
}
并行执行,Step2比Step1先执行了。
五:作业执行监听器 JobExecutionListener
作业执行监听器用于作业开始前执行某些操作,作业结束时执行某些操作。
方式一:实现接口 JobExecutionListener
public class HelloWorldJobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
jobExecution.getExecutionContext().put("begin", System.currentTimeMillis());
// Status=STARTED
System.err.println("beforeJob " + jobExecution.getStatus());
}
@Override
public void afterJob(JobExecution jobExecution) {
long begin = jobExecution.getExecutionContext().getLong("begin");
long total = System.currentTimeMillis() - begin;
// Status=COMPLETED
System.err.println("afterJob " + jobExecution.getStatus() + " total: "+ total);
}
}
方式二:注解 @BeforeJob + @AfterJob
public class HelloWorldJobAnnationListener {
@BeforeJob
public void beforeJob(JobExecution jobExecution) {
jobExecution.getExecutionContext().put("begin", System.currentTimeMillis());
// Status=STARTED
System.err.println("beforeJob " + jobExecution.getStatus());
}
@AfterJob
public void afterJob(JobExecution jobExecution) {
long begin = jobExecution.getExecutionContext().getLong("begin");
long total = System.currentTimeMillis() - begin;
// Status=COMPLETED
System.err.println("afterJob " + jobExecution.getStatus() + " total: "+ total);
}
}
@Bean
public Job helloWorldJob() {
return jobBuilderFactory.get("helloWorldJob")
.start(hellWorldStep())
// 配置监听器
.listener(jobExecutionListener())
.build();
}
@Bean
public JobExecutionListener jobExecutionListener() {
return new HelloWorldJobListener();
}
六:步骤执行监听器
可以通过实现StepExecutionListener接口实现,也可以通过注解方式实现(@BeforeStep、@AfterStep)。
public class HelloWorldStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
System.out.println("步骤监听器:步骤执行之前监听," + stepExecution.getStepName());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
System.out.println("步骤监听器:步骤执行之前监听," + stepExecution.getStepName());
return stepExecution.getExitStatus();
}
}
@Bean
public Step helloWorldStep() {
return stepBuilderFactory.get("helloWorldStep")
.tasklet(hellWorldTasklet())
// 配置步骤监听器
.listener(stepExecutionListener())
.build();
}
@Bean
public StepExecutionListener stepExecutionListener() {
return new HelloWorldStepListener();
}
七:Job嵌套
Job之间也可以嵌套,比如一个父Job封装多个已经存在的子Job。
@Configuration
public class ChildrenJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job childJob1() {
return jobBuilderFactory.get("childJob1")
.start(childJob1Step())
.incrementer(new RunIdIncrementer())
.build();
}
@Bean
public Step childJob1Step() {
return stepBuilderFactory.get("childJob1Step")
.tasklet(childJob1StepTasklet())
.build();
}
@Bean
public Tasklet childJob1StepTasklet() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println(Thread.currentThread().getName() + " childJob1StepTasklet");
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Job childJob2() {
return jobBuilderFactory.get("childJob2")
.start(childJob2Step2())
.incrementer(new RunIdIncrementer())
.build();
}
@Bean
public Step childJob2Step2() {
return stepBuilderFactory.get("childJob2Step2")
.tasklet(childJob2Step2Tasklet2())
.build();
}
@Bean
public Tasklet childJob2Step2Tasklet2() {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println(Thread.currentThread().getName() + " childJob2Step2Tasklet2");
return RepeatStatus.FINISHED;
}
};
}
}
@Configuration
public class ParentJobConfig {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private Job childJob1;
@Autowired
private Job childJob2;
@Bean
public Job parentJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return jobBuilderFactory.get("parentJob")
.start(parent1Step(jobRepository, transactionManager))
.next(parent2Step(jobRepository, transactionManager))
.incrementer(new RunIdIncrementer())
.build();
}
@Bean
public Step parent1Step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
// Job转Step: 将子Job封装成父Step
return new JobStepBuilder(new StepBuilder("parent1Step"))
.job(childJob1)
.launcher(jobLauncher)
.repository(jobRepository)
.transactionManager(transactionManager)
.build();
}
@Bean
public Step parent2Step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
// Job转Step: 将子Job封装成父Step
return new JobStepBuilder(new StepBuilder("parent2Step"))
.job(childJob2)
.launcher(jobLauncher)
.repository(jobRepository)
.transactionManager(transactionManager)
.build();
}
}
八:轻舟已过万重山
小时候,觉得忘记带作业是天大的事
高中的时候,觉得考不上大学是天大的事
恋爱的时候,觉得和喜欢的 人分开是天大的事
大学毕业时,觉得没有一个稳定的工作是天大的事
但现在回头看看
自己曾经难以跨过的山其实都已经跨过了
曾认为不能接受的,也都渐渐接受了
生活充满了选择
遗憾也不过是常态
失败也是贯穿生活始终的
其实人本身就是无论做什么选择都会后悔
只是总是习惯美化自己当初没有选择的另外一条路
可是大家都心知肚明,就算时间重来一次,以当时的心智和阅历还是会做出同样的选择
那么故事的结局可能就没那么重要了
我想人生就是一场享受过程的修行
与其后悔当初
不如回头看
轻舟已过万重山
向前看
前路漫漫亦灿灿