Spark SQL生产优化经验--任务参数配置模版

news2025/1/11 14:14:59

大表扫描

特殊case说明:当任务存在扫event_log表时需注意,若对event_log表进行了过滤,且过滤比很高,如下图的case,input为74T,但shuffle write仅为3.5G,那么建议提高单partition的读取数据量,将参数set spark.sql.files.maxPartitionBytes=536870912提高10倍至5368709120;
在这里插入图片描述

小型任务

目前测试:在不手动添加任何参数、平均时长在30min以内、单个shuffle 量在500G以下的任务可以使用该模版,但实际任务情况还需跟踪观察
开启AQE后,spark.sql.shuffle.partitio参数失效(spark默认200,任务从头到尾的所有shuffle都是这个并行度,无法自由操控)
AQE:实现原理是在每个stage完成后再启动一个子Job来计算shuffle中间结果量,依此来进行调节task个数,让作业根据每次shuffle的数据量自行调节并行度。
spark.sql.adaptive.minNumPostShufflePartitions 控制所有阶段的reduce个数,reduce个数区间最小值,为了防止分区数过小
spark.sql.adaptive.maxNumPostShufflePartitions reduce个数区间最大值,同时也是shuffle分区数的初始值
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 动态调整 reduce 个数的 partition 大小依据。如设置 64MB,则 reduce 阶段每个 task 最少处理 64MB 的数据。默认值为 64MB。

--基础资源
set spark.driver.memory=15g;
set spark.driver.cores=3;
set spark.driver.memoryOverhead=4096;
set spark.executor.memory=5G;
set spark.executor.memoryOverhead=1024;
set spark.executor.cores=2;
set spark.vcore.boost.ratio=2;
--动态executor申请
set spark.dynamicAllocation.minExecutors=10;
set spark.dynamicAllocation.maxExecutors=300;
--ae,shuffle partition并行度
set spark.sql.adaptive.minNumPostShufflePartitions=10;
set spark.sql.adaptive.maxNumPostShufflePartitions=1000;
--268435456;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=536870912;
--开启parquet切分
set spark.sql.parquet.adaptiveFileSplit=true;
--初始task调节,合并小文件
set spark.sql.files.maxPartitionBytes=536870912;

中型任务

目前测试:在不手动添加任何参数、平均时长在90min以内、单个shuffle 量在2T以下的任务可以使用该模版,但实际任务情况还需跟踪观察。
spark.executor.memoryOverhead 每个executor的堆外内存大小,堆外内存主要用于数据IO,对于报堆外OOM的任务要适当调大,单位Mb,与之配合要调大executor JVM参数,例如:set spark.executor.memoryOverhead=3072
set spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2560m

--基础资源
set spark.driver.memory=25g;
set spark.driver.cores=4;
set spark.driver.memoryOverhead=5120;
set spark.executor.memory=10G;
set spark.executor.memoryOverhead=4096;
set spark.executor.cores=3;
set spark.vcore.boost.ratio=1;
--动态executor申请
set spark.dynamicAllocation.minExecutors=10;
set spark.dynamicAllocation.maxExecutors=600;
--AQE
set spark.sql.adaptive.minNumPostShufflePartitions=10;
set spark.sql.adaptive.maxNumPostShufflePartitions=1000;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize= 536870912;
--开启parquet切分,初始task调节,合并小文件
set spark.sql.parquet.adaptiveFileSplit=true;
set spark.sql.files.maxPartitionBytes=536870912;
--推测
set spark.speculation.multiplier=2.5;
set spark.speculation.quantile=0.8;
--shuffle 落地hdfs
set spark.shuffle.hdfs.enabled=true;
set spark.shuffle.io.maxRetries=1;
set spark.shuffle.io.retryWait=0s;

大型任务

目前测试:在不手动添加任何参数、平均时长在120min以内、单个shuffle 量在10T以下的任务可以使用该模版,但实际任务情况还需跟踪观察。

--基础资源
set spark.driver.memory=25g;
set spark.driver.cores=4;
set spark.driver.memoryOverhead=5120;
set spark.executor.memory=15G;
set spark.executor.memoryOverhead=3072;
set spark.executor.cores=3;
set spark.vcore.boost.ratio=1;
--动态executor申请
set spark.dynamicAllocation.minExecutors=10;
set spark.dynamicAllocation.maxExecutors=900;
--ae
set spark.sql.adaptive.minNumPostShufflePartitions=10;
set spark.sql.adaptive.maxNumPostShufflePartitions=3000;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize= 536870912;
--shuffle 落地hdfs
set spark.shuffle.hdfs.enabled=true;
set spark.shuffle.io.maxRetries=1;
set spark.shuffle.io.retryWait=0s;
--开启parquet切分,合并小文件
set spark.sql.parquet.adaptiveFileSplit=true;
set spark.sql.files.maxPartitionBytes=536870912;
--推测
set spark.speculation.multiplier=2.5;
set spark.speculation.quantile=0.9;

超大型任务

目前测试:在不手动添加任何参数、平均时长大于120min、单个shuffle 量在10T以上的任务可以使用该模版,但实际任务情况还需跟踪观察。

--基础资源
set spark.driver.memory=30g;
set spark.driver.cores=4;
set spark.driver.memoryOverhead=5120;
set spark.executor.memory=20G;
set spark.executor.memoryOverhead= 5120;
set spark.executor.cores=5;
set spark.vcore.boost.ratio=1;
--动态executor申请
set spark.dynamicAllocation.minExecutors=10;
set spark.dynamicAllocation.maxExecutors=1500;
--ae
set spark.sql.adaptive.minNumPostShufflePartitions=10;
set spark.sql.adaptive.maxNumPostShufflePartitions=7000;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize= 536870912;
--开启parquet切分,合并小文件
set spark.sql.parquet.adaptiveFileSplit=true;
set spark.sql.files.maxPartitionBytes=536870912;
-- shuffle 落地 hdfs,shuffle文件上传hdfs
set spark.shuffle.hdfs.enabled=true;
set spark.shuffle.io.maxRetries=1;
set spark.shuffle.io.retryWait=0s;
--推测
set spark.speculation.multiplier=2.5;
set spark.speculation.quantile=0.9;

其他常用参数

--ae hash join
set spark.sql.adaptive.hashJoin.enabled=true;
set spark.sql.adaptiveHashJoinThreshold=52428800;
--输出文件合并 byBytes,该功能会生成两个stage,
--第一个stage shuffle的数据量来预估最后生成到hdfs上的文件数据量大小,
--并通过预估的文件数据量大小计算第二个stage的并行度,即最后生成的文件个数。
--该功能只能控制生成的文件大小尽量接近spark.merge.files.byBytes.fileBytes,且有一定的性能损耗,需根据实测情况选择使用。    
-- 最终文件数量:(totalBytes / fileBytes / compressionRatio).toInt + 1                       
set spark.merge.files.byBytes.enabled=true;
set spark.merge.files.byBytes.repartitionNumber=100;  --第一个stage的并行读
set spark.merge.files.byBytes.fileBytes=134217728;		-- 预期的文件大小
set spark.merge.files.byBytes.compressionRatio=3;		-- 压缩比,shuffle文件和最后生成的文件格式和压缩格式都不相同,因此通过该参数调节
--输出文件合并  该功能会在原来job的最后一个stage后面增加1个stage来控制最后生成的文件数量,
--对于动态分区,每个分区生成spark.merge.files.number个文件。
spark.merge.files.enabled=true            
spark.merge.files.number=512
--skew_join 解析绕过tqs
set tqs.analysis.skip.hint=true;
--初始task上限
set spark.sql.files.openCostInBytes=4194304;
set spark.datasource.splits.max=20000;
--broadcast时间
set spark.sql.broadcastTimeout = 3600;
--(防止get json报错)
set spark.sql.mergeGetMapValue.enabled=true;

--ae 倾斜处理 HandlingSkewedJoin  OptimizeSkewedJoin
set spark.sql.adaptive.allowBroadcastExchange.enabled=true;
set spark.sql.adaptive.hashJoin.enabled=false;
set spark.sql.adaptive.skewedPartitionFactor=3;
set spark.sql.adaptive.skewedPartitionMaxSplits=20;
set spark.sql.adaptive.skewedJoin.enabled=true;
set spark.sql.adaptive.skewedJoinWithAgg.enabled=true;
set spark.sql.adaptive.multipleSkewedJoin.enabled=true;
set spark.shuffle.highlyCompressedMapStatusThreshold=20000;

--并发读文件
set spark.sql.concurrentFileScan.enabled=true;
--filter按比例读取文件
set spark.sql.files.tableSizeFactor={table_name}:{filter 比例};
set spark.sql.files.tableSizeFactor=dm_content.tcs_task_dict:10;
--AM failed 时长
set spark.yarn.am.waitTime=200s;
--shuffle service 超时设置
set spark.shuffle.registration.timeout=12000;
set spark.shuffle.registration.maxAttempts=5;
--parquet index 特性生效,in 条件的个数
set spark.sql.parquet.pushdown.inFilterThreshold=30; 

--设置engine
set tqs.query.engine.type=sparkcli;

--hive metastore 超时
spark.hadoop.hive.metastore.client.socket.timeout=600

--manta备用
spark.sql.adaptive.maxNumPostShufflePartitions 5000
spark.executor.memoryOverhead 8000
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 536870912

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/709583.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Redis数据库高可用之RDB和AOF持久化

Redis数据库高可用、RDB和AOF持久化、性能管理 一、Redis 高可用二、Redis 持久化Ⅰ、持久化的功能Ⅱ、两种持久化方式Ⅲ、RDB 和 AOF 的区别 三、RDB 持久化Ⅰ、触发条件Ⅱ、执行流程 四、AOF持久化Ⅰ、开启AOFⅡ、执行流程 五、RDB 和 AOF 的优缺点Ⅰ、RDB 持久化Ⅱ、AOF 持久…

金鹰优化算法,附MATLAB代码,直接复制

金鹰优化算法(golden eagle optimizer, GEO)是于2020年提出的新型智能优化算法,该算法建立于金鹰个体 捕食过程中的巡航和攻击行为之上,通过平衡两者关系,帮助函数寻找最优值,已经在许多方面得到了应用。 关…

Google 将为高端 Chromebook 推出独立品牌

说起 Chromebook,一般大家的第一印象就是价格便宜、配置不高、做工普通,所选的材料也都是以塑料为主,产品主打的市场也是学生和教育群体。在不少人看来,Chromebook 就是一个配备了功能齐全的浏览器,外加一定的文件管理…

GOLANG进阶:Viper,Mysql,Swagger

GOLANG从浅入深必须学习的一些工具包 1.Viper: Viper 是一个完整的 Go 应用程序配置解决方案,优势就在于开发项目中你不必去操心配置文件的格式而是让你腾出手来专注于项目的开发。其特性如下: 支持 JSON/TOML/YAML/HCL/envfile/Java proper…

数据结构--栈在函数递归中的调用

数据结构–栈在函数递归中的调用 void func2(int x) {int n, m;//... }void func1(int a, int b) {int x;//...func2(x);x 5201314;//... }int main() {int a, b, c;//...func1(a, b);//... }函数调用的特点:最后被调用的函数最先执行结束(LIFO) 函数调用时,需要用…

【人工智能与机器学习】基于卷积神经网络CNN的猫狗识别

文章目录 1 引言2 卷积神经网络概述2.1 卷积神经网络的背景介绍2.2 CNN的网络结构2.2.1 卷积层2.2.2 激活函数2.2.3 池化层2.2.4 全连接层 2.3 CNN的训练过程图解2.4 CNN的基本特征2.4.1 局部感知(Local Connectivity)2.4.2 参数共享(Parameter Sharing)…

顶点数据加入颜色数据

顶点着色器代码: #version 330 core layout(location 0) in vec3 aPos; layout(location 1) in vec3 aColor; out vec3 ourColor; void main(){gl_Position vec4(aPos.x, aPos.y, aPos.z, 1.0f);ourColoraColor; }片段着色器代码: #version 330 cor…

11-切片有什么用?【视频版】

目录 问题视频解答 问题 视频解答 点击观看: 11-切片有什么用?

学习 vue3版本

文章目录 创建各种函数setup注意点 ref函数总结 reactive总结 响应式vue2vue3总结 ref与reactive的比较计算属性监视watch的value的问题 watchEffect函数生命周期Hooks函数总结 toRef总结 其他CompositionApishallowReactive与shallowRefreadonly与shallowReadonlytoRaw与markR…

React Antd Form.List 组件嵌套多级动态增减表单 + 表单联动复制实现

Antd Form.List 组件嵌套多级动态增减表单 表单联动复制实现 一、业务需求 有一个页面的组件,其中一部分需要用到动态的增减 复制表单,然后就想起 了使用 Antd 的 Form.List 去完成这个功能。 这个功能的要求是: 首先是一个动态的表单&…

事后多重比较案例分析

一、案例介绍 由单因素方差分析案例中,为研究郁金对低张性缺氧小鼠存活时间的影响,将36只小鼠随机生成A、B以及 C 三组,每组12个,雌雄各半,分别以10g/kg、20g/kg、40g/kg三种不同剂量的郁金灌胃,各组小鼠均…

08-C++学习笔记-类与对象

🔟🔒 08-C学习笔记-类与对象 在本篇学习笔记中,我们将详细讲解C中的类与对象的概念和相关知识。类是C中一种重要的数据类型,它允许我们自定义数据结构和相应的操作。 📚 C类与对象详细讲解 ✨类的概念 类是一种用户…

黑客(网络安全)自学

前言: 1.这是一条坚持的道路,三分钟的热情可以放弃往下看了. 2.多练多想,不要离开了教程什么都不会了.最好看完教程自己独立完成技术方面的开发. 3.有时多 google,baidu,我们往往都遇不到好心的大神,谁会无聊天天给你做解答 .4.遇到实在搞不懂的,可以先放放,以后…

3dmax导出cad

3dmax2022 导出cad 导入arcmap 10.2 导出版本为AutoCAD 2007 DWG

玩转Matplotlib的10个高级技巧

Matplotlib是Python中流行的数据可视化库,仅使用简单的几行代码就可以生成图表。但是默认的方法是生成的图表很简单,如果想增强数据演示的影响和清晰度,可以试试本文总结的10个高级技巧,这些技巧可以将可视化提升到一个新的水平: …

Hyperledger Fabric网络快速启动

目录 1、网络服务配置 2、关联的docker-compose-base.yaml 各Peer节点容器设置如下信息。 3、被关联的Peer-base.yaml 4、启动网络 2、完成通道的创建 2.1将节点加入应用通道 更新锚节点 2.为什么要创建节点并将其加入应用通道中? 1、网络服务配置 由于要启动…

人工智能(pytorch)搭建模型16-基于LSTM+CNN模型的高血压预测的应用

大家好,我是微学AI,今天给大家介绍一下人工智能(pytorch)搭建模型16-基于LSTMCNN模型的高血压预测的应用,LSTMCNN模型搭建与训练,本项目将利用pytorch搭建LSTMCNN模型,涉及项目:高血压预测,高血…

鼠标点击切换图片(使用js中的src属性)

使用到的知识点&#xff1a; 模板字符串 js中的src属性 img.src ./images/${i}.jpg 效果展示&#xff1a; 具体代码实现&#xff1a; <body><div class"box" style"width:500px;height:300px;margin:100px auto;"><img src&quo…

Python中获取指定目录下所有文件名的方法

在《Python中文件名和路径的操作》中提到&#xff0c;os模块中的函数可以对文件进行操作。通过递归以及os模块中提供的函数&#xff0c;可以获取指定目录下所有的文件名。 1 基本流程 通过递归获取指定目录下所有文件名的基本流程&#xff0c;如图1所示。 图1 基本流程 2 函…

Web开播系统的技术演进

随着直播SaaS业务的深入发展&#xff0c;Web端开播的诉求变得越来越强烈&#xff0c;对比客户端开播工具如OBS&#xff0c;Web开播与SaaS平台亲和度高&#xff0c;可以让用户快速体验平台全流程&#xff0c;同时易于分享链接&#xff0c;快速连麦。因此&#xff0c;寻求更加稳定…