APScheduler框架使用

news2025/1/10 16:48:12

目录

  • 概述
  • 架构
  • 重要概念
    • Job 作业
    • Trigger 触发器
    • Executor 执行器
    • Jobstore
    • Event 事件
    • 调度器
  • 工作流程
  • 使用

概述

APScheduler(advanceded python scheduler)基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个Python定时任务系统。

它有以下三个特点:

  • 类似于 Liunx Cron 的调度程序(可选的开始/结束时间)
  • 基于时间间隔的执行调度(周期性调度,可选的开始/结束时间)
  • 一次性执行任务(在设定的日期/时间运行一次任务)

架构

APScheduler有四种组成部分:

  • 触发器(trigger) 包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。
  • 作业存储(job store) 存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。
  • 执行器(executor) 处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
  • 调度器(scheduler) 是其他的组成部分。你通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。通过配置executor、jobstore、trigger,使用线程池(ThreadPoolExecutor默认值20)或进程池(ProcessPoolExecutor 默认值5)并且默认最多3个(max_instances)任务实例同时运行,实现对job的增删改查等调度控制。

在这里插入图片描述

重要概念

Job 作业

Job作为APScheduler最小执行单位。创建Job时指定执行的函数,函数中所需参数,Job执行时的一些设置信息。

  • id:指定作业的唯一ID
  • name:指定作业的名字
  • trigger:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的时间, 满足时将会执行
  • executor:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此job的 执行器,执行job指定的函数
  • max_instances:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数来确定是否可执行
  • next_run_time:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触发时间
  • misfire_grace_time:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此job
  • coalesce:Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行
  • func:Job执行的函数
  • args:Job执行函数需要的位置参数
  • kwargs:Job执行函数需要的关键字参数

Trigger 触发器

Trigger绑定到Job,在scheduler调度筛选Job时,根据触发器的规则计算出Job的触发时间,然后与当前时间比较确定此Job是否会被执行,总之就是根据trigger规则计算出下一个执行时间。

目前APScheduler支持触发器:

  • 指定时间的DateTrigger
  • 指定间隔时间的IntervalTrigger
  • 像Linux的crontab一样的CronTrigger。
  1. 指定时间的触发器

date定时,作业只执行一次。

如下:

sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])
  1. 指定间隔时间的触发器

interval间隔调度

  • weeks (int) – 间隔几周
  • days (int) – 间隔几天
  • hours (int) – 间隔几小时
  • minutes (int) – 间隔几分钟
  • seconds (int) – 间隔多少秒
  • start_date (datetime|str) – 开始日期
  • end_date (datetime|str) – 结束日期
  • timezone (datetime.tzinfo|str) – 时区

如下:

sched.add_job(job_function, 'interval', hours=2)
  1. crontab表达式的触发器

cron调度

  • (int|str) 表示参数既可以是int类型,也可以是str类型
  • (datetime | str) 表示参数既可以是datetime类型,也可以是str类型
  • year (int|str) – 4-digit year -(表示四位数的年份,如2008年)
  • month (int|str) – month (1-12) -(表示取值范围为1-12月)
  • day (int|str) – day of the (1-31) -(表示取值范围为1-31日)
  • week (int|str) – ISO week (1-53) -(格里历2006年12月31日可以写成2006年-W52-7(扩展形式)或2006W527(紧凑形式))
  • day_of_week (int|str) – number or name of weekday (0-6 or
  • mon,tue,wed,thu,fri,sat,sun) – (表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示)
  • hour (int|str) – hour (0-23) – (表示取值范围为0-23时)
  • minute (int|str) – minute (0-59) – (表示取值范围为0-59分)
  • second (int|str) – second (0-59) – (表示取值范围为0-59秒)
  • start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) – (表示开始时间)
  • end_date (datetime|str) – latest possible date/time to trigger on (inclusive) – (表示结束时间)
  • timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示时区取值)

CronTrigger可用的表达式:
在这里插入图片描述
示例:

# 6-8,11-12月第三个周五 00:00, 01:00, 02:00, 03:00运行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 每周一到周五运行 直到2024-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'

Executor 执行器

Executor在scheduler中初始化,另外也可通过scheduler的add_executor动态添加Executor。每个executor都会绑定一个alias,这个作为唯一标识绑定到Job,在实际执行时会根据Job绑定的executor找到实际的执行器对象,然后根据执行器对象执行Job。

Executor的种类会根据不同的调度来选择,如果选择AsyncIO作为调度的库,那么选择AsyncIOExecutor,如果选择tornado作为调度的库,选择TornadoExecutor,如果选择启动进程作为调度,选择ThreadPoolExecutor或者ProcessPoolExecutor都可以。

Executor的选择需要根据实际的scheduler来选择不同的执行器。目前APScheduler支持的Executor:

  • executors.asyncio:同步io,阻塞
  • executors.gevent:io多路复用,非阻塞
  • executors.pool: 线程ThreadPoolExecutor和进
  • ProcessPoolExecutor
  • executors.twisted:基于事件驱动

Jobstore

Jobstore在scheduler中初始化,另外也可通过scheduler的add_jobstore动态添加Jobstore。每个jobstore都会绑定一个alias,scheduler在Add Job时,根据指定的jobstore在scheduler中找到相应的jobstore,并将job添加到jobstore中。作业存储器决定任务的保存方式, 默认存储在内存中(MemoryJobStore),重启后就没有了。APScheduler支持的任务存储器有:

  • jobstores.memory:内存
  • jobstores.mongodb:存储在mongodb
  • jobstores.redis:存储在redis
  • jobstores.rethinkdb:存储在rethinkdb
  • jobstores.sqlalchemy:支持sqlalchemy的数据库如mysql,sqlite
  • jobstores.zookeeper:zookeeper

不同的任务存储器可以在调度器的配置中进行配置(见调度器)。

Event 事件

vent是APScheduler在进行某些操作时触发相应的事件,用户可以自定义一些函数来监听这些事件,当触发某些Event时,做一些具体的操作。常见的比如。Job执行异常事件 EVENT_JOB_ERROR。Job执行时间错过事件 EVENT_JOB_MISSED。

目前APScheduler定义的Event:

  • EVENT_SCHEDULER_STARTED
  • EVENT_SCHEDULER_STAR
  • EVENT_SCHEDULER_SHUTDOWN
  • EVENT_SCHEDULER_PAUSED
  • EVENT_SCHEDULER_RESUMED
  • EVENT_EXECUTOR_ADDED
  • EVENT_EXECUTOR_REMOVED
  • EVENT_JOBSTORE_ADDED
  • EVENT_JOBSTORE_REMOVED
  • EVENT_ALL_JOBS_REMOVED
  • EVENT_JOB_ADDED
  • EVENT_JOB_REMOVED
  • EVENT_JOB_MODIFIED
  • EVENT_JOB_EXECUTED
  • EVENT_JOB_ERROR
  • EVENT_JOB_MISSED
  • EVENT_JOB_SUBMITTED
  • EVENT_JOB_MAX_INSTANCES

Listener表示用户自定义监听的一些Event,比如当Job触发了EVENT_JOB_MISSED事件时可以根据需求做一些其他处理。

调度器

Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。

APScheduler支持的调度器方式如下,比较常用的为BlockingSchedulerBackgroundScheduler

  • BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
  • BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用start后主线程不会阻塞。
  • AsyncIOScheduler:适用于使用了asyncio模块的应用程序。
  • GeventScheduler:适用于使用gevent模块的应用程序。
  • TwistedScheduler:适用于构建Twisted的应用程序。
  • QtScheduler:适用于构建Qt的应用程序。

工作流程

Scheduler添加job流程:
在这里插入图片描述

Scheduler调度流程:

在这里插入图片描述

使用

一个简单的示例如下:

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime


# 输出时间
def job():
    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# 创建BlockingScheduler
sched = BlockingScheduler()
# 添加任务
sched.add_job(my_job, 'interval', seconds=5, id='my_job_id')
# 启动任务
sched.start()

主要步骤如下:

  1. 创建调度器
sched = BlockingScheduler()
  1. 添加job

第一种方法是最常用的,第二种方法通过声明 job 而不修改应用程序运行时是最为方便的;add_job() 方法返回一个 apscheduler.job.Job 实例,我们可以用它来在之后修改或移除 job;我们可以随时调度 scheduler 里的 job,如果添加 job 时,scheduler 尚未运行,job 会被临时地进行排列,直到 scheduler 启动之后,它的首次运行时间才会被确切地计算出来。

注意:

① 如果你调度的 job 在一个持久化的 job store 里,当你初始化你的应用程序时,你 必须 为 job 定义一个显式的 ID 并使用 replace_existing=True ,否则每次你的应用程序重启时你都会得到那个 job 的一个新副本

② 如果想马上运行 job ,请在添加 job 时省略 trigger 参数

③ 如果我们的执行器或任务储存器是会序列化任务的,那么这些任务就必须符合:1-回调函数必须全局可用;2-回调函数参数必须也是可以被序列化的

  1. 移除job

当从 scheduler 中移除一个 job 时,它会从关联的 job store 中被移除,不再被执行;有两种途径可以移除 job:

① 通过 job 的 ID 以及 job store 的别名来调用 remove_job() 方法

② 对你在 add_job() 中得到的 job 实例调用 remove() 方法

后者看起来更方便,实际上它要求你必须将调用 add_job() 得到的 Job 实例存储在某个地方;而对于通过 scheduled_job() 装饰器来调度 job 的就只能使用第一种方法;

如果一个 job 完成了调度(例如它的触发器不会再被触发),它会自动被移除。

# 移除job任务
myjob.remove()
bgsched.remove_job(job_id='00001') # jobstore=None
  1. 暂停和恢复job
# 暂停job
myjob.pause()

# 恢复job
myjob.resume()
  1. 获取作业调度列表

通过get_jobs方法获取调度列表:bgsched.get_jobs()。

  1. 修改job
# 修改job相关信息,id不能被修改
myjob.modify(name='modity_name')
  1. 终止调度

默认情况下,scheduler 会终止其 job store 以及 executor,然后等待所有目前执行的 job 完成后自行终止,如果不想等待可以设置wait为False。

# 终止调度
bgsched.shutdown(wait=False)
  1. 启动调度
bgsched.start()

注意:有些timezone时区可能会有夏令时的问题;这个可能导致令时切换时,任务不执行或任务执行两次;避免这个问题,可以使用UTC时间,或提前预知并规划好执行的问题。

from pytz import utc
sched = BlockingScheduler(timezone=utc)

参考:
https://blog.51cto.com/u_16147578/6396384
https://zhuanlan.zhihu.com/p/491679794

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1057489.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络层常见协议——IPV4、IPV6、ARP、ICMP、QoS

目录 1、IPV4 协议 IPV4 地址的组成: IPV4地址的分类: 关于多播和组播: 常见组播地址分类: 特殊的 IPV4 地址: 私有地址和公有地址: 私有地址的范围: 子网划分: 子网掩码&…

Codeforces Round 665 (Div. 2) (A-F)

A.Problem - A - Codeforces (1)题意 给你个X轴,初始A点在n这个位置,O在源点0,问你要把B放在哪才能让|AB-BO| k,最小化A需要移动多少次。 (2)思路 直接分情况套路即可。 &#xff0…

uwb人员定位系统:人员轨迹实时定位

UWB定位系统是一种基于超宽带技术的定位系统。它与传统的通信技术不同,不需要使用载波,而是通过发送和接收具有纳秒或微妙级以下的极窄脉冲来实现无线传输。这种系统的优势包括低功耗、对信道衰落不敏感、抗环境能力强、不会对同一环境下的其他设备造成影…

LINUX 基本命令

​ 一 展示当前目录下的内容 $ ls查看当前目录 pwd // /home/winnie二 mkdir 三 touch-cat-more 进入翻页后 通过键盘 q 退出翻页 b 翻上一页 四 cp mv rm mv 如果 mv 2.txt 5.txt (因为5.txt不存在,或者说他是个文件不是路径),故此操作将2.txt更名…

openfire 4.7.5 Web插件开发

文章目录 1、openfire服务端下载安装1.1、openfire解压运行1.2、Spark安装和登录 2、openfire插件开发2.1、基于servlet开发http接口2.2、基于Jersey开发http接口2.3、WEB UI页面开发2.4、上传插件2.4.1、访问servlet接口效果2.4.2、访问Jersey接口效果2.4.3、访问页面效果 Ope…

撰写博客的工具记录

文章目录 前言TyporaPicgoGitee 免费图床ScreenToGifPointofix和Snipaste墨滴社区小结 前言 本文主要目的是记录和分析自己在写博客和相关文档时所用到的工具,单纯记录。按照一篇文章从0到发布的场景进行叙述。 Typora Typora是一款Markdown编辑器。Markdown的优…

游戏制作资源推荐

教程 创建僵尸第一人称射击游戏 | 虚幻引擎 5 初学者教程https://www.youtube.com/watch?vqOam3QjGE8g ​​​​​​​ 虚幻商城免费资产 人物资产 各种角色应有尽有 关键词:paragon ;推荐程度:三颗星

C. MEX Repetition

题目:样例: 输入 5 1 2 1 3 1 0 1 3 2 2 0 2 5 5 1 2 3 4 5 10 100 5 3 0 4 2 1 6 9 10 8输出 1 2 0 1 2 1 2 3 4 5 0 7 5 3 0 4 2 1 6 9 10 思路: 从题目和样例中,我们可以知道,从一个数组中,按照包括0的自…

leetCode 121. 买卖股票的最佳时机 贪心算法

给定一个数组 prices ,它的第 i 个元素 prices[i] 表示一支给定股票第 i 天的价格。 你只能选择 某一天 买入这只股票,并选择在 未来的某一个不同的日子 卖出该股票。设计一个算法来计算你所能获取的最大利润。 返回你可以从这笔交易中获取的最大利润。…

2023年汉字小达人市级比赛在线模拟题来了,四种练习助力好成绩

2023年第十届汉字小达人比赛区级自由报名活动已于9月30日结束,尽管最终晋级市级比赛的名单还需要在11月初发布(有一些学校的校级选拔还没结束),但是许多小朋友已经开始准备市级比赛了。 根据往年的经验,实际比赛也是在…

软断言你也学不会

断言是测试用例的一部分,也是测试工程师开发测试用例的核心。断言通常集成在单元测试和集成测试中,断言分为硬断言和软断言。 硬断言是我们狭义上听到的普通断言:当用例运行后得到的[实际]结果与预期结果不匹配时,测试框架将停止测试执行并抛…

华为云云耀云服务器L实例评测|ClickHouse部署及压测

文章目录 前言📣 1.前言概述📣 2.安全设置📣 3.ClickHouse安装✨ 3.1 申请服务器✨ 3.2 安装前准备✨ 3.3 RPM安装包✨ 3.4 配置文件✨ 3.5 使用ClickHouse 📣 4.ClickHouse压测✨ 4.1 下载数据✨ 4.2 解压数据✨ 4.3 创建数据库和…

(c++)类和对象 下篇

目录 1.再次了解构造函数 2. Static成员 3. 友元 4. 内部类 5.匿名对象 6.拷贝对象时的一些编译器优化 1.再次了解构造函数 1.1 构造函数体赋值 在创建对象时,编译器通过调用构造函数,给对象中各个成员变量一个合适的初始值。 class Date { pub…

火山引擎 ByteHouse:如何提升 18000 节点的 ClickHouse 可用性?

更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群 ClickHouse 是业内被广泛使用的 OLAP 引擎。当集群规模过大时,ClickHouse 则面临使用局限性的问题。如何提升 ClickHouse 的可用性,成为困扰…

Python实时采集Windows CPU\MEMORY\HDD使用率

文章目录 安装psutil库在Python脚本中导入psutil库获取CPU当前使用率,并打印结果获取内存当前使用率,并打印结果获取磁盘当前使用情况,并打印结果推荐阅读 要通过Python实时采集Windows性能计数器的数据,你可以使用psutil库。psut…

怎么通过portainer部署一个vue项目

这篇文章分享一下今天通过docker打包vue项目,并使用打包的镜像在portainer上部署运行,参考了vue-cli和docker的官方文档。 首先,阅读vue-cli关于docker部署的说明 vue-cli关于docker部署的说明https://cli.vuejs.org/guide/deployment.html#…

SpringCloudAlibaba 相关组件的学习一

目录 前言 系统架构演变 1、单体架构 2、垂直架构 3、分布式架构 4、SOA架构 5、微服务架构 一、微服务架构的介绍 1、微服务架构的常见问题 2 微服务架构的常见概念 2.1 服务治理 2.2 服务调用 2.3 服务网关 2.4 服务容错 2.5 链路追踪 3、微服务架构的常用解决…

第2篇 机器学习基础 —(1)机器学习方式及分类、回归

前言:Hello大家好,我是小哥谈。机器学习是一种人工智能的分支,它使用算法和数学模型来使计算机系统能够从经验数据中学习和改进,而无需显式地编程。机器学习的目标是通过从数据中发现模式和规律,从而使计算机能够自动进…

黑马mysql教程笔记(mysql8教程)基础篇——数据库相关概念、mysql安装及卸载、数据模型、SQL通用语法及分类(DDL、DML、DQL、DCL)

参考文章1:https://www.bilibili.com/video/BV1Kr4y1i7ru/ 参考文章2:https://dhc.pythonanywhere.com/article/public/1/ 文章目录 基础篇数据库相关概念(数据库DataBase(DB)、数据库管理系统DataBase Management Sy…

25种ACM模式输入输出模板,支持C++、Java、Python、Go、JS版本

很多录友苦于不熟悉 ACM 输入输入结构,在笔试和面试的时候,处理数据输入输出就花费了大量的时间,以至于算法题没写完,甚至是 根本就写不对输入输出的方式。 下面,我针对常见的25种 ACM输入与输出方式,给大…