Spark 优化

news2024/11/24 8:59:41

1.RDD分区数

Task是作用在每个分区上的,每个分区至少需要一个Task去处理

改变分区数可间接改变任务的并行度,类似手动指定Reduce数量

第一个RDD的分区数由切片的数量决定 默认情况下子RDD的分区数等于父RDD的分区数

Shuflle类算子可手动指定RDD分区数 设置spark.default.parallelism参数可改变Shuffle类算子默认分区数

通过repartition/coalesce操作改变RDD分区数

repartition:通过shuffle的方式增加或减少分区数 coalesce:默认不通过shuffle的方式改变分区数,只能减少分区

优先级

决定分区数优先级,依次从高到低:

1.使用repartiton/coalesce手动改变

增加到8个分区:someRDD.repartition(numPartitions = 8)

减少到2个分区:someRDD.coalesce(numPartitions = 2)

2.使用shuffle类算子时手动指定 kvRDD.groupByKey(numPartitions = 4)

3.设置spark.default.parallelism参数 conf.set("spark.default.parallelism", "20")

4.基于默认情况 默认等于上一个RDD的分区数,如有多个RDD则分区数相加 

package com.bigdata.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DemoPartitions {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Demo21Partitions")

    conf.set("spark.default.parallelism", "3")

    val sc: SparkContext = new SparkContext(conf)

    //1.第一个RDD的分区数由切片数量决定,但也会受到minPartitions的影响
    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")
    println(s"stuRDD的分区数:${stuRDD.getNumPartitions}")

    //2.默认情况下,子分区RDD数量等于父RDD的分区数
    val clazzKVRDD: RDD[(String, Int)] = stuRDD.map(line => (line.split(",")(4), 1))
    println(s"clazzKVRDD的分区数为:${clazzKVRDD.getNumPartitions}")

    //3.shuffle类算子可以手动指定分区数量
    val grpRDD: RDD[(String, Iterable[Int])] = clazzKVRDD.groupByKey(4)
    println(s"grpRDD的分区数为:${grpRDD.getNumPartitions}")

    //4.如果shuffle类算子没有指定分区数,默认等于父RDD的分区数
    // 当然也可以由Spark的一个参数决定:spark.default.parallelism
    // shuffle类算子最后返回的RDD分区数决定因素:手动指定 > spark.default.parallelism > 父RDD的分区数
    val grpRDD1: RDD[(String, Iterable[Int])] = clazzKVRDD.groupByKey()
    println(s"grpRDD的分区数为:${grpRDD.getNumPartitions}")


    val filterRDD: RDD[String] = stuRDD.filter(line => line.split(",")(4) == "文科一班")
    println(s"filterRDD的分区数为:${filterRDD.getNumPartitions}")

     coalesce只能用于减少分区数量
    //coalesce默认不使用shuffle改变分区的数量,所以只适用于减少分区数
    val filterCoalRDD: RDD[String] = filterRDD.coalesce(1)
    println(s"filterCoalRDD的分区数为:${filterCoalRDD.getNumPartitions}")

    /**
     * repartition实际上就是调用了coalesce方法,只不过将shuffle开关置为true
     * 即使用shuffle的方式改变分区数
     * 使用repartition不仅可以增加分区数,也可以减少分区数
     */
    val repoFilterRDD1: RDD[String] = filterRDD.repartition(3)
    println(s"repofilterRDD1的分区数为:${repoFilterRDD1.getNumPartitions}")

    val repoFilterRDD2: RDD[String] = filterRDD.repartition(1)
    println(s"repofilterRDD2的分区数为:${repoFilterRDD2.getNumPartitions}")


  }

}


     如何选择coalesce还是repartition(coalesce不带shuffle操作,repartition为shuffle操作)
     增加分区:repartition
     减少分区
      后续是否要做shuffle类的操作:
     需要:repartition
     不需要:coalesce
    

2.Cache缓存

Spark中对每个RDD执行一个算子操作时,都会重新从源头处计算一遍 如果该RDD被多次使用,则会导致该RDD被重复计算 重复计算,浪费资源,消耗时间,影响整体性能

对多次使用的RDD可以通过cache/persist操作进行缓存

repeatRDD.cache() 默认以仅内存策略对RDD进行缓存

相当于repeatRDD.persist(StorageLevel.MEMORY_ONLY) ,可以设置StorageLevel

如何选择合适的缓存策略:

内存充足:MEMORY_ONLY 

内存不够:MEMORY_AND_DISK_SER

缓存策略需要通过persist方法进行指定

package com.bigdata.core

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object DemoCache {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Demo22Cache")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    //使用该方法判断次数
    val newStuRDD: RDD[String] = stuRDD.map(line => {
      println("使用map方法")
      line
    })

    //对多次使用的RDD进行缓存
    //   newStuRDD.cache()
    // cache相当于默认使用MEMORY_ONLY的缓存策略


    newStuRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

    // 统计班级人数
    newStuRDD.map(line => (line.split(",")(4), 1)).reduceByKey(_ + _).foreach(println)

    // 统计性别人数
    newStuRDD.map(line => (line.split(",")(3), 1)).reduceByKey(_ + _).foreach(println)

    // 统计年龄分布情况
    newStuRDD.map(line => (line.split(",")(2).toInt, 1)).reduceByKey(_ + _).foreach(println)

    newStuRDD.unpersist()

  }

}

3.CheckPoint检查点

Checkpoint 检查点是一种容错容灾机制(类似于快照操作)

将某一时刻运行的内存数据和状态进行持久化

通常会持久化到磁盘

或者是分布式文件系统,例如HDFS 

CheckPoint 的执行原理: 当RDD的job执行完毕后,会从最后一个RDD往前回溯 当回溯到某个RDD调用了checkpoint方法后,Spark会启动一个新的job 该任务会重新计算该RDD的数据,并持久化到HDFS上 

package com.bigdata.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DemoCheckPoint {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Demo23CheckPoint")

    val sc: SparkContext = new SparkContext(conf)

    //CheckPoint操作执行前需要先指定目录来存放检查点(快照)
    sc.setCheckpointDir("spark/data/ck")

    val stuRDD: RDD[String] = sc.textFile("spark/data/stu/students.txt")

    val newStuRDD: RDD[String] = stuRDD.map(line => {
      println("调用map方法")
      line
    })

    newStuRDD.cache()

    //对多次使用或者计算量较大的RDD做CheckPoint
    newStuRDD.checkpoint()

    // 统计班级人数
    newStuRDD.map(line => (line.split(",")(4), 1)).reduceByKey(_ + _).foreach(println)

    // 统计性别人数
    newStuRDD.map(line => (line.split(",")(3), 1)).reduceByKey(_ + _).foreach(println)

    // 统计年龄分布情况
    newStuRDD.map(line => (line.split(",")(2).toInt, 1)).reduceByKey(_ + _).foreach(println)


  }

}

可以先对需要执行checkpoint操作的RDD先进行cache操作,防止重复计算,提高性能

 checkpoint vs cache

4.Lineage血统

Spark中解决节点失效、数据丢失等问题时采用的一种机制或方案 为了保证RDD 中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的 将每种形态看做一个RDD 

 相比其它系统的细粒度容错机制

内存数据更新级别的备份或者LOG机制

Redis中的RDB策略

HBase的WAL机制

RDD的Lineage记录的是粗粒度的

RDD之间的转换操作都会会被记录

例如filter, map, join等操作都会被记录

有点类似Redis中的AOF策略

当RDD的分区数据丢失时 ,可通过Lineage来重新运算并恢复丢失的数据

宽依赖的分区数据重算开销要远大于窄依赖

5.广播变量

算子内部的代码最终会被封装到Task并发送到Executor中执行 如果在算子内部使用了算子外部的变量,变量也会封装到Task中 Task中使用的实际上是外部变量的副本 Task的数量决定了外部变量副本的数量 Task是在Executor中执行的 Task的数量会远大于Executor的数量 故可将外部变量广播到每个Executor中,减少变量的副本数 进而减少网络中传输的数据量,提升运行效率

    val conf: SparkConf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Demo19MapJoin")

    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc
      .textFile("spark/data/stu/students.txt")

    val scoreRDD: RDD[String] = sc
      .textFile("spark/data/stu/score.txt")

    //求出每个学生的总分
    val sumScoreRDD: RDD[(String, Int)] = scoreRDD.map(line => {
      val splits: Array[String] = line.split(",")
      (splits(0), splits(2).toInt)
    }).reduceByKey(_ + _)

    //不可以在RDD中嵌套RDD,故需要先将RDD转换成map
    val sumScoreMap: Map[String, Int] = sumScoreRDD.collect().toMap

    //在此处通过map的getOrElse来获取上部中被转化的map中的学生总分
    stuRDD.map(line=>{
      val splits: Array[String] = line.split(",")
      val id: String = splits(0)
      val sumScore: Int = sumScoreMap.getOrElse(id, 0)
      s"$id,${splits(1)},${splits(4)},$sumScore"
    }).foreach(println)

广播变量是Executor之间的共享变量 广播变量在Driver中一次性创建

Executor对这些变量只能进行读取操作 一般会对本地变量进行广播

例如整形、List集合、Map集合等

但通常是当变量较大时才会进行广播

例如将RDD转换成本地集合再进行广播

广播变量会广播到每个Executor,由BlockManager维护

在Executor上运行的Task都可以访问广播变量

6.累加器

使用及执行原理如图所示

在Driver端定义 sc.longAccumulator

在算子内部累加 accCount.add

在Driver端汇总 accCount.value 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/622382.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

chatgpt赋能python:Python屏幕截图并保存:简单易用的库

Python屏幕截图并保存:简单易用的库 屏幕截图是程序员们在软件开发中常用到的一个小技巧,对于调试、记录Bug、编写文档等方面有极大的帮助。而其中,Python成为了众多程序员的利器之一。 在Python中,大量的库提供了屏幕截图的方法…

PX4-机架选取(基于QG地面站)

因为我的无人机是F450,所以我选用F450的机架 点击应用后,要稍等一会 应用完成后在概述会标识

经纬度坐标为中心点生成米距离长度半径的圆形面,含java js源码+在线绘制,代码简单零依赖

文章目录 java版源码js版源码在线绘制预览效果关于计算的精确度 前些时间在更新我的坐标边界查询工具的时候,需要用到经纬度坐标点的距离计算,和以坐标点为中心生成一个指定距离为半径的圆,搜了一下没有找到现成简单又合适的代码,…

基于OpenCV 和 Dlib 进行头部姿态估计

写在前面 工作中遇到,简单整理博文内容涉及基于 OpenCV 和 Dlib头部姿态评估的简单Demo理解不足小伙伴帮忙指正 庐山烟雨浙江潮,未到千般恨不消。到得还来别无事,庐山烟雨浙江潮。 ----《庐山烟雨浙江潮》苏轼 https://github.com/LIRUILONGS…

2023智源大会议程公开 | 大模型新基建与智力运营论坛

6月9日,2023北京智源大会,将邀请这一领域的探索者、实践者、以及关心智能科学的每个人,共同拉开未来舞台的帷幕,你准备好了吗?与会知名嘉宾包括,图灵奖得主Yann LeCun、OpenAI创始人Sam Altman、图灵奖得主…

【模型评估】混淆矩阵(confusion_matrix)之 TP、FP、TN、FN;敏感度、特异度、准确率、精确率

你这蠢货,是不是又把酸葡萄和葡萄酸弄“混淆”啦!!!这里的混淆,我们细品,帮助我们理解名词“混淆矩阵” 上面日常情况中的混淆就是:是否把某两件东西或者多件东西给弄混了,迷糊了。把…

数据隐私保护的最佳实践:全面了解数据脱敏方案

1、数据脱敏 数据脱敏是一种保护敏感信息的安全措施,通常会将真实数据替换成模拟数据或者经过处理后的数据。下面是常见的数据脱敏实现方案: 字符串替换:将需要脱敏的字符串中指定位置的字符替换为“****”或其他符号。例如,将银…

MySQL数据库误删恢复

前言 经常听说删库跑路这真的不只是一句玩笑话,若不小心删除了数据库,事情很严重。你一个不小心可能会给公司删没。建议研发不要直连生成环境,一般的话都会分配账号权限,生产环境的账号尽量是只读,以防你一个不经意给库或表删除。一定要备份,这很重要,这是一个血的教训。…

iTOP3568开发板-Buildroot 系统设置待机和锁屏

Weston 的超时待机时长可以在启动参数中配置,也可以在 weston.ini 的 core 段配置。 方法一: 修改文件系统中/etc/init.d/S50launcher 文件,如下图所示的红框,0 代表禁止待机,可自行设置待机时间,单位是秒。 方法二&a…

深浅拷贝各种实现方式性能

拷贝方式 拷贝方式类型原理备注Object.clone()默认 浅拷贝,可以自定义实现深拷贝对象内存复制constructor可以实现深拷贝自定义实现BeanUtil.copyProperties()浅拷贝利用 getter/setter 实现属性拷贝反射,spring utilCollectionUtils.clone()深拷贝本质…

强化学习驱动的低延迟视频传输

随着视频会议、视频直播的流行以及未来AR/VR业务的发展,低延迟视频传输服务被广泛使用,但视频质量(QoE)还不能满足用户要求。那么近年来新兴的AI神经网络是否能为视频传输带来智能化的优化?今天LiveVideoStack大会北京…

macos m1 pip install lightgbm error

MacOS M1电脑,执行 pip install lightgbm 错误如下: 尝试如下操作: 参考链接如下: https://github.com/Microsoft/LightGBM/issues/1324 brew install cmake brew install gcc git clone --recursive https://github.com/Micro…

Unity之OpenXR+XR Interaction Toolkit接入HTC Vive解决手柄无法使用的问题

前言 随着Unity版本的不断进化,VR的接口逐渐统一,现在大部分的VR项目都开始使用OpenXR开发了。基于OpenXR,我们可以快速适配HTC,Pico,Oculus,等等设备。 今天我们要说的问题就是,当我们按照官方的标准流程配置完OpenXR后(参考:Unity之OpenXR+XR Interaction Toolkit…

西门子S7-200 CPU输入/输出接线说明

总结来看,S7-200系列PLC提供4个不同的基本型号的8种CPU,其接线方式也可大致分为6种: 1.CPU SR20接线 2.CPU SR40接线 3.CPU CR40接线 4.CPU ST40接线 5. CPU SR60接线 6. CPU ST60接线 除了CPU外,我们还需要了解200smart PLC的数…

排序算法大总结(插入、希尔、选择、堆、冒泡、快速、归并、计数)

1. 排序概要2. 插入排序直接插入排序希尔排序(缩小增量排序) 3.选择排序直接选择排序堆排序 4. 交换排序冒泡排序快速排序霍尔版本(hoare)挖坑法双指针版本快排优化快速排序非递归 5. 归并排序归并递归版本归并非递归版本 6.计数排…

物联网开发项目中具备哪些特点?

物联网开发是一项由设备、传感器、系统和应用程序组成的复杂技术。许多公司已经制定了计划,准备将物联网技术整合到他们的系统中,以帮助提高效率和生产力。在许多方面,物联网都是一项技术投资,但与其他投资相比,它可以…

RFID如何提升工业物流管理效率?

RFID技术目前已经在工业物流行业中广泛应用,企业只要将附带产品信息的RFID标签贴在货物上,利用工业读写器,可以快速准确地读取货物信息,提高仓库管理效率。 RFID如何提升工业物流管理效率? 1、跟踪货物 RFID技术可帮助企业跟踪货…

声表面波滤波器圆片级互连封装技术研究

陈作桓,于大全,张名川 厦门大学,厦门云天半导体科技有限公司 摘要 射频前端模块是无线通信的核心,滤波器作为射频前端的关键器件,可将带外干扰和噪声滤除以保留特定频段内的信号,满足射频系统的通讯要求…

B站刚崩,唯品会又崩:亿级用户网站的架构硬伤与解决方案

说在前面 在40岁老架构师尼恩的数千读者的社区中,一直在指导大家简历和职业升级。前几天,指导了一个华为老伙伴的简历,小伙伴的优势在异地多活,但是在简历指导的过程中,尼恩发现: 异地多活的概念、异地多活…

Segment Anything模型用于地理空间数据

原文地址:https://samgeo.gishub.org/examples/satellite/ 此笔记本通过几行代码展示了如何使用 Segment Anything Model (SAM) 来使用分段卫星图像。确保为此jupyter notebook使用 GPU 运行时。 安装依赖 取消注释并运行以下单元格以安装所需的依赖项。 # %pip ins…