Flink JobGraph构建过程

news2025/1/12 12:30:49

文章目录

  • 前言
  • JobGraph创建的过程
  • 总结


前言

StreamGraph构建过程中分析了StreamGraph的构建过程,在StreamGraph构建完毕之后会对StreamGraph进行优化构建JobGraph,然后再提交JobGraph。优化过程中,Flink会尝试将尽可能多的StreamNode聚合在一个JobGraph节点中,通过合并创建JobVertex,并生成JobEdge,以减少数据在不同节点之间流动所产生的序列化、反序列化、网络传输的开销。它包含的主要抽象概念有:

1、JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个JobVertex,即一个JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是IntermediateDataSet。

2、IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是JobVertex,consumer 是 JobEdge。

3、JobEdge:代表了job graph中的一条数据传输通道。source是IntermediateDataSet,target是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。
在这里插入图片描述


JobGraph创建的过程

AbstractJobClusterExecutor.execute -> PipelineExecutorUtils.getJobGraph  -> 
PipelineTranslator.translateToJobGraph -> StreamGraphTranslator.translateToJobGraph
 -> StreamGraph.getJobGraph ->  StreamingJobGraphGenerator.createJobGraph

createJobGraph()函数

private JobGraph createJobGraph() {
        preValidate();
        jobGraph.setJobType(streamGraph.getJobType());

        jobGraph.enableApproximateLocalRecovery(
                streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

        // 为节点生成确定性哈希,以便在提交时识别它们(如果它们没有更改)。.
        Map<Integer, byte[]> hashes =
                defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

        setChaining(hashes, legacyHashes);

        setPhysicalEdges();

        markContainsSourcesOrSinks();

        setSlotSharingAndCoLocation();

        setManagedMemoryFraction(
                Collections.unmodifiableMap(jobVertices),
                Collections.unmodifiableMap(vertexConfigs),
                Collections.unmodifiableMap(chainedConfigs),
                id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
                id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

        configureCheckpointing();

        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

        final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
                JobGraphUtils.prepareUserArtifactEntries(
                        streamGraph.getUserArtifacts().stream()
                                .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                        jobGraph.getJobID());

        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                distributedCacheEntries.entrySet()) {
            jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
        }

        // 在最后完成ExecutionConfig时设置它
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        } catch (IOException e) {
        }

        jobGraph.setChangelogStateBackendEnabled(streamGraph.isChangelogStateBackendEnabled());

        addVertexIndexPrefixInVertexName();

        setVertexDescription();

        // Wait for the serialization of operator coordinators and stream config.
        try {
            FutureUtils.combineAll(
                            vertexConfigs.values().stream()
                                    .map(
                                            config ->
                                                    config.triggerSerializationAndReturnFuture(
                                                            serializationExecutor))
                                    .collect(Collectors.toList()))
                    .get();

            waitForSerializationFuturesAndUpdateJobVertices();
        } catch (Exception e) {
            throw new FlinkRuntimeException("Error in serialization.", e);
        }

        if (!streamGraph.getJobStatusHooks().isEmpty()) {
            jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
        }

        return jobGraph;
    }

在 StreamGraph 构建 JobGragh 的过程中,最重要的事情就是 operator 的 chain 优化,那么到底什
么样的情况的下 Operator 能chain 在一起呢?

// 1、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
downStreamVertex.getInEdges().size() == 1;
// 2、上下游节点都在同一个 slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
// 3、前后算子不为空
!(downStreamOperator == null || upStreamOperator == null);
// 4、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认HEAD!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
// 5、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是
ALWAYS!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
// 6、两个节点间物理分区逻辑是 ForwardPartitioner
(edge.getPartitioner() instanceof ForwardPartitioner);
// 7、两个算子间的shuffle方式不等于批处理模式
edge.getShuffleMode() != ShuffleMode.BATCH;
// 8、上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
// 9、用户没有禁用 chain
streamGraph.isChainingEnabled();

构造边

private void connect(Integer headOfChain, StreamEdge edge, NonChainedOutput output) {

        physicalEdgesInOrder.add(edge);

        Integer downStreamVertexID = edge.getTargetId();

        JobVertex headVertex = jobVertices.get(headOfChain);
        JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);

        StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

        downStreamConfig.setNumberOfNetworkInputs(downStreamConfig.getNumberOfNetworkInputs() + 1);

        StreamPartitioner<?> partitioner = output.getPartitioner();
        ResultPartitionType resultPartitionType = output.getPartitionType();

        if (resultPartitionType == ResultPartitionType.HYBRID_FULL
                || resultPartitionType == ResultPartitionType.HYBRID_SELECTIVE) {
            hasHybridResultPartition = true;
        }

        checkBufferTimeout(resultPartitionType, edge);

        JobEdge jobEdge;
        if (partitioner.isPointwise()) {
            jobEdge =
                    downStreamVertex.connectNewDataSetAsInput(
                            headVertex,
                            DistributionPattern.POINTWISE,
                            resultPartitionType,
                            opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),
                            partitioner.isBroadcast());
        } else {
            jobEdge =
                    downStreamVertex.connectNewDataSetAsInput(
                            headVertex,
                            DistributionPattern.ALL_TO_ALL,
                            resultPartitionType,
                            opIntermediateOutputs.get(edge.getSourceId()).get(edge).getDataSetId(),
                            partitioner.isBroadcast());
        }

        // set strategy name so that web interface can show it.
        jobEdge.setShipStrategyName(partitioner.toString());
        jobEdge.setForward(partitioner instanceof ForwardPartitioner);
        jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());
        jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());

        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "CONNECTED: {} - {} -> {}",
                    partitioner.getClass().getSimpleName(),
                    headOfChain,
                    downStreamVertexID);
        }
    }

总结

1、在StreamGraph构建完毕之后会开始构建JobGraph,然后再提交JobGraph。

2、StreamingJobGraphGenerator.createJobGraph()是构建JobGraph的核心实现,实现中首先会广度优先遍历StreamGraph,为其中的每个StreamNode生成一个Hash值,如果用户设置了operator的uid,那么就根据uid来生成Hash值,否则系统会自己为每个StreamNode生成一个Hash值。如果用户自己为operator提供了Hash值,也会拿来用。生成Hash值的作用主要应用在从checkpoint中的数据恢复

3、在生成Hash值之后,会调用setChaining()方法,创建operator chain、构建JobGraph顶点JobVertex、边JobEdge、中间结果集IntermediateDataSet的核心方法。

1)、创建StreamNode chain(operator chain)

从source开始,处理出边StreamEdge和target节点(edge的下游节点),递归的向下处理StreamEdge上和target StreamNode,直到找到那条过渡边,即不能再进行chain的那条边为止。那么这中间的StreamNode可以作为一个chain。这种递归向下的方式使得程序先chain完StreamGraph后面的节点,再处理头结点,类似于后序递归遍历。

2)、创建顶点JobVertex

顶点的创建在创建StreamNode chain的过程中,当已经完成了一个StreamNode chain的创建,在处理这个chain的头结点时会创建顶点JobVertex,顶点的JobVertexID根据头结点的Hash值而决定。同时JobVertex持有了chain上的所有operatorID。因为是后续遍历,所有JobVertex的创建过程是从后往前进行创建,即从sink端到source端

3)、创建边JobEdge和IntermediateDataSet

JobEdge的创建是在完成一个StreamNode chain,在处理头结点并创建完顶点JobVertex之后、根据头结点和过渡边进行connect操作时进行的,连接的是当前的JobVertex和下游的JobVertex,因为JobVertex的创建是由下至上的。

根据头结点和边从jobVertices中找到对应的JobGraph的上下游顶点JobVertex,获取过渡边的分区器,创建对应的中间结果集IntermediateDataSet和JobEdge。IntermediateDataSet由上游的顶点JobVertex创建,上游顶点JobVertex作为它的生产者producer,IntermediateDataSet作为上游顶点的输出。JobEdge中持有了中间结果集IntermediateDataSet和下游的顶点JobVertex的引用, JobEdge作为中间结果集IntermediateDataSet的消费者,JobEdge作为下游顶点JobVertex的input。整个过程就是
上游JobVertex——>IntermediateDataSet——>JobEdge——>下游JobVertex

4、接下来就是为顶点设置共享solt组、设置checkpoint配置等操作了,最后返回JobGraph,JobGraph的构建就完毕了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1493314.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Redis】redis的基本使用

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;Redis ⛺️稳中求进&#xff0c;晒太阳 Redis的概述 为什么要有redis? redis是数据库&#xff0c;mysql也是数据库&#xff0c;redis做缓存的意义就是为了减轻数据库压力 数据库为什么…

【归并排序】AcWing. 505 / NOIP2013提高组《火柴排队》(c++)

【题目描述】 涵涵有两盒火柴&#xff0c;每盒装有 n 根火柴&#xff0c;每根火柴都有一个高度。 现在将每盒中的火柴各自排成一列&#xff0c;同一列火柴的高度互不相同&#xff0c;两列火柴之间的距离定义为&#xff1a; 其中 ai 表示第一列火柴中第 i 个火柴的高度&a…

OWASP Top 10 网络安全10大漏洞——A02:A02:2021-加密机制失效

10大Web应用程序安全风险 2021年top10中有三个新类别、四个类别的命名和范围变化&#xff0c;以及一些合并。 A02&#xff1a;A02:2021-加密机制失效 上升一个位置&#xff0c;当前top2&#xff0c;以前称为敏感数据泄露&#xff0c;是一种状况而不是根本原因。更新后的类别…

c#简易学生管理系统

https://pan.baidu.com/s/1kCPvWg8P5hvlf26nGf2vxg?pwdya45 ya45

15.Django总结

文章目录 1.Django创建项目的命令2.MVC,MVT的理解3.Django中间件的使用4.WSGI,uWSGI服务器 和 uwsgi协议5.nginx和uWISG 服务器之间如何配合工作的6.django开发中数据库做过什么优化7.Python中三大框架各自的应用场景8.django如何提升性能(高并发)9. 什么是restful api谈谈你的…

day11_SpringCloud(Nacos注册中心,LoadBalancer,OpenFeign)

文章目录 Spring Cloud Alibaba1 系统架构演进1.1 单体架构1.2 微服务架构1.3 分布式和集群 2 Spring Cloud Alibaba概述2.1 Spring Cloud简介2.2 Spring Cloud Alibaba简介 3 微服务环境准备3.1 工程结构说明3.2 父工程搭建3.3 用户微服务搭建3.3.1 基础环境搭建3.3.2 基础代码…

(文末送书)直击前沿技术:《低代码平台开发实践:基于React》

目录 前言 一、React与低代码平台的结合优势 二、基于React的低代码平台开发挑战 三、基于React的低代码平台开发实践 四、书籍推荐 《低代码平台开发实践&#xff1a;基于React》 1、图书介绍 2、适用人群 3、 作者简介 4、写书原由 5、解决问题 6、书…

理解CPU指令执行:从理论到实践

理解CPU指令执行&#xff1a;从理论到实践 在探讨现代计算机的核心——中央处理单元&#xff08;CPU&#xff09;的工作原理时&#xff0c;我们经常遇到“时钟周期”和“指令执行”这两个概念。这些概念不仅对于理解CPU的性能至关重要&#xff0c;而且对于揭示计算机如何处理任…

javascript中的structuredClone()克隆方法

前言&#xff1a; structuredClone 是 JavaScript 的方法之一&#xff0c;用于深拷贝一个对象。它的语法是 structuredClone(obj)&#xff0c;其中 obj 是要拷贝的对象。structuredClone 方法将会创建一个与原始对象完全相同但是独立的副本。 案例&#xff1a; 当使用Web Work…

精选六款Linux发行版,助力你的渗透测试之旅

Parrot Security OS Parrot Security OS 对游戏来说相对较新。Frozenbox Network是该发行版开发的幕后推手。Parrot Security OS 的目标用户是渗透测试人员&#xff0c;他们需要具有在线匿名性和加密系统的云友好环境。 ​ 系统镜像下载页面&#xff1a;https://www.parrotse…

Codesys 位置式PID闭环控制系统(PID+PWM控制无刷电机)

有关Codesys位置式PID算法公式和源代码,请参考下面文章链接: 1、Codesys位置式PID https://rxxw-control.blog.csdn.net/article/details/131591254https://rxxw-control.blog.csdn.net/article/details/1315912542、博途PLC PWM输出控制 https://rxxw-control.blog.csdn.…

C++ STL自定义排序

更具体的看【速记】C STL自定义排序 - 知乎 (zhihu.com) sort sort第三个位置放的greater<int>和less<int>萌新可能会弄错&#xff0c;这两个单词不是更大和更小的意思&#xff0c;而是大于和小于&#xff0c;并且比较就是自定义排序中的前者和后者。 如果是less…

Small TopAppBar

Small 类型 TopAppBar AppBar 主要由2类&#xff0c;顶部 AppBar 和底部 AppBar。 顶部 AppBar&#xff1a;主要包含了标题&#xff0c;action菜单&#xff0c;导航菜单。底部 AppBar&#xff1a;典型地包含主要导航项。 顶部 AppBar 顶部 AppBar 包含了 4 中类型&#xff…

玩转小米:如何取消王者荣耀微信双开默认选择

文章目录 💢 问题 💢🏡 演示环境 🏡💯 解决方案 💯💢 问题 💢 当我们在手机上安装了多个微信(分身)后,在一些软件(例如王者)使用微信登入时会出现让们选择使用哪个微信进行登入,但是有时候我们不小心设置了默认某一个微信登入后,下次就无法出现选择页面…

物联网与智慧城市:融合创新,塑造未来城市生活新图景

一、引言 在科技飞速发展的今天&#xff0c;物联网与智慧城市的融合创新已成为推动城市发展的重要力量。物联网技术通过连接万物&#xff0c;实现信息的智能感知、传输和处理&#xff0c;为智慧城市的构建提供了无限可能。智慧城市则运用物联网等先进技术&#xff0c;实现城市…

信息系统项目管理师--项目整合管理

项⽬整合管理包括识别、定义、组合、统⼀和协调项⽬管理过程组的各个过程和项⽬管理活动。在项⽬管理中&#xff0c;整合管理兼具统⼀、合并、沟通和建⽴联系的性质&#xff0c;项⽬整合管理贯穿项⽬始终 项⽬整合管理的⽬标包括&#xff1a; ①资源分配&#xff1a; ②平衡竞…

【2024】vue-router和pinia的配置使用

目录 vue-routerpiniavue-routerpinia进阶用法---动态路由 有同学在项目初始化后没有下载vue-router和pinia&#xff0c;下面开始&#xff1a; vue-router npm install vue-router然后在src目录下创建文件夹router&#xff0c;以及下面的index.ts文件&#xff1a; 写进下面的…

每日一题-反转链表

&#x1f308;个人主页: 会编辑的果子君 &#x1f4ab;个人格言:“成为自己未来的主人~” 下面是代码的解题过程&#xff1a; /*** Definition for singly-linked list.* struct ListNode {* int val;* struct ListNode *next;* };*/ typedef struct ListNode SLis…

【PowerMockito:编写单元测试过程中原方法没有注入的属性在跑单元测试时出现空指针】

出错场景 下面这一步报空指针&#xff0c;但是因为没有注入&#xff0c;在测试类中无法使用Mock 解决 在执行方法前&#xff0c;加入以下代码 MemberModifier.field(ResourceServiceImpl.class,"zero").set(resourceService,"0");

基于springboot的中小企业设备管理系统设计与实现论文

中小企业设备管理系统 摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了中小企业设备管理系统的开发全过程。通过分析中小企业设备管理系统管理的不足&#xff0c;创建了一个计算机管理中小企业设备管理系统的方…