文章目录
- 一、workforce 简介
- 1.架构设计
- 2.通信机制
- 二、workforce 工作流程图示例
- 1.用户角色
- 2.工作流程
- 三、workforce 中重要函数说明
- 1.`__init__`函数
- 2.`add_single_agent_worker` 函数
- 3.`add_role_playing_worker` 函数
- 4.`add_workforce` 函数
- 四、基于workforce实现多智能体协调
- (1)创建一个 Workforce 实例
- (2)worker定义
- (3)添加到Workforce 的工作节点
- (4)创建一个任务
- (5)启动 Workforce 的任务处理流程
- 完整示例代码
- 五、遇到问题
一、workforce 简介
Workforce是CAMEL框架中的一个多智能体协同工作系统。它以一种简洁的方式让多个智能体协作完成任务,类似于一个高效的团队合作系统。
1.架构设计
Workforce采用层级架构设计。一个workforce可以包含多个工作节点(worker nodes)
,每个工作节点可以包含一个或多个智能体作为工作者。工作节点由workforce内部的**协调智能体(coordinator agent)**管理,协调智能体根据工作节点的描述及其工具集来分配任务。
2.通信机制
Workforce内部的通信基于任务通道(task channel)。Workforce初始化时会创建一个所有节点共享的通道。任务会被发布到这个通道中,每个工作节点会监听通道,接受分配给它的任务并解决。当任务完成后,工作节点会将结果发布回通道,结果会作为其他任务的"依赖项"保留在通道中,供所有工作节点共享。
通过这种机制,workforce能够以协作和高效的方式解决任务。
二、workforce 工作流程图示例
如图,展示了如何通过以下智能体协作完成一个请求,即“创建一个产品的登录页面”
1.用户角色
-
Root Node (Manager):作为系统的管理者,负责接收任务并协调任务的分解和分配。
-
Coordinator Agent (协调智能体) 和 Task Manager Agent (任务管理智能体):管理任务分解、依赖关系、分发任务,以及监控任务完成情况。
-
Leaf Nodes (Workers):执行任务的实际智能体,分别承担不同的角色(如“内容撰写者”和“代码撰写者”)。
2.工作流程
(a) 用户需求的接收
用户发出任务请求(例如“创建一个登录页面”,图中 1)。
Coordinator Agent 接收需求,作为入口点。
(b) 任务分解与定义
Coordinator Agent 通过任务分解策略,将请求拆分为多个子任务(如任务 A.1 和 A.2),并定义:
这些任务被送到 Task Manager Agent 进行分发(图中 2 和 3)。
© 任务的分配
Task Manager Agent 将任务分发到 Channel(图中 4),这是一个任务管理中枢。
任务按角色需求分配到 Leaf Nodes (Workers),包括:
(d) Leaf Nodes 执行任务
内容撰写者 (Content Writer):
代码撰写者 (Code Writer):
(e) 结果整合与返回
Coordinator Agent 汇总所有任务结果(如 A.1 和 A.2 的结果)。
将完整的任务结果返回给用户(图中 18)。
三、workforce 中重要函数说明
1.__init__
函数
def __init__(
self,
description: str,
children: Optional[List[BaseNode]] = None,
coordinator_agent_kwargs: Optional[Dict] = None,
task_agent_kwargs: Optional[Dict] = None,
new_worker_agent_kwargs: Optional[Dict] = None,
) -> None:
super().__init__(description)
self._child_listening_tasks: Deque[asyncio.Task] = deque()
self._children = children or []
self.new_worker_agent_kwargs = new_worker_agent_kwargs
coord_agent_sys_msg = BaseMessage.make_assistant_message(
role_name="Workforce Manager",
content="You are coordinating a group of workers. A worker can be "
"a group of agents or a single agent. Each worker is "
"created to solve a specific kind of task. Your job "
"includes assigning tasks to a existing worker, creating "
"a new worker for a task, etc.",
)
self.coordinator_agent = ChatAgent(
coord_agent_sys_msg, **(coordinator_agent_kwargs or {})
)
task_sys_msg = BaseMessage.make_assistant_message(
role_name="Task Planner",
content="You are going to compose and decompose tasks.",
)
self.task_agent = ChatAgent(task_sys_msg, **(task_agent_kwargs or {}))
# If there is one, will set by the workforce class wrapping this
self._task: Optional[Task] = None
self._pending_tasks: Deque[Task] = deque()
其中coordinator_agent的本质为ChatAgent
,role_name=“Workforce Manager”,对应的提示词为:
"You are coordinating a group of workers. A worker can be "
"a group of agents or a single agent. Each worker is "
"created to solve a specific kind of task. Your job "
"includes assigning tasks to a existing worker, creating "
"a new worker for a task, etc."
task_agentt的本质也是ChatAgent
,role_name=“Task Planner”,对应的提示词为:
"You are going to compose and decompose tasks."
2.add_single_agent_worker
函数
@check_if_running(False)
def add_single_agent_worker(
self, description: str, worker: ChatAgent
) -> Workforce:
r"""Add a worker node to the workforce that uses a single agent.
Args:
description (str): Description of the worker node.
worker (ChatAgent): The agent to be added.
Returns:
Workforce: The workforce node itself.
"""
worker_node = SingleAgentWorker(description, worker)
self._children.append(worker_node)
return self
调用add_single_agent_worker
时会添加单个工作节点worker_node
.
3.add_role_playing_worker
函数
def add_role_playing_worker(
self,
description: str,
assistant_role_name: str,
user_role_name: str,
assistant_agent_kwargs: Optional[Dict] = None,
user_agent_kwargs: Optional[Dict] = None,
chat_turn_limit: int = 3,
) -> Workforce:
r"""Add a worker node to the workforce that uses `RolePlaying` system.
Args:
description (str): Description of the node.
assistant_role_name (str): The role name of the assistant agent.
user_role_name (str): The role name of the user agent.
assistant_agent_kwargs (Optional[Dict], optional): The keyword
arguments to initialize the assistant agent in the role
playing, like the model name, etc. Defaults to `None`.
user_agent_kwargs (Optional[Dict], optional): The keyword arguments
to initialize the user agent in the role playing, like the
model name, etc. Defaults to `None`.
chat_turn_limit (int, optional): The maximum number of chat turns
in the role playing. Defaults to 3.
Returns:
Workforce: The workforce node itself.
"""
worker_node = RolePlayingWorker(
description,
assistant_role_name,
user_role_name,
assistant_agent_kwargs,
user_agent_kwargs,
chat_turn_limit,
)
self._children.append(worker_node)
return self
调用add_role_playing_worker
时会添加RolePlayingWorker
的工作节点.
4.add_workforce
函数
@check_if_running(False)
def add_workforce(self, workforce: Workforce) -> Workforce:
r"""Add a workforce node to the workforce.
Args:
workforce (Workforce): The workforce node to be added.
Returns:
Workforce: The workforce node itself.
"""
self._children.append(workforce)
return self
调用add_workforce
时会添加workforce
类型的工作节点.
四、基于workforce实现多智能体协调
关键步骤为
(1)创建一个 Workforce 实例
workforce = Workforce(description="旅游攻略制作与评估工作组",new_worker_agent_kwargs={'model':model},coordinator_agent_kwargs={'model':model},task_agent_kwargs={'model':model})
(2)worker定义
planner_agent = ChatAgent(
system_message="""你是一个专业的旅行规划师。你的职责是:
1. 根据景点分布规划合理的游览顺序
2. 为每天安排适量的景点和活动
3. 考虑用餐、休息等时间
4. 注意不同季节的特点
请确保行程安排合理且具有可行性。""",
model=model,
output_language='中文'
)
(3)添加到Workforce 的工作节点
workforce.add_single_agent_worker(
"负责制定详细行程规划",
worker=planner_agent
)
(4)创建一个任务
from camel.tasks import Task
# 创建一个用于测试的任务
task = Task(
content="规划一个3天的巴黎旅行计划。",
id="0", # id可以是任何标记字符串
)
(5)启动 Workforce 的任务处理流程
task = workforce.process_task(task)
完整示例代码
from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.types import ModelPlatformType
from camel.messages import BaseMessage
from camel.societies.workforce import Workforce
from camel.toolkits import SearchToolkit
from camel.tasks import Task
from camel.toolkits import FunctionTool
from camel.configs import SiliconFlowConfig # 关键
from dotenv import load_dotenv
import os
load_dotenv()
api_key = os.getenv('Siliconflow_API_KEY')
model = ModelFactory.create(
model_platform=ModelPlatformType.SILICONFLOW,
model_type="Qwen/Qwen2.5-72B-Instruct",
model_config_dict=SiliconFlowConfig(temperature=0.2).as_dict(),
api_key=api_key
)
# 创建一个 Workforce 实例
workforce = Workforce(description="旅游攻略制作与评估工作组",new_worker_agent_kwargs={'model':model},coordinator_agent_kwargs={'model':model},task_agent_kwargs={'model':model})
search_tool = FunctionTool(SearchToolkit().search_duckduckgo)
search_agent = ChatAgent(
system_message="""你是一个专业的旅游信息搜索助手。你的职责是:
1. 搜索目的地的主要景点信息
2. 搜索当地特色美食信息
3. 搜索交通和住宿相关信息
请确保信息的准确性和实用性。""",
model=model,
tools=[search_tool],
output_language='中文'
)
planner_agent = ChatAgent(
system_message="""你是一个专业的旅行规划师。你的职责是:
1. 根据景点分布规划合理的游览顺序
2. 为每天安排适量的景点和活动
3. 考虑用餐、休息等时间
4. 注意不同季节的特点
请确保行程安排合理且具有可行性。""",
model=model,
output_language='中文'
)
reviewer_agent = ChatAgent(
system_message="""你是一个经验丰富的旅行爱好者。你的职责是:
1. 从游客角度评估行程的合理性
2. 指出可能的问题和改进建议
3. 补充实用的旅行小贴士
4. 评估行程的性价比
请基于实际旅行经验给出中肯的建议。""",
model=model,
output_language='中文'
)
# 添加工作节点
workforce.add_single_agent_worker(
"负责搜索目的地相关信息",
worker=search_agent
).add_single_agent_worker(
"负责制定详细行程规划",
worker=planner_agent
).add_single_agent_worker(
"负责从游客角度评估行程",
worker=reviewer_agent
)
from camel.tasks import Task
# 创建一个用于测试的任务
task = Task(
content="规划一个3天的巴黎旅行计划。",
id="0", # id可以是任何标记字符串
)
task = workforce.process_task(task)
print(task.result)
五、遇到问题
关键报错代码
If you have a specific format in mind or a particular context for this number, please let me know!
Traceback (most recent call last):
File "/home/allyoung/camel_course/03-2-33-Workforce.py", line 84, in <module>
task = workforce.process_task(task)
File "/home/allyoung/camel/camel/societies/workforce/utils.py", line 69, in wrapper
return func(self, *args, **kwargs)
File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 153, in process_task
asyncio.run(self.start())
File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/asyncio/base_events.py", line 641, in run_until_complete
return future.result()
File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 469, in start
await self._listen_to_channel()
File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 437, in _listen_to_channel
await self._post_ready_tasks()
File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 402, in _post_ready_tasks
assignee_id = self._find_assignee(task=ready_task)
File "/home/allyoung/camel/camel/societies/workforce/workforce.py", line 288, in _find_assignee
result_dict = json.loads(response.msg.content)
File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/home/allyoung/anaconda3/envs/python310/lib/python3.10/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)