Flink03: 集群安装部署

news2024/11/16 5:35:25

Flink支持多种安装部署方式

  • Standalone
  • ON YARN
  • Mesos、Kubernetes、AWS…

这些安装方式我们主要讲一下standalone和on yarn。
如果是一个独立环境的话,可能会用到standalone集群模式。
在生产环境下一般还是用on yarn 这种模式比较多,因为这样可以综合利用集群资源。和我们之前讲的
spark on yarn是一样的效果,这个时候我们的Hadoop集群上面既可以运行MapReduce任务,Spark任务,还可以运行Flink任务,一举三得。

一、Standalone

1. 集群规划

依赖环境
jdk1.8及以上【配置JAVA_HOME环境变量】
ssh免密码登录
在这我们使用bigdata01、02、03这三台机器,这几台机器的基础环境都是ok的,可以直接使用。
集群规划如下:
master:bigdata01
slave:bigdata02、bigdata03

2. 下载flink安装包

 注意:由于目前Flink各个版本之间差异比较大,属于快速迭代阶段,所以在这我们就使用最新版本了,使用Flink1.11.1版本。

 3. 安装步骤

1.安装包下载好以后上传到bigdata01的/data/soft目录中

[root@bigdata01 soft]# ll flink-1.11.1-bin-scala_2.12.tgz
-rw-r--r--. 1 root root 312224884 Aug 5 2026 flink-1.11.1-bin-scala_2.12.
tgz

2. 解压

[root@bigdata01 soft]# tar -zxvf flink-1.11.1-bin-scala_2.12.tgz

3.修改配置

[root@bigdata01 soft]# cd flink-1.11.1
[root@bigdata01 flink-1.11.1]# cd conf/
[root@bigdata01 conf]# vi flink-conf.yaml
......
jobmanager.rpc.address: bigdata01
......
[root@bigdata01 conf]# vi masters
bigdata01:8081
[root@bigdata01 conf]# vi workers
bigdata02
bigdata03

 3:将修改完配置的flink目录拷贝到其它两个从节点

[root@bigdata01 soft]# scp -rq flink-1.11.1 bigdata02:/data/soft/
[root@bigdata01 soft]# scp -rq flink-1.11.1 bigdata03:/data/soft/

4:启动Flink集群

[root@bigdata01 soft]# cd flink-1.11.1
[root@bigdata01 flink-1.11.1]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host bigdata01.
Starting taskexecutor daemon on host bigdata02.
Starting taskexecutor daemon on host bigdata03.

5:验证一下进程

在bigdata01上执行jps

[root@bigdata01 flink-1.11.1]# jps
3986 StandaloneSessionClusterEntrypoint

在bigdata02上执行jps

[root@bigdata02 ~]# jps
2149 TaskManagerRunner

在bigdata03上执行jps

[root@bigdata03 ~]# jps
2150 TaskManagerRunner

6:访问Flink的web界面
        http://bigdata01:8081
7:停止集群,在主节点上执行停止集群脚本

[root@bigdata01 flink-1.11.1]# bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 2149) on host bigdata02.
Stopping taskexecutor daemon (pid: 2150) on host bigdata03.
Stopping standalonesession daemon (pid: 3986) on host bigdata01.

 4. Standalone集群核心参数

 

 5. slot vs parallelism

1:slot是静态的概念,是指taskmanager具有的并发执行能力
2:parallelism是动态的概念,是指程序运行时实际使用的并发能力
3:设置合适的parallelism能提高程序计算效率,太多了和太少了都不好

二、Flink ON YARN

Flink ON YARN模式就是使用客户端的方式,直接向Hadoop集群提交任务即可。不需要单独启动Flink进程。
注意:
1:Flink ON YARN 模式依赖Hadoop 2.4.1及以上版本
2:Flink ON YARN支持两种使用方式

1. Flink ON YARN第一种方式

下面来看一下第一种方式
第一步:在集群中初始化一个长时间运行的Flink集群
使用yarn-session.sh脚本
第二步:使用flink run命令向Flink集群中提交任务

注意:使用flink on yarn需要确保hadoop集群已经启动成功

 1. 首先在bigdata04机器上安装一个Flink客户端,其实就是把Flink的安装包上传上去解压即可,不需要启动

[root@bigdata04 soft]# tar -zxvf flink-1.11.1-bin-scala_2.12.tgz

2. 接下来在执行 yarn-session.sh 脚本之前我们需要先设置 HADOOP_CLASSPATH 这个环境变量,否则,执行yarn-session.sh 是会报错的,提示找不到hadoop的一些依赖。

[root@bigdata01 flink-1.11.1]# bin/yarn-session.sh -jm 1024m -tm 1024m -d
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
        at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
        at java.lang.Class.getMethod0(Class.java:3018)
        at java.lang.Class.getMethod(Class.java:1784)
        at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
        at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 7 more

在 /etc/profile 中配置 HADOOP_CLASSPATH

[root@bigdata04 flink-1.11.1]# vi /etc/profile
export JAVA_HOME=/data/soft/jdk1.8
export HADOOP_HOME=/data/soft/hadoop-3.2.0
export HIVE_HOME=/data/soft/apache-hive-3.1.2-bin
export SPARK_HOME=/data/soft/spark-2.4.3-bin-hadoop2.7
export SQOOP_HOME=/data/soft/sqoop-1.4.7.bin__hadoop-2.6.0
export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
export PATH=.:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HIVE_HOME/bin:$SPARK_HO
ME/bin:$SQOOP_HOME/bin:$PATH

刷新配置

[root@bigdata01 flink-1.11.1]# source /etc/profile

3. 接下来,使用 yarn-session.s h在YARN中创建一个长时间运行的Flink集群

[root@bigdata04 flink-1.11.1]# bin/yarn-session.sh -jm 1024m -tm 1024m -d

这个表示创建一个Flink集群, -jm 是指定主节点的内存, -tm 是指定从节点的内存, -d 是表示把这个进程放到后台去执行。启动之后,会看到类似这样的日志信息,这里面会显示flink web界面的地址,以及这个flink集群在yarn中对应的applicationid。

此时到YARN的web界面中确实可以看到这个flink集群。

 可以使用屏幕中显示的flink的web地址或者yarn中这个链接都是可以进入这个flink的web界面的

 

 4. 接下来向这个Flink集群中提交任务,此时使用Flink中的内置案例

[root@bigdata04 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar

注意:这个时候我们使用flink run的时候,它会默认找这个文件,然后根据这个文件找到刚才我们
创建的那个永久的Flink集群,这个文件里面保存的就是刚才启动的那个Flink集群在YARN中对应
的applicationid。

2023-02-19 02:11:19,306 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.
2023-02-19 02:11:19,306 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-root.

[root@bigdata04 flink-1.11.1]# more /tmp/.yarn-properties-root
#Generated YARN properties file
#Tue Jan 20 22:50:06 CST 2026
dynamicPropertiesString=
applicationID=application_1768906309581_0005

5.任务提交上去执行完成之后,再来看flink的web界面,发现这里面有一个已经执行结束的任务了。

 注意:这个任务在执行的时候,会动态申请一些资源执行任务,任务执行完毕之后,对应的资源会自动释放掉。

6. 最后把这个Flink集群停掉,使用yarn的kill命令

[root@bigdata04 flink-1.11.1]# yarn application -kill application_176890630
9581_0005

7. 针对 yarn-session 命令,它后面还支持一些其它参数,可以在后面传一个 -help 参数

[root@bigdata04 flink-1.11.1]# bin/yarn-session.sh -help
Usage:
Optional
-at,--applicationType <arg> Set a custom application type for the
application on YARN
-D <property=value> use value for given property
-d,--detached If present, runs the job in detached m
ode
-h,--help Help for the Yarn session CLI.
-id,--applicationId <arg> Attach to running YARN session
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> Memory for JobManager Container with o
ptional unit (default: MB)
-m,--jobmanager <arg> Address of the JobManager to which to
connect. Use this flag to connect to a different JobManager than the one sp
ecified in the configuration.
-nl,--nodeLabel <arg> Specify YARN node label for the YARN a
pplication
-nm,--name <arg> Set a custom name for the application
on YARN
-q,--query Display available YARN resources (memo
ry, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-t,--ship <arg> Ship files in the specified directory
(t for transfer)
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with
optional unit (default: MB)
-yd,--yarndetached If present, runs the job in detached m
ode (deprecated; use non-YARN specific option instead)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-
paths for high availability mode

在这我对一些常见的命令进行了整理,添加了中文注释


 

注意:这里的-j 是指定Flink任务的jar包,此参数可以省略不写也可以

2. Flink ON YARN第二种方式

flink run -m yarn-cluster (创建Flink集群+提交任务)
使用flink run直接创建一个临时的Flink集群,并且提交任务
此时这里面的参数前面加上了一个 y 参数 

[root@bigdata04 flink-1.11.1]# bin/flink run -m yarn-cluster -yjm 1024 -yt
m 1024 ./examples/batch/WordCount.jar

提交上去之后,会先创建一个Flink集群,然后在这个Flink集群中执行任务。

 针对Flink命令的一些用法汇总:

三、Flink ON YARN的好处

1:提高大数据集群机器的利用率
2:一套集群,可以执行MR任务,Spark任务,Flink任务等

四、向集群中提交Flink任务

接下来我们希望把前面我们自己开发的Flink任务提交到集群上面,在这我就使用flink on yarn的第二种方式来向集群提交一个Flink任务。

1. 在pom.xml中添加打包配置
 

 <build>
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- scala编译插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.12</scalaCompatVersion>
                    <scalaVersion>2.12.11</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 打jar包插件(会包含所有依赖) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- 可以设置jar包的入口类(可选) -->
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

2. 打包代码

 mvn clean package -DskipTests

3. 将 db_flink-1.0-SNAPSHOT-jar-with-dependencies.jar 上传到bigdata04机器上
的 /data/soft/flink-1.11.1 目录中(上传到哪个目录都可以)

4. 提交Flink任务

注意:提交任务之前,先开启socket

[root@bigdata04 ~]# nc -l 9001
[root@bigdata04 flink-1.11.1]#bin/flink run -m yarn-cluster -c com.imooc.sc
ala.SocketWindowWordCountScala -yjm 1024 -ytm 1024 db_flink-1.0-SNAPSHOT-j
ar-with-dependencies.jar

6. 此时到yarn上面可以看到确实新增了一个任务,点击进去可以看到flink的web界面

 

 通过socket输入一串内容

 然后到flink的web界面查看日志

 

 

 7. 接下来我们希望把这个任务停掉,因为这个任务是一个流处理的任务,提交成功之后,它会一直运行。

注意:此时如果我们使用ctrl+c关掉之前提交任务的那个进程,这里的flink任务是不会有任何影响的,可以一直运行,因为flink任务已经提交到hadoop集群里面了。

 此时如果想要停止Flink任务,有两种方式:

方式一:停止yarn中任务

[root@bigdata04 flink-1.11.1]# yarn application -kill application_176896295
6138_0001

方式二:停止flink任务。可以在界面上点击这个按钮,或者在命令行中执行flink cancel停止都可以

 或者

[root@bigdata04 flink-1.11.1]# bin/flink cancel -yid application_1768962956
138_0001 7b99bfb261a92f84a89d87bcca3a3e23

这个flink任务停止之后,对应的那个yarn-session(Flink集群)也就停止了。

五、开启Flink的HistoryServer

注意:此时flink任务停止之后就无法再查看flink的web界面了,如果想看查看历史任务的执行信息就看不了了,怎么办呢?

        咱们之前在学习spark的时候其实也遇到过这种问题,当时是通过启动spark的historyserver进程解决的。flink也有historyserver进程,也是可以解决这个问题的。historyserver进程可以在任意一台机器上启动,在这我们选择在bigdata04机器上启动在启动historyserver进程之前,需要先修改bigdata04中的flink-conf.yaml配置文件。

[root@bigdata04 flink-1.11.1]# vi conf/flink-conf.yaml
jobmanager.archive.fs.dir: hdfs://bigdata01:9000/completed-jobs/
historyserver.web.address: 192.168.182.103
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://bigdata01:9000/completed-jobs/
historyserver.archive.fs.refresh-interval: 10000

然后启动flink的historyserver进程

[root@bigdata04 flink-1.11.1]# bin/historyserver.sh start

注意:hadoop集群中的historyserver进程也需要启动

 此时Flink任务停止之后也是可以访问flink的web界面的。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/356217.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++入门:引用

目录 一. 什么是引用 1.1 引用的概念 1.2 引用的定义 二. 引用的性质和用途 2.1 引用的三大主要性质 2.2 引用的主要应用 三. 引用的效率测试 3.1 传值调用和传引用调用的效率对比 3.2 值返回和引用返回的效率对比 四. 常引用 4.1 权限放大和权限缩小问题 4.2 跨…

【超好用】自定义的mybatis-plus代码生成器

BACKGROUND你是否也有这样的烦恼&#xff1a;每次写代码都需要创建很多包很多层很多类很多接口&#xff1f;耗时且费力姑且不谈&#xff0c;有时可能还大意了没有闪&#xff0c;搞出一堆bug这谁顶得住啊都3202年了&#xff0c;让程序自力更生吧&#xff01;&#xff01;教程 le…

原创|关于一次产品需求程序设计及优化的经历

文章目录一、流程梳理二、设计梳理三、技术方案3.1、下单接口扩展3.3.1、Request类新增deviceType3.3.2、申请单新增字段产品策略(productStrategy)3.3.3、下单产品策略的处理逻辑3.2、询价模块的设计3.2.1、Context设计3.2.2、ProductStrategy类设计3.2.2.1、AbstractProductS…

k8s篇之概念介绍

文章目录时光回溯什么是K8SK8S不是什么一、K8S构成组件控制平面组件&#xff08;Control Plane Components&#xff09;kube-apiserveretcdkube-schedulerkube-controller-managercloud-controller-managerNode 组件kubeletkube-proxy容器运行时&#xff08;Container Runtime&…

Spring Cloud Nacos实战(七)- Nacos之Linux版本安装

Nacos之Linux版本安装 Linux版NacosMySql生产环境配置 ​ 已经给大家讲解过了Nacos生产环境下需要搭建集群配置&#xff0c;那么这里我们预计需要&#xff1a;1个Nginx3个Nacos注册中心1个MySql 具体配置&#xff1a; 在官网上下载NacosLinux版本&#xff1a;https://github…

基于SSM框架的CMS内容管理系统的设计与实现

基于SSM框架的CMS内容管理系统的设计与实现 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目…

并查集(高级数据结构)-蓝桥杯

一、并查集并查集(Disioint Set)&#xff1a;一种非常精巧而实用的数据结构用于处理不相交集合的合并问题。用于处理不相交集合的合并问题。经典应用&#xff1a;连通子图。最小生成树Kruskal算法。最近公共祖先。二、应用场景有n个人&#xff0c;他们属于不同的帮派。 已知这些…

Kafka漏洞修复之CVE-2023-25194修复措施验证

Kafka漏洞修复之CVE-2023-25194修复措施验证前言风险分析解决方案AdoptOpenJDK Zookeeper Kafka多版本OpenJDK安装切换Zookeeper安装Kafka安装与使用其他Kafka消息发送流程Linux配置加载顺序参考链接前言 场景介绍 Kafka最近爆出高危漏洞CNNVD-202302-515&#xff0c;导致Apa…

LeetCode刷题复盘笔记—一文搞懂贪心算法之56. 合并区间(贪心算法系列第十四篇)

今日主要总结一下可以使用贪心算法解决的一道题目&#xff0c;56. 合并区间 题目&#xff1a;56. 合并区间 Leetcode题目地址 题目描述&#xff1a; 以数组 intervals 表示若干个区间的集合&#xff0c;其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间…

QXlsx(访问Excel)

再Qt中已经有了QAxObject来访问Excel&#xff0c;但访问的是微软的com&#xff0c;只能访问正版的Excl中的 .xls//xlsx ,而且使用起来及其不方便&#xff08;本人亲测&#xff09;。 在这里使用QXlsx,能更简单的访问Excel数据&#xff0c;但QXlsx这个类并没有在Qt Creator中&a…

《MySQL学习》 MySQL优化器选择如何选择索引

一.优化器的选择逻辑 建表语句 CREATE TABLE t (id int(11) NOT NULL AUTO_INCREMENT,a int(11) DEFAULT NULL,b int(11) DEFAULT NULL,PRIMARY KEY (id),KEY a (a),KEY b (b) ) ENGINEInnoDB;往表中插入10W条数据 delimiter ;; create procedure idata() begindeclare i in…

目标检测三大数据格式VOC,YOLO,COCO的详细介绍

注&#xff1a;本文仅供学习&#xff0c;未经同意请勿转载 说明&#xff1a;该博客来源于xiaobai_Ry:2020年3月笔记 对应的PDF下载链接在&#xff1a;待上传 目录 目标检测常见数据集总结 V0C数据集(Annotation的格式是xmI) A. 数据集包含种类: B. V0C2007和V0C2012的区别…

QT学习记录散件

fromLocal8Bit() qt中fromLocal8Bit()函数可以设置编码。 因为QT默认的编码是unicode&#xff0c;不能显示中文的 而windows默认使用&#xff08;GBK/GB2312/GB18030&#xff09; 所以使用fromLocal8Bit()函数&#xff0c;可以实现从本地字符集GB到Unicode的转换&#xff0c;从…

32-Golang中的map

Golang中的map基本介绍基本语法map声明的举例map使用的方式map的增删改查操作map的增加和更新map的删除map的查找map的遍历map切片基本介绍map排序map的使用细节基本介绍 map是key-value数据结构&#xff0c;又称为字段或者关联数组。类似其它编程语言的集合&#xff0c;在编程…

2023美赛ABCDEF思路汇总

注&#xff1a;以下每个题思路仅是个人所想所做&#xff0c;不代表他人。由于时间仓促完成这么多&#xff0c;难免有不足之处&#xff0c;还请谅解。 文章目录A题第一大问第二大问B题第一问第二问第三问C题第一问第二问第三问第四问D题第一问第二问第三问第四问第五问E题第一问…

#Paper Reading# Language Models are Unsupervised Multitask Learners

论文题目: Language Models are Unsupervised Multitask Learners 论文地址: https://life-extension.github.io/2020/05/27/GPT技术初探/language-models.pdf 论文发表于: OpenAI 2019 论文所属单位: OpenAI 论文大体内容&#xff1a; 本文主要提出了GPT-2&#xff08;Gener…

Visual Studio 2022: 增加对虚幻引擎的支持

自 Visual Studio 2022 发布以来&#xff0c;我们一直专注于为游戏和大型项目开发人员提供一系列生产力和性能改进。今天&#xff0c;我们很高兴与大家分享下一组专门用来提高虚幻引擎开发效率的功能。我们听到并看到了来自你&#xff08;我们的游戏开发人员&#xff09;的大量…

Spring MVC之 一次请求响应的过程

Spring MVC 会创建两个容器&#xff0c;其中创建Root WebApplicationContext 后&#xff0c;调用其refresh()方法会触发刷新事件&#xff0c;完成 Spring IOC 初始化相关工作&#xff0c;会初始化各种 Spring Bean 到当前容器中我们先来了解一个请求是如何被 Spring MVC 处理的…

2023最新文件快递柜系统网站源码 | 匿名口令分享 | 临时文件分享

内容目录一、详细介绍二、效果展示1.部分代码2.效果图展示三、学习资料下载一、详细介绍 2023最新文件快递柜系统网站源码 | 匿名口令分享 | 临时文件分享 很多时候&#xff0c;我们都想将一些文件或文本传送给别人&#xff0c;或者跨端传递一些信息&#xff0c;但是我们又不…

自抗扰控制ADRC之三种微分跟踪器TD仿真分析

目录 前言 1 全程快速微分器 1.1仿真分析 1.2仿真模型 1.3仿真结果 1.4结论 2 Levant微分器 2.1仿真分析 2.2仿真模型 2.3仿真结果 3.非线性跟踪微分器——韩教授 3.1仿真分析 3.2小结 4.总结 前言 工程上信号的微分是难以得到的&#xff0c;所以本文采用微分器…