1. DataFrame详解
DataFrame是基于RDD进行封装的结构化数据类型,增加了schema元数据,最终DataFrame类型在计算时,还是转为rdd计算。DataFrame的结构化数据有Row(行数据)和schema元数据构成。
- Row 类型 表示一行数据
- DataFrame就算是多行构成
# 导入行类Row
from pyspark.sql import Row
# 创建行数据
r1 = Row(1, '张三', 20)
# 行数取取值 按照下标取值
data = r1[0]
print(data)
data1 = r1[1]
print(data1)
# 指定字段创建行数据
r2 = Row(id=2, name='李四', age=22)
# 按照字段取值
data3 = r2['id']
print(data3)
data4 = r2['name']
print(data4)
- schema表信息
- 定义DataFrame中的表的字段名和字段类型。
# 导入数据类型
from pyspark.sql.types import *
# 定义schema信息
# 使用StructType类进行定义
# add()方法是指定字段信息
# 第一参数,字段名
# 第二个参数,字段信息
# 第三个参数是否允许为空值 默认是True,允许为空
schema_type = StructType().\
add('id',IntegerType()).\
add('name',StringType()).\
add('age',IntegerType(),False)
2. DataFrame创建
创建datafram数据
需要使用一个sparksession的类
创建,SparkSession类是在SparkContext的基础上进行了封装,也就是SparkSession类中包含了SparkContext。
2.1 基本创建
#DataFrame 的基本创建
#Row就是行数据定义的类
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *
#行数据创建
r1 = Row(1,"刘向阳",23,'男')
print(r1)
#行数据下标取值
print(r1[0])
print(r1[1])
#创建行数据时可以指定字段名
r2 = Row(id=2,name='李四',age=20,gender='女')
print(r2)
#使用字段名取值
print(r2['name'])
# 定义元数据
schema = (StructType().add('id', IntegerType()).add('username', StringType()).add('age', IntegerType()).add('gender', StringType()))
print(schema)
# 将元数据和行数据放在一起合成DataFrame
ss = SparkSession.builder.getOrCreate()
# 调用创建df的方法
df = ss.createDataFrame([r1,r2],schema=schema)
# 查看df中数据
df.show()
#查看元数据信息
df.printSchema()
运行结果:
2.2 RDD和DF之间的转化
- rdd的二维数据转化为DataFrame
- rdd.toDF()
- rdd.toDF()
# rdd 和 dataframe的转化
from pyspark.sql import SparkSession
#创建SparkSession对象
ss = SparkSession.builder.getOrCreate()
#基于ss对象获取sparkContext
sc = ss.sparkContext
#创建rdd , 要使用二维列表指定每行数据
rdd = sc.parallelize([[1,'张三',20,'男'],[2,'李四',20,'男']])
#将rdd转为df
df = rdd.toDF(schema='id int,name string,age int,gender string')
#df数据查看
df.show()
df.printSchema()
#df可以转rdd
res = df.rdd.collect()
print(res)
rdd2 = df.rdd.map(lambda x:x['name'])
res2 = rdd2.collect()
print(res2)
运行结果:
2.3 pandas和spark之间转化
- spark的df转为pandas的df
toPandas
#pandas 和 spark的dataframe转化
from pyspark.sql import SparkSession
import pandas as pd
ss = SparkSession.builder.getOrCreate()
#创建pandas的df
df_pd = pd.DataFrame(
{
'id':[1,2,3,4],
'name':['张三','李四','王五','赵六'],
'age':[1,2,3,4],
'gender':['男','女','女','女']
}
)
#查看数据
print(df_pd)
#取值
name = df_pd['name'][0]
print(name)
# 将pandas中的df转为spark的df
df_spark = ss.createDataFrame(df_pd)
#查看
df_spark.show()
#取值
row = df_spark.limit(1).first()
print(row['name'])
#将spark的df重新转为pandas的df
df_pandas = df_spark.toPandas()
print(df_pandas)
运行结果:
2.4 读取文件数据转为df
通过read方法读取数据转为df
- ss.read
#读取文件转为df
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
#读取不同文件数据转为df
# txt文件
df = ss.read.text('hdfs://node1:8020/data/students.txt')
df.show()
# json 文件
df_json = ss.read.json('hdfs://node1:8020/data/baike_qa_valid.json')
df_json.show()
#orc文件
df_orc = ss.read.orc('hdfs://node1:8020/data/users.orc')
df_orc.show()
#去取csv文件
#header或csv文件中的第一行作为表头字段数据
df_csv = ss.read.csv('hdfs://node1:8020/data/students.csv')
df_csv.show()
3. DataFrame基本使用
3.1 SQL语句
使用sparksession提供的sql方法,编写sql语句执行
#使用sql操作dataframe结构化数据
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
#读取文件数据转为df
df_csv = ss.read.csv('hdfs://node1:8020/data/students.csv', header=True,sep=',')
#使用sql操作df数据
#将df指定一个临时表名
df_csv.createTempView('stu')
#编写sql字符串语句,支持hivesql语法
sql_str ="""
select * from stu
"""
#执行sql语句,执行结果返回一个新的df
df_res = ss.sql(sql_str)
df_csv.show()
df_res.show()
3.2 DSL方法
DSL方法是df提供的数据操作函数
使用方式:
- df.方法()
- 可以进行链式调用
- df.方法().方法().方法()
- 方法执行后返回一个新的df保存计算结果
- new_df = df.方法()
spark提供DSL方法和sql的关键词一样,使用方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据。
from join 知道数据在哪 df本身就是要处理的数据 df.join(df2) from 表
where 过滤需要处理的数据 df.join(df2).where()
group by 聚合 数据的计算 df.join(df2).where().groupby().sum()
having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()
DSL方法执行完成后会得到一个处理后的新的df
#使用DSL方法操作dataframe
from pyspark.sql import SparkSession
ss = SparkSession.builder.getOrCreate()
#读取文件数据转为df
df_csv = ss.read.csv('hdfs://node1/data/students.csv', header=True,sep=',')
#使用DSL方法对df数据进行操作
df2 = df_csv.select('id','name')
#查看结果
df2.show()
#第二种指定字段的方式
df3 = df_csv.select(df_csv.age,df_csv.gender)
#给字段起别名
df4 = df_csv.select(df_csv.age.alias('new_age'),df_csv.gender)
df4.show()
#修改字段类型
df_csv.printSchema()
df5 = df_csv.select(df_csv.age.cast('int'),df_csv.gender)
df5.printSchema()
#where 的数据过滤
age = 20
df6 = df_csv.where(f'age > {age}')
df6.show()
#过滤年龄大于20并且性别为女性的学生信息
df7 = df_csv.where(f'age > 20 and gender = "女" ')
df7.show()
#使用第二种字段判断方式
df8 = df_csv.where(df_csv.age == age)
df8.show()
#分组聚合计算
df9 = df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age')
df9.show()
#分组后过滤where 聚合计算时只能一次计算一个聚合数据
df10 = df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age').where('sum(age) > 80')
df10.show()
#排序
df11 = df_csv.orderBy('age') #默认排序
df11.show()
df12 = df_csv.orderBy('age',ascending=False) #降序
df12.show()
#分页
df13 = df_csv.limit(5)
df13.show()
#转为rdd
res = df_csv.rdd.collect()[5:10]
print(res)
df_new = ss.createDataFrame(res)
df_new.show()