大家好,我是TheWeiJun;最近看见微信里各个群聊都在聊chatGPT,甚至有的大佬们都把chatGPT接入了微信群聊,于是就有粉丝来找小编,希望能出一期chatGPT的文章;故今天这篇文章我将手把手教大家如何实现并自定义自己的聊天机器人。码字不易,在阅读的同时记得给我一个star!
特别声明:本公众号文章只作为学术研究,不作为其它不法用途;如有侵权请联系作者删除。
目录
一、前言介绍
二、环境准备
三、chatGPT接入
四、源码重写
五、效果展示
趣味模块
近期微软推出了chatGPT,小明想把chatGPT接入到微信群聊中,平日里闲下来没事的时候,和gpt聊聊八卦、谈谈人生、增长一下学习知识。甚至在女朋友不知情的情况下,偷偷看看漂亮小姐姐的图片和视频;而这些功能都希望能通过微信机器人去实现。为了满足小明同学的要求,今天给大家分享一个chatGPT与itchat打造的全网最强机器狗。相信我,一定要看到最后,你肯定不会后悔。
一、前言介绍
1、什么是chatGPT?
ChatGPT是美国人工智能研究实验室OpenAI新推出的一种人工智能技术驱动的自然语言处理工具,使用了Transformer神经网络架构,也是GPT-3.5架构,这是一种用于处理序列数据的模型,拥有语言理解和文本生成能力,尤其是它会通过连接大量的语料库来训练模型,这些语料库包含了真实世界中的对话,使得ChatGPT具备上知天文下知地理,还能根据聊天的上下文进行互动的能力,做到与真正人类几乎无异的聊天场景进行交流。ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。
2、什么是itchat?
itchat是一个开源的微信个人号接口,使用python调用微信从未如此简单。使用不到三十行的代码,你就可以完成一个能够处理所有信息的微信机器人。当然,该api的使用远不止一个机器人,如今微信已经成为了个人社交的很大一部分。
二、环境准备
为了实现一个微信机器狗,我们需要准备如下的环境和工具包:
-
微信号一个,需要实名认证(itchat会获取登录后的skey,不然无法登录成功)
-
chatGPT账号一个,并能够获取Api Key。
-
科学上网工具,不然无法访问chatGPT接口。
-
安装itchat、openai第三方工具包。
-
Python环境为Python3;最好是3.7以上版本。
-
有服务器的可以部署到服务器,没有的本地环境也可以。
附上环境安装命令:
pip3 install itchat-uos==1.5.0.dev0
pip3 install --upgrade openai
三、chatGPT接入
1、查看官方文档,如何使用python接入chatGPT,截图如下:
2、阅读chatGPT官方文档后,编写Python代码如下:
import openai
import requests
# openai api方式
def get_answer_api(keywrod, api_key):
openai.api_key = api_key
response = openai.Completion.create(
model="text-davinci-003", # 模型
prompt=keywrod,
temperature=0.5,
max_tokens=1024, # 限制回复字数(一个汉字为两个token)
top_p=1,
best_of=1,
frequency_penalty=0,
presence_penalty=0,
# stop=["\n"]
)
content = response.get('choices')
if content:
text = content[0].get('text').replace('\n', '')
else:
text = '对不起,我没有理解你的意思!'
print(text)
return text
# requests 请求方式
def get_answer_req(keywrod, api_key):
url = "https://api.openai.com/v1/completions"
headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}'}
data = {
'model': "text-davinci-003", # 模型
'prompt': keywrod,
'temperature': 0.5,
'max_tokens': 1024, # 限制回复字数(一个汉字为两个token)
'top_p': 1,
'frequency_penalty': 0,
'presence_penalty': 0,
}
response = requests.post(url, headers=headers, json=data)
content = response.json().get('choices')
if content:
text = content[0].get('text').replace('\n', '')
else:
text = '对不起,我连接不到你的网络!'
print(text)
return text
if __name__ == '__main__':
api_key = "你的api_key"
get_answer_api('讲个笑话', api_key)
3、运行代码后,chatGPT输出内容如下图所示:
4、chatGPT接口OK后,我们把它和itchat进行关联,相关代码如下所示:
# encoding:utf-8
"""
wechat channel
"""
import itchat
import json
from itchat.content import *
from channel.channel import Channel
from concurrent.futures import ThreadPoolExecutor
from common.log import logger
from common.tmp_dir import TmpDir
from config import conf
import requests
import io
thread_pool = ThreadPoolExecutor(max_workers=8)
@itchat.msg_register(TEXT)
def handler_single_msg(msg):
WechatChannel().handle_text(msg)
return None
@itchat.msg_register(TEXT, isGroupChat=True)
def handler_group_msg(msg):
WechatChannel().handle_group(msg)
return None
@itchat.msg_register(VOICE)
def handler_single_voice(msg):
WechatChannel().handle_voice(msg)
return None
class WechatChannel(Channel):
def __init__(self):
pass
def startup(self):
# login by scan QRCode
itchat.auto_login(enableCmdQR=2)
# start message listener
itchat.run()
def handle_voice(self, msg):
if conf().get('speech_recognition') != True :
return
logger.debug("[WX]receive voice msg: " + msg['FileName'])
thread_pool.submit(self._do_handle_voice, msg)
def _do_handle_voice(self, msg):
from_user_id = msg['FromUserName']
other_user_id = msg['User']['UserName']
if from_user_id == other_user_id:
file_name = TmpDir().path() + msg['FileName']
msg.download(file_name)
query = super().build_voice_to_text(file_name)
if conf().get('voice_reply_voice'):
self._do_send_voice(query, from_user_id)
else:
self._do_send_text(query, from_user_id)
def handle_text(self, msg):
logger.debug("[WX]receive text msg: " + json.dumps(msg, ensure_ascii=False))
content = msg['Text']
self._handle_single_msg(msg, content)
def _handle_single_msg(self, msg, content):
from_user_id = msg['FromUserName']
to_user_id = msg['ToUserName'] # 接收人id
other_user_id = msg['User']['UserName'] # 对手方id
match_prefix = self.check_prefix(content, conf().get('single_chat_prefix'))
if "」\n- - - - - - - - - - - - - - -" in content:
logger.debug("[WX]reference query skipped")
return
if from_user_id == other_user_id and match_prefix is not None:
# 好友向自己发送消息
if match_prefix != '':
str_list = content.split(match_prefix, 1)
if len(str_list) == 2:
content = str_list[1].strip()
img_match_prefix = self.check_prefix(content, conf().get('image_create_prefix'))
if img_match_prefix:
content = content.split(img_match_prefix, 1)[1].strip()
thread_pool.submit(self._do_send_img, content, from_user_id)
else :
thread_pool.submit(self._do_send_text, content, from_user_id)
elif to_user_id == other_user_id and match_prefix:
# 自己给好友发送消息
str_list = content.split(match_prefix, 1)
if len(str_list) == 2:
content = str_list[1].strip()
img_match_prefix = self.check_prefix(content, conf().get('image_create_prefix'))
if img_match_prefix:
content = content.split(img_match_prefix, 1)[1].strip()
thread_pool.submit(self._do_send_img, content, to_user_id)
else:
thread_pool.submit(self._do_send_text, content, to_user_id)
def handle_group(self, msg):
logger.debug("[WX]receive group msg: " + json.dumps(msg, ensure_ascii=False))
group_name = msg['User'].get('NickName', None)
group_id = msg['User'].get('UserName', None)
if not group_name:
return ""
origin_content = msg['Content']
content = msg['Content']
content_list = content.split(' ', 1)
context_special_list = content.split('\u2005', 1)
if len(context_special_list) == 2:
content = context_special_list[1]
elif len(content_list) == 2:
content = content_list[1]
if "」\n- - - - - - - - - - - - - - -" in content:
logger.debug("[WX]reference query skipped")
return ""
config = conf()
match_prefix = (msg['IsAt'] and not config.get("group_at_off", False)) or self.check_prefix(origin_content, config.get('group_chat_prefix')) \
or self.check_contain(origin_content, config.get('group_chat_keyword'))
if ('ALL_GROUP' in config.get('group_name_white_list') or group_name in config.get('group_name_white_list') or self.check_contain(group_name, config.get('group_name_keyword_white_list'))) and match_prefix:
img_match_prefix = self.check_prefix(content, conf().get('image_create_prefix'))
if img_match_prefix:
content = content.split(img_match_prefix, 1)[1].strip()
thread_pool.submit(self._do_send_img, content, group_id)
else:
thread_pool.submit(self._do_send_group, content, msg)
def send(self, msg, receiver):
itchat.send(msg, toUserName=receiver)
logger.info('[WX] sendMsg={}, receiver={}'.format(msg, receiver))
def _do_send_voice(self, query, reply_user_id):
try:
if not query:
return
context = dict()
context['from_user_id'] = reply_user_id
reply_text = super().build_reply_content(query, context)
if reply_text:
replyFile = super().build_text_to_voice(reply_text)
itchat.send_file(replyFile, toUserName=reply_user_id)
logger.info('[WX] sendFile={}, receiver={}'.format(replyFile, reply_user_id))
except Exception as e:
logger.exception(e)
def _do_send_text(self, query, reply_user_id):
try:
if not query:
return
context = dict()
context['session_id'] = reply_user_id
reply_text = super().build_reply_content(query, context)
if reply_text:
self.send(conf().get("single_chat_reply_prefix") + reply_text, reply_user_id)
except Exception as e:
logger.exception(e)
def _do_send_img(self, query, reply_user_id):
try:
if not query:
return
context = dict()
context['type'] = 'IMAGE_CREATE'
img_url = super().build_reply_content(query, context)
if not img_url:
return
# 图片下载
pic_res = requests.get(img_url, stream=True)
image_storage = io.BytesIO()
for block in pic_res.iter_content(1024):
image_storage.write(block)
image_storage.seek(0)
# 图片发送
itchat.send_image(image_storage, reply_user_id)
logger.info('[WX] sendImage, receiver={}'.format(reply_user_id))
except Exception as e:
logger.exception(e)
def _do_send_group(self, query, msg):
if not query:
return
context = dict()
group_name = msg['User']['NickName']
group_id = msg['User']['UserName']
group_chat_in_one_session = conf().get('group_chat_in_one_session', [])
if ('ALL_GROUP' in group_chat_in_one_session or \
group_name in group_chat_in_one_session or \
self.check_contain(group_name, group_chat_in_one_session)):
context['session_id'] = group_id
else:
context['session_id'] = msg['ActualUserName']
reply_text = super().build_reply_content(query, context)
if reply_text:
reply_text = '@' + msg['ActualNickName'] + ' ' + reply_text.strip()
self.send(conf().get("group_chat_reply_prefix", "") + reply_text, group_id)
def check_prefix(self, content, prefix_list):
for prefix in prefix_list:
if content.startswith(prefix):
return prefix
return None
def check_contain(self, content, keyword_list):
if not keyword_list:
return None
for ky in keyword_list:
if content.find(ky) != -1:
return True
return None
总结:此刻我们已经把chatGPT和itchat成功结合,实现了微信机器狗自动会话功能,同时也增加了自动发图片、视频功能。但是在小编登录过程中,发现只要登录出现异地操作,二维码会重复弹出的问题,截图如下所示:
为了解决二维码重复弹出问题及程序异常退出问题,我们接下来对itchat官方源码进行重写。
四、源码重写
1、将itchat源码login模块进行重写,重写后代码如下:
import os
import time
import re
import io
import threading
import json
import xml.dom.minidom
import random
import traceback
import logging
try:
from httplib import BadStatusLine
except ImportError:
from http.client import BadStatusLine
import requests
from pyqrcode import QRCode
from .. import config, utils
from ..returnvalues import ReturnValue
from ..storage.templates import wrap_user_dict
from .contact import update_local_chatrooms, update_local_friends
from .messages import produce_msg
logger = logging.getLogger('itchat')
def load_login(core):
core.login = login
core.get_QRuuid = get_QRuuid
core.get_QR = get_QR
core.check_login = check_login
core.web_init = web_init
core.show_mobile_login = show_mobile_login
core.start_receiving = start_receiving
core.get_msg = get_msg
core.logout = logout
def login(self, enableCmdQR=False, picDir=None, qrCallback=None,
loginCallback=None, exitCallback=None):
if self.alive or self.isLogging:
logger.warning('itchat has already logged in.')
return
self.isLogging = True
while self.isLogging:
uuid = push_login(self)
if uuid:
qrStorage = io.BytesIO()
else:
logger.info('Getting uuid of QR code.')
while not self.get_QRuuid():
time.sleep(1)
logger.info('Downloading QR code.')
qrStorage = self.get_QR(enableCmdQR=enableCmdQR,
picDir=picDir, qrCallback=qrCallback)
logger.info('Please scan the QR code to log in.')
isLoggedIn = False
while not isLoggedIn:
status = self.check_login()
if hasattr(qrCallback, '__call__'):
qrCallback(uuid=self.uuid, status=status,
qrcode=qrStorage.getvalue())
if status == '200':
isLoggedIn = True
elif status == '201':
if isLoggedIn is not None:
logger.info('Please press confirm on your phone.')
isLoggedIn = None
logger.info('wait 10 seconds.')
time.sleep(10)
elif status != '408':
break
if isLoggedIn:
break
elif self.isLogging:
logger.info('Log in time out, reloading QR code.')
else:
return # log in process is stopped by user
logger.info('Loading the contact, this may take a little while.')
self.web_init()
self.show_mobile_login()
self.get_contact(True)
if hasattr(loginCallback, '__call__'):
r = loginCallback()
else:
utils.clear_screen()
if os.path.exists(picDir or config.DEFAULT_QR):
os.remove(picDir or config.DEFAULT_QR)
logger.info('Login successfully as %s' % self.storageClass.nickName)
self.start_receiving(exitCallback)
self.isLogging = False
总结:在触发状态码为201的时候,进行10秒等待,否则会重复弹出二维码,让你无法登录。
2、将itchat源码core模块进行重写,重写后代码如下:
class Core(object):
def __init__(self):
''' init is the only method defined in core.py
alive is value showing whether core is running
- you should call logout method to change it
- after logout, a core object can login again
storageClass only uses basic python types
- so for advanced uses, inherit it yourself
receivingRetryCount is for receiving loop retry
- it's 5 now, but actually even 1 is enough
- failing is failing
'''
self.alive, self.isLogging = False, False
self.storageClass = storage.Storage(self)
self.memberList = self.storageClass.memberList
self.mpList = self.storageClass.mpList
self.chatroomList = self.storageClass.chatroomList
self.msgList = self.storageClass.msgList
self.loginInfo = {}
self.s = requests.Session()
self.uuid = None
self.functionDict = {'FriendChat': {}, 'GroupChat': {}, 'MpChat': {}}
self.useHotReload, self.hotReloadDir = False, 'itchat.pkl'
self.receivingRetryCount = 1000
def login(self, enableCmdQR=False, picDir=None, qrCallback=None,
loginCallback=None, exitCallback=None):
''' log in like web wechat does
for log in
- a QR code will be downloaded and opened
- then scanning status is logged, it paused for you confirm
- finally it logged in and show your nickName
for options
- enableCmdQR: show qrcode in command line
- integers can be used to fit strange char length
- picDir: place for storing qrcode
- qrCallback: method that should accept uuid, status, qrcode
- loginCallback: callback after successfully logged in
- if not set, screen is cleared and qrcode is deleted
- exitCallback: callback after logged out
- it contains calling of logout
for usage
..code::python
import itchat
itchat.login()
it is defined in components/login.py
and of course every single move in login can be called outside
- you may scan source code to see how
- and modified according to your own demand
'''
raise NotImplementedError()
总结:修改该模块主要因为异常次数过多,itchat微信就会自动退出。重写后将异常次数拉满,这样就能避免因为网络不稳定问题造成微信自动退出。
3、整个项目启动代码配置如下:
{
"open_ai_api_key": "",
"proxy": "127.0.0.1:7890",
"single_chat_prefix": ["机器狗", "@机器狗"],
"single_chat_reply_prefix": "[机器狗] ",
"group_chat_prefix": ["@机器狗", "@机器狗"],
"group_name_white_list": ["机器狗家族", "AI回答"],
"image_create_prefix": ["画", "看", "找"],
"conversation_max_tokens": 1000,
"character_desc": "我是机器狗, 一个由OpenAI训练的大型语言模型, 旨在回答并解决人们的任何问题,并且可以使用多种语言与人交流。",
"expires_in_seconds": 1200
}
4、然后将proxy代理部署到linux服务器,部署截图如下:
5、最后将代码运行起来并进行手机扫码登录,登录后log截图如下:
注:此刻我们已经完全脱机,不需要手机保持在线;可以实现24小时*N天超长待机。
五、效果展示
1、代码成功部署服务器后,我们在微信群聊发送指定内容,查看机器狗返回内容如下:
-
机器狗文本信息截图
-
文本图片魔改版截图
-
机器狗图片信息截图
-
机器狗视频信息截图
总结:图片、视频功能是小编通过itchat模块自定义扩展的,并不是chatGPT真实返回的,供大家观赏使用,切勿用于不法用途。
粉丝福利:公众号后台回复 chatgpt 即可获取机器狗完整代码(有效期7天)
原文链接:微信自动聊天机器狗,配置chatGPT,比Siri还智能!
今天分享到这里就结束了,欢迎大家关注下期内容,我们不见不散☀️☀️😊
文章来源:逆向与爬虫的故事
微信搜:逆向与爬虫的故事;给我一个关注!