PySpark基础入门(7):Spark SQL

news2025/1/16 0:53:06

概述

SparkSQL和Hive的异同

  1. Hive和Spark 均是:“分布式SQL计算引擎”
  2. SparkSQL使用内存计算,而Hive使用磁盘迭代,所以SparkSQL性能较好
  3. 二者都可以运行在YARN之上
  4. SparkSQL无元数据管理,但可以和hive集成,集成之后可以借用hive的metastore进行元数据管理

SparkSQL的数据抽象

PySpark使用DataFrame,是一个二维表数据结构,适用于分布式集合

SparkSession对象

在RDD阶段,程序的执行入口对象是: SparkContext

在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象


SparkSession对象的作用:

  • 用于SparkSQL编程作为入口对象
  • 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
    # 构建SparkSession执行环境入口对象
    spark = SparkSession.builder.\
        appName("test").\
		config("spark.sql.shuffle.partitions", "4").\
        master("local[*]").\
        getOrCreate()
	# 通过SparkSession可以获SparkContext对象
 	sc = spark.sparkContext

#appName:任务名称
#config:设置一些属性
#master:Spark运行模式
#getOrCreate:创建SparkSession对象

DataFrame

DataFrame和RDD的对比

相同点:DataFrame和RDD都是:弹性的、分布式的、数据集

不同点:DataFrame存储的数据结构限定为:二维表结构化数据;而RDD可以存储的数据则没有任何限制

也就是说,DataFrame 是按照二维表格的形式存储数据;RDD则是存储对象本身

DataFrame的组成

在结构层面:

  • StructType对象描述整个DataFrame的表结构
  • StructField对象描述一个列的信息

在数据层面:

  • Row对象记录一行数据
  • Column对象记录一列数据并包含列的信息(包含StructField)
schema = StructType().\
add("name", StringType(), nullable=True).\
add("age", IntegerType(), nullable=False)
# StructType是由多个StructField组成的
# 通过add方法向StructType中添加StructField
# 一个StructField记录由列名、列类型、列是否运行为空组成

DataFrame的构建

1、基于RDD构建:

	# 首先定义rdd
	rdd = sc.textFile("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/people.txt").\
        map(lambda x: x.split(",")).\
        map(lambda x: (x[0], int(x[1])))

然后通过createDataFrame或者toDF的方法构建rdd

createDataFrame

# 构建DataFrame对象
# 参数1 被转换的RDD
# 参数2 指定列名, 通过list的形式指定, 按照顺序依次提供字符串名称即可
df = spark.createDataFrame(rdd, schema=['name', 'age'])

其中schema还可以通过StructType的格式进行定义:

# 创建表结构
schema = StructType().\
add("name", StringType(), nullable=True).\
add("age", IntegerType(), nullable=False)
# 构建df
df = spark.createDataFrame(rdd, schema=schema)

toDF

df = rdd.toDF(["name", "age"]) # 其中参数是表结构(参数类型由推断rdd中的数据得到,默认允许为空)

其中schema也可以通过StructType的格式进行定义:

schema = StructType().add("name", StringType(), nullable=True).\
        add("age", IntegerType(), nullable=False)
df2 = rdd.toDF(schema=schema)

构建好df之后的常用操作如下:

  • 可以通过printSchema()方法打印df的表结果:
df.printSchema()

结果如下:

可以将表名、表类型、是否允许为空打印出来

  • 可以通过show()方法打印df中的数据:
# 参数1 表示 展示出多少条数据, 默认不传的话是20
# 参数2 表示是否对列进行截断, 如果列的数据长度超过20个字符串长度, 后续的内容不显示以...代替
# 如果值为False 表示不截断,全部显示;默认是True
df.show(20, False)
  • 可以通过createOrReplaceTempView()方法创建临时视图表,供SQL语句查询
df.createOrReplaceTempView("people") # 参数是表名

2、基于Pandas的DataFrame构建

import pandas as pd
# 首先构建pandas的df对象
pdf = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "name": ["aa", "bb", "cc"],
        "age": [11, 21, 11]
    }
)
# 然后创建spark的df对象
df = spark.createDataFrame(pdf)

3、读取外部数据

语法如下

sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......")
	.option("K", "V") # option可选
	.schema(StructType | String) # STRING的语法如.schema("name STRING", "age INT")
	.load("被读取文件的路径, 支持本地文件系统和HDFS")

①读取文本数据:format("text")

	schema = StructType().add("data", StringType(), nullable=True)
    df = spark.read.format("text").\
        schema(schema=schema).\
        load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/people.txt")

结果如下:

读取文本数据的特点:将一整行只作为一个列读取,默认列名是value,类型是String

②读取json数据:format("json")

df = spark.read.format("json").load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/people.json")

json类型的数据有自带的列信息,就不用手动规定schema了,数据格式如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

通过运行结果:

可以看到自动加载了列名相关信息;

如果手动设置了表结构的信息,可能导致找不到json文件的对应数据:

如下:设置schema为:schema = StructType().add("name1",StringType(),nullable=True).add("age1",IntegerType(),nullable=True)

导致无法加载name和age里的数据:

③读取csv数据:format("csv")

df = spark.read.format("csv").\
option("sep", ";").\ # 列分隔符
option("header", True).\ # 是否有CSV标头
option("encoding", "utf-8").\ # 编码
schema("name STRING, age INT, job STRING").\ # 列名和类型
load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/people.csv")

其中列分隔符是根据csv文件里的分隔符设定的:

标头信息如下:如果设置为True,则在df.show()的时候会打印出来;

注意:schema的设置也可以通过StructType进行;

并且自定义的列名不会与csv文件里的标头进行比对,如下:

修改了列名,还能正常加载数据

④读取parquet数据:format("parquet")

parquet自带schema,直接load即可:

df = spark.read.format("parquet").load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/users.parquet")

parquet是Spark中常用的一种列式存储文件格式

parquet对比普通的文本文件的区别:

  1. parquet 内置schema (列名\ 列类型\ 是否为空)
  2. 存储是以列作为存储格式
  3. 存储是序列化存储在文件中的(有压缩属性体积小)

在vscode中安装parquet Viewer插件可以查看parquet文件的内容:

DSL(领域特定语言)语法风格

以调用DataFrame特有的API的方式来处理Data

show

功能:展示DataFrame中的数据, 默认展示20条

语法:

df.show(参数1, 参数2)
- 参数1: 默认是20, 控制展示多少条
- 参数2: 是否截断列, 默认只输出20个字符的长度, 过长不显示, 要显示的话 应填入 truncate = True

printSchema

功能:打印输出df的schema信息

语法:df.printSchema()

select

功能:选择DataFrame中的指定列

语法:

# select方法的参数支持字符串形式、column形式传入
# 字符串形式
df.select(["id", "subject"]).show()
df.select("id", "subject").show()
# column形式
# 首先获取column对象
id_column = df['id']
subject_column = df['subject'] #从df中根据列名获取
# 然后传入
df.select(id_column, subject_column).show()

filter和where

功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame

(二者是等价的)

语法:

# 字符串形式
df.filter("score < 99").show()
# column形式
df.filter(df['score'] < 99).show()

groupBy

功能:按照指定的列进行数据的分组, 返回值是GroupedData对象

语法:

# 告知spark需要按照哪个列进行分组:
# 字符串形式
df.groupBy("subject").count().show()
# column形式
df.groupBy(df['subject']).count().show()

GroupedData对象:

是一个特殊的DataFrame数据集,是经过groupBy后得到的返回值, 内部记录了以分组形式存储的数据

常用API:min、max、avg、sum、count等等;

SQL风格语法

注册DataFrame成为表 语法:

# 注册成临时表
df.createTempView("table") # 注册临时视图(表)
df.createOrReplaceTempView("table2") # 注册或者替换临时视图表
df.createGlobalTempView("table3") # 注册全局临时视图;全局临时视图在使用的时候需要在前面带上global_temp.前缀

全局表和临时表的区别

  • 临时表:只能在当前SparkSession使用;
  • 全局表:可以跨SparkSession使用,在一个程序内的多个SparkSession中均可调用,但查询时需要前缀global_temp

使用sql查询

语法:sparksession.sql(sql语句)

DataFrame数据写出

语法:df.write.mode().format().option(K,V).save(path)

mode:写出的模式;包括append:追加|overwrite:覆盖|ignore:忽略|error:文件重复则报异常(默认)

format:文件格式,包括text|csv|json|parquet|orc|avro|jdbc

option:属性

save:保存路径

需要注意的是,text只能写出一个单列数据

# 需要将df转换为单列df
df.select(F.concat_ws("---", "user_id", "movie_id", "rank", "ts")).\
        write.\
        mode("overwrite").\
        format("text").\
        save("../data/output/sql/text")

写出后文件的格式如下:

DataFrame 通过JDBC读写数据库

首先需要在anaconda中安装mysql的驱动:

将mysql的驱动包放在anaconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/jars/路径下

写入mysql

读取mysql

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/501412.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

极光笔记 | 极光推出“运营增长”解决方案,开启企业增长新引擎

摘要&#xff1a; 移动互联网流量红利见底&#xff0c;营销获客面临更多挑战 随着移动互联网流量红利见顶&#xff0c;越来越多的企业客户发现获取新客户的难度直线上升&#xff0c;获客成本持续攀高。 传统的移动互联网营销以PUSH为代表&#xff0c;采用简单粗暴的方式给用户…

PaddleVideo 简介以及文件目录详解

简介特性许可证书 PaddleVideo 文件目录总述applications 文件夹详述configs 文件夹详述docs 文件夹详述paddlevideo 文件夹详述utils 文件夹tasks 文件夹loader 文件夹modeling 文件夹solver 文件夹metrics 文件夹 简介 PaddleVideo 旨在打造一套丰富、领先且实用的 Video 工…

【阿里云】秒懂云通信

目录 一、秒懂云通信-第一回听什么? 二、短信的使用场景 1. 短信的三种类型&#xff1a;短信通知、验证、会员营销 三、短信平台的选择 1、看成功率 2、看价格 3、看体验 四、秒懂云通信 五、如何使用 Step 1&#xff1a;业务入口 Step 2&#xff1a;注册账号 Step…

云安全技术——Snort安装与配置

目录 一、Snort简介 二、安装Centos7 Minimal系统 三、基本环境配置 四、安装Snort 五、下载规则 六、配置Snort 七、测试Snort 一、Snort简介 Snort是一个开源的网络入侵检测系统&#xff0c;主要用于监控网络数据包并检测可能的攻击行为。它可以实时分析网络流量&…

HJ37 统计每个月兔子的总数

HJ37 统计每个月兔子的总数 描述示例解题思路以及代码分析解法1解法2 描述 描述 有一种兔子&#xff0c;从出生后第3个月起每个月都生一只兔子&#xff0c;小兔子长到第三个月后每个月又生一只兔子。 例子&#xff1a;假设一只兔子第3个月出生&#xff0c;那么它第5个月开始会…

ASEMI代理ADUM3211TRZ-RL7原装ADI车规级ADUM3211TRZ-RL7

编辑&#xff1a;ll ASEMI代理ADUM3211TRZ-RL7原装ADI车规级ADUM3211TRZ-RL7 型号&#xff1a;ADUM3211TRZ-RL7 品牌&#xff1a;ADI/亚德诺 封装&#xff1a;SOIC-8 批号&#xff1a;2023 引脚数量&#xff1a;8 工作温度&#xff1a;-40C~125C 安装类型&#xff1a;表…

操作系统原理 —— 操作系统什么时候会发生进程的调度(十二)

操作系统什么时候需要进程调度&#xff1f; 进程调度的层次中&#xff0c;有一个低级调度&#xff0c;就是按照某种算法从就绪队列中选择一个进程为其分配 CPU。 那操作系统会在什么时候触发进程调度呢&#xff1f; 在这里一共可以分为两大类&#xff1a; 当前运行的进程主动…

04-微服务部署2023系列-centos安装gitlab

目的:为了将来的devops快速部署搭建自己的代码库,保证速度和私密性 前面01-03小节: 完成基本的服务器环境 centos_nginx_java_docker; 这个基础环境是将来集群中每台服务器的基本, 可以先打一个镜像备份。 阿里云的镜像备份比较简单。以后搭建新服务器时,以这个为基础,安…

JUC并发包详解AQS同步队列

一、AQS介绍 在JUC并发包中&#xff0c;AQS为其最关键的作用&#xff0c;全称为abstractQueuedSynchroinzed同步器&#xff0c;为信号量semaphore、同步锁的基础抽象类。 其中内部主要有二大块 state 共享资源&#xff0c;通过并发操作state修改改值为1&#xff0c;如果修改成…

《Linux 内核设计与实现》09. 内核同步介绍

共享资源之所以要防止并发访问&#xff0c;是因为如果多个执行线程同时访问和操作数据&#xff0c;就有可能发生各线程之间相互覆盖共享数据的情况&#xff0c;从而造成被访问的数据不一致状态。 临界区和竞争条件 临界区&#xff1a;访问和操作共享数据的代码段。原子操作&a…

键控流水灯

项目文件 文件 关于项目的内容知识点可以见专栏单片机原理及应用 的第四章 IO口编写 在电路图的基础上&#xff0c;编写可键控的流水灯程序。要求实现的功能为&#xff0c;K1是总开关,当K1首次按下时&#xff0c;流水灯由下往上流动;当K2按下时停止流动&#xff0c;且全部灯灭…

ASK,FSK和PSK

一、ASK&#xff0c;FSK和PSK 数字信号只有有限个离散值&#xff0c;使用数字信号对载波进行调制的方式称为键控(Keying),分为幅度键控&#xff08;ASK)、频移键控&#xff08;FSK)和相移键控&#xff08;PSK)。 幅度键控可以通过乘法器和开关电路来实现&#xff0c;在数字信…

SpringBoot【开发实用篇】---- 配置高级

SpringBoot【开发实用篇】---- 配置高级 1. ConfigurationProperties2. 宽松绑定/松散绑定3. 常用计量单位绑定4. 校验5. 数据类型转换 进入开发实用篇第二章内容&#xff0c;配置高级&#xff0c;其实配置在基础篇讲了一部分&#xff0c;在运维实用篇讲了一部分&#xff0c;这…

离线安装Percona

前言 安装还是比较简单&#xff0c;这边简单进行记录一下。 版本差异 一、离线安装Percona 下载percona官网 去下载你需要对应的版本 jemalloc-3.6.0-1.el7.x86_64.rpm 需要单独下载 安装Percona 进入RPM安装文件目录&#xff0c;执行下面的脚本 yum localinstall *.rpm修改…

C语言CRC-16 X25格式校验函数

C语言CRC-16 X25格式校验函数 CRC-16校验产生2个字节长度的数据校验码&#xff0c;通过计算得到的校验码和获得的校验码比较&#xff0c;用于验证获得的数据的正确性。基本的CRC-16校验算法实现&#xff0c;参考&#xff1a; C语言标准CRC-16校验函数。 不同应用规范通过对输…

聊聊Doris向量化执行引擎-过滤操作

聊聊Doris向量化执行引擎-过滤操作 Doris是开源的新一代极速MPP数据库&#xff0c;和StarRocks同源&#xff0c;采用全面向量化技术&#xff0c;充分利用CPU单核资源&#xff0c;将单核执行性能做到极致。本文&#xff0c;我们聊聊过滤操作是如何利用SIMD指令进行向量化操作。 …

PCB设计流程步骤中的注意事项

PCB中文名称为印制电路板&#xff0c;又称印刷线路板&#xff0c;几乎所有电子设备中都会应用到PCB。这种由贵金属制成的绿色电路板连接了设备的所有电气组件&#xff0c;并使其能够正常运行。PCB原理图是一个计划&#xff0c;是一个蓝图。它说明的并不是组件将专门放置在何处&…

【51单片机HC6800-EM3 V3.0】动态数码管显示,原理分析,连线操作

&#x1f38a;专栏【51单片机】 &#x1f354;喜欢的诗句&#xff1a;更喜岷山千里雪 三军过后尽开颜。 &#x1f386;音乐分享【如愿】 大一同学小吉&#xff0c;欢迎并且感谢大家指出我的问题&#x1f970; 目录 &#x1f354;提醒 &#x1f38a;连线图片 &#x1f38a;原理…

基于SSM框架扶贫信息综合平台前台管理系统(spring+springmvc+mybatis+jsp+jquery+css)

一、项目简介 本项目是一套基于SSM框架扶贫信息综合平台前台管理系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&am…

【刷题】138. 复制带随机指针的链表

138. 复制带随机指针的链表 一、题目描述二、示例三、实现 138. 复制带随机指针的链表 一、题目描述 给你一个长度为 n 的链表&#xff0c;每个节点包含一个额外增加的随机指针 random &#xff0c;该指针可以指向链表中的任何节点或空节点。 构造这个链表的 深拷贝。 深拷贝…