Flink 系列教程传送门
第一章 Flink 简介
第二章 Flink 环境部署
第三章 Flink DataStream API
第四章 Flink 窗口和水位线
第五章 Flink Table API&SQL
第六章 新闻热搜实时分析系统
一、Flink架构
Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。它集成了所有常见的集群资源管理器,例如Hadoop YARN,但也可以设置作为独立集群运行。
Flink 运行时由两种类型的进程组成:一个JobManager
和一个或者多个TaskManager
。
每个 Flink 应用都需要有执行环境,DataStream API 将应用构建为一个 job graph,并附加到 StreamExecutionEnvironment 。当调用 env.execute() 时此 graph 就被打包并发送到 JobManager 上,后者对作业并行处理并将其子任务分发给 Task Manager 来执行。每个作业的并行子任务将在 task slot 中执行。
注意,如果没有调用 execute(),应用就不会运行。
Flink Program
:Client
是用于准备数据流并将其发送给JobManager
,客户端可以作为触发执行Java/Scala
程序的一部分运行,也可以在命令行进程./bin/flink run ...
中运行。- Graph 根据用户编写的代码生成最初的图,表示程序的拓扑结构。
JobManager
:JobManager
具有许多与协调Flink
应用程序的分布式执行有关的职责:它决定何时调度下一个Task
(或一组Task
)、对完成的Task
或执行失败做出反应、协调checkpoint
、并且协调从失败中恢复等等。始终至少有一个JobManager
。高可用(HA)设置中可能有多个JobManager
,其中一个始终是Leader
,其他的则是Standby
。TaskManagers
:TaskManager
也称为Worker
用于执行作业流的Task
,并且缓存和交换数据流。必须始终至少有一个TaskManager
。在TaskManager
中资源调度的最小单位是Task Slot
。TaskManager
中Task Slot
的数量表示并发处理Task
的数量。Task Slots
:每个Worker(TaskManager)
都是一个JVM
进程,可以在单独的线程中执行一个或多个SubTask
。为了控制一个TaskManager
中接受多少个Task
,就有了所谓的Task Slots
(至少一个)。每个Task Slot
代表TaskManager
中资源的固定子集。
二、Flink本地模式部署
在 Local
模式下,不需要启动任何的进程,仅仅是使用本地线程来模拟 Flink
的进程,适用于测试、开发调试等,这种模式下,不用更改任何配置,只需要保证 JDK8
安装正常即可。
Flink-1.14.4官方下载地址 | Standalone官方部署 | Local官方部署
本地模式环境部署步骤如下:
# 1. 下载安装包并上传到/usr/local/src 目录
# 2. 解压安装包并重命名为flink
[root@node src]$ tar -zxf flink-1.14.4-bin-scala_2.12.tgz
[root@node src]$ tar -zxf jdk-8u111-linux-x64.tar.gz
# 3. 修改安装包所属用户和用户组权限
[root@node src]$ chown -R root.root flink-1.14.5
# 4. 配置Flink环境变量并重新加载使其生效
[root@node src]$ vim ~/.bash_profile
export JAVA_HOME=/usr/local/src/jdk1.8.0_111/
export FLINK_HOME=/usr/local/src/flink-1.14.5/
export PATH=$PATH:$JAVA_HOME/bin:$FLINK_HOME/bin
[root@node src]$ source ~/.bash_profile
# 4. 启动"集群"
[root@node src]$ start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node.
Starting taskexecutor daemon on host node.
[root@node src]$ jps
17186 Jps
17078 TaskManagerRunner
16809 StandaloneSessionClusterEntrypoint
# 5. 访问WebUI界面8081端口查看运行情况
启动两个进程成功之后,访问
8081
端口号即可访问到 Flink 的 Web 管理界面。注意还需要配置IP地址映射和SSH免密登录,这里就不展开说明了。
入门案例演示
Flink内置入门案例演示,具体
# 进入Flink默认提供的案例jar包所在目录
[root@node streaming]$ pwd
/usr/local/src/flink/examples/streaming
[root@node streaming]$ ll
总用量 7208
-rw-r--r--. 1 1000 1001 13591 2月 25 20:47 Iteration.jar
-rw-r--r--. 1 1000 1001 9357 2月 25 20:47 SessionWindowing.jar
-rw-r--r--. 1 1000 1001 10596 2月 25 20:47 SocketWindowWordCount.jar
-rw-r--r--. 1 1000 1001 3704625 2月 25 20:47 StateMachineExample.jar
-rw-r--r--. 1 1000 1001 13047 2月 25 20:47 TopSpeedWindowing.jar
-rw-r--r--. 1 1000 1001 3581076 2月 25 20:47 Twitter.jar
-rw-r--r--. 1 1000 1001 16879 2月 25 20:47 WindowJoin.jar
-rw-r--r--. 1 1000 1001 11077 2月 25 20:47 WordCount.jar
# 运行词频统计案例,默认可以不用输入词频文件,系统自带,也可以通过 --input 指定输入文件目录, -- output 指定输出文件目录
[root@node streaming]$ flink run WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 750fb8771b02268eb03416bbe4df548d
Program execution finished
Job with JobID 750fb8771b02268eb03416bbe4df548d has finished.
Job Runtime: 1284 ms
# 查看日志中任务执行结果输出文件的前10条数据
[root@node log]$ tail $FLINK_HOME/log/flink-*-taskexecutor-*.out
(nymph,1)
(in,3)
(thy,1)
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)
# 指定词频文件和输出目录
[root@node streaming]$ flink run WordCount.jar --input /root/flink_data/words.txt --output /root/output
Job has been submitted with JobID 21424abc04bcbc12e497ca1e049a7fb4
Program execution finished
Job with JobID 21424abc04bcbc12e497ca1e049a7fb4 has finished.
Job Runtime: 536 ms
# 查看统计结果
[root@node streaming]$ cat /root/output
(hello,1)
(apache,1)
(flink,1)
(hello,2)
(java,1)
(hello,3)
(apache,2)
(hello,4)
(flink,2)
(java,2)
(flink,3)
(scala,1)
(flink,4)
三、Flink Standalone独立集群模式部署
使用StandAlone
模式,需要启动Flink
的主节点JobManager
以及从节点TaskManager
。
服务名称 | node01 | node02 | node03 |
|
| × | × |
|
|
|
|
1、更改配置文件
停止Master
服务器上的服务进程(stop-cluster.sh
),然后修改 Master
服务器配置文件(conf/flink-conf.yaml
)。
指定主节点(node01)和任务槽个数和并行度(可以默认)
jobmanager.rpc.address: node01
jobmanager.rpc.port: 6123
taskmanager.numberOfTaskSlots: 2 # 任务槽个数,默认是1
parallelism.default: 1 # 默认并行度
指定JobManager(Master)
节点,修改配置文件(conf/master
),替换信息为:node01:8081
指定TaskManager(Worker)
节点,修改配置文件(conf/workers
),替换信息为:node01、node02、node03。注意需要换行,一个Worker节点占据一行。
2、分发Flink安装包配置到另外两个节点
使用Linux scp命令把node01节点的配置分发到另外两个节点上。
# 使用 scp 分发
[root@node01 conf]$ cd /usr/local
[root@node01 local]$ scp -r flink node02:/usr/local/
[root@node01 local]$ scp -r flink node03:/usr/local/
注意:这里需要配置IP地址映射和SSH免密登录。
3、启动Flink集群
# 批量统一启动
[root@node01 bin]$ start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node01.
Starting taskexecutor daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.
# 单独启动
[root@node01 bin]$ jobmanager.sh start / stop
[root@node01 bin]$ taskmanager.sh start / stop
# 使用脚本查看服务进程
[root@node01 bin]$ jps
4、执行入门案例
四、使用Maven工具把当前词频统计案例打jar包到Linux集群中执行
1、导入打包所需依赖插件
默认Scala文件不会编译为Class字节码文件,在pom.xml中添加打包插件,这里提供两种方式,二选一。
Maven打包-指定Scala代码编译器
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.7.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Maven定制打包插件-生成完全版和简单版jar包
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
2、设置动态参数
修改批处理词频统计的输入文件路径为参数传递
修改流处理词频统计的主机名和端口为参数传递
// 批处理 FlinkBatchWordCountDemo
val params = ParameterTool.fromArgs(args)
val fileDataSet = env.readTextFile(params.get("input"))
// 流处理 FlinkStreamWordCountDemo
val params = ParameterTool.fromArgs(args)
val textDataStream = env.socketTextStream(params.get("host"), params.getInt("port"))
使用IDEA工具测试上述配置是否可以正常接收参数,执行程序
3、使用Maven package进行打包
上传打好的jar包到Linux平台
五、在Flink集群环境中运行Flink程序
1、使用flink run命令运行
通过flink run 命令进行测试执行,可以通过8081端口查看运行详情
// 批处理测试
flink run -j flink_demo-1.0-SNAPSHOT.jar -c FlinkBatchWordCountDemo --input /root/words.txt
// 流处理测试
nc -lk 888
flink run -j flink_demo-1.0-SNAPSHOT.jar -c FlinkStreamWordCountDemo --host node01 --port 888
2、使用Web UI界面提交
通过Flink Web UI界面的Submit New Job选项运行程序,这种方式不用上传jar包Linux中,直接本地选择。