【Apache Spark 】第 11 章使用 Apache Spark 管理、部署和扩展机器学习管道

news2025/1/13 6:03:42

 🔎大家好,我是Sonhhxg_柒,希望你看完之后,能对你有所帮助,不足请指正!共同学习交流🔎

📝个人主页-Sonhhxg_柒的博客_CSDN博客 📃

🎁欢迎各位→点赞👍 + 收藏⭐️ + 留言📝​

📣系列专栏 - 机器学习【ML】 自然语言处理【NLP】  深度学习【DL】

 🖍foreword

✔说明⇢本人讲解主要包括Python、机器学习(ML)、深度学习(DL)、自然语言处理(NLP)等内容。

如果你对这个系列感兴趣的话,可以关注订阅哟👋

文章目录

模型管理

MLflow

Tracking(追踪)

使用 MLlib 的模型部署选项

Batch

Streaming

用于实时推理的模型导出模式

将 Spark 用于非 MLlib 模型

Pandas UDF

Spark 用于分布式超参数调优

Joblib

Hyperopt

概括


在上一章中,我们介绍了如何使用 MLlib 构建机器学习管道。本章将重点介绍如何管理和部署您训练的模型。在本章结束时,您将能够使用 MLflow 来跟踪、重现和部署您的 MLlib 模型,讨论各种模型部署场景的困难和权衡,并构建可扩展的机器学习解决方案。但在我们讨论部署模型之前,让我们先讨论一些模型管理的最佳实践,以使您的模型为部署做好准备。

模型管理

在部署机器学习模型之前,您应该确保可以重现和跟踪模型的性能。对我们来说,机器学习解决方案的端到端可重现性意味着我们需要能够重现生成模型的代码、训练中使用的环境、训练数据以及模型本身。每个数据科学家都喜欢提醒您设置种子,以便您可以重现您的实验(例如,对于训练/测试拆分,当使用具有固有随机性的模型(如随机森林)时)。然而,除了设置种子之外,还有更多方面有助于重现性,其中一些方面要微妙得多。这里有一些例子:

库版本控制

当数据科学家把他们的代码交给你时,他们可能会也可能不会提到依赖库。虽然您可以通过查看错误消息来确定需要哪些库,但您无法确定它们使用了哪些库版本,因此您可能会安装最新版本。但是,如果他们的代码是基于以前版本的库构建的,这可能会利用与您安装的版本不同的某些默认行为,那么使用最新版本可能会导致代码中断或结果不同(例如,考虑XGBoost如何改变它在 v0.90 中处理缺失值的方式)。

数据演变

假设您在 2020 年 6 月 1 日构建了一个模型,并跟踪了所有超参数、库等。然后您尝试在 2020 年 7 月 1 日重现相同的模型,但由于基础数据已中断,因此管道中断或结果不同如果有人在初始构建后添加了额外的列或增加了一个数量级的数据,则可能会发生这种情况。

执行顺序

如果数据科学家将他们的代码交给你,你应该能够自上而下地运行它而不会出错。然而,数据科学家因无序运行或多次运行同一个有状态单元而臭名昭著,这使得他们的结果很难重现。(他们还可能签入具有与用于训练最终模型的超参数不同的超参数的代码副本!)

并行操作

为了最大化吞吐量,GPU 将并行运行许多操作。但是,执行顺序并不总是得到保证,这可能导致不确定的输出。tf.reduce_sum()这是在聚合浮点数(精度有限)等函数时的一个已知问题:添加它们的顺序可能会产生稍微不同的结果,这可能会在多次迭代中加剧。

无法重现您的实验通常会阻碍业务部门采用您的模型或将其投入生产。虽然您可以构建自己的内部工具来跟踪您的模型、数据、依赖版本等,但它们可能会变得过时、脆弱,并且需要大量的开发工作来维护。同样重要的是拥有用于管理模型的行业标准,以便可以轻松地与合作伙伴共享它们。有开源和专有工具可以帮助我们通过抽象出许多这些常见的困难来重现我们的机器学习实验。本节将重点介绍 MLflow,因为它与当前可用的开源模型管理工具中的 MLlib 具有最紧密的集成。

MLflow

MLflow是一个开源平台,可帮助开发人员复制和共享实验、管理模型等等。它提供 Python、R 和 Java/Scala 接口,以及 REST API. 如图 11-1所示,MLflow 有四个主要组件:

Tracking

提供 API 来记录参数、指标、代码版本、模型和工件,例如绘图和文本。

Projects

一种标准化格式,用于打包您的数据科学项目及其依赖项以在其他平台上运行。它可以帮助您管理模型训练过程。

Models

一种标准化格式,用于打包模型以部署到不同的执行环境。它为加载和应用模型提供了一致的 API,无论用于构建模型的算法或库如何。

Registry

用于跟踪模型沿袭、模型版本、阶段转换和注释的存储库。

图 11-1。MLflow 组件

让我们跟踪我们在第 10 章中运行的 MLlib 模型实验的可重复性。然后,当我们讨论模型部署时,我们将看到 MLflow 的其他组件如何发挥作用。要开始使用 MLflow,只需pip install mlflow在本地主机上运行。

Tracking(追踪)

MLflow Tracking 是一个日志 API,它与实际进行训练的库和环境无关。它是围绕运行的概念组织的,运行是数据科学代码的执行。运行被聚合到实验中,因此许多运行可以成为给定实验的一部分。

MLflow 跟踪服务器可以托管许多实验。您可以使用笔记本、本地应用程序或云作业登录到跟踪服务器,如图 11-2所示。

图 11-2。MLflow 跟踪服务器

让我们检查一些可以记录到跟踪服务器的内容:

Parameters

代码的键/值输入——例如,像随机森林num_treesmax_depth在随机森林中的超参数

Metrics

数值(可以随时间更新)——例如,RMSE 或精度值

Artifacts

文件、数据和模型——例如matplotlib图像或 Parquet 文件

Metadata

有关运行的信息,例如执行运行的源代码或代码的版本(例如,代码版本的 Git 提交哈希字符串)

Models

您训练的模型

默认情况下,跟踪服务器将所有内容记录到文件系统中,但您可以指定一个数据库以加快查询速度,例如参数和指标。让我们将 MLflow 跟踪添加到第 10 章中的随机森林代码中:

# In Python
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

filePath = """/databricks-datasets/learning-spark-v2/sf-airbnb/
sf-airbnb-clean.parquet"""
airbnbDF = spark.read.parquet(filePath)
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed=42)

categoricalCols = [field for (field, dataType) in trainDF.dtypes 
                   if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
stringIndexer = StringIndexer(inputCols=categoricalCols, 
                              outputCols=indexOutputCols, 
                              handleInvalid="skip")

numericCols = [field for (field, dataType) in trainDF.dtypes 
               if ((dataType == "double") & (field != "price"))]
assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, 
                               outputCol="features")

rf = RandomForestRegressor(labelCol="price", maxBins=40, maxDepth=5, 
                           numTrees=100, seed=42)

pipeline = Pipeline(stages=[stringIndexer, vecAssembler, rf])

要开始使用 MLflow 进行日志记录,您需要使用mlflow.start_run()mlflow.end_run()本章中的示例将使用with子句在块的末尾自动结束运行,而不是显式调用with

# In Python 
import mlflow
import mlflow.spark
import pandas as pd

with mlflow.start_run(run_name="random-forest") as run:
  # Log params: num_trees and max_depth
  mlflow.log_param("num_trees", rf.getNumTrees())
  mlflow.log_param("max_depth", rf.getMaxDepth())
 
  # Log model
  pipelineModel = pipeline.fit(trainDF)
  mlflow.spark.log_model(pipelineModel, "model")

  # Log metrics: RMSE and R2
  predDF = pipelineModel.transform(testDF)
  regressionEvaluator = RegressionEvaluator(predictionCol="prediction", 
                                            labelCol="price")
  rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF)
  r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
  mlflow.log_metrics({"rmse": rmse, "r2": r2})

  # Log artifact: feature importance scores
  rfModel = pipelineModel.stages[-1]
  pandasDF = (pd.DataFrame(list(zip(vecAssembler.getInputCols(), 
                                    rfModel.featureImportances)), 
                           columns=["feature", "importance"])
              .sort_values(by="importance", ascending=False))

  # First write to local filesystem, then tell MLflow where to find that file
  pandasDF.to_csv("feature-importance.csv", index=False)
  mlflow.log_artifact("feature-importance.csv")

让我们检查一下 MLflow UI,您可以通过mlflow ui在终端中运行并导航到http://localhost:5000/来访问它。图 11-3显示了 UI 的屏幕截图。

图 11-3。MLflow 用户界面

UI 存储给定实验的所有运行。您可以搜索所有运行,筛选符合特定条件的运行,并排比较运行等。如果您愿意,还可以将内容导出为 CSV 文件以在本地进行分析。在名为"random-forest". 您应该会看到如图 11-4 所示的屏幕。

图 11-4。随机森林运行

您会注意到它会跟踪用于此 MLflow 运行的源代码,并存储所有相应的参数、指标等。您可以在自由文本中添加有关此运行的注释以及标签。运行完成后,您无法修改参数或指标。

您还可以使用MlflowClientREST API 或 REST API 查询跟踪服务器:

# In Python
from mlflow.tracking import MlflowClient

client = MlflowClient()
runs = client.search_runs(run.info.experiment_id, 
                          order_by=["attributes.start_time desc"], 
                          max_results=1)

run_id = runs[0].info.run_id
runs[0].data.metrics

这会产生以下输出:

{'r2':0.22794251914574226,'rmse':211.5096898777315}

我们将此代码作为MLflow 项目托管在本书的GitHubmax_depth存储库中,因此您可以尝试使用和的不同超参数值运行它num_trees。MLflow 项目中的 YAML 文件指定了库依赖项,因此此代码可以在其他环境中运行:

# In Python
mlflow.run(
  "https://github.com/databricks/LearningSparkV2/#mlflow-project-example", 
  parameters={"max_depth": 5, "num_trees": 100})

# Or on the command line
mlflow run https://github.com/databricks/LearningSparkV2/#mlflow-project-example
-P max_depth=5 -P num_trees=100

现在您已经跟踪并重现了您的实验,让我们讨论一下可用于您的 MLlib 模型的各种部署选项.

使用 MLlib 的模型部署选项

部署机器学习模型对于每个组织和用例来说都意味着不同的东西。业务限制将对延迟、吞吐量、成本等提出不同的要求,这决定了哪种模型部署模式适合手头的任务——批处理、流式传输、实时或移动/嵌入式。在移动/嵌入式系统上部署模型超出了本书的范围,因此我们将主要关注其他选项。表 11-1显示了用于生成预测的这三个部署选项的吞吐量和延迟权衡。我们关心并发请求的数量和这些请求的大小,最终的解决方案看起来会大不相同。

表 11-1。批处理、流式传输和实时比较
ThroughputLatency示例应用程序
Batch高的高(几小时到几天)客户流失预测
Streaming中等的中(秒到分钟)动态定价
Real-time低的低(毫秒)在线广告竞价

批处理会定期生成预测并将结果写入持久存储以供其他地方使用。它通常是最便宜和最简单的部署选项,因为您只需要在计划运行期间支付计算费用。每个数据点的批处理效率要高得多,因为在所有预测中摊销时累积的开销更少。Spark 的情况尤其如此,因为驱动程序和执行程序之间来回通信的开销——你不会希望一次预测一个数据点!然而,它的主要缺点是延迟,因为它通常安排在几个小时或几天的时间内来生成下一批预测。

流式传输在吞吐量和延迟之间提供了很好的权衡。您将不断地对微批量数据进行预测,并在几秒钟到几分钟内得到您的预测。如果您使用的是结构化流式处理,几乎所有代码看起来都与批处理用例相同,因此可以轻松地在这两个选项之间来回切换。使用流式传输时,您必须为用于持续保持正常运行和运行的虚拟机或计算资源付费,并确保您已正确配置流以容错,并在传入数据出现峰值时提供缓冲。

实时部署优先考虑延迟而不是吞吐量,并在几毫秒内生成预测。您的基础架构将需要支持负载平衡,并且能够在需求激增的情况下扩展到许多并发请求(例如,对于假期前后的在线零售商)。有时当人们说“实时部署”时,他们的意思是实时提取预先计算的预测,但这里我们指的是生成实时模型预测。实时部署是 Spark 无法满足延迟要求的唯一选项,因此要使用它,您需要将模型导出到 Spark 之外。例如,如果您打算使用 REST 端点进行实时模型推理(例如,在 50 毫秒内计算预测),则 MLlib 不满足此应用程序所需的延迟要求,如图 11-5所示。您将需要从 Spark 中进行功能准备和建模,这可能既费时又困难。

图 11-5。MLlib 的部署选项

在开始建模过程之前,您需要定义模型部署要求。MLlib 和 Spark 只是您工具箱中的几个工具,您需要了解应该在何时何地应用它们。本节的其余部分将更深入地讨论 MLlib 的部署选项,然后我们将考虑 Spark 用于非 MLlib 模型的部署选项。

Batch

批量部署代表了部署机器学习模型的大多数用例,这可以说是最容易实现的选项。您将运行常规作业来生成预测,并将结果保存到表、数据库、数据湖等以供下游使用。事实上,你已经在第 10 章看到了如何使用 MLlib 生成批量预测。MLlibmodel.transform()会将模型并行应用于 DataFrame 的所有分区:

# In Python
# Load saved model with MLflow
import mlflow.spark
pipelineModel = mlflow.spark.load_model(f"runs:/{run_id}/model")

# Generate predictions
inputDF = spark.read.parquet("/databricks-datasets/learning-spark-v2/
  sf-airbnb/sf-airbnb-clean.parquet")

predDF = pipelineModel.transform(inputDF)

批量部署时要记住的几件事是:

您将多久生成一次预测?

延迟和吞吐量之间存在权衡。您将获得更高的吞吐量,将许多预测批处理在一起,但是接收任何单个预测所需的时间会更长,从而延迟您对这些预测采取行动的能力。

您多久重新训练一次模型?

与 sklearn 或 TensorFlow 等库不同,MLlib 不支持在线更新或热启动。如果您想重新训练模型以包含最新数据,则必须从头开始重新训练整个模型,而不是利用现有参数。在重新训练的频率方面,有些人会设置一个固定的工作来重新训练模型(例如,每月一次),而另一些人会主动监控模型漂移以确定何时需要重新训练。

您将如何对模型进行版本化?

您可以使用MLflow 模型注册表来跟踪您正在使用的模型,并控制它们如何转换到/从暂存、生产和存档。您可以在图 11-6中看到模型注册表的屏幕截图。您也可以将模型注册表与其他部署选项一起使用。

图 11-6。MLflow 模型注册表

除了使用 MLflow UI 来管理您的模型之外,您还可以通过编程方式管理它们。例如,一旦您注册了生产模型,它就有一个一致的 URI,您可以使用它来检索最新版本:

# Retrieve latest production model
model_production_uri = f"models:/{model_name}/production"
model_production = mlflow.spark.load_model(model_production_uri)

Streaming

结构化流式处理无需等待每小时或每晚的工作来处理数据并生成预测,而是可以持续对传入数据执行推理。虽然这种方法比批处理解决方案成本更高,因为您必须不断地为计算时间付费(并获得较低的吞吐量),但您可以获得更频繁地生成预测的额外好处,以便您可以更快地对它们采取行动。流解决方案通常比批处理解决方案更难维护和监控,但它们提供更低的延迟。

使用 Spark,将批量预测转换为流预测非常容易,而且几乎所有代码都是相同的。唯一不同的是,当你读入数据时,你需要使用spark.readStream()而不是spark.read()改变数据的来源。在下面的示例中,我们将通过在 Parquet 文件目录中流式传输来模拟读取流式数据。你会注意到我们正在指定一个schema即使我们正在使用 Parquet 文件。这是因为我们在处理流数据时需要先验地定义模式。在这个例子中,我们将使用在前一章的 Airbnb 数据集上训练的随机森林模型来执行这些流预测。我们将使用 MLflow 加载保存的模型。我们已将源文件划分为 100 个小型 Parquet 文件,因此您可以看到输出在每个触发时间间隔发生变化:

# In Python
# Load saved model with MLflow
pipelineModel = mlflow.spark.load_model(f"runs:/{run_id}/model")

# Set up simulated streaming data
repartitionedPath = "/databricks-datasets/learning-spark-v2/sf-airbnb/
  sf-airbnb-clean-100p.parquet"
schema = spark.read.parquet(repartitionedPath).schema

streamingData = (spark
                 .readStream
                 .schema(schema) # Can set the schema this way
                 .option("maxFilesPerTrigger", 1)
                 .parquet(repartitionedPath))

# Generate predictions
streamPred = pipelineModel.transform(streamingData)

生成这些预测后,您可以将它们写出到任何目标位置以供以后检索(有关结构化流式处理技巧,请参阅第 8 章)。如您所见,批处理和流处理场景之间的代码几乎没有变化,这使得 MLlib 成为两者的绝佳解决方案。但是,根据您的任务的延迟需求,MLlib 可能不是最佳选择。使用 Spark,在生成查询计划以及在驱动程序和工作程序之间传达任务和结果时会产生大量开销。因此,如果您需要真正的低延迟预测,则需要将模型导出到 Spark 之外。

近实时

如果您的用例需要数百毫秒到几秒的预测,您可以构建一个使用 MLlib 生成预测的预测服务器。虽然这不是 Spark 的理想用例,因为您正在处理非常少量的数据,但与流式处理或批处理解决方案相比,您将获得更低的延迟。

用于实时推理的模型导出模式

有些领域需要实时推理,包括欺诈检测、广告推荐等。虽然使用少量记录进行预测可能会实现实时推理所需的低延迟,但您将需要应对负载平衡(处理许多并发请求)以及延迟关键任务中的地理位置。有一些流行的托管解决方案,例如AWS SageMaker和Azure ML,它们提供了低延迟的模型服务解决方案。在本节中,我们将向您展示如何导出您的 MLlib 模型,以便将它们部署到这些服务中。

将模型导出到 Spark 的一种方法是在 Python、C 等中本地重新实现模型。虽然提取模型的系数似乎很简单,但导出所有特征工程和预处理步骤以及它们(OneHotEncoder、、VectorAssembler等) .) 很快就会变得很麻烦并且很容易出错。有一些开源库,例如 MLeap 和ONNX,可以帮助您自动导出受支持的 MLlib 模型子集,以消除它们对 Spark 的依赖。然而,在撰写本文时,开发 MLeap 的公司不再支持它。MLeap 也不支持 Scala 2.12/Spark 3.0。

另一方面,ONNX(开放神经网络交换)已成为机器学习互操作性的事实上的开放标准。你们中的一些人可能还记得其他 ML 互操作性格式,例如 PMML(预测模型标记语言),但这些格式从未像现在的 ONNX 那样获得相同的牵引力。ONNX 作为一种允许开发人员在库和语言之间轻松切换的工具在深度学习社区中非常流行,并且在撰写本文时它对 MLlib 具有实验性支持。

除了导出 MLlib 模型外,还有其他与 Spark 集成的第三方库,便于在实时场景中部署,例如XGBoost和 H2O.ai 的Sparkling Water(其名称来源于 H2O 和 Spark 的组合) .

XGBoost 是Kaggle 结构化数据问题竞赛中最成功的算法之一,它是数据科学家中非常受欢迎的库。虽然 XGBoost 在技术上不是 MLlib 的一部分,但XGBoost4J-Spark 库允许您将分布式 XGBoost 集成到您的 MLlib 管道中。XGBoost 的一个好处是易于部署:训练 MLlib 管道后,您可以提取 XGBoost 模型并将其保存为非 Spark 模型以便在 Python 中提供服务,如下所示:

// In Scala
val xgboostModel = 
  xgboostPipelineModel.stages.last.asInstanceOf[XGBoostRegressionModel]
xgboostModel.nativeBooster.saveModel(nativeModelPath)
# In Python
import xgboost as xgb
bst = xgb.Booster({'nthread': 4})
bst.load_model("xgboost_native_model")

笔记

在撰写本文时,分布式 XGBoost API 仅在 Java/Scala 中可用。本书的GitHub 存储库中包含了一个完整的示例。

现在您已经了解了导出 MLlib 模型以用于实时服务环境的不同方法,让我们讨论如何将 Spark 用于非 MLlib 模型.

将 Spark 用于非 MLlib 模型

如前所述,MLlib 并不总是满足您的机器学习需求的最佳解决方案。它可能无法满足超低延迟推理要求或内置支持您想要使用的算法。对于这些情况,您仍然可以使用 Spark,但不能使用 MLlib。在本节中,我们将讨论如何使用 Spark 执行使用 Pandas UDF 的单节点模型的分布式推理、执行超参数调整和缩放特征工程。

Pandas UDF

虽然 MLlib 非常适合模型的分布式训练,但您不仅限于使用 MLlib 通过 Spark 进行批量或流式预测 - 您可以创建自定义函数来大规模应用预训练模型,称为用户定义函数(UDF,涵盖在第 5 章中)。一个常见的用例是在单台机器上构建 scikit-learn 或 TensorFlow 模型,可能是在数据的子集上,但使用 Spark 对整个数据集执行分布式推理。

如果您在 Python 中定义自己的 UDF 以将模型应用于 DataFrame 的每条记录,请选择pandas UDF以优化序列化和反序列化,如第 5 章所述。但是,如果您的模型非常大,那么 Pandas UDF 在同一 Python 工作进程中为每个批次重复加载相同模型的开销会很高。在 Spark 3.0 中,Pandas UDF 可以接受pandas.Seriesor的迭代器,pandas.DataFrame因此您可以只加载一次模型,而不是为迭代器中的每个系列加载它。有关带有 Pandas UDF 的 Apache Spark 3.0 中的新功能的更多详细信息,请参阅第 12 章。

笔记

如果工作人员在第一次加载模型权重后对其进行缓存,则后续调用相同 UDF 并加载相同模型的速度将显着加快。

在以下示例中,我们将使用mapInPandas()Spark 3.0 中引入的 将scikit-learn模型应用于我们的 Airbnb 数据集。mapInPandas()将 的迭代器pandas.DataFrame作为输入,并输出 的另一个迭代器pandas.DataFrame。如果您的模型需要所有列作为输入,那么它很灵活且易于使用,但它需要对整个 DataFrame 进行序列化/反序列化(因为它被传递到其输入)。您可以pandas.DataFrame使用spark.sql.execution.arrow.maxRecordsPerBatch配置控制每个的大小。生成模型的完整代码副本可在本书的GitHub存储库中找到,但在这里我们将只专注于scikit-learn从 MLflow 加载保存的模型并将其应用到我们的 Spark DataFrame:

# In Python
import mlflow.sklearn
import pandas as pd

def predict(iterator):
  model_path = f"runs:/{run_id}/random-forest-model"
  model = mlflow.sklearn.load_model(model_path) # Load model
  for features in iterator:
    yield pd.DataFrame(model.predict(features))
    
df.mapInPandas(predict, "prediction double").show(3)

+-----------------+
|       prediction|
+-----------------+
| 90.4355866254844|
|255.3459534312323|
| 499.625544914651|
+-----------------+

除了使用 Pandas UDF 大规模应用模型外,您还可以使用它们来并行化构建多个模型的过程。例如,您可能希望为每种 IoT 设备类型构建一个模型来预测故障时间。您可以使用pyspark.sql.GroupedData.applyInPandas()(在 Spark 3.0 中引入)来完成此任务。该函数接受一个pandas.DataFrame并返回另一个pandas.DataFrame。本书的 GitHub 存储库包含完整的代码示例,用于为每种 IoT 设备类型构建模型并使用 MLflow 跟踪单个模型;为简洁起见,此处仅包含一个片段:

# In Python
df.groupBy("device_id").applyInPandas(build_model, schema=trainReturnSchema)

groupBy()将导致您的数据集完全洗牌,您需要确保您的模型和每个组的数据可以放在一台机器上。你们中的一些人可能熟悉(pyspark.sql.GroupedData.apply()例如,df.groupBy("device_id").apply(build_model)),但该 API 将在未来的 Spark 版本中被弃用,而支持pyspark.sql.GroupedData.applyInPandas().

现在您已经了解了如何应用 UDF 来执行分布式推理和并行化模型构建,让我们看看如何使用 Spark 进行分布式超参数调优。

Spark 用于分布式超参数调优

即使您不打算进行分布式推理或不需要 MLlib 的分布式训练功能,您仍然可以利用 Spark 进行分布式超参数调优。本节将特别介绍两个开源库:Joblib 和 Hyperopt。

Joblib

根据其文档,Joblib是“一组在 Python 中提供轻量级流水线的工具”。它有一个 Spark 后端,用于在 Spark 集群上分发任务。Joblib 可用于超参数调整,因为它会自动将您的数据副本广播给所有工作人员,然后这些工作人员在其数据副本上创建具有不同超参数的自己的模型。这使您可以并行训练和评估多个模型。您仍然有一个基本限制,即单个模型和所有数据必须适合单个机器,但是您可以简单地并行化超参数搜索,如图 11-7所示。

要使用 Joblib,请通过pip install joblibspark. 确保您使用的是scikit-learn0.21 或更高版本以及pyspark2.4.4 或更高版本。这里展示了如何进行分布式交叉验证的示例,同样的方法也适用于分布式超参数调整:

# In Python
from sklearn.utils import parallel_backend
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
import pandas as pd
from joblibspark import register_spark

register_spark() # Register Spark backend

df = pd.read_csv("/dbfs/databricks-datasets/learning-spark-v2/sf-airbnb/
  sf-airbnb-numeric.csv")
X_train, X_test, y_train, y_test = train_test_split(df.drop(["price"], axis=1), 
  df[["price"]].values.ravel(), random_state=42)

rf = RandomForestRegressor(random_state=42)
param_grid = {"max_depth": [2, 5, 10], "n_estimators": [20, 50, 100]}
gscv = GridSearchCV(rf, param_grid, cv=3)

with parallel_backend("spark", n_jobs=3):
  gscv.fit(X_train, y_train)
  
print(gscv.cv_results_)

有关从交叉验证器返回的参数的说明,请参阅scikit-learn GridSearchCV 文档。

Hyperopt

Hyperopt是一个 Python 库,用于“对尴尬的搜索空间进行串行和并行优化,其中可能包括实值、离散和条件维度”。您可以通过pip install hyperopt. 使用 Apache Spark 扩展 Hyperopt有两种主要方法:

  • 将单机 Hyperopt 与分布式训练算法(例如 MLlib)一起使用

  • SparkTrials将分布式Hyperopt与单机训练算法一起使用

对于前一种情况,与任何其他库相比,您无需配置任何特殊配置即可将 MLlib 与 Hyperopt 结合使用。那么,让我们看看后一种情况:具有单节点模型的分布式 Hyperopt。不幸的是,在撰写本文时,您无法将分布式超参数评估与分布式训练模型结合起来。用于并行化Keras模型的超参数搜索的完整代码示例可以在本书的GitHub 存储库中找到;这里只包含一个片段来说明 Hyperopt 的关键组件:

# In Python
import hyperopt

best_hyperparameters = hyperopt.fmin(
  fn = training_function,
  space = search_space,
  algo = hyperopt.tpe.suggest,
  max_evals = 64,
  trials = hyperopt.SparkTrials(parallelism=4))

fmin()生成新的超参数配置以供您使用training_function并将它们传递给SparkTrialsSparkTrials在每个 Spark 执行器上并行运行这些训练任务的批次作为单任务 Spark 作业。当 Spark 任务完成后,它会将结果和相应的损失返回给驱动程序。Hyperopt 使用这些新结果为未来的任务计算更好的超参数配置。这允许大规模扩展超参数调整。MLflow 还与 Hyperopt集成,因此您可以跟踪作为超参数调整的一部分训练的所有模型的结果。

的一个重要参数SparkTrialsparallelism。这决定了同时评估的最大试验次数。如果parallelism=1,那么您正在按顺序训练每个模型,但是通过充分利用自适应算法,您可能会得到更好的模型。如果您设置parallelism=max_evals(要训练的模型总数),那么您只是在进行随机搜索。1和之间的任何数字max_evals都允许您在可扩展性和适应性之间进行权衡。默认情况下,parallelism设置为 Spark 执行器的数量。您还可以指定 atimeout来限制允许的最大秒数fmin()

即使 MLlib 不适合您的问题,希望您能看到在任何机器学习任务中使用 Spark 的价值。

概括

在本章中,我们介绍了管理和部署机器学习管道的各种最佳实践。您了解了 MLflow 如何帮助您跟踪和重现实验以及打包您的代码及其依赖项以部署到其他地方。我们还讨论了主要的部署选项——批处理、流式传输和实时——及其相关的权衡。MLlib 是用于大规模模型训练和批处理/流式处理用例的绝佳解决方案,但它不会击败用于小数据集实时推理的单节点模型。您的部署要求直接影响您可以使用的模型和框架的类型,在开始您的模型构建过程之前讨论这些要求至关重要。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/87.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

编程神器Copilot逐字抄袭他人代码?

自面世后就饱受争议的 GitHub Copilot 编程神器最近又遭遇舆论风暴。 日前,德州农工大学的一位计算机科学教授 Tim Davis 在推特上发文称, GitHub Copilot 在没有标注来源也没有 LGPL 许可的情况下,输出了大量应该受版权保护的代码。 Tim Davis 还发了自己和 GitHub Copil…

90后汕头返种水稻 国稻种芯·中国水稻会:广东新农人田保姆

90后汕头返种水稻 国稻种芯中国水稻会:广东新农人田保姆 南方日报 张伟炜 新闻中国采编网 中国新闻采编网 谋定研究中国智库网 中国农民丰收节国际贸易促进会 国稻种芯中国水稻节 中国三农智库网-功能性农业农业大健康大会报道:“5月稻谷病虫害防护非常…

机器学习(周志华)课后习题

第1章 绪论 1.1 表1.1若只包含编号1和4的两个样例,试给出相应的版本空间。 版本空间:与训练及一致的假设集合。 色泽青绿,根蒂*,敲声*; 色泽*,根蒂蜷缩,敲声*; 色泽*,根…

nuxt.js 进行项目重构-首页

nuxt.js 也是基于vue 的 那么就离不开组件化开发 我们按照组件结构来进行分析 navTop 页面的头部 通用组件 分隔了三个位置 适用于大多数头部 且预留插槽 <template><div class"nav-top"><div class"left"><slot name"left…

Spring5入门到实战------10、操作术语解释--Aspectj注解开发实例。AOP切面编程的实际应用

1、操作术语 1.1、连接点 类里面哪些方法可以被增强、这些方法被称为连接点。比如&#xff1a;用户控制层有登录、注册、修改密码、修改信息等方法。假如只有登录类和注册类可以被增强&#xff0c;登录和注册方法就称为连接点 1.2、切入点 实际被真正增强的方法&#xff0c…

C++ 【UVA488】Triangle Wave

&#x1f4cb; 个人简介 &#x1f496;大家好&#xff0c;我是2022年3月份新人榜排名第三的 ༺Blog༒Hacker༻ &#x1f389;支持我&#xff1a;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; &#x1f4ac;格言&#xff1a;༺永做优质༒programmer༻ &#x1f4e3; 系列专栏&am…

Unity技术手册-编辑器基础入门万字大总结

往期文章分享点击跳转>《导航贴》- Unity手册&#xff0c;系统实战学习点击跳转>《导航贴》- Android手册&#xff0c;重温移动开发 本文约8千字&#xff0c;新手阅读需要20分钟&#xff0c;复习需要12分钟 【收藏随时查阅不再迷路】 &#x1f449;关于作者 众所周知&…

【C/C++】程序环境,探索程序的执行过程(习得无上内功《易筋经》的第一步)

目录1.程序的翻译环境和执行环境2.详解编译链接2.1翻译环境2.2编译本身也分为几个阶段预编译&#xff08;预处理&#xff09;编译汇编详解符号表形成符号表2.3.链接合并段表符号表的合并和重定位3.运行环境总结&#xff1a;1.程序的翻译环境和执行环境 在ANSIC&#xff08;标准…

LeetCode每日一题——1235. 规划兼职工作

LeetCode每日一题系列 题目&#xff1a;1235. 规划兼职工作 难度&#xff1a;困难 文章目录LeetCode每日一题系列题目示例思路题解题目 你打算利用空闲时间来做兼职工作赚些零花钱。 这里有 n 份兼职工作&#xff0c;每份工作预计从 startTime[i] 开始到 endTime[i] 结束&a…

1024程序员节|基于Springboot实现爱心捐赠管理系统

作者主页&#xff1a;编程指南针 作者简介&#xff1a;Java领域优质创作者、CSDN博客专家 、掘金特邀作者、多年架构师设计经验、腾讯课堂常驻讲师 主要内容&#xff1a;Java项目、毕业设计、简历模板、学习资料、面试题库、技术互助 文末获取源码 项目编号&#xff1a;BS-XX-…

Mybatis-plus学习(基于版本3.0.5)

文章目录一.概念1.1 简介1.2 特性二.快速入门三.CRUD扩展3.1 Insert插入3.2 主键生成策略3.3 Update更新3.4 自动填充3.5 乐观锁3.6 查询操作3.7 删除操作3.8 性能分析插件&#xff08;新版本的Mybatis-plus已将此插件移除&#xff09;3.9 条件构造器3.10 代码生成器一.概念 1…

Transformer合集3

太多了 我都累了 这都第4了 这次先是关于他的小样本目标检测 , 用很少的训练示例检测新目标 小样本目标检测 论文地址&#xff1a; https://openaccess.thecvf.com/content/CVPR2022/papers/Han_Few-Shot_Object_Detection_With_Fully_Cross-Transformer_CVPR_2022_paper.…

docker安装influxdb及备份恢复

influxdb安装influxdb1&#xff0c;拉取镜像2&#xff0c;创建目录并进入到目录内3&#xff0c;创建influxdb容器服务4&#xff0c;访问&#xff1a;ip8086备份恢复influxdb数据准备1.1 创建用户&#xff0c;填入组织&#xff0c;桶信息1.2&#xff0c;给桶添加点数据1&#xf…

ansible部署lnmp架构

环境准备&#xff1a; 主机名IP服务系统ansible192.168.160.131ansibleCentOS-8.5nginx192.168.160.132nginxCentOS-8.5mysql192.168.160.137mysqlCentOS-8.5php192.168.160.139phpCentOS-8.5 1、生成私钥&#xff0c;对另外三台主机进行免密登入 [rootansible ~]# ssh-keyge…

【单片机毕业设计】【mcuclub-jj-007】基于单片机的门铃的设计

最近设计了一个项目基于单片机的门铃&#xff0c;与大家分享一下&#xff1a; 一、基本介绍 项目名&#xff1a;门铃 项目编号&#xff1a;mcuclub-jj-007 单片机类型&#xff1a;STC89C52、STM32F103C8T6 具体功能&#xff1a; 1、通过人体热释电检测是否有人&#xff0c;当…

Java --- 创建SpringMVC项目

目录 一、什么是MVC 二、什么是SpringMVC 三、SpringMVC的特点 四、创建SpringMVC项目 4.1、开发环境 4.2、创建maven工程 4.3、配置web.xml文件 4.4、创建请求控制器 4.5、配置springMVC.xml文件 4.5、访问首页面 4.6、访问指定页面 一、什么是MVC MVC是一种软件架…

C++:C++的IO流

while (scanf("%s", buff) ! EOF)如何终止&#xff1f; 答&#xff1a;ctrl z换行 是规定&#xff0c;ctrl c 是发送信号杀死进程&#xff08;一般不建议ctrl c&#xff09;。 int main() {string str;while (cin >> str) // operator>>(cin, str){cou…

K_A01_001 基于单片机驱动WS2812 点灯流水灯 0-9显示

目录 一、资源说明 二、基本参数 三、通信协议说明 WS2812时序: 代码: 四、部分代码说明 1、接线说明 2、主函数 五、相关资料链接 六、数字提取格式 七、视频效果展示与资料获取 八、项目所有材料清单 九、注意事项 十、接线表格 一、资源说明 单片机型号 测试条件 模…

【一起学习数据结构与算法】优先级队列(堆)

目录一、什么是优先级队列&#xff1f;二、堆 (heap&#xff0c;基于二叉树)2.1 什么是堆&#xff1f;2.2 堆的分类2.3 结构与存储三、堆的操作3.1 堆创建3.2 插入元素3.3 弹出元素四、用堆模拟实现优先级队列五、堆的一个重要应用-堆排序六、经典的TOPK问题6.1 排序6.2 堆一、…

如何用两个晚上教女生学会Python

文章目录安装、需求引导和开发模型命令行计算器用温度指导穿衣VS Code 和女孩子的衣柜用遍历来挑选衣物交互课后作业事情的起因是这样的&#xff0c;知乎上有个妹纸加我&#xff0c;说要相亲。尽管我欣喜若狂&#xff0c;但恰巧在外出差&#xff0c;根本走不开。妹纸于是说要不…