Flink作业执行之 2.算子 StreamOperator

news2024/11/22 23:36:26

Flink作业执行之 2.算子 StreamOperator

前文介绍了Transformation创建过程,大多数情况下通过UDF完成DataStream转换中,生成的Transformation实例中,核心逻辑是封装了SimpleOperatorFactory实例。

UDF场景下,DataStream到Transformationg过程中,SimpleOperatorFactory实例的创建过程大致如下伪代码所示。

// 具体的函数实例
Function function = ;
// 将函数实例封装到算子实例中
AbstractUdfStreamOperator operator = new AbstractUdfStreamOperator(function);
// 通过算子实例得到其SimpleOperatorFactory实例
SimpleOperatorFactory factory = SimpleOperatorFactory.of(operator)

这里的UDF可以简单理解为需要我们自己传入对应Function实现类的操作,如map、filter等。

问题:
StreamOperator是什么?
为什么需要将Function封装到StreamOperator中?

1. Flink算子

在应用程序中通过各种各样的Function完成DataStream转换,但是Function仅表示数据处理逻辑,并不关心数据从哪里来到哪里去。
以MapFunction为例,map方法中仅包含对每一条到来数据的具体处理逻辑,并不清楚map方法何时被调用,结果返回到哪。

一个完整的数据处理逻辑应该是获取数据->处理数据->输出数据,在Flink中这个最小的完整逻辑通过算子表示,顶层抽象接口为StreamOperator

因此Function作为算子的一部分参与后续的数据加工。

算子包含生命周期、状态和容错管理、数据处理3个方面。设计时分为两条线:

  • 生命周期、状态和容错管理,主要是AbstractStreamOperator抽象类及其子类实现,以及未来的AbstractStreamOperatorV2抽象类。
  • 数据处理,主要是OneInputStreamOperatorTwoInputStreamOperatorMultipleInputStreamOperator接口,分别表示单流、双流和多流的数据处理。在接口中定义了数据的处理方法。

StreamOperator完整的顶层抽象如下。

在这里插入图片描述

  • AbstractStreamOperator,所有流运算的基类。提供了生命周期和属性方法的默认实现。
    包含UDF的算子需继承其AbstractUdfStreamOperator子类
    对于其具体实现,还必须实现OneInputStreamOperator或TwoInputStreamOperator其中一个。
    将来将会使用AbstractStreamOperatorV2替换该基类
  • OneInputStreamOperator,支持单流输入的运算符接口,如果要实现自定义运算符,需要使用AbatractUdfStreamOperator作为基类
  • TwoInputStreamOperator,支持双流输入的运算符基类。同样需要和AbstractStreamOperator一起使用。
  • AbstractStreamOperatorV2,所有流运算符的新基类,旨在取代AbatractUdfStreamOperator。
    当前仅仅用于和MultipleInputStreamOperator一起配合使用。

OneInputStreamOperator、TwoInputStreamOperator和MultipleInputStreamOperator分别对应了Tranformation实现类的OneInputTransformation、TwoInputTransformation和AbstractMultipleInputTransformation。

MultipleInputStreamOperator和AbstractStreamOperatorV2是高版本中才加入的。因此,flink中最初仅支持单流或双流的输入,多流场景下需要拆分成单流或双流进行处理。在支持不同输入的流的实现中,梳理数据的方法分别如下

// 单流输入
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT>, Input<IN> {
    // 处理数据
    void processElement(StreamRecord<IN> element) throws Exception;
}

// 双流输入
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {

    // 处理双流输入中第一个流上的元素
    void processElement1(StreamRecord<IN1> element) throws Exception;

    // 处理双流输入中第二个流上的元素
    void processElement2(StreamRecord<IN2> element) throws Exception;
}

// 多流输入,这里的Input和单流输入继承的Input父类为同一个
public interface MultipleInputStreamOperator<OUT> extends StreamOperator<OUT> {
    List<Input> getInputs();
}

在AbstractStreamOperator众多子类中,AbstractUdfStreamOperator抽象类中封装了Function接口,并且其中open、close等算子生命周期等方法,实际上就是调用Function实例的对应方法。

public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {
    // 封装Function
    protected final F userFunction;
    // 通过Function实现进行算子的实例化
    public AbstractUdfStreamOperator(F userFunction) {
        this.userFunction = requireNonNull(userFunction);
        checkUdfCheckpointingPreconditions();
    }

    // 算子生命周期的相关方法,实际上调用Function的方法
    @Override
    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(userFunction, new Configuration());
    }

    @Override
    public void finish() throws Exception {
        super.finish();
        if (userFunction instanceof SinkFunction) {
            ((SinkFunction<?>) userFunction).finish();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        FunctionUtils.closeFunction(userFunction);
    }
}

常用的实现类基本继承自AbstractUdfStreamOperator抽象类。

单流输入,如map、fliter、source、sink等实现类

在这里插入图片描述
sink算子有两个实现类,分别是SinkOperatorStreamSink<IN>。二者的关系为SinkOperatorStreamSink<RowData>的特例。

双流输入,如concat、intervalJoin等实现类

在这里插入图片描述
本文开头提到通过SimpleOperatorFactory.of方式生成SimpleOperatorFactory实例,该方法如下

public static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator) {
    if (operator == null) {
        return null;
    } else if (operator instanceof StreamSource
            && ((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) {
        // 通过addSoure方法添加的Source方式,且SourceFunction为InputFormatSourceFunction的子类
        return new SimpleInputFormatOperatorFactory<OUT>((StreamSource) operator);
    } else if (operator instanceof StreamSink
            && ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {
        // 通过addSink方法添加的sink方式,且SinkFunction为OutputFormatSinkFunction的子类
        return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator);
    } else if (operator instanceof AbstractUdfStreamOperator) {
        return new SimpleUdfStreamOperatorFactory<OUT>((AbstractUdfStreamOperator) operator);
    } else {
        return new SimpleOperatorFactory<>(operator);
    }
}

得到SimpleOperatorFactory实例后,在实际执行时,通过其createStreamOperator方法得到StreamOperator实例。

1.1. 算子生成示例

上述内容偏概念更多一些,通过map为例实际观察Function->StreamOperator->StreamOperatorFactory->Transformation的过程

// 步骤1,业务代码中使用map操作
DataStream<Tuple2<String, Integer>> counts = text.map(row -> Tuple2.of(row, 1))

// 步骤2,将业务代码中提供的MapFunction封装成StreamMap
public <R> SingleOutputStreamOperator<R> map(
        MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    // 将MapFunction封装成StreamMap,StreamMap为AbstractUdfStreamOperator子类
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}

// 步骤3,根据StreamMap获取其对应的SimpleOperatorFactory工厂实例
public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {
    
    // 获取StreamMap对应的StreamOperatorFactory工厂类
    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

// 步骤4,将工厂实例传入到Transformation中
protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {

    OneInputTransformation<T, R> resultTransform =
            new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    // 将StreamOperatorFactory工厂实例,传入到Transformation中
                    operatorFactory,
                    outTypeInfo,
                    environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

在步骤2中,将MapFunction封装成StreamMap,StreamMap是AbstractUdfStreamOperator的子类,并且同时实现了OneInputStreamOperator,进行数据处理逻辑。在处理数据时,实际上是调用MapFunction的map方法完成,即在业务代码中指定的row -> Tuple2.of(row, 1)的逻辑。

public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {
    // 以下3个属性从父类继承
    // 函数实例
    protected final F userFunction;
    // 结果输出
    protected transient Output<StreamRecord<OUT>> output;
    // 默认算子链生成策略
    protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        // 实例化StreamMap时,指定ALWAYS的算子链生成策略
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // userFunction即MapFunction处理数据时,实质调用MapFunction的map方法。
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

要在Task中算子才会真正执行,这里仅仅是在逻辑上完成算子的定义。

2. 算子链

Flink中会将多个算子合并到一起,组成算子链从而提高算子的运行效率。同一个算子链意味着将在同一个线程中运行。flink中算子链使用OperatorChain抽象类表示。

算子的合并策略在ChainingStrateg枚举类中定义,详情如下

/**
 * StreamOperator 使用的默认值为 HEAD,这意味着算子不链接到其前身。大多数算子使用 ALWAYS 覆盖此操作,这意味着它们将尽可能链接到前身。 
 */
public enum ChainingStrategy {
    // 尽可能的将和上游算子链接到一起,大多数算子的默认值
    ALWAYS,
    // 当前算子不会上下游算子链接到一起
    NEVER,
    // 不会上游算子连接到一起,但是可以和下游算子链接到一起
    HEAD,
    // 此运算符将运行在链的头部(与 HEAD 类似,但它还会尝试在可能的情况下链接source。这允许将多输入运算符与多个源链接到一个任务中。
    HEAD_WITH_SOURCES;
    public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1821849.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

免费听歌,电脑或手机免费听歌,落雪音乐安装详细步骤

近年来&#xff0c;由于资本的力量导致各种收费&#xff0c;看个电视想听歌都必须要付费了&#xff0c;否则你听不完整&#xff0c;吃相非常难看&#xff0c;特别是电视&#xff0c;吸血鬼式吸收各种会员费&#xff0c;各种APP也是铺天盖地的广告&#xff0c;渐渐迷失了自我&am…

【C++】AVL树/红黑树实现及map与set的封装

前言 【C】二叉树进阶&#xff08;二叉搜索树&#xff09; 这篇文章讲述了关于二叉搜索树知识&#xff0c;但是二叉搜索树有其自身的缺陷&#xff0c;假如往树中插入的元素有序或者接近有序&#xff0c;二叉搜索树就会退化成单支树&#xff0c;时间复杂度会退化成O(N)&#xff…

CCAA质量管理体系 (2021)

一、考试大纲 中 国 认 证 认 可 协 会 质量管理体系基础考试大纲 第 1 版 文件编号&#xff1a;CCAA-TR-105-01:2021 发布日期&#xff1a;2021 年 3 月 2 日 实施日期&#xff1a;2021 年 4 月 1 日 质量管理体系基础考试大纲&#xff08;第 1 版&#xff09;1.总则 本大…

大数据集成平台建设方案(Word方案)

基础支撑平台主要承担系统总体架构与各个应用子系统的交互&#xff0c;第三方系统与总体架构的交互。需要满足内部业务在该平台的基础上&#xff0c;实现平台对于子系统的可扩展性。基于以上分析对基础支撑平台&#xff0c;提出了以下要求&#xff1a; 基于平台的基础架构&…

LabVIEW电源适应能力检测系统

随着工业自动化程度的提高&#xff0c;电源质量直接影响设备的稳定运行。利用LabVIEW开发一个单相电源适应能力检测系统&#xff0c;该系统通过智能化和自动化测试&#xff0c;提高了测试效率&#xff0c;减少了人为错误&#xff0c;保证了电源质量的可靠性。 项目背景 在现代…

软件系统设计开发规程(Word文件)

技术解决方案过程包括&#xff1a; 1、选择最佳解决方案&#xff1b; 2、制定架构设计&#xff1b; 3、制定概要设计&#xff1b; 4、制定详细设计和数据库设计&#xff1b; 5、利用准则进行接口设计&#xff1b; 6、实现设计&#xff1b; 7、进行单元测试&#xff1b; 8、进行…

知乎社招1年Go开发123+HR面经,期望22k

面经哥只做互联网社招面试经历分享&#xff0c;关注我&#xff0c;每日推送精选面经&#xff0c;面试前&#xff0c;先找面经哥 一面‍ 0、自我介绍 1、你才工作一年为什么就想找机会了&#xff08;为什么想跳&#xff09;【甩锅给公司&#xff0c;反正不是我的问题】 2、对…

boot项目配置邮箱发送

最近项目准备进入测试阶段&#xff0c;时间相对充沛些&#xff0c;便对邮箱的信息发送记录下&#xff01; 邮箱设置-开启smtp协议及获取授权码 以QQ邮箱为例&#xff0c;其他邮箱大同小异&#xff01; 开启协议 获取授权码 具体代码 基于javax.mail实现 原文可看 前辈帖子…

达梦数据库上市,给数据库国产化加油打气

吉祥学安全知识星球&#x1f517;除了包含技术干货&#xff1a;《Java代码审计》《Web安全》《应急响应》《护网资料库》《网安面试指南》还包含了安全中常见的售前护网案例、售前方案、ppt等&#xff0c;同时也有面向学生的网络安全面试、护网面试等。 作为家乡的企业上市必须…

蓝牙音频解码芯片TD5163介绍,支持红外遥控—拓达半导体

蓝牙芯片TD5163A是一颗支持红外遥控、FM功能和IIS音频输出的蓝牙音频解码芯片&#xff0c;此颗芯片的亮点在于同时支持真立体声&单声道、TWS功能、PWM、音乐频谱和串口AT指令控制等功能&#xff0c;芯片在支持蓝牙无损音乐播放的同时&#xff0c;还支持简单明了的串口发送A…

RTE Open Day 联手 AGI Playground,最先锋 RTE+AI Builders 齐聚北京丨活动招募

6 月 22 日至 23 日&#xff0c;北京&#xff0c;AGI Playground 2024 即将引燃今夏&#xff01; 这场备受瞩目的 AGI 盛会&#xff0c;将汇聚王小川、杨植麟等众多创业者。RTE 开发者社区的 Builders 和 RTE Open Day 也将亮相其中&#xff01; 「有一群人在一起&#xff0c…

云计算【第一阶段(13)】Linux的Find命令

一、查找文件或目录Find 格式 find 查找的范围 类型 查找数据 1.1、常用查找类型 查找类型关键字说明按名称查找-name根据目标文件的名称进行查找&#xff0c;允许使用“*”及“&#xff1f;”通配符按文件大小查找-size根据目标文件的大小进行查找&#xff0c;一般使用…

[华为北向网管NCE开发教程(6)消息订阅

1.作用 之前介绍的都是我们向网管NCE发起请求获取数据&#xff0c;消息订阅则反过来&#xff0c;是网管NCE系统给我们推送信息。其原理和MQ&#xff0c;JMS这些差不多&#xff0c;这里不过多累述。 2.场景 所支持订阅的场景有如下&#xff0c;以告警通知为例&#xff0c;当我…

基于51单片机智能路灯控制—人数、光强

基于51单片机智能路灯控制 &#xff08;仿真&#xff0b;程序&#xff09; 功能介绍 具体功能&#xff1a; 1.光敏电阻与电阻组成分压电路&#xff0c;环境光强度越强&#xff0c;光敏电阻越小&#xff0c;ADC检测的电压越强&#xff1b; 2.红外计数传感器&#xff08;按键…

Introducing Index-1.9B

简介 大家好&#xff0c;今天我们很高兴首次发布Index系列模型中的轻量版本&#xff1a;Index-1.9B系列 本次开源的Index-1.9B 系列包含以下模型&#xff1a; Index-1.9B base : 基座模型&#xff0c;具有 19亿 非词嵌入参数量&#xff0c;在2.8T 中英文为主的语料上预训练&…

01本地图像导入及参数设置

左边工具栏&#xff1a;采集-》图像源&#xff0c;点击后 拉到流程窗口中 在右边有三个按钮可以添加图像和图像文件夹。 双击 图像源 可以打开 参数设置 参数说明&#xff1a; 像素格式&#xff1a;MONO8 表示图像为黑白图像&#xff0c;RGB24为彩色图像。看你想以什么图像打开…

JS手写题解析

手写Promise class MyPromise {constructor(executor) { // executor执行器this.status pending // 等待状态this.value null // 成功或失败的参数this.fulfilledCallbacks [] // 成功的函数队列this.rejectedCallbacks [] // 失败的函数队列const that thisfunction reso…

我的创作纪念日 --- 携手CSDN的512天

起航 时间过得可真快啊&#xff0c;转眼间距离我发的第一篇文章已经有512天了&#xff0c;那是一个寒假&#xff0c;当我发现自己又浑浑噩噩的过完了一个学期时&#xff0c;我才开始思考自己想拥有怎样的人生&#xff0c;然后我就写下了自己的第一篇文章 about me&#xff0c;…

哪些国产项目管理软件最受欢迎?详细解读六大主流系统

满足国产化诉求的6款项目管理系统&#xff1a;PingCode、Worktile、Teambition、禅道、华为云DevCloud、Tapd。 国产项目管理软件以其定制化高、适应本土市场的优势&#xff0c;正成为越来越多企业的选择。本文将探讨几款优秀的国产项目管理工具&#xff0c;帮助您找到提升团队…

C# WPF入门学习主线篇(三十三)—— 使用ICommand实现命令绑定

C# WPF入门学习主线篇&#xff08;三十三&#xff09;—— 使用ICommand实现命令绑定 在MVVM模式中&#xff0c;命令绑定是将用户交互&#xff08;如按钮点击&#xff09;与ViewModel中的方法连接起来的一种机制。使用ICommand接口可以实现这一功能&#xff0c;从而将UI逻辑与业…