Flink源码解析之:如何根据算法生成StreamGraph过程
在我们日常编写Flink应用的时候,会首先创建一个StreamExecutionEnvironment.getExecutionEnvironment()
对象,在添加一些自定义处理算子后,会调用env.execute
来执行定义好的Flink应用程序。我们知道,Flink在实际执行任务前,会根据应用生成StreamGraph,再生成JobGraph,最终提交到集群中进行执行。那么Flink是如何将我们自定义的应用程序转换成StreamGraph的呢?这一过程中实现了什么逻辑? 接下来,我们通过源码来深入了解一下。
在本次分析源码的过程中,主要涉及到StreamExecutionEnvironment
、DataStream
、Transformation
、StreamGraph
、StreamGraphGenerator
几下个类,这里先汇总介绍一下在生成StreamGraph过程中,这些类的交互处理流程,有了这个印象后,再阅读下面的源码流程,更容易串起来和理解。
一、Function -> Transformation转换
在我们编写Flink应用程序时,会自定义一系列算子拼接在数据流链路中,比如,当我们调用datastream.flatMap(flatMapFunction)
方法时,就会将传入的算子函数,转换成Transformation对象,添加到StreamExecutionEnvironment
对象的List<Transformation<?>> transformations
属性中。接下来,我们就来看一下是如何进行转换的。
首先进入到DataStream
类中,找到比如flatMap
方法:
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType =
TypeExtractor.getFlatMapReturnTypes(
clean(flatMapper), getType(), Utils.getCallLocationName(), true);
return flatMap(flatMapper, outType);
}
public <R> SingleOutputStreamOperator<R> flatMap(
FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}
上面代码中,将flatMap
封装到StreamFlatMap
方法中,用于表示一个StreamOperator操作符。StreamFlatMap
操作符会针对每一个StreamRecord,通过processElement
方法调用用户函数去处理该流数据:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
// 调用用户函数执行数据流元素处理逻辑
userFunction.flatMap(element.getValue(), collector);
}
回到DataSteram
的FlatMap
方法中,我们继续看transform
方法里做了什么:
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(
String operatorName,
TypeInformation<R> outTypeInfo,
OneInputStreamOperator<T, R> operator) {
return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
上面根据传入的StreamOperator
创建一个SimpleOperatorFactory
对象,StreamOperatorFactory是一个工厂类,其主职责是为特定类型的StreamOperator在运行时创建实例。它还提供了其他附加功能如做一些操作配置,比如chaining。
接下来继续进入doTransform
方法:
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
// 获取当前数据流(上一个Transformation)的输出类型。这样可以做类型检查,并在类型信息缺失时提前引发错误。
transformation.getOutputType();
// 创建一个新的OneInputTransformation,这个新的OneInputTransformation即为要添加的新操作
// 对于flatMap操作来说,不存在分区,所以上下游是一对一的关系,所以这里用的是OneInputTransformation
OneInputTransformation<T, R> resultTransform =
new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
// 创建一个SingleOutputStreamOperator对象,该对象将接收新加入的操作的输出
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream =
new SingleOutputStreamOperator(environment, resultTransform);
// 将新的Transformation添加到当前的执行环境中,这个操作将并入到计算流图中。
getExecutionEnvironment().addOperator(resultTransform);
// 代表了新添加的操作输出结果的数据流,便于在这个数据流上继续构建后续的计算。
return returnStream;
}
上述代码内容就是将userFunction转换成Transformation的具体执行逻辑了,因为我们最初举例是flatMap
方法,因此在将userFunction转换成Transformation时,会使用OneInputTransformation
来表示。同时这里可以看到,在转换完成后,会调用getExecutionEnvironment().addOperator(resultTransform)
将得到的Transformation
添加到当前执行环境的计算流图中,实际上也就是添加到我们刚刚所说的执行环境的List<Transformation<?>> transformations
属性中了。
二、StreamGraphGenerator生成StreamGraph
在将用户函数userFunction转换成Transformation并保存到StreamExecutionEnvironment的transformations属性中后,我们就收集抽象好了所有的用户函数及处理链路,接下来,就是根据这些封装好的Transformation来生成StreamGraph。
首先进入到StreamExecutionEnvironment
的execute
执行入口方法中:
public JobExecutionResult execute() throws Exception {
return execute(getStreamGraph());
}
@Internal
public StreamGraph getStreamGraph() {
return getStreamGraph(true);
}
@Internal
public StreamGraph getStreamGraph(boolean clearTransformations) {
final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
if (clearTransformations) {
transformations.clear();
}
return streamGraph;
}
在上面的getStreamGraph
方法中,使用getStreamGraphGenerator
方法生成一个StreamGraphGenerator
对象,这里的transformations
参数,实际上指的就是上面保存的每个用户函数转换得到的Transformation
对象。
接下来,我们主要看generator
方法,进入到StreamGraphGenerator
类中,这个类也是创建StreamGraph最核心的类。
public StreamGraph generate() {
// 根据不同的配置信息创建一个StreamGraph对象
streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
// 设置 StreamGraph 是否在任务结束后启用checkpoint,这个布尔值从配置中获取。
streamGraph.setEnableCheckpointsAfterTasksFinish(
configuration.get(
ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH));
shouldExecuteInBatchMode = shouldExecuteInBatchMode();
configureStreamGraph(streamGraph);
// 初始化一个哈希映射alreadyTransformed,用于存储已经被转换过的Transformation。
alreadyTransformed = new HashMap<>();
// 遍历transformations列表,对每个transformation对象进行转换
// 这里是转换的核心逻辑
for (Transformation<?> transformation : transformations) {
transform(transformation);
}
// 将slotSharingGroupResources设置为StreamGraph的资源配置。
streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);
setFineGrainedGlobalStreamExchangeMode(streamGraph);
// 获取StreamGraph中所有的StreamNode,检查它们的输入边缘是否满足禁用未对齐的checkpointing的条件,如果满足条件,则将边的supportsUnalignedCheckpoints属性设置为false。
for (StreamNode node : streamGraph.getStreamNodes()) {
if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
for (StreamEdge edge : node.getInEdges()) {
edge.setSupportsUnalignedCheckpoints(false);
}
}
}
// 清理streamGraph和alreadyTransformed以释放资源,并防止后续的错误使用,并保存当前的streamGraph实例到builtStreamGraph中。
final StreamGraph builtStreamGraph = streamGraph;
alreadyTransformed.clear();
alreadyTransformed = null;
streamGraph = null;
// 最后返回构建好的StreamGraph。
return builtStreamGraph;
}
上面代码中,最主要的核心逻辑在for循环遍历transformations中,调用transform
方法对每个Transformation对象进行转换。我们主要进入到该方法中进行分析:
/**
* Transforms one {@code Transformation}.
*
* <p>This checks whether we already transformed it and exits early in that case. If not it
* delegates to one of the transformation specific methods.
*/
private Collection<Integer> transform(Transformation<?> transform) {
// 快速检查传入的 transform 对象是否已经在 alreadyTransformed 字典(一个缓存)中,如果已存在则直接返回对应的ID,这种早期退出的机制避免了对同一任务的重复转换。
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
LOG.debug("Transforming " + transform);
if (transform.getMaxParallelism() <= 0) {
// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from the ExecutionConfig.
int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}
// 若 transform 对象指定了 SlotSharingGroup ,那么会从 SlotSharingGroup 中提取资源并更新到 slotSharingGroupResources 中。
transform
.getSlotSharingGroup()
.ifPresent(
slotSharingGroup -> {
final ResourceSpec resourceSpec =
SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
slotSharingGroupResources.compute(
slotSharingGroup.getName(),
(name, profile) -> {
if (profile == null) {
return ResourceProfile.fromResourceSpec(
resourceSpec, MemorySize.ZERO);
} else if (!ResourceProfile.fromResourceSpec(
resourceSpec, MemorySize.ZERO)
.equals(profile)) {
throw new IllegalArgumentException(
"The slot sharing group "
+ slotSharingGroup.getName()
+ " has been configured with two different resource spec.");
} else {
return profile;
}
});
}
});
// call at least once to trigger exceptions about MissingTypeInfo
// 调用 transform.getOutputType() 进行安全检查,确保类型信息的完整性。
transform.getOutputType();
// 根据 transform 对象的类型获取对应的转换逻辑 translator.
@SuppressWarnings("unchecked")
final TransformationTranslator<?, Transformation<?>> translator =
(TransformationTranslator<?, Transformation<?>>)
translatorMap.get(transform.getClass());
// 如果找到了相应的 translator,使用它进行转换;否则,使用旧的转换策略 legacyTransform()。
Collection<Integer> transformedIds;
if (translator != null) {
transformedIds = translate(translator, transform);
} else {
transformedIds = legacyTransform(transform);
}
// 在转换完成后,检查 transform 是否已经被记录在 alreadyTransformed 字典中。如果尚未记录,则将转换后的对象ID添加到字典中。
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
// 将转换后产生的节点ID返回以供后续使用。
return transformedIds;
}
很明显,这种转换不可能多次进行,因为这会浪费计算资源。因此,我们需要一个机制来记录哪些Transformation已经被转换过。在Flink中,这是通过一个名为alreadyTransformed的哈希映射实现的。如果当前的Transformation已经存在于alreadyTransformed中,那么就无需再次进行转换,直接返回对应的集合即可。
接下来,根据transform的具体类型,从translatorMap中获取相应的translator转换器(具体的translatorMap内容可以在代码中看到)。找到转换器后,调用translate方法来执行转换。那么我们又需要进入到translate
方法中一探究竟:
private Collection<Integer> translate(
final TransformationTranslator<?, Transformation<?>> translator,
final Transformation<?> transform) {
checkNotNull(translator);
checkNotNull(transform);
// 通过调用getParentInputIds()方法获取当前transform对象的所有输入(父级Transformation)的ID。
final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());
// 再次检查当前transform对象是否已在alreadyTransformed字典中,如果是,直接返回对应的ID。
// the recursive call might have already transformed this
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
// 确定slotSharingGroup,这是一个根据transform输入和slotSharingGroup名称,决定slot sharing策略的过程。
final String slotSharingGroup =
determineSlotSharingGroup(
transform.getSlotSharingGroup().isPresent()
? transform.getSlotSharingGroup().get().getName()
: null,
allInputIds.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList()));
// 创建一个TransformationTranslator.Context对象,里面包含了StreamGraph,slotSharingGroup和配置信息,该上下文会在转换过程中使用。
final TransformationTranslator.Context context =
new ContextImpl(this, streamGraph, slotSharingGroup, configuration);
// 根据执行模式不同,调用转换方法translateForBatch()或translateForStreaming()进行具体的转换工作。
return shouldExecuteInBatchMode
? translator.translateForBatch(transform, context)
: translator.translateForStreaming(transform, context);
}
每一个TransformationTranslator实例都绑定了一个特定类型的Transformation的转换逻辑,例如OneInputTransformationTranslator,SourceTransformation等。通过这份代码,我们可以看到Flink的灵活性和可扩展性。你可以为特定的Transformation添加不同的激活逻辑或者处理逻辑。这种设计确保了Flink在处理不同类型Transformation时的高效性,并且很容易添加新类型的Transformation。
这里,我们仍然以OneInputTransformationTranslator的转换逻辑来举例,看一下Flink的Transformation转换逻辑执行了什么操作?
protected Collection<Integer> translateInternal(
final Transformation<OUT> transformation,
final StreamOperatorFactory<OUT> operatorFactory,
final TypeInformation<IN> inputType,
@Nullable final KeySelector<IN, ?> stateKeySelector,
@Nullable final TypeInformation<?> stateKeyType,
final Context context) {
checkNotNull(transformation);
checkNotNull(operatorFactory);
checkNotNull(inputType);
checkNotNull(context);
// 即获取 StreamGraph、slotSharingGroup 和transformation的 ID。
final StreamGraph streamGraph = context.getStreamGraph();
final String slotSharingGroup = context.getSlotSharingGroup();
final int transformationId = transformation.getId();
final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
// addOperator() 方法把转换Transformation 添加到 StreamGraph 中。此操作包括transformation的 ID,slotSharingGroup,CoLocationGroupKey,工厂类,输入类型,输出类型以及操作名。
streamGraph.addOperator(
transformationId,
slotSharingGroup,
transformation.getCoLocationGroupKey(),
operatorFactory,
inputType,
transformation.getOutputType(),
transformation.getName());
// 如果 stateKeySelector(用于从输入中提取键的函数)非空,使用 stateKeyType 创建密钥序列化器,并在 StreamGraph 中设置用于接收单输入的状态键
if (stateKeySelector != null) {
TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
}
// 根据 Transformation 和 executionConfig 设置并行度。
int parallelism =
transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
? transformation.getParallelism()
: executionConfig.getParallelism();
streamGraph.setParallelism(transformationId, parallelism);
streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
final List<Transformation<?>> parentTransformations = transformation.getInputs();
checkState(
parentTransformations.size() == 1,
"Expected exactly one input transformation but found "
+ parentTransformations.size());
// 根据转换的输入和输出添加边到 StreamGraph。每个输入转换都添加一条边。
for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
streamGraph.addEdge(inputId, transformationId, 0);
}
// 方法返回包含转换 ID 的单个元素集合。
return Collections.singleton(transformationId);
}
上面这段代码,实际上就是构建StreamGraph的主体逻辑部分了,translateInternal() 方法实现了从 Transformation 到 StreamGraph 中操作的转换。在该方法中,对于每一个Transformation,会调用streamGraph.addOperator
方法,生成一个StreamNode对象,存储在StreamGraph的streamNode
属性中,该属性是一个Map<Integer, StreamNode>
结构,表示每个Transformation ID对应的StreamNode节点。
protected StreamNode addNode(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
Class<? extends TaskInvokable> vertexClass,
StreamOperatorFactory<?> operatorFactory,
String operatorName) {
if (streamNodes.containsKey(vertexID)) {
throw new RuntimeException("Duplicate vertexID " + vertexID);
}
StreamNode vertex =
new StreamNode(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
operatorName,
vertexClass);
streamNodes.put(vertexID, vertex);
return vertex;
}
看完translateInternal
方法中streamGraph.addOperator
的执行逻辑后,接下来还需要关注的一个步骤是streamGraph.addEdge
,这里是连接StreamGraph中各StreamNode节点的逻辑所在:
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
null,
new ArrayList<String>(),
null,
null);
}
在addEdgeInternal
方法中,会区分当前节点是虚拟节点还是物理节点,从而添加物理边还是虚拟边。由于我们用OneInputTransformationTranslator
会创建物理节点,所以进入到创建物理边的分支代码中:
private void createActualEdge(
Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
OutputTag outputTag,
StreamExchangeMode exchangeMode) {
// 首先通过节点ID获取上游和下游的StreamNode。
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// 检查分区器partitioner是否已经设置,如果没有设置,且上游节点与下游节点的并行度相等,那么使用ForwardPartitioner; 如果并行度不相等,则使用RebalancePartitioner。
// If no partitioner was specified and the parallelism of upstream and downstream
// operator matches use forward partitioning, use rebalance otherwise.
if (partitioner == null
&& upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}
if (partitioner instanceof ForwardPartitioner) {
if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
throw new UnsupportedOperationException(
"Forward partitioning does not allow "
+ "change of parallelism. Upstream operation: "
+ upstreamNode
+ " parallelism: "
+ upstreamNode.getParallelism()
+ ", downstream operation: "
+ downstreamNode
+ " parallelism: "
+ downstreamNode.getParallelism()
+ " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
}
}
if (exchangeMode == null) {
exchangeMode = StreamExchangeMode.UNDEFINED;
}
/**
* Just make sure that {@link StreamEdge} connecting same nodes (for example as a result of
* self unioning a {@link DataStream}) are distinct and unique. Otherwise it would be
* difficult on the {@link StreamTask} to assign {@link RecordWriter}s to correct {@link
* StreamEdge}.
*/
// 在上述配置都设置好之后,创建StreamEdge对象,并将其添加到上游节点的出边和下游节点的入边。
int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();
StreamEdge edge =
new StreamEdge(
upstreamNode,
downstreamNode,
typeNumber,
partitioner,
outputTag,
exchangeMode,
uniqueId);
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
从上述代码中可以看出,createActualEdge()
方法实现了在StreamGraph中添加实际的边的过程,这是构建Flink StreamGraph的一个重要步骤。
至此,我们就看到了创建StreamGraph,并根据Transformation来生成StreamNode,并添加StreamEdge边的过程,最终构建好一个完成的StreamGraph来表示Flink应用程序的数据流执行拓扑图。
当然这里我们只是以OneInputTransformationTranslator
转换器举例来分析流程,实际上其他的转换器应该会更复杂一些,有兴趣的可以继续深入研究,本文便不再赘述。同时,本文也仍然有很多细节暂时因为理解不够深入没有涉及,欢迎各位一起交流学习。
最终,在我们构造好StreamGraph后,就需要考虑如何将StreamGraph转换成JobGraph了,下一篇,将继续介绍StreamGraph -> JobGraph的转换。