- Server OS: CentOS 7
- Local OS: macOS
- Flink version: 1.15
- JDK version: Java 11
- HA service: ZooKeeper
- File system: NFS
Resource allocation:
| ip | hostname | role |
| --- | --- | --- |
| 10.250.0.1 | main0 | JM |
| 10.250.0.2 | main1 | JM |
| 10.250.0.3 | main2 | JM |
| 10.250.0.4 | worker1 | TM |
| 10.250.0.5 | worker2 | TM |
Environment Preparation
Remote login tool: iTerm
Log in to the five servers and use the ⌘(command) + ⇧(shift) + i shortcut to broadcast input to all of them at once. If your login account is not root, switching to the root account is recommended.
Disable SELinux
$ setenforce 0    # takes effect immediately, lasts until reboot
$ sed -i --follow-symlinks 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/sysconfig/selinux    # persist the change
$ reboot
$ sestatus    # should report "disabled" after the reboot
Configure hostnames
$ hostnamectl set-hostname main0    # main1, main2, worker1~2 on the other machines
$ hostnamectl status
$ vi /etc/hosts
$ cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
10.250.0.1 main0
10.250.0.2 main1
10.250.0.3 main2
10.250.0.4 worker1
10.250.0.5 worker2
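A quick sanity check that every hostname resolves from every node:
$ for h in main0 main1 main2 worker1 worker2; do ping -c 1 $h; done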
Synchronize time
$ yum install ntp -y
$ crontab -e
$ crontab -l
0 * * * * /usr/sbin/ntpdate cn.pool.ntp.org
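Before relying on the hourly cron entry, it may be worth forcing one sync by hand and then querying the remaining offset (-q only queries, without setting the clock):
$ /usr/sbin/ntpdate cn.pool.ntp.org
$ /usr/sbin/ntpdate -q cn.pool.ntp.org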
Configure passwordless SSH login
# 1. Generate a key pair
$ ssh-keygen
# 2. Copy each machine's public key into the authorized_keys of the other servers
$ ssh-copy-id -i ~/.ssh/id_rsa.pub main0    # repeat for main1, main2, worker1~2
Copying public keys requires the servers to access each other, so you may be prompted for the root password. If you do not know it, you can copy and paste the key by hand instead of using ssh-copy-id.
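A minimal sketch of the manual alternative, run from the machine whose key should be authorized (it assumes the target host is still reachable over ssh with a password):
$ cat ~/.ssh/id_rsa.pub | ssh main1 "mkdir -p ~/.ssh && cat >> ~/.ssh/authorized_keys && chmod 700 ~/.ssh && chmod 600 ~/.ssh/authorized_keys"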
For details, see:
Pitfalls I ran into with passwordless SSH login
Install Java 11
Java must be installed before Flink. As of Flink 1.15, Java 8 support is deprecated, so Java 11 is the recommended runtime.
$ yum search java|grep jdk
$ yum install -y java-11-openjdk
$ yum install java-11-openjdk-devel -y
$ java -version
ZooKeeper HA Service Environment Preparation
Setting up the ZooKeeper cluster
A Flink cluster in standalone deployment mode supports only ZooKeeper as its high-availability service, so JobManager HA requires a ZooKeeper deployment.
I chose to install all packages under /opt. ZooKeeper involves only the first three servers, so the following steps apply to main0, main1, and main2 only.
$ cd /opt
$ wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz --no-check-certificate
$ tar -xzf apache-zookeeper-3.7.1-bin.tar.gz
# Rename (the tarball extracts to apache-zookeeper-3.7.1-bin)
$ mv apache-zookeeper-3.7.1-bin zookeeper
Enter the extracted directory and edit the configuration file conf/zoo.cfg (only conf/zoo_sample.cfg ships by default; zoo.cfg has to be created).
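One way to create it is to copy the shipped sample and then edit it:
$ cd /opt/zookeeper
$ cp conf/zoo_sample.cfg conf/zoo.cfg
The resulting conf/zoo.cfg (identical on all three machines):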
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeeper/data
dataLogDir=/opt/zookeeper/logs
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=10.250.0.1:2888:3888
server.2=10.250.0.2:2888:3888
server.3=10.250.0.3:2888:3888
Go to the dataDir defined in the config file and create a myid file containing the server's id. In server.1=10.250.0.1:2888:3888, the 1 in server.1 is the id, so write 1, 2, and 3 into the myid files of the three machines respectively.
$ mkdir -p /opt/zookeeper/data /opt/zookeeper/logs    # dataDir and dataLogDir are not created by the tarball
$ cd /opt/zookeeper/data
$ vi myid
1 # differs per machine: 2 and 3 on the other two
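Equivalently, all three myid files can be written in one loop from main0 (a sketch relying on the passwordless ssh configured earlier; ids 1 to 3 map to main0 to main2):
$ for i in 1 2 3; do ssh main$((i-1)) "mkdir -p /opt/zookeeper/data && echo $i > /opt/zookeeper/data/myid"; done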
$ cd /opt/zookeeper
$ bin/zkServer.sh start    # zkServer.sh start daemonizes itself, so nohup and & are unnecessary
After starting, check that the process is up with ps aux | grep zkServer, or look for a QuorumPeerMain process with jps.
If startup fails, check the logs under /opt/zookeeper/logs. The failures I hit were: the directory holding myid did not match dataDir in zoo.cfg, and the port was already in use.
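Once all three nodes are up, the ensemble roles can be verified; in a healthy cluster one node reports leader and the other two report follower:
$ bin/zkServer.sh status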
NFS Configuration
NFS is configured because Flink's ZooKeeper HA service needs shared file storage (high-availability.storageDir must be a durable file system that is accessible from all nodes). The official examples use HDFS, but HDFS felt too heavyweight and our workload is small, so I chose the lighter NFS. I got the NFS service from a colleague who provided a remote directory, which I mounted at /mnt/flink/ha/ on my three JobManager servers; this directory is later configured as Flink's high-availability.storageDir.
$ rpm -qa|grep nfs
$ rpm -qa|grep rpc
# if not installed, install nfs
$ yum -y install nfs-utils rpcbind
# verify
$ rpm -qa nfs-utils rpcbind
# mount the remote directory onto the local mount point
$ mkdir -p /mnt/flink/ha
$ mount -t nfs -o nolock <remote-directory> /mnt/flink/ha/
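To verify the mount, a quick check plus a write test (the file name below is arbitrary; the file should become visible on the other two JobManager nodes). Optionally, an /etc/fstab entry keeps the mount across reboots:
$ df -h /mnt/flink/ha
$ touch /mnt/flink/ha/nfs_write_test
$ echo '<remote-directory> /mnt/flink/ha nfs nolock 0 0' >> /etc/fstab   # optional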
Flink Cluster Deployment and Configuration
The following steps apply to all five servers.
Download and extract
$ cd /opt
$ wget https://dlcdn.apache.org/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz --no-check-certificate
$ tar -xzf flink-*.tgz
Configure masters in conf/masters:
main0:8081
main1:8081
main2:8081
Configure workers in conf/workers:
worker1
worker2
Configure conf/flink-conf.yaml
The full file is too long to show and differs slightly between machines; below are only worker1's settings (comments removed).
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 1600m
taskmanager.bind-host: 0.0.0.0
taskmanager.host: worker1 (write worker2 on server worker2)
taskmanager.memory.process.size: 8192m
taskmanager.memory.managed.size: 0m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 1
high-availability: zookeeper
high-availability.storageDir: file:///mnt/flink/ha/
high-availability.zookeeper.quorum: main0:2181,main1:2181,main2:2181
high-availability.zookeeper.path.root: /opt/flink-1.15.2/cluster_nodes
high-availability.cluster-id: /cluster_one
jobmanager.execution.failover-strategy: region
rest.address: localhost
rest.bind-address: 0.0.0.0
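Since only taskmanager.host differs between the two workers, one way to distribute this file is to copy it from one node and patch it per host; a sketch, assuming the same /opt/flink-1.15.2 layout on every server:
$ for h in main1 main2 worker1 worker2; do scp /opt/flink-1.15.2/conf/flink-conf.yaml $h:/opt/flink-1.15.2/conf/; done
$ ssh worker2 "sed -i 's/^taskmanager.host: worker1/taskmanager.host: worker2/' /opt/flink-1.15.2/conf/flink-conf.yaml"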
Start and stop the cluster
# start the cluster (run once, on one master; it reaches the other nodes over ssh)
$ cd /opt/flink-1.15.2
$ bin/start-cluster.sh
# stop the cluster
$ bin/stop-cluster.sh
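If startup succeeded, jps should show a StandaloneSessionClusterEntrypoint process on each master and a TaskManagerRunner process on each worker:
$ jps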
After the cluster starts, the web UI is reachable at 10.250.0.1:8081.
While the cluster is running, you can also stop an individual TaskManager:
$ bin/taskmanager.sh stop
The same machine can run several TaskManagers (provided it has enough resources):
$ bin/taskmanager.sh start
Flink Cluster Functional Testing
Stop or kill a TaskManager: you should see the running job restart on another TaskManager.
Stop or kill the current JobManager leader: on another JobManager's UI you should see the job recover from its latest recovery point.
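These tests need a running job; the long-running streaming example bundled with the distribution is convenient here (-d submits in detached mode):
$ bin/flink run -d examples/streaming/TopSpeedWindowing.jar
# then, e.g., stop a TaskManager on worker1 and watch the job restart:
$ bin/taskmanager.sh stop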