【Spark】RDD转换算子

news2024/11/26 14:34:17

目录

map

mapPartitions

mapPartitionsWithIndex

flatMap

glom

groupBy

shuffle

filter

sample

distinct

coalesce

repartition

sortBy

ByKey

intersection

union

subtract

zip

partitionBy

reduceByKey

groupByKey

reduceByKey 和 groupByKey 的区别

aggregateByKey

foldByKey

combineByKey

reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别

join

leftOuterJoin

cogroup


完成永远比完美更重要

Value类型

map

def map[U: ClassTag](f: T => U): RDD[U]
将处理的数据 逐条 进行映射转换(A => B),这里的转换可以是类型的转换,也可以是值的转换。
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(
 num => {
    num * 2
 }
)
val dataRDD2: RDD[String] = dataRDD1.map(
 num => {
     "" + num
 }
)
实例
从服务器日志数据 apache.log 中获取用户请求 URL 资源路径
apache.log
83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
83.149.9.216 - - 17/05/2015:10:05:57 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Bold.ttf
83.149.9.216 - - 17/05/2015:10:05:50 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Regular.ttf
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png
83.149.9.216 - - 17/05/2015:10:05:50 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png
83.149.9.216 - - 17/05/2015:10:05:46 +0000 GET /presentations/logstash-monitorama-2013/images/Dreamhost_logo.svg
83.149.9.216 - - 17/05/2015:10:05:11 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard2.png
83.149.9.216 - - 17/05/2015:10:05:19 +0000 GET /presentations/logstash-monitorama-2013/images/apache-icon.gif
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/nagios-sms5.png
83.149.9.216 - - 17/05/2015:10:05:00 +0000 GET /presentations/logstash-monitorama-2013/images/redis.png
83.149.9.216 - - 17/05/2015:10:05:25 +0000 GET /presentations/logstash-monitorama-2013/images/elasticsearch.png
83.149.9.216 - - 17/05/2015:10:05:59 +0000 GET /presentations/logstash-monitorama-2013/images/logstashbook.png
83.149.9.216 - - 17/05/2015:10:05:30 +0000 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 - - 17/05/2015:10:05:53 +0000 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/1983_delorean_dmc-12-pic-38289.jpeg
83.149.9.216 - - 17/05/2015:10:05:54 +0000 GET /presentations/logstash-monitorama-2013/images/simple-inputs-filters-outputs.jpg
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/tiered-outputs-to-inputs.jpg
83.149.9.216 - - 17/05/2015:10:05:56 +0000 GET /favicon.ico
24.236.252.67 - - 17/05/2015:10:05:40 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /articles/dynamic-dns-with-dhcp/
93.114.45.13 - - 17/05/2015:10:05:04 +0000 GET /reset.css
93.114.45.13 - - 17/05/2015:10:05:45 +0000 GET /style2.css
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:17 +0000 GET /images/jordan-80.png
93.114.45.13 - - 17/05/2015:10:05:21 +0000 GET /images/web/2009/banner.png
package com.qihang.bigdata.spark.core.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MapTest")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.textFile("datas/apache.log")

    //长String => 短String
    val mapRDD: RDD[String] = rdd.map(
      line => {
        val data = line.split(" ")
        data(6)
      }
    )

    mapRDD.collect().foreach(println)

    sc.stop()

  }
}
 1. rdd的计算一个分区内的数据是一个一个执行逻辑
    只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。
    分区内数据的执行是有序的。
 2. 不同分区数据计算是无序的。

mapPartitions

def mapPartitions[U: ClassTag](
 f: Iterator[T] => Iterator[U],
 preservesPartitioning: Boolean = false): RDD[U]
将待处理的数据 以分区为单位 发送到计算节点进行处理,这里的处理是指可以进行任意的处
理,哪怕是过滤数据。(将一个分区的迭代器传入 => 处理后整个分区的迭代器)
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
 datas => {
     datas.filter(_==2)
 }
)
实例
获取每个数据分区的最大值
package com.qihang.bigdata.spark.core.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MapTest")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    val mpRDD = rdd.mapPartitions(
      iter => {
        List(iter.max).iterator // 需要返回迭代器,用List()封装
      }
    )

    mpRDD.collect().foreach(println(_))

    sc.stop()

  }
}
可以以分区为单位进行数据转换操作
但是会将整个分区的数据加载到内存进行引用
如果处理完的数据是不会被释放掉,存在对象的引用。
在内存较小,数据量较大的场合下,容易出现内存溢出。
map mapPartitions 的区别
数据处理角度
Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子
是以分区为单位进行批处理操作。
功能的角度
Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。
MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,
所以可以增加或减少数据
性能的角度
Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处
理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能
不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。

mapPartitionsWithIndex

def mapPartitionsWithIndex[U: ClassTag](
 f: (Int, Iterator[T]) => Iterator[U],
 preservesPartitioning: Boolean = false): RDD[U]
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处
理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
val dataRDD1 = dataRDD.mapPartitionsWithIndex(
 (index, datas) => {
     datas.map(index, _)
 }
)

实例

获取第二个分区的数据

package com.qihang.bigdata.spark.core.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MapTest")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    val mpiRDD = rdd.mapPartitionsWithIndex(
      (index, iter) => {
        if (index == 1) { //0,1 从零开始,index==1为第二个分区
          iter
        } else {
          Nil.iterator  // 
        }
      }
    )

    mpiRDD.collect().foreach(println(_))

    sc.stop()

  }
}

将数据和所在分区合并成元组

        val rdd = sc.makeRDD(List(1,2,3,4))

        val mpiRDD = rdd.mapPartitionsWithIndex(
            (index, iter) => {
                // 1,   2,    3,   4
                //(0,1)(2,2),(4,3),(6,4)
                iter.map(
                    num => {
                        (index, num)
                    }
                )
            }
        )

flatMap

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
 
val dataRDD = sparkContext.makeRDD(List(
 List(1,2),List(3,4)
),1)
val dataRDD1 = dataRDD.flatMap(
 list => list
)

glom

def glom(): RDD[Array[T]]
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    // List => Int
    // Int => Array
    val glomRDD: RDD[Array[Int]] = rdd.glom()

    glomRDD.collect().foreach(data => println(data.mkString(",")))

案例

计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
        val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)

        // 【1,2】,【3,4】
        // 【2】,【4】
        // 【6】
        val glomRDD: RDD[Array[Int]] = rdd.glom()

        val maxRDD: RDD[Int] = glomRDD.map(
            array => {
                array.max
            }
        )
        println(maxRDD.collect().sum)

groupBy

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.groupBy(
 _%2
)

shuffle

将数据根据指定的规则进行分组 , 分区默认不变,但是数据会被 打乱重新组合 ,我们将这样
的操作称之为 shuffle 。极限情况下,数据可能被分在同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组,多个组可以放在一个分区里。
所以分区数和分组数无关。 

实例

List("Hello", "hive", "hbase", "Hadoop") 根据单词首写字母进行分组。
        val rdd  = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)

        // 分组和分区没有必然的关系
        val groupRDD = rdd.groupBy(_.charAt(0))

        groupRDD.collect().foreach(println)

filter

def filter(f: T => Boolean): RDD[T]
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。
当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出
数据倾斜
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4
),1)
val dataRDD1 = dataRDD.filter(_%2 == 0)

实例

从服务器日志数据 apache.log 中获取 2015 5 17 日的请求路径
83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png
83.149.9.216 - - 17/05/2015:10:05:57 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Bold.ttf
83.149.9.216 - - 17/05/2015:10:05:50 +0000 GET /presentations/logstash-monitorama-2013/css/fonts/Roboto-Regular.ttf
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png
83.149.9.216 - - 17/05/2015:10:05:50 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard.png
83.149.9.216 - - 17/05/2015:10:05:46 +0000 GET /presentations/logstash-monitorama-2013/images/Dreamhost_logo.svg
83.149.9.216 - - 17/05/2015:10:05:11 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard2.png
83.149.9.216 - - 17/05/2015:10:05:19 +0000 GET /presentations/logstash-monitorama-2013/images/apache-icon.gif
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/nagios-sms5.png
83.149.9.216 - - 17/05/2015:10:05:00 +0000 GET /presentations/logstash-monitorama-2013/images/redis.png
83.149.9.216 - - 17/05/2015:10:05:25 +0000 GET /presentations/logstash-monitorama-2013/images/elasticsearch.png
83.149.9.216 - - 17/05/2015:10:05:59 +0000 GET /presentations/logstash-monitorama-2013/images/logstashbook.png
83.149.9.216 - - 17/05/2015:10:05:30 +0000 GET /presentations/logstash-monitorama-2013/images/github-contributions.png
83.149.9.216 - - 17/05/2015:10:05:53 +0000 GET /presentations/logstash-monitorama-2013/css/print/paper.css
83.149.9.216 - - 17/05/2015:10:05:24 +0000 GET /presentations/logstash-monitorama-2013/images/1983_delorean_dmc-12-pic-38289.jpeg
83.149.9.216 - - 17/05/2015:10:05:54 +0000 GET /presentations/logstash-monitorama-2013/images/simple-inputs-filters-outputs.jpg
83.149.9.216 - - 17/05/2015:10:05:33 +0000 GET /presentations/logstash-monitorama-2013/images/tiered-outputs-to-inputs.jpg
83.149.9.216 - - 17/05/2015:10:05:56 +0000 GET /favicon.ico
24.236.252.67 - - 17/05/2015:10:05:40 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /articles/dynamic-dns-with-dhcp/
93.114.45.13 - - 17/05/2015:10:05:04 +0000 GET /reset.css
93.114.45.13 - - 17/05/2015:10:05:45 +0000 GET /style2.css
93.114.45.13 - - 17/05/2015:10:05:14 +0000 GET /favicon.ico
93.114.45.13 - - 17/05/2015:10:05:17 +0000 GET /images/jordan-80.png
93.114.45.13 - - 17/05/2015:10:05:21 +0000 GET /images/web/2009/banner.png
        val rdd = sc.textFile("datas/apache.log")

        val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
            line => {
                val datas = line.split(" ")
                val time = datas(3)
                //time.substring(0, )
                val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
                val date: Date = sdf.parse(time)
                val sdf1 = new SimpleDateFormat("HH")
                val hour: String = sdf1.format(date)
                (hour, 1)
            }
        ).groupBy(_._1)
        timeRDD.map{
            case ( hour, iter ) => {
                (hour, iter.size)
            }
        }.collect.foreach(println)

sample

def sample(
withReplacement: Boolean,
fraction:Double,
seed: Long = Utils.random.nextLong): RDD[T]
根据指定的规则从数据集中 抽取 数据
  // sample算子需要传递三个参数
        // 1. 第一个参数表示,抽取数据后是否将数据返回 true(放回),false(丢弃)
        // 2. 第二个参数表示,
        //         如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
        //         如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数
        // 3. 第三个参数表示,抽取数据时随机算法的种子
        //                    如果不传递第三个参数,那么使用的是当前系统时间
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4
),1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不
要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)

防止数据倾斜。

distinct

def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
将数据集中重复的数据去重
//源码 
// map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
        // (1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null)
        // (1, null)(1, null)(1, null)
        // (null, null) => null
        // (1, null) => 1
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),1)
val dataRDD1 = dataRDD.distinct()

coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false,
 partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
 (implicit ord: Ordering[T] = null)
 : RDD[T]
根据数据量 缩减分区 ,用于大数据集过滤后,提高小数据集的执行效率
spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少
分区的个数,减小任务调度
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),6)
val dataRDD1 = dataRDD.coalesce(2)

扩大分区
 // coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。
        // 所以如果想要实现扩大分区的效果,需要使用shuffle操作
        // spark提供了一个简化的操作
        // 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
        // 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle
        //val newRDD: RDD[Int] = rdd.coalesce(3, true)

repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true 。无论是将分区数多的
RDD 转换为分区数少的 RDD ,还是将分区数少的 RDD 转换为分区数多的 RDD repartition
操作都可以完成,因为无论如何都会经 shuffle 过程。
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.repartition(4)

sortBy

def sortBy[K](
 f: (T) => K,
ascending: Boolean = true,
 numPartitions: Int = this.partitions.length)
 (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理
的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一
致。中间存在 shuffle 的过程。
val dataRDD = sparkContext.makeRDD(List(
 1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)

ByKey

双Value类型

intersection

def intersection(other: RDD[T]): RDD[T]

union

def union(other: RDD[T]): RDD[T]

subtract

def subtract(other: RDD[T]): RDD[T]

zip

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
        // Can't zip RDDs with unequal numbers of partitions: List(2, 4)
        // 两个数据源要求分区数量要保持一致
        // Can only zip RDDs with same number of elements in each partition
        // 两个数据源要求分区中数据数量保持一致
        val rdd1 = sc.makeRDD(List(1,2,3,4,5,6),2)
        val rdd2 = sc.makeRDD(List(3,4,5,6),2)

        val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
        println(rdd6.collect().mkString(","))
        // 交集,并集和差集要求两个数据源数据类型保持一致
        // 拉链操作两个数据源的类型可以不一致

        val rdd1 = sc.makeRDD(List(1,2,3,4))
        val rdd2 = sc.makeRDD(List(3,4,5,6))
        val rdd7 = sc.makeRDD(List("3","4","5","6"))

        // 交集 : 【3,4】
        val rdd3: RDD[Int] = rdd1.intersection(rdd2)
        //val rdd8 = rdd1.intersection(rdd7)
        println(rdd3.collect().mkString(","))

        // 并集 : 【1,2,3,4,3,4,5,6】
        val rdd4: RDD[Int] = rdd1.union(rdd2)
        println(rdd4.collect().mkString(","))

        // 差集 : 【1,2】
        val rdd5: RDD[Int] = rdd1.subtract(rdd2)
        println(rdd5.collect().mkString(","))

        // 拉链 : 【1-3,2-4,3-5,4-6】
        val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
        val rdd8 = rdd1.zip(rdd7)
        println(rdd6.collect().mkString(","))

Key-Value类型

partitionBy

def partitionBy(partitioner: Partitioner): RDD[(K, V)]
将数据按照指定 Partitioner 重新进行分区。 Spark 默认的分区器是 HashPartitioner
        val rdd = sc.makeRDD(List(1,2,3,4),2)

        val mapRDD:RDD[(Int, Int)] = rdd.map((_,1))
        // RDD => PairRDDFunctions
        // 隐式转换(二次编译)

        // partitionBy根据指定的分区规则对数据进行重分区
import org.apache.spark.HashPartitioner
        val newRDD = mapRDD.partitionBy(new HashPartitioner(2))
        newRDD.partitionBy(new HashPartitioner(2))

        newRDD.saveAsTextFile("output")

reduceByKey

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
可以将数据按照相同的 Key Value 进行聚合
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.reduceByKey(_+_)
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)
reduceByKey分区内和分区间计算规则相同。

groupByKey

def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K,
将数据源的数据根据 key value 进行分组
val dataRDD1 =
 sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.groupByKey()
val dataRDD3 = dataRDD1.groupByKey(2)
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))

reduceByKey groupByKey 的区别

shuffle 的角度 reduceByKey groupByKey 都存在 shuffle 的操作,但是 reduceByKey
可以在 shuffle 前对分区内相同 key 的数据进行预聚合( combine )功能,这样会减少落盘的
数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题, reduceByKey 性能比较
高。
从功能的角度 reduceByKey 其实包含分组和聚合的功能。 GroupByKey 只能分组,不能聚
合,所以在分组聚合的场合下,推荐使用 reduceByKey ,如果仅仅是分组而不需要聚合。那
么还是只能使用 groupByKey

aggregateByKey

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
 combOp: (U, U) => U): RDD[(K, U)]
将数据根据 不同的规则 进行分区内计算和分区间计算
val dataRDD1 =
 sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 =
 dataRDD1.aggregateByKey(0)(_+_,_+_)

实例

取出每个分区内 相同 key 的最大值然后分区间相加
// TODO : 取出每个分区内相同 key 的最大值然后分区间相加
// aggregateByKey 算子是函数柯里化,存在两个参数列表
// 1. 第一个参数列表中的参数表示初始值
// 2. 第二个参数列表中含有两个参数
// 2.1 第一个参数表示分区内的计算规则
// 2.2 第二个参数表示分区间的计算规则
val rdd =
 sc.makeRDD(List(
 ("a",1),("a",2),("c",3),
 ("b",4),("c",5),("c",6)
 ),2)
// 0:("a",1),("a",2),("c",3) => (a,10)(c,10)
// => (a,10)(b,10)(c,20)
// 1:("b",4),("c",5),("c",6) => (b,10)(c,10)
val resultRDD =
 rdd.aggregateByKey(10)(
 (x, y) => math.max(x,y),
 (x, y) => x + y
 )
resultRDD.collect().foreach(println)

foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)

combineByKey

def combineByKey[C](
 createCombiner: V => C,
 mergeValue: (C, V) => C,
 mergeCombiners: (C, C) => C): RDD[(K, C)]
最通用的对 key-value rdd 进行聚集操作的聚集函数( aggregation function )。类似于
aggregate() combineByKey() 允许用户返回值的类型与输入不一致。
实例
将数据 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
求每个 key 的平 均值
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), 
("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
 (_, 1),
 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

reduceByKeyfoldByKeyaggregateByKeycombineByKey 的区别

reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相 同
AggregateByKey :相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规
则可以不相同
CombineByKey: 当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区
内和分区间计算规则不相同。
/*
        reduceByKey:

             combineByKeyWithClassTag[V](
                 (v: V) => v, // 第一个值不会参与计算
                 func, // 分区内计算规则
                 func, // 分区间计算规则
                 )

        aggregateByKey :

            combineByKeyWithClassTag[U](
                (v: V) => cleanedSeqOp(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
                cleanedSeqOp, // 分区内计算规则
                combOp,       // 分区间计算规则
                )

        foldByKey:

            combineByKeyWithClassTag[V](
                (v: V) => cleanedFunc(createZero(), v), // 初始值和第一个key的value值进行的分区内数据操作
                cleanedFunc,  // 分区内计算规则
                cleanedFunc,  // 分区间计算规则
                )

        combineByKey :

            combineByKeyWithClassTag(
                createCombiner,  // 相同key的第一条数据进行的处理函数
                mergeValue,      // 表示分区内数据的处理函数
                mergeCombiners,  // 表示分区间数据的处理函数
                )

         */

join

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
在类型为 (K,V) (K,W) RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的
(K,(V,W)) RDD
存在笛卡儿积
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)

leftOuterJoin

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
类似于 SQL 语句的左外连接
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)

cogroup

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))
在类型为 (K,V) (K,W) RDD 上调用,返回一个 (K,(Iterable<V>,Iterable<W>)) 类型的 RDD
    val rdd1 = sc.makeRDD(List(
      ("a", 1), ("b", 2) , ("c", 3)
    ))

    val rdd2 = sc.makeRDD(List(
      ("a", 4), ("b", 5), ("c", 6), ("c", 7)
    ))

    // cogroup : connect + group (分组,连接)
    val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)

    cgRDD.collect().foreach(println)



(a,(CompactBuffer(1),CompactBuffer(4)))
(b,(CompactBuffer(2),CompactBuffer(5)))
(c,(CompactBuffer(3),CompactBuffer(6, 7)))

案例

package com.qihang.bigdata.spark.core.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MapTest")
    val sc = new SparkContext(sparkConf)

    val dataRDD = sc.textFile("datas/agent.log")



    val mapRDD = dataRDD.map(
      line => {
        val datas = line.split(" ")
        ((datas(1), datas(4)), 1)
      }
    )

    val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _)
    //如果map中的数据有特定的格式, 可以用模式匹配简化
    //case 特定格式
    //https://www.cnblogs.com/JYB2021/p/16199184.html
    val newMapRDD: RDD[(String, (String, Int))] = reduceRDD.map {
      case ((prv, ad), sum) =>
        (prv, (ad, sum))
    }

    val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()

    //只对 value 进行迭代 用mapValues
    val resultRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues(
      iter => {
        iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
      }
    )

    resultRDD.collect().foreach(println)

    sc.stop()

  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/720558.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

kafka的broker和replica和文件存储

zookeeper中存储的kafka信息 /brokers/ids&#xff0c;记录存在的服务器id/brokers/topics/test/partitions/0/state&#xff0c;记录leader和可用副本服务器/comsumers&#xff0c;0.9版本之前存储消费者的offset信息&#xff0c;但是会产生zookeeper和broker的跨节点通信/co…

SPSS读取数据文件(一)

1.读取Excel数据文件 &#xff08;1&#xff09;选择“文件”-“打开”-“数据”,在弹出的“打开数据”对话框下选择Excel文件&#xff0c;如图所示 &#xff08;2&#xff09;选择要打开的Excel文件&#xff0c;点击“打开”&#xff0c;如图所示 &#xff08;3&#xff09;可…

[论文总结]YOLO v1、YOLO v2、YOLO v3、YOLO v4、YOLOv5

背景 在这里我们主要介绍YOLO 系列的相关目标检测算法&#xff0c;从最开始的YOLO v1 一直到 YOLO v5。本文也借鉴了其他文档和原始论文。总结下来这五个方法的演进线路如下表格所示。 对比维度YOLO v1YOLO v2YOLO v3YOLO v4YOLO v5backboneVGGdarknet19darknet53darknet53da…

Lua学习笔记:浅谈对垃圾回收的理解

前言 本篇在讲什么 Lua的垃圾回收 本篇适合什么 适合初学Lua的小白 本篇需要什么 对Lua语法有简单认知 依赖Sublime Text编辑器 本篇的特色 具有全流程的图文教学 重实践&#xff0c;轻理论&#xff0c;快速上手 提供全流程的源码内容 ★提高阅读体验★ &#x1f…

Unity:sentinel key not found (h0007)

SSD换电脑&#xff0c;unity 编辑器无法打开&#xff1b; 具体步骤&#xff1a; 删除这个路径下的文件 C:\ProgramData\SafeNet 下 Sentinel LDK 打开Windows 的Cmd 命令行&#xff0c;输入编辑器版本下Unity.exe的路径&#xff0c; CD E:\Dev_Env\Unity\Hub\Editor\2020.3.3…

如何将 arduino-esp32 库作为 ESP-IDF 组件使用?

相关文档 arduino-esp32 SDKESP-IDF SDKESP-IDF 软件环境搭建说明Arduino 软件环境使用说明Arduino as an ESP-IDF component &#xff08;官方说明&#xff09; 环境准备 目前&#xff0c;最新 Master 版本的 arduino-esp32 SDK 要求使用 v4.4 版本的 ESP-IDF SDK 软件编译环…

05、Nginx反向代理

一、网关、代理与反向代理&#xff1a; 在Nginx中&#xff0c;网关、代理和反向代理是三种常见的功能&#xff0c;用于转发和处理请求。下面是它们的简要介绍&#xff1a; 网关&#xff08;Gateway&#xff09;&#xff1a; 网关在网络通信中起到中介的作用&#xff0c;将客户…

【视觉SLAM入门】1. 基础知识,运动观测,旋转(旋转矩阵,轴角,欧拉角,四元数)和eigen库基础

"山薮藏疾" 1. 运动与观测1.1 通用运动方程1.2 通用观测方程1.3 对SLAM的认识 2. 三维运动2.1 旋转与平移2.2 变换矩阵2.3 矩阵知识补充2.4 旋转向量2.5 欧拉角2.6 四元数2.7 其他变换 3. 编程基础3.1 链接库说明3.2 eigen库 注&#xff1a; 以后的方程中如未说明&am…

面试官:一千万的数据,要怎么查?

一个老生常谈的问题&#xff0c;SELECT *和SELECT具体字段那个快&#xff1f;在数据量少的时候可能没什么差别&#xff0c;但是数据量达到千万级&#xff0c;差距就显现出来。废话不多说&#xff0c;往下看 ↓。 SELECT * 和 SELECT 具体字段的区别 在 MySQL 中&#xff0c;SE…

批量规范化

✨✨✨ 感谢优秀的你打开了小白的文章 “希望在看文章的你今天又进步了一点点&#xff0c;生活更加美好&#xff01;”&#x1f308;&#x1f308;&#x1f308; 目录 1.批量规范化基本原理 2.批量规范化的使用 2.1对于全连接层 2.2对于卷积层 3.代码实现 3.1方式一 …

26488-24-4,Cyclo(D-Phe-L-Pro),具有良好的生物相容性

资料编辑|陕西新研博美生物科技有限公司小编MISSwu​ 【产品描述】 Cyclo(D-Phe-L-Pro)环(D-苯丙氨酸-L-脯氨酸)&#xff0c;环二肽是由两个氨基酸通过肽键环合形成&#xff0c;是自然界中小的环状肽。由于其存在两个酰胺键即四个可以形成氢键的位点&#xff0c;环二肽可以在氢…

商业海外社交媒体营销10步指南 [2023]

如今&#xff0c;社交媒体是任何成功商业战略的重要组成部分。这不是奢侈品&#xff0c;而是必需品。全球有超过 36 亿人使用社交媒体&#xff0c;它是企业展示其产品和服务、建立品牌知名度以及与客户联系的数字游乐场。 但这不仅仅是娱乐和游戏。要在社交媒体上取得成功&…

Golang每日一练(leetDay0114) 矩阵中的最长递增路径、按要求补齐数组

目录 329. 矩阵中的最长递增路径 Longest Increasing Path In A Matrix &#x1f31f;&#x1f31f; 330. 按要求补齐数组 Patching Array &#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Rust每日一练 专栏 Golang每日一练 专栏 Python每日…

数据结构--二叉树的性质

数据结构–二叉树的性质 二叉树常考性质 常见考点1: 设非空二叉树中度为0、1和2的结点个数分别为 n 0 、 n 1 和 n 2 &#xff0c;则 n 0 n 2 1 n_0、n_1和n_2&#xff0c;则n_0 n_2 1 n0​、n1​和n2​&#xff0c;则n0​n2​1 n 0 n 2 1 \color{red}n_0 n_2 1 n0​n2​…

图层中大型数据集的分块处理思路

图层中大型数据集的分块处理思路 为改善要素叠加工具&#xff08;如联合和相交&#xff09;的性能和可伸缩性&#xff0c;软件采用了称为自适应细分处理的运算逻辑。当可用的物理内存不足以对数据进行处理时&#xff0c;就会触发系统使用此逻辑。由于保持在物理内存的可用范围…

助力企业完成等保2.0的重要工具

在当今数字化时代&#xff0c;企业面临着越来越多的网络安全威胁和数据泄露风险。为了保护敏感信息和维护业务的连续性&#xff0c;许多企业正在积极采取措施来实施等保2.0标准。在这一过程中&#xff0c;EventLog Analyzer作为一种全面的安全信息与事件管理解决方案&#xff0…

swagger2word使用(将swagger2转化为word)

开源项目地址 https://github.com/JMCuixy/swagger2word 项目使用 1、项目拉下来以后先修改application.xml配置文件红框内容&#xff0c;将其修改为要换为自己swagger文档的地址 2、运行项目后输入地址http://127.0.0.1:8080/toWord 即可下载word文档

结构体和数据结构--共用体

共用体&#xff0c;也称联合&#xff08;Union&#xff09;&#xff0c;是将不同类型的数据组织在一起共同占用同一段内存的一种构造数据类型。共用体与结构体的类型声明方法类似&#xff0c;只是关键字变为了Union。 例题&#xff1a;演示共用体所占内存字节数的计算方法 #i…

如何用手机制作3D人物模型素材

3D人物模型素材是现代3D游戏和电影制作中必不可少的一部分。它们是数字艺术家和设计师们用来创造逼真世界的关键。3D人物模型素材是用计算机程序制作的虚拟人物&#xff0c;可以被用于电影、电视、游戏和虚拟现实应用中。它们可以被用来代替实际演员&#xff0c;也可以被用来创…

小程序蓝牙通信

蓝牙通信能力封装 一开始是根据uniapp提供的蓝牙api写的蓝牙方法&#xff0c;之后发现复用性&#xff0c;以及一些状态的监听存在缺陷&#xff0c;之后整理成了类。这样复用性以及状态监听的问题就解决了。 蓝牙组件 创建蓝牙组件的类 单例模式是为了保证蓝牙长连接&#xff0…