Dify中的经济索引模式实现过程

news2025/1/12 23:08:50

当索引模式为经济时,使用离线的向量引擎、关键词索引等方式,降低了准确度但无需花费 Token。

一.提取函数**_extract**

根据不同文档类型进行内容的提取:

def _extract(self, index_processor: BaseIndexProcessor, dataset_document: DatasetDocument, process_rule: dict) \
        -> list[Document]:  # 提取
    # load file
    if dataset_document.data_source_type not in ["upload_file", "notion_import"]:  # 数据源类型
        return []

    data_source_info = dataset_document.data_source_info_dict  # 数据源信息
    text_docs = []  # 文本文档
    if dataset_document.data_source_type == 'upload_file':
        if not data_source_info or 'upload_file_id' not in data_source_info:
            raise ValueError("no upload file found")

        file_detail = db.session.query(UploadFile). \
            filter(UploadFile.id == data_source_info['upload_file_id']). \
            one_or_none()

        if file_detail:
            extract_setting = ExtractSetting(
                datasource_type="upload_file",
                upload_file=file_detail,
                document_model=dataset_document.doc_form
            )
            text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule['mode'])
    elif dataset_document.data_source_type == 'notion_import':
        if (not data_source_info or 'notion_workspace_id' not in data_source_info
                or 'notion_page_id' not in data_source_info):
            raise ValueError("no notion import info found")
        extract_setting = ExtractSetting(
            datasource_type="notion_import",
            notion_info={
                "notion_workspace_id": data_source_info['notion_workspace_id'],
                "notion_obj_id": data_source_info['notion_page_id'],
                "notion_page_type": data_source_info['type'],
                "document": dataset_document,
                "tenant_id": dataset_document.tenant_id
            },
            document_model=dataset_document.doc_form
        )
        text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule['mode'])
    # update document status to splitting
    self._update_document_index_status(
        document_id=dataset_document.id,
        after_indexing_status="splitting",
        extra_update_params={
            DatasetDocument.word_count: sum([len(text_doc.page_content) for text_doc in text_docs]),
            DatasetDocument.parsing_completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        }
    )  # 更新文档状态为拆分

    # replace doc id to document model id
    text_docs = cast(list[Document], text_docs)
    for text_doc in text_docs:
        text_doc.metadata['document_id'] = dataset_document.id
        text_doc.metadata['dataset_id'] = dataset_document.dataset_id

    return text_docs

直接调用的是core.indexing_runner.IndexingRunner._extract()方法:

得到上传文件的详细信息:

接下来调用提取内容函数extract()方法:

根据不同的IndexType类型返回不同的索引处理器:

因为这里index_processor类型为core.rag.index_processor.processor.paragraph_index_processor.ParagraphIndexProcessor,实际调用的是ParagraphIndexProcessor类中的extract函数:

ExtractProcessor.extract()类方法根据文件类型进行解析:

因为文件类型为txt,所以执行TextExtractor()

TextExtractor()实际执行的位置为dify\api\core\rag\extractor\text_extractor.py中的extract()方法:

最终得到text_docs内容:

二.转换函数_transform

def _transform(self, index_processor: BaseIndexProcessor, dataset: Dataset,
               text_docs: list[Document], doc_language: str, process_rule: dict) -> list[Document]:  # 转换
    # get embedding model instance
    embedding_model_instance = None
    if dataset.indexing_technique == 'high_quality':
        if dataset.embedding_model_provider:
            embedding_model_instance = self.model_manager.get_model_instance(
                tenant_id=dataset.tenant_id,
                provider=dataset.embedding_model_provider,
                model_type=ModelType.TEXT_EMBEDDING,
                model=dataset.embedding_model
            )  # 获取嵌入模型实例
        else:
            embedding_model_instance = self.model_manager.get_default_model_instance(
                tenant_id=dataset.tenant_id,
                model_type=ModelType.TEXT_EMBEDDING,
            )  # 获取默认嵌入模型实例

    documents = index_processor.transform(text_docs, embedding_model_instance=embedding_model_instance,
                                          process_rule=process_rule, tenant_id=dataset.tenant_id,
                                          doc_language=doc_language)  # 转换文档

    return documents

实际调用的是core.indexing_runner.IndexingRunner._transform

因为设置dataset.indexing_technique'economy'

因为这里index_processor类型为core.rag.index_processor.processor.paragraph_index_processor.ParagraphIndexProcessor,实际调用的是ParagraphIndexProcessor类中的transform函数:

这里splitter类型为core.splitter.fixed_text_splitter.FixedRecursiveCharacterTextSplitter

core.splitter.fixed_text_splitter.FixedRecursiveCharacterTextSplitter类:

调用文档清理CleanProcessor.clean()类方法:

实际调用的是dify\api\core\rag\cleaner\clean_processor.py中的CleanProcessor.clean()类方法:

clean方法的处理过程如下:

(1)默认清理:首先,方法会执行一些默认的清理操作,包括删除无效的符号。这些操作主要是通过正则表达式来实现的,例如,替换<\|<,替换\|>>,以及删除一些特定的ASCII和Unicode字符。

(2)规则应用:接下来方法会根据传入的process_rule字典中定义的规则来进一步清理文本。process_rule字典包含了一系列的清理规则,这些规则在rules键下的pre_processing_rules列表中定义。

(3)删除额外的空格:如果启用了remove_extra_spaces规则,方法会删除文本中的额外空格。这包括将三个或更多连续的换行符替换为两个换行符,以及将两个或更多连续的空格(包括特定的Unicode空格字符)替换为单个空格。

(4)删除URL和电子邮件地址:如果启用了remove_urls_emails规则,方法会从文本中删除URL和电子邮件地址。这是通过匹配特定的正则表达式模式来实现的,分别用于识别和删除电子邮件地址和URL。

通过splitter.split_documents([document])分割文档为文档节点:

实际调用的是dify\api\core\splitter\fixed_text_splitter.py中的FixedRecursiveCharacterTextSplitter类的split_text()方法。split_text方法的目的是将传入的文本分割成多个块,并返回这些块组成的列表。这个方法的处理过程可以分为以下几个步骤:

(1)检查固定分隔符:首先,方法检查是否存在一个固定的分隔符(_fixed_separator)。如果存在,它将使用这个分隔符来直接分割文本。这意味着文本将在每个出现固定分隔符的地方被分割。

(2)分割文本:使用固定分隔符分割文本后,会得到一个初步的块列表。然后,对这些初步的块进行进一步的处理,以确保每个块的长度不超过设定的大小(_chunk_size)。

(3)递归分割:对于每个初步的块,如果其长度超过了设定的大小,将使用recursive_split_text方法递归地进行进一步分割。这个递归过程会继续,直到所有的块都不超过设定的大小。

(4)返回最终块列表:最后,将所有处理后符合长度要求的块组成一个列表返回。

通过固定分隔符'\n'分割:

document_nodes = splitter.split_documents([document])实际返回文档节点数量为12:

dify\api\core\rag\index_processor\processor\paragraph_index_processor.py中,transform()方法主要是对文档进行预处理和分割,以便后续的索引或其它处理步骤可以更有效地处理文档的各个部分。主要执行步骤如下:

(1)获取分割器:根据传入的处理规则(process_rule)和嵌入模型实例(embedding_model_instance),获取文档分割器(splitter)。

(2)遍历文档:对于每个传入的文档(documents列表中的每个Document对象),执行以下子步骤:

  • 文档清理:使用CleanProcessor.clean方法清理文档的内容(document.page_content),移除不需要的字符或格式。

  • 文档分割:使用步骤1中获取的分割器(splitter)将清理后的文档内容分割成多个节点(document_nodes),每个节点代表文档的一部分。

  • 节点处理:对于每个分割后的节点,如果节点内容非空,则生成一个唯一的文档ID(doc_id)和文本哈希值(hash),并将这些信息添加到节点的元数据中。如果节点内容以特定字符(如.)开头,则移除这些字符。

  • 收集文档节点:将处理后的节点添加到一个列表中(split_documents),以便进一步处理。

(3)返回结果:将所有处理后的文档节点合并到一个列表中(all_documents),并返回该列表作为transform方法的结果。

三.保存函数_load_segments

接下调用self._load_segments(dataset, dataset_document, documents)方法:

实际调用的是_load_segments(self, dataset, dataset_document, documents)方法:

_load_segments 方法的主要目的是将处理后的文档段(documents)保存到数据库中,并更新相关文档的状态。这个过程可以分为以下几个步骤:

(1)初始化数据集文档存储:创建一个 DatasetDocumentStore 实例,这个实例与特定的数据集、创建者和文档ID相关联。

(2)添加文档段:使用 DatasetDocumentStore.add_documents(documents) 方法将处理后的文档段添加到数据库中。这里的 documents 是一个包含多个文档段的列表,每个文档段都是一个 Document 实例。具体doc_store.add_documents()self.get_document_segment()方法就不逐行调试,感兴趣可自行调试。

(3)更新文档状态为索引中:在所有文档段都保存到数据库之后,更新原始文档的状态为“索引中”(indexing)。这是通过调用 _update_document_index_status 方法实现的,该方法还会更新文档的清理完成时间和拆分完成时间。self._update_document_index_status()方法就不逐行调试,感兴趣可自行调试。

(4)更新段状态为索引中:最后,更新所有相关文档段的状态为"索引中"。这是通过调用 _update_segments_by_document 方法实现的,该方法会更新所有相关文档段的状态和索引时间。self._update_segments_by_document()方法就不逐行调试,感兴趣可自行调试。

这个过程确保了文档段的正确保存和状态更新,为后续的索引和检索操作做好准备。

四.加载函数_load

接下来调用self._load()方法:

实际调用的是def _load(self, index_processor: BaseIndexProcessor, dataset: Dataset, dataset_document: DatasetDocument, documents: list[Document]) -> None:_load 方法的主要目的是将处理后的文档数据加载到索引中,并更新相关文档和文档段的状态为完成,以支持后续的搜索和检索操作。这个过程可以分为以下几个步骤:

(1)检查索引技术:根据数据集的索引技术(例如,economy),如果是high_quality,那么需要获取嵌入模型实例来进行高质量的索引处理。

(2)分块处理:如果使用嵌入模型实例,将文档数据分块处理。这通常涉及到并发执行,以提高处理效率。

(3)创建关键字索引:在一个独立的线程中,对文档数据进行关键字索引的创建,以便于后续的搜索和检索。

(4)更新文档状态:在所有文档数据被成功加载到索引后,更新相关文档的状态为completed,表示索引过程已完成。

(5)异常处理:在处理过程中,如果遇到任何异常(如文档暂停、提供者令牌未初始化错误等),将会更新文档的索引状态为error,并记录错误信息。

重点解释的是创建关键字线程这部分:

# create keyword index  # 创建关键字索引
create_keyword_thread = threading.Thread(target=self._process_keyword_index,
                                         args=(current_app._get_current_object(),
                                               dataset.id, dataset_document.id, documents))
create_keyword_thread.start() # 启动线程
create_keyword_thread.join()  # 等待线程结束

create_keyword_thread是一个 Thread 对象,之前通过 threading.Thread 创建并启动。它代表了一个独立的线程,用于执行某些任务,这里假设是处理关键字索引的任务。

.join(): 这是 Thread 类的一个方法。调用这个方法会使得调用它的线程(如主线程)等待,直到 create_keyword_thread 线程完成执行。如果 create_keyword_thread 已经完成,join() 会立即返回。

create_keyword_thread.join() 的作用是阻塞调用它的线程(通常是主线程),直到 create_keyword_thread 线程执行完成。这样做的目的是确保 create_keyword_thread 线程中的任务完全执行完毕后,主线程才继续执行后面的代码。

这种做法在需要确保某个线程中的任务完全完成后才进行下一步操作时非常有用,比如在处理完所有数据后才关闭数据库连接,或者在继续执行依赖于线程任务结果的代码之前确保线程任务已完成。

这里面执行的任务是target=self._process_keyword_index。这个方法主要用于在后台线程中处理关键字索引的创建和相关文档段状态的更新,确保这些操作在正确的应用上下文中执行:

def _process_keyword_index(self, flask_app, dataset_id, document_id, documents):  # 处理关键字索引
    with flask_app.app_context():
        dataset = Dataset.query.filter_by(id=dataset_id).first()
        if not dataset:
            raise ValueError("no dataset found")
        keyword = Keyword(dataset)
        keyword.create(documents)
        if dataset.indexing_technique != 'high_quality':
            document_ids = [document.metadata['doc_id'] for document in documents]
            db.session.query(DocumentSegment).filter(
                DocumentSegment.document_id == document_id,
                DocumentSegment.index_node_id.in_(document_ids),
                DocumentSegment.status == "indexing"
            ).update({
                DocumentSegment.status: "completed",
                DocumentSegment.enabled: True,
                DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            })

            db.session.commit()

_process_keyword_index 方法的目的是在 Flask 应用的上下文中处理关键字索引。这个方法接收四个参数:flask_app(Flask 应用实例),dataset_id(数据集的 ID),document_id(文档的 ID),以及documents(文档对象列表)。方法的执行流程如下:

(1)使用 Flask 应用的上下文:这是必要的步骤,因为在 Flask 应用之外的线程中执行数据库操作或者访问 Flask 应用的配置时,需要手动创建应用上下文。

(2)查询数据集:通过 dataset_id 从数据库中查询对应的数据集对象。如果没有找到对应的数据集,抛出一个值错误异常。

(3)创建关键字索引:使用查询到的数据集对象初始化 Keyword 类的实例,并调用其 create 方法,传入文档对象列表来创建关键字索引。

重点分析keyword.create(documents)的过程,由于这个过程比较长,用另外一篇文档进行详细分析,具体参考文献 [1]。

(4)更新文档段状态:如果数据集的索引技术不是 'high_quality',则获取所有文档对象中的 doc_id,并更新数据库中对应 document_idindex_node_id 的文档段对象的状态为 "completed",同时设置其为启用状态,并记录完成时间。最后,提交数据库会话以保存更改。

参考文献

[1] Dify中Jieba类的create()方法执行过程:https://z0yrmerhgi8.feishu.cn/wiki/RKIewMrY2iaC1wks20FcMqhcnae

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1926287.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

力扣经典题目之->移除值为val元素的讲解,的实现与讲解

一&#xff1a;题目 博主本文将用指向来形象的表示下标位的移动。 二&#xff1a;思路 1&#xff1a;两个整形&#xff0c;一个start&#xff0c;一个end&#xff0c;在一开始都 0&#xff0c;即这里都指向第一个元素。 2&#xff1a;在查到val之前&#xff0c;查一个&…

C语言 ——— 将一句英语短句中的单词进行倒置

目录 题目要求 代码实现 题目要求 将一句英语短句中的单词进行倒置&#xff0c;标点符号不倒置 如&#xff1a; 输入&#xff1a;"I like chongqing very much," 输出&#xff1a;"much, very chongqing like I" 代码实现 #include<stdio.h> #i…

c#与欧姆龙PLC通信——如何更改PLC的IP地址

前言 我们有时候需要改变欧姆龙Plc的ip地址,下图有两种更改方式,一种是已知之前Plc设置的Ip地址,还有一种是之前不知道Pl的Ip地址是多少,下面分别做介绍。 1、已知PLC的IP地址的情况下更改地址 假设已知PLC的Ip地址,比如本文中PLC的IP为192.168.1.2,我首先将电脑的IP地…

搭建调用链监控Zipkin和Sleuth

项目环境: win7、jdk8 1、添加依赖&#xff0c;添加了spring-cloud-starter-zipkin会自动导入Sleuth <!--Sleuth&#xff0c;zipkin--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-zipkin</…

安卓onNewIntent 什么时候执行

一.详细介绍 onNewIntent 方法 onNewIntent 是 Android 中 Activity 生命周期的一部分。它在特定情况下被调用&#xff0c;主要用于处理新的 Intent&#xff0c;而不是创建新的 Activity 实例。详细介绍如下&#xff1a; 使用场景 singleTop 启动模式&#xff1a; 如果一个 Ac…

python+mysql图书管理系统,谈谈思路及实现代码

&#x1f3c6;本文收录于《CSDN问答解答》专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收藏&…

【链表】算法题(一) ---- 力扣 / 牛客

一、移除链表元素 移除链表中值为val的元素&#xff0c;并返回新的头节点 思路&#xff1a; 题目上这样说&#xff0c;我们就可以创建一个新的链表&#xff0c;将值不为val的节点&#xff0c;尾插到新的链表当中&#xff0c;最后返回新链表的头节点。 typedef struct ListNo…

java《字符串进阶篇》--习题逐语句分析及认识链式编程

一、前言 字符串相关的习题分享&#xff0c;随着学习的深入&#xff0c;应该要多做一些习题来巩固知识点&#xff0c;而不是一味的去学习新的东西。这几天尽可能地去给大家分享一些常用的方法及习题的讲解&#xff0c;希望大家认真观看&#xff0c;每一道题都有对应的分析。基…

GAMMA数据处理(八)

新学习了一个命令&#xff1a; SLC_cat_ScanSAR - Concatenate sequential ScanSAR burst SLC images (Sentinel-1, TSX, RCM...)&#xff0c;做数据拼接的。之前一直没有涉及到拼接问题&#xff0c;就一直没管。如果研究区包含两景SLC&#xff0c;可以拼接成一景。但是不知道…

计算机丢失CH375DLL怎么办,CH375DLL.DLL;计算机找不到CH375DLL怎么办,CH375DLL.DLL

翻遍CSDN&#xff0c;发现的文章不是只有描述不给资源&#xff0c;要不就是资源收费。 真是狗屎啊&#xff1b; 在千辛万苦找到资源后&#xff0c;我决定写一篇&#xff1b; 首先是资源文件下载 我上传的&#xff1a;&#xff08;肯定是0积分&#xff0c;如果收费了告诉我&…

Nuxt.js 错误侦探:useError 组合函数

title: Nuxt.js 错误侦探&#xff1a;useError 组合函数 date: 2024/7/14 updated: 2024/7/14 author: cmdragon excerpt: 摘要&#xff1a;文章介绍Nuxt.js中的useError组合函数&#xff0c;用于统一处理客户端和服务器端的错误&#xff0c;提供statusCode、statusMessage和…

IOT 可编程控制系统

IOT&#xff08;物联网&#xff09;可编程控制系统&#xff0c;如GF-MAXCC等&#xff0c;是一种集成了多种先进技术和功能的智能化控制设备&#xff0c;它能够在物联网系统中发挥关键作用&#xff0c;实现对多种设备的集中管理和控制。具体来说&#xff0c;IOT可编程控制系统的…

7天学会CANOpen

本系列文章&#xff0c;主要介绍CANOpen的学习知识&#xff0c;能够全面掌握CANOpen原理。文章会不定期的更新。 学习基础&#xff1a;CAN通信。 1. CANOpen通信协议1 2. CANOpen对象字的理解 3. CANOpen之CAN-ID、NODE-ID、COB-ID 4. CanOpen报文类型 5. CANO报文---SDO…

python 怎样生成窗体

通过import tkinter导入Tkinter模块&#xff0c;没有这句下面的都不成立了。 wintkinter.Tk()&#xff0c;这句是创建windows的窗口对象&#xff0c;注意后面的Tk&#xff0c;大小写。 win.title("窗口")&#xff0c;这段是设置窗口上的标题。 另外窗口的大小你可以通…

Paddle 打包部署

PaddleOCR 打包部署exe 心酸历程 PaddleOCR部署exe模式PaddleOCR安装到本地(稍后有时间再写)PaddleOCR打包过程异常问题记录&#xff01;&#xff01;&#xff01;&#xff01;No such file or directory: D:\\py_project\\paddleOCR\\dist\\paddleOCR\\_internal\\paddleocr\\…

JVM:垃圾回收器

文章目录 一、介绍二、年轻代-Serial垃圾回收器三、老年代-SerialOld垃圾回收器四、年轻代-ParNew垃圾回收器五、老年代-CMS&#xff08;Concurrent Mark Sweep&#xff09;垃圾回收器六、年轻代-Parllel Scavenge垃圾回收器七、Parallel Old垃圾回收器八、G1垃圾回收器 一、介…

《Python数据科学之五:模型评估与调优深入解析》

《Python数据科学之五&#xff1a;模型评估与调优深入解析》 在数据科学项目中&#xff0c;精确的模型评估和细致的调优过程是确保模型质量、提高预测准确性的关键步骤。本文将详细探讨如何利用 Python 及其强大的库进行模型评估和调优&#xff0c;确保您的模型能够达到最佳性能…

防火墙之NAT,智能选路篇

什么是NAT? 网络地址转换 1.静态NAT&#xff08;static NAT&#xff09;&#xff08;静态一对一映射&#xff09;&#xff1a;设置起来最为简单&#xff0c;内部网络中的每个主机都被永久映射成外部网络中的某个合法的地址。多用于服务器场景。 2.动态NAT&#xff08;pool…

mavsdk_server安卓平台编译

1.下载好mavsdk并进入mavsdk目录 2.生成docker安卓平台文件 docker run --rm dockcross/android-arm64 >./dockcross-android-arm64 3.生成makefile ./dockcross-android-arm64 cmake -DCMAKE_BUILD_TYPERelease -DBUILD_MAVSDK_SERVERON -DBUILD_SHARED_LIBSOFF -Bbuild/…

Study--Oracle-07-ASM自动存储管理(二)

一、ASM安装准备条件 1、ASM支持存储类型 本地祼设备(本地的磁盘和分区) 网络附加存储(NAS) 存储区域网络(SAN) 2、ASM使用本地裸设备,要点: 已经被挂载到操作系统上或者已经做了分区 映射裸设备为文件名 设置正确的权限(针对grid用户和asmadmin组,权限为660) 二、OR…