Spark编程-键值对RDD(K,V)创建及常用操作

news2024/11/24 1:05:49

简述

        SparkRDD中可以包含任何类型的对象,在实际应用中,“键值对”是一种比较常见的RDD元素类型,分组和聚合操作中经常会用到,尤其是groupByKey和reduceByKey。
        Spark操作中经常会用到“键值对RDD”(Pair RDD),用于完成聚合计算。普通RDD里面存储的数据类型是Int、String等,而“键值对RDD”里面存储的数据类型是“键值对”。

生产环境用到的操作

        以下为我在生产环境用到的操作

WordCount:

        统计文本中每个单词出现的次数,使用Pair RDD将每个单词作为键,将出现次数作为值,然后进行reduceByKey操作进行聚合。

分组聚合:

        将具有相同键的元素分组在一起,并对每个键的值进行聚合操作,如groupByKey、reduceByKey等。

数据连接和关联:

        使用键值对进行数据的连接和关联操作,如join、cogroup等。

数据预处理:

        对数据进行分组、排序、过滤等预处理操作,如groupBy、sortByKey、filter等。

数据分析和统计:

        使用Pair RDD进行数据分析和统计操作,如计算平均值、求和、最大值、最小值等。 通过Pair RDD,可以更方便地处理键值对数据,实现更灵活和复杂的数据处理和分析需求。

Pair RDD的创建方式

第一种:从文件中加载数据创建pairRDD

//测试数据,自己编的,文件名为personID
591,2021,15448329898,北京,彩信
592,2022,15648029823,河北,微信
593,2022,16742329894,山西,电话
594,2020,17748529893,海南,微信
595,2020,19048729896,大连,QQ

代码及运行结果

scala> val lines = sc.textFile("file:///data/testdata/personID.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///data/testdata/personID.txt MapPartitionsR                                    DD[1] at textFile at <console>:23

scala> val pairRDD = lines.flatMap(elem => (elem + 1))
pairRDD: org.apache.spark.rdd.RDD[Char] = MapPartitionsRDD[2] at flatMap at <console>:23

scala> val pairRDD = lines.flatMap(line => line.split(",")).map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:                                    23

scala> pairRDD.foreach(println)
(591,1)0:>                                                          (0 + 1) / 1]
(2023,1)
(15448329898,1)
(北京,1)
(彩信,1)
(592,1)
......

从代码执行结果来看:

 返回的结果是键值对类型的RDD,即RDD[(String, Int)]。从pairRDD.foreach(println)执行的打印输出结果也可以看到,都是由(单词,1)这种形式的键值对。

第二种:通过数组Array或集合List创建pairRDD

案例:

//使用array数组
scala> val array = Array("spark", "hadoop", "flink", "hive")
array: Array[String] = Array(spark, hadoop, flink, hive)
scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val pairRDD = rdd.map(word =>(word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:23
scala> pairRDD.foreach(println)
(spark,1)
(hadoop,1)
(flink,1)
(hive,1)

//使用list集合
scala> val list = List("hadoop","spark","hive")
list: List[String] = List(hadoop, spark, hive)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:23              ^

scala> pairRDD.foreach(println)
(hadoop,1)
(spark,1)
(hive,1)

常用键值对转换操作

常用的键值对转换操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等

reduceByKey(func)

功能:使用func函数合并具有相同键的值。注意,这里强调合并相同键。

比如,reduceByKey((a,b) => a+b),有五个键值对(nlp,1)
        (nlp,1)
        (spark,1)
        (nlp,1)
        (hadoop,1)
        (hadoop,1)

对具有相同key的键值对进行合并后的结果就是:

        (spark,1)
        (hadoop,2)
        (nlp,3)
我们对上面第二种方式创建List集合得到的pairRDD进行reduceByKey()操作,代码如下:

scala> val list = List("nlp","nlp","spark","nlp","hadoop","hadoop")
list: List[String] = List(nlp, nlp, spark, nlp, hadoop, hadoop)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24

scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at map at <console>:23

scala> pairRDD.foreach(println)
(nlp,1)
(nlp,1)
(spark,1)
(nlp,1)
(hadoop,1)
(hadoop,1)

scala> pairRDD.reduceByKey((a,b) => a + b).foreach(println)
(spark,1)
(hadoop,2)
(nlp,3)

groupByKey()

功能:对具有相同键的值进行分组。注意,这里强调对相同的键分成一组。

比如,groupByKey((a,b) => a+b),有五个键值对(nlp,1)
        (nlp,1)
        (spark,1)
        (nlp,1)
        (hadoop,1)
        (hadoop,1)

我们对上面第二种方式创建得到的pairRDD进行groupByKey()操作,代码如下:

scala> pairRDD.groupByKey()
res17: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[13] at groupByKey at <console>:24
// 分组后,value被保存到Iterable[Int]中

scala> pairRDD.groupByKey().foreach(println)
(spark,CompactBuffer(1))
(hadoop,CompactBuffer(1, 1))
(nlp,CompactBuffer(1, 1, 1))

keys

功能:会把键值对RDD中的key返回形成一个新的RDD。

scala> pairRDD.keys
res20: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at keys at <console>:24

scala> pairRDD.keys.foreach(println)
nlp
nlp
spark
nlp
hadoop
hadoop

可以对返回的key的集合进行操作,比如说写入一个List集合中

scala> val prirRDDkeysList = pairRDD.keys.collect().toList
prirRDDkeysList: List[String] = List(nlp, nlp, spark, nlp, hadoop, hadoop)

scala> val prirRDDkeysArray = pairRDD.keys.collect()
prirRDDkeysArray: Array[String] = Array(nlp, nlp, spark, nlp, hadoop, hadoop)

values

功能: 把键值对RDD中的value返回形成一个新的RDD。

scala> pairRDD.foreach(println)
(nlp,1)
(nlp,1)
(spark,1)
(nlp,1)
(hadoop,1)
(hadoop,1)

scala> pairRDD.values.foreach(println)
1
1
1
1
1
1

        将得到的值保存到数组或集合中

scala> val prirRDDValuesList = pairRDD.values.collect().toList
prirRDDValuesList: List[Int] = List(1, 1, 1, 1, 1, 1)

scala> val prirRDDValueArray = pairRDD.values.collect()
prirRDDValueArray: Array[Int] = Array(1, 1, 1, 1, 1, 1)

注意

        为什么会报错value collect is not a member of Unit ,因为foreach方法返回的是Unit类型,它没有collect方法。

scala> val prirRDDValuesList = pairRDD.values.foreach(println).collect().toList 
:26: error: value collect is not a member of Unit 
val prirRDDValuesList = pairRDD.values.foreach(println).collect().toList

工作中使用collect()导致的内存不足调优:

        当处理大数据集时,可以考虑使用Spark的分布式计算能力来处理数据,而不是将所有数据收集到驱动程序中。这样可以避免内存不足的问题。

        我使用collect方法将这个RDD中的元素收集到驱动程序,并返回一个数组。如果pairRDD中的数据量很大,collect操作可能会导致内存不足的问题,建议在处理大数据集时,谨慎使用collect方法。我们可以用很多方法来避免:

  1. 使用RDD转换操作:可以使用各种RDD转换操作,如mapfilterreduceByKey等,对数据集进行转换和聚合操作。这些操作在分布式环境下进行,可以利用集群中的多个节点进行计算。
  2. 使用RDD的collecttake方法:如果只需要获取部分数据,可以使用collect方法将数据收集到驱动程序中,确保数据量不会导致内存不足,可以使用take方法获取RDD中的前几个元素。
  3. 使用RDD的sample方法:可以使用sample方法对数据进行采样,从而获取数据集的一个子集。这样可以在处理大数据集时降低计算和内存的压力。
  4. 使用Spark SQL或DataFrame:如果数据集结构化且存储在支持Spark SQL的数据源中,可以使用Spark SQL或DataFrame API进行数据操作和分析。这些API提供了更高级的数据操作和查询功能。
  5. 使用持久化存储:如果需要将处理结果保存下来或供其他程序使用,可以将结果存储在持久化存储系统中,如HDFS或数据库。这样可以避免将所有数据收集到驱动程序中。 利用集群中的计算资源进行并行计算,避免将所有数据收集到驱动程序中,可以使用RDD转换操作、采样、分页获取等技术来处理数据。

sortByKey()

功能:是返回一个根据键排序的RDD。

scala> pairRDD.sortByKey().foreach(println)
(hadoop,1)
(hadoop,1)
(nlp,1)
(nlp,1)
(nlp,1)
(spark,1)

mapValues(func)  (常用)

功能:对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。

        即我只对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。例如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的pairRDD,如果执行pairRDD.mapValues(x => x+1),就会得到一个新的键值对RDD,它包含下面四个键值对("spark",2)、("spark",3)、("hadoop",4)和("hadoop",6)。

scala> pairRDD.mapValues(a => a*2).foreach(println)
(nlp,2)
(nlp,2)
(spark,2)
(nlp,2)
(hadoop,2)
(hadoop,2)

join  (常用)

功能:对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

        join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。
        对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

        比如,pairRDD1是一个键值对集合{("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)},pairRDD2是一个键值对集合{("spark","fast")},那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合{("spark",1,"fast"),("spark",2,"fast")}。

案例代码:

scala> val paRDD1 = sc.parallelize(Array(("spark",2),("hadoop",3),("spark",1),("hive",4),("hadoop",2)))
paRDD1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[28] at parallelize at <console>:26

scala> val paRDD2 = sc.parallelize(Array(("spark","nicetry"),("hadoop","good"),("spark",234),("hive",2314),("hadoop","ohho")))
paRDD2: org.apache.spark.rdd.RDD[(String, Any)] = ParallelCollectionRDD[29] at parallelize at <console>:26

scala> paRDD1.join(paRDD2).foreach(println)
(spark,(2,nicetry))
(spark,(2,234))
(spark,(1,nicetry))
(spark,(1,234))
(hive,(4,2314))
(hadoop,(3,good))
(hadoop,(3,ohho))
(hadoop,(2,good))
(hadoop,(2,ohho))

    eg:现在来看林子雨教授讲解的是真清晰,温故而知新。

一个完整实例-计算每种图书的每天平均销量

思路

计算一天中各种类图书卖出去的平均值,键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量

步骤

1、构建数组,包含对应键值对,调用parallelize方法生成 RDD

2、针对构建得到的rdd,我们调用mapValues()函数,把rdd中的每个每个键值对(key,value)的value部分进行修改,把value转换成键值对(value,1),其中,数值1表示这个key在rdd中出现了1次,为什么要记录出现次数呢?因为,我们最终要计算每个key对应的平均值,所以,必须记住这个key出现了几次,最后用value的总和除以key的出现次数,就是这个key对应的平均值。

(注:collect()是一个行动操作,功能是以数组的形式返回数据集中的所有元素,当我们要实时查看一个RDD中的元素内容时,就可以调用collect()函数。)

3、调用reduceByKey()函数,此处必须要十分准确地理解reduceByKey()函数的功能 => 合并具有相同键的值。

        reduceByKey(func)的功能是使用func函数合并具有相同键的值。这里的func函数就是Lamda表达式(x,y) => (x._1+y._1,x._2 + y._2),这个表达式中,x和y都是value,而且是具有相同key的两个键值对所对应的value。

4、 计算最终结果。对得到的几个键值对构成的RDD执行mapValues()操作,得到每种书的每天平均销量。mapValues,key不变,只对值记性操作。value会被赋值给Lamda表达式x => (x._1 / x._2中的x,x的值就是(22,2),x._1就是22,表示hadoop书总销量是22,x._2就是2,表示2天,因此,hadoop书籍的每天平均销量就是x._1 / x._2,也就是11。mapValues()输出的一个键值对就是("hadoop",11),其他同理。

代码

//构建书籍及销量
scala> val books = sc.parallelize(Array(("book1",5),("book2",10),("book3",8),("book1",6),("book2",12)))
books: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:23
// 统计
scala> val sum_books = books.mapValues(x => (x,1)).foreach(println)
(book1,(5,1))
(book2,(10,1))
(book3,(8,1))
(book1,(6,1))
(book2,(12,1))
sum_books: Unit = ()
//计算出现次数,value中,前面是总数,后面是天数,如(11,2),表示2天卖出11本
scala> val average_books = books.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1 + y._1 , x._2 + y._2)).foreach(println)
(book1,(11,2))
(book3,(8,1))
(book2,(22,2))
average_books: Unit = ()

//平均值统计
scala> val average_books = books.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1 + y._1 , x._2 + y._2)).mapValues(x => x._1 / x._2).foreach(println)
(book1,5)
(book3,8)
(book2,11)
average_books: Unit = ()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/752677.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

swiper滚动块宽度踩坑记录

背景&#xff1a;需要给swiper增加图片懒加载优化性能&#xff0c;这里使用的是swiper自带的 lazy api。但是加了懒加载后发现滚动块的宽度变长了&#xff0c;这里的原因是我只给滚动条设置了宽度的样式但是没有给滚动块设置宽度的样式。于是我按照官方文档的做法给滚动块设置宽…

STM32单片机语音识别台灯控制系统人检测亮度调节

实践制作DIY- GC00156-语音识别台灯控制系统 一、功能说明&#xff1a; 基于STM32单片机设计-语音识别台灯控制系统 二、功能说明&#xff1a; 电路&#xff1a;STM32F103C系列最小系统串口语音识别模块LED灯板1个红外传感器 1.任何时候没有人则关闭灯。有人可以自动打开灯。…

LIS检验信息系统

LIS检验信息系统是以病人为中心、以业务处理为基础、以提高检验科室管理水平和工作效率为目标&#xff0c;将医学检验、科室管理和财务统计等工作进行整合&#xff0c;全面改善检验科室的工作现状。 LIS把检验、检疫、放免、细菌微生物及科研使用的各类分析仪器&#xff0c;通…

pandas的DataFrame转存MATLAB的mat格式

有的时候需要把 pandas 处理好的 DataFrame 进一步交给MATLAB来处理。当然可以保存成 excel 文件&#xff0c;不过当数据量比较大的时候&#xff0c;读取比较慢&#xff0c;这个时候转存成 MATLAB 可读的 mat 文件更合适&#xff08;MATLAB 能快速读取&#xff09;。 标准的操…

接口自动化测试的最佳工程实践 (ApiTestEngine)

目录 前言&#xff1a; 背景 核心特性 特性拆解介绍 写在后面 前言&#xff1a; 接口自动化测试是现代软件开发中不可或缺的一环。一个良好的测试框架和最佳工程实践可以提高测试效率和质量。 背景 当前市面上存在的接口测试工具已经非常多&#xff0c;常见的如Postman…

建筑结构健康监测常见问题及解决措施

建筑结构健康监测(SHM)是指利用无损传感技术&#xff0c;通过对结构特性进行分析&#xff0c;达到检测结构损伤或退化的目的&#xff0c;是当前守护建筑安全的一种新型技术手段&#xff0c;通过建筑结构健康监测管理者可以直观的了解到建筑物的健康状态&#xff0c;为建筑维护和…

【力扣算法13】之 12. 整数转罗马数字 python

文章目录 问题描述示例1示例2 示例 3:示例 4:示例 5:提示 思路分析代码分析完整代码详细分析运行效果截图调用示例运行结果 完结 问题描述 罗马数字包含以下七种字符&#xff1a; I&#xff0c; V&#xff0c; X&#xff0c; L&#xff0c;C&#xff0c;D 和 M。 字符数值I1V5X…

AP5193 DC-DC宽电压LED降压恒流驱动器 LED电源驱动IC

产品 AP5193是一款PWM工作模式、外围简单、内置功率MOS管&#xff0c;适用于4.5-100V输入的高精度降压LED恒流驱动芯片。电流2.5A。AP5193可实现线性调光和PWM调光&#xff0c;线性调光脚有效电压范围0.55-2.6V.AP5193 工作频率可以通过RT 外部电阻编程来设定&#xff0c;同时…

centos环境搭建nsq集群

简言 1. nsq是go语言开发的实时的分布式消息处理平台&#xff0c;目的是用来大规模地处理每天数以十亿计级别的消息。它具有分布式和去中心化拓扑结构&#xff0c;该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征 2. nsq官网地址 NSQ Docs 1.2.1 …

【详解】C语言冷门知识点之--位段

文章目录 一&#xff0c; 位段的解释二&#xff0c; 位段的声明和使用位段的声明&#xff1a;位段的使用&#xff1a; 三&#xff0c;位段的空间大小计算第一个例子&#xff1a;第二个例子&#xff1a;注意&#xff1a; 四&#xff0c; 位段的内存分配五&#xff0c;位段的跨平…

@Autowired 到底是怎么把变量注入进来的?

文章目录 1. Bean 的创建2. populateBean3. postProcessProperties3.1 findAutowiringMetadata3.2 inject3.3 doResolveDependency 4. 时序图 在 Spring 容器中&#xff0c;当我们想给某一个属性注入值的时候&#xff0c;有多种不同的方式&#xff0c;例如可以通过构造器注入、…

C++ string类型的基本使用方法

目录 1.定义和初始化string对象 2.string对象上的常用操作 在C中string是用来处理可变长字符串的&#xff0c;是C标准库中提供的类型&#xff0c;使用起来十分方便。同时C也支持C语言的字符数组来表示字符串。使用时记得包含string头文件。 1.定义和初始化string对象&#xf…

讨论下相亲交友小程序介绍红娘系统搭建的功能有哪些

首页内容 同城会员&#xff0c;VIP会员&#xff0c;线下会员&#xff0c;热文推荐&#xff0c;恋爱话术&#xff0c;爱情故事&#xff0c;恋爱宝典&#xff0c;相亲宝典&#xff0c;浪漫约会&#xff0c;相亲活动&#xff0c;地区、年龄筛选&#xff0c;用户动态&#xff0c;用…

(简单)剑指Offer || 056. 二叉搜索树中两个节点的和 Java

方法一&#xff1a;深度优先搜索哈希表 使用深度优先搜索的方式遍历整棵树&#xff0c;用哈希表记录遍历过的节点的值 对于一个值为x的节点&#xff0c;检查哈希表中是否存在k-x即可。如果存在对应的元素&#xff0c;那么我们就可以在该树上找到两个节点的和为k&#xff1b;否…

Idea 修改默认 Maven 为自己的

每次我们打开新项目时,都要去配置一遍 maven,很麻烦,其实可以去修改 idea 里面默认的 maven 配置,这样后面不管是打开新项目还是老项目,就都是用的自己的 maven 了. 1.文件->新项目设置->新项目的设置 File->Other Settings -> Settings for New Project 2.然后和…

【Unity开发必备】100多个 Unity 学习网址 资源 收藏整理大全【持续更新】

Unity 相关网站整理大全 众所周知&#xff0c;工欲善其事必先利其器&#xff0c;有一个好的工具可以让我们事半功倍&#xff0c;有一个好用的网站更是如此&#xff01; 但是好用的网站真的太多了&#xff0c;收藏夹都满满的(但是几乎没打开用过&#x1f601;)。 所以本文是对…

eclipse : sun.misc.BASE64Encoder找不到jar包的解决方法

sun.misc.BASE64Encoder找不到jar包 比较好的解决办法 按顺序依次操作&#xff1a; Windows -> Preferences -> Java -> Compiler -> Errors/Warnings。再按照顺序依次&#xff1a; Deprecated and trstricted API -> Forbidden reference (access rules): -&g…

量子力学的实验验证:双缝实验和贝尔不等式

亲爱的读者&#xff0c; 欢迎回到我们的量子力学系列文章。在前几篇文章中&#xff0c;我们介绍了量子力学的起源、基本概念&#xff0c;以及叠加态、超级定位和量子纠缠等奇特现象。今天&#xff0c;我们将深入探讨量子力学的实验验证&#xff0c;重点介绍双缝实验和贝尔不等…

Android自定义圆环进度条/刻度仪表盘(单环单点带进度动画)

效果图: 1.自定义SleepDashBoardView /*** 睡眠刻度仪表盘*/ public class SleepDashBoardView extends View {private static final float START_ANGLE 135f;private static final float MAX_ANGLE 270f;private float progress 0;private float centerX;private float ce…

ValueError: check_hostname requires server_hostname怎么解决?

背景: 想使用pip安装某一个包。结果报错如上图绿框所示。 解决方法&#xff1a; 把代理&#xff08;梯子&#xff09;关掉就行了。