脚本相关技术
wxauto
Windows版本微信客户端(非网页版)自动化,可实现简单的发送、接收微信消息,简单微信机器人
GitHub地址
AI
脚本联动的是讯飞星火的api(主要是免费且无限token数(必须实名后才能领无限token数))
地址:控制台-讯飞开放平台
脚本
import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse, urlencode
import ssl
from datetime import datetime
from time import mktime, sleep
from wsgiref.handlers import format_date_time
import websocket
from wxauto import WeChat
# 讯飞星火API WebSocket参数
class Ws_Param(object):
def __init__(self, APPID, APIKey, APISecret, Spark_url):
self.APPID = APPID
self.APIKey = APIKey
self.APISecret = APISecret
self.host = urlparse(Spark_url).netloc
self.path = urlparse(Spark_url).path
self.Spark_url = Spark_url
def create_url(self):
now = datetime.now()
date = format_date_time(mktime(now.timetuple()))
signature_origin = "host: " + self.host + "\n"
signature_origin += "date: " + date + "\n"
signature_origin += "GET " + self.path + " HTTP/1.1"
signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
digestmod=hashlib.sha256).digest()
signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')
authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
v = {"authorization": authorization, "date": date, "host": self.host}
url = self.Spark_url + '?' + urlencode(v)
return url
# WebSocket事件处理
def on_error(ws, error):
print("### error:", error)
def on_close(ws, one, two):
print("### closed ###")
def on_open(ws):
thread.start_new_thread(run, (ws,))
def run(ws, *args):
data = json.dumps(gen_params(appid=ws.appid, domain=ws.domain, question=ws.question))
ws.send(data)
def on_message(ws, message):
data = json.loads(message)
code = data['header']['code']
if code != 0:
print(f'请求错误: {code}, {data}')
ws.close()
else:
choices = data["payload"]["choices"]
status = choices["status"]
content = choices["text"][0]["content"]
print(content, end="")
global answer
answer += content
if status == 2:
ws.close()
def gen_params(appid, domain, question):
data = {
"header": {
"app_id": appid,
"uid": "1234"
},
"parameter": {
"chat": {
"domain": domain,
"temperature": 0.5,
"max_tokens": 2048
}
},
"payload": {
"message": {
"text": question
}
}
}
return data
def main(appid, api_key, api_secret, Spark_url, domain, question):
wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
websocket.enableTrace(False)
wsUrl = wsParam.create_url()
ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
ws.appid = appid
ws.question = question
ws.domain = domain
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
# 初始化讯飞星火API的参数
# 定义应用ID,用于标识API的唯一身份
appid = ""
# 定义API密钥,用于验证身份
api_secret = ""
# 定义API公钥,用于加密和解密数据
api_key = ""
# 定义API域名,用于指定API的服务范围
domain = "general"
# 定义Spark API的WebSocket地址,用于建立实时通讯连接
Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat"
text = []
def getText(role, content):
jsoncon = {"role": role, "content": content}
text.append(jsoncon)
return text
def getlength(text):
length = 0
for content in text:
leng = len(content["content"])
length += leng
return length
def checklen(text):
while getlength(text) > 8000:
del text[0]
return text
# 初始化微信
wx = WeChat()
# 确保微信已经打开
print("请确保微信客户端已打开并登录。")
# 循环监听微信消息并回复
while True:
# 获取最新消息列表
messages = wx.GetAllMessage()
# 检查是否有消息
if messages:
# 获取最新一条消息
latest_message = messages[-1]
# 检查消息是否来自指定用户且不是自己发出的消息
if latest_message.sender == '独行' and not latest_message.content.startswith('我:'):
# 提取并处理问题内容
question = latest_message.content.strip()
question = checklen(getText("user", question))
# 初始化答案变量
answer = ""
# 调用主函数处理问题并获取答案
main(appid, api_key, api_secret, Spark_url, domain, question)
# 获取处理后的答案文本
getText("assistant", answer)
# 发送答案给指定用户
wx.SendMsg(answer, '独行')
# 休眠2秒,避免频繁操作
sleep(2)
else:
# 如果没有消息,可以做一些等待处理
sleep(2)