如何在生成式AI里使用 Ray Data 进行大规模 RAG 应用的 Embedding Inference

news2025/1/11 6:25:14

检索增强生成 (RAG) 是企业级生成式 AI(GenAI)应用的热门案例之一。多数 RAG 教程演示了如何利用 OpenAI API 结合 Embedding 模型和大语言模型(LLM)来进行推理(Inference)。然而,在开发过程中,如果能使用开源工具,就可以免去访问自己数据的费用,同时也能加快迭代。

在 Embedding 步骤(即将数据转换为向量的过程)中,使用 Ray Data 取得的性能提升尤为显著。相比于使用 Pandas,采用 Ray Data 等工具对批量推理请求进行汇总处理可以显著节省资源和处理时间。例如,在一台配备 16GB RAM 的 Mac M2 笔记本电脑上,仅使用四个 worker node,Ray Data 的处理速度就比 Pandas 快了 60 倍!本文将详细介绍使用 Milvus + Ray Data 进行 Embedding inference 的性能。

我们的开源 RAG 技术栈包括:

  • BGM-M3 Embedding 模型:该模型能一次性生成三种类型的向量,包括稀疏向量、稠密向量和多向量。

  • Ray Data:高效的分布式 Embedding inference 推理工具。

  • AWS S3:用于临时存储推理结果。

  • Milvus 或 Zilliz Cloud:用作向量数据库。

示例数据来源:Kaggle 的 IMDB 海报数据集。

开源 RAG 技术栈

BGM-M3 Embedding 模型

BGM-M3 Embedding 模型是一种强大的多功能 Embedding 工具,其特点在于能够处理多语言(Multi-Linguality)、多功能(Multi-Functionality)和多粒度(Multi-Granularity)的数据,因此得名“M3”。该模型支持超过 100 种语言,并能计算三种常见的 Embedding 类型:稀疏向量、稠密向量和多向量。它还可以处理各种长度的文本——从短句到长文档,最多可支持 8192 个 token。更多详情,请参考论文或通过 HugginFace 网站了解此模型。

Milvus 2.4 版本现已集成 BGM-M3 Embedding 模型。

Ray Data

有长期运行的数据转换任务?

Ray Data 具备可扩展的数据处理能力,能够便捷和快速地在多台机器(CPU、GPU 等)上并行处理大量数据。Ray Data 尤其适用于数据可以被分割,能够并行处理的场景,例如同时进行的多个数据块切分和 Embedding 转换的任务。Ray Data 底层是一个强大的流式执行引擎,能够最大化集群中的 GPU 利用率。与使用在线服务(如 OpenAI 嵌入 API)运行嵌入相比,使用 Ray Data 进行离线嵌入作业可以节省大部分成本。

Anyscale 是 Ray 的托管平台。您可以轻松地在 Anyscale 上利用 GPU 机器扩展 Embedding 任务。

Milvus 和 Zilliz Cloud

RAG(Retrieval Augmented Generation)应用之所以能够迅速响应,关键在于其背后强大的向量数据库。Milvus 专为处理大规模业务和海量数据设计。与其他向量数据库不同,Milvus 能够根据数据量的增长灵活地进行扩展。Milvus 的存储、索引和查询是独立的,可以单独垂直扩展或水平扩展,这一设计让 RAG 应用能够迅速响应用户的实时查询需求,同时在收到查询前和收到查询时,Milvus 能够智能地进行离线计算。除了强大的性能外,Milvus 还提供了多种企业级重要特性,如多租户和基于角色的访问控制(RBAC)、高可用等。

Zilliz Cloud 基于开源的 Milvus 搭建的全托管向量数据库云服务。

设置 RAG 工具

本教程中将使用 Milvus 的 Python SDK、Ray Data、Amazon S3 和 Zilliz Cloud。

开始前,请先注册 AWS 账号以使用 Amazon S3。然后前往 console.aws.amazon.com > IAM > My security credentials > Create access key。复制并保存密钥(Access key 和 secret key)。

安装所需的工具库并运行 aws config。请在文件中输入 AWS 密码。

pip install boto3 
pip install awscli –force-reinstall –upgrade
aws config #fill in your key and secret key
more ~/.aws/credentials #make sure this looks correct

安装 Ray Data。

pip install -U "ray[data]"

安装 Pymilvus。

pip install -U pymilvus "pymilvus[model]" langchain

PyMilvus 2.4 版本及以上已打包 BGE-M3 embedding 模型。

import ray, os, pprint, time, boto3
from langchain.text_splitter import RecursiveCharacterTextSplitter
import numpy as np
import pymilvus
print(pymilvus.__version__) # must be >= 2.4.0
from pymilvus.model.hybrid import BGEM3EmbeddingFunction

如需使用 Zilliz,请先注册账号并创建集群。

准备数据

本文使用了 Kaggle IMDB 海报数据作为数据集,其中包含大约 48,000 部电影、影评、海报链接以及一些元数据。

我们将所有文本字段(电影名称、描述、影评文本)复制到名为‘text’的一列中,并将其保存为 Parquet 格式,因为这种文件格式比 CSV 更高效。

生成 Embedding 向量

根据以下步骤生成 Embedding 向量:

  1. 切分数据: 将输入文本切分为片段,将语义上相关的文本片段保存在一起。

  2. 调用推理(inference)模式下的 Embedding 模型,用于生成文本片段的 Embedding 向量。

Ray Data 可以并行处理以下数据操作请求:

  1. flat_map():切分数据(输出的行数将多于输入行数)。

  2. map_batches():调用 Embedding 模型。

chunk_size = 512
chunk_overlap = np.round(chunk_size * 0.10, 0)

# Define a LangChain text splitter.
text_splitter = RecursiveCharacterTextSplitter(
   chunk_size=chunk_size,
   chunk_overlap=chunk_overlap,
   length_function=len)  #len is a built-in Python function

# 1. Define a regular python function for chunking.
def chunk_row(row, splitter=text_splitter):

   # Copy the row columns into metadata.
   metadata = row.copy()
   del metadata['text'] # Remove text from metadata

   # Split the text into chunks.
   chunks = splitter.create_documents(
       texts=[row["text"]],
       metadatas=[metadata])
   chunk_list = [{
       "text": chunk.page_content,
       **chunk.metadata} for chunk in chunks]

   return chunk_list

# 2. Define a class with a callable method to compute embeddings.
class ComputeEmbeddings:
     def __init__(self):
           # Initialize a Milvus built-in sparse-dense-late-interaction-reranking encoder.
           # https://huggingface.co/BAAI/bge-m3
           self.model = BGEM3EmbeddingFunction(use_fp16=False, device="cpu")
           print(f"dense_dim: {self.model.dim['dense']}")
           print(f"sparse_dim: {self.model.dim['sparse']}")

     def __call__(self, batch):

           # Ray data batch is a dictionary where values are array values.
           # BGEM3EmbeddingFunction input is docs as a list of strings.
           docs = list(batch['text'])

           # Encode the documents. bge-m3 dense embeddings are already normalized.
           embeddings = self.model(docs)
           batch['vector_dense'] = embeddings['dense']
           return batch

if __name__ == "__main__":

   FILE_PATH = "s3://zilliz/kaggle_imdb.parquet"

   # Load and transform data.

   ds = ray.data.read_parquet(FILE_PATH)

   # Chunk the input text
   chunked_ds = ds.flat_map(chunk_row)

   # Compute embeddings with a class that calls the embeddings model.
   embeddings_ds = chunked_ds.map_batches(ComputeEmbeddings, concurrency=4)

   # Save the embeddings to S3 in a folder of parquet part files.
   embeddings_ds.write_parquet('s3://zilliz/kaggle_imdb_embeddings')

运行前,需要提交 Ray 任务:

  1. 将代码保存至 Python 脚本文件。本例中将文件命名为 ray_data_demo.py

  2. 在本地运行前,请先创建一个全新的路径,路径中只可包含 .py 脚本文件和 .parquet 数据文件。本例中将这个路径命名为ray_cluster

  3. 运行 Python 脚本以启动 Ray 集群并自动提交任务。

  4. 打开 http://127.0.0.1:8265,查看集群和任务。

Embedding 速度提升 60 倍

表格展示了在 16GB M2 笔记本电脑上 Embedding 任务的用时。示例使用了 1 个单节点 Ray 集群用于批量处理任务,并发为 4。Pandas 用时更长,原因是 Pandas 只有一个处理器。但是 Ray Data 有 4 个处理器。 Pandas 和 Ray在处理大型数据时性能都更出色。

将数据从 S3 批量插入到 Milvus 或 Zilliz Cloud 中

Milvus 和 Zilliz Cloud都支持批量插入(bulk insert)数据,可以直接从 AWS、GCP 或 Azure 对象存储中导入 Embedding 数据。除了界面外,Zilliz Cloud 还提供 RESTful API 和 SDK。

对于大量 Embedding 数据,使用 bulk insert 可以显著节省机器资源并缩短插入时间,与普通 insert 相比更为高效。此外,通过 bulk insert 构建的向量搜索索引比普通 insert 的索引更高效。

在 Zilliz Cloud 界面上仅需轻击几次鼠标便可轻松批量插入数据。首先,在集群中创建 1 个全新的 collection。创建时,需要开启 AutoID 和动态列,添加向量列,并设置向量列的向量维度。设置完毕后,点击“创建 Collection”。

接下来,点击“导入数据”并按照界面上的提示输入 parquet 文件的路径。(请注意,如果您的 S3 存储桶为非公开链接,您还需要输入 Access Key 和 Secret Key,以便 Zilliz Cloud 读取其中的数据)。Zilliz Cloud 支持 Amazon S3、Google Cloud Storage 或 Azure Blob Storage 等对象存储服务。点击“导入”开始将所有数据导入向量数据库 collection。

导入任务完成后,您可以选择在 collection 上构建索引,加速后续的向量搜索。

查询数据

为测试新导入至 Collection 中的数据,让我们基于电影数据进行提问。

def mc_run_search(question, output_fields, top_k=2, filter_expression=""):
  
   # Embed the question using the same encoder.
   embeddings = model_bgem3([question])
   query_embeddings = embeddings['dense']

   # Run semantic vector search using your query and the vector database.
   results = mc.search(
       COLLECTION_NAME,
       data=query_embeddings,
       search_params=SEARCH_PARAMS,
       output_fields=output_fields,
       # Milvus can utilize metadata in boolean expressions to filter search.
       filter=filter_expression,
       limit=top_k,
       consistency_level="Eventually"
   )

   # Assemble retrieved context and context metadata.
   # The search result is in the variable `results[0]`, which is type
   # 'pymilvus.orm.search.SearchResult'.
   METADATA_FIELDS = [f for f in output_fields if f != 'chunk']
   formatted_results, context, context_metadata = _utils.client_assemble_retrieved_context(
       results, metadata_fields=METADATA_FIELDS, num_shot_answers=top_k)
   return formatted_results, context, context_metadata

SAMPLE_QUESTION = "muybridge horse movie"

# Return top k unique results with HNSW index.
TOP_K = 2

# Define output fields to return.
OUTPUT_FIELDS = ["movie_id", "chunk", "PosterLink"]

formatted_results, context, context_metadata = \
   mc_run_search(SAMPLE_QUESTION, OUTPUT_FIELDS, TOP_K)

以下为查询结果:

完整的 Ray Data 脚本可在此获取。

总结

文本介绍了如何使用 Ray Data 和 Milvus/Zilliz Cloud 的批量插入功能大幅加速向量生成和加载过程。使用 Milvus 能够高效构建索引、节省计算资源、提升向量搜索速度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1986538.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【第16章】Spring Cloud之Gateway全局过滤器(安全认证)

文章目录 前言一、公共模块1. 引入依赖2. 工具类 二、用户服务1. 新建模块2. 引入依赖3. 启动类4. 基本配置5. 登录接口 三、网关服务1. 引入依赖2. 应用配置3. 自定义过滤器 四、单元测试1.介绍2. 登录接口3. 提供者接口3.1 无token3.2 有token 总结 前言 我们已经接入了网关…

构造散列表以及处理冲突问题

目录 一. 散列表的基本概念 二. 散列表的冲突问题 三. 构造散列函数 四. 处理冲突 一. 散列表的基本概念 散列表简而言之就是一个根据一个对应关系(这个对应关系也叫散列函数)来存储关键码(关键字)的一个表。 例如,…

SSM电子商务系统-计算机毕业设计源码68470

基于SSM框架的电子商务系统的设计与实现 摘 要 随着电子商务的迅猛发展和计算机信息技术的全面跃升,网上购物系统由于其迎合了人们诉求和期望而渗入社会生活各个层面和角落。本文设计并实现了一个基于SSM框架的电子商务系统。该系统旨在为用户提供一个舒适且快捷的…

创建第一个Spring MVC项目

上篇文章,我们围绕什么是Spring MVC进行讲述了,这篇,我们将在IDEA创建我们的第一个Spring MVC项目。 创建我们的第一个Spring MVC项目 创建项目模块 如果你出现上面小图这样的,代表你就成功创建成功了一个Web项目,当…

商品购物网页的设计

系统名称: 基于TCP网络通信及数据库的网页查询系统 文档作者:清馨 创作时间:2024-8-3 最新修改时间:2024-8-6 最新版本号: 1.0 1.背景描述: 该系统为创建网络并发服务器,通过HTTP超文本网络…

【ML】 如何训练transform model, 模型训练实现细节

【ML】 如何训练transform model, 模型训练实现细节 1. transform 训练 原理2. transform 训练TIPS2.1 copy mechanism![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/69fb84a73d0240cc9042e17ae10bbef7.jpeg) 2. bean search束搜索的工作原理:束搜索的特…

部署DR模式集群

一、配置实验环境 每台主机的防火墙和SELinux都要关掉 systemctl stop firewalld setenforce 0 1、client(eth0为nat模式) 配置好网卡IP和网关IP,然后重启网卡 nmcli connection reload nmcli connection up eth0 [rootclient ~]# cat /etc/NetworkManager/syst…

500Kg载重履带式无人车技术详解

本款500Kg载重履带式无人车,专为复杂环境与多样化任务设计,具备卓越的机动性和承载能力。其基础参数如下: 最大载重量:500Kg,适用于运输装备、物资或执行特定作业任务。 整车重量:根据具体配置有所不同&am…

解决RDP远程计算机收到非预期的证书的问题

打开证书管理 查看远程桌面证书删除 然后搜索别的同名证书,如果在受信任的根证书里面,也删除。

差旅平台如何为企业降本30%?事前管控是关键

在多员工、业务跨地域的众多企业中,差旅成本常常成为一项巨大负担。为了有效解决这一问题,许多企业开始寻求可以显著降低企业差旅成本的差旅平台。本次,我们将集中拆解上市公司中智药业集团是如何通过差旅平台分贝通实现差旅成本减少30%效果的…

微信小程序教程011-3:京西购物商城实战之Home页实现

文章目录 3、首页3.0 创建home分支3.1 配置网络请求3.2 轮播图区域3.2.1 请求轮播图的数据3.2.2 渲染轮播图的UI结构3.2.3 配置小程序分包3.2.4 点击轮播图跳转到商品详情页3.2.5 封装 uni.$showMsg() 方法3.3 分类导航区域3.3.1 获取分类导航的数据3.3.2 渲染分类导航的UI结构…

通过Sui Gas Pool扩展赞助交易gas费

Mysten Labs正在向Sui开发者社区开源Sui Gas Pool功能。这项创新服务旨在大规模赞助Sui上交易的gas费,解决高并发需求应用的用户入驻问题。 Sui的原生功能赞助交易,允许交易使用与发送者地址不同的gas coin支付gas费。这一功能允许服务补贴用户的交易成…

如何评估并选择最佳的国内项目管理软件?

国内外主流的10款国内项目管理软件对比:PingCode、Worktile、Jira 、Basecamp、Trello、Asana 、Wrike、Tower 、禅道、Teambition 。 在选择适合自己企业的项目管理软件时,很多人会感到无从下手,担心无法找到既符合预算又能满足团队需求的解…

【优秀python案例】基于百度贴吧的数据采集与文本分析设计与实现

数据采集实现: 对百度贴吧帖子数据的采集。首先,使用requests库发送HTTP请求,通过设置请求头模拟浏览器访问,获取网页的HTML内容。然后,利用BeautifulSoup库对HTML内容进行解析,以便提取所需的信息。 在循…

AI回答:C#项目编译后生成部分文件的主要职责

【引入】以ConsoleApp1为例,请问C#编译之后以下文件有啥用 1.bin\runtimes 文件夹存放什么,有什么用? bin\runtimes 文件夹存放了项目的运行时相关文件,这些文件包括了各种目标平台的运行时库。 2.bin\生成的exe文件可以在别的电脑…

(这是让文心一言生成的文心一言指令博客)3分钟学会写文心一言指令:解锁AI创作新境界

3分钟学会写文心一言指令:解锁AI创作新境界 在这个AI技术日新月异的时代,文心一言作为领先的智能语言模型,正逐步改变着我们的创作与交流方式。无论是撰写文章、创作诗歌,还是进行日常对话,文心一言都能凭借其强大的语…

记忆化搜索——1

目录 1.斐波那契数 2.不同路径 3.最长递增子序列 4.猜数字大小2 5.矩阵中的最长递增路径 1.斐波那契数 该题规律很明显,就直接放记忆化搜索的版本了 class Solution { public:int dfs(int n){if(n0||n1)//递归出口{return n;}if(f[n-1]-1)//检查是否已经记忆过…

计算机网络中拥塞控制的门限值怎么设置

拥塞避免的门限值设置主要涉及到加权随机早期检测(‌WRED)‌技术,‌这是一种拥塞避免机制,‌通过为每个队列设定一对低门限和高门限值来实现。‌具体来说,‌当队列长度小于低门限时,‌不丢弃报文&#xff0…

64 lambda 表达式

lambda 表达式常用来声明匿名函数,即没有函数名字的临时使用的小函数,常用在临时需要一个类似于函数的功能但又不想定义函数的场合。 lambda 表达式只可以包含一个表达式,不允许包含其他复杂的语句,但在表达式中可以调用其他函数…

Flink实战(10)-checkpoint容错保证

0 前言 程序在 Flink 集群运行,某个算子因为某些原因出现故障,如何处理 在故障恢复后,如何保证数据状态,和故障发生之前的数据状态一致? 1 什么是 checkpoint(检查点)? Checkpoint 能生成快照(Snapshot)。 若 Flink 程序崩…