Spring batch 系列文章
Spring Batch教程(一) 简单的介绍以及通过springbatch将xml文件转成txt文件
Spring Batch教程(二)示例:将txt文件转成xml文件以及读取xml文件内容存储到数据库mysql
Spring Batch教程(三)示例:从mysql中读取数据写入文本和从多个文本中读取内容写入mysql
Spring Batch教程(四)tasklet使用示例:spring batch的定时任务使用
Spring Batch教程(五)spring boot实现batch功能注解示例:读写文本文件
Spring Batch教程(六)spring boot实现batch功能注解示例:读文件写入mysql
文章目录
- Spring batch 系列文章
- 一、示例1:spring batch的定时任务使用
- 1、maven依赖
- 2、准备测试数据
- 3、PersonInfo bean
- 4、建立FieldSetMapper
- 5、创建ItemProcessor实现类
- 6、创建spring batch job
- 7、创建tasklet用于归档
- 8、创建QuartzJobBean
- 9、创建ApplicationContextAware接口实现类
- 10、创建FactoryBean的触发器
- 11、进行job的配置
- 1)、job配置
- 2)、quartz配置
- 12、创建一个运行job的main类
- 13、验证
- 1)、验证步骤
- 1)、控制台输出
- 2)、程序结果输出
本文介绍了1个示例,即spring batch的一个定时任务示例,其中通过tasklet监控任务的运行情况
本文使用的是jdk8版本,最新版本的spring core和springb batch用不了。
一、示例1:spring batch的定时任务使用
1、maven依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<springframework.version>5.2.25.RELEASE</springframework.version>
<joda-time.version>2.12.5</joda-time.version>
<quartz.version>2.2.1</quartz.version>
<commons-io.version>2.4</commons-io.version>
<springbatch.version>4.2.8.RELEASE</springbatch.version>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${springbatch.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-infrastructure</artifactId>
<version>${springbatch.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${springframework.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${springframework.version}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda-time.version}</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>${quartz.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
</dependencies>
2、准备测试数据
3、PersonInfo bean
import lombok.Data;
/**
*
* @author alanchan
*
*/
@Data
public class PersonInfo {
private int id;
private String name;
private String birthday;
private double salary;
}
4、建立FieldSetMapper
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
import com.win.tasklet.bean.PersonInfo;
/**
*
* @author alanchan
*
*/
public class PersonInfoFieldSetMapper implements FieldSetMapper<PersonInfo> {
public PersonInfo mapFieldSet(FieldSet fieldSet) throws BindException {
PersonInfo personInfo = new PersonInfo();
personInfo.setName(fieldSet.readString(0));
personInfo.setBirthday(fieldSet.readString(1));
personInfo.setSalary(fieldSet.readDouble(2));
return personInfo;
}
}
5、创建ItemProcessor实现类
import org.springframework.batch.item.ItemProcessor;
import com.win.tasklet.bean.PersonInfo;
/**
*
* @author alanchan
*
*/
public class PersonInfoItemProcessor implements ItemProcessor<PersonInfo, PersonInfo> {
public PersonInfo process(PersonInfo personInfo) throws Exception {
System.out.println("Processing result :" + personInfo);
if (personInfo.getSalary() < 60) {
PersonInfo tempPersonInfo = new PersonInfo();
tempPersonInfo.setName(personInfo.getName());
tempPersonInfo.setBirthday(personInfo.getBirthday());
tempPersonInfo.setSalary(personInfo.getSalary() * 1.5);
personInfo = tempPersonInfo;
}
return personInfo;
}
}
6、创建spring batch job
import java.io.File;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.JobLocator;
import org.springframework.batch.core.launch.JobLauncher;
/**
*
* @author alanchan
*
*/
public class SpringBatchJob {
private String jobName;
private JobLocator jobLocator;
private JobLauncher jobLauncher;
private File contentDirectory;
private String directoryPath = "D:/testtasklet/inputFiles";
public void init() {
contentDirectory = new File(directoryPath);
}
boolean fileFound = false;
public void performJob() {
System.out.println("开始任务 Job");
try {
if (contentDirectory == null || !contentDirectory.isDirectory()) {
System.err.println("输入目录不存在,任务终止");
}
fileFound = false;
for (File file : contentDirectory.listFiles()) {
if (file.isFile()) {
System.out.println("File found :" + file.getAbsolutePath());
fileFound = true;
JobParameter param = new JobParameter(file.getAbsolutePath());
Map<String, JobParameter> map = new HashMap<String, JobParameter>();
map.put("personInfoInputFile", param);
map.put("date", new JobParameter(new Date()));
JobExecution result = jobLauncher.run(jobLocator.getJob(jobName), new JobParameters(map));
System.out.println(" Job 完成 : " + result.toString());
}
}
if (!fileFound) {
System.out.println("输入目录不存在,任务终止");
}
} catch (JobExecutionException ex) {
System.out.println("任务出现致命异常 :" + ex);
}
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public void setJobLocator(JobLocator jobLocator) {
this.jobLocator = jobLocator;
}
public void setJobLauncher(JobLauncher jobLauncher) {
this.jobLauncher = jobLauncher;
}
}
7、创建tasklet用于归档
功能是一旦处理完成则进行归档
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
/**
*
* @author alanchan
*
*/
public class FileArchivingTasklet implements Tasklet {
private File archiveDirectory;
private String archiveDirectoryPath = "d:/testtasklet/archivedFiles";
public void init() {
archiveDirectory = new File(archiveDirectoryPath);
}
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Map<String, Object> map = chunkContext.getStepContext().getJobParameters();
String fileName = (String) map.get("personInfoInputFile");
archiveFile(fileName);
return RepeatStatus.FINISHED;
}
public void archiveFile(String fileName) throws IOException {
System.out.println("Archiving file: " + fileName);
File file = new File(fileName);
File targetFile = new File(archiveDirectory, file.getName() + getSuffix());
FileUtils.moveFile(file, targetFile);
}
public String getSuffix() {
return "_" + new SimpleDateFormat("yyyyMMddHHmmss").format(new DateTime(DateTimeZone.UTC).toDate());
}
}
8、创建QuartzJobBean
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;
import com.win.tasklet.SpringBatchJob;
/**
*
* @author alanchan
*
*/
@DisallowConcurrentExecution
public class SchedulerJob extends QuartzJobBean {
private String batchJob;
public void setBatchJob(String batchJob) {
this.batchJob = batchJob;
}
@Override
protected void executeInternal(JobExecutionContext context) {
ApplicationContext applicationContext = ApplicationContextUtil.getApplicationContext();
SpringBatchJob job = applicationContext.getBean(batchJob, SpringBatchJob.class);
System.out.println("Quartz job started: " + job);
try {
job.performJob();
} catch (Exception exception) {
System.out.println("Job " + batchJob + " 不能执行 : " + exception.getMessage());
}
System.out.println("Quartz job 终止");
}
}
9、创建ApplicationContextAware接口实现类
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
/**
*
* @author alanchan
*
*/
public class ApplicationContextUtil implements ApplicationContextAware {
private static ApplicationContextUtil instance;
private ApplicationContext applicationContext;
private static synchronized ApplicationContextUtil getInstance() {
if (instance == null) {
instance = new ApplicationContextUtil();
}
return instance;
}
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (getInstance().applicationContext == null) {
getInstance().applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return getInstance().applicationContext;
}
}
10、创建FactoryBean的触发器
import org.quartz.CronScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.springframework.beans.factory.FactoryBean;
/**
*
* @author alanchan
*
*/
public class CronTriggerFactoryBean implements FactoryBean<Trigger> {
private final String jobName;
private final String cronExpression;
public CronTriggerFactoryBean(String jobName, String cronExpression) {
this.jobName = jobName;
this.cronExpression = cronExpression;
}
public Trigger getObject() throws Exception {
return TriggerBuilder.newTrigger().forJob(jobName, "DEFAULT").withIdentity(jobName + "Trigger", "DEFAULT").withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.build();
}
public Class<?> getObjectType() {
return Trigger.class;
}
public boolean isSingleton() {
return false;
}
}
11、进行job的配置
1)、job配置
文件位置:/sping-batch/src/main/resources/batch-context.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"
default-autowire="byName" default-init-method="init">
<!-- JobRepository and JobLauncher are configuration/setup classes -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry"/>
<!-- A BeanPostProcessor that registers Job beans with a JobRegistry. -->
<bean class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
<property name="jobRegistry" ref="jobRegistry"/>
</bean>
<!-- Thanks to this bean, you can now refer dynamic files in input folder whose names can be different on each run-->
<bean id="inputPersonInfoJobFile" class="org.springframework.core.io.FileSystemResource" scope="step">
<constructor-arg value="#{jobParameters[personInfoInputFile]}"/>
</bean>
<!-- ItemReader reads a complete line one by one from input file -->
<bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="resource" ref="inputPersonInfoJobFile" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="fieldSetMapper">
<!-- Mapper which maps each individual items in a record to properties in POJO -->
<bean class="com.win.tasklet.PersonInfoFieldSetMapper" />
</property>
<property name="lineTokenizer">
<!-- A tokenizer class to be used when items in input record are separated by specific characters -->
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="delimiter" value="|" />
</bean>
</property>
</bean>
</property>
</bean>
<!-- ItemWriter writes a line into output flat file -->
<bean id="flatFileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
<property name="resource" value="file:d:/testtasklet/personInfoOutput.txt" />
<property name="appendAllowed" value="true" />
<property name="lineAggregator">
<!-- An Aggregator which converts an object into delimited list of strings -->
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value="|" />
<property name="fieldExtractor">
<!-- Extractor which returns the value of beans property through reflection -->
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="name,salary,birthday" />
</bean>
</property>
</bean>
</property>
</bean>
<!-- Optional ItemProcessor to perform business logic/filtering on the input records -->
<bean id="itemProcessor" class="com.win.tasklet.PersonInfoItemProcessor" />
<!-- Step will need a transaction manager -->
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<bean id="fileArchivingTasklet" class="com.win.tasklet.FileArchivingTasklet" />
<!-- Actual Job -->
<batch:job id="personInfoBatchJob" restartable="true">
<batch:step id="processFiles" next="archiveFiles">
<batch:tasklet allow-start-if-complete="false" start-limit="1" transaction-manager="transactionManager">
<batch:chunk reader="flatFileItemReader" writer="flatFileItemWriter" processor="itemProcessor" commit-interval="10" />
</batch:tasklet>
</batch:step>
<batch:step id="archiveFiles">
<batch:tasklet ref="fileArchivingTasklet" />
</batch:step>
</batch:job>
</beans>
2)、quartz配置
任务每分钟运行一次
文件位置;/sping-batch/src/main/resources/quartz-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"
default-autowire="byName" default-init-method="init">
<import resource="batch-context.xml" />
<bean id="applicationContextUtil" class="com.win.tasklet.quartz.ApplicationContextUtil" />
<bean id="springBatchJob" class="com.win.tasklet.SpringBatchJob">
<property name="jobName" value="personInfoBatchJob" />
<property name="jobLocator" ref="jobRegistry" />
<property name="jobLauncher" ref="jobLauncher" />
</bean>
<bean name="taskJobDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<property name="jobClass" value="com.win.tasklet.quartz.SchedulerJob" />
<property name="jobDataMap">
<map>
<entry key="batchJob" value="springBatchJob" />
</map>
</property>
<property name="durability" value="true" />
</bean>
<!-- Run the job every 1 minute -->
<bean id="taskCronTrigger" class="com.win.tasklet.quartz.CronTriggerFactoryBean">
<constructor-arg index="0" value="taskJobDetail" />
<constructor-arg index="1" value="0 0/1 * * * ?" />
</bean>
<bean id="quartzSchedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="jobDetails">
<list>
<ref bean="taskJobDetail" />
</list>
</property>
<property name="triggers">
<list>
<ref bean="taskCronTrigger" />
</list>
</property>
<property name="quartzProperties">
<props>
<prop key="org.quartz.jobStore.class">org.quartz.simpl.RAMJobStore</prop>
</props>
</property>
</bean>
</beans>
12、创建一个运行job的main类
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
*
* @author alanchan
*
*/
public class App {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("quartz-context.xml");
}
}
13、验证
1)、验证步骤
- 创建源数据目录
D:/testtasklet/inputFiles - 创建归档文件目录
D:/testtasklet/archivedFiles - 启动应用程序
- 将测试文件放入源数据目录
- 观察控制台输出和程序结果输出
- 观察定时任务是否是一分钟运行一次
1)、控制台输出
- 放一个文件后,控制台输出如下
Quartz job started: com.win.tasklet.SpringBatchJob@394e0afd
开始任务 Job
文件目录 :D:\testtasklet\inputFiles\personinfo-2.txt
jobName : personInfoBatchJob
Processing result :PersonInfo(id=0, name=zhangsan, birthday=1998-03-01, salary=92.0)
Processing result :PersonInfo(id=0, name=lisi, birthday=1995-08-01, salary=60.0)
Archiving file: D:\testtasklet\inputFiles\personinfo-2.txt
Job 完成 : JobExecution: id=0, version=2, startTime=Fri Jul 21 14:55:00 CST 2023, endTime=Fri Jul 21 14:55:00 CST 2023, lastUpdated=Fri Jul 21 14:55:00 CST 2023, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, Job=[personInfoBatchJob]], jobParameters=[{date=1689922500016, personInfoInputFile=D:\testtasklet\inputFiles\personinfo-2.txt}]
Quartz job 终止
- 如果在超过一分钟没有输入源文件的时候,控制台输出如下
Quartz job started: com.win.tasklet.SpringBatchJob@394e0afd
开始任务 Job
输入目录不存在,任务终止
Quartz job 终止
- 再次放入一个文件
Quartz job started: com.win.tasklet.SpringBatchJob@394e0afd
开始任务 Job
文件目录 :D:\testtasklet\inputFiles\personinfo-3.txt
jobName : personInfoBatchJob
Processing result :PersonInfo(id=0, name=wangking, birthday=1989-04-01, salary=18.0)
Processing result :PersonInfo(id=0, name=tony, birthday=1995-08-01, salary=86.0)
Archiving file: D:\testtasklet\inputFiles\personinfo-3.txt
Job 完成 : JobExecution: id=1, version=2, startTime=Fri Jul 21 15:17:00 CST 2023, endTime=Fri Jul 21 15:17:00 CST 2023, lastUpdated=Fri Jul 21 15:17:00 CST 2023, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=1, version=0, Job=[personInfoBatchJob]], jobParameters=[{date=1689923820012, personInfoInputFile=D:\testtasklet\inputFiles\personinfo-3.txt}]
Quartz job 终止
2)、程序结果输出
放入第一个文件的时候,发现如下
-
源文件目录移动到归档目录,并改名为
文件内容与输入源文件内容一致。 -
输出文件的内容如下
再次放入源数据目录一个文件 -
源文件目录中的输入文件被移走到归档目录
-
归档目录中会把该文件改名
-
输出文件内容
文件内容会追加,具体如下
以上,完成了spring batch的一个定时任务示例,其中通过tasklet监控任务的运行情况。