Flink之Task解析

news2025/1/22 19:55:33

Flink之Task解析

  对Flink的Task进行解析前,我们首先要清楚几个角色TaskManagerSlotTaskSubtaskTaskChain分别是什么

角色注释
TaskManager在Flink中TaskManager就是一个管理task的进程,每个节点只有一个TaskManager
SlotSlot就是TaskManager中的槽位,一个TaskManager中可以存在多个槽位,取决于服务器资源和用户配置,可以在槽位中运行Task实例
Task其实Task在Flink中就是一个类,其中可以包含一个或多个算子,这个取决于算子链的构成
SubTaskSubTask就是Task类的并行实例可以是一个或多个,也就是说当代码执行的那一刻开始,就根据用户所设置或者默认的并行度创建出多个SubTask
TaskChainTaskChain就是算子链,何为算子链?就是在一个Task实例中出现的串行算子,算子间必须是OneToOne模式且并行度相同.

  上面对几个角色进行了一个简单的阐述,后面会结合图解和伪代码进行讲解,这里我们以计算中比较经典wordcount为例子,伪代码如下所示:

public class FLinkWordCount {
    public static void main(String[] args) throws Exception {
        // 创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();\
        // 设置并行度3
        env.setParallelism(3)
        // 读取数据文件
        DataStreamSource<String> streamSource = env.readTextFile("xxx");
        // 转大写
        DataStreamSource<String> upperCaseSource = streamSource.map(word -> word.toUpperCase())
        // 转成tuple2格式,计数1
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = upperCaseSource.map(word -> Tuple2.of(word, 1));
        // 按照单词分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = mapStream.keyBy(tup -> tup.f0);
        // 求和
        keyed.sum("f1")
        env.execute();
    }
}

  上面的代码中我们使用了两次map,一次keyBy,一次sum算子,我们下面就结合这几个算子进行讲解,讲解之前有两个条件需要先记住:

  • 同一个Task并行实例不能放在同一个TaskSlot上运行,一个TaskSlot上可以运行多个不同的Task并行实例
  • 同一个共享组的算子允许共享槽位,不同共享组的算子决不允许共享槽位

  上面这两句话一定要记牢,以便于后面的理解.

算子链划分及Task槽位分配

算子链划分

可以根据上面的代码理解下图:
在这里插入图片描述

上图中我们可以看到两个map组成一个task chain,keyBysum组成一个task chain,这里说一下原因,首先就是两个map的并行度是一致的,而且是OneToOne模式,所以可以将两个map绑定成一个算子链,并将其放入到一个SubTask中,而到了keyBy这里为什么不能再放入到一个task chain中,这里我们可以思考一下,keyBy时会发生什么?以spark的角度来说会发生shuffle对吧,这就导致了不能满足OneToOne的模式,简单来说我们也可以想清楚,如果keyBymap组成一个task chain那么还怎么做wordcount?

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bckucHbv-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task2.png)]

通过上图应该很容易理解了.

Task槽位分配

  上面讲了关于task chain怎么划分的,为什么这样划分,这里讲一下为什么同一个Task的并行实例(SubTask)不能在同一个task slot中.其实这个也很容易就想清楚,如果同一Task的多个SubTask都出现在一个task slot中那么还有什么意义呢?当这些SubTask出现在一个task slot中时就会发生串行计算,那并行的意义也就没有了.

  同时这种机制也保证了任务的容错性,也就是说对于同一个Task一旦某一个task slot出现异常的情况,其他的task slot中的SubTask还能正常运行,如果将这些SubTask放到一个task slot中,当这个task slot出现异常情况时,就会影响整个任务的执行.

  总结来说,这种设计保证了Flink任务的隔离性、容错性、资源利用性.这里用图解的方式便于大家记忆,如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rlgqeo6A-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task3.png)]

槽位共享及算子链断/连

槽位共享

  前面讲过同一个Task的多个SubTask不能出现在一个task slot中,但是不同TaskSubTask是可以共享同一个task slot的,但是在Flink中有一个机制,就是用户(开发人员)可以自定义不同的算子间是否可以共享同一个task slot,如上面的例子中两个map的并行度一致并且符合OneToOne的模式,在正常情况下必然会会分到一个task chain中,但是Flink给用户提供了的slot group的概念,也就是说用户可以将这两个map分配到不同的slot group中,这种情况下两个map就不会划分到一个task chain中,试想一下当两个map都不允许共享同一个task slot时,怎么可能划分到同一个task chain中呢?

  伪代码如下:

public class FLinkWordCount {
    public static void main(String[] args) throws Exception {
        // 创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();\
        // 设置并行度3
        env.setParallelism(1)
        // 读取数据文件
        DataStreamSource<String> streamSource = env.readTextFile("xxx");
        // 转大写
        DataStreamSource<String> upperCaseSource = streamSource.map(word -> word.toUpperCase())
        // 通过slotSharingGroup()将upperCaseSource作为一个分组"g1"
        SingleOutputStreamOperator<String> slotGroup1 = upperCaseSource.slotSharingGroup("g1");
          
        // 转成tuple2格式,计数1
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = upperCaseSource.map(word -> Tuple2.of(word, 1));
        // 通过slotSharingGroup()将mapStream作为一个分组"g3"
        SingleOutputStreamOperator<Tuple2<String, Integer>> slotGroup2 = mapStream.slotSharingGroup("g2");
        // 按照单词分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = mapStream.keyBy(tup -> tup.f0);
        // 求和
        keyed.sum("f1")
        env.execute();
    }
}

上面的代码中我们将upperCaseSourcemapStream分成了两个task slot,这样两个map就不可以共享相同的task slot,同时代码中将并行度改为了1,这样便于图解,如下图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-saLgMu0Q-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task4.png)]
如果说集群中总task slot只有3个,并且在代码中两个map设置了不同的task slot且两个map的并行度都为3时会怎么样?很简单,提交任务时就会报错,因为提交任务所需要的资源已经超出了集群的资源.

  这里说一下对于对task slot进行分组处理的实际用处,就以代码中两个map为例子,在实际的业务中如果两个map处理的数据量都极大,如果将两个map的计算都放到一个节点的一个task slot时会发生什么?数据的积压、任务异常失败等等都有可能发生,但是有slotSharingGroup我们就可以保证同一个task slot不会承载过大的计算任务,也就达到了资源合理分配的目的.

算子链断/连

  前面讲了关于将两个map进行slotSharingGroup后会将两个map划分到不同的task chain,如果有这样一个情况两个map满足OneToOne的模式且并行度相同时,我们不使用slotSharingGroup能否将两个map划分成不同的task chain?答案是当然可以的,Flink为我们提供了对应的API,伪代码如下:

public class FLinkWordCount {
    public static void main(String[] args) throws Exception {
        // 创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();\
        // 设置并行度3
        env.setParallelism(3)
        // 读取数据文件
        DataStreamSource<String> streamSource = env.readTextFile("xxx");
        // 转大写
        DataStreamSource<String> upperCaseSource = streamSource.map(word -> word.toUpperCase())
  
        // 转成tuple2格式,计数1
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = upperCaseSource.map(word -> Tuple2.of(word, 1));
        // 将mapStream划分到一个新的task chain中
        SingleOutputStreamOperator<Tuple2<String, Integer>> newTaskChainMapStream = mapStream.startNewChain();
        // 按照单词分组
        KeyedStream<Tuple2<String, Integer>, String> keyed = mapStream.keyBy(tup -> tup.f0);
        // 求和
        keyed.sum("f1")
        env.execute();
    }
}

在上面代码中我们调用了startNewChain()后就可以将mapStream划分到一个新的task chain中,这样的情况下,两个map既属于不同的task chain又可以共享同一个task slot,如下图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jOIlz8uH-1692099761760)(/Users/jinlong/data/Typora_WorkSpase/FlinkTask/task5.png)]
以上就是对于Task的讲解,如有错误欢迎指出,如有问题共同探讨.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/883480.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue2-配置脚手架、分析脚手架、render函数、ref属性、props配置项、mixin配置项、scoped样式、插件

&#x1f954;:总有一段付出了没有回报的日子 是在扎根 更多Vue知识请点击——Vue.js VUE2-Day6 配置脚手架脚手架结构render函数vue.js与vue.runtime.xxx.js的区别引入render函数为什么要引入残缺的vue呢&#xff1f; 脚手架默认配置ref属性props配置项传递数据接收数据注意点…

elementui form组件出现英文提示

今天让解决一个bug&#xff0c;是表单组件提示词会出现英文。 问题情景如下&#xff1a; 有时会出现中文&#xff0c;有时会出现英文。 解决方法&#xff1a; 经查看&#xff0c;代码采用的是elementui的form组件&#xff0c;在el-form-item中使用了required属性&#xff0c;同…

企业权限管理(十)-用户详情

用户详情 UserController findById方法 Controller RequestMapping("/user") public class UserController {Autowiredprivate IUserService userService;//查询指定id的用户RequestMapping("/findById.do")public ModelAndView findById(String id) thro…

Python面向对象进阶教程,Python面向对象进阶知识笔记

类方法、静态方法 1. 类方法 第一个形参是类对象的方法需要用装饰器classmethod来标识其为类方法&#xff0c;对于类方法&#xff0c;第一个参数必须是类对象&#xff0c;一般以cls作为第一个参数。 class Dog(object): __type "狗" # 类方法&#xff0c;用class…

数据结构中公式前中后缀表达式-二叉树应用

目录 数据结构中公式前中后缀表达式-二叉树应用 数据结构中公式前中后缀表达式-二叉树应用 什么是前缀表达式、中缀表达式、后缀表达式 前缀表达式、中缀表达式、后缀表达式&#xff0c;是通过树来存储和计算表达式的三种不同方式 以如下公式为例 通过树来存储该公式&#x…

Avalonia 11 WebAssembly中文乱码

文章目录 0x00 原因0x01 解决方法FontForge 0x02 使用自定义字体App.axaml控件使用效果 0x00 原因 新建的Avalonia 11 WebAssembly项目&#xff0c;直接运行的话&#xff0c;会发现中文都是乱码&#xff0c;并且直接在控件上修改FontFamily属性是无法生效的。 0x01 解决方法…

MySQL学习笔记 - 进阶部分

MySQL进阶部分 字符集的相关操作&#xff1a;字符集和比较规则&#xff1a;utf8与utf8mb4&#xff1a;比较规则&#xff1a;常见的字符集和对应的Maxlen&#xff1a; Centos7中linux下配置字符集&#xff1a;各个级别的字符集&#xff1a;执行show variables like %character%语…

matlab画图中多个图例分开绘制

在matlab绘图中&#xff0c;线条较多时导致图例较长回遮挡原图/将图例分类&#xff0c;解决方案将图例分为多个。 一、多个图例一起显示 r 10; a 0; b 0; t0:0.1:2.1*pi; xar*cos(t); ybr*sin(t); plot(x,y,r,linewidth,4);hold on axis equal plot([0 0],[1 10],b,linewi…

双碳目标下基于“遥感+”多技术融合在碳储量、碳排放、碳循环、温室气体等领域应用教程

详情点击链接&#xff1a;双碳目标下基于“遥感”多技术融合在碳储量、碳排放、碳循环、温室气体等领域应用教程 一&#xff1a;双碳视角下遥感技术的研究方向 1.双碳背景及遥感的现实需求 2.全球碳库、碳收支及碳循环现状 3.碳储量、碳收支与碳循环中的遥感技术 4.ENVI及ArcG…

海龟绘图——n个正方形组成的图案

运行结果&#xff1a; 代码&#xff1a; import turtle# 创建海龟对象 nint(input()) t turtle.Turtle()# 设置海龟的颜色和线条粗细 t.color(blue) t.pensize(3)# 画四条直线lengths10 for j in range(n):for i in range(4):lengths20t.forward(lengths)# 旋转90度t.left(90)…

数据治理:打造可信赖的BI环境

章节一&#xff1a;引言 随着信息时代的不断发展&#xff0c;数据已经成为企业决策的重要支撑。而在大数据时代&#xff0c;海量的数据需要被整理、分析&#xff0c;以便为企业提供正确的指导。商业智能&#xff08;BI&#xff09;系统的兴起为企业提供了强大的数据分析能力&am…

利用Lifecycle,管理一个计时器生命周期

Lifecycle是Android Jetpack中的一个组件&#xff0c;用于管理Android应用程序组件&#xff08;如Activity或Fragment&#xff09;的生命周期。它可以帮助开发者在不同的生命周期阶段执行特定的操作&#xff0c;以便更好地管理资源、处理数据和提供用户体验。 Lifecycle作用 …

Python程序设计——对象和类

学习目标 描述对象和类&#xff0c;以及使用类来建模对象定义带数据域和方法的类使用构造方法调用初始化程序来创建和初始化数据域以构建一个对象使用圆点运算符(.)访问对象成员使用self参数引用对象本身使用UML图符号来描述类和对象区分不可变对象和可变对象隐藏数据域以避免数…

应届生运维简历攻略

导语&#xff1a; 当下&#xff0c;计算机科学与技术已经成为一个炙手可热的行业&#xff0c;而作为这个行业中的一份子&#xff0c;运维人员的角色无疑至关重要。如果你是一位即将毕业的应届生&#xff0c;并希望在运维领域打拼&#xff0c;那么一份出色的运维简历将是你踏入…

PS常用快捷按键

1、Ctrl J 键复制&#xff08;快速复制图层&#xff0c;作为备份&#xff09;&#xff1b; 2、快速选择对象&#xff0c;进行移动ctrl 右键 3、放大ctrl 缩小ctrl 4、对同一个图片的多个不同颜色的图片进行截取的时候&#xff0c;注意每次都用同一个切图框&#xff0c;截图保…

【C++】stack容器

1.stack基本概念 英stk 美stk n.&#xff08;整齐的&#xff09;一堆&#xff1b;<英> 垛&#xff0c;堆&#xff1b;大量&#xff0c;许多&#xff1b;&#xff08;尤指工厂的&#xff09;大烟囱&#xff1b;&#xff08;图书馆的&#xff09;藏书架&#xff0c;双面书架…

Redis实现共享Session

Redis实现共享Session 分布式系统中&#xff0c;sessiong共享有很多的解决方案&#xff0c;其中托管到缓存中应该是最常用的方案之一。 1、引入依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM…

Linux:shell脚本 正则表达式与AWK

一、正则表达式 由一类特殊字符及文本字符所编写的模式&#xff0c;其中有些字符&#xff08;元字符&#xff09;不表示字符字面意义&#xff0c;而表示控制或通配的功能&#xff0c;类似于增强版的通配符功能&#xff0c;但与通配符不同&#xff0c;通配符功能是用来处理文件…

C语言入门 Day_3 整数和变量

目录 1.整型 2.变量 3.易错点 4.思维导图 前言&#xff1a; 昨天的课程里面&#xff0c;我们学会了使用printf()打印一行字母&#xff0c;比如 printf("Hello World!\n"); 那么编程中用来表示&#xfeff;数字的是什么类型呢&#xff1f; 接下来我们学习一下…

实战指南,SpringBoot + Mybatis 如何对接多数据源

系列文章目录 MyBatis缓存原理 Mybatis plugin 的使用及原理 MyBatisSpringboot 启动到SQL执行全流程 数据库操作不再困难&#xff0c;MyBatis动态Sql标签解析 从零开始&#xff0c;手把手教你搭建Spring Boot后台工程并说明 Spring框架与SpringBoot的关联与区别 Spring监听器…