Spark 【分区与并行度】

news2025/1/11 23:44:37

RDD 并行度和分区

SparkConf

setMaster("local[*]")

我们在创建 SparkContext 对象时通常会指定 SparkConf 参数,它包含了我们运行时的配置信息。如果我们的 setMaster 中的参数是 "local[*]" 时,通常代表使用的CPU核数为当前环境的最大值。

val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test partition")
    val sc = new SparkContext(conf)

    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    rdd.saveAsTextFile("test_par_out")

    sc.stop()

运行结果:

在设备管理器中查看CPU核数:

 setMaster("local")

这时的使用 CPU 核数的默认值为 1 。

val conf = new SparkConf()
      .setMaster("local")
      .setAppName("test partition")

 

setMaster["local[2]"]

设置使用的 CPU 核数为 2

val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("test partition")

创建RDD时指定分区数

我们也可以在创建 RDD 对象时指定切片数 numSlices(切片数就是分区的数量(通常一个分区对应一个Task,一个Task对应一个Excutor(一个CPU核心)))。

   val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test partition")
    val sc = new SparkContext(conf)

//第二个参数用来指定并行度(分区数)
    val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),1)
    rdd.saveAsTextFile("test_par_out")

    sc.stop()

 

conf.set 指定并行度

val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test partition")
    conf.set("spark.default.parallelism","5")

读取内存数据(集合)的分区规则

核心源码:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }
    }

比如我们读取集合数组 List(1,2,3,4,5),我们在创建RDD对象时设置分区数为 3 。

val rdd: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5),3)

 当我们保存时,会输出三个文件,文件内容分别是:

  • part-00000:1
  • part-00001:2,3
  • part-000002:4,5

因为此时我们源码的 positions 的参数是 (length:5,numSilces:3),它会返回三个元组(start,end),对应我们数组的下标,并且左闭右开。

  • part-00000:(0,1)
  • part-00001:(1,3)
  • part-000002:(3,5)

读取文件数据的分区规则

我们在通过读取本地文件系统的文件来创建 RDD 时:

val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test partition")
    conf.set("spark.default.parallelism","5")

    val sc = new SparkContext(conf)

   val rdd = sc.textFile("data/1.txt")

    sc.stop()

默认的分区数量是最小分区数量(2):

//defaultParallelism取决于 setMaster("local[*]") ,如果是 local[*] 代表分区数=CPU核数 但是min方法返回最小值,最小值=2
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

Spark 分区规则和 Hadoop 是一样的,只是切片规则和数据读取规则有差异。

案例-文件a.txt:

1
2
3

Spark 分区数量的计算方式:

源码:

long totalSize = 0
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

 对于我们上面的文件 a.txt:

// totalSize 是文件的总字节数,一个回车占两个字节
totalSize = 7
goalSize = 7 / (2 == 0 ? 1 : 2) = 3 (单位:byte)    //也就是每个分区占用3个字节

//分区数= totalSize/gogalSize=2...1 (余数1byte,根据hadoop的规则,如果余数>每个分区的字节数的1.1倍,就要产生新的分区,否则就不会产生新的分区)
//这里余数是 1 , 1/3 = 33.3% > 0.1 所以会产生一个新的分区
//所以分区数 = 3

数据分区的分配

1. Spark 数据分区以行为单位进行读取

2. 数据读取时,以偏移量为单位

以上面的 a.txt 为例(@@代表一个回车)

1@@    => 012            
2@@    => 345
3      => 6

 

3. 数据分区的偏移量范围的计算

//注意: 左右都是闭区间,
//偏移量不会被重复读取   
part-00000    => [0,3]    => 1@@,2@@    //读到3的时候已经到了第二行,要读就读一整行,所以2@@都会被读取
part-00001    => [3,6]    => 3 [3,6]对应的第二行的第1个字节(2)~第3行第1个字节(3),而2已经被读过了,所以只剩3
part-00002    => [6,7]    => 

coalesce 和 repartition

coalesce 和 repartition 分别用于缩减分区节省资源和扩大分区提高并行度。

coalesce

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。

当 Spark 程序中,存在过多的小任务的时候,可以使用 coalesce 方法,收缩合并分区,减少分区的个数,减少任务调度成本。

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)

    val newRDD1 = rdd.coalesce(2)
    /*
      coalesce 默认情况下不会将分区内的数据打乱重新组合,这里是直接将三个分区中两个分区合并为一个分区,另外一个仍然是一个分区
      这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
      如果想要数据均衡,可以进行shuffle处理
      分区结果:
               part-00000: 1 2
               part-00001: 3 4 5 6
     */
    val newRDD2 = rdd.coalesce(2,true)
    /*
      分区结果:
        part-0000: 1 4 5
        part-0001: 2 3 6
     */

repartition

repartition 的底层其实就是 coalesce ,为了区分缩减和扩大分区(都可以由coalesce实现),所以分成了两个方法。

val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    /*
      length=6,numSlices=2
      part-00000:  1 2 3
      part-00001:  4 5 6
     */
    // 想要扩大分区数量 提高并行度 shuffle 必须为true 因为我们要把2个分区的数据分为3个分区 就必须打乱分区内的数据重新排
    // 如果不设置shuffle为true是没有意义的 结果还是2个分区
    val newRdd1 = rdd.coalesce(3,true)
    /*
       分区结果:
          part-00000: 3 5
          part-00001: 2 4
          part-00002: 1 6
     */

    // 缩减分区用 coalesce,如果要数据均衡可以采用 shuffle
    // 扩大分区用 repartition , repartition底层就是 coalesce(numSlices,true)
    rdd.repartition(2)

repartition 底层代码

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1030414.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DevOps与CI/CD常见面试问题汇总

01 您能告诉我们DevOps和Agile(敏捷)之间的根本区别吗? 答:尽管DevOps与敏捷方法(这是最流行的SDLC[Software Development Life Cycle]方法之一)有一些相似之处,但两者在软件开发方面都是根本不同的方法。以下是两者之…

mysql 在eclipse在配置

一、在Windows(窗口)/Preferences(首选项)/Java/Build path(构建路径)/User Library(用户库)里面直接把建一个Mysql, 二、Add External JARs… 添加mysql-connector-java…

python学习之【with语句】

前言 上一篇文章 ​ ​python学习之【文件读写】​​​ 中我们学习了python当中的文件读写,这篇文章接着学习python中文件读写的with语句。 了解with语句 在很多场景中,通过使用with语句可以让我们可以更好地来管理资源和简化代码,它可以看…

洛谷刷题入门篇:顺序结构

链接如下:https://www.luogu.com.cn/training/100#problems 一、Hello,World! 题目链接:https://www.luogu.com.cn/problem/B2002 题目描述 编写一个能够输出 Hello,World! 的程序。 提示: 使用英文标点符号;Hello,World! 逗…

Windows下,快速部署开发环境,第三方库管理,以及项目迁移工具介绍

对于在windows下做c开发的同学,你是否有以下痛点?: 1.每次构建c项目,搭配第三方库环境,都要不停的include,lib,dll等配置,如果4-5个还好,要是10几个...人都麻了... 2.一个环境也无所谓,问题x64/32位系统,Debug,Release都要配置一遍..每次配置…

【C# Programming】值类型、良构类型

值类型 1、值类型 值类型的变量直接包含值。换言之, 变量引用的位置就是值内存中实际存储的位置。 2、引用类型 引用类型的变量存储的是对一个对象实例的引用(通常为内存地址)。 复制引用类型的值时,复制的只是引用。这个引用非常小&#xf…

CentOS安装openjdk和elasticsearch

CentOS安装openjdk 文章目录 CentOS安装openjdk一、yum1.1search1.2安装openjdk 二、elasticsearch的启动和关闭2.1启动2.2关闭2.3添加服务 一、yum 1.1search yum search java | grep jdk1.2安装openjdk [roottest ~]# yum install java-1.8.0-openjdk -y 查看openjdk版本 …

校园学习《乡村振兴战略下传统村落文化旅游设计》 许少辉瑞博士生辉少许

校园学习《乡村振兴战略下传统村落文化旅游设计》 许少辉瑞博士生辉少许

无线定位中TDOA时延估计算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 ...................................................................figure; plot(P1x,P1y…

JeecgBoot v3.5.5 版本发布,性能大升级版本—开源免费的低代码开发平台

项目介绍 JeecgBoot是一款企业级的低代码平台!前后端分离架构 SpringBoot2.x,SpringCloud,Ant Design&Vue3,Mybatis-plus,Shiro,JWT 支持微服务。强大的代码生成器让前后端代码一键生成! JeecgBoot引领…

【Java毕设项目】基于SpringBoot+Vue科研管理系统的设计与实现

博主主页:一季春秋博主简介:专注Java技术领域和毕业设计项目实战、Java、微信小程序、安卓等技术开发,远程调试部署、代码讲解、文档指导、ppt制作等技术指导。主要内容:毕业设计(Java项目、小程序等)、简历模板、学习资料、面试题…

JAVAEE初阶相关内容第十二弹--多线程(进阶)

目录 一、JUC的常见类 1、Callable接口 1.1callable与runnable 1.2代码实例 (1)不使用Callable实现 (2)使用Callable实现 1.3理解Callable 1.4理解FutureTask 2、ReentrantLock 2.1ReentrantLock的用法 2.2ReentrantLoc…

BaseMapper 中的方法

BaseMapper 中的方法&#xff1a; 插入 int insert(T entity) - 插入一条记录。 删除 int deleteById(Serializable id) - 根据主键ID删除记录。 int deleteById(T entity) - 根据实体对象&#xff08;ID&#xff09;删除记录。 int deleteByMap(Map<String, Object> …

快速用Python进行数据分析技巧详解

概要 一些小提示和小技巧可能是非常有用的&#xff0c;特别是在编程领域。有时候使用一点点黑客技术&#xff0c;既可以节省时间&#xff0c;还可能挽救“生命”。 一个小小的快捷方式或附加组件有时真是天赐之物&#xff0c;并且可以成为真正的生产力助推器。所以&#xff0…

【SpringCloud】微服务技术栈入门1 - 远程服务调用、Eureka以及Ribbon

目录 远程服务调用RestTemplate Eureka简要概念配置 Eureka 环境设置 Eureka ClientEureka 服务发现 Ribbon工作流程配置与使用 Ribbon饥饿加载 远程服务调用 RestTemplate RestTemplate 可以模拟客户端来向另外一个后端执行请求 黑马给出的微服务项目中&#xff0c;有两个 …

漏刻有时数据可视化Echarts组件开发(28):异形柱图、pictorialBar和dataZoom组件的使用

构建容器 var dom document.getElementById(container);var myChart echarts.init(dom, null, {renderer: canvas,useDirtyRect: false});模拟数据 var dataList [{name: 班级一, value: 120, max: 120, min: 20},{name: 班级二, value: 183, max: 200, min: 20},{name: 班级…

Windows如何删除“$WINDOWS.~BT“文件夹,解决权限不足无法删除

$WINDOWS.~BT是干嘛的 $Windows.BT是升级或者安装Windows操作系统中间过程中产生的临时文件夹&#xff0c;一般用于保存下载后的升级文件&#xff0c;或者安装过程中复制文件时产生的。用于保存Windows安装记录, 包括配置资料, 错误报告等, 如果安装失败便可反馈给微软公司&am…

pytorch学习3(pytorch手写数字识别练习)

网络模型 设置三层网络&#xff0c;一般最后一层激活函数不选择relu 任务步骤 手写数字识别任务共有四个步骤&#xff1a; 1、数据加载--Load Data 2、构建网络--Build Model 3、训练--Train 4、测试--Test实战 1、导入各种需要的包 import torch from torch import nn f…

Matlab图像处理-区域特征

凹凸性 设P是图像子集S中的点&#xff0c;若通过的每条直线只与S相交一次&#xff0c;则称S为发自P的星形&#xff0c;也就是站在P点能看到S的所有点。 满足下列条件之一&#xff0c;称此为凸状的&#xff1a; 1.从S中每点看&#xff0c;S都是星形的&#xff1b; 2.对S中任…

软件设计师笔记系列(四)

&#x1f600;前言 随着技术的快速发展&#xff0c;软件已经成为我们日常生活中不可或缺的一部分。从智能手机应用到大型企业系统&#xff0c;软件都在为我们提供便利、增强效率和创造价值。然而&#xff0c;随之而来的是对软件质量的日益增长的关注。软件的质量不仅关乎其功能…