Spark优化----Spark 数据倾斜

news2024/12/18 14:57:28

目录

数据倾斜的表现:

定位数据倾斜问题:

解决方案一:聚合原数据

避免 shuffle 过程

缩小 key 粒度(增大数据倾斜可能性,降低每个 task 的数据量)

增大 key 粒度(减小数据倾斜可能性,增大每个 task 的数据量)

解决方案二:过滤导致倾斜的 key

解决方案三:提高 shuffle 操作中的 reduce 并行度

reduce 端并行度的设置

reduce 端并行度设置存在的缺陷

​​​​​​​解决方案四:使用随机 key 实现双重聚合

​​​​​​​解决方案五:将 reduce join 转换为 map join

核心思路:

不适用场景分析:

​​​​​​​解决方案六:sample 采样对倾斜 key 单独进行 join

适用场景分析:

不适用场景分析:

​​​​​​​解决方案七:使用随机数扩容进行 join

核心思想:

局限性:


        Spark 中的数据倾斜问题主要指 shuffle 过程中出现的数据倾斜问题,是由于不同的 key对应的数据量不同导致的不同 task 所处理的数据量不同的问题。

        例如,reduce 点一共要处理 100 万条数据,第一个和第二个 task 分别被分配到了 1 万条数据,计算 5 分钟内完成,第三个 task 分配到了 98 万数据,此时第三个 task 可能需要 10 个小时完成,这使得整个 Spark 作业需要 10 个小时才能运行完成,这就是数据倾斜所带来的后果。

        注意,要区分开数据倾斜与数据量过量这两种情况,数据倾斜是指少数 task 被分配了绝大多数的数据,因此少数 task 运行缓慢;数据过量是指所有 task 被分配的数据量都很大, 相差不多,所有task 都运行缓慢。

数据倾斜的表现:

  • Spark 作业的大部分 task 都执行迅速,只有有限的几个 task 执行的非常慢,此时可能出现了数据倾斜,作业可以运行,但是运行得非常慢;
  • Spark 作业的大部分 task 都执行迅速,但是有的 task 在运行过程中会突然报出 OOM, 反复执行几次都在某一个 task 报出 OOM 错误,此时可能出现了数据倾斜,作业无法正常运行。

定位数据倾斜问题:

  • 查阅代码中的 shuffle 算子,例如 reduceByKey、countByKey、groupByKey、join 等算子,根据代码逻辑判断此处是否会出现数据倾斜;
  • 查看 Spark 作业的 log 文件,log 文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个 stage,对应的 shuffle 算子是哪一个;

解决方案一:聚合原数据

避免 shuffle 过程

        绝大多数情况下,Spark 作业的数据来源都是 Hive 表,这些 Hive 表基本都是经过 ETL 之后的昨天的数据。为了避免数据倾斜,我们可以考虑避免 shuffle 过程,如果避免了 shuffle 过程,那么从根本上就消除了发生数据倾斜问题的可能。

        如果 Spark 作业的数据来源于Hive 表,那么可以先在 Hive 表中对数据进行聚合,例如按照key 进行分组,将同一key 对应的所有value 用一种特殊的格式拼接到一个字符串里去,这样,一个 key 就只有一条数据了;之后,对一个 key 的所有 value 进行处理时,只需要进行map 操作即可,无需再进行任何的 shuffle 操作。通过上述方式就避免了执行 shuffle 操作, 也就不可能会发生任何的数据倾斜问题。

        对于 Hive 表中数据的操作,不一定是拼接成一个字符串,也可以是直接对 key 的每一条数据进行累计计算。

        要区分开,处理的数据量大和数据倾斜的区别。

缩小 key 粒度(增大数据倾斜可能性,降低每个 task 的数据量)

key 的数量增加,可能使数据倾斜更严重。

增大 key 粒度(减小数据倾斜可能性,增大每个 task 的数据量)

        如果没有办法对每个 key 聚合出来一条数据,在特定场景下,可以考虑扩大 key 的聚合粒度。

        例如,目前有 10 万条用户数据,当前key 的粒度是(省,城市,区,日期),现在我们考虑扩大粒度,将key 的粒度扩大为(省,城市,日期),这样的话,key 的数量会减少,key 之间的数据量差异也有可能会减少,由此可以减轻数据倾斜的现象和问题。(此方法只针对特定类型的数据有效,当应用场景不适宜时,会加重数据倾斜)

解决方案二:过滤导致倾斜的 key

        如果在 Spark 作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的 key 进行过滤,滤除可能导致数据倾斜的 key 对应的数据,这样,在 Spark 作业中就不会发生数据倾斜了。

​​​​​​​解决方案三:提高 shuffle 操作中的 reduce 并行度

        当方案一和方案二对于数据倾斜的处理没有很好的效果时,可以考虑提高 shuffle 过程中的 reduce 端并行度,reduce 端并行度的提高就增加了 reduce 端 task 的数量,那么每个 task 分配到的数据量就会相应减少,由此缓解数据倾斜问题。

reduce 端并行度的设置

        在大部分的 shuffle 算子中,都可以传入一个并行度的设置参数,比如 reduceByKey(500), 这个参数会决定 shuffle 过程中 reduce 端的并行度,在进行 shuffle 操作的时候,就会对应着创建指定数量的reduce task。对于 Spark SQL 中的 shuffle 类语句,比如 group by、join 等, 需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了 shuffle read task 的并行度, 该值默认是 200,对于很多场景来说都有点过小。

        增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个key 分配给多个 task, 从而让每个task 处理比原来更少的数据。举例来说,如果原本有 5 个 key,每个 key 对应 10 条数据,这 5 个 key 都是分配给一个 task 的,那么这个 task 就要处理 50 条数据。而增加了shuffle read task 以后,每个 task 就分配到一个key,即每个 task 就处理 10 条数据,那么自然每个 task 的执行时间都会变短了。

reduce 端并行度设置存在的缺陷

        提高 reduce 端并行度并没有从根本上改变数据倾斜的本质和问题(方案一和方案二从根本上避免了数据倾斜的发生),只是尽可能地去缓解和减轻 shuffle reduce task 的数据压力, 以及数据倾斜的问题,适用于有较多 key 对应的数据量都比较大的情况。

        该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个 key 对应的数据量有 100 万,那么无论你的 task 数量增加到多少,这个对应着 100 万数据的 key 肯定还是会分配到一个 task 中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。

        在理想情况下,reduce 端并行度提升后,会在一定程度上减轻数据倾斜的问题,甚至基本消除数据倾斜;但是,在一些情况下,只会让原来由于数据倾斜而运行缓慢的 task 运行速度稍有提升,或者避免了某些 task 的OOM 问题,但是,仍然运行缓慢,此时,要及时放弃方案三,开始尝试后面的方案。

​​​​​​​解决方案四:使用随机 key 实现双重聚合

        当使用了类似于 groupByKey、reduceByKey 这样的算子时,可以考虑使用随机 key 实现双重聚合,如图所示:

        首先,通过 map 算子给每个数据的 key 添加随机数前缀,对 key 进行打散,将原先一样的 key 变成不一样的 key,然后进行第一次聚合,这样就可以让原本被一个 task 处理的数据分散到多个task 上去做局部聚合;随后,去除掉每个 key 的前缀,再次进行聚合。

        此方法对于由 groupByKey、reduceByKey 这类算子造成的数据倾斜由比较好的效果, 仅仅适用于聚合类的 shuffle 操作,适用范围相对较窄。如果是 join 类的 shuffle 操作,还得用其他的解决方案。

        此方法也是前几种方案没有比较好的效果时要尝试的解决方案。

​​​​​​​解决方案五:将 reduce join 转换为 map join

        正常情况下,join 操作都会执行 shuffle 过程,并且执行的是 reduce join,也就是先将所有相同的 key 和对应的 value 汇聚到一个 reduce task 中,然后再进行join。普通 join 的过程如下图所示:

        普通的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join,此时就是 reduce join。但是如果一个RDD 是比较小的,则可以采用广播小 RDD 全量数据+map 算子来实现与join 同样的效果,也就是 map join, 此时就不会发生 shuffle 操作,也就不会发生数据倾斜。

        (注意,RDD 是并不能进行广播的,只能将 RDD 内部的数据通过 collect 拉取到 Driver 内存然后再进行广播)

核心思路:

        不使用 join 算子进行连接操作,而使用 Broadcast 变量与 map 类算子实现 join 操作,进而完全规避掉 shuffle 类的操作,彻底避免数据倾斜的发生和出现。将较小 RDD 中的数据直接通过 collect 算子拉取到Driver 端的内存中来,然后对其创建一个 Broadcast 变量;接着对另外一个RDD 执行 map 类算子,在算子函数内,从 Broadcast 变量中获取较小 RDD 的全量数据,与当前RDD 的每一条数据按照连接 key 进行比对,如果连接 key 相同的话,那么就将两个 RDD 的数据用你需要的方式连接起来。

        根据上述思路,根本不会发生 shuffle 操作,从根本上杜绝了 join 操作可能导致的数据倾斜问题。

        当 join 操作有数据倾斜问题并且其中一个 RDD 的数据量较小时,可以优先考虑这种方式,效果非常好。map join 的过程如图所示:

不适用场景分析:

        由于Spark 的广播变量是在每个Executor 中保存一个副本,如果两个RDD 数据量都比较大, 那么如果将一个数据量比较大的 RDD 做成广播变量,那么很有可能会造成内存溢出。

​​​​​​​解决方案六:sample 采样对倾斜 key 单独进行 join

        在 Spark 中,如果某个 RDD 只有一个 key,那么在 shuffle 过程中会默认将此 key 对应的数据打散,由不同的 reduce 端 task 进行处理。

        当由单个 key 导致数据倾斜时,可有将发生数据倾斜的 key 单独提取出来,组成一个RDD,然后用这个原本会导致倾斜的 key 组成的 RDD 根其他 RDD 单独 join,此时,根据Spark 的运行机制,此 RDD 中的数据会在 shuffle 阶段被分散到多个 task 中去进行 join 操作。倾斜 key 单独 join 的流程如图所示:

适用场景分析:

        对于RDD 中的数据,可以将其转换为一个中间表,或者是直接使用 countByKey()的方式,看一个这个 RDD 中各个 key 对应的数据量,此时如果你发现整个 RDD 就一个 key 的数据量特别多,那么就可以考虑使用这种方法。

        当数据量非常大时,可以考虑使用 sample 采样获取 10%的数据,然后分析这 10%的数据中哪个 key 可能会导致数据倾斜,然后将这个 key 对应的数据单独提取出来。

不适用场景分析:

如果一个RDD 中导致数据倾斜的 key 很多,那么此方案不适用。

​​​​​​​解决方案七:使用随机数扩容进行 join

        如果在进行 join 操作时,RDD 中有大量的 key 导致数据倾斜,那么进行分拆 key 也没什么意义,此时就只能使用最后一种方案来解决问题了,对于 join 操作,我们可以考虑对其中一个 RDD 数据进行扩容,另一个RDD 进行稀释后再 join。

        我们会将原先一样的 key 通过附加随机前缀变成不一样的 key,然后就可以将这些处理后的“不同 key”分散到多个 task 中去处理,而不是让一个 task 处理大量的相同 key。这一种方案是针对有大量倾斜 key 的情况,没法将部分 key 拆分出来进行单独处理,需要对整个RDD 进行数据扩容,对内存资源要求很高。

核心思想:

        选择一个 RDD,使用 flatMap 进行扩容,对每条数据的 key 添加数值前缀(1~N 的数值),将一条数据映射为多条数据;(扩容)

        选择另外一个 RDD,进行map 映射操作,每条数据的 key 都打上一个随机数作为前缀(1~N 的随机数);(稀释)

将两个处理后的RDD,进行 join 操作。

局限性:

        如果两个RDD 都很大,那么将RDD 进行 N 倍的扩容显然行不通; 使用扩容的方式只能缓解数据倾斜,不能彻底解决数据倾斜问题。使用方案七对方案六进一步优化分析:

        当 RDD 中有几个 key 导致数据倾斜时,方案六不再适用,而方案七又非常消耗资源,此时可以引入方案七的思想完善方案六:

  • 对包含少数几个数据量过大的 key 的那个 RDD,通过 sample 算子采样出一份样本来,然后统计一下每个 key 的数量,计算出来数据量最大的是哪几个key。
  • 然后将这几个 key 对应的数据从原来的RDD 中拆分出来,形成一个单独的 RDD,并给每个 key 都打上n 以内的随机数作为前缀,而不会导致倾斜的大部分 key 形成另外一个RDD。
  • 接着将需要 join 的另一个 RDD,也过滤出来那几个倾斜 key 对应的数据并形成一个单独的 RDD,将每条数据膨胀成 n 条数据,这 n 条数据都按顺序附加一个 0~n 的前缀, 不会导致倾斜的大部分 key 也形成另外一个RDD。
  • 再将附加了随机前缀的独立 RDD 与另一个膨胀 n 倍的独立 RDD 进行 join,此时就可以将原先相同的 key 打散成 n 份,分散到多个 task 中去进行 join 了。
  • 而另外两个普通的RDD 就照常 join 即可。
  • 最后将两次 join 的结果使用 union 算子合并起来即可,就是最终的 join 结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2261644.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第十六届蓝桥杯模拟赛(第一期)-Python

本次模拟赛我认为涉及到的知识点: 分解质因数 Python的datetime库 位运算 简单dp 1、填空题 【问题描述】 如果一个数 p 是个质数,同时又是整数 a 的约数,则 p 称为 a 的一个质因数。 请问 2024 有多少个质因数。 【答案提交】 这是一道结…

MATLAB2021B APP seriallist 串口通信

文章目录 前言一、项目需要二、使用步骤1.查找串口填写到查找列表2.发送函数3. 接收函数4.检测串口按钮5.选择串口号 总结 前言 提示:这里可以添加本文要记录的大概内容: 项目需要: 提示:以下是本篇文章正文内容,下面…

OpenShift 4 - 多云管理(2) - 配置多集群观察功能

《OpenShift / RHEL / DevSecOps 汇总目录》 本文在 OpenShift 4.17 RHACM 2.12 环境中进行验证。 文章目录 多集群观察技术架构安装多集群观察功能监控多集群的运行状态监控多集群的应用运行在被管集群监控应用运行在管理集群监控被管集群的应用运行 参考 多集群观察技术架构…

AMBA-CHI协议详解(十二)

AMBA-CHI协议详解(一)- Introduction AMBA-CHI协议详解(二)- Channel fields / Read transactions AMBA-CHI协议详解(三)- Write transactions AMBA-CHI协议详解(四)- Other transac…

【win10+RAGFlow+Ollama】搭建本地大模型助手(教程+源码)

一、RAGFlow简介 RAGFlow是一个基于对文档深入理解的开源RAG(Retrieval-augmented Generation,检索增强生成)引擎。 主要作用: 让用户创建自有知识库,根据设定的参数对知识库中的文件进行切块处理,用户向大…

Android中坐标体系知识超详细讲解

说来说去都不如画图示意简单易懂啊!!!真是的! 来吧先上张图! (一)首先明确一下android 中的坐标系统: 屏幕的左上角是坐标系统原点(0,0) 原点向右延伸是X轴正…

IO的进阶

目录 1. 字符流转向字节流的桥梁1.1 OutputStreamWriter1.2 InputStreamReader1.3 编码与解码1.4 常见编码方式1.5 编码与解码的注意事项 2.Properties2.1概述2.2 Properties 的常用方法2.3 Properties 的应用场景2.4 实例 3.序列化3.1 ObjectOutputStream 4.字符编码4.1 ASCII…

【计算机网络】期末考试预习复习|中

作业讲解 转发器、网桥、路由器和网关(4-6) 作为中间设备,转发器、网桥、路由器和网关有何区别? (1) 物理层使用的中间设备叫做转发器(repeater)。 (2) 数据链路层使用的中间设备叫做网桥或桥接器(bridge)。 (3) 网络层使用的中间设备叫做路…

Edge Scdn用起来怎么样?

Edge Scdn:提升网站安全与性能的最佳选择 在当今互联网高速发展的时代,各种网络攻击层出不穷,特别是针对网站的DDoS攻击威胁,几乎每个行业都可能成为目标。为了确保网站的安全性与稳定性,越来越多的企业开始关注Edge …

UE4_控件蓝图_制作3D生命血条

一:效果图如下: 二、实现步骤: 1、新建敌人 右键蓝图类 选择角色, 重命名为BP_Enemytest。 双击打开,配置敌人网格体 修改位置及朝向 效果如下: 选择合适的动画蓝图类: 人物就有了动作&#x…

厦门凯酷全科技有限公司引领电商营销新风尚

在当今数字化经济快速发展的背景下,抖音作为领先的短视频和直播平台,已成为品牌推广和产品销售的重要渠道。厦门凯酷全科技有限公司(以下简称“凯酷全”)凭借其专业的团队和丰富的经验,专注于为客户提供高质量的抖音电…

高扬程潜水泵:大流量与高效率的完美结合|深圳鼎跃

洪水是由暴雨、风暴潮等等自然因素引起的江河湖海水量迅速增加或水位迅猛上涨的水流现象。一旦发生洪水事件,会侵袭河道沿岸的城市、农田等场景,在低洼地区容易形成积水,不仅影响人们的生活,还存在一定的安全风险。 高扬程潜水泵是…

神经网络基础-神经网络搭建和参数计算

文章目录 1.构建神经网络2. 神经网络的优缺点 1.构建神经网络 在 pytorch 中定义深度神经网络其实就是层堆叠的过程,继承自nn.Module,实现两个方法: __init__方法中定义网络中的层结构,主要是全连接层,并进行初始化。…

web网页前后端交互方式

参考该文&#xff0c; 一、前端通过表单<form>向后端发送数据 前端是通过html中的<form>表单&#xff0c;设置method属性定义发送表单数据的方式是get还是post。 如使用get方式&#xff0c;则提交的数据会在url中显示&#xff1b;如使用post方式&#xff0c;提交…

Mac配置 Node镜像源的时候报错解决办法

在Mac电脑中配置国内镜像源的时候报错,提示权限问题,无法写入配置文件。本文提供解决方法,青测有效。 一、原因分析 遇到的错误是由于 .npm 目录下的文件被 root 用户所拥有,导致当前用户无法写入相关配置文件。 二、解决办法 在终端输入以下命令,输入管理员密码即可。 su…

Linux实操篇-远程登录/Vim/开机重启

目录 传送门前言一、远程登录1、概念2、ifconfig3、实战3.1、SSH&#xff08;Secure Shell&#xff09;3.2、VNC&#xff08;Virtual Network Computing&#xff09;3.3、RDP&#xff08;Remote Desktop Protocol&#xff09;3.4、Telnet&#xff08;不推荐&#xff09;3.5、FT…

【C/C++进阶】CMake学习笔记

本篇文章包含的内容 一、CMake简介二、使用CMake构建工程2.1 一个最简单的CMake脚本2.2 使用变量和宏2.3 文件搜索 三、使用CMake制作和使用库文件3.1 静态库和动态库3.2 字符串操作3.3 CMake制作库文件3.4 CMake使用库文件3.4.1 使用link_libraries链接3.4.2 使用target_link_…

JS 生成防篡改水印

网页中有水印的需求&#xff0c;今天我们实现手写一个防篡改水印&#xff0c;先看下效果图&#xff1a; 一、创建class函数 传递一个dom为水印包裹器&#xff0c;有一些监听防篡改的observer&#xff0c;然后实例化的时候创建水印&#xff0c;执行create()方法 class WaterMa…

概率论得学习和整理26:EXCEL 关于plot 折线图--频度折线图的一些细节

目录 0 折线图有很多 1 频度折线图 1.1 直接用原始数据做的频度折线图 2 将原始数据生成数据透视表 3 这样可以做出了&#xff0c;频度plot 4 做按某字段汇总&#xff0c;成为累计plot分布 5 修改上面显示效果&#xff0c;做成百分比累计plot频度分布 0 折线图有很多 这…

实现echart大屏动画效果及全屏布局错乱解决方式

如何实现echarts动画效果?如何实现表格或多个垂直布局的柱状图自动滚动效果?如何解决tooltip位置超出屏幕问题,如何解决legend文字过长,布局错乱问题?如何处理饼图的中心图片永远居中? 本文将主要解决以上问题,如有错漏,请指正. 一、大屏动画效果 这里的动画效果主要指&…