Flink的部署模式:Local模式、Standalone模式、Flink On Yarn模式

news2025/1/13 7:39:30

Flink常见的部署模式

  • Flink部署、执行模式
    • Flink的部署模式
    • Flink的执行模式
  • Local本地模式
    • 下载安装
    • 启动、停止Flink
    • 提交测试任务
    • 停止作业
  • Standalone独立模式
    • 会话模式
    • 单作业模式
    • 应用模式
  • YARN运行模式
    • 会话模式
      • 启动Hadoop集群
      • 申请一个YARN会话
      • 查看Yarn、Flink
      • 提交作业
      • 查看、测试作业
    • 单作业模式
      • 提交作业
      • 查看Yarn、Flink
      • 查看、取消作业
    • 应用模式
      • 提交作业
      • 查看、取消作业
      • 从HDFS读取提交任务
    • Yarn模式高可用

Flink部署、执行模式

Flink的部署模式

本地模式、Standalone模式和FlinkonYARN模式是Flink的三种常见部署模式。

1.Local本地模式:

在本地模式下,Flink以单机模式运行,无需启动分布式资源管理器。这种模式适用于本地开发和测试,用于验证Flink代码的正确性和性能。

2.Standalone模式:

在Standalone模式下,Flink作为一个独立的集群运行。需要启动Flink的JobManager和TaskManager,JobManager负责接收和调度任务,而TaskManager负责执行任务。

3.Flink on YARN模式:

在FlinkonYARN模式下,Flink在YARN(Hadoop的资源调度和集群管理系统)之上运行。Flink作为一个YARN应用程序,利用YARN来管理资源分配和任务调度。使用这种模式,可以充分利用Hadoop集群的资源,实现Flink的分布式计算。

Flink的执行模式

Flink可以通过以下三种方式之一执行应用程序:

1.Session Mode:会话模式

会话模式需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所有提交的作业会竞争集群中的资源。适合任务规模小,执行时间短的大量作业。

Flink的作业执行环境会一直保留在集群上,直到会话被显式终止。这样,可以提交多个作业,它们可以共享相同的集群资源和状态,从而实现更高的效率和资源利用。

2.Per-Job Mode:单作业模式

每个Flink应用程序作为一个独立的作业被提交和执行。

每次提交的Flink应用程序都会创建一个独立的作业执行环境,该作业执行环境仅用于执行该特定的作业。

当作业完成后,作业执行环境会被释放,集群关闭,资源释放

3.Application Mode:应用模式

应用模式算是前2种模式的升级,前2种模式中,Flink程序代码是在客户端执行,然后客户端提交给JobManager,客户端需要占用大量网络带宽。

应用模式需要为每一个提交的应用单独启动一个JobManager(应用程序在JobManager执行),也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后JobManager关闭。

4.三种模式的区别:

集群生命周期和资源隔离保证

应用程序的main()方法是在客户端还是在集群上执行

在这里插入图片描述

Local本地模式

Local模式是Flink提供的最简单部署模式,可以在单台服务器上运行,适用于日常的开发和调试。

注意:Flink的运行依赖JAVA环境,需要预先安装好JDK

下载安装

Flink下载地址: https://archive.apache.org/dist/flink/

下载Flink

wget https://repo.huaweicloud.com/apache/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz

解压、重命名

tar  -zxvf flink-1.17.0-bin-scala_2.12.tgz 

mv flink-1.17.0 flink

启动、停止Flink

不需要进行任何配置,直接使用Flink默认配置,直接运行脚本启动

bin/start-cluster.sh

停止Flink

bin/stop-cluster.sh

直接访问:http://IP:8081,可以看到Flink的后台管理界面

每个taskmanager有3个solt

在这里插入图片描述

提交测试任务

提交一个测试任务:

./bin/flink run examples/batch/WordCount.jar

在控制台直接看到输出

[root@node01 flink]# ./bin/flink run examples/batch/WordCount.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/program/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/program/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID a946d0abf84ac6848a823cec43f7056f
Program execution finished
Job with JobID a946d0abf84ac6848a823cec43f7056f has finished.
Job Runtime: 584 ms
Accumulator Results: 
- 1a50b4c9582d4d35a854872c62391768 (java.util.ArrayList) [170 elements]


(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
(awry,1)

同样,在Flink的后台管理界面 Completed Jobs 一栏可以看到刚才提交执行的程序:
在这里插入图片描述

停止作业

可以直接在 WEB 界面上点击对应作业的 Cancel Job 按钮进行取消,也可以使用命令行进行取消。

使用命令行进行取消时,需要先获取到作业的JobId

bin/flink list

获取到JobId后,使用flink cancel JobId命令取消作业

bin/flink cancel a946d0abf84ac6848a823cec43f7056f

Standalone独立模式

Standalone模式是集群模式的一种,独立模式是独立运行的,不依赖任何外部的资源管理平台,存在资源不足,出现故障不会自动扩展或重分配资源的能力,一般用在开发测试或作业非常少的场景下。

优缺点:

部署相对简单,可以支持小规模,少量的任务运行

缺少系统层面对集群中Job的管理,容易遭成资源分配不均匀

资源隔离相对简单,任务之间资源竞争严重

会话模式

会话模式部署需要先启动集群,集群资源固定,通过Web页面客户端提交任务,可以多个任务。

搭建一个Flink集群,参考:搭建Flink集群、集群HA高可用以及配置历史服务器

1.启动 Flink 集群:

通过bin/start-cluster.sh脚本启动集群

2.打开Flink Web UI

在浏览器中输入http://node01:8081/地址打开Flink Web UI

3.提交Flink作业

在Flink Web UI中选择要提交的 Flink 作业 jar 包,并指定作业参数和作业名称。

bin/flink run ../examples/streaming/WordCount.jar

4.查看Flink作业

提交作业之后,在 Flink Web UI 上会看到作业的运行状态,可以查看作业日志和监控指标等信息。

5.停止Flink作业

可以在Flink Web UI中停止作业,也可以使用bin/flink cancel jobID命令停止指定的作业

单作业模式

Standalone集群并不支持单作业模式部署,单作业模式需要借助一些资源管理平台。

应用模式

应用模式下不会提前创建集群,因此不能调用start-cluster.sh脚本,但是可以使用在bin目录下的standalone-job.sh来创建一个JobManager。

1.将Flink应用程序的jar包放到Flink的安装路径下的lib目录下。

[root@node01 flink]# mv /root/demo-1.0-SNAPSHOT.jar  lib

2.启动netcat

[root@node01 ~]# nc -lk 8888

3.启动JobManager

直接指定作业入口类,脚本会到lib目录扫描所有的jar包

[root@node01 flink]# bin/standalone-job.sh start --job-classname cn.ybzy.demo.WordCountDemo  
Starting standalonejob daemon on host node01.

4.启动TaskManager

[root@node01 flink]# bin/taskmanager.sh start
Starting taskexecutor daemon on host node01.

5.查看进程

[root@node01 flink]# jps
11973 Jps
11240 TaskManagerRunner
11898 StandaloneApplicationClusterEntryPoint

6.查看Web UI
在这里插入图片描述
一直是如下所示状态,明显异常:
在这里插入图片描述
查看flink/log/flink-root-standalonejob-1-node01.log日志

1.异常提示资源不够:

Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_371]
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_371]
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607) ~[?:1.8.0_371]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_371]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_371]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_371]

修改配置文件,调大资源,发现无效。

# jobmanager.memory.process.size: 1600m
jobmanager.memory.process.size: 2000m

#taskmanager.memory.process.size: 1728m
taskmanager.memory.process.size: 2600m

后来仔细观察日志,发现一处核心异常如下异常:

 org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 6f4f54c45d7bb59531f537b966776793: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=3}]

关键词numberOfRequiredSlots=3尤为重要,JobManager启动默认只有1Slot,Slot请求资源不够!

编辑conf/flink-conf.yaml文件

# taskmanager.numberOfTaskSlots: 1
# 修改Slot数量为3
taskmanager.numberOfTaskSlots: 3

停止taskmanager、standalone-job,重新启动,Web UI显示明显正常
在这里插入图片描述
在这里插入图片描述
发送测试数据

[root@node01 ~]# nc -lk 8888
abc bcd cdf

在这里插入图片描述

7.停止集群

[root@node01 flink]# bin/taskmanager.sh stop
Stopping taskexecutor daemon (pid: 14117) on host node01.
[root@node01 flink]# bin/standalone-job.sh stop
No standalonejob daemon (pid: 14813) is running anymore on node01.

8.总结:

在Flink中,Slot是Flink作业管理的资源基本单位,一个任务不一定会占用1个Slot。

当向Flink提交一个任务时,Flink会为该任务分配所需的Slot数量。通常取决于以下几个因素:

任务的并行度(Parallelism):如果任务的并行度很高,即需要同时执行多个子任务,则可能需要使用多个Slot。

TaskManager的资源:如果TaskManager的资源非常丰富,例如拥有多个CPU或GPU核心,则可以分配更多的Slot来运行任务。反之,则可能只能分配较少的Slot。

任务的资源需求:如果任务需要大量的内存或计算资源,则可能需要分配更多的Slot来满足需求。

个人在编写Flink程序时,设置了并行度,打包上传运行,由于JobManager的默认numberOfTaskSlots配置为1,Solt数量不够,故出现上述异常。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(3);

YARN运行模式

客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。

1.安装Hadoop

安装Hadoop参考:搭建Hadoop3.X完全分布式集群环境

2.配置环境变量

# Hadoop
export HADOOP_HOME=/usr/local/program/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

# Flink
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

3.启动Hadoop集群,包括HDFS和YARN

[root@node01 hadoop]# sbin/start-all.sh 

4.启动netcat

nc -lk 8888

会话模式

YARN的会话模式需要首先申请一个YARN会话(YARN Session)来启动Flink集群。

启动Hadoop集群

启动Hadoop集群,包括HDFS和YARN

[root@node01 hadoop]# sbin/start-all.sh 

申请一个YARN会话

查看yarn-session.sh命令帮助

[root@node01 flink]# bin/yarn-session.sh --help
Usage:
   Optional
     -at,--applicationType <arg>     Set a custom application type for the application on YARN
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Set to yarn-cluster to use YARN execution mode.
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

主要参数:

-d:分离模式,让Flink YARN客户端后台运行,即YARN session可以后台运行

-jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB

-nm(--name):配置在YARN UI界面上显示的任务名

-qu(--queue):指定YARN队列名

-tm(--taskManager):配置每个TaskManager所使用内存

执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群

[root@node01 flink]# bin/yarn-session.sh -nm flink-test
......
2023-06-12 22:03:01,088 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 22:03:01,428 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 22:03:01,457 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 22:03:01,476 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2023-06-12 22:03:01,480 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1686577483648_0001
2023-06-12 22:03:01,613 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1686577483648_0001
2023-06-12 22:03:01,613 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2023-06-12 22:03:01,615 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2023-06-12 22:03:06,406 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2023-06-12 22:03:06,407 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node03:37824 of application 'application_1686577483648_0001'.
JobManager Web Interface: http://node03:37824

查看Yarn、Flink

访问http://node01:8088/cluster查看yarn

在这里插入图片描述
YARN Session启动之后会给出一个Web UI地址以及一个YARN application ID

2023-06-12 22:03:06,406 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2023-06-12 22:03:06,407 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node03:37824 of application 'application_1686577483648_0001'.
JobManager Web Interface: http://node03:37824

访问给出的地址:http://node03:37824
在这里插入图片描述

提交作业

可以通过Web UI或者命令行两种方式提交作业

a.通过Web UI提交作业
在这里插入图片描述

b.通过命令行提交作业

1.将Flink程序打Jar包并上传至集群

2.执行命令将任务提交到已经开启的Yarn-Session中运行

客户端可以自行确定JobManager的地址,也可以通过-m或者-jobmanager参数指定JobManager的地址。同时JobManager的地址在YARN Session的启动页面中可以找到。

[root@node01 ~]# /usr/local/program/flink/bin/flink run  -c cn.ybzy.demo.WordCountDemo  /root/demo-1.0-SNAPSHOT.jar

2023-06-12 22:21:08,468 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 22:21:08,468 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 22:21:08,824 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/usr/local/program/flink/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-06-12 22:21:08,860 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node01/192.168.1.100:8032
2023-06-12 22:21:08,986 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-06-12 22:21:09,049 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node03:37824 of application 'application_1686577483648_0001'.
Job has been submitted with JobID cdf1ff7b48472b3d7bc413a1ee9700e8

查看、测试作业

通过Flink的Web UI页面查看提交任务的运行情况,Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。

在这里插入图片描述

发送数据测试

[root@node01 program]# nc -lk 8888
abc bcd cdf

在这里插入图片描述

单作业模式

在YARN环境中,由于有了外部平台做资源调度,因此也可以直接向YARN提交一个单独的作业,从而启动一个Flink集群。

提交作业

执行命令提交作业

[root@node01 flink]# bin/flink run -t yarn-per-job -c cn.ybzy.demo.WordCountDemo  /root/demo-1.0-SNAPSHOT.jar
.....
2023-06-12 22:46:26,984 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 22:46:27,009 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 22:46:27,029 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2023-06-12 22:46:27,034 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1686577483648_0004
2023-06-12 22:46:27,061 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1686577483648_0004
2023-06-12 22:46:27,061 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2023-06-12 22:46:27,063 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2023-06-12 22:46:31,086 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2023-06-12 22:46:31,087 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node02:42192 of application 'application_1686577483648_0004'.
Job has been submitted with JobID dfcb72ebf4a5f33d8e7967d6beaaf96d

注意:在使用-d参数启动时,启动过程中可能会出现如下异常:

Exception in thread "Thread-5" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
        at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:184)
        at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:208)
        at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2780)
        at org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3036)
        at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995)
        at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2968)
        at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848)
        at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200)
        at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812)
        at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1789)
        at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
        at org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
        at org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)

解决方案是在flink的/conf/flink-conf.yaml配置文件中设置

classloader.check-leaked-classloader: false

查看Yarn、Flink

访问http://node01:8088/cluster查看
在这里插入图片描述

打开Flink Web UI页面进行监控

a.访问启动日志中的JobManager地址,如:node02:42192

在这里插入图片描述
b.也可以在http://node01:8088/cluster页面中跳转到Flink的Web UI界面

在这里插入图片描述
在这里插入图片描述

查看、取消作业

[root@node01 flink]# bin/flink list -t yarn-per-job -Dyarn.application.id=application_1686577483648_0004

2023-06-12 22:55:43,755 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 22:55:43,755 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 22:55:43,864 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/usr/local/program/flink/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-06-12 22:55:43,927 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node01/192.168.1.100:8032
2023-06-12 22:55:44,087 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-06-12 22:55:44,159 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node02:42192 of application 'application_1686577483648_0004'.
Waiting for response...
------------------ Running/Restarting Jobs -------------------
12.06.2023 22:46:30 : dfcb72ebf4a5f33d8e7967d6beaaf96d : Flink Streaming Job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

取消作业

# 如果取消作业,整个Flink集群会停掉
bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX <jobId>
[root@node01 flink]# bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1686577483648_0004  dfcb72ebf4a5f33d8e7967d6beaaf96d

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2023-06-12 22:57:06,430 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 22:57:06,430 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Cancelling job dfcb72ebf4a5f33d8e7967d6beaaf96d.
2023-06-12 22:57:06,560 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/usr/local/program/flink/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-06-12 22:57:06,638 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node01/192.168.1.100:8032
2023-06-12 22:57:06,830 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-06-12 22:57:06,895 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node02:42192 of application 'application_1686577483648_0004'.
Cancelled job dfcb72ebf4a5f33d8e7967d6beaaf96d.

应用模式

应用模式同样非常简单,与单作业模式类似,直接执行flink run-application命令即可。

提交作业

执行命令提交作业

[root@node01 flink]# bin/flink run-application -t yarn-application -c cn.ybzy.demo.WordCountDemo  /root/demo-1.0-SNAPSHOT.jar

2023-06-12 23:01:00,465 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:01:00,751 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:01:00,799 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:01:00,817 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2023-06-12 23:01:00,821 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1686577483648_0005
2023-06-12 23:01:00,847 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1686577483648_0005
2023-06-12 23:01:00,848 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2023-06-12 23:01:00,849 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2023-06-12 23:01:05,123 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2023-06-12 23:01:05,124 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node03:40762 of application 'application_1686577483648_0005'.

在这里插入图片描述

查看、取消作业

查看作业

[root@node01 flink]# bin/flink list -t yarn-application -Dyarn.application.id=application_1686577483648_0005

2023-06-12 23:02:55,490 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 23:02:55,490 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 23:02:55,630 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/usr/local/program/flink/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-06-12 23:02:55,689 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node01/192.168.1.100:8032
2023-06-12 23:02:55,844 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-06-12 23:02:55,905 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node03:40762 of application 'application_1686577483648_0005'.
Waiting for response...
------------------ Running/Restarting Jobs -------------------
12.06.2023 23:01:05 : a66d8fa98d23210d36b5b005ff0a1c53 : Flink Streaming Job (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

取消作业

[root@node01 flink]# bin/flink cancel -t yarn-application -Dyarn.application.id=application_1686577483648_0005 a66d8fa98d23210d36b5b005ff0a1c53

2023-06-12 23:03:49,038 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-06-12 23:03:49,038 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
Cancelling job a66d8fa98d23210d36b5b005ff0a1c53.
2023-06-12 23:03:49,156 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/usr/local/program/flink/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-06-12 23:03:49,204 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node01/192.168.1.100:8032
2023-06-12 23:03:49,364 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-06-12 23:03:49,427 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node03:40762 of application 'application_1686577483648_0005'.
Cancelled job a66d8fa98d23210d36b5b005ff0a1c53.

从HDFS读取提交任务

通过yarn.provided.lib.dirs配置选项指定位置,将flink的依赖上传到远程

将Flink本身的依赖和用户jar预先上传到HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了

上传flink的lib和plugins到HDFS上

[root@node01 flink]#  hadoop fs -mkdir /flink-dist
[root@node01 flink]# hadoop fs -put lib/ /flink-dist
[root@node01 flink]# hadoop fs -put plugins/ /flink-dist

上传Flink开发程序jar包到HDFS

[root@node01 flink]# hadoop fs -mkdir /flink-jar
[root@node01 flink]# hadoop fs -put /root/demo-1.0-SNAPSHOT.jar /flink-jar

提交作业

[root@node01 flink]# bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://node01:9000/flink-dist"  -c cn.ybzy.demo.WordCountDemo hdfs://node01:9000/flink-jar/demo-1.0-SNAPSHOT.jar

2023-06-12 23:19:20,128 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cluster specification: ClusterSpecification{masterMemoryMB=2500, taskManagerMemoryMB=2200, slotsPerTaskManager=3}
2023-06-12 23:19:20,617 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:19:20,721 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:19:20,783 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2023-06-12 23:19:20,788 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1686577483648_0009
2023-06-12 23:19:20,816 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1686577483648_0009
2023-06-12 23:19:20,816 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2023-06-12 23:19:20,817 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2023-06-12 23:19:24,086 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2023-06-12 23:19:24,086 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node02:43653 of application 'application_1686577483648_0009'.

在这里插入图片描述
在这里插入图片描述

Yarn模式高可用

Standalone模式中, 同时启动多个Jobmanager, 一个为leader其他为standby, 当leader挂了, 其他的才会有一个成为leader

yarn的高可用是同时只启动一个Jobmanager, 当这个Jobmanager挂了之后, yarn会再次启动一个, 其实是利用的yarn的重试次数来实现的高可用

在yarn-site.xml中配置

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>The maximum number of application master execution attempts.  </description>
</property>

在flink-conf.yaml中配置

# 次数应该小于yarn-site.xml中配置重试次数
yarn.application-attempts: 3
high-availability.type: zookeeper
high-availability.storageDir: hdfs://node01:9000/flink/yarn/ha
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181
high-availability.zookeeper.path.root: /flink-yarn

启动yarn-session

[root@node01 flink]# bin/yarn-session.sh -nm flink-test

kill一个Jobmanager,查看复活情况

jps

kill -9 pid

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1032151.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Centos7安装mysql详细过程

官网 https://dev.mysql.com/downloads/repo/yum/1、下载安装包 cd /optwget http://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm2、安装MYSQL源 yum -y install mysql57-community-release-el7-11.noarch.rpm3、查看安装结果 yum repolist enabled | …

C语言实现:删除链表倒数第k个元素

然后这里讲一下删倒数第k个元素的算法思想 我这里很简单啊&#xff0c;你要删倒数第k个&#xff0c;那不就是正数len-k1个吗 举个例子&#xff1a; 比如12345 删倒数第3个&#xff0c;就是删正数5-313&#xff0c;也就是正数第3个 删倒数第2个&#xff0c;就是删正数5-214&…

什么是 Sepolia 测试网以及如何从 Faucet 获取 Sepolia ETH

如何通过水龙头领取 Sepolia 测试网 ETH 代币 Sepolia 测试网需要 Sepolia ETH 代币来测试即将推出的 dApp&#xff0c;然后再在以太坊主网上线。您可以从 Alchemy、QuickNode 和 Infura 水龙头领取 Sepolia 测试网 ETH。 要点 您可以从官方水龙头和其他一些独立水龙头获取 S…

快速学习Netty

Netty框架探索&#xff1a;助力高效网络编程 一、Netty是个啥&#xff1f;二、“Hello World”服务器端实现&#xff08;Server&#xff09;客户端实现&#xff08;Client&#xff09;思考&#x1f914; 三、Netty的核心组件EventLoopChannelChannelPipelineChannelHandlerByte…

请实现一个函数,输入一个整数数组和一个目标值,在数组中找到两个数使得它们的和等于目标值。

今日份AI出笔试题&#xff1a; AI Golang笔试中级题目https://bs.rongapi.cn/1702565828114780160/23 完整题目&#xff1a; 请实现一个函数&#xff0c;输入一个整数数组和一个目标值&#xff0c;在数组中找到两个数使得它们的和等于目标值。函数应该返回这两个数的索引&am…

转载—Linux下文件搜索、查找、查看命令

Linux下文件搜索、查找、查看命令 1、最强大的搜索命令&#xff1a;find 查找各种文件的命令  2、在文件资料中查找文件&#xff1a;locate   3、搜索命令所在的目录及别名信息&#xff1a;which  4、搜索命令所在的目录及帮助文档路径&#xff1a;whereis 5、在文件中搜寻…

回收站文件恢复,这3个方法必须掌握!

“我是一名电脑小白&#xff0c;听说电脑中删除的文件会被放入回收站中&#xff0c;那么回收站里的文件应该怎么恢复呢&#xff1f;如果回收站被删除了&#xff0c;文件还有机会找回来吗&#xff1f;” 回收站作为电脑中一个功能强大的工具&#xff0c;对我们找回误删的数据有很…

数据结构-----树和二叉树的定义与性质

目录 前言 思维导图 一.树 树的定义 二.二叉树 1.二叉树的定义 2.二叉树的形态&#xff08;图&#xff09; 3.二叉树的性质 三.满二叉树 1.定义 2.特点和性质 四.完全二叉树 1.定义 2.特点和性质 前言 今天开始我们就学习新的数据结构类型啦&#xff01;没错它就是…

Ribbon负载均衡器

两种&#xff1a; 1.1 集中式负载均衡&#xff0c;服务端负载均衡 硬件 nginx 轮询、负载、哈希、随机、权重 为什么要做负载均衡&#xff1f; 1.2 客户端负载均衡器 用客户端 负载均衡器 很多机制可以自定义 小知识&#xff1a;不想让别人调自己&#xff0c;只想用别人的…

2023-9-22 没有上司的舞会

题目链接&#xff1a;没有上司的舞会 #include <cstring> #include <iostream> #include <algorithm>using namespace std;const int N 6010;int n; int happy[N]; int h[N], e[N], ne[N], idx; bool has_father[N];// 两个状态&#xff0c;选该节点或不选该…

李航老师《统计学习方法》第2章阅读笔记

感知机&#xff08;perceptron&#xff09;时二类分类的线性分类模型&#xff0c;其输入为实例的特征向量&#xff0c;输出为实例的类别&#xff0c;取1和-1二值。感知机对应于输入空间&#xff08;特征空间&#xff09;中将实例划分为正负两类的分离超平面 想象一下在一个平面…

spring:实现初始化动态bean|获取对象型数组配置文件

0. 引言 近期因为要完成实现中间件的工具包组件&#xff0c;其中涉及要读取对象型的数组配置文件&#xff0c;并且还要将其加载为bean&#xff0c;因为使用了spring 4.3.25.RELEASE版本&#xff0c;很多springboot的相关特性无法支持&#xff0c;因此特此记录&#xff0c;以方…

阿里云服务器u1和经济型e实例有什么区别?

阿里云服务器经济型e实例和云服务器u1有什么区别&#xff1f;同CPU内存配置下云服务器u1性能更强&#xff0c;u1实例价格也要更贵一些。经济型e实例属于共享型云服务器&#xff0c;不同实例vCPU会争抢物理CPU资源&#xff0c;并导致高负载时计算性能波动不稳定&#xff0c;而云…

实时更新进度条:JavaScript中的定时器和异步编程技巧

前言 在Web开发中&#xff0c;有许多场景需要实时地更新页面上的进度&#xff0c;例如上传文件、数据处理等。本文将介绍如何利用JavaScript中的定时器和异步编程技巧来实现实时更新进度&#xff0c;并探讨一些其他解决方案。 处理进度实时更新&#xff1a; 利用异步编程实现实…

可转债长期持有策略——收益与风险、利息收入、案例研究

可转债投资策略——长期持有策略 一、收益与风险的权衡 长期持有可转债是一种投资策略&#xff0c;旨在实现稳定的收益&#xff0c;并在投资期限内从可转债中获得利益。在采用这种策略时&#xff0c;投资者需要平衡可转债的收益和风险&#xff0c;以满足其财务目标。以下是关…

尝试访问启动磁盘设置时出错怎么办?

当出现“尝试访问启动磁盘设置时出错”这样的错误提示&#xff0c;而且启动转换控制面板打不开了时&#xff0c;是无法开启触摸板功能的。我们可以使用以下方法来解决问题。 1. 在Windows桌面左下角搜索框输入“计算机管理”后点击“打开”。 2. 点击“本地用户与组”&#xff…

树结构数据在table中回显 treeselect disabled

<el-table-column label"产业认定" align"center" prop"industryIdentification"><template slot-scope"scope"><treeselectv-if"scope.row.industryIdentification"v-model"scope.row.industryIdentif…

Zookeeper系统模型_客户端命令行

创建 创建ZK节点 语法结构&#xff1a; create [-s] [-e] path data acl 参数&#xff1a; -s&#xff1a;顺序节点-e&#xff1a;临时节点 默认情况下&#xff0c;不添加-s或者-e参数的&#xff0c;创建的是持久节点。 示例&#xff1a; [zk: localhost:2181(CONNECTED) …

Spring Cloud Alibaba Gateway全局token过滤、局部过滤访问时间超过50ms日志提示

文章目录 Spring Cloud Alibaba Gateway验证token在前篇的基础上加入依赖在filter包中创建tokenFilter Spring Cloud Alibaba Gateway局部过滤1.继承AbstractGatewayFilterFactory2.仿照AddRequestHeaderGatewayFilterFactory Spring Cloud Alibaba Gateway验证token 基础搭建…

linux上gitlab备份与还原

三 Gitlab备份 1.gitlab安装 1.1 添加镜像地址 添加镜像地址的目的是为了提高国内用户软件下载的速度&#xff0c;编辑(新建)文件gitlab-ce.repo&#xff0c;指令&#xff1a; vi /etc/yum.repos.d/gitlab-ce.repo复制 输入&#xff1a; [gitlab-ce] namegitlab-ce # 清华…