前言:
💞💞大家好,我是书生♡,本阶段和大家一起分享和探索大数据技术Spark—SparkSQL,本篇文章主要讲述了:Spark SQL 深度探索:内置函数、数据源处理与自定义函数实践等等。欢迎大家一起探索讨论!!!
💞💞代码是你的魔法,创造是你的舞台,用它们编织梦想,跨越障碍,探索无限的可能!!
个人主页⭐: 书生♡
gitee主页🙋♂:闲客
专栏主页💞:大数据开发
博客领域💥:大数据开发,java编程,前端,算法,Python
写作风格💞:超前知识点,干货,思路讲解,通俗易懂
支持博主💖:关注⭐,点赞、收藏⭐、留言💬
目录
- 1. SparkSQL内置函数
- 1.1 窗口函数
- 1.2 when&otherwise 函数
- 2. SparkSQL读写数据
- 2.1 从文件读取数据
- 2.1.1 txt文件读取
- 2.1.2 CVS文件读取
- 2.1.3 json文件读取
- 2.1.4 其他文件
- 2.2 往文件写入数据
- 2.2.1 text文件写入
- 2.2.2 text文件写入
- 2.2.3 json文件写入
- 2.3 数据库的读取写入
- 2.3.1 数据库读取数据
- 2.3.2 写入数据库
- 3. 自定义函数
- 3.1 函数分类
- 3.2 UDTF---explode函数
- 3.3 自定义UDF函数
- 3.3.1 普通注册方式
- 3.3.2 装饰器注册方式
- 3.4 Pandas的DF和Spark中的DF相互转换
1. SparkSQL内置函数
1.1 窗口函数
Apache Spark SQL 提供了一组强大的窗口函数,允许用户执行复杂的分析操作而无需复杂的子查询或者自定义函数。窗口函数可以在每个分区内对数据进行排序,并计算各种聚合值,如累计和、移动平均等。
Spark SQL 支持的窗口函数包括但不限于:
- row_number()
为每一行分配一个唯一的、连续的整数。
- rank()
分配一个唯一的排名给每一行,跳过被超越的排名值。
- dense_rank()
分配一个唯一的排名给每一行,不跳过被超越的排名值。
- percent_rank()
计算每一行相对于分区中其他行的百分比排名。
- cume_dist()
计算每一行相对于分区中其他行的累积分布。
- lead(expression, [offset], [default])
返回给定表达式在当前行之后第 offset 行的值(默认 offset 为 1)。
- lag(expression, [offset], [default])
返回给定表达式在当前行之前第 offset 行的值(默认 offset 为 1
- sum(expression),avg(expression),min(expression),max(expression)
对表达式计算聚合值,如总和、平均值、最小值或最大值。
- count(expression)
计算非空值的数量。
窗口函数语法:
排序函数--排序的序号 F.rank() F.row_number() F.dense_rank()
第一步:创建df对象
# 使用sparkcontext读取hdfs上的文件数据
# 将读取的文件数据转化为rdd
def getStudentDataFrame(sc,path,schema):
rdd = sc.textFile(path)
res1 = rdd.collect()
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
df = rdd_split.toDF(schema=schema)
df_select = df.select('*').limit(10)
return df_select
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from studentdataframe import getStudentDataFrame
# 创建sparksession对象
spark = SparkSession.builder.getOrCreate()
# 创建sc对象
sc = spark.sparkContext
# 创建df对象
file_path = 'hdfs://node1:8020/data/stu.txt'
schema = ('id string,name string,gender string,age string,'
'birthday string,major string,hobby string,create_time string')
df = getStudentDataFrame(sc, file_path, schema)
df.show()
第二步:在 select 方法中直接应用窗口函数
df_select=df.select(df.id.cast('int').alias('id'),df.name,df.gender,df.age.cast('int').alias('age'),df.birthday,df.major,df.hobby,df.create_time,)
df_select.printSchema()
df_row=df_select.select('id','name','gender','age',F.row_number().over(Window.partitionBy('gender').orderBy(df_select.age.desc())).alias('rn'))
df_row.select('id','name','gender','rn','age').where('rn<=3').show()
df_row1 = df_select.select(
"id",
"name",
"gender",
"age",
F.max("age").over(Window.partitionBy("gender")).alias("max_age")
)
df_row1.show()
1.2 when&otherwise 函数
在 PySpark 中,when 和 otherwise 是非常常用的函数,用于创建条件表达式,特别是在 DataFrame API 中。这些函数通常与 case 一起使用,允许您根据条件对数据进行转换。
F.when(condition=,value).when().otherwise(value)
使用 when 和 otherwise 的基本语法
from pyspark.sql import functions as F
# 创建一个基于条件的列
df = df.withColumn('new_column', F.when(condition, value_if_true).otherwise(value_if_false))
示例:根据年龄划分成 青年: 0<=age<30 中年: 30<=age<60 老年: age>=60, 统计不同年龄段学生人数
df_select = df.select('id','name','gender','age',
F.when((F.col('age') >=0) & (F.col('age') < 22) ,'青年').
when((F.col('age') >=22) & (F.col('age') <60) ,'中年').
otherwise('老年').alias('age_stage'))
df_select.show()
df_select.groupBy('age_stage').agg(F.count('id').alias('cnt')).show()
2. SparkSQL读写数据
在 Apache Spark 中,Spark SQL 提供了一种简单的方式来读取和写入各种数据源。下面是一些常见的数据源以及如何使用 Spark SQL 读写这些数据源的方法。
读取数据
- CSV 文件
df = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("path/to/csv/file.csv")
- JSON 文件
df = spark.read.json("path/to/json/file.json")
- Parquet 文件
df = spark.read.parquet("path/to/parquet/file.parquet")
- JDBC 数据源
properties = {"user": "username", "password": "password"}
df = spark.read.jdbc(url="jdbc:mysql://localhost:3306/database",
table="table_name",
properties=properties)
- Avro 文件
df = spark.read.format("avro").load("path/to/avro/file.avro")
写入数据
- CSV 文件
df.write.format("csv") \
.option("header", "true") \
.mode("overwrite") \
.save("path/to/output/csv")
- JSON 文件
df.write.json("path/to/output/json")
- Parquet 文件
df.write.parquet("path/to/output/parquet")
- JDBC 数据源
properties = {"user": "username", "password": "password"}
df.write.jdbc(url="jdbc:mysql://localhost:3306/database",
table="table_name",
mode="overwrite",
properties=properties)
- Avro 文件
df.write.format("avro").save("path/to/output/avro")
其他注意事项
- 数据源格式:Spark SQL 支持多种数据源格式,包括 CSV、JSON、Parquet、Avro、ORC 等。
- 写入模式:
mode
参数可以是"append"
,"overwrite"
,"ignore"
或"error"
。 - 分区:对于较大的数据集,可以使用分区来优化写入和读取性能。
df.write.partitionBy("year", "month").parquet("path/to/partitioned/parquet")
2.1 从文件读取数据
格式:
df = spark.read.format("格式") .load("path/to/csv/file.csv")
2.1.1 txt文件读取
print("======================== txt文件读取=====================")
# 第一种方法
df_text = spark.read.text('hdfs://node1:8020/data/stu.txt')
df_text.show()
df_select =df_text.select(F.split(df_text.value,",")[0].cast("int").alias("id"),
F.split(df_text.value,",")[1].alias("name"),
F.split(df_text.value,",")[2].alias("gender"),
F.split(df_text.value,",")[3].cast("int").alias("age"),
F.split(df_text.value,",")[4].alias("birthday"),
F.split(df_text.value,",")[5].alias("major"),
F.split(df_text.value,",")[6].alias("hobby")
)
df_select.show()
# 第二种方法
spark.read.format('text').load(path="hdfs://node1:8020/data/stu.txt").show()
2.1.2 CVS文件读取
csv文件需要指定分隔符,默认是逗号
csv也是一个文本文件格式, 可以指定分隔符(默认为逗号,)
header=True: 第一行为列名
inferSchema=True: 自动推断数据类型
schema: 设置表结构 列名和类型
print("========================csv文件读取=====================")
df_csv = spark.read.csv('hdfs://node1:8020/data/stu.csv',sep=',',
schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')
df_csv.show()
df_csv1 = spark.read.format('csv').load(path="hdfs://node1:8020/data/stu.csv",sep=',',
schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')
df_csv1.show()
2.1.3 json文件读取
json文件可以不指定schema,会根据字典自动解析
print("========================json文件读取=====================")
df_json = spark.read.json('file:///export/server/spark/examples/src/main/resources/employees.json')
df_json.show()
2.1.4 其他文件
print('-------------------orc文件读取---------------------')
df_orc = spark.read.orc('file:///export/server/spark/examples/src/main/resources/users.orc')
df_orc.show()
print('-------------------parquet文件读取---------------------')
df_parquet = spark.read.parquet('file:///export/server/spark/examples/src/main/resources/users.parquet')
df_parquet.show()
2.2 往文件写入数据
数据写入一共有三种方式:
df.write.text/json/csv/jdbc...(path=, mode=) df.write.save(format='text/json/csv/jdbc', path=, mode=) df.write.format().mode().save(path=)mode参数写入方式: ①append: 追加写入 ②overwrite: 覆盖写入
- txt格式文件
df.write.text(path=)
df.write.save(format='text', path=, mode=)
df.write.format().mode().save(path=)
- json格式文件
df.write.json(path=, mode=)
df.write.format().mode().save(path=)
- csv格式文件
df.write.csv(path=, sep=, mode=)
df.write.format().mode().save(path=)
- orc格式文件
df.write.orc(path=, mode=)
df.write.format().mode().save(path=)
- parquet格式文件
df.write.parquet(path=, mode=)
df.write.format().mode().save(path=)
- mysql数据
df.write.jdbc(url=, table=, mode=,properties=)
df.write.format().mode().save(path=)
准备数据:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# 创建sparksession对象
spark = SparkSession.builder.getOrCreate()
# 创建df对象
df = spark.createDataFrame(data=[[1, '小明', 18, '男'],
[2, '小红', 20, '女'],
[3, '张三', 22, '女'],
[4, '李四', 24, '男']],
schema='id int,name string,age int,gender string')
df.show()
2.2.1 text文件写入
text()方法中是没有mode参数, 只支持一次性写入, 不能重复执行,因此不建议使用,建议使用
df.write.save(format='text', path=, mode=)
df.write.format().mode().save(path=)
print('--------------------写入text文件------------------')
# todo df.write.text/json/csv/jdbc...(path=, mode=)
# text()方法中是不支持mode参数,写mode参数运行会报错, 只支持一次性写入, 不能重复执行
# df.write.text(path='/data/student.txt', mode='append')
# todo df.write.save(format='text/json/csv/jdbc', path=, mode=)
df_text.write.save(format='text',path='/data/student.txt', mode='overwrite')
df_text.write.save(format='text',path='/data/student.txt', mode='append')
# todo df.write.format().mode().save(path=)
df_text.write.format('text').mode('append').save(path='/data/student.txt')
2.2.2 text文件写入
# todo df.write.text/json/csv/jdbc...(path=, mode=)
df.write.csv(path='/data/student.csv',sep=',', mode='overwrite')
# todo df.write.format().mode().save(path=)
df.write.save(format='csv',path='/data/student.csv',sep=',', mode='overwrite')
2.2.3 json文件写入
print('--------------------写入json文件------------------')
# todo df.write.format().mode().save(path=)
df.write.format('json').mode('overwrite').save(path='/data/student.json')
2.3 数据库的读取写入
2.3.1 数据库读取数据
spark.read.jdbc(url='mysql的url', table='表名', properties=配置信息 账号,密码,驱动类型)
print("========================数据库读取=====================")
# url: mysql的url
# table: 表名
# properties: 配置信息 账号,密码,驱动类型
df_mysql = spark.read.jdbc(url='jdbc:mysql://192.168.88.100:3306/spark_test?characterEncoding=UTF-8',
table='test',
properties={'user':'root',
'password':'123456',
'driver':'com.mysql.jdbc.Driver'})
df_mysql.show()
2.3.2 写入数据库
spark.read.jdbc(url='mysql的url',table='表名', mode='写入方式', properties=配置信息 账号,密码,驱动类型)
print('-------------------------------数据表写入--------------------------')
# todo 如果表不存在会自动创建, 也可以先创建表再写入(表字段类型要和df列值类型一致)
df.write.jdbc(url='jdbc:mysql://192.168.88.100:3306/spark_test?characterEncoding=UTF-8',
table='stu',
mode='overwrite',
properties={'user': 'root',
'password': '123456',
'driver': 'com.mysql.jdbc.Driver'
})
3. 自定义函数
3.1 函数分类
- UDF(User-Defined-Function)函数
- 一对一关系
- 原df一行数据经过UDF函数处理, 返回新df的一行结果(函数的返回值)
- concat/concat_ws/split/substring…
- 可以自定义
- UDAF(User-Defined Aggregation Function)函数
- 多对一关系
- 原df多行数据经过UDAF函数处理, 返回新df的一行结果(函数的返回值)
- max/min/sum/mean/count…
- 可以自定义 -> 借助Pandas模块中的Series对象
- UDTF(User-Defined Table-Generating Function)函数
- 一对多关系
- 原df一行数据经过UDTF函数处理, 返回新df的多行结果(函数的返回值)
- explode -> 爆炸/炸裂函数
3.2 UDTF—explode函数
explode(col)函数:
接收array类型的列对象 -> array类型等同于python中的list类型
将[值1, 值2, …]结构的数据炸裂开, 列表中有多少个元素返回多少行数据
案例:将文件中的数据读取出来,统计每一个单词的个数
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
df_text = spark.read.text('/data/words.txt')
df_text.show(truncate=False)
df_split = df_text.select(F.split('value',',').alias('value'))
df_split.show(truncate=False)
df_explode = df_split.select(F.explode('value').alias('word'))
df_explode.show(truncate=False)
df_group = df_explode.groupBy('word').agg(F.count('word').alias('cnt'))
df_group.show(truncate=False)
# todo 通过sparksql实现
df_text.createOrReplaceTempView('text')
spark.sql(sqlQuery="""
select word,count(word) cnt
from (select explode(split(value ,',')) as word
from text) temp group by word
order by cnt DESC
""").show()
3.3 自定义UDF函数
创建自定义的函数需要将函数注册到Spark中:
- 普通注册方式 ------普通注册方式在DSL和SQL方式中都能使用
- 装饰器注册方式-------装饰器注册方式只能在DSL方式中使用
3.3.1 普通注册方式
自定义UDF函数使用步骤:以DSL和SQL语句两种方式种使用
- 创建自定义函数 -> python函数, 函数的参数是某列中的一行数据
- 将自定义函数注册spark中
- 普通注册方式 -> 借助sparksession对象
变量名 =sparksession.udf.register(name=新函数名, f=自定义UDF函数名, returnType=函数的返回值类型)
- 普通注册方式 -> 借助sparksession对象
name:新函数名, 一般和变量名重名
f:自定义UDF函数名
returnType: 说明自定义UDF函数的返回值,
如果返回值是嵌套类型, 需要外层和内层都要说明, spark的类型对如果返回值是字符串类型, 可以省略不写, 默认类型
注意:注册的过程中有两个函数名,一个是我们自定义的另一个是我们在上面自定义的函数名,DSL使用的是我们后定义的,sql使用的我们参数里面的函数名
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType
import re
spark = SparkSession.builder.getOrCreate()
def info(email):
# 正则表达式
# ()分组 .*?->非贪婪模式匹配
res =r'(.*?)@(.*?)[.](.*)'
fa = re.match(res,email)
return fa.group(1),fa.group(2)
df = spark.read.load(format='csv', path='/data/stu.csv',
sep=',',
schema='name string,age int,'
'gender string,phone string,email '
'string,city string,address string')
# 将自定义UDF函数注册到spark中, 类似spark的内置函数, 可以对df的列进行处理
# 普通注册方式
email_func = spark.udf.register(name="email_func",f=info,returnType=ArrayType(StringType()))
df.select('email',
email_func('email')[0].alias('user'),
email_func('email')[1].alias('company')).show()
print("------------------------------------------")
df.createTempView('email')
spark.sql(sqlQuery="""
select email,email_func(email)[0] as user,
email_func(email)[1] as company from email
""").show()
3.3.2 装饰器注册方式
@udf(returnType=): def 函数名(): ... 只能在DSL方式中使用
我们只需要在定义函数的是,在函数名上面加上一个注解
注意:返回值为字符串的时候,不需要写returnType,如果是其他的形式需要指定返回的格式
使用的是只要在select方法中,调用函数传入参数就可以了
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType
import re
spark = SparkSession.builder.getOrCreate()
print('**************************************************')
@udf(returnType=StringType())
def to_age(age):
if age>=0 and age<30:
return '青年'
elif age>=30 and age<60:
return '中年'
elif age>=60:
return '老年'
df_csv = spark.read.load(format='csv', path='/data/stu.csv',
sep=',',
schema='name string,age int,'
'gender string,phone string,email '
'string,city string,address string')
'''
@udf(returnType=)
def 函数名():
...
# 只能在DSL方式中使用
'''
# todo 装饰器注册方式
df_age=df_csv.select('name','age',to_age('age').alias('age_stage'))
df_age.show()
df_age.groupBy('age_stage').count().show()
3.4 Pandas的DF和Spark中的DF相互转换
在udf函数中,对字段数据的处理是一行一行的处理,无法对整个字段中的所有数据一次性处理,也就是无法完成聚合的操作
求和,求平均数,最大值,最小值等都需要获取整个字段的数据内容,udf函数中无法实现,此时就需要使用udaf函数
UDAF函数需要借助pandas中series类型数据,该类型的数据可以接受字段的一整列数据,完成对字段所有数据的聚合操作
- Pandas
Pandas是python的一个数据分析包(numpy,matlab),最初由AQR Capital Management于2008年4月开发,并于2009年底开源出来,目前由专注于Python数据包开发的PyData开发team继续开发和维护,属于PyData项目的一部分。
pandas是单机资源计算,不适合大数据计算场景
Pandas最初被作为金融数据分析工具而开发出来,因此pandas为时间序列分析提供了很好的支持。
Pandas的主要数据结构是 Series (一维数据)与 DataFrame(二维数据),这两种数据结构足以处理金融、统计、社会科学、工程等领域里的大多数典型用例。
通过索引取值 行索引 列索引
Series 代表一列或一行数据 只有行索引 取出某行或某列数据
DataFrame 有行和列 同时有行索引、列索引,通过行列索引取出的值就转为series类型
spark的dataframe row对象 row[0] 和 schema信息(列的名字)
- series
是一种类似于一维数组的对象,它由一组数据以及一组与之相关的数据标签(即索引)组成。
- DataFrame
是一个表格型的数据结构,它含有一组有序的列,每列可以是不同的值类型(数值、字符串、布尔型值),可以看作是一种二维表格型的数据结构,既有行索引也有列索引,行索引是 index,列索引是 columns。它可以被看做由 Series 组成的字典(共同用一个索引)。
- pandas的dataframe和spark的dataframe的转化
pandas的数据在计算时使用的是单机资源进行计算,想要进行分布式计算,利用多台计算机资源,此时就可以将pandas的dataframe转化为spark的dataframe
# 导入SparkSession模块,用于创建Spark会话
# 导入pandas库,用于数据处理
from pyspark.sql import SparkSession
import pandas as pd
# 创建或获取现有的Spark会话
spark = SparkSession.builder.getOrCreate()
# 使用pandas创建一个DataFrame,包含姓名和年龄数据
pd_df = pd.DataFrame(data={'name': ['zhjangsan', 'lisi', 'wangwu'],
'age': [0, 20, 30]})
# 打印pandas DataFrame以查看数据
print(pd_df)
# 将pandas DataFrame转换为Spark DataFrame,并指定schema
spark_df = spark.createDataFrame(pd_df, schema=['name', 'age'])
# 打印Spark DataFrame以查看数据
print(spark_df.show())
# 将Spark DataFrame转换回pandas DataFrame
s_to_p = spark_df.toPandas()
# 打印转换后的pandas DataFrame以查看数据
print(s_to_p)