python中实现定时任务的几种方案

news2025/1/8 19:54:02

目录

  • while True: + sleep()
  • Timeloop库
  • threading.Timer
  • sched模块
  • schedule模块
  • APScheduler框架
  • Celery框架
  • 数据流工具Apache Airflow
    • 概述
    • Airflow 核心概念
    • Airflow 的架构

总结以下几种方案实现定时任务,可根据不同需求去使用不同方案。

while True: + sleep()

利用while True的死循环,加上 sleep()函数让其暂停一段时间,达到每隔一段时间执行特定任务的目的。

比较简单,例子如下:

import datetime
import time


def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    
def loop_monitor():
    while True:
        time_printer()
        time.sleep(5)


if __name__ == "__main__":
    loop_monitor()

主要缺点:

  • 只能设定间隔,不能指定具体的时间
  • sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。

Timeloop库

Timeloop是一个库,可用于运行多周期任务。

Timeloop内部维护了一个任务列表jobs,用来管理所有任务。

可以使用装饰器标记任务,这样就会把任务加到任务列表jobs中,使用start方法启动任务列表重的所有任务。

示例如下:

import time
from timeloop import Timeloop
from datetime import timedelta


tl = Timeloop()


@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():
    print("2s job current time : {}".format(time.ctime()))


if __name__ == "__main__":
    tl.start(block=True)

运行后打印如下:

[2023-10-02 09:48:41,926] [timeloop] [INFO] Starting Timeloop..
[2023-10-02 09:48:41,926] [timeloop] [INFO] Registered job <function sample_job_every_2s at 0x7fc3d022d0d0>
[2023-10-02 09:48:41,926] [timeloop] [INFO] Timeloop now started. Jobs will run based on the interval set
2s job current time : Mon Oct  2 09:48:43 2023
2s job current time : Mon Oct  2 09:48:45 2023
2s job current time : Mon Oct  2 09:48:47 2023

同时Timeloop还有个stop方法,可以用来暂停所有任务。

threading.Timer

threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。

主要有如下方法:

方法说明
Timer(interval, function, args=None, kwargs=None)创建定时器
cancel()取消定时器
start()使用线程方式执行
join(self, timeout=None)主线程等待线程执行结束

示例:

import datetime

from threading import Timer


def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    # 注意 Timer 只能执行一次,这里需要循环调用,否则只能执行一次
    loop_monitor()


def loop_monitor():
    t = Timer(5, time_printer)
    t.start()


if __name__ == "__main__":
    loop_monitor()

sched模块

sched模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。

class sched.scheduler(timefunc, delayfunc)这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc是一个没有参数的返回时间类型数字的函数(常用使用的如time模块里面的time),delayfunc应该是一个需要一个参数来调用、与timefunc的输出兼容、并且作用为延迟多个时间单位的函数(常用的如time模块的sleep)。

import datetime
import time
import sched

def time_printer():
    now = datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print('do func time :', ts)
    loop_monitor()
    
def loop_monitor():
    s = sched.scheduler(time.time, time.sleep)  # 生成调度器
    s.enter(5, 1, time_printer, ())
    s.run()

if __name__ == "__main__":
    loop_monitor()

scheduler对象主要方法:

enter(delay, priority, action, argument),安排一个事件来延迟delay个时间单位。
cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个ValueError。
run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的delayfunc()函数),然后执行事件,直到不再有预定的事件。
比threading.Timer更好,不需要循环调用。

schedule模块

schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。schedule允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。

示例:

import schedule
import time


def job():
    print("I'm working...")
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)


while True:
    schedule.run_pending()
    time.sleep(1)

也可以通过 @repeat() 装饰静态方法:

import time
from schedule import every, repeat, run_pending


@repeat(every().second)
def job():
    print('working...')


while True:
    run_pending()
    time.sleep(1)

传递参数:

import schedule


def greet(name):
    print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')


while True:
    schedule.run_pending()

装饰器同样能传递参数:

from schedule import every, repeat, run_pending


@repeat(every().second, 'World')
@repeat(every().minute, 'Mars')
def hello(planet):
    print('Hello', planet)


while True:
    run_pending()

取消任务:

import schedule

i = 0
def some_task():
    global i
    i += 1
    print(i)
    if i == 10:
        schedule.cancel_job(job)
        print('cancel job')
        exit(0)
job = schedule.every().second.do(some_task)

while True:
    schedule.run_pending()

运行一次任务:

import time
import schedule


def job_that_executes_once():
    print('Hello')
    return schedule.CancelJob


schedule.every().minute.at(':34').do(job_that_executes_once)
while True:
    schedule.run_pending()
    time.sleep(1)

根据标签检索任务:

# 检索所有任务:schedule.get_jobs()
import schedule


def greet(name):
    print('Hello {}'.format(name))
    
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

friends = schedule.get_jobs('friend')
print(friends)

根据标签取消任务:

# 取消所有任务:schedule.clear()
import schedule


def greet(name):
    print('Hello {}'.format(name))
    if name == 'Cancel':
        schedule.clear('second-tasks')
        print('cancel second-tasks')
        
schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')

while True:
    schedule.run_pending()

运行任务到某时间:

import schedule
from datetime import datetime, timedelta, time


def job():
    print('working...')
    
    
schedule.every().second.until('23:59').do(job)  # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job)  # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job)  # 8小时后停止
schedule.every().second.until(time(23, 59, 59)).do(job)  # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job)  # 2030-01-01 18:30停止


while True:
    schedule.run_pending()

马上运行所有任务(主要用于测试):

import schedule


def job():
    print('working...')

def job1():
    print('Hello...')


schedule.every().monday.at('12:40').do(job)
schedule.every().tuesday.at('16:40').do(job1)
schedule.run_all()
schedule.run_all(delay_seconds=3)  # 任务间延迟3秒

并行运行:使用 Python 内置队列实现:

import threading
import time
import schedule


def job1():
    print("I'm running on thread %s" % threading.current_thread())
    
def job2():
    print("I'm running on thread %s" % threading.current_thread())
    
def job3():
    print("I'm running on thread %s" % threading.current_thread())
    
def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()


schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)

while True:
    schedule.run_pending()
    time.sleep(1)

APScheduler框架

APScheduler(advanceded python scheduler)基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个Python定时任务系统。

具体使用参考文章:APScheduler框架使用

Celery框架

Celery是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具, 也可用于任务调度。Celery 的配置比较麻烦,如果你只是需要一个轻量级的调度工具,Celery 不会是一个好选择。

Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。通常使用它来实现异步任务(async task)和定时任务(crontab)。异步任务比如是发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作 ,定时任务是需要在特定时间执行的任务。

具体使用参考:

Celery使用:优秀的python异步任务框架

Django(21):使用Celery任务框架

数据流工具Apache Airflow

概述

Apache Airflow 是Airbnb开源的一款数据流程工具,目前是Apache孵化项目。以非常灵活的方式来支持数据的ETL过程,同时还支持非常多的插件来完成诸如HDFS监控、邮件通知等功能。Airflow支持单机和分布式两种模式,支持Master-Slave模式,支持Mesos等资源调度,有非常好的扩展性。被大量公司采用。

Airflow使用Python开发,它通过DAGs(Directed Acyclic Graph, 有向无环图)来表达一个工作流中所要执行的任务,以及任务之间的关系和依赖。比如,如下的工作流中,任务T1执行完成,T2和T3才能开始执行,T2和T3都执行完成,T4才能开始执行。

在这里插入图片描述
Airflow提供了各种Operator实现,可以完成各种任务实现:

  • BashOperator – 执行 bash 命令或脚本。
  • SSHOperator – 执行远程 bash 命令或脚本(原理同 paramiko 模块)。
  • PythonOperator – 执行 Python 函数。
  • EmailOperator – 发送 Email。
  • HTTPOperator – 发送一个 HTTP 请求。
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,执行 SQL 任务。
  • DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…

除了以上这些 Operators 还可以方便的自定义 Operators 满足个性化的任务需求。

一些情况下,我们需要根据执行结果执行不同的任务,这样工作流会产生分支。如:

在这里插入图片描述

这种需求可以使用BranchPythonOperator来实现。

Airflow 核心概念

  1. DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行顺序。
  2. Operators:可以简单理解为一个class,描述了DAG中某个的task具体要做的事。其中,airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令等等,同时,用户可以自定义Operator,这给用户提供了极大的便利性。
  3. Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node。
  4. Task Instance:task的一次运行。Web 界面中可以看到task instance 有自己的状态,包括”running”, “success”, “failed”, “skipped”, “up for retry”等。
  5. Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 Task1 >> Task2,表明Task2依赖于Task2了。通过将DAGs和Operators结合起来,用户就可以创建各种复杂的 工作流(workflow)。

Airflow 的架构

在一个可扩展的生产环境中,Airflow 含有以下组件:

  1. 元数据库:这个数据库存储有关任务状态的信息。
  2. 调度器:Scheduler 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。调度器通常作为服务运行。
  3. 执行器:Executor 是一个消息队列进程,它被绑定到调度器中,用于确定实际执行每个任务计划的工作进程。有不同类型的执行器,每个执行器都使用一个指定工作进程的类来执行任务。例如,LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务。
  4. Workers:这些是实际执行任务逻辑的进程,由正在使用的执行器确定。

在这里插入图片描述
Worker的具体实现由配置文件中的executor来指定,airflow支持多种Executor:

  1. SequentialExecutor: 单进程顺序执行,一般只用来测试
  2. LocalExecutor: 本地多进程执行
  3. CeleryExecutor: 使用Celery进行分布式任务调度
  4. DaskExecutor:使用Dask进行分布式任务调度
  5. KubernetesExecutor: 1.10.0新增, 创建临时POD执行每次任务

生产环境一般使用CeleryExecutor和KubernetesExecutor。

使用CeleryExecutor的架构如图:

在这里插入图片描述
使用KubernetesExecutor的架构如图:

在这里插入图片描述

参考:

https://mp.weixin.qq.com/s/dzA9xGoho50WK_-80hzelg

https://zhuanlan.zhihu.com/p/448847300

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1059114.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【2023年11月第四版教材】第17章《干系人管理》(合集篇)

第17章《干系人管理》&#xff08;合集篇&#xff09; 1 章节内容2 管理基础3 管理过程3.1 管理的过程★★★ &#xff08;22上44&#xff09;3.2 管理ITTO汇总★★★ 4 过程1-识别干系人4.1 数据收集★★★4.3数据分析4.4 权力利益方格4.5 数据表现&#xff1a;干系人映射分析…

【数据科学】Scikit-learn[Scikit-learn、加载数据、训练集与测试集数据、创建模型、模型拟合、拟合数据与模型、评估模型性能、模型调整]

这里写目录标题 一、Scikit-learn二、加载数据三、训练集与测试集数据四、创建模型4.1 有监督学习评估器4.1.1 线性回归4.1.2 支持向量机(SVM)4.1.3 朴素贝叶斯4.1.4 KNN 4.2 无监督学习评估器4.2.1 主成分分析(PCA)4.2.2 K Means 五、模型拟合5.1 有监督学习5.2 无监督学习 六…

全新UI彩虹外链网盘系统源码(前后端美化模板)

全新UI彩虹外链网盘系统源码前后端美化模板&#xff0c;支持所有格式文件的上传、生成文件外链、图片外链、音乐视频外链等功能&#xff0c;同时还可以自动生成相应的 UBB 代码和 HTML 代码&#xff0c;支持文本、图片、音乐、视频在线预览。这不仅仅是一个网盘&#xff0c;更是…

竞赛 机器视觉 opencv 深度学习 驾驶人脸疲劳检测系统 -python

文章目录 0 前言1 课题背景2 Dlib人脸识别2.1 简介2.2 Dlib优点2.3 相关代码2.4 人脸数据库2.5 人脸录入加识别效果 3 疲劳检测算法3.1 眼睛检测算法3.2 打哈欠检测算法3.3 点头检测算法 4 PyQt54.1 简介4.2相关界面代码 5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#x…

算法题系列9·最后一个单词的长度

目录 题目描述 实现 题目描述 给你一个字符串 s&#xff0c;由若干单词组成&#xff0c;单词前后用一些空格字符隔开。返回字符串中最后一个单词的长度。 单词 是指仅由字母组成、不包含任何空格字符的最大子字符串。示例 1&#xff1a; 输入&#xff1a;s "Hello Worl…

WSL安装异常:WslRegisterDistribution failed with error: 0xc03a001a

简介&#xff1a;如果文件夹右上角是否都有两个相对的蓝色箭头&#xff0c;在进行安装wsl时&#xff0c;设置就会抛出 Installing WslRegisterDistribution failed with error: 0xc03a001a的异常 历史攻略&#xff1a; 卸载WSL WSL&#xff1a;运行Linux文件 WSL&#xff1…

关于MAC电脑无法正常登陆H3C iNodes登陆的解决办法

背景 前段时间&#xff0c;单位的网络在做升级改造&#xff0c;网络出口也进行彻底调整同时单位的网络出口设备做了机房物理迁移&#xff0c;迁移后网络正常使用&#xff0c;但是出现自己的MAC电脑无法登陆iNodes问题&#xff0c;总是出现“正在查询SSL 网关参数..查询SSL 网关…

【记录】IDA|IDA怎么查看当前二进制文件自动分析出来的内存分布情况(内存范围和读写性)

IDA版本&#xff1a;7.6 背景&#xff1a;我之前一直是直接看Text View里面的地址的首尾地址来判断内存分布情况的&#xff0c;似乎是有点不准确&#xff0c;然后才想到IDA肯定自带查看内存分布情况的功能&#xff0c;而且很简单。 可以通过View-Toolbars-Segments&#xff0c…

STM32复习笔记(四):独立看门狗IWDG

目录 &#xff08;一&#xff09;简介 &#xff08;二&#xff09;CUBEMX工程配置 &#xff08;三&#xff09;相关函数 总结&#xff1a; &#xff08;一&#xff09;简介 独立看门狗本质是一种定时器&#xff0c;其作用是监视系统的运行&#xff0c;当系统发生错误&…

linux命令行配置音频设备

linux命令行配置音频设备 TLTR在linux命令行播放音乐cmus需要开始声音条件功能才能调节播放的音量&#xff0c;看这个链接&#xff0c;继续折腾&#xff0c;have fun! TLTR 以archLinux为例&#xff0c;把下面软件都装一遍。 sudo pacman -S alsa-utils sudo pacman -S alsa-…

怎么才能实现一个链接自动识别安卓.apk苹果.ipa手机和win电脑wac电脑

您想要实现的功能是通过检测用户代理&#xff08;User Agent&#xff09;来识别访问设备类型并根据设备类型展示相应的页面。您可以根据以下步骤进行实现&#xff1a; 选择后端语言和框架&#xff0c;例如&#xff1a;Node.js、Express。 创建一个新的Express项目。 编写一个…

【单片机】16-LCD1602和12864显示器

1.LCD显示器相关背景 1.LCD简介 &#xff08;1&#xff09;显示器&#xff0c;常见显示器&#xff1a;电视&#xff0c;电脑 &#xff08;2&#xff09;LCD&#xff08;Liquid Crystal Display&#xff09;&#xff0c;液晶显示器&#xff0c;原理介绍 &#xff08;3&#xff…

Django基础讲解-路由控制器和视图(Django-02)

一 路由控制器 参考链接&#xff1a; Django源码阅读&#xff1a;路由&#xff08;二&#xff09; - 知乎 Route路由, 是一种映射关系&#xff01;路由是把客户端请求的 url路径与视图进行绑定 映射的一种关系。 这个/timer通过路由控制器最终匹配到myapp.views中的视图函数 …

[spring] spring core - 配置注入及其他内容补充

[spring] spring core - 配置注入及其他内容补充 上篇 [sping] spring core - 依赖注入 这里主要补一些 core 相关内容补充&#xff0c;同时添加了 java config bean 的方法 java config bean 是除了 XML、java 注解之外另一给实现 DI 的方法 java config bean 这个方法不…

Doctest:让你的测试更简单高效

简介&#xff1a;Doctest 是 Python 标准库的一部分&#xff0c;它允许开发者通过在文档字符串&#xff08;docstrings&#xff09;中编写示例来进行测试。这不仅可以在确保代码正确性的同时编写文档&#xff0c;还可以让读者更容易理解代码的用法和期望的输出。 历史攻略&…

(枚举 + 树上倍增)Codeforces Round 900 (Div. 3) G

Problem - G - Codeforces 题意&#xff1a; 思路&#xff1a; 首先&#xff0c;目标值和结点权值是直接联系的&#xff0c;最值不可能直接贪心&#xff0c;一定是考虑去枚举一些东西&#xff0c;依靠这种枚举可以遍历所有的有效情况&#xff0c;思考的方向一定是枚举 如果去…

云安全之等级保护介绍

网络安全部门概述 网络安全部门 1. 公安部 网安/网警/网监:全称“公共信息网络安全监察”&#xff0c;后改为“网络安全保卫部门”。 简称网监&#xff0c;是中华人民共和国公安部门的一项职责&#xff0c;具体实施这一职责的机构称为网监机关或网监部门(公共信息网络安全监…

kubernetes集群kubeadm方式安装

测试网络和DNS解析 kubectl run busybox --image busybox:1.28 --restart=Never --rm -it busybox -- sh ping www.baidu,com nslookup kubernetes.default.svc.cluster.localbusybox要用指定的1.28版本,不能用最新版本,最新版本,nslookup会解析不到dns和ip 延长kubernetes…

2023腾讯云轻量应用服务器和普通服务器有什么区别?

腾讯云轻量服务器和云服务器有什么区别&#xff1f;为什么轻量应用服务器价格便宜&#xff1f;是因为轻量服务器CPU内存性能比云服务器CVM性能差吗&#xff1f;轻量应用服务器适合中小企业或个人开发者搭建企业官网、博客论坛、微信小程序或开发测试环境&#xff0c;云服务器CV…

力扣 -- 279. 完全平方数(完全背包问题)

解题步骤&#xff1a; 参考代码&#xff1a; 未优化代码&#xff1a; class Solution { public:int numSquares(int n) {const int INF0x3f3f3f3f;int msqrt(n);//多开一行&#xff0c;多开一列vector<vector<int>> dp(m1,vector<int>(n1));//初始化第一行…