import os
from typing import Any
from langchain. agents import AgentType, initialize_agent, Tool
from langchain_openai import ChatOpenAI
from langchain. tools import BaseTool
from langchain_experimental. tools. python. tool import PythonREPLTool
from langchain. memory import ConversationBufferMemory
from langchain_community. utilities import WikipediaAPIWrapper
from langgraph. prebuilt import create_react_agent
os. environ[ "OPENAI_API_KEY" ] = "sk-cRMC2m0GsE18vYaWdAMj"
os. environ[ "OPENAI_BASE_URL" ] = "https://aigptx.top/v1/"
def create_init_tool_agent ( ) :
"""创建基本的ReAct智能体"""
wikipedia = WikipediaAPIWrapper( )
python_repl = PythonREPLTool( )
tools = [
Tool(
name= "维基百科" ,
func= wikipedia. run,
description= "用于查询维基百科文章的工具"
) ,
Tool(
name= "Python解释器" ,
func= python_repl. run,
description= "用于执行Python代码的工具,可以进行计算或数据分析"
)
]
llm = ChatOpenAI( temperature= 1 , max_tokens= 2000 , model= 'gpt-3.5-turbo-0125' )
memory = ConversationBufferMemory( memory_key= "chat_history" , return_messages= True )
langgraph_agent_executor = initialize_agent(
tools,
llm,
agent= AgentType. CHAT_CONVERSATIONAL_REACT_DESCRIPTION,
verbose= True ,
memory= memory,
handle_parsing_errors= True
)
return langgraph_agent_executor
def create_openai_functions_agent ( ) :
"""创建基于OpenAI函数调用的智能体"""
wikipedia = WikipediaAPIWrapper( )
python_repl = PythonREPLTool( )
tools = [
Tool(
name= "Python执行器" ,
func= python_repl. run,
description= "执行Python代码的工具,适合进行计算、数据处理"
) ,
Tool(
name= "维基百科" ,
func= wikipedia. run,
description= "搜索维基百科文章的工具,适合查询事实性信息"
)
]
llm = ChatOpenAI( temperature= 0 )
agent = initialize_agent(
tools,
llm,
agent= AgentType. OPENAI_FUNCTIONS,
verbose= True
)
return agent
class WeatherTool ( BaseTool) :
name: str = "天气查询"
description: str = "查询指定城市的天气情况"
def _run ( self, city: str ) - > str :
return f" { city} 的天气: 晴朗, 25°C, 湿度50%"
async def _arun ( self, city: str ) - > str :
return self. _run( city)
class CalculatorTool ( BaseTool) :
name: str = "计算器"
description: str = "进行数学计算,输入应为数学表达式"
def _run ( self, expression: str ) - > str :
try :
result = eval ( expression)
return f"计算结果: { result} "
except Exception as e:
return f"计算错误: { str ( e) } "
async def _arun ( self, expression: str ) - > str :
return self. _run( expression)
def create_custom_tool_agent ( ) :
"""创建带有自定义工具的智能体"""
tools = [
WeatherTool( ) ,
CalculatorTool( ) ,
PythonREPLTool( )
]
llm = ChatOpenAI( temperature= 0 )
agents = initialize_agent(
tools,
llm,
agent= AgentType. CHAT_ZERO_SHOT_REACT_DESCRIPTION,
verbose= True
)
return agents
if __name__ == "__main__" :
print ( "=== LangChain 单智能体模式示例 ===" )
agent_type = "openai_functions"
response: Any = ''
if agent_type == "react" :
agent = create_init_tool_agent( )
response = agent. invoke( { 'input' : '谁是阿尔伯特·爱因斯坦? 他出生于哪一年? 计算从他出生到现在过了多少年。回答的时候请使用中文输出' , 'chat_history' : [ ] } )
elif agent_type == "openai_functions" :
agent = create_openai_functions_agent( )
response = agent. invoke( { 'input' : '计算 2345 + 5678 的结果,并解释这两个数字的数学特性。' , 'chat_history' : [ ] } )
elif agent_type == "custom" :
agent = create_custom_tool_agent( )
response = agent. invoke( { 'input' : '北京今天的时间和今天的天气如何?然后计算25乘以4的结果。' , 'chat_history' : [ ] } )
print ( f"\n最终回答: { response} " )