Spark - RDD 算子介绍及使用 Scala、Java、Python 三种语言演示

news2025/1/19 22:28:37

一、RDD 的起源

RDD 出现之前, 当时 MapReduce 是比较主流的, 而 MapReduce 如何执行流程如下:
在这里插入图片描述
多个 MapReduce 任务之间只能通过磁盘来进行传递数据,很明显的效率低下,再来看 RDD 的处理方式:

在这里插入图片描述
整个过程是共享内存的, 而不需要将中间结果存放在分布式文件系统中,这种方式可以在保证容错的前提下, 提供更多的灵活, 更快的执行速度。

二、RDD 的特点

RDD 不仅是数据集, 也是编程模型,提供了上层 API, 同时 RDDAPIjdk8stream 流对集合运算的 API 非常类似,同样也都是各算子,如下:

textFile.filter(StringUtils.isNotBlank) //过滤空内容
  .flatMap(_.split(" ")) //根据空格拆分
  .map((_, 1)) // 构建新的返回
  .foreach(s => println(s._1 + "  " + s._2)) //循环

RDD 的算子大致分为两类:

  • Transformation 转换操作, 例如 map flatMap filter 等。
  • Action 动作操作, 例如 reduce collect show

注意:执行 RDD 的时候会进行惰性求值,执行到转换操作的时候,并不会立刻执行,直到遇见了 Action 操作,才会触发真正的执行。

创建 RDD

RDD 有三种创建方式,可以通过本地集合直接创建,也可以通过读取外部数据集来创建,还可以通过其它的 RDD 衍生而来:

首先声明 SparkContext

  • scala:
val conf = new SparkConf().setAppName("spark").setMaster("local[*]")
val sc = new SparkContext(conf)
  • java
SparkConf conf = new SparkConf().setAppName("spark").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
  • python
from pyspark import SparkConf, SparkContext, StorageLevel
import findspark

if __name__ == '__main__':
    findspark.init()
    conf = SparkConf().setAppName('spark').setMaster('local[*]')
    sc = SparkContext(conf=conf)

1. 通过集合创建

  • scala
val rdd1 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""))
//指定分区
val rdd2 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""), 5)
  • java
JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""));
//指定分区
JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""), 5);
  • python
rdd1 = sc.parallelize(["abc", "abc", "fff dd", "ee,pp", ""])
#
rdd2 = sc.parallelize(["abc", "abc", "fff dd", "ee,pp", ""], 5)

2. 通过文件创建

  • scala
 //读取本地文件
 val rdd3 = sc.textFile("D:/test/spark/input3/words.txt")
 //读取本地文件,指定分区
 val rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5)
 //读取 HDFS 文件
 val rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt")
 //读取文件同时拿到文件名
 val rdd6 = sc.textFile("hdfs://test/spark/input3/")
  • java
//读取本地文件
JavaRDD<String> rdd3 = sc.textFile("D:/test/spark/input3/words.txt");
//读取本地文件,指定分区
JavaRDD<String> rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5);
//读取 HDFS 文件
JavaRDD<String> rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt");
//读取文件同时拿到文件名
JavaRDD<String> rdd6 = sc.textFile("hdfs://test/spark/input3/");
  • python
# 读取本地文件
rdd3 = sc.textFile("D:/test/spark/input3/words.txt")
#读取本地文件,指定分区
rdd4 = sc.textFile("D:/test/spark/input3/words.txt", 5)
#读取 HDFS 文件
rdd5 = sc.textFile("hdfs://test/spark/input3/words.txt")
#读取文件同时拿到文件名
rdd6 = sc.textFile("hdfs://test/spark/input3/")

下面对相关常用算子进行演示。

三、Transformations 算子

1. map

RDD 中的数据 一对一 的转为另一种形式:

例如:

  • scala:
val num = sc.parallelize(Seq(1, 2, 3, 4, 5))
println(
  num.map(_+1).collect().toList
)
  • java:
JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
System.out.println(
       num.map(i -> i + 1).collect()
);
  • python:
num = sc.parallelize((1, 2, 3, 4, 5))
print(
    num.map(lambda i:i+1).collect()
)

在这里插入图片描述

2. flatMap

Map 算子类似,但是 FlatMap 是一对多,并都转化为一维数据:

例如:

  • scala:
val text = sc.parallelize(Seq("abc def", "hello word", "dfg,okh", "he,word"))
println(
  text.flatMap(_.split(" ")).flatMap(_.split(",")).collect().toList
)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("abc def", "hello word", "dfg,okh", "he,word"));
System.out.println(
        text.flatMap(s ->Arrays.asList(s.split(" ")).iterator())
                .flatMap(s ->Arrays.asList(s.split(",")).iterator())
                .collect()
);
  • python:
text = sc.parallelize(("abc def", "hello word", "dfg,okh", "he,word"))
print(
    text.flatMap(lambda s: s.split(" ")).flatMap(lambda s: s.split(",")).collect()
)

在这里插入图片描述

3. filter

过滤掉不需要的内容:

例如:

  • scala:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"))
println(
  text.filter(_.equals("hello")).collect().toList
)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"));
System.out.println(
        text.filter(s -> Objects.equals(s,"hello"))
                .collect()
);
  • python:
text = sc.parallelize(("hello", "hello", "word", "word"))
print(
    text.filter(lambda s: s == 'hello').collect()
)

在这里插入图片描述

4. mapPartitions

map 类似,针对整个分区的数据转换,拿到的是每个分区的集合:

例如:

  • scala:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"), 2)
println(
  text.mapPartitions(iter => {
    iter.map(_ + "333")
  }).collect().toList
)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"), 2);
System.out.println(
        text.mapPartitions(iter -> {
            List<String> list = new ArrayList<>();
            iter.forEachRemaining(s -> list.add(s+"333"));
            return list.iterator();
        }).collect()
);
  • python:
 text = sc.parallelize(("hello", "hello", "word", "word"), 2)
 
 def partition(par):
     tmpArr = []
     for s in par:
         tmpArr.append(s + "333")
     return tmpArr

 print(
     text.mapPartitions(partition).collect()
 )

在这里插入图片描述

5. mapPartitionsWithIndex

mapPartitions 类似, 只是在函数中增加了分区的 Index

例如:

  • scala:
val text = sc.parallelize(Seq("hello", "hello", "word", "word"), 2)
println(
  text.mapPartitionsWithIndex((index, iter) => {
    println("当前分区" + index)
    iter.map(_ + "333")
  }, true).collect().toList
)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("hello", "hello", "word", "word"), 2);
System.out.println(
       text.mapPartitionsWithIndex((index, iter) -> {
           System.out.println("当前分区" + index);
           List<String> list = new ArrayList<>();
           iter.forEachRemaining(s -> list.add(s + "333"));
           return list.iterator();
       }, true).collect()
);
  • python:
text = sc.parallelize(("hello", "hello", "word", "word"), 2)

def partition(index, par):
    print("当前分区" + str(index))
    tmpArr = []
    for s in par:
        tmpArr.append(s + "333")
    return tmpArr

print(
    text.mapPartitionsWithIndex(partition).collect()
)

在这里插入图片描述

6. mapValues

只能作用于 Key-Value 型数据, 和 Map 类似, 也是使用函数按照转换数据, 不同点是 MapValues 只转换 Key-Value 中的 Value

例如:

  • scala:
val text = sc.parallelize(Seq("abc", "bbb", "ccc", "dd"))
println(
  text.map((_, "v" + _))
    .mapValues(_ + "66")
    .collect().toList
)
  • java:
JavaRDD<String> text = sc.parallelize(Arrays.asList("abc", "bbb", "ccc", "dd"));
System.out.println(
       text.mapToPair(s -> new Tuple2<>(s, "v" + s))
               .mapValues(v -> v + "66").collect()
);
  • python:
text = sc.parallelize(("abc", "bbb", "ccc", "dd"))
print(
    text.map(lambda s: (s, "v" + s)).mapValues(lambda v: v + "66").collect()
)

在这里插入图片描述

7. sample

可以从一个数据集中抽样出来一部分, 常用作于减小数据集以保证运行速度, 并且尽可能少规律的损失:

第一个参数为withReplacement, 意为是否取样以后是否还放回原数据集供下次使用, 简单的说,如果这个参数的值为 true, 则抽样出来的数据集中可能会有重复。

第二个参数为fraction, 意为抽样的比例。

第三个参数为seed, 随机数种子, 用于 Sample 内部随机生成下标,一般不指定,使用默认值。

例如:

  • scala:
val num = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
println(
  num.sample(true,0.6,2)
    .collect().toList
)
  • java:
JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
System.out.println(
    num.sample(true, 0.6, 2).collect()
);
  • python:
num = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
print(
    num.sample(True, 0.6, 2).collect()
)

在这里插入图片描述

8. union

两个数据并集,类似于数据库的 union

例如:

  • scala:
val text1 = sc.parallelize(Seq("aa", "bb"))
val text2 = sc.parallelize(Seq("cc", "dd"))
println(
  text1.union(text2).collect().toList
)
  • java:
JavaRDD<String> text1 = sc.parallelize(Arrays.asList("aa", "bb"));
JavaRDD<String> text2 = sc.parallelize(Arrays.asList("cc", "dd"));
System.out.println(
        text1.union(text2).collect()
);
  • python:
text1 = sc.parallelize(("aa", "bb"))
text2 = sc.parallelize(("cc", "dd"))
print(
   text1.union(text2).collect()
)

在这里插入图片描述

9. join,leftOuterJoin,rightOuterJoin

两个(key,value)数据集,根据 key 取连接、左连接、右连接,类似数据库中的连接:

例如:

  • scala:
val s1 = sc.parallelize(Seq("1,3", "2,6", "3,8", "4,2"))
val s2 = sc.parallelize(Seq("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))

val s3 = s1.map(s => (s.split(",")(0), s.split(",")(0)))
val s4 = s2.map(s => (s.split(",")(0), s.split(",")(1)))

println(s3.join(s4).collectAsMap)
println(s3.leftOuterJoin(s4).collectAsMap)
println(s3.rightOuterJoin(s4).collectAsMap)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3", "2,6", "3,8", "4,2"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"));

JavaPairRDD<String, String> s3 = s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
JavaPairRDD<String, String> s4 = s2.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));

System.out.println(s3.join(s4).collectAsMap());
System.out.println(s3.leftOuterJoin(s4).collectAsMap());
System.out.println(s3.rightOuterJoin(s4).collectAsMap());
  • python:
s1 = sc.parallelize(("1,3", "2,6", "3,8", "4,2"))
s2 = sc.parallelize(("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))

s3 = s1.map(lambda s:(s.split(",")[0], s.split(",")[0]))
s4 = s2.map(lambda s:(s.split(",")[0], s.split(",")[1]))

print(s3.join(s4).collectAsMap())
print(s3.leftOuterJoin(s4).collectAsMap())
print(s3.rightOuterJoin(s4).collectAsMap())

在这里插入图片描述

10. intersection

获取两个集合的交集 :

例如:

  • scala:
val s1 = sc.parallelize(Seq("abc", "dfe", "hello"))
val s2 = sc.parallelize(Seq("fgh", "nbv", "hello", "word", "jkl", "abc"))
println(
 	s1.intersection(s2).collect().toList
)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "dfe", "hello"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("fgh", "nbv", "hello", "word", "jkl", "abc"));
System.out.println(
     s1.intersection(s2).collect()
);
  • python:
s1 = sc.parallelize(("abc", "dfe", "hello"))
s2 = sc.parallelize(("fgh", "nbv", "hello", "word", "jkl", "abc"))
print(
    s1.intersection(s2).collect()
)

在这里插入图片描述

11. subtract

获取差集,a - b ,取 a 集合中 b 集合没有的元素:

例如:

  • scala:
val s1 = sc.parallelize(Seq("abc", "dfe", "hello"))
val s2 = sc.parallelize(Seq("fgh", "nbv", "hello", "word", "jkl", "abc"))
println(
 	s1.subtract(s2).collect().toList
)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "dfe", "hello"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("fgh", "nbv", "hello", "word", "jkl", "abc"));
System.out.println(
        s1.subtract(s2).collect()
);
  • python:
s1 = sc.parallelize(("abc", "dfe", "hello"))
s2 = sc.parallelize(("fgh", "nbv", "hello", "word", "jkl", "abc"))
print(
    s1.subtract(s2).collect()
)

在这里插入图片描述

12. distinct

元素去重,是一个需要 Shuffled 的操作:

例如:

  • scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(
  s1.distinct().collect().toList
)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(
    s1.distinct().collect()
);
  • python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(
  s1.distinct().collect()
)

在这里插入图片描述

13. reduceByKey

只能作用于 Key-Value 型数据,根据 Key 分组生成一个 Tuple,然后针对每个组执行 reduce 算子,传入两个参数,一个是当前值,一个是局部汇总,这个函数需要有一个输出, 输出就是这个 Key 的汇总结果,是一个需要 Shuffled 的操作:

例如:

  • scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(
  s1.map((_, 1))
    .reduceByKey(Integer.sum)
    .collectAsMap
)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(
   s1.mapToPair(s -> new Tuple2<>(s, 1))
           .reduceByKey(Integer::sum)
           .collectAsMap()
);
  • python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(
  s1.map(lambda s: (s, 1))
      .reduceByKey(lambda v1, v2: v1 + v2)
      .collectAsMap()
)

在这里插入图片描述

14. groupByKey

只能作用于 Key-Value 型数据,根据 Key 分组, 和 ReduceByKey 有点类似, 但是 GroupByKey 并不求聚合, 只是列举 Key 对应的所有 Value,是一个需要 Shuffled 的操作。

GroupByKeyReduceByKey 不同,因为需要列举 Key 对应的所有数据, 所以无法在 Map 端做 Combine, 所以 GroupByKey 的性能并没有 ReduceByKey 好:

例如:

  • scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(
  s1.map((_, 1))
    .groupByKey()
    .collectAsMap
)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(
        s1.mapToPair(s -> new Tuple2<>(s, 1))
                .groupByKey()
                .collectAsMap()
);
  • python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(
    s1.map(lambda s: (s, 1))
        .reduceByKey()
        .collectAsMap()
)

在这里插入图片描述

15. combineByKey

对数据集按照 Key 进行聚合,groupByKey, reduceByKey 的底层都是 combineByKey

参数:

createCombiner 将 Value 进行初步转换
mergeValue 在每个分区把上一步转换的结果聚合
mergeCombiners 在所有分区上把每个分区的聚合结果聚合
partitioner 可选, 分区函数
mapSideCombiner 可选, 是否在 Map 端 Combine
serializer 序列化器

例如,求取每个人的分数的平均值:

  • scala:
val s1 = sc.parallelize(Seq("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
println(
  s1.map(s => (s.split(":")(0), s.split(":")(1).toDouble))
    .combineByKey(
      score => (score, 1),
      (c: (Double, Int), newScore: Double) => (c._1 + newScore, c._2 + 1),
      (d1: (Double, Int), d2: (Double, Int)) => (d1._1 + d2._1, d1._2 + d2._2)
    ).map(t => (t._1, t._2._1 / t._2._2))
    .collectAsMap
)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"));
System.out.println(
   s1.mapToPair(s -> new Tuple2<>(s.split(":")[0], Double.parseDouble(s.split(":")[1])))
           .combineByKey(
                   (Function<Double, Tuple2<Double, Integer>>) score -> new Tuple2(score, 1),
                   (Function2<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>>) (c, newScore) -> new Tuple2<>(c._1 + newScore, c._2 + 1),
                   (Function2<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>) (d1, d2) -> new Tuple2<>(d1._1 + d2._1, d1._2 + d2._2))
           .mapToPair(t -> new Tuple2(t._1, t._2._1 / t._2._2))
           .collectAsMap()
);
  • python:
s1 = sc.parallelize(("小明:15.5", "小明:13.3", "张三:14.4", "张三:37.6", "李四:95.9", "李四:45.4"))
print(
    s1.map(lambda s: (s.split(":")[0], float(s.split(":")[1])))
        .combineByKey(lambda score: (score, 1),
                      lambda c, newScore: (c[0] + newScore, c[1] + 1),
                      lambda d1, d2: (d1[0] + d2[0], d1[1] + d2[1]))
        .map(lambda t: (t[0], t[1][0] / t[1][1]))
        .collectAsMap()
)

在这里插入图片描述

16. cogroup

多个 RDD 协同分组, 将多个 RDDKey 相同的 Value 分组:

例如:

  • scala:
val s1 = sc.parallelize(Seq("1,3", "2,6", "3,8", "4,2"))
val s2 = sc.parallelize(Seq("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))

val s3 = s1.map(s => (s.split(",")(0), s.split(",")(1)))
val s4 = s2.map(s => (s.split(",")(0), s.split(",")(1)))

println(
  s3.cogroup(s4).collectAsMap
)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3", "2,6", "3,8", "4,2"));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"));

JavaPairRDD<String, String> s3 = s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));
JavaPairRDD<String, String> s4 = s2.mapToPair(s -> new Tuple2<>(s.split(",")[0], s.split(",")[1]));

System.out.println(
       s3.cogroup(s4).collectAsMap()
);
  • python:
s1 = sc.parallelize(("1,3", "2,6", "3,8", "4,2"))
s2 = sc.parallelize(("1,小明", "2,小张", "3,小李", "4,小红", "5,张三"))
s3 = s1.map(lambda s: (s.split(",")[0], s.split(",")[1]))
s4 = s2.map(lambda s: (s.split(",")[0], s.split(",")[1]))
print(
    s3.cogroup(s4).collectAsMap()
)

在这里插入图片描述

17. sortBy ,sortByKey

数据排序,同 sortByKey ,但普通的 RDD 没有sortByKey, 只有 Key-ValueRDD 才有:

参数
func通过这个函数返回要排序的字段
ascending是否升序
numPartitions分区数

例如:

  • scala:
val s1 = sc.parallelize(Seq("1,3", "2,6", "3,8", "4,2"))
val s2 = s1.map(s => (s.split(",")(0), s.split(",")(1).toInt))
println(
  s2.sortBy(_._2,false)
    .collectAsMap()
)
println(
  s2.sortByKey(false).collectAsMap()
)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("1,3", "2,6", "3,8", "4,2"));
System.out.println(
  s1.map(s -> new Tuple2<>(s.split(",")[0], Integer.parseInt(s.split(",")[1])))
          .sortBy(t -> t._2, false, 1)
          .collect()
);
System.out.println(
  s1.mapToPair(s -> new Tuple2<>(s.split(",")[0], Integer.parseInt(s.split(",")[1])))
          .sortByKey(false)
          .collect()
);
  • python:
s1 = sc.parallelize(("1,3", "2,6", "3,8", "4,2"))
s2 = s1.map(lambda s:(s.split(",")[0],int(s.split(",")[1])))
print(
   s2.sortBy(lambda t:t[1],False)
       .collectAsMap()
)
print(
   s2.sortByKey(False)
       .collectAsMap()
)

在这里插入图片描述

18. repartition,coalesce

repartition:重新分区,coalesce:减少分区,如果新的分区数量比原分区数大, 必须 Shuffled, 否则重分区无效,repartitioncoalesce 的不同就在于 coalesce 可以控制是否 Shufflerepartition 是一个 Shuffled 操作。

例如:

  • scala:
var p1 = sc.parallelize(Seq("abc", "abc", "fff dd", "ee,pp", ""))
println(p1.getNumPartitions)
p1 = p1.repartition(5)
println(p1.getNumPartitions)
p1 = p1.coalesce(3)
println(p1.getNumPartitions)
  • java:
JavaRDD<String> p1 = sc.parallelize(Arrays.asList("abc", "abc", "fff dd", "ee,pp", ""));
System.out.println(p1.getNumPartitions());
p1 = p1.repartition(5);
System.out.println(p1.getNumPartitions());
p1 = p1.coalesce(3);
System.out.println(p1.getNumPartitions());
  • python:
p1 = sc.parallelize(("abc", "abc", "fff dd", "ee,pp", ""))
print(p1.getNumPartitions)
p1.repartition(5)
print(p1.getNumPartitions)
p1.coalesce(3)
print(p1.getNumPartitions)

四、Action 算子

1. reduce

对整个结果集规约, 最终生成一条数据, 是整个数据集的汇总。

reducereduceByKey 完全不同, reduce 是一个 action, 并不是 Shuffled 操作,本质上 reduce 就是现在每个 partition 上求值, 最终把每个 partition 的结果再汇总。

例如:

  • scala:
var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(
  p1.reduce((_+_))
)
  • java:
JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(
    p1.reduce(Integer::sum)
);
  • python:
 p1 = sc.parallelize((1, 2, 3, 4, 6))
 print(
     p1.reduce(lambda i1, i2: i1 + i2)
 )

在这里插入图片描述

2. collect

以数组的形式返回数据集中所有元素。
例如:

  • scala:
var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(
  p1.collect()
)
  • java:
JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(
       p1.collect()
);
  • python:
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(
    p1.collect()
)

在这里插入图片描述

3. count

数据元素个数:

例如:

  • scala:
var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(
 p1.count()
)
  • java:
JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(
      p1.count()
);
  • python:
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(
   p1.count()
)

在这里插入图片描述

4. first

返回第一个元素:

例如:

  • scala:
var p1 = sc.parallelize(Seq(1, 2, 3, 4, 6))
println(
  p1.first()
)
  • java:
JavaRDD<Integer> p1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 6));
System.out.println(
      p1.first()
);
  • python:
p1 = sc.parallelize((1, 2, 3, 4, 6))
print(
    p1.first()
)

在这里插入图片描述

5. countByKey

求得整个数据集中 Key 以及对应 Key 出现的次数:

例如:

  • scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(
 s1.map((_,1)).countByKey()
)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"))
System.out.println(
      s1.mapToPair(s -> new Tuple2<>(s, 1)).countByKey()
);
  • python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(
    s1.map(lambda s: (s, 1)).countByKey()
)

在这里插入图片描述

6. take

返回前 N 个元素:

例如:

  • scala:
val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
println(
  s1.take(3)
)
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
System.out.println(
       s1.take(3)
);
  • python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
print(
    s1.take(3)
)

在这里插入图片描述

7. saveAsTextFile

将结果存入 path 对应的目录中:

例如:

  • scala:
 val s1 = sc.parallelize(Seq("abc", "abc", "hello", "hello", "word", "word"))
 s1.saveAsTextFile("D:/test/output/text/")
  • java:
JavaRDD<String> s1 = sc.parallelize(Arrays.asList("abc", "abc", "hello", "hello", "word", "word"));
s1.saveAsTextFile("D:/test/output/text/");
  • python:
s1 = sc.parallelize(("abc", "abc", "hello", "hello", "word", "word"))
s1.saveAsTextFile("D:/test/output/text/")

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/41112.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

利用pycharm调试ssh远程程序,并实时同步文件

或许你的服务器由于设置问题&#xff0c;不能通过Vscode进行远程调试python程序&#xff0c;那么本篇文章提供了利用pycharm远程调试程序的方法&#xff0c;且使用的编译器可以是服务器中的虚拟环境的编译器&#xff0c;可以实时同步本地与服务器的文件内容。希望对你能够有所帮…

【Oracle系列1】Oracle 的connect权限和create session的区别

【Oracle系列1】Oracle 的connect权限和create session的区别 背景 oracle数据库新建用户之后是无法登录的&#xff0c;需要赋予connect角色或者create session权限。 注意&#xff1a; connect是角色不是权限&#xff0c;create session是权限不是角色。角色是权限的集合。…

c++中的结构体

结构体&#xff1a;属于用户自定义的数据类型&#xff0c;允许用户存储不同的数据类型 语法&#xff1a;struct 结构体名 {结构体成员列表}&#xff1b;通过结构体创建变量的三种方式&#xff1a;1、struct 结构体名 变量名2、struct 结构体名 变量名{成员1值&#xff0c;成员…

第一个Shader程序

shader 很复杂&#xff0c;我学习的过程中也确实感受到了&#xff0c;需要会数学、图形学、编程语法等等知识。不如让我们直接看看 Shader 到底是什么&#xff1f;直接应用起来。或许没有那么复杂。 1、在场景中新建一个正方体&#xff0c;如下图 2、在 project 面板下新建一…

超级棒,使用 LIME 和 SHAP 可轻松解释机器学习模型的预测

在本文中&#xff0c;我将介绍两个可以帮助了解模型的决策过程的模型 LIME 和 SHAP。 作为数据科学家或机器学习从业者&#xff0c;将可解释性集成到机器学习模型中可以帮助决策者和其他利益相关者有更多的可见性并可以让他们理解模型输出决策的解释。 文章目录技术提升模型SH…

day02 redis

day02 Redis 第一章 Redis持久化机制 Redis的高性能是由于其将所有数据都存储在了内存中&#xff0c;为了使Redis在重启之后仍能保证数据不丢失&#xff0c;需要将数据从内存中同步到硬盘(文件)中&#xff0c;这一过程就是持久化。Redis 提供了一系列不同的持久化选项&#x…

MyBatis框架入门(含实例)

目录 1.MyBatis简介 2.ORM框架 3.数据持久化 4.Mybatis入门实战案例 4.1 下载mybatis的jar包 4.2 将jar包导入工程中 4.3 配置Mybatis的核心配置文件 4.3.1 MyBatis核心文件模板(mybatis-config) 4.3.2 mybatis-config模板的设置 4.4 创建User 实体类 4.5定义DAO层M…

11.25学到的东西==命令行

创建文件&#xff0c;可以直接选择文件之后再加上.py import argparse# 单个参数 # 创建解析器 # ArgumentParser 对象包含将命令行解析成 Python 数据类型所需的全部信息。 parser argparse.ArgumentParser() # 单独的参数 square 之后这个help就是提示的信息 # 显示给定数字…

【药材识别】基于matlab GUI SVM色差色温判断药材炮制程度系统【含Matlab源码 2241期】

⛄一、SVM色差色温判断药材炮制程度系统简介 本课题来源于"十二五"国家科技支撑计划项目(2012BAI29B11).颜色是中药质量标准中性状评价极为重要的内容,但传统的中药颜色检测大多依靠人的感官评估,人对颜色的辨别是一个非常复杂的过程,受到光学,视觉生理,视觉心理等诸…

JDBC操作数据库实现增、删、查、改

0.JDBC概念 实际开发中,手动的输入SQL语句是少之又少,大多数情况下是通过编译代码进行来控制自动执行. 具体操作如下: 上述展示有一个【自己写的Mysql客户端】&#xff0c;这种操作是非常容易的&#xff0c;因为各种数据库本身就提供一系列的API&#xff0c;可以让用户很方便…

wordpress 安装主题显示要配置FTP的解决办法

目录 问题复现 1、在安装插件的时候会弹出一个窗口 2、输入相关信息后显示失败 问题解决方法 1、查看wordpress文件权限 2、修改wordpress文件权限 3、插件安装完后&#xff0c;将权限改回 场景&#xff1a;基于Linux 的 wordpress 安装主题显示要配置FTP 安装插件或者主…

RegExp 对象

文章目录RegExp 对象创建RegExp对象正则表达式语法RegExp 对象方法支持正则表达式的 String 对象的方法RegExp.prototype[search]()replace() 方法match()常用正则表达式RegExp 对象 RegExp对象表示正则表达式&#xff0c;是由普通字符和特殊字符(也叫元字符或限定符)组成的文…

基于节点分层的配网潮流前推回代方法matlab程序(IEEE33节点潮流计算)

基于节点分层的配网潮流前推回代方法matlab程序&#xff08;IEEE33节点潮流计算&#xff09; 摘要&#xff1a;结合配电网特有的辐射状特点&#xff0c;提出了一种新的基于节点分层的配网潮流前推回代方法。该方法利用配网支路及其节点参数所形成的节点-节点关联矩阵推导出节点…

MiniAlphaGo黑白棋 蒙特卡洛搜索

做个笔记。 一、蒙特卡洛在黑白棋的应用 输入&#xff1a;棋盘&#x1d44f;&#x1d45c;&#x1d44e;&#x1d45f;&#x1d451;、当前执子方&#x1d450;&#x1d45c;&#x1d459;&#x1d45c;&#x1d45f;、搜索时间&#x1d461;&#x1d456;&#x1d45a;&#x…

小米平板5ProWIFI(elish)刷ArrowOS

文章目录警告下载奇兔刷机系统本体及Recovery清除数据刷入AospRec开始刷入警告完成设置输入法变砖头了qwq又是警告芝士截图Root方法结尾警告 此文章只针对 小米平板5Pro Wifi版本&#xff08;elish&#xff09; 由于条件限制&#xff0c;本文大部分无配图 请务必仔细认真阅读此…

Airflow用于ETL的四种基本运行模式, 2022-11-20

(2022.11.20 Sun) 基本运行模式(pattern)是data pipeline使用Airflow的DAG的不同结构&#xff0c;基本模式有如下四种 &#xff1a; 序列Sequence平行拆分Parallel split同步Synchronisation单选Exclusive choice 序列模式 序列模式即若干task按先后顺序依次执行&#xff0c;…

中远通在创业板IPO过会:前三季度收入11亿元,罗厚斌为董事长

近日&#xff0c;深圳证券交易所创业板披露的新显示&#xff0c;深圳市核达中远通电源技术股份有限公司&#xff08;下称“中远通”&#xff09;获得上市委会议通过。据贝多财经了解&#xff0c;中远通于2021年6月30日在创业板递交申请。 本次冲刺创业板上市&#xff0c;中远通…

以go rabbitmq为例子--用最少的时间最好的掌握消息队列

为什么要使用消息队列&#xff1f; 流量削峰 举个例子&#xff0c;如果订单系统最多能处理一万次订单&#xff0c;这个处理能力应付正常时段的下单时绰绰有余&#xff0c;正常时段我们下单一秒后就能返回结果。但是在高峰期&#xff0c;如果有两万次下单操作系统是处理不了的…

向QTableView单元格插入窗体小部件的功能实现

1.前言 我们知道&#xff1a;QTableWidget类有如下函数&#xff1a; void QTableWidget::setCellWidget(int row, int column, QWidget *widget) 可以实现在指定的单元格插入窗体部件QWidget对象&#xff0c;如下代码&#xff1a; setCellWidget(row, column, new QLineEdi…

2023年天津财经大学珠江学院专升本管理学原理专业考试大纲

天津财经大学珠江学院2023年高职升本科专业课考试《管理学原理》考试大纲一、本大纲系天津财经大学珠江学院2023年高职升本科《管理学原理》课程考试大纲。所列考试范围出自徐碧琳主编的教材《管理学原理&#xff08;第二版&#xff09;》&#xff0c;机械工业出版社&#xff0…