Flink 常用API(2)——转换算子+聚合算子

news2025/1/20 21:57:49

转换算子(Transformation)

映射(map)

用于将数据流中的数据进行转换,形成新的数据流

“一一映射”,消费一个元素就产出一个元素

参数:接口 MapFunction 的实现

方法:map

返回值类型:DataStream

通过匿名类/实现类实现:

// 传入匿名类,实现 MapFunction
//Event是输入,String是输出
stream.map(new MapFunction<Event, String>() {
 @Override
 public String map(Event e) throws Exception {
 return e.user;
 }
 });

// 传入 MapFunction 的实现类
 stream.map(new UserExtractor()).print();
 env.execute();
public static class UserExtractor implements MapFunction<Event, String> {
 @Override
 public String map(Event e) throws Exception {
 return e.user;
 }
 }

源码相关:

调用map方法,可以看到返回的是一个SingleOutputStreamOperator

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
        TypeInformation<R> outType = TypeExtractor.getMapReturnTypes((MapFunction)this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);
        return this.map(mapper, outType);
    }

而SingleOutputStreamOperator 类本身也继承自 DataStream 类,在DataStream类的基础上对一些参数进行了更详细的设置;

public class SingleOutputStreamOperator<T> extends DataStream<T>

过滤(filter)

参数:FilterFunction 接口的实现

方法:filter

返回值类型:DataStream

 // 传入匿名类实现FilterFunction
        stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event e) throws Exception {
                return e.user.equals("Mary");
            }
        });

        // 传入FilterFunction实现类
        stream.filter(new UserFilter()).print();

        env.execute();
    }
    public static class UserFilter implements FilterFunction<Event> {
        @Override
        public boolean filter(Event e) throws Exception {
            return e.user.equals("Mary");
        }
    }

扁平映射(flatMap)

先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理

参数:FlatMapFunction 接口的实现

方法:flatMap

返回值类型:flatMap 并没有直接定义返回值类型,而是通过一个“收集器”(Collector)来指定输出

同 map 一样,flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同

public static class MyFlatMap implements FlatMapFunction<Event, String> {
 @Override
 public void flatMap(Event value, Collector<String> out) throws Exception 
{
 if (value.user.equals("Mary")) {
 out.collect(value.user);
 } else if (value.user.equals("Bob")) {
 out.collect(value.user);
 out.collect(value.url);
 }
 }
 }

聚合算子(Aggregation)

按键分区(keyBy)

对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率,因此要聚合,先分区;

keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区,也就是并行处理的子任务,对应着任务槽(slot);这样具有相同的 key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot中进行处理

在keyBy()方法的内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以这里 key 如果是 POJO 的话,必须要重写 *hashCode() *方法

实现方法:

// 使用 Lambda 表达式
 KeyedStream<Event, String> keyedStream = stream.keyBy(e -> e.user);


 // 使用匿名类实现 KeySelector
 KeyedStream<Event, String> keyedStream1 = stream.keyBy(new KeySelector<Event, String>() {
 @Override
 public String getKey(Event e) throws Exception {
 return e.user;
 }
 });

//结果:将 DataStream 转换为KeyedStream

简单聚合

参数:

传入的参数决定了聚合的依据,

对于元组有两种形式,位置和名称:

stream.keyBy(r -> r.f0).sum(1).print();(元组中1号位置)

stream.keyBy(r -> r.f0).sum("f1").print();(元组中名称为f1的数据)

对于POJO,只能传入字段名:

stream.keyBy(e -> e.user).max( "timestamp" ).print();

通过聚合操作,从 KeyedStream 转换成了DataStream

也就是说,先分区,再聚合,得到的依然是一个DataStream

归约聚合(reduce)

reduce 算子是一个一般化的聚合统计操作

实现 ReduceFunction 接口,接口里需要实现 reduce()方法

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 通过reduce算子实现sum和maxBy的功能
        env.addSource(new ClickSource())
                .map(new MapFunction<Event, Tuple2<String,Long>>(

                        ) {
                    @Override
                    public Tuple2<String, Long> map(Event event) throws Exception {
                        return Tuple2.of(event.getUser(),1L);
                    }
                })
                        .keyBy(e -> e.f0)
                                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                                    @Override
                                    public Tuple2<String, Long> reduce(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {
                                        return Tuple2.of(t1.f0, t1.f1+t2.f1);
                                    }
                                })
                                        .keyBy(e -> true)
                                                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                                                    @Override
                                                    public Tuple2<String, Long> reduce(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {
                                                        return t1.f1 > t2.f1 ? t1 : t2;
                                                    }
                                                })
                                                        .print();

        env.execute();
    }

用户自定义函数(UDF)

富函数类(Rich Function Classes)

所有的 Flink 函数类都有其Rich 版本

与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能

一个常见的应用场景就是,如果我们希望连接到一个外部数据库进行读写操作,那么将连接操作放在 map()中显然不是个好选择——因为每来一条数据就会重新连接一次数据库;所以我们可以在 open()中建立连接,在 map()中读写数据,而在 close()中关闭连接

另外,富函数类提供了 getRuntimeContext()方法,可以获取到运行时上下文的一些信息,例如程序执行的并行度,任务名称,以及状态

物理分区

keyBy:逻辑分区

物理分区与 keyBy 一大区别在于,keyBy 之后得到的是一个 KeyedStream,而物理分区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出,分区算子并不对数据进行转换处理,只是定义了数据的传输方式


  1. 随机分区(shuffle)

    方法:.shuffle()

每次执行结果都会不相同:

        2.轮询分区(Round-Robin)

方法:.rebalance();rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去

        3.重缩放分区(rescale)

方法:.rescale()

重缩放和轮询的区别:

①重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中;也就是说,“发牌人”如果有多个,那么 rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌

②从底层实现上看,rebalance 和 rescale 的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而 rescale 仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源

示例:

 env.addSource(new RichParallelSourceFunction<Integer>() {  // 这里使用了并行数据源的富函数版本
                    @Override
                    public void run(SourceContext<Integer> sourceContext) throws Exception {
                        for (int i = 1; i <= 8; i++) {
                            // 将奇数发送到索引为1的并行子任务
                            // 将偶数发送到索引为0的并行子任务
                            if ( i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                                sourceContext.collect(i);
                            }
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                })
                .setParallelism(2)
                .rescale()
                .print().setParallelism(4);

结果:

可以看到,奇数全部在3,4分区,偶数全部在1,2分区;

如果使用rebalance方法:

可以看到,奇数偶数平均分配到所有分区

        4.广播(broadcast):经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去

        5.全局分区(global):通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1

       6. 自定义分区(Custom):使 用partitionCustom()方法来自定义分区

// 将自然数按照奇偶分区
 env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
 .partitionCustom(new Partitioner<Integer>() {
 @Override
 public int partition(Integer key, int numPartitions) { //分区器
 return key % 2;
 }
 }, 
 new KeySelector<Integer, Integer>() { //KeyBy
 @Override
 public Integer getKey(Integer value) throws Exception {
 return value;
 }
 })
 .print().setParallelism(2);
 env.execute();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/522811.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C#串口通信从入门到精通(13)——多个串口发送数据

文章目录 前言1、多串口数据的发送2、源码前言 我们在开发串口通信程序时,有时候会需要连接不止一个串口,这时候该怎么写程序呢?本文就来介绍多个串口数据的发送 1、多串口数据的发送 我们在之前的专栏中介绍了串口数据的发送,当时有提到过,我们是通过创建一个SerialPo…

支付系统设计三:渠道网关设计06-业务处理

文章目录 前言一、业务服务工厂二、业务处理服务1. 业务处理服务2. 业务处理抽象服务3. 流量控制4. 报文提交4.1 获取交易的服务端通讯列表4.2 循环请求支付渠道4.2.1 报文组装4.2.2 报文发送4.2.2.1 协议处理器获取4.2.2.2 构建通讯客户端4.2.2.3 发送请求4.2.2.4 响应报文读取…

【rust】| 06——语言特性 | 所有权

系列文章目录 【rust】| 00——开发环境搭建 【rust】| 01——编译并运行第一个rust程序 【rust】| 02——语法基础 | 变量(不可变?)和常量 【rust】| 03——语法基础 | 数据类型 【rust】| 04——语法基础 | 函数 【rust】| 05——语法基础 | 流程控制 【rust】| 06——语言特…

论文/文章/课设 不会写 后端的实现方式?来试试这个吧!

起因 有朋友用了云开发&#xff0c;但是不太会写&#xff0c;老师也不太理解&#xff0c;就来询问我该怎么写&#xff08;不要苛责老师古板&#xff0c;他们确实不一定能够立刻接受新东西&#xff09; 用的不是云开发的也适用以下思路 思路 我们把后端开发类比成拧螺丝&…

非煤矿山电子封条 yolov7

非煤矿山电子封条通过yolov7python网络模型技术&#xff0c;非煤矿山电子封条可以对矿山主副井口、风井口、车辆出入口和调度室等全天候不间断实时分析预警&#xff0c;发现人员违规行为及异常设备状态立即告警。YOLOv7 的发展方向与当前主流的实时目标检测器不同&#xff0c;研…

GitSVN区别及选型

1、结论先行 git更适用于纯代码仓库&#xff0c;优势在于分支管理svn则擅长于文件管理&#xff0c;优势在于目录级权限控制 2、版本管理发展历程 3、Git&SVN为何而生 Git出生于2005年&#xff0c;是 Linus Torvalds 为了帮助管理 Linux 内核开发而开发的一个开放源码的版…

LeetCode 63 不同路径 II

题目&#xff1a; 一个机器人位于一个 m x n 网格的左上角 &#xff08;起始点在下图中标记为 “Start” &#xff09;。机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角&#xff08;在下图中标记为 “Finish”&#xff09;。现在考虑网格中有障碍物。那么从左…

一款超级简洁的个人博客系统搭建教程(附源码)

开发环境 IDEA、jdk1.8、mysql8.33 开发框架 springboot 1.首先&#xff0c;确保已安装 Git 和 IntelliJ IDEA。如果你还没有安装 Git&#xff0c;请前往官网下载并安装 Git。 2.打开 IntelliJ IDEA&#xff0c;选择 “File” -> “New” -> “Project from Version Con…

Java面试知识点(全)- Java面试基础部分三

[Java面试知识点(全) 导航&#xff1a; https://nanxiang.blog.csdn.net/article/details/130640392 注&#xff1a;随时更新 ThreadPoolExecutor 如果不了解这个类&#xff0c;应该了解前面提到的ExecutorService&#xff0c;开一个自己的线程池非常方便&#xff1a; Execut…

桂院导航小程序 静态项目 二次开发教程

Gitee代码仓库&#xff1a;桂院导航小程序 先 假装 大伙都成功安装了静态项目&#xff0c;并能在 微信开发者工具 和 手机 上正确运行。 接着就是 将项目 改成自己的学校。 代码里的注释我就不说明了&#xff0c;有提到 我的学校 的文字都改成你自己的就行 1. 全局 app.json…

吴恩达教你写提示词 ChatGPT prompt engineering

文章目录 吴恩达教你写提示词 ChatGPT prompt engineering1. 关键提示&#xff08;prompt&#xff09;原则1. 基础2. 编写明确和具体的提示词3. 给模型时间“思考”4. 模型的限制5. 迭代式提示&#xff08;prompt&#xff09;开发过程 2. 提示&#xff08;prompt&#xff09;一…

Ubuntu下通过Wine安装LTSpice 17.1.8

LTSpice LTSpice 是常用的电路模拟软件, 但是只有 Windows 版本和 Mac 版本, 在 Linux 下需要用 Wine 运行. 以下说明如何在 Ubuntu 下安装最新的 LTSpice 17.1.8 安装 下载 LTSpice 安装文件 下载地址: https://www.analog.com/en/design-center/design-tools-and-calcula…

51单片机也可以移植RTOS

说起RTOS移植&#xff0c;我们首先会想到32位单片机。 那么51单片机可以移植RTOS吗&#xff1f; 我的答案是&#xff0c;只要资源够用&#xff08;ROM空间、RAM空间&#xff09;&#xff0c;可以移植。 前提是你对RTOS的实现原理非常清楚&#xff0c;并且可以自己完成移植工作…

国内免费cdn汇总2023最新

内容分发网络简称CDN&#xff0c;其原理大概是将网站内容分发至加速节点&#xff0c;让用户从就近的服务器节点上获取内容&#xff0c;从而提高网站的访问加载速度。大部分服务商&#xff08;如阿里云&#xff0c;腾讯云&#xff0c;京东云等&#xff09;的CDN服务是按使用量收…

在Linux开发板上安装HomeAssistant

1. 什么是Home Assistant Home Assistant 使用 Python3 开发的&#xff0c;是一个完整的 UI 管理的家庭自动化生态系统&#xff0c;它运行 Home Assistant Core、Home Assistant Supervisor 和附加组件。它预装在 Home Assistant OS 上&#xff0c;当然也可以安装在任何 Linux…

Android ConstraintLayout 使用入门

ConstraintLayout是Android中一个非常强大的布局管理器&#xff0c;它可以帮助我们快速创建复杂的布局&#xff0c;并且具有很好的性能和可扩展性。在本文中&#xff0c;我将从面试的角度&#xff0c;详细讲解ConstraintLayout的概念、特点、使用方法和示例。 概念 Constraint…

力扣19删除链表的倒数第 N 个结点:思路分析+图文全解+方法总结(快慢指针法递归法)+深入思考

文章目录 第一部分&#xff1a;题目描述第二部分&#xff1a;代码实现2.1 快慢指针法2.2 递归 第一部分&#xff1a;题目描述 &#x1f3e0; 链接&#xff1a;19. 删除链表的倒数第 N 个结点 - 力扣&#xff08;LeetCode&#xff09; ⭐ 难度&#xff1a;中等 第二部分&#…

Java【网络编程2】 详解基于 UDP 协议的 Socket API, 逐行代码解析如何网络编程

文章目录 前言一、认识 Socket(套接字), TCP 协议和 UDP 协议1, 什么是 Socket(套接字)2, 浅谈 TCP 协议和 UDP 协议的区别和特点 二、基于 UDP 协议的 Socket API1, DatagramSocket 类2, DatagramPacket 类 三、逐行代码解析网络编程1, 逐行解析客户端1.1, 核心成员方法 start…

(转载)从0开始学matlab(第2天)—MATLAB 变量的初始化

当变量初始化时&#xff0c;MATLAB 将会自动建立变量。有三种方式初始化 MATLAB 中的变量&#xff1a; 1 &#xff0e;用赋值语句初始化变量 2 &#xff0e;用 input 函数从键盘输入初始化变量 3 &#xff0e;从文件读取一个数据 前两种方法我们在这里讨论&#xff0c…

Linux -- 进阶 Web服务器 虚拟主机 -- 基于 域名

基于域名的虚拟主机 &#xff1a; 原理 &#xff1a; # 当服务器无法给每个网站都分配一个独立的 IP 地址时&#xff0c;可以通过用户请求的域 名实现不同域名传输不同的网页数据。 域名解析 &#xff1a; 功能 &#xff1a; 域名<>IP &#xff08; 就是 …