文章目录
- 功能描述
- 源码分析
- 依赖
- 参数配置
- 数据校验
- 多线程并发执行
- 定时任务注册
- 自动关机
- 主程序
- 源码整合
- 本篇小结
更多相关内容可查看
功能描述
需要python环境,详情可看主页python相关文章【Python】从0开始写脚本、Selenium详细教程、附源码案例(保姆篇)
源码分析
依赖
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
import threading
import schedule
import datetime
import subprocess
import sys
import re
参数配置
#手机号
phon_num = "155xxxxx"
#定时任务间隔时间(分钟)
interval = 2
#定时任务开始时间
start_time = "12:10"
#定时任务结束时间
end_time = "12:20"
#控制台打印间隔时间(秒)
printtime = 30
#执行完是否自动关机(True)
shutdown_flag = False
数据校验
def is_eleven_digit_number(element):
return element.isdigit() and len(element) == 11
def is_time_format(element):
pattern = r'^\d{2}:\d{2}$'
return bool(re.match(pattern, element))
def checkdata():
if not is_eleven_digit_number(phon_num):
print(f'{phon_num} :不是一个正确的手机号码,请修改!')
sys.exit(1)
if not is_time_format(start_time):
print(f'{start_time} :不符合时间格式 "HH:MM",请修改!')
sys.exit(1)
if not is_time_format(end_time):
print(f'{end_time} :不符合时间格式 "HH:MM",请修改!')
sys.exit(1)
这段代码是一个简单的数据验证函数 checkdata()
,用于检查输入数据的有效性。它依赖两个辅助函数 is_eleven_digit_number()
和 is_time_format()
来验证不同类型的数据:
-
is_eleven_digit_number(element)
: 这个函数用于检查给定的element
是否是一个11位的数字字符串。它首先使用isdigit()
方法检查字符串是否全由数字组成,然后使用len(element) == 11
来确保字符串长度为11位。 -
is_time_format(element)
: 这个函数用于验证时间格式是否符合 “HH:MM” 的形式,即两位小时和两位分钟,中间用冒号分隔。它使用正则表达式^\d{2}:\d{2}$
来匹配这种格式,确保输入的字符串满足要求。
在 checkdata()
函数中:
- 首先检查变量
phon_num
是否为一个11位的数字串,如果不是,则打印提示信息并终止程序执行。 - 然后分别检查
start_time
和end_time
是否符合 “HH:MM” 的时间格式要求,如果不符合则同样打印提示信息并终止程序执行。
这段代码的目的是确保输入数据的格式正确,以便后续的处理能够顺利进行。
多线程并发执行
def execute_tasks():
threads = [
threading.Thread(target=send_paipai, args=(phon_num,)),
threading.Thread(target=func3, args=(phon_num,)),
threading.Thread(target=send_chuanhang, args=(phon_num,)),
threading.Thread(target=send_wushang, args=(phon_num,)),
threading.Thread(target=send_caiyun, args=(phon_num,)),
threading.Thread(target=send_phpzw, args=(phon_num,)),
threading.Thread(target=func9, args=(phon_num,)),
threading.Thread(target=func1, args=(phon_num,)),
threading.Thread(target=func21, args=(phon_num,)),
threading.Thread(target=send_spump, args=(phon_num,)),
threading.Thread(target=func24, args=(phon_num,)),
threading.Thread(target=func19, args=(phon_num,)),
threading.Thread(target=func10, args=(phon_num,))
]
for t in threads:
t.start()
for t in threads:
t.join()
print("本轮已发送完成")
这段代码是一个并发执行多个任务的函数 execute_tasks()
。它使用了多线程的方式,同时启动了多个线程来执行不同的任务,然后等待所有线程执行完毕后输出完成信息。
具体描述如下:
-
创建线程列表:
- 代码首先定义了一个列表
threads
,其中包含了多个threading.Thread
对象。 - 每个
threading.Thread
对象通过target=
指定要执行的函数,通过args=
传入该函数需要的参数。这些函数包括send_paipai()
,func3()
,send_chuanhang()
, 等等,每个函数都以phon_num
作为参数。
- 代码首先定义了一个列表
-
启动线程:
- 使用一个
for
循环遍历threads
列表,对每个线程对象调用start()
方法启动线程。这样可以并发执行每个任务,而不是按顺序逐个执行。
- 使用一个
-
等待线程结束:
- 接着使用另一个
for
循环遍历threads
列表,对每个线程对象调用join()
方法。join()
方法会阻塞主线程,直到被调用线程执行完毕才会继续执行主线程。 - 这段代码确保所有任务都执行完成后才会继续往下执行,以保证后续操作不会受到未完成任务的影响。
- 接着使用另一个
-
输出完成信息:
- 最后打印一条信息 “本轮已发送完成”,表示所有任务都已经成功执行完毕。
总体来说,这段代码利用多线程的方式提高了任务执行的效率,特别适合需要并发执行多个独立任务的场景,如发送多个通知或处理多个独立的数据处理任务。
定时任务注册
def register_hourly_tasks():
while True:
current_time = datetime.datetime.now().strftime("%H:%M")
if start_time <= current_time <= end_time:
# 如果当前时间在定时任务时间范围内,注册任务
while current_time <= end_time:
schedule.every().day.at(current_time).do(execute_tasks)
print(f"已注册定时任务执行时间:{current_time}")
current_time = (datetime.datetime.strptime(current_time, "%H:%M") + datetime.timedelta(
minutes=interval)).strftime("%H:%M")
else:
break
else:
# 如果当前时间不在定时任务时间范围内,计算距离下一个任务开始的时间
if current_time < start_time:
next_time = start_time
else:
next_time = (datetime.datetime.strptime(current_time, "%H:%M") + datetime.timedelta(
minutes=interval)).strftime("%H:%M")
next_start = datetime.datetime.strptime(next_time, "%H:%M")
current_datetime = datetime.datetime.now()
delta = next_start - current_datetime
hours = delta.seconds // 3600
minutes = (delta.seconds // 60) % 60
seconds = delta.seconds % 60
print(f"当前时间为 {current_datetime.strftime('%H:%M')}, 距离定时任务开始还剩 {hours} 小时 {minutes} 分钟 {seconds} 秒")
time.sleep(printtime)
这段代码定义了一个函数 register_hourly_tasks()
,用于注册和执行定时任务。
-
主循环逻辑:
- 函数通过一个
while True
循环来持续执行任务注册和计时功能。
- 函数通过一个
-
获取当前时间:
- 使用
datetime.datetime.now().strftime("%H:%M")
获取当前时间的小时和分钟,格式化为字符串 “%H:%M”。
- 使用
-
判断当前时间是否在任务时间范围内:
- 如果
current_time
在指定的start_time
和end_time
范围内,则执行以下操作:- 进入内部的
while
循环,该循环会注册多个定时任务,直到current_time
大于end_time
。 - 使用
schedule.every().day.at(current_time).do(execute_tasks)
注册定时任务,指定任务执行的时间为current_time
。 - 打印信息表明已经注册了定时任务的执行时间。
- 更新
current_time
,使其增加interval
分钟后继续注册下一个任务。
- 进入内部的
- 如果
-
处理不在任务时间范围内的情况:
- 如果
current_time
不在指定的时间范围内,则执行以下操作:- 确定下一个任务开始的时间
next_time
,如果current_time
小于start_time
,则下一个任务从start_time
开始;否则,从当前时间加上interval
分钟后开始。 - 计算距离下一个任务开始的时间差
delta
。 - 将时间差
delta
转换成小时、分钟和秒数,并打印出距离下一个任务开始的剩余时间。 - 使用
time.sleep(printtime)
让程序休眠,直到下一个任务开始的时间,以节省系统资源和避免不必要的轮询。
- 确定下一个任务开始的时间
- 如果
总体来说,这段代码通过使用 schedule
模块实现了根据时间注册和执行定时任务,并提供了实时的时间信息反馈和等待功能,使得任务的执行更加自动化和精确。
自动关机
def shutdown_computer():
subprocess.run(["shutdown", "/s", "/t", "1"])
主程序
def start():
#数据校验
checkdata()
#定时任务注册
register_hourly_tasks()
#定时任务执行
while datetime.datetime.now().strftime("%H:%M") <= end_time:
schedule.run_pending()
time.sleep(30)
#清空定时任务
schedule.clear()
#关机
if shutdown_flag:
shutdown_computer()
if __name__ == "__main__":
start()
源码整合
#手机号
phon_num = "155xxxxx"
#定时任务间隔时间(分钟)
interval = 2
#定时任务开始时间
start_time = "12:10"
#定时任务结束时间
end_time = "12:20"
#控制台打印间隔时间(秒)
printtime = 30
#执行完是否自动关机(True)
shutdown_flag = False
def is_eleven_digit_number(element):
return element.isdigit() and len(element) == 11
def is_time_format(element):
pattern = r'^\d{2}:\d{2}$'
return bool(re.match(pattern, element))
def execute_tasks():
threads = [
threading.Thread(target=send_paipai, args=(phon_num,)),
threading.Thread(target=func3, args=(phon_num,)),
threading.Thread(target=send_chuanhang, args=(phon_num,)),
threading.Thread(target=send_wushang, args=(phon_num,)),
threading.Thread(target=send_caiyun, args=(phon_num,)),
threading.Thread(target=send_phpzw, args=(phon_num,)),
threading.Thread(target=func9, args=(phon_num,)),
threading.Thread(target=func1, args=(phon_num,)),
threading.Thread(target=func21, args=(phon_num,)),
threading.Thread(target=send_spump, args=(phon_num,)),
threading.Thread(target=func24, args=(phon_num,)),
threading.Thread(target=func19, args=(phon_num,)),
threading.Thread(target=func10, args=(phon_num,))
]
for t in threads:
t.start()
for t in threads:
t.join()
print("本轮已发送完成")
def register_hourly_tasks():
while True:
current_time = datetime.datetime.now().strftime("%H:%M")
if start_time <= current_time <= end_time:
# 如果当前时间在定时任务时间范围内,注册任务
while current_time <= end_time:
schedule.every().day.at(current_time).do(execute_tasks)
print(f"已注册定时任务执行时间:{current_time}")
current_time = (datetime.datetime.strptime(current_time, "%H:%M") + datetime.timedelta(
minutes=interval)).strftime("%H:%M")
else:
break
else:
# 如果当前时间不在定时任务时间范围内,计算距离下一个任务开始的时间
if current_time < start_time:
next_time = start_time
else:
next_time = (datetime.datetime.strptime(current_time, "%H:%M") + datetime.timedelta(
minutes=interval)).strftime("%H:%M")
next_start = datetime.datetime.strptime(next_time, "%H:%M")
current_datetime = datetime.datetime.now()
delta = next_start - current_datetime
hours = delta.seconds // 3600
minutes = (delta.seconds // 60) % 60
seconds = delta.seconds % 60
print(f"当前时间为 {current_datetime.strftime('%H:%M')}, 距离定时任务开始还剩 {hours} 小时 {minutes} 分钟 {seconds} 秒")
time.sleep(printtime)
def shutdown_computer():
subprocess.run(["shutdown", "/s", "/t", "1"])
def checkdata():
if not is_eleven_digit_number(phon_num):
print(f'{phon_num} :不是一个正确的手机号码,请修改!')
sys.exit(1)
if not is_time_format(start_time):
print(f'{start_time} :不符合时间格式 "HH:MM",请修改!')
sys.exit(1)
if not is_time_format(end_time):
print(f'{end_time} :不符合时间格式 "HH:MM",请修改!')
sys.exit(1)
def start():
#数据校验
checkdata()
#定时任务注册
register_hourly_tasks()
#定时任务执行
while datetime.datetime.now().strftime("%H:%M") <= end_time:
schedule.run_pending()
time.sleep(30)
#清空定时任务
schedule.clear()
#关机
if shutdown_flag:
shutdown_computer()
if __name__ == "__main__":
start()
本篇小结
这里没给出的部分脚本为如下方法:(可私信或留言获取)