定时任务库的详解与魅力应用:探索schedule的无尽可能性

news2024/11/27 3:50:18

文章目录

  • 摘要
  • 一些示例
  • 其他的示例
    • 向任务传递参数
    • 取消任务
    • 运行一次任务
    • 获取所有任务
    • 取消所有任务
    • 按标签获取多个任务
    • 以随机间隔运行作业
    • 运行一项作业,直到某个时间
    • 距离下次执行的时间
    • 立即运行所有作业,无论其计划如何
    • 在后台运行
    • 多个调度器
  • 记录日志
    • 自定义日志记录
  • 并行执行
  • 局限性

摘要

schedule 是一个 Python 库,用于在 Python 程序中方便地安排定时任务。它提供了一种简单的方式来安排周期性任务,例如定期执行函数或方法。

在这里插入图片描述

以下是 schedule 库的基本用法和示例:

  1. 安装 schedule 库:
pip install schedule
  1. 导入 schedule 库:
import schedule
import time
  1. 定义要定期执行的任务。可以是任何可调用的对象,例如函数或方法。
def job():
    print("I'm working...")
  1. 使用 schedule.every() 方法创建定时任务,并指定任务执行的时间间隔。例如,以下代码将创建一个每隔 5 秒执行一次的任务:
schedule.every(5).seconds.do(job)

你也可以使用其他时间单位,例如 minuteshoursdays。以下是一些示例:

  • 每隔 10 分钟执行一次任务:
schedule.every(10).minutes.do(job)
  • 每隔 2 小时执行一次任务:
schedule.every(2).hours.do(job)
  • 每周一的早上 8 点执行一次任务:
schedule.every().monday.at("08:00").do(job)
  1. 调用 schedule.run_pending() 方法来启动定时任务。通常,你可以在程序的主循环中定期调用此方法,以使任务按照指定的时间间隔执行。以下是一个简单的示例:
while True:
    schedule.run_pending()
    time.sleep(1)

这个循环将每秒钟检查一次是否有定时任务需要执行,并立即执行它们。你可以根据需要调整时间间隔。

以上是 schedule 库的基本用法。它还提供了其他功能和选项,例如暂停和恢复定时任务、取消定时任务等。你可以查看官方文档以获取更多详细信息和示例。
官网:https://schedule.readthedocs.io/en/stable/examples.html
在这里插入图片描述

一些示例

官网的一些例子,我做了翻译!方便大家理解:

import schedule
import time

def job():
    print("I'm working...")

# Run job every 3 second/minute/hour/day/week,
# Starting 3 second/minute/hour/day/week from now
schedule.every(3).seconds.do(job)
schedule.every(3).minutes.do(job)
schedule.every(3).hours.do(job)
schedule.every(3).days.do(job)
schedule.every(3).weeks.do(job)

# 在每分钟的23秒执行job
schedule.every().minute.at(":23").do(job)

# Run job every hour at the 42nd minute/在每个小时的42分执行任务
schedule.every().hour.at(":42").do(job)


#每隔5小时、20分钟和30秒运行一次任务
#如果当前时间是02:00,则第一次执行将在06:20:30进行
schedule.every(5).hours.at("20:30").do(job)


#每天在特定的HH:MM和下一个HH:MM:SS运行任务
schedule.every().day.at("10:30").do(job)
schedule.every().day.at("10:30:42").do(job)
schedule.every().day.at("12:42", "Europe/Amsterdam").do(job)

#在一周的特定某一天运行任务
#在周一执行job
schedule.every().monday.do(job)
#在周三的13:15执行job
schedule.every().wednesday.at("13:15").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

也可以使用装饰器来安排任务
使用@repeat来安排函数。通过与上面相同的语法传递一个间隔,同时省略.do()。代码如下:

from schedule import every, repeat, run_pending
import time

@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)

注:@repeat装饰器不能用于非静态类方法。

其他的示例

向任务传递参数

.do()方法可以将额外的参数传递给任务函数,代码如下:

import schedule

def greet(name):
    print('Hello', name)

schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

装饰器的写法

from schedule import every, repeat

@repeat(every().second, "World")
@repeat(every().day, "Mars")
def hello(planet):
    print("Hello", planet)

取消任务

要从调度器中删除任务,请使用schedule.cancel_job(job)方法,代码如下:

import schedule

def some_task():
    print('Hello world')

job = schedule.every().day.at('22:30').do(some_task)
schedule.cancel_job(job)

定义了一个任务(函数some_task),该任务会打印出"Hello world"。然后,它使用schedule.every().day.at('22:30').do(some_task)将这个任务安排在每天的22:30执行。最后,它使用schedule.cancel_job(job)取消了这个任务。

运行一次任务

从任务返回schedule.CancelJob以将其从调度器中移除。

import schedule
import time

def job_that_executes_once():
    # Do some work that only needs to happen once...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)

这段代码的目的是安排一个任务在每天的22:30执行一次,然后从调度器中取消。

下面是代码的详细解释:

  1. 导入scheduletime模块,以便使用定时任务和时间相关的功能。
  2. 定义一个名为job_that_executes_once的函数,它执行一些只需要执行一次的工作,并返回schedule.CancelJob
  3. 使用schedule.every().day.at('22:30').do(job_that_executes_once)来安排任务。这将在每天的22:30执行job_that_executes_once函数。
  4. 进入一个无限循环,每次循环都检查是否有需要执行的任务,通过调用schedule.run_pending()来实现。
  5. 在每次循环后,使用time.sleep(1)来暂停1秒钟,然后再次检查是否有任务需要执行。

这样,代码将保持运行状态,并在每天的22:30执行一次job_that_executes_once函数,然后取消该任务。

获取所有任务

要从调度器检索所有任务,请使用schedule.get_jobs(),代码如下;

import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)

all_jobs = schedule.get_jobs()

取消所有任务

要从调度器中删除所有任务,请使用schedule.clear()

import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().second.do(greet)

schedule.clear()

按标签获取多个任务

您可以从调度器检索一组任务,以唯一标识符进行筛选。

import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

friends = schedule.get_jobs('friend')

定义了一个名为greet的函数,该函数接收一个name参数并打印出问候语。

接下来,代码使用schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')和类似的语句来安排定时任务。它们分别在每天和每小时执行greet函数,并传递不同的名字作为参数。这些任务还使用tag()方法添加了标签,以便后续筛选。

最后,代码使用schedule.get_jobs('friend')来从调度器中检索具有friend标签的所有任务。

以随机间隔运行作业

def my_job():
    print('Foo')

# Run every 5 to 10 seconds.
schedule.every(5).to(10).seconds.do(my_job)

使用schedule.every(5).to(10).seconds.do(my_job)来安排定时任务。这个语句表示每隔5到10秒执行一次my_job函数。

运行一项作业,直到某个时间

import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# run job until a 18:30 today
schedule.every(1).hours.until("18:30").do(job)
# run job until a 2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)
# Schedule a job to run for the next 8 hours
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# Run my_job until today 11:33:42
schedule.every(1).hours.until(time(11, 33, 42)).do(job)

# run job until a specific datetime
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)

距离下次执行的时间

使用schedule. idle_seconds ()来获取到下一个任务被计划运行的秒数。如果下一个计划作业已安排在过去运行,则返回值为负。如果未计划任何作业,则返回None。

import schedule
import time

def job():
    print('Hello')

schedule.every(5).seconds.do(job)

while 1:
    n = schedule.idle_seconds()
    if n is None:
        # no more jobs
        break
    elif n > 0:
        # sleep exactly the right amount of time
        time.sleep(n)
    schedule.run_pending()

立即运行所有作业,无论其计划如何

要运行所有作业,无论它们是否计划运行,请使用schedule.run_all()。在作业完成后,它们会被重新安排,就像使用run_pending()执行它们一样。

import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# Add the delay_seconds argument to run the jobs with a number
# of seconds delay in between.
schedule.run_all(delay_seconds=10)

导入了schedule库,这是一个Python库,可以用来安排周期性的任务。

然后,定义了两个函数job_1job_2。这两个函数都只是简单地打印出字符串'Foo''Bar'

接下来,您使用schedule.every().monday.at("12:40").do(job_1)schedule.every().tuesday.at("16:40").do(job_2)来安排周期性的任务。这表示job_1将在每个周一的12:40执行,而job_2将在每个周二的16:40执行。

使用schedule.run_all()来运行所有已安排的任务。这意味着所有已安排的job_1job_2都将立即执行。

最后一行代码schedule.run_all(delay_seconds=10),这个函数会等待指定的时间(这里是10秒)后再执行任务。

在后台运行

不能直接在后台运行,可以通过创建一个线程实现,代码如下:

import threading
import time

import schedule

def run_continuously(interval=1):
    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):
        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                schedule.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.start()
    return cease_continuous_run


def background_job():
    print('Hello from the background thread')


schedule.every().second.do(background_job)

# Start the background thread
stop_run_continuously = run_continuously()

# Do some other things...
time.sleep(10)

# Stop the background thread
stop_run_continuously.set()

多个调度器

您可以根据需要从单个调度器运行任意多的作业。但是,对于较大的安装,可能希望使用多个调度器。代码如下:

import time
import schedule

def fooJob():
    print("Foo")

def barJob():
    print("Bar")

# Create a new scheduler
scheduler1 = schedule.Scheduler()

# Add jobs to the created scheduler
scheduler1.every().hour.do(fooJob)
scheduler1.every().hour.do(barJob)

# Create as many schedulers as you need
scheduler2 = schedule.Scheduler()
scheduler2.every().second.do(fooJob)
scheduler2.every().second.do(barJob)

while True:
    # run_pending needs to be called on every scheduler
    scheduler1.run_pending()
    scheduler2.run_pending()
    time.sleep(1)

记录日志

将消息记录到Python名为schedule的记录器,其级别为DEBUG。要接收来自Schedule的日志,请将记录级别设置为DEBUG。

import schedule
import logging

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)

schedule.run_all()

schedule.clear()

自定义日志记录

向任务添加可重用日志记录的最简单方法是实现一个处理日志记录的装饰器。作为一个例子,下面的代码添加了print_elapsed_time装饰器:

import functools
import time
import schedule

# This decorator can be applied to any job function to log the elapsed time of each job
def print_elapsed_time(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_timestamp = time.time()
        print('LOG: Running job "%s"' % func.__name__)
        result = func(*args, **kwargs)
        print('LOG: Job "%s" completed in %d seconds' % (func.__name__, time.time() - start_timestamp))
        return result

    return wrapper


@print_elapsed_time
def job():
    print('Hello, Logs')
    time.sleep(5)

schedule.every().second.do(job)

schedule.run_all()

并行执行

我正在尝试每10秒执行50个项目,但是从我的日志中可以看出,它按顺序每10秒执行一个项目。有解决方案吗?

默认情况下,schedule按顺序执行所有任务。这样做的原因是很难找到一种使所有人都满意的并行执行模型。

您可以通过在每个线程中运行每个作业来绕过此限制:

import threading
import time
import schedule

def job():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)


while 1:
    schedule.run_pending()
    time.sleep(1)

如果你想更严格地控制线程的数量,使用共享作业队列和一个或多个工作线程:

import time
import threading
import schedule
import queue

def job():
    print("I'm working")


def worker_main():
    while 1:
        job_func = jobqueue.get()
        job_func()
        jobqueue.task_done()

jobqueue = queue.Queue()

schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while 1:
    schedule.run_pending()
    time.sleep(1)

该模型也适用于分布式应用程序,其中工作进程是从分布式工作队列接收作业的独立进程。

局限性

说句公道话,Schedule并不是一个万能的调度库。这个库是为了解决简单的调度问题而设计的。如果需要以下功能,Schedule可能无法满足:

  • 作业持久性(在重新启动之间记住计划)
  • 确切的时间(秒的精度执行)
  • 并发执行(多个线程)
  • 本地化(工作日或节假日)

Schedule不计算作业函数执行所需的时间。为了确保稳定的执行计划,您需要将长时间运行的作业从主线程(调度器运行的地方)中移动出去。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/940704.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

海康机器人工业相机 Win10+Qt+Cmake 开发环境搭建

文章目录 一. Qt搭建海康机器人工业相机开发环境 一. Qt搭建海康机器人工业相机开发环境 参考这个链接安装好MVS客户端 Qt新建一个c项目 cmakeList中添加海康机器人的库,如下: cmake_minimum_required(VERSION 3.5)project(HIKRobotCameraTest LANG…

微信小程序隐私协议接入

自2023年9月15日起,对于涉及处理用户个人信息的小程序开发者,微信要求,仅当开发者主动向平台同步用户已阅读并同意了小程序的隐私保护指引等信息处理规则后,方可调用微信提供的隐私接口。 相关公告见:关于小程序隐私保…

计算机竞赛 基于机器视觉的二维码识别检测 - opencv 二维码 识别检测 机器视觉

文章目录 0 简介1 二维码检测2 算法实现流程3 特征提取4 特征分类5 后处理6 代码实现5 最后 0 简介 🔥 优质竞赛项目系列,今天要分享的是 基于机器学习的二维码识别检测 - opencv 二维码 识别检测 机器视觉 该项目较为新颖,适合作为竞赛课…

格创校园跑腿小程序独立版v2.0.7+前端

应用介绍 格创校园跑腿SAAS管理系统小程序独立版v2.0.7前端 格创校园跑腿小程序系统独立版是一款基于ThinkPHP6框架开发的多校园专业跑腿平台,可应用至小区、街区、园区、厂区等。 格创校园跑腿小程序系统是校园创业版块热门应用,全新UI界面&#xff0c…

(202308)科研论文配图 task5 安装LaTex + 书籍第二章SciencePlots部

SciencePlots 序言阅读笔记绘图包介绍Windows下安装Windows下的安装MikTexWindows下的安装ghostscript加入系统环境变量安装scienceplots 序言 有幸在这次的组队学习活动中,拜读宁海涛先生的《科研论文配图绘制指南——基于python》一书,这本书文辞亲切…

【网络安全带你练爬虫-100练】第19练:使用python打开exe文件

目录 一、目标1:调用exe文件 二、目标2:调用exe打开文件 一、目标1:调用exe文件 1、subprocess 模块允许在 Python 中启动一个新的进程,并与其进行交互 2、subprocess.run() 函数来启动exe文件 3、subprocess.run(["文件路…

【百草阁送书-第二期】一名阿里服务端开发工程师的进阶之路

文章目录 一、前言二、AI 时代,服务端开发面临新挑战三、服务端开发会被 AI 取代吗?四、知识体系化,构建核心竞争力五、业界首本体系化、全景式解读服务端开发的著作六、参与抽奖方式 一、前言 目前,资讯、社交、游戏、消费、出行…

c++(8.28)菱形继承,虚继承,多态,抽象类,模板+Xmind

xmind: 作业: 1.编程题: 以下是一个简单的比喻,将多态概念与生活中的实际情况相联系: 比喻:动物园的讲解员和动物表演 想象一下你去了一家动物园,看到了许多不同种类的动物,如狮子、大象、猴…

OLED透明屏是什么?什么叫做OLED透明屏的原屏?

OLED透明屏是一种新型的显示技术,具有高对比度、高亮度和能耗低等优势,正被越来越广泛地应用于各个领域中。 在OLED透明屏中,原屏是至关重要的元件之一。本文将深入探讨OLED透明屏原屏的意义、制造过程、品质要求、应用案例和发展趋势&#…

2023-8-28 排列数字(DFS)

题目链接&#xff1a;排列数字 #include <iostream>using namespace std;const int N 10;int n;int path[N];bool st[N];// u 看第几个位置 void dfs(int u) {if(u n){for(int i 0; i < n; i ) cout << path[i] << ;cout << endl;return ;}//…

初试Eureka注册中心

Eureka是spring cloud中的一个负责服务注册与发现的组件。遵循着CAP理论中的A(可用性)P(分区容错性)。一个Eureka中分为eureka server和eureka client。其中eureka server是作为服务的注册与发现中心。 搭建eureka服务 引入eureka依赖 引入SpringCloud为eureka提供的starter依…

0828|C++day6 菱形继承+虚继承+多态+抽象类+模板

一、思维导图 二、今日知识回顾 1&#xff09;多态 父类的指针或者引用&#xff0c;指向或初始化子类的对象&#xff0c;调用子类对父类重写的函数&#xff0c;进而展开子类的功能。 #include <iostream> using namespace std;class Zhou { private:string name;int age…

论文阅读_扩散模型_LDM

英文名称: High-Resolution Image Synthesis with Latent Diffusion Models 中文名称: 使用潜空间扩散模型合成高分辨率图像 地址: https://ieeexplore.ieee.org/document/9878449/ 代码: https://github.com/CompVis/latent-diffusion 作者&#xff1a;Robin Rombach 日期: 20…

Spring补充

一.Spring JDB 配置两个jar包 <!-- spring-jdbc --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>5.2.2.RELEASE</version> </dependency> <!-- 阿里数据…

Python实现T检验

今天来分享一下T检验的python实现方法。 01 先来上一波概念。 1.单样本t检验&#xff0c;又称单样本均数t检验&#xff0c;适用于来自正态分布的某个样本均数与已知总体均数的比较&#xff0c;其比较目的是检验样本均数所代表的总体均数是否与已知总体均数有差别。已知总体均数…

权限提升-数据库提权-MSF-UDF提权

权限提升基础信息 1、具体有哪些权限需要我们了解掌握的&#xff1f; 后台权限&#xff0c;网站权限&#xff0c;数据库权限&#xff0c;接口权限&#xff0c;系统权限&#xff0c;域控权限等 2、以上常见权限获取方法简要归类说明&#xff1f; 后台权限&#xff1a;SQL注入,数…

PVE 8.0.4 配置记录

前言 七夕收到了媳妇送的礼物 Beelink SER 5 PRO (Ryzen 5700U), 记录打造成私人服务器的过程. 下载安装 Proxmox 8.0.4 https://www.proxmox.com/en/downloads 安装过程中修改磁盘设置: swap 分区设置为物理内存的 2 倍, 防止虚机太多内存不足 root 最大设置为 32 GB, 多了…

SpringCloud入门——微服务调用的方式 RestTemplate的使用 使用nacos的服务名初步(Ribbon负载均衡)

目录 引出微服务之间的调用几种调用方法spring提供的组件 RestTemplate的使用导入依赖生产者模块单个配置的情况多个配置的情况没加.yaml的报错【报错】两个同名配置【细节】 完整代码config配置主启动类controller层 消费者模块进行配置restTemplate配置类controller层 使用na…

云渲染对本地电脑要求高不高?对配置有要求吗?

自己本地电脑渲不动&#xff0c;又没有用过云渲染的朋友们一般都会有这样的疑问&#xff1a;云渲染对电脑要求高不高&#xff1f;需要什么样的配置才能用上云渲染&#xff1f; 其实云渲染对本地电脑的配置是完全没有要求的&#xff0c;相反它还能减轻你本地电脑的运行负担&…

Linux驱动——模块化编程

文章目录 模块化编程方法一方法二前提模块化编程模块化编程基本框架&#xff08;重要&#xff09;模块化编程的编译ubantu上操作开发板的文件系统的指令——make超级终端上的模块指令 多模块情况1情况2 传递参数传递单个参数传递数组 模块化编程 方法一 修改kconfig&#xff…