搭建Flink集群、集群HA高可用以及配置历史服务器

news2025/1/12 21:00:56

Flink集群搭建

  • Flink集群搭建
    • 集群规划
    • 下载并解压安装包
    • 修改集群配置
    • 分发安装目录
    • 启动集群
    • 访问Web UI
  • Flink集群HA高可用
    • 概述
    • 集群规划
    • 配置flink
    • 配置master、workers
    • 配置ZK
    • 分发安装目录
    • 启动HA集群
    • 测试
  • Flink参数配置
  • 配置历史服务器
    • 概述
    • 配置
    • 启动、停止历史服务器
    • 提交一个Job任务
    • 查看历史Job信息

Flink集群搭建

集群规划

节点node01node02node03
角色JobManager
TaskManager
TaskManagerTaskManager

下载并解压安装包

wget https://repo.huaweicloud.com/apache/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz

在node01节点下载flink安装包,同时解压、重命名。

tar  -zxvf flink-1.17.0-bin-scala_2.12.tgz 
mv flink-1.17.0 flink

修改集群配置

进入flink的conf目录,修改集群配置

vim /usr/local/program/flink/conf/flink-conf.yaml

1.修改flink-conf.yaml文件

JobManager节点配置

# jobmanager.rpc.address: localhost
# jobmanager.bind-host: localhost
jobmanager.rpc.address: node01
jobmanager.bind-host: 0.0.0.0

# rest.address: localhost
# rest.bind-address: localhost
rest.address: node01
rest.bind-address: 0.0.0.0

TaskManager节点配置

# taskmanager.host: localhost
# taskmanager.bind-host: localhost

taskmanager.host: node01
taskmanager.bind-host: 0.0.0.0

注意:需要在/etc/hosts文件中配置各个节点信息

172.29.234.1	node01	node01
172.29.234.2	node02	node02
172.29.234.3	node03	node03

2.修改workers文件

指定node01、node02、node03等节点为TaskManager

# localhost
node01
node02
node03

3.修改masters文件

# localhost:8081
node01:8081

分发安装目录

node01节点安装、配置好后,将Flink安装目录分发给另外两个节点服务器。

[root@node01 program]# pwd
/usr/local/program
[root@node01 program]# ls
flink                            jdk8

[root@node01 program]# scp -r flink node02:/usr/local/program/flink

[root@node01 program]# scp -r flink node03:/usr/local/program/flink

在node02、node03节点,修改flink-conf.yaml 配置

1.node02节点

# taskmanager.host: localhost

taskmanager.host: node02

2.node03节点

# taskmanager.host: localhost

taskmanager.host: node03

启动集群

Flink附带了相关的bash脚本,可以用于启动、停止集群。

# 启动集群
./bin/start-cluster.sh

# 停止集群
./bin/stop-cluster.sh

node01节点服务器上执行start-cluster.sh脚本以启动Flink集群

[root@node01 bin]# cd /usr/local/program/flink/bin

[root@node01 bin]# ./start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node01.
Starting taskexecutor daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.

查看进程情况

[root@node01 bin]# jps
6788 StandaloneSessionClusterEntrypoint
7256 Jps
7116 TaskManagerRunner
[root@node02 conf]# jps
16884 TaskManagerRunner
16959 Jps
[root@node03 conf]# jps
17139 TaskManagerRunner
17214 Jps

访问Web UI

当如上所示一样后,代表启动成功,此时可以访问http://node01:8081对flink集群和任务进行监控管理。

在这里插入图片描述
注意:关闭防火墙,否则可能无法访问,或者集群的TaskManager数量、Slot数量显示异常

systemctl stop firewalld

提交任务

[root@node01 bin]# flink run ../examples/streaming/WordCount.jar

查看运行结果

[root@node01 bin]# tail flink-*-taskexecutor-*.out

也可以通过Flink的 Web UI来监视集群的状态和正在运行的作业
在这里插入图片描述

Flink集群HA高可用

概述

集群实际上只有一个JobManager,是存在单点故障的,官方提供了Standalone Cluster HA模式来实现集群高可用。

集群可以有多个JobManager,但只有一个处于active状态,其余的则处于备用状态,Flink使用 ZooKeeper来选举出Active JobManager,并依赖其来提供一致性协调服务,所以需要预先安装 ZooKeeper 。

Flink本身提供了内置ZooKeeper插件,可以直接修改conf/zoo.cfg,并且使用 /bin/start-zookeeper-quorum.sh直接启动。

集群规划

节点node01node02node03
角色JobManager
TaskManager
JobManager
TaskManager
TaskManager

配置flink

基于Flink集群的node01节点配置的情况下,修改conf/flink-conf.yaml文件,增加如下配置:

# 配置使用zookeeper来开启高可用模式
high-availability.type: zookeeper

# 配置zookeeper的地址,采用zookeeper集群时,可以使用逗号来分隔多个节点地址
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181

# 在zookeeper上存储flink集群元信息的路径
high-availability.zookeeper.path.root: /flink

# 集群id 放置集群的所有必需协调数据
high-availability.cluster-id: /cluster_one

# 持久化存储JobManager元数据的地址,zookeeper上存储的只是指向该元数据的指针信息
high-availability.storageDir: hdfs://node01:9000/flink/recovery

配置master、workers

修改conf/masters文件,配置master节点

node01:8081
node02:8081

修改conf/workers文件,配置worker节点

node01
node02
node03

配置ZK

编辑vim zoo.cfg文件

server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888

分发安装目录

node01节点安装、配置好后,将Flink安装目录分发给另外两个节点服务器。

[root@node01 program]# pwd
/usr/local/program
[root@node01 program]# ls
flink                            jdk8

[root@node01 program]# scp -r flink node02:/usr/local/program/flink

[root@node01 program]# scp -r flink node03:/usr/local/program/flink

在node02、node03节点,修改flink-conf.yaml 配置

1.node02节点

jobmanager.rpc.address: node02

taskmanager.host: node02

2.node03节点

taskmanager.host: node03

启动HA集群

分发Flink相关配置到其他节点,然后确保Hadoop和ZooKeeper已经启动后,使用以下命令来启动集群:

[root@node01 flink]# bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host node01.
Starting standalonesession daemon on host node02.
Starting taskexecutor daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.

访问http://node01:8081
在这里插入图片描述
访问http://node02:8081
在这里插入图片描述

测试

查看ZK:JobManager节点信息
在这里插入图片描述
kill node01节点上的JobManager进程

[root@node01 flink]# jps
2564 DataNode
3508 NodeManager
18741 Jps
7784 QuorumPeerMain
16666 TaskManagerRunner
2363 NameNode
16300 StandaloneSessionClusterEntrypoint
3117 ResourceManager
[root@node01 flink]# kill -9 16300

查看Active JobManager是否变化
在这里插入图片描述

Flink参数配置

flink-conf.yaml文件中有大量的配置参数,基本常见参数如下:

# jobmanager地址	
jobmanager.rpc.address: node01

# JobManagerJVM 堆内存大小,默认为 1024m 
jobmanager.heap.size: 1024m

# rpc通信端口
jobmanager.rpc.port: 6123

# 进程使用的全部内存大小,可以根据集群规模进行适当调整
jobmanager.memory.process.size:1600m

# TaskmanagerJVM 堆内存大小,默认为 1024m 
taskmanager.heap.size: 1024m

# 进程使用的全部内存大小,可以根据集群规模进行适当调整
taskmanager.memory.process.size: 1728m

# 每个TaskManager能够分配的Slot数量进行配置,默认为1 
# 通常设置为 CPU 核心的数量,或其一半
# Slot就是TaskManager中具体运行一个任务所分配的计算资源
taskmanager.numberOfTaskSlots: 1

# flink任务执行的并行度,默认为1
# 优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量
parallelism.default: 1

# 重启策略
jobmanager.execution.failover-strategy: region

# 存储临时文件的路径,如果没有配置,则默认采用服务器的临时目录,如 LInux/tmp 目录
io.tmp.dirs: /tmp

参考Flink的官方手册:更多配置

配置历史服务器

概述

运行Flink job的集群一旦停止,只能去yarn或本地磁盘上查看日志,对于Job任务信息的查看、异常问题的排查非常不友好。

Flink提供了历史服务器,用来在相应的Flink集群关闭后查询已完成作业的统计信息。通过History Server可以查询这些已完成作业的统计信息,无论是正常退出还是异常退出。

Flink任务停止后,JobManager会将已经完成任务的统计信息进行存档,History Server进程则在任务停止后可以对任务统计信息进行查询。

配置

创建存储目录

[root@node01 flink]# hadoop fs -mkdir -p /logs/flink-job

在flink-config.yaml中添加如下配置

#==============================================================================
# HistoryServer
#==============================================================================

# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
jobmanager.archive.fs.dir: hdfs://node01:9000/logs/flink-job

# The address under which the web-based HistoryServer listens.
#historyserver.web.address: 0.0.0.0
historyserver.web.address: node01

# The port under which the web-based HistoryServer listens.
#historyserver.web.port: 8082
historyserver.web.port: 8082

# Comma separated list of directories to monitor for completed jobs.
#historyserver.archive.fs.dir: hdfs:///completed-jobs/
historyserver.archive.fs.dir: hdfs://node01:9000/logs/flink-job

# Interval in milliseconds for refreshing the monitored directories.
#historyserver.archive.fs.refresh-interval: 10000
historyserver.archive.fs.refresh-interval: 5000

启动、停止历史服务器

启动历史服务器

[root@node01 flink]# bin/historyserver.sh start
Starting historyserver daemon on host node01.

停止历史服务器

[root@node01 flink]# bin/historyserver.sh stop
Stopping historyserver daemon (pid: 30749) on host node01.

提交一个Job任务

[root@node01 flink]# bin/flink run -t yarn-per-job -c com.atguigu.wc.WordCountStreamUnboundedDemo  /root/FlinkTutorial-1.17-1.0-SNAPSHOT.jar

2023-06-12 23:41:00,719 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:41:00,742 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:41:00,761 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2023-06-12 23:41:00,766 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1686577483648_0012
2023-06-12 23:41:00,792 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1686577483648_0012
2023-06-12 23:41:00,792 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2023-06-12 23:41:00,793 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2023-06-12 23:41:04,565 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2023-06-12 23:41:04,565 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node02:38887 of application 'application_1686577483648_0012'.
Job has been submitted with JobID cd41d983c93d8eb906c9aa899dcdefd0

访问http://node01:8088/cluster查看Hadoop
在这里插入图片描述
访问Web UI查看提交任务信息
在这里插入图片描述

查看历史Job信息

在浏览器地址栏输入:http://node01:8082 查看已经停止的 job 的统计信息
在这里插入图片描述
停止提交任务

[root@node01 flink]# bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1686577483648_0012 cd41d983c93d8eb906c9aa899dcdefd0

访问http://node01:9870/explorer.html#/logs/flink-job查看HDFS中的归档文件
在这里插入图片描述
等一段时间,几分钟后查看历史服务器
在这里插入图片描述
查看Job具体信息
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1030155.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何选择适合你的隧道爬虫ip?

隧道爬虫IP在保护你的网络隐私和提供安全的数据传输方面起着关键作用。然而,在众多的商家中选择适合自己的并非易事。本文将分享一些关键的考虑因素,帮助你选择适合你的隧道爬虫IP商家。无论你是个人用户还是企业客户,相信这些指南都能帮助你…

单元测试 —— JUnit 5 参数化测试

JUnit 5参数化测试 目录 设置我们的第一个参数化测试参数来源 ValueSourceNullSource & EmptySourceMethodSourceCsvSourceCsvFileSourceEnumSourceArgumentsSource参数转换参数聚合奖励总结 如果您正在阅读这篇文章,说明您已经熟悉了JUnit。让我为您概括一下…

家居服务小程序发展指南

随着互联网的快速发展,越来越多的企业开始关注并投资于线上平台的建设,以满足用户的多样化需求。家居服务行业也不例外,通过打造小程序平台,可以更好地服务用户,提供更便捷的家居服务体验。 首先,我们可以选…

大二毕设.3-网盘系统

目录 技术选型: 功能概括: 基本演示: 实现讲解: 技术选型: 前端: Vue3 Element Plus后端: SpringBoot Mybatis-Plus MySQL Redis Caffeine FastDFS/OSS SpringCloud Stream RocketMQ Zookeeper 功能概括&…

Flutter实现地图上汇聚到一点的效果。

要求效果: 实现的效果: 代码: 选择点的界面: import dart:math;import package:flutter/material.dart; import package:get/get.dart; import package:kq_flutter_widgets/widgets/animate/mapChart/map_chart.dart; import pa…

Winserver安装Linux虚拟机执行java程序踩坑

前言: “好久没有更新文章了,最近太忙了!”一个特别朴实无华的小马哥说到。 “小马蝈蝈,那你现在更新文章了,是不是很闲啊,来帮我....” 耳畔听到一个妹子的声音。咳咳咳~~此处省略一万字,文末也…

WebGL 用鼠标控制物体旋转

目录 鼠标控制物体旋转 如何实现物体旋转 示例程序(RotateObject.js) 代码详解 示例效果 鼠标控制物体旋转 有时候,WebGL程序需要让用户通过鼠标操作三维物体。这一节来分析示例程序RotateObject,该程序允许用户通过拖动&…

数据通信——传输层TCP(超时时间选择)

引言 TCP每一次发送报文段,就会对这个报文段设置一次计时器。如果时间到了却没有收到确认报文,那么就要重传该报文。 这个之前在TCP传输的机制中提到过,这个章节就来研究一下超时时间问题。 关于加权的概念 有必要提及一下加权的概念&#x…

天地一体化指挥!平战结合的应急感知云来了

面向智慧应急数字化转型需求,天翼物联基于感知云平台创新能力,为客户提供泛协议接入、感知云应急平台、应急感知数据治理、决策处置大屏等在内的应急感知云服务,构建应急感知神经系统新型数字化底座,实现应急感知、预警、决策、处…

程序员必须掌握的算法系列之贪心算法

一:引言 在计算机科学中,贪心算法(Greedy Algorithm)是一种基于贪心策略的算法思想,它在每一步选择中都采取当前状态下最优的选择,以希望最终能够得到全局最优解。贪心算法通常可以在较短的时间内找到问题…

springcloud3 分布式事务实现逻辑思想2

一 分布式事务逻辑 1.1 CAP理论 CAP原则又称CAP定理,指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性)这3个基本需求&…

SPI在Java中的实现与应用 | 京东物流技术团队

1 SPI的概念 API API在我们日常开发工作中是比较直观可以看到的,比如在 Spring 项目中,我们通常习惯在写 service 层代码前,添加一个接口层,对于 service 的调用一般也都是基于接口操作,通过依赖注入,可以…

【深度学习实验】前馈神经网络(六):自动求导

目录 一、实验介绍 二、实验环境 1. 配置虚拟环境 2. 库版本介绍 三、实验内容 0. 导入必要的工具包 1. 标量求导 2. 矩阵求导 3. 计算图 一、实验介绍 PyTorch提供了自动求导机制,它是PyTorch的核心功能之一,用于计算梯度并进行反向传播。自动求…

C++流插入和流提取的重载!

C作为C语言的衍生,其弥补了C语言中的很多不足,也对C语言进行了一定的优化!今日就来讲解一下C中输入/出流相关的知识!以及对输入/出的重载!,希望读完本篇文章,能让读者们对C中输入/出流有更深一步…

Java之IO概述以及

1.1 什么是IO 生活中,你肯定经历过这样的场景。当你编辑一个文本文件,忘记了ctrls ,可能文件就白白编辑了。当你电脑上插入一个U盘,可以把一个视频,拷贝到你的电脑硬盘里。那么数据都是在哪些设备上的呢?键…

散列(哈希)查找的定义,常见的散列函数设计以及处理哈希冲突方法

1.散列表 1.散列表的定义 散列表(Hash Table),又称哈希表。 是一种数据结构,特点是:数据元素的关键字与其存储地址直接相关。 特点: 若不同的关键字通过散列函数映射到同一个值,则称它们为“同义词”。通过散列函数确定的位置…

Maven 设置环境变量(Windows、Linux)

文章目录 Windows 配置 Maven 环境变量Linux 配置 Maven 环境变量 如果想在任意路径下都能通过 mvn 命令运行 Maven 程序,就需要将 Maven 程序路径设置到环境变量中, 否则使用 mvn 命令时每次都要加上 Maven 程序的全路径 核心就一句话,把 M…

坚鹏:中国邮政储蓄银行金融科技前沿技术发展与应用场景第4期

中国邮政储蓄银行金融科技前沿技术发展与应用场景第4期培训圆满结束 中国邮政储蓄银行拥有优良的资产质量和显著的成长潜力,是中国领先的大型零售银行。2016年9月在香港联交所挂牌上市,2019年12月在上交所挂牌上市。中国邮政储蓄银行拥有近4万个营业网点…

学习记忆——英语篇——右脑记忆单词

文章目录 英语字母形象起源右脑记忆单词的原则四大步骤第一步:摄取信息第二步:处理信息第三步:储存信息第四步:提取信息 训练例子字母形象训练 右脑记忆单词5大方法字源法编码法字母编码法字母组合编码法 拼音法全拼法拼音组合 熟…

springcloud3 分布式事务解决方案seata之XA模式4

一 seata的模式 1.1 seata的几种模式比较 Seata基于上述架构提供了四种不同的分布式事务解决方案: XA模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入 TCC模式:最终一致的分阶段事务模式,有…