Hadoop3 - MapReduce DB 操作

news2024/12/22 7:00:48

一、MapReduce DB 操作

对于本专栏的前面几篇文章的操作,基本都是读取本地或 HDFS 中的文件,如果有的数据是存在 DB 中的我们要怎么处理呢?

Hadoop 为我们提供了 DBInputFormatDBOutputFormat 两个类。顾名思义 DBInputFormat 负责从数据库中读取数据,DBOutputFormat负责把数据最终写入数据库中。

不过如果要把数据库内容映射成对象实体,还需要该实体实现 DBWritable 接口,其中 readFields 方法用来指定获取数据库中的字段,write 方法用于指定写入数据库字段。

下面还是使用本专栏上几篇文章所使用的COVID-19 案例进行试验,首先将文本类型的数据集导入 Mysql 数据库中,然后读取表信息作为数据集分析每个州的 casesdeaths 的总数,并将计算结果写入 Mysql

COVID-19 案例地址:

https://blog.csdn.net/qq_43692950/article/details/127475811

二、文本类型的数据集导入 Mysql 数据库

首先在 Mysql 中创建表:

CREATE TABLE `covid_input` (
  `id` int NOT NULL AUTO_INCREMENT,
  `date` datetime DEFAULT NULL,
  `county` varchar(255) DEFAULT NULL,
  `state` varchar(255) DEFAULT NULL,
  `fips` varchar(255) DEFAULT NULL,
  `cases` int DEFAULT NULL,
  `deaths` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

创建对象实体类,实现 WritableComparableDBWritable

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountEntity implements WritableComparable<CountEntity>, DBWritable {

    private String date; //日期
    private String county; // 县
    private String state; // 州
    private String fips; // 县编码code
    private Long cases;//确诊病例数
    private Long deaths;//死亡病例数

    /**
     * 序列化方法
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeUTF(county);
        out.writeUTF(state);
        out.writeUTF(fips);
        out.writeLong(cases);
        out.writeLong(deaths);
    }

    /**
     * 反序列化方法 注意顺序
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.date = in.readUTF();
        this.county = in.readUTF();
        this.state = in.readUTF();
        this.fips = in.readUTF();
        this.cases = in.readLong();
        this.deaths = in.readLong();
    }

    /**
     * 指定写入 DB 中的字段,parameterIndex对应 DBOutputFormat.setOutput 中指定的 fieldNames
     */
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, date);
        preparedStatement.setString(2, county);
        preparedStatement.setString(3, state);
        preparedStatement.setString(4, fips);
        preparedStatement.setLong(5, cases);
        preparedStatement.setLong(6, deaths);
    }

    /**
     *  从数据库读取字段信息,由于是读取的文本文件写入 Mysql,没有读取 DB
     */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {

    }

    //排序规则,根据首先根据 state 正序排列,然后根据 deaths 倒序排序
    @Override
    public int compareTo(CountEntity o) {
        int i = this.state.compareTo(o.getState());
        if (i == 0) {
            return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
        }
        return i;
    }

}

编写 Mapper 类,由于这里无需聚合分组操作,可以不做 Reduces ,直接去 Mapper 的输出结果到 Mysql 即可,因此这里 key 输出实体对象,ValueNull 占位

@Slf4j
public class DBMapper extends Mapper<LongWritable, Text, CountEntity, NullWritable> {

    CountEntity outValue = new CountEntity();
    NullWritable outKey = NullWritable.get();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (fields.length >= 6){
            outValue.setDate(fields[0]);
            outValue.setCounty(fields[1]);
            outValue.setState(fields[2]);
            outValue.setFips(fields[3]);
            outValue.setCases(Long.parseLong(fields[4]));
            outValue.setDeaths(Long.parseLong(fields[5]));
            context.write(outValue, outKey);
        }
    }
}

最后编写驱动类,声明输出表及字段:

public class DBDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        //配置当前作业需要使用的JDBC信息
        DBConfiguration.configureDB(
                conf,
                "com.mysql.cj.jdbc.Driver",
                "jdbc:mysql://127.0.0.1:3306/mapreduces?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT",
                "root",
                "root"
        );
        int status = ToolRunner.run(conf, new DBDriver(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), DBDriver.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(DBDriver.class);
        // 设置作业mapper reducer类
        job.setMapperClass(DBMapper.class);
        // 设置作业mapper阶段输出key value数据类型
        job.setMapOutputKeyClass(CountEntity.class);
        job.setMapOutputValueClass(NullWritable.class);

        //这里无需Recuces
        job.setNumReduceTasks(0);

        // 配置作业的输入数据路径
        FileInputFormat.addInputPath(job, new Path("D:/test/input"));
        // 配置作业的输出
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(
                job,
                "covid_input",
                "date", "county", "state", "fips", "cases","deaths");

        return job.waitForCompletion(true)? 0:1;
    }
}

执行驱动类:

在这里插入图片描述
执行成功后,到 Mysql 查看结果:

在这里插入图片描述

已经写入成功,下面基于该表统计每个 state 州 的 casesdeaths 总数。

三、计算各个州的累积cases、deaths

现在和上面不同的是,输入和输出都是DB,首先创建结果输出表:

CREATE TABLE `covid_output` (
  `id` int NOT NULL AUTO_INCREMENT,
  `state` varchar(255) DEFAULT NULL,
  `cases` int DEFAULT NULL,
  `deaths` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

修改 CountEntity 实体,指定读取和输出 DB 字段:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountEntity implements WritableComparable<CountEntity>, DBWritable {

    private String date; //日期
    private String county; // 县
    private String state; // 州
    private String fips; // 县编码code
    private Long cases;//确诊病例数
    private Long deaths;//死亡病例数

    /**
     * 序列化方法
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeUTF(county);
        out.writeUTF(state);
        out.writeUTF(fips);
        out.writeLong(cases);
        out.writeLong(deaths);
    }

    /**
     * 反序列化方法 注意顺序
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.date = in.readUTF();
        this.county = in.readUTF();
        this.state = in.readUTF();
        this.fips = in.readUTF();
        this.cases = in.readLong();
        this.deaths = in.readLong();
    }

    /**
     * 由于输出covid_output表字段
     */
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, state);
        preparedStatement.setLong(2, cases);
        preparedStatement.setLong(3, deaths);
    }

    /**
     * 读取covid_input 表中的字段
     */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.date = resultSet.getString("date");
        this.county = resultSet.getString("county");
        this.state = resultSet.getString("state");
        this.fips = resultSet.getString("fips");
        this.cases = resultSet.getLong("cases");
        this.deaths = resultSet.getLong("deaths");
    }

    //排序规则,根据首先根据 state 正序排列,然后根据 deaths 倒序排序
    @Override
    public int compareTo(CountEntity o) {
        int i = this.state.compareTo(o.getState());
        if (i == 0) {
            return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
        }
        return i;
    }

}

编写 Mapper 类,将 state 最为 keyvalue 行数据输出至 Reduces 中

public class DBMapper extends Mapper<LongWritable, CountEntity, Text, CountEntity> {

    Text outKey = new Text();

    @Override
    protected void map(LongWritable key, CountEntity value, Context context) throws IOException, InterruptedException {
        outKey.set(value.getState());
        context.write(outKey, value);
    }
}

Reduces 中对 casesdeaths 进行求和,key 即为输出的结果:

public class DBReducer extends Reducer<Text, CountEntity, CountEntity, NullWritable> {

    CountEntity outKey = new CountEntity();
    NullWritable outValue = NullWritable.get();

    @Override
    protected void reduce(Text key, Iterable<CountEntity> values, Context context) throws IOException, InterruptedException {
        long totalCases = 0;
        long totalDeaths = 0;
        for (CountEntity value : values) {
            totalCases += value.getCases();
            totalDeaths += value.getDeaths();
        }
        outKey.setState(key.toString());
        outKey.setCases(totalCases);
        outKey.setDeaths(totalDeaths);
        context.write(outKey, outValue);
    }
}

最后编写驱动类,指定输入输出方式:

public class DBDriver extends Configured implements Tool {
    
    public static void main(String[] args) throws Exception {
        //配置文件对象
        Configuration conf = new Configuration();
        //配置当前作业需要使用的JDBC信息
        DBConfiguration.configureDB(
                conf,
                "com.mysql.cj.jdbc.Driver",
                "jdbc:mysql://127.0.0.1:3306/mapreduces?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT",
                "root",
                "root"
        );

        conf.set("mapreduce.framework.name", "local");
        int status = ToolRunner.run(conf, new DBDriver(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 创建作业实例
        Job job = Job.getInstance(getConf(), DBDriver.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(DBDriver.class);

        // 设置作业mapper reducer类
        job.setMapperClass(DBMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CountEntity.class);

        job.setReducerClass(DBReducer.class);
        job.setOutputKeyClass(CountEntity.class);
        job.setOutputValueClass(NullWritable.class);

        job.setInputFormatClass(DBInputFormat.class);
        DBInputFormat.setInput(
                job,
                CountEntity.class,
                "SELECT date,county,state,fips,cases,deaths from covid_input",
                "SELECT count(*) from covid_input"
        );
        // 配置作业的输出
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(
                job,
                "covid_output",
                "state", "cases", "deaths");

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

执行该驱动类:

在这里插入图片描述
执行成功后,到 Mysql 中查看结果:

在这里插入图片描述

最后可以检验数据的正确性:

SELECT
	state,
	sum(cases) AS total_cases,
	sum(deaths) AS total_deaths
FROM
	`covid_input`
GROUP BY state

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1349.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MODBUS通信浮点数存储解析常用算法

MODBUS通信相关的基础知识,各种PLC通信程序的写法。可以参看专栏的其它文章这里不赘述。MODBUS通信时,数据帧都是以字节为单位发送和接收的,接收到的字节,如何存放和解析。就需要我们具备数据处理类的知识了,这里需要大家简单了解下有关数据结构的基础知识,这方面比较薄弱…

AcWing 蓝桥杯AB组辅导课 05、树状数组与线段树

文章目录前言一、树状数组1.1、树状数组知识点1.2、树状数组代码模板模板题&#xff1a;AcWing 1264. 动态求连续区间和例题例题1、AcWing 1265. 数星星【中等&#xff0c;信息学奥赛一本通】习题习题1&#xff1a;1215. 小朋友排队【中等&#xff0c;蓝桥杯】二、 线段树知识点…

27.5 Java集合之Set学习(基本概念,存储原理,性能测试)

文章目录1.Set接口1.1 Set的特性是什么&#xff1f;2.具体实现2.1 HashSet2.1.1 存储原理2.1.2 性能测试2.2 TreeSet2.2.1 存储原理2.2.2 性能测试2.3 EnumSet&#xff08;了解即可&#xff09;2.3.1 存储原理2.4 LinkedHashSet2.4.1 存储原理2.4.2 性能测试2.4.3 代码地址1.Se…

【Gitee】上传本地项目到 Gitee 仓库(入门篇)

本文主要介绍上传本地项目到 Gitee 仓库的过程&#xff0c;可以说是一个比较傻瓜的教材吧&#xff0c;从0开始&#xff0c;祝大家都能一次成功~~~ 一、前期准备 1. 配置 Gitte 创建 Gitte 账号&#xff0c;绑定好邮箱&#xff0c;并创建一个空仓库 。创建账号绑定邮箱过程这部…

【信号检测】基于小波变换的信号趋势检测和分离研究附matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

双十一好物推荐:2022年好用的数码好物分享

一年一度的双十一尽在眼前&#xff0c;因为双十一的优惠力度是一年中最大的一次&#xff0c;所以许多人都想着直接一年屯一次&#xff0c;一次屯一年的理念&#xff0c;那么作为资深剁手党的我来说&#xff0c;对比于选购双十一好物来说我还是比较有心得的&#xff0c;下面让我…

机器视觉之工业摄像机知识点(一)

本文主要记录一些基础的工业摄像机的一些简要知识点。我也是根据我觉得比较重要的来记录。作为一位算法工程师&#xff0c;其实是有两条路来走&#xff0c;即技术专家以及技术经理。这两个实际是不同的职业方向。如果你不擅于与外部沟通交流&#xff0c;并且具备非常强的科研和…

基于OpenHarmony的ArkUI框架进阶对于高性能容器类和持久化和原子化的运用

文章目录高性能容器类Badge原子化服务代码简析表达式持久化高性能容器类 顾名思义&#xff0c;容器类是一个存储类&#xff0c;用于存储各种数据类型的元素&#xff0c;并提供一系列处理数据元素的方法。ArkUI开发框架提供了两种类型的容器类&#xff0c;线性和非线性。这些容…

【机器学习】求矩阵的-1/2次方的方法

目录 一、背景描述 二、D^(-1/2)的理论基础 三、代码实现 四、总结 一、背景描述 今天在看如下论文的时候&#xff1a; 态势感知图卷积网络在电力系统连锁故障中的应用-机器学习文档类资源-CSDN文库https://download.csdn.net/download/mzy20010420/86745616?spm1001.20…

Rust之常用集合(一):向量(vector)

开发环境 Windows 10Rust 1.64.0VS Code 1.72.2 项目工程 这里继续沿用上次工程rust-demo 常用集合 Rust的标准库包括许多非常有用的数据结构&#xff0c;称为集合。大多数其他数据类型表示一个特定的值&#xff0c;但是集合可以包含多个值。与内置数组和元组类型不同&…

2022年数维杯数学建模A题银行效率评价与破产成因分析求解全过程文档及程序

2022年数维杯数学建模 A题 银行效率评价与破产成因分析 原题再现&#xff1a; 银行在国家经济社会发展过程中扮演者重要的决策&#xff0c;银行的破产会对企业和个人造成众多不利的影响。相比国内的银行&#xff0c;国际银行的倒闭频次更高&#xff0c;因此国际银行倒闭原因的…

一小时教你轻松学会使用Java 整合 Easy Excel 操作 Excel 文件

文章目录一、Apache POI简介二、POI操作Excel构建maven项目导入依赖使用POI实现基本写操作使用POI实现大数据量写操作使用POI实现基本读操作使用POI读取不同类型的数据三、Easy Excel简介构建maven项目导入依赖实现写操作实现读操作目前市面上比较流行的操作Excel 文件工具大致…

【前端】vue阶段案例:vue-router使用流程

文章目录目标步骤1.配置映射关系2.导入路由并注册3.完成首页App.vue可能出现的问题&#xff1a;Component name "About" should always be multi-word参考目标 点击首页&#xff0c;则url变为/home&#xff0c;且下面显示的组件是Home组件点击关于&#xff0c;则url变…

更易用的OceanBase|生态工具征文大赛正式开启!

OceanBase 一直在思考&#xff0c;什么样的数据库对用户而言更易用&#xff1f; 更易用&#xff0c;除了功能完善、性能优秀、运行稳定的数据库系统&#xff0c;丰富多样的生态工具也必不可少。 作为一款完全自主研发的原生分布式数据库&#xff0c;OceanBase 的生态工具经历…

Java图片或视频生成GIF动图,发送微信

目录前言GIF简介代码生成图片合成GIF自定义GIF动图视频生成GIF发送微信小结前言 别人的博客文章中有动态显示这是怎么做到的呢&#xff1f;别人的微信发送的表情动态为什么是自己鬼畜视频&#xff1f;这些都是别人做到的&#xff0c;本文就是让自己也可以做到以上的事情&#…

Java基于springboot+vue的图书馆网上图书借阅系统 nodejs前后端分离

在Internet高速发展的今天&#xff0c;我们生活的各个领域都涉及到计算机的应用&#xff0c;其中包括网上图书借阅系统的网络应用&#xff0c;在外国网上图书借阅系统已经是很普遍的方式&#xff0c;不过国内的管理网站可能还处于起步阶段。网上图书借阅系统具有网上图书信息管…

HTML小游戏3—— 机器人总动员(附完整源码)

&#x1f482; 网站推荐:【神级源码资源网】【摸鱼小游戏】&#x1f91f; 风趣幽默的前端学习课程&#xff1a;&#x1f449;28个案例趣学前端&#x1f485; 想寻找共同学习交流、摸鱼划水的小伙伴&#xff0c;请点击【摸鱼学习交流群】&#x1f4ac; 免费且实用的计算机相关知…

北京化工大学数据结构2022/10/27作业 题解

目录 问题 A: 二叉树的性质 问题 B: 二叉树的节点 问题 C: 满二叉树 问题 D: 完全二叉树的节点序号 -----------------------------------分割线------------------------------------------ 问题 E: 二叉树的深度 问题 F: 数据结构作业04 -- 二叉树的输入 递归版 迭代版…

【第一阶段:java基础】第4章:java控制结构

本系列博客是韩顺平老师java基础课的课程笔记 韩顺平P103-P1551. 顺序2. 分支3. 循环4. 跳转5. 编程思想1. 顺序 程序从上至下逐行执行&#xff0c;中间没有任何判断和跳转 2. 分支 单分支if双分支if-else多分支if - else if … - else嵌套分支&#xff1a;建议嵌套最好不要…

【水果派不吃灰】Raspberry Pi树莓派Linux系统下替换国内apt软件更新源

目录1. 前言2. 备份原始配置文件3. 修改原始文件3.1 软件更新源3.2 系统更新源4. 更新4.1 sudo apt-get update 更新软件源列表4.1 sudo apt-get upgrade 更新软件版本(时间会久点)4.1 sudo apt-get dist-upgrade4.1 sudo rpi-update 内核版本&#xff08;尽量不更新&#xff0…