Spark---资源、任务调度

news2025/1/11 10:54:52

一、Spark资源调度源码

1、Spark资源调度源码过程

Spark资源调度源码是在Driver启动之后注册Application完成后开始的。Spark资源调度主要就是Spark集群如何给当前提交的Spark application在Worker资源节点上划分资源。Spark资源调度源码在Master.scala类中的schedule()中进行的。

2、Spark资源调度源码结论

  1. Executor在集群中分散启动,有利于task计算的数据本地化。
  2. 默认情况下(提交任务的时候没有设置--executor-cores选项),每一个Worker为当前的Application启动一个Executor,这个Executor会使用这个Worker的所有的cores和1G内存。
  3. 如果想在Worker上启动多个Executor,提交Application的时候要加--executor-cores这个选项。
  4. 默认情况下没有设置--total-executor-cores,一个Application会使用Spark集群中所有的cores。
  5. 启动Executor不仅和core有关还和内存有关。

3、资源调度源码结论验证

使用Spark-submit提交任务演示。也可以使用spark-shell来验证。

1、默认情况每个worker为当前的Application启动一个Executor,这个Executor使用集群中所有的cores和1G内存。

./spark-submit 
--master spark://node1:7077
 --class org.apache.spark.examples.SparkPi
 ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

2、在workr上启动多个Executor,设置--executor-cores参数指定每个executor使用的core数量。

./spark-submit
 --master  spark://node1:7077
 --executor-cores 1 
 --class org.apache.spark.examples.SparkPi 
../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

3、内存不足的情况下启动core的情况。Spark启动是不仅看core配置参数,也要看配置的core的内存是否够用。

./spark-submit 
--master  spark://node1:7077 
--executor-cores 1  
--executor-memory 3g 
--class org.apache.spark.examples.SparkPi
 ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

--total-executor-cores集群中共使用多少cores

注意:一个进程不能让集群多个节点共同启动。

./spark-submit 
--master  spark://node1:7077 
--executor-cores 1  
--executor-memory 2g 
--total-executor-cores 3
--class org.apache.spark.examples.SparkPi
 ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

二、Spark任务调度源码

Spark任务调度源码是从Spark Application的一个Action算子开始的。action算子开始执行,会调用RDD的一系列触发job的逻辑。其中也有stage的划分过程:

三、Spark二次排序和分组取topN

1、二次排序

大数据中很多排序场景是需要先根据一列进行排序,如果当前列数据相同,再对其他某列进行排序的场景,这就是二次排序场景。例如:要找出网站活跃的前10名用户,活跃用户的评测标准就是用户在当前季度中登录网站的天数最多,如果某些用户在当前季度登录网站的天数相同,那么再比较这些用户的当前登录网站的时长进行排序,找出活跃用户。这就是一个典型的二次排序场景。

解决二次排序问题可以采用封装对象的方式,对象中实现对应的比较方法。

1.SparkConf sparkConf = new SparkConf()
2..setMaster("local")
3..setAppName("SecondarySortTest");
4.final JavaSparkContext sc = new JavaSparkContext(sparkConf);
5.
6.JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");
7.
8.JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {
9.
10.  /**
11.   * 
12.  */
13.  private static final long serialVersionUID = 1L;
14.
15.  @Override
16.  public Tuple2<SecondSortKey, String> call(String line) throws Exception {
17.    String[] splited = line.split(" ");
18.    int first = Integer.valueOf(splited[0]);
19.    int second = Integer.valueOf(splited[1]);
20.    SecondSortKey secondSortKey = new SecondSortKey(first,second);
21.    return new Tuple2<SecondSortKey, String>(secondSortKey,line);
22.  }
23.});
24.
25.pairSecondRDD.sortByKey(false).foreach(new 
26.VoidFunction<Tuple2<SecondSortKey,String>>() {
27.
28.  /**
29.   * 
30.   */
31.  private static final long serialVersionUID = 1L;
32.
33.    @Override
34.    public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {
35.      System.out.println(tuple._2);
36.  }
37.});
38.
39.
40.
41.public class SecondSortKey implements Serializable,Comparable<SecondSortKey>{
42.  /**
43.   * 
44.   */
45.  private static final long serialVersionUID = 1L;
46.  private int first;
47.  private int second;
48.  public int getFirst() {
49.    return first;
50.  }
51.  public void setFirst(int first) {
52.    this.first = first;
53.  }
54.  public int getSecond() {
55.    return second;
56.  }
57.  public void setSecond(int second) {
58.    this.second = second;
59.  }
60.  public SecondSortKey(int first, int second) {
61.    super();
62.    this.first = first;
63.    this.second = second;
64.  }
65.  @Override
66.  public int compareTo(SecondSortKey o1) {
67.    if(getFirst() - o1.getFirst() ==0 ){
68.      return getSecond() - o1.getSecond();
69.    }else{
70.      return getFirst() - o1.getFirst();
71.    }
72.  }
73.}

2、分组取topN

大数据中按照某个Key进行分组,找出每个组内数据的topN时,这种情况就是分组取topN问题。

解决分组取TopN问题有两种方式,第一种就是直接分组,对分组内的数据进行排序处理。第二种方式就是直接使用定长数组的方式解决分组取topN问题。

1.SparkConf conf = new SparkConf()
2..setMaster("local")
3..setAppName("TopOps");
4.JavaSparkContext sc = new JavaSparkContext(conf);
5.JavaRDD<String> linesRDD = sc.textFile("scores.txt");
6.
7.JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {
8.
9.  /**
10.   * 
11.  */
12.  private static final long serialVersionUID = 1L;
13.
14.  @Override
15.  public Tuple2<String, Integer> call(String str) throws Exception {
16.    String[] splited = str.split("\t");
17.    String clazzName = splited[0];
18.    Integer score = Integer.valueOf(splited[1]);
19.    return new Tuple2<String, Integer> (clazzName,score);
20.  }
21.});
22.
23.pairRDD.groupByKey().foreach(new 
24.VoidFunction<Tuple2<String,Iterable<Integer>>>() {
25.
26.  /**
27.   * 
28.   */
29.  private static final long serialVersionUID = 1L;
30.
31.  @Override
32.  public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
33.    String clazzName = tuple._1;
34.    Iterator<Integer> iterator = tuple._2.iterator();
35.
36.    Integer[] top3 = new Integer[3];
37.
38.    while (iterator.hasNext()) {
39.      Integer score = iterator.next();
40.
41.      for (int i = 0; i < top3.length; i++) {
42.        if(top3[i] == null){
43.          top3[i] = score;
44.          break;
45.        }else if(score > top3[i]){
46.        for (int j = 2; j > i; j--) {
47.          top3[j] = top3[j-1];
48.        }
49.        top3[i] = score;
50.        break;
51.      }
52.    }
53.  }
54.  System.out.println("class Name:"+clazzName);
55.  for(Integer sscore : top3){
56.    System.out.println(sscore);
57.  }
58.}
59.});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1265037.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

揭开 BFC 的神秘面纱:前端开发必知必会

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

【多线程】-- 05 Lambda表达式

多线程 4 Lambda表达式 λ是希腊字母表中排序第十一位的字母&#xff0c;英语名称为Lambda是为了避免匿名内部类定义过多实质属于函数式编程的概念 为什么要使用Lambda表达式&#xff1f; 避免匿名内部类定义过多可以让代码看起来很简洁去掉了一堆没有意义的代码&#xff0…

【LeetCode】128. 最长连续序列——哈希的应用(3)

文章目录 1、思路2、解题方法3、复杂度时间复杂度:空间复杂度: 4、Code Problem: 128. 最长连续序列 1、思路 我会用一种做题者的思路来去看待这道题。 我们在乍一看到这道题的时候&#xff0c;看到它的时间复杂度要求为O(N)&#xff0c;然后又要求去找序列(就是让你判断这个…

高频Latex公式速查表,写论文技术博客不愁了

常见上下标X_{2}X^{2}\hat{X}\bar{X}\frac{1}{X}常见希腊字母\alpha \beta \gamma \delta \varepsilon \eta \theta \rho \sigma \phi \varphi \omega常见数学符号\leq \geq \neq\approx 其他\sum \prod \int \bigoplus \forall \exists \times \setminus \bigotimes \bigodot …

五分钟 k8s 实战-应用探针

Probe.png 今天进入 kubernetes 的运维部分&#xff08;并不是运维 kubernetes&#xff0c;而是运维应用&#xff09;&#xff0c;其实日常我们大部分使用 kubernetes 的功能就是以往运维的工作&#xff0c;现在云原生将运维和研发关系变得更紧密了。 今天主要讲解 Probe 探针相…

集成IDE开发环境,Java开发工具IntelliJ IDEA 2023中文

IntelliJ IDEA 2023是一款功能强大的软件&#xff0c;其为程序员提供了一款先进的集成开发环境。它以智能、高效和人性化为主要特点&#xff0c;致力于提高开发人员的生产力&#xff0c;帮助程序员更快、更好地编写代码。IntelliJ IDEA 2023支持多种语言和框架&#xff0c;包括…

iOS 通用链接的配置(Universal Links)

一、打开Associated Domains 1.首先登录 苹果开发者网站 2.Certificates, Identifiers & Profiles 下的Identifiers 找到要配追的Identifiers 点进去 3.打开Associated Domains然后保存 二、更新Profile文件 如果我们使用自动的&#xff0c;可以忽略这一步&#xff0c;…

泛微E-Office SQL注入漏洞复现

0x01 产品简介 泛微E-Office是一款标准化的协同 OA 办公软件&#xff0c;泛微协同办公产品系列成员之一,实行通用化产品设计&#xff0c;充分贴合企业管理需求&#xff0c;本着简洁易用、高效智能的原则&#xff0c;为企业快速打造移动化、无纸化、数字化的办公平台。 0x02 漏…

00.本地搭建 threejs 文档网站(网页版是外网比较慢)

three官网 https://threejs.org/ 下载代码 进入官网 可以选择github去下载 或者 下载压缩包 github 下载https链接地址 https://github.com/mrdoob/three.js.git git clone https://github.com/mrdoob/three.js.git安装依赖启动程序 安装依赖 npm i 或者 pnpm i 或者 …

通过git上传文件到github仓库

一、新建github仓库 访问github官网&#xff1a;GitHub: Let’s build from here GitHub 点击个人头像&#xff0c;在右侧栏选择Your repositories。 点击New&#xff0c;新建一个github仓库。 创建Repository name仓库名&#xff0c;如果这个仓库名已经创建过的话&#xff…

开始使用Spring Boot Admin吧-使用Nacos注册SBA

什么是 Spring Boot Admin&#xff08;SBA&#xff09;? Spring Boot Admin 是 codecentric 公司开发的一款开源社区项目&#xff0c;目标是让用户更方便的管理以及监控 Spring Boot 应用。 应用可以通过我们的Spring Boot Admin客户端&#xff08;通过HTTP的方式&#xff0…

Vue项目的创建、运行与端口号修改

前言&#xff1a;Vue-cli是Vue官方提供的一个脚手架&#xff0c;用于快速生成一个Vue的项目模板&#xff0c;依赖于NodeJS环境 NodeJS下载&#xff1a;NodeJS安装下载 Vue-cli下载&#xff1a;Vue-cli下载 一.Vue图形化创建项目 1.建立一个文件夹&#xff0c;保存Vue项目 2.在该…

华为P40无法链接adb的解决记录

真的很讨厌华为的设备&#xff0c;很多东西啥设备都能跑得好好的&#xff0c;就华为会出问题&#xff0c;简直就是手机界的IE。 情况&#xff1a;突然无法链接adb到P40&#xff0c;拔插无效&#xff0c;关闭开发人员选项再打开也无效&#xff0c;撤销USB调试授权也无效&#x…

xcode opencv

1、导入报错 Undefined symbols: linker command failed with exit code 1 (use -v to see invocation) 直接添加如下图内容即可

ArkTS-WebView内嵌H5页面

鸿蒙开发使用WebView内嵌H5页面 访问在线网页时需添加网络权限&#xff1a;ohos.permission.INTERNET module.json5文件配置 {"module" : {"requestPermissions":[{"name": "ohos.permission.INTERNET"}]} }踩坑日记 加载网页效果无法…

python-爬虫(可直接使用)

爬虫&#xff08;Web Scraping&#xff09;是指通过编程自动化地获取互联网上的信息的过程。爬虫的目的通常是从网页中抓取数据&#xff0c;进行数据分析、处理或展示。以下是爬虫的基本流程和一些重要的概念&#xff1a; 爬虫基本流程&#xff1a; 确定目标&#xff1a; 确定要…

React 之 airbnb - 项目实战

一、开发前言 1. 规范 2. 创建项目 node -v > 18.0.0 npm -v > 8.6.0 create-react-app star-airbnb 3. 项目基本配置 配置jsconfig.json {"compilerOptions": {"target": "es5","module": "esnext","ba…

如何设置带有密码的excel只读模式?

Excel只读模式大家都不陌生&#xff0c;那大家知道带有密码的只读模式吗&#xff1f;今天给大家分享如何设置带有密码的只读模式。 打开excel文件&#xff0c;将文件进行【另存为】设置&#xff0c;然后停留在保存路径的界面中&#xff0c;我们点击下面的工具 – 常规选项 在常…

postgresql以及postgis安装

一、安装postgresql及postgis 1.下载postgresql https://www.enterprisedb.com/downloads/postgres-postgresql-downloads 我选择的版本为“postgresql-14.8-2-windows-x64.exe”。 2.以管理员模式运行安装程序 安装路径建议不要C盘&#xff0c;可能会由于权限问题导致目录…

【计算机毕业设计】nodejs+vue音乐播放器系统 微信小程序83g3s

本系统的设计与实现共包含12个表:分别是配置文件信息表&#xff0c;音乐列表评论表信息表&#xff0c;音乐论坛信息表&#xff0c;歌手介绍信息表&#xff0c;音乐资讯信息表&#xff0c;收藏表信息表&#xff0c;token表信息表&#xff0c;用户表信息表&#xff0c;音乐类型信…