Apache Spark SQL 使用 Catalyst 优化器来生成逻辑执行计划和物理执行计划。逻辑执行计划描述了逻辑上如何执行查询,而物理执行计划则是 Spark 实际执行的步骤。
一、查询优化
示例 1:过滤提前
未优化的查询
val salesData = spark.read.parquet("hdfs://sales_data.parquet")
val result = salesData
.groupBy("product_id")
.agg(sum("amount").alias("total_sales"))
.filter($"total_sales" > 1000)
优化后的查询
val salesData = spark.read.parquet("hdfs://sales_data.parquet")
val filteredData = salesData.filter($"amount" > 1000)
val result = filteredData
.groupBy("product_id")
.agg(sum("amount").alias("total_sales"))
优化解释:通过在聚合之前应用过滤,减少了聚合操作处理的数据量,从而减少了执行时间和资源消耗。
示例 2:使用广播连接
未优化的查询
val largeTable = spark.read.parquet("hdfs://large_table.parquet")
val smallTable = spark.read.parquet("hdfs://small_table.parquet")
val result = largeTable.join(smallTable, Seq("key"))
优化后的查询
import org.apache.spark.sql.functions.broadcast
val largeTable = spark.read.parquet("hdfs://large_table.parquet")
val smallTable = spark.read.parquet("hdfs://small_table.parquet")
val result = largeTable.join(broadcast(smallTable), Seq("key"))
优化解释:如果有一个小表和一个大表需要连接,使用广播连接可以将小表的数据发送到每个节点,减少数据传输和shuffle操作,提高查询效率。
示例 3:避免不必要的Shuffle操作
未优化的查询
val transactions = spark.read.parquet("hdfs://transactions.parquet")
val result = transactions
.repartition(100, $"country")
.groupBy("country")
.agg(sum("amount").alias("total_amount"))
优化后的查询
val transactions = spark.read.parquet("hdfs://transactions.parquet")
val result = transactions
.groupBy("country")
.agg(sum("amount").alias("total_amount"))
优化解释:repartition
会导致全局shuffle,而如果后续的操作是按照同一个键进行聚合,这个操作可能是不必要的,因为groupBy
操作本身会引入shuffle。
示例 4:处理数据倾斜
未优化的查询
val skewedData = spark.read.parquet("hdfs://skewed_data.parquet")
val referenceData = spark.read.parquet("hdfs://reference_data.parquet")
val result = skewedData.join(referenceData, "key")
优化后的查询
val skewedData = spark.read.parquet("hdfs://skewed_data.parquet")
val referenceData = spark.read.parquet("hdfs://reference_data.parquet")
val saltedSkewedData = skewedData.withColumn("salted_key", concat($"key", lit("_"), (rand() * 10).cast("int")))
val saltedReferenceData = referenceData.withColumn("salted_key", explode(array((0 to 9).map(lit(_)): _*)))
.withColumn("salted_key", concat($"key", lit("_"), $"salted_key"))
val result = saltedSkewedData.join(saltedReferenceData, "salted_key")
.drop("salted_key")
优化解释:当存在数据倾斜时,可以通过给键添加随机后缀(称为salting)来分散倾斜的键,然后在连接后去除这个后缀。
示例 5:缓存重用的DataFrame
未优化的查询
val dataset = spark.read.parquet("hdfs://dataset.parquet")
val result1 = dataset.filter($"date" === "2024-01-01").agg(sum("amount"))
val result2 = dataset.filter($"date" === "2024-01-02").agg(sum("amount"))
优化后的查询
val dataset = spark.read.parquet("hdfs://dataset.parquet").cache()
val result1 = dataset.filter($"date" === "2024-01-01").agg(sum("amount"))
val result2 = dataset.filter($"date" === "2024-01-02").agg(sum("amount"))
优化解释:如果同一个数据集被多次读取,可以使用cache()
或persist()
方法将数据集缓存起来,避免重复的读取和计算。
在实际应用中,优化Spark SQL查询通常需要结合数据的具体情况和资源的可用性。通过观察Spark UI上的执行计划和各个stage的详情,可以进一步诊断和优化查询性能。
二、执行计划分析
逻辑执行计划
逻辑执行计划是对 SQL 查询语句的逻辑解释,它描述了执行查询所需执行的操作,但不涉及具体如何在集群上执行这些操作。逻辑执行计划有两个版本:未解析的逻辑计划(unresolved logical plan)和解析的逻辑计划(resolved logical plan)。
举例说明
假设我们有一个简单的查询:
SELECT name, age FROM people WHERE age > 20
在 Spark SQL 中,这个查询的逻辑执行计划可能如下所示:
== Analyzed Logical Plan ==
name: string, age: int
Filter (age#0 > 20)
+- Project [name#1, age#0]
+- Relation[age#0,name#1] parquet
这个逻辑计划的组成部分包括:
Relation
: 表示数据来源,这里是一个 Parquet 文件。Project
: 表示选择的字段,这里是name
和age
。Filter
: 表示过滤条件,这里是age > 20
。
物理执行计划
物理执行计划是 Spark 根据逻辑执行计划生成的,它包含了如何在集群上执行这些操作的具体细节。物理执行计划会考虑数据的分区、缓存、硬件资源等因素。
举例说明
对于上面的逻辑执行计划,Spark Catalyst 优化器可能生成以下物理执行计划:
== Physical Plan ==
*(1) Project [name#1, age#0]
+- *(1) Filter (age#0 > 20)
+- *(1) ColumnarToRow
+- FileScan parquet [age#0,name#1] Batched: true, DataFilters: [(age#0 > 20)], Format: Parquet, Location: InMemoryFileIndex[file:/path/to/people.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(age), GreaterThan(age,20)], ReadSchema: struct<age:int,name:string>
这个物理执行计划的组成部分包括:
FileScan
: 表示数据的读取操作,这里是从 Parquet 文件读取。ColumnarToRow
: 表示数据格式的转换,因为 Parquet 是列式存储,需要转换为行式以供后续操作。Filter
: 表示过滤操作,这里是执行age > 20
的过滤条件。Project
: 表示字段选择操作,这里是选择name
和age
字段。
物理执行计划还包含了一些优化信息,例如:
Batched
: 表示是否批量处理数据,这里是true
。DataFilters
: 实际应用于数据的过滤器。PushedFilters
: 表示已推送到数据源的过滤器,这可以减少从数据源读取的数据量。
要查看 Spark SQL 查询的逻辑和物理执行计划,可以在 Spark 代码中使用.explain(true)
方法:
val df = spark.sql("SELECT name, age FROM people WHERE age > 20")
df.explain(true)
这将输出上述的逻辑和物理执行计划信息,帮助开发者理解和优化查询。
给一个实际业务执行过程中的SQL计划参考:
执行计划:
== Parsed Logical Plan ==
Aggregate [count(1) AS count#93L]
+- Aggregate [pos_id#0, tag_id#1, pos_tag_id#2L], [pos_id#0, tag_id#1, pos_tag_id#2L]
+- Filter (isnotnull(pos_id#0) && isnotnull(tag_id#1))
+- SubqueryAlias `pos_tag_dim_row`
+- Relation[pos_id#0,tag_id#1,pos_tag_id#2L,click_date#3,user_type#4,ad_division_id_new#5,delivery_system_type#6,campaign_type#7,ad_sku_first_cate_cd#8,automated_bidding_type#9,ad_type_cd#10,period#11,medium_company_id#12L,medium_traffic_id#13L,media_tag_created_date#14,os_type#15,bundle_name#16,jd_request_cnt#17L,filter_requests#18L,filter_requests_after#19L,valid_requests#20L,media_return_cnt#21L,impressions#22L,queries#23L,... 19 more fields] parquet
== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#93L]
+- Aggregate [pos_id#0, tag_id#1, pos_tag_id#2L], [pos_id#0, tag_id#1, pos_tag_id#2L]
+- Filter (isnotnull(pos_id#0) && isnotnull(tag_id#1))
+- SubqueryAlias `pos_tag_dim_row`
+- Relation[pos_id#0,tag_id#1,pos_tag_id#2L,click_date#3,user_type#4,ad_division_id_new#5,delivery_system_type#6,campaign_type#7,ad_sku_first_cate_cd#8,automated_bidding_type#9,ad_type_cd#10,period#11,medium_company_id#12L,medium_traffic_id#13L,media_tag_created_date#14,os_type#15,bundle_name#16,jd_request_cnt#17L,filter_requests#18L,filter_requests_after#19L,valid_requests#20L,media_return_cnt#21L,impressions#22L,queries#23L,... 19 more fields] parquet
== Optimized Logical Plan ==
Aggregate [count(1) AS count#93L]
+- Aggregate [pos_id#0, tag_id#1, pos_tag_id#2L]
+- Project [pos_id#0, tag_id#1, pos_tag_id#2L]
+- Filter (isnotnull(pos_id#0) && isnotnull(tag_id#1))
+- Relation[pos_id#0,tag_id#1,pos_tag_id#2L,click_date#3,user_type#4,ad_division_id_new#5,delivery_system_type#6,campaign_type#7,ad_sku_first_cate_cd#8,automated_bidding_type#9,ad_type_cd#10,period#11,medium_company_id#12L,medium_traffic_id#13L,media_tag_created_date#14,os_type#15,bundle_name#16,jd_request_cnt#17L,filter_requests#18L,filter_requests_after#19L,valid_requests#20L,media_return_cnt#21L,impressions#22L,queries#23L,... 19 more fields] parquet
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)], output=[count#93L])
+- Exchange SinglePartition
+- *(2) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#96L])
+- *(2) HashAggregate(keys=[pos_id#0, tag_id#1, pos_tag_id#2L], functions=[], output=[])
+- Exchange hashpartitioning(pos_id#0, tag_id#1, pos_tag_id#2L, 100)
+- *(1) HashAggregate(keys=[pos_id#0, tag_id#1, pos_tag_id#2L], functions=[], output=[pos_id#0, tag_id#1, pos_tag_id#2L])
+- *(1) Project [pos_id#0, tag_id#1, pos_tag_id#2L]
+- *(1) Filter (isnotnull(pos_id#0) && isnotnull(tag_id#1))
+- *(1) FileScan parquet [pos_id#0,tag_id#1,pos_tag_id#2L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ns1017/user/jd_ad/ads_report/etl/offline.spark/tmc_base_report_daily/out..., PartitionFilters: [], PushedFilters: [IsNotNull(pos_id), IsNotNull(tag_id)], ReadSchema: struct<pos_id:string,tag_id:string,pos_tag_id:bigint>