Flink ExecuteGraph构建源码解析

news2024/11/16 19:58:25

文章目录

  • 前言
  • ExecutionGraph中的主要抽象概念
  • 源码核心代码入口
  • 源码核心流程:


前言

在这里插入图片描述

JobGraph构建过程中分析了JobGraph的构建过程,本文分析ExecutionGraph的构建过程。JobManager(JobMaster) 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph是JobGraph 的并行化版本,是调度层最核心的数据结构。


ExecutionGraph中的主要抽象概念

1、ExecutionJobVertex:和JobGraph中的JobVertex一一对应。每一个ExecutionJobVertex都有
和并发度一样多的 ExecutionVertex。
2、ExecutionVertex:表示ExecutionJobVertex的其中一个并发子任务,输入是ExecutionEdge,输
出是IntermediateResultPartition。
3、IntermediateResult:和JobGraph中的IntermediateDataSet一一对应。一个
IntermediateResult包含多个IntermediateResultPartition,其个数等于该operator的并发度。
4、IntermediateResultPartition:表示ExecutionVertex的一个输出分区,producer是
ExecutionVertex,consumer是若干个ExecutionEdge。
5、ExecutionEdge:表示ExecutionVertex的输入,source是IntermediateResultPartition,
target是ExecutionVertex。source和target都只能是一个。
6、Execution:是执行一个 ExecutionVertex 的一次尝试。当发生故障或者数据需要重算的情况下
ExecutionVertex 可能会有多个 ExecutionAttemptID。一个 Execution 通过
ExecutionAttemptID 来唯一标识。JM和TM之间关于 task 的部署和 task status 的更新都是通过
ExecutionAttemptID 来确定消息接受者。

源码核心代码入口

ExecutionGraph executioinGraph = SchedulerBase.createAndRestoreExecutionGraph(
                        completedCheckpointStore,
                        checkpointsCleaner,
                        checkpointIdCounter,
                        initializationTimestamp,
                        mainThreadExecutor,
                        jobStatusListener,
                        vertexParallelismStore);

在 SchedulerBase 这个类的内部,有两个成员变量:一个是 JobGraph,一个是 ExecutioinGraph
在创建 SchedulerBase 的子类:DefaultScheduler 的实例对象的时候,会在 SchedulerBase 的构造
方法中去生成 ExecutionGraph。

源码核心流程:

DefaultExecutionGraphFactory.createAndRestoreExecutionGraph()
ExecutionGraph newExecutionGraph = createExecutionGraph(...)
DefaultExecutionGraphBuilder.buildGraph(jobGraph, ....)
// 创建 ExecutionGraph 对象
executionGraph = (prior != null) ? prior : new ExecutionGraph(...)
// 生成 JobGraph 的 JSON 表达形式
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
// 重点,从 JobGraph 构建 ExecutionGraph
executionGraph.attachJobGraph(sortedTopology);
// 遍历 JobVertex 执行并行化生成 ExecutioinVertex
for(JobVertex jobVertex : topologiallySorted) {
	// 每一个 JobVertex 对应到一个 ExecutionJobVertex
	ExecutionJobVertex ejv = new ExecutionJobVertex(jobGraph,
	jobVertex);
	ejv.connectToPredecessors(this.intermediateResults);
	List<JobEdge> inputs = jobVertex.getInputs();
	for(int num = 0; num < inputs.size(); num++) {
		JobEdge edge = inputs.get(num);
		IntermediateResult ires =intermediateDataSets.get(edgeID);
		this.inputs.add(ires);
		// 根据并行度来设置 ExecutionVertex
		for(int i = 0; i < parallelism; i++) {
			ExecutionVertex ev = taskVertices[i];
			ev.connectSource(num, ires, edge,consumerIndex);
		}
	}
}

DefaultExecutionGraphBuilder 详细代码如下:

public class DefaultExecutionGraphBuilder {

    public static DefaultExecutionGraph buildGraph(
            JobGraph jobGraph,
            Configuration jobManagerConfig,
            ScheduledExecutorService futureExecutor,
            Executor ioExecutor)  {
        final String jobName = jobGraph.getName();
        final JobID jobId = jobGraph.getJobID();
        final JobInformation jobInformation = new JobInformation(... );
        // create a new execution graph, if none exists so far
        final DefaultExecutionGraph executionGraph;
         executionGraph = new DefaultExecutionGraph( ....);

        // set the basic properties
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));

        // initialize the vertices that have a master initialization hook
        // file output formats create directories here, input formats create splits
        for (JobVertex vertex : jobGraph.getVertices()) {
            String executableClass = vertex.getInvokableClassName();
                vertex.initializeOnMaster(
                        new SimpleInitializeOnMasterContext(
                                classLoader,
                                vertexParallelismStore
                                        .getParallelismInfo(vertex.getID())
                                        .getParallelism()));
        }

        // topologically sort the job vertices and attach the graph to the existing one
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();

        executionGraph.attachJobGraph(sortedTopology);

        // configure the state checkpointing
        if (isDynamicGraph) {
            // dynamic graph does not support checkpointing so we skip it
            log.warn("Skip setting up checkpointing for a job with dynamic graph.");
        } else if (isCheckpointingEnabled(jobGraph)) {
            JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();

            // load the state backend from the application settings
            final StateBackend applicationConfiguredBackend;
            final SerializedValue<StateBackend> serializedAppConfigured =
                    snapshotSettings.getDefaultStateBackend();

            if (serializedAppConfigured == null) {
                applicationConfiguredBackend = null;
            } else {
                try {
                    applicationConfiguredBackend =
                            serializedAppConfigured.deserializeValue(classLoader);
                } catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(
                            jobId, "Could not deserialize application-defined state backend.", e);
                }
            }

            final StateBackend rootBackend =
                     StateBackendLoader.fromApplicationOrConfigOrDefault(
                             applicationConfiguredBackend,
                             snapshotSettings.isChangelogStateBackendEnabled(),
                             jobManagerConfig,
                             classLoader,
                             log);


            // load the checkpoint storage from the application settings
            final CheckpointStorage applicationConfiguredStorage;
            final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
                    snapshotSettings.getDefaultCheckpointStorage();

            if (serializedAppConfiguredStorage == null) {
                applicationConfiguredStorage = null;
            } else {
                    applicationConfiguredStorage =                           serializedAppConfiguredStorage.deserializeValue(classLoader);


            final CheckpointStorage rootStorage;
            try {
                rootStorage =
                        CheckpointStorageLoader.load(
                                applicationConfiguredStorage,
                                null,
                                rootBackend,
                                jobManagerConfig,
                                classLoader,
                                log);
            } catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
                throw new JobExecutionException(
                        jobId, "Could not instantiate configured checkpoint storage", e);
            }

            // instantiate the user-defined checkpoint hooks

            final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
                    snapshotSettings.getMasterHooks();
            final List<MasterTriggerRestoreHook<?>> hooks;

            if (serializedHooks == null) {
                hooks = Collections.emptyList();
            } else {
                final MasterTriggerRestoreHook.Factory[] hookFactories;
                try {
                    hookFactories = serializedHooks.deserializeValue(classLoader);
                } catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(
                            jobId, "Could not instantiate user-defined checkpoint hooks", e);
                }

                final Thread thread = Thread.currentThread();
                final ClassLoader originalClassLoader = thread.getContextClassLoader();
                thread.setContextClassLoader(classLoader);

                try {
                    hooks = new ArrayList<>(hookFactories.length);
                    for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                        hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                    }
                } finally {
                    thread.setContextClassLoader(originalClassLoader);
                }
            }

            final CheckpointCoordinatorConfiguration chkConfig =
                    snapshotSettings.getCheckpointCoordinatorConfiguration();
            String changelogStorage = jobManagerConfig.getString(STATE_CHANGE_LOG_STORAGE);

            executionGraph.enableCheckpointing(
                    chkConfig,
                    hooks,
                    checkpointIdCounter,
                    completedCheckpointStore,
                    rootBackend,
                    rootStorage,
                    checkpointStatsTrackerFactory.get(),
                    checkpointsCleaner,
                    jobManagerConfig.getString(STATE_CHANGE_LOG_STORAGE));
        }

        return executionGraph;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1499701.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

keepalived原理以及lvs、nginx跟keeplived的运用

keepalived基础 keepalived的原理是根据vrrp协议&#xff08;主备模式&#xff09;去设定的 vrrp技术相关原理 状态机&#xff1b; 优先级0~255 心跳线1秒 vrrp工作模式 双主双备模式 VRRP负载分担过程 vrrp安全认证&#xff1a;使用共享密匙 keepalived工具介绍 keepal…

Python实现霍德里克-普雷斯科特(Hodrick-Prescott,HP)过滤器模型和UC-ARIMA模型(hpfilter算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 霍德里克-普雷斯科特&#xff08;Hodrick-Prescott, HP&#xff09;过滤器模型&#xff1a; HP滤波器是…

数据结构算法第二套试卷小题

1.哈夫曼树 在哈夫曼树中&#xff0c;每个非叶子节点都有两个子节点&#xff08;因为哈夫曼树是一种二叉树&#xff09;&#xff0c;而叶子节点没有子节点。如果用二叉链表作为存储结构&#xff0c;每个节点需要包含两个指针域&#xff0c;分别指向其左孩子和右孩子。因此&…

SystemVerilog构造、包

包 包提供了一种共享不同构造的附加方式。他们的行为与VHDL包。包可以包含函数、任务、类型和枚举。的语法包是&#xff1a; package package_name; items endpackage : package_name 最终的package_name不是必需的&#xff0c;但它使代码更易于阅读。包是import命令在其他…

渗透测试——信息收集

信息收集 前言 信息收集是在做渗透时找尽可能的多的信息&#xff0c;为之后的渗透做铺垫。信息收集的方法有很多 比如&#xff0c;页面、真实的IP、域名/子域名、敏感目录/文件、端口探测、CMS指纹识别、操作系统识别 1. 页面信息收集 拿到域名后&#xff0c;从网站的url中…

c++ 二分查找(迭代与递归)

二分搜索被定义为一种在排序数组中使用的搜索算法&#xff0c;通过重复将搜索间隔一分为二。二分查找的思想是利用数组已排序的信息&#xff0c;将时间复杂度降低到O(log N)。 二分查找算法示例 何时在数据结构中应用二分查找的条件&#xff1a; 应用二分查找算法&#xff1a;…

Java程序员如何通过跳槽薪资翻倍,java多态面试题

Spring 面试题 1、不同版本的 Spring Framework 有哪些主要功能&#xff1f; 2、什么是 Spring Framework&#xff1f; 3、列举 Spring Framework 的优点。 4、Spring Framework 有哪些不同的功能&#xff1f; 5、Spring Framework 中有多少个模块&#xff0c;它们分别是什…

一文搞懂所有 VAE 模型

目录 收起 1 引言 2 符号术语 2.1 AE中的符号 2.2 VAE中的符号 3 基础自编码器 3.1 Autoencoder 3.2 Denoising Autoencoder 随着Stable Diffusion和Sora等技术在生成图像和视频的质量与帧率上取得显著提升&#xff0c;能够在一个低维度的压缩空间进行计算变得越发重要…

【数据结构与算法】二分查找题解(二)

这里写目录标题 一、81. 搜索旋转排序数组 II二、167. 两数之和 II - 输入有序数组三、441. 排列硬币四、374. 猜数字大小五、367. 有效的完全平方数六、69. x 的平方根 一、81. 搜索旋转排序数组 II 中等 已知存在一个按非降序排列的整数数组 nums &#xff0c;数组中的值不必…

7-18 彩虹瓶(Python)

彩虹瓶的制作过程&#xff08;并不&#xff09;是这样的&#xff1a;先把一大批空瓶铺放在装填场地上&#xff0c;然后按照一定的顺序将每种颜色的小球均匀撒到这批瓶子里。 假设彩虹瓶里要按顺序装 N 种颜色的小球&#xff08;不妨将顺序就编号为 1 到 N&#xff09;。现在工…

vue3+elementPlus:el-table-column表格列动态设置单元格颜色

:cell-style属性 //html<el-tableempty-text"暂无数据":data"datalist.table":max-height"height"row-key"id"border:cell-style"cellStyle"> <el-table>//js //动态设置单元格颜色 const cellStyle ({ row, c…

矢量数据库简单介绍:在 Postgres使用 pg_vector

矢量数据库简单介绍&#xff1a;在 Postgres使用 pg_vector 作为向人工智能大规模转变的一部分&#xff0c;矢量数据库越来越受欢迎。它们也称为矢量化数据库&#xff0c;在人工智能领域发挥着至关重要的作用&#xff0c;因此了解它们的工作原理非常重要。为此&#xff0c;我们…

某准网招聘接口逆向之WebPack扣取

​​​​​逆向网址 aHR0cHM6Ly93d3cua2Fuemh1bi5jb20v 逆向链接 aHR0cHM6Ly93d3cua2Fuemh1bi5jb20vc2VhcmNoP3BhZ2VOdW09MSZxdWVyeT1weXRob24mdHlwZT01 逆向接口 aHR0cHM6Ly93d3cua2Fuemh1bi5jb20vYXBpX3RvL3NlYXJjaC9qb2IuanNvbg 逆向过程 请求方式&#xff1a;GET 参数构成…

FPGA高端项目:FPGA基于GS2971的SDI视频接收+纯verilog图像缩放+多路视频拼接,提供8套工程源码和技术支持

目录 1、前言免责声明 2、相关方案推荐本博已有的 SDI 编解码方案本方案的SDI接收转HDMI输出应用本方案的SDI接收图像缩放应用本方案的SDI接收HLS图像缩放HLS多路视频拼接应用本方案的SDI接收OSD多路视频融合叠加应用本方案的SDI接收HLS多路视频融合叠加应用本方案的SDI接收GTX…

基于Redis自增实现全局ID生成器(详解)

本博客为个人学习笔记&#xff0c;学习网站与详细见&#xff1a;黑马程序员Redis入门到实战 P48 - P49 目录 全局ID生成器介绍 基于Redis自增实现全局ID 实现代码 全局ID生成器介绍 背景介绍 当用户在抢购商品时&#xff0c;就会生成订单并保存到数据库的某一张表中&#…

Python 读取写入excel文件

使用Python读取和写入excel的xlsx、xls文件 目录 读取xlsx文件 安装三方库 引入三方库 读取数据 打开文件 表名 最大行数 最大列数 读取一张表 读取整个文件 返回xls整体内容 安装三方包 读取内容 写入xls文件 引入三方库 创建文件并写入数据 报错及解决 报错…

SSL 证书,了解一下常识

公司的网站、应用怎么才能保证在互联网上安全运行&#xff0c;不被攻击、盗取数据呢&#xff1f; 创业必经之路&#xff0c;一步一步走就对了&#xff0c;可能没赶上红利期&#xff0c;但不做就等于0。 概述 SSL 证书&#xff08;SSL Certificates&#xff09;又称数字证书&am…

3/7—21. 合并两个有序链表

代码实现&#xff1a; 方法1&#xff1a;递归 ---->难点 /*** Definition for singly-linked list.* struct ListNode {* int val;* struct ListNode *next;* };*/ struct ListNode* mergeTwoLists(struct ListNode *list1, struct ListNode *list2) {/*1.如果l1为…

Vivado使用记录(未完待续)

一、Zynq开发流程 二、软件安装 三、软件使用 字体大小修改&#xff1a;Setting、Font 四、Vivado基本开发流程 1、创建工程 Quick Start 组包含有 Create Project&#xff08;创建工程&#xff09;、 Open Project&#xff08;打开工程&#xff09;、 Open Example Project&…

智慧视频终端解决方案

依托富瀚微智慧视频SOC&#xff0c;提供以视频为核心的智能产品及解决方案