Spark学习——DataFrame清洗HDFS日志并存入Hive中

news2025/1/9 16:50:42

目录

1.开启Hadoop集群和Hive元数据、Hive远程连接

2.配置

3.读取日志文件并清洗

4.单独处理第四列的数据——方法一:

5.单独处理第四列的数据——方法二: 

6.单独处理第四列的数据——方法三: 

7.数据清洗结果展示

8.存入Hive中

9.DataGrip中的代码


HDFS日志文件内容:

2023-02-20 15:19:46 INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Downloaded file edits_tmp_0000000000000030396-0000000000000033312_0000000000025236168 size 0 bytes.
2023-02-20 15:19:46 INFO org.apache.hadoop.hdfs.server.namenode.Checkpointer: Checkpointer about to load edits from 1 stream(s).
2023-02-20 15:19:46 INFO org.apache.hadoop.hdfs.server.namenode.FSImage: Reading /opt/soft/hadoop313/data/dfs/namesecondary/current/edits_0000000000000030396-0000000000000033312 expecting start txid #30396
2023-02-20 15:19:46 INFO org.apache.hadoop.hdfs.server.namenode.FSImage: Start loading edits file /opt/soft/hadoop313/data/dfs/namesecondary/current/edits_0000000000000030396-0000000000000033312

我们要将上面的日志,使用DataFrame API清洗成表格并存入Hive中,清洗后的表格如下:

1.开启Hadoop集群和Hive元数据、Hive远程连接

2.配置

 val spark: SparkSession = SparkSession.builder().appName("demo01")
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://lxm147:9083")
      .enableHiveSupport()
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    import spark.implicits._
    import org.apache.spark.sql.functions._

3.读取日志文件并清洗

// TODO 读取文件清洗
    val df1: DataFrame = sc.textFile("in/hadoophistory.log")
      .map(_.split(" "))
      .filter(_.length >= 8)
      .map(x => {
        val tuple: (String, String, String, String, String) = (x(0), x(1), x(2), x(3), x(4))
        tuple
      }).toDF()
    df1.show(4,false)
    /*
    +----------+--------+----+-------------------------------------------------------+------------+
    |_1        |_2      |_3  |_4                                                     |_5          |
    +----------+--------+----+-------------------------------------------------------+------------+
    |2023-02-20|15:19:46|INFO|org.apache.hadoop.hdfs.server.namenode.TransferFsImage:|Downloaded  |
    |2023-02-20|15:19:46|INFO|org.apache.hadoop.hdfs.server.namenode.Checkpointer:   |Checkpointer|
    |2023-02-20|15:19:46|INFO|org.apache.hadoop.hdfs.server.namenode.FSImage:        |Reading     |
    |2023-02-20|15:19:46|INFO|org.apache.hadoop.hdfs.server.namenode.FSImage:        |Start       |
    +----------+--------+----+-------------------------------------------------------+------------+
    */

4.单独处理第四列的数据——方法一:

  // TODO 单独处理第四列的数据
    val df2: DataFrame =
      df1.withColumn("test", split(col("_4"), "\\."))
        .select(
          $"_1".as("t1"),
          $"_2".as("t2"),
          $"_3".as("t3"),
          col("test").getItem(0).as("a0"),
          col("test").getItem(1).as("a1"),
          col("test").getItem(2).as("a2"),
          col("test").getItem(3).as("a3"),
          col("test").getItem(4).as("a4"),
          col("test").getItem(5).as("a5"),
          col("test").getItem(6).as("a6"),
          $"_5".as("t5")
        )

5.单独处理第四列的数据——方法二: 

val df2: DataFrame = 
      df1.rdd.map(
      line => {
        val strings: Array[String] = line.toString().split(",")
        val value: Array[String] = strings(3).split("\\.")
        (strings(0).replaceAll("\\[", ""), strings(1), strings(2),
          value(0), value(1), value(2), value(3), value(4), value(5), value(6),
          strings(4).replaceAll("]", "")
        )
      }
    ).toDF("t1", "t2", "t3", "a1", "a2", "a3", "a4", "a5", "a6", "a7", "t5")

6.单独处理第四列的数据——方法三: 

方法三比较麻烦,但是可以对数据类型做单独处理,可以参考我的另一篇博文《》

另一篇博文中读取的日志数据更换了

7.数据清洗结果展示

df2.show(4, truncate = false)

+----------+--------+----+---+------+------+----+------+--------+----------------+------------+
|t1        |t2      |t3  |a1 |a2    |a3    |a4  |a5    |a6      |a7              |t5          |
+----------+--------+----+---+------+------+----+------+--------+----------------+------------+
|2023-02-20|15:19:46|INFO|org|apache|hadoop|hdfs|server|namenode|TransferFsImage:|Downloaded  |
|2023-02-20|15:19:46|INFO|org|apache|hadoop|hdfs|server|namenode|Checkpointer:   |Checkpointer|
|2023-02-20|15:19:46|INFO|org|apache|hadoop|hdfs|server|namenode|FSImage:        |Reading     |
|2023-02-20|15:19:46|INFO|org|apache|hadoop|hdfs|server|namenode|FSImage:        |Start       |
+----------+--------+----+---+------+------+----+------+--------+----------------+------------+

8.存入Hive中

println("正在存储......")
df2.write.mode(SaveMode.Overwrite).saveAsTable("shopping.dataframe")

spark.close()
sc.stop()
println("存储完毕......")

9.DataGrip中的代码

SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;

use shopping;
show tables;
select * from dataframe;

参考文章《将Spark数据帧保存到Hive:表不可读,因为“ parquet not SequenceFile”》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/429650.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

The 2021 China Collegiate Programming Contest (Harbin) D. Math master

题目链接 题解 2632^{63}263大概是101910^{19}1019那么一共有19位需要讨论, 每一个位数各有保留和删除两种状态, 全部状态就是2182^{18}218种 因为每一位数都有两种状态, 使用二进制数表示每个状态, 正好能全部表示, 在二进制位数下1表示保留, 0表示删除(反过来也一样) 使用二…

分布式版本控制工具 —— Git

一、Git 基本介绍 1.1 相关概念 1️⃣ 首先,我们要知道什么是Git? Git 是一个免费、开源的版本控制系统,它可以有效地跟踪文件的更改,协调多人在同一个项目上的开发,以及管理不同版本的代码。 Git 最初是由 Linus …

微服务架构下认证和鉴权理解

认证和鉴权 从单体应用到微服务架构,优势很多,但是并不是代表着就没有一点缺点了。 微服务架构,意味着每个服务都是松散耦合的。因此,作为软件工程师和架构师,我们在分布式架构中面临着安全挑战。微服务对外开放的端…

PLE详解

具体的实践中,我们主要参考了腾讯的PLE(Progressive Layered Extraction)模型,PLE相对于前面的MMOE和ESMM,主要解决以下问题: 多任务学习中往往存在跷跷板现象,也就是说,多任务学习相对于多个单任务学习的…

Linux/Ubuntu服务自启动原理剖析及三种实现方式

面向Linux系统,并非只是Ubuntu;系统版本不同,配置上可能有所不同。 1、自启动的原理剖析 1.1、 运行等级 Linux分了7个运行等级,分别用数字0,1,2,3,4,5,6表示…

【Python】【进阶篇】十八、Python爬虫获取动态加载数据

目录十八、Python爬虫获取动态加载数据18.1 确定网站类型18.2 影片详情信息18.3 影片总数量18.4 影片类型与类型码18.5 编写完整程序十八、Python爬虫获取动态加载数据 如何获取电影“分类排行榜”中的电影数据(电影),比如输入“剧情”则会输…

用EasyX图形库画一个哆啦A梦

继续说图形库,加一点实战用图形画图(用来巩固代码): rectangle这个函数 四个参数,左上角坐标的x,y值,右下角坐标的x,y值;因为只要有两个点,就可以以它们的横坐标之差为长&#xff…

三范式建模和维度建模,到底该选哪一个?

编辑导语:当你需要从头开始设计数据仓库时,你会选择哪种建模方式?也许,你会从三范式建模和维度建模二者中选择。但是这二者有其各自的适用范围,具体选择哪种方法,还需要回归至业务层。本篇文章里&#xff0…

day-004-链表-两两交换链表中的节点、删除链表的倒数第N个节点、链表相交、环形链表II

两两交换链表中的节点 题目建议:用虚拟头结点,这样会方便很多。 题目链接/文章讲解/视频讲解 /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* Li…

手麻系统源码,手术麻醉管理系统源码,二次开发方便快捷

手术麻醉管理系统源码,手麻系统源码,C# .net 桌面软件 C/S版 手术麻醉管理系统采用下拉式汉化菜单,界面友好,实用性强,设有与住院、病区、药房等系统的软件接口。 文末获取联系! 开发语言:C# …

4.1 随机变量的数学期望

学习目标: 如果我想学习随机变量的数学期望,我可能会采取以下步骤: 掌握概率论基础知识:在学习随机变量的期望之前,我需要了解概率论的基本概念,例如概率、随机变量、概率密度函数等。 学习数学期望的定义…

算法总结---最常用的五大算法(算法题思路)

一、总结 一句话总结: 【明确所求:dijkstra是求点到点的距离,辅助数组就是源点到目标点的数组】 【最简实例分析:比如思考dijkstra:假设先只有三个点】 1、贪心算法是什么? 当前看来最好的选择 局部最…

第二章(2):从零开始掌握PyTorch基础知识,打造NLP学习利器

第二章(2):从零开始掌握PyTorch基础知识,打造NLP学习利器! 目录第二章(2):从零开始掌握PyTorch基础知识,打造NLP学习利器!1. Pytorch基础1.1 Pytorch安装1.1.…

计算机网络考试复习——第四章 4.1 4.2.1 4.2.2

网络层传输的单位是IP数据报 4.1 网络层的几个重要概念 网络层提供的两种服务:网络层应该向运输层提供怎样的服务?面向连接还是无连接? 在计算机通信中,可靠交付应当由谁来负责?是网络还是端系统? 面向连…

X79G Xeon 2630v2 电脑 Hackintosh 黑苹果efi引导文件

原文来源于黑果魏叔官网,转载需注明出处。(下载请直接百度黑果魏叔) 硬件型号驱动情况 主板X79G 处理器Intel Xeon 2630v2已驱动 内存32g (16*2 2666MHZ)已驱动 硬盘Intel 760p 512GB已驱动 显卡RX 470已驱动 声卡瑞昱 英特尔 High De…

硬件外设使用方法——GPIO

【硬件外设使用】——GPIO用法GPIO基本概念GPIO应用pyb与micropython什么是pyb什么是micropythonpyb与micropython关系GPIO在micropython中的用法什么是pyb库pyb库中的GPIO用法micropython下的GPIO用法经过与硬件群的小伙伴商量,决定直接找个板子讲实战了- -。 本部…

qt动态加载qss 更好的推荐方式

1、编写QRC资源文件[window和linux通用] 2、过rcc程序生成rcc资源文件 生成2进制数据: 通过控制台窗口执行以下命令,会把qrc中的资源文件写成二进制数据保存 rcc.exe -binary .\resuorce.qrc -o .\resuorce.rcc 生成16进制数据: 通过控制台窗…

JavaSE学习进阶day03_02 内部类

第二章 内部类(最难的) 2.1 概述 2.1.1 什么是内部类 将一个类A定义在另一个类B里面,里面的那个类A就称为内部类,B则称为外部类。可以把内部类理解成寄生,外部类理解成宿主。 2.1.2 什么时候使用内部类 一个事物内…

STM32驱动SIM900A短信模块

简介:STM32F103C8T6驱动SIM900A短信模块源码介绍。 开发平台:KEIL ARM MCU型号:STM32F103C8T6 传感器型号:SIM900A 特别提示:驱动内可能使用了某些其他组件,比如delay等,在文末外设模板下载…

协议篇之以太网协议基础概念

协议篇之以太网协议基础概念一、写在前面二、什么是以太网三、以太网TCP/IP协议分层四、MAC地址与IP地址五、写在后面一、写在前面 在学习了串口协议后,发现通过串口传输数据,数据传输的速率较慢,无法符合高速率传输场景下的要求,…