大数据-85 Spark 集群 RDD创建 RDD-Action Key-Value RDD详解 RDD的文件输入输出

news2025/1/23 2:10:21

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
    HDFS(已更完)
    MapReduce(已更完)
    Hive(已更完)
    Flume(已更完)
    Sqoop(已更完)
    Zookeeper(已更完)
    HBase(已更完)
    Redis (已更完)
    Kafka(已更完)
    Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

  • RDD 的创建
  • 从集合创建RDD、从文件创建RDD、从RDD创建RDD
  • RDD操作算子:Transformation 详细解释
    Transformation

RDD的操作算子分为两类:

  • Transformation,用来对RDD进行转换,这个操作时延迟执行的(或者是Lazy),Transformation,返回一个新的RDD
  • Action,用来触发RDD的计算,得到相关计算结果或者将结果保存到外部系统中,Action:返回int、double、集合(不会返回新的RDD)

上节完成了Transformation

Action

Action 用来触发RDD的计算,得到相关的计算结果

  • collect()/collectAsMap()
  • stats/count/mean/stdev/max/min
  • reduce(func)/fold(func)/aggregate(func)
  • first() 返回第一个RDD
  • take(n) 从开头拿一定数量的RDD
  • top(n) 按照将需或指定排序规则 返回前num个元素
  • takeSample 返回采样数据
  • froeach / foreachPartition 与 map、mapPartition类似
  • saveAsTextFile/saveAsSequenceFile/saveAsObjectFile

Key-Value RDD

RDD整体上分为 Value类型 和 Key-Value类型
前面介绍是 Value类型的RDD操作,实际上使用更多的是Key-Value类型的RDD,也称为 PairRDD

  • Value类型的RDD操作基本集中在RDD.scala中
  • Key-Value类型的RDD操作集中在PairRDDFunctions.scala中

创建PairRDD

val arr = (1 to 10).toArray
val arr1 = arr.map(x => (x, x*10, x*100))

# rdd1 不是 Pair RDD
val rdd1 = sc.makeRDD(arr1)

# rdd2 是 Pair RDD
val arr2 = arr.map(x => (x, (x*10, x*100)))
val rdd2 = sc.makeRDD(arr2)

运行查看如下的结果:
在这里插入图片描述

Transformation操作

mapValues

# mapValues代码更简洁
val a = sc.parallelize(List((1,2),(3,4),(5,6)))
val b = a.mapValues(x => 1 to x)
b.collect

运行结果如下图:
在这里插入图片描述

flatMapValues

将values压平、拍平

val c = a.flatMapValues(x => 1 to x)
c.collect

执行结果如下图所示:
在这里插入图片描述

groupByKey

键值对的key表示图书名称,value表示某天图书销量。计算每个键对应的平均值,也就是计算每种图书的每天平均销量。

val rdd = sc.makeRDD(Array(("spark", 12), ("hadoop", 26),("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25),("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16)))

# 三种写法
rdd.groupByKey().map(x => (x._1, x._2.sum.toDouble/x._2.size)).collect

rdd.groupByKey().map{case (k, v) => (k,v.sum.toDouble/v.size)}.collect

rdd.groupByKey.mapValues(v => v.sum.toDouble/v.size).collect

执行结果如下图所示:
在这里插入图片描述

reduceByKey

这种方式也可以
rdd.mapValues((_, 1)).reduceByKey((x, y)=> (x._1+y._1, x._2+y._2)).mapValues(x => (x._1.toDouble / x._2)).collect()

foldByKey

rdd.mapValues((_, 1)).foldByKey((0, 0))((x, y) => {
(x._1+y._1, x._2+y._2)}).mapValues(x=>x._1.toDouble/x._2).collect

执行结果如下图所示:
在这里插入图片描述

sortByKey

根据key来进行排序

val a = sc.parallelize(List("wyp", "iteblog", "com","397090770", "test"))
val b = sc.parallelize(1 to a.count.toInt)
val c = a.zip(b)
c.sortByKey().collect
c.sortByKey(false).collect

执行如下图所示:
在这里插入图片描述

cogroup

val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"),(3,"Kylin"), (4,"Flink")))
val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"赵六"),(6,"冯七")))

# join
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect.foreach(println)

rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty & v2.nonEmpty}.collect

执行结果如下图所示:
在这里插入图片描述

outerjoin

# 不同的JOIN操作
rdd1.join(rdd2).collect
rdd1.leftOuterJoin(rdd2).collect
rdd1.rightOuterJoin(rdd2).collect
rdd1.fullOuterJoin(rdd2).collect

执行结果如下图所示:
在这里插入图片描述

lookup

rdd1.lookup("1")
rdd1.lookup(3)

执行结果如下图所示:
在这里插入图片描述

文件输入输出

文本文件

  • 数据读取:textFile(String),可指定单个文件,支持通配符
  • 返回值RDD[(String, Sting)],其中Key是文件的名称,Value是文件的内容。
  • 数据保存:saveAsTextFile(String) 指定输出目录

csv文件

读取CSV(Comma-Separated Values)/TSV(Tab-Separaed Values)数据和读取JSON数据相似,都需要先把文件当做普通文件来读取数据,然后通过将每一行进行解析实现对CSV的提取。
CSV/TSV 数据的输出也是需要将结构化RDD通过相关库转换成字符串的RDD,然后使用Spark的文本文件API写出去

JSON文件

如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析
JSON数据的输出主要通过在输出之前将由结构化数据组成的RDD转换为字符串RDD,然后使用Spark的文本文件API写出去
JSON文件的处理使用 SparkSQL 最为简洁。

SequenceFile

SequenceFile文件是Hadoop用存储二进制形式的Key-Value而设计的一种平面文件(FlatFile)。
Spark有专门用来读取SequenceFile的接口,在SparkContext中,可以调用:SequenceFile[keyClass, valueClass];
调用 saveAsSequenceFile(path)保存PairRDD,系统将键和值能够自动转换为Writable类型。

对象文件

对象文件是序列化后保存的文件,采用Java的序列化机制。
通过 objectFile 接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出,因为序列化所以要指定类型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2044138.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习:knn算法

1、概述 全称是k-nearest neighbors,通过寻找k个距离最近的数据,来确定当前数据值的大小或类别。K-近邻算法是一种基本而又有效的机器学习算法,用于分类和回归任务。它属于实例学习方法,或者说是一种基于规则的记忆方法。 2、基本…

STM32外设篇:MPU6050

MPU6050简介 MPU6050是一个6轴姿态传感器,可以测量芯片自身X、Y、Z轴的加速度、角速度参数,通过数据融合可进一步得到姿态角(欧拉角),常应用于平衡小车、飞行器等需要检测自身姿态的场景。 3轴加速度计3轴陀螺仪传感…

各种国产操作系统,一个 U 盘搞定

熟悉 Windows 装机的朋友对老毛桃和大白菜这类装机工具应该不陌生。这两款流行的工具可以用来制作启动盘,方便进行系统安装、备份和还原等操作。它们集成了多种磁盘工具,并支持一个启动 U 盘安装多个版本的 Windows 系统,如 Windows 7、Windo…

css中的高度塌陷

CSS高度塌陷(或称为高度坍塌)是指在某些特定情况下,元素的高度无法被正确计算或显示的现象。这通常发生在具有浮动属性的元素或使用绝对定位的元素周围。 原因 高度塌陷通常发生在父元素包含着一个或多个浮动元素时。由于浮动元素被移出了正…

ChatTTS部署

1、创建conda环境 conda create -n TTS python3.10 conda activate TTS2、拉取源代码 # 从 GitHub 下载代码 git clone https://github.com/2noise/ChatTTS cd ChatTTS拉取模型文件 git clone https://www.modelscope.cn/pzc163/chatTTS.git ChatTTS-Model3、安装环境依赖 …

UDP详解/消息边界

本文旨在解释了为什么说UDP是不可靠,到底什么是UDP的消息边界,以及UDP是否会出现粘包和半包的问题 概念 UDP协议是一种面向非连接的协议,面向非连接指的是在正式通信前不必与对方先建立连接,不管对方状态就直接发送,至于对方是否可以接收到这些数据内容,UDP协议无法控制…

HTML+CSS进阶用法 (下)——移动端适配、媒体查询和响应式布局

欢迎来到移动端适配方案的介绍!随着移动互联网的快速发展,越来越多的用户通过手机和平板电脑访问网站。为了确保网站能够在各种设备上呈现出最佳的视觉效果和用户体验,我们需要采取有效的适配策略。本篇文章将带你了解几种常用的适配方法&…

记事本打不开(保姆级教程)

问题可能是这样的: 1. 应用程序故障:记事本程序可能遇到了临时的应用程序故障或错误。 2. 系统文件损坏:系统文件损坏或丢失可能导致记事本无法正常启动。 3. 注册表问题:注册表中的条目错误或缺失可能影响记事本的加载。 4. 输入…

Blender的Python编程介绍

在Blender这个免费的开源3D设计软件中,最值得称道的一点是可以用Python程序来辅助进行3D设计,我们可以通过Python来调整物体的属性,生成新的物体,甚至生成新的动画等等。 在最近的一个项目中,我用Blender制作了一个动…

PVE 系统下虚拟机数据盘从IDE转换为VIRIO

一、卸载已经挂载的 IDE 数据盘 [rootlocalhost ~]# df -h 文件系统 容量 已用 可用 已用% 挂载点 /dev/mapper/centos-root 29G 897M 29G 4% / devtmpfs 909M 0 909M 0% /dev tmpfs 920M 0 920M 0% /dev/shm tmpfs 920M 8.5M 912M 1% /run tmpfs 920M 0 920M 0% /sys/fs/cgro…

nginx的平滑升级及版本回滚

官方源码包下载地址:nginx: download 一、编译安装Nginx-1.24.0 [rootNginx ~]# dnf install gcc pcre-devel zlib-devel openssl-devel -y [rootNginx ~]# mkdir /nginx #创建目录,将nginx-1.24.0.tar.gz放在这个目录里 [rootNginx nginx]# tar…

C++的动态数组以及std:vector的优化

文章目录 静态数组动态数组代码背景第一种打印方式:使用 for 循环和索引解释 第二种打印方式:使用基于范围的 for 循环解释改进方式:避免拷贝 总结清理数组 代码示例代码分析输出结果总结 代码示例代码详解总结使用 reserve 的优点:使用 empl…

【考研数学】定积分应用——旋转体体积的计算(一文以蔽之)

目录 一、如何计算旋转体体积?思考一个小例子 二、旋转体体积的二重积分表达式 三、用真题,小试牛刀 定积分的应用中,有一类题是求解旋转体的体积问题。 相较于记忆体积计算公式,有一种通法求解体积更不容易出错:二重…

系统数据库介绍及实践

目录 案例 【题目】 【问题 1】(8 分) 【问题 2】(13 分) 【问题 3】(4 分) 【答案】 【问题 1】解析 【问题 2】解析 【问题 3】解析 相关推荐 案例 阅读以下关于应用系统数据架构的说明,在答题纸上回答问题 1 至问题 3。 【题目】 某软件公司拟开发一套…

svn文件定时全量备份

在win11操作系统中,使用定时任务脚本的方式实现对SVN文件的定时备份 SVN备份脚本 1 创建脚本simpleBackup.bat 该脚本主要用于实现备份过程的信息展示 echo 正在备份版本库%1...... md %BACKUP_DIRECTORY%\%2 %SVN_HOME%\bin\svnadmin hotcopy %1 %BACKUP_D…

蓝桥杯 双周赛 第16场 强者赛 题目复盘 (2024年8月10日)

6. 花魁之争 解题思路: 根据题意,对于每一次操作,每个仙女来说都取最优解,那第一次每个仙女都操作一次,这时候胜出的仙女,是一定赢的。所以,只要计算n个字符串操作一次的最优字符串,…

HarmonyOS NEXT - 通过 module 模块化引用公共组件和utils

demo 地址: https://github.com/iotjin/JhHarmonyDemo 代码不定时更新,请前往github查看最新代码 HarmonyOS NEXT 一、HAP & HSP & HAR介绍HAP官方介绍HAR官方介绍HSP官方介绍怎么理解App、HAP、HAR的关系HAR如何转换为HSPHSP模块如何快速切换成HAR模块 二…

【JavaSE】基础知识复习(六)(不全)

1.File与IO流 File类就是代表系统的文件 / 目录,IO流是用来处理File类的 File类 构造器 分隔符(三种) 第三种File.separator是跨平台的,获取当前操作系统的分隔符 常用方法 length() 返回文件大小(字节),如果是目录&am…

EcoDev Studio 与 gitlab【拉取项目,切换分支,再修改提交】

1 安装git工具 https://blog.csdn.net/mukes/article/details/115693833 2 创建空项目 3 推送gitlab 1、进入本地该项目目录下,右键Git Bash Here打开git命令窗口 2、初始化本地仓库: git init将本地项目的所有文件添加到暂存区: git a…

EMC学习笔记2——电磁兼容问题分析

分析一个电磁兼容问题一般从三方面入手,分别是骚扰源、敏感源、耦合路径。解决掉其中一个问题,就能解决大部分的电磁兼容问题。 例如:当骚扰源是雷电时,敏感源是电子线路时,我们需要消除的就是耦合电路。 耦合路径就是…