Flink源码之JobManager启动流程

news2024/9/27 17:27:53

从启动命令flink-daemon.sh中可以看出StandaloneSession入口类为org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint, 从该类的main方法会进入ClusterEntrypoint::runCluster中, 该方法中会创建出主要服务和组件。

StandaloneSessionClusterEntrypoint::main
ClusterEntrypoint::runClusterEntrypoint
ClusterEntrypoint::startCluster
ClusterEntrypoint::runCluster

private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        initializeServices(configuration, pluginManager); //初始化服务

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        final DispatcherResourceManagerComponentFactory
                dispatcherResourceManagerComponentFactory =
                        createDispatcherResourceManagerComponentFactory(configuration);
		//创建核心组件
        clusterComponent =
                dispatcherResourceManagerComponentFactory.create(
                        configuration,
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
                        executionGraphInfoStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        this);
	...ignore code
    }
}

可以看出关键代码是调用initializeServices以及创建Cluster Component。

protected void initializeServices(Configuration configuration, PluginManager pluginManager)
        throws Exception {

    LOG.info("Initializing cluster services.");

    synchronized (lock) {
        rpcSystem = RpcSystem.load(configuration);

        commonRpcService =
                RpcUtils.createRemoteRpcService(
                        rpcSystem,
                        configuration,
                        configuration.getString(JobManagerOptions.ADDRESS),
                        getRPCPortRange(configuration),
                        configuration.getString(JobManagerOptions.BIND_HOST),
                        configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));

        JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

        // update the configuration used to create the high availability services
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        ioExecutor =
                Executors.newFixedThreadPool(
                        ClusterEntrypointUtils.getPoolSize(configuration),
                        new ExecutorThreadFactory("cluster-io"));
        haServices = createHaServices(configuration, ioExecutor, rpcSystem);
        blobServer = new BlobServer(configuration, haServices.createBlobStore());
        blobServer.start();
        heartbeatServices = createHeartbeatServices(configuration);
        metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);

        final RpcService metricQueryServiceRpcService =
                MetricUtils.startRemoteMetricsRpcService(
                        configuration, commonRpcService.getAddress(), rpcSystem);
        metricRegistry.startQueryService(metricQueryServiceRpcService, null);

        final String hostname = RpcUtils.getHostname(commonRpcService);

        processMetricGroup =
                MetricUtils.instantiateProcessMetricGroup(
                        metricRegistry,
                        hostname,
                        ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                configuration));

        executionGraphInfoStore =
                createSerializableExecutionGraphStore(
                        configuration, commonRpcService.getScheduledExecutor());
    }
}

在initializeServices中首先创建commonRpcService,这个RPCService实例是JobManager提供RPC服务的核心,可以看出它会有个地址和监听端口号,commonRpcService可将继承自Gateway的服务实例包装成AkkaActor对外提供RPC服务,比如ResourceManager、Dispatcher。此外还创建了其他服务:

haService: 可通过HAService获取ResourceManager/Dispatcher/RestEndpoint的地址,同时也提供选主服务,组件启动时需向HAService注册,如果被选主成功,则会调用监听器的grandLeadership回调函数
BlobServer: 可用来提供存储大对象存储服务
heartbeatServices:为组件间传递心跳信息
metricRegistry:提供metric上报和查询服务,监听端口不同,新建了一个RpcService专为Metric服务
processMetricGroup:注册系统运行状态信息的Metric,比如GC/Memory/Network运行时状况,添加Metric都是通过一个MetricGroup添加
executionGraphInfoStore:缓存Job执行时信息,比如ExecutionGrap

初始化服务创建完成后,通过DefaultDispatcherResourceManagerComponentFactory:create创建JobManager的三大核心组件:Dispacher/ResourceManager/RestEndpointServer, 都是通过工厂方法创建:

DefaultDispatcherRunnerFactory
StandaloneResourceManagerFactory
SessionRestEndpointFactory

这些组件是JobManager向HAService注册获取leadership后,被ElectionService回调grantLeadership函数中创建出具体组件实例。

RestServer

RestServer并不是一个RPCServer,没有继承RpcGateway,只提供HTTP接口服务,然后将请求转交给Dispatcher处理,它的生成启动流程如下:

SessionRestEndpointFactory::createRestEndpoint
DispatcherRestEndpoint::new
RestServerEndpoint::start //通过Netty启动Rest服务
DispatcherRestEndpoint::initializeHandlers //JobSubmitHeaders、JobSubmitHandler处理客户端提交Job
WebMonitorEndpoint::initializeHandlers //关联Rest请求的Header和Handler
WebMonitorEndpoint::startInternal //竞选leader

ResourceManager

RM生成启动过程是ResourceManagerServiceImpl先竞选leader成功后再创建出具体的ResourceManager

ResourceManagerServiceImpl::start
ResourceManagerServiceImpl::grantLeadership
ResourceManagerServiceImpl::startNewLeaderResourceManager
ResourceManagerServiceImpl::startResourceManagerIfIsLeader//调用start方法
StandaloneResourceManagerFactory::createResourceManager
StandaloneResourceManager::new
StandaloneResourceManager::start

Dispatcher

Dispacher生成启动过程是DefaultDispatcherRunner选主后再创建出具体实例

DefaultDispatcherRunnerFactory::createDispatcherRunner
DefaultDispatcherRunner::create
DispatcherRunnerLeaderElectionLifecycleManager.createFor
DefaultDispatcherRunner::grantLeadership //
DefaultDispatcherRunner::startNewDispatcherLeaderProcess//创建SessionDispatcherLeaderProcess并调用其start方法
DefaultDispatcherRunner::createNewDispatcherLeaderProcess
SessionDispatcherLeaderProcessFactoryFactory::createFactory
SessionDispatcherLeaderProcessFactory::create
SessionDispatcherLeaderProcess::create
SessionDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::startInternal
SessionDispatcherLeaderProcess:onstart
SessionDispatcherLeaderProcess::createDispatcherIfRunning
SessionDispatcherLeaderProcess::createDispatcher
DefaultDispatcherGatewayServiceFactory::create//创建Dispatcher并调用其start方法
SessionDispatcherFactory::createDispatcher
StandaloneDispatcher::new
StandaloneDispatcher::start
Dispatcher::onstart

总结

在这里插入图片描述
JobManager的启动过程就是创建三大组件RestServer/RM/Dispacher实例初始化的过程,RestSever通过Netty启动HTTP服务,RM/Dispacher被AkkaRpcService包装成AkkaActor提供本地或远程RPC服务,RestServer仅仅是接受请求解析消息后由具体Handler处理,JobGrap提交执行会转发给Dispatcher处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/853717.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

内存新一轮暴跌,即将大量流行“官方翻新”

如果说最近一年你有在关注 PC 硬件价格,内存、SSD 想必是值得感慨的。 一次次的好价抄底,似乎永没有尽头。 SSD 降价归功于国产长江存储闪存颗粒大量出货,但内存的猛降能理解但又不完全能理解。 DDR4 到 DDR5 换代没错,但更早知…

COS控制台体验升级 - 文件列表支持网格布局

前言 对象存储(Cloud Object Storage,COS)控制台文件列表页以表格的形式列出存储桶下的所有文件,为了提高用户在COS控制台文件列表页的操作体验,我们对其进行了改版,现在文件列表页支持网格视图&#xff0…

攻防世界-web-shrine

1. 题目描述 打开链接,发现是一串源码: 从源码中不难发现关键词是flask.render_template_string(safe_jinja(shrine)) ,这个函数说明了题目的关键点在于模板渲染,即存在模板注入 2. 思路分析 从代码中不难发现,即使…

什么是思维导图?怎么制作思维导图?看这篇就够了!

在当下快节奏的社会中,无论是学习、工作还是生活,我们都需要处理大量的信息和任务。对于这些复杂的信息和任务,如何有效地理解、记忆和管理,成为了我们面临的一个重要挑战。对于诸如此类场景,使用思维导图就能很好地辅…

python-docx常用方法总结

由于最近有任务需要自动生成word报告,因此学习了一些python-docx的使用方法,在此总结。 目前网上相关的资料不算太多,且大多数都很简单。有一些稍微复杂的需求往往找不到答案,很多想要的方法这个库似乎并没有直接提供。在git上看…

面部表情识别(Pytorch):人脸检测模型+面部表情识别分类模型

目录 0 相关资料1 基于人脸检测面部表情分类识别方法2 项目安装2.1 平台与镜像2.2 项目下载2.3 模型下载2.4 上传待测试图片2.5 项目安装 3 demo测试 0 相关资料 面部表情识别2:Pytorch实现表情识别(含表情识别数据集和训练代码):https://blog.csdn.net…

链表oj (环形链表oj)

文章目录 1.数组oj 2.链表oj 文章内容 1.数组oj 1. 原地移除数组中所有的元素值为val&#xff0c;要求时间复杂度为O(N)&#xff0c;空间复杂度为O(1)。力扣 int removeElement(int* nums, int numsSize, int val){int k numsSize;int a 0;int b 0;while(b<k){if(nu…

Centos7安装jdk8教程——rpm安装

1. rpm文件下载 下载链接 Java SE 8 Archive Downloads (JDK 8u211 and later) 2.上传到服务器指定路径下并安装 切换到上传目录&#xff0c;然后执行以下命令 rpm -ivh jdk-8u221-linux-x64.rpm3. 设置环境变量并重载配置 # 设置环境变量 vim /etc/profile# 文件末尾添加…

【网站搭建】开源社区Flarum搭建记录

环境 服务器系统&#xff1a;腾讯云 OpenCloudOS 宝塔版本&#xff1a;免费版8.0.1 Nginx&#xff1a;1.24.0 MySQL&#xff1a;5.7.42 PHP&#xff1a;8.1.21 萌狼蓝天 2023年8月7日 PHP设置 1.安装扩展&#xff1a;flieinfo、opcache、exif 2.解除禁用函数&#xff1a;putenv…

Llama 2 with langchain项目详解(一)

Llama 2 with langchain项目详解(一) 2023年2月25日,美国Meta公司发布了Llama 1开源大模型。随后,于2023年7月18日,Meta公司发布了Llama 2开源大模型,该系列包括了70亿、130亿和700亿等不同参数规模的模型。相较于Llama 1,Llama 2的训练数据增加了40%,上下文长度提升至…

直线模组在AGV物流设备起什么作用?

在物流产业高速发展的今天&#xff0c;机器人技术的应用程度已经成为决定企业间相互竞争和未来发展的重要衡量因素。智能机器人运用到物流产业&#xff0c;其效率不言而喻。AGV智能仓储作为现代物流系统的重要组成部分&#xff0c;物流自动化、智能化不光是能提升效率和安全性&…

前端懒加载

懒加载的概念 懒加载也叫做延迟加载、按需加载&#xff0c;指的是在长网页中延迟加载图片数据&#xff0c;是一种较好的网页性能优化的方式。在比较长的网页或应用中&#xff0c;如果图片很多&#xff0c;所有的图片都被加载出来&#xff0c;而用户只能看到可视窗口的那一部分…

Java GUI,mybatis实现资产管理系统

Java GUI——资产管理系统 前言&#xff1a;为了做java课设&#xff0c;学了一手Java GUI。感觉蛮有意思的&#xff0c;写写文章&#xff0c;做个视频记录一下。欢迎大家友善指出我的不足 资产管理系统录制视频&#xff0c;从头敲到尾 模块划分 资产信息管理 资产信息查询 …

【Linux取经路】进程的奥秘

文章目录 1、什么是进程&#xff1f;1.1 自己写一个进程 2、操作系统如何管理进程&#xff1f;2.1 描述进程-PCB2.2 组织进程2.3 深入理解进程 3、Linux环境下的进程3.1 task_struct3.2 task_struct内容分类3.3 组织进程3.4 查看进程属性 4、结语 1、什么是进程&#xff1f; 在…

Java GUI——网页浏览器开发

Java GUI——网页浏览器开发 前言&#xff1a;为了做java课设&#xff0c;学了一手Java GUI。感觉蛮有意思的&#xff0c;写写文章&#xff0c;做个视频记录一下。欢迎大家友善指出我的不足 网页浏览器开发录制视频&#xff0c;从头敲到尾 任务需求 界面需求 菜单栏 文件 【…

构建IT项目价值管理体系︱陆金所控股有限公司项目管理专家朱磊

陆金所控股有限公司项目管理专家朱磊先生受邀为由PMO评论主办的2023第十二届中国PMO大会演讲嘉宾&#xff0c;演讲议题&#xff1a;陆控-构建IT项目价值管理体系。大会将于8月12-13日在北京举办&#xff0c;敬请关注&#xff01; 议题简要&#xff1a; IT资源有限&#xff0c;…

常见的几大排序问题

前言&#xff1a;排序问题&#xff0c;是数据结构中的一大重要的组成板块&#xff0c;很多的面试机试中都会多多少少的涉及到排序问题&#xff0c;之前在上数据结构的那个学期整理过排序问题&#xff0c;不过大都是囫囵吞枣&#xff0c;不求甚解&#xff0c;今天&#xff0c;我…

Altium Designer (AD) 绘制原理图及双层PCB简易教程

目录 前言 工程建立 创建工程 创建原理图文件 创建PCB文件 绘制原理图 导入元件库 ​编辑绘制原理图 设置原理图标注 ​编辑​编辑 结果图​编辑 制作PCB板 生成PCB​编辑 绘制PCB边缘 设置PCB网格间距大小 走线规则设置 修改安全间距 什么是安全间距 怎样修…

在飞机设计中的仿真技术

仿真技术在飞机设计中发挥着越来越重要的作用&#xff0c;本文阐述了国内外在飞机设计中广泛使用的结构强度计算&#xff0c;多体动力学仿真、多学科多目标结构优化、内外流场分析、非线性有限元分析、疲劳强度分析、电磁仿真分析&#xff0c;机电液联合仿真分析等&#xff0c;…

数据结构基础5:栈和队列的实现。

一.栈的基本概念。 一.基本概念 1.基本概念 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端称为栈顶&#xff0c;另一端称为栈底。栈中的数据元素遵守后进先出LIFO&#xff08;Last In First Out&…