霸王茶姬小程序任务脚本
小白操作----仅供学习研究参考
功能: 积分签到
解析
该脚本用于“霸王茶姬小程序”的签到和积分查询操作。通过模拟网络请求登录账号,获取个人信息,执行每日签到,并查询积分情况。支持多账号操作,并能通过微信推送签到结果和相关信息。
RUN这个类是脚本的核心,负责执行各种任务。
-
init: 初始化类,设置用户信息、会话和请求头。
-
personal_info: 获取并验证用户的个人信息,如用户名和手机号。
-
user_sign_statistics: 获取用户的签到信息,并根据签到状态执行签到操作。
-
take_part_in_sign: 执行签到操作并获取签到奖励。
-
points_info: 查询用户的积分信息,包括即将过期的积分。
-
main: 主流程控制,依次执行登录、获取用户信息、签到和查询积分等操作。
-
sendMsg: 发送微信推送通知消息。
辅助函数
-
Log: 记录日志并更新消息内容。
-
down_file: 下载文件并进行必要的文件操作。
-
import_Tools: 导入工具模块并初始化环境。
主函数负责脚本的总体运行流程,包括加载配置、检查环境、执行任务等。它会导入必要的工具模块,并根据配置执行多账号的签到操作。
定时任务
脚本通过定时任务 (cron) 在每天的1点30分运行,自动执行签到和相关操作。
该脚本的主要功能是自动化执行“霸王茶姬小程序”的签到和积分查询操作,通过模拟登录、获取用户信息、签到和积分查询等步骤,实现对签到任务的自动化处理,并通过微信推送通知用户签到结果。
import os
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# import CHERWIN_TOOLS
# 禁用安全请求警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
IS_DEV = False
if os.path.isfile('DEV_ENV.py'):
import DEV_ENV
IS_DEV = True
if os.path.isfile('notify.py'):
from notify import send
print("加载通知服务成功!")
else:
print("加载通知服务失败!")
send_msg = ''
one_msg=''
def Log(cont=''):
global send_msg,one_msg
print(cont)
if cont:
one_msg += f'{cont}\n'
send_msg += f'{cont}\n'
class RUN:
def __init__(self,info,index):
global one_msg
one_msg = ''
split_info = info.split('@')
self.token = split_info[0]
len_split_info = len(split_info)
last_info = split_info[len_split_info - 1]
self.send_UID = None
if len_split_info > 0 and "UID_" in last_info:
self.send_UID = last_info
self.index = index + 1
Log(f"\n---------开始执行第{self.index}个账号>>>>>")
self.s = requests.session()
self.s.verify = False
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x6309080f) XWEB/8555',
'work-wechat-userid': '',
'multi-store-id': '',
'gdt-vid': '',
'qz-gtd': '',
'scene': '1006',
'Qm-From': 'wechat',
'store-id': '49006',
'Qm-User-Token': self.token,
'channelCode': '',
'Qm-From-Type': 'catering',
'promotion-code': '',
'work-staff-name': '',
'work-staff-id': '',
'Accept': 'v=1.0',
'Accept-Encoding': 'gzip,compress,br,deflate',
'Referer': 'https://servicewechat.com/wxafec6f8422cb357b/87/page-frame.html'
}
self.s.headers.update(self.headers)
self.appid = 'wxafec6f8422cb357b'
self.activity_id='947079313798000641'
def personal_info(self):
personal_info_valid = False
try:
# 请求的参数
params = {'appid': self.appid}
# 发送GET请求
response = self.s.get('https://webapi.qmai.cn/web/catering/crm/personal-info', json=params)
result = response.json()
# 检查请求是否成功
if result.get('code','-1') == '0':
personal_info_valid = True
# 提取个人信息
mobile_phone = result['data']['mobilePhone'] if 'data' in result and 'mobilePhone' in result[
'data'] else None
self.mobile_phone = mobile_phone[:3] + "*" * 4 + mobile_phone[7:]
self.name = result['data']['name'] if 'data' in result and 'name' in result['data'] else None
Log(f"账号[{self.index}]登陆成功!\n用户名:【{self.name}】 \n手机号:【{self.mobile_phone}】")
else:
# 如果请求不成功,则打印错误信息
message = result.get('message', '')
Log(f'登录失败: {message}')
except Exception as e:
# 捕获任何异常并打印
print(e)
finally:
# 最终返回请求是否成功的标志
return personal_info_valid
def user_sign_statistics(self):
try:
json_data = {
'activityId': self.activity_id,
'appid': self.appid
}
# Send the POST request
response = self.s.post('https://webapi.qmai.cn/web/cmk-center/sign/userSignStatistics', json=json_data)
result = response.json()
status_code = response.status_code
# Check if the request was successful
if result.get('code', status_code) == 0:
data = result.get('data', {})
sign_days = data.get('signDays', '')
sign_status = data.get('signStatus', 0) == 1
Log(f'新版签到今天{"已" if sign_status else "未"}签到, 已连续签到{sign_days}天')
if not sign_status:
self.take_part_in_sign()
return sign_status, sign_days
else:
message = result.get('message', '')
Log(f'查询新版签到失败: {message}')
return False, 0
except Exception as e:
print(e)
return False, 0
def take_part_in_sign(self):
try:
json_data = {
'activityId': self.activity_id,
'appid': self.appid
}
response = self.s.post('https://webapi.qmai.cn/web/cmk-center/sign/takePartInSign', json=json_data)
result = response.json()
status_code = response.status_code
if result.get('code', status_code) == 0:
data = result.get('data',{})
rewardDetailList = data.get('rewardDetailList',[{}])
if rewardDetailList:
rewardName = rewardDetailList[0].get('rewardName','')
sendNum = rewardDetailList[0].get('sendNum','')
Log(f'新版签到成功,获得【{sendNum}】{rewardName}')
return True
else:
Log(f'签到失败:【{result.get("message","")}】')
return True
else:
message = result.get('message', '')
Log(f'新版签到失败: {message}')
return False
except Exception as e:
print(e)
return False
def points_info(self):
try:
json_data = {
'appid': self.appid
}
response = self.s.post('https://webapi.qmai.cn/web/catering/crm/points-info', json=json_data)
result = response.json()
status_code = response.status_code
if result.get('code', status_code) == '0':
data = result.get('data', {})
soon_expired_points = data.get('soonExpiredPoints', 0)
total_points = data.get('totalPoints', 0)
expired_time = data.get('expiredTime', '')
if soon_expired_points:
Log(f'有【{soon_expired_points}】积分将于( {expired_time})过期')
Log(f'当前积分: 【{total_points}】')
return total_points, soon_expired_points, expired_time
else:
message = result.get('message', '')
Log(f'查询积分失败: {message}')
return None
except Exception as e:
print(e)
return False
def main(self):
if not self.personal_info() :
Log("用户信息无效,请更新CK")
self.sendMsg()
return False
self.user_sign_statistics()
self.points_info()
self.sendMsg()
return True
def sendMsg(self, help=False):
if self.send_UID:
push_res = CHERWIN_TOOLS.wxpusher(self.send_UID, one_msg, APP_NAME, help)
print(push_res)
def down_file(filename, file_url):
print(f'开始下载:{filename},下载地址:{file_url}')
try:
response = requests.get(file_url, verify=False, timeout=10)
response.raise_for_status()
with open(filename + '.tmp', 'wb') as f:
f.write(response.content)
print(f'【{filename}】下载完成!')
# 检查临时文件是否存在
temp_filename = filename + '.tmp'
if os.path.exists(temp_filename):
# 删除原有文件
if os.path.exists(filename):
os.remove(filename)
# 重命名临时文件
os.rename(temp_filename, filename)
print(f'【{filename}】重命名成功!')
return True
else:
print(f'【{filename}】临时文件不存在!')
return False
except Exception as e:
print(f'【{filename}】下载失败:{str(e)}')
return False
def import_Tools():
global CHERWIN_TOOLS,ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode
import CHERWIN_TOOLS
ENV, APP_INFO, TIPS, TIPS_HTML, AuthorCode = CHERWIN_TOOLS.main(APP_NAME, local_script_name, ENV_NAME,local_version)
if __name__ == '__main__':
APP_NAME = '霸王茶姬小程序'
ENV_NAME = 'BWCJ'
CK_NAME = 'qm-user-token'
print(f'''
{APP_NAME}签到
本文部分变量已做脱敏处理,仅用于测试和学习研究,禁止用于商业用途。