Hadoop+Spark大数据技术 实验8 Spark SQL结构化

news2025/1/12 8:46:03

9.2 创建DataFrame对象的方式

val dfUsers = spark.read.load("/usr/local/spark/examples/src/main/resources/users.parquet")

dfUsers: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

dfUsers.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          NULL|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

9.2.2 json文件创建DataFrame对象

val dfGrade = spark.read.format("json").load("file:/media/sf_download/grade.json")

dfGrade: org.apache.spark.sql.DataFrame = [Class: string, ID: string ... 3 more fields]

dfGrade.show(3)

+-----+---+----+-----+-----+
|Class| ID|Name|Scala|Spark|
+-----+---+----+-----+-----+
|    1|106|Ding|   92|   91|
|    2|242| Yan|   96|   90|
|    1|107|Feng|   84|   91|
+-----+---+----+-----+-----+
only showing top 3 rows

9.2.3 RDD创建DataFrame对象

val list = List(

("zhangsan" , "19") , ("B" , "29") , ("C" , "9")

)

val df = sc.parallelize(list).toDF("name","age")

list: List[(String, String)] = List((zhangsan,19), (B,29), (C,9))
df: org.apache.spark.sql.DataFrame = [name: string, age: string]

df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)

df.show()

+--------+---+
|    name|age|
+--------+---+
|zhangsan| 19|
|       B| 29|
|       C|  9|
+--------+---+

9.2.4 SparkSession创建DataFrame对象

// 1.json创建Datarame对象

val dfGrade = spark.read.format("json").load("file:/media/sf_download/grade.json")

dfGrade.show(3)

+-----+---+----+-----+-----+
|Class| ID|Name|Scala|Spark|
+-----+---+----+-----+-----+
|    1|106|Ding|   92|   91|
|    2|242| Yan|   96|   90|
|    1|107|Feng|   84|   91|
+-----+---+----+-----+-----+
only showing top 3 rows

dfGrade: org.apache.spark.sql.DataFrame = [Class: string, ID: string ... 3 more fields]

Selection deleted

// 2.csv创建Datarame对象

val dfGrade2 = spark.read.option("header",true).csv("file:/media/sf_download/grade.json")

dfGrade2: org.apache.spark.sql.DataFrame = [{"ID":"106": string, "Name":"Ding": string ... 3 more fields]

dfGrade2.show(3)

+-----------+-------------+-----------+----------+-----------+
|{"ID":"106"|"Name":"Ding"|"Class":"1"|"Scala":92|"Spark":91}|
+-----------+-------------+-----------+----------+-----------+
|{"ID":"242"| "Name":"Yan"|"Class":"2"|"Scala":96|"Spark":90}|
|{"ID":"107"|"Name":"Feng"|"Class":"1"|"Scala":84|"Spark":91}|
|{"ID":"230"|"Name":"Wang"|"Class":"2"|"Scala":87|"Spark":91}|
+-----------+-------------+-----------+----------+-----------+
only showing top 3 rows

Selection deleted

// 3.Parquet创建Datarame对象

val dfGrade3 = spark.read.parquet("file:/usr/local/spark/examples/src/main/resources/users.parquet")

dfGrade3: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

dfGrade3.show(3)

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          NULL|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

9.2.5 Seq创建DataFrame对象

val dfGrade4 = spark.createDataFrame(

Seq(

("A" , 20 ,98),

("B" , 19 ,93),

("C" , 21 ,92),

)

)toDF("Name" , "Age" , "Score")

dfGrade4: org.apache.spark.sql.DataFrame = [Name: string, Age: int ... 1 more field]

dfGrade4.show()

+----+---+-----+
|Name|Age|Score|
+----+---+-----+
|   A| 20|   98|
|   B| 19|   93|
|   C| 21|   92|
+----+---+-----+

9.3 DataFrame对象保存为不同格式

9.3.1 write.()保存DataFrame对象

DataFrame.write() 提供了一种方便的方式将 DataFrame 保存为各种格式。以下是几种常见格式的保存方法:

1. 保存为JSON格式

df.write.json("path/to/file.json")

2. 保存为Parquet文件

df.write.parquet("path/to/file.parquet")

3. 保存为CSV文件

df.write.csv("path/to/file.csv")

9.3.2 write.format()保存DataFrame对象

DataFrame.write.format() 提供了一种更灵活的方式来保存 DataFrame,可以通过指定格式名称来选择输出格式。以下是几种常见格式的保存方法:

1. 保存为JSON格式

df.write.format("json").save("path/to/file.json")

2. 保存为Parquet文件

df.write.format("parquet").save("path/to/file.parquet")

3. 保存为CSV文件

df.write.format("csv").save("path/to/file.csv")

9.3.3 先将DataFrame对象转化为RDD再保存文件

虽然可以直接使用 DataFrame 的 write 方法保存文件,但有时需要先将 DataFrame 转换为 RDD 再进行保存。这可能是因为需要对数据进行一些 RDD 特定的操作,或者需要使用 RDD 的保存方法。

rdd = df.rdd.map(lambda row: ",".join(str(x) for x in row))
rdd.saveAsTextFile("path/to/file.txt")

注意:

  • 上述代码将 DataFrame 的每一行转换为逗号分隔的字符串,并将结果保存为文本文件。
  • 可以根据需要修改代码以使用不同的分隔符或保存为其他格式。

9.4 DataFrame对象常用操作

9.4.1 展示数据

1. show()

// 显示前20行数据

gradedf.show()

 

// 显示前10行数据

gradedf.show(10)

 

// 不截断列宽显示数据

gradedf.show(truncate=False)

 

2. collect()

// 将 DataFrame 转换成 Dataset 或 RDD,返回 Array 对象

gradedf.collect()

 

3. collectAsList()

// 将 DataFrame 转换成 Dataset 或 RDD,返回 Java List 对象

gradedf.collectAsList()

 

4. printSchema()

// 打印 DataFrame 的模式(schema)

gradedf.printSchema()

 

5. count()

// 统计 DataFrame 中的行数

gradedf.count()

 

6. first()、head()、take()、takeAsList()

// 返回第一行数据

gradedf.first()

 

// 返回前3行数据

gradedf.head(3)

 

// 返回前5行数据

gradedf.take(5)

 

// 返回前5行数据,以 Java List 形式

gradedf.takeAsList(5)

 

7. distinct()

// 返回 DataFrame 中唯一的行数据

gradedf.distinct.show()

 

8. dropDuplicates()

// 删除 DataFrame 中重复的行数据

gradedf.dropDuplicates(Seq("Spark")).show()

9.4.2 筛选

1. where()
   - 根据条件过滤 DataFrame 中的行数据。
   - 示例: gradedf.where("Class = '1' and Spark = '91'").show()

2. filter()
   - 与 where() 功能相同,根据条件过滤 DataFrame 中的行数据。
   - 示例: gradedf.filter("Class = '1'").show()

3. select()
   - 选择 DataFrame 中的指定列。
   - 示例: gradedf.select("Name", "Class","Scala").show(3,false)

修改名称:gradedf.select(gradedf("Name").as("name")).show()

4. selectExpr()
   - 允许使用 SQL 表达式选择列。
   - 示例: gradedf.selectExpr("name", "name as names" ,"upper(Name)","Scala * 10").show(3)

5. col()
   - 获取 DataFrame 中指定列的引用。
   - 示例: gradedf.col("name")

6. apply()
   - 对 DataFrame 中的每一行应用函数。
   - 示例: def get_grade_level(grade): return "A" if grade > 90 else "B" 
   gradedf.select("name", "grade", "grade_level").apply(get_grade_level, "grade_level")

7. drop()
   - 从 DataFrame 中删除指定的列。
   - 示例: gradedf.drop("grade")

8. limit()
   - 限制返回的行数。
   - 示例: gradedf.limit(10)

9.4.3 排序

按ID排序

1. orderBy()、sort()

orderBy() 和 sort() 方法都可以用于对 DataFrame 进行排序,它们的功能相同。

// 按id升序排序

gradedf.orderBy("id").show()

gradedf.sort(gradedf("Class").desc,gradedf("Scala").asc).show(3)

// 按id降序排序

gradedf.orderBy(desc("id")).show()

gradedf.sort(desc("id")).show()

2. sortWithinPartitions()

示例:gradedf.sortWithinPartitions("id").show(5)

引申:sortWithinPartitions() 方法用于对 DataFrame 的每个分区内进行排序。

// 首先对 DataFrame 进行重新分区,使其包含两个分区

val partitionedDF = gradedf.repartition(2)

// 对每个分区内的 id 进行升序排序

partitionedDF.sortWithinPartitions("id").show()

需要注意的是,sortWithinPartitions() 方法不会改变 DataFrame 的分区数量,它只是对每个分区内部进行排序。

9.4.4 汇总与聚合

1. groupBy()

groupBy() 方法用于根据指定的列对 DataFrame 进行分组。

(1) 结合 count()

// 统计每个名字的学生人数
gradedf.groupBy("name").count().show()

(2) 结合 max()

// 找出每个课程学生的最高成绩
gradedf.groupBy("Class").max("Scala","Spark").show()

(3) 结合 min()

// 找出每个名字学生的最低成绩
gradedf.groupBy("name").min("grade").show()

(4) 结合 sum()

// 计算每个名字学生的总成绩
gradedf.groupBy("name").sum("grade").show()

gradedf.groupBy("Class").sum("Scala","Spark").show()

(5) 结合 mean()

// 计算课程的平均成绩

gradedf.groupBy("Class").sum("Scala","Spark").show()

2. agg()

agg() 方法允许对 DataFrame 应用多个聚合函数。

(1) 结合 countDistinct()

// 统计不重复的名字数量
gradedf.agg(countDistinct("name")).show()

(2) 结合 avg()

gradedf.agg(max("Spark"), avg("Scala")).show()

// 计算所有学生的平均成绩
gradedf.agg(avg("grade")).show()

(3) 结合 count()

// 统计学生总数
gradedf.agg(count("*")).show()

(4) 结合 first()

// 获取第一个学生的姓名
gradedf.agg(first("name")).show()

(5) 结合 last()

// 获取最后一个学生的姓名
gradedf.agg(last("name")).show()

(6) 结合 max()、min()

// 获取最高和最低成绩
gradedf.agg(max("grade"), min("grade")).show()

(7) 结合 mean()

// 计算所有学生的平均成绩
gradedf.agg(mean("grade")).show()

(8) 结合 sum()

// 计算所有学生的总成绩
gradedf.agg(sum("grade")).show()

(9) 结合 var_pop()、variance()

// 计算成绩的总体方差
gradedf.agg(var_pop("grade"), variance("grade")).show()

(10) 结合 covar_pop()

// 计算 id 和 grade 之间的总体协方差
gradedf.agg(covar_pop("id", "grade")).show()

(11) 结合 corr()

gradedf.agg(corr("Spark","Scala")).show()

9.4.5 统计

771468c1170e42e88ed99fd7cc2fc4d2.png

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1691712.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【传知代码】掩码自回归编码器法(论文复现)

前言:在探索现代数据科学的前沿领域时,掩码自回归编码器法(Masked Autoencoder,简称MAE)无疑是一个引人注目的亮点。这一技术,凭借其独特的训练机制和卓越的性能,已经在图像识别、自然语言处理以…

K8s 搭建 FileBeat+ELK 分布式日志收集系统 以及 KQL 语法介绍

一、K8s FileBeat ELK 介绍 ELK,即Elasticsearch、Logstash和Kibana三个开源软件的组合,是由Elastic公司提供的一套完整的日志管理解决方案。Elasticsearch是一个高度可扩展的开源全文搜索和分析引擎,它允许你快速地、近乎实时地存储、搜索…

PCB设计——返回路径

回流基本概念 从电路理论上看,信号是由电流传播的,明确的说是电子的运动,电子流的特性之一就是电子从不在任何地方停留,无论电流流到哪里,必然要回来,因此电流总是在环路中流动,从源到负载然后从…

高效使用 LaTeX 技巧

但对于一般人而言,你不需要通过学习 Vim 来达到高效编辑 LaTeX 的方式。而是通过一些比较容易实现的方式,使得你能够在原来的基础上更加高效得使用 LaTeX,并达到以思考的速度输入 LaTeX 的方式。 在第一部分,我会首先介绍高效编辑…

1301-习题1-1高等数学

1. 求下列函数的自然定义域 自然定义域就是使函数有意义的定义域。 常见自然定义域: 开根号 x \sqrt x x ​: x ≥ 0 x \ge 0 x≥0自变量为分式的分母 1 x \frac{1}{x} x1​: x ≠ 0 x \ne 0 x0三角函数 tan ⁡ x cot ⁡ x \tan x \cot x …

告别登录烦恼,WPS免登录修改器体验!(如何实现不登录使用WPS)

文章目录 📖 介绍 📖🏡 演示环境 🏡📒 解决方案 📒🎈 获取方式 🎈⚓️ 相关链接 ⚓️ 📖 介绍 📖 想象一下,如果你能够绕过繁琐的登录流程&#x…

探秘机器学习经典:K-近邻算法(KNN)全解析

在浩瀚的机器学习宇宙中,K-近邻算法(K-Nearest Neighbors,简称KNN)如同一颗璀璨的明星,以其简洁直观的原理和广泛的应用范围,赢得了众多数据科学家的喜爱。今天,让我们一起揭开KNN的神秘面纱,深入探讨它的运作机制、优缺点、应用场景,以及如何在实际项目中灵活运用。 …

从零到一建设数据中台 - 数据治理路径

一、数据治理的内容 数据治理用于规范数据的生成以及使用,改进数据质量,对数据进行加工处理,提升数据价值。提供识别和度量数据质量能力、数据清洗转换能力、数据加工三个核心能力。 数据汇集:数据汇集是数据中台数据接入的入口,所有数据来自于业务系统、日志、文件、网络…

JDBC总结

目录 JDBC(java database connection) JDBC连接数据库步骤: 1. 在项目中添加jar文件,如图所示 2.加载驱动类 向数据库中插入数据代码示例: 第一种: 第二种: 查询操作 : 第一种: 第二种: JDBC(java database connection) java数据库连接.api(应用程序编程接口) ,可…

esp32开发中CMakeLists.txt文件在编译时添加打印信息

在使用CMakeLists.txt文件时,我们时常会对一些宏定义表示的具体路径表示迷茫,不太确定具体表示的路径是哪个,这个时候就希望能在编译的时候打印当前文件中使用的宏定义表示的路径的具体信息。 就像下图中,编译时打印出 CMAKE_CU…

牛客NC391 快乐数【simple 模拟法 Java/Go/PHP】

题目 题目链接: https://www.nowcoder.com/practice/293b9ddd48444fa493dd17da0feb192d 思路 直接模拟即可Java代码 import java.util.*;public class Solution {/*** 代码中的类名、方法名、参数名已经指定,请勿修改,直接返回方法规定的值…

serverless在点淘的质量保障实践

SERVERLESS能够将应用分为研发域和运维域,使两者独立迭代,降低运维成本,提升研发效率。点淘作为试点项目,经历了包括功能回归、压力测试和监控验证在内的质量保障流程,并在实践中遇到了各种问题,如依赖梳理…

VOS3000被DDOS攻击后该怎么办

VOS3000遭受DDoS攻击的应对措施 当VOS3000遭受DDoS攻击时,可以采取以下几个步骤来应对: 立即启动防火墙:尽管难以完全阻止DDoS攻击,但防火墙可以在一定程度上帮助抵御攻击,减轻其造成的危害。 联系服务器提供商&#…

抖音小店新规重磅来袭!事关店铺流量!商家的福音来了?

大家好,我是喷火龙。 就在前两天,抖店发布了新规,我给大家总结了一下,无非就是两点。 第一点:保证金下调,一证开多店。 第二点:新品上架破10单,有流量扶持。 咱来细细的解读&…

人生苦短,我学python之数据类型(下)

个人主页:星纭-CSDN博客 系列文章专栏:Python 踏上取经路,比抵达灵山更重要!一起努力一起进步! 目录 一.集合 1.1子集与超集 1.2交集,并集,补集,差集 1.intersection(英文&a…

学习笔记——STM32F103V3版本——HC-05模块控制数码管

一.硬件 1.HC-05模块 2.数码管 3.连接硬件 二.在keil5中的代码 main.c代码: #include "stm32f10x.h" #include "buletooth.h" #include "led.h" #include "sys.h" #include "usart.h" #include "delay.…

python实用系列:按顺序重命名文件

啊,好久没更博客了,今天偶然想换个桌面壁纸,于是上网搜了两个比较满意的桌面壁纸,都是压缩包: 当我想要给他们放到我的桌面壁纸文件里的时候患了难,因为他们的名字有相同的: anime文件夹里边&a…

el-select可选择可搜索可输入新内容

需求:el-form-item添加el-select,并且el-select可选择可搜索可输入新内容,并且和其他的el-input做联动,如果是选择,那么el-input自动回填数据并且不可编辑,如果el-select输入新的内容,那么el-in…

js禁止使用浏览器的前进后退按钮的方法

效果图: // 替换当前页面的历史记录,使用户不能通过浏览器的前进后退按钮导航 history.replaceState(null, null, location.href);// 监听浏览器的历史记录变化事件 window.onpopstate function(event) {// 再次替换当前页面的历史记录,确保…

接口使用实例——数组排序

对于基本数据类型的大小比较&#xff0c;我们可以使用<,>,或者equals方法进行比较&#xff0c;但是对象之间如何进行比较呢&#xff1f;要对对象进行比较&#xff0c;我们必须对同一个成员变量进行比较&#xff0c;如我们可以通过比较name的大小来得出两个对象的大小&…