Flink 1.14.*中flatMap,filter等基本转换函数源码

news2024/9/21 4:31:41

这里以flatMap,filter为例,介绍Flink如果要实现这些基本转换需要实现哪些接口,Flink运行时调用这些实现类的入口,这些基本转换函数之间的类关系

  • 一、创建基本转换函数需要实现类继承AbstractRichFunction并实现特性接口
    • 1、RichFlatMapFunction
    • 2、RichFilterFunction
  • 二、Flink把实现了flatMap,filter功能的类加入到作业中
  • 三、Flink运行时如何调用flatMap和filter的实现类的
  • 四、类关系图

一、创建基本转换函数需要实现类继承AbstractRichFunction并实现特性接口

1、RichFlatMapFunction

@Public
public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> {
    private static final long serialVersionUID = 1L;

    public RichFlatMapFunction() {
    }
    //需要实现下面这个方法
    public abstract void flatMap(IN var1, Collector<OUT> var2) throws Exception;
}

只需要实现类继承了RichFlatMapFunction,实现了flatMap方法就可以

2、RichFilterFunction

@Public
public abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> {
    private static final long serialVersionUID = 1L;

    public RichFilterFunction() {
    }
    //需要实现下面这个类
    public abstract boolean filter(T var1) throws Exception;
}

只需要实现类继承了RichFilterFunction,实现了filter方法就可以

二、Flink把实现了flatMap,filter功能的类加入到作业中

一般是通过如下代码

DataStream<Row>  dateStream = 来自source的数据流
dateStream.flatMap(extend RichFlatMapFunction的子类);
dateStream.filter(extend RichFilterFunction的子类);

三、Flink运行时如何调用flatMap和filter的实现类的

那就看一下dateStream.flatMap方法

@Public
public class DataStream<T> {
    protected final Transformation<T> transformation;

    public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
        TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes((FlatMapFunction)this.clean(flatMapper), this.getType(), Utils.getCallLocationName(), true);
        return this.flatMap(flatMapper, outType);
    }
    
    public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
        return this.transform("Flat Map", outputType, (OneInputStreamOperator)(new StreamFlatMap((FlatMapFunction)this.clean(flatMapper))));
    }
}

StreamFlatMap构造时会把实现类当成入参构建OneInputStreamOperator

@Internal
public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT> {
    private static final long serialVersionUID = 1L;
    private transient TimestampedCollector<OUT> collector;

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void open() throws Exception {
        super.open();
        this.collector = new TimestampedCollector(this.output);
    }
	
    public void processElement(StreamRecord<IN> element) throws Exception {
        this.collector.setTimestamp(element);
        //这里就是调用的父类的userFunction,即构造函数传入的flatMapper
        ((FlatMapFunction)this.userFunction).flatMap(element.getValue(), this.collector);
    }
}

下面会把userFunction赋值给AbstractUdfStreamOperator的字段,这样子类在调用userFunction时就是调用的这个

@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {
    private static final long serialVersionUID = 1L;
    protected final F userFunction;

    public AbstractUdfStreamOperator(F userFunction) {
        this.userFunction = (Function)Objects.requireNonNull(userFunction);
        this.checkUdfCheckpointingPreconditions();
    }
}

这样StreamFlatMapuserFunction的操作,就是对实现了RichFlatMapFunction的子类的操作

像filter也类似,如下

@Internal
public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {
    private static final long serialVersionUID = 1L;

    public StreamFilter(FilterFunction<IN> filterFunction) {
        super(filterFunction);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void processElement(StreamRecord<IN> element) throws Exception {
        if (((FilterFunction)this.userFunction).filter(element.getValue())) {
            this.output.collect(element);
        }

    }
}

StreamFilterStreamFlatMap都是继承了AbstractUdfStreamOperator 实现了OneInputStreamOperator接口,
你可以理解StreamFilterStreamFlatMap有共同的父类和接口,

四、类关系图

RichFlatMapFunction
在这里插入图片描述
RichFilterFunction
在这里插入图片描述

通过上面两张图就知道RichFlatMapFunctionRichFilterFunction都是相同的父类扩展下来的

StreamFlatMap
在这里插入图片描述
StreamFilter
在这里插入图片描述
通过上面的也清楚,StreamFlatMapStreamFilter都是相同的父类和接口,只是processElement方法的实现不一样

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2087051.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

甲基化组学全流程生信分析教程

甲基化组学全流程分析和可视化教程 读取数据目录下的idat文件的甲基化全流程一键分析 功能简介 甲基化分析模块可以实现甲基化芯片450K, 870kEPIC数据的自动读取&#xff0c;可以读取idat文件&#xff0c;也可以读取beta甲基化矩阵文件甲基化数据的缺失值插值甲基化数据的质…

【ArcGIS/GeoScenePro】Portal和Server关系

简介 上图简化后 三层 最上面:应用层 中间(门户):连接应用层和服务器,对server上发布的服务进行管理、分享和权限分配 最低面:服务器 例如:桌面想用server里的服务数据资源,需要通过portal去请求 Enterprise = portal(中间)+server(最底面层) 具体的Enterpri…

Unity Foreach循环GC测试

关于网上讨论Foreach循环会不会产生GC的问题&#xff0c;不如自己实验一番&#xff0c;我用的Unity版本是2021.3.23f1c1版本。 测试代码如下&#xff1a; using System.Collections.Generic; using UnityEngine; using UnityEngine.Profiling;namespace Test {public class M…

sqli-labs靶场通关攻略(41-50)

Less-41 1、判断闭合方式 输入?id1 -- 必和成功 2、查看回显点 输入?id-1 union select 1,2,3 -- 得出回显点为2&#xff0c;3 3、查询数据库名 输入?id-1 union select 1,2,database() -- 4、查询数据库中的表名 输入?id-1 union select 1,2,group_concat(table_nam…

Notepad++回车不自动补全

问题 使用Notepad时&#xff0c;按回车经常自动补全&#xff0c;但我们希望回车进行换行&#xff0c;而不是自动补全&#xff0c;而且自动补全使用Tab进行补全足够了。下文介绍设置方法。 设置方法 打开Notepad&#xff0c;进入设置 - 首选项 - 自动完成&#xff0c;在插入选…

代码随想录Day 29|leetcode题目:134.加油站、135.分发糖果、860.柠檬水找零、406.根据身高重建队列

提示&#xff1a;DDU&#xff0c;供自己复习使用。欢迎大家前来讨论~ 文章目录 第八章 贪心算法 part03二、题目题目一&#xff1a;134. 加油站解题思路&#xff1a;暴力方法贪心算法&#xff08;方法一&#xff09;贪心算法&#xff08;方法二&#xff09; 题目二&#xff1a…

openEuler:日志管理

日志介绍 概述 系统日志是一系列用于记录系统操作和活动进行的文件&#xff0c;这些日志对于监控和排查系统问题非常重要&#xff0c;因为它们可以提供有关系统行为、应用活动和安全事件的见解。系统日志还可以成为识别 Linux 系统中潜在安全弱点和漏洞的重要信息来源。通过分…

[米联客-XILINX-H3_CZ08_7100] FPGA程序设计基础实验连载-20 I2C MASTER控制器驱动设计

软件版本&#xff1a;VIVADO2021.1 操作系统&#xff1a;WIN10 64bit 硬件平台&#xff1a;适用 XILINX A7/K7/Z7/ZU/KU 系列 FPGA 实验平台&#xff1a;米联客-MLK-H3-CZ08-7100开发板 板卡获取平台&#xff1a;https://milianke.tmall.com/ 登录“米联客”FPGA社区 http…

pyautogui通过图像获取定位实现计算器自动计算

使用 pyautogui.locateCenterOnScreen 能够在屏幕上搜索给定图像的位置&#xff0c;并准确地返回该图像的中心点坐标。 &#x1f33f;使用 pyautogui 实现计算器自动计算 准备工作&#xff0c;把计算器的按钮截图保存下来。例如&#xff1a; 计算“75”&#xff0c;实现代码如…

【网络】WebSocket协议详解

WebSocket协议详解 一 、WebSocket 诞生背景二、WebSocket 特点三、WebSocket 的握手环节四、WebSokect 的数据格式1、 第一个字节2、第二个字节3、Masking-key4、playload Data5、一些注意细节 WebSocket 的官方文档 WebSocket 的中文文档(非官方) 一 、WebSocket 诞生背景 在…

深度学习基础—简单的卷积神经网络

3.1.卷积层 下面以卷积神经网络的某一层为例&#xff0c;详解一下网络的结构。 假设当前位于l层&#xff0c;则输入6*6*3的彩色图片&#xff0c;有两个3*3*3的过滤器&#xff0c;卷积操作后将输出2个4*4的图片。如果把过滤器看成权重w&#xff0c;卷积这一步操作其实就是w*a&am…

消息称华为纯血鸿蒙部分应用采用虚拟机方案

华为预计在11月发布正式版纯血鸿蒙&#xff0c;为了能够适配更多的App&#xff0c;官方也是有了新的解决方案。报道中提到&#xff0c;纯血鸿蒙设备对有些还没上架的应用会使用虚拟机方案过渡。据悉&#xff0c;华为的虚拟机方案作为过渡措施&#xff0c;首先能确保用户在鸿蒙系…

概率论与编程的联系及数据科学应用

目录 引言 第一章 概率模拟与编程实现 1.1 随机数生成与蒙特卡罗模拟 1.1.2 蒙特卡罗模拟 第二章 统计建模与数据分析 2.1 统计模型实现 2.2 概率图模型 第三章 概率论在机器学习中的应用 3.1 随机森林与决策树 3.2 贝叶斯分类器 总结与展望 引言 在大数据和人工智…

学习node.js 十 redis的基本语法

redis Redis&#xff08;Remote Dictionary Server&#xff09;是一个开源的内存数据结构存储系统&#xff0c;它提供了一个高效的键值存储解决方案&#xff0c;并支持多种数据结构&#xff0c;如字符串&#xff08;Strings&#xff09;、哈希&#xff08;Hashes&#xff09;、…

素数之和(c语言)

1./描述 //牛牛刚刚学了素数的定义&#xff1a;素数值指在大于1的自然数中&#xff0c;除了1和它本身以外不再有其他因数的自然数 //牛牛想知道在[l, r] 范围内全部素数的和 //输入描述&#xff1a; //输入两个正整数 l&#xff0c;r 表示闭区间范围 //输出描述&#xff1a; //…

sqli-labs靶场通关攻略 46-50

主页有sqli-labs靶场通关攻略 1-45 第四六关 less-46 步骤一&#xff1a;利用报错注入查询库 ?sort1 and updatexml(1,concat(0x7e,database(),0x7e),1) 步骤二&#xff1a;查询表名 ?sort1 and updatexml(1,concat(0x7e,(select group_concat(table_name)from informatio…

如何通过日志或gv$sql_audit,分析OceanBase运行时的异常SQL

本文作者&#xff1a;郑增权&#xff0c;爱可生 DBA 团队成员&#xff0c;OceanBase 和 MySQL 数据库技术爱好者。本文约 2000 字&#xff0c;预计阅读需要 8 分钟。 简介 在 OCP 云平台的 Top SQL 界面中&#xff0c;能观察到异常SQL&#xff0c;但这些SQL并未明确显示具体的…

防泄密的方法都有哪些?

一、防泄密的方法都有哪些&#xff1f;使用安全通讯工具&#xff1a;采用加密通讯工具&#xff0c;确保敏感信息在传输过程中不被窃取或篡改。定期安全审计&#xff1a;对系统和数据进行定期的安全审计和检查&#xff0c;发现潜在的泄密风险并及时处理。文件加密&#xff1a;对…

光伏电站的施工步骤

施工准备&#xff1a;在施工前&#xff0c;需要进行现场勘查&#xff0c;了解施工场地的地形、地貌、气候等情况&#xff0c;制定施工方案和安全措施。同时&#xff0c;还需要准备好施工所需的材料和设备&#xff0c;如光伏组件、支架、电缆、逆变器等 。基础施工&#xff1a;根…

“面试宝典:高频算法题目详解与总结”

干货分享&#xff0c;感谢您的阅读&#xff01; &#xff08;暂存篇---后续会删除&#xff0c;完整版和持续更新见高频面试题基本总结回顾&#xff08;含笔试高频算法整理&#xff09;&#xff09; 备注&#xff1a;引用请标注出处&#xff0c;同时存在的问题请在相关博客留言…