文章目录
- 1. 写在前面
- 2. 下载源码构建
- 3. 通过release部署
- 4. 运行本地服务
- 5. 通过API获取数据
- 6. 完整代码实现
1. 写在前面
前段时间写了一个QQ群消息监测的自动化机器人,需求则是加入到某些特定的群组,对群内对话消息进行实时监测与分析。当然,我们做的只是威胁情报相关的一些信息收集,比如说发布与转载一些盗版、仿冒类的内容与应用
我忽然觉得它或许能够用到更有趣的一些需求上去:
1、你的(对象)在某些QQ群内每天都聊了些什么内容?
2、对某些成员的发言特别在意怕错过消息实时监测获取
3、每天哪些人比较活跃,发布带颜色的东西!天天水群摸鱼?
带着上面的需求,来搭建QQ机器人。我们依然需要站在巨人的肩膀上去开展后续的工作,基于开源项目go-qchttp
go-cqhttp是一个QQ机器人框架,它与QQ之间的交互基于 CQHTTP协议(CoolQ HTTP API 插件协议)
2. 下载源码构建
下载源码并解压运行:
git clone https://github.com/Mrs4s/go-cqhttp.git
cd go-cqhttp
go build -ldflags "-s -w -extldflags '-static'"
3. 通过release部署
这里我推荐大家从release选择符合版本的客户端下载到本地再去运行:releases
下载完成后,需要对device.json跟config.yml文件进行配置修改
device.json文件需要注意一个地方,目前只有手表协议可以使用 ,所以如下所示protocol配置2即可:
config.yml文件是基本配置文件,用于设置机器人的基本属性和行为,配置如下所示:
# go-cqhttp 默认配置文件
account: # 账号相关
uin: 1586501625 # QQ账号
password: '' # 密码为空时使用扫码登录
encrypt: false # 是否开启密码加密
status: 0 # 在线状态 请参考 https://docs.go-cqhttp.org/guide/config.html#在线状态
relogin: # 重连设置
delay: 3 # 首次重连延迟, 单位秒
interval: 3 # 重连间隔
max-times: 0 # 最大重连次数, 0为无限制
# 是否使用服务器下发的新地址进行重连
# 注意, 此设置可能导致在海外服务器上连接情况更差
use-sso-address: true
# 是否允许发送临时会话消息
allow-temp-session: false
heartbeat:
# 心跳频率, 单位秒
# -1 为关闭心跳
interval: 5
message:
# 上报数据类型
# 可选: string,array
post-format: string
# 是否忽略无效的CQ码, 如果为假将原样发送
ignore-invalid-cqcode: false
# 是否强制分片发送消息
# 分片发送将会带来更快的速度
# 但是兼容性会有些问题
force-fragment: false
# 是否将url分片发送
fix-url: false
# 下载图片等请求网络代理
proxy-rewrite: ''
# 是否上报自身消息
report-self-message: false
# 移除服务端的Reply附带的At
remove-reply-at: false
# 为Reply附加更多信息
extra-reply-data: false
# 跳过 Mime 扫描, 忽略错误数据
skip-mime-scan: false
# 是否自动转换 WebP 图片
convert-webp-image: false
output:
# 日志等级 trace,debug,info,warn,error
log-level: warn
# 日志时效 单位天. 超过这个时间之前的日志将会被自动删除. 设置为 0 表示永久保留.
log-aging: 15
# 是否在每次启动时强制创建全新的文件储存日志. 为 false 的情况下将会在上次启动时创建的日志文件续写
log-force-new: true
# 是否启用日志颜色
log-colorful: true
# 是否启用 DEBUG
debug: false # 开启调试模式
# 默认中间件锚点
default-middlewares: &default
# 访问密钥, 强烈推荐在公网的服务器设置
access-token: ''
# 事件过滤器文件目录
filter: ''
# API限速设置
# 该设置为全局生效
# 原 cqhttp 虽然启用了 rate_limit 后缀, 但是基本没插件适配
# 目前该限速设置为令牌桶算法, 请参考:
# https://baike.baidu.com/item/%E4%BB%A4%E7%89%8C%E6%A1%B6%E7%AE%97%E6%B3%95/6597000?fr=aladdin
rate-limit:
enabled: false # 是否启用限速
frequency: 1 # 令牌回复频率, 单位秒
bucket: 1 # 令牌桶大小
database: # 数据库相关设置
leveldb:
# 是否启用内置leveldb数据库
# 启用将会增加10-20MB的内存占用和一定的磁盘空间
# 关闭将无法使用 撤回 回复 get_msg 等上下文相关功能
enable: true
sqlite3:
# 是否启用内置sqlite3数据库
# 启用将会增加一定的内存占用和一定的磁盘空间
# 关闭将无法使用 撤回 回复 get_msg 等上下文相关功能
enable: false
cachettl: 3600000000000 # 1h
# 连接服务列表
servers:
# 添加方式,同一连接方式可添加多个,具体配置说明请查看文档
#- http: # http 通信
#- ws: # 正向 Websocket
#- ws-reverse: # 反向 Websocket
#- pprof: #性能分析服务器
- http: # HTTP 通信设置
address: 0.0.0.0:5700 # HTTP监听地址
version: 11 # OneBot协议版本, 支持 11/12
timeout: 5 # 反向 HTTP 超时时间, 单位秒,<5 时将被忽略
long-polling: # 长轮询拓展
enabled: false # 是否开启
max-queue-size: 2000 # 消息队列大小,0 表示不限制队列大小,谨慎使用
middlewares:
<<: *default # 引用默认中间件
post: # 反向HTTP POST地址列表
#- url: '' # 地址
# secret: '' # 密钥
# max-retries: 3 # 最大重试,0 时禁用
# retries-interval: 1500 # 重试时间,单位毫秒,0 时立即
#- url: http://127.0.0.1:5701/ # 地址
# secret: '' # 密钥
# max-retries: 10 # 最大重试,0 时禁用
# retries-interval: 1000 # 重试时间,单位毫秒,0 时立即
以上配置文件一般无需改动,HTTP监控地址默认是5700
4. 运行本地服务
配置完成后,我们直接在命令行启动:
./go-cqhttp
这里我们不需要在config.yml文件内QQ配置账号及密码,因为账号密码登陆的方式目前也存在小部分的不稳定问题,这里建议直接使用扫码的方式登陆。服务启动后我们不用操作,5秒后会自动弹出二维码
拿出手机扫描二维码登录,然后在手机上点击确认登录即可,如下图所示:
登陆成功以后控制台会自动加载我们的好友、群等信息等
服务运行后,我们就可以根据go-cqhttp提供的API接口进行交互啦!CQHTTP协议支持HTTP、WebSocket,允许QQ客户端主动活跃事件和消息给go-cqhttp
CQHTTP 插件是 2017 年初出现的基于 CKYU 机器人平台的一款开源免费插件,它使用户能够通过 HTTP 或 WebSocket 对 CKYU 的事件进行上报以及接收请求来调用 CKYU 的 DLL 接口,从而可以使用其它语言(不方便编译到原生二进制的语言)编写 CKYU 插件
服务相关的日志信息也会在控制台自动打印出来,比如说一些消息的接受,群里的一些信息动态,如下图所示:
5. 通过API获取数据
接下来我们通过Postman工具请求API获取群信息数据,如下是手机QQ群内最新消息面板截图:
对比下面通过API获取到的群内最新消息截图(同步一致):
字段结构也是非常的丰富,基本上QQ内有的API接口都能够获取到
总的来说,go-cqhttp 扮演了一个中间层,通过 CQHTTP 协议与 QQ 客户端通信,同时为用户插件提供了丰富的功能和 API。这样,可以让用户编写自定义插件,实现各种的功能,从自动回复消息到管理QQ群等。这个框架的灵活和功能性使其在QQ机器人开发领域非常受欢迎
go-cqhttp 具有内置的 HTTP 服务器,用于接收来自 QQ 客户端和插件的 HTTP 请求。这些请求包括发送消息、处理事件、获取用户信息等
6. 完整代码实现
它的拓展API功能非常丰富,不仅支持对群消息对话信息的监控,还支持对个人好友用户的会话监控
同时也支持用户信息、群文件、群成员等等数据API
# 视频与图片链接提取
async def extract_regex_matches(self, messages):
match = re.search(r'url=(https?://\S+?term=(unknow|2))', messages)
if match:
url = match.group(1).replace('amp;', '')
return url
return messages
# 消息分类处理
async def filter_json_data(self, json_data: dict):
message = json_data['message']
if 'CQ:image' or 'CQ:video' in message:
matches = await self.extract_regex_matches(message)
json_data['message'] = matches
elif 'CQ:forward' in message:
message_id = re.search(r'id=([^,\]]+)', message).group(1)
forward_data = await self.get_forwarded_message(message_id)
forward_data = [{'content': await self.extract_regex_matches(msg.get('content', ''))} for msg in forward_data]
json_data['message'] = forward_data
return json_data
async def fetch_data(url: str):
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=GroupMonitorBot.headers) as response:
data = await response.json()
return data
async def convert_timestamp_to_datetime(timestamp: int):
return datetime.datetime.fromtimestamp(
timestamp).strftime('%Y-%m-%d %H:%M:%S')
# 获取群消息
async def get_group_messages(self, group_id: int):
cache_timestamp = await self.deduplicate(group_id, 'msg_time')
response = await GroupMonitorBot.fetch_data(
self.server.format(
'get_group_msg_history?group_id={}'.format(
group_id)
)
)
messages = response.get('data', {}).get('messages', [])
if messages:
for data in messages:
timestamp = data.get(
'time', 0)
if timestamp > int(cache_timestamp):
message_type, group_id = data.get(
'message_type', ''), data.get(
'group_id', '')
role = data.get(
'sender', {}).get(
'role', '')
user_id, message_id, message = data.get(
'user_id', ''), data.get(
'message_id', ''), data.get(
'message', '')
user_info = await self.get_member_info(user_id)
json_to_analyze = {
'message_type': message_type,
'time': await convert_timestamp_to_datetime(timestamp),
'group_id': group_id,
'role': role,
'message_id': message_id,
'message': message
}
if user_info:
json_to_analyze.update(user_info)
analyzed_data = await self.filter_json_data(json_to_analyze)
# 获取用户信息
async def get_member_info(self, user_id: int):
response = await GroupMonitorBot.fetch_data(
self.server.format(
'get_stranger_info?user_id={}'.format(
user_id)
)
)
data = response.get('data', {})
return data
# 获取转发群聊消息
async def get_forwarded_message(self, message_id: int):
response = await GroupMonitorBot.fetch_data(
self.server.format(
'get_forward_msg?message_id={}'.format(
message_id)
)
)
data = response.get('data', {}).get('messages', [])
data = [{'time': await GroupMonitorBot.convert_timestamp_to_datetime(item.get('time', ''))} for item in data]
return data
有了这个Bot机器人,后续在数据的基础上无论做统计、分析还是用户画像都是非常的简单了!
好了,到这里又到了跟大家说再见的时候了。创作不易,帮忙点个赞再走吧。你的支持是我创作的动力,希望能带给大家更多优质的文章