【Flink】Flink基础

news2024/11/23 9:54:22

Flink 官网地址 (官网介绍的非常详细,觉得看英文太慢的直接使用浏览器一键翻译,本文是阅读官方文档后进行的内容梳理笔记) https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/python/overview/

这 Flink API 贼像 Java 的函数式编程 , 所以先学一下那个比较好 , 可能看 Flink API 的时候会舒服一点点 ~~~

一、Flink 做什么

  • 流处理 🌰
    处理无界数据,换句话说,数据输入永远不会结束
  • 批处理
    处理有界数据的工作范式

在 Flink 中,应用程序由流数据组成,从一个源开始,经过处理,最后再以一个接收器结束 ,下面来自 Flink 官网的一张图很好的描述了这一过程 。

请添加图片描述
举个小栗子,Flink 应用程序消耗来自 Kafka 的实时日志数据,生成的结果流发送给其他的各种系统使用 。

请添加图片描述

二、API

流水的技术,铁一般的API调用大师~~

1. 序列化

Flink 可将基本数据类型 和 复合类型 序列化为 Flink 可流式传输的内容 。

  • 基本 : 字符串、整数等等
  • 复合: 元组Tuple 、 Java 类 POJO 等等

1)元组 Tuple

Flink 支持 Tuple0 到 Tuple25, 使用方式如下 :

// X 为数字, 表示定义了一个 X元组
TupleX<String, Integer, ....> person = Tuple2.of(Value1, Value2, .... , ValueX);

2)POJO

可以被 Flink 序列化的类需要满足以下条件 :

  • 类公共且独立
  • 有公共的无参构造函数
  • 类的非静态字段要么是公共的,要么具有getter和setter方法 (遵循规范的)
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;

public class Example {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2));

        DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
            @Override
            public boolean filter(Person person) throws Exception {
                return person.age >= 18;
            }
        });

        adults.print();

        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;
        public Person() {}

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        }
    }
}

2. 流执行环境 ExecutionEnvironment

Flink 应用程序需要一个执行环境 StreamExecutionEnvironment 。 通过 execute 应用被打包发送到 JobManager 中并行执行。
请添加图片描述

3. 数据流来源 DataStream

1)用于测试

  1. env.fromElements

放入一些实例化的元素进去,或者直接放入一个 List, 这是一种简单的用于测试的便捷方法

List<Person> people = new ArrayList<Person>();

people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

DataStream<Person> flintstones = env.fromCollection(people);
  1. env.socketTextStream

直接传入一个套接字作为数据流的来源

DataStream<String> lines = env.socketTextStream("localhost", 9999);
  1. env.readTextFile

传入一个文件作为数据流的来源

DataStream<String> lines = env.readTextFile("file:///path");

2)实际应用

实际应用程序中,数据常来源于 Kafka 等消息系统

4. 接收器 Sink

  1. 测试环境使用 print() 打印结果到控制台
  2. 生产中,接收器往往是各种数据库 或者 各个子系统

5. ETL

Flink 可以帮助我们实现大数据处理中的 ETL 过程 (抽取、转换、加载)

极度类型 Java8 后提供的 Stream 函数式编程操作 ✔️

1)map & flatmap

实现类型转换的基本操作 ,map 和 flatmap 的区别如下:

  • map
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
DataStream<EnrichedRide> enrichedNYCRides = rides
    .map(new Enrichment());
enrichedNYCRides.print();


public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {
	// 重写 map 方法
    @Override
    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
        return new EnrichedRide(taxiRide);
    }
}
  • flatmap
DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));
DataStream<EnrichedRide> enrichedNYCRides = rides
    .flatMap(new NYCEnrichment());
enrichedNYCRides.print();

public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {
	// 重写 flatMap 方法,通过 Collector 发出任意数量的流元素
    @Override
    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
        if (valid.filter(taxiRide)) { 
            out.collect(new EnrichedRide(taxiRide));
        }
    }
}

2)keyBy

相当于 SQL 中的 GROUP BY

3)window

在Flink中,窗口是指对数据流进行划分并对每个划分的数据应用某个函数的概念。窗口可以根据不同的维度进行划分,如时间、元素数量等。Flink提供了丰富的窗口函数,包括滚动窗口、滑动窗口、会话窗口等。

举个例子,我们想要将某个数据流按照时间窗口进行划分,并对每个窗口内的商品进行价格平均值的计算,可以通过下面的代码来实现:

DataStream<Tuple2<String, Double>> dataStream = ...;
dataStream
  .keyBy(0) // 按照商品名称进行keyBy操作
  .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 定义10秒时间窗口
  .aggregate(AverageAggregateFunction()) // 对每个窗口进行平均值计算,AverageAggregateFunction() 为自定义的计算函数
  .print();
env.execute("Window Average Example");

4)process

process 通过接收一个处理函数 ProcessFunction 对数据流进行灵活的处理 , 让程序员直接控制数据流的处理 。

ProcessFunction将处理过程拆分为输入、处理和输出三个步骤,允许自定义状态,提供了比其他转换更精细的控制。

下面举个🌰, 假设我们有一个数据流,其中每个数据包含一个客户ID、订单ID和订单金额。我们想要按照客户ID对订单金额进行累加,并将累加结果大于100的客户ID和订单金额输出。

// Tuple3<String, String, Double> 为输入 , Tuple2<String, Double> 为输出
public class ProcessFunctionExample extends ProcessFunction<Tuple3<String, String, Double>, Tuple2<String, Double>> {
  // 自定义状态
  private ValueState<Double> sumState;
  @Override
  public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>(
      "sum",
      TypeInformation.of(new TypeHint<Double>() {})
    );
    sumState = getRuntimeContext().getState(descriptor);
  }

  @Override
  public void processElement(Tuple3<String, String, Double> value, Context ctx, Collector<Tuple2<String, Double>> out) throws Exception {
  	// 自定义状态
    Double sum = sumState.value();
    if (sum == null) {
      sum = 0.0;
    }
    sum += value.f2;
    sumState.update(sum);
	// 输出,并清除状态
    if (sum > 100.0) {
      out.collect(Tuple2.of(value.f0, sum));
      sumState.clear();
    }
  }
}
// 将上面的处理函数应用到下面的数据流中
📎
DataStream<Tuple3<String, String, Double>> input = env.fromElements(
  Tuple3.of("Customer1", "Order1", 50.0),
  Tuple3.of("Customer1", "Order2", 75.0),
  Tuple3.of("Customer2", "Order3", 30.0),
  Tuple3.of("Customer2", "Order4", 70.0),
  Tuple3.of("Customer3", "Order5", 20.0),
  Tuple3.of("Customer3", "Order6", 80.0)
);

input
  .keyBy(0)
  .process(new ProcessFunctionExample())
  .print();

env.execute("ProcessFunction Example");

5)addSink

addSink()是将处理后的数据写入到外部系统的方法,Flink提供了许多现成的SinkFunction,如写入到Kafka、写入到HDFS等。

举个🌰:我们有一个DataStream,其中每个元素都是一个字符串类型的单词,我们想要将这些单词写入到外部的文本文件中。

// 创建一个数据流
DataStream<String> words = env.fromElements("hello", "world", "flink");

// 写入到外部文本文件
words.addSink(new TextFileSink("/path/to/output/file"));

// TextFileSink 的实现
public class TextFileSink implements SinkFunction<String> {

  private final String outputPath;

  public TextFileSink(String outputPath) {
    this.outputPath = outputPath;
  }

  /**
   * 实际的写入方法,将数据写入到文本文件中
   */
  @Override
  public void invoke(String value, Context context) throws Exception {
    FileWriter fileWriter = new FileWriter(outputPath, true);
    fileWriter.write(value + "\n");
    fileWriter.close();
  }
}

想必你已经看出来了,上面就是一个完整的数据 ETL 处理流程鸭 ~~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/422402.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

winForm初始

创建winForm应用程序步骤 创建项目界面设计&#xff0c;拖控件布局设置属性编写代码运行程序 设置属性 在forms框内右击属性 属性框内有 修改标题 在(属性)里的外观里的Text, 点击text后会出现相对应的提示 设置关联属性名称 查看代码 右击 设置label名称 设置textbox关联属…

38-Vue之cron表达式组件使用

cron表达式组件使用前言vue-cron-editor-buefy1. 安装vue-cron-editor-buefy包2. 使用3. 配置路由4. 运行并查看效果vcrontab1. 安装vcrontab包2. 使用3. 配置路由4. 运行并查看前言 本篇来学习下vue中如何生成cron表达式的两个包 vue-cron-editor-buefy 1. 安装vue-cron-ed…

先认识浏览器和 dom

先认识浏览器和 dom 认识浏览器使用控制台(console)初识 dom获取浏览器可见区域高度简单的操作一下 dom向页面添加一个元素innerHTML认识块模式认识坐标与定位小结认识浏览器 我们先创建一个文本文件,然后将其扩展名改成 html,或者直接创建一个 html 文档。嗯,空白的,里…

『pyqt5 从0基础开始项目实战』08. 本地数据配置文件的保存与读取之SMTP邮件报警(保姆级图文)

目录导包和框架代码简化说明绑定鼠标事件编写弹窗UI和读取配置保存配置功能读取本地配置文件编写UI界面保存设置main.py中启动弹窗UI完整代码main.pythreads.pydialog.py总结欢迎关注 『pyqt5 从0基础开始项目实战』 专栏&#xff0c;持续更新中 欢迎关注 『pyqt5 从0基础开始项…

什么是MVVM?

MVVM 是 Model-View-ViewModel 的缩写&#xff0c;是M-V-VM三部分组成。它本质上就是MVC的改进版。 M&#xff1a;Model 代表数据模型&#xff0c;也可以在Model中定义数据修改和操作的业务逻辑。 V&#xff1a;View 代表视图UI&#xff0c;它负责将数据模型转化成UI 展现出来。…

OpenResty+OpenWAF的WEB防护实战

OpenResty是一个基于 Nginx 与 Lua 的高性能 Web 平台&#xff0c;其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。本文介绍通过OpenRestyOpenWAF来搭建软WAF的应用&#xff…

【Linux】多线程协同

目录 生产消费模型 BlockQueue阻塞队列模型 BlockQueue.hp Task.hpp mypc.cc RingQueue循环队列模型 POSIX信号量 RingQueue.hpp Task.hpp main.cc 生产消费模型 生产者与生产者之间关系&#xff1a;互斥&#xff08;竞争&#xff09; 消费者与消费者之间关系&…

偏向锁到轻量级锁的升级过程(耗资源)

目录 上原理&#xff1a; 细说原理&#xff1a; 什么是锁记录呢&#xff1f; 什么是Mark Word 呢&#xff1f; 上图解&#xff1a; 上原理&#xff1a; 偏向锁使⽤了⼀种等到竞争出现才释放锁的机制&#xff0c;所以当其他线程尝试竞争偏向锁时&#xff0c; 持有偏向锁的…

nssctf web 入门(3)

目录 [NISACTF 2022]easyssrf [SWPUCTF 2021 新生赛]ez_unserialize [SWPUCTF 2021 新生赛]no_wakeup 这里通过nssctf的题单web安全入门来写&#xff0c;会按照题单详细解释每题。题单在NSSCTF中。 想入门ctfweb的可以看这个系列&#xff0c;之后会一直出这个题单的解析&…

FLStudio21中文版本好不好用?值不值得下载

FLStudio中文21最新版本以其使用速度而闻名&#xff0c;是一个高度复杂的音乐制作环境。现代的DAW是一种非凡的野兽。首先&#xff0c;它在很大程度上把自己放在了(几乎)每个人记录过程的核心。其次&#xff0c;通过在价格适中的软件中模拟完整的工作室体验&#xff0c;它在音乐…

国内版的ChatGPT弯道超车的机会在哪里?

前言 从去年11月最后一天ChatGPT诞生&#xff0c;截至目前&#xff0c;ChatGPT的热度可谓是爆了。众所周知&#xff0c;ChatGPT是美国“开放人工智能研究中心”研发的聊天机器人程序&#xff0c;它是一个人工智能技术驱动的自然语言处理工具&#xff0c;它能够通过学习和理解人…

【数据分析】— 特征工程、特征设计、特征选择、特征评价、特征学习

【数据分析】— 特征工程特征工程是什么&#xff1f; (Feature Engineering)特征工程的意义特征工程的流程特征的设计从原始数据中如何设计特征&#xff1f;基本特征的提取创建新的特征函数变换特征独热特征表示 One-hot Representation数据的统计特征TF-IDF&#xff08;词频-逆…

「Cpolar」看我如何实现公网远程控制Mac OS【使用mac自带VNC】

&#x1f482;作者简介&#xff1a; THUNDER王&#xff0c;一名热爱财税和SAP ABAP编程以及热爱分享的博主。目前于江西师范大学本科在读&#xff0c;同时任汉硕云&#xff08;广东&#xff09;科技有限公司ABAP开发顾问。在学习工作中&#xff0c;我通常使用偏后端的开发语言A…

探寻人工智能前沿 迎接AIGC时代——CSIG企业行(附一些好玩的创新点)

上周我有幸参加了由中国图像图形学会和合合信息共同举办的CSIG企业行活动。这次活动邀请了多位来自图像描述与视觉问答、图文公式识别、自然语言处理、生成式视觉等领域的学者&#xff0c;他们分享了各自的研究成果和经验&#xff0c;并与现场观众进行了深入的交流和探讨。干货…

重感知还是重地图?其实无需选择

近来&#xff0c;关于自动驾驶应该重感知还是重地图是个热点话题&#xff0c;很多重量级车厂、自动驾驶供应商都开始提出重感知轻地图的方案&#xff0c;并承诺很快能发布出对应的产品。业界也出现了高精地图已“死”等类似的言论。 一时之间&#xff0c;似乎轻地图已经成为了…

三种实现模型可视化的方式(print, torchinfo, tensorboard)

记录一下自己使用的三种模型可视化的方式&#xff0c;从简单到难 Print 最简单的是print&#xff0c;就不用多说了。 Torchinfo from torchinfo import summary import torch model (...) summary(model, (1,3,128,128))即可按照像文档路径一样的方式输出结构&#xff0c;…

算法模板(2):数据结构(5)做题积累

数据结构&#xff08;3&#xff09; 一、并查集 238. 银河英雄传说 有 NNN 艘战舰&#xff0c;也依次编号为 1,2,...,N1,2,...,N1,2,...,N&#xff0c;其中第 iii 号战舰处于第 iii 列。有 TTT 条指令&#xff0c;每条指令格式为以下两种之一&#xff1a;M i j&#xff0c;表…

Linux lvm管理讲解及命令

♥️作者:小刘在C站 ♥️个人主页:小刘主页 ♥️每天分享云计算网络运维课堂笔记,努力不一定有收获,但一定会有收获加油!一起努力,共赴美好人生! ♥️夕阳下,是最美的绽放,树高千尺,落叶归根人生不易,人间真情 前言 目录 一、lvm管理 1.Logical Volume Manager,逻…

【运维笔记】VM centos 环境安装

镜像选择 阿里镜像源 注意在安装时&#xff0c;安装非图形化界面选择minimal版本安装。&#xff08;笔者在安装时选择了erverything和DVD&#xff0c;发现都是图形界面hhh&#xff0c;浪费了一早上时间&#xff09; 翻阅百度垃圾堆&#xff0c;版本号都推荐7.6-7.9&#xff…

图解国家网信办《生成式人工智能服务管理办法(征)》| 附下载

伴随ChatGPT兴起&#xff0c;生成式人工智能技术正作为一种创造性应用&#xff0c;牵引场景创新&#xff0c;推动新技术迭代升级和产业快速增长。由于生成式人工智能处于发展初期&#xff0c;技术成熟度、政策合规性等发展不足&#xff0c;导致其极易面临非法获取数据、个人隐私…