LlamaIndex核心概念Workflows工作流代码实战教程

news2024/10/11 21:23:37

前言

LlamaIndex中的工作流是一个事件驱动的抽象,用于将多个事件链接在一起。工作流由步骤组成,每个步骤负责处理特定的事件类型并发出新事件。

LlamaIndex中的工作流通过使用@step装饰器装饰函数来工作。这用于推断每个验证工作流的输入和输出类型,并确保每个步骤仅在可接受的事件准备就绪时运行。

你可以创建一个工作流来做任何事情!构建代理、RAG流、提取流或任何您想要的东西。

工作流也是自动检测的,因此您可以使用像alize phoenix这样的工具对每个步骤进行观察。(注意:可观察性适用于利用新仪器系统的集成。用法可能会有所不同。)

工作流使异步成为一等公民,本页假定您在异步环境中运行。这对您来说意味着正确设置async代码。如果你已经在FastAPI之类的服务器上运行,或者在笔记本上运行,你可以自由地使用await !

如果您正在运行自己的python脚本,最佳实践是使用单个异步入口点。

async def main():
    w = MyWorkflow(...)
    result = await w.run(...)
    print(result)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

快速开始

安装依赖

在自己的本地环境中安装一下依赖

pip install llama-index-core
pip install llama-index-llms-dashscope

代码实践

作为一个说明性示例,让我们写一个简短的工作流程,第一步生成了一个笑话,第二步对生成的笑话进行评价。
代码如下:

import asyncio
import os

from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step, )
from llama_index.llms.dashscope import DashScope, DashScopeGenerationModels

api_key = os.getenv('DASHSCOPE_API_KEY')
print('api_key', api_key)


class JokeEvent(Event):
    joke: str


class JokeFlow(Workflow):
    llm = DashScope(
        model_name=DashScopeGenerationModels.QWEN_TURBO, api_key=api_key, max_tokens=512,
        incremental_output=True
    )

    @step
    async def generate_joke(self, ev: StartEvent) -> JokeEvent:
        topic = ev.topic

        prompt = f"写下你最棒的笑话 {topic}."
        response = await self.llm.acomplete(prompt)
        print(str(response))
        return JokeEvent(joke=str(response))

    @step
    async def critique_joke(self, ev: JokeEvent) -> StopEvent:
        joke = ev.joke

        prompt = f"对下面的笑话进行全面的分析和评论: {joke}"
        responses = await self.llm.astream_complete(prompt,incremental_output=True)
        return StopEvent(result=responses)


# 假设 w.run 是一个异步方法
async def main():
    #draw_all_possible_flows(JokeFlow, filename="joke_flow_all.html")
    w = JokeFlow(timeout=60, verbose=True)
    responses = await w.run(topic="海盗")
    async for response in responses:
        print(response.delta, end="")
    #draw_most_recent_execution(w, filename="joke_flow_recent.html")


asyncio.run(main())

代码解读

定义工作流事件

class JokeEvent(Event):
    joke: str

JokeEvent事件是用户定义的pydantic对象。您可以控制属性和任何其他辅助方法。在这种情况下,我们的工作流依赖于单个用户定义的事件JokeEvent。

创建工作流类

class JokeFlow(Workflow):
    #llm = OpenAI(model="gpt-4o-mini")
    llm = DashScope(model_name=DashScopeGenerationModels.QWEN_TURBO, api_key=api_key, max_tokens=512)
    ...

我们的工作流是通过子类化workflow类来实现的。这里可以使用Open AI的llm实例,也可以用阿里云DashScope的实例。

工作流入口点

class JokeFlow(Workflow):
    ...

    @step
    async def generate_joke(self, ev: StartEvent) -> JokeEvent:
        topic = ev.topic

        prompt = f"写下你最棒的笑话 {topic}."
        response = await self.llm.acomplete(prompt)
        return JokeEvent(joke=str(response))

    ...

在这里,我们来到了工作流的入口点。虽然事件是使用定义的,但有两个特殊情况的事件,即StartEventStopEvent。这里,StartEvent表示向何处发送初始工作流输入。

StartEvent是一个有点特殊的对象,因为它可以保存任意属性。在这里,我们使用ev访问该主题。主题,如果它不在那里,则会引发错误。您还可以执行ev.get("topic")来处理属性可能不存在而不引发错误的情况。

此时,您可能已经注意到,我们没有明确地告诉工作流哪些步骤处理哪些事件。相反,@step装饰器用于推断每个步骤的输入和输出类型。此外,这些推断的输入和输出类型还用于在运行之前为您验证工作流是否有效!

工作流退出点

class JokeFlow(Workflow):
    ...

    @step
    async def critique_joke(self, ev: JokeEvent) -> StopEvent:
        joke = ev.joke

        prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
        response = await self.llm.acomplete(prompt)
        return StopEvent(result=str(response))

    ...

在这里,我们有了工作流中的第二步,也是最后一步。我们知道这是最后一步,因为它返回了特殊的StopEvent。当工作流遇到返回的StopEvent时,它立即停止工作流并返回结果。

在本例中,结果是一个字符串,但它可以是一个字典、列表或任何其他对象。

运行工作流

async def main():
    #draw_all_possible_flows(JokeFlow, filename="joke_flow_all.html")
    responses = await joke_flow.run(topic="海盗")
    async for response in responses:
        print(response.delta, end="")
    #draw_most_recent_execution(w, filename="joke_flow_recent.html")


asyncio.run(main())
  • 替换代码中的api_key 为自己的
  • 代码中注释的代码是可以生成工作流流程图html
  • run()方法是异步的,所以我们在这里使用await来等待结果。
  • 工作流的最后一步退出点我使用的流式响应

装饰非类功能

除了上述创建一个流程类实现流程外,还可以使用装饰器加方法实现,代码如下:

import asyncio
import os

from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step, )
from llama_index.llms.dashscope import DashScope, DashScopeGenerationModels

api_key = os.getenv('DASHSCOPE_API_KEY')
print('api_key', api_key)


class JokeEvent(Event):
    joke: str


joke_flow = Workflow(timeout=60, verbose=True)

llm = DashScope(
    model_name=DashScopeGenerationModels.QWEN_TURBO, api_key=api_key, max_tokens=512
)


@step(workflow=joke_flow)
async def generate_joke(ev: StartEvent) -> JokeEvent:
    topic = ev.topic

    prompt = f"写下你最棒的笑话 {topic}."

    response = await llm.acomplete(prompt)
    print(str(response))
    return JokeEvent(joke=str(response))


@step(workflow=joke_flow)
async def critique_joke(ev: JokeEvent) -> StopEvent:
    joke = ev.joke

    prompt = (
        f"对下面的笑话进行全面的分析和评论: {joke}"
    )
    response = await llm.astream_complete(prompt)
    return StopEvent(result=response)


# 假设 w.run 是一个异步方法
async def main():
    #draw_all_possible_flows(JokeFlow, filename="joke_flow_all.html")
    responses = await joke_flow.run(topic="海盗")
    async for response in responses:
        print(response.delta, end="")
    #draw_most_recent_execution(w, filename="joke_flow_recent.html")


asyncio.run(main())

  • 替换代码中的api_key 为自己的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2205889.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

鸿蒙NEXT开发-面试题库(最新)

注意:博主有个鸿蒙专栏,里面从上到下有关于鸿蒙next的教学文档,大家感兴趣可以学习下 如果大家觉得博主文章写的好的话,可以点下关注,博主会一直更新鸿蒙next相关知识 专栏地址: https://blog.csdn.net/qq_56760790/…

LUCEDA IPKISS Tutorial 77:在版图一定范围内填充dummy

案例分享:在给定的Shape内填充dummy 所有代码如下: from si_fab import all as pdk from ipkiss3 import all as i3 from shapely.geometry import Polygon, MultiPolygon import numpy as np import matplotlib.pyplot as pltclass CellFilledWithCon…

【AI换脸】Rope一键整合包,实现视频多人实时换脸

随着人工智能技术的发展,人们越来越注重人机交互的趣味性和实用性。AI换脸技术正是在这种背景下兴起的一种创新应用。Rope换脸工具以其易用性和卓越的效果,成为了众多用户和专业人士青睐的对象。 Rope是什么? Rope是一款开源的deepfake软件&…

Redis 的安装与部署(图文)

前言 Redis 暂不支持Windows 系统,官网上只能下载Linux 环境的安装包。但是启用WSL2 就可以在Windows 上运行Linux 二进制文件。[要使此方法工作,需要运行Windows 10 2004版及更高版本或Windows 11]。本文在CentOS Linux 系统上安装最新版Redis&#xf…

健身房预约小程序开发,高效管理健身场馆!

随着社会生活的提高,健身成为了人们在日常生活中的必要选择,而健身房也随之成为了大众经常光顾的地方。健身房预约管理系统是一个便捷预约健身的平台,可以让大众灵活预约,提高健身的便捷性和服务体验。同时,健身场馆也…

JAVA 并发八股

线程与进程区别 进程是正在运行的程序的实例,进程中包含了线程,每个线程执行不同的任务 不同进程使用不同的内存空间,在当前进程下所有线程可以共享内存空间 线程更轻量,线程上下文切换成本一般上要比进程上下文切换低&#xff0…

vueJS中wowjs、animate、swiper的使用

原文关注公众号 本文演示利用swiper纵向全屏滚动 npm 安装 wow.js,安装 wow.js后animate.css会自动安装; npm install wowjs --save-dev npm 安装 animate.css animate.css文档:http://5kzx.cn/doc.html npm install animate.css --save …

Python和MATLAB及C++和Fortran胶体粒子数学材料学显微镜学微观流变学及光学计算

🎯要点 二维成像拥挤胶体粒子检测算法粒子的局部结构和动力学分析椭圆粒子成链动态过程定量分析算法小颗粒的光散射和吸收活跃物质模拟群体行为提取粒子轨迹粘弹性,计算剪切模量计算悬浮液球形粒子多体流体动力学概率规划全息图跟踪和表征粒子位置、大小…

创建docker虚拟镜像,创建启动服务脚本

进入系统命令服务目录 编辑服务 [Unit] DescriptionDocker Application Container Engine Documentationhttps://docs.docker.com Afternetwork-online.target firewalld.service Wantsnetwork-online.target [Service] Typenotify ExecStart/usr/bin/dockerd ExecReload/bin/…

Gradle 插件获取所有依赖项,类似 androidDependencies?

诉求 在打包过程中我想知道某个模块的信息,比如: 模块androidx.work:work-runtime是否被依赖?模块androidx.work:work-runtime的版本号是多少? 我们利用 Android studio 已有的任务androidDependencies,双击执行很容…

PyQt5写好的py文件生成可执行的exe文件【Nuitka】

文章目录 1.Nuitka引入2.Nuitka与Pyinstaller对比Nuitka安装 3.Nuitka指令4.参数以及作用5.多文件格式封装完成后可删除文件6.运行问题问题1问题2 1.Nuitka引入 看过我上一篇PyQt5写好的py文件生成可执行的exe文件【Pyinstaller】的应该了解到用PyQt5写的界面程序可以通过Pyins…

安卓冻屏bug案例作业分享-千里马学员wms+input实战作业

背景: 近期有学员反馈在aosp14高版本上有了一个新窗口TaskBar,这个但是有需求就是对这个TaskBar进行隐藏,所以有一个需要对这个TaskBar进行进行隐藏需求 隐藏TaskBar需求做了之后发现有如下bug: 问题复现步骤: 因…

新款示波器RTE1104罗德与施瓦茨RS RTE1102原装二手

罗德与施瓦茨R&S RTE1104触摸屏RTE1102新款示波器 R&S-RTE1000 示波器系列: RTE1022标配:200MHz带宽,2通道,5GSa/s采样率,200M存储深度,16个数字通道(选配) RTE1032标配&a…

HCIP--以太网交换安全(二)端口安全

端口安全 一、端口安全概述 1.1、端口安全概述:端口安全是一种网络设备防护措施,通过将接口学习的MAC地址设为安全地址防止非法用户通信。 1.2、端口安全原理: 类型 定义 特点 安全动态MAC地址 使能端口而未是能Stichy MAC功能是转换的…

解决PyCharm 2023 Python Packages列表为空

原因是因为没有设置镜像源 展开 > 之后,这里 点击齿轮 添加一个阿里云的源 最后还需要点击刷新 可以选择下面的任意一个国内镜像源: 清华:https://pypi.tuna.tsinghua.edu.cn/simple 阿里云:http://mirrors.aliyun.com/…

asp.net core Partial 分部视图、视图组件(core mvc 才支持)、视图、Razor组件 、razor pages

分部视图 》》》传参 》》两个东西换个名称,PartialView()>渲染视图>不带Layout 部分视图与普通视图没太大区别,它可以将重复使用的HTML内容结合起来,可以单独使用。 一般命名是在名称前面加下划线,放在/Views/Shared 目…

【cocos creator】输入框滑动条联动小组建

滑动条滑动输入框内容会改变 输入框输入,滑动条位置改变 const { ccclass, property } cc._decorator;ccclass() export default class SliderEnter extends cc.Component {property({ type: cc.Float, displayName: "最大值", tooltip: "" }…

基于Web的停车场管理系统(论文+源码)_kaic

摘要 我国经济的发展愈发迅速,车辆也随之增加的难以想象,因此车位的治理也越来越繁杂,为了方便停车位相关信息的管理,设计开发一个合理的停车位管理系统尤为重要。因而,具有信息方便读取和操作简便的停车位管理系统的设…

Java基础-知识点

文章目录 数据类型包装类型缓存池 String概述不可变的含义不可变的好处String、StringBuffer、StringBuilderString.intern() 运算参数传递float与double隐式类型转换switch 继承访问权限抽象类与接口super重写与重载**1. 重写(Override)****2. 重载(Overload)** Object类的通用…

H3C GRE over IPsec VPN 实验

H3C GRE over IPsec VPN 实验 实验拓扑 ​​ 实验需求 某企业北京总部、上海分支、武汉分支分别通过 R1,R3,R4 接入互联网,配置默认路由连通公网按照图示配置 IP 地址,R1,R3,R4 分别配置 Loopback0 口匹配感兴趣流,Loopback1 口模拟业务网段北京总部拥有固定公网地址…