深入理解 Flink(六)Flink Job 提交和 Flink Graph 详解

news2024/11/16 2:15:18

Flink Program 编程套路回顾

1、获取执行环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2、通过执行环境对象,注册数据源 Source,得到数据抽象
DataStream ds = env.socketTextStream(...)
3、调用数据抽象的各种Transformation执行逻辑计算
DataStream resultDS = ds.flatMap(...).keyBy(...).sum(...);
4、将各种Transformation执行完毕之后得到的计算结果数据抽象注册 Sink
resultDS.addSink(...)
5、提交Job执行
env.execute(...)

Flink Job 提交脚本解析

# Submission to an already running Flink on YARN cluster
./bin/flink run --target yarn-session
# Submission spinning up a Flink on YARN cluster in Per-Job Mode
./bin/flink run --target yarn-per-job
# Submission spinning up Flink on YARN cluster in Application Mode
./bin/flink run-application --target yarn-application

具体可以参考官网:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#advanced-cli

CliFrontend 提交分析

当用户把 Flink 应用程序打成 jar 使用 flink run … 的 shell 命令提交的时候,底层是通过 CliFrontend 来处理。底层的逻辑,就是通过反射来调用用户程序的 main() 方法执行。
需要注意的是,Application 模式下,会通过 YarnClusterDescriptor.deployInternal 方法在 yarn 中部署一个 application 集群,返回 YarnRestClusterClient 对象。yarn 中会启动一个 EmbeddedJobClient,执行 submitJob 方法提交 jobGraph。

ExecutionEnvironment 源码解析

StreamExecutionEnvironment 是 Flink 应用程序的执行入口,提供了一些重要的操作机制:

1、提供了 readTextFile(), socketTextStream(), createInput(), addSource() 等方法去对接数据源。
2、提供了 setParallelism() 设置应用程序的并行度。
3、StreamExecutionEnvironment 管理了 ExecutionConfig 对象,该对象负责 Job 执行的一些行为配置管理。还管理了 Configuration 管理一些其他的配置。这个所谓的其他配置,还包含了 Checkpoint 的配置,这个 chekcpoint 的配置参数,会单独解析出来,存储在 CheckpontConfig 中
4、StreamExecutionEnvironment 管理了一个 List<Transformation<?>> transformations 成员变量,该成员变量,主要用于保存 Job 的各种算子转化得到的 Transformation,把这些 Transformation 按照逻辑拼接起来,就能得到 StreamGragh, 注意转换顺序:
UserFunction ==> StreamOperator ==> Transformation ==> StreamNode
5、StreamExecutionEnvironment 提供了 execute() 方法主要用于提交 Job 执行。该方法接收的参数就是:StreamGraph

Flink on YARN Per-job 模式提交流程分析

入口类:ApplicatoinMaster: YarnJobClusterEntryPoint
在这里插入图片描述
在这里插入图片描述

Job提交流程源码分析

getStreamGraph(jobName) 生成 StreamGraph 解析

// 入口
StreamGraph streamGraph = getStreamGraph(jobName, true){
    // 通过 StreamGraphGenerator 来生成 StreamGraph
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate(){
        streamGraph = new StreamGraph(....)
        for(Transformation<?> transformation : transformations) {
            transform(transformation);
        }
    }
}

transform(transformation){
    // 先递归处理该 Transformation 的输入
    Collection<Integer> inputIds = transform(transform.getInput());
    // 将 Transformation 变成 Operator 设置到 StreamGraph 中,其实就是添加 StreamNode
    streamGraph.addOperator(....);
    // 设置该 StreamNode 的并行度
    streamGraph.setParallelism(transform.getId(), parallelism);
    // 设置该 StreamNode 的入边 SreamEdge
    for(Integer inputId : inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
        // 内部实现
        // 构建 StreamNode 之间的 边(StreamEdge) 对象
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, ...){
            // TODO_MA 注释: 给 上游 StreamNode 设置 出边
            getStreamNode(edge.getSourceId()).addOutEdge(edge);
        }
        // TODO_MA 注释: 给 下游 StreamNode 设置 入边
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}

execute(StreamGraph) 解析

// 入口
JobClient jobClient = executeAsync(streamGraph){
    // 执行一个 SreamGraph
    executorFactory.getExecutor(configuration).execute(streamGraph, configuration){
        // 第一件事:由 StreamGraph 生成 JobGragh
        JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
        // 第二件事:通过 RestClusterClient 提交 JobGraph 到Flink集群
        clusterClient.submitJob(jobGraph)
    }
}

// 通过 RestClusterClient 来提交 JobGraph
RestClusterClient.submitJob(JobGraph jobGraph){
    // 继续提交
    RestClusterClient.sendRetriableRequest(){
        // 通过 RestClient 提交
        RestClient.sendRequest(webMonitorHost, webMonitorPort, ...){
            // 继续提交
            RestClient.submitRequest(targetAddress,targetPort,httpRequest,responseType)
        }
    }
}

最终通过 channel 把请求数据,发给 WebMonitorEndpoint 中的 JobSubmitHandler 来执行处理。

小结

01、用户根据 Flink 应用程序的编写套路,写好应用程序,打成 jar 包,通过 flink run 的命令来执行提交
02、这个命令的底层,其实是执行: CliFrontend 组件来执行提交
03、这个 CliFrontend 的内部,会通过反射的技术,来转交执行到用户自定义应用程序的 main()
04、先获取 StreamExecutionEnvironment 执行环境对象实例
05、执行算子:其实就是从 算子 ---> function ---> StreamOperator ---> Transformation
06、执行 StreamExecutionEnvironment 的 executor 方法来执行提交
07、首先遍历 StreamExecutionEnvironment 的 transformations 这个 list 来生成 StreamGraph,之后会继续被构建成 JobGraph
08、具体的内部的提交是通过 RestClusterClient 来执行提交
09、在通过 RestClusterClient 提交之前,其实还会做一件事:把 SreamGraph 变成 JobGraph,也还会先把 JobGraph 持久化成为一个磁盘文件
10、在这个 RestClusterClient 的内部,其实是通过 RestClient 来提交
11、RestClient 其实在初始化的时候,就初始化了一个 Netty 客户端
12、通过封装一个 HttpRequest 对象,包含了需要提交的 JobGraph 文件和 Jar 包等,通过 Netty 客户端链接服务端,发送请求对象到服务端
13、Flink 主节点 JobManager 负责处理这个请求的是 WebMonitorEndpoint 中的 Netty 服务端,接收到 rest 请求会调用 Router 执行 route 处理,找到对应的 Handler 执行处理。提交 Job 对应的 Handler 是 JobSubmitHandler

在这里插入图片描述

WebMonitorEndpoint 处理 RestClient 的 JobSubmit 请求

最终处理这个请求: Flink 主节点 JobManager 负责处理这个请求的是 WebMonitorEndpoint 中的 Netty 服务端,接收到 rest 请求会调用 Router 执行 route 处理,找到对应的 Handler 执行处理。提交 Job 对应的 Handler 是 JobSubmitHandler。

// JobManager 服务端处理入口
JobSubmitHandler.handleRequest(){
    // 恢复得到 JobGraph
    JobGraph jobGraph = loadJobGraph(requestBody, nameToFile);
    // 通过 Dispatcher 提交 JobGraph
    Dispatcher.submitJob(jobGraph, timeout);
}

JobMaster 启动源码剖析

关键方法: jobMasterServiceFactory.createJobMasterService
核心的工作是:

  • 创建 JobMaster 这个 RpcEndpoint 组件,负责通信。内部会创建一个 DefaultScheduler 调度组件,在初始化该调度组件的时候,会调用 ExecutionGraphFactory 的相关方法,来把 JobGraph 转换成 ExectionGraph
  • JobMaster 启动,跳转到 onStart() 方法。内部的主要工作,就是以下这三:
    • 启动心跳机制,维持和 ResourceManager,和 TaskExecutor 之间的心跳
    • 启动 SlotPoolImpl 这个 slot 管理组件。
    • 从 ZK 获取 ResourceManager 的地址,从而进行 JobMaster 向 ResourceManager 的注册
  • 启动的这个 JobMaster 负责这个 Job 中的所有的 Task 的 slot 的申请和 任务的派发,状态的跟踪,容错,还有 checkpoint等各种操作

JobMaster 和 ResourceManager/TaskExecutor 的心跳

在这里插入图片描述

JobMaster 向 ResourceManager 注册

// 启动 JobMaster
jobMaster.start(){
    JobMaster.onStart(){
        startJobExecution(){
            // 第一件大事:启动 JobMaster 必要的一些工作
            startJobMasterServices(){
                // 第一件事: 启动心跳机制
                this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
                this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices);
                // 第二件事: 启动 SlotPoolImpl
                slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());
                // 第三件事: 从 ZK 获取 ResourceManager 的地址
                // 这儿就是 JobMaster 向 ResourceManager 执行注册的入口
                resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
            }
            // 第二件大事:开始调度执行
            startScheduling();
        }
    }
}
ResourceManager.registerJobManager(){
    // ResourceManager 关于 JobMaster 的注册内部实现,重要的事情做了四件
    registerJobMasterInternal(jobMasterGateway, jobId, ....){
        // TODO_MA 马中华 注释: 生成 JobMaster 注册对象
        JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobManagerResourceId, ....);
        // TODO_MA 马中华 注释: 完成注册
        jobManagerRegistrations.put(jobId, jobManagerRegistration);
        jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
        // TODO_MA 马中华 注释: 加入心跳管理
        jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<Void>() {});
        // TODO_MA 马中华 注释: 返回 JobMaster 注册成功
        return new JobMasterRegistrationSuccess(getFencingToken(), resourceId);
    }
}

Flink Graph 演变

在这里插入图片描述

StreamGraph 构建和提交源码解析

在这里插入图片描述
关于 StreamNode 的定义:

public class StreamNode {
    private final int id;
    private int parallelism;
    private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
    private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
    private final Class<? extends AbstractInvokable> jobVertexClass;
}

关于 StreamEdge 的定义:

public class StreamEdge implements Serializable {
    private final String edgeId;
    private final int sourceId;
    private final int targetId;
}

JobGraph 构建和提交源码解析

JobGraph: StreamGraph 经过优化后生成了 JobGraph,提交给 Flink 集群的数据结构。它包含的主要抽象概念有:

1、JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个 JobVertex,即一个JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是 IntermediateDataSet。
2、IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是 JobVertex,consumer 是 JobEdge。
3、JobEdge:代表了 job graph 中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex。

在 StreamGraph 构建 JobGragh 的过程中,最重要的事情就是 operator 的 chain 优化,那么到底什么样的情况的下 Operator 能chain 在一起呢 ?答案是要满足以下 9 个条件:

// 1、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
downStreamVertex.getInEdges().size() == 1;
// 2、上下游节点都在同一个 slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
// 3、前后算子不为空
!(downStreamOperator == null || upStreamOperator == null);
// 4、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认是 HEAD)
!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
// 5、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是 ALWAYS)
!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
// 6、两个节点间物理分区逻辑是 ForwardPartitioner
(edge.getPartitioner() instanceof ForwardPartitioner);
// 7、两个算子间的 shuffle 方式不等于批处理模式
edge.getShuffleMode() != ShuffleMode.BATCH;
// 8、上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
// 9、用户没有禁用 chain
streamGraph.isChainingEnabled();

在这里插入图片描述
构建逻辑的重点代码:

1、在 connect 之间,调用的 createChain() 就是先执行优化,然后再生成 JobVertex
2、然后 调用 connect 之后,是为了组织关系
    1、先生成 IntermediateDataSet 和 JobEdge
    2、把 IntermediateDataSet 和 当前 JobVertex 设置为 JobEdge 的 source 和 target
    3、把 JobEdge 设置为这个 IntermediateDataSet 的消费者

关于 JobVertex 的定义:

public class JobVertex implements java.io.Serializable {
    private final JobVertexID id;
    private final ArrayList<IntermediateDataSet> results = new ArrayList<>();
    private final ArrayList<JobEdge> inputs = new ArrayList<>();
    private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
    private String invokableClassName;
}

关于 IntermediateDataSet 的定义:

public class IntermediateDataSet implements java.io.Serializable {
    private final IntermediateDataSetID id;
    private final JobVertex producer;
    private final List<JobEdge> consumers = new ArrayList<JobEdge>();
}

关于 JobEdge 的定义:

public class JobEdge implements java.io.Serializable {
    private final JobVertex target;
    private IntermediateDataSet source;
    private IntermediateDataSetID sourceId;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1376885.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

allegro PCB设计心得笔记(二) PCB板框设计心得

Cadence Allegro软件设计PCB板框时&#xff0c;使用Add -> line&#xff0c;在Option选择Board Geometry/Outline&#xff0c;根据PCB需要输入对应坐标&#xff0c;设计好板框。 使用Z-Copy命令设计Route Keepin和Package Keepin时&#xff0c;需要先使用使用Shape -> Co…

【JAVA】Java 中什么叫单例设计模式?请用 Java 写出线程安全的单例模式

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a;JAVA ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 前言 正文 懒汉式&#xff08;Lazy Initialization&#xff09;&#xff1a; 双重检查锁定&#xff08;Double-Checked Locking&#xff09;…

vue 组件 import make sure to provide the “name“ option.

百度了好多结果&#xff0c;都过时了&#xff0c;例如&#xff1a; 模块引入是否加{} 再比如&#xff1a; 对于递归组件&#xff0c;请确保提供“name”选项。 出现该错误情况之一&#xff1a; 错误由未正确引入组件或子组件引起&#xff0c;如element-ui中form表单组件未引…

19. 从零用Rust编写正反向代理, 配置数据的热更新原理及实现

wmproxy wmproxy是由Rust编写&#xff0c;已实现http/https代理&#xff0c;socks5代理&#xff0c; 反向代理&#xff0c;静态文件服务器&#xff0c;内网穿透&#xff0c;配置热更新等&#xff0c; 后续将实现websocket代理等&#xff0c;同时会将实现过程分享出来&#xff…

MATLAB读取图片并转换为二进制数据格式

文章目录 前言一、MATLAB 文件读取方法1、文本文件读取2、二进制文件读取3、 图像文件读取4、其他文件读取 二、常用的图像处理标准图片链接三、MATLAB读取图片并转换为二进制数据格式1、matlab 源码2、运行结果 前言 本文记录使用 MATLAB 读取图片并转换为二进制数据格式的方…

130基于MATLAB并结合IBD算法的盲迭代反卷积法进行图像复原

基于MATLAB并结合IBD算法的盲迭代反卷积法进行图像复原 ,输出复原前后图像&#xff0c;PSF频谱结果。程序已调通&#xff0c;可直接运行。 130 matlab盲迭代反卷积IBD (xiaohongshu.com)

高手总结17个画好原理图的技巧

欧若奇科技 专业电路设计&#xff0c;PCB复制&#xff0c;原理图反推&#xff0c;电子产品优化设计等 不光是代码有可读性的说法&#xff0c;原理图也有。很多时候原理图不仅仅是给自己看的&#xff0c;也会给其它人看&#xff0c;如果可读性差&#xff0c;会带来一系列沟通问…

【TypeScript】入门基础知识

目前在做项目的技术栈是 reacttypescript&#xff0c;之前只知道 ts 是 js 的扩展&#xff0c;增加了类型检查&#xff0c;但是没有仔细的学过&#xff0c;纯纯看别人代码上手 anyscript&#xff08;这很难评...&#xff09;。趁着最近空闲&#xff0c;就学习一下 ts 的基础知识…

鸿蒙原生应用/元服务开发-长时任务

概述 功能介绍 应用退至后台后&#xff0c;对于在后台需要长时间运行用户可感知的任务&#xff0c;例如播放音乐、导航等。为防止应用进程被挂起&#xff0c;导致对应功能异常&#xff0c;可以申请长时任务&#xff0c;使应用在后台长时间运行。申请长时任务后&#xff0c;系统…

电脑弹窗“concrt140.dll文件找不到”,快速修复,亲测有效

很多小伙伴&#xff0c;在启动游戏或软件的时候&#xff0c;电脑会弹出错误提示框称““concrt140.dll文件找不到&#xff0c;程序无法运行”&#xff0c;不清楚是怎么回事&#xff0c;应该怎么办&#xff1f; 首先&#xff0c;我们先来了解““concrt140.dll文件找”是什么&a…

【AI视野·今日Robot 机器人论文速览 第七十四期】Wed, 10 Jan 2024

AI视野今日CS.Robotics 机器人学论文速览 Wed, 10 Jan 2024 Totally 17 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Robotics Papers Hold em and Fold em: Towards Human-scale, Feedback-Controlled Soft Origami Robots Authors Immanuel Ampomah Mensah, Je…

【前后端的那些事】前后端环境搭建+树形结构表格实现

文章目录 1. 前后端项目环境搭建2. table-tree2.1 后端准备2.2 前端准备 前言&#xff1a;最近写项目&#xff0c;发现了一些很有意思的功能&#xff0c;想写文章&#xff0c;录视频把这些内容记录下。但这些功能太零碎&#xff0c;如果为每个功能都单独搭建一个项目&#xff0…

【贪心】一手顺子

/** 贪心&#xff1a;将一个数当成一个组中最小的数&#xff0c;在根据该最小数找其它数。* 思路&#xff1a;将hand进行分组&#xff0c;假设hand长度为 n&#xff0c;必须n % groupSize 0才可以分组&#xff0c;否则返回false&#xff0c;* 使用哈希表记录每个数出现…

考研经验总结——目录

文章目录 一、写作顺序二、个人情况说明三、读评论四、一些小牢骚 一、写作顺序 我将准备从三个阶段开始介绍吧 考研前考研中考研后&#xff08;也就是现在我的这种情况&#xff09; 考研前我会分为&#xff1a;数学、专业课、政治、英语 四个部分来写 我应该会涉及&#xf…

AI赋能建筑设计 | VERYCLOUD睿鸿股份与亚马逊云科技协力为AIRI lab. 打造生成式AI应用案例

近年来&#xff0c;很多研究都致力于探索如何让建筑师借助人工智能的力量来促进并简化设计流程。生成式AI全球爆火以来&#xff0c;建筑设计领域也掀起了一场全新的思维变革。 AI为建筑设计带来更多可能 作为一家面向全球提供设计服务的企业&#xff0c;AIRI lab.计划推出一种…

Python教程41:使用turtle画蜡笔小新

---------------turtle源码集合--------------- Python教程39&#xff1a;使用turtle画美国队长盾牌 Python教程38&#xff1a;使用turtle画动态粒子爱心文字爱心 Python教程37&#xff1a;使用turtle画一个戴帽子的皮卡丘 Python教程36&#xff1a;海龟画图turtle写春联 …

【pytorch】使用pytorch构建线性回归模型-了解计算图和自动梯度

使用pytorch构建线性回归模型 线性方程的一般形式 衡量线性损失的一般形式-均方误差 pytorch中计算图的作用和优势 在 PyTorch 中&#xff0c;计算图&#xff08;Computational Graph&#xff09;是一种用于表示神经网络运算的数据结构。每个节点代表一个操作&#xff0c;例如…

【AWS】使用亚马逊云服务器创建EC2实例

目录 前言为什么选择 Amazon EC2 云服务器搭建 Amazon EC2 云服务器注册亚马逊账号登录控制台服务器配置免费套餐预览使用 Amazon EC2 云服务器打开服务器管理界面设置服务器区域填写实例名称选择服务器系统镜像选择实例类型创建密钥对网络设置配置存储启动实例查看实例 总结 前…

【天龙怀旧服】攻略day5

关键字&#xff1a; 天鉴扫荡、举贤、燕子水路 1】85天鉴任务可以扫荡 在流派选择npc那里&#xff0c;花费40交子即可扫荡100点&#xff0c;可以兑换10个灵武打造图&#xff1b; 此外打造图绑定不影响做出来的灵武绑定&#xff0c;只要对应的玉不绑灵武就不绑定 2】冠绝师门…

C#使用CryptoStream类加密和解密字符串

目录 一、CrytoStream的加密方法 二、CrytoStream的解密方法 三、实例 1.源码Form1.cs 2.类库Encrypt.cs 3.生成效果 在使用CryptoStream前要先引用命名空间using System.Security.Cryptography。 一、CrytoStream的加密方法 记住&#xff0c;不能再使用DESCryptoServi…