Flink 本地启动的多种方式

news2024/9/23 20:35:44

Flink 本地启动的多种方式

在这里插入图片描述

Application模式通过代码提交到Yarn上启动

//设置Yarn客户端
YarnClient yarnClient = ;
Configuration configuration = new Configuration();
if (customConfiguration != null) {
  configuration.addAll(customConfiguration);
}
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m"));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m"));
configuration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
// 设置flink-dist-???.jar
String distPath = ;
configuration.set(YarnConfigOptions.FLINK_DIST_JAR, distPath);
// 设置需要执行的jar包
String examplePath = ;
configuration.set(PipelineOptions.JARS, Collections.singletonList(examplePath));
FileSystem fileSystem = FileSystem.get(hadoopClusterTest.getConfig());
//设置flink lib
String dirPath = ;
// 上传flink libjar包到hdfs中
fileSystem.copyFromLocalFile(new Path(dirPath), new Path(dirPath));
configuration.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(dirPath));
setIfAbsent(configuration, PipelineOptions.JARS, new ArrayList<>());
YarnConfiguration yarnConfiguration = new YarnConfiguration();
YarnClientYarnClusterInformationRetriever yarnClientYarnClusterInformationRetriever =
  YarnClientYarnClusterInformationRetriever.create(yarnClient);
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
  configuration,
  yarnConfiguration,
  yarnClient,
  yarnClientYarnClusterInformationRetriever,
  true
);
ClusterSpecification clusterSpecification = new ClusterSpecification
  .ClusterSpecificationBuilder()
  .setSlotsPerTaskManager(1)
  .createClusterSpecification();

ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(
  new String[]{},
  // 需要执行的类全名
);
try {
  // 启动ApplicationCluster
  yarnClusterDescriptor.deployApplicationCluster(clusterSpecification, applicationConfiguration);
} catch (ClusterDeploymentException e) {
  e.printStackTrace();
}

Session模式通过代码提交到Yarn上启动

public class YarnFlinkSessionTest {
    ClusterClient<ApplicationId> clusterClient;
    @Test
    void test() throws ExecutionException, InterruptedException {
        YarnClient yarnClient = //创建Yarn客户端
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY,
            MemorySize.parse("1024m"));
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY,
            MemorySize.parse("1024m"));
        configuration.set(YarnConfigOptions.FLINK_DIST_JAR, "${FLINK_HOME}/lib/flink-dist-1.16.2.jar");
        YarnConfiguration yarnConfiguration = new YarnConfiguration();
        YarnClientYarnClusterInformationRetriever yarnClientYarnClusterInformationRetriever =
            YarnClientYarnClusterInformationRetriever.create(yarnClient);
        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
            configuration,
            yarnConfiguration,
            yarnClient,
            yarnClientYarnClusterInformationRetriever,
            true
        );
        ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(1024)
            .setTaskManagerMemoryMB(1024)
            .setSlotsPerTaskManager(1)
            .createClusterSpecification();
        try {
            ClusterClientProvider<ApplicationId> applicationIdClusterClientProvider = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
            clusterClient = applicationIdClusterClientProvider.getClusterClient();
        } catch (ClusterDeploymentException e) {
            e.printStackTrace();
        }
        Thread.sleep(10000000);
    }
}

Flink MiniCluster 提交任务

MiniCluster在start方法中启动QueryService、RPCService、Zookeeper、BlobServer、TaskManager、DispatcherLeader、ResourceManager、DispatcherGateway、WebMonitor进行RPC通信。。

MiniCluster启动后再调用submitJob提交任务

RpcTaskManagerGateway、TaskExecutor

命令行Flink本地Standalone模式启动

运行任务:

./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

  1. 该命令会调用CliFrontend.main()方法

  2. CliFrontend.main()方法再调用内部run()方法,然后调用内部executeProgram()方法

  3. 最后CliFrontend.executeProgram()调用ClientUtils.executeProgram()方法.

  4. 最后通过StandloneSessionClusterEntrypoint的main方法启动Flink

RestServerEndpoint在执行start()方法时注册Netty的ChannelHandler,可以通过WebMonitorEndpoint查看具体的Handler类型和实现。

JobManager::onStart -> JobMaster::startJobExecution

官方文档命令行启动

yarn: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/resource-providers/yarn/

kubernetes: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/resource-providers/native_kubernetes/

standalone: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/resource-providers/standalone/overview/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2158402.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux——虚拟机网络配置

进行虚拟机网络配置是确保虚拟机能够正常访问网络、与宿主机及其他设备进行通信的关键步骤。虚拟机网络配置允许用户根据实际需求选择合适的网络模式&#xff0c;并调整网络参数以满足特定的网络环境要求。 虚拟机常见的三种网络模式包括桥接模式、NAT模式和主机模式&#xff…

Shiro rememberMe反序列化漏洞(Shiro-550) 靶场攻略

漏洞原理 Apache Shiro框架提供了记住密码的功能&#xff08;RememberMe&#xff09;&#xff0c;⽤户登录成功后会⽣成经过 加密并编码的cookie。在服务端对rememberMe的cookie值&#xff0c;先base64解码然后AES解密再反 序列化&#xff0c;就导致了反序列化RCE漏洞。 那么&a…

制作网上3D展馆需要什么技术并投入多少费用?

制作网上3D展览馆项目&#xff0c;需要考虑以下技术和预算方面的信息&#xff1a; 技术需求&#xff1a; 1、三维建模技术&#xff1a;利用3D软件&#xff08;3ds max、maya、blender、c4d等&#xff09;制作展馆和展品的3D模型 2、Web3D技术&#xff1a;如WebGL&#xff0c…

飞腾平台perf工具PMU事件集成指南

【写在前面】 飞腾开发者平台是基于飞腾自身强大的技术基础和开放能力&#xff0c;聚合行业内优秀资源而打造的。该平台覆盖了操作系统、算法、数据库、安全、平台工具、虚拟化、存储、网络、固件等多个前沿技术领域&#xff0c;包含了应用使能套件、软件仓库、软件支持、软件适…

VMware安装ubuntu24.04桌面版

一、安装推荐要求 双核2 GHz处理器或更高 4 GB系统内存 25 GB磁盘存储空间 可访问的互联网 光驱或USB安装介质 二、下载桌面系统 下载地址&#xff08;使用手机转存再下载是对作者的最大支持&#xff09;&#xff1a;夸克网盘分享 (quark.cn) 已安装的纯净版ubuntu虚拟…

CMake 构建Qt程序弹出黑色控制台

CMake 构建Qt程序弹出黑色控制台

普通查询+聚合函数的使用(8个例子,数值和字符串的比较)

目录 回顾普通查询聚合函数的使用 表数据 例子1 例子2 例子3 ​​​​​​​例子4 例子5 例子6 例子7(数值和字符串的比较) 例子8 回顾普通查询聚合函数的使用 之前我们介绍过聚合函数 --mysql分组查询 -- 聚合函数(介绍,使用),group by使用,分组聚合统计(使用,havi…

MATLAB基本语句

MATLAB语言为解释型程序设计语言。在程序中可以出现顺序、选择、循环三种基本控制结构&#xff0c;也可以出现对M-文件的调用(相当于对外部过程的调用)。 由于 MATLAB开始是用FORTRAN语言编写、后来用 C语言重写的&#xff0c;故其既有FORTRAN的特征&#xff0c;又在许多语言规…

基于JAVA+SpringBoot+Vue的医院资源管理系统

基于JAVASpringBootVue的医院资源管理系统 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末附源码下载链接&#x1f345; 哈…

基于SpringBoot的医院管理系统【附源码】

基于SpringBoot的医院管理系统&#xff08;源码L文说明文档&#xff09; 目录 4 系统设计 4.1 系统概述 4系统概要设计 4.1概述 4.2系统结构 4.3.数据库设计 4.3.1数据库实体 4.3.2数据库设计表 5系统详细实现 5.1 医生模块的实现 5.1.…

音视频入门基础:FLV专题(3)——FLV header简介

一、引言 本文对FLV格式的FLV header进行简介&#xff0c;FLV文件的开头就是FLV header。 进行简介之前&#xff0c;请各位先从《音视频入门基础&#xff1a;FLV专题&#xff08;1&#xff09;——FLV官方文档下载》下载FLV的官方文档《video_file_format_spec_v10_1.pdf》和…

猫咪掉毛背后的隐秘原因?除毛除臭宠物空气净化器双管齐下!

作为一个二胎家庭&#xff0c;两只猫咪&#xff0c;除了卖萌加倍之外&#xff0c;拉屎需要排队之外&#xff0c;家里最不缺就是毛了。作为一个名鼻炎患者真的很难顶。感受一下40度高温的养猫人&#xff0c;给掉毛怪疏毛浮毛飘飘&#xff0c;逃不过的饮水机&#xff0c;各个角落…

wpf,工具栏上,最小化按钮的实现

工具栏上&#xff0c;最小化按钮的实现。工具栏做成的是用户控件。 用户控件的xaml <Button HorizontalAlignment"Right" Height"32" Click"MinimizeClick" /> 用户控件的cs代码 private void MinimizeClick(object sender, RoutedEven…

IDEA创建Web项目(详细版)

目录 1 新建Web项目 步骤如下 1 打开idea,选择新建项目 2 点击创建 3 点击项目结构&#xff0c;选择添加模块 ---web 2 配置Tomcat 步骤如下 1 点击Edit Configurations&#xff08;编辑配置&#xff09; 1.1 右上角当前文件下 选择编辑配置 1.2 点击菜单栏中run 选…

奔驰「进退」两难

合资车企&#xff0c;尤其是BBA为代表的传统豪华品牌&#xff0c;正在进入阵痛期。 9月14日&#xff0c;奔驰在华合资公司—腾势新能源发生工商变更&#xff0c;比亚迪受让前者剩余10%股份&#xff0c;并变更为旗下全资子公司。至此&#xff0c;这个由奔驰和比亚迪在2011年成立…

Python 从入门到实战25(模块)

我们的目标是&#xff1a;通过这一套资料学习下来&#xff0c;通过熟练掌握python基础&#xff0c;然后结合经典实例、实践相结合&#xff0c;使我们完全掌握python&#xff0c;并做到独立完成项目开发的能力。 上篇文章我们讨论了类继承的相关知识。今天我们将学习一下模块的…

Linux 进程间通信(管道)

目录 一.理解进程间通信 1.进程间通信的意义 2.进程间如何实现通信呢&#xff1f; 二.匿名管道 1.匿名管道的底层原理 引用计数的应用 2.匿名管道代码实现 a.代码的整体框架 b.写接口 c.读接口 d.子进程资源回收 3.匿名管道的官方接口 4.*匿名管道四种情况和五种特…

Leetcode 2246. 相邻字符不同的最长路径(一般树)树形dp C++实现

问题&#xff1a;Leetcode 2246. 相邻字符不同的最长路径 给你一棵 树&#xff08;即一个连通、无向、无环图&#xff09;&#xff0c;根节点是节点 0 &#xff0c;这棵树由编号从 0 到 n - 1 的 n 个节点组成。用下标从 0 开始、长度为 n 的数组 parent 来表示这棵树&#x…

数据结构——顺序表、链表

目录 前言 一&#xff0c;数据结构 1&#xff0c;什么是数据结构&#xff1f; 2&#xff0c;有什么类型&#xff1f; 二&#xff0c;顺序表 1&#xff0c;线性表 2&#xff0c;顺序表基本结构 3&#xff0c;动态顺序表的功能实现 三&#xff0c;链表 1&#xff0c;链…

Go 1.19.4 路径和目录-Day 15

1. 路径介绍 存储设备保存着数据&#xff0c;但是得有一种方便的模式让用户可以定位资源位置&#xff0c;操作系统采用一种路径字符 串的表达方式&#xff0c;这是一棵倒置的层级目录树&#xff0c;从根开始。 相对路径&#xff1a;不是以根目录开始的路径&#xff0c;例如 a/b…