欢迎关注微信公众号,更多优质内容会在微信公众号首发
1. pyspark中时间格式的数据转换为字符串格式的时间,示例代码
from datetime import datetime
date_obj = datetime(2023, 7, 2)
formatted_date = date_obj.strftime("%Y-%m-%d %H:%M:%S")
print(formatted_date) # 2023-07-02 00:00:00
2. pysoark中打印dataframe中的有哪些列,示例代码
from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()
# 假设您的 DataFrame 名称为 df
df = spark.createDataFrame([(1, 'John', 25), (2, 'Alice', 30), (3, 'Bob', 35)], ['id', 'name', 'age'])
# 打印 DataFrame 的列
columns = df.columns
print(columns) # ['id', 'name', 'age']
3. pyspark中选择其中一些列,示例代码
from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()
# 假设您的 DataFrame 名称为 df
df = spark.createDataFrame([(1, 'John', 25), (2, 'Alice', 30), (3, 'Bob', 35)], ['id', 'name', 'age'])
# 选择 'name' 和 'age' 列
selected_columns = ['name', 'age']
selected_df = df.select(selected_columns)
# 打印选择的列
selected_df.show()
+-----+---+
| name|age|
+-----+---+
| John| 25|
|Alice| 30|
| Bob| 35|
+-----+---+
4. pyspark中选择其中一列不是空的行,示例代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()
# 假设您的 DataFrame 名称为 df
df = spark.createDataFrame([(1, 'John', 25), (2, None, 30), (3, 'Bob', None)], ['id', 'name', 'age'])
# 选择 'name' 列不为空的行
selected_df = df.filter(col('name').isNotNull())
# 打印选择的行
selected_df.show()
+---+----+---+
| id|name|age|
+---+----+---+
| 1|John| 25|
+---+----+---+
5. pyspark中的dataframe统计有多少行
from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()
# 假设您的 DataFrame 名称为 df
df = spark.createDataFrame([(1, 'John', 25), (2, 'Alice', 30), (3, 'Bob', 35)], ['id', 'name', 'age'])
# 打印 DataFrame 的行数
row_count = df.count()
print(row_count)
3
6. pyspark的dataframe删除重复行,示例代码
from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()
# 假设您的 DataFrame 名称为 df
df = spark.createDataFrame([(1, 'John'), (2, 'Alice'), (3, 'Bob'), (4, 'Alice')], ['id', 'name'])
# 选择 'name' 列的不重复值
distinct_values = df.select('name').distinct()
7. pyspark中的DataFrame对一列分组统计数量,并添加到原来的dataframe,示例代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()
# 假设您的 DataFrame 名称为 df
df = spark.createDataFrame([(1, 'John'), (2, 'Alice'), (3, 'Bob'), (4, 'Alice')], ['id', 'name'])
# 对 'name' 列进行分组,并统计每个值的数量
grouped_df = df.groupBy('name').agg(count('*').alias('count'))
# 打印分组统计结果
grouped_df.show()
+-----+-----+
| name|count|
+-----+-----+
| Bob| 1|
|Alice| 2|
| John| 1|
+-----+-----+
8. spark中的DataFrame写入csv,示例代码
from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()
# 假设您的 DataFrame 名称为 df
df = spark.createDataFrame([(1, 'John', 25), (2, 'Alice', 30), (3, 'Bob', 35)], ['id', 'name', 'age'])
# 将 DataFrame 写入为 CSV 文件
df.write.csv('path/to/output.csv', header=True)
9. pyspark中的dataframe取前n行,示例代码
from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()
# 假设您的 DataFrame 名称为 df
df = spark.createDataFrame([(1, 'John', 25), (2, 'Alice', 30), (3, 'Bob', 35)], ['id', 'name', 'age'])
# 获取前 10 行
top_10_rows = df.limit(10)
# 打印前 10 行
top_10_rows.show()
10. 打印pyspark中的dataframe的某一列的纯文本值,示例代码
from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()
# 假设您的 DataFrame 名称为 df
df = spark.createDataFrame([(1, 'John'), (2, 'Alice'), (3, 'Bob')], ['id', 'a'])
# 选择 'a' 列的纯文本值并打印
text_values = df.select('a').collect()
for row in text_values:
print(row['a'])
"""
输出
John
Alice
Bob
"""
11. pyspark中用lit给dataframe添加一列,示例代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()
# 创建包含常量值的列
df = spark.range(5).select(lit('Hello').alias('message'))
# 打印 DataFrame
df.show()
"""
+-------+
|message|
+-------+
| Hello|
| Hello|
| Hello|
| Hello|
| Hello|
+-------+
"""
12. 打印pyspark中dataframe中列的数据类型和列名,示例代码1
from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()
# 假设您的 DataFrame 名称为 df,列名为 column_name
df = spark.createDataFrame([(1, "Hello"), (2, "World"), (3, "Spark")], ["id", "column_name"])
# 打印列的数据类型
df.printSchema()
"""
root
|-- id: long (nullable = true)
|-- column_name: string (nullable = true)
"""
示例代码2
from pyspark.sql import SparkSession
# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()
# 假设您的 DataFrame 名称为 df,列名为 column_name
df = spark.createDataFrame([(1, "Hello"), (2, "World"), (3, "Spark")], ["id", "column_name"])
# 获取列的数据类型
column_data_type = df.dtypes[1][1]
# 打印列的数据类型
print(column_data_type)
"""
string
"""
13. pyspark中的dataframe的两列做某种运算并形成新的列
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()
# 假设您的 DataFrame 名称为 df,包含两列:col_a 和 col_b
data = [(1, 2), (3, 4), (5, 6)]
df = spark.createDataFrame(data, ["col_a", "col_b"])
# 将 col_a 列除以 col_b 列,并生成新的列 col_result
df_with_division = df.withColumn("col_result", col("col_a") / col("col_b"))
# 打印包含新列的 DataFrame
df_with_division.show()
"""
+-----+-----+------------------+
|col_a|col_b| col_result|
+-----+-----+------------------+
| 1| 2| 0.5|
| 3| 4| 0.75|
| 5| 6|0.8333333333333334|
+-----+-----+------------------+
"""