Zilliz 推出 Spark Connector:简化非结构化数据处理流程

news2025/1/13 3:10:30

随着人工智能(AI)和深度学习(Deep Learning)技术的高速发展,使用神经网络模型将数据转化为 Embedding 向量 已成为处理非结构化数据并实现语义检索的首选方法,广泛应用于搜索、推荐系统等 AI 业务中。

以生产级别的搜索系统为例,该系统通常包含两个部分:离线数据索引和在线查询服务。实现该系统需要使用多种技术栈。例如,在离线处理中,如何将来源于多种渠道的非结构化数据数据高效、方便地处理并推送到向量数据库以实现在线查询,是一个充满挑战的问题。Apache Spark 和 Databricks 是应用广泛的大批量数据处理方案。Zilliz Cloud 推出了 Spark Connector。该工具将 Milvus 和 Zilliz Cloud 向量数据库 API 集成于 Apache Spark 和 Databricks 任务,大大简化数据处理和推送的实现难度。

本文将介绍 Spark Connector 及其应用场景,并手把手教你如何使用它实现数据推送。

Spark Connector 工作原理及使用场景

Apache Spark 和 Databricks 适合处理海量数据,例如以批量的方式进行非结构化数据清洗并调用模型生成 Embedding 向量。而 Milvus 则擅长存储模型生成的 Embedding 向量数据,并构建索引支持在线服务中的高效查询。这两大工具的强强联合可以实现轻松开发生成式 AI、推荐系统、图像和视频搜索等应用。

当用户在搭建 AI 应用时,很多用户都会遇到如何将数据从 Apache Spark 或 Databricks 导入到 Milvus 或 Zilliz Cloud (全托管的 Milvus 服务) 中的问题。使用 Spark Connector,用户能够在 Apache Spark 或 Databricks 任务中直接调用函数,完成数据向 Milvus 的增量插入或者批量导入,不需要再额外实现“胶水”业务逻辑,简化了数据推送流程。

批量导入数据

由于深度学习进展日新月异,专注于深度学习的团队通常需要频繁更新 Embedding 模型。在第一次批量建库,或者每次更新模型后,都需要处理全量数据、生成一套新的向量数据集。这样一来,就需要启动一个新的 Spark 任务来执行这一次处理,并将新的向量数据集重新插入到向量数据库中以供在线服务使用。有了 Databricks Connector,您只需要授予 Spark 任务写入 Milvus S3 bucket (或者授予 Zilliz Cloud 访问临时的数据源 bucket)的权限即可。简化后的数据处理流程允许您仅仅通过一个简单的函数调用将 Spark 任务生成的向量直接加载到 Milvus 或 Zilliz Cloud 实例中。

增量插入数据

// Specify the target Milvus instance and vector data collection
df.write.format("milvus")
    .option(MILVUS_URI, "https://in01-xxxxxxxxx.aws-us-west-2.vectordb.zillizcloud.com:19535")
    .option(MILVUS_TOKEN, dbutils.secrets.get(scope = "zillizcloud", key = "token"))
    .option(MILVUS_COLLECTION_NAME, "text_embedding")
    .option(MILVUS_COLLECTION_VECTOR_FIELD, "embedding")
    .option(MILVUS_COLLECTION_VECTOR_DIM, "128")
    .option(MILVUS_COLLECTION_PRIMARY_KEY, "id")
    .mode(SaveMode.Append)
    .save()

对于数据量相对较小的用户而言,使用 Spark Connector 也能简化开发工作。您的任务中无需再实现建立服务端连接以及插入数据的代码,只需调用 Connector 中提供的函数即可。

如何使用 Spark Connector

下面,我们将介绍如何使用 Spark Connector 简化数据迁移和处理流程。

使用 Dataframe 直接进行增量插入

使用 Spark Connector,您可以直接利用 Apache Spark 中 Dataframe 的 write API 将数据以增量方式插入到 Milvus 中,大幅降低数据插入流程的实现成本。同理,您也可以直接将数据从 Apache Spark 或 Databricks 导入到 Zilliz Cloud(全托管的 Milvus 服务)中。以下为示例代码:

将数据批量导入到 Collection 中

如果您需要将大量数据高效导入 Collection 中,我们推荐使用 MilvusUtils. bulkInsertFromSpark() 函数。

  • 将数据加载到 Milvus Collection 中

这个过程中需要使用 S3 或 MinIO bucket 作为 Milvus 实例的内部存储。Spark 或 Databricks 任务获取 bucket 的写入权限后,就可以使用 Connector 将数据批量写入 bucket 中,最终一次操作批量插入到向量 Collection 中以供查询使用。

// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3a://milvus-bucket/result"
df.write
  .mode("overwrite")
  .format("parquet")
  .save(outputPath)
// Specify Milvus options.
val targetProperties = Map(
  MilvusOptions.MILVUS_HOST -> host,
  MilvusOptions.MILVUS_PORT -> port.toString,
  MilvusOptions.MILVUS_COLLECTION_NAME -> targetCollectionName,
  MilvusOptions.MILVUS_BUCKET -> bucketName,
  MilvusOptions.MILVUS_ROOTPATH -> rootPath,
  MilvusOptions.MILVUS_FS -> fs,
  MilvusOptions.MILVUS_STORAGE_ENDPOINT -> minioEndpoint,
  MilvusOptions.MILVUS_STORAGE_USER -> minioAK,
  MilvusOptions.MILVUS_STORAGE_PASSWORD -> minioSK,
)
val targetMilvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(targetProperties.asJava))
 
// Bulk insert Spark output files into Milvus
MilvusUtils.bulkInsertFromSpark(spark, targetMilvusOptions, outputPath, "parquet")
  • 将数据加载到 Zilliz Cloud Collection 中

如果您使用的是全托管 Milvus 服务——Zilliz Cloud,您可以使用 Zilliz Cloud 提供的数据导入 API 。Zilliz Cloud 提供多样的工具和完整的文档,从而帮助您将各种来源(如 Spark)的数据高效导入 Zilliz Cloud 中。您需要设置一个 S3 bucket 作为媒介,然后授权 Zilliz Cloud 读取 bucket 中的数据。这样一来,Zilliz Cloud 数据导入 API 便可无缝将数据从 S3 bucket 加载到向量数据库中。

以 Databricks 为例,开始前,您需要先通过在 Databricks 集群中添加 jar 文件来加载带有Spark Connector 的 Runtime 库。有多种安装库的方法。下图展示了如何从本地上传 jar 至集群。

如需了解更多如何在 Databricks Workspace 中安装库的信息,请参阅 Databrick 官方文档。

批量插入数据时需要将数据存储在一个临时的 bucket 中,随后再批量导入至 Zilliz Cloud 中。您可以先创建一个 S3 bucket,点击此处了解详情。为了保护您的 Zilliz Cloud 鉴权用户名密码安全,您可以跟随指南在 Databricks 上安全管理密码。

以下为批量数据迁移的示例代码。和前文的 Milvus 例子一样,您只需要填写用于鉴权的向量数据库 URI、Token 以及 S3 bucket 的地址、AK、SK。

// Write the data in batch into the Milvus bucket storage.
val outputPath = "s3://my-temp-bucket/result"
df.write
  .mode("overwrite")
  .format("mjson")
  .save(outputPath)

// Specify Milvus options.
val properties = Map(
  MILVUS_URI -> uri,
  MILVUS_TOKEN -> token,
  MILVUS_COLLECTION_NAME -> collectionName,
  MILVUS_STORAGE_ENDPOINT -> s3Endpoint,
  MILVUS_STORAGE_USER -> s3ak,
  MILVUS_STORAGE_PASSWORD -> s3sk,
  ZILLIZCLOUD_API_KEY -> apiKey,
  ZILLIZCLOUD_REGION -> region,
  ZILLIZCLOUD_INSTANCE_ID -> clusterId,
)
val milvusOptions = new MilvusOptions(new CaseInsensitiveStringMap(properties.asJava))

// Call util func to bulkinsert data into Zilliz Cloud through Import Data API.
MilvusUtils.bulkInsertFromSpark(spark, milvusOptions, outputDir, "json")

Connector 使用全流程:Notebook 示例

为帮助您快速上手,我们准备了一个 Notebook 示例 完整地介绍了如何使用 Connector 简化数据增量或批式导入至 Milvus 或 Zilliz Cloud 的流程。

总结

Apache Spark 和 Databricks 与 Milvus 和 Zilliz Cloud(全托管的 Milvus 服务)的整合为 AI 应用开发进一步带来了便利。开发人员可以轻松将数据以增量或批量的形式从数据处理端导入 Milvus 和 Zilliz Cloud 中,实现高效的检索。Spark Connector 助力高效开发可扩展的 AI 解决方案,充分释放非结构化数据的潜能。

准备好开启您的 AI 之旅了吗?立刻免费使用 Zilliz Cloud。

本文作者:陈将,Zilliz 生态和 AI 平台负责人。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1944262.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows 11+Visual Studio 2022 环境OpenCV+CUDA 12.5安装及踩坑笔记

周六日在家捣腾了一下,把过程记录下来。 前置条件 Visual Studio C 生成工具和本机显卡适配的CUDA与CUDA匹配的cuDNNPython 3NumPyOpenCV源代码以及对应版本的OpenCV-contrib模块源码CMake Visual Studio 下载Visual Studio(我本机的是VS2022&#xf…

虚拟局域网配置与分析-VLAN

前言:本博客仅作记录学习使用,部分图片出自网络,如有侵犯您的权益,请联系删除 一、相关知识 虚拟局域网(Virtual Local Area Network,VLAN)是一组逻辑上的设备和用户;不受物理位置的…

二、【Python】入门 - 【PyCharm】安装教程

往期博主文章分享文章: 【机器学习】专栏http://t.csdnimg.cn/sQBvw 目录 第一步:PyCharm下载 第二步:安装(点击安装包打开下图页面) 第三步:科学使用,请前往下载最新工具及教程&#xff1a…

前端:Vue学习-3

前端:Vue学习-3 1. 自定义指令2. 插槽2.1 插槽 - 后备内容(默认值)2.2 插槽 - 具名插槽2.3 插槽 - 作用域插槽 3. Vue - 路由3.1 路由模块封装3.2 声明式导航 router-link 高亮3.3 自定义匹配的类名3.4 声明式导肮 - 跳转传参3.5 Vue路由 - 重…

C#初级——条件判断语句和循环语句

条件判断语句 简单的条件判断语句&#xff0c;if()里面进行条件判断&#xff0c;如果条件判断正确就执行语句块1&#xff0c;如果不符合就执行语句块2。 if (条件判断) { 语句块1 } else { 语句块2 } int age 18;if (age < 18){Console.WriteLine("未…

Python面试宝典第18题:单词搜索

题目 给定一个m x n的二维字符网格board和一个字符串单词word。如果word存在于网格中&#xff0c;返回true。否则&#xff0c;返回false。单词必须按照字母顺序&#xff0c;通过相邻的单元格内的字母构成。所谓相邻单元格&#xff0c;是那些水平相邻或垂直相邻的单元格。 备注&…

Blender材质-PBR与纹理材质

1.PBR PBR:Physically Based Rendering 基于物理的渲染 BRDF:Bidirection Reflectance Distribution Function 双向散射分散函数 材质着色操作如下图&#xff1a; 2.纹理材质 左上角&#xff1a;编辑器类型中选择&#xff0c;着色器编辑器 新建着色器 -> 新建纹理 -> 新…

爬虫学习3:爬虫的深度爬取

爬虫的深度爬取和爬取视频的方式 深度爬取豆瓣读书 import time import fake_useragent import requests from lxml import etree head {"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 …

[AWS]MSK调用,报错Access denied

背景&#xff1a;首先MSK就是配置一个AWS的托管 kafka&#xff0c;创建完成之后就交给开发进行使用&#xff0c;开发通常是从代码中&#xff0c;编写AWS的access_key 和secret_key进行调用。 但是开发在进行调用的时候&#xff0c;一直报错连接失败&#xff0c;其实问题很简单&…

AI学习记录 - 本地知识库实现的相关知识

在公司内部实现了个知识库&#xff0c;由于保密吗&#xff0c;只介绍在实现知识库的过程中用到的知识&#xff08;虽然知识库也是个烂大街的东西了hehehehe&#xff09; 1、分词器 先分词&#xff0c;中文可以使用jieba分词 2、构造数据集 将词汇向量化是自然语言处理中的…

OpenHarmony 开发

本心、输入输出、结果 文章目录 OpenHarmony 开发前言JonathanOpenHarmony 并不是 AndroidOpenHarmony 应用迁移OpenHarmony 的开发流程OpenHarmony 开发 编辑 | 简简单单 Online zuozuo 地址 | https://blog.csdn.net/qq_15071263 如果觉得本文对你有帮助,欢迎点赞、收藏、评…

【北京迅为】《i.MX8MM嵌入式Linux开发指南》-第三篇 嵌入式Linux驱动开发篇-第四十七章 字符设备和杂项设备总结回顾

i.MX8MM处理器采用了先进的14LPCFinFET工艺&#xff0c;提供更快的速度和更高的电源效率;四核Cortex-A53&#xff0c;单核Cortex-M4&#xff0c;多达五个内核 &#xff0c;主频高达1.8GHz&#xff0c;2G DDR4内存、8G EMMC存储。千兆工业级以太网、MIPI-DSI、USB HOST、WIFI/BT…

vim gcc

vim 使用 vs filename 分屏 ctrl ww 切窗口 shift zz 快速提出vim vim配置 vim启动时自动读取当前用户的家目录的.vimrc文件 vim配置只影响本用户 其他用户观看同一文件不受影响 gcc指令 & c文件编译过程 动态库 静态库 & 链接方式 有相应库才能进行…

数据传输安全--IPSEC

目录 IPSEC IPSEC可以提供的安全服务 IPSEC 协议簇 两种工作模式 传输模式 隧道模式 两个通信保护协议&#xff08;两个安全协议&#xff09; AH&#xff08;鉴别头协议&#xff09; 可以提供的安全服务 报头 安全索引参数SPI 序列号 认证数据 AH保护范围 传输模…

深入浅出WebRTC—LossBasedBweV2

WebRTC 同时使用基于丢包的带宽估计算法和基于延迟的带宽估计算法那&#xff0c;能够实现更加全面和准确的带宽评估和控制。基于丢包的带宽估计算法主要依据网络中的丢包情况来动态调整带宽估计&#xff0c;以适应网络状况的变化。本文主要讲解最新 LossBasedBweV2 的实现。 1…

SSIS_SQLITE

1.安装 SQLite ODBC 驱动程序 2.添加SQLite数据源 在“用户DSN”或“系统DSN”选项卡中&#xff0c;点击“添加”。选择“SQLite3 ODBC Driver”&#xff0c;然后点击“完成”。在弹出的配置窗口中&#xff0c;设置数据源名称&#xff08;DSN&#xff09;&#xff0c;并指定S…

python实现特征检测算法4

python实现Richardson-Lucy 反卷积算法 Richardson-Lucy 反卷积算法算法原理Python实现详细解释Richardson-Lucy算法的优缺点应用领域Richardson-Lucy 反卷积算法 Richardson-Lucy反卷积算法是一种迭代算法,用于恢复因成像系统中的点扩散函数(PSF)导致的模糊图像。该算法最…

Spring MVC 应用分层

1. 类名使⽤⼤驼峰⻛格&#xff0c;但以下情形例外&#xff1a;DO/BO/DTO/VO/AO 2. ⽅法名、参数名、成员变量、局部变量统⼀使⽤⼩驼峰⻛格 3. 包名统⼀使⽤⼩写&#xff0c;点分隔符之间有且仅有⼀个⾃然语义的英语单词. 常⻅命名命名⻛格介绍 ⼤驼峰: 所有单词⾸字⺟…

【LLM-推理】Self-Refine:使用feedback迭代修正LLM的Output

来源&#xff1a; https://selfrefine.info/ 1.论文速读(摘要引言) 本文主要提出了Self-Refine策略&#xff0c;旨在通过一个LLM不断refine修正LLM的输出&#xff0c;使其在无需额外训练的情况下&#xff0c;在下游任务产生更好的效果。 该方法的直观Insight&#xff1a;我们…

7.23 字符串简单中等 520 125 14 34

520 Detect Capital 思路&#xff1a; 题目&#xff1a;判定word &#xff1a;if the usage of capitals in it is right.遍历所有的string&#xff1a; 两种情况&#xff1a; 首字母capitals–>判定第二个字母是否大写–>所有字母大写 otherwise 除第一个以外全部小写&a…