Pyspark下操作dataframe方法(1)

news2024/12/27 14:04:25

文章目录

  • Pyspark dataframe
    • 创建DataFrame
      • 使用Row对象
      • 使用元组与scheam
      • 使用字典与scheam
      • 注意
    • agg 聚合操作
    • alias 设置别名
      • 字段设置别名
      • 设置dataframe别名
    • cache 缓存
    • checkpoint RDD持久化到外部存储
    • coalesce 设置dataframe分区数量
    • collect 拉去数据
    • columns 获取dataframe列

Pyspark dataframe

创建DataFrame

from pyspark.sql import  SparkSession,Row
from pyspark.sql.types import *

def init_spark():
    spark  = SparkSession.builder.appName('LDSX_TEST_DATAFrame') \
        .config('hive.metastore.uris', 'thrift://hadoop01:9083') \
        .config('spark.master', "local[2]") \
        .enableHiveSupport().getOrCreate()
    return spark
spark = init_spark()

# 设置字段类型
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
])

使用Row对象

cs = Row('name','age','id','gender')
row_list = [ cs('ldsx','12','1','男'),cs('test1','20','1','女'),
             cs('test2','26','1','男'),cs('test3','19','1','女'),
             cs('test4','51','1','女'),cs('test5','13','1','男')]
data = spark.createDataFrame(row_list)
data.show()
 +-----+---+---+---+
| name|age| id|gender|
+-----+---+---+---+
| ldsx| 12|  1| 男|
|test1| 20|  1| 女|
|test2| 26|  1| 男|
|test3| 19|  1| 女|
|test4| 51|  1| 女|
|test5| 13|  1| 男|
+-----+---+---+---+
data.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)

使用元组与scheam

park.createDataFrame([('ldsx1','12','1','男'),('ldsx2','12','1','男')],schema).show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
|ldsx1| 12|  1|    男|
|ldsx2| 12|  1|    男|
+-----+---+---+------+

使用字典与scheam

spark.createDataFrame([{'name':'ldsx','age':'12','id':'1','gender':'女'}]).show()
+---+------+---+----+
|age|gender| id|name|
+---+------+---+----+
| 12|    女|  1|ldsx|
+---+------+---+----+

注意

scheam设置优先级高于row设置,dict设置的key

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("id", StringType(), True),
    StructField("测试", StringType(), True),
])
spark.createDataFrame([{'name':'ldsx','age':'12','id':'1','gender':'女'}],schema).show()
+----+---+---+----+
|name|age| id|测试|
+----+---+---+----+
|ldsx| 12|  1|null|
+----+---+---+----+

agg 聚合操作

在 PySpark 中,agg(aggregate)函数用于对 DataFrame 进行聚合操作。它允许你在一个或多个列上应用一个或多个聚合函数,并返回计算后的结果。可以结合groupby使用。

from pyspark.sql import functions as sf
data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
data.agg({'age':'max'}).show()
+--------+
|max(age)|
+--------+
|      51|
+--------+
data.agg({'age':'max','gender':"max"}).show()
+-----------+--------+
|max(gender)|max(age)|
+-----------+--------+
|         男|      51|
+-----------+--------+

data.agg(sf.min(data.age)).show()
+--------+
|min(age)|
+--------+
|      12|
+--------+
data.agg(sf.min(data.age),sf.min(data.name)).show()
+--------+---------+
|min(age)|min(name)|
+--------+---------+
|      12|     ldsx|
+--------+---------+

结合groupby使用

data.groupBy('gender').agg(sf.min('age')).show()
 +------+--------+
|gender|min(age)|
+------+--------+
|    女|      19|
|    男|      12|
+------+--------+
data.groupBy('gender').agg(sf.min('age'),sf.max('name')).show()
+------+--------+---------+
|gender|min(age)|max(name)|
+------+--------+---------+
|    女|      19|    test4|
|    男|      12|    test5|
+------+--------+---------+

alias 设置别名

字段设置别名

#字段设置别名
data.select(data['name'].alias('rename_name')).show()
+-----------+
|rename_name|
+-----------+
|       ldsx|
|      test1|
|      test2|
|      test3|
|      test4|
|      test5|
+-----------+

设置dataframe别名

d1 = data.alias('ldsx1')
d2 = data2.alias('ldsx2')
d1.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
d2.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
|测试1| 12|  1|    男|
|测试2| 20|  1|    男|
+-----+---+---+------+

d3 = d1.join(d2,col('ldsx1.gender')==col('ldsx2.gender'),'inner')
d3.show()
+-----+---+---+------+-----+---+---+------+
| name|age| id|gender| name|age| id|gender|
+-----+---+---+------+-----+---+---+------+
| ldsx| 12|  1|    男|测试1| 12|  1|    男|
| ldsx| 12|  1|    男|测试2| 20|  1|    男|
|test2| 26|  1|    男|测试1| 12|  1|    男|
|test2| 26|  1|    男|测试2| 20|  1|    男|
|test5| 13|  1|    男|测试1| 12|  1|    男|
|test5| 13|  1|    男|测试2| 20|  1|    男|
+-----+---+---+------+-----+---+---+------+

d3[['name']].show()
#报错提示
pyspark.errors.exceptions.captured.AnalysisException: [AMBIGUOUS_REFERENCE] Reference `name` is ambiguous, could be: [`ldsx1`.`name`, `ldsx2`.`name`].
# 使用别名前缀获取
d3[['ldsx1.name']].show()
+-----+
| name|
+-----+
| ldsx|
| ldsx|
|test2|
|test2|
|test5|
|test5|
+-----+
>>> d3[['ldsx2.name']].show()
+-----+
| name|
+-----+
|测试1|
|测试2|
|测试1|
|测试2|
|测试1|
|测试2|
+-----+
d3.select('ldsx1.name','ldsx2.name').show()
+-----+-----+
| name| name|
+-----+-----+
| ldsx|测试1|
| ldsx|测试2|
|test2|测试1|
|test2|测试2|
|test5|测试1|
|test5|测试2|
+-----+-----+

cache 缓存

dataframe缓存默认缓存级别MEMORY_AND_DISK_DESER

df.cache()
# 查看逻辑计划和物理计划
df.explain()

checkpoint RDD持久化到外部存储

Checkpoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用Checkpoint比较合适,或者数据量很大的时候,采用Checkpoint比较合适。如果数据量小,或者RDD重新计算也是非常快的,直接使用缓存即可。
CheckPoint支持写入HDFS。CheckPoint被认为是安全的

sc = spark.sparkContext
# 设置检查存储目录
sc.setCheckpointDir('hdfs:///ldsx_checkpoint')
d3.count()
# 保存会在hdfs上进行存储
d3.checkpoint()
# 从hdfs读取
d3.count()

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

coalesce 设置dataframe分区数量

# 设置dataframe分区数量
d3 = d3.coalesce(3)
# 获取分区数量
d3.rdd.getNumPartitions()

collect 拉去数据

当任务提交到集群的时候collect()操作是用来将所有结点中的数据收集到dirver节点,数据量很大慎用防止dirver炸掉。

d3.collect()
[Row(name='ldsx', age='12', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='ldsx', age='12', id='1', gender='男', name='测试2', age='20', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男', name='测试2', age='20', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男', name='测试2', age='20', id='1', gender='男')]

columns 获取dataframe列

>>> d3.columns
['name', 'age', 'id', 'gender', 'name', 'age', 'id', 'gender']

	
d3.withColumn('ldsx1.name_1',col('ldsx1.name')).show()
+-----+---+---+------+-----+---+---+------+------------+
| name|age| id|gender| name|age| id|gender|ldsx1.name_1|
+-----+---+---+------+-----+---+---+------+------------+
| ldsx| 12|  1|    男|测试1| 12|  1|    男|        ldsx|
| ldsx| 12|  1|    男|测试2| 20|  1|    男|        ldsx|
|test2| 26|  1|    男|测试1| 12|  1|    男|       test2|
|test2| 26|  1|    男|测试2| 20|  1|    男|       test2|
|test5| 13|  1|    男|测试1| 12|  1|    男|       test5|
|test5| 13|  1|    男|测试2| 20|  1|    男|       test5|
+-----+---+---+------+-----+---+---+------+------------+

# 重命名列名
d3.withColumnRenamed('ldsx1.name_1',col('ldsx1.name')).show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2107694.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【如何用远程连接到ubuntu服务器上的redis】

文章目录 ubuntu上安装redis常用命令 远程连接测试在另一台PC上进行远程访问 ubuntu上安装redis Redis 5.0 被包含在默认的 Ubuntu 20.04 软件源中。想要安装它,以 root 或者其他 sudo 身份运行下面的命令: sudo apt update //更新apt sudo apt inst…

全视通精彩亮相宁夏养老服务业博览会,助力西北地区养老产业高质量发展

据悉,今年4月,宁夏被列入48个全国基本养老服务综合平台试点地区,是全域申报成功的8个省(直辖市)之一,也是西北唯一的入选省份。5月,中卫市成功入选2024年居家和社区基本养老服务提升行动项目地区…

多智能体强化学习:citylearn城市建筑能量优化和需求响应

今天分享一个用于能量优化的强化学习框架,citylearn 代码量非常庞大,我都不敢看,看也看不完,不花一定的时间难以搞懂它的原理。 CityLearn(CL)环境是一个类似 OpenAI Gym 的环境,它通过控制不…

网络安全服务基础Windows--第10节-FTP主动与被动模式

概述 将某台计算机中的⽂件通过⽹络传送到可能相距很远的另⼀台计算机中,是⼀项基本的⽹络应⽤,即⽂件传送。 ⽂件传送协议FTP (File Transfer Protocol)是因特⽹上使⽤得最⼴泛的⽂件传送协议。 FTP是⼀个⽼早的⽹络协议&…

图形几何-如何将凹多边形分解成若干个凸多边形

凹多边形的概念 凹多边形是指至少有一个内角大于180度的多边形。与之相对,凸多边形的所有内角均小于或等于180度,且任意两点之间的连线都完全位于多边形内部。将凹多边形分解成若干个凸多边形是计算几何中的一个重要问题。 分解原理 将凹多边形分解为凸…

Python【3】乌七八糟

目录 if __name__ "__main__ 模块名————__name__ 装饰器 参数的优化——可以接受任何函数 需要添加自定义参数——再套一层 语法糖——好甜! 类init self if __name__ "__main__ 在Python中,if __name__ "__main__"…

再谈全排列

题目链接: . - 力扣(LeetCode) 每次做全排列的题目,我都要孕育好一阵子,到底怎么去思考这个问题呢? 首先,我觉得最好的方式就是画个树。 画了树之后,你就知道,这个问题&…

鸿蒙轻内核M核源码分析系列五 时间管理

往期知识点记录: 鸿蒙(HarmonyOS)应用层开发(北向)知识点汇总 持续更新中…… 在鸿蒙轻内核源码分析上一篇文章中,我们剖析了中断的源码,简单提到了Tick中断。本文会继续分析Tick和时间相关的源…

多所高校拟撤销地理、测绘、建筑等相关专业!网友:游了很久,发现没有岸!

近日,各大高校频频传来专业“下线”的消息。多所高校拟撤销地理、测绘等相关专业。对此很多网友破防了,表示:还没毕业,专业没了? 更有城乡规划的网友表示:自己已经成为怨种毕业生,游了很久&…

公司数字化转型的目的是什么?

不同行业公司,其数字化转型的目的也不一样。下面我列举几个行业,给大家讲讲其数字化转型的真正目的。 制造数字化转型 制造业来说,数字化转型的本质是通过新一代信息技术与制造技术的融合,实现以数据为核心的资源要素变革、以网络…

【8.28更新】Win10 22H2 正式版:19045.4842镜像下载!

今日系统之家小编给大家带来2024年最新的Windows10 22H2正式版系统,该版本系统基于微软官方Windows 10 22H2 19045.4842 64位 专业版进行离线制作与优化,系统安全无任何病毒残留,且兼容性出色,能完美兼容新老机型。安装后&#xf…

一大波华为“黑”正在赶来

文|琥珀食酒社 作者 | 积溪 不管你信不信 我都敢肯定的告诉你 又一波黑华为的浪潮 将在下周到来 因为下周二 也就是9月10号 华为将发布一款划时代的产品 华为MateXT非凡大师 三折叠屏手机 就我现在得到的情况 这款手机最大的特点 就是先进 余承东都说…

SRT协议分析以及收拉流测试

文章目录 介绍协议概述协议常用URL格式协议工作流程协议包格式数据包和控制包数据包控制包ACKNACK 开源协议栈libSRTFFmpegVLC Media PlayerSRT AllianceSRS 测试使用ffmpegsrs推流端接收端播放端srs配置 使用 libSRT发送端接收端 介绍 SRT(Secure Reliable Transpo…

力扣: 有效的字母异位词

文章目录 需求数组map结尾 需求 给定两个字符串 s 和 t ,编写一个函数来判断 t 是否是 s 的字母异位词。 字母异位词 是通过重新排列不同单词或短语的字母而形成的单词或短语,通常只使用所有原始字母一次。 示例 1: 输入: s “anagram”, t “nagaram…

9、类和对象

9.1 封装 9.1.1 封装的例子 class Student { public:string name;int age; public:void setName(string name_) {name name_;} }; int main() {Student s1;s1.setName("zhangsan");return 0; }类中的行为都叫做成员,例如成员属性,成员变量&…

磁吸轨道灯的优缺点深度解析:为你的家居照明提供新选择

在现代家居装修中,照明设计已成为提升居住品质的重要一环。磁吸轨道灯作为一种新兴的照明解决方案,以其独特的灵活性和美观性逐渐受到市场的青睐。然而,任何产品都有其两面性,磁吸轨道灯也不例外。本文将深入探讨磁吸轨道灯的优缺…

产品起名|给你的产品插上会飞的翅膀

引言:在品牌的世界里,产品的名字不仅仅是一个标签,它是品牌个性、价值和承诺的直接体现。一个好的产品名能够快速传达产品特性,吸引消费者,并在市场中建立独特的品牌形象。 好产品从起名开始 品牌介绍:南京…

使用卫星仿真软件STK的一些应用和思考(星地链路、星间链路)

目录 任务描述利用STK建模星地协同系统3个GEO高轨卫星240/20/1 Walker-Star Constellation 低轨卫星星座地面站或者地面设备 链路建模与数据提取处理星地链路星间链路数据读取的几种方法最麻烦的方法使用Matlab与STK互联接口使用大规模使用Chain 总结 任务描述 在一个星地协同…

【小设计】基于宏实现的C++ 可复用setter 和getter设计

前言 最近在开发unity游戏的时候,面对庞大复杂的不同类之间进行数据交换和调用,我们必须做好类数据的信息管理,往往我希望暴露给其他类越少越好,这时候我就利用了C#的一个语言特性 public PlayerStateMachine stateMachine{get;…

创建锁对象/函数

描述:某些单据进行修改时,需要锁定数据 方法步骤: 1、se11:可copy创建新锁 EZSDDH 2、输入需要锁定的主表,锁参数会根据主键自动补填 3、激活后,会生成对应的锁函数 ENQUEUE_EZSDDH :锁定表 …