目录
创建 DataFrames
生成我们自己的 JSON 数据
创建 DataFrame
创建临时表
简单的 DataFrame 查询
DataFrame API 查询
SQL 查询
创建 DataFrames
通常,您会通过使用 SparkSession(或在 PySpark shell 中调用 spark)导入数据来创建 DataFrame。
我们将讨论如何将数据导入到本地文件系统、Hadoop 分布式文件系统(HDFS)或其他云存储系统(例如,S3 或 WASB)。在本文中,我们将专注于在 Spark 内直接生成您自己的 DataFrame 数据或利用 Databricks 社区版中已经可用的数据源。
首先,我们将不访问文件系统,而是通过生成数据来创建 DataFrame。在这种情况下,我们将首先创建 stringJSONRDD RDD,然后将其转换为 DataFrame。这段代码片段创建了一个包含游泳者(他们的 ID、姓名、年龄和眼睛颜色)的 JSON 格式的 RDD。
生成我们自己的 JSON 数据
下面,我们将最初生成 stringJSONRDD RDD:
stringJSONRDD = sc.parallelize(("""
{ "id": "123",
"name": "Katie",
"age": 19,
"eyeColor": "brown"
}""",
"""{
"id": "234",
"name": "Michael",
"age": 22,
"eyeColor": "green"
}""",
"""{
"id": "345",
"name": "Simone",
"age": 23,
"eyeColor": "blue"
}""")
)
现在我们已经创建了 RDD,我们将使用 SparkSession 的 read.json 方法(即 spark.read.json(...))将其转换为 DataFrame。我们还将使用 .createOrReplaceTempView 方法创建一个临时表。
创建 DataFrame
以下是创建 DataFrame 的代码:
swimmersJSON = spark.read.json(stringJSONRDD)
创建临时表
以下是创建临时表的代码:
swimmersJSON.createOrReplaceTempView("swimmersJSON")
如前文所述,许多 RDD 操作是转换,这些转换直到执行动作操作时才执行。例如,在前面的代码片段中,sc.parallelize 是一个转换,当使用 spark.read.json 从 RDD 转换为 DataFrame 时执行。注意,在这段代码的笔记本截图中(左下角附近),直到包含 spark.read.json 操作的第二个单元格,Spark 作业才执行。
为了进一步强调这一点,在下图的右侧窗格中,我们展示了执行的 DAG 图。
在下面的截图中,您可以看到 Spark 作业的 parallelize 操作来自生成 RDD stringJSONRDD 的第一个单元格,而 map 和 mapPartitions 操作是创建 DataFrame 所需的操作:
需要注意的是,parallelize、map 和 mapPartitions 都是 RDD 转换。在 DataFrame 操作 spark.read.json(在本例中)中,不仅有 RDD 转换,还有将 RDD 转换为 DataFrame 的动作。这是一个重要的说明,因为即使您正在执行 DataFrame 操作,要调试您的操作,您需要记住您将在 Spark UI 中理解 RDD 操作。
请注意,创建临时表是一个 DataFrame 转换,并且在执行 DataFrame 动作之前不会执行(例如,要执行的 SQL 查询)。
简单的 DataFrame 查询
现在您已经创建了 swimmersJSON DataFrame,我们将能够在其上运行 DataFrame API 以及 SQL 查询。让我们从一个简单的查询开始,显示 DataFrame 中的所有行。
DataFrame API 查询
要使用 DataFrame API 执行此操作,您可以使用 show(<n>) 方法,该方法将前 n 行打印到控制台:
# DataFrame API
swimmersJSON.show()
这将给出以下输出:
SQL 查询
如果您更倾向于编写 SQL 语句,您可以编写以下查询:
spark.sql("select * from swimmersJSON").collect()
这将给出以下输出:
我们使用了 .collect() 方法,它返回所有记录作为一个行对象(Row objects)的列表。请注意,您可以对 DataFrames 和 SQL 查询使用 collect() 或 show() 方法。只要确保,如果您使用 .collect(),这是针对小 DataFrame 的,因为它将返回 DataFrame 中的所有行,并将它们从执行器移回驱动程序。您可以改用 take(<n>) 或 show(<n>),这允许您通过指定 <n> 来限制返回的行数: