0、背景
上篇文章《Spark 任务需要的内存跟哪些因素有关》验证 Spark 任务需要的内存,跟单个 partition 的数据量大小,以及数据计算逻辑复杂度有关。但是之中有个最大的特点,就是把 MySQL 作为数据源的时候,无论数据量多大,Spark 默认都只有一个分区。
这也就导致了每次数据量增加时,需要分配给 Spark 单个 partition 的内存越来越大,而 HDFS,不管数据量增加到多大,Spark 却都可以淡定的保证内存一直不变。
因为当 HDFS 作为数据源时,无论多大的数据量,都会默认把它都切成一个个 128m 的数据块,这样就导致进入 Spark 的单个partition 数据量都很小。
这个时候肯定有同学会问:既然是这样的话,那 Spark 在读取 MySQL 的时候,如果直接扩展默认的分区为多个,这样需要的内存不就少些了吗?到底该相反靠谱吗?从而该文章来验证一下。
一、算子扩展分区
如果要把 Spark 读取 MySQL 时默认的1个分区,扩展到多个分区,Spark 能提供的最简单解决办法,目前为止有两个:
- coalesce算子:根据源码的解释,它能够在修改默认分区数量的同时,还能尽量避免数据的 shuffle,是我们这次测试的首选;
- repartition算子:是Spark 用来修改默认数据分区的传统方法。
1.1 利用coalesce算子
因为目前为止,Spark 读取 MySQL,只能以 jdbc 的方式,所以这个「coalesce」算子的使用,就只能这样:
按理说,这样应该就可以把原本只有1个 partition 的数据,给拆成了 8 个 partition 了吧(当前 MySQL 表有1600w+条数据)。
但是,从我的实测结果来看,人家貌似根本不吃这一套,从分区打印信息来看
1.2 利用repartition算子
同样的,把上面的「coalesce」给换成「repartition」就可以了:
这一次,从程序打印的分区数量来看,确实是生效了。
但是,付出的代价却是「惨痛」的。
当前 MySQL 的 1600w+ 数据量,是由之前 2 个 1G 的数据文件写进去的,在不尝试改变分区数之前,Spark 对它进行读取、运算时,对 executor 内存的设置只需要 2G。
--executor-memory 2g
然而当我这次把分区数调整为 8 之后,这 2G 的 executor 内存完全不够用(跑一段时间后就 OOM),后面我就只能逐渐增加这个内存大小。直到把这个 executor 的内存,从开始的 2G,给逐渐加到 14G(就这么以一次1G的幅度试出来的),人家才能跑成功。
--executor-memory 14g
关键是,费这么大内存换来的,却是一个「负性价比」的结果。
先看不做分区调整前,Spark 进程跑完需要的时间:
可以看到,过程非常简单,就2个stage,总耗时也就不到1分钟。
再看把分区调整到 8个之后,Spark 进程跑完需要的时间:
居然超过了8分钟,最最关键的是,扩展到8个分区之后,那8个分区需要的总处理时间,也就花了1秒钟而已,就问你惊不惊喜,意不意外?
1.3 方案小结
对于「coalesce」来说,Spark 还算聪明,至少对于当前的这个案例来说,它应该是预估出了,扩展分区数量并不是一个聪明的选择(费内存不说,还耗时间),所以,人家干脆就不鸟你,直接不生效。
相比之下,「repartition」就听话多了,要它扩展它就扩展,只不过扩展分区之后的代价,需要你自行承担。
从原来的1个分区,扩展到8个,哪些数据进入到哪个分区,肯定就要根据一定的规则来,这里就一定会对数据进行比如「取Hash」或者「排序」(俗称 shuffle),这两个手段中上任意一个,就注定了 Spark 需要额外的「时间」还有「空间」。
这也就解释了,为什么上面把原本 1个分区的 MySQL 数据,给扩展到 8个之后,内存翻了好几倍不说,时间也翻了好几倍。
该方案的测试很有警醒意义:现实开发中、网上的资料或面试交流当中,Spark调整分区首先都会想到该解决方案;朴素的认为,想要提高一个分布式计算的效率,一个有效的办法,就是去增加计算时的「并行度」,但殊不知,在提高了并行度的同时,其他隐形成本可能也跟着大大提高了,得不偿失。
根据我的经验,类似这种问题,最好的解决方案,应该是尽量去避开这种不支持「分布式数据抽取」的数据源,如果说非要去读,且数据量巨大的话。那可以用其他抽取工具,先把它们以流的方式,或者小批量多次的方式,抽取到比如分布式消息队列、或者分布式文件系统中。
然后,再用 Spark 来读取,这样无论你的数据读取效率,还是处理效率,都会有质的提升。
二、多分区配置读取
基于通过算子来扩展分区的方案没有达到期望,其实官方就有提供Spark 读 MySQL 时,是可以通过 jdbc 来设置分区字段,达到分布式的读取目的,以下就展开优雅的实现分布式读取 MySQL 数据源。
2.1 JDBC配置
不像其他天然的分布式数据源,这里的 MySQL,是一个单实例、单表的形式,连存储的数据文件都是单个的,所以如果你用 jdbc 去读这个 MySQL 表,默认只有一个分区,也就好理解了。
既然想分布式读,那就得在读数据的时候,把数据进行人为的「打散」,让 Spark 知道,哪一坨数据,对应哪一个分区。
Spark 官网对这个打撒的配置,有还算比较详细的说明:
但是,看着好像挺简单的,但在实现的时候,还是有些需要你注意的地方。
2.2 注意事项
2.2.1 配置方式
正如文档描述的那样,它这个分区配置有个特点,「要么就几个一起配,要么就干脆一个也别配」,就是下面这 4 个:
我单独试了,确实没有骗我,假如我只配置「numPartitions」这么一个我认为最关键的设置,Spark 是不会买账的,一点卵用都没得。
2.2.2 分区字段要求
这里有个比较难受的地方在于,对于分区字段的类型,人家是有要求的:必须得是「数值类型」、或者「日期类型」、或者「时间戳类型」。
尴尬的是,我现在测试的这张表,全都是字符串类型,比如我随便挑一个字段作为分区,果然就报错了。
那咋整呢?骚操作在后面。
2.2.3 字符串字段如何分区
对于当前的这张表来说,可以拿来作为分区的字段,全都是字符串类型。
但是,我这里给想了个办法。
比如,我想给这个 MySQL 表数据分 10 个分区,而目前我认为比较好的分区字段为 time ,因为它可以根据某个规则,把数据比较平均地分成 10 份。
具体怎么分,看这里:
取这个时间字段值的「最后一个数字」,作为分区规则,就能以非常平均的方式,把这张表的数据给切成 10 份,这个骚操作是不是很聪明?
于是,对于当前这个情况,用 Spark 以分区的方式读取 MySQL 的代码,就得这么来写:
/**以分区方式读取MySQL数据*/
val rawDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://192.168.221.132:3306/test")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("user", "***")
.option("password", "****")
.option("dbtable", "(select cast(substring(time,14) AS decimal) as time_postfix,client_ip,time,target_ip,rcode,query_type,authority_record,add_msg,dns_ip from test03) as target_table")
.option("partitionColumn", "time_postfix")
.option("lowerBound", "0")
.option("upperBound", "10")
.option("numPartitions", 10)
.load()
就在原表基础上,自己定义了一个符合类型要求的分区字段呗,这个额外字段就叫「time_postfix」。
跑起来后,分区数量果然就变成了预期的 10 个:
至此,你可能会好奇,为毛代码设置的上界(upperBound)为 10 ,而不是 9 ,明明「time_postfix」的最大值就是 9 啊?
原因在于,它取的是「下闭上开」的区间,也就是:[0,10) 这种方式。
从程序运行的日志可以清楚地看到它的分区规则:
Number of partitions: 10, WHERE clauses of these partitions: `time_postfix` < 1 or `time_postfix` is null, `time_postfix` >= 1 AND `time_postfix` < 2, `time_postfix` >= 2 AND `time_postfix` < 3, `time_postfix` >= 3 AND `time_postfix` < 4, `time_postfix` >= 4 AND `time_postfix` < 5, `time_postfix` >= 5 AND `time_postfix` < 6, `time_postfix` >= 6 AND `time_postfix` < 7, `time_postfix` >= 7 AND `time_postfix` < 8, `time_postfix` >= 8 AND `time_postfix` < 9, `time_postfix` >= 9
2.2.4 方案小结
想要 Spark 以分布式方式读取 MySQL 的数据,虽然没什么大的技术难度,但想实现好也没有想象的那么简单,需要你注意下面 3 点:
- 4个必要的配置,必须同时出现,且对分区字段的类型有硬性要求;
- 如果分区字段不符合类型要求,需要你用其他手段对它进行转换;
- 对于选定的分区字段,要提前判断它的数据切分效果,否则容易造成读取时的数据倾斜,给后续数据处理带来麻烦。
三、并行一定比不并行性价比高吗
以上两个优化方案演示了Spark通过调用算子改变数据分区、多分区读取MySQL数据,但是优化方案只是对程序优化的手段,最终能不能达到「节省硬件资源」或者「提高计算效率」才是我们追求的目的。
那经过方案二对 MySQL 数据的切割之后,Spark 处理它需要的「硬件资源」到底有没有变少?以及对应的「计算效率」有没有变高?我们一起来验证一下。
3.1 不分区时需要资源和效率
默认情况下,Spark 读 MySQL 只有 1 个分区,所以就要求这个分区能承载所有的数据量,记得当时要读取这张数据量为 1600w+。
用特定的计算方式进行计算时,需要的「最小」内存为:executor 内存量:2G。
--executor-memory 2g
占用的 yarn 总内存为:约3.5G
而当我们把原本的单个分区,给平均切成 10 个之后,需要的最小内存是多少?以及想要实现最高效的计算效率,应该采用什么样的并行策略呢?
3.2 采用单个并行时--基准测试
作为对照,第一次测试,我们对 Spark 采用单并行策略,也就是
「1个 executor,配 1个 CPU」。
经过实测,能成功执行完任务,需要的 executor 内存设置为:512MB。
--executor-cores 1 --num-executors 1 --driver-memory 512m
占用的 yarn 总内存为:不到2G。
任务执行时间:约2分钟半(多次测量取平均值)。
小结:1个并行度时,相比之前 MySQL 采用默认分区(1个分区),Spark 需要的内存量大幅减少(少1.5G),但任务执行效率也由原来的约 1分钟,提高到约 2.5分钟。
3.3 单个executor,2CPU时
在上一个基准测试时,细心的同学可能发现了一个问题,那就是既然原来 1 个分区时,需要的 executor 内存大小为 2G,那么现在把数据切成了平均 10份,理论上单个 executor 需要的内存就只需要大概 200M 就够了呀?
那为毛上面还要把 executor 的内存设置为 512M 呢?
这里之所以单个 executor 给设置 512M,而不是 200M,原因在于:
yarn 不允许,目前我的集群环境,yarn 允许设置的最小 executor 内存大小,只能是 512M!
既然让 512M 的 executor 只跑 1 个分区的数据有些浪费了,那我们就让它并行跑 2个(或者更多个),这样一来,平均单个分区分摊的内存,就是 256M 了(或者更少)。
于是,当前对于 executor 的内存和并行度设置为:
--executor-cores 2 --num-executors 1 --driver-memory 512m
这样一来,虽然只有 1个 executor ,但并行度变成了2:
此时 yarn 占用的总内存不变,不变。
但因为并行度由原来的 1个,提高为 2个,所以执行效率肯定会提高。
由原来的 2.5分钟,提高到只需要 1.6分钟(多次执行取的平均值)。
小结:通过分析单个 executor 有富余的内存空间,提高单个 executor 的并行度,在硬件资源消耗不变的情况下,可以有效提高任务的执行效率。
3.4 单executor,3CPU时(依然可行)
这个时候,executor 的内存和并行度设置为:
--executor-cores 3 --num-executors 1 --driver-memory 512m
单个 executor 的并行度为 3:
yarn 占用的总内存量,依然跟上面一样保持不变(不到2G)。
按理说,这个时候,Spark 每个 partition 平均分得的内存量就为 512/3 MB。那是不是就不够了呢?
结果你猜怎么着,它依然能跑成功(没有出现我认为的 OOM),但有一点可以确定的是,这个时候,单个 partition 的内存肯定是非常紧张的。
这个时候,整个任务执行时间,再一次缩短为 1.2分钟。
小结:通过单 executor 设置 3个并行度,依然在硬件资源消耗不变的情况下能跑成功,且执行效率,比 2个并行更高一些。
3.5 单executor,4CPU时(扛不住了)
如果说,上一个设置单 executor 3 个 CPU 是当前这个任务能承载的内存极限(512MB),那么当前的这个设置,毋庸置疑就是在找死。
事实证明,这一次,确实行不通了,OOM 它终于来了。
3.6 3 executor,3 CPU时(最高效且最节省资源配置)
如果现在要你对当前这个任务,效率最高,且最节省资源的方式的设置如下:
--executor-cores 3 --num-executors 3 --driver-memory 512m
这个时候,权衡资源最省、且效率最高时的最大并发为:9个。
这个时候 yarn 占用的总内存大小为:约3.8G
(这里怀疑是 yarn 显示的 bug,消耗的 CPU 应该为 10 才对)
执行效率为:约 1分钟。
小结:通过权衡资源消耗的最小化,以及执行效率的最大化,对于当前任务,需要 3个 executor,以及每个 executor 3个 CPU,消耗约3.8G 内存。
3.7 小结
在什么都不调整,不设置的情况下(单数据分区),面对相同的逻辑计算,把任务跑成功时。
任务执行效率为:不到 1 分钟。
占用 yarn 总资源为:约3.5G。
而现在分布式是实现了,但就算我们「挖空心思」,以「性价比」最高的方式来跑这个任务,在所消耗的硬件资源,比之前什么都不做还要高的的情况下,执行效率也就是勉强打个平手。
说明什么?
说明分布式的计算方式,并不是在任何情况下都是最优解决方案,如果数据量相对比较小,且单个节点的算力能完全能 hold 住,那用「单节点方式」可能更划算。
毕竟,分布式情况下,天然需要的「基础硬件成本」、和「必要的网络通信开销」是任何一个分布式任务绕不开的。
所以,基于什么样的数据量,用什么样的计算方式,你心里有谱了吗?
参考资料:
- wx公众号(安瑞哥是码农)-《Spark 的 repartition 慎用,coalesce 可能没卵用!》
- wx公众号(安瑞哥是码农)-《Spark 想并发读取 MySQL,咋整?》
- wx公众号(安瑞哥是码农)-《Spark「并行」,就一定比「不并行」性价比更高吗?》