RDD算子使用----transformation转换操作

news2024/12/23 5:22:33

RDD算子使用

transformation转换操作

map算子

rdd.map(p: A => B):RDD,对rdd集合中的每一个元素,都作用一次该func函数,之后返回值为生成元素构成的一个新的RDD。

val sc = new SparkContext(conf)

//map 原集合*7

val list = 1 to 7

//构建一个rdd

val listRDD:RDD[Int] = sc.parallelize(list)

//listRDD.map((num:Int) => num * 7)

//listRDD.map(num => num * 7)

val ret = listRDD.map(_ * 7)

ret.foreach(println)

flatMap算子

rdd.flatMap(p: A => 集合):RDD ==>rdd集合中的每一个元素,都要作用func函数,返回0到多个新的元素,这些新的元素共同构成一个新的RDD。

def  flatMapOps(sc:SparkContext): Unit = {

val list = List(

"jia jing kan kan kan",

"gao di di  di di",

"zhan yuan qi qi"

)

val listRDD = sc.parallelize(list)

listRDD.flatMap(line => line.split("\\s+"))

.foreach(println)

}

mapPartitions算子

mapPartitions(p: Iterator[A] =>Iterator[B]),上面的map操作,一次处理一条记录;而mapPartitions一次性处理一个partition分区中的数据。

注意:虽说mapPartitions的执行性能要高于map,但是其一次性将一个分区的数据加载到执行内存空间,如果该分区数据集比较大,存在OOM的风险。

//创建RDD并指定分区数

val array = 1 to 10

val listRDD:RDD[Int] = sc.parallelize(array,2)
//通过-将分区之间的数据连接
val result: RDD[String] = rdd.mapPartitions(x=>Iterator(x.mkString("-")))
//打印输出
println(result.collect().toBuffer)

mapPartitionsWithIndex算子

mapPartitionsWithIndex((index,p:Iterator[A]=>Iterator[B])),该操作比mapPartitions多了一个index,代表就是后面p所对应的分区编号:rdd的分区编号,命名规范,如果有N个分区,分区编号就从0,...,N-1。

val rdd: RDD[Int] = sc.parallelize(1 to 16,4)
//查看每个分区当中都保存了哪些数据
val result: RDD[String] = rdd.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(",")))
//打印输出
result.foreach(println)

sample算子

1)sample(withReplacement, fraction, seed):随机抽样算子,sample主要工作就是为了来研究数据本身,去代替全量研究会出现类似数据倾斜(dataSkew)等问题,无法进行全量研究,只能用样本去评估整体。

2)withReplacement:Boolean:有放回的抽样和无放回的抽样。

3)fraction:Double:样本空间占整体数据量的比例,大小在[0,1]。

4)seed:Long:是一个随机数的种子,有默认值,通常不需要传参。

需要说明一点的是,这个抽样是一个不准确的抽样,抽取的结果数可能在准确的结果上下浮动。

def sampleOps(sc: SparkContext): Unit = {

val list = sc.parallelize(1 to 100000)

val sampled1 = list.sample(true, 0.01)

println("sampled1 count: " + sampled1.count())

val sampled2 = list.sample(false, 0.01)

println("sampled2 count: " + sampled2.count())

}

union算子

rdd1.union(rdd2)相当于sql中的union all,进行两个rdd数据间的联合,需要说明一点是,该union是一个窄依赖操作,rdd1如果有N个分区,rdd2有M个分区,那么union之后的分区个数就为N+M。

val rdd1 = sc.parallelize(1 to 5,3)
//查看每个分区当中都保存了哪些数据
rdd1.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)
val rdd2: RDD[Int] = sc.parallelize(5 to 7,2)
//查看每个分区当中都保存了哪些数据
rdd2.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)
//union整合
val result: RDD[Int] = rdd1.union(rdd2)
//获取数据
println(result.collect().toBuffer)
//查看每个分区当中都保存了哪些数据
result.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)
//获取分区数
println(result.getNumPartitions)

join算子

1)rdd1.join(rdd2) 相当于sql中的join连接操作

A(id) a, B(aid) b

select * from A a join B b on a.id = b.aid

2)交叉连接: across join

select * from A a across join B ====>这回产生笛卡尔积。

3)内连接: inner join,提取左右两张表中的交集

select * from A a inner join B on a.id = b.aid 或者

select * from A a, B b where a.id = b.aid

4)外连接:outer join

5)左外连接:left outer join 返回左表所有,右表匹配返回,匹配不上返回null

select * from A a left outer join B on a.id = b.aid

6)右外连接:right outer join 刚好是左外连接的相反

select * from A a left outer join B on a.id = b.aid

7)全连接:full join

8)全外连接:

full outer join = left outer join + right outer join

前提:要先进行join,rdd的类型必须是K-V。

对join操作可以归纳为如图-22所示。

图-22 sql 各种join连接图

def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val rdd1: RDD[(Int, String)] = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
val rdd2: RDD[(Int, Int)] = sc.parallelize(Array((1,4),(2,5),(5,6)))
//join操作
val result: RDD[(Int, (String, Int))] = rdd1.join(rdd2)
//打印输出
println(result.collect().toBuffer)
//leftOutJoin操作
val result1: RDD[(Int, (String, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
//打印输出
println(result1.collect().toBuffer)
//rightOuterJoin
val result2: RDD[(Int, (Option[String], Int))] = rdd1.rightOuterJoin(rdd2)
//打印输出
println(result2.collect().toBuffer)
//fullOuterJoin
val result3: RDD[(Int, (Option[String], Option[Int]))] = rdd1.fullOuterJoin(rdd2)
//打印输出
println(result3.collect().toBuffer)
}

coalesce算子

1)coalesce(numPartition, shuffle=false):分区合并的意思。

2)numPartition:分区后的分区个数。

3)shuffle:此次重分区是否开启shuffle,决定当前的操作是宽(true)依赖还是窄(false)依赖。

def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val rdd = sc.parallelize(1 to 16,4)
//获取分区数
println(rdd.getNumPartitions)
//查看每个分区当中保存了哪些数据
rdd.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)
//缩减分区
val rdd1: RDD[Int] = rdd.coalesce(3)
//获取分区数
println(rdd1.getNumPartitions)
//查看每个分区当中保存的数据变化
rdd1.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)
//释放资源

sc.stop()
}

repartition(numPartitions)算子

根据分区数,从新通过网络随机洗牌所有数据。

def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)

//设置控制台日志级别
sc.setLogLevel("WARN")
val rdd = sc.parallelize(1 to 16,4)
//获取分区数
println(rdd.getNumPartitions)
//查看每个分区当中保存了哪些数据
rdd.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)
//缩减分区
val rdd1: RDD[Int] = rdd.repartition(3)
//获取分区数
println(rdd1.getNumPartitions)
//查看每个分区当中保存的数据变化

rdd1.mapPartitionsWithIndex((index,item)=>Iterator(index+":"+item.mkString(","))).foreach(println)
//释放资源
sc.stop()
}

sortBy(func,[ascending], [numTasks])算子

用func先对数据进行处理,按照处理后的数据比较结果排序。

def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]")
val sc = new SparkContext(conf)

//设置控制台日志输出级别
sc.setLogLevel("WARN")

//加载数据
val rdd= sc.parallelize(List(("a",4),("c",2),("b",1)))

//默认是升序排序,指定false,转为倒叙输出
val rdd1: RDD[(String, Int)] = rdd.sortBy(_._2,false)

//收集结果,返回数组输出
println(rdd1.collect().toBuffer)
}

sortByKey([ascending],[numTasks])算子

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。

def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]")
val sc = new SparkContext(conf)

//设置控制台日志输出级别
sc.setLogLevel("WARN")

//加载数据
val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))

//默认按照key进行升序输出,加false,转为倒叙输出
val result: RDD[(Int, String)] = rdd.sortByKey(false)

//收集结果,返回数组输出
println(result.collect().toBuffer)
}

groupByKey算子

groupByKey(numPartition):[K, Iterable[V]]

按照key来进行分组,numPartition指的是分组之后的分区个数。

这是一个宽依赖操作,但是需要注意一点的是,groupByKey相比较reduceByKey而言,没有本地预聚合操作,显然其效率并没有reduceByKey效率高,在使用的时候如果可以,尽量使用reduceByKey等去代替groupByKey。

def groupByOps(sc: SparkContext): Unit = {

case class Student(id: Int, name:String, province: String)

val stuRDD = sc.parallelize(List(

Student(1, "张三", "安徽"),

Student(2, "李梦", "山东"),

Student(3, "王五", "甘肃"),

Student(4, "周七", "甘肃"),

Student(5, "Lucy", "黑吉辽"),

Student(10086, "魏八", "黑吉辽")

))

//val province2Infos: RDD[(String, Iterable[Student]] = stuRDD.groupBy(stu => stu.province)

val province2Infos: RDD[(String, Iterable[Student])] = stuRDD.map(stu => (stu.province, stu)).groupByKey()



province2Infos.foreach{case (province, stus) => {

println(s"省份:${province}, 学生信息:${stus.mkString(", ")}, 人数:${stus.size}")

}}

}

}

reduceByKey算子

reduceByKey((A1, A2) => A3)

前提不是对全量的数据集进行reduce操作,而是对每一个key所对应的所有的value进行reduce操作。

def reduceByKeyOps(sc: SparkContext): Unit = {

case class Student(id: Int, name:String, province: String)

val stuRDD = sc.parallelize(List(

Student(1, "张三", "安徽"),

Student(2, "李梦", "山东"),

Student(3, "王五", "甘肃"),

Student(4, "周七", "甘肃"),

Student(5, "Lucy", "黑吉辽"),

Student(10086, "魏八", "黑吉辽")

))

val ret = stuRDD.map(stu => (stu.province, 1)).reduceByKey((v1, v2) => v1 + v2)

ret.foreach{case (province, count) => {

println(s"province: ${province}, count: ${count}")

}}

}

foldByKey算子

foldByKey(zeroValue)((A1, A2) => A3),其作用和reduceByKey一样,唯一的区别就是zeroValue初始化值不一样,相当于在scala集合操作中的reduce和fold的区别。

def foldByKeyOps(sc: SparkContext): Unit = {

case class Student(id: Int, name:String, province: String)

val stuRDD = sc.parallelize(List(

Student(1, "张三", "安徽"),

Student(3, "王五", "甘肃"),

Student(5, "Lucy", "黑吉辽"),

Student(2, "李梦", "山东"),

Student(4, "周七", "甘肃"),

Student(10086, "魏八", "黑吉辽")

), 2).mapPartitionsWithIndex((index, partition) => {

val list = partition.toList

println(s"-->stuRDD的分区编号为<${index}>中的数据为:${list.mkString("[", ", ", "]")}")

list.toIterator

})

val ret = stuRDD.map(stu => (stu.province, 1)).foldByKey(0)((v1, v2) => v1 + v2)

ret.foreach{case (province, count) => {

println(s"province: ${province}, count: ${count}")

}}

}

aggregateByKey算子

combineByKey和aggregateByKey的区别就相当于reduceByKey和foldByKey。

def abk2rbk(sc: SparkContext): Unit = {

val array = sc.parallelize(Array(

"hello you",

"hello me",

"hello you",

"hello you",

"hello me",

"hello you"

), 2)

val pairs = array.flatMap(_.split("\\s+")).map((_, 1))

val ret = pairs.aggregateByKey(0)(_+_, _+_)

ret.foreach{case (key, count) => {

println(s"key: ${key}, count: ${count}")

}}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1631541.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

纯血鸿蒙APP实战开发——全局状态保留能力弹窗

全局状态保留能力弹窗 介绍 全局状态保留能力弹窗一种很常见的能力&#xff0c;能够保持状态&#xff0c;且支持全局控制显隐状态以及自定义布局。使用效果参考评论组件 效果图预览 使用说明 使用案例参考短视频案例 首先程序入口页对全局弹窗初始化&#xff0c;使用Globa…

Redis基本數據結構 ― List

Redis基本數據結構 ― List 介紹常用命令範例1. 將元素推入List中2. 取得List內容3. 彈出元素 介紹 Redis中的List結構是一個雙向鏈表。 LPUSH LPOP StackLPUSH RPOP QueueLPUSH BRPOP Queue(消息隊列) 常用命令 命令功能LPUSH將元素推入列表左端RPUSH將元素推入列表右…

ZooKeeper 环境搭建详细教程之三(真集群)

ZooKeeper 搭建详细步骤之三(真集群) ZooKeeper 搭建详细步骤之二(伪集群模式) ZooKeeper 搭建详细步骤之一(单机模式) ZooKeeper 及相关概念简介 真集群搭建 搭建 ZooKeeper 真集群涉及多个步骤,包括准备环境、配置文件设置、启动服务以及验证集群状态。 以下是一个简…

如何基于Zookeeper实现注册中心模型?

在分布式系统中&#xff0c;通常会存在几十个甚至上百个服务&#xff0c;开发人员可能甚至都无法明确系统中到底有哪些服务正在运行。另一方面&#xff0c;我们很难同时确保所有服务都不出现问题&#xff0c;也很难保证当前的服务部署方式不做调整和优化。由于自动扩容、服务重…

天冕科技亮相第十七届深圳国际金融博览会!

第十七届深圳国际金融博览会在深圳会展中心正式开幕&#xff0c;天冕科技跟随南山区组团集体亮相&#xff0c;充分展现金融活力。此次金博会&#xff0c;南山区政府共遴选了包括天冕科技在内的三家优秀金融科技企业组团参展&#xff0c;以特色与创新的案例展示了辖区金融业发展…

Power BI:如何将文件夹批量Excel(多sheet页)文件导入?

故事背景&#xff1a; 业务同事想用Power BI分析近两年市场费用。 数据源全部是Excel文件&#xff0c;并且以每月一个Excel文件的方式&#xff0c;统一存放到同一文件夹下面。 重点&#xff0c;每张Excel文件会有多张sheet页&#xff0c;用区分每家分公司的费用信息。 目前…

照片误删怎么办?华为手机删除的照片如何恢复?

我们在使用华为手机时&#xff0c;可能会因为各种原因不小心删除一些照片。如果这些照片对我们来说很重要&#xff0c;那么恢复它们是非常必要且急迫的。那么华为手机删除的照片如何恢复呢&#xff1f;本文将为您介绍3种恢复华为手机中误删照片的方法。 如何恢复华为手机中被删…

Vue3 v3.4之前如何实现组件中多个值的双向绑定?

文章目录 基础代码1. watch2. computed&#xff08;推荐&#xff09; 官方给的例子是关于el-input的&#xff0c;如下。但是input不是所有组件标签都有的属性啊&#xff0c;有没有一种通用的办法呢&#xff1f; <script setup> defineProps({firstName: String,lastName…

VS2022 配置OpenCV开发环境详细教程

OpenCV OpenCV&#xff08;Open Source Computer Vision Library&#xff09;是一个开源的计算机视觉和机器学习软件库&#xff0c;由Intel开发并首先发布于1999年。OpenCV被广泛用于实时图像处理、视频分析、物体检测、面部识别、机器人视觉以及许多其他领域。它支持C、Pytho…

【vscode环境配置系列】vscode远程debug配置

VSCODE debug环境配置 插件安装配置文件debug 插件安装 安装C/C, C/C Runner 配置文件 在项目下建立.vscode文件夹&#xff0c;然后分别建立c_cpp_properties.json&#xff0c; launch.json&#xff0c;tasks.json&#xff0c;内容如下&#xff1a; c_cpp_properties.json:…

机器学习:基于Sklearn、XGBoost框架,使用逻辑回归、支持向量机和XGBClassifier预测帕金森病

前言 系列专栏&#xff1a;机器学习&#xff1a;高级应用与实践【项目实战100】【2024】✨︎ 在本专栏中不仅包含一些适合初学者的最新机器学习项目&#xff0c;每个项目都处理一组不同的问题&#xff0c;包括监督和无监督学习、分类、回归和聚类&#xff0c;而且涉及创建深度学…

Panoptic Domain Adaptive Mask R-CNN (PDAM) 论文总结

论文&#xff08;CVPR会议&#xff09;&#xff1a; Unsupervised Instance Segmentation in Microscopy Images via Panoptic Domain Adaptation and Task Re-weighting &#xff08;TMI期刊&#xff09;&#xff1a;PDAM: A Panoptic-Level Feature Alignment Framework for …

微软如何打造数字零售力航母系列科普03 - Mendix是谁?作为致力于企业低代码服务平台的领头羊,它解决了哪些问题?

一、Mendix 成立的背景 Mendix的成立是为了解决软件开发中最大的问题&#xff1a;业务和IT之间的脱节。这一挑战在各个行业和地区都很普遍&#xff0c;很简单&#xff1a;业务需求通常被描述为IT无法正确解释并转化为软件。业务和IT之间缺乏协作的原因是传统的代码将开发过程限…

[论文笔记]Language Modeling with Gated Convolutional Networks

引言 今天带来论文Language Modeling with Gated Convolutional Networks的笔记&#xff0c;该篇工作提出了GLU(Gated Linear Units&#xff0c;门控线性单元)。 注意该篇工作是2016年发表&#xff0c;是在Transformer论文发表之前。当时作者认为语言建模的主要方法是基于循环…

百度语音识别的springboot应用

1、pom依赖 <dependency> <groupId>com.baidu.aip</groupId> <artifactId>java-sdk</artifactId> <version>4.16.18</version> </dependency> 2、测试的demo 创建语音识别应用 百度智能云-管理中心 (baidu.com) 代码中要…

qt-C++笔记之滑动条QSlider和QProgressBar进度条

qt-C笔记之滑动条QSlider和QProgressBar进度条 —— 2024-04-28 杭州 本例来自《Qt6 C开发指南》 文章目录 qt-C笔记之滑动条QSlider和QProgressBar进度条1.运行2.阅读笔记3.文件结构4.samp4_06.pro5.main.cpp6.widget.h7.widget.cpp8.widget.ui 1.运行 2.阅读笔记 3.文件结构…

ubuntu安装Anaconda安装及conda使用

一. 安装anaconda3详细教程 1、下载镜像 清华大学开源软件镜像站下载地址&#xff1a; https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/ 下拉到最低端选择Linux&#xff0c;选择最新版&#xff08;32/64位&#xff09;下载。这里我下载的是版本Anaconda3-4.3.30-Linux…

《微服务设计》读书笔记

此为阅读纽曼《微服务设计》一书后总结的读书笔记&#xff0c;点此处下载PDF文档。 一、微服务的概念 微服务&#xff08;或称微服务架构&#xff09;是一种云原生架构方法&#xff0c;其核心思想在于将单个应用拆分为众多 小型、松散耦合的服务&#xff0c;服务之间均通过网…

AI视频教程下载:构建一个ChatGPT股票配对交易机器人

ChatGPT及其后续版本GPT-4已经开始改变世界。人们对新机会感到兴奋&#xff0c;同时对我们社会可能受到的影响感到恐惧。这门课程结合了两个主题&#xff1a;AI和财务&#xff08;算法交易&#xff09;。 你将会学到的&#xff1a; 使用ChatGPT构建一个Python配对交易机器人 …

车载系统的 加减串器应用示意

overview 车载系统上使用加减串器来实现camera&#xff0c; led液晶显示屏等 图像数据的远距离传输&#xff0c;将原先在短距离传输视频信号的mipi csi&#xff0c;dsi 等的TX&#xff0c;RX中间&#xff0c;插入加减串器&#xff0c;实现长距离的可靠传输。 示意图如下 往往…