系列文章目录
【Django开发入门】ORM的增删改查和批量操作
【Django功能开发】编写自定义manage命令
文章目录
- 系列文章目录
- 前言
- 一、django定时任务
- 二、django-apscheduler基本使用
- 1.安装django-apscheduler
- 2.配置settings.py的INSTALLED_APPS
- 3.通过命令生成定时记录表
- 3.如何创建自定义manage命令
- 4.创建runapscheduler.py文件
- 5.修改任务,上一个定时将被清空
- 6.系统中启动和停止定时
- 7.如何记录进程id
- 8.对自定义manage命令添加参数,控制定时任务的开启和停止
- 二、django-apscheduler参数应用
- 1.jobstore作业存储
- 2.scheduler调度器
- 3.executor执行器
- 4.trigger触发器
- 5.job任务管理
- 第一种方式:调度器条用add_job方法
- 第二种方式:装饰器执行
- 6.监听定时任务
- 总结
前言
顾名思义,定时任务是会在后台一直运行的,但是在使用过程中,我们还是会更新程序的,尤其是在刚开始写定时任务时,会频繁的启动定时程序,但是这样可能有问题的,每次启动程序都是打开了新的定时进程,前边的定时程序可能没有关闭,还在默默的帮你完成之前设定的工作(需要看实际情况)。
网络上虽然有很多教程,其实使用方式都有些问题,真的不敢直接用。这里整理了我自己在django框架中使用定时程序的一点心得,如有需要可自取。
提示:以下是本篇文章正文内容,下面案例可供参考
一、django定时任务
在百度上随便一搜 django定时任务,有很多库都实现了定时任务,比如django-crontab、Celery、django-apscheduler(apscheduler)、time自定义等这些python库,如果在windows系统还可以使用系统自带的定时任务,linux系统自带的crontab任务。
上述这些都可以实现定时任务,在开发中也可能都应用过,本篇博客介绍的是在django框架下开发定时功能,我这里推荐使用的是django-apscheduler,适用于中小系统定时任务功能。
定时任务 | 描述 |
---|---|
crontab | 适用于linux系统 |
django-crontab | 适用于linux系统,基于crontab |
Celery | 适用于linux系统,任务队列,大型项目 |
windows定时 | 适用于windows系统 |
time | 适用于多种系统 |
django-apscheduler | 适用于多种系统,中小型项目 |
二、django-apscheduler基本使用
1.安装django-apscheduler
代码如下(示例):
pip install django-apscheduler
2.配置settings.py的INSTALLED_APPS
代码如下(示例):
INSTALLED_APPS = (
# ...
"django_apscheduler",
)
3.通过命令生成定时记录表
使用python的manage命令,我们应该cd到项目目录下,就是manage.py文件所在的目录。
我们应该使用python manage.py makemigrations 和 python manage.py migrate同步数据库,数据库中将生成2个表,django_apscheduler_djangojob和django_apscheduler_djangojobexecution。
3.如何创建自定义manage命令
自定义命令整理中
4.创建runapscheduler.py文件
runapscheduler.py正是通过自定义创建manange命令的py文件,可以通过python manage.py runapscheduler 启动定时程序。
import logging
from django.conf import settings
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import util
logger = logging.getLogger(__name__)
def my_job():
# Your job processing logic here...
print(123)
pass
def delete_old_job_executions(max_age=604_800):
"""
此作业从数据库中删除早于“max_age”的APScheduler作业执行条目。
它有助于防止数据库中塞满不再有用的旧历史记录。
最长7天
"""
DjangoJobExecution.objects.delete_old_job_executions(max_age)
class Command(BaseCommand):
help = "Runs APScheduler."
def handle(self, *args, **options):
scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
scheduler.add_jobstore(DjangoJobStore(), "default")
scheduler.add_job(
my_job,
trigger=CronTrigger(second="*/10"), # Every 10 seconds
id="my_job", # The `id` assigned to each job MUST be unique
max_instances=1,
replace_existing=True,
)
logger.info("Added job 'my_job'.")
scheduler.add_job(
delete_old_job_executions,
trigger=CronTrigger(
day_of_week="mon", hour="00", minute="00"
), # Midnight on Monday, before start of the next work week.
id="delete_old_job_executions",
max_instances=1,
replace_existing=True,
)
logger.info(
"Added weekly job: 'delete_old_job_executions'."
)
try:
logger.info("Starting scheduler...")
scheduler.start()
except KeyboardInterrupt:
logger.info("Stopping scheduler...")
scheduler.shutdown()
logger.info("Scheduler shut down successfully!")
1234这4步是django-apscheduler官网的使用步骤,经过测试,定时程序已经正常运行了。
现在,数据库中已经有了相关定时任务的记录。
5.修改任务,上一个定时将被清空
将步骤4中的runapscheduler文件的my_job改成my_job2,再次python manage.py runapscheduler。
修改的位置
新的定时任务和数据库记录如下:
6.系统中启动和停止定时
在linux系统中,我们希望定时任务后台运行,需要执行python manage.py runapscheduler &。
其实,通过python manage.py命令启动的程序,每次都是开启了一个新的进程,进程中开启一个线程去处理定时任务。这样启动的定时任务跟django的web服务是分开,相互之间影像很低。很多博客将定时卸载views.py文件开始位置,定时程序和web服务是在一个进程上,会出现阻塞的情况。
因为某些原因,我们想关闭定时程序,在命令行输入ps -aux | grep python可以查看通过python运行的程序,并且可以查看是哪一个py文件,我的自定义命令文件是start_crontab.py,这里就找start_crontab.py,在通过kill 命令停止该进程。
ps -aux | grep python
# 下图的红框就是我们的启动命令,蓝框是进程id列,725066就是我们定时任务进程id
kill 725066
7.如何记录进程id
其实无论实在linux还是在windows,我们是可以记录python manage.py runapscheduler任务的进程id的,下边代码会生成一个crontab_pid.txt文件,记录进程id,方便下一次处理和操作。
import os
from apscheduler.schedulers.blocking import BlockingScheduler
from django.core.management import BaseCommand
from feishuapi.tasks import mission_6min
from tianyiapi.settings import BASE_DIR
class Command(BaseCommand):
# 帮助文本, 一般备注命令的用途及如何使用。
help = "更新已有任务的状态"
# 处理命令行参数,可选
def add_arguments(self, parser):
pass
def kill(self, pid):
# 本函数用于中止传入pid所对应的进程
if os.name == 'nt':
# Windows系统
cmd = 'taskkill /pid ' + str(pid) + ' /f'
try:
os.system(cmd)
print(pid, 'killed')
except Exception as e:
print(e)
elif os.name == 'posix':
# Linux系统
cmd = 'kill ' + str(pid)
try:
os.system(cmd)
print(pid, 'killed')
except Exception as e:
print(e)
else:
print('Undefined os.name')
# 核心业务逻辑
def handle(self, *args, **options):
# 1.读取上次保存的pid
file = os.path.join(BASE_DIR, 'crontab_pid.txt')
if os.path.isfile(file):
with open(file, 'r') as f:
pid = f.read()
# 2.如果存在杀死上一次的进程
# print('上一次进程', pid)
if pid:
# 调用kill函数,终止进程
self.kill(pid=pid)
with open(file, 'w+') as f:
# 3.获取当前进程的pid
pid = os.getpid()
# print('当前进程的pid: ', pid)
f.write(pid.__str__())
try:
# 创建调度器BlockingScheduler()
scheduler = BlockingScheduler()
# 添加任务,时间间隔为6分钟
scheduler.add_job(mission_6min, 'interval', minutes=6, id='test_job1')
scheduler.start()
except Exception as e:
print(e)
print('定时任务已启动')
8.对自定义manage命令添加参数,控制定时任务的开启和停止
这个目前还没做,有时间再补上。
二、django-apscheduler参数应用
django-apscheduler和apscheduler一样,我们使用的过程中需要根据不同的场景调整定时任务。
1.jobstore作业存储
scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
scheduler.add_jobstore(DjangoJobStore(), "default")
2.scheduler调度器
BlockingScheduler : 当调度器是你应用中唯一要运行的东西时
BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用。
AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用。
GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用。
TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用。
TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用
QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
scheduler.start() # 此处程序不会发生阻塞
scheduler = BlockingScheduler()
scheduler.start() # 此处程序会发生阻塞
3.executor执行器
通常都是使用默认值,特殊需求可以考虑。
# 方式1: 线程
from apscheduler.executors.pool import ThreadPoolExecutor
executors = {
'default': ThreadPoolExecutor(20) # 最多20个线程同时执行
}
scheduler = BackgroundScheduler(executors=executors)
# 方式2: 进程
from apscheduler.executors.pool import ProcessPoolExecutor
executors = {
'default': ProcessPoolExecutor(3) # 最多3个进程同时运行
}
scheduler = BackgroundScheduler(executors=executors)
4.trigger触发器
APScheduler 有三种内建的 trigger:
date: 特定的时间点触发
interval: 固定时间间隔触发
cron: 在特定时间周期性地触发
from datetime import date
# 在2020年11月11日00:00:00执行
sched.add_job(my_job, 'date', run_date=date(2020, 11, 11))
# 在2020年11月1日16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2020, 11, 11, 16, 30, 5))
sched.add_job(my_job, 'date', run_date='2009-11-06 16:30:05')
# 立即执行
sched.add_job(my_job, 'date')
sched.start()
from datetime import datetime
# 时间间隔可选seconds 、minutes、hours、days、weeks、start_date、end_date 、timezone
# 每两小时执行一次
sched.add_job(job_function, 'interval', hours=2)
# 在2010年10月10日09:30:00 到2014年6月15日的时间内,每两小时执行一次
sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
# 可选周期second、minute、hour、day、day_of_week、month、year、start_date、end_date、timezone
# 在6、7、8、11、12月的第三个周五的00:00, 01:00, 02:00和03:00 执行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 在2014年5月30日前的周一到周五的5:30执行
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
5.job任务管理
通常添加job有2中方式。
第一种方式:调度器条用add_job方法
def my_job():
# Your job processing logic here...
print(123)
pass
job = scheduler.add_job(my_job, 'interval', minutes=2) # 添加任务
第二种方式:装饰器执行
scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
scheduler.add_jobstore(DjangoJobStore(), "default")
@register_job(scheduler, "interval", seconds=1)
def dummy_job():
print("I'm a job!")
scheduler.start()
涉及到具体定时任务,可以通过id参数进行管理。
# 方式1: 通过对象
job = scheduler.add_job(myfunc, 'interval', minutes=2) # 添加任务
job.remove() # 移除任务
job.pause() # 暂停任务
job.resume() # 恢复任务
# 方式2: 通过任务id
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') # 添加任务
scheduler.remove_job('my_job_id') # 移除任务
scheduler.pause_job('my_job_id') # 暂停任务
scheduler.resume_job('my_job_id') # 恢复任务
scheduler.shutdown() # 停止任务
6.监听定时任务
# coding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED
import datetime
import logging
logger = logging.getLogger('job')
logging.basicConfig(level=logging.INFO,
format = '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt = '%Y-%m-%d %H:%M:%S',
filename = 'mylog.txt',
filemode = 'a')
'''任务1'''
def my_job(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
'''任务2'''
def test_job(x):
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)
print(1 / 0)
def job_listener(Event):
job = scheduler.get_job(Event.job_id)
if not Event.exception:
print('任务正常运行!')
logger.info("jobname=%s|jobtrigger=%s|jobtime=%s|retval=%s", job.name, job.trigger,
Event.scheduled_run_time, Event.retval)
else:
print("任务出错了!!!!!")
logger.error("jobname=%s|jobtrigger=%s|errcode=%s|exception=[%s]|traceback=[%s]|scheduled_time=%s", job.name,
job.trigger, Event.code,
Event.exception, Event.traceback, Event.scheduled_run_time)
scheduler = BlockingScheduler()
scheduler.add_job(func=test_job, args=('一次性任务,会出错',),
next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15), id='date_task')
scheduler.add_job(func=my_job, args=('循环任务',), trigger='interval', seconds=3, id='interval_task')
scheduler.add_listener(job_listener, EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_EXECUTED)
scheduler._logger = logging
scheduler.start()
总结
以上就是我对定时任务的理解,在接下来的工作中可以更好的应用定时任务。