使用Akka的Actor模拟Spark的Master和Worker工作机制

news2025/1/11 11:03:30

使用Akka的Actor模拟Spark的Master和Worker工作机制

Spark的Master和Worker协调工作原理

在 Apache Spark 中,Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程:

  1. Worker 启动后,会向预先配置的 Master 节点发送注册请求。
  2. Master 接收到注册请求后,会为该 Worker 创建一个唯一的标识符(Worker ID)并将其信息保存在内存中。
  3. Master 向 Worker 发送包含 Master URL、Worker ID 等信息的注册响应。
  4. Worker 收到注册响应后,会启动一个定时器并开始周期性地向 Master 发送心跳消息。
  5. Worker 的心跳消息中包含当前的负载状况、可用资源等信息。
  6. Master 接收到心跳消息后,更新该 Worker 的最近心跳时间,并根据需要对集群进行动态调整,例如添加新的任务或删除故障的 Worker。
  7. 如果 Master 在一段时间内没有收到某个 Worker 的心跳消息,它将把该 Worker 标记为失效,并将其相应的资源标记为可用以供后续使用。

具体原理如下:

  • Worker 通过网络向 Master 发送心跳消息,通常使用基于 TCP 的长连接。这些心跳消息可以包含有关 Worker 健康状况、资源利用情况等的信息。
  • Master 使用一个内部的心跳管理组件来处理接收到的心跳消息,并维护每个 Worker 的状态。它根据心跳消息的频率和时间戳来判断 Worker 是否正常运行。
  • 如果 Master 在预定的时间内没有收到 Worker 的心跳消息,它会将该 Worker 标记为失效并触发一系列的故障处理机制,例如重新分配任务给其他可用的 Worker。
  • Worker 定期发送心跳消息,以确保在网络故障、Worker 故障或其他问题发生时能够及时通知 Master。

通过心跳机制,Master 能够实时监控 Worker 的状态,并根据需要进行集群的动态管理和资源调度,从而实现高可用性和容错性。

使用Akka的Actor模拟Spark的Master和Worker工作机制

  1. worker注册到Master, Master完成注册,并回复worker注册成功。
  2. worker定时发送心跳,并在Master接收到。
  3. Master接收到worker心跳后,要更新该worker的最近一次发送心跳的时间。
  4. 给Master启动定时任务,定时检测注册的worker有哪些没有更新心跳,并将其从hashmap中删除。
  5. master worker 进行分布式部署(Linux系统)-》如何给maven项目打包->上传linux。
  • 创建SparkMaster类继承Actor特质,实现Receive方法,并定义对应的伴生对象,在伴生对象中创建SparkMaster-actor引用,并启动Actor发送消息。服务端Master对worker进行心跳监测,发现6秒内无法获取worker心跳,将异常的Worker的实例从HashMap中移除。若能正常获取到心跳,则获取心跳信息后更新心跳时间。定时保持心跳机制。

代码实现:

class SparkMaster extends Actor {
  //定义个hashMap,管理workers(所有worker的实例)
  val workers = mutable.Map[String, WorkerInfo]()

  override def receive: Receive = {

    case "start" => {
      println("master服务器启动了...")
      //这里开始。。
      self ! StartTimeOutWorker
    }
    case RegisterWorkerInfo(id, cpu, ram) => {
      //接收到worker注册信息
      if (!workers.contains(id)) {
        //创建WorkerInfo 对象
        val workerInfo = new WorkerInfo(id, cpu, ram)
        //加入到workers
        workers += ((id, workerInfo))
        println("服务器的workers=" + workers)
        //回复一个消息,说注册成功
        sender() ! RegisteredWorkerInfo
      }
    }
    case HeartBeat(id) => {
      //更新对应的worker的心跳时间
      //1.从workers对应的HashMap中取出WorkerInfo,然后更新worker心跳时间
      val workerInfo = workers(id)
      workerInfo.lastHeartBeat = System.currentTimeMillis()
      println("master更新了 " + id + " 心跳时间...")
    }
    case StartTimeOutWorker => {
      println("开始了定时检测worker心跳的任务")
      import context.dispatcher
      //说明
      //1. 0 millis 不延时,立即执行定时器
      //2. 9000 millis 表示每隔3秒执行一次
      //3. self:表示发给自己
      //4. RemoveTimeOutWorker 发送的内容
      context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)
    }
    //对RemoveTimeOutWorker消息处理
    //这里需求检测哪些worker心跳超时(now - lastHeartBeat > 6000),并从map中删除
    case RemoveTimeOutWorker => {
      //首先将所有的 workers 的 所有WorkerInfo
      val workerInfos = workers.values
      val nowTime = System.currentTimeMillis()
      //先把超时的所有workerInfo,删除即可
      workerInfos.filter(workerInfo => (nowTime - workerInfo.lastHeartBeat) > 6000)
        .foreach(workerInfo => workers.remove(workerInfo.id))
      println("当前有 " + workers.size + " 个worker存活的")
    }
  }
}

object SparkMaster {
  def main(args: Array[String]): Unit = {

    //这里我们分析出有3个host,port,sparkMasterActor
    if (args.length != 3) {
      println("请输入参数 host port sparkMasterActor名字")
      sys.exit()
    }

    val host = args(0)
    val port = args(1)
    val name = args(2)

    //先创建ActorSystem
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=${host}
         |akka.remote.netty.tcp.port=${port}
            """.stripMargin)
    val sparkMasterSystem = ActorSystem("SparkMaster", config)
    //创建SparkMaster -actor
    val sparkMasterRef = sparkMasterSystem.actorOf(Props[SparkMaster], s"${name}")
    //启动SparkMaster
    sparkMasterRef ! "start"
  }
}

  • 定义SparkWorker类继承Actor特质,实现Receive方法,在方法中实现向master发送注册信息的请求,获取到服务端Master注册成功的消息后,定义定时任务发送心跳包给Master。
class SparkWorker(masterHost:String,masterPort:Int,masterName:String) extends Actor{
  //masterProxy是Master的代理/引用ref
  var masterPorxy :ActorSelection = _
  val id = java.util.UUID.randomUUID().toString

  override def preStart(): Unit = {
    println("preStart()调用")
    //初始化masterPorxy
    masterPorxy = context.actorSelection(s"akka.tcp://SparkMaster@${masterHost}:${masterPort}/user/${masterName}")
    println("masterProxy=" + masterPorxy)
  }
  override def receive:Receive = {
    case "start" => {
      println("worker启动了")
      //发出一个注册消息
      masterPorxy ! RegisterWorkerInfo(id, 16, 16 * 1024)
    }
    case RegisteredWorkerInfo => {
      println("workerid= " + id + " 注册成功~")
      //当注册成功后,就定义一个定时器,每隔一定时间,发送SendHeartBeat给自己
      import context.dispatcher
      //说明
      //1. 0 millis 不延时,立即执行定时器
      //2. 3000 millis 表示每隔3秒执行一次
      //3. self:表示发给自己
      //4. SendHeartBeat 发送的内容
      context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)

    }
    case SendHeartBeat =>{
      println("worker = " + id + "给master发送心跳")
      masterPorxy ! HeartBeat(id)
    }
  }
}

object SparkWorker {
  def main(args: Array[String]): Unit = {

    if (args.length != 6) {
      println("请输入参数 workerHost workerPort workerName masterHost masterPort masterName")
      sys.exit()
    }

    val workerHost = args(0)
    val workerPort = args(1)
    val workerName = args(2)
    val masterHost = args(3)
    val masterPort = args(4)
    val masterName = args(5)
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=${workerHost}
         |akka.remote.netty.tcp.port=${workerPort}
            """.stripMargin)

    //创建ActorSystem
    val sparkWorkerSystem = ActorSystem("SparkWorker",config)

    //创建SparkWorker 的引用/代理
    val sparkWorkerRef = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort.toInt,masterName)), s"${workerName}")

    //启动actor
    sparkWorkerRef ! "start"
  }
}

  • 分别定义发送注册信息的RegisterWorkerInfo的样例类,WorkerInfo消息类,定义注册成功的消息样例对象RegisteredWorkerInfo,心跳信息样例类HeartBeat,以及确认发送心跳信息样例对象SendHeartBeat,触发超时work的样例对象StartTimeOutWorker,移除超时worker的样例对象RemoveTimeOutWorker。

代码如下:

// worker注册信息 //MessageProtocol.scala
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)


// 这个是WorkerInfo, 这个信息将来是保存到master的 hm(该hashmap是用于管理worker)
// 将来这个WorkerInfo会扩展(比如增加worker上一次的心跳时间)
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {
  var lastHeartBeat : Long = System.currentTimeMillis()
}

// 当worker注册成功,服务器返回一个RegisteredWorkerInfo 对象
case object RegisteredWorkerInfo

//worker每隔一定时间由定时器发给自己的一个消息
case object SendHeartBeat
//worker每隔一定时间由定时器触发,而向master发现的协议消息
case class HeartBeat(id: String)

//master给自己发送一个触发检查超时worker的信息
case object StartTimeOutWorker
// master给自己发消息,检测worker,对于心跳超时的.
case object RemoveTimeOutWorker


运行效果:
在这里插入图片描述
通过这个案例我们可以深入理解Spark的Master和Worker的通讯机制,为了方便以后对Spark的底层源码的学习,命名的方式和源码保持一致.(如: 通讯消息类命名就是一样的);同时也加深了我们对主从服务心跳检测机制(HeartBeat)的理解,方便以后spark源码二次开发。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/971983.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Unity】VS Code 没有自动补全 MonoBehaviour 的方法

正常来说,在VS Code 输入类似 OnTriggerEnter2D等方法名时,VS Code会根据已经输入的前缀自动提示相关方法。 在不正常的情况下,根据StackOverFlow上面的回答,依次试过了 安装 .NET SDK安装 .NET Framework Dev PackVS Code安装 …

【ES6】js中的__proto__和prototype

在JavaScript中,__proto__和prototype都是用于实现对象继承的关键概念。 1、proto __proto__是一个非标准的属性,用于设置或获取一个对象的原型。这个属性提供了直接访问对象内部原型对象的途径。对于浏览器中的宿主对象和大多数对象来说,可…

【STM32】学习笔记-PWR(Power Control)电源控制

PWR(Power Control)电源控制 PWR(Power Control)电源控制是一种技术或设备,用于控制电源的开关和输出。它通常用于电源管理和节能,可以通过控制电源的工作状态来延长电子设备的使用寿命,减少能…

SQL注入-盲注 Burp盲注方法

文章目录 判断库名位数Burp 抓取数据包设置payload位置设置payload 1设置payload 2点击开始攻击 判断库名下表名的位数Burp 抓取数据包点击开始攻击 判断库名下第二张表名判断表名下的字段名判断表中具体数据 什么是盲注? 有时目标存在注入,但在页面上没…

Unity中Shader的UV扭曲效果的实现

文章目录 前言一、实现的思路1、在属性面板暴露一个 扭曲贴图的属性2、在片元结构体中,新增一个float2类型的变量,用于独立存储将用于扭曲的纹理的信息3、在顶点着色器中,根据需要使用TRANSFORM_TEX对Tilling 和 Offset 插值;以及…

使用QT操作Excel 表格的常用方法

VBA 简介 Microsoft Office软件通常使用VBA来扩展Windows的应用程序功能,Visual Basic for Applications(VBA)是一种Visual Basic的一种宏语言。 在VBA的参考手册中就可以看到具体函数、属性的用法,Qt操作Excel主要通过 QAxObj…

Redis 缓存预热+缓存雪崩+缓存击穿+缓存穿透

面试题: 缓存预热、雪萌、穿透、击穿分别是什么?你遇到过那几个情况?缓存预热你是怎么做的?如何造免或者减少缓存雪崩?穿透和击穿有什么区别?他两是一个意思还是载然不同?穿适和击穿你有什么解…

【每日一题】1041. 困于环中的机器人

1041. 困于环中的机器人 - 力扣(LeetCode) 在无限的平面上,机器人最初位于 (0, 0) 处,面朝北方。注意: 北方向 是y轴的正方向。南方向 是y轴的负方向。东方向 是x轴的正方向。西方向 是x轴的负方向。 机器人可以接受下列三条指令之…

入门力扣自学笔记277 C++ (题目编号:42)(动态规划)

42. 接雨水 题目: 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图,计算按此排列的柱子,下雨之后能接多少雨水。 示例 1: 输入:height [0,1,0,2,1,0,1,3,2,1,2,1] 输出:6 解释:上面是由数组…

时序预测 | MATLAB实现CNN-BiLSTM卷积双向长短期记忆神经网络时间序列预测(风电功率预测)

时序预测 | MATLAB实现CNN-BiLSTM卷积双向长短期记忆神经网络时间序列预测(风电功率预测) 目录 时序预测 | MATLAB实现CNN-BiLSTM卷积双向长短期记忆神经网络时间序列预测(风电功率预测)预测效果基本介绍程序设计参考资料 预测效果…

小白学go基础03-了解Go项目的项目结构

我们先来看看第一个Go项目——Go语言自身——的项目结构是什么样的。Go项目的项目结构自1.0版本发布以来一直十分稳定,直到现在Go项目的顶层结构基本没有大的改变。 截至Go项目commit 1e3ffb0c(2019.5.14),Go1.0 项目结构如下&am…

js+vue,前端关于页面滚动让头部菜单淡入淡出实现原理

今天遇到个需求:我这里借用小米商城的详情页做个比喻吧。 刚开始其商品详情页是这样的: 当滚动到一定高度时,是这样的: 可以看到当滚动到轮播图底下的时候,详情页的菜单完全显现出来。 以下上代码: HTML…

uniApp webview 中调用底座蓝牙打印功能异常

背景: 使用uniApp, 安卓底座 webView 方式开发; 调用方式采用H5 向 底座发送消息, 底座判断消息类型, 然后连接打印机进行打印; 内容通过指令集方式传递给打印机; 过程当中发现部分标签可以正常打印, 但又有部分不行,打印机没反应, 也没有报错; 原因分析: 对比标签内容…

医院安全(不良)事件上报系统源码 不良事件报告平台源码 前后端分离,支持二开

医院安全(不良)事件上报系统源码 系统定义: 规范医院安全(不良)事件的主动报告,增强风险防范意识,及时发现医院不良事件和安全隐患,将获取的医院安全信息进行分析反馈,…

“金融级”数字底座:从时代的“源启”,到“源启”的时代

今年初《数字中国建设整体布局规划》正式发布,这代表着数字中国建设迈向了实质的落地阶段,其背后的驱动就是遍及各行各业的数字化转型。 千姿百态、复杂多样的应用场景,可以看作是遍布数字中国的“点”;千行百业、各种类型的行业…

阿里云2核4G服务器5M带宽5年费用价格明细表

阿里云2核4G服务器5M带宽可以选择轻量应用服务器或云服务器ECS,轻量2核4G4M带宽服务器297元一年,2核4G云服务器ECS可以选择计算型c7、c6或通用算力型u1实例等,买5年可以享受3折优惠,阿腾云分享阿里云服务器2核4G5M带宽五年费用表&…

【系统编程】线程池以及API接口简介

(꒪ꇴ꒪ ),Hello我是祐言QAQ我的博客主页:C/C语言,数据结构,Linux基础,ARM开发板,网络编程等领域UP🌍快上🚘,一起学习,让我们成为一个强大的攻城狮&#xff0…

非比较排序——计数排序

本章gitee代码:计数排序 文章目录 🍇0. 前言🍈1. 思路🍉2. 代码实现🍊3. 优势与缺陷🍋4. 其他的非比较排序🫴桶排序🫴基数排序 🍇0. 前言 传统的排序方法通常需要逐个比…

嵌入式Linux开发实操(十五):nand flash接口开发(2)

通用NAND驱动程序支持几乎所有基于NAND的芯片,并将它们连接到Linux内核的内存技术设备(MTD)子系统。这个接口走的是nand的并口,可以在shell的/dev中看到设备,比如/mtd0、/mtd0ro…,mtdblock0、mtdblock1… sysfs在设备层次结构中提供了几个视角。设备必须挂在某条总线bus…

Linux相关指令(下)

cat指令 查看目标文件的内容 常用选项&#xff1a; -b 对非空输出行编号 -n 对输出的所有行编号 -s 不输出多行空行 一个重要思想&#xff1a;linux下一切皆文件&#xff0c;如显示器文件&#xff0c;键盘文件 cat默认从键盘中读取数据再打印 退出可以ctrlc 输入重定向<…