Spark 3.0 - 6.ML 自定义 Transformer 踩坑大全

news2024/11/15 19:25:46

目录

一.新征程开始 - 道阻且长

二.从源码入手 - 一探究竟

1.Tokenizer

2.UnaryTransformer

三.取源码精髓 - 照猫画虎

四.从实际出发 - 小试牛刀

五.扫重重障碍 - 补阙挂漏

1.Task not serializable

2.jsonEncode only supports string

3.Invisibility Error

4.Non-Writable stage

5.org.example.JiebaTokenizer.read()

六.慢慢人生路 - 学成归来


一.新征程开始 - 道阻且长

上文介绍豆瓣评论情感分析时,使用自定义 Transformer 实现了 JiebaTokenizer 完成了对 DataFrame 数据进行分词的需求,本文将基于 JiebaTokenizer 的实现细节详解如何自定义实现 Transformer 以及实战中可能遇到的坑。

二.从源码入手 - 一探究竟

自定义 Transformer 前,我们先看看 Spark ML 现有的 Transformer 是怎么实现的,正所谓俗话知己知彼百战不殆。

1.Tokenizer

Tokenizer 来自 org.apache.spark.ml.feature 官方库,负责根据空格将文本数据的分词:

可以看到 Tokenizer 继承 UnaryTransformer 实现了 Transform 方法,所以我们下面还得结合 UnaryTransformer 回看 Tokenizer 的方法。

2.UnaryTransformer

A.参数形式

这里和 Flink 的 ProcessFunction 很像,IN、OUT 代表输入输出类型, S <: T 代表 S必须是T的子类或者同类,这里实现时 extend UnaryTransformer,所以肯定满足第三个条件,我们只需定义输入输出类型即可:

Tokenizer 的开头我们便理解了,该函数接收 String 字符串,返回 Seq[String] 的分词。

B.SetInputCol、SetOutputCol

本质上,输入与输出的列名也是 Param 的一部分,因为 set 方法负责当前 Transformer 的所有参数配置,以上面 extends 的 HasInputCol 为例:

Transformer 内自定义参数需要使用 Param 类,定义参数所在类、名称与描述,但是该参数无法直接使用,其类型为 Param[T],如果想要在 transform 方法中使用,需要调用 $(Param[T]) 即可获取对应参数 T。以上文 JiebaTokenizer 为例,如果用户每次想要自定义 StopWords 并通过参数传入,只需在自定义函数内添加如下几行:

  // 定义参数
  final val stopwords = new Param[Set[String]](this, "StopWords", "停用词")

  // 参数设置
  def setStopWords(stopWords: Set[String]): this.type = set(stopwords, stopWords)

  // 获取参数
  def getStopWords: Set[String] = $(stopwords)

添加后即可像官方 API 一样通过有回调的 Set 方法依次配置 Transformer 参数:

C.createTransformFunc

将输入转换为输出,说白了就是一个基本的 map 函数,就像 Hive UDF 一样,一进一出。该方法为整个代码类的核心所在,决定了 transform 如何转化数据。

Tokenizer 的实现即将输入字符 _ 先转小写再分割,我们自定义 \t 分词可以采用如下方式,更加清晰:

  override protected def createTransformFunc: String => Array[String] = {
    parseContent
  }

  /**
    * \t 分词
    */
  private def parseContent(text: String): Array[String] = {
    text.split("\t")
  }

D.InputType 、OutputType

作用分别为验证输入类型与返回输出类型,前者主要用于输入数据的判断,如果数据类型不对则抛出异常警告⚠️开发者,后者返回输出类型主要用户后续 DataFrame Schem 类型判断。

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == StringType,
      s"Input type must be ${StringType.catalogString} type but got ${inputType.catalogString}.")
  }

  override protected def outputDataType: DataType = new ArrayType(StringType, true)

Tokenizer 中,输入类型为 String,所以需要见检查是否为 StringType,否则抛出异常,返回值类型为 Set[String],这里规范为统一数组类型 ArrayType(StringType)。

E.transformSchema

该函数官方 API 已帮我们实现,正常情况下无需复写,可以看到其中的逻辑,首先检查输入列对应 Schema,随后判断输出列是否存在,随后在原有 Schema 的基础上增加 output 列,这里通过 StructField 实现,指定该列的列名、类型与 nullable,如果你对输出类型的 Schema 有其他需求,也可以重写该函数,或者待返回后再修改 Schema。

F.transform

  override def transform(dataset: Dataset[_]): DataFrame = {
    val outputSchema = transformSchema(dataset.schema, logging = true)
    val transformUDF = udf(this.createTransformFunc)
    dataset.withColumn($(outputCol), transformUDF(dataset($(inputCol))),
      outputSchema($(outputCol)).metadata)
  }

transform 方法逻辑,官方已实现,主要步骤为修改原始 Schema 【根据 OutputType 在 Schema 末尾新增一列 StructField】,UDF 转换并通过 DF 的 withColumn 方法将 inputCol 的内容转化至 OutputCol。 

三.取源码精髓 - 照猫画虎

通过上面的源码分析,我们知道自定义 Transformer 拢共分三步:

第一步.定义输入输出类型

第二步.添加自定义参数,没有则忽略

第三步.添加自定义 UDF

下面基于上述步骤实现自定义 jieba 分词 Transformer:

import com.huaban.analysis.jieba.JiebaSegmenter
import org.apache.spark.annotation.Since
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.param.Param
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, MLReader, MLWriter}
import org.apache.spark.sql.types._

class JiebaTokenizer(override val uid: String)
  extends UnaryTransformer[String, Array[String], JiebaTokenizer] with DefaultParamsWritable with java.io.Serializable {

  lazy val jieba = new JiebaSegmenter()

  def this() = this(Identifiable.randomUID("JiebaTokenizer"))

  val inputPath = "./stopwords.txt"

  val stopWords = scala.io.Source.fromFile(inputPath)
    .getLines().toSet

  override protected def outputDataType: DataType = new ArrayType(StringType, true)

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == DataTypes.StringType,
      s"Input type must be string type but got $inputType."
    )
  }

  override protected def createTransformFunc: String => Array[String] = {
    parseContent
  }

  /**
    * Jieba 分词
    */
  private def parseContent(text: String): Array[String] = {
    if (text == null || text.isEmpty) {
      return Array.empty[String]
    }
    jieba.sentenceProcess(text).toArray().map(_.toString).filter(str => !stopWords.contains(str))
  }

}

object JiebaTokenizer extends DefaultParamsReadable[JiebaTokenizer] {

  override def load(path: String): JiebaTokenizer = {
    super.load(path)
  }

}

A.StopWords

分词一般需要初始化 StopWords,这里由于是本机,所以直接通过 io.source 读取,如果打包上集群,可以将 stopwords 放置在 resources 文件中并通过 Stream 读取。

B.JiebaSegmenter

初始化 JiebaSegmenter,后续 createTransformFunc 将基于该类进行语义分割,实现分词。

这里主要分析 UDF 函数,关于输入输出类型的函数,照搬源码并修改类型即可,非常方便。👍

四.从实际出发 - 小试牛刀

下面我们在实战中使用 JiebaTokenier Transform 看一下效果:

    val jiebaTokenizer = new JiebaTokenizer()
      .setInputCol("comment")
      .setOutputCol("output")
    jiebaTokenizer.transform(trainData)

    jiebaTokenizer.transform(trainData).select("movie", "comment", "output").show(10)

comment 为豆瓣影评,通过分词+停用词过滤得到输出分词数组。

五.扫重重障碍 - 补阙挂漏

上面的代码是博主经过一晚上测试得到的最终版本,下面罗列下使用期间可能遇到的坑。

1.Task not serializable

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

对于常规 UDF 逻辑,例如 string.split($sep) 这些不会出错,该错误原因是因为 Transformer 中引入的第三方类,例如 JiebaSegmenter,如果第三方类位继承序列化则会报错。

 解决:

此时在自定义 Transformer 类后 extends java.io.Serializable 是没有用的,需要使用 lazy 关键字修饰对应类,使其延迟初始化,从而可以在 Spark 序列化时通过运行:

  lazy val jieba = new JiebaSegmenter()

2.jsonEncode only supports string

ERROR Instrumentation: java.lang.UnsupportedOperationException: The default jsonEncode only supports string, vector and matrix. org.apache.spark.ml.param.Param must override jsonEncode for scala.collection.immutable.HashSet$HashTrieSet.

为了使用官方的 Api 设置参数,我们继承 Param 实现了 StopWords: Set[String] 的 set 方法:

  final val stopwords = new Param[Set[String]](this, "StopWords", "停用词")
//  /** @group setParam */
  def setStopWords(stopWords: Set[String]): this.type = set(stopwords, stopWords)
  def getStopWords: Set[String] = $(stopwords)

通过官方 API 配置:

    val jiebaTokenizer = new JiebaTokenizer()
      .setInputCol("comment")
      .setOutputCol("output")
      .setStopWords(stopWords)

这样的写法虽然比较优雅,但是 PipelineModel 或者 Model 在存储时,JsonENcode 要求 Param 类型必须为 String、vector 或者 matrix,这里我们的停用词为 Hashset 形式,故在保存模型时无法存储参数,导致 model.write.save 失败,所以这里建议使用 Param 参数传入常规的 String、Int 即可。

 解决:

  val inputPath = "./stopwords.txt"

  val stopWords = scala.io.Source.fromFile(inputPath)
    .getLines().toSet

直接读取本地文件 [Local模式] 或者读取 resource 文件 [集群模式] 在类内初始化,参数尽量传 String、Int,最简单但也是最有效的办法。

3.Invisibility Error

博主还尝试了另外一种参数设置方法:

  var stopWords = Set.empty[String]

  def setStopWords(stopWords: Set[String]): this.type = {
    this.stopWords = stopWords
    this
  }

添加自定义回调函数,并覆盖 stopWords,如此修改后,模型保存无误。 但是存在一个可见性的问题,在模型 Pipeline-Transform 阶段,由于我们手动 setStopWords 所以该 stopwords 可以生效起到过滤作用,但是在 Pipeline-Estimator 阶段,由于 stopwords 不是内置 Param,所以 model.write.save 时无法保存该 stopwods,待预测时其不会自动调用 setStopWords 方法,所以预测阶段其实停用词一直为空 set,起不到过滤作用!!!大家可以打印验证,这个问题会导致预测期间出现数据干扰项影响模型结果。

    val jiebaTokenizer = new JiebaTokenizer()
      .setInputCol("comment")
      .setOutputCol("output")
      .setStopWords(stopWords)

解决:

只在 Transform 阶段使用不保存模型的场景可以使用,或者使用上面的解决方案,在类内加载。

4.Non-Writable stage

Exception in thread "main" java.lang.UnsupportedOperationException: Pipeline write will fail on this Pipeline because it contains a stage which does not implement Writable. Non-Writable stage: JiebaTokenizer_216430a184a7 of type class org.example.JiebaTokenizer

模型参数存储异常,自定义 Transformer 类如果不支持 Writable 则该 Transform 只能应用于 Transform 阶段,如果在 Pipeline 中添加未支持 Writable 的自定义类,则 Pipeline 无法保存相关类参数,从而无法 save。

 解决:

继承 org.apache.spark.ml.util.DefaultParamsWritable 类即可。

5.org.example.JiebaTokenizer.read()

ERROR Instrumentation: java.lang.NoSuchMethodException: org.example.JiebaTokenizer.read()

模型保存正常,但是 PipelineModel.load 时出错,这是因为没有在自定义 Class 内初始化对应 object 并添加 read 方法,导致 Pipeline 模型读取时无法读取该类。

解决:

自定义类内增加 Object 并 override load 方法,方法内只需 super.load 即可。这样模型的存储和加载就都没有问题了。

object JiebaTokenizer extends DefaultParamsReadable[JiebaTokenizer] {

  override def load(path: String): JiebaTokenizer = {
    super.load(path)
  }

}

六.慢慢人生路 - 学成归来

自定义 Transformer 大致就这么多内容,大家需要注意:

- 自定义函数内是否有不可序列化的类

- 是否需要添加自定义参数并随模型保存

- Transfomrer 用于 Pipeline 哪个阶段

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/30770.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

补充(三)完善保密性三种定义的通俗表述及等价性的证明

目录 DEFINITION 2.3 完美保密加密方案的定义 LEMMA 2.5 完美保密方案的等价定义(一) LEMMA 2.7 完美保密方案的等价定义(二) 三个等价定义的通俗描述 等价性的证明&#xff08;手写过程&#xff09; DEFINITION 2.3 完美保密加密方案的定义 一个在明文空间M上的加密方案…

web前端期末大作业 html+css家乡旅游主题网页设计 湖北武汉家乡介绍网页设计实例

家乡旅游景点网页作业制作 网页代码运用了DIV盒子的使用方法&#xff0c;如盒子的嵌套、浮动、margin、border、background等属性的使用&#xff0c;外部大盒子设定居中&#xff0c;内部左中右布局&#xff0c;下方横向浮动排列&#xff0c;大学学习的前端知识点和布局方式都有…

外卖项目05---套餐管理业务开发

套餐&#xff1a;一组菜品的集合 点击之后就会弹出下面的界面&#xff1a; 上面是后台的管理操作&#xff0c;下面是处理完成后在用户端展示的界面效果&#xff1a; 目录 一、新增套餐 70 1.1需求分析 70 1.2数据模型 70 ​编辑1.3新增套餐---代码开发---准备工作&梳理…

vtk.js滚动切片

关于滚动切片&#xff0c;官方有一个很好的例子,如下图&#xff1a; 关键代码 const mapper vtkImageMapper.newInstance(); mapper.setInputConnection(rtSource.getOutputPort()); mapper.setSliceAtFocalPoint(true);//这一行切片的光照焦点必须要设置为true&#xff0c;否…

CSDN周赛第十期总结

文章目录竞赛概述题解熊孩子摆拜访问题目描述数据范围样例输入样例输出代码走楼梯题目描述输入描述输出描述样例输入 1样例输出 1代码括号上色输入描述&#xff1a;输出描述&#xff1a;输入样例&#xff1a;输出样例&#xff1a;代码喜水青蛙题目描述输入描述&#xff1a;输出…

spark3.0.2搭建教程

spark3.0.2搭建教程 spark3.0.2安装教程 文章目录spark3.0.2安装教程一、前期准备二、spark搭建&#xff08;一&#xff09;搭建1、将spark上传到虚拟机上2、解压安装包&#xff08;二&#xff09;、standalone&#xff08;独立部署&#xff09;模型1、修改配置文件&#xff0…

学生HTML静态网页基础水平制作DIV+CSS+JavaScript技术制作美食网页——美食城6页面

&#x1f468;‍&#x1f393;静态网站的编写主要是用HTML DIVCSS JS等来完成页面的排版设计&#x1f469;‍&#x1f393;,常用的网页设计软件有Dreamweaver、EditPlus、HBuilderX、VScode 、Webstorm、Animate等等&#xff0c;用的最多的还是DW&#xff0c;当然不同软件写出的…

高等数学(第七版)同济大学 习题10-2(前10题) 个人解答

高等数学&#xff08;第七版&#xff09;同济大学 习题10-2&#xff08;前10题&#xff09; 函数作图软件&#xff1a;Mathematica 1.计算下列二重积分:\begin{aligned}&1. \ 计算下列二重积分:&\end{aligned}​1. 计算下列二重积分:​​ (1)∬D(x2y2)dσ&#xff0c;…

【构建ML驱动的应用程序】第 7 章 :使用分类器编写推荐

&#x1f50e;大家好&#xff0c;我是Sonhhxg_柒&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流&#x1f50e; &#x1f4dd;个人主页&#xff0d;Sonhhxg_柒的博客_CSDN博客 &#x1f4c3; &#x1f381;欢迎各位→点赞…

动态内存管理❀C

目录❀动态内存管理的意义❀动态内存管理函数malloc - 申请空间free - 释放空间calloc - 申请空间realloc - 调整空间大小❀常见的动态内存错误对NULL指针的解引用操作 - err对动态开辟空间的越界访问 - err对非动态开辟内存使用free释放 - err使用free释放一块动态开辟内存的一…

Pycharm安装配置Pyside6

PySide6是在Python环境下的一套Qt6 API库。使用PySide6可以轻松创建基于Qt6的GUI程序&#xff1b;PySide6由Qt官方维护。 1. Pyside6的安装&#xff1a; 直接安装在原python上面&#xff0c;在cmd里运行&#xff1a;(网速慢使用阿里源源) pip3 install Pyside6 -i https://p…

网络安全——逻辑漏洞之越权漏洞

作者名&#xff1a;Demo不是emo 主页面链接&#xff1a;主页传送门创作初心&#xff1a;舞台再大&#xff0c;你不上台&#xff0c;永远是观众&#xff0c;没人会关心你努不努力&#xff0c;摔的痛不痛&#xff0c;他们只会看你最后站在什么位置&#xff0c;然后羡慕或鄙夷座右…

高通导航器软件开发包使用指南(8)

高通导航器软件开发包使用指南&#xff08;8&#xff09;7 电子速度控制器7.1 ESC固件更新7.1.1相关参数说明7.1.3在初始化期间启用更新7.1.4固件配置7.1.5固件从版本7.1.6更新程序7 电子速度控制器 7.1 ESC固件更新 高通公司Navigator支持ESC固件更新&#xff0c;无需连接或…

2022亚太C题详细思路

2022年亚太今日已经正式开赛&#xff0c;为了帮助大家更好的选题建模&#xff0c;这里首先对ABC三道题目进行浅要评析&#xff0c;以方便大家更好的择题。同时相关资料也会后续进行补充。预计明日公布各题统计选题人数以及较为完善的资料。今天作为第一天重要的是择好题&#x…

Tableau阈值设置及其使用

阈值又叫临界值&#xff0c;是指一个效应能够产生的最低值或最高值。 ——百度百科 文章目录前言一、案例中阈值的使用背景介绍二、设置阈值参数三、颜色区分四、可筛选设置总结前言 介绍Tableau阈值的设置&#xff0c;供各位小伙伴参考。本文案例来源于Tableau自带示例工作薄…

mysql 数据备份与恢复使用详解

一、前言 对一个运行中的线上系统来说&#xff0c;定期对数据库进行备份是非常重要的&#xff0c;备份不仅可以确保数据的局部完整性&#xff0c;一定程度上也为数据安全性提供了保障&#xff0c;设想如果某种极端的场景下&#xff0c;比如磁盘损坏导致某个时间段数据丢失&…

冒泡排序法

目录 一、问题 二、冒泡排序的思想 三、举例 四、算法分析 五、代码实现 一、问题 现有一个整型数组&#xff08;乱序&#xff09;&#xff0c;并且写一个函数&#xff08;Sort&#xff09;对数组进行排序&#xff0c;顺序要求升序。 二、冒泡排序的思想 两两相邻的元素…

【100个 Unity实用技能】 | Unity自定义脚本的初始模版

Unity 小科普 老规矩&#xff0c;先介绍一下 Unity 的科普小知识&#xff1a; Unity是 实时3D互动内容创作和运营平台 。包括游戏开发、美术、建筑、汽车设计、影视在内的所有创作者&#xff0c;借助 Unity 将创意变成现实。Unity 平台提供一整套完善的软件解决方案&#xff…

java每日一练(2)

java每日一练(2) 单选部分 1.A 派生出子类 B &#xff0c; B 派生出子类 C &#xff0c;并且在 java 源代码有如下声明&#xff1a; A a0new A();A a1new B();A a2new C(); 问以下哪个说法是正确的&#xff08;&#xff09; A 只有第一行能通过编译 B 第1、2行能通过编译&…