Flink源码解析之:Flink On Yarn模式任务提交部署过程解析

news2025/1/4 9:20:12

Flink源码解析之:Flink On Yarn模式任务提交部署过程解析

一、Flink on Yarn部署模式概述

Apache Hadoop YARN 在许多数据处理框架中都很流行。 Flink 服务提交给 YARN 的 ResourceManager,后者会在 YARN NodeManagers 管理的机器上生成容器。 Flink 将其 JobManager 和 TaskManager 实例部署到这些容器中。

Flink 可根据在 JobManager 上运行的作业所需的处理插槽数量,动态分配和取消分配 TaskManager 资源。

Flink on Yarn的部署模式包括三种方式,Application Mode、Per-Job Mode、Session Mode。对于生成环境来说,更推荐使用Application Mode或Per-Job Mode,因为这两种模式能够提供更好的应用隔离性。

  1. Application Mode
    Application Mode模式将在 YARN 上启动一个 Flink 集群,应用程序 jar 的 main() 方法将在 YARN 的 JobManager 上执行。 应用程序一旦完成,群集就会关闭。 该种方式相比Per-Job模式来说,将应用main()方法的执行,StreamGraph、JobGraph的生成放在了Flink集群侧来实现。
  2. Per-Job Mode
    Per-job 模式将在 YARN 上启动一个 Flink 集群,在客户端生成StreamGraph、JobGraph,并上传依赖项。最后将 JobGraph 提交给 YARN 上的 JobManager。 如果通过—detached参数配置了分离模式,则客户端将在提交被接受后立即停止。
  3. Session Mode
    Session部署模式会在YARN上部署一个长期运行的Flink集群会话,该会话可以接受并执行多个Flink作业。
    Session部署模式包含两种操作模式:
    • attach mode(default):执行yarn-session.sh文件在Yarn上启动Flink集群,启动后客户端会一致运行,来追踪/监听集群状态。一旦集群异常,客户端会获取异常信息并展示。如果客户端异常终止了,则会发送signal到Flink集群,此时Flink集群同样也会终止。
    • detach mode :使用-d or --detached参数设置。在这种模式下,当执行yarn-session.sh文件在Yarn上启动Flink集群后,客户端会直接返回。要停止 Flink 群集,需要再次调用客户端或 YARN 工具。

三种提交模式的对比:
bin/flink.sh脚本可知,客户端提交过程统一由org.apache.flink.client.cli.CliFronted入口类触发。Per-Job模式和Session模式下Flink应用main方法都会在客户端执行。客户端解析生成JobGraph后会将依赖项和JobGraph序列化后的二进制数据一起发往集群上。当客户端机器上有大量作业提交时,需要大量的网络带宽下载依赖项并将二进制文件发送到集群,会造成客户端消耗大量的资源。尤其在大量用户共享客户端时,问题更加突出。为解决该问题,社区提出了Application模式将Flink应用main方法触发过程后置到了JobManager生成过程中,以此将带宽压力分散到集群各个节点上。

鉴于Application部署模式的优势,本文会以Application部署模式的源码来进行解析,探究Flink以Application模式提交任务到Yarn集群中所经过的大致流程,为我们理解Flink On Yarn的部署有一个更深入和清晰的认识。

二、Flink Application部署模式源码解析

(一)CliFronted入口类

本节以Application部署模式为例,介绍Flink On Yarn的客户端提交源码流程。正如上文说的,由bin/flink.sh脚本可知,客户端提交过程统一由org.apache.flink.client.cli.CliFronted入口类触发,为此,我们首先进入到该方法的源码中,来观察下该入口类的实现逻辑:

/** Submits the job based on the arguments. */
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. find the configuration directory
    // 用来获取配置目录,该目录通常包含Flink的配置文件,如flink-conf.yaml。
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. load the global configuration
    // 加载flink的全局配置
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. load the custom command lines
    // 加载自定义的命令行配置
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);

    int retCode = 31;
    try {
		// 实例化了CliFronted对象,CliFronted是Flink为CLI客户端提供的API,它提供了一系列的操作,例如作业的提交,取消,以及打印job的状态等。
        final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

        SecurityUtils.install(new SecurityConfiguration(cli.configuration));
        // 启动Flink作业的入口,parseAndRun方法会解析命令行参数,并启动Flink作业。
		retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", strippedThrowable);
        strippedThrowable.printStackTrace();
    } finally {
        System.exit(retCode);
    }
}

上述代码是CliFronted的入口main方法,该方法首先根据Flink的配置路径加载全局配置,比如flink-conf.xml配置文件,接着加载自定义命令行配置,并实例化了CliFronted对象。CliFronted是Flink为CLI客户端提供的API,它提供了一系列的操作,例如作业的提交,取消,以及打印job的状态等。最后,调用cli.parseAndRun(args)方法,该方法会解析命令行参数,并启动Flink作业。

parseAndRun方法中,会根据传入参数的第一个参数值来决定Flink集群的部署模式:

  • run-application:则会进入到CliFronted类的runApplication方法中,执行Application部署流程。
  • run:则会进入到CliFronted类的run方法中,在客户端执行作业的main方法(利用反射来执行)

这也是为什么我在使用命令行以Application模式部署Flink集群时,命令的开始要用以下形式:

/bin/flink run-application -t yarn-application...

(二)runApplication

接下来,我们继续进入到runApplicaiton方法来看看它的实现逻辑:

protected void runApplication(String[] args) throws Exception {
    LOG.info("Running 'run-application' command.");

	// 解析传入的命令行参数
    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);

	// 如果命令行参数中包含帮助选项(-h/--help),则调用下述方法打印帮助信息并返回
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRunApplication(customCommandLines);
        return;
    }

	// 验证并获取激活的自定义命令行, CustonCommandLine是Flink用来处理不同部署模式的工具(例如Yarn,Standlone等),以便针对不同模式解析对应的特定设置和参数
    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));

	// 初始化ApplicationClusterDeployer实例, 这是Flink用来启动Application的工具
    final ApplicationDeployer deployer =
            new ApplicationClusterDeployer(clusterClientServiceLoader);

    final ProgramOptions programOptions;
    final Configuration effectiveConfiguration;

    // No need to set a jarFile path for Pyflink job.
    if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
        programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
        effectiveConfiguration =
                getEffectiveConfiguration(
                        activeCommandLine,
                        commandLine,
                        programOptions,
                        Collections.emptyList());
    } else {
        programOptions = new ProgramOptions(commandLine);
        programOptions.validate();
        final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
        effectiveConfiguration =
                getEffectiveConfiguration(
                        activeCommandLine,
                        commandLine,
                        programOptions,
                        Collections.singletonList(uri.toString()));
    }

	// 根据programOptions获取程序参数和入口类名来创建ApplicationConfiguration实例
    final ApplicationConfiguration applicationConfiguration =
            new ApplicationConfiguration(
                    programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
	// 最后调用deployer.run()来运行应用。这一步通常包括联系Flink集群,提交应用程序并安排其在集群中执行。
    deployer.run(effectiveConfiguration, applicationConfiguration);
}

上述代码的实现流程与原理如下所示:

  • 解析命令行参数:首先,调用getCommandLine函数解析传入的命令行参数args。

  • 处理帮助选项:如果命令行参数中包含帮助选项(-h/–help),则调用CliFrontendParser.printHelpForRunApplication打印帮助信息并返回。

  • 获取激活的CustomCommandLine:通过validateAndGetActiveCommandLine函数获取激活的自定义命令行(CustomCommandLine)。CustomCommandLine是Flink用来处理不同部署模式的工具(例如Yarn,Standalone等),以便于针对不同模式解析对应的特定设置和参数。

  • 部署器配置:初始化ApplicationClusterDeployer实例,这是Flink用来启动Application的工具。

  • 提取程序选项和计算有效配置:区分Python作业和其他作业,生成对应的ProgramOptions并验证其有效性。此外,根据激活的命令行、解析得到的命令行参数和程序选项计算出有效的配置(effectiveConfiguration)。

  • 构造应用配置:使用从ProgramOptions中获取的程序参数和入口点类名创建ApplicationConfiguration实例。

  • 运行应用:最后,调用deployer.run()来运行应用。这一步通常包括联系Flink集群,提交应用程序并安排其在集群中执行。

ProgramOptions.entryPointClass的成员值是flink命令行 -c 选项指定的Flink应用入口类com.xxx.xxx.FlinkApplicationDemo,后续会以反射的形式触发main()方法的执行。

(三)ClusterDescriptor.deployApplicationCluster

上面代码中deployer.run(...)方法负责加载Yarn Application模式客户端信息等。

首先代码会根据configuration配置信息来获取ClusterClientFactory对象,获取的逻辑过程是根据configuration配置中的execution.target参数来决定的。

当执行命令行bin/flink run时, execution.target参数对应的枚举值可以如下:

  • remote
  • local
  • yarn-per-job
  • yarn-session
  • kubernetes-session
    当执行命令行bin/flink run-application时,execution.target参数对应的枚举值可以如下:
  • yarn-application
  • kubernetes-application

execution.target参数为yarn-application时,Flink便会生成相应的YarnClusterClientFactory客户端工厂类,然后调用该工厂类的createClusterDescriptor方法,该方法中会新建YarnClient实例,YarnClient实例负责在客户端提交Flink应用程序,并最终生成ClusterDescriptor实例,该实例包含用于在Yarn上部署Flink集群的部署信息Descriptor。

@Override
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
    checkNotNull(configuration);

    final String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
    YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);

    return getClusterDescriptor(configuration);
}

private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
    final YarnClient yarnClient = YarnClient.createYarnClient();
    final YarnConfiguration yarnConfiguration =
            Utils.getYarnAndHadoopConfiguration(configuration);

    yarnClient.init(yarnConfiguration);
    yarnClient.start();

    return new YarnClusterDescriptor(
            configuration,
            yarnConfiguration,
            yarnClient,
            YarnClientYarnClusterInformationRetriever.create(yarnClient),
            false);
}

有了该实例后,会调用deployApplicationCluster方法来部署Application模式的Flink集群。集群将在提交应用程序时创建,并在应用程序终止时拆除。此外,应用程序用户代码的{@code main()}将在集群上执行,而不是在客户端上执行。

YarnClusterDescriptor.deployApplicationCluster(…)方法调用过程如下:
(1)、YarnClusterDescriptor.deployApplicationCluster(…);进行一些配置和检查,并调用deployInternal(…)方法。
(2)、YarnClusterDescriptor.deployInternal(…);

其中,最重要的方法是deployInternal方法

(四)YarnClusterDescriptor.deployInternal

在该方法中,首先会判断Hadoop集群是否启用了Kerberos安全认证,如果开启了,则Flink会首先确认当前用户是否拥有有效的kerberos凭证。如果无效,则会抛出异常,部署作业失败。

紧接着,进行资源检查和部署模式判断。

validateClusterResources方法中,会根据配置的JobManager和TaskManager的资源大小与集群资源进行比对。

  1. 如果JobManager的配置内存大小 < Yarn配置的最小调度分配内存(yarn.scheduler.minimum-allocation-mb参数,默认1024MB),则JobManager的内存大小会设置为该配置值。
  2. 如果JobManager大小 > YARN 集群能够提供的单个容器的最大资源,则抛出异常:The cluster does not have the requested resources for the JobManager available!
  3. 如果TaskManager大小 > YARN 集群能够提供的单个容器的最大资源,则抛出异常:The cluster does not have the requested resources for the TaskManagers available!
  4. 如果TaskManager大小 > 当前YARN集群剩余资源单个任务容器分配的最大资源容量,则会打印告警日志:The requested amount of memory for the TaskManagers is more than the largest possible YARN container: freeClusterResources.containerLimit
  5. 如果JobManager大小 > 当前YARN集群剩余资源单个任务容器分配的最大资源容量,则会打印告警日志:The requested amount of memory for the JobManager is more than the largest possible YARN container: freeClusterResources.containerLimit

经过资源检查后,会将最后确定的JobManager和TaskManager资源保存在ClusterSpecification对象中。

在部署模式决定中,Flink 提供了两种部署模式:Detach模式和Non-Detach模式。如果是 Detach模式,Flink 作业提交到YARN后,客户端可以直接退出,而作业将继续在YARN集群上运行。而在 Non-Detach模式下,客户端将持续等待作业执行完成。

然后就到了一个非常重要的方法中:startAppMaster

会根据上面决定的ClusterSpecification资源实例,启动用于管理Flink作业的Application Master。

startAppMaster方法比较长。

这段代码主要是用于启动Flink在YARN集群上的Application Master的过程,代码中包含了几个主要部分:

  1. 首先,核心的首步骤是初始化文件系统并获取对应的 FileSystem 实例。代码检查了文件系统的类型,如果是本地文件系统类型(file://开头),会抛出警告,因为Flink在YARN上运行需要分布式文件系统来存储文件。
  2. 然后,获取了用于提交应用程序的 ApplicationSubmissionContext,并将 Flink 应用所需的各种文件如jar包、配置文件等上传到HDFS,并将这些文件的HDFS路径作为本地资源 (LocalResources)添加到ApplicationSubmissionContext里。
  3. 在文件上传阶段,包括了一系列复杂的步骤,首先是将 flink 配置、job graph、用户 jar、依赖库等上传到HDFS,并将这些文件的路径添加到应用的classpath;其次,如果设置了 security options(例如,Kerberos认证信息),会将相关文件也上传到HDFS;并且,对配置了Kerberos认证的 flink 应用,会从 YARN 获取 HDFS delegation tokens。
  4. 在收集完上述一系列依赖文件后,final ContainerLaunchContext amContainer = setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec) 负责设置启动ApplicationMaster的命令操作。
  5. 设置ApplicationMaster的环境变量,诸如_FLINK_CLASSPATH、_FLINK_DIST_JAR(Flink jar resource location (in HDFS))、KRB5_PATH、_YARN_SITE_XML_PATH等环境变量。最后调用amContainer.setEnvironment(appMasterEnv);方法进行设置。
  6. 接着,会将上述配置好的amContainer实例放入ApplicationSubmissionContext对象中,以及ApplicationName和所需的资源大小,最终交给交给YarnClient去提交,并随后通过周期性地获取应用状态,来等待应用处于RUNNING或FINISHED状态,完成应用的提交过程。
  7. 如果在这一系列操作中有任何异常或错误发生,会触发失败保护钩子 DeploymentFailureHook,进行必要的清理工作。

上面这段代码体现了 Flink on YARN 的工作原理,Flink 通过 YARN Client 提交应用,启动 Application Master 来进行资源申请和任务调度,这是典型的 YARN 应用程序模型。各种文件(包括 flink 本身、用户 jar、配置文件等)都被上传到HDFS,然后再从HDFS分发到运行任务的 YARN 容器中,这样做是为了实现文件的分布式共享,并且利用了 YARN 的 LocalResource 机制来进行文件的分发。

对于第四点中的setupApplicationMasterContainer方法,该方法构造了ApplicationMaster的命令行启动命令,如下所示:

ContainerLaunchContext setupApplicationMasterContainer(
        String yarnClusterEntrypoint, boolean hasKrb5, JobManagerProcessSpec processSpec) {
    // ------------------ Prepare Application Master Container  ------------------------------

    // respect custom JVM options in the YAML file
    String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
    if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
        javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
    }

    // krb5.conf file will be available as local resource in JM/TM container
    if (hasKrb5) {
        javaOpts += " -Djava.security.krb5.conf=krb5.conf";
    }

    // Set up the container launch context for the application master
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

    final Map<String, String> startCommandValues = new HashMap<>();
    startCommandValues.put("java", "$JAVA_HOME/bin/java");

    String jvmHeapMem =
            JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
    startCommandValues.put("jvmmem", jvmHeapMem);

    startCommandValues.put("jvmopts", javaOpts);
    startCommandValues.put(
            "logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));

    startCommandValues.put("class", yarnClusterEntrypoint);
    startCommandValues.put(
            "redirects",
            "1> "
                    + ApplicationConstants.LOG_DIR_EXPANSION_VAR
                    + "/jobmanager.out "
                    + "2> "
                    + ApplicationConstants.LOG_DIR_EXPANSION_VAR
                    + "/jobmanager.err");
    String dynamicParameterListStr =
            JobManagerProcessUtils.generateDynamicConfigsStr(processSpec);
    startCommandValues.put("args", dynamicParameterListStr);

    final String commandTemplate =
            flinkConfiguration.getString(
                    ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
                    ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
    final String amCommand =
            BootstrapTools.getStartCommand(commandTemplate, startCommandValues);

    amContainer.setCommands(Collections.singletonList(amCommand));

    LOG.debug("Application Master start command: " + amCommand);

    return amContainer;
}

启动命令的参数包括以下部分:

  • “java”:Java二进制文件的路径。一般来说,在YARN容器中,Java的路径会被设置为$JAVA_HOME/bin/java。
  • “jvmmem”:JVM参数,主要是内存参数,比如最大堆内存、最小堆内存等。这些参数会基于Flink配置以及JobManager的内存配置来生成。
  • “jvmopts”:JVM选项。这些选项来自Flink配置文件中设置的JVM选项,以及若存在Kerberos krb5.conf文件,还会添加-Djava.security.krb5.conf=krb5.conf。
  • “logging”:日志配置项,用于配置Flink的日志选项。
  • “class”:启动类,即YARN集群入口点类名(yarnClusterEntrypoint)。
  • “redirects”:输出重定向的参数,将stdout(输出流)和stderr(错误流)重定向到日志文件中。
  • “args”:传递给启动类的参数,主要是JobManager的动态配置参数。

⠀这些参数最后会填入一个启动命令模板(通常为"%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%"),来生成实际启动Flink应用的命令。

启动后的ApplicationMaster,在YARN集群上起着以下的关键作用:

  • 作为应用程序的主控制器,管理和监视应用程序的执行。
  • 负责请求YARN ResourceManager分配所需的资源(例如容器)。
  • 启动和监视任务执行器(TaskExecutor),它们在分配的容器中运行。
  • 与Flink的client(例如命令行界面或Web界面)以及ResourceManager进行交互,提供应用程序的状态和进度信息。
  • 在应用程序出现异常或失败时,它可以选择重新请求资源并重启失败的任务,提供了一定程度的错误恢复能力。

⠀因此,Application Master是Flink在YARN上运行的关键组件,它负责管理Flink应用程序的生命周期和资源。

(五)YarnApplicationClusterEntryPoint

在上面的setupApplicationMasterContainer方法中,我们说该方法构建了ApplicationMaster的启动命令。从该命令行中可以看到,命令行的启动入口类为yarnClusterEntrypoint参数,对于Yarn Application部署模式来说,参数对应的入口类即为YarnApplicationClusterEntryPoint。在第四部分的分析中,当通过yarnClient将ApplicationMaster提交到Yarn集群后,便会申请Container来执行ApplicationMaster,执行该入口类。
为此,接下来我们来分析一下,YarnApplicationClusterEntryPoint入口类的执行逻辑。

public static void main(final String[] args) {
    // startup checks and logging
    EnvironmentInformation.logEnvironmentInfo(
            LOG, YarnApplicationClusterEntryPoint.class.getSimpleName(), args);
    SignalHandler.register(LOG);
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    Map<String, String> env = System.getenv();
	
	// 获取工作路径
    final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
    Preconditions.checkArgument(
            workingDirectory != null,
            "Working directory variable (%s) not set",
            ApplicationConstants.Environment.PWD.key());

    try {
        YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
    } catch (IOException e) {
        LOG.warn("Could not log YARN environment information.", e);
    }

    final Configuration dynamicParameters =
            ClusterEntrypointUtils.parseParametersOrExit(
                    args,
                    new DynamicParametersConfigurationParserFactory(),
                    YarnApplicationClusterEntryPoint.class);
    final Configuration configuration =
            YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);

    PackagedProgram program = null;
    try {
		// 获取用户应用程序jar,程序参数、入口类名等信息,封装为PackagedProgram实例
        program = getPackagedProgram(configuration);
    } catch (Exception e) {
        LOG.error("Could not create application program.", e);
        System.exit(1);
    }

    try {
        configureExecution(configuration, program);
    } catch (Exception e) {
        LOG.error("Could not apply application configuration.", e);
        System.exit(1);
    }

    YarnApplicationClusterEntryPoint yarnApplicationClusterEntrypoint =
            new YarnApplicationClusterEntryPoint(configuration, program);

    // 	执行Application Cluster
ClusterEntrypoint.runClusterEntrypoint(yarnApplicationClusterEntrypoint);
}

上面这段代码中,使用getPackagedProgram(configuration)方法获取用户应用程序jar,程序参数、入口类名等信息,封装为PackagedProgram实例,便于后续调用。

最后,调用runClusterEntrypoint方法,启动执行Application Cluster集群。

ClusterEntrypoint.runClusterEntrypoint(...)方法的调用链路如下:

  • ClusterEntrypoint.runClusterEntrypoint(...)
  • ClusterEntrypoint.startCluster(...)
  • ClusterEntrypoint.runCluster(...)
  • DispatcherResourceManagerComponentFactory.create(…)

DispatcherResourceManagerComponentFactory.create方法中,启动了一系列服务,比如:

  • LeaderRetrievalService
  • WebMonitorEndpoint
  • ResourceManagerService
  • DispatcherRunner

本流程中主要需要关注的服务是DispatcherRunner,该方法中,会调用dispatcherRunnerFactory.createDispatcherRunner来初始化dispatchRunner实例,dispatcherRunner实例负责dispatcher组件的高可用leader选举操作,同时dispatcher组件负责触发Flink用户应用main(…)方法执行。

在创建DispatchRunner的过程中,包含高可用Leader选举过程,经过一系列的方法链调用,会选举出一个Leader DispatchRunner服务来负责后续的处理流程。

  • DispatcherResourceManagerComponentFactory.createDispatcherRunner
  • DefaultDispatcherRunner.create()
  • DispatcherRunnerLeaderElectionLifecycleManager.createFor()
  • DefaultLeaderElectionService.start()
  • LeaderElectionDriverFactory.createLeaderElectionDriver()
  • new ZooKeeperLeaderElectionDriver
  • LeaderLatch.start()
  • LeaderLatch.internalStart()
  • LeaderLatch.reset()
  • LeaderLatch.setLeadership()
  • ZooKeeperLeaderElectionDriver.isLeader()
  • DefaultLeaderElectionService.onGrantLeadership()
  • DefaultDispatcherRunner.grantLeadership()
  • DefaultDispatcherRunner.startNewDispatcherLeaderProcess()

选举为leader的DefaultDispatcherRunner实例候选者在回调动作过程中会一直调用到上面的grantLeadership(…)方法,并在startNewDispatcherLeaderProcess(…)方法中生成dispatcherLeaderProcess,表示一个Ledaer Dispatcher进程来提供服务,并通过newDispatcherLeaderProcess::start方法来启动执行该服务的后续处理流程。Leader候选者回调动作触发过程会另起篇幅详细讲解,此处先这样理解。

在后续的处理流程中,我们需要关注的点是在何时触发用户应用程序的main方法执行,为此,继续深入以下调用链:

  • AbstractDispatcherLeaderProcess.startInternal()
  • SessionDispatcherLeaderProcess.onStart()
  • SessionDispatcherLeaderProcess.createDispatcherIfRunning()
  • SessionDispatcherLeaderProcess.createDispatcher()
  • ApplicationDispatcherGatewayServiceFactory.create()
  • new ApplicationDispatcherBootstrap(...)

上述调用链中,createDispatcher(…)方法会调用dispatcherGatewayServiceFactory.create(…)方法,dispatcherGatewayServiceFactory实际类型是ApplicationDispatcherGatewayServiceFactory。在dispatcherGatewayServiceFactory.create(…)方法中新建ApplicationDispatcherBootstrap实例。

在ApplicationDispatcherBootstrap实例中,继续通过以下方法调用链fixJobIdAndRunApplicationAsync(…) -> runApplicationAsync(…) -> runApplicationEntryPoint(…) -> ClientUtils.executeProgram(…) -> program.invokeInteractiveModeForExecution() -> callMainMethod(mainClass, args) -> mainMethod.invoke(null, (Object) args)触发Flink应用main(…)方法的执行。

  • ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync()
  • ApplicationDispatcherBootstrap.runApplicationAsync()
  • ApplicationDispatcherBootstrap.runApplicationEntryPoint()
  • ClientUtils.executeProgram()
  • PackagedProgram.invokeInteractiveModeForExecution()
  • PackagedProgram.callMainMethod()
  • mainMethod.invoke(null, (Object) args);

最终,在ApplicationDispatcherBootstrap类的实现中,我们找到了用户应用程序的main方法执行入口。

三、回顾与总结

回顾一下上面的整体流程,首先,我们通过ApplicationMaster的启动命令,找到AM组建执行的入口类为YarnApplicationClusterEntryPoint,接着,在启动集群时,我们发现Flink会初始化一些诸如LeaderRetrievalService、WebMonitorEndpoint、ResourceManagerService、DispatcherRunner的服务,这些服务分别发挥不同的用途,与Yarn和Flink集群进行交互。在本次分析过程中,我们着重探究了DispatcherRunner服务的创建流程。

首先,会执行高可用的选举流程,最终选举出一个Leader DispatcherRunner来执行服务。选举完成后,该Leader DispatchRunner会调用ClientUtils.executeProgram方法,从封装好的PackagedProgram实例中,获取用户应用程序的入口类mainClass以及程序入参,并最终利用反射触发mainClass的main方法的执行,完成用户自定义Flink应用的执行。

以上就是主要的Flink On Yarn客户端作业的提交过程解析。这个提交过程相对来说还是比较复杂的,包含着很多部署配置参数,资源以及权限的校验和分配,ApplicationMaster的提交启动,并伴随AM启动后执行的一系列Flink服务初始化,以及我们关心的用户应用程序的调用入口,发现了在Application的部署模式下,用户应用程序的调用是在集群侧,也就是Leader DispatchRunner服务中完成的。

当然,DispatchRunner服务负责的任务远不止于此,上述流程中还有更多的细节等待我们去挖掘和学习,这篇文章可能只是让我们对提交流程有了一个初步的大体认识,对于更多深入的部分,需要我们不断思考不断挖掘,也欢迎大家交流观点和看法,感谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2269115.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

uniapp:微信小程序文本长按无法出现复制菜单

一、问题描述 在集成腾讯TUI后&#xff0c;为了能让聊天文本可以复制&#xff0c;对消息组件的样式进行修改&#xff0c;主要是移除下面的user-select属性限制&#xff1a; user-select: none;-webkit-user-select: none;-khtml-user-select: none;-moz-user-select: none;-ms…

2025:OpenAI的“七十二变”?

朋友们&#xff0c;准备好迎接AI的狂欢了吗&#xff1f;&#x1f680; 是不是跟我一样&#xff0c;每天醒来的第一件事就是看看AI领域又有什么新动向&#xff1f; 尤其是那个名字如雷贯耳的 OpenAI&#xff0c;简直就是AI界的弄潮儿&#xff0c;一举一动都牵动着我们这些“AI发…

无人机频射信号检测数据集,平均正确识别率在94.3%,支持yolo,coco json,pasical voc xml格式的标注,364张原始图片

无人机频射信号检测数据集&#xff0c;平均正确识别率在94.3&#xff05;&#xff0c;支持yolo&#xff0c;coco json&#xff0c;pasical voc xml格式的标注&#xff0c;364张原始图片 可识别下面的信号&#xff1a; 图像传输信号LFST &#xff08;Image_Transmission_sign…

柱状图中最大的矩形 - 困难

************* c topic: 84. 柱状图中最大的矩形 - 力扣&#xff08;LeetCode&#xff09; ************* chenck the topic first: Think about the topics I have done before. the rains project comes:盛最多水的容器 - 中等难度-CSDN博客https://blog.csdn.net/ElseWhe…

第17篇 使用数码管实现计数器___ARM汇编语言程序<四>

Q&#xff1a;如何使用定时器实现数码管循环计数器&#xff1f; A&#xff1a;DE1-SoC_Computer系统有许多硬件定时器&#xff0c;本次实验使用A9 Private Timer定时器实现延时&#xff1a;定时器首先向Load寄存器写入计数值&#xff0c;然后向Control寄存器中的使能位E写1来启…

SSM 进销存系统

&#x1f942;(❁◡❁)您的点赞&#x1f44d;➕评论&#x1f4dd;➕收藏⭐是作者创作的最大动力&#x1f91e; &#x1f496;&#x1f4d5;&#x1f389;&#x1f525; 支持我&#xff1a;点赞&#x1f44d;收藏⭐️留言&#x1f4dd;欢迎留言讨论 &#x1f525;&#x1f525;&…

通过Cephadm工具搭建Ceph分布式存储以及通过文件系统形式进行挂载的步骤

1、什么是Ceph Ceph是一种开源、分布式存储系统&#xff0c;旨在提供卓越的性能、可靠性和可伸缩性。它是为了解决大规模数据存储问题而设计的&#xff0c;使得用户可以在无需特定硬件支持的前提下&#xff0c;通过普通的硬件设备来部署和管理存储解决方案。Ceph的灵活性和设计…

【Rust自学】8.4. String类型 Pt.2:字节、标量值、字形簇以及字符串的各类操作

8.4.0. 本章内容 第八章主要讲的是Rust中常见的集合。Rust中提供了很多集合类型的数据结构&#xff0c;这些集合可以包含很多值。但是第八章所讲的集合与数组和元组有所不同。 第八章中的集合是存储在堆内存上而非栈内存上的&#xff0c;这也意味着这些集合的数据大小无需在编…

svn分支相关操作(小乌龟操作版)

在开发工作中进行分支开发&#xff0c;涉及新建分支&#xff0c;分支切换&#xff0c;合并分支等 新建远程分支 右键选择branch/tagert按钮 命名分支的路径名称 点击确定后远程分支就会生成一个当时命名的文件夹&#xff08;开发分支&#xff09; 分支切换 一般在开发阶段&a…

24年收尾之作------动态规划<六> 子序列问题(含对应LeetcodeOJ题)

目录 引例 经典LeetCode OJ题 1.第一题 2.第二题 3.第三题 4.第四题 5.第五题 6.第六题 7.第七题 引例 OJ传送门 LeetCode<300>最长递增子序列 画图分析: 使用动态规划解决 1.状态表示 dp[i]表示以i位置元素为结尾的子序列中&#xff0c;最长递增子序列的长度 2.…

蓝牙|软件 Qualcomm S7 Sound Platform开发系列之初级入门指南

本文适用范围 ADK24.2~ 问题/功能描述 S7开发环境搭建与编译介绍 实现方案 本文介绍适用于windows平台Application部分,audio ss的说明会在下一篇文章在做说明,Linux平台如果不进行AI算法的开发,个人认知是没有必要配置,若是做服务器倒是不错的选择.因为编译完成后烧录调试还…

Redis - 4 ( 9000 字 Redis 入门级教程 )

一&#xff1a; Zset 有序集合 1.1 常用命令 有序集合在 Redis 数据结构中相较于字符串、列表、哈希和集合稍显陌生。它继承了集合中元素不允许重复的特点&#xff0c;但与集合不同的是&#xff0c;有序集合的每个元素都关联一个唯一的浮点分数&#xff08;score&#xff09;…

ubuntu 使用samba与windows共享文件[注意权限配置]

在Ubuntu上使用Samba服务与Windows系统共享文件&#xff0c;需要正确配置Samba服务以及相应的权限。以下是详细的步骤&#xff1a; 安装Samba 首先&#xff0c;确保你的Ubuntu系统上安装了Samba服务。 sudo apt update sudo apt install samba配置Samba 安装完成后&#xff0c…

打印进度条

文章目录 1.Python语言实现(1)黑白色(2)彩色&#xff1a;蓝色 2.C语言实现(1)黑白颜色(2)彩色版&#xff1a;红绿色 1.Python语言实现 (1)黑白色 import sys import timedef progress_bar(percentage, width50):"""打印进度条:param percentage: 当前进度百分比…

深度解析 LDA 与聚类结合的文本主题分析实战

🌟作者简介:热爱数据分析,学习Python、Stata、SPSS等统计语言的小高同学~🍊个人主页:小高要坚强的博客🍓当前专栏:《Python之文本分析》🍎本文内容:深度解析 LDA 与聚类结合的文本主题分析实战🌸作者“三要”格言:要坚强、要努力、要学习 目录 引言 技术框架…

点跟踪基准最早的论文学习解读:TAP-Vid: A Benchmark for Tracking Any Point in a Video—前置基础

TAP-Vid: A Benchmark for Tracking Any Point in a Video— TAP-Vid&#xff1a;跟踪视频中任意点的基准、 学习这一篇文章的本来的目的是为了学习一下TAP-NET便于理解后面用到的TAPIR方法的使用。 文章目录 TAP-Vid: A Benchmark for Tracking Any Point in a Video— TAP-V…

C进阶-字符串与内存函数介绍(另加2道典型面试题)

满意的话&#xff0c;记得一键三连哦&#xff01; 我们先看2道面试题 第一道&#xff1a; 我们画图理解&#xff1a; pa&#xff0c;先使用再&#xff0c;pa开始指向a【0】&#xff0c;之后pa向下移动一位&#xff0c;再解引用&#xff0c;指向a【1】&#xff0c;a【1】又指向…

PH47代码框架 24241231 重要更新

仪式感一下&#xff1a;2024年最后一天&#xff0c;发布 PH47 代码框架的一次重要更新。当然这并不是有意的&#xff0c;而是直到现在才把更新的所有工作全部做完&#xff08;希望确实如此&#xff09;。 本次更新要点&#xff1a; 1、加入多IMU支持。本次更新正式加入对 MPU65…

idea报错:There is not enough memory to perform the requested operation.

文章目录 一、问题描述二、先解决三、后原因&#xff08;了解&#xff09; 一、问题描述 就是在使用 IDEA 写代码时&#xff0c;IDEA 可能会弹一个窗&#xff0c;大概提示你目前使用的 IDEA 内存不足&#xff0c;其实就是提醒你 JVM 的内存不够了&#xff0c;需要重新分配。弹…

Python用K-Means均值聚类、LRFMC模型对航空公司客户数据价值可视化分析指标应用|数据分享...

全文链接&#xff1a;https://tecdat.cn/?p38708 分析师&#xff1a;Yuling Fang 信息时代的来临使得企业营销焦点从产品中心转向客户中心&#xff0c;客户关系管理成为企业的核心问题&#xff08;点击文末“阅读原文”获取完整代码数据&#xff09;。 客户关系管理的关键是客…