概览
传统的文本提取方法常常无法理解非结构化内容,因此提取数据的数据往往是错误的。本文将探讨使用 Indexify,一个用于实时多模态数据提取的开源框架,来更好地分析pdf等非结构化文件。我将介绍如何设置 Indexify,包括服务器设置、提取graph创建、文档导入和数据查询,以及如何创建自定义提取器。通过 Indexify 可以增强文档分析,从而获得更准确的见解、做出更好的决策并简化管理流程。
Indexify 介绍
Indexify 是一个开源的导入和提取引擎,旨在为实时 LLM 应用提供支持。它能够以极低的延迟从非结构化来源进行实时数据提取。它还支持可应用于各种用例的多模态提取工作流程,包括从文档中提取实体和嵌入、音频转录、摘要以及从图像甚至视频中检测对象。
它还支持高效地索引、存储和检索数据,使其成为可扩展实时 RAG 系统的工具。
任何工作流程都可以通过 4 个基本步骤轻松实现:
- 启动 Indexify 服务器和提取器。
- 创建提取graph。
- 导入需要处理的数据(视频、图像、音频、PDF 等)。
- 检索提取的数据。
提取器
提取器模块是 Indexify 的核心功能。提取器可以从任何模态的非结构化数据中提取和返回结构化信息。例如,从 PDF 中以 JSON 格式获取特定信息,将数据转换为嵌入,以及识别视频中的面部或物体。
提取器通常接收非结构化数据作为输入,并生成内容对象列表和特征作为输出。来自非结构化数据的原始字节存储在 Blob 存储中,而提取的特征存储在向量数据库和结构化存储中以供检索。任何用于处理非结构化数据的模型或算法都可以通过扩展提取器 SDK 中提供的抽象类来实现为提取器。
协调器
Indexify中有一个高性能任务调度器。当数据开始被导入时,它们将任务分配给提取器,从而帮助实现卓越的速度和性能。
开始教程
对于本教程,需要有 Python 3.11 或更高版本以获得最佳性能。其他安装说明将在后面给出。
我将使用 Groq 作为 LLM 引擎。要开始,请访问此页面并创建一个 API 密钥。
然后安装sdk
pip install groq
对于数据,我已经准备了一些经典论文作为 PDF 文件。当然你也可以使用自己的文档集。
安装和配置 Indexify
在 Linux 系统上设置 Indexify 很容易。只需要 3 个运行的终端窗口。
- 终端 1:用于下载和运行 Indexify 服务。
- 终端 2:用于运行 Indexify 提取器,这些提取器处理结构化提取、分块和嵌入。
- 终端 3:用于运行 Python 脚本以从 Indexify 服务加载和查询数据。
这里可以使用以下命令启动和运行 Indexify 服务。
终端 1
curl https://getindexify.ai | sh
./indexify server -d
服务将在 http://localhost:8900 上运行。接下来,创建一个 Python 环境并安装所有必要的库和提取器。
终端 2
pip3 install indexify-extractor-sdk indexify wikipedia
indexify-extractor download tensorlake/paddleocr_extractor
indexify-extractor download tensorlake/minilm-l6
indexify-extractor download tensorlake/chunk-extractor
下载完成后,使用以下命令运行提取器服务:
终端 2
indexify-extractor join-server
经过以上两个步骤环境就准备好了。在本教程中,我将保持这两个终端运行。
准备文档集
第一步是整理文档集。在本工作流程中,我将使用 PDF 文档。对于多个文档,你可以像这样构建目录:将所有文档作为 PDF 添加到 data 目录。对于其他文件类型,你必须使用其他提取器或定义自定义提取器,我将在本文中稍后讨论。
└── data
├── doc1
├── doc2
├── doc3
├── doc4
├── venv
├── indexify file
├── ingest_document.py
├── query_doc.py
└── setup_extraction_graph.py
使用 Indexify 导入和处理文档
设置提取graph
Indexify 可以使用其核心功能(提取器)处理和存储来自任何模态的数据。这些提取器旨在从大量非结构化数据中提取内容。通过将不同的提取器链接在一起,我们可以创建一个pipeline,简化整个数据提取和存储过程。此过程通过创建提取graph来管理。
在本教程中,我将构建以下提取graph:
- paddle_ocr:用于识别和提取论文中的文本。
- chunk_extract:用于将数据划分为块以进行 RAG。
- minilm-l6:用于将数据转换为嵌入。
配置 Indexify 客户端并以 YAML 格式定义流程,如下所示。
# setup_extraction_graph.py
from indexify import ExtractionGraph, IndexifyClient
client = IndexifyClient()
extraction_graph_spec = """
name: 'propertyQA'
extraction_policies:
- extractor: 'tensorlake/paddleocr_extractor'
name: 'textextract'
- extractor: 'tensorlake/chunk-extractor'
name: 'chunker'
input_params:
chunk_size: 1000
overlap: 100
content_source: 'textextract'
- extractor: 'tensorlake/minilm-l6'
name: 'pdfembedding'
content_source: 'chunker'
"""
extraction_graph = ExtractionGraph.from_yaml(extraction_graph_spec)
client.create_extraction_graph(extraction_graph)
终端 3
运行命令以创建提取graph。
python3 ./setup_extraction_graph.py
自定义提取器
为了捕获复杂数据,我可以创建自定义提取器并将其添加到我们的提取graph中。你可以使用以下模板定义它们。
from pydantic import BaseModel
from typing import List
from indexify_extractor_sdk import Extractor, Content, Feature
from indexify_extractor_sdk.base_extractor import Content
import json
class InputParams(BaseModel):
a: int = 0
b: str = ""
class MyExtractor(Extractor):
name = "your-docker-hub-username/MyExtractor"
description = "Description of the extractor goes here."
# Any python dependencies included in the extractor must be listed here.
python_dependencies = ["torch", "transformers"]
# Any system dependencies that the python code here depends on needs to be listed here. We use Ubuntu base images, so any ubuntu package can be installed here.
system_dependencies = []
input_mime_types = ["text/plain"]
def __init__(self):
super().__init__()
def extract(self, content: Content, params: InputParams) -> List[Content]:
return [
Content.from_text(
text="Hello World", feature=Feature.embedding(values=[1, 2, 3])
),
Content.from_text(
text="Pipe Baz", feature=Feature.embedding(values=[1, 2, 3])
),
Content.from_text(
text="Hello World",
feature=Feature.metadata(value=json.dumps({"key": "value"})),
),
]
def sample_input(self) -> Content:
Content.from_text(text="Hello World")
if __name__ == "__main__":
MyExtractor().extract_sample_input()`
InputParams 类使用 Pydantic 来定义可用于配置提取器行为的参数。
MyExtractor 是实现提取器的主类。它指定了它可以处理的名称、描述、依赖项和输入类型。导入到 Indexify 的任何与这些 MIME 类型不匹配的内容都不会发送到此提取器。
extract 方法是核心功能。它处理输入内容并返回具有特征(如元数据)的转换后的内容对象列表。sample_input 方法提供了一个示例输入以供测试。
例如,以下是一个自定义提取器,它以 JSON 格式返回论文概要信息。
# custom_extractor.py
from pydantic import BaseModel
from typing import List
from indexify_extractor_sdk import Extractor, Content, Feature
from indexify_extractor_sdk.base_extractor import Content
import json
import re
class InputParams(BaseModel):
author_regex: str = r"^([\w\s]+)"
team_regex: str = r"(Google Brain)"
email_regex: str = r"([\w.-]+@[\w.-]+\.[\w]+)"
class PropertyExtractor(Extractor):
name = "your-docker-hub-username/PropertyExtractor"
description = "Extract author,team and email from this article."
python_dependencies = ["re"]
system_dependencies = []
input_mime_types = ["text/plain"]
def __init__(self):
super().__init__()
def extract(self, content: Content, params: InputParams) -> List[Content]:
text = content.text()
author_match = re.search(params.author_regex, text)
team_match= re.search(params.team_regex, text)
email_match = re.search(params.email_regex, text)
property_info = {
"author": author_match.group(1) if author_match else "",
"team": team_match.group(1) if team_match else "",
"team": email_match.group(1) if email_match else "",
}
return [
Content.from_text(
text=json.dumps(property_info),
feature=Feature.metadata(value=json.dumps(property_info))
)
]
def sample_input(self) -> Content:
return Content.from_text(text="Ashish Vaswani Google Brain avaswani@google.com")
if __name__ == "__main__":
PropertyExtractor().extract_sample_input()
你可以将提取器打包到容器中以用于生产,或者使用以下命令在本地安装提取器,并在提取graph中使用它。
indexify-extractor install-local custom_extractor:PropertyExtractor
上传文档
设置提取graph后,data 目录中的每个文档都应该通过pipeline并作为嵌入存储在向量数据库中。Indexify 具有一个内置的向量数据库,你可以使用它来存储、查询和检索数据。
# ingest_document.py
import os
import requests
from indexify import IndexifyClient
# Initialize IndexifyClient
client = IndexifyClient()
folder_path = "data"
for filename in os.listdir(folder_path):
if filename.endswith(".pdf"):
# Construct the full file path
file_path = os.path.join(folder_path, filename)
# Upload the PDF to Indexify
client.upload_file("propertyQA", file_path)
print(f"Uploaded: {filename}")
可以使用 Indexify UI 在 http://localhost:8900/ui 上查看向量数据库和索引。
你也可以使用客户端访问索引:
from indexify import IndexifyClient
client = IndexifyClient()
content_ids = [content.id for content in client.list_content("propertyQA")]
extract = client.get_extracted_content(content_ids[1], "propertyQA", "textextract")
embedding = client.get_extracted_content(content_ids[1], "propertyQA", "pdfembedding")
在 Indexify 中,导入的数据也存储在 SQL 表中,允许你使用内置 API 和 SQL 查询来查询数据。
result = client.sql_query("select * from propertyQA;")
输出:
`SqlQueryResult(result=[{'content_id': 'd6e584685d74a21d', 'type': 'text'}, {'content_id': 'e32fd65fc2bbebf3', 'type': 'text'}])`
使用 Indexify 提出复杂问题
语义搜索和查询公式
Indexify 的高级提取引擎使用户能够制定和执行超出基本数据提取的复杂查询。Indexify 可以解释文档的上下文和语义,从而允许更复杂的查询。
Indexify 从创建的索引中检索相关上下文并返回一个 Content 对象。来自此对象的信息可用于构建复杂提示,这些提示可以传递给 LLM 以生成适当的响应。让我们看看它是如何工作的。
results = client.search_index(name=index, query=question, top_k=3)
context = ""
for result in results:
context = context + f"content id: {result['content_id']} \n \npassage: {result['text']}\n"
在这里,Indexify 通过索引执行简单的语义搜索,并返回最相关的段落以及元数据(如内容 ID),使你能够理解结果的上下文和重要性。来自 text 参数的数据可用于构建上下文。
检索和分析结果
语义搜索和复杂查询使你能够更深入地挖掘文档集,提取更加深入的答案。
复杂问题的示例:
- “与传统的基于 RNN 的模型相比,Transformer 模型如何利用自注意力机制来提高序列到序列任务的效率和有效性?”
LLM 可以帮助描述性且准确地回答此类查询。从上一步检索到的上下文可用于构建复杂提示。
"You are an expert in paper interpretation,You must help me interpret the core content of the paper. Answer the question, based on the context. Answer \"Information not found\" if there is no context. Do not hallucinate. \nquestion: {question} \ncontext: {context}"
构建的提示然后用于查询 LLM。在本教程中,我使用了来自 Groq 的 gemma-7b 模型。以下是工作流程的完整代码。
`#query_doc.py
from indexify import IndexifyClient
from groq import Groq
client = IndexifyClient()
groq_client = Groq(
api_key="API_KEY",
)
def get_context(question: str, index: str, top_k=3):
results = client.search_index(name=index, query=question, top_k=3)
context = ""
for result in results:
context = context + f"content id: {result['content_id']} \n \npassage: {result['text']}\n"
return context
def create_prompt(question, context):
return f"You are an expert in paper interpretation,You must help me interpret the core content of the paper. Answer the question, based on the context. Answer \"Information not found\" if there is no context. Do not hallucinate. \nquestion: {question} \ncontext: {context}"
def generate_response(prompt):
chat_completion = groq_client.chat.completions.create(
messages=[
{
"role": "user",
"content": prompt,
}
],
model="gemma-7b-it",
)
return chat_completion.choices[0].message.content
question = "How does the Transformer model use self-attention mechanisms to improve the efficiency and effectiveness of sequence-to-sequence tasks compared to traditional RNN-based models?"
context = get_context(question, "propertyQA.pdfembedding.embedding")
prompt = create_prompt(question, context)
response = generate_response(prompt)
print(response)`
运行此文件以获取响应。
终端 3
python3 ./query_doc.py
The Transformer model replaces recurrent layers with self-attention mechanisms, allowing for parallelization and capturing dependencies between words regardless of their distance in the sequence. This improves efficiency by reducing the training time and enhances effectiveness by better handling long-range dependencies.
大规模存储和查询数据
使用 LangChain 进行扩展
优化你的系统以实现高效的索引和检索,以管理大量的论文数据和论文分析数据。这涉及配置向量数据库以有效地管理大量嵌入。在生产环境中,Indexify 可以水平扩展到多个服务器实例和协调器,以并行简化实时数据提取和存储。你还可以微调提取graph中的参数(如 chunk_size 和 overlap),以在粒度和性能之间取得平衡。
extractor: 'tensorlake/chunk-extractor'
name: 'chunker'
input_params:
chunk_size: 1000
overlap: 100
content_source: 'textextract'
Indexify 与流行的 LangChain 框架很好地集成,LangChain 框架是一个高度可扩展的框架,用于构建 AI 应用。
终端 3
pip install indexify-langchain langchain-groq
我正在使用来自 LangChain 的基于 Indexify 的检索器。我将传递我准备好的索引和 top_k 参数。
from indexify import IndexifyClient
client = IndexifyClient()
from indexify_langchain import IndexifyRetriever
params = {"name": "propertyQA.pdfembedding.embedding", "top_k": 2}
retriever = IndexifyRetriever(client=client, params=params)
以下是完整代码:
# langchain_query_doc.py
import requests
import dotenv
# Setup retriever
from indexify import IndexifyClient
from indexify_langchain import IndexifyRetriever
client = IndexifyClient()
params = {"name": "propertyQA.pdfembedding.embedding", "top_k": 2}
retriever = IndexifyRetriever(client=client, params=params)
from langchain_groq import ChatGroq
llm = ChatGroq(
model="gemma-7b-it",
temperature=0,
max_tokens=None,
timeout=None,
max_retries=2,
api_key="API_KEY",
)
# Setup Chat Prompt Template
from langchain.prompts import ChatPromptTemplate
template = """
You are an expert in paper interpretation,You must help me interpret the core content of the paper. Answer the question, based on the context. Answer \"Information not found\" if there is no context. Do not hallucinate. \nquestion: {question} \ncontext: {context}
"""
prompt = ChatPromptTemplate.from_template(template)
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser
rag_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
# Query
query = "How does the Transformer model use self-attention mechanisms to improve the efficiency and effectiveness of sequence-to-sequence tasks compared to traditional RNN-based models?"
print(rag_chain.invoke(query))
这使用 IndexifyRetriever 根据查询检索最相关的段落。然后,它使用 ChatPromptTemplate 创建一个针对上下文的提示。该提示传递给 LLM,模型在其中处理信息并生成响应。
将查询结果存储在数据库中
执行查询并检索结果后,将这些数据存储在结构化数据库中可以确保你可以参考它并执行进一步的分析。
首先,要将查询结果存储在数据库中,请设置一个关系数据库并定义一个模式,该模式可以容纳 Indexify 返回的复杂数据结构。以下是如何将查询结果存储在 PostgreSQL 数据库中的示例:
import psycopg2
# Connect to your PostgreSQL database
conn = psycopg2.connect(
dbname="indexify_results",
user="your_username",
password="your_password",
host="localhost",
port="5432"
)
cursor = conn.cursor()
# Create a table to store query results
cursor.execute("""
CREATE TABLE IF NOT EXISTS query_results (
id SERIAL PRIMARY KEY,
query TEXT,
content_id VARCHAR(255),
passage TEXT
);
""")
conn.commit()
# Example function to store results
def store_results(question: str, results: list):
for result in results:
cursor.execute("""
INSERT INTO query_results (query, content_id, passage)
VALUES (%s, %s, %s);
""", (question, result['content_id'], result['text']))
conn.commit()
# Retrieve and store query results
question = "How does the Transformer model use self-attention mechanisms to improve the efficiency and effectiveness of sequence-to-sequence tasks compared to traditional RNN-based models?"
results = client.search_index(name="propertyQA.pdfembedding.embedding", query=question, top_k=5)
store_results(question, results)
# Close the database connection
cursor.close()
conn.close()
在定义数据库模式时,请考虑数据的关联关系和复杂性。该模式应该容纳内容 ID 和段落文本以及元数据,例如时间戳、文档来源以及任何相关的标签或类别。
以下是一个你可以考虑的示例模式。
CREATE TABLE query_results (
id SERIAL PRIMARY KEY,
query TEXT,
content_id VARCHAR(255),
passage TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
document_source VARCHAR(255),
tags TEXT[]
);
总结
我们已经介绍了 Indexify 如何通过解读论文的示例,为需要更深入的实时分析的应用提供高效的数据提取和检索。我们详细介绍了 Indexify 的工作原理,包括如何构建用于数据检索的知识库。此外,我们还考察了如何扩展系统以处理更大的数据集和更高的吞吐量。最后,我们讨论了将结果存储在结构化数据库中以供将来参考和深入分析的方法。