PySpark(四)PySpark SQL、Catalyst优化器、Spark SQL的执行流程

news2025/1/16 1:53:24

目录

PySpark SQL

基础

SparkSession对象

DataFrame入门

 DataFrame构建

DataFrame代码风格

 DSL

SQL

SparkSQL Shuffle 分区数目

 DataFrame数据写出

Spark UDF

Catalyst优化器 

Spark SQL的执行流程


PySpark SQL

基础

PySpark SQL与Hive的异同

Hive和Spark 均是:“分布式SQL计算引擎”
均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。
目前,企业中使用Hive仍旧居多,但SparkSQL将会在很近的未来替代Hive成为分布式SQL计算市场的顶级

这里的重点是:Spark SQL能支持SQL和其他代码混合执行,自由度更高,且其是内存计算,更快。但是其没有元数据管理,然而它最终还是会作用到Hive层面,可以调用Hive的Metasotre

SparkSQL的基本对象是DataFrame,其特点及与其他对象的区别为: 

 SparkSQL 其实有3类数据抽象对象

  • SchemaRDD对象 (已废弃)
  • DataSet对象: 可用于Java、Scala语言
  • DataFrame对象:可用于Java、Scala、Python、R

SparkSession对象

 在RDD阶段,程序的执行入口对象是: SparkContext
在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象
SparkSession对象可以:
-用于SparkSQL编程作为入口对象
- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext

from pyspark.sql import SparkSession
if __name__ == '__main__':
    spark =  SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
    sc = spark.sparkContext

DataFrame入门

DataFrame的组成如下
在结构层面
StructType对象描述整个DataFrame的表结构

StructField对象描述一个列的信息
在数据层面
Row对象记录一行数据
Column对象记录一列数据并包含列的信息

 DataFrame构建

1、用RDD进行构建

rdd的结构要求为:[[xx,xx],[xx,xx]]

spark.createDataFrame(rdd,schema=[])

    spark =  SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    rdd = sc.textFile('data/input/sql/people.txt').map(lambda x:x.split(',')).map(lambda x:[x[0],int(x[1])])
    print(rdd.collect())
    # [['Michael', 29], ['Andy', 30], ['Justin', 19]]
    df = spark.createDataFrame(rdd,schema=['name','age'])
    df.printSchema()#打印表结构
    df.show()#打印表
#     root
#     | -- name: string(nullable=true)
#     | -- age: long(nullable=true)
# 
# +-------+---+
# | name | age |
# +-------+---+
# | Michael | 29 |
# | Andy | 30 |
# | Justin | 19 |
# +-------+---+

2、利用StructType进行创建

需要先引入StructType,StringType,IntegerType等构建schema

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerType
if __name__ == '__main__':
    spark =  SparkSession.builder.appName('lmx').master('local[*]').getOrCreate()
    sc = spark.sparkContext
    rdd = sc.textFile('data/input/sql/people.txt').map(lambda x:x.split(',')).map(lambda x:[x[0],int(x[1])])
#构建schema    
schema =StructType().add("name",StringType(),nullable=False).\
        add('age',IntegerType(),nullable=True)
    df = spark.createDataFrame(rdd,schema=schema)
    df.printSchema()
    df.show()

3、toDF将rdd转换为df

下面展示了两种方式

    # 只设定列名,列的数据结构则是内部自己判断
    df = rdd.toDF(['name','age'])
    df.printSchema()
    # root
    # | -- name: string(nullable=true)
    # | -- age: long(nullable=true)
    # 设定列名和数据类型
    schema =StructType().add("name",StringType(),nullable=False).\
        add('age',IntegerType(),nullable=True)
    df = rdd.toDF(schema=schema)
    df.printSchema()
    # root
    # | -- name: string(nullable=false)
    # | -- age: integer(nullable=true)

4、基于pandas构建 

    dfp = pd.DataFrame({
        "id":[1,2,3],
        'score':[99,98,100]
    })
    df = spark.createDataFrame(dfp)
    df.printSchema()
    df.show()
    # root
    # | -- id: long(nullable=true)
    # | -- score: long(nullable=true)
    # 
    # +---+-----+
    # | id | score |
    # +---+-----+
    # | 1 | 99 |
    # | 2 | 98 |
    # | 3 | 100 |
    # +---+-----+

5、通过文件读取创造

在读取json和parquet文件时不需要设定schema,因为文件已经自带

而读取csv时,还需要使用.option设定 header等参数 

这里说一下parquet文件

parquet:是Spark中常用的一种列式存储文件格式
和Hive中的ORC差不多,他俩都是列存储格式
parquet对比普通的文本文件的区别:

  • parquet 内置schema(列名列类型 是否为空)
  • 存储是以列作为存储格式
  • 存储是序列化存储在文件中的(有压缩属性体积小)

DataFrame代码风格

 DataFrame支持两种风格进行编程,分别是DSL风格SQL风格
DSL语法风格
DSL称之为:领域特定语言
其实就是指DataFrame的特有API
DSL风格意思就是以调用API的方式来处理Data比如: df.where0.limit0
SQL语法风格
SQL风格就是使用SQL语句处理DataFrame的数据比如: spark.sql(“SELECT*FROM xxx)

 DSL

其实就是用其内置的API处理数据,举例:

    df.select('id','subject').show()
    df.where('subject="语文"').show()
    df.select('id','subject').where('subject="语文"').show()
    df.groupBy('subject').count().show()

API其实跟SQL类似,这里不详细说明了,个人感觉不如直接写SQL语句

SQL

DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sgl0来执行SQL语句查询,结果返回一个DataFrame。如果想使用SQL风格的语法,需要将DataFrame注册成表采用如下的方式:

    df.createTempView('tmp') #创建临时视图
    df.createGlobalTempView('global_tmp')#创建全局试图
    # 全局表: 跨SparkSession对象使用在一个程序内的多个SparkSession中均可调用查询前带上前缀:global_tmp
    df.createOrReplaceTempView('repalce_tmp')#创建临时表,如果存在则替换

然后使用spark.sql的形式书写sql代码

    spark.sql('select * from tmp where subject = "语文"').show()
    spark.sql('select id,score from repalce_tmp where score>90').show()
    spark.sql('select subject,max(score) from global_temp.global_tmp group by subject').show()

SparkSQL Shuffle 分区数目

 原因: 在SparkSQL中当Job中产生Shufle时,默认的分区数 spark.sql.shufle,partitions 为200,在实际项目中要合理的设置。
在代码中可以设置:

spark =  SparkSession.builder.appName('lmx').\
master('local[*]').config('spark.sql.shufle,partitions',2).\
getOrCreate()

spark.sqL.shuffle.partitions 参数指的是,在sql计算中,shuffle算子阶段默认的分区数是200

对于集群模式来说,200个默认也算比较合适

如在Local下运行,200个很多,在调度上会带宋限外的损耗,所以在Local下建议修改比较低, 比如2\4\10均可,这个参数和Spark RDD中设置并行度的参数是相互独立的

 DataFrame数据写出

统一API:

下面提供两种方法,分别写出为json和csv

    spark.sql(
        'select user_id,avg(score) avg_score from tmp group by user_id order by avg_score desc'
    ).write.mode('overwrite').format('json').save('data/output/1t')

    spark.sql(
        'select user_id,avg(score) avg_score from tmp group by user_id order by avg_score desc'
    ).write.mode('overwrite').format('csv')\
        .option('header',True)\
        .option('sep',';')\
        .save('data/output/csv')

其他的一些方法: 

SparkSQL中读取数据和写出数据 - 知乎

不过这里似乎不能自己命名导出的数据文件

Spark UDF

无论Hive还是SparKSQL分析处理数据时,往往需要使用函数,SparkSQL模块本身自带很多实现公共功能的函数,在pyspark.sql.functions中SparkSQL与Hive一样支持定义函数:UDF和UDAF,尤其是UDF函数在实际项目中使用最为广泛。回顾Hive中自定义函数有三种类型:
第一种:UDF(User-Defined-Function)函数.
一对一的关系,输入一个值经过函数以后输出一个值;
在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;

第二种:UDAF(User-Defined Aggregation Function)聚合函数

多对一的关系,输入多个值输出一个值,通常与groupBy联合使用;

第三种:UDTF(User-DefinedTable-Generating Functions)函数

一对多的关系,输入一个值输出多个值(一行变为多行),用户自定义生成函数,有点像flatMap;

在SparkSQL中,目前仅仅支持UDF函数和UDAF函数,目前Python仅支持UDF 

UDF有两种定义方式

方式1语法
udf对象=sparksession.udfregister(参数1,参数2,参数3)

参数1:UDF名称,可用于SQL风格

参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型

udf对象:返回值对象,是一个UDF对象,可用于DSL风格
方式2语法

from pyspark.sql import functions as F

udf对象 = F.udf(参数1,参数2)

参数1:被注册成UDF的方法名

参数2:声明UDF的返回值类型

udf对象:返回值对象,是一个UDF对象,可用于DSL风格

举例:

    def double_score(num):
        return 2*num

    udf1 = spark.udf.register('udf_1',double_score,IntegerType())
    # dsl风格
    df.select(udf1(df['score'])).show()
    # sql风格
    df.selectExpr('udf_1(score)').show()
    # sql风格2
    df.createTempView('tmp')
    spark.sql("select udf_1(score) from tmp").show()

    udf2 = F.udf(double_score,IntegerType())
    df.select(udf2(df['score'])).show()

当返回值是数组时,需要定义数组内部数据的数据类型:ArrayType(StringType())

    spark =  SparkSession.builder.appName('lmx').master('local[*]').config('spark.sql.shufle,partitions',2).getOrCreate()
    sc = spark.sparkContext

    rdd=sc.parallelize([['i love you'],['i like you']])
    df = rdd.toDF(['ifo'])
    def func(num):
        return num.split(' ')
    udf = spark.udf.register('udf_sql',func,ArrayType(StringType()))

    # dsl风格
    df.select(udf(df['ifo'])).show()

当返回值是字典时,需要使用StructType(),且定义每个列的名字(需要跟函数返回值的列名一样)和数据类型

    rdd=sc.parallelize([[1],[2],[3],[4],[5]])
    df = rdd.toDF(['ifo'])
    df.show()
    def func(num):
        return {'num':num,'num1':num+10}
    udf = spark.udf.register('udf_sql',func,StructType().\
                             add('num',IntegerType(),nullable=False).\
                             add('num1',IntegerType(),nullable=False))
    df.select(udf(df['ifo'])).show()

Catalyst优化器 

RDD的执行流程为:

代码 ->DAG调度器逻辑任务 ->Task调度器任务分配和管理监控 ->Worker干活

SparkSQL会对写完的代码,执行“自动优化”,既Catalyst优化器,以提升代码运行效率,避免开发者水平影响到代码执行效率。 (RDD代码不会,是因为RDD的数据对象太过复杂,无法被针对性的优化)

加入优化的SparkSQL大致架构为:

1.API 层简单的说就是 Spark 会通过一些 API 接受 SQL 语句

2.收到 SQL 语句以后,将其交给 Catalyst,Catalyst 负责解析 SQL,生成执行计划等

3.Catalyst 的输出应该是 RDD 的执行计划

4.最终交由集群运行 

 Catalyst优化器主要分为四个步骤

1、解析sql,生成AST(抽象语法树)

2、在 AST 中加入元数据信息,做这一步主要是为了一些优化,例如 col=col 这样的条件

以上面的图为例:

  • score.id → id#1#L 为 score.id 生成 id 为1,类型是 Long
  • score.math_score→math_score#2#L为 score.math_score 生成 id 为 2,类型为 Long
  • people.id→id#3#L为 people.id 生成 id 为3,类型为 Long
  • people.age→age#4#L为 people.age 生成 id 为 4,类型为 Long 

3、对已经加入元数据的 AST,输入优化器,进行优化,主要包含两种常见的优化:

谓词下推(Predicate Pushdown)\ 断言下推:将逻辑判断 提前到前面,以减少shuffle阶段的数据量。

以上面的demo举例,可以先进行people.age>10的判断再进行Join等操作。

列值裁剪(Column Pruning):将加载的列进行裁剪,尽量减少被处理数据的宽度

以上面的demo举例,由于只select了score和id,所以开始的时候,可以只保留这两个列,由于parquet是按列存储的,所以很适合这个操作

4、上面的过程生成的 AST 其实最终还没办法直接运行,这个 AST 叫做 逻辑计划,结束后,需要生成 物理计划,从而生成 RDD 来运行

Spark SQL的执行流程

如此,Spark SQL的执行流程为: 

1.提交SparkSQL代码
2.catalyst优化
        a.生成原始AST语法数
        b.标记AST元数据
        c.进行断言下推和列值裁剪 以及其它方面的优化作用在AST上
        d.将最终AST得到,生成执行计划
        e.将执行计划翻译为RDD代码
3.Driver执行环境入口构建(SparkSession)
4.DAG 调度器规划逻辑任务
5.TASK 调度区分配逻辑任务到具体Executor上工作并监控管理任务
6.Worker干活

Spark新特性

自适应查询(SparkSQL)

即:Adaptive Query Execution

由于缺乏或者不准确的数据统计信息(元数据)和对成本的错误估算(执行计划调度)导致生成的初始执行计划不理想

在Spark3.x版本提供Adaptive Query Execution自适应查询技术 通过在”运行时”对查询执行计划进行优化, 允许Planner在运行时执行可选计划,这些可选计划将会基于运行时数据统 计进行动态优化, 从而提高性能,其开启方式为:

set spark.sql.adaptive.enabled = true;

Adaptive Query Execution AQE主要提供了三个自适应优化:

动态合并

即:Dynamically coalescing shuffle partitions 

 可以动态调整shuffle分区的数量。用户可以在开始时设置相对较多的shuffle分区数,AQE会在运行时将相邻的小分区 合并为较大的分区。

动态调整Join策略

即:Dynamically switching join strategies 

此优化可以在一定程度上避免由于缺少统计信息或着错误估计大小(当然也可能两种情况同时存在),而导致执行计 划性能不佳的情况。这种自适应优化可以在运行时sort merge join转换成broadcast hash join,从而进一步提升性能

动态优化倾斜Join 

 skew joins可能导致负载的极端不平衡,并严重降低性能。在AQE从shuffle文件统计信息中检测到任何倾斜后,它可 以将倾斜的分区分割成更小的分区,并将它们与另一侧的相应分区连接起来。这种优化可以并行化倾斜处理,获得更 好的整体性能。

触发条件: 1. 分区大小> spark.sql.adaptive.skewJoin.skewedPartitionFactor (default=10) * "median partition size(中位数分区大小)"

2. 分区大小> spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default = 256MB )
 

动态分区裁剪(SparkSQL)

即:Dynamic Partition Pruning

当优化器在编译时无法识别可跳过的分区时,可以使用"动态分区裁剪",即基于运行时推断的信息来进一步进行分区 裁剪。这在星型模型中很常见,星型模型是由一个或多个并且引用了任意数量的维度表的事实表组成。在这种连接操 作中,我们可以通过识别维度表过滤之后的分区来裁剪从事实表中读取的分区。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1435146.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2月5日作业

1.请编程实现哈希表的创建存储数组(12,24,234,234,23,234,23),输入key查找的值&#xff0c;实现查找功能 #include<stdio.h> #include<string.h> #include<stdlib.h> #include<math.h> typedef int datatype; typedef struct node {datatype data;str…

JenkinsGitLab完成自动化构建部署

关于GitLab安装:GitLab安装-CSDN博客 Docker中安装GitLab:Docker下安装GitLab-CSDN博客 安装JenKins Jenkins官网:Jenkins 中文版:Jenkins 安装时候中文页面的war包下不来 在英文页面 记得装JDK8以上 JenKins使用java写的 运行JenKins需要JDK环境 我这里已经装好了 将下…

VXLAN:虚拟化网络的强大引擎

1.什么是VXLAN VXLAN&#xff08;Virtual eXtensible Local Area Network&#xff0c;虚拟扩展局域网&#xff09;&#xff0c;是由IETF定义的NVO3&#xff08;Network Virtualization over Layer 3&#xff09;标准技术之一&#xff0c;是对传统VLAN协议的一种扩展。VXLAN的特…

华为云GaussDB在新零售云转型上的摸索实验

新零售的“云化” 阿里研究院曾经提到过一个理念&#xff1a; 零售的本质是无时无刻不为消费者提供超出预期的“内容”。 这个理念其实不难理解&#xff0c;想要留住消费者&#xff0c;靠大家都能提供的“内容”显然是行不通的。超出预期&#xff0c;才能吸引消费者的“消费…

5-4、S加减单片机程序【51单片机+L298N步进电机系列教程】

↑↑↑点击上方【目录】&#xff0c;查看本系列全部文章 摘要&#xff1a;本节介绍实现步进电机S曲线运动的代码 一、目标功能 实现步进电机转动总角度720&#xff0c;其中加减速各90 加速段&#xff1a;加速类型&#xff1a;S曲线  加速角度&#xff1a;角度为90  起步速度…

Python:批量url链接保存为PDF

我的数据是先把url链接获取到存入excel中&#xff0c;后续对excel做的处理&#xff0c;各位也可以直接在程序中做处理&#xff0c;下面就是针对excel中的链接做批量处理 excel内容格式如下&#xff08;涉及具体数据做了隐藏&#xff09; 标题文件链接文件日期网页标题1http://…

标准库 STM32+EC11编码器+I2C ssd1306多级菜单例程

标准库 STM32EC11编码器I2C ssd1306多级菜单例程 &#x1f4cc;原创项目来源于&#xff1a;https://github.com/AdamLoong/Embedded_Menu_Simple&#x1f4cd;相关功能演示观看&#xff1a;https://space.bilibili.com/74495335 单片机多级菜单v1.2 &#x1f449;本次采用的是原…

对于模糊查询的SQL,怎么优先返回等值记录

说明&#xff1a;记录一次SQL改进的方法&#xff0c;希望能对大家有启发。 场景 前端项目有一个输入框&#xff0c;根据输入的银行名称&#xff0c;去模糊查询对应的数据库表&#xff0c;返回结果集&#xff0c;显示到下拉列表中。 因为银行名称字段包括了分行名&#xff0c…

【机器学习】机器学习简单入门

&#x1f388;个人主页&#xff1a;甜美的江 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;matplotlib &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进…

Linux基础-磁盘

1.磁盘分区 1.分区有固定大小 2.直接写在这块盘的磁盘分区表中&#xff08;DPT&#xff09;&#xff0c;和上面装什么操作系统没有任何关系 2.每一个磁盘分区都要先有一个磁盘分区类型 GPT&#xff08;首选&#xff09; MBR 3.磁盘专业术语叫做块设备&#xff08;Block Dev…

re:从0开始的CSS学习之路 2. 选择器超长大合集

0. 写在前面 虽然现在还是不到25的青年人&#xff0c;有时仍会感到恐慌&#xff0c;害怕不定的未来&#xff0c;后悔失去的时间&#xff0c;但细细想来&#xff0c;只有自己才知道&#xff0c;再来一次也不会有太多的改变。 CSS的选择器五花八门&#xff0c;而且以后在JavaScr…

【DDD】学习笔记-数据模型与对象模型

在建立数据设计模型时&#xff0c;我们需要注意表设计与类设计之间的差别&#xff0c;这事实上是数据模型与对象模型之间的差别。 数据模型与对象模型 我们首先来分析在设计时对冗余的考虑。前面在讲解数据分析模型时就提及&#xff0c;在确定数据项模型时&#xff0c;需要遵…

Go语言每日一练——链表篇(五)

传送门 牛客面试笔试必刷101题 ----------------合并k个已排序的链表 题目以及解析 题目 解题代码及解析 解析 这一道题与昨天的合并链表题目类似&#xff0c;但是由于有K个且时间复杂度要求控制在O(nlogn)&#xff0c;这里主要有两种解法&#xff1a;一种是依旧使用归并来…

7.0 Zookeeper 客户端基础命令使用

zookeeper 命令用于在 zookeeper 服务上执行操作。 首先执行命令&#xff0c;打开新的 session 会话&#xff0c;进入终端。 $ sh zkCli.sh 下面开始讲解基本常用命令使用&#xff0c;其中 acl 权限内容在后面章节详细阐述。 ls 命令 ls 命令用于查看某个路径下目录列表。…

函数的连续与间断【高数笔记】

【连续】 分类&#xff0c;分几个&#xff1f;每类特点&#xff1f; 连续条件&#xff0c;是同时满足还是只需其一&#xff1f; 【间断】 分类&#xff0c;分几个大类&#xff0c;又分几个小类&#xff1f;每类特点&#xff1f; 间断条件&#xff0c;是同时满足还是只需其一&am…

PAT-Apat甲级题1008(python和c++实现)

PTA | 1008 Elevator 1008 Elevator 作者 CHEN, Yue 单位 浙江大学 The highest building in our city has only one elevator. A request list is made up with N positive numbers. The numbers denote at which floors the elevator will stop, in specified order. It …

【C/C++ 17】继承

目录 一、继承的概念 二、基类和派生类对象赋值转换 三、继承的作用域 四、派生类的默认成员函数 五、继承与友元 六、继承与静态成员变量 七、菱形继承与虚拟继承 一、继承的概念 继承是指一个类可以通过继承获得另一个类的属性和方法&#xff0c;扩展自己的功能&…

二、SSM 整合配置实战

本章概要 依赖整合和添加控制层配置编写(SpringMVC 整合)业务配置编写(AOP/TX 整合)持久层配置编写(MyBatis 整合)容器初始化配置类整合测试 2.1 依赖整合和添加 数据库准备 数据库脚本 CREATE DATABASE mybatis-example;USE mybatis-example;CREATE TABLE t_emp(emp_id INT…

MQ,RabbitMQ,SpringAMQP的原理与实操

MQ 同步通信 异步通信 事件驱动优势&#xff1a; 服务解耦 性能提升&#xff0c;吞吐量提高 服务没有强依赖&#xff0c;不担心级联失败问题 流量消峰 ​ 小结: 大多情况对时效性要求较高&#xff0c;所有大多数时间用同步。而如果不需要对方的结果&#xff0c;且吞吐…

2024美赛数学建模E题:房产保险的可持续性,思路全解,代码模型分析

2024美赛数学建模E题思路全解&#xff0c;代码模型分析,完整详细内容见文末名片 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 保险公司应该在承保保单时考虑多种因素&#xff0c;以确保公司的长期健康和稳定性。以下是一个可能的模式&#xff0c;以确…