本篇为Flink的第二、三部分,Flink快速上手和Flink部署,全篇参考自 尚硅谷2022版1.13系列
整个系列的目录如下:
💚<一>Flink简介 💚<二>Flink快速上手 💚<三>Flink 部署 💚<四>Flink 运行时架构 💚<五>DataStream API 💚<六>Flink 中的时间和窗口 💚<七>处理函数 💚<八>多流转换 💚<九>状态编程 💚<十>容错机制 💚<十一>Table API 和 SQL 💚<十二>Flink CEP
本文章会着重记录比较重要知识点和架构,略过不重要信息和一些代码等,需要全文可以到b站观看,顺便投俩币。
<二、三> Flink快速上手和部署
知识点十二. 快速上手Flink项目的步骤
(1)在IDEA用Maven创建new Project
(2)添加项目依赖,主要是Flink依赖(flink-java、flink-streaming-java、flink-clients)和日志管理依赖(slf4j和log4j)
(3)配置日志管理(添加log4j.properties文件)
知识点十三. 用Flink做WordCount程序的步骤(批处理)(DataSet方法)
(1)创建执行环境
(2)从文件读取数据 按行读取(存储的元素就是每行的文本)(readTextFile)
(3)转换数据格式,调用 flatmap 方法可以对一行文字 进行分词转换,转换成(word,count)形式的二元组
(4)按照 word 进行分组,调用了 groupBy 方法
(5)分组内聚合统计,调用 sum 方法进行聚合
public class BatchWordCount {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)
DataSource<String> lineDS = env.readTextFile("input/words.txt");
// 3. 转换数据格式
FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG)); //当 Lambda 表达式
使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示的声明类型信息
// 4. 按照 word 进行分组
UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG =
wordAndOne.groupBy(0);
// 5. 分组内聚合统计
AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
// 6. 打印结果
sum.print();
}
}
知识点十四. 用Flink做WordCount程序的步骤(流处理读取文件)(DataStream)
(1)创建流式执行环境StreamExecutionEnvironment
(2)读取文件,readTextFile
(3)转换数据格式,用flatMap转换成二元组
(4)调用keyBy进行分组,传入一个匿名函数作为键选择器 (KeySelector),指定当前分组的 key
(5)调用sum求和
(6)代码末尾需要调用 env 的 execute 方法,开始执行任务。
public class BoundedStreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 创建流式执行环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文件
DataStreamSource<String> lineDSS = env.readTextFile("input/words.txt");
// 3. 转换数据格式
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
.flatMap((String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
})
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4. 分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
.keyBy(t -> t.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
.sum(1);
// 6. 打印
result.print();
// 7. 执行
env.execute();
}
}
知识点十五. 用Flink做WordCount程序的步骤(流处理读取文件流)(实时监听)
(1)创建流式执行环境StreamExecutionEnvironment
(2)读取文件流,设置主机名和端口号或者从配置文件中读取
(3)转换数据格式,用flatMap转换成二元组
(4)调用keyBy进行分组
(5)调用sum求和
(6)代码末尾需要调用 env 的 execute 方法,开始执行任务。
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 创建流式执行环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文本流
DataStreamSource<String> lineDSS = env.socketTextStream("hadoop102",
7777);
// 3. 转换数据格式
SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
.flatMap((String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
})
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4. 分组
KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
.keyBy(t -> t.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
.sum(1);
// 6. 打印
result.print();
// 7. 执行
env.execute();
}
}
知识点十六. Flink集群安装步骤
(1)前置安装 linux系统,Java8,hadoop集群,配置环境变量,免密登录,关闭防火墙
(2)对安装包解压并安装,执行启动命令,通过jps可查看进程
(3)在其他节点解压并安装,修改集群配置,指定JobManager 节点,修改workers 文件,指定TaskManager 节点
知识点十七. Flink启动类型
(1)本地启动
最简单的启动方式,其实是不搭建集群,直接本地启动。本地部署非常简单,直接解压安 装包就可以使用,不用进行任何配置;一般用来做一些简单的测试。
(2)集群启动
如果想要扩 展成集群,其实启动命令是不变的,主要是需要指定节点之间的主从关系。指定JobManager 节点,修改workers 文件,指定TaskManager 节点即可。
知识点十八. Flink提交作业方式
(1)在 Web UI 上提交作业
(2)命令行提交作业
知识点十九. Flink部署模式
(1)会话模式
启动一个集群,保持一个会话,在这个会话中 通过客户端提交作业,集群启动时所有资源就都已经确定,所以所有提交的 作业会竞争集群中的资源。缺点也是显而易见的:因为资源是共享的,所以资源不够了,提交新的 作业就会失败。另外,同一个 TaskManager 上可能运行了很多作业,如果其中一个发生故障导 致 TaskManager 宕机,那么所有作业都会受到影响。会话模式比较适合于单个规模小、执行时间短的大量作业。
(2)单作业模式
单作业模式是严格的一对一,集群只为这个作业而生。作业被提交给 JobManager,进而分发给 TaskManager 执行。每个作业都有它自己的 JobManager 管理,占用独享的资源,即使发生故障,它的 TaskManager 宕机也不会影响其他作业。这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。注意的是,单作业模式一般需要借助一些资源管 理框架来启动集群,比如 YARN、Kubernetes。
(3)应用模式
前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给 JobManager 的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给 JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的 资源消耗。所以解决办法就是,我们不要客户端了,直接把应用提交到 JobManger 上运行。而这也就 代表着,我们需要为每一个提交的应用单独启动一个 JobManager,也就是创建一个集群。这 个 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了,这就是所 谓的应用模式。应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交 的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由 JobManager 执行应 用程序的,并且即使应用包含了多个作业,也只创建一个集群。
知识点二十. Flink部署模式(结合资源管理平台)
(1)独立模式(Standalone)
独立模式(Standalone)是部署 Flink 最基本也是最简单的方式:所需要的所有 Flink 组件, 都只是操作系统上运行的一个 JVM 进程。 独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果 资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式 一般只用在开发测试或作业非常少的场景下。
(2)YARN 模式
Flink 是大数据计算框架,不是资源 调度框架,这并不是它的强项;让专业的框架做专业的事,使用国内应用最为广泛的资源管理平台YARN来做资源管理的任务。客户端把 Flink 应用提交给 Yarn 的 ResourceManager, Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上,Flink 会部署 JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业 所需要的 Slot 数量动态分配 TaskManager 资源。
(3)K8S 模式
容器化部署是如今业界流行的一项技术,基于 Docker 镜像运行能够让用户更加方便地对 应用进行管理和运维。容器管理工具中最为流行的就是 Kubernetes(k8s),而 Flink 也在最近 的版本中支持了 k8s 部署模式,基本原理与 YARN 是类似的。