2023_Spark_实验十二:Spark高级算子使用

news2025/2/28 2:58:53
掌握Spark高级算子在代码中的使用
相同点分析
三个函数的共同点,都是Transformation算子。惰性的算子。
不同点分析
map函数是一条数据一条数据的处理,也就是,map的输入参数中要包含一条数据以及其他你需要传的参数。
mapPartitions函数是一个partition数据一起处理,也即是说,mapPartitions函数的输入是一个partition的所有数据构成的“迭代器”,然后函数里面可以一条一条的处理,在把所有结果,按迭代器输出。也可以结合yield使用效果更优。
rdd的mapPartitions是map的一个变种,它们都可进行分区的并行处理。
两者的主要区别是调用的粒度不一样:map的输入变换函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区。
mapPartitionsWithIndex函数,其实和mapPartitions函数区别不大,因为mapPartitions背后调的就是mapPartitionsWithIndex函数,只是一个参数被close了。mapPartitionsWithIndex的函数可以或得partition索引号;
假设一个rdd有10个元素,分成3个分区。如果使用map方法,map中的输入函数会被调用10次;而使用mapPartitions方法的话,其输入函数会只会被调用3次,每个分区调用1次。
mapPartitionsWithIndex则是带上分区下标进行操作。
1、mapPartitionWithIndex
 

import org.apache.spark.{SparkConf,SparkContext}
object mapPartitionWithIndex {

def getPartInfo:(Int,Iterator[Int]) => Iterator[String] = (index:Int,iter:Iterator[Int]) =>{
iter.map(x =>"[ PartId " + index +", elems: " + x + " ]")
}

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("RddMapPartitionsWithIndexDemo")
val sc = new SparkContext(conf)

val rdd1 = sc.parallelize(List(1,2,3,4,5,9,6,7,8),numSlices =3 )
val rdd2 = rdd1.mapPartitionsWithIndex(getPartInfo)
rdd2.collect().foreach(println)

}

}

2、aggregate
首先我们来创建一个 RDD

scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24


scala> rdd1.collect
warning: there was one feature warning; re-run with -feature for details
res24: Array[Int] = Array(1, 2, 3, 4, 5)

这个 RDD 仅有 1 个分片,包含 5 个数据: 1, 2, 3, 4, 5 。
 
然后我们来应用一下 aggregate 方法。
 
在使用 aggregate 之前,我们还是先定义两个要给 aggregate 当作输入参数的函数吧。
scala> 
// Entering paste mode (ctrl-D to finish)
def pfun1(p1: Int, p2: Int): Int = {
p1 * p2
}

// Exiting paste mode, now interpreting.
pfun1: (p1: Int, p2: Int)Int

scala>

 
scala> 
// Entering paste mode (ctrl-D to finish)
def pfun2(p3: Int, p4: Int): Int = {
p3 + p4
}

// Exiting paste mode, now interpreting.
pfun2: (p3: Int, p4: Int)Int

scala>
接着是第 2 个函数。就不再解释什么了。
 
然后终于可以开始应用我们的 aggregate 方法了。
scala> rdd1.aggregate(3)(pfun1, pfun2)
res25: Int = 363

scala>
输出结果是 363 !这个结果是怎么算出来的呢?
 
首先我们的 zeroValue 即初值是 3 。然后通过上面小节的介绍,我们知道首先会应用 pfun1 函数,因为我们这个 RDD 只有 1 个分片,所以整个运算过程只会有一次 pfun1 函数调用。它的计算过程如下:
 
首先用初值 3 作为 pfun1 的参数 p1 ,然后再用 RDD 中的第 1 个值,即 1 作为 pfun1 的参数 p2 。由此我们可以得到第一个计算值为 3 * 1 = 3 。接着这个结果 3 被当成 p1 参数传入,RDD 中的第 2 个值即 2 被当成 p2 传入,由此得到第二个计算结果为 3 * 2 = 6 。以此类推,整个 pfun1 函数执行完成以后,得到的结果是  3 * 1 * 2 * 3 * 4 * 5 = 360 。这个 pfun1 的应用过程有点像是 “在 RDD 中滑动计算” 。
在 aggregate 方法的第 1 个参数函数 pfun1 执行完毕以后,我们得到了结果值 360 。于是,这个时候就要开始执行第 2 个参数函数 pfun2 了。
 
pfun2 的执行过程与 pfun1 是差不多的,同样会将 zeroValue 作为第一次运算的参数传入,在这里即是将 zeroValue 即 3 当成 p3 参数传入,然后是将 pfun1 的结果 360 当成 p4 参数传入,由此得到计算结果为 363 。因为 pfun1 仅有一个结果值,所以整个 aggregate 过程就计算完毕了,最终的结果值就是 363 

import org.apache.spark.{SparkConf, SparkContext}

object RddAggregateDemo {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("RddDemos")
val sc = new SparkContext(conf)
val rdd1 = sc.parallelize(List("12","23","345",""),numSlices = 2)
//下面代码等价与rdd1.aggregate("")(x+y) => math.min(x.length,y.length)),(x,y)=>x+y))
val result = rdd1.aggregate("")(func1,func2)
println(result)
}
//(x,y)=>math.min(x.Length,y.length)
def func1:(String,String) => String =(x:String,y:String) =>{
println("<x: " + x + ",x.len: " + x.length + ">,<y: " + y +",y.len: " + y.length + ">")
val ret =math.min(x.length,y.length).toString
println("func1 ret:" +ret)
ret
}
//(x+y) => x+y
def func2:(String,String)=>String=(x:String,y:String) =>{
println("========" +(x+y))
x+y
}
}

3、aggregateByKey
通过scala集合以并行化方式创建一个RDD
scala> val pairRdd = sc.parallelize(List((“cat”,2),(“cat”,5),(“mouse”,4),(“cat”,12),(“dog”,12),(“mouse”,2)),2)
pairRdd 这个RDD有两个区,一个区中存放的是:
(“cat”,2),(“cat”,5),(“mouse”,4)
另一个分区中存放的是:
(“cat”,12),(“dog”,12),(“mouse”,2)
然后,执行下面的语句
scala > pairRdd.aggregateByKey(100)(math.max(_ , _), _ + _ ).collect
结果:
res0: Array[(String,Int)] = Array((dog,100),(cat,200),(mouse,200)
下面是以上语句执行的原理详解:
aggregateByKey的意思是:按照key进行聚合
第一步:将每个分区内key相同数据放到一起
分区一
(“cat”,(2,5)),(“mouse”,4)
分区二
(“cat”,12),(“dog”,12),(“mouse”,2)
第二步:局部求最大值
对每个分区应用传入的第一个函数,math.max(_ , _),这个函数的功能是求每个分区中每个key的最大值
这个时候要特别注意,aggregateByKe(100)(math.max(_ , _),_+_)里面的那个100,其实是个初始值
在分区一中求最大值的时候,100会被加到每个key的值中,这个时候每个分区就会变成下面的样子
分区一
(“cat”,(2,5,100)),(“mouse”,(4,100))
然后求最大值后变成:
(“cat”,100), (“mouse”,100)
分区二
(“cat”,(12,100)),(“dog”,(12.100)),(“mouse”,(2,100))
求最大值后变成:
(“cat”,100),(“dog”,100),(“mouse”,100)
第三步:整体聚合
将上一步的结果进一步的合成,这个时候100不会再参与进来
最后结果就是:
(dog,100),(cat,200),(mouse,200)
对PairRDD中相同的Key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey'函数最终返回的类型还是PairRDD,对应的结果是Key和聚合后的值,而aggregate函数直接返回的是非RDD的结果。


import org.apache.spark.{SparkConf, SparkContext}

object RddAggregateByKeyDemo {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setMaster("local").setAppName("RddAggregateByKeyDemo")
val sc = new SparkContext(conf)
val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),numSlices = 2)
val rdd = pairRDD.aggregateByKey(100)(func2,func2)
val resultArray = rdd.collect
resultArray.foreach(println)
sc.stop()

}
def func2(index:Int,iter:Iterator[(String,Int)]):Iterator[String] = {
iter.map(x=>"[partID:" + index + ",val: " +x +"]")
}

//(x,y)=>math.max(x+y)
def func1:(Int,Int) => Int =(x:Int,y:Int) =>{
println("<x: " + x + "," +",y:"+ y + ">")
val ret =math.max(x,y)
println("func1 max:" +ret)
ret
}
//(x+y) => x+y
def func2:(Int,Int)=>Int=(x:Int,y:Int) =>{
println("========func2 x :" + x + ",y:"+y)
println("========func2 ret =====" +(x+y))
x+y
}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1027038.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IOTE 2023国际物联网展直击:芯与物发布全新定位芯片,助力多领域智能化发展

IOTE 2023国际物联网展&#xff0c;作为全球物联网领域的盛会&#xff0c;于9月20日在中国深圳拉开帷幕。北斗星通集团应邀参展&#xff0c;旗下专业从事物联网、消费类GNSS芯片研发设计的芯与物公司也随其亮相本届盛会。 展会上&#xff0c;芯与物展示了一系列创新的GNSS定位…

基于Android+OpenCV+CNN+Keras的智能手语数字实时翻译——深度学习算法应用(含Python、ipynb工程源码)+数据集(五)

目录 前言总体设计系统整体结构图系统流程图 运行环境模块实现1. 数据预处理2. 数据增强3. 模型构建4. 模型训练及保存5. 模型评估6. 模型测试 系统测试1. 训练准确率2. 测试效果3. 模型应用1&#xff09;程序下载运行2&#xff09;应用使用说明3&#xff09;测试结果 相关其它…

【深度学习】快速部署ONNX模型【入门】

【深度学习】快速部署ONNX模型【入门】 提示:博主取舍了很多大佬的博文并亲测有效,分享笔记邀大家共同学习讨论 文章目录 【深度学习】快速部署ONNX模型【入门】前言搭建打包环境打包exe文件总结 前言 之前的内容已经尽可能简单、详细的介绍CPU【Pytorch2ONNX】和GPU【Pytorch…

编译opencv-3.4.5 [交叉编译]

在unbuntu20.04环境下编译opencv3.4.5&#xff0c; cmake 版本&#xff1a;3.27.4 gcc 版本&#xff1a;11.4.0 g版本&#xff1a;11.4.0 在此环境下编译opencv4.5.4正常。 1. 编译时遇到的问题 &#xff08;1&#xff09; Built target libprotobuf make: *** [Makefile:163…

玩玩“小藤”开发者套件 Atlas 200I DK A2 之部署智能语音助手

玩玩“小藤”开发者套件 Atlas 200I DK A2 之部署智能语音助手 0. 背景1. 安装 flac2. 创建自签名证书3. 创建虚拟环境4. 安装PyTorch5. 安装 PyTorch 插件 torch_npu6. 安装APEX混合精度模块7. 安装依赖库8. 使用 gradio 启动智能语音助手9. 访问智能语音助手 0. 背景 总所周…

和逸云 RK3229 如何进入maskrom强刷模式

图中红圈两个点短接以后插usb&#xff0c;就可以进入maskrom模式强刷

【JavaEE】多线程(四)

多线程&#xff08;四&#xff09; 在开始讲之前&#xff0c;我们先来回顾回顾前三篇所讲过的内容~ 线程的概念 并发编程&#xff0c;多进程&#xff0c;比较重&#xff0c;频繁创建销毁&#xff0c;开销大 Thread的使用 创建线程 继承Thread实现Runnable继承Thread&#xff…

提交本地项目到GitHub

文章目录 1 下载git1.1 通过homebrew安装Git1.2 通过Xcode安装 2 创建ssh key、配置git3 提交本地项目到GitHub 说明&#xff1a;该博文参考这篇文章和这段视频 1 下载git 1.1 通过homebrew安装Git 1、未安装homebrew&#xff0c;需安装homebrew /usr/bin/ruby -e "$(…

踩坑:Invalid character found in method name. HTTP method names must be tokens

一、原因 在进行本地小程序与服务端请求时,由于加了签名认证,访问接口时报错 Spring boot端 小程序端 二、解决方案 2.1 更改访问路径 将https:更换成http: 示例:https://localhost:8080 改为 http://localhost:8080 2.2其他原因 ssl证书到期了Tomcat的header缓冲区大小不…

使用docker-compose 部署 MySQL8.0

目录 一、拉取MySQL镜像二、创建挂载目录三、添加配置文件my.cnf &#xff08;没有特殊需求可以跳过&#xff09;四、编写 docker-compose.yml 文件五、启动容器六、运行后查看启动容器的情况七、连接测试 一、拉取MySQL镜像 我这里使用的是MySQL8.0.18&#xff0c;可以自行选…

pycharm 中package, directory, sources root, resources root的区别

【遇到的问题】 导入yolov5中有utils文件&#xff0c;自己的代码中也有utils文件&#xff0c;使得yolov5中的这部分引用出错了。 【解决方案】 单独建立detection文件夹&#xff0c;把检测相关的都放在这里&#xff0c;yolov5是github上拉取的源码&#xff0c;发现yolov5中fr…

用于设计 CNN 的 7 种不同卷积

一 说明 最近对CNN架构的研究包括许多不同的卷积变体&#xff0c;这让我在阅读这些论文时感到困惑。我认为通过一些更流行的卷积变体的精确定义&#xff0c;效果和用例&#xff08;在计算机视觉和深度学习中&#xff09;是值得的。这些变体旨在保存参数计数、增强推理并利用目标…

scryptTS 新版本发布

scryptTS新版本发布&#xff0c;主要带来两个新特性。 您需要使用以下版本来体验&#xff1a; "dependencies": {"scrypt-ts": "0.1.5-beta.2" },1. scryptTS 中隐藏了交易原像 OP_PUSH_TX 技术 使用 OP_PUSH_TX 可以让合约代码访问整个 trans…

解决:Android Studio 中sdk tools 中库显示不全的问题

问题描述 如下图&#xff0c;打开配置后显示不全 解决方案 这是网络问题&#xff0c;由于Android Studio是goolge旗下的产品&#xff0c;多少需要向外访问 通过更改hosts文件即可&#xff0c;用记事本打开&#xff0c;末尾添加如下三行 203.208.43.97 dl.google.com 20…

1.3python基础语法——PyCharm

1&#xff09;PyCharm的作用 python的集成开发环境&#xff0c;功能如下&#xff1a; Project管理 智能提示 语法高亮 代码跳转 调试代码 解释代码(解释器) 框架和库 2&#xff09;下载与安装 下载地址&#xff1a;http://www.jetbrains.com/pycharm/download/#sectionwind…

flash attention的CUDA编程和二维线程块实现softmax

本文参考了链接添加链接描述 flash attention介绍 flash attention的介绍可以参考论文:FlashAttention: Fast and Memory-Efficient Exact Attention with IO-Awareness,具体的数学公式参考下面这个图片:其中注意关于矩阵S有两个维度,softmax的操作维度是dim=1,用pytorc…

python 全网最优雅命令行参数解析, 没有之一

背景 我们在编写python程序时&#xff0c;程序中经常会提供多种功能或者模式&#xff0c;在实际使用时根据不同的参数使用不同的功能。那么如何获取命令行传入进来的参数呢&#xff1f; 一般方法 一般情况下&#xff0c;我们会使用 sys 模块&#xff0c;如&#x1f447; im…

Python - 小玩意 - 请求网络地址获取网页链接

from bs4 import BeautifulSoup from urllib import request # 要请求的网络地址 url https://blog.csdn.net/qq_43116031/ # pip --default-timeout500000 install bs4 # 请求网络地址得到html网页代码 html request.urlopen(url)# 整理代码 soup BeautifulSoup(html, html…

深度学习论文: ISTDU-Net:Infrared Small-Target Detection U-Net及其PyTorch实现

深度学习论文: ISTDU-Net&#xff1a;Infrared Small-Target Detection U-Net及其PyTorch实现 ISTDU-Net&#xff1a;Infrared Small-Target Detection U-Net PDF: https://doi.org/10.1109/LGRS.2022.3141584 PyTorch代码: https://github.com/shanglianlm0525/CvPytorch PyTo…

GPT,GPT-2,GPT-3,InstructGPT的进化之路

ChatGPT 火遍圈内外&#xff0c;突然之间&#xff0c;好多人开始想要了解 NLP 这个领域&#xff0c;想知道 ChatGPT 到底是个什么&#xff1f;作为在这个行业奋斗5年的从业者&#xff0c;真的很开心让人们知道有一群人在干着这么样的一件事情。这也是我结合各位大佬的文章&…