InternLM (书生·浦语) Practical Camp: Building Agent Applications with Lagent & AgentLego
Introduction to Lagent
Lagent is a lightweight open-source agent framework designed to let users build LLM-based agents efficiently. It also ships a set of typical tools that extend what a large language model can do.
Lagent currently supports several classic agent paradigms, including AutoGPT and ReAct, and provides the following tools:
- Arxiv search
- Bing Maps
- Google Scholar search
- Google search
- Interactive IPython interpreter
- IPython interpreter
- PPT
- Python interpreter
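To make this concrete, here is a minimal sketch of assembling a ReAct agent with one of these tools. The names used (ReAct, ActionExecutor, PythonInterpreter, HFTransformer) follow the Lagent docs, but treat the exact constructor signatures as assumptions to verify against the version pinned later in this tutorial:
# Minimal sketch: a ReAct agent with a Python interpreter tool.
# Constructor arguments are illustrative, not authoritative.
from lagent.actions import ActionExecutor, PythonInterpreter
from lagent.agents import ReAct
from lagent.llms import HFTransformer

llm = HFTransformer(path='internlm/internlm2-chat-7b')    # local HF model wrapper
executor = ActionExecutor(actions=[PythonInterpreter()])  # expose one tool to the agent
agent = ReAct(llm=llm, action_executor=executor)
print(agent.chat('Use Python to compute 2**32.').response)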
Introduction to AgentLego
AgentLego is a multimodal toolkit that exposes a variety of open-source tools through a common API. Like Lego bricks, it lets users quickly and easily add custom tools and assemble agents of their own. With AgentLego you can use the bundled tools directly, and you can also combine them, with the help of an agent framework (such as Lagent or Transformers Agents), to quickly build agents that extend an LLM's capabilities.
AgentLego currently provides tools covering visual perception, image generation and editing, speech, and more; see its documentation (linked in the installation section below) for the full catalog.
Lagent is an agent framework, whereas AgentLego is not an agent in itself: it is a toolkit that plugs into the tool-support layer of agent frameworks such as Lagent.
In other words, the agent framework drives the LLM's reasoning-and-acting loop, while AgentLego supplies the tools that loop can call.
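In code, this relationship typically shows up as an AgentLego tool being converted into a Lagent action. A minimal sketch, assuming AgentLego's load_tool API and the tool's to_lagent() adapter as described in its docs (verify both names against the pinned commit installed below):
# Sketch: wrap an AgentLego multimodal tool as a Lagent action.
from agentlego.apis import load_tool
from lagent.actions import ActionExecutor

image_caption = load_tool('ImageDescription', device='cuda')    # an AgentLego tool
executor = ActionExecutor(actions=[image_caption.to_lagent()])  # now callable from a Lagent agent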
Environment Setup
On the dev-machine creation page, select the Cuda12.2-conda image and a 30% slice of an A100 GPU.
Create a directory to hold all Agent-related files:
mkdir -p /root/agent
Set up the conda environment with:
studio-conda -t agent -o pytorch-2.1.2
Installing Lagent and AgentLego
Lagent and AgentLego can each be installed in two ways: directly via pip, or from source. To make it easy to run Lagent's Web Demo and AgentLego's WebUI, we install both from source. The relevant installation docs are:
Lagent:https://lagent.readthedocs.io/zh-cn/latest/get_started/install.html
AgentLego:https://agentlego.readthedocs.io/zh-cn/latest/get_started.html
Run the following commands to install them:
cd /root/agent
conda activate agent
git clone https://gitee.com/internlm/lagent.git
cd lagent && git checkout 581d9fb && pip install -e . && cd ..
git clone https://gitee.com/internlm/agentlego.git
cd agentlego && git checkout 7769e0d && pip install -e . && cd ..
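As a quick sanity check that both editable installs succeeded, the following Python snippet should print the installed versions:
from importlib.metadata import version
print(version('lagent'), version('agentlego'))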
Installing Other Dependencies
Install the other libraries we will need, such as LMDeploy:
conda activate agent
pip install lmdeploy==0.3.0
Download the Tutorial repository via git clone:
cd /root/agent
git clone -b camp2 https://gitee.com/internlm/Tutorial.git
Lagent: A Lightweight Agent Framework
In this part we try out Lagent's Web Demo, define a custom tool with Lagent, and see that custom tool in action.
For the full walkthrough, see the detailed documentation: https://github.com/InternLM/Tutorial/blob/camp2/agent/lagent.md .
Lagent Web Demo
Deploying with LMDeploy
Lagent's Web Demo talks to an api_server started by LMDeploy, so first run the following in a VS Code terminal to launch one:
conda activate agent
lmdeploy serve api_server /root/share/new_models/Shanghai_AI_Laboratory/internlm2-chat-7b \
--server-name 127.0.0.1 \
--model-name internlm2-chat-7b \
--cache-max-entry-count 0.1
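Once the api_server is up (it listens on port 23333 by default), you can smoke-test it from Python before starting the Web Demo. A minimal sketch against LMDeploy's OpenAI-compatible endpoint:
import requests

# LMDeploy's api_server exposes an OpenAI-compatible REST API on port 23333.
resp = requests.post(
    'http://127.0.0.1:23333/v1/chat/completions',
    json={
        'model': 'internlm2-chat-7b',
        'messages': [{'role': 'user', 'content': 'Hello'}],
    },
)
print(resp.json()['choices'][0]['message']['content'])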
Launching and Using the Lagent Web Demo
Next, open a new terminal and start the Lagent Web Demo by running:
conda activate agent
cd /root/agent/lagent/examples
streamlit run internlm2_agent_web_demo.py --server.address 127.0.0.1 --server.port 7860
Once both the LMDeploy api_server and the Lagent Web Demo have fully started, forward their ports to your local machine: 23333 for the api_server and 7860 for the Web Demo. On your local machine, run:
ssh -CNg -L 7860:127.0.0.1:7860 -L 23333:127.0.0.1:23333 root@ssh.intern-ai.org.cn -p 43844
Open http://localhost:7860 in a local browser to use the Lagent Web Demo. First enter 127.0.0.1:23333 as the model IP and press Enter to confirm, then select the ArxivSearch plugin so the model can search for papers on arxiv.
The demo is driven by three prompts, defined as the META_CN, INTERPRETER_CN, and PLUGIN_CN constants in the source listed below (the Chinese strings are kept verbatim, since they are the exact prompts sent to the model):
System prompt: 当开启工具以及代码时,根据需求选择合适的工具进行调用
Data-analysis prompt: 你现在已经能够在一个有状态的 Jupyter 笔记本环境中运行 Python 代码。当你向 python 发送含有 Python 代码的消息时,它将在该环境中执行。这个工具适用于多种场景,如数据分析或处理(包括数据操作、统计分析、图表绘制),复杂的计算问题(解决数学和物理难题),编程示例(理解编程概念或特性),文本处理和分析(比如文本解析和自然语言处理),机器学习和数据科学(用于展示模型训练和数据可视化),以及文件操作和数据导入(处理CSV、JSON等格式的文件)。
Plugin prompt:
你可以使用如下工具:
{prompt}
如果你已经获得足够信息,请直接给出答案. 避免不必要的工具调用! 同时注意你可以使用的工具,不要随意捏造!
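To see how the plugin prompt is assembled, note that Internlm2Protocol.format (in the source below) substitutes {prompt} with a JSON dump of each tool's description. A toy illustration with a hypothetical tool entry:
import json

PLUGIN_CN = ('你可以使用如下工具:'
             '\n{prompt}\n'
             '如果你已经获得足够信息,请直接给出答案. 避免不必要的工具调用! '
             '同时注意你可以使用的工具,不要随意捏造!')

# A hypothetical entry in the shape produced by get_actions_info().
tools = [{'name': 'ArxivSearch.get_arxiv_article_information',
          'description': 'Run Arxiv search and get the article meta information.'}]
print(PLUGIN_CN.format(prompt=json.dumps(tools, ensure_ascii=False, indent=4)))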
Enter “请帮我搜索 InternLM2 Technical Report” to have the model search for the InternLM2 technical report. The model correctly returns information about the InternLM2 technical report; it also lists a few other papers, but that comes from the behavior of the arxiv search API itself.
lagent/agents/internlm2_agent.py
import json
import logging
from copy import deepcopy
from typing import Dict, List, Optional, Union

from lagent.actions import ActionExecutor
from lagent.agents.base_agent import BaseAgent
from lagent.llms import BaseAPIModel, BaseModel
from lagent.schema import ActionReturn, ActionStatusCode, AgentReturn, AgentStatusCode, ModelStatusCode  # noqa: E501

API_PREFIX = (
    "This is the subfunction for tool '{tool_name}', you can use this tool. "
    'The description of this function is: \n{description}')

META_CN = ('当开启工具以及代码时,根据需求选择合适的工具进行调用')

INTERPRETER_CN = ('你现在已经能够在一个有状态的 Jupyter 笔记本环境中运行 Python 代码。'
                  '当你向 python 发送含有 Python 代码的消息时,它将在该环境中执行。'
                  '这个工具适用于多种场景,如数据分析或处理(包括数据操作、统计分析、图表绘制),'
                  '复杂的计算问题(解决数学和物理难题),编程示例(理解编程概念或特性),'
                  '文本处理和分析(比如文本解析和自然语言处理),'
                  '机器学习和数据科学(用于展示模型训练和数据可视化),'
                  '以及文件操作和数据导入(处理CSV、JSON等格式的文件)。')

PLUGIN_CN = ('你可以使用如下工具:'
             '\n{prompt}\n'
             '如果你已经获得足够信息,请直接给出答案. 避免不必要的工具调用! '
             '同时注意你可以使用的工具,不要随意捏造!')


class Internlm2Protocol:

    def __init__(
        self,
        meta_prompt: str = META_CN,
        interpreter_prompt: str = INTERPRETER_CN,
        plugin_prompt: str = PLUGIN_CN,
        few_shot: Optional[List] = None,
        language: Dict = dict(
            begin='',
            end='',
            belong='assistant',
        ),
        tool: Dict = dict(
            begin='{start_token}{name}\n',
            start_token='<|action_start|>',
            name_map=dict(plugin='<|plugin|>', interpreter='<|interpreter|>'),
            belong='assistant',
            end='<|action_end|>\n',
        ),
        execute: Dict = dict(
            role='execute', begin='', end='', fallback_role='environment'),
    ) -> None:
        self.meta_prompt = meta_prompt
        self.interpreter_prompt = interpreter_prompt
        self.plugin_prompt = plugin_prompt
        self.roles_cfg = dict(tool=tool, language=language)
        self.language = language
        self.execute = execute
        self.tool = tool
        self.few_shot = few_shot

    def format_sub_role(self, messages: List[Dict]) -> List[Dict]:

        def format_interpreter(message):
            if isinstance(message['content'], dict):
                # assert message['content']['name'] == 'IPythonInterpreter'
                return dict(
                    role=message['role'],
                    name=message['name'],
                    content=message['content']['parameters']['command'])
            else:
                return message

        def format_plugin(message):
            if isinstance(message['content'], dict):
                return dict(
                    role=message['role'],
                    name=message['name'],
                    content=json.dumps(message['content']))
            else:
                return message

        new_message = list()
        for message in messages:
            if message['role'] in [
                    'assistant', 'user', 'system', 'environment'
            ]:
                new_message.append(message)
                continue
            role_cfg = self.roles_cfg[message['role']]
            begin = role_cfg['begin']
            if message['role'] == 'tool':
                if message['name'] == 'interpreter':
                    message = format_interpreter(message)
                elif message['name'] == 'plugin':
                    message = format_plugin(message)
                else:
                    raise NotImplementedError
                begin = role_cfg['begin'].format(
                    start_token=role_cfg.get('start_token', ''),
                    name=role_cfg.get('name_map', {}).get(message['name'], ''))
            new_content = begin + message['content'] + role_cfg['end']
            if role_cfg.get('fallback_role'):
                new_message.append(
                    dict(role=role_cfg['fallback_role'], content=new_content))
            elif role_cfg.get('belong'):
                if new_message[-1]['role'] != role_cfg.get('belong'):
                    new_message.append(
                        dict(role=role_cfg.get('belong'), content=new_content))
                else:
                    new_message[-1]['content'] += new_content
            else:
                new_message.append(
                    dict(role=message['role'], content=new_content))
        return new_message

    def format(self,
               inner_step: List[Dict],
               plugin_executor: ActionExecutor = None,
               interpreter_executor: ActionExecutor = None,
               **kwargs) -> list:
        formatted = []
        if self.meta_prompt:
            formatted.append(dict(role='system', content=self.meta_prompt))
        if interpreter_executor and self.interpreter_prompt:
            interpreter_info = interpreter_executor.get_actions_info()[0]
            interpreter_prompt = self.interpreter_prompt.format(
                code_prompt=interpreter_info['description'])
            formatted.append(
                dict(
                    role='system',
                    content=interpreter_prompt,
                    name='interpreter'))
        if plugin_executor and plugin_executor.actions and self.plugin_prompt:
            plugin_descriptions = []
            for api_info in plugin_executor.get_actions_info():
                plugin = deepcopy(api_info)
                if isinstance(api_info, dict):
                    tool_name = api_info['name'].split('.')[0]
                    plugin['description'] = API_PREFIX.format(
                        tool_name=tool_name, description=plugin['description'])
                # only keep required parameters
                required_parameters = [
                    param for param in plugin['parameters']
                    if param['name'] in plugin['required']
                ]
                plugin['parameters'] = required_parameters
                plugin_descriptions.append(plugin)
            plugin_prompt = self.plugin_prompt.format(
                prompt=json.dumps(
                    plugin_descriptions, ensure_ascii=False, indent=4))
            formatted.append(
                dict(role='system', content=plugin_prompt, name='plugin'))
        if self.few_shot:
            for few_shot in self.few_shot:
                formatted += self.format_sub_role(few_shot)
        formatted += self.format_sub_role(inner_step)
        return formatted

    def parse(self, message, plugin_executor: ActionExecutor,
              interpreter_executor: ActionExecutor):
        if self.language['begin']:
            message = message.split(self.language['begin'])[-1]
        if self.tool['name_map']['plugin'] in message:
            message, action = message.split(
                f"{self.tool['start_token']}{self.tool['name_map']['plugin']}")
            action = action.split(self.tool['end'].strip())[0]
            return 'plugin', message, action
        if self.tool['name_map']['interpreter'] in message:
            message, code = message.split(
                f"{self.tool['start_token']}"
                f"{self.tool['name_map']['interpreter']}")
            code = code.split(self.tool['end'].strip())[0].strip()
            return 'interpreter', message, dict(
                name=interpreter_executor.action_names()[0],
                parameters=dict(
                    command=code)) if interpreter_executor else None
        return None, message.split(self.tool['start_token'])[0], None

    def format_response(self, action_return, name) -> dict:
        if action_return.state == ActionStatusCode.SUCCESS:
            response = action_return.format_result()
        else:
            response = str(action_return.errmsg)
        content = self.execute['begin'] + response + self.execute['end']
        if self.execute.get('fallback_role'):
            return dict(
                role=self.execute['fallback_role'], content=content, name=name)
        elif self.execute.get('belong'):
            return dict(
                role=self.execute['belong'], content=content, name=name)
        return dict(role=self.execute['role'], content=response, name=name)


class Internlm2Agent(BaseAgent):

    def __init__(self,
                 llm: Union[BaseModel, BaseAPIModel],
                 plugin_executor: ActionExecutor = None,
                 interpreter_executor: ActionExecutor = None,
                 protocol=Internlm2Protocol(),
                 max_turn: int = 3) -> None:
        self.max_turn = max_turn
        self._interpreter_executor = interpreter_executor
        super().__init__(
            llm=llm, action_executor=plugin_executor, protocol=protocol)

    def chat(self, message: Union[str, Dict], **kwargs) -> AgentReturn:
        if isinstance(message, str):
            message = dict(role='user', content=message)
        if isinstance(message, dict):
            message = [message]
        inner_history = message[:]
        offset = len(inner_history)
        agent_return = AgentReturn()
        for _ in range(self.max_turn):
            # list of dict
            prompt = self._protocol.format(
                inner_step=inner_history,
                plugin_executor=self._action_executor,
                interpreter_executor=self._interpreter_executor,
            )
            response = self._llm.chat(prompt, **kwargs)
            name, language, action = self._protocol.parse(
                message=response,
                plugin_executor=self._action_executor,
                interpreter_executor=self._interpreter_executor,
            )
            if name:
                if name == 'plugin':
                    if self._action_executor:
                        executor = self._action_executor
                    else:
                        logging.info(msg='No plugin is instantiated!')
                        continue
                    try:
                        action = json.loads(action)
                    except Exception as e:
                        logging.info(msg=f'Invalid action {e}')
                        continue
                elif name == 'interpreter':
                    if self._interpreter_executor:
                        executor = self._interpreter_executor
                    else:
                        logging.info(msg='No interpreter is instantiated!')
                        continue
                else:
                    logging.info(
                        msg=(f"Invalid name '{name}'. Currently only 'plugin' "
                             "and 'interpreter' are supported."))
                    continue
                action_return: ActionReturn = executor(action['name'],
                                                       action['parameters'])
                action_return.thought = language
                agent_return.actions.append(action_return)
            inner_history.append(dict(role='language', content=language))
            if not name or action_return.type == executor.finish_action.name:
                agent_return.response = language
                agent_return.state = AgentStatusCode.END
                break
            else:
                inner_history.append(
                    dict(role='tool', content=action, name=name))
                inner_history.append(
                    self._protocol.format_response(action_return, name=name))
        agent_return.inner_steps = inner_history[offset:]
        return agent_return

    def stream_chat(self, message: List[dict], **kwargs) -> AgentReturn:
        if isinstance(message, str):
            message = dict(role='user', content=message)
        if isinstance(message, dict):
            message = [message]
        inner_history = message[:]
        offset = len(inner_history)
        agent_return = AgentReturn()
        last_agent_state = AgentStatusCode.SESSION_READY
        for _ in range(self.max_turn):
            # list of dict
            prompt = self._protocol.format(
                inner_step=inner_history,
                plugin_executor=self._action_executor,
                interpreter_executor=self._interpreter_executor,
            )
            response = ''
            for model_state, res, _ in self._llm.stream_chat(prompt, **kwargs):
                model_state: ModelStatusCode
                response = res
                if model_state.value < 0:
                    agent_return.state = getattr(AgentStatusCode,
                                                 model_state.name)
                    yield deepcopy(agent_return)
                    return
                else:
                    name, language, action = self._protocol.parse(
                        message=response,
                        plugin_executor=self._action_executor,
                        interpreter_executor=self._interpreter_executor,
                    )
                    if name:
                        if model_state == ModelStatusCode.END:
                            agent_state = last_agent_state + 1
                            if name == 'plugin':
                                if self._action_executor:
                                    executor = self._action_executor
                                else:
                                    logging.info(
                                        msg='No plugin is instantiated!')
                                    continue
                                try:
                                    action = json.loads(action)
                                except Exception as e:
                                    logging.info(msg=f'Invalid action {e}')
                                    continue
                            elif name == 'interpreter':
                                if self._interpreter_executor:
                                    executor = self._interpreter_executor
                                else:
                                    logging.info(
                                        msg='No interpreter is instantiated!')
                                    continue
                            agent_return.state = agent_state
                            agent_return.response = action
                        else:
                            agent_state = (
                                AgentStatusCode.PLUGIN_START if name
                                == 'plugin' else AgentStatusCode.CODING)
                            if agent_state != last_agent_state:
                                # agent_return.state = agent_state
                                agent_return.response = language
                                yield deepcopy(agent_return)
                            agent_return.state = agent_state
                            agent_return.response = action
                    else:
                        agent_state = AgentStatusCode.STREAM_ING
                        agent_return.state = agent_state
                        agent_return.response = language
                    last_agent_state = agent_state
                    yield deepcopy(agent_return)
            if name:
                action_return: ActionReturn = executor(action['name'],
                                                       action['parameters'])
                action_return.thought = language
                agent_return.actions.append(action_return)
            inner_history.append(dict(role='language', content=language))
            if not name:
                agent_return.response = language
                break
            elif action_return.type == executor.finish_action.name:
                try:
                    response = action_return.args['text']['response']
                except Exception:
                    logging.info(msg='Unable to parse FinishAction.')
                    response = ''
                agent_return.response = response
                break
            else:
                inner_history.append(
                    dict(role='tool', content=action, name=name))
                inner_history.append(
                    self._protocol.format_response(action_return, name=name))
                agent_state += 1
                agent_return.state = agent_state
                yield agent_return
        agent_return.inner_steps = deepcopy(inner_history[offset:])
        agent_return.state = AgentStatusCode.END
        yield agent_return
def stream_chat(self, message: List[dict], **kwargs) -> AgentReturn:
if isinstance(message, str):
message = dict(role='user', content=message)
if isinstance(message, dict):
message = [message]
inner_history = message[:]
offset = len(inner_history)
agent_return = AgentReturn()
last_agent_state = AgentStatusCode.SESSION_READY
for _ in range(self.max_turn):
# list of dict
prompt = self._protocol.format(
inner_step=inner_history,
plugin_executor=self._action_executor,
interpreter_executor=self._interpreter_executor,
)
response = ''
for model_state, res, _ in self._llm.stream_chat(prompt, **kwargs):
model_state: ModelStatusCode
response = res
if model_state.value < 0:
agent_return.state = getattr(AgentStatusCode,
model_state.name)
yield deepcopy(agent_return)
return
else:
name, language, action = self._protocol.parse(
message=response,
plugin_executor=self._action_executor,
interpreter_executor=self._interpreter_executor,
)
if name:
if model_state == ModelStatusCode.END:
agent_state = last_agent_state + 1
if name == 'plugin':
if self._action_executor:
executor = self._action_executor
else:
logging.info(
msg='No plugin is instantiated!')
continue
try:
action = json.loads(action)
except Exception as e:
logging.info(msg=f'Invaild action {e}')
continue
elif name == 'interpreter':
if self._interpreter_executor:
executor = self._interpreter_executor
else:
logging.info(
msg='No interpreter is instantiated!')
continue
agent_return.state = agent_state
agent_return.response = action
else:
agent_state = (
AgentStatusCode.PLUGIN_START if name
== 'plugin' else AgentStatusCode.CODING)
if agent_state != last_agent_state:
# agent_return.state = agent_state
agent_return.response = language
yield deepcopy(agent_return)
agent_return.state = agent_state
agent_return.response = action
else:
agent_state = AgentStatusCode.STREAM_ING
agent_return.state = agent_state
agent_return.response = language
last_agent_state = agent_state
yield deepcopy(agent_return)
if name:
action_return: ActionReturn = executor(action['name'],
action['parameters'])
action_return.thought = language
agent_return.actions.append(action_return)
inner_history.append(dict(role='language', content=language))
if not name:
agent_return.response = language
break
elif action_return.type == executor.finish_action.name:
try:
response = action_return.args['text']['response']
except Exception:
logging.info(msg='Unable to parse FinishAction.')
response = ''
agent_return.response = response
break
else:
inner_history.append(
dict(role='tool', content=action, name=name))
inner_history.append(
self._protocol.format_response(action_return, name=name))
agent_state += 1
agent_return.state = agent_state
yield agent_return
agent_return.inner_steps = deepcopy(inner_history[offset:])
agent_return.state = AgentStatusCode.END
yield agent_return
lagent/actions/arxiv_search.py
from typing import Optional, Type

from lagent.actions.base_action import BaseAction, tool_api
from lagent.actions.parser import BaseParser, JsonParser
from lagent.schema import ActionReturn, ActionStatusCode


class ArxivSearch(BaseAction):
    """Search information from Arxiv.org. \
Useful for when you need to answer questions about Physics, Mathematics, \
Computer Science, Quantitative Biology, Quantitative Finance, Statistics, \
Electrical Engineering, and Economics from scientific articles on arxiv.org.
    """

    def __init__(self,
                 top_k_results: int = 3,
                 max_query_len: int = 300,
                 doc_content_chars_max: int = 1500,
                 description: Optional[dict] = None,
                 parser: Type[BaseParser] = JsonParser,
                 enable: bool = True):
        super().__init__(description, parser, enable)
        self.top_k_results = top_k_results
        self.max_query_len = max_query_len
        self.doc_content_chars_max = doc_content_chars_max

    @tool_api(explode_return=True)
    def get_arxiv_article_information(self, query: str) -> dict:
        """Run Arxiv search and get the article meta information.

        Args:
            query (:class:`str`): the content of search query

        Returns:
            :class:`dict`: article information
                * content (str): a list of 3 arxiv search papers
        """
        import arxiv
        try:
            results = arxiv.Search(  # type: ignore
                query[:self.max_query_len],
                max_results=self.top_k_results).results()
        except Exception as exc:
            return ActionReturn(
                errmsg=f'Arxiv exception: {exc}',
                state=ActionStatusCode.HTTP_ERROR)
        docs = [
            f'Published: {result.updated.date()}\nTitle: {result.title}\n'
            f'Authors: {", ".join(a.name for a in result.authors)}\n'
            f'Summary: {result.summary[:self.doc_content_chars_max]}'
            for result in results
        ]
        if docs:
            return {'content': '\n\n'.join(docs)}
        return {'content': 'No good Arxiv Result was found'}
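The Web Demo wires these two files together through the api_server, but the same loop can be driven from a plain script. A minimal sketch, assuming Lagent's LMDeployClient wrapper (module path and constructor arguments are illustrative; check lagent/llms at the pinned commit):
from lagent.actions import ActionExecutor, ArxivSearch
from lagent.agents.internlm2_agent import Internlm2Agent
from lagent.llms.lmdeploy_wrapper import LMDeployClient  # assumed module path

llm = LMDeployClient(model_name='internlm2-chat-7b',
                     url='http://127.0.0.1:23333')  # the api_server started earlier
agent = Internlm2Agent(
    llm=llm, plugin_executor=ActionExecutor(actions=[ArxivSearch()]))
ret = agent.chat('请帮我搜索 InternLM2 Technical Report')
print(ret.response)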
Defining a Custom Tool with Lagent
In this section we build a custom tool on top of Lagent. Lagent's documentation on tools is at https://lagent.readthedocs.io/zh-cn/latest/tutorials/action.html . Defining a custom tool takes the following steps:
- Inherit from the BaseAction class
- For a simple tool, implement its run method; for a toolkit, implement each sub-tool's method
- Decorating a simple tool's run method with tool_api is optional; every sub-tool method of a toolkit must be decorated with tool_api (a minimal skeleton illustrating these steps follows this list)
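Before the full weather tool, here is a minimal skeleton illustrating the three steps with a hypothetical single-method tool (the class name and behavior are illustrative only):
from lagent.actions.base_action import BaseAction, tool_api
from lagent.schema import ActionReturn, ActionStatusCode


class EchoTool(BaseAction):  # step 1: inherit from BaseAction
    """A hypothetical tool that simply echoes its input."""

    @tool_api  # step 3: decorate the entry point with tool_api
    def run(self, query: str) -> ActionReturn:  # step 2: implement run()
        """Echo the query back to the agent.

        Args:
            query (:class:`str`): text to echo.
        """
        tool_return = ActionReturn(type=self.name)
        tool_return.result = [dict(type='text', content=query)]
        tool_return.state = ActionStatusCode.SUCCESS
        return tool_return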
Below we implement a tool that calls the QWeather (和风天气) API to provide real-time weather queries.
Creating the Tool File
First create the tool file with touch /root/agent/lagent/lagent/actions/weather.py, then open it for editing; its contents are as follows:
vim /root/agent/lagent/lagent/actions/weather.py
import json
import os
import requests
from typing import Optional, Type

from lagent.actions.base_action import BaseAction, tool_api
from lagent.actions.parser import BaseParser, JsonParser
from lagent.schema import ActionReturn, ActionStatusCode


class WeatherQuery(BaseAction):
    """Weather plugin for querying weather information."""

    def __init__(self,
                 key: Optional[str] = None,
                 description: Optional[dict] = None,
                 parser: Type[BaseParser] = JsonParser,
                 enable: bool = True) -> None:
        super().__init__(description, parser, enable)
        key = os.environ.get('WEATHER_API_KEY', key)
        if key is None:
            raise ValueError(
                'Please set Weather API key either in the environment '
                'as WEATHER_API_KEY or pass it as `key`')
        self.key = key
        self.location_query_url = 'https://geoapi.qweather.com/v2/city/lookup'
        self.weather_query_url = 'https://devapi.qweather.com/v7/weather/now'

    @tool_api
    def run(self, query: str) -> ActionReturn:
        """一个天气查询API。可以根据城市名查询天气信息。

        Args:
            query (:class:`str`): The city name to query.
        """
        tool_return = ActionReturn(type=self.name)
        status_code, response = self._search(query)
        if status_code == -1:
            tool_return.errmsg = response
            tool_return.state = ActionStatusCode.HTTP_ERROR
        elif status_code == 200:
            parsed_res = self._parse_results(response)
            tool_return.result = [dict(type='text', content=str(parsed_res))]
            tool_return.state = ActionStatusCode.SUCCESS
        else:
            tool_return.errmsg = str(status_code)
            tool_return.state = ActionStatusCode.API_ERROR
        return tool_return

    def _parse_results(self, results: dict) -> str:
        """Parse the weather results from QWeather API.

        Args:
            results (dict): The weather content from QWeather API
                in json format.

        Returns:
            str: The parsed weather results.
        """
        now = results['now']
        data = [
            f'数据观测时间: {now["obsTime"]}',
            f'温度: {now["temp"]}°C',
            f'体感温度: {now["feelsLike"]}°C',
            f'天气: {now["text"]}',
            f'风向: {now["windDir"]},角度为 {now["wind360"]}°',
            f'风力等级: {now["windScale"]},风速为 {now["windSpeed"]} km/h',
            f'相对湿度: {now["humidity"]}',
            f'当前小时累计降水量: {now["precip"]} mm',
            f'大气压强: {now["pressure"]} 百帕',
            f'能见度: {now["vis"]} km',
        ]
        return '\n'.join(data)

    def _search(self, query: str):
        # get city_code
        try:
            city_code_response = requests.get(
                self.location_query_url,
                params={'key': self.key, 'location': query}
            )
        except Exception as e:
            return -1, str(e)
        if city_code_response.status_code != 200:
            return city_code_response.status_code, city_code_response.json()
        city_code_response = city_code_response.json()
        if len(city_code_response['location']) == 0:
            return -1, '未查询到城市'
        city_code = city_code_response['location'][0]['id']
        # get weather
        try:
            weather_response = requests.get(
                self.weather_query_url,
                params={'key': self.key, 'location': city_code}
            )
        except Exception as e:
            return -1, str(e)
        return weather_response.status_code, weather_response.json()
For a stable weather-query service you first need an API key: open https://dev.qweather.com/docs/api/ and obtain one.
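With a key in hand, the tool can also be smoke-tested outside the demo. A hypothetical check, assuming the tool_api-decorated run method remains directly callable:
import os
os.environ.setdefault('WEATHER_API_KEY', '<your QWeather key>')  # placeholder, not a real key

from lagent.actions.weather import WeatherQuery

tool = WeatherQuery()
ret = tool.run('上海')  # assumed direct call; returns an ActionReturn
print(ret.result or ret.errmsg)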
Trying Out the Custom Tool
In two separate terminals, start the LMDeploy service and the ready-made Web Demo for this part from the Tutorial repo:
conda activate agent
lmdeploy serve api_server /root/share/new_models/Shanghai_AI_Laboratory/internlm2-chat-7b \
--server-name 127.0.0.1 \
--model-name internlm2-chat-7b \
--cache-max-entry-count 0.1
export WEATHER_API_KEY=<the API key obtained above>
# e.g. export WEATHER_API_KEY=1234567890abcdef
conda activate agent
cd /root/agent/Tutorial/agent
streamlit run internlm2_weather_web_demo.py --server.address 127.0.0.1 --server.port 7860
On your local machine, run the same port-forwarding command as before:
ssh -CNg -L 7860:127.0.0.1:7860 -L 23333:127.0.0.1:23333 root@ssh.intern-ai.org.cn -p 43844
After entering the model address and selecting the tool in the Web Demo, you can start querying. Note the tool name and parameters the model emits; here the query parameter is parsed as 上海 (Shanghai):
{
    "name": "WeatherQuery",
    "parameters": {
        "query": "上海"
    }
}
AgentLego: Assembling Agents from “Lego” Bricks
In this part we use AgentLego's tools, try out the AgentLego WebUI, and define and test a custom tool with AgentLego. References:
https://github.com/InternLM/Tutorial/tree/camp2/agent
https://github.com/InternLM/Tutorial/blob/camp2/agent/lagent.md
https://github.com/InternLM/Tutorial/blob/camp2/agent/agentlego.md