Spark 内存运用

news2024/9/23 17:22:32

RDD Cache

当同一个 RDD 被引用多次时,就可以考虑进行 Cache,从而提升作业的执行效率

// 用 cache 对 wordCounts 加缓存
wordCounts.cache
// cache 后要用 action 才能触发 RDD 内存物化
wordCounts.count

// 自定义 Cache 的存储介质、存储形式、副本数量
wordCounts.persist(MEMORY_ONLY)

Spark 的 Cache 机制 :

  • 缓存的存储级别:限定了数据缓存的存储介质,如 : 内存、磁盘
  • 缓存的计算过程:从 RDD 展开到分片 (Block),存储于内存或磁盘的过程
  • 缓存的销毁过程:缓存数据主动或被动删除的内存或磁盘的过程

存储级别

Spark 支持的存储级别:

  • RDD 缓存的默认存储级别:MEMORY_ONLY
  • DataFrame 缓存的默认存储级别:MEMORY_AND_DISK
存储级别存储介质存储形式副本设置
内存磁盘对象值序列化
MEMORY_ONLY1
MEMORY_ONLY_22
MEMORY_ONLY_SER1
MEMORY_ONLY_SER_22
DISK_ONLY1
DISK_ONLY_22
DISK_ONLY_33
MEMORY_AND_DISK1内存以对象值存储,磁盘以序列化
MEMORY_AND_DISK_22
MEMORY_AND_DISK_SER1内存/磁盘都以序列化的字节数组存储
MEMORY_AND_DISK_SER22

计算过程

缓存的计算过程 :

  • MEMORY_AND_DISK :先把数据集全部缓存到内存,内存不足时,才把剩余的数据落磁盘
  • MEMORY_ONLY :只把数据往内存里塞

内存中的存储过程 :

在这里插入图片描述

销毁过程

缓存的销毁过程 :

  • 缓存抢占 Execution Memory 空间,会进行缓存释放

Spark 清除缓存的原则:

  • LRU:按元素的访问顺序,优先清除那些最近最少访问的 BlockId、MemoryEntry 键值对
  • 在清除时,同属一个 RDD 的 MemoryEntry 不会清除

Spark 实现 LRU 的数据结构:LinkedHashMap , 内部有两个数据结构

  • HashMap : 用于快速访问,根据指定的 BlockId,O(1) 返回 MemoryEntry
  • 双向链表 : 用于维护元素(BlockId 和 MemoryEntry 键值对)的访问顺序

Spark 会释放 LRU 的 MemoryEntry :

在这里插入图片描述

Cache 注意点

用 Cache 的基本原则 :

  • RDD/DataFrame/Dataset 的引用数为 1,坚决不用 Cache
  • 当引用数大于 1,且运行成本超过 30%,就考虑用 Cache

运行成本占比 : 计算某个分布式数据集要消耗的总时间与作业执行时间的比值

  • 端到端的执行时间为 1 小时
  • DataFrame 被引用了 2 次
  • 从读取数据源到生成该 DataFrame 花了 12 分钟
  • 该 DataFrame 的运行成本占比 = 12*2/60 = 40%

用 noop 计算 DataFrame 运行时间 :

df.write
  .format("noop")
  .save()

.cache 是惰性操作,在调用 .cache后,要先用 count 才能触发缓存的完全物化

Cache 要遵循最小公共子集原则 :

val filePath: String = _
val df: DataFrame = spark.read.parquet(filePath)

//Cache方式一
val cachedDF = df.cache
//数据分析
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)

两个查询的 Analyzed Logical Plan 不一致,无法缓存复用

//Cache方式二
df.select(col1, col2).filter(col2 > 0).cache
//数据分析
df.filter(col2 > 0).select(col1, col2)
df.select(col1, col2).filter(col2 > 100)

Analyzed Logical Plan 完全一致,能缓存复用

//Cache方式三
val cachedDF = df.select(col1, col2).cache
//数据分析
cachedDF.filter(col2 > 0).select(col1, col2)
cachedDF.select(col1, col2).filter(col2 > 100)

及时清理 Cache :

  • 异步模式 (常用):调用 unpersist() 或 unpersist(False)
  • 同步模式:调用 unpersist(True)

OOM

OOM的具体区域 :

  • 发生 OOM 的 LOC(Line Of Code),代码位置
  • OOM 发生在 Driver 端,还是在 Executor 端
  • 在 Executor 端的哪片内存区域

Driver OOM

Driver 端的 OOM 位置 :

  • 创建小规模的分布式数据集:使用 parallelize、createDataFrame 创建数据集
  • 收集计算结果:通过 take、show、collect 把结果收集到 Driver 端

Driver 端的 OOM 原因:

  • 创建的数据集超过内存上限
  • 收集的结果集超过内存上限

广播变量的创建与分发 :

在这里插入图片描述

广播变量的数据拉取就是用 collect 。当数据总大小超过 Driver 端内存时 , 就报 OOM :

java.lang.OutOfMemoryError: Not enough memory to build and broadcast

对结果集尺寸预估,适当增加 Driver 内存配置

  • Driver 内存大小 : spark.driver.memory

查看执行计划 :

val df: DataFrame = _
df.cache.count

val plan = df.queryExecution.logical
val estimated: BigInt = spark
  .sessionState
  .executePlan(plan)
  .optimizedPlan
  .stats
  .sizeInBytes

Executor OOM

当 Executors OOM 时,定位 Execution Memory 和 User Memory

User Memory OOM

User Memory 用于存储用户自定义的数据结构,如: 数组、列表、字典

当自定义数据结构的总大小超出 User Memory 上限时,就会报错

java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf
java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newInstance

计算 User Memory 消耗时,要考虑 Executor 的线程池大小

  • 当 dict 大小为 #size , Executor 线程池大小为 #threads
  • dict 对 User Memory 的总消耗:#size * #threads
  • 当总消耗超出 User Memory 上限,就会 OOM
val dict = List("spark", "tune")
val words = spark.sparkContext.textFile("~/words.csv")

val keywords = words.filter(word => dict.contains(word))
keywords.map((_, 1)).reduceByKey(_ + _).collect

自定义数据分发 :

  • 自定义的列表 dict 会随着 Task 分发到所有 Executors
  • 多个 Task 的 dict 会对 User Memory 产生重复消耗

在这里插入图片描述

解决 User Memory OOM 的思路 :

  • 先对数据结构的消耗进行预估
  • 相应地扩大 User Memory

UserMemory 总大小 = spark.executor.memory * (1 - spark.memory.fraction)

Execution Memory OOM

Execution Memory OOM 常见实例:数据倾斜和 数据膨胀

配置说明:

  • 2 个 CPU core,每个 core 有两个线程,内存大小为 1GB
  • spark.executor.cores = 3,spark.executor.memory = 900MB
  • Execution Memory = 180MB
  • Storage Memory = 180MB
  • Execution Memory 上限 = 360MB

数据倾斜

3 个 Reduce Task 对应的数据分片大小分别是 100MB , 100MB , 300MB

  • 当 Executor 线程池大小为 3,所以每个 Reduce Task 最多 360MB * 1/3 = 120MB

数据倾斜导致OOM :

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NHRj4w40-1677761995168)(…/…/png/%E5%86%85%E5%AD%98%E8%BF%90%E7%94%A8/image-20230209210919876.png)]

2 种调优思路:

  • 消除数据倾斜,让所有的数据分片尺寸都小于 100MB
  • 调整 Executor 线程池、内存、并行度等相关配置,提高 1/N 上限到 300MB

在 CPU 利用率高下解决 OOM :

  • 维持并发度、并行度不变,增大执行内存设置,提高 1/N 上限到 300MB
  • 维持并发度、执行内存不变,提升并行度把数据打散,将所有的数据分片尺寸都缩小到 100MB 内

数据膨胀

3 个 Map Task 对应的数据分片大小都是 100MB

数据膨胀导致OOM :

在这里插入图片描述

2 种调优思路:

  • 把数据打散,提高数据分片数量、降低数据粒度,让膨胀后的数据量降到 100MB 左右
  • 加大内存配置,结合 Executor 线程池调整,提高 1/N 上限到 300MB

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/383044.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【OJ比赛日历】快周末了,不来一场比赛吗? #03.04-03.10 #12场

CompHub 实时聚合多平台的数据类(Kaggle、天池…)和OJ类(Leetcode、牛客…)比赛。本账号同时会推送最新的比赛消息,欢迎关注!更多比赛信息见 CompHub主页 或 点击文末阅读原文以下信息仅供参考,以比赛官网为准目录2023-03-04&…

从100%进口到自主可控,从600块降到10块,中科院攻克重要芯片

前言 2月28日,“20多位中科院专家把芯片价格打到10块”冲上微博热搜,据河南省官媒大象新闻报道,热搜中提到的中科院专家所在企业为全球最大的PLC分路器芯片制造商仕佳光子,坐落于河南鹤壁。 为实现芯片技术自主可控自立自强&#…

根据栅格数据的范围和像元大小生成等比例的矢量数据

为啥会有这么一个需求呢,后面要是继续写的话会详细说,首先是有一个栅格数据,比如这样的:我的目标是这样的(矢量),就是这样,上面的栅格数据像大小是2*2的,直接上代码&…

【JAVA程序设计】【C00109】基于SSM(非maven)的员工工资管理系统

基于SSM(非maven)的员工工资管理系统项目简介项目获取开发环境项目技术运行截图项目简介 基于ssm框架非maven开发的企业工资管理系统共分为二个角色:系统管理员、员工 管理员角色包含以下功能: 系统后台登陆、管理员管理、员工信…

Linux基础命令-nice调整进程的优先级

文章目录 Nice 命令介绍 语法格式 常用参数 参考实例 1 调整bash的优先级为-10 2 调整脚本的优先级为6 3 调整指令的优先级 4 默认使用nice命令调整优先级 命令总结 Nice 命令介绍 nice命令的主要功能是用于调整进程的优先级,合理分配系统资源。Linux系…

torchaudio的I/O函数

info、load、save1.1 infotorchaudio.info(filepath: str, ...)Fetch meta data of an audio file. Refer to torchaudio.backend for the detail.返回音频文件的meta信息这里的meta元信息包括采样率、帧数、通道数、量化位数、音频格式info torchaudio.info(rE:\adins\data\2…

Java使用DFA算法实现敏感词过滤

1 前言敏感词过滤就是你在项目中输入某些字(比如输入xxoo相关的文字时)时要能检测出来,很多项目中都会有一个敏感词管理模块,在敏感词管理模块中你可以加入敏感词,然后根据加入的敏感词去过滤输入内容中的敏感词并进行…

测试软件5

一 css基础 css定义:可以设置网页中的样式,外观,美化 css中文名字:级联样式表,层叠样式表,样式表 二 css基础语法 1.style标签写在title标签后面 2.选择器{属性名1:属性值1;属性名…

ChatGPT最牛应用,让它帮你更新网站新闻吧!

谁能想到,ChatGPT火了!既能对话入流,又能写诗歌论文、出面试题、编代码,甚至还通过了谷歌面试拿到L3工程师offer,放在一年之前,没人相信这是当前AI能够达到的水平。ChatGPT自面世以来,凭借其极为…

【数据结构初阶】手把手带你实现栈

前言 在进入数据结构初阶的学习之后,我们学习了顺序表和链表,当然栈也是一种特殊的数据结构,他的特点是后进先出。 栈的概念及结构 栈(stack)又名堆栈,它是一种运算受限的线性表。限定仅在表尾进行插入和删…

iptables的介绍

iptables简介 1、 什么是iptables? iptables是linux防火墙工作在用户空间的管理工具,是 netfilter/iptables IP信息包过滤系统的一部分,用来设置、维护和检查Linux内核的IP数据包过滤规则 2、 iptables特点 iptables是基于内核的防火墙&…

【pytorch 入门系列】02 手把手多分类从0到1

温故而知新,通过手把手写一个多分类任务来复习之前所学过的知识。 前置知识 factorize的妙用:把文本数据枚举化 labels, uniques pd.factorize([b, b, a, c, b]) labels,uniques(array([0, 0, 1, 2, 0]), array([‘b’, ‘a’, ‘c’], dtypeobject))…

【C++】-- 特殊类设计

对于类的思维境界提升,没有太大的实际意义,但是锻炼思想。 目录 单例模式 饿汉模式 懒汉模式 #:请设计一个类,不能被拷贝。 拷贝只会发生在两个场景中:拷贝构造函数赋值运算符重载因此想要让一个类禁止拷贝&#xf…

对称锥规划:对称锥的增广拉格朗日乘子法(Semi-Smooth Newton Method解无约束优化子问题)

文章目录对称锥规划:对称锥的增广拉格朗日乘子法(Semi-Smooth Newton Method解无约束优化子问题)对称锥的增广拉格朗日函数Semi-Smooth Newton Method半光滑牛顿法广义雅可比半光滑性半光滑牛顿算法参考文献对称锥规划:对称锥的增…

2023年最新阿里云服务器价格表出炉(精准收费标准及配置价格表)

阿里云在全球率先宣布了基于 Intel Ice Lake 处理器的第七代云服务器ECS,性能提升的同时降低了报价,性价比更高了。进入2023年阿里云服务器价格依然是大家关心的问题,事实上阿里云服务器租用价格和最新收费标准都可以通过官方云服务器计算器来…

【IoT】智能烟雾报警器

设计简介 硬件设计由AT89C51单片机、DS18B20温度传感器、4位共阳数码管、电源模块、报警模块、按键模块、MQ-2烟雾检测模块和ADC0832模数转换模块组成。 烟雾传感器MQ-2检测空气中的烟雾气体,通过ADC0832进行数据转换,经过单片机的运算处理后在数码管上…

【WEB前端进阶之路】 HTML 全路线学习知识点梳理(上)

前言 HTML 是一切Web开发的基础,本文专门为小白整理,针对前端零基础的朋友们,手把手教你学习HTML,让你轻松迈入WEB开发的行列。 首先,感谢 橙子_ 在HTML学习以及本文编写过程中对我的帮助。 文章目录前言一.HTML简介1.…

Java使用不同方式获取两个集合List的交集、补集、并集(相加)、差集(相减)

1 明确概念首先知道几个单词的意思:并集 union交集 intersection补集 complement析取 disjunction减去 subtract1.1 并集对于两个给定集合A、B,由两个集合所有元素构成的集合,叫做A和B的并集。记作:AUB 读作“A并B”例&#…

微纳制造技术——基础知识

1.什么是直接带隙半导体和间接带隙半导体 导带底和价带顶处以同一K值,称为直接带隙半导体 导带底和价带顶不处在同一K值,称为间接带隙半导体 2.扩散和漂移的公式 3.三五族半导体的性质 1.high mobility 2.wide bandgap 3.direct bandgap 4.三五族…

SWM181 串口功能使用介绍

SWM181 串口功能使用介绍📌SDK固件包:https://www.synwit.cn/kuhanshu_amp_licheng/✨注意新手谨慎选择作为入门单片机学习。🌼开发板如下图: 📋SWM181描述上写了有4个串口,在数据手册上,将引脚…