什么是RAG?
首先我们给出RAG的定义:RAG(Retrieval-Augmented Generation)技术是一种结合了信息检索(Retrieval)和生成式模型(Generation)的人工智能方法。对于用户的Query,它首先通过检索系统从大规模知识库中提取相关信息,然后将这些信息输入到生成式模型中,以生成更加准确和上下文相关的文本。RAG技术在自然语言处理任务中表现出色,特别是在需要结合外部知识来生成文本的场景,如问答系统、对话生成和文档摘要等。通过这种方式,RAG能够有效地利用外部知识来增强生成模型的性能,从而提供更高质量的输出。
简单来说:RAG(检索增强生成) = 检索技术 + LLM 提示
为什么使用RAG?
要理解RAG为什么会出现,我们要先看看大模型的局限性:
- 知识的局限性:模型自身的知识完全源于它的训练数据,而现有的主流大模型(ChatGPT、文心一言、通义千问…)的训练集基本都是构建于网络公开的数据,对于一些实时性的、非公开的或离线的数据是无法获取到的,这部分知识也就无从具备。
- 幻觉问题不可避免:所有的AI模型的底层原理都是基于数学概率,其模型输出实质上是一系列数值运算,大模型也不例外,所以它有时候会一本正经地胡说八道,尤其是在大模型自身不具备某一方面的知识或不擅长的场景。而这种幻觉问题的区分是比较困难的,因为它要求使用者自身具备相应领域的知识。论文《Calibrated Language Models Must Hallucinate》证明了预训练语言模型对特定类型的事实产生幻觉存在一个固有的统计学原因,而与 Transformer 架构或数据质量无关。
- 数据安全性:对于企业来说,数据安全至关重要,没有企业愿意承担数据泄露的风险,将自身的私域数据上传第三方平台进行训练。这也导致完全依赖通用大模型自身能力的应用方案不得不在数据安全和效果方面进行取舍。
为了克服这些限制,提高模型的能力,有两种主要途径:一种是微调(Fine Tune)来更新模型,另一种是让他们能够与外部世界互动,以不同的形式和方式获取知识。
- 微调(Fine Tuning)
微调固然效果好,可以让模型真正的“学会”一些私域知识。但是微调也会带来几个问题:首先,由于生成模型依赖于内在知识(权重),因此模型还是无法摆脱幻觉的产生,在对理解门槛高且准确性要求严格的场景下,这就是完全无法接受的,因为用户很难从回答的表面看出模型是否是在胡说八道。其次,在真实场景中,每时每刻都在产生大量数据,对一个事物的概念会迭代的飞快,如某个政策的解读、某个指标的调整等。而模型微调并不是一个简单的工作,无论是从数据准备、算力资源、微调效果、训练时间等各个角度来看,随时用新产生的数据来进行微调都是不现实的,且最终微调的效果也无法保证,能够做到每月更新一次都已经是很理想的状态。最后,对于超大型的大语言模型来说,频繁微调的成本是难以承受的。
- 检索增强生成(RAG)
RAG的主要作用类似搜索引擎,找到用户提问最相关的知识或者是相关的对话历史,并结合原始提问(查询),创造信息丰富的prompt,指导模型生成准确输出。其本质上应用了情境学习(In-Context Learning)的原理。RAG(Retrieval Augmented Generation)为生成式模型提供了与外部世界互动提供了一个很有前景的解决方案。
RAG的工作流程
RAG(Retrieval-Augmented Generation)流程是一种结合信息检索与生成模型的技术,旨在提高生成模型的准确性与效果,尤其在处理开放域问题时,能利用外部知识库或文档增强模型的回答质量。其流程通常分为以下几个步骤:
构建知识库:
-
在构建一个高效的RAG系统时,首要步骤是准备知识文档。接着执行一项关键步骤:文档切片。我们需要将长篇文档分割成多个文本块,以便更高效地处理和检索信息。这不仅有助于减轻模型的负担,还能提高信息检索的准确性。
-
向量化:将文档切片转化为向量是RAG系统中至关重要的一步。一般来说,使用各种Embedding模型来将每个文档切片转化为高维向量表示。这个过程的目的是为了使文本可以在高维空间中进行高效的相似度比较和检索。文档切片的向量可以存储在向量数据库(如FAISS、Pinecone、Weaviate等)中,供后续的检索使用。
用户检索:
-
用户提出问题时,首先将问题文本进行同样的向量化处理,得到问题的向量表示。然后,通过向量数据库进行相似度检索,从知识库中找到与用户问题最相关的文档切片。常用的相似度度量方法包括余弦相似度、欧几里得距离等。通常,检索系统会返回多个与问题最相关的文档或文档片段。
-
一旦检索到相关文档切片,接下来,生成模型(如GPT、Qwen等)会根据这些文档切片与用户的问题生成答案。生成过程不仅依赖于模型本身的语言理解和生成能力,还可以借助检索到的相关信息,增强回答的准确性与丰富性。这一过程是RAG的核心,生成模型会将检索到的文本块作为额外的上下文,提升其回答的质量。
-
生成的答案可能需要一些后处理步骤,比如去除不必要的内容,确保语言流畅性,以及在某些情况下对答案进行事实检查。最终,生成的答案将返回给用户。
实现
有了上述知识,接下来就可以实现一个基于MindSpore和MindNLP的RAG简易架构,代码开源在https://github.com/ResDream/MindTinyRAG
首先我们需要一个阅读数据的Reader:
import os
import PyPDF2
import markdown
import json
import tiktoken
from bs4 import BeautifulSoup
import re
enc = tiktoken.get_encoding("cl100k_base")
class ReadFiles:
"""
class to read files
"""
def __init__(self, path: str) -> None:
self._path = path
self.file_list = self.get_files()
def get_files(self):
# args:dir_path,目标文件夹路径
file_list = []
for filepath, dirnames, filenames in os.walk(self._path):
# os.walk 函数将递归遍历指定文件夹
for filename in filenames:
# 通过后缀名判断文件类型是否满足要求
if filename.endswith(".md"):
# 如果满足要求,将其绝对路径加入到结果列表
file_list.append(os.path.join(filepath, filename))
elif filename.endswith(".txt"):
file_list.append(os.path.join(filepath, filename))
elif filename.endswith(".pdf"):
file_list.append(os.path.join(filepath, filename))
return file_list
def get_content(self, max_token_len: int = 600, cover_content: int = 150):
docs = []
# 读取文件内容
for file in self.file_list:
content = self.read_file_content(file)
chunk_content = self.get_chunk(
content, max_token_len=max_token_len, cover_content=cover_content)
docs.extend(chunk_content)
return docs
@classmethod
def get_chunk(cls, text: str, max_token_len: int = 600, cover_content: int = 150):
chunk_text = []
curr_len = 0
curr_chunk = ''
token_len = max_token_len - cover_content
lines = text.splitlines() # 假设以换行符分割文本为行
for line in lines:
line = line.replace(' ', '')
line_len = len(enc.encode(line))
if line_len > max_token_len:
# 如果单行长度就超过限制,则将其分割成多个块
num_chunks = (line_len + token_len - 1) // token_len
for i in range(num_chunks):
start = i * token_len
end = start + token_len
# 避免跨单词分割
while not line[start:end].rstrip().isspace():
start += 1
end += 1
if start >= line_len:
break
curr_chunk = curr_chunk[-cover_content:] + line[start:end]
chunk_text.append(curr_chunk)
# 处理最后一个块
start = (num_chunks - 1) * token_len
curr_chunk = curr_chunk[-cover_content:] + line[start:end]
chunk_text.append(curr_chunk)
if curr_len + line_len <= token_len:
curr_chunk += line
curr_chunk += '\n'
curr_len += line_len
curr_len += 1
else:
chunk_text.append(curr_chunk)
curr_chunk = curr_chunk[-cover_content:]+line
curr_len = line_len + cover_content
if curr_chunk:
chunk_text.append(curr_chunk)
return chunk_text
@classmethod
def read_file_content(cls, file_path: str):
# 根据文件扩展名选择读取方法
if file_path.endswith('.pdf'):
return cls.read_pdf(file_path)
elif file_path.endswith('.md'):
return cls.read_markdown(file_path)
elif file_path.endswith('.txt'):
return cls.read_text(file_path)
else:
raise ValueError("Unsupported file type")
@classmethod
def read_pdf(cls, file_path: str):
# 读取PDF文件
with open(file_path, 'rb') as file:
reader = PyPDF2.PdfReader(file)
text = ""
for page_num in range(len(reader.pages)):
text += reader.pages[page_num].extract_text()
return text
@classmethod
def read_markdown(cls, file_path: str):
# 读取Markdown文件
with open(file_path, 'r', encoding='utf-8') as file:
md_text = file.read()
html_text = markdown.markdown(md_text)
# 使用BeautifulSoup从HTML中提取纯文本
soup = BeautifulSoup(html_text, 'html.parser')
plain_text = soup.get_text()
# 使用正则表达式移除网址链接
text = re.sub(r'http\S+', '', plain_text)
return text
@classmethod
def read_text(cls, file_path: str):
# 读取文本文件
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
class Documents:
"""
获取已分好类的json格式文档
"""
def __init__(self, path: str = '') -> None:
self.path = path
def get_content(self):
with open(self.path, mode='r', encoding='utf-8') as f:
content = json.load(f)
return content
接下来我们实现一个Embedding类
import os
from copy import copy
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
class BaseEmbeddings:
"""
Base class for embeddings
"""
def __init__(self, path: str, is_api: bool) -> None:
self.path = path
self.is_api = is_api
def get_embedding(self, text: str, model: str) -> List[float]:
raise NotImplementedError
@classmethod
def cosine_similarity(cls, vector1: List[float], vector2: List[float]) -> float:
"""
calculate cosine similarity between two vectors
"""
dot_product = np.dot(vector1, vector2)
magnitude = np.linalg.norm(vector1) * np.linalg.norm(vector2)
if not magnitude:
return 0
return dot_product / magnitude
# 使用MindNLP的SentenceTransformer实现
class MindNLPEmbedding(BaseEmbeddings):
"""
class for MindNLP embeddings
"""
def __init__(self, path: str = 'BAAI/bge-base-zh-v1.5', is_api: bool = False) -> None:
super().__init__(path, is_api)
self._model = self.load_model(path)
def get_embedding(self, text: str):
sentence_embedding = self._model.encode([text], normalize_embeddings=True)
return sentence_embedding
def load_model(self, path: str):
from mindnlp.sentence import SentenceTransformer
model = SentenceTransformer(path)
return model
@classmethod
def cosine_similarity(cls, sentence_embedding_1, sentence_embedding_2):
"""
calculate cosine similarity between two vectors
"""
similarity = sentence_embedding_1 @ sentence_embedding_2.T
return similarity
接着实现一个LLM类用于接收不同的模型进行生成:
import os
from typing import Dict, List, Optional, Tuple, Union
PROMPT_TEMPLATE = dict(
RAG_PROMPT_TEMPALTE="""使用以上下文来回答用户的问题。如果你不知道答案,请输出我不知道。总是使用中文回答。
问题: {question}
可参考的上下文:
···
{context}
···
如果给定的上下文无法让你做出回答,请回答数据库中没有这个内容,你不知道。
有用的回答:""",
MindNLP_PROMPT_TEMPALTE="""先对上下文进行内容总结,再使用上下文来回答用户的问题。如果你不知道答案,请输出我不知道。总是使用中文回答。
问题: {question}
可参考的上下文:
···
{context}
···
如果给定的上下文无法让你做出回答,请回答数据库中没有这个内容,你不知道。
有用的回答:"""
)
class BaseModel:
def __init__(self, path: str = '') -> None:
self.path = path
def chat(self, prompt: str, history: List[dict], content: str) -> str:
pass
def load_model(self):
pass
class OpenAIChat(BaseModel):
def __init__(self, path: str = '', model: str = "gpt-3.5-turbo-1106") -> None:
super().__init__(path)
self.model = model
def chat(self, prompt: str, history: List[dict], content: str) -> str:
from openai import OpenAI
client = OpenAI()
client.api_key = os.getenv("OPENAI_API_KEY")
client.base_url = os.getenv("OPENAI_BASE_URL")
history.append({'role': 'user',
'content': PROMPT_TEMPLATE['RAG_PROMPT_TEMPALTE'].format(question=prompt, context=content)})
response = client.chat.completions.create(
model=self.model,
messages=history,
max_tokens=150,
temperature=0.1
)
return response.choices[0].message.content
class MindNLPChat(BaseModel):
def __init__(self, path: str = '') -> None:
super().__init__(path)
self.load_model()
def chat(self, prompt: str, history: List = [], content: str = '') -> str:
prompt = PROMPT_TEMPLATE['MindNLP_PROMPT_TEMPALTE'].format(question=prompt, context=content)
response, history = self.model.chat(self.tokenizer, prompt, history, max_length=512)
return response
def load_model(self):
import mindspore
from mindnlp.transformers import AutoTokenizer, AutoModelForCausalLM
self.tokenizer = AutoTokenizer.from_pretrained(self.path, mirror="huggingface")
self.model = AutoModelForCausalLM.from_pretrained(self.path, ms_dtype=mindspore.float16, mirror="huggingface")
最后实现一个简单的知识库:
import os
from typing import Dict, List, Optional, Tuple, Union
import json
import numpy as np
from tqdm import tqdm
from RAG.Embeddings import BaseEmbeddings, MindNLPEmbedding
# MindNLP的SentenceTransformer实现
class VectorStore:
def __init__(self, document: List[str] = ['']) -> None:
self.document = document
def get_vector(self, EmbeddingModel: BaseEmbeddings):
self.vectors = []
for doc in tqdm(self.document, desc="Calculating embeddings"):
self.vectors.append(EmbeddingModel.get_embedding(doc))
return self.vectors
def persist(self, path: str = 'storage'):
if not os.path.exists(path):
os.makedirs(path)
with open(f"{path}/document.json", 'w', encoding='utf-8') as f:
json.dump(self.document, f, ensure_ascii=False)
if self.vectors:
# 将 numpy.ndarray 转换为列表
vectors_list = [vector.tolist() for vector in self.vectors]
with open(f"{path}/vectors.json", 'w', encoding='utf-8') as f:
json.dump(vectors_list, f)
def load_vector(self, EmbeddingModel: BaseEmbeddings, path: str = 'storage'):
with open(f"{path}/vectors.json", 'r', encoding='utf-8') as f:
vectors_list = json.load(f)
with open(f"{path}/document.json", 'r', encoding='utf-8') as f:
self.document = json.load(f)
# 查询 EmbeddingModel 的类别
if isinstance(EmbeddingModel, MindNLPEmbedding):
# 将列表重新变为 numpy.ndarray
self.vectors = [np.array(vector) for vector in vectors_list]
else:
self.vectors = vectors_list
def get_similarity(self, vector1, vector2, EmbeddingModel: BaseEmbeddings):
return EmbeddingModel.cosine_similarity(vector1, vector2)
def query(self, query: str, EmbeddingModel: BaseEmbeddings, k: int = 1):
# 获取查询字符串的嵌入向量
query_vector = EmbeddingModel.get_embedding(query)
# 计算查询向量与数据库中每个向量的相似度
similarities = [self.get_similarity(query_vector, vector, EmbeddingModel) for vector in self.vectors]
# 将相似度、向量和文档存储在一个列表中
results = []
for similarity, vector, document in zip(similarities, self.vectors, self.document):
results.append({
'similarity': similarity,
'vector': vector,
'document': document
})
# 按相似度从高到低排序
results.sort(key=lambda x: x['similarity'], reverse=True)
# 获取最相似的 k 个文档
top_k_documents = [result['document'] for result in results[:k]]
return top_k_documents
完成上述代码的编写,我们就可以使用这个框架了:
导入对应包
from RAG.VectorBase import VectorStore
from RAG.utils import ReadFiles
from RAG.LLM import MindNLPChat
from RAG.Embeddings import MindNLPEmbedding
from RAG.Reranker import MindNLPReranker
建立知识库
docs = ReadFiles('./data').get_content(max_token_len=600, cover_content=150) # 获得data目录下的所有文件内容并分割
vector = VectorStore(docs)
embedding = MindNLPEmbedding("BAAI/bge-base-zh-v1.5") # 创建EmbeddingModel
vector.get_vector(EmbeddingModel=embedding)
vector.persist(path='storage') # 将向量和文档内容保存到storage目录下,下次再用就可以直接加载本地的数据库
读取知识库并完成RAG
vector = VectorStore()
embedding = MindNLPEmbedding("BAAI/bge-base-zh-v1.5") # 创建EmbeddingModel
vector.load_vector(EmbeddingModel=embedding, path='./storage') # 加载本地的数据库
question = 'git如何新建分支?'
content = vector.query(question, EmbeddingModel=embedding, k=1)[0]
print(content)
chat = MindNLPChat(path='openbmb/MiniCPM-2B-dpo-bf16')
print(chat.chat(question, [], content))
我们也可以实现一个简单的Reranker来增强RAG的能力
from typing import List
import numpy as np
class BaseReranker:
"""
Base class for reranker
"""
def __init__(self, path: str) -> None:
self.path = path
def rerank(self, text: str, content: List[str], k: int) -> List[str]:
raise NotImplementedError
class MindNLPReranker(BaseReranker):
"""
class for MindNLP reranker
"""
def __init__(self, path: str = 'BAAI/bge-reranker-base') -> None:
super().__init__(path)
self._model= self.load_model(path)
def rerank(self, text: str, content: List[str], k: int) -> List[str]:
query_embedding = self._model.encode(text, normalize_embeddings=True)
sentences_embedding = self._model.encode(sentences=content, normalize_embeddings=True)
similarity = query_embedding @ sentences_embedding.T
# 获取按相似度排序后的索引
ranked_indices = np.argsort(similarity)[::-1] # 按相似度降序排序
# 选择前 k 个最相关的候选内容
top_k_sentences = [content[i] for i in ranked_indices[:k]]
return top_k_sentences
def load_model(self, path: str):
from mindnlp.sentence import SentenceTransformer
model = SentenceTransformer(path)
return model
使用Reranker
embedding = MindNLPEmbedding("BAAI/bge-base-zh-v1.5") # 创建EmbeddingModel
# 创建RerankerModel
reranker = MindNLPReranker('BAAI/bge-reranker-base')
if have_created_db:
# 保存数据库之后
vector = VectorStore()
vector.load_vector(EmbeddingModel=embedding, path='./storage') # 加载本地的数据库
else:
# 没有保存数据库
docs = ReadFiles('./data').get_content(max_token_len=600, cover_content=150) # 获得data目录下的所有文件内容并分割
vector = VectorStore(docs)
vector.get_vector(EmbeddingModel=embedding)
vector.persist(path='storage') # 将向量和文档内容保存到storage目录下,下次再用就可以直接加载本地的数据库
question = '远程仓库的协作与贡献有哪些?'
# 从向量数据库中查询出最相似的3个文档
content = vector.query(question, EmbeddingModel=embedding, k=3)
print(content)
# 从一阶段查询结果中用Reranker再次筛选出最相似的2个文档
rerank_content = reranker.rerank(question, content, k=2)
print(rerank_content)
# 最后选择最相似的文档, 交给LLM作为可参考上下文
best_content = rerank_content[0]
chat = MindNLPChat(path='openbmb/MiniCPM-2B-dpo-bf16')
print(chat.chat(question, [], best_content))