Flink Job 执行流程

news2025/1/24 2:20:23

Flink On Yarn 模式

在这里插入图片描述

基于Yarn层面的架构类似 Spark on Yarn模式,都是由Client提交AppRM上面去运行,然后 RM分配第一个container去运行AM,然后由AM去负责资源的监督和管理。需要说明的是,FlinkYarn模式更加类似Spark on Yarncluster模式,在cluster模式中,dirver将作为AM中的一个线程去运行。Flink on Yarn模式也是会将JobManager启动在container里面,去做个driver类似的任务调度和分配,Yarn AMFlink JobManager在同一个Container,这样AM可以知道Flink JobManager的地址,从而AM可以申请Container去启动Flink TaskManager。待Flink成功运行在Yarn集群上,Flink Yarn Client就可以提交Flink JobFlink JobManager,并进行后续的映射、调度和计算处理。

Fink on Yarn 的缺陷

【1】资源分配是静态的,一个作业需要在启动时获取所需的资源并且在它的生命周期里一直持有这些资源。这导致了作业不能随负载变化而动态调整,在负载下降时无法归还空闲的资源,在负载上升时也无法动态扩展。
【2】On-Yarn模式下,所有的container都是固定大小的,导致无法根据作业需求来调整container的结构。譬如CPU密集的作业或需要更多的核,但不需要太多内存,固定结构的container会导致内存被浪费。
【3】与容器管理基础设施的交互比较笨拙,需要两个步骤来启动Flink作业:1.启动Flink守护进程;2.提交作业。如果作业被容器化并且将作业部署作为容器部署的一部分,那么将不再需要步骤2。
【4】On-Yarn模式下,作业管理页面会在作业完成后消失不可访问。
【5】Flink推荐 per job clusters 的部署方式,但是又支持可以在一个集群上运行多个作业的session模式,令人疑惑。

Flink版本1.5中引入了DispatcherDispatcher是在新设计里引入的一个新概念。Dispatcher会从Client端接受作业提交请求并代表它在集群管理器上启动作业。引入Dispatcher的原因主要有两点:
【1】一些集群管理器需要一个中心化的作业生成和监控实例;
【2】能够实现Standalone模式下JobManager的角色,且等待作业提交。在一些案例中,Dispatcher是可选的Yarn或者不兼容的kubernetes

资源调度模型重构下的 Flink On Yarn 模式

[点击并拖拽以移动] ​

客户端提交JobGraph以及依赖jar包到YarnResourceManager,接着Yarn ResourceManager分配第一个container以此来启动AppMasterApplication Master中会启动一个FlinkResourceManager以及JobManagerJobManager会根据JobGraph生成的ExecutionGraph以及物理执行计划向FlinkResourceManager申请slotFlinkResoourceManager会管理这些slot以及请求,如果没有可用slot就向YarnResourceManager申请containercontainer启动以后会注册到FlinkResourceManager,最后JobManager会将subTask deploy到对应containerslot中去。
[点击并拖拽以移动] ​

在有Dispatcher的模式下:会增加一个过程,就是Client会直接通过HTTP Server的方式,然后用Dispatcher将这个任务提交到Yarn ResourceManager中。

新框架具有四大优势,详情如下:
【1】client直接在Yarn上启动作业,而不需要先启动一个集群然后再提交作业到集群。因此client再提交作业后可以马上返回。
【2】所有的用户依赖库和配置文件都被直接放在应用的classpath,而不是用动态的用户代码classloader去加载。
【3】container在需要时才请求,不再使用时会被释放。
【4】“需要时申请”的container分配方式允许不同算子使用不同profile (CPU和内存结构)的container

新的资源调度框架下 single cluster job on Yarn 流程介绍

[点击并拖拽以移动] ​

single cluster job on Yarn模式涉及三个实例对象:
【1】clifrontend Invoke App code;生成StreamGraph,然后转化为JobGraph
【2】YarnJobClusterEntrypoint(Master) 依次启动YarnResourceManagerMinDispatcherJobManagerRunner三者都服从分布式协同一致的策略;JobManagerRunnerJobGraph转化为ExecutionGraph,然后转化为物理执行任务Execution,然后进行deploydeploy过程会向 YarnResourceManager请求slot,如果有直接deploy到对应的YarnTaskExecutiontorslot里面,没有则向YarnResourceManager申请,带container启动以后deploy
【3】YarnTaskExecutorRunner (slave) 负责接收subTask,并运行。

整个任务运行代码调用流程如下图

[点击并拖拽以移动] ​

subTask在执行时是怎么运行的?

调用StreamTaskinvoke方法,执行步骤如下:
【1】initializeState()operatorinitializeState()
【2】openAllOperators()operatoropen()方法;
【3】最后调用run方法来进行真正的任务处理;

我们来看下flatMap对应的OneInputStreamTaskrun方法具体是怎么处理的。

@Override
protected void run() throws Exception {
    // 在堆栈上缓存处理器引用,使代码更易于JIT
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

    while (running && inputProcessor.processInput()) {
        // 所有的工作都发生在“processInput”方法中
    }
}

最终是调用StreamInputProcessorprocessInput()做数据的处理,这里面包含用户的处理逻辑。

public boolean processInput() throws Exception {
    if (isFinished) {
        return false;
    }
    if (numRecordsIn == null) {
        try {
            numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
           numRecordsIn = new SimpleCounter();
       }
   }
   while (true) {
       if (currentRecordDeserializer != null) {
           DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
           if (result.isBufferConsumed()) {
               currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
               currentRecordDeserializer = null;
           }
           if (result.isFullRecord()) {
               StreamElement recordOrMark = deserializationDelegate.getInstance();
               //处理watermark
               if (recordOrMark.isWatermark()) {
                   // handle watermark
                   //watermark处理逻辑,这里可能引起timer的trigger
                   statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                   continue;
               } else if (recordOrMark.isStreamStatus()) {
                   // handle stream status
                   statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                   continue;
                   //处理latency watermark
               } else if (recordOrMark.isLatencyMarker()) {
                   // handle latency marker
                   synchronized (lock) {
                       streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                   }
                   continue;
               } else {
                   //用户的真正的代码逻辑
                   // now we can do the actual processing
                   StreamRecord<IN> record = recordOrMark.asRecord();
                   synchronized (lock) {
                       numRecordsIn.inc();
                       streamOperator.setKeyContextElement1(record);
                       //处理数据
                       streamOperator.processElement(record);
                   }
                   return true;
               }
           }
       }
            
       //这里会进行checkpoint barrier的判断和对齐,以及不同partition 里面checkpoint barrier不一致时候的,数据buffer,
       final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
       if (bufferOrEvent != null) {
           if (bufferOrEvent.isBuffer()) {
               currentChannel = bufferOrEvent.getChannelIndex();
               currentRecordDeserializer = recordDeserializers[currentChannel];
               currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
           }
           else {
               // Event received
               final AbstractEvent event = bufferOrEvent.getEvent();
               if (event.getClass() != EndOfPartitionEvent.class) {
                   throw new IOException("Unexpected event: " + event);
               }
           }
       }
       else {
           isFinished = true;
           if (!barrierHandler.isEmpty()) {
               throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
           }
           return false;
       }
   }
}

streamOperator.processElement(record)最终会调用用户的代码处理逻辑,假如operatorStreamFlatMap的话。

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    collector.setTimestamp(element);
    userFunction.flatMap(element.getValue(), collector);//用户代码
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1339048.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Hive安装笔记——备赛笔记——2024全国职业院校技能大赛“大数据应用开发”赛项——任务2:离线数据处理

将下发的ds_db01.sql数据库文件放置mysql中 12、编写Scala代码&#xff0c;使用Spark将MySQL的ds_db01库中表user_info的全量数据抽取到Hive的ods库中表user_info。字段名称、类型不变&#xff0c;同时添加静态分区&#xff0c;分区字段为etl_date&#xff0c;类型为String&am…

【电商项目实战】基于SpringBoot完成首页搭建

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《电商项目实战》。&#x1f3af;&#x1f3af; &am…

【PyQt学习篇 · ⑭】:QTableView的使用

文章目录 QTableView的使用示例 QTableView的使用 QTableView 是 PyQt 中用于显示表格数据的窗口部件&#xff0c;它提供了一个灵活的方式来显示和编辑数据。下面是一些关于 QTableView 的使用的具体信息&#xff1a; 创建 QTableView 对象&#xff1a; from PyQt5.QtWidgets …

计算机网络基础:OSI参考模型是什么?

一、概述 OSI (Open Systems Interconnection Model,开放式系统互联模型)&#xff0c;由ISO ( International Organization for Standardization&#xff0c;国际标准化组织 ) 收录在ISO 7489标准中并于1984年发布。 意义&#xff1a; 在OSI没有出来之前我们的网络有如下问题…

Linux Debian12使用podman安装upload-labs靶场环境

一、upload-labs简介 PHP语言编写&#xff0c;持续收集渗透测试和CTF中针对文件上传漏洞的靶场&#xff0c;总共21关&#xff0c;每一关都包含着不同的上传绕过方式。 二、安装podman环境 Linux Debian系统如果没有安装podman容器环境&#xff0c;可以参考这篇文章先安装pod…

如何在vscode当中预览html文件运行结果

如何在vscode当中预览html文件运行结果 下载拓展内容打开拓展界面下载拓展 运行html文件参考内容 上一篇文章当中讲了如何实现在网页上对html文件的预览,但是这样子其实在运行代码的过程当中效果比较差,那么还需要可以实时预览运行的结果 下载拓展内容 打开拓展界面 下载拓展 …

微信小程序发放红包封面及领取

微信小程序发放红包封面及领取 一、微信红包封面开放平台配置发放的红包封面二、小程序后管平台设置配置录入红包封面奖品信息三、微信小程序调用接口效果 一、微信红包封面开放平台配置发放的红包封面 微信红包封面开放平台 红包封面的发放方式有&#xff1a;领取二维码、领…

unity 保存和加载窗口布局

这么简单的事网上一堆废话文章 右上角&#xff0c;Layout点开后有保存和删除 要切换布局点红框里的已经保存的布局

Linux下MQTT环境的简单应用及搭建——之Mosquitto

文章目录 前言一、ubuntu搭建mqtt服务器 | 概要二、整体架构流程 | 技术实现细节1、下载源码2、安装Mosquitto3、解压并修改配置文件4、关于Mosquitto常见的一些操作指令5、启动mosquitto6、测试mosquitto测试1&#xff1a;Linux多终端交互测试测试2&#xff1a;Linux与Windows…

2023安洵杯-秦岭防御军wp

reverse 感觉有点点简单## import base64 def ba64_decode(str1_1):mapp "4KBbSzwWClkZ2gsr1qAQu0FtxOm6/iVcJHPY9GNp7EaRoDf8UvIjnL5MydTX3eh"data_1 [0] * 4flag_1 [0] * 3for i in range(32, 127):for y in range(32, 127):for k in range(32, 127):flag_1[0]…

MIT线性代数笔记-第30讲-奇异值分解

目录 30.奇异值分解打赏 30.奇异值分解 奇异值分解&#xff08;简称 S V D SVD SVD分解&#xff09;可以将一个比较复杂的矩阵用更小更简单的几个子矩阵相乘来表示&#xff0c;这些小矩阵描述的都是矩阵的重要的特性。奇异值分解在图形降噪、推荐系统中都有很重要的应用。 对…

最优化考试之牛顿法

最优化考试之牛顿法 一、牛顿法1.问题条件2.求解过程3.例子 PS 一、牛顿法 1.问题条件 目标函数 f ( x ) f(x) f(x)&#xff0c;求极小值初始点 x 0 x_0 x0​精度要求e&#xff08;没有提就是近似0&#xff09; 2.求解过程 求解一阶雅克比矩阵 ∇ f ( x ) ∇f(x) ∇f(x)和二…

分享11 种有用的 JavaScript 技巧

今天这篇文章&#xff0c;我想与你分享 11个有用的JavaScript实用小技巧&#xff0c;它们将极大地提高你的工作效率。 1.生成随机颜色的两种方式 1&#xff09;.生成RandomHexColor const generateRandomHexColor () > {return #${Math.floor(Math.random() * 0xffffff)…

Java多线程的概念以及三种实现方式(Thread类,Callable接口,Runnable接口)

目录 1.线程2.多线程的应用场景3.并发和并行4.多线程的实现方式1.继承Thread类的方式进行实现2.实现**Runnable接口**的方式进行实现3.利用Callable接口和Future接口方式实现 4.多线程几种实现方式之间的对比 1.线程 线程是操作系统能够进行运算调度的最小单位。它被包含在进程…

Jmeter之JSON断言

需求 我们发送一个请求&#xff0c;结果返回json数据&#xff0c;我们需要根据json数据中code的值来判断此次请求是否成功。 接口案例&#xff1a; PostMapping(value "/login") public ResponseMessage<String> login(RequestBody SeckillUserDTO seckill…

LeetCode-移除元素(27) 合并两个有序数组(88)

1.移除元素&#xff08;27&#xff09; 题目描述&#xff1a; 给你一个数组 nums 和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val 的元素&#xff0c;并返回移除后数组的新长度。 不要使用额外的数组空间&#xff0c;你必须仅使用 O(1) 额外空间并 原地 修改输入…

Session的使用详解(创建,获取和销毁)

文章目录 Session的使用详解&#xff08;创建&#xff0c;获取和销毁&#xff09;1、为什么使用session,与cookie的区别2、session是什么3、session的常用方法4、session的构造和获取代码演示SetSessionServlet.javaGetSessionServlet.javaweb.xml运行结果如下: 5、销毁session…

跨境电商营销工具:功能详解与实战应用!

在当今全球化的电商市场中&#xff0c;跨境电商营销工具已经成为企业不可或缺的营销利器。 这些工具为企业提供了丰富的功能&#xff0c;帮助企业更好地拓展海外市场、提升品牌影响力&#xff0c;本文将为您详细解读跨境电商营销工具的一般功能&#xff0c;以及如何在实际操作…

【JavaScript】垃圾回收与内存泄漏

✨ 专栏介绍 在现代Web开发中&#xff0c;JavaScript已经成为了不可或缺的一部分。它不仅可以为网页增加交互性和动态性&#xff0c;还可以在后端开发中使用Node.js构建高效的服务器端应用程序。作为一种灵活且易学的脚本语言&#xff0c;JavaScript具有广泛的应用场景&#x…

HPM6750开发笔记《开发环境的搭建》

目录 一&#xff0c;下载完整的HPM—SDK 二&#xff0c;安装硬件驱动 二&#xff0c;软件激活 三&#xff0c;创建工程 1.用文档中给的方法创建工程&#xff1a; 2.用sdk_env_v1.3.0中提供的工具创建工程&#xff1a; 一&#xff0c;下载完整的HPM—SDK 下载网址&#x…