深度了解flink(七) JobManager(1) 组件启动流程分析

news2024/11/7 15:28:47

前言

JobManager是Flink的核心进程,主要负责Flink集群的启动和初始化,包含多个重要的组件(JboMaster,Dispatcher,WebEndpoint等),本篇文章会基于源码分析JobManagr的启动流程,对其各个组件进行介绍,希望对JobManager有一个更全面的了解。

集群启动模式

ClusterEntryPoint是Flink集群的入口点的基类,该类是抽象类,类继承关系UML图如下

通过上图可知道,Flink有3种集群模式

Flink Session集群

根据不同的资源管理器,有3个不同的子类:

  • StandaloneSessionClusterEntrypoint Standalone session模式下集群的入口类
  • KubernetesSessionClusterEntrypoint K8s session模式下集群的入口类
  • YarnSessionClusterEntrypoint Yarn session模式下集群的入口类

集群生命周期

在 Flink Session 集群中,客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。

资源隔离:

Flink作业共享集群的ResourceManager和Dispacher等组件,TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争 — 例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。

适用场景:

因为组件共享,session集群资源使用率高,集群预先存在,不需要额外申请资源,适合一些比较小的,不是长期运行的作业,例如SQL预览,交互式查询,实时任务测试环境等

Flink Per Job集群

只要Yarn提供了继承的子类:YarnJobClusterEntrypoint

集群生命周期

在 Flink Job 集群中,可用的集群管理器(例如 YARN)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。一旦作业完成,Flink Job 集群将被拆除。

资源隔离:

每一个提交的Flink应用程序单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。

使用场景:

实时由于Per Job模式下用户应用程序的main方法在客户端执行生成JobGraph,任务量大情况下存在性能瓶颈,目前已被标记为废弃状态。

Flink Application集群

根据不同的资源管理器,有3个不同的子类:

  • StandaloneApplicationClusterEntryPoint Standalone Application模式下集群的入口类
  • KubernetesApplicationClusterEntrypoint K8s Application模式下集群的入口类
  • YarnApplicationClusterEntryPoint Yarn Application模式下集群的入口类

集群生命周期

Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且main方法在集群上而不是客户端上运行。应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口(ApplicationClusterEntryPoint)负责调用main方法来提取 JobGraph

资源隔离:

每一个提交的Flink应用程序单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。

使用场景:

Application模式资源隔离性好,Per Job模式的替换方案,适合长期运行、具有高稳定性的大型作业

JobManager启动流程

JobManger启动流程在不同模式下基本相同,Standalone模式可以在本地运行(可以参考),方便Debug,因为使用Standalone模式的入口类StandaloneSessionClusterEntrypoint进行启动流程的分析。

main方法入口

public static void main(String[] args) {
    // 打印系统相关信息
    EnvironmentInformation.logEnvironmentInfo(
        LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
    //信号注册器,注册系统级别的信号,接收到系统级别终止信号优雅的关闭
    SignalHandler.register(LOG);
    //注册一个安全的钩子,这样jvm停止之前会睡眠5s去释放资源,5s之后强制关闭
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    // 解析命令行参数,获取配置信
    final EntrypointClusterConfiguration entrypointClusterConfiguration =
    ClusterEntrypointUtils.parseParametersOrExit(
        args,
        new EntrypointClusterConfigurationParserFactory(),
        StandaloneSessionClusterEntrypoint.class);
    //加载config.yaml,构建Configuration对象
    Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

    StandaloneSessionClusterEntrypoint entrypoint =
    new StandaloneSessionClusterEntrypoint(configuration);

    ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}

主要步骤

  1. 打印系统信息。
  2. 注册信号处理器,注册系统级别的信号,确保优雅关闭。
  3. 注册一个安全的钩子,这样jvm停止之前会睡眠5s去释放资源,5s之后强制关闭。
  4. 解析命令行参数,加载配置文件。
  5. 初始化 StandaloneSessionClusterEntrypoint
  6. 调用 ClusterEntrypoint#runClusterEntrypoint 方法启动集群。

ClusterEntrypoint#runClusterEntrypoint

public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {
    //
    clusterEntrypoint.startCluster();
} catch (ClusterEntrypointException e) {
    LOG.error(
        String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
        e);
    System.exit(STARTUP_FAILURE_RETURN_CODE);
}

//无关代码 无需关注
}

核心步骤

  • 调用 clusterEntrypoint.startCluster() 启动集群。

ClusterEntrypoint#startCluster

public void startCluster() throws ClusterEntrypointException {
       //无关代码 无需关注
        try {
            FlinkSecurityManager.setFromConfiguration(configuration);
            //插件管理类,用来加载插件。插件加载两种方式。
            //1).通过如下参数配置FLINK_PLUGINS_DIR。
            //2).将插件jar包放入到plugins下
            PluginManager pluginManager =
                    PluginUtils.createPluginManagerFromRootFolder(configuration);
            //初始化文件系统的配置
            configureFileSystems(configuration, pluginManager);
            //初始化安全上下文环境 默认HadoopSecurityContext,Hadoop安全上下文,
            //使用先前初始化的UGI(UserGroupInformation)和适当的安全凭据。比如Kerberos。
            //总结:初始化安全环境,创建安全环境的时候会做一系列的检查。
            SecurityContext securityContext = installSecurityContext(configuration);

            ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
            //安全的情况下调用runCluster开始初始化组件
            securityContext.runSecured(
                    (Callable<Void>)
                            () -> {
                                runCluster(configuration, pluginManager);

                                return null;
                            });
        } catch (Throwable t) {
            //异常处理代码 无需关注
        }
    }

startCluster方法主要做了一些环境和配置初始化的工作

主要步骤

  1. 初始化插件管理器,用来加载插件。
  2. 初始化文件系统设置 例如 hdfs、本地file。此时只是初始化的配置。
  3. 初始化安全环境。
  4. 安全环境下调用 runCluster 方法。

ClusterEntrypoint#runCluster

private void  runCluster(Configuration configuration, PluginManager pluginManager)
            throws Exception {
        synchronized (lock) {
            //初始化集群所需要的服务:例如通信服务,监控服务,高可用服务等
            initializeServices(configuration, pluginManager);

            // write host information into configuration
            configuration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());

            //创建Dispatcher和ResourceManger组件的工厂类
            final DispatcherResourceManagerComponentFactory
                    dispatcherResourceManagerComponentFactory =
                            createDispatcherResourceManagerComponentFactory(configuration);

            //创建Dispatcher和ResourceManger组件
            clusterComponent =
                    dispatcherResourceManagerComponentFactory.create(
                            configuration,
                            resourceId.unwrap(),
                            ioExecutor,
                            commonRpcService,
                            haServices,
                            blobServer,
                            heartbeatServices,
                            delegationTokenManager,
                            metricRegistry,
                            executionGraphInfoStore,
                            new RpcMetricQueryServiceRetriever(
                                    metricRegistry.getMetricQueryServiceRpcService()),
                            failureEnrichers,
                            this);
            //组件停止运行后的异步方法
            clusterComponent
                    .getShutDownFuture()
                    .whenComplete(
                        //代码省略
                    )
                            
        }
    }

主要步骤

1.初始化集群所需要的服务:例如通信服务,监控服务,高可用服务等

2.创建Dispatcher和ResourceManger组件的工厂类

3.创建Dispatcher和ResourceManger组件

4.定义组件停止运行后的异步方法

总结

本篇文章分享了Flink任务的集群模式,通过源码的方式分析了JobManger的启动流程,后续会对JobManger相关的服务和组件进行更详细的分析。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2229604.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ChatGPT终于变成了智能搜索引擎

一、引言 今天即2024年11月1日&#xff0c;ChatGPT又给我们带来了惊喜。 继前一段时间新增加聊天搜索功能之后&#xff0c;ChatGPT又新增联网功能&#xff0c;可以像搜索引擎一样进行网页搜索&#xff0c;这样一个智能工具摇身一变成AI搜索了&#xff01; 有了AI搜索我们将可…

ZK范式系列之zkVM介绍(1)

1. 引言 zkVM&#xff08;Zero-Knowledge Virtual Machine&#xff0c;零知识虚拟机&#xff09;&#xff1a; 是一种功能强大的虚拟机&#xff0c;利用零知识证明 (zero-knowledge proof&#xff0c;ZKP) 来保证计算的完整性和隐私性。 零知识证明&#xff08;ZKP&#xff…

颠覆微服务管理:用Traefik+Docker轻松实现自动化流量控制

#作者&#xff1a; Power0fMoney 文章目录 第一部分&#xff1a;背景和现状1.1 微服务架构的兴起1.2 容器技术的普及1.3 运维的痛点 第二部分&#xff1a;详细解释Traefik各个功能模块2.1 动态服务发现2.2 内置的Lets Encrypt支持2.3 中间件支持2.4 负载均衡策略2.5 监控和可视…

BSV区块链为供应链管理带来效率革命

​​发表时间&#xff1a;2024年10月10日 供应链管理是众多行业的重中之重&#xff0c;它确保了商品能够从制造商处顺畅地传递到消费者手中。然而&#xff0c;传统的供应链管理面临着许多挑战&#xff0c;包括缺乏透明度、延误、欺诈和协调上的低效率等等。 BSV区块链技术的出…

Xcode 15.4 运行flutter项目,看不到报错信息详情?

Xcode升级后&#xff0c;遇到了奇怪的事情&#xff1a; 运行flutter项目&#xff0c;左侧栏显示有报错信息&#xff0c;但是点击并没有跳转出具体的error详情。【之前都会自己跳转出来的&#xff0c;升级后真的是无厘头】 方案&#xff1a; 点击左侧导航栏最右边的图标——>…

Openlayers高级交互(14/20):汽车移动轨迹动画(开始、暂停、结束)

本示例演示在vue+openlayers中实现轨迹动画,这里设置了小汽车开始,暂停,结束等的控制键,采用了线段步长位置获取坐标来定位点的方式来显示小车的动态。 效果图 专栏名称内容介绍Openlayers基础实战 (72篇)专栏提供73篇文章,为小白群体提供基础知识及示例演示,能解决基…

【深入浅出】深入浅出Bert(附面试题)

本文的目的是为了帮助大家面试Bert&#xff0c;会结合我的面试经历以及看法去讲解Bert&#xff0c;并非完整的技术细致讲解&#xff0c;介意请移步。 深入浅出】深入浅出Bert&#xff08;附面试题&#xff09; 网络结构Pre-TrainingFine-Tuning 输入编码词向量编码句子编码位置…

保存暄桐的这份清福清单 让福气慢慢积累

鸿福易享&#xff0c;清福难得。真正的清福&#xff0c;不是多么好的物质基础&#xff0c;而是来源于内心的平安喜乐。生活于纷繁的现代社会&#xff0c;想获得清净的福德实属不易&#xff0c;于是&#xff0c;便更需要我们为自己创造条件。暄桐送你一份清福清单&#xff0c;当…

Profinet、Ethernet/IP 工业以太网无线通信解决方案

在工业现场&#xff0c;我们常常会面临这样的困扰&#xff1a;两个PLC之间、PLC 跟远程IO之间或者PLC 跟伺服之间由于种种原因不方便布线&#xff0c;严重影响了通讯效率和生产进程。为了解决这一难题&#xff0c;三格电子设计了一款工业以太网无线网桥&#xff0c;这款无线网桥…

【Flask】四、flask连接并操作数据库

目录 前言 一、 安装必要的库 二、配置数据库连接 三、定义模型 四、操作数据库 1.添加用户 2.删除用户 3.更新用户信息 4查询所有用户 五、测试结果 前言 在Flask框架中&#xff0c;数据库的操作是一个核心功能&#xff0c;它允许开发者与后端数据库进行交互&#xf…

NGINX 交叉编译 arm32

初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github&#xff1a;codetoys&#xff0c;所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的&#xff0c;可以在任何平台上使用。 源码指引&#xff1a;github源…

如何在Linux下安装和配置Docker

文章目录 安装前的准备在Debian/Ubuntu上安装Docker添加Docker仓库安装Docker验证安装 在CentOS/RHEL上安装Docker安装必要的软件包设置Docker仓库安装Docker启动Docker服务 Docker的基本使用拉取一个镜像运行一个容器 配置Docker创建Docker目录使用非root用户运行Docker 结语 …

什么是感知器?

神经网络是松散地基于人脑结构的信号处理工具。它们通常与人工智能 (AI) 相关。我不喜欢“人工智能”这个词&#xff0c;因为它不且简单化。如果将“智能”定义为快速进行数值计算的能力&#xff0c;那么神经网络是人工智能。但在我看来&#xff0c;智能远不止于此——它是设计…

【js逆向学习】某多多anti_content逆向(补环境)

文章目录 声明逆向目标逆向分析逆向过程总结 声明 本文章中所有内容仅供学习交流使用&#xff0c;不用于其他任何目的&#xff0c;不提供完整代码&#xff0c;抓包内容、敏感网址、数据接口等均已做脱敏处理&#xff0c;严禁用于商业用途和非法用途&#xff0c;否则由此产生的…

Centos系统新增网卡后获取不到网卡的IP地址解决方法

一、问题描述 当我们给Centos系统添加了新的网卡后,使用查看IP地址命令【ip addr】时,发现新网卡没有获取到对应的IP地址信息,如下图所示: 二、解决方法 有两种解决方法:一种是自动获取IP地址;另外一种是手动配置IP地址; 2.1、自动获取IP地址 #自动获取网卡的IP地址命…

ARB链挖矿DApp系统开发模式定制

在区块链生态中&#xff0c;挖矿作为一种获取加密资产的方式&#xff0c;越来越受到关注。ARB链凭借其高效的性能和灵活的智能合约系统&#xff0c;成为了开发挖矿DApp的理想平台。本文将探讨ARB链挖矿DApp的开发模式定制&#xff0c;包括架构设计、功能实现以及最佳实践。 ARB…

Redis-06 Redis高可用集群架构原理与搭建

前面章节搭建了Redis【主-从】架构和【主-从】 哨兵架构&#xff0c;已可满足部分企业场景应用&#xff0c;但都有其对应的弊端&#xff0c;本章节将讲解更优生产架构&#xff1a;Redis集群架构。不仅包含哨兵架构的自动选举功能&#xff0c;还能降低主从架构下主节点的单节点压…

【Linux】一些Shell脚本编程基础题

目录 一、比较两个数的大小 二、求1-100的素数和 三、编写shell脚本&#xff0c;输入一个数字n并计算1~n的和&#xff0c;同时要求如果输入的数字小于1&#xff0c;则重新输入&#xff0c;直到输入正确的数字为止。 四、编写一个shell脚本用来进行成绩等级评定&#xff0c;…

Java代码实现PKCS5填充

1. 前言 如果你也在做加解密相关的需求&#xff0c;比如调用国密标准0018接口的对称加密/解密接口。就会遇到需要自己填充数据原文为16字节的整数倍&#xff08;因为SM4分组算法的加密数据长度必须是其密钥大小的整数倍&#xff0c;SM4密钥大小是128bit&#xff0c;即&#xff…

mv_zhao直线

# 线段检测例程 # # 这个例子展示了如何在图像中查找线段。对于在图像中找到的每个线对象&#xff0c; # 都会返回一个包含线条旋转的线对象。# find_line_segments()找到有限长度的线&#xff08;但是很慢&#xff09;。 # Use find_line_segments()找到非无限的线&#xff08…