Spark RDD编程模型及算子介绍(二)

news2024/11/24 22:28:28

文章目录

    • 常见的Action算子
    • 常见分区操作算子

常见的Action算子

  • countByKey算子:统计Key出现的次数,部分代码如下:
rdd_file = sc.textFile("../Data/input/words.txt")
rdd_map = rdd_file.flatMap(lambda line: line.split(" ")).map(lambda x:(x, 1))
rdd_count = rdd_map.countByKey()
print(rdd_count)
print(type(rdd_count))
# 返回结果为字典
# defaultdict(<class 'int'>, {'Apple': 4, 'Banana': 5, 'Orange': 4, 'Peach': 2})
# <class 'collections.defaultdict'>
  • collect算子:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。RDD是分布式对象,数据量可以很大,所以用这个算子之前需要知道如果数据集结果很大,就会把driver内存撑爆,出现oom。

  • reduce算子:对RDD数据集按照传入的逻辑进行聚合操作,部分代码如下:

rdd = sc.parallelize(range(1,10))
rdd_reduce = rdd.reduce(lambda a,b : a+b)
print(rdd_reduce)
# 45
  • fold算子:和reduce一样接收传入逻辑进行聚合,聚合是带有初始值的。这个初始值既要作用在分区内,也要作用在分区间,部分代码如下:
rdd = sc.parallelize(range(1,10),3)
rdd_reduce = rdd.fold(10,lambda a,b : a+b)
print(rdd_reduce)
# 1 分为[1,2,3] [4,5,6] [7,8,9]
# 2 每个分区+10
# 3 最后汇总再+10 得到结果85
  • first算子:取出RDD第一个元素
sc.parallelize([1,2,3,4]).first()
# 1
  • take算子:取出RDD的前N个元素
sc.parallelize([1,2,3,4],3).take(2)
# [1,2]
  • top算子:对RDD元素进行降序排序,取前N个
sc.parallelize([1,2,3,4],3).top(2)
# [4, 3]
  • count算子:计算RDD有多少条数据,返回值为一个数字
sc.parallelize([1,2,3,4],3).count()
# 4
  • takeSample算子:随机抽样RDD的数据,部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6,7,6,5,4,3,2,1],1)
rdd_takeSample1 = rdd.takeSample(True, 18)
print(rdd_takeSample1)
rdd_takeSample2 = rdd.takeSample(False, 18)
print(rdd_takeSample2)

# [1, 1, 1, 4, 6, 4, 1, 1, 5, 4, 6, 7, 5, 1, 6, 6, 6, 2]
# [2, 4, 2, 5, 5, 6, 3, 7, 4, 1, 6, 3, 1]
# 参数一:bool型,True表示运行取同一个数据,False表示不允许取同一个数据,与数据内容无关,是否重复表示的是同一个位置的数据。
# 参数二:抽样的数目(设置为false则无法超越RDD总数)
# 参数三:随机种子(一般不需要传参)
  • takeOrdered算子:对RDD排序取前N个,部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6,7])
#升序
rdd_takeOrdered1 = rdd.takeOrdered(4)
#降序
rdd_takeOrdered2 = rdd.takeOrdered(4,lambda x : -x)

print(rdd_takeOrdered1)
print(rdd_takeOrdered2)
# [1, 2, 3, 4]
# [7, 6, 5, 4]
  • foreach算子:对RDD的每个元素,执行逻辑操作与map类似,但是这个方法没有返回值。如果想显示值,只能在里面自行打印(无需经过Driver,直接在Executor打印效率更高)。
rdd = sc.parallelize([1,2,3,4,5,6,7],1)
rdd1 = rdd.foreach(lambda x : 2*x +1)
rdd2 = rdd.foreach(lambda x : print(2*x +1))
print(rdd1)
3
5
7
9
11
13
15
None
  • saveAsTextFile算子:保存文件API,分布式执行,不经过Driver,每个分区所在的Executor直接控制数据写出到目标文件系统中,每个分区产生1个结果文件。
#设置为三个分区
rdd_file = sc.textFile("hdfs://node1:8020/Test/WordCount.txt",3)
rdd_words = rdd_file.flatMap(lambda line: line.split(" "))
rdd_map = rdd_words.map(lambda x:(x, 1))
rdd_total = rdd_map.reduceByKey(lambda a,b: a + b)
rdd_rs = rdd_total.saveAsTextFile("hdfs://node1:8020/Test/word_rs1")

结果如下图所示在HDFS WebUI上查看
在这里插入图片描述

常见分区操作算子

  • mapPartitions算子:与map相似,只是一次被传递的是一整个分区的数据,虽然在执行次数上与map相同,但是可以因为减少了网络io的传输次数,效率会大大的提高。部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6],3)
def func(iter):
    rs = list()
    for it  in iter:
        rs.append(2 * it + 1)
    return rs
rdd_part = rdd.mapPartitions(func)
rdd_rs = rdd_part.collect()
print(rdd_rs)

# [3, 5, 7, 9, 11, 13]
  • foreachPartition算子:与普通foreach一样,只是一次被传递的是一整个分区的数据,部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6],3)
# 因为没有返回值所以不需要return
def func(iter):
    rs = list()
    for it  in iter:
        rs.append(2 * it + 1)
    print(rs)

rdd_part = rdd.foreachPartition(func)

# [3, 5]
# [7, 9]
# [11, 13]
  • partitionBy算子:对RDD进行自定义分区操作,部分代码如下
# 参数1 重新分区后有几个分区
# 参数2 自定义分区规则,函数传入(返回编号为int类型,分区编号从0开始,不要超过分区数)
rdd = sc.parallelize([('a',1),('b',2),('c',3),('d',4),('e',5),('f',6)])

def func(key):
    if key == 'a' or key == 'b' : return 0
    if key == 'c' or key == 'd' : return 1
    return 2

rdd_part = rdd.partitionBy(3,func)
rdd_rs = rdd_part.glom().collect()
print(rdd_rs)

# [[('a', 1), ('b', 2)], [('c', 3), ('d', 4)], [('e', 5), ('f', 6)]]
  • repartition算子:对RDD的分区执行重新分区。不建议使用此算子,除非做全局排序的时候,将其设置为1。如果修改尽量减少,不要增加,增加会导致shuffle。不管是增加还是减少都会影响并行计算(内存迭代并行的管道数量),部分代码如下:
rdd = sc.parallelize([1,2,3,4,5,6],3)
rdd_re1 = rdd.getNumPartitions()
print(rdd_re1)
rdd_re2 = rdd.repartition(1).getNumPartitions()
print(rdd_re2)
rdd_re3 = rdd.repartition(5).getNumPartitions()
print(rdd_re3)
# 3
# 1
# 5
  • coalesce算子:对分区数量进行增减,部分代码如下:
# 参数1:分区数
# 参数2:Bool True表示允许shuffle,False表示不允许(默认)。
rdd_re4 = rdd.coalesce(1).getNumPartitions()
print(rdd_re4)
rdd_re5 = rdd.coalesce(5).getNumPartitions()
print(rdd_re5)
rdd_re6 = rdd.coalesce(5,shuffle=True).getNumPartitions()
print(rdd_re6)
# 1
# 3 没有加shuffle=True这里有个API安全机制,分区不会增加
# 5
  • 在源码中我们可以发现reparation算子底层调用的就是coalesce算子,只不过shuffle定义为true。源码如下:
def repartition(self, numPartitions):
    return self.coalesce(numPartitions, shuffle=True)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/14055.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mybatis拦截器源码详解

Mybatis拦截器源码详解Mybatis相关全览一、简介执行与添加顺序拦截器生效入口二、使用例子三、原理加载入口生成代理遍历拦截器匹配&生成代理四、实践例子本文用的是3.5.10版本 源码地址&#xff1a;https://github.com/mybatis/mybatis-3/releases 文档地址&#xff1a;ht…

【云原生监控系列第三篇】Prometheus普罗米修斯进阶——PromQL的指标类型

目录一、PromQL 的指标类型1.1 Counter1.2 Gauge1.3 Histogram1.4 Summary1.5 Histogram 与 Summary 的异同二、Prometheus 的聚合函数三、PromQL 的聚合表达式一、PromQL 的指标类型 PromQL 有四个指标类型&#xff1a; Counter &#xff1a;计数器&#xff0c;用于保存单调递…

医疗产品设计的重要性,你了解多少?

医疗产品设计直接关系患者生活的方式与治疗&#xff0c;一个好的医疗产品设计不但要逐渐细化、便于实际操作&#xff0c;而且还要有利于医师操纵&#xff0c;让患者觉得舒服。这是一种具备重大意义的产品。让我们一起来看看有关医疗产品设计的具体内容! 什么叫医疗产品设计? 医…

【Java技术专题】「原理分析系列」Lambda表达式实现原理分析

Lamdba表达式起源 java8引入了lambda表达式是我们java编程方式变革的一个伟大的创举&#xff0c;由了它我们可以采用闭包的形式区开发任何想开发的方法&#xff0c;让java程序与C或者C更加有了贴合的感觉&#xff0c;虽然编程方式和我们目前的命令式编程方式有很大的不同&#…

仿真必修课:计算电磁学入门(附件参考文献与笔记)

转载自电磁CAEer &#xff0c;作者&#xff1a;刘兵 “作为一个电磁设计师&#xff0c;有必要了解计算电磁学吗&#xff1f;” 答案是肯定的。电磁计算从业人员按照分工大致可以分为两类&#xff1a;一类从事CEM&#xff08;计算电磁学&#xff09;&#xff0c;一类从事CAE&a…

ROS 开源项目 TurtleBot3 安装与使用

功能介绍 启动slam完成地图的搭建与保存启动navigation并读取保存的地图&#xff0c;完成自动导航。 注&#xff1a;人工咨询 如果按照下面方案也无法成功解决&#xff0c;可以进入我淘宝咨询&#xff0c;可进行远程辅助解决。 1、安装部分 1.1 创建工作空间lee_ws mkdir…

java项目-第140期ssm高校二手交易平台-ssm毕业设计_计算机毕业设计

java项目-第140期ssm高校二手交易平台-ssm毕业设计_计算机毕业设计 【源码请到资源专栏下载】 今天分享的项目是《ssm高校二手交易平台》 该项目分为2个角色&#xff0c;管理员和用户。 用户在前台浏览商品&#xff0c;并且可以进行购买。用户购买后可以在后台查看自己的订单等…

论文阅读-基于低秩分解的网络异常检测综述

论文地址&#xff1a;基于低秩分解的网络异常检测综述 摘要&#xff1a; 异常检测对于网络管理与安全至关重要&#xff0e;国内外大量研究提出了一系列网络异常检测方法,其 中大多数方法更关注数据包及其独立时序数据流的分析、检测与告警,这类方法仅仅利用了网络数据之 间的…

嵌入式分享合集104

一、不用串口&#xff0c;如何打印STM32单片机log 本文主要介绍在嵌入式开发中用来输出log的方法。 最常用的是通过串口输出uart log&#xff0c;这种方法实现简单&#xff0c;大部分嵌入式芯片都有串口功能。但是这样简单的功能有时候却不是那么好用&#xff0c;比如&#xf…

入门 Activiti 工作流,通俗易懂

概念 工作流。通过计算机对业务流程自动化执行管理&#xff0c;主要解决的是“使在多个参与者之间按照某种预定义的规则自动进行传递文档、信息或任务的过程&#xff0c;从而实现某个预期的业务目标&#xff0c;或者促使此目标的实现”。 Activiti7 介绍 Activiti是一个工作…

Spring启动流程

Spring启动流程 按Bean状态描述&#xff1a; 创建Bean工厂对Bean工厂后置处理通过Component和Import扫描BeanDefinition&#xff0c;加入到Bean工厂注册Bean后置处理器&#xff0c;用于拦截Bean创建实例化填充属性初始化 调用aware方法BeanPostProcessor实例化前执行调用初始…

【java进阶01:final关键字】final修饰的变量只能赋一次值

目录 final修饰的类无法继承。 final修饰的引用一旦指向某个对象&#xff0c;则不能再指向其他对象&#xff0c;但该引用指向的对象内部的数据是可以修改的。​编辑 final修饰的实例变量必须手动初始化&#xff0c;因为系统不会赋默认值&#xff0c;强制手动赋值&#xff0c…

【router-view】切换组件 深刻理解用法 vue的设计思想

之前学的时候没学明白&#xff0c;导致写项目有些功能的实现上走了歪路。 今天询问了学长&#xff0c;更加深刻的理解的Vue的设计思想。 因为vue是单页面应用&#xff0c;所以学会用router-view来切换频繁变化的地方的组件是非常重要的。 之前&#xff0c;我的一个主页组件由…

Xshell远程连接配置 Ubuntu 18.04.6 + Anaconda + CUDA + Cudnn + Pytorch(GPU+CPU)

Xshell远程连接进行Ubuntu的Pytorch配置写在最前面参考Xshell常用命令Ubantu检查系统的各项配置查看ubuntu系统的版本信息查看Linux的内核版本和系统是多少位的Ubuntu版本各种验证禁用nouveau安装显卡驱动卸载显卡驱动安装显卡驱动加入PPA&#xff0c;然后更新库方法一&#xf…

maven离线模式及设置

maven离线模式及设置 maven离线模式使用场景&#xff1f; 遇到的问题&#xff1a; 最近遇到个项目支持&#xff0c;他在打jar包的时候&#xff0c;总是去网上下载 maven依赖&#xff0c;不去找我本地仓库的&#xff0c;就比较头大&#xff0c;原因不明 现在需求&#xff1a;就…

SpringBoot 玩一玩代码混淆,防止反编译代码泄露

编译 简单就是把代码跑一哈&#xff0c;然后我们的代码 .java文件 就被编译成了 .class 文件 反编译 就是针对编译生成的 jar/war 包 里面的 .class 文件 逆向还原回来&#xff0c;可以看到你的代码写的啥。 比较常用的反编译工具 JD-GUI &#xff0c;直接把编译好的jar丢进…

java项目-第144期ssm农产品供销服务系统-java毕业设计_计算机毕业设计

java项目-第144期ssm农产品供销服务系统-java毕业设计_计算机毕业设计 【源码请到资源专栏下载】 今天分享的项目是《ssm农产品供销服务系统》 该项目分为2个角色&#xff0c;管理员和用户。 用户可以浏览前台,包含功能有&#xff1a; 首页、农产品、农产品资讯、我的、跳转到后…

JavaScript DOM中获取元素、事件基础、操作元素、节点操作

目录 JavaScript事件三要素 常见的事件​编辑 2、执行事件的步骤&#xff1a; 1. 获取元素 1.1 方式 1.1.1 根据ID获取 1.1.2 根据标签名获取 1.1.3 通过html5新增的方法获取 1.1.4 特殊元素获取 2. 事件基础 2.1 事件三要素 2.1.1 事件源 2.1.2 事件类型 2.1.3…

如何使用Spring Security控制会话

在 Spring 安全教程的这篇文章中&#xff0c;我们将讨论Spring 安全会话管理。我们将讨论 Spring 安全性的独特功能&#xff0c;这有助于我们高效和安全的会话管理。 春季安全会议 本文将讨论安全会话管理以及 spring 安全性如何帮助我们控制 HTTP 会话。Spring 安全性使用以…

1.4_29 Axure RP 9 for mac 高保真原型图 - 案例28【中继器-后台管理系统6】功能-原位修改数据

相关链接 目录Axure中文学习网AxureShopAxureShop-QA 案例目标1. 了解使用中继器&#xff0c;弹窗修改数据的实现方式 一、成品效果 Axure Cloud 案例28【中继器 - 后台管理系统6】功能-原位修改数据 版本更新一、修改功能   1.1 文本框&#xff1a;点击数据位置&#xff…