前言
这里主要是 涉及到 flink 中各个角色的交互
TaskManager 和 ResourceManager 的交互
JobMaster 和 ResourceManager 的交互
等等流程
TaskManager 和 ResourceManager 的交互
主要是 包含了几个部分, 如下, 几个菜单
TaskManager向 ResourceManager 注册
ResourceManager 向 TaskManager 心跳的发送
ResourceManager 这边收到 TaskManager 的 slotReport 之后的处理
TaskManager向 ResourceManager 注册
TaskManager 中 TaskExecutor 启动之后, 会向 ResourceManager 注册
注册如下, 向 ResourceManager 这边发送请求, 携带上基本信息, resourceId 是 TaskManagerRunner 中 ResourceID.generate() 随机生成的一个字符串
ResourceManager 的地址是根据 JobManager 的信息拼接上固定的 “/user/resourcemanager” 得到的
JobManager 这边的 ResourceManager 注册该 TaskManager 的相关信息, 并相应 ResourceManager 这边创建的 WorkerRegistration 信息返回
然后这里注册了一个发送到 TaskManager 的定时心跳, 注册到了 ResourceManager.taskManagerHeartbeatManager 中
ResourceManager 向 TaskManager 心跳的发送
这里 ResourceManager 向 TaskManager 这边心跳的发送是这里 HeartbeatManagerSenderImpl.run 中处理的, 定时的效果是 延时+递归 来实现的
然后接着 TaskManager 这边会响应 TaskManager 的各个 slot 的相关信息给 ResourceManager
ResourceManager 这边收到 TaskManager 的 slotReport 之后的处理
然后接着就是 ResourceManager 这边的处理, 更新目标 taskManager 的 slot 的相关信息
然后 web 页面上, 这里 TaskManager 的相关信息 就是来自于 ResourceManager
JobMaster 和 ResourceManager 的交互
主要是 包含了几个部分, 如下, 几个菜单
JobMaster 向 ResourceManager 注册
JobMaster 向 ResourceManager 注册
同样是 JobMaster 启动的饿时候, 会自动向 ResourceManager 注册
注册的信息如下, jobId, jobResourceId, 以及 jobManager 的交互信息
然后这里的 jobResourceId 同样是 JobMaster 初始化的时候 ResourceID.generate() 生成的一个随机字符串
ResourceManager 向 JobMaster 这边心跳的发送
ResourceManager 收到 JobMaster 的注册请求之后, 会向 jobManagerHeartbeatManager 注册向 JobMaster 的心跳任务
然后就是 JobManager 这边收到心跳之后, 向 ResourceManager 发送了一个心跳信息, 未携带 任何数据
ResourceManager 这边收到 TaskManager 的 null 之后的处理
无任何处理, 也不用任何处理
JobMaster 这边资源请求的流程
JobMaster 启动之后, 自动连接 ResourceManager
连接上 ResourceManager 之后, 会向 ResourceManager 发送执行资源的请求
然后是 ResourceManager 这边找到合适的 TaskManagerSlot, 然后 allocateSlot, 向 TaskManager 指定具体的 job
ResourceManager 向 TaskManager 发送请求, 指派其需要执行 目标 job
然后是 TaskExecutor 注册 job 信息, 以及对方 JobMaster 的交互信息
然后是 TaskExecutor 这边主动和目标 JobMaster 获取联系, 表示为 JobMaster 提供一个 TaskManagerSlot 用于执行目标任务
然后是 JobMaster 这边拿到了 TaskManagerSlot 之后执行任务
接着是更新 Execution 的 slot 的信息, 然后这个是外层 CompletableFuture 是 Execution.scheduleForExecution 中的 allocationFuture
然后就是 JobMaster 这边的 deploy, 这里会向具体的 TaskExecutor 发送任务
然后 deploy 里面就是根据 ExecutionVertex 封装 TaskDeploymentDescriptor, 然后将相关信息发送到 TaskExecutor 去执行
处理函数的传递流程
这一系列流程如下
- driver 这边采集各个函数对象, 封装 UserCodeObjectWrapper, 然后序列化 封装到 TaskConfig, 以 udf 结尾的 配置信息作为 key, 这部分 TaskConfig 是包含在 JobGraph 中的每一个 JobVertex 中的, 然后伴随着 JobGraph 的序列化传递到 JobManager 这边进行处理
- JobManager 这边反序列化 JobGraph, 然后创建 JobMaster, 该 JobVertex 经过 ExecutionVertex, TaskDeploymentDescriptor 然后传递到 TaskExecutor
- TaskExecutor 这边反序列化 DataSourceTask, ChainedDriver, DataSinkTask 等等, 然后 执行任务
所以这个流程中 JobManager 这边是仅仅是获取, 持有, 传递 udf 部分, 不涉及 反序列化
driver 这边
从上下文获取 function 对象, 也就是我们驱动代码里面 “new Test01WordCount$MyFlatMapMapper()”, 然后封装了一个 UserCodeObjectWrapper 被 FlatMapOperatorBase 持有
然后会经历 Plan, OptimizedPlan 然后到 JobVertex 阶段
然后是创建 JobGraph, 创建每一个 JobVertex 的时候, 序列化该 JobVertex 的 处理函数
然后是将 chainedTask, 的相关配置信息放在 主JobVertex
然后隔离是通过 ”chaining.taskconfig.” + $idx 来进行隔离的, 相当于是增加了一系列的名称空间
然后就是 JobGraph 的序列化, 准备发送 http 请求 传输 Job 到 JobManager
JobManager 这边
JobManager 这边反序列化 JobGraph 如下, 这里面和客户端那边一样
然后 这边的 JobGraph 和 客户端那边的一致, 包含了 JobVertex 中包含了 TaskConfiguration 相关信息
然后是到后面封装 TaskDeploymentDescriptor 这里可以看到, 也是间接的从 JobVertex 中获取的 TaskConfiguration
然后 最终的传递是通过 TaskInformation 从 JobMaster 这边传递到 TaskExecutor
TaskExecutor 这边
反序列化各个 DataSourceTask, ChainedDriver, DataSinkTask 等等的时候
根据索引, 添加前缀, 来获取给定的 ChainedDriver, 然后添加到 chinedTaskTarget 中, 基于 previous 形成了一个单项任务执行的链表, 用于后面的执行
这里各个任务的前缀为 “chaining.taskconfig” + $idx 和前面放入的时候, 是对称的
这里是具体的获取配置的地方, 前缀 + “udf”, 然后从 配置信息中获取配置
然后是 反序列化 UserCodeObjectWrapper, 里面封装了目标函数, Test01WordCount$MyFlatMapMapper
完