Spark-RDD 转换算子(双 Value 类型、Key - Value 类型)

news2025/1/15 23:06:12

双 Value 类型

1、intersection(交集)

2、union(并集)

3、subtract(差集)

4、zip(拉链)

Key - Value 类型

1、partitionBy 

2、reduceByKey

3、groupByKey

4、aggregateByKe

5、foldByKey

6、combineByKey

7、join

8、 leftOuterJoin

9、cogroup


双 Value 类型

1、intersection(交集)

        对源 RDD 和参数 RDD 求交集后返回一个新的 RDD

def intersection(other: RDD[T]): RDD[T]
val dataRDD1 = sc.makeRDD(List(1,2,3,4))
val dataRDD2 = sc.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2)

2、union(并集)

        对源 RDD 和参数 RDD 求并集后返回一个新的 RDD 

def union(other: RDD[T]): RDD[T]
val dataRDD1 = sc.makeRDD(List(1,2,3,4))
val dataRDD2 = sc.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.union(dataRDD2)

 3、subtract(差集)

        以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集 

def subtract(other: RDD[T]): RDD[T]
val dataRDD1 = sc.makeRDD(List(1,2,3,4))
val dataRDD2 = sc.makeRDD(List(3,4,5,6))
val dataRDD = sc.subtract(dataRDD2)

4、zip(拉链)

        将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD 中的元素,Value 为第 2 个 RDD 中的相同位置的元素。 

        要求两个数据源分区数量要保持一致

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
val dataRDD1 = sc.makeRDD(List(1,2,3,4))
val dataRDD2 = sc.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.zip(dataRDD2)
println(dataRDD.collect.mkString(" "))

Key - Value 类型

1、partitionBy 

        将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

def partitionBy(partitioner: Partitioner): RDD[(K, V)]
def main(args: Array[String]): Unit = {
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1,2,3,4),2)

    val mapRDD: RDD[(Int, Int)] = rdd.map((_,1))
    //partitionBy根据指定的分区规则对数据进行重新分区
    mapRDD.partitionBy(new HashPartitioner(2)).saveAsTextFile("output")

  //关闭环境
  sc.stop()
  }

 

2、reduceByKey

        可以将数据按照相同的 Key 对 Value 进行聚合

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  def main(args: Array[String]): Unit = {
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4)))

    //相同Key的数据进行value数据的聚合(两两聚合)
    val rdRDD: RDD[(String, Int)] = rdd.reduceByKey((x:Int, y:Int)=> x+y)
    rdRDD.collect().foreach(println)

  //关闭环境
  sc.stop()
  }

3、groupByKey

        将数据源的数据根据 key 对 value 进行分组 

def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
def main(args: Array[String]): Unit = {
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4)))

    //将数据源中相同Key的数据分在一个组中,形成一个对偶元组
    //元组中第一个元素:Key  ,元组中第二个元素:相同Key中相同Key的value的集合
    val gpRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()

    gpRDD.collect().foreach(println)
  //关闭环境
  sc.stop()
  }

4、aggregateByKe

        将数据根据不同的规则进行分区内计算和分区间计算 

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
 combOp: (U, U) => U): RDD[(K, U)]
def main(args: Array[String]): Unit = {
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)))

    //将数据源中相同Key的数据分在一个组中,形成一个对偶元组
    //元组中第一个元素:Key  ,元组中第二个元素:相同Key中相同Key的value的集合
    //存在柯里化 有两个参数 rdd.aggregate(初始值)(分区内计算规则,分区间计算规则)
    //初始值:主要用于当碰见一个Key的时候 和value进行分区计算
    rdd.aggregateByKey(0)(
      (x,y) => math.max(x,y),
      (x,y) => x + y
    ).collect().foreach(println)

  //关闭环境
  sc.stop()
  }

5、foldByKey

        当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
 def main(args: Array[String]): Unit = {
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)))
    
    rdd.foldByKey(0)(_+_).collect().foreach(println)

  //关闭环境
  sc.stop()
  }

6、combineByKey

        最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于 aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

def combineByKey[C](
 createCombiner: V => C,
 mergeValue: (C, V) => C,
 mergeCombiners: (C, C) => C): RDD[(K, C)]

        将数据 List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个 key 的平 均值 :

def main(args: Array[String]): Unit = {
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)))
    //combineByKey(参数1:将相同key的第一个数据进行结构的转换实现操作 ,参数2:分区内计算规则 ,参数3:分区间计算规则)
    rdd.combineByKey(
      (_, 1),
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    ).collect().foreach(println)

  //关闭环境
  sc.stop()
  }

7、join

        在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W))的 RDD

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def main(args: Array[String]): Unit = {
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
    val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))
    //连接条件:Key需要相同
    //如果两个数据源中没有相同Key元素,那么改数据不会出现在结果中
    //如果两个数据源中Key有多个相同的数据,则会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长(导致性能降低)
    val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)

    joinRDD.collect().foreach(println)

  //关闭环境
  sc.stop()
  }

8、 leftOuterJoin

        类似于 SQL 语句的左外连接

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def main(args: Array[String]): Unit = {
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2)))
    val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))

    val leftRDD: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
    val rightRDD: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)

    leftRDD.collect().foreach(println)
    println("----------------------------------------")
    rightRDD.collect().foreach(println)

  //关闭环境
  sc.stop()
  }

 9、cogroup

        在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
def main(args: Array[String]): Unit = {
    //准备环境
    //"*"代表线程的核数   应用程序名称"RDD"
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2)))
    val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))

    //connect + group = cogroup (分组与连接的概念)
     val cRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)

    cRDD.collect().foreach(println)
    
  //关闭环境
  sc.stop()
  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/402370.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

FinOps首次超越安全成为企业头等大事|云计算趋势报告

随着云计算在过去十年中的广泛应用,云计算用户所面临的一个持续不变的趋势是:安全一直是用户面临的首要挑战。然而,这种情况正在发生转变。 知名IT软件企业 Flexera 对云计算决策者进行年度调研已经持续12年,而今年安全问题首次…

3.初识Vue

目录 1 vue 浏览器调试工具 1.1 安装 1.2 配置 2 数据驱动视图与双向数据绑定 3 简单使用 3.1 下载 3.2 将信息渲染到DOM上 4 使用vue浏览器调试工具 5 vue指令 1 vue 浏览器调试工具 chrome可能是我浏览器的原因,装上用不了,我们使…

javaWeb核心05-FilterListenerAjax

文章目录Filter&Listener&Ajax1,Filter1.1 Filter概述1.2 Filter快速入门1.2.1 开发步骤1.2.2 代码演示1.3 Filter执行流程1.4 Filter拦截路径配置1.5 过滤器链1.5.1 概述1.5.2 代码演示1.5.3 问题1.6 案例1.6.1 需求1.6.2 分析1.6.3 代码实现1.6.3.1 创建F…

JavaScript Date(日期)对象

日期对象用于处理日期和时间。在线实例返回当日的日期和时间如何使用 Date() 方法获得当日的日期。getFullYear()使用 getFullYear() 获取年份。getTime()getTime() 返回从 1970 年 1 月 1 日至今的毫秒数。setFullYear()如何使用 setFullYear() 设置具体的日期。toUTCString()…

要做一个关于DDD的内部技术分享,记录下用到的资源,学习笔记(未完)

最后更新于2023年3月10日 14:28:08 问题建模》软件分层》具体结构,是层层递进的关系。有了问题建模,才能进行具体的软件分层的讨论,再有了分层,才能讨论在domain里面应该怎么实现具体结构。 1、问题建模:Domain、Mod…

手写模拟SpringMvc源码

MVC框架MVC是一种设计模式(设计模式就是日常开发中编写代码的一种好的方法和经验的总结)。模型(model)-视图(view)-控制器(controller),三层架构的设计模式。用于实现前端…

无公网IP快解析实现移动app访问内网应用

随着移动化的发展,国内各大管理软件厂商纷纷推出相应的移动应用服务。移动端为企业提供了不一样的办公方式,从碎片化应用到一体化,同时也为企业办公提供了更高效便捷办公体验。 移动办公和传统pc端在服务器端部署时并没有太大的区别。企业为了…

数据、数据资源及数据资产管理的区别

整理不易,转发请注明出处,请勿直接剽窃! 点赞、关注、不迷路! 摘要:数据、数据资源、数据资产 数据、数据资源及数据资产的区别 举例 CRM系统建设完成后会有很多数据,这些数据就是原始数据,业务…

线程(操作系统408)

基本概念 我们说引入进程的目的是更好的使用多道程序并发执行,提高资源的利用率和系统吞吐量;而引入线程的目的则是减小程序在并发执行的时候所付出的时间开销,提高操作系统的并发性能。 线程可以理解成"轻量级进程",…

Request和Response的概述

⭐作者介绍:大二本科网络工程专业在读,持续学习Java,输出优质文章⭐作者主页:︶ㄣ释然⭐如果觉得文章写的不错,欢迎点个关注😉有写的不好的地方也欢迎指正,一同进步😁Request和Respo…

Flink之Source

Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源,而读取数据的算子就是源算子(Source)。所以,Source 就是我们整个处理程序的输入端。Flink 代码中通用的添加 Source…

【零基础入门学习Python---Python的五大数据类型之数字类型】

一.Python的五大数据类型之数字类型 在Python中,变量用于存储数据。变量名可以是任何字母、数字和下划线的组合。Python支持多种数据类型,包括数字、字符串、列表、元组和字典。这篇文章我们就来学习一下五大数据类型中的数字类型。 1.1 数字类型 Python 中的数字类型主要…

【C语言蓝桥杯每日一题】—— 单词分析

【C语言蓝桥杯每日一题】—— 单词分析😎前言🙌单词分析🙌总结撒花💞😎博客昵称:博客小梦 😊最喜欢的座右铭:全神贯注的上吧!!! 😊作者…

JSP电动车充电运营管理系统用myeclipse定制开发mysql数据库mvc模式java编程servlet

一、源码特点 JSP 电动车充电运营管理系统 是一套完善的系统源码,对理解JSP java serlvet MVC编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。 研究的基本内容是基于Web的电动车充电运营管理系统&#xf…

论文笔记 | 标准误聚类问题

关于标准误的选择,如是否选择稳健性标准误、是否采取聚类标准误。之前一直是困惑的,惯用的做法是类似主题的文献做法。所以这一次,借计量经济学课程之故,较深入学习了标准误的选择问题。 在开始之前推荐一个知乎博主。他阅读了很…

ssl/tsl 加密原理

ssl/tsl 加密原理 对称加密 对称加密:即加密和解密用的都是同一个秘钥,主要优势就是速度比非对称加密快 非对称加密 非对称加密: 即加密和解密用的是不同的秘钥,例如:在服务端存在一对公钥和私钥,服务…

JWT令牌解析及刷新令牌(十一)

写在前面:各位看到此博客的小伙伴,如有不对的地方请及时通过私信我或者评论此博客的方式指出,以免误人子弟。多谢!如果我的博客对你有帮助,欢迎进行评论✏️✏️、点赞👍👍、收藏⭐️⭐️&#…

mmdetection3d-之(三)--FCOS3d训练waymo数据集

本内容分为两部分 1. waymo数据集转KITTI格式2. FCOS3D训练KITTI格式的waymo数据集1 waymo数据集转kitti格式 1.1 waymo数据集简介 1.1.1 waymo数据集下载 waymo数据集v1.2.0可以从这里下载。其中,train(32个压缩包),test&…

零入门kubernetes网络实战-22->基于tun设备实现在用户空间可以ping通外部节点(golang版本)

《零入门kubernetes网络实战》视频专栏地址 https://www.ixigua.com/7193641905282875942 本篇文章视频地址(稍后上传) 本篇文章主要是想做一个测试: 实现的目的是 希望在宿主机-1上,在用户空间里使用ping命令发起ping请求,产生的icmp类型的…

从FVM上线前的测试网统计报告中看前景,Filecoin将会迎来什么变化?

FEVM将在2023/03/14主网上线!在Calibration网络升级正式完成后,Filecoin V18 Hygge升级将于2023年3月14日(π日)正式上线!此次升级将正式为Filecoin网络带来智能合约。基于FVM的可编程性。此次更新升级将释放数据经济的…