flink集群安装部署

news2024/12/26 21:04:20

1.下载

官网下载:Downloads | Apache Flink

阿里网盘下载(包含依赖包):阿里云盘分享

提取码:9bl2

2.解压

tar -zxvf flink-1.12.7-bin-scala_2.11.tgz -C ../opt/module

3.修改配置文件

cd flink-1.12.7/conf/

修改  flink-conf.yaml  文件

修改  masters  文件

创建  workers 文件

3.1修改  flink-conf.yaml  文件

具体配置按照自己的集群配置来

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################


#==============================================================================
# Common
#==============================================================================

# The external address of the host on which the JobManager runs and can be
# reached by the TaskManagers and any clients which want to connect. This setting
# is only used in Standalone mode and may be overwritten on the JobManager side
# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
# In high availability mode, if you use the bin/start-cluster.sh script and setup
# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
# automatically configure the host name based on the hostname of the node where the
# JobManager runs.

jobmanager.rpc.address: hadoop102

# The RPC port where the JobManager is reachable.

jobmanager.rpc.port: 6123


# The heap size for the JobManager JVM

jobmanager.heap.size: 800m


# The total process memory size for the TaskManager.
#
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.

taskmanager.memory.process.size: 1024m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
#
#taskmanager.memory.flink.size: 1280m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 1

# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1

# The default file system scheme and authority.
# 
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==============================================================================
# High Availability
#==============================================================================

# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
#
high-availability: zookeeper

# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
# 
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...) 
#
 high-availability.storageDir: hdfs://192.168.233.130:8020/flink/ha/

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#
high-availability.zookeeper.quorum: 192.168.233.130:2181,192.168.233.131:2181,192.168.233.132:2181


# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
high-availability.zookeeper.client.acl: open

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
 state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
 state.checkpoints.dir: hdfs://192.168.233.130:8020/flink-checkpoints

# Default target directory for savepoints, optional.
#
 state.savepoints.dir: hdfs://192.168.233.130:8020/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend). 
#
# state.backend.incremental: false

# The failover strategy, i.e., how the job computation recovers from task failures.
# Only restart tasks that may have been affected by the task failure, which typically includes
# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.

jobmanager.execution.failover-strategy: region

#==============================================================================
# Rest & web frontend
#==============================================================================

# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
#
rest.port: 8081

# The address to which the REST client will connect to
#
rest.address: 0.0.0.0

# Port range for the REST and web server to bind to.
#
rest.bind-port: 8080-8090

# The address that the REST & web server binds to
#
rest.bind-address: 0.0.0.0

# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.

web.submit.enable: true

#==============================================================================
# Advanced
#==============================================================================

# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
#     /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# io.tmp.dirs: /tmp

# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first

# The amount of memory going to the network stack. These numbers usually need 
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
# 
# taskmanager.memory.network.fraction: 0.1
#taskmanager.memory.network.min: 128mb
#taskmanager.memory.network.max: 1gb

#==============================================================================
# Flink Cluster Security Configuration
#==============================================================================

# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
# may be enabled in four steps:
# 1. configure the local krb5.conf file
# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
# 3. make the credentials available to various JAAS login contexts
# 4. configure the connector to use JAAS/SASL

# The below configure how Kerberos credentials are provided. A keytab will be used instead of
# a ticket cache if the keytab path and principal are set.

# security.kerberos.login.use-ticket-cache: true
# security.kerberos.login.keytab: /path/to/kerberos/keytab
# security.kerberos.login.principal: flink-user

# The configuration below defines which JAAS login contexts

# security.kerberos.login.contexts: Client,KafkaClient

#==============================================================================
# ZK Security Configuration
#==============================================================================

# Below configurations are applicable if ZK ensemble is configured for security

# Override below configuration to provide custom ZK service name if configured
# zookeeper.sasl.service-name: zookeeper

# The configuration below must match one of the values set in "security.kerberos.login.contexts"
# zookeeper.sasl.login-context-name: Client

#==============================================================================
# HistoryServer
#==============================================================================

# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
jobmanager.archive.fs.dir: hdfs://192.168.233.130:8020/completed-jobs/

# The address under which the web-based HistoryServer listens.
#historyserver.web.address: 0.0.0.0

# The port under which the web-based HistoryServer listens.
historyserver.web.port: 8082

# Comma separated list of directories to monitor for completed jobs.
historyserver.archive.fs.dir: hdfs://192.168.233.130:8020/completed-jobs/

# Interval in milliseconds for refreshing the monitored directories.
historyserver.archive.fs.refresh-interval: 10000

3.2修改masters

192.168.233.130:8081

3.3创建workers

里面配置 集群 各机器的 ip 或者 主机名

192.168.233.130
192.168.233.131
192.168.233.132

4.分发集群

scp -r flink-1.12.7 hadoop103:/opt/module
scp -r flink-1.12.7 hadoop104:/opt/module

5.配置环境变量

vim /etc/profile.d/my_env.sh 

#FLINK_HOME
export FLINK_HOME=/opt/module/flink-1.12.7
export PATH=$FLINK_HOME/bin:$PATH

6.分发环境变量

 scp /etc/profile.d/my_env.sh hadoop102:/etc/profile.d/
 scp /etc/profile.d/my_env.sh hadoop103:/etc/profile.d/

7.source 环境变量

集群节点1:source /etc/profile.d/my_env.sh
集群节点2:source /etc/profile.d/my_env.sh
集群节点3:source /etc/profile.d/my_env.sh

8.添加依赖包

可以不添加试试,直接  bin/start-cluster.sh 启动 flink集群

不出意外的话 可能会出现一系列 的错误

https://mvnrepository.com/

搜索  commons-cli-1.4.jar  和 flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar 并下载

这个当然会有点慢

不过我在阿里云盘中已经放置了,可以去里面下载

9.启动flink集群

bin/start-cluster.sh

Starting standalonesession daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop103.
Starting taskexecutor daemon on host hadoop104.
 

实在不放心,在每个机器上 jps 查看下 是否又有对应的进程

 

 

 10.查看 flink web ui

http://192.168.233.130:8081/ 

11.测试

 bin/flink run examples/streaming/WordCount.jar

日志:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.7/lib/chunjun/chunjun-dist/connector/iceberg/chunjun-connector-iceberg.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.7/lib/chunjun-dist/connector/iceberg/chunjun-connector-iceberg.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.7/lib/chunjun/lib/log4j-slf4j-impl-2.16.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.7/lib/log4j-slf4j-impl-2.16.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.7/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID c537bf865e69549f725bb167bfa64c18
Program execution finished
Job with JobID c537bf865e69549f725bb167bfa64c18 has finished.
Job Runtime: 2678 ms

成功!

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/507122.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[C++]string的使用

目录 string的使用&#xff1a;&#xff1a; 1.string类介绍 2.string常用接口说明 string相关习题训练&#xff1a;&#xff1a; 1.仅仅反转字母 2.找字符串中第一个只出现一次的字符 3.字符串里面最后一个单词的长度 4.验证一个字符串是否是回文 5.字符串相加 6.翻转字符串…

[Dubbo] 重要接口与类 (三)

文章目录 1.dubbo的整体调用链路2.dubbo的源码整体设计3.重要接口和类 1.dubbo的整体调用链路 消费者通过Interface进行方法调用&#xff0c;统一交由消费者的Proxy处理&#xff08;Proxy通过ProxyFactory来进行代理对象的创建&#xff09; Proxy调用Filter模块&#xff0c;做…

linux中fork函数与vfork函数的区别

fork函数跟vfork函数一样能够创建进程&#xff0c;它们主要有两个区别 &#xff08;1&#xff09;区别一&#xff1a; vfork直接使用父进程存储空间&#xff0c;不拷贝。 &#xff08;2&#xff09;区别二&#xff1a; vfork保证子进程先运行&#xff0c;当子进程调用exit退…

【读论文】AT-GAN

【读论文】AT-GAN 介绍网络架构生成器IAMSTM 辨别器 损失函数SEM损失内容损失结构损失对抗损失 总结参考 论文&#xff1a;https://www.sciencedirect.com/science/article/pii/S156625352200255X 如有侵权请联系博主 介绍 大概是刚开学的时候就读到一篇文章&#xff0c;看完…

Nginx静态资源传输优化,文件高效传输,事半功倍

1.引出问题 Nginx可以作为静态资源服务器&#xff0c;比如我们访问192.168.110.97:80&#xff0c;熟悉的nginx欢迎界面&#xff0c;这其实也是nginx为我们提供的一个静态文件&#xff1a;index.html。 既然是静态资源&#xff0c;那我们能否优化一下传输效率呢&#xff1f; 1…

训练计划安排(练一休一训练分化+倒金字塔训练法)【了解即可,一般人容量不用练一休一,看抖音@孙悟饭桶】

目录 练一休一训练分化每次训练的组数12-15组 &#xff08;4-5个动作&#xff09;QA 倒金字塔训练法倒金字塔热身正式组常见误区&#xff1a; 训练补剂bcaa咖啡因肌酸蛋白粉 如何降低皮质醇水平如何提升睾酮水平文献出处睡眠8h摄入适量脂肪&#xff08;0.8g每公斤体重&#xff…

java APT原理及APT实战 - 一步步教你写ButterKnife

一、定义 Java APT 是 Java 技术设计的一个 APT 架构&#xff0c; APT&#xff08;Annotation Processing Tool&#xff09;即注解处理器&#xff0c;它是一种处理注解的工具&#xff0c;也是javac中的一个工具&#xff0c;用于在编译阶段未生成class之前对源码中的注解进行扫…

Windows巧用git实现笔记自动备份

Windows巧用git实现笔记自动备份 准备git仓库配置自动上传脚本设置 Windows 自动定时任务参考文献 准备git仓库 安装git&#xff1a;https://git-scm.com/downloads&#xff1a; 注册并登录gitee&#xff0c;本地生成ssh key&#xff08;详情百度&#xff09;&#xff0c;然后…

数据处理Pandas学习笔记(一)

import pandas as pdpandas值series创建 t pd.Series([1, 2, 31, 12, 3, 4]) t0 1 1 2 2 31 3 12 4 3 5 4 dtype: int64type(t)pandas.core.series.Seriesseries指定索引 t2 pd.Series([1,23,3,2,3],indexlist(abcde)) t2a 1 b 23 c 3 d …

【Java-05】常用API、正则表达式、Collection集合

主要内容 BigInteger类BigDecimal类Arrays类包装类String类的常用方法正则表达式Collection集合 1 BigInteger类 1.1 概述 概述 : java.math.BigInteger类是一个引用数据类型 , 可以用于计算一些大的整数 , 当超出基本数据类型数据范围的整数运算时就可以使用BigInteger了。…

类别无关的姿态估计ECCV2022

现有的2D姿态估计工作主要集中在某一类别&#xff0c;例如人类、动物和车辆。然而&#xff0c;有很多应用场景需要检测unseen对象类的姿态&#xff08;或关键点&#xff09;。因此作者提出CAPE任务&#xff08;Category-Agnostic Pose Estimation&#xff09;&#xff0c;该任务…

Sketch哪个版本好用?

使用最新版本的 Sketch 是很有意义的。一方面&#xff0c;最新版本通常会有新的功能和改进&#xff0c;使设计师更方便地完成工作。另一方面&#xff0c;使用最新版本还可以避免出现因版本不兼容而无法打开源文件的问题。此外&#xff0c;最新版本通常会更稳定&#xff0c;因此…

Linux command(sar)

说明 sar命令是一个系统性能监测工具&#xff0c;用于收集、报告和分析系统的各种资源使用情况。以下是sar命令的基本用法&#xff1a; sar [选项] [时间间隔] [次数] 选项&#xff1a;可用的选项包括-a&#xff08;显示所有资源使用情况&#xff09;、-b&#xff08;显示I/…

图片修复增强调研

Real-ESRGAN 工程地址&#xff1a;https://github.com/xinntao/Real-ESRGAN 效果&#xff1a; 人脸增强部分&#xff0c;调用的GFPGAN. GFPGAN 工程地址&#xff1a;https://github.com/TencentARC/GFPGAN 论文效果&#xff1a; BasicSR-ESRGAN&#xff1a; 项目地址&a…

[Qt编程之Widgets模块] -001: QButtonGroup抽象容器

1.QButtonGroup简介 QButtonGroup提供了一个抽象容器&#xff0c;可以将按钮小部件放入其中。它不提供此容器的可视化表示&#xff0c;而是管理组中每个按钮的状态。 互斥按钮组&#xff0c;将关闭除已单击的按钮外的所有可选中&#xff08;可切换&#xff09;按钮。默认情况下…

免费使用GPT-4.0?【AI聊天 | GPT4教学】 —— 微软 New Bing GPT4 申请与使用保姆级教程

目录 认识 New Bing 2. 注册并登录 Microsoft 账号 3. 如何免科学上网使用 New Bing&#xff1f; 4. 加入 WaitList 候补名单 5. 使用 New Bing&#xff01; 6. 使用 Skype 免科学上网访问 New Bing&#xff01; 7. 在 Chrome 浏览器中使用 New Bing&#xff01; 8. 总…

如何利用splice()和slice()方法操作数组

如何利用splice&#xff08;&#xff09;和slice&#xff08;&#xff09;方法操作数组 前言splice()是什么&#xff0c;有什么用&#xff1f;怎么用&#xff1f;slice()是什么&#xff0c;有什么用&#xff1f;怎么用&#xff1f;splice和slice方法的区别小结 前言 splice&am…

如今的Android就业率惨不忍睹~

3月底公司大裁员&#xff0c;投了一个月简历&#xff0c;一天投个几十份简历&#xff0c;而收到面试通知的就那么三四家&#xff0c;要么就是薪水给得很低不想去&#xff0c;要么就是高薪水的Offer拿不下&#xff0c;而自己中意公司的却没有给出回应,唉……真难啊&#xff01;&…

4.2 线性表顺序表

目录 目录结构 线性表 线性表的特征&#xff1a; 顺序表存储结构的表示 顺序表存储结构的特点 顺序存储结构的表示 线性表的基本运算 基本运算的相关算法 线性表的基本运算 线性表 目录结构 线性表 线性表是包含若干数据元素的一个线性序列 记为&#xff1a; L(a0, …

Istio virtual service 故障注入之延时(fixedDelay)、中断(abort)

Istio 故障注入 Istio 故障注入与其他在网络层引入错误&#xff08;例如延迟数据包或者直接杀死 Pod&#xff09;的机制不同&#xff0c;Istio 允许在应用程序层注入故障。这使得可以注入更多相关的故障&#xff0c;比如 HTTP 错误代码等。 Istio 可以注入两种类型的故障&…