深入理解Spark BlockManager:定义、原理与实践

news2024/9/24 15:21:22

深入理解Spark BlockManager:定义、原理与实践

1.定义

Spark是一个开源的大数据处理框架,其主要特点是高性能、易用性以及可扩展性。在Spark中,BlockManager是其核心组件之一,它负责管理内存和磁盘上的数据块,并确保这些数据块在集群中的各个节点上可以高效地共享和访问,其中包括存储、复制、序列化和反序列化数据块,并且负责将这些数据块分发到集群中的各个节点上,以便进行计算。BlockManager还处理数据块的缓存和回收,以及故障恢复和数据迁移等任务。

因为Spark是分布式的计算引擎,因此BlockManager也是一个分布式组件,各个节点(Executor)上都有一个BlockManger实例,管理着当前Executor的数据及元数据进行处理及维护,比如我们常说的block块的增删改的操作,都会在BlockManager上做相应的元素局的变更。而Executor上的BlockManager实例是由Driver端上的BlockManagerMaster统一管理,其关系类似于我们常说的NameNode和DataNode之间的关系。我们知道Spark本身有很多的模块,比如Scheduler调度模块,Standalone资源管理模块等,而BlockManager就是其中非常重要的模块,其源码量也是非常的巨大。总而言之,spark BlockManager是负责Spark上所有的数据的存储与管理的一个极其重要的组件。

2.原理分析

2.1 数据块的管理

在Spark中,每个数据块都有唯一的标识符,称为BlockId。BlockManager通过维护数据块的元数据来管理这些数据块,包括数据块的类型、大小、版本号、所在节点等信息。当一个节点需要访问一个数据块时,它会向BlockManager发送请求,BlockManager根据数据块的标识符和元数据来定位数据块所在的节点,并返回数据块的引用。

sealed abstract class BlockId {
  // 全局唯一的block的名字
  def name: String

  // convenience methods
  def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None

  // 一下判断不同类型的Block,可能是RDD、Shuffle、Broadcast之一
  def isRDD: Boolean = isInstanceOf[RDDBlockId]

  def isShuffle: Boolean = {
    (isInstanceOf[ShuffleBlockId] || isInstanceOf[ShuffleBlockBatchId] ||
     isInstanceOf[ShuffleDataBlockId] || isInstanceOf[ShuffleIndexBlockId])
  }

  def isShuffleChunk: Boolean = isInstanceOf[ShuffleBlockChunkId]

  def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]

  override def toString: String = name
}

2.2 数据块的存储

我们知道,Spark中的数据块可以存在于内存或磁盘中,对于小数据块,BlockManager会优先将其存储在内存中,以提高访问速度;对于大数据块,则会将其存储在磁盘上。BlockManager还支持将数据块存储在外部存储系统中,如HDFS、S3等。

class StorageLevel private(
    // 磁盘
private var _useDisk: Boolean,
// 内存
private var _useMemory: Boolean,
// 堆外内存
private var _useOffHeap: Boolean,
// 是否序列化
private var _deserialized: Boolean,
// block默认副本
private var _replication: Int = 1)
extends Externalizable

2.3 数据块的复制

为了保证数据块的可靠性和高可用性,BlockManager会自动将一些数据块复制到其他节点上,以免数据丢失或节点故障导致数据无法访问。复制策略可以根据具体需求进行配置,例如可以设置副本数、复制间隔、复制位置等。

2.4 数据块的序列化和反序列化

在Spark中,数据块经常需要在不同的节点之间传输和共享,因此需要进行序列化和反序列化。BlockManager提供了常用的序列化和反序列化方式,包括Java序列化、Kryo序列化等。

图片

2.5 数据块的缓存和回收

为了提高计算效率,BlockManager还支持将一些常用的数据块缓存在内存中,以避免频繁地从磁盘或外部存储系统中读取数据块。同时,BlockManager还会定期清除一些不再使用的数据块,以释放资源。

2.6 故障恢复和数据迁移

当一个节点出现故障或者网络出现问题时,BlockManager会自动进行故障恢复,将丢失的数据块重新复制到其他节点上。此外,在集群扩容或缩容时,BlockManager还支持数据迁移,以保证数据块的平衡分布。 

2.7 运行原理图

图片

3.代码解读

Spark的BlockManager主要由以下两个类实现:

BlockManagerMaster:负责管理集群中所有节点的BlockManager,并协调各个节点之间的数据块复制和迁移等操作。

BlockManager:负责管理本地节点的数据块,包括数据块的存储、缓存、序列化和反序列化等操作。

接下来,我们重点分析BlockManager,BlockManager的代码主要位于Spark的存储模块中。以下是BlockManager的主要代码结构:

  • BlockManagerMaster:这是BlockManager的主节点,它负责管理所有的数据块。BlockManagerMaster会与每个工作节点上的BlockManager进行通信,了解每个数据块的位置和状态。

  • BlockManagerWorker:这是BlockManager的工作节点,它负责管理本地的数据块。BlockManagerWorker会与BlockManagerMaster进行通信,报告本地数据块的状态。

  • BlockInfo:这是表示一个数据块的信息,包括数据块的大小、位置、副本数等。

  • BlockManager:这是实际执行数据块管理操作的类,它提供了读取、写入、删除数据块的方法。

下面是BlockManager的关键代码解析:


class BlockManager(
  executorId: String,
  rpcEnv: RpcEnv,
  val master: BlockManagerMaster,
  val defaultSerializer: Serializer,
  val conf: SparkConf,
  memoryManager: MemoryManager,
  mapOutputTracker: MapOutputTracker,
  shuffleManager: ShuffleManager,
  blockTransferService: BlockTransferService,
  securityManager: SecurityManager,
  numUsableCores: Int)
extends BlockDataManager with Logging {

  // 存储所有的Block
  private val blocks = new ConcurrentHashMap[BlockId, BlockInfo]

  // 存储所有正在读取中的Block
  private val activeReads = new ConcurrentHashMap[BlockId, BlockFetchingState]

  // 存储所有正在写入中的Block
  private val activeWrites = new ConcurrentHashMap[BlockId, BlockOutputStream]

  // 存储所有已经删除的Block
  private val deadBlocks = new ConcurrentHashMap[BlockId, Long]

  // 存储所有已经接收到的Block
  private val receivedBlockTracker = new ReceivedBlockTracker

  // 存储所有已经丢失的Block
  private val blockReplicationPolicy = BlockManager.getReplicationPolicy(conf, master)
  private val blockTracker = new BlockTracker(blockReplicationPolicy)
  private val lostBlocks = new ConcurrentHashMap[BlockId, ArrayBuffer[BlockManagerId]]

  // 存储所有已经被缓存的Block
  private val cachedBlocks = new ConcurrentHashMap[BlockId, CachedBlock]

  // BlockManager的内存管理器
  private val memoryStore =
    new MemoryStore(conf, memoryManager, this, blockInfoManager)

  // BlockManager的磁盘管理器
  private val diskStore = new DiskStore(conf, this, diskBlockManager)

  // BlockManager的块传输服务
  private val blockTransferService =
    new NettyBlockTransferService(conf, securityManager, numUsableCores)

  // BlockManager的块上传服务
  private val blockUploadHandler = new BlockUploadHandler(this)

  // BlockManager的块下载服务
  private val blockDownloader = new BlockDownloader(blockTransferService, this)

  // BlockManager的安全管理器
  private val blockTransferServiceServer =
    blockTransferService.initServer(rpcEnv, blockUploadHandler, blockDownloader)

  // BlockManager的Shuffle管理器
  private val shuffleBlockResolver = new ShuffleBlockResolver(conf)

  // BlockManager的Shuffle上传服务
  private val shuffleUploadHandler = new ShuffleUploadHandler(this, shuffleBlockResolver)

  // BlockManager的Shuffle下载服务
  private val shuffleDownloader = new ShuffleDownloader(blockTransferService, this)

  // BlockManager的Shuffle管理器
  private val shuffleServerId = SparkEnv.get.blockManager.blockManagerId.shuffleServerId

  // BlockManager的Shuffle服务
  private val shuffleService =
    new NettyShuffleService(shuffleServerId, conf, securityManager, shuffleUploadHandler,
      shuffleDownloader)

  // BlockManager的Metrics
  private val metricsSystem = SparkEnv.get.metricsSystem
  private val numBlocksRegistered = metricsSystem.counter("blocks.registered")
  private val numBlocksRemoved = metricsSystem.counter("blocks.removed")

  // 启动BlockManager的各个服务
  blockTransferService.init(clientMode = false)
  blockTransferServiceServer.start()
  shuffleService.start()

  // BlockManager的ID
  val blockManagerId = BlockManagerId(executorId, blockTransferService.hostName, rpcEnv.address.port)

代码中,BlockManager主要包括以下几个部分:

  • 存储结构:使用ConcurrentHashMap存储所有的Block、正在读取中的Block、正在写入中的Block、已经删除的Block、已经接收到的Block、已经缓存的Block以及已经丢失的Block等信息。

  • 内存管理器和磁盘管理器:内存管理器负责将小的数据块存储在内存中,而磁盘管理器则负责将大的数据块存储在磁盘上。

  • 块传输服务:负责处理节点之间的数据块传输,例如上传、下载和复制等操作。

  • Shuffle管理器:负责处理Spark的Shuffle操作,包括Shuffle数据的存储和传输等。

Metrics:用于收集BlockManager的各种指标,如已注册的Block数、已删除的Block数等。

4.案例分析

下面以WordCount为例,演示BlockManager在Spark中的使用过程:

val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)

val lines = sc.textFile("data.txt")
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

wordCounts.collect().foreach(println)

在这个例子中,首先调用textFile方法读取文本文件,并将其划分为多个Block。然后,使用flatMap和map方法对每个Block中的文本进行处理,最后使用reduceByKey方法将相同的单词进行合并。在这个过程中,BlockManager扮演着重要的角色,它负责管理所有的Block,并确保它们可以高效地共享和访问。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1439355.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

通过docker-compose部署NGINX服务,并使该服务开机自启

要在通过docker-compose部署的NGINX服务实现开机自启,你需要确保Docker守护进程在系统启动时自动运行,并配置docker-compose.yml文件以在容器中运行NGINX服务。以下是步骤: 确保Docker守护进程开机启动: 在Ubuntu/Debian上&#x…

Spring IoC容器(四)容器、环境配置及附加功能

本文内容包括容器的Bean 及 Configuration 注解的使用、容器环境的配置文件及容器的附加功能(包括国际化消息、事件发布与监听)。 1 容器配置 在注解模式下,Configuration 是容器核心的注解之一,可以在其注解的类中通过Bean作用…

32USART串口

目录 一.通信接口 二.时序 三.USART简介 ​编辑四.数据帧 五.起始位侦测和采样位置对齐 &波特率计算 六.相关函数 七.编码格式设置 (1) UTF-8编码(有的软件兼容性不好)​编辑 (2)GB2312编码 八.…

【Nicn的刷题日常】之有序序列合并

1.题目描述 描述 输入两个升序排列的序列,将两个序列合并为一个有序序列并输出。 数据范围: 1≤�,�≤1000 1≤n,m≤1000 , 序列中的值满足 0≤���≤30000 0≤val≤30000 输入描述…

前端异步相关知识总结

目录 一、同步和异步简介 同步(按顺序执行) 异步(不按顺序执行) 异步出现的原因和需求 二、实现异步的方法 回调函数 Promise 生成器Generators/ yield async await 三、promise和 async await 区别 概念 两者的区别 …

07-Java桥接模式 ( Bridge Pattern )

Java桥接模式 摘要实现范例 桥接模式(Bridge Pattern)是用于把抽象化与实现化解耦,使得二者可以独立变化 桥接模式涉及到一个作为桥接的接口,使得实体类的功能独立于接口实现类,这两种类型的类可被结构化改变而互不影…

实践动物姿态估计,基于最新YOLOv8全系列【n/s/m/l/x】参数模型开发构建公共场景下行人人员姿态估计分析识别系统

姿态估计(PoseEstimation)在我们前面的相关项目中涉及到的并不多,CV数据场景下主要还是以目标检测、图像识别和分割居多,最近正好项目中在使用YOLO系列最新的模型开发项目,就想着抽时间基于YOLOv8也开发构建实现姿态估…

Open CASCADE学习|创建多段线与圆

使用Open CASCADE Technology (OCCT)库来创建和显示一些2D几何形状。 主要过程如下: 包含头文件:代码首先包含了一些必要的头文件,这些头文件提供了创建和显示几何形状所需的类和函数。 定义变量:在main函数中,定义…

如何查看端口映射?

端口映射是一种用于实现远程访问的技术。通过将外网端口与内网设备的特定端口关联起来,可以使外部网络用户能够通过互联网访问内部网络中的设备和服务。在网络中使用端口映射可以解决远程连接需求,使用户能够远程访问设备或服务,无论是在同一…

JAVA生产使用登录校验模式

背景 目前我们的服务在用户登录时,会先通过登录接口进行密码校验。一旦验证成功,后端会利用UUID生成一个独特的令牌(token),并将其存储在Redis缓存中。同时,前端也会将该令牌保存在本地。在后续的接口请求…

常用对象和常用成员函数

常量对象与常量成员函数来防止修改对象,实现最低权限原则。 在Obj被定义为常量对象的情况下,下面这条语句是错误的。 错误的原因是常量对象一旦初始化后,其值就再也不能改变。因此,不能通过常量对象调用普通成员函数,因…

海外云手机的核心优势

随着5G时代的到来,云计算产业正处于高速发展的时期,为海外云手机的问世创造了一个可信任的背景。在资源有限且需求不断增加的时代,将硬件设备集中在云端,降低个人用户的硬件消耗,同时提升性能,这一点单单就…

得物自研API网关实践之路

一、业务背景 老网关使用 Spring Cloud Gateway (下称SCG)技术框架搭建,SCG基于webflux 编程范式,webflux是一种响应式编程理念,响应式编程对于提升系统吞吐率和性能有很大帮助; webflux 的底层构建在netty之上性能表…

广度优先搜索(BFS)

力扣刷题之旅:进阶篇(二) 继续我的力扣刷题之旅,我在进阶篇的第一部分中深入探索了BFS(广度优先搜索)算法,并感受到了它在图形搜索中的强大威力。现在,我进入了进阶篇的第二部分&am…

百卓Smart管理平台 uploadfile.php 文件上传漏洞复现(CVE-2024-0939)

0x01 产品简介 百卓Smart管理平台是北京百卓网络技术有限公司(以下简称百卓网络)的一款安全网关产品,是一家致力于构建下一代安全互联网的高科技企业。 0x02 漏洞概述 百卓Smart管理平台 uploadfile.php 接口存在任意文件上传漏洞。未经身份验证的攻击者可以利用此漏洞上传…

单片机无线发射的原理剖析

目录 一、EV1527编码格式 二、OOK&ASK的简单了解 三、433MHZ 四、单片机的地址ID 五、基于STC15W104单片机实现无线通信 无线发射主要运用到了三个知识点:EV1527格式;OOk;433MHZ。下面我们来分别阐述: EV1527是数据的编…

Stable Diffusion 模型下载:Samaritan 3d Cartoon SDXL(撒玛利亚人 3d 卡通 SDXL)

文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八案例九案例十 下载地址 模型介绍 由“PromptSharingSamaritan”创作的撒玛利亚人 3d 卡通类型的大模型,该模型的基础模型为 SDXL 1.0。 条目内容类型大模型基础模型SDXL 1.0来源CIVITA…

IDEA创建Java类时自动添加注释(作者、年份、月份)

目录 IDEA创建Java类时自动添加注释(作者、年份、月份)如图: IDEA创建Java类时自动添加注释(作者、年份、月份) 简单记录下,IDEA创建Java类时自动添加注释(作者、年份、月份)&#…

@PostMapping/ @GetMapping等请求格式

目录 1.只传一个参数的 第一种 第二种 第三种:表单 2.传整个对象的 2.1修改实体类就是传整个对象过来 2.2新增实体类就是传整个对象过来新增 1.只传一个参数的 第一种 PostMapping("/add/{newsId}")public Result addOne(PathVariable Integer newsId) {}pos…

Spring + Tomcat项目中nacos配置中文乱码问题解决

实际工作的时候碰到了nacos中文乱码的问题,一顿排查最终还是调源码解决了。下面为具体的源码流程,有碰到的可以参考下。 对于nacos配置来说,初始主要源码就在NacosConfigService类中。里面有初始化获取配置content以及设置对应监听器的操作。…