ask是flink中的一个逻辑概念,一个任务由一个或者多个算子组合而成(多个算子构成一个任务是需要满足一定的条件才可以,有兴趣的老铁可以来了解一下 Operator Chain),为了提升任务执行的效率,可以对任务配置并行度,使任务在实际运行过程中并行执行,此时该任务的多个并行任务被称为子任务(subTask)。如下图:每个虚线框是一个任务,框里的圆是子任务。
总结来说:Task是逻辑概念,subTask是实际运行的实例,一个Task的subTask个数就是上面说的并行度。在上图中Task有3个,subTask有6个。
Slot
Slot是flink集群中资源分配的基本单位,slot主要分布在TaskManager中,了解flink架构的老铁都知道,TaskManager是一个jvm进程,是subTask运行的地方。
当TaskManager启动的时候会将自己的资源以Slot的方式注册到ResourceManager,然后JobManager从ResourceManager处申请到Slot资源之后,会将subTask调度到这些Slot上面去运行,在整个过程中sub task是调度的基本单元,Slot则是资源分配的基本单元。
这里需要说明一下:slot之间内存隔离,cpu不隔离,也就是内存是独立的,但是cpu是共享的。
Task并行度和slot个数之间的关系是怎样的?
一个Task的并行度,要求该任务有指定个数个subTask并行执行,所以要求每个subTask运行在不同的solt中,因此slot的个数不能小于任务的并行度。
slot sharing所表达的意思是 slot共享,但是这里需要注意的是,共享slot的subTask需要满足一下条件:
1.subTask必须来自同一个job,以为不同job之间资源是隔离的,TaskManager都是隔离,更不用说TaskManager上的slot了。
2.subTask必须来自不同Task,同一个Task的subTask不用共享一个slot,否自就失去了并行度的意义了。
如果slot不共享,每个sub task都运行在独立的slot上会发生什么?
我们知道一个job的DAG中,不同Task对资源的消耗是不同的,如果都均分slot,那必然有些资源利用率高,有些低。限制不同Task的sub task共享可以尽量让资源占用高的和资源占用低的放一起,这样可以是资源得到重复利用,否则占用资源少的sub task运行完后,给其分配的slot就闲置了。
除此之外,slot sharing也降低了一个job运行对资源依赖的门槛,如果每个slot sharing,那么一个job运行需要的slot个数和job中所有的sub task个数相同,而有了slot sharing,需要的slot个数,取决于DAG中所有Task中最大的并行度个数。
在有slot sharing的场景下,上文中的应用程序只需要2个slot即可:
没有slot sharing的场景下,则需要6个slot:
将算子链接成 task 是个有用的优化:
- 它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量。
- 链行为是可以配置的;将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。
Flink 默认会将能链接的算子尽可能地进行链接(例如, 两个 map 转换操作)。 此外, Flink 还提供了对链接更细粒度控制的 API 以满足更多需求:
如果想对整个作业禁用算子链,可以调用 StreamExecutionEnvironment.disableOperatorChaining()。下列方法还提供了更细粒度的控制。需要注 意的是, 这些方法只能在 DataStream 转换操作后才能被调用,因为它们只对前一次数据转换生效。例如,可以 someStream.map(...).startNewChain() 这样调用,而不能 someStream.startNewChain()这样。
一个资源组对应着 Flink 中的一个 slot 槽, 你可以根据需要手动地将各个算子隔离到不同的 slot 中。
Start new chain | 以当前 operator 为起点开始新的连接。如下的两个 mapper 算子会链接在一起而 filter 算子则不会和第一个 mapper 算子进行链接。someStream.filter(...).map(...).startNewChain().map(...); | |
Disable chaining |
| |
Set slot sharing group | 配置算子的资源组。Flink 会将相同资源组的算子放置到同一个 slot 槽中执行,并将不同资源组的算子分配到不同的 slot 槽中,从而实现 slot 槽隔离。资源组将从输入算子开始继承如果所有输入操作都在同一个资源组。 Flink 默认的资源组名称为 “default”,算子可以显式调用 slotSharingGroup(“default”) 加入到这个资源组中。 someStream.filter(...).slotSharingGroup("name"); |
Task Slots 和资源
每个 worker(TaskManager)都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask。为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)。每个 task slot 代表 TaskManager 中资源的固定子集。
例如,具有 3 个 slot 的 TaskManager,会将其托管内存 1/3 用于每个 slot。分配资源意味着 subtask 不会与其他作业的 subtask 竞争托管内存,而是具有一定数量的保留托管内存。注意此处没有 CPU 隔离;当前 slot 仅分离 task 的托管内存。通过调整 task slot 的数量,用户可以定义 subtask 如何互相隔离。每个 TaskManager 有一个 slot,这意味着每个 task 组都在单独的 JVM 中运行(例如,可以在单独的容器中启动)。具有多个 slot 意味着更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 连接(通过多路复用)和心跳信息。它们还可以共享数据集和数据结构,从而减少了每个 task 的开销。
Flink中slot是任务执行所申请资源的最小单元,同一个TaskManager上的所有slot都只是做了内存分离,没有做CPU隔离。
每一个TaskManager都是一个JVM进程,如果某个TaskManager 上只有一个 slot,这意味着每个 task 组都在单独的 JVM 中运行,如果有多个 slot 就意味着更多 subtask 共享同一 JVM。
一般情况下有多少个subtask,就是有多少个并行线程,而并行执行的subtask要发布到不同的slot中去执行。
Flink 默认会将能链接的算子尽可能地进行链接,也就是算子链,flink 会将同一个算子链分组内的subtask都发到同一个slot去执行,也就是说一个slot可能要执行多个subtask,即多个线程。
flink 可以根据需要手动地将各个算子隔离到不同的 slot 中。
一个任务所用的总共slot为所有资源隔离组所占用的slot之和,同一个资源隔离组内,按照算子的最大并行度来分配slot。
如果并行度是4 ,但是slot只有3个,会部署失败