Spark RDD编程模型及算子介绍(一)

news2024/11/16 3:47:55

文章目录

    • RDD编程模型介绍
    • RDD的两种算子及延迟计算
    • 常见的Transformation算子

RDD编程模型介绍

  • RDD是Spark 对于分布式数据集的抽象,它用于囊括所有内存中和磁盘中的分布式数据实体。每一个RDD都代表着一种分布式数据形态。
  • 在RDD的编程模型中,一共有两种算子,Transformations类算子和Actions类算子。开发者需要使用Transformations类算子,定义并描述数据形态的转换过程,然后调用Actions类算子,将计算结果收集起来、或是物化到磁盘。

RDD的两种算子及延迟计算

  • Transformation算子:基于不同RDD数据形态之间的转换,构建计算流图(DAG,Directed Acyclic Graph)。

  • Actions类算子:以回溯的方式去触发执行这个计算流图。

  • 开发者调用的各类Transformations算子,并不立即执行计算,当且仅当开发者调用Actions算子时,之前调用的转换算子才会付诸执行。在业内,这样的计算模式有个专门的术语,叫作“延迟计算”(Lazy Evaluation),也叫作懒加载。

在这里插入图片描述

常见的Transformation算子

  • map算子:将RDD的数据一条条处理(处理的逻辑基于map算子中的接收的处理函数),返回新的RDD。map可以自定义函数,也可以使用lambda匿名函数,部分代码如下:
rdd1 = sc.parallelize([1,2,3,4,5,6],3)
  def func(a):
      return 2 * a + 1
print(rdd1.map(func).collect())
rdd2 = sc.parallelize([1,2,3,4,5,6],3)
print(rdd2.map(lambda x :2*x +1).collect())
#结果输出
[3, 5, 7, 9, 11, 13]
[3, 5, 7, 9, 11, 13]
  • flatMap算子:对RDD执行map操作后解除嵌套的操作,部分代码如下:
rdd1 = sc.parallelize(['a b c','d e f','g h i'],3)
rdd2 = rdd1.flatMap(lambda line: line.split(" "))
rdd_rs = rdd2.collect()
print(f'结果是:{rdd_rs}')
# 结果输出
结果是:['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']
  • reduceByKey算子:针对KV型RDD,自动按照Key分组,然后根据聚合逻辑,完成组内数据的聚合操作,部分代码如下:
rdd1 = sc.parallelize([('a',1),('a',2),('b',1),('b',3)],3)
rdd2 = rdd1.reduceByKey(lambda a,b : a+b)
rdd_rs = rdd2.collect()
print(f'结果是:{rdd_rs}')
# 结果输出
结果是:[('b', 4), ('a', 3)]
  • mapValues算子:对KV型RDD中values进行map操作,部分代码如下:
rdd1 = sc.parallelize([('a',1),('a',2),('b',1),('b',3)],3)
rdd2 = rdd1.mapValues(lambda x: x * 10)
rdd_rs = rdd2.collect()
print(f'结果是:{rdd_rs}')
# 输出结果
结果是:[('a', 10), ('a', 20), ('b', 10), ('b', 30)]
  • groupBy算子:将RDD的数据进行分组,部分代码如下
rdd1 = sc.parallelize([('a',1),('b',2),('a',3),('b',4)],3)
rdd2 = rdd1.groupBy(lambda x:x[0])
#需要list转换
#rdd3 = rdd2.map(lambda t:(t[0],list(t[1])))
rdd3 = rdd2.mapValues(lambda t : list(t))
rdd_rs = rdd3.collect()
print(f'结果是:{rdd_rs}')

rdd4 = sc.parallelize([1,2,3,4,5,6],3)
rdd5 = rdd4.groupBy(lambda x: 'odd' if (x%2==1) else 'even')
rdd_rs1 = rdd5.map(lambda x: (x[0],list(x[1]))).collect()

print(f'结果是:{rdd_rs1}')

# 结果是:[('b', [('b', 2), ('b', 4)]), ('a', [('a', 1), ('a', 3)])]
# 结果是:[('even', [2, 4, 6]), ('odd', [1, 3, 5])]
  • filter算子:RDD过滤掉想要的数据进行保留,部分代码如下:
rdd1 = sc.parallelize([1,2,3,4,5,6],3)
#使用filter过滤出奇数
rdd2 = rdd1.filter(lambda x: x%2 == 1)
rdd_rs = rdd2.collect()
print(f'结果是:{rdd_rs}')

# 结果是:[1, 3, 5]
  • distinct算子:对RDD数据进行去重,部分代码如下
rdd1 = sc.parallelize([1,1,2,2,3,3,3],3).distinct().collect()
rdd2 = sc.parallelize([('a',1), ('a',1), ('a',3)]).distinct().collect()
print(rdd1)
print(rdd2)

# [3, 1, 2]
# [('a', 1), ('a', 3)]
  • union算子:将2个RDD合并成一个RDD并返回,部分代码如下
rdd1 = sc.parallelize([1,1,2,3,],3)
rdd2 = sc.parallelize(['a','a','b','c'],3)
rdd_union1 = rdd2.union(rdd1)
rdd_union1_rs = rdd_union1.collect()

print(f'结果是:{rdd_union1_rs}')

# 结果是:['a', 'a', 'b', 'c', 1, 1, 2, 3]
  • join算子:对两个RDD执行join操作和SQL原理一样,部分代码如下:
rdd1 = sc.parallelize([('1001','Tom'),('1002','Jerry'),('1003','Spike'),('1004','Butch')])
rdd2 = sc.parallelize([('1001','技术部'),('1002','销售部'),('1005','行政部')])
# 内连接
rdd_join = rdd1.join(rdd2)
rdd_join_rs = rdd_join.collect()
print(f'结果是:{rdd_join_rs}')

# 左外连接
rdd_left = rdd1.leftOuterJoin(rdd2)
rdd_left_rs = rdd_left.collect()
print(f'结果是:{rdd_left_rs}')

# 右外连接
rdd_right1 = rdd1.rightOuterJoin(rdd2)
rdd_right2 = rdd2.leftOuterJoin(rdd1)
rdd_right1_rs = rdd_right1.collect()
rdd_right2_rs = rdd_right2.collect()
print(f'结果是:{rdd_right1_rs}')
print(f'结果是:{rdd_right2_rs}')

# 结果是:[('1001', ('Tom', '技术部')), ('1002', ('Jerry', '销售部'))]
# 结果是:[('1001', ('Tom', '技术部')), ('1004', ('Butch', None)), ('1002', ('Jerry', '销售部')), ('1003', ('Spike', None))]
# 结果是:[('1001', ('Tom', '技术部')), ('1005', (None, '行政部')), ('1002', ('Jerry', '销售部'))]
# 结果是:[('1001', ('技术部', 'Tom')), ('1005', ('行政部', None)), ('1002', ('销售部', 'Jerry'))]
  • intersection算子:求两个RDD的交集,返回一个新的RDD,部分代码如下:
rdd1 = sc.parallelize([('a',1),('b',2),('c',3)])
rdd2 = sc.parallelize([('a',1),('d',4)])
rdd_intersect = rdd1.intersection(rdd2)
rdd_intersect_rs = rdd_intersect.collect()
print(f'结果是:{rdd_intersect_rs}')

# 结果是:[('a', 1)]
  • glom算子:将RDD数据加上嵌套,嵌套是按照分区来进行,部分代码如下:
rdd1 = sc.parallelize([1,2,3,4,5,6,7],2).glom()
rdd_rs = rdd1.collect()
print(f'结果是:{rdd_rs}')

# 结果是:[[1, 2, 3], [4, 5, 6, 7]]
  • groupByKey算子:对KV型RDD,自动按照Key分组,部分代码如下:
rdd1 = sc.parallelize([('a',1),('b',2),('a',3),('b',4)],3)
rdd2 = rdd1.groupByKey()
#rdd3 = rdd2.map(lambda t:(t[0],list(t[1])))
rdd3 = rdd2.mapValues(lambda t : list(t))
rdd_rs = rdd3.collect()
print(f'结果是:{rdd_rs}')

# 结果是:[('b', [2, 4]), ('a', [1, 3])]
  • sortBy算子:对RDD数据进行排序,基于指定的排序为依据,部分代码如下:
rdd1 = sc.parallelize([('a',1),('b',2),('c',10),('d',6),('e',5)],3)
rdd2 = rdd1.sortBy(lambda x:x[1], ascending=True, numPartitions=3)
rdd3 = rdd1.sortBy(lambda x:x[1],ascending=False, numPartitions=3)
rdd_rs1 = rdd2.collect()
rdd_rs2 = rdd3.collect()

print(f'结果是:{rdd_rs1}')
print(f'结果是:{rdd_rs2}')
"""
# 参数1:告知Spark按照数据的哪个列进行排序
# 参数2:True表示升序、False表示降序
# 参数3:指定分区数

# 一般来说设置numPartitions值之后排序的最终结果只能保证在分区内是有序的,不能保证分区间是有序的。
# 将numPartitions设置为1,可以保证整体有序。

"""
# 结果是:[('a', 1), ('b', 2), ('e', 5), ('d', 6), ('c', 10)]
# 结果是:[('c', 10), ('d', 6), ('e', 5), ('b', 2), ('a', 1)]
  • sortByKey算子:针对KV型RDD,按照Key进行排序,部分代码如下:
rdd1 = sc.parallelize([('a', 1), ('E', 1), ('C', 1), ('D', 1), ('b', 1), ('g', 1), ('f', 1),
                      ('y', 1), ('u', 1), ('i', 1), ('o', 1), ('p', 1),
                      ('m', 1), ('n', 1), ('j', 1), ('k', 1), ('l', 1)], 3)
rdd2 = rdd1.sortByKey(ascending=True, numPartitions=3, keyfunc=lambda key:str(key).lower())
rdd_rs = rdd2.collect()
print(f'结果是:{rdd_rs}')
# 只是改变排序过程中的大小写,但是对结果并未有任何影响
# 结果是:[('a', 1), ('b', 1), ('C', 1), ('D', 1), ('E', 1), ('f', 1), ('g', 1), ('i', 1), ('j', 1), ('k', 1), ('l', 1), ('m', 1), ('n', 1), ('o', 1), ('p', 1), ('u', 1), ('y', 1)]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/12309.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux-服务管理

服务介绍 服务本质就是进程,但是是运行在后台的,通常都会监听某个端口,等待其他程序的ing求,比如mysqld,sshd,防火墙灯,因为又称为守护进程 如何管理服务 CentOS7.0前使用service命令 servi…

SpringBoot-配置

目录 起步依赖原理分析 配置文件分类 YAML YAML:基本语法 YAML:数据格式 YAML:参数引用 读取配置内容 profile Profile-小结 内部配置加载顺序 外部配置加载顺序 起步依赖原理分析 在spring-boot-starter-parent中定义了各种技术的…

[附源码]Python计算机毕业设计GuiTar网站设计

项目运行 环境配置: Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术: django python Vue 等等组成,B/S模式 pychram管理等等。 环境需要 1.运行环境:最好是python3.7.7,…

【学习笔记】《模式识别》4:贝叶斯判别准则

贝叶斯判别准则 文章目录贝叶斯判别准则一、研究对象及相关概率1. 两类研究对象2.概率3. 条件概率4. 模式识别中的三个概率5. 两对条件概率的区别二、贝叶斯决策1.最小错误率贝叶斯决策2. 最小风险贝叶斯决策3. (0-1)损失最小风险贝叶斯决策4.正态分布模式的贝叶斯决策三、贝叶…

RFID在钢筋仓库管理中的应用

RFID在钢筋仓库管理中的应用 应用背景 随着经济的迅速发展,带动了钢材业的迅速发展,钢筋的使用量也在改革开放后有了近370多倍的增长,如此大量的钢筋在库存管理,盘点,防盗,各种型号发货、防窜货上等等一系…

图片链接或pdf链接通过浏览器打开时,有时可以直接预览,有时却是下载,为什么?

在前端开发中,有时候需要对一些文件链接进行特殊处理,比如对于一些图片链接或者PDF链接,有时我们需要通过浏览器打开进行预览,有时又不希望通过浏览器进行打开,而是希望能够直接下载到本地。但现实效果却往往跟我们相反…

硅麦驱动开发及调试(pdm>>I2S>>pcm)

pdm 协议 PDM接口只有两根信号线: PDM_CLK 时钟信号。 PDM_DATA 数据信号。 I2S协议 数据发送规格 I2S在BCLK的下降沿发送数据(发送),在上升沿进行数据采样(接收)。每次是先发送最高位,最后…

Hadoop概述

Hadoop概述 Hadoop介绍 狭义上Hadoop值的是Apache的一款开源软件。 用java语言实现开源软件框架 允许使用简单的编程模型跨计算机集群对大型数据集进行分布式处理 Hadoop核心组件 Hadoop HDFS(分布式文件存储管理系统):解决海量数据存储 Hadoop YARN(集群资源管理…

3-2、python内置数据类型(列表和元组)

文章目录序列列表列表的创建列表的基本特性连接操作符和重复操作符成员操作符(in和not in)索引切片for循环列表的常用方法增加修改(通过索引和切片重新赋值)查看删除其他操作元组(和列表相比,不能增删改元素…

30分钟带你精通Git使用

一、 版本控制工具 1.1. 什么是版本控制系统? 版本控制系统(Version Control System):是一种记录一个或若干文件内容变化,以便将来查阅特定版本修订情况的系统。版本控制系统不仅可以应用于软件源代码的文本文件,而且…

[附源码]SSM计算机毕业设计班级风采网站JAVA

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

TIA博途中累计流量的两种计算方法示例

TIA博途中累计流量的两种计算方法示例 如下图所示,首先,我们要了解累计流量的含义: 即t1至t2时刻,对瞬时流量求定积分,由上图可知,t1至t2的定积分,即蓝色部分的面积,那么直接求这个面积是有难度的,我们只能用近似的方法来求取, 如下图所示,把该部分面积分割成一个个…

最简单的git图解(git stash)

大家平时开发过程中肯定遇到过这样的情况:代码写了一半,但是需要紧急修改一个bug,还是在当前项目中修改,这时怎么办呢?把写了一半的代码进行提交?可能编译还通不过,或是启动不了,要是…

Java项目:SSH学生学籍管理系统及教务管理系统

作者主页:源码空间站2022 简介:Java领域优质创作者、Java项目、学习资料、技术互助 文末获取源码 项目介绍 本系统包含管理员、教师、学生三种角色; 管理员角色包含以下功能: 管理员登录,学科管理,班级管理,教师管理,学籍信息管理,课表管理…

外贸软件助力国际贸易企业业财共享数字化转型升级

外贸企业数字化转型新机遇丨汇信外贸软件助力业财一体共享升级 随着国际化的进程不断加速,国际贸易市场的发展,使得外贸企业的业务范围不断扩大,海量的资源信息在世界各地不断产生。为了应对国际贸易信息传递的及时性,关于财务信…

[MySQL]复杂查询(进阶)

专栏简介 :MySql数据库从入门到进阶. 题目来源:leetcode,牛客,剑指offer. 创作目标:记录学习MySql学习历程 希望在提升自己的同时,帮助他人,,与大家一起共同进步,互相成长. 学历代表过去,能力代表现在,学习能力代表未来! 目录 1.新增 2. 聚合查询 2.1 聚合函数 3. 分组查询…

Web UI 自动化测试:如何使用隐私模式进行测试

来啦老铁! 这两天有个任务需要在桌面端 UI 自动化中使用隐私模式进行测试,之前没有用过,且在调研的时候还是小小花了点时间各种查资料的,因此做一下记录; 学习路径 1、Chrome 浏览器配置; 2、Edge 浏览…

电脑换cpu要重装系统吗

​cpu是一台电脑的控制以及运算核心,有十分重要的作用,不少小伙伴会遇到更换cpu的问题,所以小伙伴会提前了解换cpu要重装系统吗或者换cpu后bios如何设置的这类问题,接下来小编就为大家带来了详细的介绍,感兴趣的用户可…

(七)Bean的实例化方式

文章目录环境Bean的实例化方式通过构造方法实例化通过简单工厂模式实例化通过工厂方法模式实例化通过FactoryBean接口实例化BeanFactory和FactoryBean的区别BeanFactoryFactoryBean工厂Bean的使用:注入自定义Date上一篇:(六)Sprin…

【免杀前置课——Windows编程】十二、线程同步——一文讲懂什么是线程同步、原子操作函数、临界区、互斥体(激发态与非激发态区别)

线程同步线程同步多线程运行同一操作对象问题解决方案A:原子操作函数解决方案B:临界区解决方案C:互斥体激发态与非激发态互斥体优点:线程同步 多线程运行同一操作对象问题 #include<iostream> #include<Windows.h>LONG g_count 0; DWORD WINAPI myThreadProc1(_…