Flink简介

news2025/1/9 1:38:18

Flink 系列教程传送门

第一章 Flink 简介

第二章 Flink 环境部署

第三章 Flink DataStream API

第四章 Flink 窗口和水位线

第五章 Flink Table API&SQL

第六章 新闻热搜实时分析系统


前言

流计算产品实时性有两个非常重要的实时性设计因素,一个是待计算的数据,一个是计算的时钟。低延时要求流计算框架尽可能早的输出计算结果,但是由于存在数据延时和现实业务数据更新的客观情况,就会导致你前一秒计算的结果,因为下一秒来了一个对上一秒已经参与计算的那条数据的更新,进而导致在下一秒时候上一秒的计算结果就是无效的了,那么流计算产品低延时需求导致流计算产品不可能无限制的等待延时数据的到来,这就一定会造成数据计算结果不精准的问题。如果流计算产品想让自己的计算结果更准确,那就需要忍受对延时数据进行更长时间的等待,那就意味着流计算产品的低延时无法达成,所以在流计算产品中鱼和熊掌兼得是不那么容易的。


一、Flink概述

在德语中,Flink 一词表示快速和灵巧,项目采用一只松鼠的彩色图案作为 logo。

Apache Flink是Apache软件基金会的一个顶级项目,是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架,并且可以同时支持实时计算和批量计算

Flink起源于Stratosphere 项目,该项目是在2010年到2014年间由柏林工业大学、柏林洪堡大学和哈索普拉特纳研究所联合开展的,开始是做批处理,后面转向了流处理。

  • 2014年4月,Stratosphere代码被贡献给Apache软件基金会,并改名为Flink,成为Apache软件基金会孵化器项目,并开始在开源大数据行业内崭露头角。
  • 2014年8月,团队的大部分创始成员离开大学,共同创办了一家名为Data Artisans的公司。
  • 2015年4月,Flink发布了里程碑式的重要版本0.9.0。
  • 2019年1月,长期对Flink投入研发的阿里巴巴,以9000万欧元的价格收购了Data Artiscans公司。
  • 2019年8月,阿里巴巴将内部版本Blink开源,合并入Flink1.9.0版本。

目前最新版本Flink为1.16.0版本,本系列课程我们采用Flink1.14.5进行讲解。

二、Flink编程模型

在自然环境中,数据的产生原本就是流式的。无论是来自 Web 服务器的事件数据,证券交易所的交易数据,还是来自工厂车间机器上的传感器数据,其数据都是流式的。但是当你分析数据时,可以围绕 有界流(bounded)或 无界流(unbounded)两种模型来组织处理数据,当然,选择不同的模型,程序的执行和处理方式也都会不同。

  • 批处理是有界数据流处理的范例。在这种模式下,你可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。
  • 流处理正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,因此程序必须持续不断地对到达的数据进行处理。 

三、程序结构

在Hadoop中,实现一个MapReduce应用程序需要编写Map和Reduce两部分;实现一个Flink应用程序也需要同样的逻辑。一个Flink应用程序由3部分构成,或者说将Flink的操作算子可以分成3部分,分别为Source、Transformation和Sink,如图:

  • 数据源:Flink 在流处理和批处理上的数据源大概有4类:基于本地集合的数据源(fromCollectionfromElements)、基于文件的数据源(readTextFile)、基于网络套接字的数据源(socketTextStream)、自定义的数据源(KafkaSource)。常见的自定义数据源包括Kafka、RabbitMQ、NiFi等。
  • 数据转换:数据转换的各种操作包括map、 flatMap、filter、keyBy、reduce、aggregation、window、union、select等,可以将原始数据转换成满足要求的数据。
  • 数据输出:数据输出是指Flink将转换计算后的数据发送的目的地。常见的数据输出包括写入文件、打印到屏幕、写入Socket 、自定义Sink等 。

在 Flink 中,应用程序由用户自定义算子转换而来的流式 dataflows 所组成。这些流式 dataflows 形成了有向图,以一个或多个源(source)开始,并以一个或多个汇(sink)结束。

四、总图概览

 Flink 应用程序可以消费来自消息队列或分布式日志这类流式数据源(例如 Apache Kafka 或 Kinesis)的实时数据,也可以从各种的数据源中消费有界的历史数据。同样,Flink 应用程序生成的结果流也可以发送到各种数据汇中。

在 Flink 中,应用程序由用户自定义算子转换而来的流式 dataflows 所组成。这些流式 dataflows 形成了有向图,以一个或多个(source)开始,并以一个或多个(sink)结束。

从代码到逻辑视图。逻辑视图中圆圈表示算子,箭头表示数据流,可以在Flink Web UI中查看一个作业的逻辑视图,大数据框架的算子对计算做了抽象,方便用户进行并行计算、横向扩展和故障恢复。

通常,程序代码中的 transformation 和 dataflow 中的算子(operator)之间是一一对应的。但有时也会出现一个 transformation 包含多个算子的情况

五、入门案例

1、安装Maven整合IDEA开发工具

Maven 是一款基于 Java 平台的项目管理和整合工具,它将项目的开发和管理过程抽象成一个项目对象模型(POM)。开发人员只需要做一些简单的配置,Maven 就可以自动完成项目的编译、测试、打包、发布以及部署等工作。

约定优于配置(Convention Over Configuration)是 Maven 最核心的涉及理念之一 ,Maven对项目的目录结构、测试用例命名方式等内容都做了规定,凡是使用 Maven 管理的项目都必须遵守这些规则。
Maven 项目构建过程中,会自动创建默认项目结构,开发人员仅需要在相应目录结构下放置相应的文件即可。

官方下载地址,下载完成后,解压到合适的位置即可,建议放在D:/devtools目录下。

2、修改Maven的下载源地址和本地仓库地址

修改Maven安装目录下conf/settings.xml文件,具体修改项如下:

<localRepository>D:/devtools/apache-maven-3.6.1/localRepository</localRepository>
    
<mirrors>
  <mirror>
  <id>nexus-aliyun</id>  
  <mirrorOf>*</mirrorOf>    
  <name>Nexus aliyun</name>  
  <url>http://maven.aliyun.com/nexus/content/groups/public</url>  
</mirror>
<mirror>  
  <id>nexus-osc</id>  
  <mirrorOf>*</mirrorOf>  
  <name>Nexus osc</name>  
  <url>http://mirrors.163.com/maven/repository/maven-central/</url>  
</mirror>
</mirrors>

<profiles>
<profile>
  <id>jdk-1.8</id>
  <activation>
  <!--这个字段表示默认激活-->
  <activeByDefault>true</activeByDefault>
  <jdk>1.8</jdk>
  </activation>
  <properties>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>    
      <maven.compiler.encoding>UTF-8</maven.compiler.encoding> 
  </properties>
</profile>  
</profiles>

3、IDEA整合Maven

在IDEA的设置中,搜索maven,做如下修改,选择本地安装的Maven相关选项。

4、使用Flink实现批计算

使用Flink Scala完成批处理的词频统计案例,具体处理流程如下:

在pom.xml中添加flink所需依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.12</artifactId>
  <version>1.14.5</version>
</dependency>
<!--No ExecutorFactory found to execute the application. 从 flink1.11.0 版本开始,需要多引入一个 flink-client 包-->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.12</artifactId>
  <version>1.14.5</version>
</dependency>

详细代码示例如下:

import org.apache.flink.api.scala._

object WordCountBatchTest {
  def main(args: Array[String]): Unit = {
    // 创建Flink的执行环境(批处理的)
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Source 读取数据源
    val data = env.readTextFile("datasource/word.txt")

    // Transformation 转换 计算
    val result = data
        .flatMap(line=>line.split(" "))
        .map(word=>(word,1))
        .groupBy(0)
        .sum(1)

    // Sink 把转换的结果输出
    result.print()
  }
}

 从 flink1.11.0 版本开始,需要多引入一个 flink-client 包

5、使用Flink实现流计算

Flink流计算会借助NetCat工具进行流式数据进行数据录入,具体安装使用如下:

Netcat官网下载地址,下载netcat-win32-1.12.zip压缩包,解压到安装目录,并配置PATH环境变量。

  1. 输入nc -l -p 9000 -v监控9000端口,接收数据
  2. 输入nc localhost 9000进行连接,并发送数据

在cmd中输入命令:nc -l -p 666 监控666端口,并输入测试数据 

在pom.xml中添加flink流处理所需依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.12</artifactId>
  <version>1.14.5</version>
  <!--<scope>provided</scope>-->
</dependency>

 使用Flink Scala编写流式数据处理程序

import org.apache.flink.streaming.api.scala._

object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 获取Flink流处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 数据源-监控Netcat数据端口666
    val data = env.socketTextStream("localhost", 666)
    // 数据转换
    val result = data
        .flatMap(_.split(" "))
        .filter(_.nonEmpty)
        .map((_,1))
        .keyBy(_._1)
        .sum(1)
    // Sink 数据输出到控制台
    result.print()
    // 流处理环境执行
    env.execute()
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/139980.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于 Docker 的 Neo4j 部署及数据备份与恢复

目录一、部署二、验证三、备份3.1 离线备份3.2 在线备份3.3 社区版备份一、部署 1、pull 镜像 docker pull neo4j:4.4.16-community2、创建目录 mkdir -p /home/data/neo4j/{data,logs,conf,import,db-backup}3、运行容器 docker run -itd \--name neo4j \--restart always…

openwrt-看门狗watchdog

一、硬件watchdog和软件watchdog Linux内核不仅为各种不同类型的watchdog硬件电路提供了驱动&#xff0c;还提供了一个基于定时器的纯软件watchdog驱动&#xff0c;软件watchdog基于内核的定时器实现&#xff0c;当内核或中断出现异常时&#xff0c;软件watchdog是无法复位系统…

二叉树17:路径总和

主要是我自己刷题的一些记录过程。如果有错可以指出哦&#xff0c;大家一起进步。 转载代码随想录 原文链接&#xff1a; 代码随想录 leetcode链接&#xff1a;112. 路径总和 112. 路径总和 题目&#xff1a; 给你二叉树的根节点 root 和一个表示目标和的整数 targetSum 。判…

判断图中有没有证件图片

整体解决思路: 前提:拍摄场景光线稳定,证件没有放在图像边缘;且图片使用的证件阅读器拍摄的红外图片,采用了开灯和关灯各拍摄一张图片,图像相减,进行了背景去除; 1)使用二值化和膨胀腐蚀以及sobel算子等进行图像的预处理; 2)进行凸包计算,通过角度,进行证件区域…

缓存穿透,缓存雪崩,缓存击穿的超详解

文章目录1、缓存穿透问题的解决思路2、缓存雪崩问题及解决思路3、缓存击穿问题及解决思路1、缓存穿透问题的解决思路 缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在&#xff0c;这样缓存永远不会生效&#xff0c;这些请求都会打到数据库&#xff0c;失去了缓存的意…

pytorch数据dataset的三种读取方式

文章目录1.自带的datasets2.ImageFolder3.自己创建的dataset&#xff08;用的多&#xff09;1.自带的datasets pytorch自带的数据集 具体数据集如下&#xff1a; 用法&#xff1a; 可以用的功能 2.ImageFolder 文件形式如下时用此&#xff1a;&#xff08;一个文件下只…

MyBatis框架 入门案例

二刷复习简介MyBatis 是一款优秀的持久层框架&#xff0c;它支持自定义 SQL、存储过程以及高级映射。MyBatis 免除了几乎所有的 JDBC 代码以及设置参数和获取结果集的工作。MyBatis 可以通过简单的 XML 或注解来配置和映射原始类型、接口和 Java POJO&#xff08;Plain Old Jav…

磨金石教育科技摄影技能干货分享|艺术摄影的本源是创作,核心是表达

艺术类的作品一直都有一个风格&#xff0c;那就是抽象。特别是摄影领域&#xff0c;一幅作品如果缺乏创意&#xff0c;只是展现现实的景物&#xff0c;那么很容易就变成了纪实摄影或者风光摄影。因此&#xff0c;要想让作品突出艺术性&#xff0c;就要再原有的画面上&#xff0…

Exynos_4412——中断处理(中断学习结尾篇)

目录 一、ARM的异常处理机制 1.1异常概念 1.2异常处理机制 1.3ARM异常源 1.4异常模式 1.5ARM异常响应 1.6异常向量表 1.7异常返回 1.8IRQ异常举例 二、工程模板代码结构 三、中断处理框架搭建 四、中断处理程序 五、用key3再试一试 前景提要&#xff1a; Exynos_…

利用Model Inspector的建模规则检查

利用Model Inspector的规则检查 Model Inspector是一种基于模型的软件静态验证自动化解决方案。通过对模型进行规则检查,开发人员的工作成本将减少,开发效率将会大大提高。 Model Inspector支持各种行业标准建模规范,对违反规范进行检查。 用户可通过dashboard获知模型质量指标…

滴滴前端一面常考手写面试题合集

使用Promise封装AJAX请求 // promise 封装实现&#xff1a; function getJSON(url) {// 创建一个 promise 对象let promise new Promise(function(resolve, reject) {let xhr new XMLHttpRequest();// 新建一个 http 请求xhr.open("GET", url, true);// 设置状态的…

磨金石教育摄影技能干货分享|优秀摄影作品欣赏——艺术的表达

艺术摄影是难以欣赏的&#xff0c;这是大部分人的共识。艺术作品的创作也自然具有较高的难度&#xff0c;很多时候画面的表达与思想不符。要么内容不足以反应思想&#xff0c;要么思想太浅&#xff0c;而内容太杂。想要真正理解作者要表达什么&#xff0c;就要从内容去逐个分解…

ZigBee案例笔记 -- 外部中断

文章目录1.中断概述2.中断屏蔽3.中断处理4.按键中断控制LED1.中断概述 CC2530有18个中断源&#xff0c;每个中断源都有它自己的位于一系列 SFR 寄存器中的中断请求标志。相应标志位请求的每个中断可以分别使能或禁用&#xff0c;中断分别组合为不同的、可以选择的优先级别&…

分布式ID之雪花算法

分布式ID常见生成策略 分布式ID生成策略常见的有如下几种: 数据库自增ID。UUID生成。Redis的原子自增方式。数据库水平拆分&#xff0c;设置初始值和相同的自增步长。批量申请自增ID。雪花算法。百度UidGenerator算法(基于雪花算法实现自定义时间戳)。美团Leaf算法(依赖于数据…

【ZooKeeper】第一章 快速入门

【ZooKeeper】第一章 快速入门 文章目录【ZooKeeper】第一章 快速入门一、概念二、安装1.环境准备2.安装3.配置二、命令操作1.数据模型2.服务端常用命令3.客户端常用命令一、概念 ZooKeeper 是 Apache Hadoop 项目下的一个子项目&#xff0c;是一个树形目录服务ZooKeeper 翻译…

SpringBoot使用SchedulingConfigurer实现多个定时任务多机器部署问题

目录一、使用SchedulingConfigurer实现多个定时任务二、定时任务多机器部署解决方案三、基于redis实现的代码示例3.1、基于redis实现的概述3.2、基于redis实现的代码3.2.1、代码目录结构3.2.2、引入依赖包3.2.3、配置文件新增redis连接配置3.2.4、自定义redis锁注解类3.2.5、自…

Linux 块设备驱动

1.块设备是针对存储设备的&#xff0c;比如 SD 卡、 EMMC、 NAND Flash、 Nor Flash、 SPI Flash、机械硬盘、固态硬盘等。因此块设备驱动其实就是这些存储设备驱动&#xff0c;块设备驱动相比字符设备驱动的主要区别如下&#xff1a; ①、块设备只能以块为单位进行读写访问&am…

【阶段二】Python数据分析Pandas工具使用07篇:探索性数据分析:数据的描述:数据的集中趋势

本篇的思维导图: 探索性数据分析:数据的描述 数据的描述是为了让数据使用者或开发者更加了解数据,进而做到“心中有数”,其描述过程侧重于统计运算和统计绘图。通过统计运算可以得到具体的数据特征,如反映集中趋势中的均值水平、中位数、分位数和众数等;反映分散趋势的方…

Unity脚本 --- VS调试工具

一般游戏逻辑调试的时候用的都是VS调试工具来进行调试 1.在Unity脚本中启动调试后并不会立刻开始调试&#xff0c;还需要我们在Unity中点击play&#xff08;游戏运行&#xff09;后调试才会开始进行 2.在调试的时候点击f11可以逐语句调试&#xff0c;同时当我们在调试的时候想…

螺旋桨k线的意义?

相信大家即使没坐过直升机&#xff0c;也很看见过螺旋桨吧&#xff1f;它的动能巨大&#xff0c;刮起的旋风能支撑起一架飞机的升降。但大家是否知道&#xff0c;在K线技术分析中&#xff0c;也有一种特殊的形态叫“螺旋桨”呢&#xff1f; 三、螺旋桨K线的形态概念 如下图&am…