第2章 SparkSQL 核心编程

news2024/10/6 12:22:13

第2章 SparkSQL 核心编程

  • 2.1 新的起点
  • 2.2 DataFrame
    • 2.2.1 创建 DataFrame
    • 2.2.2 SQL 语法
    • 2.2.3 DSL 语法
    • 2.2.4 RDD 转换为 DataFrame
    • 2.2.5 DataFrame 转换为 RDD
  • 2.3 DataSet
    • 2.3.1 创建 DataSet
    • 2.3.2 RDD 转换为 DataSet
    • 2.3.3 DataSet 转换为 RDD
  • 2.4 DataFrame 和 DataSet 转换
  • 2.5 RDD、DataFrame、DataSet 三者的关系
    • 2.5.1 三者的共性
    • 2.5.2 三者的区别
    • 2.5.3 三者的互相转换
  • 2.6 IDEA 开发 SparkSQL
    • 2.6.1 添加依赖
    • 2.6.2 代码实现
  • 2.7 用户自定义函数
    • 2.7.1 UDF

(任意内容)

此处输入任意想输入的内容

本课件重点学习如何使用 Spark SQL 所提供的 DataFrame 和 DataSet 模型进行编程.,以及了解它们之间的关系和转换, 关于具体的 SQL 书写不是我们的重点。

2.1 新的起点

  • Spark Core 中,如果想要执行应用程序,需要首先构建上下文环境对象 SparkContext ,Spark SQL 其实可以理解为对 Spark Core 的一种封装,不仅仅在模型上进行了封装,上下文环境对象也进行了封装。

SparkContext 是 Apache Spark 中的核心组件之一,它是 Spark 应用程序与 Spark 集群通信的入口点。在 Spark 中,每个应用程序都需要创建一个 SparkContext 实例,用于与 Spark 集群建立连接并进行任务调度、资源管理等操作。

SparkContext 的主要作用包括:

  1. 与集群通信:SparkContext 负责与 Spark 集群建立通信,它与集群中的 ClusterManager 进行通信,请求资源和执行任务。

  2. 资源管理:SparkContext 负责管理应用程序在集群中的资源分配,包括 Executor 内存、CPU 核心数等。

  3. 任务调度:SparkContext 负责将应用程序中的任务分解成多个阶段,并在集群中调度执行这些任务。

  4. 创建 RDD:SparkContext 可以用来创建 RDD(弹性分布式数据集)和其他 Spark 数据结构。

  5. 访问 Spark 功能:SparkContext 提供了对 Spark 的各种功能和API的访问,如 Spark SQL、Spark Streaming、MLlib 等。

在 Spark 2.0 之前,通过 SparkContext 来创建各种类型的 RDD,并进行任务调度和资源管理。在 Spark 2.0 之后,引入了 SparkSession,它是 SparkContextSQLContextHiveContext 的统一入口,提供了更简洁的编程界面。

示例代码(Spark 2.0+):

import org.apache.spark.sql.SparkSession;

public class SparkExample {
    public static void main(String[] args) {
        // 创建 SparkSession 对象
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkExample")
                .master("local") // 在本地运行,也可以设置为集群的 URL
                .getOrCreate();

        // 使用 SparkSession 执行任务
        // ...

        // 关闭 SparkSession
        spark.stop();
    }
}

在现代的 Spark 应用程序中,推荐使用 SparkSession 来代替 SparkContext,因为它提供了更多的功能,并能兼容不同的数据源和功能模块,使代码更加简洁和易于维护。

  • 在老的版本中,SparkSQL 提供两种 SQL 查询起始点:一个叫 SQLContext,用于 Spark自己提供的 SQL 查询;一个叫 HiveContext,用于连接 Hive 的查询。

  • SparkSession 是 Spark 最新的 SQL 查询起始点,实质上是 SQLContext 和 HiveContext
    的组合,所以在 SQLContex 和 HiveContext 上可用的 API 在 SparkSession 上同样是可以使用
    的。SparkSession 内部封装了 SparkContext,所以计算实际上是由 sparkContext 完成的。当
    我们使用 spark-shell 的时候, spark 框架会自动的创建一个名称叫做 spark 的 SparkSession 对
    象, 就像我们以前可以自动获取到一个 sc 来表示 SparkContext 对象一样。

SparkSession 是 Apache Spark 2.0 之后引入的一个重要组件,它是对 SparkContext、SQLContext 和 HiveContext 的统一入口,提供了更简洁、更方便的编程界面。SparkSession 可以用来访问 Spark 的各种功能,如 Spark SQL、Spark Streaming、MLlib 等,并且支持在不同的数据源之间无缝切换。

SparkSession 的主要作用包括:

  1. 统一入口:SparkSession 将 SparkContext、SQLContext 和 HiveContext 的功能整合在一个接口中,简化了应用程序的编写和维护。

  2. Spark SQL:SparkSession 提供了对 Spark SQL 的支持,可以使用 SQL 或 DataFrame API 进行数据查询和处理,从而将结构化数据和半结构化数据(JSON、CSV 等)直接转换为 DataFrame 进行分析。

  3. Hive 集成:SparkSession 具有与 Hive 的完全集成,可以直接访问 Hive 的数据、表和元数据,从而能够在 Spark 中执行 Hive 查询,并将结果写回到 Hive 表中。

  4. 读写数据:SparkSession 可以通过 DataFrame API 或者数据源 API 读取和写入各种类型的数据,如 Parquet、Avro、ORC、JSON、CSV 等。

  5. 支持多种数据源:SparkSession 可以无缝切换不同的数据源,可以访问 HDFS、Hive、Amazon S3、Cassandra 等。

  6. Streaming 支持:SparkSession 支持 Spark Streaming,可以实时处理数据流。

使用 SparkSession 示例:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkExample {
    public static void main(String[] args) {
        // 创建 SparkSession 对象
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkExample")
                .master("local") // 在本地运行,也可以设置为集群的 URL
                .getOrCreate();

        // 读取数据并创建 DataFrame
        Dataset<Row> df = spark.read().json("path/to/json/file");

        // 执行操作,如查询、转换等
        df.show();

        // 关闭 SparkSession
        spark.stop();
    }
}

总之,SparkSession 是 Apache Spark 中一个重要的组件,它提供了一个统一的编程接口,使得 Spark 的功能更易于使用和理解。在现代的 Spark 应用程序中,推荐使用 SparkSession 来代替原来的 SparkContextSQLContextHiveContext,以获得更多的功能和更简洁的编程体验。
在这里插入图片描述

2.2 DataFrame

Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有 action 操作。

DataFrame 是 Apache Spark 中的一个分布式数据集,它是由一组命名的列组成的分布式数据集合。DataFrame 可以理解为类似于传统数据库表或者关系型数据库中的表,但是它的数据是以分布式方式存储在集群中的,可以跨多个节点进行并行计算和处理。

在 Spark 中,DataFrame 是对 RDD(弹性分布式数据集)的高级抽象,它提供了一种更高级的数据处理方式,更接近于 SQL 表格的操作风格。DataFrame 可以用于读取和处理各种类型的数据,如 JSON、CSV、Parquet、Avro 等,同时也支持与 Hive 集成,可以直接读取和写入 Hive 表。

DataFrame 具有以下特点:

  1. 类型安全:DataFrame 是强类型的,它在编译期间能够捕获类型错误,从而提供了更好的开发体验和错误检测。

  2. 惰性求值:DataFrame 支持惰性求值,即在执行数据处理操作时,并不立即进行计算,而是生成一个执行计划,只有在遇到动作操作时才会实际触发计算。

  3. 优化执行:Spark 会对 DataFrame 的转换操作进行优化,以提高执行效率,例如可以进行谓词下推、投影下推等优化。

  4. SQL 支持:DataFrame 支持使用 SQL 进行数据查询和处理,可以方便地进行数据筛选、聚合、排序等操作。

  5. 面向列的处理:DataFrame 是面向列的数据处理方式,支持向量化的数据处理,能够更高效地进行数据计算。

DataFrame 的创建可以通过多种方式,如读取文件、从 RDD 转换、使用 Hive 表等。同时,DataFrame API 提供了丰富的操作方法,用于对数据进行转换和处理,包括过滤、排序、聚合、联接等。

示例代码(Java):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataFrameExample {
    public static void main(String[] args) {
        // 创建 SparkSession 对象
        SparkSession spark = SparkSession
                .builder()
                .appName("DataFrameExample")
                .master("local") // 在本地运行,也可以设置为集群的 URL
                .getOrCreate();

        // 读取数据并创建 DataFrame
        Dataset<Row> df = spark.read().json("path/to/json/file");

        // 执行操作,如查询、转换等
        df.show();

        // 关闭 SparkSession
        spark.stop();
    }
}

以上示例中,我们通过 spark.read().json() 方法从 JSON 文件中读取数据并创建了一个 DataFrame,然后使用 df.show() 方法将 DataFrame 中的数据显示出来。

总之,DataFrame 是 Apache Spark 中的一个核心概念,它提供了高级的、类型安全的、优化的分布式数据处理方式,能够满足大规模数据处理和分析的需求。

2.2.1 创建 DataFrame

在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame有三种方式:通过 Spark 的数据源进行创建;从一个存在的 RDD 进行转换;还可以从 Hive Table 进行查询返回。

  1. 从 Spark 数据源进行创建

➢ 查看 Spark 支持创建文件的数据源格式

scala> spark.read.

csv format jdbc json load option options orc parquet schema 
table text textFile

➢ 在 spark 的 bin/data 目录中创建 user.json 文件

{"username":"zhangsan","age":20}

➢ 读取 json 文件创建 DataFrame

scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]

注意:如果从内存中获取数据,spark 可以知道数据类型具体是什么。如果是数字,默认作
为 Int 处理;但是从文件中读取的数字,不能确定是什么类型,所以用 bigint 接收,可以和
Long 类型转换,但是和 Int 不能进行转换

➢ 展示结果

+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
+---+--------+
  1. 从 RDD 进行转换

在 Apache Spark 中,RDD(弹性分布式数据集)是分布式数据处理的核心抽象,是一个不可变的、分区化的数据集合。RDD 提供了一系列的转换操作,用于对数据集进行处理和转换,以实现各种复杂的分布式计算任务。常见的 RDD 转换操作包括:

  1. map(func):对 RDD 中的每个元素应用函数 func,返回一个新的 RDD。

  2. filter(func):筛选出满足条件的元素,返回一个新的 RDD。

  3. flatMap(func):与 map 类似,但每个输入元素可以映射为零个或多个输出元素。

  4. union(other):将当前 RDD 与另一个 RDD 合并,返回一个包含两个 RDD 元素的新 RDD。

  5. distinct():去除 RDD 中的重复元素,返回一个新的 RDD。

  6. groupByKey():对 (key, value) 形式的 RDD 进行分组,返回一个新的 PairRDD,其中 key 是键,value 是具有相同键的值组成的可迭代集合。

  7. reduceByKey(func):对 (key, value) 形式的 RDD 进行按键聚合,返回一个新的 PairRDD,其中 key 是键,value 是经过 func 聚合后的结果。

  8. sortByKey():按照键对 RDD 进行排序,返回一个新的 PairRDD。

  9. join(other):对两个 PairRDD 进行连接操作,返回一个包含连接结果的新 PairRDD。

  10. cogroup(other):对两个 PairRDD 进行连接操作,并返回一个包含每个键的所有值的可迭代集合的新 PairRDD。

除了上述常见的转换操作外,RDD 还支持其他许多转换操作,如 Cartesian、mapPartitions、reduce、aggregate 等,每个转换操作都有不同的应用场景,可以根据具体的业务需求进行选择。

值得注意的是,RDD 是惰性求值的,即在进行转换操作时,并不会立即执行计算,而是创建一个转换后的 RDD 的执行计划。只有当对 RDD 执行动作操作时,Spark 才会根据执行计划实际进行计算和处理。这种特性有助于优化计算过程,并提高 Spark 的性能。

  1. 从 Hive Table 进行查询返回

在 Apache Spark 中,可以通过 HiveContext 或 SparkSession 的 SQL 接口来查询 Hive 表,并将查询结果返回为 DataFrame 或 Dataset。下面是通过 SparkSession 查询 Hive 表并返回结果的示例代码:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HiveQueryExample {
    public static void main(String[] args) {
        // 创建 SparkSession 对象
        SparkSession spark = SparkSession
                .builder()
                .appName("HiveQueryExample")
                .enableHiveSupport() // 启用 Hive 支持
                .master("local") // 在本地运行,也可以设置为集群的 URL
                .getOrCreate();

        // 查询 Hive 表
        Dataset<Row> result = spark.sql("SELECT * FROM database_name.table_name");

        // 显示查询结果
        result.show();

        // 关闭 SparkSession
        spark.stop();
    }
}

在上述示例中,首先创建了一个 SparkSession 对象并启用了 Hive 支持。然后,通过 spark.sql() 方法执行了一个 SQL 查询,将结果存储在一个 DataFrame 对象中。最后,通过 result.show() 方法将查询结果显示出来。

需要注意的是,为了能够在 Spark 中查询 Hive 表,必须启用 Hive 支持,并且在创建 SparkSession 时通过 enableHiveSupport() 方法来启用。这样,Spark 就能够与 Hive 元数据库进行交互,并读取 Hive 表的元数据和数据。同时,还需要确保 Spark 配置中的 hive-site.xml 文件包含了正确的 Hive 配置信息,以便连接到 Hive 数据库。

通过以上方式,就可以在 Spark 中查询 Hive 表并将结果返回为 DataFrame 或 Dataset,从而方便地进行数据处理和分析。

2.2.2 SQL 语法

SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助。

  1. 读取 JSON 文件创建 DataFrame
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
  1. 对 DataFrame 创建一个临时表
scala> df.createOrReplaceTempView("people")

createOrReplaceTempView 是 Apache Spark 中用于创建或替换临时视图(Temporary View)的方法。它是在 Spark SQL 中的一个函数,用于将 DataFrame 或 Dataset 注册为一个临时视图,以便可以通过 SQL 查询对其进行操作。

临时视图是一种临时的、在当前 SparkSession 中有效的视图,它并不是在 Hive 元数据库中注册的永久性视图。因此,临时视图只在当前 SparkSession 中可用,一旦 SparkSession 关闭,临时视图也将被删除。

createOrReplaceTempView 的语法如下:

void createOrReplaceTempView(String viewName);

其中,viewName 参数是临时视图的名称,可以在后续的 SQL 查询中使用该名称来引用临时视图。

示例代码:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CreateTempViewExample {
    public static void main(String[] args) {
        // 创建 SparkSession 对象
        SparkSession spark = SparkSession
                .builder()
                .appName("CreateTempViewExample")
                .master("local") // 在本地运行,也可以设置为集群的 URL
                .getOrCreate();

        // 读取数据并创建 DataFrame
        Dataset<Row> df = spark.read().json("path/to/json/file");

        // 将 DataFrame 注册为一个临时视图
        df.createOrReplaceTempView("my_temp_view");

        // 使用 SQL 查询临时视图
        Dataset<Row> result = spark.sql("SELECT * FROM my_temp_view");

        // 显示查询结果
        result.show();

        // 关闭 SparkSession
        spark.stop();
    }
}

在上述示例中,我们首先创建了一个 SparkSession 对象,并读取了数据并创建了 DataFrame。然后,通过 createOrReplaceTempView 方法将 DataFrame 注册为一个名为 “my_temp_view” 的临时视图。接下来,我们使用 SQL 查询该临时视图,并将查询结果显示出来。

通过 createOrReplaceTempView 方法,我们可以方便地将 DataFrame 或 Dataset 注册为一个临时视图,以便在 Spark SQL 中使用 SQL 进行数据查询和处理。

  1. 通过 SQL 语句实现查询全表
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  1. 结果展示
scala> sqlDF.show
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+

注意:普通临时表是 Session 范围内的,如果想应用范围内有效,可以使用全局临时表。使
用全局临时表时需要全路径访问,如:global_temp.people

  1. 对于 DataFrame 创建一个全局表
scala> df.createGlobalTempView("people")
  1. 通过 SQL 语句实现查询全表
scala> spark.sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+
scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age|username|
+---+--------+
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
+---+--------+



2.2.3 DSL 语法

DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了。

  1. 创建一个 DataFrame
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
  1. 查看 DataFrame 的 Schema 信息
scala> df.printSchema
root
|-- age: Long (nullable = true)
|-- username: string (nullable = true)
  1. 只查看"username"列数据,
scala> df.select("username").show()
+--------+
|username|
+--------+
|zhangsan|
| lisi|
| wangwu|
+--------+


  1. 查看"username"列数据以及"age+1"数据
    注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式:单引号+字段名
scala> df.select($"username",$"age" + 1).show
scala> df.select('username, 'age + 1).show()
scala> df.select('username, 'age + 1 as "newage").show()




+--------+---------+
|username|(age + 1)|
+--------+---------+
|zhangsan| 21|
| lisi| 31|
| wangwu| 41|
+--------+---------+
  1. 查看"age"大于"30"的数据
scala> df.filter($"age">30).show
+---+---------+
|age| username|
+---+---------+
| 40| wangwu|
+---+---------+
  1. 按照"age"分组,查看数据条数
scala> df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 20| 1|
| 30| 1|
| 40| 1|
+---+-----+

2.2.4 RDD 转换为 DataFrame

在 IDEA 中开发程序时,如果需要 RDD 与 DF 或者 DS 之间互相操作,那么需要引入import spark.implicits._

这里的 spark 不是 Scala 中的包名,而是创建的 sparkSession 对象的变量名称,所以必须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明,因为 Scala 只支持val 修饰的对象的引入。

spark-shell 中无需导入,自动完成此操作。

scala> val idRDD = sc.textFile("data/id.txt")
scala> idRDD.toDF("id").show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+

实际开发中,一般通过样例类将 RDD 转换为 DataFrame

scala> case class User(name:String, age:Int)
defined class User
scala> sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, 
t._2)).toDF.show
+--------+---+
| name|age|
+--------+---+
|zhangsan| 30|
| lisi| 40|
+--------+---+

2.2.5 DataFrame 转换为 RDD

DataFrame 其实就是对 RDD 的封装,所以可以直接获取内部的 RDD

scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, 
t._2)).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46] 
at rdd at <console>:25

scala> val array = rdd.collect
array: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])

注意:此时得到的 RDD 存储类型为 Row

scala> array(0)
res28: org.apache.spark.sql.Row = [zhangsan,30]
scala> array(0)(0)
res29: Any = zhangsan
scala> array(0).getAs[String]("name")
res30: String = zhangsan

2.3 DataSet

DataSet 是 Apache Spark 中引入的一个新的抽象概念,在 Spark 2.0 版本之后加入。DataSet 是对 DataFrame 的扩展,它提供了更加丰富和类型安全的 API,支持编译时类型检查,并且能够与 Java、Scala 中的强类型对象进行无缝交互。

DataSet 是一种分布式数据集合,类似于 DataFrame,但是 DataSet 是面向对象的,并且支持将分布式数据集合映射为强类型对象。这样,可以在编译时捕获类型错误,并提供更好的代码提示和自动完成功能,减少运行时错误。

DataSet 主要特点包括:

  1. 强类型:DataSet 支持对数据进行强类型编码,可以在编译时发现类型错误。

  2. 面向对象:DataSet 可以将分布式数据集合映射为强类型对象,支持面向对象的数据处理方式。

  3. SQL 支持:DataSet 也支持使用 SQL 进行数据查询和处理,与 DataFrame 一样提供了 SQL 接口。

  4. 惰性求值:DataSet 也是惰性求值的,即在执行数据处理操作时,并不立即进行计算,只有遇到动作操作时才会实际触发计算。

  5. 优化执行:DataSet 的转换操作也会被优化,以提高执行效率。

在使用 Spark 时,可以根据需求选择使用 DataFrame 或 DataSet 进行数据处理。如果需要更加强大的类型检查和面向对象的编程体验,可以选择使用 DataSet。如果需要更加灵活和简便的数据处理方式,可以选择使用 DataFrame。

示例代码(Scala):

import org.apache.spark.sql.{SparkSession, Dataset}

// 定义一个 case class 作为 DataSet 的强类型对象
case class Person(name: String, age: Int)

object DataSetExample {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession 对象
    val spark = SparkSession
      .builder()
      .appName("DataSetExample")
      .master("local") // 在本地运行,也可以设置为集群的 URL
      .getOrCreate()

    // 导入隐式转换,以便将 DataFrame 转换为 DataSet
    import spark.implicits._

    // 创建 DataSet
    val ds: Dataset[Person] = Seq(Person("Alice", 30), Person("Bob", 25)).toDS()

    // 显示 DataSet 中的数据
    ds.show()

    // 关闭 SparkSession
    spark.stop()
  }
}

以上示例中,我们首先定义了一个 case class Person,作为 DataSet 的强类型对象。然后,我们通过 Seq(Person("Alice", 30), Person("Bob", 25)).toDS() 将一个序列转换为 DataSet,并显示其中的数据。

总之,DataSet 是 Apache Spark 中对 DataFrame 的扩展,它提供了更强大、更类型安全的数据处理功能,适用于需要更加严格类型检查和面向对象编程的场景。

DataSet 是具有强类型的数据集合,需要提供对应的类型信息。

2.3.1 创建 DataSet

1) 使用样例类序列创建 DataSet

scala> case class Person(name: String, age: Long)
defined class Person
scala> val caseClassDS = Seq(Person("zhangsan",2)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]
scala> caseClassDS.show
+---------+---+
| name|age|
+---------+---+
| zhangsan| 2|
+---------+---+

2) 使用基本类型的序列创建 DataSet

scala> val ds = Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
+-----+

注意:在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet

2.3.2 RDD 转换为 DataSet

SparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSet,case 类定义了 table 的结构,case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结构。

scala> case class User(name:String, age:Int)
defined class User


scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, 
t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

2.3.3 DataSet 转换为 RDD

DataSet 其实也是对 RDD 的封装,所以可以直接获取内部的 RDD

scala> case class User(name:String, age:Int)
defined class User


scala> sc.makeRDD(List(("zhangsan",30), ("lisi",49))).map(t=>User(t._1, 
t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] = [name: string, age: int]


scala> val rdd = res11.rdd
rdd: org.apache.spark.rdd.RDD[User] = MapPartitionsRDD[51] at rdd at 
<console>:25


scala> rdd.collect
res12: Array[User] = Array(User(zhangsan,30), User(lisi,49))

2.4 DataFrame 和 DataSet 转换

DataFrame 其实是 DataSet 的特例,所以它们之间是可以互相转换的。

➢ DataFrame 转换为 DataSet

scala> case class User(name:String, age:Int)
defined class User


scala> val df = sc.makeRDD(List(("zhangsan",30), 
("lisi",49))).toDF("name","age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]


scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]

➢ DataSet 转换为 DataFrame

scala> val ds = df.as[User]
ds: org.apache.spark.sql.Dataset[User] = [name: string, age: int]


scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

2.5 RDD、DataFrame、DataSet 三者的关系

  • 在 SparkSQL 中 Spark 为我们提供了两个新的抽象,分别是 DataFrame 和 DataSet。他们和 RDD 有什么区别呢?首先从版本的产生上来看:

➢ Spark1.0 => RDD

➢ Spark1.3 => DataFrame

➢ Spark1.6 => Dataset

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的 Spark 版本中,DataSet 有可能会逐步取代 RDD和 DataFrame 成为唯一的 API 接口。

2.5.1 三者的共性

➢ RDD、DataFrame、DataSet 全都是 spark 平台下的分布式弹性数据集,为处理超大型数据提供便利;

➢ 三者都有惰性机制,在进行创建、转换,如 map 方法时,不会立即执行,只有在遇到Action 如 foreach 时,三者才会开始遍历运算;

➢ 三者有许多共同的函数,如 filter,排序等;

➢ 在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包:import spark.implicits._(在创建好 SparkSession 对象后尽量直接导入)

➢ 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出

➢ 三者都有 partition 的概念

➢ DataFrame 和 DataSet 均可使用模式匹配获取各个字段的值和类型

2.5.2 三者的区别

  1. RDD
  • ➢ RDD 一般和 spark mllib 同时使用
  • ➢ RDD 不支持 sparksql 操作
  1. DataFrame
  • ➢ 与 RDD 和 Dataset 不同,DataFrame 每一行的类型固定为 Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
  • ➢ DataFrame 与 DataSet 一般不与 spark mllib 同时使用
  • ➢ DataFrame 与 DataSet 均支持 SparkSQL 的操作,比如 select,groupby 之类,还能注册临时表/视窗,进行 sql 语句操作
  • ➢ DataFrame 与 DataSet 支持一些特别方便的保存方式,比如保存成 csv,可以带上表头,这样每一列的字段名一目了然(后面专门讲解)
  1. DataSet
  • ➢ Dataset 和 DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame 其实就是 DataSet 的一个特例 type DataFrame = Dataset[Row]
  • ➢ DataFrame 也可以叫 Dataset[Row],每一行的类型是 Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的 getAS 方法或者共性中的第七条提到的模式匹配拿出特定字段。而 Dataset 中,每一行是什么类型是不一定的,在自定义了 case class 之后可以很自由的获得每一行的信息。

2.5.3 三者的互相转换

在这里插入图片描述

2.6 IDEA 开发 SparkSQL

实际开发中,都是使用 IDEA 进行开发的。

2.6.1 添加依赖

<dependency>
	 <groupId>org.apache.spark</groupId>
	 <artifactId>spark-sql_2.12</artifactId>
	 <version>3.0.0</version>
</dependency>

2.6.2 代码实现

object SparkSQL01_Demo {
	 def main(args: Array[String]): Unit = {
 	//创建上下文环境配置对象
		 val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
		 //创建 SparkSession 对象
		 val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
		 //RDD=>DataFrame=>DataSet 转换需要引入隐式转换规则,否则无法转换
		 //spark 不是包名,是上下文环境对象名
		 import spark.implicits._
		 //读取 json 文件 创建 DataFrame {"username": "lisi","age": 18}
		 val df: DataFrame = spark.read.json("input/test.json")
		 //df.show()
		 //SQL 风格语法
		 df.createOrReplaceTempView("user")
		 //spark.sql("select avg(age) from user").show
		 //DSL 风格语法
		 //df.select("username","age").show()
		 //*****RDD=>DataFrame=>DataSet*****
		 //RDD
		 val rdd1: RDD[(Int, String, Int)] = 
		spark.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",28),(3,"wangwu",
		20)))
		 //DataFrame
		 val df1: DataFrame = rdd1.toDF("id","name","age")
		 //df1.show()
		 //DateSet
		 val ds1: Dataset[User] = df1.as[User]
		 //ds1.show()
		 //*****DataSet=>DataFrame=>RDD*****
		 //DataFrame
		 val df2: DataFrame = ds1.toDF()
		 //RDD 返回的 RDD 类型为 Row,里面提供的 getXXX 方法可以获取字段值,类似 jdbc 处理结果集,
		但是索引从 0 开始
		 val rdd2: RDD[Row] = df2.rdd
		 //rdd2.foreach(a=>println(a.getString(1)))
		 //*****RDD=>DataSet*****
	 	rdd1.map{
	  		case (id,name,age)=>User(id,name,age)
		 }.toDS()
		 //*****DataSet=>=>RDD*****
		 ds1.rdd
		 //释放资源
		 spark.stop()
	 }
}
case class User(id:Int,name:String,age:Int)

2.7 用户自定义函数

用户可以通过 spark.udf 功能添加自定义函数,实现自定义功能。

2.7.1 UDF

  1. 创建 DataFrame
scala> val df = spark.read.json("data/user.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, username: string]
  1. 注册 UDF
scala> spark.udf.register("addName",(x:String)=> "Name:"+x)
res9: org.apache.spark.sql.expressions.UserDefinedFunction = 
UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
  1. 创建临时表
scala> df.createOrReplaceTempView("people")
  1. 应用 UDF
scala> spark.sql("Select addName(name),age from people").show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/783112.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

学习记录681@Gitlab升级实战

前言 我的Linux目前是centos8&#xff0c;目前使用的gitlab是从https://mirrors.tuna.tsinghua.edu.cn/ 下载下来的gitlab-ce-12.10.1-ce.0.el8.x86_64.rpm&#xff0c;然后安装的。 这里需要注意如果是centos8需要下载el8的gitlab&#xff0c;如果是centos7需要下载el7的git…

golang - 下载大文件,实时返回前端下载进度,实现下载进度条

示例&#xff1a; package mainimport ("fmt""io""net/http""os""path"//"github.com/kataras/iris""github.com/kataras/iris/v12""time" )func doSomething() {time.Sleep(time.Second * …

大数据学习04-Hbase分布式集群部署

系统环境&#xff1a;centos7 软件版本&#xff1a;jdk1.8、zookeeper3.4.8、hadoop2.8.5 一、下载 HBASE官网 cd /home/toolswget https://archive.apache.org/dist/hbase/2.2.4/hbase-2.2.4-bin.tar.gz二、解压 tar -zxvf hbase-2.2.4-bin.tar.gz -C /home/local/移动目…

【弹力设计篇】聊聊降级设计

我们知道在分布式系统中&#xff0c;故障是不可避免的&#xff0c;所以我们需要设计一个高可用的系统&#xff0c;对于接口层面除了幂等&重试机制&#xff0c;还需要保证接口高可用&#xff0c;因此 限流&排队&降级&熔断也需要考虑。本篇主要介绍下接口故障下降…

Qt 之 自定义json配置文件类,QJsonDocument应用

目录 一、前言 二、头文件代码 三、源文件代码 四、使用示例 五、使用效果 一、前言 Qt的配置类QSettings主要是键值结构的配置&#xff0c;若需要的配置项为树形结构&#xff0c;例如配置学校\学院\班级\学生这样&#xff0c;使用键值结构已经不满足我们的需求了&#xf…

【计算机视觉 | 图像分割】arxiv 计算机视觉关于图像分割的学术速递(7 月 21 日论文合集)

文章目录 一、分割|语义相关(14篇)1.1 CNOS: A Strong Baseline for CAD-based Novel Object Segmentation1.2 Spinal nerve segmentation method and dataset construction in endoscopic surgical scenarios1.3 WeakPolyp: You Only Look Bounding Box for Polyp Segmentatio…

【unity】模型裁剪shader(建筑生长动画)

【unity】模型裁剪shader&#xff08;建筑生长动画&#xff09; 思路 使用的核心方法是clip,当传入正值时渲染&#xff0c;传入负值时不渲染。定义一个裁剪向量&#xff0c;使用裁剪向量和模型点点乘&#xff0c;如果模型点和裁剪向量是同一个方向&#xff0c;点乘为正&#…

代码随想录算法训练营第58天|739 496

739 用stack来写 stack里面发index 不要放数值 重点在于 1.填写result数组不需要按顺序填写 根据index就可以 2.遍历的值比top小的话就放入stack 这样stack里面是一个递减数组 遍历的值只需和top比 如果比他大就pop 一直到把stack里面比新加入的值小的都pop完为止 这样stack里…

vue项目的vue.config.js在打包过程中,并不会处理api请求。

主要处理打包选项和静态资源文件 请求是axios处理的

nonebot2聊天机器人插件12:stable_diffusion_webui_api

nonebot2聊天机器人插件12&#xff1a;stable_diffusion_webui_api 1. 插件用途2. 代码实现3. 实际效果 该插件涉及知识点&#xff1a;定时器&#xff0c;调用bot的api发送消息 插件合集&#xff1a;nonebot2聊天机器人插件 该系列为用于QQ群聊天机器人的nonebot2相关插件&…

IPO向上,大模型向下:中国企服寻找新「出口」

2023年&#xff0c;资本市场给企服行业带来的动荡&#xff0c;无疑是一次洗牌机会。只有当SaaS企业深耕产业侧&#xff0c;才能找到实现标准化的解法&#xff0c;才能在一波又一波的浪潮下抓住机遇。 作者|思杭 编辑|皮爷 出品|产业家 2023上半年&#xff0c;企服行业在…

MySQL存储过程——系统变量

1.存储过程中的变量 1.1 查看系统变量 查看所有的系统变量 show variables;查看会话级别的系统变量 show session variables&#xff1b;查看会话和auto相关的变量 show session variables like auto%;查看全局的和auto相关变量 show global variables like auto%;查看某一…

js的几种排序

冒泡排序&#xff1a; function bubbleSort(arr) {var len arr.length;for (var i 0; i < len; i) {for (var j 0; j < len - 1 - i; j) {if (arr[j] > arr[j1]) { //相邻元素两两对比var temp arr[j1]; //元素交换arr[j1] arr[j];arr[j] temp;}}…

进程(process)与线程(thread)以及线程的三种实现方法

一、线程和进程区别 说起进程&#xff0c;就不得不说下程序。程序是指令和数据的集合&#xff0c;其本身没有任何运行的含义&#xff0c;是一个静态的概念。 而进程则是执行程序的一次执行过程&#xff0c;它是一个动态的概念。是系统资源分配的单位。 通常在一个进程中可以…

BGP对SR-MPLS的支持

目录 BGP的SID类型 BGP Prefix-SID BGP Anycast-SID BGP Peer-SID BGP SID的通告 通过Prefix-SID属性 通告Prefix-SID 通过BGP EPE 通告Peer-SID 为什么要使用BGP作为SR-MPLS的控制平面 IGP for SR-MPSL只可以在自治系统AS内分配SID&#xff0c;规划出AS域内的最优路径 …

第12章 STM32+BH1750光照传感器+OLED模块显示环境光照强度

今天给大家介绍一块嵌入式毕设中也经常用到的一款传感器——BH1750光照传感器&#xff0c;如下图。&#xff08;该传感器的购买链接和代码我已放在资料里&#xff0c;想要资料的同学&#xff0c;评论区留下邮箱即可&#xff09;相比光敏传感器&#xff0c;它可以直接输出环境光…

【Unity2D】设置一物体默认在其他物体之上不被遮挡

比如我想让机器人显示在箱子的前面。 点击箱子&#xff0c;将其层级设置在机器人的后面。 即修改箱子的Order in Layer 在机器人之后 物体默认的Order in Layer 都是0 &#xff0c;将箱子的Order in Layer修改为-1即可 这样将确保先绘制机器人&#xff0c;然后绘制箱子。这样…

ConstraintLayout(约束布局)替代LinearLayout权重,解决多View一行省略问题

1.看上面的设计图中圈红的地方&#xff1a;左边设计图是一张直播间消息流&#xff0c;其中标红的消息流意思是&#xff1a; 用户的等级标签&#xff08;一张图片&#xff09; 用户名字写死的文案send,要求这三个View写一行&#xff0c;但是当用户名字过长时会让用户名出现.....…

数据结构双向循环链表,增删改查基本操作

一、双向循环链表的描述 和单链表的循环类似&#xff0c;双向链表也可以有循环表&#xff0c;循环表的引进是为了弥补双向链表不能向前遍历的弊端。 在双向循环链表中&#xff0c;头结点的直接前驱为尾结点&#xff0c;而尾结点的直接后继为头结点。 二、双向循环链表的存储结…

黑马B站视频JAVA部分的知识与学习-【思维导图知识范围】

JAVA本系列黑马的JAVA学习路线–详解JAVA部分的学习语言视频选择收录专辑链接C张雪峰推荐选择了计算机专业之后-在大学期间卷起来-【大学生活篇】JAVA黑马B站视频JAVA部分的知识范围、学习步骤详解JAVAWEB黑马B站视频JAVAWEB部分的知识范围、学习步骤详解SpringBootSpringBoot知…