大数据组件之Flink

news2025/1/16 5:53:13

文章目录

  • 大数据组件之Flink
    • 一.Flink简介
      • Flink是什么?
      • Flink的特点
      • Flink框架处理流程
      • Flink发展时间线
      • Flink在企业中的应用
      • Flink的应用场景
      • 为什么选择Flink?
      • 传统数据处理架构
      • 有状态的流式处理(第一代流式处理架构)
      • 流处理的演变(第二代流式处理架构)
      • 新一代流处理器——Flink(第三代分布式流处理器)
      • 流处理的应用场景
      • Flink的分层 API
      • Flink vs Spark
    • 二.快速上手Flink
      • API 简介
      • 环境准备
      • 导入依赖
      • 日志配置
      • 提供数据
      • 批处理 Word Count
      • 流处理 Word Count
      • 读取文本流 Work Count
    • 三.部署Flink

大数据组件之Flink

Flink

一.Flink简介

Flink是什么?

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
Apache Flink是一个框架和分布式处理引擎,用于在无界和有界数据流上进行有状态计算。Flink被设计为在所有常见的集群环境中运行,以内存速度和任何规模执行计算。
Flink官网:https://flink.apache.org

Flink的特点

Flink的特点

Flink框架处理流程

Flink框架处理流程

Flink发展时间线

Flink发展时间线

Flink在企业中的应用

Flink在企业中的应用

Flink的应用场景

Flink的应用场景

为什么选择Flink?

为什么选择Flink

传统数据处理架构

传统数据处理架构
关系型数据库的性能瓶颈,无法支撑大数据下的数据计算。

有状态的流式处理(第一代流式处理架构)

有状态的流式处理基于传统数据处理架构,使用本地状态存在内存中,定期存盘,发生故障可以从持久化存储中恢复数据。
但当数据量很大时,使用集群模式,不同的应用有不同的本地状态,各自处理各自的数据,互不干扰。在分布式处理架构中,数据在传输和处理的过程中,时间是不确定的,数据可能会产生乱序,当需要进行数据汇总时,无法保证之前数据处理的顺序,导致结果不准确。

流处理的演变(第二代流式处理架构)

流处理的演变流处理器确保数据处理的低延迟,批处理器确保数据处理的准确性。但系统过于复杂,实现一个需求,同时要维护两套系统,开发及维护成本过高。

新一代流处理器——Flink(第三代分布式流处理器)

新一代流处理器Flink使用一套系统实现 lambda 架构中的两套功能,对于Flink而言,每秒钟能处理百万个事件,毫秒级的延迟并且可以保证结果的准确性。

流处理的应用场景

流处理的应用场景

Flink的分层 API

Flinl的分层API

Flink vs Spark

Flink数据处理架构:
Flink数据处理架构
Spark数据处理架构:
在这里插入图片描述数据模型:
数据模型运行时架构:
运行时架构

二.快速上手Flink

API 简介

Flink底层是以Java编写的,并为开发人员同时提供了完整的Java和Scala API,在具体项目应用中,可以根据需要选择合适语言的 API 进行开发。

环境准备

  • Win 11
  • JDK 1.8
  • Maven
  • IDEA
  • Git

导入依赖

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <flink.version>1.13.0</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<!-- 引入 Flink 相关依赖-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- 引入日志管理相关依赖 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>${slf4j.version}</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>${slf4j.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-to-slf4j</artifactId>
    <version>2.14.0</version>
</dependency>

日志配置

在 resources 下创建 log4j.properties 文件

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

提供数据

在项目下创建 input 文件夹,在此文件夹下创建 words.txt 文件,内容为:

hello world
hello flink
hello java

批处理 Word Count

新建 BatchWordCount.java

package com.handsome.wordcount;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @ClassName BatchWordCount
 * @Author Handsome
 * @Date 2022/12/19 14:37
 * @Description 批处理 Word Count
 */
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1.创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2.从文件读取数据(使用的是 DataSet API)
        DataSource<String> lineDataSource = env.readTextFile("input/words.txt");
        // 3.将每行数据进行分词,转换成二元组类型(line 行数据 Collector 收集器)
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            // 将一行文本进行分词
            String[] words = line.split(" ");
            // 将每个单词转换成二元组输出
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4.按照 word 进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);
        // 5.分组内进行聚合统计
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
        // 6.打印结果
        sum.print();
    }
}

打印结果

(flink,1)
(world,1)
(hello,3)
(java,1)

流处理 Word Count

新建 BoundedStreamWordCount.java

package com.handsome.wordcount;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @ClassName BoundedStreamWordCount 
 * @Author Handsome
 * @Date 2022/12/19 15:26
 * @Description 流处理 Word Count
 */
public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1.创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2.从文件读取数据(使用的是 DataStream API)
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");
        // 3.将每行数据进行分词,转换成二元组类型(line 行数据 Collector 收集器)
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            // 将一行文本进行分词
            String[] words = line.split(" ");
            // 将每个单词转换成二元组输出
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4.按照 word 进行分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        // 5.分组内进行聚合统计
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);
        // 6.打印结果
        sum.print();
        // 7.启动执行
        env.execute();
    }
}

打印结果

9> (world,1)
3> (java,1)
13> (flink,1)
5> (hello,1)
5> (hello,2)
5> (hello,3)

IDEA使用多线程模拟了Flink集群,因此每次输出的结果可能都不相同,前面的数字为线程的编号,未设置并行度则默认为电脑最大核数。

读取文本流 Work Count

新建 StreamWorkCount.java

package com.handsome.wordcount;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @ClassName StreamWorkCount
 * @Author Handsome
 * @Date 2022/12/19 17:24
 * @Description 读取文本流 Work Count
 */
public class StreamWorkCount {
    public static void main(String[] args) throws Exception {
        // 1.创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从参数中提取主机名和端口号
        //ParameterTool parameterTool = ParameterTool.fromArgs(args);
        //String host = parameterTool.get("host");
        //int port = parameterTool.getInt("port");
        //DataStreamSource<String> lineDataStream = env.socketTextStream(host, port);

        // 2.读取文本流
        DataStreamSource<String> lineDataStream = env.socketTextStream("8.142.157.59", 6666);
        // 3.将每行数据进行分词,转换成二元组类型(line 行数据 Collector 收集器)
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            // 将一行文本进行分词
            String[] words = line.split(" ");
            // 将每个单词转换成二元组输出
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4.按照 word 进行分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        // 5.分组内进行聚合统计
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);
        // 6.打印结果
        sum.print();
        // 7.启动执行
        env.execute();
    }
}

8.142.157.59 为 Linux 服务器 IP 地址,需要根据不同的 Linux 服务器,填写不同的 IP 地址,6666 为 Linux 服务监听的端口号。

在 Linux 中执行 nc -lk 6666 监听 6666 端口
启动 StreamWorkCount.java
在 Linux 中输入要计算的数据
数据打印结果

5> (hello,1)
11> (handsome,1)
9> (world,1)
5> (hello,2)
13> (flink,1)
5> (hello,3)
3> (java,1)
5> (hello,4)

三.部署Flink

视频链接:https://www.bilibili.com/video/BV133411s7Sa

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/100959.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

实锤了,尤大妥妥的二次元迷弟 —— 聊聊 Vue 的进化历程

文章目录实锤了&#xff0c;尤大妥妥的二次元迷弟 —— 聊聊 Vue 的进化历程1. 前言2. 库阶段2.1 阶段发展2.2 设计重点和特征3. 框架阶段3.1 阶段发展3.2 设计重点4. 通用框架阶段4.1 阶段发展4.2 设计重点4.3 典型案例5. 编译/运行时混合阶段5.1 阶段发展5.2 设计重点5.3 框架…

五十二——六十二

五十二、JavaScript——函数简介 一、函数 函数&#xff08;Function)- 函数也是一个对象 - 它具有其他对象所有的功能 - 函数中可以储存代码&#xff0c;且可以在需要时调用这些代码 语法&#xff1a; function 函数名&#xff08;&#xff09;{ 语句。。。 } 调用函数- 调用…

drm框架介绍

Drm框架介绍 DRM是Linux目前主流的图形显示框架&#xff0c;相比FB架构&#xff0c;DRM更能适应当前日益更新的显示硬件。比如FB原生不支持多层合成&#xff0c;不支持VSYNC&#xff0c;不支持DMA-BUF&#xff0c;不支持异步更新&#xff0c;不支持fence机制等&#xff0c;而这…

嗅探网站视频

前置知识 MP4是我们常见的视频格式&#xff0c;往往我们在播放服务器视频时直接就是请求的MP4视频源。但其实这样并不好&#xff0c;MP4头文件[ftypmoov]较大&#xff0c;初始化的播放需要下载完整的头文件并进行解析&#xff0c;之后再下载一定长度的可播视频片段才能进行播放…

java 瑞吉外卖day4及补全功能 文件上传下载 菜品分页查询 Dto类 菜品状态修改 菜品停售以及菜品删除

文件上传下载 文件下载介绍 文件上传代码实现 服务端上传&#xff1a; RestController RequestMapping("/common") Slf4j public class CommonController {Value("${reggie.path}")private String basePath;//从配置文件读取设置好的basePathPostMapping…

【教程】5步免费白嫖使用Grammarly Premium高级版

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhang.cn] 1、使用Chrome或者Edge浏览器。 2、安装名为Cookie-Editor的谷歌Chrome扩展。 https://chrome.google.com/webstore/detail/cookie-editor/hlkenndednhfkekhgcdicdfddnkalmdm 2、打开Grammarly网站&#xff0c;…

DBCO-PEG-Cyanine5.5,CY5.5 PEG DBCO,二苯并环辛炔-聚乙二醇-Cyanine5.5

中文名&#xff1a;二苯并环辛炔-聚乙二醇-菁染料CY5.5&#xff0c;二苯并环辛炔-聚乙二醇-Cyanine5.5&#xff0c;菁染料CY5.5PEG环辛炔&#xff0c;花青素Cyanine5.5-聚乙二醇-二苯并环辛炔英文名&#xff1a;DBCO-PEG-CY5.5&#xff0c;DBCO-PEG-Cyanine5.5&#xff0c;Cyan…

位运算、递推与递归、前缀和、差分、二分

题目链接&#xff1a;位运算、递推与递归、前缀和、差分、二分 - Virtual Judge (vjudge.net) A.洛谷 - P2280 样例输入&#xff1a; 2 1 0 0 1 1 1 1样例输出&#xff1a; 1 分析&#xff1a;这道题先用二维前缀和处理一下地图&#xff0c;这样我们就可以在O(1)的复杂度内…

SIT-board 远程交互式白板的实现

来自上海应用技术大学的「SIT-board」团队&#xff0c;在七牛云校园黑客马拉松中勇夺冠军&#xff0c;以下是他们的参赛作品——SIT-board远程交互白板的实现过程。 需求分析 基本绘图功能 作为一个在线协作白板&#xff0c;离线的本地化的白板是一切功能的前提。本地白板中需…

Uniapp安卓apk原生云端打包完整过程

1.进入HbuliderX,找到菜单的发行 2.选择原生App-云打包&#xff0c;接着会弹出一个对话框&#xff0c;如图&#xff1a; 3.在对话框中勾选Andriod&#xff08;apk包&#xff09;、选择使用云端证书。 证书说明如下&#xff1a; (1)使用自有证书&#xff1a;开发者自己生成…

左偏树解决猴王问题

一 问题描述 在森林里住着 N 只好斗的猴子。开始时&#xff0c;猴子们彼此不认识&#xff0c;难免吵架&#xff0c;吵架只发生在互不认识的两只猴子之间。吵架发生时&#xff0c;两只猴子都会邀请它们中最强壮的朋友来决斗。决斗过后&#xff0c;两只猴子和它们的所有朋友都认…

screenviewer工具在树莓派3B+上的适配

目录 工具简介 适配初衷 第三方模块适配问题 源码代编译问题 最后完美运行如图 工具简介 屏幕截图web端展示功能、视频设备如摄像头、视频流等接入&#xff0c;并可web端展示。 适配初衷 这样的工具如果能完美运行在嵌入式linux上是极好的&#xff0c;目前仅适配了wind…

为什么你的程序跑不满CPU?——简单聊聊多核多线程

最近同事测试自己的程序&#xff0c;感觉处理耗时太长&#xff0c;一看CPU使用率&#xff0c;才25%。想要提高CPU使用率降低处理时长&#xff0c;于是向我询问。以此为契机写了这篇&#xff0c;聊聊多核多线程。水平有限&#xff0c;仅供参考。 1.单核单线程 一切开始的前提是…

CCProxy + Proxifier 通过另一台电脑访问网络

问题场景描述&#xff1a; 公司提供的 vpn 只提供了 windows 客户端&#xff1b;Mac没有客户端&#xff0c;而家里的 windows 电脑是多年前的旧电脑&#xff0c;配置不足&#xff0c;所以不能使用&#xff1b;这里整理了一种 搭建跳板机 作为中专的方式进行访问 搭建过程 1.…

OpenCV(7)-OpenCV中的滤波器

OpenCV中的滤波器 图像滤波 滤波的作用&#xff1a;一幅图像通过滤波器得到另一幅图像&#xff1b;其中滤波器又称为卷积核&#xff0c;滤波的过程被称为卷积 卷积的几个基本概念&#xff1a; 卷积核的大小&#xff1a; 卷积核一般为奇数&#xff0c;如3 * 3,5 * 5&#xf…

【DELM回归预测】基于matlab灰狼算法改进深度学习极限学习机GWO-DELM数据回归预测【含Matlab源码 1867期】

⛄一、基本极限学习机算法简介 1 核极限学习机 极限学习机(ELM)是一种含L个神经元的单隐藏层前馈神经网络(SLFN)算法&#xff0c;相比于其他神经网络(如BP)具有训练速度快和泛化能力强等特点。但是ELM算法是随机生成各个神经元连接权值和阈值&#xff0c;易造成算法的波动性和…

架构师必读 —— 逻辑模型(4)

解决问题的基本步骤 如果情绪急躁&#xff0c;过于钻牛角尖&#xff0c;坚持“这就是唯一结论”的态度&#xff0c;就会阻碍逻辑思考。情绪急躁、钻牛角尖的行为属短见薄识&#xff0c;只能导致主观臆断。一味地想“简短地传达观点”时&#xff0c;往往会跳过三角逻辑中的论据和…

VSCode下载和安装详细步骤

一、下载 点击 这里 到Visual Studio Code官网下载。 选择下载版本&#xff0c;大家按自己的电脑版本进行选择&#xff08;这里我选的是Windows 64位的&#xff09;。 二、安装 1. 下载好之后&#xff0c;双击进行安装&#xff1b; 2. 选择【我同意此协议】&#xff0c;再点…

如何快速上手react中的redux管理库

前言&#xff1a; 什么是redux&#xff1f;redux和vuex一样&#xff0c;都被统称为状态管理库&#xff0c;是核心数据存贮与分发、监听数据改变的核心所在。 可以简单说下redux和vuex的区别&#xff1a; 相同点 state 共享数据流程一致&#xff1a;定义全局state&#xff0c;…

【DELM回归预测】基于matlab粒子群算法改进深度学习极限学习机PSO-DELM数据回归预测【含Matlab源码 1884期】

⛄一、PSO-DELM简介 1 DELM的原理 在2004年&#xff0c;极限学习机&#xff08;extreme learning machine,ELM&#xff09;理论被南洋理工大学的黄广斌教授提出&#xff0c;ELM是一种单隐含层前馈神经网络&#xff08;single-hidden layer feedforward neural network,SLFN&am…