NTWORK 概述
基于 PC 企业微信的 api 接口,支持收发文本、群@、名片、图片、文件、视频、链接卡片等。
下载安装 ntwork
pip install ntwork
国内源安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple ntwork
企业微信版本下载
官方下载:https://dldir1.qq.com/wework/work_weixin/WeCom_4.0.8.6027.exe
简单基础案例
import sys
import ntwork
wework = ntwork.WeWork()
# 打开pc企业微信, smart: 是否管理已经登录的企业微信`在这里插入代码片`
wework.open(smart=True)
# 等待登录
wework.wait_login()
# 向文件助手发送一条消息
wework.send_text(conversation_id="FILEASSIST", content="hello, NtWork")
try:
while True:
pass
except KeyboardInterrupt:
ntwork.exit_()
sys.exit()
获取联系人列表、群列表
# -*- coding: utf-8 -*-
import sys
import ntwork
wework = ntwork.WeWork()
# 打开pc企业微信, smart: 是否管理已经登录的微信
wework.open(smart=True)
# 等待登录
wework.wait_login()
# 获取内部(同事)列表并输出
contacts = wework.get_inner_contacts()
print("内部(同事)联系人列表: ")
print(contacts)
# 获取外部(客户)列表并输出
contacts = wework.get_external_contacts()
print("外部(客户)联系人列表: ")
print(contacts)
# 获取群列表并输出
rooms = wework.get_rooms()
print("群列表: ")
print(rooms)
try:
while True:
pass
except KeyboardInterrupt:
ntwork.exit_()
sys.exit()
监听消息自动回复
import sys
import ntwork
wework = ntwork.WeWork()
# 打开pc企业微信, smart: 是否管理已经登录的微信
wework.open(smart=True)
# 注册消息回调
@wework.msg_register(ntwork.MT_RECV_TEXT_MSG)
def on_recv_text_msg(wework_instance: ntwork.WeWork, message):
data = message["data"]
sender_user_id = data["sender"]
self_user_id = wework_instance.get_login_info()["user_id"]
conversation_id: str = data["conversation_id"]
# 判断消息不是自己发的并且不是群消息时,回复对方
if sender_user_id != self_user_id and not conversation_id.startswith("R:"):
wework_instance.send_text(conversation_id=conversation_id,
content=f"你发送的消息是: {data['content']}")
try:
while True:
pass
except KeyboardInterrupt:
ntwork.exit_()
sys.exit()
使用 fastapi 框架
通过 fastapi 的 swagger 在线文档可以很方便的管理 ntwork 接口:案例
常见问题解决方案
版本不匹配异常
如果出现ntwork.exception.WeWorkVersionNotMatchError异常
异常, 请确认是否安装github上指定的企业微信版本,如果确认已经安装,还是报错,可以在代码中添加以下代码,跳过微信版本检测
import ntwork
ntwork.set_wework_exe_path(wework_version='4.0.8.6027')
账号克隆登录
新建一些 ntwork.wework 实例,然后调用 open 方法。
import ntwork
# 多开3个微信
for i in range(3):
wework = ntwork.WeWork()
wework.open(smart=False)
更完善的多实例管理查看 fastapi_example例子
如何监听输出所有的消息
# 注册监听所有消息回调
import ntwork
@wework.msg_register(ntwork.MT_ALL)
def on_recv_text_msg(wework_instance: ntwork.WeWork, message):
print("########################")
print(message)
完全例子查看examples/msg_register_all.py
如何关闭 ntwork 日志
os.environ['NTWORK_LOG'] = "ERROR"
要在import ntwork
前执行
# -*- coding: utf-8 -*-
import os
import sys
import time
os.environ['NTWORK_LOG'] = "ERROR"
import ntwork
如何关闭 cmd 窗口
先使用pip install pywin32
安装pywin32模块, 然后在代码中添加以下代码, 完整例子查看examples/cmd_close_event.py
import sys
import ntwork
import win32api
def on_exit(sig, func=None):
ntwork.exit_()
sys.exit()
# 当关闭cmd窗口时
win32api.SetConsoleCtrlHandler(on_exit, True)
Pyinstaller 打包 exe
使用 pyinstaller 打包 ntwork 项目,需要添加 --collect-data=ntwork
选项
打包成单个 exe 程序
pyinstaller -F --collect-data=ntwork main.py
将所有的依赖文件打包到一个目录中
pyinstaller -y --collect-data=ntwork main.py
打包 fastapi_example 示例,需要添加 --paths=. --collect-data=ntchat
pyinstaller -F --paths=. --collect-data=ntchat main.py
登录日志预览
2024-04-12 00:45:21,949 - WeWorkManager - INFO - initialize wework, version: 4.0.8.6027
2024-04-12 00:45:21,949 - WeWorkManager - DEBUG - new wework instance
2024-04-12 00:45:21,982 - WeWorkInstance - INFO - open wework pid: 57024
2024-04-12 00:45:23,671 - WeWorkManager - DEBUG - accept client_id: 1
2024-04-12 00:45:26,225 - WeWorkInstance - DEBUG - on recv message: {'data': {'account': '', 'acctid': 'WangPaiGeGe', 'avatar': 'https://wework.qpic.cn/wwpic3az/457956_UJkq5TJkTfCnA5x_1712341231/0', 'corp_id': '1970325052533916', 'document_root': 'C:\\Users\\Administrator\\Documents\\WXWork\\1688856738534700', 'email': '', 'job_name': '', 'mobile': '19561983930', 'nickname': '', 'pid': 57024, 'position': '', 'sex': 1, 'user_id': '1688856738534700', 'username': '王牌哥哥'}, 'type': 11026}
2024-04-12 00:45:26,241 - WeWorkInstance - INFO - login success, name: 王牌哥哥
私聊自动回复案例
import sys
import ntwork
wework = ntwork.WeWork()
# 打开 pc 企业微信, smart: 是否管理已经登录的微信
wework.open(smart=True)
# 注册消息回调
@wework.msg_register(ntwork.MT_RECV_TEXT_MSG)
def on_recv_text_msg(wework_instance: ntwork.WeWork, message):
data = message["data"]
sender_user_id = data["sender"]
self_user_id = wework_instance.get_login_info()["user_id"]
conversation_id: str = data["conversation_id"]
# 判断消息不是自己发的并且不是群消息时,回复对方
if sender_user_id != self_user_id and not conversation_id.startswith("R:"):
wework_instance.send_text(conversation_id=conversation_id, content=f"你发送的消息是: {data['content']}")
try:
while True:
pass
except KeyboardInterrupt:
ntwork.exit_()
sys.exit()
消息属性
私聊消息属性说明
{
'appinfo': 'CAQQiZrgsAYYpMv0mZmAgAMgyIKKUw==',
'at_list': [],
'content': '我喜欢你',
'content_type': 0,
'conversation_id': 'S:1688856625489316_1688856738534700',
'is_pc': 0,
'local_id': '34',
'receiver': '1688856738534700',
'send_time': '1712852233',
'sender': '1688856625489316',
'sender_name': '唤醒手腕',
'server_id': '1000235'
}
群聊消息属性说明
{
'appinfo': 'CAQQiKTgsAYYpMv0mZmAgAMg99ak/Ak=',
'at_list': [],
'content': '我喜欢你',
'content_type': 0,
'conversation_id': 'R:1970325052533916',
'is_pc': 0,
'local_id': '41',
'receiver': '1688856738534700',
'send_time': '1712853512',
'sender': '1688856625489316',
'sender_name': '唤醒手腕',
'server_id': '1000258'
}
案例:故障离线表格统计
读取故障数据源码
import re
# 假设这是你的消息体字符串
test_message = """
【故障位置】化州西收费站97车道
【故障情况】不能缴费
【填报人】唤醒手腕
"""
def analysis_repair_data(message):
# 定义正则表达式模式,用于匹配每个字段
pattern = r"【故障位置】(.*)\n【故障情况】(.*)\n【填报人】(.*)"
# 使用re.search来查找匹配项
match = re.search(pattern, message, re.DOTALL)
if match:
fault_location = match.group(1).strip() # 提取故障位置
fault_situation = match.group(2).strip() # 提取故障情况
reporter = match.group(3).strip() # 提取填报人
return {"fault_location": fault_location,
"fault_situation": fault_situation,
"reporter": reporter
}
else:
return False
读取维修数据源码
import re
# 假设这是你的消息体字符串
test_message = """
【故障编号】
【维修情况】已修复
【维修人】xxx
"""
def analysis_solve_data(message):
# 定义正则表达式模式,用于匹配每个字段
pattern = r"""
【故障编号】(.*)
【维修情况】(.*)
【维修人】(.*)
"""
pattern = re.compile(pattern, re.DOTALL | re.VERBOSE)
# 使用re.search来查找匹配项
match = pattern.search(message)
if match:
serial = match.group(1).strip() # 提取故障编号
repair_situation = match.group(2).strip() # 提取维修情况
repair_person = match.group(3).strip() # 提取维修人
return {"serial": serial,
"repair_situation": repair_situation,
"repair_person": repair_person
}
else:
return False
更新离线 Excel 表格源码
from datetime import datetime
from openpyxl.reader.excel import load_workbook
# 加载工作簿
wb = load_workbook(filename='故障维修登记表.xlsx')
# 选择活动工作表或者通过名字选择工作表
ws = wb.active # 使用活动工作表
def insert_new_note(repair_data):
# 获取行数
row_count = ws.max_row
now = datetime.now()
# 使用f-string格式化时间
formatted_time = f"{now:%Y%m%d-%H%M%S}"
data = {
"index": row_count,
"serial": formatted_time,
"fault_location": repair_data['fault_location'],
"fault_situation": repair_data['fault_situation'],
"reporter": repair_data['reporter'],
"repair_situation": "",
"repair_person": ""
}
row_num = row_count + 1
ws.cell(row=row_num, column=1).value = data['index']
ws.cell(row=row_num, column=2).value = data['serial']
ws.cell(row=row_num, column=3).value = data['fault_location']
ws.cell(row=row_num, column=4).value = data['fault_situation']
ws.cell(row=row_num, column=5).value = data['reporter']
ws.cell(row=row_num, column=6).value = data['repair_situation']
ws.cell(row=row_num, column=7).value = data['repair_person']
# 保存工作簿到文件
wb.save("故障维修登记表.xlsx")
return data['serial']
def finish_old_note(solve_data):
target_name = solve_data['serial']
# 遍历所有行,查找目标值
state = False
for index, row in enumerate(ws.iter_rows(values_only=True)):
if row[1] == "编号":
continue
if row[1] == target_name: # 假设"Name"在第一列
# 找到目标行后,修改该行的数据
# 这里假设我们要修改的是第二列的数据,将其改为"New Value"
ws.cell(row=(index + 1), column=6).value = solve_data['repair_situation']
ws.cell(row=(index + 1), column=7).value = solve_data['repair_person']
state = True
wb.save("故障维修登记表.xlsx")
break # 找到后就退出循环
if state:
return solve_data['serial']
else:
return False
主程序 app.py 源码
# -*- coding: utf-8 -*-
import sys
from datetime import datetime
import ntwork
from read_repair_data import analysis_repair_data
from read_solve_data import analysis_solve_data
from update_excel import insert_new_note, finish_old_note
wework = ntwork.WeWork()
# 打开 pc 企业微信, smart: 是否管理已经登录的微信
wework.open(smart=True)
# 注册消息回调
@wework.msg_register(ntwork.MT_RECV_TEXT_MSG)
def on_recv_text_msg(wework_instance: ntwork.WeWork, message):
data = message["data"]
if data['content'][0:1] != "【":
return
sender_user_id = data["sender"]
self_user_id = wework_instance.get_login_info()["user_id"]
conversation_id: str = data["conversation_id"]
# 判断消息不是自己发的并且不是群消息时,回复对方
if sender_user_id != self_user_id and conversation_id.startswith("R:"):
insert_data = analysis_repair_data(data['content'])
if insert_data:
serial = insert_new_note(insert_data)
now = datetime.now()
# 使用f-string格式化时间
formatted_time = f"{now:%Y-%m-%d %H:%M:%S}"
print(f"【反馈结果】【{formatted_time}】请注意存在新的反馈:{serial}")
wework_instance.send_text(conversation_id=conversation_id, content=f"【反馈结果】反馈成功\n【故障编号】{serial}")
insert_data = analysis_solve_data(data['content'])
if insert_data:
serial = finish_old_note(insert_data)
if serial:
# 使用f-string格式化时间
now = datetime.now()
formatted_time = f"{now:%Y-%m-%d %H:%M:%S}"
print(f"【维修结果】【{formatted_time}】请注意存在新的维修:{serial}")
wework_instance.send_text(conversation_id=conversation_id, content=f"【维修结果】维修成功\n【故障编号】{str(serial)}")
else:
wework_instance.send_text(conversation_id=conversation_id, content=f"【操作警告】故障编号不存在!")
try:
while True:
pass
except KeyboardInterrupt:
ntwork.exit_()
sys.exit()