SparkSQL编程入口和模型与SparkSQL基本编程

news2025/1/9 4:05:31

SparkSQL编程入口和模型

SparkSQL编程模型

主要通过两种方式操作SparkSQL,一种就是SQL,另一种为DataFrame和Dataset。

1)SQL:SQL不用多说,就和Hive操作一样,但是需要清楚一点的是,SQL操作的是表,所以要想用SQL进行操作,就需要将SparkSQL对应的编程模型转化成为一张表才可以。同时支持,通用sql和hivesql。

2)DSL(DataFrame&DataSet):在支持SQL编程的同时,方便大家使用函数式编程的思想,类似sparkcore的编程模式,sparksql也支持DSL(Domain Specified Language,领域专用语言,或者特定领域语言),即通过DataFrame和Dataset来支持类似RDD的编程。

DataFrame和Dataset是SparkSQL中的编程模型。DataFrame和Dataset我们都可以理解为是一张mysql中的二维表,表有什么?表头,表名,字段,字段类型。RDD其实说白了也是一张二维表,但是这张二维表相比较于DataFrame和Dataset却少了很多东西,比如表头,表名,字段,字段类型,只有数据。

Dataset是在spark1.6.2开始出现的api,DataFrame是1.3的时候出现的,早期的时候DataFrame叫SchemaRDD,SchemaRDD和SparkCore中的RDD相比较,就多了Schema,所谓约束信息,元数据信息。

一般的,将RDD称之为Spark体系中的第一代编程模型;DataFrame比RDD多了一个Schema元数据信息,被称之为Spark体系中的第二代编程模型;Dataset吸收了RDD的优点(强类型推断和强大的函数式编程)和DataFrame中的优化(SQL优化引擎,内存列存储),成为Spark的最新一代的编程模型。

RDD V.S. DataFrame V.S. Dataset

RDD

弹性分布式数据集,是Spark对数据进行的一种抽象,可以理解为Spark对数据的一种组织方式,更简单些说,RDD就是一种数据结构,里面包含了数据和操作数据的方法。从字面上就能看出的几个特点:

1)弹性:数据可完全放内存或完全放磁盘,也可部分存放在内存,部分存放在磁盘,并可以自动切换。RDD出错后可自动重新计算(通过血缘自动容错)。可checkpoint(设置检查点,用于容错),可persist或cache(缓存),里面的数据是分片的(也叫分区,partition),分片的大小可自由设置和细粒度调整。

2)分布式:RDD中的数据可存放在多个节点上。

3)数据集:即数据的集合,相对于DataFrame和Dataset,RDD是Spark最底层的抽象,目前是开发者用的最多的,但逐步会转向DataFrame和Dataset(当然,这是Spark的发展趋势)调整。

DataFrame

理解了RDD,DataFrame就容易理解些,DataFrame的思想来源于Python的pandas库,RDD是一个数据集,DataFrame在RDD的基础上加了Schema(描述数据的信息,可以认为是元数据,DataFrame曾经就有个名字叫SchemaRDD)。

假设RDD中的两行数据长这样,如图-5所示。

图-5 rdd数据

那么DataFrame中的数据长这样,如图-6所示。

图-6 dataframe数据

从上面两个图可以看出,DataFrame比RDD多了一个表头信息(Schema),像一张表了,DataFrame还配套了新的操作数据的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where ...)。

有了DataFrame这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。

不仅如此,通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。

Dataset:相对于RDD,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束,下图-7是官网对于dataset的表述。

图-7 dataset

假设RDD中的两行数据如同-5所示,那么Dataset中的数据长这样,如图-8所示。

图-8 dataset数据

或者也可以如图-9所示,其中每行数据是个Object。

图-9 dataset数据

使用Dataset API的程序,会经过Spark SQL的优化器进行优化(优化器叫什么还记得吗?)

目前仅支持Scala、Java API,尚未提供Python的API(所以一定要学习Scala),相比DataFrame,Dataset提供了编译时类型检查,对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传、运行),到提交到集群运行时才发现错误,实在是不方便,这也是引入Dataset的一个重要原因。

使用DataFrame的代码中json文件中并没有score字段,但是能编译通过,但是运行时会报异常!如图-10代码所示。

图-10 dataframe编码

而使用Dataset实现,会在IDE中就报错,出错提前到了编译之前,如下图-11所示。

图-11 dataset编码

SparkSession

在SparkSQL中的编程模型,不再是SparkContext,但是创建需要依赖SparkContext。SparkSQL中的编程模型,在spark2.0以前的版本中为SQLContext和HiveContext,HiveContext是SQLContext的一个子类,提供Hive中特有的一些功能,比如row_number开窗函数等等,这是SQLContext所不具备的,在Spark2.0之后将这两个进行了合并——SparkSession。SparkSession的构建需要依赖SparkConf或者SparkContext。使用工厂构建器(Builder方式)模式创建SparkSession。

SparkSQL基本编程

SparkSQL编程初体验

1)SparkSession的构建:

val spark = SparkSession.builder()

         .appName("SparkSQLOps")

         .master("local[*]")

           //.enableHiveSupport()//支持hive的相关操作

          .getOrCreate()

2)基本编程:

object SparkSQLOps {

    def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder()

                    .appName("SparkSQLOps")

                    .master("local[*]")

                 //.enableHiveSupport()//支持hive的相关操作

                   .getOrCreate()

        //加载数据

        val pdf:DataFrame = spark.read.json("file:///E:/data/spark/sql/people.json")

        //二维表结构

        pdf.printSchema()

        //数据内容 select * from tbl

        pdf.show()

        //具体的查询 select name, age from tbl

        pdf.select("name", "age").show()

        //导入sparksession中的隐式转换操作,增强sql的功能

import spark.implicits._

        pdf.select($"name",$"age").show()

        //列的运算,给每个人的年龄+10 select name, age+10,height-1 from tbl

        pdf.select($"name",$"height" - 1, new Column("age").+(10)).show()

        //起别名

select name, age+10 as age,height-1  as height from tbl

        pdf.select($"name",($"height" -1).as("height")).show()

        //做聚合统计 统计不同年龄的人数

select age, count(1) counts from tbl group by age

        pdf.select($"age").groupBy($"age").count().show()

        //条件查询 获取年龄超过18的用户

        //pdf.select("name", "age", "height").where($"age".>(18)).show()

        pdf.select("name", "age", "height").where("age > 18").show()

        //sql风格

        //pdf.registerTempTable()

//在spark2.0之后处于维护状态,使用createOrReplaceTempView

        /*

            从使用范围上说,分为global和非global

            global是当前SparkApplication中可用,非global只在当前SparkSession中可用

            从创建的角度上说,分为createOrReplace和不Replace

                createOrReplace会覆盖之前的数据

                create不Replace,如果视图存在,会报错

         */

        pdf.createOrReplaceTempView("people")

        spark.sql(

            """

              |select

              | age,

              | count(1) as countz

              |from people

              |group by age

            """.stripMargin).show

        spark.stop()

    }

}

SparkSQL编程模型的操作

DataFrame的构建方式

在Spark SQL中SparkSession是创建DataFrames和执行SQL的入口,创建DataFrames有三种方式,一种是可以从一个存在的RDD进行转换,还可以从Hive Table进行查询返回,或者通过Spark的数据源进行创建。

从Spark数据源进行创建:

package chapter1
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
object Create_DataFrame {
    def main(args: Array[String]): Unit = {
        //创建程序入口
        val spark = SparkSession.builder()

.appName("createDF")

.master("local[*]")

.getOrCreate()
        //调用sparkContext
        val sc: SparkContext = spark.sparkContext
        //设置控制台日志输出级别
        sc.setLogLevel("WARN")
        //从数据源创建DataFrame
        val personDF = spark.read.json("resources/people.json")
        //展示数据
        personDF.show()
    }
}

从RDD进行转换:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object Create_DataFrame1 {
    def main(args: Array[String]): Unit = {
        //创建程序入口
        val spark= SparkSession.builder()

.appName("createDF")

.master("local[*]")

.getOrCreate()
        //调用sparkContext
        val sc: SparkContext = spark.sparkContext
        //设置控制台日志输出级别
        sc.setLogLevel("WARN")
        //导包
        import spark.implicits._
        //加载数据
        val file: RDD[String] = sc.textFile("E:\\资料\\data\\person.txt")
        //按照分隔符进行切分
        val spliFile: RDD[Array[String]] = file.map(line=>line.split(" "))
        //指定字段类型
        val personRDD: RDD[(Int, String, Int)] = spliFile.map(line=>(line(0).toInt,line(1),line(2).toInt))
        //调用toDF方法指定列名
        val personDF: DataFrame = personRDD.toDF("id","name","age")
        //展示数据
        personDF.show()
        //释放资源
        spark.stop()
        sc.stop()
    }
}

通过反射创建DataFrame:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
case class person(id:Int,name:String,age:Int)
object createDataFrame2 {
    def main(args: Array[String]): Unit = {
        //创建程序入口
        val spark = SparkSession.builder()

.appName("createDF")

.master("local[*]")

.getOrCreate()
        //调用sparkContext
        val sc: SparkContext = spark.sparkContext
        //设置控制台日志输出级别
        sc.setLogLevel("WARN")
        //导包
        import spark.implicits._
        //加载数据
        val file: RDD[String] = sc.textFile("E:\\资料\\data\\person.txt")
        //按照分隔符进行切分
        val spliFile: RDD[Array[String]] = file.map(line=>line.split(" "))
        //指定字段类型
        val personRDD: RDD[person] = spliFile.map(line=>person(line(0).toInt,line(1),line(2).toInt))
        //调用toDF方法指定列名
        val personDF: DataFrame = personRDD.toDF()
        //展示数据
        personDF.show()
        //释放资源
        spark.stop()
        sc.stop()
    }
}

动态编程:

/*

  使用动态编程的方式构建DataFrame

  Row-->行,就代表了二维表中的一行记录,jdbc中的resultset,就是java中的一个对象

  */

val row:RDD[Row] = spark.sparkContext.parallelize(List(

    Row(1, "李伟", 1, 180.0),

    Row(2, "汪松伟", 2, 179.0),

    Row(3, "常洪浩", 1, 183.0),

    Row(4, "麻宁娜", 0, 168.0)

))

//表对应的元数据信息

val schema = StructType(List(

    StructField("id", DataTypes.IntegerType, false),

    StructField("name", DataTypes.StringType, false),

    StructField("gender", DataTypes.IntegerType, false),

    StructField("height", DataTypes.DoubleType, false)

))

val df = spark.createDataFrame(row, schema)

df.printSchema()

df.show()

说明,这里学习三个新的类:

1)Row:代表的是二维表中的一行记录,或者就是一个Java对象。

2)StructType:是该二维表的元数据信息,是StructField的集合。

3)StructField:是该二维表中某一个字段/列的元数据信息(主要包括,列名,类型,是否可以为null)。

Dataset的构建方式

Dataset是DataFrame的升级版,创建方式和DataFrame类似,但有不同。

//dataset的构建

object SparkSQLDatasetOps {

    def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder()

                    .appName("SparkSQLDataset")

                    .master("local[*]")

                    .getOrCreate()

        //dataset的数据集

        val list = List(

            new Student(1, "王盛芃", 1, 19),

            new Student(2, "李金宝", 1, 49),

            new Student(3, "张海波", 1, 39),

            new Student(4, "张文悦", 0, 29)

        )

        import spark.implicits._

        val ds = spark.createDataset[Student](list)

        ds.printSchema()

        ds.show()

        spark.stop()

    }

}

case class Student(id:Int, name:String, gender:Int, age:Int)

在编码中需要注意的是,如果导入spark.implicits隐式转换或者数据类型不是case class,便会出现如图-12所示的bug。

图-12 dataset编码注意的问题

在创建Dataset的时候,需要注意数据的格式,必须使用case class,或者基本数据类型,同时需要通过import spark.implicts._来完成数据类型的编码,而抽取出对应的元数据信息,否则编译无法通过。

RDD和DataFrame以及DataSet的互相转换

RDD→DataFrame:

def beanRDD2DataFrame(spark:SparkSession): Unit = {

val stuRDD:RDD[Student] = spark.sparkContext.parallelize(List(

new Student(1, "王盛芃", 1, 19),

new Student(2, "李金宝", 1, 49),

new Student(3, "张海波", 1, 39),

new Student(4, "张文悦", 0, 29)

))

val sdf =spark.createDataFrame(stuRDD, classOf[Student])

sdf.printSchema()

sdf.show()

}

RDD→Dataset:

Def rdd2Dataset(spark:SparkSession): Unit = {

    val stuRDD = spark.sparkContext.parallelize(List(

        Student(1, "王盛芃", 1, 19),

        Student(2, "李金宝", 1, 49),

        Student(3, "张海波", 1, 39),

        Student(4, "张文悦", 0, 29)

    ))

    import spark.implicits._

    val ds:Dataset[Student] = spark.createDataset[Student](stuRDD)

    ds.show()

}

case class Student(id:Int, name:String, gender:Int, age:Int)

RDD转换为DataFrame和Dataset的时候可以有更加简单的方式,如下:

import spark.implicits._

rdd.toDF()

rdd.toDS()

DataFrame→RDD:

val rdd:RDD[Row] = df.rdd

rdd.foreach(row => {

    val id = row.getInt(0)

    val name = row.getString(1)

    val gender = row.getInt(2)

    val height = row.getAs[Double]("height")

    println(s"id=${id},name=$name,gender=$gender,height=$height")

})

Dataset→RDD:

val stuDS: Dataset[Student] = list2Dataset(spark)

val stuRDD:RDD[Student] = stuDS.rdd

stuRDD.foreach(println)

Dataset→DataFrame:

val stuDS: Dataset[Student] = list2Dataset(spark)      

//dataset --->dataframe

val df:DataFrame = stuDS.toDF()

df.show()

DataFrame→Dataset:无法直接将DataFrame转化为Dataset,需要通过as方法添加泛型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1647515.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

90、动态规划-最长的有效括号

思路: 找出有效括号并且是最长的有效括号 dp[i]表示以i结尾的括号最长是多少 然后从1开始 因为从0位置不管是左括号还是右括号都是无法形成一个完成的括号。所以dp[0]0; 当i1时候,判断括号是否是)如果不是那么无法结尾&#x…

3. FactoryTalk View SE按钮工具库

系统自带的按钮比较丑陋,为了迎合客户需求可以从工具库中选择漂亮的按钮图形。 单击按钮选择释放外观–使用图像引用–启动库–选择按钮库,找到一款合适的图形–右键copy copy之后点击从库中粘贴–确定 这样就实现按下按钮的颜色是红色,在…

记录一次给PCAN升级固件pcan_canable_hw-449dc73.bin

方法一:网页升级 首先将3.3V与BOOT短接,插入电脑USB接口,识别为STM32 BOOTLOADER,芯片进入DFU模式。 如果电脑没有识别到STM32 BOOTLOADER,或无法驱动,则需要安装ImpulseRC_Driver_Fixer修复工具。 推荐使用Google浏览器打开网页升级选择PCAN固件,点Connect and Update,…

【Stream 流】通过一个例子看遍所有Stream API使用场景

前言 上篇文章记录了方法引用,Lambda表达式等基础的知识点,这篇文章主要结合课设项目详细介绍Stream 流的API以及它的主要场景。 Stream API作用 在Java 8及其以后的版本中,Stream API为处理集合数据提供了强大而灵活的功能。有了Stream AP…

Java双亲委派机制

系列文章目录 文章目录 系列文章目录前言 前言 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。 概述 Java程序在运…

JAVA聊天室-网络编程socket+javafx+maven【附带exe+源代码,小白可运行,简单容易上手,代码附带注释】

前言 可以学习到java网络编程的知识,也可以拿去当个模板去添加功能 项目展示: chatv1.0 服务器打包视频: 服务器打包exe 1. 项目概述 本项目旨在开发一个简单的群聊室应用程序,使用JavaFX作为用户界面框架,以及Java…

管道通信与Linux命令的执行-(读书笔记-十三)

|前一个命令的输出作为后一个命令的输入。 在Linux中,|符号是一个管道符号,用于将前一个命令的输出作为后一个命令的输入。这种机制允许你将多个命令组合在一起,以执行复杂的操作。下面是一些基本的例子: 查看当前目录下的文件列…

Axure实现菜单抽屉效果

Axure是怎么实现如下效果的? 菜单打开和收起侧边栏菜单抽屉效果 实现效果 两级菜单,点击菜单收起其他菜单,打开当前菜单。 实现原理 单击一级菜单时,1)切换当下二季菜单的显示/隐藏状态 2)隐藏其他菜单…

[Android]四大组件简介

在 Android 开发中,“四大组件”(Four Major Components)是指构成 Android 应用程序的四种核心组件,它们通过各自的方式与系统交互,实现应用的多样功能。这些组件是:Activity、Service、Broadcast Receiver…

青春送温暖 立夏寄真情

(通讯员:赵灿飞 图:杨美、孙红浪) 在青春洋溢的五月,为传承中华民族尊老敬老的传统美德,促进当代青年与老人的跨代交流,增强青年的社会责任感和使命感,传递正能量和关爱困难群体…

组播应用:SW1、SW2、RT1、RT2、AC1运行PIM-SM

SW1、SW2、RT1、RT2、AC1运行PIM-SM,SW1 Vlan10为C-BSR和C-RP;SW1产品网络(PC1)启用组播,用VLC工具串流播放视频文件“1.mp4”,模拟组播源,设置此视频循环播放,组地址232.1.1.1,端口1234,实现总公司和分公司收看视频,用PC2测试。 一、SW1、SW2、RT1、RT2、AC1配置如…

功能全面的外发文件控制方案,拿走不谢

在日常办公中,很多企业往往只采取各种措施来确保存储数据的安全,却忽略了文件外发的安全。因此企业由于自身的安全防护机制不严谨,引发的数据安全事件频发,经常导致严重的经济损失。使用较多的外发方式有邮件、IM通讯工具、网盘、…

docker的commit命令使用制作镜像

docker run -it ubuntu 最基础的ubuntu启动后安装vim 的命令 apt-get update apt-get -y install vim docker commit -m"my_test_ubuntu" -a"za" 80977284a998 atljw/myubuntu:1.0 将本地镜像推送到阿里云 首先登录阿里云服务-控制台 记得一定要设定设…

鸢尾花分类-pytorch实现

前言 本文用pytorch实现了鸢尾花分类,数据不多,只做代码展示用,后续有升级版本。 代码 -*- coding: utf-8 -*- File : main.py Author: Shanmh Time : 2024/05/06 上午9:37 Function:import torch from sklearn import datase…

uniapp生成二维码(uQRCode)与自定义绘制样式与内容

二维码生成使用了一款基于Javascript环境开发的插件 uQRCode ,它不仅适用于uniapp,也适用于所有Javascript运行环境的前端应用和Node.js。 uQRCode 插件地址:https://ext.dcloud.net.cn/plugin?id1287 目录 1、npm安装 2、通过import引…

fabric搭建生产网络

fabric搭建生产网络 一、生成组织结构与身份证书 解包 hyperledger-fabric-linux-amd64-2.5.0.tar.gz 1.1、crypto-config.yaml配置文件 ./bin/cryptogen showtemplate > crypto-config.yaml 将crypto-config.yaml内容修改为: # -------------------------…

pygame实现鼠标绘制并调节画笔大小

pygame实现鼠标绘制并调节画笔大小 pygame介绍调节画笔大小鼠标绘制效果 pygame介绍 Pygame是一个开源的Python库,专为电子游戏开发而设计。它建立在SDL(Simple DirectMedia Layer)的基础上,允许开发者使用Python这种高级语言来实…

微信个人号开发api接口-视频号矩阵接口-VIdeosApi

友情链接:VIdeosApi 获取用户主页 接口地址: http://api.videosapi.com/finder/v2/api/finder/userPage 入参 { "appId": "{{appid}}", "lastBuffer": "", "toUserName": "v2_060000231003b2…

03、 Kafaka单机环境部署

03、 Kafka单机环境部署 1、 Docker 安装单机版本搭建 (1)安装Zookeeper docker pull zookeeper(2)启动zookeeper docker run -d --name zookeeper -p 2181:2181 zookeeper(3)安装 Kafka docker pull …

酷开科技线上出游,用酷开系统云逛博物馆!

五一假期,当全国各地的旅游景点迎来人潮高峰期时,酷开科技为那些寻求宁静假期体验的消费者带来了一个独特的解决方案——“云逛博物馆”。通过酷开系统,消费者可以在家中的电视上,体验维也纳艺术史博物馆的沉浸式画展,…