Python任务调度框架Rocketry

news2025/1/19 20:41:10

文章目录

  • 定时任务库对比
  • 简介
  • 与其余框架的区别
  • 安装
  • 初试
  • 调度器基础
    • 测试方法
    • 字符串格式
      • 具体时间间隔
      • 周期
      • 某时间段
    • 条件 API
    • 条件逻辑
    • 方法对比
  • 执行选项
    • 在主进程和线程中执行
    • 进程
    • 线程
    • 异步
    • 设置默认选项
  • 日志
  • 流水线
    • 在一个任务后执行
    • 输入作为输出
    • 会话级参数
      • 函数参数
      • TODO:元参数
    • 自定义条件
  • 元任务
  • 遇到的坑
  • 参考文献

定时任务库对比

推荐阅读 Python timing task - schedule vs. Celery vs. APScheduler

大小优点缺点适用场景
Schedule轻量级易用无配置不能动态添加任务或持久化任务简单任务
Celery重量级①任务队列
②分布式
①不能动态添加定时任务到系统中,如Flask(Django可以)
②设置起来较累赘
任务队列
APScheduler相对重量级①灵活,可动态增删定时任务并持久化
②支持多种存储后端
③集成框架多,用户广
重量级,学习成本大通用
Rocketry轻量级易用功能强尚未成熟,文档不清晰通用




简介

Rocketry 是 Python 的任务调度框架,易用、简洁、强大。可通过 Python 装饰器语法进行任务调度,支持定时、并发(异步、多线程、多进程)、条件触发等。

感觉没有 Celery 和 APScheduler 成熟




与其余框架的区别

常见任务调度框架有:

  • Crontab
  • APScheduler
  • Airflow

Rocketry 的调度程序基于语句,有相同的调度策略,也可以使用自定义调度语句进行扩展。

此外,Rocketry 非常易用,无需复杂配置,但可用于大型应用程序。




安装

pip install rocketry




初试

import datetime

from rocketry import Rocketry

app = Rocketry()


@app.task('every 5 seconds')
def do_things():
    print(datetime.datetime.now())


if __name__ == "__main__":
    app.run()




调度器基础

创建调度器规则的方式,可通过与、或、非组合,还可用于任务的开始、结束、终止

  • 字符串格式
  • 条件 API
  • 条件类



测试方法

判断当前时间是否在 10:00 到 14:00 之间

from rocketry.conds import time_of_day

condition = time_of_day.between('10:00', '14:00')
print(condition.observe())



字符串格式

简单易写,但静态代码分析器无法检查语句是否正确


具体时间间隔

import sys

from rocketry import Rocketry

app = Rocketry()


@app.task('every 10 seconds')
def do_constantly():
    """每10秒执行"""
    print(sys._getframe().f_code.co_name)


@app.task('every 1 minute')
def do_minutely():
    """每1分钟执行"""
    print(sys._getframe().f_code.co_name)


@app.task('every 1 hour')
def do_hourly():
    """每1小时执行"""
    print(sys._getframe().f_code.co_name)


@app.task('every 1 day')
def do_daily():
    """每1天执行"""
    print(sys._getframe().f_code.co_name)


@app.task('every 2 days 2 hours 20 seconds')
def do_custom():
    """每2天2小时20秒执行"""
    print(sys._getframe().f_code.co_name)


if __name__ == '__main__':
    app.run()



周期

import sys

from rocketry import Rocketry

app = Rocketry()


@app.task('secondly')
def do_secondly():
    """每1秒钟执行"""
    print(sys._getframe().f_code.co_name)


@app.task('minutely')
def do_minutely():
    """每1分钟执行"""
    print(sys._getframe().f_code.co_name)


@app.task('hourly')
def do_hourly():
    """每1小时执行"""
    print(sys._getframe().f_code.co_name)


@app.task('daily')
def do_daily():
    """每1天执行"""
    print(sys._getframe().f_code.co_name)


@app.task('weekly')
def do_weekly():
    """每1周执行"""
    print(sys._getframe().f_code.co_name)


@app.task('monthly')
def do_monthly():
    """每1个月执行"""
    print(sys._getframe().f_code.co_name)


if __name__ == '__main__':
    app.run()



某时间段

  • before:之前
  • after:之后
  • between:之间
  • starting:开始
import sys

from rocketry import Rocketry

app = Rocketry()


@app.task('minutely before 45')
def do_minutely():
    """每分钟的45秒前执行"""
    print(sys._getframe().f_code.co_name)


@app.task('hourly after 45:00')
def do_hourly():
    """每小时的45分后执行"""
    print(sys._getframe().f_code.co_name)


@app.task('daily between 08:00 and 14:00')
def do_daily():
    """每天的08:00到14:00这段时间内执行"""
    print(sys._getframe().f_code.co_name)


@app.task('weekly on Monday')
def do_weekly():
    """每周的周一执行"""
    print(sys._getframe().f_code.co_name)


@app.task('monthly starting 3rd')
def do_monthly():
    """每个月的3号开始执行"""
    print(sys._getframe().f_code.co_name)


@app.task('time of day between 10:00 and 18:00')
def do_constantly_during_day():
    """每天的10:00到18:00这段时间内执行"""
    print(sys._getframe().f_code.co_name)


@app.task('time of week between Saturday and Sunday')
def do_constantly_during_weekend():
    """每周的周六到周日这段时间内执行"""
    print(sys._getframe().f_code.co_name)


if __name__ == '__main__':
    app.run()



条件 API

import sys

from rocketry import Rocketry
from rocketry.conds import every, hourly, daily, after_success, true, false

app = Rocketry()


@app.task(every('10 seconds'))
def do_constantly():
    """每10秒执行"""
    print(sys._getframe().f_code.co_name)


@app.task(hourly)
def do_hourly():
    """每1小时执行"""
    print(sys._getframe().f_code.co_name)


@app.task(daily.between('08:00', '14:00'))
def do_daily():
    """每天08:00到14:00执行一次"""
    print(sys._getframe().f_code.co_name)


@app.task(after_success(do_daily))
def do_after():
    """do_daily成功后执行"""
    print(sys._getframe().f_code.co_name)


@app.task(true & false & ~(true | false))
def do_logic():
    """逻辑执行"""
    print(sys._getframe().f_code.co_name)


if __name__ == '__main__':
    app.run()



条件逻辑

  • &:与
  • |:或
  • ~:非
import sys

from rocketry import Rocketry
from rocketry.conds import true, false

app = Rocketry()


@app.task(true)
def do_constantly():
    print(sys._getframe().f_code.co_name)


@app.task(false)
def do_never():
    print(sys._getframe().f_code.co_name)


@app.task(true & false)
def do_and():
    """与"""
    print(sys._getframe().f_code.co_name)


@app.task(true | false)
def do_or():
    """或"""
    print(sys._getframe().f_code.co_name)


@app.task(~false)
def do_not():
    """非"""
    print(sys._getframe().f_code.co_name)


@app.task((true | false) & ~(true | false))
def do_nested():
    print(sys._getframe().f_code.co_name)


if __name__ == '__main__':
    app.run()



方法对比




执行选项

  • main:在主进程和线程中执行(默认)
  • process:在单独进程中执行
  • thread:在单独线程中执行
  • async:异步执行
执行选项是否并发能否被终止能否修改 session
main
process
thread部分
async部分

threading.current_thread():获取当前线程

os.getpid():获取当前进程 ID



在主进程和线程中执行

import os
import sys
import threading

from rocketry import Rocketry

app = Rocketry()


@app.task('secondly', execution='main')
def do_main():
    """在主进程和线程中执行"""
    print(sys._getframe().f_code.co_name, threading.current_thread(), os.getpid())


if __name__ == '__main__':
    app.run()
    # do_main <_MainThread(MainThread, started 15676)> 23448
    # do_main <_MainThread(MainThread, started 15676)> 23448
    # do_main <_MainThread(MainThread, started 15676)> 23448



进程

import os
import sys
import threading

from rocketry import Rocketry

app = Rocketry()


@app.task('secondly', execution='process')
def do_process():
    """在单独进程中执行"""
    print(sys._getframe().f_code.co_name, threading.current_thread(), os.getpid())


if __name__ == '__main__':
    app.run()
    # do_process <_MainThread(MainThread, started 20364)> 14612
    # do_process <_MainThread(MainThread, started 25636)> 25996
    # do_process <_MainThread(MainThread, started 27000)> 18504



线程

import os
import sys
import threading

from rocketry import Rocketry

app = Rocketry()


@app.task('secondly', execution='thread')
def do_thread():
    """"""
    print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid())


if __name__ == '__main__':
    app.run()
    # do_thread <Thread(Thread-1, started 28064)> 26768
    # do_thread <Thread(Thread-2, started 3916)> 26768
    # do_thread <Thread(Thread-3, started 17328)> 26768



异步

import os
import sys
import asyncio
import threading

from rocketry import Rocketry

app = Rocketry()


@app.task('secondly', execution='async')
async def do_async():
    """异步执行"""
    print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid())


async def main():
    rocketry_task = asyncio.create_task(app.serve())
    await rocketry_task


if __name__ == '__main__':
    asyncio.run(main())
    # do_async <_MainThread(MainThread, started 22752)> 24976
    # do_async <_MainThread(MainThread, started 22752)> 24976
    # do_async <_MainThread(MainThread, started 22752)> 24976



设置默认选项

import os
import sys
import threading

from rocketry import Rocketry

app = Rocketry(config={'task_execution': 'thread'})


@app.task('secondly')
def do_thread():
    print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid())


if __name__ == '__main__':
    app.run()
    # do_thread <Thread(Thread-1, started 28064)> 26768
    # do_thread <Thread(Thread-2, started 3916)> 26768
    # do_thread <Thread(Thread-3, started 17328)> 26768




日志

内置日志格式有:

  • rocketry.log.MinimalRecord: 最简略的日志
  • rocketry.log.LogRecord: 经典的日志元素
  • rocketry.log.TaskLogRecord: 类似 LogRecord,同时包含开始、结束、运行次数
import os
import datetime

from rocketry import Rocketry
from redbird.repos import CSVFileRepo
from rocketry.log import MinimalRecord, LogRecord, TaskLogRecord

filename = 'logs.csv'
if os.path.exists(filename):
    os.remove(filename)

app = Rocketry(logger_repo=CSVFileRepo(filename=filename, model=MinimalRecord))


@app.task('secondly')
def do_things():
    print(datetime.datetime.now())


if __name__ == '__main__':
    app.run()




流水线

  • 在一个任务执行后、成功后、失败后,执行任务
  • 将一个任务的输出作为另一个任务的输入



在一个任务后执行

  • after_success:成功后
  • after_fail:失败后
  • after_finish:完成后
import sys
import random

from rocketry import Rocketry
from rocketry.conds import after_success, after_fail, after_finish

app = Rocketry(execution='main')


@app.task('every 3 seconds')
def do_things():
    if random.randint(0, 10) % 2 == 0:
        print(sys._getframe().f_code.co_name, '\tfail!')
        raise Exception
    print(sys._getframe().f_code.co_name, '\tsuccess!')


@app.task(after_success(do_things))
def do_after_success():
    """成功后执行"""
    print(sys._getframe().f_code.co_name)


@app.task(after_fail(do_things))
def do_after_fail():
    """失败后执行"""
    print(sys._getframe().f_code.co_name)


@app.task(after_finish(do_things))
def do_after_fail_or_success():
    """完成后执行"""
    print(sys._getframe().f_code.co_name)


if __name__ == '__main__':
    app.run()



输入作为输出

import sys

from rocketry import Rocketry
from rocketry.args import Return
from rocketry.conds import after_success

app = Rocketry(execution='main')


@app.task('every 3 seconds')
def do_first():
    print(sys._getframe().f_code.co_name)
    return 'Hello World'


@app.task(after_success(do_first))
def do_second(arg=Return(do_first)):
    print(sys._getframe().f_code.co_name, arg)


if __name__ == '__main__':
    app.run()



会话级参数

有两种参数:

  • 任务级别
  • 会话级别(多数情况下使用)
    • Arg
    • SimpleArg:只传递值
    • FuncArg:会话级函数实参
import sys

from rocketry import Rocketry
from rocketry.args import Arg, SimpleArg

app = Rocketry(execution='main')
app.params(
    my_arg='Hello world'
)


@app.task('every 3 seconds')
def do_arg(item=Arg('my_arg')):
    print(sys._getframe().f_code.co_name, item)


@app.task('every 3 seconds')
def do_simple_arg(item=SimpleArg('Hello world')):
    print(sys._getframe().f_code.co_name, item)


if __name__ == '__main__':
    app.run()



函数参数

会话级别

import sys
import datetime

from rocketry import Rocketry
from rocketry.args import Arg

app = Rocketry(execution='main')


@app.param('my_arg')
def get_item():
    return datetime.datetime.now()


@app.task('every 3 seconds')
def do_func_arg(item=Arg('my_arg')):
    print(sys._getframe().f_code.co_name, item)


if __name__ == '__main__':
    app.run()

任务级别

import sys
import datetime

from rocketry import Rocketry
from rocketry.args import FuncArg

app = Rocketry(execution='main')


def get_item():
    return datetime.datetime.now()


@app.task('every 3 seconds')
def do_func_arg(item=FuncArg(get_item)):
    print(sys._getframe().f_code.co_name, item)


if __name__ == '__main__':
    app.run()



TODO:元参数

元参数包含调度系统组件的参数,用于任务操作会话,可关闭调度器或添加、删除任务等

会话参数

from rocketry import Rocketry
from rocketry.args import Session

app = Rocketry(execution='main')


@app.task('every 3 seconds')
def manipulate_session(session=Session()):
    print(session)


if __name__ == '__main__':
    app.run()

任务参数

from rocketry import Rocketry
from rocketry.args import Task

app = Rocketry(execution='main')


@app.task()
def do_things():
    ...


@app.task('every 3 seconds')
def manipulate_task(this_task=Task(), another_task=Task('do_things')):
    print(this_task)
    print(another_task)


if __name__ == '__main__':
    app.run()



自定义条件

import sys

from rocketry import Rocketry
from rocketry.conds import daily

app = Rocketry(execution='main')


@app.cond()
def things_ready():
    return True or False


@app.task(daily & things_ready)
def do_things():
    print(sys._getframe().f_code.co_name)


if __name__ == '__main__':
    app.run()

传参,判断文件是否存在

import sys
from pathlib import Path

from rocketry import Rocketry
from rocketry.conds import daily

app = Rocketry(execution='main')


@app.cond()
def file_exists(file):
    return Path(file).is_file()


@app.task(daily & file_exists(__file__))
def do_things():
    print(sys._getframe().f_code.co_name)


if __name__ == '__main__':
    app.run()

传参,判断任务名

import sys
from pathlib import Path

from rocketry import Rocketry
from rocketry.args import Task
from rocketry.conds import daily

app = Rocketry(execution='main')


@app.cond()
def is_right_task(this_task=Task()):
    return this_task.name.startswith('do_')


@app.task(daily & is_right_task)
def do_things():
    print(sys._getframe().f_code.co_name)


if __name__ == '__main__':
    app.run()




元任务

可以在运行时:

  • 终止调度器
  • 重启调度器
  • 强制任务运行
  • 禁用任务
  • 创建、更新、删除任务




遇到的坑

FutureWarning: Default execution will be changed to ‘async’. To suppress this warning, specify task_execution, ie. Rocketry(execution=‘async’)

实例化 Rocketry 对象时指定 execution,如

from rocketry import Rocketry

app = Rocketry(execution='main')




参考文献

  1. Rocketry Documentation
  2. Rocketry GitHub

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/47878.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

为什么说继承是把双刃剑

为什么说继承是把双刃剑 继承其实是把双刃剑&#xff1a;一方面继承是非常强大的&#xff1b;另一方面继承的破坏力也是很强的。 继承广泛应用于各种Java API、框架和类库之中&#xff0c;一方面它们内部大量使用继承&#xff0c;另一方面它们设计了良好的框架结构&#xff0c…

20221130 RabbitMQ

MQ MQ&#xff08;Message Queue&#xff09;消息队列&#xff0c;是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦&#xff0c;异步消息&#xff0c;流量削峰等问题&#xff0c;实现高性能&#xff0c;高可用&#xff0c;可伸缩和最终一致性架构。 主要的…

【正点原子FPGA连载】第二十三章 DDS信号发生器实验摘自【正点原子】DFZU2EG/4EV MPSoC 之FPGA开发指南V1.0

1&#xff09;实验平台&#xff1a;正点原子MPSoC开发板 2&#xff09;平台购买地址&#xff1a;https://detail.tmall.com/item.htm?id692450874670 3&#xff09;全套实验源码手册视频下载地址&#xff1a; http://www.openedv.com/thread-340252-1-1.html 第二十三章 DDS信…

web网页设计期末课程大作业__公司官网 (曼谷科技 1页)带psd文件

⛵ 源码获取 文末联系 ✈ Web前端开发技术 描述 网页设计题材&#xff0c;DIVCSS 布局制作,HTMLCSS网页设计期末课程大作业 | 家公司官网网站 | 企业官网 | 酒店官网 | 等网站的设计与制 | HTML期末大学生网页设计作业&#xff0c;Web大学生网页 HTML&#xff1a;结构 CSS&…

基于CCE Kubernetes网络与持久化存储实战

✅作者简介&#xff1a; CSDN内容合伙人&#xff0c;全栈领域新星创作者&#xff0c;阿里云专家博主&#xff0c;阿里云问答板块版主&#xff0c;华为云享专家博主&#xff0c;掘金后端评审团成员 &#x1f495;前言&#xff1a; 最近云原生领域热火朝天&#xff0c;那么云原生…

2022年NPDP新版教材知识集锦--【第四章节】(4)

【初始设计与规格阶段】(全部获取文末) 产品设计规格旨在&#xff1a; 明确产品设计并提供量化和客观性; 将产品设计要求传达给设计团队的其他成员; 推进产品从设计到制造的开发。 1、功能性设计(DesignforFunctionality,DFF) 功能设计决定了产品的最终性能&#xff0c;功…

小猴吃苹果-第12届蓝桥杯Scratch选拔赛真题精选

[导读]&#xff1a;超平老师计划推出Scratch蓝桥杯真题解析100讲&#xff0c;这是超平老师解读Scratch蓝桥真题系列的第90讲。 蓝桥杯选拔赛每一届都要举行4~5次&#xff0c;和省赛、国赛相比&#xff0c;题目要简单不少&#xff0c;再加上篇幅有限&#xff0c;因此我精挑细选…

JCTC:基于PWmat中的混合溶剂模型精确计算离子溶解自由能

溶液环境中溶质离子或中间体的自由能计算是电化学研究中最棘手的问题之一。目前单纯的实验手段并不能对发生在溶液中的化学反应过程/机理进行直接探测&#xff0c;许多信息仍主要依赖于理论模拟。对于这一问题&#xff0c;目前很多研究者采用经验势场的溶剂模型方法&#xff0c…

Casein-PEG-Rhodamine B 络蛋白-聚乙二醇-罗丹明B Casein-RB

产品名称&#xff1a;络蛋白-聚乙二醇-罗丹明B 英文名称&#xff1a;Casein-PEG-Rhodamine B 质量控制&#xff1a;95% 原料分散系数PDI&#xff1a;≤1.05 存储条件&#xff1a;-20C&#xff0c;避光&#xff0c;避湿 用 途&#xff1a;仅供科研实验使用&#xff0c;不用于诊…

SAP 电商云 Spartacus UI Configurable Product 的页面设置

关键字 CPQ&#xff0c;Product Configuration&#xff0c;Product Configure&#xff0c;Product Variant 变体是在某些方面彼此不同但基于相同基本模型的产品。 变体的一个示例是 T 恤的颜色和尺寸。 在 Spartacus 中启用变体功能&#xff0c;并在 SAP Commerce Cloud 中配…

基于python的pulp库使用,从基础模型到复杂模型,从一维变量到二维变量

写在前面 学习笔记&#xff0c;仅作参考。 个人觉得配合步骤和建模&#xff0c;直接看代码就能入门pulp&#xff0c;所以没有啥解释&#xff0c;见谅。 参考 https://blog.csdn.net/youcans/article/details/116371416 步骤 1、安装PuLp &#xff08;pip install pulp) 2…

HRD特征及其检测方法简介

HRD特征及其检测方法简介1、HRD背景知识介绍1.1 HRR通路简介1.2 HRR基因突变可导致通路失活和HRD1.3 HRD高发癌种2、HRD的两类主要标志物2.1 致病基因2.2 基因组瘢痕3、HRD检测方法4、全景变异分析&#xff08;CGP&#xff09;5、关键信息6、参考文件1、HRD背景知识介绍 1.1 H…

(4)点云数据处理学习——其它官网例子

1、主要参考 &#xff08;1&#xff09;视频&#xff0c;大佬讲的就是好啊 【Open3D】三维点云python教程_哔哩哔哩_bilibili &#xff08;2&#xff09;官方的github地址 GitHub - isl-org/Open3D: Open3D: A Modern Library for 3D Data Processing &#xff08;3&#…

BUUCTF Misc 被偷走的文件 snake

被偷走的文件 下载文件 wireshark打开&#xff0c;搜索flag字符串 可以看到一个带有flag.rar的FTP包进行TCP流追踪 看来流量中有flag.rar&#xff0c;使用kali中的foremost进行文件分离 发现一个文件夹内有一个需要密码的压缩包 密码是5790&#xff0c;解压 得到flag …

Redis数据结构和类型

Redis 包含五种数据类型&#xff0c;分别为String、List、Hash、Set、ZSet 底层实现的数据结构包SDS、双向链表、压缩列表、哈希表、整数集合、跳表 redis结构图数据类型和数据结构的关系Redis六种数据结构 一、动态字符串(SDS) Redis 是用 C 语言实现的&#xff0c;但是它…

Kotlin高仿微信-第12篇-单聊-图片

Kotlin高仿微信-项目实践58篇详细讲解了各个功能点&#xff0c;包括&#xff1a;注册、登录、主页、单聊(文本、表情、语音、图片、小视频、视频通话、语音通话、红包、转账)、群聊、个人信息、朋友圈、支付服务、扫一扫、搜索好友、添加好友、开通VIP等众多功能。 Kotlin高仿…

STC 51单片机45——51单片机对脉冲计数 汇编 16位除法

部分代码&#xff1a; ORG 0000H LJMP INIT ORG 0003H //外部中断0 LJMP INT0SUB ORG 0013H //外部中断1 LJMP INT1SUB ORG 0100H INIT: CLR P1.0 //控制端复位 …

D-019 EEROM硬件电路设计

EEROM硬件电路设计1 简介1.1 存储器的分类1.2EEPROM的特性2 接口介绍2.1 IIC接口2.2 SPI接口2.3 MicroWire 接口3 EEPROM 和 FLASH4 电路设计实战5 电路设计要点1 简介 1.1 存储器的分类 按照掉电数据是否丢失的特性&#xff0c;存储器可划分为&#xff1a; 易失性存储器&…

快排图文详解:快速排序算法的实现 - 【双边循环法与单边循环法 递归与非递归(栈的方式)的实现】

1.基本介绍 同冒泡排序一样&#xff0c;快速排序&#xff08;Quicksort&#xff09;也属于交换排序&#xff0c;通过元素之间的比较和交换位置来达到排序的目的。但快速排序是对冒泡排序的一种改进。 2.基本思想 关于基本思想&#xff0c;我们在这里先不考虑是如何具体实现的…

nuxtjs生命周期、项目创建、声明式导航与编程式导航、动态路由、嵌套路由、配置式路由、定制默认应用模板、扩展默认布局

文章目录1. 介绍2. 生命周期3. 项目创建4. 声明式导航和编程式导航5. 动态路由参数和验证6. 嵌套路由7. 404页面8. 配置式路由9. 定制默认应用模板10. 扩展默认布局10.1 默认布局10.2 自定义布局10.3 显示错误的布局1. 介绍 Nuxt.js 是一个基于 Vue.js 的通用应用框架。通过对…