【Spark的五种Join策略解析】

news2024/11/23 14:34:25

join基本流程

Spark将参与Join的两张表抽象为流式遍历表(streamIter)和查找表(buildIter),通常streamIter为大表,buildIter为小表,我们不用担心哪个表为streamIter,哪个表为buildIter,这个spark会根据join语句自动帮我们完成。

img

对于每条来自streamIter的记录,都要去buildIter中查找匹配的记录,所以buildIter一定要是查找性能较优的数据结构。spark提供了三种join实现:sort merge join、broadcast join以及hash join。

五种join策略

  • Shuffle Hash Join
  • Broadcast Hash Join
  • Sort Merge Join
  • Cartesian Join
  • Broadcast Nested Loop Join

大表join小表

Shuffle Hash Join

image-20220929103146591

Join 步骤:把大表和小表按照相同的分区算法和分区数进行分区(Join 的 keys 进行分区),保证了 hash 值一样(相同key)的数据都分发到同一个分区中(分区内不排序),然后在同一个 Executor 中两张表 hash 值一样的分区就可以在本地进行 hash Join 。在进行 Join 之前,还会对小表的分区构建 Hash 桶(这就要求每个分区都不能太大),便于查找。

注意,和broadcast hash join的区别,这里并没有广播小表,在双方shuffle后的分区内,小表转成Hash桶与大表进行hash join。

苛刻的条件:

  • buildIter总体估计大小超过spark.sql.autoBroadcastJoinThreshold设定的值,即不满足broadcast join条件
  • 开启尝试使用hash join的开关,spark.sql.join.preferSortMergeJoin=false
  • 每个分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold设定的值,即shuffle read阶段每个分区来自buildIter的记录要能放到内存中
  • streamIter的大小是buildIter三倍以上

特点:

  • 仅支持等值连接,join key不需要排序
  • 支持除了全外连接(full outer joins)之外的所有join类型
  • 需要对小表构建Hash map,属于内存密集型的操作,如果构建Hash表的一侧数据比较大,可能会造成OOM,不适合严重倾斜的join
  • 对于FullOuter Join,需要建立双向hash表,代价太大。因此FullOuterJoin默认都是基于SortJoin来实现

Broadcast Hash Join

image-20220929103203808

将小表的数据广播到 Spark 所有的 Executor 端,只能用于等值连接。避免了 Shuffle 操作。一般而言,Broadcast Hash Join 会比其他 Join 策略执行的要快。因为他直接在一个map中完成了,也称之为map join

Join 步骤:

  • 利用 collect 算子将小表的数据从 Executor 端拉到 Driver 端
  • 在 Driver 端调用 sparkContext.broadcast 广播到所有 Executor 端
  • 在 Executor 端使用广播的数据与大表进行 Join 操作

使用条件:

  • 必须为等值连接,不要求 Join 的 keys 可排序
  • 小表大小小于 spark.sql.autoBroadcastJoinThreshold(default 10M)设定的值

Broadcast Nested Loop Join

该方式是在没有合适的JOIN机制可供选择时,最终会选择该种join策略。优先级为:Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join.

最小的数据集被广播到另一个数据集的每个分区上,执行一个嵌套循环来执行join, 也就是说数据集1的每条记录都尝试join数据集2的每条记录(最笨的方法),效率比较低。既可以做等值join也可以做非等值join,而且是非等值join的默认策略。

没有排序,就是广播小表到每个分区上,尝试join每条记录,效率低!

大表之间join

Sort Merge Join

image-20220929103221072

先hash到同一个分区且排好序,然后再在分区内顺序查找比对

对表的大小没有条件,不管分区多大,SortMergeJoin 都不用把一侧的数据全部加载到内存中,而是即用即丢;两个序列都有序。从头遍历,碰到 key 相同的就输出,如果不同,左边小就继续取左边,反之取右边,由于排序的特性,每次处理完一条记录后只需要从上一次结束的位置开始查找,SortMergeJoinExec执行时就能够避免大量无用的操作,提高了大数据量下sql join 的稳定性。

Join 步骤:

  • shuffle: 将两张表按照 join key 进行shuffle,保证join key值相同的记录会被分在相应的分区
  • sort: 对每个分区内的数据进行排序
  • merge: 排序后再对相应的分区内的记录进行连接

使用条件:

  • 等值连接
  • 参与 join 的 key 可排序

Cartesian Join

笛卡尔积

如果左表有n个分区,右表有m个分区,那么笛卡尔积后的分区数是K=n * m个;并且这K个分区中,第K(i)个分区获取的左表分区为 kn=i / m,获取的右表分区为 km=i % m,然后kn和km这两个分区做笛卡尔积;由于是以分区为单位,所以不会触发shuffle;

join策略选择

等值连接的情况

有join提示(hints)的情况,按照下面的顺序

  • Broadcast Hint:如果join类型支持,则选择broadcast hash join
  • Sort merge hint:如果join key是排序的,则选择 sort-merge join
  • shuffle hash hint:如果join类型支持, 选择 shuffle hash join
  • shuffle replicate NL hint: 如果是内连接,选择笛卡尔积方式

没有join提示(hints)的情况,则逐个对照下面的规则

image-20220929194248135
  • 如果join类型支持,并且其中一张表能够被广播(spark.sql.autoBroadcastJoinThreshold值,默认是10MB),则选择 broadcast hash join
  • 如果参数spark.sql.join.preferSortMergeJoin设定为false,且一张表足够小(可以构建一个hash map) ,则选择shuffle hash join
  • 如果join keys 是排序的,则选择sort-merge join
  • 如果是内连接,选择 cartesian join
  • 没有可以选择的执行策略,则最终选择broadcast nested loop join,即使可能会发生OOM

非等值连接情况

有join提示(hints),按照下面的顺序

  • broadcast hint:选择broadcast nested loop join.
  • shuffle replicate NL hint: 如果是内连接,则选择cartesian product join

没有join提示(hints),则逐个对照下面的规则

  • 如果一张表足够小(可以被广播),则选择 broadcast nested loop join
  • 如果是内连接,则选择cartesian product join
  • 如果可能会发生OOM或者没有可以选择的执行策略,则最终选择broadcast nested loop join

实验

非等值连接默认是BroadcastNestedLoopJoin

scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res1: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res2: String = 10485760

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 20, 40, 50)

scala> val df2 = data2.toDF("id2")
df2: org.apache.spark.sql.DataFrame = [id2: int]

scala> val dfJoined = df1.join(df2, $"id1" >= $"id2")   //非等值连接
dfJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
// 注意查看执行计划是BroadcastNestedLoopJoin
scala> dfJoined.queryExecution.executedPlan
res3: org.apache.spark.sql.execution.SparkPlan =
BroadcastNestedLoopJoin BuildRight, Inner, (id1#3 >= id2#8)
:- LocalTableScan [id1#3]
+- BroadcastExchange IdentityBroadcastMode
   +- LocalTableScan [id2#8]
image-20220929190714574

shuffle hash join

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2)

scala> spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

scala> val dfhashJoined = df1.join(df2, $"id1" === $"id2")    //等值连接
dfhashJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

scala> dfhashJoined.queryExecution.executedPlan
res7: org.apache.spark.sql.execution.SparkPlan =
ShuffledHashJoin [id1#3], [id2#8], Inner, BuildRight
:- Exchange hashpartitioning(id1#3, 200)
:  +- LocalTableScan [id1#3]
+- Exchange hashpartitioning(id2#8, 200)
   +- LocalTableScan [id2#8]
image-20220929191340105

sort MergeJoin

scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

scala> spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")

scala> val sortJoined = df1.join(df2, $"id1" === $"id2")
sortJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]

scala> sortJoined.queryExecution.executedPlan
res11: org.apache.spark.sql.execution.SparkPlan =
*(3) SortMergeJoin [id1#3], [id2#8], Inner
:- *(1) Sort [id1#3 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id1#3, 200)
:     +- LocalTableScan [id1#3]
+- *(2) Sort [id2#8 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id2#8, 200)
      +- LocalTableScan [id2#8]

scala> sortJoined.show
image-20220929193014333

spark3 join策略提示

  1. Broadcast HashJoin
有三种方式
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 left JOIN t2 ON t1.key = t2.key;

SELECT /*+ MAPJOIN(t2) */ * FROM t1 right JOIN t2 ON t1.key = t2.key;
  1. shuffle sort merge Join
SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
  1. shuffle Hash Join
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1003662.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

硬不硬你说了算!近 40 张图解被问千百遍的 TCP 三次握手和四次挥手面试题

前言 不管面试 Java 、C/C、Python 等开发岗位, TCP 的知识点可以说是的必问的了。 任 TCP 虐我千百遍,我仍待 TCP 如初恋。 遥想小林当年校招时常因 TCP 面试题被刷,真是又爱又狠…. 过去不会没关系,今天就让我们来消除这份恐…

hadoop运行WordCount时,Input path does not exist错误原因

修改配置文件core-site.xml 为如下所示 vim /usr/local/hadoop/etc/hadoop/core-site.xmlxml文件改为 <configuration></configuration>如果将core-site.xml文件变动为原来的内容&#xff0c;则程序将不再去hdfs://localhost:9000下寻找input文件&#xff0c;而是…

解决uniapp局部页面(scrollview)下拉刷新出现不能复位的问题

问题所在&#xff1a;&#xff08;困扰了我一天&#xff09; 局部页面自定义刷新出现下拉页面不能复位的问题 看下解决后的效果 废话不多说直接上代码 <view class"content"><scroll-view class"section" :style"{height:300px}" :re…

IDEA设置方法分割线

IDEA设置方法分割线 最近在学习视频教程的时候&#xff0c;总是能看到一些大佬用的IDEA与自己的仿佛不是一个软件&#xff01; 如下图&#xff1a; 看到一些老师用IDEA时&#xff0c;方法上都会有一条横线。感觉这样很方便&#xff0c;于是乎自己设置了一下。现在分享给大家&…

react处理跨域

如果是新建的react项目&#xff0c;没有将webpack的配置文件释放出来的话&#xff0c;请先运行 npm run eject 根目录会出现config文件夹&#xff0c;找到path.js就可以看到proxy的配置&#xff0c;默认读取的是src/setupProxy.js 那么我们可以在src目录下新建setupProxy.js…

快解析内网穿透如何帮你轻松实现外网远程连接?

外网相信大家多少了解一点&#xff0c;其实就是连接不同地区局域网&#xff0c;或者是城域网计算机通信的远程网&#xff0c;因此被称为广域网或者公网。在这个互联网信息时代&#xff0c;很多用户还是不知道怎么连接外网&#xff1f;外网远程桌面连接的步骤是怎样的&#xff1…

【Samba】win 11 不允许一个用户使用一个以上用户名与服务器或共享资源的多重连接

win11 遇到不允许一个用户使用一个以上用户名与服务器或共享资源的多重连接 原因 之前使用两个用户登录过&#xff0c;没有释放 解决方案1 1、打开cmd命令窗口&#xff1a; net use * /del /y 命令中断开所有连接. 2、重新登录 解决方案2 删除后重启

【Linux问题】日期校准

问题 请求阿里云对象存储返回 The difference between the reguest time and the current time is too large. 规定时间和当前时间之间的差异太大。 由于虚拟机出现问题导致服务器时间不准 正常的服务器时间 异常的服务器时间 设置一下时间就好 校准时间 安装 yum insta…

winscope怎么实现user版本上导出方案设计探讨-千里马android framework车载车机手机系统开发

背景 在马哥给讲解怎么用winscope来分析各种闪黑&#xff0c;黑屏等问题后&#xff0c;很多买课的同学都开始使用这个工具用于实际公司的项目了&#xff0c;但是很多同学又开始发现有一个问题&#xff0c;那就发现在user版本的手机设备上发现无法抓取相关的winscope&#xff0…

在 CentOS 7 上安装中文字体

在 CentOS 7 上安装中文字体 1. 安装中文字体包&#xff1a;2. 配置字体&#xff1a;3. 更新字体缓存&#xff1a; 在 CentOS 7 上安装中文字体可以按照以下步骤进行&#xff1a; 1. 安装中文字体包&#xff1a; sudo yum install -y fontconfig sudo yum install -y cjkuni-…

MobaXterm-Chinese中文版本安装及简单使用

项目原地址&#xff1a; https://mobaxterm.mobatek.net/download.html 1.开源中文版 项目地址&#xff1a; https://github.com/RipplePiam/MobaXterm-Chinese-Simplified?loginfrom_csdn 本仓库旨在对 MobaXterm Home Edition &#xff08;Portable edition&#xff09; 进…

新闻轮播图

一、效果图 二、vue中html部分 <div class"swiper-container" ref"swiperDom"><div class"swiper-wrapper"><div class"swiper-slide" v-for"(item,index) in newsImageList" :key"index"><…

【语义分割 01】Open MMLab介绍

1 Tutorial https://github.com/TommyZihao/MMSegmentation_Tutorials https://github.com/TommyZihao/Train_Custom_Dataset https://github.com/TommyZihao/aidlux_tutorial OpenMMLab是一个由中国开发者主导的具有世界影响力的人工智能计算机视觉开源算法体系, 至今已经开…

剪辑的视频太大怎么办?一分钟学会压缩视频

当我们剪辑视频时&#xff0c;常常会遇到视频文件过大&#xff0c;导致传输不变、存储空间不足等问题&#xff0c;那么如何解决这个问题呢&#xff1f;下面就给大家分享几个压缩视频文件大小的方法&#xff0c;轻松解决剪辑视频太大的问题~ 一、使用视频压缩工具 这边给大家分…

树型结构和二叉树的概念及特性

目录 1. 树型结构 1.1 树的概念 1.2重要专有名词概念 1.3 树的表示形式 1.4 树的应用 ​编辑 2. 二叉树 2.1 概念 2.2 两种特殊的二叉树 2.3 二叉树的性质 3.有关二叉树性质的练习题 1. 树型结构 1.1 树的概念 树是一种 非线性 的数据结构&#xff0c;它是由 n …

CAD ObjectArx 二次开发 创建工具栏实现点击button出现抽屉式菜单

实现在CAD中创建工具栏并添加菜单命令&#xff0c;如下图 参照文章&#xff1a; cad—菜单&#xff0c;工具栏&#xff0c;屏幕菜单&#xff0c;增强工具栏 主要实现路径是通过创建一个可停靠窗口&#xff0c;并在其中创建toolbutton并给button点击事件添加命令&#xff0c;将…

【EI会议征稿】2023年第五届光电材料与器件国际学术会议(ICOMD 2023)

2023年第五届光电材料与器件国际学术会议&#xff08;ICOMD 2023&#xff09; 2023 5th International Conference on Optoelectronic Materials and Devices (ICOMD 2023) 第五届光电材料与器件国际学术会议&#xff08;ICOMD 2023&#xff09;将于2023年11月17-19日在中国重…

常见的除静电设备有哪些?

在工业生产中&#xff0c;往往会产生大量的静电和灰尘&#xff0c;影响设备运行效率。这时候我们就需要除静电设备了&#xff0c;常用到的静电消除器有离子风枪、离子风机、离子风棒、离子风嘴等。 离子风枪是一种手持式静电消除器&#xff0c;风力大、除尘效果好是它的特点。…

OJ练习第169题——课程表 IV

课程表 IV 力扣链接&#xff1a;1462. 课程表 IV 题目描述 你总共需要上 numCourses 门课&#xff0c;课程编号依次为 0 到 numCourses-1 。你会得到一个数组 prerequisite &#xff0c;其中 prerequisites[i] [ai, bi] 表示如果你想选 bi 课程&#xff0c;你 必须 先选 a…

Web系统常见漏洞修复

背景&#xff1a; 在工作中&#xff0c;我们的交付团队在交付项目时&#xff0c;可能会遇到甲方会使用一些第三方工具&#xff08;奇安信等&#xff09;对项目代码进行扫描&#xff0c;特别是一些对安全性要求比较高的企业&#xff0c;比如涉及到一些证券公司、银行、金融等。他…