2024.5组队学习——MetaGPT(0.8.1)智能体理论与实战(中):订阅智能体OSS实现

news2025/1/23 14:59:31

传送门:

  • 《2024.5组队学习——MetaGPT(0.8.1)智能体理论与实战(上):MetaGPT安装、单智能体开发》
  • 《2024.5组队学习——MetaGPT(0.8.1)智能体理论与实战(下):多智能体开发》

    文章目录

    • 订阅智能体OSS (Open Source Software)
      • 一、 SubscriptionRunner
        • 1.1 源码解析
        • 1.2 官方文档示例
        • 1.3 Trigger(异步生成器)
        • 1.4 设置aiohttp代理
      • 二、 OSSWatcher Role实现
        • 2.1 爬取代码
        • 2.2 GitHub Trending总结
      • 三、 OSSWatcher Role实现
      • 四、 Trigger实现
      • 五、 Callback设计
        • 5.1 Discord
        • 5.2 Wechat
      • 六、 运行示例
      • 七、 开发作业
  • 学习资料:项目地址——hugging-multi-agent、在线阅读、MetaGPT项目、MetaGPT中文文档
  • 优秀作业链接:《MetaGPT环境搭建和尝试》、Destory的《MetaGPT教程笔记》、乐正萌的《MetaGPT教程Task3 》、 GISer Liu的《基于MetaGPT构建单智能体》、《MetaGPT课程3作业》

订阅智能体OSS (Open Source Software)

  当我们持续关注某些感兴趣的信息时,Agent可以实时获取这些信息并进行处理,然后通过一些如邮件、微信、discord等通知渠道将处理后的信息发送给我们,我们将这类Agent称为订阅智能体。此时,Agent的Role可称为“资讯订阅员”的,其包含的Action则主要有两种:

  • 从外界信息源中搜集信息
  • 对搜集得到的信息进行总结

一、 SubscriptionRunner

1.1 源码解析

  我们还可以为开发一些额外功能,比如定时运行和发送通知。在MetaGPT中,metagpt.subscription模块提供了SubscriptionRunner类,它是一个简单的包装器,用于使用asyncio管理不同角色的订阅任务。其主要作用是定时触发运行一个Role,然后将运行结果通知给用户。

class SubscriptionRunner(BaseModel):
	model_config = ConfigDict(arbitrary_types_allowed=True)
	# tasks字典,用于存储每个角色对应的异步任务
	tasks: dict[Role, asyncio.Task] = Field(default_factory=dict)

    async def subscribe(
        self,
        role: Role,
        trigger: AsyncGenerator[Message, None],
        callback: Callable[
            [
                Message,
            ],
            Awaitable[None],
        ],
    ):
        loop = asyncio.get_running_loop()

        async def _start_role():
            async for msg in trigger:
                resp = await role.run(msg)
                await callback(resp)

        self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")

  subscribe 方法用于订阅一个角色,传入角色、触发器(trigger)和回调函数。它会创建一个异步任务——从触发器获取消息,并将角色处理后的响应传递给回调函数。

async def run(self, raise_exception: bool = True):
    """Runs all subscribed tasks and handles their completion or exception.

    Args:
        raise_exception: _description_. Defaults to True.

    Raises:
        task.exception: _description_
    """
    while True:
        for role, task in self.tasks.items():
            if task.done():
                if task.exception():
                    if raise_exception:
                        raise task.exception()
                    logger.opt(exception=task.exception()).error(f"Task {task.get_name()} run error")
                else:
                    logger.warning(
                        f"Task {task.get_name()} has completed. "
                        "If this is unexpected behavior, please check the trigger function."
                    )
                self.tasks.pop(role)
                break
        else:
            await asyncio.sleep(1)
  1. 使用一个无限的 while True 循环来持续检查所有任务的状态。
  2. 对于 self.tasks 字典中的每个 (role, task) 项:
    • 如果该任务已完成 (task.done() 为真):
      • 检查是否有异常 (task.exception())
        • 如果有异常,并且 raise_exception 参数为真,则抛出该异常
        • 如果有异常,并且 raise_exception 参数为假,则使用 logger 记录错误日志
      • 如果没有异常,则使用 logger 记录警告日志,提示任务已完成(可能是不期望的行为)
    • self.tasks 字典中移除该任务
    • 使用 break 语句退出循环,等待下一次循环
  3. 如果在当前循环中没有任何任务完成,则使用 await asyncio.sleep(1) 等待1秒,继续下一次循环。

  总的来说,run函数的作用是持续监视所有已订阅的任务,一旦有任务完成(无论是正常完成还是异常),就进行相应的处理,包括抛出异常、记录日志或移除已完成的任务。如果所有任务都还在运行中,它会等待一段时间后继续检查。这确保了所有已订阅的任务都可以持续运行,直到它们完成为止。

1.2 官方文档示例

import asyncio
from metagpt.subscription import SubscriptionRunner
from metagpt.roles import Searcher
from metagpt.schema import Message

async def trigger():
    while True:
        yield Message(content="the latest news about OpenAI")
        await asyncio.sleep(3600 * 24)

async def callback(msg: Message):
    print(msg.content)

async def main():
    pb = SubscriptionRunner()
    await pb.subscribe(Searcher(), trigger(), callback)
    await pb.run()

await main()

  从例子可以知道订阅智能体的实现主要有3个要素,分别是Role、Trigger、Callback,即智能体本身、触发器、数据回调。其中,trigger 是一个无限循环的异步函数:

  • 在循环中,它首先 yield 一个 Message 对象,其 content 属性设置为 “the latest news about OpenAI”。
  • 使用 await asyncio.sleep(3600 * 24) 暂停一天执行
  • 循环以上过程

  所以,trigger 函数的作用是每隔24小时产生一个包含 “the latest news about OpenAI” 内容的 Message 对象。在 main 函数中,trigger() 被传递给 subscribe 方法,作为 Searcher 角色的触发器。每当 trigger 生成一个新的 Message,Searcher 角色就会处理该消息,并将响应传递给 callback 函数打印出来。

  要注意的是,我们虽然不对订阅智能体的Role做限制,但是不是所有的Role都适合用来做订阅智能体,比如MetaGPT软件公司中的几个角色,例如产品经理、架构师、工程师等,因为当给这些Role一个需求输入时,它们的产出往往是类似的,并没有定时执行然后发送给我们能的必要。
  从应用的角度出发,订阅智能体的输出应该具有实时性,相同的一个需求描述输入,输出的内容一般是会随着时间的变化而不同,例如新闻资讯、技术前沿进展、热门的开源项目等。

1.3 Trigger(异步生成器)

  Trigger是个异步生成器(Asynchronous Generators)。在Python中,生成器(Generators)是一种特殊的迭代器,区别于list、dict等类型,它是在需要时才生成数据,而不是一次性把所有数据加载到内存中。这种按需生成的方式可以提高内存使用效率,特别是在处理大量数据或无限数据流时非常有用。

def generate_data(n):
    """生成包含n个整数的生成器"""
    for i in range(n):
        yield i

def process_data(data):
    """对每个数据进行平方运算"""
    for item in data:
        print(item**2)

# 生成一百万个整数的生成器
data_generator = generate_data(1000000)

# 处理生成器中的数据
process_data(data_generator)

  在上面的代码中,generate_data函数是一个生成器,它每次被调用时只生成一个整数,而不是一次性将所有数据生成到内存中,这样就大大减少了内存使用量。

  传统生成器是在同步模式下使用yield关键字来产生值,当生成器函数遇到yield语句时,它会暂停执行并将值产出,下次再次调用next()方法时,生成器会从上次暂停的地方继续执行。

  而异步生成器则是在异步的上下文中工作的。它使用asyncawait关键字,可以在yield语句处暂停执行并等待一个潜在的耗时异步操作完成后再继续。这使得它特别适合于处理异步数据流,比如从网络套接字接收数据、从异步API获取数据等。

import asyncio

async def counter(start, stop):
    n = start
    while n < stop:
        yield n
        n += 1
        await asyncio.sleep(0.5)  # 模拟异步操作

async def main():
    async for i in counter(3, 8):
        print(i)

asyncio.run(main())

# 输出:
# 3
# (暂停0.5秒)
# 4 
# (暂停0.5秒)  
# 5
# ...

  在这个异步生成器中,每次产出一个数值后,它会通过await asyncio.sleep(0.5)模拟一个耗时0.5秒的异步操作,然后再继续执行。这样的异步方式可以避免阻塞主线程,提高程序的响应能力。

1.4 设置aiohttp代理

  aiohttp是一个用于异步 HTTP 客户端/服务器的非常流行的Python库,它基于 asyncio 库构建,可以让你方便地编写单线程并发代码。

  作为一个异步HTTP客户端,aiohttp 允许你发送对服务器的请求而不阻塞事件循环。这意味着你的脚本可以高效地发送多个HTTP请求,无需为每个请求启动新线程。这在需要发起大量HTTP请求时特别有用,比如爬虫、API客户端等。如果你需要编写高度可扩展、高并发的网络应用程序,使用 aiohttp 将是一个不错的选择。

aiohttp 的一些主要特性包括:

  1. Client/Server模式: 可以作为客户端或服务器使用
  2. Web框架集成: 可以与现有的web框架(如Django、Flask等)集成
  3. 中间件: 支持中间件功能,方便插入自定义行为
  4. WebSocket支持: 支持WebSocket协议
  5. Gunicorn工作: 可以与Gunicorn集成作为ASGI服务器运行
  6. URLs构建: 方便构造查询参数或URL编码

  教程涉中需要访问到一些国外的网站,可能会遇到网络问题,因为 aiohttp 默认不走系统代理,所以需要做下代理配置。MetaGPT中已经提供了GLOBAL_PROXY参数用来表示全局代理配置,教程中遇到使用aiohttp进行请求的地方,都会将代理设置为GLOBAL_PROXY的值,所以可以通过在config/key.yaml配置文件中,添加自己代理服务器的配置,以解决网络问题:

GLOBAL_PROXY: http://127.0.0.1:8118  # 改成自己的代理服务器地址

我是在AutoDL上跑的项目,代理设置可参考帖子:《AutoDL 使用代理加速》,讲解了如何在一台服务器命令行中启用 clash 代理。

mkdir mihomo
cd mihomo

# 下载lash 二进制文件并解压
# 最原始的 clash 项目已经删库,这个是目前找到的规模比较大的继任 fork ,二进制文件也更名为 mihomo 
wget https://github.com/MetaCubeX/mihomo/releases/download/v1.18.1/mihomo-linux-amd64-v1.18.1.gz
gzip -d  mihomo-linux-amd64-v1.18.1.gz

# 下载Country.mmdb文件
wget https://github.com/Dreamacro/maxmind-geoip/releases/download/20240512/Country.mmdb
vim config.yaml  # 配置config文件

# 授予执行权限
chmod +x ./mihomo-linux-amd64-v1.18.1

  config文件需要订阅clash服务商获取,比如v2net。
  打开Clash Verge v1.3.8,下载clash-verge_1.3.8_amd64.AppImage,然后运行sudo apt install fuse安装FUSE。之后运行chmod +x clash-verge_1.3.8_amd64.AppImage修改此文件权限,最后运行./clash-verge_1.3.8_amd64.AppImage直接启动clash-verge软件。然后参考Set Up V2NET on Linux,就可以配置正确的config.yaml(文章中是apt安装clash-verge_x.x.x_amd64.deb文件,但是我都失败了,.AppImage文件无需安装,可以直接启动。。

最后直接执行

./mihomo-linux-amd64-v1.18.1 -d .   
#这里的 -d 选项很重要,用于设置工作目录为当前所在目录,否则找不到 config.yaml

看到类似如下输出就成功了
在这里插入图片描述
保留这个终端,以使得 mihomo 能持续运行并且监听服务端口。然后新开其他终端,并在新开终端中配置环境变量:

export https_proxy=http://127.0.0.1:7890/
export http_proxy=http://127.0.0.1:7890/

到这一步就能顺利访问到目标网址。测试:

# 如果返回结果中包含HTTP状态码(如200 OK),则表示通过代理访问成功。
curl -I https://www.google.com

也可以使用aiohttp库检测代理:

import aiohttp
import asyncio

async def fetch(url):
    proxy = "http://127.0.0.1:7890"
    async with aiohttp.ClientSession() as session:
        async with session.get(url, proxy=proxy) as response:
            return await response.text()

url = "https://api.github.com"
result = await.fetch(url)  				# notebook中运行
# result = asyncio.run(fetch(url)) 		# .py文件运行
print(result)

在这里插入图片描述

二、 OSSWatcher Role实现

总结起来, OSSWatcher Role 需要以下两个主要 Action:

  • 爬取热门开源项目信息,这里以GitHub Trending为目标网站,可选筛选条件包括编程语言 (language)、时间范围 (daily/weekly/monthly)、所用语言 (spoken language)。
  • 使用LLM分析热门开源项目,分析角度包括
    1. 编程语言趋势:观察Trending列表中使用的编程语言,了解当前哪些编程语言在开发者社区中更受欢迎
    2. 项目类型和用途:分析Trending列表中的项目,看看它们是属于哪些类别,以及它们的具体用途是什么
    3. 社区活跃度:查看项目的星标数量、贡献者数量
    4. 新兴技术和工具:注意新项目和涌现的技术,以便了解当前的技术趋势

2.1 爬取代码

不熟悉爬虫的话,可以参考对话,用ChatGPT帮我们写爬取分析Github Trending。以下是最终的代码:

import aiohttp
import asyncio
from bs4 import BeautifulSoup

async def fetch_html(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def parse_github_trending(html):
    soup = BeautifulSoup(html, 'html.parser')

    repositories = []

    for article in soup.select('article.Box-row'):
        repo_info = {}
        
        repo_info['name'] = article.select_one('h2 a').text.strip()
        repo_info['url'] = article.select_one('h2 a')['href'].strip()

        # Description
        description_element = article.select_one('p')
        repo_info['description'] = description_element.text.strip() if description_element else None

        # Language
        language_element = article.select_one('span[itemprop="programmingLanguage"]')
        repo_info['language'] = language_element.text.strip() if language_element else None

        # Stars and Forks
        stars_element = article.select('a.Link--muted')[0]
        forks_element = article.select('a.Link--muted')[1]
        repo_info['stars'] = stars_element.text.strip()
        repo_info['forks'] = forks_element.text.strip()

        # Today's Stars
        today_stars_element = article.select_one('span.d-inline-block.float-sm-right')
        repo_info['today_stars'] = today_stars_element.text.strip() if today_stars_element else None

        repositories.append(repo_info)

    return repositories

async def main():
    url = 'https://github.com/trending'
    html = await fetch_html(url)
    repositories = await parse_github_trending(html)

    for repo in repositories:
        print(f"Name: {repo['name']}")
        print(f"URL: https://github.com{repo['url']}")
        print(f"Description: {repo['description']}")
        print(f"Language: {repo['language']}")
        print(f"Stars: {repo['stars']}")
        print(f"Forks: {repo['forks']}")
        print(f"Today's Stars: {repo['today_stars']}")
        print()

稍微修改一下,把它作为一个Action类。

  1. 导入
from metagpt.actions.action import Action
from metagpt.config2 import Config
  1. 设计 action的异步run方法

    1. 创建 aiohttp.ClientSession 对象: 使用 async with 语句创建 aiohttp.ClientSession 对象,用于发送 HTTP 请求。
    2. 发送 GET 请求: 使用 client.get() 方法发送 GET 请求,并指定要访问的 URL。
    3. 设置代理: 通过 proxy=Config.default().proxy 参数设置全局代理服务器。
    4. 检查响应状态: 使用 response.raise_for_status() 方法检查响应状态,如果状态码不为 200,则会抛出异常。
    5. 读取响应内容: 使用 await response.text() 方法读取响应内容并将其存储在 html 变量中。
    6. 返回 HTML 内容: 将获取到的 HTML 内容返回给调用者。
async def run(self, url: str = "https://github.com/trending"):
    async with aiohttp.ClientSession() as client:
        async with client.get(url, proxy=CONFIG.global_proxy) as response:
            response.raise_for_status()
            html = await response.text()
  1. 将先前的parse_github_trending(html)方法搬运到action中用repositories做返回值

完整代码如下:

import aiohttp
from bs4 import BeautifulSoup
from metagpt.actions.action import Action
from metagpt.config2 import Config

class CrawlOSSTrending(Action):

    async def run(self, url: str = "https://github.com/trending"):
        async with aiohttp.ClientSession() as client:
            async with client.get(url, proxy=Config.default().proxy) as response:
                response.raise_for_status()
                html = await response.text()
 
        soup = BeautifulSoup(html, 'html.parser')
    
        repositories = []
    
        for article in soup.select('article.Box-row'):
            repo_info = {}
            
            repo_info['name'] = article.select_one('h2 a').text.strip().replace("\n", "").replace(" ", "")
            repo_info['url'] = "https://github.com" + article.select_one('h2 a')['href'].strip()
    
            # Description
            description_element = article.select_one('p')
            repo_info['description'] = description_element.text.strip() if description_element else None
    
            # Language
            language_element = article.select_one('span[itemprop="programmingLanguage"]')
            repo_info['language'] = language_element.text.strip() if language_element else None
    
            # Stars and Forks
            stars_element = article.select('a.Link--muted')[0]
            forks_element = article.select('a.Link--muted')[1]
            repo_info['stars'] = stars_element.text.strip()
            repo_info['forks'] = forks_element.text.strip()
    
            # Today's Stars
            today_stars_element = article.select_one('span.d-inline-block.float-sm-right')
            repo_info['today_stars'] = today_stars_element.text.strip() if today_stars_element else None
    
            repositories.append(repo_info)
    
        return repositories

2.2 GitHub Trending总结

  总结Action的实现比较简单,主要是写提示词。在提示词中我们可以要求LLM从几个角度进行分析,并按照一定的格式进行输出,例如

  1. 今天榜单的整体趋势,例如哪几个编程语言比较热门、最热门的项目是哪些、主要集中在哪些领域
  2. 榜单的仓库分类
  3. 推荐进一步关注哪些仓库,推荐原因是什么
from typing import Any
from metagpt.actions.action import Action

TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending Analyst, aiming to provide users with insightful and personalized recommendations based on the latest
GitHub Trends. Based on the context, fill in the following missing information, generate engaging and informative titles, 
ensuring users discover repositories aligned with their interests.

# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example


# [Title]

## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.

## The Trends Categories
1. Generative AI
    - [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github/xx/project2): ...
...

## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...


---
# Github Trending
{trending}
"""

class AnalysisOSSTrending(Action):

    async def run(
        self,
        trending: Any
    ):
        return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))

三、 OSSWatcher Role实现

  以上Action都实现了,把它们都写到metagpt/actions/oss_trending.py文件中,然后新建文件metagpt/roles/oss_watcher.py,就可以写Role的代码了:

from metagpt.actions.oss_trending import CrawlOSSTrending, AnalysisOSSTrending
from metagpt.roles import Role

class OssWatcher(Role):
    def __init__(
        self,
        name="Codey",
        profile="OssWatcher",
        goal="Generate an insightful GitHub Trending analysis report.",
        constraints="Only analyze based on the provided GitHub Trending data.",
    ):
        super().__init__(name, profile, goal, constraints)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
        self._set_react_mode(react_mode="by_order")

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        # By choosing the Action by order under the hood
        # todo will be first SimpleWriteCode() then SimpleRunCode()
        todo = self.rc.todo

        msg = self.get_memories(k=1)[0] # find the most k recent messages
        result = await todo.run(msg.content)

        msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)
        return msg

四、 Trigger实现

  最简单的触发方式即定时触发,Github Trending 大约是在10:00 AM UTC更新,代价可以选取一个比较适合自己的推送时间即可,比如每天早上9点。

import asyncio
import time

from datetime import datetime, timedelta
from metagpt.schema import Message
from pydantic import BaseModel, Field


class OssInfo(BaseModel):
    url: str
    timestamp: float = Field(default_factory=time.time)


async def oss_trigger(hour: int, minute: int, second: int = 0, url: str = "https://github.com/trending"):
    while True:
        now = datetime.now()
        next_time = datetime(now.year, now.month, now.day, hour, minute, second)
        if next_time < now:
            next_time = next_time + timedelta(1)
        wait = next_time - now
        print(wait.total_seconds())
        await asyncio.sleep(wait.total_seconds())
        yield Message(url, OssInfo(url=url))

  yield 语句被用于异步函数oss_trigger中,用于生成消息。每当调用这个异步函数时,它会在指定的时间间隔内生成一个消息,并在下一次调用时继续执行。此处我们预定义了OssInfo的结构,加入了时间戳的信息,并将其实例作为trigger生成的Message的instruct_content属性,作用是在早期的版本中,角色在接收Message会有一个去重操作,如果我们每次生成的Message只有url信息,那么第2次运行时,角色将不会接收新的Message,但是加入时间戳后,trigger生成的每个Message就不再相等,角色也会接收对应的Message。

  上述的简单例子,可以实现简单的按天定时触发的能力,不过如果需要更精细的控制,这个函数还需要继续优化。但我们可以借助一些第三方包实现这个功能,使用crontab实现定时触发是非常常见的一个做法,而且python也有一个异步的cron工具,即aiocron。使用aiocron我们可以直接使用cron的语法制定定时任务。上面我们使用了函数的方式来实现了定时Trigger异步生成器,接下来我们结合aiocron使用类的方式,来实现定时Trigger:

import time
from aiocron import crontab
from typing import Optional
from pytz import BaseTzInfo
from pydantic import BaseModel, Field
from metagpt.schema import Message

class GithubTrendingCronTrigger:
    def __init__(
        self,
        spec: str,
        tz: Optional[BaseTzInfo] = None,
        url: str = "https://github.com/trending",
    ) -> None:
        self.crontab = crontab(spec, tz=tz)
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):
        await self.crontab.next()
        return Message(content=self.url)

  基于aiocron我们可以少写很多代码,功能也更加强大,可以用cron语法非常灵活地配置定时规则。如果您想指定UTC 时间 10:00 AM 触发:

# 创建 GithubTrendingCronTrigger 实例,指定每天 UTC 时间 10:00 AM 触发
cron_trigger = GithubTrendingCronTrigger("0 10 * * *")

如果您想指定北京时间上午8:00来触发这个任务,您需要做两件事:

  1. 设置正确的 cron 表达式。
  2. 确保时区设置正确。
    北京时间是东八区(UTC+8),所以您应该在 tz 参数中设置相应的时区。而 cron 表达式遵循特定的格式,通常是:分钟、小时、日、月、星期几。

  对于每天上午8:00,cron 表达式应该是 “0 8 * * *”,这表示每天的第8小时的第0分钟触发。因此,您的 GithubTrendingCronTrigger 类的初始化代码应该类似于以下形式:

from pytz import timezone
beijing_tz = timezone('Asia/Shanghai')  获取北京时间的时区
cron_trigger = GithubTrendingCronTrigger("0 8 * * *", tz=beijing_tz)
  • 思考1:如果需要榜单更新再推送,可以如何实现?
  • 思考2:Crontab的定时方式可能不是很方便进行调试,有什么方便调试的方法吗?

五、 Callback设计

  Callback就是定义了如何处理智能体生成的信息,它本身没有过多难点,但是如果想将信息发送到我们日常使用的一些应用,可能会有一些成本。下面提供将智能体产生的数据发送到discord/微信的示例供大家参考。

5.1 Discord

  Discord是一款免费的通讯软件,让你可以与你的好友,社群以及开发者们进行语音,视频及文字聊天。目前,MetaGPT的海外社区就是在Discord上维护的,在国内,MetaGPT也有庞大的微信社区,所以本文档选取目前MetaGPT比较活跃的两个社区工具作为示例。

账号注册:在discord的开发者面板添加BOT,并将BOT添加到某个服务器中,详见参考教程。使用discord发送消息的示例如下:

import asyncio
import discord

async def send_discord_msg(channel_id: int, msg: str, token: str):
    intents = discord.Intents.default()
    intents.message_content = True
    client = discord.Client(intents=intents)
    async with client:
        await client.login(token)
        channel = await client.fetch_channel(channel_id)
        await channel.send(msg)

  discord单条消息有大小限制,过长的内容会导致发送不成功,我们可以按章节分多条msg发送,最终实现的discord_callback函数如下:

import asyncio
import discord

from metagpt.config import CONFIG

async def discord_callback(msg: Message):
    intents = discord.Intents.default()
    intents.message_content = True
    client = discord.Client(intents=intents, proxy=CONFIG.global_proxy)
    token = os.environ["DISCORD_TOKEN"]
    channel_id = int(os.environ["DISCORD_CHANNEL_ID"])
    async with client:
        await client.login(token)
        channel = await client.fetch_channel(channel_id)
        lines = []
        for i in msg.content.splitlines():
            if i.startswith(("# ", "## ", "### ")):
                if lines:
                    await channel.send("\n".join(lines))
                    lines = []
            lines.append(i)

        if lines:
            await channel.send("\n".join(lines))

其中,DISCORD_TOKEN参考官方文档discord readthedocs,"Creating a Bot Account"章节的第7步:
在这里插入图片描述
DISCORD_CHANNEL_ID即希望Bot发送消息的频道:
在这里插入图片描述

5.2 Wechat

wxpusher开发文档

  由于我们的内容是markdown格式的,直接发送微信消息阅读体验较差,所以我们需要寻找合适的微信消息发送方式。

  公众号可以发送富本文消息,比较符合我们的场景,但是为了个推送的功能,开发个公众号的成本也是比较大。目前已有许多的第三方公众号提供了消息推送的功能,例如server酱、wxpusher、Pushplus等,我们可以选择其中之一,例如wxpusher,它的代码是开源的,还有详细的文开发文档。

wxpusher的python客户端是同步的,根据API文档,可以快速简单地实现一个异步的客户端:

import os
from typing import Optional
import aiohttp


class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        self.token = token or os.environ["WXPUSHER_TOKEN"]

    async def send_message(
        self,
        content,
        summary: Optional[str] = None,
        content_type: int = 1,
        topic_ids: Optional[list[int]] = None,
        uids: Optional[list[int]] = None,
        verify: bool = False,
        url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                return await response.json()

然后实现callback:

async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)

  WXPUSHER_TOKEN获取见官方文档:
在这里插入图片描述

  WXPUSHER_UIDS可以从应用管理页的”用户管理->用户列表“获取用户的UID,如果要发送给多个用户,可以用逗号将不同用户UID隔开:

在这里插入图片描述

六、 运行示例

我们将上述代码都写在一个main.py文件里,完整代码文件见main.py或main.ipynb。

import asyncio
import os
from typing import Any, AsyncGenerator, Awaitable, Callable, Dict, Optional

import aiohttp
import discord
from aiocron import crontab
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from pytz import BaseTzInfo

from metagpt.actions.action import Action
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message

# fix SubscriptionRunner not fully defined
from metagpt.environment import Environment as _  # noqa: F401


# 订阅模块,可以from metagpt.subscription import SubscriptionRunner导入,这里贴上代码供参考
class SubscriptionRunner(BaseModel):
    """A simple wrapper to manage subscription tasks for different roles using asyncio.
    Example:
        >>> import asyncio
        >>> from metagpt.subscription import SubscriptionRunner
        >>> from metagpt.roles import Searcher
        >>> from metagpt.schema import Message
        >>> async def trigger():
        ...     while True:
        ...         yield Message("the latest news about OpenAI")
        ...         await asyncio.sleep(3600 * 24)
        >>> async def callback(msg: Message):
        ...     print(msg.content)
        >>> async def main():
        ...     pb = SubscriptionRunner()
        ...     await pb.subscribe(Searcher(), trigger(), callback)
        ...     await pb.run()
        >>> asyncio.run(main())
    """

    tasks: Dict[Role, asyncio.Task] = Field(default_factory=dict)

    class Config:
        arbitrary_types_allowed = True

    async def subscribe(
        self,
        role: Role,
        trigger: AsyncGenerator[Message, None],
        callback: Callable[
            [
                Message,
            ],
            Awaitable[None],
        ],
    ):
        """Subscribes a role to a trigger and sets up a callback to be called with the role's response.
        Args:
            role: The role to subscribe.
            trigger: An asynchronous generator that yields Messages to be processed by the role.
            callback: An asynchronous function to be called with the response from the role.
        """
        loop = asyncio.get_running_loop()

        async def _start_role():
            async for msg in trigger:
                resp = await role.run(msg)
                await callback(resp)

        self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")

    async def unsubscribe(self, role: Role):
        """Unsubscribes a role from its trigger and cancels the associated task.
        Args:
            role: The role to unsubscribe.
        """
        task = self.tasks.pop(role)
        task.cancel()

    async def run(self, raise_exception: bool = True):
        """Runs all subscribed tasks and handles their completion or exception.
        Args:
            raise_exception: _description_. Defaults to True.
        Raises:
            task.exception: _description_
        """
        while True:
            for role, task in self.tasks.items():
                if task.done():
                    if task.exception():
                        if raise_exception:
                            raise task.exception()
                        logger.opt(exception=task.exception()).error(
                            f"Task {task.get_name()} run error"
                        )
                    else:
                        logger.warning(
                            f"Task {task.get_name()} has completed. "
                            "If this is unexpected behavior, please check the trigger function."
                        )
                    self.tasks.pop(role)
                    break
            else:
                await asyncio.sleep(1)


# Actions 的实现
TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending Analyst, aiming to provide users with insightful and personalized recommendations based on the latest
GitHub Trends. Based on the context, fill in the following missing information, generate engaging and informative titles, 
ensuring users discover repositories aligned with their interests.

# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example

``
# [Title]

## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.

## The Trends Categories
1. Generative AI
    - [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github/xx/project2): ...
...

## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...
``

---
# Github Trending
{trending}
"""


class CrawlOSSTrending(Action):
    async def run(self, url: str = "https://github.com/trending"):
        async with aiohttp.ClientSession() as client:
            async with client.get(url, proxy=CONFIG.global_proxy) as response:
                response.raise_for_status()
                html = await response.text()

        soup = BeautifulSoup(html, "html.parser")

        repositories = []

        for article in soup.select("article.Box-row"):
            repo_info = {}

            repo_info["name"] = (
                article.select_one("h2 a")
                .text.strip()
                .replace("\n", "")
                .replace(" ", "")
            )
            repo_info["url"] = (
                "https://github.com" + article.select_one("h2 a")["href"].strip()
            )

            # Description
            description_element = article.select_one("p")
            repo_info["description"] = (
                description_element.text.strip() if description_element else None
            )

            # Language
            language_element = article.select_one(
                'span[itemprop="programmingLanguage"]'
            )
            repo_info["language"] = (
                language_element.text.strip() if language_element else None
            )

            # Stars and Forks
            stars_element = article.select("a.Link--muted")[0]
            forks_element = article.select("a.Link--muted")[1]
            repo_info["stars"] = stars_element.text.strip()
            repo_info["forks"] = forks_element.text.strip()

            # Today's Stars
            today_stars_element = article.select_one(
                "span.d-inline-block.float-sm-right"
            )
            repo_info["today_stars"] = (
                today_stars_element.text.strip() if today_stars_element else None
            )

            repositories.append(repo_info)

        return repositories


class AnalysisOSSTrending(Action):
    async def run(self, trending: Any):
        return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))


# Role实现
class OssWatcher(Role):
    def __init__(
        self,
        name="Codey",
        profile="OssWatcher",
        goal="Generate an insightful GitHub Trending analysis report.",
        constraints="Only analyze based on the provided GitHub Trending data.",
    ):
        super().__init__(name=name, profile=profile, goal=goal, constraints=constraints)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
        self._set_react_mode(react_mode="by_order")

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self.rc.todo}")
        # By choosing the Action by order under the hood
        # todo will be first SimpleWriteCode() then SimpleRunCode()
        todo = self.rc.todo

        msg = self.get_memories(k=1)[0]  # find the most k recent messages
        result = await todo.run(msg.content)

        msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self.rc.memory.add(msg)
        return msg


# Trigger
class GithubTrendingCronTrigger:
    def __init__(
        self,
        spec: str,
        tz: Optional[BaseTzInfo] = None,
        url: str = "https://github.com/trending",
    ) -> None:
        self.crontab = crontab(spec, tz=tz)
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):
        await self.crontab.next()
        return Message(content=self.url)


# callback
async def discord_callback(msg: Message):
    intents = discord.Intents.default()
    intents.message_content = True
    client = discord.Client(intents=intents, proxy=CONFIG.global_proxy)
    token = os.environ["DISCORD_TOKEN"]
    channel_id = int(os.environ["DISCORD_CHANNEL_ID"])
    async with client:
        await client.login(token)
        channel = await client.fetch_channel(channel_id)
        lines = []
        for i in msg.content.splitlines():
            if i.startswith(("# ", "## ", "### ")):
                if lines:
                    await channel.send("\n".join(lines))
                    lines = []
            lines.append(i)

        if lines:
            await channel.send("\n".join(lines))


class WxPusherClient:
    def __init__(
        self,
        token: Optional[str] = None,
        base_url: str = "http://wxpusher.zjiecode.com",
    ):
        self.base_url = base_url
        self.token = token or os.environ["WXPUSHER_TOKEN"]

    async def send_message(
        self,
        content,
        summary: Optional[str] = None,
        content_type: int = 1,
        topic_ids: Optional[list[int]] = None,
        uids: Optional[list[int]] = None,
        verify: bool = False,
        url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                return await response.json()


async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)


# 运行入口,
async def main(spec: str = "0 9 * * *", discord: bool = True, wxpusher: bool = True):
    callbacks = []
    if discord:
        callbacks.append(discord_callback)

    if wxpusher:
        callbacks.append(wxpusher_callback)

    if not callbacks:

        async def _print(msg: Message):
            print(msg.content)

        callbacks.append(_print)

    async def callback(msg):
        await asyncio.gather(*(call(msg) for call in callbacks))

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)
    await runner.run()


if __name__ == "__main__":
    import fire

    fire.Fire(main)

七、 开发作业

尝试着完成一个能订阅自己感兴趣的资讯的Agent:

  • 根据前面你所学习的爬虫基本知识(如果你对写爬虫代码感到不熟练,使用GPT帮助你),为你的Agent自定义两个获取资讯的Action类
    • Action 1:独立实现对Github Trending页面的爬取,并获取每一个项目的名称、URL链接、描述
    • Action 2:独立完成对Huggingface Papers页面的爬取,先获取到每一篇Paper的链接(提示:标题元素中的href标签),并通过链接访问标题的描述页面(例如:https://huggingface.co/papers/2312.03818),在页面中获取一篇Paper的 标题、摘要
  • 重写有关方法,使你的Agent能自动生成总结内容的目录,然后根据二级标题进行分块,每块内容做出对应的总结,形成一篇资讯文档;
  • 自定义Agent的SubscriptionRunner类,独立实现Trigger、Callback的功能,让你的Agent定时为通知渠道发送以上总结的资讯文档(尝试实现邮箱发送的功能,这是加分项)

思考作业:- 目前为止我们设计的所有思考模式都可以总结为是链式的思考(chain of thought),能否利用 MetaGPT 框架实现树结构的思考(tree of thought)图结构的思考(graph of thought)?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1704313.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2024年电工杯高校数学建模竞赛(A题) 建模解析| 园区微电网风光储协调优化配置

问题重述及方法概述 问题1&#xff1a;各园区独立运营储能配置方案及其经济性分析 经济性分析采用成本-效益分析方法&#xff0c;计算购电量、弃风弃光电量、总供电成本和单位电量平均供电成本等指标。 问题2&#xff1a;联合园区储能配置方案及其经济性分析 经济性分析采用成…

《Ai企业知识库》-rasa X安装使用

背景&#xff1a; Rasa X 是一个为构建、改进和管理对话式AI助手而设计的工具。它是Rasa开源机器学习框架的一个扩展&#xff0c;旨在实现“对话驱动开发”&#xff08;Conversation-Driven Development&#xff09;。Rasa X 主要特点包括&#xff1a; 交互式学习&#xff1a;…

百度智能小程序源码系统 关键词推广优化排名高 带完整的安装代码包以及搭建教程

系统概述 百度智能小程序源码系统是一套完整的解决方案&#xff0c;为用户提供了创建、发布和管理智能小程序的平台。它基于百度平台的先进技术&#xff0c;确保小程序在运行和展示方面具有出色的表现。 代码示例 系统特色功能 1.关键词推广优化&#xff1a;系统内置了强大的…

Day 40 Web容器-Tomcat

Tomcat 一&#xff1a;Tomcat简介 1.简介 ​ Tomcat是Apache软件基金会&#xff08;Apache Software Foundation&#xff09;的Jakarta 项目中的一个核心项目 ​ Tomcat服务器是一个免费的开放源代码的Web应用服务器&#xff0c;属于轻量级应用服务器 ​ Tomcat是WEB容器/WE…

JVM(内存区域划分、类加载机制、垃圾回收机制)

目录 一. 内存区域划分 1.本地方法栈(Native Method Stacks) 2.虚拟机栈(JVM Stacks) 3.程序计数器(Program Counter Register) 4.堆(Heap) 5.元数据区(Metaspace) 二.类加载机制 1.加载 2.验证 3.准备 4.解析 5.初始化 "双亲委派模型" 三. GC 垃圾回收…

[自动驾驶技术]-7 Tesla自动驾驶方案之算法(AI Day 2022)

特斯拉在2022年AI Day上更新了感知规控算法模型&#xff0c;核心引入了Occupancy技术。下图是特斯拉活动日展示的主题内容&#xff0c;本文主要解读Planning和Neural Network部分。 1 规划决策 Interaction search-交互搜索 特斯拉在自动驾驶规划中使用了一种高度复杂和优化的…

SpringCloud整合Seata1.5.2

Windows下部署Seata1.5.2可参照博文&#xff1a;Windows下部署Seata1.5.2&#xff0c;解决Seata无法启动问题-CSDN博客 1. 引入依赖 <!-- 分布式事务 --> <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-st…

echarts配置记录,一些已经废弃的写法

1、normal&#xff0c;4.0以后无需将样式写在normal中了 改前&#xff1a; 改后&#xff1a; DEPRECATED: normal hierarchy in labelLine has been removed since 4.0. All style properties are configured in labelLine directly now. 2、axisLabel中的文字样式无需使用te…

近五年营收和净利润大幅“败北”,尚品宅配今年押注扩张加盟

​ 《港湾商业观察》廖紫雯 两个月前经历过高管公开信的尚品宅配&#xff08;300616.SZ&#xff09;&#xff0c;无论是2023年年报&#xff0c;还是今年一季报&#xff0c;虽然公司净利润表现尚佳&#xff0c;但收入端的持续承压仍然备受关注。 今年一季报&#xff0c;尚品宅…

Mac免费软件推荐

1. iTerm2 - 功能强大的终端 iTerm2 是一个功能强大且灵活的终端仿真器&#xff08;可替代系统默认终端&#xff09;&#xff0c;适合需要在 macOS 上进行大量终端操作的用户。其丰富的功能和高可定制性使得 iTerm2 成为许多开发者和系统管理员的首选工具。无论是处理多个会话…

RabbitMQ 之 死信队列

目录 ​编辑一、死信的概念 二、死信的来源 三、死信实战 1、代码架构图 2、消息 TTL 过期 &#xff08;1&#xff09;消费者 &#xff08;2&#xff09;生产者 &#xff08;3&#xff09;结果展示​编辑 3、队列达到最大长度 &#xff08;1&#xff09;消费者 &…

百度发布代码辅助工具,超强

不会用AI的程序员&#xff0c;会跟不会用智能手机的人一样 百度这个代码助手助手感觉还是不错的 https://comate.baidu.com/?inviteCodeijmce7dj 目前看下来这个代码助手是比较强的&#xff0c;比阿里的那个灵码好用&#xff0c;他可以引用到当前的文件&#xff0c;并且能分…

Spring Cache基本使用

Spring 从 3.1 版本开始定义缓存抽象来统一不同的缓存技术&#xff1b;在应用层面与后端存储之间&#xff0c;提供了一层抽象&#xff0c;这层抽象目的在于封装各种可插拔的后端存储( ehcache, redis, guava)&#xff0c;最小化因为缓存给现有业务代码带来的侵入。 一、Spring…

DRKCT复现

Osint 羡慕群友每一天 MISC 签到 扫码关注公众号&#xff0c;回复一下行 &#xff08;眼神要好&#xff0c; 我做题时没看见有个二维码&#xff09; 神秘的文字 把代码js运行一下 (用js的原因是前面给的动物代表的字符类似jsfuck代码) &#x13142;![]; &#x13080;!…

Daisy Chain

菊花链是双向和半双工的&#xff0c;因此在 COMH 和 COML 接口上有一个发送器 (TX) 和一个接收器 (RX)。TX 和 RX 功能由硬件根据器件的基底/堆栈检测自动控制。当接收到 WAKE ping/音调时&#xff0c;通信方向由 CONTROL1[DIR_SEL] 和 COMM_CTRL[TOP_STACK] 配置进行设置。 对…

如何处理网安发出的网络安全监督检查限期整改通知

近期&#xff0c;很多客户都收到了网安发出的限期整改通知。大家都比较关心的问题是&#xff0c;如何应对处理这些限期整改通知。后续是否有其他的影响&#xff0c;需要如何做进一步的优化整改和调整。今天就这些问题给大家做一些分享。 一. 为什么会有网安的网络安全检查 主…

系统管理、磁盘分区

系统管理 业务层面&#xff1a;为了满足一定的需求所做的特定操作。 硬盘是什么&#xff0c;硬盘的作用&#xff1a; **硬盘&#xff1a;**计算机的存储设备&#xff0c;机械硬盘是由一个或者多个磁性的盘组成&#xff0c;可以在盘片上进行数据的读写。 连接方式&#xff1a…

谈谈BlueStore的BitmapAllocator

背景 BlueStore在ceph里面承担了数据在底层磁盘上的读写任务&#xff0c;那它的功能里自然就有一块是管理磁盘空间使用的。说白了&#xff0c;就是记录&管理磁盘里哪些空间已经使用了&#xff0c;哪些空间还没有被使用。 目前官方的ceph里使用BitmapAllocator来管理磁盘空…

冯喜运:5.27黄金短线看震荡,今日黄金原油走势分析

【黄金消息面分析】&#xff1a;黄金作为传统的避险资产&#xff0c;在经济不确定性中扮演着至关重要的角色。近期&#xff0c;国际黄金价格经历了显著的波动。从5月9日的低点2325.19美元/盎司反弹至2340美元/盎司以上&#xff0c;尽管金价曾一度触及2449.89美元/盎司的历史高点…

C++ 数据结构算法 学习笔记(33) -查找算法及企业级应用

C 数据结构算法 学习笔记(33) -查找算法及企业级应用 数组和索引 日常生活中&#xff0c;我们经常会在电话号码簿中查阅“某人”的电话号码&#xff0c;按姓查询或者按字母排 序查询&#xff1b;在字典中查阅“某个词”的读音和含义等等。在这里&#xff0c;“电话号码簿”和…