flink on yarn with kerberos 边缘提交

news2024/11/20 12:24:01

flink on yarn 带kerberos 远程提交 实现

  1. flink kerberos 配置
    在这里插入图片描述
  2. 先使用ugi进行一次认证
  3. 正常提交
import com.google.common.io.Files;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.cli.GenericCLI;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.MalformedURLException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.util.Preconditions.checkNotNull;


/**
* @author: jiayeli.cn
* @description
* @date: 2023/8/29 下午9:09
*/

@Slf4j
public class YarnClientTestCase {

   @Test
   public void submitJobWithYarnDesc() throws ClusterDeploymentException, IOException {
       // hadoop
       String hadoopConfDir = "/x/x/software/spark-3.3.2-bin-hadoop3/etc/hadoop";
       //flink的本地配置目录,为了得到flink的配置
       String flinkConfDir = "/opt/flink-1.14.3/conf";
       //存放flink集群相关的jar包目录
       String flinkLibs = "hdfs://node01:8020/lib/flink";
       //用户jar
       String userJarPath =  "hdfs://node01:8020/jobs/streaming/testCase/TopSpeedWindowing.jar";
       String flinkDistJar = "hdfs://node01:8020/lib/flink/flink-dist_2.12-1.14.3.jar";
       String[] args = "".split("\\s+");
       String appMainClass = "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing";
       String principal = "dev@JIAYELI.COM";
       String keyTab = "/x/x/workspace/bigdata/sparkLauncherTestcase/src/test/resource/dev_uer.keytab";

       enableKrb5(principal, keyTab);
       YarnClient yarnClient = YarnClient.createYarnClient();
       YarnConfiguration yarnConfiguration = new YarnConfiguration();
       Optional.ofNullable(hadoopConfDir)
           .map(e -> new File(e))
           .filter(dir -> dir.exists())
           .map(File::listFiles)
           .ifPresent(files -> {
               Arrays.asList(files).stream()
                       .filter(file -> Files.getFileExtension(file.getName()).equals(".xml"))
                       .forEach(conf -> yarnConfiguration.addResource(conf.getPath()));
           });

       yarnClient.init(yarnConfiguration);
       yarnClient.start();

       Configuration flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir);
       //set run model
       flinkConf.setString(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
       //set application name
       flinkConf.setString(YarnConfigOptions.APPLICATION_NAME, "onYarnApiSubmitCase");
       //flink on yarn dependency
       flinkConf.set(YarnConfigOptions.PROVIDED_LIB_DIRS, Collections.singletonList(new Path(flinkLibs).toString()));
       flinkConf.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);
       flinkConf.set(PipelineOptions.JARS, Collections.singletonList(new Path(userJarPath).toString()));
       //设置:资源/并发度
       flinkConf.setInteger(CoreOptions.DEFAULT_PARALLELISM, 1);
       flinkConf.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1G"));
       flinkConf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1G"));
       flinkConf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);


       ClusterSpecification clusterSpecification = new ClusterSpecification
               .ClusterSpecificationBuilder()
               .setMasterMemoryMB(1024)
               .setTaskManagerMemoryMB(1024)
               .setSlotsPerTaskManager(2)
               .createClusterSpecification();

       YarnClusterInformationRetriever ycir = YarnClientYarnClusterInformationRetriever.create(yarnClient);

       YarnConfiguration yarnConf = (YarnConfiguration) yarnClient.getConfig();

       ApplicationConfiguration appConfig = new ApplicationConfiguration(args, appMainClass);

       YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
               flinkConf,
               yarnConf,
               yarnClient,
               ycir,
               false);

       ClusterClientProvider<ApplicationId> applicationCluster =
               yarnClusterDescriptor.deployApplicationCluster( clusterSpecification, appConfig );

       yarnClient.stop();

   }

   private void enableKrb5(String principal, String keyTab) throws IOException {
     System.setProperty("java.security.krb5.conf", "/x/x/Documents/kerberos/krb5.conf");

       org.apache.hadoop.conf.Configuration krb5conf = new org.apache.hadoop.conf.Configuration();


       String krb5ConfPath = "/x/x/Documents/kerberos/krb5.conf";

       krb5conf.set("hadoop.security.authentication", "kerberos");

       //      UserGroupInformation.setConfiguration(conf)
       UserGroupInformation.setConfiguration(krb5conf);

       // 登录Kerberos并获取UserGroupInformation实例
       UserGroupInformation.loginUserFromKeytab(principal, keyTab);
       UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

       log.debug(ugi.toString());
   }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/945673.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Gorilla LLM:连接海量 API 的大型语言模型

如果你对这篇文章感兴趣&#xff0c;而且你想要了解更多关于AI领域的实战技巧&#xff0c;可以关注「技术狂潮AI」公众号。在这里&#xff0c;你可以看到最新最热的AIGC领域的干货文章和案例实战教程。 一、前言 在当今这个数字化时代&#xff0c;大型语言模型&#xff08;LLM…

史上最全AP、mAP详解与代码实现

文章目录 前言一、mAP原理1、mAP概念2、准确率3、精确率4、召回率5、AP: Average Precision 二、mAP0.5与mAP0.5:0.951、mAP0.52、mAP0.5:0.95 三、mAP代码实现1、真实标签json文件格式2、模型预测标签json文件格式3、mAP代码实现4、mAP结果显示 四、模型集成mAP代码1、模型mai…

项目:点餐系统3mysql知识回顾MySQL客户端

连接数据库 mysql -uroot -p 密码&#xff1a;空 一、第三方库&#xff1a;MySQL 数据库-存储并管理数据的仓库&#xff0c;是一个C/S架构 MySQL客户端通过sql来告诉MySQL服务器&#xff0c;自己需要做什么操作 1.sql语句 sql&#xff1a;structure query language结构化查询…

LeetCode 44题:通配符匹配

题目 给你一个输入字符串 (s) 和一个字符模式 (p) &#xff0c;请你实现一个支持 ? 和 * 匹配规则的通配符匹配&#xff1a; ? 可以匹配任何单个字符。* 可以匹配任意字符序列&#xff08;包括空字符序列&#xff09;。 判定匹配成功的充要条件是&#xff1a;字符模式必须…

AI识别工人安全绳佩戴检测算法

AI识别工人安全绳佩戴检测算法通过yolov5智能图像识别算法对现场图像进行处理和分析&#xff0c;AI识别工人安全绳佩戴检测算法识别出工人是否佩戴安全绳&#xff0c;一旦发现工人未佩戴安全绳&#xff0c;AI识别工人安全绳佩戴检测算法将立即进行告警&#xff0c;并将事件记录…

SOLIDWORKS中多实体文件到装配体的转换技巧

我们在做机械等工程设计中&#xff0c;有时为了节省时间&#xff0c;需要把多实体的“零件”&#xff0c;直接转换为装配体&#xff0c;不再另外装配&#xff0c;这样能大大简化设计的操作时间&#xff0c;复杂程度。 在这里&#xff0c;我们首先要了解&#xff0c;SOLIDWORKS文…

2023常见前端面试题

以下是一些2023年秋招常见的前端面试题及其答案&#xff1a; 1. 请解释一下什么是前端开发&#xff1f; 前端开发是指使用HTML、CSS和JavaScript等技术来构建网页和用户界面的过程。前端开发人员负责将设计师提供的视觉设计转化为可交互的网页&#xff0c;并确保网页在不同设备…

【活体检测模型】活体检测思路推演

ref:https://arxiv.org/pdf/1611.05431.pdf https://github.com/miraclewkf/ResNeXt-PyTorch 用分类的思想做活体检测&#xff0c;要求准确的分出正负样本&#xff0c;否则&#xff0c;支付宝被别人用了&#xff0c;问题就很严重。 大部分的商用场景还是 摇摇头、张张口&#x…

Android——基本控件(下)(二十)

1. 树型组件&#xff1a;ExpandableListView 1.1 知识点 &#xff08;1&#xff09;掌握树型组件的定义&#xff1b; &#xff08;2&#xff09;可以使用事件对树操作进行监听。 2. 具体内容 既然这个组件可以完成列表的功能&#xff0c;肯定就需要一个可以操作的数据&…

Java集合sort排序报错UnsupportedOperationException处理

文章目录 报错场景排查解决UnmodifiableList类介绍 报错场景 我们使用的是PostgreSQL数据库&#xff0c;存储业务数据&#xff0c;业务代码使用的是Spring JPA我们做的是智慧交通信控平台&#xff0c;有个功能是查询展示区域的交通态势&#xff0c;需要按照不同维度排序展示区…

pngPacker与TexturePacker打包对比,手把手教你使用pngPacker

pngPacker与TexturePacker打包对比&#xff0c;手把手教你使用pngPacker打包出媲美texturepacker的效果 pngPacker是一款免费的图片打包工具&#xff0c;软件小巧易用&#xff0c;主流游戏图片格式&#xff0c;如 bmp,jpg,png 可以打包为 png 大图&#xff0c;采用命令行格,前一…

数字乡镇综合解决方案PPT

导读&#xff1a;原文《数字乡镇综合解决方案PPT》&#xff08;获取来源见文尾&#xff09;&#xff0c;本文精选其中精华及架构部分&#xff0c;逻辑清晰、内容完整&#xff0c;为快速形成售前方案提供参考。 部分内容&#xff1a; 喜欢文章&#xff0c;您可以关注评论转发本…

网络编程 http 相关基础概念

文章目录 表单是什么http请求是什么http请求的结构和说明关于http方法 GET和POST区别http常见状态码http响应http 请求是无状态的含义html是什么 &#xff08;前端内容&#xff0c;了解即可&#xff09;html 常见标签 &#xff08;前端内容&#xff0c;了解即可&#xff09;关于…

哇塞不!赛博时代云上自动化辅导孩子学习。

背景 孩子天天上网看动画片&#xff0c;都幼儿园大班了还不会100以内的加减法&#xff0c;因为我平时还需要忙着工作不能天天陪着孩子。 这次我们整个活&#xff0c;让孩子每天心甘情愿的做100道题。而且电脑还要给孩子一些提醒&#xff0c;即使做错了也会不厌其烦的给孩子提示…

图像颜色空间转换

目录 1.图像颜色空间介绍 RGB 颜色空间 2.HSV 颜色空间 3.RGBA 颜色空间 2.图像数据类型间的互相转换convertTo() 3.不同颜色空间互相转换cvtColor() 4.Android JNI demo 1.图像颜色空间介绍 RGB 颜色空间 RGB 颜色空间是最常见的颜色表示方式之一&#xff0c;其中 R、…

【C++】—— 异常处理

前言&#xff1a; 本期&#xff0c;我将给大家讲解的是有关 异常处理 的相关知识&#xff01; 目录 &#xff08;一&#xff09;C语言传统的处理错误的方式 &#xff08;二&#xff09;C异常概念 &#xff08;三&#xff09;异常的使用 1、异常的抛出和捕获 1️⃣ 异常的…

Windows Qt 5.12.10下载与安装

Qt 入门实战教程&#xff08;目录&#xff09; C自学精简实践教程 目录(必读) 1 Qt5.12.10下载 qt-opensource-windows-x86-5.12.10.exe 官方离线安装包 Download Source Package Offline Installers | Qt 下载巨慢&#xff08;也可能很快&#xff09; 只能下载到最新的&…

C语言——类型转换

数据有不同的类型&#xff0c;不同类型数据之间进行混合运算时涉及到类型的转换问题。 转换的方法有两种&#xff1a; 自动转换(隐式转换)&#xff1a;遵循一定的规则&#xff0c;由编译系统自动完成强制类型转换&#xff1a;把表达式的运算结果强制转换成所需的数据类型 语法格…

自动化测试(三):接口自动化pytest测试框架

文章目录 1. 接口自动化的实现2. 知识要点及实践2.1 requests.post传递的参数本质2.2 pytest单元测试框架2.2.1 pytest框架简介2.2.2 pytest装饰器2.2.3 断言、allure测试报告2.2.4 接口关联、封装改进YAML动态传参&#xff08;热加载&#xff09; 2.3 pytest接口封装&#xff…

20 MySQL(下)

文章目录 视图视图是什么定义视图查看视图删除视图视图的作用 事务事务的使用 索引查询索引创建索引删除索引聚集索引和非聚集索引影响 账户管理&#xff08;了解非DBA&#xff09;授予权限 与 账户的相关操作 MySQL的主从配置 视图 视图是什么 通俗的讲&#xff0c;视图就是…