Spark中RDD概述及RDD算子详解

news2024/9/27 12:14:50

一、RDD概述

1、RDD: 弹性的分布式数据集

弹性:RDD 中的数据即可以缓存在内存中, 也可以缓存在磁盘中, 也可以缓存在外部存储中

分布式:数据可以分布在多台服务器中,RDD中的分区来自于block块,而block块会来自不同的datanode

 数据集:(1)RDD自身可以不存储数据的,只存放代码计算逻辑,触发作业执行的时候,数据会在RDD之间流动

               (2)RDD 也可以缓存起来, 相当于存储具体数据

2、RDD 是一个分布式计算框架, 所以, 一定是要能够进行分区计算的, 只有分区了, 才能利用集群的并行计算能力

二、shuffle

spark的运行过程中如果出现了相同的键被拉取到对应的分区,这个过程称之为shuffle。

注:只有 Key-Value 型的 RDD 才会有 Shuffle 操作,Spark的shuffle和mapreduce的shuffle原理是一样,都是要进行落盘。


object Demo2Partition {
  def main(args: Array[String]): Unit = {
    //1、创建Spark环境
    //1.1 创建配置文件对象
    val conf: SparkConf = new SparkConf()

    //1.2 指定运行的模式(local  Standalone  Mesos  YARN)
    conf.setMaster("local") //可以执行所运行需要核数资源local[2],不指定的话默认使用所有的资源执行程序

    //1.3 给spark作业起一个名字
    conf.setAppName("wc")

    //2、创建spark运行时的上下文对象
    //SparkContext是spark-core的入口组件, 是一个 Spark 程序的入口,主要作用是连接集群, 创建 RDD, 累加器, 广播变量等
    val sparkContext: SparkContext = new SparkContext(conf)

    //3、读取文件数据
    //    val wordsLine: RDD[String] = sparkContext.textFile("spark/data/ws/*", minPartitions = 7)
    val wordsLine: RDD[String] = sparkContext.textFile("spark/data/ws/*")
    println(s"wordsLineRDD分区数是:${wordsLine.getNumPartitions}")

    //4、每一行根据|分隔符进行切分
    val words: RDD[String] = wordsLine.flatMap(_.split("\\|"))
    println(s"wordsRDD分区数是:${words.getNumPartitions}")

    val wordsTuple2: RDD[(String, Int)] = words.map((_, 1))
    println(s"wordsTuple2RDD分区数是:${wordsTuple2.getNumPartitions}")

    //产生shuffle的算子上可以单独设置分区数
    val wordsTuple2Group: RDD[(String, Iterable[(String, Int)])] = wordsTuple2.groupBy(_._1, 5)
    println(s"wordsTuple2GroupRDD分区数是:${wordsTuple2Group.getNumPartitions}")

    val wordCount: RDD[(String, Int)] = wordsTuple2Group.map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))
    println(s"wordCountRDD分区数是:${wordCount.getNumPartitions}")

    wordCount.saveAsTextFile("spark/data/word_count2")

  }
}

SparkContext是spark功能的主要入口其代表与spark集群的连接,能够用来在集群上创建RDD、累加器、广播变量。每个JVM里只能存在一个处于激活状态的SparkContext,在创建新的SparkContext之前必须调用stop()来关闭之前的SparkContext。

每一个Spark应用都是一个SparkContext实例,可以理解为一个SparkContext就是一个spark application的生命周期,一旦SparkContext创建之后,就可以用这个SparkContext来创建RDD、累加器、广播变量,并且可以通过SparkContext访问Spark的服务,运行任务。spark context设置内部服务,并建立与spark执行环境的连接。

sparkContext在Spark应用程序的执行过程中起着主导作用,它负责与程序和spark集群进行交互,包括申请集群资源、创建RDD、accumulators及广播变量等。

三、RDD五大特性

1、RDD是由一些分区构成的,读取文件时有多少个block块,RDD中就会有多少个分区
注:默认情况下,所有的RDD中的分区数是一样的,无论是shuffle之前还是shuffle之后的,在最开始加载数据的时候决定的

  2、函数实际上是作用在RDD中的分区上的,一个分区是由一个task处理,有多少个分区,总共就有多少个task

 注:函数在spark中称之为算子(转换transformation算子 RDD-->RDD,行动action算子 RDD->Other数据类型)

  3、RDD之间存在一些依赖关系,后一个RDD中的数据是依赖与前一个RDD的计算结果,数据像水流一样在RDD之间流动

 注:

3.1 RDD之间有两种依赖关系

a. 窄依赖 后一个RDD中分区数据对应前一个RDD中的一个分区数据 1对1的关系

b. 宽依赖 后一个RDD中分区数据来自前一个RDD中的多个分区数据 1对多的关系(shuffle)

3.2 因为有了依赖关系,将整个作业划分了一个一个stage阶段 sumNum(stage) = Num(宽依赖) + 1

3.3 窄依赖的分区数是不可以改变,取决于第一个RDD分区数,宽依赖可以在产生shuffle的算子上设置分区数

4、分区类的算子只能作用在键值对格式的RDD上,groupByKey、reduceByKey

5、spark为task计算提供了精确的计算位置,移动计算而不移动数据

四、RDD算子

RDD 中的算子从功能上分为两大类

  1. Transformation(转换) :它会在一个已经存在的 RDD 上创建一个新的 RDD, 将旧的 RDD 的数据转换为另外一种形式后放入新的 RDD

  2. Action(行动): 执行各个分区的计算任务, 将的到的结果返回到 Driver 中

注意:RDD具有懒执行的特点,一个spark作业,由action算子来触发执行的,若没有action算子,整个作业不执行

转换算子:Transformation

1、Map算子

       将rdd中的数据,一条一条的取出来传入到map函数中,map会返回一个新的rdd,map不会改变总数据条数

object Demo3Map {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val context = new SparkContext(conf)
    //====================================================

    val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

    /**
     * map算子:将rdd中的数据,一条一条的取出来传入到map函数中,map会返回一个新的rdd,map不会改变总数据条数
     */
    val splitRDD: RDD[List[String]] = studentRDD.map((s: String) => {
      println("============好好学习================")
      s.split(",").toList
    })

//    splitRDD.foreach(println)     //foreach是action算子       
  }
}

2、filter算子

       filter: 过滤,将RDD中的数据一条一条取出传递给filter后面的函数,如果函数的结果是true,该条数据就保留,否则丢弃

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo4Filter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val context = new SparkContext(conf)
    //====================================================
    val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

    /**
     *  filter: 过滤,将RDD中的数据一条一条取出传递给filter后面的函数,如果函数的结果是true,该条数据就保留,否则丢弃
     *
     *  filter一般情况下会减少数据的条数
     */
    val filterRDD: RDD[String] = studentRDD.filter((s: String) => {
      val strings: Array[String] = s.split(",")
      "男".equals(strings(3))
    })


    filterRDD.foreach(println)

  }
}

3、flatMap算子

       flatMap算子:将RDD中的数据一条一条的取出传递给后面的函数,函数的返回值必须是一个集合。最后会将集合展开构成一个新的RDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo5flatMap {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("flatMap算子演示")
    val context = new SparkContext(conf)
    //====================================================
    val linesRDD: RDD[String] = context.textFile("spark/data/words.txt")

    /**
     *  flatMap算子:将RDD中的数据一条一条的取出传递给后面的函数,函数的返回值必须是一个集合。最后会将集合展开构成一个新的RDD
     */
    val wordsRDD: RDD[String] = linesRDD.flatMap((line: String) => line.split("\\|"))

    wordsRDD.foreach(println)
  }
}

4、sample算子

        从前一个RDD的数据中抽样一部分数据,抽取的比例不是正好对应的,在抽取的比例上下浮动 比如1000条抽取10% 抽取的结果在100条左右

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo6sample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("flatMap算子演示")
    val context = new SparkContext(conf)
    //====================================================
    val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

    /**
     *  sample算子:从前一个RDD的数据中抽样一部分数据
     *
     *  抽取的比例不是正好对应的,在抽取的比例上下浮动 比如1000条抽取10% 抽取的结果在100条左右
     */
    val sampleRDD: RDD[String] = studentRDD.sample(withReplacement = true, 0.1)
    sampleRDD.foreach(println)
  }

}

5、groupBy算子

        groupBy:按照指定的字段进行分组,返回的是一个键是分组字段,值是一个存放原本数据的迭代器的键值对 返回的是kv格式的RDD

object Demo7GroupBy {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("groupBy算子演示")
    val context = new SparkContext(conf)
    //====================================================
    val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

    val splitRDD: RDD[Array[String]] = studentRDD.map((s: String) => s.split(","))

    //需求:求出每个班级平均年龄
    //使用模式匹配的方式取出班级和年龄
    val clazzWithAgeRDD: RDD[(String, Int)] = splitRDD.map {
      case Array(_, _, age: String, _, clazz: String) => (clazz, age.toInt)
    }

    /**
     *  groupBy:按照指定的字段进行分组,返回的是一个键是分组字段,值是一个存放原本数据的迭代器的键值对 返回的是kv格式的RDD
     *
     *  key: 是分组字段
     *  value: 是spark中的迭代器
     *  迭代器中的数据,不是完全被加载到内存中计算,迭代器只能迭代一次
     *
     *  groupBy会产生shuffle
     */
    //按照班级进行分组
    //val stringToStudents: Map[String, List[Student]] = stuList.groupBy((s: Student) => s.clazz)
    val kvRDD: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)
    val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {
      case (clazz: String, itr: Iterable[(String, Int)]) =>
        //CompactBuffer((理科二班,21), (理科二班,23), (理科二班,21), (理科二班,23), (理科二班,21), (理科二班,21), (理科二班,24))
        //CompactBuffer(21,23,21,23,21,21,24)
        val allAge: Iterable[Int] = itr.map((kv: (String, Int)) => kv._2)
        val avgAge: Double = allAge.sum.toDouble / allAge.size
        (clazz, avgAge)
    }

    clazzAvgAgeRDD.foreach(println)

    while (true){

    }

  }

}

6、groupByKey算子

        groupByKey: 按照键进行分组,将value值构成迭代器返回,在spark中看到RDD[(xx, xxx)] 这样的RDD就是kv键值对类型的RDD,只有kv类型键值对RDD才可以调用groupByKey算子。

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo8GroupByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("groupByKey算子演示")
    val context = new SparkContext(conf)
    //====================================================
    val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

    val splitRDD: RDD[Array[String]] = studentRDD.map((s: String) => s.split(","))

    //需求:求出每个班级平均年龄
    //使用模式匹配的方式取出班级和年龄
    val clazzWithAgeRDD: RDD[(String, Int)] = splitRDD.map {
      case Array(_, _, age: String, _, clazz: String) => (clazz, age.toInt)
    }


    /**
     *  groupByKey: 按照键进行分组,将value值构成迭代器返回
     *  将来你在spark中看到RDD[(xx, xxx)] 这样的RDD就是kv键值对类型的RDD
     *  只有kv类型键值对RDD才可以调用groupByKey算子
     *
     */
    val kvRDD: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()
    val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {
      case (clazz: String, ageItr: Iterable[Int]) =>
        (clazz, ageItr.sum.toDouble / ageItr.size)
    }
    clazzAvgAgeRDD.foreach(println)

    while (true){

    }

    /**
     *  groupBy与groupByKey的区别(spark的面试题)
     *  1、代码上的区别:任意一个RDD都可以调用groupBy算子,只有kv类型的RDD才可以调用groupByKey
     *  2、groupByKey之后产生的RDD的结构比较简单,方便后续处理
     *  3、groupByKey的性能更好,执行速度更快,因为groupByKey相比较与groupBy算子来说,shuffle所需要的数据量较少
     */
  }
}

7、reduceByKey算子

       按照键key对value值直接进行聚合,需要传入聚合的方式,reduceByKey算子也是只有kv类型的RDD才能调用。

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo9ReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("reduceByKey算子演示")
    val context = new SparkContext(conf)
    //====================================================
    val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

    val splitRDD: RDD[Array[String]] = studentRDD.map((s: String) => s.split(","))

    //求每个班级的人数
    val clazzKVRDD: RDD[(String, Int)] = splitRDD.map {
      case Array(_, _, _, _, clazz: String) => (clazz, 1)
    }

    /**
     * 利用groupByKey实现
     */
    //    val kvRDD: RDD[(String, Iterable[Int])] = clazzKVRDD.groupByKey()
    //    val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {
    //      case (clazz: String, n: Iterable[Int]) =>
    //        (clazz, n.sum)
    //    }
    //    clazzAvgAgeRDD.foreach(println)

    /**
     * 利用reduceByKey实现:按照键key对value值直接进行聚合,需要传入聚合的方式
     * reduceByKey算子也是只有kv类型的RDD才能调用
     *
     *
     */
    val countRDD: RDD[(String, Int)] = clazzKVRDD.reduceByKey((x: Int, y: Int) => x + y)
    countRDD.foreach(println)



//    clazzKVRDD.groupByKey()
//      .map(kv=>(kv._1,kv._2.sum))
//      .foreach(println)

    while (true){

    }

    /**
     *  reduceByKey与groupByKey的区别
     *  1、reduceByKey比groupByKey在map端多了一个预聚合的操作,预聚合之后的shuffle数据量肯定是要少很多的,性能上比groupByKey要好
     *  2、从灵活角度来看,reduceByKey并没有groupByKey灵活
     *   比如reduceByKey无法做方差,groupByKey后续可以完成
     *
     */


  }
}

8、union算子

union:上下合并两个RDD,前提是两个RDD中的数据类型要一致,合并后不会对结果进行去重。这里的合并只是逻辑层面上的合并,物理层面其实是没有合并

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo10Union {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Union算子演示")
    val context = new SparkContext(conf)
    //====================================================
    val w1RDD: RDD[String] = context.textFile("spark/data/ws/w1.txt") // 1
    val w2RDD: RDD[String] = context.textFile("spark/data/ws/w2.txt") // 1

    /**
     *  union:上下合并两个RDD,前提是两个RDD中的数据类型要一致,合并后不会对结果进行去重
     *
     *  注:这里的合并只是逻辑层面上的合并,物理层面其实是没有合并
     */

    val unionRDD: RDD[String] = w1RDD.union(w2RDD)
    println(unionRDD.getNumPartitions) // 2

    unionRDD.foreach(println)
    while (true){

    }

  }
}

9、join算子

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
    /**
     *  内连接:join
     *  左连接:leftJoin
     *  右连接:rightJoin
     *  全连接:fullJoin
     */

object Demo11Join {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Join算子演示")
    val context = new SparkContext(conf)
    //====================================================

    //两个kv类型的RDD之间的关联
    //通过scala中的集合构建RDD
    val rdd1: RDD[(String, String)] = context.parallelize(
      List(
        ("1001", "尚平"),
        ("1002", "丁义杰"),
        ("1003", "徐昊宇"),
        ("1004", "包旭"),
        ("1005", "朱大牛"),
        ("1006","汪权")
      )
    )

    val rdd2: RDD[(String, String)] = context.parallelize(
      List(
        ("1001", "崩坏"),
        ("1002", "原神"),
        ("1003", "王者"),
        ("1004", "修仙"),
        ("1005", "学习"),
        ("1007", "敲代码")
      )
    )



    //内连接
//    val innerJoinRDD: RDD[(String, (String, String))] = rdd1.join(rdd2)
//    //加工一下RDD
//    val innerJoinRDD2: RDD[(String, String, String)] = innerJoinRDD.map {
//      case (id: String, (name: String, like: String)) => (id, name, like)
//    }
//    innerJoinRDD2.foreach(println)

    //左连接
    val leftJoinRDD: RDD[(String, (String, Option[String]))] = rdd1.leftOuterJoin(rdd2)
    //加工一下RDD
    val leftJoinRDD2: RDD[(String, String, String)] = leftJoinRDD.map {
      case (id: String, (name: String, Some(like))) => (id, name, like)
      case (id: String, (name: String, None)) => (id, name, "无爱好")
    }
    leftJoinRDD2.foreach(println)

    println("=================================")

    //右连接与左连接相差不多,不在赘述

    //全连接
    val fullJoinRDD: RDD[(String, (Option[String], Option[String]))] = rdd1.fullOuterJoin(rdd2)
    //加工一下RDD
    val fullJoinRDD2: RDD[(String, String, String)] = fullJoinRDD.map {
      case (id: String, (Some(name), Some(like))) => (id, name, like)
      case (id: String, (Some(name), None)) => (id, name, "无爱好")
      case (id: String, (None, Some(like))) => (id, "无姓名", like)
    }
    fullJoinRDD2.foreach(println)

  }

}

10、sortby算子

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo12Student {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("sortby算子演示")
    val context = new SparkContext(conf)
    //====================================================
    //需求:统计总分年级排名前10的学生的各科分数
    //读取分数文件数据
    val scoreRDD: RDD[(String, String, String)] = context.textFile("spark/data/score.txt") // 读取数据文件
      .map((s: String) => s.split(",")) // 切分数据
      .filter((arr: Array[String]) => arr.length == 3) // 过滤掉脏数据
      .map {
        //整理数据,进行模式匹配取出数据
        case Array(sid: String, subject_id: String, score: String) => (sid, subject_id, score)
      }

    //计算每个学生的总分
    val sumScoreWithSidRDD: RDD[(String, Int)] = scoreRDD.map {
      case (sid: String, _: String, score: String) => (sid, score.toInt)
    }.reduceByKey((x: Int, y: Int) => x + y)


    //按照总分排序
    val sumScoreTop10: Array[(String, Int)] = sumScoreWithSidRDD.sortBy(-_._2).take(10)

    //取出前10的学生学号
    val ids: Array[String] = sumScoreTop10.map(_._1)

    //取出每个学生各科分数
    val top10StuScore: RDD[(String, String, String)] = scoreRDD.filter {
      case (id: String, _, _) => ids.contains(id)
    }

    top10StuScore.foreach(println)

  }
}

11、mapValues算子

          也是作用在kv类型的RDD上,主要的作用键不变,处理值

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo13MapValues {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Join算子演示")
    val context = new SparkContext(conf)
    //====================================================
    //需求:统计总分年级排名前10的学生的各科分数
    //读取分数文件数据
    val scoreRDD: RDD[(String, String, String)] = context.textFile("spark/data/score.txt") // 读取数据文件
      .map((s: String) => s.split(",")) // 切分数据
      .filter((arr: Array[String]) => arr.length == 3) // 过滤掉脏数据
      .map {
        //整理数据,进行模式匹配取出数据
        case Array(sid: String, subject_id: String, score: String) => (sid, subject_id, score)
      }

    //计算每个学生的总分
    val sumScoreWithSidRDD: RDD[(String, Int)] = scoreRDD.map {
      case (sid: String, _: String, score: String) => (sid, score.toInt)
    }.reduceByKey((x: Int, y: Int) => x + y)

    /**
     * mapValues算子:也是作用在kv类型的RDD上
     * 主要的作用键不变,处理值
     */
    val resRDD: RDD[(String, Int)] = sumScoreWithSidRDD.mapValues(_ + 1000)
    resRDD.foreach(println)

    //等同于
    val res2RDD: RDD[(String, Int)] = sumScoreWithSidRDD.map((kv: (String, Int)) => (kv._1, kv._2 + 1000))
  }
}

12、mapPartition算子

         mapPartition: 主要作用是一次处理一个分区的数据,将一个分区的数据一个一个传给后面的函数进行处理

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo14mapPartition {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("mapPartition算子演示")
    val context = new SparkContext(conf)
    //====================================================
    //需求:统计总分年级排名前10的学生的各科分数
    //读取分数文件数据
    val scoreRDD: RDD[String] = context.textFile("spark/data/ws/*") // 读取数据文件

    println(scoreRDD.getNumPartitions)

    /**
     *  mapPartition: 主要作用是一次处理一个分区的数据,将一个分区的数据一个一个传给后面的函数进行处理
     *
     *  迭代器中存放的是一个分区的数据
     */
//    val mapPartitionRDD: RDD[String] = scoreRDD.mapPartitions((itr: Iterator[String]) => {
//
//      println(s"====================当前处理的分区====================")
//      //这里写的逻辑是作用在一个分区上的所有数据
//      val words: Iterator[String] = itr.flatMap(_.split("\\|"))
//      words
//    })

//    mapPartitionRDD.foreach(println)

    scoreRDD.mapPartitionsWithIndex{
      case (index:Int,itr: Iterator[String]) =>
        println(s"当前所处理的分区编号是:${index}")
        itr.flatMap(_.split("\\|"))
    }.foreach(println)

  }

}

行动算子:Action

13、collect算子

          以数组的形式返回数据集中所有元素

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo15Actions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Action算子演示")
    val context = new SparkContext(conf)
    //====================================================

    val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

    /**
     * 转换算子:transformation 将一个RDD转换成另一个RDD,转换算子是懒执行的,需要一个action算子触发执行
     *
     * 行动算子(操作算子):action算子,触发任务执行。一个action算子就会触发一次任务执行
     */
    println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")
    val studentsRDD: RDD[(String, String, String, String, String)] = studentRDD.map(_.split(","))
      .map {
        case Array(id: String, name: String, age: String, gender: String, clazz: String) =>
          println("****************************  ^_^ ********************************")
          (id, name, age, gender, clazz)
      }
    println("$$$$$$$$$$$$$$$$$$$$$$***__***$$$$$$$$$$$$$$$$$$$$$$$$$")

    // foreach其实就是一个action算子
//    studentsRDD.foreach(println)
    //    println("="*100)
    //    studentsRDD.foreach(println)

    //    while (true){
    //
    //    }

    /**
     *  collect()行动算子 主要作用是将RDD转成scala中的数据结构
     *
     */
    val tuples: Array[(String, String, String, String, String)] = studentsRDD.collect()

  }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1705592.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大数据开发面试题【ClickHouse篇】

170、clickhouse介绍以及架构 clickhouse一个分布式列式存储数据库,主要用于在线分析查询 171、列式存储和行式存储有什么区别? 行式存储: 1、数据是按行存储的 2、没有建立索引的查询消耗很大的IO 3、建立索引和视图花费一定的物理空间和…

OpenAI 再次刷新认知边界:GPT-4 颠覆语音助手市场,流畅度直逼真人互动?

前言 近日,美国人工智能研究公司 OpenAI 发布了其最新旗舰模型 GPT-4o,这一革命性的进展不仅标志着人工智能领域的新突破,更预示着即将步入一个全新的交互时代?GPT-4o 的发布,对于我们来说,意味着人工智能…

分库分表最全详解(图文全面总结)

分库分表 分库分表是数据库设计、和管理中的一种策略,主要解决随着数据量、和并发访问量的增加而带来的性能、和扩展性问题。 分库分表,主要就是两种常用手段:“分库”、和“分表”。 如下图所示: 分库(Database S…

ML307R OpenCPU 网络初始化流程介绍

一、网络初始化流程 二、函数介绍 三、示例代码 四、代码下载地址 一、网络初始化流程 模组的IMEI/SN获取接口可在include\cmiot\cm_sys.h中查看,SIM卡IMSI/ICCID获取接口可以在include\cmiot\cm_sim.h中查看,PDP激活状态查询可以在include\cmiot\cm_modem.h中查看 二、函…

#12松桑前端后花园周刊-SolidStart、Vercel融资、Angular18、Nextjs15RC、p5.js、ChromeDevTools引入AI

⚡️行业动态 SolidStart 1.0 元框架发布 Solidjs 核心团队发布其元框架 SolidStart 1.0 正式版,其特点如下:基于文件系统的路由;支持SSR、流式SSR、CSR、SSG渲染模式;通过代码分割、树摇和无用代码删除构建优化;基于…

大屏表格实现无限滚动效果

实现效果 实现思路 首先固定最外层的高度,并且设置超出高度后隐藏设置每一行的高度为固定35PX,默认显示10行,所以最外层高度就是 35 * 10 表头的高度遍历时克隆一份表格数据,用于视差效果显示设置滚动动画,让表格行所…

docker image分析利器之dive

dive是一个用于研究 Docker 镜像、层内容以及发现缩小 Docker/OCI 镜像大小方法的开源工具. 开源地址: dive github 为了有个直观的印象, 可以先看一下repo文档中的gif图: 安装 在Ubuntu/Debian系统下,可以使用deb包安装: DIVE_VERSION$(curl -sL "https:/…

Transformer模型的简单学习

前言 Transformer 来源于一篇论文:Attention is all you need TRM在做一件什么事情呢?其实一开始它是被用于机器翻译的: 更详细的: 更详细的: 从上图可以看出,一个Encoders 下面包含了 n 个 Encoder&…

Python Anaconda环境复制

虚拟环境复制 conda-pack 第一种方式 conda打包 在打包之前如果没有conda-pack包的话,需要安装pip install conda-pack打包 conda pack -n py36 -o py366.tar.gz -o就是给导出得到的压缩包就在当前目录下 传输到另外一台服务器上 有两台linux服务器&#xff0c…

详析河南道路与桥梁乙级资质新办条件

河南道路与桥梁乙级资质新办条件详析如下: 一、企业基本条件 独立企业法人资格: 申请人必须是具有独立企业法人资格的单位。注册资金: 企业的注册资金应不少于100万元人民币。社会信誉: 申请人应具有良好的社会信誉,无…

RunnerGo V4.6.0 多项新增功能,快看看有没有你想要的!

RunnerGo V4.6.0版本上线,不仅对现有功能进行了深度优化和改进,还带来了诸多新功能。 UI 插件:浮窗升级,优化浏览体验 此次更新中,UI插件全新升级至V2.1版本。新版取消了页面内右下角按钮的设计,在浏览器右…

postman调用Grpc

环境: .net6.0 一、准备 安装nuget: Grpc.AspNetCore Google.Protobuf Grpc.Core.Api Grpc.Tools Grpc.AspNetCore.Server.Reflection Program.cs: public class Program{public static void Main(string[] args){var builder WebApplicat…

Linux 删除SSH密钥(id_ed25519),重新生成

在Linux系统中,重新生成SSH密钥(比如id_ed25519)的过程包括删除现有的密钥文件并生成一个新的。 以下是具体的步骤: 0. 查看下是否有密钥 1. 删除原有的id_ed25519密钥 默认情况下,SSH密钥存储在用户的主目录下的 .…

最新!!2024年上半年软考【中级软件设计师】综合知识真题解析

2024上半年软考考试已经结束了,为大家整理了网友回忆版的软件设计师真题及答案,总共30道题。 上半年考试的宝子们可以对答案预估分数!准备下半年考的宝子可以提前把握考试知识点和出题方向,说不定会遇到相同考点的题目&#xff01…

网络流量探针与流量分析系统:全面指南

目录 什么是网络流量探针? 流量分析系统的功能与重要性 流量分析系统的主要功能 流量分析系统的重要性 AnaTraf 网络流量分析仪 如何选择合适的网络流量探针与流量分析系统? 1. 性能与扩展性 2. 易用性与部署 3. 数据可视化与报告 4. 安全性与…

指定GPU运行程序设置cmd运行的程序后台运行

一、指定GPU运行程序 因为条件限制,拿到的资源只有一块GPU,这时我们需要设置程序在指定的GPU运行。解决思路:在train文件中设置环境变量,让程序在指定GPU运行。 import os os.environ["CUDA_VISIBLE_DEVICES"] "…

汽车合面合壳密封UV胶固化后一般可以耐多少度的高温和低温? 汽车车灯的灯罩如果破损破裂破洞了要怎么修复?

汽车合面合壳密封UV胶固化后一般可以耐多少度的高温和低温? UV胶固化后的耐高温和低温能力取决于具体的UV胶水品牌和型号,以及固化过程中的条件。一般来说,高品质的UV胶水在固化后可以提供较好的耐温性能,但确切的耐温范围需要参考各个厂家提…

ubuntu使用oh my zsh美化终端

ubuntu使用oh my zsh美化终端 文章目录 ubuntu使用oh my zsh美化终端1. 安装zsh和oh my zsh2. 修改zsh主题3. 安装zsh插件4. 将.bashrc移植到.zshrcReference 1. 安装zsh和oh my zsh 首先安装zsh sudo apt install zsh然后查看本地有哪些shell可以使用 cat /etc/shells 将默…

gmssl vs2010编译

1、虚拟机win10 x64,离线安装vs2010和2010sp1补丁; 2、安装ActivePerl_v5.28.1.0000和nasm-2.16.03-installer-x64均是默认完整安装; nasm官网下载: Index of /pub/nasm/releasebuilds/2.16.03/win64https://www.nasm.us/pub/nas…

链表带环问题的思考

判断链表是否带环 思路:快慢指针 慢指针走一步,快指针走两步,当快指针追上慢指针时,代表该链表带环。代码如下: /*** Definition for singly-linked list.* struct ListNode {* int val;* struct ListNode *next;* };*/ …