当索引模式为经济时,使用离线的向量引擎、关键词索引等方式,降低了准确度但无需花费 Token。
一.提取函数**_extract
**
根据不同文档类型进行内容的提取:
def _extract(self, index_processor: BaseIndexProcessor, dataset_document: DatasetDocument, process_rule: dict) \
-> list[Document]: # 提取
# load file
if dataset_document.data_source_type not in ["upload_file", "notion_import"]: # 数据源类型
return []
data_source_info = dataset_document.data_source_info_dict # 数据源信息
text_docs = [] # 文本文档
if dataset_document.data_source_type == 'upload_file':
if not data_source_info or 'upload_file_id' not in data_source_info:
raise ValueError("no upload file found")
file_detail = db.session.query(UploadFile). \
filter(UploadFile.id == data_source_info['upload_file_id']). \
one_or_none()
if file_detail:
extract_setting = ExtractSetting(
datasource_type="upload_file",
upload_file=file_detail,
document_model=dataset_document.doc_form
)
text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule['mode'])
elif dataset_document.data_source_type == 'notion_import':
if (not data_source_info or 'notion_workspace_id' not in data_source_info
or 'notion_page_id' not in data_source_info):
raise ValueError("no notion import info found")
extract_setting = ExtractSetting(
datasource_type="notion_import",
notion_info={
"notion_workspace_id": data_source_info['notion_workspace_id'],
"notion_obj_id": data_source_info['notion_page_id'],
"notion_page_type": data_source_info['type'],
"document": dataset_document,
"tenant_id": dataset_document.tenant_id
},
document_model=dataset_document.doc_form
)
text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule['mode'])
# update document status to splitting
self._update_document_index_status(
document_id=dataset_document.id,
after_indexing_status="splitting",
extra_update_params={
DatasetDocument.word_count: sum([len(text_doc.page_content) for text_doc in text_docs]),
DatasetDocument.parsing_completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
}
) # 更新文档状态为拆分
# replace doc id to document model id
text_docs = cast(list[Document], text_docs)
for text_doc in text_docs:
text_doc.metadata['document_id'] = dataset_document.id
text_doc.metadata['dataset_id'] = dataset_document.dataset_id
return text_docs
直接调用的是core.indexing_runner.IndexingRunner._extract()
方法:
得到上传文件的详细信息:
接下来调用提取内容函数extract()
方法:
根据不同的IndexType
类型返回不同的索引处理器:
因为这里index_processor
类型为core.rag.index_processor.processor.paragraph_index_processor.ParagraphIndexProcessor
,实际调用的是ParagraphIndexProcessor
类中的extract
函数:
ExtractProcessor.extract()
类方法根据文件类型进行解析:
因为文件类型为txt
,所以执行TextExtractor()
:
TextExtractor()
实际执行的位置为dify\api\core\rag\extractor\text_extractor.py
中的extract()
方法:
最终得到text_docs
内容:
二.转换函数_transform
def _transform(self, index_processor: BaseIndexProcessor, dataset: Dataset,
text_docs: list[Document], doc_language: str, process_rule: dict) -> list[Document]: # 转换
# get embedding model instance
embedding_model_instance = None
if dataset.indexing_technique == 'high_quality':
if dataset.embedding_model_provider:
embedding_model_instance = self.model_manager.get_model_instance(
tenant_id=dataset.tenant_id,
provider=dataset.embedding_model_provider,
model_type=ModelType.TEXT_EMBEDDING,
model=dataset.embedding_model
) # 获取嵌入模型实例
else:
embedding_model_instance = self.model_manager.get_default_model_instance(
tenant_id=dataset.tenant_id,
model_type=ModelType.TEXT_EMBEDDING,
) # 获取默认嵌入模型实例
documents = index_processor.transform(text_docs, embedding_model_instance=embedding_model_instance,
process_rule=process_rule, tenant_id=dataset.tenant_id,
doc_language=doc_language) # 转换文档
return documents
实际调用的是core.indexing_runner.IndexingRunner._transform
:
因为设置dataset.indexing_technique
为'economy'
:
因为这里index_processor
类型为core.rag.index_processor.processor.paragraph_index_processor.ParagraphIndexProcessor
,实际调用的是ParagraphIndexProcessor
类中的transform
函数:
这里splitter
类型为core.splitter.fixed_text_splitter.FixedRecursiveCharacterTextSplitter
:
core.splitter.fixed_text_splitter.FixedRecursiveCharacterTextSplitter
类:
调用文档清理CleanProcessor.clean()
类方法:
实际调用的是dify\api\core\rag\cleaner\clean_processor.py
中的CleanProcessor.clean()
类方法:
clean
方法的处理过程如下:
(1)默认清理:首先,方法会执行一些默认的清理操作,包括删除无效的符号。这些操作主要是通过正则表达式来实现的,例如,替换<\|
为<
,替换\|>
为>
,以及删除一些特定的ASCII和Unicode字符。
(2)规则应用:接下来方法会根据传入的process_rule
字典中定义的规则来进一步清理文本。process_rule
字典包含了一系列的清理规则,这些规则在rules
键下的pre_processing_rules
列表中定义。
(3)删除额外的空格:如果启用了remove_extra_spaces
规则,方法会删除文本中的额外空格。这包括将三个或更多连续的换行符替换为两个换行符,以及将两个或更多连续的空格(包括特定的Unicode空格字符)替换为单个空格。
(4)删除URL和电子邮件地址:如果启用了remove_urls_emails
规则,方法会从文本中删除URL和电子邮件地址。这是通过匹配特定的正则表达式模式来实现的,分别用于识别和删除电子邮件地址和URL。
通过splitter.split_documents([document])
分割文档为文档节点:
实际调用的是dify\api\core\splitter\fixed_text_splitter.py
中的FixedRecursiveCharacterTextSplitter
类的split_text()
方法。split_text
方法的目的是将传入的文本分割成多个块,并返回这些块组成的列表。这个方法的处理过程可以分为以下几个步骤:
(1)检查固定分隔符:首先,方法检查是否存在一个固定的分隔符(_fixed_separator
)。如果存在,它将使用这个分隔符来直接分割文本。这意味着文本将在每个出现固定分隔符的地方被分割。
(2)分割文本:使用固定分隔符分割文本后,会得到一个初步的块列表。然后,对这些初步的块进行进一步的处理,以确保每个块的长度不超过设定的大小(_chunk_size
)。
(3)递归分割:对于每个初步的块,如果其长度超过了设定的大小,将使用recursive_split_text
方法递归地进行进一步分割。这个递归过程会继续,直到所有的块都不超过设定的大小。
(4)返回最终块列表:最后,将所有处理后符合长度要求的块组成一个列表返回。
通过固定分隔符'\n'
分割:
document_nodes = splitter.split_documents([document])
实际返回文档节点数量为12:
在dify\api\core\rag\index_processor\processor\paragraph_index_processor.py
中,transform()
方法主要是对文档进行预处理和分割,以便后续的索引或其它处理步骤可以更有效地处理文档的各个部分。主要执行步骤如下:
(1)获取分割器:根据传入的处理规则(process_rule)和嵌入模型实例(embedding_model_instance
),获取文档分割器(splitter
)。
(2)遍历文档:对于每个传入的文档(documents
列表中的每个Document
对象),执行以下子步骤:
-
文档清理:使用
CleanProcessor.clean
方法清理文档的内容(document.page_content
),移除不需要的字符或格式。 -
文档分割:使用步骤1中获取的分割器(
splitter
)将清理后的文档内容分割成多个节点(document_nodes
),每个节点代表文档的一部分。 -
节点处理:对于每个分割后的节点,如果节点内容非空,则生成一个唯一的文档ID(
doc_id
)和文本哈希值(hash
),并将这些信息添加到节点的元数据中。如果节点内容以特定字符(如.
或。
)开头,则移除这些字符。 -
收集文档节点:将处理后的节点添加到一个列表中(
split_documents
),以便进一步处理。
(3)返回结果:将所有处理后的文档节点合并到一个列表中(all_documents
),并返回该列表作为transform
方法的结果。
三.保存函数_load_segments
接下调用self._load_segments(dataset, dataset_document, documents)
方法:
实际调用的是_load_segments(self, dataset, dataset_document, documents)
方法:
_load_segments
方法的主要目的是将处理后的文档段(documents
)保存到数据库中,并更新相关文档的状态。这个过程可以分为以下几个步骤:
(1)初始化数据集文档存储:创建一个 DatasetDocumentStore
实例,这个实例与特定的数据集、创建者和文档ID相关联。
(2)添加文档段:使用 DatasetDocumentStore.add_documents(documents)
方法将处理后的文档段添加到数据库中。这里的 documents
是一个包含多个文档段的列表,每个文档段都是一个 Document
实例。具体doc_store.add_documents()
和self.get_document_segment()
方法就不逐行调试,感兴趣可自行调试。
(3)更新文档状态为索引中:在所有文档段都保存到数据库之后,更新原始文档的状态为“索引中”(indexing)。这是通过调用 _update_document_index_status
方法实现的,该方法还会更新文档的清理完成时间和拆分完成时间。self._update_document_index_status()
方法就不逐行调试,感兴趣可自行调试。
(4)更新段状态为索引中:最后,更新所有相关文档段的状态为"索引中"。这是通过调用 _update_segments_by_document
方法实现的,该方法会更新所有相关文档段的状态和索引时间。self._update_segments_by_document()
方法就不逐行调试,感兴趣可自行调试。
这个过程确保了文档段的正确保存和状态更新,为后续的索引和检索操作做好准备。
四.加载函数_load
接下来调用self._load()
方法:
实际调用的是def _load(self, index_processor: BaseIndexProcessor, dataset: Dataset, dataset_document: DatasetDocument, documents: list[Document]) -> None:
。_load
方法的主要目的是将处理后的文档数据加载到索引中,并更新相关文档和文档段的状态为完成,以支持后续的搜索和检索操作。这个过程可以分为以下几个步骤:
(1)检查索引技术:根据数据集的索引技术(例如,economy
),如果是high_quality
,那么需要获取嵌入模型实例来进行高质量的索引处理。
(2)分块处理:如果使用嵌入模型实例,将文档数据分块处理。这通常涉及到并发执行,以提高处理效率。
(3)创建关键字索引:在一个独立的线程中,对文档数据进行关键字索引的创建,以便于后续的搜索和检索。
(4)更新文档状态:在所有文档数据被成功加载到索引后,更新相关文档的状态为completed
,表示索引过程已完成。
(5)异常处理:在处理过程中,如果遇到任何异常(如文档暂停、提供者令牌未初始化错误等),将会更新文档的索引状态为error
,并记录错误信息。
重点解释的是创建关键字线程这部分:
# create keyword index # 创建关键字索引
create_keyword_thread = threading.Thread(target=self._process_keyword_index,
args=(current_app._get_current_object(),
dataset.id, dataset_document.id, documents))
create_keyword_thread.start() # 启动线程
create_keyword_thread.join() # 等待线程结束
create_keyword_thread
是一个 Thread
对象,之前通过 threading.Thread
创建并启动。它代表了一个独立的线程,用于执行某些任务,这里假设是处理关键字索引的任务。
.join()
: 这是 Thread
类的一个方法。调用这个方法会使得调用它的线程(如主线程)等待,直到 create_keyword_thread
线程完成执行。如果 create_keyword_thread
已经完成,join()
会立即返回。
create_keyword_thread.join()
的作用是阻塞调用它的线程(通常是主线程),直到 create_keyword_thread
线程执行完成。这样做的目的是确保 create_keyword_thread
线程中的任务完全执行完毕后,主线程才继续执行后面的代码。
这种做法在需要确保某个线程中的任务完全完成后才进行下一步操作时非常有用,比如在处理完所有数据后才关闭数据库连接,或者在继续执行依赖于线程任务结果的代码之前确保线程任务已完成。
这里面执行的任务是target=self._process_keyword_index
。这个方法主要用于在后台线程中处理关键字索引的创建和相关文档段状态的更新,确保这些操作在正确的应用上下文中执行:
def _process_keyword_index(self, flask_app, dataset_id, document_id, documents): # 处理关键字索引
with flask_app.app_context():
dataset = Dataset.query.filter_by(id=dataset_id).first()
if not dataset:
raise ValueError("no dataset found")
keyword = Keyword(dataset)
keyword.create(documents)
if dataset.indexing_technique != 'high_quality':
document_ids = [document.metadata['doc_id'] for document in documents]
db.session.query(DocumentSegment).filter(
DocumentSegment.document_id == document_id,
DocumentSegment.index_node_id.in_(document_ids),
DocumentSegment.status == "indexing"
).update({
DocumentSegment.status: "completed",
DocumentSegment.enabled: True,
DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
})
db.session.commit()
_process_keyword_index
方法的目的是在 Flask 应用的上下文中处理关键字索引。这个方法接收四个参数:flask_app
(Flask 应用实例),dataset_id
(数据集的 ID),document_id
(文档的 ID),以及documents
(文档对象列表)。方法的执行流程如下:
(1)使用 Flask 应用的上下文:这是必要的步骤,因为在 Flask
应用之外的线程中执行数据库操作或者访问 Flask 应用的配置时,需要手动创建应用上下文。
(2)查询数据集:通过 dataset_id
从数据库中查询对应的数据集对象。如果没有找到对应的数据集,抛出一个值错误异常。
(3)创建关键字索引:使用查询到的数据集对象初始化 Keyword
类的实例,并调用其 create
方法,传入文档对象列表来创建关键字索引。
重点分析keyword.create(documents)
的过程,由于这个过程比较长,用另外一篇文档进行详细分析,具体参考文献 [1]。
(4)更新文档段状态:如果数据集的索引技术不是 'high_quality'
,则获取所有文档对象中的 doc_id
,并更新数据库中对应 document_id
和 index_node_id
的文档段对象的状态为 "completed"
,同时设置其为启用状态,并记录完成时间。最后,提交数据库会话以保存更改。
参考文献
[1] Dify中Jieba类的create()方法执行过程:https://z0yrmerhgi8.feishu.cn/wiki/RKIewMrY2iaC1wks20FcMqhcnae