Spark(2)-基础tranform算子(一)

news2025/1/20 17:01:09

一、算子列表

编号名称
1map算子
2flatMap算子
3filter算子
4mapPartitions算子
5mapPartitionsWithIndex算子
6keys算子
7values算子
8mapValues算子
9flatMaplValues算子
10union算子
11reducedByKey算子
12combineByKey算子
13groupByKey算子
14foldByKey算子
15aggregateByKey算子
16ShuffledRDD算子
17distinct算子
18partitionBy算子

 二、代码示例

package sparkCore


import org.apache.hadoop.mapreduce.task.reduce.Shuffle
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.{Aggregator, HashPartitioner, SparkConf, SparkContext, TaskContext}

/**
 * spark基本算子
 */


object basi_transform_02 {
  def main(args: Array[String]): Unit = {


    val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    sc.setLogLevel("WARN")

    //1. map算子
    val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7),2)
    val map_rdd: RDD[Int] = rdd1.map(_ * 2)
    println("*****1. map算子************")
    map_rdd.foreach(println(_))

    //2.flatMap算子
    println("*****2.flatMap算子************")
    val arr: Array[String] = Array(
      "Hive python spark",
      "Java Hello Word"
    )

    val rdd2: RDD[String] = sc.makeRDD(arr, 2)
    val flatMap_rdd: RDD[String] = rdd2.flatMap(_.split(" "))

    flatMap_rdd.foreach(println(_))

    //3.filter算子
    println("*****3.filter算子***********")
    val rdd3: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10))
    val filter_rdd :RDD[Int]= rdd3.filter(_ % 2 == 0)
    filter_rdd.foreach(println(_))

    //4. mapPartitions算子:将数据以分区的形式返回,进行map操作,一个分区对应一个迭代器
    // 应用场景: 比如在进行数据库操作时,在操作数据之前,需要通过JDBC方式连接数据库,如果使用map,那每条数据处理之前
    //         都需要连接一次数据库,效率显然很低.如果使用mapPartitions,则每个分区连接一次即可
    println("*****4. mapPartitions算子**********")
    val rdd4: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 4, 5, 4, 4, 3, 10),2)
    val mapParition_rdd: RDD[Int] = rdd4.mapPartitions(iter => {
      print("模拟数据库连接操作")
      iter.map(_ * 2)
    })

    mapParition_rdd.foreach(println(_))


    //5. mapPartitionsWithIndex算子,类似于mapPartitions,不过有两个参数
    //  第一个参数是分区索引,第二个是对应的迭代器
    // 注意:函数返回的是一个迭代器
    println("*****5. mapPartitionsWithIndex算子**********")
    val rdd5: RDD[Int] = sc.parallelize(List(10, 20, 30, 40, 60),2)
    val mapPartitionWithIndex_Rdd: RDD[String] = rdd5.mapPartitionsWithIndex((index, it) => {
      it.map(e => s"partition:$index,val:$e")
    })

    mapPartitionWithIndex_Rdd.foreach(println(_))

    //6.keys算子: RDD中的数据是【对偶元组】类型,返回【对偶元组】的全部key
    println("*****6.keys算子**********")
    val lst: List[(String, Int)] = List(
      ("spark", 1), ("spark", 3), ("hive", 2),
      ("Java", 1), ("Scala", 3), ("Python", 2)
    )

    val rdd6: RDD[(String, Int)] = sc.parallelize(lst)
    val keysRdd: RDD[String] = rdd6.keys
    keysRdd.foreach(println(_))

    //7.values: RDD中的数据是【对偶元组】类型,返回【对偶元组】的全部value
    println("*****7.values算子**********")
    val values_RDD: RDD[Int] = rdd6.values
    values_RDD.foreach(println(_))

    //8.mapValues: RDD中的数据为对偶元组类型, 将value进行计算,然后与原Key进行组合返回(即返回的仍然是元组)
    println("*****8.mapValues算子**********")
    val lst2: List[(String, Int)] = List(
      ("Hello", 1), ("world", 2),
      ("I", 2), ("love", 3), ("you", 2)
    )

    val rdd8: RDD[(String, Int)] = sc.parallelize(lst2, 2)
    val mapValues_rdd: RDD[(String, Int)] = rdd8.mapValues(_ * 10)
    mapValues_rdd.foreach(println(_))

    //9.flatMaplValues:RDD是对偶元组,将value应用传入flatMap打平后,再与key组合
    println("*****9.flatMaplValues算子**********")
    // ("spark","1 2 3") => ("spark",1),("spark",2),("spark",3)
    val lst3: List[(String,String )] = List(
      ("Hello", "1 2 3"), ("world", "4 5 6"),
    )

    val rdd9: RDD[(String, String)] = sc.parallelize(lst3)
    // 第一个_是指初始元组中的value;第二个_是指value拆分后的每一个值(转换成整数)
    val flatMapValues: RDD[(String, Int)] = rdd9.flatMapValues(_.split(" ").map(_.toInt))
    flatMapValues.foreach(println(_))

    //10.union:将两个类型一样的RDD合并到一起,返回一个新的RDD,新的RDD分区数量是两个RDD分区数量之和
    println("*****10.union算子**********")
    val union_rdd1 = sc.parallelize(List(1, 2, 3), 2)
    val union_rdd2 = sc.parallelize(List(4, 5, 6), 3)
    val union_rdd: RDD[Int] = union_rdd1.union(union_rdd2)
    union_rdd.foreach(println(_))

    //11.reducedByKey,在每个分区中进行局部分组聚合,然后将每个分区聚合的结果从上游拉到下游再进行全局分组聚合
    println("*****11.reducedByKey算子**********")
    val lst4: List[(String, Int)] = List(
      ("spark", 1), ("spark", 1), ("hive", 3),
      ("Python", 1), ("Java", 1), ("Scala", 3),
      ("flink", 1), ("Mysql", 1), ("hive", 3)
    )

    val rdd11: RDD[(String, Int)] = sc.parallelize(lst4, 2)
    val reduced_rdd: RDD[(String, Int)] = rdd11.reduceByKey(_ + _)
    reduced_rdd.foreach(println(_))

    //12.combineByKey:相比reducedByKey更底层的方法,后者分区内和分区之间相同Key对应的value值计算逻辑相同,但是前者可以分别定义不同的
    //   的计算逻辑.combineByKey 需要传入三个函数作为参数:
    // 其中第一个函数:key在上游分区第一次出现时,对应的value该如何处理
    // 第二个函数:分区内相同key对应value的处理逻辑
    // 第三个函数: 分区间相同Key对应value的处理逻辑
    println("*****12.combineByKey算子**********")
    val f1 = (v:Int) => {
      val stage = TaskContext.get().stageId()
      val partition = TaskContext.getPartitionId()
      println(s"f1 function invoked in stage: $stage,partiton:$partition")
      v
    }


    //分区内相同key对应的value使用乘积
    val f2 = (a:Int,b:Int) => {
      val stage = TaskContext.get().stageId()
      val partition = TaskContext.getPartitionId()
      println(s"f2 function invoked in stage: $stage,partiton:$partition")
      a * b
    }

    //分区间相同key对应的value使用加法
    val f3 = (m:Int,n:Int) => {
      val stage = TaskContext.get().stageId()
      val partition = TaskContext.getPartitionId()
      println(s"f3 function invoked in stage: $stage,partiton:$partition")
      m + n
    }

    val rdd12: RDD[(String, Int)] = sc.parallelize(lst4,2)
    val combineByKey_rdd: RDD[(String, Int)] = rdd12.combineByKey(f1, f2, f3)
    combineByKey_rdd.foreach(println(_))

    //13.groupByKey:按key进行分组,返回的是(key,iter(value集合)
    println("*****13.groupByKey算子**********")
    val rdd13: RDD[(String, Int)] = sc.parallelize(lst4, 3)
    val groupByKey_rdd: RDD[(String, Iterable[Int])] = rdd13.groupByKey()
    groupByKey_rdd.foreach(println(_))

    //14.foldByKey:每个分区应⽤⼀次初始值,先在每个进⾏局部聚合,然后再全局聚合(注意全局聚合的时候,初始值并不会被用到)
    // 局部聚合的逻辑与全局聚合的逻辑相同
    println("*****14.foldByKey算子**********")
    val lst5: List[(String, Int)] = List(
      ("maple", 1), ("kelly", 1), ("Avery", 1),
      ("maple", 1), ("kelly", 1), ("Avery", 1)
    )


    val rdd14: RDD[(String, Int)] = sc.parallelize(lst5)
    val foldByKey_rdd: RDD[(String, Int)] = rdd14.foldByKey(1)(_ + _)
    foldByKey_rdd.foreach(println(_))

    //15.aggregateByKey:foldByKey,并且可以指定初始值,每个分区应⽤⼀次初始值,传⼊两个函数,分别是局部聚合的计算逻辑
    // 和全局聚合的逻辑
    println("*****15.aggregateByKey算子**********")
    val rdd15: RDD[(String, Int)] = sc.parallelize(lst5)
    val aggregateByKey_rdd: RDD[(String, Int)] = rdd15.aggregateByKey(1)(_ + _,_ * _ )
    aggregateByKey_rdd.foreach(print(_))

    //16 ShuffledRDD:reduceByKey、combineByKey、aggregateByKey、foldByKey底层都是使⽤的ShuffledRDD,
    // 并且 mapSideCombine = true
    println("*****16.ShuffledRDD算子**********")
    val rdd16: RDD[(String, Int)] = sc.parallelize(lst5,2)

    val partitioner = new HashPartitioner(rdd16.partitions.length)
    // 对rdd16按照指定分区器进行分区
    // String是rdd16中Key的数据类型,第一个Int是rdd16中value的数据类型,第二个Int是中间结果的数据类型(当然前提是传入聚合器-里面包含计算逻辑
    // [可以据此知晓中间结果的数据类型])
    val shuffledRDD: ShuffledRDD[String, Int, Int] = new ShuffledRDD[String, Int, Int](rdd16,partitioner)
    // 设置一个聚合器: 指定rdd16的计算逻辑(包含三个函数,分别是分区内一个key对应value的处理逻辑;分区内相同key对应value计算逻辑
    // 和分区间相同Key对应value计算逻辑)
    val aggregator: Aggregator[String, Int, Int] = new Aggregator[String, Int, Int](f1, f2, f3)
    // 给shuffledRDD设置聚合器
    shuffledRDD.setAggregator(aggregator)
    shuffledRDD.setMapSideCombine(true) // 设置Map端聚合
    println(shuffledRDD.collect().toList)

    // 17.distinct算子:对RDD元素进行去重
    println("*****17.distinct算子**********")
    val lst6: Array[String] = Array(
      "spark", "spark", "hive",
      "Python", "Python", "Java"
    )

    val rdd17: RDD[String] = sc.parallelize(lst6)
    val distinct_rdd: RDD[String] = rdd17.distinct()
    println(distinct_rdd.collect().toList)

    // 18.partitionBy: 按照指定的分区器进行分区(底层使用的是ShuffleRDD)
    println("***** 18.partitionBy算子**********")
    val rdd18: RDD[(String,Int)] = sc.parallelize(lst5,2)
    val partitioner2 = new HashPartitioner(rdd18.partitions.length)
    val partitioned_rdd: RDD[(String, Int)] = rdd18.partitionBy(partitioner2)
    println(partitioned_rdd.collect().toList)
    sc.stop()
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1495971.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

内含资料下载丨黄东旭:2024 现代应用开发关键趋势——降低成本、简化架构

作为一名工程师和创业者,创办 PingCAP 是我进入创新世界的一次深潜。这段旅程既有令人振奋的发现,也充满令人生畏的不确定性。作为这次探险之旅见证的 TiDB ,现在已在全球服务超过 3000 家企业,其中有已经实现了商业成功的大公司&…

Mantle生态创新Meme项目Puff开启创世Mint,一文了解玩法、空投+Mint教程

大饼马上破前高,Ethereum 的再质押赛道吸引了大量资金,PEPE、DOGE等代币的 Memecoin 也一路飞涨,整个加密市场都充斥着金钱的味道!目前,贪婪与恐惧指数已经达到了90,属于极度贪婪区间。越是这样的狂欢时刻&…

数据处理分类、数据仓库产生原因

个人看书学习心得及日常复习思考记录,个人随笔。 数据处理分类 操作型数据处理(基础) 操作型数据处理主要完成数据的收集、整理、存储、查询和增删改操作等,主要由一般工作人员和基层管理人员完成。 联机事务处理系统&#xff…

解读Linux文件目录权限:srw-rw----

在Linux系统中,文件或目录的权限由10个字符表示,分为四段: 第一个字符:表示文件类型。d代表目录,-代表普通文件,l代表符号链接,s代表套接字(socket),c代表字符…

TypeScript学习笔记(上):TypeScript的介绍、安装及常用类型

我对TypeScript的理解就是,TypeScript是增加了类型校验的JavaScript,能够把运行期错误提升至编译期 TypeScript是什么? TypeScript(简称:TS)是 JavaScript 的超集(JS 有的 TS 都有&#xff09…

MySQL-视图:视图概述、使用视图注意点、视图是否影响基本表

视图 一、视图概述二、使用视图注意点三、视图操作是否影响基本表 一、视图概述 在数据库管理系统中,视图(View)是一种虚拟表,它并不实际存储数据,而是基于一个或多个实际表的查询结果。视图提供了一种对数据库中数据…

机器人编程学习有哪些好处?

机器人编程学习有许多好处,无论是对个人还是对社会都具有重要意义。以下是机器人编程学习的一些好处: 1. **培养计算思维:** 通过机器人编程学习,可以培养逻辑思维、问题解决能力和创新思维。编程过程中需要分析问题、设计算法、…

3月6日龙虎榜复盘:沪指缩量调整 机器人概念股午后大涨

上海亚商投顾前言:无惧大盘涨跌,解密龙虎榜资金,跟踪一线游资和机构资金动向,识别短期热点和强势个股。 一.市场情绪 沪指昨日缩量震荡,创业板指午后涨超1%,随后上演冲高回落走势。风电、光伏等新能源方向…

软考高级:数据流图概念和例题

作者:明明如月学长, CSDN 博客专家,大厂高级 Java 工程师,《性能优化方法论》作者、《解锁大厂思维:剖析《阿里巴巴Java开发手册》》、《再学经典:《Effective Java》独家解析》专栏作者。 热门文章推荐&am…

恢复IDEA误删除的git提交,提交被删除,尝试恢复提交

​​​​​​ dgqDESKTOP-JRQ5NMD MINGW64 /f/IdeaProjects/workspace/spzx-parent ((8bb112e...)) $ git reflog 8bb112e (HEAD, origin/master, master) HEAD{0}: checkout: moving from master to 8bb112e5ac18dfe4bbd64adfd06363e46b609f21 8bb112e (HEAD, origin/master, …

大华IPC网络摄像机如何保存视频

一、背景 通常网络相机(IPC)不会自带存储功能,需要接入录像机(NVR)进行保存。 其中NVR也分软件存储及硬件存储,这里不提,这边单独说FTP存储 二、配置前提 要配置FTP存储需要:①网络…

虚拟机环境搭建

搭建vm环境,配置虚拟机,期间遇到不支持,重启电脑后还是没用 此主机支持 AMD-V,但 AMD-V 处于禁用状态。 如果已在 BIOS/固件设置中禁用 AMD-V,或主机自更改此设置后从未重新启动,则 AMD-V 可能被禁用。 确…

SpringBoot3整合Mybatis-plus报错IllegalArgumentException

错误信息 使用的SpringBoot3版本&#xff1a;3.2.3 java.lang.IllegalArgumentException: Invalid value type for attribute factoryBeanObjectType: java.lang.String 第一想法就是感觉是版本太低导致和SpringBoot3不兼容。 查询mybatis-plus最高的版本 <!-- https://m…

微信公众号实现【抽奖功能】

前言 最近为了提高公众号的用户粘性&#xff0c;需要增加一个功能&#xff0c;那就是用户可以点击公众【每日礼包】的按钮&#xff0c;实现抽奖&#xff0c;有可能获得免费的会员天数&#xff01; 例如点进公众号发送消息栏目&#xff0c;有一个下面的按钮&#xff1a; 在菜单…

第十一篇 - 应用于市场营销视频场景中的人工智能和机器学习技术 – Video --- 我为什么要翻译介绍美国人工智能科技巨头IAB公司?

IAB平台&#xff0c;使命和功能 IAB成立于1996年&#xff0c;总部位于纽约市。 作为美国的人工智能科技巨头社会媒体和营销专业平台公司&#xff0c;互动广告局&#xff08;IAB- the Interactive Advertising Bureau&#xff09;自1996年成立以来&#xff0c;先后为700多家媒体…

Seurat 中的数据可视化方法

本文[1]将使用从 2,700 PBMC 教程计算的 Seurat 对象来演示 Seurat 中的可视化技术。您可以从 SeuratData[2] 下载此数据集。 SeuratData::InstallData("pbmc3k")library(Seurat)library(SeuratData)library(ggplot2)library(patchwork)pbmc3k.final <- LoadData(…

kibana 上dashbord 和discover 时间快 or 慢 8小时,处理方案

今天遇到了一个问题。在es库中的数据的时间是正确的。但是在kibana的discover展示页面上是错误的&#xff0c;错了8个小时。我这里是快了8个小时。这个问题非常难受&#xff0c;因为看起来&#xff0c;总是差8个小时&#xff0c;特别是查看日志的时候&#xff0c;总有一种错觉&…

OpenAI 大声朗读出来

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

垂直分表、分布式Id详细介绍、模板引擎FreeMarker、对象存储服务MinIO(黑马头条Day02)

目录 垂直分表 分布式ID 为什么需要分布式ID 分布式ID需要满足的条件 常见的分布式ID算法有哪些 项目中具体如何使用分布式ID 模板引擎FreeMarker freemarker简介 对象存储服务MinIO MinIO简介 MinIO的优点 本项目中使用的FreeMarker和MinIO示例 今天在学习黑马头…

[LeetCode][155]【学习日记】最小栈——记录每个时刻的最小值

题目 最小栈 请你设计一个最小栈。它提供push&#xff0c;pop&#xff0c;top操作&#xff0c;并能在常数时间内检索到最小元素的栈。 实现MinStack类: MinStack() 初始化堆栈对象。void push(int val) 将元素val推入堆栈。void pop() 删除堆栈顶部的元素。int top() 获取堆栈…