Flink任务如何跑起来之 2.算子 StreamOperator

news2025/1/14 18:24:34

Flink任务如何跑起来之 2.算子 StreamOperator

前文介绍了Transformation创建过程,大多数情况下通过UDF完成DataStream转换中,生成的Transformation实例中,核心逻辑是封装了SimpleOperatorFactory实例。

UDF场景下,DataStream到Transformationg过程中,SimpleOperatorFactory实例的创建过程大致如下伪代码所示。

// 具体的函数实例
Function function = ;
// 将函数实例封装到算子实例中
AbstractUdfStreamOperator operator = new AbstractUdfStreamOperator(function);
// 通过算子实例得到其SimpleOperatorFactory实例
SimpleOperatorFactory factory = SimpleOperatorFactory.of(operator)

这里的UDF可以简单理解为需要我们自己传入对应Function实现类的操作,如map、filter等。

问题:
StreamOperator是什么?
为什么需要将Function封装到StreamOperator中?

1. Flink算子

在应用程序中通过各种各样的Function完成DataStream转换,但是Function仅表示数据处理逻辑,并不关心数据从哪里来到哪里去。
以MapFunction为例,map方法中仅包含对每一条到来数据的具体处理逻辑,并不清楚map方法何时被调用,结果返回到哪。

一个完整的数据处理逻辑应该是获取数据->处理数据->输出数据,在Flink中这个最小的完整逻辑通过算子表示,顶层抽象接口为StreamOperator

因此Function作为算子的一部分参与后续的数据加工。

算子包含生命周期、状态和容错管理、数据处理3个方面。设计时分为两条线:

  • 生命周期、状态和容错管理,主要是AbstractStreamOperator抽象类及其子类实现,以及未来的AbstractStreamOperatorV2抽象类。
  • 数据处理,主要是OneInputStreamOperatorTwoInputStreamOperatorMultipleInputStreamOperator接口,分别表示单流、双流和多流的数据处理。在接口中定义了数据的处理方法。

StreamOperator完整的顶层抽象如下。

在这里插入图片描述

  • AbstractStreamOperator,所有流运算的基类。提供了生命周期和属性方法的默认实现。
    包含UDF的算子需继承其AbstractUdfStreamOperator子类
    对于其具体实现,还必须实现OneInputStreamOperator或TwoInputStreamOperator其中一个。
    将来将会使用AbstractStreamOperatorV2替换该基类
  • OneInputStreamOperator,支持单流输入的运算符接口,如果要实现自定义运算符,需要使用AbatractUdfStreamOperator作为基类
  • TwoInputStreamOperator,支持双流输入的运算符基类。同样需要和AbstractStreamOperator一起使用。
  • AbstractStreamOperatorV2,所有流运算符的新基类,旨在取代AbatractUdfStreamOperator。
    当前仅仅用于和MultipleInputStreamOperator一起配合使用。

OneInputStreamOperator、TwoInputStreamOperator和MultipleInputStreamOperator分别对应了Tranformation实现类的OneInputTransformation、TwoInputTransformation和AbstractMultipleInputTransformation。

MultipleInputStreamOperator和AbstractStreamOperatorV2是高版本中才加入的。因此,flink中最初仅支持单流或双流的输入,多流场景下需要拆分成单流或双流进行处理。在支持不同输入的流的实现中,梳理数据的方法分别如下

// 单流输入
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT>, Input<IN> {
    // 处理数据
    void processElement(StreamRecord<IN> element) throws Exception;
}

// 双流输入
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {

    // 处理双流输入中第一个流上的元素
    void processElement1(StreamRecord<IN1> element) throws Exception;

    // 处理双流输入中第二个流上的元素
    void processElement2(StreamRecord<IN2> element) throws Exception;
}

// 多流输入,这里的Input和单流输入继承的Input父类为同一个
public interface MultipleInputStreamOperator<OUT> extends StreamOperator<OUT> {
    List<Input> getInputs();
}

在AbstractStreamOperator众多子类中,AbstractUdfStreamOperator抽象类中封装了Function接口,并且其中open、close等算子生命周期等方法,实际上就是调用Function实例的对应方法。

public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {
    // 封装Function
    protected final F userFunction;
    // 通过Function实现进行算子的实例化
    public AbstractUdfStreamOperator(F userFunction) {
        this.userFunction = requireNonNull(userFunction);
        checkUdfCheckpointingPreconditions();
    }

    // 算子生命周期的相关方法,实际上调用Function的方法
    @Override
    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(userFunction, new Configuration());
    }

    @Override
    public void finish() throws Exception {
        super.finish();
        if (userFunction instanceof SinkFunction) {
            ((SinkFunction<?>) userFunction).finish();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        FunctionUtils.closeFunction(userFunction);
    }
}

常用的实现类基本继承自AbstractUdfStreamOperator抽象类。

单流输入,如map、fliter、source、sink等实现类

在这里插入图片描述
sink算子有两个实现类,分别是SinkOperatorStreamSink<IN>。二者的关系为SinkOperatorStreamSink<RowData>的特例。

双流输入,如concat、intervalJoin等实现类

在这里插入图片描述
本文开头提到通过SimpleOperatorFactory.of方式生成SimpleOperatorFactory实例,该方法如下

public static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator) {
    if (operator == null) {
        return null;
    } else if (operator instanceof StreamSource
            && ((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) {
        // 通过addSoure方法添加的Source方式,且SourceFunction为InputFormatSourceFunction的子类
        return new SimpleInputFormatOperatorFactory<OUT>((StreamSource) operator);
    } else if (operator instanceof StreamSink
            && ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) {
        // 通过addSink方法添加的sink方式,且SinkFunction为OutputFormatSinkFunction的子类
        return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator);
    } else if (operator instanceof AbstractUdfStreamOperator) {
        return new SimpleUdfStreamOperatorFactory<OUT>((AbstractUdfStreamOperator) operator);
    } else {
        return new SimpleOperatorFactory<>(operator);
    }
}

得到SimpleOperatorFactory实例后,在实际执行时,通过其createStreamOperator方法得到StreamOperator实例。

1.1. 算子生成示例

上述内容偏概念更多一些,通过map为例实际观察Function->StreamOperator->StreamOperatorFactory->Transformation的过程

// 步骤1,业务代码中使用map操作
DataStream<Tuple2<String, Integer>> counts = text.map(row -> Tuple2.of(row, 1))

// 步骤2,将业务代码中提供的MapFunction封装成StreamMap
public <R> SingleOutputStreamOperator<R> map(
        MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    // 将MapFunction封装成StreamMap,StreamMap为AbstractUdfStreamOperator子类
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}

// 步骤3,根据StreamMap获取其对应的SimpleOperatorFactory工厂实例
public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {
    
    // 获取StreamMap对应的StreamOperatorFactory工厂类
    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

// 步骤4,将工厂实例传入到Transformation中
protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {

    OneInputTransformation<T, R> resultTransform =
            new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    // 将StreamOperatorFactory工厂实例,传入到Transformation中
                    operatorFactory,
                    outTypeInfo,
                    environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

在步骤2中,将MapFunction封装成StreamMap,StreamMap是AbstractUdfStreamOperator的子类,并且同时实现了OneInputStreamOperator,进行数据处理逻辑。在处理数据时,实际上是调用MapFunction的map方法完成,即在业务代码中指定的row -> Tuple2.of(row, 1)的逻辑。

public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {
    // 以下3个属性从父类继承
    // 函数实例
    protected final F userFunction;
    // 结果输出
    protected transient Output<StreamRecord<OUT>> output;
    // 默认算子链生成策略
    protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        // 实例化StreamMap时,指定ALWAYS的算子链生成策略
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // userFunction即MapFunction处理数据时,实质调用MapFunction的map方法。
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

要在Task中算子才会真正执行,这里仅仅是在逻辑上完成算子的定义。

2. 算子链

Flink中会将多个算子合并到一起,组成算子链从而提高算子的运行效率。同一个算子链意味着将在同一个线程中运行。flink中算子链使用OperatorChain抽象类表示。

算子的合并策略在ChainingStrateg枚举类中定义,详情如下

/**
 * StreamOperator 使用的默认值为 HEAD,这意味着算子不链接到其前身。大多数算子使用 ALWAYS 覆盖此操作,这意味着它们将尽可能链接到前身。 
 */
public enum ChainingStrategy {
    // 尽可能的将和上游算子链接到一起,大多数算子的默认值
    ALWAYS,
    // 当前算子不会上下游算子链接到一起
    NEVER,
    // 不会上游算子连接到一起,但是可以和下游算子链接到一起
    HEAD,
    // 此运算符将运行在链的头部(与 HEAD 类似,但它还会尝试在可能的情况下链接source。这允许将多输入运算符与多个源链接到一个任务中。
    HEAD_WITH_SOURCES;
    public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1819658.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python怎么显示行号

我们如果想让Python IDLE显示行号&#xff0c;我们可以通过扩展IDLE功能来做到。 1.我们需要下载一个LineNumber.py扩展。 2.我们打开Python安装目录&#xff0c;找到安装目录下的Lib\idlelib目录&#xff0c;复制LineNumber到这个目录。 3.然后启动扩展。 4.配置扩展的方式…

CCF 矩阵重塑

第一题&#xff1a;矩阵重塑&#xff08;一&#xff09; 本题有两种思路 第一种 &#xff08;不确定是否正确 但是100分&#xff09; #include<iostream> using namespace std; int main(){int n,m,p,q,i,j;cin>>n>>m>>p>>q;int a[n][m];for(i…

英语学习笔记34——What are they doing?

What are they doing? 他们在做什么&#xff1f; 词汇 Vocabulary sleep v. 睡觉 ing形式&#xff1a;sleeping 例句&#xff1a;那个男孩正在睡觉。    That boy is sleeping. 相关&#xff1a;sleepy 困的 例句&#xff1a;我太困了。    I’m so sleepy. shave v.…

实时数据的处理一致性如何保证?

实时数据一致性的定义以及面临的挑战 数据一致性通常指的是数据在整个系统或多个系统中保持准确、可靠和同步的状态。在实时数据处理中&#xff0c;一致性包括但不限于数据的准确性、完整性、时效性和顺序性。 下图是典型的实时/流式数据处理的流程&#xff1a; 流式数据以各…

技术速递|Java on Azure Tooling 5月更新 - Java 对 Azure 容器应用程序的入门指南支持

作者&#xff1a;Jialuo Gan 排版&#xff1a;Alan Wang 大家好&#xff0c;欢迎阅读 Java on Azure 工具 5 月份更新。在本次更新中&#xff0c;我们将介绍 Java 在 Azure 上的容器应用程序的入门指南。希望您喜欢这些更新&#xff0c;并享受使用 Azure 工具包的流畅体验。请下…

鸿蒙轻内核A核源码分析系列七 进程管理 (3)

本文记录下进程相关的初始化函数&#xff0c;如OsSystemProcessCreate、OsProcessInit、OsProcessCreateInit、OsUserInitProcess、OsDeInitPCB、OsUserInitProcessStart等。 1、LiteOS-A内核进程创建初始化通用函数 先看看一些内部函数&#xff0c;不管是初始化用户态进程还…

助力618!你想便宜寄快递退换货吗?

家人们&#xff0c;姐妹们&#xff0c;马上就要到618了&#xff0c;每年一到这种重要的节日&#xff0c;我们都会买买买&#xff0c;但是我们有时候买了会发现这个商品不太满意&#xff0c;我们会选择退换货&#xff0c;或者给商家邮寄回去&#xff0c;但是这个运费可真的太贵了…

CRC计算单元

CRC计算单元 CRC是Cyclic Redundancy Check,循环冗余校验的缩写. 是一种检测数据错误的技术,主要用在数据通信和数据存储的方面. 但是这种技术只能检测到传输或存储的数据是否有误,没有将错误纠正的功能. 而CRC计算单元是一个独立的具备CRC计算功能的外设. AT32 MCU片上CRC计…

Web应用安全测试-认证功能缺陷

Web应用安全测试-认证功能缺陷 存在空口令 漏洞描述&#xff1a;认证登录环节允许空口令 测试方法&#xff1a; 找到网站登录页面&#xff0c;尝试输入用用户名&#xff0c;密码为空进行登录。 风险分析&#xff1a;攻击者可利用该漏洞登录网站后台&#xff0c;操作敏感数…

Warning: `ReactDOMTestUtils.act` is deprecated in favor of `React.act`.

问题&#xff1a;在代码中使用jest进行单元测试时&#xff0c;报错如下&#xff1a; 解决思路&#xff1a; 根据报错提示出来的 react-dom/test-utils 进行全局搜索&#xff0c;发现没有该引用&#xff0c;故进入该代码块中分析。发现代码中引入testing-library/react &#…

C++ 28 之 类对象作为类成员

#define _CRT_SECURE_NO_WARNINGS #include <iostream> #include <string> using namespace std;class Phone { public:string s_p_name;Phone(string p_name) {s_p_name p_name;cout << "phone的构造函数调用" << endl;}~Phone(){cout &…

BitMEX 联合创始人 Arthur Hayes 加入 Covalent 担任战略顾问

Arthur Hayes 加入 Covalent Network&#xff08;CQT&#xff09;&#xff0c;成为其战略顾问。 Hayes 认为 Covalent 与其竞争对手如 The Graph 相比&#xff0c;Covalent Network 的 CQT 代币一直被相对低估&#xff0c;他希望帮助 Covalent Network&#xff08;CQT&#x…

【Three.js】知识梳理二十一:Three.js性能优化和实践建议

Three.js 是一个功能强大的 3D 引擎&#xff0c;用于创建 WebGL 应用。尽管它功能强大&#xff0c;但在复杂的 3D 场景中保持高性能是一个挑战。本文将分享一些在使用 Three.js 时的性能优化提示&#xff0c;帮助你提高应用的运行效率。 1. 使用 stats.js 监视性能 在进行任何…

苦日子开始了,普通人应该怎么做?

关注卢松松&#xff0c;会经常给你分享一些我的经验和观点。 以为疫情后&#xff0c;我们的收入会好起来&#xff0c;谁曾想连工作都快保不住了&#xff0c;这几年大家日子过的比较苦&#xff0c;很多人想多一份收入。 面对这种情况&#xff0c;我们普通人应该怎么办?如何多…

【秋招突围】2024届秋招笔试-阿里系列笔试题-第一套-三语言题解(Java/Cpp/Python)

&#x1f36d; 大家好这里是清隆学长 &#xff0c;一枚热爱算法的程序员 ✨ 本系计划跟新各公司春秋招的笔试题 &#x1f4bb; ACM银牌&#x1f948;| 多次AK大厂笔试 &#xff5c; 编程一对一辅导 &#x1f44f; 感谢大家的订阅➕ 和 喜欢&#x1f497; &#x1f4e7; 清隆这边…

C++ 27 之 初始化列表

c27初始化列表.cpp #include <iostream> #include <string.h> using namespace std;class Students06{ public:int s_a;int s_b;int s_c;Students06(int a, int b, int c){s_a a;s_b b;s_c c;}// 初始化列表写法1&#xff1a;// Students06():s_a(4),s_b(5),s_…

使用QT绘制简单的动态数据折线图

两个核心类时QChart和QLineSeries 下面这个示例代码中&#xff0c;定时器每隔一段时间将曲线图中的数据点向右移动 一个单位&#xff0c;同时调整横坐标轴的范围&#xff0c;实现了一次滚动对应移动一个数据点的效果。 QLineSeries最多容纳40961024个点 #include <QtWidg…

【RabbitMQ】初识 RabbitMQ

初识 RabbitMQ 1.认识 RabbitMQ1.1 介绍1. 2.使用场景1.2.1 推送通知1.2.2 异步任务1.2.3 多平台应用的通信1.2.4 消息延迟1.2.5 远程过程调用 1.3 特性 2.基本概念2.1 生产者、消费者和代理2.2 消息队列2.3 交换机2.3.1 direct2.3.2 topic2.3.3 headers2.3.4 fanout 2.4 绑定2…

Downie for Mac v4.7.17 在线视频下载软件 安装(简单易学,小白轻松搞定)

Mac分享吧 文章目录 效果一、准备工作二、开始安装1、双击运行软件&#xff0c;将其从左侧拖入右侧文件夹中&#xff0c;等待安装完毕2、应用程序显示软件图标&#xff0c;表示安装成功 三、运行测试1、打开软件&#xff0c;进行设置2、下载视频&#xff0c;测试3、根据需要选…

关于unbuntu的终端自动退出的解决方案

输入sudo vim /etc/profile 将下面的TMOUT的时间注释掉 source /etc/profile使得更改生效