大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例

news2024/9/29 5:33:15

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节完成的内容如下:

  • SparkSession
  • RDD、DataFrame、DataSet
  • 三者之间互相转换 详细解释
    在这里插入图片描述

核心操作

Transformation(转换操作)

定义:
Transformation是懒执行的操作,意味着这些操作在调用时并不会立即执行计算,而是会生成一个新的数据集(或RDD),它们描述了从输入数据到输出数据的转换逻辑。Transformation的计算会被延迟,直到遇到一个Action操作时才会真正触发执行。

常见操作:

  • select(): 从DataFrame中选择列。
  • filter(): 过滤掉不符合条件的行。
  • join(): 连接两个DataFrame。
  • groupBy(): 对数据进行分组。
  • agg(): 聚合操作。

Action(行动操作)

定义:
Action操作会触发Spark的计算并返回结果。与Transformation不同,Action操作会执行整个计算逻辑,并产生最终的输出,如将结果写入外部存储或将数据返回给驱动程序。

常见操作:

  • show(): 显示DataFrame的内容。
  • collect(): 将DataFrame的数据收集到驱动程序上,作为本地集合返回。
  • count(): 计算DataFrame中的行数。
  • write(): 将DataFrame的数据写入外部存储(如HDFS、S3、数据库等)。
  • take(): 返回DataFrame的前n行数据。

Action操作

与RDD类似的操作

  • show
  • collect
  • collectAsList
  • head
  • first
  • count
  • take
  • takeAsList
  • reduce

与结构相关

  • printSchema
  • explain
  • columns
  • dtypes
  • col

生成数据

保存并上传到服务器上

EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO
7369,SMITH,CLERK,7902,2001-01-02 22:12:13,800,,20
7499,ALLEN,SALESMAN,7698,2002-01-02 22:12:13,1600,300,30
7521,WARD,SALESMAN,7698,2003-01-02 22:12:13,1250,500,30
7566,JONES,MANAGER,7839,2004-01-02 22:12:13,2975,,20
7654,MARTIN,SALESMAN,7698,2005-01-02 22:12:13,1250,1400,30
7698,BLAKE,MANAGER,7839,2005-04-02 22:12:13,2850,,30
7782,CLARK,MANAGER,7839,2006-03-02 22:12:13,2450,,10
7788,SCOTT,ANALYST,7566,2007-03-02 22:12:13,3000,,20
7839,KING,PRESIDENT,,2006-03-02 22:12:13,5000,,10
7844,TURNER,SALESMAN,7698,2009-07-02 22:12:13,1500,0,30
7876,ADAMS,CLERK,7788,2010-05-02 22:12:13,1100,,20
7900,JAMES,CLERK,7698,2011-06-02 22:12:13,950,,30
7902,FORD,ANALYST,7566,2011-07-02 22:12:13,3000,,20
7934,MILLER,CLERK,7782,2012-11-02 22:12:13,1300,,10

写入内容如下图所示:
在这里插入图片描述

测试运行

我们进入 spark-shell 进行测试

// 处理头,使用自动类型推断
val df1 = spark.read.option("header", true).option("infershema", "true").csv("test_spark_03.txt")

df1.count
// 缺省显示20行
df1.union(df1).show()
// 显示2行
df1.show(2)

执行结果如下图所示:
在这里插入图片描述
继续进行测试:

// 不截断字符
df1.toJSON.show(false)
// 显示10行 不截断字符
df1.toJSON.show(10, false)

运行结果如下图所示:
在这里插入图片描述
继续进行测试:

// collect 返回数组 Array[Row]
val c1 = df1.collect()
// collectAsList 返回List Lits[Row]
val c2 = df1.collectAsList()

// 返回 Row
val h1 = df1.head()
val f1 = df1.first()

// 返回 Array[Row]
val h2 = df1.head(3)
val f2 = df1.take(3)

// 返回 List[Row]
val t2 = df1.takeAsList(2)

运行结果如下图所示:
在这里插入图片描述
继续进行测试:

// 结构属性
// 查看列名
df1.columns
// 查看列名和类型
df1.dtypes
// 查看执行计划
df1.explain()
// 获取某个列
df1.col("ENAME")
// 常用
df1.printSchema

运行结果如下图所示:
在这里插入图片描述

Transformation 操作

  • RDD 类似的操作
  • 持久化/缓存 与 checkpoint
  • select
  • where
  • group by / 聚合
  • order by
  • join
  • 集合操作
  • 空值操作(函数)
  • 函数

与RDD类似的操作

  • map
  • filter
  • flatMap
  • mapPartitions
  • sample
  • randomSplit
  • limt
  • distinct
  • dropDuplicates
  • describe

我们进行测试:

val df1 = spark.read.csv("/opt/wzk/data/people1.csv")
// 获取第1列
df1.map(row => row.getAs[String](0)).show

// randomSplit 将DF、DS按给定参数分成多份
val df2 = df1.randomSplit(Array(0.5, 0.6, 0.7))
df2(0).count
df2(1).count
df2(2).count

测试结果如下图:
在这里插入图片描述

我们继续进行测试:

// 取10行数据生成新的Dataset
val df2 = df1.limit(10)
// distinct 去重
val df2 = df1.union(df1)
df2.distinct.count

// dropDuplicates 按列值去重
df2.dropDuplicates.show
df2.dropDuplicates("_c0").show

执行结果如下图:
在这里插入图片描述

存储相关

  • cacheTable
  • persist
  • checkpoint
  • unpersist
  • cache

备注:Dataset默认的存储级别是 MEMEORY_AND_DISK

spark.sparkContext.setCheckpointDir("hdfs://h121.wzk.icu:9000/checkpoint")

df1.show()
df1.checkpoint()
df1.cache()

import org.apache.spark.storage.StorageLevel
df1.persist(StorageLevel.MEMORY_ONLY)
df1.count()
df1.unpersist(true)

执行结果如下图所示:
在这里插入图片描述

select相关

  • 列的多种表示
  • select
  • selectExpr

启动 Spark-Shell 继续进行测试

// 这里注意 option("header", "true") 自动解析一下表头
val df1 = spark.read.option("header", "true").csv("/opt/wzk/data/people1.csv")

// $ col() 等等 不可以混用!!!(有解决方法,但是建议不混用!!!)
// 可以多种形式获取到列
df1.select($"name", $"age", $"job").show

执行结果如下图所示:
在这里插入图片描述

继续进行测试

df1.select("name", "age", "job").show(3)
df1.select(col("name"), col("age"), col("job")).show(3)
df1.select($"name", $"age"+1000, $"job").show(5)

运行结果如下图所示:
在这里插入图片描述

where相关

接着对上述内容进行测试:

df1.filter("age > 25").show
df1.filter("age > 25 and name == 'wzk18'").show
df1.where("age > 25").show
df1.where("age > 25 and name == 'wzk19'").show

运行测试结果如下图:
在这里插入图片描述

groupBy相关

  • groupBy
  • agg
  • max
  • min
  • avg
  • sum
  • count

进行测试:

// 由于我的字段中没有数值类型的,就不做测试了
df1.groupBy("Job").sum("sal").show
df1.groupBy("Job").max("sal").show
df1.groupBy("Job").min("sal").show
df1.groupBy("Job").avg("sal").show
df1.groupBy("Job").count.show
df1.groupBy("Job").avg("sal").where("avg(sal) > 2000").show
df1.groupBy("Job").avg("sal").where($"avg(sal)" > 2000).show
df1.groupBy("Job").agg("sal"->"max", "sal"->"min", "sal"-
>"avg", "sal"->"sum", "sal"->"count").show
df1.groupBy("deptno").agg("sal"->"max", "sal"->"min", "sal"-
>"avg", "sal"->"sum", "sal"->"count").show

orderBy相关

orderBy == sort

df1.orderBy("name").show(5)
df1.orderBy($"name".asc).show(5)
df1.orderBy(-$"age").show(5)

运行测试的结果如下图所示:
在这里插入图片描述
继续进行测试:

df1.sort("age").show(3)
df1.sort($"age".asc).show(3)
df1.sort(col("age")).show(3)

测试结果如下图所示:
在这里插入图片描述

JOIN相关

// 笛卡尔积
df1.crossJoin(df1).count
// 等值连接(单字段)
df1.join(df1, "name").count
// 等值连接(多字段)
df1.join(df1, Seq("name", "age")).show

运行的测试结果如下图所示:
在这里插入图片描述

这里编写两个case:


// 第一个数据集
case class StudentAge(sno: Int, name: String, age: Int)

val lst = List(StudentAge(1,"Alice", 18), StudentAge(2,"Andy", 19), StudentAge(3,"Bob", 17), StudentAge(4,"Justin", 21), StudentAge(5,"Cindy", 20))

val ds1 = spark.createDataset(lst)

// 第二个数据集
case class StudentHeight(sname: String, height: Int)

val rdd = sc.makeRDD(List(StudentHeight("Alice", 160), StudentHeight("Andy", 159), StudentHeight("Bob", 170), StudentHeight("Cindy", 165), StudentHeight("Rose", 160)))

val ds2 = rdd.toDS

运行测试的结果如下图所示:
在这里插入图片描述
接下来我们进行连表操作:

// 连表操作 不可以使用 "name"==="sname" !!!
ds1.join(ds2, 'name==='sname).show
ds1.join(ds2, ds1("name")===ds2("sname")).show
ds1.join(ds2, $"name"===$"sname").show
ds1.join(ds2, $"name"===$"sname", "inner").show

测试的运行结果如下图所示:
在这里插入图片描述

在这里插入图片描述

集合相关

val ds3 = ds1.select("name")
val ds4 = ds2.select("sname")

// union 求并集、不去重
ds3.union(ds4).show
// unionAll(过时了)与union等价
// intersect 求交
ds3.intersect(ds4).show
// except 求差
ds3.except(ds4).show

运行结果如下图所示:
在这里插入图片描述

空值处理

math.sqrt(-1.0)
math.sqrt(-1.0).inNaN()
df1.show
// 删除所有列的空值和NaN
df1.na.drop.show
// 删除某列的空值和NaN
df1.na.drop(Array("xxx")).show
// 对列进行填充
df1.na.fill(1000).show
df1.na.fill(1000, Array("xxx")).show

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2066684.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于lettuce的一次pipeline反向优化

起因是后台job对一批数据做大量的redis读写操作,为了提高job的执行速度,直接使用pipeline对一些不能批量读写的命令进行管道优化 简单介绍什么是lettuce Spring Boot自2.0版本开始默认使用Lettuce作为Redis的客户端(注1)。Lettu…

WEB渗透免杀篇-绕过

360白名单 需要足够的权限 360的扫描日志和设置白名单日志位置在:C:\Users[username]\AppData\Roaming\360Safe\360ScanLog 查看扫描日志内容可以查询到白名单文件 日志文件记录的是添加或移除白名单的时间、文件名、hash等信息,otc1为添加白名单&#…

SadTalker翻译与代码调试

文章目录 SadTalker原文翻译SadTalker:学习风格化音频驱动单幅图像说话人脸动画的真实 3D 运动系数Abstract1. Introduction2. Related Work3. Method3.1. Preliminary of 3D Face Model3.2. Motion Coefficients Generation through Audio3.3. 3D-aware Face Rende…

操作系统简介:设备管理

设备管理 1. 设备管理概述2. 设备管理技术通道技术DMA技术缓冲技术Spooling技术 3. 磁盘调度 设备管理是操作系统中最繁杂而且与硬件紧密相关的部分,不但要管理实际 I/O 操作的设备(如磁盘机、扫描仪、打印机、键盘和鼠标),还要管理诸如设备控制器、DMA…

Linux信号机制探析--信号的处理

🍑个人主页:Jupiter. 🚀 所属专栏:Linux从入门到进阶 欢迎大家点赞收藏评论😊 目录 🍑信号处理信号处理常见方式概览 🍒内核如何实现信号的捕捉 🍎内核态与用户态操作系统是如何正常…

下载cmake操作步骤

cmake官网链接 cmake-3.30.2.tar.gz源代码官网下载链接

中国的人形机器人都有哪些出色的产品?

8月21日,2024世界机器人大会在北京亦庄正式开幕。本次大会共有169家企业集中展出了600余件机器人创新产品,人形机器人占比最大,大会还开设人形机器人专区,共亮相27款整机。 展会中多数人形机器人产品都偏向服务型,主要…

乾坤大挪移!将脚趾移到手指上,江山邦尔骨科医院成功完成一例断指再植手术

2024年6月中旬,家住江山贺村的何阿姨经历一次不小的意外。 那天天气晴朗,何阿姨准备把院子修缮修缮。操作切割工具时,何阿姨没有握稳,让工具一下子飞了出去——飞出去的瞬间,工具切掉了她的左手拇指,血流不…

网络安全大考,攻防演练驱动企业常态化安全运营升级!

当前,网络安全形势日益严峻,恶意软件、勒索软件肆虐,钓鱼攻击手段层出不穷,不断威胁企业数据安全与业务连续性。随着云计算、大数据、物联网等新兴技术的广泛应用,网络边界模糊化,攻击面急剧扩大&#xff0…

Qt (10)【Qt窗口 —— 如何在窗口中创建浮动窗口和状态栏】

阅读导航 引言一、如何在窗口中创建浮动窗口1. 浮动窗口的创建2. 设置停靠的位置 二、如何在窗口中创建状态栏1. 状态栏的创建2. 在状态栏中显示实时消息3. 在状态栏中显示永久消息4. 调整显示消息的位置,并加上进度条 引言 在上一篇文章中,我们一同探索…

数据结构(6_3_1)——图的广度优先遍历

树和图的广度优先遍历区别 树的广度优先遍历&#xff1a; 图的广度优先遍历&#xff1a; 代码&#xff1a; 注:以下代码只适合连通图 #include <stdio.h> #include <stdbool.h>#define MAX_VERTEX_NUM 100typedef struct ArcNode {int adjvex; // 该边所指向的顶…

慧灵夹爪:工业智能的创新先锋

慧灵作为一个知名老品牌&#xff0c;其机器人产品在众多场景中广为人知。随着智能化、自动化技术的不断提升&#xff0c;智能工业飞速发展&#xff0c;慧灵夹爪在其中发挥的作用也越来越多。 在工业自动化生产中&#xff0c;精准与灵活是衡量设备性能的重要标尺。慧灵夹爪以其卓…

Criteria 是干什么用的?

我 | 在这里 ⭐ 全栈开发攻城狮、全网10W粉丝、2022博客之星后端领域Top1、专家博主。 &#x1f393;擅长 指导毕设 | 论文指导 | 系统开发 | 毕业答辩 | 系统讲解等。已指导60位同学顺利毕业 ✈️个人公众号&#xff1a;热爱技术的小郑。回复 Java全套视频教程 或 前端全套视频…

简易电压表设计验证

前言 电压表是测量电压的一种仪器。由永磁体、线圈等构成。电压表是个相当大的电阻器&#xff0c;理想的认为是断路。初中阶段实验室常用的电压表量程为0~3V和0~15V。 传统的指针式电压表包括一个灵敏电流计&#xff0c;在灵敏电流计里面有一个永磁体&#xff0c;在电流计的两个…

GenAI 的产品:快速行动,但失败

2022 年秋季&#xff0c;我正在做一个很酷的项目。是的&#xff0c;你猜对了——使用公司特定的数据对预先训练的 LLM&#xff08;Bert&#xff09;进行微调。 然而&#xff0c;很快 ChatGPT 就发布了&#xff0c;并席卷了全世界。既然已经有一门非常强大的 LLM 了&#xff0c…

支持AI智能搜索的知识库管理系统有哪些?分享4个软件

引言 在数字化时代&#xff0c;知识的获取、管理和利用已成为企业竞争力的重要组成部分。随着信息量的爆炸性增长&#xff0c;如何快速、准确地从海量数据中检索出有价值的知识&#xff0c;成为企业面临的一大挑战。支持AI智能搜索的知识库管理系统能够快速准确地检索信息&…

【前端】vue监视属性和计算属性对比

首先分开讲解各个属性的作用。 1.计算属性 作用&#xff1a;用来计算出来一个值&#xff0c;这个值调用的时候不需要加括号&#xff0c;会根据依赖进行缓存&#xff0c;依赖不变&#xff0c;computed的值不会重新计算。 const vm new Vue({el:#root,data:{lastName:张,firstNa…

严重腰椎滑脱、无法走路,江山邦尔骨科医院机器人辅助手术为患者完美复位

8月8日上午&#xff0c;53岁的李清&#xff08;化名&#xff09;扶着腰、跛脚走进江山邦尔骨科医院。接诊他的&#xff0c;是江山邦尔骨科医院脊柱科的林科院长。 李清和林院长说&#xff0c;自己已有长达两年的腰痛史&#xff0c;最近还伴随右腿麻木及跛行的症状&#xff0c;严…

深度解析上海我店 三年突破一百亿销售额!

在当今数字化时代的大潮中&#xff0c;消费模式正经历着翻天覆地的变革。上海我店网络科技有限公司&#xff08;简称“我店”&#xff09;&#xff0c;凭借其创新的“互联网实体终端”融合商业模式与独特的绿色积分体系&#xff0c;在消费市场中异军突起&#xff0c;成为引领行…

ClkLog常见问题-埋点集成篇Sec. 1

本篇主要解答ClkLog使用过程中【埋点集成】阶段的常见问题。 1.【指标项数据统计】 问&#xff1a;数据概览无法看到数据。 答&#xff1a;如果数据概览所有指标项都没有数据&#xff0c;则需要先检查埋点数据是否接收成功&#xff1b;如果只是会话相关数据&#xff08;访问次数…