检索增强生成 (RAG) 是企业级生成式 AI(GenAI)应用的热门案例之一。多数 RAG 教程演示了如何利用 OpenAI API 结合 Embedding 模型和大语言模型(LLM)来进行推理(Inference)。然而,在开发过程中,如果能使用开源工具,就可以免去访问自己数据的费用,同时也能加快迭代。
在 Embedding 步骤(即将数据转换为向量的过程)中,使用 Ray Data 取得的性能提升尤为显著。相比于使用 Pandas,采用 Ray Data 等工具对批量推理请求进行汇总处理可以显著节省资源和处理时间。例如,在一台配备 16GB RAM 的 Mac M2 笔记本电脑上,仅使用四个 worker node,Ray Data 的处理速度就比 Pandas 快了 60 倍!本文将详细介绍使用 Milvus + Ray Data 进行 Embedding inference 的性能。
我们的开源 RAG 技术栈包括:
-
BGM-M3 Embedding 模型:该模型能一次性生成三种类型的向量,包括稀疏向量、稠密向量和多向量。
-
Ray Data:高效的分布式 Embedding inference 推理工具。
-
AWS S3:用于临时存储推理结果。
-
Milvus 或 Zilliz Cloud:用作向量数据库。
示例数据来源:Kaggle 的 IMDB 海报数据集。
开源 RAG 技术栈
BGM-M3 Embedding 模型
BGM-M3 Embedding 模型是一种强大的多功能 Embedding 工具,其特点在于能够处理多语言(Multi-Linguality)、多功能(Multi-Functionality)和多粒度(Multi-Granularity)的数据,因此得名“M3”。该模型支持超过 100 种语言,并能计算三种常见的 Embedding 类型:稀疏向量、稠密向量和多向量。它还可以处理各种长度的文本——从短句到长文档,最多可支持 8192 个 token。更多详情,请参考论文或通过 HugginFace 网站了解此模型。
Milvus 2.4 版本现已集成 BGM-M3 Embedding 模型。
Ray Data
有长期运行的数据转换任务?
Ray Data 具备可扩展的数据处理能力,能够便捷和快速地在多台机器(CPU、GPU 等)上并行处理大量数据。Ray Data 尤其适用于数据可以被分割,能够并行处理的场景,例如同时进行的多个数据块切分和 Embedding 转换的任务。Ray Data 底层是一个强大的流式执行引擎,能够最大化集群中的 GPU 利用率。与使用在线服务(如 OpenAI 嵌入 API)运行嵌入相比,使用 Ray Data 进行离线嵌入作业可以节省大部分成本。
Anyscale 是 Ray 的托管平台。您可以轻松地在 Anyscale 上利用 GPU 机器扩展 Embedding 任务。
Milvus 和 Zilliz Cloud
RAG(Retrieval Augmented Generation)应用之所以能够迅速响应,关键在于其背后强大的向量数据库。Milvus 专为处理大规模业务和海量数据设计。与其他向量数据库不同,Milvus 能够根据数据量的增长灵活地进行扩展。Milvus 的存储、索引和查询是独立的,可以单独垂直扩展或水平扩展,这一设计让 RAG 应用能够迅速响应用户的实时查询需求,同时在收到查询前和收到查询时,Milvus 能够智能地进行离线计算。除了强大的性能外,Milvus 还提供了多种企业级重要特性,如多租户和基于角色的访问控制(RBAC)、高可用等。
Zilliz Cloud 基于开源的 Milvus 搭建的全托管向量数据库云服务。
设置 RAG 工具
本教程中将使用 Milvus 的 Python SDK、Ray Data、Amazon S3 和 Zilliz Cloud。
开始前,请先注册 AWS 账号以使用 Amazon S3。然后前往 console.aws.amazon.com > IAM > My security credentials > Create access key。复制并保存密钥(Access key 和 secret key)。
安装所需的工具库并运行 aws config。请在文件中输入 AWS 密码。
pip install boto3
pip install awscli –force-reinstall –upgrade
aws config #fill in your key and secret key
more ~/.aws/credentials #make sure this looks correct
安装 Ray Data。
pip install -U "ray[data]"
安装 Pymilvus。
pip install -U pymilvus "pymilvus[model]" langchain
PyMilvus 2.4 版本及以上已打包 BGE-M3 embedding 模型。
import ray, os, pprint, time, boto3
from langchain.text_splitter import RecursiveCharacterTextSplitter
import numpy as np
import pymilvus
print(pymilvus.__version__) # must be >= 2.4.0
from pymilvus.model.hybrid import BGEM3EmbeddingFunction
如需使用 Zilliz,请先注册账号并创建集群。
准备数据
本文使用了 Kaggle IMDB 海报数据作为数据集,其中包含大约 48,000 部电影、影评、海报链接以及一些元数据。
我们将所有文本字段(电影名称、描述、影评文本)复制到名为‘text’的一列中,并将其保存为 Parquet 格式,因为这种文件格式比 CSV 更高效。
生成 Embedding 向量
根据以下步骤生成 Embedding 向量:
-
切分数据: 将输入文本切分为片段,将语义上相关的文本片段保存在一起。
-
调用推理(inference)模式下的 Embedding 模型,用于生成文本片段的 Embedding 向量。
Ray Data 可以并行处理以下数据操作请求:
-
flat_map():切分数据(输出的行数将多于输入行数)。
-
map_batches():调用 Embedding 模型。
chunk_size = 512
chunk_overlap = np.round(chunk_size * 0.10, 0)
# Define a LangChain text splitter.
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len) #len is a built-in Python function
# 1. Define a regular python function for chunking.
def chunk_row(row, splitter=text_splitter):
# Copy the row columns into metadata.
metadata = row.copy()
del metadata['text'] # Remove text from metadata
# Split the text into chunks.
chunks = splitter.create_documents(
texts=[row["text"]],
metadatas=[metadata])
chunk_list = [{
"text": chunk.page_content,
**chunk.metadata} for chunk in chunks]
return chunk_list
# 2. Define a class with a callable method to compute embeddings.
class ComputeEmbeddings:
def __init__(self):
# Initialize a Milvus built-in sparse-dense-late-interaction-reranking encoder.
# https://huggingface.co/BAAI/bge-m3
self.model = BGEM3EmbeddingFunction(use_fp16=False, device="cpu")
print(f"dense_dim: {self.model.dim['dense']}")
print(f"sparse_dim: {self.model.dim['sparse']}")
def __call__(self, batch):
# Ray data batch is a dictionary where values are array values.
# BGEM3EmbeddingFunction input is docs as a list of strings.
docs = list(batch['text'])
# Encode the documents. bge-m3 dense embeddings are already normalized.
embeddings = self.model(docs)
batch['vector_dense'] = embeddings['dense']
return batch
if __name__ == "__main__":
FILE_PATH = "s3://zilliz/kaggle_imdb.parquet"
# Load and transform data.
ds = ray.data.read_parquet(FILE_PATH)
# Chunk the input text
chunked_ds = ds.flat_map(chunk_row)
# Compute embeddings with a class that calls the embeddings model.
embeddings_ds = chunked_ds.map_batches(ComputeEmbeddings, concurrency=4)
# Save the embeddings to S3 in a folder of parquet part files.
embeddings_ds.write_parquet('s3://zilliz/kaggle_imdb_embeddings')
运行前,需要提交 Ray 任务:
-
将代码保存至 Python 脚本文件。本例中将文件命名为
ray_data_demo.py
。 -
在本地运行前,请先创建一个全新的路径,路径中只可包含
.py
脚本文件和.parquet
数据文件。本例中将这个路径命名为ray_cluster
。 -
运行 Python 脚本以启动 Ray 集群并自动提交任务。
-
打开 http://127.0.0.1:8265,查看集群和任务。
Embedding 速度提升 60 倍
表格展示了在 16GB M2 笔记本电脑上 Embedding 任务的用时。示例使用了 1 个单节点 Ray 集群用于批量处理任务,并发为 4。Pandas 用时更长,原因是 Pandas 只有一个处理器。但是 Ray Data 有 4 个处理器。 Pandas 和 Ray在处理大型数据时性能都更出色。
将数据从 S3 批量插入到 Milvus 或 Zilliz Cloud 中
Milvus 和 Zilliz Cloud都支持批量插入(bulk insert)数据,可以直接从 AWS、GCP 或 Azure 对象存储中导入 Embedding 数据。除了界面外,Zilliz Cloud 还提供 RESTful API 和 SDK。
对于大量 Embedding 数据,使用 bulk insert 可以显著节省机器资源并缩短插入时间,与普通 insert 相比更为高效。此外,通过 bulk insert 构建的向量搜索索引比普通 insert 的索引更高效。
在 Zilliz Cloud 界面上仅需轻击几次鼠标便可轻松批量插入数据。首先,在集群中创建 1 个全新的 collection。创建时,需要开启 AutoID 和动态列,添加向量列,并设置向量列的向量维度。设置完毕后,点击“创建 Collection”。
接下来,点击“导入数据”并按照界面上的提示输入 parquet 文件的路径。(请注意,如果您的 S3 存储桶为非公开链接,您还需要输入 Access Key 和 Secret Key,以便 Zilliz Cloud 读取其中的数据)。Zilliz Cloud 支持 Amazon S3、Google Cloud Storage 或 Azure Blob Storage 等对象存储服务。点击“导入”开始将所有数据导入向量数据库 collection。
导入任务完成后,您可以选择在 collection 上构建索引,加速后续的向量搜索。
查询数据
为测试新导入至 Collection 中的数据,让我们基于电影数据进行提问。
def mc_run_search(question, output_fields, top_k=2, filter_expression=""):
# Embed the question using the same encoder.
embeddings = model_bgem3([question])
query_embeddings = embeddings['dense']
# Run semantic vector search using your query and the vector database.
results = mc.search(
COLLECTION_NAME,
data=query_embeddings,
search_params=SEARCH_PARAMS,
output_fields=output_fields,
# Milvus can utilize metadata in boolean expressions to filter search.
filter=filter_expression,
limit=top_k,
consistency_level="Eventually"
)
# Assemble retrieved context and context metadata.
# The search result is in the variable `results[0]`, which is type
# 'pymilvus.orm.search.SearchResult'.
METADATA_FIELDS = [f for f in output_fields if f != 'chunk']
formatted_results, context, context_metadata = _utils.client_assemble_retrieved_context(
results, metadata_fields=METADATA_FIELDS, num_shot_answers=top_k)
return formatted_results, context, context_metadata
SAMPLE_QUESTION = "muybridge horse movie"
# Return top k unique results with HNSW index.
TOP_K = 2
# Define output fields to return.
OUTPUT_FIELDS = ["movie_id", "chunk", "PosterLink"]
formatted_results, context, context_metadata = \
mc_run_search(SAMPLE_QUESTION, OUTPUT_FIELDS, TOP_K)
以下为查询结果:
完整的 Ray Data 脚本可在此获取。
总结
文本介绍了如何使用 Ray Data 和 Milvus/Zilliz Cloud 的批量插入功能大幅加速向量生成和加载过程。使用 Milvus 能够高效构建索引、节省计算资源、提升向量搜索速度。