【API篇】一、执行环境API

news2025/4/26 4:36:31

文章目录

  • 0、认识
  • 1、创建执行环境
  • 2、执行模式
  • 3、触发程序执行
  • 4、关于executeAsync方法

0、认识

DataStream API是Flink的核心层API。一个Flink程序,其实就是对数据源DataStream的各种转换。具体来说,代码基本上都由以下几部分构成:

在这里插入图片描述

后面章节,分别对每一个环节的API做整理。

1、创建执行环境

Flink的程序可以在各种上下文环境中运行,比如本地JVM中执行,也可以提交到远程集群中运行,分别对应着不同的Flink的运行环境,获取这个执行环境,也就是StreamExecutionEnvironment类的对象。

获取方式一:

StreamExecutionEnvironment.getExecutionEnvironment()

调用静态方法getExecutionEnvironment,会根据当前运行的上下文直接得到正确的结果:

  • 如果程序是独立运行的,就返回一个本地执行环境
  • 如果是命令行+jar包提交到集群执行,就返回集群的执行环境

重点:这个静态方法根据当前运行方式,自行决定并返回一个适配的运行环境。 getExecutionEnvironment方法还可以传一个flink包下的Confiruration对象,用于改一些默认的配置,比如端口8081

Configuration conf = new Configuration();

conf.set(RestOptions.BIND_PORT, "8082");

此时,控制台就该访问localhost:8082

获取方式二:

StreamExecutionEnvironment.createLocalEnvironment()

返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。

//并行度为3
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(3);  

获取方式三:

这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host",1234,"path/to/jarFile.jar");
// 参数一:JobManager主机名
// 参数二:JobManager进程端口号
// 参数三:提交给JobManager的JAR包

最后,不管用哪个方法,拿到执行环境对象后,还可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制等

env.setParallelism(2);

2、执行模式

从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理,不再使用DataSet API。

// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream API执行模式包括:

  • 流执行模式(Streaming):用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式
  • 批执行模式(Batch):专门用于批处理的执行模式
  • 自动模式(AutoMatic):由程序根据输入数据源是否有界,来自动选择是流处理还是批处理执行

执行模式选择,可以通过命令行方式配置:

//BATCH
bin/flink run -Dexecution.runtime-mode=BATCH ...

也可以通过代码配置:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

前者用的更多,不推荐硬编码。

3、触发程序执行

写完输出(sink)操作并不代表程序已经结束,因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中,这时并没有真正处理数据,因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为延迟执行懒执行

env.execute();

所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后才返回一个执行结果(JobExecutionResult)

4、关于executeAsync方法

  • 正常情况下,一个execute方法执行,触发一个Flink Job。

  • 一个main方法里也可以调用多个execute,但没意义,因为execute()方法将一直等待作业完成,肯定阻塞后面的

  • env.executeAsync方法,可以异步触发,不会阻塞后面的任务

public static void main(String[] args){

	...
	executeAsync();   //触发第一个Job
	
	//job2...(一般不这么写)
	
	....
	executeAsync();  //第二个Job产生
}


  • 一个main方法里executeAsync方法的调用次数,等于生成的flink job的个数
  • 同样的,再Yarn-Application集群,提交一次,当调用n次executeAsync,就有n个Job,对应在JobManager里,就有n个JobMaster

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1086969.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

早安心语|不委屈不将就,让生活充满仪式感

1、让自己的生活多一种可能,给自己的未来多一份惊喜,人生所有的机会和惊喜,都是在你全力以赴的道路上遇到的。 2、推开自己喜欢的人叫成长,留住自己喜欢的人叫本事,总有人嫌你不够好,也有人觉得你哪都好&am…

【RKNN】YOLO V5中pytorch2onnx,pytorch和onnx模型输出不一致,精度降低

在yolo v5训练的模型,转onnx,再转rknn后,测试发现: rknn模型,量化与非量化,相较于pytorch模型,测试精度都有降低onnx模型,相较于pytorch模型,测试精度也有降低&#xff…

The Foundry Nuke 15视频后期合成和特效制作Mac软件

Nuke 15 是一款专业的合成软件,主要用于电影、电视和广告制作中的后期合成和特效制作。 Nuke 15 提供了强大的合成工具和功能,可以对多个图像、视频和3D元素进行无缝融合和合成。它支持多通道图像处理,能够处理高动态范围(HDR&…

算法通关村第18关【青铜】| 回溯

回溯算法是一种解决组合优化问题和搜索问题的算法。它通过尝试各种可能的选择来找到问题的解决方案。回溯算法通常用于问题的解空间非常大,而传统的穷举法会导致计算时间爆炸的情况。回溯算法可以帮助限制搜索空间,以提高效率。 回溯算法的核心思想是在…

ARM作业2

.设置按键中断,按键1按下,LED亮,再按一次,灭 按键2按下,蜂鸣器响。再按一次,不响 按键3按下,风扇转,再按一次,风扇停 头文件key_it.h #ifndef __KEY_IT_H__ #define …

没有炫光的台灯有哪些?2023五款优秀护眼台灯

很多家长有时候会说孩子觉得家里的台灯灯光刺眼,看书看久了就不舒服。这不仅要看光线亮度是否柔和,还要考虑台灯是不是有做遮光式设计。没有遮光式设计的台灯,光源外露,灯光会直射孩子头部,孩子视线较低,很…

【小米技术分享】面试题:什么是乐观锁?你是如何设计一个乐观锁

大家好,我是小米。今天我们来聊一下面试中常见的一个问题:“什么是乐观锁?你是如何设计一个乐观锁?”作为一位热爱技术的程序员,对于这个问题,我有着自己独特的理解和实践经验。接下来,我将以通…

flink1.15 savepoint 超时报错 java.util.concurrent.TimeoutException

savepoint命令 flink savepoint e04813d4e7480c526912eb4d32bba510 hdfs://flink/flink/migration/savepoint56650 -Dyarn.application.id=application_1683808492336_1222报错内容 org.apache.flink.util.FlinkException: Triggering a savepoint for the job e04813d4e7480…

通讯网关软件023——利用CommGate X2HTTP实现HTTP访问Modbus TCP

本文介绍利用CommGate X2HTTP实现HTTP访问Modbus TCP。CommGate X2HTTP是宁波科安网信开发的网关软件,软件可以登录到网信智汇(http://wangxinzhihui.com)下载。 【案例】如下图所示,SCADA系统上位机、PLC、设备具备Modbus RTU通讯接口,现在…

【计算机网络】——前言计算机网络发展的历程概述

主页点击直达:个人主页 我的小仓库:代码仓库 C语言偷着笑:C语言专栏 数据结构挨打小记:初阶数据结构专栏 Linux被操作记:Linux专栏 LeetCode刷题掉发记:LeetCode刷题 算法:算法专栏 C头…

Python编程必备:掌握列表遍历的6种神级技巧!

更多资料获取 📚 个人网站:涛哥聊Python 遍历列表是Python中最常见的任务之一,因为列表是一种非常常用的数据结构,它用于存储一组项目。 在编程中,经常需要对这些项目进行操作,例如查找特定元素&#xff…

【LeetCode】2.两数相加

目录 1 题目2 答案2.1 我写的(不对)2.2 更正 3 问题 1 题目 给你两个 非空 的链表,表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的,并且每个节点只能存储 一位 数字。 请你将两个数相加,并以相同形式返…

如何创建自定义前端组件?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 欢迎来到前端入门之旅!感兴趣的可以订阅本专栏哦!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

淘宝历史价格数据接口,淘宝商品历史价格接口,淘宝API接口

淘宝历史价格数据接口采集方法如下: 找到适当的淘宝API接口,该接口需要一个淘宝客的授权码才能使用。使用一个开发工具,比如Python编程语言和Requests库来调用该API接口并获取数据。在调用API时需要指定开始时间和结束时间来获取特定商品在时…

快速搭建Linux网站,并利用内网穿透实现宝塔面板的公网访问【内网穿透】

文章目录 前言1. 环境安装2. 安装cpolar内网穿透3. 内网穿透4. 固定http地址5. 配置二级子域名6. 创建一个测试页面 前言 宝塔面板作为简单好用的服务器运维管理面板,它支持Linux/Windows系统,我们可用它来一键配置LAMP/LNMP环境、网站、数据库、FTP等&…

DataGridView-----datagridviewcomboboxcolumn

今天项目中需要实现下拉框展示数据。同时我们需要进行对应的数据绑定和保存。以下是常见的两种方式。 1. 绑定数据库数据源 2. 直接输入的list集合。 3. 出现的问题 问题:使用datagridviewcomboboxcolumn时,默认第一次以及将鼠标点击到当前行时&#…

ipad有必要用手写笔吗?性价比电容笔排行榜

随着技术的进步,各种新型的数字电子产品不断涌现。比如说,智能手机、ipad、电容笔之类的东西。但事实上,要将iPad的功能发挥到极致,我认为,这款电容笔,就必不可少的了。这就好像我们在ipad平板上书写东西&a…

QT运行界面与画布大小不一致问题(一步到位)

QT运行界面与画布大小不一致问题 出现的问题直接设置环境变量main函数中输入以下代码更改系统缩放比好了,看一下运行结果吧 出现的问题 当我们运行程序时,发现运行出来的大小和设计的几面大小有很大的差别,这使我们开发起来就特别的困难&…

【设计模式】七、适配器模式

文章目录 现实生活中的适配器例子基本介绍工作原理举例:类适配器模式类适配器模式介绍类适配器模式应用实例 举例:对象适配器模式基本思路对象适配器模式应用实例 举例:接口适配器模式接口适配器模式应用实例适配器模式在 SpringMVC 框架应用…

GPT-4V的图片识别和分析能力

GPT-4V是OpenAI开发的大型语言模型,是GPT-4的升级版本。GPT-4V在以下几个方面进行了改进: 模型规模更大:GPT-4V的参数量达到了1.37T,是GPT-4的10倍。训练数据更丰富:GPT-4V的训练数据包括了1.56T的文本和代码数据。算…