【Python】PySpark 数据计算 ⑤ ( RDD#sortBy方法 - 排序 RDD 中的元素 )

news2025/1/6 11:14:48

文章目录

  • 一、RDD#sortBy 方法
    • 1、RDD#sortBy 语法简介
    • 2、RDD#sortBy 传入的函数参数分析
  • 二、代码示例 - RDD#sortBy 示例
    • 1、需求分析
    • 2、代码示例
    • 3、执行结果





一、RDD#sortBy 方法




1、RDD#sortBy 语法简介


RDD#sortBy 方法 用于 按照 指定的 键 对 RDD 中的元素进行排序 , 该方法 接受一个 函数 作为 参数 , 该函数从 RDD 中的每个元素提取 排序键 ;

根据 传入 sortBy 方法 的 函数参数 和 其它参数 , 将 RDD 中的元素按 升序 或 降序 进行排序 , 同时还可以指定 新的 RDD 对象的 分区数 ;


RDD#sortBy 语法 :

sortBy(f: (T) ⇒ U, ascending: Boolean, numPartitions: Int): RDD[T]
  • 参数说明 :
    • f: (T) ⇒ U 参数 : 函数 或 lambda 匿名函数 , 用于 指定 RDD 中的每个元素 的 排序键 ;
    • ascending: Boolean 参数 : 排序的升降设置 , True 生序排序 , False 降序排序 ;
    • numPartitions: Int 参数 : 设置 排序结果 ( 新的 RDD 对象 ) 中的 分区数 ;
      • 当前没有接触到分布式 , 将该参数设置为 1 即可 , 排序完毕后是全局有序的 ;
  • 返回值说明 : 返回一个新的 RDD 对象 , 其中的元素是 按照指定的 排序键 进行排序的结果 ;

2、RDD#sortBy 传入的函数参数分析


RDD#sortBy 传入的函数参数 类型为 :

(T) ⇒ U

T 是泛型 , 表示传入的参数类型可以是任意类型 ;

U 也是泛型 , 表示 函数 返回值 的类型 可以是任意类型 ;

T 类型的参数 和 U 类型的返回值 , 可以是相同的类型 , 也可以是不同的类型 ;





二、代码示例 - RDD#sortBy 示例




1、需求分析


统计 文本文件 word.txt 中出现的每个单词的个数 , 并且为每个单词出现的次数进行排序 ;

Tom Jerry
Tom Jerry Tom
Jack Jerry Jack Tom

在这里插入图片描述

读取文件中的内容 , 统计文件中单词的个数并排序 ;

思路 :

  • 读取数据到 RDD 中 ,
  • 然后 按照空格分割开 再展平 , 获取到每个单词 ,
  • 根据上述单词列表 , 生成一个 二元元组 列表 , 列表中每个元素的 键 Key 为单词 , 值 Value 为 数字 1 ,
  • 对上述 二元元组 列表 进行 聚合操作 , 相同的 键 Key 对应的 值 Value 进行相加 ;
  • 将聚合后的结果的 单词出现次数作为 排序键 进行排序 , 按照升序进行排序 ;

2、代码示例


对 RDD 数据进行排序的核心代码如下 :

# 对 rdd4 中的数据进行排序
rdd5 = rdd4.sortBy(lambda element: element[1], ascending=True, numPartitions=1)

要排序的数据如下 :

[('Tom', 4), ('Jack', 2), ('Jerry', 3)]

按照上述二元元素的 第二个 元素 进行排序 , 对应的 lambda 表达式为 :

lambda element: element[1]

ascending=True 表示升序排序 ,

numPartitions=1 表示分区个数为 1 ;


排序后的结果为 :

[('Jack', 2), ('Jerry', 3), ('Tom', 4)]

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "D:/001_Develop/022_Python/Python39/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 将 文件 转为 RDD 对象
rdd = sparkContext.textFile("word.txt")
print("查看文件内容 : ", rdd.collect())

# 通过 flatMap 展平文件, 先按照 空格 切割每行数据为 字符串 列表
#   然后展平数据解除嵌套
rdd2 = rdd.flatMap(lambda element: element.split(" "))
print("查看文件内容展平效果 : ", rdd2.collect())

# 将 rdd 数据 的 列表中的元素 转为二元元组, 第二个元素设置为 1
rdd3 = rdd2.map(lambda element: (element, 1))
print("转为二元元组效果 : ", rdd3.collect())

# 应用 reduceByKey 操作,
#   将同一个 Key 下的 Value 相加, 也就是统计 键 Key 的个数
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
print("统计单词 : ", rdd4.collect())

# 对 rdd4 中的数据进行排序
rdd5 = rdd4.sortBy(lambda element: element[1], ascending=True, numPartitions=1)
print("最终统计单词并排序 : ", rdd4.collect())

# 停止 PySpark 程序
sparkContext.stop()



3、执行结果


执行结果 :

D:\001_Develop\022_Python\Python39\python.exe D:/002_Project/011_Python/HelloPython/Client.py
23/08/04 10:49:06 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: Could not locate Hadoop executable: D:\001_Develop\052_Hadoop\hadoop-3.3.4\bin\winutils.exe -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
PySpark 版本号 :  3.4.1
查看文件内容 :  ['Tom Jerry', 'Tom Jerry Tom', 'Jack Jerry Jack Tom']
查看文件内容展平效果 :  ['Tom', 'Jerry', 'Tom', 'Jerry', 'Tom', 'Jack', 'Jerry', 'Jack', 'Tom']
转为二元元组效果 :  [('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jack', 1), ('Jerry', 1), ('Jack', 1), ('Tom', 1)]
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
统计单词 :  [('Tom', 4), ('Jack', 2), ('Jerry', 3)]
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
最终统计单词并排序 :  [('Jack', 2), ('Jerry', 3), ('Tom', 4)]

Process finished with exit code 0

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/836263.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

pp-ocr报错记录

RESER 报错: distutils.errors.DistutilsError: Could not find suitable distribution for Requirement.parse(‘tomli>1.0.0’) 解决办法: 参考:https://stackoverflow.com/questions/67603407/distutilserror-could-not-find-suitable…

golang函数传参——值传递理解

做了五年的go开发,却并没有什么成长,都停留在了业务层面了。一直以为golang中函数传参,如果传的是引用类型,则是以引用传递,造成这样的误解,实在也不能怪我。我们来看一个例子,众所周知&#xf…

享元模式 Flyweight Pattern 《游戏编程模式》学习笔记

如果我们要存储一个树一样的数据结构,直觉来说我们会这么写 但是实际上我们会发现,哪怕森林里有千千万万的树,它们大多数长得一模一样。 它们使用了相同的网格和纹理。 这意味着这些树的实例的大部分字段是一样的。 那么我们就可以将树共…

C语言自定义类型 — 结构体、位段、枚举、联合

前言 本期主要对通讯录三篇博客文章进行补充 通讯录文章:通讯录系列文章 对结构体进行详细介绍,其次讲解位段、枚举、联合体 文章目录 前言一、结构体1.什么是结构体2.结构声明2.1 声明格式2.2 如何声明(代码演示) 3.特殊声明3.1…

css word-break

上面的一行还是可以放置很多个字符的,但是就是换行了。 要求填充满整行,超过在换行 加上word-break:break-all;就行

每日汇评:由于美国就业数据强劲,黄金可能恢复下行趋势

1、美国非农就业数据公布前,金价试图从三周低点反弹; 2、美国经济数据喜忧参半,推动美元和美债收益率回落; 3、金价上行空间有限,因日技术面走势偏空; 金价又将下跌一周,周五有望创下六周以来…

浅谈小区电动汽车充电桩管理系统设计及应用

安科瑞 华楠 摘要:提出了针对数量众的充电桩计算充电负荷时所需需要系数的一种替代方法,阐述了二者适用背景的相似性,并针对其可行性进行了相关论述。数發众的新能源汽车在相对集中的时间充电(尤其是与居民用电高峰重的惰况下&…

测试人员简单使用Jenkins

一、测试人员使用jenkins干什么? 部署测试环境 二、相关配置说明 一般由开发人员进行具体配置 1.Repository URL:填写git地址 2.填写开发分支,测试人员可通过相应分支进行测试环境的构建部署 当多个版本并行时,开发人员可以通过…

devops-发布vue前端项目

回到目录 将使用jenkinsk8s发布前端项目 1 环境准备 node环境 在部署jenkins的服务器上搭建node环境 node版本 # 1.拉取 https://nodejs.org/download/release/v20.4.0/node-v20.4.0-linux-x64.tar.gz# 2.解压到/usr/local目录下 sudo tar xf v20.4.0.tar.gz -C /usr/loc…

唐刘:TiDB 研发工程实践及 TiDB 人才观丨CCF 中国数据库暑期学校

在刚刚结束的 CCF 中国数据库暑期学校上, PingCAP 的研发副总裁唐刘分享了在 TiDB 研发过程中的工程实践经验和人才培养方法。目前,TiDB 已广泛应用于各行各业,有着庞大的用户基数,面临多样化的数据处理需求。PingCAP 通过开源、敏…

Datax 数据同步-使用总结(一)

1,实时同步? datax 通常做离线数据同步使用。 目前能想到的方案 利用 linux 的定时任务时间戳的方式做增量同步。 2,同步速度快不快? 单表同步速度还是挺快的 但是如果遇到复杂的 sql 查询,其同步效率&#xff0c…

企业数字化转型:信息化还是数字化?

面对巨大的数字经济市场,全球大部分企业都开始了数字化转型进程,国内一半以上的企业已经将数字化转型视为下一步发展重点,并制定了清晰的数字化转型战略规划。 但是,相当一部分传统行业,如制造、金融、能源、化工等非数…

使用ngrok实现内网穿透

前言:因为公司对接的某个项目要搭建一个测试环境,所以使用内网穿透的方式来搭建。非常方便,而且还节省资源,最重要的是免费啊这个工具。 ngrok官网:https://ngrok.com/ 点击下载,很快就能下好。 下好之…

Postman(接口测试工具)

目录 一.基本介绍 1.Postman 是什么 2.Postman 快速入门 2.1快速入门需求说明 二.Postman 完成 Controller 层测试 需要的代码: Java类 request.jsp success.jsp 1. 完成请求 2. 完成请求 3. 完成请求 4. 完成请求 5. 完成请求 三.发送join 一.基本介…

【Ubuntu】Ubuntu 22.04 升级 OpenSSH 9.3p2 修复CVE-2023-38408

升级原因 近日Openssh暴露出一个安全漏洞CVE-2023-38408,以下是相关资讯: 一、漏洞详情 OpenSSH是一个用于安全远程登录和文件传输的开源软件套件。它提供了一系列的客户端和服务器程序,包括 ssh、scp、sftp等,用于在网络上进行…

机器学习深度学习——卷积神经网络(LeNet)

👨‍🎓作者简介:一位即将上大四,正专攻机器学习的保研er 🌌上期文章:机器学习&&深度学习——池化层 📚订阅专栏:机器学习&&深度学习 希望文章对你们有所帮助 卷积神…

微信朋友圈会自动点赞?

网友称微信存在bug,朋友圈会自动点赞?腾讯回应了 微信作为国内最大的网络社交平台,目前用户已超过11亿。 令人吃惊的是,拥有这么庞大用户数量的平台,竟然有可能存在Bug。 近日,#微信回应看朋友圈会自动点…

Linux性能分析工具介绍(二)--内存、进程、磁盘、IO分析

目录 一、引言 二、Linux性能分析工具介绍 ------>2.1、进程 ------>2.2、内存 ------>2.3、磁盘 ------>2.4、IO 一、引言 本章从内存、IO、进程的角度,分析linux系统的性能 二、Linux性能分析工具介绍 2.1、进程 2.1.1、top top命令可以动态查看进程…

【pandas百炼成钢】数据预览与预处理

知识目录 前言一、数据查看1 - 查看数据维度2 - 随机查看5条数据3 - 查看数据前后5行4 - 查看数据基本信息5 - 查看数据统计信息|数值6 - 查看数据统计信息|非数值7 - 查看数据统计信息|整体 二、缺失值处理8 - 计算缺失值|总计9 …

【ASP.NET MVC】使用动软(三)(11)

一、问题 上文中提到,动软提供了数据库的基本操作功能,但是往往需要添加新的功能来解决实际问题,比如GetModel,通过id去查对象: 这个功能就需要进行改进:往往程序中获取的是实体的其他属性,比如…