引言
接着上篇:Spring Batch 批处理-作业参数校验,了解作业参数校验后,本篇就来了解一下Spirng Batch 作业增量参数。
作业增量参数
不知道大家发现了没有,每次运行作业时,都改动作业名字,或者改动作业的参数,原因是作业启动有限制:相同标识参数与相同作业名的作业,只能成功运行一次。那如果想每次启动,又不想改动标识参数跟作业名怎么办呢?答案是:使用JobParametersIncrementer (作业参数增量器)
看下源码,了解一下原理
public interface JobParametersIncrementer {
JobParameters getNext(@Nullable JobParameters parameters);
}
JobParametersIncrementer 增量器是一个接口,里面只有getNext方法,参数是JobParameters 返回值也是JobParameters。通过这个getNext方法,在作业启动时我们可以给JobParameters 添加或者修改参数。简单理解就是让标识参数每次都变动
作业递增run.id参数
.Spring Batch 提供一个run.id自增参数增量器:RunIdIncrementer,每次启动时,里面维护名为run.id 标识参数,每次启动让其自增 1。
看下源码:
public class RunIdIncrementer implements JobParametersIncrementer {
private static String RUN_ID_KEY = "run.id";
private String key = RUN_ID_KEY;
public void setKey(String key) {
this.key = key;
}
@Override
public JobParameters getNext(@Nullable JobParameters parameters) {
JobParameters params = (parameters == null) ? new JobParameters() : parameters;
JobParameter runIdParameter = params.getParameters().get(this.key);
long id = 1;
if (runIdParameter != null) {
try {
id = Long.parseLong(runIdParameter.getValue().toString()) + 1;
}
catch (NumberFormatException exception) {
throw new IllegalArgumentException("Invalid value for parameter "
+ this.key, exception);
}
}
return new JobParametersBuilder(params).addLong(this.key, id).toJobParameters();
}
}
核心getNext方法,在JobParameters 对象维护一个run.id,每次作业启动时,都调用getNext方法获取JobParameters,保证其 run.id 参数能自增1
具体用法:
package com.langfeiyes.batch._04_param_incr;
import com.langfeiyes.batch._03_param_validator.NameParamValidator;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.CompositeJobParametersValidator;
import org.springframework.batch.core.job.DefaultJobParametersValidator;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.Arrays;
import java.util.Map;
@SpringBootApplication
@EnableBatchProcessing
public class IncrementParamJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Map<String, Object> parameters = chunkContext.getStepContext().getJobParameters();
System.out.println("params---run.id:" + parameters.get("run.id"));
return RepeatStatus.FINISHED;
}
};
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("incr-params-job")
.start(step1())
.incrementer(new RunIdIncrementer()) //参数增量器(run.id自增)
.build();
}
public static void main(String[] args) {
SpringApplication.run(IncrementParamJob.class, args);
}
}
修改tasklet()方法,获取run.id参数,修改job实例方法,加上.incrementer(new RunIdIncrementer()) ,保证参数能自增。
连续执行3次,观察:batch_job_execution_params 表
其中的run.id参数值一直增加,其中再多遍也没啥问题。
作业时间戳参数
run.id 作为标识参数貌似没有具体业务意义,如果将时间戳作为标识参数那就不一样了,比如这种运用场景:每日任务批处理,这时就需要记录每天的执行时间了。那该怎么实现呢?
Spring Batch 中没有现成时间戳增量器,需要自己定义
//时间戳作业参数增量器
public class DailyTimestampParamIncrementer implements JobParametersIncrementer {
@Override
public JobParameters getNext(JobParameters parameters) {
return new JobParametersBuilder(parameters)
.addLong("daily", new Date().getTime()) //添加时间戳
.toJobParameters();
}
}
定义一个标识参数:daily,记录当前时间戳
package com.langfeiyes.batch._04_param_incr;
import com.langfeiyes.batch._03_param_validator.NameParamValidator;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.CompositeJobParametersValidator;
import org.springframework.batch.core.job.DefaultJobParametersValidator;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.Arrays;
import java.util.Map;
@SpringBootApplication
@EnableBatchProcessing
public class IncrementParamJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Tasklet tasklet(){
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Map<String, Object> parameters = chunkContext.getStepContext().getJobParameters();
System.out.println("params---daily:" + parameters.get("daily"));
return RepeatStatus.FINISHED;
}
};
}
//时间戳增量器
@Bean
public DailyTimestampParamIncrementer dailyTimestampParamIncrementer(){
return new DailyTimestampParamIncrementer();
}
@Bean
public Step step1(){
return stepBuilderFactory.get("step1")
.tasklet(tasklet())
.build();
}
@Bean
public Job job(){
return jobBuilderFactory.get("incr-params-job")
.start(step1())
//.incrementer(new RunIdIncrementer()) //参数增量器(run.id自增)
.incrementer(dailyTimestampParamIncrementer()) //时间戳增量器
.build();
}
public static void main(String[] args) {
SpringApplication.run(IncrementParamJob.class, args);
}
}
定义实例方法dailyTimestampParamIncrementer()将自定义时间戳增量器添加Spring容器中,修改job()实例方法,添加.incrementer(dailyTimestampParamIncrementer()) 增量器,修改tasklet() 方法,获取 daily参数。
连续执行3次,查看batch_job_execution_params 表
很明显可以看出daily在变化,而run.id 没有动,是3,为啥?因为.incrementer(new RunIdIncrementer()) 被注释掉了。