【Spark常用算子合集】一文搞定spark中的常用转换与行动算子

news2025/1/16 1:32:51

🚀 作者 :“大数据小禅”

🚀文章简介:本篇文章属于Spark系列文章,专栏将会记录从spark基础到进阶的内容
🚀 内容涉及到Spark的入门集群搭建,核心组件,RDD,算子的使用,底层原理,SparkCore,SparkSQL,SparkStreaming等,Spark专栏地址.欢迎小伙伴们订阅💪
在这里插入图片描述

常用算子合集

      • Spark中的算子概述
      • 转换算子与行动算子的区别于联系
      • 常见的转换算子汇总
        • map算子
        • flatMap算子
        • filter算子
        • mapPartitions算子
        • reduceByKey算子
        • groupByKey算子
        • sample算子
        • sortBy 算子
        • distinct 算子
        • union算子
        • foldByKey算子
        • subtract算子
        • join算子
      • 常见的行动算子汇总
        • reduce 算子
        • collcet算子
        • count算子
        • take算子
        • foreach算子

Spark中的算子概述

RDD 中的算子从功能上分为两大类

  • 1.Transformation(转换算子) 它会在一个已经存在的 RDD 上创建一个新的 RDD,这也使得RDD之间存在了血缘关系与联系
  • 2.Action(动作算子) 执行各个分区的计算任务, 结果返回到 Driver 中

特点

  • 1.Spark 中所有的 Transformations 是 惰性 的, 不会立即执行获得结果. 只会记录在数据集上要应用的操作.当需要返回结果给 Driver 时, 才会执行这些操作, 这个特性叫做 惰性求值
  • 2.每一个 Action 运行的时候, 所关联的所有 Transformation RDD 都会重新计算,

转换算子与行动算子的区别于联系

转换算子是spark中的一种操作,用于从一个RDD转换成另一个RDD,它可以被用来创建新的RDD,也可以被用来转换已有的RDD。它们提供了一种通用的方法来完成RDD的转换,如map、filter、groupByKey等。

行动算子是spark中的另一种操作,它们用于从一个RDD中收集数据,或者从一个RDD中计算结果,如collect、reduce、count等。行动算子可以基于RDD的转换算子的结果来进行计算,也可以基于一组RDD来进行计算。

总之,转换算子和行动算子之间有着紧密的联系,转换算子用于创建RDD,行动算子用于从RDD中收集数据和计算结果。

常见的转换算子汇总

map算子

  • Map 将RDD的数据进行以一对一的关系转换成其他形式 输入分区与输出分区一对一
  • collect: 收集一个弹性分布式数据集的所有元素到一个数组中,便于观察 适用于小型数据
  • take : 取出对应数据的显示条数
  • foreach(println(_)) : 遍历查看数据
  • 结果: 1,4,9,16 (yo,1) (pai,1) (xc,1)
def mapTest(): Unit ={

    val value = sc.parallelize(List(1, 2, 3, 4)).map(
      value=>value*2
    ).collect().foreach(println(_))
    val works = sc.parallelize(List("yo", "pai", "xc")).map(
      work => (work, 1)
    ).collect().take(2).foreach(println(_))
  }

flatMap算子

  • flatMap算子的作用是将一行数据拆分成多个元素,并将所有元素放在一个新的集合中,返回一个新的RDD。
  • 它与map算子的区别在于,map算子只是将一行数据拆分成一个元素,并将其放在新的集合中,
  • 而flatMap算子可以将一行数据拆分成多个元素,并将所有元素放在一个新的集合中。
  • 结果:y o p a i x c‘’
@Test
  def flatmapTest(): Unit ={
    val works = sc.parallelize(List("yo", "pai", "xc")).flatMap(
      work=>(work)
    ).collect().foreach(println(_))
  }

filter算子

  • spark中的filter算子用于对RDD中的每个元素应用一个函数,根据函数的返回值是true还是false来决定是否将该元素放入新的RDD中。
  • 也就是说,filter算子可以根据自定义函数中的逻辑,从源RDD中过滤出一个新的RDD。
  • 结果:pai xc
@Test
  def filterTest(): Unit ={
    val works = sc.parallelize(List("yo", "pai", "xc")).filter(
      //删选出不包含yo字段的
      work=>(!work.contains("yo"))
    ).collect().foreach(println(_))
  }

mapPartitions算子

  • map算子是一对一的操作,会将一个RDD中的每一个元素都映射到另一个RDD中;
  • 而mapPartitions算子是一对多的操作,它会将一个RDD中的每一个分区都映射到另一个RDD中,每个分区中的元素会被一次性处理,减少了操作次数,提高了处理效率。
  • mapPartitions和map算子是一样的,只不过map是针对每一条数据进行转换,mapPartitions针对一整个分区近进行转换
  • 场景:
  • 1.如果说map后面有数据库的访问语句的话那如果说有几万条数据要查询就得进行几万次的连接建立这显然不符合逻辑
  • 2.而mapPartitions(foreachPartition)则是对rdd中的每个分区的迭代器进行操作。如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库
  • map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。
  • 结果:10,20,30,40,50
@Test
  def mapPartitionsTest(): Unit ={
    val works = sc.parallelize(List(1,2,3,4,5)).mapPartitions(
      //里面接收一个函数,函数里面接收一个集合,传递一个集合要求返回一个集合
      //函数里面要求接收一个集合,并且把集合里的Iterator[T]每一条数据转换之后再返回一个集合回去
      //Iterator[T] => Iterator[U]
     item=>{
       val ele=item.map(item=>item*10)
       ele
     }
    )
    println(works.collect().mkString(","))
  }

reduceByKey算子

  • reduceByKey((V, V) ⇒ V, numPartition)
  • reduceByKey算子是spark中用于对pairRDD中key相同的元素进行聚合的算子。
  • 它的作用是对pairRDD中的每个key的元素都进行reduce操作,将key对应的value值聚合到一起,从而实现对pairRDD的聚合操作。
  • 结果:(勇哥,198) (小明,97)
@Test
  def reduceByKeyTest(): Unit ={
    val works = sc.parallelize(Seq(("勇哥", 100), ("勇哥", 98), ("小明", 97))).reduceByKey(
      (a,b)=>a+b
    )
    println(works.collect().foreach(println(_)))
  }

groupByKey算子

  • groupByKey是Spark中的一个重要的转换操作,它的作用是对每个key对应的元素进行分组,然后将分组后的结果以key-value的形式返回,
  • 其中key是原来的key,value是一个迭代器,迭代器中存放的是key对应的所有元素。
  • groupByKey算子可用于对RDD中的元素进行分组,有时也可以用于聚合操作,但它的性能要比其他聚合函数低得多,因此一般情况下不推荐使用。
  • 结果:
  • (勇哥,CompactBuffer(100))
  • (小红,CompactBuffer(98))
  • (小明,CompactBuffer(97, 77))
//从本地集合创建RDD
    val rdd = sc.parallelize(Seq(("勇哥", 100), ("小红", 98), ("小明", 97), ("小明", 77))).groupByKey()
    println(rdd.collect().foreach(println(_)))
  }

sample算子

  • sample(withReplacement, fraction, seed)
  • sample算子是spark中用来从一个RDD中抽样的算子,它可以根据指定的比例或数量从RDD中抽取一部分样本出来,可以用来做数据探索、模型开发等。
@Test
  def sampleTest(){
    //从本地集合创建RDD
    val rdd = sc.parallelize(List(1,2,3,4,5,6,7)).sample(
      //随机抽3个数字
      withReplacement = true,2
    )
    println(rdd.collect().foreach(println(_)))
  }

sortBy 算子

  • sortBy 算子是将RDD中的元素按照指定的规则排序,其返回类型为排序后的RDD
  • 结果: (Bob,70) (John,80) (Tom,90)
@Test
  def sortByTest(){
    val rdd = sc.parallelize(Array(("Tom",90),("Bob",70),("John",80))).sortBy(_._2)
    println(rdd.collect().mkString(" "))
  }

distinct 算子

  • distinct 去除RDD中重复的元素。
  • 结果: 4 6 2 1 3 5
@Test
  def distinctTest(){
    val rdd = sc.parallelize(List(1,2,3,4,4,5,6,6))
    val distinctRDD = rdd.distinct()
    println(distinctRDD.collect().mkString(" "))
  }

union算子

  • union算子是spark中用于将多个RDD合并成一个RDD的算子,结果RDD中包含了所有输入RDD中的元素,且不去重。
  • subtract 可以从一个RDD中减去另一个RDD中的元素,以得到一个新的RDD。
  • 结果: 1 2 3 1 2 3
@Test
  def unionTest(){
    val rdd1 = sc.parallelize(List(1,2,3))
    val rdd2 = sc.parallelize(List(1,2,3))
    val rdd = rdd1.union(rdd2)
    println(rdd.collect().mkString(" "))
  }

foldByKey算子

  • foldByKey(zeroValue)((V, V) ⇒ V)
  • 算子是对RDD中的key-value类型的数据按key进行聚合操作,将每个key对应的value进行聚合,
  • 将聚合后的结果与zeroValue进行combine操作,返回一个新的RDD,
  • 新的RDD中的每个元素是一个key-value对,其中key是原RDD中的key,value是zeroValue与原RDD中key对应的value的聚合结果。
  • 结果: (勇哥,21) (小明,22)
@Test
  def foldByKeyTest(){
    //从本地集合创建RDD
    val rdd = sc.parallelize(Seq(("勇哥", 1), ("小明", 1), ("小明", 1))).foldByKey(zeroValue = 20)((a,b)=>(a+b))
    println(rdd.collect().foreach(println(_)))
  }

subtract算子

  • subtract算子是spark中的一种RDD操作,它可以接收两个RDD作为参数,并返回一个新的RDD
  • 新RDD中包含第一个RDD中存在,但是第二个RDD中不存在的元素。
  • 结果: 1 2
@Test
  def subtractTest(){
    val rdd1 = sc.parallelize(Seq(1,2,3,4,5))
    val rdd2 = sc.parallelize(Seq(3,4,5,6,7))

    val rdd3 = rdd1.subtract(rdd2)
    println(rdd3.collect().foreach(println(_)))
  }

join算子

  • join算子是spark中的一种内连接算子,它可以将两个数据集中的相同键的元组连接起来。它可以在RDD、DataFrame和Dataset之间使用,
  • 其中RDD和DataFrame可以使用join算子连接,而Dataset则可以使用joinWith算子连接。
  • 结果: (2,(xc,xc1)) (1,(yo,yo1)) (3,(yong,yong1))
@Test
  def joinTest(){
    val rdd1 = sc.makeRDD(Array((1, "yo"), (2, "xc"), (3, "yong")))
    val rdd2= sc.makeRDD(Array((1, "yo1"), (2, "xc1"), (3, "yong1")))
    val rdd3=rdd1.join(rdd2)
    println(rdd3.collect().mkString(" "))
  }

常见的行动算子汇总

reduce 算子

  • reduce 先聚合分区内数据,再聚合分区间数据
  • 结果:10
@Test
  def reduceTest(){
    //从本地集合创建RDD
    val rdd = sc.parallelize(List(1,2,3,4)).reduce(_+_)
    println(rdd)
  }

collcet算子

  • collcet 先将结果数据集以数组Array的方式返回
  • 结果:10 1 2 3 4
@Test
  def collcetTest(){
    //从本地集合创建RDD
    val rdd = sc.parallelize(List(1,2,3,4))
    println(rdd.collect().mkString(" "))
  }

count算子

  • count 返回RDD的元素个数
  • 结果: 4
 @Test
  def countTest(){
    //从本地集合创建RDD
    val rdd = sc.parallelize(List(1,2,3,4))
    println(rdd.count())
  }

take算子

  • take 返回RDD的前n个元素所组合而成的数组
  • 结果: 1 2
@Test
  def takeTest(){
    //从本地集合创建RDD
    val rdd: RDD[Int] = sc.parallelize(List(1,2,3,4))
    println(rdd.take(2).mkString(" "))
  }

foreach算子

  • foreach 遍历RDD中的元素
  • 结果: 1 2
  @Test
  def foreachTest(){
    //从本地集合创建RDD
    val rdd: RDD[Int] = sc.parallelize(List(1,2,3,4))
    println(rdd.take(2).foreach(println(_)))
  }

到这里spark的常用算子就总结完了,其实在Spark还有很多不同的算子本篇列举了一些日常开发中会比较常用的一些操作。对大数据感兴趣的小伙伴可看下方公众号一起交流!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/153202.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构与算法】——第六章:图

文章目录1、图的定义1.1 图的其他定义1.2 图的顶点与边之间的关系1.3 连通图相关术语2、图的存储结构2.1 邻接矩阵2.2 邻接表3、图的遍历3.1 深度优先遍历3.2 广度优先遍历4、最小生成树4.1 普利姆算法(Prim)4.2 克鲁斯卡尔(kruskal)5、最短路径5.1 迪杰斯特拉(Dijkstra)算法5.…

Sentinel限流-@SentinelResource注解配置

SentinelResource 配置-上 (按资源名配置限流规则) 1) Sentinel 控制台配置流控规则: 2)java 代码: GetMapping("/byResource")SentinelResource(value "byResource", blockHandler …

Django项目——通过APIView实现API访问

前提 该文章在已有项目的基础上进行修改 https://blog.csdn.net/qq_38122800/article/details/128583379?spm1001.2014.3001.5502 1、配置序列化器 序列化器包含序列化和反序列化两个过程,简单点理解就是 序列化 : 将从数据库中查的数据变为前端页面可以接受的json数据 反…

Odoo 16 企业版手册 - 库存管理之重订货规则

重订货规则 在Odoo 库存模块中,您可以配置一组规则,帮助您确保库存永远不会用完。Odoo将尝试使用重订货规则在您的库存中保持至少最低数量的产品。让我们看看此功能在Odoo 16中是如何工作的。为此,您可以从库存模块中选择一个可存储的产品。 …

qt学习记录

一、新建项目时只有pro文件而没有其他文件 此时需要在Kits界面将所有编译器选择,即可出现其他文件 二、QMainWindow、QWidget、QDialog的区别 ①QWidget继承于QObject和QPaintDevice,QDialog和QMainWindow则继承于QWidget,QDialog、QMainWi…

[ 数据结构 ] 弗洛伊德算法(Floyd)--------最短路径问题

0 Floyd算法介绍 和 Dijkstra 算法一样,弗洛伊德(Floyd)算法也是一种用于寻找给定的加权图中顶点间最短路径的算法。该算法名称以创始人之一、1978 年图灵奖获得者、斯坦福大学计算机科学系教授罗伯特弗洛伊德命名弗洛伊德算法(Floyd)计算图中各个顶点之间的最短路…

新应用——信息化财务管理,一站式满足多个需求

财务管理应用是企业为了适应当下社会环境提出的一种将财务管理进行信息化管理的方法,与传统财务管理模式不同,将各类业务数据编制为电子数据,便于财务人员查找数据内容,可以更高效的开展工作。百数应用中心的财务管理应用涵盖了项…

机器学习笔记之深度信念网络(二)模型构建思想(RBM叠加结构)

机器学习笔记之深度信念网络——模型构建思想引言回顾:深度信念网络的结构表示解析RBM隐变量的先验概率通过模型学习隐变量的先验概率引言 上一节介绍了深度信念网络的模型表示,本节将介绍深度信念网络的模型构建思想——受限玻尔兹曼机叠加结构的基本逻…

Flutter多分支打包持续化集成

一、使用效果演示 1.1、选择参数打包 以下为参数使用说明。 packingType枚举 android、ios android ios android&ios (新功能:并行打包)备注: android、ios:串行打包,即先打一个再打一个 android&ios:为并行…

与香港财政司司长同台,欧科云链在这场峰会上都说了啥?

今天,POWER 2023香港Web3创新者峰会在中国香港如期召开,香港特别行政区政府财政司司长陈茂波、财经事务及库务局副局长陈浩濂、全国政协委员、立法会议员吴杰庄等港府要员出席峰会。 作为本场峰会的受邀企业,欧科云链控股(01499.HK)公司执行董…

Kernel Pwn基础教程之 Double Fetch

一、前言 Double Fetch是一种条件竞争类型的漏洞,其主要形成的原因是由于用户态与内核态之间的数据在进行交互时存在时间差,我们在先前的学习中有了解到内核在从用户态中获取数据时会使用函数copy_from_user,而如果要拷贝的数据过于复杂的话…

人工智能-正则表达式

目录1、正则表达式概述2、re模块3、匹配单个字符4、匹配多个字符5、匹配开头和结尾6、匹配分组7、总结1、正则表达式概述 在实际开发过程中经常会需要查找某些复杂字符串的格式 正则表达式:记录文本规则的代码 正则表达式特点: 语法令人头疼&#xff…

立创eda专业版学习笔记(4)(隐藏铺铜)

这里的隐藏有两个意思,一个是铺铜过后把铺铜的填充区域隐藏,方便看图,另外一个是隐藏铺铜的轮廓,方便后续改进。 第一种隐藏,隐藏铺铜的填充区域(成片的图块),但是保留轮廓线 这是全…

联想LJ2655DN激光打印机清零方法

联想LJ2655DN激光打印机是市面上常见的打印机,为了节约成本,我们一般使用都是代用硒鼓来代替原装硒鼓,但是发现更换完硒鼓以后还是不能打印甚至有的机器能够打印但是打印速度会变慢或很慢,这个时候这就需要我们对打印机进行清零复位操作了,此款机器因用户更换的硒鼓类型不…

C++ 模板进阶

目录 1. 非类型模板参数 2. 模板的特化 2.1 概念 2.2 函数模板特化 2.3 类模板特化 2.3.1 全特化 2.3.2 偏特化 2.3.3 类模板特化应用示例 3. 模板总结 1. 非类型模板参数 我们在C语言中使用数组的时候可以定义静态数组,但是有个缺陷就是编译器在对越界检查…

Java自定义泛型类注意点

目录 自定义泛型类 如果定义了泛型类,实例化没有指明类的泛型,则认为此泛型类型为Object类型 由于子类在继承带泛型的父类时,指明了泛型类型。则实例化子类对象时,不需要指明类型 由于子类在继承带泛型的父类时,没有…

WebDAV之葫芦儿·派盘 + Photosync

PhotoSync 支持WebDAV方式连接葫芦儿派盘。 苹果手机通过无线传输,备份和共享照片/视频到计算机,其他手机,NAS和流行的云和照片服务的最佳解决办法,快来试下PhotoSync同步工具吧。 PhotoSync面向移动设备

安装部署wordpress(Ubuntu)

wordpress是一个目前流行的基于web的内容管理系统软件。它是基于PHP语言和MySQL数据库开发的,用户可以在支持 PHP 和 MySQL数据库的服务器上快速轻松的部署自己的网站(博客,外贸网站等等)。WordPress有非常多的第三方开发的免费模…

产线工控安全之现状分析及方案应对

产线安全现状 工业控制系统是支撑国民经济的重要设施,是工业领域的神经中枢。现在工业控制系统已经广泛应用于电力、通信、化工、交通、航天等工业领域,支撑起国计民生的关键基础设施。 随着传统的工业转型,数字化、网络化和智能化的工业控…

数学建模---数值微积分

目录 一.引言 二.数值微分 1.数值差分与差商 利用matlab观察差分与差商的区别: 例题: 二.数值积分 1.数值积分基本定理 2.常见的数值积分公式: 积分公式的精度: 3.数值积分的matlab实现 一.引言 在科学研究和工程计算中&…