Spark中为什么Left join比Full join 快

news2024/12/23 22:27:11

背景

最近在调优的过程中,发现了left outer joinfull outer join快很多的情况,
具体的sql如下:

from 
    db.baseTb1 base  
    join db.tb1 a on base.id = a.id
    full outer join db.tbl2 b on  a.id = b.id 
    full outer join db.tbl3 c on  b.id = c.id 
    full outer join db.tbl4 d on  c.id = d.id 
    full outer join db.tbl5 e on  d.id = e.id 
  
-----

from 
    db.baseTb1 base  
    left join db.tb1 a on base.id = a.id
    left outer join db.tbl2 b on  a.id = b.id 
    left outer join db.tbl3 c on  a.id = c.id 
    left outer join db.tbl4 d on  a.id = d.id 
    left outer join db.tbl5 e on  a.id = e.id 

结论

先说结论:left join中4个join会在同一个Stage执行,也就是说会在同一个Task执行4个join,而full join每个join都是在单独的Stage中执行,是串行的, left join如下:
在这里插入图片描述

如果在语意允许的情况下,选择left join可以大大加速任务运行,笔者遇到的情况就是 left join 运行了1个小时,而full join运行了6个小时

分析

对于full outer join的情况,运行的物理计划如下:
在这里插入图片描述

对于每个SortMergeJoin完后都会有一个Exchange的shuffle操作。

对于left outer join的情况,运行的物理计划如下:
在这里插入图片描述

只有在读取source文件完之后才会有Exchange的shuffle的操作。
这是为什么呢?
因为在RuleEnsureRequirements中,会对于不匹配的计划之间加上shuffle Exchange物理计划,具体代码如下:

  private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
    val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
    val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
    var children: Seq[SparkPlan] = operator.children
    assert(requiredChildDistributions.length == children.length)
    assert(requiredChildOrderings.length == children.length)

    // Ensure that the operator's children satisfy their output distribution requirements.
    children = children.zip(requiredChildDistributions).map {
      case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
        child
      case (child, BroadcastDistribution(mode)) =>
        BroadcastExchangeExec(mode, child)
      case (child, distribution) =>
        val numPartitions = distribution.requiredNumPartitions
          .getOrElse(conf.numShufflePartitions)
        ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
    }
  • child.outputPartitioning.satisfies(distribution)这块代码就是判断下游的输出分区是否满足当前计划所要求的分布
    我们分析第一个join的时候,也就是:
   FileSourceScanExec     FileSourceScanExec
       \                      /
       \/                    \/

             SortMergeJoin

这里SortMergeJoin的requiredChildDistributionClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys)
SortMergeJoin的child的outputPartitioning为FileSourceScanExec.outputPartitioning,即UnknownPartitioning

所以会引入Exchange,形成如下的物理计划:

FileSourceScanExec     FileSourceScanExec
       \                      /
       \/                    \/
    Exchange            Exchange

           \               /
             SortMergeJoin

而最终经过AQE以后会形成如下的物理计划:

   FileSourceScanExec     FileSourceScanExec
       \                      /
       \/                    \/
   Exchange                Exchange
       |                      |
   CustomShuffleReader CustomShuffleReader

           \               /
             SortMergeJoin

而对于接下来的第二个join,full join和left join的情况就不一样了:

  • 对于left join:

FileSourceScanExec     FileSourceScanExec
       \                      /
       \/                    \/
   Exchange                Exchange
       |                      |
   CustomShuffleReader CustomShuffleReader

           \               /                    
             SortMergeJoin                  FileSourceScanExec
                                                   |
                    |                           Exchange
                    |                              |
                    |                          CustomShuffleReader
                    |                              /
                    SortMergeJoin(left outer join)

因为第二个SortMergeJoin的requiredChildDistributionClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys)
SortMergeJoin 的child的outputPartitioning为第一个SortMergeJoin.outputPartitioning,具体的代码如下:

  override def outputPartitioning: Partitioning = joinType match {
    case _: InnerLike =>
      PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
    case LeftOuter => left.outputPartitioning
    case RightOuter => right.outputPartitioning
    case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
    case LeftExistence(_) => left.outputPartitioning
    case x =>
      throw new IllegalArgumentException(
        s"ShuffledJoin should not take $x as the JoinType")
  }

所以是CustomShuffleReader.outputPartitioning,w为Exchange.outputPartitioningHashPartitioning,则能匹配satisfied上,所以不会引入额外的shuffle

  • 对于full outer join:

FileSourceScanExec     FileSourceScanExec
       \                      /
       \/                    \/
   Exchange                Exchange
       |                      |
   CustomShuffleReader CustomShuffleReader

           \               /                    
             SortMergeJoin                  FileSourceScanExec
                                                   |
                    |                           Exchange
                    |                              |
                    |                          CustomShuffleReader
                    |                              /
                    SortMergeJoin(left outer join)

其他的都是left outer join一样,唯一不一样的是SortMergeJoin 的child的outputPartitioning是 第一个SortMergeJoin.outputPartitioning ,根据以上代码:
走的就是FullOuter的逻辑,也就是UnknownPartitioning,所以是不满足,得引入shuffle Exchange。

其实从逻辑上来说,full join后如果不重新shuffle,会导致一个任务中会有id为null值的存在,会导致join的结果不正确
而对于left join来说就不一样了,task join完后id还是保持原来的就不会变,所以就不必重新shuffle

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/761295.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Java+SpringBoot+vue的任务追踪管理系统设计与实现

博主介绍:✌擅长Java、微信小程序、Python、Android等,专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇🏻 不然下次找不到哟 Java项目精品实战案…

【代码随想录 | Leetcode | 第五天】链表 | 移除链表元素 | 设计链表 | 203-707

前言 欢迎来到小K的Leetcode|代码随想录|专题化专栏,今天将为大家带来移除链表元素和设计链表的分享✨ 目录 前言203. 移除链表元素707. 设计链表总结 203. 移除链表元素 ✨题目链接点这里 给你一个链表的头节点 head 和一个整数 val ,请你删除链表中所…

[Arduino] ESP32开发 - UDP收发数据

UDP 通信 UDP发送数据 再开始测试之前,请使用手机打开一个热点,并把电脑连接到手机热点上,这样子在后续测试中 ESP32 和电脑就会在同一个局域网(手机热点可以使用路由器代替) 新建任意文件,填入以下代码 …

linux日志文件

前言: 无论管理什么系统,对日志文件的监控、调用、管理都是其中重要的一部分。服务器问题的解决都是从查看系统(错误)日志开始的 一、作用: linux运行的程序通常把系统的消息和错误写入对应的日志文件,如L…

浙大数据结构第三周之03-树3 Tree Traversals Again

题目详情: An inorder binary tree traversal can be implemented in a non-recursive way with a stack. For example, suppose that when a 6-node binary tree (with the keys numbered from 1 to 6) is traversed, the stack operations are: push(1); push(2)…

Vue列表过滤(计算属性和监听属性实现)

filter&#xff1a; 过滤器 indexOf()&#xff1a;是否包含某某值 <body> <div id"root"><!--遍历数组--><h2>人员列表</h2><input type"text" placeholder"请输入名字" v-model"keyWord"><ul…

封装实验环境,助力观察MySQL binlog事件

维多利亚女王时代是聚积的年代&#xff1b;不仅仅是物质财富的聚积&#xff0c;而且是每一个国家能增强国力的所有因素与要素的增加和聚积。教育惠及社会各个阶层。科学打开大自然的无限宝库。宝库之门一扇一扇被打开。阴暗而神秘的宝库一个个被照亮&#xff0c;一个个被开发&a…

Gitlab 使用 docker buildx 多重构建镜像上传私有 Harbor与 Dockerhub

文章目录 1. 预备条件2. 安装 docker2.1 安装 docker buidx2.2 docker 配置2.3 安装 Buildx2.4 安装模拟器 3. 安装 git4. 安装 gitlab5. 部署 gitlab-runner6. 搭建 harbor7. 开发应用8. 配置 BuildKit8.1 Registry mirror8.2 设置镜像仓库正式 9. 编写 .gitlabs-ci.yaml 1. 预…

5月更新,docsify综合漏洞知识库!

项目介绍 一个知识库&#xff0c;集成了Vulhub、Peiqi、EdgeSecurity、0sec、Wooyun等开源漏洞库&#xff0c;涵盖OA、CMS、开发框架、网络设备、开发语言、操作系统、Web应用、Web服务器、应用服务器等多种漏洞。 关注【Hack分享吧】公众号&#xff0c;回复关键字【230428】获…

深度探讨大模型位置编码及其外推性

深度探讨大模型位置编码及其外推性 作者&#xff1a;王嘉宁&#xff0c;转载请注明出处&#xff1a;https://wjn1996.blog.csdn.net/article/details/120607050 现如今很多大模型都开始支持超过4096长度的推理&#xff0c;例如GPT-4支持超过30k&#xff0c;ChatGLM2-6B也支持最…

mysql数据库-----事务

目录 1.事务的概念 2. 事务的ACID特点 &#xff08;1&#xff09;脏读 &#xff08;2&#xff09;不可重复读 &#xff08;3&#xff09;幻读 &#xff08;4&#xff09; 丢失更新 1.事务的概念 MySQL 事务主要用于处理操作量大&#xff0c;复杂度高的数据。比如说&…

Spring IoC 自定义简单实现案例

IoC 极简实现案例 1.引入依赖 <dependencies><dependency><!--Dom4j是java的XML解析组件--><groupId>org.dom4j</groupId><artifactId>dom4j</artifactId><version>2.1.1</version></dependency><dependency…

从VAE到Diffusion生成模型详解(2.1):普通GAN的改进及变种

文章目录 1. JS散度的问题2. LSGAN(Least Square GAN)LSGAN目标函数 3. WGAN&#xff08;wasserstein GAN&#xff09;WGAN的目标函数 4. 条件GANcGANSGANACGANInfoGANtext2imageimage2image 参考 1. JS散度的问题 上一篇博客从VAE到Diffusion生成模型详解(2)&#xff1a;生成…

Node+MySQL+Vue2.0+elementUI实现的博客管理系统(一)

前端部分&#xff1a; Vue项目的入口文件main.js: //引入Vue import Vue from vue //引入App import App from ./App.vue //引入VueRouter import VueRouter from vue-router import router from ./router/index import Vuex from vuex import store from ./store //完整引入…

数学专题训练2 组合计数

1. 硬币购物 4 种面值的硬币&#xff0c;第 i 种的面值是 C i C_i Ci​​。 n n n​ 次询问&#xff0c;每次询问给出每种硬币的数量 D i D_i Di​​ 和一个价格 S S S​&#xff0c;问付款方式。 n ≤ 1 0 3 , S ≤ 1 0 5 n\leq 10^3,S\leq 10^5 n≤103,S≤105​. 如果用…

第一百零九天学习记录:C++实战:职工管理系统(黑马教学视频)

1、管理系统需求 代码进入一个 while 循环&#xff0c;不断从文件流 ifs 中读取数据。循环的条件是 ifs 从文件中读取 id、name 和 dId 三个值的操作都成功。如果读取成功&#xff0c;循环内部的代码块会执行一次&#xff0c; num 的值加一。 这段代码的作用是从文件中逐行读取…

ENSP路由器的基本命令操作(第十六课)

ENSP路由器的基本命令操作(第十六课) 1-1 代码重点回顾 1 ipconfig 查看 pc机的IP地址2 ping 连接的IP地址3 system-view 进入系统视图4 [Huawei]interface GigabitEthernet 0/0/1 进入接口视图 过0/0/15 quit 返回上一层视图6 ctrlZ 快速返回用户视图7 [sy]sysna…

【V8】【2. 内联函数、Slot】

什么是内联函数 在 V8 执行函数的过程中&#xff0c;会观察函数中一些调用点 (CallSite) 上的关键的中间数据&#xff0c;然后将这些数据缓存起来&#xff0c;当下次再次执行该函数的时候&#xff0c;V8 就可以直接利用这些中间数据&#xff0c;节省了再次获取这些数据的过程&…

PS图层混合模式超详细解答-图层混合模式的原理(Part2)

对比度组 叠加Overlay 此模式是正片叠底和滤色的组合&#xff0c;组合依据是底图的中性灰平面&#xff0c;如果在 [ 0 , 128 ] [0,128] [0,128]则使用正片叠底&#xff0c;若是在 ( 128 , 255 ] (128,255] (128,255]之间&#xff0c;则是实用滤色。 公式 r O v e r L a y (…

专项练习29

目录 一、选择题 1、如果修改obj里面的name属性时会发生什么&#xff1f; 2、以下代码执行后&#xff0c;a.x 和 b.x 的结果分别为&#xff08;&#xff09; 3、在ECMAScript6 中,Promise的状态有&#xff08;&#xff09; 4、下面哪些方式在同一个窗口下能够检测一个js对象是…