assistant.py
这段代码主要涉及一个名为 AssistantService
的服务类,它处理了助手(Assistant)的各种操作,包括创建、获取、更新、删除助手,以及管理工具、技能、权限等。代码还涉及了一些错误处理、权限验证和与数据库的交互。以下是对每一部分的详细解析:
import json
from datetime import datetime
from typing import Any, List, Optional
from uuid import UUID
from fastapi import Request
from loguru import logger
from bisheng.api.errcode.assistant import (AssistantInitError, AssistantNameRepeatError,
AssistantNotEditError, AssistantNotExistsError, ToolTypeRepeatError,
ToolTypeEmptyError, ToolTypeNotExistsError, ToolTypeIsPresetError)
from bisheng.api.errcode.base import UnAuthorizedError, NotFoundError
from bisheng.api.services.assistant_agent import AssistantAgent
from bisheng.api.services.assistant_base import AssistantUtils
from bisheng.api.services.audit_log import AuditLogService
from bisheng.api.services.base import BaseService
from bisheng.api.services.llm import LLMService
from bisheng.api.services.user_service import UserPayload
from bisheng.api.utils import get_request_ip
from bisheng.api.v1.schemas import (AssistantInfo, AssistantSimpleInfo, AssistantUpdateReq,
StreamData, UnifiedResponseModel, resp_200, resp_500)
from bisheng.cache import InMemoryCache
from bisheng.database.models.assistant import (Assistant, AssistantDao, AssistantLinkDao,
AssistantStatus)
from bisheng.database.models.flow import Flow, FlowDao
from bisheng.database.models.gpts_tools import GptsToolsDao, GptsToolsRead, GptsToolsTypeRead, GptsTools
from bisheng.database.models.group_resource import GroupResourceDao, GroupResource, ResourceTypeEnum
from bisheng.database.models.knowledge import KnowledgeDao
from bisheng.database.models.role_access import AccessType, RoleAccessDao
from bisheng.database.models.tag import TagDao
from bisheng.database.models.user import UserDao
from bisheng.database.models.user_group import UserGroupDao
from bisheng.database.models.user_role import UserRoleDao
1. 导入部分
标准库导入
import json
from datetime import datetime
from typing import Any, List, Optional
from uuid import UUID
json
:- 用途:用于处理 JSON 数据的序列化和反序列化。
datetime
:- 用途:处理日期和时间。
datetime.now()
:获取当前的日期和时间。
typing
模块:Any
:表示任意类型。List
:表示列表类型。Optional
:表示可选类型,可能为None
。
uuid.UUID
:- 用途:处理通用唯一标识符(UUID)。
第三方库导入
from fastapi import Request
from loguru import logger
fastapi.Request
:- 用途:FastAPI 中的请求对象,包含了请求的所有信息。
loguru.logger
:- 用途:用于日志记录,提供了简单且功能强大的日志记录功能。
项目内部模块导入
这些导入是项目内部的模块,包含了各种错误码、服务类、数据库模型等。
- 错误码模块:
bisheng.api.errcode.*
:定义了各种错误码和异常类,用于统一错误处理。AssistantInitError
:助手初始化错误。AssistantNameRepeatError
:助手名称重复错误。AssistantNotEditError
:助手不可编辑错误。AssistantNotExistsError
:助手不存在错误。ToolTypeRepeatError
:工具类型重复错误。ToolTypeEmptyError
:工具类型为空错误。ToolTypeNotExistsError
:工具类型不存在错误。ToolTypeIsPresetError
:工具类型是预置的错误。UnAuthorizedError
:未授权错误。NotFoundError
:未找到错误。
- 服务类模块:
bisheng.api.services.*
:包含了各种服务类,用于处理业务逻辑。AssistantAgent
:助手代理类。AssistantUtils
:助手工具类,提供了一些辅助方法。AuditLogService
:审计日志服务。BaseService
:服务基类。LLMService
:大型语言模型服务。UserPayload
:用户载荷,包含了用户信息。get_request_ip
:获取请求的 IP 地址。
- API 模型和响应:
bisheng.api.v1.schemas
:定义了 API 的请求和响应模型。AssistantInfo
:助手信息模型。AssistantSimpleInfo
:简化的助手信息模型。AssistantUpdateReq
:助手更新请求模型。StreamData
:流数据模型。UnifiedResponseModel
:统一的响应模型。resp_200
:返回 200 状态码的响应。resp_500
:返回 500 状态码的响应。
- 缓存模块:
InMemoryCache
:内存缓存类,用于缓存数据。
- 数据库模型和数据访问对象(DAO):
bisheng.database.models.*
:定义了数据库模型和数据访问对象,用于与数据库交互。Assistant
,AssistantDao
,AssistantLinkDao
,AssistantStatus
:助手模型、DAO、链接 DAO 和助手状态。Flow
,FlowDao
:流程模型和 DAO。GptsTools
,GptsToolsDao
,GptsToolsRead
,GptsToolsTypeRead
:GPT 工具模型和 DAO。GroupResource
,GroupResourceDao
,ResourceTypeEnum
:组资源模型、DAO 和资源类型枚举。KnowledgeDao
:知识库 DAO。RoleAccessDao
,AccessType
:角色访问 DAO 和访问类型。TagDao
:标签 DAO。UserDao
:用户 DAO。UserGroupDao
:用户组 DAO。UserRoleDao
:用户角色 DAO。
2. AssistantService
类
class AssistantService(BaseService, AssistantUtils):
UserCache: InMemoryCache = InMemoryCache()
...
功能和用途
AssistantService
类是一个服务类,继承自 BaseService
和 AssistantUtils
,用于处理助手(Assistant)相关的业务逻辑。它包含了创建、获取、更新、删除助手,以及管理工具、技能、权限等功能的方法。
属性
UserCache
:一个内存缓存实例,用于缓存用户信息,避免频繁访问数据库。
方法概览
get_assistant
:获取助手列表。return_simple_assistant_info
:将助手模型转换为简化的助手信息。get_assistant_info
:获取单个助手的详细信息。create_assistant
:创建助手。create_assistant_hook
:创建助手后的钩子函数,执行额外的逻辑。delete_assistant
:删除助手。delete_assistant_hook
:删除助手后的钩子函数。auto_update_stream
:自动更新助手的提示词和工具选择,以流的形式返回。update_assistant
:更新助手信息。update_assistant_hook
:更新助手后的钩子函数。update_status
:更新助手的状态(上线、下线)。update_prompt
:更新助手的提示词。update_flow_list
:更新助手的技能列表。get_gpts_tools
:获取用户可见的工具列表。update_tool_config
:更新工具配置。add_gpts_tools
:添加自定义工具。add_gpts_tools_hook
:添加自定义工具后的钩子函数。update_gpts_tools
:更新工具类别。delete_gpts_tools
:删除工具类别。delete_gpts_tool_hook
:删除自定义工具后的钩子函数。update_tool_list
:更新助手的工具列表。check_update_permission
:检查是否有权限更新助手。get_link_info
:获取助手关联的工具、流程和知识库信息。get_user_name
:获取用户的用户名。judge_name_repeat
:判断助手名称是否重复。get_auto_info
:自动生成助手的提示词、描述等信息。get_auto_tool_info
:自动选择工具。get_auto_flow_info
:自动选择技能。
详细解析
get_assistant
方法
@classmethod
def get_assistant(cls,
user: UserPayload,
name: str = None,
status: int | None = None,
tag_id: int | None = None,
page: int = 1,
limit: int = 20) -> UnifiedResponseModel[List[AssistantSimpleInfo]]:
"""
获取助手列表
"""
...
- 功能:获取助手列表,根据用户权限、名称、状态、标签等进行筛选和分页。
- 参数:
user
:当前登录的用户信息,类型为UserPayload
。name
:助手名称的搜索关键字,支持模糊匹配。status
:助手的状态(在线、离线等)。tag_id
:标签 ID,用于按标签筛选助手。page
:分页页码,默认为 1。limit
:每页显示的助手数量,默认为 20。
- 逻辑:
- 根据标签筛选:
- 如果提供了
tag_id
,通过TagDao.get_resources_by_tags
获取具有该标签的助手 ID 列表。 - 如果没有助手匹配该标签,直接返回空的结果集。
- 如果提供了
- 获取助手列表:
- 管理员用户:
- 调用
AssistantDao.get_all_assistants
获取所有助手。
- 调用
- 普通用户:
- 通过
UserRoleDao
和RoleAccessDao
获取用户具有访问权限的助手 ID 列表。 - 调用
AssistantDao.get_assistants
获取用户有权限的助手列表。
- 通过
- 管理员用户:
- 获取助手的分组和标签:
- 通过
GroupResourceDao.get_resources_group
获取助手所属的分组信息。 - 通过
TagDao.get_tags_by_resource
获取助手关联的标签。
- 通过
- 组装返回数据:
- 遍历助手列表,调用
cls.return_simple_assistant_info
转换为简化的助手信息。 - 判断用户是否有编辑权限,添加
write
属性。 - 添加助手的分组和标签信息。
- 遍历助手列表,调用
- 返回结果:
- 使用
resp_200
返回包含助手列表和总数的响应。
- 使用
- 根据标签筛选:
return_simple_assistant_info
方法
@classmethod
def return_simple_assistant_info(cls, one: Assistant) -> AssistantSimpleInfo:
"""
将数据库的助手 model 简化处理后成返回前端的格式
"""
simple_dict = one.model_dump(include={
'id', 'name', 'desc', 'logo', 'status', 'user_id', 'create_time', 'update_time'
})
simple_dict['user_name'] = cls.get_user_name(one.user_id)
return AssistantSimpleInfo(**simple_dict)
- 功能:将数据库中的助手模型转换为简化的助手信息,方便前端显示。
- 逻辑:
- 使用
model_dump
方法提取需要的字段。 - 获取用户的用户名。
- 创建并返回
AssistantSimpleInfo
实例。
- 使用
get_assistant_info
方法
@classmethod
def get_assistant_info(cls, assistant_id: UUID, login_user: UserPayload):
assistant = AssistantDao.get_one_assistant(assistant_id)
if not assistant:
return AssistantNotExistsError.return_resp()
# 检查是否有权限获取信息
if not login_user.access_check(assistant.user_id, assistant.id.hex, AccessType.ASSISTANT_READ):
return UnAuthorizedError.return_resp()
tool_list = []
flow_list = []
knowledge_list = []
links = AssistantLinkDao.get_assistant_link(assistant_id)
for one in links:
if one.tool_id:
tool_list.append(one.tool_id)
elif one.knowledge_id:
knowledge_list.append(one.knowledge_id)
elif one.flow_id:
flow_list.append(one.flow_id)
else:
logger.error(f'not expect link info: {one.dict()}')
tool_list, flow_list, knowledge_list = cls.get_link_info(tool_list, flow_list,
knowledge_list)
assistant.logo = cls.get_logo_share_link(assistant.logo)
return resp_200(data=AssistantInfo(**assistant.dict(),
tool_list=tool_list,
flow_list=flow_list,
knowledge_list=knowledge_list))
- 功能:获取单个助手的详细信息,包括其关联的工具、技能和知识库。
- 逻辑:
- 权限检查:
- 获取助手对象,如果不存在,返回错误响应。
- 调用
login_user.access_check
检查用户是否有读取权限。
- 获取关联信息:
- 通过
AssistantLinkDao
获取助手的关联信息,包括工具、技能、知识库的 ID 列表。 - 调用
cls.get_link_info
获取关联对象的详细信息。
- 通过
- 处理助手 Logo:
- 调用
cls.get_logo_share_link
获取助手 Logo 的共享链接。
- 调用
- 返回结果:
- 使用
AssistantInfo
数据模型,组装助手的详细信息,返回成功响应。
- 使用
- 权限检查:
create_assistant
方法
@classmethod
async def create_assistant(cls, request: Request, login_user: UserPayload, assistant: Assistant) \
-> UnifiedResponseModel[AssistantInfo]:
# 检查下是否有重名
if cls.judge_name_repeat(assistant.name, assistant.user_id):
return AssistantNameRepeatError.return_resp()
logger.info(f"assistant original prompt id: {assistant.id}, desc: {assistant.prompt}")
# 自动补充默认的模型配置
assistant_llm = LLMService.get_assistant_llm()
if assistant_llm.llm_list:
for one in assistant_llm.llm_list:
if one.default:
assistant.model_name = one.model_id
break
# 自动生成描述
assistant, _, _ = await cls.get_auto_info(assistant)
assistant = AssistantDao.create_assistant(assistant)
cls.create_assistant_hook(request, assistant, login_user)
return resp_200(data=AssistantInfo(**assistant.dict(),
tool_list=[],
flow_list=[],
knowledge_list=[]))
- 功能:创建新的助手。
- 逻辑:
- 名称重复检查:
- 调用
cls.judge_name_repeat
检查助手名称是否重复,如果重复,返回错误响应。
- 调用
- 设置默认模型配置:
- 调用
LLMService.get_assistant_llm
获取可用的语言模型列表。 - 如果存在默认模型,设置助手的
model_name
。
- 调用
- 自动生成描述:
- 调用
cls.get_auto_info
,自动生成助手的描述(desc
)。
- 调用
- 创建助手:
- 调用
AssistantDao.create_assistant
将助手保存到数据库。
- 调用
- 执行创建后的钩子函数:
- 调用
cls.create_assistant_hook
,执行一些额外的逻辑,如关联用户组、记录审计日志等。
- 调用
- 返回结果:
- 使用
AssistantInfo
数据模型,返回创建的助手信息。
- 使用
- 名称重复检查:
create_assistant_hook
方法
@classmethod
def create_assistant_hook(cls, request: Request, assistant: Assistant, user_payload: UserPayload) -> bool:
"""
创建助手成功后的 hook,执行一些其他业务逻辑
"""
# 查询下用户所在的用户组
user_group = UserGroupDao.get_user_group(user_payload.user_id)
if user_group:
# 批量将助手资源插入到关联表里
batch_resource = []
for one in user_group:
batch_resource.append(GroupResource(
group_id=one.group_id,
third_id=assistant.id.hex,
type=ResourceTypeEnum.ASSISTANT.value))
GroupResourceDao.insert_group_batch(batch_resource)
# 写入审计日志
AuditLogService.create_build_assistant(user_payload, get_request_ip(request), assistant.id.hex)
# 写入 logo 缓存
cls.get_logo_share_link(assistant.logo)
return True
- 功能:创建助手后的钩子函数,执行额外的业务逻辑。
- 逻辑:
- 关联用户组:
- 获取用户所属的用户组,将助手资源与用户组关联。
- 记录审计日志:
- 调用
AuditLogService.create_build_assistant
记录创建助手的操作日志。
- 调用
- 缓存助手 Logo:
- 调用
cls.get_logo_share_link
,缓存助手的 Logo。
- 调用
- 关联用户组:
delete_assistant
方法
@classmethod
def delete_assistant(cls, request: Request, login_user: UserPayload, assistant_id: UUID) -> UnifiedResponseModel:
assistant = AssistantDao.get_one_assistant(assistant_id)
if not assistant:
return AssistantNotExistsError.return_resp()
# 判断授权
if not login_user.access_check(assistant.user_id, assistant.id.hex, AccessType.ASSISTANT_WRITE):
return UnAuthorizedError.return_resp()
AssistantDao.delete_assistant(assistant)
cls.delete_assistant_hook(request, login_user, assistant)
return resp_200()
- 功能:删除助手。
- 逻辑:
- 获取助手对象:
- 如果助手不存在,返回错误响应。
- 权限检查:
- 检查用户是否有删除权限。
- 删除助手:
- 调用
AssistantDao.delete_assistant
删除助手。
- 调用
- 执行删除后的钩子函数:
- 调用
cls.delete_assistant_hook
,执行额外的逻辑。
- 调用
- 返回结果:
- 返回成功响应。
- 获取助手对象:
delete_assistant_hook
方法
@classmethod
def delete_assistant_hook(cls, request: Request, login_user: UserPayload, assistant: Assistant) -> bool:
""" 清理关联的助手资源 """
logger.info(f"delete_assistant_hook id: {assistant.id}, user: {login_user.user_id}")
# 写入审计日志
AuditLogService.delete_build_assistant(login_user, get_request_ip(request), assistant.id.hex)
# 清理和用户组的关联
GroupResourceDao.delete_group_resource_by_third_id(assistant.id.hex, ResourceTypeEnum.ASSISTANT)
return True
- 功能:删除助手后的钩子函数,执行清理工作。
- 逻辑:
- 记录审计日志:
- 调用
AuditLogService.delete_build_assistant
记录删除助手的操作日志。
- 调用
- 清理资源关联:
- 删除助手与用户组的关联关系。
- 记录审计日志:
update_assistant
方法
@classmethod
async def update_assistant(cls, request: Request, login_user: UserPayload, req: AssistantUpdateReq) \
-> UnifiedResponseModel[AssistantInfo]:
""" 更新助手信息 """
assistant = AssistantDao.get_one_assistant(req.id)
if not assistant:
return AssistantNotExistsError.return_resp()
check_result = cls.check_update_permission(assistant, login_user)
if check_result is not None:
return check_result
# 更新助手数据
if req.name and req.name != assistant.name:
# 检查下是否有重名
if cls.judge_name_repeat(req.name, assistant.user_id):
return AssistantNameRepeatError.return_resp()
assistant.name = req.name
assistant.desc = req.desc
assistant.logo = req.logo
assistant.prompt = req.prompt
assistant.guide_word = req.guide_word
assistant.guide_question = req.guide_question
assistant.model_name = req.model_name
assistant.temperature = req.temperature
assistant.update_time = datetime.now()
AssistantDao.update_assistant(assistant)
# 更新助手关联信息
if req.tool_list is not None:
AssistantLinkDao.update_assistant_tool(assistant.id, tool_list=req.tool_list)
if req.flow_list is not None:
AssistantLinkDao.update_assistant_flow(assistant.id, flow_list=req.flow_list)
if req.knowledge_list is not None:
AssistantLinkDao.update_assistant_knowledge(assistant.id,
knowledge_list=req.knowledge_list,
flow_id='')
tool_list, flow_list, knowledge_list = cls.get_link_info(req.tool_list, req.flow_list,
req.knowledge_list)
cls.update_assistant_hook(request, login_user, assistant)
return resp_200(data=AssistantInfo(**assistant.dict(),
tool_list=tool_list,
flow_list=flow_list,
knowledge_list=knowledge_list))
- 功能:更新助手的信息,包括基本信息和关联的工具、技能、知识库等。
- 逻辑:
- 获取助手对象:
- 如果助手不存在,返回错误响应。
- 权限检查:
- 调用
cls.check_update_permission
检查是否有权限更新助手。
- 调用
- 更新助手数据:
- 检查名称是否有变化,且是否重复。
- 更新助手的描述、Logo、提示词、引导词、引导问题、模型名称、温度等属性。
- 更新助手的更新时间。
- 调用
AssistantDao.update_assistant
保存更新。
- 更新关联信息:
- 更新助手关联的工具、技能、知识库。
- 获取关联对象的详细信息:
- 调用
cls.get_link_info
获取关联的工具、技能、知识库的详细信息。
- 调用
- 执行更新后的钩子函数:
- 调用
cls.update_assistant_hook
,执行额外的逻辑。
- 调用
- 返回结果:
- 使用
AssistantInfo
数据模型,返回更新后的助手信息。
- 使用
- 获取助手对象:
update_assistant_hook
方法
@classmethod
def update_assistant_hook(cls, request: Request, login_user: UserPayload, assistant: Assistant) -> bool:
""" 更新助手的钩子 """
logger.info(f"delete_assistant_hook id: {assistant.id}, user: {login_user.user_id}")
# 写入审计日志
AuditLogService.update_build_assistant(login_user, get_request_ip(request), assistant.id.hex)
# 写入缓存
cls.get_logo_share_link(assistant.logo)
return True
- 功能:更新助手后的钩子函数,执行额外的业务逻辑。
- 逻辑:
- 记录审计日志:
- 调用
AuditLogService.update_build_assistant
记录更新助手的操作日志。
- 调用
- 缓存助手 Logo:
- 调用
cls.get_logo_share_link
,缓存助手的 Logo。
- 调用
- 记录审计日志:
update_status
方法
@classmethod
async def update_status(cls, request: Request, login_user: UserPayload, assistant_id: UUID,
status: int) -> UnifiedResponseModel:
""" 更新助手的状态 """
assistant = AssistantDao.get_one_assistant(assistant_id)
if not assistant:
return AssistantNotExistsError.return_resp()
# 判断权限
if not login_user.access_check(assistant.user_id, assistant.id.hex, AccessType.ASSISTANT_WRITE):
return UnAuthorizedError.return_resp()
# 状态相等不做改动
if assistant.status == status:
return resp_200()
# 尝试初始化 agent,初始化成功则上线,否则不上线
if status == AssistantStatus.ONLINE.value:
tmp_agent = AssistantAgent(assistant, '')
try:
await tmp_agent.init_assistant()
except Exception as e:
logger.exception('online agent init failed')
return AssistantInitError.return_resp('助手编译报错:' + str(e))
assistant.status = status
AssistantDao.update_assistant(assistant)
cls.update_assistant_hook(request, login_user, assistant)
return resp_200()
- 功能:更新助手的状态,例如上线或下线。
- 逻辑:
- 获取助手对象和权限检查。
- 状态是否需要更新:
- 如果状态未改变,直接返回成功响应。
- 上线操作:
- 如果要将助手上线,尝试初始化助手代理
AssistantAgent
,如果初始化失败,返回错误响应。
- 如果要将助手上线,尝试初始化助手代理
- 更新状态:
- 更新助手的状态,并保存到数据库。
- 执行更新后的钩子函数。
- 返回结果。
其他方法
由于篇幅限制,其他方法的详细解析可按上述示例自行理解。它们的主要功能包括:
- 更新助手的提示词、技能列表、工具列表。
- 获取用户可见的工具列表,支持预置和自定义工具。
- 添加、更新、删除自定义工具类别和工具。
- 检查更新助手的权限,避免未授权的操作。
- 自动生成助手的提示词、描述、工具和技能选择。
3. 辅助方法和工具
check_update_permission
方法
@classmethod
def check_update_permission(cls, assistant: Assistant, user_payload: UserPayload) -> Any:
# 判断权限
if not user_payload.access_check(assistant.user_id, assistant.id.hex, AccessType.ASSISTANT_WRITE):
return UnAuthorizedError.return_resp()
# 已上线不允许改动
if assistant.status == AssistantStatus.ONLINE.value:
return AssistantNotEditError.return_resp()
return None
- 功能:检查用户是否有权限更新助手,且助手是否允许编辑。
- 逻辑:
- 权限检查:
- 检查用户是否有写权限。
- 状态检查:
- 如果助手已上线,不允许编辑,返回错误响应。
- 返回结果:
- 如果没有问题,返回
None
。
- 如果没有问题,返回
- 权限检查:
get_link_info
方法
@classmethod
def get_link_info(cls,
tool_list: List[int],
flow_list: List[str],
knowledge_list: List[int] = None):
tool_list = GptsToolsDao.get_list_by_ids(tool_list) if tool_list else []
flow_list = FlowDao.get_flow_by_ids(flow_list) if flow_list else []
knowledge_list = KnowledgeDao.get_list_by_ids(knowledge_list) if knowledge_list else []
return tool_list, flow_list, knowledge_list
-
功能:根据 ID 列表,获取工具、流程(技能)、知识库的详细信息。
-
逻辑
:
- 如果对应的列表不为空,调用相应的 DAO 方法获取对象列表。
- 返回三个列表,分别对应工具、流程、知识库。
judge_name_repeat
方法
@classmethod
def judge_name_repeat(cls, name: str, user_id: int) -> bool:
""" 判断助手名字是否重复 """
assistant = AssistantDao.get_assistant_by_name_user_id(name, user_id)
if assistant:
return True
return False
- 功能:判断助手名称是否重复。
- 逻辑:
- 调用
AssistantDao.get_assistant_by_name_user_id
检查同一用户下是否有同名的助手。 - 如果存在,返回
True
,表示名称重复。
- 调用
get_auto_info
方法
@classmethod
async def get_auto_info(cls, assistant: Assistant) -> (Assistant, List[int], List[int]):
"""
自动生成助手的 prompt,自动选择工具和技能
return:助手信息,工具 ID 列表,技能 ID 列表
"""
# 初始化 agent
auto_agent = AssistantAgent(assistant, '')
await auto_agent.init_auto_update_llm()
# 自动生成描述
assistant.desc = auto_agent.generate_description(assistant.prompt)
return assistant, [], []
- 功能:自动生成助手的描述(
desc
)等信息,利用大型语言模型的能力。 - 逻辑:
- 初始化
AssistantAgent
。 - 调用
auto_agent.generate_description
生成描述。
- 初始化
4. 总结
这段代码实现了一个完整的助手管理服务,包括助手的创建、获取、更新、删除,以及工具、技能、权限的管理。它利用了大量的辅助类和方法,与数据库进行交互,处理了各种业务逻辑和异常情况。
如何在项目中使用
- API 路由:在 FastAPI 或其他框架中,定义相应的路由,将请求转发给
AssistantService
中的方法处理。 - 权限验证:利用
UserPayload
和权限检查方法,确保只有具有相应权限的用户才能执行特定操作。 - 异常处理:使用自定义的错误码和异常类,统一错误响应,方便前端处理。
- 日志记录:利用
loguru
记录关键操作和异常,方便调试和运维。 - 缓存优化:利用内存缓存
InMemoryCache
缓存频繁访问的数据,减少数据库压力。
进一步阅读
- FastAPI:了解 FastAPI 框架的使用方法,特别是依赖注入、请求和响应模型的定义。
- SQLAlchemy:如果数据库交互使用了 ORM,如 SQLAlchemy,学习其模型定义和查询方式。
- 权限管理:深入理解基于角色的访问控制(RBAC),以及如何在代码中实现权限验证。
- 异步编程:如果代码中涉及异步操作,学习 Python 的异步编程模型(
async
/await
)。
常见问题
- 如何处理并发请求?
- 使用异步编程和协程,确保服务能够高效处理并发请求。
- 如何扩展服务功能?
- 遵循面向对象的设计原则,添加新的方法和类,保持代码的可维护性和可扩展性。
- 如何确保数据安全?
- 在处理用户输入时,防止 SQL 注入和其他安全漏洞。
- 在权限验证中,严格检查用户的权限,避免越权操作。
- 如何优化性能?
- 使用缓存减少数据库访问。
- 优化数据库查询,使用索引和高效的查询语句。
- 如何处理错误和异常?
- 使用统一的错误码和异常类,捕获并处理各种可能的异常情况。
- 在关键的地方添加日志记录,方便排查问题。
AssistantService类
AssistantService
类是应用程序中管理 助手(Assistant) 的核心服务类。它处理与助手相关的各种操作,包括创建、获取、更新、删除助手,以及管理助手关联的工具、技能、权限等。该类继承了两个父类:
BaseService
:服务的基类,可能提供了一些通用的服务功能。AssistantUtils
:助手工具类,包含了一些辅助方法,如获取用户名称、判断名称是否重复等。
下面,我们将深入分析 AssistantService
类的各个方法,了解其功能、参数和实现逻辑。
class AssistantService(BaseService, AssistantUtils):
UserCache: InMemoryCache = InMemoryCache()
- 属性:
UserCache
:一个内存缓存实例,用于缓存用户信息,避免频繁访问数据库。
方法列表
get_assistant
:获取助手列表。return_simple_assistant_info
:将助手模型转换为简化的助手信息。get_assistant_info
:获取单个助手的详细信息。create_assistant
:创建新的助手。create_assistant_hook
:创建助手后的钩子函数。delete_assistant
:删除助手。delete_assistant_hook
:删除助手后的钩子函数。update_assistant
:更新助手信息。update_assistant_hook
:更新助手后的钩子函数。update_status
:更新助手的状态。update_prompt
:更新助手的提示词。update_flow_list
:更新助手的技能列表。get_gpts_tools
:获取用户可见的工具列表。update_tool_config
:更新工具配置。add_gpts_tools
:添加自定义工具。add_gpts_tools_hook
:添加自定义工具后的钩子函数。update_gpts_tools
:更新工具类别。delete_gpts_tools
:删除工具类别。delete_gpts_tool_hook
:删除自定义工具后的钩子函数。update_tool_list
:更新助手的工具列表。check_update_permission
:检查是否有权限更新助手。get_link_info
:获取助手关联的工具、流程和知识库信息。get_user_name
:获取用户的用户名。judge_name_repeat
:判断助手名称是否重复。get_auto_info
:自动生成助手的提示词、描述等信息。get_auto_tool_info
:自动选择工具。get_auto_flow_info
:自动选择技能。
下面,我将详细解释其中的一些关键方法。
1. get_assistant
@classmethod
def get_assistant(cls, user: UserPayload, name: str = None, status: int | None = None, tag_id: int | None = None, page: int = 1, limit: int = 20) -> UnifiedResponseModel[List[AssistantSimpleInfo]]:
"""
获取助手列表
"""
assistant_ids = []
if tag_id:
ret = TagDao.get_resources_by_tags([tag_id], ResourceTypeEnum.ASSISTANT)
assistant_ids = [UUID(one.resource_id) for one in ret]
if not assistant_ids:
return resp_200(data={'data': [], 'total': 0})
data = []
if user.is_admin():
res, total = AssistantDao.get_all_assistants(name, page, limit, assistant_ids, status)
else:
# 权限管理可见的助手信息
assistant_ids_extra = []
user_role = UserRoleDao.get_user_roles(user.user_id)
if user_role:
role_ids = [role.role_id for role in user_role]
role_access = RoleAccessDao.get_role_access(role_ids, AccessType.ASSISTANT_READ)
if role_access:
assistant_ids_extra = [UUID(access.third_id).hex for access in role_access]
res, total = AssistantDao.get_assistants(user.user_id, name, assistant_ids_extra, status, page, limit, assistant_ids)
assistant_ids = [one.id.hex for one in res]
# 查询助手所属的分组
assistant_groups = GroupResourceDao.get_resources_group(ResourceTypeEnum.ASSISTANT, assistant_ids)
assistant_group_dict = {}
for one in assistant_groups:
if one.third_id not in assistant_group_dict:
assistant_group_dict[one.third_id] = []
assistant_group_dict[one.third_id].append(one.group_id)
# 获取助手关联的标签
flow_tags = TagDao.get_tags_by_resource(ResourceTypeEnum.ASSISTANT, assistant_ids)
for one in res:
one.logo = cls.get_logo_share_link(one.logo)
simple_assistant = cls.return_simple_assistant_info(one)
if one.user_id == user.user_id or user.is_admin():
simple_assistant.write = True
simple_assistant.group_ids = assistant_group_dict.get(one.id.hex, [])
simple_assistant.tags = flow_tags.get(one.id.hex, [])
data.append(simple_assistant)
return resp_200(data={'data': data, 'total': total})
功能:获取助手列表,根据用户权限、名称、状态、标签等进行筛选和分页。
参数:
user
:当前登录的用户信息(UserPayload
)。name
:可选,助手名称的搜索关键字,支持模糊匹配。status
:可选,助手的状态(在线、离线等)。tag_id
:可选,标签 ID,用于按标签筛选助手。page
:分页页码,默认为 1。limit
:每页显示的助手数量,默认为 20。
逻辑:
- 根据标签筛选:
- 如果提供了
tag_id
,通过TagDao.get_resources_by_tags
获取具有该标签的助手 ID 列表。 - 如果没有助手匹配该标签,直接返回空的结果集。
- 如果提供了
- 获取助手列表:
- 管理员用户:
- 调用
AssistantDao.get_all_assistants
获取所有助手。
- 调用
- 普通用户:
- 获取用户的角色,通过
UserRoleDao
。 - 获取用户角色具有读取权限的助手 ID 列表,通过
RoleAccessDao
。 - 调用
AssistantDao.get_assistants
获取用户有权限的助手列表。
- 获取用户的角色,通过
- 管理员用户:
- 获取助手的分组和标签:
- 通过
GroupResourceDao.get_resources_group
获取助手所属的分组信息。 - 通过
TagDao.get_tags_by_resource
获取助手关联的标签。
- 通过
- 组装返回数据:
- 遍历助手列表,调用
cls.return_simple_assistant_info
转换为简化的助手信息。 - 判断用户是否有编辑权限,设置
write
属性。 - 添加助手的分组和标签信息。
- 遍历助手列表,调用
- 返回结果:
- 使用
resp_200
返回包含助手列表和总数的响应。
- 使用
2. return_simple_assistant_info
@classmethod
def return_simple_assistant_info(cls, one: Assistant) -> AssistantSimpleInfo:
"""
将数据库的助手model简化处理后成返回前端的格式
"""
simple_dict = one.model_dump(include={
'id', 'name', 'desc', 'logo', 'status', 'user_id', 'create_time', 'update_time'
})
simple_dict['user_name'] = cls.get_user_name(one.user_id)
return AssistantSimpleInfo(**simple_dict)
功能:将数据库中的助手模型转换为简化的助手信息,便于前端显示。
逻辑:
- 使用
model_dump
方法提取需要的字段。 - 获取用户的用户名,调用
cls.get_user_name(one.user_id)
。 - 创建并返回
AssistantSimpleInfo
实例。
3. get_assistant_info
@classmethod
def get_assistant_info(cls, assistant_id: UUID, login_user: UserPayload):
assistant = AssistantDao.get_one_assistant(assistant_id)
if not assistant:
return AssistantNotExistsError.return_resp()
# 检查是否有权限获取信息
if not login_user.access_check(assistant.user_id, assistant.id.hex, AccessType.ASSISTANT_READ):
return UnAuthorizedError.return_resp()
tool_list = []
flow_list = []
knowledge_list = []
links = AssistantLinkDao.get_assistant_link(assistant_id)
for one in links:
if one.tool_id:
tool_list.append(one.tool_id)
elif one.knowledge_id:
knowledge_list.append(one.knowledge_id)
elif one.flow_id:
flow_list.append(one.flow_id)
else:
logger.error(f'not expect link info: {one.dict()}')
tool_list, flow_list, knowledge_list = cls.get_link_info(tool_list, flow_list, knowledge_list)
assistant.logo = cls.get_logo_share_link(assistant.logo)
return resp_200(data=AssistantInfo(**assistant.dict(), tool_list=tool_list, flow_list=flow_list, knowledge_list=knowledge_list))
功能:获取单个助手的详细信息,包括其关联的工具、技能和知识库。
逻辑:
- 权限检查:
- 获取助手对象,如果不存在,返回错误响应。
- 调用
login_user.access_check
检查用户是否有读取权限。
- 获取关联信息:
- 通过
AssistantLinkDao.get_assistant_link
获取助手的关联信息,包括工具、技能、知识库的 ID 列表。 - 调用
cls.get_link_info
获取关联对象的详细信息。
- 通过
- 处理助手 Logo:
- 调用
cls.get_logo_share_link
获取助手 Logo 的共享链接。
- 调用
- 返回结果:
- 使用
AssistantInfo
数据模型,组装助手的详细信息,返回成功响应。
- 使用
4. create_assistant
@classmethod
async def create_assistant(cls, request: Request, login_user: UserPayload, assistant: Assistant) -> UnifiedResponseModel[AssistantInfo]:
"""
创建助手
"""
# 检查下是否有重名
if cls.judge_name_repeat(assistant.name, assistant.user_id):
return AssistantNameRepeatError.return_resp()
logger.info(f"assistant original prompt id: {assistant.id}, desc: {assistant.prompt}")
# 自动补充默认的模型配置
assistant_llm = LLMService.get_assistant_llm()
if assistant_llm.llm_list:
for one in assistant_llm.llm_list:
if one.default:
assistant.model_name = one.model_id
break
# 自动生成描述
assistant, _, _ = await cls.get_auto_info(assistant)
assistant = AssistantDao.create_assistant(assistant)
cls.create_assistant_hook(request, assistant, login_user)
return resp_200(data=AssistantInfo(**assistant.dict(), tool_list=[], flow_list=[], knowledge_list=[]))
功能:创建新的助手。
逻辑:
- 名称重复检查:
- 调用
cls.judge_name_repeat
检查助手名称是否重复,如果重复,返回错误响应。
- 调用
- 设置默认模型配置:
- 调用
LLMService.get_assistant_llm
获取可用的语言模型列表。 - 如果存在默认模型,设置助手的
model_name
。
- 调用
- 自动生成描述:
- 调用
cls.get_auto_info
,自动生成助手的描述(desc
)。
- 调用
- 创建助手:
- 调用
AssistantDao.create_assistant
将助手保存到数据库。
- 调用
- 执行创建后的钩子函数:
- 调用
cls.create_assistant_hook
,执行一些额外的逻辑,如关联用户组、记录审计日志等。
- 调用
- 返回结果:
- 使用
AssistantInfo
数据模型,返回创建的助手信息。
- 使用
5. create_assistant_hook
@classmethod
def create_assistant_hook(cls, request: Request, assistant: Assistant, user_payload: UserPayload) -> bool:
"""
创建助手成功后的hook,执行一些其他业务逻辑
"""
# 查询下用户所在的用户组
user_group = UserGroupDao.get_user_group(user_payload.user_id)
if user_group:
# 批量将助手资源插入到关联表里
batch_resource = []
for one in user_group:
batch_resource.append(GroupResource(
group_id=one.group_id,
third_id=assistant.id.hex,
type=ResourceTypeEnum.ASSISTANT.value))
GroupResourceDao.insert_group_batch(batch_resource)
# 写入审计日志
AuditLogService.create_build_assistant(user_payload, get_request_ip(request), assistant.id.hex)
# 写入logo缓存
cls.get_logo_share_link(assistant.logo)
return True
功能:创建助手后的钩子函数,执行额外的业务逻辑。
逻辑:
- 关联用户组:
- 获取用户所属的用户组。
- 将助手资源与用户组关联,批量插入关联关系。
- 记录审计日志:
- 调用
AuditLogService.create_build_assistant
记录创建助手的操作日志。
- 调用
- 缓存助手 Logo:
- 调用
cls.get_logo_share_link
,缓存助手的 Logo。
- 调用
6. delete_assistant
@classmethod
def delete_assistant(cls, request: Request, login_user: UserPayload, assistant_id: UUID) -> UnifiedResponseModel:
assistant = AssistantDao.get_one_assistant(assistant_id)
if not assistant:
return AssistantNotExistsError.return_resp()
# 判断授权
if not login_user.access_check(assistant.user_id, assistant.id.hex, AccessType.ASSISTANT_WRITE):
return UnAuthorizedError.return_resp()
AssistantDao.delete_assistant(assistant)
cls.delete_assistant_hook(request, login_user, assistant)
return resp_200()
功能:删除助手。
逻辑:
- 获取助手对象:
- 如果助手不存在,返回错误响应。
- 权限检查:
- 检查用户是否有删除权限。
- 删除助手:
- 调用
AssistantDao.delete_assistant
删除助手。
- 调用
- 执行删除后的钩子函数:
- 调用
cls.delete_assistant_hook
,执行额外的逻辑。
- 调用
- 返回结果:
- 返回成功响应。
7. delete_assistant_hook
@classmethod
def delete_assistant_hook(cls, request: Request, login_user: UserPayload, assistant: Assistant) -> bool:
""" 清理关联的助手资源 """
logger.info(f"delete_assistant_hook id: {assistant.id}, user: {login_user.user_id}")
# 写入审计日志
AuditLogService.delete_build_assistant(login_user, get_request_ip(request), assistant.id.hex)
# 清理和用户组的关联
GroupResourceDao.delete_group_resource_by_third_id(assistant.id.hex, ResourceTypeEnum.ASSISTANT)
return True
功能:删除助手后的钩子函数,执行清理工作。
逻辑:
- 记录审计日志:
- 调用
AuditLogService.delete_build_assistant
记录删除助手的操作日志。
- 调用
- 清理资源关联:
- 删除助手与用户组的关联关系。
8. update_assistant
@classmethod
async def update_assistant(cls, request: Request, login_user: UserPayload, req: AssistantUpdateReq) -> UnifiedResponseModel[AssistantInfo]:
""" 更新助手信息 """
assistant = AssistantDao.get_one_assistant(req.id)
if not assistant:
return AssistantNotExistsError.return_resp()
check_result = cls.check_update_permission(assistant, login_user)
if check_result is not None:
return check_result
# 更新助手数据
if req.name and req.name != assistant.name:
# 检查下是否有重名
if cls.judge_name_repeat(req.name, assistant.user_id):
return AssistantNameRepeatError.return_resp()
assistant.name = req.name
assistant.desc = req.desc
assistant.logo = req.logo
assistant.prompt = req.prompt
assistant.guide_word = req.guide_word
assistant.guide_question = req.guide_question
assistant.model_name = req.model_name
assistant.temperature = req.temperature
assistant.update_time = datetime.now()
AssistantDao.update_assistant(assistant)
# 更新助手关联信息
if req.tool_list is not None:
AssistantLinkDao.update_assistant_tool(assistant.id, tool_list=req.tool_list)
if req.flow_list is not None:
AssistantLinkDao.update_assistant_flow(assistant.id, flow_list=req.flow_list)
if req.knowledge_list is not None:
AssistantLinkDao.update_assistant_knowledge(assistant.id, knowledge_list=req.knowledge_list, flow_id='')
tool_list, flow_list, knowledge_list = cls.get_link_info(req.tool_list, req.flow_list, req.knowledge_list)
cls.update_assistant_hook(request, login_user, assistant)
return resp_200(data=AssistantInfo(**assistant.dict(), tool_list=tool_list, flow_list=flow_list, knowledge_list=knowledge_list))
功能:更新助手的信息,包括基本信息和关联的工具、技能、知识库等。
逻辑:
- 获取助手对象:
- 如果助手不存在,返回错误响应。
- 权限检查:
- 调用
cls.check_update_permission
检查是否有权限更新助手。
- 调用
- 更新助手数据:
- 检查名称是否有变化,且是否重复。
- 更新助手的描述、Logo、提示词、引导词、引导问题、模型名称、温度等属性。
- 更新助手的更新时间。
- 调用
AssistantDao.update_assistant
保存更新。
- 更新关联信息:
- 更新助手关联的工具、技能、知识库。
- 获取关联对象的详细信息:
- 调用
cls.get_link_info
获取关联的工具、技能、知识库的详细信息。
- 调用
- 执行更新后的钩子函数:
- 调用
cls.update_assistant_hook
,执行额外的逻辑。
- 调用
- 返回结果:
- 使用
AssistantInfo
数据模型,返回更新后的助手信息。
- 使用
9. update_assistant_hook
@classmethod
def update_assistant_hook(cls, request: Request, login_user: UserPayload, assistant: Assistant) -> bool:
""" 更新助手的钩子 """
logger.info(f"update_assistant_hook id: {assistant.id}, user: {login_user.user_id}")
# 写入审计日志
AuditLogService.update_build_assistant(login_user, get_request_ip(request), assistant.id.hex)
# 写入缓存
cls.get_logo_share_link(assistant.logo)
return True
功能:更新助手后的钩子函数,执行额外的业务逻辑。
逻辑:
- 记录审计日志:
- 调用
AuditLogService.update_build_assistant
记录更新助手的操作日志。
- 调用
- 缓存助手 Logo:
- 调用
cls.get_logo_share_link
,缓存助手的 Logo。
- 调用
10. update_status
@classmethod
async def update_status(cls, request: Request, login_user: UserPayload, assistant_id: UUID, status: int) -> UnifiedResponseModel:
""" 更新助手的状态 """
assistant = AssistantDao.get_one_assistant(assistant_id)
if not assistant:
return AssistantNotExistsError.return_resp()
# 判断权限
if not login_user.access_check(assistant.user_id, assistant.id.hex, AccessType.ASSISTANT_WRITE):
return UnAuthorizedError.return_resp()
# 状态相等不做改动
if assistant.status == status:
return resp_200()
# 尝试初始化agent,初始化成功则上线、否则不上线
if status == AssistantStatus.ONLINE.value:
tmp_agent = AssistantAgent(assistant, '')
try:
await tmp_agent.init_assistant()
except Exception as e:
logger.exception('online agent init failed')
return AssistantInitError.return_resp('助手编译报错:' + str(e))
assistant.status = status
AssistantDao.update_assistant(assistant)
cls.update_assistant_hook(request, login_user, assistant)
return resp_200()
功能:更新助手的状态,例如上线或下线。
逻辑:
- 获取助手对象和权限检查:
- 获取助手对象,如果不存在,返回错误响应。
- 检查用户是否有写权限。
- 状态是否需要更新:
- 如果状态未改变,直接返回成功响应。
- 上线操作:
- 如果要将助手上线,尝试初始化助手代理
AssistantAgent
,如果初始化失败,返回错误响应。
- 如果要将助手上线,尝试初始化助手代理
- 更新状态:
- 更新助手的状态,并保存到数据库。
- 执行更新后的钩子函数:
- 调用
cls.update_assistant_hook
,执行额外的逻辑。
- 调用
- 返回结果:
- 返回成功响应。
其他方法
由于篇幅限制,其他方法的详细解析我就不一一展开了。但总体来说,这些方法的功能包括:
- 更新助手的提示词、技能列表、工具列表。
- 获取用户可见的工具列表,支持预置和自定义工具。
- 添加、更新、删除自定义工具类别和工具。
- 检查更新助手的权限,避免未授权的操作。
- 自动生成助手的提示词、描述、工具和技能选择。
总结
AssistantService
类是应用程序中负责助手管理的核心类。它提供了丰富的方法,涵盖了助手的创建、读取、更新、删除(CRUD)操作,以及与工具、技能、权限等相关的管理。
- 权限管理:通过
user_payload.access_check
方法,在每个需要权限验证的地方,确保用户有相应的权限执行操作。 - 审计日志:在关键操作后,调用
AuditLogService
记录用户的操作,便于后续的审计和追踪。 - 缓存优化:使用
InMemoryCache
缓存用户信息,减少对数据库的频繁访问,提高性能。 - 自动化功能:利用
AssistantAgent
和LLMService
,实现了自动生成助手描述、自动选择工具和技能等智能化功能。 - 扩展性:通过钩子函数(如
create_assistant_hook
、delete_assistant_hook
等),在主要操作后执行额外的业务逻辑,增强了代码的可扩展性和可维护性。
架构
项目总体结构
项目名为 bisheng
,包含以下主要部分:
.github/workflows
:用于CI/CD的GitHub Actions配置。docker
:包含Docker相关的配置,用于容器化部署。src
:源代码目录,包含后端、前端和其他组件。backend
:后端代码,主要由Python编写。bisheng-langchain
:与后端相关的子模块,可能是对LangChain库的扩展或定制。frontend
:前端代码。
后端架构概览
后端主要位于 src/backend/bisheng
目录下,采用了模块化的架构设计,按照功能划分了多个子模块。这种设计有助于代码的维护和扩展。
使用的技术栈
- 编程语言:Python
- Web框架:可能使用了 FastAPI 或 Django(根据文件结构,更可能是 FastAPI)
- 数据库:可能使用了 SQLAlchemy 作为ORM,数据库可能是 MySQL 或 PostgreSQL
- 缓存:Redis
- 消息队列:可能使用了 Celery 进行任务队列和异步任务处理
- 语言模型集成:与 LangChain 库集成,用于处理大型语言模型(LLM)的相关功能
- 容器化:使用 Docker 进行部署
详细模块解析
逐一解释 src/backend/bisheng
目录下的主要模块及其作用。
1. api
模块
- 路径:
bisheng/api
- 功能:负责定义后端的 API 接口,包括请求处理、响应格式、错误码等。
- 子模块:
errcode
:定义错误码和异常类,统一管理错误响应。services
:API层使用的服务类,处理业务逻辑。v1
和v2
:API 的不同版本,方便接口的升级和维护。schema
:定义请求和响应的数据模型,使用 Pydantic 进行数据校验。
2. cache
模块
- 路径:
bisheng/cache
- 功能:实现缓存功能,可能使用 Redis 或内存缓存,加速数据的读取和减少数据库压力。
3. chat
模块
- 路径:
bisheng/chat
- 功能:处理与聊天相关的逻辑,例如聊天记录管理、消息处理、会话管理等。
4. components
模块
- 路径:
bisheng/components
- 功能:封装可复用的组件,提供通用的功能。
- 子模块:
custom_components
:自定义的组件,实现特定的功能需求。
5. core
模块
- 路径:
bisheng/core
- 功能:核心功能模块,可能包含项目的主要业务逻辑和关键算法。
6. custom
模块
- 路径:
bisheng/custom
- 功能:自定义的功能模块,可能包含特殊的业务需求或对第三方库的定制。
7. database
模块
- 路径:
bisheng/database
- 功能:与数据库交互的模块,定义了数据模型和数据库操作。
- 子模块:
data
:可能包含数据库初始化、数据迁移或样本数据。models
:定义数据库的模型类,使用 ORM(如 SQLAlchemy)映射数据库表。
8. field_typing
模块
- 路径:
bisheng/field_typing
- 功能:定义字段的类型,用于数据模型和校验。
9. graph
模块
- 路径:
bisheng/graph
- 功能:处理图相关的功能,可能涉及节点(vertex)、边(edge)和图(graph)的操作。
- 子模块:
edge
:处理图中的边。vertex
:处理图中的节点。graph
:整体的图结构和算法。
10. interface
模块
- 路径:
bisheng/interface
- 功能:定义各种接口和抽象基类,为不同的组件提供统一的接口规范。
- 子模块:
agents
:智能体相关的接口。autogenRole
:自动生成角色的接口。chains
:处理链式调用的接口。custom
:自定义接口。code_parser
:代码解析器。custom_component
:自定义组件接口。directory_reader
:目录读取器。
document_loaders
:文档加载器接口。embeddings
:词嵌入相关的接口。llms
:大型语言模型的接口。memories
:记忆模块的接口。prompts
:提示词相关的接口。retrievers
:检索器接口。toolkits
:工具包接口。utilities
:通用工具的接口。vector_store
:向量存储的接口。
11. processing
模块
- 路径:
bisheng/processing
- 功能:数据处理模块,可能包含数据预处理、清洗、转换等功能。
12. restructure
模块
- 路径:
bisheng/restructure
- 功能:重构或优化代码结构的模块。
- 子模块:
assistants
:助手相关的重构代码。
13. script
模块
- 路径:
bisheng/script
- 功能:包含脚本文件,用于执行特定任务,如数据迁移、批处理任务等。
14. services
模块
- 路径:
bisheng/services
- 功能:核心业务逻辑层,封装了具体的服务。
- 子模块:
auth
:认证与授权服务,处理用户登录、权限验证等。cache
:缓存服务,提供缓存操作的封装。chat
:聊天服务,处理聊天相关的业务逻辑。credentials
:凭证管理服务。database
:数据库服务,提供数据库操作的封装。models
:数据库模型。api_key
:API 密钥管理。component
:组件模型。credential
:凭证模型。flow
:流程模型。user
:用户模型。
plugins
:插件管理服务。session
:会话管理服务。settings
:配置管理服务。store
:存储服务,可能涉及文件或对象存储。task
:任务管理服务。backends
:任务后端,实现任务的具体执行。
15. template
模块
- 路径:
bisheng/template
- 功能:模板相关的模块,用于生成动态内容。
- 子模块:
field
:字段模板。frontend_node
:前端节点模板。formatter
:格式化器,用于格式化输出内容。
template
:通用模板。
16. utils
模块
- 路径:
bisheng/utils
- 功能:工具模块,包含各种辅助函数和通用工具类。
17. worker
模块
- 路径:
bisheng/worker
- 功能:后台任务处理模块,可能使用 Celery 等框架处理异步任务。
- 子模块:
knowledge
:与知识库相关的任务处理。
bisheng-langchain 模块
路径:src/bisheng-langchain
这个模块可能是对 LangChain 库的定制或扩展,LangChain 是一个用于构建大型语言模型应用的库。
主要模块
agents
:智能体模块,定义了不同类型的智能体。chatglm_functions_agent
:ChatGLM 模型的功能智能体。llm_functions_agent
:通用的 LLM 功能智能体。
chains
:链式调用模块,处理复杂的链式任务。autogen
:自动生成相关的链。combine_documents
:文档合并链。conversational_retrieval
:对话式检索链。question_answering
:问答链。
document_loaders
:文档加载器,处理不同类型文档的加载和解析。embeddings
:词嵌入模块,处理向量化表示。gpts
:GPT 模型相关的模块。tools
:GPT 工具集,包括计算器、搜索、代码解释器等。
memory
:记忆模块,处理会话和上下文记忆。rag
:检索增强生成(Retrieval-Augmented Generation)模块。retrievers
:检索器,实现高效的信息检索。
其他重要模块
docker
:包含 Docker 部署相关的配置文件,如 Dockerfile、docker-compose 等。test
:测试代码,包含单元测试、集成测试等,确保代码的质量和稳定性。
后端与其他组件的交互
- 数据库:通过 ORM(如 SQLAlchemy)与数据库交互,进行数据的存储和读取。
- 缓存:使用 Redis 进行缓存,加速数据访问。
- 外部 API:通过封装的服务或工具模块,与外部的 API(如 OpenAI、ChatGPT 等)进行交互。
- 前端:后端提供 API 接口,供前端调用,实现数据的展示和交互。
设计模式和架构选择
- 模块化设计:按照功能划分模块,增强代码的可维护性和可扩展性。
- 服务层(Service Layer):业务逻辑集中在服务层,控制器层负责处理请求并调用服务层。
- 数据访问层(DAO):封装了数据库操作,提供统一的接口。
- 依赖注入:使用依赖注入管理组件和服务之间的依赖关系,提升代码的灵活性。
- 异步编程:可能使用了 Python 的
asyncio
库,实现异步的 I/O 操作,提高性能。