从源码的角度告诉你 spark是怎样完成对文件切片

news2025/1/11 9:05:32

目录

1.说明

2.怎样设置默认切片数

2.1 RDD默认切片设置

2.2 SparkSQL默认切片设置

3. makeRDD 切片原理

4. textFile 切片原理

4.1 切片规则

4.2 怎样设置切片大小

4.3 测试代码

 5.hadoopFile 切片原理

5.1 说明

5.2 切片规则

5.3 怎样设置切片大小

5.4 代码测试

5.5 minPartitions 在 CombineTextInputFormat 中的作用?

5.6 重点关注


1.说明

在spark中为我们提供了用来读取数据的方法
    比如 makeRDD、parallelize、textFile、hadoopFile等方法
    
这些方法按照数据源可以分为两类 文件系统、Driver内存中的集合数据
当我们使用指定的方法读取数据后,会按照指定的切片个数对文件进行切片


2.怎样设置默认切片数

在我们在使用RDD的算子时,经常会遇到可以显式的指定切片个数,或者隐式的使用默认切片个数,下面会告诉我们,怎样设置默认切片个数

2.1 RDD默认切片设置

1.驱动程序中设置
val sparkconf: SparkConf = new SparkConf().setAppName("测试默认切片数")
   .set("spark.default.parallelism","1000")
   .setMaster("local[100]")

2.spark-shell或spark-submit 设置
spark-shell \
--master yarn \
--name "spark-shell-tmp" \
--conf spark.default.parallelism=1000 \
--driver-memory 40G \
--executor-memory 40G \
--num-executors 40 \
--executor-cores 6 \

3.不指定 spark.default.parallelism 参数时,将使用默认值
    local模式:
        local[100] :  100
        local      :  客户端机器核数
    集群模式(yarn):
        2 或者 核数总和

源码:

查看默认切片数: 

// 获取默认切片数
val parallelism = sc.defaultParallelism

2.2 SparkSQL默认切片设置

-- 设置默认切片数
set spark.sql.shuffle.partitions=1000;

默认值:
  当不设置时,默认为200

注意:
  spark.default.parallelism 只有在处理RDD时才会起作用,对SparkSQL的无效
  spark.sql.shuffle.partitions 则是对sparks SQL专用的设置

3. makeRDD 切片原理

可用通过 makeRDD算子 将Driver中序列集合中数据转换成RDD,在转换的过程中,会根据指定的切片个数集合索引对集合切片

切片规则:

        根据集合长度切片数将集合切分成若干子集合(和集合元素内容无关)

示例代码:

  test("makeRDD - 切片逻辑") {
    // 初始化 spark配置实例
    val sparkconf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")
    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sparkconf)

    val rdd: RDD[(String, String)] = sc.makeRDD(List(
      ("张飞1", "张飞java scala spark")
      , ("张飞2", "张飞java scala spark")
      , ("刘备3", "刘备java spark")
      , ("刘备4", "刘备java scala spark")
      , ("刘备5", "刘备scala spark")
      , ("关羽6", "关羽java scala spark")
      , ("关羽7", "关羽java scala")
      , ("关羽8", "关羽java scala spark")
      , ("关羽9", "关羽java spark")))

    // 查看每个分区的内容
    rdd.mapPartitionsWithIndex(
      (i, iter) => {
        println(s"分区编号$i :${iter.mkString(" ")}");
        iter
      }
    ).collect()

    rdd.getNumPartitions

    sc.stop()
  }

结果:

源码阅读:

1. 通过SparkContext创建rdd
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}


2. ParallelCollectionRDD类中的 getPartitions方法
override def getPartitions: Array[Partition] = {
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}

3. ParallelCollectionRDD对象的slice方法(核心切片逻辑)

def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  // 对切片数做合法性校验
  if (numSlices < 1) {
    throw new IllegalArgumentException("Positive number of partitions required")
  }
  // TODO 通过 集合长度和切片数 获取每个切片的位置信息
  // 从这可以得出 对集合的切片只和 集合索引和切片数相关,和集合内容无关
  // 将 集合索引按照切片数 切分成若干元素
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
  }
  // 对集合类型做判断
  seq match {
    case r: Range =>
      positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
        // If the range is inclusive, use inclusive range for the last slice
        if (r.isInclusive && index == numSlices - 1) {
          new Range.Inclusive(r.start + start * r.step, r.end, r.step)
        } else {
          new Range.Inclusive(r.start + start * r.step, r.start + (end - 1) * r.step, r.step)
        }
      }.toSeq.asInstanceOf[Seq[Seq[T]]]
    case nr: NumericRange[T] =>
      // For ranges of Long, Double, BigInteger, etc
      val slices = new ArrayBuffer[Seq[T]](numSlices)
      var r = nr
      for ((start, end) <- positions(nr.length, numSlices)) {
        val sliceSize = end - start
        slices += r.take(sliceSize).asInstanceOf[Seq[T]]
        r = r.drop(sliceSize)
      }
      slices.toSeq
    case _ =>
      val array = seq.toArray // To prevent O(n^2) operations for List etc
      positions(array.length, numSlices).map { case (start, end) =>
          array.slice(start, end).toSeq
      }.toSeq
  }
}

4. textFile 切片原理

textFile使用的MapReduce框架中TextInputFormat类完成对文件切片和读取切片中数据

4.1 切片规则

1.对job输入路径中的每个文件单独切片
2.判断每个文件是否支持切片
         true : 按照指定切片大小对文件切片
         false: 文件整体作为一个切片 

4.2 怎样设置切片大小

// 切片大小计算规则
    splitSize = Math.max(minSize, Math.min(goalSize, blockSize))

// 参数说明
    1.minSize
    set mapreduce.input.fileinputformat.split.minsize=256000000 或 
    set mapred.min.split.size=256000000
    默认值 minSize=1L

    2.goalSize
    goalSize=所有文件大小总和/指定的切片个数

    3.blockSize
    本地目录32M|HDFS目录128M或256M(看hdfs文件块具体配置)

// 需求 
    1.真实切片大小 < blockSize
    goalSize=所有文件大小总和/指定的切片个数 < blockSize 即(创建rdd时调大切片个数)

    2.真实切片大小 > blockSize
    set mapreduce.input.fileinputformat.split.minSize=大于blockSize值

4.3 测试代码

  test("textFile - 切片逻辑") {
    // 初始化 spark配置实例
    val sf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("Test textFile")

    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sf)
    sc.hadoopConfiguration.setInt("mapred.min.split.size", 469000000)
    // sc.hadoopConfiguration.setInt("mapreduce.input.fileinputformat.split.minsize", 256000000)

    // 读取目录下的所有文件
    val rdd: RDD[String] = sc.textFile("src/main/resources/data/dir/dir3/LOL.map", 1000)

    // 打印分区个数
    println("切片个数:"+rdd.getNumPartitions)

    sc.stop()
  }

执行结果:


 5.hadoopFile 切片原理

5.1 说明

def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()

功能:
  读取HDFS文件或本地文件来创建RDD(使用MapReduce框架中InputFormat类)

参数:
  path: 指定job的输入路径
  inputFormatClass: 对输入文件切片和读取的实现类
  keyClass: key的数据类型
  valueClass: value的数据类型
  minPartitions: 最小切片数

5.2 切片规则

根据指定的切片大小进行切片,允许将多个文件合并成换一个切片对象

5.3 怎样设置切片大小

指定切片大小(默认值Long.MaxValue)
set mapred.max.split.size=切片大小 或
set mapreduce.input.fileinputformat.split.maxsize=切片大小

5.4 代码测试

  test("spark中使用 CombineTextInputFormat") {
    // 初始化 spark配置实例
    val sf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("")

    // 初始化 spark环境对象
    val sc: SparkContext = new SparkContext(sf)

    // 读取目录下的所有文件
    val input = "src/main/resources/data/dir/dir3"
        val combineRDD: RDD[(LongWritable, Text)] = sc.hadoopFile[LongWritable, Text
         , org.apache.hadoop.mapred.lib.CombineTextInputFormat](input, 10000)
    //    val combineRDD: RDD[(LongWritable, Text)] = sc.hadoopFile[LongWritable, Text
    //      , org.apache.hadoop.mapred.TextInputFormat](input, 10000)

    sc.hadoopConfiguration.setInt("mapred.max.split.size", 128000000)
    //sc.hadoopConfiguration.setInt("mapreduce.input.fileinputformat.split.maxsize", 128000000)

    println("切片个数:" + combineRDD.getNumPartitions)
    //combineRDD.map(_._2.toString).foreach(println(_))
    //combineRDD.collect()
    //combineRDD.had

    sc.stop()
  }

执行结果:

5.5 minPartitions 在 CombineTextInputFormat 中的作用?

CombineTextInputFormat切片逻辑和 最小切片数(minPartitions)  无关

查看 org.apache.hadoop.mapred.lib.CombineTextInputFormat类 getSplits方法
TODO: numSplits指定的切片个数,并没有使用

  public InputSplit[] getSplits(JobConf job, int numSplits) 
    throws IOException {
    List<org.apache.hadoop.mapreduce.InputSplit> newStyleSplits =
      super.getSplits(Job.getInstance(job));

    InputSplit[] ret = new InputSplit[newStyleSplits.size()];

    for(int pos = 0; pos < newStyleSplits.size(); ++pos) {
      org.apache.hadoop.mapreduce.lib.input.CombineFileSplit newStyleSplit = 
        (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) newStyleSplits.get(pos);
      ret[pos] = new CombineFileSplit(job, newStyleSplit.getPaths(),
        newStyleSplit.getStartOffsets(), newStyleSplit.getLengths(),
        newStyleSplit.getLocations());
    }
    return ret;
  }

5.6 重点关注

对计算任务而言,合并小文件是一把双刃剑,合并小文件后 就舍弃了数据本地化,则加了网络IO的开销,需要根据实际情况合理的选择 切片策略

CombineTextInputFormat源码参考:​​​​​​​https://blog.csdn.net/wawmg/article/details/17095125

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/392157.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【算法经典题集】前缀和与数学(持续更新~~~)

&#x1f63d;PREFACE&#x1f381;欢迎各位→点赞&#x1f44d; 收藏⭐ 评论&#x1f4dd;&#x1f4e2;系列专栏&#xff1a;算法经典题集&#x1f50a;本专栏涉及到的知识点或者题目是算法专栏的补充与应用&#x1f4aa;种一棵树最好是十年前其次是现在前缀和一维前缀和k倍…

【我的Android开发】AMS中Activity栈管理

概述 Activity栈管理是AMS的另一个重要功能&#xff0c;栈管理又和Activity的启动模式和startActivity时所设置的Flag息息相关&#xff0c;Activity栈管理的主要处理逻辑是在ActivityStarter#startActivityUnchecked方法中&#xff0c;本文也会围绕着这个方法进进出出&#xf…

Gopro卡无法打开视频恢复方法

下边来看一个文件系统严重受损的Gopro恢复案例故障存储: 120G SD卡故障现象:客户正常使用&#xff0c;备份数据时发现卡无法打开&#xff0c;多次插拔后故障依旧。故障分析:Winhex查看发现0号分区表扇区正常&#xff0c;这应该是一个exfat格式的文件系统&#xff0c;但是逻辑盘…

【单目3D目标检测】MonoDDE论文精读与代码解析

文章目录PrefacePros and ConsAbstractContributionsPreliminaryDirect depth estimationDepth from heightPespective-n-point&#xff08;PnP&#xff09;PipelineDiverse Depth EstimationsRobust Depth CombinationOutput distributionSelecting and combining reliable de…

JVM-从熟悉到精通

JVM 机器语言 一个指令由操作码和操作数组成 方法调用等于一个压栈的过程 栈有 BP寄存器 和 SP寄存器来占用空间 BP -> Base Point 栈基址&#xff08;栈底&#xff09;SP -> Stack Point 栈顶 字节序用于规定数据在内存单元如何存放&#xff0c;二进制位的高位和低…

计算机组成原理|第二章(笔记)

目录第二章 计算机的发展及应用2.1 计算机的发展史2.1.1 计算机的生产和发展2.1.2 微型计算机的出现和发展2.1.3 软件技术的兴起与发展2.2 计算机的应用2.3 计算机的展望上篇&#xff1a;第一章&#xff1a;计算机系统概论 第二章 计算机的发展及应用 2.1 计算机的发展史 2.1.…

基于半车悬架的轴距预瞄与轴间预瞄仿真对比

目录 前言 1. 半车悬架模型 2.轴距预瞄(单点预瞄)和轴间预瞄(两点预瞄)原理与仿真分析 2.1轴距预瞄(单点预瞄) 2.1.1预瞄原理 2.2.轴间预瞄(两点预瞄) 2.2.1预瞄原理 2.3仿真分析 3.总结 前言 对于悬架而言&#xff0c;四个车轮实际的输入信息是受到前后延时以及左右相…

SpringCloud:Feign的使用及配置

目录 Feign的使用及配置 1、Feign替代RestTemplate 2、使用Fegin步骤 3、自定义配置 4、Feign使用优化 5、Feign的最佳实践方式 Feign的使用及配置 1、Feign替代RestTemplate RestTemplate方式远程调用的问题 问题&#xff1a; 1、代码可读性差&#xff0c;编程体验不同…

HTML基本概述

文章目录网站和网页浏览器的作用HTML标签元素注释乱码问题web系统是以网站形式呈现的&#xff0c;而前端是以网页形式呈现的。 网站和网页 网站&#xff08;web site&#xff09;&#xff1a;互联网上用于展示特定内容的相关网页的集合。也就是说&#xff0c;一个网站包含多个…

【预告】ORACLE Primavera P6 v22.12 虚拟机发布

引言 离ORACLE Primavera P6 EPPM最新系统 v22.12已过去了3个多月&#xff0c;应盆友需要&#xff0c;也为方便大家体验&#xff0c;我近日将构建最新的P6的虚拟环境&#xff0c;届时将分享给大家&#xff0c;最终可通过VMWare vsphere (esxi) / workstation 或Oracle virtua…

SQL 窗口函数详解

SQL窗口函数详解 窗口函数的主要作用是对数据进行分组排序、求和、求平均值、计数等。 一、窗口函数的基本语法 <分析函数> OVER ([PARTITION BY <列清单>] ORDER BY <排序用列清单> [ROWS BETWEEN 开始位置 AND 结束位置])理解窗口函数的基本语法&#xff…

opencv校正图像

目录1、前言2、例程2.1、代码2.2、效果口罩说明书网页3、按步骤分析转灰度图降噪 Canny边缘检测膨胀&#xff08;可视具体情况省略&#xff09;轮廓检索选取角度1、前言 我们用相机拍照时&#xff0c;会因为角度问题造成拍歪&#xff0c;会影响图像的识别&#xff0c;这时就需…

【PyTorch】教程:torch.nn.Hardtanh

torch.nn.Hardtanh 原型 CLASS torch.nn.Hardtanh(min_val- 1.0, max_val1.0, inplaceFalse, min_valueNone, max_valueNone) 参数 min_val ([float]) – 线性区域的最小值&#xff0c;默认为 -1max_val ([float]) – 线性区域的最大值&#xff0c;默认为 1inplace ([bool]) …

ABP(ASP.NET Boilerplate)配置整合使用Mysql数据库

ABP默认是支持sqlserver数据库的&#xff0c;但是这并不影响使用其他数据库&#xff0c;稍微配置一下就行了&#xff01;很简单——————————— 一、 卸载原来存在Sql Server的依赖包 在程序包管理控制台输入&#xff0c;选择EntityFrameworkCore 然后执行删除包的命令…

基于intel x86+fpga智能驾驶舱和高级驾驶辅助系统硬件设计(二)

系统功能架构及各模块功能介绍 智能驾驶舱和高级驾驶辅助系统是一个车载智能终端嵌入式平台&#xff0c;系统是一个能够运行 虚拟化操作系统的软件和硬件的综合体。本文的车载主机包括硬件主控处理器、电源管理芯 片、存储设备、输入输出控制器、数字仪表系统系统、后座娱乐系统…

抖音怎么合理安排直播内容|辽宁千圣文化

抖音主播们可以利用直播的方式达到带货的底模&#xff0c;那么做主播的话&#xff0c;就要利用好抖音主播中心&#xff0c;很多抖音用户却表示找不到抖音主播中心&#xff0c;那么怎么去看呢&#xff1f;跟着辽宁千圣文化小编来一起看看吧&#xff01;如何成为一名合格的主播&a…

【操作系统原理实验】调度算法模拟实现

选择一种高级语言如C/C等&#xff0c;模拟实现调度算法完成资源分配与回收的过程。2) 自定义PCB等核心数据结构&#xff1b;3) 利用列表、队列等容器类或者其他数据结构管理PCB,完成相应调度算法的模拟&#xff1b;4) 实现外围一些命令如创建进程、查看进程、关闭进程等&#x…

Spacedesk软件推荐,让你的平板也变成电脑的副屏

我的设备&#xff1a; 电脑:戴尔G15 5511、i7-11800H、Windows 11、RTX3060 平板&#xff1a;荣耀V6、麒麟985、安卓10、分辨率2000*1200&#xff08;手机也行&#xff0c;我用的平板&#xff09; 实际使用&#xff1a; 先给放一张实际使用的照片 可以让平板变成电脑的副屏…

28 位委员出席,龙蜥社区第 15 次运营委员会会议顺利召开

2 月 24 日&#xff0c;龙蜥社区在海光召开了第 15 次运营委员会会议&#xff0c;本次会议由统信软件运营委员会委员崔开主持。来自 Arm、阿里云、飞腾、红旗软件、海光、Intel、龙芯、联通软研院、浪潮信息、普华基础软件、统信软件、万里红、移动、中科方德等理事单位的 28 位…

echarts--提示框显示不全问题记录

最近接手一个同事之前做的网页&#xff0c;发现里面使用echarts来绘制各类图表&#xff1b;有2个问题一个是提示框显示不全&#xff0c;另一个就是绘制总是有部分数据显示不全。后者就是div宽度问题。。。无语&#xff0c;说下前面一个问题吧&#xff0c;记录一下。 tooltip组…