【Spark精讲】SparkSQL的RBO与CBO

news2024/11/18 5:59:31

Spark SQL核心:Catalyst

        Spark SQL的核心是Catalyst查询编译器,它将用户程序中的SQL/Dataset/DataFrame经过一系列操作,最终转化为Spark系统中执行的RDD。

Catalyst组成部分

  • Parser :用Antlr将SQL/Dataset/DataFrame转化成一棵未经解析的树,生成 Unresolved Logical Plan
  • Analyzer:Analyzer 结合 Catalog 信息对Parser中生成的树进行解析,生成 Resolved Logical Plan
  • Optimizer:对解析完的逻辑计划进行树结构的优化,以获得更高的执行效率,生成 Optimized Logical Plan
    • 谓词下推(Predicate Pushdown):PushdownPredicate 是最常见的用于减少参与计算的数据量的方法,将过滤操作下推到join之前进行
    • 常量合并(Constant Folding):比如, x+(1+2)  -> x+3
    • 列值裁剪(Column Pruning):对列进行裁减,只留下需要的列
  • Planner:Planner将Optimized Logical Plan 转换成多个 Physical Plan
  • CostModel:CBO 根据 Cost Model 算出每个 Physical Plan 的代价并选取代价最小的 Physical Plan 作为最终的 Physical Plan
  • Spark 以 DAG 的方法执行上述 Physical Plan,在执行 DAG 的过程中,Adaptive Execution 根据运行时信息动态调整执行计划从而提高执行效率

 SQL优化器:RBO、CBO

        SQL语句转化为具体执行计划是由SQL查询编译器决定的,同一个SQL语句可以转化成多种物理执行计划,如何指导编译器选择效率最高的执行计划,这就是优化器的主要作用。传统数据库(例如Oracle)的优化器有两种:

  • 基于规则的优化器(Rule-Based Optimization,RBO)
  • 基于代价的优化器(Cost-Based Optimization,CBO)

2.1 RBO(Rule-Based Optimization)

        RBO: Rule-Based Optimization也即“基于规则的优化器”,该优化器按照硬编码在数据库中的一系列规则来决定SQL的执行计划。只要按照这个规则去写SQL语句,无论数据表中的内容怎样、数据分布如何,都不会影响到执行计划。

        基于规则优化是一种经验式、启发式地优化思路,更多地依靠前辈总结出来的优化规则,简单易行且能够覆盖到大部分优化逻辑,但是对于核心优化算子Join却显得有点力不从心。举个简单的例子,两个表执行Join到底应该使用BroadcastHashJoin  还是SortMergeJoin?当前SparkSQL的方式是通过手工设定参数来确定,如果一个表的数据量小于这个值就使用BroadcastHashJoin,但是这种方案显得很不优雅,很不灵活。基于代价优化(CBO)就是为了解决这类问题,它会针对每个Join评估当前两张表使用每种Join策略的代价,根据代价估算确定一种代价最小的方案 。

2.2 CBO(Cost-Based Optimization)

        CBO: Cost-Based Optimization也即“基于代价的优化器”,该优化器通过根据优化规则对关系表达式进行转换,生成多个执行计划,然后CBO会通过根据统计信息(Statistics)和代价模型(Cost Model)计算各种可能“执行计划”的“代价”,即COST,从中选用COST最低的执行方案,作为实际运行方案。CBO依赖数据库对象的统计信息,统计信息的准确与否会影响CBO做出最优的选择。

        CBO 原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划。其核心在于评估一个给定的物理执行计划的代价。物理执行计划是一个树状结构,其代价等于每个执行节点的代价总合。

        每个执行节点的代价分为两个部分:

  • 该执行节点对数据集的影响,或者说该节点输出数据集的大小与分布
  • 该执行节点操作算子的代价

        要计算每个执行节点的代价,CBO需要解决两个问题:

  • 如何获取原始数据集的统计信息
  • 如何根据输入数据集估算特定算子的输出数据集

CBO面临的挑战

​​​​​​在Spark1.0中所有的Catalyst Optimizer都是基于规则 (rule) 优化的。为了产生比较好的查询规 则,优化器需要理解数据的特性,于是在Spark2.0中引入了基于代价的优化器 (cost-based optimizer),也就是所谓的CBO。然而,CBO也无法解决很多问题,比如:

  • 数据统计信息普遍缺失,统计信息的收集代价较高;
  • 储存计算分离的架构使得收集到的统计信息可能不再准确;
  • Spark部署在某一单一的硬件架构上,cost很难被估计;
  • Spark的UDF(User-defined Function)简单易用,种类繁多,但是对于CBO来说是个黑盒子,无法估计其cost;

总而言之,由于种种限制,Spark的优化器无法产生最好的Plan。

也许你会想:Spark为什么不解决这个问题呢?这里有很多挑战,比如: 

  • 统计信息的缺失,统计信息的不准确,那么就是默认依据文件大小来预估表的大小,但是文件 往往是压缩的,尤其是列存储格式,比如parquet 和 ORC,而Spark是基于行处理,如果数据连续重复,file size可能和真实的行存储的真实大小,差别非常之大。这也是为何提高 autoBroadcastJoinThreshold,即使不是太大也可能会导致out of memory; 
  • Filter复杂、UDFs的使用都会使Spark无法准确估计Join输入数据量的大小。当你的queryplan异常大和复杂的时候,这点尤其明显;
  • 其中,Spark3.0中基于运行期的统计信息,将Sort Merge Join 转换为Broadcast Hash Join。

基于RBO优化

left join case

 var appSql: String =
      """
        |select
        |   *
        |from
        |   tab_spark_test as t1
        |left join tab_spark_test_2 as t2
        |on t1.id = t2.id
        |and t1.id > 5+5
        """.stripMargin
 
sparkSession.sql("use default;")
 
sparkSession.sql(appSql).explain(mode = "extended")

执行计划 

Outer 类型 Join 中的谓词下推

Outer 类型的 Join 操作在实际业务中的应用非常广泛 。 然而,不同于常规的 Join, Outer 类型 Join操作的谓词下推的处理比较复杂,用户在写 SQL语句时非常容易忽略,使得执行结果与自己的本意不符。 下面详细介绍谓词下推的几种处理逻辑。

对于 OuterJoin,假设返回所有行的基表为 Preserved row table,另外一张表为 Null supplying table,例如 t1 left join t2,则 t1 为 Preserved row table, t2 为 Null supplying table。 如果 Join 条件表达式为“on t1.key = t2.key and t1.key > 1 where t2.key >2”,则“t1.key> 1”叫作“Join 中条件”,“t2.key>2”叫作“Join后条件”。 总结起来, Outer Join语句的谓词下推有 4种情况,如下表所示。

为了方便分析,构造如下数据,假设表 t1 和表 t2 中的数据相同,都只包含两条数据。下面以数据表 t1 和 t2 为例,说明这 4种情况。

不加任何过滤条件

select t1. key, t1.value, t2.value
from t1 left join t2 
on tl.key = t2.keys;
t1.keyt1.valuet2.value
111
222

(1) Preserved row table“Join 中条件”不下推

select t1. key, t1.value, t2.value
from t1 left join t2 
on t1.key = t2.key 
and t1.key > 1;

这种情况下,过滤条件不会下推, SQL 最终执行的结果为:

 

(2) Preserved row table “Join 后条件”下推

select t1.key, t1.value, t2.value
from t1 left join t2 
on t1.key = t2.key 
where t1.key > 1;

等价于

select
  t1.key,
  t1.value,
  t2.value
from (
  select key, value 
  from t1 
  where t1.key >1
) t3
left join t2 
on t3.key = t2.key;

  

(3) Null supplying table “Join 中条件”下推

select t1.key, t1.value, t2.value
From t1 left join t2 
on t1.key = t2.key 
and t2.key > 1;

等价于

select t1.key, t1.value, t2.value
from t1 left join 
(
select key, value 
from t2 
where t2.key > 1
) t3 
on t1.key = t3.key;

 

(4) Null supplying table “Join 后条件”不下推

select t1.key, t1.value, t2.value
from t1 left join t2 
on t1.key = t2.key 
where t2.key >1;

基于CBO优化

CBO 优化主要在物理计划层面,原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划。充分考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划。

而每个执行节点的代价,分为两个部分: 

1、该执行节点对数据集的影响,即该节点输出数据集的大小与分布;

2、该执行节点操作算子的代价。

每个操作算子的代价相对固定,可用规则来描述。而执行节点输出数据集的大小与分布,分为两个部分:

1、初始数据集,也即原始表,其数据集的大小与分布可直接通过统计得到;

2、中间节点输出数据集的大小与分布可由其输入数据集的信息与操作本身的特点推算。

需要先执行特定的 SQL 语句来收集所需的表和列的统计信息。 

--表级别统计信息
ANALYZE TABLE 表名 COMPUTE STATISTICS
--生成列级别统计信息
ANALYZE TABLE 表名 COMPUTE STATISTICS FOR COLUMNS 列 1,列 2,列 3
 
--显示统计信息
DESC FORMATTED 表名
--显示列统计信息
DESC FORMATTED 表名 列名s

没有执行 ANALYZE状态 

执行 ANALYZE后,发现多了很多spark.sql.statistics信息

 

CBO相关参数

通过 "spark.sql.cbo.enabled" 来开启,默认是 false。配置开启 CBO 后,CBO 优化器可以基于表和列的统计信息,进行一系列的估算,最终选择出最优的查询计划。比如:Build 侧选择、优化 Join 类型、优化多表 Join 顺序等。

  • spark.sql.cbo.enabled

    默认false。true 表示打开,false 表示关闭。
    要使用该功能,需确保相关表和列的统计信息已经生成。

  • spark.sql.cbo.joinReorder.enabled
    使用 CBO 来自动调整连续的 inner join 的顺序。
    默认false。true:表示打开,false:表示关闭
    要使用该功能,需确保相关表和列的统计信息已经生成,且CBO 总开关打开。
  • spark.sql.cbo.joinReorder.dp.threshold
    使用 CBO 来自动调整连续 inner join 的表的个数阈值。
    默认10。
    如果超出该阈值,则不会调整 join 顺序。
  val CBO_ENABLED =
    buildConf("spark.sql.cbo.enabled")
      .doc("Enables CBO for estimation of plan statistics when set true.")
      .version("2.2.0")
      .booleanConf
      .createWithDefault(false)

  val PLAN_STATS_ENABLED =
    buildConf("spark.sql.cbo.planStats.enabled")
      .doc("When true, the logical plan will fetch row counts and column statistics from catalog.")
      .version("3.0.0")
      .booleanConf
      .createWithDefault(false)

  val JOIN_REORDER_ENABLED =
    buildConf("spark.sql.cbo.joinReorder.enabled")
      .doc("Enables join reorder in CBO.")
      .version("2.2.0")
      .booleanConf
      .createWithDefault(false)

  val JOIN_REORDER_DP_THRESHOLD =
    buildConf("spark.sql.cbo.joinReorder.dp.threshold")
      .doc("The maximum number of joined nodes allowed in the dynamic programming algorithm.")
      .version("2.2.0")
      .intConf
      .checkValue(number => number > 0, "The maximum number must be a positive integer.")
      .createWithDefault(12)

  val JOIN_REORDER_CARD_WEIGHT =
    buildConf("spark.sql.cbo.joinReorder.card.weight")
      .internal()
      .doc("The weight of cardinality (number of rows) for plan cost comparison in join reorder: " +
        "rows * weight + size * (1 - weight).")
      .version("2.2.0")
      .doubleConf
      .checkValue(weight => weight >= 0 && weight <= 1, "The weight value must be in [0, 1].")
      .createWithDefault(0.7)

  val JOIN_REORDER_DP_STAR_FILTER =
    buildConf("spark.sql.cbo.joinReorder.dp.star.filter")
      .doc("Applies star-join filter heuristics to cost based join enumeration.")
      .version("2.2.0")
      .booleanConf
      .createWithDefault(false)

  val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection")
    .doc("When true, it enables join reordering based on star schema detection. ")
    .version("2.2.0")
    .booleanConf
    .createWithDefault(false)

  val STARSCHEMA_FACT_TABLE_RATIO = buildConf("spark.sql.cbo.starJoinFTRatio")
    .internal()
    .doc("Specifies the upper limit of the ratio between the largest fact tables" +
      " for a star join to be considered. ")
    .version("2.2.0")
    .doubleConf
    .createWithDefault(0.9)

 使用举例

 def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("CBO")
      .set("spark.sql.cbo.enabled", "true")
      .set("spark.sql.cbo.joinReorder.enabled", "true")
      .setMaster("local[*]")
    val sparkSession: SparkSession = Util.SparkSession2hive(sparkConf)
 
    var appSql: String =
      """
        |select
        |   t1.name,count(1)
        |from
        |   tab_spark_test as t1
        |left join tab_spark_test_2 as t2
        |on t1.id = t2.id
        |group by t1.name
        """.stripMargin
 
    sparkSession.sql("use default;")
    sparkSession.sql(appSql).show()
 
    while (true) {}
 
  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1352180.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

怎么快速修复mfc140.dll文件?解决mfc140.dll缺失的方法

面对计算机报告的 ​mfc140.dll​ 文件遗失错误&#xff0c;这通常表明系统中缺少一个关键的动态链接库文件&#xff0c;该文件对于运行以 Microsoft Foundation Class (MFC) 库编写的程序十分重要&#xff0c;尤其是那些需要图形界面的应用程序和一些游戏。若没有这个文件&…

清风数学建模-数学规划模型

内容&#xff1a;数学规划模型&#xff08;cab aeqbeq lbub&#xff09; 一.题型类型 1.线性规划linprog 2.非线性规划 fmincon 3.整数规划 intlinprog 4.&#xff08;0-1规划&#xff09;&#xff08;特殊的线性整数规划&#xff09;intlinprog 5.多目标规划 linprog 标…

JumpServer3.0版本(用户管理、邮件、MFA认证配置)

创建用户组 控制台页面可以看见左侧的用户管理下,有用户列表和用户组 点击用户组、点击创建按钮、设置名称,用户不用选择还没建用户,提交即可 创建用户 点击用户列表创建按钮,设置名称、用户名、邮箱等必填项 这个时候用户组选项,可以选好我们创建的用户组了,先创用…

Spring高手之路-Spring Bean、Java Bean和对象的区别与联系

目录 什么是Spring Bean 什么是Java Bean 什么是对象 Spring Bean与Java Bean与对象的联系与区别 联系 区别 什么是Spring Bean 在Spring官方文档中对Bean的解释如下&#xff1a; In Spring, the objects that form the backbone of your application and that are manage…

MySQL数据库高级SQL语句及存储过程

目录 一、高级SQL语句 &#xff08;一&#xff09;case语句 1.语法定义 2.示例 &#xff08;二&#xff09;空值(NULL) 和 无值( ) 1.区别 2.示例 &#xff08;1&#xff09;字符长度 &#xff08;2&#xff09;判断方法 ① 空值(NULL) ② 无值( ) &#xff08;3…

代码随想录算法训练DAY18|二叉树5

算法训练DAY18|二叉树5 513.找树左下角的值 力扣题目链接 给定一个二叉树&#xff0c;在树的最后一行找到最左边的值。 示例 1: 示例 2: 思路 本题要找出树的最后一行的最左边的值。此时大家应该想起用层序遍历是非常简单的了&#xff0c;反而用递归的话会比较难一点。 我…

【通关喜报】腾讯云TDSQL TCP/TCE、云运维tcp 12月认证考试,全员过关,年终冲刺!

2023年12月23日云贝教育有6位学员参加了腾讯云TDSQL-TCP以及TCE认证考试。都取得非常好的成绩~下面我们来看一下各位同学的理论考试和上机考试成绩吧~

BWP频域位置的确定

这里根据协议整理下BWP频域相关参数以及如何确定BWP的频域位置。 BWP的配置包含几个参数 &#xff1a; 1 SCS, CyclePrefix 和locationAndBandwidth。 BWP频域起始位置N_start_BWPOcarrierRBstart&#xff0c;其中Ocarrier 由RRC层参数offsetToCarrier决定。 locationAndB…

Java学习——设计模式——结构型模式2

结构型模式 结构型模式主要涉及如何组合各种对象以便获得更好、更灵活的结构。虽然面向对象的继承机制提供了最基本的子类扩展父类的功能&#xff0c;但结构型模式不仅仅简单地使用继承&#xff0c;而更多地通过组合与运行期的动态组合来实现更灵活的功能。 包括&#xff1a; 1…

Redis命令---String篇 (超全)

目录 1.Redis Setnx 命令 - 只有在 key 不存在时设置 key 的值。简介语法可用版本: > 1.0.0返回值: 设置成功&#xff0c;返回 1 。 设置失败&#xff0c;返回 0 。 示例 2.Redis Getrange 命令 - 返回 key 中字符串值的子字符简介语法可用版本: > 2.4.0返回值: 截取得到…

vue3 接入 Element Plus

vue3 接入 Element Plus vue3 发布已经很久了&#xff0c;官方也已经发布公告&#xff0c;自2023年12月31日起停止对 vue2 版本的维护更新&#xff0c;因此&#xff0c;vue3 正式登上了历史的舞台。组件库一直是前端开发的利器&#xff0c;减少了开发者开发复杂度&#xff0c;提…

计算机毕业设计------基于SpringCloud的实验室管理系统

项目介绍 实验室管理系统的用户可以分为两种&#xff1a;系统管理员和普通用户。系统管理员主要功能&#xff1a; 登录登出、分析数据、管理用户、管理日志、管理实验室、管理预约、维护个人资料、实验室保修管理 用户主要功能&#xff1a; 注册登录、查询实验室、实验室预约…

Edge浏览器的卸载(一分钟版)

一分钟看完不耽误 开整工具下载后 结尾 开整 工具 Remove-MS-Edge 看名字&#xff0c;简单直接 CSDN下载 资源设置是免费的&#xff0c;大家尽管下载 不放心软件安全的话&#xff0c;自己上github地址下载也行 下载后 解压之后 我们打开有gui的&#xff0c;也就是有界面的&…

胡润研究院发布《2023胡润中国最具历史文化底蕴品牌榜》

胡润研究院发布《2023胡润中国最具历史文化底蕴品牌榜》&#xff0c;前十名分别是片仔癀、同仁堂、贵州茅台、五粮液、中国银行、中华、黄山、农业银行、建设银行、汾酒。 榜单调研范围涵盖中国内地具有60年以上历史的为消费者提供产品或服务的品牌&#xff0c;综合考察品牌历史…

polar CTF web upload tutu

一、题目 二、解题 1、上传两个一样的木马提示不是 setu&#xff08;色图&#xff09; 2、上传两个图&#xff0c;提示md5值不一样 综上他需要两张md5值相同的图 找工具 fastcoll 可生成两个md5值相同的文件 http://www.win.tue.nl/hashclash/fastcoll_v1.0.0.5.exe.zip 照…

编织Spring魔法:解读核心容器中的Beans机制【beans 一】

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 编织Spring魔法&#xff1a;解读核心容器中的Beans机制【beans 一】 前言什么是Spring核心容器Beans的生命周期管理&#xff1a;初始化和销毁方法&#xff1a;各种作用域&#xff1a; beans的配置方式…

【Bidomain建模范式:Pansharpening】

Bidomain Modeling Paradigm for Pansharpening &#xff08;泛锐化的Bidomain建模范式&#xff09; 泛锐化是一个具有挑战性的低层次视觉任务&#xff0c;其目的是学习光谱信息和空间细节之间的互补表示。尽管取得了显着的进步&#xff0c;现有的基于深度神经网络&#xff0…

JVM内存模型理解

1、首先理解下什么是 jvm 内存模型&#xff1f; jvm内存模型定义了Java虚拟机运行时如何组织和管理内存&#xff0c;规定了各个内存区域的作用、结构和交互方式&#xff0c;以及线程间的内存可见性、内存操作的原子性等行为&#xff0c;以支持Java程序的执行&#xff0c;即一种…

以角色为基础的软件开发团队建设

角色抽象作为一种载体&#xff0c;可以很好地进行软件工程知识体系和企业知识地图的组织&#xff0c;满足企业知识体系持续改进的需要&#xff0c;因此角色团队组建和建设也可以作为软件工程实施方法之一。 软件开发项目立项时&#xff0c;重要工作之一就是开发团队的组建&…

Spring Security 6.x 系列(13)—— 会话管理及源码分析(一)

一、会话概念 在实现会话管理之前&#xff0c;我们还是先来了解一下协议和会话的概念&#xff0c;连协议和会话都不知道是啥&#xff0c;还谈啥管理。 1.1 http 协议 因为我们现在的会话&#xff0c;基本上都是基于HTTP协议的&#xff0c;所以在讲解会话之前&#xff0c;我再…