【pyspark速成专家】11_Spark性能调优方法2

news2025/1/11 20:48:48

目录

​编辑

二,Spark任务UI监控

三,Spark调优案例


二,Spark任务UI监控

Spark任务启动后,可以在浏览器中输入 http://localhost:4040/ 进入到spark web UI 监控界面。

该界面中可以从多个维度以直观的方式非常细粒度地查看Spark任务的执行情况,包括任务进度,耗时分析,存储分析,shuffle数据量大小等。

最常查看的页面是 Stages页面和Excutors页面。

Jobs: 每一个Action操作对应一个Job,以Job粒度显示Application进度。有时间轴Timeline。

Stages: Job在遇到shuffle切开Stage,显示每个Stage进度,以及shuffle数据量。可以点击某个Stage进入详情页,查看其下面每个Task的执行情况以及各个partition执行的费时统计。

Storage:

监控cache或者persist导致的数据存储大小。

Environment:
显示spark和scala版本,依赖的各种jar包及其版本。

Excutors : 监控各个Excutors的存储和shuffle情况。

SQL: 显示各种SQL命令在那些Jobs中被执行。

三,Spark调优案例

下面介绍几个调优的典型案例:

1,资源配置优化

2,利用缓存减少重复计算

3,数据倾斜调优

4,broadcast+map代替join

5,reduceByKey/aggregateByKey代替groupByKey

1,资源配置优化

下面是一个资源配置的例子:

优化前:

#提交python写的任务
spark-submit --master yarn \
--deploy-mode cluster \
--executor-memory 12G \
--driver-memory 12G \
--num-executors 100 \
--executor-cores 8 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python  #cluster模式时候设置
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
--files  data.csv,profile.txt
--py-files  pkg.py,tqdm.py
pyspark_demo.py 

优化后:这里主要减小了 executor-cores数量,一般设置为1~4,过大的数量可能会造成每个core计算和存储资源不足产生OOM,也会增加GC时间。 此外也将默认分区数调到了1600,并设置了2G的堆外内存。

#提交python写的任务
spark-submit --master yarn \
--deploy-mode cluster \
--executor-memory 12G \
--driver-memory 12G \
--num-executors 100 \
--executor-cores 2 \
--conf spark.yarn.maxAppAttempts=2 \
--conf spark.default.parallelism=1600 \
--conf spark.sql.shuffle.partitions=1600 \
--conf spark.memory.offHeap.enabled=true \
--conf spark.memory.offHeap.size=2g\
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON = ./anaconda3.zip/anaconda3/bin/python  #cluster模式时候设置
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
--files  data.csv,profile.txt
--py-files  pkg.py,tqdm.py
pyspark_demo.py 

2, 利用缓存减少重复计算

%%time
# 优化前:
import math 
rdd_x = sc.parallelize(range(0,2000000,3),3)
rdd_y = sc.parallelize(range(2000000,4000000,2),3)
rdd_z = sc.parallelize(range(4000000,6000000,2),3)
rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x))
s = rdd_data.reduce(lambda a,b:a+b+0.0)
n = rdd_data.count()
mean = s/n 
print(mean)

%%time 
# 优化后: 
import math 
from  pyspark.storagelevel import StorageLevel
rdd_x = sc.parallelize(range(0,2000000,3),3)
rdd_y = sc.parallelize(range(2000000,4000000,2),3)
rdd_z = sc.parallelize(range(4000000,6000000,2),3)
rdd_data = rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x)).persist(StorageLevel.MEMORY_AND_DISK)

s = rdd_data.reduce(lambda a,b:a+b+0.0)
n = rdd_data.count()
mean = s/n 
rdd_data.unpersist()
print(mean)

3, 数据倾斜调优

%%time 
# 优化前: 
rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
rdd_one = rdd_word.map(lambda x:(x,1))
rdd_count = rdd_one.reduceByKey(lambda a,b:a+b+0.0)
print(rdd_count.collect()) 

%%time 
# 优化后: 
import random 
rdd_data = sc.parallelize(["hello world"]*1000000+["good morning"]*10000+["I love spark"]*10000)
rdd_word = rdd_data.flatMap(lambda x:x.split(" "))
rdd_one = rdd_word.map(lambda x:(x,1))
rdd_mid_key = rdd_one.map(lambda x:(x[0]+"_"+str(random.randint(0,999)),x[1]))
rdd_mid_count = rdd_mid_key.reduceByKey(lambda a,b:a+b+0.0)
rdd_count = rdd_mid_count.map(lambda x:(x[0].split("_")[0],x[1])).reduceByKey(lambda a,b:a+b+0.0)
print(rdd_count.collect())  

#作者按:此处仅示范原理,单机上该优化方案难以获得性能优势

4, broadcast+map代替join

该优化策略一般限于有一个参与join的rdd的数据量不大的情况。

%%time 
# 优化前:

rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")])
rdd_students = rdd_age.join(rdd_gender).map(lambda x:(x[0],x[1][0],x[1][1]))

print(rdd_students.collect())

%%time 

# 优化后:
rdd_age = sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
rdd_gender = sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")],2)
ages = rdd_age.collect()
broads = sc.broadcast(ages)

def get_age(it):
    result = []
    ages = dict(broads.value)
    for x in it:
        name = x[0]
        age = ages.get(name,0)
        result.append((x[0],age,x[1]))
    return iter(result)

rdd_students = rdd_gender.mapPartitions(get_age)

print(rdd_students.collect())

5,reduceByKey/aggregateByKey代替groupByKey

groupByKey算子是一个低效的算子,其会产生大量的shuffle。其功能可以用reduceByKey和aggreagateByKey代替,通过在每个partition内部先做一次数据的合并操作,大大减少了shuffle的数据量。

%%time 
# 优化前:
rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),
                               ("class1","Ann"),("class1","Jim"),("class2","Lily")])
rdd_names = rdd_students.groupByKey().map(lambda t:(t[0],list(t[1])))
names = rdd_names.collect()
print(names)

%%time 
# 优化后:
rdd_students = sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),
                               ("class1","Ann"),("class1","Jim"),("class2","Lily")])
rdd_names = rdd_students.aggregateByKey([],lambda arr,name:arr+[name],lambda arr1,arr2:arr1+arr2)

names = rdd_names.collect()
print(names)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1708915.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大数据技术Hbase列数据库——topic1

目录 搭建单机版Hbase验证方法一验证方法二 搭建单机版Hbase 验证方法一 使用 jps 命令查看 HMaster 进程是否启动 首先使用xftp 7上传hbase-2.1.0安装压缩包到虚拟机进行解压缩到某一地址,这里解压缩到了上传的路径即/root/software/ tar -zxvf hbase-2.1.0-bi…

OrangePi AIpro (8T)使用体验,性能测试报告

前言 这段时间收到了CSDN和香橙派的邀请,对OrangePi AIpro进行体验测评,在此感谢CSDN对我的信任,也感谢香橙派能做出如此优秀的开发板。 可喜可贺,周三晚上我收到了官方寄出的OrangePi AIpro。出于对国产芯片的好奇&#xff0c…

苹果与OpenAI合作在即:iOS 18中的ChatGPT引发期待与担忧

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…

idea的project structure下project [lauguage ]()level 没有java的sdk17选项如何导入

idea的project structure下project lauguage level 没有java的sdk17选项如何导入 别导入了,需要升级idea版本。idea中没有project language level没有17如何添加 - CSDN文库 别听这文章瞎扯淡 2021版本就是没有,直接卸载升级到最新版本就可以了。没办法…

马斯克的 xAI 帝国!60亿融资背后的超级布局?

在全球科技竞技场,每个重大融资事件都是对行业格局的一次重塑。近日,埃隆马斯克的人工智能初创企业 xAI 成功完成了一轮规模空前的融资——60亿美元,此举无疑在业界投下了一枚震撼弹,标志着 AI 领域内一场新的竞赛拉开了序幕。 …

SwiftUI中AppStorage的介绍使用

在Swift中,AppStorage是SwiftUI中引入的一个属性包装器,在这之前我们要存储一些轻量级的数据采用UserDefaults进行存取。而AppStorage用于从UserDefaults中读取值,当值改变时,它会自动重新调用视图的body属性。也就是说&#xff0…

CC1310 Debug interface is locked

XDS110连接CC1310板子,打开Smart RF 弹出窗口如下: 解决办法: 1 打开SmartRF Flash Programmer 2 选择连接的设备CC1310, 弹出如下窗口,点击OK。 3 点击Tools图标,选择CC26XX/CC13XX Forced Mass Erase。 4 在弹出的…

骆驼大赛

目录 一,主版图 二,骰子 三,初始设置 四,核心规则 五,结算 这是适合5-8人玩的一个概率推理类的回合制桌游。 一,主版图 赛道由16个格子组成,编号为1-16。 一共7个骆驼,其中正…

就说说Java初学者求职准备项目的正确方式

当下不少Java初学者也知道求职时项目的重要程度,但在简历上写项目和准备面试项目时,真有可能走弯路,这样的话,加重学习负担还是小事,还真有可能导致无法入职。 1 对于在校生和应届生来说,你去跑通个学习项…

四川汇聚荣聚荣科技有限公司好不好?

在当今科技飞速发展的时代,企业要想在激烈的市场竞争中脱颖而出,必须具备强大的技术实力和良好的市场口碑。那么,作为一家专注于科技创新的公司,四川汇聚荣聚荣科技有限公司究竟如何呢?接下来,我们将从四个方面进行详…

探索Python列表的奥秘:从零开始学习

新书上架~👇全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我👆,收藏下次不迷路┗|`O′|┛ 嗷~~ 目录 一、列表简介与用途 二、列表的创建与访问 三、列表的增删改查 四、列表生成式与高级操作…

信创加持 YashanDB通过商用密码产品认证

近期,深算院自主研发的“YashanDB V23”成功获得由国家密码管理局商用密码检测中心颁发的《商用密码产品认证证书》,通过GM/T 0028《密码模块安全技术要求》安全等级第二级认证。这标志着YashanDB密码技术和密码管理领域达到了国家标准,能够全…

股民用脚投票 退退退!

倒计时2天,看来今年首只非ST类要退市的股票诞生了。 继上周五封S跌停后,今天正源(股份)再度被股民用脚投票一字跌停, 这已经连续第18个交易日股价低于1块钱了。 按照退市新规,连续20个交易日股价低于1元是…

专业渗透测试 Phpsploit-Framework(PSF)框架软件小白入门教程(九)

本系列课程,将重点讲解Phpsploit-Framework框架软件的基础使用! 本文章仅提供学习,切勿将其用于不法手段! 继续接上一篇文章内容,讲述如何进行Phpsploit-Framework软件的基础使用和二次开发。 现在,我们…

信息标记形式 (XML, JSON, YAML)

文章目录 🖥️介绍🖥️三种形式🏷️XML (Extensible Markup Language)🔖规范🔖注释🔖举例🔖其他 🏷️JSON (JavaScript Object Notation)🔖规范🔖注释&#x…

机器之心 | 清华接手,YOLOv10问世:性能大幅提升,登上GitHub热榜

本文来源公众号“机器之心”,仅用于学术分享,侵权删,干货满满。 原文链接:清华接手,YOLOv10问世:性能大幅提升,登上GitHub热榜 相同性能情况下,延迟减少 46%,参数减少 2…

知识产权与标准化

知识产权与标准化 导航 文章目录 知识产权与标准化导航一、知识产权概述二、保护范围与对象三、保护期限四、知识产权归属五、侵权判定六、标准的分类 一、知识产权概述 知识产权:知识产权是指人们就其智力劳动成果所依法享有的专有权利,通常是国家赋予创造者对其…

SNP数据转型解析:云服务在现代企业数字化转型的必要性

为什么当今的企业想为数字化工作环境做好准备并保持竞争力,很难避免使用云服务呢? 要理解为什么企业没有云的替代选择,我们需要了解云服务的含义 - 它不仅仅指存储数据的另一个位置。各种云模型提供了极大的灵活性,可以根据需要操…

149.二叉树:二叉树的前序遍历(力扣)

代码解决 这段代码实现了二叉树的前序遍历,前序遍历的顺序是:访问根节点 -> 递归遍历左子树 -> 递归遍历右子树。以下是详细解释,包括各个部分的注释: // 二叉树节点的定义 struct TreeNode {int val; // 节…

栈的特性及代码实现(C语言)

目录 栈的定义 栈的结构选取 链式储存结构和顺序栈储存结构的差异 栈的代码实现 "stack.h" "stack.c" 总结 栈的定义 栈:栈是限定仅在表尾进行插入和删除操作的线性表。 我们把运行插入的和删除的一段叫做栈顶(TOP&#xff…