文章目录
- 1、部署模式
- 2、本地独立部署会话模式的Flink
- 3、本地独立部署会话模式的Flink集群
- 4、向Flink集群提交作业
- 5、Standalone方式部署单作业模式
- 6、Standalone方式部署应用模式的Flink
1、部署模式
会话模式(Session Mode)
先启动Flink集群,保持一个会话,在这个会话种通过客户端提交作业。因为集群启动时所有资源都已经确定,所以所有提交的作业会竞争集群中的资源。比如下图中提交的三个Flink Application:
有点类似大学入学前,你在的那间宿舍已准备好,开学时和你室友分床位。会话模式比较适合于单个规模小、执行时间短的大量作业。
单作业模式(Per-Job Mode)
上面的会话模式因为资源共享会导致很多问题,为了更好的隔离资源,考虑为每个提交的作业启动一个集群,即单作业Per-Job模式
单作业模式,提前不启动Flink集群,有作业提交了,再启动一个集群。现提交现启动,每个作业都用的单独的集群,作业完成后,集群关闭,所有资源释放。类似你不住宿舍了,你现在住酒店,去前台现开现住,人走退房。单作业模式Flink无法直接自己运行,需要借助一些资源管理框架来启动集群,如K8S、Hadoop的YARN。
应用模式(Application Mode)
前面提到的两种模式下,Flink应用代码都是在客户端上执行,然后由客户端提交给JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager,加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。
所以解决办法就是,不要客户端了,直接把应用提交到JobManger上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后JoblManager也就关闭了,这就是应用模式。
总结1:
应用模式与单作业模式,都是提交作业之后才创建集群,不同的时,单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群,而应用模式下,是直接由JobManaget执行应用程序的。
总结2:
- 会话模式下,集群生命周期独立于集群上运行的任何作业的生命周期,且所有作业之间共享集群资源
- 单作业模式下,多了启动集群的代价,对于每个提交的作业,资源隔离性得到了保证,集群生命周期和作业生命周期绑定
- 应用模式下,直接把应用提交到JobManger上运行,不是在客户端上执行
最后,对应这三种模式,采用的部署方式可以是:
- 独立部署
- K8S部署
- YARN部署
本篇只整理独立部署,后两种部署方式见下篇。
2、本地独立部署会话模式的Flink
独立部署就是独立运行,即Flink自己管理Flink资源,不依靠任何外部的资源管理平台,比如K8S或者Hadoop的Yarn,当然,独立部署的代价就是:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理,生产环境或者作业量大的场景下不建议采用独立部署。
- 下载安装包
# 下载地址:
https://archive.apache.org/dist/flink/flink-1.17.0/
flink-1.17.0-bin-scala_2.12.tgz
- 上传安装包到Linux节点服务器上的某目录:
/opt/moudle/flink-1.17.0-bin-scala_2.12.tgz
- 解压
tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/
- 启动,进入安装目录执行start-cluster.sh
[code9527@node01 flink-1.17.0] bin/start-cluster.sh
- 访问WebUI,对Flink集群进行监控管理
http://IP:8081
独立安装会话模式的Flink成功,控制台中,可以看到,TaskManager的数量为1(本来就一台机器,一个节点),由于默认每个TaskManager的Slot数量为1,所以总Slot数和可用Slot数都为1。最后,可停止集群:
[code9527@node01 flink-1.17.0] /bin/stop-cluster.sh
可能遇到的坑:
坑1:
start-cluster.sh执行报错:Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME.
原因:未安装Java环境
yum install -y java-1.8.0-openjdk.x86_64
坑2:
http://IP:8081访问不通
处理下防火墙:
firewall-cmd --add-port 8081/tcp --permanent
firewall-cmd --reload
3、本地独立部署会话模式的Flink集群
上面部署的单机Flink,当你有多台服务器,要部署一个集群时,大体流程和上面一样。假设有三台服务器,角色分配规划如下:
节点服务器 | node-01 | node-02 | node-03 |
---|---|---|---|
角色 | JobManager+TaskManager | TaskManager | TaskManager |
主节点上的操作:
- 上传安装包到Linux节点服务器上的某目录:
/opt/moudle/flink-1.17.0-bin-scala_2.12.tgz
- 解压
tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/
- 进入解压目录/conf目录,修改flink-conf.yaml文件
vi flink-conf.yaml
# 修改内容如下:
# JobManager节点地址,我写了IP,这里IP或者hostname都行
jobmanager.rpc.address: node-01
jobmanager.bind-host: 0.0.0.0
rest.address: node-01
rest.bind-address: 0.0.0.0
# TaskManager节点地址.需要配置为当前机器名
taskmanager.bind-host: 0.0.0.0
taskmanager.host: node-01
- 其他可选配置:在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置
- jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。
- taskmanager.memory.process.size:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。
- taskmanager.numberOfTaskSlots:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。
- parallelism.default:Flink任务执行的并行度,默认为1。优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
- 修改workers文件,指定干活儿的节点TaskManager的信息,这里是node01和另外两台主机02、03
[code9527@node01 conf] vi workers
# 修改如下内容:
node-01
node-02
node-03
# 用IP也行,这就是上面单机我也用用IP,而不用默认localhost的原因,多节点下看着乱得很
- 修改masters文件
vi masters
# 修改内容,hostname也行
node-01:8081
至于两个Task的从节点,直接把上面改好的Flink安装目录拷贝或分发给另外两个节点服务器:
# node02、node03上建好/opt/moudle/flink-1.17.0/目录后,01节点执行
scp /opt/moudle/flink-1.17.0 root@node02:/opt/moudle/flink-1.17.0/
scp /opt/moudle/flink-1.17.0 root@node03:/opt/moudle/flink-1.17.0/
再修改node02的taskmanager.host:
# conf目录下
vim flink-conf.yaml
# 改为:
taskmanager.host: node02 # IP或hostname
再修改node03的taskmanager.host:
# conf目录下
vim flink-conf.yaml
# 改为:
taskmanager.host: node03 # IP或hostname
回node01启动,执行start-cluster.sh
[code9527@node01 flink-1.17.0] bin/start-cluster.sh
此时,在控制台,应该可以看到当前集群的TaskManager数量为3,总Slot数和可用Slot数都为3
4、向Flink集群提交作业
上一篇,写了读取socket发送的单词并统计单词个数的程序,这里演示将它提交到集群中年去执行,首先将程序打包,在pom.xml中添加打包插件:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
打包,指令或者IDEA页面上操作:
mvn clean
mvn package
打包完成后,在target目录下即可找到所需JAR包,JAR包会有两个,一个原始包,一个带依赖的包(类似SpringBoot打包插件),因为集群中已经具备任务运行所需的所有依赖,所以建议使用原始包original-xxx.jar。下面打开Flink的控制台,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的JAR包:
点击该JAR包,出现任务配置页面,进行相应配置:
点Submit提交作业(点Submit没反应参考【这篇】),导航栏的Running Jobs可查看程序运行列表情况
在Flink程序里写的Linux主机里开启端口监听,并在socket端口中输入一些字符串:
先点击Task Manager侧边栏,再切StdOut的tab页,点刷新,可以看到运行成功:
先取消任务,接下来用命令行提交任务:
使用命令行提交,会话模式下还是先启动集群:
bin/start-cluster.sh
进入flink安装目录/opt/module/flink-1.17.0,把前面的jar包上传到该目录下,执行flink run指令提交作业
bin/flink run -m 10.4.95.27:8081 -c com.plat.count.SocketStreamWordCount ./FlinkService-1.0-SNAPSHOT.jar
# -m指定了提交到的JobManager
# -c指定了入口类
提交成功:
此时web控制台还是可以看到同样的效果,且/opt/module/flink-1.17.0/log路径中,也可以查看TaskManager的输出:
[root@node-105-69 log] cat flink-atguigu-standalonesession-0-node-105-69.out
(hello,1)
(hello,2)
(flink,1)
(hello,3)
(scala,1)
5、Standalone方式部署单作业模式
部署不了单作业模式,前面说了,Standalone方式下,Flink并不支持单作业模式部署。因为单作业模式需要借助一些资源管理平台,比如K8S
6、Standalone方式部署应用模式的Flink
应用模式下不会提前创建集群
,所以不能调用start-cluster.sh脚本。需要使用同样在bin目录下的standalone-job.sh来创建一个JobManager
# 先停掉会话模式
[root@node-105-69 flink-1.17.0] bin/stop-cluster.sh
# 继续开启对应的Linux主机的netcat
nc -lk 9527
将上面的安装包放到flink的lib目录下
[root@node-105-69 flink-1.17.0] mv FlinkService-1.0-SNAPSHOT.jar lib/
启动JobManager,这里我们直接指定作业入口类,脚本会到lib目录扫描所有的jar包
[root@node-105-69 flink-1.17.0] bin/standalone-job.sh start --job-classname com.plat.count.SocketStreamWordCount
启动TaskManager:(独立部署,这个时候干活的Task是手动起的)
[root@node-105-69 flink-1.17.0] bin/taskmanager.sh start
发送数据到9527端口:
查看控制台:
停掉集群:
bin/taskmanager.sh stop
bin/standalone-job.sh stop