文章目录
- 定时任务库对比
- 简介
- 与其余框架的区别
- 安装
- 初试
- 调度器基础
- 测试方法
- 字符串格式
- 具体时间间隔
- 周期
- 某时间段
- 条件 API
- 条件逻辑
- 方法对比
- 执行选项
- 在主进程和线程中执行
- 进程
- 线程
- 异步
- 设置默认选项
- 日志
- 流水线
- 在一个任务后执行
- 输入作为输出
- 会话级参数
- 函数参数
- TODO:元参数
- 自定义条件
- 元任务
- 遇到的坑
- 参考文献
定时任务库对比
推荐阅读 Python timing task - schedule vs. Celery vs. APScheduler
库 | 大小 | 优点 | 缺点 | 适用场景 |
---|---|---|---|---|
Schedule | 轻量级 | 易用无配置 | 不能动态添加任务或持久化任务 | 简单任务 |
Celery | 重量级 | ①任务队列 ②分布式 | ①不能动态添加定时任务到系统中,如Flask(Django可以) ②设置起来较累赘 | 任务队列 |
APScheduler | 相对重量级 | ①灵活,可动态增删定时任务并持久化 ②支持多种存储后端 ③集成框架多,用户广 | 重量级,学习成本大 | 通用 |
Rocketry | 轻量级 | 易用功能强 | 尚未成熟,文档不清晰 | 通用 |
简介
Rocketry 是 Python 的任务调度框架,易用、简洁、强大。可通过 Python 装饰器语法进行任务调度,支持定时、并发(异步、多线程、多进程)、条件触发等。
感觉没有 Celery 和 APScheduler 成熟
与其余框架的区别
常见任务调度框架有:
- Crontab
- APScheduler
- Airflow
Rocketry 的调度程序基于语句,有相同的调度策略,也可以使用自定义调度语句进行扩展。
此外,Rocketry 非常易用,无需复杂配置,但可用于大型应用程序。
安装
pip install rocketry
初试
import datetime
from rocketry import Rocketry
app = Rocketry()
@app.task('every 5 seconds')
def do_things():
print(datetime.datetime.now())
if __name__ == "__main__":
app.run()
调度器基础
创建调度器规则的方式,可通过与、或、非组合,还可用于任务的开始、结束、终止:
- 字符串格式
- 条件 API
- 条件类
测试方法
判断当前时间是否在 10:00 到 14:00 之间
from rocketry.conds import time_of_day
condition = time_of_day.between('10:00', '14:00')
print(condition.observe())
字符串格式
简单易写,但静态代码分析器无法检查语句是否正确
具体时间间隔
import sys
from rocketry import Rocketry
app = Rocketry()
@app.task('every 10 seconds')
def do_constantly():
"""每10秒执行"""
print(sys._getframe().f_code.co_name)
@app.task('every 1 minute')
def do_minutely():
"""每1分钟执行"""
print(sys._getframe().f_code.co_name)
@app.task('every 1 hour')
def do_hourly():
"""每1小时执行"""
print(sys._getframe().f_code.co_name)
@app.task('every 1 day')
def do_daily():
"""每1天执行"""
print(sys._getframe().f_code.co_name)
@app.task('every 2 days 2 hours 20 seconds')
def do_custom():
"""每2天2小时20秒执行"""
print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
app.run()
周期
import sys
from rocketry import Rocketry
app = Rocketry()
@app.task('secondly')
def do_secondly():
"""每1秒钟执行"""
print(sys._getframe().f_code.co_name)
@app.task('minutely')
def do_minutely():
"""每1分钟执行"""
print(sys._getframe().f_code.co_name)
@app.task('hourly')
def do_hourly():
"""每1小时执行"""
print(sys._getframe().f_code.co_name)
@app.task('daily')
def do_daily():
"""每1天执行"""
print(sys._getframe().f_code.co_name)
@app.task('weekly')
def do_weekly():
"""每1周执行"""
print(sys._getframe().f_code.co_name)
@app.task('monthly')
def do_monthly():
"""每1个月执行"""
print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
app.run()
某时间段
before
:之前after
:之后between
:之间starting
:开始
import sys
from rocketry import Rocketry
app = Rocketry()
@app.task('minutely before 45')
def do_minutely():
"""每分钟的45秒前执行"""
print(sys._getframe().f_code.co_name)
@app.task('hourly after 45:00')
def do_hourly():
"""每小时的45分后执行"""
print(sys._getframe().f_code.co_name)
@app.task('daily between 08:00 and 14:00')
def do_daily():
"""每天的08:00到14:00这段时间内执行"""
print(sys._getframe().f_code.co_name)
@app.task('weekly on Monday')
def do_weekly():
"""每周的周一执行"""
print(sys._getframe().f_code.co_name)
@app.task('monthly starting 3rd')
def do_monthly():
"""每个月的3号开始执行"""
print(sys._getframe().f_code.co_name)
@app.task('time of day between 10:00 and 18:00')
def do_constantly_during_day():
"""每天的10:00到18:00这段时间内执行"""
print(sys._getframe().f_code.co_name)
@app.task('time of week between Saturday and Sunday')
def do_constantly_during_weekend():
"""每周的周六到周日这段时间内执行"""
print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
app.run()
条件 API
import sys
from rocketry import Rocketry
from rocketry.conds import every, hourly, daily, after_success, true, false
app = Rocketry()
@app.task(every('10 seconds'))
def do_constantly():
"""每10秒执行"""
print(sys._getframe().f_code.co_name)
@app.task(hourly)
def do_hourly():
"""每1小时执行"""
print(sys._getframe().f_code.co_name)
@app.task(daily.between('08:00', '14:00'))
def do_daily():
"""每天08:00到14:00执行一次"""
print(sys._getframe().f_code.co_name)
@app.task(after_success(do_daily))
def do_after():
"""do_daily成功后执行"""
print(sys._getframe().f_code.co_name)
@app.task(true & false & ~(true | false))
def do_logic():
"""逻辑执行"""
print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
app.run()
条件逻辑
&
:与|
:或~
:非
import sys
from rocketry import Rocketry
from rocketry.conds import true, false
app = Rocketry()
@app.task(true)
def do_constantly():
print(sys._getframe().f_code.co_name)
@app.task(false)
def do_never():
print(sys._getframe().f_code.co_name)
@app.task(true & false)
def do_and():
"""与"""
print(sys._getframe().f_code.co_name)
@app.task(true | false)
def do_or():
"""或"""
print(sys._getframe().f_code.co_name)
@app.task(~false)
def do_not():
"""非"""
print(sys._getframe().f_code.co_name)
@app.task((true | false) & ~(true | false))
def do_nested():
print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
app.run()
方法对比
执行选项
main
:在主进程和线程中执行(默认)process
:在单独进程中执行thread
:在单独线程中执行async
:异步执行
执行选项 | 是否并发 | 能否被终止 | 能否修改 session |
---|---|---|---|
main | ✖ | ✖ | ✔ |
process | ✔ | ✔ | ✖ |
thread | 部分 | ✔ | ✔ |
async | 部分 | ✔ | ✔ |
threading.current_thread()
:获取当前线程
os.getpid()
:获取当前进程 ID
在主进程和线程中执行
import os
import sys
import threading
from rocketry import Rocketry
app = Rocketry()
@app.task('secondly', execution='main')
def do_main():
"""在主进程和线程中执行"""
print(sys._getframe().f_code.co_name, threading.current_thread(), os.getpid())
if __name__ == '__main__':
app.run()
# do_main <_MainThread(MainThread, started 15676)> 23448
# do_main <_MainThread(MainThread, started 15676)> 23448
# do_main <_MainThread(MainThread, started 15676)> 23448
进程
import os
import sys
import threading
from rocketry import Rocketry
app = Rocketry()
@app.task('secondly', execution='process')
def do_process():
"""在单独进程中执行"""
print(sys._getframe().f_code.co_name, threading.current_thread(), os.getpid())
if __name__ == '__main__':
app.run()
# do_process <_MainThread(MainThread, started 20364)> 14612
# do_process <_MainThread(MainThread, started 25636)> 25996
# do_process <_MainThread(MainThread, started 27000)> 18504
线程
import os
import sys
import threading
from rocketry import Rocketry
app = Rocketry()
@app.task('secondly', execution='thread')
def do_thread():
""""""
print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid())
if __name__ == '__main__':
app.run()
# do_thread <Thread(Thread-1, started 28064)> 26768
# do_thread <Thread(Thread-2, started 3916)> 26768
# do_thread <Thread(Thread-3, started 17328)> 26768
异步
import os
import sys
import asyncio
import threading
from rocketry import Rocketry
app = Rocketry()
@app.task('secondly', execution='async')
async def do_async():
"""异步执行"""
print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid())
async def main():
rocketry_task = asyncio.create_task(app.serve())
await rocketry_task
if __name__ == '__main__':
asyncio.run(main())
# do_async <_MainThread(MainThread, started 22752)> 24976
# do_async <_MainThread(MainThread, started 22752)> 24976
# do_async <_MainThread(MainThread, started 22752)> 24976
设置默认选项
import os
import sys
import threading
from rocketry import Rocketry
app = Rocketry(config={'task_execution': 'thread'})
@app.task('secondly')
def do_thread():
print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid())
if __name__ == '__main__':
app.run()
# do_thread <Thread(Thread-1, started 28064)> 26768
# do_thread <Thread(Thread-2, started 3916)> 26768
# do_thread <Thread(Thread-3, started 17328)> 26768
日志
内置日志格式有:
rocketry.log.MinimalRecord
: 最简略的日志rocketry.log.LogRecord
: 经典的日志元素rocketry.log.TaskLogRecord
: 类似LogRecord
,同时包含开始、结束、运行次数
import os
import datetime
from rocketry import Rocketry
from redbird.repos import CSVFileRepo
from rocketry.log import MinimalRecord, LogRecord, TaskLogRecord
filename = 'logs.csv'
if os.path.exists(filename):
os.remove(filename)
app = Rocketry(logger_repo=CSVFileRepo(filename=filename, model=MinimalRecord))
@app.task('secondly')
def do_things():
print(datetime.datetime.now())
if __name__ == '__main__':
app.run()
流水线
- 在一个任务执行后、成功后、失败后,执行任务
- 将一个任务的输出作为另一个任务的输入
在一个任务后执行
after_success
:成功后after_fail
:失败后after_finish
:完成后
import sys
import random
from rocketry import Rocketry
from rocketry.conds import after_success, after_fail, after_finish
app = Rocketry(execution='main')
@app.task('every 3 seconds')
def do_things():
if random.randint(0, 10) % 2 == 0:
print(sys._getframe().f_code.co_name, '\tfail!')
raise Exception
print(sys._getframe().f_code.co_name, '\tsuccess!')
@app.task(after_success(do_things))
def do_after_success():
"""成功后执行"""
print(sys._getframe().f_code.co_name)
@app.task(after_fail(do_things))
def do_after_fail():
"""失败后执行"""
print(sys._getframe().f_code.co_name)
@app.task(after_finish(do_things))
def do_after_fail_or_success():
"""完成后执行"""
print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
app.run()
输入作为输出
import sys
from rocketry import Rocketry
from rocketry.args import Return
from rocketry.conds import after_success
app = Rocketry(execution='main')
@app.task('every 3 seconds')
def do_first():
print(sys._getframe().f_code.co_name)
return 'Hello World'
@app.task(after_success(do_first))
def do_second(arg=Return(do_first)):
print(sys._getframe().f_code.co_name, arg)
if __name__ == '__main__':
app.run()
会话级参数
有两种参数:
- 任务级别
- 会话级别(多数情况下使用)
Arg
:SimpleArg
:只传递值FuncArg
:会话级函数实参
import sys
from rocketry import Rocketry
from rocketry.args import Arg, SimpleArg
app = Rocketry(execution='main')
app.params(
my_arg='Hello world'
)
@app.task('every 3 seconds')
def do_arg(item=Arg('my_arg')):
print(sys._getframe().f_code.co_name, item)
@app.task('every 3 seconds')
def do_simple_arg(item=SimpleArg('Hello world')):
print(sys._getframe().f_code.co_name, item)
if __name__ == '__main__':
app.run()
函数参数
会话级别
import sys
import datetime
from rocketry import Rocketry
from rocketry.args import Arg
app = Rocketry(execution='main')
@app.param('my_arg')
def get_item():
return datetime.datetime.now()
@app.task('every 3 seconds')
def do_func_arg(item=Arg('my_arg')):
print(sys._getframe().f_code.co_name, item)
if __name__ == '__main__':
app.run()
任务级别
import sys
import datetime
from rocketry import Rocketry
from rocketry.args import FuncArg
app = Rocketry(execution='main')
def get_item():
return datetime.datetime.now()
@app.task('every 3 seconds')
def do_func_arg(item=FuncArg(get_item)):
print(sys._getframe().f_code.co_name, item)
if __name__ == '__main__':
app.run()
TODO:元参数
元参数包含调度系统组件的参数,用于任务操作会话,可关闭调度器或添加、删除任务等
会话参数
from rocketry import Rocketry
from rocketry.args import Session
app = Rocketry(execution='main')
@app.task('every 3 seconds')
def manipulate_session(session=Session()):
print(session)
if __name__ == '__main__':
app.run()
任务参数
from rocketry import Rocketry
from rocketry.args import Task
app = Rocketry(execution='main')
@app.task()
def do_things():
...
@app.task('every 3 seconds')
def manipulate_task(this_task=Task(), another_task=Task('do_things')):
print(this_task)
print(another_task)
if __name__ == '__main__':
app.run()
自定义条件
import sys
from rocketry import Rocketry
from rocketry.conds import daily
app = Rocketry(execution='main')
@app.cond()
def things_ready():
return True or False
@app.task(daily & things_ready)
def do_things():
print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
app.run()
传参,判断文件是否存在
import sys
from pathlib import Path
from rocketry import Rocketry
from rocketry.conds import daily
app = Rocketry(execution='main')
@app.cond()
def file_exists(file):
return Path(file).is_file()
@app.task(daily & file_exists(__file__))
def do_things():
print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
app.run()
传参,判断任务名
import sys
from pathlib import Path
from rocketry import Rocketry
from rocketry.args import Task
from rocketry.conds import daily
app = Rocketry(execution='main')
@app.cond()
def is_right_task(this_task=Task()):
return this_task.name.startswith('do_')
@app.task(daily & is_right_task)
def do_things():
print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
app.run()
元任务
可以在运行时:
- 终止调度器
- 重启调度器
- 强制任务运行
- 禁用任务
- 创建、更新、删除任务
遇到的坑
FutureWarning: Default execution will be changed to ‘async’. To suppress this warning, specify task_execution, ie. Rocketry(execution=‘async’)
实例化 Rocketry 对象时指定 execution,如
from rocketry import Rocketry
app = Rocketry(execution='main')
参考文献
- Rocketry Documentation
- Rocketry GitHub