执行计划
查看执行计划
explain()
:只展示物理执行计划。(使用较多)explain(mode="simple")
:只展示物理执行计划。- `explain(mode=“extended”):展示物理执行计划和逻辑执行计划。
explain(mode="codegen")
:展示要 Codegen 生成的可执行 Java 代码。(使用较多)explain(mode="cost")
:展示优化后的逻辑执行计划以及相关的统计。explain(mode="formatted")
:以分隔的方式输出,它会输出更易读的物理执行计划,并展示每个节点的详细信息。
from pyspark.sql import SparkSession
# 初始化SparkSession
spark = SparkSession.Builder()\
.appName('test')\
.enableHiveSupport()\
.getOrCreate()
sql = """
select level
,count(distinct from_table) as cnt
from dw_pub_safe.dw_pub_table_rely_pfd
where level > 3+7
group by level
"""
spark.sql(sql).explain()
执行计划如下:我们可以看到分成了三个stage,对常量进行了优化,预先计划好,中间有两次shuffle;
该段sql,如果是hive执行计划,则是一个mr过程
参数优化
yarn单节点参数:内存yarn.nodemanager.resource.memory-mb
;cpu核数:yarn.nodemanager.resource.cpu-vcores
;可以结合服务器台数及集群资源使用情况,合理分配spark计算资源;
一个executor由一个yarn的Container执行,一个executor设置的内存和cpu核心数要小于yarn对Container资源配置
单个Container
最大cpu数:yarn.scheduler.maximum-allocation-vcores
最大内存:yarn.scheduler.maximum-allocation-mb
更多属性可以看yarn的配置文件或者后台Configuration
spark_df与pandas_df之间的转化
提速,可以添加参数:spark.conf.set("spark.sql.execution.arrow.enabled",True)
在spark_df.toPandas()
过程,会将数据回收到driver内存中,如果内存溢出需调整回收driver最大内存参数spark.driver.maxResultSize
,样例如下:设置最大32g
spark = spark = SparkSession.Builder()\
.config("spark.driver.maxResultSize","32g")\
.enableHiveSupport()\
.getOrCreate()
广播join
类似与hive中的mapjoin
参数:spark.sql.autoBroadcastJoinThreshold
为-1
时,表示禁用广播join,默认是小表10mb,开启小表广播join;
语句优化
数据倾斜
- 过滤倾斜的key,比如业务中爬虫异常key,null
可以使用df.sample(n)
抽样找出倾斜异常key,再df.filter(func)
过滤掉倾斜key如果key中的null倾斜需要保留,我们可以将null数据单独拿出来进行处理,比如:
-- 假设a.mobile存在大量的null倾斜
select a.id
,a.mobile
,b.fields
from table_a
left join table_b
on nvl(a.mobile,rand()) = b.mobile
-- 或者使用union all改写,单独摘出来再拼接上去
select a.id
,a.mobile
,b.fields_name
from table_a a
left join table_b b
on a.mobile=b.mobile
where a.mobile is not null
union all
select a.id,a.mobile,null as fields_name
from table_a a
where a.mobile is null
-- 过滤异常key
select groupby_field
,count(1) as cnt
from table_a
where groupby_field <> '异常key'
group by groupby_field
-
增加shuffle并行度,如果各个key的数据量整体差异不大,task < executor_num(executor个数) * executor_cores(每个executor的核心数),我们可以考虑增加task数量,来充分利用计算资源;
spark.sql.shuffle.partitions
参数可以设置并行度(默认是200),一般设置每核跑1-3个task,磁盘io时可以充分利用计算资源。
spark中有很多算子有指定并行度参数,比如:
textFile(xx,minnumpartition)
sc.parallelize(xx,num)
sc.makeRDDD(xx,num)
sc.parallelizePairs(List[Tuple2],num)
redduceByKey(xx,num)
,groupByKey(xx,num)
,join
,distinct
repartition
,coalesce
spark.default.parallelism
在sparksql中并行度由spark.sql.shuffle.partitions
决定 -
双重聚合,类似于hive中的groupby倾斜参数
set hive.groupby.skewindata=true
,用两个mr计算作业,第一个mr中的key随机分发聚合,第二个mr做全局聚合;比如:
select groupby_field
,sum(cnt) as cnt -- 全局聚合
from
( -- key打散聚合
select ceiling(rand() * 10) as rnd -- 添加随机数打散
,groupby_field -- 分组字段
,count(1) as cnt
from table_name
group by ceiling(rand() * 10),groupby_field
) t
group by groupby_field
- reduce的joion改写成mapjoin,如果存在小表情况下,可以使用mapjoin,将小表回收到driver端,再广播到各个执行的executor上应用map关联;此场景使用于大表join小表的情况;
这里需要注意,在外连接时,比如left join
或者right join
,小表是主表,mapjoin不生效
select /*+mapjoin(b)*/ a.id
,b.fields_name
from table_a a
join table_b b -- b小表
on a.id=b.id
- join中倾斜key单独摘出来操作。在hive中会有join倾斜参数,
hive.optimize.skewjoin=true;
它会将join作业拆分成两个MR,对于倾斜健部分单独摘出来使用mapjoin处理,非倾斜键走正常reduce的join。在spark中,如果倾斜键数据符合大表+小表原则,也可以使用该策略。如果倾斜健两个表的数据都比较大,大表倾斜部分同一个key添加n种前缀,小表膨胀倾斜健部分膨胀n倍,倾斜部分join,再union 非倾斜部分join
select a.id,a.field_a,b.field_b
from
( -- 加入随机数
select id,field_a,ceiling(rand()*10) as rnd_name
from table_a
where a.id in ('倾斜健')
) a
join
( -- 数据膨胀
select id,subview.rnd_name,field_b
from table_b b
lateral view explode(array(1,2,3,4,5,6,7,8,9,10)) subview as rnd_name
where b.id in ('倾斜健')
) b
on a.id=b.id and a.rnd_name=b.rnd_name
union all -- 拼接非倾斜部分的join
select a.id,a.field_a,b.field_b
from table_a a
join table_b b
on a.id=b.id
where a.id not in ('倾斜健') and b.id not in ('倾斜健')
对于rdd计算优化,在代码层面,如果rdd多次使用使用cache()
,persist()
持久化
尽量避免shuffle类算子,尽量使用有map端聚合算子,比如reduceByKey,aggregateByKey(可以自定义map端聚合函数,自定义初始记录),combineByKey(类同aggregateByKey,初始记录为rdd数据行):可以减少shuffle write数据量,shuffle读数据量,redduce端聚合次数;
尽量使用高性能算子,比如用reduceByKey取代groupByKey;使用mapPartitions取代map;数据filter过滤后使用coalse减少分区
使用广播变量,比如mapjoin
待更新。。。。。。