【Spring底层原理高级进阶】Spring Batch清洗和转换数据,一键处理繁杂数据!Spring Batch是如何实现IO流优化的?本文详解!

news2025/1/11 14:06:57

🎉🎉欢迎光临,终于等到你啦🎉🎉

🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀

🌟持续更新的专栏《Spring 狂野之旅:从入门到入魔》 🚀

本专栏带你从Spring入门到入魔 

这是苏泽的个人主页可以看到我其他的内容哦👇👇

努力的苏泽icon-default.png?t=N7T8http://suzee.blog.csdn.net/


Spring Batch的应用场景和作用

批处理是企业级业务系统不可或缺的一部分,spring batch是一个轻量级的综合性批处理框架,可用于开发企业信息系统中那些至关重要的数据批量处理业务.SpringBatch基于POJO和Spring框架,相当容易上手使用,让开发者很容易地访问和利用企业级服务.spring batch具有高可扩展性的框架,简单的批处理,复杂的大数据批处理作业都可以通过SpringBatch框架来实现。

先来个例子

假设一家电商公司,每天从不同渠道收集大量的销售数据。这些数据包含了各种商品的销售记录,但是格式和质量可能不一致。您希望将这些销售数据进行清洗和转换,以便进行后续的分析和报告生成。

使用Spring Batch,可以创建一个批处理作业来处理销售数据。作业的步骤可以包括从不同渠道读取销售数据,对数据进行清洗和转换,例如去除无效数据、修复格式错误、计算额外的指标等。然后,将清洗和转换后的数据写入数据库,以备后续的分析和报告生成使用。

先来介绍其架构

  • Application应用层:包含了所有任务batch jobs和开发人员自定义的代码,主要是根据项目需要开发的业务流程等。
  • Batch Core核心层:包含启动和管理任务的运行环境类,如JobLauncher等。
  • Batch Infrastructure基础层:上面两层是建立在基础层之上的,包含基础的读入reader写出writer、重试框架等。

为什么它能够如此优秀?

Chunk 的中文意思是:大块、厚块;大部分,大量。Chunk 在Spring Batch 中就是“批量操作”的概念的抽象。它本身是一个类,这个类就是用来将原本的单条操作改成批量进行。
在Spring Batch 中就提出了chunk 的概念。首先我们设定一个chunk 的size,随后Spring Batch 一条条地区处理数据,但是到ItemWriter 阶段,Spirng Batch 不会选择立刻将数据提交到数据库,只有在处理的数据累积数量达到了之前设置的chunk 的size 之后,才会进行提交操作。

 

实战详细操作

引入 依赖

首先,引Spring Batch的依赖项。在Maven项目中,在pom.xml文件中添加以下依赖项:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
</dependencies>

创建一个Spring配置文件(例如batch-config.xml),并配置Spring Batch的相关组件和属性。

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">

    <!-- 数据源配置 -->
    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="com.mysql.jdbc.Driver" />
        <property name="url" value="jdbc:mysql://localhost:3306/mydb" />
        <property name="username" value="root" />
        <property name="password" value="password" />
    </bean>

    <!-- JobRepository配置 -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="transactionManager" ref="transactionManager" />
        <property name="databaseType" value="mysql" />
    </bean>

    <!-- 并发任务执行器配置 -->
    <task:executor id="taskExecutor" pool-size="10" />

    <!-- 事务管理器配置 -->
    <bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />

    <!-- JobLauncher配置 -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
        <property name="taskExecutor" ref="taskExecutor" />
    </bean>

</beans>

定义数据模型:


根据需求,定义需要清洗和转换的数据模型。例如,假设数据模型是一个简单的用户对象,包含id、姓名和年龄字段。创建一个名为User的Java类,如下所示:

public class User {
    private Long id;
    private String name;
    private Integer age;

    // 省略构造函数、getter和setter方法
}

创建ItemReader:


创建一个实现ItemReader接口的自定义类,用于从数据源中读取数据。以下是一个读取数据库中用户数据的示例:

public class UserItemReader implements ItemReader<User> {

    private JdbcTemplate jdbcTemplate;
    private int rowCount = 0;
    private int currentRow = 0;

    public UserItemReader(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public User read() throws Exception {
        if (currentRow < rowCount) {
            String sql = "SELECT id, name, age FROM users LIMIT ?, 1";
            User user = jdbcTemplate.queryForObject(sql, new Object[]{currentRow}, (rs, rowNum) -> {
                User u = new User();
                u.setId(rs.getLong("id"));
                u.setName(rs.getString("name"));
                u.setAge(rs.getInt("age"));
                return u;
            });
            currentRow++;
            return user;
        } else {
            return null;
        }
    }

    public void setRowCount(int rowCount) {
        this.rowCount = rowCount;
    }
}

在此示例中,我们使用JdbcTemplate来执行数据库查询,并在read方法中逐行读取用户数据。

这里就可以根据你的业务需求设置各种各样的任务

创建ItemProcessor:
创建一个实现ItemProcessor接口的自定义类,用于对读取的数据进行清洗和转换。

temProcessor的作用是在Spring Batch的批处理作业中对读取的数据进行处理、清洗和转换。它是Spring Batch框架中的一个关键接口,用于执行中间处理逻辑,并将处理后的数据传递给ItemWriter进行写入操作。

以下是一个对用户数据进行简单处理的示例:

public class UserProcessor implements ItemProcessor<UserData, ProcessedUserData> {

    @Override
    public ProcessedUserData process(UserData userData) throws Exception {
        // 获取用户数据
        String input = userData.getData();

        // 去除首尾空格
        String trimmedInput = input.trim();

        // 过滤敏感信息
        String filteredInput = filterSensitiveData(trimmedInput);

        // 转换为大写
        String upperCaseInput = filteredInput.toUpperCase();

        // 创建处理后的用户数据对象
        ProcessedUserData processedUserData = new ProcessedUserData();
        processedUserData.setProcessedData(upperCaseInput);

        return processedUserData;
    }

    private String filterSensitiveData(String input) {
        // 在这里可以根据实际需求实现敏感信息过滤逻辑
        // 使用正则表达式、敏感词库或其他方法进行过滤
        // 这里是过滤手机号码和邮箱地址

        String filteredInput = input
                .replaceAll("\\b\\d{11}\\b", "[PHONE_NUMBER]")
                .replaceAll("\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}\\b", "[EMAIL]");

        return filteredInput;
    }
}

我们做了以下处理和转换:

  1. 使用trim方法去除用户数据字符串首尾的空格。
  2. 使用filterSensitiveData方法过滤敏感信息,例如手机号码和邮箱地址。在示例中,我们使用了简单的正则表达式来过滤手机号码和邮箱地址,并将其替换为占位符。
  3. 使用toUpperCase方法将字符串转换为大写形式。
  4. 创建一个ProcessedUserData对象,将处理后的数据设置到输出对象中。

创建ItemWriter:


创建一个实现ItemWriter接口的自定义类,用于将处理后的数据写入目标位置。以下是一个将用户数据写入数据库的示例:

public class UserItemWriter implements ItemWriter<User> {

    private JdbcTemplate jdbcTemplate;

    public UserItemWriter(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public void write(List<? extends User> users) throws Exception {
        for (User user : users) {
            String sql = "INSERT INTO processed_users (id, name, age) VALUES (?, ?, ?)";
            jdbcTemplate.update(sql, user.getId(), user.getName(), user.getAge());
        }
    }
}

在此示例中,我们使用JdbcTemplate将处理后的用户数据插入到名为processed_users的数据库表中。

创建作业配置:


创建一个包含作业配置的类,用于将ItemReader、ItemProcessor和ItemWriter组合在一起,定义一个批处理作业。以下是一个示例的作业配置类:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public ItemReader<User> userItemReader() {
        return new UserItemReader(dataSource);
    }

    @Bean
    public ItemProcessor<User, User> userItemProcessor() {
        return new UserItemProcessor();
    }

    @Bean
    public ItemWriter<User> userItemWriter() {
        return new UserItemWriter(dataSource);
    }

    @Bean
    public Step step1(ItemReader<User> reader, ItemProcessor<User, User> processor, ItemWriter<User> writer) {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public Job dataCleanupJob(Step step1) {
        return jobBuilderFactory.get("dataCleanupJob")
                .incrementer(new RunIdIncrementer())
                .flow(step1)
                .end()
                .build();
    }
}

在此示例中,我们通过Spring Batch的注解@EnableBatchProcessing启用批处理功能,并定义了一个名为dataCleanupJob的作业,其中包含一个名为step1的步骤。

运行作业:

创建Job和Step配置:使用Spring Batch的配置文件,配置Job和Step。使用JobParametersBuilder创建一个包含当前时间戳的Job参数,然后通过jobLauncher.run()方法启动作业。

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class BatchApplication {
    public static void main(String[] args) {
        // 创建Spring应用上下文
        ApplicationContext context = new AnnotationConfigApplicationContext(BatchConfiguration.class);

        // 获取JobLauncher和Job实例
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean("dataCleanupJob", Job.class);

        try {
            // 创建Job参数
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();

            // 启动作业
            jobLauncher.run(job, jobParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

监听Listener

可以通过Listener接口对特定事件进行监听,以实现更多业务功能。比如如果处理失败,就记录一条失败日志;处理完成,就通知下游拿数据等。

import org.springframework.batch.core.*;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;

public class MyJobListener extends JobExecutionListenerSupport {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        // 在作业执行之前执行的逻辑
        System.out.println("作业开始执行");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        // 在作业执行之后执行的逻辑
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            System.out.println("作业执行成功");
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            System.out.println("作业执行失败");
        }
    }

    @Override
    public void onSkipInRead(Throwable t) {
        // 在读取过程中发生跳过记录的逻辑
        System.out.println("跳过读取记录");
    }

    @Override
    public void onSkipInProcess(Object item, Throwable t) {
        // 在处理过程中发生跳过记录的逻辑
        System.out.println("跳过处理记录");
    }

    @Override
    public void onSkipInWrite(Object item, Throwable t) {
        // 在写入过程中发生跳过记录的逻辑
        System.out.println("跳过写入记录");
    }
}

将这个自定义的监听器添加到作业配置中:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // 省略其他配置

    @Bean
    public Job dataCleanupJob(Step step1, JobExecutionListener jobListener) {
        return jobBuilderFactory.get("dataCleanupJob")
                .incrementer(new RunIdIncrementer())
                .listener(jobListener) // 添加自定义的监听器
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    public JobExecutionListener jobListener() {
        return new MyJobListener();
    }

    // 省略其他配置
}

这样  我们就能很清晰的看到 任务运行的情况啦

Spring Batch 使用内存缓冲机制,将读取的数据记录暂存于内存中,然后批量处理这些数据。通过减少对磁盘或数据库的频繁访问,内存缓冲可以提高读取和处理的效率,而且Spring Batch 提供了批量读取的机制,允许一次性读取和处理多个数据记录,这两点都减轻 I/O 压力。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1497820.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法题 — 三个数的最大乘机

三个数的最大乘机 整型数组 nums&#xff0c;在数组中找出由三个数字组成的最大乘机&#xff0c;并输出这个乘积。&#xff08;乘积不会越界&#xff09; 重点考察&#xff1a;线性扫描 排序法&#xff1a; public static void main(String[] args) {System.out.println(so…

海外IP代理应用:亚马逊使用什么代理IP?

代理IP作为网络活动的有力工具&#xff0c;同时也是跨境电商的必备神器。亚马逊作为跨境电商的头部平台&#xff0c;吸引了大量的跨境电商玩家入驻&#xff0c;想要做好亚马逊&#xff0c;养号、测评都需要代理IP的帮助。那么应该使用什么代理IP呢&#xff1f;如何使用&#xf…

Jmeter高效组织接口自动化用例

1、善用“逻辑控制器”中的“简单控制器”。可以把简单控制器像文件夹一样使用&#xff0c;通过它来对用例进行分类归档&#xff0c;方便后续用例的调试和执行。 2、同编写测试用例一样&#xff0c;这里的接口测试用例应该进行唯一性编号&#xff0c;这样在运行整个用例计划出现…

《TCP/IP详解 卷一》第15章 TCP数据流与窗口管理

目录 15.1 引言 15.2 交互式通信 15.3 延时确认 15.4 Nagle 算法 15.4.1 延时ACK与Nagle算法结合 15.4.2 禁用Nagle算法 15.5 流量控制与窗口管理 15.5.1 滑动窗口 15.5.2 零窗口与TCP持续计时器 15.5.3 糊涂窗口综合征 15.5.4 大容量缓存与自动调优 15.6 紧急机制…

汽车小车车灯无痕修复用的胶是什么胶?

汽车小车车灯无痕修复用的胶是什么胶&#xff1f; 可以使用在小车车灯无痕修复中的胶水&#xff0c;通常使用的车灯无痕修复专用UV胶。 车灯无痕修复专用胶主要成份是改性丙烯酸UV树脂&#xff0c;主要应用在车灯的专业无痕修复领域。它可以用于修复车灯壳的裂缝或破损&#xf…

阿珊解析Vuex:实现状态管理的利器

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

如何将字体添加到 ONLYOFFICE 桌面编辑器8.0

作者&#xff1a;VincentYoung 为你写好的文字挑选一款好看的字体然而自带的字体列表却找不到你喜欢的怎么办&#xff1f;这只需要自己手动安装一款字体即可。这里教你在不同的桌面操作系统里的多种字体安装方法。 ONLYOFFICE 桌面编辑器 ONLYOFFICE 桌面编辑器是一款免费的办…

第一讲 计算机组成与结构(初稿)

计算机组成与结构 计算机指令常见CPU寄存器类型有哪些&#xff1f;存储器分类&#xff1f;内存&#xff1f;存储器基本组成&#xff1a; 控制器的基本组成主机完成指令的过程以取数指令为例以存数指令为例ax^2bxc程序的运行过程 机器字长存储容量小试牛刀&#xff08;答案及解析…

【腾讯云】 爆款2核2G3M云服务器首年 61元,叠加红包再享折上折

同志们&#xff0c;云服务器行业大内圈&#xff0c;腾讯云各个活动都已开始卷中卷&#xff0c;我整理一下各个活动&#xff0c;加油冲了 【腾讯云】 爆款2核2G3M云服务器首年 61元&#xff0c;叠加红包再享折上折&#xff0c;最低只要51 【腾讯云】多款热门AI产品新春巨惠&…

计网《一》|互联网结构发展史|标准化工作|互联网组成|性能指标|计算机网络体系结构

计网《一》| 概述 计算机网络在信息时代的作用什么是互联网呢&#xff1f;互联网有什么用呢&#xff1f;为什么互联网能为用户提供许多服务 互联网基础结构发展的三个阶段第一个阶段&#xff1a;第二阶段&#xff1a;第三个阶段&#xff1a; 互联网标准化的工作互联网的组成边缘…

CCF-B推荐会议 Euro-Par‘24延期10天! 3月25日截稿!抓住机会!

会议之眼 快讯 第30届Euro-Par(International European Conference on Parallel and Distributed Computing)即国际欧洲并行和分布式计算会议将于 2024 年 8月26日-30日在西班牙马德里举行&#xff01;Euro-Par是欧洲最主要的会议之一&#xff0c;提供了一个广泛而综合的平台&a…

数字孪生10个技术栈:数据处理的六步骤,以获得可靠数据。

一、什么是数据处理 在数字孪生中&#xff0c;数据处理是指对采集到的实时或历史数据进行整理、清洗、分析和转化的过程。数据处理是数字孪生的基础&#xff0c;它将原始数据转化为有意义的信息&#xff0c;用于模型构建、仿真和决策支持。 数据处理是为了提高数据质量、整合数…

Java面试挂在线程创建后续,不要再被八股文误导了!创建线程的方式只有1种

写在开头 在上篇博文中我们提到小伙伴去面试&#xff0c;面试官让说出8种线程创建的方式&#xff0c;而他只说出了4种&#xff0c;导致面试挂掉&#xff0c;在博文中也给出了10种线程创建的方式&#xff0c;但在文章的结尾我们提出&#xff1a;真正创建线程的方式只有1种&…

Kakarot:当今以太坊的未来

1. 引言 前序博客&#xff1a; Kakarot&#xff1a;部署在Starknet上的ZK-EVM type 3 随着 Kakarot zkEVM 即将发布测试网&#xff0c;想重申下 Kakarot zkEVM 的愿景为&#xff1a; 为什么在rollup空间中还需要另一个 zkEVM&#xff1f; 开源代码见&#xff1a; https:/…

第三百八十七回

文章目录 1. 概念介绍2. 使用方法3. 示例代码 我们在上一章回中介绍了DateRangePickerDialog Widget相关的内容,本章回中将介绍Radio Widget.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念介绍 我们在这里说的Radio Widget是指单选按钮&#xff0c;没有选中时是圆形…

PEIS源码 健康体检中心源码 C/S

目录 一、系统概述 二、系统开发环境 三、系统功能 检前管理 检中管理 检后管理 设备对接-PACS 设备对接-彩超 LIS-结果录入、审核、外送结果自动导入 一、系统概述 体检系统&#xff0c;是专为体检中心/医院体检科等体检机构&#xff0c;专门开发的全流程管理系…

创建Net8WebApi自动创建OpenApi集成swagger

问题&#xff1a;用Net8创建WebAPI时勾选启动OpenAPI&#xff0c;项目自动集成了Swagger&#xff0c;但是接口注释没有展示&#xff1f; 解决&#xff1a; 1.需要生成Api项目的XML文件。操作如下&#xff1a; 2.生成XML文件后&#xff0c;还需要在启动类Program.cs里面配置Sw…

快速批量将图片变成圆角怎么弄?教你一键将图片批量加圆角

在我们日常工作中&#xff0c;在设计图片的时候会要求将直角变成圆角&#xff0c;那么为什么要这么做呢&#xff1f;首先从圆角的设计语言上来说说&#xff0c;圆角看起来很现代&#xff0c;传达给人的感觉是温和友善的&#xff0c;被广泛的应用在产品中的图标、按钮等地方。而…

代码随想录day15(2)栈与队列:滑动窗口最大值(leetcode239)

题目要求&#xff1a;给定一个数组 nums&#xff0c;有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。返回滑动窗口中的最大值。 思路&#xff1a;首先的想法就是暴力方法&#xff0c;遍历一遍…

SSD LDPC软错误探测方案解读

上一篇文档中,基于SSD LDPC(Low-Density Parity-Check Codes)原理背景和纠错能力作了简单的介绍。 扩展阅读: 关于SSD LDPC纠错能力的基础探究 浅析LDPC软解码对SSD延迟的影响 本篇结合SMI发布的研究成果,通过SSD控制内部LDPC更底层的架构,来解读如何增强软错误探测能力…