【AI Agent系列】【MetaGPT多智能体学习】5. 多智能体案例拆解 - 基于MetaGPT的智能体辩论(附完整代码)

news2024/12/23 8:10:15

本系列文章跟随《MetaGPT多智能体课程》(https://github.com/datawhalechina/hugging-multi-agent),深入理解并实践多智能体系统的开发。

本文为该课程的第四章(多智能体开发)的第三篇笔记。主要是对课程刚开始环境搭建成功后跑通的第一个MetaGPT多智能体案例 - 多智能体辩论,进行学习,并拆解其中的实现步骤和原理。

系列笔记

  • 【AI Agent系列】【MetaGPT多智能体学习】0. 环境准备 - 升级MetaGPT 0.7.2版本及遇到的坑
  • 【AI Agent系列】【MetaGPT多智能体学习】1. 再理解 AI Agent - 经典案例和热门框架综述
  • 【AI Agent系列】【MetaGPT多智能体学习】2. 重温单智能体开发 - 深入源码,理解单智能体运行框架
  • 【AI Agent系列】【MetaGPT多智能体学习】3. 开发一个简单的多智能体系统,兼看MetaGPT多智能体运行机制
  • 【AI Agent系列】【MetaGPT多智能体学习】4. 基于MetaGPT的Team组件开发你的第一个智能体团队

文章目录

  • 系列笔记
  • 0. 辩论Action的定义
  • 1. 辩论智能体的定义
    • 1.1 让两个智能体交替运行的重点
    • 1.2 ~~重写 _observe 函数~~
    • 1.3 重写 _act 函数
    • 1.4 最终Role的代码
  • 2. 实例化智能体
  • 3. 完整代码及运行结果
  • 4. 总结

  • 之前跑通环境的文章:【AI Agent系列】【MetaGPT多智能体学习】0. 环境准备 - 升级MetaGPT 0.7.2版本及遇到的坑

多智能体辩论需求。假设是两个智能体进行辩论。

0. 辩论Action的定义

辩论进行的动作都一样,所以Action可以共用一个。除了 Prompt 需要费点功夫想,其它的流程都和之前一样,平平无奇。

class SpeakAloud(Action):
    """Action: Speak out aloud in a debate (quarrel)"""

    PROMPT_TEMPLATE: str = """
    ## BACKGROUND
    Suppose you are {name}, you are in a debate with {opponent_name}.
    ## DEBATE HISTORY
    Previous rounds:
    {context}
    ## YOUR TURN
    Now it's your turn, you should closely respond to your opponent's latest argument, state your position, defend your arguments, and attack your opponent's arguments,
    craft a strong and emotional response in 80 words, in {name}'s rhetoric and viewpoints, your will argue:
    """
    name: str = "SpeakAloud"

    async def run(self, context: str, name: str, opponent_name: str):
        prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name)
        # logger.info(prompt)

        rsp = await self._aask(prompt)

        return rsp

1. 辩论智能体的定义

智能体的职能也一样,所以可以共用一个Role的构造。

1.1 让两个智能体交替运行的重点

它的Action就是上面的辩论Action,通过 self.set_actions([SpeakAloud]) 设置。

它的动作时机通过 self._watch([UserRequirement, SpeakAloud]) 来设置,可以看到它观察环境中出现 UserRequirementSpeakAloud 来源的消息时,会有所动作。

那么问题来了,现在辩论的智能体Role和Action都只有一个,Role1说完之后放到环境中的信息来源是 SpeakAloud,Role2说完后放到环境中的也是 SpeakAloud,如果还是之前的代码,那当环境中出现 SpeakAloud时,Role1 和 Role2 都会触发动作了。这是不对的。

我们需要的是Role1等待Role2说完再说,Role2等待Role1说完后再说,交替着表达。

怎么实现这一点呢?回想我们之前学的单智能体的运行周期,_observe —> _react(_think + _act) —> publish_message,能阻止智能体不动作的是 _observe 函数。

1.2 重写 _observe 函数

先来看下原_observe函数的实现:

async def _observe(self, ignore_memory=False) -> int:
    """Prepare new messages for processing from the message buffer and other sources."""
    # Read unprocessed messages from the msg buffer.
    news = []
    if self.recovered:
        news = [self.latest_observed_msg] if self.latest_observed_msg else []
    if not news:
        news = self.rc.msg_buffer.pop_all()
    # Store the read messages in your own memory to prevent duplicate processing.
    old_messages = [] if ignore_memory else self.rc.memory.get()
    self.rc.memory.add_batch(news)
    # Filter out messages of interest.
    self.rc.news = [
        n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
    ]
    self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None  # record the latest observed msg

    # Design Rules:
    # If you need to further categorize Message objects, you can do so using the Message.set_meta function.
    # msg_buffer is a receiving buffer, avoid adding message data and operations to msg_buffer.
    news_text = [f"{i.role}: {i.content[:20]}..." for i in self.rc.news]
    if news_text:
        logger.debug(f"{self._setting} observed: {news_text}")
    return len(self.rc.news)

它的结果 len(self.rc.news) 只要大于0,这个智能体就会执行 _react,开始行动。self.rc.news怎么来的?重点看源码中的这句:

# Filter out messages of interest.
self.rc.news = [
    n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages
]

self.rc.news 是从 news 中过滤出了该智能体所关心的消息 n.cause_by in self.rc.watch 和 指定发送给该智能体的消息 self.name in n.send_to。而 news 其实就是 msg_buffermsg_buffer 来自环境的 publish_message

再来看下环境 publish_message 的代码:

def publish_message(self, message: Message, peekable: bool = True) -> bool:
        """
        Distribute the message to the recipients.
        In accordance with the Message routing structure design in Chapter 2.2.1 of RFC 116, as already planned
        in RFC 113 for the entire system, the routing information in the Message is only responsible for
        specifying the message recipient, without concern for where the message recipient is located. How to
        route the message to the message recipient is a problem addressed by the transport framework designed
        in RFC 113.
        """
        logger.debug(f"publish_message: {message.dump()}")
        found = False
        # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
        for role, addrs in self.member_addrs.items():
            if is_send_to(message, addrs):
                role.put_message(message)
                found = True
        if not found:
            logger.warning(f"Message no recipients: {message.dump()}")
        self.history += f"\n{message}"  # For debug

        return True

看里面的 is_send_to 函数:

def is_send_to(message: "Message", addresses: set):
    """Return whether it's consumer"""
    if MESSAGE_ROUTE_TO_ALL in message.send_to:
        return True

    for i in addresses:
        if i in message.send_to:
            return True
    return False

如果指定了 message.send_to,则环境只会将该消息发送给指定的Role

光看上面的代码和我的文字可能不是很好理解,容易绕晕,我给你们画了个图:

在这里插入图片描述

如上图的流程,这对于我们的辩论智能体来说其实就够了。Role1执行完动作后,将结果消息指定发送给Role2(通过message的send_to参数),这样Role1的msg_buffer中就不会出现它自身的这个消息,下次run时msg_buffer就直接是空的,就不运行了,等待Role2的消息。同样的,Role2执行完后将消息指定发送给Role1。这样就实现了交互行动,辩论的过程。

教程中用的0.6.6版本,不知道源码是不是这样。但是它为了实现这个辩论过程,还重写了 _observe 函数:

async def _observe(self) -> int:
    await super()._observe()
    # accept messages sent (from opponent) to self, disregard own messages from the last round
    self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}] # 第二次筛选
    return len(self.rc.news)

可以参考下。但是我用的0.7.2版本,看源码和实践证明,不用重写_observe了。

1.3 重写 _act 函数

这里为什么要重写 _act 函数,最主要的原因是要填执行完结果消息中的 send_to 参数(源码中是没有填的)。还有就是组装辩论的上下文(对手说了什么 context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories))。

async def _act(self) -> Message:
    logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
    todo = self.rc.todo  # An instance of SpeakAloud

    memories = self.get_memories()
    context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
    print(context)

    rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)

    msg = Message(
        content=rsp,
        role=self.profile,
        cause_by=type(todo),
        sent_from=self.name,
        send_to=self.opponent_name,
    )
    self.rc.memory.add(msg)

    return msg

cause_by=type(todo), sent_from=self.name, send_to=self.opponent_name,
这三个参数分别是形容Message的内容属性,来自于哪个action以及角色,并要发送给哪个角色。通过这样的机制可以实现相较于watch更灵活的订阅机制。

给大家再看一下组装的上下文的示例,有个更直观的认识(Trump发言时,收到的上文是Biden及其之前发言的内容):

在这里插入图片描述

1.4 最终Role的代码

class Debator(Role):
    name: str = ""
    profile: str = ""
    opponent_name: str = ""

    def __init__(self, **data: Any):
        super().__init__(**data)
        self.set_actions([SpeakAloud])
        self._watch([UserRequirement, SpeakAloud])

	## 0.7.2 版本可以不用重写 _observe,具体原因分析见上文
    # async def _observe(self) -> int:
    #     await super()._observe()
    #     # accept messages sent (from opponent) to self, disregard own messages from the last round
    #     self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}]
    #     return len(self.rc.news)

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
        todo = self.rc.todo  # An instance of SpeakAloud

        memories = self.get_memories()
        context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
        print(context)

        rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)

        msg = Message(
            content=rsp,
            role=self.profile,
            cause_by=type(todo),
            sent_from=self.name,
            send_to=self.opponent_name,
        )
        self.rc.memory.add(msg)

        return msg

2. 实例化智能体

实例化两个智能体,Team组件的用法前面已经学习过,可参考我的上篇文章。

其中值得注意的是,team.run_project(idea, send_to="Biden") 通过 send_to 参数指定谁先发言。

async def debate(idea: str, investment: float = 3.0, n_round: int = 5):
    """Run a team of presidents and watch they quarrel. :)"""
    Biden = Debator(name="Biden", profile="Democrat", opponent_name="Trump")
    Trump = Debator(name="Trump", profile="Republican", opponent_name="Biden")
    team = Team()
    team.hire([Biden, Trump])
    team.invest(investment)
    team.run_project(idea, send_to="Biden")  # send debate topic to Biden and let him speak first
    await team.run(n_round=n_round)

3. 完整代码及运行结果

完整代码:


import asyncio
import platform
from typing import Any

import fire

from metagpt.actions import Action, UserRequirement
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.team import Team


class SpeakAloud(Action):
    """Action: Speak out aloud in a debate (quarrel)"""

    PROMPT_TEMPLATE: str = """
    ## BACKGROUND
    Suppose you are {name}, you are in a debate with {opponent_name}.
    ## DEBATE HISTORY
    Previous rounds:
    {context}
    ## YOUR TURN
    Now it's your turn, you should closely respond to your opponent's latest argument, state your position, defend your arguments, and attack your opponent's arguments,
    craft a strong and emotional response in 80 words, in {name}'s rhetoric and viewpoints, your will argue:
    """
    name: str = "SpeakAloud"

    async def run(self, context: str, name: str, opponent_name: str):
        prompt = self.PROMPT_TEMPLATE.format(context=context, name=name, opponent_name=opponent_name)
        # logger.info(prompt)

        rsp = await self._aask(prompt)

        return rsp


class Debator(Role):
    name: str = ""
    profile: str = ""
    opponent_name: str = ""

    def __init__(self, **data: Any):
        super().__init__(**data)
        self.set_actions([SpeakAloud])
        self._watch([UserRequirement, SpeakAloud])

    # async def _observe(self) -> int:
    #     await super()._observe()
    #     # accept messages sent (from opponent) to self, disregard own messages from the last round
    #     self.rc.news = [msg for msg in self.rc.news if msg.send_to == {self.name}]
    #     return len(self.rc.news)

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")
        todo = self.rc.todo  # An instance of SpeakAloud

        memories = self.get_memories()
        context = "\n".join(f"{msg.sent_from}: {msg.content}" for msg in memories)
        print(context)

        rsp = await todo.run(context=context, name=self.name, opponent_name=self.opponent_name)

        msg = Message(
            content=rsp,
            role=self.profile,
            cause_by=type(todo),
            sent_from=self.name,
            send_to=self.opponent_name,
        )
        self.rc.memory.add(msg)

        return msg


async def debate(idea: str, investment: float = 3.0, n_round: int = 5):
    """Run a team of presidents and watch they quarrel. :)"""
    Biden = Debator(name="Biden", profile="Democrat", opponent_name="Trump")
    Trump = Debator(name="Trump", profile="Republican", opponent_name="Biden")
    team = Team()
    team.hire([Biden, Trump])
    team.invest(investment)
    team.run_project(idea, send_to="Biden")  # send debate topic to Biden and let him speak first
    await team.run(n_round=n_round)


def main(idea: str, investment: float = 3.0, n_round: int = 10):
    """
    :param idea: Debate topic, such as "Topic: The U.S. should commit more in climate change fighting"
                 or "Trump: Climate change is a hoax"
    :param investment: contribute a certain dollar amount to watch the debate
    :param n_round: maximum rounds of the debate
    :return:
    """
    if platform.system() == "Windows":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(debate(idea, investment, n_round))


if __name__ == "__main__":
    fire.Fire(main("Topic: The U.S. should commit more in climate change fighting"))

运行结果(交替发言):

在这里插入图片描述

4. 总结

这节内容最主要的是让我们了解和实践了多智能体间更多的消息交互和订阅机制,主要是 send_to 参数的使用。

cause_by=type(todo), sent_from=self.name, send_to=self.opponent_name, 这三个参数分别是形容Message的内容属性,来自于哪个action以及角色,并要发送给哪个角色。通过这样的机制可以实现相较于watch更灵活的订阅机制。


站内文章一览

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1482284.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

YOLOv9有效提点|加入SE、CBAM、ECA、SimAM等几十种注意力机制(一)

专栏介绍:YOLOv9改进系列 | 包含深度学习最新创新,主力高效涨点!!! 一、本文介绍 本文将以SE注意力机制为例,演示如何在YOLOv9种添加注意力机制! 《Squeeze-and-Excitation Networks》 SENet提出…

【六袆 - React】Next.js:React 开发框架;Next.js开发框架的特点

Next.js:React 开发框架 Next.js的特点 1.直观的、基于页面的路由系统(并支持动态路由) Next.js 提供了基于文件系统的路由,意味着你可以通过创建页面文件来定义路由。 伪代码示例: // pages/index.js export defa…

css【详解】—— 圣杯布局 vs 双飞翼布局 (含手写清除浮动 clearfix)

两者功能效果相同&#xff0c;实现方式不同 效果预览 两侧宽度固定&#xff0c;中间宽度自适应&#xff08;三栏布局&#xff09;中间部分优先渲染允许三列中的任意一列成为最高列 圣杯布局 通过左右栏填充容器的左右 padding 实现&#xff0c;更多细节详见注释。 <!DOCTYP…

day03-Vue-Element

一、Ajax 1 Ajax 介绍 1.1 Ajax 概述 概念&#xff1a;Asynchronous JavaScript And XML&#xff0c;异步 的 JavaScript 和 XML。 作用&#xff1a; 数据交换&#xff1a;通过 Ajax 可以给服务器发送请求&#xff0c;并获取服务器响应的数据。异步交互&#xff1a;可以在 不…

排序(2)——希尔排序

希尔排序&#xff08;缩小增量排序&#xff09; 基本思想 希尔排序法又称缩小增量法。希尔排序法的基本思想是&#xff1a;先选定一个整数&#xff0c;把待排序文件中所有记录分成个组&#xff0c;所有距离为的记录分在同一组内&#xff0c;并对每一组内的记录进行排序。然后&…

lotus 从矿工可用余额扣除扇区质押

修改 miner配置文件 # Whether to use available miner balance for sector collateral instead of sending it with each message## type: bool# env var: LOTUS_SEALING_COLLATERALFROMMINERBALANCE#CollateralFromMinerBalance falseCollateralFromMinerBalance true质押金…

手写数字识别(慕课MOOC人工智能之模式识别)

问题&#xff1a;手写数字识别 数据集 数据集链接请点击我 代码 %mat2vector.m function [data_] mat2vector(data,num)[row,col,~] size(data);data_zeros(num,row*col);for page 1:numfor rows 1:rowfor cols1:coldata_(page,((rows-1)*colcols)) im2double(data(rows,cols…

应用稳定性优化1:ANR问题全面解析

闪退、崩溃、无响应、重启等是应用稳定性常见的问题现象&#xff0c;稳定性故障大体可归类为ANR/冻屏、Crash/Tombstone、资源泄露三大类。本文通过对三类故障的产生原因、故障现象、触发机制及如何定位等&#xff0c;展开深度解读。 本文将详解ANR类故障&#xff0c;并通过一…

java数据结构与算法刷题-----LeetCode437. 路径总和 III(前缀和必须掌握)

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 1. 深度优先2. 前缀和 1. 深度优先 解题思路&#xff1a;时间复…

图书推荐||Word文稿之美

让你的文档从平凡到出众&#xff01; 本书内容 《Word文稿之美》是一本全面介绍Word排版技巧和应用的实用指南。从初步认识数字排版到高效利用模板、图文配置和表格与图表的排版技巧&#xff0c;再到快速修正错误和保护文件&#xff0c;全面系统地讲解数字排版的技术和能力&…

多行业万能预约门店小程序源码系统 支持多门店预约小程序 带完整的安装代码包以及搭建教程

随着消费者对于服务体验要求的不断提升&#xff0c;门店预约系统成为了许多行业提升服务质量、提高运营效率的重要工具。然而&#xff0c;市面上的预约系统往往功能单一&#xff0c;无法满足多行业、多场景的个性化需求。下面&#xff0c;小编集合了多年的行业经验和技术积累&a…

Linux 安装k8s

官网 常见的三种安装k8s方式 1.kubeadm 2.kops&#xff1a;自动化集群制备工具 3.kubespray&#xff1a; 提供了 Ansible Playbook 下面以kubeadm安装k8s kubeadm的安装是通过使用动态链接的二进制文件完成的&#xff0c;目标系统需要提供 glibc ##使用 ss 或者 netstat 检测端…

基于Springboot的高校实习信息发布网站的设计与实现(有报告)。Javaee项目,springboot项目。

演示视频&#xff1a; 基于Springboot的高校实习信息发布网站的设计与实现&#xff08;有报告&#xff09;。Javaee项目&#xff0c;springboot项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xf…

jmeter如何请求访问https接口

添加线程组http请求 新建线程组&#xff0c;添加http请求 填入协议&#xff0c;ip&#xff0c;端口&#xff0c;请求类型&#xff0c;路径&#xff0c;以及请求参数&#xff0c;查看结果树等。 然后最关键的一步来了。 导入证书 步骤&#xff1a;获取证书&#xff0c;重新生…

Windows11家庭版安装Docker

文章目录 安装Docker安装hyper-v继续解决报错完成效果图进一步测试是否完成安装 安装Docker windows如何安装docker 装好之后&#xff0c;我打开报错。 安装hyper-v 按这个视频操作&#xff1a;Windows 11 家庭版安装 Hyper-V bat文件里的代码是&#xff1a; pushd "…

机器学习 | 模型性能评估

目录 一. 回归模型的性能评估1. 平均平方误差(MSE)2. 平均绝对误差(MAE)3. R 2 R^{2} R2 值3.1 R 2 R^{2} R2优点 二. 分类模型的性能评估1. 准确率&#xff08;Accuracy&#xff09;2. 召回率&#xff08;Recall&#xff09;3. 精确率&#xff08;Precision&#xff09;4. …

指针篇章-(1)

指针&#xff08;1&#xff09;学习流程 —————————————————————————————————————————————————————————————————————————————————————————————————————————————…

帝国cms7.5仿非小号区块链门户资讯网站源码 带手机版

帝国cms7.5仿非小号区块链门户资讯网站源码 带手机版 带自动采集 开发环境&#xff1a;帝国cms 7.5 安装环境&#xff1a;phpmysql 包含火车头采集规则和模块&#xff0c;采集目标站非小号官网。 专业的数字货币大数据平台模板&#xff0c;采用帝国cms7.5内核仿制&#xff0…

爱普生的SG2016系列高频,低相位抖动spxo样品

精工爱普生公司(TSE: 6724&#xff0c;“爱普生”)已经开始发货样品的新系列简单封装晶体振荡器(SPXO)与差分输出1。该系列包括SG2016EGN、SG2016EHN、SG2016VGN和SG2016VHN。它们在基本模式下都具有低相位抖动&#xff0c;并且采用尺寸为2.0 x 1.6 mm的小封装&#xff0c;高度…

【go从入门到精通】什么是go?为什么要选择go?

go的出生&#xff1a; go语言&#xff08;或Golang&#xff09;是Google开发的开源编程语言&#xff0c;诞生于2006年1月2日下午15点4分5秒&#xff0c;于2009年11月开源&#xff0c;2012年发布go稳定版。Go语言在多核并发上拥有原生的设计优势&#xff0c;Go语言从底层原生支持…