一:Job嵌套
Job之间也可以嵌套,比如一个父Job封装多个已经存在的子Job。
/**
 * Declares two independent single-step jobs, "childJob1" and "childJob2".
 * Each job runs one tasklet step that prints the current thread name followed by the tasklet's name.
 */
@Configuration
public class ChildrenJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** First child job: starts with childJob1Step and has a RunIdIncrementer attached. */
    @Bean
    public Job childJob1() {
        return jobBuilderFactory
                .get("childJob1")
                .start(childJob1Step())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    /** The only step of childJob1; delegates to childJob1StepTasklet. */
    @Bean
    public Step childJob1Step() {
        return stepBuilderFactory
                .get("childJob1Step")
                .tasklet(childJob1StepTasklet())
                .build();
    }

    /** Prints the executing thread's name plus " childJob1StepTasklet", then reports FINISHED. */
    @Bean
    public Tasklet childJob1StepTasklet() {
        return (contribution, context) -> {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " childJob1StepTasklet");
            return RepeatStatus.FINISHED;
        };
    }

    /** Second child job: starts with childJob2Step2 and has a RunIdIncrementer attached. */
    @Bean
    public Job childJob2() {
        return jobBuilderFactory
                .get("childJob2")
                .start(childJob2Step2())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    /** The only step of childJob2; delegates to childJob2Step2Tasklet2. */
    @Bean
    public Step childJob2Step2() {
        return stepBuilderFactory
                .get("childJob2Step2")
                .tasklet(childJob2Step2Tasklet2())
                .build();
    }

    /** Prints the executing thread's name plus " childJob2Step2Tasklet2", then reports FINISHED. */
    @Bean
    public Tasklet childJob2Step2Tasklet2() {
        return (contribution, context) -> {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " childJob2Step2Tasklet2");
            return RepeatStatus.FINISHED;
        };
    }
}
/**
 * Declares "parentJob", whose two steps each wrap an existing child job
 * (childJob1, then childJob2) so that the children run as steps of the parent.
 */
@Configuration
public class ParentJobConfig {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private Job childJob1;

    @Autowired
    private Job childJob2;

    /** Parent job: runs parent1Step followed by parent2Step, with a RunIdIncrementer attached. */
    @Bean
    public Job parentJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        Step first = parent1Step(jobRepository, transactionManager);
        Step second = parent2Step(jobRepository, transactionManager);
        return jobBuilderFactory
                .get("parentJob")
                .start(first)
                .next(second)
                .incrementer(new RunIdIncrementer())
                .build();
    }

    /** Job-to-step conversion: exposes childJob1 as the parent step "parent1Step". */
    @Bean
    public Step parent1Step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return wrapJobAsStep("parent1Step", childJob1, jobRepository, transactionManager);
    }

    /** Job-to-step conversion: exposes childJob2 as the parent step "parent2Step". */
    @Bean
    public Step parent2Step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return wrapJobAsStep("parent2Step", childJob2, jobRepository, transactionManager);
    }

    /** Builds a step named stepName that launches the given job through the injected JobLauncher. */
    private Step wrapJobAsStep(String stepName, Job job, JobRepository jobRepository,
                               PlatformTransactionManager transactionManager) {
        return new JobStepBuilder(new StepBuilder(stepName))
                .job(job)
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(transactionManager)
                .build();
    }
}
二:if else案例
案例:如果开始步骤成功了就执行成功步骤,否则执行失败步骤。
// Pseudocode: pick the follow-up step from the exit status of the first stage.
// NOTE(review): the job configuration that follows this snippet branches on the exit status of
// helloWorldStep; helloWorldJob() here presumably stands for that step's result — confirm.
String exitStatus = helloWorldJob();
if("FAILED".equals(exitStatus)){
failStep();
}else{
successStep();
}
/**
 * Conditional-flow example: "helloWorldJob" runs helloWorldStep and then branches on its exit status,
 * going to failStep on "FAILED" and to successStep on anything else.
 */
@Configuration
public class HelloWorldJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Runs helloWorldStep; if it fails (it counts as failed when it throws) failStep runs,
     * otherwise successStep runs. Each on(...) call expresses one condition of the branch.
     */
    @Bean
    public Job helloWorldJob() {
        return jobBuilderFactory
                .get("helloWorldJob")
                // "if" branch: exit status FAILED
                .start(helloWorldStep())
                .on("FAILED")
                .to(failStep())
                // "else" branch: every other exit status
                .from(helloWorldStep())
                .on("*")
                .to(successStep())
                .end()
                .incrementer(new RunIdIncrementer())
                .build();
    }

    /** Entry step; delegates to helloWorldTasklet. */
    @Bean
    public Step helloWorldStep() {
        return stepBuilderFactory
                .get("helloWorldStep")
                .tasklet(helloWorldTasklet())
                .build();
    }

    /** Step taken when helloWorldStep did not exit with FAILED; delegates to successTasklet. */
    @Bean
    public Step successStep() {
        return stepBuilderFactory
                .get("successStep")
                .tasklet(successTasklet())
                .build();
    }

    /** Step taken when helloWorldStep exited with FAILED; delegates to failTasklet. */
    @Bean
    public Step failStep() {
        return stepBuilderFactory
                .get("failStep")
                .tasklet(failTasklet())
                .build();
    }

    /** Prints "helloWorldTasklet" and then divides by zero, so the step ends with an exception. */
    @Bean
    public Tasklet helloWorldTasklet() {
        return (contribution, context) -> {
            System.out.println("helloWorldTasklet");
            // Presumably deliberate: the ArithmeticException makes helloWorldStep fail for the demo.
            int a = 1/0;
            return RepeatStatus.FINISHED;
        };
    }

    /** Prints "successTasklet" and reports FINISHED. */
    @Bean
    public Tasklet successTasklet() {
        return (contribution, context) -> {
            System.out.println("successTasklet");
            return RepeatStatus.FINISHED;
        };
    }

    /** Prints "failTasklet" and reports FINISHED. */
    @Bean
    public Tasklet failTasklet() {
        return (contribution, context) -> {
            System.out.println("failTasklet");
            return RepeatStatus.FINISHED;
        };
    }
}
- start(Step) 后面跟 on(String):on 的参数会与 start() 中步骤返回的退出状态(ExitStatus)做比较,如果 equals 就执行 to(Step) 的逻辑。
- 后面的分支都从 from(Step) 开始,参数表示用哪个步骤的退出状态与后面的 on(String) 做比较,equals 为 true 时执行 to(Step) 的逻辑。* 是通配符,相当于 else 逻辑。
三:自定义on()值
- start(Step) 后面跟next(jobExecutionDecider())。
- from都是取作业执行决策器jobExecutionDecider返回的值与on做比较。
/**
 * Builds "helloWorldJob": runs helloWorldStep, then hands over to jobExecutionDecider().
 * A decider status of "OK" routes to successStep; any other status ("*") routes to failStep.
 */
@Bean
public Job helloWorldJob() {
return jobBuilderFactory.get("helloWorldJob")
.start(helloWorldStep())
.next(jobExecutionDecider())
.from(jobExecutionDecider()).on("OK").to(successStep())
.from(jobExecutionDecider()).on("*").to(failStep())
.end()
.incrementer(new RunIdIncrementer())
.build();
}
/** Exposes a new HelloWorldStepListener as the StepExecutionListener bean. */
@Bean
public StepExecutionListener stepExecutionListener() {
return new HelloWorldStepListener();
}
/**
 * Tasklet that prints "helloWorldTasklet" and stores the value "500" under the key "code"
 * in the ExecutionContext of the owning JobExecution.
 */
@Bean
public Tasklet helloWorldTasklet() {
    return (stepContribution, chunkContext) -> {
        System.out.println("helloWorldTasklet");
        // Per the original author's note, put() is not supported on
        // chunkContext.getStepContext().getJobExecutionContext(), so the job-level context is
        // reached through the StepExecution's JobExecution instead.
        ExecutionContext jobContext = chunkContext.getStepContext()
                .getStepExecution()
                .getJobExecution()
                .getExecutionContext();
        // 500 -> ERROR -> failStep
        jobContext.put("code", "500");
        return RepeatStatus.FINISHED;
    };
}
/**
 * Decider that maps the "code" entry of the job's ExecutionContext to a flow status:
 * "200" gives "OK", "404" gives "NotFound", and anything else (including a missing entry) gives "ERROR".
 */
public class MyJobExecutionDecider implements JobExecutionDecider {

    /**
     * @param jobExecution  the running job execution whose ExecutionContext holds "code"
     * @param stepExecution the last step execution (not used by this decider)
     * @return the flow status matched against on(...) in the job definition
     */
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        // Use the defaulting overload: the single-argument getString(key) throws when no value
        // is stored under the key, whereas a missing code should simply fall through to "ERROR".
        String code = jobExecution.getExecutionContext().getString("code", null);
        if ("200".equals(code)) {
            return new FlowExecutionStatus("OK");
        } else if ("404".equals(code)) {
            return new FlowExecutionStatus("NotFound");
        }
        return new FlowExecutionStatus("ERROR");
    }
}
四:退出状态
4.1 退出状态
// Predefined exit statuses. NOTE(review): this appears to be an abridged excerpt of the framework
// class (no constructor or compareTo is shown), reproduced only to list the constants.
public class ExitStatus implements Serializable, Comparable<ExitStatus> {
// Unknown state
public static final ExitStatus UNKNOWN = new ExitStatus("UNKNOWN");
// Still executing
public static final ExitStatus EXECUTING = new ExitStatus("EXECUTING");
// Execution completed
public static final ExitStatus COMPLETED = new ExitStatus("COMPLETED");
// No-op: nothing was executed
public static final ExitStatus NOOP = new ExitStatus("NOOP");
// Execution failed
public static final ExitStatus FAILED = new ExitStatus("FAILED");
// Stopped programmatically
public static final ExitStatus STOPPED = new ExitStatus("STOPPED");
}
一般来说,作业启动之后,这些状态皆为流程自行控制。
- 顺利结束返回:COMPLETED
- 异常结束返回:FAILED
- 无效执行返回:NOOP
- 编程方式主动停止返回:STOPPED
但是也可以通过api强制改变状态:
- end():作业流程直接成功结束,返回状态为:COMPLETED
- fail():作业流程直接失败结束,返回状态为:FAILED
- stopAndRestart(step):作业流程中断结束,返回状态为:STOPPED。再次启动时,从 step 位置开始执行(注意:前提是重启时的作业参数与 Job Name 都与上一次执行一样)
/**
 * Builds "helloWorldJob" and demonstrates forcing the job's final state when helloWorldStep fails.
 * Active variant: on "FAILED" the job is stopped and, when restarted, resumes at successStep;
 * on any other exit status the flow continues to successStep.
 */
@Bean
public Job job(){
    return jobBuilderFactory.get("helloWorldJob")
            .start(helloWorldStep())
            // Alternative: turn the would-be failure into a normal ending -- COMPLETED
            //.on("FAILED").end()
            // Alternative: end the job as failed -- FAILED
            //.on("FAILED").fail()
            // Active: end the job as STOPPED; the argument is the step to resume from on restart
            .on("FAILED").stopAndRestart(successStep())
            // Branch from the step that was actually started. The original referenced firstStep(),
            // which is not part of this flow, leaving helloWorldStep without a non-FAILED transition.
            .from(helloWorldStep()).on("*").to(successStep())
            .end()
            .incrementer(new RunIdIncrementer())
            .build();
}
4.2 停止作业
方式一: ExitStatus.STOPPED
/**
 * Stop-by-exit-status example: step1's listener can replace the step's exit status with STOPPED,
 * in which case "helloWorldStopJob" stops and is configured to restart from step1;
 * on any other exit status the job continues with step2.
 */
@Configuration
public class HelloWorldStopJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** Job definition: STOPPED from step1 means stop and restart at step1; anything else goes to step2. */
    @Bean
    public Job helloWorldStopJob() {
        return jobBuilderFactory
                .get("helloWorldStopJob")
                .start(step1())
                .on("STOPPED")
                .stopAndRestart(step1())
                .from(step1())
                .on("*")
                .to(step2())
                .end()
                .incrementer(new RunIdIncrementer())
                .build();
    }

    /** First step: runs tasklet1 with the stop listener attached. */
    @Bean
    public Step step1() {
        return stepBuilderFactory
                .get("step1")
                .tasklet(tasklet1())
                .listener(stepExecutionListener())
                // Allow the step to be started again on restart even though it already completed.
                .allowStartIfComplete(true)
                .build();
    }

    /** Listener attached to step1; supplied as a new StopStepListener. */
    @Bean
    public StepExecutionListener stepExecutionListener() {
        return new StopStepListener();
    }

    /** Second step: runs tasklet2. */
    @Bean
    public Step step2() {
        return stepBuilderFactory
                .get("step2")
                .tasklet(tasklet2())
                .build();
    }

    // Demo switch: while negative, tasklet1 requests a stop.
    // NOTE(review): nothing in the visible code ever changes this value, so the flag is set on every run.
    int i = -1;

    /** Prints "tasklet1" and, while i is negative, stores flag="stop" in the step's ExecutionContext. */
    @Bean
    public Tasklet tasklet1() {
        return (contribution, chunkContext) -> {
            System.out.println("tasklet1");
            if (i >= 0) {
                return RepeatStatus.FINISHED;
            }
            ExecutionContext stepContext = chunkContext.getStepContext()
                    .getStepExecution()
                    .getExecutionContext();
            stepContext.put("flag", "stop");
            return RepeatStatus.FINISHED;
        };
    }

    /** Prints "tasklet2" and reports FINISHED. */
    @Bean
    public Tasklet tasklet2() {
        return (contribution, chunkContext) -> {
            System.out.println("tasklet2");
            return RepeatStatus.FINISHED;
        };
    }
}
/**
 * Step listener that replaces the step's exit status with STOPPED when the step's
 * ExecutionContext holds flag="stop"; otherwise the step's own exit status is returned.
 */
public class StopStepListener implements StepExecutionListener {

    /** Prints "beforeStep" before the step runs. */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("beforeStep");
    }

    /**
     * @param stepExecution the step execution that just finished
     * @return ExitStatus.STOPPED when the "flag" entry equals "stop", else the step's own exit status
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        Object stopFlag = stepExecution.getExecutionContext().get("flag");
        boolean stopRequested = "stop".equals(stopFlag);
        return stopRequested ? ExitStatus.STOPPED : stepExecution.getExitStatus();
    }
}
Step1 的 Tasklet 正常返回了 RepeatStatus.FINISHED,所以步骤的 status=COMPLETED,但是经监听器改写后实际的退出状态(ExitStatus)是 STOPPED。
方式二:setTerminateOnly()
StepExecution#setTerminateOnly() 给运行中的stepExecution设置停止标记,Spring Batch 识别后直接停止步骤,进而停止流程。 这种方式更简单。
/**
 * Builds "helloWorldStopJob" as a plain sequence: step1 followed by step2.
 * No conditional transitions are declared here; stopping is triggered from inside tasklet1.
 */
@Bean
public Job helloWorldStopJob() {
return jobBuilderFactory.get("helloWorldStopJob")
.start(step1())
.next(step2())
.incrementer(new RunIdIncrementer())
.build();
}
/** Builds "step1", which runs tasklet1. */
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(tasklet1())
// Allow the step to be started again on restart even though it already completed.
.allowStartIfComplete(true)
.build();
}
// Demo switch: while negative, tasklet1 marks the running step execution as terminate-only.
int i = -1;

/**
 * Prints "tasklet1" and, while i is negative, calls setTerminateOnly() on the current
 * StepExecution to request that the step (and with it the flow) be stopped.
 */
@Bean
public Tasklet tasklet1() {
    return (contribution, chunkContext) -> {
        System.out.println("tasklet1");
        boolean stopRequested = i < 0;
        if (stopRequested) {
            // Set the stop marker on the running step execution.
            chunkContext.getStepContext().getStepExecution().setTerminateOnly();
        }
        return RepeatStatus.FINISHED;
    };
}
**注意:使用监听器返回 ExitStatus.STOPPED 和使用 StepExecution#setTerminateOnly() 这两种方式,在 BATCH_STEP_EXECUTION 表中的 status 字段值不一样,前者是 COMPLETED,后者是 STOPPED。**
五:作业重启
一般情况下当某个作业发生异常或者终止状态时可以重启作业。
- preventRestart():设置作业禁止重启,重启时报错:JobInstance already exists and is not restartable。
/**
 * Builds "helloWorldStopJob" (step1 then step2) with preventRestart() applied,
 * i.e. the job is configured as not restartable.
 */
@Bean
public Job helloWorldStopJob() {
return jobBuilderFactory.get("helloWorldStopJob")
.preventRestart()
.start(step1())
.next(step2())
.incrementer(new RunIdIncrementer())
.build();
}
- startLimit(num):设置步骤最多启动次数。有些步骤可能因为网络等问题失败,可以通过重试解决;但如果重启一定次数后还没有解决,就不允许无限地重试下去。超过最大次数时报错:Maximum start limit exceeded for step: step1 StartMax:3
/** Builds "step1" running tasklet1, with the step's start limit set to 3. */
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.startLimit(3)
.tasklet(tasklet1())
.build();
}
- allowStartIfComplete(true):设置步骤在已执行完成后仍允许重新执行,这样重启作业时该步骤就不是 NOOP 了。
/** Builds "step1" running tasklet1, with allowStartIfComplete(true) so a completed step may run again. */
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.allowStartIfComplete(true)
.tasklet(tasklet1())
.build();
}