Flink 算子

news2024/12/23 5:51:28

Flink 算子

用户通过算子能将一个或多个 DataStream 转换成新的 DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑。

这部分内容将描述 Flink DataStream API 中基本的数据转换 API,数据转换后各种数据分区方式,以及算子的链接策略。

数据流转换

Map

DataStream → DataStream #
输入一个元素同时输出一个元素。下面是将输入流中元素数值加倍的 map function:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
})

FlatMap

DataStream → DataStream #
输入一个元素同时产生零个、一个或多个元素。下面是将句子拆分为单词的 flatmap function:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

Filter

DataStream → DataStream #
为每个元素执行一个布尔 function,并保留那些 function 输出值为 true 的元素。下面是过滤掉零值的 filter:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

KeyBy

DataStream → KeyedStream #
在逻辑上将流划分为不相交的分区。具有相同 key 的记录都分配到同一个分区。在内部, keyBy() 是通过哈希分区实现的。有多种指定 key 的方式。

dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);

以下情况,一个类不能作为 key: 它是一种 POJO 类,但没有重写 hashCode() 方法而是依赖于
Object.hashCode() 实现。 它是任意类的数组。

Reduce

KeyedStream → DataStream #
在相同 key 的数据流上“滚动”执行 reduce。将当前元素与最后一次 reduce 得到的值组合然后输出新值。

下面是创建局部求和流的 reduce function:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

Window

KeyedStream → WindowedStream #
可以在已经分区的 KeyedStreams 上定义 Window。Window 根据某些特征(例如,最近 5 秒内到达的数据)对每个 key Stream 中的数据进行分组。请参阅 windows 获取有关 window 的完整说明。

dataStream
  .keyBy(value -> value.f0)
  .window(TumblingEventTimeWindows.of(Time.seconds(5))); 

WindowAll

DataStream → AllWindowedStream #
可以在普通 DataStream 上定义 Window。 Window 根据某些特征(例如,最近 5 秒内到达的数据)对所有流事件进行分组。请参阅windows获取有关 window 的完整说明。

这适用于非并行转换的大多数场景。所有记录都将收集到 windowAll 算子对应的一个任务中。

dataStream
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));

Window Apply

WindowedStream → DataStream #
AllWindowedStream → DataStream #
将通用 function 应用于整个窗口。下面是一个手动对窗口内元素求和的 function。

如果你使用 windowAll 转换,则需要改用 AllWindowFunction。

windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// 在 non-keyed 窗口流上应用 AllWindowFunction
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

WindowReduce

WindowedStream → DataStream #
对窗口应用 reduce function 并返回 reduce 后的值。

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
});

Union

DataStream* → DataStream #
将两个或多个数据流联合来创建一个包含所有流中数据的新流。注意:如果一个数据流和自身进行联合,这个流中的每个数据将在合并后的流中出现两次。

dataStream.union(otherStream1, otherStream2, ...);

Window Join

DataStream,DataStream → DataStream #
根据指定的 key 和窗口 join 两个数据流。

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});

Interval Join

KeyedStream,KeyedStream → DataStream #
根据 key 相等并且满足指定的时间范围内(e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound)的条件将分别属于两个 keyed stream 的元素 e1 和 e2 Join 在一起。

keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});

Window CoGroup

DataStream,DataStream → DataStream #
根据指定的 key 和窗口将两个数据流组合在一起。

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});

Connect

DataStream,DataStream → ConnectedStream #
“连接” 两个数据流并保留各自的类型。connect 允许在两个流的处理逻辑之间共享状态。

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

CoMap, CoFlatMap

ConnectedStream → DataStream #
类似于在连接的数据流上进行 map 和 flatMap。

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});

Cache

DataStream → CachedDataStream #
把算子的结果缓存起来。目前只支持批执行模式下运行的作业。算子的结果在算子第一次执行的时候会被缓存起来,之后的 作业中会复用该算子缓存的结果。如果算子的结果丢失了,它会被原来的算子重新计算并缓存。

DataStream<Integer> dataStream = //...
CachedDataStream<Integer> cachedDataStream = dataStream.cache();
cachedDataStream.print(); // Do anything with the cachedDataStream
...
env.execute(); // Execute and create cache.     
cachedDataStream.print(); // Consume cached result.
env.execute();

物理分区

Flink 也提供以下方法让用户根据需要在数据转换完成后对数据分区进行更细粒度的配置。

自定义分区

DataStream → DataStream #
使用用户定义的 Partitioner 为每个元素选择目标任务。

dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);

随机分区

DataStream → DataStream #
将元素随机地均匀划分到分区。

dataStream.shuffle();

Rescaling

DataStream → DataStream #
将元素以 Round-robin 轮询的方式分发到下游算子。如果你想实现数据管道,这将很有用,例如,想将数据源多个并发实例的数据分发到多个下游 map 来实现负载分配,但又不想像 rebalance() 那样引起完全重新平衡。该算子将只会到本地数据传输而不是网络数据传输,这取决于其它配置值,例如 TaskManager 的 slot 数量。

上游算子将元素发往哪些下游的算子实例集合同时取决于上游和下游算子的并行度。例如,如果上游算子并行度为 2,下游算子的并发度为 6, 那么上游算子的其中一个并行实例将数据分发到下游算子的三个并行实例, 另外一个上游算子的并行实例则将数据分发到下游算子的另外三个并行实例中。再如,当下游算子的并行度为2,而上游算子的并行度为 6 的时候,那么上游算子中的三个并行实例将会分发数据至下游算子的其中一个并行实例,而另外三个上游算子的并行实例则将数据分发至另下游算子的另外一个并行实例。

当算子的并行度不是彼此的倍数时,一个或多个下游算子将从上游算子获取到不同数量的输入。

请参阅下图来可视化地感知上述示例中的连接模式:
在这里插入图片描述
dataStream.rescale();

广播

DataStream → DataStream #
将元素广播到每个分区 。
dataStream.broadcast();

算子链和资源组

将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。Flink 默认会将能链接的算子尽可能地进行链接(例如, 两个 map 转换操作)。此外, Flink 还提供了对链接更细粒度控制的 API 以满足更多需求:

如果想对整个作业禁用算子链,可以调用 StreamExecutionEnvironment.disableOperatorChaining()。下列方法还提供了更细粒度的控制。需要注意的是,这些方法只能在 DataStream 转换操作后才能被调用,因为它们只对前一次数据转换生效。例如,可以 someStream.map(…).startNewChain() 这样调用,而不能 someStream.startNewChain() 这样。

一个资源组对应着 Flink 中的一个 slot 槽,更多细节请看 slots 。 你可以根据需要手动地将各个算子隔离到不同的 slot 中。

创建新链

基于当前算子创建一个新的算子链。
后面两个 map 将被链接起来,而 filter 和第一个 map 不会链接在一起。

someStream.filter(...).map(...).startNewChain().map(...);

禁止链接

禁止和 map 算子链接在一起。

someStream.map(...).disableChaining();

配置 Slot 共享组

为某个算子设置 slot 共享组。Flink 会将同一个 slot 共享组的算子放在同一个 slot 中,而将不在同一 slot 共享组的算子保留在其它 slot 中。这可用于隔离 slot 。如果所有输入算子都属于同一个 slot 共享组,那么 slot 共享组从将继承输入算子所在的 slot。slot 共享组的默认名称是 “default”,可以调用 slotSharingGroup(“default”) 来显式地将算子放入该组。

someStream.filter(...).slotSharingGroup("name");

名字和描述

Flink里的算子和作业节点会有一个名字和一个描述。名字和描述。名字和描述都是用来介绍一个算子或者节点是在做什么操作,但是他们会被用在不同地方。

名字会用在用户界面、线程名、日志、指标等场景。节点的名字会根据节点中算子的名字来构建。 名字需要尽可能的简洁,避免对外部系统产生大的压力。

描述主要用在执行计划展示,以及用户界面展示。节点的描述同样是根据节点中算子的描述来构建。 描述可以包括详细的算子行为的信息,以便我们在运行时进行debug分析。

someStream.filter(...).name("filter").setDescription("x in (1, 2, 3, 4) and y > 1");

节点的描述默认是按照一个多行的树形结构来构建的,用户可以通过把pipeline.vertex-description-mode设为CASCADING, 实现将描述改为老版本的单行递归模式。

Flink SQL框架生成的算子默认会有一个由算子的类型以及id构成的名字,以及一个带有详细信息的描述。 用户可以通过将table.exec.simplify-operator-name-enabled设为false,将名字改为和以前的版本一样的详细描述。

当一个作业的拓扑很复杂时,用户可以把pipeline.vertex-name-include-index-prefix设为true,在节点的名字前增加一个拓扑序的前缀,这样就可以很容易根据指标以及日志的信息快速找到拓扑图中对应节点。

本来来自官网文档:https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/operators/overview/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1655705.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

企业网盘竟还能这样用,可道云teamOS:三大冷门使用技巧分享

在日常工作中&#xff0c;大家是否有为海量的文件的管理感到头疼&#xff1f; 每当急需某个重要文件时&#xff0c;总是在各种文件夹中寻寻觅觅半天。这种困扰&#xff0c;我相信许多人都有过。 在这种时候&#xff0c;专业的文件管理软件能帮助我们解决大部分的麻烦。 今天我…

【数据库原理及应用】期末复习汇总高校期末真题试卷09

试卷 一、填空题(每空1分&#xff0c;共10分) 数据的完整性是指数据的________、有效性和相容性。数据模型通常由________、数据操作以及数据约束条件等三要素组成。在关系的有关术语中&#xff0c;关系表中的每一行称作________&#xff0c;每一列称作属性。信息的三种世界是…

WPS二次开发系列:一文快速了解WPS SDK功能场景

作者持续关注 WPS二次开发专题系列&#xff0c;持续为大家带来更多有价值的WPS开发技术细节&#xff0c;如果能够帮助到您&#xff0c;请帮忙来个一键三连&#xff0c;更多问题请联系我&#xff08;QQ:250325397&#xff09; 目录 SDK功能介绍 功能详解&#xff1a; 打开文档…

yaml配置文件的在深度学习中的简单应用

1 .创作灵感 小伙伴们再阅读深度学习模型的代码的时候&#xff0c;经常会遇到yaml格式的配置文件。用这个配置文件是因为我们在训练模型的时候会涉及很多的参数&#xff0c;如果这些参数东一个&#xff0c;西一个&#xff0c;我们调起来的时候就会很不方便&#xff0c;所以用y…

社交媒体数据恢复:飞月

首先&#xff0c;请注意&#xff0c;任何数据恢复操作都不能保证100%找回丢失的数据。因此&#xff0c;在进行数据恢复前&#xff0c;请做好备份&#xff0c;并谨慎操作。 以下是一般性的数据恢复步骤&#xff1a; 导出聊天记录&#xff1a;首先尝试导出飞月的聊天记录。这可以…

全网最全:一文入门最热的LLM应用开发框架LangChain

f#### 1. LangChain 简介 1.1. LangChain 发展史 LangChain 的作者是 Harrison Chase&#xff0c;最初是于 2022 年 10 月开源的一个项目&#xff0c;在 GitHub 上获得大量关注之后迅速转变为一家初创公司。2017 年 Harrison Chase 还在哈佛上大学&#xff0c;如今已是硅谷的…

Promise.all和 race

Promise.all() all方法可以完成并行任务&#xff0c; 它接收一个数组&#xff0c;数组的每一项都是一个promise对象。返回值&#xff1a; 成功时&#xff1a;当数组中所有的promise的状态都达到resolved的时候&#xff0c;就返回包含所有 Promise 结果的数组&#xff0c;并且…

短视频矩阵系统贴牌---saas源头开发

一、短视频矩阵运营注意事项&#xff1a; 如&#xff1a;房产行业 短视频矩阵运营是一个系统化的项目&#xff0c;涉及多个平台和账号的管理&#xff0c;以及内容的创作、发布和优化等多个方面。 以下是短视频矩阵运营的注意事项文档的概要以及结果运营数据 一周持续运营量 二…

Java | Leetcode Java题解之第75题颜色分类

题目&#xff1a; 题解&#xff1a; class Solution {public void sortColors(int[] nums) {int n nums.length;int p0 0, p2 n - 1;for (int i 0; i < p2; i) {while (i < p2 && nums[i] 2) {int temp nums[i];nums[i] nums[p2];nums[p2] temp;--p2;}i…

Driftingblues靶机系列Driftingblues4

获得靶机ip&#xff1a;192.168.108.36 扫描靶机的端口服务&#xff1a; 看到存在&#xff1a;ftp服务&#xff0c;ssh服务和web的http服务&#xff0c;先扫描一下web服务&#xff1a; 访问该网址&#xff1a; 在源代码中看到一串base64编码&#xff1a; Z28gYmFjayBpbnRydW…

抖音小店怎么找达人带货的?分享几个成功率超高的沟通话术!

哈喽~我是电商月月 做抖音小店&#xff0c;特别是无货源的商家想要更多的流量&#xff0c;必定会尝试直播卖货&#xff0c;不会自己直播卖货&#xff0c;就会开通精选联盟&#xff0c;在里面找达人合作 那精选联盟到底是怎样找达人带货的呢&#xff1f; 有的达人打招呼了根本…

【多客校园圈子系统】校园圈子校园论坛社区,多校园微社区交友 校园圈子系统-论坛,跑腿

校园生活服务平台已然成为校园创业的好选择&#xff0c;因为校园人口基数大&#xff0c;人口比聚集&#xff0c;并且现在的学生消费能力还是不错的&#xff0c;所以现在在校园里创业&#xff0c;那真是一个明智的选择&#xff0c;尤其是大学校园创业&#xff0c;但是校园生活服…

怎么用git在暂存区(stage)中移除不需要提交(commit)的文件?

2024年5月9日&#xff0c;周四上午 非常简单&#xff0c;用下面这条命令就可以了 git rm --cached <file>注&#xff1a;这条命令不会把文件从文件夹中删除&#xff0c;只会把文件从暂存区中移除出去 实战

【LeetCode】环形链表I 环形链表II

一、环形链表I 题目 思路 该题使用快慢指针 slow、 fast slow 走一步 &#xff0c;fast 走两步 当fast 走到空 或者 fast的下一个结点为空&#xff0c; 则无环 fast若追上slow &#xff0c; 则有环 结论证明 该思路默认了 &#xff1a; 若存在环形链表 &#xff0c; 无论…

阿里云发布通义千问2.5,OpenCompass上得分追平GPT-4 Turbo

5月9日消息&#xff0c;阿里云正式发布通义千问2.5&#xff0c;模型性能全面赶超GPT-4 Turbo&#xff0c;成为地表最强中文大模型。同时&#xff0c;通义千问最新开源的1100亿参数模型在多个基准测评收获最佳成绩&#xff0c;超越Meta的Llama-3-70B&#xff0c;成为开源领域最强…

前端奇怪面试题总结

面试题总结 不修改下面的代码进行正常解构 这道题考的是迭代器和生成器的概念 let [a,b] {a:1,b:2}答案 对象缺少迭代器&#xff0c;需要手动加上 Object.prototype[Symbol.iterator] function* (){// return Object.values(this)[Symbol.iterator]()return yeild* Object.v…

基于Python Django的公务员考试信息管理系统,附源码

博主介绍&#xff1a;✌Java老徐、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&…

简单几步解决Windows 10播放视频提示安装HEVC扩展

相信有不少人都遇到过以下的问题&#xff0c;废话不多说&#xff0c;直接上干货&#xff01; 1.下载插件 免费地址链接: 点击下载 2.安装插件 如图所示&#xff0c;在下载的目录路径里&#xff0c; 1.按住键盘 SHIFT&#xff0c;点击鼠标右键&#xff0c;选择在此处打开Powe…

5分钟了解下HDFS

随着大数据时代的到来&#xff0c;传统的数据存储和管理方式已经无法满足日益增长的数据处理需求。HDFS&#xff08;Hadoop Distributed File System&#xff09;作为Apache Hadoop项目的一部分&#xff0c;以其高度的容错性、可扩展性和高吞吐量&#xff0c;成为了处理大规模数…

ECMAScript 6简介

ECMAScript 6简介 发布日期目标ECMAScript 和 JavaScript 的关系ES6 与 ECMAScript 2015 的关系 ESx标准 命名规则 ECMAScript 的历史 1. ECMAScript 6简介 1.1. 发布日期 ECMAScript 6.0&#xff08;以下简称 ES6&#xff09;是 JavaScript 语言的下一代标准&#xff0c;已…