前言
LlamaIndex
中的工作流是一个事件驱动的抽象,用于将多个事件链接在一起。工作流由步骤组成,每个步骤负责处理特定的事件类型并发出新事件。
LlamaIndex
中的工作流通过使用@step
装饰器装饰函数来工作。这用于推断每个验证工作流的输入和输出类型,并确保每个步骤仅在可接受的事件准备就绪时运行。
你可以创建一个工作流来做任何事情!构建代理、RAG
流、提取流或任何您想要的东西。
工作流也是自动检测的,因此您可以使用像alize phoenix这样的工具对每个步骤进行观察。(注意:可观察性适用于利用新仪器系统的集成。用法可能会有所不同。)
工作流使异步成为一等公民,本页假定您在异步环境中运行。这对您来说意味着正确设置async
代码。如果你已经在FastAPI
之类的服务器上运行,或者在笔记本上运行,你可以自由地使用await
!
如果您正在运行自己的python
脚本,最佳实践是使用单个异步入口点。
async def main():
w = MyWorkflow(...)
result = await w.run(...)
print(result)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
快速开始
安装依赖
在自己的本地环境中安装一下依赖
pip install llama-index-core
pip install llama-index-llms-dashscope
代码实践
作为一个说明性示例,让我们写一个简短的工作流程,第一步生成了一个笑话,第二步对生成的笑话进行评价。
代码如下:
import asyncio
import os
from llama_index.core.workflow import (
Event,
StartEvent,
StopEvent,
Workflow,
step, )
from llama_index.llms.dashscope import DashScope, DashScopeGenerationModels
api_key = os.getenv('DASHSCOPE_API_KEY')
print('api_key', api_key)
class JokeEvent(Event):
joke: str
class JokeFlow(Workflow):
llm = DashScope(
model_name=DashScopeGenerationModels.QWEN_TURBO, api_key=api_key, max_tokens=512,
incremental_output=True
)
@step
async def generate_joke(self, ev: StartEvent) -> JokeEvent:
topic = ev.topic
prompt = f"写下你最棒的笑话 {topic}."
response = await self.llm.acomplete(prompt)
print(str(response))
return JokeEvent(joke=str(response))
@step
async def critique_joke(self, ev: JokeEvent) -> StopEvent:
joke = ev.joke
prompt = f"对下面的笑话进行全面的分析和评论: {joke}"
responses = await self.llm.astream_complete(prompt,incremental_output=True)
return StopEvent(result=responses)
# 假设 w.run 是一个异步方法
async def main():
#draw_all_possible_flows(JokeFlow, filename="joke_flow_all.html")
w = JokeFlow(timeout=60, verbose=True)
responses = await w.run(topic="海盗")
async for response in responses:
print(response.delta, end="")
#draw_most_recent_execution(w, filename="joke_flow_recent.html")
asyncio.run(main())
代码解读
定义工作流事件
class JokeEvent(Event):
joke: str
JokeEvent事件是用户定义的pydantic对象。您可以控制属性和任何其他辅助方法。在这种情况下,我们的工作流依赖于单个用户定义的事件JokeEvent。
创建工作流类
class JokeFlow(Workflow):
#llm = OpenAI(model="gpt-4o-mini")
llm = DashScope(model_name=DashScopeGenerationModels.QWEN_TURBO, api_key=api_key, max_tokens=512)
...
我们的工作流是通过子类化workflow类来实现的。这里可以使用Open AI的llm实例,也可以用阿里云DashScope的实例。
工作流入口点
class JokeFlow(Workflow):
...
@step
async def generate_joke(self, ev: StartEvent) -> JokeEvent:
topic = ev.topic
prompt = f"写下你最棒的笑话 {topic}."
response = await self.llm.acomplete(prompt)
return JokeEvent(joke=str(response))
...
在这里,我们来到了工作流的入口点。虽然事件是使用定义的,但有两个特殊情况的事件,即StartEvent
和StopEvent
。这里,StartEvent
表示向何处发送初始工作流输入。
StartEvent
是一个有点特殊的对象,因为它可以保存任意属性。在这里,我们使用ev访问该主题。主题,如果它不在那里,则会引发错误。您还可以执行ev.get("topic")
来处理属性可能不存在而不引发错误的情况。
此时,您可能已经注意到,我们没有明确地告诉工作流哪些步骤处理哪些事件。相反,@step
装饰器用于推断每个步骤的输入和输出类型。此外,这些推断的输入和输出类型还用于在运行之前为您验证工作流是否有效!
工作流退出点
class JokeFlow(Workflow):
...
@step
async def critique_joke(self, ev: JokeEvent) -> StopEvent:
joke = ev.joke
prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
response = await self.llm.acomplete(prompt)
return StopEvent(result=str(response))
...
在这里,我们有了工作流中的第二步,也是最后一步。我们知道这是最后一步,因为它返回了特殊的StopEvent
。当工作流遇到返回的StopEvent
时,它立即停止工作流并返回结果。
在本例中,结果是一个字符串,但它可以是一个字典、列表或任何其他对象。
运行工作流
async def main():
#draw_all_possible_flows(JokeFlow, filename="joke_flow_all.html")
responses = await joke_flow.run(topic="海盗")
async for response in responses:
print(response.delta, end="")
#draw_most_recent_execution(w, filename="joke_flow_recent.html")
asyncio.run(main())
- 替换代码中的
api_key
为自己的 - 代码中注释的代码是可以生成工作流流程图html
- run()方法是异步的,所以我们在这里使用await来等待结果。
- 工作流的最后一步退出点我使用的流式响应
装饰非类功能
除了上述创建一个流程类实现流程外,还可以使用装饰器加方法实现,代码如下:
import asyncio
import os
from llama_index.core.workflow import (
Event,
StartEvent,
StopEvent,
Workflow,
step, )
from llama_index.llms.dashscope import DashScope, DashScopeGenerationModels
api_key = os.getenv('DASHSCOPE_API_KEY')
print('api_key', api_key)
class JokeEvent(Event):
joke: str
joke_flow = Workflow(timeout=60, verbose=True)
llm = DashScope(
model_name=DashScopeGenerationModels.QWEN_TURBO, api_key=api_key, max_tokens=512
)
@step(workflow=joke_flow)
async def generate_joke(ev: StartEvent) -> JokeEvent:
topic = ev.topic
prompt = f"写下你最棒的笑话 {topic}."
response = await llm.acomplete(prompt)
print(str(response))
return JokeEvent(joke=str(response))
@step(workflow=joke_flow)
async def critique_joke(ev: JokeEvent) -> StopEvent:
joke = ev.joke
prompt = (
f"对下面的笑话进行全面的分析和评论: {joke}"
)
response = await llm.astream_complete(prompt)
return StopEvent(result=response)
# 假设 w.run 是一个异步方法
async def main():
#draw_all_possible_flows(JokeFlow, filename="joke_flow_all.html")
responses = await joke_flow.run(topic="海盗")
async for response in responses:
print(response.delta, end="")
#draw_most_recent_execution(w, filename="joke_flow_recent.html")
asyncio.run(main())
- 替换代码中的
api_key
为自己的