Flink_DataStreamAPI_执行环境

news2025/1/22 12:47:12

DataStreamAPI_执行环境

  • 1创建执行环境
    • 1.1getExecutionEnvironment
    • 1.2createLocalEnvironment
    • 1.3createRemoteEnvironment
  • 2执行模式(Execution Mode)
  • 3触发程序执行

Flink程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交到远程集群上运行。不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前Flink的运行环境,从而建立起与Flink框架之间的联系。

在这里插入图片描述

1创建执行环境

我们要获取的执行环境,是StreamExecutionEnvironment类的对象,这是所有Flink程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种。

1.1getExecutionEnvironment

最简单的方式,就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。这种方式,用起来简单高效,是最常用的一种创建执行环境的方式。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

1.2createLocalEnvironment

这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

1.3createRemoteEnvironment

这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。

StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
  		.createRemoteEnvironment(
    		"host",                   // JobManager主机名
    		1234,                     // JobManager进程端口号
   			"path/to/jarFile.jar"  // 提交给JobManager的JAR包
		);

2执行模式(Execution Mode)

从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API。

// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream API执行模式包括:流执行模式、批执行模式和自动模式。

流执行模式(Streaming)
这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。

批执行模式(Batch)
专门用于批处理的执行模式。

自动模式(AutoMatic)
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
批执行模式的使用。主要有两种方式:
(1)通过命令行配置
在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。

bin/flink run -Dexecution.runtime-mode=BATCH ...

(2)通过代码配置
在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。
实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

3触发程序执行

需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。
所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。

env.execute();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2242777.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Cyberchef配合Wireshark提取并解析HTTP/TLS流量数据包中的文件

本文将介绍一种手动的轻量级的方式,还原HTTP/TLS协议中传输的文件,为流量数据包中的文件分析提供帮助。 如果捕获的数据包中存在非文本类文件,例如png,jpg等图片文件,或者word,Excel等office文件异或是其他类型的二进…

Golang云原生项目:—实现ping操作

熟悉报文结构 ICMP校验和算法: 报文内容,相邻两个字节拼接到一起组成一个16bit数,将这些数累加求和若长度为奇数,则将剩余一个字节,也累加求和得出总和之后,将和值的高16位与低16位不断求和,直…

基于STM32 HAL库的FFT计算与数学运算:幅值、频率、均方根、平均值、最大值、最小值、峰峰值与标准差

一、用STM32进行FFT计算与数学运算的过程 1. 信号采集 首先,我们需要使用STM32的ADC模块来采集模拟信号,比如三相交流电。ADC将模拟信号(如电压或电流)转换为数字信号,供后续处理。 采样数量:FFT的计算通…

关于Github报错Verify your two-factor authentication (2FA) settings的解决方案

如果我们在使用GitHub出现2FA验证问题:Verify your two-factor authentication (2FA) settings,那么可以参考下面的解决方法解决问题。 当然,如果有国外的手机号直接使用验证码接收就可以,问题是不支持中国手机啊。那么怎么办呢&…

【机器学习chp2】贝叶斯最优分类器、概率密度函数的参数估计、朴素贝叶斯分类器、高斯判别分析。万字超详细分析总结与思考

前言,请先看。 本文的《一》《二》属于两个单独的知识点:共轭先验和Laplace平滑,主要因为他们在本文的后续部分经常使用,又因为他们是本人的知识盲点,所以先对这两个知识进行了分析,后续内容按照标题中的顺…

游戏引擎学习第16天

视频参考:https://www.bilibili.com/video/BV1mEUCY8EiC/ 这些字幕讨论了编译器警告的概念以及如何在编译过程中启用和处理警告。以下是字幕的内容摘要: 警告的定义:警告是编译器用来告诉你某些地方可能存在问题,尽管编译器不强制要求你修复…

01.防火墙概述

防火墙概述 防火墙概述1. 防火墙的分类2. Linux 防火墙的基本认识3. netfilter 中五个勾子函数和报文流向 防火墙概述 防火墙( FireWall ):隔离功能,工作在网络或主机边缘,对进出网络或主机的数据包基于一定的 规则检…

express 从0-1如何创建一个项目 注册接口

内容参考: windos下安装mysql express 使用mysql 一、创建一个空项目 二、创建一个包管理工具 npm init -y三、安装需要的插件及app.js的部分实现 npm i express 安装express 框架 npm i cors 安装cors 用于跨域 npm install mysql2 安装mysql数据库 npm i b…

Shell基础(4)

声明! 学习视频来自B站up主 **泷羽sec** 有兴趣的师傅可以关注一下,如涉及侵权马上删除文章,笔记只是方便各位师傅的学习和探讨,文章所提到的网站以及内容,只做学习交流,其他均与本人以及泷羽sec团…

(长期更新)《零基础入门 ArcGIS(ArcMap) 》实验一(下)----空间数据的编辑与处理(超超超详细!!!)

续上篇博客(长期更新)《零基础入门 ArcGIS(ArcMap) 》实验一(上)----空间数据的编辑与处理(超超超详细!!!)-CSDN博客 继续更新 本篇博客内容为道路拓扑检查与修正&#x…

Python防检测之鼠标移动轨迹算法

一.简介 鼠标轨迹算法是一种模拟人类鼠标操作的程序,它能够模拟出自然而真实的鼠标移动路径。 鼠标轨迹算法的底层实现采用C/C语言,原因在于C/C提供了高性能的执行能力和直接访问操作系统底层资源的能力。 鼠标轨迹算法具有以下优势: 模拟…

3D编辑器教程:如何实现3D模型多材质定制效果?

想要实现下图这样的产品DIY定制效果,该如何实现? 可以使用51建模网线上3D编辑器的材质替换功能,为产品3D模型每个部位添加多套材质贴图,从而让3D模型在展示时实现DIY定制效果。 具体操作流程如下: 第1步:上…

Qt按钮类-->day09

按钮基类 QAbstractButton 标题与图标 // 参数text的内容显示到按钮上 void QAbstractButton::setText(const QString &text); // 得到按钮上显示的文本内容, 函数的返回就是 QString QAbstractButton::text() const;// 得到按钮设置的图标 QIcon icon() const; // 给按钮…

Cellebrite VS IOS18Rebooting

Cellebrite VS IOS18Rebooting我们想分享一些有关 iOS 18 重启“功能”的信息。在过去一周左右的时间里,人们对 iOS 18 中一项新的未记录功能产生了极大关注,该功能会导致设备在一段时间不活动后重新启动。 这意味着,如果设备在一定时间不活…

【Linux】:进程信号(详谈信号捕捉 OS 运行)

✨ 来去都是自由风,该相逢的人总会相逢 🌏 📃个人主页:island1314 🔥个人专栏:Linux—登神长阶 ⛺️ 欢迎关注:👍点赞…

视觉SLAM相机——单目相机、双目相机、深度相机

一、单目相机 只使用一个摄像头进行SLAM的做法称为单目SLAM,这种传感器的结构特别简单,成本特别低,单目相机的数据:照片。照片本质上是拍摄某个场景在相机的成像平面上留下的一个投影。它以二维的形式记录了三维的世界。这个过程中…

MongoDB在现代Web开发中的应用

💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 MongoDB在现代Web开发中的应用 MongoDB在现代Web开发中的应用 MongoDB在现代Web开发中的应用 引言 MongoDB 概述 定义与原理 发展…

OceanBase 分区表详解

1、分区表的定义 在OceanBase数据库中,普通的表数据可以根据预设的规则被分割并存储到不同的数据区块中,同一区块的数据是在一个物理存储上。这样被分区块的表被称为分区表,而其中的每一个独立的数据区块则被称为一个分区。 如下图所示&…

Linux(CentOS 7) yum一键安装mysql8

1、通过yum安装 (1)下载mysql 在Linux找个地方输入以下命令 wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm (2)安装mysql yum 仓库配置文件 [rootVM-8-15-centos ~]# sudo rpm -Uvh mysql80-c…

记一次预览USB摄像头并获取实时回调数据的过程(UVCAndroid集成)

背景 主工程是gradle4.8 jdk1.8 启用jetifier要接入的usb摄像头的库是UVCAndroid gradle8.7 jdk17 接入过程 看了下setCallbackActivity非常适合我们的需求,而且回调后的数据是RGB888,看到demo中用到了xml若干于是想到用aar打包,整个过程也…