Spark 6:Spark SQL DataFrame

news2024/10/7 14:22:41

SparkSQL 是Spark的一个模块, 用于处理海量结构化数据。

148dc251c91f40cba8a316745b65d22f.png

SparkSQL是用于处理大规模结构化数据的计算引擎
SparkSQL在企业中广泛使用,并性能极好
SparkSQL:使用简单、API统一、兼容HIVE、支持标准化JDBC和ODBC连接
SparkSQL 2014年正式发布,当下使用最多的2.0版Spark发布于2016年,当下使用的最新3.0办发布于2019年

SparkSQL和Hive的异同

Hive和Spark 均是:“分布式SQL计算引擎”。均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。

6ba76b7a40ce4f2fa613c19c5462a422.png

SparkSQL的数据抽象

5834d1c0b6a044d5b7bc3c0f945fb038.png

Pandas - DataFrame
• 二维表数据结构
• 单机(本地)集合
SparkCore - RDD
• 无标准数据结构,存储什么数据均可
• 分布式集合(分区)
SparkSQL - DataFrame
• 二维表数据结构
• 分布式集合(分区) 

6abaa6bf1efa46669218ee9c13e0133b.png

SparkSQL 其实有3类数据抽象对象
• SchemaRDD对象(已废弃)
• DataSet对象:可用于Java、Scala语言
• DataFrame对象:可用于Java、Scala、Python、R
以Python开发SparkSQL,主要使用的就是DataFrame对象作为核心数据结构 

DataFrame概述

RDD:有分区的、弹性的、分布式的、存储任意结构数据
DataFrame:有分区的、弹性的、分布式的、存储二维表结构数据

DataFrame和RDD都是:弹性的、分布式的、数据集。只是,DataFrame存储的数据结构“限定”为:二维表结构化数据;而RDD可以存储的数据则没有任何限制,想处理什么就处理什么。

假定有如下数据集

1562823cb9f34eb1a93105fc1bec7192.png

DataFrame按二维表格存储

7a1ba11e2841496b9218a4fa37fb751e.png

RDD按数组对象存储

2431552393704aecb38f9084fa8c8a77.png

SparkSession对象
在RDD阶段,程序的执行入口对象是: SparkContext
在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。
SparkSession对象可以:
- 用于SparkSQL编程作为入口对象
- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
所以,后续的代码,执行环境入口对象,统一变更为SparkSession对象

47b303f5b5764750abb6de331008b8f9.png

构建SparkSession核心代码

有如下数据集:列1ID,列2学科,列3分数

859c4a3d46764fd5ac5d05b1e4c30012.png

数据集文件:资料\data\sql\stu_score.txt

需求:读取文件,找出学科为“语文”的数据,并限制输出5条where subject = '语文' limit 5
代码如下:

# coding:utf8

# SparkSession对象的导包, 对象是来自于 pyspark.sql包中
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 构建SparkSession执行环境入口对象
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()

    # 通过SparkSession对象 获取 SparkContext对象
    sc = spark.sparkContext

    # SparkSQL的HelloWorld
    df = spark.read.csv("../data/input/stu_score.txt", sep=',', header=False)
    df2 = df.toDF("id", "name", "score")
    df2.printSchema()
    df2.show()

    df2.createTempView("score")

    # SQL 风格
    spark.sql("""
        SELECT * FROM score WHERE name='语文' LIMIT 5
    """).show()

    # DSL 风格
    df2.where("name='语文'").limit(5).show()

SparkSQL 和 Hive同样,都是用于大规模SQL分布式计算的计算框架,均可以运行在YARN之上,在企业中广泛被应用。
SparkSQL的数据抽象为:SchemaRDD(废弃)、DataFrame(Python、R、Java、Scala)、DataSet(Java、Scala)。
DataFrame同样是分布式数据集,有分区可以并行计算,和RDD不同的是,DataFrame中存储的数据结构是以表格形式组织的,方便进行SQL计算。
DataFrame对比DataSet基本相同,不同的是DataSet支持泛型特性,可以让Java、Scala语言更好的利用到。
SparkSession是2.0后退出的新执行环境入口对象,可以用于RDD、SQL等编程。

DataFrame的组成
DataFrame是一个二维表结构, 那么表格结构就有无法绕开的三个点:
• 行
• 列
• 表结构描述
比如,在MySQL中的一张表:
• 由许多行组成
• 数据也被分成多个列
• 表也有表结构信息(列、列名、列类型、列约束等)

基于这个前提,DataFrame的组成如下:
在结构层面:
- StructType对象描述整个DataFrame的表结构
- StructField对象描述一个列的信息
在数据层面:
- Row对象记录一行数据
- Column对象记录一列数据并包含列的信息

562f7dc317bd41569d6f15e07dc4829d.png

如图, 在表结构层面,DataFrame的表结构由:
StructType描述,如下图

c4fae3b6f4dd40a1b1ac87713628c404.png
一个StructField记录:列名、列类型、列是否运行为空
多个StructField组成一个StructType对象。
一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空。同时,一行数据描述为Row对象,如Row(1, 张三, 11)
一列数据描述为Column对象,Column对象包含一列数据和列的信息

DataFrame的代码构建 - 基于RDD方式1 

DataFrame对象可以从RDD转换而来,都是分布式数据集,其实就是转换一下内部存储的结构,转换为二维表结构。

通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame
这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)

# coding:utf8

from pyspark.sql import SparkSession


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 基于RDD转换成DataFrame
    rdd = sc.textFile("../data/input/sql/people.txt").\
        map(lambda x: x.split(",")).\
        map(lambda x: (x[0], int(x[1])))

    # 构建DataFrame对象
    # 参数1 被转换的RDD
    # 参数2 指定列名, 通过list的形式指定, 按照顺序依次提供字符串名称即可
    df = spark.createDataFrame(rdd, schema=['name', 'age'])

    # 打印DataFrame的表结构
    df.printSchema()

    # 打印df中的数据
    # 参数1 表示 展示出多少条数据, 默认不传的话是20
    # 参数2 表示是否对列进行截断, 如果列的数据长度超过20个字符串长度, 后续的内容不显示以...代替
    # 如果给False 表示不阶段全部显示, 默认是True
    df.show(20, False)

    # 将DF对象转换成临时视图表, 可供sql语句查询
    df.createOrReplaceTempView("people")
    spark.sql("SELECT * FROM people WHERE age < 30").show()

DataFrame的代码构建 - 基于RDD方式2

通过StructType对象来定义DataFrame的“表结构”转换RDD

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 基于RDD转换成DataFrame
    rdd = sc.textFile("../data/input/sql/people.txt").\
        map(lambda x: x.split(",")).\
        map(lambda x: (x[0], int(x[1])))

    # 构建表结构的描述对象: StructType对象
    schema = StructType().add("name", StringType(), nullable=True).\
        add("age", IntegerType(), nullable=False)

    # 基于StructType对象去构建RDD到DF的转换
    df = spark.createDataFrame(rdd, schema=schema)

    df.printSchema()
    df.show()

使用RDD的toDF方法转换RDD

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 基于RDD转换成DataFrame
    rdd = sc.textFile("../data/input/sql/people.txt").\
        map(lambda x: x.split(",")).\
        map(lambda x: (x[0], int(x[1])))


    # toDF的方式构建DataFrame
    df1 = rdd.toDF(["name", "age"])
    df1.printSchema()
    df1.show()

    # toDF的方式2 通过StructType来构建
    schema = StructType().add("name", StringType(), nullable=True).\
        add("age", IntegerType(), nullable=False)

    df2 = rdd.toDF(schema=schema)
    df2.printSchema()
    df2.show()

将Pandas的DataFrame对象,转变为分布式的SparkSQL DataFrame对象

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 基于Pandas的DataFrame构建SparkSQL的DataFrame对象
    pdf = pd.DataFrame(
        {
            "id": [1, 2, 3],
            "name": ["张大仙", "王晓晓", "吕不为"],
            "age": [11, 21, 11]
        }
    )

    df = spark.createDataFrame(pdf)

    df.printSchema()
    df.show()

DataFrame的代码构建 - 读取外部数据
通过SparkSQL的统一API进行数据读取构建DataFrame
统一API示例代码:

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 构建StructType, text数据源, 读取数据的特点是, 将一整行只作为`一个列`读取, 默认列名是value 类型是String
    schema = StructType().add("data", StringType(), nullable=True)
    df = spark.read.format("text").\
        schema(schema=schema).\
        load("../data/input/sql/people.txt")

    df.printSchema()
    df.show()

读取text数据源:使用format(“text”)读取文本数据,读取到的DataFrame只会有一个列,列名默认称之为:value

schema = StructType().add("data", StringType(), nullable=True)
df = spark.read.format("text")\
.schema(schema)\
.load("../data/sql/people.txt")

读取json数据源
使用format(“json”)读取json数据
示例代码:

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # JSON类型自带有Schema信息
    df = spark.read.format("json").load("../data/input/sql/people.json")
    df.printSchema()
    df.show()

读取csv数据源
使用format(“csv”)读取csv数据
示例代码:

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 读取CSV文件
    df = spark.read.format("csv").\
        option("sep", ";").\
        option("header", True).\
        option("encoding", "utf-8").\
        schema("name STRING, age INT, job STRING").\
        load("../data/input/sql/people.csv")

    df.printSchema()
    df.show()

读取parquet数据源
使用format(“parquet”)读取parquet数据

parquet: 是Spark中常用的一种列式存储文件格式。和Hive中的ORC差不多, 他俩都是列存储格式。parquet对比普通的文本文件的区别:
● parquet 内置schema (列名\ 列类型\ 是否为空)
● 存储是以列作为存储格式
● 存储是序列化存储在文件中的(有压缩属性体积小)
Parquet文件不能直接打开查看,如果想要查看内容,可以在PyCharm中安装如下插件来查看:

52b27459f9ec4f1ea28eccdf586a7e4a.png
示例代码:

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 读取parquet类型的文件
    df = spark.read.format("parquet").load("../data/input/sql/users.parquet")

    df.printSchema()
    df.show()

DataFrame的入门操作
DataFrame支持两种风格进行编程,分别是:
• DSL风格
• SQL风格

DSL语法风格
DSL称之为:领域特定语言。
其实就是指DataFrame的特有API
DSL风格意思就是以调用API的方式来处理Data
比如:df.where().limit()
SQL语法风格
SQL风格就是使用SQL语句处理DataFrame的数据
比如:spark.sql(“SELECT * FROM xxx)

DSL - show 方法
功能:展示DataFrame中的数据, 默认展示20条
语法:

df.show(参数1, 参数2)
- 参数1: 默认是20, 控制展示多少条
- 参数2: 是否阶段列, 默认只输出20个字符的长度, 过长不显示, 要显示的话 请填入 truncate = True

如图,某个df.show后的展示结果:

b07756284cf042a5a088d5bce137beed.png

DSL - printSchema方法
功能:打印输出df的schema信息
语法:

df.printSchema()

807e126f6bb04cdbabef5ea769995109.png

DSL - select
功能:选择DataFrame中的指定列(通过传入参数进行指定)
语法:

df.select()

可传递:
• 可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串
列名来指定列
• List[Column]对象或者List[str]对象, 用来选择多个列

ef2170ebaae2414c98a5e46f1ab344e8.png

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    df = spark.read.format("csv").\
        schema("id INT, subject STRING, score INT").\
        load("../data/input/sql/stu_score.txt")

    # Column对象的获取
    id_column = df['id']
    subject_column = df['subject']

    # DLS风格演示
    df.select(["id", "subject"]).show()
    df.select("id", "subject").show()
    df.select(id_column, subject_column).show()

DSL - filter和where
功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame
语法: 

df.filter()
df.where()
where和filter功能上是等价的

c62c55be968548a89a6e9750f0631d91.png

    # filter API
    df.filter("score < 99").show()
    df.filter(df['score'] < 99).show()

    # where API
    df.where("score < 99").show()
    df.where(df['score'] < 99).show()

DSL - groupBy 分组
功能:按照指定的列进行数据的分组, 返回值是GroupedData对象
语法:

df.groupBy()

传入参数和select一样,支持多种形式,不管怎么传意思就是告诉spark按照哪个列分组

6ed97e39542c4d3983bf75baa2eca87d.png

    # group By API
    df.groupBy("subject").count().show()
    df.groupBy(df['subject']).count().show()

GroupedData对象
GroupedData对象是一个特殊的DataFrame数据集
其类全名:<class 'pyspark.sql.group.GroupedData'>
这个对象是经过groupBy后得到的返回值, 内部记录了 以分组形式存储的数据
GroupedData对象其实也有很多API,比如前面的count方法就是这个对象的内置方法
除此之外,像:min、max、avg、sum、等等许多方法都存在

SQL风格语法 - 注册DataFrame成为表
DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中
使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。
如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:

    # 注册成临时表
    df.createTempView("score") # 注册临时视图(表)
    df.createOrReplaceTempView("score_2") # 注册 或者 替换  临时视图
    df.createGlobalTempView("score_3") # 注册全局临时视图 全局临时视图在使用的时候 需要在前面带上global_temp. 前缀

f950dbb5e36a4d62ae1ab4c8d5b88657.png

SQL风格语法 - 使用SQL查询

8472c6099b9e42fa9cc152772dace233.png

    # 可以通过SparkSession对象的sql api来完成sql语句的执行
    spark.sql("SELECT subject, COUNT(*) AS cnt FROM score GROUP BY subject").show()
    spark.sql("SELECT subject, COUNT(*) AS cnt FROM score_2 GROUP BY subject").show()
    spark.sql("SELECT subject, COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject").show()

pyspark.sql.functions 包
PySpark提供了一个包: pyspark.sql.functions
这个包里面提供了 一系列的计算函数供SparkSQL使用
如何用呢?
导包

from pyspark.sql import functions as F

然后就可以用F对象调用函数计算了。
这些功能函数, 返回值多数都是Column对象。

词频统计案例练习
单词计数需求,使用DSL和SQL两种风格来实现。

# coding:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # TODO 1: SQL 风格进行处理
    rdd = sc.textFile("../data/input/words.txt").\
        flatMap(lambda x: x.split(" ")).\
        map(lambda x: [x])

    df = rdd.toDF(["word"])

    # 注册DF为表格
    df.createTempView("words")

    spark.sql("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC").show()


    # TODO 2: DSL 风格处理
    df = spark.read.format("text").load("../data/input/words.txt")

    # withColumn方法
    # 方法功能: 对已存在的列进行操作, 返回一个新的列, 如果名字和老列相同, 那么替换, 否则作为新列存在
    df2 = df.withColumn("value", F.explode(F.split(df['value'], " ")))
    df2.groupBy("value").\
        count().\
        withColumnRenamed("value", "word").\
        withColumnRenamed("count", "cnt").\
        orderBy("cnt", ascending=False).\
        show()

电影评分数据分析案例

f83fd35e786b4478ac848e83f3b74b86.png

6352d5c13e1e4a76b6781a4bf2b70484.png

# coding:utf8
import time

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as F


if __name__ == '__main__':
    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    sc = spark.sparkContext

    # 1. 读取数据集
    schema = StructType().add("user_id", StringType(), nullable=True).\
        add("movie_id", IntegerType(), nullable=True).\
        add("rank", IntegerType(), nullable=True).\
        add("ts", StringType(), nullable=True)
    df = spark.read.format("csv").\
        option("sep", "\t").\
        option("header", False).\
        option("encoding", "utf-8").\
        schema(schema=schema).\
        load("../data/input/sql/u.data")

    # TODO 1: 用户平均分
    df.groupBy("user_id").\
        avg("rank").\
        withColumnRenamed("avg(rank)", "avg_rank").\
        withColumn("avg_rank", F.round("avg_rank", 2)).\
        orderBy("avg_rank", ascending=False).\
        show()

    # TODO 2: 电影的平均分查询
    df.createTempView("movie")
    spark.sql("""
        SELECT movie_id, ROUND(AVG(rank), 2) AS avg_rank FROM movie GROUP BY movie_id ORDER BY avg_rank DESC
    """).show()

    # TODO 3: 查询大于平均分的电影的数量 # Row
    print("大于平均分电影的数量: ", df.where(df['rank'] > df.select(F.avg(df['rank'])).first()['avg(rank)']).count())

    # TODO 4: 查询高分电影中(>3)打分次数最多的用户, 此人打分的平均分
    # 先找出这个人
    user_id = df.where("rank > 3").\
        groupBy("user_id").\
        count().\
        withColumnRenamed("count", "cnt").\
        orderBy("cnt", ascending=False).\
        limit(1).\
        first()['user_id']
    # 计算这个人的打分平均分
    df.filter(df['user_id'] == user_id).\
        select(F.round(F.avg("rank"), 2)).show()

    # TODO 5: 查询每个用户的平局打分, 最低打分, 最高打分
    df.groupBy("user_id").\
        agg(
            F.round(F.avg("rank"), 2).alias("avg_rank"),
            F.min("rank").alias("min_rank"),
            F.max("rank").alias("max_rank")
        ).show()

    # TODO 6: 查询评分超过100次的电影, 的平均分 排名 TOP10
    df.groupBy("movie_id").\
        agg(
            F.count("movie_id").alias("cnt"),
            F.round(F.avg("rank"), 2).alias("avg_rank")
        ).where("cnt > 100").\
        orderBy("avg_rank", ascending=False).\
        limit(10).\
        show()

    time.sleep(10000)

"""
1. agg: 它是GroupedData对象的API, 作用是 在里面可以写多个聚合
2. alias: 它是Column对象的API, 可以针对一个列 进行改名
3. withColumnRenamed: 它是DataFrame的API, 可以对DF中的列进行改名, 一次改一个列, 改多个列 可以链式调用
4. orderBy: DataFrame的API, 进行排序, 参数1是被排序的列, 参数2是 升序(True) 或 降序 False
5. first: DataFrame的API, 取出DF的第一行数据, 返回值结果是Row对象.
# Row对象 就是一个数组, 你可以通过row['列名'] 来取出当前行中, 某一列的具体数值. 返回值不再是DF 或者GroupedData 或者Column而是具体的值(字符串, 数字等)
"""

SparkSQL Shuffle 分区数目

a257e6d69044428aae624620f9901faf.png

    # 0. 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        config("spark.sql.shuffle.partitions", 2).\
        getOrCreate()
    sc = spark.sparkContext

    """
    spark.sql.shuffle.partitions 参数指的是, 在sql计算中, shuffle算子阶段默认的分区数是200个.
    对于集群模式来说, 200个默认也算比较合适
    如果在local下运行, 200个很多, 在调度上会带来额外的损耗
    所以在local下建议修改比较低 比如2\4\10均可
    这个参数和Spark RDD中设置并行度的参数 是相互独立的.
    """

SparkSQL 数据清洗API

57ae7d1d5cb54302a0ecab976297eead.png

    df.dropDuplicates().show()
    df.dropDuplicates(['age', 'job']).show()

 a3a215af60ec41429748e429c56e36fb.png

    df.dropna().show()
    # # thresh = 3表示, 最少满足3个有效列,  不满足 就删除当前行数据
    df.dropna(thresh=3).show()
    df.dropna(thresh=2, subset=['name', 'age']).show()

64438ecc0a584588ac8a653eda93b3f5.png

    # 缺失值处理也可以完成对缺失值进行填充
    # DataFrame的 fillna 对缺失的列进行填充
    df.fillna("loss").show()

    # 指定列进行填充
    df.fillna("N/A", subset=['job']).show()

    # 设定一个字典, 对所有的列 提供填充规则
    df.fillna({"name": "未知姓名", "age": 1, "job": "worker"}).show()

DataFrame数据写出

00ee5a7503a941368bbddb92447e00c2.png

f52b2f3c5bc2411690981e93b0d01bde.png

 c65fb09aab28467fa77b20a911b55f1e.png

    # Write text 写出, 只能写出一个列的数据, 需要将df转换为单列df
    df.select(F.concat_ws("---", "user_id", "movie_id", "rank", "ts")).\
        write.\
        mode("overwrite").\
        format("text").\
        save("../data/output/sql/text")

    # Write csv
    df.write.mode("overwrite").\
        format("csv").\
        option("sep", ";").\
        option("header", True).\
        save("../data/output/sql/csv")

    # Write json
    df.write.mode("overwrite").\
        format("json").\
        save("../data/output/sql/json")

    # Write parquet
    df.write.mode("overwrite").\
        format("parquet").\
        save("../data/output/sql/parquet")

DataFrame 通过JDBC读写数据库(MySQL示例)

    # 1. 写出df到mysql数据库中
    df.write.mode("overwrite").\
        format("jdbc").\
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true").\
        option("dbtable", "movie_data").\
        option("user", "root").\
        option("password", "2212072ok1").\
        save()

3ce460d3d77e487b9214ddb91de2cc4d.png

    # 2. 从mysql数据库中读df
    df2 = spark.read.format("jdbc"). \
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true"). \
        option("dbtable", "movie_data"). \
        option("user", "root"). \
        option("password", "2212072ok1"). \
        load()

967106ad85384f3c89f40a2aa9121358.png

DataFrame 在结构层面上由StructField组成列描述,由StructType构造表描述。在数据层面上,Column对象记录列数据,Row对象记录行数据。
DataFrame可以从RDD转换、Pandas DF转换、读取文件、读取JDBC等方法构建。
spark.read.format()和df.write.format() 是DataFrame读取和写出的统一化标准API。
SparkSQL默认在Shuffle阶段200个分区,可以修改参数获得最好性能。
dropDuplicates可以去重、dropna可以删除缺失值、fillna可以填充缺失值。
SparkSQL支持JDBC读写,可用标准API对数据库进行读写操作。

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/976777.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java调用js文件的两种方法(支持V8引擎)

前言 对java逆向感兴趣的盆友可以关注我以前的文章&#xff0c;有图片验证码识别、AES、SHA256等各种加密的java实现&#xff0c;不定时更新常用算法和加密&#xff0c;欢迎一起交流讨论&#xff01; 在日常逆向中&#xff0c;一些前端的加密代码用java复现出来比较难&#xff…

使用工作流快速开发平台,做好企业数据资源管理!

面对越来越繁忙的业务&#xff0c;很多客户朋友不知道用什么样的平台才能快速处理好企业内部数据&#xff0c;做出更有利于企业发展的经营决策。其实&#xff0c;低代码技术平台、工作流快速开发平台是可以让企业减少重复工作&#xff0c;提高工作效率&#xff0c;实现流程化办…

中断处理原理:接口及按键驱动

一、什么是中断 一种硬件上的通知机制&#xff0c;用来通知CPU发生了某种需要立即处理的事件 分为&#xff1a; 内部中断 CPU执行程序的过程中&#xff0c;发生的一些硬件出错、运算出错事件&#xff08;如分母为0、溢出等等&#xff09;&#xff0c;不可屏蔽外部中断 外设发…

自动计算比例 计算属性 computed @input=“rate“

<el-col :span"12"><el-form-item label"当年累计实收租金:" prop"cumulativeRent"><el-inputv-model"createForm.cumulativeRent"input"rate"clearable:disabled"value 2"><template slot…

如何将MySQL中指定的表结构同步到人大金仓数据库

场景 刚开始做数据库适配的时候,这是一个棘手的问题,因为MySQL的库里,表结构,字段都是最新的,但是金仓的库,全是旧版本的表结构。需要把我们模块的表结构,同步到金仓中。 虽然金仓有数据库同步工具,但是直接把所有表都给同步过来,难免会影响到其他模块。 然后…

HCIP学习-IPv6

目录 前置学习内容 IPv6解决的一些IPv4的缺陷 无限的地址 层次化的地址结构 即插即用 简化报文头部 IPv4和IPv6报头比较 端到端的网络罗完整性 安全性增强 挣钱QoS特性 IPv6地址介绍 格式 首选格式 压缩格式 内嵌IPv4地址格式的IPv6地址格式 IPv6的网络前缀和接…

TRICONEX 8312 数字输入模块

Triconex 8312 数字输入模块是一种用于工业自动化和控制系统的模块&#xff0c;通常用于监测和采集数字信号。以下是Triconex 8312 数字输入模块的一些常见产品特点&#xff1a; 多通道输入&#xff1a;8312 模块通常具有多个数字输入通道&#xff0c;允许同时监测多个数字信号…

【Linux】多线程2——线程互斥与同步/多线程应用

文章目录 1. 线程互斥1.1 问题引入1.2 线程互斥的相关概念1.3 互斥量mutex1.4 互斥量实现原理1.5 死锁 2. 线程安全和可重入函数3. 线程同步3.1 同步概念3.2 条件变量 4. 生产消费模型4.1 基于阻塞队列的cp模型4.2 基于环形队列的cp模型POSIX信号量 5. 线程池5.1 互斥量RAII版本…

总结/笔记-vue中的插槽(默认插槽、具名插槽、作用域插槽)

问题&#xff1a; 遇到了一个插槽&#xff0c;写法为 #default ”{ row }“ 插槽知识点&#xff1a; 定义 插槽&#xff0c;用于 在组件中 引用外部组件或自定义组件的内容。 即 子组件中提供给父组件使用的一个占位符&#xff0c;父组件可以在这个占位符中填充任何模板代…

【TI毫米波雷达笔记】ADCBuf外设初始化配置及驱动(以IWR6843AOP为例)

【TI毫米波雷达笔记】ADCBuf外设初始化配置及驱动&#xff08;以IWR6843AOP为例&#xff09; ADCBuf是为mmwave服务的 在配置之前需要配置好mmwave #include < ti/drivers/ADCBuf.h>对应mmwave studio&#xff1a; ADCBuf模块上电 调用&#xff1a; ADCBuf_init();A…

bash: cmake: command not found...+++++++lsb_release: command not found

一 .bash: cmake: command not found… centos中安装那个cmake。 1、问题 [rootPC3 home]# cmake bash: cmake: command not found... Similar command is: make当前系统&#xff1a; [rootPC3 home]# lsb_release -a LSB Version: :core-4.1-amd64:core-4.1-noarch:cxx…

dex2oat编译模式、触发场景、命令强制执行

dex2oat简单理解就是把delvik虚拟机的可执行文件dex转化成AndroidRuntime虚拟机的可执行文件oat。 Android T版本由PKMS下发命令、native层进程installd负责具体执行dex2oat操作。installd回去调用dex2oat64完成编译工作&#xff0c;可以将dex2oat64理解成一个程序。源码路径&…

本地使用GFPGAN进行图像人脸修复

人脸修复 1.下载项目和权重文件2.部署环境3.下载权重文件4.运行代码5.网页端体验 首先来看一下效果图 1.下载项目和权重文件 https://github.com/iptop/GFPGAN-for-Video.git2.部署环境 根据README文件部署好环境&#xff0c;额外还需要&#xff1a; cd GFPGAN-1.3.8 pyt…

在Qt创建的UI中放一个显示点云的窗口(PCL+QT5)

1、首先在Qt Designer创建UI后&#xff0c;拖一个Widget窗口出来 2、在对象查看器中右击该Widget&#xff0c;选择提升窗口部件&#xff0c;如下操作&#xff1a; 3、把UI转出来放在VS项目中&#xff0c;其中你的UI代码头文件会自带QVTKOpenGLNativeWidget.h&#xff0c;当然你…

企业专题片的优势

企业专题片可以通过生动、感人的方式传达企业形象和信息&#xff0c;引起观众的共鸣和兴趣。它是一种强有力的营销工具&#xff0c;能够提升品牌形象、增加产品或服务的认知度&#xff0c;并在激烈的市场竞争中突显企业的实力和成果。企业专题片具有多个好处和影响&#xff0c;…

【C语言】入门——数组

目录 ​编辑 1.一维数组的创建和初始化 1.1一维数组的创建&#xff1a; 1.2 一维数组的初始化 &#xff1a; 2.二维数组的创建的初始化 2.1二维数组的创建&#xff1a; 2.2二维数组的初始化&#xff1a; 3.数组越界 4.数组的使用 4.1一维数组的使用&#xff1a; 4.2二维…

1、PostgreSQL数据库的历史和特性简介

PostgreSQL是一个开源的关系型数据库管理系统&#xff08;DBMS&#xff09;&#xff0c;它具有强大的功能和广泛的可扩展性&#xff0c;被广泛用于各种规模的应用程序和项目中。 一、PostgreSQL 的发展历史 PostgreSQL数据库的来历可以追溯到20世纪80年代末和90年代初。最早由…

MyBatisPlus入门篇1 - 入门案例、CRUD开发、Lombok、分页查询

目录 MyBatisPlus是基于MyBatis框架基础上开发的增强型工具&#xff0c;旨在简化开发、提高效率。 1.入门案例 ①创建SpringBoot项目&#xff0c;添加Spring Web和Mysql driver ②手动在pom.xml文件中添加mybatis plus和druid依赖 <dependency><groupId>com.b…

攻防世界-Get-the-key.txt

原题 解题思路 notepad看到&#xff0c;这应该是一个压缩包&#xff0c;解压。 但是解压的时候提示格式不对&#xff0c;不是zip&#xff0c;rar可以。解压出来有一个key.txt&#xff0c;打开就行。

taro h5 点击页面任意地方关闭弹窗组件 --- findDOMNode 判断点击节点是否属于某个组件

场景&#xff1a;如图&#xff0c;弹窗在大组件中&#xff0c;点击小组件显示弹窗&#xff0c;要求点击除弹窗外的任何元素都能关闭弹窗并且能执行元素原有的逻辑 方法一 最简单的是弹窗背后有一个覆盖整个页面的透明的cover, 点击直接关闭&#xff0c;但是这样虽然点击页面…