Spark join数据倾斜调优

news2025/1/3 11:40:29

Spark中常见的两种数据倾斜现象如下

  • stage部分task执行特别慢

在这里插入图片描述

一般情况下是某个task处理的数据量远大于其他task处理的数据量,当然也不排除是程序代码没有冗余,异常数据导致程序运行异常。

  • 作业重试多次某几个task总会失败
    在这里插入图片描述

常见的退出码143、53、137、52以及heartbeat timed out异常,通常可认为是executor内存被打满。

RDD调优方法

  1. 查看数据分布
    Spark Core中shuffle算子出现数据倾斜时,可在Spark作业中加入查看key分布的代码,也可以将代码拆解出来使用spark-shell做测试
val rdd = sc.parallelize(Array("hello", "hello", "hello", "hi")).map((_,1))

// 数据量较少
rdd.reduceByKey(_ + _)
.sortBy(_._2, false)
.take(20)
// 数据量较大, 用sample采样后在统计
rdd.sample(false, 0.1)
.reduceByKey(_+_)
.sortBy(_._2, false)
.take(20)
  1. 调整shuffle并行度
    原理:Spark在做shuffle时,默认使用HashPartitioner(非Hash Shuffle)对数据进行分区。如果并行度设置的不合适如比较小,可能造成大量不相同的key对应的数据被分配到了同一个task上,造成该task所处理的数据远大于其它task,从而造成数据倾斜
    在这里插入图片描述

调优建议:

  • 使用spark.default.parallelism调整分区数,默认值200建议500或更大
  • 在shuffle的算子上直接设置分区数,如:a.join(b, 500)、rdd.reduceByKey(_ + _, 500)
  1. reduce join转map join
    原理:不使用join算子直接进行连接操作,而使用broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的出现
    在这里插入图片描述

调优建议:

  • broadcast的数据量不要超过500M, 过大driver/executor可能会oom
// 1.broadcast小表
val rdd1Broadcast = sc.broadcast(rdd1.collect())
// 2.map join
rdd2.map { x =>
  val rdd1DataMap = rdd1Broadcast.value.toMap
  rdd1DataMap.get(x._1) match {
    case Some(v) => (x._1, (x._2, v))
    case None => (x._1, (x._2, null))
  }
}
// 2.或者直接
rdd2.join(rdd1Broadcast)
  1. 分拆join在union
    原理:将有数据倾斜的RDD1中倾斜key对应的数据集单独抽取出来加盐(随机前缀),另外一个RDD2每条数据分别与所有的随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者join之后去掉前缀;然后将不包含倾斜key的剩余数据进行join;最后将两次join的结果集通过union合并,即可得到全部join结果。
    在这里插入图片描述

调优建议:

// 1.统计数量最大的key
val skewedKeySet = rdd1.sample(false, 0.2)
  .reduceByKey(_ + _)
  .sortBy(_._2, false)
  .take(10)
  .map(x => x._1)
  .toSet

// 2.拆分异常的rdd, 倾斜key加上随机数
val rdd1_1 = rdd1.filter(x => skewedKeySet.contains(x._1)).map { x =>
  val prefix = scala.util.Random.nextInt(10).toString
  (s"${prefix}_${x._1}", x._2)
}
val rdd1_2 = rdd1.filter(x => !skewedKeySet.contains(x._1))

// 3.正常rdd存在倾斜key的部分进行膨胀
val rdd2_1 = rdd2.filter(x => skewedKeySet.contains(x._1))
  .flatMap { x =>
    val list = 0 until 10
    list.map(i => (s"${i}_${x._1}", x._2))
  }

val rdd2_2 = rdd2.filter(x => !skewedKeySet.contains(x._1))

// 4.倾斜key的rdd进行join
val skewedRDD = rdd1_1.join(rdd2_1).map(x => (x._1.split("_")(1), x._2))
// 5.普通key的rdd进行join
val sampleRDD = rdd1_2.join(rdd2_2)
// 6.结果union
skewedRDD.union(sampleRDD)

SQL调优方法

  1. 查看数据分布
    统计某个查询结果或表中出现次数超过200次的key
WITH a AS (
            ${query}
        )
SELECT k,s
FROM (
        SELECT ${key} AS k,count(*) AS s
        FROM a
        GROUP BY ${key}
)
WHERE s > 200
  1. 自动调整shuffle并行度
    原理:自适应执行开启的前提下(AQE),假设我们设置的shuffle partition个数为5,在map stage结束之后,我们知道每一个partition的大小分别是70MB,30MB,20MB,10MB和50MB。假设我们设置每一个reducer处理的目标数据量是64MB,那么在运行时,我们可以实际使用3个reducer。第一个reducer处理partition 0 (70MB),第二个reducer处理连续的partition 1 到3,共60MB,第三个reducer处理partition 4 (50MB)
    在这里插入图片描述

Spark参数:

参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.coalescePartitions.minPartitionNum自适应执行中使用的最小shuffle后分区数,默认值executor*core数
spark.sql.adaptive.coalescePartitions.initialPartitionNum合并前的初始shuffle分区数量,默认值spark.sql.shuffle.partitions
spark.sql.adaptive.advisoryPartitionSizeInBytes合并小分区到建议的目标值, 默认256m
spark.sql.shuffle.partitionsjoin等操作分区数,默认值200推荐500或更大
  1. 自动优化Join
    原理:自适应执行开启的前提下(AQE),我们可以获得SortMergeJoin两个子stage的数据量,在满足条件的情况下,即一张表小于broadcast阈值,可以将SortMergeJoin转化成BroadcastHashJoin
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.autoBroadcastJoinThreshold默认10M,设置为-1可以禁用广播;实际根据hive表存储的统计信息或文件预估大小与此值做判断看是否做broadcast,由于文件是压缩格式一般情况下此参数并不可靠建议膨胀系数spark.sql.sources.fileCompressionFactor=10推荐此参数保持默认,调整自适应的broadcast参数
spark.sql.adaptive.autoBroadcastJoinThreshold此参数仅影响自适应执行阶段join优化时broadcast阈值;设置为-1可以禁用广播;默认值spark.sql.autoBroadcastJoinThreshold自适应执行得到的数据比较准确,driver内存足够的前提下可以将此值调大如200M
  1. 自动处理数据倾斜
    原理:自适应执行开启的前提下(AQE),我们可以在运行时很容易地检测出有数据倾斜的partition。当执行某个stage时,我们收集该stage每个mapper 的shuffle数据大小和记录条数。如果某一个partition的数据量或者记录条数超过中位数的N倍,并且大于某个预先配置的阈值,我们就认为这是一个数据倾斜的partition,需要进行特殊的处理
    在这里插入图片描述
参数说明推荐值
spark.sql.adaptive.enabled开启自适应执行线上默认值true
spark.sql.adaptive.skewJoin.enabled开启自动解决数据倾斜,默认值true
spark.sql.adaptive.skewJoin.skewedPartitionFactor影响因子,某分区数据大小超过所有分区中位数与影响因子乘积,才会被认为发生了数据倾斜
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes视为倾斜分区的分区数据最小值

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1880226.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C语言 || 数据结构】快速排序

文章目录 前言快速排序1.快排的前后指针法1.1快排的前后指针法的代码实现1.2快排的前后指针法的注意事项 2.快排的挖坑法2.1快排的挖坑法的代码实现2.2快排的挖坑法的注意事项 3.快排的hoare法3.1快排的hoare法的代码实现3.2快排的hoare法的注意事项 4快排的优化4.1快排的三数取…

恢复机制-数据库系统中的故障(事务故障、系统故障、介质故障)、一致性错误、窃取但不强制的缓冲区管理策略

一、引言 数据库管理系统DBMS的事务处理技术实现的一个主要功能部分就是恢复机制,恢复机制完成的功能就是对发生故障后系统中事务的更新结果进行数据恢复,保证事务的原子性和持久性,从而进一步保证数据库的一致性。 数据库系统与其他计算机系…

办公开源利器:ONLYOFFICE

目录 0、引子:一、ONLYOFFICE协作空间1.可集成至Web应用程序2.多种协作方式3.快捷的AI助手4.公共房间:连接第三方存储空间5.集成6.开发人员工具7.用插件拓展功能 二、新增功能1.功能全面的PDF编辑2.PDF 表单3.文本文档编辑器4.电子表格编辑器 三、结语 0…

EasyExcel数据导入

前言: 我先讲一种网上信息的获取方式把,虽然我感觉和后面的EasyExcel没有什么关系,可能是因为这个项目这个操作很难实现,不过也可以在此记录一下,如果需要再拆出来也行。 看上了网页信息,怎么抓到&#x…

【操作系统】进程管理——进程的概念、组成和特征(个人笔记)

学习日期:2024.6.29 内容摘要:进程的基本概念和特征、状态和转换 进程的概念 程序与进程 程序:是静态的,是存放在磁盘里的可执行文件,就是一系列的指令集合 进程(Process):是动态…

一文带你了解乐观锁和悲观锁的本质区别!

文章目录 悲观锁是什么?乐观锁是什么?如何实现乐观锁?什么是CAS应用局限性ABA问题是什么? 悲观锁是什么? 悲观锁它总是假设最坏的情况,它会认为共享资源在每次被访问的时候就会出现线程安全问题&#xff0…

primeflex overflow样式类相关的用法和案例

文档地址&#xff1a;https://primeflex.org/overflow 案例1 <script setup> import axios from "axios"; import {ref} from "vue";const message ref("frontend variable") axios.get(http://127.0.0.1:8001/).then(function (respon…

库存管理系统基于spingboot vue的前后端分离仓库库存管理系统java项目java课程设计java毕业设计

文章目录 库存管理系统一、项目演示二、项目介绍三、部分功能截图四、部分代码展示五、底部获取项目源码&#xff08;9.9&#xffe5;带走&#xff09; 库存管理系统 一、项目演示 库存管理系统 二、项目介绍 基于spingboot和vue前后端分离的库存管理系统 功能模块&#xff…

如何利用python画出AHP-SWOT的战略四边形(四象限图)

在企业或产业发展的相关论文分析中&#xff0c;常用到AHP-SWOT法进行定量分析&#xff0c;形成判断矩阵后&#xff0c;如何构造整洁的战略四边形是分析的最后一个环节&#xff0c;本文现将相关代码发布如下&#xff1a; import mpl_toolkits.axisartist as axisartist import …

chrome.storage.local.set 未生效

之前chrome.storage.local.set 和 get 一直不起作用 使用以下代码运行成功。 chrome.storage.local.set({ pageState: "main" }).then(() > {console.log("Value is set");});chrome.storage.local.get(["pageState"]).then((result) > …

java之命令执行审计思路

1 漏洞原理 因用户输入未过滤或净化不完全&#xff0c;导致Web应用程序接收用户输入&#xff0c;拼接到要执行的系统命令中执行。一旦攻击者可以在目标服务器中执行任意系统命令&#xff0c;就意味着服务器已被非法控制。 2 审计中常用函数 一旦攻击者可以在目标服务器中执行…

2024 Parallels Desktop for Mac 功能介绍

Parallels Desktop的简介 Parallels Desktop是一款由Parallels公司开发的桌面虚拟化软件&#xff0c;它允许用户在Mac上运行Windows和其他操作系统。通过强大的技术支持&#xff0c;用户无需重新启动电脑即可在Mac上运行Windows应用程序&#xff0c;实现了真正的无缝切换。 二…

基于LangChain+LLM的本地知识库问答:从企业单文档问答到批量文档问答

前言 过去半年&#xff0c;随着ChatGPT的火爆&#xff0c;直接带火了整个LLM这个方向&#xff0c;然LLM毕竟更多是基于过去的经验数据预训练而来&#xff0c;没法获取最新的知识&#xff0c;以及各企业私有的知识 为了获取最新的知识&#xff0c;ChatGPT plus版集成了bing搜索…

11_电子设计教程基础篇(磁性元件)

文章目录 前言一、电感1、原理2、种类1、制作工艺2、用途 3、参数1、测试条件2、电感量L3、品质因素Q4、直流电阻&#xff08;DCR&#xff09;5、额定电流6、谐振频率SRF&#xff08;Self Resonant Frequency&#xff09;7、磁芯损耗 4、应用与选型 二、共模电感1、原理2、参数…

第一周:李宏毅机器学习笔记

第一周学习周报 摘要一、机器学习基础理论1. 什么是机器学习&#xff1f;2. 机器学习“寻找”的函数有哪些类型&#xff1f;3. 机器学习中机器如何“寻找”函数&#xff1f;三步走3.1 第一步&#xff1a;设定函数的未知量&#xff08;Function with Unknown Parameters&#xf…

大多数博客首页都在使用的文字打字机出现效果

打字机效果展示 原理步骤 初步框架 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</…

vue使用axios获取信息的案例

List组件&#xff08;用来展示搜索的信息&#xff09; <template><div class"row"><!-- 列表数据 --><div class"card" v-for"user in info.users" :key"user.login" v-show"info.users.length">&l…

在 Windows 下使用 Linux 命令的多种方法

在 Windows 操作系统上使用 Linux 命令行工具&#xff0c;对于许多开发者和系统管理员来说是一个常见的需求。特别是对于那些习惯于 Linux 命令行的用户来说&#xff0c;Windows 自带的 CMD 和 PowerShell 可能并不满足他们的需求。虽然 Windows Subsystem for Linux (WSL) 是一…

【JavaEE】多线程代码案例(1)

&#x1f38f;&#x1f38f;&#x1f38f;个人主页&#x1f38f;&#x1f38f;&#x1f38f; &#x1f38f;&#x1f38f;&#x1f38f;JavaEE专栏&#x1f38f;&#x1f38f;&#x1f38f; &#x1f38f;&#x1f38f;&#x1f38f;上一篇文章&#xff1a;多线程&#xff08;2…