如何定位Spark数据倾斜问题,解决方案

news2024/11/24 4:00:55

文章目录

  • 前言
  • 一、数据倾斜和数据过量
  • 二、 数据倾斜的表现
  • 三、定位数据倾斜问题
    • 定位思路:查看任务-》查看Stage-》查看代码
  • 四、7种典型的数据倾斜场景
    • 解决方案一:聚合元数据
    • 解决方案二:过滤导致倾斜的key
    • 解决方案三:提高shuffle操作中的reduce并行度
    • 解决方案四:使用随机key实现双重聚合
    • 解决方案五:将reduce join转换为map join
    • 解决方案六:sample采样对倾斜key单独进行join
    • 解决方案七:使用随机数以及扩容进行join
  • 总结


前言

在Spark任务运行过程中,数据倾斜的情况是比较常见的,通常解决的方法有:修改任务的并行度或是将key打散的方式进行优化,下面循序渐进地介绍几种常见的倾斜场景和解决方案。

一、数据倾斜和数据过量

Spark中的数据倾斜问题主要指shuffle过程中出现的数据倾斜问题,是由于不同的key对应的数据量不同导致的不同task所处理的数据量不同的问题

例如,reduce点一共要处理100万条数据,第一个和第二个task分别被分配到了1万条数据,计算5分钟内完成,第三个task分配到了98万数据,此时第三个task可能需要10个小时完成,这使得整个Spark作业需要10个小时才能运行完成,这就是数据倾斜所带来的后果数据倾斜俩大直接致命后果
1)数据倾斜直接会导致一种情况:Out Of Memory
2)运行速度慢
在这里插入图片描述
注意,要区分开数据倾斜与数据量过量这两种情况,数据倾斜是指少数task被分配了绝大多数的数据,因此少数task运行缓慢;数据过量是指所有task被分配的数据量都很大,相差不多,所有task都运行缓慢

二、 数据倾斜的表现

1)Spark作业的大部分task都执行迅速,只有有限的几个task执行的非常慢,此时可能出现了数据倾斜,作业可以运行,但是运行得非常慢

2)Spark作业的大部分task都执行迅速,但是有的task在运行过程中会突然报出OOM,反复执行几次都在某一个task报出OOM错误,此时可能出现了数据倾斜,作业无法正常运行

三、定位数据倾斜问题

定位思路:查看任务-》查看Stage-》查看代码

某个task执行特别慢的情况
某个task莫名其妙内存溢出的情况
查看导致数据倾斜的key的数据分布情况
1)查阅代码中的shuffle算子,例如reduceByKey、countByKey、groupByKey、distinct、aggregateByKey、join、cogroup、repartition等算子,根据代码逻辑判断此处是否会出现数据倾斜
2)查看Spark作业的log文件,log文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个stage,对应的shuffle算子是哪一个

四、7种典型的数据倾斜场景

1)数据源中的数据分布不均匀,Spark需要频繁交互
2)数据集中的不同Key由于分区方式,导致数据倾斜
3)JOIN操作中,一个数据集中的数据分布不均匀,另一个数据集较小(主要)
4)聚合操作中,数据集中的数据分布不均匀(主要)
5)JOIN操作中,两个数据集都比较大,其中只有几个Key的数据分布不均匀
6)JOIN操作中,两个数据集都比较大,有很多Key的数据分布不均匀
7)数据集中少数几个key数据量很大,不重要,其他数据均匀

解决方案一:聚合元数据

1)避免shuffle过程
绝大多数情况下,Spark作业的数据来源都是Hive表,这些Hive表基本都是经过ETL之后的昨天的数据为了避免数据倾斜,我们可以考虑避免shuffle过程,如果避免了shuffle过程,那么从根本上就消除了发生数据倾斜问题的可能

如果Spark作业的数据来源于Hive表,那么可以先在Hive表中对数据进行聚合,例如按照key进行分组,将同一key对应的所有value用一种特殊的格式拼接到一个字符串里去,这样,一个key就只有一条数据了;之后,对一个key的所有value进行处理时,只需要进行map操作即可,无需再进行任何的shuffle操作。通过上述方式就避免了执行shuffle操作,也就不可能会发生任何的数据倾斜问题。

对于Hive表中数据的操作,不一定是拼接成一个字符串,也可以是直接对key的每一条数据进行累计计算要区分开,处理的数据量大和数据倾斜的区别
2)缩小key粒度(增大数据倾斜可能性,降低每个task的数据量)
key的数量增加,可能使数据倾斜更严重
3)增大key粒度(减小数据倾斜可能性,增大每个task的数据量)
如果没有办法对每个key聚合出来一条数据,在特定场景下,可以考虑扩大key的聚合粒度
例如,目前有10万条用户数据,当前key的粒度是(省,城市,区,日期),现在我们考虑扩大粒度,将key的粒度扩大为(省,城市,日期),这样的话,key的数量会减少,key之间的数据量差异也有可能会减少,由此可以减轻数据倾斜的现象和问题。
(此方法只针对特定类型的数据有效,当应用场景不适宜时,会加重数据倾斜)

解决方案二:过滤导致倾斜的key

如果在Spark作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的key进行过滤,滤除可能导
致数据倾斜的key对应的数据,这样,在Spark作业中就不会发生数据倾斜了

解决方案三:提高shuffle操作中的reduce并行度

当方案一和方案二对于数据倾斜的处理没有很好的效果时,可以考虑提高shuffle过程中的reduce端并行度,reduce端并行度的提高就增加了reduce端task的数量,那么每个task分配到的数据量就会相应减少,由此缓解数据倾斜问题
1)reduce端并行度的设置
在大部分的shuffle算子中,都可以传入一个并行度的设置参数,比如reduceByKey(500),这个参数会决定shuffle过程中reduce端的并行度,在进行shuffle操作的时候,就会对应着创建指定数量的reduce task。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小
增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了

2)reduce端并行度设置存在的缺陷
提高reduce端并行度并没有从根本上改变数据倾斜的本质和问题(方案一和方案二从根本上避免了数据倾斜的发生),只是尽可能地去缓解和减轻shuw le reduce task的数据压力,以及数据倾斜的问题,适用于有较多key对应的数据量都比较大的情况

该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用在理想情况下,reduce端并行度提升后,会在一定程度上减轻数据倾斜的问题,甚至基本消除数据倾斜;但是,在一些情况下,只会让原来由于数据倾斜而运行缓慢的task运行速度稍有提升,或者避免了某些task的OOM问题,但是,仍然运行缓慢,此时,要及时放弃方案

解决方案四:使用随机key实现双重聚合

当使用了类似于groupByKey、reduceByKey这样的算子时,可以考虑使用随机key实现双重聚合,如下图所示
在这里插入图片描述
首先,通过map算子给每个数据的key添加随机数前缀,对key进行打散,将原先一样的key变成不一样的key,然后进行第一次聚合,这样就可以让原本被一个task处理的数据分散到多个task上去做局部聚合;
随后,去除掉每个key的前缀,再次进行聚合此方法对于由groupByKey、reduceByKey这类算子造成的数据倾斜由比较好的效果,仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuw le操作,还得用其他的解决方案。此方法也是前几种方案没有比较好的效果时要尝试的解决方案

解决方案五:将reduce join转换为map join

正常情况下,join操作都会执行shuw le过程,并且执行的是reduce join,也就是先将所有相同的key和对应的value汇聚到一个reduce task中,然后再进行join。普通join的过程如下图所示
在这里插入图片描述
普通的join是会走shuw le过程的,而一旦shuw le,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜

注意,RDD是并不能进行广播的,只能将RDD内部的数据通过collect拉取到Driver内存然后再进行广播
1)核心思路
不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。
根据上述思路,根本不会发生shuffle操作,从根本上杜绝了join操作可能导致的数据倾斜问题
当join操作有数据倾斜问题并且其中一个RDD的数据量较小时,可以优先考虑这种方式,效果非常好。
map join的过程如下图所示
在这里插入图片描述
2)不使用场景分析
由于Spark的广播变量是在每个Executor中保存一个副本,如果两个RDD数据量都比较大,那么如果将一个数据量比较大的RDD做成广播变量,那么很有可能会造成内存溢出

解决方案六:sample采样对倾斜key单独进行join

在Spark中,如果某个RDD只有一个key,那么在shuw le过程中会默认将此key对应的数据打散,由不同的reduce端task进行处理当由单个key导致数据倾斜时,可有将发生数据倾斜的key单独提取出来,组成一个RDD,然后用这个原本会导致倾斜的key组成的RDD根其他RDD单独join,此时,根据Spark的运行机制,此RDD中的数据会在shuffle阶段被分散到多个task中去进行join操作。倾斜key单独join的流程如下图所示
在这里插入图片描述
1)适用场景分析
对于RDD中的数据,可以将其转换为一个中间表,或者是直接使用countByKey()的方式,看一个这个RDD中各个key对应的数据量,此时如果你发现整个RDD就一个key的数据量特别多,那么就可以考虑使用这种方法
当数据量非常大时,可以考虑使用sample采样获取10%的数据,然后分析这10%的数据中哪个key可能会导致数据倾斜,然后将这个key对应的数据单独提取出来
2)不适用场景分析
如果一个RDD中导致数据倾斜的key很多,那么此方案不适用

解决方案七:使用随机数以及扩容进行join

如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了,对于join操作,我们可以考虑对其中一个RDD数据进行扩容,另一个RDD进行稀释后再join
我们会将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,需要对整个RDD进行数据扩容,对内存资源要求很高
1)核心思想
选择一个RDD,使用flatMap进行扩容,对每条数据的key添加数值前缀(1~N的数值),将一条数据映射为多条数据;(扩容)
选择另外一个RDD,进行map映射操作,每条数据的key都打上一个随机数作为前缀(1~N的随机数);
(稀释)将两个处理后的RDD,进行join操作
在这里插入图片描述
2)局限性
如果两个RDD都很大,那么将RDD进行N倍的扩容显然行不通
使用扩容的方式只能缓解数据倾斜,不能彻底解决数据倾斜问题

3)使用方案七对方案六进一步优化分析
当RDD中有几个key导致数据倾斜时,方案六不再适用,而方案七又非常消耗资源,此时可以引入方案七的思想完善方案六
(1)对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key
(2)然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD
(3)接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD
(4)再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了
(5)而另外两个普通的RDD就照常join即可
(6)最后将两次join的结果使用union算子合并起来即可,就是最终的join结果

总结

如何定位Spark数据倾斜问题和解决方案介绍到这里啦~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/422207.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

谁才是天下第一关?

什么是关,中华大地有多少关? 关是往来必由之要处。“山川扼要,是设关津。表封藏,以达道路,天险既呈,人力并济”。 关可分为: 关防,驻兵防守的要塞;关津,水陆…

python笔记:qgrid

在Jupyter Notebook中像在Excel一样操作pandas的DataFrames,如sort/filter,并轻松把操作后的数据用于后续分析。 0 安装 pip install qgrid jupyter nbextension enable --py --sys-prefix qgrid 1 基本使用方法 1.1 数据 import numpy as np import…

Carla 保姆级安装教程

一:电脑配置 carla支持windows,Linux系统构建,官方对于安装电脑的最低配置要求是拥有6G显存的GPU,推荐8G显存的GPU,至少需要20G的存储空间,所有对电脑的配置要求是不小的挑战。 我所使用电脑的硬件配置:3…

3.7 曲率

学习目标: 如果我要学习高等数学中的曲率,我会遵循以下步骤: 1.熟悉相关的数学概念:在学习曲率之前,我们需要了解曲线、切线和曲率半径等相关的数学概念。因此,我会复习这些概念,以便更好地理…

网卡别名的设置

文章目录1. 网卡别名是什么2. 工作原理3. 设置3.1 临时添加,重启失效3.1.1 使用ipconfig命令来设置网卡别名3.1.2 使用ip addr命令来设置网卡别名3.2 永久性添加3.3 查看参考1. 网卡别名是什么 IP别名就是一张物理网卡上配置多个IP,实现类似子接口之类的…

制作PassMarkMemTest86启动U盘

制作PassMarkMemTest86启动U盘1. 概述2.制作 PassMarkMemTest86 启动U盘结束语1. 概述 PassMarkMenTest86 是一款免费、开源且强大的内存检测工具,能测试电脑内存的稳定性、存储大小和隐性问题,它还拥有 13 种不同的 RAM 测试算法,在主菜单中…

洛丽运动会 NFT 作品集第一弹

欢迎来到 2036 年洛丽运动会,这是一个以史前世界为背景的体育小游戏体验。为了庆祝这场伟大比赛的开始,结合了史前和运动配件的 NFT 系列将于北 The Sandbox 市场平台发布。 运动和格斗设备将提高你在运动会上的技能;而史前配件将使你与体育场…

Linux高并发服务器(webserver)

一.有限状态机 它的转移函数表示系统从一个状态转移到另一个状态的条件 二.EPOLL 在内核中创建一个数据,这个数据有两个比较重要的数据,一个是需要检测的文件描述符的信息(红黑树),一个双向链表,存放检测到…

Java类加载机制介绍

类加载机制的简单介绍 类加载机制是指将.class字节码文件读入到内存中。在运行时数据区中的方法区保留类的数据结构,在堆中创建一个与之对应的Class对象。 类的生命周期主要经历7个阶段:加载、验证、准备、解析、初始化、使用、卸载 其中从加载到初始化…

如何通俗易懂的解释无线通信中的那些专业术语!

这是一篇来自网络的非常经典的一篇老文,原作者不详,但非常值得一读! 香农定理 类比:城市道路上的汽车的车速和什么有关系?和道路的宽度有关系,和自己车的动力有关系,也其他干扰因素有关系&…

Unity接SDK - 极光推送

2021.09.09记录,2023发布,如有不对,还请包含。发晚了 如果想看Android原生接入JPush - SDK,移步Android原生集成JPush SDK_jpush android sdk v4.7.2 极光推送 - 接入 版本: Unity 2020.3.10f1 JPush - Unity 3…

linux系统安全及应用

目录一、账号安全控制1.1基本安全措施1.1.1系统账号的清理1.1.1.1将非登录用户的Shell设为/sbin/nologin1.1.1.2锁定长期不使用的账号1.1.1.3删除无用账号1.1.1.4锁定账号文件passwd、shadow1.1.2密码安全控制1.1.2.1设置密码有效期1.1.2.2要求用户下次登录时修改密码1.1.3命令…

服务端开发之Java秋招面试11

努力了那么多年,回头一望,几乎全是漫长的挫折和煎熬。对于大多数人的一生来说,顺风顺水只是偶尔,挫折、不堪、焦虑和迷茫才是主旋律。我们登上并非我们所选择的舞台,演出并非我们所选择的剧本。继续加油吧! 目录 1.MySQL的多版本并发控制具体实现过程?…

目标检测YOLO系列-YOLOVX运行步骤(推理、训练全过程)

下载项目:点击下载 进入项目根目录(通过cd命令) apex的安装与下载 下载apex git clone https://github.com/NVIDIA/apex进入apex目录 cd apex执行安装命令 python setup.py install首先安装相关的类库: pip install -i https://p…

深入学习MongoDB---1---入门篇+基础重点篇

MongoDB入门 MongDB作为NoSQL数据库之一,主要关注:灵活性、扩展性、高可用灵活性:NoSQL的特点就是反范式理论,为数据的水平扩展和字段的组织提供了巨大的便利高可用:天生就伴随副本集(从节点)的…

计数排序的实现

计数排序是非比较排序的一种,是对哈希直接定址法的变形应用,其操作步骤如下: 1.统计相同元素出现的次数。 2.根据统计结果将序列回收到原来的序列中。 拿一组重复元素较多的数组来举例子: 10 11 10 15 14 15…

Disruptor-源码解读

前言 Disruptor的高性能,是多种技术结合以及本身架构的结果。本文主要讲源码,涉及到的相关知识点需要读者自行去了解,以下列出: 锁和CAS伪共享和缓存行volatile和内存屏障 原理 此节结合demo来看更容易理解:传送门…

数云融合|新手入门,5分钟秒懂开源

目录一、开源软件开源领域的两大组织:FSF和OSI二、开源许可证开源意味着免费吗?三、开源技术应用领域四、总结一、开源软件 开源即开放源代码,他的核心是源代码公开,任何人都可以查看、使用、修改和分发。与之相对的是闭源&#…

js排序算法

排序算法 - jsjs交换两个值的三种方法方式1:算术运算方式2:ES6解构方式3:数组的特性冒泡排序实现思路图解bubbleSort参考视频选择排序实现思路图解selectionSort参考视频插入排序实现思路图解insertionSort参考视频js交换两个值的三种方法 方…

javaSccript---call()、 bind()、 apply()的区别

call()、apply()、bind() 都是用来重定义 this 这个对象的 语法: function.apply(thisArg, [argsArray])//argsArray 是一个可选的数组 function.call(thisArg, arg1, arg2, ...)//arg1、arg2、... 是将传递给函数的参数列表 function.bind(thisArg, arg1, arg2, ..…