概述
SparkSQL和Hive的异同
- Hive和Spark 均是:“分布式SQL计算引擎”
- SparkSQL使用内存计算,而Hive使用磁盘迭代,所以SparkSQL性能较好
- 二者都可以运行在YARN之上
- SparkSQL无元数据管理,但可以和hive集成,集成之后可以借用hive的metastore进行元数据管理
SparkSQL的数据抽象
PySpark使用
DataFrame
,是一个二维表数据结构,适用于分布式集合
SparkSession对象
在RDD阶段,程序的执行入口对象是: SparkContext
在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象
SparkSession对象的作用:
- 用于SparkSQL编程作为入口对象
- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
# 构建SparkSession执行环境入口对象 spark = SparkSession.builder.\ appName("test").\ config("spark.sql.shuffle.partitions", "4").\ master("local[*]").\ getOrCreate() # 通过SparkSession可以获SparkContext对象 sc = spark.sparkContext #appName:任务名称 #config:设置一些属性 #master:Spark运行模式 #getOrCreate:创建SparkSession对象
DataFrame
DataFrame和RDD的对比
相同点:DataFrame和RDD都是:弹性的、分布式的、数据集
不同点:DataFrame存储的数据结构限定为:二维表结构化数据;而RDD可以存储的数据则没有任何限制
也就是说,DataFrame 是按照二维表格的形式存储数据;RDD则是存储对象本身
DataFrame的组成
在结构层面:
- StructType对象描述整个DataFrame的表结构
- StructField对象描述一个列的信息
在数据层面:
- Row对象记录一行数据
- Column对象记录一列数据并包含列的信息(包含StructField)
schema = StructType().\ add("name", StringType(), nullable=True).\ add("age", IntegerType(), nullable=False) # StructType是由多个StructField组成的 # 通过add方法向StructType中添加StructField # 一个StructField记录由列名、列类型、列是否运行为空组成
DataFrame的构建
1、基于RDD构建:
# 首先定义rdd
rdd = sc.textFile("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/people.txt").\
map(lambda x: x.split(",")).\
map(lambda x: (x[0], int(x[1])))
然后通过createDataFrame
或者toDF
的方法构建rdd
①createDataFrame
# 构建DataFrame对象
# 参数1 被转换的RDD
# 参数2 指定列名, 通过list的形式指定, 按照顺序依次提供字符串名称即可
df = spark.createDataFrame(rdd, schema=['name', 'age'])
其中schema
还可以通过StructType
的格式进行定义:
# 创建表结构
schema = StructType().\
add("name", StringType(), nullable=True).\
add("age", IntegerType(), nullable=False)
# 构建df
df = spark.createDataFrame(rdd, schema=schema)
②toDF
:
df = rdd.toDF(["name", "age"]) # 其中参数是表结构(参数类型由推断rdd中的数据得到,默认允许为空)
其中schema
也可以通过StructType
的格式进行定义:
schema = StructType().add("name", StringType(), nullable=True).\
add("age", IntegerType(), nullable=False)
df2 = rdd.toDF(schema=schema)
构建好df之后的常用操作如下:
- 可以通过
printSchema()
方法打印df的表结果:
df.printSchema()
结果如下:
可以将表名、表类型、是否允许为空打印出来
- 可以通过
show()
方法打印df中的数据:
# 参数1 表示 展示出多少条数据, 默认不传的话是20
# 参数2 表示是否对列进行截断, 如果列的数据长度超过20个字符串长度, 后续的内容不显示以...代替
# 如果值为False 表示不截断,全部显示;默认是True
df.show(20, False)
- 可以通过
createOrReplaceTempView()
方法创建临时视图表,供SQL语句查询
df.createOrReplaceTempView("people") # 参数是表名
2、基于Pandas的DataFrame构建
import pandas as pd
# 首先构建pandas的df对象
pdf = pd.DataFrame(
{
"id": [1, 2, 3],
"name": ["aa", "bb", "cc"],
"age": [11, 21, 11]
}
)
# 然后创建spark的df对象
df = spark.createDataFrame(pdf)
3、读取外部数据
语法如下
sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......")
.option("K", "V") # option可选
.schema(StructType | String) # STRING的语法如.schema("name STRING", "age INT")
.load("被读取文件的路径, 支持本地文件系统和HDFS")
①读取文本数据:format("text")
schema = StructType().add("data", StringType(), nullable=True)
df = spark.read.format("text").\
schema(schema=schema).\
load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/people.txt")
结果如下:
读取文本数据的特点:将一整行只作为一个列
读取,默认列名是value,类型是String
②读取json数据:format("json")
df = spark.read.format("json").load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/people.json")
json类型的数据有自带的列信息,就不用手动规定schema了,数据格式如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
通过运行结果:
可以看到自动加载了列名相关信息;
如果手动设置了表结构的信息,可能导致找不到json文件的对应数据:
如下:设置schema为:schema = StructType().add("name1",StringType(),nullable=True).add("age1",IntegerType(),nullable=True)
导致无法加载name和age里的数据:
③读取csv数据:format("csv")
df = spark.read.format("csv").\
option("sep", ";").\ # 列分隔符
option("header", True).\ # 是否有CSV标头
option("encoding", "utf-8").\ # 编码
schema("name STRING, age INT, job STRING").\ # 列名和类型
load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/people.csv")
其中列分隔符是根据csv文件里的分隔符设定的:
标头信息如下:如果设置为True
,则在df.show()
的时候会打印出来;
注意:schema的设置也可以通过StructType进行;
并且自定义的列名不会与csv文件里的标头进行比对,如下:
修改了列名,还能正常加载数据
④读取parquet数据:format("parquet")
parquet自带schema,直接load即可:
df = spark.read.format("parquet").load("hdfs://10.245.150.47:8020/user/wuhaoyi/input/sql/users.parquet")
parquet是Spark中常用的一种列式存储文件格式;
parquet对比普通的文本文件的区别:
- parquet 内置schema (列名\ 列类型\ 是否为空)
- 存储是以列作为存储格式
- 存储是序列化存储在文件中的(有压缩属性体积小)
在vscode中安装parquet Viewer
插件可以查看parquet文件的内容:
DSL(领域特定语言)语法风格
以调用DataFrame特有的API的方式来处理Data
show
功能:展示DataFrame中的数据, 默认展示20条
语法:
df.show(参数1, 参数2) - 参数1: 默认是20, 控制展示多少条 - 参数2: 是否截断列, 默认只输出20个字符的长度, 过长不显示, 要显示的话 应填入 truncate = True
printSchema
功能:打印输出df的schema信息
语法:
df.printSchema()
select
功能:选择DataFrame中的指定列
语法:
# select方法的参数支持字符串形式、column形式传入 # 字符串形式 df.select(["id", "subject"]).show() df.select("id", "subject").show() # column形式 # 首先获取column对象 id_column = df['id'] subject_column = df['subject'] #从df中根据列名获取 # 然后传入 df.select(id_column, subject_column).show()
filter和where
功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame
(二者是等价的)
语法:
# 字符串形式 df.filter("score < 99").show() # column形式 df.filter(df['score'] < 99).show()
groupBy
功能:按照指定的列进行数据的分组, 返回值是
GroupedData
对象语法:
# 告知spark需要按照哪个列进行分组: # 字符串形式 df.groupBy("subject").count().show() # column形式 df.groupBy(df['subject']).count().show()
GroupedData对象:
是一个特殊的DataFrame数据集,是经过groupBy后得到的返回值, 内部记录了以分组形式存储的数据
常用API:min、max、avg、sum、count等等;
SQL风格语法
注册DataFrame成为表 语法:
# 注册成临时表
df.createTempView("table") # 注册临时视图(表)
df.createOrReplaceTempView("table2") # 注册或者替换临时视图表
df.createGlobalTempView("table3") # 注册全局临时视图;全局临时视图在使用的时候需要在前面带上global_temp.前缀
全局表和临时表的区别
- 临时表:只能在当前SparkSession使用;
- 全局表:可以跨SparkSession使用,在一个程序内的多个SparkSession中均可调用,但查询时需要前缀
global_temp
使用sql查询
语法:sparksession.sql(sql语句)
DataFrame数据写出
语法:df.write.mode().format().option(K,V).save(path)
mode:写出的模式;包括
append:追加|overwrite:覆盖|ignore:忽略|error:文件重复则报异常(默认)
format:文件格式,包括
text|csv|json|parquet|orc|avro|jdbc
option:属性
save:保存路径
需要注意的是,text只能写出一个单列数据
# 需要将df转换为单列df
df.select(F.concat_ws("---", "user_id", "movie_id", "rank", "ts")).\
write.\
mode("overwrite").\
format("text").\
save("../data/output/sql/text")
写出后文件的格式如下:
DataFrame 通过JDBC读写数据库
首先需要在anaconda中安装mysql的驱动:
将mysql的驱动包放在anaconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/jars/
路径下
写入mysql
读取mysql