基本概念
Spring Batch是批处理框架。
作业(Job)是状态以及状态之间转换的集合。
作业里包含步骤(Spring Bean),每一个步骤解耦到独立的处理器中,并负责自己的数据,把所需的业务逻辑应用到数据上,然后把数据写入合适的位置。这样提供了灵活性、可维护性、伸缩性、可靠性。
又两类主要步骤:
步骤分类 | 简介 | 常用于 |
---|---|---|
基于Tasklet的步骤 | 相对简单,使用Tasklet实现,在事务范围内重复执行execute方法,直到execute方法让步骤停止 | 初始化、运行存储过程、发送通知等 |
基于块Chunk的步骤 | 更严格,最多又三个主要部分:ItemRead、ItemProcessor、ItemWriter(条目读写处理器) | 适于基于条目的处理(item-based procssing) |
批处理作业的接口
接口 | 说明 |
---|---|
org.springframework.batch.core.job | 作业,ApplicationContext里配置 |
org.springframework.batch.core.step | 配置的步骤 |
org.springframework.batch.core.step.tasklet | 策略接口,提供了在一次事务范围内执行逻辑的能力 |
org.springframework.batch.item.ItemReader<T> | 策略接口,提供步骤的输入 |
org.springframework.batch.item.ItemProcessor<I,O> | 业务逻辑、验证等应用于单个条目 |
org.springframework.batch.item.ItemWriter<T> | 策略接口,提供步骤的持久化 |
JobRepository是架构主要共享部分,负责维护作业的状态、各种处理指标,如开始时间、结束时间、状态、读写次数等,通常由RDB数据库支持。
JobLauncher组件负责作业的执行,调用execute方法,验证重启作业是否有效,决定如何执行作业(当前线程、线程池),验证参数等,与具体实现无关,开箱即用。
JobInstance是Spring Batch作业的一次逻辑执行,可由作业名称、逻辑执行提供的唯一参数集来标识。每次执行,不一定有JobInstance,除非执行标识变了。
JobExecution代表作业的一次物理执行,每次执行都会有新的JobExecution。
StepExecution是步骤的一次物理执行,一个JobExecution可以有多个StepExecution。
作业中的组件及其关系:
并行化
并行化有5种方式
方式 | 说明 |
---|---|
多线程的步骤 | 作业被配置为以块来处理工作,每个块被封装在自己的事务中。 |
并行地执行步骤 | 两个步骤之间没有任何关系 |
异步的ItemProcesser/ItemWriter | AsychonousItemProcessor作为修饰器,返回Future,最后Future列表传给AsynchonousItemWriter |
远程分块 | 跨JVM处理,主节点与远程工作者保持通信,工作者处理条目的自主写入还是发给主节点写入。JobRepository对分布式状态一无所知。 |
远程分取 | 不需要持久通信,每个工作者自包含,使用相同配置。 |
运行示例
先用Spring Initializr生成代码框架:
修改HelloWorldApplication.java
文件:
package com.xiaolong.helloworld;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@EnableBatchProcessing // 启动批处理基础设施
@SpringBootApplication // 启动自动配置,结合了@ComponentScan和@EnableAutoConfiguration的元注解
public class HelloWorldApplication {
@Autowired // 作业构建器
private JobBuilderFactory jobBuilderFactory;
@Autowired // 步骤构建器
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step step() {
return this.stepBuilderFactory.get("step1")
.tasklet(new Tasklet() { // 内联方式实现tasklet
@Override
public RepeatStatus execute(
StepContribution contribution,
ChunkContext chunkContext) throws Exception {
System.out.println("Hello, World!");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step())
.build();
}
public static void main(String[] args) {
SpringApplication.run(HelloWorldApplication.class, args);
}
}
代码主要分三部分:@EnableBatchProcessing
注解、JobBuilderFactory
注入、StepBuilderFactory
注入。
@EnableBatchProcessing
注解用来启动批处理的基础设施,大部分基础设施由Spring Bean
定义,不必再提供,其中包括:
JobRepository
:记录作业运行时状态。
JobLauncher
:启动作业。
JobExplorer
:使用JobRegistry
执行制度操作。
JobRegistry
:在使用特定的启动器实现时找到作业
PlatformTransactionManager
:在工作过程中处理事务。
JobBuilderFactory
:作业构建器。
StepBuilderFactory
:步骤构建器。
运行结果:
参考
https://docs.spring.io/spring-batch/docs/current/api/