主要回答以下问题:
- Flink集群是由哪些组件组成的?它们彼此之间如何协调工作的?
- 在Flink中job, task, slots,parallelism是什么意思?集群中的资源是如何调度和分配的?
- 如何搭建一个Flink集群?如何配置高可用服务?如何使用外部文件系统?
Flink系统架构
Flink的核心组件包含客户端,jobmanager(JM)和taskmanager™三部分。此外Flink往往还需要结合很多外部组件一起使用,比如高可用服务、持久化存储、资源管理、指标存储与分析的组件。
Flink客户端主要负责将job提交给JM。JM是中央调度器,包含Jobmaster, Dispatcher, ResourceManager三部分。JobMaster is responsible for managing the execution of a single JobGraph. Multiple jobs can run simultaneously in a Flink cluster, each having its own JobMaster. The Dispatcher provides a REST interface to submit Flink applications for execution and starts a new JobMaster for each submitted job. It also runs the Flink WebUI to provide information about job executions. The ResourceManager is responsible for resource de-/allocation and provisioning in a Flink cluster — it manages task slots, which are the unit of resource scheduling in a Flink cluster. TM负责执行具体的任务。
如果只是提交作业和执行作业,不考虑整个集群的稳定性,拓展性,便于维护的性能等,只部署以上三个组件就够了。
但是,如果TM done掉了,JM还可以控制任务重启在其它TM上;如果JM done掉了,所有的任务都将失败,因此我们需要部署高可用服务使得一个JM done掉后,备用的JM 自动地顶上去作业。Flink目前(1.16)仅支持两种高可用服务:Zookeeper HA service 和 K8s HA service.
Flink有故障恢复的机制在任务失败后重启任务,并读取任务失败前的状态在这个状态下继续工作,可以保证哪怕任务失败重启,数据也不丢失,不重发。而这个“任务失败前的状态”是通过checkpoint保存的,考虑到多个JM需要共享checkpoint,checkpoint往往保存在可共享的持久化外部存储系统中,比如HDFS,S3等。因此我们还需要部署文件存储系统。
再说集群的资源管理和调度,Flink支持k8s和YARN两种工具来自动化管理集群资源,也可以不依赖于任何Resource Provider,采用独立部署(standalone)方式部署集群。
再说集群的监控,Flink本身收集了很多指标,可以通过metrics reporter与外部的指标存储、分析、展示工具一起搭建一个Flink监控系统。比如联合Prometheus, grafana搭建监控系统。
Flink的作业执行机制
在讲解Flink不同的部署方式以及不同部署方式下各组件如何协调工作前,我认为很有必要讲解一下Flink的作业执行机制,便于理解之后会反复提到的JobGraph,task, slots等概念。
DataFlows和Operator
程序运行时会被映射为dataflows,每个数据流都是以一个或多个sources开始,一个或多个sinks结束,类似于任意的有向无环图。大多数情况下,程序中的转换运算和dataflow中的算子(operator)是一一对应的关系。
比如下图中的程序就可以转化为由source,map算子,分组聚合算子,sink组成的数据流。
并行计算和并行度(Parallelism)
任务并行:不同的任务(算子)并行处理不同的数据,数据流图中横向的同时执行。
数据并行:一个算子可以包含一个或多个子任务,这些子任务在不同的线程、不同的物理机或容器中完全独立地执行。
并行度:一个特定算子的子任务个数,指的数据并行。有些像多线程的线程数,但和多线程不一样的是,多线程的子线程共享内存资源,但是一个算子的子任务运行在不同的slot上,内存资源是隔离的。注意并行度针对的是算子,不同的算子可以设置为不同的并行度。
并行度的设置:
//全局,不推荐
env.setParallelism(1);
//每一个算子
source.map(...).setParallelism(1)
并行度的执行规则:底层实现>代码局部>代码全局设置>提交任务时的命令行设置>配置文件的默认设置
算子链
上面介绍了数据流图,算子,并行度的概念,再来说什么是算子链。
Flink中算子与算子之间的数据传输形式大体可以分为以下两类:
-
one-to-one(forwarding) 直通:
从一个算子到另一个算子的分区不变,比如source和map之间,这代表着map算子的子任务看到的元素的个数和顺序和source算子的子任务产生的相同。map,filter,flatMap都属于这种(前提是并行度不变)
-
redistributing(重分配):
stream的分区会发生改变,如keyby.
如果前后两个算子并行度相同,且传输方式为one-to-one就可以合并为一个算子链。通常我们说的task就是指的一个算子链,subtask往往指的同一算子链的子任务。
算子合并为算子链是作业执行中很重要的一个优化手段,是否合并是可以通过代码控制的,在作业的性能调优中也是一个可以考虑的调优点。
Flink中之所以合并算子主要考虑的是减少算子之间不必要的数据传输,因为在flink中,不同任务之间的数据传输带来的性能开销其实并不小,一是数据传输必然涉及到序列化和反序列,要是一条数据很大,又选择了不合适的数据类型比如json,那带来的性能损耗是非常明显的;二是如果任务处于不同的taskmanager,那数据传输还涉及到网络传输。另外合并算子也减少了整个job的线程数,能够减少线程转化的开销。
需要注意的是,合并算子并不一定能带来性能提升的,因为算子合并其实相当于减少了并发,可能会影响CPU利用率,可以参考多线程的线程数考虑这一点。
执行图(ExecutionGraph)
相关概念介绍完后,简单介绍一下(很多细节还未搞明白,但尚不影响使用)一个Flink作业是如何一步步转化为Taskmanager上可以执行的task的。下面的描述主要针对Session部署方式,对于Application部署模式之后再介绍。
首先,客户端会将代码转化为dataflow,dataflow进一步优化如合并算子链后生成JobGraph。
然后,JM对JobGraph根据并行度进行拆分生成执行图,
最后JM会分发执行图到taskmanager上,实际执行的叫物理执行图。
Flink的资源分配和调度
slots是Flink中资源分配的最小单位。Flink对内存资源是进行了隔离的,隔离出来的每一份资源叫一个slot。每个TM通过参数taskmanager.numberOfTaskSlots配置slots的数量。建议根据核的数量分配任务槽,这样一个任务槽就一个cpu核,cpu就不需要分时复用了。默认slots平分整个TM的内存资源,Flink也支持细粒度地划分slots的资源。
需要配置cluster.fine-grained-resource-management.enabled为true
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SlotSharingGroup ssgA = SlotSharingGroup.newBuilder("a")
.setCpuCores(1.0)
.setTaskHeapMemoryMB(100)
.build();
SlotSharingGroup ssgB = SlotSharingGroup.newBuilder("b")
.setCpuCores(0.5)
.setTaskHeapMemoryMB(100)
.build();
someStream.filter(...).slotSharingGroup("a") // Set the slot sharing group with name “a”
.map(...).slotSharingGroup(ssgB); // Directly set the slot sharing group with name and resource.
env.registerSlotSharingGroup(ssgA); // Then register the resource of group “a”
上面讲的是资源的分配,再讲资源的调度:不同的task如何分配到slots上面。 主要遵守下面两个原则: 同一个任务的不同子任务只能分配到不同的slots上;多个任务可以共享slot。以上图为例,一共3个算子链,并行度分别为6,6,1,每个算子链在slots上依次分配,同一个Job的不同算子链共享slot的。
基于这样的资源调度规则,就不难理解“一个job需要的任务槽的数量至少为算子链的最大并行度“。像上面的示例,需要的任务槽数量就是6。
为什么slots可以共享?不同的task资源完全隔离不好吗?这里主要是从提高资源的利用率考虑的,希望各个内存区域的使用相对均衡,而不是忙的忙死闲的闲死。
Flink的部署
Flink提供了3种部署模式:
- 会话模式(Session Mode)
- 单作业模式(Per-Job Mode)
- 应用模式(Application Mode)
它们的区别主要在于:集群的生命周期以及资源的分配方式;应用的Main方法在哪里执行——客户端还是JobManager。 其中Per-Job模式在1.15版本后已经废弃,就不再介绍了。
会话模式
先启动集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。一个任务导致集群崩溃会牵连其他所有任务。
会话模式适合单个规模小、执行时间短的大量作业。(因为执行时间短,所以单个作业占用的资源很快能释放掉给下一个作业使用,不需要反复启动集群,反复部署资源)
应用模式
应用模式是提交任务的同时启动集群,一个应用一个集群,应用在集群在,应用亡集群自动关闭。此外,应用模式的另一个显著特点是应用的main方法执行在JM,而不是客户端。这样做是为了减轻客户端的负载,避免当多个用户同时提交任务时客户端宕机。
那么,main方法的执行为什么会带来较大的负载呢?执行main方法首先需要下载相关的依赖,还需要抽取拓扑结构(比如JobGraph)便于后续的处理。客户端执行完后还需要把这些都传输给JM。这就使得客户端一是需要格外的网络带宽下载依赖,传输数据给JM; 二是消耗更多的CPU。因此application模式把这部分的工作放在了JM上。
官方推荐在产线上使用应用模式,在测试开发中使用会话模式。
参考资料
- official document
- B站视频
- 尚硅谷课程教材