文章目录
- Pyspark sql DataFrame
- 相关文章
- toDF 设置新列名
- toJSON row对象转换json字符串
- toLocallterator 获取迭代器
- toPandas 转换python dataframe
- transform dataframe转换
- union unionALL 并集不去重(按列顺序)
- unionByName 并集不去重(按列名)
- unpivot 反转表(宽表转长表)
- withColumn 添加列操作
- withColumns 添加多列操作
- withColumnRenamed 列重命名
- withColumnsRenamed 多列重命名
- withMetadata 设置元数据
- write 存储表
- write.saveAsTable
- insertInto
Pyspark sql DataFrame
相关文章
Pyspark下操作dataframe方法(1)
Pyspark下操作dataframe方法(2)
Pyspark下操作dataframe方法(3)
Pyspark下操作dataframe方法(4)
Pyspark下操作dataframe方法(5)
toDF 设置新列名
列名更新,将会按照新列名顺序的替换原列名返回新dataframe,更新列名数量需要跟原始列名数量一致。
from pyspark.sql.functions import lit
data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12| 1| 男| 1|
|test1| 20| 1| 女| 1|
|test2| 26| 1| 男| 1|
|test3| 19| 1| 女| 1|
|test4| 51| 1| 女| 1|
|test5| 13| 1| 男| 1|
+-----+---+---+------+------+
data.toDF(*['n1','n2','n3','n5','n4']).show()
+-----+---+---+---+---+
| n1| n2| n3| n5| n4|
+-----+---+---+---+---+
| ldsx| 12| 1| 男| 1|
|test1| 20| 1| 女| 1|
|test2| 26| 1| 男| 1|
|test3| 19| 1| 女| 1|
|test4| 51| 1| 女| 1|
|test5| 13| 1| 男| 1|
+-----+---+---+---+---+
toJSON row对象转换json字符串
把dataframe的row对象转换为json字符串,返回rdd
data.rdd.first()
Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
# data.toJSON()返回rdd类型
data.toJSON().first()
'{"name":"ldsx","age":"12","id":"1","gender":"男","new_id":"1"}'
toLocallterator 获取迭代器
返回一个迭代器,其中包含此DataFrame中的所有行。迭代器将消耗与此DataFrame中最大分区一样多的内存。通过预取,它可能会消耗最多2个最大分区的内存。
d1 = data.toLocalIterator()
d1
<generator object _local_iterator_from_socket.<locals>.PyLocalIterable.__iter__ at 0x7f55c86e0570>
# 便利迭代器
for i in d1:
print(i)
Row(name='ldsx', age='12', id='1', gender='男', new_id='1')
Row(name='test1', age='20', id='1', gender='女', new_id='1')
Row(name='test2', age='26', id='1', gender='男', new_id='1')
Row(name='test3', age='19', id='1', gender='女', new_id='1')
Row(name='test4', age='51', id='1', gender='女', new_id='1')
Row(name='test5', age='13', id='1', gender='男', new_id='1')
toPandas 转换python dataframe
需要python环境安装pandas的前提下使用,且dataframe需要很小,因为所有数据都加载到driver的内存中。
data.toPandas()
type(data.toPandas())
<class 'pandas.core.frame.DataFrame'>
name age id gender new_id
0 ldsx 12 1 男 1
1 test1 20 1 女 1
2 test2 26 1 男 1
3 test3 19 1 女 1
4 test4 51 1 女 1
5 test5 13 1 男 1
transform dataframe转换
参数为处理函数,返回值必须为dataframe
data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12| 1| 男| 1|
|test1| 20| 1| 女| 1|
|test2| 26| 1| 男| 1|
|test3| 19| 1| 女| 1|
|test4| 51| 1| 女| 1|
|test5| 13| 1| 男| 1|
+-----+---+---+------+------+
# 处理函数自定义最后返回了dataframe
def ldsx(spark_df):
colums = [ str(i)+'_ldsx' for i in range(len(spark_df.columns)) ]
return spark_df.toDF(*colums)
data.transform(ldsx).show()
+------+------+------+------+------+
|0_ldsx|1_ldsx|2_ldsx|3_ldsx|4_ldsx|
+------+------+------+------+------+
| ldsx| 12| 1| 男| 1|
| test1| 20| 1| 女| 1|
| test2| 26| 1| 男| 1|
| test3| 19| 1| 女| 1|
| test4| 51| 1| 女| 1|
| test5| 13| 1| 男| 1|
+------+------+------+------+------+
union unionALL 并集不去重(按列顺序)
获得新dataframe,unionall别名为union,如果要去重使用distinct方法,不会解析对应的列名合并,是按照列的顺序合并的,硬合
df2 = spark.createDataFrame([(3, 'C'), (4, 'D')], ['id', 'value'])
df1 = spark.createDataFrame([(1, 'A'), (2, 'B'),(3, 'C'),(3, 'C')], ['id', 'value'])
df1.show()
+---+-----+
| id|value|
+---+-----+
| 1| A|
| 2| B|
| 3| C|
| 3| C|
+---+-----+
df2.show()
+---+-----+
| id|value|
+---+-----+
| 3| C|
| 4| D|
+---+-----+
df1.union(df2)
DataFrame[id: bigint, value: string]
df1.union(df2).show()
+---+-----+
| id|value|
+---+-----+
| 1| A|
| 2| B|
| 3| C|
| 3| C|
| 3| C|
| 4| D|
+---+-----+
# 去重使用distinct
df1.union(df2).distinct().show()
+---+-----+
| id|value|
+---+-----+
| 2| B|
| 1| A|
| 3| C|
| 4| D|
+---+-----+
unionByName 并集不去重(按列名)
是否允许缺失列:allowMissingColumns,默认不允许
# 按照列名合并
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
df1.unionByName(df2).show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
| 1| 2| 3|
| 6| 4| 5|
+----+----+----+
# 对于不存在列进行填补
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6, 7]], ["col1", "col2", "col3", "col4"])
# allowMissingColumns True默认填补null
df1.unionByName(df2, allowMissingColumns=True).show()
+----+----+----+----+----+
|col0|col1|col2|col3|col4|
+----+----+----+----+----+
| 1| 2| 3|NULL|NULL|
|NULL| 4| 5| 6| 7|
+----+----+----+----+----+
unpivot 反转表(宽表转长表)
ids: 标识列
values:选中的列(LIST)
variableColumnName: 列名
valueColumnName:对应列的值
宽表转长表,一行变多行,除了选中的ids是不变的,但是会把选中的values中的列由列变成行记录,variableColumnName记录了反转前的列名,
valueColumnName 对应 variableColumnName 存储值。
data.show()
+-----+---+---+------+------+
| name|age| id|gender|new_id|
+-----+---+---+------+------+
| ldsx| 12| 1| 男| 1|
|test1| 20| 1| 女| 1|
|test2| 26| 1| 男| 1|
|test3| 19| 1| 女| 1|
|test4| 51| 1| 女| 1|
|test5| 13| 1| 男| 1|
+-----+---+---+------+------+
# 一行变成三行,id不变 'age','name','gender'由列转行,c_col依次记录'age','name','gender',c_value则记录对应的值
data.unpivot('id',['age','name','gender'],'c_col','c_value').show()
+---+------+-------+
| id| c_col|c_value|
+---+------+-------+
| 1| age| 12|
| 1| name| ldsx|
| 1|gender| 男|
| 1| age| 20|
| 1| name| test1|
| 1|gender| 女|
| 1| age| 26|
| 1| name| test2|
| 1|gender| 男|
| 1| age| 19|
| 1| name| test3|
| 1|gender| 女|
| 1| age| 51|
| 1| name| test4|
| 1|gender| 女|
| 1| age| 13|
| 1| name| test5|
| 1|gender| 男|
+---+------+-------+
withColumn 添加列操作
通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。
可以使用lit设置常量作为列
可以使用表达式设置列
# 使用d1上的列或者用常量列
d1.withColumn('c_value2',d1.c_value).show()
+---+------+-------+--------+
| id| c_col|c_value|c_value2|
+---+------+-------+--------+
| 1| age| 12| 12|
| 1| name| ldsx| ldsx|
| 1|gender| 男| 男|
| 1| age| 20| 20|
| 1| name| test1| test1|
| 1|gender| 女| 女|
| 1| age| 26| 26|
| 1| name| test2| test2|
| 1|gender| 男| 男|
| 1| age| 19| 19|
| 1| name| test3| test3|
| 1|gender| 女| 女|
| 1| age| 51| 51|
| 1| name| test4| test4|
| 1|gender| 女| 女|
| 1| age| 13| 13|
| 1| name| test5| test5|
| 1|gender| 男| 男|
+---+------+-------+--------+
# 使用常量补充列
from pyspark.sql.functions import lit
d1.withColumn('c_value2',lit('ldsx')).show()
+---+------+-------+--------+
| id| c_col|c_value|c_value2|
+---+------+-------+--------+
| 1| age| 12| ldsx|
| 1| name| ldsx| ldsx|
| 1|gender| 男| ldsx|
| 1| age| 20| ldsx|
| 1| name| test1| ldsx|
| 1|gender| 女| ldsx|
| 1| age| 26| ldsx|
| 1| name| test2| ldsx|
| 1|gender| 男| ldsx|
| 1| age| 19| ldsx|
| 1| name| test3| ldsx|
| 1|gender| 女| ldsx|
| 1| age| 51| ldsx|
| 1| name| test4| ldsx|
| 1|gender| 女| ldsx|
| 1| age| 13| ldsx|
| 1| name| test5| ldsx|
| 1|gender| 男| ldsx|
+---+------+-------+--------+
# 使用表达式设置列
data = [(1,), (2,), (3,), (4,)]
df = spark.createDataFrame(data, ["number"])
df.show()
+------+
|number|
+------+
| 1|
| 2|
| 3|
| 4|
+------+
from pyspark.sql.functions import col, when
df.withColumn("new_number", when(df.number < 3, "Low").otherwise("High")).show()
------+----------+
|number|new_number|
+------+----------+
| 1| Low|
| 2| Low|
| 3| High|
| 4| High|
+------+----------+
withColumns 添加多列操作
通过添加列或替换具有相同名称的现有列来返回新的DataFrame。
列表达式必须是此DataFrame上的表达式;列只能引用此数据集提供的属性。添加引用其他数据集的列是错误的。
可以使用lit设置常量作为列
可以使用表达式设置列
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumns({'age2': df.age + 2, 'age3': df.age + 3}).show()
+---+-----+----+----+
|age| name|age2|age3|
+---+-----+----+----+
| 2|Alice| 4| 5|
| 5| Bob| 7| 8|
+---+-----+----+----+
# 可使用表达式
df.withColumns({'h1': when(df.age < 2, "Low").otherwise("High"), 'h2': df.age + 3}).show()
+---+-----+----+---+
|age| name| h1| h2|
+---+-----+----+---+
| 2|Alice|High| 5|
| 5| Bob|High| 8|
+---+-----+----+---+
withColumnRenamed 列重命名
不存在的列重命名报错,返回新dataframe。
列,重命名列
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
df.withColumnRenamed('age', 'age2').show()
+----+-----+
|age2| name|
+----+-----+
| 2|Alice|
| 5| Bob|
+----+-----+
withColumnsRenamed 多列重命名
字典,列名的映射
df.withColumnsRenamed({'age':'new_age','name':'new_name'}).show()
+-------+--------+
|new_age|new_name|
+-------+--------+
| 2| Alice|
| 5| Bob|
+-------+--------+
withMetadata 设置元数据
更新元数据,返回新dataframe
df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"])
# 查看列的元数据
df.schema['age'].metadata
{}
# 设置元数据
df_meta = df.withMetadata('age', {'foo': 'bar'})
df_meta.schema['age'].metadata
{'foo': 'bar'}
write 存储表
write.saveAsTable
当追加插入的时候dataframe只需要scheam一致,会自动匹配
-
name: str, 表名
-
format: Optional[str] = None, 格式类型 hive,parquet…
-
mode: Optional[str] = None, 写入方式
append
:将this:class:DataFrame
的内容附加到现有数据中,数据格式需要一致。- “overwrite”:覆盖现有数据,数据格式不重要了,已此次覆盖为准。
error
或errorifeists
:如果数据已经存在,则抛出异常。- ‘ignore’:如果数据已经存在,则自动忽略此操作。
-
partitionBy: Optional[Union[str, List[str]]] = None, 分区列表
df.show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 5| Bob|
+---+-----+
# 覆盖重写
df.write.saveAsTable('ldsx_test','parquet','overwrite',['age'])
# 追加写入
df.write.saveAsTable('ldsx_test','parquet','append',['age'])
# 另一种写法
df.write.format('parquet').mode('append').partitionBy(['age']).saveAsTable('ldsx_test')
insertInto
不会对scheam进行校验,按位置插入
d2.show()
+-----+----+
|name1|age1|
+-----+----+
|ldsx1| 2|
|ldsx2| 3|
+-----+----+
d2.write.insertInto('ldsx_test')
d2.schema
StructType([StructField('name1', StringType(), True), StructField('age1', LongType(), True)])