GraphRAG: Connecting GraphRAG to Milvus


Preface

Microsoft's GraphRAG currently feels more like a demo and does not handle large data volumes gracefully, so I connected Milvus to GraphRAG. After reading this article, hooking up other databases should be straightforward as well.

Note: this article only plugs Milvus in at search time. Integrating the index process (that is, the full pipeline) will be covered in a later post when I have time.

Connecting the database

In graphrag\query\cli.py, go to the run_local_search function and find

store_entity_semantic_embeddings(
    entities=entities, vectorstore=description_embedding_store
)

Comment it out, then add:

    if vector_store_type == VectorStoreType.Milvus:
        # custom implementation
        store_text_semantic_embeddings(Texts=text_units, vectorstore=description_embedding_store,
                                       final_documents=final_documents)
    else:
        store_entity_semantic_embeddings(
            entities=entities, vectorstore=description_embedding_store
        )

Here vector_store_type is GraphRAG's vector store selection, defined in graphrag\vector_stores\typing.py; we need to manually add Milvus = "milvus" there:

class VectorStoreType(str, Enum):
    """The supported vector store types."""

    LanceDB = "lancedb"
    AzureAISearch = "azure_ai_search"
    Milvus = 'milvus'

At the same time, modify get_vector_store in the same file by adding a case VectorStoreType.Milvus branch.

MilvusVectorStore is the custom class that implements the Milvus interface (covered below); it also needs to be imported in typing.py.
    @classmethod
    def get_vector_store(
        cls, vector_store_type: VectorStoreType | str, kwargs: dict
    ) -> LanceDBVectorStore | AzureAISearch | MilvusVectorStore:
        """Get the vector store type from a string."""
        match vector_store_type:
            case VectorStoreType.LanceDB:
                return LanceDBVectorStore(**kwargs)
            case VectorStoreType.AzureAISearch:
                return AzureAISearch(**kwargs)
            case VectorStoreType.Milvus:
                return MilvusVectorStore(**kwargs)
            case _:
                if vector_store_type in cls.vector_store_types:
                    return cls.vector_store_types[vector_store_type](**kwargs)
                msg = f"Unknown vector store type: {vector_store_type}"
                raise ValueError(msg)
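As a quick sanity check, the factory can then be exercised directly. A minimal sketch, assuming the enclosing class is graphrag's VectorStoreFactory in vector_stores/typing.py, that a Milvus server is running locally, and using constructor arguments that match the MilvusVectorStore shown later:

from graphrag.vector_stores.typing import VectorStoreFactory, VectorStoreType

store = VectorStoreFactory.get_vector_store(
    VectorStoreType.Milvus,
    kwargs={"collection_name": "data_store", "url": "0.0.0.0"},  # assumed constructor args
)
print(type(store).__name__)  # expected: MilvusVectorStore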

        

The store_text_semantic_embeddings function is implemented to mirror store_entity_semantic_embeddings and lives in graphrag\query\input\loaders\dfs.py:

def store_text_semantic_embeddings(
    Texts: list[TextUnit],
    vectorstore: BaseVectorStore,
    final_documents:DataFrame,
) -> BaseVectorStore:
    """Store entity semantic embeddings in a vectorstore."""

    documents = []
    for Text in Texts:
        matching_rows = final_documents[final_documents['id'] == Text.document_ids[0]]
        if not matching_rows.empty:  # if the source document is found, store its title; otherwise fall back to the graphrag-generated document ids
            document_title = matching_rows['title'].values[0]
        else:
            document_title = Text.document_ids
        attributes_dict = {'document_title': document_title, "entity_ids": Text.entity_ids}  # besides the document title, keep the ids of the entities extracted from this chunk
        if Text.attributes:
            attributes_dict.update({**Text.attributes})
        documents.append(
            VectorStoreDocument(
            id=Text.id,
            text=Text.text,
            vector=Text.text_embedding,
            attributes=attributes_dict
            )
        )

    vectorstore.load_documents(documents=documents)  # load the text-chunk data into Milvus
    return vectorstore
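To make the stored records concrete, here is a minimal sketch of the VectorStoreDocument each text unit becomes (all values are invented for illustration):

from graphrag.vector_stores.base import VectorStoreDocument

example = VectorStoreDocument(
    id="4a7c0f2e",                               # TextUnit.id (hypothetical value)
    text="Example chunk text ...",
    vector=[0.01, -0.23, 0.57],                  # normally the full 1024-dim text_embedding
    attributes={
        "document_title": "my_report.txt",       # title looked up in create_final_documents.parquet
        "entity_ids": ["entity-1", "entity-2"],  # entities extracted from this chunk
    },
)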

The complete modified run_local_search looks like this:

from graphrag.query.input.loaders.dfs import (
    store_entity_semantic_embeddings,store_text_semantic_embeddings
)
def run_local_search(
    data_dir: str | None,
    root_dir: str | None,
    community_level: int,
    response_type: str,
    query: str,
    method_type: str | None = None,  # assumed addition: selects the text-match retrieval path used below
):
    """Run a local search with the given query."""
    data_dir, root_dir, config = _configure_paths_and_settings(data_dir, root_dir)
    data_path = Path(data_dir)


    final_documents = pd.read_parquet(data_path / "create_final_documents.parquet")

    final_text_units = pd.read_parquet(data_path / "create_final_text_units.parquet")


    final_community_reports = pd.read_parquet(
        data_path / "create_final_community_reports.parquet"
    )

    final_relationships = pd.read_parquet(
        data_path / "create_final_relationships.parquet"
    )
    final_nodes = pd.read_parquet(data_path / "create_final_nodes.parquet")
    final_entities = pd.read_parquet(data_path / "create_final_entities.parquet")
    final_covariates_path = data_path / "create_final_covariates.parquet"
    final_covariates = (
        pd.read_parquet(final_covariates_path)
        if final_covariates_path.exists()
        else None
    )

    # left unchanged; defaults to {}
    vector_store_args = (
        config.embeddings.vector_store if config.embeddings.vector_store else {}
    )
    # get the vector store type; defaults to VectorStoreType.LanceDB
    vector_store_type = vector_store_args.get("type", VectorStoreType.LanceDB)

    # initialize the vector store; LanceDB by default
    description_embedding_store = __get_embedding_description_store(
        vector_store_type=vector_store_type,
        config_args=vector_store_args,
    )

    # read the entities
    entities = read_indexer_entities(final_nodes, final_entities, community_level)


    # covariates; defaults to [] when no covariates file exists
    covariates = (
        read_indexer_covariates(final_covariates)
        if final_covariates is not None
        else []
    )

    reports = read_indexer_reports(final_community_reports, final_nodes, community_level)
    text_units = read_indexer_text_units(final_text_units)
    relationships = read_indexer_relationships(final_relationships)
    covariates = {"claims": covariates}

    if vector_store_type == VectorStoreType.Milvus:
        # custom implementation: store the text-chunk data in Milvus
        store_text_semantic_embeddings(Texts=text_units, vectorstore=description_embedding_store,
                                       final_documents=final_documents)
    else:
        store_entity_semantic_embeddings(
            entities=entities, vectorstore=description_embedding_store
        )


    search_engine = get_local_search_engine(
        config,
        reports=reports,
        text_units=text_units,
        entities=entities,
        relationships=relationships,
        covariates=covariates,
        description_embedding_store=description_embedding_store,
        response_type=response_type,
    )


    result = search_engine.search(query=query, method_type=method_type)  # method_type is forwarded down to build_context (see the changes below)

    reporter.success(f"Local Search Response: {result.response}")
    return result
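For vector_store_type above to resolve to VectorStoreType.Milvus, the embeddings vector_store settings have to carry type: milvus. A minimal sketch of what the parsed vector_store_args could look like (the keys other than type are assumptions matching the MilvusVectorStore connect signature below):

from graphrag.vector_stores.typing import VectorStoreType

vector_store_args = {
    "type": "milvus",                 # resolves to VectorStoreType.Milvus
    "collection_name": "data_store",  # assumed extra kwargs forwarded to the store
    "url": "0.0.0.0",
}
vector_store_type = vector_store_args.get("type", VectorStoreType.LanceDB)
assert vector_store_type == VectorStoreType.Milvus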

Next, create a milvus.py file under graphrag\vector_stores and implement a MilvusVectorStore class.

 

from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection,utility
from pymilvus import MilvusClient
from tqdm import tqdm
from datetime import datetime
from pymilvus import AnnSearchRequest
from typing import Any
from .base import (
    BaseVectorStore,
    VectorStoreDocument,
    VectorStoreSearchResult,
)
from graphrag.model.types import TextEmbedder
import json

from xinference.client import Client  # optional: only used to embed the query text in similarity_search_by_text below
client = Client("http://0.0.0.0:9997")
list_models_run = client.list_models()
model_uid = list_models_run['bge-m3']['id']
embedding_client = client.get_model(model_uid)

class MilvusVectorStore(BaseVectorStore):
    def __init__(self, url: str = '0.0.0.0', collection_name: str = 'data_store', recrate: bool = False, key_word_flag: bool = True):
        self.key_word_flag = key_word_flag
        connections.connect(host=url, port="19530")
        self.has_collection = utility.has_collection(collection_name)  # check whether the collection already exists
        print(f'has_collection={self.has_collection}')
        if recrate and self.has_collection:
            s = input(f'Are you sure you want to delete {collection_name}? yes or no \n ')
            if s == 'yes':
                self.delete_collection(collection_name)
                print(f'Dropped {collection_name}')
        if not recrate and self.has_collection:  # reuse the existing collection
            self.collection = Collection(name=collection_name)
        else:
            schema = self.get_schema()
            self.collection = Collection(name=collection_name, schema=schema)

    def get_schema(self):
        id = FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True)  # primary key
        graph_id = FieldSchema(name="graph_id", dtype=DataType.VARCHAR, max_length=128)
        text = FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=58192)
        file_name = FieldSchema(name="file_name", dtype=DataType.VARCHAR, max_length=512)
        text_embedding = FieldSchema(name="text_embedding", dtype=DataType.FLOAT_VECTOR, dim=1024)  # dense vector; dim must match the embedding model output (1024 for bge-m3)
        # n_tokens = FieldSchema(name="n_tokens", dtype=DataType.INT64)
        if self.key_word_flag:
            key_word = FieldSchema(name="key_word", dtype=DataType.VARCHAR, max_length=8192)
            key_word_embedding = FieldSchema(name="key_word_embedding", dtype=DataType.FLOAT_VECTOR, dim=1024)
            schema = CollectionSchema(fields=[id, graph_id, text, file_name, text_embedding, key_word, key_word_embedding], description="text and text-embedding storage")
        else:
            schema = CollectionSchema(fields=[id, graph_id, text, file_name, text_embedding], description="text and text-embedding storage")
        return schema

    def change_collection(self,collection_name):
        schema = self.get_schema()
        self.collection  = Collection(name=collection_name,schema=schema)

    def delete_collection(self,collection_name):
        utility.drop_collection(collection_name)

    def release_collection(self):
        # self.collection.release_collection(collection_name)
        self.collection.release()



    def list_collections(self):
        collections_list = utility.list_collections()
        return collections_list


    def create_index(self,metric_type='L2',index_name='L2'):
        #utility.drop_collection(collection_name=collection_name)
        # self.collection = Collection(name=collection_name, schema=schema)
        index_params = {
            "index_type": "AUTOINDEX",
             "metric_type":metric_type,
            "params": {}
        }
        self.collection.create_index(
            field_name="text_embedding",
            index_params=index_params,
            index_name= 'text_embedding'
        )
        if self.key_word_flag:
            self.collection.create_index(
                field_name="key_word_embedding",
                index_params=index_params,
                index_name='key_word_embedding'
            )
        self.collection.load() #load_fields=['id',"text_embedding"]

    def drop_index(self):
        self.collection.release()
        self.collection.drop_index()


    def insert_data(self,data_dict:dict):#text_id_list,text_list,file_name_list,text_embedding_list,key_word_list,key_word_embedding_list
        start = datetime.now()
        self.collection.insert(data_dict)
        # if self.key_word_flag:
        #     for id,text,file_name,text_embedding,key_word,key_word_embedding in zip(text_id_list,text_list,file_name_list,text_embedding_list,key_word_list,key_word_embedding_list):
        #         self.collection.insert([[id],[text],[file_name],[text_embedding],[key_word],[key_word_embedding]])
        # else:
        #     for id,text,file_name,text_embedding in zip(text_id_list,text_list,file_name_list,text_embedding_list):
        #         self.collection.insert([[id],[text],[file_name],[text_embedding]])
        end = datetime.now()
        print(f'Insert took {end - start}')

    def search(self,query_embedding, top_k=10,metric_type='L2'):
        search_params = {
            "metric_type": metric_type,
            "params": {"level": 2}
        }

        results = self.collection.search(
            [query_embedding],
            anns_field="text_embedding",
            param=search_params,
            limit=top_k,
            output_fields=['graph_id',"text", "file_name",'text_embedding']
        )[0]

        return results


    def hybrid_search(self, query_dense_embedding, query_sparse_embedding, rerank,top_k=10, metric_type='L2'):
        dense_search_params = {
            "index_type": "AUTOINDEX",
             "metric_type":metric_type,
            "params": {}
        }
        # dense_req = self.collection.search( [query_dense_embedding],
        #     anns_field="text_embedding",
        #     param=dense_search_params,
        #     limit=top_k,
        #     output_fields=["text", "file_name"])

        dense_req = AnnSearchRequest(
            [query_dense_embedding], "text_embedding", dense_search_params, limit=top_k
        )

        sparse_search_params = {
            "index_type": "AUTOINDEX",
             "metric_type":metric_type,
            "params": {}
        }

        # sparse_req = self.collection.search( [query_sparse_embedding],
        #     anns_field="text_embedding",
        #     param=sparse_search_params,
        #     limit=top_k,
        #     output_fields=["text", "file_name"])

        sparse_req = AnnSearchRequest(
            [query_sparse_embedding], "key_word_embedding", sparse_search_params, limit=top_k
        )

        res = self.collection.hybrid_search(
            [dense_req,sparse_req],rerank=rerank, limit=top_k, output_fields=["text", "file_name"]
        )[0]

        return res


    def reranker_init(self,model_name_or_path,device="cpu"):
        self.reranker = bge_rf = BGERerankFunction(
            model_name=model_name_or_path,  # Specify the model name. Defaults to `BAAI/bge-reranker-v2-m3`.
            device="cpu"  # Specify the device to use, e.g., 'cpu' or 'cuda:0'
        )

    def rereank(self,query,serach_result,top_k,rerank_client=None):
        documents_list = [i.entity.get('text') for i in serach_result]
        # if an external (non-Milvus-integrated) rerank client is passed in, use it
        if rerank_client:
            response = rerank_client.rerank(
                query=query,
                documents=documents_list,
                top_n=top_k,
            )
            rerank_results = response['results']
            results = []
            for i in rerank_results:
                index = i['index']
                results.append(serach_result[index])

        else:
            results = self.reranker(
                query=query,
                documents=documents_list,
                top_k=top_k,
            )

        return results

    def filter_by_id(self, include_ids: list[str] | list[int]) -> Any:
        """Build a query filter to filter documents by id."""
        if len(include_ids) == 0:
            self.query_filter = None
        else:
            if isinstance(include_ids[0], str):
                id_filter = ", ".join([f"'{id}'" for id in include_ids])
                self.query_filter = f"id in ({id_filter})"
            else:
                self.query_filter = (
                    f"id in ({', '.join([str(id) for id in include_ids])})"
                )
        return self.query_filter


    def connect(self, url: str = '0.0.0.0', collection_name: str = 'data_store', recrate: bool = False, key_word_flag: bool = False, **kwargs: Any) -> Any:
        self.key_word_flag = key_word_flag
        connections.connect(host=url, port="19530")
        has_collection = utility.has_collection(collection_name)  # check whether the collection already exists
        if recrate and has_collection:
            s = input(f'Are you sure you want to delete {collection_name}? yes or no \n ')
            if s == 'yes':
                self.delete_collection(collection_name)
                print(f'Dropped {collection_name}')

        if not recrate and has_collection:  # reuse the existing collection
            self.collection = Collection(name=collection_name)
        else:
            schema = self.get_schema()
            self.collection = Collection(name=collection_name, schema=schema)
        self.create_index()

    def load_documents(
        self, documents: list[VectorStoreDocument], overwrite: bool = True
    ) -> None:
        """Load documents into vector storage."""
        documents = [
            document
            for document in documents
            if document.vector is not None
        ]

        if self.has_collection:  # only prompts for insertion when the collection already existed at startup
            s = input('Do you want to insert data? yes or no \n ')
            if s == 'yes':
                batch = 100
                documents_len = len(documents)
                insert_len = int(documents_len / batch)  # Milvus should not ingest everything in one call, so insert in batches

                data_list = list()
                start = datetime.now()
                print('Inserting data...')
                for document in documents:
                    attributes = document.attributes
                    file_name = attributes.get('document_title')[0]
                    temp_dict = {
                        "graph_id": document.id,
                        "text": document.text,
                        "text_embedding": document.vector,
                        "file_name": file_name,
                    }
                    data_list.append(temp_dict)

                    if len(data_list) >= insert_len:
                        self.collection.insert(data_list)
                        data_list = []
                if data_list:  # flush any remaining rows
                    self.collection.insert(data_list)
                end = datetime.now()
                print(f'Insert took {end - start}')

    def similarity_search_by_text(
        self, text: str, text_embedder: TextEmbedder, k: int = 10, **kwargs: Any
    ) -> list[VectorStoreSearchResult]:
        """Perform a similarity search using a given input text."""
        # note: uses the module-level xinference bge-m3 client instead of graphrag's text_embedder
        query_embedding = embedding_client.create_embedding([text])['data'][0]['embedding']

        if query_embedding:
            search_result = self.similarity_search_by_vector(query_embedding, k)
            return search_result
        return []

    def similarity_search_by_vector(
        self, query_embedding: list[float], k: int = 10, **kwargs: Any
    ) -> list[VectorStoreSearchResult]:
        docs = self.search(query_embedding=query_embedding,top_k=k)

        result = []

        for doc in docs:
            file_name = doc.entity.get('file_name')
            attributes = {'document_title':file_name,'entity':[]}
            score = abs(float(doc.score))
            temp = VectorStoreSearchResult(
                document=VectorStoreDocument(
                    id=doc.entity.get('graph_id'),
                    text=doc.entity.get('text'),
                    vector=doc.entity.get('text_embedding'),
                    attributes=attributes,
                ),
                score=score,
            )
            result.append(temp)


        # return [
        #     VectorStoreSearchResult(
        #         document=VectorStoreDocument(
        #             id=doc["id"],
        #             text=doc["text"],
        #             vector=doc["vector"],
        #             attributes=json.loads(doc["attributes"]),
        #         ),
        #         score=1 - abs(float(doc["_distance"])),
        #     )
        #     for doc in docs
        # ]
        return result

    def similarity_search_by_query(
        self, query: str, text_embedder: TextEmbedder, k: int = 10, **kwargs: Any
    ) -> list[VectorStoreSearchResult]:
        raise NotImplementedError  # not needed for this local-search integration

    def similarity_search_by_hybrid(
        self, query: str, text_embedder: TextEmbedder, k: int = 10, oversample_scaler: int = 10, **kwargs: Any
    ) -> list[VectorStoreSearchResult]:
        raise NotImplementedError  # hybrid (dense + keyword) search is not wired up here
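Before moving on to the search-side changes, here is a minimal usage sketch of the class above. It assumes a Milvus server is reachable on the default port, that the collection already holds data, and that the xinference client created at module import time can connect; the zero vector stands in for a real 1024-dim bge-m3 query embedding:

from graphrag.vector_stores.milvus import MilvusVectorStore

store = MilvusVectorStore(url="0.0.0.0", collection_name="data_store", recrate=False, key_word_flag=False)
store.create_index(metric_type="L2")

query_vector = [0.0] * 1024  # stand-in for a real query embedding
for hit in store.similarity_search_by_vector(query_vector, k=5):
    print(hit.score, hit.document.attributes.get("document_title"))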

Modifying the search code

Open graphrag\query\structured_search\local_search\mixed_context.py.

Alternatively, in graphrag\query\cli.py, jump from the get_local_search_engine call inside run_local_search into that function, then follow the LocalSearchMixedContext in its return statement to reach the class implementation.

In build_context, locate map_query_to_entities and jump to its implementation in graphrag\query\context_builder\entity_extraction.py, where you will find:

                matched = get_entity_by_key(
                    entities=all_entities,  # all entities
                    key=embedding_vectorstore_key,
                    value=result.document.id,
                )

Change it to:

                entity_ids = result.document.attributes.get('entity_ids')
                if entity_ids:
                    for entity_id in entity_ids:
                        matched = get_entity_by_key(
                            entities=all_entities,  # all entities
                            key=embedding_vectorstore_key,
                            value=entity_id,
                        )
                        if matched:
                            matched_entities.append(matched)

If you want to keep GraphRAG's original search logic for this part as well, you can do what I did:

        for result in search_results:
            if method_type == 'text_match':
                entity_ids = result.document.attributes.get('entity_ids')
                if entity_ids:
                    for entity_id in entity_ids:
                        matched = get_entity_by_key(
                            entities=all_entities,  # all entities
                            key=embedding_vectorstore_key,
                            value=entity_id,
                        )
                        if matched:
                            matched_entities.append(matched)

            else:
                matched = get_entity_by_key(
                    entities=all_entities,  # all entities
                    key=embedding_vectorstore_key,
                    value=result.document.id,
                )
                if matched:
                    matched_entities.append(matched)

Add a parameter to control this (or branch on vector_store_type instead), and finally modify the return of map_query_to_entities so that it also returns search_results:

def map_query_to_entities(
    query: str,
    text_embedding_vectorstore: BaseVectorStore,
    text_embedder: BaseTextEmbedding,
    all_entities: list[Entity],
    embedding_vectorstore_key: str = EntityVectorStoreKey.ID,
    include_entity_names: list[str] | None = None,
    exclude_entity_names: list[str] | None = None,
    k: int = 10,
    oversample_scaler: int = 2,
    method_type:str|None = None,
) -> list[Entity]:
    """Extract entities that match a given query using semantic similarity of text embeddings of query and entity descriptions."""
    if include_entity_names is None:
        include_entity_names = []
    if exclude_entity_names is None:
        exclude_entity_names = []
    matched_entities = []
    if query != "":
        # get entities with highest semantic similarity to query
        # oversample to account for excluded entities
        # compare with the lancedb implementation under graphrag/vector_stores

        print('Embedding the query...')
        start_time = datetime.now()
        # returns the text chunks most similar to the query
        search_results = text_embedding_vectorstore.similarity_search_by_text(
            text=query,
            text_embedder=lambda t: text_embedder.embed(t),
            k=k * oversample_scaler,
        )
        end_time = datetime.now()
        print(f'Took {end_time - start_time}')

        for result in search_results:
            if method_type == 'text_match':
                entity_ids = result.document.attributes.get('entity_ids')
                if entity_ids:
                    for entity_id in entity_ids:
                        matched = get_entity_by_key(
                            entities=all_entities,  # all entities
                            key=embedding_vectorstore_key,
                            value=entity_id,
                        )
                        if matched:
                            matched_entities.append(matched)

            else:
                matched = get_entity_by_key(
                    entities=all_entities,  # all entities
                    key=embedding_vectorstore_key,
                    value=result.document.id,
                )
                if matched:
                    matched_entities.append(matched)
    else:
        all_entities.sort(key=lambda x: x.rank if x.rank else 0, reverse=True)
        matched_entities = all_entities[:k]


    # filter out excluded entities
    # exclude_entity_names defaults to []
    if exclude_entity_names:
        matched_entities = [
            entity
            for entity in matched_entities
            if entity.title not in exclude_entity_names
        ]

    # add entities in the include_entity list
    included_entities = []
    # include_entity_names defaults to []
    for entity_name in include_entity_names:
        included_entities.extend(get_entity_by_name(all_entities, entity_name))
    return included_entities + matched_entities, search_results  # search_results was not part of the original return

Don't forget that in the build_context function of graphrag\query\structured_search\local_search\mixed_context.py, the call to map_query_to_entities must be updated, since it now returns two values instead of one:

        selected_entities,search_results = map_query_to_entities(
            query=query,
            text_embedding_vectorstore=self.entity_text_embeddings,
            text_embedder=self.text_embedder,
            all_entities=list(self.entities.values()),
            embedding_vectorstore_key=self.embedding_vectorstore_key,
            include_entity_names=include_entity_names,
            exclude_entity_names=exclude_entity_names,
            k=top_k_mapped_entities,
            oversample_scaler=20,  # originally 2
            method_type=method_type
        )

Near the end of build_context, find the call to self._build_text_unit_context and pass in the new search_results parameter:

        text_unit_context, text_unit_context_data,document_id_context = self._build_text_unit_context(
            selected_entities=selected_entities,
            max_tokens=text_unit_tokens,
            return_candidate_context=return_candidate_context,
            search_results=search_results,
            method_type= method_type
        )

Jump to that function's implementation (still in mixed_context.py) and modify or replace the following code:

            for index, entity in enumerate(selected_entities):
                if entity.text_unit_ids:
                    for text_id in entity.text_unit_ids:
                        if (
                            text_id not in [unit.id for unit in selected_text_units]
                            and text_id in self.text_units
                        ):
                            selected_unit = self.text_units[text_id]
                            num_relationships = count_relationships(
                                selected_unit, entity, self.relationships
                            )
                            if selected_unit.attributes is None:
                                selected_unit.attributes = {}
                            selected_unit.attributes["entity_order"] = index
                            selected_unit.attributes["num_relationships"] = (
                                num_relationships
                            )
                            selected_text_units.append(selected_unit)

            # sort selected text units by ascending order of entity order and descending order of number of relationships
            selected_text_units.sort(
                key=lambda x: (
                    x.attributes["entity_order"],  # type: ignore
                    -x.attributes["num_relationships"],  # type: ignore
                )
            )

            for unit in selected_text_units:
                del unit.attributes["entity_order"]  # type: ignore
                del unit.attributes["num_relationships"]  # type: ignore

My advice is to keep the original branch as well; in any case, I changed it to:

        if method_type == 'text_match':
            for index, Text in enumerate(search_results):
                text_id = Text.document.id
                if (
                        text_id not in [unit.id for unit in selected_text_units]
                        and text_id in self.text_units
                ):
                    selected_unit = self.text_units[text_id]
                    if selected_unit.attributes is None:
                        selected_unit.attributes = {'document_title': Text.document.attributes['document_title']}
                    selected_text_units.append(selected_unit)


        else:
            for index, entity in enumerate(selected_entities):
                if entity.text_unit_ids:
                    for text_id in entity.text_unit_ids:
                        if (
                            text_id not in [unit.id for unit in selected_text_units]
                            and text_id in self.text_units
                        ):
                            selected_unit = self.text_units[text_id]
                            num_relationships = count_relationships(
                                selected_unit, entity, self.relationships
                            )
                            if selected_unit.attributes is None:
                                selected_unit.attributes = {}
                            selected_unit.attributes["entity_order"] = index
                            selected_unit.attributes["num_relationships"] = (
                                num_relationships
                            )
                            selected_text_units.append(selected_unit)

            # sort selected text units by ascending order of entity order and descending order of number of relationships
            selected_text_units.sort(
                key=lambda x: (
                    x.attributes["entity_order"],  # type: ignore
                    -x.attributes["num_relationships"],  # type: ignore
                )
            )

            for unit in selected_text_units:
                del unit.attributes["entity_order"]  # type: ignore
                del unit.attributes["num_relationships"]  # type: ignore

With that, the vector store has been swapped out. Pushing the graph data itself into Milvus involves considerably more code; I'll tackle it when I have time.

It is quite a lot of code.

Likes and bookmarks are welcome; they encourage the author to update faster.

 
