[AI Agent学习] MetaGPT源码浅析

news2025/1/5 23:13:32

前言

工作上,需要使用AI Agent,所以需要深入学习一下AI Agent,光阅读各种文章,总觉无法深入细节,所以开看各类AI Agent相关的开源项目,此为第一篇,学习一下MetaGPT的源码。

基本目标

MetaGPT是一个多智能体框架,他抽象了一个软件公司中的主要角色,用不同的AI Agent去扮演,这些AI Agent包括产品经理、软件架构师、项目经理、工程师,这些AI Agent会按照开发团队设计好的SOP去交互并最终产出一个项目。

老习惯:不为读而读,为解决某些问题或理清某些概念而读,那么面对MetaGPT,我有以下目标:

  • MetaGPT是怎么抽象出的软件公司开发流程的?SOP具体在代码上是怎么实现的?

  • MetaGPT中不同AI Agent是怎么交互的?

  • 从效果上看,MetaGPT输出的内容是比较格式化的,要做到这样的效果,prompt是怎么写的?

  • MetaGPT是怎么抽象具体的职业的?比如产品经理是怎么抽象的。

本文主要来从源码中,找到上面问题的答案。

入口

虽然README.md中说,很多同学运行时会有点问题,但我感觉,运行起来还是很轻松的,相比于早期的sd-webui,那是友好太多了,所以这里不多提,直接按README中的内容run一下就好了。

直接看到入口方法

# startup.py 

async def startup(
    idea: str,
    investment: float = 3.0,
    n_round: int = 5,
    code_review: bool = False,
    run_tests: bool = False,
    implement: bool = True
):
    # 1.开公司
    company = SoftwareCompany()
    # 2.雇员工
    company.hire([
        ProductManager(),  # 产品经理
        Architect(), # 架构师
        ProjectManager(), # 项目经理
    ])

    if implement or code_review:
        # 3.雇开发
        company.hire([Engineer(n_borg=5, use_code_review=code_review)])

    if run_tests:
        
        company.hire([QaEngineer()])
 # 4.设置金额(这次运行最多能消耗多少美金的GPT4)
    company.invest(investment)
    # 5.老板的需求
    company.start_project(idea)
    # 6.跑几轮
    await company.run(n_round=n_round)

从startup看,整个流程很清晰,可读性很高!

先看看SoftwareCompany类,部分代码如下:

# metagpt/software_company.py

class SoftwareCompany(BaseModel):
 # 环境
    environment: Environment = Field(default_factory=Environment)
    investment: float = Field(default=10.0)
    idea: str = Field(default="")

    async def run(self, n_round=3):
        """Run company until target round or no money"""
        while n_round > 0:
            # self._save()
            n_round -= 1
            logger.debug(f"{n_round=}")
            self._check_balance()
            await self.environment.run()
        return self.environment.history

通过SoftwareCompany类抽象一个软件公司,阅读代码后,你会发现SoftwareCompany类中的environment对象很重要,公司里的不同AI Agent都会与environment对象交互,这个抽象也很巧妙,就是在公司里,同事间的交流确实在公司这个“环境”里传播。

然后就是run方法,看到startup方法的最后,就是调用SoftwareCompany类的run方法,该方法会检测一下余额以及运行的轮数,如果余额不够或轮数超了,就会停止运行。

因为MetaGPT底层是使用openai api的,每次请求都需要花钱,你设置一个预算,默认是3美元,跑满3美元,就算任务没有完成,也会强行结束。

SoftwareCompany类的run方法会调用environment对象的run方法,代码如下:

# metagpt/environment.py/Environment

 async def run(self, k=1):
        """
        Process all Role runs at once
        """

        for _ in range(k):
            futures = []
            for role in self.roles.values():
                future = role.run()
                futures.append(future)
            # 当调用 asyncio.gather(*futures) 时,它会同时运行传递进来的协程,这些协程会在后台并发执行,而不会阻塞主线程。
            await asyncio.gather(*futures)

从上面代码可知,这才是真正的入口,循环所有的roles,然后调用每个role的run方法,整个过程通过python协程异步并发的运行,从而提高程序的运行效率。

这里的role就是不同身份的员工,其本质就是使用不同Prompt的请求openai的代码逻辑。

我们需要阅读一下role相关的代码,来尝试理解顺序性的问题。

role

在startup方法中,我们首先雇佣了3个不同的role:

company.hire([
        ProductManager(),  # 产品经理
        Architect(), # 架构师
        ProjectManager(),
    ])

雇佣的顺序是有讲究的。一开始雇佣的是ProductManager实例,也就是,这里已经执行了实例ProductManager类的实例化代码了:

# metagpt/roles/product_manager.py

class ProductManager(Role):

    
    def __init__(self, 
                 name: str = "Alice", 
                 profile: str = "Product Manager", 
                 goal: str = "Efficiently create a successful product",
                 constraints: str = "") -> None:
   
        super().__init__(name, profile, goal, constraints)
        # 1.写产品需求文档(Product Requirement Document)
        self._init_actions([WritePRD])
        # 2.观察老板的需求
        self._watch([BossRequirement])

上面代码中,_init_actions与_watch都是Role类中的方法,也就是当前产品经理这个role自己的方法。它初始化了自己的动作:写需求文档,以及定了自己要观察的东西:老板的需求。

BossRequirement怎么来的?在startup方法中,我们调用了SoftwareCompany的start_project方法,该方法会就我们的需要以BOSS的身份发送到SoftwareCompany的environment中,代码如下:

# metagpt/software_company.py/SoftwareCompany

def start_project(self, idea):
        """Start a project from publishing boss requirement."""
        self.idea = idea
        # Role初始观察到的message
        self.environment.publish_message(Message(role="BOSS", content=idea, cause_by=BossRequirement))

这个过程类似于,公司里,老板走到产研区域,大喊一声,我要做个xxxx,然后大家就知道了,metagpt就是抽象了这个过程,其他的role可以通过SoftwareCompany的environment的读到老板和其他role的信息。

当environment对象的run方法被运行时,role的run方法就会被执行,

async def run(self, message=None):
        if message:
            # 1.如果有人给你发消息,则将消息存入role的短期记忆中
            if isinstance(message, str):
                message = Message(message)
            if isinstance(message, Message):
                self.recv(message)
            if isinstance(message, list):
                self.recv(Message("\n".join(message)))
        # 2.观察环境中的信息,看看有没有需要处理的
        elif not await self._observe():
            # If there is no new information, suspend and wait
            logger.debug(f"{self._setting}: no news. waiting.")
            return
     # 3.如果有需要处理的,则通过 _react 处理,并获得处理结果
        rsp = await self._react()
        # 4.将处理的结果发到 environment 中
        self._publish_message(rsp)
        return rsp

run方法的逻辑也很清晰,先观察environment中的信息,如果有需要处理的,则_react进行处理并将结果发送回environment中。

那具体怎么观察呢?看到_observe方法的代码:

# metagpt/roles/role.py/Role
 
 async def _observe(self) -> int:
        if not self._rc.env:
            return 0
        # 从env的短期记忆中,获取信息
        env_msgs = self._rc.env.memory.get()
        # 从env中观察要观察的对象,获得对应的message
        observed = self._rc.env.memory.get_by_actions(self._rc.watch)
        # 记下来(role自己的memory)
        self._rc.news = self._rc.memory.remember(observed)  # remember recent exact or similar memories

        for i in env_msgs:
            # 将环境中的信息记到role memory中
            self.recv(i)

        news_text = [f"{i.role}: {i.content[:20]}..." for i in self._rc.news]
        if news_text:
            logger.debug(f'{self._setting} observed: {news_text}')
        return len(self._rc.news)

    def recv(self, message: Message) -> None:
        if message in self._rc.memory.get():
            return
        # 记一下信息
        self._rc.memory.add(message)

上面代码中,self._rc.env 对象就是当前role所在的环境,这里用了常见的双向关联技巧。

当其他role通过publish_message方法向environment发消息时,消息其实存在environment的memory中,在_observe方法中,首先environment的memory中读取消息,然后在通过get_by_actions方法去获得需要某个action产生的message,以ProductManager为里,ProductManager通过_watch方法将BossRequirement设置为需要观察的对象。

当role调用get_by_actions时,会去找BossRequirement对应的message(其实就是需要message,然后交给GPT4去处理),get_by_actions代码如下:

# metagpt/memory.py/Memory

 def get_by_action(self, action: Type[Action]) -> list[Message]:
        """Return all messages triggered by a specified Action"""
        return self.index[action]

    def get_by_actions(self, actions: Iterable[Type[Action]]) -> list[Message]:
        """Return all messages triggered by specified Actions"""
        rsp = []
        for action in actions:
            if action not in self.index:
                continue
            rsp += self.index[action]
        return rsp

    def add(self, message: Message):
        """Add a new message to storage, while updating the index"""
        if message in self.storage:
            return
        self.storage.append(message)
        if message.cause_by:
            self.index[message.cause_by].append(message)

因为get_by_actions其实就是从self.index中找到action这个key对应的value,所以self.index是什么就很关键,而这就需要看到add方法,以BossRequirement为例,就是将key设置为BossRequirement,然后存message。

简单而言,get_by_actions方法会获取由role的_watch方法指定的要观察的对象其对应的message,作为self._rc.news,然后role将这些news存到memory中。

如果role通过_observe方法观察到了news,那么就需要执行_react方法:

# metagpt/roles/role.py

 async def _react(self) -> Message:
        """Think first, then act"""
        await self._think()
        logger.debug(f"{self._setting}: {self._rc.state=}, will do {self._rc.todo}")
        return await self._act()

先看到_think方法:

# metagpt/roles/role.py/Role

 async def _think(self) -> None:
        # 1.如果只有一个动作,那么就直接让_act执行这一个动作则可
        if len(self._actions) == 1:
            self._set_state(0)
            return
        prompt = self._get_prefix()
        # 2.整合role的memory和state到prompt中,让GPT4处理
        prompt += STATE_TEMPLATE.format(history=self._rc.history, states="\n".join(self._states),
                                        n_states=len(self._states) - 1)
        next_state = await self._llm.aask(prompt)
        logger.debug(f"{prompt=}")
        if not next_state.isdigit() or int(next_state) not in range(len(self._states)):
            logger.warning(f'Invalid answer of state, {next_state=}')
            next_state = "0"
        self._set_state(int(next_state))

_think方法的作用是设置state,然后_act才会根据设置好的state决定要执行的action,如果当前role只有一个state,那么state直接设置成0则可,如果有多个action,那么就交由GPT4去判断:将role的memory中的信息全部取出作为history,然后将role的所有state也整合起来,一起放到prompt中,交由GPT4去选择当前history下要做什么action是最好的,让其返回state。

然后看_act方法:

async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        # 让role执行相应的action
        response = await self._rc.todo.run(self._rc.important_memory)
        if isinstance(response, ActionOutput):
            msg = Message(content=response.content, instruct_content=response.instruct_content,
                        role=self.profile, cause_by=type(self._rc.todo))
        else:
            msg = Message(content=response, role=self.profile, cause_by=type(self._rc.todo))
        # 记下当前action返回的msg
        self._rc.memory.add(msg)
        # logger.debug(f"{response}")

        return msg

_act方法的逻辑就是执行self._rc.todo对应的action,这个action是_think方法通过_set_state方法设置的,比如产品经理只有一个action:WritePRD,那么就会执行WritePRD的run方法,需要注意的是,为了让不同的role可以交互,_act还会产生message,每个message都会设置cause_by参数,以表明当前的message是由哪个动作产生的,其他role就可以通过_watch方法看到需要的message了。

随后,我们看到WritePRD的细节,代码如下:

#  metagpt/actions/write_prd.py
class WritePRD(Action):
    def __init__(self, name="", context=None, llm=None):
        super().__init__(name, context, llm)

    async def run(self, requirements, *args, **kwargs) -> ActionOutput:
        sas = SearchAndSummarize()
        rsp = ""
        info = f"### Search Results\n{sas.result}\n\n### Search Summary\n{rsp}"
        if sas.result:
            logger.info(sas.result)
            logger.info(rsp)

        prompt = PROMPT_TEMPLATE.format(requirements=requirements, search_information=info,
                                        format_example=FORMAT_EXAMPLE)
        logger.debug(prompt)
        prd = await self._aask_v1(prompt, "prd", OUTPUT_MAPPING)
        return prd

WritePRD类的本质也是一个prompt,它会接受requirements参数,从metagpt整个流程看,requirements参数就是老板的需求message,也就是产品经理要基于老板需求message写出PRD(产品需求文档),获得结果后,自己再记到role到memory中。

role间的交互

一开始时,用户以BOSS role输入一个需求message,这个message被丢到environment上,代码如下,主要publish_message方法的cause_by使用了BossRequirement,为了方便解释,我将需求定为:开发一个Crypto的量化交易系统。

# metagpt/software_company.py/SoftwareCompany

 def start_project(self, idea):
        self.idea = idea
        # Role初始观察到的message
        self.environment.publish_message(Message(role="BOSS", content=idea, cause_by=BossRequirement))

ProductManager通过_watch方法关注BossRequirement,从而在运行ProductManager时,_observe方法会收到【开发一个Crypto的量化交易系统】的message作为self._rc.news,随后调用_think和_act方法,因为只有WritePRD这一个action,所以就调用WritePRD的run方法了,并将role memory中记忆的【开发一个Crypto的量化交易系统】的message作为requirements,传入了prompt中。

从startup方法相关代码可知,ProductManager后是Architect(架构师),Architect通过_watch方法关注WritePRD。

在ProductManager的_act中,将Message添加到environment中,cause_by为WritePRD,此时就可以被Architect通过_watch方法关注,并通过_observe方法获得WritePRD产出的结果,存入Architect的记忆中,然后作为WriteDesign这个action的输入,WriteDesign会将WritePRD的内容放到它的prompt中。

至此,role的交互流程就比较清晰了。

回答问题

Metagpt还有很多代码细节,但读到这,已经可以回答开头的问题了:

1.MetaGPT是怎么抽象出的软件公司开发流程的?SOP具体在代码上是怎么实现的?SOP就是不同role之间交互的顺序,比如产品经理需要BOSS的message作为他prompt的约束,而产品经理的产出是架构师的输入,不同角色间的输入输出,就是SOP。

2.MetaGPT中不同AI Agent是怎么交互的?role通过_watch确定要从哪个role的哪个action中获得这个action的输出,具体获取的过程在_observer方法中,获得其他role的message后,如果当前的role有多个action可执行,则通过_think去选一个action(使用env的memory,即当前环境中发生的内容),再通过_act去具体的执行action,执行action时,会从role的memory中获取需要的message。

3.从效果上看,MetaGPT输出的内容是比较格式化的,要做到这样的效果,prompt是怎么写的?MetaGPT的prompt的设计形式值得学习,它主要使用MarkDown格式来设计prompt,多数prompt中都会有context、example,从而让GPT4更好的发挥zero-shot能力,想知道具体的,建议直接拉代码下来看。

4.MetaGPT是怎么抽象具体的职业的?从职业这个角度讲,主要通过action和承接的message来抽象,比如产品经理,就抽象成,接收老板需求,产出产品需求文档,将需求文档给到架构师的对象,然后每个role生成的结果都会放到env中,其他人也可以看到(很多角色只有一个action,就不会用到env中的message)。

结尾

我自己跑了几遍MetaGPT,还是有明显的局限性的。如果我按example的形式,让他写python小游戏,过程是丝滑的,但我换成让MetaGPT帮我设计一个今日头条的广告推荐系统,他就给了我下面这样的东西,很明显,不太可用。

322697900ee16441f4327cea4f2926c0.png

此外,在阅读源码的过程,也发现MetaGPT团队自己提出的一些问题,比如大段的代码受限GPT4的tokens限制还做不到,切开生成,可能又需要提供额外的code作为context,效果也没那么理想。

当然MetaGPT的team我还是很respect的,代码设计清晰,然后让MetaGPT可以实现自举的这个目标也很酷。

以上。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1040710.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

手机充电器成精会怎样?sanag塞那K30,自带插头和数据线的充电宝

充电宝是大家几乎每天都要用的装备,除了手机之外,现在耳机、手表也经常需要充电,不过相比于电量超大的充电宝,作为日常通勤或者周末郊游的装备,大家应该更倾向于轻巧且支持快充的充电宝,电量方面&#xff0…

leetcode334. 递增的三元子序列(java)

递增的三元子序列 题目描述贪心 题目描述 难度 - 中等 leetcode334. 递增的三元子序列 给你一个整数数组 nums &#xff0c;判断这个数组中是否存在长度为 3 的递增子序列。 如果存在这样的三元组下标 (i, j, k) 且满足 i < j < k &#xff0c;使得 nums[i] < nums[j]…

JOSEF约瑟 抗干扰中间继电器UEG/F-4H UEG/F-8H/110VDC 启动功率大于5W

UEG/F系列 抗干扰中间继电器 系列型号&#xff1a; UEG/F-1H1D/110VDC抗干扰中间继电器&#xff1b;UEG/F-1H1D/220VDC抗干扰中间继电器&#xff1b;UEG/F-1H1D/24VDC抗干扰中间继电器&#xff1b; UEG/F-1H1D/48VDC抗干扰中间继电器&#xff1b;UEG/F-1H1D/125VDC抗干扰中间…

“全景江西·南昌专场”数字技术应用场景发布会 | 万广明市长莅临拓世集团展位,一览AIGC科技魅力

随着数字技术的迅猛发展&#xff0c;传统产业正在发生深刻的变革&#xff0c;新兴产业蓬勃兴起。但要想实现数字经济超常规发展&#xff0c;就要在数字产业化上培育新优势&#xff0c;大力实施数字经济核心产业提速行动&#xff0c;加快推进“一核三基地”建设。在这个数字经济…

下载项目路径下的文件

文件下载&#xff08;比如模板下载&#xff09;&#xff0c;方法之一是先在服务器上创建一个路径&#xff0c;再通过代码里面写死或配置去读取这个路径的下的这个文件进行下载。 这个方法的缺点就是需要提前创建好这个目录&#xff0c;并给文件路径给与读写权限&#xff0c;上线…

助力养殖行业数字化转型,基于深度学习模型开发构建猪脸识别系统

养殖行业的数字化进程在国内还是比较缓慢的&#xff0c;一些大厂在前面的一些探索时期做过一些相关的工作&#xff0c;但是受限于各种因素并没有能够广泛地铺展开来&#xff0c;数字化不应该被理解为非常高大上的遥不可及的东西&#xff0c;数字化也不应该成为中低产的一道鸿沟…

AnsibleFATE部署过程

前言 基本上按照官方文档就行了&#xff0c;先做before deploy&#xff0c;再做three side guide.md。 以下是可能出现的问题 这个AnsibleUndefinedVariable: ‘ansible_ssh_host‘ is undefined.是肯定会遇到的&#xff0c;参考我这篇 安全性限制 ansible提示 warning&…

java单例的几种实现方式

单例模式 1.饿汉式&#xff08;线程安全&#xff09;2.懒汉式&#xff08;线程不安全&#xff09;3.懒汉式(线程安全)4. 双重校验5. 静态内部类6. 反射对于单例的破坏7. 序列化对于单例的破坏8.枚举(推荐方式) 1.饿汉式&#xff08;线程安全&#xff09; 在类加载期间初始化静…

写SAE评测,获 Airpods 2大奖【集结令】!

Serverless 应用引擎 SAE 开启测评有奖&#xff01;名额有限&#xff0c;先到先得&#x1f3c6;&#xff01; Serverless应用引擎SAE是一款极简易用、自适应弹性的容器化应用平台。现面向所有用户发出诚挚邀请&#xff0c;参与一分钟部署在线游戏&#xff0c;写下宝贵评测反馈。…

【操作系统笔记七】进程和线程

进程的组成 进程要读取 ELF 文件&#xff0c;那么&#xff1a; ① 要知道文件系统的信息&#xff0c;fs_struct② 要知道打开的文件的信息&#xff0c;files_struct 一个进程除了需要读取 ELF 文件外&#xff0c;还可以读取其他的文件中的数据。 进程中肯定有一个 mm_struct…

华为云云耀云服务器L实例评测|如何保障华为云云耀云服务器L实例的安全和性能

引言 云耀云服务器L实例是华为云提供的高性能计算实例&#xff0c;为用户提供稳定可靠的云计算环境。为了保障实例的安全和性能&#xff0c;用户可以通过设置防火墙和安全组策略来限制网络访问和防止恶意攻击。华为云提供了灵活的管理工具&#xff0c;用户可以通过控制台、API…

【AD】【规则设置】【pcb】默认规则设置

默认规则设置 PCB画板规则的设置1. 间距规则2. 线宽规则3. 过孔规则设置方法盖油的效果&#xff08;左侧&#xff09;过孔的外径盖油 - 8mil 【负片层】过孔的外径盖油 - 8mil 【正片层&#xff08;信号走线层&#xff09;】 【tip】焊盘形状的选取 4.铺铜高级设置&#xff0c;…

快速排序代码及时间空间复杂度

快速排序&#xff08;Quick Sort&#xff09;是一种高效的排序算法&#xff0c;它的平均时间复杂度为 O(n log n)&#xff0c;是许多排序算法中性能最好的之一。下面是快速排序的代码示例和时间空间复杂度分析&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#x…

计算机网络相关知识点(二)

TCP如何保证传输过程的可靠性&#xff1f; 校验和&#xff1a;发送方在发送数据之前计算校验和&#xff0c;接收方收到数据之后同样需要计算&#xff0c;如果不一致&#xff0c;那么代表传输有问题。 确认应答序&#xff0c;序列号&#xff1a;TCP进行传输时数据都进行了编号…

安全测试 —— Jmeter 登录接口密码 - rsa加密

1、出于安全考虑&#xff0c;有的网站在登陆时为了防止用户在登录时账户密码泄漏&#xff0c;会使用各种加密&#xff0c;给登录的账户密码加密。 比如&#xff1a;明文保存&#xff0c;对称加密算法&#xff0c;MD5、SHA1等单向HASH算法&#xff0c;RSA算法&#xff0c;加密F…

高校为何购买数据库的重要性

随着信息时代的到来&#xff0c;数据库已经成为人们获取信息的重要来源之一。高校作为学术研究的重要机构&#xff0c;对于数据库的需求也越来越大。但是&#xff0c;为什么高校要购买数据库呢&#xff1f;本文将从以下几个方面阐述高校购买数据库的重要性。 一、数据的重要性 …

【Linux】线程同步与互斥

文章目录 &#x1f4d6; 前言1. 线程互斥1.1 临界资源&#xff1a;1.2 互斥性与原子性&#xff1a;1.2 - 1 概念回顾 1.3 线程安全&#xff1a;1.3 - 1 可重入与不可重入 1.4 线程加锁与解锁&#xff1a;1.4 - 1 竞争锁1.4 - 2 锁的原子性 1.5 加锁的原子性如何实现&#xff1a…

丢失d3dcompiler 47.dll的修复方案,哪个更值得推荐

d3dcompiler 47.dll 是 DirectX 中的一部分&#xff0c;它负责实现硬件加速的图形渲染。当我们运行一些需要 DirectX 支持的游戏或程序时&#xff0c;系统会调用 d3dcompiler 47.dll 文件。如果该文件丢失或损坏&#xff0c;我们将无法正常运行这些游戏或程序&#xff0c;从而给…

外汇天眼:交易的本质就是要解决这两个问题!

方向 在交易中&#xff0c;方向的判断至关重要。尽管判断对错在很大程度上是一个概率游戏&#xff0c;但我们可以细分方法来更好地解决这个问题。解决方向的方法可以总结为三大类&#xff1a; 1.通过容错和次数来解决方向 纯粹的逆市加码被认为是低级的做法&#xff0c;因为…

详细解析下gRPC examples-RBAC authenication-权限组管理-基于自定义Token

详细解析下gRPC examples-RBAC authenication-权限组管理-基于自定义Token 什么是RABC认证&#xff1f; RBAC (Role-Based Access Control) 授权策略是一种用于控制系统或应用程序中用户或实体对资源的访问权限的方法。在 RBAC 中&#xff0c;访问控制是基于角色的&#xff0…