LangChain 文档详细解析
LangChain 是一个用于构建与语言模型(如GPT-4)交互的框架。本文档介绍了LangChain v0.1版本中的Runnable接口及其相关功能。
目录
- Runnable接口
- 输入和输出模式
- 使用示例
- 异步方法
- 事件流
- 并行处理
1. Runnable接口
为了简化自定义链(chain)的创建,LangChain实现了一个名为“Runnable”的协议。许多LangChain组件,如聊天模型、LLMs(大型语言模型)、输出解析器、检索器、提示模板等,都实现了Runnable协议。Runnable协议定义了一组标准方法,使得调用和定义自定义链变得更加容易。
标准接口方法包括:
stream
: 流式返回响应的片段。invoke
: 在输入上调用链。batch
: 在输入列表上调用链。
这些方法还有对应的异步版本,适用于并发处理:
astream
: 异步流式返回响应的片段。ainvoke
: 异步调用链。abatch
: 异步在输入列表上调用链。astream_log
: 除了最终响应,还流式返回中间步骤。astream_events
: (测试版)在链中发生事件时流式传输事件(在langchain-core 0.1.14中引入)。
组件的输入和输出类型因组件而异:
组件 | 输入类型 | 输出类型 |
---|---|---|
Prompt | 字典 | PromptValue |
ChatModel | 单字符串、聊天消息列表或PromptValue | ChatMessage |
LLM | 单字符串、聊天消息列表或PromptValue | String |
OutputParser | LLM或ChatModel的输出 | 取决于解析器 |
Retriever | 单字符串 | 文档列表 |
Tool | 单字符串或字典,取决于工具 | 取决于工具 |
所有Runnable组件都公开输入和输出的schema,便于检查输入和输出:
input_schema
: 由Runnable结构自动生成的输入Pydantic模型。output_schema
: 由Runnable结构自动生成的输出Pydantic模型。
2. 输入和输出模式
输入模式 描述了Runnable接受的输入结构,而 输出模式 描述了Runnable生成的输出结构。通过调用.schema()
方法,可以获取输入或输出的JSONSchema表示。
示例:
# 获取链的输入模式(链的第一个部分是prompt)
chain.input_schema.schema()
输出示例:
{
"title": "PromptInput",
"type": "object",
"properties": {
"topic": {
"title": "Topic",
"type": "string"
}
}
}
类似地,可以获取具体组件的输入模式,如prompt和model:
prompt.input_schema.schema()
model.input_schema.schema()
输出模式 类似,描述了链的最终输出:
chain.output_schema.schema()
输出示例:
{
"title": "ChatOpenAIOutput",
"anyOf": [
{"$ref": "#/definitions/AIMessage"},
{"$ref": "#/definitions/HumanMessage"},
...
],
"definitions": {
"AIMessage": { ... },
"HumanMessage": { ... },
...
}
}
3. 使用示例
为了说明Runnable接口的方法,我们创建一个简单的PromptTemplate和ChatModel链。
安装必要的包:
%pip install --upgrade --quiet langchain-core langchain-community langchain-openai
代码示例:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
chain = prompt | model
这里,我们创建了一个提示模板,要求模型根据主题(topic
)讲一个笑话,并将提示与模型连接成一个链(chain
)。
调用方法示例:
-
Stream(流式调用):
for s in chain.stream({"topic": "bears"}): print(s.content, end="", flush=True)
输出:
Sure, here's a bear-themed joke for you: Why don't bears wear shoes? Because they already have bear feet!
-
Invoke(调用):
chain.invoke({"topic": "bears"})
输出:
AIMessage(content="Why don't bears wear shoes? \n\nBecause they have bear feet!")
-
Batch(批量调用):
chain.batch([{"topic": "bears"}, {"topic": "cats"}])
输出:
[ AIMessage(content="Sure, here's a bear joke for you:\n\nWhy don't bears wear shoes?\n\nBecause they already have bear feet!"), AIMessage(content="Why don't cats play poker in the wild?\n\nToo many cheetahs!") ]
4. 异步方法
LangChain还支持异步调用,适用于需要并发处理的场景。以下是异步方法的示例:
-
异步流式调用:
async for s in chain.astream({"topic": "bears"}): print(s.content, end="", flush=True)
输出与同步流式调用类似,但以异步方式处理。
-
异步调用:
await chain.ainvoke({"topic": "bears"})
输出:
AIMessage(content="Why don't bears ever wear shoes?\n\nBecause they already have bear feet!")
-
异步批量调用:
await chain.abatch([{"topic": "bears"}])
输出:
[ AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!") ]
-
异步流式事件(测试版):
async for event in retrieval_chain.astream_events( "where did harrison work?", version="v1", include_names=["Docs", "my_llm"] ): # 处理事件
输出示例展示了事件的触发过程和相应的数据变化。
5. 事件流
事件流 提供了一种机制,可以在链执行过程中捕捉和处理各种事件。这对于调试、监控和展示进度非常有用。
事件参考表:
事件名称 | 相关组件 | 数据内容 |
---|---|---|
on_chat_model_start | ChatModel | 输入消息的内容 |
on_chat_model_stream | ChatModel | AI消息片段 |
on_chat_model_end | ChatModel | 最终生成的消息 |
on_llm_start | LLM | 输入内容 |
on_llm_stream | LLM | 生成的内容片段 |
on_llm_end | LLM | 最终生成的内容 |
on_chain_start | Chain | 链的开始执行 |
on_chain_stream | Chain | 链执行过程中的流式数据 |
on_chain_end | Chain | 链执行完成后的输出 |
on_tool_start | Tool | 工具开始执行的输入数据 |
on_tool_stream | Tool | 工具执行过程中的流式数据 |
on_tool_end | Tool | 工具执行完成后的输出 |
on_retriever_start | Retriever | 检索器开始执行的查询 |
on_retriever_chunk | Retriever | 检索器返回的文档片段 |
on_retriever_end | Retriever | 检索器执行完成后的文档列表 |
on_prompt_start | PromptTemplate | 提示模板开始执行的输入数据 |
on_prompt_end | PromptTemplate | 提示模板执行完成后的消息 |
示例代码解析:
async for event in retrieval_chain.astream_events(
"where did harrison work?", version="v1", include_names=["Docs", "my_llm"]
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(event["data"]["chunk"].content, end="|")
elif kind in {"on_chat_model_start"}:
print()
print("Streaming LLM:")
elif kind in {"on_chat_model_end"}:
print()
print("Done streaming LLM.")
elif kind == "on_retriever_end":
print("--")
print("Retrieved the following documents:")
print(event["data"]["output"]["documents"])
elif kind == "on_tool_end":
print(f"Ended tool: {event['name']}")
else:
pass
输出示例:
--
Retrieved the following documents:
[Document(page_content='harrison worked at kensho')]
Streaming LLM:
|H|arrison| worked| at| Kens|ho|.||
Done streaming LLM.
6. 并行处理
并行处理 允许同时执行多个Runnable,提升处理效率。LangChain通过RunnableParallel实现并行执行。
示例代码:
from langchain_core.runnables import RunnableParallel
chain1 = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
chain2 = (
ChatPromptTemplate.from_template("write a short (2 line) poem about {topic}")
| model
)
combined = RunnableParallel(joke=chain1, poem=chain2)
调用示例:
combined.invoke({"topic": "bears"})
输出示例:
{
'joke': AIMessage(content="Sure, here's a bear-related joke for you:\n\nWhy did the bear bring a ladder to the bar?\n\nBecause he heard the drinks were on the house!"),
'poem': AIMessage(content="In wilderness they roam,\nMajestic strength, nature's throne.")
}
并行与批量处理结合:
combined.batch([{"topic": "bears"}, {"topic": "cats"}])
输出示例:
[
{
'joke': AIMessage(content="Sure, here's a bear joke for you:\n\nWhy don't bears wear shoes?\n\nBecause they have bear feet!"),
'poem': AIMessage(content="Majestic bears roam,\nNature's strength, beauty shown.")
},
{
'joke': AIMessage(content="Why don't cats play poker in the wild?\n\nToo many cheetahs!"),
'poem': AIMessage(content="Whiskers dance, eyes aglow,\nCats embrace the night's gentle flow.")
}
]
invoke()方法
invoke()
方法详解
在LangChain的Runnable接口中,invoke()
方法是一个核心功能,用于在给定的输入上同步调用链(Chain)并获取结果。它简化了与链交互的过程,使开发者能够轻松地利用预定义或自定义的链来处理输入并生成输出。
1. 什么是 invoke()
方法?
invoke()
方法是Runnable协议的一部分,旨在同步地执行链上的操作。它接受一个输入,传递给链中的第一个组件(例如Prompt模板),然后依次通过链中的各个组件处理,最终返回链的输出。
2. invoke()
方法的基本用法
示例场景
假设我们有一个简单的链,由一个提示模板和一个聊天模型组成。我们希望根据给定的主题(topic
)生成一个笑话。
步骤详解
-
安装必要的包
首先,确保安装了LangChain及其相关组件:
pip install --upgrade langchain-core langchain-community langchain-openai
-
导入必要的模块并初始化组件
from langchain_core.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI # 初始化聊天模型 model = ChatOpenAI() # 创建一个提示模板,要求模型根据主题讲笑话 prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}") # 将提示模板和模型连接成一个链 chain = prompt | model
-
使用
invoke()
方法调用链# 定义输入 input_data = {"topic": "bears"} # 调用 `invoke()` 方法 result = chain.invoke(input_data) # 输出结果 print(result)
输出示例:
AIMessage(content="Why don't bears wear shoes? \n\nBecause they have bear feet!")
3. invoke()
方法的参数
-
inputs
: 一个字典,包含链所需的所有输入参数。键通常对应于提示模板中的占位符。 -
config
(可选): 一个字典,用于配置调用的行为。例如,可以设置run_name
、tags
等,用于追踪和管理不同的运行。result = chain.invoke(input_data, config={"run_name": "joke_run"})
4. invoke()
方法的返回值
invoke()
方法的返回值类型取决于链的最后一个组件。例如:
- 如果链的最后一个组件是ChatModel,返回值可能是
ChatMessage
对象。 - 如果链的最后一个组件是LLM,返回值可能是一个字符串。
示例:
result = chain.invoke({"topic": "cats"})
print(result)
可能的输出:
AIMessage(content="Why don't cats play poker in the wild?\n\nToo many cheetahs!")
5. invoke()
方法的应用场景
- 单次请求:当需要根据单一输入生成输出时,
invoke()
方法非常适用。 - 集成到应用程序中:可以将
invoke()
方法集成到Web应用、命令行工具或其他软件中,以实现动态内容生成。
6. invoke()
方法与其他方法的对比
stream()
:invoke()
是同步的,适用于需要一次性获取完整输出的场景。stream()
是流式的,适用于需要逐步处理输出(如实时显示生成内容)的场景。
batch()
:invoke()
处理单个输入。batch()
允许同时处理多个输入,提高处理效率。
- 异步方法 (
ainvoke()
):ainvoke()
是invoke()
的异步版本,适用于需要并发处理的场景。
7. 实际案例
案例1:生成笑话
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
# 初始化模型和提示模板
model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
chain = prompt | model
# 调用 `invoke()` 方法
input_data = {"topic": "dogs"}
joke = chain.invoke(input_data)
# 打印结果
print(joke.content)
输出:
Why don't dogs make good dancers?
Because they have two left feet!
案例2:生成诗歌
假设我们有一个链,用于根据主题生成简短的诗歌:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
# 初始化模型和提示模板
model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("write a short (2 line) poem about {topic}")
chain = prompt | model
# 调用 `invoke()` 方法
input_data = {"topic": "sunset"}
poem = chain.invoke(input_data)
# 打印结果
print(poem.content)
输出:
Sunset paints the sky in gold,
Day's embrace, a story told.
8. 进阶用法
配置 invoke()
方法
可以通过config
参数传递额外的配置信息,以定制链的行为。例如,指定运行名称和标签:
input_data = {"topic": "technology"}
config = {"run_name": "tech_joke_run", "tags": ["joke", "technology"]}
result = chain.invoke(input_data, config=config)
print(result.content)
错误处理
在使用invoke()
方法时,可能会遇到各种错误(如输入格式错误、模型响应错误等)。建议使用异常处理机制来捕获和处理这些错误。
try:
result = chain.invoke({"topic": "invalid_topic"})
print(result.content)
except Exception as e:
print(f"An error occurred: {e}")
9. 性能考虑
invoke()
方法是同步的,适用于处理较短或不频繁的请求。如果需要处理大量请求或需要更高的响应速度,建议考虑使用异步方法(如 ainvoke()
)或批量处理(batch()
)来优化性能。
示例:批量调用
inputs = [{"topic": "bears"}, {"topic": "cats"}, {"topic": "technology"}]
results = chain.batch(inputs)
for res in results:
print(res.content)
输出:
Why don't bears wear shoes?
Because they have bear feet!
Why don't cats play poker in the wild?
Too many cheetahs!
Why did the computer go to the doctor?
Because it had a virus!
10. 总结
invoke()
方法是LangChain中一个强大且简洁的同步调用接口,适用于各种需要与链交互的场景。通过invoke()
,开发者可以轻松地将输入传递给链,并获取生成的输出,无需关心链内部的具体实现细节。这使得构建复杂的自然语言处理应用变得更加高效和直观。
希望通过以上详细讲解,您能够更好地理解和使用LangChain中的invoke()
方法,构建出强大而灵活的语言模型应用!
BaseChatModel
目录
- 类简介
- 核心方法
invoke()
ainvoke()
stream()
astream()
astream_events()
batch()
abatch()
batch_as_completed()
abatch_as_completed()
- 声明方法
bind_tools()
with_structured_output()
with_retry()
with_fallbacks()
configurable_fields()
configurable_alternatives()
with_types()
- 创建自定义聊天模型
- 属性和参数
cache
callback_manager
callbacks
custom_get_token_ids
disable_streaming
metadata
rate_limiter
tags
verbose
- 辅助方法
get_num_tokens()
get_num_tokens_from_messages()
get_token_ids()
- 示例代码
- 绑定监听器
- 重试机制
- 结构化输出
- 并行批量处理
- 总结
1. 类简介
BaseChatModel
是 LangChain 中所有聊天模型的基类。它定义了一系列方法,这些方法用于与底层模型交互,包括同步和异步调用、流式处理、批量处理等。通过继承 BaseChatModel
,开发者可以创建自定义的聊天模型,扩展其功能以满足特定需求。
2. 核心方法
2.1 invoke()
方法
描述: invoke()
是 BaseChatModel
中的一个核心同步方法,用于将单个输入传递给聊天模型,并返回生成的响应。
签名:
invoke(input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs: Any) → BaseMessage
参数:
input
(LanguageModelInput
): 要传递给模型的输入,可以是字符串、消息列表或PromptValue
对象。config
(RunnableConfig | None
): 可选的配置参数,用于定制调用行为。stop
(list[str] | None
): 可选的停止词列表,当生成的文本包含这些词时,模型将停止生成。**kwargs
(Any
): 其他可选的关键字参数。
返回值:
BaseMessage
: 模型生成的单个响应消息。
示例:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
# 初始化聊天模型和提示模板
model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
chain = prompt | model
# 调用 invoke 方法
input_data = {"topic": "bears"}
result = chain.invoke(input_data)
print(result.content)
# 输出示例: "Why don't bears wear shoes? \n\nBecause they have bear feet!"
2.2 ainvoke()
方法
描述: ainvoke()
是 invoke()
的异步版本,用于在异步环境中调用聊天模型。
签名:
async ainvoke(input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs: Any) → BaseMessage
参数和返回值: 与 invoke()
方法相同,但返回的是一个异步的 BaseMessage
对象。
示例:
import asyncio
async def get_joke():
result = await chain.ainvoke({"topic": "cats"})
print(result.content)
asyncio.run(get_joke())
# 输出示例: "Why don't cats play poker in the wild?\n\nToo many cheetahs!"
2.3 stream()
方法
描述: stream()
方法用于同步地流式获取模型的输出,适用于需要逐步处理输出的场景。
签名:
stream(input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs: Any) → Iterator[BaseMessageChunk]
返回值:
Iterator[BaseMessageChunk]
: 一个迭代器,逐步返回模型生成的输出片段。
示例:
for chunk in chain.stream({"topic": "bears"}):
print(chunk.content, end="", flush=True)
# 输出示例:
# Sure, here's a bear-themed joke for you:
#
# Why don't bears wear shoes?
#
# Because they already have bear feet!
2.4 astream()
方法
描述: astream()
是 stream()
的异步版本,用于在异步环境中流式获取模型的输出。
签名:
async astream(input: LanguageModelInput, config: RunnableConfig | None = None, *, stop: list[str] | None = None, **kwargs: Any) → AsyncIterator[BaseMessageChunk]
返回值:
AsyncIterator[BaseMessageChunk]
: 一个异步迭代器,逐步返回模型生成的输出片段。
示例:
async def stream_joke():
async for chunk in chain.astream({"topic": "bears"}):
print(chunk.content, end="", flush=True)
asyncio.run(stream_joke())
# 输出示例与同步流式调用相同
2.5 astream_events()
方法
描述: astream_events()
方法用于生成事件流,提供关于模型执行进度的实时信息,包括中间结果。
签名:
async astream_events(input: Any, config: RunnableConfig | None = None, *, version: Literal['v1', 'v2'], include_names: Sequence[str] | None = None, include_types: Sequence[str] | None = None, include_tags: Sequence[str] | None = None, exclude_names: Sequence[str] | None = None, exclude_types: Sequence[str] | None = None, exclude_tags: Sequence[str] | None = None, **kwargs: Any) → AsyncIterator[StandardStreamEvent | CustomStreamEvent]
返回值:
AsyncIterator[StandardStreamEvent | CustomStreamEvent]
: 一个异步迭代器,逐步返回标准或自定义的事件。
示例:
async for event in chain.astream_events("tell me a joke about bears", version="v2"):
print(event)
# 输出示例:
# {
# "event": "on_chat_model_start",
# "name": "ChatOpenAI",
# "run_id": "some-unique-id",
# "data": {"messages": [["SystemMessage", "HumanMessage"]]},
# ...
# }
2.6 batch()
方法
描述: batch()
方法用于同步地批量处理多个输入,提高处理效率。
签名:
batch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) → list[Output]
返回值:
list[Output]
: 模型生成的多个输出消息列表。
示例:
inputs = [{"topic": "bears"}, {"topic": "cats"}]
results = chain.batch(inputs)
for res in results:
print(res.content)
# 输出示例:
# "Why don't bears wear shoes? \n\nBecause they have bear feet!"
# "Why don't cats play poker in the wild?\n\nToo many cheetahs!"
2.7 abatch()
方法
描述: abatch()
是 batch()
的异步版本,用于在异步环境中批量处理多个输入。
签名:
async abatch(inputs: list[Input], config: RunnableConfig | list[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) → list[Output]
返回值:
list[Output]
: 模型生成的多个输出消息列表。
示例:
async def batch_jokes():
inputs = [{"topic": "bears"}, {"topic": "cats"}]
results = await chain.abatch(inputs)
for res in results:
print(res.content)
asyncio.run(batch_jokes())
# 输出示例:
# "Why don't bears wear shoes?\n\nBecause they have bear feet!"
# "Why don't cats play poker in the wild?\n\nToo many cheetahs!"
2.8 batch_as_completed()
方法
描述: batch_as_completed()
方法用于同步地并行处理多个输入,并在每个输入处理完成后立即返回结果。
签名:
batch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) → Iterator[tuple[int, Output | Exception]]
返回值:
Iterator[tuple[int, Output | Exception]]
: 一个迭代器,逐步返回每个输入的索引和对应的输出或异常。
示例:
inputs = [{"topic": "bears"}, {"topic": "cats"}, {"topic": "technology"}]
for idx, result in chain.batch_as_completed(inputs):
if isinstance(result, Exception):
print(f"Input {idx} raised an exception: {result}")
else:
print(f"Input {idx} result: {result.content}")
# 输出示例:
# Input 0 result: "Why don't bears wear shoes?\n\nBecause they have bear feet!"
# Input 1 result: "Why don't cats play poker in the wild?\n\nToo many cheetahs!"
# Input 2 result: "Why did the computer go to the doctor?\n\nBecause it had a virus!"
2.9 abatch_as_completed()
方法
描述: abatch_as_completed()
是 batch_as_completed()
的异步版本,用于在异步环境中并行处理多个输入,并在每个输入处理完成后立即返回结果。
签名:
async abatch_as_completed(inputs: Sequence[Input], config: RunnableConfig | Sequence[RunnableConfig] | None = None, *, return_exceptions: bool = False, **kwargs: Any | None) → AsyncIterator[tuple[int, Output | Exception]]
返回值:
AsyncIterator[tuple[int, Output | Exception]]
: 一个异步迭代器,逐步返回每个输入的索引和对应的输出或异常。
示例:
async def batch_as_completed_jokes():
inputs = [{"topic": "bears"}, {"topic": "cats"}, {"topic": "technology"}]
async for idx, result in chain.abatch_as_completed(inputs):
if isinstance(result, Exception):
print(f"Input {idx} raised an exception: {result}")
else:
print(f"Input {idx} result: {result.content}")
asyncio.run(batch_as_completed_jokes())
# 输出示例:
# Input 0 result: "Why don't bears wear shoes?\n\nBecause they have bear feet!"
# Input 1 result: "Why don't cats play poker in the wild?\n\nToo many cheetahs!"
# Input 2 result: "Why did the computer go to the doctor?\n\nBecause it had a virus!"
3. 声明方法
除了核心的同步和异步调用方法,BaseChatModel
还提供了一些声明方法,用于创建其他可运行对象(Runnable),扩展模型的功能。
3.1 bind_tools()
方法
描述: 创建一个可以调用工具(tools)的聊天模型。
签名:
bind_tools(tools: Sequence[Dict[str, Any] | type | Callable | BaseTool], **kwargs: Any) → Runnable[LanguageModelInput, BaseMessage]
参数:
tools
(Sequence[Union[Dict[str, Any], type, Callable, BaseTool]]
): 要绑定到模型的工具序列。**kwargs
(Any
): 其他可选的关键字参数。
返回值:
Runnable[LanguageModelInput, BaseMessage]
: 一个新的 Runnable 对象,可以调用指定的工具。
示例:
from langchain_core.runnables import RunnableLambda
from langchain_core.callbacks.manager import adispatch_custom_event
from langchain_core.tools import BaseTool
@tool
def some_tool(x: int, y: str) -> dict:
'''Some_tool.'''
return {"x": x, "y": y}
llm = ChatOpenAI(model_name="gpt-4")
structured_llm = llm.bind_tools(tools=[some_tool])
result = structured_llm.invoke({"x": 1, "y": "2"})
print(result.content)
# 输出示例: '{"x": 1, "y": "2"}'
3.2 with_structured_output()
方法
描述: 创建一个包装器,使用指定的 schema 结构化模型输出。
签名:
with_structured_output(schema: Dict | type, *, include_raw: bool = False, **kwargs: Any) → Runnable[LanguageModelInput, Dict | BaseModel]
参数:
schema
(Union[Dict, type]
): 输出的 schema,可以是 JSON Schema、Pydantic 类等。include_raw
(bool
): 是否包含原始模型响应。**kwargs
(Any
): 其他可选的关键字参数。
返回值:
Runnable[LanguageModelInput, Union[Dict, BaseModel]]
: 一个新的 Runnable 对象,输出将按照指定的 schema 进行结构化。
示例:
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
llm = ChatOpenAI(model_name="gpt-4", temperature=0)
structured_llm = llm.with_structured_output(AnswerWithJustification)
result = structured_llm.invoke("What weighs more, a pound of bricks or a pound of feathers?")
print(result)
# 输出示例:
# AnswerWithJustification(
# answer='They weigh the same.',
# justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'
# )
3.3 with_retry()
方法
描述: 创建一个包装器,在调用模型时遇到异常时进行重试。
签名:
with_retry(*, retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,), wait_exponential_jitter: bool = True, stop_after_attempt: int = 3) → Runnable[Input, Output]
参数:
retry_if_exception_type
(tuple[type[BaseException], ...]
): 要重试的异常类型。wait_exponential_jitter
(bool
): 是否在重试之间添加抖动。stop_after_attempt
(int
): 最大重试次数。
返回值:
Runnable[Input, Output]
: 一个新的 Runnable 对象,具有重试机制。
示例:
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count += 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
3.4 with_fallbacks()
方法
描述: 为 Runnable 添加回退机制,当原始 Runnable 调用失败时,依次尝试回退的 Runnable。
签名:
with_fallbacks(fallbacks: Sequence[Runnable[Input, Output]], *, exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,), exception_key: Optional[str] = None) → RunnableWithFallbacksT[Input, Output]
参数:
fallbacks
(Sequence[Runnable[Input, Output]]
): 回退的 Runnable 序列。exceptions_to_handle
(tuple[type[BaseException], ...]
): 要处理的异常类型。exception_key
(Optional[str]
): 如果指定字符串,则将处理的异常作为输入的一部分传递给回退的 Runnable。
返回值:
RunnableWithFallbacksT[Input, Output]
: 一个新的 Runnable 对象,具备回退机制。
示例:
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print(''.join(runnable.stream({}))) # 输出: foo bar
3.5 configurable_fields()
方法
描述: 配置 Runnable 的特定字段,以便在运行时通过 RunnableConfig
进行配置。
签名:
configurable_fields(**kwargs: ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption) → RunnableSerializable
参数:
**kwargs
(ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption
): 要配置的字段。
返回值:
RunnableSerializable
: 一个新的 Runnable 对象,具有可配置的字段。
示例:
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# 使用默认的 max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# 配置 max_tokens = 200
print("max_tokens_200: ", model.with_config(
configurable={"output_token_number": 200}
).invoke("tell me something about chess").content)
3.6 configurable_alternatives()
方法
描述: 为 Runnable 配置替代模型,以便在运行时通过 RunnableConfig
进行切换。
签名:
configurable_alternatives(which: ConfigurableField, *, default_key: str = 'default', prefix_keys: bool = False, **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]) → RunnableSerializable
参数:
which
(ConfigurableField
): 用于选择替代模型的字段。default_key
(str
): 默认使用的替代模型键。prefix_keys
(bool
): 是否为键添加前缀。**kwargs
(Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]
): 替代模型的键值对。
返回值:
RunnableSerializable
: 一个新的 Runnable 对象,具有可配置的替代模型。
示例:
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-3-sonnet-20240229"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI()
)
# 使用默认模型 ChatAnthropic
print(model.invoke("which organization created you?").content)
# 切换到 ChatOpenAI 模型
print(
model.with_config(
configurable={"llm": "openai"}
).invoke("which organization created you?").content
)
3.7 with_types()
方法
描述: 为 Runnable 绑定输入和输出的类型。
签名:
with_types(*, input_type: type[Input] | None = None, output_type: type[Output] | None = None) → Runnable[Input, Output]
参数:
input_type
(type[Input] | None
): 要绑定的输入类型。output_type
(type[Output] | None
): 要绑定的输出类型。
返回值:
Runnable[Input, Output]
: 一个新的 Runnable 对象,具有绑定的输入和输出类型。
示例:
from typing import List
from pydantic import BaseModel
class InputModel(BaseModel):
topic: str
class OutputModel(BaseModel):
joke: str
structured_llm = llm.with_types(input_type=InputModel, output_type=OutputModel)
input_data = InputModel(topic="bears")
result = structured_llm.invoke(input_data)
print(result.joke)
4. 创建自定义聊天模型
要创建自定义的聊天模型,开发者需要继承 BaseChatModel
并实现其抽象方法和属性。
必要的方法和属性
方法/属性 | 描述 | 必须实现 |
---|---|---|
_generate | 使用提示生成聊天结果。 | 必须 |
_llm_type (property) | 唯一标识模型类型,用于日志记录。 | 必须 |
_identifying_params (property) | 表示模型参数化的追踪信息。 | 可选 |
_stream | 实现流式输出。 | 可选 |
_agenerate | 实现原生异步方法。 | 可选 |
_astream | 实现 _stream 的异步版本。 | 可选 |
示例:自定义聊天模型
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import BaseMessage
from typing import List, Any
class CustomChatModel(BaseChatModel):
@property
def _llm_type(self) -> str:
return "custom_chat_model"
def _generate(self, prompt: str, stop: List[str] | None = None, **kwargs: Any) -> BaseMessage:
# 自定义生成逻辑
response = f"Custom response to: {prompt}"
return BaseMessage(content=response)
async def _agenerate(self, prompt: str, stop: List[str] | None = None, **kwargs: Any) -> BaseMessage:
# 自定义异步生成逻辑
response = f"Async custom response to: {prompt}"
return BaseMessage(content=response)
5. 属性和参数
BaseChatModel
类提供了一系列属性和参数,用于定制模型的行为。
5.1 cache
参数
描述: 是否缓存模型的响应。
类型: BaseCache | bool | None
行为:
True
: 使用全局缓存。False
: 不使用缓存。None
: 如果设置了全局缓存,则使用全局缓存,否则不使用。BaseCache
实例: 使用提供的缓存实例。
注意: 当前不支持对流式方法进行缓存。
5.2 callback_manager
参数
描述: 回调管理器,用于添加到运行跟踪中。
类型: BaseCallbackManager | None
状态: 自版本 0.1.7 起已弃用,建议使用 callbacks
参数代替。
5.3 callbacks
参数
描述: 用于添加到运行跟踪中的回调。
类型: Callbacks = None
5.4 custom_get_token_ids
参数
描述: 可选的编码器,用于计数 token。
类型: Callable[[str], list[int]] | None
5.5 disable_streaming
参数
描述: 是否禁用模型的流式输出。
类型: bool | Literal['tool_calling']
行为:
True
: 始终禁用流式输出,stream()
和astream()
将调用invoke()
和ainvoke()
。'tool_calling'
: 仅在模型调用时使用tools
关键字参数时禁用流式输出。False
(默认): 如果可用,则始终使用流式输出。
5.6 metadata
参数
描述: 要添加到运行跟踪中的元数据。
类型: dict[str, Any] | None
5.7 rate_limiter
参数
描述: 可选的速率限制器,用于限制请求数量。
类型: BaseRateLimiter | None
5.8 tags
参数
描述: 要添加到运行跟踪中的标签。
类型: list[str] | None
5.9 verbose
参数
描述: 是否打印响应文本。
类型: bool [Optional]
6. 辅助方法
6.1 get_num_tokens()
方法
描述: 获取文本中的 token 数量,适用于检查输入是否符合模型的上下文窗口限制。
签名:
get_num_tokens(text: str) → int
参数:
text
(str
): 要计数的文本。
返回值:
int
: 文本中的 token 数量。
示例:
num_tokens = model.get_num_tokens("Hello, how are you?")
print(num_tokens)
# 输出示例: 6
6.2 get_num_tokens_from_messages()
方法
描述: 获取消息列表中的 token 数量,用于检查输入是否符合模型的上下文窗口限制。
签名:
get_num_tokens_from_messages(messages: list[BaseMessage], tools: Sequence | None = None) → int
参数:
messages
(list[BaseMessage]
): 要计数的消息列表。tools
(Sequence | None
): 如果提供,将工具转换为工具 schema 进行计数。
返回值:
int
: 消息中的 token 数量总和。
注意: 基础实现忽略了工具 schema。
示例:
from langchain_core.messages import BaseMessage
messages = [BaseMessage(content="Hello!"), BaseMessage(content="How can I assist you today?")]
num_tokens = model.get_num_tokens_from_messages(messages)
print(num_tokens)
# 输出示例: 9
6.3 get_token_ids()
方法
描述: 返回文本中 token 的有序 ID 列表。
签名:
get_token_ids(text: str) → list[int]
参数:
text
(str
): 要转换为 token ID 的文本。
返回值:
list[int]
: 文本中 token 的 ID 列表,按出现顺序排列。
示例:
token_ids = model.get_token_ids("Hello")
print(token_ids)
# 输出示例: [15496]
7. 示例代码
7.1 绑定监听器
描述: 通过 with_listeners()
方法,为 Runnable 绑定生命周期监听器,以在运行开始、结束或出错时执行特定的回调函数。
示例:
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
import asyncio
def format_t(t):
return time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(t))
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
async def fn_start(run_obj: Run):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Run):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
输出示例:
on start callback starts at 2024-05-16T14:20:29
on start callback starts at 2024-05-16T14:20:29
on start callback ends at 2024-05-16T14:20:32
on start callback ends at 2024-05-16T14:20:32
Runnable[2s]: starts at 2024-05-16T14:20:32
Runnable[3s]: starts at 2024-05-16T14:20:32
Runnable[2s]: ends at 2024-05-16T14:20:34
on end callback starts at 2024-05-16T14:20:34
Runnable[3s]: ends at 2024-05-16T14:20:35
on end callback starts at 2024-05-16T14:20:35
on end callback ends at 2024-05-16T14:20:37
on end callback ends at 2024-05-16T14:20:39
7.2 重试机制
描述: 通过 with_retry()
方法,为 Runnable 添加重试机制,在遇到指定类型的异常时自动重试。
示例:
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count += 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
解释:
- 定义了一个简单的 Lambda 函数
_lambda
,当输入为1
时抛出ValueError
异常。 - 使用
with_retry()
方法配置最多重试 2 次,并指定仅对ValueError
类型的异常进行重试。 - 调用
invoke(1)
,由于输入为1
,会抛出异常并触发重试机制。 - 最终,
count
的值为2
,表明重试机制正常工作。
7.3 结构化输出
描述: 通过 with_structured_output()
方法,使用指定的 schema 结构化模型的输出,使其符合特定的数据结构。
示例:
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
llm = ChatModel(model="model-name", temperature=0)
structured_llm = llm.with_structured_output(AnswerWithJustification)
result = structured_llm.invoke("What weighs more, a pound of bricks or a pound of feathers?")
print(result)
# 输出示例:
# AnswerWithJustification(
# answer='They weigh the same.',
# justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'
# )
7.4 并行批量处理
描述: 通过 RunnableParallel
实现并行处理多个 Runnable,提高处理效率。
示例:
from langchain_core.runnables import RunnableParallel
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
chain1 = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
chain2 = ChatPromptTemplate.from_template("write a short (2 line) poem about {topic}") | model
combined = RunnableParallel(joke=chain1, poem=chain2)
# 同步调用
result = combined.invoke({"topic": "bears"})
print(result)
# 输出示例:
# {
# 'joke': AIMessage(content="Sure, here's a bear-related joke for you:\n\nWhy did the bear bring a ladder to the bar?\n\nBecause he heard the drinks were on the house!"),
# 'poem': AIMessage(content="In wilderness they roam,\nMajestic strength, nature's throne.")
# }
# 批量调用
batch_results = combined.batch([{"topic": "bears"}, {"topic": "cats"}])
for res in batch_results:
print(res)
# 输出示例:
# {
# 'joke': AIMessage(content="Sure, here's a bear joke for you:\n\nWhy don't bears wear shoes?\n\nBecause they have bear feet!"),
# 'poem': AIMessage(content="Majestic bears roam,\nNature's strength, beauty shown.")
# }
# {
# 'joke': AIMessage(content="Why don't cats play poker in the wild?\n\nToo many cheetahs!"),
# 'poem': AIMessage(content="Whiskers dance, eyes aglow,\nCats embrace the night's gentle flow.")
# }
8. 总结
BaseChatModel
类是 LangChain 中构建聊天模型的基础类,提供了一系列核心和声明方法,支持同步和异步调用、流式处理、批量处理、重试机制、回退机制、结构化输出等功能。通过继承和扩展 BaseChatModel
,开发者可以创建功能强大且灵活的自定义聊天模型,满足不同应用场景的需求。
关键要点:
- 核心方法:包括
invoke()
,ainvoke()
,stream()
,astream()
等,用于与模型进行同步和异步交互。 - 声明方法:如
with_retry()
,with_fallbacks()
,with_structured_output()
等,用于增强模型的功能。 - 扩展性:通过继承
BaseChatModel
,开发者可以实现自定义的生成逻辑和模型行为。 - 配置与定制:支持通过参数和方法配置模型的行为,如缓存、速率限制、标签、元数据等。
MessagePlaceholder
MessagesPlaceholder
在提示模板中占据一个位置,用于动态地插入传递给 invoke
方法的消息列表。在本例中,variable_name="messages"
表示占位符将被替换为 state
中键为 "messages"
的值。