Spark-Core核心算子

news2024/11/28 10:45:22

文章目录

    • 一、数据源获取
      • 1、从集合中获取
      • 2、从外部存储系统创建
      • 3、从其它RDD中创建
      • 4、分区规则—load数据时
    • 二、转换算子(Transformation)
      • 1、Value类型
        • 1.1 map()_
        • 1.2 mapPartitions()
        • 1.3 mapPartitionsWithIndex(不常用)
        • 1.4 filterMap()_扁平化(合并流)
        • 1.5 groupBy()_分组
        • 1.6 filter()_过滤
        • 1.7 distinct()_去重
        • 1.8 coalesce()_合并分区
        • 1.9 repartition()_重新分区
        • 1.10 sortBy()_排序
        • 1.11 map和mapPartitions区别
        • 1.12 coalesce和repartition区别
      • 2、双-Value类型
        • 2.1 intersection()_交集
        • 2.2 union()_并集不去重
        • 2.3 subtract()_差集
        • 2.4 zip()_拉链
      • 3、Key—Value类型
        • 3.1 partitionBy()_按照K重新分区
        • 3.2 groupByKey()_按照K重新分组
        • 3.3 reduceByKey()_按照K聚合V
        • 3.4 aggregateByKey()_不同逻辑的归约
        • 3.5 sortByKey()_按照K进行排序
        • 3.6 mapValues()_只对V进行操作
        • 3.7 join()_等同于sql内连接
        • 3.8 cogroup()_类似于sql全连接
        • 3.9 自定义分区器
        • 3.10 reduceByKey和groupByKey区别
    • 三、行动算子(Action)
      • 1、collect()_以数组的形式返回数据集
      • 2、count()_返回RDD中元素个数
      • 3、first()_返回RDD中的第一个元素
      • 4、take()_返回由RDD前n个元素组成的数组
      • 5、takeOrdered()_返回排序后前n个元素
      • 6、countByKey()_统计每种key的个数
      • 7、saveAsTextFile(path)_保存成Text文件
      • 8、saveAsSequenceFile(path)_保存成Sequencefile文件
      • 9、saveAsObjectFile(path)_序列化成对象保存到文件
      • 10、foreach()_遍历RDD中每一个元素


一、数据源获取

1、从集合中获取

sc.parallelize(list)
sc.makeRDD(list)
sc.makeRDD(list, 2)
val list: List[Int] = List(1, 2, 3, 4, 5)
//  从List中创建RDD
val rdd01: RDD[Int] = sc.parallelize(list)
//  底层调用parallelize。从结合list中获取数据
val rdd02: RDD[Int] = sc.makeRDD(list)
//  2:分区数量为2
val rdd03: RDD[Int] = sc.makeRDD(list, 2)

2、从外部存储系统创建

//	从文件中获取
sc.textFile("input/1.txt")
//  无论文件中存储的是什么数据,读取过来都当字符串进行处理
val rdd04: RDD[String] = sc.textFile("input/1.txt")

3、从其它RDD中创建

在其它执行步骤完成后,生成新的RDD对象

val rdd05: RDD[String] = rdd04.map(_ * 2)

4、分区规则—load数据时

从集合中创建

从文件中创建

二、转换算子(Transformation)

//  1、创建SparkConf并设置App名称
val conf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
//  2、创建SparkContext,该对象时提交Spark APP 的入口
val sc = new SparkContext(conf)
//  3、创建RDD
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
//  4、具体执行步骤
val rdd01: RDD[Int] = rdd.map(x => x * 20)
//  5、打印结果
println(rdd01.collect().toList)
//  6、关闭连接
sc.stop()

1、Value类型

1.1 map()_

在这里插入图片描述

//  4、具体执行步骤
val rdd01: RDD[Int] = rdd.map(x => x * 20)
//  4、具体执行步骤
val rdd02: RDD[Int] = rdd01.map(_ * 20)

1.2 mapPartitions()

以分区为单位执行的map()

在这里插入图片描述

1.3 mapPartitionsWithIndex(不常用)

  • 里面的函数针对每个分区操作,分区有多少个,函数就执行多少次。
  • 函数的第一个参数代表分区号。
  • 函数的第二个参数代表分区数据迭代器。
  /**
   *
   * @param f                     分区编号
   * @param preservesPartitioning 分区数据迭代器
   */
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {

}
rdd03.mapPartitionsWithIndex((index, items) => {
  items.map((index, _))
})
//	指定迭代器规则,并使用分区数据迭代器
rdd03.mapPartitionsWithIndex((index, items) => {
  items.map((index, _))
}, preservesPartitioning = true)

1.4 filterMap()_扁平化(合并流)

扁平化(合并流)

功能说明

  • 与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。
  • 区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。

在这里插入图片描述

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {

}
val rdd08: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4), List(5, 6)), 2)
val rdd09: RDD[Int] = rdd08.flatMap(list => list)
//	List(1, 2, 3, 4, 5, 6)
println(rdd09.collect().toList)

1.5 groupBy()_分组

分组

按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。

在这里插入图片描述

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

在这里插入图片描述

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
  groupBy[K](f, defaultPartitioner(this))
}

案例

// 3.2 将每个分区的数据放到一个数组并收集到Driver端打印
rdd.groupBy((x)=>{x%2})
// 简化
rdd.groupBy(_%2)
val rdd10: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
//  (0,CompactBuffer(2, 4, 6, 8))
//  (1,CompactBuffer(1, 3, 5, 7, 9))
rdd10.groupBy(_ % 2).collect().foreach(println)
val rdd11: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5), 3)
//  按照数字相同进行分区
//  (3,CompactBuffer(3))
//  (4,CompactBuffer(4))
//  (1,CompactBuffer(1))
//  (5,CompactBuffer(5))
//  (2,CompactBuffer(2))
rdd11.groupBy(a => a).collect().foreach(println)

1.6 filter()_过滤

过滤

接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。

在这里插入图片描述

rdd11.filter(a => a % 2 == 0)
rdd11.filter(_% 2 == 0)
val rdd11: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
val rdd110: RDD[Int] = rdd11.filter(a => a % 2 == 0)
//  List(2, 4, 6, 8)
println(rdd110.collect().toList)

1.7 distinct()_去重

去重

  • 对内部的元素去重,并将去重后的元素放到新的RDD中。
  • 默认情况下,distinct会生成与原RDD分区个数一致的分区数。
  • 用分布式的方式去重比HashSet集合方式不容易OOM。

在这里插入图片描述

//	去重
rdd.distinct()
//	去重(2并发度)
rdd.distinct(2)
val rdd12: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 2, 3), 3)
//  List(3, 4, 1, 2)
println(rdd12.distinct().collect().toList)
//  List(4, 2, 1, 3)(采用多个Task提高并发读)
println(rdd12.distinct(2).collect().toList)

1.8 coalesce()_合并分区

合并分区

  • Coalesce算子包括:配置执行Shuffle和配置不执行Shuffle两种方式。
  • 缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。

在这里插入图片描述

rdd13.coalesce(2)
rdd14.coalesce(2, shuffle = true)

缩减分区并执行Shuffer

val rdd14: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
//  缩减分区为2个
val rdd131: RDD[Int] = rdd13.coalesce(2)
//  缩减分区为2个,并执行Shuffer
val rdd141: RDD[Int] = rdd14.coalesce(2, shuffle = true)

1.9 repartition()_重新分区

重新分区

  • 执行Shuffle。
  • 该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。
  • 无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。
  • 分区规则不是hash,因为平时使用的分区都是按照hash来实现的,repartition一般是对hash的结果不满意,想要打散重新分区。

在这里插入图片描述

rdd.repartition(2)
val rdd15: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
//  重新分区
val rdd151: RDD[Int] = rdd15.repartition(2)

1.10 sortBy()_排序

排序

  • 该操作用于排序数据。
  • 在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。
  • 排序后新产生的RDD的分区数与原RDD的分区数一致。
  • 实现正序和倒序排序。

在这里插入图片描述

//	正序
rdd.sortBy(num => num)
//	倒叙
rdd.sortBy(num => num, ascending = false)

案例:

val rdd16: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
//  重新排序,默认升序
val rdd161: RDD[Int] = rdd16.sortBy(num => num)
//  重新排序,配置降序
val rdd162: RDD[Int] = rdd16.sortBy(num => num, ascending = false)
val rdd17: RDD[(Int, Int)] = sc.makeRDD(List((1, 2), (3, 4), (5, 6)))
//  先按照第1个值升序,在按第2个值排序
val rdd171: RDD[(Int, Int)] = rdd17.sortBy(num => num)

1.11 map和mapPartitions区别

在这里插入图片描述

map与mapPartitions的区别

  • 函数针对的对象不一样
    • map的函数是针对每个元素操作
    • mapPartitions的函数是针对每个分区操作
  • 函数的返回值不一样
    • map的函数是针对每个元素操作,要求返回一个新的元素,map生成的新RDD元素个数 = 原RDD元素个数
    • mapPartitions的函数是针对分区操作,要求返回新分区的迭代器,mapPartitions生成新RDD元素个数不一定=原RDD元素个数
  • 元素内存回收的时机不一样
    • map对元素操作完成之后就可以垃圾回收了
    • mapPartitions必须要等到分区数据迭代器里面数据全部处理完成之后才会统一垃圾回收,如果分区数据比较大可能出现内存溢出,此时可以用map代替。
val rdd02: RDD[Int] = rdd01.mapPartitions(a => a.map(b => b * 2))
val rdd03: RDD[Int] = rdd02.mapPartitions(a => a.map(_ * 2))

1.12 coalesce和repartition区别

  • coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
  • repartition实际上是调用的coalesce,进行shuffle。源码如下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
}
  • coalesce一般为缩减分区,如果扩大分区,不使用shuffle是没有意义的,repartition扩大分区执行shuffle。

2、双-Value类型

2.1 intersection()_交集

并集不去重

  • 对源RDD和参数RDD求交集后返回一个新的RDD。
  • 利用shuffle的原理进行求交集 ,需要将所有的数据落盘shuffle 效率很低
  • 不推荐使用

在这里插入图片描述

println(rdd01.intersection(rdd02)
val rdd01: RDD[Int] = sc.makeRDD(1 to 4)
val rdd02: RDD[Int] = sc.makeRDD(4 to 8)
//	取交集
//	利用shuffle的原理进行求交集  需要将所有的数据落盘shuffle 效率很低  不推荐使用
println(rdd01.intersection(rdd02).collect().toList)

2.2 union()_并集不去重

并集不去重

  • 对源RDD和参数RDD求并集后返回一个新的RDD
  • 由于不走shuffle ,效率高 。
  • 所有会使用到

在这里插入图片描述

rdd1.union(rdd2)
val rdd01: RDD[Int] = sc.makeRDD(1 to 4)
val rdd02: RDD[Int] = sc.makeRDD(4 to 8)
//	由于不走shuffle  效率高  所有会使用到
rdd1.union(rdd2).collect().foreach(println)

2.3 subtract()_差集

差集

  • 计算差的一种函数,去除两个RDD中相同元素,不同的RDD将保留下来。
  • 同样使用shuffle的原理,将两个RDD的数据写入到相同的位置,进行求差集
  • 需要走shuffle 效率低,不推荐使用
  • 在rdd01的数据中,与rdd02相差的数据(1,2,3)

在这里插入图片描述

//	计算第一个RDD与第二个RDD的差集并打印
rdd01.subtract(rdd02)
val rdd01: RDD[Int] = sc.makeRDD(1 to 4)
val rdd02: RDD[Int] = sc.makeRDD(4 to 8)
// 同样使用shuffle的原理  将两个RDD的数据写入到相同的位置 进行求差集
// 需要走shuffle  效率低  不推荐使用
//	在rdd01的数据中,与rdd02相差的数据(1,2,3)
rdd01.subtract(rdd02).collect().foreach(println)

2.4 zip()_拉链

拉链

  • 该操作可以将两个RDD中的元素,以键值对的形式进行合并。
  • 其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的元素。
  • 将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

在这里插入图片描述

val rdd01: RDD[Int] = sc.makeRDD(Array(1, 2, 3), 3)
val rdd02: RDD[String] = sc.makeRDD(Array("a", "b", "c"), 3)
//  List((1,a), (2,b), (3,c))
println(rdd01.zip(rdd02).collect().toList)
//  List((a,1), (b,2), (c,3))
println(rdd02.zip(rdd01).collect().toList)

反例:

val rdd02: RDD[String] = sc.makeRDD(Array("a", "b", "c"), 3)

val rdd03: RDD[String] = sc.makeRDD(Array("a", "b"), 3)
//  元素个数不同,不能拉链
//  SparkException: Can only zip RDDs with same number of elements in each partition
println(rdd03.zip(rdd02).collect().toList)
val rdd04: RDD[String] = sc.makeRDD(Array("a", "b", "c"), 2)
//  分区数不同,不能拉链
//  java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 3)
println(rdd04.zip(rdd02).collect().toList)

3、Key—Value类型

3.1 partitionBy()_按照K重新分区

按照K重新分区

  • 将RDD[K,V]中的K按照指定Partitioner重新进行分区;
  • 如果原有的RDD和新的RDD是一致的话就不进行分区,否则会产生Shuffle过程。
  • 分区数量会改变。

在这里插入图片描述

//	使用hash计算方式重分区,并重分区后分区数量 = 2
rdd01.partitionBy(new HashPartitioner(2))
val rdd01: RDD[(Int, String)] = sc.makeRDD(Array((111, "aaa"), (222, "bbbb"), (333, "ccccc")), 3)
val rdd02: RDD[(Int, String)] = rdd01.partitionBy(new HashPartitioner(2))

//	打印重分区后的分区数量
//	(0,(2,bbbb))
//	(1,(1,aaa))
//	(1,(3,ccccc))
val rdd03: RDD[(Int, (Int, String))] = rdd02.mapPartitionsWithIndex((index, datas) => {
  datas.map((index, _))
})
rdd03.collect().foreach(println)

3.2 groupByKey()_按照K重新分组

按照K重新分组

  • groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。
  • 该操作可以指定分区器或者分区数(默认使用HashPartitioner)。
  • 分区数量不会改变。

在这里插入图片描述

rdd001.groupByKey()
val rdd001: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)
val rdd002: RDD[(String, Iterable[Int])] = rdd001.groupByKey()
//  (a,CompactBuffer(1, 5))
//  (b,CompactBuffer(5, 2))
rdd002.collect().foreach(println)

3.3 reduceByKey()_按照K聚合V

按照K聚合V

  • 该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。
  • 其存在多种重载形式,还可以设置新RDD的分区数。

在这里插入图片描述

rdd01.reduceByKey((v1, v2) => (v1 + v2))
val rdd01: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
val rdd02: RDD[(String, Int)] = rdd01.reduceByKey((v1, v2) => (v1 + v2))
//  List((a,6), (b,7))
println(rdd02.collect().toList)

3.4 aggregateByKey()_不同逻辑的归约

分区内和分区间逻辑不同的归约

在这里插入图片描述

//	zeroValue(初始值):给每一个分区中的每一种key一个初始值;
//	seqOp(分区内):函数用于在每一个分区中用初始值逐步迭代value;
//	combOp(分区间):函数用于合并每个分区中的结果。
  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  }
//	分区初始值=0,分区内取最大值,分区间求和
rdd01.aggregateByKey(0)(math.max(_, _), _ + _)
val rdd01: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
//	取出每个分区相同key对应值的最大值,然后相加
val rdd02: RDD[(String, Int)] = rdd01.aggregateByKey(0)(math.max(_, _), _ + _)
//	List((a,6), (b,7))
println(rdd02.collect().toList)

3.5 sortByKey()_按照K进行排序

按照K进行排序

  • 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。

在这里插入图片描述

//  按照key的正序(默认正序)
rdd01.sortByKey(ascending = true)
//  按照key的倒序排列
rdd01.sortByKey(ascending = false)
val rdd01: RDD[(Int, String)] = sc.makeRDD(Array((3, "aa"), (6, "cc"), (2, "bb"), 1, "dd"))
//  按照key的正序(默认正序)
println(rdd01.sortByKey(ascending = true).collect().toList)
//  按照key的倒序排列
println(rdd01.sortByKey(ascending = false).collect().toList)

3.6 mapValues()_只对V进行操作

只对V进行操作

  • 针对于(K,V)形式的类型只对V进行操作

在这里插入图片描述

rdd01.mapValues(_ + "|||")
val rdd01: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (1, "d"), (2, "b"), (3, "c")))
//  对Value值添加字符串|||
//  List((1,a|||), (1,d|||), (2,b|||), (3,c|||))
println(rdd01.mapValues(_ + "|||").collect().toList)

3.7 join()_等同于sql内连接

join() 等同于sql里的内连接,关联上的要,关联不上的舍弃

  • 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD。
  • 类似于SQL中的join(内联)

在这里插入图片描述

//  按key进行 内联join
rdd01.join(rdd02)
val rdd01: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd02: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))
//  按key进行 内联join
//  List((1,(a,4)), (2,(b,5)))
println(rdd01.join(rdd02).collect().toList)

3.8 cogroup()_类似于sql全连接

cogroup() 类似于sql的全连接,但是在同一个RDD中对key聚合

  • 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD。
  • 操作两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。
  • 取并集

在这里插入图片描述

//  cogroup 合并两个RDD,取并集
rdd01.cogroup(rdd02)
val rdd01: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd02: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))
//  cogroup 两个RDD并打印
//  List((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5)))
//  (3,(CompactBuffer(c),CompactBuffer())), (4,(CompactBuffer(),CompactBuffer(6))))
println(rdd01.cogroup(rdd02).collect().toList)

cogroup后结果处理

val rdd01: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 1), (3, 5)))
val rdd02: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))
//	cogroup后类型为Iterable,key调用其sum进行值求和(相同的key)
val value1: RDD[(Int, (Iterable[Int], Iterable[Int]))] = rdd01.cogroup(rdd02)
val value: RDD[(Int, (Int, Int))] = value1.mapValues(a => {
  (a._1.sum, a._2.sum)
})

3.9 自定义分区器

要实现自定义分区器,需要继承org.apache.spark.Partitioner类,并实现下面三个方法。

  1. numPartitions: Int:返回创建出来的分区数。
  2. getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。
  3. equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同。
val rdd01: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd02: RDD[(Int, String)] = rdd01.partitionBy(new MyParTition(2))
println(rdd02.collect().toList)
class MyParTition(num: Int) extends Partitioner {
  //  设置分区数
  override def numPartitions: Int = num
  //  具体分区逻辑
  override def getPartition(key: Any): Int = {
    //  采用模式匹配。依据不同的类型,采用不同的处理逻辑
    //  字符串:放入0号分区。整数:取模分区个数
    key match {
      case s: String => 0
      case i: Int => i % numPartitions
      case _ => 0
    }
  }
}

3.10 reduceByKey和groupByKey区别

  • reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。
  • groupByKey:按照key进行分组,直接进行shuffle。
  • 开发指导:在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑,后续会学习功能更加强大的归约算子,能够在预聚合的情况下实现求平均值。

三、行动算子(Action)

行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。

1、collect()_以数组的形式返回数据集

以数组的形式返回数据集

  • 在驱动程序中,以数组Array的形式返回数据集的所有元素。

在这里插入图片描述

rdd02.collect().toList

2、count()_返回RDD中元素个数

返回RDD中元素个数

在这里插入图片描述

println(rdd01.count())

3、first()_返回RDD中的第一个元素

返回RDD中的第一个元素

在这里插入图片描述

println(rdd01.first())

4、take()_返回由RDD前n个元素组成的数组

返回由RDD前n个元素组成的数组

在这里插入图片描述

//	返回由前3个元素组成的数组
rdd01.take(3)
val number: Array[(Int, String)] = rdd01.take(3)

5、takeOrdered()_返回排序后前n个元素

返回该RDD排序后前n个元素组成的数组

在这里插入图片描述

// returns Array(2)
sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)

// returns Array(2, 3)
sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)

//  List(1, 2)
val rdd02: Array[Int] = sc.makeRDD(List(1, 3, 2, 4)).takeOrdered(2)
println(rdd02.toList)

6、countByKey()_统计每种key的个数

统计每种key的个数

在这里插入图片描述

 rdd01.countByKey()
val rdd01: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
val rdd02: collection.Map[Int, Long] = rdd01.countByKey()
//  Map(1 -> 3, 2 -> 1, 3 -> 2)
println(rdd02)

7、saveAsTextFile(path)_保存成Text文件

保存成Text文件

  • 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

在这里插入图片描述

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
//	保存到本地Text文件
rdd.saveAsTextFile("output01")

8、saveAsSequenceFile(path)_保存成Sequencefile文件

保存成Sequencefile文件

  • 将数据集中的元素以Hadoop Sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
  • 只有kv类型RDD有该操作,单值的没有
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
//	保存成Sequencefile文件
rdd.saveAsObjectFile("output02")

9、saveAsObjectFile(path)_序列化成对象保存到文件

序列化成对象保存到文件

  • 用于将RDD中的元素序列化成对象,存储到文件中。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
//	序列化成对象保存到文件
rdd.map((_, 1)).saveAsObjectFile("output03")

10、foreach()_遍历RDD中每一个元素

遍历RDD中每一个元素

在这里插入图片描述

//  收集后打印
rdd.collect().foreach(println)
//  分布式打印
rdd.foreach(println)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/968213.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MATLAB制图代码【第二版】

MATLAB制图代码【第二版】 文档描述 Code describtion: This code is version 2 used for processing the data from the simulation and experiment. Time : 2023.9.3 Author: PEZHANG 这是在第一版基础上,迭代出的第二版MATLAB制图代码,第二版的特点是…

一探究竟:为什么需要 JVM?它处在什么位置?

小熊学Java全能学习面试指南:https://www.javaxiaobear.cn/ JVM我们并不陌生,现在我们就正式进入 JVM 的学习,如果你是一名软件开发工程师,在日常工作中除了 Java 这个关键词外,还有一个名词也一定经常被提及&#xf…

3、Spring 之IOC 容器 详解

IoC 是 Inversion of Control 的简写,译为“控制反转”,它不是一门技术,而是一种设计思想,是一个重要的面向对象编程法则,能够指导我们如何设计出松耦合、更优良的程序。 Spring 通过 IoC 容器来管理所有 Java 对象的…

【SpringSecurity】十二、集成JWT搭配Redis实现退出登录

文章目录 1、登出的实现思路2、集成Redis3、认证成功处理器4、退出成功处理器5、修改token校验过滤器6、调试 1、登出的实现思路 这是目前的token实现图: 因为JWT的无状态,服务端无法在使用过程中主动废止某个 token,或者更改 token 的权限…

《TCP/IP网络编程》阅读笔记--基于Windows实现Hello Word服务器端和客户端

目录 1--Hello Word服务器端 2--客户端 3--编译运行 3-1--编译服务器端 3-2--编译客户端 3-3--运行 1--Hello Word服务器端 // gcc hello_server_win.c -o hello_server_win -lwsock32 // hello_server_win 9190 #include <stdio.h> #include <stdlib.h> #i…

07-ThreadLocal有哪些使用场景?【Java面试题总结】

ThreadLocal有哪些使用场景&#xff1f; 7.1 多线程场景下共享变量问题 ThreadLocal是线程本地变量&#xff0c;可以存储共享变量副本&#xff0c;每一个独立线程都有与共享变量一模一样的副本。ThreadLocal在当前线程下共享变量是全局共享的&#xff0c;各个线程之间是相互独…

虚拟内存相关笔记

虚拟内存是计算机系统内存管理的一个功能&#xff0c;它允许程序认为它们有比实际物理内存更多的可用内存。它使用硬盘来模拟额外的RAM。当物理内存不足时&#xff0c;操作系统将利用磁盘空间作为虚拟内存来存储数据。这种机制提高了资源的利用率并允许更大、更复杂的应用程序的…

ICCV 2023 | TUM谷歌提出md4all:挑战性条件下的单目深度估计

点击下方卡片&#xff0c;关注“CVer”公众号 AI/CV重磅干货&#xff0c;第一时间送达 点击进入—>【深度估计】交流群 Robust Monocular Depth Estimation under Challenging Conditions 作者列表: Stefano Gasperini, Nils Morbitzer, HyunJun Jung, Nassir Navab, Federi…

【小作文】【信】

【邀请信】【22&#xff0c;1邀请教授参加比赛】 【投诉信】【12,2投诉产品质量问题】

一般不用buildroot来编译uboot和kernel

Buildroot 是一个流行的嵌入式 Linux 系统构建工具&#xff0c;它可以帮助开发者自动化地构建完整的嵌入式 Linux 系统&#xff0c;包括文件系统、内核以及各种用户空间应用程序。虽然 Buildroot 在构建嵌入式系统方面非常强大且易于使用&#xff0c;但一般情况下&#xff0c;它…

STM32WB55开发(1)----套件概述

STM32WB55开发----1.套件概述 所用器件视频教学样品申请优势支持协议系统控制和生态系统访问功能示意图系统框图跳线设置开发板原理图 所用器件 所使用的器件是我们自行设计的开发板&#xff0c;该开发板是基于 STM32WB55 系列微控制器所构建。STM32WBXX_VFQFPN68 不仅是一款评…

Linux学习之NAS服务器搭建

NAS是Network Attached Storage的缩写&#xff0c;也就是网络附属存储。可以使用自己已经不怎么使用的笔记本搭建一台NAS服务器。 fdisk -l可以看一下各个磁盘的状态。 可以看到有sda、sdb、sdc和sdd等四块硬盘。 lvs、vgs和pvs结合起来看&#xff0c;sdb和sdc没有被使用。 …

MYSQL_

文章目录 ①. 索引的概述②. 二叉树和红黑树③. Hash建立索引结构④. B树的数据结构⑤. MyISAM存储引擎索引实现⑥. InnoDB索引实现(聚集)⑦. 联合索引的设定 ①. 索引的概述 ①. 索引是帮助MySQL高效获取数据的排好序的数据结构 ②. mysql数据库的实现原理通过b树实现的,b树的…

docker安装redis,并挂载配置文件

1&#xff1a;下载镜像&#xff0c;不添加版本 默认下载最新的 docker pull redis下载成功后如图所示 2&#xff1a;下载redis配置文件&#xff0c;我是在docker中下载的&#xff0c;也可以使用文件上传工具将配置文件上传到自己指定的目录。 首先需要安装wget&#xff0c;否…

第一章 USB应用笔记之USB初步了解

USB应用笔记之USB初步了解 文章目录 USB应用笔记之USB初步了解前言USB的优点&#xff1a;USB版本发展USB速度以及电气接口USB传输过程USB开发抓包工具&#xff1a;USB传输方式1.控制传输特点:2.中断传输的特点3. 批量传输的特点4.实时传输&#xff08;同步传输&#xff09;的特…

同步与互斥

硬件指令 实现互斥&#xff1a;硬件指令&#xff0c;硬件实现的原子操作&#xff0c;不会被打断 tsl指令和xchg指令 当前指令执行完&#xff0c;才会检测中断 If the signal comes while an instruction is being executed, it is held until the execution of the instructi…

Feign负载均衡写法

Feign主要为了面向接口编程 feign是web service客户端&#xff0c;是接口实现的&#xff0c;而ribbon是通过微服务名字访问通过RestTemplate调用的&#xff0c;如下&#xff1a; 在Feign的实现下&#xff0c;我们只需要创建一个接口并使用注解的方式来配置它&#xff08;类似…

仿京东 项目笔记2(注册登录)

这里写目录标题 1. 注册页面1.1 注册/登录页面——接口请求1.2 Vue开发中Element UI的样式穿透1.2.1 ::v-deep的使用1.2.2 elementUI Dialog内容区域显示滚动条 1.3 注册页面——步骤条和表单联动 stepsform1.4 注册页面——滑动拼图验证1.5 注册页面——element-ui组件Popover…

开开心心带你学习MySQL数据库之第三篇上

学校的项目组有必要加入吗? 看你的初心. ~~如果初心是通过这个经历能够提高自己的技术水平 ~~是可以考虑的 ~~如果初心是通过这个经历提高自己找工作的概率 ~~这个是不靠谱的,啥用没有 ~~如果初心是通过这个体验更美好的大学生活 ~~靠谱的 秋招,应届生,找工作是非常容易的!!! …

《开发实战》13 | 用好Java 8的日期时间类,少踩一些“老三样”的坑

13 | 用好Java 8的日期时间类&#xff0c;少踩一些“老三样”的坑 初始化日期时间 如果要初始化一个 2019 年 12 月 31 日 11 点 12 分 13秒这样的时间&#xff0c;Date date new Date(2019, 12, 31, 11, 12, 13);输出的时间是 3029 年 1 月 31 日 11 点 12 分 13 秒&#xf…