Spark---RDD算子(单值类型转换算子)

news2024/11/25 0:23:36

文章目录

  • 1.RDD算子介绍
  • 2.转换算子
      • 2.1 Value类型
          • 2.1.1 map
          • 2.1.2 mapPartitions
          • 2.1.3 mapPartitionsWithIndex
          • 2.1.4 flatMap
          • 2.1.5 glom
          • 2.1.6 groupBy
          • 2.1.7 filter
          • 2.1.8 sample
          • 2.1.9 distinct
          • 2.1.10 coalesce
          • 2.1.11 repartition
          • 2.1.12 sortBy

1.RDD算子介绍

RDD算子是用于对RDD进行转换(Transformation)或行动(Action)操作的方法或函数。通俗来讲,RDD算子就是RDD中的函数或者方法,根据其功能,RDD算子可以分为两大类:
转换算子(Transformation): 转换算子用于从一个RDD生成一个新的RDD,但是原始RDD保持不变。常见的转换算子包括map、filter、flatMap等,它们通过对RDD的每个元素执行相应的操作来生成新的RDD。
行动算子(Action): 行动算子触发对RDD的实际计算,并返回计算结果或将结果写入外部存储系统。与转换算子不同,行动算子会导致Spark作业的执行。如collect方法。

2.转换算子

RDD 根据数据处理方式的不同将算子整体上分为:
Value 类型:对一个RDD进行操作或行动,生成一个新的RDD。
双 Value 类型:对两个RDD进行操作或行动,生成一个新的RDD。
Key-Value类型:对键值对进行操作,如reduceByKey((x, y),按照key对value进行合并。

2.1 Value类型

2.1.1 map

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

函数定义
def map[U: ClassTag](f: T => U): RDD[U]

代码实现:

    //建立与Spark框架的连接
    val rdd = new SparkConf().setMaster("local[*]").setAppName("RDD") //配置文件
    val sparkRdd = new SparkContext(rdd) //读取配置文件

    val mapRdd: RDD[Int] = sparkRdd.makeRDD(List(1, 2, 3, 4))
    //对mapRdd进行转换
    val mapRdd1 = mapRdd.map(num => num * 2)
    //对mapRdd1进行转换
    val mapRdd2 = mapRdd1.map(num => num + "->")

    mapRdd2.collect().foreach(print)

    sparkRdd.stop();//关闭连接

在这里插入图片描述

2.1.2 mapPartitions

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

函数定义
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。

mapPartitions在处理数据的时候因为是批处理,相对于map来说处理效率较高,但是如果数据量较大的情况下使用mapPartitions可能会造成内存溢出,因为mapPartitions会将分区内的数据全部加载到内存中。此时更推荐使用map。

2.1.3 mapPartitionsWithIndex

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

函数定义
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

实现只保留第二个分区的数据

    val mapRdd: RDD[Int] = sparkRdd.makeRDD(List(1, 2, 3, 4),2)
    val newRdd: RDD[Int] = mapRdd.mapPartitionsWithIndex((index, iterator) => {
      if (index == 1) iterator
      else Nil.iterator
    })
    newRdd.collect().foreach(println)
2.1.4 flatMap

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

       //建立与Spark框架的连接
    val rdd = new SparkConf().setMaster("local[*]").setAppName("RDD") //配置文件
    val sparkRdd = new SparkContext(rdd) //读取配置文件

    val rdd1: RDD[List[Int]] = sparkRdd.makeRDD(List(List(1, 2), List(3, 4)))
    val rdd2: RDD[String] = sparkRdd.makeRDD(List("Hello Java", "Hello Scala"), 2)

    val frdd1: RDD[Int] =rdd1.flatMap(list=>{list})
    val frdd2: RDD[String] =rdd2.flatMap(str=>str.split(" "))

    frdd1.collect().foreach(println)
    frdd2.collect().foreach(println)
    sparkRdd.stop();//关闭连接

在这里插入图片描述

2.1.5 glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变,glom函数的作用就是将一组数据转换为数组。

函数定义
def glom(): RDD[Array[T]]

    /建立与Spark框架的连接
    val rdd = new SparkConf().setMaster("local[*]").setAppName("RDD") //配置文件
    val sparkRdd = new SparkContext(rdd) //读取配置文件

    val rdd1: RDD[Any] = sparkRdd.makeRDD(List(1,2,3,4),2)
    val value: RDD[Array[Any]] = rdd1.glom()
    value.collect().foreach(data=> println(data.mkString(",")))
    sparkRdd.stop();//关闭连接

在这里插入图片描述

2.1.6 groupBy

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。 极限情况下,数据可能被分在同一个分区中

函数定义
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

	    //按照奇偶分组
    val rdd1: RDD[Int] = sparkRdd.makeRDD(List(1,2,3,4),2)
    val value = rdd1.groupBy(num => num % 2)
    value.collect().foreach(println)
    
    //将 List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。
    val rdd2: RDD[String] = sparkRdd.makeRDD(List("Hello", "hive", "hbase", "Hadoop"))
    val value1: RDD[(Char, Iterable[String])] = rdd2.groupBy(str => {
      str.charAt(0)
    })
    value1.collect().foreach(println)

在这里插入图片描述

2.1.7 filter

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

函数定义
def filter(f: T => Boolean): RDD[T]

	//获取偶数
    val dataRDD = sparkRdd.makeRDD(List(
      1, 2, 3, 4
    ), 1)
    val value1 = dataRDD.filter(_ % 2 == 0)
2.1.8 sample

函数定义
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]

根据指定的规则从数据集中抽取数据

参数具体意义:
1.抽取数据不放回
 withReplacement: Boolean, 该参数表示抽取不放回,此时采用伯努利算法(false)
 fraction: Double,该参数表示抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
 seed: Long = Utils.random.nextLong): RDD[T] 该参数表示随机数种子

2.抽取数据放回
 withReplacement: Boolean, 该参数表示抽取放回,此时采用泊松算法(true)
 fraction: Double,该参数表示重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
 seed: Long = Utils.random.nextLong): RDD[T] 该参数表示随机数种子
2.1.9 distinct

将数据集中重复的数据去重

def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

    val dataRDD = sparkRdd.makeRDD(List(
      1, 2, 3, 4, 1, 2
    ), 6)
    val value = dataRDD.distinct()

在这里插入图片描述

2.1.10 coalesce

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]

    //初始Rdd采用6个分区
    val dataRDD = sparkRdd.makeRDD(List(
      1, 2, 3, 4, 1, 2
    ), 6)
    //将分区数量缩减至2个
    val value = dataRDD.coalesce(2)

在coalesce中默认不开启shuffle,在进行分区缩减的时候,数据不会被打散。
在这里插入图片描述

2.1.11 repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

repartition内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程。
在这里插入图片描述

	//将分区数量从2个提升至4个
    val dataRDD = sparkRdd.makeRDD(List(
      1, 2, 3, 4, 1, 2
    ), 2)
    val dataRDD1 = dataRDD.repartition(4)
2.1.12 sortBy

该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程

def sortBy[K](
f: (T) => K, 该参数表述用于处理的函数
ascending: Boolean = true, 该参数表示是否升序排序
numPartitions: Int = this.partitions.length) 该参数表示设置分区数量
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

    val dataRDD = sparkRdd.makeRDD(List(
      1, 2, 3, 4, 1, 2
    ), 2)
    //按照初始数据降序排列
    val dataRDD1 = dataRDD.sortBy(num => num, false, 4)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1366642.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Redis(三)持久化

文章目录 RDB(Redis Database)自动触发保存频率修改dump文件保存路径修改文件保存名称dump恢复 手动触发save![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/a56fdff44aee4efa96c2ce3615b69dc1.png)bgsave 优劣优点缺点 检查修复dump文件会触…

算法基础之耍杂技的牛

耍杂技的牛 核心思想&#xff1a; 贪心 推公式&#xff1a; 将 i 和 i1 个奶牛交换位置 比较交换位置后的危险系数最大值若Wi Si > Wi1 Si1 则交换前大 交换后更优 需要交换因此 按照WS从小到大排序 就是最优解 再计算危险系数 #include<iostream>#include<…

Spring学习之——事务控制

Spring中的事务控制 说明&#xff1a; JavaEE体系进行分层开发&#xff0c;事务处理位于业务层&#xff0c;Spring提供了分层设计业务层的事务处理解决方案。 Spring框架为我们提供了一组事务控制的接口。具体在后面的小节介绍。这组接口是在spring-tx.RELEASE.jar中。 spri…

24分+的医药顶刊带你学习表观组学解析超级热点“肿瘤耐药”的机制

对癌症患者采用治疗干预时获得性耐药是转移性癌症复发的主要原因。此前&#xff0c;获得性耐药发展的研究主要集中在识别耐药肿瘤中常见的基因突变。越来越多的证据表明&#xff0c;在永久性获得性耐药出现之前&#xff0c;癌症中存在一种表观遗传调控的可逆耐药状态&#xff0…

[足式机器人]Part3 机构运动学与动力学分析与建模 Ch00-1 坐标系与概念基准

本文仅供学习使用&#xff0c;总结很多本现有讲述运动学或动力学书籍后的总结&#xff0c;从矢量的角度进行分析&#xff0c;方法比较传统&#xff0c;但更易理解&#xff0c;并且现有的看似抽象方法&#xff0c;两者本质上并无不同。 2024年底本人学位论文发表后方可摘抄 若有…

关爱服务 |“冬日暖情”送温暖乐善公益行志愿服务活动(第十七期)

为大力弘扬“学习雷锋、奉献他人、提升自己”的志愿精神&#xff0c;有效整合动员全民志愿服务资源&#xff0c;全面推进清远市志愿服务事业发展。机构将以“三关爱”活动为主题&#xff0c;积极开展关爱他人、关爱自然、关爱社会志愿服务活动&#xff0c;积极宣传、倡导志愿者…

docker swarm 常用命令简介以及使用案例

docker swarm Docker Swarm 是Docker官⽅的跨节点的容器编排⼯具。⽤户只需要在单⼀的管理节点上操作&#xff0c;即可管理集群下的所有节点和容器 解决的问题 解决docker server的集群化管理和部署Swarm通过对Docker宿主机上添加的标签信息来将宿主机资源进⾏细粒度分区&am…

txt文档里筛选出重复数据,并保存到新的txt文档

txt文档里筛选出重复数据&#xff0c;并保存到新的txt文档 input_file rD:\pythonXangmu\quchong\input_file.txt #原始文档 #output_file output.txt#重复内容记录文档 output_file rD:\pythonXangmu\quchong\output.txt#绝对路径&#xff0c;解决报错找不到文件或文件夹 w…

20.Activity跳转时的参数传递

(1).如何传递数据 (2).如何接收数据 (3).如何回传数据

基于深度学习的PCB板缺陷检测系统(含UI界面、yolov5、Python代码、数据集)

项目介绍 项目中所用到的算法模型和数据集等信息如下&#xff1a; 算法模型&#xff1a;     yolov5 yolov5主要包含以下几种创新&#xff1a;         1. 添加注意力机制&#xff08;SE、CBAM、CA等&#xff09;         2. 修改可变形卷积&#xff08;DySnake-主…

【现代密码学】笔记2 -- 完善保密性《introduction to modern cryphtography》现代密码学原理与协议

【现代密码学】笔记2--完善保密性《introduction to modern cryphtography》 写在最前面2 完善保密性的介绍2.1 定义和基本属性加密方案的组成密钥产生算法 (Gen)加密算法 (Enc)解密算法 (Dec)概率分布独立性 完美保密加密3. 回顾加密词法4. 完美保密&#xff08;**Perfect Sec…

【书生·浦语大模型实战营03】《基于 InternLM 和 LangChain 搭建你的知识库》学习笔记

《基于 InternLM 和 LangChain 搭建你的知识库》 常见术语 RAG: Retrieval Augmented Generation&#xff0c;检索增强生成 1. 大模型开发范式 1.1 RAG VS Finetune RAGFinetune低成本可个性化微调可实时更新知识覆盖面广受基座模型影响大成本高昂单次回答知识有限无法实时…

Maven之多环境配置与应用

多环境配置与应用 1. 多环境配置作用 maven提供配置多种环境的设定&#xff0c;帮助开发者使用过程中快速切换环境 2. 多环境配置步骤 2.1 定义多环境 <!--定义多环境--> <profiles><!--定义具体的环境&#xff1a;生产环境--><profile><!--定义…

Spring——Spring的事务控制(1)基础篇

Spring事务控制 1.事务介绍 1.1.什么是事务&#xff1f; 当你需要一次执行多条SQL语句时&#xff0c;可以使用事务。通俗一点说&#xff0c;如果这几条SQL语句全部执行成功&#xff0c;则才对数据库进行一次更新&#xff0c;如果有一条SQL语句执行失败&#xff0c;则这几条S…

短视频矩阵系统+无人直播源码+视频批量分发----开发实践

核心技术 1. AI自动直播&#xff1a; 智能系统通过丰富可定制的文案库&#xff0c; 拥有有料有趣的灵魂。不仅能自动语音讲解内容&#xff0c;还可以在直播中和用户灵活互动。直播中可将团购商品同话术自动上下架。 2. AI剪辑 可一键智能批量成片&#xff0c;也可跟着模板剪…

寒假前端第一次作业

1、用户注册&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>用户注册</title> …

红日靶场 4

靶场配置 ​ 733 x 668899 x 819 ​ ​ 733 x 6161466 x 1232 ​ ​ 733 x 6261449 x 1237 ​ ​ 733 x 6301450 x 1247 ​ IP 地址分配&#xff1a; Win7: 192.168.183.133(内网)Ubuntu: 192.168.183.134(内网) 192.168.120.137(外网)DC: 192.168.183.130(内网)Kali…

并发(6)

目录 36.什么是CAS&#xff1f; 37.CAS使用示例&#xff0c;结合AtomicInteger给出示例&#xff1f; 38.CAS会有哪些问题&#xff1f; 39.AtomicInteger底层实现&#xff1f; 40.请阐述你对Unsafe类的理解&#xff1f; 36.什么是CAS&#xff1f; CAS的全称为Compare&#…

QT DAY1作业

1.QQ登录界面 头文件代码 #ifndef MYWIDGET_H #define MYWIDGET_H#include <QWidget> #include <QIcon> #include <QLabel> #include <QPushButton> #include <QMovie> #include <QLineEdit>class MyWidget : public QWidget {Q_OBJECTpu…

LLM之LangChain(一)| LangChain六大核心模块简要汇总

声明&#xff1a;本文主要内容来自以下书籍《LangChain入门指南&#xff1a;构建高可复用、可扩展的LLM应用程序》和LangChain官网&#xff0c;非常感谢作者的贡献&#xff0c;由于作者有版权限制&#xff0c;因此在这里声明&#xff0c;如果涉及侵权&#xff0c;请联系我删除此…