13.108.Spark 优化、Spark优化与hive的区别、SparkSQL启动参数调优、四川任务优化实践:执行效率提升50%以上

news2024/12/28 3:21:09

13.108.Spark 优化
1.1.25.Spark优化与hive的区别
1.1.26.SparkSQL启动参数调优
1.1.27.四川任务优化实践:执行效率提升50%以上

13.108.Spark 优化:

1.1.25.Spark优化与hive的区别

先理解spark与mapreduce的本质区别,算子之间(map和reduce之间多了依赖关系判断,即宽依赖和窄依赖。)
优化的思路和hive基本一致,比较大的区别就是mapreduce算子之间都需要落磁盘,而spark只有宽依赖才需要落磁盘,窄依赖不落磁盘。
在这里插入图片描述
在这里插入图片描述

1.1.26.SparkSQL启动参数调优

在这里插入图片描述

1)先对比结果:executors优化
Hive执行了30分钟(1800秒)的sql,没有优化过的SparkSQL执行需要,
最少化的Executor执行需要640秒(提高了Executor的并行度,牺牲了HDFS的吞吐量:5个core最合适),
最大化的Executor 281.634秒(最大限度的利用HDFS的吞吐量,牺牲Executor的并行度),
优化取中间值,253.189秒。

方案1:最少化 Fat executors

---------------------------------	Fat executors	--------------------------------------------------------------------------------
./spark-sql --master yarn \	# Fat executors (每个节点一个Executor)【优势:最佳吞吐量】
--num-executors 3 \			# 集群中的节点的数目 = 3
--executor-memory 30G \	# 每个节点的内存/每个节点的executor数目 = 30GB/1 = 30GB
--executor-cores 16 \		# 每个executor独占节点中所有的cores = 节点中的core的数目 = 16
--driver-memory 1G			# AM大约需要1024MB的内存和一个Executor
耗时:Time taken: 640 seconds

方案2:最大化Tiny executors

---------------------------------	Tiny executors	--------------------------------------------------------------------------------
./spark-sql --master yarn \	# Tiny executors [每个Executor一个Core]【优势:并行性】
--num-executors 48 \		# 集群中的core的总数 = 每个节点的core数目 * 集群中的节点数 = 16*3
--executor-memory 1.6G \	# 每个节点的内存/每个节点的executor数目 = 30GB/16 = 1.875GB
--executor-cores 1 \			# 每个executor一个core
--driver-memory 1G			# AM大约需要1024MB的内存和一个Executor
耗时:Time taken: 281.634 seconds
executor并发度只有45,task的并发度,1个executor 50左右,总数 18382

方案3:折中方案

---------------------------------	Balance between Fat (vs) Tiny	--------------------------------------------------------------------------------
./spark-sql --master yarn \	# Balance between Fat (vs) Tiny
--num-executors 8 \			# (16-1)*3/5 = 9 留一个executor给ApplicationManager => --num-executors = 9-1 = 8
						# 每个节点的executor的数目 = 9 / 3 = 3
--executor-memory 10G \	# 每个executor的内存 = 30GB / 3 = 10GB【默认分配的是8G,需要修改配置文件支持到10G。】
						# 计算堆开销 = 7% * 10GB = 0.7GB。因此,实际的 --executor-memory = 10 - 0.7 = 9.3GB
--executor-cores 5 \			# 给每个executor分配5个core,保证良好的HDFS吞吐。
						# 每个节点留一个core给Hadoop/Yarn守护进程 => 每个节点可用的core的数目= 16 - 1
--driver-memory 1G			
耗时:Time taken: 253 seconds

Task并行度优化
1.调整 Executors 下 每个stage的默认task数量,即设置Task 的并发度:

【当集群数量比较大时】
很多人常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,
!【默认是一个HDFS block对应一个task(如果不设置那么可以通过第三种方案来优化!)】。
通常来说,Spark默认设置的数量是偏少的(比如就几十个task),
如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。
试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,
那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!
因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,
比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

30 G 16 core

/home/admin/bigdata/spark-2.2.0-bin-hadoop2.6/bin/spark-sql \
--master yarn \
--num-executors 16 \
--executor-memory 1G \
--executor-cores 10 \
--driver-memory 1G \
--conf spark.default.parallelism=450 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3	

1.1.27.四川任务优化实践:执行效率提升50%以上

一、四川的信息
账号:xxxxxx 密码: xxxxxxxx

一、事实表优化
1、**优化结果: 20 分钟左右,优化完成后 5 分钟左右。**数据量:5.8亿

2、原SQL:(spark不一定快)

drop table if exists dc_f_organization;
create table if not exists dc_f_organization (
	orgid int,
	orgcode string,
	yearmonth string ,
	zzdate string,
	orgname string,
	orglevel int,
	id int,
	orgtagging int, 
	createdate timestamp
);

insert into dc_f_organization
select 
	a.orgid, .orgcode, a.yearmonth, a.zzdate, n.orgname, n.orglevel, n.id, n.orgtagging, n.createdate
from 
	( select o.id orgid, o.orgcode, d.yearmonth, d.zzdate from dc_d_organization o, dc_d_wddate ) a
	left join dc_d_organization n on to_date(n.createdate)=a.zzdate and n.orgcode = a.orgcode;
	

3、优化方案:
– ############################## HIVE 执行:增加 block 的数量,提高Spark的并发度(当前任务文件比较小,设置了26;一般参考数量:300左右;) #################################
– (1) 单独执行笛卡尔积,
– 先拆分文件:(改用hive,拆分文件,增加并行度)
– 【耗时:101.586 seconds;结果文件数量 26】
– 检查文件块数量:hadoop fs -ls /user/hive/warehouse/test.db/dc_d_org_date 26 个block

set mapreduce.map.memory.mb=1024;
set mapred.max.split.size=524288;
set mapred.min.split.size.per.node=524288;
set mapred.min.split.size.per.rack=524288;
drop table if exists dc_d_org_date;
create table dc_d_org_date as select o.id orgid,o.orgcode,d.yearmonth,d.zzdate from dc_d_organization o CROSS JOIN dc_d_wddate d;
-- ##############################	SPARK 执行;参数:spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G	#################################
-- (2)【Spark:Time taken: 115.78 seconds;】
set spark.shuffle.consolidateFiles=true;
drop table if exists dc_f_organization;
create table if not exists dc_f_organization
(orgid int,orgcode string,YEARMONTH string ,ZZDATE string,ORGNAME string,orglevel int,id int,ORGTAGGING int, createdate timestamp);

insert into dc_f_organization
select a.orgid,a.orgcode,a.YEARMONTH,a.ZZDATE,n.ORGNAME,n.orglevel,n.id,n.ORGTAGGING,n.createdate
from dc_d_org_date a
left join DC_D_ORGANIZATION n on to_date(n.CREATEDATE)=a.ZZDATE and n.orgcode = a.orgcode;

– ############################## 持续优化方向:将上述两者合并到一起在 spark 中执行 ##############################
问题:可能是因为文件太小,spark 分区命令没有生效。set spark.sql.shuffle.partitions=300;
注意:SPARK中笛卡尔积需要改成 CROSS JOIN,否则语法报错。

二、优化CUBE表

  1、优化结果:原来1小时左右,优化后26分钟。
		总结:shuffle时间:16分钟,数据量	35.2亿
		任务含有宽依赖(group)被分成2个stage
			
		✔采用方案 1:改用spark执行。提高并行度。
			执行参数:spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G
			stage 1 执行时间:11(partitions=300)
			stage 2 执行时间:15(partitions=200)
			设置分区数量,默认是200set spark.sql.shuffle.partitions=300;(理论上可以提高 stage 2 30%的速度,实际运行的时候可能会丢失executor,运行不稳定,不建议设置。)
			(原因可能是设置了虚拟核心数量。)
			
		方案 2:将case when的操作独立出一张表,去除部分重复扫描计算,减少cube阶段的计算量。
			抽取的时间增加了2分钟,节省的 shuffle 时间也是2分钟。没有意义。
			预处理时间:2-3分钟
			stage 1 执行时间:11
			stage 2 执行时间:13(节省的时间也是2-3分钟)
			
		方案 3:提高 shuffle 使用内存的占比 设置为60%
			执行参数:spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G --conf spark.storage.memoryFraction=0.3 --conf spark.shuffle.memoryFraction=0.5
			执行结果:效果不明显,多次执行时间也不太一致。
			
		方案 4:减少CUBE的维度数量, orgid 和 orgcode是一对一关系,可以去掉1个维度,计算完成之后再join
			执行结果:join 消耗的时间更久。
				
	2、采用的方案1:SPARK执行
		-- 执行参数 spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G
		-- set spark.sql.shuffle.partitions=300;
		drop table  if  exists dc_c_organization;
		create table if not exists dc_c_organization(
			YEARMONTH string,ZZDATE string,orgid int ,orgcode string,total int,
			provinceNum int,cityNum int,districtNum int, newDistrictNum int,townNum int,streetNum int,otherNum int,communityNum int,villageNum int,gridNum int);
			
		-- 如果用 hive 执行可以开启 combiner,map端先预聚合,减少reduce端的数据量和计算量,减少磁盘的IO和网络传输时间。
		-- set hive.map.aggr = true;
		-- set hive.groupby.mapaggr.checkinterval = 10000;
		
		-- ##############################	SPARK	#################################
		-- set spark.sql.shuffle.partitions=300;
		insert into dc_c_organization
		select  
		n.YEARMONTH,
		n.ZZDATE,
		n.orgid,
		n.orgcode,
		count(n.id) total,
		nvl(SUM(case when pt.displayname='省' then 1  else 0 end),0) AS provinceNum,
		nvl(SUM(case when pt.displayname='市' then 1  else 0 end),0) as cityNum,
		nvl(SUM(case when pt.displayname='县(区)' then 1  else 0 end),0) AS districtNum,
		(nvl(SUM(case when pt.displayname='县(区)'  then 1  else 0 end),0) -nvl(SUM(case when pt.displayname='县(区)' AND n.ORGTAGGING= 31 then 1  else 0 end),0)) as newDistrictNum,
		nvl(SUM(case when  ((n.ORGNAME LIKE '%乡%' OR n.ORGNAME LIKE '%镇%' OR n.ORGNAME LIKE '%乡镇%')) AND pt.displayname='乡镇(街道)' then 1  else 0 end),0) townNum,
		nvl(SUM(case when (n.ORGNAME LIKE '%街道%') AND pt.displayname='乡镇(街道)' then 1  else 0 end),0) streetNum,
		(nvl(SUM(case when pt.displayname='乡镇(街道)'then 1  else 0 end),0)-nvl(SUM(case when ((n.ORGNAME LIKE '%乡%' OR n.ORGNAME LIKE '%镇%' OR n.ORGNAME LIKE '%乡镇%') ) AND pt.displayname='乡镇(街道)' then 1  else 0 end),0)-nvl(SUM(case when (n.ORGNAME LIKE '%街道%' )  AND pt.displayname='乡镇(街道)' then 1  else 0 end),0)) otherNum,
		(nvl(SUM(case when pt.displayname='村(社区)' then 1  else 0 end),0)-nvl(SUM(case when ((n.ORGNAME LIKE '%村' OR n.ORGNAME LIKE '%村民委员会' OR n.ORGNAME LIKE '%农村工作中心站' OR n.ORGNAME LIKE '%村委会')) AND pt.displayname='村(社区)' then 1  else 0 end),0)) communityNum,
		nvl(SUM(case when ((n.ORGNAME LIKE '%村' OR n.ORGNAME LIKE '%村民委员会' OR n.ORGNAME LIKE '%农村工作中心站' OR n.ORGNAME LIKE '%村委会')) AND pt.displayname='村(社区)' then 1  else 0 end),0) villageNum,
		nvl(SUM(case when pt.displayname='片组片格'then 1  else 0 end),0) gridNum
		from dc_f_organization n
		left join dc_d_property pt on n.orglevel = pt.id
		GROUP BY n.YEARMONTH,n.ZZDATE,n.orgid,n.orgcode
		WITH CUBE;
			
	3、优化方案2:从业务逻辑上进行优化。(发现SQL逻辑中存在重复的计算)
		-- ############################	预处理:去除重复计算和减少CUBE的计算量	############################
			drop table if exists temp_dc_c_organization;
			create table temp_dc_c_organization
			as select
			n.yearmonth,
			n.zzdate,
			n.orgid,
			n.orgcode,
			n.id as id,
			case when pt.displayname='省' then 1  else 0 end as provincenum,
			case when pt.displayname='市' then 1  else 0 end as citynum,
			case when pt.displayname='县(区)' then 1  else 0 end as districtnum,
			case when pt.displayname='县(区)' and n.orgtagging= 31 then 1  else 0 end as old_districtnum,
【重复1case when ((n.orgname like '%乡%' or n.orgname like '%镇%' or n.orgname like '%乡镇%')) and pt.displayname='乡镇(街道)' then 1  else 0 end townnum,
【重复2case when (n.orgname like '%街道%') and pt.displayname='乡镇(街道)' then 1  else 0 end streetnum,
			case when pt.displayname='乡镇(街道)'then 1  else 0 end as total_streetnum_01,
【重复1case when ((n.orgname like '%乡%' or n.orgname like '%镇%' or n.orgname like '%乡镇%')) and pt.displayname='乡镇(街道)' then 1  else 0 end as total_streetnum_02,
【重复2case when (n.orgname like '%街道%') and pt.displayname='乡镇(街道)' then 1  else 0 end as total_streetnum_03,
			case when pt.displayname='村(社区)' then 1  else 0 end as communitynum_01,
【重复3case when ((n.orgname like '%村' or n.orgname like '%村民委员会' or n.orgname like '%农村工作中心站' or n.orgname like '%村委会')) and pt.displayname='村(社区)' then 1  else 0 end as communitynum_02,
【重复3case when ((n.orgname like '%村' or n.orgname like '%村民委员会' or n.orgname like '%农村工作中心站' or n.orgname like '%村委会')) and pt.displayname='村(社区)' then 1  else 0 end villagenum,
			case when pt.displayname='片组片格'then 1  else 0 end gridnum
			from
			dc_f_organization n
			left join dc_d_property pt on n.orglevel = pt.id;
			
			
		-- ############################	CUBE:节省的时间相当于预处理的时间。############################
			create table dc_c_organization_02
			as select  
			yearmonth,
			zzdate,
			orgid,
			count(id) total,
			sum(provincenum) as provincenum,
			sum(citynum) as citynum,
			sum(districtnum) as districtnum,
			sum(districtnum)-sum(old_districtnum) as newdistrictnum,
			sum(townnum) townnum,
			sum(streetnum) streetnum,
			sum(total_streetnum_01)-sum(townnum)-sum(streetnum) othernum,
			sum(communitynum_01)-sum(villagenum) communitynum,
			sum(villagenum) villagenum,
			sum(gridnum) gridnum
			from temp_dc_c_organization as n
			group by yearmonth, zzdate, orgid with cube;
			

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/957056.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

什么是架构,架构的本质是什么

不论是开发人员还是架构师,我们都一直在跟软件系统打交道,架构是在工作中出现最频繁的术语之一。那么,到底什么是架构?你可能有自己的答案,也有可能没有答案。对“架构”的理解需要我们不断在实践中思考、归纳、演绎&a…

说说Lambda架构

分析&回答 Lambda架构是由Storm的作者Nathan Marz提出的一个实时大数据处理框架。Marz在Twitter工作期间开发了著名的实时大数据处理框架Storm,Lambda架构是其根据多年进行分布式大数据系统的经验总结提炼而成。Lambda架构的目标是设计出一个能满足实时大数据系…

高教社杯数模竞赛特辑论文篇-2018年C题:基于 RFMT 模型的百货商场会员画像描绘(附获奖论文及代码实现)

目录 赛题 摘要 一、问题的重述 二、模型假设 三、变量说明 四、模型的建立与求解 4.1 数据预处理 4.2 问题一的模型建立与求解 4.2.1 建模思路 4.2.2 模型建立 4.2.3 模型的求解与结果分析 4.3 问题二的模型建立与求解 4.3.1 建模思路 4.3.2 模型建立 4.3.3 模…

污水厂数字孪生 | 3D可视化管理系统助力污水企业数字化管理

随着城市化进程的不断加快,污水处理成为了城市环境保护的重要组成部分。传统的污水处理方式往往存在诸多问题,如信息不对称、安全隐患等。为了解决这些问题,污水处理3D可视化管控平台应运而生,它通过结合数字孪生技术和远程指导技…

详解Python argparse ---命令行选项、参数和子命解析器

详解argparse模块 一、 模块简介二、使用步骤三、ArgumentParser()参数四、add_argument()参数详解五、示例 一、 模块简介 argparse模块使编写用户友好的命令行界面变得容易。该程序定义了它需要什么参数,argparse将找出如何从s…

使用C语言计算1/1-1/2+1/3-1/4+...+1/99-1/100

观察算式,发现分子都是1,分母从1~100,所以可以使用for循环产生1~100之间的数。 另一个问题是,如何产生正负交替的符号?很简单,这个符号本质上就是往每一项前面乘一个系数:一或者负一。所以只需…

纽扣电池/锂电池UN38.3安全检测报告

根据规章要求,航空公司和机场货物收运部门应对锂电池进行运输文件审查,重要的是每种型号的锂电池UN38.3安全检测报告。该报告可由的三方检测机构。如不能提供此项检测报告,将禁止锂电池进行航空运输. UN38.3包含产品:1、 锂电池2…

AI建模 | 物体三维重建的高效方法

三维重建是将客观世界中的物体在虚拟空间表达出来,在大众视野中,物品三维重建最直观的应用当属虚拟仿真和VR/AR导航。其实在学科专业领域,三维重建已经更早地应用在高精地图、测绘系统、城市规划等领域。 科技发展的终极方向应当是普适性&am…

dll修复精灵下载方法,完美解决电脑d3dx9-d3dx11dll文件丢失方法

大家好!今天,我将为大家带来一场关于d3dx9_43.dll丢失的6种解决方法的演讲。希望通过这次演讲,能够帮助大家解决在电脑使用过程中遇到的问题,提高我们的生活和工作效率。 首先,让我们来了解一下d3dx9_43.dll是什么文件…

《QDebug 2023年8月》

一、Qt Widgets 问题交流 1.获取 QWidget 当前所在屏幕区域 本来以为 QWidget 的 screen() 接口返回的是组件自己所在屏幕的 QSreen,实测是所属 Window 所在的屏幕,如果 Window 跨屏了两者所属屏幕可能就不是同一个。 获取 QWidget 当前所在屏幕区域可…

jmeter单接口和多接口测试

最近接触到了多接口串联,接口串联的技术会在其他帖子有说明,其核心技术点就是通过正则表达式和变量来实现接口的关联。目前为止呢笔者用到的地方还只有一个,就是关于session保持的时候。但是看到很多资料都说测试过程中经常遇到b接口需要用a接…

IDA Pro反汇编工具下载安装使用

一、前言 IDA Pro(Interactive Disassembler Professional)简称“IDA”,是Hex-Rays公司出品的一款交互式反汇编工具,是目前最棒的一个静态反编译软件,为众多0day世界的成员和ShellCode安全分析人士不可缺少的利器。ID…

R3LIVE项目实战(5) — R3LIVE数据采集与时间同步

目录 1 R3LIVE数据采集运行步骤 1.1 录制数据集 1.2 修改config下对应的配置文件 1.3 启动相机和雷达节点 1.4 运行R3LIVE与播包 2 R3LIVE在线运行 1 R3LIVE数据采集运行步骤 1.1 录制数据集 采集数据需要注意的一点是,不同传感器之间的时间同步问题&#x…

【C语言练习】C语言如何操作内存(重中之重!!!)

📢:如果你也对机器人、人工智能感兴趣,看来我们志同道合✨ 📢:不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 📢:文章若有幸对你有帮助,可点赞 👍…

Android 之 GPS 初涉

本节引言: 说到GPS这个名词,相信大家都不陌生,GPS全球定位技术嘛,嗯,Android中定位的方式 一般有这四种:GPS定位,WIFI定准,基站定位,AGPS定位(基站GPS); 本…

Linux的内存理解

建议 Mysql机器 尽量不要硬swap,如果是ssd磁盘还好。Free命令 free 命令显示系统内存的使用情况,包括物理内存、交换内存(swap)和内核缓冲区内存 输出简介: Mem 行(第二行)是内存的使用情况。Swap 行(第三行)是交换空间的使用情况。total 列显示系统总的可用物理内存和交换…

【高阶产品策略】策略产品数据与行为分析方法

文章目录 1、策略产品数据与用户行为数据分析概述2、埋点、策略数据收集核心技能3、用户行为数据分析应用4、数据平台实施 1、策略产品数据与用户行为数据分析概述 2、埋点、策略数据收集核心技能 3、用户行为数据分析应用 4、数据平台实施

亲测有效!Win7中如何安装高版本的NodeJS

正常情况下,Win7支持的Node.js最高版本是V13.14,但在开发过程中,有不少Vue项目或其他需要依赖Node环境的项目,对Node版本要求都比较高。对此,我们要么重装操作系统到Win8以上,要么就得想办法在Win7中安装高…

平衡二叉树AVLTree的实现与应用(难度5/10)

这是目前难度最高的一个作业,主要难点在于树的平衡,树的平衡依赖于调试输出的图形化,也就是输出二叉树的实现,二叉树的输出技巧性比较强,一般人很难直接想到控制台可以打印二叉树。后面的测试结果显示本文实现的AVLTre…

MYSQL 高级SQL语句

1、按关键字排序: order by 语句用来实现 ,前面可以使用where字句使查询结果进一步过滤 asc 是按照升序排序 , 默认的 desc 是按照降序排序 order by的语法结构 例:select name,score from ku order by score desc; 表示将数…