【深入浅出 Yarn 架构与实现】4-6 RM 行为探究 - 申请与分配 Container

news2024/10/2 14:34:10

本小节介绍应用程序的 ApplicationMaster 在 NodeManager 成功启动并向 ResourceManager 注册后,向 ResourceManager 请求资源(Container)到获取到资源的整个过程,以及 ResourceManager 内部涉及的主要工作流程。

一、整体流程

整个过程可看做以下两个阶段的送代循环:

  • 阶段1 ApplicationMaster 汇报资源需求并领取已经分配到的资源;
  • 阶段2 NodeManager 向 ResourceManager 汇报各个 Container 运行状态,如果 ResourceManager 发现它上面有空闲的资源,则进行一次资源分配,并将分配的资源保存到对应的 应用程序数据结构中,等待下次 ApplicationMaster 发送心跳信息时获取(即阶段1)。

image.png

一)AM 汇报心跳

1、ApplicationMaster 通过 RPC 函数 ApplicationMasterProtocol#allocate 向 ResourceManager 汇报资源需求(由于该函数被周期性调用,我们通常也称之为“心跳”),包括新的资源需求描述、待释放的 Container 列表、请求加入黑名单的节点列表、请求移除黑名单的节点列表等。

public AllocateResponse allocate(AllocateRequest request) {
	// Send the status update to the appAttempt.
    // 发送 RMAppAttemptEventType.STATUS_UPDATE 事件
	this.rmContext.getDispatcher().getEventHandler().handle(
	    new RMAppAttemptStatusupdateEvent(appAttemptId, request.getProgress()));
    
    // 从 am 心跳 AllocateRequest 中取出新的资源需求描述、待释放的 Container 列表、黑名单列表
    List<ResourceRequest> ask = request.getAskList();
    List<ContainerId> release = request.getReleaseList();
    ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest();

	// 接下来会做一些检查(资源申请量、label、blacklist 等)

	// 将资源申请分割(动态调整 container 资源量)
    // Split Update Resource Requests into increase and decrease.
    // No Exceptions are thrown here. All update errors are aggregated
    // and returned to the AM.
    List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
    List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
    List<UpdateContainerError> updateContainerErrors =
        RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext,
            request, maximumCapacity, increaseResourceReqs,
            decreaseResourceReqs);

	// 调用 ResourceScheduler#allocate 函数,将该 AM 资源需求汇报给 ResourceScheduler
    // (实际是 Capacity、Fair、Fifo 等实际指定的 Scheduler 处理)
    allocation =
        this.rScheduler.allocate(appAttemptId, ask, release,
            blacklistAdditions, blacklistRemovals,
            increaseResourceReqs, decreaseResourceReqs);
}

2、ResourceManager 中的 ApplicationMasterService#allocate 负责处理来自 AM 的心跳请求,收到该请求后,会发送一个 RMAppAttemptEventType.STATUS_UPDATE 事件,RMAppAttemptImpl 收到该事件后,将更新应用程序执行进度和 AMLivenessMonitor 中记录的应用程序最近更新时间。
3、调用 ResourceScheduler#allocate 函数,将该 AM 资源需求汇报给 ResourceScheduler,实际是 Capacity、Fair、Fifo 等实际指定的 Scheduler 处理。
CapacityScheduler#allocate 实现为例:

// CapacityScheduler#allocate
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
    List<ResourceRequest> ask, List<ContainerId> release,
    List<String> blacklistAdditions, List<String> blacklistRemovals,
    List<UpdateContainerRequest> increaseRequests,
    List<UpdateContainerRequest> decreaseRequests) {

    // Release containers
	// 发送 RMContainerEventType.RELEASED
    releaseContainers(release, application);

    // update increase requests
    LeafQueue updateDemandForQueue =
        updateIncreaseRequests(increaseRequests, application);

    // Decrease containers
    decreaseContainers(decreaseRequests, application);

    // Sanity check for new allocation requests
    // 会将资源请求进行规范化,限制到最小和最大区间内,并且规范到最小增长量上
    SchedulerUtils.normalizeRequests(
        ask, getResourceCalculator(), getClusterResource(),
        getMinimumResourceCapability(), getMaximumResourceCapability());

    // Update application requests
    // 将新的资源需求更新到对应的数据结构中
    if (application.updateResourceRequests(ask)
        && (updateDemandForQueue == null)) {
      updateDemandForQueue = (LeafQueue) application.getQueue();
    }

    // 获取已经为该应用程序分配的资源
    allocation = application.getAllocation(getResourceCalculator(),
                   clusterResource, getMinimumResourceCapability());
        
    return allocation;
}

4、ResourceScheduler 首先读取待释放 Container 列表,向对应的 RMContainerImpl 发送 RMContainerEventType.RELEASED 类型事件,杀死正在运行的 Container;然后将新的资源需求更新到对应的数据结构中,之后获取已经为该应用程序分配的资源,并返回给 ApplicationMasterService。

二)NM 汇报心跳

1、NodeManager 将当前节点各种信息(container 状况、节点利用率、健康情况等)封装到 nodeStatus 中,再将标识节点的信息一起封装到 request 中,之后通过RPC 函数 ResourceTracker#nodeHeartbeat 向 ResourceManager 汇报这些状态。

// NodeStatusUpdaterImpl#startStatusUpdater
  protected void startStatusUpdater() {

    statusUpdaterRunnable = new Runnable() {
      @Override
      @SuppressWarnings("unchecked")
      public void run() {
        // ...
        Set<NodeLabel> nodeLabelsForHeartbeat =
                nodeLabelsHandler.getNodeLabelsForHeartbeat();
        NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);

        NodeHeartbeatRequest request =
            NodeHeartbeatRequest.newInstance(nodeStatus,
                NodeStatusUpdaterImpl.this.context
                    .getContainerTokenSecretManager().getCurrentKey(),
                NodeStatusUpdaterImpl.this.context
                    .getNMTokenSecretManager().getCurrentKey(),
                nodeLabelsForHeartbeat);
          
        // 发送 nm 的心跳
        response = resourceTracker.nodeHeartbeat(request);

2、ResourceManager 中的 ResourceTrackerService 负责处理来自 NodeManager 的请 求,一旦收到该请求,会向 RMNodeImpl 发送一个 RMNodeEventType.STATUS_UPDATE 类型事件,而 RMNodelmpl 收到该事件后,将更新各个 Container 的运行状态,并进一步向 ResoutceScheduler 发送一个 SchedulerEventType.NODE_UPDATE 类型事件。

// ResourceTrackerService#nodeHeartbeat
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnException, IOException {

    NodeStatus remoteNodeStatus = request.getNodeStatus();
    /**
     * Here is the node heartbeat sequence...
     * 1. Check if it's a valid (i.e. not excluded) node
     * 2. Check if it's a registered node
     * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
     * 4. Send healthStatus to RMNode
     * 5. Update node's labels if distributed Node Labels configuration is enabled
     */
      
    // 前 3 步都是各种检查,后面才是重点的逻辑
    // Heartbeat response
    NodeHeartbeatResponse nodeHeartBeatResponse =
        YarnServerBuilderUtils.newNodeHeartbeatResponse(
            getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
            NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
    // 这里会 set 待释放的 container、application 列表
    // 思考:为何只有待释放的列表呢?分配的资源不返回么? - 分配的资源是和 AM 进行交互的
    rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);

    populateKeys(request, nodeHeartBeatResponse);

    ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
        rmContext.getSystemCredentialsForApps();
    if (!systemCredentials.isEmpty()) {
      nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
    }

    // 4. Send status to RMNode, saving the latest response.
    // 发送 RMNodeEventType.STATUS_UPDATE 事件
    RMNodeStatusEvent nodeStatusEvent =
        new RMNodeStatusEvent(nodeId, remoteNodeStatus);
    if (request.getLogAggregationReportsForApps() != null
        && !request.getLogAggregationReportsForApps().isEmpty()) {
      nodeStatusEvent.setLogAggregationReportsForApps(request
        .getLogAggregationReportsForApps());
    }
    this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);

3、ResourceScheduler 收到事件后,如果该节点上有可分配的空闲资源,则会将这些资源分配给各个应用程序,而分配后的资源仅是记录到对应的数据结构中,等待 ApplicationMaster 下次通过心跳机制来领取。(资源分配的具体逻辑,将在后面介绍 Scheduler 的文章中详细讲解)。

三、总结

本篇分析了申请与分配 Container 的流程,主要分为两个阶段。
第一阶段由 AM 发起,通过心跳向 RM 发起资源请求。
第二阶段由 NM 发起,通过心跳向 RM 汇报资源使用情况。
之后就是,RM 根据 AM 资源请求以及 NM 剩余资源进行一次资源分配(具体分配逻辑将在后续文章中介绍),并将分配的资源通过下一次 AM 心跳返回给 AM。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/380327.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

吴恩达机器学习笔记——线性回归

1.模型描述有训练集数据房子面积和卖出的价钱&#xff0c;我们用这组数据来模拟特定面积的房子能够卖出的价钱。这是一个很明显的监督学习&#xff08;supervised learning&#xff09;的例子&#xff0c;因为我们的训练集里包含了正确的结果&#xff08;即房子的卖价&#xff…

非递归迭代实现二叉树前序,中序,后序遍历

文章目录1. 前序遍历2. 中序遍历3. 后序遍历1. 前序遍历 题目链接 解题思路&#xff1a; 非递归遍历一棵树有两点&#xff1a; 1.左路结点 2.左路结点的右子树 什么意思呢&#xff1f; 我们知道前序遍历是按照根&#xff0c;左子树&#xff0c;右子树来的。所以它是先根&…

js中的原型链

js中原型和原型链&#x1f61a; 1、为什么需要原型链&#xff1f;&#x1f923;&#x1f61a; 凡事都是有一定的需求和原因发展起来的&#xff0c;在ECMA中为什么要提出原型链这个概念呢&#xff1f; 我们知道&#xff0c;创建对象有两种方式。一种是通过字面量来创建&#…

科研 | 论文写作 | 最常用的LaTeX语法

最常用的LaTeX语法1. 行内公式2. 行间公式3. 下标4. 上标5. 公式编号6. 数学公式7. 根号和分式8. 上下标记9. 向量10. 积分、极限、求和、乘积11. 三圆点12. 重音符号13. 矩阵14. 小写希腊字母和大写希腊字母15. 公式组合16. 拆分单个公式1. 行内公式 格式&#xff1a;将公式编…

流计算框架storm概览

Attention: supervison 和 nimbus的状态都实时保存在zookeeper集群中和本地. Enchance, this means you can kill -9 Nimbus or the Supervisors and theyll start back up as nothing happened. Topologies 1. storm jar all-my-code.jar org.apache.storm.MyTopology a…

父类子类静态代码块、构造代码块、构造方法执行顺序

github:https://github.com/nocoders/java-everything.git 名词解释 静态代码块&#xff1a;java中使用static关键字修饰的代码块&#xff0c;每个代码块只会执行一次&#xff0c;JVM加载类时会执行静态代码块中的代码&#xff0c;静态代码块先于主方法执行。构造代码块&#…

[Java面经] 三年工作经验, 极兔一二面

极兔一二面面经: 1. mysql的acid怎么实现的 这一点先回答ACID分别是A(原子性),C(一致性),I(隔离性),D(持久性), 其中持久性是数据库落磁盘的操作,无需额外实现. 隔离性是通过事务的隔离级别来实现, MySQL默认的隔离级别是RR(可重复读), 虽然上面还有一层Serializable(串行化…

如何在canvas中模拟css的背景图片样式

笔者开源了一个Web思维导图mind-map&#xff0c;最近在优化背景图片效果的时候遇到了一个问题&#xff0c;页面上展示时背景图片是通过css使用background-image渲染的&#xff0c;而导出的时候实际上是绘制到canvas上导出的&#xff0c;那么就会有个问题&#xff0c;css的背景图…

【日常总结】docker容器相互调用,占用服务器带宽解决方案

目录 一、场景&#xff1a; 1. 环境 2. 项目背景&#xff1a; 3. 全球时区解决方案 4. 方案二步骤 二、问题 三、产生原因 四、解决方案 五、解决步骤 六、整改效果 一、场景&#xff1a; docker容器相互调用&#xff0c;占用慢服务器带宽&#xff0c;导致netty连接的…

go 切片(slice)原理及用法注意事项

切片(slice)定义 go语言中的slice是一种数据结构,其定义为一个结构体,如下所示; type SliceHeader struct {Data uintptr // 指向底层数组的指针Len int // 切片的长度Cap int // 切片的容量 }切片与数组 切片的底层数据存储结构是 数组切片较为灵活,能动态扩容,而数组是定…

vue2使用v-viewer实现图片预览ImagePreview

追溯&#xff1a; View UI Plus 是 View Design 设计体系中基于 Vue.js 3 的一套 UI 组件库&#xff0c;里面有个组件ImagePreview可以实现“图片预览”。 使用ImagePreview组件&#xff0c;报错&#xff1a; [Vue warn]: Unknown custom element: <ImagePreview> - d…

odoo15 标题栏自定义

odoo15 标题栏自定义 如何显示为自定义呢 效果如下: 代码分析: export class WebClient extends Component {setup() {this.menuService = useService("menu");this.actionService = useService("action");this.title = useService("title&…

在Docker 上完成对Springboot+Mysql+Redis的前后端分离项目的部署(全流程,全截图)

本文章全部阅读大约2小时&#xff0c;包含一个完整的springboot vue mysqlredis前后端分离项目的部署在docker上的全流程&#xff0c;比较复杂&#xff0c;请做好心理准备&#xff0c;遇到问题可留言或则私信 目录 1 安装Docker&#xff0c;以及简单使用参照 2 Docker部署m…

HOT100--(3)无重复字符的最长子串

点击查看题目详情 大思路&#xff1a; 创建哈希表&#xff0c;元素类型为<char, int>&#xff0c;分别是字符与其对应下标 用哈希表来存储未重复的子串&#xff0c;若有重复则记录下当前子串最大值maxhashsize 并且开始以相同方法记录下一子串 遍历完成以后&#xff0c…

Android OpenCV(七十三):吊打高斯模糊的StackBlur Android 实践

前言 OpenCV 4.7.0 2022年12月28日Release,ChangeLog中提到 Stackblur algorithm implementation. Stackblur是一种高斯模糊的快速近似,由Mario Klingemann发明。其计算耗时不会随着kernel size的增大而增加,专为大kernel size的模糊滤波场景量身定制。 使用建议:当kerne…

[RDMA-高级计算机网络report] Congestion Control for Large-Scale RDMA Departments

本文主要解决的问题是在RoCEv2体系中&#xff0c;基于优先级的拥塞控制PFC是一种粗粒度的机制。 它在端口&#xff08;或端口加优先级&#xff09;级别上运行&#xff0c;并且不区分流。PAUSE机制是基于每个端口&#xff08;和优先级&#xff09;的&#xff0c;而不是基于每个流…

mysql数据库之索引使用原则

一、最左前缀法则。 1、如果索引使用了多列&#xff08;联合索引&#xff09;&#xff0c;要遵守最左前缀法则。最左前缀法则指的是查询从索引的最左列开始&#xff0c;并且不跳过索引中的列。 如果跳跃到某一列&#xff0c;索引将部分失效&#xff08;后面的字段索引失效&am…

springboot启动时遇见的版本不同、无法启动、自动停止问题解决方案

Springboot项目启动失败初来乍到&#xff0c;听说springboot很好用&#xff0c;很简便&#xff0c;于是爱搞事情的我就打算试试&#xff0c;因为最近在找工作&#xff0c;很多软件开发的也要求springboot的使用&#xff0c;于是我就开启了springboot的学习之旅&#xff0c;打算…

Vue3 企业级项目实战:认识 Spring Boot

Vue3 企业级项目实战 - 程序员十三 - 掘金小册Vue3 Element Plus Spring Boot 企业级项目开发&#xff0c;升职加薪&#xff0c;快人一步。。「Vue3 企业级项目实战」由程序员十三撰写&#xff0c;2744人购买https://s.juejin.cn/ds/S2RkR9F/ 越来越流行的 Spring Boot Spr…

人工智能及其应用(蔡自兴)期末复习

人工智能及其应用&#xff08;蔡自兴&#xff09;期末复习 相关资料&#xff1a; 人工智能期末复习 人工智能复习题 人工智能模拟卷 人工智能期末练习题 1 ⭐️绪论 人工智能&#xff1a;人工智能就是用人工的方法在机器&#xff08;计算机&#xff09;上实现的智能&#xff0…