概述
官网,GitHub
A lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.
执行流程
实战
假设有个待处理的任务,如文件batch-test.csv
:
1,TODO
2,TODO
3,TODO
Maven引入依赖:
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
</dependency>
测试类:
private static final String CSV_FILE = "batch-test.csv";
@Resource
private JobLauncher jobLauncher;
@Resource
private JobRepository jobRepository;
@Resource
private PlatformTransactionManager manager;
@Test
public static void test() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
// 创建reader
FlatFileItemReader<TodoItem> reader = new FlatFileItemReader<>();
reader.setResource(new FileSystemResource(CSV_FILE));
reader.setLineMapper(new TestLineMapper());
// 创建processor
TestItemProcessor processor = new TestItemProcessor();
// 创建writer
FlatFileItemWriter<TodoItem> writer = new FlatFileItemWriter<>();
writer.setResource(new FileSystemResource(CSV_FILE));
writer.setLineAggregator(new TestLineAggregator());
// 创建Step
StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, manager);
Step step = stepBuilderFactory.get("step")
.<TodoItem, TodoItem>chunk(1)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
// 创建Job
JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository);
Job job = jobBuilderFactory.get("job")
.start(step)
.build();
// 启动任务
jobLauncher.run(job, new JobParameters());
}
POJO实体类:
@Data
public class TodoItem {
private String id;
private String status;
}
其中处理器如下:
public class TestItemProcessor implements ItemProcessor<TodoItem, TodoItem> {
@Override
public TodoItem process(TodoItem item) {
// 模拟任务处理逻辑
log.info("doing item, id=" + item.getId());
item.setStatus("DONE");
return item;
}
}
行(记录)聚合器如下:
public class TestLineAggregator implements LineAggregator<TodoItem> {
@Override
public String aggregate(TodoItem item) {
return item.getId() + "," + item.getStatus();
}
}
行映射器如下:
public class TestLineMapper implements LineMapper<TodoItem> {
@Override
public TodoItem mapLine(String line, int lineNumber) {
String[] args = line.split(",");
TodoItem item = new TodoItem();
item.setId(args[0]);
item.setStatus(args[1]);
return item;
}
}
方法执行成功时,console控制台输出:
doing item, id=1
doing item, id=2
doing item, id=3
另外batch-test.csv
会变成:
1,DONE
2,DONE
3,DONE
概念
Job
Job和Step是Spring Batch执行批处理任务最为核心的两个概念。Job是一个封装整个批处理过程的一个概念。Job在Spring Batch的体系当中是最顶层的抽象概念,是最上层接口:
/**
* 重启策略应用于整个作业,而不是某个步骤
*/
public interface Job {
String getName();
default boolean isRestartable() {
return true;
}
void execute(JobExecution execution);
@Nullable
default JobParametersIncrementer getJobParametersIncrementer() {
return null;
}
default JobParametersValidator getJobParametersValidator() {
return new DefaultJobParametersValidator();
}
}
定义有五个方法,实现类主要有两种类型的Job,SimpleJob,FlowJob。除Job外还有JobInstance、JobExecution两个更加底层的抽象。
Job是运行的基本单位,内部由Step组成,本质上可看成Step的容器。一个Job可按照指定的逻辑顺序组合Step,并提供给所有Step设置相同属性的方法,如一些事件监听,跳过策略。
Spring Batch以SimpleJob类的形式提供Job接口的默认简单实现,在Job之上创建一些标准功能:
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad()).next(gameLoad())
.next(playerSummarization()).end().build();
}
JobInstance
源码如下:
public class JobInstance extends Entity {
private final String jobName;
}
其中Entity如下:
public class Entity implements Serializable {
private Long id;
private volatile Integer version;
}
JobInstance指的是Job运行当中,作业执行过程当中的概念。Instance即实例。
JobParameters
同一个Job每天运行一次的话,则每天都有一个JobIntsance,但他们的Job定义都是一样的,怎么来区别一个Job的不同JobIntsance?
Spring Batch提供JobParameters用来唯一标识JobIntsance。JobParameters对象包含一组用于启动批处理作业的参数,它可以在运行期间用于识别或甚至用作参考数据。JobInstance=Job+JobParameters。
JobExecution指的是单次尝试运行一个定义好的Job的代码层面的概念。Job的一次执行可能失败或成功。只有当执行成功完成时,给定的与执行相对应的JobInstance才也被视为完成。
JobParameters源码如下:
public class JobParameters implements Serializable {
private final Map<String, JobParameter<?>> parameters;
}
可知,JobParameters是JobParameter的Map集合。而JobParameter是一个泛型类:
public class JobParameter<T> implements Serializable {
private final T value;
private final Class<T> type;
private final boolean identifying;
}
JobExecution
public class JobExecution extends Entity {
private final JobParameters jobParameters;
private JobInstance jobInstance;
private volatile Collection<StepExecution> stepExecutions;
private volatile BatchStatus status;
private volatile LocalDateTime startTime;
private volatile LocalDateTime createTime;
private volatile LocalDateTime endTime;
private volatile LocalDateTime lastUpdated;
private volatile ExitStatus exitStatus;
private volatile ExecutionContext executionContext;
private transient volatile List<Throwable> failureExceptions;
}
JobExecution当中提供getStatus方法用于获取一个Job某一次特地执行的一个状态。BatchStatus代表Job枚举状态:
public enum BatchStatus {
COMPLETED,
STARTING,
STARTED,
STOPPING,
STOPPED,
FAILED,
ABANDONED,
UNKNOWN;
}
这些属性对于一个Job的执行来说是非常关键的信息,且Spring Batch会将其持久到数据库当中,用于存储JobExecution的表为batch_job_execution
。
Step
每一个Step对象都封装批处理作业的一个独立阶段。事实上每一个Job本质上都是由一个或多个步骤组成。每一个Step包含定义和控制实际批处理所需的所有信息。任何特定的内容都由编写Job的开发人员自行决定。
一个Step可非常简单也可非常复杂。如,一个Step的功能是将文件中的数据加载到数据库中,基于现在Spring Batch的支持则几乎不需要写代码。更复杂的Step可能具有复杂的业务逻辑,这些逻辑作为处理的一部分。
与Job一样,Step具有与JobExecution类似的StepExecution。
StepExecution
StepExecution表示一次执行Step,每次运行一个Step时都会创建一个新的StepExecution,类似于JobExecution。但是,某个步骤可能由于其之前的步骤失败而无法执行。且仅当Step实际启动时才会创建StepExecution。
一次Step执行的实例由StepExecution类的对象表示。每个StepExecution都包含对其相应步骤的引用以及JobExecution和事务相关的数据,例如提交和回滚计数以及开始和结束时间。
此外,每个步骤执行都包含一个ExecutionContext,其中包含开发人员需要在批处理运行中保留的任何数据,例如重新启动所需的统计信息或状态信息。下面是一个从数据库当中截图的实例:
ExecutionContext
ExecutionContext即每一个StepExecution的执行环境。它包含一系列的键值对。获取ExecutionContext:
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
JobRepository
JobRepository用于存储任务执行的状态信息,比如什么时间点执行什么任务、任务执行结果如何等。提供2种实现,一种是通过Map形式保存在内存中,程序重启后任务信息丢失,在分布式下无法获取其他节点的任务执行情况;另一种是保存在数据库中,将数据保存在下面6张表里:
- BATCH_JOB_INSTANCE
- BATCH_JOB_EXECUTION_PARAMS
- BATCH_JOB_EXECUTION
- BATCH_STEP_EXECUTION
- BATCH_JOB_EXECUTION_CONTEXT
- BATCH_STEP_EXECUTION_CONTEXT
JobRepository是一个用于将上述Job,Step等概念进行持久化的一个类。它同时给Job和Step以及下文会提到的JobLauncher实现提供CRUD操作。首次启动Job时,将从repository中获取JobExecution,并且在执行批处理的过程中,StepExecution和JobExecution将被存储到repository当中。@EnableBatchProcessing注解可以为JobRepository提供自动配置。
支持主流数据库:DB2、Derby、H2、HSQLDB、MySQL、Oracle、PostgreSQL、SQLServer、Sybase。
源码:
public interface JobRepository {
default List<String> getJobNames() {
return Collections.emptyList();
}
default List<JobInstance> findJobInstancesByName(String jobName, int start, int count) {
return Collections.emptyList();
}
default List<JobExecution> findJobExecutions(JobInstance jobInstance) {
return Collections.emptyList();
}
boolean isJobInstanceExists(String jobName, JobParameters jobParameters);
JobInstance createJobInstance(String jobName, JobParameters jobParameters);
JobExecution createJobExecution(String jobName, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException;
void update(JobExecution jobExecution);
void add(StepExecution stepExecution);
void addAll(Collection<StepExecution> stepExecutions);
void update(StepExecution stepExecution);
void updateExecutionContext(StepExecution stepExecution);
void updateExecutionContext(JobExecution jobExecution);
@Nullable
default JobInstance getJobInstance(String jobName, JobParameters jobParameters) {
throw new UnsupportedOperationException();
}
@Nullable
StepExecution getLastStepExecution(JobInstance jobInstance, String stepName);
long getStepExecutionCount(JobInstance jobInstance, String stepName);
@Nullable
JobExecution getLastJobExecution(String jobName, JobParameters jobParameters);
default void deleteStepExecution(StepExecution stepExecution) {
throw new UnsupportedOperationException();
}
default void deleteJobExecution(JobExecution jobExecution) {
throw new UnsupportedOperationException();
}
default void deleteJobInstance(JobInstance jobInstance) {
throw new UnsupportedOperationException();
}
}
JobLauncher
用于启动指定JobParameters的Job:
@FunctionalInterface
public interface JobLauncher {
JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
run方法根据传入的Job以及jobparamaters从JobRepository获取一个JobExecution并执行Job。
Line、Item、Field
三个很重要的概念:
- Line:表示行,Spring Batch提供LineAggregator、LineMapper、LineTokenizer、LineCallbackHandler等接口;
- Item:表示记录,逻辑概念,可包含多行,Spring Batch提供ItemReader、ItemWriter、ItemProcessor等接口;
- Field:表示字段,Spring Batch提供FieldExtractor、FieldSet、FieldSetFactory等接口;
特性 | Line | Item | Field |
---|---|---|---|
定义层次 | 数据源中未解析的基础单元 | 逻辑层级,业务数据的领域对象 | 数据(字段/属性) |
关系 | Line是Item的输入 | 由Field组成一个Item | Item组成部分 |
ItemReader
public interface ItemReader<T> {
@Nullable
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
ItemReader是一个读数据的接口,为每一个Step提供数据输入。当ItemReader读完所有数据时,它会返回null来告诉后续操作数据已经读完。
Spring Batch提供很多实现类,支持各种读取数据源,包括各种类型数据库,文件,数据流等,几乎涵盖所有场景:
- FlatFileItemReader
- JdbcPagingItemReader
- JdbcCursorItemReader
FlatFileItemReader
实战例子即为FlatFileItemReader,用于读取表格文件,文本文件,有如下属性:
- setResource 指定文件资源的位置:通过ClassPathResource(类所在路径)或者FileSystemResource(文件系统所在路径)来指定要读取的文件
- setLineMapper 行映射:指定行与实体对象之间的映射关系,示例代码使用DefaultLineMapper
- seEncoding 读取编码格式,默认为‘iso-8859-1’
- setStrict 严格模式,输入文件不存在会抛出异常,阻断当前Job,默认为true
JdbcPagingItemReader
例子:
@Bean
public JdbcPagingItemReader itemReader(DataSource source, PagingQueryProvider provider) {
Map<String, Object> params = new HashMap<>();
params.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader").dataSource(source).queryProvider(provider)
.parameterValues(params).rowMapper(customerCreditMapper()).pageSize(1000).build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
JdbcPagingItemReader必须指定一个PagingQueryProvider,负责提供SQL查询语句来按分页返回数据。
JdbcCursorItemReader
例子:
private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName, String tenant) {
JdbcCursorItemReader<Map<String, Object>> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
reader.setSql("sql here");
reader.setRowMapper(new RowMapper());
return reader;
}
ItemWriter
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}
写数据的抽象,为每一个Step提供数据写出的功能。单位是可以配置的,可以一次写一条、一个chunk的数据,ItemWriter对于读入的数据是不能做任何操作的。
提供多种ItemWriter接口的实现类,包括对文本文件、XML文件、数据库、JMS消息等写的处理,支持自定义writer功能。
Spring Batch提供很多实现类,支持各种读取数据源,包括各种类型数据库,文件,数据流等,几乎涵盖所有场景:
- FlatFileItemWriter
- StaxEventItemWriter
FlatFileItemWriter
StaxEventItemWriter
用于写到XML文件,需引入如下依赖,用法和FlatFileItemWriter类似:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
</dependency>
默认编码为utf-8
,其他属性:
- setRootTagName:设置根节点标签名称
- setMarshaller:指定对象与节点映射关系
ItemProcessor
处理操作,其接口:
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
ItemProcessor主要负责数据的转换与处理,process方法的形参传入I类型的对象,通过处理后返回O型的对象。支持自定义业务代码来对数据进行处理。
ItemProcessor对项目的业务逻辑处理的一个抽象, 当ItemReader读取到一条记录之后,ItemWriter还未写入这条记录之前,可以借助temProcessor提供一个处理业务逻辑的功能,并对数据进行相应操作。如果在ItemProcessor发现一条数据不应该被写入,可以通过返回null来表示。ItemProcessor和ItemReader以及ItemWriter可以非常好的结合在一起工作,他们之间的数据传输也非常方便,直接使用即可。
进阶
注解
Spring Batch提供的注解:
- EnableBatchProcessing
- JobScope
- StepScope
- AfterChunk
- AfterChunkError
- AfterJob
- AfterProcess
- AfterRead
- AfterStep
- AfterWrite
- BeforeChunk
- BeforeJob
- BeforeProcess
- BeforeRead
- BeforeStep
- BeforeWrite
- OnProcessError
- OnReadError
- OnSkipInProcess
- OnSkipInRead
- OnSkipInWrite
- OnWriteError
异常
包括:
- UnexpectedJobExecutionException
- FatalStepExecutionException
- StartLimitExceededException
- NonTransientResourceException
- ItemWriterException
- ItemReaderException
- ItemStreamException
- ReaderNotOpenException
- UnexpectedInputException
- WriterNotOpenException
- WriteFailedException
- RepeatException
- FlushFailedException
- JobExecutionAlreadyRunningException
- JobRestartException
- JobInstanceAlreadyCompleteException
- JobParametersInvalidException
- UnsupportedOperationException
- 还有很多
每种异常的出错场景,问题根源,以及解决方法,有待进一步学习。
问题
如何默认不启动Job?
在使用Java config使用Spring Batch的Job时,如果不做任何配置,项目在启动时就会默认去跑定义好的批处理Job。如何让项目在启动时不自动去跑Job呢?在application.properties
中添加如下属性:spring.batch.job.enabled=false
在读数据时内存不够?
在使用Spring Batch做数据迁移时,发现在Job启动后,执行到一定时间点时就卡在一个地方不动,且log也不再打印,等待一段时间之后,得到如下错误:
红字的信息为:Resource exhaustion event:the JVM was unable to allocate memory from the heap.
项目发出一个资源耗尽的事件,告诉JVM无法再为堆分配内存?
造成这个错误的原因是:batch Job的reader是一次性拿回数据库里的所有数据,并没有进行分页,当这个数据量太大时,就会导致内存不够用。解决办法有两个:
- 调整reader读数据逻辑,按分页读取,但实现上会麻烦一些,且运行效率会下降
- 增大service内存
原则
构建批处理任务时,应考虑以下关键原则和注意事项:
- 批处理体系结构通常会影响体系结构
- 尽可能简化并避免在单批应用程序中构建复杂的逻辑结构
- 保持数据的处理和存储在物理上靠得很近(换句话说,将数据保存在处理过程中)
- 最大限度地减少系统资源的使用,尤其是IO,在internal memory中执行尽可能多的操作
- 分析SQL语句,查看应用程序IO,以确保避免不必要的物理IO,特别是,需要寻找以下四个常见缺陷:
- 当数据可以被读取一次并缓存或保存在工作存储中时,读取每个事务的数据
- 重新读取先前在同一事务中读取数据的事务的数据
- 导致不必要的表或索引扫描
- 未在SQL语句的WHERE子句中指定键值
- 在批处理运行中不要做两次一样的事情。例如,如果需要数据汇总以用于报告目的,则应该(如果可能)在最初处理数据时递增存储的总计,因此您的报告应用程序不必重新处理相同的数据
- 在批处理应用程序开始时分配足够的内存,以避免在此过程中进行耗时的重新分配
- 总是假设数据完整性最差。插入适当的检查和记录验证以维护数据完整性
- 尽可能实施校验和以进行内部验证。例如,对于一个文件里的数据应该有一个数据条数纪录,告诉文件中的记录总数以及关键字段的汇总
- 在具有真实数据量的类似生产环境中尽早计划和执行压力测试
- 在大批量系统中,数据备份可能具有挑战性,特别是如果系统以24-7在线的情况运行。数据库备份通常在在线设计中得到很好的处理,但文件备份应该被视为同样重要。如果系统依赖于文件,则文件备份过程不仅应该到位并记录在案,还应定期进行测试。
参考
- 批处理框架Spring Batch