[flink 实时流基础] 转换算子

news2025/1/10 10:23:03

flink学习笔记

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
image.png


文章目录

        • 基本转换算子(map/ filter/ flatMap)
        • 聚合算子(Aggregation)
          • 按键分区(keyBy)
        • 简单聚合(sum/min/max/minBy/maxBy)
        • 归约聚合(reduce)
        • 物理分区算子(Physical Partitioning)
          • 随机分区(shuffle)
          • 轮询分区(Round-Robin)
          • 重缩放分区(rescale)
          • 广播(broadcast)
          • 全局分区(global)
          • 自定义分区(Custom)

基本转换算子(map/ filter/ flatMap)

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
image.png
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
image.png
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。
:::info
消费一个元素,可以产生0到多个元素。
:::
flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
image.png

聚合算子(Aggregation)
按键分区(keyBy)

对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。
基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。
image.png
在内部,是通过计算key的哈希值(hash code),对分区数进行取模运算来实现的。所以这里key如果是POJO的话,必须要重写hashCode()方法。

简单聚合(sum/min/max/minBy/maxBy)

sum():在输入流上,对指定的字段做叠加求和的操作。
min():在输入流上,对指定的字段求最小值。
max():在输入流上,对指定的字段求最大值。
minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。
maxBy():与max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。

归约聚合(reduce)

reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。
reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
调用KeyedStream的reduce方法时,需要传入一个参数,实现ReduceFunction接口。接口在源码中的定义如下:

public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T value1, T value2) throws Exception;
}

ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

物理分区算子(Physical Partitioning)

常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)。

随机分区(shuffle)

最简单的重分区方式就是直接“洗牌”。通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。
随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。
image.png

轮询分区(Round-Robin)

轮询,简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。
image.png

重缩放分区(rescale)

重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。
image.png

广播(broadcast)

这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。

全局分区(global)

全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

自定义分区(Custom)

当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。
1)自定义分区器

public class MyPartitioner implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        return Integer.parseInt(key) % numPartitions;
    }
}

2)使用自定义分区

public class PartitionCustomDemo {
    public static void main(String[] args) throws Exception {
        //        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        
        env.setParallelism(2);
        
        DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);
        
        DataStream<String> myDS = socketDS
            .partitionCustom(new MyPartitioner(), value -> value);
        myDS.print();
            
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1559124.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Spring MVC】快速学习使用Spring MVC的注解及三层架构

&#x1f493; 博客主页&#xff1a;从零开始的-CodeNinja之路 ⏩ 收录文章&#xff1a;【Spring MVC】快速学习使用Spring MVC的注解及三层架构 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 Spring Web MVC一: 什么是Spring Web MVC&#xff1…

成绩管理系统|基于springboot成绩管理系统的设计与实现(附项目源码+论文)

基于springboot成绩管理系统的设计与实现 一、摘要 传统办法管理信息首先需要花费的时间比较多&#xff0c;其次数据出错率比较高&#xff0c;而且对错误的数据进行更改也比较困难&#xff0c;最后&#xff0c;检索数据费事费力。因此&#xff0c;在计算机上安装毕业设计成绩管…

全局自定义指令实现图片懒加载,vue2通过js和vueuse的useintersectionObserver实现

整体逻辑&#xff1a; 1.使用全局自定义指令创建图片懒加载指令 2.在全局自定义指令中获取图片距离顶部的高度和整个视口的高度 3.实现判断图片是否在视口内的逻辑 一、使用原生js在vue2中实现图片懒加载 1.创建dom元素,v-lazy为自定义指令&#xff0c;在自定义指令传入图片…

瑞吉外卖实战学习--8、人员编辑更新

人员编辑更新 前言1、进入编辑窗口需要先获取用户信息2、通过上篇文章的update的方法来改变数据3、测试效果 前言 1、进入编辑窗口需要先获取用户信息 通过注解PathVariable 来获取路径需要携带的id然后赋值到路径上&#xff0c;再通过id查询用户信息 /*** 通过id查询用户信…

Java毕业设计-基于springboot开发的招聘信息管理系统平台-毕业论文+答辩PPT(附源代码+演示视频)

文章目录 前言一、毕设成果演示&#xff08;源代码在文末&#xff09;二、毕设摘要展示1、开发说明2、需求分析3、系统功能结构 三、系统实现展示1、系统功能模块2、管理员功能模块3、企业后台管理模块4、用户后台管理模块 四、毕设内容和源代码获取总结 Java毕业设计-基于spri…

黄金涨是商品牛市的领先信号

自2022年11月以来&#xff0c;黄金价格持续上涨&#xff0c;目前已经突破历史新高&#xff0c;历史上黄金上涨&#xff0c;大多是商品全面牛市的领先信号。在2008年Q4、2019年也出现过&#xff0c;黄金比其他商品更强&#xff0c;但随后的2009年和2020年均是商品的全面牛市。同…

【前端】layui前端框架学习笔记

【前端目录贴】 参考视频:LayUI 参考笔记:https://blog.csdn.net/qq_61313896/category_12432291.html 1.介绍 官网&#xff1a;http://layui.apixx.net/index.html 国人16年开发的框架,拿来即用,门槛低 … 2. LayUi的安装及使用 Layui 是一套开源的 Web UI 组件库&#xff0…

【论文阅读】UniLog: Automatic Logging via LLM and In-Context Learning

注 由于其公司的保密政策&#xff0c;本文没有公开源代码&#xff0c;数据是公开的。 文章目录 摘要一、介绍二、背景和动机2.1、日志语句生成2.2、大语言模型2.3、上下文学习&#xff08;In-Context Learning&#xff0c;ICL) 三、UNILOG3.1、模型骨干3.2、提示策略3.2.1、提…

stable diffusion如何下载预处理器?

如何下载预处理器&#xff1f; 具体位置:SD文件>extensions>sd-webui-controlnet>annotator” 把整个文件夹复制到SD的文件夹里面 里面有一个“downloads”文件夹 把这些模型复制到“downloads”文件夹里

YOLOV5训练自己的数据集教程(万字整理,实现0-1)

文章目录 一、YOLOV5下载地址 二、版本及配置说明 三、初步测试 四、制作自己的数据集及转txt格式 1、数据集要求 2、下载labelme 3、安装依赖库 4、labelme操作 五、.json转txt、.xml转txt 六、修改配置文件 1、coco128.yaml->ddjc_parameter.yaml 2、yolov5x.…

PLC的大脑和心脏——CPU及其分类、端子接线图

CPU不断地采集输入信号&#xff0c;执行用户程序&#xff0c;刷新系统的输出。 根据供电方式和输入/输出方式的不同&#xff0c;CPU分为3类&#xff0c;如下图1。 图1 CPU的分类 第1对字母&#xff0c;表示CPU的供电方式&#xff0c;AC&#xff08;Alternating Current&#…

vcf文件可以用excel打开吗?四种解决方案

vcf文件可以用excel打开吗&#xff1f; 当然可以。 一、VCF文件简介 VCF&#xff08;vCard&#xff09;文件是一种用于存储联系人信息的文件格式。它通常包含姓名、电话号码、电子邮件地址、地址等详细信息。VCF文件在多种设备和操作系统中广泛使用&#xff0c;特别是在电子邮…

百度云加速方法「Cheat Engine」

加速网盘下载 相信经常玩游戏的小伙伴都知道「Cheat Engine」这款游戏内存修改器&#xff0c;它除了能对游戏进行内存扫描、调试、反汇编 之外&#xff0c;还能像变速齿轮那样进行本地加速。 这款专注游戏的修改器&#xff0c;被大神发现竟然还能加速百度网盘资源下载&#xf…

MySQL数据库备份策略与实践详解

目录 引言 一、MySQL数据库备份的重要性 &#xff08;一&#xff09;数据丢失的原因 &#xff08;二&#xff09;数据丢失的后果 二、MySQL备份类型 &#xff08;一&#xff09;根据数据库状态 &#xff08;二&#xff09;根据数据的完整性 &#xff08;三&#xff09;…

uniapp开发App(二)开通 微信授权登录功能(应用签名、证书、包名 全明白)

前言&#xff1a;开发App肯定要包含登陆&#xff0c;常用登陆方式很多&#xff0c;我选择微信登陆。 一、如何获得微信的授权登陆 答&#xff1a;申请&#xff0c;根据uniapp官网的提示有如下三个步骤 开通 1. 登录微信开放平台区&#xff0c;添加移动应用并提交审核&#xf…

基于java+SpringBoot+Vue的网上书城管理系统设计与实现

基于javaSpringBootVue的网上书城管理系统设计与实现 开发语言: Java 数据库: MySQL技术: SpringBoot MyBatis工具: IDEA/Eclipse、Navicat、Maven 系统展示 前台展示 后台展示 系统简介 整体功能包含&#xff1a; 网上书城管理系统是一个基于互联网的在线购书平台&#…

MSTP环路避免实验(华为)

思科设备参考&#xff1a;MSTP环路避免实验&#xff08;思科&#xff09; 一&#xff0c;技术简介 MSTP&#xff08;多生成树协议&#xff09;&#xff0c;MSTP解决了STP和RSTP没有考虑vlan的问题&#xff0c;STP和RSTP将所有的vlan共享为一个生成树实例&#xff0c;无法实现…

STM32 字符数组结束符 “\0”

STM32 字符数组结束符 “\0” 使用字符数组使用printf&#xff0c;string参考 使用字符数组 使用STM32的串口发送数据&#xff0c;核心代码如下&#xff1a; char str[] "hello world!\n\r";while(1) {HAL_UART_Transmit(&huart2, str, sizeof (str), 10);HAL…

AI预测福彩3D第22弹【2024年3月31日预测--第5套算法开始计算第4次测试】

今天&#xff0c;咱们继续进行本套算法的测试&#xff0c;今天为第四次测试&#xff0c;仍旧是采用冷温热趋势结合AI模型进行预测。好了&#xff0c;废话不多说了。直接上结果~ 仍旧是分为两个方案&#xff0c;1大1小。 经过人工神经网络计算并进行权重赋值打分后&#xff0c;3…

Spring Boot 使用 Redis

1&#xff0c;Spring 是如何集成Redis的&#xff1f; 首先我们要使用jar包 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><gro…