Flink Program 编程套路回顾
1、获取执行环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2、通过执行环境对象,注册数据源 Source,得到数据抽象
DataStream ds = env.socketTextStream(...)
3、调用数据抽象的各种Transformation执行逻辑计算
DataStream resultDS = ds.flatMap(...).keyBy(...).sum(...);
4、将各种Transformation执行完毕之后得到的计算结果数据抽象注册 Sink
resultDS.addSink(...)
5、提交Job执行
env.execute(...)
Flink Job 提交脚本解析
# Submission to an already running Flink on YARN cluster
./bin/flink run --target yarn-session
# Submission spinning up a Flink on YARN cluster in Per-Job Mode
./bin/flink run --target yarn-per-job
# Submission spinning up Flink on YARN cluster in Application Mode
./bin/flink run-application --target yarn-application
具体可以参考官网:
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#advanced-cli
CliFrontend 提交分析
当用户把 Flink 应用程序打成 jar 使用 flink run … 的 shell 命令提交的时候,底层是通过 CliFrontend 来处理。底层的逻辑,就是通过反射来调用用户程序的 main() 方法执行。
需要注意的是,Application 模式下,会通过 YarnClusterDescriptor.deployInternal 方法在 yarn 中部署一个 application 集群,返回 YarnRestClusterClient 对象。yarn 中会启动一个 EmbeddedJobClient,执行 submitJob 方法提交 jobGraph。
ExecutionEnvironment 源码解析
StreamExecutionEnvironment 是 Flink 应用程序的执行入口,提供了一些重要的操作机制:
1、提供了 readTextFile(), socketTextStream(), createInput(), addSource() 等方法去对接数据源。
2、提供了 setParallelism() 设置应用程序的并行度。
3、StreamExecutionEnvironment 管理了 ExecutionConfig 对象,该对象负责 Job 执行的一些行为配置管理。还管理了 Configuration 管理一些其他的配置。这个所谓的其他配置,还包含了 Checkpoint 的配置,这个 chekcpoint 的配置参数,会单独解析出来,存储在 CheckpontConfig 中
4、StreamExecutionEnvironment 管理了一个 List<Transformation<?>> transformations 成员变量,该成员变量,主要用于保存 Job 的各种算子转化得到的 Transformation,把这些 Transformation 按照逻辑拼接起来,就能得到 StreamGragh, 注意转换顺序:
UserFunction ==> StreamOperator ==> Transformation ==> StreamNode
5、StreamExecutionEnvironment 提供了 execute() 方法主要用于提交 Job 执行。该方法接收的参数就是:StreamGraph
Flink on YARN Per-job 模式提交流程分析
入口类:ApplicatoinMaster: YarnJobClusterEntryPoint
Job提交流程源码分析
getStreamGraph(jobName) 生成 StreamGraph 解析
// 入口
StreamGraph streamGraph = getStreamGraph(jobName, true){
// 通过 StreamGraphGenerator 来生成 StreamGraph
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate(){
streamGraph = new StreamGraph(....)
for(Transformation<?> transformation : transformations) {
transform(transformation);
}
}
}
transform(transformation){
// 先递归处理该 Transformation 的输入
Collection<Integer> inputIds = transform(transform.getInput());
// 将 Transformation 变成 Operator 设置到 StreamGraph 中,其实就是添加 StreamNode
streamGraph.addOperator(....);
// 设置该 StreamNode 的并行度
streamGraph.setParallelism(transform.getId(), parallelism);
// 设置该 StreamNode 的入边 SreamEdge
for(Integer inputId : inputIds) {
streamGraph.addEdge(inputId, transform.getId(), 0);
// 内部实现
// 构建 StreamNode 之间的 边(StreamEdge) 对象
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, ...){
// TODO_MA 注释: 给 上游 StreamNode 设置 出边
getStreamNode(edge.getSourceId()).addOutEdge(edge);
}
// TODO_MA 注释: 给 下游 StreamNode 设置 入边
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
execute(StreamGraph) 解析
// 入口
JobClient jobClient = executeAsync(streamGraph){
// 执行一个 SreamGraph
executorFactory.getExecutor(configuration).execute(streamGraph, configuration){
// 第一件事:由 StreamGraph 生成 JobGragh
JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
// 第二件事:通过 RestClusterClient 提交 JobGraph 到Flink集群
clusterClient.submitJob(jobGraph)
}
}
// 通过 RestClusterClient 来提交 JobGraph
RestClusterClient.submitJob(JobGraph jobGraph){
// 继续提交
RestClusterClient.sendRetriableRequest(){
// 通过 RestClient 提交
RestClient.sendRequest(webMonitorHost, webMonitorPort, ...){
// 继续提交
RestClient.submitRequest(targetAddress,targetPort,httpRequest,responseType)
}
}
}
最终通过 channel 把请求数据,发给 WebMonitorEndpoint 中的 JobSubmitHandler 来执行处理。
小结
01、用户根据 Flink 应用程序的编写套路,写好应用程序,打成 jar 包,通过 flink run 的命令来执行提交
02、这个命令的底层,其实是执行: CliFrontend 组件来执行提交
03、这个 CliFrontend 的内部,会通过反射的技术,来转交执行到用户自定义应用程序的 main()
04、先获取 StreamExecutionEnvironment 执行环境对象实例
05、执行算子:其实就是从 算子 ---> function ---> StreamOperator ---> Transformation
06、执行 StreamExecutionEnvironment 的 executor 方法来执行提交
07、首先遍历 StreamExecutionEnvironment 的 transformations 这个 list 来生成 StreamGraph,之后会继续被构建成 JobGraph
08、具体的内部的提交是通过 RestClusterClient 来执行提交
09、在通过 RestClusterClient 提交之前,其实还会做一件事:把 SreamGraph 变成 JobGraph,也还会先把 JobGraph 持久化成为一个磁盘文件
10、在这个 RestClusterClient 的内部,其实是通过 RestClient 来提交
11、RestClient 其实在初始化的时候,就初始化了一个 Netty 客户端
12、通过封装一个 HttpRequest 对象,包含了需要提交的 JobGraph 文件和 Jar 包等,通过 Netty 客户端链接服务端,发送请求对象到服务端
13、Flink 主节点 JobManager 负责处理这个请求的是 WebMonitorEndpoint 中的 Netty 服务端,接收到 rest 请求会调用 Router 执行 route 处理,找到对应的 Handler 执行处理。提交 Job 对应的 Handler 是 JobSubmitHandler
WebMonitorEndpoint 处理 RestClient 的 JobSubmit 请求
最终处理这个请求: Flink 主节点 JobManager 负责处理这个请求的是 WebMonitorEndpoint 中的 Netty 服务端,接收到 rest 请求会调用 Router 执行 route 处理,找到对应的 Handler 执行处理。提交 Job 对应的 Handler 是 JobSubmitHandler。
// JobManager 服务端处理入口
JobSubmitHandler.handleRequest(){
// 恢复得到 JobGraph
JobGraph jobGraph = loadJobGraph(requestBody, nameToFile);
// 通过 Dispatcher 提交 JobGraph
Dispatcher.submitJob(jobGraph, timeout);
}
JobMaster 启动源码剖析
关键方法: jobMasterServiceFactory.createJobMasterService
核心的工作是:
- 创建 JobMaster 这个 RpcEndpoint 组件,负责通信。内部会创建一个 DefaultScheduler 调度组件,在初始化该调度组件的时候,会调用 ExecutionGraphFactory 的相关方法,来把 JobGraph 转换成 ExectionGraph
- JobMaster 启动,跳转到 onStart() 方法。内部的主要工作,就是以下这三:
- 启动心跳机制,维持和 ResourceManager,和 TaskExecutor 之间的心跳
- 启动 SlotPoolImpl 这个 slot 管理组件。
- 从 ZK 获取 ResourceManager 的地址,从而进行 JobMaster 向 ResourceManager 的注册
- 启动的这个 JobMaster 负责这个 Job 中的所有的 Task 的 slot 的申请和 任务的派发,状态的跟踪,容错,还有 checkpoint等各种操作
JobMaster 和 ResourceManager/TaskExecutor 的心跳
JobMaster 向 ResourceManager 注册
// 启动 JobMaster
jobMaster.start(){
JobMaster.onStart(){
startJobExecution(){
// 第一件大事:启动 JobMaster 必要的一些工作
startJobMasterServices(){
// 第一件事: 启动心跳机制
this.taskManagerHeartbeatManager = createTaskManagerHeartbeatManager(heartbeatServices);
this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices);
// 第二件事: 启动 SlotPoolImpl
slotPoolService.start(getFencingToken(), getAddress(), getMainThreadExecutor());
// 第三件事: 从 ZK 获取 ResourceManager 的地址
// 这儿就是 JobMaster 向 ResourceManager 执行注册的入口
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
// 第二件大事:开始调度执行
startScheduling();
}
}
}
ResourceManager.registerJobManager(){
// ResourceManager 关于 JobMaster 的注册内部实现,重要的事情做了四件
registerJobMasterInternal(jobMasterGateway, jobId, ....){
// TODO_MA 马中华 注释: 生成 JobMaster 注册对象
JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobManagerResourceId, ....);
// TODO_MA 马中华 注释: 完成注册
jobManagerRegistrations.put(jobId, jobManagerRegistration);
jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
// TODO_MA 马中华 注释: 加入心跳管理
jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<Void>() {});
// TODO_MA 马中华 注释: 返回 JobMaster 注册成功
return new JobMasterRegistrationSuccess(getFencingToken(), resourceId);
}
}
Flink Graph 演变
StreamGraph 构建和提交源码解析
关于 StreamNode 的定义:
public class StreamNode {
private final int id;
private int parallelism;
private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
private final Class<? extends AbstractInvokable> jobVertexClass;
}
关于 StreamEdge 的定义:
public class StreamEdge implements Serializable {
private final String edgeId;
private final int sourceId;
private final int targetId;
}
JobGraph 构建和提交源码解析
JobGraph: StreamGraph 经过优化后生成了 JobGraph,提交给 Flink 集群的数据结构。它包含的主要抽象概念有:
1、JobVertex:经过优化后符合条件的多个 StreamNode 可能会 chain 在一起生成一个 JobVertex,即一个JobVertex 包含一个或多个 operator,JobVertex 的输入是 JobEdge,输出是 IntermediateDataSet。
2、IntermediateDataSet:表示 JobVertex 的输出,即经过 operator 处理产生的数据集。producer 是 JobVertex,consumer 是 JobEdge。
3、JobEdge:代表了 job graph 中的一条数据传输通道。source 是 IntermediateDataSet,target 是 JobVertex。即数据通过 JobEdge 由 IntermediateDataSet 传递给目标 JobVertex。
在 StreamGraph 构建 JobGragh 的过程中,最重要的事情就是 operator 的 chain 优化,那么到底什么样的情况的下 Operator 能chain 在一起呢 ?答案是要满足以下 9 个条件:
// 1、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
downStreamVertex.getInEdges().size() == 1;
// 2、上下游节点都在同一个 slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
// 3、前后算子不为空
!(downStreamOperator == null || upStreamOperator == null);
// 4、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认是 HEAD)
!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
// 5、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是 ALWAYS)
!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
// 6、两个节点间物理分区逻辑是 ForwardPartitioner
(edge.getPartitioner() instanceof ForwardPartitioner);
// 7、两个算子间的 shuffle 方式不等于批处理模式
edge.getShuffleMode() != ShuffleMode.BATCH;
// 8、上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
// 9、用户没有禁用 chain
streamGraph.isChainingEnabled();
构建逻辑的重点代码:
1、在 connect 之间,调用的 createChain() 就是先执行优化,然后再生成 JobVertex
2、然后 调用 connect 之后,是为了组织关系
1、先生成 IntermediateDataSet 和 JobEdge
2、把 IntermediateDataSet 和 当前 JobVertex 设置为 JobEdge 的 source 和 target
3、把 JobEdge 设置为这个 IntermediateDataSet 的消费者
关于 JobVertex 的定义:
public class JobVertex implements java.io.Serializable {
private final JobVertexID id;
private final ArrayList<IntermediateDataSet> results = new ArrayList<>();
private final ArrayList<JobEdge> inputs = new ArrayList<>();
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
private String invokableClassName;
}
关于 IntermediateDataSet 的定义:
public class IntermediateDataSet implements java.io.Serializable {
private final IntermediateDataSetID id;
private final JobVertex producer;
private final List<JobEdge> consumers = new ArrayList<JobEdge>();
}
关于 JobEdge 的定义:
public class JobEdge implements java.io.Serializable {
private final JobVertex target;
private IntermediateDataSet source;
private IntermediateDataSetID sourceId;
}