一、自定义RowMapper
详情参考我的另一篇博客:
Spring Batch之读数据库——JdbcCursorItemReader(三十五)_人……杰的博客-CSDN博客
二、项目实例
1.项目框架
2.代码实现
BatchMain.java:
package com.xj.demo28;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @Author : xjfu
* @Date : 2021/10/26 20:01
* @Description : demo28 JdbcCursorItemReader之自定义RowMapper
*/
public class BatchMain {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("demo28/job/demo28-job.xml");
//Spring Batch的作业启动器,
JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
//在batch.xml中配置的一个作业
Job job = (Job)context.getBean("billJob");
try{
//开始执行这个作业,获得处理结果(要运行的job,job参数对象)
JobExecution result = launcher.run(job, new JobParametersBuilder().addString("id","2").toJobParameters());
System.out.println(result.toString());
}catch (Exception e){
e.printStackTrace();
}
}
}
CreditBill.java:
package com.xj.demo28;
/**
* @Author : xjfu
* @Date : 2021/10/26 19:27
* @Description :
*/
public class CreditBill {
//银行卡账户ID
private String accountID = "";
//持卡人姓名
private String name = "";
//消费金额
private double amount = 0;
//消费日期
private String date = "";
//消费场所
private String address = "";
public String getAccountID() {
return accountID;
}
public void setAccountID(String accountID) {
this.accountID = accountID;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getAmount() {
return amount;
}
public void setAmount(double amount) {
this.amount = amount;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
@Override
public String toString() {
return this.accountID + "," + this.name + "," + this.amount + "," + this.date + "," + this.address;
}
}
CreditBillProcessor.java:
package com.xj.demo28;
import org.springframework.batch.item.ItemProcessor;
public class CreditBillProcessor implements
ItemProcessor<CreditBill, CreditBill> {
public CreditBill process(CreditBill bill) throws Exception {
System.out.println(bill.toString());
return bill;
}
}
CreditBillRowMapper.java:
package com.xj.demo28;
import org.springframework.jdbc.core.RowMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
/**
* @Author : xjfu
* @Date : 2023/7/17 01:31
* @Description : 自定义RowMapper实现CreditBillRowMapper,将给定的结果集ResultSet转化为CreditBill对象
*/
public class CreditBillRowMapper implements RowMapper<CreditBill> {
public CreditBill mapRow(ResultSet rs, int rowNum) throws SQLException {
CreditBill bill = new CreditBill();
bill.setAccountID(rs.getString("ACCOUNTID"));
bill.setAddress(rs.getString("ADDRESS"));
bill.setAmount(rs.getDouble("AMOUNT"));
bill.setDate(rs.getString("DATE"));
bill.setName(rs.getString("NAME"));
return bill;
}
}
DummyCreditItemWriter.java:
package com.xj.demo28;
import org.springframework.batch.item.ItemWriter;
import java.util.ArrayList;
import java.util.List;
public class DummyCreditItemWriter implements ItemWriter<CreditBill> {
public List<CreditBill> creditBills = new ArrayList<CreditBill>();
public void write(List<? extends CreditBill> items) throws Exception {
creditBills.addAll(items);
}
public List<CreditBill> getCredits() {
return creditBills;
}
}
demo28-job.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">
<!--导入文件-->
<import resource="classpath:demo28/job/demo28-jobContext.xml"/>
<!--定义名字为billJob的作业-->
<batch:job id="billJob">
<!--定义名字为billStep的作业步-->
<batch:step id="billStep">
<batch:tasklet transaction-manager="transactionManager">
<!--定义读、处理、写操作,规定每处理两条数据,进行一次写入操作,这样可以提高写的效率-->
<batch:chunk reader="jdbcParameterItemReader" processor="creditBillProcessor" writer="creditItemWriter" commit-interval="2">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
</beans>
demo28-jobContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!--引入配置参数-->
<context:property-placeholder location="classpath:/demo28/properties/demo28-batch-mysql.properties" />
<!--
data-source:定义数据源,默认dataSource
transaction-manager:定义事务管理器
isolation-level-for-create:定义创建Job Execution时候的事务隔离级别,避免多个Job Execution执行一个Job Instance,默认SERIALIZABLE
table_prefix:定义使用的数据库表的前缀为BATCH_,默认BATCH_
max-varchar-length:定义varchar的最大长度为1000,默认值为2500
-->
<batch:job-repository
id="jobRepository"
data-source="dataSource"
transaction-manager="transactionManager"
isolation-level-for-create="SERIALIZABLE"
table-prefix="BATCH_"
max-varchar-length="1000"/>
<!--数据库的事务管理器-->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<!--定义作业调度器,用来启动job-->
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<!--注入jobRepository-->
<property name="jobRepository" ref="jobRepository"/>
</bean>
<!--数据源-->
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName">
<value>${datasource.driver}</value>
</property>
<property name="url">
<value>${datasource.url}</value>
</property>
<property name="username" value="${datasource.username}"></property>
<property name="password" value="${datasource.password}"></property>
</bean>
<!-- 参数化读取db -->
<bean id="jdbcParameterItemReader" scope="step"
class="org.springframework.batch.item.database.JdbcCursorItemReader" >
<property name="dataSource" ref="dataSource"/>
<!--查找范围是1到待定的范围,待定的范围通过后续的属性preparedStatementSetter设置-->
<property name="sql" value="select ID,ACCOUNTID,NAME,AMOUNT,DATE,ADDRESS from t_credit where id between 1 and ? "/>
<property name="rowMapper" ref="custCreditRowMapper" />
<!--使用ListPreparedStatementSetter来设置SQL语句需要的参数-->
<property name="preparedStatementSetter" ref="paramStatementSetter"/>
</bean>
<bean id="paramStatementSetter" scope="step"
class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
<property name="parameters">
<list>
<value>#{jobParameters['id']}</value>
</list>
</property>
</bean>
<!--自定义RowMapper实现CreditBillRowMapper,将给定的结果集ResultSet转化为CreditBill对象-->
<bean id="custCreditRowMapper" class="com.xj.demo28.CreditBillRowMapper"/>
<!--处理类-->
<bean id="creditBillProcessor" scope="step" class="com.xj.demo28.CreditBillProcessor"/>
<!--写类-->
<bean id="creditItemWriter" class="com.xj.demo28.DummyCreditItemWriter"/>
</beans>
create-tables-mysql.sql:
DROP TABLE IF EXISTS t_credit;
DROP TABLE IF EXISTS t_destcredit;
CREATE TABLE t_credit
(ID VARCHAR(10),
ACCOUNTID VARCHAR(20),
NAME VARCHAR(10),
AMOUNT NUMERIC(10,2),
DATE VARCHAR(20),
ADDRESS VARCHAR(128),
primary key (ID)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE t_destcredit
(ID VARCHAR(10),
ACCOUNTID VARCHAR(20),
NAME VARCHAR(10),
AMOUNT NUMERIC(10,2),
DATE VARCHAR(20),
ADDRESS VARCHAR(128),
primary key (ID)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO t_credit(ID,ACCOUNTID,NAME,AMOUNT,DATE,ADDRESS) VALUES('1','4047390012345678','tom',100.00,'2013-2-2 12:00:08','Lu Jia Zui road');
INSERT INTO t_credit(ID,ACCOUNTID,NAME,AMOUNT,DATE,ADDRESS) VALUES('2','4047390012345678','tom',320.00,'2013-2-3 10:35:21','Lu Jia Zui road');
INSERT INTO t_credit(ID,ACCOUNTID,NAME,AMOUNT,DATE,ADDRESS) VALUES('3','4047390012345678','tom',674.70,'2013-2-6 16:26:49','South Linyi road');
INSERT INTO t_credit(ID,ACCOUNTID,NAME,AMOUNT,DATE,ADDRESS) VALUES('4','4047390012345678','tom',793.20,'2013-2-9 15:15:37','Longyang road');
INSERT INTO t_credit(ID,ACCOUNTID,NAME,AMOUNT,DATE,ADDRESS) VALUES('5','4047390012345678','tom',360.00,'2013-2-11 11:12:38','Longyang road');
demo28-batch-mysql.properties:
datasource.driver=com.mysql.jdbc.Driver
datasource.url=jdbc:mysql://127.0.0.1:3306/spring_batch_demo1?serverTimezone=UTC
datasource.username=root
datasource.password=12345
3.运行结果