Spark 常用算子02

news2025/1/10 12:02:18

常用Action算子

1、countByKey算子

功能:统计key出现的次数(一般适用于KV型的RDD)
用法:

result = rdd1.countByKey()
print(result)

代码示例:

# coding:utf8

from pyspark import SparkConf, SparkContext

if __name__ == '__main__':
    # 通过SparkConf对象构建SparkContext
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd = sc.textFile("../data/words.txt")
    rdd2 = rdd.flatMap(lambda x : x.split(" ")).map(lambda x : (x,1))

    result = rdd2.countByKey()

    print(result)
    print(type(result))

测试数据:
在这里插入图片描述

结果打印:
在这里插入图片描述
注意: action算子返回的结果的字典类型,和之前的转换算子时有区别的。
注意: countByKey只是统计key出现的次数,但是只是使用于KV型的RDD,所以上面使用元组,value值为1。但是不能误以为时value值相加。
例如下面示例:显然说明了只是统计key的出现次数
在这里插入图片描述

2、collect算子

功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
用法:

rdd.collect()

这个算子,是将RDD各个分区数据都拉取到Driver中,注意的是:RDD是分布式对象,其数据量会很大,所以使用这个算子之前,必须知道,结果数据集不会太大。不然会把Driver内存撑爆。

3、reduce算子

功能:对RDD数据集按照你传入的逻辑进行聚合
语法:

rdd.reduce(func)
# func : (T,T) -> T
# 2个形参,一个返回值,三个参数的类型必须一致

代码示例:

rdd = sc.parallelize(range(1,10))
print(rdd.reduce(lambda a ,b : a + b ))

reduce执行流程分析:
在这里插入图片描述

4、fold算子

功能:和reduce一样,接收传入逻辑进行聚合,但是聚合是带有初始值的
这个初始值聚合,会作用在
       分区内聚合,分区间聚合
比如:[ [1,2,3] , [4,5,6] , [7,8,9] ]
数据分布在3个分区,分区内聚合:
分区1 1,2,3聚合的时候带上10作为初始值得到 16
分区2 4,5,6聚合的时候带上10作为初始值得到 25
分区3 7,8,9聚合的时候带上10作为初始值得到 34

分区间聚合:
3个分区的结果做聚合也会带上初始值10,所以结果是 10+16+25+34 = 85

语法:

rdd.fold(初始值,func)
# func : (T,T) -> T
# 2个形参,一个返回值,三个参数的类型必须一致

代码示例:
在这里插入图片描述

5、first算子

功能:取出RDD的第一个元素,返回值根据存储的数据而定
语法:

rdd.first()
6、take算子

功能:取RDD的前n个元素,组合成list返回,list里面的数据类型根据RDD存储的数据而定
语法:

rdd.take(n)
7、top算子

功能:对RDD的数据集进行降序排序,取前n个
大小的比较依据的是对象的compare方法进行比较,如果没有重写compare方法,对于数字来说,按照大小降序,对于字符串来说,按照ASCII码进行比较
语法:

rdd.top(n) # 表示降序之后取前n个
8、count算子

功能:计算RDD有多少条数据,返回值是一个数字
语法:

rdd.count()
9、takeSample算子

功能:随机抽样RDD的数据
因为spark是用来做大数据计算的,当你想查看数据时,如果是有collect查看数据,有可能会把Driver内存撑爆。所以可以使用takeSample从
语法:

takeSample(参数1 : True or False , 参数2:采样数, 参数3:随机数种子)
参数1 : True表示运行取同一个数据,False表示不允许取同一个数据。和数据内容无关,是否重复表示的是 : 同一个位置的数据
参数2 : 抽样要几个
参数3 : 随机数种子,这个参数传入一个数据即可,随意给

随机数种子,数字可以随便传,如果传同一个数字,那么取出的结果是一致的
一般参数3,我们不传,spark会自动给与随机的种子。
其实这个种子代表步长,在计算机中其实没有严格意义上的随机数,大部分的时候,都是基于随机数种子来去决定随机数的产生。所以,如果随机数种子一样,那么取出来的结果也是一样的。

代码演示:
在这里插入图片描述
在这里插入图片描述
在测试的时候,给定种子,并且你取数量小于数据总量时,你每次运行结果都是一样的。

10、takeOrdered算子

功能:对RDD进行排序取前n个
语法:

rdd.takeOrdered(参数1,参数2)
参数1:要几个数据
参数2:对排序的数据进行更改(不会更改数据本身,只是在排序的时候换个样子),默认升序,可以指定降序,但是需要自定定义降序逻辑。

代码示例:
在这里插入图片描述

10、foreach算子

功能:对RDD的每一个元素,执行你提供的逻辑操作(和map一个意思),但是这个方法没有返回值
语法:

rdd.foreach(func)
# func : (T) -> None

代码示例:
我们可以看见,foreach是没有返回值的,你想要返回值就用map,不需要就使用foreach
在这里插入图片描述
foreach的执行是由Executor直接输出的!!!
和collect有区别的,collect会将数据从Executor收集到Driver中,再由Driver输出。
所以再某些程度上。效率会高一点(比如打印操作,或者一些插入数据操作)

11、saveAsTextFile算子

功能:将RDD的数据写入文本文件中
支持本地写出,hdfs等文件系统
语法:

rdd.saveAsTextFile("hdfs://node1:8020/output/test")

代码示例:
执行之后你会发现,在你Pycharm中没有这个文件,因为你是将你的工程同步到Linux虚拟机中的,这个本地路径肯定就是你Linux的路径。在PyCharm中只会自动将工程的同步到Linux中,不会讲Linux同步到你自己开发项目中。所以需要你自己手动取拉取同步。
在这里插入图片描述
在这里插入图片描述
注意:!!!
你有几个分区,就有几个文件!!!

注意点:
saveAsTextFile会由Executor直接写文件,和forEach一样,由Executor输出,不经由Driver收集之后输出。
可以在一定程度上的 性能提高。
目前学习中:其余的Action算子都会讲结果发送至Driver。


分区操作算子

1、mapPartitions算子----属于转换算子

首先回忆一下map算子的功能,是将RDD的数据一条条处理。
如果分区1数据为 1,2
分区2数据为3,4
分区3数据为5,6
对这个rdd数据执行乘以10的操作,那么分区1的1,2分别乘以10,放入一个新的分区,会执行两次操作。同理分区3,4。那么总共会经历6次IO传输。

mapPartitions
功能:还是之前的数据,三个分区,6个数据,执行乘以10的操作。分区1的数据会整体进行处理,将1和2乘以10,放入新的分区中,只会有一次IO的处理。同理,分区2,3。
从结果看,map和mapPartitions效果是一样的,但是它两中间过程是不一样的。

语法:

rdd.mapPartitions( func )
# func : Iterable[T] -> Iterable[U]
传入的参数是一个迭代器对象,返回参数也是一个迭代器对象,数据类型没有要求。

代码示例:
在这里插入图片描述

从以上代码看,map和mapPartitions这两个算子,在CPU层面没有区别,但是在网络IO层面看,mapPartitions效率更高。

2、foreachPartition算子----属于Action算子

功能:和普通foreach一致,但是,foreachPartition是一次处理一个分区的数据,foreach是一条一条处理数据。
两者都属于Action算子,没有返回值。
代码示例:
在这里插入图片描述
总结:foreachPartition 和 mapPartitions 就是对foreach,map的加强版。前者一个分区只走一次IO,后者会走多次IO。


3、partitionBy算子---- 转换算子

功能:对RDD进行自定义分区操作
语法:

rdd.partitionBy(参数1,参数2)
# 参数1: 重新分区后有几个分区
# 参数2: 自定义分区规则,函数传入

# 参数2:(k)-> int
一个传入参数进来,类型无所谓,但是返回值一定是int类型,将key传给这个函数,你自己写逻辑,
决定返回一个分区编号

分区编号从0开始,不要超出分区数

代码示例:
当前指定分区数为2,数据按照顺序进行了分区
在这里插入图片描述

4、repartition算子---- 转换算子

功能:对RDD的分区数进行修改,仅仅修改分区数。
语法:

rdd.repartition(N)

注意:对分区的数量进行操作,一定要慎重
一般情况下,我们写spark代码,除了要求全局排序设置1个分区外,多数时候,所有API中关于分区相关的代码我们都不太理会。
因为,如果你修改了分区,会影响并行计算(内存迭代的并行管道数量),分区增加,极大可能导致shuffle

代码示例:
在这里插入图片描述

coalesce算子:
这个算子和repartition功能一样,增减分区的。

# 如果rdd分区数为3
rdd2 = rdd.coalesce(1) # 将分区数设置为1
rdd3 = rdd.coalesce(5,shuffle=True)  # 将分区数设置为5

coalesce在增加分区的时候,shuffle=True,这个参数必须带上。就是为了告诉开发人员,增加分区会导致shuffle,为了避免误增加分区。

repartition算子底层就是coalesce,默认shuffle=True

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/177614.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

破解五角大楼3.0漏洞赏金计划专注于设施控制系统

国防部正在计划其“黑掉五角大楼”计划的第三次迭代,重点是找出维持标志性建筑和地面运行的操作技术中的漏洞。 国防部于 2016 年启动了黑客入侵五角大楼计划,供应商 HackerOne 协调了该部门公共网站上的漏洞赏金计划。 超过 1,400 名黑客参加了第一轮…

绝对空前!!!互联网史上的最大ddos攻击惊艳登场

美国遭遇史上最大黑客攻击,知名网站全部瘫痪。全世界一半的网络被黑客攻陷,大网站无一幸免。就在(10月22日),美国早上我们见证了互联网建立以来的最大ddos攻击,twitter、netflix、paypal、reddit、pinteres…

【MySQL】锁

文章目录基础MyISAM表锁并发插入锁调度策略InnoDB事务并发事务行锁行锁争用情况行锁实现方式恢复和复制对InnoDB锁机制的影响死锁MVCC底层实现和原理悲观锁和乐观锁基础 锁是计算机协调多个进程或线程并发访问某一资源的机制(避免争抢)。在数据库中&…

一文打通java线程

基本概念:程序、进程、线程 程序(program) 是为完成特定任务、用某种语言编写的一组指令的集合。即指一 段静态的代码,静态对象。 进程(process) 是程序的一次执行过程,或是正在运行的一个程序。是一个动态的过程:有它自身的产…

Linux常用命令——sort命令

在线Linux命令查询工具(http://www.lzltool.com/LinuxCommand) sort 将文件进行排序并输出 补充说明 sort命令是在Linux里非常有用,它将文件进行排序,并将排序结果标准输出。sort命令既可以从特定的文件,也可以从stdin中获取输入。 语法…

文本情感分类TextCNN原理+IMDB数据集实战

1.任务背景 情感分类: 发展历程: 2.数据集 本次使用IMDB数据集进行训练。 3.模型结构 3.1 CNN基础 卷积: 单通道卷积:每组卷积核只包含一个。 单通道输入 单输出:设置一组卷积核。 单通道输入 多输出:…

国企避坑:to B服务性质的业务线不要来!又卷又累,互联网和它比简直是小巫见大巫!...

国企好归好,但不是所有的国企都能闭眼入,一位网友友情提示大家:不管是国企还是央企,to b服务性质的业务线不要来,不要来,不要来!又卷又累,苦哈哈,互联网和这个比&#xf…

在CSS世界的权力——权重

在CSS的世界中也存在着权力即CSS权重 1. 概念 CSS权重指的是样式的优先级,有两条或多条样式作用于一个元素,权重高的那条样式对元素起作用,权重相同的,后写的样式会覆盖前面写的样式 2. 以前的BUG 在实际开发中,我…

代码随想录--双指针章节总结

代码随想录–双指针章节总结 1.LeetCode27 移除元素 给你一个数组 nums 和一个值 val,你需要 原地 移除所有数值等于 val 的元素,并返回移除后数组的新长度。 不要使用额外的数组空间,你必须仅使用 O(1) 额外空间并 原地 修改输入数组。 …

C++程序设计——动态内存管理

一、C/C内存分布 1.栈(堆栈) 存储非静态局部变量、函数参数、返回值等等,栈是向下增长。 2.内存映射段 是高效的I/O映射方式,用于转载一个共享的动态内存库。用户可使用系统接口创建共享内存,做进程间通信。 3.堆 用…

WPS的简单JS宏应用

有一阵子没写博客了,各种琐事忙碌;前段时间接触了下WPS的宏功能,抽点时间写个学习笔记吧。 案例背景简单说一下,主任让我统计OA后台在建工程项目的概况,后台数据导出一张表,再问隔壁经营部的同事要了一张中…

java类的初始化2023018

类的初始化: 第一次使用某个类,例如Person类,系统通常会在第一次使用Person类时加载这个类并初始化这个类。在类的准备阶段,系统将会为该类的类变量分配内存空间,并指定默认初始值。当Person类初始化完成后&#xff0c…

机器学习笔记之深度玻尔兹曼机(二)深度玻尔兹曼机的预训练过程

机器学习笔记之深度玻尔兹曼机——深度玻尔兹曼机的预训练过程引言深度信念网络预训练过程的问题深度玻尔兹曼机的预训练过程(2023/1/24)引言 上一节介绍了玻尔兹曼机系列的相关模型,本节将介绍深度玻尔兹曼机的预训练过程。 深度信念网络预训练过程的问题 在玻尔…

Escher 愛雪磁磚設計法則 - 高雄燕巢深水國小科展指導

“Talk is cheap. Show me the code.” ― Linus Torvalds 老子第41章 上德若谷 大白若辱 大方無隅 大器晚成 大音希聲 大象無形 道隱無名 拳打千遍, 身法自然 “There’s no shortage of remarkable ideas, what’s missing is the will to execute them.” – Seth Godin …

GreenPlum AOCO列存如何将数据刷写磁盘

GreenPlum AOCO列存如何将数据刷写磁盘AOCO列存表每个字段一个文件,前面我们介绍了列存表如何加载数据页,本文我们重点介绍AOCO表如何进行刷写。AOCO表进行insert、update、delete会产生脏数据,和heap表的异步脏页刷写不同,AOCO表…

写一个锅炉温控系统用python编写

简单来说就是锅炉水热了之后循环泵自动开启,然后将热水输送走,送到暖气,热水抽走,凉水进入锅炉,温度降低,循环泵关闭,等待下一次水烧热。因为需要取暖的房子距离烧锅炉的地方比较远,所以需要循环泵,如果距离近的话水烧热后利用热水上流冷水回流的原理会自动完成循环。…

前言技术之mybatis-plus

目录 1.什么是mybatis-plus 2.初体验 3.日志 4.主键生成策略 5.更新 6.自动填充 1.什么是mybatis-plus 升级版的mybatis,目的是让mybatis更易于使用, 用官方的话说“为简化而生” 官网: MyBatis-Plus 2.初体验 1.准备数据库脚本 数据…

BI 解决方案:BimlStudio 22.3.0 Crack

全功能开发环境:::: 导入现有解决方案 通过添加 BimlScript 自动化进行更改并重新生成包;使您的解决方案更好、更快。 可视化整个 BI 解决方案 通过我们的可视化设计器在一个位置进行更改,观察您的整个解决方案自行更新…

【ArcGIS微课1000例】0061:ArcGIS打开xyz格式点云数据的方法

本文讲述ArcMap和ArcScene中如何打开xyz格式的点云数据并做可视化的方法。 文章目录 一、xyz格式点云简介二、ArcMap打开xyz点云三、ArcScene打开xyz点云四、注意事项一、xyz格式点云简介 本实验使用的数据是配套数据包中的0061.rar,斯坦福大学的点云数据,格式为X,Y,Z,如下…

【My Electronic Notes系列——晶闸管】

目录 序言: 🏮🏮新年的钟声响,新年的脚步迈,祝新年的钟声,敲响你心中快乐的音符,幸运与平安,如春天的脚步紧紧相随,春节快乐!春华秋实,我永远与你…