Spark大数据处理讲课笔记3.3 掌握RDD分区

news2024/11/15 23:04:12

文章目录

  • 零、本讲学习目标
  • 一、RRD分区
    • (一)RDD分区概念
    • (二)RDD分区作用
  • 二、RDD分区数量
    • (一)RDD分区原则
    • (二)影响分区的因素
    • (三)使用parallelize()方法创建RDD时的分区数量
      • 1、指定分区数量
      • 2、默认分区数量
      • 3、分区源码分析
    • (四)使用textFile()方法创建RDD时的分区数量
      • 1、指定最小分区数量
      • 2、默认最小分区数量
      • 3、默认实际分区数量
    • (五)RDD分区方式
  • 三、Spark分区器
    • (一)分区器 - Partitioner抽象类
    • (二)哈希分区器 - HashPartitioner类
  • 四、自定义分区器
    • (一)提出问题
    • (二)解决问题
      • 1、准备数据文件
      • 2、新建科目分区器
      • 3、测试科目分区器

零、本讲学习目标

  1. 学会如何指定分区数量
  2. 会定义与使用自定义分区器

一、RRD分区

(一)RDD分区概念

  • RDD是一个大的数据集合,该集合被划分成多个子集合分布到了不同的节点上,而每一个子集合就称为分区(Partition)。因此,也可以说,RDD是由若干个分区组成的。
    在这里插入图片描述

(二)RDD分区作用

  • 在分布式程序中,网络通信的开销是很大的,因此控制数据分布以获得最少的网络传输可以极大的提升程序的整体性能,Spark程序可以通过控制RDD分区方式来减少通信开销。Spark中所有的RDD都可以进行分区,系统会根据一个针对键的函数对元素进行分区。虽然Spark不能控制每个键具体划分到哪个节点上,但是可以确保相同的键出现在同一个分区上。

二、RDD分区数量

(一)RDD分区原则

  • RDD各个分区中的数据可以并行计算,因此分区的数量决定了并行计算的粒度。Spark会给每一个分区分配一个单独的Task任务对其进行计算,因此并行Task的数量是由分区的数量决定的。RDD分区的一个分区原则是使得分区的数量尽量等于集群中CPU核心数量。

(二)影响分区的因素

  • RDD的创建有两种方式:一种是使用parallelize()makeRDD()方法从对象集合创建;另一种是使用textFile()方法从外部存储系统创建。而RDD分区的数量与RDD的创建方式以及Spark集群的运行模式有关。

(三)使用parallelize()方法创建RDD时的分区数量

1、指定分区数量

  • 使用parallelize()方法创建RDD时,可以传入第二个参数,指定分区数量。
    在这里插入图片描述

  • 分区的数量应尽量等于集群中所有CPU的核心总数,以便可以最大程度发挥CPU的性能。

  • 利用mapPartitionsWithIndex()函数实现带分区索引的映射
    在这里插入图片描述
    在这里插入图片描述

  • 第1个分区完成了3个元素的映射,第2个分区完成了3个元素的映射,第3个分区完成了4个元素的映射

2、默认分区数量

  • 若不指定分区数量,则默认分区数量为Spark配置文件spark-defaults.conf中的参数spark.default.parallelism的值。若没有配置该参数,则Spark会根据集群的运行模式自动确定分区数量。
  • 如果是本地模式,默认分区数量就等于本机CPU核心总数,这样每个CPU核心处理一个分区的计算任务,可以最大程度发挥CPU的性能。
  • 如果是Spark Standalone或Spark On YARN模式,默认分区数量就取集群中所有CPU的核心总数与2中的较大值,即最少分区数为2
    在这里插入图片描述
  • 我们采用的是Standalone模式的Spark集群
  • 先用spark-shell本地模式启动
    在这里插入图片描述
  • 由此可见,本地机器master的CPU核数为4
    在这里插入图片描述
  • 以集群模式启动Spark Shell
    在这里插入图片描述
  • 注意:Spark集群是一个Master(master虚拟机)和两个Worker(slave1和slave2虚拟机)
    在这里插入图片描述
  • 默认分区数是8。为什么是8呢?集群两个工作节点(slave1和slave2)的CPU核数总和是4 + 4 = 8
    在这里插入图片描述

3、分区源码分析

  • parallelize()方法是在SparkContext类定义的在这里插入图片描述
  • numSlices参数为指定的分区数量,该参数有一个默认值defaultParallelism,是一个无参函数
    在这里插入图片描述
  • 上述代码中的taskScheduler的类型为特质TaskScheduler,通过调用该特质的defaultParallelism方法取得默认分区数量,而类TaskSchedulerImpl继承了特质TaskScheduler并实现了defaultParallelism方法。
    在这里插入图片描述
    在这里插入图片描述
  • 上述代码中的backend的类型为特质SchedulerBackend,通过调用该特质的defaultParallelism()方法取得默认分区数量,特质SchedulerBackend主要用于申请资源和对Task任务的执行和管理;而类LocalSchedulerBackend和类CoarseGrainedSchedulerBackend则继承了特质SchedulerBackend并分别实现了其中的defaultParallelism()方法。
    在这里插入图片描述
  • 类LocalSchedulerBackend用于Spark的本地运行模式(Executor和Master等在同一个JVM中运行),其调用顺序在TaskSchedulerImpl类之后;类CoarseGrainedSchedulerBackend则用于Spark的集群运行模式。
  • LocalSchedulerBackend中的defaultParallelism()方法
    在这里插入图片描述
  • 上述代码中的字符串spark.default.parallelism为Spark配置文件spark-defaults.conf中的参数spark.default.parallelism;totalCores为本机CPU核心总数。
  • CoarseGrainedSchedulerBackend中的defaultParallelism()方法
    在这里插入图片描述
  • 上述代码中,math.max(totalCoreCount.get(), 2)表示取集群中所有CPU核心总数与2两者中的较大值。

(四)使用textFile()方法创建RDD时的分区数量

  • textFile()方法通常用于读取HDFS中的文本文件,使用该方法创建RDD时,Spark会对文件进行分片操作(类似于MapReduce的分片,实际上调用的是MapReduce的分片接口),分片操作完成后,每个分区将存储一个分片的数据,因此分区的数量等于分片的数量
    在这里插入图片描述

1、指定最小分区数量

  • 使用textFile()方法创建RDD时可以传入第二个参数指定最小分区数量。最小分区数量只是期望的数量,Spark会根据实际文件大小、文件块(Block)大小等情况确定最终分区数量
    在这里插入图片描述
  • 在HDFS中有一个文件/park/test.txt,读取该文件,并指定最小分区数量为5,但是实际分区数量是6
    在这里插入图片描述

2、默认最小分区数量

  • 若不指定最小分区数量,则Spark将采用默认规则计算默认最小分区数量。
  • 查看textFile()源码
    在这里插入图片描述
  • 上述代码中的minPartitions参数为期望的最小分区数量,该参数有一个默认值defaultMinPartitions,这是一个无参函数,我们来查看其源码。
    在这里插入图片描述
  • 从上述代码中可以看出,默认最小分区数取默认并行度与2中的较小值;而默认并行度则是parallelize()方法的默认分区数。

3、默认实际分区数量

  • 最小分区数量确定后,Spark接下来将计算实际分区数量。查看textFile()方法的源码可知,textFile()方法最后调用了一个hadoopFile()方法,并对该方法的结果执行了map()算子。
    在这里插入图片描述
  • 查看hadoopFile()方法的源码在这里插入图片描述
  • 从上述代码可以看出,最终返回一个HadoopRDD对象。
  • 查看HadoopRDD类的部分源码
    在这里插入图片描述
  • HadoopRDD类中的getPartitions()方法的功能是获取实际分区数量。通过调用getInputFormat()方法得到InputFormat的实例,然后调用该实例的getSplits()方法获得输入数据的所有分片,getSplits()方法是决定最终分区数量的关键方法,该方法的第二个参数即为RDD的最小分区数量。
  • 查看InputFormt接口getSplits()抽象方法
    在这里插入图片描述
  • InputFormat有个实现类FileInputFormat,它实现了getSplits()方法
    在这里插入图片描述
    在这里插入图片描述
  • 根据期望分片数量(numSplits,即最小分区数量)计算期望分片大小(goalSize)。计算实际分片大小(splitSize)。splitSize最终决定了分片的数量。
  • splitSize由3个因素决定:最小分片大小(minSize)、期望分片大小(goalSize)、分块大小(blockSize)。
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    StopWatch sw = new StopWatch().start();
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }
  • 结论:在MapReduce中,每个分片对应一个Map任务,多个Map任务以完全并行的方式处理;而在Spark中,每个分片对应一个分区,每个分区对应一个Task任务,多个Task任务以完全并行的方式处理。

(五)RDD分区方式

  • Spark框架为RDD提供了两种分区方式,分别是哈希分区(HashPartitioner)范围分区(RangePartitioner)。其中,哈希分区是根据哈希值进行分区;范围分区是将一定范围的数据映射到一个分区中。这两种分区方式已经可以满足大多数应用场景的需求。与此同时,Spark也支持自定义分区方式,即通过一个自定义的Partitioner对象来控制RDD的分区,从而进一步减少通信开销。

三、Spark分区器

(一)分区器 - Partitioner抽象类

  • Spark RDD的Shuffle过程与MapReduce类似,涉及数据重组和重新分区,且要求RDD的元素必须是(key, value)形式的。分区规则是由分区器(Partitioner)控制的,Spark的主要分区器是HashPartitionerRangePartitioner,都继承了Partitioner抽象类。
    在这里插入图片描述
  • 抽象类Partitioner中有两个方法,分别用于指定分区数量和设置分区规则
    在这里插入图片描述

(二)哈希分区器 - HashPartitioner类

  • HashPartitioner是Spark使用的默认分区器,其分区规则为:取(key,value)对中key的hashCode值,然后除以分区数量后取余数。若余数小于0(一般余数都大于等于0),则用余数与分区数量的和作为分区ID,否则将余数作为分区ID。分区ID一致的(key,value)对则会被分配到同一个分区。因此,默认情况下,key值相同的(key,value)对一定属于同一个分区,但是同一个分区中可能有多个key值不同的(key,value)对。该分区器还支持key值为null的情况,当key值等于null时,将直接返回0作为分区ID。
  • HashPartitioner分区器中,对key取hashCode值实际上调用的是Java类Object中的hashCode()方法。由于Java数组的hashCode值基于的是数组标识,而不是数组内容,因此具有相同内容的数组的hashCode值不同。如果将数组作为RDD的key,就可能导致内容相同的key不能分配到同一个分区中。这个时候可以将数组转为集合,或者使用自定义分区器,根据数组内容进行分区。
    在这里插入图片描述

四、自定义分区器

(一)提出问题

  • 在有些情况下,使用Spark自带的分区器满足不了特定的需求。
  • 例如,某学生有以下3科成绩数据。
科目成绩
chinese98
math88
english96
chinese89
math96
english67
chinese88
math78
english89
  • 现需要将每一科成绩单独分配到一个分区中,然后将3科成绩输出到HDFS的指定目录(每个分区对应一个结果文件),此时就需要对数据进行自定义分区。

(二)解决问题

1、准备数据文件

  • master虚拟机的/home目录里创建marks.txt
    在这里插入图片描述
  • 将数据文件上传到HDFS指定目录
    在这里插入图片描述

2、新建科目分区器

  • 创建net.huawei.rdd.day04包,在包里创建SubjectPartitioner
    在这里插入图片描述
package net.huawei.rdd.day04

import org.apache.spark.Partitioner

/**
 * 功能:科目分区器
 * 作者:华卫
 * 日期:2023年05月04日
 */
class SubjectPartitioner(partitions: Int) extends Partitioner {
  /**
   * @return 分区数量
   */
  override def numPartitions: Int = partitions

  /**
   * @param key
   * @return 分区索引
   */
  override def getPartition(key: Any): Int = {
    val partitionIndex = key.toString match {
      case "chinese" => 0
      case "math" => 1
      case "english" => 2
    }
    partitionIndex
  }
}

3、测试科目分区器

  • 调用RDD的partitionBy()方法传入科目分区器类SubjectPartitioner的实例,可以对RDD按照自定义规则进行重新分区。
  • net.huawei.rdd.day04包里创建TestSubjectPartitioner单例对象
    在这里插入图片描述
package net.huawei.rdd.day04

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 功能:测试科目分区器
 * 作者:华卫
 * 日期:2023年05月04日
 */
object TestSubjectPartitioner {
  def main(args: Array[String]): Unit = {
    // 创建Spark配置对象
    val conf = new SparkConf()
      .setAppName("TestSubjectPartitioner") // 设置应用名称
      .setMaster("local[*]") // 设置主节点位置(本地调试)
    // 基于Spark配置对象创建Spark容器
    val sc = new SparkContext(conf)
    // 读取HDFS文件,生成RDD
    val lines = sc.textFile("hdfs://master:9000/partition/input/marks.txt")
    // 将每行数据映射成(科目,成绩)二元组
    val data: RDD[(String, Int)] = lines.map(line => {
        val fields = line.split(" ")
        (fields(0), fields(1).toInt) // (科目,成绩)
    })
    // 将数据重新分区
    val partitionData = data.partitionBy(new SubjectPartitioner(3))
    // 在控制台输出分区数据
    partitionData.collect.foreach(println)
    // 保存分区数据到HDFS指定目录
    partitionData.saveAsTextFile("hdfs://master:9000/partition/output")
  }
}
  • 运行程序,查看结果
    在这里插入图片描述
  • 查看HDFS的结果文件
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/487454.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【计算机视觉】Visual Transformer (ViT)模型结构以及原理解析

文章目录 一、简介二、Vision Transformer如何工作三、ViT模型架构四、ViT工作原理解析4.1 步骤1&#xff1a;将图片转换成patches序列4.2 步骤2&#xff1a;将patches铺平4.3 步骤3&#xff1a;添加Position embedding4.4 步骤4&#xff1a;添加class token4.5 步骤5&#xff…

数字图像处理--matlab图像反转、对数变换、伽马变换、对比度拉伸详解和代码实现

灰度变换在图像的单个像素上操作&#xff0c;主要以对比度和阈值处理为目的 空间滤波涉及改善性能的操作&#xff08;邻域操作&#xff09;&#xff0c;如通过图像中每一个像素的邻域处理来锐化图像 这两种方法均属于图像增强。 灰度变换 邻域基本增强变换定义数学表达三种基本…

QTableWidget表格控件的用法(非常详细)

QTableWidget表格控件的用法&#xff08;非常详细&#xff09; QTableWidget表格控件的用法&#xff08;非常详细&#xff09;QTableWidget详解1.常用API设置自动调整行高和列宽设置表格内容是否可编辑设置行表头、列表头是否显示 2.添加子项3.右键弹出菜单4.设置风格5.清空6.运…

LED灯内常见驱动电路

如今LED灯已成为照明的主流&#xff0c;使用白炽灯的家庭少之又少。其主要优势是LED灯更节能&#xff0c;相同光效的情况下&#xff0c;LED灯消耗的电能要比白炽灯减少70%以上。 LED灯的寿命比白炽灯要长&#xff0c;使用过白炽灯的人都知道&#xff0c;使用不了多长时间&…

简析Linux内核中的各种锁:信号量/互斥锁/读写锁/原子锁/自旋锁/内存屏障等

首先得搞清楚&#xff0c;不同锁的作用对象不同。 下面分别是作用于临界区、CPU、内存、cache 的各种锁的归纳&#xff1a; 一、atomic原子变量/spinlock自旋锁 — —CPU 既然是锁CPU&#xff0c;那就都是针对多核处理器或多CPU处理器。单核的话&#xff0c;只有发生中断会使…

生成C++工程的UML类图和类继承关系图

简介 在进行软件开发时&#xff0c;了解代码结构和关系、类之间的继承关系以及类内部的成员函数和变量定义是非常重要的。为此&#xff0c;我们可以使用Doxygen和Graphviz工具来生成UML类图和类集成关系图。 Doxygen是一个用于从注释的C源代码中生成文档的工具&#xff0c;支…

day01刷题记录

刷题 题目一分析题解 题目二分析题解 题目一 牛牛举办了一次编程比赛,参加比赛的有3*n个选手,每个选手都有一个水平值a_i.现在要将这些选手进行组队,一共组成n个队伍,即每个队伍3人.牛牛发现队伍的水平值等于该队伍队员中第二高水平值。 例如: 一个队伍三个队员的水平值分别是…

access数据库连接sqlserver实现远程连接

由于项目需要对接生产系统&#xff0c;但是生产系统使用的是access数据库&#xff08;这么老还在用&#xff0c;不知道咋想的&#xff09;&#xff0c;客户又想把项目部署到阿里云上&#xff0c;需要阿里云远程连接本地的access数据库&#xff08;心里一句MMP送上&#xff09;&…

Java——线程池详细讲解

文章目录 一、线程池一、线程池基础1.1 什么是线程池1.2 为什么使用线程池1.3 线程池有哪些优势1.4 应用场景 二、线程池使用2.1 Java内置线程池 ThreadPoolExecutor2.1.1 线程池的七个参数2.1.1.1 **int corePoolSize 核心线程数量**2.1.1.2 int maximumPoolSize 最大线程数2.…

假期后,野兔百科系统网站源码新版更新发布

这个是野兔百科系统中文版更新&#xff0c;这次更新了增加几个功能模块&#xff0c;几个已知的问题&#xff0c;修复系统部分功能。 系统名称&#xff1a;野兔百科系统 系统语言&#xff1a;中文版 系统源码&#xff1a;不加密&#xff0c;开源 系统开发&#xff1a;PHPMySQL …

尚融宝29-提现和还款

目录 一、提现 &#xff08;一&#xff09;需求 &#xff08;二&#xff09;前端 &#xff08;三&#xff09;后端 1、提现接口 2、回调接口 二、还款 &#xff08;一&#xff09;需求 &#xff08;二&#xff09;前端 &#xff08;三&#xff09;后端 1、还款接口 …

第一章:概述

1&#xff0c;因特网概述 1.网络、互联网和英特网 网络(Network)由若干结点(Node)和连接这些结点的链路(Link)组成。 多个网络还可以通过路由器互连起来&#xff0c;这样就构成了一个覆盖范围更大的网络&#xff0c;即互联网(或互连网)。因此&#xff0c;互联网是“网络的网络…

UE蓝图基础学习笔记(未完待续2023/05/03)

文章目录 一、项目创建1&#xff09;准备流程&#xff08;选择模板、开发语言、平台、质量等&#xff09;2&#xff09;界面介绍 二、Actor三、操作关卡对象&#xff08;旋转、移动、缩放和坐标轴&#xff09;四、常用快捷键五、运行游戏六、蓝图介绍七、蓝图节点八、操作事件图…

Azure DevOps Server 2022.0.1升级手册

Contents 1. 概述2. 操作方法 2.1 安装操作系统2.2 安装数据库2.4 还原数据2.3 安装和配置Azure DevOps Server 1. 概述 Azure DevOps Server 是微软公司经过20多年的持续开发&#xff0c;逐渐将需求管理、敏捷实践、源代码管理、持续集成等功能集成一体&#xff0c;实现应用软…

AutoHotKey简单入门

简单入门 快捷键 ^j::Send, Hello world! Return^j::代表CtrlJ&#xff0c;其中^代表Ctrl键 Send命令&#xff1a;在光标处输入Hello world! 也就是说&#xff0c;你按下CtrlJ后&#xff0c;将会输入字符串Hello world! Return即返回 热字串 ::ftw::Free the whales Ret…

抖音营销策略:新手如何利用抖音提高品牌曝光度

随着短规频平台的兴起&#xff0c;抖音作为其中的校佼者&#xff0c;已经成为了众多用户和企业的营销利器。但是&#xff0c;对于抖音新手而言&#xff0c;如何在这个平台上快速提升影响力呢?下面不若与众就为大家分享几个实用的方法。 一、关注抖音热门话题和潮流 抖音平台上…

力扣题库刷题笔记581-最短无序连续子数组

1、题目如下&#xff1a; 2、题解代码实现&#xff1a; 浅看题解&#xff0c;解题思路和本人接替思路一毛一样&#xff0c;奈何没有想到用双指针&#xff0c;在代码实现上也存在问题。当知道用双指针的时候&#xff0c;本题也变得相对简单。思路如下&#xff1a; a、输入仅存在…

Vue条件渲染v-if和v-show

条件渲染v-if和v-show <div id"root"><!-- <div v-if"true">v-if</div>--> <!-- <div v-show"true">v-show</div>--> n:{{n}}<button click"n">点击n</button><div v…

法规标准-UN R152标准解读

UN R152是做什么的&#xff1f; UN R152 全名为关于M1和N1型机动车高级紧急制动系统&#xff08;AEBS&#xff09;型式认证的统一规定&#xff0c;是联合国对于M1和N1型车辆AEBS系统认证的要求说明&#xff0c;当满足其要求内容时&#xff0c;才可通过联合国的认证&#xff0c…

数字化转型导师坚鹏:面向数字化转型的大数据顶层设计实践

面向数字化转型的大数据顶层设计实践 课程背景&#xff1a; 数字化背景下&#xff0c;很多企业存在以下问题&#xff1a; 不清楚大数据思维如何建立&#xff1f; 不清楚企业大数据分析方法&#xff1f; 不了解大数据应用成功案例&#xff1f; 课程特色&#xff1a; …