Flink程序有三部分operation组成,分别是源source、转换transformation、目的地sink。这三部分构成DAG。
DAG首先生成的是StreamGraph。
用户代码在添加operation的时候会在env中缓存(变量transformations),在env.execute()执行的时候才会生成对应StreamGraph。
生成StreamGraph
transformations中只存了3个变量,其实是5个。
getStreamGraph顾名思义就是生成StreamGraph。
最后是getStreamGraphGenerator(transformations).generate()生成。getStreamGraphGenerator获取生成器,generate生成StreamGraph
generate方法中首先创建了StreamGraph对象,再遍历transformations给StreamGraph添加相关操作信息(transform(transformation)
)。其余部分都是处理相关的运行参数(执行参数、checkpoint参数、savepoint参数等)
transform中主要有三部分。
没有并行度,添加并行度
要是指定了slotGroup,将需要的slot资源记录到slotSharingGroupResources中
最后实际转换。优先使用_translatorMap_中存在的translator。这些translator是已经定义好的解释器,可以根据不同场景选择是流模式还是批模式。传统是legacyTransform
legacyTransform
根据情况处理单个流输入或多个流输入。
translate根据情况选择批处理或者流处理
addOperator和addEdge是重点方法,添加顶点和边。
StreamEdge
一个edge连接上下游两个node。
edgeId:唯一id
sourceId、targetId:连接的上下游node的id
outputPartitioner:分区器
StreamNode
一个node可以有多个edge
inEdges、outEdges:node的入边和出边
jobVertexClass:封装用户函数的执行类
StreamGraph
有多个streamNodes组成,streamNodes之间是streamEdge相连。
类似以下这种:
streamNodes:缓存graph所有的node
sources:DAG的输入源集合
sinks:DAG的输出源集合
添加node
addSink、addSource、addOperator是主要方法。可以看到addSink、addSource也是addOperator。
addOperator中addNode是添加StreamNode的方法。
addNode就是创建StreamNode对象,并添加到streamNodes中。
添加edge
方法是addEdge,内部调用addEdgeInternal
addEdgeInternal中前面是处理虚拟节点的。后面是调用createActualEdge来添加
createActualEdge中首先确定partitioner,没有指定partitioner就优先使用ForwardPartitioner,要求上下游并行度一样,否则使用RebalancePartitioner。
然后创建StreamEdge对象,并将相关信息绑定到对应的StreamNode上。