Flink on yarn 实战和源码分析

news2024/11/17 5:24:21

版本:1.13.6

目录

Flink on yarn 的3种模式的使用

yarn session 模式源码分析

yarn per-job模式源码分析

application模式源码分析


Flink on yarn 的3种模式的使用

Application Mode #

./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar

Per-Job Mode #

./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

Session Mode #

./bin/flink run -t yarn-session ./examples/streaming/TopSpeedWindowing.jar

注意:application 方式 使用的action为 run-application,而 per-job session mode 都是 run -t。 为什么不统一呢,官方没注意到?

yarn session 模式源码分析

以样例TopSpeedWindowing.jar为例,命令行提交命令为:

./bin/flink run -t yarn-session ./examples/streaming/TopSpeedWindowing.jar

根据bin/flink脚本中的入口类 org.apache.flink.client.cli.CliFrontend

1、找到bin/flink 提交脚本中看到启动类即程序的入口是: org.apache.flink.client.cli.CliFrontend

2、查看其中的main方法,执行的逻辑简单总结如下:

  1. 获取flink的conf目录的路径

  2. 根据conf路径,加载配置

  3. 封装命令行接口:按顺序Generic、Yarn、Default   

/** Submits the job based on the arguments. */
public static void main(final String[] args) {
    EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

    // 1. find the configuration directory
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. load the global configuration
    final Configuration configuration =
            GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. load the custom command lines
    final List<CustomCommandLine> customCommandLines =
            loadCustomCommandLines(configuration, configurationDirectory);

    int retCode = 31;
    try {
        final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

        SecurityUtils.install(new SecurityConfiguration(cli.configuration));
        retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
    } catch (Throwable t) {
        final Throwable strippedThrowable =
                ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("Fatal error while running command line interface.", strippedThrowable);
        strippedThrowable.printStackTrace();
    } finally {
        System.exit(retCode);
    }
}

parseAndRun 方法中 根据命令行的第一个参数匹配action ,此模式参数为run ,所以 case ACTION_RUN:

/**
 * Parses the command line arguments and starts the requested action.
 *
 * @param args command line arguments of the client.
 * @return The return code of the program
 */
public int parseAndRun(String[] args) {

    // check for action
    if (args.length < 1) {
        CliFrontendParser.printHelp(customCommandLines);
        System.out.println("Please specify an action.");
        return 1;
    }

    // get action
    String action = args[0];

    // remove action from parameters
    final String[] params = Arrays.copyOfRange(args, 1, args.length);

    try {
        // do action
        switch (action) {
            case ACTION_RUN:
                run(params);
                return 0;
            case ACTION_RUN_APPLICATION:
                runApplication(params);
                return 0;
            case ACTION_LIST:
                list(params);
                return 0;
            case ACTION_INFO:
                info(params);
                return 0;
            case ACTION_CANCEL:
                cancel(params);
                return 0;
            case ACTION_STOP:
                stop(params);
                return 0;
            case ACTION_SAVEPOINT:
                savepoint(params);
                return 0;
            case "-h":
            case "--help":
                CliFrontendParser.printHelp(customCommandLines);
                return 0;
            case "-v":
            case "--version":
                String version = EnvironmentInformation.getVersion();
                String commitID = EnvironmentInformation.getRevisionInformation().commitId;
                System.out.print("Version: " + version);
                System.out.println(
                        commitID.equals(EnvironmentInformation.UNKNOWN)
                                ? ""
                                : ", Commit ID: " + commitID);
                return 0;
            default:
                System.out.printf("\"%s\" is not a valid action.\n", action);
                System.out.println();
                System.out.println(
                        "Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
                System.out.println();
                System.out.println(
                        "Specify the version option (-v or --version) to print Flink version.");
                System.out.println();
                System.out.println(
                        "Specify the help option (-h or --help) to get help on the command.");
                return 1;
        }
    } catch (CliArgsException ce) {
        return handleArgException(ce);
    } catch (ProgramParametrizationException ppe) {
        return handleParametrizationException(ppe);
    } catch (ProgramMissingJobException pmje) {
        return handleMissingJobException();
    } catch (Exception e) {
        return handleError(e);
    }
}

/**
 * Executions the run action.
 *
 * @param args Command line arguments for the run action.
 */
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");
    //获取run的动作,默认的配置项
    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    //根据用户指定的配置项,进行解析 例如-t -p -c等
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);

    // evaluate help flag,只要参数中包含help 打印后结束
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));

    final ProgramOptions programOptions = ProgramOptions.create(commandLine);

    //获取 用户的jar包和其他依赖
    final List<URL> jobJars = getJobJarAndDependencies(programOptions);

    //获取有效配置:HA的id、Target(session、per-job)、JobManager内存、TaskManager内存、每个TM的slot数
    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
    // PackagedProgram 类很关键,
    try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
        // 执行程序
        executeProgram(effectiveConfiguration, program);
    }
}

yarn per-job模式源码分析

以样例TopSpeedWindowing.jar为例,命令行提交命令为:

./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

application模式源码分析

Flink 1.11 版本引入了 Application 模式。

应用模式是唯一一个main方法不在客户端上执行的。其他两种模式都要在本地管理依赖资源,运行main方法然后生成JobGraph并提交到集群,不仅增加了网络传输的压力,还消耗了客户端大量的CPU资源。为了解决这个问题,应用模式先把用户jar包等资源提交到资源平台,然后创建Flink集群 并 自动在服务器上运行应用main方法。在main方法中可以通过execute或executeAysnc提交任务并记录提交作业id,作业执行完毕后,集群自动关闭。因此这种模式可以跟Per Job一样做到单任务的资源隔离,同时也可以解决客户端的计算瓶颈问题。

另外,应用模式支持提交多个作业,作业的顺序依赖于启动顺序。如果使用的是execute()会阻塞按顺序执行;如果使用的是executeAysnc,各个任务可能会乱序执行。

bin目录下 flink脚本内容,可以看到 入口类为 org.apache.flink.client.cli.CliFrontend

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`constructFlinkClassPath`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

# Add Client-specific JVM options
FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_CLI}"

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

使用application方式提交样例作业

hadoop1 jps

hadoop2 jps

hadoop3 jps

jobmanager

taskmanager -p 3 1slot/taskmanager 所以 需要启动3个container

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1047493.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Apollo简易地图制作

在Apollo中模拟障碍物 一、准备工作 在模拟障碍物之前&#xff0c;需要下载并编译Apollo源码&#xff0c;过程可以依据Apollo开放平台文档&#xff0c;其中可能遇到的问题在这里或许可以寻找到答案 二、运行Dreamview 进入容器 cd ~/apollobash docker/scripts/dev_start.s…

515万新作者投身电商事业,抖音电商将投入更多资源扶持作者长期发展

9月27日&#xff0c;2023抖音电商作者峰会在上海举办。上千位抖音电商作者、MCN机构、精选联盟服务商、商家等重要生态伙伴参会&#xff0c;围绕大会主题“向新成长”进行了深入探讨。会上&#xff0c;抖音电商总裁魏雯雯提到&#xff0c;电商作者的事业有更多发展方向。为助力…

输送机使用的常见误区

输送机也称流水线&#xff0c;是指在自动化生产过程中起到运输货物&#xff0c;联通各个生产设备的主要机械设备。但在使用的过程中&#xff0c;很多用户对于输送机的使用存在一定的误区&#xff0c;导致设备故障频出&#xff0c;下面就针对用户已在使用输送机过程中的常见误区…

以太网中的介质共享访问控制机制

什么是CSMA/CD CSMA/CD&#xff08;Carrier Sense Multiple Access with Collision Detection&#xff09;是一种用于以太网等共享介质的访问控制机制。它用于协调多个设备共享同一物理介质&#xff08;例如同一局域网&#xff09;上的传输权利&#xff0c;以避免碰撞并提供公…

mac安装python2

Python 2 于 2020 年 1 月 1 日宣布结束支持&#xff0c;包括 Homebrew 在内的许多项目和包管理器已经停止支持 Python 2。 如果现在你还要安装 Python 2&#xff0c;需要从 Python 官网下载安装包&#xff1a; 访问 Python 的发布页面。从页面底部找到 Python 2 的最后一个版…

tp8 Editor.md

Editor.md - 开源在线 Markdown 编辑器 放于public文件夹下 html代码&#xff1a; <div class"layui-col-md12" id"content"><textarea name"content" placeholder"详情" class"layui-textarea">{notempty nam…

【Unity的HDRP渲染管线搭建配置VR交互场景_SteamVR 插件和Pico串流助手_经验分享】

HDRP渲染管线配置VR交互场景 Unity创建场景和相关配置下载导入项目打开PICO串流助手在Pico中的配置:用Steam串流VR_这篇的前置补充 Unity创建场景和相关配置 带HDRP Sample Scene 示例的 下载 SteamVR Unity插件地址02 导入项目

GEO生信数据挖掘(二)下载基因芯片平台文件及注释

检索到目标数据集后&#xff0c;开始数据挖掘&#xff0c;本文以阿尔兹海默症数据集GSE1297为例 目录 下载平台文件 1.AnnotGPL参数改为TRUE,联网下载芯片平台的soft文件。&#xff08;国内网速奇慢经常中断&#xff09; 2.手工去GEO官网下载 转换芯片探针ID为gene name 拓…

ADS-B及雷达显示终端8.3

新版本功能升级主要有如下: 1、地图更新 在上一版本8.2中使用的高程地图为由SRTM经过地形晕渲后&#xff0c;生成地形图片&#xff0c;然后对图片进行贴图&#xff0c;一一按规定位置、大小将地形图贴至底图上&#xff0c;而后在底图上进行二维矢量地图的绘制&#xff0c;包括…

uniapp app 导出excel 表格

直接复制运行 <template><view><button click"tableToExcel">导出一个表来看</button><view>{{ successTip }}</view></view> </template><script>export default {data() {return {successTip: }},metho…

【面试高高手】—— SpringBoot(11题)

文章目录 1.什么是SpringBoot?2.为什么需要Spring Boot&#xff1f;3.SpringBoot的特征&#xff1f;4.SpringBoot的两个策略是什么&#xff1f;5.说一下SpringBoot的自动装配流程&#xff1f;6.说下什么是 Bean?7.什么是 CSRF 攻击&#xff1f;如何避免&#xff1f;8. Spring…

python ToastNotifier TypeError got Nonetype

这个错误没什么影响&#xff0c;只是在通知结束后会抛出 如果你实在不爽&#xff0c;办法如下&#xff1a; 找到"<你的python安装路径>\Lib\site-packages\win10toast"&#xff0c;里面应该有__main__.py和__init__.py两个文件&#xff0c;打开__init__.py 找到…

人工智能(AI)在产生新创意方面有多出色?

传统智慧一直不太擅长此道。发现新的创业机会、为未满足的需求提供解决方案&#xff0c;以及为新公司命名都是非结构化的任务&#xff0c;似乎不适合由算法来完成。然而&#xff0c;人工智能的最新进展——特别是像ChatGPT这样的大语言模型的出现——正在挑战这种假定。 我们教…

【ES6知识】Promise 对象

文章目录 1.1 概述1.2 静态方法1.3 实例方法1.4 Promise 拒绝事件 1.1 概述 Promise 对象用于表示一个异步操作的最终完成&#xff08;或失败&#xff09;及其结果值。是异步编程的一种解决方案&#xff08;可以解决回调地狱问题&#xff09;。 一个 Promise 对象代表一个在这…

Python实用技术——爬虫(二):爬虫需要使用的库

一&#xff0c;Requests库 1&#xff0c;主要使用方法&#xff1a; 1&#xff09;get&#xff08;&#xff09;方法&#xff1a; 这个Response对象中包含爬虫返回的内容。 除了request方法是基础方法外&#xff0c;其他都是通过调用request方法来实现的。 所以&#xff0c;我…

[React] React高阶组件(HOC)

文章目录 1.Hoc介绍2.几种包装强化组件的方式2.1 mixin模式2.2 extends继承模式2.3 HOC模式2.4 自定义hooks模式 3.高阶组件产生初衷4.高阶组件使用和编写结构4.1 装饰器模式和函数包裹模式4.2 嵌套HOC 5.两种不同的高阶组件5.1 正向的属性代理5.2 反向的继承 6.如何编写高阶组…

使用 Express 设置 GraphQL

使用 Express 设置 GraphQL 在本文中&#xff0c;我们将探讨如何在 Node.js 中设置 Express.js 和 GraphQL。另外&#xff0c;本文还将分享一些基本技巧&#xff0c;以确保我们的服务器已准备好投入实际使用&#xff01; 什么是 GraphQL GraphQL 是 API 的查询语言&#xff…

网络爬虫——urllib(1)

前言&#x1f36d; ❤️❤️❤️网络爬虫专栏更新中&#xff0c;各位大佬觉得写得不错&#xff0c;支持一下&#xff0c;感谢了&#xff01;❤️❤️❤️ 前篇简单介绍了什么是网络爬虫及相关概念&#xff0c;这篇开始讲解爬虫中的第一个库——urllib。 urllib&#x1f36d; …

Windows安装CMake详细教程(附学习资料)

CMake是一个跨平台的开源构建工具&#xff0c;用于自动化管理C项目的构建过程。本教程旨在向初学者介绍如何在Windows操作系统上安装CMake&#xff0c;并提供详细的步骤指导&#xff0c;帮助您顺利开始使用这个强大的工具。 学习资料在文末~ 步骤1&#xff1a;下载CMake安装程…

SW利用点光源来校核

先要建立坐标系&#xff0c;然后查这个坐标系的绝对坐标 然后删除其他光源&#xff0c;把环境光源降低最小 最后添加点光源&#xff0c;位置在之前查的坐标点