flink StreamGraph 构造flink任务

news2024/11/20 17:11:57

文章目录

      • 背景
      • 主要步骤
      • 代码

背景

通常使用flink 提供的高级算子来编写flink 任务,对底层不是很了解,尤其是如何生成作业图的细节
下面通过构造一个有向无环图,来实际看一下

主要步骤

1.增加source
2.增加operator
3. 增加一条边,连接source和operator
4. 增加sink
5. 增加一条边,连接operator和sink

代码

 // Step 1: Create basic configurations
        Configuration configuration = new Configuration();
        ExecutionConfig executionConfig = new ExecutionConfig();
        CheckpointConfig checkpointConfig = new CheckpointConfig();
        SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();

        // Step 2: Create a new StreamGraph instance
        StreamGraph streamGraph = new StreamGraph(configuration, executionConfig, checkpointConfig, savepointRestoreSettings);

        // Step 3: Add a source operator

        GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
        DataGeneratorSource<String> source = new DataGeneratorSource<>(generatorFunction, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.STRING);
        SourceOperatorFactory<String> sourceOperatorFactory = new SourceOperatorFactory<>(source, WatermarkStrategy.noWatermarks());
        streamGraph.addSource(1, "sourceNode", "sourceDescription", sourceOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sourceSlot");

        // Step 4: Add a map operator to transform the data
        StreamMap<String, String> mapOperator = new StreamMap<>(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return value;
            }
        });
        SimpleOperatorFactory<String> mapOperatorFactory = SimpleOperatorFactory.of(mapOperator);
        streamGraph.addOperator(2, "mapNode", "mapDescription", mapOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "mapSlot");

        // Step 5: Connect source and map operator
        streamGraph.addEdge(1, 2, 0);

        // Step 6: Add a sink operator to consume the data
        StreamMap<String, String> sinkOperator = new StreamMap<>(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                System.out.println(value);
                return value;
            }
        });
        SimpleOperatorFactory<String> sinkOperatorFactory = SimpleOperatorFactory.of(sinkOperator);
        streamGraph.addSink(3, "sinkNode", "sinkDescription", sinkOperatorFactory, TypeInformation.of(String.class), TypeInformation.of(String.class), "sinkSlot");

        // Step 7: Connect map and sink operator
        streamGraph.addEdge(2, 3, 0);
        streamGraph.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        streamGraph.setMaxParallelism(1,1);
        streamGraph.setMaxParallelism(2,1);
        streamGraph.setMaxParallelism(3,1);
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);


        // Step 8: Convert StreamGraph to JobGraph
        JobGraph jobGraph = streamGraph.getJobGraph();


        // Step 9: Set up a MiniCluster for local execution
        MiniClusterConfiguration miniClusterConfig = new MiniClusterConfiguration.Builder()
                .setNumTaskManagers(10)
                .setNumSlotsPerTaskManager(10)
                .build();
        MiniCluster miniCluster = new MiniCluster(miniClusterConfig);

        // Step 10: Start the MiniCluster
        miniCluster.start();

        // Step 11: Submit the job to the MiniCluster
        JobExecutionResult result = miniCluster.executeJobBlocking(jobGraph);
        System.out.println("Job completed with result: " + result);

        // Step 12: Stop the MiniCluster
        miniCluster.close();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2244153.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Misc_01转二维码(不是二进制)

例题ctfhub/隐写v2.0 打开是一张图片 文件分离得到zip&#xff0c;爆破密码得到7878 打开得到0和1&#xff0c; !!!不是二进制转图片&#xff0c;直接是二维码 缩小能看到 000000000000000000000000000000000000000000000000000000000000000000000 000000000000000000000000…

AI工具百宝箱|任意选择与Chatgpt、gemini、Claude等主流模型聊天的Anychat,等你来体验!

文章推荐 AI工具百宝箱&#xff5c;使用Deep Live Cam&#xff0c;上传一张照片就可以实现实时视频换脸...简直太逆天&#xff01; Anychat 这是一款可以与任何模型聊天 &#xff08;chatgpt、gemini、perplexity、claude、metal llama、grok 等&#xff09;的应用。 在页面…

[论文阅读] 异常检测综述 Deep Learning for Anomaly Detection: A Review(一)

深度学习在异常检测中的应用&#xff1a;综述 摘要 异常检测&#xff0c;又称离群点检测或新奇性检测&#xff0c;在各个研究领域中数十年来一直是一个持续且活跃的研究领域。仍然存在一些独特的问题复杂性和挑战&#xff0c;需要先进的方法来解决。近年来&#xff0c;基于深…

PaddlePaddle 开源产业级文档印章识别PaddleX-Pipeline “seal_recognition”模型 开箱即用篇(一)

AI时代到来&#xff0c;各行各业都在追求细分领域垂直类深度学习模型&#xff0c;今天给大家介绍一个PaddlePaddle旗下&#xff0c;基于PaddleX Pipeline 来完成印章识别的模型“seal_recognition”。 官方地址&#xff1a;https://github.com/PaddlePaddle/PaddleX/blob/relea…

Ubuntu 22.04 上快速搭建 Samba 文件共享服务器

Samba 简介 Samba 是一个开源软件&#xff0c;它扮演着不同操作系统间沟通的桥梁。通过实现 SMB&#xff08;Server Message Block&#xff09;协议&#xff0c;Samba 让文件和打印服务在 Windows、Linux 和 macOS 之间自由流动。 以下是 Samba 的特点&#xff1a; 跨平台兼…

语义分割(semantic segmentation)

语义分割(semantic segmentation) 文章目录 语义分割(semantic segmentation)图像分割和实例分割代码实现 语义分割指将图片中的每个像素分类到对应的类别&#xff0c;语义区域的标注和预测是 像素级的&#xff0c;语义分割标注的像素级的边界框显然更加精细。应用&#xff1a…

QT基础 UI编辑器 QT5.12.3环境 C++环境

一、UI编辑器 注意&#xff1a;创建工程时&#xff0c;要勾上界面按钮 UI设计师界面的模块 UI编辑器会在项目构建目录中自动生成一个ui_xxx.h&#xff08;构建一次才能生成代码&#xff09;&#xff0c;来表示ui编辑器界面的代码&#xff0c;属于自动生成的&#xff0c;一定不…

Jmeter的后置处理器(二)

5--JSR223 PostProcessor 功能特点 自定义后处理逻辑&#xff1a;使用脚本语言编写自定义的后处理逻辑。支持多种脚本语言&#xff1a;支持 Groovy、JavaScript、BeanShell 等脚本语言。动态参数传递&#xff1a;将提取的数据存储为变量&#xff0c;供后续请求使用。灵活性高…

高阶C语言之五:(数据)文件

目录 文件名 文件类型 文件指针 文件的打开和关闭 文件打开模式 文件操作函数&#xff08;顺序&#xff09; 0、“流” 1、字符输出函数fputc 2、字符输入函数fgetc 3、字符串输出函数fputs 4、 字符串输入函数fgets 5、格式化输入函数fscanf 6、格式化输出函数fpr…

【Android、IOS、Flutter、鸿蒙、ReactNative 】实现 MVP 架构

Android Studio 版本 Android Java MVP 模式 参考 模型层 model public class User {private String email;private String password;public User(String email, String password) {this.email = email;this.password = password;}public String getEmail() {return email;}…

如何管理服务中的 “昂贵” 资源

如果接触过实际大型业务系统&#xff0c;就能体会到许多业务的正常运行都依赖于各种昂贵的第三方接口&#xff0c;调用一次都是要花元子的&#xff0c;例如 大语言模型nlp 服务&#xff1a;信息提取、分类等cv 服务&#xff1a;定位、信息提取、分类等 然而经常可能由于各种无…

蓝桥杯每日真题 - 第16天

题目&#xff1a;&#xff08;卡牌&#xff09; 题目描述&#xff08;13届 C&C B组C题&#xff09; 解题思路&#xff1a; 题目分析&#xff1a; 有 n 种卡牌&#xff0c;每种卡牌的现有数量为 a[i]&#xff0c;所需的最大数量为 b[i]&#xff0c;还有 m 张空白卡牌。 每…

CSS遮罩:mask

CSS属性 mask 允许使用者通过遮罩或者裁切特定区域的图片的方式来隐藏一个元素的部分或者全部可见区域。 // 一般用位图图片做遮罩 mask: url(~/assets/images/mask.png); mask-size: 100% 100%;// 使用 SVG 图形中的形状来做遮罩 mask: url(~/assets/images/mask.svg#star);…

Zabbix:使用CentOS 9,基于LNMP平台,源码部署Zabbix 7。

ZBX&#xff1a;源码部署Zabbix 7 一、Zabbix概述1. 什么是zabbix2. 为什么学习zabbix3. 逻辑架构3. 实验环境4. 软件下载&#xff1a; 二、安装前的系统准备工作1. 配置主机名2. 关闭防火墙3. 关闭selinux4. 配置yum源5. 配置时钟同步6. 优化系统限制7. 安装JDK 三、部署LNMP环…

5G与4G互通的桥梁:N26接口

5G的商用部署进程将是一个基于4G系统进行的长期的替换、升级、迭代的过程&#xff0c;4G系统是在过渡到5G全覆盖过程中&#xff0c;作为保障用户业务连续性体验这一目的的最好补充。 因此4G/5G融合组网&#xff0c;以及互操作技术将是各大运营商在网络演进中需要重点考虑的问题…

【计算机网络实验】之静态路由配置

【计算机网络实验】之静态路由配置 实验题目实验目的实验任务实验设备实验环境实验步骤路由器配置设置静态路由测试路由器之间的连通性配置主机PC的IP测试 实验题目 静态路由协议的配置 实验目的 熟悉路由器工作原理和机制&#xff1b;巩固静态路由理论&#xff1b;设计简单…

【专题】2024AIGC创新应用洞察报告汇总PDF洞察(附原数据表)

原文链接&#xff1a;https://tecdat.cn/?p38310 在科技日新月异的今天&#xff0c;人工智能领域正以前所未有的速度发展&#xff0c;AIGC&#xff08;人工智能生成内容&#xff09;成为其中最耀眼的明珠。从其应用场景的不断拓展&#xff0c;到对各行业的深刻变革&#xff0…

微知-动态链接库导出的三种方式?(LD_LIBRARY_PATH, /etc/ld.so.conf, -Wl,-rpath)

背景 经常需要导出动态库&#xff0c;最场景的方式是指定LD_LIBRARY_PATH。本文介绍3中 LD_LIBRARY_PATH 这种方式临时生效 export LD_LIBRARY_PATH/path/to/mylibdir:$LD_LIBRARY_PATH使用ldconfig和/etc/ld.so.conf 在配置文件 /etc/ld.so.conf 中指定动态库搜索路径。每…

Jenkins更换主题颜色+登录页面LOGO图片

默认主题和logo图片展示 默认主题黑色和白色。 默认LOGO图片 安装插件 Login ThemeMaterial Theme 系统管理–>插件管理–>Available plugins 搜不到Login Theme是因为我提前装好了 没有外网的可以参考这篇离线安装插件 验证插件并修改主题颜色 系统管理–>A…