SpringBoot:使用Spring Batch实现批处理任务

news2025/1/12 8:52:21

引言

在这里插入图片描述

在企业级应用中,批处理任务是不可或缺的一部分。它们通常用于处理大量数据,如数据迁移、数据清洗、生成报告等。Spring Batch是Spring框架的一部分,专为批处理任务设计,提供了简化的配置和强大的功能。本文将介绍如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。

项目初始化

首先,我们需要创建一个SpringBoot项目,并添加Spring Batch相关的依赖项。可以通过Spring Initializr快速生成项目。

添加依赖

pom.xml中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>org.hsqldb</groupId>
    <artifactId>hsqldb</artifactId>
    <scope>runtime</scope>
</dependency>

配置Spring Batch

基本配置

Spring Batch需要一个数据库来存储批处理的元数据。我们可以使用HSQLDB作为内存数据库。配置文件application.properties

spring.datasource.url=jdbc:hsqldb:mem:testdb
spring.datasource.driverClassName=org.hsqldb.jdbc.JDBCDriver
spring.datasource.username=sa
spring.datasource.password=
spring.batch.initialize-schema=always
创建批处理任务

一个典型的Spring Batch任务包括三个主要部分:ItemReader、ItemProcessor和ItemWriter。

  1. ItemReader:读取数据的接口。
  2. ItemProcessor:处理数据的接口。
  3. ItemWriter:写数据的接口。
创建示例实体类

创建一个示例实体类,用于演示批处理操作:

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class Person {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String firstName;
    private String lastName;

    // getters and setters
}
创建ItemReader

我们将使用一个简单的FlatFileItemReader从CSV文件中读取数据:

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class BatchConfiguration {

    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
                .name("personItemReader")
                .resource(new ClassPathResource("sample-data.csv"))
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                .build();
    }
}
创建ItemProcessor

创建一个简单的ItemProcessor,将读取的数据进行处理:

import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    @Override
    public Person process(Person person) throws Exception {
        final String firstName = person.getFirstName().toUpperCase();
        final String lastName = person.getLastName().toUpperCase();

        final Person transformedPerson = new Person();
        transformedPerson.setFirstName(firstName);
        transformedPerson.setLastName(lastName);

        return transformedPerson;
    }
}
创建ItemWriter

我们将使用一个简单的JdbcBatchItemWriter将处理后的数据写入数据库:

import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

@Configuration
public class BatchConfiguration {

    @Bean
    public JdbcBatchItemWriter<Person> writer(NamedParameterJdbcTemplate jdbcTemplate) {
        return new JdbcBatchItemWriterBuilder<Person>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)")
                .dataSource(jdbcTemplate.getJdbcTemplate().getDataSource())
                .build();
    }
}

配置Job和Step

一个Job由多个Step组成,每个Step包含一个ItemReader、ItemProcessor和ItemWriter。

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importUserJob")
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }
}

监听Job完成事件

创建一个监听器,用于监听Job完成事件:

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        System.out.println("Job Started");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        System.out.println("Job Ended");
    }
}

测试与运行

创建一个简单的CommandLineRunner,用于启动批处理任务:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        jobLauncher.run(job, new JobParameters());
    }
}

在完成配置后,可以运行应用程序,并检查控制台输出和数据库中的数据,确保批处理任务正常运行。

扩展功能

在基本的批处理任务基础上,可以进一步扩展功能,使其更加完善和实用。例如:

  • 多步骤批处理:一个Job可以包含多个Step,每个Step可以有不同的ItemReader、ItemProcessor和ItemWriter。
  • 并行处理:通过配置多个线程或分布式处理,提升批处理任务的性能。
  • 错误处理和重试:配置错误处理和重试机制,提高批处理任务的可靠性。
  • 数据验证:在处理数据前进行数据验证,确保数据的正确性。
多步骤批处理
@Bean
public Job multiStepJob(JobCompletionNotificationListener listener, Step step1, Step step2) {
    return jobBuilderFactory.get("multiStepJob")
            .listener(listener)
            .start(step1)
            .next(step2)
            .end()
            .build();
}

@Bean
public Step step2(JdbcBatchItemWriter<Person> writer) {
    return stepBuilderFactory.get("step2")
            .<Person, Person>chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .build();
}
并行处理

可以通过配置多个线程来实现并行处理:

@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
    return stepBuilderFactory.get("step1")
            .<Person, Person>chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer)
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(10);
    return taskExecutor;
}

结论

通过本文的介绍,我们了解了如何使用Spring Batch与SpringBoot结合,构建和管理批处理任务。从项目初始化、配置Spring Batch、实现ItemReader、ItemProcessor和ItemWriter,到配置Job和Step,Spring Batch提供了一系列强大的工具和框架,帮助开发者高效地实现批处理任务。通过合理利用这些工具和框架

,开发者可以构建出高性能、可靠且易维护的批处理系统。希望这篇文章能够帮助开发者更好地理解和使用Spring Batch,在实际项目中实现批处理任务的目标。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1879414.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

cpu,缓存,辅存,主存之间的关系及特点

关系图 示意图&#xff1a; ------------------- | CPU | | ------------- | | | 寄存器 | | | ------------- | | | L1缓存 | | | ------------- | | | L2缓存 | | | ------------- | | | L3缓存 | | | ------------- | ----…

test——认识测试

目录 前言 一什么是测试 1测试场景 2为什么需要测试 3测试定义 二测试的岗位 1测开与测试 2测试与开发的区别 a工作内容 b难易程度 c其它不同 三测试人员具备的素质 1综合能力 a沟通能力 b快速学习能力 c开发能力 d文字能力 2掌握自动化测试技术 前言 互联⽹…

Python基于决策树分类模型、随机森林分类模型、KNN分类模型和GBDT分类模型完成收入预测项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 在全球化的今天&#xff0c;收入不平等已经成为各国政府和社会关注的焦点问题之一。了解居民收入状况&…

后端之路第三站(Mybatis)——XML文件操作sql

一、XML映射文件是啥 前面我们学过了在Mapper接口用注解的方式来操作sql语句 那么XML映射文件就另一种操作sql语句的方法 为什么还要有这么个玩意&#xff1f; 我简单说就是&#xff1a;如果有的sql特别复杂的话&#xff0c;比如需要【动态sql】的话&#xff0c;就得用到XM…

木各力“GERRI”被“GREE”格力无效宣告成功

近日“GERRI”被“GREE”格力无效宣告成功&#xff0c;“GERRI”和“GREE”近似不&#xff0c;如果很近似当初就不会通过初审和下商标注册证&#xff0c;但是如果涉及知名商标和驰名商标&#xff0c;人家就可以异议和无效。 “GERRI”在被无效宣告时&#xff0c;引用了6个相关的…

电子电路学习笔记(4)三极管

部分内容参考链接&#xff1a; 电子电路学习笔记&#xff08;5&#xff09;——三极管_三极管 箭头-CSDN博客 模拟电子技术基础笔记&#xff08;4&#xff09;——晶体三极管_集电结的单向导电性-CSDN博客 硬件基本功-36-三极管Ib电流如何控制Ic电流_哔哩哔哩_bilibili 部分…

算法入门(上)

什么是算法&#xff1f; 算法&#xff08;Algorithm&#xff09;是解决特定问题求解步骤的描述&#xff0c;在计算机中表现为指令的有限序列&#xff0c;并且每条指令表示一个或多个操作。 给定一个问题&#xff0c;能够解决这个问题的算法是有很多种的。算式中的问题是千奇百怪…

【SCAU操作系统】期末复习简答及计算题例题解析

目录 一、写出下列英文缩写词在计算机系统中的英文或中文全名。 二、进程状态/调度/周转问题 &#xff08;1&#xff09;进程状态 &#xff08;2&#xff09;进程状态转换 &#xff08;3&#xff09;进程调度 &#xff08;4&#xff09;最短进程优先调度算法 三、逻辑地…

扫描全能王的AI驱动创新与智能高清滤镜技术解析

目录 引言1、扫描全能王2、智能高清滤镜黑科技2.1、图像视觉矫正2.2、去干扰技术 3、实际应用案例3.1、打印文稿褶皱检测3.2、试卷擦除手写3.3、老旧文件处理3.4、收银小票3.5、从不同角度扫描文档 4、用户体验结论与未来展望 引言 在数字化时代背景下&#xff0c;文档扫描功能…

51单片机第17步_外部中断

本章重点学习外部中断。 1、外部中断0框图&#xff1a; 2、外部中断1框图&#xff1a; 3、Keil C51中有一些关键字&#xff0c;需要牢记&#xff1a; interrupt 0&#xff1a;指定当前函数为外部中断0&#xff1b; interrupt 1&#xff1a;指定当前函数为定时器0中断&#x…

【Spring Boot】认识 JPA 的接口

认识 JPA 的接口 1.JPA 接口 JpaRepository2.分页排序接口 PagingAndSortingRepository3.数据操作接口 CrudRepository4.分页接口 Pageable 和 Page5.排序类 Sort JPA 提供了操作数据库的接口。在开发过程中继承和使用这些接口&#xff0c;可简化现有的持久化开发工作。可以使 …

电路笔记(电源模块): 基于FT2232HL实现的jtag下载器硬件+jtag的通信引脚说明

JTAG接口说明 JTAG 接口根据需求可以选择20针或14针的配置&#xff0c;具体选择取决于应用场景和需要连接的功能。比如之前的可编程逻辑器件XC9572XL使用JTAG引脚&#xff08;TCK、TDI、TDO、TMS、VREF、GND&#xff09;用于与器件进行调试和编程通信。更详细的内容可以阅读11…

(超详细)数据结构——“栈”的深度解析

前言&#xff1a; 在前几章我们介绍了线性表的基本概念&#xff0c;也讲解了包括顺序表&#xff0c;单链表&#xff0c;双向链表等线性表&#xff0c;相信大家已经对线性表比较熟悉了&#xff0c;今天我们要实现线性表的另一种结构——栈。 1.栈的概念 栈&#xff1a;一种特殊…

AI是如何与快充技术结合的?

针对AI技术在快充领域的运用&#xff0c;我们可以进一步深入探讨AI如何与快充技术结合&#xff0c;提升充电效率和用户体验。以下是一些具体的AI技术在快充领域的应用场景&#xff1a; 一、智能充电算法 学习充电模式&#xff1a;AI算法可以学习用户的充电习惯&#xff0c;比…

批量文件名修改软件:一键解决同一编码多型号文件分类与命名难题,高效管理文件

在数字化时代&#xff0c;图片文件已经成为我们工作中不可或缺的一部分。然而&#xff0c;当面对成百上千个同一编码下不同型号的图片文件时&#xff0c;如何快速、准确地进行分类和命名&#xff0c;成为了许多职场人士头疼的问题。现在&#xff0c;我们为您带来了一款神奇的批…

智能环境监测与数据分析系统

项目名称&#xff1a;智能环境监测与数据分析系统 一、引言 随着科技的发展和人们环保意识的增强&#xff0c;对环境监测的需求日益增加。传统的环境监测手段往往存在数据收集不及时、数据分析不准确等问题。因此&#xff0c;设计一个智能环境监测与数据分析系统具有重要的现…

如何在 SQL 中删除一条记录?

如何在 SQL 中删除一条记录&#xff1f; 在 SQL 中&#xff0c;您可以使用DELETE查询和WHERE子句删除表中的一条记录。在本文中&#xff0c;我将向您介绍如何使用DELETE查询和WHERE子句删除记录。我还将向您展示如何一次从表中删除多条记录 如何在 SQL 中使用 DELETE 这是使…

GMSB文章八:微生物中介分析

欢迎大家关注全网生信学习者系列&#xff1a; WX公zhong号&#xff1a;生信学习者Xiao hong书&#xff1a;生信学习者知hu&#xff1a;生信学习者CDSN&#xff1a;生信学习者2 介绍 中介分析&#xff08;Mediation Analysis&#xff09;是一种统计方法&#xff0c;用于研究一…

Linux基础篇——目录结构

基本介绍 Linux的文件系统是采用级层式的树状目录结构&#xff0c;在此结构中的最上层是根目录"/"&#xff0c;然后在根目录下再创建其他的目录 在Linux中&#xff0c;有一句经典的话&#xff1a;在Linux世界里&#xff0c;一切皆文件 Linux中根目录下的目录 具体的…

新能源行业知识体系-------主目录-----持续更新

本文相当于目录方便快速检索内容&#xff0c;没有实际内容&#xff0c;只做索引 文章目录 一、电力市场概论二、蒙西电网需求侧响应三、蒙西电网市场结算V2.0 一、电力市场概论 是学习清华大学电力市场概论(2024年春)的学习笔记&#xff0c;详细了解电力市场是如何利用经济学知…