Contents
Requirement 1
Requirement 2
Video Version
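Before diving in, note that this walkthrough assumes a Spring Boot project with Spring MVC, Spring Batch, and MyBatis already on the classpath. A minimal sketch of the pom.xml dependencies involved (coordinates only; version management and the database driver depend on your own setup):

<!-- assumed dependencies; versions are left to your Spring Boot parent -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version><!-- pick a version compatible with your Boot version --></version>
</dependency>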
Requirement 1
Requirement: dynamically generate 500,000 employee records and store them in an employee.csv file.
Step 1: Define DataInitController
@RestController
public class DataInitController {

    @Autowired
    private IEmployeeService employeeService;

    @GetMapping("/dataInit")
    public String dataInit() throws IOException {
        employeeService.dataInit();
        return "ok";
    }
}
Step 2: Add a dataInit method to IEmployeeService
public interface IEmployeeService {

    /**
     * Save an employee
     */
    void save(Employee employee);

    /**
     * Initialize data: generate 500,000 records
     */
    void dataInit() throws IOException;
}
Step 3: Implement the method in EmployeeServiceImpl
@Service
public class EmployeeServiceImpl implements IEmployeeService {

    @Autowired
    private EmployeeMapper employeeMapper;

    @Override
    public void save(Employee employee) {
        employeeMapper.save(employee);
    }

    @Value("${job.data.path}")
    public String path;

    @Override
    public void dataInit() throws IOException {
        File file = new File(path, "employee.csv");
        if (file.exists()) {
            file.delete();
        }
        file.createNewFile();

        FileOutputStream out = new FileOutputStream(file);
        String txt = "";
        Random ageR = new Random();
        Random boolR = new Random();

        // write 500,000 records into the file
        long beginTime = System.currentTimeMillis();
        System.out.println("Start time: [ " + beginTime + " ]");
        for (int i = 1; i <= 500000; i++) {
            if (i == 500000) {
                txt = i + ",dafei_" + i + "," + ageR.nextInt(100) + "," + (boolR.nextBoolean() ? 1 : 0);
            } else {
                txt = i + ",dafei_" + i + "," + ageR.nextInt(100) + "," + (boolR.nextBoolean() ? 1 : 0) + "\n";
            }
            out.write(txt.getBytes());
            out.flush();
        }
        out.close();
        System.out.println("Total time: [ " + (System.currentTimeMillis() - beginTime) + " ] ms");
    }
}
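The output directory comes from the job.data.path property injected above. A minimal configuration sketch, assuming an application.properties file (the path value here is just an example):

# directory where employee.csv is generated and later read back (example value)
job.data.path=/data/batch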
Step 4: Visit http://localhost:8080/dataInit to generate the data.
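Each generated line has the form id,name,age,sex. The file looks roughly like this (age and sex are random, so your output will differ):

1,dafei_1,73,1
2,dafei_2,5,0
3,dafei_3,41,1
...
500000,dafei_500000,18,0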
Requirement 2
Requirement: launch a job that asynchronously reads the employee.csv file and writes the records into the employee_temp table, recording the time spent on reading and writing.
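The employee_temp table itself never appears in the article; a hedged DDL sketch based on the columns used in the mapper SQL below (column names come from the insert statement in step 4, the types are assumptions):

-- hypothetical DDL; adjust types and lengths to your schema
CREATE TABLE employee_temp (
    id   BIGINT PRIMARY KEY,
    name VARCHAR(64),
    age  INT,
    sex  TINYINT
);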
Step 1: Modify the IEmployeeService interface
public interface IEmployeeService {

    /**
     * Save an employee
     */
    void save(Employee employee);

    /**
     * Initialize data: generate 500,000 records
     */
    void dataInit() throws IOException;

    /**
     * Clear the employee table
     */
    void truncateAll();

    /**
     * Clear the employee_temp table
     */
    void truncateTemp();
}
Step 2: Modify EmployeeServiceImpl
@Override
public void truncateAll() {
    employeeMapper.truncateAll();
}

@Override
public void truncateTemp() {
    employeeMapper.truncateTemp();
}
Step 3: Modify EmployeeMapper.java
public interface EmployeeMapper {

    /**
     * Insert into the employee table
     */
    int save(Employee employee);

    /**
     * Insert into the temporary table
     * @param employee
     * @return
     */
    int saveTemp(Employee employee);

    /**
     * Clear the employee table
     */
    void truncateAll();

    /**
     * Clear the temporary table
     */
    void truncateTemp();
}
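The Employee domain class is not shown anywhere in the article. A minimal sketch consistent with the CSV columns and the mapper SQL; the package comes from the import in step 6, everything else (field types, plain getters and setters) is assumed:

package com.langfeiyes.exp.domain;

// hypothetical reconstruction of the domain class used throughout this article
public class Employee {
    private Long id;
    private String name;
    private Integer age;
    private Integer sex;   // 1 or 0, as generated in dataInit()

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
    public Integer getSex() { return sex; }
    public void setSex(Integer sex) { this.sex = sex; }
}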
Step 4: Modify EmployeeMapper.xml
<insert id="saveTemp" keyColumn="id" useGeneratedKeys="true" keyProperty="id">
    insert into employee_temp(id, name, age, sex) values(#{id}, #{name}, #{age}, #{sex})
</insert>

<delete id="truncateAll">
    truncate employee
</delete>

<delete id="truncateTemp">
    truncate employee_temp
</delete>
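These fragments go inside the mapper's root element. The namespace must be the fully qualified interface name, because the writer in step 6 references the statement as namespace plus id; a sketch of the surrounding skeleton:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.langfeiyes.exp.mapper.EmployeeMapper">
    <!-- the saveTemp / truncateAll / truncateTemp statements shown above go here -->
</mapper>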
Step 5: Create a listener in the com.langfeiyes.exp.job.listener package to record the job's start and end times
package com.langfeiyes.exp.job.listener;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class CsvToDBJobListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        long begin = System.currentTimeMillis();
        jobExecution.getExecutionContext().putLong("begin", begin);
        System.err.println("-------------------------[CsvToDBJob start time:]---->" + begin + "<-----------------------------");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        long begin = jobExecution.getExecutionContext().getLong("begin");
        long end = System.currentTimeMillis();
        System.err.println("-------------------------[CsvToDBJob end time:]---->" + end + "<-----------------------------");
        System.err.println("-------------------------[CsvToDBJob total time:]---->" + (end - begin) + "<-----------------------------");
    }
}
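Note the design choice here: the start timestamp is stored in the job's ExecutionContext rather than in a field on the listener, so the value stays attached to this specific JobExecution (and is persisted with it); afterJob then reads it back from the same execution.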
Step 6: Define the CsvToDBJobConfig configuration class in the com.langfeiyes.exp.job.config package
package com.langfeiyes.exp.job.config;

import com.langfeiyes.exp.domain.Employee;
import com.langfeiyes.exp.job.listener.CsvToDBJobListener;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.PathResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import java.io.File;

/**
 * Reads data from the CSV file and writes it into the database.
 */
@Configuration
public class CsvToDBJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    @Value("${job.data.path}")
    private String path;

    // file reader: FlatFileItemReader, driven by multiple threads
    @Bean
    public FlatFileItemReader<Employee> csvToDBItemReader() {
        FlatFileItemReader<Employee> reader = new FlatFileItemReaderBuilder<Employee>()
                .name("employeeCSVItemReader")
                .saveState(false) // do not save reader state; with multiple threads it would be overwritten
                .resource(new PathResource(new File(path, "employee.csv").getAbsolutePath()))
                .delimited()
                .names("id", "name", "age", "sex")
                .targetType(Employee.class)
                .build();
        return reader;
    }

    // database writer: the batch ItemWriter provided by MyBatis
    @Bean
    public MyBatisBatchItemWriter<Employee> csvToDBItemWriter() {
        MyBatisBatchItemWriter<Employee> itemWriter = new MyBatisBatchItemWriter<>();
        itemWriter.setSqlSessionFactory(sqlSessionFactory); // the SqlSessionFactory must be set explicitly
        // the statement id is "namespace + sql id" as defined in EmployeeMapper.xml
        itemWriter.setStatementId("com.langfeiyes.exp.mapper.EmployeeMapper.saveTemp");
        return itemWriter;
    }

    @Bean
    public Step csvToDBStep() {
        return stepBuilderFactory.get("csvToDBStep")
                .<Employee, Employee>chunk(10000) // 10,000 items per chunk, 50 chunks in total
                .reader(csvToDBItemReader())
                .writer(csvToDBItemWriter())
                .taskExecutor(new SimpleAsyncTaskExecutor()) // multi-threaded reading and writing
                .build();
    }

    // job listener
    @Bean
    public CsvToDBJobListener csvToDBJobListener() {
        return new CsvToDBJobListener();
    }

    @Bean
    public Job csvToDBJob() {
        return jobBuilderFactory.get("csvToDB-step-job")
                .start(csvToDBStep())
                .incrementer(new RunIdIncrementer()) // allows the job to be run multiple times
                .listener(csvToDBJobListener())
                .build();
    }
}
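One assumption worth spelling out: with Spring Batch 4.x, JobBuilderFactory and StepBuilderFactory are only registered as beans once batch processing is enabled, typically on the Spring Boot application class. A sketch, with the class name App being hypothetical:

package com.langfeiyes.exp;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// hypothetical application class; the actual class is not shown in this article
@SpringBootApplication
@EnableBatchProcessing
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}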
Step 7: Add JobController in the com.langfeiyes.exp.controller package
package com.langfeiyes.exp.controller;

import com.langfeiyes.exp.service.IEmployeeService;
import org.springframework.batch.core.*;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController
public class JobController {

    @Autowired
    private IEmployeeService employeeService;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    @Qualifier("csvToDBJob")
    private Job csvToDBJob;

    @GetMapping("/csvToDB")
    public String csvToDB() throws Exception {
        employeeService.truncateTemp(); // clear the temp table so the job can be run repeatedly
        // to run the job more than once, build a new JobParameters object with run.id incremented from the previous execution
        JobParameters jobParameters = new JobParametersBuilder(new JobParameters(), jobExplorer)
                .addLong("time", new Date().getTime())
                .getNextJobParameters(csvToDBJob)
                .toJobParameters();
        JobExecution run = jobLauncher.run(csvToDBJob, jobParameters);
        return run.getId().toString();
    }
}
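By default Spring Boot launches every registered Job at application startup. Since this job is meant to be triggered from the controller instead, it is common to turn that behaviour off; a minimal application.properties sketch (whether the original project does this is an assumption):

# prevent Spring Boot from running the job automatically at startup
spring.batch.job.enabled=false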
Step 8: Test it by visiting http://localhost:8080/csvToDB
-------------------------[CsvToDBJob start time:]---->1670575356773<-----------------------------
-------------------------[CsvToDBJob end time:]---->1670575510967<-----------------------------
-------------------------[CsvToDBJob total time:]---->154194<-----------------------------
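For reference, 500,000 rows in roughly 154 seconds works out to about 3,200 rows per second on this run; the exact figure will vary with hardware, chunk size, and thread count.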
That wraps up this article. Stay tuned for the next installment~
Video Version
If reading alone doesn't do it for you, switch to the video version: Spring Batch高效批处理框架实战