SparkSql oom原因以及参数调优+数据倾斜解决方案

news2024/11/14 4:08:04

1、Spark历史版本对比

spark1 vs spark2 vs spark3 

1、spark1引入内存计算的理念,解决中间结果落盘导致的效率低下。在理想状况下性能可达到MR的100倍。虽然提高了一定的计算效率,但也带来了大量的内存管理问题,典型的如内存oom问题频发。

2、spark2引入了Tungsten引擎,关键算子效率上比Spark1提升了10倍。启用“统一内存管理”,不再使用“静态内存管理”,不再使用“静态内存管理”,oom问题大幅下降

3、spark3启用自适应查询(Adaptive Query Execution)

  • 1.动态合并shuffle partitions

    可以简化甚至避免调整shuffle分区的数量。用户可以在开始时设置相对较多的shuffle分区数,AQE会在运行时将相邻的小分区合并为较大的分区。

  • 2.动态调整join策略

    在一定程度上避免由于缺少统计信息或着错误估计大小(当然也可能两种情况同时存在),而导致执行次优计划的情况。这种自适应优化可以在运行时sort merge join转换成broadcast hash join,从而进一步提升性能

  • 3.动态优化倾斜的join(skew joins)

    skew joins可能导致负载的极端不平衡,并严重降低性能。在AQE从shuffle文件统计信息中检测到任何倾斜后,它可以将倾斜的分区分割成更小的分区,并将它们与另一侧的相应分区连接起来。这种优化可以并行化倾斜处理,获得更好的整体性能。

Spark内存管理:https://zhuanlan.zhihu.com/p/642722131?utm_id=0

2、SparkSql oom的因素

sparksql oom的因素是多样的,一个程序,从输入、处理2个方面进行考量

2.1 输入数据因素

1、文件格式&压缩

目前我司新建表默认使用parquet格式

 对于snappy压缩,hivesql参考表的“parquet.compression”属性决定是否压缩,采用什么压缩方法,sparksql遇到parquet表默认snappy压缩。

  • parquet相比于text、csv格式普遍有3-4倍的压缩比
  • snappy相比于非压缩普遍有3-4倍的压缩比
  • parquet+snappy相比于原始的text、csv达到6-8倍的压缩比,甚至更高

2、数据文件的大小

  数仓集群默认的文件大小是256M,大数据是分布式计算,普遍存在倾斜不均的情形,加之程序自身的控制,造成了数据文件的大小不一。

  尽管spark有输入数据的拆分能力,但扔存在拆分不均匀的可能。例如:原始文件的大小为300M,将拆分为256M+44M 2个文件,他们的数据规模相差5.8倍。按照1个task处理一个文件的设定,分得256M文件的task将更容易出现oom

3、数据的型态分布

  如字段中包含大量的null、枚举字段大量的相同值等,parquet+snappy将导致实际压缩比进一步提升。

4、大字段如xml,json

  string字段类型,如其中存储的是xml、json等字段(字符长度达千、万字符数级的称为大字段),parquet+snappy导致实际压缩比进一步提升

2.2 运行环境因素

  环境因素也是oom的重要原因,表现在spark默认参数、yarn集群稳定性

1、spark默认参数

  定义在集群的spark.defaults中,每一个提交到集群的spark作业都默认使用这套参数。影响oom的主要参数:

  • spark.executor.cores=4 --executor 4cpu,可并行运行4 task

  • spark.executor.memory=12g --executor 12g内存,task间共享(不建议调整)

  • spark.executor.memoryOverhead=2g --executor 2g堆外内存,task间共享(不建议调整)

  • spark.sql.files.maxPartitionBytes=268435456(256m) --读取数据文件时,单个partition或task最大256m、文件超过256m即拆分

  • spark.sql.adaptive.coalescePartitions.initialPartitionNum=none --aqe动态计算shuffle阶段partitions的最大值,none时向下取spark.sql.shuffle.partitions

  • spark.sql.shuffle.partitions=400 --shuffle阶段最大400个partition,数据规模很大建议调整

  在spark ui 中可以观察任务的具体参数值

2、yarn集群稳定性

  yarn集群稳定性包括yarn资源调度、yarn节点负载高低等。

3、Spark sql的参数调优

1、input阶段

  input阶段将表或数据文件加载到executor,先进行解压(如果有,比如snappy)、展开(比如parquet、orc格式),这些都是在内存中进行,内存不足时是不可溢写磁盘的。

  • 数据文件(拆分后大小差异导致oom)

  • 数据文件(拆分后)大小相似,大JSON字段、高压缩比导致oom

 2、shuffle阶段

  当sql出现出现distinct join、groupby等关键字时,spark需要进行shuffle操作。shuffle操作将前一个或多个阶段的分片数据拉取到同一个excutor进行聚合处理(即我们通常说的reduce),如shuffle partition或者说task数过小,oom是大概率时间。

  • shuffle阶段概要:多少个task执行读/写操作

  •  shuffle阶段详情:读/写是否均匀、是否倾斜

 3.1 参数调优

    目前executor默认配置的是12G+2G内存、4核CPU,最大可以有4个task同时并行(每颗CPU执行一个task),这4个task共享14G内存。日常调优可以如下:

  1、spark.executors.cores

    不论在哪个阶段发生oom,降低executor的负载总是可行的。可调整spark.executor.cores=2,即单个executor同时执行2个task,这样每个task大约可使用7G内存。

  2、spark.sql.adaptive.coalescePartitions.initialPartitionNum/spark.sql.shuffle.partitions

    shuffle阶段的oom,说明shuffle partitions数比较小,导致上阶段的“大规模数据”在本阶段“更聚集”。可调整该参数为更大的值,使大规模数据更分散。

  • spark1,没有spark.sql.adaptive.coalescePartitions.initialPartitionNum参数,固定partitions值为400,调大会造成碎片化
  • spark2,没有spark.sql.adaptive.coalescePartitions.initialPartitionNum参数,可调整spark.sql.shuffle.partitions(配合AQE效果不错)
  • spark3,优先调整spark.sql.adaptive.coalescePartitions.initialPartitionNumtions(是AQE的系列参数之一,默认none时向下取spark.sql.shuffle.partitions)

    *注意:更大的shuffle partitions数固然能减轻shuffle阶段的oom概率,但需要向集群申请更多的服务器资源,不可无限调大(建议控制在2000以内)

方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个 key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。

3、spark.sql.files.maxPartitionBytes

  对于input阶段出现的oom,通常原因是输入数据的超高压缩比,如:json、xml大字段、null值占比高、枚举字段多等。

  spark.executor.cores值仍不起作用时,可逐步调小spark.sql.files.maxPartitionBytes=134217728(128m)/67108864(64m)

  调小spark.executor.cores、spark.sql.files.maxPartitionBytes值的本质都是减小executor的负载,spark.executor.cores减少了task的并行度,而spark.sql.files.maxPartitionBytes减小了单个task处理的数据规模。所以:

  •   优先调整spark.executor.cores,验证是否扔有oom
  •   确认不起作用后,再调整spark.sql.files.maxPartitionBytes,此时建议恢复spark.executor.cores的值(如不恢复,相当于2*2=4倍的负载降低、对于单executor14g的内存资源是巨大浪费)

  *注意:spark.sql.files.maxPartitionBytes不建议调得过小(比如32m、16m),过小需要向集群申请更多executor及计算资源、集群负载增大,也可能导致数据文件碎片化

4、数据倾斜解决方案

https://download.csdn.net/blog/column/8735546/111667492

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1946350.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第124天:内网安全-代理 Sockets协议路由不出网后渗透通讯CS-MSF 控制上线

目录 思维导图 环境配置 案例一:网络通讯&控制上线--CS-路由添加&节点建立&协议生成&正反连接 案例二:网络通讯&控制上线--MSF-路由添加&节点建立&协议生成&正反连接 思维导图 环境配置 这里由于系统内存问题我只设…

79页PDF免费下载 | 全域数字化转型评估模型研究报告

一、前言: 随着数字技术的飞速发展,零售行业正站在转型的十字路口。如何在变革中找到方向,如何通过数字化转型提升企业竞争力,已成为每个零售企业必须面对的课题。腾讯智慧零售与伏羲智库深度合作,推出《2024年全域数…

【Python selenium过极验滑块】用自动化selenium 操作GEETEST极验滑块,简单粗暴

文章日期:2024.07.24 使用工具:Python 文章类型:自动化过极验滑块 文章全程已做去敏处理!!! 【需要做的可联系我】 AES解密处理(直接解密即可)(crypto-js.js 标准算法&…

使用FileZilla Cilent快速让手机与电脑进行文件互传(无需生态)

目录 前言使用 FileZilla笔者的话 前言 当设备多的时候文件的传输就成了一种问题。 就比如说我想将手机上的文件传到电脑里面,因为我使用的电脑跟我的手机不是一个生态的,它们唯一的联系或许就是连接到了统一 WIFI 下,也就是说它们在同一个…

Redis与MySQL的数据一致性问题

目录 一、策略模式 1、旁路缓存模式(Cache Aside Pattern) 2、读写穿透(Read-Through/Write-Through) 3、异步缓存写入(Write Behind) 二、一致性解决方案 1、缓存延迟双删 2、删除重试机制 3、读取…

【概率论】-2-概率论公理(Axioms of Probability)

上一篇文章我们学习了基本的概率论内容-排列组合,本次我们学习概率论公理的内容,正式开始计算概率,在开始前我们需要学习一些基本概念。 目录 一.样本空间和事件 1.样本空间 2.事件 3.交并补 二、概率公理 1.基本公理 2.对称差 2.布尔…

Mysql中(基于GTID方式)实现主从复制,单主复制详细教程

🏡作者主页:点击! 🐧Linux基础知识(初学):点击! 🐧Linux高级管理防护和群集专栏:点击! 🔐Linux中firewalld防火墙:点击! ⏰️创作…

Android Studio 一键删除 Recent Projects信息的方法

Android Studio打开项目多了就一堆最近项目的记录,在IDE里面只能一个个手动删除。 File - Recent Projects 解决方案:修改配置文件 Note:方法不唯一。 Android Studio 存储了一个包含最近打开项目信息的配置文件。通过手动编辑或删除recentP…

代码随想录算法训练营day7 | 454.四数相加II、383.赎金信、15.三数之和、18.四数之和

文章目录 454.四数相加II思路 383.赎金信思路 15.三数之和思路剪枝去重 18.四数之和思路剪枝去重复习:C中的类型转换方法 总结 今天是哈希表专题的第二天 废话不多说,直接上题目 454.四数相加II 建议:本题是 使用map 巧妙解决的问题&#x…

Pytorch使用教学1-Tensor的创建

0 导读 在我们不知道什么是深度学习计算框架时,我们可以把PyTorch看做是Python的第三方库,在PyTorch中定义了适用于深度学习的张量Tensor,以及张量的各类计算。就相当于NumPy中定义的Array和对应的科学计算方法,正是这些基本数据…

JVM系列(三) -类加载器及双亲委派模型介绍

在之前的文章中,介绍了类的加载过程中,我们有提到在加载阶段,通过一个类的全限定名来获取此类的二进制字节流操作,其实类加载器就是用来实现这个操作的。 在虚拟机中,任何一个类,都需要由加载它的类加载器…

【ffmpeg命令入门】添加水印

文章目录 前言什么是水印?为什么要添加水印?ffmpeg添加水印添加图片水印添加文字水印基本使用方法drawtext的参数 总结 前言 在视频制作和编辑的过程中,添加水印是一个常见且重要的步骤。水印不仅可以保护版权,还能用于品牌宣传和…

netty入门-4 Channel与ChannelFuture

Channel 基本类似于NIO中的Channel概念。作为读写数据的通道。 常见方法 close() 可以用来关闭 channelcloseFuture() 用来处理 channel 的关闭 sync 方法作用是同步等待 channel 关闭而 addListener 方法是异步等待 channel 关闭 pipeline() 方法添加处理器write() 方法将数…

Stable Diffusion基本原理通俗讲解

Stable Diffusion是一种基于深度学习的图像生成技术,它属于生成对抗网络(GANs)的一种。简单来说,Stable Diffusion通过训练一个生成器(Generator)和一个判别器(Discriminator)&#…

算法力扣刷题记录 五十八【701.二叉搜索树中的插入操作】

前言 本文是二叉搜索树操作。 二叉树篇继续。 一、题目阅读 给定二叉搜索树(BST)的根节点 root 和要插入树中的值 value ,将值插入二叉搜索树。 返回插入后二叉搜索树的根节点。 输入数据 保证 ,新值和原始二叉搜索树中的任意节…

【常见开源库的二次开发】基于openssl的加密与解密——SHA算法源码解析(六)

目录 一、SHA-1算法分析: 1.1 Merkle Tree可信树 1.2 源码实现: 1.3 哈希计算功能 1.4 两种算法的区别: 1.4.1 目的 1.4.2 实现机制 1.4.3 输出 1.4.4 应用场景: 1.4 运行演示: 二、SHA-2算法分析: 2.1哈…

【ESP32S3学习笔记】与有人AP520X路由器连接失败的问题

项目场景: 提示:这里简述项目相关背景: 项目上新换了个路由器,结果发现ESP32模组连接不上,其他的路由器都正常。 问题描述 提示:这里描述项目中遇到的问题: 对比log发现有问题的时候&#x…

智慧大棚数据库版

创建一个SMartBigHouse数据库 在数据库创建一个表用来存储数据 这边将id设为主键并将标识增量设为1 搭建Winfrom 搭建历史查询界面 串口数据,(这边是用的一个虚拟的串口工具,需要的话私) ModbusSerialMaster master;DataPointCollection wenduValues; //…

Win10使用VS Code远程连接Ubuntu服务器时遇到SSH公钥错误的解决方案

在使用Windows 10上的Visual Studio Code(VS Code)远程连接Ubuntu 20.04服务器时,遇到了以下错误: 错误的原因 这个错误消息表明,SSH 客户端检测到远程主机的 ECDSA 公钥已更改。可能是由于以下原因之一&#xff1a…

python—NumPy的基础(2)

文章目录 一维数组索引和切片一维数组索引和切片的使用一维数组负索引和切片的使用 二维数组的索引和切片索引直接获取使用坐标获取数组[x,y]二维数组负索引的使用切片数组的复制 改变数组的维度改变数组的维度 数组的拼接列表的拼接一维数组的拼接二维数组的拼接vstack 与hsta…