【pyspark速成专家】3_Spark之RDD编程1

news2025/1/23 17:30:20

目录

​编辑

一,创建RDD

二,常用Action操作

三,常用Transformation操作


一,创建RDD

创建RDD主要有两种方式,一个是textFile加载本地或者集群文件系统中的数据,

第二个是用parallelize方法将Driver中的数据结构并行化成RDD。

#从本地文件系统中加载数据
file = "./data/hello.txt"
rdd = sc.textFile(file,3)
rdd.collect()

['hello world',
 'hello spark',
 'spark love jupyter',
 'spark love pandas',
 'spark love sql']

#从集群文件系统中加载数据
#file = "hdfs://localhost:9000/user/hadoop/data.txt"
#也可以省去hdfs://localhost:9000
#rdd = sc.textFile(file,3)

#parallelize将Driver中的数据结构生成RDD,第二个参数指定分区数
rdd = sc.parallelize(range(1,11),2)
rdd.collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

二,常用Action操作

Action操作将触发基于RDD依赖关系的计算。

collect

rdd = sc.parallelize(range(10),5) 
#collect操作将数据汇集到Driver,数据过大时有超内存风险
all_data = rdd.collect()
all_data

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

take

#take操作将前若干个数据汇集到Driver,相比collect安全
rdd = sc.parallelize(range(10),5) 
part_data = rdd.take(4)
part_data

[0, 1, 2, 3]

takeSample

#takeSample可以随机取若干个到Driver,第一个参数设置是否放回抽样
rdd = sc.parallelize(range(10),5) 
sample_data = rdd.takeSample(False,10,0)
sample_data

[7, 8, 1, 5, 3, 4, 2, 0, 9, 6]

first

#first取第一个数据
rdd = sc.parallelize(range(10),5) 
first_data = rdd.first()
print(first_data)

0

count

#count查看RDD元素数量
rdd = sc.parallelize(range(10),5)
data_count = rdd.count()
print(data_count)

10

reduce

#reduce利用二元函数对数据进行规约
rdd = sc.parallelize(range(10),5) 
rdd.reduce(lambda x,y:x+y)

45

foreach

#foreach对每一个元素执行某种操作,不生成新的RDD
#累加器用法详见共享变量
rdd = sc.parallelize(range(10),5) 
accum = sc.accumulator(0)
rdd.foreach(lambda x:accum.add(x))
print(accum.value)

45

countByKey

#countByKey对Pair RDD按key统计数量
pairRdd = sc.parallelize([(1,1),(1,4),(3,9),(2,16)]) 
pairRdd.countByKey()

defaultdict(int, {1: 2, 3: 1, 2: 1})

saveAsTextFile

#saveAsTextFile保存rdd成text文件到本地
text_file = "./data/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)

#重新读入会被解析文本
rdd_loaded = sc.textFile(text_file)
rdd_loaded.collect()

['2', '3', '4', '1', '0']

三,常用Transformation操作

Transformation转换操作具有懒惰执行的特性,它只指定新的RDD和其父RDD的依赖关系,只有当Action操作触发到该依赖的时候,它才被计算。

map

#map操作对每个元素进行一个映射转换
rdd = sc.parallelize(range(10),3)
rdd.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

rdd.map(lambda x:x**2).collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

filter

#filter应用过滤条件过滤掉一些数据
rdd = sc.parallelize(range(10),3)
rdd.filter(lambda x:x>5).collect()

[6, 7, 8, 9]

flatMap

#flatMap操作执行将每个元素生成一个Array后压平
rdd = sc.parallelize(["hello world","hello China"])
rdd.map(lambda x:x.split(" ")).collect()

[['hello', 'world'], ['hello', 'China']]

rdd.flatMap(lambda x:x.split(" ")).collect()

['hello', 'world', 'hello', 'China']

sample

#sample对原rdd在每个分区按照比例进行抽样,第一个参数设置是否可以重复抽样
rdd = sc.parallelize(range(10),1)
rdd.sample(False,0.5,0).collect()

[1, 4, 9]

distinct

#distinct去重
rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()

[4, 1, 5, 2, 3]

subtract

#subtract找到属于前一个rdd而不属于后一个rdd的元素
a = sc.parallelize(range(10))
b = sc.parallelize(range(5,15))
a.subtract(b).collect()

[0, 1, 2, 3, 4]

union

#union合并数据
a = sc.parallelize(range(5))
b = sc.parallelize(range(3,8))
a.union(b).collect()

[0, 1, 2, 3, 4, 3, 4, 5, 6, 7]

intersection

#intersection求交集
a = sc.parallelize(range(1,6))
b = sc.parallelize(range(3,9))
a.intersection(b).collect()

[3, 4, 5]

cartesian

#cartesian笛卡尔积
boys = sc.parallelize(["LiLei","Tom"])
girls = sc.parallelize(["HanMeiMei","Lily"])
boys.cartesian(girls).collect()

[('LiLei', 'HanMeiMei'),
 ('LiLei', 'Lily'),
 ('Tom', 'HanMeiMei'),
 ('Tom', 'Lily')]

sortBy

#按照某种方式进行排序
#指定按照第3个元素大小进行排序
rdd = sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
rdd.sortBy(lambda x:x[2]).collect()

[(4, 1, 1), (3, 2, 2), (1, 2, 3)]

zip

#按照拉链方式连接两个RDD,效果类似python的zip函数
#需要两个RDD具有相同的分区,每个分区元素数量相同

rdd_name = sc.parallelize(["LiLei","Hanmeimei","Lily"])
rdd_age = sc.parallelize([19,18,20])

rdd_zip = rdd_name.zip(rdd_age)
print(rdd_zip.collect())

[('LiLei', 19), ('Hanmeimei', 18), ('Lily', 20)]

zipWithIndex

#将RDD和一个从0开始的递增序列按照拉链方式连接。
rdd_name =  sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())

[('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1689242.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

UWB论文:Introduction to Impulse Radio UWB Seamless Access Systems(2):脉冲;超宽带;测距;定位

3) 测距/接收器 像全球定位系统(GPS)这样的系统依赖于单向测距One Way Ranging(OWR),其中多个卫星(代表固定节点,称为锚点anchors)定期传输同步的无线电数据包集合,这允许…

C++ 写的_string类,兼容std::string, MFC CString和 C# 的string

代码例子: using namespace lf; int main() { CString s1 _t("http://www.csdn.net"); _string s2 s1; CString s3 s2; _pcn(s1); _pcn(s2); _pcn(s3); return 0; } 输出: _Str.h /***************************************…

一个开源的工具类轮子是怎么造出来的

心路历程 为什么要做 在22年9月的某一天,在公司开需求评审时,接到了一个给PDF、图片添加水印的需求。做为一个刚工作的CURD程序员,在遇到这些问题时,第一反应是去github上找找有没有类似的开源框架。但是,出乎我意料…

JUnit5标记测试用例

使用场景: 通过Tag对用例分组: 环境分组:测试环境、预发布环境阶段分组:冒烟用例版本分组:V1.1、V1.2 Tag标记用例: 设置标签根据标签执行 结合Maven执行结合测试套件执行 设置标签: 通过T…

小皮面板中访问不了本地的sqli网站---解决方法

今天想在sqli-labs中做题,却发现自己访问不了网站 1、具体的错误原因如下 2、查了一下,可能是因为自己访问的域名不对 3、修改了域名为:http://sqli-labs:81/Less-2/便可以访问了 4、然后接下来我有遇到一个错误,这个问题是php版…

Python3 笔记:sort() 和 sorted() 的区别

1、sort() 可以对列表中的元素进行排序,会改变原列表,之前的顺序不复存在。 list.sort(key, reverse None) key:默认值是None,可指定项目进行排序,此参数可省略。 reverse&#…

Java中transient关键字

transient介绍 在Java中,transient是一个关键字,用于声明一个字段在序列化过程中应该被忽略。当一个对象被序列化时,它的状态(即其字段的值)通常会被保存到字节流中,以便稍后可以反序列化恢复对象的状态。…

AI崛起,掌握它,开启智能新生活!

AI崛起,掌握它,开启智能新生活! 😄生命不息,写作不止 🔥 继续踏上学习之路,学之分享笔记 👊 总有一天我也能像各位大佬一样 🏆 博客首页 怒放吧德德 To记录领地 &…

UFS协议—新手快速入门(一)【1-4】

本篇旨在为初学者提供关于通用闪存存储(UFS)的快速入门指南。 目录 一、背景介绍 二、UFS 三、半双工和全双工 (1)半双工(Half-Duplex) (2)全双工(Full-Duplex&…

走向图对比学习:综述与展望

【摘要】近年来,图的深度学习在各个领域取得了显著的成功。然而,对带注释的图形数据的依赖仍然是一个很大的瓶颈,因为它的成本过高且耗费时间。为了应对这一挑战,图的自监督学习(SSL)得到了越来越多的关注,并取得了重大…

[Linux] 进程概念

目录 1.冯诺依曼硬件体系结构 2.操作系统(OS) 3.系统接口 4.进程的概念 5.进程状态 6.四个其他概念 7.环境变量 8.进程地址空间 1.冯诺依曼硬件体系结构 在冯诺依曼体系结构中,计算机是由输入、输出、存储设备和中央处理器cpu组成的。图中体结…

CSS【常用CSS样式、盒子模型、定位、浮动 、扩展样式】--学习JavaEE的day46

day46 CSS 练习 页面实现&#xff1a; 分析&#xff1a; 未优化&#xff1a; 优化&#xff1a; 参考代码&#xff1a;&#xff08;包含样式优化–选择器CSS属性&#xff09; 先写上table方便实现&#xff0c;之后再去除即可 name没有服务器&#xff0c;可暂时不写 <!…

适合做应用的算法-鲸鱼优化算法(WOA)详细原理-附matlab代码

鲸鱼优化算法 (Whale optimization Algorithm, WOA)是 2016 年由 Mirjalili 等提出的一种新型启发式搜索算法,该算法通过模仿座头鲸在海洋中的捕食行为, 对鲸鱼群体搜索、包围和攻击过程模拟实现来寻找最优解. 与传统的元启发式优化算法相比, 鲸鱼优化算法具有操作简单, 需要设…

3D透视图模型转模型变形?---模大狮模型网

3D建模是数字艺术和设计领域中的重要技术&#xff0c;它可以为我们带来丰富多彩的视觉体验和创意表达。在本文中&#xff0c;我们将探讨一个引人注目的话题&#xff1a;3D透视图中模型转换是否会导致变形?通过深入探讨这个问题&#xff0c;我们希望能够帮助您更好地理解在3D建…

Docker安装文档

Docker安装文档 小有所得&#xff0c;生活便充满希望的微光&#xff1b;小有所望&#xff0c;未来便值得期待。小有知足&#xff0c;幸福便如影随形&#xff1b;小有可期&#xff0c;每一天都充满希望。在今日小满之际&#xff0c;感受这小小确幸&#xff0c;心中满是爱意&…

跟TED演讲学英文:Do schools kill creativity by Sir Ken Robinson

Do schools kill creativity? Link: https://www.ted.com/talks/sir_ken_robinson_do_schools_kill_creativity Speaker: Sir Ken Robinson Date: February 2006 文章目录 Do schools kill creativity?IntroductionVocabularySummaryTranscriptAfterword Introduction Sir…

FTP介绍

FTP 1、FTP—文件传输协议 文件传输协议&#xff08;File Transfer Protocol&#xff0c;FTP&#xff09;是用于在网络上进行文件传输的一套标准协议&#xff0c;它工作在 OSI 模型的第七层&#xff0c; TCP 模型的第四层&#xff0c; 即应用层&#xff0c; 使用 TCP 传输&…

攻防世界-mobile-easy-app详解

序言 这道题网上很多分析&#xff0c;但是分析的都是arm版本的&#xff0c;我选了arm64的来分析&#xff0c;arm64相比arm难度高一些&#xff0c;因为arm64编译器搞了inline优化&#xff0c;看起来略抽象 分析 这道题逻辑很简单&#xff0c;输入flag然后一个check函数验证&a…

av_dump_format经验分析,FFmpeg获取媒体文件总时长(FLV获取总时长的误区)

播放器有个功能&#xff0c;当用户打开视频时&#xff0c;需要读取媒体文件的总时长等信息&#xff0c;不巧的时&#xff0c;获取FLV时总失败&#xff0c;下面来具体分析下FLV和MP4获取总时长的原因和区别&#xff1a; 播放器有个获取MediaInfo的接口&#xff0c;功能如下&am…

9.5 Go语言入门(条件语句和循环语句)

Go语言入门&#xff08;条件语句和循环语句&#xff09; 目录四、条件语句和循环语句1. 条件语句1.1 if 语句1.2 if-else 语句1.3 if-else if-else 语句1.4 带初始化语句的 if1.5 switch 语句1.6 带条件的 switch1.7 多个条件的 case 2. 循环语句2.1 基本 for 循环2.2 省略初始…