Pyspark dataframe基本内置方法(5)

news2025/1/24 7:14:33

文章目录

  • Pyspark sql DataFrame
    • 相关文章
    • toDF 设置新列名
    • toJSON row对象转换json字符串
    • toLocallterator 获取迭代器
    • toPandas 转换python dataframe
    • transform dataframe转换
    • union unionALL 并集不去重(按列顺序)
    • unionByName 并集不去重(按列名)
    • unpivot 反转表(宽表转长表)
    • withColumn 添加列操作
    • withColumns 添加多列操作
    • withColumnRenamed 列重命名
    • withColumnsRenamed 多列重命名
    • withMetadata 设置元数据
    • write 存储表
      • write.saveAsTable
      • insertInto

Pyspark sql DataFrame

相关文章

Pyspark下操作dataframe方法(1)
Pyspark下操作dataframe方法(2)
Pyspark下操作dataframe方法(3)
Pyspark下操作dataframe方法(4)
Pyspark下操作dataframe方法(5)

toDF 设置新列名

列名更新,将会按照新列名顺序的替换原列名返回新dataframe,更新列名数量需要跟原始列名数量一致。

from pyspark.sql.functions import lit

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+
data.toDF(*['n1','n2','n3','n5','n4']).show()
+-----+---+---+---+---+
|   n1| n2| n3| n5| n4|
+-----+---+---+---+---+
| ldsx| 12|  1| 男|  1|
|test1| 20|  1| 女|  1|
|test2| 26|  1| 男|  1|
|test3| 19|  1| 女|  1|
|test4| 51|  1| 女|  1|
|test5| 13|  1| 男|  1|
+-----+---+---+---+---+

toJSON row对象转换json字符串

把dataframe的row对象转换为json字符串,返回rdd

data.rdd.first()
Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
# data.toJSON()返回rdd类型
data.toJSON().first()
'{"name":"ldsx","age":"12","id":"1","gender":"男","new_id":"1"}'

toLocallterator 获取迭代器

返回一个迭代器,其中包含此DataFrame中的所有行。迭代器将消耗与此DataFrame中最大分区一样多的内存。通过预取,它可能会消耗最多2个最大分区的内存。

d1 = data.toLocalIterator()
d1
<generator object _local_iterator_from_socket.<locals>.PyLocalIterable.__iter__ at 0x7f55c86e0570>
# 便利迭代器
for i in d1:
    print(i)
    
Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
Row(name='test1', age='20', id='1', gender='女', new_id='1')
Row(name='test2', age='26', id='1', gender='男', new_id='1')
Row(name='test3', age='19', id='1', gender='女', new_id='1')
Row(name='test4', age='51', id='1', gender='女', new_id='1')
Row(name='test5', age='13', id='1', gender='男', new_id='1')

toPandas 转换python dataframe

需要python环境安装pandas的前提下使用,且dataframe需要很小,因为所有数据都加载到driver的内存中。

data.toPandas()
type(data.toPandas())
<class 'pandas.core.frame.DataFrame'>
    name age id gender new_id
0   ldsx  12  1      男      1
1  test1  20  1      女      1
2  test2  26  1      男      1
3  test3  19  1      女      1
4  test4  51  1      女      1
5  test5  13  1      男      1

transform dataframe转换

参数为处理函数,返回值必须为dataframe

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+

# 处理函数自定义最后返回了dataframe
def ldsx(spark_df):
    colums = [ str(i)+'_ldsx' for i in range(len(spark_df.columns)) ]
    return spark_df.toDF(*colums)
    
data.transform(ldsx).show()
+------+------+------+------+------+
|0_ldsx|1_ldsx|2_ldsx|3_ldsx|4_ldsx|
+------+------+------+------+------+
|  ldsx|    12|     1|    男|     1|
| test1|    20|     1|    女|     1|
| test2|    26|     1|    男|     1|
| test3|    19|     1|    女|     1|
| test4|    51|     1|    女|     1|
| test5|    13|     1|    男|     1|
+------+------+------+------+------+

union unionALL 并集不去重(按列顺序)

获得新dataframe,unionall别名为union,如果要去重使用distinct方法,不会解析对应的列名合并,是按照列的顺序合并的,硬合

df2 = spark.createDataFrame([(3, 'C'), (4, 'D')], ['id', 'value'])
df1 = spark.createDataFrame([(1, 'A'), (2, 'B'),(3, 'C'),(3, 'C')], ['id', 'value'])
df1.show()
+---+-----+
| id|value|
+---+-----+
|  1|    A|
|  2|    B|
|  3|    C|
|  3|    C|
+---+-----+
df2.show()
+---+-----+
| id|value|
+---+-----+
|  3|    C|
|  4|    D|
+---+-----+
df1.union(df2)
DataFrame[id: bigint, value: string]
df1.union(df2).show()
+---+-----+
| id|value|
+---+-----+
|  1|    A|
|  2|    B|
|  3|    C|
|  3|    C|
|  3|    C|
|  4|    D|
+---+-----+

# 去重使用distinct
df1.union(df2).distinct().show()
+---+-----+
| id|value|
+---+-----+
|  2|    B|
|  1|    A|
|  3|    C|
|  4|    D|
+---+-----+

unionByName 并集不去重(按列名)

是否允许缺失列:allowMissingColumns,默认不允许

# 按照列名合并
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   4|   5|
+----+----+----+


# 对于不存在列进行填补
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6, 7]], ["col1", "col2", "col3", "col4"])
# allowMissingColumns True默认填补null
df1.unionByName(df2, allowMissingColumns=True).show()

+----+----+----+----+----+
|col0|col1|col2|col3|col4|
+----+----+----+----+----+
|   1|   2|   3|NULL|NULL|
|NULL|   4|   5|   6|   7|
+----+----+----+----+----+

unpivot 反转表(宽表转长表)

ids: 标识列
values:选中的列(LIST)
variableColumnName: 列名
valueColumnName:对应列的值

宽表转长表,一行变多行,除了选中的ids是不变的,但是会把选中的values中的列由列变成行记录,variableColumnName记录了反转前的列名,

valueColumnName 对应 variableColumnName 存储值。

data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12|  1|    男|     1|
|test1| 20|  1|    女|     1|
|test2| 26|  1|    男|     1|
|test3| 19|  1|    女|     1|
|test4| 51|  1|    女|     1|
|test5| 13|  1|    男|     1|
+-----+---+---+------+------+
# 一行变成三行,id不变 'age','name','gender'由列转行,c_col依次记录'age','name','gender',c_value则记录对应的值
data.unpivot('id',['age','name','gender'],'c_col','c_value').show()
+---+------+-------+
| id| c_col|c_value|
+---+------+-------+
|  1|   age|     12|
|  1|  name|   ldsx|
|  1|gender|     男|
|  1|   age|     20|
|  1|  name|  test1|
|  1|gender|     女|
|  1|   age|     26|
|  1|  name|  test2|
|  1|gender|     男|
|  1|   age|     19|
|  1|  name|  test3|
|  1|gender|     女|
|  1|   age|     51|
|  1|  name|  test4|
|  1|gender|     女|
|  1|   age|     13|
|  1|  name|  test5|
|  1|gender|     男|
+---+------+-------+

withColumn 添加列操作

通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。

可以使用lit设置常量作为列

可以使用表达式设置列

# 使用d1上的列或者用常量列
d1.withColumn('c_value2',d1.c_value).show()
+---+------+-------+--------+
| id| c_col|c_value|c_value2|
+---+------+-------+--------+
|  1|   age|     12|      12|
|  1|  name|   ldsx|    ldsx|
|  1|gender|     男|      男|
|  1|   age|     20|      20|
|  1|  name|  test1|   test1|
|  1|gender|     女|      女|
|  1|   age|     26|      26|
|  1|  name|  test2|   test2|
|  1|gender|     男|      男|
|  1|   age|     19|      19|
|  1|  name|  test3|   test3|
|  1|gender|     女|      女|
|  1|   age|     51|      51|
|  1|  name|  test4|   test4|
|  1|gender|     女|      女|
|  1|   age|     13|      13|
|  1|  name|  test5|   test5|
|  1|gender|     男|      男|
+---+------+-------+--------+
# 使用常量补充列
from pyspark.sql.functions import lit
d1.withColumn('c_value2',lit('ldsx')).show()
+---+------+-------+--------+
| id| c_col|c_value|c_value2|
+---+------+-------+--------+
|  1|   age|     12|    ldsx|
|  1|  name|   ldsx|    ldsx|
|  1|gender|     男|    ldsx|
|  1|   age|     20|    ldsx|
|  1|  name|  test1|    ldsx|
|  1|gender|     女|    ldsx|
|  1|   age|     26|    ldsx|
|  1|  name|  test2|    ldsx|
|  1|gender|     男|    ldsx|
|  1|   age|     19|    ldsx|
|  1|  name|  test3|    ldsx|
|  1|gender|     女|    ldsx|
|  1|   age|     51|    ldsx|
|  1|  name|  test4|    ldsx|
|  1|gender|     女|    ldsx|
|  1|   age|     13|    ldsx|
|  1|  name|  test5|    ldsx|
|  1|gender|     男|    ldsx|
+---+------+-------+--------+
# 使用表达式设置列
data = [(1,), (2,), (3,), (4,)]
df = spark.createDataFrame(data, ["number"])
df.show()
+------+
|number|
+------+
|     1|
|     2|
|     3|
|     4|
+------+
from pyspark.sql.functions import col, when
df.withColumn("new_number", when(df.number < 3, "Low").otherwise("High")).show()
------+----------+
|number|new_number|
+------+----------+
|     1|       Low|
|     2|       Low|
|     3|      High|
|     4|      High|
+------+----------+

withColumns 添加多列操作

通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。

可以使用lit设置常量作为列

可以使用表达式设置列

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumns({'age2': df.age + 2, 'age3': df.age + 3}).show()
+---+-----+----+----+
|age| name|age2|age3|
+---+-----+----+----+
|  2|Alice|   4|   5|
|  5|  Bob|   7|   8|
+---+-----+----+----+

# 可使用表达式
df.withColumns({'h1': when(df.age < 2, "Low").otherwise("High"), 'h2': df.age + 3}).show()
+---+-----+----+---+
|age| name|  h1| h2|
+---+-----+----+---+
|  2|Alice|High|  5|
|  5|  Bob|High|  8|
+---+-----+----+---+

withColumnRenamed 列重命名

不存在的列重命名报错,返回新dataframe。

列,重命名列

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumnRenamed('age', 'age2').show()
+----+-----+
|age2| name|
+----+-----+
|   2|Alice|
|   5|  Bob|
+----+-----+

withColumnsRenamed 多列重命名

字典,列名的映射

df.withColumnsRenamed({'age':'new_age','name':'new_name'}).show()
+-------+--------+
|new_age|new_name|
+-------+--------+
|      2|   Alice|
|      5|     Bob|
+-------+--------+

withMetadata 设置元数据

更新元数据,返回新dataframe

df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
# 查看列的元数据
df.schema['age'].metadata
{}
# 设置元数据
df_meta = df.withMetadata('age', {'foo': 'bar'})
df_meta.schema['age'].metadata
{'foo': 'bar'}

write 存储表

write.saveAsTable

当追加插入的时候dataframe只需要scheam一致,会自动匹配

  • name: str, 表名

  • format: Optional[str] = None, 格式类型 hive,parquet…

  • mode: Optional[str] = None, 写入方式

    1. append:将this:class:DataFrame的内容附加到现有数据中,数据格式需要一致。
    2. “overwrite”:覆盖现有数据,数据格式不重要了,已此次覆盖为准。
    3. errorerrorifeists:如果数据已经存在,则抛出异常。
    4. ‘ignore’:如果数据已经存在,则自动忽略此操作。
  • partitionBy: Optional[Union[str, List[str]]] = None, 分区列表

df.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
+---+-----+
# 覆盖重写
df.write.saveAsTable('ldsx_test','parquet','overwrite',['age'])

# 追加写入
df.write.saveAsTable('ldsx_test','parquet','append',['age'])

# 另一种写法
df.write.format('parquet').mode('append').partitionBy(['age']).saveAsTable('ldsx_test')

在这里插入图片描述

在这里插入图片描述

insertInto

不会对scheam进行校验,按位置插入

d2.show()
+-----+----+
|name1|age1|
+-----+----+
|ldsx1|   2|
|ldsx2|   3|
+-----+----+
d2.write.insertInto('ldsx_test')
d2.schema
StructType([StructField('name1', StringType(), True), StructField('age1', LongType(), True)])

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2160956.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jenkins声明式流水线语法详解

最基本的语法包含 pipeline&#xff1a;所有有效的声明式流水线必须包含在一个 pipeline 块中stages&#xff1a;包含一系列一个或多个stage指令stage&#xff1a;stage包含在stages中进行&#xff0c;比如某个阶段steps&#xff1a;在阶段中具体得执行操作&#xff0c;一个或…

提升工作效率神器

这五款软件让你事半功倍 在当今快节奏的社会中&#xff0c;提高工作效率成为了每个人追求的目标。而在这个数字化时代&#xff0c;选择对的软件工具无疑是提高效率的关键。今天&#xff0c;我为大家推荐五款优秀的工作效率软件&#xff0c;帮助你在工作中事半功倍。 1、亿可达…

15个 Jenkins 面试题

Jenkins 已成为持续集成和持续部署 (CI/CD) 流程中使用最广泛的自动化服务器之一。凭借其强大的功能和广泛的插件生态系统&#xff0c;Jenkins 已成为全球软件开发团队的首选工具。如果您正在准备 Jenkins 面试&#xff0c;那么精通其概念、架构和最佳实践至关重要。 为了帮助…

1.3 MySql的用户管理

一、下载Mysql客户端 下载navicat:Navicat 中国 | 支持 MySQL、Redis、MariaDB、MongoDB、SQL Server、SQLite、Oracle 和 PostgreSQL 的数据库管理 二、安装Navicat 三、创建数据库 创建一个数据库的连接吧&#xff0c;因为这个界面儿是图形界面儿&#xff0c;所以我们创建…

深入分析MySQL事务日志-Redo Log日志

文章目录 事务日志-Redo Log2.1 Redo Log2.1.1 Redo Log与持久性2.1.2 Redo Log的工作原理2.1.3 Redo Log的落盘策略2.1.4 Redo Log的系统参数 事务日志-Redo Log 事务的隔离性是通过锁实现&#xff0c;而事务的原子性、和持久性则是通过事务日志实现。在MySQL中&#xff0c;事…

【吉林大学编译原理题库】正则表达式的书写

1. 2. 选A 3. 没啥好说的&#xff0c;按意思写就行&#xff1a; 4. &#xfeff;5.设字母表S{0,1}&#xff0c;写正则表达式表示所有偶数个0和偶数个1组成的字符串。 6. 设字母表S{0,1}&#xff0c;写正则表达式表示所有偶数个0和奇数个1组成的字符串。&#xff08;提示&am…

Token usage of Content Filtered messages in Azure OpenAI Services

题意&#xff1a;在Azure OpenAI服务中&#xff0c;内容过滤消息的令牌使用 问题背景&#xff1a; When sending a message to a chat via GetChatCompletions as a response, I get a RequestFailedException. In the exception, I get an answer for which category content…

2-101基于matlab的频带方差端点检测

基于matlab的频带方差端点检测&#xff0c;噪声频谱中&#xff0c;各频带之间变化很平缓&#xff0c;语音各频带之间变化较激烈。据此特征&#xff0c;语音和噪声就极易区分。计算短时频带方差&#xff0c;实质就是计算某一帧信号的各频带能量之间的方差。这种以短时频带方差作…

揭秘MySQL主从复制:打造高可用性与数据冗余的强效引擎

作者简介&#xff1a;我是团团儿&#xff0c;是一名专注于云计算领域的专业创作者&#xff0c;感谢大家的关注 座右铭&#xff1a; 云端筑梦&#xff0c;数据为翼&#xff0c;探索无限可能&#xff0c;引领云计算新纪元 个人主页&#xff1a;团儿.-CSDN博客 目录 前言&#…

从Web2到Web3:探索下一代互联网的无限可能性

互联网经历了从Web1到Web2的重大变革&#xff0c;现在正迈向Web3。Web2通过社交媒体、电子商务和内容平台改变了我们的数字生活&#xff0c;但同时也伴随着中心化平台的垄断和用户数据被广泛控制的问题。而Web3的出现&#xff0c;则试图通过去中心化技术解决这些挑战&#xff0…

人到中年,最清醒的活法—沉浸式做自己

生活中&#xff0c;你是不是常常被这样的事情所困扰&#xff1f; 工作的时候&#xff0c;每天被千头万绪的杂事缠身&#xff0c;看着一堆待完成事项&#xff0c;和工作群里一堆的消息在轰炸你&#xff0c;内心顿感烦躁甚至暴怒。 经常因为领导&#xff0c;同事或者熟人甚至陌生…

java 洛谷题单【算法1-7】搜索

P1219 [USACO1.5] 八皇后 Checker Challenge 解题思路 回溯法 递归与回溯: 从第0行开始&#xff0c;为每个行尝试放置棋子的位置&#xff0c;检查放置是否违反约束条件。如果放置合法&#xff0c;则继续递归处理下一行&#xff08;即下一层递归&#xff09;。如果当前行无法找…

【Go语言】深入解读Go语言中的指针,助你拨开迷雾见月明

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

浅谈提示工程之In-context learning技术

提示工程之In-context learning技术&#xff1b; 通过一张图片围绕下边几个方面进行简单说明 概念起因本质结构注意事项 日常总结

SQL语法学习与实战应用

第一章 引言 1.1 MySQL数据库概述 MySQL&#xff0c;作为一种广泛使用的关系型数据库管理系统&#xff0c;自其问世以来&#xff0c;便凭借开源、高性能及低成本等显著特点&#xff0c;迅速占据了广泛的市场份额。这一系统不仅支持大规模并发访问&#xff0c;更提供了多样化的…

【最新华为OD机试E卷-支持在线评测】绘图机器(100分)多语言题解-(Python/C/JavaScript/Java/Cpp)

🍭 大家好这里是春秋招笔试突围 ,一枚热爱算法的程序员 💻 ACM金牌🏅️团队 | 大厂实习经历 | 多年算法竞赛经历 ✨ 本系列打算持续跟新华为OD-E/D卷的多语言AC题解 🧩 大部分包含 Python / C / Javascript / Java / Cpp 多语言代码 👏 感谢大家的订阅➕ 和 喜欢�…

【ARM】MDK-当选择AC5时每次点击build都会全编译

1、 文档目标 解决MDK中选择AC5时每次点击build都会全编译 2、 问题场景 在MDK中点击build时&#xff0c;正常会只进行增量编译&#xff0c;但目前每次点击的时候都会全编译。 3、软硬件环境 1 软件版本&#xff1a;Keil MDK 5.38a 2 电脑环境&#xff1a;Window 10 4、解决…

新手操作指引:快速上手腾讯混元大模型

引言 腾讯混元大模型是一款功能强大的AI工具&#xff0c;适用于文本生成、图像创作和视频生成等多种应用场景。对于新手用户&#xff0c;快速上手并充分利用这一工具可能会有些挑战。本文将提供详细的新手操作指引&#xff0c;帮助您轻松开始使用腾讯混元大模型。 步骤一&…

kubernetes网络(二)之bird实现节点间BGP互联的实验

摘要 上一篇文章中我们学习了calico的原理&#xff0c;kubernetes中的node节点&#xff0c;利用 calico 的 bird 程序相互学习路由&#xff0c;为了加深对 bird 程序的认识&#xff0c;本文我们将使用bird进行实验&#xff0c;实验中实现了BGP FULL MESH模式让宿主相互学习到对…

个人行政复议在线预约系统开发+ssm论文源码调试讲解

第二章 开发工具及关键技术介绍 2.1 JAVA技术 Java主要采用CORBA技术和安全模型&#xff0c;可以在互联网应用的数据保护。它还提供了对EJB&#xff08;Enterprise JavaBeans&#xff09;的全面支持&#xff0c;java servlet API&#xff0c;JSP&#xff08;java server pages…