Spark常用RDD算子:transformation转换算子以及action触发算子

news2024/11/24 2:14:39

文章目录

  • 1. 算子(方法)介绍
  • 2. 常用transformation算子
    • 2.1 map
  • 2.2 flatMap
    • 2.3 filter
    • 2.4 distinct
    • 2.6 groupBy
    • 2.7 sortBy()
    • 2.8 k-v数据[(k,v),(k1,v1)]
  • 3. 常用action算子

1. 算子(方法)介绍

rdd中封装了各种算子方便进行计算,主要分为两类:

  • transformation 转换算子
    • 对RDD数据进行转化得到新的RDD,定义了一个线程任务。
    • 常见:map、filter、flatMap、reduceByKey、groupByKey、sortByKey
  • action 触发算子
    • 触发计算任务,让计算任务进行执行,得到结果。
    • 触发线程执行的。
    • 常见:foreach、first、count、reduce、saveAsTextFile、collect、take

RDD的转换算子大部分都是从RDD中读取元素数据(RDD中每条数据),具体计算需要开发人员编写函数传递到RDD算子中。
RDD的执行算子则大部分是用来获取数据,collect方法就是触发算子。

注意

  • 转换算子是lazy模式,一般不会触发job和task的运行,返回值一定是RDD。
  • 执行算子,会触发job和task的运行,返回值一定不是RDD。

2. 常用transformation算子

2.1 map

  • RDD.map(lambda 参数:参数计算)
  • 参数接受每个元素数据
# Map算子使用
# map算子主要使用长场景,一个转化rdd中每个元素的数据类型,拼接rdd中的元素数据,对rdd中的元素进行需求处理
# 需求,处理hdfs中的学生数据,单独获取每个学生的信息
from pyspark import SparkContext

sc = SparkContext()

# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x:x.split(','))
# 3-从rdd2中获取姓名数据
rdd3 = rdd2.map(lambda x:x[1])

# lambda 函数能进行简单的数据计算,如果遇到复杂数据计算时,就需要使用自定义函数
# 获取年龄数据,并且转化年龄数据为int类型,将年龄和性别合并一起保存成元组  (男,20) (女,21)
def func(x):
    # 1-先切割数据
    data_split = x.split(',')
    # 2-转化数据类型
    age = int(data_split[2])
    # 3-拼接性别和年龄
    data_tuple = (data_split[3],age)
    return data_tuple
# 将函数的名字传递到map中,不要加括号
rdd4 = rdd.map(func)


# 触发执行算子,查看读取的数据
res = rdd.collect()
print(res)

res2 = rdd2.collect()
print(res2)

res3 = rdd3.collect()
print(res3)


res4 = rdd4.collect()
print(res4)

运行结果:
在这里插入图片描述

2.2 flatMap

  • 处理的是二维嵌套列表数据[[1,‘张三’],[2,‘李四’],[3,‘王五’]] --> [1, ‘张三’, 2, ‘李四’, 3, ‘王五’]
  • rdd.flatMap(lambda 参数:[参数计算])
#flatmap算子使用
# 主要使用场景是对二维嵌套的数据降维操作 [[1,'张三'],[2,'李四'],[3,'王五']] --> [1, '张三', 2, '李四', 3, '王五']
from pyspark import SparkContext
sc = SparkContext()

#生成rdd
rdd = sc.parallelize([[1,'张三'],[2,'李四'],[3,'王五']])
#使用flatmap算子进行转化
rdd2 = rdd.flatMap(lambda x: x)

#查看数据
res = rdd2.collect()
print(res)

运行结果:
在这里插入图片描述

2.3 filter

  • rdd.filter(lambda 参数:参数条件过滤)
  • 条件过滤的书写和Python中if判断的一样
# RDD数据过滤
# 需求:过滤年龄大于20岁的信息
from pyspark import SparkContext

sc = SparkContext()

# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x:x.split(','))
#使用fliter方法进行数据过滤
rdd3 = rdd2.filter(lambda x: int(x[2]) > 20)
rdd4 = rdd2.filter(lambda x:x[3]=='男')


# 查看数据
res = rdd2.collect()
print(res)

res3 = rdd3.collect()
print(res3)

res4 = rdd4.collect()
print(res4)

运行结果:
在这里插入图片描述

2.4 distinct

  • 不需要lambda rdd.distinct
# distinct  去重算子
# rdd中有重复数据时,可以进行去重
from pyspark import SparkContext

sc = SparkContext()

# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x: x.split(','))
# 3-从rdd2中获取性别数据
rdd3 = rdd2.map(lambda x: x[3])

#对rdd3中的数据去重
rdd4 = rdd3.distinct()

#查看数据
res = rdd3.collect()
print(res)

res1 = rdd4.collect()
print(res1)

运行结果:
在这里插入图片描述

2.6 groupBy

  • rdd.groupBy(lambda 参数:根据参数编写分组条件)
  • mapValues(list)
# groupBy分组
# 按照不同性别进行分组
# 原理: 就是对需要分组的数据进行hash取余数 ,余数相同会放入同一组
from pyspark import SparkContext

sc = SparkContext()

# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x: x.split(','))
# 3- 对性别进行分组
rdd3 = rdd2.groupBy(lambda x: hash(x[3]) % 2)
#查看分组的数据内容 mapValues 取出分组后的数据值,对数据值转为列表即可
rdd4 = rdd3.mapValues(lambda x:list(x))

# 查看数据内容
res = rdd2.collect()
print(res)


res3 = rdd3.collect()
print(res3)

res4 = rdd4.collect()
print(res4)

运行结果:
在这里插入图片描述

2.7 sortBy()

  • rdd.sortBy(lambda x:x,ascending=False)
#RDD的数据排序
from pyspark import SparkContext
sc = SparkContext()

# 生成rdd数据
# 非k,v数据
rdd = sc.parallelize([4,7,3,2,8])
#在spark中使用元组表示k,v数据
rdd2 = sc.parallelize([('张三',90),('李四',70),('王五',99)])

# 数据排序
rdd3 = rdd.sortBy(lambda x: x)
rdd4 = rdd.sortBy(lambda x: x,ascending=False)

#k,V数据排序
rdd5 = rdd2.sortBy(lambda x: x[1],ascending=False)
rdd6 = rdd2.sortBy(lambda x: x[1])

#查看结果
res = rdd3.collect()
print(res)

res2 = rdd4.collect()
print(res2)

res3 = rdd5.collect()
print(res3)

res4 = rdd6.collect()
print(res4)

运行结果:
在这里插入图片描述

2.8 k-v数据[(k,v),(k1,v1)]

  • groupByKey()
    • rdd.groupByKey()
  • reduceByKey()
    • rdd.reduceByKey(lambda 参数1,参数2:对两个参数计算)
  • sortByKey()
    • rdd.sortByKey()
#k,v结构数据处理
from pyspark import SparkContext
sc = SparkContext()
#k,v分组
# 1. 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2. 使用转化算子进行数据处理
rdd2 = rdd.map(lambda x: x.split(','))
#将数据转为k,v结构,然后进行分组,把分组的字段作为key值
rdd3 = rdd2.map(lambda x: (x[3], x))
# 使用groupBykey方法,按key进行分组
rdd4 = rdd3.groupByKey().mapValues(lambda x: list(x))

#k,v数据计算
#统计不同性别的年龄总和 (求和  平均数  最大值  最小值  数量)
#将需要计算的数据转为k,v结构  分组的字段是key值  聚合数据是value值
rdd5 = rdd2.map(lambda x: (x[3],int(x[2])))
# 使用reduceBykey方法进行聚合计算  会将相同key值的数据先合并,然后在聚合计算
# 聚合计算的算子,lambda x,y 需要结合两个参数
rdd6 = rdd5.reduceByKey(lambda x, y: x+y)

rdd7 = rdd5.groupByKey().mapValues(lambda x: sum(list(x))/len(list(x)))
rdd8 = rdd5.groupByKey().mapValues(lambda x: max(list(x)))


res = rdd2.collect()
print(res)

res3 = rdd3.collect()
print(res3)

res4 = rdd4.collect()
print(res4)

res5 = rdd5.collect()
print(res5)

res6 = rdd6.collect()
print(res6)

res7 = rdd7.collect()
print(res7)

res8 = rdd8.collect()
print(res8)

运行结果:

3. 常用action算子

  • collecct()取出RDD中所有值
    • rdd.collect()
  • reduce() 非k-v类型数据累加[1,2,3,4,6]
    • rdd.reduce(lambda 参数1,参数2:两个参数计算)
  • count() 统计RDD元素个数
    • rdd.count()
  • take() 取出指定数量值
    • rdd.take(数量)
# action算子使用
# 触发转化算子执行
from pyspark import SparkContext

sc = SparkContext()

# 生成rdd
rdd = sc.parallelize([1,2,3,4])

rdd_kv = sc.parallelize([('a',2),('b',3)])

# 进行转化处理

# 使用action
# 获取rdd中所有元素数据,转为列表展示
res = rdd.collect()
print(res)
# 指定取出的数据数量
res2 = rdd.take(3)
print(res2)
# 对非kv数据计算
# 求和
res3 = rdd.reduce(lambda x,y:x+y)
print(res3)
# 求数量
res4 = rdd.count()
print(res4)
# 求最大值
res5 = rdd.max()
print(res5)

res6 = rdd.mean()
print(res6)

# 将kv数据转为字典输出
res7 = rdd_kv.collectAsMap()
print(res7)

# 将rdd结果保存到hdfs 指定目录路径,指定的目录不能存在
rdd_kv.saveAsTextFile('hdfs://node1:8020/data/result')

运行结果:
在这里插入图片描述在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2203387.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

传感器模块编程实践(四)舵机+MPU6050陀螺仪模块融合云台模型

文章目录 一.概要二.实验模型原理1.硬件连接原理框图2.控制原理 三.实验模型控制流程四.云台模型程序五.实验效果视频六.小结 一.概要 云台主要用来固定摄像头。准确地说,云台是一种可以多角度调节的支撑设备,类似于人的脖子可以支撑着脑袋,…

java随机生成数学算式

生成随机数学算式可谓是计算机领域的一个经典的问题, 本文使用JFrame,JButton,JTextField等java图形化工具,生成一个可以随机切换题目,可以实现计时功能的一个图形化界面 源代码展示 randomMath类 package login;import javax.swing.*; import java.awt.*; import java.awt.e…

uniapp 锁屏显示插件 Ba-LockShow(可让vue直接具备锁屏显示能力)

简介 Ba-LockShow 是一款可以直接使uniapp的vue界面在锁屏页展示的插件。 支持使vue直接具备锁屏显示能力支持设置锁屏显示和不显示支持唤醒屏幕 截图展示(仅参考) 支持定制、本地包、源码等,有建议和需要,请点击文章结尾“Unia…

【C++】常用数据结构纲要(简易版)

非静无以成学。——诸葛亮 数据结构概括 1、什么是数据结构呢?2、讲述过的结构2、1、前言2、2、树->二叉树->两种平衡二叉树2、3、单链表->双链表->带有哨兵位的链表 3、B树3、1、概念及图示3、2、B树数据处理3、2、1、查找3、2、2、插入 4、哈希表4、1…

不是 PHP 不行了,而是 MySQL 数据库扛不住啊

大多数的业务场景下 PHP 还没有达到性能瓶颈,然而 MySQL 数据库就先行驾崩了。但我们总是不分青红皂白,一股脑的把原因归结于是 PHP 语言不行了,每当遇到这种情形我就会感叹到 PHP 的命真苦啊。PHP 作为一门优秀的开源编程语言,在…

基于WebSocket实现简易即时通讯功能

代码实现 pom.xml <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency><groupId>org.springframework.boot</groupId><artifa…

C++(vector的实现)

1. vector介绍 vector本质其实就是和我们数据结构中的顺序表类似&#xff0c;都是对一个数组进行增删查改等操作&#xff0c;但是这里vector的实现和顺序表的实现有所不同&#xff0c;vector的底层源码的实现是通过三个迭代器实现的&#xff0c;一个是指向开始的位置_start&…

优化小企业财务,使用记账软件的好处解析

财务记账软件优化企业财务管理&#xff0c;支持开票、在线支付、费用分类、银行对账、工时项目管理、库存管理及税务合规&#xff0c;自动生成报表助企业决策&#xff0c;克服传统电子表格局限&#xff0c;支持企业持续健康发展。 使用财务记账软件的好处和优势 1、开票和计费…

10.pwn 中级ROP

ret2csu 什么是ret2csu&#xff1f; 这个其实就是在程序中一般都会有一段万能的控制参数的gadgets&#xff0c;里面可以控制rbx,rbp,r12,r13,r14,r15以及rdx,rsi,edi的值&#xff0c;并且还可以call我们指定的地址。然后劫持程序执行流的时候&#xff0c;劫持到这个__libc_cs…

seL4 Faults(八)

Faults 学习什么是线程错误理解线程错误和处理器硬件错误是不同的理解什么是错误处理器理解内核对于一个有错误的线程做了什么了解如何设置内核将在其上传递故障消息的端点&#xff08;master与 MCS&#xff09;。在错误故障后学习如何恢复线程。 Background: What is a faul…

(21)Nakagami-m分布及其参数的意义

文章目录 前言一、Nakagami衰落的定义二、Nakagami衰落的形状参数m三、Nakagami衰落的尺度参数ω四、Nakagami随机变量的生成 前言 在无线信道中&#xff0c;由于电波的多径传播效应&#xff0c;接收到的信号强度会因为多条传播路径的相长或相消而发生起伏变化。这种现象被称为…

mysql迁移到达梦数据库报错:参数不兼容

1: 这个错误可能是某个字段‘定义超长’&#xff0c;尝试&#xff1a; 2: 如果还报错&#xff0c;指定和mysql同版本驱动

树状数组——学习心得

可以解决大部分区间上面的修改以及查询的问题&#xff0c;例如1.单点修改&#xff0c;单点查询&#xff0c;2.区间修改&#xff0c;单点查询&#xff0c;3.区间查询&#xff0c;区间修改&#xff0c;换言之&#xff0c;线段树能解决的问题&#xff0c;树状数组大部分也可以&…

全栈开发笔记

1.后端没问题 前端不显示返回数据&#xff1f; 返回数据被&#xff0c;axios拦截器拦截了 2.路径跳转显示空白&#xff1f; 没有配置router 3.后端部署到服务器上 无法通过外网访问接口&#xff1f; 检查服务器防火墙设置 即使服务监听正确&#xff0c;服务器本身的防火墙也可能…

【工作流引擎集成】springboot+Vue+activiti+mysql带工作流集成系统,直接用于业务开发,流程设计,工作流审批,会签

前言 activiti工作流引擎项目&#xff0c;企业erp、oa、hr、crm等企事业办公系统轻松落地&#xff0c;一套完整并且实际运用在多套项目中的案例&#xff0c;满足日常业务流程审批需求。 一、项目形式 springbootvueactiviti集成了activiti在线编辑器&#xff0c;流行的前后端…

前端_002_CSS扫盲

文章目录 概念选择器常用属性背景边框高度和宽度颜色文本字体链接表格里对齐显示相关溢出&#xff0c;滚动条属性 伪类和伪元素 概念 1.书写格式&#xff1a; 选择器{ 属性名:属性值 ; 属性名:属性值 ; } 2.文件后缀.css 选择器 元素选择器 [tag] id选择器 #[id_name] c…

西门子S7-SMART运动控制向导

打开“运动控制”向导&#xff0c;“工具”->“向导”->“运动控制” 图 1.打开“运动控制”向导 选择需要配置的轴 图 2.选择需要配置的轴 为所选择的轴命名 图 3.为所选择的轴命名 输入系统的测量系统&#xff08;“工程量”或者“脉冲数/转”&#xff…

开机启动项在哪里关闭?五个全面指南,教你关闭开机启动项!(新)

您是否发现您的电脑运行性能正在受一些无关紧要的应用程序所影响呢&#xff1f;也许您没有意识到&#xff0c;每当您登录电脑时&#xff0c;许多程序会在不知情的情况下自动启动。这些自动启动的程序不仅会拖慢系统的运行速度&#xff0c;还会占用大量的内存和cpu资源。为了改善…

QT:绘制事件和定时器

1.绘制时针 xx.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QTimer> #include<QPainter> #include <QTime>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpubl…

YOLOv11进行图像与视频的目标检测

一、AI应用系统实战项目 项目名称项目名称1.人脸识别与管理系统2.车牌识别与管理系统3.手势识别系统4.人脸面部活体检测系统5.YOLOv8自动标注6.人脸表情识别系统7.行人跌倒检测系统8.PCB板缺陷检测系统9.安全帽检测系统10.生活垃圾分类检测11.火焰烟雾检测系统12.路面坑洞检测…