PySpark 之 SparkSQL 编程

news2024/11/20 21:29:59

1. DataFrame 的创建

1.1 RDD 和 DataFrame 的区别

  • RDD 是一种弹性分布式数据集,Spark中的基本抽象。表示一种不可变的、分区储存的集合,可以进行并行操作
  • DataFrame是一种以列对数据进行分组表达的分布式集合, DataFrame等同于Spark SQL中的关系表。相同点是,他们都是为了支持分布式计算而设计

在这里插入图片描述

注意:rddExcutor 上跑的大部分是 Python 代码,只有少部分是 java 字节码;而 SparkSQLExcutor 上跑的全是 Java 字节码,因此其性能要比 rdd 更好,灵活性也更好!

1.1 二元组

# coding=utf-8

from pyspark.sql import SparkSession

session = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = session.sparkContext

# 使用二元组创建DataFrame
a = [('Alice', 1)]
df = session.createDataFrame(a, ['name', 'age'])
df.show()

+-----+---+
| name|age|
+-----+---+
|Alice|  1|
+-----+---+

1.2 键值对

# 键值对
b = [{'name': 'Alice', 'age': 1}]
df = session.createDataFrame(b)
df.show()

1.3 rdd 创建

# rdd 创建
c = [('Alice', 1)]
rdd = sc.parallelize(c)
df = session.createDataFrame(data=rdd, schema=['name', 'age'])
df.show()

1.4 基于 rdd 和 ROW 创建

# 基于rdd和ROW创建DataFrame
from pyspark import Row

d = [('Alice', 1)]
rdd = sc.parallelize(d)
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
df = session.createDataFrame(person)
df.show()

1.5 基于 rdd 和 StructType 创建

# 基于rdd和StructType创建DataFrame

from pyspark.sql.types import StructType, StringType, IntegerType, StructField

e = [('Alice', 1)]
rdd = sc.parallelize(e)
schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ]
)
df = session.createDataFrame(rdd, schema)
df.show()

1.6 基于 pandas 创建

# 基于 pandas 创建
import pandas as pd

# 方法一
f = [('Alice', 1)]
df = session.createDataFrame(data=pd.DataFrame(f), schema=['name', 'age'])
df.show()

# 方法二
pdf = pd.DataFrame([("LiLei",18),("HanMeiMei",17)],columns = ["name","age"])
df = spark.createDataFrame(pdf)
df.show()

1.7 从文件读取创建

# person.json
{"name": "rose", "age": 18}
{"name": "lila", "age": 19}

# person.csv
name,age
rose,18
lila, 19

# person.txt
rose
lila

创建方式:

# 从文件读取
df1 = session.read.json('person.json')
df1.show()

df2 = session.read.load('person.json', format='json')
df2.show()

df3 = session.read.csv('person.csv', sep=',', header=True)
df3.show()

# 可从 hdfs 中读取
df4 = session.read.text(paths='person.txt')
df4.show()

运行结果:

+---+----+
|age|name|
+---+----+
| 18|rose|
| 19|lila|
+---+----+

+---+----+
|age|name|
+---+----+
| 18|rose|
| 19|lila|
+---+----+

+----+---+
|name|age|
+----+---+
|rose| 18|
|lila| 19|
+----+---+

+-----+
|value|
+-----+
| rose|
| lila|
+-----+

1.8 从MySQL 数据库读取

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F


sc = SparkContext("local", appName="mysqltest")
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&"
        "useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"
        "useLegacyDatetimeCode=false&serverTimezone=UTC ", dbtable="detail_data").load()
df.show(n=5)
sc.stop()

注意:需要先下载安装:Mysql-connector-java.jar

1.9 toDF方法

rdd = sc.parallelize([("LiLei", 15, 88), ("HanMeiMei", 16, 90), ("DaChui", 17, 60)])
df = rdd.toDF(["name", "age", "score"])
df.show()

1.10 读取hive数据表

session.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
session.sql("LOAD DATA LOCAL INPATH 'data/kv1.txt' INTO TABLE src")
df = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
df.show(5)

1.11 读取 parquet 文件

df = session.read.parquet("data/users.parquet")
df.show()

参考文章

  • 基于pyspark创建DataFrame的几种方法

2. DataFrame 保存文件

可以保存成 csv 文件、json文件、parquet 文件或者保存成 hive 数据表:

# 保存成 csv 文件
df = session.read.format("json").load("data/people.json")
df.write.format("csv").option("header","true").save("people.csv")

# 先转换成 rdd 再保存成 txt 文件
df.rdd.saveAsTextFile("people.txt")

# 保存成 json 文件
df.write.json("people.json")

# 保存成 parquet 文件, 压缩格式, 占用存储小, 且是 spark 内存中存储格式,加载最快
df.write.partitionBy("age").format("parquet").save("namesAndAges.parquet")
df.write.parquet("people.parquet")

# 保存成 hive 数据表
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people")

3. 常用 API 操作

3.1 Action 操作

常用 Action 操作包括 show、count、collect、describe、take、head、first 等操作

# coding=utf-8
from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, StructField

session = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = session.sparkContext

info_list = [("rose", 15, 'F'), ("lila", 16, 'F'), ("john", 17, 'M')]
schema = ["name", "age", "gender"]
df = session.createDataFrame(info_list, schema)

df.show(n=2, truncate=True, vertical=False)
print(df.count())

data_list = df.collect()
print(data_list)

print(df.first())	# 返回第一行
print(df.take(2))	# 以Row对象的形式返回DataFrame的前几行
print(df.head(2))	# 返回前 n 行
print(df.describe())	# 探索性数据分析
df.printSchema()	# 以树的格式输出到控制台

运行结果:

# df.show()
+----+---+------+
|name|age|gender|
+----+---+------+
|rose| 15|     F|
|lila| 16|     F|
+----+---+------+
only showing top 2 rows

# df.count()
3

# df.collect()
[Row(name='rose', age=15, gender='F'), Row(name='lila', age=16, gender='F'), Row(name='john', age=17, gender='M')]

# df.first()
Row(name='rose', age=15, gender='F')

# df.take(2)
[Row(name='rose', age=15, gender='F'), Row(name='lila', age=16, gender='F')]

# df.head(2)
[Row(name='rose', age=15, gender='F'), Row(name='lila', age=16, gender='F')]

# df.describe()
DataFrame[summary: string, name: string, age: string, gender: string]

# df.priceSchema()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)

3.2 类 RDD 操作

DataFrameRDD 之间可以相互转换:

# df ---> rdd
df.rdd

# rdd ---> df
df.toDF(schema)

DataFrame 转换为 rdd 后,一些常用的 rdd 操作也是支持的,比如:distinct、cache、sample、foreach、intersect、except、map、flatMap、filter,但是不够灵活:

# coding=utf-8
from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, StructField
from pyspark.sql.functions import *

session = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = session.sparkContext

# 类 rdd 操作,转换为 rdd,使用 .rdd
data_list = [("Hello World",), ("Hello Python",), ("Hello Spark",)]
schema = ['value']
df = session.createDataFrame(data_list, schema)
df.show()

+------------+
|       value|
+------------+
| Hello World|
|Hello Python|
| Hello Spark|
+------------+

rdd = df.rdd  # 转换为 rdd
print(rdd.collect())  # [Row(value='Hello World'), Row(value='Hello Python'), Row(value='Hello Spark')]

1、map

# 转换为大写,再转换为 df
df_rdd = rdd.map(lambda x: Row(x[0].upper()))
df_rdd.toDF(schema).show()

+------------+
|       value|
+------------+
| HELLO WORLD|
|HELLO PYTHON|
| HELLO SPARK|
+------------+

2、flatMap

# df_flat = rdd.flatMap(lambda x: x[0].split(" "))
# print(df_flat.collect())    # ['Hello', 'World', 'Hello', 'Python', 'Hello', 'Spark']

df_flat = rdd.flatMap(lambda x: x[0].split(" ")).map(lambda x: Row(x)).toDF(schema)
df_flat.show()

+------+
| value|
+------+
| Hello|
| World|
| Hello|
|Python|
| Hello|
| Spark|
+------+

3、filter

# 过滤只有以 Python 结尾的值
df_filter = rdd.filter(lambda x: x[0].endswith('Python'))
print(df_filter.collect())
df_filter.toDF(schema).show()

[Row(value='Hello Python')]

+------------+
|       value|
+------------+
|Hello Python|
+------------+

4、distinct 去重:

df_flat.distinct().show()

+------+
| value|
+------+
| World|
| Hello|
|Python|
| Spark|
+------+

5、cache 缓存:

df.cache()  # 缓存
df.unpersist()  # 去掉缓存

6、sample 抽样:

df_sample = df.sample(False, 0.6, 0)
df_sample.show()

+------------+
|       value|
+------------+
| Hello World|
|Hello Python|
| Hello Spark|
+------------+

7、intersect 交集:

df2 = session.createDataFrame([["Hello World"], ["Hello Scala"], ["Hello Spark"]]).toDF("value")

df_intersect = df.intersect(df2)
df_intersect.show()

+-----------+
|      value|
+-----------+
|Hello Spark|
|Hello World|
+-----------+

8、exceptAll 补集:

# 无补集
df_except = df.exceptAll(df2)
df_except.show()

3.2.1 df 转换 rdd 后 map、flatMap、filter区别

1、rdd.map

def func_1(row):
    print(row)	

    return row[0].upper()

rdd1 = rdd.map(lambda row: func_1(row))
print(rdd1.collect())	# ['HELLO WORLD', 'HELLO PYTHON', 'HELLO SPARK']

# 每一个 row
Row(value='Hello Spark')
Row(value='Hello Python')
Row(value='Hello World')

2、rdd.filter

def func_2(row):
    print(row)

    return row

rdd2 = rdd.filter(lambda row: func_2(row))
print(rdd2.collect())	# [Row(value='Hello World'), Row(value='Hello Python'), Row(value='Hello Spark')]

# 每一个 row
Row(value='Hello Spark')
Row(value='Hello Python')
Row(value='Hello World')

3、rdd.flatMap

def func_3(row):
    print(row)
    print(dir(row))
    print(len(row))	# 1

    return row[0].split(" ")

rdd3 = rdd.flatMap(lambda row: func_3(row))
print(rdd3.collect())	# ['Hello', 'World', 'Hello', 'Python', 'Hello', 'Spark']

# 每一个 row
Row(value='Hello Spark')
Row(value='Hello Python')
Row(value='Hello World')

总结

  • rowRow 对象, 类似于 list,在上面每个 row 只有一个元素
  • filter 后返回的仍然是 Row 对象,而 map、flatMap 却是 Python 对象

3.3 类 SQL操作

sql 操作比类 rdd 操作更为灵活,包括查询 select、selectExpr、where、表连接 join、union、unionAll、表分组 groupby、agg、pivot 等:

# coding=utf-8
from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, StructField
from pyspark.sql.functions import *

session = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = session.sparkContext

info_list = [("rose", 15, 'F'), ("lila", 16, 'F'), ("john", 17, 'M'), ('david', 18, None)]
schema = ["name", "age", "gender"]
df = session.createDataFrame(info_list, schema)

df.show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
+-----+---+------+

3.3.1 表查询

常用方法有:select、filter、selectExpr、where

# 选择某列,限制查询条数 limit
df.select('name').limit(2).show()

+----+
|name|
+----+
|rose|
|lila|
+----+

# 选择多列,并对某列进行计算
df.select('name', df['age'] + 1).show()

+-----+---------+
| name|(age + 1)|
+-----+---------+
| rose|       16|
| lila|       17|
| john|       18|
|david|       19|
+-----+---------+

# 选择多列,并对某列进行计算,最后对计算的列名进行重命名
df.select('name', -df['age'] + 2021).toDF('name', 'birthday').show()
+-----+--------+
| name|birthday|
+-----+--------+
| rose|    2006|
| lila|    2005|
| john|    2004|
|david|    2003|
+-----+--------+

# selectExpr 使用 UDF 函数,指定别名
import datetime

session.udf.register('getBirthYear', lambda age: datetime.datetime.now().year - age)
df.selectExpr('name', 'getBirthYear(age) as birth_year', 'UPPER(gender) as gender').show()

+-----+----------+------+
| name|birth_year|gender|
+-----+----------+------+
| rose|      2006|     F|
| lila|      2005|     F|
| john|      2004|     M|
|david|      2003|  null|
+-----+----------+------+

# where 查询条件
df.where('gender="M" and age=17').show()
+----+---+------+
|name|age|gender|
+----+---+------+
|john| 17|     M|
+----+---+------+

# filter 查询
df.filter(df['age'] > 16).show()
df.filter('gender = "M"').show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| john| 17|     M|
|david| 18|  null|
+-----+---+------+

+----+---+------+
|name|age|gender|
+----+---+------+
|john| 17|     M|
+----+---+------+

3.3.2 表连接 join、union

df_score = session.createDataFrame([('john', 'M', 88), ('rose', 'F', 90), ('david', 'M', 50)],
                                   schema=['name', 'gender', 'score'])
df_score.show()

| name|gender|score|
+-----+------+-----+
| john|     M|   88|
| rose|     F|   90|
|david|     M|   50|
+-----+------+-----+

# join 单个字段连接
df.join(df_score.select('name', 'score'), 'name').show()

+-----+---+------+-----+
| name|age|gender|score|
+-----+---+------+-----+
|david| 18|  null|   50|
| john| 17|     M|   88|
| rose| 15|     F|   90|
+-----+---+------+-----+

# join 多个字段连接
df.join(df_score, ['name', 'gender']).show()

+----+------+---+-----+
|name|gender|age|score|
+----+------+---+-----+
|john|     M| 17|   88|
|rose|     F| 15|   90|
+----+------+---+-----+


# 指定连接方式:"inner","left","right","outer","semi","full","leftanti","anti"等
df.join(df_score, ['name', 'gender'], 'right').show()

+-----+------+----+-----+
| name|gender| age|score|
+-----+------+----+-----+
| john|     M|  17|   88|
|david|     M|null|   50|
| rose|     F|  15|   90|
+-----+------+----+-----+

# 灵活指定连接关系
df_mark = df_score.withColumnRenamed('gender', 'sex')
df_mark.show()

+-----+---+-----+
| name|sex|score|
+-----+---+-----+
| john|  M|   88|
| rose|  F|   90|
|david|  M|   50|
+-----+---+-----+

df.join(df_mark, (df['name'] == df_mark['name']) & (df['gender'] == df_mark['sex']), 'inner').show()

+----+---+------+----+---+-----+
|name|age|gender|name|sex|score|
+----+---+------+----+---+-----+
|john| 17|     M|john|  M|   88|
|rose| 15|     F|rose|  F|   90|
+----+---+------+----+---+-----+

# 合并 union
df_student = session.createDataFrame([("Jim", 18, "male"), ("Lily", 16, "female")], schema=["name", "age", "gender"])
df.union(df_student).show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
|  Jim| 18|  male|
| Lily| 16|female|
+-----+---+------+

3.3.2 groupBy、agg 分组聚合

# 分组 groupBy
from pyspark.sql import functions as F

# 按性别进行分组,再找出最大值
df.groupBy('gender').max('age').show()

+------+--------+
|gender|max(age)|
+------+--------+
|     F|      16|
|  null|      18|
|     M|      17|
+------+--------+


# 分组后聚合,groupBy、agg  mean 求平均值
df.groupBy('gender').agg(F.mean('age').alias('mean_age'), F.collect_list('name').alias('names')).show()

+------+--------+------------+
|gender|mean_age|       names|
+------+--------+------------+
|     F|    15.5|[rose, lila]|
|  null|    18.0|     [david]|
|     M|    17.0|      [john]|
+------+--------+------------+

df.groupBy('gender').agg(F.expr('avg(age)'), F.expr('collect_list(name)')).show()

+------+--------+------------------+
|gender|avg(age)|collect_list(name)|
+------+--------+------------------+
|     F|    15.5|      [rose, lila]|
|  null|    18.0|           [david]|
|     M|    17.0|            [john]|
+------+--------+------------------+

分组透视:

# 表分组后透视,groupBy,pivot
df_student = session.createDataFrame([("LiLei", 18, "male", 1), ("HanMeiMei", 16, "female", 1),
                                   ("Jim", 17, "male", 2), ("DaChui", 20, "male", 2)]).toDF("name", "age", "gender",
                                                                                            "class")
df_student.show()
df_student.groupBy("class").pivot("gender").max("age").show()

+---------+---+------+-----+
|     name|age|gender|class|
+---------+---+------+-----+
|    LiLei| 18|  male|    1|
|HanMeiMei| 16|female|    1|
|      Jim| 17|  male|    2|
|   DaChui| 20|  male|    2|
+---------+---+------+-----+

+-----+------+----+
|class|female|male|
+-----+------+----+
|    1|    16|  18|
|    2|  null|  20|
+-----+------+----+

窗口函数:

# 窗口函数

df2 = session.createDataFrame([("LiLei", 78, "class1"), ("HanMeiMei", 87, "class1"),
                            ("DaChui", 65, "class2"), ("RuHua", 55, "class2")]) \
    .toDF("name", "score", "class")

df2.show()
dforder = df2.selectExpr("name", "score", "class",
                        "row_number() over (partition by class order by score desc) as order")

dforder.show()

+---------+-----+------+
|     name|score| class|
+---------+-----+------+
|    LiLei|   78|class1|
|HanMeiMei|   87|class1|
|   DaChui|   65|class2|
|    RuHua|   55|class2|
+---------+-----+------+

+---------+-----+------+-----+
|     name|score| class|order|
+---------+-----+------+-----+
|   DaChui|   65|class2|    1|
|    RuHua|   55|class2|    2|
|HanMeiMei|   87|class1|    1|
|    LiLei|   78|class1|    2|
+---------+-----+------+-----+

3.4 类 Excel 操作

包括增加、删除列,替换某些值、去除、填充空行等操作:

info_list = [("rose", 15, 'F'), ("lila", 16, 'F'), ("john", 17, 'M'), ('david', 18, None)]
schema = ["name", "age", "gender"]
df = session.createDataFrame(info_list, schema)

df.show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
+-----+---+------+

1、列操作:

# 增加列
df_new = df.withColumn('year', -df['age'] + 2021)
df_new.show()

+-----+---+------+----+
| name|age|gender|year|
+-----+---+------+----+
| rose| 15|     F|2006|
| lila| 16|     F|2005|
| john| 17|     M|2004|
|david| 18|  null|2003|
+-----+---+------+----+

# 更换列顺序
df_new.select('name', 'age', 'year', 'gender').show()

+-----+---+----+------+
| name|age|year|gender|
+-----+---+----+------+
| rose| 15|2006|     F|
| lila| 16|2005|     F|
| john| 17|2004|     M|
|david| 18|2003|  null|
+-----+---+----+------+

# 删除列
df_new.drop('year').show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
+-----+---+------+

# 列名重命名
df_new.withColumnRenamed('gender', 'sex').show()

+-----+---+----+----+
| name|age| sex|year|
+-----+---+----+----+
| rose| 15|   F|2006|
| lila| 16|   F|2005|
| john| 17|   M|2004|
|david| 18|null|2003|
+-----+---+----+----+

2、排序 sort

# 排序
df.sort(df['age'].desc()).show()  # asc

+-----+---+------+
| name|age|gender|
+-----+---+------+
|david| 18|  null|
| john| 17|     M|
| lila| 16|     F|
| rose| 15|     F|
+-----+---+------+

# 根据多个字段排序
df.orderBy(df['age'].desc(), df['gender'].desc()).show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
|david| 18|  null|
| john| 17|     M|
| lila| 16|     F|
| rose| 15|     F|
+-----+---+------+

3、去除、填充空行:

# 去除 nan 值行
df.na.drop().show()
# df.dropna().show()

# 填充 nan 值
df.fillna('M').show()
# df.na.fill('M').show()

4、替换:

# 替换某些值
df.na.replace({"": "M", "david": "lisi"}).show()
# df.replace({"": "M", "david": "lisi"}).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|rose| 15|     F|
|lila| 16|     F|
|john| 17|     M|
|lisi| 18|  null|
+----+---+------+

5、去重:

# 去重,默认根据全部字段
df2 = df.unionAll(df)
df2.show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
+-----+---+------+

df2.dropDuplicates().show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| john| 17|     M|
|david| 18|  null|
| rose| 15|     F|
| lila| 16|     F|
+-----+---+------+

# 去重,根据部分字段
df.dropDuplicates(['age']).show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| john| 17|     M|
|david| 18|  null|
| rose| 15|     F|
| lila| 16|     F|
+-----+---+------+

6、其他:

# 简单聚合
df.agg({'name': 'count', 'age': 'avg'}).show()

+-----------+--------+
|count(name)|avg(age)|
+-----------+--------+
|          4|    16.5|
+-----------+--------+

# 汇总信息
df.describe().show()

+-------+-----+------------------+------+
|summary| name|               age|gender|
+-------+-----+------------------+------+
|  count|    4|                 4|     3|
|   mean| null|              16.5|  null|
| stddev| null|1.2909944487358056|  null|
|    min|david|                15|     F|
|    max| rose|                18|     M|
+-------+-----+------------------+------+

# 频率超过 0.5 的年龄和性别
df.stat.freqItems(('age', 'gender'), 0.5).show()

+-------------+----------------+
|age_freqItems|gender_freqItems|
+-------------+----------------+
|         [16]|             [F]|
+-------------+----------------+

4. 与 SQL 交互

除了上述常用 api 操作外,还可以将 DataFrame 注册为临时表视图或者全局表视图,然后使用 SQL 语句来操作 DataFrame,另外也可以对 hive 进行增删改查。

4.1 注册视图与 SQL 交互

1、注册为临时视图:

info_list = [("rose", 15, 'F'), ("lila", 16, 'F'), ("john", 17, 'M'), ('david', 18, None)]
schema = ["name", "age", "gender"]
df = session.createDataFrame(info_list, schema)

df.show()

# 注册为临时视图,生命周期与 SparkSession 关联
df.createOrReplaceTempView('people')

session.sql('select * from people WHERE age = "18" ').show()

+-----+---+------+
+-----+---+------+
|david| 18|  null|
+-----+---+------+

2、注册为全局视图:

# 注册为全局视图,生命周期与 Spark 应用关联
df.createOrReplaceGlobalTempView('people1')
session.sql('select t.gender, collect_list(t.name) as names from global_temp.people1 t group by t.gender ').show()

+------+------------+
|gender|       names|
+------+------------+
|     F|[rose, lila]|
|  null|     [david]|
|     M|      [john]|
+------+------------+

session.newSession().sql('select * from global_temp.people1').show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
+-----+---+------+

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/191154.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jvm宏观上类的加载机制整体和微观上通过类加载器进行加载的过程

说到一个词“类的加载”其实含有歧义&#xff0c;因为在jvm中可以说有一个宏观的&#xff0c;即整体上的类的加载&#xff0c;还有一个微观上的加载&#xff0c;也就是狭隘的通过类加载器的加载class文件的过程&#xff0c;这里介绍这两种“类的加载”。 类的整体加载过程(类加…

Windows C盘清理的正确方式,从此你告别红色烦恼

前言 伴随着电脑工作的时间越久&#xff0c;C盘常常会提示显示其内存已不足。 C盘容量不足将会极大影响系统的运行速度&#xff0c;电脑会变卡、死机。 今天&#xff0c;就给大家分享一个C盘空间清理终极解决方案&#xff1a; 1、利用Windows自己附带的磁盘清理工具 1&…

[oeasy]python0068_ 字体样式_正常_加亮_变暗_控制序列

字体样式 回忆上次内容 上次了解了一个新的转义模式 \33 逃逸控制字符 esc esc 让输出 退出标准输出流 进行控制信息的设置 可以清屏也可以设置光标输出的位置 还能做什么呢&#xff1f; 可以设置字符的颜色吗&#xff1f;&#xff1f;&#xff1f;&#x1f914; 查看细节…

BIC-2022-BDT:区块链和基于数字双胞胎的智能制造高效数据处理安全框架

摘要工业物联网具有智能连接、数据实时处理、协同监测、信息自动处理等特点&#xff0c;是物联网时代的重要组成部分之一。异构工业物联网设备对高数据速率、高可靠性、高覆盖、低延迟的要求&#xff0c;已成为信息安全领域的一大挑战。工业物联网中的智能制造产业需要多方协同…

(02)Cartographer源码无死角解析-(53) 2D后端优化→位姿图优化理论讲解、

讲解关于slam一系列文章汇总链接:史上最全slam从零开始&#xff0c;针对于本栏目讲解(02)Cartographer源码无死角解析-链接如下: (02)Cartographer源码无死角解析- (00)目录_最新无死角讲解&#xff1a;https://blog.csdn.net/weixin_43013761/article/details/127350885 文末…

【ardunio+sx1268】与【esp32+sx1268】实现不同主控单片机lora通讯

2023.21 在前文 esp32 sx1268 的 spi 驱动调通之后&#xff0c;又尝试 ardunio sx1268 驱动&#xff0c;实现不同主控对于lora模块 sx1268 的控制 文章目录1. 实验结果2.硬件描述2.1 sx12682.2 ardunio ATmega3283.接线实物图5.开发环境6.代码实现关于esp32sx1268 的驱动以及代…

爆款制作获1200w播放,B站UP主+品牌如何迈入2023

1月13日&#xff0c;bilibili 2022年度百大UP主已经揭开帷幕&#xff0c;今年延续2021年的评审标准&#xff0c;依然从专业性、影响力、创新性三个维度进行评选。来源-B站这套评审标准已经实施两年&#xff0c;早期的百大评选上榜的更多是来自知名度高、影响力广的UP主&#xf…

2.关系数据库

学习过程参考&#xff08;后续章节同&#xff09; 【公开课】数据库系统概论&#xff08;王珊老师&#xff09;&#xff08;完结&#xff09; 《数据库系统概论》思维导图 【专栏必读】数据库系统概论第五版&#xff08;王珊&#xff09;专栏学习笔记目录导航及课后习题答案详…

中国电子学会2021年09月份青少年软件编程Scratch图形化等级考试试卷三级真题(含答案)

2021-09 Scratch三级真题 分数&#xff1a;100 题数&#xff1a;38 一、单选题&#xff08;共25题&#xff0c;每题2分&#xff0c;共50分&#xff09; 1. 程序中要使用不确定的数值&#xff0c;这时要用到的是&#xff1f;&#xff08;D &#xff09; A、图章 …

Github如何使用详细介绍(保姆级教学)

前言 &#x1f4dc; “ 作者 久绊A ” 专注记录自己所整理的Java、web、sql等&#xff0c;IT技术干货、学习经验、面试资料、刷题记录&#xff0c;以及遇到的问题和解决方案&#xff0c;记录自己成长的点滴 目录 一、Github如何搜索 二、如何判断一个项目好不好呢&#xff1f…

yolov5 模型输出的格式解析

工作需要&#xff0c; 又需要对yolov5 输出的模型进行转onnx 再用c进行后续处理。 两个问题。 yolov5 的模型输出的是个啥啊&#xff1f;转成onnx后输出的和yolov5输出的处理是否一样呢&#xff1f; 关于第一个问题&#xff0c;yolov5 的模型输出的是个啥啊&#xff1f; 以前…

【Rust】14. Rust 中的函数式语言功能:迭代器与闭包

14.1 闭包&#xff1a;捕获环境的匿名函数 14.1.1 闭包会捕获其环境 14.1.2 闭包类型推断和注解 闭包并不总是要求像 fn 函数那样在参数和返回值上注明类型闭包通常很短&#xff0c;并只关联于小范围的上下文而非任意情境如果尝试对同一闭包使用不同类型则就会得到类型错误&am…

selenium自动化测试框架

一、Selenium自动化测试&#xff08;基于python&#xff09; 1、Selenium简介&#xff1a; 1.1 Selenium是一款主要用于Web应用程序自动化测试的工具集合。Selenium测试直接运行在浏览器中&#xff0c;本质是通过驱动浏览器&#xff0c;模拟浏览器的操作&#xff0c;比如跳转…

测试碎碎念(基础篇_2)

一、软件测试的基础概念1.1 需求在企业中&#xff0c;需求 主要分为 用户需求 和 软件需求~用户需求&#xff1a;可以简单理解为甲方提出的需求&#xff0c;如果没有甲方&#xff0c;那么就是终端用户使用产品时必须要完成的任务&#xff1b;用户需求 一般是比较简略的&#xf…

Flink官方例子解析:带窗口的WordCount

1. 简介 本篇介绍的是带窗口的WordCount&#xff0c;使用窗口函数countWindow。 countWindow是一种计数窗口&#xff0c;有固定窗口和滑动窗口两种用法。 1.1 固定窗口 countWindow(windowSize) , windowSize指的是窗口大小。 例如countWindow(5)&#xff0c; 说明一个窗口可…

零基础机器学习做游戏辅助第七课--模型的保存与加载

一、保存模型 当我们训练好模型后将它保存下来,这样下次使用时就可以直接加载模型进行工作了。 常见的保存模型有三种: 只保存权重文件:model.save_weights(num_weights) 当我们使用save_weights保存权重文件时,没有指定后缀名,则会保存三个文件在指定目录下

linux 下ARC的中断机制

linux 下ARC的中断机制 一、Idu 中断控制器初始化 Idu 是arc 处理器内部中断控制模块&#xff0c; 类似于arm 内部的gic 中断控制模块 首先&#xff0c;Idu中断控制器在初始化时, 会解析DTS信息中定义了几个idu控制器&#xff0c;每个Idu控制器注册一个struct irq_domain数据…

嵌入式Linux系统开发笔记(十三)

U-Boot烧写验证测试 正点原子专门编写了一个软件来将编译出来的.bin 文件烧写到 SD 卡中&#xff0c;这个软件叫做“imxdownload” 【1】将 imxdownload 拷贝到工程根目录下 【2】给予 imxdownload 可执行权限 我们直接将软件 imxdownload 从 Windows 下复制到 Ubuntu 中以…

【干货】Windows下cmd中cd命令的使用方法

【干货】Windows下cmd中cd命令的使用方法什么是cd命令cd命令的使用打开cmdcd命令的常用方法进入某个盘进入某个目录返回上一级目录返回至当前工作目录下的根目录参考什么是cd命令 此处介绍两个概念&#xff1a; cmd&#xff1a;命令提示符cd&#xff1a;全称change directory…

model.train()与model.val()

一、问题描述 需要将mmpose框架下训练的模型单独保存出来&#xff0c;做后续处理。用torch.save()直接保存模型mmpose_model.pt&#xff0c;然后重新搭建模型&#xff0c;把保存的模型参数加载进去&#xff0c;得到scratch_model.pt使用scratch_model.pt进行推理&#xff0c;与…