Spark读取MySQL优化方案辩证

news2025/1/22 18:49:28

0、背景

上篇文章《Spark 任务需要的内存跟哪些因素有关》验证 Spark 任务需要的内存,跟单个 partition 的数据量大小,以及数据计算逻辑复杂度有关。但是之中有个最大的特点,就是把 MySQL 作为数据源的时候,无论数据量多大,Spark 默认都只有一个分区。

这也就导致了每次数据量增加时,需要分配给 Spark 单个 partition 的内存越来越大,而 HDFS,不管数据量增加到多大,Spark 却都可以淡定的保证内存一直不变。

因为当 HDFS 作为数据源时,无论多大的数据量,都会默认把它都切成一个个 128m 的数据块,这样就导致进入 Spark 的单个partition 数据量都很小。

这个时候肯定有同学会问:既然是这样的话,那 Spark 在读取 MySQL 的时候,如果直接扩展默认的分区为多个,这样需要的内存不就少些了吗?到底该相反靠谱吗?从而该文章来验证一下。

一、算子扩展分区

如果要把 Spark 读取 MySQL 时默认的1个分区,扩展到多个分区,Spark 能提供的最简单解决办法,目前为止有两个:

  • coalesce算子:根据源码的解释,它能够在修改默认分区数量的同时,还能尽量避免数据的 shuffle,是我们这次测试的首选;
  • repartition算子:是Spark 用来修改默认数据分区的传统方法。

1.1 利用coalesce算子

因为目前为止,Spark 读取 MySQL,只能以 jdbc 的方式,所以这个「coalesce」算子的使用,就只能这样:

按理说,这样应该就可以把原本只有1个 partition 的数据,给拆成了 8 个 partition 了吧(当前 MySQL 表有1600w+条数据)。

但是,从我的实测结果来看,人家貌似根本不吃这一套,从分区打印信息来看

1.2 利用repartition算子

同样的,把上面的「coalesce」给换成「repartition」就可以了:

这一次,从程序打印的分区数量来看,确实是生效了。

但是,付出的代价却是「惨痛」的。

当前 MySQL 的 1600w+ 数据量,是由之前 2 个 1G 的数据文件写进去的,在不尝试改变分区数之前,Spark 对它进行读取、运算时,对 executor 内存的设置只需要 2G

--executor-memory 2g

然而当我这次把分区数调整为 8 之后,这 2G 的 executor 内存完全不够用(跑一段时间后就 OOM),后面我就只能逐渐增加这个内存大小。直到把这个 executor 的内存,从开始的 2G,给逐渐加到 14G(就这么以一次1G的幅度试出来的),人家才能跑成功。

--executor-memory 14g

关键是,费这么大内存换来的,却是一个「负性价比」的结果

先看不做分区调整前,Spark 进程跑完需要的时间:

可以看到,过程非常简单,就2个stage,总耗时也就不到1分钟

再看把分区调整到 8个之后,Spark 进程跑完需要的时间:

居然超过了8分钟,最最关键的是,扩展到8个分区之后,那8个分区需要的总处理时间,也就花了1秒钟而已,就问你惊不惊喜,意不意外?

1.3 方案小结

对于「coalesce」来说,Spark 还算聪明,至少对于当前的这个案例来说,它应该是预估出了,扩展分区数量并不是一个聪明的选择(费内存不说,还耗时间),所以,人家干脆就不鸟你,直接不生效。

相比之下,「repartition」就听话多了,要它扩展它就扩展,只不过扩展分区之后的代价,需要你自行承担。

从原来的1个分区,扩展到8个,哪些数据进入到哪个分区,肯定就要根据一定的规则来,这里就一定会对数据进行比如「取Hash」或者「排序」(俗称 shuffle),这两个手段中上任意一个,就注定了 Spark 需要额外的「时间」还有「空间」

这也就解释了,为什么上面把原本 1个分区的 MySQL 数据,给扩展到 8个之后,内存翻了好几倍不说,时间也翻了好几倍。

该方案的测试很有警醒意义:现实开发中、网上的资料或面试交流当中,Spark调整分区首先都会想到该解决方案;朴素的认为,想要提高一个分布式计算的效率,一个有效的办法,就是去增加计算时的「并行度」,但殊不知,在提高了并行度的同时,其他隐形成本可能也跟着大大提高了,得不偿失

根据我的经验,类似这种问题,最好的解决方案,应该是尽量去避开这种不支持「分布式数据抽取」的数据源,如果说非要去读,且数据量巨大的话。那可以用其他抽取工具,先把它们以流的方式,或者小批量多次的方式,抽取到比如分布式消息队列、或者分布式文件系统中。

然后,再用 Spark 来读取,这样无论你的数据读取效率,还是处理效率,都会有质的提升。

二、多分区配置读取

基于通过算子来扩展分区的方案没有达到期望,其实官方就有提供Spark 读 MySQL 时,是可以通过 jdbc 来设置分区字段,达到分布式的读取目的,以下就展开优雅的实现分布式读取 MySQL 数据源。

2.1 JDBC配置

不像其他天然的分布式数据源,这里的 MySQL,是一个单实例、单表的形式,连存储的数据文件都是单个的,所以如果你用 jdbc 去读这个 MySQL 表,默认只有一个分区,也就好理解了。

既然想分布式读,那就得在读数据的时候,把数据进行人为的「打散」,让 Spark 知道,哪一坨数据,对应哪一个分区。

Spark 官网对这个打撒的配置,有还算比较详细的说明:

但是,看着好像挺简单的,但在实现的时候,还是有些需要你注意的地方。

2.2 注意事项

2.2.1 配置方式

正如文档描述的那样,它这个分区配置有个特点,「要么就几个一起配,要么就干脆一个也别配」,就是下面这 4 个:

我单独试了,确实没有骗我,假如我只配置「numPartitions」这么一个我认为最关键的设置,Spark 是不会买账的,一点卵用都没得。

2.2.2 分区字段要求

这里有个比较难受的地方在于,对于分区字段的类型,人家是有要求的:必须得是「数值类型」、或者「日期类型」、或者「时间戳类型」。

尴尬的是,我现在测试的这张表,全都是字符串类型,比如我随便挑一个字段作为分区,果然就报错了。

那咋整呢?骚操作在后面。

2.2.3 字符串字段如何分区

对于当前的这张表来说,可以拿来作为分区的字段,全都是字符串类型。

但是,我这里给想了个办法。

比如,我想给这个 MySQL 表数据分 10 个分区,而目前我认为比较好的分区字段为 time ,因为它可以根据某个规则,把数据比较平均地分成 10 份。

具体怎么分,看这里:

取这个时间字段值的「最后一个数字」,作为分区规则,就能以非常平均的方式,把这张表的数据给切成 10 份,这个骚操作是不是很聪明?

于是,对于当前这个情况,用 Spark 以分区的方式读取 MySQL 的代码,就得这么来写:

/**以分区方式读取MySQL数据*/
        val rawDF = spark.read
            .format("jdbc")
            .option("url", "jdbc:mysql://192.168.221.132:3306/test")
            .option("driver", "com.mysql.cj.jdbc.Driver")
            .option("user", "***")
            .option("password", "****")
            .option("dbtable", "(select cast(substring(time,14) AS decimal) as time_postfix,client_ip,time,target_ip,rcode,query_type,authority_record,add_msg,dns_ip from test03) as target_table")
            .option("partitionColumn", "time_postfix")
            .option("lowerBound", "0")
            .option("upperBound", "10")
            .option("numPartitions", 10)
            .load()

就在原表基础上,自己定义了一个符合类型要求的分区字段呗,这个额外字段就叫「time_postfix」。

跑起来后,分区数量果然就变成了预期的 10 个:

至此,你可能会好奇,为毛代码设置的上界(upperBound)为 10 ,而不是 9 ,明明「time_postfix」的最大值就是 9 啊?

原因在于,它取的是「下闭上开」的区间,也就是:[0,10) 这种方式。

从程序运行的日志可以清楚地看到它的分区规则:

Number of partitions: 10, WHERE clauses of these partitions: `time_postfix` < 1 or `time_postfix` is null, `time_postfix` >= 1 AND `time_postfix` < 2, `time_postfix` >= 2 AND `time_postfix` < 3, `time_postfix` >= 3 AND `time_postfix` < 4, `time_postfix` >= 4 AND `time_postfix` < 5, `time_postfix` >= 5 AND `time_postfix` < 6, `time_postfix` >= 6 AND `time_postfix` < 7, `time_postfix` >= 7 AND `time_postfix` < 8, `time_postfix` >= 8 AND `time_postfix` < 9, `time_postfix` >= 9

2.2.4 方案小结

想要 Spark 以分布式方式读取 MySQL 的数据,虽然没什么大的技术难度,但想实现好也没有想象的那么简单,需要你注意下面 3 点:

  1. 4个必要的配置,必须同时出现,且对分区字段的类型有硬性要求;
  2. 如果分区字段不符合类型要求,需要你用其他手段对它进行转换;
  3. 对于选定的分区字段,要提前判断它的数据切分效果,否则容易造成读取时的数据倾斜,给后续数据处理带来麻烦。

三、并行一定比不并行性价比高吗

以上两个优化方案演示了Spark通过调用算子改变数据分区、多分区读取MySQL数据,但是优化方案只是对程序优化的手段,最终能不能达到「节省硬件资源」或者「提高计算效率」才是我们追求的目的。

那经过方案二对 MySQL 数据的切割之后,Spark 处理它需要的「硬件资源」到底有没有变少?以及对应的「计算效率」有没有变高?我们一起来验证一下。

3.1 不分区时需要资源和效率

默认情况下,Spark 读 MySQL 只有 1 个分区,所以就要求这个分区能承载所有的数据量,记得当时要读取这张数据量为 1600w+。

用特定的计算方式进行计算时,需要的「最小」内存为:executor 内存量:2G

--executor-memory 2g

占用的 yarn 总内存为:约3.5G

而当我们把原本的单个分区,给平均切成 10 个之后,需要的最小内存是多少?以及想要实现最高效的计算效率,应该采用什么样的并行策略呢?

3.2 采用单个并行时--基准测试

作为对照,第一次测试,我们对 Spark 采用单并行策略,也就是

1个 executor,配 1个 CPU」。

经过实测,能成功执行完任务,需要的 executor 内存设置为:512MB

--executor-cores 1 --num-executors 1 --driver-memory 512m

占用的 yarn 总内存为:不到2G

任务执行时间:约2分钟半(多次测量取平均值)。

小结:1个并行度时,相比之前 MySQL 采用默认分区(1个分区),Spark 需要的内存量大幅减少(少1.5G),但任务执行效率也由原来的约 1分钟,提高到约 2.5分钟

3.3 单个executor,2CPU时

在上一个基准测试时,细心的同学可能发现了一个问题,那就是既然原来 1 个分区时,需要的 executor 内存大小为 2G,那么现在把数据切成了平均 10份,理论上单个 executor 需要的内存就只需要大概 200M 就够了呀?

那为毛上面还要把 executor 的内存设置为 512M 呢?

这里之所以单个 executor 给设置 512M,而不是 200M,原因在于:

yarn 不允许,目前我的集群环境,yarn 允许设置的最小 executor 内存大小,只能是 512M!

既然让 512M 的 executor 只跑 1 个分区的数据有些浪费了,那我们就让它并行跑 2个(或者更多个),这样一来,平均单个分区分摊的内存,就是 256M 了(或者更少)。

于是,当前对于 executor 的内存和并行度设置为:

--executor-cores 2 --num-executors 1 --driver-memory 512m

这样一来,虽然只有 1个 executor ,但并行度变成了2:

此时 yarn 占用的总内存不变,不变。

但因为并行度由原来的 1个,提高为 2个,所以执行效率肯定会提高。

由原来的 2.5分钟,提高到只需要 1.6分钟(多次执行取的平均值)。

小结:通过分析单个 executor 有富余的内存空间,提高单个 executor 的并行度,在硬件资源消耗不变的情况下,可以有效提高任务的执行效率

3.4 单executor,3CPU时(依然可行)

这个时候,executor 的内存和并行度设置为:

--executor-cores 3 --num-executors 1 --driver-memory 512m

单个 executor 的并行度为 3:

yarn 占用的总内存量,依然跟上面一样保持不变(不到2G)。

按理说,这个时候,Spark 每个 partition 平均分得的内存量就为 512/3 MB。那是不是就不够了呢?

结果你猜怎么着,它依然能跑成功(没有出现我认为的 OOM),但有一点可以确定的是,这个时候,单个 partition 的内存肯定是非常紧张的。

这个时候,整个任务执行时间,再一次缩短为 1.2分钟

小结:通过单 executor 设置 3个并行度,依然在硬件资源消耗不变的情况下能跑成功,且执行效率,比 2个并行更高一些

3.5 单executor,4CPU时(扛不住了)

如果说,上一个设置单 executor 3 个 CPU 是当前这个任务能承载的内存极限(512MB),那么当前的这个设置,毋庸置疑就是在找死。

事实证明,这一次,确实行不通了,OOM 它终于来了。

3.6 3 executor,3 CPU时(最高效且最节省资源配置)

如果现在要你对当前这个任务,效率最高,且最节省资源的方式的设置如下:

--executor-cores 3 --num-executors 3 --driver-memory 512m

这个时候,权衡资源最省、且效率最高时的最大并发为:9个。

这个时候 yarn 占用的总内存大小为:约3.8G

(这里怀疑是 yarn 显示的 bug,消耗的 CPU 应该为 10 才对)

执行效率为:约 1分钟

小结:通过权衡资源消耗的最小化,以及执行效率的最大化,对于当前任务,需要 3个 executor,以及每个 executor 3个 CPU,消耗约3.8G 内存

3.7 小结

在什么都不调整,不设置的情况下(单数据分区),面对相同的逻辑计算,把任务跑成功时。

任务执行效率为:不到 1 分钟

占用 yarn 总资源为:约3.5G。

而现在分布式是实现了,但就算我们「挖空心思」,以「性价比」最高的方式来跑这个任务,在所消耗的硬件资源,比之前什么都不做还要高的的情况下,执行效率也就是勉强打个平手

说明什么?

说明分布式的计算方式,并不是在任何情况下都是最优解决方案,如果数据量相对比较小,且单个节点的算力能完全能 hold 住,那用「单节点方式」可能更划算

毕竟,分布式情况下,天然需要的「基础硬件成本」、和「必要的网络通信开销」是任何一个分布式任务绕不开的。

所以,基于什么样的数据量,用什么样的计算方式,你心里有谱了吗?

参考资料:

  1. wx公众号(安瑞哥是码农)-《Spark 的 repartition 慎用,coalesce 可能没卵用!》
  2. wx公众号(安瑞哥是码农)-《Spark 想并发读取 MySQL,咋整?》
  3. wx公众号(安瑞哥是码农)-《Spark「并行」,就一定比「不并行」性价比更高吗?》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2186309.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++】set容器和map容器的基本使用

一、序列式容器和关联式容器 1、STL中的部分容器如&#xff1a;string、vector、list、deque、array、forward_list等&#xff0c;这些容器统称为序列式容器&#xff0c;因为逻辑结构为线性序列的数据结构&#xff0c;两个位置存储的值之间一般没有紧密的关联关系&#xff0c;…

数据结构双向链表和循环链表

目录 一、循环链表二、双向链表三、循环双向链表 一、循环链表 循环链表就是首尾相接的的链表&#xff0c;就是尾节点的指针域指向头节点使整个链表形成一个循环&#xff0c;这就弥补了以前单链表无法在后面某个节点找到前面的节点&#xff0c;可以从任意一个节点找到目标节点…

Leetcode 540. 有序数组中的单一元素

1.题目基本信息 1.1.题目描述 给你一个仅由整数组成的有序数组&#xff0c;其中每个元素都会出现两次&#xff0c;唯有一个数只会出现一次。 请你找出并返回只出现一次的那个数。 你设计的解决方案必须满足 O(log n) 时间复杂度和 O(1) 空间复杂度。 1.2.题目地址 https:…

大语言模型入门(二)——提示词

一、什么是提示词 大语言模型&#xff08;LLM&#xff09;的提示词&#xff08;Prompt&#xff09;是与模型交互的关键&#xff0c;它影响着模型的输出结果。提示词&#xff08;Prompt&#xff09;和提示工程&#xff08;Prompt Engineering&#xff09;密切相关。什么又是提示…

详解代理服务器及Squid

一、 代理服务器简介 &#xff08;1&#xff09;什么是代理服务器 代理服务器英文全称为ProxyServer&#xff0c;其主要功能代理网络用户获取网络信息&#xff0c;起到内网和Internet的桥梁作用。 在TCP/IP网络中&#xff0c;传统的通信过程是这样的&#xff1a;客户端向服务…

ROS2 22.04 Carttographer安装

安装环境&#xff1a; Ubuntu22.04 ros2 humble # 下载源文件 git clone https://github.com/ros2/cartographer.git -b ros2 git clone https://github.com/ros2/cartographer_ros.git -b ros2# 使用小鱼一键配置rosdep wget http://fishros.com/install -O fishros &&am…

基于SSM的本科生操行评定管理系统

文未可获取一份本项目的java源码和数据库参考。 1课题名称 基于SSM的本科生操行评定系统 1.2课题来源和选题依据 随着时代的进步和国民生活水平的不断提高&#xff0c;教育也越来越被人们所重视&#xff0c;学校应该培养品学兼优的全方位人才&#xff0c;学生的行为习惯和日…

mac安装redis实践和客户端连接失败问题解决

文章目录 参考文档和网址redis和客户端安装下载Homebrew程序Mac系统设置redis后台运行设置连接密码安装ARDM使用ARDM连接redis错误解决 参考文档和网址 redis官网命令指导文档brew官网地址brew客户端下载地址redis客户端下载地址 redis和客户端安装 下载Homebrew程序 HomeB…

golang grpc进阶

protobuf 官方文档 基本数据类型 .proto TypeNotesGo Typedoublefloat64floatfloat32int32使用变长编码&#xff0c;对于负值的效率很低&#xff0c;如果你的域有可能有负值&#xff0c;请使用sint64替代int32uint32使用变长编码uint32uint64使用变长编码uint64sint32使用变长…

大语言模型入门(一)——大语言模型智能助手

一、大语言模型智能助手 2022年末ChatGPT一经推出&#xff0c;一时间不注册个账号用一下都跟不上潮流了。然而&#xff0c;我们要注册OpenAI的账号使用ChatGPT还是一件比较麻烦的事情&#xff08;懂的都懂&#xff09;。好在&#xff0c;国内各大团队非常给力地及时推出了自研的…

野火STM32F103VET6指南者开发板入门笔记:【1】点亮RGB

硬件介绍 提示&#xff1a;本文是基于野火STM32F103指南者开发板所写例程&#xff0c;其他开发板请自行移植到自己的工程项目当中即可。 RGB-LEDPin引脚&#xff1a;低电平-点亮&#xff0c;高电平-熄灭REDPB5GREENPB0BLUEPB1 文章目录 硬件介绍软件介绍&#xff1a;结构体方式…

三、数据链路层(上)

目录 3.1数据链路层概述 3.1.1术语 3.1.2功能 3.2封装成帧和透明传输 3.2.1封装成帧 ①字符计数法 ②字符&#xff08;节&#xff09;填充法 ③零比特填充法 ④违规编码法 3.2.2透明传输 3.2.3差错控制 差错原因 检错编码 奇偶校验 ☆循环冗余码CRC 例题 纠错…

社区医院疫苗接种预约小程序管理系统SpringBoot+vue

目录 一、项目概述 二、系统架构 1. 技术栈 2. 架构图 三、后端设计 1. 数据模型 2. API 设计 四、前端设计 五、功能实现 1. 用户登录注册 2. 接种建档 3. 疫苗展示 六、总结 一、项目概述 本项目旨在为社区医院提供一个高效便捷的疫苗接种预约管理系统。系统主要…

记一次vue路由跳转登陆之前的页面,参数丢失问题

一、背景 vue3.0&#xff0c;项目登陆之前访问某个可访问的页面&#xff0c;当跳转到需要登陆才能访问的页面时&#xff0c;跳转到登陆页面&#xff0c;登陆后再跳转到登陆之前需要登陆才能访问的页面&#xff0c;跳转时发现参数丢失了。 A页面&#xff08;无需登陆&#xff…

【零基础保姆级教程】MMDetection3安装与训练自己的数据集

最近在跑对比试验&#xff0c;由于MMDetection框架的算法较齐全&#xff0c;遂决定写一篇教程留做参考。若你对流程有问题与疑问欢迎评论区指出 本文运行环境如下供参考&#xff1a; python版本3.9MMDetection版本3.3 一、虚拟环境的搭建 参考该博客搭建基本环境&#xff1…

【开源免费】基于SpringBoot+Vue.JS水果购物网站(JAVA毕业设计)

本文项目编号 T 065 &#xff0c;文末自助获取源码 \color{red}{T065&#xff0c;文末自助获取源码} T065&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析 六、核心代码6.1 查…

从认识String类,到走进String类的世界

作为一个常用的数据类型&#xff0c;跟随小编一同进入String的学习吧&#xff0c;领略String的一些用法。 1. 认识 String 类 2. 了解 String 类的基本用法 3. 熟练掌握 String 类的常见操作 4. 认识字符串常量池 5. 认识 StringBuffer 和 StringBuilder 一&#xff1a;…

【吊打面试官系列-MySQL面试题】Mysql中的事务回滚机制概述?

大家好&#xff0c;我是锋哥。今天分享关于【Mysql中的事务回滚机制概述&#xff1f;】面试题&#xff0c;希望对大家有帮助&#xff1b; Mysql中的事务回滚机制概述&#xff1f; 事务是用户定义的一个数据库操作序列&#xff0c;这些操作要么全做要么全不做&#xff0c;是一个…

职称评审一次通过需要注意什么?

谁能想到 被评委会全票通过的职称材料 居然要注意这么多细节 营业执照需要加盖公章 论文需要拆分上传 业绩需要连续提供近几年的 奖项可以加分 一些表格有模板 所以职称评审做材料还是有很多方面需要好好注意一下的&#xff0c;建议还是找机构帮你代理整理&#xff0c;因…

如何使用ssm实现基于web的网站的设计与实现+vue

TOC ssm756基于web的网站的设计与实现vue 绪论 1.1 研究背景 当前社会各行业领域竞争压力非常大&#xff0c;随着当前时代的信息化&#xff0c;科学化发展&#xff0c;让社会各行业领域都争相使用新的信息技术&#xff0c;对行业内的各种相关数据进行科学化&#xff0c;规范…