SparkLaunch提交Spark任务到Yarn集群

news2025/1/8 6:19:56

SparkLaunch提交任务

    • 1.提交Spark任务的方式
    • 2.SparkLaunch 官方接口
    • 3.任务提交流程及实战


1.提交Spark任务的方式

  1. 通过Spark-submit 提交任务
  2. 通过Yarn REST Api提交Spark任务
  3. 通过Spark Client Api 的方式提交任务
  4. 通过SparkLaunch 自带API提交任务
  5. 基于Livy的方式提交任务,可参考我的另一篇文章 Apache Livy 安装部署使用示例

上面的几种方式提交任务各自有对应的优缺点,不再进行赘述,下面要介绍的是通过SparkLaunch 的方式提交到集群中,并异步获取任务的执行状态进行更新到运行记录表中,从而实现Saprk任务的提交和状态获取。


2.SparkLaunch 官方接口

通过官方文档可以了解到SaprkLaunch 对应的方法:SparkLaunch
SparkLaunch 主要有两个接口:

  1. SparkAppHandle 主要负责Spark任务提交到集群上面
  2. SparkAppHandle.Listerner 主要是用来监控Spark的运行状态

在这里插入图片描述

可以查看SparkLaunch 类对应的方法主要用到的方法如下所示:


  //设置配置文件地址
  public SparkLauncher setPropertiesFile(String path);

  //设置App 名称
  public SparkLauncher setAppName(String appName);
  
  //设置 Master
  public SparkLauncher setMaster(String master);

  //设置 部署模式
  public SparkLauncher setDeployMode(String mode) ;

  //设置 Jar包运行主类
  public SparkLauncher setMainClass(String mainClass);

  //设置 Spark 相关参数,需要以spark. 开头
  public SparkLauncher addSparkArg(String name, String value);

  //设置 Main函数的参数
  public SparkLauncher addAppArgs(String... args);

  // 启动Saprk任务的提交
  public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) 
  。。。。。。

上面的一些方法主要用于设置Spark所需要的相关参数,比如资源相关参数、Jar包相关参数、部署模式等,调用startApplication 方法会镇长的取创建一个任务提交的实现,通过下面的两个方法就能够使得任务可以正常提交到Yarn 上面

 sparkLauncher.startApplication(new SparkAppHandle.Listener() {
 	  
 	  // 任务运行状态改变的时候触发的操作
      @Override
      public void stateChanged(SparkAppHandle handle) {

      }
      
 	  // 日志状态出现改变的时候触发的操作
      @Override
      public void infoChanged(SparkAppHandle handle) {

      }
  });

3.任务提交流程及实战

总结上面的方法可以得到任务提交的总体流程:

  1. 创建SparkLaunch
  2. 初始化相关参数信息、资源参数、配置参数、Jar包参数
  3. 调用startApplication 启动任务的提交
  4. 调用stateChanged 捕获状态信息的改变并作出相应的操作

接下来给出完整的任务提交的相关伪代码:

    /**
     * 发起任务提交
     *
     * @param sparkApplicationParam Spark 相关配置参数
     * @param otherConfigParams     其他配置参数
     * @param mainParams            main 方法配置参数
     */
    private void launch(DmpRunInfo dmpRunInfo, SparkApplicationParam sparkApplicationParam, Map<String,
            String> otherConfigParams, String[] mainParams) {
        // 初始化SparkLauncher
        SparkLauncher launcher = new SparkLauncher()
                .setSparkHome(sparkApplicationParam.getSparkHome())
                .setAppResource(sparkApplicationParam.getMainJarPath())
                .setMainClass(sparkApplicationParam.getMainClass())
                .setMaster(sparkApplicationParam.getMaster())
                .setDeployMode(sparkApplicationParam.getDeployMode())
                .setAppName(sparkApplicationParam.getAppName())
                .setConf("spark.driver.memeory", sparkApplicationParam.getDriverMemory())
                .setConf("spark.executor.memory", sparkApplicationParam.getExecutorMemory())
                .setConf("spark.executor.cores", sparkApplicationParam.getExecutorCores())
                // spark.yarn.archive 配置的HDFS地址
                .setConf("spark.yarn.archive", SparkParamConstants.SPARK_YARN_ARCHIVE)
                .setConf("spark.yarn.queue", SparkParamConstants.SPARK_PARAM_YARN_QUEUE)
                .setVerbose(true);
        // 禁用输出到本地日志方式
        // .redirectError(new File(otherConfigParams.get("SPARK_ERROR_LOG_DIR")))
        // .redirectOutput(new File(otherConfigParams.get("SPARK_OUT_LOG_DIR")))

        /**
         * 设置其他的参数时候需要使用[spark.] 开头的key ,否则spark 解析不出来
         */
        if (otherConfigParams != null && otherConfigParams.size() > 0) {
            logger.info("开始设置spark job 运行参数");
            for (Map.Entry<String, String> conf : otherConfigParams.entrySet()) {
                logger.info("{}:{}", conf.getKey(), conf.getValue());
                launcher.setConf(conf.getKey(), conf.getValue());
            }
        }

        if (mainParams.length != 0) {
            logger.info("开始设置Spark Job Main 方法的参数 {}", Arrays.toString(mainParams));
            launcher.addAppArgs(mainParams);
        }
        logger.info("参数设置完成,开始提交Spark任务");
        // 线程池的方式运行任务
        executor.execute(() -> {
            try {

                // 线程计数
                CountDownLatch countDownLatch = new CountDownLatch(1);
                SparkAppHandle sparkAppHandle = launcher.startApplication(new SparkAppHandle.Listener() {
                    @Override
                    public void stateChanged(SparkAppHandle handle) {
                        // 修改运行状态
                        。。。。。。。。。

                        if (handle.getAppId() != null) {
                            // 设置运行ID 到运行记录中
                            logger.info("{} stateChanged :{}", handle.getAppId(), handle.getState().toString());
                        } else {
                            logger.info("stateChanged :{}", handle.getState().toString());
                        }
                        // 更新状态
                        。。。。。。。。
                        // 失败告警发送到群功能
                        if (SparkAppHandle.State.FAILED.toString().equals(handle.getState().toString())) {
                            // 失败告警
                            。。。。。。。。。。。。
                        }

                        // Job 状态完成之后退出线程
                        if (handle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                    }

                    @Override
                    public void infoChanged(SparkAppHandle handle) {
                       // do something

                    }
                });

                logger.info("The task is executing, current is get application id before,please wait ........");
                String applicationId = null;
                while (!SparkAppHandle.State.RUNNING.equals(sparkAppHandle.getState())) {
                    applicationId = sparkAppHandle.getAppId();
                    if (applicationId != null) {
                        logger.warn("handle current state is {}, appid is {}",
                                sparkAppHandle.getState().toString(), applicationId);
                        break;
                    }
                }
                logger.warn("handle current state is {}, appid is {}",
                        sparkAppHandle.getState().toString(), applicationId);
                countDownLatch.await();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        });
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/134836.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深拷贝浅拷贝的区别?如何实现一个深拷贝

一、数据类型存储 前面文章我们讲到&#xff0c;JavaScript中存在两大数据类型&#xff1a; 基本类型引用类型 基本类型数据保存在在栈内存中 引用类型数据保存在堆内存中&#xff0c;引用数据类型的变量是一个指向堆内存中实际对象的引用&#xff0c;存在栈中 二、浅拷贝…

【2】SCI易中期刊推荐——遥感图像领域(2区)

🚀🚀🚀NEW!!!SCI易中期刊推荐栏目来啦 ~ 📚🍀 SCI即《科学引文索引》(Science Citation Index, SCI),是1961年由美国科学信息研究所(Institute for Scientific Information, ISI)创办的文献检索工具,创始人是美国著名情报专家尤金加菲尔德(Eugene Garfield…

2022年最有开创性的10篇AI论文总结

2022年随着聊天GPT和Mid - journey和Dall-E等图像生成器的流行&#xff0c;我们看到了整个人工智能领域的重大进展。在人工智能和计算机科学的时代&#xff0c;这是令人振奋的一年。本文我们总结了在2022年发表的最具开创性的10篇论文&#xff0c;无论如何你都应该看看。 1、Al…

Apache Calcite初识

Calcite原理和代码讲解(一) https://blog.csdn.net/qq_35494772/article/details/118887267quickstart:Apache Calcite精简入门与学习指导 https://blog.51cto.com/xpleaf/2639844quickstart:多源数据的关联 csv和mem数据类型 https://cloud.tencent.com/developer/article/162…

【Javassist】快速入门系列14 使用Javassist导入包路径

系列文章目录 01 在方法体的开头或结尾插入代码 02 使用Javassist实现方法执行时间统计 03 使用Javassist实现方法异常处理 04 使用Javassist更改整个方法体 05 当有指定方法调用时替换方法调用的内容 06 当有构造方法调用时替换方法调用的内容 07 当检测到字段被访问时使用语…

CSS复习(一)

CSS复习1.前言2. CSS介绍2.1 CSS的引入方式2.2 选择器2.2 颜色的赋值方式3. 补充4.display4.1 盒子模型4.1.1 盒子模型之宽高盒子模型之外边距盒子模型之边框盒子模型之内边距4.2 文本问题1.前言 首先补充一下部分相关知识&#xff1a; 分区标签自身没有显示效果&#xff0c;…

【算法】kmp、Trie、并查集、堆

文章目录1.kmp2.Trie3.并查集4.堆1.kmp KMP 的精髓就是 next 数组&#xff1a;也就是用 next[j] k;简单理解就是&#xff1a;来保存子串某个位置匹配失败后&#xff0c;回退的位置。 给定一个字符串 S&#xff0c;以及一个模式串 P&#xff0c;所有字符串中只包含大小写英文字…

大文件上传如何做断点续传

大文件上传如何做断点续传 一、是什么 不管怎样简单的需求&#xff0c;在量级达到一定层次时&#xff0c;都会变得异常复杂 文件上传简单&#xff0c;文件变大就复杂 上传大文件时&#xff0c;以下几个变量会影响我们的用户体验 服务器处理数据的能力请求超时网络波动 上…

信息安全3——数字签名和认证

1 &#xff09;签名&#xff1a;手写签名是被签文件的物理组成部分&#xff0c;而数字签名不是被签消息的物理部分&#xff0c;因而需要将签名连接到被签消息上。 2 &#xff09;验证&#xff1a;手写签名是通过将它与其它真实的签名进行比较来验证而数字签名是利用已经公开的验…

年终总结(我心飞翔向)

2022 年度个人总结&#xff08;自由向&#xff09; 前奏 其实在2021年12月底考研前就回家了&#xff0c;回家做毕设。他们考研的那几天回了中北&#xff0c;参加了党支部会议&#xff0c;见证了一批同学的转预转正&#xff1b;收拾了一大波衣服&#xff0c;因为我已经提前想到…

Git(三) - Git 常用命令

一、设置用户签名 说明&#xff1a; 签名的作用是区分不团操作者身份。用户的签名信息在每一个版本的提交信息中能够看到&#xff0c;以此确认本次提交是谁做的。GIT 首次安装必须设置一下用户签名&#xff0c;否则无法提交代码。 注意&#xff1a; 这里设置用户前面和将来登录…

微机原理真题2019年,错题整理

目录 2019年 填空 编程 1​编辑 2 3 练习册的题 2019年 1&#xff1a;在计算机中能够在一组信息中取出所需要的一部分信息的器件是&#xff08;&#xff09; A:触发器 B:寄存器 C:译码器 D:锁存器 2&#xff1a;宏汇编程序中一般由3个段组成&#xff0c;这三…

FreeRTOS实验使用01

1&#xff1a;vTaskList的使用 我使用的时候&#xff0c;如果把pcWriteBuff定义在任务中&#xff0c;程序会卡死&#xff0c;不信你可以尝试一下&#xff0c;所以我就把pcWriteBuff定义到了全局中&#xff0c;才能使用 2&#xff1a;队列问题 场景&#xff1a;创建3个格子的队…

搜狗 workflow异步调度框架(二)HTTP客户端

1.避免进程提前终止 由于任务的启动是异步的&#xff0c;所以任务的执行和主线程的执行是并行的&#xff0c;如果不加任何的控制&#xff0c;那么当主线程执行完所有操作以后直接退出&#xff0c;并且导致整个进程的终止。 WFFacilities::WaitGroup 可以根据情况阻塞线程或者恢…

DDR3 数据传输 (六)

引言 前文链接: DDR3 数据传输 (一) DDR3 数据传输 (二) DDR3 数据传输 (三) DDR3 数据传输 (四) DDR3 数据传输 (五) 本文在前文设计的基础上,给出板级验证。<

Spring Boot MongoDB 入门

1. 概述 2. 快速入门 3. 基于方法名查询 4. 基于 Example 查询 5. MongoTemplate 6. 自增主键 666. 彩蛋 1. 概述 可能有一些胖友对 MongoDB 不是很了解&#xff0c;这里我们引用一段介绍&#xff1a; FROM 《分布式文档存储数据库 MongoDB》 MongoDB 是一个介于关系数据…

《计算机视觉》:角点检测与图像匹配

文章目录 任务一:基本处理-Harris角点检测原理代码结果与分析任务二:SIFT算法原理代码结果与分析任务一:基本处理-Harris角点检测 数据:棋盘图片 要求:自己写函数实现Harris角点检测子,设置不同参数,比较检测结果 边缘检测子:sobel检测子 响应函数参数alpha:0.05 参数…

【JavaScript】BOM 概念及相关操作

文章目录【JavaScript】BOM 概念及相关操作一. BOM概念BOM可以操作的内容二.window内置对象和属性(1) 获取浏览器窗口的尺寸(2) 获取文档窗口的尺寸(3) 浏览器的常见事件(4) 浏览器的历史记录(5) 浏览器的标签页(6) 浏览器卷去的尺寸(7) 浏览器滚动到的位置浏览器滚动到的位置案…

ARM 按键轮询编程实战

一、什么是按键 1、按键的物理特性 平时没人按的时候&#xff0c;弹簧把按键按钮弹开。此时内部断开的。有人按下的时候&#xff0c;手的力量克服弹簧的弹力&#xff0c;将按钮按下&#xff0c;此时内部保持接通&#xff08;闭合&#xff09;状态&#xff1b;如果手拿开&…

【应急响应】 - Windows 排查分析

Windows 分析排查1. 文件分析1.1 开机启动文件1.2 temp 临时异常文件1.3 浏览器信息分析1.4 文件时间属性分析1.5 最近打开文件分析2. 进程分析2.1 可疑进程发现与关闭3. 系统信息3.1 windows 计划任务3.2 隐藏账户与发现3.2.1 隐藏账号的建立3.2.2 隐藏账号的删除3.3 补丁查看…