Spark分布式计算原理

news2025/1/5 14:00:11

一、Spark WordCount运行原理

二、划分Stage

数据本地化

        移动计算,而不是移动数据

        保证一个Stage内不会发生数据移动

三、Spark Shuffle过程

在分区之间重新分配数据

        父RDD中同一分区中的数据按照算子要求重新进入RDD的不同分区中

        中间结果写入磁盘

        有子RDD拉取数据,而不是由父RDD推送

        默认情况下,shuffle不会改变分区数量

四、RDD的依赖关系

Lineage:血统、依赖

        RDD最重要的特征之一,保存了RDD的依赖关系

        RDD实现了基于Lineage的容错机制

依赖关系

        宽依赖:一个父RDD的分区被子RDD的多个分区使用

        窄依赖:一个父RDD的分区被子RDD的一个分区使用

宽依赖对比窄依赖

宽依赖对应shuffle操作,需要在运行时将同一个父RDD的分区传入到不同的子RDD分区中,不同的分区可能位于不同的节点,就可能涉及多个节点间数据传输

当RDD分区丢失时,Spark会对数据进行重新计算,对于窄依赖只需要重新计算一次子RDD的父RDD分区

补:最多22个元组,元组可以套元组

宽依赖容错图

结论:

相比于宽依赖,窄依赖对优化更有利

练习:判断RDD依赖关系

Map  flatMap  filter  distinct  reduceByKey  groupByKey  sortByKey  union  join

scala> val b = sc.parallelize(Array(("hello",11),("java",12),("scala",13),("kafka",14),("sun",15)))
// b: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val c = sc.parallelize(Array(("hello",11),("java",12),("scala",13),("kafka",14),("sun",15)))
// c: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> b.glom.collect
// collect   collectAsync

scala> b.glom.collect
// res4: Array[Array[(String, Int)]] = Array(Array((hello,11)), Array((java,12)), Array((scala,13)), Array((kafka,14), (sun,15)))

scala> c.glom.collect
// collect   collectAsync

scala> c.glom.collect
// res5: Array[Array[(String, Int)]] = Array(Array((hello,11)), Array((java,12)), Array((scala,13)), Array((kafka,14), (sun,15)))

scala> val d = a.jion(b)
// <console>:27: error: value jion is not a member of org.apache.spark.rdd.RDD[(String, Int, Int, Int)]
//        val d = a.jion(b)  //拼写错误
                 ^
scala> val d = a.join(b)
// <console>:27: error: value join is not a member of org.apache.spark.rdd.RDD[(String, Int, Int, Int)]
//        val d = a.join(b)  //应该是bc不是ab
                 ^
scala> val d = b.join(c)   //Tab成功
// d: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[8] at join at <console>:27

scala> d.glom.collect
// collect   collectAsync

scala> d.glom.collect
// res6: Array[Array[(String, (Int, Int))]] = Array(Array((sun,(15,15))), Array(), Array((scala,(13,13)), (hello,(11,11)), (java,(12,12)), (kafka,(14,14))), Array())

五、DAG工作原理

(1)根据RDD之间的依赖关系,形成一个DAG(有向无环图)

(2)DAGScheduler将DAG划分为多个Stage

        划分依据:是否发生宽依赖(Shuffle)

        划分规则:从后往前,遇到宽依赖切割为新的Stage

        每个Stage由一组并行的Task组成的

六、Shuffle实践

最佳实践

        提前部分聚合减少数据移动

        尽量避免Shuffle  

      

七、RDD优化

  • RDD持久化
  • RDD共享变量
  • RDD分区设计
  • 数据倾斜

(一)RDD持久化

(1)RDD缓存机制:缓存数据至内存/磁盘,可大幅度提升Spark应用性能

(2)缓存策略StorageLevel

①RDD存储级别介绍(StorageLevel )

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

②缓存应用场景

从文件加载数据之后,因为重新获取文件成本较高

经过较多的算子变换之后,重新计算成本较高

单个非常消耗资源的算子之后

③使用注意事项

Cache() 或persist() 后不能再有其他的算子

Cache() 或persist() 遇到Action算子完成后才生效

④操作

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object CacheDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("cachedemo")
    val sc: SparkContext = SparkContext.getOrCreate(conf)
    val rdd: RDD[String] = sc.textFile("in/users.csv")
    //    stack堆栈      heap栈
//    rdd.cache()   //设置默认缓存Memory_Only
    rdd.persist(StorageLevel.MEMORY_ONLY)
    val value: RDD[(String, Int)] = rdd.map(x=>(x,1))
//    val start: Long = System.currentTimeMillis()
//    println(value.count())
//    val end: Long = System.currentTimeMillis()
//    println("1:"+(end-start))
    for (i<- 1 to 10){
      val start: Long = System.currentTimeMillis()
      println(value.count())
      val end: Long = System.currentTimeMillis()
      println(i+":"+(end-start))
      Thread.sleep(10)
      if (i>6){
        rdd.unpersist()
      }
    }
  }
}

(3)检查点:类似于快照

sc.setCheckpointDir("hdfs:/checkpoint0918")
val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4)))
rdd.checkpoint
rdd.collect //生成快照
rdd.isCheckpointed
rdd.getCheckpointFile

(4)检查点与缓存的区别

        检查点会删除RDD lineage,而缓存不会

        SparkContext被销毁后,检查点数据不会被删除

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CheckPointDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("cachedemo")
    val sc: SparkContext = SparkContext.getOrCreate(conf)
    sc.setCheckpointDir("file://文件路径")
    val rdd: RDD[Int] = sc.parallelize(1 to 20)
    rdd.checkpoint()
    rdd.glom().collect.foreach(x=>println(x.toList))
    println(rdd.isCheckpointed)
    println(rdd.getCheckpointFile)
  }
}

(二)RDD共享变量

(1)广播变量:允许开发者将一个只读变量(Driver)缓存到每个节点(Executor)上,而不是每个任务传递一个副本

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object BroadCastDemo {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("cachedemo")
    val sc: SparkContext = SparkContext.getOrCreate(conf)

    var arr = Array("hello","hi","Come here")//一维数组
    var arr2 = Array((1,"hello"),(2,"hello"),(3,"hi"))//二维数组
    //广播变量
    val broadcastVar: Broadcast[Array[String]] = sc.broadcast(arr)
    val broadcastVar2: Broadcast[Array[(Int, String)]] = sc.broadcast(arr2)
    //普通RDD
    val rdd: RDD[(Int, String)] = sc.parallelize(Array((1,"leader"),(2,"teamleader"),(3,"worker")))
    val rdd2: RDD[(Int, String)] = rdd.mapValues(x => {
      println("value is:" + x)
//      broadcastVar.value(0) + ":" + x
      broadcastVar2.value(1)._2 + ":" + x
    })
    rdd2.collect.foreach(println)
  }
}

(2)累加器:只允许added操作,常用于实现计算

(三)RDD分区设计

(1)分区大小限制为2GB

(2)分区太少

        不利于并发

        更容易数据倾斜影响

        groupBy,reduceByKey,sortByKey等内存压力增大

(3)分区过多

        Shuffle开销越大

        创建任务开销越大

(4)经验

        每个分区大约128MB

        如果分区小于但接近2000,则设置为大于2000

(四)数据倾斜

1、指分区中的数据分配不均匀,数据集中在少数分区中

        严重影响性能

        通常发生在groupBy,jion等之后

2、解决方案

        使用新的Hash值(如对key加盐)重新分区

3、实战

[root@kb23 jars]# pwd

/opt/soft/spark312/examples/jars

[root@kb23 jars]# ls

scopt_2.12-3.7.1.jar  spark-examples_2.12-3.1.2.jar

[root@kb23 sbin]# pwd

/opt/soft/spark312/sbin

[root@kb23 sbin]# start-all.sh

[root@kb23 spark312]# ./bin/spark-submit --master spark://192.168.91.11:7077 --class org.apache.spark.examples.SparkPi ./examples/jars/spark-examples_2.12-3.1.2.jar 100

Pi is roughly 3.1407527140752713

重新分区(../spark312目录)

./bin/spark-submit \
--master spark://192.168.78.131:7077 
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
--class org.apache.spark.examples.SparkPi \
./examples/jars/spark-examples_2.12-3.1.2.jar \
1000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1076948.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【分享】获取微信通讯录python代码形式实现

具体流程就是&#xff1a; 1. 打开微信 2. 点击通讯录 3. 滚动鼠标到最顶部&#xff08;防止已经滚动了一部分了&#xff09; 4. 获取联系人列表 5. 找到最后一个空格所在的位置&#xff08;后一个就是真正的联系人了&#xff09; 6. 点击第一个联系人 7.记录下上一个联…

Docker-compose创建LNMP服务并运行Wordpress网站平台

一、部署过程 1.安装Docker #关闭防火墙 systemctl stop firewalld.service setenforce 0#安装依赖包 yum install -y yum-utils device-mapper-persistent-data lvm2 #设置阿里云镜像源 yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/d…

联邦学习中的数据非独立同分布问题

联邦学习中的数据非独立同分布&#xff08;Non-IID&#xff09; 从集中式机器学习到联邦机器学习 集中式模型&#xff1a;传统的集中式机器学习是将所有的数据收集到服务器端&#xff0c;在服务器端统一进行模型训练和处理&#xff0c;并将预测的结果分发给用户。但将数据上传…

树莓派ubuntu上配置miniconda并创建虚拟环境

树莓派安装ubuntu和miniconda配置 本文所配置环境为&#xff1a;树莓派4B安装的系统为ubuntu 22 server&#xff0c;所配置的miniconda版本为4.2&#xff0c;python版本3.8。在此之前要清楚树莓派4B已经将处理器从arm架构换成了aarch64架构&#xff0c;所以能够使用最新的aarc…

基于SpringBoot的音乐网站

目录 前言 一、技术栈 二、系统功能介绍 用户信息管理 歌曲分类管理 歌曲信息管理 轮播图管理 歌曲信息 歌曲评论 用户注册 三、核心代码 1、登录模块 2、文件上传模块 3、代码封装 前言 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施…

你必须知道的数据查询途径!!

在当今信息爆炸的时代&#xff0c;我们每天都会面临海量的数据和信息。如何在这些繁杂的信息中快速、准确地找到自己需要的内容&#xff0c;也是当代一个非常重要的技能。下面&#xff0c;我将介绍几种你必须知道的企业数据信息查找途径。 ​ 1. 搜索引擎 搜索引擎是我们日常中…

巴州阳光公益人的国庆

原创 何素平 巴州阳光志愿者服务协会 2023-10-10 15:01 发表于新疆 在中华人民共和国成立74周年之际&#xff0c;巴州阳光公益机构的社工志愿者在党支部的组织下&#xff0c;抒发爱国之情&#xff0c;砥砺强国之志&#xff0c;共同祝愿国家繁荣富强&#xff0c;祝福各族人民幸…

JShaman JavaScript混淆加密工具,中英版本区别

JShaman JavaScript混淆加密工具&#xff0c;中英版本区别如下。 中文版&#xff0c;配置简单&#xff0c;网站功能多&#xff0c;支持代码提交、文件上传、WebAPI&#xff1b; 英文版&#xff0c;配置项较多&#xff0c;网站功能简约&#xff0c;不支持文件上传&#xff0c;…

vue接入高德地图获取经纬度

&#x1f90d;step1:高德地图开放平台&#xff0c;根据指引注册成为高德开放平台开发者&#xff0c;并申请 web 平台&#xff08;JS API&#xff09;的 key 和安全密钥; &#x1f90d;step2:在html引入安全密钥&#xff08;获取经纬度用&#xff0c;不然会报错&#xff09; <…

3款国产办公软件,不仅好用,还支持linux国产操作系统

当提到国产办公软件并支持Linux国产操作系统时&#xff0c;以下是三款备受好评的软件&#xff1a; 1. WPS Office&#xff08;金山办公套件&#xff09; WPS Office是中国知名的办公软件套件&#xff0c;也是一款跨平台的应用程序。它包含文字处理、表格编辑和演示文稿等常见办…

「新房家装经验」客厅电视高度标准尺寸及客厅电视机买多大尺寸合适?

客厅电视悬挂高度标准尺寸是多少&#xff1f; 客厅电视悬挂高度通常在90~120厘米之间&#xff0c;电视挂墙高度也可以根据个人的喜好和实际情况来调整&#xff0c;但通常不宜过高&#xff0c;以坐在沙发上观看时眼睛能够平视到电视中心点或者中心稍微往下一点的位置为适宜。 客…

cesium图标漂移分析与解决

漂移现象如下 什么是图标漂移&#xff1f; 随着视野改变&#xff0c;图标相对于地面发生了相对位置的变化 让人感觉到图标有飘忽不定的感觉 原因分析 图标是静止的&#xff0c;它的位置在世界坐标系中是绝对的、静止的。 漂移大部分的原因是&#xff1a; 透视关系发生了错…

漏电继电器 LLJ-630F φ100 导轨安装 分体式结构 LLJ-630H(S) AC

系列型号&#xff1a; LLJ-10F(S)漏电继电器LLJ-15F(S)漏电继电器LLJ-16F(S)漏电继电器 LLJ-25F(S)漏电继电器LLJ-30F(S)漏电继电器LLJ-32F(S)漏电继电器 LLJ-60F(S)漏电继电器LLJ-63F(S)漏电继电器LLJ-80F(S)漏电继电器 LLJ-100F(S)漏电继电器LLJ-120F(S)漏电继电器LLJ-125F(S…

【工具】SSH端口转发管理器,专门管理SSH Port Forwarding

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhang.cn] 开源代码看这里&#xff1a;http://xfxuezhang.cn/index.php/archives/1151/ 背景介绍 有时候需要用到ssh的端口转发功能。目前来说&#xff0c;要么是cmd里手敲指令&#xff0c;但每次敲也太麻烦了&#xff1b;或…

烟感监控小技巧,这才是最高级的方法!

商业大厦是现代城市中不可或缺的一部分&#xff0c;它们承载着大量的人员和财产。因此&#xff0c;火灾安全一直是商业大厦管理者和业主们极为关注的重要议题。 因此&#xff0c;为了及时发现并迅速应对火灾威胁&#xff0c;商业大厦越来越倾向于采用高效、智能的烟感监控系统。…

springboot-配置文件优先级

官方文档 https://docs.spring.io/spring-boot/docs/2.7.16/reference/htmlsingle/#features.external-config Spring Boot允许外部化配置&#xff0c;这样就可以在不同的环境中使用相同的应用程序代码。您可以使用各种外部配置源&#xff0c;包括Java属性文件、YAML文件、环境…

PTA 7-4 包装机(单调栈)

题目 一种自动包装机的结构如图 1 所示。首先机器中有 N 条轨道&#xff0c;放置了一些物品。轨道下面有一个筐。当某条轨道的按钮被按下时&#xff0c;活塞向左推动&#xff0c;将轨道尽头的一件物品推落筐中。当 0 号按钮被按下时&#xff0c;机械手将抓取筐顶部的一件物品&…

ssti 常见注入模块利用

文件读取 python脚本&#xff08;这里以重庆橙子科技jinjia2模板注入为例&#xff09; import requests url http://39.104.177.130:18080/flaskBasedTests/jinja2/ for i in range(500):data {"name":"{{().__class__.__base__.__subclasses__()["str…

面试经典 150 题 22 —(数组 / 字符串)— 28. 找出字符串中第一个匹配项的下标

28. 找出字符串中第一个匹配项的下标 方法一 class Solution { public:int strStr(string haystack, string needle) {if(haystack.find(needle) string::npos){return -1;}return haystack.find(needle);} };方法二 class Solution { public:int strStr(string haystack, s…

TensorFlow学习:使用官方模型进行图像分类、使用自己的数据对模型进行微调

前言 上一篇文章 TensorFlow案例学习&#xff1a;对服装图像进行分类 中我们跟随官方文档学习了如何进行预处理数据、构建模型、训练模型等。但是对于像我这样的业余玩家来说训练一个模型是非常困难的。所以为什么我们不站在巨人的肩膀上&#xff0c;使用已经训练好了的成熟模…