Spring Batch 高级篇-分区步骤

news2024/11/18 5:32:25

目录

引言

概念

分区器

分区处理器

案例

转视频版


引言

接着上篇:Spring Batch 高级篇-并行步骤了解Spring Batch并行步骤后,接下来一起学习一下Spring Batch 高级功能-分区步骤

概念

分区:有划分,区分意思,在SpringBatch 分区步骤讲的是给执行步骤区分上下级。

上级: 主步骤(Master Step)

下级: 从步骤--工作步骤(Work Step)

主步骤是领导,不用干活,负责管理从步骤,从步骤是下属,必须干活。

一个主步骤下辖管理多个从步骤。

注意: 从步骤,不管多小,它也一个完整的Spring Batch 步骤,负责各自的读入、处理、写入等。

分区步骤结构图

 分区步骤一般用于海量数据的处理上,其采用是分治思想。主步骤将大的数据划分多个小的数据集,然后开启多个从步骤,要求每个从步骤负责一个数据集。当所有从步骤处理结束,整作业流程才算结束。

分区器

主步骤核心组件,负责数据分区,将完整的数据拆解成多个数据集,然后指派给从步骤,让其执行。

拆分规则由Partitioner分区器接口定制,默认的实现类:MultiResourcePartitioner

public interface Partitioner {
	Map<String, ExecutionContext> partition(int gridSize);
}

Partitioner 接口只有唯一的方法:partition 参数gridSize表示要分区的大小,可以理解为要开启多个worker步骤,返回值是一个Map, 其中key:worker步骤名称, value:worker步骤启动需要参数值,一般包含分区元数据,比如起始位置,数据量等。

分区处理器

主步骤核心组件,统一管理work 步骤, 并给work步骤指派任务。

管理规则由PartitionHandler 接口定义,默认的实现类:TaskExecutorPartitionHandler

案例

需求:下面几个文件将数据读入内存

 

步骤1:准备数据

user1-10.txt

1#dafei#18
2#dafei#18
3#dafei#18
4#dafei#18
5#dafei#18
6#dafei#18
7#dafei#18
8#dafei#18
9#dafei#18
10#dafei#18

user11-20.txt

11#dafei#18
12#dafei#18
13#dafei#18
14#dafei#18
15#dafei#18
16#dafei#18
17#dafei#18
18#dafei#18
19#dafei#18
20#dafei#18

user21-30.txt

21#dafei#18
22#dafei#18
23#dafei#18
24#dafei#18
25#dafei#18
26#dafei#18
27#dafei#18
28#dafei#18
29#dafei#18
30#dafei#18

user31-40.txt

31#dafei#18
32#dafei#18
33#dafei#18
34#dafei#18
35#dafei#18
36#dafei#18
37#dafei#18
38#dafei#18
39#dafei#18
40#dafei#18

user41-50.txt

41#dafei#18
42#dafei#18
43#dafei#18
44#dafei#18
45#dafei#18
46#dafei#18
47#dafei#18
48#dafei#18
49#dafei#18
50#dafei#18

步骤2:准备实体类

@Getter
@Setter
@ToString
public class User {
    private Long id;
    private String name;
    private int age;
}

步骤3:配置分区逻辑

public class UserPartitioner  implements Partitioner {
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<>(gridSize);

        int range = 10; //文件间隔
        int start = 1; //开始位置
        int end = 10;  //结束位置
        String text = "user%s-%s.txt";

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext value = new ExecutionContext();
            Resource resource = new ClassPathResource(String.format(text, start, end));
            try {
                value.putString("file", resource.getURL().toExternalForm());
            } catch (IOException e) {
                e.printStackTrace();
            }
            start += range;
            end += range;

            result.put("user_partition_" + i, value);
        }
        return result;
    }
}

步骤4:全部代码

package com.langfeiyes.batch._37_step_part;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import java.util.List;

@SpringBootApplication
@EnableBatchProcessing
public class PartStepJob {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;



    //每个分区文件读取
    @Bean
    @StepScope
    public FlatFileItemReader<User> flatItemReader(@Value("#{stepExecutionContext['file']}") Resource resource){
        return new FlatFileItemReaderBuilder<User>()
                .name("userItemReader")
                .resource(resource)
                .delimited().delimiter("#")
                .names("id", "name", "age")
                .targetType(User.class)
                .build();
    }

    @Bean
    public ItemWriter<User> itemWriter(){
        return new ItemWriter<User>() {
            @Override
            public void write(List<? extends User> items) throws Exception {
                items.forEach(System.err::println);
            }
        };
    }


    //文件分区器-设置分区规则
    @Bean
    public UserPartitioner userPartitioner(){
        return new UserPartitioner();
    }

    //文件分区处理器-处理分区
    @Bean
    public PartitionHandler userPartitionHandler() {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setGridSize(5);
        handler.setTaskExecutor(new SimpleAsyncTaskExecutor());
        handler.setStep(workStep());
        try {
            handler.afterPropertiesSet();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return handler;
    }
    
    //每个从分区操作步骤
    @Bean
    public Step workStep() {
        return stepBuilderFactory.get("workStep")
                .<User, User>chunk(10)
                .reader(flatItemReader(null))
                .writer(itemWriter())
                .build();
    }

    //主分区操作步骤
    @Bean
    public Step masterStep() {
        return stepBuilderFactory.get("masterStep")
                .partitioner(workStep().getName(),userPartitioner())
                .partitionHandler(userPartitionHandler())
                .build();
    }


    @Bean
    public Job partJob(){
        return jobBuilderFactory.get("part-step-job")
                .start(masterStep())
                .build();
    }
    public static void main(String[] args) {
        SpringApplication.run(PartStepJob.class, args);
    }
}

结果:

User(id=31, name=dafei, age=18)
User(id=32, name=dafei, age=18)
User(id=33, name=dafei, age=18)
User(id=34, name=dafei, age=18)
User(id=35, name=dafei, age=18)
User(id=36, name=dafei, age=18)
User(id=37, name=dafei, age=18)
User(id=38, name=dafei, age=18)
User(id=39, name=dafei, age=18)
User(id=40, name=dafei, age=18)
User(id=41, name=dafei, age=18)
User(id=42, name=dafei, age=18)
User(id=43, name=dafei, age=18)
User(id=44, name=dafei, age=18)
User(id=45, name=dafei, age=18)
User(id=46, name=dafei, age=18)
User(id=47, name=dafei, age=18)
User(id=48, name=dafei, age=18)
User(id=49, name=dafei, age=18)
User(id=50, name=dafei, age=18)
User(id=21, name=dafei, age=18)
User(id=22, name=dafei, age=18)
User(id=23, name=dafei, age=18)
User(id=24, name=dafei, age=18)
User(id=25, name=dafei, age=18)
User(id=26, name=dafei, age=18)
User(id=27, name=dafei, age=18)
User(id=28, name=dafei, age=18)
User(id=29, name=dafei, age=18)
User(id=30, name=dafei, age=18)
User(id=1, name=dafei, age=18)
User(id=2, name=dafei, age=18)
User(id=3, name=dafei, age=18)
User(id=4, name=dafei, age=18)
User(id=5, name=dafei, age=18)
User(id=6, name=dafei, age=18)
User(id=7, name=dafei, age=18)
User(id=8, name=dafei, age=18)
User(id=9, name=dafei, age=18)
User(id=10, name=dafei, age=18)
User(id=11, name=dafei, age=18)
User(id=12, name=dafei, age=18)
User(id=13, name=dafei, age=18)
User(id=14, name=dafei, age=18)
User(id=15, name=dafei, age=18)
User(id=16, name=dafei, age=18)
User(id=17, name=dafei, age=18)
User(id=18, name=dafei, age=18)
User(id=19, name=dafei, age=18)
User(id=20, name=dafei, age=18)

解析:核心点

1>文件分区器:userPartitioner(), 分别加载5个文件进入到程序

2>文件分区处理器:userPartitionHandler() ,指定要分几个区,由谁来处理

3>分区从步骤:workStep() 指定读逻辑与写逻辑

4>分区文件读取:flatItemReader(),需要传入Resource对象,这个对象在userPartitioner()已经标记为file

5>分区主步骤:masterStep() ,指定分区名称与分区器,指定分区处理器

到这,本篇就结束了,欲知后事如何,请听下回分解~

转视频版

看文字不过瘾可以切换视频版:Spring Batch高效批处理框架实战

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/371869.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中国ETC行业市场规模及未来发展趋势

中国ETC行业市场规模及未来发展趋势编辑根据市场调研在线网发布的2023-2029年中国ETC行业发展策略分析及战略咨询研究报告分析&#xff1a;随着政府坚持实施绿色出行政策&#xff0c;ETC行业也受到了极大的支持。根据中国智能交通协会统计&#xff0c;2017年中国ETC行业市场规模…

浅析Linux内核进程间通信(信号量)

信号灯与其他进程间通信方式不大相同&#xff0c;它主要提供对进程间共享资源访问控制机制。相当于内存中的标志&#xff0c;进程可以根据它判定是否能够访问某些共享资源&#xff08;临界区&#xff0c;类似于互斥锁&#xff09;&#xff0c;同时&#xff0c;进程也可以修改该…

FreeRTOS任务基础知识

单任务和多任务系统单任务系统单任务系统的编程方式&#xff0c;即裸机的编程方式&#xff0c;这种编程方式的框架一般都是在main&#xff08;&#xff09;函数中使用一个大循环&#xff0c;在循环中顺序的执行相应的函数以处理相应的事务&#xff0c;这个大循环的部分可以视为…

Linux内核共享内存使用常见陷阱与分析

所谓共享内存就是使得多个进程可以访问同一块内存空间&#xff0c;是最快的可用IPC形式。是针对其他通信机制运行效率较低而设计的。往往与其它通信机制&#xff0c;如 信号量结合使用&#xff0c;来达到进程间的同步及互斥。其他进程能把同一段共享内存段“连接到”他们自己的…

【华为OD机试模拟题】用 C++ 实现 - 最小叶子节点(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 获得完美走位(2023.Q1) 文章目录 最近更新的博客使用说明最小叶子节点题目输入输出示例一输入输出示例二输入输出Code使用说明 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,通过率才会高。 华…

oracle数据库使用JDBC导入ClickHouse数据

一、背景 需求要把oracle中的数据导入到clickhouse中&#xff0c;使用clickhouse的jdbc表引擎&#xff0c;把oracle11g的数据导入到clickhouse中。 二、方案 通过clickhouse-jdbc-bridge&#xff1a;是clickhouse提供的一个jdbc组件&#xff0c;用于通过JDBC的方式远程访问其他…

[面试直通版]网络协议面试核心之IP,TCP,UDP-TCP与UDP协议的区别

点击->计算机网络复习的文章集<-点击 目录 前言 UDP TCP 区别小总结 前言 TCP和UDP都是在传输层&#xff0c;在程序之间传输数据传输层OSI模型&#xff1a;第四层TCP/IP模型&#xff1a;第三层关键协议&#xff1a;TCP协议、UDP协议传输层属于主机间不同进程的通信传…

Unity Lighting -- 光照入门

识别光源 首先来看一张图&#xff0c;看看我们能在这个场景中找到几个光源。 相信大家能够很容易看出来&#xff0c;四盏路灯模型带有四个光源&#xff0c;右边的红绿蓝三个发光的灯也是光源。场景中还有一个光源&#xff0c;这个光源来自天空&#xff0c;让场景看起来有点日落…

尚医通(二十四)就医提醒和预约统计

目录一、就医提醒1、搭建定时任务模块二、后台管理系统-预约统计功能1、开发每天预约数据接口2、封装远程调用接口4、整合统计功能前端一、就医提醒 我们通过定时任务&#xff0c;每天8点执行&#xff0c;提醒就诊 1、搭建定时任务模块 &#xff08;1&#xff09;添加依赖 &l…

【MySQL】调控 字符集

一、 MySQL 启动选项 & 系统变量 启动选项 是在程序启动时我们程序员传递的一些参数&#xff0c;而 系统变量 是影响服务器程序运行行为的变量 1.1 启动项 MySQL 客户端设置项包括&#xff1a; 允许连入的客户端数量 、 客户端与服务器的通信方式 、 表的默认存储引擎 、…

zookeeper入门到精通

文章目录一、zookeeper入门1. 概述zookeeper的工作机制2.特点3.数据结构4.应用场景4.1.统一命名服务4.2.统一配置管理4.3.统一集群管理4.4.服务器节点动态上下线4.5.软负载均衡5.下载地址二、zookeeper安装1.本地模式安装2.配置参数解读三、zookeeper集群操作1.集群操作1.1 集群…

C++学习笔记-继承

继承的基本概念 类与类之间的关系 has-A&#xff0c;包含关系&#xff0c;用以描述一个类由多个“部件类”构成&#xff0c;实现has-A关系用类的成员属性表示&#xff0c;即一个类的成员属性是另一个已经定义好的类。 use-A&#xff0c;一个类使用另一个类&#xff0c;通过类…

前端面试题整理6-react

React 中 keys 的作用是什么&#xff1f; Keys是 React 用于追踪哪些列表中元素被修改、被添加或者被移除的辅助标识 在开发过程中&#xff0c;我们需要保证某个元素的 key 在其同级元素中具有唯一性。在 React Diff 算法中React 会借助元素的 Key 值来判断该元素是新近创建的还…

第五章 Opencv图像的几何变换

目录1.缩放图像1-1.resize()方法2.翻转图像2-1.flip()方法3.仿射变换图像3-1.warpAffine()方法3-2.平移3-3.旋转3-4.倾斜4.透视图像4-1.warpPerspective()方法几何变换是指改变图像的几何结构&#xff0c;例如大小、角度和形状等&#xff0c;从而使图像呈现出缩放、翻转、仿射和…

KUKA机器人外部自动运行模式的相关信号配置

KUKA机器人外部自动运行模式的相关信号配置 通过例如PLC这样的控制器来进行外部自动运行控制时,运行接口向机器人控制系统发出机器人进程的相关信号(例如运行许可、故障确认、程序启动等),机器人向上级控制系统发送有关运行状态和故障状态的信息。 必需的配置:  配置CEL…

Oracle-01-简介篇

&#x1f3c6;一、Oracle的历史和发展 Oracle公司成立于1977年&#xff0c;由拉里埃里森&#xff08;Larry Ellison&#xff09;、鲍勃明特&#xff08;Bob Miner&#xff09;和埃德奥茨&#xff08;Ed Oates&#xff09;共同创立。起初&#xff0c;公司的主要业务是开发和销售…

docker基础用法及镜像和容器的常用命令大全

1.docker 体系架构 Docker 采用了 C / S 架构&#xff0c;包括客户端和服务端。Docker 守护进程作为服务端接受来自客户端的请求&#xff0c;并处理这些请求&#xff08;创建、运行、分发容器&#xff09;。客户端和服务端既可以运行在一个机器上&#xff0c;也可通过 socket 或…

数字IC手撕代码--乐鑫科技(次小值与次小值出现的次数)

前言&#xff1a;本专栏旨在记录高频笔面试手撕代码题&#xff0c;以备数字前端秋招&#xff0c;本专栏所有文章提供原理分析、代码及波形&#xff0c;所有代码均经过本人验证。目录如下&#xff1a;1.数字IC手撕代码-分频器&#xff08;任意偶数分频&#xff09;2.数字IC手撕代…

九龙证券|阿里+鸿蒙+人工智能+元宇宙概念热度爆棚,“会说话的猫”亮了!

近一周组织调研个股数量有240多只&#xff0c;汤姆猫成为调研组织数量最多的股票。 证券时报数据宝统计&#xff0c;近一周组织调研公司数量有240多家。从调研组织类型来看&#xff0c;证券公司调研相对最广泛&#xff0c;调研230多家公司。 “会说话的猫”亮了 汤姆猫成为近…

倒计时3天:现实与虚拟交织,元宇宙警察将如何执法?

在元宇宙、Web3高速发展的时代&#xff0c;欧科云链以科技助警&#xff0c;帮助公安等机构实现对新型犯罪的监管与侦破。 ——摘要元宇宙作为应用场景和生活方式的未来&#xff0c;拥有着巨大的发展潜力。伴随5G网络、云计算、区块链等技术迅速发展&#xff0c;虚拟现实、人机交…