Spark SQL概述与基本操作

news2024/11/18 9:46:36

目录

一、Spark SQL概述

        (1)概念

        (2)特点

        (3)Spark SQL与Hive异同

        (4)Spark的数据抽象

二、Spark Session对象执行环境构建

          (1)Spark Session对象

        (2)代码演示

三、DataFrame创建

        (1)DataFrame组成

        (2)DataFrame创建方式(转换)

        (3)DataFrame创建方式(标准API读取)

四、DataFrame编程

        (1)DSL语法风格

        (2)SQL语法风格

五、Spark SQL——wordcount代码示例

        (1)pyspark.sql.functions包

        (2)代码示例


一、Spark SQL概述

        (1)概念

        Spark SQL是Apache Spark的一个模块,它用于处理结构化和半结构化的数据。Spark SQL允许用户使用SQL查询和操作数据,这种操作可以直接在Spark的DataFrame/Dataset API中进行。此外,Spark SQL还支持多种语言,包括Scala、Java、Python和R。

        (2)特点

        ①融合性:SQL可以无缝集成在代码中,随时用SQL处理数据。

        ②统一数据访问:一套标准API可读写不同的数据源。

        ③Hive兼容:可以使用Spark SQL直接计算生成Hive数据表。

        ④标准化连接:支持标准化JDBC \ ODBC连接,方便和各种数据库进行数据交互。

        (3)Spark SQL与Hive异同

        共同点:Hive和Spark均是:“分布式SQL计算引擎”,均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。

        (4)Spark的数据抽象

        Spark SQL的数据抽象:

        Data Frame与RDD:

二、Spark Session对象执行环境构建

          (1)Spark Session对象

        在RDD阶段,程序的执行入口对象是:SparkContext。在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。

        Spark Session对象作用:

        ①用于SparkSQL编程作为入口对象。

        ②用于SparkCore编程,可以通过Spark Session对象中获取到Spark Context。

        (2)代码演示
# cording:utf8

# Spark Session对象的导包,对象是来自于pyspark.sql包中
from pyspark.sql import SparkSession
if __name__ == '__main__':
    # 构建Spark Session执行环境入口对象
    spark = SparkSession.builder.\
            appName('test').\
            master('local[*]').\
            getOrCreate()
    # 通过Spark Session对象 获取SparkContext对象
    sc = spark.sparkContext

    # SparkSQL测试
    df = spark.read.csv('../input/stu_score.txt', sep=',', header=False)
    df2 = df.toDF('id', 'name', 'score')
    # 打印表结构
    # df2.printSchema()
    # 打印数据内容
    # df2.show()

    df2.createTempView('score')
    # SQL风格
    spark.sql("""SELECT * FROM score WHERE name='语文' LIMIT 5
    """).show()

    # DSL 风格
    df2.where("name='语文'").limit(5).show()

三、DataFrame创建

        (1)DataFrame组成

        DataFrame是一个二维表结构,表格结构的组成:

                ①行

                ②列

                ③表结构描述

        比如,在MySQL中的一个表:

                ①有许多列组成

                ②数据也被分为多个列

                ③表也有表结构信息(列、列名、列类型、列约束等)

        基于这个前提下,DataFrame的组成如下:

                在结构层面:

                        ①StructType对象描述整个DataFrame的表结构

                        ②StructField对象描述一个列的信息

                在数据层面:

                        ①Row对象记录一行数据

                        ②Column对象记录一列数据并包含列的信息

        (2)DataFrame创建方式(转换)

        ①基于RDD方式

# cording:utf8

from pyspark.sql import SparkSession

if __name__ == '__main__':
    # 构建执行环境对象Spark Session
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()

    # 构建SparkContext

    sc = spark.sparkContext

    # 基于RDD转换为DataFrame
    rdd = sc.textFile('../input/people.txt').\
        map(lambda x: x.split(',')).\
        map(lambda x: (x[0], int(x[1])))

    # 构建DataFrame对象
    # 参数1,被转换的RDD
    # 参数2,指定列名,通过list的形式指定,按照顺序依次提供字符串名称即可
    df = spark.createDataFrame(rdd,schema=['name', 'age'])

    # 打印Data Frame的表结构
    df.printSchema()

    # 打印df中的数据
    # 参数1,表示 展示出多少条数据,默认不传的话是20
    # 参数2,表示是否对列进行截断,如果列的数据长度超过20个字符串长度,厚旬欸日不显示,以....代替
    # 如果给False 表示不截断全部显示,默认是True
    df.show(20,False)

    # 将DF对象转换成临时视图表,可供sql语句查询
    df.createOrReplaceTempView('people')
    spark.sql('SELECT * FROM people WHERE age < 30').show()

        ②通过StructType对象来定义DataFrame的 ‘ 表结构 ’ 转换RDD

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
if __name__ == '__main__':
    # 构建执行环境对象Spark Session
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()

    # 构建SparkContext

    sc = spark.sparkContext

    # 基于RDD转换为DataFrame
    rdd = sc.textFile('../input/people.txt').\
        map(lambda x: x.split(',')).\
        map(lambda x: (x[0], int(x[1])))

    # 构建表结构的描述对象:StructType 对象
    # 参数1,列名
    # 参数2,列数据类型
    # 参数3,是否允许为空
    schema = StructType().add('name', StringType(), nullable=True).\
        add('age', IntegerType(), nullable=False)

    # 构建DataFrame对象
    # 参数1,被转换的RDD
    # 参数2,指定列名,通过list的形式指定,按照顺序依次提供字符串名称即可
    df = spark.createDataFrame(rdd, schema=schema)

    df.printSchema()
    df.show()

        ③通过RDD的toDF方法创建RDD

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
if __name__ == '__main__':
    # 构建执行环境对象Spark Session
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()

    # 构建SparkContext

    sc = spark.sparkContext

    # 基于RDD转换为DataFrame
    rdd = sc.textFile('../input/people.txt').\
        map(lambda x: x.split(',')).\
        map(lambda x: (x[0], int(x[1])))

    # toDF构建DataFrame
    # 第一种构建方式,只能设置列名,列类型靠RDD推断,默认允许为空
    df1 = rdd.toDF(['name', 'name'])
    df1.printSchema()
    df1.show()
    # toDF方式2:通过StructType来构造
    # 设置全面,能设置列名、列数据类型、是否为空
    # 构建表结构的描述对象:StructType 对象
    # 参数1,列名
    # 参数2,列数据类型
    # 参数3,是否允许为空
    schema = StructType().add('name', StringType(), nullable=True).\
        add('age', IntegerType(), nullable=False)

    df2 = rdd.toDF(schema=schema)
    df2.printSchema()
    df2.show()




        ④基于Pandas的DataFrame创建DataFrame

# cording:utf8

from pyspark.sql import SparkSession
import pandas as pd

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # 基于pandas的DataFrame构建SparkSQL的DataFrame对象
    pdf = pd.DataFrame(
        {
            'id': [1, 2, 3],
            'name': ['张大仙', '王晓晓', '吕不韦'],
            'age': [1, 2, 3]
        }
    )

    df = spark.createDataFrame(pdf)

    df.printSchema()
    df.show()

        (3)DataFrame创建方式(标准API读取)

        统一API示例代码:

        ①读取本地text文件

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # 构建StructType,text数据源,
    # text读取数据的特点是:将一整行只作为一个列读取,默认列名是value 类型是String
    schema = StructType().add('data', StringType(),nullable=True)
    df = spark.read.format('text').\
        schema(schema=schema).\
        load('../input/people.txt')

    df.printSchema()
    df.show()

        ②读取json文件

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # json文件类型自带Schema信息
    df = spark.read.format('json').load('../input/people.json')
    df.printSchema()
    df.show()

        ③读取csv文件

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # 读取csv文件
    df = spark.read.format('csv').\
        option('sep', ';').\
        option('header', True).\
        option('encoding', 'utf-8').\
        schema('name STRING, age INT, job STRING').\
        load('../input/people.csv')

    df.printSchema()
    df.show()

        ④读取parquet文件

        parquet文件:是Spark中常用的一种列式存储文件格式,和Hive中的ORC差不多,他们都是列存储格式。

        parquet对比普通的文本文件的区别:

                ①parquet内置schema(列名、列类型、是否为空)

                ②存储是以列作为存储格式

                ③存储是序列化存储在文件中的(有压缩属性体积小)

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    # 读取parquet文件
    df = spark.read.format('parquet').load('../input/users.parquet')

    df.printSchema()
    df.show()

四、DataFrame编程

        (1)DSL语法风格
# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    df = spark.read.format('csv').\
        schema('id INT, subject STRING, score INT').\
        load('../input/stu_score.txt')

    # Column对象的获取
    id_column = df['id']
    subject_column = df['subject']

    # DLS风格
    df.select(['id', 'subject']).show()
    df.select('id', 'subject').show()
    df.select(id_column, subject_column).show()

    # filter API
    df.filter('score < 99').show()
    df.filter(df['score'] < 99).show()

    # where API
    df.where('score < 99').show()
    df.where(df['score'] < 99).show()

    # group By API
    # df.groupBy API的返回值为 GroupedData类型1
    # GroupedData对象不是DataFrame
    # 它是一个 有分组关系的数据结构,有一些API供我们对分组做聚合
    # SQL:group by 后接上聚合: sum avg count min max
    # GroupedData 类似于SQL分组后的数据结构,同样由上述5中聚合方法
    # GroupedData 调用聚合方法后,返回值依旧是DayaFrame
    # GroupedData 只是一个中转的对象,最终还是会获得DataFrame的结果
    df.groupBy('subject').count().show()
    df.groupBy(df['subject']).count().show()
        (2)SQL语法风格

        DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql()来执行SQL语句查询,结果返回一个DataFrame。
        如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:        

df.createTempView( "score")            #注册一个临时视图(表)
df.create0rReplaceTempView("score")    #注册一个临时表,如果存在进行替换。
df.createGlobalTempView( "score")      #注册一个全局表

        全局表:跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询前带上前缀:
        global_temp.
        临时表:只在当前SparkSession中可用

# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

if __name__ == '__main__':
    # 构建执行环境入口对象SparkSession
    spark = SparkSession.builder.\
        appName('test').\
        master('local[*]').\
        getOrCreate()
    sc = spark.sparkContext

    df = spark.read.format('csv').\
        schema('id INT, subject STRING, score INT').\
        load('../input/stu_score.txt')

    # 注册成临时表
    df.createTempView('score')              # 注册临时视图(表)
    df.createOrReplaceTempView('score_2')   # 注册或者替换为临时视图
    df.createGlobalTempView('score_3')      # 注册全局临时视图 全局临时视图使用的时候 需要在前面带上global_temp. 前缀

    # 可以通过SparkSession对象的sql api来完成sql语句的执行
    spark.sql("SELECT subject, COUNT(*) AS cnt FROM score GROUP BY subject").show()
    spark.sql("SELECT subject, COUNT(*) AS cnt FROM score_2 GROUP BY subject").show()
    spark.sql("SELECT subject, COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject").show()

五、Spark SQL——wordcount代码示例

        (1)pyspark.sql.functions包

        这个包里面提供了一系列的计算函数供SparkSQL使用

        导包:from pyspark.sql import functions as F

        这些函数返回值多数都是Column对象。

        (2)代码示例
# cording:utf8

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

if __name__ == '__main__':
    spark = SparkSession.builder.appName('wordcount').master('local[*]').getOrCreate()
    sc = spark.sparkContext

    # TODO 1:SQL风格进行处理
    rdd = sc.textFile('../input/words.txt').\
        flatMap(lambda x: x.split(' ')).\
        map(lambda x: [x])

    df = rdd.toDF(['word'])

    # 注册DF为表格
    df.createTempView('words')

    spark.sql('SELECT word,COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC').show()

    # TODO 2:DSL 风格处理
    df = spark.read.format('text').load('../input/words.txt')

    # withColumn 方法
    # 方法功能:对已存在的列进行操作,返回一个新的列,如果名字和老列相同,那么替换,否则作为新列存在
    df2 = df.withColumn('value', F.explode(F.split(df['value'], ' ')))
    df2.groupBy('value').\
        count().\
        withColumnRenamed('value', 'word').\
        withColumnRenamed('count', 'cnt').\
        orderBy('cnt', ascending=False).show()

    # withColumnRenamed() 对列名进行重命名
    # orderBy() 排序

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1124958.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python-字符串(切片操作与内建函数)

目录 一、字符串介绍 1、什么是字符串 2、转义字符 二、字符串的输入和输出 1、字符串输出 2、字符串输入 三、访问字符串中的值 1、字符串的存储方式 2、使用切片截取字符串 四、字符串内建函数 1、find 2、index 3、count 4、replace 5、split 6、capitalize …

Centos 7 Zabbix配置安装

前言 Zabbix是一款开源的网络监控和管理软件&#xff0c;具有高度的可扩展性和灵活性。它可以监控各种网络设备、服务器、虚拟机以及应用程序等&#xff0c;收集并分析性能指标&#xff0c;并发送警报和报告。Zabbix具有以下特点&#xff1a; 1. 支持多种监控方式&#xff1a;可…

Docker容器引擎的介绍

目录 Docker概述 容器受欢迎的原因 Docker与虚拟机的区别 Docker三个核心概念 Docker的安装 1、环境准备 2、安装依赖包 3、设置阿里云镜像源 4、安装 Docker-CE并设置为开机自动启动 Docker命令 1、查看 docker 版本信息 2、docker 信息查看 3、Docker 镜像操作命…

GoLong的学习之路(五)语法之数组

书接上回&#xff0c;上回书说到&#xff0c;循环语句&#xff0c;在go中循环语句的少了whlie这个关键词&#xff0c;但是与之for可以改这个改这个特点。并且在终止关键词中&#xff0c;又有标签可以方便&#xff0c;停止。这次说数组 文章目录 Array(数组)数组的初始化方法一方…

数据结构堆详解

[TOC]堆详解 一&#xff0c;堆 1.1堆的概念 堆的性质&#xff1a; 堆中某个节点的值总是不大于或不小于其父节点的值&#xff1b; 堆总是一棵完全二叉树。 1.2堆的存储模式 我们前面的文章提到过&#xff0c;二叉树的两种存储模式&#xff0c;一个是顺序存储&#xff0c;一…

网络第一颗

✍ 如何理解局域网和广域网&#xff1f; ✍ 路由器和交换机是怎样工作的&#xff1f; ✍ 三层交换机能不能代替路由器&#xff1f; -- 1.局域网 2. 广域网 -- -- 企业网络 运营商架构 数据中心架构 -- 局域网 - 内网 - 私网 -- 通过交换机连接的 转发相同IP地址段的…

NVIDIA显卡算力表--nvidia显卡算力表

参考链接&#xff1a;https://blog.csdn.net/qq_41070955/article/details/108269915 官方链接&#xff1a;https://developer.nvidia.com/cuda-gpus

电压放大器在工业领域有哪些用途

电压放大器在工业领域中有广泛的应用&#xff0c;其主要功能是将传感器或其他信号源的微小电压信号放大为更大幅度的电压信号&#xff0c;以便进行后续的信号处理、控制和监测。以下是电压放大器在工业领域中的一些常见用途&#xff1a; 传感器信号放大&#xff1a;工业生产中经…

Java 通过反射修改字符串 String 类型变量的取值而不改变字符串变量的指向

注意点 由于 JDK 8 中有关反射相关的功能自从 JDK 9 开始就已经被限制了&#xff0c;如&#xff1a;通过反射修改 String 类型变量的 value 字段(final byte[])&#xff0c;所以要能够使用运行此方法&#xff0c;需要在运行项目时&#xff0c;添加虚拟机(VM)选项&#xff1a;-…

map set 使用快速上手【C++】

目录 一&#xff0c;关联式容器 二&#xff0c;键值对 三&#xff0c;set 1&#xff09;使用参考此文档 2&#xff09;count 函数 3&#xff09;multiset类 四&#xff0c;map 1. 模板参数介绍 2.operator[]介绍 3. multimap 英语比较好的同学可以自行查找文档 学…

springboot+avue框架开发的医院绩效考核系统全套源码

医院综合绩效核算系统全套源码 &#xff08;应用案例自主版权演示&#xff09; 医院绩效考核系统以医院的发展战略为导向&#xff0c;把科室、员工的绩效考核跟战略发展目标紧密结合&#xff0c;引导医院各个科室、各员工的工作目标跟医院的发展目标结合在一起&#xff0c;实现…

代码随想录Day26 贪心01 LeetCode T53 最大子数组和

LeetCode T53 最大子数组和 题目链接:53. 最大子数组和 - 力扣&#xff08;LeetCode&#xff09; 题目思路: 贪心贪的是哪里呢&#xff1f; 如果 -2 1 在一起&#xff0c;计算起点的时候&#xff0c;一定是从 1 开始计算&#xff0c;因为负数只会拉低总和&#xff0c;这就是贪…

VPN访问外网的原理

一.前言 许多人都用VPN翻墙&#xff0c;那么VPN为什么可以做到访问外网&#xff1f; VPN的全称叫“Virtual Private Network”意思就是虚拟私人专用网络&#xff0c;是专用网络的延伸&#xff0c;通过VPN&#xff0c;可以模拟点对点专用连接的方式&#xff0c;通过共享和公共网…

对知识蒸馏的一些理解

知识蒸馏是一种模型压缩技术&#xff0c;它通过从一个大模型&#xff08;教师模型&#xff09;中传输知识到一个小模型&#xff08;学生模型&#xff09;中来提高学生模型的性能&#xff0c;知识蒸馏也要用到真实的数据集标签。 软损失soft loss就是拿教师模型在蒸馏温度为T的…

Ai写作创作系统ChatGPT网站源码+图文搭建教程+支持GPT4.0+支持ai绘画(Midjourney)/支持OpenAI GPT全模型+国内AI全模型

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统AI绘画系统&#xff0c;支持OpenAI GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署…

map 和 set 的一起使用

map 和 set 一起使用的场景其实也蛮多的&#xff0c;最近业务上就遇到了。需求是这样的&#xff0c;一条路径&#xff08;mpls中的lsp&#xff09;会申请多个 id&#xff0c;这个 id 是独一无二的。这里很显然就就一个”一对多“的情况&#xff0c;合适用这个容器不保存这些信息…

【Java集合类面试九】、介绍一下HashMap的扩容机制

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;介绍一下HashMap的扩容机…

【Java集合类面试七】、 JDK7和JDK8中的HashMap有什么区别?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;JDK7和JDK8中的HashMap有…

【保姆级教程】:docker搭建MongoDB三节点副本集

容器可以理解为一个进程&#xff0c;镜像是把环境&#xff0c;组件等都配置好&#xff0c;运行成容器的&#xff0c;容器里面运行服务&#xff0c;也可以说是一个进程。镜像是模板&#xff0c;镜像是实例。 一个镜像可以创建多个实例。也就是多个容器&#xff0c;容器之间相互…

【已解决】vue项目之爆红红红红······

我是用npm update更新依赖的时候就开始爆红了... 这里显示是依赖问题&#xff0c;有多种解决方式&#xff1a;1&#xff0c;哪个依赖出问题就去提高或者降低依赖的版本&#xff1b;2&#xff0c;提高或者降低vue-cli的版本。 第一种&#xff1a; 我的报错信息提示eslint这个依…