1. MapReduce DB Operations
In the previous articles in this series, we have mostly read files from the local filesystem or from HDFS. What do we do if the data lives in a database instead?
Hadoop provides two classes for this: DBInputFormat and DBOutputFormat. As the names suggest, DBInputFormat reads data from a database, while DBOutputFormat writes the final results back into one.
To map database rows onto an entity object, the entity class must also implement the DBWritable interface: its readFields method specifies how fields are read from the database, and its write method specifies how fields are written back.
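For reference, DBWritable (in org.apache.hadoop.mapreduce.lib.db) is a small interface with exactly these two methods:

public interface DBWritable {
    // Fill the PreparedStatement's placeholders with this object's fields
    void write(PreparedStatement statement) throws SQLException;
    // Populate this object's fields from the current ResultSet row
    void readFields(ResultSet resultSet) throws SQLException;
}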
Below we reuse the COVID-19 case study from the previous articles in this series. First we import the text dataset into a MySQL database, then we read the table as our input dataset, compute the total cases and deaths for each state, and write the results back into MySQL.
COVID-19 case study: https://blog.csdn.net/qq_43692950/article/details/127475811
2. Importing the Text Dataset into a MySQL Database
First, create the table in MySQL:
CREATE TABLE `covid_input` (
`id` int NOT NULL AUTO_INCREMENT,
`date` datetime DEFAULT NULL,
`county` varchar(255) DEFAULT NULL,
`state` varchar(255) DEFAULT NULL,
`fips` varchar(255) DEFAULT NULL,
`cases` int DEFAULT NULL,
`deaths` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Create the entity class, implementing WritableComparable and DBWritable:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountEntity implements WritableComparable<CountEntity>, DBWritable {

    private String date;   // date
    private String county; // county
    private String state;  // state
    private String fips;   // county code
    private Long cases;    // confirmed cases
    private Long deaths;   // deaths

    /**
     * Serialization method
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeUTF(county);
        out.writeUTF(state);
        out.writeUTF(fips);
        out.writeLong(cases);
        out.writeLong(deaths);
    }

    /**
     * Deserialization method -- note: same field order as write()
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.date = in.readUTF();
        this.county = in.readUTF();
        this.state = in.readUTF();
        this.fips = in.readUTF();
        this.cases = in.readLong();
        this.deaths = in.readLong();
    }

    /**
     * Specify the fields written to the DB; each parameterIndex corresponds,
     * in order, to the fieldNames passed to DBOutputFormat.setOutput
     */
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, date);
        preparedStatement.setString(2, county);
        preparedStatement.setString(3, state);
        preparedStatement.setString(4, fips);
        preparedStatement.setLong(5, cases);
        preparedStatement.setLong(6, deaths);
    }

    /**
     * Read fields from the database. This job reads a text file and writes to
     * MySQL, so nothing is read from the DB and the method is left empty.
     */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
    }

    // Sort order: ascending by state first, then descending by deaths
    @Override
    public int compareTo(CountEntity o) {
        int i = this.state.compareTo(o.getState());
        if (i == 0) {
            return Long.compare(o.getDeaths(), this.deaths);
        }
        return i;
    }
}
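To see the Writable half in action, here is a minimal, illustrative round-trip check (not part of the original post; the sample values are made up): serialize a CountEntity with write(DataOutput), then read it back with readFields(DataInput).

import java.io.*;

public class CountEntityRoundTrip {
    public static void main(String[] args) throws IOException {
        CountEntity in = new CountEntity("2021-01-28", "Autauga", "Alabama", "01001", 5554L, 69L);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bos));

        CountEntity out = new CountEntity();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(out); // Lombok @Data generates toString()
    }
}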
Next, write the Mapper class. Since no grouping or aggregation is needed here, we can omit the Reducer entirely and send the Mapper output straight to MySQL. The output key is therefore the entity object, and the value is a NullWritable placeholder:
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

@Slf4j
public class DBMapper extends Mapper<LongWritable, Text, CountEntity, NullWritable> {

    CountEntity outKey = new CountEntity();
    NullWritable outValue = NullWritable.get();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length >= 6) {
            try {
                outKey.setDate(fields[0]);
                outKey.setCounty(fields[1]);
                outKey.setState(fields[2]);
                outKey.setFips(fields[3]);
                outKey.setCases(Long.parseLong(fields[4]));
                outKey.setDeaths(Long.parseLong(fields[5]));
                context.write(outKey, outValue);
            } catch (NumberFormatException e) {
                // Skip lines whose cases/deaths are not numeric (e.g., a CSV header line)
                log.warn("Skipping malformed line: {}", value);
            }
        }
    }
}
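For reference, the Mapper assumes comma-separated input lines in the order date, county, state, fips, cases, deaths. The rows below are illustrative samples only, following the dataset layout used in the earlier posts of this series:

2021-01-28,Autauga,Alabama,01001,5554,69
2021-01-28,Baldwin,Alabama,01003,19333,225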
Finally, write the driver class, declaring the output table and its fields:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DBDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Configure the JDBC connection info used by this job
        DBConfiguration.configureDB(
                conf,
                "com.mysql.cj.jdbc.Driver",
                "jdbc:mysql://127.0.0.1:3306/mapreduces?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT",
                "root",
                "root"
        );
        int status = ToolRunner.run(conf, new DBDriver(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), DBDriver.class.getSimpleName());
        // Set the driver class of the job
        job.setJarByClass(DBDriver.class);
        // Set the Mapper class
        job.setMapperClass(DBMapper.class);
        // Set the map-phase output key/value types
        job.setMapOutputKeyClass(CountEntity.class);
        job.setMapOutputValueClass(NullWritable.class);
        // No Reducer is needed here
        job.setNumReduceTasks(0);
        // Configure the job's input path
        FileInputFormat.addInputPath(job, new Path("D:/test/input"));
        // Configure the job's output
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(
                job,
                "covid_input",
                "date", "county", "state", "fips", "cases", "deaths");
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
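Under the hood, DBOutputFormat builds a parameterized INSERT statement from the table and field names passed to setOutput, and each record's write(PreparedStatement) fills in the placeholders. For the configuration above, the generated statement is effectively:

INSERT INTO covid_input (date, county, state, fips, cases, deaths) VALUES (?, ?, ?, ?, ?, ?)

Note that the MySQL JDBC driver (Connector/J, which provides com.mysql.cj.jdbc.Driver) must be on the job's classpath, e.g. as a mysql-connector-java dependency in the project.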
Run the driver class. After it completes successfully, check the results in MySQL:
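For example, any quick query will do:

SELECT COUNT(*) FROM covid_input;
SELECT * FROM covid_input LIMIT 5;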
The writes have succeeded. Next, we use this table to compute the total cases and deaths for each state.
3. Computing the Cumulative cases and deaths per State
Unlike above, both the input and the output are now the DB. First, create the result output table:
CREATE TABLE `covid_output` (
`id` int NOT NULL AUTO_INCREMENT,
`state` varchar(255) DEFAULT NULL,
`cases` int DEFAULT NULL,
`deaths` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Modify the CountEntity entity to specify which DB fields to read and which to write:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountEntity implements WritableComparable<CountEntity>, DBWritable {

    private String date;   // date
    private String county; // county
    private String state;  // state
    private String fips;   // county code
    private Long cases;    // confirmed cases
    private Long deaths;   // deaths

    /**
     * Serialization method
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeUTF(county);
        out.writeUTF(state);
        out.writeUTF(fips);
        out.writeLong(cases);
        out.writeLong(deaths);
    }

    /**
     * Deserialization method -- note: same field order as write()
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.date = in.readUTF();
        this.county = in.readUTF();
        this.state = in.readUTF();
        this.fips = in.readUTF();
        this.cases = in.readLong();
        this.deaths = in.readLong();
    }

    /**
     * Write the fields of the output table covid_output; each parameterIndex
     * corresponds, in order, to the fieldNames passed to DBOutputFormat.setOutput
     */
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, state);
        preparedStatement.setLong(2, cases);
        preparedStatement.setLong(3, deaths);
    }

    /**
     * Read the fields of the input table covid_input
     */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.date = resultSet.getString("date");
        this.county = resultSet.getString("county");
        this.state = resultSet.getString("state");
        this.fips = resultSet.getString("fips");
        this.cases = resultSet.getLong("cases");
        this.deaths = resultSet.getLong("deaths");
    }

    // Sort order: ascending by state first, then descending by deaths
    @Override
    public int compareTo(CountEntity o) {
        int i = this.state.compareTo(o.getState());
        if (i == 0) {
            return Long.compare(o.getDeaths(), this.deaths);
        }
        return i;
    }
}
Write the Mapper class, which takes state as the key and emits the row data as the value to the Reducer. Note that DBInputFormat hands each record to the Mapper as a (LongWritable row index, CountEntity) pair:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class DBMapper extends Mapper<LongWritable, CountEntity, Text, CountEntity> {
    Text outKey = new Text();

    @Override
    protected void map(LongWritable key, CountEntity value, Context context) throws IOException, InterruptedException {
        // Group by state: state becomes the key, the whole row the value
        outKey.set(value.getState());
        context.write(outKey, value);
    }
}
In the Reducer, sum the cases and deaths for each state. DBOutputFormat writes its key to the database, so the populated entity is emitted as the output key, with a NullWritable value:
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class DBReducer extends Reducer<Text, CountEntity, CountEntity, NullWritable> {
    CountEntity outKey = new CountEntity();
    NullWritable outValue = NullWritable.get();

    @Override
    protected void reduce(Text key, Iterable<CountEntity> values, Context context) throws IOException, InterruptedException {
        long totalCases = 0;
        long totalDeaths = 0;
        // Sum cases and deaths across all rows of this state
        for (CountEntity value : values) {
            totalCases += value.getCases();
            totalDeaths += value.getDeaths();
        }
        outKey.setState(key.toString());
        outKey.setCases(totalCases);
        outKey.setDeaths(totalDeaths);
        context.write(outKey, outValue);
    }
}
Finally, write the driver class, specifying the input and output formats:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DBDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // Configuration object
        Configuration conf = new Configuration();
        // Configure the JDBC connection info used by this job
        DBConfiguration.configureDB(
                conf,
                "com.mysql.cj.jdbc.Driver",
                "jdbc:mysql://127.0.0.1:3306/mapreduces?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT",
                "root",
                "root"
        );
        // Run the job locally in-process rather than on a cluster
        conf.set("mapreduce.framework.name", "local");
        int status = ToolRunner.run(conf, new DBDriver(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // Create the job instance
        Job job = Job.getInstance(getConf(), DBDriver.class.getSimpleName());
        // Set the driver class of the job
        job.setJarByClass(DBDriver.class);
        // Set the Mapper and Reducer classes and their output types
        job.setMapperClass(DBMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CountEntity.class);
        job.setReducerClass(DBReducer.class);
        job.setOutputKeyClass(CountEntity.class);
        job.setOutputValueClass(NullWritable.class);
        // Configure the job's input; the second query is a count query used to compute input splits
        job.setInputFormatClass(DBInputFormat.class);
        DBInputFormat.setInput(
                job,
                CountEntity.class,
                "SELECT date,county,state,fips,cases,deaths from covid_input",
                "SELECT count(*) from covid_input"
        );
        // Configure the job's output
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(
                job,
                "covid_output",
                "state", "cases", "deaths");
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
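As in the first job, DBOutputFormat derives a parameterized INSERT INTO covid_output (state, cases, deaths) VALUES (?, ?, ?) from the setOutput call. On the input side, DBInputFormat runs the count query to determine how many records exist and how to split the input query across map tasks.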
Run this driver class. After it completes successfully, check the results in MySQL. Finally, you can verify that the data is correct:
SELECT
    state,
    SUM(cases) AS total_cases,
    SUM(deaths) AS total_deaths
FROM `covid_input`
GROUP BY state;
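The totals from this query should match the rows the job wrote to the output table, for example:

SELECT state, cases, deaths FROM covid_output ORDER BY state;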