重磅发布!使用 LangGraph 创建一个超级AI Agent

几天前,LangChain 正式宣布了名为 LangGraph 的新库,LangGraph 建立在 LangChain 之上,简化了创建和管理Agent及其运行时的过程。

在这篇文章中,我们将全面介绍 langGraph,什么是代理和代理运行时?Langgraph 的特点是什么,以及如何在 Langgraph 中构建一个代理执行器,我们将探讨 Langgraph 中的聊天代理执行器以及如何在人类循环和聊天中修改 Langgraph 中的聊天 agent 执行器。



  • Agent Executor与LangChain类似,但在LangGraph中需要重建;

  • Chat Agent Executor以消息列表的形式处理代理状态,非常适合使用消息进行功能调用和响应的基于聊天的模型。



首先,我们需要通过安装几个包来设置我们的环境:LangChain、LangChain OpenAI和Tavily Python。这些将帮助我们利用现有的LangChain代理类,为我们的代理提供OpenAI的语言模型,并使用Tavily Python包实现搜索功能。

!pip install --quiet -U langchain langchain_openai tavily-python


import os
import getpass

os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")
os.environ["TAVILY_API_KEY"] = getpass.getpass("Tavily API Key:")
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = getpass.getpass("LangSmith API Key:")


from langchain import hub
from langchain.agents import create_openai_functions_agent
from langchain_openai.chat_models import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

tools = [TavilySearchResults(max_results=1)]

# Get the prompt to use - you can modify this!
prompt = hub.pull("hwchase17/openai-functions-agent")

# Choose the LLM that will drive the agent
llm = ChatOpenAI(model="gpt-3.5-turbo-1106", streaming=True)

# Construct the OpenAI Functions agent
agent_runnable = create_openai_functions_agent(llm, tools, prompt)


from typing import TypedDict, Annotated, List, Union
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
   # The input string
   input: str
   # The list of previous messages in the conversation
   chat_history: list[BaseMessage]
   # The outcome of a given call to the agent
   # Needs `None` as a valid type, since this is what this will start as
   agent_outcome: Union[AgentAction, AgentFinish, None]
   # List of actions and corresponding observations
   # Here we annotate this with `operator.add` to indicate that operations to
   # this state should be ADDED to the existing values (not overwrite it)
   intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add]


我们将研究一些细节,如调用代理的“run agent”节点和执行代理选择的工具的“execute tools”函数。我们还将添加一个“should continue”函数来确定下一步行动。

from langchain_core.agents import AgentFinish
from langgraph.prebuilt.tool_executor import ToolExecutor

# This a helper class we have that is useful for running tools
# It takes in an agent action and calls that tool and returns the result
tool_executor = ToolExecutor(tools)

# Define the agent
def run_agent(data):
    agent_outcome = agent_runnable.invoke(data)
    return {"agent_outcome": agent_outcome}

# Define the function to execute tools
def execute_tools(data):
    # Get the most recent agent_outcome - this is the key added in the `agent` above
    agent_action = data['agent_outcome']
    output = tool_executor.invoke(agent_action)
    return {"intermediate_steps": [(agent_action, str(output))]}

# Define logic that will be used to determine which conditional edge to go down
def should_continue(data):
    # If the agent outcome is an AgentFinish, then we return `exit` string
    # This will be used when setting up the graph to define the flow
    if isinstance(data['agent_outcome'], AgentFinish):
        return "end"
    # Otherwise, an AgentAction is returned
    # Here we return `continue` string
    # This will be used when setting up the graph to define the flow
        return "continue"


from langgraph.graph import END, StateGraph

# Define a new graph
workflow = StateGraph(AgentState)

# Define the two nodes we will cycle between
workflow.add_node("agent", run_agent)
workflow.add_node("action", execute_tools)

# Set the entrypoint as `agent`
# This means that this node is the first one called

# We now add a conditional edge
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    # Next, we pass in the function that will determine which node is called next.
    # Finally we pass in a mapping.
    # The keys are strings, and the values are other nodes.
    # END is a special node marking that the graph should finish.
    # What will happen is we will call `should_continue`, and then the output of that
    # will be matched against the keys in this mapping.
    # Based on which one it matches, that node will then be called.
        # If `tools`, then we call the tool node.
        "continue": "action",
        # Otherwise we finish.
        "end": END

# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is called next.
workflow.add_edge('action', 'agent')

# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile()


inputs = {"input": "what is the weather in sf", "chat_history": []}
for s in app.stream(inputs):


{'agent_outcome': AgentActionMessageLog(tool='tavily_search_results_json', tool_input={'query': 'weather in San Francisco'}, log="\nInvoking: `tavily_search_results_json` with `{'query': 'weather in San Francisco'}`\n\n\n", message_log=[AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{"query":"weather in San Francisco"}', 'name': 'tavily_search_results_json'}})])}
{'intermediate_steps': [(AgentActionMessageLog(tool='tavily_search_results_json', tool_input={'query': 'weather in San Francisco'}, log="\nInvoking: `tavily_search_results_json` with `{'query': 'weather in San Francisco'}`\n\n\n", message_log=[AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{"query":"weather in San Francisco"}', 'name': 'tavily_search_results_json'}})]), "[{'url': 'https://www.whereandwhen.net/when/north-america/california/san-francisco-ca/january/', 'content': 'Best time to go to San Francisco? Weather in San Francisco in january 2024  How was the weather last january? Here is the day by day recorded weather in San Francisco in january 2023:  Seasonal average climate and temperature of San Francisco in january  8% 46% 29% 12% 8% Evolution of daily average temperature and precipitation in San Francisco in januaryWeather in San Francisco in january 2024. The weather in San Francisco in january comes from statistical datas on the past years. You can view the weather statistics the entire month, but also by using the tabs for the beginning, the middle and the end of the month. ... 16-01-2023 45°F to 52°F. 17-01-2023 45°F to 54°F. 18-01-2023 47°F to ...'}]")]}







4.1 安装软件包:

同样需要LangChain软件包,LangChain OpenAI用于模型,Tavily软件包用于搜索工具,并为这些服务设置API密钥。

!pip install --quiet -U langchain langchain_openai tavily-python
import os
import getpass

os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")
os.environ["TAVILY_API_KEY"] = getpass.getpass("Tavily API Key:")
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = getpass.getpass("LangSmith API Key:")

4.2 设置工具和模型:

我们将使用Tavily Search作为我们的工具,并设置一个工具执行器来调用这些工具。对于模型,我们将使用LangChain集成中的Chat OpenAI模型,确保其在启用流式进行初始化。这使我们能够流式返回tokens,并附加我们希望模型调用的函数。

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolExecutor
from langchain.tools.render import format_tool_to_openai_function
tools = [TavilySearchResults(max_results=1)]
tool_executor = ToolExecutor(tools)
# We will set streaming=True so that we can stream tokens
# See the streaming section for more information on this.
model = ChatOpenAI(temperature=0, streaming=True)
functions = [format_tool_to_openai_function(t) for t in tools]
model = model.bind_functions(functions)

4.3 定义代理状态:

代理状态是一个简单的字典,其中包含消息列表的键。我们将使用“add to”标记,这样随着时间的推移,节点对此消息列表的任何更新都会累积。

from typing import TypedDict, Annotated, Sequence
import operator
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]

4.4 创建节点和边:


from langgraph.prebuilt import ToolInvocation
import json
from langchain_core.messages import FunctionMessage

# Define the function that determines whether to continue or not
def should_continue(state):
    messages = state['messages']
    last_message = messages[-1]
    # If there is no function call, then we finish
    if "function_call" not in last_message.additional_kwargs:
        return "end"
    # Otherwise if there is, we continue
        return "continue"

# Define the function that calls the model
def call_model(state):
    messages = state['messages']
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}

# Define the function to execute tools
def call_tool(state):
    messages = state['messages']
    # Based on the continue condition
    # we know the last message involves a function call
    last_message = messages[-1]
    # We construct an ToolInvocation from the function_call
    action = ToolInvocation(
    # We call the tool_executor and get back a response
    response = tool_executor.invoke(action)
    # We use the response to create a FunctionMessage
    function_message = FunctionMessage(content=str(response), name=action.tool)
    # We return a list, because this will get added to the existing list
    return {"messages": [function_message]}

4.5 构建图:


from langgraph.graph import StateGraph, END
# Define a new graph
workflow = StateGraph(AgentState)

# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", call_tool)

# Set the entrypoint as `agent`
# This means that this node is the first one called

# We now add a conditional edge
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    # Next, we pass in the function that will determine which node is called next.
    # Finally we pass in a mapping.
    # The keys are strings, and the values are other nodes.
    # END is a special node marking that the graph should finish.
    # What will happen is we will call `should_continue`, and then the output of that
    # will be matched against the keys in this mapping.
    # Based on which one it matches, that node will then be called.
        # If `tools`, then we call the tool node.
        "continue": "action",
        # Otherwise we finish.
        "end": END

# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is called next.
workflow.add_edge('action', 'agent')

# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile()

4.6 编译和使用图形:


from langchain_core.messages import HumanMessage

inputs = {"messages": [HumanMessage(content="what is the weather in sf")]}

4.7 观察执行过程:





让我们修改LangGraph中的聊天代理执行器,使其包含一个“human in the loop”组件,这样在执行工具操作之前可以进行人工验证。

设置: 初始设置保持不变。不需要额外安装。我们将创建我们的工具,设置工具执行器,准备我们的模型,将工具绑定到模型,并定义代理状态——所有这些都与我们在前一个会话中所做的一样。

关键修改——调用工具功能: 主要的变化来自调用工具功能。我们添加了一个步骤,系统在交互式IDE中提示用户(即您!),询问是否继续执行特定操作。如果用户响应“否”,则会引发错误,进程将停止。这是我们的人工验证步骤。

# Define the function to execute tools
def call_tool(state):
    messages = state['messages']
    # Based on the continue condition
    # we know the last message involves a function call
    last_message = messages[-1]
    # We construct an ToolInvocation from the function_call
    action = ToolInvocation(
    response = input(prompt=f"[y/n] continue with: {action}?")
    if response == "n":
        raise ValueError
    # We call the tool_executor and get back a response
    response = tool_executor.invoke(action)
    # We use the response to create a FunctionMessage
    function_message = FunctionMessage(content=str(response), name=action.tool)
    # We return a list, because this will get added to the existing list
    return {"messages": [function_message]}


utput from node 'agent':
{'messages': [AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{\n  "query": "weather in San Francisco"\n}', 'name': 'tavily_search_results_json'}})]}


ValueError                                Traceback (most recent call last)
Cell In[10], line 4
      1 from langchain_core.messages import HumanMessage
      3 inputs = {"messages": [HumanMessage(content="what is the weather in sf")]}
----> 4 for output in app.stream(inputs):
      5     # stream() yields dictionaries with output keyed by node name
      6     for key, value in output.items():
      7         print(f"Output from node '{key}':")






def call_model(state):
    messages = state['messages'][-5:]
    response = model.invoke(messages)
    # We return a list, because this will get added to the existing list
    return {"messages": [response]}
  • 仅选择最近的五条消息。

  • 包括系统消息加上五条最新消息。

  • 总结比最近五条消息旧的消息。


使用修改的执行器: 实现非常简单。仅有一条输入消息不同,但重要的是,您希望应用于代理步骤的任何逻辑都可以插入到这个新的修改部分。




关键修改——强制工具调用优先:我们这里的重点是设置聊天代理调用特定工具作为其第一个操作。为此,我们将添加一个新节点,并将其命名为“first model node”。该节点将被编程为返回一条消息,指示代理调用特定工具,如“Tavil search results Json”工具,并将最新的消息内容作为查询。

# This is the new first - the first call of the model we want to explicitly hard-code some action
from langchain_core.messages import AIMessage
import json

def first_model(state):
    human_input = state['messages'][-1].content
    return {
        "messages": [
                    "function_call": {
                        "name": "tavily_search_results_json", 
                        "arguments": json.dumps({"query": human_input})

更新图:我们将修改现有的图,将这个新的“first agent”节点作为入口点。这样可以确保始终首先调用第一个代理节点,然后调用动作节点。我们设置了一个从代理到动作或结束的条件节点,以及一个从动作回到代理的直接节点。关键的添加是从第一个代理到操作的一个新节点,确保工具调用一开始就发生。

from langgraph.graph import StateGraph, END
# Define a new graph
workflow = StateGraph(AgentState)

# Define the new entrypoint
workflow.add_node("first_agent", first_model)

# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", call_tool)

# Set the entrypoint as `agent`
# This means that this node is the first one called

# We now add a conditional edge
    # First, we define the start node. We use `agent`.
    # This means these are the edges taken after the `agent` node is called.
    # Next, we pass in the function that will determine which node is called next.
    # Finally we pass in a mapping.
    # The keys are strings, and the values are other nodes.
    # END is a special node marking that the graph should finish.
    # What will happen is we will call `should_continue`, and then the output of that
    # will be matched against the keys in this mapping.
    # Based on which one it matches, that node will then be called.
        # If `tools`, then we call the tool node.
        "continue": "action",
        # Otherwise we finish.
        "end": END

# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is called next.
workflow.add_edge('action', 'agent')

# After we call the first agent, we know we want to go to action
workflow.add_edge('first_agent', 'action')

# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile()






