Spark RDD分区

news2024/11/23 6:18:57

文章目录

  • 一、RRD分区
    • (一)RDD分区概念
    • (二)RDD分区作用
  • 二、RDD分区数量
    • (一)RDD分区原则
    • (二)影响分区的因素
    • (三)使用parallelize()方法创建RDD时的分区数量
      • 1、指定分区数量
      • 2、默认分区数量
      • 3、分区源码分析
    • (四)使用textFile()方法创建RDD时的分区数量
      • 1、指定最小分区数量
      • 2、默认最小分区数量
      • 3、默认实际分区数量
    • (五)RDD分区方式
  • 三、Spark分区器
    • (一)分区器 - Partitioner抽象类
    • (二)哈希分区器 - HashPartitioner类
  • 四、自定义分区器
    • (一)提出问题
    • (二)解决问题
      • 1、准备数据文件
      • 2、新建科目分区器
      • 3、测试科目分区器
  • 五、练习


一、RRD分区

在这里插入图片描述

(一)RDD分区概念

RDD是一个大的数据集合,该集合被划分成多个子集合分布到了不同的节点上,而每一个子集合就称为分区(Partition)。因此,也可以说,RDD是由若干个分区组成的。

(二)RDD分区作用

在分布式程序中,网络通信的开销是很大的,因此控制数据分布以获得最少的网络传输可以极大的提升程序的整体性能,Spark程序可以通过控制RDD分区方式来减少通信开销。Spark中所有的RDD都可以进行分区,系统会根据一个针对键的函数对元素进行分区。虽然Spark不能控制每个键具体划分到哪个节点上,但是可以确保相同的键出现在同一个分区上。

二、RDD分区数量

(一)RDD分区原则

RDD各个分区中的数据可以并行计算,因此分区的数量决定了并行计算的粒度。Spark会给每一个分区分配一个单独的Task任务对其进行计算,因此并行Task的数量是由分区的数量决定的。RDD分区的一个分区原则是使得分区的数量尽量等于集群中CPU核心数量。

(二)影响分区的因素

RDD的创建有两种方式:一种是使用parallelize()或makeRDD()方法从对象集合创建;另一种是使用textFile()方法从外部存储系统创建。RDD分区的数量与RDD的创建方式以及Spark集群的运行模式有关。

(三)使用parallelize()方法创建RDD时的分区数量

1、指定分区数量

使用parallelize()方法创建RDD时,可以传入第二个参数,指定分区数量。

注意:采用本地模式启动Spark Shell(在master节点上)

在这里插入图片描述
分区的数量应尽量等于集群中所有CPU的核心总数,以便可以最大程度发挥CPU的性能。
在这里插入图片描述利用mapPartitionsWithIndex()函数实现带分区索引的映射
在这里插入图片描述

val rdd = sc.parallelize(1 to 10, 3) // 第二参数指定分区数量
rdd.mapPartitionsWithIndex((index, iter) => {
  iter.toList.map(i => index + " : " + ("*" * i)).iterator}).collect.foreach(println)

在这里插入图片描述

val rdd = sc.parallelize(Array(5,7,3,8,1,4,6,9,2,10), 3)
rdd.mapPartitionsWithIndex((index, iter) => {
  iter.toList.map(i => index + " : " + ("*" * i)).iterator}).collect.foreach(println)

第1个分区完成了3个元素的映射,第2个分区完成了3个元素的映射,第3个分区完成了4个元素的映射

2、默认分区数量

若不指定分区数量,则默认分区数量为Spark配置文件spark-defaults.conf中的参数spark.default.parallelism的值。若没有配置该参数,则Spark会根据集群的运行模式自动确定分区数量。

如果是本地模式,默认分区数量就等于本机CPU核心总数,这样每个CPU核心处理一个分区的计算任务,可以最大程度发挥CPU的性能。

如果是Spark Standalone或Spark On YARN模式,默认分区数量就取集群中所有CPU的核心总数与2中的较大值,即最少分区数为2。

我们采用的是Standalone模式的Spark集群

先用spark-shell本地模式启动
在这里插入图片描述
由此可见,本地机器master的CPU核数为4。
在这里插入图片描述
以集群模式启动Spark Shell
在这里插入图片描述

注意:Spark集群是一个Master(master虚拟机)和两个Worker(slave1和slave2虚拟机)
在这里插入图片描述
默认分区数是8。为什么是8呢?集群两个工作节点(slave1和slave2)的CPU核数总和是4 + 4 = 8
在这里插入图片描述

3、分区源码分析

parallelize()方法是在SparkContext类定义的
在这里插入图片描述numSlices参数为指定的分区数量,该参数有一个默认值defaultParallelism,是一个无参函数
在这里插入图片描述
上述代码中的taskScheduler的类型为特质TaskScheduler,通过调用该特质的defaultParallelism方法取得默认分区数量,而类TaskSchedulerImpl继承了特质TaskScheduler并实现了defaultParallelism方法。
在这里插入图片描述在这里插入图片描述
上述代码中的backend的类型为特质SchedulerBackend,通过调用该特质的defaultParallelism()方法取得默认分区数量,特质SchedulerBackend主要用于申请资源和对Task任务的执行和管理;而类LocalSchedulerBackend和类CoarseGrainedSchedulerBackend则继承了特质SchedulerBackend并分别实现了其中的defaultParallelism()方法。

在这里插入图片描述
类LocalSchedulerBackend用于Spark的本地运行模式(Executor和Master等在同一个JVM中运行),其调用顺序在TaskSchedulerImpl类之后;类CoarseGrainedSchedulerBackend则用于Spark的集群运行模式。

类LocalSchedulerBackend中的defaultParallelism()方法
在这里插入图片描述
上述代码中的字符串spark.default.parallelism为Spark配置文件spark-defaults.conf中的参数spark.default.parallelism;totalCores为本机CPU核心总数。

类CoarseGrainedSchedulerBackend中的defaultParallelism()方法
在这里插入图片描述
上述代码中,math.max(totalCoreCount.get(), 2)表示取集群中所有CPU核心总数与2两者中的较大值。

(四)使用textFile()方法创建RDD时的分区数量

textFile()方法通常用于读取HDFS中的文本文件,使用该方法创建RDD时,Spark会对文件进行分片操作(类似于MapReduce的分片,实际上调用的是MapReduce的分片接口),分片操作完成后,每个分区将存储一个分片的数据,因此分区的数量等于分片的数量。

1、指定最小分区数量

使用textFile()方法创建RDD时可以传入第二个参数指定最小分区数量。最小分区数量只是期望的数量,Spark会根据实际文件大小、文件块(Block)大小等情况确定最终分区数量。

在HDFS中有一个文件/park/test.txt,读取该文件,并指定最小分区数量为5,但是实际分区数量是6。

针对/park/test.txt文件,实际分区数比指定分区数大1,但是换个文件,情况就未必如此。

2、默认最小分区数量

若不指定最小分区数量,则Spark将采用默认规则计算默认最小分区数量。

以集群启动Spark Shell,默认分区数是2\

查看textFile()源码
在这里插入图片描述上述代码中的minPartitions参数为期望的最小分区数量,该参数有一个默认值defaultMinPartitions,这是一个无参函数,我们来查看其源码。
在这里插入图片描述从上述代码中可以看出,默认最小分区数取默认并行度与2中的较小值;而默认并行度则是parallelize()方法的默认分区数。

3、默认实际分区数量

最小分区数量确定后,Spark接下来将计算实际分区数量。查看textFile()方法的源码可知,textFile()方法最后调用了一个hadoopFile()方法,并对该方法的结果执行了map()算子。
在这里插入图片描述查看hadoopFile()方法的源码
在这里插入图片描述

从上述代码可以看出,最终返回一个HadoopRDD对象。

查看HadoopRDD类的部分源码
在这里插入图片描述
HadoopRDD类中的getPartitions()方法的功能是获取实际分区数量。通过调用getInputFormat()方法得到InputFormat的实例,然后调用该实例的getSplits()方法获得输入数据的所有分片,getSplits()方法是决定最终分区数量的关键方法,该方法的第二个参数即为RDD的最小分区数量。

查看InputFormt接口getSplits()抽象方法
在这里插入图片描述
InputFormat有个实现类FileInputFormat,它实现了getSplits()方法
在这里插入图片描述在这里插入图片描述
根据期望分片数量(numSplits,即最小分区数量)计算期望分片大小(goalSize)。计算实际分片大小(splitSize)。splitSize最终决定了分片的数量。

splitSize由3个因素决定:最小分片大小(minSize)、期望分片大小(goalSize)、分块大小(blockSize)。

  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    StopWatch sw = new StopWatch().start();
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }

结论:在MapReduce中,每个分片对应一个Map任务,多个Map任务以完全并行的方式处理;而在Spark中,每个分片对应一个分区,每个分区对应一个Task任务,多个Task任务以完全并行的方式处理。

(五)RDD分区方式

Spark框架为RDD提供了两种分区方式,分别是哈希分区器(HashPartitioner)和范围分区器(RangePartitioner)。其中,哈希分区是根据哈希值进行分区;范围分区是将一定范围的数据映射到一个分区中。这两种分区方式已经可以满足大多数应用场景的需求。与此同时,Spark也支持自定义分区方式,即通过一个自定义的Partitioner对象来控制RDD的分区,从而进一步减少通信开销。

三、Spark分区器

(一)分区器 - Partitioner抽象类

Spark RDD的Shuffle过程与MapReduce类似,涉及数据重组和重新分区,且要求RDD的元素必须是(key, value)形式的。分区规则是由分区器(Partitioner)控制的,Spark的主要分区器是HashPartitioner和RangePartitioner,都继承了Partitioner抽象类。
在这里插入图片描述
抽象类Partitioner中有两个方法,分别用于指定分区数量和设置分区规则
在这里插入图片描述

(二)哈希分区器 - HashPartitioner类

HashPartitioner是Spark使用的默认分区器,其分区规则为:取(key,value)对中key的hashCode值,然后除以分区数量后取余数。若余数小于0(一般余数都大于等于0),则用余数与分区数量的和作为分区ID,否则将余数作为分区ID。分区ID一致的(key,value)对则会被分配到同一个分区。因此,默认情况下,key值相同的(key,value)对一定属于同一个分区,但是同一个分区中可能有多个key值不同的(key,value)对。该分区器还支持key值为null的情况,当key值等于null时,将直接返回0作为分区ID。

HashPartitioner分区器中,对key取hashCode值实际上调用的是Java类Object中的hashCode()方法。由于Java数组的hashCode值基于的是数组标识,而不是数组内容,因此具有相同内容的数组的hashCode值不同。如果将数组作为RDD的key,就可能导致内容相同的key不能分配到同一个分区中。这个时候可以将数组转为集合,或者使用自定义分区器,根据数组内容进行分区。

在这里插入图片描述

四、自定义分区器

(一)提出问题

在有些情况下,使用Spark自带的分区器满足不了特定的需求。

例如,某学生有以下3科三个月的月考成绩数据。

科目成绩
chinese98
math88
english96
chinese89
math96
english67
chinese88
math78
english89

现需要将每一科成绩单独分配到一个分区中,然后将3科成绩输出到HDFS的指定目录(每个分区对应一个结果文件),此时就需要对数据进行自定义分区。

(二)解决问题

1、准备数据文件

在master虚拟机的/home目录里创建marks.txt
在这里插入图片描述

将数据文件上传到HDFS指定目录
在这里插入图片描述

2、新建科目分区器

创建net.army.rdd.day04包,在包里创建SubjectPartitioner类
在这里插入图片描述

package net.army.rdd.day04

import org.apache.spark.Partitioner

/**
 * 作者:梁辰兴
 * 日期:2023/6/6
 * 功能:科目分区器
 */
class SubjectPartitioner(partitions: Int) extends Partitioner {
  /**
   * @return 分区数量
   */
  override def numPartitions: Int = partitions

  /**
   * @param key(科目)
   * @return 分区索引
   */
  override def getPartition(key: Any): Int = {
    val partitionIndex = key.toString match {
      case "chinese" => 0
      case "math" => 1
      case "english" => 2
    }
    partitionIndex
  }
}

3、测试科目分区器

调用RDD的partitionBy()方法传入科目分区器类SubjectPartitioner的实例,可以对RDD按照自定义规则进行重新分区。

在net.army.rdd.day04包里创建TestSubjectPartitioner单例对象
在这里插入图片描述
运行程序,查看结果
在这里插入图片描述

查看HDFS的结果文件
在这里插入图片描述
如果传入的分区数不是3,会出现什么状况?
删除输出目录
在这里插入图片描述
运行程序,查看控制台输出结果
在这里插入图片描述在这里插入图片描述
查看HDFS上的结果文件
在这里插入图片描述
删除输出目录,修改分区数为2,再运行程序,查看控制台结果
在这里插入图片描述在这里插入图片描述在这里插入图片描述

五、练习

对于成绩表marks.txt,按科目分区输出

/partition/output/part-00001

(chinese,275,91.67)

/partition/output/part-00002

(math,262,87.33)

/partition/output/part-00002

(english,252,84)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/614361.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

软件测试之环境搭建—苏汽web系统测试环境搭建

一、搭建环境的准备工作 1、安装好RedHat&#xff0c;输入用户名&#xff1a;root&#xff0c;密码&#xff1a;123456&#xff0c;右键点击桌面&#xff0c;打开终端输入“ifconfig”查询IP地址 2.打开xshell&#xff0c;点击文件&#xff0c;选择新建连接&#xff0c;在输入…

【数据分析之道-Matplotlib(七)】Matplotlib直方图

文章目录 专栏导读1、hist()基本语法2、使用 hist() 函数绘制多个数据组的直方图3、修改直方图的颜色及边框颜色4、六一儿童节为主题&#xff0c;使用直方图进行可视化 专栏导读 ✍ 作者简介&#xff1a;i阿极&#xff0c;CSDN Python领域新星创作者&#xff0c;专注于分享pyth…

POI报表的高级应用

POI报表的高级应用 掌握基于模板打印的POI报表导出理解自定义工具类的执行流程 熟练使用SXSSFWorkbook完成百万数据报表打印理解基于事件驱动的POI报表导入 模板打印 概述 自定义生成Excel报表文件还是有很多不尽如意的地方&#xff0c;特别是针对复杂报表头&#xff0c;单元格…

我们世界中的10个算法

下面的图表展示了我们日常生活中最常用的算法。它们被应用在互联网搜索引擎、社交网络、WiFi、手机甚至卫星等各个领域。 1.排序算法 排序算法用于将一组数据按照特定的顺序进行排列。它们被广泛应用于各种场景&#xff0c;如搜索引擎中的搜索结果排序、数据分析中的数据整理和…

转转前端周刊第六十八期

转转前端周刊 本刊意在将整理业界精华文章给大家&#xff0c;期望大家一起打开视野 如果你有发现一些精华文章想和更多人分享&#xff0c;可以点击我们的公众号名称&#xff0c;将文章链接和你的解读文案发给我们&#xff01;我们会对内容进行筛选和审核&#xff0c;保留你的推…

基于TensorFlow Object Detection API实现RetinaNet目标检测网络(附源码)

文章目录 一、RetinaNet简介1. Backbone网络2. FPN网络 二、RetinaNet实现1. tf.train.CheckPoint简介2. RetinaNet的TensorFlow源码 一、RetinaNet简介 RetinaNet是作者Tsung-Yi Lin和Kaiming He于2018年发表的论文Focal Loss for Dense Object Detection中提出的网络。Retina…

运维小白必学篇之基础篇第十六集:DNS架构FTP实验

DNS架构FTP实验 目录 DNS架构FTP实验 服务端 客户端 服务端 在ftp架构了安装dns域名服务 yum -y install bind 配置主配置文件 vim /etc/named.conf listen-on port 53 { 192.168.50.1; }; allow-query { 192.168.50.0/24; }; 配置区域文件 vim /etc/named.rfc1912.zones…

深聊丨“紫东太初”大模型背后有哪些值得细读的论文(一)

原创&#xff1a;谭婧 没有人想等待&#xff0c;没有人想落伍。 新鲜论文时兴火热&#xff0c;成为大模型发展迅猛的标志之一&#xff0c;人们用“刷论文”这个游荡意味的动词替代另一个颇为严肃的动作&#xff0c;“读论文”。 论文被当作“教材”和“新知识”&#xff0c;在a…

矢量网络分析仪RS罗德与施瓦ZNB8 9KHZ至8.5GHZ德国二手

Rohde & Schwarz ZNB8网络分析仪&#xff0c;8.5 GHz&#xff0c;2 或 4 端口 ​罗德与施瓦茨 ZNB8 Rohde & Schwarz ZNB8 矢量网络分析仪具有高达 140 dB&#xff08;10 Hz IF 带宽&#xff09;的宽动态范围、低于 0.004 dB RMS&#xff08;10 kHz IF 带宽&#xff…

React--Component组件浅析

目录 一 前言二 什么是React组件&#xff1f;三 二种不同 React 组件1 class类组件2 函数组件 四 组件通信方式五 组件的强化方式六 总结 一 前言 在 React 世界里&#xff0c;一切皆组件&#xff0c;我们写的 React 项目全部起源于组件。组件可以分为两类&#xff0c;一类是类…

驱动开发:内核扫描SSDT挂钩状态

在笔者上一篇文章《驱动开发&#xff1a;内核实现SSDT挂钩与摘钩》中介绍了如何对SSDT函数进行Hook挂钩与摘钩的&#xff0c;本章将继续实现一个新功能&#xff0c;如何检测SSDT函数是否挂钩&#xff0c;要实现检测挂钩状态有两种方式&#xff0c;第一种方式则是类似于《驱动开…

【Unity-UGUI控件全面解析】| Layout自动布局组件详解

🎬【Unity-UGUI控件全面解析】| Layout自动布局组件详解一、组件介绍二、组件属性面板2.1 布局元素 (Layout Element)2.2 水平布局组 (Horizontal Layout Group)2.3 垂直布局组 (Vertical Layout Group)2.4 网格布局组 (Grid Layout Group)三、代码操作组件四、组件常用方法示…

把 AI 装进即时通讯,会发生什么?

今年以来&#xff0c;AIGC 技术以“天”为单位快速进化&#xff0c;刷足了存在感。科技公司迅速将 AI 嵌入自家的产品中&#xff0c;追逐 AI 带来的生产力变革&#xff0c;解决日益复杂的需求。从文学到音乐&#xff0c;从绘画到编程&#xff0c;无一领域不受其影响。 在这些领…

网络编程(1)

获取本网络信息相关接口 接口说明&#xff1a; QHostInfo类为主机信息&#xff0c;为主机名查找提供静态函数 QHostAddress类为主机地址类&#xff0c;管理IPV4或IPV6地址信息。 QNetworkInterface类为网络接口类&#xff0c;提供主机IP地址和网络接口的列表。 QNetworkAd…

关于人生,爱情和事业,谈谈我的人类史观(视频在最后)

前几天在知乎上回答了一个问题&#xff0c;没想到一下子好像火了&#xff0c;评论超过五百&#xff0c;也有各种质疑&#xff0c;其中有一个人的问题我觉得值得探讨&#xff0c;因为在回答中&#xff0c;我写下了一段也许值得留存的文字。 视频里面有更多的内容的扩展&#xff…

2023智源大会议程公开丨具身智能与强化学习论坛

6月9日&#xff0c;2023北京智源大会&#xff0c;将邀请这一领域的探索者、实践者、以及关心智能科学的每个人&#xff0c;共同拉开未来舞台的帷幕&#xff0c;你准备好了吗&#xff1f;与会知名嘉宾包括&#xff0c;图灵奖得主Yann LeCun、图灵奖得主Geoffrey Hinton、OpenAI创…

chatgpt赋能python:Python:填写网页内容的SEO最佳实践

Python&#xff1a;填写网页内容的SEO最佳实践 在今天的数字领域中&#xff0c;SEO&#xff08;搜索引擎优化&#xff09;已经成为成功在线业务的必要元素。其中&#xff0c;内容是SEO的核心部分。网页内容不仅仅是用户体验的关键&#xff0c;还是吸引搜索引擎注意的因素之一。…

Java 进阶 -- 集合(一)

本节描述Java集合框架。在这里&#xff0c;您将了解什么是集合&#xff0c;以及它们如何使您的工作更轻松&#xff0c;程序更好。您将了解组成Java Collections Framework的核心元素——接口、实现、聚合操作和算法。 介绍告诉您集合是什么&#xff0c;以及它们如何使您的工作…

【Python】Python系列教程-- Python3 函数(二十一)

文章目录 前言定义一个函数语法实例函数调用参数传递可更改(mutable)与不可更改(immutable)对象python 传不可变对象实例传可变对象实例参数必需参数关键字参数默认参数不定长参数匿名函数return 语句强制位置参数 前言 往期回顾&#xff1a; Python系列教程–Python3介绍&am…

第3章“程序的机器级表示”:数组分配与访问

文章目录 概述3.8.1 基本原则3.8.2 指针运算3.8.3 数组与循环3.8.4 嵌套数组3.8.4 固定大小的数组3.8.5 动态分配的数组 概述 C 中数组是一种将标量型数据聚集成更大数据类型的方式。C用来实现数组的方式非常简单&#xff0c;因此很容易翻译成机器代码。C的一个不同寻常的特点…