Spring Batch 指南

news2024/11/16 3:19:20

SpringBatch 介绍

目前,Spring Batch是批处理框架界为数不多的优秀框架(Java语言开发)。

Spring Batch 是一个轻量级的、完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。

Spring Batch是Spring的一个子项目,使用Java语言并基于Spring框架为基础开发,使得已经使用 Spring 框架的开发者或者企业更容易访问和利用企业服务;

Spring Batch 提供了大量可重用的组件,包括了日志、追踪、事务、任务作业统计、任务重启、跳过、重复、资源管理。

通过 Spring Batch 能够支持简单的、复杂的和大数据量的批处理作业。同时它也提供了优化和分片技术用于实现高性能的批处理任务。

一个典型的批处理应用程序大致如下:

  1. 从数据库,文件或队列中读取大量记录。
  2. 以某种方式处理数据。
  3. 以修改之后的形式写回数据。

在SpringBatch 中 Job是step的运行框架 ,而具体的运行业务是由step进行完成

Step

Step
下图就是Step的简要构造

在这里插入图片描述

一个Step通常涵盖三个部分:读数据(Reader)、处理数据(Processor)和写数据(Writer)。但是并不是所有的Step都需要自身来完成数据的处理,比如存储过程等方式是通过外部功能来完成,因此Spring Batch提供了2种Step的处理方式:
1)面向分片的ChunkStep,
2)面向过程的TaskletStep。

一般使用ChunkStep

ChunkStep

在Step中数据是按记录(按行)处理的,但是每条记录处理完毕之后马上提交事物反而会导致IO的巨大压力。因此Spring Batch提供了数据处理的分片功能。设置了分片之后,一次工作会从Read开始,然后交由给Processor处理。处理完毕后会进行聚合,待聚合到一定的数量的数据之后一次性调用Write将数据提交到物理数据库。
如果在聚合数据期间出现任何错误,所有的这些数据都将不执行写入。

面向分片的处理过程图

在这里插入图片描述

@Bean
public Step testStep(PlatformTransactionManager transactionManager) {
	return stepBuilderFactory.get("testStep")
				.transactionManager(transactionManager)
				.<String, String>chunk(int) 
				.reader(testReader()) 
				.processor(testProcessor)
				.writer(testWriter()) 
				.build();
}

transactionManager:使用默认的 PlatformTransactionManager 对事物进行管理。当配置好事物之后Spring Batch会自动对事物进行管理,无需开发人员显示操作。

提交间隔

Step使用PlatformTransactionManager管理事物。每次事物提交的间隔根据chunk方法中配置的数据执行。提交间隔设置太小,那么会浪费需要多不必要的资源,提交间隔设置的太长,会导致事物链太长占用空间,并且出现失败会导致大量数据回滚。一般设为10到20。

stepBuilderFactory.get("testStep")
	.<String, String>chunk(int)
	.reader(testReader()) 
	.processor(testProcessor)
	.writer(testWriter()) 
	.startLimit(1)
	.build();

step重启次数

某些Step可能用于处理一些先决的任务,所以当Job再次重启时这Step就没必要再执行,可以通过设置startLimit来限定某个Step重启的次数。当设置为1时候表示仅仅运行一次,而出现重启时将不再执行。

stepBuilderFactory.get("testStep")
	.<String, String>chunk(int)
	.reader(testReader()) 
	.processor(testProcessor)
	.writer(testWriter()) 
	.startLimit(1)
	.build();

step每次重启都运行

可以通过设置allow-start-if-complete为true告知框架每次重启该Step都要执行

stepBuilderFactory.get("testStep")
	 .<String, String>chunk(int)
	 .reader(testReader()) 
	 .processor(testProcessor)
	 .writer(testWriter()) 
	 .allowStartIfComplete(true)
	 .build();

step失败后跳过

stepBuilderFactory.get("testStep")
 	 .<String, String>chunk(int)
 	 .reader(testReader()) 
	 .processor(testProcessor)
 	 .writer(testWriter()) 
 	 .skipLimit(10)
	 .skip(Exception.class)
	 .noSkip(FileNotFoundException.class)
 	 .build();

skip-limit(skipLimit方法)配置的参数表示当跳过的次数超过数值时则会导致整个Step失败,从而停止继续运行
skip表示要当捕捉到Exception异常就跳过。但是Exception有很多继承类,此时可以使用noSkip方法指定某些异常不能跳过

step重试

stepBuilderFactory.get("testStep")
 	 .<String, String>chunk(int)
  	 .reader(testReader()) 
  	 .processor(testProcessor)
   	 .writer(testWriter()) 
	    .faultTolerant()
	    .retryLimit(3)
	    .retry(DeadlockLoserDataAccessException.class)
   	 .build();

retry(DeadlockLoserDataAccessException.class)表示只有捕捉到该异常才会重试,retryLimit(3)表示最多重试3次,faultTolerant()表示启用对应的容错功能。

step控制不回滚

stepBuilderFactory.get("testStep")
	    .<String, String>chunk(int)
   	 .reader(testReader()) 
   	 .processor(testProcessor)
        .writer(testWriter()) 
  	 .faultTolerant()
	   .noRollback(ValidationException.class) //不必回滚的异常
       .build();

noRollback属性为Step提供了不必进行事物回滚的异常配置

step数据重读

stepBuilderFactory.get("testStep")
 	  .<String, String>chunk(int)
    .reader(testReader()) 
    .processor(testProcessor)
    .writer(testWriter()) 
	  .readerIsTransactionalQueue() //数据重读
    .build();

默认情况下如果错误不是发生在Reader阶段,那么没必要再去重新读取一次数据。但是某些场景下需要Reader部分也需要重新执行,比如Reader是从一个JMS队列中消费消息,当发生回滚的时候消息也会在队列上重放,因此也要将Reader纳入到回滚的事物中,根据这个场景可以使用readerIsTransactionalQueue来配置数据重读

step事务属性

	//配置事物属性
	DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
	attribute.setPropagationBehavior(Propagation.REQUIRED.value());
	attribute.setIsolationLevel(Isolation.DEFAULT.value());
	attribute.setTimeout(30);

	return this.stepBuilderFactory.get("testStep")
			.<String, String>chunk(int)
  		.reader(testReader()) 
 			.processor(testProcessor)
    	.writer(testWriter()) 
			.transactionAttribute(attribute) //设置事物属性
			.build();

step顺序执行

在这里插入图片描述

jobBuilderFactory.get("job")
		.start(stepA())
		.next(stepB()) //顺序执行
		.next(stepC())
		.build();

step条件执行

在这里插入图片描述

jobBuilderFactory.get("job")
				.start(stepA()) //启动时执行的step
				.on("*").to(stepB()) //默认跳转到stepB
				.from(stepA()).on("FAILED").to(stepC()) //当返回的ExitStatus为"FAILED"时,执行。
				.end()
				.build();

End退出

默认情况下(没有使用end、fail方法结束),Job要顺序执行直到退出,这个退出称为end。这个时候,BatchStatus=COMPLETED、ExitStatus=COMPLETED,表示成功执行。除了Step链式处理自然退出,也可以显示调用end来退出系统。

jobBuilderFactory.get("job")
		.start(step1()) //启动
		.next(step2()) //顺序执行
		.on("FAILED").end()
		.from(step2()).on("*").to(step3()) //条件执行
		.end()
		.build();

step1到step2是顺序执行,当step2的exitStatus返回"FAILED"时则直接End退出。其他情况执行Step3。

Fail退出

除了end还可以使用fail退出,这个时候,BatchStatus=FAILED、ExitStatus=EARLY TERMINATION,表示执行失败。这个状态与End最大的区别是Job会尝试重启执行新的JobExecution。看下面代码的例子:

@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
			.start(step1()) //执行step1
			.next(step2()).on("FAILED").fail() //step2的ExitStatus=FAILED 执行fail
			.from(step2()).on("*").to(step3()) //否则执行step3
			.end()
			.build();
}

@StepScope和@JobScope

@StepScope

Spring Batch框架只有在批处理时才需要实例化job以及对应的最底层处理单位(ItemReader,ItemProcessor,ItemWriter,Tasklet)且在job启动后的运行参数一旦确定便无法修改。

为了使每一次启动job时使处理单位的参数可以动态修改(比如第一次job启动时参数birthDate=“20210101”,第二次job参数启动时参数改为birthDate=“20210102”)。 所以设计了@StepScope配合@Value(“#{jobParameters[‘birthDate’]}”) 从job的启动参数中获取所需参数。

@StepScope注解使用注意事项:

只能用在最底层处理单位(ItemReader,ItemProcessor,ItemWriter,Tasklet)的方法上,配合@Bean使用。

被@StepScope注解修饰的bean只会在Step被加载时进行初始化,Step处理完成后便会被销毁(也就是说被@StepScope注解所修饰bean的生命周期与Step的生命周期保持同步)

@JobScope

Job Scope的概念和 Step Scope类似,都是用于标识在到了某个执行时间段再添加和注入Bean。

@JobScope用于告知框架知道JobInstance存在时候才初始化对应的@Bean

@JobScope
@Bean
// 初始化获取 jobParameters中的参数
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

@JobScope
@Bean
// 初始化获取jobExecutionContext中的参数
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

Listener监听器

Spring Batch提供了多种监听器Listener,用于在任务处理过程中触发我们的逻辑代码。具体可以参考下表:

监听器具体说明
JobExecutionListener在Job开始之前(beforeJob)和之后(afterJob)触发
StepExecutionListener在Step开始之前(beforeStep)和之后(afterStep)触发
ChunkListener在 Chunk 开始之前(beforeChunk),之后(afterChunk)和错误后(afterChunkError)触发
ItemReadListener在 Read 开始之前(beforeRead>,之后(afterRead)和错误后(onReadError)触发
ItemProcessListener在 Processor 开始之前(beforeProcess),之后(afterProcess)和错误后(onProcessError)触发
ItemWriteListener在 Writer 开始之前(beforeWrite),之后(afterWrite)和错误后(onWriteError)触发
SkipListener在 Skip(reder)时候,在 Skip(writer)时候,在 Skip(processor)时候

Listener 的执行顺序

  • JobExecutionListener.beforeJob()
  • StepExecutionListener.beforeStep()
  • ChunkListener.beforeChunk()
  • ItemReaderListener.beforeReader()
  • ItemReaderListener.afterReader()
  • ItemProcessListener.beforeProcess()
  • ItemProcessListener.afterProcess()
  • ItemWriterListener.beforeWriter()
  • ItemWriterListener.afterWriter()
  • ChunkListener.afterChunk()
  • StepExecutionListener.afterStep()
  • JobExecutionListener.afterJob()

需要注意的是如果在同一个级别定义多个 Listener,比如 Parent Step中定一个了一个 StepListener,自身又定义了一个 StepListener,如果需要两个都执行,那么需要在自身 listener 上加上merge 属性。

获取JobParameters参数

为了能在ItemReader, ItemWriter, ItemProcessor中读取JobParameters中的参数,有三个方法:

  1. 使用 @BeforeStep注解
@Component
@StepScope
public class PersonItemProcessor implements ItemProcessor<Person, Person> {
  
	JobParameters jobParameters;
  
	@BeforeStep
	public void beforeStep(final StepExecution stepExecution) {
	    jobParameters = stepExecution.getJobParameters();
	    log.info("jobParameters: {}", jobParameters);
	}
  1. 实现StepExecutionListener接口
@Component
@StepScope
public class MyStepExecutionListener implements StepExecutionListener {
  
  private StepExecution stepExecution;

  @Override
  public void beforeStep(StepExecution stepExecution) {
     this.stepExecution = stepExecution;
     jobParameters = stepExecution.getJobParameters();
  }
}
  1. 把相应的Bean定义成@StepScope,再用@Value直接注入
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;
  
	@Bean 
  @StepScope
	public FlatFileItemReader<Person> reader() {
		return new FlatFileItemReaderBuilder<Person>()
			.name("personItemReader")
			.resource(new ClassPathResource("sample-data.csv"))
			.delimited()
			.names(new String[]{"firstName", "lastName"})
			.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
				setTargetType(Person.class);
			}})
			.build();
	}

	@Bean
  @StepScope
	public PersonItemProcessor processor() {
		return new PersonItemProcessor();
	}

	@Bean 
  @StepScope
	public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
		return new JdbcBatchItemWriterBuilder<Person>()
			.itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
			.sql("...")
			.dataSource(dataSource)
			.build();
	}

	@Bean("importUserJob") //这个不要定义StepScope
	public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
		return jobBuilderFactory.get("importUserJob")
			.incrementer(new RunIdIncrementer())
			.listener(listener)
			.flow(step1)
			.end()
			.build();
	}

	@Bean //这个不要定义StepScope
	public Step step1(JdbcBatchItemWriter<Person> writer) {
  
  }

重点注意的是,要把ItemReader, ItemWriter, ItemProcessor声明成StepScope,而Job和Step本身不能申明成StepScope
然后在ItemReader, ItemWriter, ItemProcessor中可以直接注入参数了:

public class PersonItemProcessor implements ItemProcessor<Person, Person> {
  
	@Value("#{jobParameters['dataUnitId']}")
	private Long dataUnitId;

}

参考知识:

Spring Batch 教程之配置Step

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/438915.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

.Net 6.0 部署Linux+Nginx +PM2教程

今天带大家将本地.Net6.0项目部署到Linux系统中,其中有用到Nginx反向代理和PM2进程管理工具,希望本偏文章能对你有所帮助,成为你成功路上的垫脚石! 背景: 在.Net 5.0横空出世之后,.Net已经支持夸平台了,身为一名合格的码农,你敢说你不会用Linux? 哈哈哈开个玩笑,因为工作最近接…

Idea启动运行报错:Error:java: 无效的源发行版: 13

最近在做Springboot项目时&#xff0c;常常出现上述错误&#xff0c;小编也不知道怎么回事&#xff0c;到网上找了这个方面的解决办法&#xff0c;但是却发现根本解决不了&#xff0c;最终通过小编多次尝试&#xff0c;终于发现&#xff0c;为什么会报这个错误。(应该是Java版本…

基于广泛数据增强的新型白质束ONE-SHOT分割

文章目录 One-Shot Segmentation of Novel White Matter Tracts via Extensive Data Augmentation摘要方法One-Shot分割的广泛数据增强 实验结果 One-Shot Segmentation of Novel White Matter Tracts via Extensive Data Augmentation 摘要 探索了新WM束的One-Shot分割问题由…

Kubernetes 如何保障容器可用性?一文介绍探针的使用

有时候&#xff0c;应用因为无限循环或死锁而停止响应&#xff0c;为确保应用在这种情况下可以重新启动&#xff0c;需要有一种机制检查应用程序的运行状况&#xff0c;而不是依赖应用程序内部的检测。 K8s 主要提供了三种探针来针对这种机制&#xff1a; 存活探针&#xff1a…

Swift之深入解析内存安全

一、内存安全 一般来说&#xff0c;Swift 会阻止代码中的不安全行为。例如&#xff0c;Swift 会保证变量在被使用前已经初始化&#xff0c;在释放某变量后其内存也会变得不可访问&#xff0c;以及检查数组索引是否存在越界错误。Swift 还通过要求修改内存中位置的代码具有对该…

通达信缠论顶底分型选股公式(一笔优化版)

在前文《缠论底分型选股公式&#xff0c;处理了包含关系》中介绍了缠论底分型&#xff0c;并编写了选股公式。底分型条件比较容易满足&#xff0c;因此产生的信号比较多。有热心网友提出&#xff0c;可以用顶底分型构成一笔过滤信号。 缠论一笔的构成条件&#xff1a;两个相邻…

砷化镓,锑化铟的能带结构

目录 回顾 正课 1.锑化铟的能带结构 2.砷化镓的能带结构 3.混合晶体的能带结构 回顾 从结合力上看由共价键和离子键混合 化合物半导体从结构上来说具有闪锌矿结构 三五族半导体的能带结构&#xff1a; 1.导带结构 2.价带结构 3.禁带宽度 正课 能带结构的共同特征&#…

docker容器与网络模式|磁盘使用|内存使用|清理

docker容器与网络模式|磁盘使用|内存使用|清理 一 docker 网络模式二 docker的网络模式分类三 网络模式详解3.1host模式&#xff1a;3.2 container模式container模式操作 3.3 none模式3.4 brideg模式3.5 自定义网络 四 容器资源控制4.1 docker容器cpu压力测试4.2 设置CPU资源占…

SpringCloud 微服务随机掉线排查过程

一、背景 我们的业务共使用 11 台&#xff08;阿里云&#xff09;服务器&#xff0c;使用 SpringcloudAlibaba 构建微服务集群, 共计 60 个微服务, 全部注册在同一个 Nacos 集群。 流量转发路径&#xff1a;nginx -> spring-gateway -> 业务微服务。 使用的版本如下&a…

IGA_PLSM3D的理解1

文章目录 前言一、IgaTop3D_FAST.m给的参数二、Material properties 材料特性对Geom_Mod3D的理解 三、IGA准备对Pre_IGA3D的理解 输出1-----CtrPts&#xff1a; 输出2-----Ele&#xff1a; 输出3-----GauPts&#xff1a; 对Boun_Cond3D的理解 输出1-----DBoudary&#xff1a; 输…

Java入坑之类的派生与继承

一、继承 1.1继承的概念 Java中的继承&#xff1a;子类就是享有父类的属性和方法&#xff0c;并且还存在一定的属性和方法的扩展。 Subclass&#xff0c;从另一个类派生出的类&#xff0c;称为子类(派生类&#xff0c;扩展类等) Superclass&#xff0c;派生子类的类&#xff…

PC主流同步软件评测

因为一个要把超过13G的众多零散文件同步到webdav网盘的需求&#xff0c;将市面上一些知名的同步软件试用测试了一番&#xff0c;最终发现只有Syncovery满足需求 先把参与测试的同步软件列一下&#xff1a; GoodSync、FreeFileSync、微软SyncToy、KLS Backup、SecondCopy、Sma…

coreboot seabios

seabios现在是很多虚拟机的默认启动bios&#xff0c;这跟它的短小精干有很大关系&#xff0c;也跟它提供比较完备的legacy支持有关。 按照以下步骤把seabios制作成coreboot的payload&#xff0c; 下载seabios代码&#xff0c; http://code.coreboot.org/p/seabios/downloads…

学生成绩管理系统【GUI/Swing+MySQL】(Java课设)

系统类型 Swing窗口类型Mysql数据库存储数据 使用范围 适合作为Java课设&#xff01;&#xff01;&#xff01; 部署环境 jdk1.8Mysql8.0Idea或eclipsejdbc 运行效果 本系统源码地址&#xff1a;https://download.csdn.net/download/qq_50954361/87700420 更多系统资源库…

【Cpython的GIL详细了解一下?】

简单解释 全局解释器锁&#xff08;Global Interpreter Lock&#xff0c;简称 GIL&#xff09;是 CPython 解释器内部的一个同步原语&#xff0c;它用于在同一时间只允许一个线程执行 Python 字节码。 GIL 的工作原理如下&#xff1a; 在 CPython 中&#xff0c;每个线程都需…

systemctl 命令设置开机自启动失败

1.案例现象 我在 3 月 31日的时候发表了一篇《shell 脚本之一键部署安装 Nginx 》&#xff0c;介绍了如何通过 shell 脚本一键安装 Nginx 我脚本中执行了 Nginx 开机自启动的命令&#xff0c;当我使用 systemctl status nginx 命令复核的时候&#xff0c;我发现 Nginx 服务设…

C# NetCore XML 反序列化解析错误:<xml xmlns=‘‘> was not expected 及 Encoding=utf-16问题

xml帮助类在最后 刚好有业务需要解析xml文件&#xff0c;于是找到帮助类代码开始尝试解析文件&#xff0c;总是出现异常&#xff1a;<xml xmlnsxxxxxxxxxxxxxxxx> was not expected&#xff0c;开始寻找解决方案&#xff1a; 要使结果正确&#xff0c;必须满足两个条件…

【Qt】QString与QChar的源码学习及二者与Unicode的关系【2023.04.20】

简介 关于QString乱码的一些补充。主要就两点&#xff0c;QChar、QString底层存储的字符都是16进制的Unicode编码。 QChar QChar对应16位的Unicode字符集。 The QChar class provides a 16-bit Unicode character. In Qt, Unicode characters are 16-bit entities without an…

Redis RDB 和 AOF原理讲解

redis提供了两种持久化方式。 aof&#xff08;APPEND ON FILE&#xff09;持久化&#xff1a;原理是将redis的操作以命令的方式写入aof文件中&#xff0c;追加。 rdb&#xff08;Redis DataBase&#xff09;内存快照持久化&#xff0c;就是将redis的内存中的数据全量拷贝一份存…

【C++STL精讲】stack与queue的基本使用及模拟实现

文章目录 &#x1f490;专栏导读&#x1f490;文章导读&#x1f337;stack是什么&#xff1f;&#x1f337;stack的基本使用&#x1f337;stack的模拟实现&#x1f337;queue是什么&#xff1f;&#x1f337;queue的基本使用&#x1f337;queue的模拟实现 &#x1f490;专栏导读…