Spring Batch 综合案例实战中

news2025/1/16 5:38:21

目录

需求一

需求二

转视频版


需求一

需求:先动态生成50w条员工数据,存放再employee.csv文件中

步骤1:定义:DataInitController

@RestController
public class DataInitController {

    @Autowired
    private IEmployeeService employeeService;

    @GetMapping("/dataInit")
    public String dataInit() throws IOException {
        employeeService.dataInit();
        return "ok";
    }
}

步骤2:在IEmployeeService 添加dataInit 方法

public interface IEmployeeService {
    /**
     * 保存
     */
    void save(Employee employee);

    /**
     * 初始化数据:生成50w数据
     */
    void dataInit() throws IOException;
}

步骤3:在EmployeeServiceImpl 实现方法

@Service
public class EmployeeServiceImpl implements IEmployeeService {
    @Autowired
    private EmployeeMapper employeeMapper;
    @Override
    public void save(Employee employee) {
        employeeMapper.save(employee);
    }

    @Value("${job.data.path}")
    public String path;

    @Override
    public void dataInit() throws IOException {
        File file = new File(path, "employee.csv");
        if (file.exists()) {
            file.delete();
        }
        file.createNewFile();
        FileOutputStream out = new FileOutputStream(file);
        String txt = "";

        Random ageR = new Random();
        Random boolR = new Random();

        // 给文件中生产50万条数据
        long beginTime = System.currentTimeMillis();
        System.out.println("开始时间:【 " + beginTime + " 】");
        for (int i = 1; i <= 500000; i++) {
            if(i == 500000){
                txt = i+",dafei_"+ i +"," + ageR.nextInt(100) + "," + (boolR.nextBoolean()?1:0);
            }else{
                txt = i+",dafei_"+ i +"," + ageR.nextInt(100) + "," + (boolR.nextBoolean()?1:0) +"\n";
            }

            out.write(txt.getBytes());
            out.flush();
        }
        out.close();
        System.out.println("总共耗时:【 " + (System.currentTimeMillis() - beginTime) + " 】毫秒");
    }
}

步骤4:访问http://localhost:8080/dataInit 生成数据。

需求二

需求:启动作业异步读取employee.csv文件,将读到数据写入到employee_temp表,要求记录读与写消耗时间

步骤1:修改IEmployeeService 接口

public interface IEmployeeService {
    /**
     * 保存
     */
    void save(Employee employee);

    /**
     * 初始化数据:生成50w数据
     */
    void dataInit() throws IOException;
    
    /**
     * 清空数据
     */
    void truncateAll();
    
    /**
     * 清空employee_temp数据
     */
    void truncateTemp();
}

 步骤2:修改EmployeeServiceImpl

@Override
public void truncateAll() {
    employeeMapper.truncateAll();
}

@Override
public void truncateTemp() {
    employeeMapper.truncateTemp();
}

步骤3:修改IEmployeeMapper.java

public interface EmployeeMapper {

    /**
     * 添加
     */
    int save(Employee employee);

    /**
     * 添加临时表
     * @param employee
     * @return
     */
    int saveTemp(Employee employee);

    /**
     * 清空数据
     */
    void truncateAll();

    /**
     * 清空临时表数据
     */
    void truncateTemp();
}

步骤4:修改EmployeeMapper.xml

<insert id="saveTemp" keyColumn="id" useGeneratedKeys="true" keyProperty="id">
    insert into employee_temp(id, name, age, sex) values(#{id},#{name},#{age},#{sex})
</insert>

<delete id="truncateAll">
    truncate employee
</delete>

<delete id="truncateTemp">
    truncate employee_temp
</delete>

步骤5:在com.langfeiyes.exp.job.listener 包新建监听器,用于计算开始结束时间

package com.langfeiyes.exp.job.listener;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;


public class CsvToDBJobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
            long begin = System.currentTimeMillis();
            jobExecution.getExecutionContext().putLong("begin", begin);
            System.err.println("-------------------------【CsvToDBJob开始时间:】---->"+begin+"<-----------------------------");
        }

    @Override
    public void afterJob(JobExecution jobExecution) {
                long begin = jobExecution.getExecutionContext().getLong("begin");
                long end = System.currentTimeMillis();
                System.err.println("-------------------------【CsvToDBJob结束时间:】---->"+end+"<-----------------------------");
                System.err.println("-------------------------【CsvToDBJob总耗时:】---->"+(end - begin)+"<-----------------------------");
            }
}

步骤6:在com.langfeiyes.exp.job.config包定义CsvToDBJobConfig配置类

package com.langfeiyes.exp.job.config;


import com.langfeiyes.exp.domain.Employee;
import com.langfeiyes.exp.job.listener.CsvToDBJobListener;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.PathResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import java.io.File;

/**
 * 将数据从csv文件中读取,并写入数据库
 */
@Configuration
public class CsvToDBJobConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    @Value("${job.data.path}")
    private String path;

    //多线程读-读文件,使用FlatFileItemReader
    @Bean
    public FlatFileItemReader<Employee> cvsToDBItemReader(){
        FlatFileItemReader<Employee> reader = new FlatFileItemReaderBuilder<Employee>()
                .name("employeeCSVItemReader")
                .saveState(false) //防止状态被覆盖
                .resource(new PathResource(new File(path, "employee.csv").getAbsolutePath()))
                .delimited()
                .names("id", "name", "age", "sex")
                .targetType(Employee.class)
                .build();

        return reader;
    }

    //数据库写-使用mybatis提供批处理读入
    @Bean
    public MyBatisBatchItemWriter<Employee> cvsToDBItemWriter(){
        MyBatisBatchItemWriter<Employee> itemWriter = new MyBatisBatchItemWriter<>();
        itemWriter.setSqlSessionFactory(sqlSessionFactory); //需要指定sqlsession工厂
        //指定要操作sql语句,路径id为:EmployeeMapper.xml定义的sql语句id
        itemWriter.setStatementId("com.langfeiyes.exp.mapper.EmployeeMapper.saveTemp");  //操作sql
        return itemWriter;
    }

    @Bean
    public Step csvToDBStep(){
        return stepBuilderFactory.get("csvToDBStep")
                .<Employee, Employee>chunk(10000)  //每个块10000个 共50个
                .reader(cvsToDBItemReader())
                .writer(cvsToDBItemWriter())
                .taskExecutor(new SimpleAsyncTaskExecutor())  //多线程读写
                .build();

    }

    //job监听器
    @Bean
    public CsvToDBJobListener csvToDBJobListener(){
        return new CsvToDBJobListener();
    }

    @Bean
    public Job csvToDBJob(){
        return jobBuilderFactory.get("csvToDB-step-job")
                .start(csvToDBStep())
                .incrementer(new RunIdIncrementer()) //保证可以多次执行
                .listener(csvToDBJobListener())
                .build();

    }
}

步骤7:在com.langfeiyes.exp.controller 添加JobController

package com.langfeiyes.exp.controller;

import com.langfeiyes.exp.service.IEmployeeService;
import org.springframework.batch.core.*;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController
public class JobController {

    @Autowired
    private IEmployeeService employeeService;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    @Qualifier("csvToDBJob")
    private Job csvToDBJob;

    @GetMapping("/csvToDB")
    public String csvToDB() throws Exception {
        employeeService.truncateTemp(); //清空数据运行多次执行
        
        //需要多次执行,run.id 必须重写之前,再重构一个新的参数对象
        JobParameters jobParameters = new JobParametersBuilder(new JobParameters(),jobExplorer)
                .addLong("time", new Date().getTime())
                .getNextJobParameters(csvToDBJob).toJobParameters();
        JobExecution run = jobLauncher.run(csvToDBJob, jobParameters);
        return run.getId().toString();
    }
}

步骤8:访问测试:http://localhost:8080/csvToDB

-------------------------【CsvToDBJob开始时间:】---->1670575356773<-----------------------------
-------------------------【CsvToDBJob结束时间:】---->1670575510967<-----------------------------
-------------------------【CsvToDBJob总耗时:】---->154194<-----------------------------

到这,本篇就结束了,欲知后事如何,请听下回分解~

转视频版

看文字不过瘾可以切换视频版:Spring Batch高效批处理框架实战

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/379064.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

arduino-sentry2之卡片篇

欧克,今天在学生的强烈要求下 我又重启arduino的sentry2调试篇 目前实验结果,可以检测到10张交通卡片 也就是如图所示十张 具体视频如下: https://live.csdn.net/v/279170 具体代码如下: #include <Arduino.h> #include <

什么是千年虫?计算机如何开始处理日期?都有哪些时间日期格式化?

目录 “千年虫”漏洞&#xff08;Year 2000 Problem&#xff0c;简称“Y2K”&#xff09; 计算机是怎么开始处理日期的么&#xff1f; 举例1&#xff1a;时间格式化举例( 过滤器) 举例2&#xff1a;时间格式化 自定义私有过滤器(日期格式化) 高性能计数器演示 OLE时间对象…

Vue的组件(注册、局部、组件复用、props、emit、生命周期)全解

文章目录前言知识点组件注册局部组件组件复用组件间通信props 类型检测子父组件通信之 emit动态组件生命周期函数前言 Vue 支持模块化和组件化开发&#xff0c;可以将整个页面进行模块化分割&#xff0c;低耦合高内聚&#xff0c;使得代码可以在各个地方使用。 知识点 组件注册…

python自学之《21天学通Python》(15)——第18章 数据结构基础

数据结构是用来描述一种或多种数据元素之间的特定关系&#xff0c;算法是程序设计中对数据操作的描述&#xff0c;数据结构和算法组成了程序。对于简单的任务&#xff0c;只要使用编程语言提供的基本数据类型就足够了。而对于较复杂的任务&#xff0c;就需要使用比基本的数据类…

华三OSPF 综合实验

OSPF 实验 实验拓扑 实验需求 按照图示配置 IP 地址按照图示分区域配置 OSPF &#xff0c;实现全网互通为了路由结构稳定&#xff0c;要求路由器使用环回口作为 Router-id&#xff0c;ABR 的环回口宣告进骨干区域 实验解法 1.配置 IP 地址部分 2.按照图示分区域配置 OS…

FFmpeg从入门到入魔(1):初探FFmpeg框架

1. FFmpeg介绍与裁剪1.1 FFmpeg简介FFmpeg&#xff08;Fast forword mpeg&#xff0c;音视频转换器&#xff09;是一个开源免费跨平台的视频和音频流方案&#xff0c;它提供了录制/音视频编解码、转换以及流化音视频的完整解决方案。ffmpeg4.0.2源码目录结构如下&#xff1a;目…

为什么IBDP的文凭更受美国大学的青睐?

家长们可以看到&#xff0c;不管是AP还是A-LEVEL这样的课程&#xff0c;都只是单科的课程&#xff08;A-LEVEL也是英国发展出来&#xff0c;AP是针对美国大学设计的&#xff09;&#xff0c;学生是可以针对他们的强项去做选修&#xff0c;比如我的化学很强&#xff0c;那我可以…

第十节 集合

集合 什么是集合 集合就是能储存一批元素的容器。 特征&#xff1a; 集合类型可以不固定&#xff0c;大小也是可变的。 ArrayList集合 ArrayList是集合中的一种&#xff0c;它支持索引。 ArrayList集合的对象获取 public ArrayList()创建一个空的集合对象 ArrayList集合的添加…

Hive 一文读懂

Hive 简介1.1 什么是Hive1&#xff09;hive简介Hive&#xff1a;由Facebook开源用于解决海量结构化日志的数据统计。Hive是基于Hadoop的一个数据仓库工具&#xff0c;可以将结构化的数据文件映射为一张表&#xff0c;并提供类SQL查询功能。2&#xff09;Hive本质&#xff1a;将…

Goframe快速创建项目,并使用Cli工具创建dao、service、logic

GoFrame项目创建与Cli工具创建1.项目创建2.Mysql数据库配置3.Cli工具dao自动生成4.业务模型须知5.Cli工具service/logic自动生成 - 接口6.Controller/Api创建1.项目创建 官网 - 项目创建-init 开发文档 - 目录介绍 官网 - 示例项目 1.gf init 项目名 &#xff08;创建项目…

无法定位程序输入点kernel32.dll,如何修复kernel32.dll

kernel32.dll是Windows操作系统中非常重要的一个系统文件&#xff0c;如果它丢失或损坏可能会导致许多应用程序无法正常运行。今天小编就来给大家详细的讲解一下无法定位程序输入点kernel32.dll&#xff0c;我们要怎么修复这个kernel32.dll缺失的问题。 一.kernel32.dll时候什么…

前端开发环境配置,浏览器跨域配置,代码提交配置git等

这是我目前公司的开发配置文档大家可以参考&#xff1a; 前端文档 1 搭建前端环境 1.1 安装nodejs 1.1.1 nodejs下载地址 https://nodejs.org/dist/v10.15.3/node-v10.15.3-x64.msi&#xff08;win64&#xff09; https://nodejs.org/dist/v10.15.3/node-v10.15.3.pkg&…

查询性能较 Trino/Presto 3-10 倍提升!Apache Doris 极速数据湖分析深度解读

从上世纪 90 年代初 Bill Inmon 在《building the Data Warehouse》一书中正式提出数据仓库这一概念&#xff0c;至今已有超过三十年的时间。在最初的概念里&#xff0c;数据仓库被定义为「一个面向主题的、集成的、相对稳定的、反映历史变化的数据集合&#xff0c;用于支持管理…

Python排序 -- 内附蓝桥题:错误票据,奖学金

排序 ~~不定时更新&#x1f383;&#xff0c;上次更新&#xff1a;2023/02/28 &#x1f5e1;常用函数&#xff08;方法&#xff09; 1. list.sort() --> sort 是 list 的方法&#xff0c;会直接修改 list 举个栗子&#x1f330; li [2,3,1,5,4] li.sort() print(li) …

New Bing怼人、说谎、PUA,ChatGPT已经开始胡言乱语了

最近&#xff0c;来自大洋彼岸那头的ChatGPT科技浪潮席卷而来&#xff0c;微软将chatGPT整合搜索引擎Bing开启内测后&#xff0c;数百万用户蜂拥而至&#xff0c;都想试试这个「百事通」。 赶鸭子上架&#xff0c;“翻车”了&#xff1f; 但短短上线十几天&#xff0c;嵌入了…

5个开源的Java项目快速开发脚手架

概览 &#xff1a; GunspigRuoYiJeecg-bootiBase4J 一、Guns 推荐指数 &#xff1a;⭐⭐⭐⭐⭐ 简介 采用主流框架 &#xff1a; 基于 Spring Boot2.0版本开发&#xff0c;并且支持 Spring Cloud Alibaba 微服务。功能齐全 &#xff1a;包含系统管理&#xff0c;代码生成&a…

python线程池【ThreadPoolExecutor()】批量获取博客园标题数据

转载&#xff1a;蚂蚁学python 网址&#xff1a;【【2021最新版】Python 并发编程实战&#xff0c;用多线程、多进程、多协程加速程序运行】 https://www.bilibili.com/video/BV1bK411A7tV/?p8&share_sourcecopy_web&vd_sourced0ef3d08fdeef1740bab49cdb3e96467实战案…

SpringMVC 面试题

1、什么是SpringMVC&#xff1f; SpringMVC是一个基于Java的实现了MVC设计模式的“请求驱动型”的轻量级WEB框架&#xff0c;通过把model&#xff0c;view&#xff0c;controller 分离&#xff0c;将web层进行职责的解耦&#xff0c;把复杂的web应用分成逻辑清晰的几个部分&am…

Arduino-PWM调光

PWM调光实验什么是PWM&#xff1f;PWM是&#xff08;Pulse Width Modulation&#xff09;的简称&#xff0c;中文我们说这是脉冲宽度调制。脉冲宽度调制是一种模拟控制方式&#xff0c;根据相应载荷的变化来调制晶体管基极或MOS管栅极的偏置&#xff0c;来实现晶体管或MOS管导通…

react 函数式组件的hooks

目录 useState useEffect useCallback useMemo useRef useContext useReducer 自定义hooks useState 函数式组件的状态 &#xff0c;格式&#xff1a; const [value,setValue] useState( {xxxx} ) console.log([value, setValue])打印一下可以看到&#xff1a; value…