Spring Boot 知识集锦之Spring-Batch批处理组件详解

news2024/9/25 7:19:43

文章目录

  • 0.前言
  • 1.参考文档
  • 2.基础介绍
    • 2.1. 核心组件
  • 3.步骤
    • 3.1. 引入依赖
    • 3.2. 配置文件
    • 3.3. 核心源码
  • 4.示例项目
  • 5.总结

在这里插入图片描述

0.前言

背景: 一直零散的使用着Spring Boot 的各种组件和特性,从未系统性的学习和总结,本次借着这个机会搞一波。共同学习,一起进步。哈哈

在这里插入图片描述

Spring Boot Starter Batch 是基于 Spring Batch 构建的批处理应用程序的开发套件,它为开发者提供了一种简化和便捷的方式来处理大规模数据处理任务。批处理应用程序通常涉及大量数据的读取、处理和写入,而 Spring Batch 提供了强大的功能和组件来管理和执行这些任务。

本文将带你深入探索 Spring Boot Starter Batch 的核心源码和机制, 将介绍作业配置类的创建和使用,以定义作业结构和配置步骤、读取器、处理器、写入器等组件。

你还将了解到作业监听器和步骤监听器的用法,以及如何处理错误和异常情况。通过深入研究 Spring Batch 的核心组件,你将对作业执行器、作业存储库和事务管理器有更深入的了解。

无论你是菜鸟还是大牛,都可以看看指点一番。让我们一起开始这个令人兴奋的探索之旅吧!

1.参考文档

  1. 《SpringBoot集成Spring Batch批处理框架入门案例实战》https://blog.51cto.com/u_15891990/5908727
    以下是一些参考文档,可以帮助你更深入地了解 Spring Boot Starter Batch 的使用和内部机制:

  2. Spring Boot 官方文档提供了关于 Spring Boot Starter Batch 的详细介绍和使用方法。 可以了解如何使用 Spring Boot Starter Batch 来简化批处理应用程序的开发和部署,并深入了解自动配置和属性的使用。

    官方文档链接:https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#howto-batch-applications

2.基础介绍

Spring Boot Starter for Batch(spring-boot-starter-batch)是一个用于在 Spring Boot 项目中实现批处理任务的启动器。它是基于 Spring Batch 框架构建的,通过自动配置功能简化了批处理作业的配置和依赖项管理。

Spring Batch 是一个功能强大的开源批处理框架,旨在处理大规模数据处理和批量任务。它提供了读取、处理和写入大量数据的机制,并支持事务管理、错误处理、跳过策略等功能。

使用 Spring Boot Starter for Batch 可以带来以下好处:

  1. 简化配置:Spring Boot Starter for Batch 利用 Spring Boot 的自动配置机制,减少了繁琐的配置步骤,使批处理任务的配置更加简单和直观。
  2. 依赖管理:启动器自动管理 Spring Batch 和其他相关依赖项的版本兼容性,避免了手动解决依赖冲突的问题。
  3. 快速启动:通过使用启动器,你可以快速搭建和启动一个批处理任务,无需手动配置各种依赖项和组件。
  4. 集成监控:Spring Boot Starter for Batch 与 Spring Boot Actuator 集成,提供了监控和管理批处理作业的端点,方便对作业进行跟踪和管理。

要使用 Spring Boot Starter for Batch,你只需在项目的构建文件中添加相应的依赖项,然后配置作业和步骤的读取器、处理器和写入器组件。启动器将自动加载所需的配置和组件,并提供简洁的编程模型来实现批处理任务。

2.1. 核心组件

当构建批处理作业时,Spring Batch 提供了以下核心组件,每个组件都有特定的功能和责任,通过配置和定义它们的行为,可以实现具体的批处理逻辑:
在这里插入图片描述

  1. Job(作业):Job 是批处理的最高级别组件,表示一个完整的批处理作业。它由一个或多个步骤(Step)组成。Job 可以包含作业参数、监听器、错误处理策略等。

  2. Step(步骤):Step 是批处理作业的基本处理单元,表示一个独立的处理步骤。每个步骤可以包含一个 ItemReader、一个 ItemProcessor 和一个 ItemWriter。步骤可以定义事务管理、错误处理、跳过策略等。

  3. ItemReader(读取器):ItemReader 用于读取输入数据。它提供了不同的实现方式,如从文件、数据库、消息队列等读取数据。ItemReader 从源数据中逐条读取数据,并将其传递给 ItemProcessor 进行处理。

  4. ItemProcessor(处理器):ItemProcessor 用于处理输入数据。它接收从 ItemReader 读取的数据,并进行自定义的业务逻辑处理,如数据转换、过滤、验证等。ItemProcessor 可以对输入数据进行任意的处理操作,并返回处理后的数据。

  5. ItemWriter(写入器):ItemWriter 用于写入处理后的数据。它将 ItemProcessor 处理后的数据写入目标位置,如文件、数据库等。ItemWriter 提供了不同的实现方式,以适应不同的输出场景。

这些组件通过配置和组合,形成了一个完整的批处理作业。可以使用 Spring Batch 提供的注解和构建器来定义这些组件,并将它们组装成一个作业流程。例如,可以创建一个 Job,它包含一个或多个 Step,每个 Step 包含一个 ItemReader、一个 ItemProcessor 和一个 ItemWriter。通过定义这些组件的行为和属性,可以实现从输入数据读取、处理、写入输出数据的批处理逻辑。

此外,Spring Batch 还提供了其他辅助组件,如监听器(Listeners)、错误处理(Error Handling)、事务管理(Transaction Management)等,用于监控和管理批处理作业的执行过程和异常情况。这些组件可以通过配置和扩展来实现特定的需求和处理逻辑。

3.步骤

3.1. 引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

3.2. 配置文件

可以通过配置文件来自定义和配置批处理作业的行为。常用的配置属性

# 指定要运行的作业的名称
spring.batch.job.names=job1,job2

# 是否启用 Spring Batch 的作业
spring.batch.job.enabled=true

# 是否在启动时初始化 Spring Batch 的数据库模式
spring.batch.initialize-schema=false

# 指定 Spring Batch 的数据库模式(Schema)名称
spring.batch.schema=classpath:org/springframework/batch/core/schema-*.sql

# 指定 Spring Batch 数据库表的前缀
spring.batch.table-prefix=BATCH_

# 配置数据源
spring.datasource.url=jdbc:mysql://localhost:3306/batchdb
spring.datasource.username=root
spring.datasource.password=secret

# 配置数据源连接池
spring.datasource.hikari.maximum-pool-size=10

# 配置事务管理器
spring.batch.transaction.manager-bean-name=myTransactionManager

# 配置作业监听器
spring.batch.job.job-listeners=myJobListener

# 配置步骤监听器
spring.batch.step.step-listeners=myStepListener

# 配置错误处理策略
spring.batch.job.error-handlers=skipExceptionHandler

# 配置作业参数
spring.batch.job.parameters.param1=value1
spring.batch.job.parameters.param2=value2

3.3. 核心源码

Spring Boot Starter Batch 是基于 Spring Batch 构建的批处理应用程序的开发套件,它简化了批处理作业的配置和部署。通过自动配置和注解扫描,Spring Boot Starter Batch 提供了简化和便捷的方式来开发和配置批处理应用程序。它集成了 Spring Batch 的核心功能,并提供了默认的配置,使得开发者可以更专注于业务逻辑的实现。

  1. 自动配置:Spring Boot Starter Batch 提供了自动配置类 BatchAutoConfiguration,它在应用程序启动时自动配置 Spring Batch 相关的组件和属性。它会根据配置文件中的属性来初始化和配置批处理作业的运行环境。

  2. 注解扫描:自动配置类使用 @EnableBatchProcessing 注解来启用批处理功能,并自动扫描作业和步骤的配置类。

  3. JobRepository:自动配置类会创建一个 JobRepository 实例,用于管理作业的元数据和状态。它使用 Spring Batch 提供的默认实现 JobRepositoryFactoryBean,并根据配置文件中的属性进行配置。

  4. 事务管理:自动配置类会创建一个 PlatformTransactionManager 实例,用于管理批处理作业的事务。它使用 Spring Batch 提供的默认实现 DataSourceTransactionManager,并使用配置文件中的数据源进行配置。

  5. 作业执行器:自动配置类会创建一个 JobLauncher 实例,用于启动和执行批处理作业。它使用 Spring Batch 提供的默认实现 SimpleJobLauncher,并使用配置文件中的作业存储库和事务管理器进行配置。
    Spring Boot 3.x版本中SimpleJobLauncher 已经废弃在这里插入图片描述
    需要使用 TaskExecutorJobLauncher.
    在这里插入图片描述

  6. 作业监听器:自动配置类会自动注册配置类中定义的作业监听器(JobListener),以便在作业执行的不同阶段触发相应的事件。

  7. 步骤监听器:自动配置类会自动注册配置类中定义的步骤监听器(StepListener),以便在步骤执行的不同阶段触发相应的事件。

  8. 作业配置类:开发者可以创建作业配置类来定义批处理作业的结构和行为。作业配置类使用 @Configuration 注解标记,并使用 @EnableBatchProcessing 注解启用批处理功能。在作业配置类中,可以定义一个或多个批处理作业,并配置它们的步骤、读取器、处理器、写入器等组件。

  9. 作业执行:通过调用 JobLauncherrun() 方法,并传入作业名称和作业参数,可以启动和执行批处理作业。JobLauncher 会根据作业配置类中的定义,按照步骤的顺序执行作业的各个阶段,并将数据从读取器传递给处理器和写入器。

4.示例项目

从一个包含学生成绩的 CSV 文件中读取数据,并根据一定的条件进行处理和分析。我们简化一下,实际业务肯定比较复杂,我们为了演示,简化一个版本方便大家理解。实现一个批处理任务,从 students.csv 文件中读取学生数据,将分数乘以 10 进行处理,并将处理后的学生数据写入到日志中。

  1. 创建一个名为 Student 的类
@Data
public class Student {
    private String name;
    private int score;
}
  1. StudentItemReader 的类,实现 Spring Batch 的 ItemReader 接口,用于从 CSV 文件中读取学生数据。
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;

public class StudentItemReader implements ItemReader<Student> {
    private FlatFileItemReader<Student> reader;

    public StudentItemReader() {
        reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("students.csv"));

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("name", "score");

        BeanWrapperFieldSetMapper<Student> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Student.class);

        DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);

        reader.setLineMapper(lineMapper);
    }

    @Override
    public Student read() throws Exception {
        return reader.read();
    }
}
  1. 创建一个名为 StudentItemProcessor 的类,实现 Spring Batch 的 ItemProcessor 接口,用于处理学生数据。
import org.springframework.batch.item.ItemProcessor;

public class StudentItemProcessor implements ItemProcessor<Student, Student> {

    @Override
    public Student process(Student student) throws Exception {
        // 在这里进行学生数据的处理和分析
        // 这里只是简单地将分数乘以 10
        student.setScore(student.getScore() * 10);

        return student;
    }
}
  1. 创建一个名为 StudentItemWriter 的类,实现 Spring Batch 的 ItemWriter 接口,用于将处理后的学生数据写入到日志中。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;

public class StudentItemWriter implements ItemWriter<Student> {
    private static final Logger LOGGER = LoggerFactory.getLogger(StudentItemWriter.class);

    @Override
    public void write(List<? extends Student> items) throws Exception {
        for (Student student : items) {
            LOGGER.info("Processed student: {}", student);
        }
    }
}
  1. 创建一个名为 BatchConfig 的配置类,用于配置批处理作业的相关组件。
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // 创建 ItemReader:读取输入数据
    @Bean
    public ItemReader<Student> itemReader() {
        return new StudentItemReader();
    }

    // 创建 ItemProcessor:处理输入数据
    @Bean
    public ItemProcessor<Student, Student> itemProcessor() {
        return new StudentItemProcessor();
    }

    // 创建 ItemWriter:写入输出数据
    @Bean
    public ItemWriter<Student> itemWriter() {
        return new StudentItemWriter();
    }

    // 创建 Step:定义处理步骤
    @Bean
    public Step step(ItemReader<Student> reader, ItemProcessor<Student, Student> processor,
                     ItemWriter<Student> writer) {
        return stepBuilderFactory.get("step")
                .<Student, Student>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    // 创建 Job:定义作业
    @Bean
    public Job job(Step step) {
        return jobBuilderFactory.get("job")
                .start(step)
                .build();
    }
}
  1. 创建一个名为 Application 的启动类,用于启动 Spring Boot 应用和运行批处理作业。
import org.springframework.batch.core.Job;
import org.springframework.batch.core```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);

        // 获取 job bean
        Job job = context.getBean(Job.class);

        try {
            // 启动批处理作业
            context.getBean(JobLauncher.class).run(job, new JobParameters());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            context.close();
        }
    }
}
  1. 创建一个名为 students.csv 的 CSV 文件,包含学生的姓名和分数数据,例如:
姓名分数
张三80
小明75
小花90
大卫85
艾玛92
弗兰克78
格蕾丝88
海伦91
伊万83
爱迪生87

5.总结

使用 Spring Boot Starter Batch,开发者可以更专注于业务逻辑的实现,而不必过多关注底层的配置和管理细节。它提供了一种高效、可靠的方式来处理大规模数据处理任务,为批处理应用程序的开发带来了便利和灵活性。
Spring Boot Starter Batch 是基于 Spring Batch 的开发套件,为批处理应用程序提供了便捷的开发和配置方式。通过自动配置和注解扫描,它简化了批处理作业的搭建和部署过程。

在 Spring Boot Starter Batch 的核心源码中,关键的组件包括自动配置类、作业配置类、JobRepository、事务管理器、作业执行器等。自动配置类负责初始化和配置这些组件,并根据配置文件中的属性进行设置。

开发者可以创建作业配置类,通过定义作业结构和配置步骤、读取器、处理器、写入器等组件来定制批处理作业的行为。作业监听器和步骤监听器可以用来处理作业执行过程中的事件和异常情况。

在这里插入图片描述

大家好,我是冰点,今天的Spring Boot Starter Batch 全部内容就是这些。如果你有疑问或见解可以在评论区留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/900069.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

驱动开发——字符设备

字符设备 Linux 将系统设备分为&#xff1a;字符设备、块设备、网络设备。工作原理 字符设备是 Linux 驱动中最基本的一类设备驱动&#xff0c;字符设备就是一个一个字节&#xff0c; 按照字节流进行读写操作的设备&#xff0c;读写数据是分先后顺序的。在Linux的世界里面一切…

第5章 性能分析方法

有时看到修改后程序的运行时间发生变化时&#xff0c;却不清楚具体原因是什么。单独的时间信息有时无法给出问题发生的根本原因。 程序运行时硬件和软件都可以采集性能数据&#xff0c;硬件是指运行程序的CPU&#xff0c;软件是指操作系统和所有可用于分析的工具。通常软件栈提…

设计模式篇---抽象工厂(包含优化)

文章目录 概念结构实例优化 概念 抽象工厂&#xff1a;提供一个创建一系列相关或相互依赖对象的接口&#xff0c;而无须指定它们具体的类。 工厂方法是有一个类型的产品&#xff0c;也就是只有一个产品的抽象类或接口&#xff0c;而抽象工厂相对于工厂方法来说&#xff0c;是有…

qsort函数详解

大家好&#xff0c;我是苏貝&#xff0c;本篇博客带大家了解qsort函数&#xff0c;如果你觉得我写的不错的话&#xff0c;可以给我一个赞&#x1f44d;吗&#xff0c;感谢❤️ 文章目录 一. qsort函数参数详解1.数组首元素地址base2.数组的元素个数num和元素所占内存空间大小w…

Xxl-job安装部署以及SpringBoot集成Xxl-job使用

1、安装Xxl-job&#xff1a; 可以使用docker拉取镜像部署和源码编译两种方式&#xff0c;这里选择源码编译安装。 代码拉取地址&#xff1a; https://github.com/xuxueli/xxl-job/tree/2.1.2 官方开发文档&#xff1a; https://www.xuxueli.com/xxl-job/#%E3%80%8A%E5%88%…

uni-app的Vue.js实现微信小程序的紧急事件登记页面功能

主要功能实现 完成发生时间选择功能&#xff0c;用户可以通过日期选择器选择事件发生的时间。实现事件类型选择功能&#xff0c;用户可以通过下拉选择框选择事件的类型。添加子养殖场编号输入框&#xff0c;用户可以输入与事件相关的子养殖场编号。完成事件描述输入功能&#…

C++笔记之条件变量(Condition Variable)与cv.wait 和 cv.wait_for的使用

C笔记之条件变量&#xff08;Condition Variable&#xff09;与cv.wait 和 cv.wait_for的使用 参考博客&#xff1a;C笔记之各种sleep方法总结 code review! 文章目录 C笔记之条件变量&#xff08;Condition Variable&#xff09;与cv.wait 和 cv.wait_for的使用1.条件变量&…

msf和cs联动

cs设置外部监听器 在vps上执行 vim /etc/ssh/sshd_config AllowTcpForwarding yes GatewayPorts yes TCPKeepAlive yes PasswordAuthentication yes systemctl restart sshd.service 在kali上执行&#xff0c;进行端口转发 ssh -C -f -N -g -R 0.0.0.0:1234:192.168.1.30:…

STM32开关输入控制220V灯泡亮灭源代码(附带PROTEUSd电路图)

//main.c文件 /* USER CODE BEGIN Header */ /********************************************************************************* file : main.c* brief : Main program body************************************************************************…

525. 连续数组

525. 连续数组 原题链接&#xff1a;完成情况&#xff1a;解题思路&#xff1a;参考代码&#xff1a; 原题链接&#xff1a; 525. 连续数组 https://leetcode.cn/problems/contiguous-array/description/ 完成情况&#xff1a; 解题思路&#xff1a; 参考代码&#xff1a; …

奇舞周刊第503期:图解串一串 webpack 的历史和核心功能

记得点击文章末尾的“ 阅读原文 ”查看哟~ 下面先一起看下本期周刊 摘要 吧~ 奇舞推荐 ■ ■ ■ 图解串一串 webpack 的历史和核心功能 提到打包工具&#xff0c;可能你会首先想到 webpack。那没有 webpack 之前&#xff0c;都是怎么打包的呢&#xff1f;webpack 都有哪些功能&…

喜讯?宁德时代首个零碳工厂成功建立,碳中和“任重道远”

8月19日消息&#xff0c;据宁德时代消息&#xff0c;广东瑞庆时代新能源科技有限公司宣布获得国际认可机构SGS颁发的PAS2060碳中和认证证书&#xff0c;从而正式成为一家零碳工厂。这标志着宁德时代首个储能电池为主的零碳工厂成功建立&#xff0c;也是公司继四川时代宜宾工厂之…

编程练习(3)

一.选择题 第一题&#xff1a; 函数传参的两个变量都是传的地址&#xff0c;而数组名c本身就是地址&#xff0c;int型变量b需要使用&符号&#xff0c;因此答案为A 第二题&#xff1a; 本题考察const修饰指针变量&#xff0c;答案为A,B,C,D 第三题&#xff1a; 注意int 型变…

速通蓝桥杯嵌入式省一教程:(五)用按键和屏幕实现嵌入式交互系统

一个完整的嵌入式系统&#xff0c;包括任务执行部分和人机交互部分。在前四节中&#xff0c;我们已经讲解了LED、LCD和按键&#xff0c;用这三者就能够实现一个人机交互系统&#xff0c;也即搭建整个嵌入式系统的框架。在后续&#xff0c;只要将各个功能加入到这个交互系统中&a…

赏味不足:详细来聊下轻资产运作,我从不做重资产

来源&#xff1a;BV1F84y1g7u3 好 大家好&#xff0c;是这样子的 对吧&#xff0c;因为最近聊了也很多&#xff0c;然后我在过程当中也发现&#xff0c;很多人对于不管是创业还是做副业&#xff0c;还是说找一条多种路&#xff0c;就是说可能不是那么清晰&#xff0c;然后我在这…

使用Dockerfile部署java项目

1、移动java包到创建的目录下 2、编写Dockerfile文件 在同一目录下使用如下命令创建文件 touch Dockerfile 文件内容如下&#xff1a; #依赖的父镜像 FROM java:8 #作者 MAINTAINER maxurui #jar包添加到镜像中 ADD springboot3-0.0.1-SNAPSHOT.jar springboot3-0.0.1-SNAPSHO…

射频同轴线阻抗

射频同轴线阻抗 射频同轴线的阻抗与线的绝缘介质的介电常数有关&#xff0c;与线的屏蔽层半径与内部导线半径的比值有关&#xff1a; R 0 1 2 π μ ′ ϵ ′ ln ⁡ ( r 2 r 1 ) \begin{align} R_0\frac{1}{2\pi}\sqrt{\frac{\mu^{}}{\epsilon^{}}}\ln(\frac{r_2}{r_1}) \en…

图数据库_Neo4j中文版_Centos7.9安装Neo4j社区版3.5.9_基于jdk1.8---Neo4j图数据库工作笔记0012

由于我们在国内使用啊,具体还是要用中文版滴,找了好久这个neo4j,原来还是有中文版的, https://we-yun.com/doc/neo4j-chs/ 中文版下载地址在这里: 所有版本都在这里了,需要哪个自己去下载就可以了,要注意下载以后,参考: https://we-yun.com/blog/prod-56.html 在这个位置下载…

Linux系统--进程间通信

文章目录 什么是进程间通信匿名管道命名管道 system V共享内存 system V消息队列 信号量 一、什么是进程间通信 首先由于进程运行是具有独立性的&#xff0c;具有独立的页表&#xff0c;PCB和虚拟地址空间等&#xff0c;父子进程间数据互补干扰。这就让进程间通信难度加大。由…