Saprk SQL基础知识

news2024/11/14 20:11:31

一.Spark SQL基本介绍

1.什么是Spark SQL

Spark SQL是Spark多种组件中其中一个,主要是用于处理大规模的[结构化数据]

Spark SQL的特点:

1).融合性:既可以使用SQL语句,也可以编写代码,同时支持两者混合使用.

2).统一的数据访问:Spark SQL用统一的API对接不同的数据源

3).Hive的兼容性:Spark SQL可以和Hive进行整合,合并后将执行引擎换成Spark,核心是基于hive的metastore来处理.

4).标准化连接:Spark SQL支持JDBC/ODBC连接

2.Spark SQL和Hive的异同点

相同点:

①都是分布式SQL计算引擎

②都可以处理大规模结构化数据

③都可以建立在Yarn集群上运行

不同点:

①Spark SQL的底层是RDD,Hive SQL的底层是MapReduce

②Spark SQL既可以编写SQL语句,又可以编写代码,而Hive SQL只可以编写SQL语句

③Spark SQL没有元数据管理服务,而Hive SQL有metastore管理元数据服务

④Spark SQL是基于内存运行的,Hive SQL是基于磁盘运行的

3.Spark SQL的数据结构对比

说明:

pandas的DataFrame:二维表 处理单机结构数据

Spark Core:处理任何的数据结构,处理大规模的分布式数据

Spark SQL:二维表,处理大规模的分布式结构数据 

 

RDD:存储直接就是对象,比如在图中,存储就是一个Person的对象,但是里面是什么数据内容,不太清楚.

DataFrame:将Person中各个字段数据,进行结构化存储,形成一个DataFrame,可以直接看到数据

Dataset:将Person对象中数据都按照结构化的方式存储好,同时保留对象的类型,从而知道来源于一个Person对象

由于Python不支持泛型,所以无法使用Dataset类型,客户端仅支持DataFrame类型 

二.DataFrame详解

1.DataFrame基本介绍

 

DataFrame表示的是一个二维的表,二维表,必然存在行,列等表结构描述信息.

表结构描述信息(元数据Schema) :StructType对象

字段:StructField对象,可以描述字段名称,字段数据类型,是否可以为空

行:Row对象

列:Column对象,包含字段名称和字段值

在一个StructType对象下,由多个StructField组成,构建成一个完整的元数据信息

2.DataFrame的构建方式

2.1 通过RDD得到一个DataFrame

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructField

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .appName('rdd_2_dataframe')\
        .master('local[*]')\
        .getOrCreate()

    # 通过SparkSession得到SparkContext
    sc = spark.sparkContext

    # 2- 数据输入
    # 2.1- 创建一个RDD
    init_rdd = sc.parallelize(["1,李白,20","2,安其拉,18"])

    # 2.2- 将RDD的数据结构转换成二维结构
    new_rdd = init_rdd.map(lambda line: (
            int(line.split(",")[0]),
            line.split(",")[1],
            int(line.split(",")[2])
        )
    )

    # 将RDD转成DataFrame:方式一
    # schema方式一
    schema = StructType()\
        .add('id',IntegerType(),False)\
        .add('name',StringType(),False)\
        .add('age',IntegerType(),False)


    # schema方式二
    schema = StructType([
        StructField('id',IntegerType(),False),
        StructField('name',StringType(),False),
        StructField('age',IntegerType(),False)
    ])

    # schema方式三
    schema = "id:int,name:string,age:int"

    # schema方式四
    schema = ["id","name","age"]

    init_df = spark.createDataFrame(
        data=new_rdd,
        schema=schema
    )

    # 将RDD转成DataFrame:方式二
    """
        toDF:中的schema既可以传List,也可以传字符串形式的schema信息
    """
    # init_df = new_rdd.toDF(schema=["id","name","age"])
    init_df = new_rdd.toDF(schema="id:int,name:string,age:int")

    # 3- 数据处理
    # 4- 数据输出
    init_df.show()
    init_df.printSchema()

    # 5- 释放资源
    sc.stop()
    spark.stop()

场景:RDD可以存储任意结构的数据;而DataFrame只能处理二维表数据。在使用Spark处理数据的初期,可能输入进来的数据是半结构化或者是非结构化的数据,那么我可以先通过RDD对数据进行ETL处理成结构化数据,再使用开发效率高的SparkSQL来对后续数据进行处理分析。

2.2 内部初始化数据得到DataFrame

from pyspark import SparkConf, SparkContext
import os

# 绑定指定的Python解释器
from pyspark.sql import SparkSession

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("内部初始化数据得到DataFrame。类似SparkCore中的parallelize")

    # 1- 创建SparkSession顶级对象
    spark = SparkSession.builder\
        .appName('inner_create_dataframe')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    """
        通过createDataFrame创建DataFrame,schema数据类型可以是:DataType、字符串、List
            字符串:格式要求
                格式一 字段1 字段类型,字段2 字段类型
                格式二(推荐) 字段1:字段类型,字段2:字段类型
                
            List:格式要求
                ["字段1","字段2"]
    """
    # 内部初始化数据得到DataFrame
    init_df = spark.createDataFrame(
        data=[(1,'张三',18),(2,'李四',30)],
        schema="id:int,name:string,age:int"
    )

    # init_df = spark.createDataFrame(
    #     data=[(1, '张三', 18), (2, '李四', 30)],
    #     schema="id int,name string,age int"
    # )

    # init_df = spark.createDataFrame(
    #     data=[(1, '张三', 18), (2, '李四', 30)],
    #     schema=["id","name","age"]
    # )

    # init_df = spark.createDataFrame(
    #     data=[(1, '张三', 18), (2, '李四', 30)],
    #     schema=["id:int", "name:string", "age:int"]
    # )

    # 3- 数据处理
    # 4- 数据输出
    # 输出dataframe的数据内容
    init_df.show()

    # 输出dataframe的schema信息
    init_df.printSchema()

    # 5- 释放资源
    spark.stop()

场景:一般用在开发和测试中,因为只能处理少量的数据

Schema总结

通过createDataFrame创建DataFrame,schema数据类型可以是:DataType,字符串,List

1:字符串

格式一 字段1 字段类型,字段2 字段类型

格式二 字段1:字段类型,字段2:字段类型

2:List

["字段1","字段2"]

3:DataType

格式一 schema = StructType().add('id',IntegerType(),False)

.add('id',IntegerType(),False).add('id',IntegerType(),False)

格式二 schema = StructType([StructField('id',IntegerType,False),

StructField('id',IntegerType,False),

StructField('id',IntegerType,False)])

 2.3 读取外部文件

复杂API

统一API格式:

sparksession.read

.format('text|csv|json|parquet|orc|avro|jdbc|...')

.option('k','v')

.schema(StructType | String)

.load('加载数据路径') #读取外部文件的路径,支持HDFS也支持本地

简写API

 请注意: 以上所有的外部读取方式,都有简单的写法。spark内置了一些常用的读取方案的简写

格式:spark.read.读取方式()

例如:

df = spark.read.csv(

path ='file:///export/data/_03_spark_sql/data/stu.txt',header=True,sep=' ',inferSchema=True,encoding='utf-8')

2.3.1 Text方式读取

 

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("text方式读取文件")

    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .appName('text_demo')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    """
        load:支持读取HDFS文件系统和本地文件系统
            HDFS文件系统:hdfs://node1:8020/文件路径
            本地文件系统:file:///文件路径
            
        text方式读取文件总结:
            1- 不管文件中内容是什么样的,text会将所有内容全部放到一个列中处理
            2- 默认生成的列名叫value,数据类型string
            3- 我们只能够在schema中修改字段value的名称,其他任何内容不能修改
    """
    init_df = spark.read\
        .format('text')\
        .schema("my_field string")\
        .load('file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt')

    # 3- 数据处理
    # 4- 数据输出
    init_df.show()
    init_df.printSchema()

    # 5- 释放资源
    spark.stop()
 

text方式读取文件总结:

1-不管文件中内容是什么样的,text会将所有内容全部放到一个列中处理

2-默认生成的列名叫value,数据类型string

3-我们只能够在schema中修改字段value的名称,其他任何内容不能修改 

2.3.2 CSV方式读取

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    print("csv方式读取文件")

    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .appName('csv_demo')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    """
        csv格式读取外部文件总结:
            1- 复杂API和简写API都必须掌握
            2- 相关参数作用说明:
                2.1- path:指定读取的文件路径。支持HDFS和本地文件路径
                2.2- schema:手动指定元数据信息
                2.3- sep:指定字段间的分隔符
                2.4- encoding:指定文件的编码方式
                2.5- header:指定文件中的第一行是否是字段名称
                2.6- inferSchema:根据数据内容自动推断数据类型。但是,推断结果可能不精确
    """
    # 复杂API写法
    init_df = spark.read\
        .format('csv')\
        .schema("id int,name string,address string,sex string,age int")\
        .option("sep"," ")\
        .option("encoding","UTF-8")\
        .option("header","True")\
        .load('file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt')

    # 简写API写法
    # init_df = spark.read.csv(
    #     path='file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt',
    #     schema="id int,name string,address string,sex string,age int",
    #     sep=' ',
    #     encoding='UTF-8',
    #     header="True"
    # )

    # init_df = spark.read.csv(
    #     path='file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt',
    #     sep=' ',
    #     encoding='UTF-8',
    #     header="True",
    #     inferSchema=True
    # )

    # 3- 数据处理
    # 4- 数据输出
    init_df.show()
    init_df.printSchema()

    # 5- 释放资源
    spark.stop()

csv格式读取外部文件总结:

1-相关参数说明:

1.1 path:文件路径,HDFS和本地

1.2 schema:手动指定元数据信息

1.3 sep:指定字段间的分隔符

1.4 encoding:指定文件的编码方式

1.5 header:指定文件中的第一行是否是字段名称

1.6 inferSchema:根据数据内容自动推断数据类型,但是推断结果可能不精确 

 2.3.3 JSON方式读取

json的数据内容

{'id': 1,'name': '张三','age': 20}
{'id': 2,'name': '李四','age': 23,'address': '北京'}
{'id': 3,'name': '王五','age': 25}
{'id': 4,'name': '赵六','age': 29}

代码实现:

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .appName('json_demo')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    """
        json读取数据总结:
            1- 需要手动指定schema信息。如果手动指定的时候,字段名称与json中的key名称不一致,会解析不成功,以null值填充
            2- csv/json中schema的结构,如果是字符串类型,那么字段名称和字段数据类型间,只能以空格分隔
    """
    # init_df = spark.read.json(
    #     path='file:///export/data/gz16_pyspark/02_spark_sql/data/data.txt',
    #     schema="id2 int,name string,age int,address string",
    #     encoding='UTF-8'
    # )

    # init_df = spark.read.json(
    #     path='file:///export/data/gz16_pyspark/02_spark_sql/data/data.txt',
    #     schema="id:int,name:string,age:int,address:string",
    #     encoding='UTF-8'
    # )

    init_df = spark.read.json(
        path='file:///export/data/gz16_pyspark/02_spark_sql/data/data.txt',
        schema="id int,name string,age int,address string",
        encoding='UTF-8'
    )

    # 3- 数据输出
    init_df.show()
    init_df.printSchema()


    # 4- 释放资源
    spark.stop()

 json读取数据总结:

1-需要手动指定schema信息,如果手动指定的时候,字段名称与json中的key名称不一致,会解析不成功,以null值填充

2-csv/json中schema的结构,如果是字符串类型,那么字段名称和字段数据类型间,只能以空格分隔

3.DataFrame的相关API

操作DataFrame一般有两种操作方案:一种为DSL方式,另一种为SQL方式

 

SQL方式: 通过编写SQL语句完成统计分析操作
DSL方式: 特定领域语言,使用DataFrame特有的API完成计算操作,也就是代码形式

从使用角度来说: SQL可能更加的方便一些,当适应了DSL写法后,你会发现DSL要比SQL更好用
从Spark角度来说: 更推荐使用DSL方案,此种方案更加利于Spark底层的优化处理

3.1 SQL相关的API

创建一个视图/表

 

df.createTempView('视图名称'): 创建一个临时的视图(表名)
df.createOrReplaceTempView('视图名称'): 创建一个临时的视图(表名),如果视图存在,直接替换
临时视图,仅能在当前这个Spark Session的会话中使用


df.createGlobalTempView('视图名称'): 创建一个全局视图,运行在一个Spark应用中多个spark会话中都可以使用。在使用的时候必须通过 global_temp.视图名称 方式才可以加载到。较少使用

执行SQL语句

spark.sql('书写SQL') 

3.2 DSL相关的API

show():用于展示DF中数据,默认仅展示前20行

参数1:设置默认展示多少行,默认为20

参数2:是否为阶段列,默认仅展示前20个字符数据,如果过长,不展示

printSchema():用于打印当前这个DF的表结构信息

select():类似于SQL中select, SQL中select后面可以写什么, 这样同样也一样

  • filter()和 where():用于对数据进行过滤操作, 一般在spark SQL中主要使用where

  • groupBy():用于执行分组操作

  • orderBy():用于执行排序操作

DSL主要支持以下几种传递的方式:  str | Column对象 | 列表
    str格式:  '字段'
    Column对象:  
        DataFrame含有的字段  df['字段']
        执行过程新产生:  F.col('字段')
    列表: 
        ['字段1','字段2'...]
        [df['字段1'],df['字段2']]

 为了能够支持在编写Spark SQL的DSL时候,在DSL中使用SQL函数,专门提供一个SQL的函数库。直接加载使用即可

导入这个函数库: import pyspark.sql.functions as F
通过F调用对应的函数即可。SparkSQL中所支持的函数,都可以通过以下地址查询到: 
https://spark.apache.org/docs/3.1.2/api/sql/index.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1365288.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vueRouter 配合 keep-alive 不生效的问题

文章目录 问题说明案例复现demo 结构问题复现和解决 其实这个不生效的问题根本也不算一个问题,犯的错和写错单词差不多,但是也是一时上头没发现,所以记录一下,如果遇到同样的问题,也希望可以帮助你早点看到这个哭笑不得…

WWDG---窗口看门狗

一.简介 窗口看门狗跟独立看门狗一样,也是一个递减计数器不断的往下递减计数,必须在一个窗口的上限值(用户定义)和下限值(0X40,固定不能变)之间喂狗不会复位,在上限值之前和下限值之…

etcd储存安装

目录 etcd介绍: etcd工作原理 选举 复制日志 安全性 etcd工作场景 服务发现 etcd基本术语 etcd安装(centos) 设置:etcd后台运行 etcd 是云原生架构中重要的基础组件,由 CNCF 孵化托管。etcd 在微服务和 Kubernates 集群中不仅可以作为服务注册…

【hcie-cloud】【18】华为云Stack灾备服务介绍【容灾解决方案介绍、灾备方案架构介绍、管理组件灾备方案介绍、高阶云服务容灾简介、缩略词】【下】

文章目录 灾备方案概述、备份解决方案介绍容灾解决方案介绍华为云容灾解决方案概览云容灾服务云硬盘高可用服务 (VHA)VHA组网结构VHA逻辑组网架构VHA管理组件介绍VHA服务实现原理云服务器高可用服务(CSHA)CSHA物理组网架构CSHA逻辑组网架构CSHA服务组件间…

嵌入式培训机构四个月实训课程笔记(完整版)-Linux系统编程第五天-Linux消息共享内存练习题(物联技术666)

更多配套资料CSDN地址:点赞+关注,功德无量。更多配套资料,欢迎私信。 物联技术666_嵌入式C语言开发,嵌入式硬件,嵌入式培训笔记-CSDN博客物联技术666擅长嵌入式C语言开发,嵌入式硬件,嵌入式培训笔记,等方面的知识,物联技术666关注机器学习,arm开发,物联网,嵌入式硬件,单片机…

2024--Django平台开发-Web框架和Django基础(二)---Mysql多版本共存(Mac系统)

MySQL多版本共存(Mac系统) 想要在Mac系统上同时安装【MySQL5.7 】【MySQL8.0】版本,需要进行如下的操作和配置。 想要同时安装两个版本可以采取如下方案: 方案1:【讲解】 MySQL57,用安装包进行安装。 MyS…

像专家一样使用TypeScript映射类型

掌握TypeScript的映射类型,了解TypeScript内置的实用类型是如何工作的。 您是否使用过Partial、Required、Readonly和Pick实用程序类型? 你知道他们内部是怎么运作的吗? 如果您想彻底掌握它们并创建自己的实用程序类型,那么不要错过本文所涵盖的内容。…

2、Excel:基础概念、表格结构与常见函数

数据来源:八月成交数据 数据初探 业务背景 数据来源行业:金融行业(根据应收利息和逾期金额字段来判断) 可以猜测: 业务主体:某互联网金融公司(类似支付宝)也业务模式:给…

leetcode动态规划问题总结 Python

目录 一、基础理论 二、例题 1. 青蛙跳台阶 2. 解密数字 3. 最长不含重复字符的子字符串 4. 连续子数组的最大和 5. 最长递增子序列 6. 最长回文字符串 7. 机器人路径条数 8. 礼物的最大价值 一、基础理论 动态规划其实是一种空间换时间的基于历史数据的递推算法&…

Java异常机制:从混乱到控制的错误管理艺术

👑专栏内容:Java⛪个人主页:子夜的星的主页💕座右铭:前路未远,步履不停 目录 一、异常的体系结构1、异常的体系结构2、异常的分类 二、异常的处理1、异常的抛出2、异常的捕获2.1、异常声明throws2.2、try-c…

C#中List<T>底层原理剖析

C#中List底层原理剖析 1. 基础用法2. List的Capacity与Count:3.List的底层原理3.1. 构造3.2 Add()接口3.3 Remove()接口3.4 Inster()接口3.5 Clear()接口3.6 Contains()接口3.7 ToArray()接口3.8 Find()接口3.8 Sort()接口 4. 总结5. 参考 1. 基础用法 list.Max() …

2024龙年艺术字矢量Ai设计文件60套

2024新年将至,设计师们早已开始为龙年海报、推文的制作摩拳擦掌。该合集不仅内容丰富多样,作为矢量文件资源,也能够让设计者更为轻松地编辑与创作。 合集内另附200多张电脑壁纸。 文件总大小368MB 链接:https://pan.quark.cn/s/0caab4cf065…

Google Earth Engine谷歌地球引擎GEE批量计算一年中每个指定天数范围内遥感影像平均值的方法

本文介绍在谷歌地球引擎(Google Earth Engine,GEE)中,计算长时间序列遥感影像数据在多年中,在每一个指定天数的时间范围内的平均值的方法。 本文是谷歌地球引擎(Google Earth Engine,GEE&#x…

MySQL BufferPool精讲

缓存的重要性 我们知道,对于使用InnoDB作为存储引擎的表来说,不管是用于存储用户数据的索引(包括聚簇索引和二级索引),还是各种系统数据,都是以页的形式存放在表空间中的,而所谓的表空间只不过…

杰发科技AC7840——CAN通信简介(2)

1.时钟频率 2.位时间 3.采样点 4.消息缓冲区 和ST、NXP的邮箱类似,AutoChips用了缓冲区的概念。 5.接收缓冲区 屏蔽掉demo程序的发送,只看接收情况 在回调中接收数据 先判断是不是进了接收中断 接收数据的处理函数 所有buff数据放到Info buff的内容 BUF…

环境中碳循环

含碳的物质有CO2、CO、CH4、糖类、脂肪和蛋白质等,碳循环以CO2为中心,CO2被植物、藻类利用进行光合作用,合成植物性碳;动物摄食植物就将植物性碳转化为动物性碳;动物和人呼吸放出CO2,有机碳化合物被厌氧微生…

AArch64 memory management学习(一)

提示 该博客主要为个人学习,通过阅读官网手册整理而来(个人觉得阅读官网的英文文档非常有助于理解各个IP特性)。若有不对之处请参考参考文档,以官网参考文档为准。AArch64 memory management学习一共分为两章,这是第一…

Github 2024-01-08开源项目周报 Top14

根据Github Trendings的统计,本周(2024-01-08统计)共有14个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目5TypeScript项目3C项目2Dart项目1QML项目1Go项目1Shell项目1Rust项目1JavaScript项目1C#项目1 免费…

【网络安全】PKI加密

1、PKI概述 名称:Public Key Infrastruction 公钥基础设施 作用:通过加密技术和数字签名保证信息的安全 组成:公钥机密技术、数字证书、CA、RA 2、信息安全三要素 机密性 完整性 身份验证/操作的不可否认性 3、哪些IT领域用到PKI&…

【Golang】go编程语言适合哪些项目开发?

文章目录 **前言****Go 编程语言适合哪些项目开发?****1. 网络编程项目:****2. 大数据处理项目:****3. 云计算项目:****4. Web开发项目:****5. 嵌入式系统项目:****6.API开发**:**1. 并发性能:*…