Pyspark基础入门4_RDD转换算子

news2024/9/21 4:32:10

Pyspark

注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下Pyspark基础入门3
#博学谷IT学习技术支持


文章目录

  • Pyspark
  • 前言
  • 一、RDD算子的分类
  • 二、转换算子
    • 1.map算子
    • 2.groupBy算子
    • 3.filter算子
    • 4.flatMap算子
    • 5.union(并集) 和 intersection(交集)算子
    • 6.groupByKey算子
    • 7.reduceByKey算子
    • 8.sortByKey算子
    • 9.countByKey和 countByValue算子
  • 三、动作算子
    • 1.reduce算子
    • 2.first算子
    • 3.take算子
    • 4.top算子
    • 5.count算子
    • 6.foreach算子
    • 7.takeSample算子
  • 总结


前言

今天和大家分享的是Spark RDD算子相关的操作。
RDD算子: 指的是RDD对象中提供了非常多的具有特殊功能的函数, 我们一般将这样的函数称为算子(大白话: 指的RDD的API)


一、RDD算子的分类

整个RDD算子, 共分为二大类: Transformation(转换算子) 和 Action(动作算子)
转换算子:
1- 所有的转换算子在执行完成后, 都会返回一个新的RDD
2- 所有的转换算子都是LAZY(惰性),并不会立即执行, 此时可以认为通过转换算子来定义RDD的计算规则
3- 转换算子必须遇到Action算子才会触发执行

动作算子:
1- 动作算子在执行后, 不会返回一个RDD, 要不然没有返回值, 要不就返回其他的
2- 动作算子都是立即执行, 一个动作算子就会产生一个Job执行任务,运行这个动作算子所依赖的所有的RDD

二、转换算子

在这里插入图片描述

1.map算子

  • 格式: rdd.map(fn)
  • 说明: 根据传入的函数, 对数据进行一对一的转换操作, 传入一行, 返回一行
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo1").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    rdd_map = rdd_init.map(lambda num: num + 1)
    rdd_res = rdd_map.collect()
    print(rdd_res)
    sc.stop()

2.groupBy算子

  • 格式: groupBy(fn)
  • 说明: 根据传入的函数对数据进行分组操作
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo2").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

    # def jo(num):
    #     if num % 2 == 0:
    #         return 'o'
    #     else:
    #         return 'j'

    rdd_group_by = rdd_init.groupBy(lambda num: 'o' if num % 2 == 0 else 'j')
    rdd_res = rdd_group_by.mapValues(list).collect()
    print(rdd_res)
    sc.stop()

3.filter算子

  • 格式: filter(fn)
  • 说明: 过滤算子, 可以根据函数中指定的过滤条件, 对数据进行过滤操作, 条件返回True表示保留, 返回False表示过滤掉
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo3").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    rdd_res = rdd_init.filter(lambda num: num > 3).collect()
    print(rdd_res)
    sc.stop()

4.flatMap算子

  • 格式: flatMap(fn)
  • 说明: 在map算子的基础上, 在加入一个压扁的操作, 主要适用于一行中包含多个内容的操作, 实现一转多的操作
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo4").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd_init = sc.parallelize(['张三 李四 王五 赵六', '田七 周八 李九'])
    rdd_res = rdd_init.flatMap(lambda line: line.split(' ')).collect()
    print(rdd_res)
    sc.stop()

5.union(并集) 和 intersection(交集)算子

格式: rdd1.union|intersection(rdd2)

from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo5").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd1 = sc.parallelize([3, 1, 5, 7, 9])
    rdd2 = sc.parallelize([5, 8, 2, 4, 0])

    # rdd_res = rdd1.union(rdd2).collect()
    # rdd_res = rdd1.union(rdd2).distinct().collect()
    rdd_res = rdd1.intersection(rdd2).collect()
    print(rdd_res)
    sc.stop()

6.groupByKey算子

  • 格式: groupByKey()
  • 说明: 根据key进行分组操作
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo6").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd_init = sc.parallelize([('c01', '张三'), ('c02', '李四'), ('c02', '王五'),
                               ('c02', '赵六'), ('c03', '田七'), ('c03', '周八')])

    rdd_res = rdd_init.groupByKey().mapValues(list).collect()
    print(rdd_res)
    sc.stop()

7.reduceByKey算子

  • 格式: reduceByKey(fn)
  • 说明: 根据key进行分组, 将一个组内的value数据放置到一个列表中, 对这个列表基于 传入函数进行聚合计算操作
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo7").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd_init = sc.parallelize([('c01', '张三'), ('c02', '李四'), ('c02', '王五'),
                               ('c02', '赵六'), ('c03', '田七'), ('c03', '周八')])

    rdd_res = rdd_init.map(lambda kv: (kv[0], 1)).reduceByKey(lambda arr, curr: arr + curr).collect()
    print(rdd_res)
    sc.stop()

8.sortByKey算子

  • 格式: sortByKey(ascending = True|False)
  • 说明: 根据key进行排序操作, 默认按照key进行升序排序, 如果需要倒序, 设置 ascending 为False
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo8").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd_init = sc.parallelize([('c03', '张三'), ('c04', '李四'), ('c05', '王五'),
                               ('c01', '赵六'), ('c07', '田七'), ('c08', '周八')])

    rdd_res = rdd_init.sortByKey(ascending=False).collect()
    print(rdd_res)
    sc.stop()

9.countByKey和 countByValue算子

  • countByKey() 根据key进行分组 统计每个分组下有多少个元素
  • countByValue() 根据value进行分组, 统计相同value有多少个
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo9").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd_init = sc.parallelize([('c01', '张三'), ('c02', '李四'), ('c02', '王五'),
                               ('c02', '赵六'), ('c03', '田七'), ('c03', '周八'), ('c01', '张三')
                               ])

    rdd_res0 = rdd_init.countByKey()
    rdd_res1 = rdd_init.countByValue()

    print(rdd_res0)
    print(rdd_res1)
    sc.stop()

三、动作算子

在这里插入图片描述

1.reduce算子

  • 格式: reduce(fn)
  • 作用: 根据传入的函数对数据进行聚合操作
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo1").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    rdd_res = rdd_init.reduce(lambda agg, curr: agg + curr)
    print(rdd_res)
    sc.stop()

2.first算子

  • 格式: first()
  • 说明: 获取第一个元素
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo2").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    rdd_res = rdd_init.first()
    print(rdd_res)
    sc.stop()

3.take算子

  • 格式: take(N)
  • 说明: 获取前N个元素, 类似于limit操作
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo3").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    rdd_res = rdd_init.take(3)
    print(rdd_res)
    sc.stop()

4.top算子

  • 格式: top(N, [fn])
  • 说明: 对数据集进行倒序排序操作, 如果是kv类型, 默认是针对key进行排序, 获取前N个元素
  • fn: 可以自定义排序, 根据谁来排序
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo4").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd_init = sc.parallelize([('c03', 10), ('c04', 30), ('c05', 20),
                               ('c01', 20), ('c07', 80), ('c08', 5)])

    rdd_res = rdd_init.top(3, lambda kv: kv[1])
    print(rdd_res)
    sc.stop()

5.count算子

  • 格式: count()
  • 说明: 统计多少个
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo5").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    rdd_res = rdd_init.count()
    print(rdd_res)
    sc.stop()

6.foreach算子

  • 格式: foreach(fn)
  • 说明: 对数据集进行遍历操作, 遍历后做什么, 取决于传入的函数
from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo6").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd_init = sc.parallelize([('c03', 10), ('c04', 30), ('c05', 20),
                               ('c01', 20), ('c07', 80), ('c08', 5)])

    rdd_res = rdd_init.foreach(lambda kv : print(kv))
    print(rdd_res)
    sc.stop()

7.takeSample算子

  • 格式: takeSample(True|False, N,seed(种子值))

    • 参数1: 是否允许重复采样
    • 参数2: 采样多少个, 如果允许重复采样, 采样个数不限制, 否则最多等于本身数量个数
    • 参数3: 设置种子值, 值可以随便写, 一旦写死了, 表示每次采样的内容也是固定的(可选的) 如果没有特殊需要, 一般不设置
  • 作用: 数据抽样

from pyspark import SparkContext, SparkConf

if __name__ == '__main__':
    conf = SparkConf().setAppName("demo6").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    rdd_res = rdd_init.takeSample(withReplacement=True, num=5, seed=1)
    print(rdd_res)
    sc.stop()

总结

今天主要分享了RDD的转换算子和动作算子,下次继续分享RDD的一些其他重要算子。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/359790.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Flex写法系列-Flex布局之基本语法

以前的传统布局,依赖盒装模型。即 display position float 属性。但是对于比较特殊的布局就不太容易实现,例如:垂直居中。下面主要介绍flex的基本语法,后续还有二期介绍Flex的写法。一、什么是Flex布局?Flex布局个人…

Vuex的创建和简单使用

Vuex 1.简介 1.1简介 1.框框里面才是Vuex state:状态数据action:处理异步mutations:处理同步,视图可以同步进行渲染1.2项目创建 1.vue create 名称 2.运行后 3.下载vuex。采用的是基于vue2的版本。 npm install vuex3 --save 4.vu…

Frequency Domain Model Augmentation for Adversarial Attack

原文:[2207.05382] Frequency Domain Model Augmentation for Adversarial Attack (arxiv.org)代码:https://github.com/yuyang-long/SSA.黑盒攻击替代模型与受攻击模型之间的差距通常较大,表现为攻击性能脆弱。基于同时攻击不同模型可以提高…

C++8:模拟实现list

目录 最基础的链表结构以及迭代器实现 链表节点结构 构造函数 push_back list的迭代器 增删查改功能实现 insert erase pop_front pop_back push_front clear 默认成员函数 析构函数 拷贝构造函数 赋值操作符重载 list的完善 const迭代器 赋值操作符重…

使用BP神经网络诊断恶性乳腺癌(Matlab代码实现)

目录 💥1 概述 📚2 运行结果 🎉3 参考文献 👨‍💻4 Matlab代码 💥1 概述 1.1.算法简介 BP(Back Propagation)网络是1986年由Rumelhart和McCelland为首的科学家小组提出&#xf…

c语言编程规范第三部分

3、头文件应向稳定的方向包含 头文件的包含关系是一种依赖,一般来说,应当让不稳定的模块依赖稳定的模块,从而当不稳定的模块发生变化时,不会影响(编译)稳定的模块。就我们的产品来说,依赖的方向…

数据复制 软件 SnapMirror:统一复制,更快恢复

数据复制 软件 SnapMirror:统一复制,更快恢复 预测未知领域是一项棘手的工作。让 SnapMirror 软件来处理则轻松得多。 通过数据的高可用性和快速数据复制,可即时访问业务关键型数据。放松一下,它会让你满意的。 为什么用 SnapMi…

3D目标检测(一)—— 基于Point-Based方法的PointNet系列

3D目标检测(一)—— PointNet,PointNet,PointNeXt, PointMLP 目录 3D目标检测(一)—— PointNet,PointNet,PointNeXt, PointMLP 前言 零、网络使用算法 …

AQS与Synchronized异曲同工的加锁流程

在并发多线程的情况下,为了保证数据安全性,一般我们会对数据进行加锁,通常使用Synchronized或者ReentrantLock同步锁。Synchronized是基于JVM实现,而ReentrantLock是基于Java代码层面实现的,底层是继承的AQS。 AQS全称…

c++函数对象(仿函数)、谓词、内建函数对象

1、函数对象 1.1 概念 重载函数调用操作符的类,这个类的对象就是函数对象,在使用这个函数对象对应使用重载的()符号时,行为类似于函数调用,因此这个函数也叫仿函数。 注意:函数对象&#xff0…

多个任务并行的时候,你是否总是会手忙脚乱?

很多重要事情之所以变得迫在眉睫,需要立刻处理、应付,是因为被延误或没有进行足够的预防和准备,筹划。 面对多个任务并行的时候,你是否总是会手忙脚乱? 在项目工作中,管理者每天要面对各种工作&#xff…

移动WEB开发二、流式布局

零、文章目录 文章地址 个人博客-CSDN地址:https://blog.csdn.net/liyou123456789个人博客-GiteePages:https://bluecusliyou.gitee.io/techlearn 代码仓库地址 Gitee:https://gitee.com/bluecusliyou/TechLearnGithub:https:…

【Linux】线程函数和线程同步详细整理(金针菇般细)

目录 一,线程函数 1.获取当前线程ID 2.创建线程 3.退出线程 4.阻塞线程 5.分离线程 6.取消线程 7.线程比较 8.测试代码(线程函数总结) 二,线程同步 1.互斥锁 2.读写锁 3.条件变量 4.信号量 一,线程函数 …

【阿旭机器学习实战】【29】产品广告投放实战案例---线性回归

【阿旭机器学习实战】系列文章主要介绍机器学习的各种算法模型及其实战案例,欢迎点赞,关注共同学习交流。 目录问题描述数据处理过程及源码通过数据可视化分析数据训练线性回归模型可视化训练好的线性回归模型结果预测问题描述 你所在的公司在电视上做产…

mybatis狂神(附自学过程中疑问解决)

首先先附上mybatis的官方文本链接mybatis – MyBatis 3 | 简介一、Mybatis介绍MyBatis 是一款优秀的持久层框架,它支持自定义 SQL、存储过程以及高级映射。MyBatis 免除了几乎所有的 JDBC 代码以及设置参数和获取结果集的工作。MyBatis 可以通过简单的 XML 或注解来…

Comparator和Comparable的区别以及Collections.sort排序原理

一、概述 Comparable和Comparator都是两个接口,接口都可以用来实现集合中元素的比较、排序,Comparator位于包java.util下,而Comparable位于包java.lang下,Comparable接口将比较代码嵌入自身类中,而Comparator既可以嵌…

非标题党:前端Vue React 项目编程规范化配置(大厂规范)

前端项目编程规范化配置 下述例子主要是从 代码规范化 以及 git 提交规范化 两方面进行配置。内容很多,请做好心理准备 一、代码检测工具 ESLint 在我们通过 vue create “项目名” 时,我们可以通过手动配置的方式,来配置 ESLint 来对代码进…

QDateTime的11种显示方式

QDateTime datetime QDateTime::currentDateTime(); datetime.toString(“hh:mm:ss\nyyyy/MM/dd”); datetime.toString(“hh:mm:ss ap\nyyyy/MM/dd”); datetime.toString(“hh:mm:ss\nyyyy-MM-dd”); datetime.toString(“hh:mm:ss ap\nyyyy-MM-dd”); datetime.to…

【分享】订阅用友YonSuite集简云连接器同步销售出库数据至用友YonSuite

方案场景 在企业中因多种系统孤立导致数据割裂,是现企业中现阶段面临的最大问题,而钉钉作为常用的OA审批系统,用友YonSuite作为ERP系统,原方式钉钉内完成审批再由人工将数据同步到用友YonSuite系统,数据同步过程中不仅…

将HTTP接口配置成HTTPS

一、使用Java的keytool.exe程序生成本机的TLS许可找到Java的jdk目录进入bin默认安装路径C:\Program Files\Java\jdk1.8.0_91\bin 进入命令面板,在bin的路径栏中输入cmd敲击回车即可使用keytoolkeytool -genkeypair -alias tomcat_https -keypass 123456 -keyalg RSA…