Spark 3.0 - 2.机器学习核心 DataFrame 应用 API 与操作详解

news2025/1/10 3:14:57

目录

一.引言

二.创建 DataFrame

1.CreateDataFrame

2.RDD toDF By Spark implicits

3.By Read Format File

三.常用处理 API

1.select 选择

2.selectExpr 表达式

3.collect / collectAsList 收集

4.count 统计

5.limit 限制

6.distinct 去重

7.filter 过滤

8.map 一对一

9.flatMap 一对多

10.drop 删除列

11.sort / orderBy 排序

四.常用采样 API

1.sample 采样

2.randomSplit 划分

五.数据互转

1.DF -> RDD

2.DF -> DS

3.DS -> DF

4.DS -> RDD

5.RDD -> DF

6.RDD -> DS

六.总结


一.引言

DataFrame 实质上是存在于不同节点计算机中的一张关系型数据表,RDD 可以看做是 DataFrame 的前身,DataFrame 是 RDD 的扩展。RDD 中可以存储任何类型的数据,但是直接使用 RDD 在字段需求明显时存在算子难以复用的缺点,这时候如果需要使用 RDD 我们需要定义相对复杂的处理逻辑,而通过 DataFrame 则可以通过列式存储数据的优势,快速将算子应用在多个列上,提高开发效率,例如我们可以一行代码求 A 列的 SUM,B 列的 MAX,C 列的 MIN,而使用 RDD 则需要 GroupBy 或者相对复杂的 ProcessFunction。

二.创建 DataFrame

DataFrame 可以看做是 RDD[Row] + Schema,通过 Schema 指定每一列的属性,从而使得框架能够了解数据的结构与类型,Spark 实际使用中 Schema 需要通过 StrucType 类并指定每个字段的 StructFields ,域中明确了列名、数据类型以及一个 Boolean 参数代表该字段是否可以为空。

1.CreateDataFrame

首先通过二元数组生成 RDD[Row],随后通过 StructType + StrucField 定义每一列数据的类型,这里第一列为 Name,String 类型,不可以为 null,第二列为 Age,Int 类型,可以为 null。 

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}  

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("TestDataFrame")
      .getOrCreate()

    val sc = spark.sparkContext
    sc.setLogLevel("error")

    // 1.1 Create DataFrame
    val schema =
      StructType(
        StructField("name", StringType, false) ::
        StructField("age", IntegerType, true) :: Nil
      )

    val random = scala.util.Random
    val peoples = sc.parallelize((0 to 100).map(_ => {
      val name = random.nextString(5)
      val age = random.nextInt(50)
      (name, age)
    }))

    val dataFrame = spark.createDataFrame(peoples.map(p => Row(p._1, p._2)), schema)

    dataFrame.printSchema()

    dataFrame.createOrReplaceTempView("people")
    spark.sql("select name from people").collect().foreach(println(_))

可以通过 printSchema 输出 DataFrame 的 schema,也可以通过 createOrReplaceTempView 注册临时表,并最终通过 sql 执行相关语句,这在一定程度上与 Flink SQL 很类似:

Tips:

这里初始化 Schema 的 StructType 时用到了 Scala List 的简易写法,其中 :: 代表连接列表元素,例如 A :: B :: C :: Nil 可以看做是 List[A, B, C],除此之外,还可以通过 ::: 三个冒号连接两个列表,代表二者的 concat 合并,例如 A :: B :: C :: Nil ::: DDD :: Nil,其实就是 List[A, B, C, D]。感兴趣的同学可以自己本地测试下述 Demo。

     val site = "A" :: "B" :: "C" :: Nil ::: "DDD" :: Nil
     println(site.length)
     site.foreach(println(_))

2.RDD toDF By Spark implicits

常见的方法除了生成 RDD 再指定 Schema 外,也可以引入 spark.implicits._ 隐式转换,通过 RDD.toDF() 方法转换生成 DataFrame,此时 Spark 可以自动推断 DF 的 Schema。

    import spark.implicits._
    val peopleDF = peoples.toDF("name", "age")
    peopleDF.printSchema()

可以看到自动 infer 推断得到的 Schema nullable 与我们上面自定义的 Schema 是反的,上面是 Name 不为空, Age 可以为空,自动推断的结果是 Name: String 可以为空,而 Age: Int 不能为空。

3.By Read Format File

上面两种方法都用到了 RDD 并做转换,还有一种方式可以直接生成 DataFrame,即读取指定 format 的文件,例如 CSV、Json、Parquet 等等,下面示例 Json 的读取方法:

    val peopleFromJson = spark.read.schema(schema).json("people.json")
    peopleFromJson.collect().foreach(println(_))

Json 文件中数据如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

三.常用处理 API

1.select 选择

    peopleDF.select("name").take(10)

select 用于选择 dataframe 中某些列:

2.selectExpr 表达式

    peopleDF.selectExpr("name as NAME").printSchema()

基于 dataframe 原有列进行特殊处理,可以指定不同  SQL 表达式,修改列名后 DataFrame 对应的 schema 也相应改变:

3.collect / collectAsList 收集

    peopleDF.collect().take(5).foreach(println(_))

    peopleDF.collectAsList().forEach(x => println(x))

二者均可以将 DataFrame 的数据拉取到本级内存或任务 Master 上,唯一区别在于返回值类型,前者为 Scala Array[T],后者为 Java List<T>。

4.count 统计

    println(peopleDF.count())

count 用于获取 DataFrame 的行数,除此之外,count 也视为 Spark 的 Action 算子,可以触发 Spark 执行逻辑。

5.limit 限制

    peopleDF.limit(5).show()

用于限制表中数据,这里可以理解为 TopN。

6.distinct 去重

    println(peopleDF.distinct().count())

可以实现数据集中的重复项。

7.filter 过滤

    println(peopleDF.filter("age > 18").count())

按照条件对数据集进行过滤。

8.map 一对一

    val rdd = sc.parallelize(Seq("hello,spark", "hello,hadoop"))

    rdd.toDF("id").map(x => "str:" + x).show()

用于数据集处理的一一映射。

9.flatMap 一对多

    val rdd = sc.parallelize(Seq("hello,spark", "hello,hadoop"))
   
    rdd.toDF("id").flatMap(x => x.toString().split(",")).show()

对数据集整体操作,并最终展平,可以看做是 map + flatten 的组合体。

10.drop 删除列

    peopleDF.drop("name").printSchema()

删除某一列。

11.sort / orderBy 排序

   dataFrame.sort(dataFrame("age").asc_nulls_first).show()

   dataFrame.orderBy(dataFrame("age").asc_nulls_first).show()

根据数据集中某个字段排序,其中 asc 与 desc 可以选择升序与降序,除此之外还新添加了 asc_nulls_first、desc_nulls_first、asc_nulls_last、desc_nulls_last 分别指定排序类型与排序结果中缺失值在前还是在后展示。

四.常用采样 API

除了基础处理函数外,DataFram 还提供采样 API,这对于机器学习场景中数据集的划分十分有效。

1.sample 采样

    dataFrame.sample(false, 0.8, 10).show()

三个参数分比为:

withReplacement : 是否放回,false 代表不放回,true 为放回

ratio : 代表采样比例,注意最终数量不一定完全符合比例

seed : 随机种子,如果 seed 不变,则采样结果不变

上述代码代表以 seed=10,不放回采样原始数据的 80% 左右数据

2.randomSplit 划分

   val split = dataFrame.randomSplit(Array(0.25, 0.75), 10) // 按比例划分
    println(split(0).count())
    println(split(1).count())

通过 Array 指定划分比例,数组中有多少权重就会生成多少个 DataFrame,如果权重和不为1,spark 会自动将其标准化,这在生成训练集、测试集、验证集时非常常用,除此之外,划分分组也需要指定随机种子 seed。

上述代码代表以 seed = 10,以 1:3 的比例划分数据集。

五.数据互转

前面我们已经提到 DataFrame = RDD[Row] + schema,除此之外,还有 DataFrame = DataSet[Row],可以看到 RDD 是特殊的 DataFrame,DataFrame 又是特殊的 DataSet,通过 spark.implicits._ 可以实现三者的轻松转换。

1.DF -> RDD

    val rdd1 = dataFrame.rdd // DF -> RDD

2.DF -> DS

    val ds = dataFrame.as[Person] // DF -> DS

这里 Person 为 case class:

  case class Person(name: String, age: Int)

注意不要将 case class 放下 main 函数内,否则代码编译会报错无法编码。

3.DS -> DF

    val df = ds.toDF() // DS -> DF

4.DS -> RDD

    val rdd2 = ds.rdd // DS -> RDD

5.RDD -> DF

    val df2 = rdd.toDF("name") // RDD -> DF

6.RDD -> DS

    val ds2 = rdd.map(x => Person(x, 1)).toDS() // RDD -> DS

Tips:

从上面的转换可以看出,携带信息多的数据类型向携带数据少的类型转换无需提供额外信息,例如 DS 或者 DF 转 RDD,只需要 .rdd 方法即可,而信息少的数据向信息多的数据转换时则需补充额外信息,例如 RDD 或者 DF 转换至 DS,都需要补充 DS[T] 的类信息 T,上述实例中 T 为 Person。

六.总结

DataFrame 是 Spark 机器学习的基础也是核心,后续章节的大部分 DataFrame 操作都将基于上述操作实现或拓展。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/13916.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Matlab:创建分类数组

Matlab&#xff1a;创建分类数组基于字符串数组创建分类数组添加新元素和缺失的元素基于字符串数组创建有序分类数组基于分 bin 数值数据创建有序分类数组此示例说明如何创建分类数组。categorical 是一个数据类型&#xff0c;用来存储值来自一组有限离散类别的数据。这些分类可…

FTX 深度数据复盘

Nov 2022, Sabrina Data Source: Footprint Analytics Dashboards 11月2日&#xff0c;Coindesk 公布了 Alameda 的私人财务文件&#xff0c;这是一家由 FTX 创始人 Sam Bankman-Fried 拥有的风险投资和交易公司&#xff0c;与该交易所密切相关&#xff0c;从而引发了加密货币…

Spring Framework 6.0 框架

Spring Framework 6.0.0 现已正式发布。 “这是 2023 年及以后新一代框架的开始&#xff0c;拥抱了 OpenJDK 和 Java 生态系统中当前和即将到来的创新。同时&#xff0c;我们将其精心设计为针对现代运行时环境的 Spring Framework 5.3.x 的直接升级。” 作为核心框架的重大修订…

脑肽载体Angiopep-2、906480-05-5、TFFYGGSRGKRNNFKTEEY

Angiopep-2 hydrochloride 是脑肽载体。抗肿瘤药物与 Angiopep-2 肽载体的结合可提高其在脑癌中的活性. Angiopep-2 hydrochloride is a brain peptide vector. The conjugation of anticancer agents with the Angiopep-2 peptide vector could increase their efficacy in th…

【正点原子FPGA连载】 第三章 硬件资源详解 摘自【正点原子】DFZU2EG/4EV MPSoC 之FPGA开发指南V1.0

1&#xff09;实验平台&#xff1a;正点原子MPSoC开发板 2&#xff09;平台购买地址&#xff1a;https://detail.tmall.com/item.htm?id692450874670 3&#xff09;全套实验源码手册视频下载地址&#xff1a; http://www.openedv.com/thread-340252-1-1.html 第三章 硬件资源…

提升Mac运行速度的十大小技巧,你用过几个?

经常听到小伙伴在抱怨PC电脑很慢&#xff0c;但是其实Mac电脑随着用的时间增长&#xff0c;运行速度也会越来越慢&#xff0c;那么造成Mac运行慢的原因有很多&#xff0c;可能是操作系统过时未更新&#xff0c;也可能是内存&#xff08;RAM&#xff09;不足&#xff0c;以下小编…

终于把下载安装更新的功能整出来了,记录关键点

我的第一个安卓应用终于也有了APP内安装更新的功能&#xff08;赶上末班车了吗&#xff09;&#xff0c;记录一些关键点&#xff0c;方方面面的。 托管检测更新和下载服务 由于没有服务器&#xff0c;这两个核心功能可以托管到一些比较好的平台。检测我用的是蒲公英分发&…

MySQL8.0分析查询语句EXPLAIN

文章目录学习资料分析查询语句EXPLAINidselect_typepartitions&#xff08;可略&#xff09;type【重点】possible_keys和keykey_len【重点】refrows【重点】filteredExtra【重点】EXPLAIN四种输出格式传统格式JSON格式SHOW WARNINGS的使用学习资料 【MySQL数据库教程天花板&a…

《深度学习进阶 自然语言处理》第五章:RNN通俗介绍

文章目录5.1 概率和语言模型5.1.1 概率视角下的word2vec5.1.2 语言模型5.1.3 将CBOW模型用作语言模型的效果怎么样&#xff1f;5.2 RNN5.2.1 循环神经网络5.2.2 展开循环5.2.3 Backpropagation Through Time5.2.4 Truncated BPTT5.2.5 Truncated BPTT的mini-batch学习5.3 RNN的…

会话跟踪技术。

目录 一、会话跟踪技术 二、Cookie 介绍 1、Cookie 基础 2、Cookie 使用细节 三、Session 介绍 1、Session 基本介绍 2、Session的原理分析 3、Session的使用细节 一、会话跟踪技术 ▶ 会话 会话:用户打开浏览器&#xff0c;访问web服务器的资源&#xff0c;会话建立&a…

SAP 直接外部数据库技术配置手册-Oracle

一、操作步骤: 1、SAP Basis配置TNS文件:tnsnames.ora 事务码AL11下的 DIR_SETUPS变量D:\usr\sap\<SID>\SYS\profile双击进入文件路径oracle可以查看到文件 tnsnames.ora (不是路径D:\oracle\<SID>\102\NETWORK\ADMIN下的tnsnames.ora文件),加入如下信息(…

cubeIDE开发, stm32的WIFI通信设计(基于AT指令)

一、stm32的WIFI配置 通常WIFI模块就是一个独立的单片机&#xff0c;只是内置了WFIF通信软件的单片机&#xff0c;并该通信软件提供了AT通信指令集给开发人员&#xff0c;基于这些指令集我们就可以针对项目需要进行二次集成开发出所需的业务应用软件。 本文本文采用的开发板是s…

一、什么是计算机网络

1.1 概述 信件的要素&#xff1a; 打电话时包括连接和接通过程&#xff0c;要关注包括拨打者的状态和接听者的状态&#xff0c;称为TCP连接。发短信时只要发送者将短信发送出去即可&#xff0c;是否被接收或者发送的过程中是否有丢失这些都不关注&#xff0c;称之为UDP连接。计…

CentOS7安装jdk

文章目录前言准备工作一、将jdk的压缩文件传递到虚拟机里面二、解压缩三、配置环境变量前言 在大数据的技术中&#xff0c;Linux的环境是基础&#xff0c;jdk则是这些大数据工具的基础&#xff0c;在这篇博文中&#xff0c;我们主要介绍如何在Linux环境里安装jdk&#xff0c;以…

MySQL8.0优化 - 索引的数据结构

文章目录学习资料索引的数据结构B树常见索引概念聚簇索引特点优点缺点限制二级索引&#xff08;辅助索引、非聚簇索引&#xff09;回表联合索引Innodb的B树索引注意事项1、根页面位置万年不动2、内节点中目录项记录的唯一性3、一个页面最少存储2条记录索引的代价学习资料 【My…

Python可视化必备,在Matplotlib/Seaborn中轻松玩转图形拼接!

数据展示时&#xff0c;在同一页面上混合排版多个图形是一种常见的用法。 本次分享一个Python轮子patchworklib&#xff1a; 通过|、/轻松实现图形排列&#xff1b;比matplotlib、seaborn等自带子图功能更加灵活&#xff1b;灵感源于R中的patchwork。目录 在Matplotlib中使用…

【Java学习笔记】第四章 面向对象编程三部曲(中)

【Java学习笔记】第四章 面向对象编程三部曲&#xff08;上&#xff09; 【Java学习笔记】第四章 面向对象编程三部曲&#xff08;中&#xff09; 【Java学习笔记】第四章 面向对象编程三部曲&#xff08;下&#xff09; 文章目录5. 面向对象编程&#xff08;中&#xff09;5…

gdb调试 入门

程序的调试过程主要有&#xff1a;单步执行&#xff0c;跳入函数&#xff0c;跳出函数&#xff0c;设置断点&#xff0c;设置观察点&#xff0c;查看变量。 You can run "gdb" with no arguments or options; but the most usualway to start GDB is with one argume…

ANDROID ROOT FIDDLER HTTPS 抓包

参考 adb 修改手机代理方式_userwyh的博客-CSDN博客_adb shell settings put global http_proxy 手机模拟器安装证书并抓包_虚无-缥缈的博客-CSDN博客_模拟器安装证书 安卓手机使用adb添加系统证书方法 - 知乎 设置设备代理&#xff08;需要ROOT 设置代理&#xff1a; adb…

【重新安装Anaconda心得】

文章目录&#xff08;一&#xff09;环境变量设置&#xff08;二&#xff09;Anaconda添加镜像源【可以使用境外流量不用添加】&#xff08;三&#xff09;创建虚拟环境的细节&#xff08;四&#xff09;补充&#xff1a;conda的常用命令&#xff08;一&#xff09;环境变量设置…