【基础篇】七、Flink核心概念

news2025/2/28 21:33:22

文章目录

  • 1、并行度
  • 2、并行度的设置
  • 3、算子链
  • 4、禁用算子链
  • 5、任务槽
  • 6、任务槽和并行度的关系

1、并行度

要处理的数据量很多时,可以把一个算子的操作(比如前面demo里的flatMap、sum),"复制"多份到多个节点,数据来了以后可以到任意一个节点执行。即将一个算子任务拆分成多个并行的子任务,再分发到不同的节点上执行,实现真正的并行计算。(好绕口,就是把一个活儿让好几个Task节点共同去做)

在Flink执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行

在这里插入图片描述

某一个算子的子任务的个数被称之为其并行度(parallelism)。 一条流水线上,几个人在同时干着打螺丝,几个人在同时处理着焊电路板。同一个程序,不同的算子,可以有不同的并行度。一个流程序的并行度,可以认为就是其所有算子中最大的并行度。如上图,source、map、window、sink四个算子,sink为1,其余为2,则这个流处理程序的并行度为2。

2、并行度的设置

方式一:代码中设置

算子后跟着调用setParallelism()方法为某一个算子设置并行度

stream.map(word -> Tuple2.of(word, 1L))
	.setParallelism(2);   //map算子并行度为2

执行环境对象后面调setParallelism()方法设置全局并行度,对所有算子生效

env.setParallelism(2);

一般不设全局,会导致无法动态扩容。

方式二:提交应用时指令中设置

-p参数来指定当前应用程序执行的并行度,类似上面的全局设置

bin/flink run –p 2 –c com.plat.SocketStreamWordCount  ./FlinkDemo-1.0-SNAPSHOT.jar

这种和Web控制台设置一个意思:

在这里插入图片描述

方式三:配置文件中设置

在集群的配置文件flink-conf.yaml中直接更改默认并行度:

parallelism.default: 2

这个设置对于整个集群上提交的所有作业有效,初始值为1。当代码中没设置、提交时没指定,就用这个配置文件的。在开发环境中,没有配置文件,默认并行度就是当前机器的CPU核心数

在这里插入图片描述

最后,本地调试想看控制台界面,可创建本地环境执行对象,用于本地调试:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createlocalEnvironmentWithwebuI(new Configuration());

访问localhosr:8081,socket是特殊的,只能是1,改不了,其余算子均为4。

在这里插入图片描述
最后,这几种方式的优先级为:代码中为某算子单独设定 > 代码中执行环境对象全局设置 > 提交时指定 > 配置文件

3、算子链

一个数据流,数据在各种算子之间传输的形式可能是一对一(one-to-one)的直通(forwarding),也可能是打乱的重分区(redistributing)。

在这里插入图片描述

一对一(One-to-one,forwarding)

如上图,source算子读完数据后,可以直接发给map算子接着处理。map 算子的子任务,看到的元素个数和顺序跟source 算子的子任务产生的完全一样,即一对一,一个算子的task和一个算子的task数据一样。特点是:

  • 数据不需要重新分区
  • 数据不需要调整顺序

重分区(Redistributing)

和一对一的直流相反,此时数据的分区会发生改变,如图中,map完数据后,直接keyBy/window(注意keyBy自身不是算子),按key分组了。也就是每一个算子的子任务task,会根据某些规则,把数据发送到不同的下游task,从而引起了数据重分区。

合并算子链

在Flink中,并行度相同一对一(one to one)算子操作,可以直接连接在一起形成一个大的任务(task),每个task又会被一个线程执行,即算子链。合并的条件:

  • 两算子并行度相等(子任务数量一样)
  • 两算子为one to one的直流关系

在这里插入图片描述

上图中Source和map之间满足了算子链的要求,所以可以直接合并在一起,形成了一个任务;因为并行度为2,所以合并后的任务也有两个并行子任务。这样,这个数据流图所表示的作业最终会有5个任务,由5个线程并行执行合并算子链的机制,可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。

4、禁用算子链

Flink默认会按照算子链的原则进行链接合并,但有的场景不适合合并,比如:

  • 两个算子串在一起,它们的子任务task搭配形成n组(n为并行度),每组共用一个线程,但如果两个算子本身计算任务都很重,那就不适合串一起,就像两个脾气都差的人合租,此时应该断开算子链
  • 当出现错误,需要定位问题是哪个算子时,就要禁用算子链

全局禁用算子链:

//env为执行环境对象
env.disableOperatorChaining();

disableChaining方法可只给某个算子设置禁用算子链,那它和它前后的算子就都不能再组成算子链(控制台上UI会显示Forward,表明本来是一对一的算子链关系)

.map(word -> Tuple2.of(word, 1L)).disableChaining();

在这里插入图片描述

startNewChain方法,从当前算子开始新链,即只和前面的算子断开,和后面的算子能串一起的话还是会串

// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()

5、任务槽

Flink中每一个TaskManager都是一个JVM进程,它可以启动多个独立的线程,来并行执行子任务。但每个TaskManager总的计算资源有限,并行任务越多,每个线程能分到的可用资源就越少,为了限制TaskManager能并行处理的最大任务数,提出任务槽(task slots)的概念,对TaskManager上对每个任务运行所占用的资源做出明确的划分。一锅饭,能盛6碗,谁来都夹一筷子,谁都吃不饱,因此,锅前放6个碗,也就是分为6碗饭,来一个人,就端走一碗,端没了别人就去其他锅,分到饭的六个人,也不用和别人抢,且能吃饱。这个碗就是任务槽。

在这里插入图片描述

比如一个TaskManager上有三个slot,那就把这个TaskManager的内存资源分为三份,一个插槽一份。如此,在插槽上执行一个子任务时,就相当于划定了一块内存给这个子任务专款专用,不需要和其他子任务去竞争内存资源。前面提到的合并成算子链后的5个子任务,两个TaskManager就可实现,如上图。

任务槽数量的设置

在flink安装目录的conf/flink-conf.yaml配置文件中,可以设置每个TaskManager的slot数量,默认是1个slot。

taskmanager.numberOfTaskSlots: 8

slot目前仅仅用来隔离内存,不会涉及CPU的隔离在具体应用时,可以将slot数量配置为机器的CPU核心数,尽量避免不同任务之间对CPU的竞争。这也是开发环境默认并行度设为机器CPU数量的原因。(这就像合租,内存就像卧室,厕所就像CPU,三个人,三间房,但一个厕所也够用,类比CPU时间片和线程切换)

子任务task对任务槽的共享

上面讲到,一人一碗饭,一个子任务一个插槽。而插槽的共享,就是放宽了政策,不同类型的算子,它们的并行子任务允许放到同一个插槽上并行执行(注意,依旧并行)。如下图,两个TaskManager,6个插槽,每个插槽上的子任务对应的算子种类都不一样。

在这里插入图片描述

如上图所示,只要属于同一个作业,那么对于不同任务节点(算子)的并行子任务,就可以放到同一个slot上执行。所以对于第一个任务节点source→map,它的6个并行子任务必须分到不同的slot上,而第二个任务节点keyBy/window/apply的并行子任务却可以和第一个任务节点共享slot。slot共享的好处在于:

  • 活儿大致平均分配到了所有的TaskManager
  • slot有好几种算子的子任务,组合起来就是一个完整的作业管道或者流。此时,即使某个TaskManager宕机,其他节点也不受影响,作业继续执行

如果不希望默认的slot共享,比如需要让某个算子的task独享一个slot,就可以设置slot共享组

.map(word -> Tuple2.of(word, 1L)).slotSharingGroup("taskTest");

只有属于同一个slot共享组的子任务,才会开启slot共享,这个组默认是default,不同slot共享组之间的任务是完全隔离的,必须分配到不同的slot上。

6、任务槽和并行度的关系

  • 任务槽slot是一个静态概念,表示最大的并发上限。假设一共有3个TaskManager,每一个TaskManager中的slot数量设置为3个,那么一共有9个task slot,表示集群最多能并行执行同一算子的9个子任务。
  • 并行度是一个动态概念,表示实际运行占用了几个。比如并行度为4,即这个算子有4个子任务task,需要放在4个插槽上。此时,并行度为4,slot为9。

Job运行时,必须插槽slot的数量必须大于等于并行度,否则任务运行失败:NoResourceAvailableException:could not acquire the minimun required resources 。注意Yarn等模式部署时,会动态申请TaskManager,申请的TM的数量 = job并行度 /每个TM的slot的数量,向上取整。

比如,某算子并行度为10,即它有10个task要放在不同的插槽上,此时插槽有9个,那就不能运行,而不是9个跑完再让第十个执行。再比如,一个Flink程序中定义了4个算子:

source→ flatmap→ reduce→ sink

前提: flink-conf.yaml中taskmanager.numberOfTaskSlots数量为3(建议为CPU核心数),假设TaskManager数量也为3,即插槽有3*3=9个

Case1:并行度parallelism.default=1

在这里插入图片描述

分析:4种算子,并行度为1 ⇒ 其中两个形成算子链算一个 ⇒ 三个子任务 ⇒ 同一作业的不同种类的算子的任务,共享任务槽 ⇒ 总共占用一个插槽,剩8个可用

Case2:全局并行度为2

在这里插入图片描述

分析:三种算子,并行度为2 ⇒ 其中两个形成算子链算一个 ⇒六个子任务 ⇒ 插槽共享 ⇒ 总共占用2个插槽,剩7个可用 ⇒ 计算机资源利用不充分,设置合适的并行度才能提高效率

Case3:全局并行度为9

分析: 并行度为9 ⇒ 一种算子有9个子任务 ⇒ 插槽共享 ⇒ 占九个

在这里插入图片描述

Case4:全局set为9,但sink算子set为1

在这里插入图片描述

分析: 并行度为9 ⇒ 一种算子有9个子任务 ⇒ 29 + 11 = 19个子任务 ⇒ 插槽共享

最后,可以看到,整个流处理程序的并行度,就是所有算子并行度的最大值,因为这代表了程序运行所需要的插槽slot的数量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1087129.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NovelAi本地部署版详细教程

这几天NovelAI模型泄露了。那就凑巧了,就以这个模型为例。完整的介绍一下stable-diffusion-webui本地安装方法几乎是从零开始说起(除了不教操作系统安装)。WebUI就是stable-diffusion的可视化版本! 本地安装的好处是: …

IDEA实现远程Debug调试

一、 前提   需要准备JDK1.8环境,安装IDEA(版本不限) 二、 IDEA中如何实现远程Debug模式 (1)、创建demo项目 1.File一>New一>project… 2.Maven Archetype一>填写Name一>选择jdk1.8一>选择Web一>创建 (2)、配置Idea 找到Remote Jvm Debug java…

A Better Finder Rename 12 for Mac——让重命名变得更简单

A Better Finder Rename 12 for Mac是一款专业的批量重命名工具,为您提供了快速、简单、可靠的重命名解决方案。无论您是否需要批量重命名文件、图像、音频或视频文件等,A Better Finder Rename 12 for Mac可以帮助您快速完成任务,节省宝贵的…

计算机网络-计算机网络体系结构-数据链路层

目录 *一、组帧 1.1字符计数法 1.2字符填充法 1.3零比特填充法 1.4违规编码 *二、差错控制 2.1检错编码 2.2.1奇偶校验码 2.2.2 CRC循环冗余码 2.2纠错编码-海明码 *三、流量控制和可靠传输机制 流量控制 停止-等待协议 ​编辑 后退n帧协议的滑动窗口(GBN) 选择…

效率出图!9款最好用的矢量图软件推荐

设计可以让我们将想法和想象力变成可视化的现实,数字时代的设计,对于细节的要求则更高,矢量绘图必不可少。和我们常见的png、jpeg等格式的图片不同,矢量图格式一般有SVG、EPS、AI等,它是通过数学方程来绘制的&#xff…

解决MySQL错误-this is incompatible with sql_mode=only_full_group_by

报错 Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggregated column ‘数据库名.表名.字段名’ which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_modeonly_full_group_by 原因 MySQL错误-t…

Spring Boot中的 @Aspect 注解是什么,如何使用

Spring Boot中的 Aspect 注解是什么,如何使用 在Spring Boot应用程序中,面向切面编程(AOP)是一种重要的编程范例,它可以用来处理横切关注点,例如日志记录、事务管理、性能监测等。Aspect 注解是Spring Fra…

uni-app开发微信小程序的报错[渲染层错误]排查及解决

一、报错信息 [渲染层错误] Framework nner error (expect FLOW INITIALCREATION end but get FLOW CREATE-NODE) 二、原因分析及解决方案 第一种 原因:基础库版本的原因导致的。 解决: 1.修改调试基础库版本 2.详情—>本地设置—>调试基础库…

2023转行要趁早!盘点网络安全的岗位汇总

前段时间,知名机构麦可思研究院发布了《2022年中国本科生就业报告》,其中详细列出近五年的本科绿牌专业,信息安全位列第一。 对于网络安全的发展与就业前景,知了姐说过很多,作为当下应届生收入较高的专业之一&#xf…

【NeRF】1、NeRF 是什么

NeRF 最早是在 ECCV2020 中提出的方法,还获得了 ECCV2020 Oral 论文:Representing Scenes as Neural Radiance Fields for View Synthesis代码:https://github.com/bmild/nerf官网:https://www.matthewtancik.com/nerf Neural R…

Spring Boot中的异步编程:解决的问题与应用场景

Spring Boot中的异步编程:解决的问题与应用场景 在现代Web应用程序中,高并发和性能是至关重要的。为了处理大量的请求和任务,异步编程成为了不可或缺的一部分。Spring Boot提供了强大的异步编程支持,可以显著提高应用程序的吞吐量…

Avalonia使一个弹窗弹到指定位置

1.项目下载地址&#xff1a;https://gitee.com/confusedkitten/avalonia-demo 2.UI库Semi.Avalonia&#xff0c;项目地址 https://github.com/irihitech/Semi.Avalonia 3.样式预览 4.PositionControl.axaml <UserControl xmlns"https://github.com/avaloniaui&quo…

TimesNet:时间序列预测的最新模型

2020年发布的N-BEATS、2022年发布的N-HiTS和2023年3月发布的PatchTST开始。N-BEATS和N-HiTS依赖于多层感知器架构&#xff0c;而PatchTST利用了Transformer架构。 2023年4月发表了一个新的模型&#xff0c;它在时间序列分析的多个任务中实现了最先进的结果&#xff0c;如预测、…

pdf怎么压缩?pdf文件缩小的方法在这里

PDF文件由于其跨平台、可打印性强等特点&#xff0c;成为了我们日常工作中经常使用的一种文件格式。然而&#xff0c;这种格式的文件有时候会因为过于庞大而给我们的存储和传输带来困扰&#xff0c;其实&#xff0c;这种情况只需要通过一些工具对PDF文件进行压缩&#xff0c;即…

网站为什么需要https证书以及如何申请

随着互联网的快速发展&#xff0c;网站的安全性问题越来越受到人们的关注。因此&#xff0c;越来越多的网站开始使用https证书&#xff0c;以保护用户的数据安全和隐私。那么&#xff0c;网站为什么需要https证书呢&#xff1f; 首先&#xff0c;https证书可以提供加密保护&…

《RISC-V体系结构编程与实践》的benos_payload程序——mysbi跳转到benos分析

1、benos_payload.bin结构分析 韦东山老师提供的开发文档里已经对程序的结构做了分析&#xff0c;这里不再赘述&#xff0c;下面是讨论mysbi跳转到benos的问题&#xff1b; 2、mysbi跳转到benos的代码 3、跳转产生的疑问 我认为mysbi.bin最后跳转到0x22000地址处执行&#xff0…

如何防止内部员工数据外泄?

首先&#xff0c;数据对于企业的价值和意义无需多说&#xff0c;数据价值的发挥和利用以数据安全为基础。当数据创造价值的同时&#xff0c;也面临着被窃取泄露、滥用、非法利用的风险&#xff0c;进而对个人、组织甚至整个社会、国家的利益产生严重威胁和损害。近年来&#xf…

苹果手机备份软件哪个好用?有哪些免费的第三方备份软件

备份手机数据一直是一个让人头疼的问题&#xff0c;尤其对于iPhone用户来说。尽管iCloud和iTunes提供了方便的备份选项&#xff0c;但是有时候&#xff0c;我们可能需要更多高级功能或者更直观的操作界面。本文将介绍几款好用的苹果手机备份软件。 一、主流苹果备份软件 1.iClo…

ORA-48140: the specified ADR Base directory does not exist?手动创建pfile启动失败

1.描述问题 ORA-48108: invalid value given for the diagnostic_dest init.ora parameterORA-48140: the specified ADR Base directory does not exist [/u01/app/oracle/product/11.2.0/db_1/dbs/<ORACLE_BASE>]ORA-48187: specified directory does not exist Linux-…