【Spark系列6】如何做SQL查询优化和执行计划分析

news2024/9/28 11:12:50

Apache Spark SQL 使用 Catalyst 优化器来生成逻辑执行计划和物理执行计划。逻辑执行计划描述了逻辑上如何执行查询,而物理执行计划则是 Spark 实际执行的步骤。

一、查询优化

示例 1:过滤提前

未优化的查询

val salesData = spark.read.parquet("hdfs://sales_data.parquet")
val result = salesData
  .groupBy("product_id")
  .agg(sum("amount").alias("total_sales"))
  .filter($"total_sales" > 1000)

优化后的查询

val salesData = spark.read.parquet("hdfs://sales_data.parquet")
val filteredData = salesData.filter($"amount" > 1000)
val result = filteredData
  .groupBy("product_id")
  .agg(sum("amount").alias("total_sales"))

优化解释:通过在聚合之前应用过滤,减少了聚合操作处理的数据量,从而减少了执行时间和资源消耗。

示例 2:使用广播连接

未优化的查询

val largeTable = spark.read.parquet("hdfs://large_table.parquet")
val smallTable = spark.read.parquet("hdfs://small_table.parquet")
val result = largeTable.join(smallTable, Seq("key"))

优化后的查询

import org.apache.spark.sql.functions.broadcast

val largeTable = spark.read.parquet("hdfs://large_table.parquet")
val smallTable = spark.read.parquet("hdfs://small_table.parquet")
val result = largeTable.join(broadcast(smallTable), Seq("key"))

优化解释:如果有一个小表和一个大表需要连接,使用广播连接可以将小表的数据发送到每个节点,减少数据传输和shuffle操作,提高查询效率。

示例 3:避免不必要的Shuffle操作

未优化的查询

val transactions = spark.read.parquet("hdfs://transactions.parquet")
val result = transactions
  .repartition(100, $"country")
  .groupBy("country")
  .agg(sum("amount").alias("total_amount"))

优化后的查询

val transactions = spark.read.parquet("hdfs://transactions.parquet")
val result = transactions
  .groupBy("country")
  .agg(sum("amount").alias("total_amount"))

优化解释:repartition会导致全局shuffle,而如果后续的操作是按照同一个键进行聚合,这个操作可能是不必要的,因为groupBy操作本身会引入shuffle。

示例 4:处理数据倾斜

未优化的查询

val skewedData = spark.read.parquet("hdfs://skewed_data.parquet")
val referenceData = spark.read.parquet("hdfs://reference_data.parquet")
val result = skewedData.join(referenceData, "key")

优化后的查询

val skewedData = spark.read.parquet("hdfs://skewed_data.parquet")
val referenceData = spark.read.parquet("hdfs://reference_data.parquet")
val saltedSkewedData = skewedData.withColumn("salted_key", concat($"key", lit("_"), (rand() * 10).cast("int")))
val saltedReferenceData = referenceData.withColumn("salted_key", explode(array((0 to 9).map(lit(_)): _*)))
  .withColumn("salted_key", concat($"key", lit("_"), $"salted_key"))
val result = saltedSkewedData.join(saltedReferenceData, "salted_key")
  .drop("salted_key")

优化解释:当存在数据倾斜时,可以通过给键添加随机后缀(称为salting)来分散倾斜的键,然后在连接后去除这个后缀。

示例 5:缓存重用的DataFrame

未优化的查询

val dataset = spark.read.parquet("hdfs://dataset.parquet")
val result1 = dataset.filter($"date" === "2024-01-01").agg(sum("amount"))
val result2 = dataset.filter($"date" === "2024-01-02").agg(sum("amount"))

优化后的查询

val dataset = spark.read.parquet("hdfs://dataset.parquet").cache()
val result1 = dataset.filter($"date" === "2024-01-01").agg(sum("amount"))
val result2 = dataset.filter($"date" === "2024-01-02").agg(sum("amount"))

优化解释:如果同一个数据集被多次读取,可以使用cache()persist()方法将数据集缓存起来,避免重复的读取和计算。

在实际应用中,优化Spark SQL查询通常需要结合数据的具体情况和资源的可用性。通过观察Spark UI上的执行计划和各个stage的详情,可以进一步诊断和优化查询性能。

二、执行计划分析

逻辑执行计划

逻辑执行计划是对 SQL 查询语句的逻辑解释,它描述了执行查询所需执行的操作,但不涉及具体如何在集群上执行这些操作。逻辑执行计划有两个版本:未解析的逻辑计划(unresolved logical plan)和解析的逻辑计划(resolved logical plan)。

举例说明

假设我们有一个简单的查询:

SELECT name, age FROM people WHERE age > 20

在 Spark SQL 中,这个查询的逻辑执行计划可能如下所示:

== Analyzed Logical Plan ==
name: string, age: int
Filter (age#0 > 20)
+- Project [name#1, age#0]
   +- Relation[age#0,name#1] parquet

这个逻辑计划的组成部分包括:

  • Relation: 表示数据来源,这里是一个 Parquet 文件。
  • Project: 表示选择的字段,这里是nameage
  • Filter: 表示过滤条件,这里是age > 20

物理执行计划

物理执行计划是 Spark 根据逻辑执行计划生成的,它包含了如何在集群上执行这些操作的具体细节。物理执行计划会考虑数据的分区、缓存、硬件资源等因素。

举例说明

对于上面的逻辑执行计划,Spark Catalyst 优化器可能生成以下物理执行计划:

== Physical Plan ==
*(1) Project [name#1, age#0]
+- *(1) Filter (age#0 > 20)
   +- *(1) ColumnarToRow
      +- FileScan parquet [age#0,name#1] Batched: true, DataFilters: [(age#0 > 20)], Format: Parquet, Location: InMemoryFileIndex[file:/path/to/people.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(age), GreaterThan(age,20)], ReadSchema: struct<age:int,name:string>

这个物理执行计划的组成部分包括:

  • FileScan: 表示数据的读取操作,这里是从 Parquet 文件读取。
  • ColumnarToRow: 表示数据格式的转换,因为 Parquet 是列式存储,需要转换为行式以供后续操作。
  • Filter: 表示过滤操作,这里是执行age > 20的过滤条件。
  • Project: 表示字段选择操作,这里是选择nameage字段。

物理执行计划还包含了一些优化信息,例如:

  • Batched: 表示是否批量处理数据,这里是true
  • DataFilters: 实际应用于数据的过滤器。
  • PushedFilters: 表示已推送到数据源的过滤器,这可以减少从数据源读取的数据量。

要查看 Spark SQL 查询的逻辑和物理执行计划,可以在 Spark 代码中使用.explain(true)方法:

val df = spark.sql("SELECT name, age FROM people WHERE age > 20")
df.explain(true)

这将输出上述的逻辑和物理执行计划信息,帮助开发者理解和优化查询。

给一个实际业务执行过程中的SQL计划参考:

执行计划:

== Parsed Logical Plan ==
Aggregate [count(1) AS count#93L]
+- Aggregate [pos_id#0, tag_id#1, pos_tag_id#2L], [pos_id#0, tag_id#1, pos_tag_id#2L]
   +- Filter (isnotnull(pos_id#0) && isnotnull(tag_id#1))
      +- SubqueryAlias `pos_tag_dim_row`
         +- Relation[pos_id#0,tag_id#1,pos_tag_id#2L,click_date#3,user_type#4,ad_division_id_new#5,delivery_system_type#6,campaign_type#7,ad_sku_first_cate_cd#8,automated_bidding_type#9,ad_type_cd#10,period#11,medium_company_id#12L,medium_traffic_id#13L,media_tag_created_date#14,os_type#15,bundle_name#16,jd_request_cnt#17L,filter_requests#18L,filter_requests_after#19L,valid_requests#20L,media_return_cnt#21L,impressions#22L,queries#23L,... 19 more fields] parquet

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#93L]
+- Aggregate [pos_id#0, tag_id#1, pos_tag_id#2L], [pos_id#0, tag_id#1, pos_tag_id#2L]
   +- Filter (isnotnull(pos_id#0) && isnotnull(tag_id#1))
      +- SubqueryAlias `pos_tag_dim_row`
         +- Relation[pos_id#0,tag_id#1,pos_tag_id#2L,click_date#3,user_type#4,ad_division_id_new#5,delivery_system_type#6,campaign_type#7,ad_sku_first_cate_cd#8,automated_bidding_type#9,ad_type_cd#10,period#11,medium_company_id#12L,medium_traffic_id#13L,media_tag_created_date#14,os_type#15,bundle_name#16,jd_request_cnt#17L,filter_requests#18L,filter_requests_after#19L,valid_requests#20L,media_return_cnt#21L,impressions#22L,queries#23L,... 19 more fields] parquet

== Optimized Logical Plan ==
Aggregate [count(1) AS count#93L]
+- Aggregate [pos_id#0, tag_id#1, pos_tag_id#2L]
   +- Project [pos_id#0, tag_id#1, pos_tag_id#2L]
      +- Filter (isnotnull(pos_id#0) && isnotnull(tag_id#1))
         +- Relation[pos_id#0,tag_id#1,pos_tag_id#2L,click_date#3,user_type#4,ad_division_id_new#5,delivery_system_type#6,campaign_type#7,ad_sku_first_cate_cd#8,automated_bidding_type#9,ad_type_cd#10,period#11,medium_company_id#12L,medium_traffic_id#13L,media_tag_created_date#14,os_type#15,bundle_name#16,jd_request_cnt#17L,filter_requests#18L,filter_requests_after#19L,valid_requests#20L,media_return_cnt#21L,impressions#22L,queries#23L,... 19 more fields] parquet

== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(1)], output=[count#93L])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#96L])
      +- *(2) HashAggregate(keys=[pos_id#0, tag_id#1, pos_tag_id#2L], functions=[], output=[])
         +- Exchange hashpartitioning(pos_id#0, tag_id#1, pos_tag_id#2L, 100)
            +- *(1) HashAggregate(keys=[pos_id#0, tag_id#1, pos_tag_id#2L], functions=[], output=[pos_id#0, tag_id#1, pos_tag_id#2L])
               +- *(1) Project [pos_id#0, tag_id#1, pos_tag_id#2L]
                  +- *(1) Filter (isnotnull(pos_id#0) && isnotnull(tag_id#1))
                     +- *(1) FileScan parquet [pos_id#0,tag_id#1,pos_tag_id#2L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ns1017/user/jd_ad/ads_report/etl/offline.spark/tmc_base_report_daily/out..., PartitionFilters: [], PushedFilters: [IsNotNull(pos_id), IsNotNull(tag_id)], ReadSchema: struct<pos_id:string,tag_id:string,pos_tag_id:bigint>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1429403.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32GPIO输入(按键控制LED、光敏电阻控制蜂鸣器实例)

文章目录 一、介绍传感器模块介绍硬件电路c语言数据类型 二、实例按键控制LED接线图代码实现 光敏电阻控制蜂鸣器组装线路代码实现 相关函数解释 一、介绍 传感器模块介绍 硬件电路 上两种按下时为0&#xff0c;下两种按下时为1。 c语言数据类型 现在常用stdint头文件所定…

【Java程序设计】【C00187】基于SSM的旅游资源网站管理系统(论文+PPT)

基于SSM的旅游资源网站管理系统&#xff08;论文PPT&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于ssm的旅游资源网站 本系统分为前台系统、用户和管理员3个功能模块。 前台系统&#xff1a;当游客打开系统的网址后&#xff0c;首先看到的就是…

一文掌握单基因GSEA富集分析 | gseaGO and gseaKEGG

本期教程 本期教程原文&#xff1a;一文掌握单基因GSEA富集分析 | gseaGO and gseaKEGG 写在前面 关于GSEA分析&#xff0c;我们在前期的教程单基因GSEA富集分析 | 20220404有出过类似的分享。今天&#xff0c;我们也结合相关的资源整理出一篇关于GSEA的教程及出图教程。每个…

mysql 锁知识汇总

目录 一、锁1.1 什么是锁&#xff1f;1.2 全局锁1.2.1 定义1.2.2 应用场景1.2.3 会出现的问题1.2.4 解决方法 1.3 表级锁1.3.1 表锁1.3.2 元数据锁&#xff08;MDL&#xff09;1.3.3 意向锁1.3.4 AUTO-INC锁 1.4 行级锁1.4.1 记录锁(Record Lock)1.4.2 间隙锁(Gap Lock)1.4.3 N…

字符下标计数

下标计数 数组计数&#xff0c;即通过使用一个新的数组&#xff0c;对原来数组里面的项进行计数&#xff0c;统计原来数组中各项出现的次数&#xff0c;如下图所示&#xff1a; 数组计数可以方便快速地统计出一个各项都比较小的数组中&#xff0c;数值相同的数的个数。 数组计数…

PHP集成开发 -- PhpStorm 2023

PhpStorm 2023是一款强大的PHP集成开发环境&#xff08;IDE&#xff09;&#xff0c;旨在提高开发人员的生产力和代码质量。以下是关于PhpStorm 2023软件的详细介绍&#xff1a; 首先&#xff0c;PhpStorm 2023提供了丰富的代码编辑功能&#xff0c;包括语法高亮、自动补全、代…

ubuntu22.04 安装部署01:禁用内核更新

一、前言 ubunut22.04系统安装以后&#xff0c;内核更新会导致各种各样的问题&#xff0c;因此锁定初始安装环境特别重要&#xff0c;下面介绍如何锁定内核更新。 二、操作方法 2.1 查看可用内核 dpkg --list | grep linux-image dpkg --list | grep linux-headers dpkg --…

故障诊断 | 一文解决,CNN-LSTM卷积神经网络-长短期记忆神经网络组合模型的故障诊断(Matlab)

效果一览 文章概述 故障诊断 | 一文解决,CNN-LSTM卷积神经网络-长短期记忆神经网络组合模型的故障诊断(Matlab) 模型描述 CNN-LSTM模型是一种结合了卷积神经网络(Convolutional Neural Network)和长短期记忆神经网络(Long Short-Term Memory)的组合模型,常用于数据故障…

各版本的Qt Creator的下载地址

2024年2月3日&#xff0c;周六上午 Index of /official_releases/qtcreatorhttps://download.qt.io/official_releases/qtcreator/ 如果想下载测试中的最新版Qt Creator的快照可以去这个地址 Index of /snapshots/qtcreatorhttps://download.qt.io/snapshots/qtcreator/

景联文科技受邀出席全国信标委生物特征识别分委会二届五次全会

全国信息技术标准化技术委员会生物特征识别分技术委员会&#xff08;SAC/TC28/SC37&#xff0c;以下简称“分委会”&#xff09;二届五次全会于2024年1月30日在北京顺利召开&#xff0c;会议由分委员秘书长王文峰主持。 分委会由国家标准化管理委员会批准成立&#xff0c;主要负…

git 如何修改仓库地址

问题背景&#xff1a;组内更换大部门之后&#xff0c;代码仓的地址也迁移了&#xff0c;所以原来的git仓库地址失效了。 虽然重新建一个新的文件夹&#xff0c;再把每个项目都git clone一遍也可以。但是有点繁琐&#xff0c;而且有的项目本地还有已经开发一半的代码&#xff0c…

网络异常案例六_IP冲突

问题现象 同一个局域网下&#xff0c;一个路由器带几十台终端设备&#xff0c;存在终端设备获取到了相同IP的场景。该路由器也是DHCP Server。 有两个设备终端&#xff0c;都显示获取到了192.168.11.177这个ip。 抓包分析 抓包过程中&#xff0c;看到的一些问题。 ps&#x…

三路快排解决TopK问题

前言&#xff1a; 我们首先要明白什么是三路快排&#xff0c;什么是topk问题。 三路快排&#xff1a; 思想&#xff1a; 三路快排就是数组分3块&#xff0c;三个指针&#xff0c;先随机取一个基准值key&#xff0c;然后将数组划分为3个部分&#xff1a; 【小于key】【等于…

客户端和服务端的简介

Client 和 Server 客户端&#xff08;Client&#xff09; 或称用户端&#xff0c;是指与服务器相对应&#xff0c;为客户提供本地服务的程序。除了一些只在本地运行的应用程序之外&#xff0c;一般安装在客户机上&#xff0c;需要与服务端互相配合运行。例如&#xff1a;下载 Q…

决策树的相关知识点

&#x1f4d5;参考&#xff1a;ysu老师课件西瓜书 1.决策树的基本概念 【决策树】&#xff1a;决策树是一种描述对样本数据进行分类的树形结构模型&#xff0c;由节点和有向边组成。其中每个内部节点表示一个属性上的判断&#xff0c;每个分支代表一个判断结果的输出&#xff…

爬虫笔记(三):实战qq登录

咳咳&#xff0c;再这样下去会进橘子叭hhhhhh 以及&#xff0c;这个我觉得大概率是成功的&#xff0c;因为测试了太多次&#xff0c;登录并且验证之后&#xff0c;qq提醒我要我修改密码才可以登录捏QAQ 1. selenium 有关selenium具体是啥&#xff0c;这里就不再赘述了&#x…

用C++实现一个哈希桶并封装实现 unordered_map 和 unordered_set

目录 哈希桶的实现 封装 unordered_map 和 unordered_set 封装代码 HashTable.h MyUnorderedMap.h MyUnorderedSet.h 哈希桶&#xff0c;又叫开散列法。开散列法又叫链地址法(开链法)&#xff0c;首先对关键码集合用散列函数计算散列地址&#xff0c;具有相同地址的关键码…

spring问题点

1.事务 1.1.事务传播 同一个类中 事务A调非事务B B抛异常 AB事务生效&#xff08;具有传播性&#xff09; 同一个类中 事务A调非事务B A抛异常 AB事务生效 也就是主方法加了事务注解 则方法内调用的其他本类方法无需加事务注解&#xff0c; 发生异常时可以保证事务的回滚 最常…

安科瑞消防设备电源监控系统在地铁工程的设计与应用

【摘要】&#xff1a;本文介绍了地铁工程中消防设备电源监控系统设置的必要性及规范求&#xff0c;分析了监控设计方案&#xff0c;提出该系统在地铁工程中的应用要求及建议&#xff0c;以供地铁工程建设参考。消防设备电源监控系统主要针对消防用电设备的电源进行实时的监控&a…

在 Elastic Agent 中为 Logstash 输出配置 SSL/TLS

要将数据从 Elastic Agent 安全地发送到 Logstash&#xff0c;你需要配置传输层安全性 (TLS)。 使用 TLS 可确保你的 Elastic Agent 将加密数据发送到受信任的 Logstash 服务器&#xff0c;并且你的 Logstash 服务器从受信任的 Elastic Agent 客户端接收数据。 先决条件 确保你…