目录
一、导读
二、原理说明
1、简介
2、子图图示
3、使用说明
三、基础代码实现
1、实现功能
2、Graph 图示
3、代码实现
4、输出
5、分析
四、人机交互
1、实现中断
2、历史状态(父图)
3、历史状态(子图)
4、历史回溯
5、整体代码
五、子图状态更改
1、修改状态
2、恢复执行
3、整体代码
4、更新节点
5、更新子图
一、导读
环境:OpenEuler、Windows 11、WSL 2、Python 3.12.3 langchain 0.3 langgraph 0.2
背景:前期忙碌的开发阶段结束,需要沉淀自己的应用知识,过一遍LangGraph
时间:20250307
说明:技术梳理,LangGraph 的多代理(多智能体)可以基于子图实现,此处对子图进行说明,案例基于官方文档进行部分的修改
官方文档地址:LangGraph 子图实现
二、原理说明
1、简介
子图允许构建具有多个组件的复杂系统,这些组件本身是图。使用子图的常见用例是构建多代理系统。子图其本质就是一个节点,只不过该节点是一个图而已。由此可知:也有孙子图、曾孙子图等
在添加子图时,主要问题是父图和子图如何进行通信,即它们如何在图执行期间相互传递状态。这里有两种情况:
父图和子图共享state。在这种情况下,您可以添加一个编译后的子图节点
父图和子图具有不同的state。在这种情况下,您必须添加一个调用子图的节点函数:当父图和子图具有不同的状态模式时,这在调用子图之前或之后需要转换状态时很有用
2、子图图示
主节点具有选择性,分别是subgraph1、subgraph2
3、使用说明
一种常见的情况是父图和子图通过共享state进行通信。例如,在多代理系统中,代理通常通过共享的state进行通信。
如果你的子图与父图共享state,你可以按照以下步骤将其添加到父图中:
定义子图工作流(如下例中的subgraph_builder)并进行编译
在定义父图工作流时,将编译后的子图传递给.add_node方法
三、基础代码实现
1、实现功能
开始节点进行意图分类,如果天气相关则走天气相关的子图;否则走智能助手节点。
2、Graph 图示
3、代码实现
from langgraph.graph import StateGraph, END, START, MessagesState
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from typing import Literal
from langgraph.checkpoint.memory import MemorySaver
@tool
def get_weather(city: str):
"""获取天气信息"""
return f"{city},晴空万里,阳光明媚!"
# 添加记忆存储
memory = MemorySaver()
# 配置信息,用于记录对话记录
config = {"configurable": {"thread_id": "1"}}
# 指定大模型的API Key 等相关信息
llm = ChatOpenAI(
base_url="https://lxxxxx.enovo.com/v1/",
api_key="sxxxxxxxwW",
model_name="qwen2.5-instruct"
)
# 绑定工具
model = raw_model.bind_tools([get_weather])
# 子图的state
class SubGraphState(MessagesState):
city: str
# 识别地点(城市)
def model_node(state: SubGraphState):
system_message = """用户的问题是某个地方的天气问题,请辨别具体城市名称,并输出城市名称。"""
messages = [{"role": "system", "content": system_message}] + state["messages"]
result = raw_model.invoke(messages)
return {"city": result.content}
# 天气工具节点
def weather_node(state: SubGraphState):
result = get_weather.invoke({"city": state["city"]})
return {"messages": [{"role": "assistant", "content": result}]}
# 子图的添加节点、边以及编译图
subgraph = StateGraph(SubGraphState)
subgraph.add_node(model_node)
subgraph.add_node(weather_node)
subgraph.add_edge(START, "model_node")
subgraph.add_edge("model_node", "weather_node")
subgraph.add_edge("weather_node", END)
subgraph = subgraph.compile()
# 意图分类state
class RouterState(MessagesState):
route: Literal["天气", "其他"]
# 意图分类节点
def router_node(state: RouterState):
system_message = """用户输入与天气相关则输出"天气",与天气无关返回"其他”"""
messages = [{"role": "system", "content": system_message}] + state["messages"]
route = raw_model.invoke(messages)
return {"route": route.content}
# 智能助手节点
def normal_llm_node(state: RouterState):
response = raw_model.invoke(state["messages"])
return {"messages": [response]}
# 选择边函数
def route_after_prediction(state: RouterState):
if state["route"] == "天气":
return "weather_graph"
else:
return "normal_llm_node"
# 父图的添加边、节点以及编译父图
graph = StateGraph(RouterState)
graph.add_node(router_node)
graph.add_node(normal_llm_node)
# 将子图作为一个节点添加到父图中
graph.add_node("weather_graph", subgraph)
graph.add_edge(START, "router_node")
graph.add_conditional_edges("router_node", route_after_prediction)
graph.add_edge("normal_llm_node", END)
graph.add_edge("weather_graph", END)
graph = graph.compile(checkpointer=memory)
# 以流式调用图
def stream_graph_updates(user_input: str):
for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}, config=config, stream_mode="updates"):
if event.get("router_node"):
continue
else:
for value in event.values():
print("Assistant:", value["messages"][-1].content)
# 设置聊天退出方法
while True:
try:
user_input = input("User: ")
if user_input.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
stream_graph_updates(user_input)
except Exception as e:
print(e)
user_input = "What do you know about LangGraph?"
print("User: " + user_input)
stream_graph_updates(user_input)
break
4、输出
User: 北京天气怎么样
Assistant: 北京,晴空万里,阳光明媚!
User: hi
Assistant: Hello! How can I assist you today?
User: 天津的天气怎么样
Assistant: 天津,晴空万里,阳光明媚!
5、分析
据上述输出可知,询问天气则会返回相关信息;其他对话内容会按照普通助手功能回复。
四、人机交互
1、实现中断
子图实现中断仅需在编译的时候添加 interrupt_before=["节点名称"],修改如下:
subgraph = subgraph.compile(interrupt_before=["weather_node"])
这样修改就会导致在weather_node发生中断,且输出值为 {'__interrupt__': ()},所以判断该dict是否存在__interrupt__即可。
调用图的地方进行判断,如下:
def stream_graph_updates(user_input: str):
for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}, config=config, stream_mode="updates"):
if "__interrupt__" in event:
print("中断")
elif event.get("router_node"):
continue
else:
for value in event.values():
print("Assistant:", value["messages"][-1].content)
循环输出内容如下:
User: 北京天气怎么样
中断
User:
2、历史状态(父图)
由于此时运行状态已经到了while True:,所以此时也不存在graph等变量信息。
所以,想要实现人机交互,需要在输出中断的地方即:print("中断"),下方执行逻辑并恢复状态
添加如下代码,来查看所有的历史节点
for h in graph.get_state_history(config=config):
print(h.next)
else:
print("遍历结束")
其输出信息如下:
User: 北京天气怎么样
中断
('weather_graph',)
('router_node',)
('__start__',)
遍历结束
User:
3、历史状态(子图)
显然,存在问题,由上述的Graph图示可知,该问题到达了model_node节点,但是遍历却没有。该问题是由于子图的问题。如何查询所有的遍历呢?weather_graph是子图,可以先获取子图的state,然后再遍历子图的历史记录,方法如下
# 以流式调用图
def stream_graph_updates(user_input: str):
for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}, config=config, stream_mode="updates"):
if "__interrupt__" in event:
print("中断")
for h in graph.get_state_history(config):
print(h.next)
if h.next == ("weather_graph",):
weather_graph_state = h.tasks[0].state
for h in graph.get_state_history(weather_graph_state):
print(h.next)
if h.next == ("model_node",):
model_node_state = h
print(model_node_state.next)
elif event.get("router_node"):
continue
else:
for value in event.values():
print("Assistant:", value["messages"][-1].content)
此处代码为了遍历所有,并未将判断后就停止遍历
输出内容
User: 北京天气怎么样
中断
('weather_graph',)
('router_node',)
('__start__',)
('weather_node',)
('model_node',)
('__start__',)
('model_node',)
User:
最开始打印的是父图的节点信息,获得子图的state后,定义为weather_graph_state,之后遍历weather_graph_state的历史记录,找到指定的model_node,抵达目标历史记录定位。
4、历史回溯
实现时空穿梭,此时如果在 print(model_node_state.next)下添加stream_graph_updates(user_input),如果user_input发生变化,则按照逻辑去执行,反之实现死循环,部分代码如下:
# 以流式调用图
def stream_graph_updates(user_input: str):
for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}, config=config, stream_mode="updates"):
if "__interrupt__" in event:
print("中断")
for h in graph.get_state_history(config):
print(h.next)
if h.next == ("weather_graph",):
weather_graph_state = h.tasks[0].state
for h in graph.get_state_history(weather_graph_state):
print(h.next)
if h.next == ("model_node",):
model_node_state = h
print(model_node_state.next)
stream_graph_updates(user_input)
elif event.get("router_node"):
continue
else:
for value in event.values():
print("Assistant:", value["messages"][-1].content)
输出内容如下
User: 北京天气怎么样
中断
('weather_graph',)
('router_node',)
('__start__',)
('weather_node',)
('model_node',)
('__start__',)
('model_node',)
中断
('weather_graph',)
('router_node',)
('__start__',)
('weather_graph',)
('router_node',)
('__start__',)
('weather_node',)
('model_node',)
('__start__',)
('model_node',)
中断
('weather_graph',)
('router_node',)
('__start__',)
('weather_graph',)。。。
此处添加自己的逻辑,可以实现如果出现错误,实现自动回溯功能
5、整体代码
from langgraph.graph import StateGraph, END, START, MessagesState
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from typing import Literal
from langgraph.checkpoint.memory import MemorySaver
@tool
def get_weather(city: str):
"""获取天气信息"""
return f"{city},晴空万里,阳光明媚!"
# 添加记忆存储
memory = MemorySaver()
# 配置信息,用于记录对话记录
config = {"configurable": {"thread_id": "1"}}
# 指定大模型的API Key 等相关信息
llm = ChatOpenAI(
base_url="https://lxxxxx.enovo.com/v1/",
api_key="sxxxxxxxwW",
model_name="qwen2.5-instruct"
)
# 绑定工具
model = raw_model.bind_tools([get_weather])
# 子图的state
class SubGraphState(MessagesState):
city: str
# 识别地点(城市)
def model_node(state: SubGraphState):
system_message = """用户的问题是某个地方的天气问题,请辨别具体城市名称,并输出城市名称。"""
messages = [{"role": "system", "content": system_message}] + state["messages"]
result = raw_model.invoke(messages)
return {"city": result.content}
# 天气工具节点
def weather_node(state: SubGraphState):
result = get_weather.invoke({"city": state["city"]})
return {"messages": [{"role": "assistant", "content": result}]}
# 子图的添加节点、边以及编译图
subgraph = StateGraph(SubGraphState)
subgraph.add_node(model_node)
subgraph.add_node(weather_node)
subgraph.add_edge(START, "model_node")
subgraph.add_edge("model_node", "weather_node")
subgraph.add_edge("weather_node", END)
# subgraph = subgraph.compile()
subgraph = subgraph.compile(interrupt_before=["weather_node"])
# 意图分类state
class RouterState(MessagesState):
route: Literal["天气", "其他"]
# 意图分类节点
def router_node(state: RouterState):
system_message = """用户输入与天气相关则输出"天气",与天气无关返回"其他”"""
messages = [{"role": "system", "content": system_message}] + state["messages"]
route = raw_model.invoke(messages)
return {"route": route.content}
# 智能助手节点
def normal_llm_node(state: RouterState):
response = raw_model.invoke(state["messages"])
return {"messages": [response]}
# 选择边函数
def route_after_prediction(state: RouterState):
if state["route"] == "天气":
return "weather_graph"
else:
return "normal_llm_node"
# 父图的添加边、节点以及编译父图
graph = StateGraph(RouterState)
graph.add_node(router_node)
graph.add_node(normal_llm_node)
# 将子图作为一个节点添加到父图中
graph.add_node("weather_graph", subgraph)
graph.add_edge(START, "router_node")
graph.add_conditional_edges("router_node", route_after_prediction)
graph.add_edge("normal_llm_node", END)
graph.add_edge("weather_graph", END)
graph = graph.compile(checkpointer=memory)
# 以流式调用图
def stream_graph_updates(user_input: str):
for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}, config=config, stream_mode="updates"):
if "__interrupt__" in event:
print("中断")
for h in graph.get_state_history(config):
print(h.next)
if h.next == ("weather_graph",):
weather_graph_state = h.tasks[0].state
for h in graph.get_state_history(weather_graph_state):
print(h.next)
if h.next == ("model_node",):
model_node_state = h
print(model_node_state.next)
stream_graph_updates(user_input)
elif event.get("router_node"):
continue
else:
for value in event.values():
print("Assistant:", value["messages"][-1].content)
# 设置聊天退出方法
while True:
try:
user_input = input("User: ")
if user_input.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
stream_graph_updates(user_input)
except Exception as e:
print(e)
user_input = "What do you know about LangGraph?"
print("User: " + user_input)
stream_graph_updates(user_input)
break
五、子图状态更改
我会询问北京天气怎么,通过中断,将其state中的city的value修改为天津,实现该demo的功能。
此处我会将部分代码修改,因为上述代码本身为了能够清晰明了的展示其功能
1、修改状态
城市获取的节点是model_node,通过人机交互回溯到该节点,并更新该历史状态中的city信息。
获取当前状态
state = graph.get_state(config)
将city对应的value修改为 天津
graph.update_state(state.tasks[0].state, {"city": "天津"})
部分代码如下
# 以流式调用图
def stream_graph_updates(user_input: str):
for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}, config=config, stream_mode="updates"):
if "__interrupt__" in event:
print("中断")
for h in graph.get_state_history(config):
if h.next == ("weather_graph",):
weather_graph_state = h.tasks[0].state
break
for h in graph.get_state_history(weather_graph_state):
if h.next == ("model_node",):
state = graph.get_state(config)
graph.update_state(state.tasks[0].state, {"city": "天津"})
break
elif event.get("router_node"):
continue
else:
for value in event.values():
print("Assistant:", value["messages"][-1].content)
该段代码实现了将第一个task的city值,修改为天津
2、恢复执行
因为此节点不需要输入信息,故而此处输入信息为None,代码如下
# 恢复执行graph
def update_city():
for event in graph.stream(None, config=config, stream_mode="updates"):
print(event.get("weather_graph").get("messages")[-1].content)
此函数在graph.update_state(state.tasks[0].state, {"city": "天津"})添加update_city()
graph.update_state(state.tasks[0].state, {"city": "天津"})
update_city()
3、整体代码
from langgraph.graph import StateGraph, END, START, MessagesState
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from typing import Literal
from langgraph.checkpoint.memory import MemorySaver
@tool
def get_weather(city: str):
"""获取天气信息"""
return f"{city},晴空万里,阳光明媚!"
# 添加记忆存储
memory = MemorySaver()
# 配置信息,用于记录对话记录
config = {"configurable": {"thread_id": "1"}}
# 指定大模型的API Key 等相关信息
llm = ChatOpenAI(
base_url="https://lxxxxx.enovo.com/v1/",
api_key="sxxxxxxxwW",
model_name="qwen2.5-instruct"
)
# 绑定工具
model = raw_model.bind_tools([get_weather])
# 子图的state
class SubGraphState(MessagesState):
city: str
# 识别地点(城市)
def model_node(state: SubGraphState):
system_message = """用户的问题是某个地方的天气问题,请辨别具体城市名称,并输出城市名称。"""
messages = [{"role": "system", "content": system_message}] + state["messages"]
result = raw_model.invoke(messages)
return {"city": result.content}
# 天气工具节点
def weather_node(state: SubGraphState):
result = get_weather.invoke({"city": state["city"]})
return {"messages": [{"role": "assistant", "content": result}]}
# 子图的添加节点、边以及编译图
subgraph = StateGraph(SubGraphState)
subgraph.add_node(model_node)
subgraph.add_node(weather_node)
subgraph.add_edge(START, "model_node")
subgraph.add_edge("model_node", "weather_node")
subgraph.add_edge("weather_node", END)
# subgraph = subgraph.compile()
subgraph = subgraph.compile(interrupt_before=["weather_node"])
# 意图分类state
class RouterState(MessagesState):
route: Literal["天气", "其他"]
# 意图分类节点
def router_node(state: RouterState):
system_message = """用户输入与天气相关则输出"天气",与天气无关返回"其他”"""
messages = [{"role": "system", "content": system_message}] + state["messages"]
route = raw_model.invoke(messages)
return {"route": route.content}
# 智能助手节点
def normal_llm_node(state: RouterState):
response = raw_model.invoke(state["messages"])
return {"messages": [response]}
# 选择边函数
def route_after_prediction(state: RouterState):
if state["route"] == "天气":
return "weather_graph"
else:
return "normal_llm_node"
# 父图的添加边、节点以及编译父图
graph = StateGraph(RouterState)
graph.add_node(router_node)
graph.add_node(normal_llm_node)
# 将子图作为一个节点添加到父图中
graph.add_node("weather_graph", subgraph)
graph.add_edge(START, "router_node")
graph.add_conditional_edges("router_node", route_after_prediction)
graph.add_edge("normal_llm_node", END)
graph.add_edge("weather_graph", END)
graph = graph.compile(checkpointer=memory)
# 恢复执行graph
def update_city():
for event in graph.stream(None, config=config, stream_mode="updates"):
print(event.get("weather_graph").get("messages")[-1].content)
# 以流式调用图
def stream_graph_updates(user_input: str):
for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}, config=config, stream_mode="updates"):
if "__interrupt__" in event:
print("中断")
for h in graph.get_state_history(config):
if h.next == ("weather_graph",):
weather_graph_state = h.tasks[0].state
break
for h in graph.get_state_history(weather_graph_state):
if h.next == ("model_node",):
state = graph.get_state(config)
# state.values["city"] = "天津"
graph.update_state(state.tasks[0].state, {"city": "天津"})
update_city()
break
elif event.get("router_node"):
continue
else:
for value in event.values():
print("Assistant:", value["messages"][-1].content)
# 设置聊天退出方法
while True:
try:
user_input = input("User: ")
if user_input.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
stream_graph_updates(user_input)
except Exception as e:
print(e)
user_input = "What do you know about LangGraph?"
print("User: " + user_input)
stream_graph_updates(user_input)
break
输出内容
User: 北京天气怎么样
中断
天津,晴空万里,阳光明媚!
User:
4、更新节点
指定节点,指定输出内容
只需将
graph.update_state(state.tasks[0].state, {"city": "天津"})
替换为
graph.update_state(
state.tasks[0].state,
{"messages": [{"role": "assistant", "content": "天津,晴空万里,阳光明媚!"}]},
as_node="weather_node",
)
输出内容
User: 北京天气怎么样
中断
天津,晴空万里,阳光明媚!
User:
5、更新子图
指定子图(子图本质就是一个节点),指定输出内容
由于是针对整个子图操作,所以不再需要update_city方法,部分代码如下
for h in graph.get_state_history(weather_graph_state):
if h.next == ("model_node",):
graph.update_state(
config,
{"messages": [{"role": "assistant", "content": "天津,晴空万里,阳光明媚!"}]},
as_node="weather_graph",
)
messages = graph.get_state(config).values["messages"]
print(messages[-1].content)
break
输出内容
User: 北京天气怎么样
中断
天津,晴空万里,阳光明媚!
User:
由于此处代码变化有些多,此处提供完整代码
from langgraph.graph import StateGraph, END, START, MessagesState
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from typing import Literal
from langgraph.checkpoint.memory import MemorySaver
@tool
def get_weather(city: str):
"""获取天气信息"""
return f"{city},晴空万里,阳光明媚!"
# 添加记忆存储
memory = MemorySaver()
# 配置信息,用于记录对话记录
config = {"configurable": {"thread_id": "1"}}
# 指定大模型的API Key 等相关信息
llm = ChatOpenAI(
base_url="https://lxxxxx.enovo.com/v1/",
api_key="sxxxxxxxwW",
model_name="qwen2.5-instruct"
)
# 绑定工具
model = raw_model.bind_tools([get_weather])
# 子图的state
class SubGraphState(MessagesState):
city: str
# 识别地点(城市)
def model_node(state: SubGraphState):
system_message = """用户的问题是某个地方的天气问题,请辨别具体城市名称,并输出城市名称。"""
messages = [{"role": "system", "content": system_message}] + state["messages"]
result = raw_model.invoke(messages)
return {"city": result.content}
# 天气工具节点
def weather_node(state: SubGraphState):
result = get_weather.invoke({"city": state["city"]})
return {"messages": [{"role": "assistant", "content": result}]}
# 子图的添加节点、边以及编译图
subgraph = StateGraph(SubGraphState)
subgraph.add_node(model_node)
subgraph.add_node(weather_node)
subgraph.add_edge(START, "model_node")
subgraph.add_edge("model_node", "weather_node")
subgraph.add_edge("weather_node", END)
# subgraph = subgraph.compile()
subgraph = subgraph.compile(interrupt_before=["weather_node"])
# 意图分类state
class RouterState(MessagesState):
route: Literal["天气", "其他"]
# 意图分类节点
def router_node(state: RouterState):
system_message = """用户输入与天气相关则输出"天气",与天气无关返回"其他”"""
messages = [{"role": "system", "content": system_message}] + state["messages"]
route = raw_model.invoke(messages)
return {"route": route.content}
# 智能助手节点
def normal_llm_node(state: RouterState):
response = raw_model.invoke(state["messages"])
return {"messages": [response]}
# 选择边函数
def route_after_prediction(state: RouterState):
if state["route"] == "天气":
return "weather_graph"
else:
return "normal_llm_node"
# 父图的添加边、节点以及编译父图
graph = StateGraph(RouterState)
graph.add_node(router_node)
graph.add_node(normal_llm_node)
# 将子图作为一个节点添加到父图中
graph.add_node("weather_graph", subgraph)
graph.add_edge(START, "router_node")
graph.add_conditional_edges("router_node", route_after_prediction)
graph.add_edge("normal_llm_node", END)
graph.add_edge("weather_graph", END)
graph = graph.compile(checkpointer=memory)
# 恢复执行graph
def update_city():
for event in graph.stream(None, config=config, stream_mode="updates"):
print(event)
print(event.get("weather_graph").get("messages")[-1].content)
# 以流式调用图
def stream_graph_updates(user_input: str):
for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}, config=config, stream_mode="updates"):
if "__interrupt__" in event:
print("中断")
for h in graph.get_state_history(config):
if h.next == ("weather_graph",):
weather_graph_state = h.tasks[0].state
break
for h in graph.get_state_history(weather_graph_state):
if h.next == ("model_node",):
graph.update_state(
config,
{"messages": [{"role": "assistant", "content": "天津,晴空万里,阳光明媚!"}]},
as_node="weather_graph",
)
messages = graph.get_state(config).values["messages"]
print(messages[-1].content)
break
elif event.get("router_node"):
continue
else:
for value in event.values():
print("Assistant:", value["messages"][-1].content)
# 设置聊天退出方法
while True:
try:
user_input = input("User: ")
if user_input.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
stream_graph_updates(user_input)
except Exception as e:
print(e)
user_input = "What do you know about LangGraph?"
print("User: " + user_input)
stream_graph_updates(user_input)
break
六、综述
1、子图是父图的节点,可是孙图绝不是父图的节点
2、愚公移山 :子子孙孙,无穷尽也
3、子图的使用基本是在多智能体中使用
4、通信尽量使用state,避免函数调用