Flink学习记录

news2025/1/12 6:17:34

可以快速搭建一个Flink编写程序

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.17.1 \
    -DgroupId=com.zxx.langhuan \
    -DartifactId=langhuan-flink \
    -Dversion=1.0.0-SNAPSHOT \
    -Dpackage=com.zxx.langhuan.flink \
    -DinteractiveMode=false

Flink 的 Java DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有

  • 基本类型,即 String、Long、Integer、Boolean、Array
  • 复合类型:Tuples、POJOs 和 Scala case classes

Flink 的原生序列化器可以高效地操作 tuples 和 POJOs。

对于 Java,Flink 自带有 Tuple0 到 Tuple25 类型。

POJOs

如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):

  • 该类是公有且独立的(没有非静态内部类)
  • 该类有公有的无参构造函数
  • 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。

基本的 stream source

// DataStream API
// fromElements
StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2));

// fromCollection
List<Person> people = new ArrayList<Person>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));
DataStream<Person> flintstones = env.fromCollection(people);

// socketTextStream
DataStream<String> lines = env.socketTextStream("localhost", 9999)

// readTextFile
DataStream<String> lines = env.readTextFile("file:///path");

// customSource
DataStreamSource<OUT> out = env.addSource(SourceFunction<OUT> function);



// Table API
// 创建TableEnvironment(表环境)。 创建表环境时,你可以设置作业属性,定义应用的批流模式,以及创建数据源。 我们先创建一个标准的表环境,并选择流式执行器。
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
// 批处理模式
// EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv.executeSql("CREATE TABLE transactions (\n" +
    "    account_id  BIGINT,\n" +
    "    amount      BIGINT,\n" +
    "    transaction_time TIMESTAMP(3),\n" +
    "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
    ") WITH (\n" +
    "    'connector' = 'kafka',\n" +
    "    'topic'     = 'transactions',\n" +
    "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
    "    'format'    = 'csv'\n" +
    ")");

tEnv.executeSql("CREATE TABLE spend_report (\n" +
    "    account_id BIGINT,\n" +
    "    log_ts     TIMESTAMP(3),\n" +
    "    amount     BIGINT\n," +
    "    PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
    ") WITH (\n" +
    "    'connector'  = 'jdbc',\n" +
    "    'url'        = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
    "    'table-name' = 'spend_report',\n" +
    "    'driver'     = 'com.mysql.jdbc.Driver',\n" +
    "    'username'   = 'sql-demo',\n" +
    "    'password'   = 'demo-sql'\n" +
    ")");

Table transactions = tEnv.from("transactions");
report(transactions).executeInsert("spend_report");

支持的connectors

DataStream ConnectorsTable API Connectors
DataGen
Kafka
Cassandra
DynamoDB
ElasticSearch
Firehose
Kinesis
MongoDB
OpenSearch
文件系统
RabbitMQ
Google Cloud PubSub
Hybrid Source
Pulsar
JDBC
Upsert Kafka
HBase
Print
BlackHole
Hive

场景说明

一个 Flink 集群总是包含一个 JobManager 以及一个或多个 Flink TaskManager。
JobManager 负责处理 Job 提交、 Job 监控以及资源管理。
Flink TaskManager 运行 worker 进程, 负责实际任务 Tasks 的执行,而这些任务共同组成了一个 Flink Job。 
 

有界流:批处理

无界流:流处理

无状态的转换

map()

flatmap()

keyBy()

以下情况,一个类不能作为 key

  1. 它是一种 POJO 类,但没有重写 hashCode() 方法而是依赖于 Object.hashCode() 实现。
  2. 它是任意类的数组。

reduce() 和其他聚合算子  

数据流转换

有状态的转换

在 Flink 不参与管理状态的情况下,你的应用也可以使用状态,但 Flink 为其管理状态提供了一些引人注目的特性:

  • 本地性: Flink 状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取
  • 持久性: Flink 状态是容错的,例如,它可以自动按一定的时间间隔产生 checkpoint,并且在任务失败后进行恢复
  • 纵向可扩展性: Flink 状态可以存储在集成的 RocksDB 实例中,这种方式下可以通过增加本地磁盘来扩展空间
  • 横向可扩展性: Flink 状态可以随着集群的扩缩容重新分布
  • 可查询性: Flink 状态可以通过使用 状态查询 API 从外部进行查询。

Rich Functions

  • open(Configuration c):仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。
  • close()
  • getRuntimeContext():为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。
  • setRuntimeContext()
  • getIterationRuntimeContext()

ValueState:要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分布式的共享键值存储。(相同的key共享)

  • value()
  • update()
  • clear()

Flink 明确支持以下三种时间语义:

  • 事件时间(event time): 事件产生的时间,记录的是设备生产(或者存储)事件的时间

  • 摄取时间(ingestion time): Flink 读取事件时记录的时间

  • 处理时间(processing time): Flink pipeline 中具体算子处理事件的时间

Watermarks: 定义何时停止等待较早的事件

//        AssignerWithPunctuatedWatermarks 在每个事件上都可以提供水位线,因此水位线可以更加灵活和精确地根据事件的特性进行生成。
//        AssignerWithPeriodicWatermarks 在一定的时间间隔内提供水位线,因此水位线的生成不会频繁,适用于事件的时间戳变化频率较高,而水位线的变化频率较低的情况。
//        在选择使用哪个接口时,可以根据具体的业务需求和事件流的特点来决定。如果事件的时间戳和水位线的变化较为频繁,或者需要更精确的控制,可以选择 AssignerWithPunctuatedWatermarks。如果事件的时间戳变化较为平稳,或者水位线的变化不需要那么频繁,可以选择 AssignerWithPeriodicWatermarks。

// WatermarkStrategy 接口包含以下方法:
//        withTimestampAssigner:指定如何从事件中提取事件时间戳(timestamp)。
//        withIdleness:配置是否在流处于空闲状态时发出水位线,默认为不发出水位线。
//        withIdlenessTimeout:指定流处于空闲状态多久后发出水位线。
//        withTimestampAssignerAndIdlenessTimeout:同时指定事件时间戳提取逻辑和流空闲状态的配置。

// 通常,你可以通过静态工厂方法 WatermarkStrategy.forXXX() 来创建特定的水位线策略,其中 XXX 可以是以下几种类型之一:
//        forBoundedOutOfOrderness:适用于乱序但有界的事件流,根据最大允许的乱序程度指定固定的延迟时间。
//        forMonotonousTimestamps:适用于单调递增的事件流,例如源自某些消息队列的事件流。
//        forGenerator:通过自定义的水位线生成器函数来创建水位线策略。
//        forPeriodicBoundedOutOfOrderness:适用于周期性有界乱序事件流,指定固定的延迟时间和乱序间隔。
//        forEventTime:适用于事件时间已经正确嵌入在事件中的情况,不需要提取时间戳。

DataStream<Event> stream = ...;

WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.timestamp);

DataStream<Event> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(strategy);

Windows

Flink 有一些内置的窗口分配器,如下所示:

  • 滚动时间窗口
    • 每分钟页面浏览量
    • TumblingEventTimeWindows.of(Time.minutes(1))
  • 滑动时间窗口
    • 每10秒钟计算前1分钟的页面浏览量
    • SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
  • 会话窗口
    • 每个会话的网页浏览量,其中会话之间的间隔至少为30分钟
    • EventTimeSessionWindows.withGap(Time.minutes(30))

以下都是一些可以使用的间隔时间 Time.milliseconds(n)Time.seconds(n)Time.minutes(n)Time.hours(n), 和 Time.days(n)

窗口应用函数 

我们有三种最基本的操作窗口内的事件的选项:

  1. 像批量处理,ProcessWindowFunction 会缓存 Iterable 和窗口内容,供接下来全量计算;
  2. 或者像流处理,每一次有事件被分配到窗口时,都会调用 ReduceFunction 或者 AggregateFunction 来增量计算;
  3. 或者结合两者,通过 ReduceFunction 或者 AggregateFunction 预聚合的增量计算结果在触发窗口时, 提供给 ProcessWindowFunction 做全量计算。
DataStream<SensorReading> input = ...;

input
    .keyBy(x -> x.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new MyWastefulMax());

public static class MyWastefulMax extends ProcessWindowFunction<
        SensorReading,                  // 输入类型
        Tuple3<String, Long, Integer>,  // 输出类型
        String,                         // 键类型
        TimeWindow> {                   // 窗口类型

    @Override
    public void process(
            String key,
            Context context,
            Iterable<SensorReading> events,
            Collector<Tuple3<String, Long, Integer>> out) {

        int max = 0;
        for (SensorReading event : events) {
            max = Math.max(event.value, max);
        }
        out.collect(Tuple3.of(key, context.window().getEnd(), max));
    }
}

// Context 接口 展示大致如下:
public abstract class Context implements java.io.Serializable {
    public abstract W window();

    public abstract long currentProcessingTime();
    public abstract long currentWatermark();

    public abstract KeyedStateStore windowState();
    public abstract KeyedStateStore globalState();
}

// windowState 和 globalState 可以用来存储当前的窗口的 key、窗口或者当前 key 的每一个窗口信息。这在一些场景下会很有用,试想,我们在处理当前窗口的时候,可能会用到上一个窗口的信息。
// 增量聚合示例
DataStream<SensorReading> input = ...;

input
    .keyBy(x -> x.key)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .reduce(new MyReducingMax(), new MyWindowFunction());

private static class MyReducingMax implements ReduceFunction<SensorReading> {
    public SensorReading reduce(SensorReading r1, SensorReading r2) {
        return r1.value() > r2.value() ? r1 : r2;
    }
}

private static class MyWindowFunction extends ProcessWindowFunction<
    SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {

    @Override
    public void process(
            String key,
            Context context,
            Iterable<SensorReading> maxReading,
            Collector<Tuple3<String, Long, SensorReading>> out) {

        SensorReading max = maxReading.iterator().next();
        out.collect(Tuple3.of(key, context.window().getEnd(), max));
    }
}

晚到的事件

  • 旁路输出

OutputTag<Event> lateTag = new OutputTag<Event>("late"){};

SingleOutputStreamOperator<Event> result = stream
    .keyBy(...)
    .window(...)
    .sideOutputLateData(lateTag)
    .process(...);

DataStream<Event> lateStream = result.getSideOutput(lateTag);
  • 指定允许的延迟
stream
    .keyBy(...)
    .window(...)
    .allowedLateness(Time.seconds(10))
    .process(...);

如果延迟的事件需要特殊处理,可以从Context中获取水位线时间来做比较

public void processElement(
        TaxiFare fare,
        Context ctx,
        Collector<Tuple3<Long, Long, Float>> out) throws Exception {

    long eventTime = fare.getEventTime();
    TimerService timerService = ctx.timerService();

    if (eventTime <= timerService.currentWatermark()) {
        // 事件延迟;其对应的窗口已经触发。
    } else {
        // 其他处理
    }
}

深入了解窗口操作

滑动窗口是通过复制来实现的

滑动窗口分配器可以创建许多窗口对象,并将每个事件复制到每个相关的窗口中。例如,如果您每隔 15 分钟就有 24 小时的滑动窗口,则每个事件将被复制到 4 * 24 = 96 个窗口中。

时间窗口会和时间对齐

仅仅因为我们使用的是一个小时的处理时间窗口并在 12:05 开始运行您的应用程序,并不意味着第一个窗口将在 1:05 关闭。第一个窗口将长 55 分钟,并在 1:00 关闭。

请注意,滑动窗口和滚动窗口分配器所采用的 offset 参数可用于改变窗口的对齐方式。有关详细的信息,请参见 滚动窗口 和 滑动窗口 。

window 后面可以接 window

比如说:

stream
    .keyBy(t -> t.key)
    .window(<window assigner>)
    .reduce(<reduce function>)
    .windowAll(<same window assigner>)
    .reduce(<same reduce function>);

可能我们会猜测以 Flink 的能力,想要做到这样看起来是可行的(前提是你使用的是 ReduceFunction 或 AggregateFunction ),但不是。

之所以可行,是因为时间窗口产生的事件是根据窗口结束时的时间分配时间戳的。例如,一个小时小时的窗口所产生的所有事件都将带有标记一个小时结束的时间戳。后面的窗口内的数据消费和前面的流产生的数据是一致的。

空的时间窗口不会输出结果

事件会触发窗口的创建。换句话说,如果在特定的窗口内没有事件,就不会有窗口,就不会有输出结果。

延迟事件可能导致延迟合并

会话窗口的实现是基于窗口的一个抽象能力,窗口可以 聚合。会话窗口中的每个数据在初始被消费时,都会被分配一个新的窗口,但是如果窗口之间的间隔足够小,多个窗口就会被聚合。延迟事件可以弥合两个先前分开的会话间隔,从而产生一个虽然有延迟但是更加准确地结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/867498.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot复习:(33)WebMvcAutoconfiguration内部静态类WebMvcAutoConfigurationAdapter

WebMvcAutoconfiguration内部静态类WebMvcAutoConfigurationAdapter实现了WebMvcConfigurer接口&#xff0c;重写了一些方法&#xff0c;也就是默认对Spring Mvc进行了一些配置: 该静态类上有个**Import**注解&#xff1a; Import(EnableWebMvcConfiguration.class) 它的父类…

前端笔试题1

HTML/CSS 题1&#xff1a; 1&#xff0e;使用CSS 让该节点不可见&#xff0c;方法越多越好。 <div class"hidden">Hi</div> 使用CSS 让节点不可见的方法有以下几种&#xff1a; 把 visibility 属性设置为 hidden&#xff0c;这样元素框不会被绘制&…

伯俊ERP对接打通金蝶云星空表头表体组合查询接口与采购订单新增接口

伯俊ERP对接打通金蝶云星空表头表体组合查询接口与采购订单新增接口 数据源平台:伯俊ERP 伯俊科技&#xff0c;依托在企业信息化建设方面的领先技术与实践积累&#xff0c;致力于帮助企业实现全渠道一盘货。伯俊提供数字经营的咨询与系统实施&#xff0c;助力企业信息化升级、加…

【C++】STL初识

1.STL的基本概念 2.vector存放内置数据类型 #include <iostream> using namespace std; #include <vector> #include <algorithm>void MyPrint(int val) {cout << val << endl; }void test01() {//创建vector容器对象&#xff0c;并且通过模板参…

DP1.4接口的PCB布局布线要求

DP接口即为DisplayPort接口&#xff0c;是由视频电子标准协会发布的显示接口。DP接口将在传输视频信号的同时加入对高清音频信号传输的支持&#xff0c;并且同时支持更高的分辨率以及刷新率。DP1.4通信端口规范新标准基于DP1.3规范&#xff0c;宽度不变但加入了显示压缩流技术&…

Spring-Cloud-Loadblancer详细分析_3

前两篇文章介绍了加载过程&#xff0c;本文从Feign的入口开始分析执行过程&#xff0c;还是从FeignBlockingLoadBalancerClient.execute来入手 public class FeignBlockingLoadBalancerClient implements Client {private static final Log LOG LogFactory.getLog(FeignBlock…

【TypeScript】进阶之路语法细节,类型和函数

进阶之路 类型别名(type)的使用接口(interface)的声明的使用二者区别&#xff1a; 联合类型和交叉类型联合类型交叉类型 类型断言获取DOM元素 非空类型断言字面量类型的使用类型缩小&#xff08;类型收窄&#xff09;TypeScript 函数类型函数类型表达式内部规则检测函数的调用签…

什么是响应式设计?列举几种实现响应式设计的方法。

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 什么是响应式设计&#xff1f;⭐ 实现响应式设计的方法⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏…

Linux:Shell编程之免交互

目录 绪论 1、here Document免交互 1.1 格式 1.2 cat结合免交互实现重定向输出到指定文件 1.3 变量替换 2、Expect免交互 2.1 三种写法 3、免交互实现普通用户切换root 3.1 send_user 4、接收参数 5、嵌入执行模式 6、ssh远程登录 绪论 免交互&#xff1a;不需要人…

【Linux进行时】进程概念

进程的概念 什么是进程呢&#xff1f; ❓首先我们需要认识一下什么叫进程呢&#xff1f; 课本概念&#xff1a;程序的一个执行实例&#xff0c;正在执行的程序等 &#x1f525;内核观点&#xff1a;担当分配系统资源&#xff08;CPU时间&#xff0c;内存&#xff09;的实体。…

海外ASO优化之关于应用的营销2

在目标受众中建立信任度&#xff0c;并获得博客/新闻网站的热榜&#xff0c;这样自然会提高应用的知名度和目标受众的认知度。就博客读者而言&#xff0c;需要找出推荐的最佳时间和真正推动我们应用是什么。 1、提供了App Store或Google Play的直接链接。 我们首先需要创建一个…

2023年中国锂电池X-Ray及CT检测设备市场竞争格局及行业市场规模前景分析[图]

锂电池X-Ray成像检测设备主要是利用X射线穿透电芯时的吸收、反射、散射效应实现成像并对图像进行处理及算法分析&#xff0c;实现非接触式的无损、自动测量锂电池电芯内部特征尺寸以进行瑕疵检测&#xff0c;确认电芯结构是否合格、避免造成电芯内部短路等安全隐患。 锂电池X-…

fork创建多个子进程

fork创建多个子进程 示例代码 fork1.c #include <stdio.h> #include <stdlib.h> #include <sys/types.h> #include <unistd.h>int main(int argc,char **argv) {int i, j;pid_t pid;for (i 0; i < 3; i){pid fork();if (pid < 0){perror(&q…

Zip压缩包有密码,如何删除?

Zip压缩包设置设置了密码&#xff0c;想要删除密码&#xff0c;除了将压缩包解压出来之后再将文件压缩为不带密码的压缩文件以外&#xff0c;还有一种删除密码的方法。设置方法如下&#xff1a; 右键点击zip文件&#xff0c;找到打开方式&#xff0c;以Windows资源管理器方式打…

sql server profiler使用

一、打开sql server profiler 二、配置 比如我们只过滤包含这个关键字的&#xff0c;输入&#xff1a;%Employees%

【C++11】lambda表达式 | 包装器

文章目录 一.lambda表达式1.lambda表达式概念2.lambda表达式语法3.lambda表达式交换两个数4.lambda表达式底层原理 二.包装器1.function包装器①function包装器介绍②function包装器统一类型③function包装器的意义 2.bind包装器①bind包装器介绍②bind包装器绑定固定参数③bin…

计算机基础之RAID技术

概述 RAID&#xff0c;Redundant Array of Independent Disks&#xff0c;独立磁盘冗余阵列&#xff0c;一种把多块独立的硬盘&#xff08;物理硬盘&#xff09;按不同的方式组合起来形成一个硬盘组&#xff08;逻辑硬盘&#xff09;&#xff0c;从而提供比单个硬盘更高的存储…

Springboot3整合使用aj-captcha行为验证码解决方案

截止到目前(2023-04-20)&#xff0c;Springboot最新稳定版本已经迭代到3.0.5&#xff0c;而我们项目中使用的行为验证码框架aj-captcha还没有适配Springboot3&#xff0c;码云上类似的请求也没有得到过回应&#xff0c;于是决定自己动手适配一下&#xff0c;研究下来发现适配3.…

oi知识表+NOIP提高组算法及算法思想总结

&#xfeff;算法及算法思想总结 │ By lib │ ├暴力 ├模拟 ├递归及递推:数位统计类 ├构造 ▼├排序算法 │ ├冒泡排序 │ ├选择排序 │ ├计数排序 │ ├基数排序 │ ├插入排序 │ ├归并排序 │ ├快速排序 │…