1.1 概要介绍
Flink on Yarn的HA高可用模式,首先依赖于Yarn自身的高可用机制(ResourceManager高可用),并通过Yarn对JobManager进行管理,当JobManager失效时,Yarn将重新启动JobManager。其次Flink Job在恢复时,需要依赖Checkpoint进行恢复,而Checkpoint的快照依赖于远端的存储:HDFS,所以HDFS也必须是高可用,同时JobManager的元数据信息也依赖于HDFS的高可用(namenode的高可用,和多副本机制),再者JobManager元数据的指针信息要依赖于Zookeeper的高可用。
1.2 Flink on Yarn的优势
相对于 Standalone 模式,在Yarn 模式下有以下几点好处:
1.资源按需使用,提高集群的资源利用率;
2.任务有优先级,根据优先级运行作业;
3.基于 Yarn 调度系统,能够自动化地处理各个角色的 Failover:
JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控;
如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器;
如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager。
第2章 Flink on Yarn模式运行的方式
2.1 Application
Application模式:简答的说就是直接run job,每次提交的任务Yarn都会分配一个JobManager,执行完之后整个资源会释放,包括JobManager和TaskManager。
Application模式适合比较大的任务、执行时间比较长的任务。
2.2 Session
Session模式:在Session模式中, Dispatcher 和 ResourceManager 是可以复用的;当执行完Job之后JobManager并不会释放,Session 模式也称为多线程模式,其特点是资源会一直存在不会释放。使用时先启动yarn-session,然后再提交job,每次提交job,也都会分配一个JobManager。
Session模式适合比较小的任务、执行时间比较短的任务。该模式不用频繁的申请资源和释放资源。
所以一般生产情况下我们都会选取 on Yarn 部署Application方式运行
废话不多说开始部署
官网下载安装包
解压后进入目录
打开conf/flink-conf.yaml
修改或者可能修改的都在下面写了
Flink on yarn将会覆盖掉几个参数:
jobmanager.rpc.address因为jobmanager的在集群的运行位置并不是事先确定的,它就是am的地址;
taskmanager.tmp.dirs使用yarn给定的临时目录;
parallelism.default也会被覆盖掉,如果在命令行里指定了slot数。
提前创建flink在hadoop上的逻辑数据目录
jobmanager.rpc.address: 0.0.0.0 #感觉on yarn改这个没什么用处,随便改改
jobmanager.bind-host: 0.0.0.0 #感觉on yarn改这个没什么用处,随便改改
taskmanager.bind-host: 0.0.0.0 #感觉on yarn改这个没什么用处,随便改改
taskmanager.host: 0.0.0.0 #感觉on yarn改这个没什么用处,随便改改
high-availability.storageDir: hdfs:///flink/ha/
state.checkpoints.dir: hdfs://nameservice1/flink-checkpoints
state.savepoints.dir: hdfs://nameservice1/flink-savepoints
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
#web.submit.enable: false #允许web提交任务 按需 我不需要
#web.cancel.enable: false #允许web取消任务 按需 我不需要
还有一些针对kafka zookeeoer的kerberos配置 我这边用不着 大同小异。另外这个配置的用处是提交任务之前不用kinit ,我这已经习惯了kinit 所以也用不着 都是字面意思很好配
# security.kerberos.login.use-ticket-cache: true
# security.kerberos.login.keytab: /path/to/kerberos/keytab
# security.kerberos.login.principal: flink-user
jobmanager.archive.fs.dir: hdfs:///flink/completed-jobs/
historyserver.web.address: 0.0.0.0
historyserver.archive.fs.dir: hdfs:///flink/completed-jobs/
添加一行
classloader.check-leaked-classloader: false
然后就可以运行命令测试了
kinit 你的kerberos用户 如果不是hdfs用户的话需要在hdfs上的/user配置好权限,因为会在/user/{username}/.flink/ 下输出临时文件
命令行设置hadoop环境变量
export HADOOP_CLASSPATH=`hadoop classpath`
我配置了flink环境变量 如果你没配置 那就bin/flink ,在flink安装目录下运行 使用官方example 运行看看
flink run -m yarn-cluster ./examples/batch/WordCount.jar
没报错 出来一堆
这种 就是安装成功了
可以在yarn的界面和cdh的界面查到flink的任务
别忘了把包和profile里配置的环境变量 分发到所有节点。便于在任何节点提交任务
最后启动一下histrory web ui
bin/historyserver.sh start
访问ip:端口即可 具体什么端口看你的配置