Spring Boot 整合 Spring Batch 实现批处理
- 简介
- 项目搭建
- 步骤
简介
项目搭建
参考博客:《【场景实战】Spring Boot + Spring Batch 实现批处理任务,保姆级教程》
步骤
1.建表
建表sql
-- Student table: one row per student, keyed by an auto-increment id.
-- The three score columns are stored as short strings (varchar(4)), not numbers,
-- so any numeric comparison has to parse them on the application side.
CREATE TABLE `student` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(100) NOT NULL COMMENT '姓名',
`class_name` varchar(20) DEFAULT NULL COMMENT '班级名称',
`china_score` varchar(4) DEFAULT NULL COMMENT '语文成绩',
`math_score` varchar(4) DEFAULT NULL COMMENT '数学成绩',
`english_score` varchar(4) DEFAULT NULL COMMENT '英语成绩',
-- Sex flag: 0 = male, 1 = female (see the column COMMENT).
`sex` tinyint(1) NOT NULL COMMENT '性别:0-男,1-女',
`birthday` date NOT NULL COMMENT '生日',
`card_id` varchar(20) NOT NULL COMMENT '身份证号',
`phone` varchar(20) NOT NULL COMMENT '手机号',
PRIMARY KEY (`id`),
-- The ID-card number must be unique, so inserting the same card_id twice fails.
UNIQUE KEY `card_id` (`card_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='学生表'
2.pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- The Spring Boot parent manages the versions of all spring-boot-starter-* artifacts. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
    </parent>

    <groupId>org.example</groupId>
    <artifactId>springbatch_study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Boot starters: version inherited from the parent, so it cannot drift from it. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!-- Test libraries must not end up on the runtime classpath. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Explicitly pinned versions (kept as in the original build). -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.3</version>
        </dependency>
        <!-- Swagger UI page (knife4j) -->
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>knife4j-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
        <dependency>
            <groupId>org.hibernate.validator</groupId>
            <artifactId>hibernate-validator</artifactId>
            <version>6.2.2.Final</version>
        </dependency>
    </dependencies>
</project>
3.启动类
/**
 * Application entry point: boots the Spring context with Swagger 2 support enabled.
 */
@SpringBootApplication
@EnableSwagger2
public class BatchService {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(BatchService.class);
        application.run(args);
    }
}
4.配置文件
server:
  port: 8081
spring:
  application:
    name: spring-batch-study
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    # The documented Connector/J property name is "serverTimezone" (lower-case "z").
    url: jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai&characterEncoding=utf-8
    username: root
    password: root
  batch:
    job:
      # Do not run jobs automatically at startup; they are started explicitly via JobLauncher.run.
      enabled: false
    # Use "always" on the first run so the Spring Batch metadata tables are created,
    # then switch back to "never" once they exist.
    initialize-schema: never
注意:首次运行时需将 spring.batch.initialize-schema 设置为 always,启动后会自动生成 Spring Batch 内置表;内置表创建完成后再改回 never。
5.实体类
package com.test.batch.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.util.Date;
/**
* Student record. The batch job in this project reads instances from the CSV file
* and inserts them into the {@code student} table.
* Getters, setters, equals/hashCode and toString are generated by Lombok's {@code @Data}.
* No validation constraint annotations are declared on the fields.
*
* @author 1
*/
@Data
public class Student {
// Primary key, generated by the database (auto-increment).
@TableId(type = IdType.AUTO)
private Integer id;
/**
* Student name.
*/
private String name;
/**
* Class name.
*/
private String className;
/**
* Chinese-language score, kept as a string.
*/
private String chinaScore;
/**
* Math score, kept as a string.
*/
private String mathScore;
/**
* English score, kept as a string.
*/
private String englishScore;
/**
* Sex: 0 = male, 1 = female.
*/
private Integer sex;
/**
* Birthday; serialized by Jackson as {@code yyyy-MM-dd}.
*/
@JsonFormat(pattern = "yyyy-MM-dd")
private Date birthday;
/**
* ID-card number.
*/
private String cardId;
/**
* Mobile phone number.
*/
private String phone;
}
6.batch核心配置类
package com.test.batch.config;
import com.test.batch.entity.Student;
import com.test.batch.listen.MyBeanValidator;
import com.test.batch.listen.MyJobListener;
import com.test.batch.listen.MyReaderListener;
import com.test.batch.listen.MyWriteListener;
import com.test.batch.processor.MyProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Spring Batch wiring for the student import job: reads {@code static/student.csv} from the
 * classpath, runs every record through {@link MyProcessor}, and batch-inserts the result into
 * the {@code student} table.
 *
 * @author 1
 */
@Configuration
@EnableBatchProcessing
@Slf4j
public class BatchConfig {

    /** Date pattern of the birthday column in the CSV file ({@code MM} = month, not minutes). */
    private static final String BIRTHDAY_PATTERN = "yyyy-MM-dd";

    /**
     * Creates the {@link JobRepository} that persists job metadata in the MySQL database.
     *
     * @param dataSource         data source holding the Spring Batch metadata tables
     * @param transactionManager transaction manager used for repository operations
     * @return the configured job repository
     * @throws Exception if the factory bean cannot build the repository
     */
    @Bean
    public JobRepository myJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDatabaseType("mysql");
        factory.setTransactionManager(transactionManager);
        factory.setDataSource(dataSource);
        return factory.getObject();
    }

    /**
     * Creates the job launcher and binds it to {@link #myJobRepository}.
     *
     * @param dataSource         data source forwarded to the repository bean method
     * @param transactionManager transaction manager forwarded to the repository bean method
     * @return the launcher used to start jobs programmatically
     * @throws Exception if the repository cannot be obtained
     */
    @Bean
    public SimpleJobLauncher myJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(myJobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    /**
     * Defines the job {@code myJob}, consisting of the single step {@code myStep}.
     *
     * @param jobBuilderFactory factory provided by {@code @EnableBatchProcessing}
     * @param myStep            the only step of the job
     * @return the assembled job
     */
    @Bean
    public Job myJob(JobBuilderFactory jobBuilderFactory, Step myStep) {
        return jobBuilderFactory.get("myJob")
                .incrementer(new RunIdIncrementer())
                .flow(myStep)
                .end()
                .listener(myJobListener())
                .build();
    }

    /**
     * Registers the job-level listener.
     *
     * @return the job listener bean
     */
    @Bean
    public MyJobListener myJobListener() {
        return new MyJobListener();
    }

    /**
     * Defines the reader: tokenizes each line of {@code static/student.csv} and maps the
     * columns onto the properties of {@link Student}.
     *
     * @return the CSV item reader
     */
    @Bean
    public ItemReader<Student> reader() {
        // Column order of the CSV file; each name must match a Student property.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"name", "className", "chinaScore", "mathScore", "englishScore", "sex", "birthday", "cardId", "phone"});

        BeanWrapperFieldSetMapper<Student> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Student.class);
        // Needed so the birthday column (a String) can be bound to the Date property.
        fieldSetMapper.setConversionService(createConversionService());

        DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);

        FlatFileItemReader<Student> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("static/student.csv"));
        reader.setLineMapper(lineMapper);
        return reader;
    }

    /**
     * Builds a conversion service with the default converters plus a String-to-Date converter
     * for the {@code yyyy-MM-dd} format.
     *
     * <p>An unparseable date is reported as an {@link IllegalArgumentException} rather than
     * being replaced by the current date, so a bad record is rejected instead of being stored
     * with a wrong birthday.
     *
     * @return the conversion service used by the field-set mapper
     */
    public ConversionService createConversionService() {
        // The no-arg constructor already registers the default converters.
        DefaultConversionService conversionService = new DefaultConversionService();
        conversionService.addConverter(new Converter<String, Date>() {
            @Override
            public Date convert(String text) {
                // SimpleDateFormat is not thread-safe, hence one instance per call.
                SimpleDateFormat dateFormat = new SimpleDateFormat(BIRTHDAY_PATTERN);
                dateFormat.setLenient(false);
                try {
                    return dateFormat.parse(text);
                } catch (Exception e) {
                    log.error("日期转换异常 :{}", text, e);
                    throw new IllegalArgumentException(
                            "Cannot parse date '" + text + "', expected pattern " + BIRTHDAY_PATTERN, e);
                }
            }
        });
        return conversionService;
    }

    /**
     * Registers the item processor and attaches the bean validator to it.
     *
     * @return the processor applied to every student read
     */
    @Bean
    public ItemProcessor<Student, Student> processor() {
        MyProcessor myProcessor = new MyProcessor();
        myProcessor.setValidator(myBeanValidator());
        return myProcessor;
    }

    /**
     * Registers the validator used by {@link #processor()}.
     *
     * @return the validator bean
     */
    @Bean
    public MyBeanValidator<Student> myBeanValidator() {
        return new MyBeanValidator<Student>();
    }

    /**
     * Defines the writer: a JDBC batch insert whose named parameters are resolved from the
     * properties of each {@link Student}.
     *
     * @param dataSource target database
     * @return the JDBC item writer
     */
    @Bean
    public ItemWriter<Student> writer(DataSource dataSource) {
        JdbcBatchItemWriter<Student> writer = new JdbcBatchItemWriter<>();
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        String sql = "insert into student (name,class_name,china_score,math_score,english_score,sex,birthday,card_id,phone) " +
                "values (:name,:className,:chinaScore,:mathScore,:englishScore,:sex,:birthday,:cardId,:phone)";
        writer.setSql(sql);
        writer.setDataSource(dataSource);
        return writer;
    }

    /**
     * Defines the chunk-oriented step {@code myStep}: chunks of 5000 items, with retry
     * (limit 3) and skip (limit 2) configured for any {@link Exception}, plus read and
     * write listeners.
     *
     * @param factory   step builder factory provided by {@code @EnableBatchProcessing}
     * @param reader    item reader
     * @param writer    item writer
     * @param processor item processor
     * @return the assembled step
     */
    @Bean
    public Step myStep(StepBuilderFactory factory, ItemReader<Student> reader, ItemWriter<Student> writer, ItemProcessor<Student, Student> processor) {
        return factory.get("myStep")
                .<Student, Student>chunk(5000)
                .reader(reader)
                .faultTolerant()
                .retryLimit(3)
                .retry(Exception.class)
                .skip(Exception.class)
                .skipLimit(2)
                .listener(new MyReaderListener())
                .processor(processor)
                .writer(writer)
                // Repeats the same skip settings as above; kept exactly as in the original chain.
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(2)
                .listener(new MyWriteListener())
                .build();
    }
}
7.自定义处理器
package com.test.batch.processor;
import com.test.batch.entity.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.validator.ValidatingItemProcessor;
import org.springframework.batch.item.validator.ValidationException;
/**
 * Item processor that first validates a {@link Student} and then logs students whose three
 * scores are all excellent or all failing. The item itself is passed through unchanged.
 *
 * @author 1
 */
@Slf4j
public class MyProcessor extends ValidatingItemProcessor<Student> {

    /** Scores at or above this value count as excellent. */
    private static final double GOOD = 90;

    /** Scores below this value count as failing. */
    private static final double BAD = 60;

    /**
     * Validates the item and logs a note for all-excellent or all-failing students.
     *
     * @param item the student to process
     * @return the same item, or {@code null} when the parent processor filtered it out
     * @throws ValidationException if validation fails, or if a score is missing or not numeric
     */
    @Override
    public Student process(Student item) throws ValidationException {
        // super.process(item) must be called, otherwise the custom validator never runs.
        // It returns null when the processor is in filter mode and the item is invalid.
        Student validated = super.process(item);
        if (validated == null) {
            return null;
        }

        double chinaScore = parseScore("chinaScore", item.getChinaScore());
        double mathScore = parseScore("mathScore", item.getMathScore());
        double englishScore = parseScore("englishScore", item.getEnglishScore());
        String name = item.getName();

        if (chinaScore >= GOOD && mathScore >= GOOD && englishScore >= GOOD) {
            log.info("{}同学三科成绩均为90以上,应该给予奖励", name);
        }
        if (chinaScore < BAD && mathScore < BAD && englishScore < BAD) {
            log.info("{}同学三科成绩均不及格,建议通知家长,电话:{}", name, item.getPhone());
        }
        return item;
    }

    /**
     * Parses one score, reporting a missing or malformed value as a validation failure
     * instead of a bare NullPointerException / NumberFormatException.
     */
    private static double parseScore(String field, String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            throw new ValidationException(field + " is missing");
        }
        try {
            return Double.parseDouble(raw.trim());
        } catch (NumberFormatException e) {
            throw new ValidationException(field + " is not a number: " + raw, e);
        }
    }
}
8.job监听器
package com.test.batch.listen;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
/**
 * Job-level listener that logs the job id before the job starts and again after it ends.
 *
 * @author 1
 */
@Slf4j
public class MyJobListener implements JobExecutionListener {

    /**
     * Logs the id of the job that is about to run.
     *
     * @param jobExecution the execution that is starting
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        final Long jobId = jobExecution.getJobId();
        log.info("job开始,id:{}", jobId);
    }

    /**
     * Logs the id of the job that has just finished.
     *
     * @param jobExecution the execution that has ended
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        final Long jobId = jobExecution.getJobId();
        log.info("id:{}", jobId);
    }
}
9.读组件监听器
package com.test.batch.listen;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemReadListener;
import static java.lang.String.format;
/**
* @author 1
*/
@Slf4j
public class MyReaderListener implements ItemReadListener {
@Override
public void beforeRead() {
}
@Override
public void afterRead(Object o) {
}
@Override
public void onReadError(Exception e) {
log.error("读取数据失败:{}",e);
log.info("item error:"+format("%s%n", e.getMessage()));
}
}
10.写组件监听器
package com.test.batch.listen;
import com.test.batch.entity.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.ItemWriteListener;
import java.util.List;
import static java.lang.String.format;
/**
* @author 1
*/
@Slf4j
public class MyWriteListener implements ItemWriteListener<Student> {
@Override
public void beforeWrite(List<? extends Student> list) {
}
@Override
public void afterWrite(List<? extends Student> list) {
}
@Override
public void onWriteError(Exception e, List<? extends Student> list) {
try {
log.info(format("%s%n", e.getMessage()));
for (Student message : list) {
log.info(format("Failed writing Students : %s", message.toString()));
}
} catch (Exception ex) {
log.error("format error :{}",ex);
}
}
}
11.字段校验
package com.test.batch.listen;
import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.beans.factory.InitializingBean;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.ValidatorFactory;
import java.util.Set;
/**
 * Adapter that lets Spring Batch validate items with Bean Validation (javax.validation).
 * The underlying validator is created in {@link #afterPropertiesSet()}.
 *
 * @param <T> type of the items to validate
 * @author 1
 */
public class MyBeanValidator<T> implements Validator<T>, InitializingBean {

    private javax.validation.Validator validator;

    /**
     * Validates the item and reports all constraint violations in one exception.
     *
     * @param t the item to validate
     * @throws ValidationException   if at least one constraint is violated; the message
     *                               contains one violation message per line
     * @throws IllegalStateException if called before {@link #afterPropertiesSet()}
     */
    @Override
    public void validate(T t) throws ValidationException {
        if (validator == null) {
            // Fail with a clear message instead of an anonymous NullPointerException.
            throw new IllegalStateException("Validator not initialized: afterPropertiesSet() has not been called");
        }
        Set<ConstraintViolation<T>> constraintViolations = validator.validate(t);
        if (constraintViolations.isEmpty()) {
            return;
        }
        StringBuilder message = new StringBuilder();
        for (ConstraintViolation<T> constraintViolation : constraintViolations) {
            message.append(constraintViolation.getMessage()).append("\n");
        }
        throw new ValidationException(message.toString());
    }

    /**
     * Builds the default Bean Validation validator.
     *
     * @throws Exception if the validation provider cannot be bootstrapped
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.getValidator();
    }
}
12.接口
package com.test.batch.controller;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
* @author 1
*/
@RestController
@Slf4j
public class TestController {
@Autowired
SimpleJobLauncher launcher;
@Autowired
private Job job;
@GetMapping("testJob")
public ResponseEntity testJob(){
try {
//job添加参数,确保每个job都唯一
JobParameters jobParameters = new JobParametersBuilder().addDate("date",new Date()).toJobParameters();
launcher.run(job,jobParameters);
}catch (Exception e){
log.error("job error:{}",e);
return ResponseEntity.ok(e.getMessage());
}
return ResponseEntity.ok("操作成功!!!");
}
}
13.数据
14.运行后在浏览器中打开接口文档页面
http://localhost:8081/doc.html
或直接访问 http://localhost:8081/testJob 触发任务,文件内容即可成功写入数据库