Spark----RDD(弹性分布式数据集)

news2024/9/21 16:46:24

RDD

文章目录

  • RDD
    • RDD是什么?
    • 为什么需要RDD?
    • RDD的五大属性
    • WordCount中的RDD的五大属性
    • 如何创建RDD?
    • RDD的操作
      • 两种
      • 基本算子/操作/方法/API
      • 分区操作
      • 重分区操作
      • 聚合操作
        • 四个有key函数的`区别`
      • 关联操作
      • 排序操作
  • RDD的缓存/持久化
    • cache和persist
    • checkpoint检查点
      • APl
  • 共享变量
  • Shuffle本质

RDD是什么?

RDD是弹性分布式数据集(Resilient Distributed Dataset)的缩写,它是Spark中最基本的数据抽象,用来表示分布式集合,支持分布式操作。RDD有以下特点:

  • RDD是不可变的,也就是说,一旦创建了一个RDD,就不能对它进行修改或删除,只能通过转换操作(transformation)生成新的RDD。
  • RDD是可分区的,也就是说,一个RDD可以被切分成多个小块(partition),每个小块可以在集群中的不同节点上并行计算。
  • RDD是可并行计算的,也就是说,RDD支持一系列的高级操作(action),比如map、filter、reduce等,这些操作可以在每个分区上并行执行,并返回结果。

为什么需要RDD?

因为RDD可以提高数据处理的效率容错性。相比于传统的MapReduce框架,RDD有以下优势:

  • RDD可以支持内存计算,也就是说,RDD可以将数据缓存在内存中,避免了频繁的磁盘I/O,从而提高了速度。
  • RDD可以支持多次迭代计算,也就是说,RDD可以在多个操作之间重用中间结果,避免了重复的数据读取和计算,从而提高了效率。
  • RDD可以支持容错机制,也就是说,RDD可以通过血统(lineage)记录数据的转换过程,当某个分区发生故障时,可以根据血统重新计算该分区的数据,从而恢复数据。
    • RDD的血统(lineage)是指RDD之间的依赖关系,也可以叫做RDD的逻辑执行计划。RDD的血统是由一系列的转换操作组成的,每个转换操作都会产生一个新的RDD,并记录它的父RDD和转换函数。RDD的血统可以用来追踪数据的来源和变化过程,也可以用来在节点故障时恢复丢失的数据分区。RDD的血统可以通过toDebugString方法查看,它会返回一个字符串,表示RDD的依赖图。

RDD的五大属性

RDD的五大属性是:

  • 一组分片(Partition),即数据集的基本组成单位。每个分片可以在集群中的不同节点上并行计算。每个分片都会被一个计算任务处理,并决定并行计算的粒度。
  • 一个计算每个分区的函数(compute)。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。该函数会对每个分区的数据进行转换或操作,并返回一个迭代器。
  • RDD之间的依赖关系(dependencies)。RDD可以通过转换操作生成新的RDD(每次转换都会生成一个新的RDD),从而形成一个有向无环图(DAG)。当某个分区发生故障时,可以根据依赖关系重新计算该分区的数据。
  • 一个分区器(partitioner),即RDD的分片函数。该函数决定了RDD的分片数量和分片方式。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
  • 一个优先位置列表(preferred locations),即每个分区的最佳计算位置。该列表通常是根据数据源的位置来确定的,比如HDFS文件的块位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置,以减少数据传输开销。

WordCount中的RDD的五大属性

在这里插入图片描述

如何创建RDD?

RDD中的数据可以来源于2个地方:本地集合或外部数据源

  1. 多种API
    sc.parallelize(本地集合,分区数)
    sc.makeRDD(本地集合,分区数) //底层使用的parallelize
    sc.textFile(本地/HDFS文件/文件夹,分区数) //注意不要用它读取大量小文件

    sc.wholeTextFiles(本地/HDFS文件夹,分区数) //专门用来读取小文件的

  2. 获取RDD分区数
    rdd.getNumPartitions //获取rdd的分区数,底层是partitions.lengthrdd.partitions.length //获取rdd的分区数

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDDDemo1 {
  def main(args: Array[String]): Unit = {
    //创建环境
    val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    //创建RDD
    val rdd1: RDD[Int] = sc.parallelize(1 to 10) //8
    val rdd2: RDD[Int] = sc.parallelize(1 to 10,3) //3

    val rdd3: RDD[Int] = sc.makeRDD(1 to 10)//底层是parallelize //8
    val rdd4: RDD[Int] = sc.makeRDD(1 to 10,4) //4
      
    //RDD[一行行的数据]
    val rdd5: RDD[String] = sc.textFile("data/input/words.txt")//2
    val rdd6: RDD[String] = sc.textFile("data/input/words.txt",3)//3
    //RDD[一行行的数据]
      //ratings10文件夹中有10个小文件
    val rdd7: RDD[String] = sc.textFile("data/input/ratings10")//10
    val rdd8: RDD[String] = sc.textFile("data/input/ratings10",3)//10
      
    //RDD[(文件名, 一行行的数据),(文件名, 一行行的数据)....]
    val rdd9: RDD[(String, String)] = sc.wholeTextFiles("data/input/ratings10")//2
    val rdd10: RDD[(String, String)] = sc.wholeTextFiles("data/input/ratings10",3)//3

    println(rdd1.getNumPartitions)//8 //底层partitions.length
    println(rdd2.partitions.length)//3
    println(rdd3.getNumPartitions)//8
    println(rdd4.getNumPartitions)//4
    println(rdd5.getNumPartitions)//2
    println(rdd6.getNumPartitions)//3
    println(rdd7.getNumPartitions)//10
    println(rdd8.getNumPartitions)//10
    println(rdd9.getNumPartitions)//2
    println(rdd10.getNumPartitions)//3

  }
}

RDD的操作

两种

  1. Transformation转换操作:返回一个新的RDD

    which create a new dataset from an existing one

    所有Transformation函数都是Lazy,不会立即执行,需要Action函数触发

  2. Action动作操作:(返回值不是RDD无返回值或返回其他的)

    which return a value to the driver program after

    running a computation on the datase
    所有Action函数立即执行(Eager),比如count、first.collect、take等

在这里插入图片描述

基本算子/操作/方法/API

map,faltMap,filter,foreach,saveAsTextFile

import org.apache.commons.lang3.StringUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDDDemo2 {
  def main(args: Array[String]): Unit = {
    // 创建环境
    val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // 创建RDD
    val lines: RDD[String] = sc.textFile("data/input/words.txt") //2

    // transformation
    val result: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    // 输出/action
    result.foreach(println)
    result.saveAsTextFile("data/output/result4")
  }
}

分区操作

每个RDD由多分区组成的,实际开发中如果涉及到资源相关操作建议对每个分区进行操作,即:
使用mapPartitions代替map函数
使用foreachPartition代替foreache函数

  • map和mapPartitions都是转换操作,它们的区别是map是对每个元素应用一个函数,而mapPartitions是对每个分区的迭代器应用一个函数
  • mapPartitions的优势是可以减少函数调用的次数,提高性能,尤其是在涉及数据库连接等IO操作时,可以在每个分区只建立一次连接。
  • mapPartitions的劣势是可能会造成内存溢出,因为它一次要处理一个分区的所有数据,如果数据量太大,就会占用过多的内存空间。而map是一条一条地处理数据,不会有这个问题。
  • foreach和foreachPartition都是动作操作,它们的区别是foreach是对每个元素执行一个函数,而foreachPartition是对每个分区的迭代器执行一个函数
  • foreachPartition的优势和劣势与mapPartitions类似,都是可以减少函数调用次数,提高性能,但也可能造成内存溢出。
  • foreach和foreachPartition一般用于在程序末尾将数据落地到存储系统中,如mysql,es或hbase中。

代码示例

  • map函数的例子:
//假设有一个RDD[Int],每个元素都加1
val rdd = sc.parallelize (List (1, 2, 3, 4, 5))
val result = rdd.map (x => x + 1) //使用匿名函数
result.collect () // Array (2, 3, 4, 5, 6)
  • mapPartitions函数的例子:
//假设有一个RDD[Int],每个分区的元素都乘以分区号
val rdd = sc.parallelize (List (1, 2, 3, 4, 5), 2) //分成两个分区
val result = rdd.mapPartitions ((iter, pid) => iter.map (x => x * pid)) //使用匿名函数
result.collect () // Array (0, 0, 6, 8, 10)
  • foreach函数的例子:
//假设有一个RDD[String],打印每个元素
val rdd = sc.parallelize (List ("a", "b", "c", "d", "e"))
rdd.foreach (x => println (x)) //使用匿名函数
//输出:
a
b
c
d
e
  • foreachPartition函数的例子:
//假设有一个RDD[String],打印每个分区的元素个数
val rdd = sc.parallelize (List ("a", "b", "c", "d", "e"), 2) //分成两个分区
rdd.foreachPartition (iter => println (iter.size)) //使用匿名函数
//输出:
3
2

使用mapPartitions和foreachPartition的主要目的

mapPartitions和foreachPartition的主要目的是为了减少函数调用的次数,提高性能。因为在Spark中,每个分区都是一个任务,每个任务都会在一个Executor上运行。如果使用map或foreach,那么每个元素都会调用一次函数,这会增加函数调用的开销,尤其是在涉及数据库连接等IO操作时,每个元素都会创建和关闭一个连接,这会降低性能。而如果使用mapPartitions或foreachPartition,那么每个分区只会调用一次函数,这样就可以在每个分区只建立一次连接,减少函数调用的开销,提高性能。

重分区操作

重分区操作是指改变RDD的分区数,有两种常用的方法:repartitioncoalesce

在这里插入图片描述

  • repartition是通过shuffle将数据重新分布到指定数量的分区,可以增加或减少分区数,也可以改变数据的分布。
  • coalesce是通过合并或拆分现有的分区来改变分区数,可以减少分区数,也可以增加分区数(需要开启shuffle),但不会改变数据的分布。
  • repartition的优势是可以平衡数据的负载,避免数据倾斜,也可以根据某些列来进行分区,提高后续操作的效率。
  • repartition的劣势是会触发shuffle操作,消耗网络和磁盘资源,降低性能。
  • coalesce的优势是可以减少分区数,减少任务调度的开销,也可以避免shuffle操作,提高性能。
  • coalesce的劣势是不能平衡数据的负载,可能导致数据倾斜,也不能根据某些列来进行分区。

代码示例

  • repartition的例子:
//假设有一个RDD[Int],分成3个分区
val rdd = sc.parallelize(List(1,2,3,4,5,6),3)
//查看分区数
rdd.partitions.length // 3
//使用repartition增加分区数到6
val rdd2 = rdd.repartition(6)
//查看分区数
rdd2.partitions.length // 6
//使用repartition减少分区数到2
val rdd3 = rdd.repartition(2)
//查看分区数
rdd3.partitions.length // 2
  • coalesce的例子:
//假设有一个RDD[Int],分成3个分区
val rdd = sc.parallelize(List(1,2,3,4,5,6),3)
//查看分区数
rdd.partitions.length // 3
//使用coalesce减少分区数到2,不开启shuffle
val rdd2 = rdd.coalesce(2)
//查看分区数
rdd2.partitions.length // 2
//使用coalesce增加分区数到4,开启shuffle
val rdd3 = rdd.coalesce(4,true)
//查看分区数
rdd3.partitions.length // 4

聚合操作

聚合操作可以按有key的聚合函数和无key的聚合函数分类:

  • 有key的聚合函数是指对PairRDD中的数据,按照key进行分组或合并的函数,例如reduceByKey、groupByKey、foldByKey、aggregateByKey等。这些函数需要传入一个或多个函数作为参数,来指定如何对每个key的value进行聚合。有key的聚合函数返回的结果仍然是PairRDD,即key-value对。

    • 四个有key函数的用法如下:

      • reduceByKey是指对PairRDD中的数据,按照key进行分组,然后对每个key的value使用一个函数进行聚合,分区内和分区间的聚合规则相同。用法是:RDD.reduceByKey(func, numPartitions),其中func是聚合函数,numPartitions是分区数。
      • groupByKey是指对PairRDD中的数据,按照key进行分组,然后返回一个新的RDD,其中每个key对应一个Iterable的value。用法是:RDD.groupByKey(numPartitions),其中numPartitions是分区数。
      • aggregateByKey是指对PairRDD中的数据,按照key进行分组,然后对每个key的value使用一个初始值和两个函数进行聚合,分区内和分区间的聚合规则可以不同。用法是:RDD.aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions),其中zeroValue是初始值,seqFunc是分区内的聚合函数,combFunc是分区间的聚合函数,numPartitions是分区数。
      • foldByKey是指对PairRDD中的数据,按照key进行分组,然后对每个key的value使用一个初始值和一个函数进行聚合,分区内和分区间的聚合规则相同。用法是:RDD.foldByKey(zeroValue, func, numPartitions),其中zeroValue是初始值,func是聚合函数,numPartitions是分区数。
  • 无key的聚合函数是指对RDD中的数据,不区分key,直接进行整体或单列的聚合的函数,例如count、sum、avg等。这些函数不需要传入参数,只需要指定要聚合的列或整个数据集。无key的聚合函数返回的结果不是RDD,而是一个单一的值或数组。

代码示例

  • 有key的聚合函数的代码示例:
//假设有一个PairRDD,表示每个城市的销售额
val sales = sc.parallelize(List(("Beijing", 100), ("Shanghai", 200), ("Guangzhou", 150), ("Beijing", 120), ("Shanghai", 300), ("Guangzhou", 180)))

//使用reduceByKey函数,按照城市进行分组,然后对每个城市的销售额求和
val totalSales = sales.reduceByKey((x, y) => x + y)
//输出结果为:(Beijing,220),(Shanghai,500),(Guangzhou,330)

//使用groupByKey函数,按照城市进行分组,然后对每个城市的销售额求平均值
val avgSales = sales.groupByKey().mapValues(values => values.sum / values.size)
//输出结果为:(Beijing,110),(Shanghai,250),(Guangzhou,165)

//使用foldByKey函数,按照城市进行分组,然后对每个城市的销售额求和
val totalSales = sales.foldByKey(0)(_ + _)
//输出结果为:(Beijing,220),(Shanghai,500),(Guangzhou,330)

//使用aggregateByKey函数,按照城市进行分组,然后对每个城市的销售额求最大值和最小值
val maxMinSales = sales.aggregateByKey((Int.MinValue, Int.MaxValue))(
  (acc, value) => (math.max(acc._1, value), math.min(acc._2, value)), //在每个分区内对每个key的value求最大值和最小值
  (acc1, acc2) => (math.max(acc1._1, acc2._1), math.min(acc1._2, acc2._2)) //在不同分区间对每个key的value求最大值和最小值
)
//输出结果为:(Beijing,(120,100)),(Shanghai,(300,200)),(Guangzhou,(180,150))
  • 无key的聚合函数的代码示例:
//假设有一个RDD,表示每个人的年龄
val ages = sc.parallelize(List(20, 25, 30, 35, 40))

//使用count函数,统计RDD中的元素个数
val count = ages.count()
//输出结果为:5

//使用sum函数,求RDD中的元素之和
val sum = ages.sum()
//输出结果为:150

//使用avg函数,求RDD中的元素的平均值
val avg = ages.mean()
//输出结果为:30.0

四个有key函数的区别

  • reduceByKey和groupByKey的区别是,reduceByKey在分区内和分区间使用相同的函数对value进行聚合,而groupByKey只是按照key进行分组,不对value进行聚合。reduceByKey比groupByKey更高效,因为它可以减少shuffle的数据量。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Y4YBexxe-1681978717408)(img/1609646014985.png)]在这里插入图片描述

在这里插入图片描述

  • aggregateByKey和reduceByKey的区别是,aggregateByKey可以指定一个初始值,并且可以使用不同的函数对分区内和分区间的value进行聚合,而reduceByKey只能使用相同的函数。aggregateByKey比reduceByKey更灵活,因为它可以实现更多的逻辑。
  • foldByKey和aggregateByKey的区别是,foldByKey只能使用相同的函数对分区内和分区间的value进行聚合,而aggregateByKey可以使用不同的函数。foldByKey是aggregateByKey的简化形式,适用于分区内和分区间的聚合规则相同的情况。

关联操作

关联操作是指对两个或多个RDD进行连接,根据不同的条件和方式,产生一个新的RDD。

  • 对于RDD,有以下几种关联操作:
    • join:根据两个RDD中的相同的key进行内连接,返回一个新的PairRDD,其中每个key对应一个元组,包含两个RDD中的value。
    • leftOuterJoin:根据两个RDD中的相同的key进行左外连接,返回一个新的PairRDD,其中每个key对应一个元组,包含左边RDD中的value和右边RDD中的value(如果存在)或None(如果不存在)。
    • rightOuterJoin:根据两个RDD中的相同的key进行右外连接,返回一个新的PairRDD,其中每个key对应一个元组,包含左边RDD中的value(如果存在)或None(如果不存在)和右边RDD中的value。
    • fullOuterJoin:根据两个RDD中的相同的key进行全外连接,返回一个新的PairRDD,其中每个key对应一个元组,包含左边RDD中的value(如果存在)或None(如果不存在)和右边RDD中的value(如果存在)或None(如果不存在)。
    • subtract:根据两个RDD中的不同的key进行差集操作,返回一个新的PairRDD,其中只包含左边RDD中有而右边RDD中没有的键值对。
    • cartesian:根据两个RDD中的所有可能的键值对进行笛卡尔积操作,返回一个新的PairRDD,其中包含所有可能的组合。

代码示例

  • 对于RDD,假设有以下两个PairRDD:
val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2 = sc.parallelize(List(("a", 4), ("b", 5), ("d", 6)))
  • 那么可以进行以下关联操作:
// join
rdd1.join(rdd2).collect()
// 输出:Array((a,(1,4)), (b,(2,5)))

// leftOuterJoin
rdd1.leftOuterJoin(rdd2).collect()
// 输出:Array((a,(1,Some(4))), (b,(2,Some(5))), (c,(3,None)))

// rightOuterJoin
rdd1.rightOuterJoin(rdd2).collect()
// 输出:Array((a,(Some(1),4)), (b,(Some(2),5)), (d,(None,6)))

// fullOuterJoin
rdd1.fullOuterJoin(rdd2).collect()
// 输出:Array((a,(Some(1),Some(4))), (b,(Some(2),Some(5))), (c,(Some(3),None)), (d,(None,Some(6))))

// subtract
rdd1.subtract(rdd2).collect()
// 输出:Array((c,3))

// cartesian
rdd1.cartesian(rdd2).collect()
// 输出:Array(((a,1),(a,4)), ((a,1),(b,5)), ((a,1),(d,6)), ((b,2),(a,4)), ((b,2),(b,5)), ((b,2),(d,6)), ((c,3),(a,4)), ((c,3),(b,5)), ((c,3),(d,6)))

排序操作

排序操作是指对RDD中的数据按照某种规则进行升序或降序的操作。

  • 对于RDD,有以下几种排序操作:
    • sortBy:根据指定的函数对RDD中的元素进行排序,可以指定升序或降序,默认为升序。
    • sortByKey:对PairRDD中的键进行排序,可以指定升序或降序,默认为升序。
    • top:返回RDD中最大的n个元素,可以指定排序规则,默认为自然顺序。
    • takeOrdered:返回RDD中最小的n个元素,可以指定排序规则,默认为自然顺序。

代码示例

val rdd = sc.parallelize(List(5, 3, 7, 1, 9, 4))
val pairRdd = sc.parallelize(List(("a", 5), ("b", 3), ("c", 7), ("d", 1), ("e", 9), ("f", 4)))
  • 可以进行以下排序操作:
// sortBy
rdd.sortBy(x => x).collect() // 输出:Array(1, 3, 4, 5, 7, 9)
rdd.sortBy(x => x, false).collect() // 输出:Array(9, 7, 5, 4, 3, 1)

// sortByKey
pairRdd.sortByKey().collect() // 输出:Array((a,5), (b,3), (c,7), (d,1), (e,9), (f,4))
pairRdd.sortByKey(false).collect() // 输出:Array((f,4), (e,9), (d,1), (c,7), (b,3), (a,5))

// top
rdd.top(3) // 输出:Array(9, 7, 5)
pairRdd.top(3) // 输出:Array((f,4), (e,9), (d,1))

// takeOrdered
rdd.takeOrdered(3) // 输出:Array(1, 3, 4)
pairRdd.takeOrdered(3) // 输出:Array((a,5), (b,3), (c,7))
  • 对pairRdd的排序都是按照字符abcd的大小来排的,因为默认的排序规则是按照字典序。如果想按照数字大小来排,可以指定一个自定义的排序规则:
// 定义一个Ordering对象,用于比较两个元组的第二个元素
object ValueOrdering extends Ordering[(String, Int)] {
  def compare(a: (String, Int), b: (String, Int)) = a._2 compare b._2
}

// 使用自定义的排序规则进行排序
pairRdd.sortByKey()(ValueOrdering).collect() // 输出:Array((d,1), (b,3), (f,4), (a,5), (c,7), (e,9))
pairRdd.sortByKey(false)(ValueOrdering).collect() // 输出:Array((e,9), (c,7), (a,5), (f,4), (b,3), (d,1))
pairRdd.top(3)(ValueOrdering) // 输出:Array((e,9), (c,7), (a,5))
pairRdd.takeOrdered(3)(ValueOrdering) // 输出:Array((d,1), (b,3), (f,4))
  • 也可以直接使用Ordering类提供的by方法,根据指定的函数生成一个Ordering对象。这样的话,你就不用自己定义一个Ordering对象了。
pairRdd.sortByKey()(top(3)(Ordering.by(_._2))).collect() // 输出:Array((d,1), (b,3), (f,4), (a,5), (c,7), (e,9))
pairRdd.sortByKey(false)(top(3)(Ordering.by(_._2))).collect() // 输出:Array((e,9), (c,7), (a,5), (f,4), (b,3), (d,1))
pairRdd.top(3)(top(3)(Ordering.by(_._2))) // 输出:Array((e,9), (c,7), (a,5))
pairRdd.takeOrdered(3)(top(3)(Ordering.by(_._2))) // 输出:Array((d,1), (b,3), (f,4))
      • 这个函数就是_._2,表示取元组的第二个元素。所以这个排序规则就是按照元组的第二个元素进行比较。这样的写法更简洁,但是也更难理解,所以要看自己的喜好。😊

RDD的缓存/持久化

缓存解决什么问题?–解决的是热点数据频繁访问的效率问题
在Spark开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。

RDD的缓存/持久化是Spark的一个重要特性,它可以提高迭代计算和交互式查询的性能,避免重复计算。

RDD的缓存/持久化有三种方式:cachepersistcheckpoint

cache和persist

  • cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间。

    • cache是persist的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY)将数据持久化到内存中。

      在这里插入图片描述

    • persist可以指定不同的存储级别,比如MEMORY_ONLY,MEMORY_AND_DISK,MEMORY_ONLY_SER等,这些存储级别可以根据数据是否使用硬盘,是否使用堆外内存,是否序列化,是否有副本等因素进行选择。

      • 缓存级别
        在这里插入图片描述
    • cache和persist都是lazy的算子,需要触发action操作才会执行持久化。

    在这里插入图片描述

代码示例

cache的代码示例:

// 创建一个RDD
val rdd = sc.parallelize(1 to 10)

// 对RDD进行一些转换操作
val rdd2 = rdd.map(_ * 2)

// 对RDD进行缓存,使用默认的存储级别MEMORY_ONLY
rdd2.cache()

// 对RDD进行一些action操作,触发缓存
rdd2.count()
rdd2.collect()

//清空缓存
rdd2.unpersist()

persist的代码示例:

// 创建一个RDD
val rdd = sc.parallelize(1 to 10)

// 对RDD进行一些转换操作
val rdd2 = rdd.map(_ * 2)

// 对RDD进行持久化,指定存储级别为MEMORY_AND_DISK_SER
rdd2.persist(StorageLevel.MEMORY_AND_DISK_SER)

// 对RDD进行一些action操作,触发持久化
rdd2.count()
rdd2.collect()

//清空缓存
rdd2.unpersist()

如果想手动移除一个RDD的缓存,而不是等待该RDD被Spark自动移除,可以使用RDD.unpersist()方法。

checkpoint检查点

  • checkpoint: 将数据持久化到磁盘,并切断RDD的依赖关系,使得后续的操作可以从检查点处读取数据,而不需要从源头处重新计算。
    • checkpoint适用于计算代价特别大或者依赖链特别长的RDD。

    • checkpoint需要触发两次action操作才会执行持久化,一次是为了标记需要检查点的RDD,另一次是为了启动一个新的job来写入检查点数据。

    • checkpoint之后,RDD的依赖关系会被切断,而cache和persist之后,RDD的依赖关系还是存在的。

    • checkpoint之后,缓存的数据会被保存到HDFS中,不会丢失,而cache和persist之后,如果内存或磁盘不足,缓存的数据可能会丢失或被删除。

    • 一般来说,在使用checkpoint之前最好先使用cache或persist进行缓存,这样可以避免重新计算被标记的RDD。

APl

sc.setCheckpointDir(HDFS路径)//设置checkpoint路径,开发中一般设置为HDFS的目录

rdd.checkpoint//对计算复杂且后续会被频繁使用的RDD进行checkpoint

代码示例

checkpoint()的代码示例

// 创建一个SparkContext对象
val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)

// 设置检查点目录
sc.setCheckpointDir("hdfs://localhost:9000/checkpoint")

// 创建一个RDD
val rdd = sc.parallelize(1 to 10)

// 对RDD进行一些转换操作
val rdd2 = rdd.map(_ * 2)

// 对RDD进行检查点,将数据持久化到磁盘,并切断依赖关系
rdd2.checkpoint()

// 对RDD进行一些action操作,触发检查点
rdd2.count()
rdd2.collect()

checkpoint没有清除缓存的方法

共享变量

Spark共享变量是指可以在多个任务或节点之间共享的变量,它们可以用来实现一些特定的功能或优化性能。

Spark有两种共享变量,分别是:

  • 广播变量(broadcast variable),用来把一个变量在所有节点的内存之间进行共享,避免每个任务都拷贝一份变量的副本,节省网络和内存资源。
  • 累加器(accumulator),用来对信息进行聚合,比如计数或求和,只能由驱动程序读取,由任务更新。

Spark共享变量的使用方法是

  • 广播变量可以通过调用SparkContext.broadcast(v)方法从一个普通变量v中创建,然后通过调用value方法获取广播变量的值。

    在这里插入图片描述

  • 累加器可以通过调用SparkContext.longAccumulator(name)SparkContext.doubleAccumulator(name)方法创建一个数值类型的累加器,然后通过调用add(n)方法更新累加器的值,或者通过调用value方法获取累加器的值。

Hello   world  how are you ?
Hello   world  how are you ?
Hello   world  how are you ?
Hello   world  how are you ?
Hello   world  how are you ?
Hello   world  how are you ?
Hello   world  how are you ?
Hello   world  how are you ?
    //创建环境
    val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //需求:
    // 以词频统计WordCount程序为例,处理的数据word2.txt所示,包括非单词符号,
    // 做WordCount的同时统计出特殊字符的数量
    //创建一个计数器/累加器
    val mycounter: LongAccumulator = sc.longAccumulator("mycounter")
    //定义一个特殊字符集合
    val ruleList: List[String] = List(",", ".", "!", "#", "$", "%", "?")
    //将集合作为广播变量广播到各个节点
    val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)

    //创建RDD
    val lines: RDD[String] = sc.textFile("data/input/words2.txt")

    //transformation
    val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
      .flatMap(_.split("\\s+"))
      .filter(ch => {
        //获取广播数据
        val list: List[String] = broadcast.value
        if (list.contains(ch)) { //如果是特殊字符
          mycounter.add(1)
          false
        } else { //是单词
          true
        }
      }).map((_, 1))
      .reduceByKey(_ + _)

    //输出
    wordcountResult.foreach(println)
    val chResult: lang.Long = mycounter.value
    println("特殊字符的数量:"+chResult)

Shuffle本质

shuffle本质是洗牌

Shuffle的本质是指在Spark中进行一些需要跨分区交换数据的操作时,比如reduceByKey,join,groupByKey等,需要将属于同一个key的数据发送到同一个分区中,这个过程就涉及到了数据的重新分区和网络传输,这就是Shuffle。

Shuffle的本质可以用以下几个步骤来描述:

  • 在Shuffle操作之前的阶段,称为map阶段,每个任务会根据分区器(Partitioner)和聚合器(Aggregator)对自己处理的数据进行本地分区和聚合,并将结果写入本地磁盘文件中,这些文件称为map输出文件或Shuffle文件。
  • 在Shuffle操作之后的阶段,称为reduce阶段,每个任务会根据自己的分区号向map阶段的所有任务发送请求,获取属于自己分区的数据,并将这些数据拉取到自己的内存或磁盘中,然后进行后续的处理。
  • 在整个Shuffle过程中,Spark会通过一些机制来优化性能和资源利用率,比如使用序列化和压缩技术减少数据量,使用缓存和溢写策略控制内存占用,使用推测执行和动态资源分配机制提高任务执行效率等。

Shuffle的本质是Spark中最核心也最复杂的机制之一,它决定了Spark作业的性能和稳定性。

一个Shuffle文件的结构图如下:

-----------------Shuffle文件
分区0的数据每个分区的数据由多个记录组成,每个记录由长度、键、值三部分构成
-----------------
分区1的数据
-----------------
-----------------
分区n的数据
-----------------
分区0的索引每个分区的索引由偏移量和长度两部分构成,用于定位分区在文件中的位置
-----------------
分区1的索引
-----------------
-----------------
分区n的索引
-----------------

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/440760.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java学习-MySQL-DQL数据查询-联表查询JOIN

Java学习-MySQL-DQL数据查询-联表查询JOIN 1.分析需求,查找那些字段 2.分析查询的字段来自哪些表 3.确定使用哪种连接查询 4.确定交叉点 5.确定判断条件 操作描述inner join返回左右表的交集left join返回左表,即使右表没有right join返回右表&#xf…

iptables深度总结--基础篇

iptables 五表五链 链:INPUT OUTPUT FORWARD PREROUTING POSTROUTING 表:filter、nat、mangle、raw、security 数据报文刚进网卡,还没有到路由表的时候,先进行了prerouting,进入到路由表,通过目标地址判…

FFMPEG 关于smaple_fmts的理解及ffplay播放PCM

问题 当我将一个aac的音频文件解码为原始的PCM数据后,使用ffplay播放测试是否成功时,需要提供给ffplay 采样率,通道数,PCM的格式类型 3个参数,否则无法播放! 所以使用ffprobe 查看原来的aac文件信息&…

Python手写板 画图板 签名工具

程序示例精选 Python手写板 画图板 签名工具 如需安装运行环境或远程调试&#xff0c;见文章底部个人QQ名片&#xff0c;由专业技术人员远程协助&#xff01; 前言 这篇博客针对<<Python手写板 画图板 签名工具>>编写代码&#xff0c;代码整洁&#xff0c;规则&am…

别再回答面试官,toFixed采用的是四舍五入啦!

四舍五入大家都知道&#xff0c;但你知道银行家舍入法么&#xff1f;你知道JS里的toFixed实现用的是哪种吗&#xff1f; 前两天我写了篇《0.1 0.2 不等于 0.3&#xff1f;原来是因为这个》&#xff0c;大概就是说&#xff0c;0.1 0.2不等于0.3是因为浮点数精度问题。 结果在…

LinkedList 的特点及优缺点

现在来讲 LinkedList LinkedList 是链表集合&#xff0c;基于链表去存储数据&#xff0c;每一个数据视作一个节点 private static class Node<E> {// 存放的数据E item;// 下一个节点Node<E> next;// 上一个节点Node<E> prev;Node(Node<E> prev, E ele…

【unity实战】2D横版实现人物移动跳跃2——用对象池设计制作冲锋残影的效果(包含源码)

基于上一篇人物移动二段跳进一步优化完善 先看看最终效果 什么是对象池? 在Unity中,对象池是一种重复使用游戏对象的技术。使用对象池的好处是可以减少游戏对象的创建和销毁,从而提高游戏的性能。如果不使用对象池,每次需要创建游戏对象时,都需要调用Unity的Instantiate函…

国内几大技术网站,你最爱和哪个玩耍?

所谓“物以类聚&#xff0c;人以群分” 所谓“士为知己者死&#xff0c;女为悦己者容” 所谓“世上的乌鸦都一般黑&#xff0c;鸽子却各有各的白” CSDN&#xff0c;掘金&#xff0c;博客园等&#xff0c;说起来都是“技术”社区&#xff0c;每个却都有着不同的姿色和用处。至于…

初识Spring——IoC及DI详解

目录 一&#xff0c;什么是Spring Spring设计核心 Spring核心定义 Spring官网 二&#xff0c;什么是IoC IoC思想 控制权的反转 三&#xff0c;什么是DI DI的定义 DI和IoC的关系 一&#xff0c;什么是Spring Spring设计核心 我们常说的Spring其实指的是Spring Framewo…

ABP vNext电商项目落地实战(一)——项目搭建

一、落地条件&#xff1a; 1. .NET5版本 2. DDD 3. ABP vNext 4.ABP CLI &#xff08;ABP的命令行工具&#xff0c;包括ABP的各种模板&#xff09; 5.SQL Server 写在前面&#xff1a;我觉得这个框架的文件分层很凌乱&#xff0c;在企业的实际业务场景中&#xff0c;一般…

vscode+git浅尝

git 安装git以后初始化仓库分支重命名合并分支连接远程仓库推送项目 安装git以后 第一次使用git需要配置用户名和邮箱 任意处打开git终端&#xff0c;譬如鼠标右击点击git bash here 命令分别为&#xff1a; 设置用户名和邮箱 git config --global user.name “username” …

【QA】Python代码调试之解决Segmentation fault (core dumped)问题

Python代码调试之解决Segmentation fault 问题 问题描述排查过程1. 定位错误&#xff0c;2. 解决办法 参考资料 问题描述 Python3执行某一个程序时&#xff0c;报Segmentation fault (core dumped)错&#xff0c;且没有其他任何提示&#xff0c;无法查问题。 Segmentation fa…

jenkins gitlab asp.net core持续集成

什么是jenkins Jenkins直接取自其官方文档&#xff0c;是一个独立的开源自动化服务器&#xff0c;您可以使用它来自动执行与构建、测试、交付或部署软件相关的各种任务。 jenkins可以干什么 Jenkins 通过自动执行某些脚本来生成部署所需的文件来工作。这些脚本称为JenkinsFi…

叶酸聚乙二醇羟基FA-PEG-OH;了解高分子试剂 Folate-PEG-OH

FA-PEG-OH&#xff0c;叶酸-聚乙二醇-羟基 中文名称&#xff1a;叶酸聚乙二醇羟基 英文名称&#xff1a;FA-PEG-OH HO-PEG-FA Folate-PEG-OH 性状&#xff1a;黄色液体或固体&#xff0c;取决于分子量 溶剂&#xff1a;溶于水&#xff0c;DMSO、DMF等常规性有机溶剂 活性基…

【NestJs】使用连接mysql企业级开发规范

本篇将介绍如何建立 NestJs 的数据库连接、并使用数据库联表查询。 简介 Nest 与数据库无关&#xff0c;允许您轻松地与任何 SQL 或 NoSQL 数据库集成。根据您的偏好&#xff0c;您有许多可用的选项。一般来说&#xff0c;将 Nest 连接到数据库只需为数据库加载一个适当的 No…

Delphi DataSnap 流程分析(一)

DataSnap 有三种方式: 1、DataSnap REST Application: Create a DataSnap Server with support for REST Communication and with pages that invoke server methods using Java Script and JSON. 2、DataSnap Server: The DataSnap Server Wizard provides an easy way to i…

怎么把视频中动态的人物P掉,把视频中不要的人物去掉

怎么把视频中动态的人物P掉&#xff1f;很多小伙伴试过ps抠图&#xff0c;但是你试过视频人物抠图吗&#xff1f;其实道理是一样的&#xff0c;但是操作过程却变难了。今天就给大家带来一个简单的方法&#xff0c;轻松去除视频中的人物。不影响整个画面的呈现。 在拍摄旅游视频…

springcloud:快速上手定时任务框架xxl-job(十五)

0. 引言 实际开发中&#xff0c;我们常常遇到需要定时执行的任务&#xff0c;我们可以利用定时线程池或schedule框架等来实现定时任务&#xff0c;但这些方式都有效率、性能上的缺陷&#xff0c;在微服务框架下&#xff0c;我们期望一种更加规整、轻量、可靠的定时任务框架来帮…

【通信接口】UART、IIC、SPI

目录 一、预备知识 1、串行与并行 2、单工与双工 3、波特率 二、UART 三、IIC 四、SPI &#xff08;一对一、一对多&#xff09; 五、IIC、SPI异同点 参考文章&#xff1a;这些单片机接口&#xff0c;一定要熟悉&#xff1a;UART、I2C、SPI、TTL、RS232、RS422、RS485…

kafka-5 kafka的高吞吐量和高可用性

kafka的高吞吐量和高可用性 6.1 高吞吐量6.2 高可用&#xff08;HA&#xff09; 6.1 高吞吐量 kafka的高吞吐量主要是由4方面保证的&#xff1a; &#xff08;1&#xff09;顺序读写磁盘 Kafka是将消息持久化到本地磁盘中的&#xff0c;一般人会认为磁盘读写性能差&#xff…