第二章 Flink 环境部署

news2025/2/1 4:03:36

Flink 系列教程传送门

第一章 Flink 简介

第二章 Flink 环境部署

第三章 Flink DataStream API

第四章 Flink 窗口和水位线

第五章 Flink Table API&SQL

第六章 新闻热搜实时分析系统

一、Flink架构

Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。它集成了所有常见的集群资源管理器,例如Hadoop YARN,但也可以设置作为独立集群运行。

Flink 运行时由两种类型的进程组成:一个JobManager和一个或者多个TaskManager

每个 Flink 应用都需要有执行环境,DataStream API 将应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。

注意,如果没有调用 execute(),应用就不会运行。

  • Flink ProgramClient是用于准备数据流并将其发送给 JobManager,客户端可以作为触发执行 Java/Scala 程序的一部分运行,也可以在命令行进程./bin/flink run ...中运行。
  • Graph 根据用户编写的代码生成最初的图,表示程序的拓扑结构。
  • JobManagerJobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 Task(或一组 Task)、对完成的 Task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 Leader,其他的则是 Standby
  • TaskManagersTaskManager也称为 Worker用于执行作业流的 Task并且缓存和交换数据流。必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 Task SlotTaskManagerTask Slot 的数量表示并发处理 Task 的数量。
  • Task Slots :每个 Worker(TaskManager)都是一个 JVM 进程,可以在单独的线程中执行一个或多个 SubTask。为了控制一个 TaskManager 中接受多少个 Task,就有了所谓的 Task Slots(至少一个)。每个 Task Slot 代表 TaskManager 中资源的固定子集。

二、Flink本地模式部署

Local模式下,不需要启动任何的进程,仅仅是使用本地线程来模拟 Flink 的进程,适用于测试、开发调试等,这种模式下,不用更改任何配置,只需要保证 JDK8 安装正常即可。

Flink-1.14.4官方下载地址 | Standalone官方部署 | Local官方部署

本地模式环境部署步骤如下:

# 1. 下载安装包并上传到/usr/local/src 目录
# 2. 解压安装包并重命名为flink
[root@node src]$ tar -zxf flink-1.14.4-bin-scala_2.12.tgz
[root@node src]$ tar -zxf jdk-8u111-linux-x64.tar.gz
# 3. 修改安装包所属用户和用户组权限
[root@node src]$ chown -R root.root flink-1.14.5
# 4. 配置Flink环境变量并重新加载使其生效
[root@node src]$ vim ~/.bash_profile
export JAVA_HOME=/usr/local/src/jdk1.8.0_111/
export FLINK_HOME=/usr/local/src/flink-1.14.5/
export PATH=$PATH:$JAVA_HOME/bin:$FLINK_HOME/bin
[root@node src]$ source ~/.bash_profile
# 4. 启动"集群"
[root@node src]$ start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node.
Starting taskexecutor daemon on host node.
[root@node src]$ jps
17186 Jps
17078 TaskManagerRunner
16809 StandaloneSessionClusterEntrypoint
# 5. 访问WebUI界面8081端口查看运行情况

启动两个进程成功之后,访问 8081 端口号即可访问到 Flink 的 Web 管理界面。注意还需要配置IP地址映射和SSH免密登录,这里就不展开说明了。

 入门案例演示

Flink内置入门案例演示,具体

# 进入Flink默认提供的案例jar包所在目录
[root@node streaming]$ pwd
/usr/local/src/flink/examples/streaming
[root@node streaming]$ ll
总用量 7208
-rw-r--r--. 1 1000 1001   13591 2月  25 20:47 Iteration.jar
-rw-r--r--. 1 1000 1001    9357 2月  25 20:47 SessionWindowing.jar
-rw-r--r--. 1 1000 1001   10596 2月  25 20:47 SocketWindowWordCount.jar
-rw-r--r--. 1 1000 1001 3704625 2月  25 20:47 StateMachineExample.jar
-rw-r--r--. 1 1000 1001   13047 2月  25 20:47 TopSpeedWindowing.jar
-rw-r--r--. 1 1000 1001 3581076 2月  25 20:47 Twitter.jar
-rw-r--r--. 1 1000 1001   16879 2月  25 20:47 WindowJoin.jar
-rw-r--r--. 1 1000 1001   11077 2月  25 20:47 WordCount.jar

# 运行词频统计案例,默认可以不用输入词频文件,系统自带,也可以通过 --input 指定输入文件目录, -- output 指定输出文件目录
[root@node streaming]$ flink run WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 750fb8771b02268eb03416bbe4df548d
Program execution finished
Job with JobID 750fb8771b02268eb03416bbe4df548d has finished.
Job Runtime: 1284 ms

# 查看日志中任务执行结果输出文件的前10条数据
[root@node log]$ tail $FLINK_HOME/log/flink-*-taskexecutor-*.out
(nymph,1)
(in,3)
(thy,1)
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)

# 指定词频文件和输出目录
[root@node streaming]$ flink run WordCount.jar --input /root/flink_data/words.txt --output /root/output
Job has been submitted with JobID 21424abc04bcbc12e497ca1e049a7fb4
Program execution finished
Job with JobID 21424abc04bcbc12e497ca1e049a7fb4 has finished.
Job Runtime: 536 ms

# 查看统计结果
[root@node streaming]$ cat /root/output
(hello,1)
(apache,1)
(flink,1)
(hello,2)
(java,1)
(hello,3)
(apache,2)
(hello,4)
(flink,2)
(java,2)
(flink,3)
(scala,1)
(flink,4)

三、Flink Standalone独立集群模式部署

使用StandAlone模式,需要启动Flink的主节点JobManager以及从节点TaskManager

服务名称

node01

node02

node03

JobManager

×

×

TaskManager

1、更改配置文件

停止Master服务器上的服务进程(stop-cluster.sh),然后修改 Master 服务器配置文件(conf/flink-conf.yaml)。

指定主节点(node01)和任务槽个数和并行度(可以默认)

jobmanager.rpc.address: node01
jobmanager.rpc.port: 6123
taskmanager.numberOfTaskSlots: 2  # 任务槽个数,默认是1
parallelism.default: 1            # 默认并行度

指定JobManager(Master)节点,修改配置文件(conf/master),替换信息为:node01:8081

指定TaskManager(Worker)节点,修改配置文件(conf/workers),替换信息为:node01node02node03。注意需要换行,一个Worker节点占据一行。

2、分发Flink安装包配置到另外两个节点

使用Linux scp命令把node01节点的配置分发到另外两个节点上。

# 使用 scp 分发
[root@node01 conf]$ cd /usr/local
[root@node01 local]$ scp -r flink node02:/usr/local/
[root@node01 local]$ scp -r flink node03:/usr/local/

注意:这里需要配置IP地址映射和SSH免密登录。

3、启动Flink集群

# 批量统一启动
[root@node01 bin]$ start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node01.
Starting taskexecutor daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.

# 单独启动
[root@node01 bin]$  jobmanager.sh start / stop
[root@node01 bin]$  taskmanager.sh start / stop

# 使用脚本查看服务进程
[root@node01 bin]$ jps

4、执行入门案例

四、使用Maven工具把当前词频统计案例打jar包到Linux集群中执行

1、导入打包所需依赖插件

默认Scala文件不会编译为Class字节码文件,在pom.xml中添加打包插件,这里提供两种方式,二选一。

Maven打包-指定Scala代码编译器

<build>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>4.7.1</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Maven定制打包插件-生成完全版和简单版jar包

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>3.3.0</version>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

2、设置动态参数

修改批处理词频统计的输入文件路径为参数传递

修改流处理词频统计的主机名和端口为参数传递

// 批处理 FlinkBatchWordCountDemo
val params = ParameterTool.fromArgs(args)
val fileDataSet = env.readTextFile(params.get("input"))

// 流处理 FlinkStreamWordCountDemo
val params = ParameterTool.fromArgs(args)
val textDataStream = env.socketTextStream(params.get("host"), params.getInt("port"))

使用IDEA工具测试上述配置是否可以正常接收参数,执行程序

3、使用Maven package进行打包 

上传打好的jar包到Linux平台

五、在Flink集群环境中运行Flink程序

1、使用flink run命令运行

通过flink run 命令进行测试执行,可以通过8081端口查看运行详情

// 批处理测试
flink run -j flink_demo-1.0-SNAPSHOT.jar -c FlinkBatchWordCountDemo --input /root/words.txt

// 流处理测试
nc -lk 888
flink run -j flink_demo-1.0-SNAPSHOT.jar -c FlinkStreamWordCountDemo --host node01 --port 888

 2、使用Web UI界面提交

通过Flink Web UI界面的Submit New Job选项运行程序,这种方式不用上传jar包Linux中,直接本地选择。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/140215.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python-turtle画图

认识TurtleTurtle是一个渲染器基于底层图形编程结构(API)构建&#xff0c;主要用于场景的构建以及3D物体的绘制(3D游戏、虚拟场景等)Turtle是一个窗体程序Turtle是Python语言中的一个很流行的绘制图像的函数库&#xff0c;想象一个小海龟在一个横轴为x&#xff0c;纵轴为y的坐标…

时序数据库 TDengine 携手北京科技大学设计研究院,助力冶金工业智慧化

北京科技大学设计研究院有限公司作为北京科技大学全资产业化技术推广机构&#xff0c;从 2013 年开始在冶金、钢铁行业进行业务系统开发和实施&#xff0c;围绕先进材料、绿色低碳和智能制造不断深耕细作&#xff0c;持续创新。其拥有高效轧制与智能制造国家工程研究中心、国家…

DPDK工作原理和环境搭建

DPDK工作原理DPDK环境搭建编译DPDKDPDK工作原理DPDK实践之处理UDP数据总结DPDK环境搭建 工具准备&#xff1a;VMware、ubuntu16.04。 &#xff08;1&#xff09;VMware添加两个网卡。桥接网卡作为 DPDK 运行的网卡&#xff0c;NAT 网卡作为 ssh 连接的网卡。 &#xff08;2&…

后台交互—springboot+mybatis整合小程序(源码演示)

后台准备pom.xml<?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM…

【案例实战】SpringBoot整合GRPC微服务远程通信

1.什么是GRPC GRPC是RPC框架中的一种&#xff0c;是一个高性能&#xff0c;开源和通用的RPC框架&#xff0c;基于Protobuf序列化协议开发&#xff0c;且支持众多开发语言。 面向服务端和协议端&#xff0c;基于http/2设计&#xff0c;带来诸如双向流&#xff0c;流控&#xff…

足球视频AI(三)——YOLOV7目标检测自训练模型

一、基础概念 YoloV7提供的yolov7-tiny.onnx 对于图像中包含较大尺寸的足球检测准确率高。 但在实际应用中&#xff0c;足球视频中的足球非常小&#xff0c;默认的模型难于满足实际的足球检测需求。 1.1 识别目标 1&#xff09;固定机位的视频中足球的逐帧识别 1.2 实现思…

邮箱2023系统

邮箱2023系统 前言 VMMail作为一款开源的邮件系统&#xff0c;目前已经发布到了10.0版本。 该版本在 GitHub上是免费的&#xff0c;且代码也是开源的&#xff0c;所以该程序不会对 GitHub上的所有用户造成任何影响。 由于 VMMail开发时采用了开源代码&#xff0c;并在 GitHub上…

区块链之bolt数据库持久化与基本功能完善

文章目录bolt数据库安装使用bolt进行持久化存储bolt持久化的基本步骤创世区块的持久化新增区块的持久化完善区块链基本功能创世区块创建增加区块遍历区块链链接&#xff1a; 区块链项目github地址项目目前进度&#xff1a;bolt数据库安装 bolt数据库介绍&#xff1a; bolt数据…

Vue3过渡动画实现

文章目录P14Vue3过渡&动画实现过渡动画的使用过渡CSS动画效果同时设置过渡和动画mode和appearanimate.cssgsapgsap实现数字变化认识列表的过渡列表过渡的移动动画列表的交错过渡案例P14Vue3过渡&动画实现 过渡动画的使用 <template><button click"isShow…

进入新组织项目经理如何快速提升自己的影响力?

我们在工作中&#xff0c;经常以“对事不对人”来体现他们的专业性&#xff0c;但是这点并不符合人性。更多时候对人不对事&#xff0c;反倒能提高问题的解决能力。项目经理会发现&#xff0c;很多事情的推进&#xff0c;都建立在和对方的信任的基础上&#xff0c;所以先成为对…

使用Qt开发的linux嵌入式设备监控、管理框架,监测嵌入式设备运行状态,执行远程shell,远程升级,与客户端进行文件传输

linux SPY 简介 使用Qt开发的linux嵌入式设备监控、管理框架 [客户端]&#xff1a;aes_tcp_lib 完整代码下载地址&#xff1a;使用Qt开发的linux嵌入式设备监控、管理框架 开发环境 ubuntu 20Qt 5.12Qt Creator 4.13.1 核心功能 监测嵌入式设备运行状态转发客户端消息,…

划重点!企业在采购管理中应避免的10个错误

一家企业的采购能力在很大程度上取决于其采购管理系统的有效性。当系统运行良好时&#xff0c;那么采购就会相当顺利。如果你使用的是一个低效的系统&#xff0c;那就会导致一大堆常见的采购问题。 无论采购错误的根本原因是什么&#xff0c;任其发展&#xff0c;最终会给企业…

【ZooKeeper】第二章 JavaAPI 操作

【ZooKeeper】第二章 JavaAPI 操作 文章目录【ZooKeeper】第二章 JavaAPI 操作一、Curator 简介二、Curator API1.建立连接2.创建节点3.查询节点4.修改节点5.删除节点6.Watch 事件监听三、分布式锁四、案例&#xff1a;12306售票一、Curator 简介 Curator 是 Apache ZooKeeper…

【云原生进阶之容器】第二章Controller Manager原理2.3节--Reflector分析

1 Reflector组件 1.1 背景 Reflector 是保证 Informer 可靠性的核心组件,在丢失事件,收到异常事件,处理事件失败等多种异常情况下需要考虑的细节很多。单独的listwatcher缺少重新连接和重新同步机制,有可能出现数据不一致问题。其对事件响应是同步的,如果执行复杂的操作会…

把财务分析明白的BI软件有哪些?

有财务分析这一个地狱级的在&#xff0c;什么销售、采购、库存都是渣渣。在财务分析中&#xff0c;指标运算组合可以有多样化的改变&#xff0c;资金来来回回能把人绕晕&#xff0c;一个极不起眼的疏忽都可能导致所有工作推到重来的可怕后果。即使是在擅长做大数据智能可视化分…

MySQL基础命令

MySQL基础命令 创建数据库 Show databases;查看 选择数据库 Use test; 在test数据库中创建表 create table t_stu( id int, name varchar(20) ); 查看表信息 desc t_stu&#xff1b; 添加字段 alter table t_stu add age int; 字段age成功添加 修改字段 alter tab…

springBoot使用rabbitmq并保证消息可靠性

一、理论说明 1.1、数据的丢失问题&#xff0c;可能出现在生产者、MQ、消费者中 1、如下图 1.2、生产者弄丢了数据 1、生产者将数据发送到 RabbitMQ 的时候&#xff0c;可能数据就在半路给搞丢了&#xff0c;因为网络问题啥的&#xff0c;都有可能。此时可以选择用RabbitM…

VS2019打包程序变成带运行环境的安装包

背景 给外行客户写程序的时候&#xff0c;为了避免客户麻烦&#xff0c;我们在写完程序之后&#xff0c;需要把运行环境也打包进安装包中&#xff0c;这样客户就可以一键安装使用。给客户减少麻烦的同时&#xff0c;无疑让我们也有了更多的好评。 步骤 1.写好我们要打包的程…

推荐16个前端必备的实用工具与网站

1. GitHub Desktop 对于新手来说&#xff0c;要记住那么多git命令可能有点困难&#xff0c;建议新手用git可视化工具&#xff0c;会方便很多 2. 图片在线压缩 tinypng 是一个完全免费并且高压缩率的在线压缩图片网站&#xff0c;一般能满足日常大部分压缩图片的需求&#x…

Visual Studio 2022最全安装教程(+背景图设置),一步步教会你如何安装并运行

目录visual studio 2022最全安装教程一、官网下载二、安装启动三、项目测试四、背景图设置 一、官网下载1.点击蓝色链接—->Visual Studio官网&#xff0c;进入之后是这个界面&#xff0c;选择社区版Community下载&#xff08;社区版Community是对个人免费的&#xff0c;一共…