【Spark精讲】RDD特性之数据本地化

news2025/1/8 5:38:46

目录

首选运行位置

数据的本地化级别

谁来负责数据本地化

数据本地化执行流程

调优

代码中的设置方法 


首选运行位置

上图红框为RDD的特性五:每个RDD的每个分区都有一组首选运行位置,用于标识RDD的这个分区数据最好能够在哪台主机上运行。通过RDD的首选运行位置可以让RDD的某个分区的计算任务直接在指定的主机上运行,从而实现了移动计算而不是移动数据的目的减少了网络传输的开销,如Spark中HadoopRDD能够实现家装数据的任务在相应的数据节点上执行。

数据的本地化级别

package org.apache.spark.scheduler

import org.apache.spark.annotation.DeveloperApi

@DeveloperApi
object TaskLocality extends Enumeration {
  // Process local is expected to be used ONLY within TaskSetManager for now.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  type TaskLocality = Value

  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
    condition <= constraint
  }
}
  1. PROCESS_LOCAL:进程本地化,表示 task 要计算的数据在同一个 Executor 中。
  2. NODE_LOCAL:节点本地化,速度稍慢,因为数据需要在不同的进程之间传递或从文件中读取。分为两种情况,第一种:task 要计算的数据是在同一个 worker 的不同 Executor 进程中。第二种:task 要计算的数据是在同一个 worker 的磁盘上,或在 HDFS 上恰好有 block 在同一个节点上。如果 Spark 要计算的数据来源于 HDFS 上,那么最好的本地化级别就是 NODE_LOCAL。
  3. NO_PREF:没有最佳位置,数据从哪访问都一样快,不需要位置优先。比如 Spark SQL 从 Mysql 中读取数据。
  4. RACK_LOCAL:机架本地化,数据在同一机架的不同节点上。需要通过网络传输数据以及文件 IO,比 NODE_LOCAL 慢。情况一:task 计算的数据在 worker2 的 EXecutor 中。情况二:task 计算的数据在 work2 的磁盘上。
  5. ANY:跨机架,数据在非同一机架的网络上,速度最慢。

谁来负责数据本地化

val rdd1 = sc.textFile("hdfs://...") 
rdd1.cache()
rdd1.map.filter.count()

上面这段简单的代码,背后其实做什么很多事情。Driver 的 TaskScheduler 在发送 task 之前,首先应该拿到 rdd1 数据所在的位置,rdd1 封装了这个文件所对应的 block 的位置,DAGScheduler 通过调用 getPrerredLocations() 拿到 partition 所对应的数据的位置,TaskScheduler 根据这些位置来发送相应的 task。 

具体的解释:

DAGScheduler 切割Job,划分Stage, 通过调用 submitStage 来提交一个Stage 对应的 tasks,submitStage 会调用 submitMissingTasks, submitMissingTasks 确定每个需要计算的 task 的preferredLocations,通过调用 getPreferrdeLocations() 得到 partition 的优先位置,就是这个 partition 对应的 task 的优先位置,对于要提交到 TaskScheduler 的 TaskSet 中的每一个task,该 task 优先位置与其对应的 partition 对应的优先位置一致。

TaskScheduler 接收到了 TaskSet 后,TaskSchedulerImpl 会为每个 TaskSet 创建一个 TaskSetManager 对象,该对象包含taskSet 所有 tasks,并管理这些 tasks 的执行,其中就包括计算 TaskSetManager 中的 tasks 都有哪些 locality levels,以便在调度和延迟调度 tasks 时发挥作用。

总的来说,Spark 中的数据本地化是由 DAGScheduler 和 TaskScheduler 共同负责的。

数据本地化执行流程

第一步:PROCESS_LOCAL

TaskScheduler 根据数据的位置向数据节点发送 task 任务。如果这个任务在 worker1 的 Executor 中等待了 3 秒。(默认的,可以通过spark.locality.wait 来设置),可以通过 SparkConf() 来修改,重试了 5 次之后,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 PROCESS_LOCAL 降到 NODE_LOCAL。

第二步:NODE_LOCAL

TaskScheduler 重新发送 task 到 worker1 中的 Executor2 中执行,如果 task 在worker1 的 Executor2 中等待了 3 秒,重试了 5 次,还是无法执行,TaskScheduler 就会降低数据本地化的级别,从 NODE_LOCAL 降到 RACK_LOCAL。

第三步:RACK_LOCAL

TaskScheduler重新发送 task 到 worker2 中的 Executor1 中执行。

第四步:

当 task 分配完成之后,task 会通过所在的 worker 的 Executor 中的 BlockManager 来获取数据。如果 BlockManager 发现自己没有数据,那么它会调用 getRemote() 方法,通过 ConnectionManager 与原 task 所在节点的 BlockManager 中的 ConnectionManager先建立连接,然后通过TransferService(网络传输组件)获取数据,通过网络传输回task所在节点(这时候性能大幅下降,大量的网络IO占用资源),计算后的结果返回给Driver。这一步很像 shuffle 的文件寻址流程。

调优

TaskScheduler在发送task的时候,会根据数据所在的节点发送task,这时候的数据本地化的级别是最高的,如果这个task在这个Executor中等待了3秒,重试发射了5次还是依然无法执行,那么TaskScheduler就会认为这个Executor的计算资源满了,TaskScheduler会降低一级数据本地化的级别,重新发送task到其他的Executor中执行,如果还是依然无法执行,那么继续降低数据本地化的级别...

如果想让每一个 task 都能拿到最好的数据本地化级别,那么调优点就是等待时间加长。注意!如果过度调大等待时间,虽然为每一个 task 都拿到了最好的数据本地化级别,但是我们 job 执行的时间也会随之延长。

官方参数:Configuration - Spark 3.5.0 Documentation

spark.locality.wait3sHow long to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well.0.5.0
spark.locality.wait.nodespark.locality.waitCustomize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information).0.8.0
spark.locality.wait.processspark.locality.waitCustomize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process.0.8.0
spark.locality.wait.rackspark.locality.waitCustomize the locality wait for rack locality.0.8.0

代码中的设置方法 

new SparkConf.set("spark.locality.wait","100") //默认3秒

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1312908.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SQL中的三值逻辑:TRUE、FALSE 和 UNKNOWN。

在SQL中&#xff0c;通常采用三值逻辑处理条件表达式的真值。这种逻辑是基于三种可能的真值状态&#xff1a;TRUE、FALSE 和 UNKNOWN。 TRUE&#xff08;真&#xff09;&#xff1a; 表示条件为真或成立。 FALSE&#xff08;假&#xff09;&#xff1a; 表示条件为假或不成立。…

迎接数字化,亿发智能ERP管理系统解决方案,助力湖南企业精益生产提效

近年来&#xff0c;湖南省生产企业都尝试调整升级&#xff0c;进行数字化转型。因传统车间的管理业务存在着多方面的问题&#xff0c;如信息传输不畅、缺乏共享和互动功能&#xff0c;以及缺乏先进的统计、分析、预测及决策手段等&#xff0c;直接影响了公司生产效率的提高&…

LLM(六)| Gemini:谷歌Gemini Pro 开放API ,Gemini Pro 可免费使用

近期&#xff0c;Google Gemini Pro 开放API 了&#xff0c;且Gemini Pro 可免费使用&#xff01;Gemini Pro支持全球180个国家的38种语言&#xff0c;目前接受文本作为输入并生成文本作为输出。 Gemini API 地址&#xff1a;http://ai.google.dev Gemini Pro 的表现超越了其他…

shopee数据分析工具:提升电商竞争力,探索Shopee工具的价值

在如今竞争激烈的电商市场中&#xff0c;商家需要利用各种工具和策略来提高自己的竞争力。Shopee作为一家知名的电商平台&#xff0c;为商家提供了一系列强大的数据分析工具&#xff0c;帮助商家更好地了解市场趋势、优化产品策略和提高运营效果。本文将介绍几款常见的Shopee数…

Pearson、Spearman 相关性分析使用

介绍 Pearson 积差相关系数衡量了两个定量变量之间的线性相关程度。 用来衡量两个数据集的线性相关程度&#xff0c;仅当一个变量的变化与另一个变量的比例变化相关时&#xff0c;关系才是线性的。 Spearman等级相关系数则衡量分级定序变量之间的相关程度。斯皮尔曼相关系数不…

【WinForm.NET开发】使用 TableLayoutPanel 在 Windows 窗体上排列控件

本文内容 创建项目在行和列中排列控件使用停靠和定位在单元格内放置控件设置行和列属性使用控件跨越行和列通过在工具箱中双击控件将其插入自动处理溢出通过绘制控件轮廓将其插入单元格内不允许有多个控件交换控件后续步骤 某些应用程序需要这样一个窗体&#xff0c;该窗体的…

C语言之⽂件操作

一为啥需要文件&#xff1f; 如果没有⽂件&#xff0c;我们写的程序的数据是存储在电脑的内存中&#xff0c;如果程序退出&#xff0c;内存回收&#xff0c;数据就丢失了&#xff0c;等再次运⾏程序&#xff0c;是看不到上次程序的数据的&#xff0c;如果要将数据进⾏持久化的保…

docker consul容器自动与注册

微服务&#xff08;容器&#xff09;注册与发现&#xff1a;是一种分布式的管理系统&#xff0c;定位服务的方法。 在传统架构当中&#xff0c;应用程序之间直连到已知服务&#xff0c;设备提供的网络&#xff1a;IP地址&#xff0c;基于tcp/ip&#xff0c;端口&#x…

关于linux 磁盘占用排查问题

1.关于磁盘 查看整体磁盘占用大小 df -h 2. 先排除mysql 数据大小 查询库的大小 SELECT table_schema AS "Database", ROUND(SUM(data_length index_length) / 1024 / 1024, 2) AS "Size (MB)" FROM information_schema.TABLES GROUP BY table_schema…

认识产品经理以及Axure简单安装与入门

目录 一.认识产品经理 1.1.项目团队 1.2.概述 1.3.认识产品经理 1.4.产品经理工作范围 1.5.产品经理工作流程 1.6.产品经理的职责 1.7.产品经理的分类 1.8.产品经理能力要求 1.9.产品工具 1.10.产品体验报告 二.Axure简介 三.应用场景 四.安装与汉化 4.1.安装 4…

Vue3-19-组件-定义和基本使用

组件的定义 个人理解 &#xff1a;1、组件&#xff0c;就是我们把某个功能模块进行封装&#xff0c;在使用时直接引入进行使用&#xff0c;极大的提高了代码的可复用性。2、在vue 中&#xff0c;一个 [.vue] 文件&#xff0c;就是一个组件。3、组件之间存在【引入】 与 【被引…

Vue.js 使用基础知识

Vue.js 是一款用于构建用户界面的渐进式框架&#xff0c;它专注于视图层。Vue.js 不同于传统的 JavaScript 框架&#xff0c;它采用了组件化的开发方式&#xff0c;使得开发者可以更加高效和灵活地构建交互式的 Web 应用程序。 目录 什么是 Vue.js安装 Vue.jsVue 实例模板语法插…

数据分析为何要学统计学(7)——什么问题适合使用t检验?

t检验&#xff08;Students t test&#xff09;&#xff0c;用于通过小样本&#xff08;样本容量n < 30&#xff09;对总体均值水平进行无差异推断。 t检验要求样本不能超过两组&#xff0c;且每组样本总体服从正态分布&#xff08;对于三组以上样本的&#xff0c;要用方差…

获取Java类路径

利用System.getProperty(“java.class.path”)可以获取Java类路径&#xff08;Java class path&#xff09;。 package com.thb;import java.io.IOException;public class Test5 {public static void main(String[] args) throws IOException {System.out.println(System.getP…

MySQL数据库 DDL

目录 一、DDL 二、操作数据库 三、操作表 四、数据类型 五、表操作案例 六、修改表 七、删除表 一、DDL Data Definition Language&#xff0c;数据定义语言&#xff0c;用来定义数据库对象(数据库&#xff0c;表&#xff0c;字段) 。 二、操作数据库 &#xff08;1&am…

Linux学习第47天:Linux音频驱动试验:能不能?不行也得行。

Linux版本号4.1.15 芯片I.MX6ULL 大叔学Linux 品人间百味 思文短情长 CAN 是目前应用非常广泛的现场总线之一&#xff0c;主要应用于汽车电子和工业领域&#xff0c;尤其是汽车 领域&#xff0c;汽车上大量的传感器与模块都是通过 C…

OceanMind海睿思案例入选第二届中国数据治理年会“DCMM百项优秀案例”

近日&#xff0c;中国电子信息行业联合会在北京成功举办“第二届中国数据治理年会”。 本届大会以“数据强基、智领未来”为主题&#xff0c;汇聚我国数据治理领域的资深专家、学者、企业大咖同台论道&#xff0c;共话数据未来的发展与创新。 中新赛克海睿思作为DCMM3级乙方代…

Python自动化批量篆刻Polygon动物铭文$ANTS

铭文介绍 Polygon马蹄链动物主题铭文 A N T S 总量 2100 w 张&#xff0c;当前还剩余 76 ANTS 总量2100w张&#xff0c;当前还剩余76%&#xff0c;成本很低0.003MATIC一张&#xff0c;可以打了防身。 BRC20比特币铭文生态有RATS老鼠大军&#xff0c;PRC20马蹄有ANTS蚂蚁大军&a…

浅析:智能化视频安全监管系统的设计与实现步骤

关于智能化视频监管方案&#xff0c;小编已经和大家分享了很多&#xff0c;今天就和大家来探讨一下关于智能化视频安全监管系统的设计与实现步骤。 首先需要分析需求。要与使用者和业务部门合作&#xff0c;明确系统的功能和需求&#xff0c;例如&#xff0c;确定监控区域、安…

vsftp 使用虚拟用户 —— 筑梦之路

很久之前写过一遍安装vsftp的文章&#xff1a; CentOS 7 vsftpd服务器搭建记录——筑梦之路-CSDN博客 安装一条命令就可以搞定&#xff0c;这里不再赘述。 配置vsftpd.conf # /etc/vsftpd/vsftpd.conf文件修改以下配置#不允许匿名用户认证 anonymous_enableNO #NO表示所有用…