Spark 3.0 - 4.Pipeline 管道的工作流程

news2024/12/28 18:25:24

目录

一.引言

二.基本组件

三.Pipeline 基本流程

1.训练 Pipeline - Estimator

2.预测 Pipeline - Transformer

四.Pipeline 分解与构造

1.DataFrame

2.Transformer1 - Tokenizer

3.Transformer2 - HashingTF

4.Estimator - LR 

5.Pipeline With ParamMap - Estimator

5.1.配置 Pipeline

5.2.配置 ParamMap

5.3 Pipeline.fit

6.Pipeline With ParamMap - Transformer

6.1 模型存储与加载

6.2 Model transformer

7.完整代码

五.总结


一.引言

Spark ML 使用管道 Pipeline 就像 Python Sklearn 一样,可以把多个步骤例如 特征处理 -> 特征提取 -> 模型训练 等联结起来,让数据在 Pipeline 中流动。有了 Pipeline 之后,ML 更适合创建包含从数据清洗到特征工程再到模型训练等一系列流程中,无论什么模型都提供了统一的算法操作接口即 fit(),下面让我们看下管道的基本组件与流程示例。

二.基本组件

- DataFrame

数据源,也是 Spark Sql 中的概念,可以容纳多种数据类型用来保存数据。例如,一个 DataFrame 可以存储文本、标签、特征向量等不同列。可以说 ML 的所有基本 API 最终都需要以源头的 DataFrame 数据为主。

- Transformer

转换器,和 Spark、Flink 里的 Transformer 类似,例如 RDD -> RDD、DataStream -> DataStream,这里 Transformer 负责将 DataFrame 转换为 DataFrame。每个 Tansformer 都有一个 transform 方法,负责在原有 DataFrame 的基础上添加一个或者多个列得到新的 DataFrame。例如将原始数据转换,并增加一列新的特征向量。

- Estimator

Estimator 负责根据样本 fit 训练得到一个模型,模型的本质也是 Transformer,因为给定一个 DataFrame 数据集,模型可以转化得到一个新的预测标签列,所以 Estimator 就是调用 fit 方法并最终得到一个 Transformer。LR、SVM、PCA 等都可以看做是 Estimator。

- Pipeline

管道,Pipeline 将多个 Transformer 与 Estimator 连接起来并按顺序确定一个机器学习的工作流程。届时管道里的每一步都可以看做是一个 Stage,Stage 可以是 Transformer 也可以是 Estimator,就像 Spark 的 Stage 一样,一个任务流程图 Graph 梳理好后,每一步的组件都是固定的。

- Parameter

通用 API,由于 Pipeline 中可能存在多个 Transformer 与 Estimator,使用 Builder 的形式不易统一维护,所以可以使用 Parameter 一次性定义好所以参数,就像 SparkConf 一样。

三.Pipeline 基本流程

Pipeline 是一个管道,包含一个或多个 Transformer 与 Estimator,但是一个完整的 Pipeline 本质上也是 Transformer 或者 Estimator,区分 Pipeline 属于哪个类型,看其对应方法即可,如果调用 fit() 方法,那它就是 Estimator,如果调用 transform 方法,那它就是 Transformer。一般来说训练模型的 Pipeline 是 Estimator,通过模型预测结果的是 Transfomer。

1.训练 Pipeline - Estimator

下面通过基础的文本处理 LR 算法介绍了 Pipeline - Estimator 流程:

其中 Tokenizer 与 HashingTF 为 Transformer,负责数据的预处理与特征转化,最后的 Logistic Regression 是 Estimator,其负责 fit 上一个 Stage 送来的特征数据并得到模型。由于 Pipeline 的最后一个 Stage 是 Estimator,所以该 Pipeline 调用 fit() 方法,其类型也对应 Pipeline - Estimator。

Tips:

每个 Transformer 与 Estimator 都有一个唯一的 uid,可以视为当前 stage 的标识,用于保存对应的参数,即使是相同的类型也能有相同的 ID。

2.预测 Pipeline - Transformer

对于同一套流程的 Estimator 和 Transfomer 的 Pipeline 流程整体区别很小:

可以看到主要差别在 Pipeline 的最后一个 Stage,虽然依然是 Logistic Regression,但是 model 不再调用 fit() 方法训练模型,而是调用 transform() 预测最终结果,所以 Pipeline 的类型也随着最后一个 Stage 而转换为 Pepeline - Transformer,其余处理流程相同。下面通过实例实现展示如何实现一个 Pipeline 训练与预测。

四.Pipeline 分解与构造

上面介绍了 Pipeline 五大组件与一个 LR 的文本处理流程,下面示例将基于图中的组件一一介绍,并最终合并为完整的 Pipeline。

1.DataFrame

    // 准备数据(id, text, label).
    val training = spark.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id", "text", "label")

本地模拟样本数据,其中 text 未未分词的文档内容,id 代表序号,Label 代表正负样本,由于是模拟样本,实际场景下,text 可以是评论或者留言,而 Label 可以标识该 text 是积极评论还是消极评论,或者留言是正向还是负向。

2.Transformer1 - Tokenizer

Tokenizer 为文档单词提取器,其利用分词将每个文档的文本拆分为单词。

    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")

    val info = hashingTF.transform(training)

- setInputCol 

设置输入文本列,本例下即为 text,例如 hadoop mapreduce。

- setOutputCol

设置分词后的输出列,为原始 DataFrame 新增一列分词列,其包含 text 后分词的结果,类型为 WrappedArray,单独调用 transform 后得到新的一列,其中文本已分词完毕。

      [0,a b c d e spark,1.0,WrappedArray(a, b, c, d, e, spark)]
      [1,b d,0.0,WrappedArray(b, d)]
      [2,spark f g h,1.0,WrappedArray(spark, f, g, h)]
      [3,hadoop mapreduce,0.0,WrappedArray(hadoop, mapreduce)]

3.Transformer2 - HashingTF

HashingTF 负责特征的向量化,负责将每个文档对应的单词转换为数值型的特征向量 Vector。

    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
       
    val info = hashingTF.transform(tokenizer.transform(training))

- setNumFeatures

标识词库维度,例如你的评论词库规定为 10000 维,这使用 HashingTF hash 得到的特征列维度为 10000 维。

- setInpuCol 

设置输入列,这里输入列即为上面 Tokenizer 生成的 WrappedArray()。

- setOutputCol

设置输出列,本例中输出列名为 features,后续与 label 组合可供 Estimator fit 使用。

HashingTF 共返回三维数据 (featurNum, IndexArray, IndexCount),分别为特征数,特征映射后的 HashId 数组以及对应 HashId 的出现次数。

Tips1:

将 hadoop mapreduce 修改为 hadoop mapreduce hadoop spark spark 后,可以看到对应单词 HashId 的次数由 1 变为 2。

Tips2:

numFeatures 特征数,NLP 场景也可以理解为词库大小,默认值为 262144,如果 numFeatures 严重小于真实词库大小,会出现 hash 到同一分桶的情况,影响模型区分度。修改 numFeatures = 2 可以看到虽然有多个 word,但是 hashId 只有2维:

4.Estimator - LR 

Logistic Regression 这里不再赘述,上文我们做了详细的参数介绍,大家可以参考。

    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.001)

    lr.fit(hashingTF.transform(tokenizer.transform(training)))

启动后即可显示 LR 运行日志:

5.Pipeline With ParamMap - Estimator

    lr.fit(hashingTF.transform(tokenizer.transform(training)))

5.1.配置 Pipeline

回看上面的代码,transform + transform + fit 其实就是一个完整的 pipeline 管道流程,下面我们使用 Spark ML API 配置该管道,后续只需调用 pipeline.fit 即可实现与上面代码相同的效果,条理清晰了很多且易于管理。

val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

5.2.配置 ParamMap

随着 Transformer 和 Estimator 数量的增加,每次使用 Builder 分别设置每个 Satage 维护起来非常麻烦,我们希望每个 Pipeline 的多个 Stage 只需一个 properties 维护,这就是 ParamMap 的由来,将 Tokenizer 、HashingTF 与 LR 的参数统一至 ParamMap 中。

    val paramMap = ParamMap(lr.maxIter -> 20, lr.regParam -> 0.01)
      .put(tokenizer.inputCol -> "text", tokenizer.outputCol -> "words")
      .put(hashingTF.numFeatures -> 1000, hashingTF.inputCol -> "words", hashingTF.outputCol -> "features")

5.3 Pipeline.fit

    // 调用fit()函数,训练数据
    val model = pipeline.fit(training, paramMap)

通过上面一通操作,我们的 pipeline 终于构建好了,现在调用 fit 方法即可训练模型。

6.Pipeline With ParamMap - Transformer

6.1 模型存储与加载

fit 得到的模型可以存储并根据响应地址加载

可以将训练好的pipeline输出到磁盘
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

也可以直接将为进行训练的pipeline写到文件
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

然后加载到出来
val pipelineModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

6.2 Model transformer

    // 准备(id, text) 这个格式未打标签的数据进行测试
    val test = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "spark hadoop spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")

    // 在测试集上进行预测
    pipelineModel.transform(test)
      .select("id", "text", "probability", "prediction")
      .collect()
      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
      }

再次调用 pipelineModel transform 方法即可完成预测流程。

7.完整代码

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// $example off$
import org.apache.spark.sql.SparkSession

object PipelineExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder      //创建spark会话
      .master("local")  //设置本地模式
      .appName("PipelineExample")  //设置名称
      .getOrCreate()   //创建会话变量

    // $example on$
    // 准备数据(id, text, label).
    val training = spark.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id", "text", "label")

    // 配置一个包含三个stage的ML pipeline: tokenizer, hashingTF, and lr.
    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.001)
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr))

    // 调用fit()函数,训练数据
    val model = pipeline.fit(training)

    // 可以将训练好的pipeline输出到磁盘
    model.write.overwrite().save("/tmp/spark-logistic-regression-model")

    // 也可以直接将为进行训练的pipeline写到文件
    pipeline.write.overwrite().save("/tmp/unfit-lr-model")

    // 然后加载到出来
    val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

    // 准备(id, text) 这个格式未打标签的数据进行测试
    val test = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "spark hadoop spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")

    // 在测试集上进行预测
    model.transform(test)
      .select("id", "text", "probability", "prediction")
      .collect()
      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
      }
    // $example off$

    spark.stop()
  }
}

五.总结

Pipeline Transformer 与 Pipeline Estimator 的构建大致就这些,本文采用简易数据测试不具代表性,后续将基于豆瓣电影评论实战,介绍如何自定义 Transformer 与 豆瓣影评情感分析实战。可以看到 Pipeline 的好处是将内部流程全部封装,用户对中间流程不感知,只需将数据处理为合适的格式即可直接调用并获得相应结果,而单独的 Transformer 与 Estimator 则更适合一步一步调试或获取中间结果,二者各有利弊,大家可以根据情况选择。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/21939.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringCloud微服务(一)——Consul服务注册中心

Consul服务注册中心 SpringCloud 中文官网:https://www.springcloud.cc/spring-cloud-consul.html Consul是一套开源的分布式服务发现和配置管理系统,Go语言开发。 Consul是一个服务网格(微服务间的 TCP/IP,负责服务之间的网络…

SharedPreferences存储

文章目录 前言 听说SharedPreferences存储技术快过时了,不过如果是单纯的使用的话,不费什么时间成本。 本文的Demo摘录自《第一行代码》。 一.什么是SharedPreferences SharedPreferences,一种通过使用键值对的方式来存储数据的技术。 二…

【深入浅出Spring6】第八期——面向切面编程 AOP

AOP(Aspect Oriented Programming)面向切面编程,属于面向对象编程的一种衍射,是一种编程思想或技术AOP的底层是由动态代理机制实现的 JDK动态代理CGLIB动态代理,自动识别并切换我们也可以通过配置属性指定就是用CGLIB …

【MySQL】六,sql_model的合理设置

宽松模式和严格模式 宽松模式 如果设置的是宽松模式,那么我们在插入数据的时候,即使是给了一个错误的数据,那么可能也不会报错。 举例:某张表的name字段为 char(10) ,插入数据的时候,如果name字段的数据长…

免费搜题系统

免费搜题系统 本平台优点: 多题库查题、独立后台、响应速度快、全网平台可查、功能最全! 1.想要给自己的公众号获得查题接口,只需要两步! 2.题库: 查题校园题库:查题校园题库后台(点击跳转&a…

跨模态神经搜索实践VCED 基于Streamlit实现前端页面设计和逻辑

1. Streamlit入门 1.1 Streamlit介绍 Streamlit是基于Python的Web应用程序框架,它可以使用Python代码轻松构建机器学习/数据科学相关的仪表板,其特点包括: 跨平台:支持Windows、macOS、Linux只需要掌握Python:不需要…

【时序】时间序列数据预处理

目录 1. 时间戳转换 2. 缺失值处理 3. 去噪 1)滚动平均值 2)傅里叶变换 4. 异常点检测 1)基于滚动统计的方法 2)孤立森林 3)K-means 聚类 为了分析预处理结果,我们后续使用 Kaggle 的 Air Passenge…

【Python】发布一个简单好用的日志记录器bestlog

需求 日志是非常重要的一个东西,我们往往习惯于在开发一个新项目的第一行代码时,就用 logging.info 代替 print,随时保持记录的好习惯,等代码上线以后也无需修改替换那些 print,直接开跑,有了完善的日志&a…

牛客刷题——Python入门总结

🤵‍♂️ 个人主页: 北极的三哈 个人主页 👨‍💻 作者简介:Python领域优质创作者。 📒 系列专栏:《Python入门学习》《牛客题库-Python篇》 🌐推荐《牛客网》——找工作神器|笔试题库|面试经…

【软考软件评测师】第三十章 操作系统(PV操作与死锁)

【软考软件评测师】第三十章 操作系统(PV操作与死锁) 第三十章 操作系统(PV操作与死锁)【软考软件评测师】第三十章 操作系统(PV操作与死锁)第一部分 知识点集锦1.PV操作1)P操作的定义2&#xf…

win11的文件属性默认显示全部,Windows11右键菜单修改为Win10模式的方法(手把手详细操作)

win11的文件属性默认显示全部,Windows11右键菜单修改为Win10模式的方法(手把手详细操作) 文章目录win11的文件属性默认显示全部,Windows11右键菜单修改为Win10模式的方法(手把手详细操作)Tips 1 先以管理员…

Source Map知多少?Golang手写SourceMap转换过程

文章目录一、问题背景二、Source Map 简介基本格式应用场景三、Source Map 的工作原理四、Source Map 的转换过程代码示例总结本文从原理的角度入手对 Source Map 进行了较为深入的分析,并从业务需要的角度出发,手动编写根据 Source Map 映射编码前后代码…

SpringBoot集成Mybatis项目实操

本文为《从零打造项目》系列第三篇文章,首发于个人网站。 《从零打造项目》系列文章 比MyBatis Generator更强大的代码生成器 SpringBoot项目基础设施搭建 前言 基于 orm-generate 项目可以实现项目模板代码,集成了三种 ORM 方式:Mybatis、M…

35m预应力简支梁桥毕业设计 课程设计-桥梁工程(计算书、8张CAD图)

35m预应力简支梁桥毕业设计 目 录 1、引言 1 2、桥型方案比选 2 2.1 桥梁设计原则 2 2.2方案一:25m预应力钢筋混凝土T梁桥 2 2.3方案二:25m预应力钢筋混凝土小箱梁 4 2.4桥墩方案比选 4 3、上部结构设计计算 5 3.1 设计资料及构造…

考研数据结构填空题整合

考研数据结构填空题整合 目录考研数据结构填空题整合一、ZYL组ZYL组一ZYL组二ZYL组三ZYL组四ZYL组五ZYL组六ZYL组七ZYL组八二、TJP组TJP组一TJP组二TJP组三三、LZH组LZH 组一LZH 组二LZH 组三LZH 组四LZH 组五LZH 组六LZH 组七四、LB组LB组一LB组二LB组三LB组四LB组五LB组六LB组…

FPGA实现精简版UDP通信,占资源很少但很稳定,提供2套工程源码

目录1.高端、中等和精简版UDP通信的选择2.精简版UDP通信实现方案3.工程1介绍及资源占用率和性能表现4.工程2介绍及资源占用率和性能表现5.上板调试验证6.福利:工程代码的获取1.高端、中等和精简版UDP通信的选择 FPGA实现UDP协议可难可易,具体根据项目需…

Python 函数转命令行界面库 -- Argsense CLI

argsense 是一个 python 命令行界面库, 是 click, fire, typer 之外的又一个选项. argsense 最大的特点是极低的侵入性设计和近乎零成本的上手难度, 如果你熟悉 python 函数是如何传参的 (这是大部分 python 初学者已经掌握的知识), 那么你就可以很快上手 argsense. 特性一览 …

大数据(9e)图解Flink窗口

文章目录1、代码模板1.1、pom.xml1.2、log4j.properties1.3、Java模板2、按键分区(Keyed)、非按键分区(Non-Keyed)2.1、Keyed2.2、Non-Keyed3、窗口的分类3.1、基于时间的窗口3.2、基于事件个数的窗口4、窗口函数5、示例代码5.1、…

TIA博途_水处理项目中开启累计运行时间最短的泵_程序示例

TIA博途_水处理项目中开启累计运行时间最短的泵_程序示例 需求: 有N台水泵,每个水泵统计累计运行时间。当满足条件时,根据设定开泵的数量,启动累计运行时间最短的对应数量的泵。故障切换时,也切换到运行时间最短的泵。 具体方法可参考以下内容: 如下图所示,打开TIA博途后…

【毕业设计】62-基于单片机的防酒驾\酒精浓度检测系统设计研究(原理图、源代码、仿真工程、低重复率参考设计、PPT)

【毕业设计】62-基于单片机的防酒驾\酒精浓度检测系统设计研究(原理图、源代码、仿真工程、低重复率参考设计、PPT)[toc] 资料下载链接 资料下载链接 资料链接:https://www.cirmall.com/circuit/33758/ 包含此题目毕业设计全套资料&#xf…