Spark SQL 中DataFrame DSL的使用

news2024/11/16 9:24:40

在上一篇文章中已经大致说明了DataFrame APi,下面我们具体介绍DataFrame DSL的使用。DataFrame DSL是一种命令式编写Spark SQL的方式,使用的是一种类sql的风格语法。

文章链接:

一、单词统计案例引入

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object Demo2DSLWordCount {
  def main(args: Array[String]): Unit = {
    /**
     * 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
     */
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("wc spark sql")
      .getOrCreate()

    /**
     * spark sql和spark core的核心数据类型不太一样
     *
     * 1、读取数据构建一个DataFrame,相当于一张表
     */
    val linesDF: DataFrame = sparkSession.read
      .format("csv") //指定读取数据的格式
      .schema("line STRING") //指定列的名和列的类型,多个列之间使用,分割
      .option("sep", "\n") //指定分割符,csv格式读取默认是英文逗号
      .load("spark/data/words.txt") // 指定要读取数据的位置,可以使用相对路径

    /**
     * DSL: 类SQL语法 api  介于代码和纯sql之间的一种api
     *
     * spark在DSL语法api中,将纯sql中的函数都使用了隐式转换变成一个scala中的函数
     * 如果想要在DSL语法中使用这些函数,需要导入隐式转换
     *
     */
    //导入Spark sql中所有的sql隐式转换函数
    import org.apache.spark.sql.functions._
    //导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理
    import sparkSession.implicits._

//    linesDF.select(explode(split($"line","\\|")) as "word")
//      .groupBy($"word")
//      .count().show()

    val resultDF: DataFrame = linesDF.select(explode(split($"line", "\\|")) as "word")
      .groupBy($"word")
      .agg(count($"word") as "counts")

    /**
     * 保存数据
     */
    resultDF
      .repartition(1)
      .write
      .format("csv")
      .option("sep","\t")
      .mode(SaveMode.Overwrite)
      .save("spark/data/sqlout2")

  }

}

注意:show()可以指定两个参数,第一个参数为展现的条数,不指定默认展示前20条数据,第二个参数默认为false,代表的是如果数据过长展示就会不完全,可以指定为true,使得数据展示完整,比如 : show(200,truncate = false)

二、数据源获取

查看官方文档:Data Sources - Spark 3.5.1 Documentation,看到DataFrame支持多种数据源的获取。

 1、csv-->json

    val sparkSession: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("多种类型数据源读取演示")
      .config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个
      .getOrCreate()

    //导入spark sql中所有的隐式转换函数
    import org.apache.spark.sql.functions._
    //导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段
    import sparkSession.implicits._

    /**
     * 读csv格式的文件-->写到json格式文件中
     */
    //1500100967,能映秋,21,女,文科五班
    val studentsDF: DataFrame = sparkSession.read
      .format("csv")
      .schema("id String,name String,age Int,gender String,clazz String")
      .option("sep", ",")
      .load("spark/data/student.csv")

    studentsDF.write
      .format("json")
      .mode(SaveMode.Overwrite)
      .save("spark/data/students_out_json.json")
    

2、json-->parquet


    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("")
      .config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个
      .getOrCreate()

    //导入spark sql中所有的隐式转换函数
    //导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段

    /**
     * 读取json数据格式,因为json数据有键值对,会自动的将健作为列名,值作为列值,不需要手动的设置表结构
     */

    //1500100967,能映秋,21,女,文科五班
    //方式1:
    //    val studentsJsonDF: DataFrame = sparkSession.read
    //      .format("json")
    //      .load("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")
    //方式2:实际上也是调用方式1,只是更简洁了
    // def json(paths: String*): DataFrame = format("json").load(paths : _*)
    val studebtsReadDF: DataFrame = sparkSession.read
      .json("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")

    studebtsReadDF.write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .save("spark/data/students_parquet")

3、parquet-->csv

    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("")
      .config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个
      .getOrCreate()

    //导入Spark sql中所有的sql隐式转换函数
    import org.apache.spark.sql.functions._
    //导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理
    import sparkSession.implicits._


    /**
     * parquet:压缩的比例由信息熵决定,通俗的说就是数据的重复程度决定
     */

    val studebtsReadDF: DataFrame = sparkSession.read
      .format("parquet")
      .load("spark/data/students_parquet/part-00000-8b815a03-97f7-4d71-8b71-4e7e30f60995-c000.snappy.parquet")

    studebtsReadDF.write
      .format("csv")
      .mode(SaveMode.Overwrite)
      .save("spark/data/students_csv")

三、DataFrame DSL API的使用

1、select


import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo1Select {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("select函数演示")
      .getOrCreate()

    //导入Spark sql中所有的sql隐式转换函数
    import org.apache.spark.sql.functions._
    //导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理
    import sparkSession.implicits._

    val studentsDF: DataFrame = sparkSession.read
      .format("csv")
      .schema("id String,name String,age String,gender String,clazz String")
      .option("sep", ",")
      .load("spark/data/student.csv")

    /**
     * select函数
     */
    //方式1:只能查询原有字段,不能对字段做出处理,比如加减、起别名之类
    studentsDF.select("id", "name", "age")
    //方式2:弥补了方式1的不足
    studentsDF.selectExpr("id","name","age+1 as new_age")
    //方式3:使用隐式转换函数中的$将字段变为一个对象
    val stuDF: DataFrame = studentsDF.select($"id", $"name", $"age")
        //3.1使用对象对字段进行处理
//    stuDF.select($"id", $"name", $"age",$"age".+(1) as "new_age").show()       //不可使用未变为对象的字段
    stuDF.select($"id", $"name", $"age",$"age" + 1 as "new_age")                 // +是函数,可以等价于该语句
        //3.2可以在select中使用sql函数
    studentsDF.select($"id", $"name", $"age", substring($"id", 0, 2))

    
  }
}

2、where

    /**
     * where函数:过滤数据
     */
    //方式1:直接将sql中的where语句以字符串形式传参
    studentsDF.where("clazz='文科一班' and gender='男'")
    //方式2:使用$列对象形式过滤
    /**
     * 注意在此种方式下:等于和不等于符号与我们平常使用的有所不同
     * 等于:===
     * 不等于:=!=
     */
    studentsDF.where($"clazz" === "文科一班" and $"gender"=!="男").show()

3、groupBy和agg

    /**
     * groupby:分组函数     agg:聚合函数
     * 注意:
     * 1、groupby与agg函数通常都是一起使用
     * 2、分组聚合之后的结果DataFrame中只会包含分组字段与聚合字段
     * 3、分组聚合之后select中无法出现不是分组的字段
     */
    //需求:根据班级分组,求每个班级的人数和平均年龄
    studentsDF.groupBy($"clazz")
      .agg(count($"clazz") as "clazz_number",avg($"age") as "avg_age")
      .show()

4、join

/**
     * 5、join:表关联
     */
    val subjectDF1: DataFrame = sparkSession.read
      .format("csv")
      .option("sep", ",")
      .schema("id String,subject_id String,score Int")
      .load("spark/data/score.csv")

    val subjectDF2: DataFrame = sparkSession.read
      .format("csv")
      .option("sep", ",")
      .schema("sid String,subject_id String,score Int")
      .load("spark/data/score.csv")

    //关联场景1:所关联的字段名字一样
    studentsDF.join(subjectDF1,"id")
    //关联场景2:所关联的字段名字不一样
    studentsDF.join(subjectDF2,$"id"===$"sid","inner")
//    studentsDF.join(subjectDF2,$"id"===$"sid","left").show()

    /**
     * 上面两种关联场景默认inner连接方式(内连接),可以指定参数选择连接方式,比如左连接、右连接、全连接之类
     * * @param joinType Type of join to perform. Default `inner`. Must be one of:
     * *                 `inner`, `cross`, `outer`, `full`, `fullouter`,`full_outer`, `left`,
     * *                 `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`.
     */

5、开窗

    /**
     * 开窗函数
     * 1、ROW_NUMBER():为分区中的每一行分配一个唯一的序号。序号是根据ORDER BY子句定义的顺序分配的
     * 2、RANK()和DENSE_RANK():为分区中的每一行分配一个排名。RANK()在遇到相同值时会产生间隙,而DENSE_RANK()则不会。
     *
     */

    //需求:统计每个班级总分前三的学生
    val stu_scoreDF: DataFrame = studentsDF.join(subjectDF2, $"id" === $"sid")

    //方式1:在select中使用row_number() over Window.partitionBy().orderBy()
    stu_scoreDF.groupBy($"clazz", $"id")
      .agg(sum($"score") as "sum_score")
      .select($"clazz", $"id", $"sum_score", row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc) as "score_rank")
      .where($"score_rank" <= 3)

    //方式2:使用withcolumn()函数,会新增一列,但是要预先指定列名
    stu_scoreDF
      .repartition(1)
      .groupBy($"clazz", $"id")
      .agg(sum($"score") as "sum_score")
      .withColumn("score_rank",row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc))
      .where($"score_rank" <= 3)
      .show()

注意:

      DSL API 不直接对应 SQL 的关键字执行顺序(如 SELECT、FROM、WHERE、GROUP BY 等),但可以按照构建逻辑查询的方式来组织代码,使其与 SQL 查询的逻辑结构相似。

在构建 Spark DataFrame 转换和操作时,常用流程介绍:

  1. 选择数据源:使用 spark.read 或从其他 DataFrame 派生。
  2. 转换:使用各种转换函数(如 selectfiltermapflatMapjoin 等)来修改 DataFrame。
  3. 聚合:使用 groupBy 和聚合函数(如 sumavgcount 等)对数据进行分组和汇总。
  4. 排序:使用 orderBy 或 sort 对数据进行排序。
  5. 输出:使用 showcollectwrite 等函数将结果输出到控制台、收集到驱动程序或写入外部存储。

四、RDD与DataFrame的转换

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}


object RddToDf {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("Rdd与Df之间的转换")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import sparkSession.implicits._

    val sparkContext: SparkContext = sparkSession.sparkContext
    val idNameRdd: RDD[(String, String)] = sparkContext.textFile("spark/data/student.csv")
      .map(_.split(","))
      .map {
        case Array(id: String, name: String, _, _, _) => (id, name)
      }

    /**
     * Rdd-->DF
     * 因为在Rdd中不会存储文件的结构(schema)信息,所以要指定字段
     */
    val idNameDF: DataFrame = idNameRdd.toDF("id", "name")
    idNameDF.createOrReplaceTempView("idNameTb")

    sparkSession.sql("select id,name from idNameTb").show()


    /**
     * DF-->Rdd
     */
    val idNameRdd2: RDD[Row] = idNameDF.rdd
    idNameRdd2.foreach(println)
    
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1688408.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STL--string类的at()函数

at()成员函数介绍 获取索引位置的引用,和[]的作用类似,唯一的区别是[]不判断下标是否越界,at中的索引(下标)如果越界会引发异常。可以把at理解为安全版本的[]。 int main() {array<int, 5>a{1,2,3,4,5};try{//cout << a[5] << endl; //程序崩溃,但没有异常…

如何创建 Gala Games 账户:解决 Cloudflare 验证指南 2024

Gala Games 站在数字娱乐新时代的前沿&#xff0c;将区块链技术与游戏相结合&#xff0c;重新定义了所有权和奖励。本文将引导您创建 Gala Games 账户并使用 CapSolver 解决 Cloudflare 验证难题&#xff0c;确保您顺利进入这一创新的生态系统。 什么是 Gala Games&#xff1f…

Python操作MySQL实战

文章导读 本文用于巩固Pymysql操作MySQL与MySQL操作的知识点&#xff0c;实现一个简易的音乐播放器&#xff0c;拟实现的功能包括&#xff1a;用户登录&#xff0c;窗口显示&#xff0c;加载本地音乐&#xff0c;加入和删除播放列表&#xff0c;播放音乐。 点击此处获取参考源…

安装cad新版本比如2023之后,cad2016就打开闪退,每次重启可以进一次,出现许可无效弹窗

第一步&#xff0c;先右键弹窗的cad图标&#xff0c;打开文件位置&#xff0c;复制他的安装目录。比如这样 然后点击一下空白处&#xff0c;全选&#xff0c;右键复制一下 第二步&#xff0c;然后打开autoremove最新版本 点击扩展&#xff0c;滚轮往下翻到最下面。点击这个c…

炫酷网页设计:HTML5 + CSS3打造8种心形特效

你以为520过去了&#xff0c;你就逃过一劫了&#xff1f;那不是还有分手呢&#xff0c;那不是还得再找对象呢&#xff0c;那不是还有七夕节呢&#xff0c;那不是还有纪念日呢&#xff0c;那不是还有各种各样的节日呢&#xff0c;所以呀&#xff0c;这8种HTML5 CSS3打造8种心形…

瑞芯微RV1126——交叉编译与移植

一、搭建这个nfs服务挂载 (1) sudo apt install nfs-kernel-server (2) 然后在你的ubuntu创建一个nfs共享目录&#xff1a; (3) sudo /etc/init.d/nfs-kernel-server restart 重启nfs服务 (4) 修改配置文件: sudo vim /etc/exports 在这个配置文件里面添加&#xff1a;/hom…

Visual Studio 调试及快捷键

文章目录 原文连接环境一、调试器的基本使用1、更改执行流2、快速执行某一条代码断点设置条件断点查看内存信息查看调用堆栈查看寄存器信息设置监视断点调试二、快捷键一、窗口快捷键二、查找相关快捷键三、代码快捷键原文连接 【教程】visual studio debug 技巧总结 环境 一…

风控指南:国内车险欺诈呈现四大趋势

目录 车险欺诈呈现内外勾结的团伙化 防范车险欺诈需要多重合作 保险企业需要提升反欺诈能力 监管部门需要加强协同合作 2024年4月11日&#xff0c;国家金融监督管理总局官网发布国家金融监督管理总局关于《反保险欺诈工作办法&#xff08;征求意见稿&#xff09;》公开征求意见…

纯代码如何实现WordPress搜索包含评论内容?

WordPress自带的搜索默认情况下是不包含评论内容的&#xff0c;不过有些WordPress网站评论内容比较多&#xff0c;而且也比较有用&#xff0c;所以想要让用户在搜索时也能够同时搜索到评论内容&#xff0c;那么应该怎么做呢&#xff1f; 网络上很多教程都是推荐安装SearchWP插…

C语言——小知识和小细节18

一、力扣题目 1、题目本体 2、题解 本题目我们使用异或分组的方法来解决。可以在我之前的文章《C语言——操作符CSDN博客》中看一下异或的特点。 由于异或的运算规则为相同为0&#xff0c;不同为1&#xff0c;而且是在二进制补码上进行操作的&#xff0c;我们可以发现的一个…

Java面试真题日常练习

题目&#xff1a;反转字符串 描述&#xff1a;编写一个函数&#xff0c;输入一个字符串&#xff0c;将其反转并返回结果。 解题思路&#xff1a;可以使用两个指针&#xff0c;一个指向字符串的开头&#xff0c;一个指向字符串的末尾&#xff0c;然后不断交换两个指针所指的字符…

[AI Google] 10个即将到来的Android生态系统更新

新的体验带来了更强的防盗保护、手表电池寿命优化&#xff0c;以及对电视、汽车等的娱乐功能改进。 昨天&#xff0c;我们分享了Android如何以人工智能为核心重新构想智能手机。今天&#xff0c;我们推出了Android 15的第二个测试版&#xff0c;并分享了更多我们改进操作系统的…

Python代码注释的艺术与智慧

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、引言&#xff1a;注释的必要性 二、注释的误区&#xff1a;不是越多越好 过度注释的问题…

jetcache缓存

1 介绍 是阿里的双极缓存&#xff0c;jvm-->redis-->数据库 文档&#xff1a;jetcache/docs/CN at master alibaba/jetcache GitHub 2 注意事项 使用的实体类一定实现序列化接口定时刷新注解&#xff0c;慎用 它会为每一个key创建一个定时器 &#xff1a;场景为&…

k8s节点亲和性配置

在Kubernetes中&#xff0c;你可以使用节点亲和性&#xff08;Node Affinity&#xff09;来控制Pod调度到特定的节点上。节点亲和性是通过Pod的spec.affinity.nodeAffinity属性来设置的。 以下是一个配置节点亲和性的YAML示例&#xff1a; apiVersion: v1 kind: Pod metadata…

YOLOv8原理详解

Yolov8是2023年1月份开源的。与yolov5一样&#xff0c;支持目标检测、分类、分割任务。 Yolov8主要改进之处有以下几个方面&#xff1a; Backbone&#xff1a;依旧采用的CSP的思想&#xff0c;不过将Yolov5中的C3模块替换为C2F模块&#xff0c;进一步降低了参数量&#xff0c…

为什么越来越多的服装连锁门店收银选web收银系统

如今&#xff0c;随着时尚产业的快速发展&#xff0c;服装连锁门店在管理上面临着越来越多的挑战。其中&#xff0c;收银系统作为零售店铺重要的管理工具&#xff0c;选择适合的收银系统对于提高门店管理效率和顾客体验至关重要。本文将探讨为什么服装连锁门店应选择Web收银系统…

kafka监控配置和告警配置——筑梦之路

kafka_exporter项目地址&#xff1a;https://github.com/danielqsj/kafka_exporter docker-compose部署kafka_exporter # docker-compose部署多个kafka_exporter&#xff0c;每个exporter对接一个kafka# cat docker-compose.ymlversion: 3.1 services:kafka-exporter-opslogs…

Android ART 虚拟机简析

源码基于&#xff1a;Android U 1. prop 名称选项名称heap 变量名称功能 dalvik.vm.heapstartsize MemoryInitialSize initial_heap_size_ 虚拟机在启动时&#xff0c;向系统申请的起始内存 dalvik.vm.heapgrowthlimit HeapGrowthLimit growth_limit_ 应用可使用的 max…

GetWay

SpringCloud - Spring Cloud 之 Gateway网关&#xff0c;Route路由&#xff0c;Predicate 谓词/断言&#xff0c;Filter 过滤器&#xff08;十三&#xff09;_spring.cloud.gateway.routes-CSDN博客 官网&#xff1a;Spring Cloud Gateway 工作原理&#xff1a;Spring Cloud G…