在当今大数据蓬勃发展的时代,数据处理的时效性愈发关键。传统基于先存储再批量处理的数据方式,在面对诸如网站实时监控、异常日志即时分析等场景时,显得力不从心。随着 5G、物联网等技术的兴起,海量数据如潮水般涌来,且对实时处理需求激增,流式计算应运而生,而 Flink 作为流式计算领域的佼佼者,正散发着独特魅力,接下来就让我们深入探究 Flink 的安装与入门知识。
一、流式计算简介
(一)数据的时效性
日常工作中,数据处理按时间粒度不同,对时效性要求各异。处理年度、月度数据做统计分析、个性化推荐时,数据最新日期滞后数月无妨;但处理天级、小时级甚至更小粒度数据,像双 11 大屏实时展示、12306 系统实时监控、语雀异常日志即时处理等场景,传统收集 - 存储 - 分析流程难以满足高时效需求,急需新的数据处理模式。
(二)流式计算和批量计算
批量计算
遵循统一收集数据、存储到数据库(DB),再对数据批量处理流程,维护数据表,在表上执行各种计算逻辑,处理全部数据后输出结果,如 Map Reduce、Hive、Spark Batch 常用于此模式生成离线报表。
流式计算
针对持续流动数据流实时处理,数据边流入边计算,计算后丢弃。需提前定义好计算逻辑并提交至流式计算系统,且运行期间不可更改,每次小批量计算结果可实时展现,像 Storm、Flink 等流式分析引擎用于实时大屏、实时报表(Spark Struct Streaming 为准实时)。
(三)流式计算流程和特性
流程
提交流计算作业,等待流式数据触发,持续输出计算结果。
特性
具备实时、低延迟优势,处理无界(持续输入无终止)数据,计算连续进行,数据处理后丢弃。
(四)实时即未来
大数据时代,数据量暴增、来源多样、产生快速,传统批处理与早期流式框架受限于延迟、吞吐量、容错及便捷性,难以满足如实时监控报警、风控、推荐系统需求,Flink 凭借天然流式特性与先进架构崭露头角。
二、Flink 概述
(一)Flink 的引入
在大数据计算引擎发展历程中,有观点将其分四代:
第 1 代 ——Hadoop MapReduce
MapReduce将计算分为两个阶段,分别为 Map 和 Reduce。对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法
第 2 代 ——DAG 框架(Tez) + MapReduce
为克服一代弊端,支持 DAG 框架诞生,如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来说,大多还是批处理的任务。
第 3 代 ——Spark
以Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越 Job),以及强调的实时计算。在这里,很多人也会认为第三代计算引擎也能够很好的运行批处理的 Job。
第 4 代 ——Flink
突出对流计算支持与更高实时性,也能承担 Batch 任务、执行 DAG 运算,自带批处理、流处理、SQL 高层 API,流式计算性能、可靠性出众。
(二)Flink 发展史
诞生背景
源于 2010 - 2014 年柏林等地大学开展的 Stratosphere 项目,2014 年 4 月捐赠给 Apache 软件基金会,同年 12 月成顶级项目;2008 年其前身已是研究项目,2014 年更名 Flink,用 Java 编写,后续历经多版本迭代,2019 年被阿里巴巴以 9000 万欧元收购其母公司 Data Artisans。
官方介绍
官网(Apache Flink Documentation | Apache Flink)宣称其是为分布式、高性能、随时可用且准确流处理程序打造的开源框架,可兼做流处理与批处理。
Flink是一款分布式的计算引擎,它可以用来做流处理;也可以用来做批处理。
编程语言
Flink官方提供了Java、Scala、Python语言接口用以开发Flink应用程序,但是Flink的源码是使用Java语言进行开发的,且Flink被阿里收购后,未来的主要编程语言都一直会是Java(因为阿里是Java重度使用者!),且GitHub上关于Flink的项目,大多数是使用Java语言编写的。所以课程中以Java语言为主进行Flink的学习讲解。
Flink 中的批和流
批处理有界、持久、量大,适合离线统计;流处理无界、实时,逐个处理数据项,用于实时统计。Flink 视有界数据集为无界流特例,区分有界流(有明确起止,可排序后处理)与无界流(需连续按序处理)。
无界流:意思很明显,只有开始没有结束。必须连续的处理无界流数据,也即是在事件注入之后立即要对其进行处理。不能等待数据到达了再去全部处理,因为数据是无界的并且永远不会结束数据注入。处理无界流数据往往要求事件注入的时候有一定的顺序性,例如可以以事件产生的顺序注入,这样会使得处理结果完整。
有界流:也即是有明确的开始和结束的定义。有界流可以等待数据全部注入完成了再开始处理。注入的顺序不是必须的了,因为对于一个静态的数据集,我们是可以对其进行排序的。有界流的处理也可以称为批处理。
性能比较
运行在 Hadoop YARN 上时,性能 Flink > Spark > Hadoop (MR),因 Flink 支持增量迭代与自动优化,迭代次数多优势更显著。
应用场景
可以看到,各种行业的众多公司都在使用Flink。具体来看,一些行业中的典型应用有:
- 电商和市场营销
举例:实时数据报表、广告投放、实时推荐
- 物联网(IOT)
举例:传感器实时数据采集和显示、实时报警,交通运输业
- 物流配送和服务业
举例:订单状态实时更新、通知信息推送。
- 银行和金融业
举例:实时结算和通知推送,实时检测异常行为。
其他
Flink 是目前开源社区中唯一一套集高吞吐、低延迟、高性能三者于一身的分布式流式数据处理框架。
Spark 只能兼顾高吞吐和高性能特性,无法做到低延迟保障,因为Spark是用批处理来做流处理。
Storm 只能支持低延时和高性能特性,无法满足高吞吐的要求。
三、Standalone 集群模式安装部署
conda deactivate 退出 base环境
Flink支持多种安装模式。
local(本地)——本地模式
standalone——独立模式,Flink自带集群,开发测试环境使用
standaloneHA—独立集群高可用模式,Flink自带集群,开发测试环境使用
yarn——计算资源统一由Hadoop YARN管理,生产环境测试
下载链接:
官网地址:https://archive.apache.org/dist/flink/flink-1.13.1/flink-1.13.1-bin-scala_2.11.tgz
通过网盘分享的文件:flink-1.13.6-bin-scala_2.11.tgz
上传Flink安装包,解压,配置环境变量
上传至/opt/modules/下
[root@hadoop11 modules]# tar -zxf flink-1.13.6-bin-scala_2.11.tgz -C /opt/installs/
[root@hadoop11 installs]# mv flink-1.13.6/ flink
[root@hadoop11 installs]# vim /etc/profile
export FLINK_HOME=/opt/installs/flink
export PATH=$PATH:$FLINK_HOME/bin
export HADOOP_CONF_DIR=/opt/installs/hadoop/etc/hadoop
记得source /etc/profile
修改配置文件
① /opt/installs/flink/conf/flink-conf.yaml
jobmanager.rpc.address: bigdata01
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true
#历史服务器 如果HDFS是高可用,则复制core-site.xml、hdfs-site.xml到flink的conf目录下 hadoop11:8020 -> hdfs-cluster
jobmanager.archive.fs.dir: hdfs://bigdata01:9820/flink/completed-jobs/
historyserver.web.address: bigdata01
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://bigdata01:9820/flink/completed-jobs/
② /opt/installs/flink/conf/masters
bigdata01:8081
③ /opt/installs/flink/conf/workers
bigdata01
bigdata02
bigdata03
上传jar包
将flink-shaded-hadoop-2-uber-2.7.5-10.0.jar放到flink的lib目录下
通过网盘分享的文件:flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
分发
xsync.sh /opt/installs/flink
xsync.sh /etc/profile
大数据集群中实用的三个脚本文件解析与应用-CSDN博客
启动
#启动HDFS
start-dfs.sh
#启动集群
start-cluster.sh
#启动历史服务器
historyserver.sh start
假如 historyserver 无法启动,也就没有办法访问 8082 服务,原因大概是你没有上传 关于 hadoop 的 jar 包到 lib 下:
观察webUI
http://bigdata01:8081 -- Flink集群管理界面 当前有效,重启后里面跑的内容就消失了
能够访问8081是因为你的集群启动着呢
http://bigdata01:8082 -- Flink历史服务器管理界面,及时服务重启,运行过的服务都还在
能够访问8082是因为你的历史服务启动着
两者的区别:首先可以先把服务都停止
然后再重启,发现8081上已经完成的任务中是空的,而8082上的历史任务都还在,原因是8082读取了hdfs上的一些数据,而8081没有。
但是从web提供的功能来看,8081提供的功能还是比8082要丰富的多。
提交官方示例
flink run /opt/installs/flink/examples/batch/WordCount.jar
或者
flink run /opt/installs/flink/examples/batch/WordCount.jar --input 输入数据路径 --output 输出数据路径
例如:
flink run /opt/installs/flink/examples/batch/WordCount.jar --input /home/wc.txt --output /home/result
运行以上案例时,会出现有时候运行成功,有时候运行失败的问题:
Caused by: java.io.FileNotFoundException: /home/wc.txt (没有那个文件或目录)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
at org.apache.flink.api.common.io.FileInputFormat$InputSplitOpenThread.run(FileInputFormat.java:1053)
原因是:你的 taskManager 有三台,你的数据只在本地存放一份,所以需要将数据分发给 bigdata02 和 bigdata03
xsync.sh /home/wc.txt
四、总结
Flink 在流式计算浪潮中凭借卓越性能、丰富特性、广泛应用场景脱颖而出,掌握其安装与基础概念只是第一步,后续深入学习流处理编程模型、算子运用、优化策略等,将助我们挖掘大数据实时处理无限潜力,高效应对数字化时代数据挑战。