前言
JobManager是Flink的核心进程,主要负责Flink集群的启动和初始化,包含多个重要的组件(JboMaster,Dispatcher,WebEndpoint等),本篇文章会基于源码分析JobManagr的启动流程,对其各个组件进行介绍,希望对JobManager有一个更全面的了解。
集群启动模式
ClusterEntryPoint是Flink集群的入口点的基类,该类是抽象类,类继承关系UML图如下
通过上图可知道,Flink有3种集群模式
Flink Session集群
根据不同的资源管理器,有3个不同的子类:
- StandaloneSessionClusterEntrypoint Standalone session模式下集群的入口类
- KubernetesSessionClusterEntrypoint K8s session模式下集群的入口类
- YarnSessionClusterEntrypoint Yarn session模式下集群的入口类
集群生命周期:
在 Flink Session 集群中,客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。
资源隔离:
Flink作业共享集群的ResourceManager和Dispacher等组件,TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争 — 例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。
适用场景:
因为组件共享,session集群资源使用率高,集群预先存在,不需要额外申请资源,适合一些比较小的,不是长期运行的作业,例如SQL预览,交互式查询,实时任务测试环境等
Flink Per Job集群
只要Yarn提供了继承的子类:YarnJobClusterEntrypoint
集群生命周期:
在 Flink Job 集群中,可用的集群管理器(例如 YARN)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。一旦作业完成,Flink Job 集群将被拆除。
资源隔离:
每一个提交的Flink应用程序单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。
使用场景:
实时由于Per Job模式下用户应用程序的main方法在客户端执行生成JobGraph,任务量大情况下存在性能瓶颈,目前已被标记为废弃状态。
Flink Application集群
根据不同的资源管理器,有3个不同的子类:
- StandaloneApplicationClusterEntryPoint Standalone Application模式下集群的入口类
- KubernetesApplicationClusterEntrypoint K8s Application模式下集群的入口类
- YarnApplicationClusterEntryPoint Yarn Application模式下集群的入口类
集群生命周期:
Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且main方法在集群上而不是客户端上运行。应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口(ApplicationClusterEntryPoint)负责调用main方法来提取 JobGraph
资源隔离:
每一个提交的Flink应用程序单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。
使用场景:
Application模式资源隔离性好,Per Job模式的替换方案,适合长期运行、具有高稳定性的大型作业
JobManager启动流程
JobManger启动流程在不同模式下基本相同,Standalone模式可以在本地运行(可以参考),方便Debug,因为使用Standalone模式的入口类StandaloneSessionClusterEntrypoint进行启动流程的分析。
main方法入口
public static void main(String[] args) {
// 打印系统相关信息
EnvironmentInformation.logEnvironmentInfo(
LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
//信号注册器,注册系统级别的信号,接收到系统级别终止信号优雅的关闭
SignalHandler.register(LOG);
//注册一个安全的钩子,这样jvm停止之前会睡眠5s去释放资源,5s之后强制关闭
JvmShutdownSafeguard.installAsShutdownHook(LOG);
// 解析命令行参数,获取配置信
final EntrypointClusterConfiguration entrypointClusterConfiguration =
ClusterEntrypointUtils.parseParametersOrExit(
args,
new EntrypointClusterConfigurationParserFactory(),
StandaloneSessionClusterEntrypoint.class);
//加载config.yaml,构建Configuration对象
Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
StandaloneSessionClusterEntrypoint entrypoint =
new StandaloneSessionClusterEntrypoint(configuration);
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}
主要步骤:
- 打印系统信息。
- 注册信号处理器,注册系统级别的信号,确保优雅关闭。
- 注册一个安全的钩子,这样jvm停止之前会睡眠5s去释放资源,5s之后强制关闭。
- 解析命令行参数,加载配置文件。
- 初始化
StandaloneSessionClusterEntrypoint
。 - 调用
ClusterEntrypoint#runClusterEntrypoint
方法启动集群。
ClusterEntrypoint#runClusterEntrypoint
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {
//
clusterEntrypoint.startCluster();
} catch (ClusterEntrypointException e) {
LOG.error(
String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),
e);
System.exit(STARTUP_FAILURE_RETURN_CODE);
}
//无关代码 无需关注
}
核心步骤:
- 调用
clusterEntrypoint.startCluster()
启动集群。
ClusterEntrypoint#startCluster
public void startCluster() throws ClusterEntrypointException {
//无关代码 无需关注
try {
FlinkSecurityManager.setFromConfiguration(configuration);
//插件管理类,用来加载插件。插件加载两种方式。
//1).通过如下参数配置FLINK_PLUGINS_DIR。
//2).将插件jar包放入到plugins下
PluginManager pluginManager =
PluginUtils.createPluginManagerFromRootFolder(configuration);
//初始化文件系统的配置
configureFileSystems(configuration, pluginManager);
//初始化安全上下文环境 默认HadoopSecurityContext,Hadoop安全上下文,
//使用先前初始化的UGI(UserGroupInformation)和适当的安全凭据。比如Kerberos。
//总结:初始化安全环境,创建安全环境的时候会做一系列的检查。
SecurityContext securityContext = installSecurityContext(configuration);
ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
//安全的情况下调用runCluster开始初始化组件
securityContext.runSecured(
(Callable<Void>)
() -> {
runCluster(configuration, pluginManager);
return null;
});
} catch (Throwable t) {
//异常处理代码 无需关注
}
}
startCluster方法主要做了一些环境和配置初始化的工作
主要步骤:
- 初始化插件管理器,用来加载插件。
- 初始化文件系统设置 例如 hdfs、本地file。此时只是初始化的配置。
- 初始化安全环境。
- 安全环境下调用
runCluster
方法。
ClusterEntrypoint#runCluster
private void runCluster(Configuration configuration, PluginManager pluginManager)
throws Exception {
synchronized (lock) {
//初始化集群所需要的服务:例如通信服务,监控服务,高可用服务等
initializeServices(configuration, pluginManager);
// write host information into configuration
configuration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());
//创建Dispatcher和ResourceManger组件的工厂类
final DispatcherResourceManagerComponentFactory
dispatcherResourceManagerComponentFactory =
createDispatcherResourceManagerComponentFactory(configuration);
//创建Dispatcher和ResourceManger组件
clusterComponent =
dispatcherResourceManagerComponentFactory.create(
configuration,
resourceId.unwrap(),
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
delegationTokenManager,
metricRegistry,
executionGraphInfoStore,
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
failureEnrichers,
this);
//组件停止运行后的异步方法
clusterComponent
.getShutDownFuture()
.whenComplete(
//代码省略
)
}
}
主要步骤:
1.初始化集群所需要的服务:例如通信服务,监控服务,高可用服务等
2.创建Dispatcher和ResourceManger组件的工厂类
3.创建Dispatcher和ResourceManger组件
4.定义组件停止运行后的异步方法
总结
本篇文章分享了Flink任务的集群模式,通过源码的方式分析了JobManger的启动流程,后续会对JobManger相关的服务和组件进行更详细的分析。