Flink任务如何跑起来之 1.DataStream和Transformation

news2025/1/14 18:35:16

Flink任务如何跑起来之 1.DataStream和Transformation

1. 滥觞

在使用Flink完成业务功能之余,有必要了解下我们的任务是如何跑起来的。知其然,知其所以然。

既然重点是学习应用程序如何跑起来,那么应用程序的内容不重要,越简单越好。
WordCount示例作为学习数据引擎时hello word程序,再合适不过。接下来便以任务执行顺序为线索开启对源码逐步学习。

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 初始化执行环境
        Configuration configuration = new Configuration();
        configuration.setString("rest.port", "9091");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(1);

        // 业务逻辑转换
        DataStream<String> text = env.fromCollection(Arrays.asList("zhangsan", "lisi", "wangwu", "zhangsan")).name("zl-source");
        DataStream<Tuple2<String, Integer>> counts = text.map(row -> Tuple2.of(row, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1)
                .name("counter");
        counts.print().name("print-sink");

        // 执行应用程序
        env.execute("WordCount");
    }
}

为了使示例代码足够纯粹(直接复制粘贴后即可跑起来的那种),因此在示例中直接使用List数据作为Source。

最后,计划将自己学习的过程以系列文档的形式作为记录。同时作为自己学习过程的记录,可能存在错误或片面理解,欢迎一起讨论。

2. 头疼的“角色”

在学习源码或查阅资料的同时,以下单词(但不限于)一定会频繁出现,它们或者直接对应flink源码中的接口、类名,或者是一些概念名称。初次看到难免让人抓狂。现在先对这些单词混个脸熟。

Client
JobManager/JobMaster
TaskManager/TaskExecutor
Transformation
StreamOperator
StreamGraph
JobGraph
ExecutionGraph
Task
StreamTask
……

3. 宏观视角

当任务开始执行后,便可以在WebUI上查看其对应的物理执行拓扑,即Task DAG。从我们编写的应用程序代码到Task DAG势必经历了复杂的解析转换操作,这个过程大体如下所示。

在这里插入图片描述

我们编写的应用程序代码首先会转化为Transformation,该实例将作为Flink世界中的起点,开启了之后一系列“旅程”。

4. env.execute()方法做了什么?

在使用DataStream API编写应用程序时,无论业务逻辑如何如何的复杂,但整体结构大致由三部分构成,即

// 1.初始化执行环境
StreamExecutionEnvironment env = ;
// 2.业务逻辑转换,即一系列的DataStream转化
DataStream source = ;
// 3.env.execute()
env.execute();

既然最后必须执行 env.execute()方法,那么首先了解下execute都执行了那些操作。

基于1.16版本的源码,并只保留了源码中的关键逻辑。

// 方法1
public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    // 生成StreamGraph,最终调用方法4,通过StreamGraphGenerator生成StreamGraph
    StreamGraph streamGraph = getStreamGraph();
    // ...
    try {
        // 调用方法2
        return execute(streamGraph);
    } catch (Throwable t) {
        // ...
    }
}
// 方法2
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    // 调用方法3,通过StreamGraph最终得到JobClient
    final JobClient jobClient = executeAsync(streamGraph);
    try {
        final JobExecutionResult jobExecutionResult;
        // ...
        jobListeners.forEach(
                jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
        return jobExecutionResult;
    } catch (Throwable t) {
        // ...
    }
}
// 方法3
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    // 根据启动环境,得到对应环境的Executor实现
    // 如miniCluster环境则对应LocalExecutor
    final PipelineExecutor executor = getPipelineExecutor();
    // 在具体的executor.execute方法中,将StreamGraph先转化成JobGraph,在将JobGraph提交到JobManager中
    CompletableFuture<JobClient> jobClientFuture =
            executor.execute(streamGraph, configuration, userClassloader);
    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));
        collectIterators.clear();
        return jobClient;
    } catch (ExecutionException executionException) {
        // ...
    }
}
// 方法4
private StreamGraph getStreamGraph(List<Transformation<?>> transformations) {
    synchronizeClusterDatasetStatus();
    // 根据Transformation生成StreamGraph
    return getStreamGraphGenerator(transformations).generate();
}

通过上述源码调用链可以,完成从DataStream API->Transformation->StreamGraph->JobGraph的转化。最后将JobGraph提交到了JobManager中并,执行后续操作。

从上述方法4getStreamGraph(List<Transformation<?>> transformations)可知,StreamGraph由Transformation演变而来,此处不禁会产生一个新的疑问,Transformation又从何而来?

WordCount示例代码中并没有与Transformation直接相关的代码。通过查看getSreamGraph方法的完成调用链可知其入参直接来自是StreamExecutionEnvironment类中的transformations成员属性值。在应用程序第一步便生成了StreamExecutionEnvironment的实例,接下来通过env得到DataStream并进行了一系列的转化操作,而在最后的execute方法中便已直接使用transformations属性值了,那么该属性中一定是前面2个过程中实际赋值的。

protected final List<Transformation<?>> transformations = new ArrayList<>();

5. Transformation何时生成?

从StreamExecutionEnvironment的源码中可知,transformations属性只有addOperator方法会执行集合的add操作,其余地方均为集合的get操作。
然而addOperator方法有诸多调用方,且均为其他类中的调用,继续往上查看调用方有些困难,因此这里暂时记下addOperator方法唯一对transformations集合中执行add操作的结论。

// 该方法不适合用户使用。创建operator的api方法必须调用此方法
@Internal
public void addOperator(Transformation<?> transformation) {
    Preconditions.checkNotNull(transformation, "transformation must not be null.");
    this.transformations.add(transformation);
}

通过查看StreamExecutionEnvironment实例的创建过程,可以发现在创建过程中并无transformations的add操作,因此是在DataStream转换操作中对transformations执行了add操作。

5.1. DataStream

在Flink中使用DataStream表示数据流。其仅用于表达业务转化逻辑,实际上并没有真正的存储数据。

DataSteam是顶层封装类,其子类如下

在这里插入图片描述
DataStream类中只有两个成员属性,分别是StreamExecutionEnvironment和Transformation,并在构造方法中对其进行初始化。因此实例化DataStream的同时除执行环境外,还必须传入Transformation的实例。

public class DataStream<T> {
    protected final StreamExecutionEnvironment environment;
    protected final Transformation<T> transformation;

    public DataStream(StreamExecutionEnvironment environment, Transformation<T> transformation) {
        this.environment =
                Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
        this.transformation =
                Preconditions.checkNotNull(
                        transformation, "Stream Transformation must not be null.");
    }
    // ...
}

回到WordCount示例代码中,从集合到DataStream的过程,封装示意如下。

在这里插入图片描述

注意,Transformation中并不是直接持有了AbstractUdfStreamOperator的引用,而是对应的工厂。

源码中关键步骤如下

// 步骤1,从List到Function
public <OUT> DataStreamSource<OUT> fromCollection(
        Collection<OUT> data, TypeInformation<OUT> typeInfo) {
    // ...
    // 创建SourceFunction实例,SourceFunction是Function的实现
    SourceFunction<OUT> function = new FromElementsFunction<>(data);
    return addSource(function, "Collection Source", typeInfo, Boundedness.BOUNDED)
            .setParallelism(1);
}

// 步骤2,从Function到StreamOperator
private <OUT> DataStreamSource<OUT> addSource(
        final SourceFunction<OUT> function,
        final String sourceName,
        @Nullable final TypeInformation<OUT> typeInfo,
        final Boundedness boundedness) {
    // ...
    // 创建StreamSource实例,StreamSource是AbstractUdfStreamOperator的子类,Flink中算子的表示
    final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
    return new DataStreamSource<>(
            this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
}

// 步骤3,从StreamOperator到Transformation,再到DataStream
public DataStreamSource(
        StreamExecutionEnvironment environment,
        TypeInformation<T> outTypeInfo,
        StreamSource<T, ?> operator,
        boolean isParallel,
        String sourceName,
        Boundedness boundedness) {
    super(
            environment,
            // 创建Transformation实例,Transformation是PhysicalTransformation的子类
            new LegacySourceTransformation<>(
                    sourceName,
                    // 将StreamSource封装到Transformation中
                    operator,
                    outTypeInfo,
                    environment.getParallelism(),
                    boundedness));

    // ...
}

继续查看DataStream的map操作可以可以发现,核心流程和上述由集合创建DataStream的过程基本一致:

  • 首先创建Function实例
  • 其次由Function实例创建AbstractUdfStreamOperator实例
  • 然后将AbstractUdfStreamOperator实例封装到Transformation实例中
  • 最后由Transformation和StreamExecutionEnvironment实例创建DataStream实例

不同之处在于,map操作最后将得到的PhysicalTransformation实例添加到StreamExecutionEnvironment实例中的transformations集合中去了。这点差异其实和Transformation实例表示的含义有关,放在文章末尾解释。

protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {
    // ...
    OneInputTransformation<T, R> resultTransform =
            new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    operatorFactory,
                    outTypeInfo,
                    environment.getParallelism());
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);

    // 区别:添加Transformation到StreamExecutionEnvironment中
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

但并不是全部的DataStream转化操作都需要经历上述将Function实例封装成AbstractUdfStreamOperator实例,然后将AbstractUdfStreamOperator实例封装到PhysicalTransformation实例的过程。如示例代码中的keyBy和sum操作。其中keyBy并未直接涉及Function,而sum操作直接将得到的SumAggregator函数实例封装到了ReduceTransformation实例中,然后由ReduceTransformation实例得到DataStream实例。

5.2. Transformation

DataStream面向开发者,而Transformation面向flink内核。
每个DataStream实例中都包含一个Transformation实例,表示当前Datastream从上游的DataStream使用该Transformation而来。而所有DataStream中Transformation又都添加到了StreamExecutionEnvironment实例中的transformations集合中去,用于接下来的StreamGraph实例的生成。
Transformation中记录了上游的数据来源,但其并关心数据的物理来源、序列化、转发等问题。

Transformatio是顶层抽象类,有众多的子类,涵盖了DataStream的所有转换,其直接子类如下,可以分为两大类

  • PhysicalTransformation,将会转换成后续graph中节点信息
  • 非PhysicalTransformation,将会转换成后续graph中的边信息

在这里插入图片描述
Transformation中属性如下所示,其中Optional<SlotSharingGroup>表示共享槽位信息,只有开启了允许共享槽位后,该属性才会被设置值。

其构造方法如下,除name外还需要输出类型和并行度两个参数。

public Transformation(String name, TypeInformation<T> outputType, int parallelism) {
    this.id = getNewNodeId();
    this.name = Preconditions.checkNotNull(name);
    this.outputType = outputType;
    this.parallelism = parallelism;
    this.slotSharingGroup = Optional.empty();
}

PhysicalTransformation仅在其父类的基础上增加了设置ChainingStrategy的方法,用于表示生成算子链的策略。

@Internal
public abstract class PhysicalTransformation<T> extends Transformation<T> {
    PhysicalTransformation(String name, TypeInformation<T> outputType, int parallelism) {
        super(name, outputType, parallelism);
    }

    /** Sets the chaining strategy of this {@code Transformation}. */
    public abstract void setChainingStrategy(ChainingStrategy strategy);
}

PhysicalTransformation中有众多的实现子类,全部子类继承关系如下。

在这里插入图片描述
其中以下几个子类出场频率相对更高一些,其他子类只有我们的业务逻辑比较复杂时才会用到。

  • LegacySourceTransformation 表示Source的Transformation
  • LegacySinkTransformation 表示Sink的Transformation
  • SourceTransformation
  • SinkTransformation
  • OneInputTransformation 表示单个输入流的Transformation,如常见的map、flatMap、fliter等
  • TwoInputTransformation 表示两个输入流的Transformation,如concat

疑问:为什么Source和Sink都各自分别有两个Transformation子类?
通过名称也可以看出一些端倪,新老两种实现。
在1.14版本之前,分别通过env.addSource(SourceFunction)DataStream.addSink(SinkFunction)方法生成source和sink
从1.14版本开始新增了env.fromSource(Source)DataStream.sinkTo(Sink)的方式生成source和sink。
新旧方法中入参类型不同,因此导致了两种不同的Transformation实现,从各自的实现类中也可以体现这一点,如下所示。

public class LegacySourceTransformation<T> extends PhysicalTransformation<T>
        implements WithBoundedness {
    // sourceFunction的引用
    private final StreamOperatorFactory<T> operatorFactory;
    // ...
}

public class SourceTransformation<OUT, SplitT extends SourceSplit, EnumChkT>
        extends PhysicalTransformation<OUT> implements WithBoundedness {
    // source的引用
    private final Source<OUT, SplitT, EnumChkT> source;
    // ...
}

public class LegacySinkTransformation<T> extends PhysicalTransformation<T> {

    private final Transformation<T> input;
    // sinkFunction的引用
    private final StreamOperatorFactory<Object> operatorFactory;
    // ...
}

public class SinkTransformation<InputT, OutputT> extends PhysicalTransformation<OutputT> {
    private final DataStream<InputT> inputStream;
    // sink的引用
    private final Sink<InputT> sink;
    private final Transformation<InputT> input;
    // ...
}

Source作为整个数据流的头部,不存在上游,因此其Transformation实现中没有上游Transformation的引用,除此之外其余的Transformation子类中,均持有一个表示上游Transformation的引用,如上述sink中的input属性。

最后解释下,前面提到的为什么没有将表示Source的DataStream中的Transformation加入到env中表示Transformation的集合中,而接下来的转化中,将对应的Transformation加入到了env中。因为Source作为数据源的头部,不会存在上游,而Source作为其他DataSteam的上游,一定会加入到其Transformation的input中,因此没必要单独将Source的transformation加入到env中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1797015.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【设计模式深度剖析】【4】【行为型】【策略模式】

文章目录 策略模式定义英文原话直译 角色类图策略接口Strategy&#xff1a;具体策略类上下文类Context测试类 策略模式的应用策略模式的优点策略模式的缺点策略模式的使用场景 策略模式 策略模式&#xff08;Strategy Pattern&#xff09; Strategy策略也称作Policy政策。 想…

快速搭建rtsp server(Ubuntu)

在现代视频监控和实时视频流媒体应用中&#xff0c;实时流协议&#xff08;RTSP&#xff09;服务器扮演着至关重要的角色。无论是家庭安防系统、企业级监控还是流媒体服务&#xff0c;RTSP服务器都能提供高效、稳定的解决方案。然而&#xff0c;对于许多初学者或开发者来说&…

单轮对话和多轮对话

参考&#xff1a;数据集对应关系说明 - 千帆大模型平台 | 百度智能云文档 (baidu.com) 什么是单轮对话 单轮对话和多轮对话是两种不同的对话形式&#xff0c;它们分别指的是在一次对话中只涉及一个问题和对应的回答&#xff0c;以及在一次对话中涉及多个问题和对应的回答。 …

【JMeter接口测试工具】第二节.JMeter基本功能介绍(上)【入门篇】

文章目录 前言一、获取所有学院信息接口执行二、线程组的介绍 2.1 并发和顺序执行 2.2 优先和最后执行线程组 2.3 线程组的设置细节三、HTTP请求的介绍四、查看结果树的配置使用总结 前言 一、获取所有学院信息接口执行 我们先针对一条简单的接口进行执行&#…

【Python报错】已解决AttributeError: ‘method‘ object has no attribute ‘xxx‘

解决Python报错&#xff1a;AttributeError: ‘method’ object has no attribute ‘xxx’ 在Python中&#xff0c;AttributeError通常表明你试图访问的对象没有你请求的属性或方法。如果你遇到了AttributeError: method object has no attribute xxx的错误&#xff0c;这通常意…

Mintegral数据洞察:全球中轻度游戏市场与创意更新频率

基于2024年3月大盘数据&#xff0c;汇量科技数据研究中心发现&#xff0c;超休闲品类仍是投流中轻度手游的中流砥柱。而投流力度较大的其他细分品类里&#xff0c;可以看到棋牌、模拟经营、非4X策略以及合成X游戏的身影&#xff0c;这些品类是近年来经常出现融合玩法的新兴赛道…

算法课程笔记——可撤销并查集

算法课程笔记——可撤销并查集 Gv

(学习笔记)数据基建-数据质量

数据基建-数据质量 数据质量数据质量保障措施如何推动上下游开展数据质量活动数据质量保障如何量化产出数据质量思考全链路数据质量保障项目 数据质量 概念&#xff1a;数据质量&#xff0c;意如其名&#xff0c;就是数据的准确性&#xff0c;他是数据仓库的基石&#xff0c;控…

【Java】static 类方法中注意事项

static 类方法中注意事项 目录 代码示例&#xff1a; package suziguang_d4_staticNote;public class Student {public int score 66;public static String name "zhangsan";// 1.类方法中可以直接访问类的成员&#xff0c;不可以直接访问实例成员public static v…

Unity Vuforia

首先在unity2019版本里可以在windows->PackageManager里搜Vuforia EngineAR; &#xff08;unity2021版本里搜不到&#xff09; 在官网注册账号&#xff1a; 添加识别图等&#xff1b; 将导出的unitypackage包导入unity中。 unity里导入package之后&#xff0c;新建场景&am…

【SpringBoot + Vue 尚庭公寓实战】房间支付方式管理接口实现(三)

【SpringBoot Vue 尚庭公寓实战】房间支付方式管理接口实现&#xff08;三&#xff09; 文章目录 【SpringBoot Vue 尚庭公寓实战】房间支付方式管理接口实现&#xff08;三&#xff09;1、查询全部支付方式列表2、保存或更新支付方式3、根据ID删除支付方式 房间支付方式管理…

C++设计模式——Adapter适配器模式

一&#xff0c;适配器模式简介 适配器模式是一种结构型设计模式&#xff0c;用于将已有接口转换为调用者所期望的另一种接口。 适配器模式让特定的API接口可以适配多种场景。例如&#xff0c;现有一个名为"Reader()"的API接口只能解析txt格式的文件&#xff0c;给这…

CF1553F Pairwise Modulo

#include<bits/stdc.h> #define int long long using namespace std; int n,s,ss,ma,l,r,a[300005],b[300005],c[300005]; //b 记录个数 //c 记录a[i]*k void insert(int x) {int yx;while(x<ma) b[x],x(x&-x); } void insert1(int x,int y) {while(x<ma) c[x]…

掌控数据流:深入解析 Java Stream 编程

Java 8 引入了一种新的抽象称为流&#xff08;Stream&#xff09;&#xff0c;它可以让你以一种声明的方式处理数据。Java 8 Stream API 可以极大提高 Java 程序员的生产力&#xff0c;使代码更简洁&#xff0c;更易读&#xff0c;并利用多核架构进行外部迭代。这里将详细介绍 …

电商核心技术系列58:电商平台的智能数据分析与业务洞察

相关系列文章 电商技术揭秘相关系列文章合集&#xff08;1&#xff09; 电商技术揭秘相关系列文章合集&#xff08;2&#xff09; 电商技术揭秘相关系列文章合集&#xff08;3&#xff09; 电商核心技术揭秘56&#xff1a;客户关系管理与忠诚度提升 电商核心技术揭秘57:数…

【python进阶】python图形化编程之美--tkinter模块初探

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

SwiftUI中GeometryReader与GeometryProxy的理解与使用

SwiftUI中的GeometryReader是一个视图&#xff0c;使用它我们可以很容易地访问父视图的大小和位置&#xff0c;并使用这些信息来创建一个响应式布局&#xff0c;以适应不同的设备和方向。 在本文中&#xff0c;我们将探索使用GeometryReader的好处&#xff0c;并提供一些如何在…

iPhone 16 Pro 将打破智能手机上最窄边框的记录

iPhone 16 Pro 据悉&#xff0c;苹果即将发布的 iPhone 16 Pro 将拥有令人瞩目的超窄边框设计&#xff0c;这一创新将超越目前市场上所有智能手机的边框宽度&#xff0c;甚至相较于其前代产品 iPhone 15 Pro 而言也更为出色。 根据多方消息透露&#xff0c;虽然整体设计变化…

打造新引擎,迈向数智金融新未来

数智技术正在全面赋能金融机构转型升级以及促进金融与实体经济的加速融合&#xff0c;已呈现出金融机构数智化经营加速、产业 数字金融深度融合、数字技术驱动绿色金融发展、金融信创成果涌现、金融机构加快数字化组织管理变革等行业趋势。 根据银行业协会调研&#xff0c;78%…

HTML静态网页成品作业(HTML+CSS)—— 节日母亲节介绍网页(5个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有5个页面。 二、作品演示 三、代…