【深入浅出 Yarn 架构与实现】4-5 RM 行为探究 - 启动 ApplicationMaster

news2025/1/17 1:06:06

本节开始,将对 ResourceManager 中一些常见行为进行分析探究,看某些具体关键的行为,在 RM 中是如何流转的。本节将深入源码探究「启动 ApplicationMaster」的具体流程。

一、整体流程

本小节介绍从应用程序提交到启动 ApplicationMaster 的整个过程,期间涉及 Client、RMService、 RMAppManager、RMApplmpl、RMAppAttemptImpl、RMNode、ResourceScheduler 等几个主要组件。当客户端调用 RPC 函数 ApplicationClientProtocol#submitApplication 后, ResourceManager 端的处理过程如下图所示。
image.png

二、具体流程分析

接下来跟随上面的流程图,我们深入源码具体分析每一步都是如何执行的:
最开始由客户端发起任务提交 submitApplication(),经过 ClientRMServiceRMAppManager 发送 RMAppEventType.START 事件,之后交由 RMAppImpl 处理。

  protected void submitApplication(
      ApplicationSubmissionContext submissionContext, long submitTime,
      String user) throws YarnException {
    ApplicationId applicationId = submissionContext.getApplicationId();

    RMAppImpl application =
        createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
    Credentials credentials = null;
    try {
      credentials = parseCredentials(submissionContext);
      if (UserGroupInformation.isSecurityEnabled()) {
        this.rmContext.getDelegationTokenRenewer()
            .addApplicationAsync(applicationId, credentials,
                submissionContext.getCancelTokensWhenComplete(),
                application.getUser());
      } else {
        // Dispatcher is not yet started at this time, so these START events
        // enqueued should be guaranteed to be first processed when dispatcher
        // gets started.
        // 这里发送 RMAppEventType.START 事件
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppEvent(applicationId, RMAppEventType.START));
      }

RMAppImpl 这东西是个状态机,收到事件之后会自己转换状态并且处理相应的逻辑。
(状态机还不熟悉的同学,可翻到我前面的文章进行学习《2-4 Yarn 基础库 - 状态机库》)
image.png

截取一部分状态转换代码:

  private static final StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent> stateMachineFactory
                               = new StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent>(RMAppState.NEW)


     // Transitions from NEW state
    .addTransition(RMAppState.NEW, RMAppState.NEW,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
     // 收到 RMAppEventType.START 事件
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
    .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
            RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
            RMAppState.KILLED, RMAppState.FINAL_SAVING),
        RMAppEventType.RECOVER, new RMAppRecoveredTransition())
    .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
        new AppKilledTransition())
    .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
        new FinalSavingTransition(new AppRejectedTransition(),
          RMAppState.FAILED))

一)RMAppImpl - START

收到 RMAppEventType.START 事件之后,会执行 RMAppNewlySavingTransition()

  private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      // If recovery is enabled then store the application information in a
      // non-blocking call so make sure that RM has stored the information
      // needed to restart the AM after RM restart without further client
      // communication
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }

跟下去会发现它发出 RMStateStoreEventType.STORE_APP 事件,去 RMStateStore 中找一下对应的事件处理。发现也是个状态机:

.addTransition(RMStateStoreState.ACTIVE,
    EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
    RMStateStoreEventType.STORE_APP, new StoreAppTransition())

跟着 StoreAppTransition 看看做了啥(发送 RMAppEventType.APP_NEW_SAVED 事件)

  private static class StoreAppTransition
      implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreAppEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      ApplicationStateData appState =
          ((RMStateStoreAppEvent) event).getAppState();
      ApplicationId appId =
          appState.getApplicationSubmissionContext().getApplicationId();
      LOG.info("Storing info for app: " + appId);
      try {
        store.storeApplicationStateInternal(appId, appState);
        // 这里发送了 RMAppEventType.APP_NEW_SAVED 事件
        store.notifyApplication(new RMAppEvent(appId,
               RMAppEventType.APP_NEW_SAVED));
      } catch (Exception e) {
        LOG.error("Error storing app: " + appId, e);
        isFenced = store.notifyStoreOperationFailedInternal(e);
      }
      return finalState(isFenced);
    };
  }

二)RMAppImpl - APP_NEW_SAVED

我们再回到 RMAppImpl,找到对应的状态转移逻辑。

    // 刚刚我们的状态是 NEW_SAVING,收到了 APP_NEW_SAVED 事件,执行 AddApplicationToSchedulerTransition() 后,转换为 SUBMITTED 状态
    .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())

AddApplicationToSchedulerTransition() 中会发送 SchedulerEventType.APP_ADDED 事件。之后 RMAppImpl 转换为 RMAppState.SUBMITTED 状态。
SchedulerEventType.APP_ADDED 会被多个事件处理器捕获处理:
1)ResourceSchedulerWrapper 事件处理器,仅记录

      } else if (schedulerEvent.getType() == SchedulerEventType.APP_ADDED
          && schedulerEvent instanceof AppAddedSchedulerEvent) {
        AppAddedSchedulerEvent appAddEvent =
                (AppAddedSchedulerEvent) schedulerEvent;
        String queueName = appAddEvent.getQueue();
        appQueueMap.put(appAddEvent.getApplicationId(), queueName);
      }

2)各个 AbstractYarnScheduler 的实现类。以 CapacityScheduler 为例:
执行 addApplication()

    case APP_ADDED:
    {
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      String queueName = resolveReservationQueueName(appAddedEvent.getQueue(),
          appAddedEvent.getApplicationId(), appAddedEvent.getReservationID(),
          appAddedEvent.getIsAppRecovering());
      if (queueName != null) {
        if (!appAddedEvent.getIsAppRecovering()) {
          addApplication(appAddedEvent.getApplicationId(), queueName,
              appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
        } else {
          addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName,
              appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority());
        }
      }
    }

addApplication() 中会提交 Application 并发送 RMAppEventType.APP_ACCEPTED 事件。

	queue.submitApplication(applicationId, user, queueName);
    rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));

三)RMAppImpl - APP_ACCEPTED(重点)

继续回到 RMAppImpl,执行 StartAppAttemptTransition(),创建 newAttempt,发送事件RMAppAttemptEventType.START

    .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
  private static final class StartAppAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.createAndStartNewAttempt(false);
    };
  }
  private void
      createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
    createNewAttempt();
    handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
      transferStateFromPreviousAttempt));
  }

RMAppAttemptImpl 中会捕获这个事件,执行 AttemptStartedTransition(),其中会发送 SchedulerEventType.APP_ATTEMPT_ADDED 事件,由 AbstractYarnScheduler 实现类处理

      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
          RMAppAttemptEventType.START, new AttemptStartedTransition())

如在 CapacityScheduler 中由 addApplicationAttempt 处理,会提交 ApplicationAttempt,并发送 RMAppAttemptEventType.ATTEMPT_ADDED 事件

private synchronized void addApplicationAttempt() {
    // 提交 attempt
	queue.submitApplicationAttempt(attempt, application.getUser());
    // 发送 RMAppAttemptEventType.ATTEMPT_ADDED 事件
	rmContext.getDispatcher().getEventHandler().handle(
    		new RMAppAttemptEvent(applicationAttemptId,
            RMAppAttemptEventType.ATTEMPT_ADDED));
}

RMAppAttemptImpl 收到 event 后继续处理,在 ScheduleTransition 会 allocate am container 资源。

      .addTransition(RMAppAttemptState.SUBMITTED, 
          EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                     RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.ATTEMPT_ADDED,
          new ScheduleTransition())
        // AM resource has been checked when submission
        Allocation amContainerAllocation =
            appAttempt.scheduler.allocate(
                appAttempt.applicationAttemptId,
                Collections.singletonList(appAttempt.amReq),
                EMPTY_CONTAINER_RELEASE_LIST,
                amBlacklist.getBlacklistAdditions(),
                amBlacklist.getBlacklistRemovals(), null, null);

ResourceScheduler 将资源返回给它之前,会向 RMContainerlmpl 发送一个 RMContainerEventType.ACQUIRED 事件。
RMContainerImpl 接到 RMContainerEventType.START,发送 RMAppAttemptEventType.CONTAINER_ALLOCATED 事件。

    .addTransition(RMContainerState.NEW, RMContainerState.ALLOCATED,
        RMContainerEventType.START, new ContainerStartedTransition())
  private static final class ContainerStartedTransition extends
      BaseTransition {

    @Override
    public void transition(RMContainerImpl container, RMContainerEvent event) {
      container.eventHandler.handle(new RMAppAttemptEvent(
          container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED));
    }
  }

又回到RMAppAttemptImpl 后续状态机,执行 AMContainerAllocatedTransition,在其中又一次为 am allocate,和上一个状态中 allocate 仅参数不同,没搞懂为啥。这里如果发现 allocate container 资源还是 0,会退回上一步,状态还是 RMAppAttemptState.SCHEDULED 等待再次获取资源。如果正常获取到了资源,就会转为 RMAppAttemptState.ALLOCATED_SAVING 状态。

      .addTransition(RMAppAttemptState.SCHEDULED,
          EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING,
            RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.CONTAINER_ALLOCATED,
          new AMContainerAllocatedTransition())
      Allocation amContainerAllocation =
          appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
            EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
            null, null, null);

日志记录完成后,RMStateStoreRMAppAttemptImpl 发送 RMAppAttemptEventType.ATTEMPT_NEW_SAVED 事件。
RMAppAttemptImpl 后续向 ApplicationMasterLauncher 发 送 AMLauncherEventType.LAUNCH 事件(实际执行是在 AMLauncher 中),并将状态从 ALLOCATED_SAVING 转移为 ALLOCATED。

      .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
          RMAppAttemptState.ALLOCATED,
          RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())

ApplicationMasterLauncher 收到 AMLauncherEventType.LAUNCH 事件后,会将该事件放到事件队列中,等待 AMLauncher 线程池中的线程处理该事件。它将与对应的 NodeManager 通信,启动 ApplicationMaster,一旦成功启动后,将向 RMAppAttemptImpl 发送 RMAppAttemptEventType.LAUNCHED 事件。

  public void run() {
    switch (eventType) {
    case LAUNCH:
      try {
        LOG.info("Launching master" + application.getAppAttemptId());
        launch();
        handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
            RMAppAttemptEventType.LAUNCHED));

RMAppAttemptImpl 收到 RMAppAttemptEventType.LAUNCHED 事件后,会向 AMLivelinessMonitor 注册,以监控运行状态。RMAppAttemptImpl 状态从 ALLOCATED 转移为 LAUNCHED

之后,NodeManager 通过心跳机制汇报 ApplicationMaster 所在 Container 已经成功启动,收到该信息后,ResourceScheduler 将发送一个 RMContainerEventType.LAUNCHED 事件,RMContainerImpl 收到该事件后,会从 ContainerAllocationExpirer 监控列表中移除。

启动的 ApplicationMaster 通过RPC 函数 ApplicationMasterProtocol#registerApplicationMaster 向 ResourceManager 注册,ResourceManager 中的 ApplicationMasterService 服务接收到该请求后,发送 RMAppAttemptEventType.REGISTERED 事件。

// ApplicationMasterService#registerApplicationMaster

	LOG.info("AM registration " + applicationAttemptId);
      this.rmContext
        .getDispatcher()
        .getEventHandler()
        .handle(
          // 这里发送 RMAppAttemptEventType.REGISTERED 事件
          new RMAppAttemptRegistrationEvent(applicationAttemptId, request
            .getHost(), request.getRpcPort(), request.getTrackingUrl()));

RMAppAttemptImpl 收到该事件后,首先保存该 ApplicationMaster 的基本信息(比如所在 host、启用的 RPC 端口号等),然后向 RMApplmpl 发送一个 RMAppEventType.ATTEMPT_REGISTERED 事件。RMAppAttemptImpl 状态从 LAUNCHED 转移为 RUNNING

      .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
          RMAppAttemptEventType.REGISTERED, REGISTERED_TRANSITION)
// AMRegisteredTransition
	appAttempt.eventHandler.handle(new RMAppEvent(appAttempt
          .getAppAttemptId().getApplicationId(),
          RMAppEventType.ATTEMPT_REGISTERED));

四)RMAppImpl - ATTEMPT_REGISTERED

RMAppImpl 收到 RMAppEventType.ATTEMPT_REGISTERED 事件后,将状态从 ACCEPTED 转换为 RUNNING。

    .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
        RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
            YarnApplicationState.RUNNING))

到这里,启动 ApplicationMaster 的整体流程分析完毕!

三、总结

本篇文章分析了从应用程序提交到启动 ApplicationMaster 的整个过程,分析具体过程看的可能会有些繁琐。但只要抓住核心本质,就很容易捋清楚。重点就是事件处理和状态机,这两个部件理解清楚,就很容易看明白程序的流转。
实际逻辑无非就是几个服务之间互相发送对应的事件,接收到事件后会执行启动服务、记录日志、监控状态,然后再发送个新的事件。
本身不难,但需要耐下心来一点点去梳理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/378822.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

sql学习二

文章目录一、 计算函数1. datediff2. all3. year4. sum二、控制流三、过滤 group by having一、 计算函数 1. datediff datediff(日期1, 日期2)&#xff1a; 得到的结果是日期1与日期2相差的天数。 如果日期1比日期2大&#xff0c;结果为正&#xff1b;如果日期1比日期2小&a…

MySQL 学习笔记(借鉴黑马程序员MySQL)

MySQL视频课链接 MySQL概述 数据库相关概念 数据库是存储数据的仓库&#xff0c;数据是有组织的进行存储&#xff08;DataBase&#xff09; 数据库管理系统是操纵和管理数据库的大型软件&#xff08;DataBase Management System&#xff09; SQL是操作关系型数据库的编程语…

Linux(Centos)安装TDengine

目录1&#xff1a;简介2&#xff1a;前期准备3&#xff1a;安装4&#xff1a;启动5&#xff1a;开机自启动6&#xff1a;安装客户端驱动(如果别的服务器需要链接TD则需要此步操作)7&#xff1a;基础命令1&#xff1a;简介 官网&#xff1a; https://www.taosdata.com/简介&…

webpack配置优化,让你的构建速度飞起

前言 越来越多的项目使用webpack5来构建项目了&#xff0c;今天给大家带来最前沿的webpack5配置&#xff0c;让我们代码在编译/运行时性能更好~ 我们会从以下角度来进行优化&#xff1a; 提升打包构建速度减少代码体积优化代码运行性能 提升打包构建速度 在进行打包速度优化…

Elasticsearch入门之Http操作(索引操作、映射操作、文档操作)

Elasticsearch 基本操作 数据格式&#xff1a; Elasticsearch 是面向文档型数据库&#xff0c;一条数据在这里就是一个文档。为了方便大家理解&#xff0c;我们将 Elasticsearch 里存储文档数据和关系型数据库 MySQL 存储数据的概念进行一个类比&#xff0c;如下图&#xff1a…

linux服务器时间同步

Linux服务器时间同步 需求&#xff1a;两台以上服务器之间的时间同步&#xff0c;以其中一台服务器为时间源&#xff0c;其余服务器同步这台时间源服务器的时间 其中&#xff0c;时间源服务器需要有访问外网权限&#xff0c;不然时间源服务器无法同互联网同步最新的时间&#…

Linux Vim 简介

文章目录01. 编辑器 Gedit 介绍02. 什么是 Vi(Vim)03. vim工作模式4.1 命令模式4.2 编辑模式4.3 末行模式04. vim教程05. vim基本操作06. vim实用操作7.1 命令模式下的操作7.2 末行模式下的操作01. 编辑器 Gedit 介绍 gedit 是一个 GNOME 桌面环境下兼容 UTF-8 的 文本编辑器。…

Spike on Flow with Validation Rule

问题 在Flow中如何友好的显示Validation Rule相关的错误信息&#xff1f; 举例 创建account记录&#xff0c;如果industry为finance&#xff0c;validation rule要求revenue必填。 假如你有个flow用来创建account&#xff0c;点击save触发条件&#xff0c; 期望&#xff1a;…

具备“结构化思维”的优势

导读&#xff1a; 在日常工作中&#xff0c;我们时常会碰到这样的情况&#xff0c;有的人讲事情逻辑非常混乱&#xff0c;罗列了很多事项&#xff0c;却把握不到重点&#xff0c;无法把一件事情说清楚。这种思维混乱是典型的缺少结构化思维的表现。结构化思维非常重要&#xff…

实例10:四足机器人运动学逆解可视化与实践

实例10&#xff1a; 四足机器人运动学逆解单腿可视化 实验目的 了解逆运动学的有无解、有无多解情况。了解运动学逆解的求解。熟悉逆运动学中求解的几何法和代数法。熟悉单腿舵机的简单校准。掌握可视化逆向运动学计算结果的方法。 实验要求 拼装一条mini pupper的腿部。运…

【大话面试】- Redis 篇-第一篇

【大话面试】- Redis 篇-第一篇 认识 NoSQL SQL VS NoSQL 1️⃣ 结构化&#xff08;Structured&#xff09; SQL 的存储格式 NoSQL 从其存储的结构上来看&#xff0c;对于 SQL 数据库而言&#xff0c;我们可以给每一个表的属性添加不同的约束&#xff08;主键唯一&#xff…

Java时间获取、格式化详情

Java时间获取详情java.util.Datejava.util.CalendarJava8推荐的时间获取方法LocalDate获取日期LocalTime获取时间LocalDateTime 获取时间和日期这里先附上后面会用到的进行时间格式化的代码&#xff1a;SimpleDateFormat timeSimpleDateFormatter new SimpleDateFormat("…

09_MySQL的子查询

子查询指一个查询语句嵌套在另一个查询语句内部的查询&#xff0c;这个特性从MySQL 4.1开始引入。SQL 中子查询的使用大大增强了 SELECT 查询的能力&#xff0c;因为很多时候查询需要从结果集中获取数据&#xff0c;或者需要从同一个表中先计算得出一个数据结果&#xff0c;然后…

【Node.js】MySQL数据库

数据库数据库的基本概念什么是数据库常见的数据库和分类数据库的数据组织结构实际开发中库&#xff0c;表&#xff0c;行&#xff0c;字段的关系MySQL相关的软件MySQL Workbench创建数据库创建数据表设计表字段字段的特殊标识向表中插入数据使用SQL管理数据库什么是SQLSQL能做什…

springcloud3 Nacos中namespace和group,dataId的联系

一 Namespance和group和dataId的联系 1.1 3者之间的联系 话不多说&#xff0c;上答案&#xff0c;如下图&#xff1a; namespance用于区分部署环境&#xff0c;group和dataId用于逻辑上区分两个目标对象。 二 案例&#xff1a;实现读取注册中心的不同环境下的配置文件 …

IDEA中Maven报错:Failed to read artifact descriptor for解决方案

导入spark-core依赖报错 Failed to read artifact descriptor for com.esotericsoftware:kryo-shaded:jar: 图片忘记报错了&#xff0c;拿一张网友的图&#xff0c;现象是spark-core成功导入&#xff0c;但是pom文件中project处报错 这个原因是因为maven版本不匹配&#xff0c…

金三银四,助力你的大厂梦,2023年软件测试经典面试真题(2)(共3篇)

前言 金三银四即将到来&#xff0c;相信很多小伙伴要面临面试&#xff0c;一直想着说分享一些软件测试的面试题&#xff0c;这段时间做了一些收集和整理&#xff0c;下面共有三篇经典面试题&#xff0c;大家可以试着做一下&#xff0c;答案附在后面&#xff0c;希望能帮助到大…

eNSP实验:vlan 划分与访问

实验目的 交换机未划分 vlan&#xff0c;直接相连的两个终端能否 ping 通&#xff1f; 不同 vlan 中的两个终端能否可以 ping 通&#xff1f; 相同 vlan 但不连接至同一个交换机的两个终端&#xff0c;能否与 ping通&#xff1f; 实验步骤 设计网络拓扑 交换机选用 S5700…

电子技术——AB类输出阶的偏置

电子技术——AB类输出阶的偏置 下面我们介绍两种AB类输出阶的偏置的方法。 使用二极管偏置 下图展示了电流源 III 加两个二极管的偏置方法&#xff1a; 因为输出阶需要大功率输出&#xff0c;因此输出推挽三极管可能是几何体积比较大的晶体管。对于二极管来说&#xff0c;并不…

LeetCode 79. 单词搜索

LeetCode 79. 单词搜索 难度&#xff1a;middle\color{orange}{middle}middle 题目描述 给定一个 mxnm x nmxn 二维字符网格 boardboardboard 和一个字符串单词 wordwordword 。如果 wordwordword 存在于网格中&#xff0c;返回 truetruetrue &#xff1b;否则&#xff0c;返…