Spark RDD优化

news2024/9/9 1:26:03

Spark RDD优化

  • 一、分区优化
  • 二、持久化优化
  • 三、依赖优化
  • 四、共享变量优化
  • 五、提交模式与运行模式优化
  • 六、其他优化

一、分区优化

  • 分区数调整:RDD的分区数可以通过repartitioncoalesce方法进行调整。合理的分区数可以提高并行度,但过多的分区会增加管理开销。通常,分区数应根据数据规模和集群资源进行调整。

    val rdd: RDD[String] = rdd.coalesce(numPartitions:Int, shuffle:Boolean)
    val rdd: RDD[String] = rdd.repartition(numPartitions:Int) 
    // repartition(numPartitions: Int) 等价于 coalesce(numPartitions, true) 
    
    1. 缩小分区

      存在过多的小任务的时候收缩合并分区,减少分区的个数,减少任务调度成本
      默认情况下,不会对数据重组,比如:3个合成2个,采用 {1+2},{3},容易导致数据倾斜
      若需数据均衡,则将 shuffle 参数设置为 true 即可

    2. 扩大分区

      若需要扩大分区,shuffle 参数必须设置为 true
      若将2个分区拆分成3个,必须打乱重新分区,否则数据还是在两个分区(有一个分区为空),{1},{2},{空}

  • 数据本地性:Spark会尽量将数据分配给与数据源相同的计算节点上,以减少数据移动的开销。在创建RDD时,可以通过设置分区偏好(如preferredLocations)或自定义分区来优化数据本地性,以最小化网络传输并最大化计算效率。

    自定义分区

    // 自定义分区器
    class MyPartitioner(numPartitions: Int) extends Partitioner {
      override def numPartitions: Int = numPartitions   // 返回分区器的分区数量
      override def getPartition(key: Any): Int = {
          // 这里需要实现分区逻辑
          // 返回值是一个整数,表示该键应该被分配到哪个分区
      }
    }
    
    // 使用自定义分区器重新分区  
    val partitionedRDD = rdd.partitionBy(new MyPartitioner(2))  // 传入分区个数
    
  • 处理数据倾斜:数据倾斜是指某些分区包含的数据远远多于其他分区,导致计算资源分配不均。可以使用repartitioncoalesce方法重新分区RDD,或使用reduceByKeygroupByKey的变体等特定操作来减轻数据倾斜的影响。

二、持久化优化

  • 持久化策略:对于需要多次使用的RDD,应该进行持久化操作,以避免重复计算。持久化策略包括内存持久化(如MEMORY_ONLY)、磁盘持久化(如DISK_ONLY)以及内存和磁盘混合持久化(如MEMORY_AND_DISK)等。

  • 序列化:使用序列化可以进一步减少内存消耗,并提高持久化效率。Spark支持多种序列化框架,如Java序列化、Kryo序列化等。Kryo序列化通常比Java序列化更快,且占用空间更小。

    // 临时存储于【xx】重用,job结束后自动删除 
    val rddCache: RDD[T] = rdd.cache()					// 到内存上
    val rdd: RDD[T] = rdd.persist(level:StorageLevel)
    // cache() 		等价于persist(StorageLevel.MEMORY_ONLY)
    // persisit() 	参数如下
    
    StorageLevel.MEMORY_ONLY				只写到内存上
    StorageLevel.DISK_ONLY					只写到磁盘上
    StorageLevel.OFF_HEAP					使用堆外内存
    StorageLevel.MEMORY_AND_DISK			先内存,后磁盘 
    StorageLevel.MEMORY_AND_DISK_SER		先内存,后磁盘,采取序列化方式
    StorageLevel.MEMORY_AND_DISK_SER_2 		先内存,后磁盘,采取二代序列化方式
    
  • 检查点:对于需要长时间运行或可能遭受故障的应用,设置检查点(Checkpoint)可以将RDD的状态保存到稳定存储中,以便在故障后恢复。检查点会切断RDD的血统关系,从而避免重新计算整个血统链。

    // checkpoint 长久存储于【磁盘】重用,job结束后不会删除,涉及IO性能较差,安全且一般和cache组合使用
    val conf = new SparkConf()
        .setAppName("spark_rdd")
        .setMaster("local[4]")
    val sc = SparkContext.getOrCreate(conf)
    // 设置检查点路径
    sc.setCheckpointDir("hdfs://ip:9000/spark/checkpoint")
    // ... 
    rdd.checkpoint()	// 将该 RDD 的内容写入到设置的路径,并在该 RDD 的计算图中插入一个检查点(Checkpoint)节点
    

三、依赖优化

  • 宽依赖与窄依赖:RDD之间的依赖关系分为宽依赖和窄依赖。窄依赖有助于实现数据本地性,而宽依赖则可能导致数据移动和网络开销。在设计RDD转换操作时,应尽量避免不必要的宽依赖。

    1、Driver程序提交后

    1、Spark调度器将所有的RDD看成是一个Stage
    2、然后对此Stage进行逆向回溯,遇到Shuffle就断开,形成一个新的Stage
    3、遇到窄依赖,则归并到同一个Stage
    4、等到所有的步骤回溯完成,便生成一个DAG图

    2、为什么要划分阶段

    1、基于数据的分区,本着传递计算的性能远高于传递数据,所以数据本地化是提升性能的重要途径之一
    2、一组串行的算子,无需 Shuffle,基于数据本地化可以作为一个独立的阶段连续并行执行
    3、经过一组串行算子计算,遇到 Shuffle 操作,默认情况下 Shuffle 不会改变分区数量,但会因为 numPartitions:Int, partitioner:Partitioner 等参数重新分配,过程数据会【写盘供子RDD拉取(类MapReduce)】

    3、RDD依赖关系

    • Lineage:血统、遗传

      RDD最重要的特性之一,保存了RDD的依赖关系

      RDD实现了基于Lineage的容错机制

    • 依赖关系 org.apache.spark.Dependency

      窄依赖 NarrowDependency,1V1 OneToOneDependency,1VN RangeDependency
      宽依赖 ShuffleDependency

    • 当RDD分区丢失时

      对于窄依赖,Spark只需要重新计算丢失分区的父RDD分区即可。
      对于宽依赖,Spark需要重新执行整个shuffle过程,以重新生成丢失的数据。
      若配合持久化更佳:cache, persist, checkpoint

    在这里插入图片描述

    类型
    窄依赖map,flatMap,mapPartitions,mapPartitionsWithIndex,glom,filter,distinct,intersection,sample,union,subtract,zip…,cogroup
    宽依赖sortBy,sortByKey,groupByKey,reduceByKey,cogroup,join,partitionBy,repartition
    不一定的情况在Spark中,并非所有操作都可以明确地归类为宽依赖或窄依赖。有些操作可能根据具体的实现或上下文而有所不同。然而,在大多数情况下,上述提到的算子可以清晰地划分为宽依赖或窄依赖。
    如:reduceByKey(【partitioner: Partitioner】, func: (V, V) => V)
    若使用的是带 partitioner 的重载且 Partitioner 和父RDD的 Partitioner一致
    则为窄依赖RDD,否则为宽依赖ShuffledRDD
    
  • 优化转换操作:在可能的情况下,使用能够减少shuffle操作的转换函数,如mapPartitions代替mapreduceByKey代替groupByKey等。这些操作可以减少数据在网络中的传输量,从而提高性能。

    shuffle性能较差:因为shuffle必须落盘,内存中等数据会OOM
    groupByKey只分组(存在Shuffle) + reduce只聚合
        <=结果同,性能不同=>
    reduceByKey先分组、预聚合、再聚合(存在Shuffle) 
    

四、共享变量优化

  • 广播大变量:当Spark作业中需要使用到较大的外部变量时,可以将这些变量广播到每个节点的Executor上,而不是每个Task都复制一份。这样可以减少网络传输开销和内存消耗。

    val bc:Broadcast[T] = sc.broadcast(value:T)		// 创建广播变量  
    rdd.mapPartitions(itPar=>{
        val v:T = bc.value	// 在每个分区内部,通过bc.value获取广播变量的值  
        ...					// 使用v进行计算...
    })
    
  • 累加器(Accumulators):累加器提供了一种有效的手段来进行分布式计算中的统计和计数操作,减少通信开销,并简化聚合操作。

    累加器:accumulate:只能 add 操作,常用于计数
    1、定义在Driver端的一个变量,Cluster中每一个Task都会有一份Copy
    2、所有的Task都计算完成后,将所有Task中的Copy合并到驱动程序中的变量中
    非累加器:在所有Task中的都会是独立Copy,不会有合并

    累加器
    val accLong: LongAccumulator = sc.longAccumulator("longAcc")	// 定义累加器
    val accDouble: DoubleAccumulator = sc.doubleAccumulator("doubleAcc")
    rdd.mapPartitions(itPar=>{
        ...
        accLong.add(v:Long)		// 将值添加到累加器中
        accDouble.add(v:Double)
        ...
    })
    accXxx.reset()		// 重置累加器
    val isZero:Boolean = accXxx.isZero	// 检查累加器是否为零值
    val num:Long|Double = accXxx.value|sum|count|avg // 获取累加器的值、总和、计数或平均值
    
    // 定义一个累加器,用于统计 "bad" 记录的数量
    val errorCount = sc.longAccumulator("Error Count")
    val data = sc.parallelize(Array("good", "bad", "good", "bad", "good"))
    data.foreach(record => if (record == "bad") errorCount.add(1))
    // 打印累加器的值,即 "bad" 记录的总数
     println(s"Total errors: ${errorCount.value}")
    

    自定义累加器:

    写一个类继承 import org.apache.spark.util.AccumulatorV2[IN, OUT]

    abstract class AccumulatorV2[IN, OUT] extends Serializable {
        // 返回是否为零值累加器
        def isZero: Boolean
    
        // 创建此累加器的新副本,其为零值
        def copyAndReset(): AccumulatorV2[IN, OUT] = {...}
    
        // 创建此累加器的新副本
        def copy(): AccumulatorV2[IN, OUT]
    
        // 重置此累加器为零值
        def reset(): Unit
    
        // 添加:接收输入并累加
        def add(v: IN): Unit
    
        // 合并:合并另一个相同类型的累加器并更新其状态
        def merge(other: AccumulatorV2[IN, OUT]): Unit
    
        // 当前累加器的值
        def value: OUT
    }
    
  • 自定义计量器优化(Custom Metrics):自定义计量器允许用户定义和收集特定的性能指标,提供更细粒度的作业监控和调优能力。通过 SparkListener 接口,可以实现自定义的监听器来监控和记录所需的指标。

五、提交模式与运行模式优化

  • 提交模式:Spark支持Client模式和Cluster模式两种提交方式。Client模式便于查看日志和结果,但可能消耗较多资源;Cluster模式则更适合大规模作业,但查看日志和结果可能不太方便。应根据实际情况选择合适的提交模式。

    spark-submit --class <MainClass> --master <MasterURL> --deploy-mode <DeployMode> <PathToJar>
    

    <MainClass>:包含 main 方法的主类的名称。

    <MasterURL>:指定集群的 Master URL。

    <DeployMode>:指定提交模式,可以是 clientcluster

    <PathToJar>:包含 Spark 应用程序的 JAR 文件的路径。

    spark-submit --class SparkClientModeApp --master yarn --deploy-mode client /path/to/your/jarfile.jar	
    spark-submit --class SparkClientModeApp --master yarn --deploy-mode cluster /path/to/your/jarfile.jar
    
  • 运行模式:Spark支持多种运行模式,如Local模式、Standalone模式、YARN模式等。不同的运行模式适用于不同的场景和需求。例如,Local模式适用于本地开发和测试;Standalone模式适用于构建独立的Spark集群;YARN模式则适用于与Hadoop生态系统集成。

    local: 在单核上运行
    local[N]: 在指定数量的 N 个核上运行,如 “local[4]”
    local[*]: 使用所有可用的核
    spark://HOST:PORT: 连接到指定的 Spark standalone cluster
    yarn: 连接到 YARN 集群
    mesos://HOST:PORT: 连接到 Mesos 集群

六、其他优化

  • 序列化框架选择:除了Kryo序列化外,还可以考虑使用其他高效的序列化框架来优化Spark作业的性能。
  • 监控与调优:使用Spark提供的监控工具和API(如Spark UI、getStorageLevel方法等)来监控作业的运行状态和性能瓶颈,并根据监控结果进行调优。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1917341.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024年Wolf沃尔夫奖数学奖得主出炉:诺加·阿隆(Noga Alon)、阿迪·萨莫尔(Adi Shamir)

沃尔夫基金会官网&#xff1a;2024年沃尔夫数学奖联合颁发给了以色列魏茨曼科学研究所的阿迪萨莫尔 &#xff08;Adi Shamir&#xff0c;1952 -&#xff09; 教授和美国普林斯顿大学的诺加阿隆&#xff08;Noga Alon&#xff0c;1963 -&#xff09; 教授&#xff0c;表彰他们对…

ADI新型充电器解决方案可实现电池堆电压和充电效率

就目前而言&#xff0c;这可能是生活中zui常见的问题了。世纪之交&#xff0c;电池&#xff08;尤其是基于锂离子的电池&#xff09;成本的降低和性能的提高&#xff0c;推动了电池供电的储能和便携式设备的稳步增长。此外&#xff0c;超级电容器由于具有独特的性质&#xff0c…

引用类飘红,但是导入失败

背景&#xff1a; 引用类飘红&#xff0c;且显示可导入&#xff0c;但是导入一直失败。 列如下图&#xff0c;引导你可以导入依赖包&#xff0c;但是你发现按了导入他是导入不了的&#xff0c;就一直失败&#xff0c;就一直飘红 解决方案&#xff1a; 我们用一下idea的清理不…

IDEA中Debug的使用

自定义功能图表 功能说明 光标回到Debug行 执行到光标所在行 Force Step into Step into 进入JDK的配置 step into做了下述配置后&#xff0c;也可以和force step into一样进入到JDK中。 Trace Current Stream Chain Reset Frame 重置方法入栈 Force Return Break Point四…

【Spring Cloud精英指南】深度探索与实战:网关Gateway的高级应用与最佳实践

1. 前言 Spring Cloud Gateway提供了一个在Spring生态系统之上构建的API网关&#xff0c;包括&#xff1a;Spring 5&#xff0c;Spring Boot 2和Project Reactor。Spring Cloud Gateway旨在提供一种简单而有效的路由方式&#xff0c;并为它们提供一些网关基本功能&#xff0c;…

警惕:与ChatGPT共享业务数据可能十分危险

您已经在使用ChatGPT了吗&#xff1f;或者您正在考虑使用它来简化操作或改善客户服务&#xff1f;虽然ChatGPT提供了许多好处&#xff0c;但重要的是&#xff0c;您要意识到与ChatGPT这样的人工智能工具共享敏感业务数据相关的安全风险。下面&#xff0c;我们概述了一些关键问题…

什么是 C 语言中的代码优化技巧?

&#x1f345;关注博主&#x1f397;️ 带你畅游技术世界&#xff0c;不错过每一次成长机会&#xff01; &#x1f4d9;C 语言百万年薪修炼课程 【https://dwz.mosong.cc/cyyjc】通俗易懂&#xff0c;深入浅出&#xff0c;匠心打磨&#xff0c;死磕细节&#xff0c;6年迭代&…

Javaweb09-数据库连接池技术

数据库连接池 1.数据库连接池基本概念&#xff1a; JDBC连接池是一个管理数据库连接的重要工具&#xff0c;它能够显著提高应用程序与数据库之间的性能和效率。连接池通过预先创建和维护一组数据库连接&#xff0c;而不是每次请求都创建新的连接&#xff0c;从而避免了频繁的…

js字符串文字添加不同颜色,replace的妙用$1...$9

更改字符串第一个数字为红色显示&#xff0c;第二个数字为黄色显示 $1匹配的是正则第一个括号选中的字符串&#xff0c;可以使用正则不断用括号匹配然后更改样式 const testStr "剩余12个名额&#xff0c;截止时间12月25日" testStr this.testStr.replace(/(\d)(\D…

【Python】 已解决:Python编码问题导致的SyntaxError

文章目录 一、分析问题背景二、可能出错的原因三、错误代码示例四、正确代码示例五、注意事项 已解决&#xff1a;Python编码问题导致的SyntaxError 一、分析问题背景 在使用Python进行编程时&#xff0c;有时会遇到编码相关的问题。特别是在处理包含非ASCII字符&#xff08;…

算法复杂度详解( 超详细!)

前言&#xff1a; 今天&#xff0c;小编正式开始学习了数据结构的正式内容&#xff0c;学习了算法复杂度相关的内容&#xff0c;为了加强对这个的了解&#xff0c;于是诞生了这一篇文章&#xff0c;下面废话不多说&#xff0c;开始进入复杂度的详解&#xff01; 目录&#xff1…

鸿蒙元服务API集全新呈现-开发更清晰高效

鸿蒙元服务API集全新呈现&#xff0c;开发更清晰高效&#xff0c;具体见如下截图&#xff0c;深黑色部分即本阶段公布支持的元服务API集。 本材料整理来源于HarmonyOS NEXT Developer Beta1官方公开的文档

银河麒麟(Kylin)KYSEC使用

1.推荐使用方法 *.临时禁用指令: setstatus disable--禁用 注&#xff1a;执行reboot后系统会自动启动 2.选用指令&#xff1a; *.永久禁用指令&#xff1a; setstatus disable -p *.重启后,KYSEC还是处理关闭关状态。 *.使用如下指令启用&#xff1a;setstatus enable …

慕尼黑电子展回顾:启明智显闪耀全场,多模态硬件智能体引领未来科技潮流

在刚刚落幕的慕尼黑电子展上&#xff0c;启明云端携启明智显与触觉智能两家子公司&#xff0c;共同为全球观众呈现了一场科技盛宴。本次展会&#xff0c;启明智显凭借其创新的多模态硬件智能体及一系列前沿产品&#xff0c;赢得了广泛关注与好评&#xff0c;展位现场人流如织&a…

基于java+springboot+vue实现的大学城水电管理系统(文末源码+Lw)106

基于SpringBootVue的实现的大学城水电管理系统&#xff08;源码数据库万字Lun文流程图ER图结构图演示视频软件包&#xff09; 系统功能&#xff1a; 本大学城水电管理系统 管理员功能有个人中心&#xff0c;用户管理&#xff0c;领用设备管理&#xff0c;消耗设备管理&#x…

基于Java+Vue的场馆预约系统源码体育馆羽毛球馆篮球馆预约

市场前景 市场需求持续增长&#xff1a;近年来&#xff0c;随着人们生活水平的提高和休闲娱乐需求的多样化&#xff0c;各类场馆&#xff08;如体育馆、图书馆、博物馆、剧院等&#xff09;的访问量不断增加。然而&#xff0c;传统的预约方式往往存在效率低下、信息不透明等问…

浅谈React

forwardRef和useImperativeHandle的联动使用 import React, { useImperativeHandle, useRef } from "react" import { forwardRef } from "react"const CustomInput forwardRef((props, ref) > {const inputRef useRef<HTMLInputElement>(null…

【以史为镜、以史明志,知史爱党、知史爱国】中华上下五千年之-宋朝(北宋)

宋朝&#xff08;960年—1279年&#xff09;是中国历史上承五代十国下启元朝的朝代&#xff0c;分北宋和南宋两个阶段&#xff0c;共历十八帝&#xff0c;享国三百一十九年。 北宋 赵匡胤&#xff08;宋太祖&#xff09;-赵光义&#xff08;宋太宗&#xff09;-赵恒&#xff08…

【MyBatis】——入门基础知识必会内容

&#x1f3bc;个人主页&#xff1a;【Y小夜】 &#x1f60e;作者简介&#xff1a;一位双非学校的大二学生&#xff0c;编程爱好者&#xff0c; 专注于基础和实战分享&#xff0c;欢迎私信咨询&#xff01; &#x1f386;入门专栏&#xff1a;&#x1f387;【MySQL&#xff0…

【初阶数据结构】树与二叉树:从零开始的奇幻之旅

初阶数据结构相关知识点可以通过点击以下链接进行学习一起加油&#xff01;时间与空间复杂度的深度剖析深入解析顺序表:探索底层逻辑深入解析单链表:探索底层逻辑深入解析带头双向循环链表:探索底层逻辑深入解析栈:探索底层逻辑深入解析队列:探索底层逻辑深入解析循环队列:探索…