文章目录
- 0.前言
- 1.参考文档
- 2.基础介绍
- 2.1. 核心组件
- 3.步骤
- 3.1. 引入依赖
- 3.2. 配置文件
- 3.3. 核心源码
- 4.示例项目
- 5.总结
0.前言
背景: 一直零散的使用着Spring Boot 的各种组件和特性,从未系统性的学习和总结,本次借着这个机会搞一波。共同学习,一起进步。哈哈
Spring Boot Starter Batch 是基于 Spring Batch 构建的批处理应用程序的开发套件,它为开发者提供了一种简化和便捷的方式来处理大规模数据处理任务。批处理应用程序通常涉及大量数据的读取、处理和写入,而 Spring Batch 提供了强大的功能和组件来管理和执行这些任务。
本文将带你深入探索 Spring Boot Starter Batch 的核心源码和机制, 将介绍作业配置类的创建和使用,以定义作业结构和配置步骤、读取器、处理器、写入器等组件。
你还将了解到作业监听器和步骤监听器的用法,以及如何处理错误和异常情况。通过深入研究 Spring Batch 的核心组件,你将对作业执行器、作业存储库和事务管理器有更深入的了解。
无论你是菜鸟还是大牛,都可以看看指点一番。让我们一起开始这个令人兴奋的探索之旅吧!
1.参考文档
-
《SpringBoot集成Spring Batch批处理框架入门案例实战》https://blog.51cto.com/u_15891990/5908727
以下是一些参考文档,可以帮助你更深入地了解 Spring Boot Starter Batch 的使用和内部机制: -
Spring Boot 官方文档提供了关于 Spring Boot Starter Batch 的详细介绍和使用方法。 可以了解如何使用 Spring Boot Starter Batch 来简化批处理应用程序的开发和部署,并深入了解自动配置和属性的使用。
官方文档链接:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#howto-batch-applications
2.基础介绍
Spring Boot Starter for Batch(spring-boot-starter-batch
)是一个用于在 Spring Boot 项目中实现批处理任务的启动器。它是基于 Spring Batch 框架构建的,通过自动配置功能简化了批处理作业的配置和依赖项管理。
Spring Batch 是一个功能强大的开源批处理框架,旨在处理大规模数据处理和批量任务。它提供了读取、处理和写入大量数据的机制,并支持事务管理、错误处理、跳过策略等功能。
使用 Spring Boot Starter for Batch 可以带来以下好处:
- 简化配置:Spring Boot Starter for Batch 利用 Spring Boot 的自动配置机制,减少了繁琐的配置步骤,使批处理任务的配置更加简单和直观。
- 依赖管理:启动器自动管理 Spring Batch 和其他相关依赖项的版本兼容性,避免了手动解决依赖冲突的问题。
- 快速启动:通过使用启动器,你可以快速搭建和启动一个批处理任务,无需手动配置各种依赖项和组件。
- 集成监控:Spring Boot Starter for Batch 与 Spring Boot Actuator 集成,提供了监控和管理批处理作业的端点,方便对作业进行跟踪和管理。
要使用 Spring Boot Starter for Batch,你只需在项目的构建文件中添加相应的依赖项,然后配置作业和步骤的读取器、处理器和写入器组件。启动器将自动加载所需的配置和组件,并提供简洁的编程模型来实现批处理任务。
2.1. 核心组件
当构建批处理作业时,Spring Batch 提供了以下核心组件,每个组件都有特定的功能和责任,通过配置和定义它们的行为,可以实现具体的批处理逻辑:
-
Job(作业):Job 是批处理的最高级别组件,表示一个完整的批处理作业。它由一个或多个步骤(Step)组成。Job 可以包含作业参数、监听器、错误处理策略等。
-
Step(步骤):Step 是批处理作业的基本处理单元,表示一个独立的处理步骤。每个步骤可以包含一个 ItemReader、一个 ItemProcessor 和一个 ItemWriter。步骤可以定义事务管理、错误处理、跳过策略等。
-
ItemReader(读取器):ItemReader 用于读取输入数据。它提供了不同的实现方式,如从文件、数据库、消息队列等读取数据。ItemReader 从源数据中逐条读取数据,并将其传递给 ItemProcessor 进行处理。
-
ItemProcessor(处理器):ItemProcessor 用于处理输入数据。它接收从 ItemReader 读取的数据,并进行自定义的业务逻辑处理,如数据转换、过滤、验证等。ItemProcessor 可以对输入数据进行任意的处理操作,并返回处理后的数据。
-
ItemWriter(写入器):ItemWriter 用于写入处理后的数据。它将 ItemProcessor 处理后的数据写入目标位置,如文件、数据库等。ItemWriter 提供了不同的实现方式,以适应不同的输出场景。
这些组件通过配置和组合,形成了一个完整的批处理作业。可以使用 Spring Batch 提供的注解和构建器来定义这些组件,并将它们组装成一个作业流程。例如,可以创建一个 Job,它包含一个或多个 Step,每个 Step 包含一个 ItemReader、一个 ItemProcessor 和一个 ItemWriter。通过定义这些组件的行为和属性,可以实现从输入数据读取、处理、写入输出数据的批处理逻辑。
此外,Spring Batch 还提供了其他辅助组件,如监听器(Listeners)、错误处理(Error Handling)、事务管理(Transaction Management)等,用于监控和管理批处理作业的执行过程和异常情况。这些组件可以通过配置和扩展来实现特定的需求和处理逻辑。
3.步骤
3.1. 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
3.2. 配置文件
可以通过配置文件来自定义和配置批处理作业的行为。常用的配置属性
# 指定要运行的作业的名称
spring.batch.job.names=job1,job2
# 是否启用 Spring Batch 的作业
spring.batch.job.enabled=true
# 是否在启动时初始化 Spring Batch 的数据库模式
spring.batch.initialize-schema=false
# 指定 Spring Batch 的数据库模式(Schema)名称
spring.batch.schema=classpath:org/springframework/batch/core/schema-*.sql
# 指定 Spring Batch 数据库表的前缀
spring.batch.table-prefix=BATCH_
# 配置数据源
spring.datasource.url=jdbc:mysql://localhost:3306/batchdb
spring.datasource.username=root
spring.datasource.password=secret
# 配置数据源连接池
spring.datasource.hikari.maximum-pool-size=10
# 配置事务管理器
spring.batch.transaction.manager-bean-name=myTransactionManager
# 配置作业监听器
spring.batch.job.job-listeners=myJobListener
# 配置步骤监听器
spring.batch.step.step-listeners=myStepListener
# 配置错误处理策略
spring.batch.job.error-handlers=skipExceptionHandler
# 配置作业参数
spring.batch.job.parameters.param1=value1
spring.batch.job.parameters.param2=value2
3.3. 核心源码
Spring Boot Starter Batch 是基于 Spring Batch 构建的批处理应用程序的开发套件,它简化了批处理作业的配置和部署。通过自动配置和注解扫描,Spring Boot Starter Batch 提供了简化和便捷的方式来开发和配置批处理应用程序。它集成了 Spring Batch 的核心功能,并提供了默认的配置,使得开发者可以更专注于业务逻辑的实现。
-
自动配置:Spring Boot Starter Batch 提供了自动配置类
BatchAutoConfiguration
,它在应用程序启动时自动配置 Spring Batch 相关的组件和属性。它会根据配置文件中的属性来初始化和配置批处理作业的运行环境。 -
注解扫描:自动配置类使用
@EnableBatchProcessing
注解来启用批处理功能,并自动扫描作业和步骤的配置类。 -
JobRepository:自动配置类会创建一个
JobRepository
实例,用于管理作业的元数据和状态。它使用 Spring Batch 提供的默认实现JobRepositoryFactoryBean
,并根据配置文件中的属性进行配置。 -
事务管理:自动配置类会创建一个
PlatformTransactionManager
实例,用于管理批处理作业的事务。它使用 Spring Batch 提供的默认实现DataSourceTransactionManager
,并使用配置文件中的数据源进行配置。 -
作业执行器:自动配置类会创建一个
JobLauncher
实例,用于启动和执行批处理作业。它使用 Spring Batch 提供的默认实现SimpleJobLauncher
,并使用配置文件中的作业存储库和事务管理器进行配置。
在Spring Boot 3.x
版本中SimpleJobLauncher
已经废弃。
需要使用TaskExecutorJobLauncher.
-
作业监听器:自动配置类会自动注册配置类中定义的作业监听器(
JobListener
),以便在作业执行的不同阶段触发相应的事件。 -
步骤监听器:自动配置类会自动注册配置类中定义的步骤监听器(
StepListener
),以便在步骤执行的不同阶段触发相应的事件。 -
作业配置类:开发者可以创建作业配置类来定义批处理作业的结构和行为。作业配置类使用
@Configuration
注解标记,并使用@EnableBatchProcessing
注解启用批处理功能。在作业配置类中,可以定义一个或多个批处理作业,并配置它们的步骤、读取器、处理器、写入器等组件。 -
作业执行:通过调用
JobLauncher
的run()
方法,并传入作业名称和作业参数,可以启动和执行批处理作业。JobLauncher
会根据作业配置类中的定义,按照步骤的顺序执行作业的各个阶段,并将数据从读取器传递给处理器和写入器。
4.示例项目
从一个包含学生成绩的 CSV 文件中读取数据,并根据一定的条件进行处理和分析。我们简化一下,实际业务肯定比较复杂,我们为了演示,简化一个版本方便大家理解。实现一个批处理任务,从
students.csv 文件中读取学生数据,将分数乘以 10 进行处理,并将处理后的学生数据写入到日志中。
- 创建一个名为
Student
的类
@Data
public class Student {
private String name;
private int score;
}
StudentItemReader
的类,实现 Spring Batch 的ItemReader
接口,用于从 CSV 文件中读取学生数据。
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;
public class StudentItemReader implements ItemReader<Student> {
private FlatFileItemReader<Student> reader;
public StudentItemReader() {
reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("students.csv"));
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("name", "score");
BeanWrapperFieldSetMapper<Student> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
fieldSetMapper.setTargetType(Student.class);
DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSetMapper);
reader.setLineMapper(lineMapper);
}
@Override
public Student read() throws Exception {
return reader.read();
}
}
- 创建一个名为
StudentItemProcessor
的类,实现 Spring Batch 的ItemProcessor
接口,用于处理学生数据。
import org.springframework.batch.item.ItemProcessor;
public class StudentItemProcessor implements ItemProcessor<Student, Student> {
@Override
public Student process(Student student) throws Exception {
// 在这里进行学生数据的处理和分析
// 这里只是简单地将分数乘以 10
student.setScore(student.getScore() * 10);
return student;
}
}
- 创建一个名为
StudentItemWriter
的类,实现 Spring Batch 的ItemWriter
接口,用于将处理后的学生数据写入到日志中。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;
public class StudentItemWriter implements ItemWriter<Student> {
private static final Logger LOGGER = LoggerFactory.getLogger(StudentItemWriter.class);
@Override
public void write(List<? extends Student> items) throws Exception {
for (Student student : items) {
LOGGER.info("Processed student: {}", student);
}
}
}
- 创建一个名为
BatchConfig
的配置类,用于配置批处理作业的相关组件。
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
// 创建 ItemReader:读取输入数据
@Bean
public ItemReader<Student> itemReader() {
return new StudentItemReader();
}
// 创建 ItemProcessor:处理输入数据
@Bean
public ItemProcessor<Student, Student> itemProcessor() {
return new StudentItemProcessor();
}
// 创建 ItemWriter:写入输出数据
@Bean
public ItemWriter<Student> itemWriter() {
return new StudentItemWriter();
}
// 创建 Step:定义处理步骤
@Bean
public Step step(ItemReader<Student> reader, ItemProcessor<Student, Student> processor,
ItemWriter<Student> writer) {
return stepBuilderFactory.get("step")
.<Student, Student>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
// 创建 Job:定义作业
@Bean
public Job job(Step step) {
return jobBuilderFactory.get("job")
.start(step)
.build();
}
}
- 创建一个名为
Application
的启动类,用于启动 Spring Boot 应用和运行批处理作业。
import org.springframework.batch.core.Job;
import org.springframework.batch.core```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
// 获取 job bean
Job job = context.getBean(Job.class);
try {
// 启动批处理作业
context.getBean(JobLauncher.class).run(job, new JobParameters());
} catch (Exception e) {
e.printStackTrace();
} finally {
context.close();
}
}
}
- 创建一个名为
students.csv
的 CSV 文件,包含学生的姓名和分数数据,例如:
姓名 | 分数 |
---|---|
张三 | 80 |
小明 | 75 |
小花 | 90 |
大卫 | 85 |
艾玛 | 92 |
弗兰克 | 78 |
格蕾丝 | 88 |
海伦 | 91 |
伊万 | 83 |
爱迪生 | 87 |
5.总结
使用 Spring Boot Starter Batch,开发者可以更专注于业务逻辑的实现,而不必过多关注底层的配置和管理细节。它提供了一种高效、可靠的方式来处理大规模数据处理任务,为批处理应用程序的开发带来了便利和灵活性。
Spring Boot Starter Batch 是基于 Spring Batch 的开发套件,为批处理应用程序提供了便捷的开发和配置方式。通过自动配置和注解扫描,它简化了批处理作业的搭建和部署过程。
在 Spring Boot Starter Batch 的核心源码中,关键的组件包括自动配置类、作业配置类、JobRepository、事务管理器、作业执行器等。自动配置类负责初始化和配置这些组件,并根据配置文件中的属性进行设置。
开发者可以创建作业配置类,通过定义作业结构和配置步骤、读取器、处理器、写入器等组件来定制批处理作业的行为。作业监听器和步骤监听器可以用来处理作业执行过程中的事件和异常情况。
大家好,我是冰点,今天的Spring Boot Starter Batch 全部内容就是这些。如果你有疑问或见解可以在评论区留言。