[Flink04] Flink部署实践

news2025/1/19 14:25:44

    Flink部署支持三种模式:本地部署、Standalone部署、Flink on Yarn部署。

    独立(Standalone)模式由Flink自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。

    Flink on Yarn模式,把资源管理交给Yarn实现,计算机资源统一由Haoop Yarn管理,生产环境测试。

     Yarn(yet another resource negotiator)是一个通用分布式资源管理系统和调度平台,为上层应用提供统一的资源管理和调度。在集群利用率、资源统一管理和数据共享等方面带来巨大好处.

1 基础环境

1.1 服务器环境

操作系统环境为CentOS 8。

1)配置规划集群节点间免密访问

参考相关章节或附录的指南配置,可以有效提供部署和管理效率。

2)配置JAVA环境

参考相关章节或附录的指南配置。

3)配置HDFS存储集群

如果需要与HDFS存储集群集成,则需要提前完成配置。

参考相关章节或附录的指南配置,并且Flink规划集群或设备可网络访问。

4)配置zookeeper集群

如果需要部署Standalone模式,则需要提前完成配置。参考相关章节或附录的指南配置。

5)配置 Yarn集群

如果需要部署Flink on Yarn模式,则需要提前完成配置。参考相关章节或附录的指南配置。

1.2 Flink软件基础配置

在本实践案例中,采用的Flink软件包版本 1.14.5,Hadoop的版本为3.2,Spark软件的根目录(SPARK_HOME)为/opt/flink/flink。

源码下载可以通过官方源和国内源两种方式下载,官方源再国外,下载速度慢,国内源采用清华大学的源,速度相对较快,但只保留最新版本。

Apache官方:https://archive.apache.org/dist/flink/

清华大学:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/

下载软件包:

# wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz
# tar -zxvf flink-1.14.5-bin-scala_2.12.tgz

解压软件包:

# tar -zxvf flink-1.14.5-bin-scala_2.12.tgz
# ln -s flink-1.14.5 flink
# ls flink/
bin  conf  examples  lib  LICENSE  licenses  log  NOTICE  opt  plugins  README.txt

本地模式

最简单的启动方式,其实是不搭建集群,直接本地模式启动。

2.1 配置部署

在本地模式下,不需要启动任何的进程,仅仅是使用本地线程来模拟 Flink 的进程,适用于测试开发调试等,不用更改任何配置信息,只需要保证 JDK8 安装正常即可。

1)启动命令

# /usr/local/flink/bin/start-cluster.sh

2)关闭命令

# /usr/local/flink/bin/stop-cluster.sh

2.2 测试验证

1)Flink启动

# /usr/local/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host lake01.
Starting taskexecutor daemon on host lake01

2)访问验证

通过<local主机地址>:8081打开

3)jps查看

# jps
3968 Jps
1941 NameNode
3685 TaskManagerRunner
2790 NodeManager
3418 StandaloneSessionClusterEntrypoint
2159 DataNode

4)执行官方用例WordCount

通过执行官方示例,可以看到flink任务运行成功

# /opt/flink/flink/bin/flink run /opt/flink/flink/examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID d6987ed5b263fc5e297d5be1e28b465a
Program execution finished
Job with JobID d6987ed5b263fc5e297d5be1e28b465a has finished.
Job Runtime: 373 ms
Accumulator Results:
- 843a1470cb2c3e3169dfb25bcda7369d (java.util.ArrayList) [170 elements]
(a,5)
(action,1)
(after,1)
(against,1)
……

观察Flink WebUI,如下图

2.3 问题-提示无法连接Yarn服务

一、问题描述

从flink on yarn模式切换为本地模式,执行start-cluster.sh提示如下错误:

2022-10-13 20:38:05,757 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/opt/flink/flink-1.14.5/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2022-10-13 20:38:05,946 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at **lake02/******:8032
2022-10-13 20:38:06,017 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2022-10-13 20:38:07,043 INFO  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: **lake02/******:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
……
2022-10-13 20:38:16,052 WARN  org.apache.hadoop.ipc.Client                                 [] - Failed to connect to server: **lake02/******:8032: retries get failed due to exceeded maximum allowed retries number: 10
java.net.ConnectException: Connection refused

二、问题分析

通过jps发现缺少TaskManagerRunner。

# jps
1902910 StandaloneSessionClusterEntrypoint
1861160 RunJar
1903349 SqlClient
1861350 RunJar
128746 NameNode
200744 QuorumPeerMain
1865793 Kafka
1911504 Jps

发现workers和masters文件均为空

三、解决方案

恢复masters和workers的内容

# cat masters
localhost:8081

# cat workers
localhost

3 Standalone模式

3.1 概述

     Standalone模式是最简单的一种集群模式,不需要Yarn、mesos等资源调度平台,自带集群,资源管理由flink集群管理,开发环境测试使用。

    Standalone模式是一种主从模式,主要有两个组件构成分别是JobManager(Master)和TaskManager(Slave)。

当一个应用提交执行时,Flink的各个组件是如何交互协作的:

1)App程序通过rest接口提交给Dispatcher(rest接口是跨平台,并且可以直接穿过防火墙,不需考虑拦截)。

2)Dispatcher把JobManager进程启动,把应用交给JobManager。

3)JobManager拿到应用后,向ResourceManager申请资源(slots),ResouceManager会启动对应的TaskManager进程,TaskManager空闲的slots会向ResourceManager注册。

4)ResourceManager会根据JobManager申请的资源数量,向TaskManager发出指令(这些slots由你提供给JobManager)。

5)接着,TaskManager可以直接和JobManager通信了(它们之间会有心跳包的连接),TaskManager向JobManager提供slots,JobManager向TaskManager分配在slots中执行的任务。

6)最后,在执行任务过程中,不同的TaskManager会有数据之间的交换。

3.2 配置部署

一、节点规划

本次通过虚拟机部署,采用5个节点 ,每一个节点提供一块500GB的硬盘。

Hostname

IP

用途

说明

labnode01

192.168.80.131

master, jobmanager

OScentos8.0

labnode02

192.168.80.132

slavetaskmanager

OScentos8.0

labnode03

192.168.80.133

slavetaskmanager

OScentos8.0

二、修改配置文件

1)修改flink-conf.yaml配置文件:

##配置master节点ip
jobmanager.rpc.address: 192.168.1.100

##配置每个节点的可用slot,1 核CPU对应 1 slot
##the number of available CPUs per machine
taskmanager.numberOfTaskSlots: 30

##默认并行度 1 slot资源
parallelism.default: 1

2)修改master和work配置文件

Master文件

# cat masters
labnode01:8081

workers文件

labnode02
labnode03

将以上文件分发各节点对应文件夹。

三、集群启动和关闭

在master节点上执行此脚本,就可以启动集群,前提要保证master节点到slaver节点可以免密登录。

因为它的启动过程是:先在master节点启动jobmanager进程,然后ssh到各slaver节点启动taskmanager进程。

启动集群

# /usr/local/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host lake02.
Starting taskexecutor daemon on host lake03.
Starting taskexecutor daemon on host lake04.
Starting taskexecutor daemon on host slake05.

停止集群:

# /usr/local/flink/bin/stop-cluster.sh

3.3 运行验证

1)启动Flink

# /usr/local/flink/bin/start-cluster.sh

2)访问flink webUI

3)执行官方用例WordCount

执行命令:

# /usr/local/flink/bin/flink run /usr/local/flink/examples/batch/WordCount.jar
……
- f27663f6191a378629eea720a988cc53 (java.util.ArrayList) [170 elements]


(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
……

查看Flink WebUI

4 Flink On Yarn模式

4.1 概述

    独立(Standalone)模式由 Flink 自身提供资源,无需其他框架,这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但Flink 是大数据计算框架,不是资源调度框架,这并不是它的强项;所以还是应该让专业的框架做专业的事,和其他资源调度框架集成更靠谱。

    在目前大数据生态中,国内应用最为广泛的资源管理平台是Yarn。Yarn(yet another resource negotiator)是一个通用分布式资源管理系统和调度平台,为上层应用提供统一的资源管理和调度。在集群利用率、资源统一管理和数据共享等方面带来巨大好处。

    Flink on Yarn 企业生产环境运行Flink任务大多数的选择。

    在强大的Yarn平台上,Flink是如何在Yarn上集成部署的,其过程是:客户端把Flink 应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的 NodeManager 申请容器。在这些容器上,Flink会部署JobManager 和 TaskManager 的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的 Slot 数量动态分配TaskManager资源。

https://upload-images.jianshu.io/upload_images/6178553-c51c1c9b2f695e5b.png

1)提交App之前,先上传Flink的Jar包和配置到HDFS,以便JobManager和TaskManager共享HDFS的数据。

2)客户端向ResourceManager提交Job,ResouceManager接到请求后,先分配container资源,然后通知NodeManager启动ApplicationMaster。

3)ApplicationMaster会加载HDFS的配置,启动对应的JobManager,然后JobManager会分析当前的作业图,将它转化成执行图(包含了所有可以并发执行的任务),从而知道当前需要的具体资源。

4)接着,JobManager会向ResourceManager申请资源,ResouceManager接到请求后,继续分配container资源,然后通知ApplictaionMaster启动更多的TaskManager(先分配好container资源,再启动TaskManager)。container在启动TaskManager时也会从HDFS加载数据。

5)最后,TaskManager启动后,会向JobManager发送心跳包。JobManager向TaskManager分配任务。

Flink提供了yarn上运行的3模式,分别为Session-Cluster,Application Mode和Per-Job-Cluster模式。

4.2 配置部署

一、节点规划

本次通过虚拟机部署,采用5个节点 ,每一个节点提供一块500GB的硬盘。

Hostname

IP

用途

说明

labnode01

192.168.80.131

master, jobmanager

labnode02

192.168.80.132

slavetaskmanager

labnode03

192.168.80.133

slavetaskmanager

二、Yarn环境配置

在Yarn-site.xml中配置关闭内存校验。

Yarn-site.xml是hadoop中/etc/hadoop下的配置文件,否则flink任务可能会因为内存超标而被Yarn集群主动杀死。

<!-- Mem Check Start -->
<!-- 设置不检查虚拟内存的值,不然内存不够会报错 -->
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
    <!-- Mem Check end -->

将修改后的配置文件分发到各节点,然后重启Yarn集群。

三、将Flink软件和配置文件分发到Flink集群规划节点

将Flink的配置文件conf/flink-conf.yaml恢复为初始状态。

4.3 Session-Cluster模式(yarn-session)

4.3.1 概述

    Session-Cluster模式需要先启动Flink集群,向Yarn申请资源。以后提交任务都向这里提交。这个Flink集群会常驻在yarn集群中,除非手工停止。

    在向Flink集群提交Job的时候, 如果资源被用完了,则新的Job不能正常提交.

    缺点: 如果提交的作业中有长时间执行的大作业, 占用了该Flink集群的所有资源, 则后续无法提交新的job.

    所以, Session-Cluster适合那些需要频繁提交的多个小Job, 并且执行时间都不长的Job。

会话模式有两种操作模式:

  1. 附加模式(默认):yarn-session.sh客户端将 Flink 集群提交给 YARN,但客户端一直在运行,跟踪集群的状态。如果集群失败,客户端将显示错误。如果客户端被终止,它也会发出集群关闭的信号。
  2. 分离模式(-d或--detached):yarn-session.sh客户端将 Flink 集群提交给 YARN,然后客户端返回。需要再次调用客户端或 YARN 工具来停止 Flink 集群。

4.3.2 常用命令

1)yarn-session.sh参数说明

使用bin/yarn-session.sh --help 查看可用参数:

Usage:
   Optional
     -at,--applicationType <arg>     Set a custom application type for the application on Yarn
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running Yarn session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Set to Yarn-cluster to use Yarn execution mode.
     -nl,--nodeLabel <arg>           Specify Yarn node label for the Yarn application
     -nm,--name <arg>                Set a custom name for the application on Yarn
     -q,--query                      Display available Yarn resources (memory, cores)
     -qu,--queue <arg>               Specify Yarn queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--Yarndetached              If present, runs the job in detached mode (deprecated; use non-Yarn specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

2)启动命令

使用Yarn-session.sh命令申请资源初始化一个Flink集群,命令格式如下:

bin/yarn-session.sh <参数>

如示例:

# /opt/flink/flink/bin/yarn-session.sh -d

3)关闭Flink

停止 flink on Yarn 会话模式中的flink集群

yarn application -kill <appid>

echo "stop" | ./bin/flink -id <appid>

如示例:

# echo "stop" | /opt/flink/flink/bin/flink -id application_1661480406159_0025

4.3.3 运行验证

1)启动Flink

# /opt/flink/flink/bin/yarn-session.sh -d

执行结果:

访问Yarn WebUI:

访问Flink WebUI,http://lake04:38347

2)运行官方用例WordCount

# /opt/flink/flink/bin/flink run /opt/flink/flink/examples/batch/WordCount.jar

命令行执行结果:

Flink WebUI的首页:

Flink WebUI中的结果:

3)关闭

执行命令关闭Flink

# echo "stop" | /opt/flink/flink/bin/flink -id application_1661480406159_0025

4.4 Per-Job-Cluster模式(yarn-cluster)

4.4.1 概述

一个Job会对应一个Flink集群,每提交一个作业会根据自身的情况,都会单独向yarn申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享Dispatcher和ResourceManager,按需接受资源申请;适合规模大长时间运行的作业。

每次提交都会创建一个新的flink集群,任务之间互相独立,互不影响,方便管理。任务执行完成之后创建的集群也会消失。

4.4.2 常用命令

1)参数说明

flink run -m yarn-cluster --help;可用参数:

该模式下不需要先启动 yarn-session,确保 Hadoop 集群是健康的情况下直接提交 Job 命令:

bin/flink -m yarn-cluster <参数> <jar file>

如示例:

# /opt/flink/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /opt/flink/flink/examples/batch/WordCount.jar

4.4.3 运行验证

1)启动并执行官方用例WordCount

# /opt/flink/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /opt/flink/flink/examples/batch/WordCount.jar

执行结果:

访问Yarn WebUI:

4.4.4 优缺点

优点:随到随用,只有任务需要运行时才会开启flink集群;运行完就关闭释放资源,资源利用更合理;

缺点:对于小作业不太友好,

适用场景:适合大作业,长时间运行的大作业。

4.5 Application Mode

4.5.1 概述

    Application Mode会在Yarn上启动集群, 应用jar包的main函数(用户类的main函数)将会在JobManager上执行. 只要应用程序执行结束, Flink集群会马上被关闭. 也可以手动停止集群.

与Per-Job-Cluster的区别: 就是Application Mode下, 用户的main函数式在集群中执行的

官方建议:

出于生产的需求, 我们建议使用Per-job or Application Mode,因为他们给应用提供了更好的隔离!

4.5.2 运行验证

启动

# /opt/flink/flink/bin/flink run-application -t yarn-application /opt/flink/flink/examples/batch/WordCount.jar

执行结果:

访问Yarn WebUI:

4.5.3 常见问题

任务提示 Could not allocate the required slot within slot request tim

一、错误日志

Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
        at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResource$8(DefaultScheduler.java:539)
        ... 37 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        ... 35 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
        at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
        ... 28 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 300000 ms
        ... 29 more

解决方案

将flink的配置文件conf/flink-conf.yaml恢复为初始状态,重新启动flink的Yarn session。

问题FLINK Could not get job jar and dependencies from JAR file: JAR file does not exist:

一、问题描述

使用flink客户端将执行flink提交到Yarn,输入-yjm参数提示错误

# /opt/flink/flink/bin/flink run -m Yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /opt/flink/flink/examples/batch/WordCount.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink/flink-1.14.5/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop-3.2.4/share/hadoop/common/lib/slf4j-reload4j-1.7.35.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Could not get job jar and dependencies from JAR file: JAR file does not exist: -yn

二、问题分析

flink1.8版本之后已弃用该参数,ResourceManager将自动启动所需的尽可能多的容器,以满足作业请求的并行性。

三、解决方案

去掉即可

Deployment took more than 60 seconds. Please check if the requested resources are

一、问题描述

日志信息如下:

INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

截图如下:

二、解决方案

配置yarn-site.xml

  <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>1024</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>102400</value>
    </property>
    <property>
        <name>yarn.nodemanager.resource.cpu-vcores</name>
        <value>32</value>
    </property>
   <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>51200</value>
    </property>

Flink读取Hudi表时报错lassNotFoundException: *mapred.FileInputFormat

一、问题现象:

执行“select * from t1;”报错,报错信息如下:

Flink SQL> select * from t1;
……
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.mapred.FileInputFormat

二、原因分析

mapred代表的是hadoop旧API,而mapreduce代表的是hadoop新的API

三、解决办法

解决办法为复制集群的hadoop-mapreduce-client-core.jar到Flink/lib中。

读取数据表失败NoSuchMethodError: *Preconditions.checkArgument

一、问题描述

创建表格式如下

CREATE TABLE t2(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://10.101.12.140:9000/datas/flink-hudi/test0907/t1',
'table.type' = 'MERGE_ON_READ',
'read.tasks' = '1',
'read.streaming.enabled' = 'true',
'read.streaming.start-commit' = '20210316134557',
'read.streaming.check-interval' = '4'
);
INSERT INTO t2 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

执行成功,然后执行表内容查询

select * from t2;

报出如下错误:

Flink SQL> select * from t2;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputForm
Flink SQL>

二、原因分析

mapred代表的是hadoop旧API,而mapreduce代表的是hadoop新的API

三、解决办法

解决办法为复制集群的hive-exec-3.1.3.jar到各节点的flink/lib中。(注意hive-exec和hadoop版本的匹配)

启动失败NoSuchMethodError: *Preconditions.checkArgument

一、错误描述

通过bin/yarn-session.sh -d启动yarn-session失败,报错信息如下:

The program finished with the following exception:

org.apache.hadoop.yarn.exceptions.YarnRuntimeException: java.lang.reflect.InvocationTargetException
        at org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl.getClient(RpcClientFactoryPBImpl.java:81)
        at org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC.getProxy(HadoopYarnProtoRPC.java:48)
        ……
        ... 21 more
Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
        at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)
        ……
        at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.<init>(ApplicationClientProtocolPBClientImpl.java:209)
        ... 26 more

二、错误原因

Preconditions是guava下的工具类,hudi的源码依赖了不同的项目,这些项目使用了不同的guava版本,所报错误是由于运行时guava版本过旧,没有相应的方法。

三、解决方案

在HADOOP_HOME下查询hadoop使用的guava版本,将其拷贝到FLINK_HOME/lib下:

# find ./ -name guava*
./share/hadoop/common/lib/guava-27.0-jre.jar
./share/hadoop/hdfs/lib/guava-27.0-jre.jar

将文件复制到所有yarn集群的FLINK_HOME/lib下。

重新执行bin/yarn-session.sh -d,成功。

通过yarn启动flink失败-连接yarn失败

通过yarn模式启动flink,报出如下异常,关键信息如下:

# /usr/local/flink/bin/yarn-session.sh -d
….
2022-10-27 11:04:30,332 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at /0.0.0.0:8032
…
2022-10-27 11:04:41,153 WARN  org.apache.hadoop.ipc.Client                                 [] - Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 9 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
2022-10-27 11:04:41,153 WARN  org.apache.hadoop.ipc.Client                                 [] - Failed to connect to server: 0.0.0.0/0.0.0.0:8032: retries get failed due to exceeded maximum allowed retries number: 10
java.net.ConnectException: Connection refused
……

原因分析:

1)检查是否启动hadoop集群, 如果没有启动, 是无法连接到hadoop的yarn。

2)flink运行于yarn上,flink要能找到hadoop配置,因为要连接到yarn的resourcemanager和hdfs。

如果正常启动还无法连接yarn, 可以查看一下hadoop的环境变量是否配置好。

在本实例中,时因为无法获取HADOOP_CONF_DIR的配置信息导致问题发生。

二、解决方案

设置HADOOP_CONF_DIR环境变量,并使之生效。

# cat /etc/profile | grep HADOOP_CONF_DIR
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
# source /etc/profile

然后重新启动flink。

BTW:如果已经设置HADOOP_CONF_DIR环境变量,可能由于某种原因HADOOP_CONF_DIR环境变量没有生效,这个原因有很多。

5 参考资料

[01] https://blog.csdn.net/Vector97/article/details/117398947

[02] https://www.jianshu.com/p/8c9c897ea72a

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1455416.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Shiro-04-shiro 详细架构

详细架构 下图显示了Shiro的核心架构概念&#xff0c;并简要概述了每个架构&#xff1a; 下面我们对除了核心组件的部分做一下简单的介绍&#xff1a; Authentication&#xff08;身份验证&#xff09; 身份验证是验证用户身份的过程。 也就是说&#xff0c;当用户通过应用…

相机图像质量研究(32)常见问题总结:图像处理对成像的影响--振铃效应

系列文章目录 相机图像质量研究(1)Camera成像流程介绍 相机图像质量研究(2)ISP专用平台调优介绍 相机图像质量研究(3)图像质量测试介绍 相机图像质量研究(4)常见问题总结&#xff1a;光学结构对成像的影响--焦距 相机图像质量研究(5)常见问题总结&#xff1a;光学结构对成…

VMware Ubuntu 无网卡驱动怎么办?

网上拷了一个VMware的Ubuntu系统&#xff0c;结果网络没法连接。 ifconfig命令一看&#xff0c;只有lo的本地回环网卡&#xff0c;没有真实网卡驱动。 很简单&#xff0c;一下三步就可以搞定。 首先&#xff0c;iplink show查看本地所有网卡。 我这里出现了一个ens33的网卡…

深度学习-分类任务---经典网络

文章目录 经典网络1 LeNet51.1 模型结构1.2 模型结构1.3 模型特性 2 AlexNet2.1 模型介绍2.2 模型结构2.3 模型解读2.4 模型特性 3 可视化ZFNet-转置卷积3.1 基本的思想及其过程3.2 卷积与转置卷积3.3 卷积可视化3.4 ZFNet和AlexNet比较 4 VGGNet4.1 模型结构4.2 模型特点 5 Ne…

21-k8s中pod的调度-污点和污点容忍

一、污点的概念 所谓的污点&#xff0c;就是给k8s集群中的节点设置的污点&#xff0c;通过设置污点&#xff0c;来规划资源创建时所在的节点&#xff1b; 污点的类型解释说明PreferNoshedule 节点设置这个五点类型后&#xff1b; 表示&#xff0c;该节点接受调度&#xff0c;但…

NLP深入学习:《A Survey of Large Language Models》详细学习(七)

文章目录 1. 前言2. 应用场景2.1 LLMs 对研究界的应用2.1.1 经典 NLP 任务2.1.2 信息检索2.1.3 推荐系统2.1.4 多模态大语言模型2.1.5 知识图谱增强型 LLM2.1.6 基于 LLM 的智能体2.1.7 用于评估 2.2 特定领域的应用 3. 参考 1. 前言 这是《A Survey of Large Language Models…

51_蓝桥杯_数码管静态显示

一 电路 二 数码管静态显示工作原理 三 代码 代码1 实现第一个数码管显示数字6的功能 #include "reg52.h"unsigned char code SMG_Duanma[18] {0xc0,0xf9.0xa4,0x99,0x92,0x82,0xf8,0x80,0x90,0x88,0x80,0xc0,0x86,0x8e,0xbf,0x7f};void Delay(unsignde int t) {wh…

在ubuntu中制作ubuntu的U盘启动盘

概要&#xff1a; 本篇演示在ubuntu22.04中制作ubuntu22.04的U盘启动盘 一、下载ubuntu22.04的iso文件 访问ubuntu官网https://ubuntu.com自行下载ubuntu官网 二、制作U盘启动盘 打开系统自带软件Startup Disk Creator 软件会自动检测iso文件和U盘 点击Make Startup Disk…

从源代码安装 rocSOLVER 并 调试 rocSOLVER 在 Ubuntu 22.04 平台

0, 下载并编译 rocBLAS 的调试版本 sudo apt install python3.10-venv sudo apt install libmsgpack-dev sudo pip install joblibgit clone --recursive https://github.com/ROCm/rocBLAS.git $ cd rocBLAS/ $ ./install.sh -i -g构建时间也不短 1&#xff0c;下载并编译 roc…

17.Qt 单选框相关操作

目录 前言&#xff1a; 技能&#xff1a; 内容&#xff1a; 1. 布置界面 2.管理 3.槽函数 参考&#xff1a; 前言&#xff1a; 组合选择只能选择一个的单选框实现&#xff0c;以及管理组合单选框 技能&#xff1a; <QRadioButton> <QButtonGroup> 内容&…

openGauss学习笔记-223 openGauss性能调优-系统调优-数据库系统参数调优-数据库内存参数调优

文章目录 openGauss学习笔记-223 openGauss性能调优-系统调优-数据库系统参数调优-数据库内存参数调优223.1 逻辑内存管理参数223.2 执行算子是否下盘的参数 openGauss学习笔记-223 openGauss性能调优-系统调优-数据库系统参数调优-数据库内存参数调优 数据库的复杂查询语句性…

RIP协议详解

​RIP是最早的动态路由协议&#xff0c;虽然已经过时并且很少使用&#xff0c;但是可以通过学习RIP并且和ospf等现在正在使用的路由协议对比&#xff0c;了解其工作原理和过时原因&#xff0c;具有很强的学习性。 一、RIP协议简介 RIP&#xff08;Routing Information Protoc…

【数据仓库】主题域和数据域

数据域与主题域区别 https://www.cnblogs.com/datadance/p/16898254.html 数据域是自下而上&#xff0c;以业务数据视角来划分数据&#xff0c;一般进行完业务系统数据调研之后就可以进行数据域的划分。针对公共明细层&#xff08;DWD&#xff09;进行主题划分。主题域则自上而…

spark sql官网优化指南

两句话概括 缓存数据调整参数 缓存数据 把数据缓存到内存,spark sql能够只扫描需要列并且会自动压缩数据,占用最小的内存和减小GC压力。这无需多言,内存远远要快于磁盘,spark效率比hive高这个就是一个主要原因。 缓存数据代码spark.catalog.cacheTable("tableName&qu…

【ArcGIS微课1000例】0103:导出点、线、面要素的折点坐标值

点要素对应的是一个或者若干个坐标,线要素对应的是对个坐标值对应的点连起来,面要素是多个坐标值对应的点连起来构成的封闭多边形。本文讲述导出点的坐标值。 文章目录 一、点要素坐标导出1. 计算点坐标2. 导出点坐标二、线要素坐标导出1. 生成线要素折点2. 计算折点坐标3. 导…

腾讯云4核8G12M服务器支持多少人在线?

4核8G服务器支持多少人同时在线访问&#xff1f;阿腾云的4核8G服务器可以支持20个访客同时访问&#xff0c;关于4核8G服务器承载量并发数qps计算测评&#xff0c;云服务器上运行程序效率不同支持人数在线人数不同&#xff0c;公网带宽也是影响4核8G服务器并发数的一大因素&…

Github:分享一款开源的跨平台多功能远程控制和监控工具Spark

目录 1、设备列表及操作列表 2、登录机器 Shell 执行命令 3、桌面远程访问 4、远程设备的进程管理 5、远程设备文件管理 今天要给大家推荐一个开源的跨平台多功能远程控制和监控工具&#xff1a;Spark。 目前该项目处于不维护状态&#xff0c;大家可以自己根据需要进行扩…

Eclipse - Code Templates

Eclipse - Code Templates References Window -> Preferences -> C/C -> Code Style -> Code Templates 配置默认代码模板&#xff0c;可以点击 Export 将自己配置好的 Code Templates 导出去&#xff0c;以便备份和共享。 References [1] Yongqiang Cheng, https…

【软考高级信息系统项目管理师--第十一章:项目成本管理】

&#x1f680; 作者 &#xff1a;“码上有前” &#x1f680; 文章简介 &#xff1a;软考高级–信息系统项目管理师 &#x1f680; 欢迎小伙伴们 点赞&#x1f44d;、收藏⭐、留言&#x1f4ac; 第十一章&#xff1a;项目成本管理 成本管理的过程规划成本管理估算成本制定预算控…

【JavaEE】_HTTP请求首行

目录 1. URL 2. 方法 2.1 GET方法 2.2 POST方法 2.3 GET与POST的区别 2.4 低频使用方法 1. URL 在mysql JDBC中已经提到过URL的相关概念&#xff1a; 如需查看有关JDBC更多内容&#xff0c;原文链接如下&#xff1a; 【MySQL】_JDBC编程-CSDN博客 URL用于描述某个资源…