Python定时任务
Python任务调度模块 – APScheduler
python调度框架APScheduler使用详解
APScheduler动态增、删、改任务
apscheduler mysql 持久化任务
APScheduler调度框架
在项目中,我们可能遇到有定时任务的需求。
其一:定时执行任务。例如每天早上 8 点定时推送早报。
其二:每隔一个时间段就执行任务。比如:每隔一个小时提醒自己起来走动走动,避免长时间坐着。
1 死循环使用线程睡眠函数sleep()
# -*- coding: utf-8 -*-
from datetime import datetime
import time
def timedTask():
# 每隔5秒打印当前时间
while True:
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
time.sleep(5)
if __name__ == '__main__':
timedTask()
这种方法能够执行固定间隔时间的任务。如果timedTask()函数之后还有些操作,我们还使用死循环 + 阻塞线程。这会使得timedTask()一直占有 CPU 资源,导致后续操作无法执行。
2 threading中的Timer类
Python 标准库 threading 中有个 Timer 类。它会新启动一个线程来执行定时任务,所以它是非阻塞式。
如果你有使用多线程的话,需要关心线程安全问题。那么你可以选使用threading.Timer模块。
2.1 运行一次
# -*- coding: utf-8 -*-
from datetime import datetime
import threading
from threading import Timer
import time
import os
def timedTask():
# 每隔5秒执行任务。
# 第一个参数: 延迟多长时间执行任务(单位: 秒)
# 第二个参数: 要执行的任务, 即函数
# 第三个参数: 调用函数的参数(tuple)
t = Timer(5, task, ())
t.start()
# 待执行任务
def task():
child_id = threading.get_ident()
print("进程id=", os.getpid(), "子线程id=", child_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
if __name__ == '__main__':
parent_id = threading.get_ident()
timedTask()
while True:
print("进程id=", os.getpid(), "主线程id=", parent_id, time.time())
time.sleep(10)
定时任务,启动了一个单独的线程,运行一次。
2.2 循环运行
# -*- coding: utf-8 -*-
from datetime import datetime
import threading
from threading import Timer
import time
import os
def timedTask():
task()
# 每隔5秒执行任务一次。
# 第一个参数: 延迟多长时间执行任务(单位: 秒)
# 第二个参数: 要执行的任务, 即函数
# 第三个参数: 调用函数的参数(tuple)
t = Timer(5, timedTask)
t.start()
# 待执行任务
def task():
child_id = threading.get_ident()
print("进程id=", os.getpid(), "子线程id=", child_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
if __name__ == '__main__':
parent_id = threading.get_ident()
timedTask()
while True:
print("进程id=", os.getpid(), "主线程id=", parent_id, time.time())
time.sleep(10)
循环运行,每次都启动一个新线程。
3 事件调度器sched
使用标准库中sched模块。sched 是事件调度器,它通过 scheduler 类来调度事件,从而达到定时执行任务的效果。
3.1 执行一次
# -*- coding: utf-8 -*-
from datetime import datetime
import sched
import time
import threading
import os
def timedTask():
# 每隔5秒执行一次
# 初始化sched模块的scheduler类
scheduler = sched.scheduler(time.time, time.sleep)
scheduler.enter(5, 1, task) # 增加调度任务
scheduler.run() # 运行任务
# 待执行任务
def task():
child_id = threading.get_ident()
print("进程id=", os.getpid(), "子线程id=", child_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
if __name__ == '__main__':
timedTask()
scheduler 中的每个调度任务只会工作一次,不会无限循环被调用。如果想重复执行同一任务, 需要重复添加调度任务即可。
3.2 循环运行
# -*- coding: utf-8 -*-
from datetime import datetime
import sched
import time
import threading
import os
def timedTask():
task()
# 每隔5秒执行一次
# 初始化sched模块的scheduler类
scheduler = sched.scheduler(time.time, time.sleep)
scheduler.enter(5, 1, timedTask) # 增加调度任务
scheduler.run() # 运行任务
# 待执行任务
def task():
child_id = threading.get_ident()
print("进程id=", os.getpid(), "子线程id=", child_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
if __name__ == '__main__':
timedTask()
不启动新的线程
4 定时任务调度框架APScheduler
4.1 基础组件
APScheduler的全称是Advanced Python Scheduler。它是一个轻量级的 Python 定时任务调度框架。APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期),Linux 下的 Crontab 命令。同时,它还支持异步执行、后台执行调度任务。
APScheduler由四个组件构成:
(1)调度器(scheduler)
它是任务调度器,属于控制器角色。它配置作业存储器和执行器可以在调度器中完成,例如添加、修改和移除作业。
(2)作业存储(job store)
任务持久化仓库,默认保存任务在内存中,也可将任务保存都各种数据库中,任务中的数据序列化后保存到持久化数据库,从数据库加载后又反序列化。
(3)触发器(trigger)
描述调度任务被触发的条件。触发器完全是无状态的。
(4)执行器(executor)
负责处理作业的运行,它们通常通过在作业中提交指定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
4.1.1 schedulers调度器
调度器是将其余部分绑定在一起的工具。通常只有一个调度器(scheduler)在应用程序中运行。应用程序开发者通常不直接处理作业存储(job stores)、执行器(executors)或者触发器(triggers)。相反,调度器提供了适当的接口来处理它们。配置作业存储(job stores)和执行器(executors)是通过调度器(scheduler)来完成的,就像添加、修改和删除 job(作业)一样。
它提供 7 种调度器,能够满足我们各种场景的需要。
(1)BlockingScheduler : 调度器在当前进程的主线程中运行,也就是会阻塞当前线程。
(2)BackgroundScheduler : 调度器在后台线程中运行,不会阻塞当前线程。
(3)AsyncIOScheduler : 结合 asyncio 模块(一个异步框架)一起使用。
(4)GeventScheduler : 程序中使用 gevent(高性能的Python并发框架)作为IO模型,和 GeventExecutor 配合使用。
(5)TornadoScheduler : 程序中使用 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。
(6)TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。
(7)QtScheduler : 你的应用是一个 Qt 应用,需使用QTimer完成定时唤醒。
4.1.2 triggers触发器
触发器包含调度逻辑。每个作业(job)都有自己的触发器,用于确定下一个作业何时运行。除了最初的配置,触发器是完全无状态的。
APScheduler有三种内建的trigger:
(1)date触发器
date是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:
参数 说明
run_date (datetime或str) 作业的运行日期或时间
timezone (datetime.tzinfo或str) 指定时区
示例如下:
# -*- coding: utf-8 -*-
from datetime import datetime
from datetime import date
import time
from apscheduler.schedulers.background import BackgroundScheduler
import threading
import os
def job_task(text):
# 待执行任务
child_id = threading.get_ident()
print("进程id=", os.getpid(), "子线程id=", child_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S"), text)
if __name__ == '__main__':
scheduler = BackgroundScheduler()
# 添加调度任务
# 在 2023-05-18 时刻运行一次 job_func 方法
scheduler.add_job(job_task, 'date', run_date=date(2023, 5, 18), args=['text1'])
# 在 2023-05-18 15:57:00 时刻运行一次 job_func 方法
scheduler.add_job(job_task, 'date', run_date=datetime(2023, 5, 18, 15, 57, 0), args=['text2'])
# 在 2023-05-18 15:57:30 时刻运行一次 job_func 方法
scheduler.add_job(job_task, 'date', run_date='2023-5-18 15:57:30', args=['text3'])
# 启动调度任务
scheduler.start()
parent_id = threading.get_ident()
while True:
print("进程id=", os.getpid(), "主线程id=", parent_id, time.time())
time.sleep(20)
创建新的子线程
(2)interval触发器
固定时间间隔触发。interval 间隔调度,参数如下:
参数 说明
weeks (int) 间隔几周
days (int) 间隔几天
hours (int) 间隔几小时
minutes (int) 间隔几分钟
seconds (int) 间隔多少秒
start_date (datetime或str) 开始日期
end_date (datetime或str) 结束日期
timezone (datetime.tzinfo或str) 时区
示例如下
# -*- coding: utf-8 -*-
from datetime import datetime
import time
from apscheduler.schedulers.background import BackgroundScheduler
import threading
import os
def job_task(text):
# 待执行任务
child_id = threading.get_ident()
print("进程id=", os.getpid(), "子线程id=", child_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S"), text)
if __name__ == '__main__':
scheduler = BackgroundScheduler()
# 添加调度任务
# 每隔两分钟执行一次 job_func 方法
scheduler.add_job(job_task, 'interval', minutes=1, args=['text1'])
# 在start_date ~ end_date之间, 每隔1分钟执行一次 job_func 方法
scheduler.add_job(job_task, 'interval', minutes=1,
start_date='2023-05-18 16:08:01', end_date='2023-05-18 16:10:01', args=['text2'])
# 启动调度任务
scheduler.start()
parent_id = threading.get_ident()
while True:
print("进程id=", os.getpid(), "主线程id=", parent_id, time.time())
time.sleep(20)
(3)cron触发器
在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。 我们先了解 cron 参数:
参数 说明
year (int 或 str) 年,4位数字
month (int 或 str) 月 (范围1-12)
day (int 或 str) 日 (范围1-31
week (int 或 str) 周 (范围1-53)
day_of_week (int或str) 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int 或 str) 时 (范围0-23)
minute (int 或 str) 分 (范围0-59)
second (int 或 str) 秒 (范围0-59)
start_date (datetime 或 str) 最早开始日期(包含)
end_date (datetime 或 str) 最晚结束时间(包含)
timezone (datetime.tzinfo或str) 指定时区
示例如下
# 在每年 1-3、7-9 月份中的每个星期一、二中的 00:00, 01:00, 02:00 和 03:00 执行 job_func 任务
scheduler .add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3')
4.1.3 job stores作业存储
job stores 是存放作业的地方,默认保存在内存中。作业数据序列化后保存至持久性数据库,从持久性数据库加载回来时会反序列化。作业存储(job stores)不将作业数据保存在内存中(默认存储除外),相反,内存只是充当后端存储在保存、加载、更新、查找作业时的中间人角色。作业存储不能在调度器(schedulers) 之间共享。
该组件是对调度任务的管理,添加 job 有两种添加方法:
(1)其中一种上述代码用到的 add_job()。
(2)另一种则是scheduled_job()修饰器来修饰函数。
import datetime
from apscheduler.schedulers.background import BackgroundScheduler
@scheduler.scheduled_job(job_func, 'interval', minutes=2)
def job_func(text):
print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
scheduler = BackgroundScheduler()
scheduler.start()
这个两种办法的区别是:第一种方法返回一个 apscheduler.job.Job 的实例,可以用来改变或者移除 job。第二种方法只适用于应用运行期间不会改变的 job。
一、移除 job 移除 job 也有两种方法:remove_job() 和 job.remove()。 remove_job() 是根据 job 的 id 来移除,所以要在 job 创建的时候指定一个 id。 job.remove() 则是对 job 执行 remove 方法即可
scheduler.add_job(job_func, 'interval', minutes=2, id='job_one')
scheduler.remove_job(job_one)
job = add_job(job_func, 'interval', minutes=2, id='job_one')
job.remvoe()
二、获取 job 列表 通过 scheduler.get_jobs() 方法能够获取当前调度器中的所有 job 的列表
修改 job 如果你因计划改变要对 job 进行修改,可以使用Job.modify() 或者 modify_job()方法来修改 job 的属性。但是值得注意的是,job 的 id 是无法被修改的。
scheduler.add_job(job_func, 'interval', minutes=2, id='job_one')
scheduler.start()
# 将触发时间间隔修改成 5分钟
scheduler.modify_job('job_one', minutes=5)
job = scheduler.add_job(job_func, 'interval', minutes=2)
# 将触发时间间隔修改成 5分钟
job.modify(minutes=5)
三、关闭 job 默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。如果你不想等待,可以将 wait 选项设置为 False。
scheduler.shutdown()
scheduler.shutdown(wait=false)
4.1.4 executors执行器
执行器处理作业的运行。它们通常通过将作业中的指定可调用部分提交给线程或进程池来实现这一点。 当作业完成后,执行器通知调度器,然后调度器发出一个适当的事件。
执行器顾名思义是执行调度任务的模块。最常用的 executor 有两种:ProcessPoolExecutor 和 ThreadPoolExecutor。
4.2 使用步骤
APScheduler 使用起来还算是比较简单。运行一个调度任务只需要以下三步。
(1)新建一个 schedulers (调度器) 。
(2)添加一个调度任务(job stores)。
(3)运行调度任务。
BlockingScheduler: 当调度器是你应用中唯一要运行的东西时使用。
BackgroundScheduler: 当你不运行任何其他框架时使用,并希望调度器在你应用的后台执行。
选择合适的作业存储,你需要决定是否需要作业持久化。如果你总是在应用开始时重建job,你可以直接使用默认的作业存储(MemoryJobStore).但是如果你需要将你的作业持久化,以避免应用崩溃和调度器重启时,你可以根据你的应用环境来选择具体的作业存储。例如:使用Mongo或者SQLAlchemyJobStore (用于支持大多数RDBMS)。
默认的ThreadPoolExecutor 通常用于大多数用途。如果你的工作负载中有较大的CPU密集型操作,你可以考虑用ProcessPoolExecutor来使用更多的CPU核。你也可以在同一时间使用两者,将进程池调度器作为第二执行器。
# -*- coding: utf-8 -*-
from datetime import datetime
import time
from apscheduler.schedulers.background import BackgroundScheduler
import threading
import os
def timedTask():
task()
# 待执行任务
def task():
child_id = threading.get_ident()
print("进程id=", os.getpid(), "子线程id=", child_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
if __name__ == '__main__':
# 创建后台执行的 schedulers
scheduler = BackgroundScheduler()
# 添加调度任务
# 调度方法为 timedTask,触发器选择 interval(间隔性),间隔时长为 1 秒
scheduler.add_job(timedTask, 'interval', seconds=1)
# 启动调度任务
scheduler.start()
parent_id = threading.get_ident()
while True:
print("进程id=", os.getpid(), "主线程id=", parent_id, time.time())
time.sleep(5)
启动不同的线程处理。
4.3 使用job stores【mysql】
from datetime import datetime
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
import time
import threading
import os
def job_task(text):
# 待执行任务
child_id = threading.get_ident()
print("进程id=", os.getpid(), "子线程id=", child_id, datetime.now().strftime("%Y-%m-%d %H:%M:%S"), text)
if __name__ == "__main__":
user = 'root' # 用户名
password = 'bigdata' # 密码
host = '127.0.0.1' # 主机IP
port = '3306' # 主机端口
dbname = 'test' # 数据库名称
# 使用pymysql连接数据库,字符集为UTF8
url = f"mysql+pymysql://{user}:{password}@{host}:{port}/{dbname}?charset=utf8"
job_stores = {
'default': SQLAlchemyJobStore(url=url, tablename='my_tasks') # 定时任务表名为my_tasks
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': True, # 堆积后只执行最后一个
'max_instances': 1, # 最大的实例只能存在一个
}
scheduler = BackgroundScheduler(executors=executors,
job_defaults=job_defaults,
jobstores=job_stores,
timezone='Asia/Shanghai')
scheduler.start()
while True:
time.sleep(10)
scheduler.add_job(job_task, 'interval', args=["test5"], id="task5", seconds=2, replace_existing=True)
scheduler.add_job(job_task, 'interval', args=["test6"], id="task6", seconds=2, replace_existing=True)
4.4 增加修改移除任务
task_list = scheduler.get_jobs() # 获取最新的任务列表
task_id_list = [task.id for task in task_list] # 获取最新的任务id列表
if flag == "add":
# 增加任务
if task_id not in task_id_list:
scheduler.add_job(job_task, 'interval',
seconds=int(data_dict["Interval"]),
id=task_id,
start_date=data_dict["effect_start_time"],
end_date=data_dict["effect_end_time"],
args=[task_id])
task_id_list.append(task_id) # 更新任务列表
print("add", task_id)
if flag == "delete":
# 移除任务
if task_id in task_id_list:
scheduler.remove_job(job_id=task_id)
task_id_list.remove(task_id) # 更新任务列表
print("delete",task_id)
if flag == "update":
# 修改任务
if task_id in task_id_list:
scheduler_trigger = scheduler._create_trigger(
trigger="interval", # 指定新的执行任务方式,这里还是用的时间间隔
trigger_args={"seconds": int(data_dict["Interval"])} # 多少秒执行一次
)
scheduler.modify_job(job_id=task_id, trigger=scheduler_trigger)
print("update",task_id)