一、组件
1. JobManager
作业管理器是一个 Flink 集群中任务管理和调度的核心,是控制应用执行的主进程
1.1 JobMaster
- JobMaster 是 JobManager 中最核心的组件,负责处理单独的作业(Job)。JobMaster 和具体的 Job 是一一对应的,多个 Job 可以同时运行在一个 Flink 集群中, 每个 Job 都有一个自己的 JobMaster
- 在作业提交时,JobMaster 会先接收到要执行的应用,即客户端提交来的 Jar 包、数据流图 (dataflow graph) 和作业图 (JobGraph);然后 JobMaster 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫作执行图 (ExecutionGraph),它包含了所有可以并发执行的任务;接着 JobMaster 会向资源管理器 (ResourceManager) 发出请求,申请执行任务必要的资源,一旦获取到了足够的资源,就会将执行图分发到真正运行它们的 TaskManager 上。而在运行过程中, JobMaster 会负责所有需要中央协调的操作,比如检查点 (checkpoints) 的协调
1.2 ResourceManager
资源管理器
- ResourceManager 主要负责资源的分配和管理,在 Flink 集群中只有一个
- 资源是指 TaskManager 的任务槽(task slots)。任务槽就是 Flink 集群中的资源调配单元,包含了机器用来执行计算的一组 CPU 和内存资源。每一个任务(Task)都需要分配到一个 slot 上执行
- Flink 的 ResourceManager,针对不同的环境和资源管理平台有不同的具体实现
- 在 Standalone 部署时,因为 TaskManager 是单独启动的(没有 Per-Job 模式),所以 ResourceManager 只能分发可用 TaskManager 的任务槽,不能单独启动新 TaskManager
- 在有资源管理平台 (如 Yarn) 时,当新的作业申请资源,ResourceManager 会将有空闲槽位的 TaskManager 分配给 JobMaster。如果 ResourceManager 没有足够的任务槽,它可以向资源提供平台发起会话,请求提供启动新 TaskManager 进程的容器。另外,ResourceManager 还负责停掉空闲的 TaskManager,释放计算资源
1.3 Dispatcher
分发器
- Dispatcher 主要负责提供一个 REST 接口,用来提交应用,并且负责为每一个新提交的作业启动一个新的 JobMaster 组件
- Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息
- Dispatcher 在架构中并不是必需的,在不同的部署模式下可能会被忽略掉
2. TaskManager
任务管理器
- TaskManager 是 Flink 中的工作进程,负责数据流的具体计算,所以也被称为 Worker。Flink 集群中必须至少有一个 TaskManager;在分布式系统中,通常会有多个 TaskManager 运行,每一个 TaskManager 都包含了一定数量的任务槽(task slots)
- Slot 是资源调度的最小单位,slot 的数量限制了 TaskManager 能够并行处理的任务数量。启动之后,TaskManager 会向 ResourceManager 注册它的 slots;收到 ResourceManager 的指令后,TaskManager 就会将一个或者多个槽位提供给 JobMaster 调用, JobMaster 就可以分配任务来执行了
- 在执行过程中,TaskManager 可以缓冲数据,还可以跟其他运行同一应用的 TaskManager 交换数据
二、任务提交流程
1. 整体抽象流程
- 一般情况下,由客户端(App)通过分发器提供的 REST 接口,将作业提交给 JobManager
- 由分发器启动 JobMaster,并将作业(包含 JobGraph)提交给 JobMaster
- JobMaster 将 JobGraph 解析为可执行的 ExecutionGraph,得到所需的资源数量,然后向资源管理器请求资源(slots)
- 资源管理器判断当前是否由足够的可用资源;如果没有,启动新的 TaskManager
- TaskManager 启动之后,向 ResourceManager 注册自己的可用任务槽(slots)
- 资源管理器通知 TaskManager 为新的作业提供 slots
- TaskManager 连接到对应的 JobMaster,提供 slots
- JobMaster 将需要执行的任务分发给 TaskManager
- TaskManager 执行任务,互相之间可以交换数据
2. Flink on Yarn 任务提交流程
2.1 会话模式任务提交流程
- 先启动一个 YARN session,启动 JobManager,此时只有 ResourceManager 和 Dispatcher 在运行
- 客户端将 flink jar 包和相关配置上传到 HDFS
- 客户端通过 REST 接口,将作业提交给分发器
- 分发器启动 JobMaster,并将作业(包含 JobGraph)提交给 JobMaster
- JobMaster 向资源管理器请求资源(slots)
- 资源管理器向 YARN 的资源管理器请求 container 资源
- YARN 启动新的 TaskManager 容器
- TaskManager 启动之后,向 Flink 的资源管理器注册自己的可用任务槽
- 资源管理器通知 TaskManager 为新的作业提供 slots
- TaskManager 连接到对应的 JobMaster,提供 slots
- JobMaster 将需要执行的任务分发给 TaskManager,执行任务
2.2 单作业模式任务提交流程
- 客户端将作业提交给 YARN 的资源管理器,这一步中会同时将 Flink 的 Jar 包和配置上传到 HDFS,以便后续启动 Flink 相关组件的容器
- YARN 的资源管理器分配 Container 资源,启动 Flink JobManager,并将作业提交给JobMaster。这里省略了 Dispatcher 组件
- JobMaster 向资源管理器请求资源(slots)
- 资源管理器向 YARN 的资源管理器请求 container 资源
- YARN 启动新的 TaskManager 容器
- TaskManager 启动之后,向 Flink 的资源管理器注册自己的可用任务槽
- 资源管理器通知 TaskManager 为新的作业提供 slots
- TaskManager 连接到对应的 JobMaster,提供 slots
- JobMaster 将需要执行的任务分发给 TaskManager,执行任务
三、任务调度原理
1. 整体调度过程
- Flink 代码在被提交执行后首先经过优化器和图生成器会生成数据流图
- Flink Client 的 ActorSystem 创建 Actor 将数据流图发送给 JobManager 中的 Actor
- JobManager 会不断接收 TaskManager 的心跳消息,从而可以获取到有效的 TaskManager
- JobManager 通过调度器在 TaskManager 中调度执行 Task (Task 对应一个线程)
- 在程序运行过程中,Task 与 Task 之间可以进行数据传输
1.1 Job Client
- 主要职责是提交任务, 提交后可以结束进程, 也可以等待结果返回
- Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点
- Job Client 负责接受用户的程序代码,然后创建数据流,将数据流提交给 Job Manager 以便进一步执行。执行完成后,Job Client 将结果返回给用户
1.2 JobManager
- 主要职责是调度工作并协调任务做检查点
- 集群中至少要有一个 master,master 负责调度 task,协调checkpoints 和容错
- 高可用设置的话可以有多个 master,但要保证一个是 leader,其他是standby
- JobManager 包含 Actor System、Scheduler、CheckPoint 三个重要的组件
- JobManager 从客户端接收到任务以后, 首先生成优化过的执行计划, 再调度到 TaskManager 中执行
1.3 TaskManager
- 主要职责是从 JobManager 处接收任务, 并部署和启动任务, 接收上游的数据并处理
- TaskManager 是在 JVM 中的一个或多个线程中执行任务的工作节点
- TaskManager 在创建之初就设置好了 Slot, 每个 Slot 可以执行一个任务
2. 相关概念
2.1 数据流图
Dataflow Graph,Flink 程序中所有算子按照逻辑顺序连接在一起的一张图,由 Source、Transformation、Sink 三部分组成,以一个或多个 Source 开始以一个或多个 Sink 结束,类似 Spark 的 DAG
- Source:数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、RabbitMQ 等
- Transformation:数据转换的各种操作,有
Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select
等 - Sink:接收器,Flink 将转换计算后的数据发送的地点 ,Flink 常见的 Sink 有:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等
2.2 并行子任务与并行度
-
并行子任务 (Subtask):一个算子操作可以 “复制” 成多份分布到不同的节点去运行,每个节点所运行的任务称为该算子的一个并行子任务
-
并行度 (Parallelism):
- 针对数据流图中的每一步操作而言,一个算子操作的并行子任务个数称之为它的并行度 (Parallelism)
- 针对整个数据流图而言,它的所有算子操作中的最大并行度称之为整个 Stream 的并行度
-
并行度的设置:
-
Flink 代码中设置:
//Flink环境对象调用setParallelism(n)方法设置整个程序全局的并行度 StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(2) //每个算子操作调用setParallelism(n)方法设置当前算子的并行度 dataStream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
-
提交 Flink 应用时设置
#命令行使用 -p 设置并行度 bin/flink run –p 2 #WebUI中在提交应用的配置框中填写并行度
-
配置文件中设置,对整个集群生效
vim flink-conf.yaml parallelism.default: 2
-
2.3 算子链
-
不同算子之间的数据传输方式:
- 一对一(One-to-one,forwarding):类似 Spark 的窄依赖,从上游向下游进行数据传输不会改变数据的分区和顺序。例如:source、map、 filter、 flatMap 等算子之间的数据传输
- 重分区(Redistributing):类似 Spark 的宽依赖和 Shuffle 过程 (重分区好比发牌,shuffle好比洗牌),上游的数据会根据不同的策略 (基于 key hash 值、broadcast、rebalance轮询以及完全随机) 传输到不同的下游中,会造成数据分区和顺序的改变。例如:map 和 keyBy 之间的数据传输、window 和 Sink 之间的数据传输
-
算子链 (Operator Chain):并行度相同、同一个 slot 共享组且数据传输方式为 one-to-one 的算子们可以合并成为一个算子链,形成一个 Task 由一个线程执行
-
设置:
//全局禁用算子链 env.disableOperatorChaining(); //禁用算子链 .map(word -> Tuple2.of(word, 1L)).disableChaining(); //从当前算子开始新链 .map(word -> Tuple2.of(word, 1L)).startNewChain()
2.4 Task Slot 和槽共享
-
Flink 的每一个任务 (Task) 需要一个线程来执行;TaskManager 是一个 JVM 进程,在其中可以启动多个独立线程来执行任务
-
为了控制一个 TaskManager 能接收多少个 Task,通过 Task Slot 对每个任务运行所占用的资源
做出明确的划分,一个 TaskManager 至少有一个 Task Slot -
Task Slot:在 TaskManager 上拥有计算资源的一个固定大小的子集,一个 TaskManager 上的所有 Task Slot 会均分整个内存,所以任务之间不受影响
-
Task Slot 配置:
vim flink-conf.yaml taskmanager.numberOfTaskSlots: 8 #由于slot之间不会涉及 CPU 的隔离,所以可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争
-
槽共享:默认情况下,同一个作业的不同任务节点的子任务可以在同一个 Task Slot 上执行,实现槽共享。但同一个任务节点的并行子任务必须独立占据一个 Task Slot 执行
-
通过设置 “slot 共享组” (SlotSharingGroup) 可以让某个算子对应的任务完全独占一个 slot
//共享组名称自定义,不设置则与前一个算子同属一个共享组,默认是default .map(word -> Tuple2.of(word, 1L)).slotSharingGroup("1"); //此时,整个作业总共需要的 slot 数量,就是各个 slot 共享组最大并行度的总和
-
并行度与 Task Slot:并行度是程序运行时实际使用的并发线程资源;Task Slot 是整个 TaskManager 总共可用的并发线程资源
2.5 执行流程图转换
- Flink 中执行流程图转换可以分为:
StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图
- StreamGraph:逻辑流图或数据流图,它是根据用户通过 Stream API 编写的代码生成的最初的执行图,用来表示程序的拓扑结构
- JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化是将多个符合条件的节点 chain 在一起作为一个节点
- ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构
- 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的 “图”,并不是一个具体的数据结构