Django - 定时任务框架【django-apscheduler】基本使用详解(二)

news2024/11/15 19:28:07

一. 前言

一个网页会有很多数据是不需要经常变动的,比如说首页,变动频率低而访问量大,我们可以把它静态化,这样就不需要每次有请求都要查询数据库再返回,可以减少服务器压力
我们可以使用Django的模板渲染功能完成页面渲染

二. APSchedule/django-apschedule简介

APScheduler的全称是Advanced Python Scheduler。它是一个轻量级的 Python 定时任务调度框架。
APScheduler 支持三种调度任务:

  • 固定时间间隔
  • 固定时间点(日期)
  • Linux 下的 Crontab命令。同时,它还支持异步执行、后台执行调度任务。

特点

1)可以动态添加任务
2)不依赖Linux的crontab系统定时
3)可以对添加的定时任务做持久保存

之前已经介绍过APScheduler的使用,下面介绍的是django-apscheduler的使用

三. 【django-apscheduler】使用

1.安装APScheduler

 pip install django-apscheduler

2. 使用Django_apscheduler步骤

1.创建app

python manage.py startapp test

2. 注册使用

settings.py中注册django-apscheduler和test

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django_apscheduler',
    'apps.test'	# 注意,我这里是在外层包了一个apps的文件夹,test为业务模块
]

3.在test文件夹中新建urls.py

在子模块apps/test/urls.py中添加如下代码

from django.urls import path
from apps.test import views

urlpatterns = [
]

4. 在项目总路由urls.py中添加test.urls

打开djangoproject中的urls.py,输入如下代码

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', include('apps.test.urls')),
]

5. 执行迁移

python manage.py makemigrations
python manage.py migrate

没有其他表结构的话不必运行 python manage.py makemigrations
会创建两张表:

  • django_apscheduler_djangojob
  • django_apscheduler_djangojobexecution

通过进入后台管理能方便管理定时任务。

django_apscheduler_djangojob 表保存注册的任务以及下次执行的时间
在这里插入图片描述
这里注意 status為executed是执行,missed 则是表示撞车的场景, 为避免这种场景需要在 周期的长度以及是否进行强制结束进行选择

6. 在test子应用中的urls.py中输入下面的代码

from django.shortcuts import render

# Create your views here.
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job,DjangoResultStoreMixin

# 实例化调度器
scheduler = BackgroundScheduler()
# 开启定时工作
# 调度器使用DjangoJobStore()
scheduler.add_jobstore(DjangoJobStore(), "default")


# 设置定时任务,选择方式为interval,时间间隔为10s
# 另一种方式为每天固定时间执行任务,对应代码为:
# @register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time')
@register_job(scheduler, "interval", seconds=10,id='test_job', replace_existing=True)  # replace_existing=解决第二次启动失败的问题
def my_job():
    # 这里写你要执行的任务
    pass

# register_events(scheduler)    最新的django_apscheduler已经不需要这一步
scheduler.start()

最好為job加上id ,不加也可以

注意: 需要加上replace_existing=True 否則會報以下錯誤,即id重複

raise ConflictingIdError(job.id)
apscheduler.jobstores.base.ConflictingIdError: 'Job identifier (index_html) conflicts with an existing job'

提示:也可以不寫在Django工程目录下的urls.py文件中(主urls.py)或者子應用的urls.py 中,百度說可以寫在view.py 中或其他隨便哪個文件中,但是我沒試成功過

7. 运行django項目

python manage.py runserver 8000

常见的三种调度参数

  • date:希望在某个特定时间仅运行一次作业时使用
  • interval:要以固定的时间间隔运行作业时使用
  • cron:以crontab的方式运行定时任务

3. 示例

from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job

scheduler = BackgroundScheduler()
scheduler.add_jobstore(DjangoJobStore(), "default")


# 时间间隔3秒钟打印一次当前的时间
@register_job(scheduler, "interval", seconds=3, id='test_job')
def test_job():
    print("我是apscheduler任务")
# per-execution monitoring, call register_events on your scheduler
register_events(scheduler)
scheduler.start()
print("Scheduler started!")

运行结果

Scheduler started!
我是apscheduler任务
我是apscheduler任务
...

APScheduler中两种调度器的区别及使用过程中要注意的问题

APScheduler中有很多种不同类型的调度器,BlockingScheduler与BackgroundScheduler是其中最常用的两种调度器。区别主要在于BlockingScheduler会阻塞主线程的运行,而BackgroundScheduler不会阻塞。所以,我们在不同的情况下,选择不同的调度器:

  • BlockingScheduler: 调用start函数后会阻塞当前线程。当调度器是你应用中唯一要运行的东西时(如上例)使用
  • BackgroundScheduler: 调用start后主线程不会阻塞。当你不运行任何其他框架时使用,并希望调度器在你应用的后台执行。

BlockingScheduler的示例

from apscheduler.schedulers.blocking import BlockingScheduler
import time

def job():
    print('job 3s')


if __name__=='__main__':

    sched = BlockingScheduler(timezone='MST')
    sched.add_job(job, 'interval', id='3_second_job', seconds=3)
    sched.start()

    while(True):
        print('main 1s')
        time.sleep(1)

运行结果

job 3s
job 3s
job 3s
...

可见,BlockingScheduler调用start函数后会阻塞当前线程,导致主程序中while循环不会被执行到。

BackgroundScheduler的例子

from apscheduler.schedulers.background import BackgroundScheduler
import time

def job():
    print('job 3s')


if __name__=='__main__':

    sched = BackgroundScheduler(timezone='MST')
    sched.add_job(job, 'interval', id='3_second_job', seconds=3)
    sched.start()

    while(True):
        print('main 1s')
        time.sleep(1)

运行结果

main 1s
main 1s
main 1s
job 3s
main 1s
main 1s
main 1s
job 3s
...

可见,BackgroundScheduler调用start函数后并不会阻塞当前线程,所以可以继续执行主程序中while循环的逻辑。

通过这个输出,我们也可以发现,调用start函数后,job()并不会立即开始执行。而是等待3s后,才会被调度执行。

如何让job在start()后就开始运行

其实APScheduler并没有提供很好的方法来解决这个问题,但有一种最简单的方式,就是在调度器start之前,就运行一次job(),如下

from apscheduler.schedulers.background import BackgroundScheduler
import time

def job():
    print('job 3s')


if __name__=='__main__':
    job()
    sched = BackgroundScheduler(timezone='MST')
    sched.add_job(job, 'interval', id='3_second_job', seconds=3)
    sched.start()

    while(True):
        print('main 1s')
        time.sleep(1)

这样虽然没有绝对做到让job在start()后就开始运行,但也能做到不等待调度,而是刚开始就运行job

如果job执行时间过长会怎么样

如果执行job()的时间需要5s,但调度器配置为每隔3s就调用一下job(),会发生什么情况呢?我们写了如下例子

from apscheduler.schedulers.background import BackgroundScheduler
import time

def job():
    print('job 3s')
    time.sleep(5)

if __name__=='__main__':

    sched = BackgroundScheduler(timezone='MST')
    sched.add_job(job, 'interval', id='3_second_job', seconds=3)
    sched.start()

    while(True):
        print('main 1s')
        time.sleep(1)

在这里插入图片描述
可见,3s时间到达后,并不会“重新启动一个job线程”,而是会跳过该次调度,等到下一个周期(再等待3s),又重新调度job()。

为了能让多个job()同时运行,我们也可以配置调度器的参数max_instances,如下例,我们允许2个job()同时运行

from apscheduler.schedulers.background import BackgroundScheduler
import time

def job():
    print('job 3s')
    time.sleep(5)

if __name__=='__main__':
    job_defaults = { 'max_instances': 2 }
    sched = BackgroundScheduler(timezone='MST', job_defaults=job_defaults)
    sched.add_job(job, 'interval', id='3_second_job', seconds=3)
    sched.start()

    while(True):
        print('main 1s')
        time.sleep(1)

我们可看到如下运行结果
在这里插入图片描述

每个job是怎么被调度的

通过上面的例子,我们发现,调度器是定时调度job()函数,来实现调度的。
那job()函数会被以进程的方式调度运行,还是以线程来运行呢?
为了弄清这个问题,我们写了如下程序:

from apscheduler.schedulers.background import BackgroundScheduler
import time,os,threading

def job():
    print('job thread_id-{0}, process_id-{1}'.format(threading.get_ident(), os.getpid()))
    time.sleep(50)

if __name__=='__main__':
    job_defaults = { 'max_instances': 20 }
    sched = BackgroundScheduler(timezone='MST', job_defaults=job_defaults)
    sched.add_job(job, 'interval', id='3_second_job', seconds=3)
    sched.start()

    while(True):
        print('main 1s')
        time.sleep(1)

我们可看到如下运行结果
在这里插入图片描述
可见,每个job()的进程ID都相同,但线程ID不同。所以,job()最终是以线程的方式被调度执行

BlockingScheduler定时任务及其他方式的实现

#BlockingScheduler定时任务
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
首先看看周一到周五定时执行任务
# 输出时间
def job():
  print(datetime.now().strtime("%Y-%m-%d %H:%M:%S"))
# BlockingScheduler
scheduler = BlockingScheduler()
scheduler.add_job(job, "cron", day_of_week="1-5", hour=6, minute=30)
schduler.start()
 
 
scheduler.add_job(job, 'cron', hour=1, minute=5)
hour =19 , minute =23  这里表示每天的1923 分执行任务
hour ='19', minute ='23'  这里可以填写数字,也可以填写字符串
hour ='19-21', minute= '23'  表示 19:2320:2321:23 各执行一次任务
 
#每300秒执行一次
scheduler .add_job(job, 'interval', seconds=300)
 
#在1月,3月,5月,7-9月,每天的下午2点,每一分钟执行一次任务
scheduler .add_job(func=job, trigger='cron', month='1,3,5,7-9', day='*', hour='14', minute='*')
 
# 当前任务会在 6、7、8、11、12 月的第三个周五的 0、1、2、3 点执行
scheduler .add_job(job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
 
#从开始时间到结束时间,每隔俩小时运行一次
scheduler .add_job(job, 'interval', hours=2, start_date='2018-01-10 09:30:00', end_date='2018-06-15 11:00:00')
 
#自制定时器
 from datetime import datetime
 import time
 # 每n秒执行一次
 def timer(n):
   while True:
     print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
     time.sleep(n)
 
timer(5)

BackgroundScheduler定时任务及其他方式的实现

启动异步定时任务
 import time
 from apscheduler.schedulers.background import BackgroundScheduler
 from django_apscheduler.jobstores import DjangoJobStore, register_events, register_job
 try: 
    # 实例化调度器
    scheduler = BackgroundScheduler()
    # 调度器使用DjangoJobStore()
    scheduler.add_jobstore(DjangoJobStore(), "default")
    # 'cron'方式循环,周一到周五,每天9:30:10执行,id为工作ID作为标记
    # ('scheduler',"interval", seconds=1) #用interval方式循环,每一秒执行一次
    @register_job(scheduler, 'cron', day_of_week='mon-fri', hour='9', minute='30', second='10',id='task_time')
    def test_job():
      t_now = time.localtime()
      print(t_now)
  
   # 监控任务
   register_events(scheduler)
   # 调度器开始
   scheduler.start()
except Exception as e:
  print(e)
  # 报错则调度器停止执行
  scheduler.shutdown()

参考链接:https://www.cnblogs.com/guojie-guojie/p/16330165.html
以上就是python- 定时任务框架【APScheduler】基本使用介绍,希望对你有所帮助!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/706432.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Android Framework系列】第4章 PMS原理

1 PMS简介 PMS(PackageManagerService)是Android提供的包管理系统服务,它用来管理所有的包信息,包括应用安装、卸载、更新以及解析AndroidManifest.xml。通过解析每个安装应用的AndroidManifest.xml,将xml中的数据全部…

Acwing.846 数的重心(DFS)

题目 给定一颗树,树中包含n个结点(编号1~n)和n-1条无向边。 请你找到树的重心,并输出将重心删除后,剩余各个连通块中点数的最大值。 重心定义:重心是指树中的一个结点,如果将这个点删除后,剩余各个连通块中…

Java项目实战——Linux入门

文章目录 一、Linux安装1.1、安装方式介绍1.2、网卡设置1.3、安装SSH连接工具1.4、Linux和windows目录结构对比1.5、Linux目录结构 2、Linux常用命令2.1、Linux命令初体验2.2、使用技巧2.3、命令格式2.4、文件目录操作命令文件目录操作命令ls小知识 文件目录操作命令cat文件目录…

数据倾斜排查

一、问题现象 租户反馈,任务执行时长加长,执行过程中任务卡在 99%,大概率是出现了数据倾斜 二、排查过程 数据倾斜大多数都是大 key 问题导致的。排查方法如下: 1.时间判断 reduce 的时间比其他 reduce 时间长的多,大…

基于STM32的户外环境监测系统的设计

目录 1 引言 1.1 本课题的研究意义 1.2 本课题的研究现状 1.3本课题的发展趋势和研究可行性 1.4本课题主要研究工作 2 系统的概述和相关原理 2.1 系统的概述 2.1.1 总体设计的方案 2.1.2 总体框图 2.2 相关理论 2.2.1 STM32平台 2.2.2 WIFI模块 3 硬件电路设计 8 3…

解决页面等比缩放问题

近些年可视化数据大屏技术早已成熟,在市场上相关技术也是五花八门;通常情况是自行开发,要不找技术比较成熟大厂定制,或者使用较成熟的低代码平台实现。 技术门槛比较低,不过在数据大屏项目实施过程中会发现&#xff0c…

《移动互联网技术》第一章 概述: 掌握移动互联网的基本概念和组成

🌷🍁 博主 libin9iOak带您 Go to New World.✨🍁 🦄 个人主页——libin9iOak的博客🎐 🐳 《面试题大全》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~&#x1f33…

密码找回安全总结-业务安全测试实操(28)

撞库攻击 撞库是黑客通过收集互联网已泄露的用户和密码信息,生成对应的字典表,尝试批量登录其他网站后,得到一系列可以登录的用户名和密码组合。由于很多用户在不同网站使用的是相同的账号和密码,因此黑客可以通过获取用户在 A 网站的账户从而尝试登录B网站,这就可以理解为…

Linux--时间相关的指令:date、cal

一、data显示 date 指定格式显示时间: date %Y:%m:%d date 用法: date [OPTION]... [FORMAT] 1.在显示方面,使用者可以设定欲显示的格式,格式设定为一个加号后接数个标记,其中常用的标记列表如下 %H : 小时(00..2…

threejs动画

个人博客地址: https://cxx001.gitee.io 前面我们所用的模型大都是静态的,没有动画,没有生命。这节我们将赋予它们生命。 动画本质是通过改变物体的旋转、缩放、位置、材质、顶点、面以及其它你所能想到的属性来实现的。这些其实在前面章节示例里或多或…

git 版本控制从入门到精通

文章目录 1、git安装1.1、Linux安装1.2、Windows安装1.3、MAC安装 2、配置git3、git命令使用4、git远程服务器5、提交到远端服务器6、commit合并7、创建分支8、命令练习记录 1、git安装 1.1、Linux安装 在linux上我们建议你用二进制的方式来安装git,可以使用发行版…

electron报错Error: Object has been destroyed

问题描述 在 Electron 中,当一个窗口被销毁后,与该窗口相关联的 JavaScript 对象也会被销毁,再次访问已被销毁的窗口对象时,会导致 Error: Object has been destroyed 错误。 例如之前在写多窗口pinia状态同步 / 多窗口样式同步的…

Redis【实战篇】---- 分布式锁

Redis【实战篇】---- 分布式锁 1. 基本原理和实现方式对比2. Redis分布式锁的实现核心思路3. 实现分布式锁版本一4. Redis分布式锁误删情况说明5. 解决Redis分布式锁误删问题6. 分布式锁的原子性问题7. Lua脚本解决多条命令原子性问题8. 利用Java代码调试Lua脚本改造分布式锁 1…

Python 利用深度学习识别空间推理验证码(一)

注意:本文会比较长,因为空间推理验证码本身比较复杂,我会详细的讲解,我是如何一步一步拆分空间推理的思想去实现的,另外,这里只介绍第一种思想来解决空间推理验证码,实际上,解决该验证码的方法也比较多,这第一种,我会讲解的比较简单,通俗易懂。 注意:下面数据集使用…

Redis主从/哨兵机制原理介绍

目录 ​编辑 一、主从复制 1.1 什么是主从复制 1.2 主从复制的作用 1.3 主从复制原理 1.3.1 全量复制 1.3.2 增量复制 1.3.3 同步流程 二、哨兵机制 2.1 哨兵机制介绍 2.1.1 集群逻辑图 2.1.2 哨兵机制实现的功能 2.2 哨兵机制原理 2.2.1 监控 2.2.2 下线 2.2.2.1 下线流程 2.…

C# csc构建dll 和 csc构建时指定dll

新建一个mydll.cs; using System; using System.Collections.Generic; using System.Linq; using System.Text;namespace myDLL {public class MyMath{public int add(int x, int y){return x y;}public int sub(int x, int y){return x - y;}} } 用下图命令构建…

MySQL高可用

MySQL高可用 一、高可用 1.什么是MHA MHA(MasterHigh Availability)是一套优秀的MySQL高可用环境下故障切换和主从复制的软件。 MHA 的出现就是解决MySQL 单点的问题。 MySQL故障切换过程中,MHA能做到0-30秒内自动完成故障切换操作。 MHA能…

LeetCode·每日一题·2490. 回环句·模拟

作者:小迅 链接:https://leetcode.cn/problems/circular-sentence/solutions/2325227/mo-ni-zhu-shi-chao-ji-xiang-xi-by-xun-ge-x65e/ 来源:力扣(LeetCode) 著作权归作者所有。商业转载请联系作者获得授权&#xff0…

fusionpbx简介

概述 fusionpbx是以freeswitch作为底层框架开发而成的开源PBX,在freeswitch的基础上,优化了GUI的易用性。 fusionpbx可用作高可用性的单租户或基于域的多租户 PBX、运营商级交换机、呼叫中心服务器、传真服务器、voip服务器、语音邮件服务器、会议服务…

admin配置k8s

系列文章目录 文章目录 系列文章目录一、实验1.实验要求2.3. 所有节点安装docker4.所有节点安装kubeadm,kubelet和kubectl5.部署K8S集群6./所有节点部署网络插件flannel7./在master节点查看节点状态 总结 一、实验 1.实验要求 master(2C/4G&#xff0c…