Dataframe
dataframe 是spark中参考pandas设计出的一套高级API,用户可以像操作pandas一样方便的操作结构化数据。毕竟纯的RDD操作是十分原始且麻烦的。而dataframe的出现可以让熟悉pandas的从业人员能用非常少的成本完成分布式的数据分析工作, 毕竟跟数据打交道的人很少有不懂dataframe的。
初始化dataframe的方法
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import Row
logFile = "/Users/xxxx/tools/spark-3.0.3-bin-hadoop2.7/README.md"
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
dataA = sqlContext.read.csv("路径")
dicts = [{'col1': 'a', 'col2': 1}, {'col1': 'b', 'col2': 2}]
dataf = sqlContext.createDataFrame(dicts)
dataf.show()
dicts = [['a', 1], ['b', 2]]
rdd = sc.parallelize(dicts)
dataf = sqlContext.createDataFrame(rdd, ['col1','col2'])
dataf.show()
rows = [Row(col1='a', col2=1), Row(col1='b', col2=2)]
dataf= sqlContext.createDataFrame(rows)
dataf.show()
dataf.write.csv(path="/Users/cainsun/Downloads/test_spark", header=True, sep=",", mode='overwrite')
可以看到创建dataframe有多种方式, 可以从文件中读取, 可以从列表中初始化,可以用简单的方式指定列信息, 也可以使用Row类来初始化列信息。
dataframe常用操作
读取数据:
df = spark.read.json("data.json")
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df = spark.read.parquet("data.parquet")
显示数据:
# 显示前 n 行数据,默认为 20 行
df.show(n=5)
# 打印 DataFrame 的 schema
df.printSchema()
选择和过滤数据:
# 选择特定列
selected_df = df.select("column1", "column2")
# 使用条件过滤数据
filtered_df = df.filter(df["age"] > 30)
聚合和分组数据:
from pyspark import SparkContext, SparkConf, SQLContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
dicts = [
['teacher', 202355, 16, '336051551@qq.com'],
['student', 2035, 16, '336051551@qq.com'],
['qa', 2355, 16, '336051551@qq.com'],
['qa', 20235, 16, '336051551@qq.com'],
['teacher', 35, 16, '336051asdf'],
['student', 453, 16, '336051asdf'],
]
rdd = sc.parallelize(dicts, 3)
data = sqlContext.createDataFrame(rdd, ['title', 'sales', 'age', 'email'])
result = data.groupBy("title").max("sales").alias("max_sales")
resultA = data.groupBy("title").sum("sales").alias("sum_sales")
# 显示结果
result.show()
resultA.show()
+-------+----------+
| title|max(sales)|
+-------+----------+
|teacher| 202355|
| qa| 20235|
|student| 2035|
+-------+----------+
+-------+----------+
| title|sum(sales)|
+-------+----------+
|teacher| 202390|
| qa| 22590|
|student| 2488|
+-------+----------+
数据排序:
from pyspark.sql.functions import desc
# 按列排序
sorted_df = df.sort("column1")
# 按列降序排序
sorted_df = df.sort(desc("column1"))
添加,修改和删除列:
from pyspark.sql.functions import upper
# 添加新列
new_df = df.withColumn("new_column", df["column1"] * 2)
# 修改现有列
modified_df = df.withColumn("column1", upper(df["column1"]))
# 删除列
dropped_df = df.drop("column1")
重命名列:
# 重命名 DataFrame 中的列
renamed_df = df.withColumnRenamed("old_column_name", "new_column_name")
spark sql
初始化
from pyspark import SparkContext, SparkConf, SQLContext
# 创建 SparkSession
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
dicts = [
['teacher', 202355, 16, '336051551@qq.com'],
['student', 2035, 16, '336051551@qq.com'],
['qa', 2355, 16, '336051551@qq.com'],
['qa', 20235, 16, '336051551@qq.com'],
['teacher', 35, 16, '336051asdf'],
['student', 453, 16, '336051asdf'],
]
rdd = sc.parallelize(dicts, 3)
data = sqlContext.createDataFrame(rdd, ['title', 'sales', 'age', 'email'])
data.createOrReplaceTempView("table")
要使用spark sql的能力, 需要利用createOrReplaceTempView创建一个临时表,然后才能执行 sql
简单的sql执行
query = "select * from table where title = 'qa'"
resultB = sqlContext.sql(query)
resultB.show()
# 执行结果
+-----+-----+---+----------------+
|title|sales|age| email|
+-----+-----+---+----------------+
| qa| 2355| 16|336051551@qq.com|
| qa|20235| 16|336051551@qq.com|
+-----+-----+---+----------------+
分组查询
query = "select title, sum(sales), max(sales) from table group by title"
resultC = sqlContext.sql(query)
resultC.show()
# 执行结果
+-------+----------+----------+
| title|sum(sales)|max(sales)|
+-------+----------+----------+
|teacher| 202390| 202355|
| qa| 22590| 20235|
|student| 2488| 2035|
+-------+----------+----------+
Spark sql适合熟悉sql语法的人使用,本质上sql和dataframe最终都会被翻译成rdd来运行。我们可以把它看成是rdd的高级语法糖就可以。 大家喜欢哪种操作方式就选择哪种就可以。
数据测试/监控
回顾自学习与数据闭环那里,我们知道在这样的系统中针对与每天新采集的数据,需要做一道数据校验。下面我模拟一个场景写这样一个检查脚本。
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
rdd = sc.parallelize(range(1000))
print(rdd.map(lambda x: '%s,%s' % ('男', '16')).collect())
dicts = [
['frank', 202355, 16, '336051551@qq.com'],
['frank', 202355, 16, '336051551@qq.com'],
['frank', 202355, 16, '336051551@qq.com'],
['frank', 202355, 16, '336051551@qq.com'],
['frank', 202355, 16, '336051asdf'],
['', 452345, 16, '336051asdf'],
]
rdd = sc.parallelize(dicts, 3)
dataf = sqlContext.createDataFrame(rdd, ['name', 'id', 'age', 'email'])
# 验证 id 字段必须是整数
id_filter = F.col("id").cast("int") >= 0
# 验证 name 字段必须是非空字符串
name_filter = F.col("name").isNotNull() & (F.col("name") != "")
# 验证 age 字段必须是大于等于 0 的整数
age_filter = F.col("age").cast("int") >= 0
# 验证 email 字段必须是有效的电子邮件地址
email_filter = F.col("email").rlike("^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$")
# 应用过滤条件
valid_data = dataf.filter(id_filter & name_filter & age_filter & email_filter)
# 输出符合质量要求的数据
valid_data.show()
# 输出不符合质量要求的数据
invalid_data = dataf.exceptAll(valid_data)
invalid_data.show()
更多内容欢迎来到我的知识星球: