基于CAMEL 的Workforce 实现多智能体协同工作系统

news2025/3/22 18:24:23

文章目录

    • 一、workforce 简介
      • 1.架构设计
      • 2.通信机制
    • 二、workforce 工作流程图示例
      • 1.用户角色
      • 2.工作流程
    • 三、workforce 中重要函数说明
      • 1.`__init__`函数
      • 2.`add_single_agent_worker` 函数
      • 3.`add_role_playing_worker` 函数
      • 4.`add_workforce` 函数
    • 四、基于workforce实现多智能体协调
      • (1)创建一个 Workforce 实例
      • (2)worker定义
      • (3)添加到Workforce 的工作节点
      • (4)创建一个任务
      • (5)启动 Workforce 的任务处理流程
      • 完整示例代码
    • 五、遇到问题

一、workforce 简介

Workforce是CAMEL框架中的一个多智能体协同工作系统。它以一种简洁的方式让多个智能体协作完成任务,类似于一个高效的团队合作系统。

1.架构设计

Workforce采用层级架构设计。一个workforce可以包含多个工作节点(worker nodes),每个工作节点可以包含一个或多个智能体作为工作者。工作节点由workforce内部的**协调智能体(coordinator agent)**管理,协调智能体根据工作节点的描述及其工具集来分配任务。

2.通信机制

Workforce内部的通信基于任务通道(task channel)。Workforce初始化时会创建一个所有节点共享的通道。任务会被发布到这个通道中,每个工作节点会监听通道,接受分配给它的任务并解决。当任务完成后,工作节点会将结果发布回通道,结果会作为其他任务的"依赖项"保留在通道中,供所有工作节点共享。

通过这种机制,workforce能够以协作和高效的方式解决任务。

二、workforce 工作流程图示例

如图,展示了如何通过以下智能体协作完成一个请求,即“创建一个产品的登录页面”

在这里插入图片描述

1.用户角色

  • Root Node (Manager):作为系统的管理者,负责接收任务并协调任务的分解和分配。

  • Coordinator Agent (协调智能体)Task Manager Agent (任务管理智能体):管理任务分解、依赖关系、分发任务,以及监控任务完成情况。

  • Leaf Nodes (Workers):执行任务的实际智能体,分别承担不同的角色(如“内容撰写者”和“代码撰写者”)。

2.工作流程

(a) 用户需求的接收

用户发出任务请求(例如“创建一个登录页面”,图中 1)。

Coordinator Agent 接收需求,作为入口点。

(b) 任务分解与定义

Coordinator Agent 通过任务分解策略,将请求拆分为多个子任务(如任务 A.1 和 A.2),并定义:

这些任务被送到 Task Manager Agent 进行分发(图中 2 和 3)。

© 任务的分配

Task Manager Agent 将任务分发到 Channel(图中 4),这是一个任务管理中枢。

任务按角色需求分配到 Leaf Nodes (Workers),包括:

(d) Leaf Nodes 执行任务

内容撰写者 (Content Writer)

代码撰写者 (Code Writer)

(e) 结果整合与返回

Coordinator Agent 汇总所有任务结果(如 A.1 和 A.2 的结果)。

将完整的任务结果返回给用户(图中 18)。

三、workforce 中重要函数说明

1.__init__函数

    def __init__(
        self,
        description: str,
        children: Optional[List[BaseNode]] = None,
        coordinator_agent_kwargs: Optional[Dict] = None,
        task_agent_kwargs: Optional[Dict] = None,
        new_worker_agent_kwargs: Optional[Dict] = None,
    ) -> None:
        super().__init__(description)
        self._child_listening_tasks: Deque[asyncio.Task] = deque()
        self._children = children or []
        self.new_worker_agent_kwargs = new_worker_agent_kwargs

        coord_agent_sys_msg = BaseMessage.make_assistant_message(
            role_name="Workforce Manager",
            content="You are coordinating a group of workers. A worker can be "
            "a group of agents or a single agent. Each worker is "
            "created to solve a specific kind of task. Your job "
            "includes assigning tasks to a existing worker, creating "
            "a new worker for a task, etc.",
        )
        self.coordinator_agent = ChatAgent(
            coord_agent_sys_msg, **(coordinator_agent_kwargs or {})
        )

        task_sys_msg = BaseMessage.make_assistant_message(
            role_name="Task Planner",
            content="You are going to compose and decompose tasks.",
        )
        self.task_agent = ChatAgent(task_sys_msg, **(task_agent_kwargs or {}))

        # If there is one, will set by the workforce class wrapping this
        self._task: Optional[Task] = None
        self._pending_tasks: Deque[Task] = deque()

其中coordinator_agent的本质为ChatAgent,role_name=“Workforce Manager”,对应的提示词为:

"You are coordinating a group of workers. A worker can be "
            "a group of agents or a single agent. Each worker is "
            "created to solve a specific kind of task. Your job "
            "includes assigning tasks to a existing worker, creating "
            "a new worker for a task, etc."

task_agentt的本质也是ChatAgent,role_name=“Task Planner”,对应的提示词为:

"You are going to compose and decompose tasks."

2.add_single_agent_worker 函数

    @check_if_running(False)
    def add_single_agent_worker(
        self, description: str, worker: ChatAgent
    ) -> Workforce:
        r"""Add a worker node to the workforce that uses a single agent.

        Args:
            description (str): Description of the worker node.
            worker (ChatAgent): The agent to be added.

        Returns:
            Workforce: The workforce node itself.
        """
        worker_node = SingleAgentWorker(description, worker)
        self._children.append(worker_node)
        return self

调用add_single_agent_worker时会添加单个工作节点worker_node.

3.add_role_playing_worker 函数

    def add_role_playing_worker(
        self,
        description: str,
        assistant_role_name: str,
        user_role_name: str,
        assistant_agent_kwargs: Optional[Dict] = None,
        user_agent_kwargs: Optional[Dict] = None,
        chat_turn_limit: int = 3,
    ) -> Workforce:
        r"""Add a worker node to the workforce that uses `RolePlaying` system.

        Args:
            description (str): Description of the node.
            assistant_role_name (str): The role name of the assistant agent.
            user_role_name (str): The role name of the user agent.
            assistant_agent_kwargs (Optional[Dict], optional): The keyword
                arguments to initialize the assistant agent in the role
                playing, like the model name, etc. Defaults to `None`.
            user_agent_kwargs (Optional[Dict], optional): The keyword arguments
                to initialize the user agent in the role playing, like the
                model name, etc. Defaults to `None`.
            chat_turn_limit (int, optional): The maximum number of chat turns
                in the role playing. Defaults to 3.

        Returns:
            Workforce: The workforce node itself.
        """
        worker_node = RolePlayingWorker(
            description,
            assistant_role_name,
            user_role_name,
            assistant_agent_kwargs,
            user_agent_kwargs,
            chat_turn_limit,
        )
        self._children.append(worker_node)
        return self

调用add_role_playing_worker时会添加RolePlayingWorker的工作节点.

4.add_workforce 函数

	@check_if_running(False)
    def add_workforce(self, workforce: Workforce) -> Workforce:
        r"""Add a workforce node to the workforce.

        Args:
            workforce (Workforce): The workforce node to be added.

        Returns:
            Workforce: The workforce node itself.
        """
        self._children.append(workforce)
        return self

调用add_workforce时会添加workforce类型的工作节点.

四、基于workforce实现多智能体协调

关键步骤为

(1)创建一个 Workforce 实例

workforce = Workforce(description="旅游攻略制作与评估工作组",new_worker_agent_kwargs={'model':model},coordinator_agent_kwargs={'model':model},task_agent_kwargs={'model':model})

(2)worker定义

planner_agent = ChatAgent(
            system_message="""你是一个专业的旅行规划师。你的职责是:
                1. 根据景点分布规划合理的游览顺序
                2. 为每天安排适量的景点和活动
                3. 考虑用餐、休息等时间
                4. 注意不同季节的特点
                请确保行程安排合理且具有可行性。""",
            model=model,
            output_language='中文'
        )

(3)添加到Workforce 的工作节点

workforce.add_single_agent_worker(
    "负责制定详细行程规划",
    worker=planner_agent
)

(4)创建一个任务

from camel.tasks import Task

# 创建一个用于测试的任务
task = Task(
    content="规划一个3天的巴黎旅行计划。",
    id="0",  # id可以是任何标记字符串
)

(5)启动 Workforce 的任务处理流程

task = workforce.process_task(task)

完整示例代码

from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.types import ModelPlatformType
from camel.messages import BaseMessage
from camel.societies.workforce import Workforce
from camel.toolkits import SearchToolkit
from camel.tasks import Task
from camel.toolkits import FunctionTool
from camel.configs import SiliconFlowConfig  # 关键

from dotenv import load_dotenv
import os

load_dotenv()

api_key = os.getenv('Siliconflow_API_KEY')

model = ModelFactory.create(
    model_platform=ModelPlatformType.SILICONFLOW,
    model_type="Qwen/Qwen2.5-72B-Instruct",
    model_config_dict=SiliconFlowConfig(temperature=0.2).as_dict(),
    api_key=api_key
)
        
# 创建一个 Workforce 实例
workforce = Workforce(description="旅游攻略制作与评估工作组",new_worker_agent_kwargs={'model':model},coordinator_agent_kwargs={'model':model},task_agent_kwargs={'model':model})

search_tool = FunctionTool(SearchToolkit().search_duckduckgo)

search_agent = ChatAgent(
            system_message="""你是一个专业的旅游信息搜索助手。你的职责是:
                1. 搜索目的地的主要景点信息
                2. 搜索当地特色美食信息
                3. 搜索交通和住宿相关信息
                请确保信息的准确性和实用性。""",
            model=model,
            tools=[search_tool],
            output_language='中文'
        )

planner_agent = ChatAgent(
            system_message="""你是一个专业的旅行规划师。你的职责是:
                1. 根据景点分布规划合理的游览顺序
                2. 为每天安排适量的景点和活动
                3. 考虑用餐、休息等时间
                4. 注意不同季节的特点
                请确保行程安排合理且具有可行性。""",
            model=model,
            output_language='中文'
        )

reviewer_agent = ChatAgent(
    system_message="""你是一个经验丰富的旅行爱好者。你的职责是:
        1. 从游客角度评估行程的合理性
        2. 指出可能的问题和改进建议
        3. 补充实用的旅行小贴士
        4. 评估行程的性价比
        请基于实际旅行经验给出中肯的建议。""",
    model=model,
    output_language='中文'
)

# 添加工作节点
workforce.add_single_agent_worker(
    "负责搜索目的地相关信息",
    worker=search_agent
).add_single_agent_worker(
    "负责制定详细行程规划",
    worker=planner_agent
).add_single_agent_worker(
    "负责从游客角度评估行程",
    worker=reviewer_agent
)

from camel.tasks import Task

# 创建一个用于测试的任务
task = Task(
    content="规划一个3天的巴黎旅行计划。",
    id="0",  # id可以是任何标记字符串
)

task = workforce.process_task(task)

print(task.result)

五、遇到问题

关键报错代码

If you have a specific format in mind or a particular context for this number, please let me know!
Traceback (most recent call last):
  File "/home/allyoung/camel_course/03-2-33-Workforce.py", line 84, in <module>
    task = workforce.process_task(task)
  File "/home/allyoung/camel/camel/societies/workforce/utils.py", line 69, in wrapper
    return func(self, *args, **kwargs)
  File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 153, in process_task
    asyncio.run(self.start())
  File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/asyncio/base_events.py", line 641, in run_until_complete
    return future.result()
  File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 469, in start
    await self._listen_to_channel()
  File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 437, in _listen_to_channel
    await self._post_ready_tasks()
  File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 402, in _post_ready_tasks
    assignee_id = self._find_assignee(task=ready_task)
  File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 288, in _find_assignee
    result_dict = json.loads(response.msg.content)
  File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
  File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2319720.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PostgreSQL_数据表结构设计并创建

目录 前置&#xff1a; 1 数据表设计思路 2 数据表格SQL 3 创建 3.1 创建数据库 db_stock 3.2 在 pgAdmin4 中创建表 前置&#xff1a; 本博文是一个系列。在本人“数据库专栏”-》“PostgreSQL_”开头的博文 1 数据表设计思路 1 日数据来自优矿&#xff0c;优矿的数据…

如何在MCU工程中启用HardFault硬错误中断

文章目录 一、HardFault出现场景二、启动HardFault三、C代码示例 一、HardFault出现场景 HardFault&#xff08;硬故障&#xff09; 错误中断是 ARM Cortex-M 系列微控制器中一个较为严重的错误中断&#xff0c;一旦触发&#xff0c;表明系统遇到了无法由其他异常处理机制解决…

MySQL -- 复合查询

数据库的查询是数据库使用中比较重要的环节&#xff0c;前面的基础查询比较简单&#xff0c;不做介绍&#xff0c;可自行查阅。本文主要介绍复合查询&#xff0c;并结合用例进行讲解。 本文的用例依据Soctt模式的经典测试表&#xff0c;可以自行下载&#xff0c;也可以自己创建…

卷积神经网络 - 卷积层(具体例子)

为了更一步学习卷积神经网络之卷积层&#xff0c;本文我们来通过几个个例子来加深理解。 一、灰度图像和彩色图像的关于特征映射的例子 下面我们通过2个例子来形象说明卷积层中“特征映射”的概念&#xff0c;一个针对灰度图像&#xff0c;一个针对彩色图像。 例子 1&#x…

测试Claude3.7 sonnet画蛋白质

测试Claude3.7 sonnet画蛋白虽然画的很粗糙&#xff0c;但是大致画了出来

java项目之基于ssm的游戏攻略网站(源码+文档)

项目简介 游戏攻略网站实现了以下功能&#xff1a; 管理员主要负责填充图书和其类别信息&#xff0c;并对已填充的数据进行维护&#xff0c;包括修改与删除&#xff0c;管理员也需要审核老师注册信息&#xff0c;发布公告信息&#xff0c;管理自助租房信息等。 &#x1f495;…

本地基于Ollama部署的DeepSeek详细接口文档说明

前文&#xff0c;我们已经在本地基于Ollama部署好了DeepSeek大模型&#xff0c;并且已经告知过如何查看本地的API。为了避免网络安全问题&#xff0c;我们希望已经在本地调优的模型&#xff0c;能够嵌入到在本地的其他应用程序中&#xff0c;发挥本地DeepSeek的作用。因此需要知…

python NameError报错之导库报错

在日常代码编写中&#xff0c;经常出现如 图1 一样的报错&#xff0c;在代码多时很难找到问题&#xff0c;但翻看代码后就会发现是因为未导库&#xff0c; 图1 报错 代码: time.sleep(0.1) print("time库") 解决方法: 第一步:在代码中添加导库代码 import time #…

Web3网络生态中数据保护合规性分析

Web3网络生态中数据保护合规性分析 在这个信息爆炸的时代&#xff0c;Web3网络生态以其独特的去中心化特性&#xff0c;逐渐成为数据交互和价值转移的新平台。Web3&#xff0c;也被称为去中心化互联网&#xff0c;其核心理念是将数据的控制权归还给用户&#xff0c;实现数据的…

C++ 语法之数组指针

一维数组&#xff1a; 如果我们定义了一个一维数组&#xff0c;那么这个数组名&#xff0c;就是指向第一个数组元素的地址&#xff0c;也即&#xff0c;是整个数组分配的内存空间的首地址。 比如 int a[3]; 定义了一个包含三个元素的数组。因为一个int占4个字节&#xff0c;那…

PLY格式文件如何转换成3DTiles格式——使用GISBox软件实现高效转换

一、概述 在三维GIS和数字孪生领域&#xff0c;3DTiles格式已成为主流的数据格式之一。它由Cesium团队提出&#xff0c;专为大规模3D数据可视化设计&#xff0c;能够高效地加载和展示海量模型数据。而PLY格式则是一种常见的三维模型文件格式&#xff0c;主要用于存储点云数据或…

Java定时任务的三重境界:从单机心跳到分布式协调

《Java定时任务的三重境界&#xff1a;从单机心跳到分布式协调》 本文将以生产级代码标准&#xff0c;揭秘Java定时任务从基础API到分布式调度的6种实现范式&#xff0c;深入剖析ScheduledThreadPoolExecutor与Quartz Scheduler的线程模型差异&#xff0c;并给出各方案的性能压…

【Linux网络】手动部署并测试内网穿透

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;博客仓库&#xff1a;https://gitee.com/JohnKingW/linux_test/tree/master/lesson &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &…

java项目之在线购物系统(源码+文档)

项目简介 在线购物系统实现了以下功能&#xff1a; 使用在线购物系统的用户分管理员和用户两个角色的权限子模块。 管理员所能使用的功能主要有&#xff1a;主页、个人中心、用户管理、商品分类管理、商品信息管理、系统管理、订单管理等。 用户可以实现主页、个人中心、我的…

OO_Unit1

第一次作业 UML类图 代码复杂度分析 其中Expr中的toString方法认知复杂度比较高&#xff0c;主要源于多层条件嵌套和分散的字符串处理逻辑&#xff0c;重构时可重点关注这两部分的解耦。 代码量分析 1.”通用形式“ 我觉得我的设计的最大特点就是“通用形式”&#xff0c;具…

重要重要!!fisher矩阵元素有什么含义和原理; Fisher 信息矩阵的形式; 得到fisher矩阵之后怎么使用

fisher矩阵元素有什么含义和原理 目录 fisher矩阵元素有什么含义和原理一、对角线元素( F i , i F_{i,i} Fi,i​)的含义与原理二、非对角线元素( F i , j F_{i,j} Fi,j​)的含义与原理Fisher 信息矩阵的形式矩阵的宽度有位置权重数量决定1. **模型参数结构决定矩阵维度**2.…

[已解决]jupyter notebook报错 500 : Internal Server Error及notebook闪退

jupyter notebook出现如上图的报错&#xff0c;可以在黑色窗口中检查是为什么报错。 我检查发现是nbconvert导致的问题&#xff0c;卸载重装nbconvert。 但是这时候出现&#xff0c;jupyter notebook闪退问题。jupyter的黑色窗口出现一秒钟就没了。 在Anaconda Prompt中检查ju…

2025年渗透测试面试题总结- 某亭-安全研究员(题目+回答)

网络安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 一、SQL注入过滤单引号绕过方法 二、MySQL报错注入常用函数 三、报错注入绕WAF 四、MySQL写文件函数…

Redis分布式锁如何实现——简单理解版

目录 前言 满足条件 加锁之后产生的问题 避免死锁的方法 Lua脚本实现避免释放其他锁 看门狗判断过期 扩展 Lua脚本 Redission 前言 在如今开发的某些项目中&#xff0c;多个进程必须以互斥的方式独占共享资源&#xff0c;这时用分布式锁是最直接有效的&#xff0c;分布式…

数字化转型驱动卫生用品安全革新

当315晚会上晃动的暗访镜头揭露卫生巾生产车间里漂浮的异物、纸尿裤原料仓中霉变的碎屑时&#xff0c;这一触目惊心的场景无情地撕开了“贴身安全”的遮羞布&#xff0c;暴露的不仅是部分企业的道德缺失&#xff0c;更凸显了当前检测与监管体系的漏洞&#xff0c;为整个行业敲响…