Spark学习(6)-Spark SQL

news2025/1/23 3:52:11

1 快速入门

SparkSQL是Spark的一个模块, 用于处理海量结构化数据
SparkSQL是非常成熟的 海量结构化数据处理框架.
学习SparkSQL主要在2个点:

  • SparkSQL本身十分优秀, 支持SQL语言\性能强\可以自动优化\API简单\兼容HIVE等等。
  • 企业大面积在使用SparkSQL处理业务数据。
    • 离线开发
    • 数仓搭建
    • 科学计算
    • 数据分析

特点:
在这里插入图片描述

2 SparkSQL概述

2.1 SparkSQL和Hive的异同

在这里插入图片描述

2.2 SparkSQL的数据抽象

在这里插入图片描述
在这里插入图片描述

2.3 SparkSession对象

在RDD阶段,程序的执行入口对象是: SparkContext
在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。
SparkSession对象可以:

  • 用于SparkSQL编程作为入口对象。
  • 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

所以,后续执行环境入口对象,统一变更为SparkSession对象。

在这里插入图片描述
2.4 SparkSession对象

# coding:utf8
# SparkSQL 中的入口对象是SparkSession对象
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 构建SparkSession对象, 这个对象是 构建器模式 通过builder方法来构建
spark = SparkSession.builder.\
appName("local[*]").\
config("spark.sql.shuffle.partitions", "4").\
getOrCreate()
# appName 设置程序名称, config设置一些常用属性
# 最后通过getOrCreate()方法 创建SparkSession对象

3 DataFrame入门和操作

3.1 DataFrame的组成

在结构层面:

  • StructType对象描述整个DataFrame的表结构。
  • StructField对象描述一个列的信息。

在数据层面:

  • Row对象记录一行数据。
  • Column对象记录一列数据并包含列的信。

StructType描述,如下图:
在这里插入图片描述
一个StructField记录:列名、列类型、列是否运行为空。
多个StructField组成一个StructType对象。
一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空。

3.2 DataFrame的代码构建 - 基于RDD方式1

DataFrame对象可以从RDD转换而来,都是分布式数据集,其实就是转换一下内部存储的结构,转换为二维表结构。

# 首先构建一个RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
map(lambda x: x.split(',')).\
map(lambda x: [x[0], int(x[1])]) # 需要做类型转换, 因为类型从RDD中探测
# 构建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])

通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame,这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)。

3.3 DataFrame的代码构建 - 基于RDD方式2

通过StructType对象来定义DataFrame的“表结构”转换RDD

# 创建DF , 首先创建RDD 将RDD转DF
rdd = sc.textFile("../data/sql/stu_score.txt").\
map(lambda x:x.split(',')).\
map(lambda x:(int(x[0]), x[1], int(x[2])))
# StructType 类
# 这个类 可以定义整个DataFrame中的Schema
schema = StructType().\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=True).\
add("score", IntegerType(), nullable=False)
# 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add, 每一个add代表一个StructField
# add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
df = spark.createDataFrame(rdd, schema)

3.4 DataFrame的代码构建 - 基于RDD方式3

使用RDD的toDF方法转换RDD

# StructType 类
# 这个类 可以定义整个DataFrame中的Schema
schema = StructType().\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=True).\
add("score", IntegerType(), nullable=False)
# 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
# add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
# 方式1: 只传列名, 类型靠推断, 是否允许为空是true
df = rdd.toDF(['id', 'subject', 'score'])
df.printSchema()
df.show()
# 方式2: 传入完整的Schema描述对象StructType
df = rdd.toDF(schema)
df.printSchema()
df.show()

3.5 DataFrame的代码构建 - 基于Pandas的DataFrame

将Pandas的DataFrame对象,转变为分布式的SparkSQL

# 构建Pandas的DF
pdf = pd.DataFrame({
"id": [1, 2, 3],
"name": ["张大仙", '王晓晓', '王大锤'],
"age": [11, 11, 11]
})
# 将Pandas的DF对象转换成Spark的DF
df = spark.createDataFrame(pdf)

3.6 DataFrame的代码构建 - 读取外部数据

通过SparkSQL的统一API进行数据读取构建DataFrame

sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......")
.option("K", "V") # option可选
.schema(StructType | String) # STRING的语法如.schema("name STRING", "age INT")
.load("被读取文件的路径, 支持本地文件系统和HDFS")

读取text数据源

使用format(“text”)读取文本数据,读取到的DataFrame只会有一个列,列名默认称之为:value。

schema = StructType().add("data", StringType(), nullable=True)
df = spark.read.format("text")\
.schema(schema)\
.load("../data/sql/people.txt")

读取json数据源

使用format(“json”)读取json数据

df = spark.read.format("json").\
load("../data/sql/people.json")
# JSON 类型 一般不用写.schema, json自带, json带有列名 和列类型(字符串和数字)
df.printSchema()
df.show()

读取csv数据源

使用format(“csv”)读取csv数据

df = spark.read.format("csv")\
.option("sep", ";")\ # 列分隔符
.option("header", False)\ # 是否有CSV标头
.option("encoding", "utf-8")\ # 编码
.schema("name STRING, age INT, job STRING")\ # 指定列名和类型
.load("../data/sql/people.csv") # 路径
df.printSchema()
df.show()

读取parquet数据源

使用format(“parquet”)读取parquet数据

# parquet 自带schema, 直接load啥也不需要了
df = spark.read.format("parquet").\
load("../data/sql/users.parquet")
df.printSchema()
df.show()

注意:
parquet: 是Spark中常用的一种列式存储文件格式,和Hive中的ORC差不多, 他俩都是列存储格式
parquet对比普通的文本文件的区别:

  • parquet 内置schema (列名\ 列类型\ 是否为空)。
  • 存储是以列作为存储格式。
  • 存储是序列化存储在文件中的(有压缩属性体积小)。

Parquet文件不能直接打开查看,如果想要查看内容,可以在PyCharm中安装如下插件来查看:
在这里插入图片描述

3.7 DataFrame的入门操作

DataFrame支持两种风格进行编程,分别是:DSL风格和SQL风格。

  • DSL语法风格:
    DSL称之为:领域特定语言。其实就是指DataFrame的特有API,DSL风格意思就是以调用API的方式来处理Data。比如:df.where().limit()
  • SQL语法风格
    SQL风格就是使用SQL语句处理DataFrame的数据,比如:spark.sql(“SELECT * FROM xxx)

DSL - show 方法

功能:展示DataFrame中的数据, 默认展示20条。
语法:

df.show(参数1, 参数2)
- 参数1: 默认是20, 控制展示多少条
- 参数2: 是否阶段列, 默认只输出20个字符的长度, 过长不显示, 要显示的话 请填入 truncate = True

DSL - printSchema方法

功能:打印输出df的schema信息
语法:

df.printSchema()

例如:
在这里插入图片描述

DSL - select

功能:选择DataFrame中的指定列(通过传入参数进行指定)
语法:
在这里插入图片描述
可传递:

  • 可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串
    列名来指定列。
  • List[Column]对象或者List[str]对象, 用来选择多个列。

在这里插入图片描述

DSL - filter和where

功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame
语法:

df.filter()
df.where()

where和filter功能上是等价的。
在这里插入图片描述

DSL - groupBy 分组

功能:按照指定的列进行数据的分组, 返回值是GroupedData对象
语法:

df.groupBy()

传入参数和select一样,支持多种形式,不管怎么传意思就是告诉spark
按照哪个列分组。

在这里插入图片描述

GroupedData对象

GroupedData对象是一个特殊的DataFrame数据集,其类全名:<class 'pyspark.sql.group.GroupedData'>,这个对象是经过groupBy后得到的返回值, 内部记录了 以分组形式存储的数据。
GroupedData对象其实也有很多API,像:min、max、avg、sum、等等许多方法都存在。

SQL风格语法 - 注册DataFrame成为表

DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中,使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。
如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:
在这里插入图片描述

SQL风格语法 - 使用SQL查询

在这里插入图片描述

pyspark.sql.functions 包

PySpark提供了一个包: pyspark.sql.functions,这个包里面提供了 一系列的计算函数供SparkSQL使用。

导包
from pyspark.sql import functions as F

3.8 SparkSQL Shuffle 分区数目

在这里插入图片描述

3.9 SparkSQL 数据清洗API

在大数据处理之前,首先要对数据进行清洗,有去重,删除缺值,填充缺值等等。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.10 DataFrame数据写出

DataFrame数据写出
在这里插入图片描述

3.11 DataFrame 通过JDBC读写数据库(MySQL示例)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/43568.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

想把iPad作为扩展屏,确发现macOS monterey随航功能不见了

居家办公最不爽的事情就是没有扩展屏&#xff0c;对于开发来说&#xff0c;效率是有影响的&#xff0c;于是便想着把iPad当作扩展屏来用 系统参数 mac&#xff1a; macOS monterey&#xff08;12.4&#xff09;&#xff1b;M1 iPad&#xff1a; iPad Pro 第2代&#xff0c;应该…

细粒度图像分类论文研读-2016

文章目录Compact Bilinear PoolingAbstractIntroductionCompact bilinear modelsA kernelized view of bilinear poolingCompact bilinear poolingSome properties of compact bilinear poolingCompact Bilinear Pooling Abstract 双线性模型很成功&#xff0c;但是双线性特征…

HNUCM-2022年秋季学期《算法分析与设计》练习14

目录 问题 A: 最小生成树&#xff08;Prim&#xff09; 问题 B: 牛妹的蛋糕 问题 C: 尼科彻斯定理 问题 D: 最小生成树&#xff08;Kruskal&#xff09; 问题 E: 单源最短路径问题 问题 F: 搭建电路 问题 G: 丛林小道 问题 H: 低碳出行 问题 A: 最小生成树&#xff08;P…

frontend webstorm plugin:插件推荐

目录CodeGlance &#xff08;左边地图&#xff09;GitToolBox &#xff08;Git提示&#xff09;Material Theme UI &#xff08;主题框架&#xff09;Nyan Progress Bar &#xff08;进度条&#xff09;Rainbow Brackets&#xff08;括号颜色&#xff0c;注意忽略变量&#xff…

【MySQL】事务和索引

文章目录事务&#xff08;Transaction&#xff09;1. 定义2. 如何操作事务2.1 SQL 语句操作事务的几个关键字2.2 使用 SQL 语句操作事务2.3 JDBC 操作事务3. 事务的四个特性&#xff1a;ACID3.1 Atomic&#xff08;原子性&#xff09;3.1.1 理解原子性3.2 Consistency&#xff…

H3C WX2510H无线控制器开局如何简单配置

环境&#xff1a; H3C-WX2510H version 7.1.064, Release 5435P02 AP H3CWA6320-C 问题描述&#xff1a; H3C WX2510h无线控制器开局如何简单配置 解决方案&#xff1a; 1.PC网卡设置ip192.168.0.111&#xff0c;连接随便一个LAN口 2.浏览器访问https://192.168.0.100…

面试处处碰壁?不慌,Java 核心面试文档.PDF 助你披荆斩棘

前言 首先强调几点&#xff1a; 1. 一定要谨慎对待写在简历上的东西&#xff0c;一定要对简历上的东西非常熟悉。因为一般情况下&#xff0c;面试官都是会根据你的简历来问的&#xff1b; 2. 能有一个上得了台面的项目也非常重要&#xff0c;这很可能是面试官会大量发问的地…

如何采集需要验证码登录的网站数据

如何抓取网页上的数据,需要登录&#xff1f;随着互联网的发展&#xff0c;移动支付技术的普及&#xff0c;以及人们对内容进行消费的观念逐渐养成。有很多网站&#xff0c;需要付费后才能查看&#xff0c;或者是开通会员之类的才能查看。针对这类网站&#xff0c;我们如何快速的…

性能测试环境部署

一、安装JDK 【步骤一】安装JDK (安装Jmeter之前需要配置JAVA环境) 下载jdk&#xff0c;到官网下载jdk&#xff0c;地址&#xff1a; http://www.oracle.com/technetwork/java/javase/downloads/index.html 【JDK版本要和JMeter版本对应&#xff0c;如果JDK版本过高&#x…

深度学习入门(8)激活函数ReLU和 Sigmoid的计算图表示方式及代码实现

《深度学习入门》系列文章目录 深度学习入门 (1)感知机 深度学习入门&#xff08;2&#xff09;神经网络 深度学习入门&#xff08;3&#xff09;神经网络参数梯度的计算方式 深度学习入门&#xff08;4&#xff09;【深度学习实战】无框架实现两层神经网络的搭建与训练过程…

黑盒测试用例设计 - 等价类划分法

说明&#xff1a;在所有测试数据中&#xff0c;具有某种共同特征的数据集合进行划分。 分类&#xff1a; 有效等价类&#xff1a;满足需求的数据集合无效等价类&#xff1a;不满足需求的数据集合 步骤&#xff1a; 明确需求设计一个新的测试用例&#xff0c;使其尽可能多的覆…

如何申请软件著作权

申请软件著作权的好处&#xff1a; 1、软件著作权登记证书是在软件著作权发生争议时&#xff0c;证明软件权利的最有力证。这不仅是在进行诉讼或者是发生一般纠纷时都能起到很好的证明作用&#xff0c;但是如果没有进行登记&#xff0c;著作权人的权利就很难获得全面的保护。 …

Linux-scheduler之负载均衡(一)

一、如何衡量CPU的负载 衡量CPU负载 简单衡量 CPU负载就绪队列的总权重CPU负载 就绪队列的总权重 CPU负载就绪队列的总权重 量化负载 CPU负载(采样期间累计运行时间/采样总时间)∗就绪队列总权重CPU负载 (采样期间累计运行时间/采样总时间)*就绪队列总权重 CPU负载(采样期…

Flutter 创建自己的对话框,不使用任何包!

创建自己的对话框&#xff0c;不使用任何包&#xff01; 原文 https://itnext.io/create-your-own-dialog-without-using-any-packages-7bb303f62471 前言 在本文中&#xff0c;我们将学习如何创建我们自己的 showDialog() 函数&#xff0c;并了解到底发生了什么。 正文 先看效…

Matplotlib 可视化50图:散点图(1)

导读 本系列将持续更新50个matplotlib可视化示例&#xff0c;主要参考Selva Prabhakaran 在MachineLearning Plus上发布的博文&#xff1a;Python可视化50图。 定义 关联图是查看两个事物之间关系的图像&#xff0c;它能够展示出一个事物随着另一个事物是如何变化的。关联图的类…

ctfshow(菜狗杯)

目录 web签到 一言既出 驷马难追 web2 c0me_t0_s1gn 我的眼里只有$ TAPTAPTAP Webshell 化零为整 无一幸免 遍地飘零 传说之下&#xff08;雾&#xff09; Is_Not_Obfuscate web签到 <?phperror_reporting(0); highlight_file(__FILE__);eval($_REQUEST[$_GET[…

springboot大学生课堂考勤管理系统的设计与实现

根据一般学生课堂考勤管理系统的功能需求分析&#xff0c;本系统的功能模块如下&#xff1a; &#xff08;1&#xff09;在个人中心&#xff0c;管理员可以修改自己的用户名和登录密码。 &#xff08;2&#xff09;在学生管理模块中&#xff0c;可以查看学生的信息&#xff0c;…

DiffusionDet:Diffusion Model for Object Detection

Diffusion Model for Object Detection 一种用于目标检测的扩散模型 Motivation 1、如何使用一种更简单的方法代替可查询的object queries 2、Bounding box的生成方式过去是三种&#xff0c;第一种为sliding windows、第二种anchor box、第三种object queries&#xff0c;这里其…

AlphaFold2源码解析(3)--数据预处理

AlphaFold2源码解析(3)–数据预处理 数据预处理整体流程 数据处理入口&#xff1a; feature_dict data_pipeline.process( input_fasta_pathfasta_path,# 输入序列目录 msa_output_dirmsa_output_dir) # MSA序列目录 可能是单体也可能是多聚体 主要调动的API是&#xff1a; …

如何让Java项目兼容更多的客户端设备(二)

如何让Java项目兼容更多的客户端设备&#xff08;二&#xff09; ​ ​ 一、Token认证的原理 传统的单体JavaWeb项目通常采用HttpSession保存登陆成功的凭证&#xff0c;但是HttpSession需要浏览器的Cookie机制配合。也就是说Web项目的客户端只能是浏览器&#xff0c;不可以…