RDD算子(四)、血缘关系、持久化

news2025/1/22 20:51:20

1. foreach

分布式遍历每一个元素,调用指定函数

val rdd = sc.makeRDD(List(1, 2, 3, 4))
rdd.foreach(println)

结果是随机的,因为foreach是在每一个Executor端并发执行,所以顺序是不确定的。如果采集collect之后再调用foreach打印,则是在Driver端执行。

RDD的方法之所以叫算子,就是为了与scala集合的方法区分开来, scala集合的方法是在同一个节点执行的,而RDD的算子则是在Executor(分布式节点)执行的。从计算的角度讲,RDD算子外部的操作都是在Driver端执行,算子内部的操作都是在Executor端执行。

2. 闭包检测

class User {
    var age : Int = 30
}
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val user = new User()
rdd.foreach(num => {
    println("age = " + (user.age + num))
})

因为foreach内部的操作是在Executor上执行的,所以在Driver上创建的user需要传递给各个Executor,如果user没有序列化,则会报错

class User extends Serializable{
    var age : Int = 30
}

或者将User变为样例类,因为样例类在编译时会自动混入序列化特质(实现可序列化接口)

case class User {
    var age : Int = 30
}

如果把原始集合变为空,依然会报错,这是因为RDD算子中传递的函数会包含闭包操作(匿名函数,算子内用到了算子外的数据),所以会进行闭包检测,即检查里面的变量是否序列化

val rdd = sc.makeRDD(List[Int]())
val user = new User()
rdd.foreach(num => {
    println("age = " + (user.age + num))
})

 再看如下案例:

class Search(query:String) {
    def isMatch(s:String): Boolean {
        s.contains(query)
    }

    def getMatch1(rdd: RDD[String]): RDD[String] {
        rdd.filter(isMatch)
    }

    def getMatch2(rdd: RDD[String]): RDD[String] {
        rdd.filter(x => x.contains(query))
    }
}
val rdd = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
val search = new Search("h")
search.getMatch1(rdd).collect().foreach(println)

此时会报错Search类没有序列化,因为在rdd的filter算子内调用了query,而query作为类的构造参数,实际上是类的私有变量,isMatch方法相当于:

def isMatch(s:String): Boolean {
    s.contains(this.query)
}

this相当于类的对象,因此需要进行闭包检测。getMatch2也有类似的问题。除了将类序列化以及改为样例类之外,还可以将query赋给方法内部的临时变量:

def getMatch2(rdd: RDD[String]): RDD[String] {
    val s = query
    rdd.filter(x => x.contains(s))
}

3. 依赖关系

 每个RDD会保存血缘关系(不会保存数据),这样提高了容错性,因为如果其中某个RDD转换到另一个RDD失败了,就可以根据血缘关系来重新读取。RDD保存依赖关系而示意图如下:

血缘关系展示代码:

val lines : RDD[String] = sc.textFile("datas")
println(lines.toDebugString)
println("******************")

val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.toDebugString)
println("******************")

val wordToOne = words.map(word=>(word, 1))
println(wordToOne.toDebugString)
println("******************")

val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.toDebugString)
println("******************")

查看依赖关系只需直接将代码中的toDebugString改为dependencies即可

val lines : RDD[String] = sc.textFile("datas")
println(lines.dependencies)
println("******************")

val words : RDD[String] = lines.flatMap(_.split(" "))
println(words.dependencies)
println("******************")

val wordToOne = words.map(word=>(word, 1))
println(wordToOne.dependencies)
println("******************")

val wordToSum : RDD[(String, Int] = wordToOne.reduceByKey(_+_)
println(wordToSum.dependencies)
println("******************")

​​​​​​​

 一对一的依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的一个分区的数据,也叫窄依赖,而Shuffle依赖关系表示新的RDD的一个分区的数据来源于旧的RDD的多个分区的数据,也叫宽依赖。

4. 阶段和任务划分

窄依赖中,分区有多少个,就有多少个任务,只有一个阶段;宽依赖中,有两个阶段,每个阶段的任务数等于分区数。

RDD阶段由有向无环图(DAG)表示

Application->Job->Stage->Task每一个层级是1对n的关系。

每个Application中可能会提交多个作业,一个作业会划分为多个阶段(阶段数=宽依赖个数+1),一个阶段可能因为多个分区而包含多个任务,一个阶段中的任务数=最后一个RDD的分区数。

5. cache和persist

如果对于同一份数据源,想做多个不同的功能(比如统计单词数以及根据单词分组),这些不同的功能在实现过程中有很多重复的步骤(比如很多相同的RDD转换),此时可能会引入性能问题。虽然看起来RDD转换的过程复用了,但是RDD不存储数据,只有逻辑,所以最终的行为算子会从头开始再读取相同的数据,比如下面代码:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))

val flatRDD : RDD[String] = data.flatMap(_.split(" "))

val mapRDD = flatRDD.map(word=>{
    println("@@@@@@@@@@@@@@@")
    (word, 1)
})

val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)

reduceRDD.collect.foreach(println)

println("****************")

val groupRDD = mapRDD.groupByKey()

groupRDD.collect.foreach(println)

 

可以看到,在一行*上下,@都执行了,说明数据从头开始读取,从最开始的RDD再次执行。 

为了解决这种性能问题,可以对mapRDD里的数据进行缓存,要么缓存在内存中,要么缓存在磁盘中,得看具体情况,这就是RDD的持久化操作。使用cache方法缓存在内存中:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))

val flatRDD : RDD[String] = data.flatMap(_.split(" "))

val mapRDD = flatRDD.map(word=>{
    println("@@@@@@@@@@@@@@@")
    (word, 1)
})

mapRDD.cache()

val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)

reduceRDD.collect.foreach(println)

println("****************")

val groupRDD = mapRDD.groupByKey()

groupRDD.collect.foreach(println)

 

放在内存中可能不安全,使用persist方法缓存在磁盘中:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))

val flatRDD : RDD[String] = data.flatMap(_.split(" "))

val mapRDD = flatRDD.map(word=>{
    println("@@@@@@@@@@@@@@@")
    (word, 1)
})

mapRDD.persist(StorageLevel.DISK_ONLY)

val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)

reduceRDD.collect.foreach(println)

println("****************")

val groupRDD = mapRDD.groupByKey()

groupRDD.collect.foreach(println)

注意:持久化操作也是等到行动算子触发才会真正执行。持久化操作不一定都是为了重用才引入的,有些情况下,前面一些RDD转换操作耗时很长或者数据很重要的场合,也可以进行持久化操作,这样一旦中间出了问题,重新执行任务不至于再执行之前耗时很长的操作。

除了以上的cache和persist方法,还可以使用检查点(checkpoint)的方法进行持久化操作。checkpoint需要落盘,因此需要指定存储路径,之前的persist方法也要落盘,只不过它存储在临时路径,任务执行完就会删除,而checkpoint是永久化存储

sc.setCheckPointDir("cp")

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))

val flatRDD : RDD[String] = data.flatMap(_.split(" "))

val mapRDD = flatRDD.map(word=>{
    println("@@@@@@@@@@@@@@@")
    (word, 1)
})

mapRDD.checkpoint()

val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)

reduceRDD.collect.foreach(println)

println("****************")

val groupRDD = mapRDD.groupByKey()

groupRDD.collect.foreach(println)

但是checkpoint会单独再开启一个作业,因此效率可能更低。但是与cache联合执行,即先cache,再checkpoint,就不会开启新的作业。

另外,使用cache方法会改变RDD的血缘关系:

val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))

val flatRDD : RDD[String] = data.flatMap(_.split(" "))

val mapRDD = flatRDD.map(word=>{
    println("@@@@@@@@@@@@@@@")
    (word, 1)
})

mapRDD.checkpoint()
println(mapRDD.toDebugString)

val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)

reduceRDD.collect.foreach(println)

println("****************")
println(mapRDD.toDebugString)


 

 

可以看到,cache方法(其实persis方法也会) 在血缘关系中添加新的依赖(原来的依赖还保留)。但是checkpoint方法会改变原来的血缘关系,建立新的血缘关系(等同于数据源变了):

sc.setCheckPointDir("cp")


val data : RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))

val flatRDD : RDD[String] = data.flatMap(_.split(" "))

val mapRDD = flatRDD.map(word=>{
    println("@@@@@@@@@@@@@@@")
    (word, 1)
})

mapRDD.checkpoint()
println(mapRDD.toDebugString)

val reduceRDD : RDD[(String, Int)] = mapRDD.reduceByKey(_+_)

reduceRDD.collect.foreach(println)

println("****************")
println(mapRDD.toDebugString)


 

6. 自定义分区器

class MyPartitioner extends Partitioner {
    override def numPartitions : Int = n  //自定义,可以写死

    override def getPartition(key : Any) : Int = {
        key match {
            case "xxx" => 0
            case _ => 1
        }
    }
    
}

分区器传给RDD:

val partitionRDD = rdd.partitionBy(new MyPartitioner)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1571361.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用CSS计数器,在目录名称前加上了序号,让目录看起来更加井然有序

目录(Text of Contents缩写为TOC),其实就是一篇文章的概要或简述。这好比,去书店买书,先是被这本书的标题所吸引,而后我们才会,翻开这本书目录,看看这本书主要是在讲些什么&#xff…

Claude 3 on Amazon Bedrock 结合多智能体助力 Altrubook AI 定义消费者 AI 新范式

关于 Altrubook AI 智能消费决策机器人 Altrubook 是全球首创场景化智能决策机器人,由国内外大厂等前员工共同研发,具有定制化 IP 决策机器人、沉浸式购物体验和需求匹配优化等独特优势。目前,Altrubook AI 已完成与 Claude 3 on Amazon Bedr…

【深度优先】【树上倍增 】2846. 边权重均等查询

本文涉及知识点 深度优先 树上倍增 LeetCode2846. 边权重均等查询 现有一棵由 n 个节点组成的无向树,节点按从 0 到 n - 1 编号。给你一个整数 n 和一个长度为 n - 1 的二维整数数组 edges ,其中 edges[i] [ui, vi, wi] 表示树中存在一条位于节点 ui…

备战蓝桥杯---贡献法刷题

话不多说,直接看题: 什么是贡献法?这是一种数学思想,就是看每一个元素对总和的贡献。 1. 我们可以先枚举区间再统计次数,但这显然TLE。我们可以发现,每一个孤独的区间对应一个孤独的牛,因此我…

注意力机制篇 | YOLOv8改进之添加多尺度全局注意力机制DilateFormer(MSDA)| 即插即用

前言:Hello大家好,我是小哥谈。多尺度全局注意力机制DilateFormer是一种用图像识别任务的深度学习模型。它是在Transformer模型的基础上进行改进的,旨在提高模型对图像中不同尺度信息的感知能力。DilateFormer引入了多尺度卷积和全局注意力机制来实现多尺度感知。具体来说,…

考古:IT架构演进之IOE架构

考古:IT架构演进之IOE架构 IOE架构(IBM, Oracle, EMC)出现在20世纪末至21世纪初,是一种典型的集中式架构体系。在这个阶段,企业的关键业务系统往往依赖于IBM的小型机(后来还包括大型机)、Oracle…

题库管理系统-基于Springboot实现JAVA+SQL离散数学题库管理系统(完整源码+LW+翻译)

基于Springboot实现JAVASQL离散数学题库管理系统(完整源码LW翻译) 概述: 本系统具体完成的功能如下: 1题库的管理与维护:新题的录入,修改,删除等功能。 2生成试卷:包括自动生成与手工改动,要…

ThingsBoaed、系统模块层级讲解

系统管理员能够使用租户配置文件为多个租户配置通用设置。每个租户在单个时间点都拥有唯一的个人资料。 让我们一一查看租户配置文件中的可用设置。 配置文件配置 这些设置允许系统管理员配置对租户创建的实体数量的限制,设置每月最大消息数、API 调用数的限制&…

Vue - 1( 13000 字 Vue 入门级教程)

一:Vue 1.1 什么是 Vue Vue.js(通常称为Vue)是一款流行的开源JavaScript框架,用于构建用户界面。Vue由尤雨溪在2014年开发,是一个轻量级、灵活的框架,被广泛应用于构建单页面应用(SPA&#xf…

java自动化测试学习-03-06java基础之运算符

运算符 算术运算符 运算符含义举例加法,运算符两侧的值相加ab等于10-减法,运算符左侧减右侧的值a-b等于6*乘法,运算符左侧的值乘以右侧的值a*b等于16/除法,运算符左侧的值除以右侧的值a/b等于4%取余,运算符左侧的值除…

matlab使用教程(34)—求解时滞微分方程(2)

1.具有状态依赖时滞的 DDE 以下示例说明如何使用 ddesd 对具有状态依赖时滞的 DDE(时滞微分方程)方程组求解。Enright 和Hayashi [1] 将此 DDE 方程组用作测试问题。方程组为: 方程中的时滞仅出现在 y 项中。时滞仅取决于第二个分量 y 2 t …

每日面经分享(Git经典题目,Git入门)

1. GitHub是什么 a. Git是一个分布式版本控制系统,作用是跟踪、管理和协调软件开发项目中的代码更改。 b. 提供了一种有效的方式来管理代码的版本历史,以及多人协作开发的能力。 2. Git的作用有哪些 a. 版本控制:Git可以记录每次代码更改的…

政安晨:【Keras机器学习实践要点】(十六)—— 图像分类从零开始

目录 简介 设置 加载数据:猫与狗数据集 原始数据下载 滤除损坏的图像 生成数据集 将数据可视化 使用图像数据增强 数据标准化 预处理数据的两个选项 配置数据集以提高性能 建立模型 训练模型 对新数据进行推理 政安晨的个人主页:政安晨 欢…

【快捷部署】011_PostgreSQL(16)

📣【快捷部署系列】011期信息 编号选型版本操作系统部署形式部署模式复检时间011PostgreSQL16Ubuntu 20.04Docker单机2024-03-28 一、快捷部署 #!/bin/bash ################################################################################# # 作者&#xff1…

【二分查找】Leetcode 二分查找

题目解析 二分查找在数组有序可以使用,也可以在数组无序的时候使用(只要数组中的一些规律适用于二分即可) 704. 二分查找 算法讲解 当left > right的时候,我们循环结束,但是当left和right缩成一个点的时候&#x…

DDR3接口

mig IP核的配置 首先添加mig IP核   然后确认以下工程信息,主要是芯片型号以及编译环境,没什么问题后点击next.   如下图所示,这一页选择"Create Design",在"Component Name"一栏设置该IP元件的名称&…

Redis数据库——群集(主从、哨兵)

目录 前言 一、主从复制 1.基本原理 2.作用 3.流程 4.搭建主动复制 4.1环境准备 4.2修改主服务器配置 4.3从服务器配置(Slave1和Slave2) 4.4查看主从复制信息 4.5验证主从复制 二、哨兵模式——Sentinel 1.定义 2.原理 3.作用 4.组成 5.…

59 使用 uqrcodejs 生成二维码

前言 这是一个最近的一个来自于朋友的需求, 然后做了一个 基于 uqrcodejs 来生成 二维码的一个 demo package.json 中增加以依赖 "uqrcodejs": "^4.0.7", 测试用例 <template><div class"hello"><canvas id"qrcode&qu…

代码随想录-算法训练营day02【滑动窗口、螺旋矩阵】

专栏笔记&#xff1a;https://blog.csdn.net/weixin_44949135/category_10335122.html https://docs.qq.com/doc/DUGRwWXNOVEpyaVpG?uc71ed002e4554fee8c262b2a4a4935d8977.有序数组的平方 &#xff0c;209.长度最小的子数组 &#xff0c;59.螺旋矩阵II &#xff0c;总结 建议…

[中级]软考_软件设计_计算机组成与体系结构_08_输入输出技术

输入输出技术 前言控制方式考点往年真题 前言 输入输出技术就是IO技术 控制方式 程序控制(查询)方式&#xff1a;分为无条件传送和程序查询方式两种。 方法简单&#xff0c;硬件开销小&#xff0c;但I/O能力不高&#xff0c;严重影响CPU的利用率。 程序中断方式&#xff1…