Flink 物理执行图

news2024/12/30 2:45:54

文章目录

  • 物理执行图
  • 一、Task
  • 二、ResultPartition
  • 三、ResultSubpartition
  • 四、InputGate
  • 五、InputChannel


物理执行图

JobManager根据ExecutionGraph对作业进行调度,并在各个TaskManager上部署任务。这些任务在TaskManager上的实际执行过程就形成了物理执行图。物理执行图并不是一个具体的数据结构,而是描述了流处理任务在集群中的实际执行情况。
它包含的主要抽象概念有:Task、ResultPartition、ResultSubpartition、InputGate、InputChannel。
在这里插入图片描述


一、Task

Execution被调度后在分配的 TaskManager 中启动对应的 Task。Task 包裹了具有用户执行逻辑的 operator。
一个作业可以被划分为多个Task,并在不同的Task上并行执行。每个Task由一个或多个子任务(Subtask)组成,每个子任务在一个TaskSlot中运行。Task主要负责接收输入数据,执行数据转换和计算,并将结果发送到下游的算子中。

在Flink中,Task的执行由TaskExecutor来负责。Task.doRun()方法是引导Task初始化并执行其相关代码的核心方法。它会构造并实例化Task的可执行对象,即AbstractInvokable。AbstractInvokable.invoke()方法的执行过程中,如果正常执行完毕,会输出ResultPartition缓冲区数据,关闭缓冲区,并标记Task为Finished;如果因为取消操作导致退出,会标记Task为CANCELED,并关闭用户代码;如果执行过程中抛出异常,会标记Task为FAILED,关闭用户代码,并记录异常;如果执行过程中JVM抛出错误,会强制终止虚拟机,并退出当前进程。

二、ResultPartition

ResultPartition代表由一个Task生成的数据,并与ExecutionGraph中的IntermediateResultPartition一一对应。它实际上是一个缓存池,里面保存的是经过序列化之后的节点计算结果。每个ResultPartition包含多个ResultSubPartition,其数目由下游消费Task的数量和DistributionPattern来决定。ResultSubPartition是ResultPartition的一个子分区,真正持有缓冲区Buffer。

写入ResultPartition的操作由ResultPartition的add方法实现。此外,在shuffle阶段,ResultPartition的选择由ChannelSelector负责,它决定了序列化后的record应该写入哪个ResultSubPartition。

ResultPartition在Flink的物理执行图中扮演着重要角色,它确保了数据在Task之间的正确流动和传输,是构建高效、可靠数据流处理应用的关键组件之一。

三、ResultSubpartition

ResultSubpartition是ResultPartition的一个子分区,用于存储和传输数据。每个ResultPartition包含多个ResultSubpartition,其数量由下游消费Task的数量和DistributionPattern决定。这种设计有助于并行处理数据,提高处理效率。

ResultSubpartition负责接收上游Task生成的数据,并将其缓存起来,以便下游Task消费。同时,ResultSubpartition还负责数据的序列化、反序列化和传输,确保数据在不同Task之间的正确流动。

根据数据类型和传输需求,Flink提供了不同类型的ResultSubpartition实现。例如,PipelinedSubpartition是基于内存的管道模式的结果子分区,适用于低延迟的数据传输场景;BoundedBlockingSubpartition中是以阻塞的方式传输的,即数据先被写入,然后再被消费。这种机制确保了数据的有序性和一致性,避免了数据在传输过程中的丢失或乱序问题。

在Flink的物理执行图中,ResultSubpartition与InputGate和InputChannel紧密相关。每个InputGate消费一个或多个ResultPartition,而每个InputGate又包含一个或多个InputChannel。InputChannel与ResultSubpartition一对一地相连,即一个InputChannel接收一个ResultSubpartition的输出。这种设计使得数据能够按照预定的路径在Task之间流动,实现分布式数据流处理。

总的来说,ResultSubpartition是Flink数据流处理中的关键组件,它负责数据的存储、传输和消费,确保数据在不同Task之间的正确流动和高效处理。

创建ResultPartition、ResultSubpartition的相关源码

    public ResultPartition create(
            String taskNameWithSubtaskAndId,
            int partitionIndex,
            ResultPartitionID id,
            ResultPartitionType type,
            int numberOfSubpartitions,
            int maxParallelism,
            SupplierWithException<BufferPool, IOException> bufferPoolFactory) {
        BufferCompressor bufferCompressor = null;
        if (type.supportCompression() && batchShuffleCompressionEnabled) {
            bufferCompressor = new BufferCompressor(networkBufferSize, compressionCodec);
        }

        ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions];

        final ResultPartition partition;
        if (type == ResultPartitionType.PIPELINED
                || type == ResultPartitionType.PIPELINED_BOUNDED
                || type == ResultPartitionType.PIPELINED_APPROXIMATE) {
            final PipelinedResultPartition pipelinedPartition =
                    new PipelinedResultPartition(
                            taskNameWithSubtaskAndId,
                            partitionIndex,
                            id,
                            type,
                            subpartitions,
                            maxParallelism,
                            partitionManager,
                            bufferCompressor,
                            bufferPoolFactory);

            for (int i = 0; i < subpartitions.length; i++) {
                if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {
                    subpartitions[i] =
                            new PipelinedApproximateSubpartition(
                                    i, configuredNetworkBuffersPerChannel, pipelinedPartition);
                } else {
                    subpartitions[i] =
                            new PipelinedSubpartition(
                                    i, configuredNetworkBuffersPerChannel, pipelinedPartition);
                }
            }

            partition = pipelinedPartition;
        } else if (type == ResultPartitionType.BLOCKING
                || type == ResultPartitionType.BLOCKING_PERSISTENT) {
            if (numberOfSubpartitions >= sortShuffleMinParallelism) {
                partition =
                        new SortMergeResultPartition(
                                taskNameWithSubtaskAndId,
                                partitionIndex,
                                id,
                                type,
                                subpartitions.length,
                                maxParallelism,
                                batchShuffleReadBufferPool,
                                batchShuffleReadIOExecutor,
                                partitionManager,
                                channelManager.createChannel().getPath(),
                                bufferCompressor,
                                bufferPoolFactory);
            } else {
                final BoundedBlockingResultPartition blockingPartition =
                        new BoundedBlockingResultPartition(
                                taskNameWithSubtaskAndId,
                                partitionIndex,
                                id,
                                type,
                                subpartitions,
                                maxParallelism,
                                partitionManager,
                                bufferCompressor,
                                bufferPoolFactory);

                initializeBoundedBlockingPartitions(
                        subpartitions,
                        blockingPartition,
                        blockingSubpartitionType,
                        networkBufferSize,
                        channelManager,
                        sslEnabled);

                partition = blockingPartition;
            }
        } else if (type == ResultPartitionType.HYBRID_FULL
                || type == ResultPartitionType.HYBRID_SELECTIVE) {
            partition =
                    new HsResultPartition(
                            taskNameWithSubtaskAndId,
                            partitionIndex,
                            id,
                            type,
                            subpartitions.length,
                            maxParallelism,
                            batchShuffleReadBufferPool,
                            batchShuffleReadIOExecutor,
                            partitionManager,
                            channelManager.createChannel().getPath(),
                            networkBufferSize,
                            HybridShuffleConfiguration.builder(
                                            numberOfSubpartitions,
                                            batchShuffleReadBufferPool.getNumBuffersPerRequest())
                                    .setSpillingStrategyType(
                                            type == ResultPartitionType.HYBRID_FULL
                                                    ? HybridShuffleConfiguration
                                                            .SpillingStrategyType.FULL
                                                    : HybridShuffleConfiguration
                                                            .SpillingStrategyType.SELECTIVE)
                                    .build(),
                            bufferCompressor,
                            bufferPoolFactory);
        } 
        return partition;
    }

四、InputGate

InputGate是对数据输入的封装,与JobGraph中的JobEdge一一对应。每个InputGate消费一个或多个ResultPartition,这些ResultPartition代表上游Task生成的数据。InputGate的主要作用是管理和控制数据的流入,确保数据能够按照正确的顺序和方式被Task所消费。

InputGate由多个InputChannel构成,每个InputChannel与ExecutionGraph中的ExecutionEdge以及ResultSubpartition一一对应。这意味着每个InputChannel负责接收一个ResultSubpartition的输出,从而实现了数据的精确传递和接收。

在Flink的物理执行过程中,InputGate和InputChannel起着至关重要的作用。它们不仅负责数据的接收和传递,还参与了数据的序列化和反序列化过程,确保数据在不同Task之间的正确流动。此外,InputGate和InputChannel还提供了对数据传输的控制和优化功能,可以根据实际需求调整数据传输的策略和方式。

总的来说,Flink的InputGate通过对数据输入的封装和管理,实现了数据的精确传递和高效处理。

五、InputChannel

InputChannel是数据输入通道的关键组件,它位于InputGate之下,与ExecutionGraph中的ExecutionEdge以及ResultSubpartition一对一地相连。每个InputChannel负责接收一个ResultSubpartition的输出,确保数据从上游Task正确地流向下游Task。

根据消费的ResultPartition的位置,InputChannel有两种不同的实现:LocalInputChannel和RemoteInputChannel。LocalInputChannel用于处理本地数据交换,即数据在同一TaskManager的不同Task之间传输;而RemoteInputChannel则负责远程数据交换,即数据在不同TaskManager的Task之间传输。这种设计使得Flink能够灵活地处理分布式环境中的数据流动。

此外,还有一个名为UnknownInputChannel的实现类,它作为尚未确定ResultPartition位置的情况下的占位符。在实际执行过程中,UnknownInputChannel最终会被更新为LocalInputChannel或RemoteInputChannel,以反映实际的数据传输路径。

InputChannel在Flink的数据流处理中扮演着重要角色。它不仅是数据传输的通道,还参与数据的序列化和反序列化过程,确保数据在传输过程中的完整性和一致性。同时,InputChannel与InputGate和ResultSubpartition的紧密协作,使得Flink能够高效地处理大规模、高吞吐量的数据流。

总结来说,Flink InputChannel负责数据的接收、传输和序列化,确保数据在不同Task之间的正确流动。通过LocalInputChannel和RemoteInputChannel的不同实现,Flink能够处理各种分布式场景下的数据交换需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1501803.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

快速上手:使用Hexo搭建并自定义个人博客

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

算法---双指针练习-6(查找总价格为目标值的两个商品)

查找总价格为目标值的两个商品 1. 题目解析2. 讲解算法原理3. 编写代码 1. 题目解析 题目地址&#xff1a;点这里 2. 讲解算法原理 算法的基本思想是首先初始化两个指针begin和end&#xff0c;分别指向数组的起始位置和末尾位置。 接下来&#xff0c;算法使用一个循环来移动e…

瑞芯微 | I2S-音频基础分享

1. 音频常用术语 名称含义ADC&#xff08;Analog to Digit Conversion&#xff09;模拟信号转换为数字信号AEC&#xff08;Acoustic Echo Cancellor&#xff09;回声消除AGC&#xff08;Automatic Gain Control&#xff09;自动增益补偿&#xff0c;调整MIC收音量ALSA&#xf…

Stable Diffusion 3报告

报告链接&#xff1a;https://stability.ai/news/stable-diffusion-3-research-paper 文章目录 要点表现架构细节通过重新加权改善整流流量Scaling Rectified Flow Transformer Models灵活的文本编码器RF相关论文 要点 发布研究论文&#xff0c;深入探讨Stable Diffuison 3的…

HarmonyOS 数据持久化 关系型数据库之 初始化操作

上文 HarmonyOS 数据持久化之首选项 preferences 我们有说用户首选项 但它只能处理一些比较简单的数据类型结构 的持久化处理 如果是一些批量较大 结构较为复杂的数据结构 那么 首选项就无法满足了 我们就要选择 关系型数据库 通过 SQLite 组件实现的一种本地数据库&#xff0…

IPSEC VPPN实验

实验背景&#xff1a;FW1和FW2是双机热备的状态。 实验要求&#xff1a;在FW和FW3之间建立一条IPSEC通道&#xff0c;保证10.0.2.0/24网段可以正常访问到192.168.1.0/24 IPSEC VPPN实验配置&#xff08;由于是双机热备状态&#xff0c;所以FW1和FW2只需要配置FW1主设备即可&…

企业专属采购商城搭建,对接电商平台数量越多越好吗?

近年来在国家政策驱动和国央企的引领示范下&#xff0c;企业采购逐渐从线下向电商化迈进&#xff0c;采购电商平台的应用让越来越多的传统企业、中小企业开始意识到数字化商城采购的价值。搭建企业自有专属采购商城&#xff0c;内接企业各类信息管理系统&#xff0c;外联电商采…

spring-cloud-openfeign 3.0.0(对应spring boot 2.4.x之前版本)之前版本feign整合ribbon请求流程

在之前写的文章配置基础上 https://blog.csdn.net/zlpzlpzyd/article/details/136060312 下图为自己整理的

阿里云k8s内OSS报错UnKnownHost。

这个问题就是链接不上oss属于网络问题&#xff1a; 1.排查服务器 在服务器&#xff08;ecs&#xff09;上直接ping oss地址看是否能够通。 不通就要修改dns和hosts&#xff08;这个不说&#xff0c;自己网上查&#xff09; 2.排查容器 进去ping一下你的容器是否能访问到oss…

07.axios封装实例

一.简易axios封装-获取省份列表 1. 需求&#xff1a;基于 Promise 和 XHR 封装 myAxios 函数&#xff0c;获取省份列表展示到页面 2. 核心语法&#xff1a; function myAxios(config) {return new Promise((resolve, reject) > {// XHR 请求// 调用成功/失败的处理程序}) …

生活的色彩--爱摸鱼的美工(17)

题记 生活不如意事十之八九&#xff0c; 恶人成佛只需放下屠刀&#xff0c;善人想要成佛却要经理九九八十一难。而且历经磨难成佛的几率也很小&#xff0c;因为名额有限。 天地不仁以万物为刍狗&#xff01; 小美工记录生活&#xff0c;记录绘画演变过程的一天。 厨房 食…

BUUCTF---[MRCTF2020]你传你呢1

1.题目描述 2.打开题目链接 3.上传shell.jpg文件&#xff0c;显示连接成功&#xff0c;但是用蚁剑连接却连接不上。shell文件内容为 <script languagephp>eval($_REQUEST[cmd]);</script>4.用bp抓包&#xff0c;修改属性 5.需要上传一个.htaccess的文件来把jpg后缀…

鸿蒙Harmony应用开发—ArkTS声明式开发(基础手势:Marquee)

跑马灯组件&#xff0c;用于滚动展示一段单行文本。仅当文本内容宽度超过跑马灯组件宽度时滚动&#xff0c;不超过时不滚动。 说明&#xff1a; 该组件从API Version 8开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 子组件 无 接口 Ma…

CleanMyMac X4.15.0专为macOS设计的清理和优化工具

CleanMyMac X 是一款专为 macOS 设计的清理和优化工具。其基本功能和特点主要包括&#xff1a; 系统清理&#xff1a;CleanMyMac X 可以扫描并清除 macOS 系统中的垃圾文件&#xff0c;如缓存、日志、无用的语言文件等&#xff0c;从而释放硬盘空间并提高系统性能。应用程序管…

Webpack常用配置及作用

一 、 二、 三、 四、 五、 六、 七、 八、

AVL树讲解

AVL树 1. 概念2. AVL节点的定义3. AVL树插入3.1 旋转 4.AVL树的验证 1. 概念 AVL树是一种自平衡二叉搜索树。它的每个节点的左子树和右子树的高度差&#xff08;平衡因子&#xff0c;我们这里按右子树高度减左子树高度&#xff09;的绝对值不超过1。AVL的左子树和右子树都是AV…

Cloud-Eureka服务治理-Ribbon负载均衡

构建Cloud父工程 父工程只做依赖版本管理 不引入依赖 pom.xml <packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.9.RELEA…

AIGC启示录:深度解析AIGC技术的现代性与系统性的奇幻旅程

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

ES入门二:文档的基本操作

索引管理 创建索引 删除索引 文档管理 创建文档 如果有更新需求&#xff0c;使用第一种如果有唯一性校验&#xff0c;使用第二种如果需要系统给你创建文档Id&#xff0c;使用第三种 &#xff08;这个性能更好&#xff09; 相比第一种&#xff0c;第三种的写入效率更高&#xf…

公网IP与私有IP及远程互联

1.公网有私有IP及NAT 公网IP是全球唯一的IP&#xff0c;通过公网IP&#xff0c;接入互联网的设备是可以访问你的设备。但是IPV4资源有限&#xff0c;一般ISP(Internet Service Provider)并不会为用户提供公网IP。所以家里的计算机在公司是没法直接使用windows远程桌面直接访问…