Python中定时任务APScheduler库用法详解

news2025/3/9 10:54:53

在日常工作中,常常会用到需要周期性执行的任务,一种方式是采用 Linux 系统自带的 crond 结合命令行实现。另外一种方式是直接使用Python。

当每隔一段时间就要执行一段程序,或者往复循环执行某一个任务,这就需要使用定时任务来执行程序。python中常用的定时任务主要有以下8中方法:

  1. while True:+sleep()
  2. threading.Timer定时器
  3. Timeloop库执行定时任务
  4. 调度模块sched
  5. 调度模块schedule
  6. 任务框架APScheduler
  7. 分布式消息系统celery执行定时任务
  8. 使用windows自带的定时任务

上述8种使用方式详见博客:python实现定时任务的8种方式详解

在本文中,我们主要讲解第6种方式,即任务框架APScheduler的使用方法。

文章目录

  • 1、APScheduler简介
  • 2、APScheduler库安装
  • 3、APScheduler组成
    • 3.1 触发器(trigger)
    • 3.2 作业存储(job store)
    • 3.3 执行器(executor)
    • 3.4 调度器(scheduler)
  • 4、常见的两种调度器
    • 4.1 BlockingScheduler
    • 4.2 BackgroundScheduler
    • 4.3 如果job执行时间过长会怎么样?
    • 4.4 每个job是怎么被调度的
  • 5、使用详情
    • 5.1 interval触发器
    • 5.2 触发器date的使用
    • 5.3 cron触发器
  • 参考博客

1、APScheduler简介

APScheduler是Python的一个定时任务框架,用于执行周期或者定时任务,该框架不仅可以添加、删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,使用起来非常方便。

APscheduler全称Advanced Python Scheduler,作用为在指定的时间规则执行指定的作业,其是基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。

2、APScheduler库安装

首先安装apscheduler库:

pip install apscheduler

在这里插入图片描述

3、APScheduler组成

  • 触发器(trigger):包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置以外,触发器完全是无状态的。
  • 作业存储(job store):存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据将在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。
  • 执行器(executor):处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
  • 调度器(scheduler):其他的组成部分。通常在应用只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。

3.1 触发器(trigger)

包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了它们自己初始配置以外,触发器完全是无状态的。

APScheduler 有三种内建的 trigger:

  • date: 特定的时间点触发
  • interval: 固定时间间隔触发
  • cron: 在特定时间周期性地触发

简单理解:触发器就是根据你指定的触发方式,比如是按照时间间隔,还是按照 cron触发,触发条件是什么等。每个任务都有自己的触发器。

3.2 作业存储(job store)

如果你的应用在每次启动的时候都会重新创建作业,那么使用默认的作业存储器(MemoryJobStore)即可,但是如果你需要在调度器重启或者应用程序奔溃的情况下任然保留作业,你应该根据你的应用环境来选择具体的作业存储器。例如:使用Mongo或者SQLAlchemy JobStore (用于支持大多数RDBMS)。

任务存储器是可以存储任务的地方,默认情况下任务保存在内存,也可将任务保存在各种数据库中。任务存储进去后,会进行序列化,然后也可以反序列化提取出来,继续执行。

3.3 执行器(executor)

Executor在scheduler中初始化,另外也可通过scheduler的add_executor动态添加Executor。

每个executor都会绑定一个alias,这个作为唯一标识绑定到Job,在实际执行时会根据Job绑定的executor。找到实际的执行器对象,然后根据执行器对象执行Job。

Executor的选择需要根据实际的scheduler来选择不同的执行器。

处理作业的运行,它们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。

3.4 调度器(scheduler)

Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。

scheduler可根据自身的需求选择不同的组件,如果是使用AsyncIO则选择AsyncIOScheduler,使用tornado则选择TornadoScheduler。

任务调度器是属于整个调度的总指挥官。它会合理安排作业存储器、执行器、触发器进行工作,并进行添加和删除任务等。调度器通常是只有一个的。开发人员很少直接操作触发器、存储器、执行器等。因为这些都由调度器自动来实现了。

4、常见的两种调度器

APScheduler中有很多种不同类型的调度器,BlockingScheduler与BackgroundScheduler是其中最常用的两种调度器。那他们之间有什么区别呢? 简单来说,区别主要在于BlockingScheduler会阻塞主线程的运行,而BackgroundScheduler不会阻塞。所以,在不同的情况下,选择不同的调度器:

  • BlockingScheduler: 调用start函数后会阻塞当前线程。当调度器是你应用中唯一要运行的东西时(如上例)使用。
  • BackgroundScheduler: 调用start后主线程不会阻塞。当你不运行任何其他框架时使用,并希望调度器在你应用的后台执行。

4.1 BlockingScheduler

示例代码:

import time
from apscheduler.schedulers.blocking import BlockingScheduler
 
 
def job():
    print('job 3s')
 
 
if __name__ == '__main__':
 
    sched = BlockingScheduler(timezone='MST')
    sched.add_job(job, 'interval', id='3_second_job', seconds=3)
    sched.start()
 
    while True:
        print('main 1s')
        time.sleep(1)

运行结果:
在这里插入图片描述
由上述例子可以看出BlockingScheduler调用start函数后会阻塞当前线程,导致主程序中while循环不会被执行到。

4.2 BackgroundScheduler

示例代码:

import time
from apscheduler.schedulers.background import BackgroundScheduler
 
 
def job():
    print('job 3s')
 
 
if __name__ == '__main__':
 
    sched = BackgroundScheduler(timezone='MST')
    sched.add_job(job, 'interval', id='3_second_job', seconds=3)
    sched.start()
 
    while True:
        print('main 1s')
        time.sleep(1)

运行结果:
在这里插入图片描述
由上述例子可以发现,调用start函数后,job()并不会立即开始执行。而是等待3s后,才会被调度执行。如何让job在start()后就开始运行?有一种最简单的方式,就是在调度器start之前,就运行一次job()。

4.3 如果job执行时间过长会怎么样?

示例代码:

import time
from apscheduler.schedulers.background import BackgroundScheduler
 
 
def job():
    print('job 3s')
    time.sleep(5)
 
 
if __name__ == '__main__':
 
    sched = BackgroundScheduler(timezone='MST')
    sched.add_job(job, 'interval', id='3_second_job', seconds=3)
    sched.start()
 
    while True:
        print('main 1s')
        time.sleep(1)

运行结果:Execution of job “job (trigger: interval[0:00:03], next run at: 2022-12-21 07:04:52 MST)” skipped: maximum number of running instances reached (1)

在这里插入图片描述

由上述例子所示,3s时间到达后,并不会“重新启动一个job线程”,而是会跳过该次调度,等到下一个周期(再等待3s),又重新调度job()。

为了能让多个job()同时运行,可以配置调度器的参数max_instances,如下例,允许2个job()同时运行:

实例代码:

import time
from apscheduler.schedulers.background import BackgroundScheduler
 
 
def job():
    print('job 3s')
    time.sleep(5)
 
 
if __name__ == '__main__':
 
    job_defaults = {'max_instances': 2}
    sched = BackgroundScheduler(timezone='MST', job_defaults=job_defaults)
    sched.add_job(job, 'interval', id='3_second_job', seconds=3)
    sched.start()
 
    while True:
        print('main 1s')
        time.sleep(1)

运行结果:
在这里插入图片描述

上述代码实例参数是全局的,也可以作用于单个任务上面:

示例代码:

import time
from apscheduler.schedulers.background import BackgroundScheduler
 
 
def job():
    print('job 3s')
    time.sleep(5)
 
 
if __name__ == '__main__':
 
    sched = BackgroundScheduler(timezone='MST')
    sched.add_job(job, 'interval', id='3_second_job', seconds=3, max_instances=2)
    sched.start()
 
    while True:
        print('main 1s')
        time.sleep(1)

运行结果:
在这里插入图片描述

4.4 每个job是怎么被调度的

job()函数会被以进程的方式调度运行,还是以线程来运行呢?

实例代码:

import time
import os
import threading
from apscheduler.schedulers.background import BackgroundScheduler
 
 
def job():
    print('job 3s')
    print('job thread_id-{0}, process_id-{1}'.format(threading.get_ident(), os.getpid()))
    time.sleep(5)
 
 
if __name__ == '__main__':
 
    sched = BackgroundScheduler(timezone='MST')
    sched.add_job(job, 'interval', id='3_second_job', seconds=3, max_instances=2)
    sched.start()
 
    while True:
        print('main 1s')
        time.sleep(1)

运行结果:
在这里插入图片描述
由上述例子表明:每个job()的进程ID都相同,每个线程ID都不相同。所以,job()最终是以线程的方式进行调度的。

5、使用详情

在这里插入图片描述
参数说明:

  • id:指定作业的唯一ID
  • name:指定作业的名字
  • trigger:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的时间, 满足时将会执行
  • executor:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此job的 执行器,执行job指定的函数
  • max_instances:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数来确定是否可执行
  • next_run_time:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触发时间
  • misfire_grace_time:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此job
  • coalesce:Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行
  • func:Job执行的函数
  • args:Job执行函数需要的位置参数
  • kwargs:Job执行函数需要的关键字参数

5.1 interval触发器

固定时间间隔触发。interval 间隔调度,参数如下:
在这里插入图片描述
示例代码:

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
 
 
def task():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts)
 
 
def task2():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts + ' 666!')
 
 
def task3():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts + ' 888!')
 
 
def func():
    # 创建调度器BlockingScheduler()
    scheduler = BlockingScheduler()
    scheduler.add_job(task, 'interval', seconds=3, id='test_job1')
    # 添加任务,时间间隔为5秒
    scheduler.add_job(task2, 'interval', seconds=5, id='test_job2')
    # 在2022-10-27 21:50:30和2022-10-27 21:51:30之间,时间间隔为6秒
    scheduler.add_job(task3, 'interval', seconds=6, start_date='2022-10-27 21:53:00', end_date='2022-10-27 21:53:30', id ='test_job3')
    # 每小时(上下浮动20秒区间内)运行task
    # jitter振动参数,给每次触发添加一个随机浮动秒数,一般适用于多服务器,避免同时运行造成服务拥堵。
    scheduler.add_job(task, 'interval', hours=1, jitter=20, id='test_job4')
    scheduler.start()
 
 
func()

运行结果:
在这里插入图片描述

5.2 触发器date的使用

date 是最基本的一种调度,作业任务只会执行一次。它表示特定的时间点触发。它的参数如下:
在这里插入图片描述
注意:run_date参数可以是date类型、datetime类型或文本类型。

示例代码:

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
 
 
def task():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts)
 
 
def task2():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts + '666!')
 
 
def func():
    # 创建调度器BlockingScheduler()
    scheduler = BlockingScheduler()
    scheduler.add_job(task, 'date', run_date=datetime(2022, 10, 27, 21, 39, 00), id='test_job1')
    scheduler.add_job(task2, 'date', run_date=datetime(2022, 10, 27, 21, 39, 50), id='test_job2')
    scheduler.start()
 
 
func()

运行结果:
在这里插入图片描述

5.3 cron触发器

在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。

cron参数:
在这里插入图片描述
表达式:
在这里插入图片描述
示例代码1:

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
 
 
def task():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts)
 
 
def task2():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts + ' 666!')
 
 
def task3():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts + ' 888!')
 
 
def func():
    # 创建调度器BlockingScheduler()
    scheduler = BlockingScheduler()
    # 在每年 1-3、7-9 月份中的每个星期一、二中的 00:00, 01:00, 02:00 和 03:00 执行 task 任务
    scheduler.add_job(task, 'cron', month='1-3,7-9', day_of_week='1-2', hour='0-3', id='test_job1')
    scheduler.start()
 
 
func()

示例代码2: 【注意:day_of_week()中mon~sun对应的数字是0-6】

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
 
 
def task():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts)
 
 
def func():
    # 创建调度器BlockingScheduler()
    scheduler = BlockingScheduler()
    # 在每个星期三中的 23:02执行 task 任务
    scheduler.add_job(task, 'cron', day_of_week='2', hour='23', minute='2')
    scheduler.start()
 
 
if __name__ == '__main__':
    func()

运行结果:

在这里插入图片描述

示例代码3: 【注意:day-of_week()中可以定时多天,时分秒可以是字符串也可以是数值型】

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
 
 
def task():
    now = datetime.now()
    ts = now.strftime("%Y-%m-%d %H:%M:%S")
    print(ts)
 
 
def func():
    # 创建调度器BlockingScheduler()
    scheduler = BlockingScheduler()
    # 在每个星期二、三中的 23:11:00执行 task 任务
    scheduler.add_job(task, 'cron', day_of_week='1-2', hour='23', minute='11', second='00')
    # 在每个星期二、三中的 23:11:03执行 task 任务
    scheduler.add_job(task, 'cron', day_of_week='1-2', hour=23, minute=11, second=3)
    # 在每个星期三、四中的 23:11:05执行 task 任务
    scheduler.add_job(task, 'cron', day_of_week='2-3', hour='23', minute='11', second='05')
    # 在每个星期三、四中的 23:11:08执行 task 任务
    scheduler.add_job(task, 'cron', day_of_week='2-3', hour=23, minute=11, second=8)
    scheduler.start()
 
 
if __name__ == '__main__':
    func()

运行结果:
在这里插入图片描述

参考博客

  • python中定时任务apscheduler库用法详解

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/894998.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何搭建游戏服务器?有哪些操作步骤

​  选择游戏服务器提供商 为确保游戏服务器的稳定运行和及时响应问题,选择一个正规、靠谱的游戏服务器提供商非常重要。 选择服务器操作系统 根据不同游戏的需求,选择适合的操作系统,通常可选择Linux或WindowsServer操作系统。 上传、安装…

智安网络|零信任安全框架:保障数字化时代网络安全的最佳实践

随着数字化时代的快速发展,网络安全问题变得越来越突出。传统的安全防御模式已经不再适用于现代复杂的网络环境中。为了应对日益增长的网络威胁,零信任安全模式应运而生。 一、什么是零信任? 零信任是一种安全框架和哲学,它基于…

【使用教程】在Ubuntu下运行CANopen通信PMM伺服电机使用教程(NimServoSDK_V2.0.0)

本教程将指导您在Ubuntu操作系统下使用NimServoSDK_V2.0.0来运行CANopen通信的PMM系列一体化伺服电机。我们将介绍必要的步骤和命令,以确保您能够成功地配置和控制PMM系列一体化伺服电机。 NimServoSDK_V2.0.0是一款用于PMM一体化伺服电机的软件开发工具包。它提供了…

第八章LVS中的DR模式详解

1,LVS-DR数据包的流向分析 总结: (1)客户端发送请求到 Director Server(负载均衡器),请求的数据报文(源 IP 是 CIP,目标 IP 是 VIP)到达内核空间。 (2&#…

机器学习基础之《分类算法(3)—模型选择与调优》

作用是如何选择出最好的K值 一、什么是交叉验证(cross validation) 1、定义 交叉验证:将拿到的训练数据,分为训练和验证集。以下图为例:将数据分成5份,其中一份作为验证集。然后经过5次(组)的测试&#x…

「UG/NX」Block UI 体收集器BodyCollector

✨博客主页何曾参静谧的博客📌文章专栏「UG/NX」BlockUI集合📚全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C+&#

springboot引入druid解析sql

一、前言 在开发中&#xff0c;有时我们可能会需要获取SQL中的表名&#xff0c;那么因为不同的数据源类型SQL会存在部分差异&#xff0c;那么我们就可以使用alibaba 的druid包实现不同的数据源类型的sql解析。 二、引入相关maven依赖 <dependency><groupId>com.a…

Python基础语法入门(第二十二天)——并发编程

在Python中&#xff0c;并发编程的实现有多种方式&#xff0c;包括多线程、多进程和异步编程。每一种方式都有其使用的场景和特点。那么如何去选择多线程、多进程和多协程呢&#xff1f;要知道如何选择的话就要了解一下什么是CPU密集型计算、什么是I/O密集型计算&#xff1b;多…

selenium +Jmeter 的性能测试

通过Jmeter快速将已有的Selenium 代码以性能测试的方式组织起来&#xff0c;并使用JMeter 丰富的报表展示测试结果 from selenium import webdriver from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.by import By driver …

JAVA基础知识(三)——数组

数组 数组一、数组的概述1.1 数组的定义1.2 数组的常见概念1.3 数组的特点1.4 数组的分类 二、一维数组的使用2.1 一维数组的声明和初始化2.2 数组的基本使用2.3 数组元素的默认初始化值2.4 数组的内存解析 三、多维数组的使用3.1 二维数组的理解3.2 二维数组的声明3.3 二维数组…

多家企业加入即将在2024年发射的量子卫星SpeQtral-1任务

近日&#xff0c;总部位于新加坡的量子通信技术公司SpeQtral宣布将与纳米航空电子公司NanoAvionics和卫星光子学公司Mbryonics合作执行即将到来的SpeQtral-1量子密钥分发&#xff08;Quantum Key Distribution, QKD&#xff09;卫星任务。NanoAvionics被选为卫星平台提供商&…

算法题面试实战收集

回文数字 2023-08-18 美团 一面 在不使用额外的内存空间的条件下判断一个整数是否是回文。 回文指逆序和正序完全相同。 数据范围&#xff1a; 进阶&#xff1a; 空间复杂度O(1) &#xff0c;时间复杂度 O(n) 提示&#xff1a; 负整数可以是回文吗&#xff1f;&#xff08;比如…

Python数据分析实战-多进程并发处理列表(附源码和实现效果)

实现功能 有15个列表&#xff0c;尝试多进程并发处理&#xff0c;每个列表一个进程&#xff0c;进程数和 CPU 核数一致 实现代码 import multiprocessing有15个列表&#xff0c;尝试多进程并发处理&#xff0c;每个列表一个进程&#xff0c;进程数和 CPU 核数一致def sum_li…

第P1周:实现mnist手写数字识别

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 我的环境&#xff1a; 语言环境&#xff1a;Python3.10.7编译器&#xff1a;VScode深度学习环境&#xff1a;TensorFlow 2.13.0 一、前期工作&#xff1a; …

Element Plus el-table 数据为空时自定义内容【默认为 No Data】

1. 通过 Table 属性设置 <div class"el-plus-table"><el-table empty-text"暂无数据" :data"tableData" style"width: 100%"><el-table-column prop"date" label"Date" width"180" /&g…

VR数字工厂多元化展现,打造数字企业工厂名片

5G时代&#xff0c;各种营销都在走数字化的路子&#xff0c;VR数字工厂用VR赋能工厂数字升级&#xff0c;将企业环境、工厂生产、产品研发、质检运输等流程&#xff0c;无死角720度的展示在客户面前&#xff0c;不仅可以提升自身企业的实力&#xff0c;还可以提高客户的信任感。…

.netcore grpc身份验证和授权

一、鉴权和授权&#xff08;grpc专栏结束后会开启鉴权授权专栏欢迎大家关注&#xff09; 权限认证这里使用IdentityServer4配合JWT进行认证通过AddAuthentication和AddAuthorization方法进行鉴权授权注入&#xff1b;通过UseAuthentication和UseAuthorization启用鉴权授权增加…

虽然每天都在一线coding,但是如果不是经常温习一些面试题,真的有时候会被问蒙

前言 虽然每天都在一线coding,但是如果不是经常温习一些面试题,真的有时候会被问蒙。因为技术的东西是在大多台繁杂了,不经常用就很容易淡忘,但是有经验有能很快处理开发中的问题。技术无止境,人生苦短,35岁是程序员的一道坎,能转管理路线尽量转管理路线。 一、有了 f…

HackNos 3靶场

配置 进入控制面板配置网卡 第一步&#xff1a;启动靶机时按下 shift 键&#xff0c; 进入以下界面 第二步&#xff1a;选择第二个选项&#xff0c;然后按下 e 键&#xff0c;进入编辑界面 将这里的ro修改为rw single init/bin/bash&#xff0c;然后按ctrlx&#xff0c;进入…

暂停Windows更新的方法,可延后数十万年,简单且有手就行

前言 近年来&#xff0c;Windows更新频率过快&#xff0c;最大只能暂停更新5周&#xff0c;导致用户不厌其烦&#xff0c;从网上找到的暂停更新的方法不是过于繁琐就是毫无效果&#xff0c;或者是暂停的时间有限&#xff0c;无意中发现一个大神的帖子可以通过修改注册表信息以达…