mapreduce 将数据清洗后保存到 hbase

news2024/11/14 18:11:25

mapreduce 将数据清洗后保存到 hbase

数据格式

{"年份":"1990","国家补贴(亿元)":"5.4","地方补贴(亿元)":"3.2","企业补贴(亿元)":"0.8","其他补贴(亿元)":"0.5"}
{"年份":"1991","国家补贴(亿元)":"5.8","地方补贴(亿元)":"3.4","企业补贴(亿元)":"0.9","其他补贴(亿元)":"0.6"}
{"年份":"1992","国家补贴(亿元)":"6.2","地方补贴(亿元)":"3.7","企业补贴(亿元)":"1","其他补贴(亿元)":"0.7"}
{"年份":"1993","国家补贴(亿元)":"7","地方补贴(亿元)":"4.1","企业补贴(亿元)":"1.2","其他补贴(亿元)":"0.8"}
{"年份":"1994","国家补贴(亿元)":"7.8","地方补贴(亿元)":"4.5","企业补贴(亿元)":"1.4","其他补贴(亿元)":"0.9"}
{"年份":"1995","国家补贴(亿元)":"8.5","地方补贴(亿元)":"4.9","企业补贴(亿元)":"1.6","其他补贴(亿元)":"1"}
{"年份":"1996","国家补贴(亿元)":"9.2","地方补贴(亿元)":"5.3","企业补贴(亿元)":"1.8","其他补贴(亿元)":"1.1"}
{"年份":"1997","国家补贴(亿元)":"10","地方补贴(亿元)":"5.7","企业补贴(亿元)":"2","其他补贴(亿元)":"1.2"}
{"年份":"1998","国家补贴(亿元)":"10.8","地方补贴(亿元)":"6.1","企业补贴(亿元)":"2.2","其他补贴(亿元)":"1.3"}
{"年份":"1999","国家补贴(亿元)":"11.6","地方补贴(亿元)":"6.6","企业补贴(亿元)":"2.5","其他补贴(亿元)":"1.4"}
{"年份":"2000","国家补贴(亿元)":"12.5","地方补贴(亿元)":"7.2","企业补贴(亿元)":"2.8","其他补贴(亿元)":"1.6"}
{"年份":"2001","国家补贴(亿元)":"13.5","地方补贴(亿元)":"7.9","企业补贴(亿元)":"3.2","其他补贴(亿元)":"1.8"}
{"年份":"2002","国家补贴(亿元)":"14.5","地方补贴(亿元)":"8.7","企业补贴(亿元)":"3.7","其他补贴(亿元)":"2"}
{"年份":"2003","国家补贴(亿元)":"15.6","地方补贴(亿元)":"9.6","企业补贴(亿元)":"4.3","其他补贴(亿元)":"2.2"}
{"年份":"2004","国家补贴(亿元)":"16.8","地方补贴(亿元)":"10.6","企业补贴(亿元)":"5","其他补贴(亿元)":"2.5"}
{"年份":"2005","国家补贴(亿元)":"18.2","地方补贴(亿元)":"11.7","企业补贴(亿元)":"5.8","其他补贴(亿元)":"2.8"}
{"年份":"2006","国家补贴(亿元)":"19.8","地方补贴(亿元)":"12.9","企业补贴(亿元)":"6.7","其他补贴(亿元)":"3.2"}
{"年份":"2007","国家补贴(亿元)":"21.5","地方补贴(亿元)":"14.3","企业补贴(亿元)":"7.7","其他补贴(亿元)":"3.7"}
{"年份":"2008","国家补贴(亿元)":"23.3","地方补贴(亿元)":"15.9","企业补贴(亿元)":"8.8","其他补贴(亿元)":"4.3"}
{"年份":"2009","国家补贴(亿元)":"25.2","地方补贴(亿元)":"17.6","企业补贴(亿元)":"10.1","其他补贴(亿元)":"5"}
{"年份":"2010","国家补贴(亿元)":"27.2","地方补贴(亿元)":"19.4","企业补贴(亿元)":"11.6","其他补贴(亿元)":"5.8"}
{"年份":"2011","国家补贴(亿元)":"29.2","地方补贴(亿元)":"21.3","企业补贴(亿元)":"13.3","其他补贴(亿元)":"6.7"}
{"年份":"2012","国家补贴(亿元)":"31.3","地方补贴(亿元)":"23.4","企业补贴(亿元)":"15.2","其他补贴(亿元)":"7.7"}
{"年份":"2013","国家补贴(亿元)":"33.5","地方补贴(亿元)":"25.6","企业补贴(亿元)":"17.3","其他补贴(亿元)":"8.8"}
{"年份":"2014","国家补贴(亿元)":"35.8","地方补贴(亿元)":"27.9","企业补贴(亿元)":"19.6","其他补贴(亿元)":"10"}
{"年份":"2015","国家补贴(亿元)":"38.2","地方补贴(亿元)":"30.3","企业补贴(亿元)":"22.1","其他补贴(亿元)":"11.4"}
{"年份":"2016","国家补贴(亿元)":"40.7","地方补贴(亿元)":"32.8","企业补贴(亿元)":"24.9","其他补贴(亿元)":"13.1"}
{"年份":"2017","国家补贴(亿元)":"43.3","地方补贴(亿元)":"35.5","企业补贴(亿元)":"27.9","其他补贴(亿元)":"15.2"}
{"年份":"2018","国家补贴(亿元)":"46.2","地方补贴(亿元)":"38.3","企业补贴(亿元)":"31.2","其他补贴(亿元)":"17.6"}
{"年份":"2019","国家补贴(亿元)":"49.3","地方补贴(亿元)":"41.3","企业补贴(亿元)":"34.8","其他补贴(亿元)":"20.3"}
{"年份":"2020","国家补贴(亿元)":"52.5","地方补贴(亿元)":"44.6","企业补贴(亿元)":"38.7","其他补贴(亿元)":"23.5"}
{"年份":"2021","国家补贴(亿元)":"55.9","地方补贴(亿元)":"48.2","企业补贴(亿元)":"42.8","其他补贴(亿元)":"27.1"}
{"年份":"2022","国家补贴(亿元)":"59.4","地方补贴(亿元)":"52.1","企业补贴(亿元)":"47.3","其他补贴(亿元)":"31.4"}
{"年份":"2023","国家补贴(亿元)":"63.1","地方补贴(亿元)":"56.5","企业补贴(亿元)":"52.4","其他补贴(亿元)":"36.2"}

javabean

package cn.lhz.bean;

import cn.lhz.util.annotation.RowKeyAnnotation;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * 教育历年补贴
 *
 * @author 李昊哲
 * @version 1.0.0
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class SubsidyYear {
  /**
   * 年份
   */
  @RowKeyAnnotation
  private Integer year;

  /**
   * 国家
   */
  private double country;

  /**
   * 地方
   */
  private double local;

  /**
   * 企业
   */
  private double enterprise;

  /**
   * 其它
   */
  private double other;

  @Override
  public String toString() {
    return this.year + "\t" + this.country + "," + this.local + "," + this.enterprise + "," + this.other;
  }
}

mapreduce

package cn.lhz.etl;

import cn.lhz.bean.SubsidyYear;
import cn.lhz.util.hbase.HbaseUtil;
import cn.lhz.util.string.StringUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;

/**
 * 教育历年补贴
 *
 * @author 李昊哲
 * @version 1.0.0
 */
public class SubsidyYear2Hbase {
  public static class SubsidyYearMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
      // 将读取到的每行内容转为 java 字符串
      String json = value.toString();
      // 将读取到的 json 格式字符串 转为 csv 格式字符串
      String csv = StringUtil.extractValuesToString(json);
      System.out.println(csv);
      System.out.println("key >>> " + csv.substring(0, csv.indexOf(",")));
      System.out.println("value >>> " + csv.substring(csv.indexOf(",") + 1));
      // 截取 csv 格式字符串中第一个单元格的字符串作为输出的 key
      Text outKey = new Text(csv.substring(0, csv.indexOf(",")));
      // 截取 csv 格式字符串中除了第一个单元所有的字符串作为输出的 value
      Text outValue = new Text(csv.substring(csv.indexOf(",") + 1));
      // map输出
      context.write(outKey, outValue);
    }
  }

  public static class SubsidyYearReducer extends Reducer<Text, Text, Text, Text> {
    private Connection connection;
    public Table table;

    @Override
    protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
      // 与 hbase 建立连接
      connection = HbaseUtil.getConnection();
      // 数据表名称
      String tableName = "SUBSIDY_YEAR";
      // 获取数据表
      table = HbaseUtil.getTable(connection, tableName);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
      String csv = "";
      for (Text value : values) {
        csv = value.toString();
      }
      try {
        SubsidyYear subsidyYear = StringUtil.csv2Bean(csv, false, SubsidyYear.class);
        subsidyYear.setYear(Integer.parseInt(key.toString()));
        HbaseUtil.upsert(table, "OVER_THE_YEARS", subsidyYear);
      } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException | InstantiationException e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    protected void cleanup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
      if (table != null) {
        // 释放与 table 资源
        table.close();
      }
      if (connection != null) {
        // 释放与 hbase 之间的连接
        connection.close();
      }
    }
  }

  public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
    // 设置环境变量 hadoop 用户名 为 root
    System.setProperty("HADOOP_USER_NAME", "root");

    // 参数配置对象
    Configuration conf = new Configuration();

    // 跨平台提交
    conf.set("mapreduce.app-submission.cross-platform", "true");

    // 本地运行
    conf.set("mapreduce.framework.name", "local");
    // 设置集群本地文件系统路径
    conf.set("mapreduce.cluster.local.dir", "file:///home/lhz/hadoop");
    // 设置默认文件系统为 本地文件系统
    // conf.set("fs.defaultFS", "file:///");

    // 声明Job对象 就是一个应用
    // 为当前 job 设置名称 默认名称为打包后在的jar文件名称
    Job job = Job.getInstance(conf, "教育历年补贴");
    // 指定当前Job的驱动类
    job.setJarByClass(SubsidyYear2Hbase.class);
    // 指定当前Job的 Mapper
    job.setMapperClass(SubsidyYearMapper.class);
    // 设置 reduce 输出 value 的数据类型
    job.setReducerClass(SubsidyYearReducer.class);
    // 指定当前Job的 Reducer
    job.setOutputKeyClass(Text.class);
    // 设置 reduce 输出 key 的数据类型
    job.setOutputValueClass(Text.class);
    // 定义 map 输入的路径 注意:该路径默认为hdfs路径
    FileInputFormat.addInputPath(job, new Path("/edu-ods/教育补贴.log"));
    // 定义 reduce 输出数据持久化的路径 注意:该路径默认为hdfs路径
    Path path = new Path("/edu-dwd");
    // 根据配置项获取 HDFS 文件系统
    FileSystem fs = path.getFileSystem(conf);
    if (fs.exists(path)) {
      // 如果 数据输出目录存在 则将数据输出目录删除
      fs.delete(path, true);
    }
    FileOutputFormat.setOutputPath(job, path);
    // 提交 job
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2240326.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《实时流计算系统设计与实现》-Part 2-笔记

做不到实时 做不到实时的原因 实时计算很难。通过增量计算的方式来间接获得问题的&#xff08;伪&#xff09;实时结果&#xff0c;即使这些结果带有迟滞性和近似性&#xff0c;但只要能够带来尽可能最新的信息&#xff0c;那也是有价值的。 原因可分成3个方面&#xff1a; …

gdb调试redis。sudo

1.先启动redis-server和一个redis-cli。 2.ps -aux|grep reids查看redis相关进程。 3.开始以管理员模式附加进程调试sudo gdb -p 2968.注意这里不能不加sudo&#xff0c;因为Redis 可能以 root 用户启动&#xff0c;普通用户无法附加到该进程。否则就会出现可能下列情形&#…

长连接配置以及断线重连

目录 长连接index 主要进行连接 import SockJS from "sockjs-client"; import Stomp from "stompjs"; import { notification } from "antd"; // 网络请求API import { nowApiAddressObj } from "../api/nowApiAddressObj";// 工具 i…

LeetCode【0054】螺旋矩阵

本文目录 1 中文题目2 求解方法&#xff1a;数学模拟2.1 方法思路2.2 Python代码2.3 复杂度分析 3 题目总结 1 中文题目 给定一个 m 行 n 列的矩阵 matrix &#xff0c;请按照 顺时针螺旋顺序 &#xff0c;返回矩阵中的所有元素。 示例&#xff1a; 提示&#xff1a; 1 ≤ m …

万字长文解读深度学习——训练(DeepSpeed、Accelerate)、优化(蒸馏、剪枝、量化)、部署细节

&#x1f33a;历史文章列表&#x1f33a; 深度学习——优化算法、激活函数、归一化、正则化深度学习——权重初始化、评估指标、梯度消失和梯度爆炸深度学习——前向传播与反向传播、神经网络&#xff08;前馈神经网络与反馈神经网络&#xff09;、常见算法概要汇总万字长文解读…

C#版使用融合通信API发送手机短信息

目录 功能实现 范例运行环境 实现范例 类设计 类代码实现 调用范例 总结 功能实现 融合云通信服务平台&#xff0c;为企业提供全方位通信服务&#xff0c;发送手机短信是其一项核心功能&#xff0c;本文将讲述如何使用融合云服务API为终端手机用户发送短信信息&#xf…

第四十五章 Vue之Vuex模块化创建(module)

目录 一、引言 二、模块化拆分创建方式 三、模块化拆分完整代码 3.1. index.js 3.2. module1.js 3.3. module2.js 3.4. module3.js 3.5. main.js 3.6. App.vue 3.7. Son1.vue 3.8. Son2.vue 四、访问模块module的state ​五、访问模块中的getters ​六、mutati…

如何解决不能将开发板连接到虚拟机的问题(连接显示灰色,不能选中)

-- 如果连接上rk3588单片机&#xff0c;虚拟机无法来连接&#xff0c;如何更改 -- 先将虚拟机关机 -- 将虚拟机的配置文件以文本文件的形式打开 -- 再将所有的FALSE改为TRUE即可 -- 然后再次打开虚拟机即可

什么是白盒测试

一、什么是白盒测试 白盒测试又称结构测试、逻辑驱动测试或基于代码的测试。 白盒测试是一种测试用例设计方法&#xff0c;盒子指的是被测试的软件&#xff0c;白盒指的是盒子是可视的&#xff0c;即清楚盒子内部的东西以及里面是如何运作的。 "白盒"法需要测试者…

图形 2.6 伽马校正

伽马校正 B站视频&#xff1a;图形 2.6 伽马校正 文章目录 伽马校正颜色空间传递函数 Gamma校正校正过程为什么需要校正&#xff1f;CRT与转换函数 为什么sRGB在Gamma 0.45空间&#xff1f; 人对亮度的敏感韦伯定律中灰值 线性工作流不在线性空间下进行渲染的问题统一到线性空…

Android setContentView执行流程(一)-生成DecorView

Android setContentView执行流程(一)-生成DecorView Android setContentView执行流程(二)-将布局添加到mContentParent setContentView的流程主要就是讲在Activity的onCreate方法中调用setContentView方法之后&#xff0c;我们自定义的xml文件加载的过程&#xff0c;学习它可以…

【计算机网络】【网络层】【习题】

计算机网络-网络层-习题 文章目录 13. 图 4-69 给出了距离-向量协议工作过程&#xff0c;表&#xff08;a&#xff09;是路由表 R1 初始的路由表&#xff0c;表&#xff08;b&#xff09;是相邻路由器 R2 传送来的路由表。请写出 R1 更新后的路由表&#xff08;c&#xff09;。…

图像处理实验四(Adaptive Filter)

一、Adaptive Filter简介 自适应滤波器&#xff08;Adaptive Filter&#xff09;是一种能够根据输入信号的统计特性自动调整自身参数以达到最佳滤波效果的滤波器。它广泛应用于信号处理领域&#xff0c;如信道均衡、系统识别、声学回波抵消、生物医学、雷达、波束形成等模块。 …

typedef 与 extern 的结合:一场误解的澄清

typedef 与 extern 的结合:一场误解的澄清 一、typedef 的基本用法二、extern 的基本用法三、typedef 与 extern 的结合:一场误解的澄清示例二:使用 extern 声明外部变量示例三:错误的用法:尝试在 typedef 中使用 extern四、总结在C语言编程的世界里,typedef和extern是两…

Qt_day5_常用类

常用类 目录 1. QString 字符串类&#xff08;掌握&#xff09; 2. 容器类&#xff08;掌握&#xff09; 2.1 顺序容器QList 2.2 关联容器QMap 3. 几种Qt数据类型&#xff08;熟悉&#xff09; 3.1 跨平台数据类型 3.2 QVariant 统一数据类型 3.3 QStringList 字符串列表 4. QD…

HashMap的put流程知道吗

HashMap 的 put 方法算是 HashMap 中比较核心的功能了&#xff0c;复杂程度高但是算法巧妙&#xff0c;同时在上一版本的基础之上优化了存储结构&#xff0c;从链表逐步进化成了红黑树&#xff0c;以满足存取性能上的需要。本文逐行分析了 put 方法的执行流程&#xff0c;重点放…

鸿蒙UI开发——实现环形文字

1、背 景 有朋友提问&#xff1a;您好关于鸿蒙UI想咨询一个问题 如果我想实现展示环形文字是需要通过在Text组件中设置transition来实现么&#xff0c;还是需要通过其他方式来实现。 针对这位粉丝朋友的提问&#xff0c;我们做一下解答。 2、实现环形文字效果 ❓ 什么是环形…

保存pytest的执行日志;在日志中显示当前是第几次执行

1、在本地保存执行日志&#xff1a; 在终端中执行时因为指定了-s参数&#xff0c;所以会打印相关信息&#xff0c;可以帮助我们后续定位问题&#xff1a; 但是显示在终端时后面无法查看&#xff0c;所以需要把执行日志保存在本地&#xff0c;使用tee 或 重定向符号>&#x…

2024年8个最佳在线websocket调试工具选择

精选了 8 款功能强大且易于使用的 WebSocket 测试工具&#xff1a; 工具名称支持的系统是否免费ApifoxWindows, Mac, Linux是WebSocket KingWindows, Mac, Linux是PostmanWindows, Mac, Linux是Socket.IO Test ClientWindows, Mac, Linux是InsomniaWindows, Mac, Linux是Wires…

H5流媒体播放器EasyPlayer.js播放器wasm编译打包之后报uncaught referenceErro的原因排查

EasyPlayer.js H5播放器&#xff0c;是一款能够同时支持HTTP、HTTP-FLV、HLS&#xff08;m3u8&#xff09;、WS、WEBRTC、FMP4视频直播与视频点播等多种协议&#xff0c;支持H.264、H.265、AAC、G711A、Mp3等多种音视频编码格式&#xff0c;支持MSE、WASM、WebCodec等多种解码方…