掌握RDD分区

news2024/12/27 13:38:54

零、本讲学习目标

  1. 学会如何指定分区数量
  2. 会定义与使用自定义分区器

一、RRD分区

(一)RDD分区概念

  • RDD是一个大的数据集合,该集合被划分成多个子集合分布到了不同的节点上,而每一个子集合就称为分区(Partition)。因此,也可以说,RDD是由若干个分区组成的。

(二)RDD分区作用

  • 在分布式程序中,网络通信的开销是很大的,因此控制数据分布以获得最少的网络传输可以极大的提升程序的整体性能,Spark程序可以通过控制RDD分区方式来减少通信开销。Spark中所有的RDD都可以进行分区,系统会根据一个针对键的函数对元素进行分区。虽然Spark不能控制每个键具体划分到哪个节点上,但是可以确保相同的键出现在同一个分区上。

二、RDD分区数量

(一)RDD分区原则

  • RDD各个分区中的数据可以并行计算,因此分区的数量决定了并行计算的粒度。Spark会给每一个分区分配一个单独的Task任务对其进行计算,因此并行Task的数量是由分区的数量决定的。RDD分区的一个分区原则是使得分区的数量尽量等于集群中CPU核心数量。

(二)影响分区的因素

  • RDD的创建有两种方式:一种是使用parallelize()或makeRDD()方法从对象集合创建;另一种是使用textFile()方法从外部存储系统创建。RDD分区的数量与RDD的创建方式以及Spark集群的运行模式有关。

(三)使用parallelize()方法创建RDD时的分区数量

1、指定分区数量

  • 使用parallelize()方法创建RDD时,可以传入第二个参数,指定分区数量。
  • 注意:采用本地模式启动Spark Shell(在master节点上)

 

  • 利用mapPartitionsWithIndex()函数实现带分区索引的映射

 

 

  • 第1个分区完成了3个元素的映射,第2个分区完成了3个元素的映射,第3个分区完成了4个元素的映射

2、默认分区数量

  • 若不指定分区数量,则默认分区数量为Spark配置文件spark-defaults.conf中的参数
  • spark.default.parallelism的值。若没有配置该参数,则Spark会根据集群的运行模式自动确定分区数量。
  • 如果是本地模式,默认分区数量就等于本机CPU核心总数,这样每个CPU核心处理一个分区的计算任务,可以最大程度发挥CPU的性能。
  • 如果是Spark Standalone或Spark On YARN模式,默认分区数量就取集群中所有CPU的核心总数与2中的较大值,即最少分区数为2。
  • 我们采用的是Standalone模式的Spark集群
  • 先用spark-shell本地模式启动

 

  • 由此可见,本地机器master的CPU核数为4

 

  • 以集群模式启动Spark Shell

 

  • 注意:Spark集群是一个Master(master虚拟机)和两个Worker(slave1和slave2虚拟机)。

 

  • 默认分区数是8。为什么是8呢?集群两个工作节点(slave1和slave2)的CPU核数总和是4 + 4 = 8

3、分区源码分析

  • parallelize()方法是在SparkContext类定义的

 

  • numSlices参数为指定的分区数量,该参数有一个默认值defaultParallelism,是一个无参函数

 

  • 上述代码中的taskScheduler的类型为特质TaskScheduler,通过调用该特质的defaultParallelism方法取得默认分区数量,而类TaskSchedulerImpl继承了特质TaskScheduler并实现了defaultParallelism方法。

 

 

  • 上述代码中的backend的类型为特质SchedulerBackend,通过调用该特质的defaultParallelism()方法取得默认分区数量,特质SchedulerBackend主要用于申请资源和对Task任务的执行和管理;而类LocalSchedulerBackend和类CoarseGrainedSchedulerBackend则继承了特质SchedulerBackend并分别实现了其中的defaultParallelism()方法

 

  • 类LocalSchedulerBackend用于Spark的本地运行模式(Executor和Master等在同一个JVM中运行),其调用顺序在TaskSchedulerImpl类之后;类CoarseGrainedSchedulerBackend则用于Spark的集群运行模式。
  • LocalSchedulerBackend中的defaultParallelism()方法

 

  • 上述代码中的字符串spark.default.parallelism为Spark配置文件spark-defaults.conf中的参数spark.default.parallelism;totalCores为本机CPU核心总数。
  • CoarseGrainedSchedulerBackend中的defaultParallelism()方法

 

  • 上述代码中,math.max(totalCoreCount.get(), 2)表示取集群中所有CPU核心总数与2两者中的较大值。

(四)使用textFile()方法创建RDD时的分区数量

  • textFile()方法通常用于读取HDFS中的文本文件,使用该方法创建RDD时,Spark会对文件进行分片操作(类似于MapReduce的分片,实际上调用的是MapReduce的分片接口),分片操作完成后,每个分区将存储一个分片的数据,因此分区的数量等于分片的数量

1、指定最小分区数量

  • 使用textFile()方法创建RDD时可以传入第二个参数指定最小分区数量。最小分区数量只是期望的数量,Spark会根据实际文件大小、文件块(Block)大小等情况确定最终分区数量

 

  • 在HDFS中有一个文件/park/test.txt,读取该文件,并指定最小分区数量为5,但是实际分区数量是6

2、默认最小分区数量

  • 若不指定最小分区数量,则Spark将采用默认规则计算默认最小分区数量。

  • 以集群启动Spark Shell,默认分区数是2

 

  • 查看textFile()源码

 

  • 上述代码中的minPartitions参数为期望的最小分区数量,该参数有一个默认值defaultMinPartitions,这是一个无参函数,我们来查看其源码。

 

  • 从上述代码中可以看出,默认最小分区数取默认并行度与2中的较小值;而默认并行度则是parallelize()方法的默认分区数。

    3、默认实际分区数量

  • 最小分区数量确定后,Spark接下来将计算实际分区数量。查看textFile()方法的源码可知,textFile()方法最后调用了一个hadoopFile()方法,并对该方法的结果执行了map()算子。

 

  • 查看hadoopFile()方法的源码

 

  • 从上述代码可以看出,最终返回一个HadoopRDD对象。
  • 查看HadoopRDD类的部分源码

 

  • 查看HadoopRDD类的部分源码HadoopRDD类中的getPartitions()方法的功能是获取实际分区数量。通过调用getInputFormat()方法得到InputFormat的实例,然后调用该实例的getSplits()方法获得输入数据的所有分片,getSplits()方法是决定最终分区数量的关键方法,该方法的第二个参数即为RDD的最小分区数量。
  • 查看InputFormt接口getSplits()抽象方法

 

  • InputFormat有个实现类FileInputFormat,它实现了getSplits()方法

 

 

  • 根据期望分片数量(numSplits,即最小分区数量)计算期望分片大小(goalSize)。计算实际分片大小(splitSize)。splitSize最终决定了分片的数量。
  • splitSize由3个因素决定:最小分片大小(minSize)期望分片大小(goalSize)分块大小(blockSize)
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    StopWatch sw = new StopWatch().start();
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }
  • 结论:在MapReduce中,每个分片对应一个Map任务,多个Map任务以完全并行的方式处理;而在Spark中,每个分片对应一个分区,每个分区对应一个Task任务,多个Task任务以完全并行的方式处理

(五)RDD分区方式

  • Spark框架为RDD提供了两种分区方式,分别是哈希分区器(HashPartitioner)和范围分区器(RangePartitioner)。其中,哈希分区是根据哈希值进行分区;范围分区是将一定范围的数据映射到一个分区中。这两种分区方式已经可以满足大多数应用场景的需求。与此同时,Spark也支持自定义分区方式,即通过一个自定义的Partitioner对象来控制RDD的分区,从而进一步减少通信开销。

三、Spark分区器

(一)分区器 - Partitioner抽象类

  • Spark RDD的Shuffle过程与MapReduce类似,涉及数据重组和重新分区,且要求RDD的元素必须是(key, value)形式的。分区规则是由分区器(Partitioner)控制的,Spark的主要分区器是HashPartitioner和RangePartitioner,都继承了Partitioner抽象类。
  • 抽象类Partitioner中有两个方法,分别用于指定分区数量和设置分区规则

(二)哈希分区器 - HashPartitioner类

  • HashPartitioner是Spark使用的默认分区器,其分区规则为:取(key,value)对中key的hashCode值,然后除以分区数量后取余数。若余数小于0(一般余数都大于等于0),则用余数与分区数量的和作为分区ID,否则将余数作为分区ID。分区ID一致的(key,value)对则会被分配到同一个分区。因此,默认情况下,key值相同的(key,value)对一定属于同一个分区,但是同一个分区中可能有多个key值不同的(key,value)对。该分区器还支持key值为null的情况,当key值等于null时,将直接返回0作为分区ID。

 

  • HashPartitioner分区器中,对key取hashCode值实际上调用的是Java类Object中的hashCode()方法。由于Java数组的hashCode值基于的是数组标识,而不是数组内容,因此具有相同内容的数组的hashCode值不同。如果将数组作为RDD的key,就可能导致内容相同的key不能分配到同一个分区中。这个时候可以将数组转为集合,或者使用自定义分区器,根据数组内容进行分区。

四、自定义分区器

(一)提出问题

  • 在有些情况下,使用Spark自带的分区器满足不了特定的需求。
  • 例如,某学生有以下3科三个月的月考成绩数据。

 

  • 现需要将每一科成绩单独分配到一个分区中,然后将3科成绩输出到HDFS的指定目录(每个分区对应一个结果文件),此时就需要对数据进行自定义分区。

(二)解决问题

1、准备数据文件

  • master虚拟机的/home目录里创建marks.txt

 

  • 将数据文件上传到HDFS指定目录

2、新建科目分区器

  • 创建net.cl.rdd.day04包,在包里创建SubjectPartitioner
package net.cl.rdd.day04

import org.apache.spark.Partitioner

class SubjectPartitioner(partitions: Int) extends Partitioner {
  /**
   * @return 分区数量
   */
  override def numPartitions: Int = partitions

  /**
   * @param key(科目)
   * @return 分区索引
   */
  override def getPartition(key: Any): Int = {
    val partitionIndex = key.toString match {
      case "chinese" => 0
      case "math" => 1
      case "english" => 2
    }
    partitionIndex
  }
}

3、测试科目分区器

  • 调用RDD的partitionBy()方法传入科目分区器类SubjectPartitioner的实例,可以对RDD按照自定义规则进行重新分区。
  • net.cl.rdd.day04包里创建TestSubjectPartitioner单例对象
package net.cl.rdd.day04

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


object TestSubjectPartitioner {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("TestSubjectPartitioner") // 设置应用名称
      .setMaster("local[*]") // 设置主节点位置(本地调试)
    // 基于Spark配置对象创建Spark容器
    val sc = new SparkContext(conf)
    // 读取HDFS文件,生成RDD
    val lines = sc.textFile("hdfs://master:9000/partition/input/marks.txt")
    // 将每行数据映射成(科目,成绩)二元组
    val data: RDD[(String, Int)] = lines.map(line => {
        val fields = line.split(" ")
        (fields(0), fields(1).toInt) // (科目,成绩)
    })
    // 将数据按科目分区器重新分区
    val partitionData = data.partitionBy(new SubjectPartitioner(3))
    // 在控制台输出分区数据
    partitionData.collect.foreach(println)
    // 保存分区数据到HDFS指定目录
    partitionData.saveAsTextFile("hdfs://master:9000/partition/output")
  }
}
  • 运行程序,查看结果

 

  • 查看HDFS的结果文件

 

  • 如果传入的分区数不是3,会出现什么状况?
  • 删除输出目录

 

  • 运行程序,查看控制台输出结果

 

  • 查看HDFS上的结果文件

 

  • 删除输出目录,修改分区数为2,再运行程序,查看控制台结果

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/562283.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3.5 RDD持久化机制

一、RDD持久化 1、不采用持久化操作 查看要操作的HDFS文件 以集群模式启动Spark Shell 按照图示进行操作&#xff0c;得RDD4和RDD5 查看RDD4内容&#xff0c;会从RDD1到RDD2到RDD3到RDD4跑一趟 显示RDD5内容&#xff0c;也会从RDD1到RDD2到RDD3到RDD5跑一趟 2、采用持久化…

python解析html数据,获取到的链接是以/或 ./ 或 ../ 开头的相对链接,不是以http开头的,需要补全

一、实现的目标 在使用爬虫获取网页html数据时,解析到的链接是/或./ 开头的相对链接,不是以http开头的链接,如:/picture/0/cca65350643c441e80d390ded3975db0.png 。此时需要完成对该链接的补全,以得到正确的链接。此外,我们需要将解析到的html数据保存到起来,将来需要展…

3.8 Spark RDD典型案例

一、利用RDD计算总分与平均分 &#xff08;一&#xff09;准备工作 1、启动HDFS服务 2、启动Spark服务 3、在本地创建成绩文件 4、将成绩文件上传到HDFS &#xff08;二&#xff09;完成任务 1、在Spark Shell里完成任务 &#xff08;1&#xff09;读取成绩文件&#xff…

【搭建轻量级图床】本地搭建LightPicture开源图床管理系统,并公网远程访问

文章目录 1.前言2. Lightpicture网站搭建2.1. Lightpicture下载和安装2.2. Lightpicture网页测试2.3.cpolar的安装和注册 3.本地网页发布3.1.Cpolar云端设置3.2.Cpolar本地设置 4.公网访问测试5.结语 1.前言 现在的手机越来越先进&#xff0c;功能也越来越多&#xff0c;而手机…

二十三种设计模式第九篇--代理模式

在代理模式&#xff08;Proxy Pattern&#xff09;中&#xff0c;一个类代表另一个类的功能。这种类型的设计模式属于结构型模式。在代理模式中&#xff0c;我们创建具有现有对象的对象&#xff0c;以便向外界提供功能接口。 意图&#xff1a;为其他对象提供一种代理以控制对这…

示范性微电子院校“抢人”,芯片赛道黄不了!

经常看到有同学问&#xff0c;“国内高校微电子专业最好的是哪所高校?”“想搞数字ic设计去哪所大学好呢&#xff1f;” 其实国内28所示范性微电子学院都是非常不错的选择。 2015年&#xff0c;九所示范性微电子院校名单公布&#xff0c;包括了清华大学、北京大学、复旦大学…

8、Linux C/C++ 实现MySQL的图片插入以及图片的读取

本文结合了Linux C/C 实现MySQL的图片插入以及图片的读取&#xff0c;特别是数据库读写的具体流程 一、文件读取相关函数 fseek() 可以将文件指针移动到文件中的任意位置。其基本形式如下&#xff1a; int fseek(FILE *stream, long offset, int whence);其中&#xff0c;str…

kafka 设置用户密码和通过SpringBoot测试

叙述 当前Kafka认证方式采用动态增加用户协议。 自0.9.0.0版本开始Kafka社区添加了许多功能用于提高Kafka群集的安全性&#xff0c;Kafka提供SSL或者SASL两种安全策略。SSL方式主要是通过CA令牌实现&#xff0c;此文主要介绍SASL方式。 1&#xff09;SASL验证: 验证方式Kaf…

【JavaSE】Java基础语法(六):方法详解

文章目录 1. 方法概述1.1 方法的概念 2. 方法的定义和调用2.1 方法的定义2.2 方法的调用过程 3. 带参数方法的定义和调用3.1 带参数方法定义和调用3.2 形参和实参 4. 带返回值方法的定义和调用4.1 带返回值方法定义和调用4.2 带返回值方法的练习-求两个数的最大值(应用) 5. 方法…

【链接】深入理解PLT表和GOT表

系列综述&#xff1a; &#x1f49e;目的&#xff1a;本系列是个人整理为了秋招面试的&#xff0c;整理期间苛求每个知识点&#xff0c;平衡理解简易度与深入程度。 &#x1f970;来源&#xff1a;材料主要源于多处理器编程的艺术进行的&#xff0c;每个知识点的修正和深入主要…

nest日志包pino、winston配置-懒人的折腾

nest日志 三种node服务端日志选型 winstonpinolog4js 2023年5月23日 看star数&#xff1a;winston > pino > log4js 使用体验&#xff1a; pino 格式简洁&#xff0c;速度快&#xff0c;支持输入日志到任意数据库&#xff0c;日志暂无自动清理&#xff08;可能是我…

AI是怎么帮我写代码,写SQL的?(本文不卖课)

近期&#xff0c;ChatGPT风起云涌&#xff0c;“再不入局&#xff0c;就要被时代淘汰”的言论甚嚣尘上&#xff0c;借着这一波创业的朋友都不止3-4个&#xff0c;如果没记错&#xff0c;前几次抛出该言论的风口似乎是区块链&#xff0c;元宇宙&#xff0c;WEB3.0。 画外音&…

动态规划问题实验:数塔问题

目录 前言实验内容实验流程实验过程实验分析伪代码代码实现分析算法复杂度用例测试 总结 前言 动态规划是一种解决复杂问题的方法&#xff0c;它将一个问题分解为若干个子问题&#xff0c;然后从最简单的子问题开始求解&#xff0c;逐步推导出更复杂的子问题的解&#xff0c;最…

绝世内功秘籍《调试技巧》

本文作者&#xff1a;大家好&#xff0c;我是paper jie&#xff0c;感谢你阅读本文&#xff0c;欢迎一建三连哦。 内容专栏&#xff1a;这里是《C知识系统分享》专栏&#xff0c;笔者用重金(时间和精力)打造&#xff0c;基础知识一网打尽&#xff0c;希望可以帮到读者们哦。 内…

CloudQuery v2.0.0 发布 新增数据保护、数据变更、连接管理等功能

哈喽社区的小伙伴们&#xff0c;经过一个月的努力&#xff0c;CloudQuery 社区版发布了全新 v2.0.0系列&#xff01; 对比 v1.5.0&#xff0c;v2.0.0 在整体 UI 界面上就做了很大调整&#xff0c;功能排布我们做了重新梳理&#xff0c;可以说&#xff0c;社区版 v2.0.0 带领 C…

Linux——makefile自动化构建工具

一. 前言 一个工程中的源文件不计数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;makefile定义了一系列的 规则来指定&#xff0c;哪些文件需要先编译&#xff0c;哪些文件需要后编译&#xff0c;哪些文件需要重新编译&#xff0c;甚至于进行更复杂 的功能…

数据结构的定义

主要的定义 数据 描述客观事物的数和字符的集合&#xff0c;比如文字&#xff0c;数字和特殊符号 基本单元&#xff1a;数据元素 一个数据单元由若干个数据项构成 数据项&#xff1a;具有独立含义的数据最小单元&#xff0c;也称字段或域 数据元素&…

Spring Boot 中的 Starter 是什么?如何创建自定义 Starter?

Spring Boot 中的 Starter 是什么&#xff1f;如何创建自定义 Starter&#xff1f; Spring Boot 是一个快速构建应用程序的框架&#xff0c;它提供了一种简单的方式来快速启动和配置 Spring 应用程序。Spring Boot Starter 是 Spring Boot 的一个重要概念&#xff0c;它可以帮…

计算机网络详细笔记(四)网际控制报文协议ICMP

文章目录 4.网际控制报文协议ICMP4.1.ICMP报文的种类4.2.ICMP应用举例 4.网际控制报文协议ICMP 网际控制报文协议概述&#xff1a;&#xff1a; 作用&#xff1a;更有效地转发IP数据报和提高交付成功的机会。原理&#xff1a;允许主机或路由器报告差错情况和提供有关异常情况…

maven_SSM项目如何实现验证码功能

验证码的作用 防止恶意注册&#xff0c;自动化程序批量注册。防止暴力破解。 1、这里我们使用goole的验证码生成器 由于直接在maven中引入依赖&#xff0c;没有找到。所以只能直接去下载jar包了。 链接&#xff1a;https://pan.baidu.com/s/1KANhJKI4sQCfkiroTVr0WA?pwd29iv …