数据分析ReAct工作流

news2024/11/12 15:04:38

让我用一个数据分析项目的例子来展示plan-and-execute框架的应用。这个例子会涉及数据处理、分析和可视化等任务。

from typing import List, Dict, Any
from dataclasses import dataclass
import json
from enum import Enum
import logging
from datetime import datetime

# 任务状态枚举
class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

# 任务优先级枚举
class TaskPriority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3

# 任务定义
@dataclass
class Task:
    id: str
    name: str
    description: str
    priority: TaskPriority
    dependencies: List[str]  # 依赖的任务ID列表
    status: TaskStatus
    result: Any = None
    error: str = None
    
# 工作流执行器
class WorkflowExecutor:
    def __init__(self):
        self.tasks = {}
        self.logger = logging.getLogger(__name__)
        
    def add_task(self, task: Task):
        self.tasks[task.id] = task
        
    def get_ready_tasks(self) -> List[Task]:
        """获取所有依赖已满足的待执行任务"""
        ready_tasks = []
        for task in self.tasks.values():
            if task.status == TaskStatus.PENDING:
                dependencies_met = all(
                    self.tasks[dep_id].status == TaskStatus.COMPLETED
                    for dep_id in task.dependencies
                )
                if dependencies_met:
                    ready_tasks.append(task)
        return sorted(ready_tasks, key=lambda x: x.priority.value, reverse=True)
    
    def execute_task(self, task: Task):
        """执行单个任务"""
        task.status = TaskStatus.RUNNING
        try:
            # 这里实现具体任务的执行逻辑
            result = self.task_handlers[task.id](
                task, 
                {dep: self.tasks[dep].result for dep in task.dependencies}
            )
            task.result = result
            task.status = TaskStatus.COMPLETED
        except Exception as e:
            task.status = TaskStatus.FAILED
            task.error = str(e)
            self.logger.error(f"Task {task.id} failed: {e}")
            
    def execute_workflow(self):
        """执行整个工作流"""
        while True:
            ready_tasks = self.get_ready_tasks()
            if not ready_tasks:
                break
            
            for task in ready_tasks:
                self.execute_task(task)
                
        # 检查是否所有任务都完成
        all_completed = all(
            task.status == TaskStatus.COMPLETED 
            for task in self.tasks.values()
        )
        return all_completed

# 数据分析工作流示例
class DataAnalysisWorkflow:
    def __init__(self, data_path: str, output_path: str):
        self.data_path = data_path
        self.output_path = output_path
        self.executor = WorkflowExecutor()
        
    def plan_workflow(self):
        """规划工作流程"""
        tasks = [
            Task(
                id="load_data",
                name="加载数据",
                description="从CSV文件加载数据",
                priority=TaskPriority.HIGH,
                dependencies=[],
                status=TaskStatus.PENDING
            ),
            Task(
                id="clean_data",
                name="数据清洗",
                description="处理缺失值和异常值",
                priority=TaskPriority.HIGH,
                dependencies=["load_data"],
                status=TaskStatus.PENDING
            ),
            Task(
                id="feature_engineering",
                name="特征工程",
                description="创建新特征",
                priority=TaskPriority.MEDIUM,
                dependencies=["clean_data"],
                status=TaskStatus.PENDING
            ),
            Task(
                id="statistical_analysis",
                name="统计分析",
                description="计算基本统计指标",
                priority=TaskPriority.MEDIUM,
                dependencies=["clean_data"],
                status=TaskStatus.PENDING
            ),
            Task(
                id="visualization",
                name="数据可视化",
                description="生成图表",
                priority=TaskPriority.MEDIUM,
                dependencies=["statistical_analysis"],
                status=TaskStatus.PENDING
            ),
            Task(
                id="generate_report",
                name="生成报告",
                description="生成分析报告",
                priority=TaskPriority.LOW,
                dependencies=["visualization", "feature_engineering"],
                status=TaskStatus.PENDING
            )
        ]
        
        for task in tasks:
            self.executor.add_task(task)
            
    def register_task_handlers(self):
        """注册任务处理函数"""
        self.executor.task_handlers = {
            "load_data": self.load_data,
            "clean_data": self.clean_data,
            "feature_engineering": self.feature_engineering,
            "statistical_analysis": self.statistical_analysis,
            "visualization": self.visualization,
            "generate_report": self.generate_report
        }
        
    def load_data(self, task: Task, dependencies: Dict):
        import pandas as pd
        df = pd.read_csv(self.data_path)
        return df
        
    def clean_data(self, task: Task, dependencies: Dict):
        df = dependencies["load_data"]
        # 处理缺失值
        df = df.fillna(df.mean())
        # 处理异常值
        # ... 其他清洗逻辑
        return df
        
    def feature_engineering(self, task: Task, dependencies: Dict):
        df = dependencies["clean_data"]
        # 创建新特征
        # ... 特征工程逻辑
        return df
        
    def statistical_analysis(self, task: Task, dependencies: Dict):
        df = dependencies["clean_data"]
        stats = {
            "basic_stats": df.describe(),
            "correlations": df.corr(),
            # ... 其他统计分析
        }
        return stats
        
    def visualization(self, task: Task, dependencies: Dict):
        import matplotlib.pyplot as plt
        stats = dependencies["statistical_analysis"]
        figures = []
        # 生成可视化
        # ... 可视化逻辑
        return figures
        
    def generate_report(self, task: Task, dependencies: Dict):
        figures = dependencies["visualization"]
        df_features = dependencies["feature_engineering"]
        
        report = {
            "timestamp": datetime.now().isoformat(),
            "statistics": str(dependencies["statistical_analysis"]),
            "features": df_features.columns.tolist(),
            "figures": [f.to_json() for f in figures]
        }
        
        # 保存报告
        with open(f"{self.output_path}/report.json", "w") as f:
            json.dump(report, f, indent=2)
            
        return report
        
    def run(self):
        """运行完整的工作流"""
        self.plan_workflow()
        self.register_task_handlers()
        success = self.executor.execute_workflow()
        
        if success:
            final_report = self.executor.tasks["generate_report"].result
            print("工作流执行成功!")
            return final_report
        else:
            failed_tasks = [
                task for task in self.executor.tasks.values()
                if task.status == TaskStatus.FAILED
            ]
            print("工作流执行失败。失败的任务:")
            for task in failed_tasks:
                print(f"- {task.name}: {task.error}")
            return None

# 使用示例
def main():
    workflow = DataAnalysisWorkflow(
        data_path="data/sales_data.csv",
        output_path="output"
    )
    
    result = workflow.run()
    
    if result:
        print("分析报告已生成:", result)
    else:
        print("工作流执行失败")

if __name__ == "__main__":
    main()

这个例子展示了:

  1. 工作流框架的核心组件:
  • Task定义
  • 工作流执行器
  • 依赖管理
  • 状态追踪
  • 错误处理
  1. 实现的关键特性:
  • 自动任务规划
  • 依赖关系处理
  • 并行任务执行
  • 结果传递
  • 错误恢复
  1. 可以扩展的方向:
# 1. 添加任务重试机制
class RetryableExecutor(WorkflowExecutor):
    def execute_task(self, task: Task, max_retries: int = 3):
        retries = 0
        while retries < max_retries:
            try:
                super().execute_task(task)
                if task.status == TaskStatus.COMPLETED:
                    break
            except Exception as e:
                retries += 1
                self.logger.warning(f"Retry {retries}/{max_retries} for task {task.id}")

# 2. 添加进度监控
class MonitoredWorkflow(DataAnalysisWorkflow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.progress_callback = None
        
    def set_progress_callback(self, callback):
        self.progress_callback = callback
        
    def update_progress(self, task: Task, status: str):
        if self.progress_callback:
            self.progress_callback(task, status)

# 3. 添加中间结果缓存
class CachedExecutor(WorkflowExecutor):
    def __init__(self, cache_dir: str):
        super().__init__()
        self.cache_dir = cache_dir
        
    def get_cached_result(self, task: Task):
        cache_path = f"{self.cache_dir}/{task.id}.cache"
        if os.path.exists(cache_path):
            return pickle.load(open(cache_path, "rb"))
        return None
        
    def cache_result(self, task: Task):
        cache_path = f"{self.cache_dir}/{task.id}.cache"
        pickle.dump(task.result, open(cache_path, "wb"))
  1. 使用建议:
# 1. 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# 2. 添加性能监控
from time import time

class PerformanceMonitor:
    def __init__(self):
        self.task_times = {}
        
    def start_task(self, task_id: str):
        self.task_times[task_id] = {"start": time()}
        
    def end_task(self, task_id: str):
        self.task_times[task_id]["end"] = time()
        
    def get_task_duration(self, task_id: str):
        times = self.task_times[task_id]
        return times["end"] - times["start"]

# 3. 实现优雅的终止
import signal

class GracefulWorkflow(DataAnalysisWorkflow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.should_stop = False
        signal.signal(signal.SIGINT, self.handle_interrupt)
        
    def handle_interrupt(self, signum, frame):
        print("\nReceived interrupt signal. Cleaning up...")
        self.should_stop = True

这个框架可以用于很多场景,比如:

  • 数据处理管道
  • ETL工作流
  • 机器学习实验
  • 报告生成系统
  • 自动化测试流程

关键是要根据具体需求调整任务定义和执行逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2236638.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

VS Code 插件 MySQL Shell for VS Code

https://marketplace.visualstudio.com/items?itemNameOracle.mysql-shell-for-vs-code

2024年云手机推荐榜单:高性能云手机推荐

无论是手游玩家、APP测试人员&#xff0c;还是数字营销工作者&#xff0c;云手机都为他们带来了极大的便利。本文将为大家推荐几款在市场上表现优异的云手机&#xff0c;希望这篇推荐指南可以帮助大家找到最适合自己的云手机&#xff01; 1. OgPhone云手机 OgPhone云手机是一款…

「QT」QT5程序设计专栏目录

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「QT」QT5程序设计&#x1f4da;全部专栏「VS」Visual Studio「C/C」C/C程序设计「UG/NX」BlockUI集合「Win」Windows程序设计「DSA」数据结构与算法「UG/NX」NX二次开发「QT」QT5程序设计「File」数据文件格式「PK」Parasolid…

VMWARE ESXI VMFS阵列故障 服务器数据恢复

1&#xff1a;河南用户一台DELL R740 3块2.4T硬盘组的RAID5&#xff0c;早期坏了一个盘没有及时更换&#xff0c;这次又坏了一个&#xff0c;导致整组RAID5处于数据丢失的状态&#xff0c; 2&#xff1a;该服务器装的是VMware ESXI 6.7&#xff0c;用户把3块硬盘寄过来进行数据…

怎么对 PDF 添加权限密码或者修改密码-免费软件分享

序言 目前市面上有关PDF处理的工具有很多&#xff0c;不过绝大多数的PDF处理工具都需要付费使用&#xff0c;且很多厂商甚至连试用的机会也不给用户&#xff0c;偶有试用的&#xff0c;其试用版的条件也极为苛刻&#xff0c;比如只能处理前两页&#xff0c;或者只能处理非常小的…

轻松上云:使用Python与阿里云OSS实现文件上传

轻松上云&#xff1a;使用Python与阿里云OSS实现文件上传 ​ 在数字化时代&#xff0c;数据的存储和管理变得越来越重要。阿里云对象存储服务&#xff08;OSS&#xff09;提供了一种高效、安全的方式来存储和访问各种类型的文件。本文将介绍如何利用Python编程语言结合阿里云O…

通过包控制->获取包重新获取之后,需求类型列表不对

龙勤思(2017年11月27日)&#xff1a; 这个类型列表&#xff0c;我在把需求包提交到svn&#xff0c;再新建一个eap&#xff0c;通过包控制->获取包重新获取之后&#xff0c;就变成默认的如下列表了。我从你的原始的eap导出参考数据&#xff0c;再导入到新建的eap&#xff0c…

python+pptx:(三)添加统计图、删除指定页

目录 统计图 删除PPT页 from pptx import Presentation from pptx.util import Cm, Inches, Mm, Pt from pptx.dml.color import RGBColor from pptx.chart.data import ChartData from pptx.enum.chart import XL_CHART_TYPE, XL_LABEL_POSITION, XL_DATA_LABEL_POSITIONfil…

基础概念理解

一&#xff0c;数据结构分类 连续结构&#xff0c;跳转结构。 二&#xff0c;对变量的理解 在 C 语言中&#xff0c;变量是用于存储数据的抽象符号。变量本质上是一块内存区域的标识符&#xff08;即它代表内存中的某一块区域&#xff09;&#xff0c;用来存储数据&#xff…

【微服务】不同微服务之间用户信息的获取和传递方案

如何才能在每个微服务中都拿到用户信息&#xff1f;如何在微服务之间传递用户信息&#xff1f; 文章目录 概述利用微服务网关做登录校验网关转微服务获取用户信息openFeign传递微服务之间的用户信息 概述 要在每个微服务中获取用户信息&#xff0c;可以采用以下几种方法&#…

5G NR:各物理信道的DMRS配置

DMRS简介 在5G中&#xff0c;DMRS&#xff08;DeModulation Reference Signal&#xff09;广泛存在于各个重要的物理信道当中&#xff0c;如下行的PBCH&#xff0c;PDCCH和PDSCH&#xff0c;以及上行的PUCCH和PUSCH。其最为重要的作用就是相干解调&#xff08;Coherence Demodu…

使用Docker快速部署FastAPI Web应用

Docker是基于 Linux 内核的cgroup、namespace以及 AUFS 类的Union FS 等技术&#xff0c;对进程进行封装隔离&#xff0c;一种操作系统层面的虚拟化技术。Docker中每个容器都基于镜像Image运行&#xff0c;镜像是容器的只读模板&#xff0c;容器是模板的一个实例。镜像是分层结…

「QT」几何数据类 之 QRectF 浮点型矩形类

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「QT」QT5程序设计&#x1f4da;全部专栏「VS」Visual Studio「C/C」C/C程序设计「UG/NX」BlockUI集合「Win」Windows程序设计「DSA」数据结构与算法「UG/NX」NX二次开发「QT」QT5程序设计「File」数据文件格式「PK」Parasolid…

2024双十一有什么是宝妈们值得入手的?双十一母婴必买清单

随着双十一购物狂欢节的临近&#xff0c;宝妈们纷纷开始筹备为家庭增添新的宝贝。作为一年一度的大型促销活动&#xff0c;双十一不仅提供了各种优惠&#xff0c;更是宝妈们囤货的好时机。2024双十一有什么是宝妈们值得入手的&#xff1f;在这个特殊的日子里&#xff0c;母婴产…

VMware Fusion和centos 8的安装

资源 本文用到的文件&#xff1a;centos8镜像 , VMware 软件包 , Termius 文件链接: https://pan.baidu.com/s/1kOES_ZJ8NGN-BnJl6NC7Sg?pwd63ct 安装虚拟机 先 安装 vmware &#xff0c;然后打开&#xff0c;将下载的 iso 镜像拖入 拖入镜像文件iso Continue, 然后随便选…

返回对象的唯一标识符通常是对象的内存地址id(对象或变量)

【小白从小学Python、C、Java】 【考研初试复试毕业设计】 【Python基础AI数据分析】 返回对象的唯一标识符 通常是对象的内存地址 id(对象或变量) [太阳]选择题 根据题目代码&#xff0c;执行的结果是&#xff1f; a [1, 2, 3] b a c a.copy() print("【显示】id(a) &…

SAP ABAP开发学习——WDA 四

目录 页面技术的发展 WebDynpro Layout控件 Layout的组件结构 布局方式 流式布局FlowLayout ​编辑 行布局RowLayout 矩阵布局MatrixLayout 网格布局GridLayout 数据绑定 在屏幕上显示数据 数据的双向传输 通过数据绑定控制UI显示 属性节点类型 属性的数据类型 …

速度快还看巡飞,筒射巡飞无人机技术详解

筒射巡飞无人机&#xff08;Launch and Recovery by Tube&#xff0c;LRAT或Launcher-Deployed Loitering Munition&#xff0c;LDLM&#xff09;作为一种新型无人机系统&#xff0c;近年来在军事和民用领域都展现出了巨大的潜力。以下是对筒射巡飞无人机技术的详细解析&#x…

想要监控办公电脑?那款电脑监控软件最好

在现代企业中&#xff0c;电脑监控已经成为了一项不可或缺的管理工具&#xff0c;尤其是对那些有多个部门和员工的公司。良好的电脑监控软件不仅能够帮助管理者了解员工的工作情况&#xff0c;还能提高工作效率、防止内部信息泄露以及保障公司数据安全。市场上有不少监控软件&a…

Elasticsearch(三):Elasticvue使用及DSL执行新增、查询操作

Elasticvue使用及DSL执行CURD 1 概述2 什么是Elasticsearch DSL3 基本结构4 客户端工具介绍4.1 索引介绍4.2 创建简单索引4.3 创建相对完整的索引4.4 插入数据4.4.1 基本插入操作4.4.2 批量插入操作 5 常用的DSL查询类型5.1 match查询5.1.1 match工作原理5.1.2 operator 参数5.…