Flink的ResourceManager详解(一)

news2025/1/13 17:35:15
ResourceManager 总结
一、概述
1、ResourceManager 管理 Flink 集群中的计算资源,计算资源主要来自 TaskManager 组件。

2、如果集群采用 Native【本地模式】部署,则 ResourceManager 会动态地向集群资源管理器申请 Container 并启动TaskManager,例如Hadoop Yarn、Kubernetes等。

3、ResourceManager主要接收来自 JobManager 的 SlotRequest 和 TaskManager 的 SlotReport。
二、分类
1、动态资源管理 和 不支持动态资源管理
1)一类支持动态资源管理,例如KubernetesResourceManager、YarnResourceManager及MesosResourceManager

支持动态资源管理的集群类型,可以按需启动TaskManager资源,根据Job所需的资源请求,动态启动TaskManager节点,这种资源管理方式不用担心资源浪费和资源动态伸缩的问题。

实现动态资源管理的ResourceManager需要继承ActiveResourceManager基本实现类。

2)另一类不支持动态资源管理,例如StandaloneResourceManager
2、分类图

在这里插入图片描述

三、核心服务

ResourceManagerRuntimeServices 中包含 SlotManager 和 JobLeaderldService 两个主要服务和 HeartbeatService 心跳服务。

1、SlotManager 管理整个集群的 Slot 计算资源,并对 Slot 计算资源进行统一的分配和管理,同时实现了对 TaskManager 信息的注册和管理。
2、JobLeaderldService 通过实现 jobLeaderldListeners 实时监听 JobManager 的运行状态,以获取集群启动的作业对应的 JobLeaderld 信息,防止出现 JobManager 无法连接的情况,用于管理注册的 JobManager 节点,包括对 JobManager 的注册和注销等操作。
3、HeartbeatService 主要通过 TaskManagerHeartbeatListener 和 JobManagerHeartbeatListener 两个监听器收集来自 TaskManager和 JobManager 的心跳信息,以保证整个运行时中各个组件之间能够正常通信。
四、ResourceManager 的初始化和启动
DefaultDispatcherResourceManagerComponentFactory#create 方法
1、初始化 ResourceManager
 resourceManager =
                    resourceManagerFactory.createResourceManager(
                            configuration,
                            ResourceID.generate(),
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            fatalErrorHandler,
                            new ClusterInformation(hostname, blobServer.getPort()),
                            webMonitorEndpoint.getRestBaseUrl(),
                            metricRegistry,
                            hostname,
                            ioExecutor);
1)创建 ResourceManagerRuntimeServices
1.创建 SlotManager

SlotMatchingStrategy 根据作业中给定的 ResourceProfile 匹配 Slot 计算资源。SlotMatchingStrategy主要分为两种类型:

一种是LeastUtilizationSlotMatchingStrategy,即按照利用率最低原则匹配Slot资源,尽可能保证TaskExecutor上资源的使用率处于比较低的水平,这种策略能够有效降低机器的负载。

另一种是AnyMatchingSlotMatchingStrategy,即直接返回第一个匹配的Slot资源策略。

private static SlotManager createSlotManager(
            ResourceManagerRuntimeServicesConfiguration configuration,
            ScheduledExecutor scheduledExecutor,
            SlotManagerMetricGroup slotManagerMetricGroup) {
        final SlotManagerConfiguration slotManagerConfiguration =
                configuration.getSlotManagerConfiguration();
        if (configuration.isEnableFineGrainedResourceManagement()) {
            return new FineGrainedSlotManager(
                    scheduledExecutor,
                    slotManagerConfiguration,
                    slotManagerMetricGroup,
                    new DefaultResourceTracker(),
                    new FineGrainedTaskManagerTracker(),
                    new DefaultSlotStatusSyncer(
                            slotManagerConfiguration.getTaskManagerRequestTimeout()),
                    new DefaultResourceAllocationStrategy(
                            SlotManagerUtils.generateTaskManagerTotalResourceProfile(
                                    slotManagerConfiguration.getDefaultWorkerResourceSpec()),
                            slotManagerConfiguration.getNumSlotsPerWorker()),
                    Time.milliseconds(REQUIREMENTS_CHECK_DELAY_MS));
        } else if (configuration.isDeclarativeResourceManagementEnabled()) {
            return new DeclarativeSlotManager(
                    scheduledExecutor,
                    slotManagerConfiguration,
                    slotManagerMetricGroup,
                    new DefaultResourceTracker(),
                    new DefaultSlotTracker());
        } else {
            return new SlotManagerImpl(
                    scheduledExecutor, slotManagerConfiguration, slotManagerMetricGroup);
        }
    }
2.创建 JobLeaderIdService
final JobLeaderIdService jobLeaderIdService =
                new DefaultJobLeaderIdService(
                        highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout());
2)返回创建的 StandaloneResourceManager
return new StandaloneResourceManager(
                rpcService,
                resourceId,
                highAvailabilityServices,
                heartbeatServices,
                resourceManagerRuntimeServices.getSlotManager(),
                ResourceManagerPartitionTrackerImpl::new,
                resourceManagerRuntimeServices.getJobLeaderIdService(),
                clusterInformation,
                fatalErrorHandler,
                resourceManagerMetricGroup,
                standaloneClusterStartupPeriodTime,
                AkkaUtils.getTimeoutAsTime(configuration),
                ioExecutor);

在 StandaloneResourceManager 构造方法中启动 RpcServer

this.rpcServer = rpcService.startServer(this);
2、启动 ResourceManager
resourceManager.start()->ResourceManager#onStart

ResourceManager#startResourceManagerServices

1)获取 leaderElectionService
leaderElectionService =
                    highAvailabilityServices.getResourceManagerLeaderElectionService();
2)初始化 resourceManagerDriver【ActiveResourceManager需要】
resourceManagerDriver.initialize(this, new GatewayMainThreadExecutor(), ioExecutor);
3)启动 leader 竞选,在 leader 节点启动服务
1.启动心跳服务

在ResourceManager中HeartbeatService的启动方法中,包括了对taskManagerHeartbeatManager和jobManagerHeartbeatManager两个心跳管理服务的启动操作。

而心跳管理服务主要通过TaskManagerHeartbeatListener和JobManagerHeartbeatListener两个监听器收集来自TaskManager和JobManager的心跳信息,以保证整个运行时中各个组件之间能够正常通信。

startHeartbeatServices();
2.启动 slotManager 服务

通过scheduledExecutor线程池启动TaskManager周期性超时检查服务,通过checkTaskManagerTimeouts()方法实现该检查,防止TaskManager长时间掉线等问题。

启动单独的线程对提交的SlotRequest进行周期性超时检查,防止Slot请求超时。

slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
4)启动 jobLeaderIdService
jobLeaderIdService.start(new JobLeaderIdActionsImpl());
五、总结
1、ResourceManager 通过 SlotManager 管理集群中的计算资源(TaskManager 的 SlotReport)响应 JobManager 的 SlotRequest;
2、ResourceManager 通过 HeartBeatService 监听 JobManager 和 TaskManager 的心跳,保证运行时各个组件间能够正常通信;
3、ResourceManager 通过 JobLeaderldService 管理注册的 JobManager 节点,包括对 JobManager 的注册和注销等操作;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1099434.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

depcheck检查项目依赖的安装情况-帮你解决各种项目运行灵异事件

depcheck检查项目缺失的依赖 depcheck介绍与安装介绍安装 depcheck使用基础使用注意 进阶使用 删除多余的依赖注意 depcheck介绍与安装 介绍 工作中,以下的场景恐怕大家都有经历过: 从代码仓库上面 clone 的项目,自己本地一运行就报错… 用…

Ruby和面向对象技术

Ruby和许多极为流行的编程语言都是面向对象的。多数的面向对象编程语言,每个对象都是一个样例或者既定类的实例以及独立对象的行为。 一、创建一个通用对象 创建一个通用对象 obj Object.new定义通用对象的行为 def obj.talk puts "I am an object"p…

MySQL远程连接

一、什么是mysq的远程连接? 1、本地连接 直接在本地使用mysqladmin命令登录 mysql -u root -p 解释如下: mysql:mysql 命令表示要启动 MySQL 客户端。-u root:-u 选项指定要使用的用户名。在这里,我们使用 root 用户名作为示例。-p:-p 选项需要用户输入密码。如果省…

Unexpected mutation of “dialogVisible“ prop.

问题记录: Vue2项目在封装element-ui的dialog组件时,eslint报错 Unexpected mutation of “dialogVisible” prop.eslintvue/no-mutating-props 大致意思是父组件传递过来的 dialogVisible 属性,不允许在子组件中修改父组件的值 解决方法&a…

qtabwidget 样式表

.QWidget{background-color: #ffffff; }/*设置TabWidget中QTabBar的样式*/ QTabWidget{background-color: #E6EBE8; } QTabBar::tab{background-color: #DEDEDE;font-family:Source Han Sans CN; /*设置tab中的文本的字体*/font-size:20pt;font-weight: normal;color:#3D3D3…

HCIP---BGP社团属性

文章目录 目录 文章目录 前言 一.BGP社团属性概述 公有社团属性 配置命令 前言 前文详细介绍了BGP的基础内容,本编将着重于BGP协议的另一个重点内容进行详解。 一.BGP社团属性概述 BGP社区属性是BGP路由协议中的一种特殊属性,可以用于指定一组AS号码&…

线程安全案例 --- 线程池

小王学习录 今日鸡汤什么是线程池为什么需要线程池1. 协程2. 线程池3. 什么是用户态和内核态使用线程池 --- java标准库1. 线程池的创建2. 工厂模式3. 线程池的使用ThreadPollExecutor类构造方法1. corePoolSize, maximumPoolSize 线程2. long keepAliveTime, TimeUnit unit 时…

paddlenlp:社交网络中多模态虚假媒体内容核查(特征篇)

初赛之特征构造 写在前面一、安装paddleOCR二、代码部分三、模型优缺点四、写在最后 写在前面 通过前面两篇文章的介绍,我们可以大致的知道模型用到的特征分为四块:qCap,qImg,captions,imgs。根据这些特征&#xff0c…

第一章 C语言程序设计Pro

考点一 C语言特点,C程序基本构成 数据类型丰富运算符丰富是一种中级语言(高级) //包含了中级和高级的特性数据类型检查不严格 //int和char通用 对下标的越界不报错 2021.下列属于计算机中高级语言的是(…

嵌入式基础——哈弗结构

文章目录 1 什么是哈弗结构?2 哈弗结构单片机的特点3 什么是改进的(Enhanced)哈弗结构?4 写在最后 1 什么是哈弗结构? 哈佛结构是一种将程序指令存储和数据存储分开的存储器结构,如图所示: 哈佛结构是一种并行体系结构&#xf…

Consider using the `--user` option or check the permissions.

ERROR: Could not install packages due to an OSError: [WinError 5] 拒绝访问。: C:\\Users\\luckyli\\anaconda3\\envs\\CV\\Lib\\site-packages\\~orch\\lib\\asmjit.dll Consider using the --user option or check the permissions. 安装pytorch时遇到上述问题 通过以下…

Git Cherry Pick的使用

cherry-pick命令的基本用法 cherry-pick命令的基本语法如下&#xff1a; git cherry-pick <commit>其中&#xff0c;<commit>是要应用的提交的哈希值或分支名。该命令会将指定的提交应用到当前分支上&#xff0c;并创建一个新的提交。 使用场景 cherry-pick命令…

Jmeter-实现图片的上传和下载

图片上传 选中测试计划右键&#xff0c;添加->线程(用户)->线程组 配置线程组 上面分别是总次数&#xff0c;时间&#xff0c;循环次数&#xff0c;就是字面意思 选中你的线程组右键&#xff0c;添加->取样器->HTTP请求 配置HTTP请求 为了方便观看&#xff0c;这…

【C++基础】13. 结构体

文章目录 【 1. 结构体的定义 】【 2. 结构体成员的访问 】【 3. 结构体变量的声明 】【 4. 指向结构体的指针 】 数组与结构体&#xff1a;C/C 数组允许定义可存储相同类型数据项的变量。而结构体是 C 中另一种用户自定义的可用的数据类型&#xff0c;它允许我们存储不同类型的…

在vscode中配置git bash终端、git 源码管理

打开vscode文件->首选项->设置&#xff0c;打开设置搜索shell windows将以下配置添加到vscode中的settings.json中 注意&#xff1a; terminal.integrated.profiles.windows这个配置项是就是添加终端的terminal.integrated.defaultProfile.windows这个是配置默认选项的…

【Linux初阶】多线程3 | 线程同步,生产消费者模型(普通版、BlockingQueue版)

文章目录 ☀️一、线程同步&#x1f33b;1.条件变量&#x1f33b;2.同步概念与竞态条件&#x1f33b;3.条件变量函数&#x1f33b;4.条件变量使用规范&#x1f33b;5.代码案例 ☀️二、生产者消费者模型&#x1f33b;1.为何要使用生产者消费者模型&#x1f33b;2.生产者消费者模…

代理正向 反向代理

1.正向代理 主动发送流量 端口转发 反向代理 被动发送流量 正向代理的设置 画图 实验 利用 攻击机 外网 失控服务器 内网 外网都有 内部服务内网 使用工具 使用的恶意脚本 放到网页里 客户端 使用 网站访问 解析 一下 使用的工具 pyth…

Java学习_day01_hello java

构成 JDK JDK是java开发者工具&#xff0c;由JRE和一些开发工具组成。JRE JRE是java运行环境&#xff0c;由JVM和java核心类库组成。JVM JVM是java虚拟机&#xff0c;主要用来运行字节码。 执行过程 由IDE或文本编辑器&#xff0c;编写源代码&#xff0c;并将文件保存为*.ja…

谈谈 AOF

谈谈 AOF Append Only File&#xff0c;只追加文件。 AOF 文件存储的是具体的操作命令。 Redis 每执行一条写操作命令&#xff0c;执行完之后&#xff0c;就把该命令追加到 AOF_Buffer 缓冲区中&#xff0c;然后会使用某种写回策略&#xff0c;写回磁盘的AOF文件中。 Redis 重启…

【GD32】GD32F303串口设置DMA发生中断无法进入中断函数

在GD32F303官方提供的串口例程中&#xff0c;有一个DMA发生和接收中断例程&#xff0c;在模仿着写的过程中&#xff0c;能够正常发送数据&#xff0c;但是无法进入中断函数。DMA0_Channel3_IRQHandler函数时官方定义的弱函数&#xff0c;需要自己重新实现。如果开启了DMA0通道3…