1、Flink DataStreamAPI 概述(上)

news2024/12/26 10:41:26

一、DataStream API

1、概述

1)Flink程序剖析
1.Flink程序组成
a)Flink程序基本组成
  • 获取一个执行环境(execution environment)
  • 加载/创建初始数据;
  • 指定数据相关的转换;
  • 指定计算结果的存储位置;
  • 触发程序执行。
b)创建执行环境
getExecutionEnvironment();

createLocalEnvironment();

createRemoteEnvironment(String host, int port, String... jarFiles);

通常,只使用 getExecutionEnvironment() 即可,该方法会根据上下文做正确的处理

  • 如果在 IDE 中执行程序或将其作为一般的 Java 程序执行,那么它将创建一个本地环境,该环境将在本地机器上执行程序。
  • 如果基于程序创建了一个 JAR 文件,并通过命令行运行它,Flink 集群管理器将执行程序的 main 方法,同时 getExecutionEnvironment() 方法会返回一个执行环境以在集群上执行程序。
c)指定数据源

执行环境提供了一些方法,支持使用各种方法从文件中读取数据:可以直接逐行读取数据,像读 CSV 文件一样,或使用任何第三方提供的 source,将一个文本文件作为一个行的序列来读取:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path/to/file");
d)应用转换

指定 data sources 将生成一个 DataStream,可以在上面应用转换(transformation)来创建新的派生 DataStream,可以调用 DataStream 上具有转换功能的方法来应用转换,使用 map 的转换:

DataStream<String> input = ...;

DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});
e)指定数据汇

通过创建 sink 把包含最终结果的 DataStream 写到外部系统。

writeAsText(String path);

print();
f)触发程序执行

需要调用 StreamExecutionEnvironmentexecute() 方法来触发程序执行,根据 ExecutionEnvironment 的类型,执行会在本地机器上触发,或将程序提交到某个集群上执行。

  • execute() 方法将等待作业完成,然后返回一个 JobExecutionResult,其中包含执行时间和累加器结果。

在这里插入图片描述

  • 如果不想等待作业完成,可以通过调用 StreamExecutionEnvironmentexecuteAsync() 方法来触发作业异步执行,它会返回一个 JobClient,可以通过它与刚刚提交的作业进行通信。

在这里插入图片描述

如下是使用 executeAsync() 实现 execute() 语义的示例。

final JobClient jobClient = env.executeAsync();

final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
g)注意

所有 Flink 程序都是延迟执行的

  • 当程序的 main 方法被执行时,数据加载和转换不会直接发生,而是每个算子都被创建并添加到 dataflow 形成的有向图中;
  • 当被执行环境的 execute() 方法显示地触发时,这些算子才会真正执行。
2.Data Sources

Source 用于程序获取数据,可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到程序中。

Flink 自带了许多预先实现的 source functions,也可以实现 SourceFunction 接口编写自定义的非并行 source,也可以实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

通过 StreamExecutionEnvironment 可以访问预定义的 stream source:

基于文件

  • readTextFile(path) - 读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。

  • readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是前两个方法内部调用的方法,它基于给定的 fileInputFormat 读取路径 path 上的文件,根据提供的 watchType 的不同,source 可以定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE),使用 pathFilter,可以过滤正在处理的文件。

    实现

    在底层,Flink 将文件读取过程拆分为两个子任务,即 目录监控数据读取

    每个子任务都由一个单独的实体实现,监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行,读取任务的并行度和作业的并行度相等。

    单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。

    Reader 是将实际获取数据的角色,每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。

    重要提示

    1. 如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,它的内容会被完全重新处理,这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。
    2. 如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source 扫描一次路径然后退出,无需等待 reader 读完文件内容,reader 会继续读取数据,直到所有文件内容都读完,关闭 source 会导致在那之后不再有检查点,这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取

基于套接字

  • socketTextStream - 从套接字读取,元素可以由分隔符分隔。

基于集合

  • fromCollection(Collection) - 从 Java Java.util.Collection 创建数据流,集合中的所有元素必须属于同一类型。
  • fromCollection(Iterator, Class) - 从迭代器创建数据流,class 参数指定迭代器返回元素的数据类型。
  • fromElements(T ...) - 从给定的对象序列中创建数据流,所有的对象必须属于同一类型。
  • fromParallelCollection(SplittableIterator, Class) - 从迭代器并行创建数据流,class参数指定迭代器返回元素的数据类型。
  • fromSequence(from, to) - 基于给定间隔内的数字序列并行生成数据流。

自定义

  • addSource - 关联一个新的 source function,可以使用 addSource(new FlinkKafkaConsumer<>(...)) 从 Apache Kafka 获取数据,更多参考 DataStream Connectors。
3.DataStream Transformations

参考算子。

4.Data Sinks

Data sinks 使用 DataStream 并将它们转发到文件、套接字、外部系统或打印它们。

Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的算子里:

  • writeAsText() / TextOutputFormat - 将元素按行写成字符串,通过调用每个元素的 toString() 方法获得字符串。
  • writeAsCsv(...) / CsvOutputFormat - 将元组写成逗号分隔的文件,行和字段的分隔符是可配置的,每个字段的值来自对象的 toString() 方法。
  • print() / printToErr() - 在标准输出/标准错误流上打印每个元素的 toString() 值,可以提供一个前缀(msg)附加到输出,有助于区分不同的 print 调用,如果并行度大于1,输出结果将附带输出任务标识符的前缀。
  • writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法和基类,支持自定义 object 到 byte 的转换。
  • writeToSocket - 根据 SerializationSchema 将元素写入套接字。
  • addSink - 调用自定义 sink function,Flink 捆绑了连接到其它系统(例如 Apache Kafka)的连接器,这些连接器被实现为 sink functions。

注意

DataStream 的 write*() 方法主要用于调试,不参与 Flink 的 checkpointing,通常这些函数具有至少有一次语义,刷新到目标系统的数据取决于 OutputFormat 的实现,并非所有发送到 OutputFormat 的元素都会立即显示在目标系统中;此外,在失败的情况下,这些记录可能会丢失。

为了将流可靠地、精准一次地传输到文件系统中,请使用 FileSink;通过 .addSink(...) 方法调用的自定义实现也可以参与 Flink 的 checkpointing,以实现精准一次的语义。

5.执行参数

StreamExecutionEnvironment 包含了 ExecutionConfig,它允许在运行时设置作业特定的配置值。

参数的说明可参考执行配置,这些参数特别适用于 DataStream API:

  • setAutoWatermarkInterval(long milliseconds):设置自动发送 watermark 的时间间隔,可以使用 long getAutoWatermarkInterval() 获取当前配置值。

在这里插入图片描述

a)容错

State & Checkpointing 描述了如何启用和配置 Flink 的 checkpointing 机制。

b)控制延迟

默认情况下,元素不会在网络上一一传输(这会导致不必要的网络传输),而是被缓冲。

缓冲区的大小(实际在机器之间传输)可以在 Flink 配置文件中设置,虽然此方法有利于优化吞吐量,但当输入流不够快时,它可能会导致延迟问题。

要控制吞吐量和延迟,可以调用执行环境(或单个算子)的 env.setBufferTimeout(timeoutMillis) 方法来设置缓冲区填满的最长等待时间,超过此时间后,即使缓冲区没有未满,也会被自动发送,超时时间的默认值为 100 毫秒。

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

为了最大限度地提高吞吐量,设置 setBufferTimeout(-1) 来删除超时,这样缓冲区仅在已满时才会被刷新。

要最小化延迟,请将超时设置为接近 0 的值(例如 5 或 10 毫秒)应避免超时为 0 的缓冲区,因为它会导致严重的性能下降。

6.调试
a)本地执行环境

LocalStreamEnvironment 在创建它的同一个 JVM 进程中启动 Flink 系统,如果从 IDE 启动 LocalEnvironment,则可以在代码中设置断点并轻松调试程序。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// 构建你的程序

env.execute();
b)集合 Data Sources

Flink 提供了由 Java 集合支持的特殊 data sources 以简化测试。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// 从元素列表创建一个 DataStream
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// 从任何 Java 集合创建一个 DataStream
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// 从迭代器创建一个 DataStream
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

注意: 集合 data source 要求数据类型和迭代器实现 Serializable,且集合 data sources 不能并行执行(parallelism = 1)。

c)迭代器 Data Sink

Flink 还提供了一个 sink 来收集 DataStream 的结果。

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = myResult.collectAsync();

注意:在程序完成时或者CheckPoint触发时才会输出结果。

7.总结
1.DataStream 的 write*() 方法不参与 Flink 的 checkpointing,具有至少一次语义,可以通过 FileSink 或 .addSink(...) 方法调用的自定义实现,参与 Flink 的 checkpointing,具有精准一次语义,实现将流可靠地、精准一次地传输到文件系统中;

2.通过配置 BufferTimeout 控制缓冲区刷出的延迟,通过配置 ExecutionConfig 控制执行参数;

3.通过调用 env.executeAsync() 返回的 jobClient 实现对 Flink Job 的控制(可用于在 Java 中控制 Flink 任务);

4.readFile()方法,在底层 Flink 将文件读取过程拆分为两个子任务,即 目录监控 和 数据读取;监控由单个非并行(并行度 = 1)任务实现,而读取由多并行任务执行,读取任务的并行度和作业的并行度相等;单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于watchType),找到要处理的文件,将它们划分为分片,并将这些分片分配给下游 reader;Reader 是将获取数据的角色,每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1620478.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Nintex访问上海斯歌总部,双方达成重要战略共识

近日&#xff0c;Nintex公司&#xff08;K2产品总部&#xff09;亚太区域销售副总裁Keith Payne、资深解决方案工程师Ranjit Nair以及区域销售经理Rachel一行莅临上海斯歌总部进行访问。在此次会晤中&#xff0c;上海斯歌与Nintex就未来在中国大陆及中国香港市场的战略发展达成…

SOLIDWORKS 2024 MBD新增功能

MBD即基于模型的工程定义、是一个用集成的三维实体模型来完整表达产品定义信息的方法体&#xff0c;它详细规定了三维实体模型中产品尺寸、公差的标注规则和工艺信息的表达方法。 01 通过实体几何体控制注解的可见性 SOLIDWORKS 2024 MBD 在用户使用体验上做了很大的提升。S…

QT初识

通过图形化界面输出helloworld 既然学习了QT&#xff0c;那么自然要做经典的输出helloworld字符串的实验。 QT有两好几种方案输出helloworld&#xff0c;一种是通过图形化界面输出&#xff0c;一种是通过代码实现。 这里先了解图形化界面的方案。 创建项目后&#xff0c;点…

高频前端面试题汇总之HTML篇

1. src和href的区别 src和href都是用来引用外部的资源&#xff0c;它们的区别如下&#xff1a; src&#xff1a; 表示对资源的引用&#xff0c;它指向的内容会嵌入到当前标签所在的位置。src会将其指向的资源下载并应⽤到⽂档内&#xff0c;如请求js脚本。当浏览器解析到该元素…

函数式接口及Stream流式计算

一、什么是函数式接口 只有一个方法的接口&#xff0c;例如 FunctionalInterface public interface Runnable { public abstract void run(); }二、Function函数式接口&#xff1a;有一个输入参数&#xff0c;有一个输出 三、断定型接口&#xff1a;有一个输入参数&#xf…

如何用虚拟仿真实训室提质增效?

为积极推进教学改革&#xff0c;优化人才培养方案&#xff0c;紧跟产业发展趋势&#xff0c;响应教育领域数字化改革要求&#xff0c;越来越多院校通过搭建虚拟仿真实训室、数字人实训室&#xff0c;打通了融“理论知识虚拟训练技能训练”为一体的教学和实训。学生通过数字人实…

代码随想录 Day19 字符串 | LC28 实现strStr() 【KMP经典题目】

六、实现strStr() 题目&#xff1a; 力扣28&#xff1a;找出字符串中第一个匹配的下标 给你两个字符串 haystack 和 needle &#xff0c;请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标&#xff08;下标从 0 开始&#xff09;。如果 needle 不是 haystack…

大模型检索召回系统:RAG技术的全面调查与未来展望

随着人工智能技术的飞速发展&#xff0c;大型语言模型&#xff08;LLMs&#xff09;在自然语言处理&#xff08;NLP&#xff09;领域取得了显著成就。然而&#xff0c;这些模型在处理特定领域或知识密集型任务时仍面临挑战&#xff0c;如产生错误信息或“幻觉”。为了克服这些难…

代码随想录算法训练营33期 第五十天 | 188.买卖股票的最佳时机IV

dp[i][0] 不操作&#xff1b;d[i][1]第一次开始持有股票 //dp[i]当前天i的价值情况&#xff0c;dp[i][0]表示不操作的最大价值&#xff0c;dp[i][1]在当前天第一次持有的最大价值&#xff0c;dp[i][2]在当前天第一次卖出的最大价值, dp[i][3]在当前天第二次持有的最大价值&am…

qmt教程2----订阅单股行情,提供源代码

链接 qmt教程2----订阅单股行情&#xff0c;提供源代码 (qq.com) qmt教程1---qmt安装&#xff0c;提供下载链接 今天我重新封装了全部qmt的内容&#xff0c;包括数据&#xff0c;交易 qmt交易 我本来打算全部上次git的&#xff0c;但是考虑到毕竟是实盘的内容&#xff0c;就放…

项目总结计划-(Word)

项目总结计划书-Word 2 项目工作成果 2.1 交付给用户的产品 2.2 交付给研发中心的产品 2.2.1 代码部分 2.2.2 文档部分 2.3 需求完成情况与功能及性能符合性统计 2.3.1 需求完成情况统计 2.3.2 功能符合性分析 2.3.3 性能符合性分析 3 项目工作分析 3.1 项目计划与进度实施分…

消息队列一文全解!!!

消息队列的用途是什么&#xff1f; 第一章 消息队列的用途是什么&#xff1f; 第二章 消息重复消费如何避免&#xff1f; 第三章 消息的顺序性可靠性如何保证&#xff1f; 第四章 高可用的消息队列如何搭建&#xff1f; 第五章 消息队列面试题汇总 文章目录 消息队列的用途是什…

Vue页面生成导出PDF文件

第一种&#xff1a; 使用浏览器自带打印方法window.print(); 也可使用print-js插件&#xff08;原理相同&#xff09; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>printDemo</title> </…

安防监控/智能分析EasyCVR视频汇聚平台海康/大华/宇视摄像头国标语音GB28181语音对讲配置流程

一、背景分析 近年来&#xff0c;国内视频监控应用发展迅猛&#xff0c;系统接入规模不断扩大&#xff0c;涌现了大量平台提供商&#xff0c;平台提供商的接入协议各不相同&#xff0c;终端制造商需要给每款终端维护提供各种不同平台的软件版本&#xff0c;造成了极大的资源浪…

测试工程师面试准备(软硬件)

您好&#xff0c;我叫XXX。学历XX&#xff0c;XXX专业毕业。X年X月份毕业&#xff0c;但是去年二月份已经找到工作开始实习了&#xff0c;目前工作一年了&#xff0c;这一年的过程中我主要负责软件的开发和测试和软硬件联调测试工作。具体来说就是&#xff0c;在软件开发完成后…

制造业小企业内部小程序简单设计

也没什么需求&#xff0c;就是看企业内部外来单位就餐还需要打印客饭单拿去食堂给打饭师傅&#xff0c;出门单还需要打印纸质版&#xff0c;车间PDA扫码出问题需要人手动处理&#xff0c;会议室需要OA申请&#xff0c;但是申请前不知道哪些会议室事空的(因为不是每个人都下载OA…

neovim0.9版本安装

一 安装 Installing Neovim neovim/neovim Wiki (github.com)https://github.com/neovim/neovim/wiki/Installing-Neovim/921fe8c40c34dd1f3fb35d5b48c484db1b8ae94b 1 下载 curl -LO https://github.com/neovim/neovim/releases/latest/download/nvim.appimage chmod ux n…

【黑马点评Redis——003优惠券秒杀4——消息队列Stream】

1. 目前还存在的问题 设置的阻塞队列可能会超出最大长度系统重启会导致阻塞队列中的信息消失&#xff0c;可能会出现问题 2. 消息队列 消息队列 (Message Queue)。 字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色消息队列:存储和管理消息&#xff0c;也被称为…

网页自动跳转到其他页面,点击浏览器返回箭头,回不到原来页面的问题

背景&#xff1a;今天产品提个需求&#xff0c;需要从index页面自动触发跳转到下一页面的事件&#xff0c;从而不做任何操作&#xff0c;直接跳转到test页面。 代码是这样的&#xff1a; index.vue: <template><div style"width:500px;height:600px;background-…

20 Debian如何配置DNS服务(2)主从服务器

作者&#xff1a;网络傅老师 特别提示&#xff1a;未经作者允许&#xff0c;不得转载任何内容。违者必究&#xff01; Debian如何配置DNS服务&#xff08;2&#xff09;主从服务器 《傅老师Debian小知识库系列之20》——原创 前言 傅老师Debian小知识库特点&#xff1a; 1、…