SpringBatch从入门到实战(三):父子Job和多步骤控制

news2024/11/25 2:59:17

一:Job嵌套

Job之前也可以嵌套,比如一个父Job封装多个已经存在的子Job。

@Configuration
public class ChildrenJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;


    @Bean
    public Job childJob1() {
        return jobBuilderFactory.get("childJob1")
                .start(childJob1Step())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    public Step childJob1Step() {
        return stepBuilderFactory.get("childJob1Step")
                .tasklet(childJob1StepTasklet())
                .build();
    }

    @Bean
    public Tasklet childJob1StepTasklet() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println(Thread.currentThread().getName() + " childJob1StepTasklet");
                return RepeatStatus.FINISHED;
            }
        };
    }

    @Bean
    public Job childJob2() {
        return jobBuilderFactory.get("childJob2")
                .start(childJob2Step2())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    public Step childJob2Step2() {
        return stepBuilderFactory.get("childJob2Step2")
                .tasklet(childJob2Step2Tasklet2())
                .build();
    }

    @Bean
    public Tasklet childJob2Step2Tasklet2() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println(Thread.currentThread().getName() + " childJob2Step2Tasklet2");
                return RepeatStatus.FINISHED;
            }
        };
    }
}
@Configuration
public class ParentJobConfig {
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;


    @Autowired
    private Job childJob1;

    @Autowired
    private Job childJob2;


    @Bean
    public Job parentJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        return jobBuilderFactory.get("parentJob")
                .start(parent1Step(jobRepository, transactionManager))
                .next(parent2Step(jobRepository, transactionManager))
                .incrementer(new RunIdIncrementer())
                .build();

    }

    @Bean
    public Step parent1Step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        // Job转Step: 将子Job封装成父Step
        return new JobStepBuilder(new StepBuilder("parent1Step"))
                .job(childJob1)
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(transactionManager)
                .build();
    }

    @Bean
    public Step parent2Step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
        // Job转Step: 将子Job封装成父Step
        return new JobStepBuilder(new StepBuilder("parent2Step"))
                .job(childJob2)
                .launcher(jobLauncher)
                .repository(jobRepository)
                .transactionManager(transactionManager)
                .build();
    }
}

在这里插入图片描述

二:if else案例

案例:如果开始步骤成功了就执行成功步骤,否则执行失败步骤。

// 伪代码
String exitStatus = helloWorldJob();
if("FAILED".equals(exitStatus)){
    failStep();
}else{
    successStep();
}
@Configuration
public class HelloWorldJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;


    @Bean
    public Job helloWorldJob() {
        // 执行helloWorldStep,如果成功执行successStep,否则执行failStep(helloWorldStep抛异常时算失败)
        // on()表示条判断
        return jobBuilderFactory.get("helloWorldJob")
                .start(helloWorldStep()).on("FAILED").to(failStep())
                .from(helloWorldStep()).on("*").to(successStep())
                .end()
                .incrementer(new RunIdIncrementer())
                .build();
    }


    @Bean
    public Step helloWorldStep() {
        return stepBuilderFactory.get("helloWorldStep")
                .tasklet(helloWorldTasklet())
                .build();
    }

    @Bean
    public Step successStep() {
        return stepBuilderFactory.get("successStep")
                .tasklet(successTasklet())
                .build();
    }

    @Bean
    public Step failStep() {
        return stepBuilderFactory.get("failStep")
                .tasklet(failTasklet())
                .build();
    }

    @Bean
    public Tasklet helloWorldTasklet() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("helloWorldTasklet");
                int a = 1/0;
                return RepeatStatus.FINISHED;
            }
        };
    }


    @Bean
    public Tasklet successTasklet() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("successTasklet");
                return RepeatStatus.FINISHED;
            }
        };
    }

    @Bean
    public Tasklet failTasklet() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("failTasklet");
                return RepeatStatus.FINISHED;
            }
        };
    }
}
  • start(Step) 后面跟on(), on表示start()返回的退出状态ExitStatus,如果equals就执行to(Step)的逻辑。
  • 后面都是从from(Step)开始,参数表示用那个步骤的返回值与后面的on(String)做比较,eqauls为true执行to(Step)逻辑。* 表示通配符,表示else逻辑。

三:自定义on()值

  • start(Step) 后面跟next(jobExecutionDecider())。
  • from都是取作业执行决策器jobExecutionDecider返回的值与on做比较。
@Bean
public Job helloWorldJob() {
    return jobBuilderFactory.get("helloWorldJob")
            .start(helloWorldStep())
            .next(jobExecutionDecider())
            .from(jobExecutionDecider()).on("OK").to(successStep())
            .from(jobExecutionDecider()).on("*").to(failStep())
            .end()
            .incrementer(new RunIdIncrementer())
            .build();
}


@Bean
public StepExecutionListener stepExecutionListener() {
    return new HelloWorldStepListener();
}


@Bean
public Tasklet helloWorldTasklet() {
    return new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
            System.out.println("helloWorldTasklet");
            // 不支持put chunkContext.getStepContext().getJobExecutionContext().put("code", "500");

			// 500 -> ERROR -> failStep
            ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
            executionContext.put("code", "500");
            return RepeatStatus.FINISHED;
        }
    };
}
public class MyJobExecutionDecider implements JobExecutionDecider {
    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        String code = jobExecution.getExecutionContext().getString("code");
        if ("200".equals(code)) {
            return new FlowExecutionStatus("OK");
        } else if ("404".equals(code)) {
            return new FlowExecutionStatus("NotFound");
        }
        return new FlowExecutionStatus("ERROR");
    }
}

四:退出状态

4.1 退出状态

public class ExitStatus implements Serializable, Comparable<ExitStatus> {

    //未知状态
	public static final ExitStatus UNKNOWN = new ExitStatus("UNKNOWN");

    //执行中
	public static final ExitStatus EXECUTING = new ExitStatus("EXECUTING");

    //执行完成
	public static final ExitStatus COMPLETED = new ExitStatus("COMPLETED");

    //无效执行
	public static final ExitStatus NOOP = new ExitStatus("NOOP");

    //执行失败
	public static final ExitStatus FAILED = new ExitStatus("FAILED");

    //编程方式主动停止
	public static final ExitStatus STOPPED = new ExitStatus("STOPPED");
}    

一般来说,作业启动之后,这些状态皆为流程自行控制。

  • 顺利结束返回:COMPLETED
  • 异常结束返回:FAILED
  • 无效执行返回:NOOP
  • 编程结束:STOPED

但是也可以通过api强制改变状态:

  • end():作业流程直接成功结束,返回状态为:COMPLETED
  • fail():作业流程直接失败结束,返回状态为:FAILED
  • stopAndRestart(step) :作业流程中断结束,返回状态:STOPPED 再次启动时,从step位置开始执行 (注意:前提是参数与Job Name一样)
//如果helloWorldStep 执行成功:下一步执行successStep 否则是failStep
@Bean
public  Job job(){
    return jobBuilderFactory.get("helloWorldJob")
            .start(helloWorldStep())
            //表示将当前本应该是失败结束的步骤直接转成正常结束--COMPLETED
            //.on("FAILED").end()  
            //表示将当前本应该是失败结束的步骤直接转成失败结束:FAILED
            //.on("FAILED").fail()  
            //表示将当前本应该是失败结束的步骤直接转成停止结束:STOPPED   里面参数表示后续要重启时, 从successStep位置开始
            .on("FAILED").stopAndRestart(successStep())
            .from(firstStep()).on("*").to(successStep())
            .end()
            .incrementer(new RunIdIncrementer())
            .build();
}

4.2 停止作业

方式一: ExitStatus.STOPPED

@Configuration
public class HelloWorldStopJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;


    @Bean
    public Job helloWorldStopJob() {
        return jobBuilderFactory.get("helloWorldStopJob")
                .start(step1()).on("STOPPED").stopAndRestart(step1())
                .from(step1()).on("*").to(step2())
                .end()
                .incrementer(new RunIdIncrementer())
                .build();
    }


    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet(tasklet1())
                .listener(stepExecutionListener())
                // 允许执行完后,运行重启
                .allowStartIfComplete(true)
                .build();
    }

    @Bean
    public StepExecutionListener stepExecutionListener() {
        return new StopStepListener();
    }


    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .tasklet(tasklet2())
                .build();
    }


    int i = -1;
    @Bean
    public Tasklet tasklet1() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("tasklet1");
                if (i < 0) {
                    ExecutionContext executionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();
                    executionContext.put("flag", "stop");
                }
                return RepeatStatus.FINISHED;
            }
        };
    }


    @Bean
    public Tasklet tasklet2() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
                System.out.println("tasklet2");
                return RepeatStatus.FINISHED;
            }
        };
    }
}


public class StopStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("beforeStep");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        Object flag = stepExecution.getExecutionContext().get("flag");
        if ("stop".equals(flag)) {
            return ExitStatus.STOPPED;
        }
        return stepExecution.getExitStatus();
    }
}

在这里插入图片描述

在这里插入图片描述
Step1正常的RepeatStatus.FINISHED了所以status=COMPLETED,但是实际退出编码是STOPPED。
在这里插入图片描述

方式二:setTerminateOnly()

StepExecution#setTerminateOnly() 给运行中的stepExecution设置停止标记,Spring Batch 识别后直接停止步骤,进而停止流程。 这种方式更简单。

@Bean
public Job helloWorldStopJob() {
    return jobBuilderFactory.get("helloWorldStopJob")
            .start(step1())
            .next(step2())
            .incrementer(new RunIdIncrementer())
            .build();
}


@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .tasklet(tasklet1())
            // 允许执行完后,运行重启
            .allowStartIfComplete(true)
            .build();
}


int i = -1;
@Bean
public Tasklet tasklet1() {
    return new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
            System.out.println("tasklet1");
            if (i < 0) {
                // 终止后
                chunkContext.getStepContext().getStepExecution().setTerminateOnly();
            }
            return RepeatStatus.FINISHED;
        }
    };
}

在这里插入图片描述

在这里插入图片描述

== 注意:使用监听器返回ExitStatus.STOPPED和使StepExecution().setTerminateOnly() 在BATCH_STEP_EXECUTION中的status字段值不一样,前者是COMPLETED后者是STOPPED。==

在这里插入图片描述

五:作业重启

一般情况下当某个作业发生异常或者终止状态时可以重启作业。

  • preventRestart() :设置作业禁止重启,重启报错:JobInstance already exists and is not restartable。
@Bean
public Job helloWorldStopJob() {
    return jobBuilderFactory.get("helloWorldStopJob")
            .preventRestart()
            .start(step1())
            .next(step2())
            .incrementer(new RunIdIncrementer())
            .build();
}
  • 设置步骤最多重启次数:startLimit(num),有些步骤可能是因为网络等问题可以通过重试解决,但如果重启一定次数后还没有解决就不允许无限的去重试。超过最大次数报错:Maximum start limit exceeded for step: step1 StartMax:3
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .startLimit(3)
            .tasklet(tasklet1())
            .build();
}
  • 设置步骤运行重启 allowStartIfComplete(true),这样在执行步骤时就不是NOOP了。
@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
            .allowStartIfComplete(true)
            .tasklet(tasklet1())
            .build();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/644395.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基础知识学习---牛客网C++面试宝典(八)操作系统--第三节

1、本栏用来记录社招找工作过程中的内容&#xff0c;包括基础知识学习以及面试问题的记录等&#xff0c;以便于后续个人回顾学习&#xff1b; 暂时只有2023年3月份&#xff0c;第一次社招找工作的过程&#xff1b; 2、个人经历&#xff1a; 研究生期间课题是SLAM在无人机上的应…

Golang每日一练(leetDay0096) 添加运算符、移动零

目录 282. 给表达式添加运算符 Expression Add Operators &#x1f31f;&#x1f31f;&#x1f31f; 283. 移动零 Move Zeroes &#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Rust每日一练 专栏 Golang每日一练 专栏 Python每日一练 专栏 C/C每日一练 …

Cenos7 --- Redis下载和安装(Linux版本)

1.下载和安装 Download | Redis进入官网Download | Redis&#xff0c; 上边点击下载7.0.11,右键复制下载衔接 https://download.redis.io/releases/redis-7.0.2.tar.gz 1.weget获取 我这个安装包放在 /tools/installbags下 cd /tools/installbags wget https://download.red…

Java进阶 —— Java多线程编程笔记

❤ 作者主页&#xff1a;欢迎来到我的技术博客&#x1f60e; ❀ 个人介绍&#xff1a;大家好&#xff0c;本人热衷于Java后端开发&#xff0c;欢迎来交流学习哦&#xff01;(&#xffe3;▽&#xffe3;)~* &#x1f34a; 如果文章对您有帮助&#xff0c;记得关注、点赞、收藏、…

【头歌-Python】9.3 中英文词云绘制(project) 第1~3关

第1关&#xff1a;词云练习1 任务描述 本关任务&#xff1a;编写一个能制作词云的小程序。 相关知识 词云 词云&#xff0c;也叫文字云&#xff0c;是一种应用广泛的数据可视化方法。是过滤掉文本中大量的低频信息&#xff0c;形成“关键词云层”或“关键词渲染”&#xf…

基于VMWare组件安装Centos7.9

1.前提条件 使用VMware进行安装&#xff0c;VMware可以自行下载&#xff0c;需要介质(VMware和CentOS7.9)的同仁&#xff0c;请留言&#xff0c;我给你下载链接。 2.CentOS7.9安装 1.打开VMware&#xff0c;点击“新建虚拟机(N)...” 2.选择“典型” &#xff0c;点击“下一步…

基础知识学习---牛客网C++面试宝典(六)操作系统--第一节

1、本栏用来记录社招找工作过程中的内容&#xff0c;包括基础知识学习以及面试问题的记录等&#xff0c;以便于后续个人回顾学习&#xff1b; 暂时只有2023年3月份&#xff0c;第一次社招找工作的过程&#xff1b; 2、个人经历&#xff1a; 研究生期间课题是SLAM在无人机上的应…

A100 GPU服务器安装GPU驱动教程

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

【OpenCV DNN】Flask 视频监控目标检测教程 06

欢迎关注『OpenCV DNN Youcans』系列&#xff0c;持续更新中 【OpenCV DNN】Flask 视频监控目标检测教程 06 3.6 OpenCVFlask实时监控和视频播放cvFlask06 项目的文件树cvFlask06 项目的程序文件cvFlask06 项目的网页模板cvFlask06 项目的运行 本系列从零开始&#xff0c;详细…

chatgpt赋能python:Python排序算法实现及其应用

Python排序算法实现及其应用 排序是计算机科学中最基础也是最常用的算法之一。在数据分析、数据挖掘和机器学习等领域&#xff0c;排序算法有着广泛的应用。Python作为一种流行的编程语言&#xff0c;在排序方面具有一定的优势。本文将介绍一些常见的Python排序算法实现以及应…

有趣的图(三)(57)

小朋友们好&#xff0c;大朋友们好&#xff01; 我是猫妹&#xff0c;一名爱上Python编程的小学生。 和猫妹学Python&#xff0c;一起趣味学编程。 今日主题 咱们之前分别学习了图的基本概念&#xff0c;和图的深度优先遍历算法dfs。 你学会了吗&#xff1f; 咱们今天要学…

Linux系统的tty架构及UART驱动详解

​一、模块硬件学习 1.1. Uart介绍 通用异步收发传输器&#xff08;Universal Asynchronous Receiver/Transmitter)&#xff0c;通常称为UART&#xff0c;是一种异步收发传输器&#xff0c;是电脑硬件的一部分。它将要传输的资料在串行通信与并行通信之间加以转换。 作为把并…

面试问题总结----C/C++部分

1、本栏用来记录社招找工作过程中的内容,包括基础知识学习以及面试问题的记录等,以便于后续个人回顾学习; 暂时只有2023年3月份,第一次社招找工作的过程; 2、个人经历: 研究生期间课题是SLAM在无人机上的应用,有接触SLAM、Linux、ROS、C/C++、DJI OSDK等; 3、参加工作后…

C++程序流程结构

目录 程序流程结构 一、选择结构 1.1 If语句 1.2 三目运算符 1.3 switch语句 二、循环结构 2.1 while 循环语句 2.2 do…while循环 2.3 for循环 2.4 嵌套循环 三、跳转语句 3.1 break语句 3.2 continue 语句 3.3 goto语句 程序流程结构 C/C支持最基本的三种程…

20230623在WIN10安装PROTEL DXP2004(STEP-BY-STEP)

20230623在WIN10安装PROTEL DXP2004&#xff08;STEP-BY-STEP&#xff09; https://xiazai.zol.com.cn/detail/43/428470.shtml Protel DXP 2004 https://www.onlinedown.net/soft/580490.htm Protel DXP 2004 DXP2004 安装步骤 Failed To load Parallel Port Driver Welcom…

vue或react中修改组件样式的方法

vue或react中修改组件样式的方法 从组件库中引入的组件深度选择器&#xff1a;deep和&#xff1a;global深度选择器在scss中的使用关键点 常规的组件样式修改vue中的样式修改react中的样式修改 从组件库中引入的组件 深度选择器&#xff1a;deep和&#xff1a;global 在 Vue …

Python 算法交易实验63 关于回测

说明 项目结束了&#xff0c;这几天把量化第一版搭起来&#xff0c;量化很重要&#xff0c;现在可以迈出第一步了。首先要关注的是回测&#xff0c;和前不久写的这篇文章呼应&#xff0c;测试的确是一个相对独立&#xff0c;又非常重要的部件。过去比较少关注在方面上&#xf…

数据分析案例-航空公司满意度数据可视化

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

【瑞萨RA_FSP】ADC——电压采集

文章目录 一、ADC简介二、ADC的结构框图1. 电压输入范围2. 工作模式3. 转换过程顺序4. 触发源5. ADC转换时间6. 数据寄存器7. 电压转换 一、ADC简介 ADC即模拟数字转换器&#xff0c;ADC英文全称&#xff08;Analog-to-digital converter&#xff09;&#xff0c; 是一种用于将…

Delphi XE10 dxLayoutControl 控件应用指南

Delphi XE10 dxLayoutControl 控件应用指南 DevExpress VCL套件是一套非常强大的界面控件&#xff0c;可惜关于Delphi开发方面的说明太少&#xff0c;有些控件使用起来一头雾水&#xff0c;不知从何下手。本节详细介绍在Delphi Xe10 Seattle中如何利用dxLayoutControl 控件来做…