Spark Shuffle Tracking 原理分析

news2024/10/6 8:24:30

Shuffle Tracking

Shuffle Tracking 是 Spark 在没有 ESS(External Shuffle Service)情况,并且开启 Dynamic Allocation 的重要功能。如在 K8S 上运行 spark 没有 ESS。本文档所有的前提都是基于以上条件的。

如果开启了 ESS,那么 Executor 计算完后,把 shuffle 数据交给 ESS, Executor 没有任务时,可以安全退出,下游任务从 ESS 拉取 shuffle 数据。

1. 背景

如果 Executor 执行了上游的 Shuffle Map Task 并且把 shuffle 数据些到本地。并且现在 Executor 没有 Task 运行,那么此 Executor 是否能销毁?

现状是如果 Executor 没有 active 的 shuffle 数据,则可以被销毁。
active shuffle 的定义:如果 Shuffle Map Stage 的 task 把 shuffle 数据输出到本地。如果依赖此 shuffle 的Stage 没有计算完毕,则称此 shuffle 为 active shuffle。因为依赖此 shuffle 的 Task 可能从 Driver 端获取了 MapStatus,但是还没有拉取完 shuffle 数据。

为了达到此目的,需要跟踪每个 Stage 和每个 Task 的运行信息。并且启动定时任务,定时扫描每个 Executor,判断是否有任务运行,是否有 active 的 shuffle,如果没有则可以退出。

退出有两种,如果开启了 decommission,则到期的 executors 进入 decommission 模式,否则执行 killExecutors。

参数配置

spark.dynamicAllocation.shuffleTracking.enabled: 默认 true,是否开启 shuffle tracking。
spark.dynamicAllocation.shuffleTracking.timeout: 默认 Long.MaxValue,

2. 设计

ExecutorMonitor 为每个 Executor 创建一个 Tracker, 用于跟踪此 Executor 的状态。

private val executors = new ConcurrentHashMap[String, Tracker]()

定时任务间隔时间查找 timeout 的 executor,然后处理。

timedOutExecutors 方法的主要逻辑,就是遍历 executors。如果 executor 没有 active 的 shuffle 并且当前时间大于 executor 的超时时间 timeoutAt,则此 executor 可以被安全释放。

为什么 executor 有 active shuffle 数据就不能 kill?
在这里插入图片描述

  • Shuffle 的过程:
  1. MapTask 把 shuffle 写到本地,并且把状态汇报给 Driver.
  2. Reduce Task 从 Driver 获取 shuffle status,并从 shuffle status 获取每个 shuffle 数据的地址。
  3. 连接对应的 executor 获取 shuffle 数据。

如果在 reduce 获取完 shuffle status 后,MapTask 所在的 Executor 被 kill 掉,Reduce Task 就无法获取 shuffle 数据。

如果执行 decommission 逻辑,把 MapTask 的 shuffle 数据长传到 bos 等分布式存储是否可以?

也是不可以的,因为 reduce 可能已经把 shuffle status 拿走,获取的 shuffle status 没有记录 shuffle 数据在分布式存储上。

参考: ExecutorMonitor,ExecutorAllocationManager

Executor 状态的更新

ExecutorMonitor 实现了 SparkListner 接口,当 Job, Stage, Task 等 start 和 end 时,都会执行回调。

以 hasActiveShuffle 为例
每个 executor 用一个集合 shuffleIds 存储其上拥有的 shuffle 数据。 当其为空时,说明没有 shuffle 数据。

在 onTaskEnd 和 onBlockUpdated 时调用 addShuffle 向 shuffleIds 添加数据。

在以下时机删除 shuffleIds 里的数据。

  1. 依赖 driver 端的 ContextCleaner,当 ShuffleRDD 仅有 weakReference 时触发。
  2. rdd.cleanShuffleDependencies 方法,但是此方法仅在 org.apache.spark.ml.recommendation.ALS 使用。

timeoutAt 的计算逻辑

总结:timeoutAt 根据 idle 的时间,spark.dynamicAllocation.cachedExecutorIdleTimeout 和 spark.dynamicAllocation.shuffleTracking.timeout 这 3 个值中最大的值。

详细计算逻辑:
timeoutAt 在一些事件发生时触发计算,如 onBlockUpdated, onUnpersistRDD, updateRunningTasks, removeShuffle, updateActiveShuffles
timeoutAt 的计算逻辑:
当执行器有计算任务时 为 Long.MaxValue。
否则为 max(_cacheTimeout, _shuffleTimeout, idleTimeoutNs)
_cacheTimeout: 如果没有 cache 数据,为0,否则为参数 spark.dynamicAllocation.cachedExecutorIdleTimeout 的值(默认 Long.MaxValue)。

_shuffleTimeout: 如果没有 shuffle数据,为 0, 否则为参数 spark.dynamicAllocation.shuffleTracking.timeout 的值(默认 Long.MaxValue)。
idleTimeoutNs 为 spark.dynamicAllocation.executorIdleTimeout

3. 测试

测试命令

spark-shell  \
 --conf spark.dynamicAllocation.enabled=true \
 --conf spark.dynamicAllocation.initialExecutors=2 \
 --conf spark.dynamicAllocation.maxExecutor=400 \
 --conf spark.dynamicAllocation.minExecutors=1 \
 --conf spark.shuffle.service.enabled=false \
 --conf spark.dynamicAllocation.shuffleTracking.enabled=true

参考资料:

https://www.waitingforcode.com/apache-spark/what-new-apache-spark-3-shuffle-service-changes/read

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1479958.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C#用户界面,UI设置密码隐藏显示

两种方法,第一种,在文本框属性设置UseSystemPasswordChar为True。 这里默认是以黑点隐藏显示。 第二种,在文本框属性设置隐藏显示的符号,这里设置为星号*。

探索Android多屏互动技术:构建无缝交互体验

探索Android多屏互动技术:构建无缝交互体验 1. 简介 在当前移动设备和智能家居应用中,多屏互动技术已经成为一个备受关注的话题。随着移动设备(如智能手机、平板电脑)和智能家居设备的普及,用户对于多屏协同工作、娱…

基于springboot实现流浪动物救助网站系统项目【项目源码+论文说明】

基于springboot实现流浪动物救助网站系统演示 摘要 然而随着生活的加快,也使很多潜在的危险日益突显出来,比如在各种地方会发现很多无家可归的、伤痕累累的、可怜兮兮的动物,当碰到这种情况,是否会立马伸出双手去帮助、救助它们&…

重磅功能!EasyBoss ERP正式接入Lazada本土店全托管,赋能商家轻松出海东南亚

近两年,在跨境电商圈出现了一个新的名词——“全托管模式”,随着部分跨境电商巨头借助全托管模式大获成功,全托管在跨境电商领域掀起一股热潮,吸引着越来越多的平台与卖家布局探索。 作为东南亚头部电商平台的Lazada也在2023年4月…

uniapp 安卓保活功能原生插件

插件介绍 安卓保活原生插件,多种技术保活方案大幅提高保活效率,支持多任务app隐藏,息屏保活,清理后继续保活等 插件地址 安卓保活功能原生插件 - DCloud 插件市场 详细使用文档 uniapp 安卓保活功能原生插件 超级福利 uniapp…

基于JAVA的毕业设计分配选题系统 开源项目

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 专业档案模块2.2 学生选题模块2.3 教师放题模块2.4 选题审核模块 三、系统展示四、核心代码4.1 查询专业4.2 新增专业4.3 选择课题4.4 取消选择课题4.5 审核课题 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpri…

四川易点慧电子商务有限公司抖音小店:信赖之选,品质保障

随着电子商务的飞速发展,越来越多的消费者选择在网上购物。四川易点慧电子商务有限公司作为业内佼佼者,其抖音小店自开业以来,就以其卓越的产品品质、完善的售后服务和高效的物流体系,赢得了广大消费者的信赖和好评。 一、品质至上…

远程扫描如何在 Dynamic Web TWAIN 中工作

您是否厌倦了被办公室扫描仪束缚?Dynamic Web TWAIN 版本 18 带来了令人兴奋的远程扫描功能,让您摆脱束缚。借助远程扫描,您现在可以在整个办公网络中共享扫描仪,并通过浏览器从任何设备启动扫描作业。 点击下载Dynamsoft最新版h…

k8s-项目测试环境部署

部署规划 概述 项目开发好后,我们需要部署,我们接下来就基于 阿里云云效 阿里云容器镜像服务 k8s 搭建部署环境 阿里云云效 : 放代码,可以做cicd(https://www.aliyun.com/product/yunxiao) 阿里云容器镜像服务 :…

初学selenuim[1]($x(‘xpath语法’)、WebDriverWait())

文章目录 初学selenuim记录1、执行driver webdriver.Chrome()后很久才打开浏览器2、浏览器多元素定位 $x(‘xpath语法’)3、打开浏览器driver.get("网址")执行了很久才开始定位元素:等待(1)driver.set_page_load_timeout(t)&#…

C语言可以干些什么?C语言主要涉及哪些IT领域?

C语言可以干些什么?C语言主要涉及哪些IT领域? 在开始前我有一些资料,是我根据网友给的问题精心整理了一份「C语言的资料从专业入门到高级教程」, 点个关注在评论区回复“888”之后私信回复“888”,全部无偿共享给大家…

代码随想录算法训练营第五天

● 自己看到题目的第一想法 242. 有效的字母异位词 方法&#xff1a; 方法一&#xff1a; 暴力法 1. 分别对s, t排序 2. 遍历s与t 判断s[i]!t[i] 返回 false 否则 返回true思路&#xff1a; 注意&#xff1a; 代码&#xff1a; bool cmp(char a, char b){return a<b;…

RK3568 Android12 适配抖音 各大APP

RK3568 Android12 适配抖音 各大APP SOC RK3568 system:Android 12 平台要适配抖音和各大APP 平台首先打开抖音发现摄像头预览尺寸不对只存在右上角,我将抖音APP装在手机上预览,发现是全屏 一开始浏览各大博客 给出的解决方法是修改framework 设置为全屏显示: framewo…

Jenkins自动化部署之流水线模式部署

文章目录 任务类型Pipeline流水线项目声明式的Pipeline脚本式Pipeline 示例脚本生成Tools配置示例 高级Pipeline Script from SCM 任务类型 在Jenkins中&#xff0c;有不同类型的任务&#xff08;项目&#xff09;适用于不同的构建需求。以下是一些常见的Jenkins任务类型&…

(vue)el-checkbox 实现展示区分 label 和 value(展示值与选中获取值需不同)

(vue)el-checkbox 实现展示区分 label 和 value&#xff08;展示值与选中获取值需不同&#xff09; 后端数据 解决方法 在 el-checkbox 标签中间传入要展示的文本即可&#xff0c;代码如下&#xff1a; <el-checkbox-groupv-model"formInline.processFieldList"…

章鱼网络 Community Call #18|Omnity 将首先支持 Runes 协议资产跨链

香港时间2024年2月8日12点&#xff0c;章鱼网络举行第18期 Community Call。 2024年&#xff0c;我们打开一个良好的局面&#xff1a;$NEAR Restaking 已经完成第三方审计&#xff0c;并且经过几次迭代&#xff0c;进入了正式稳定运行的阶段。更重要的是&#xff0c;我们宣布了…

基于移动端的食堂助餐在线点餐配送系统 uniapp微信小程序

本文从管理员、老人、配送员、食堂商家的功能要求出发&#xff0c;养老助餐管理系统小程序中的功能模块主要是实现老人、配送员、食堂商家、食堂大厅、预约选座、餐号信息、美食信息、美食订单、订单信息、订单配送、订单评价、老人食堂、下单信息、饮食分析。经过认真细致的研…

免费下载《金融行业数据安全交换解决方案白皮书》

金融行业包括商业银行业务、证券业务、保险业务、基金业务、信托业务等&#xff0c;因此数据类型多种多样&#xff0c;并且数据涉及主体众多&#xff0c;应用场景上较为多样复杂&#xff0c;在数据交换上存在安全、合规、可控、可靠、高效等需求。 金融行业会面临哪些数据安全…

【FPGA/IC】RAM-Based Shift Register Xilinx IP核的使用

前言 一般来讲&#xff0c;如果要实现移位寄存器的话&#xff0c;通常都是写RTL用reg来构造&#xff0c;比如1bit变量移位一个时钟周期就用1个reg&#xff0c;也就是一个寄存器FF资源&#xff0c;而移位16个时钟周期就需要16个FF&#xff0c;这种方法无疑非常浪费资源。 Xili…

Goose:Golang中的数据库迁移工具

Goose&#xff1a;Golang中的数据库迁移工具 在Golang开发中&#xff0c;数据库迁移是一个常见的任务&#xff0c;用于管理数据库模式的演化和版本控制。Goose是一个轻量级的、易于使用的数据库迁移工具&#xff0c;专为Golang开发者设计。本文将介绍Goose的基本概念、用法和优…