定时任务(python)

news2025/4/4 15:03:55

介绍

🧩 什么是“定时任务”?

定时任务,就是按照设定的时间间隔或时间点自动执行某些操作。比如:

•	每天早上8点发通知
•	每隔10秒采集一次数据
•	每小时清理一次缓存

相关使用

✅ 最简单的方式:while True + time.sleep()

import time

def job():
    print("执行任务")

while True:
    job()
    time.sleep(10)  # 每10秒执行一次

✅ 优点:

• 写法简单,不需要任何依赖

• 控制力强

❌ 缺点:

会阻塞当前线程

• 精度差(任务执行时间会影响间隔)

• 没有“精确到几点几分”的调度能力

✅ 进阶方案:使用 schedule 库

pip install schedule
import schedule
import time

def job():
    print("每隔5秒执行一次")

schedule.every(5).seconds.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
    
    
>>>>>>>>>
schedule.every().day.at("10:30").do(job) 每天10:30
schedule.every().monday.at("09:00").do(job) 每周一上午9

• schedule.run_pending() 会检查:是否有任务应该执行

• 如果是,就执行对应的 job()

• 然后 time.sleep(1) 让主线程休息 1 秒,再循环检查

所以:任务是每5秒一次,不是“休息6秒”。sleep(1) 是用于“轮询检测任务是否该执行”,并不会影响任务周期本身。

✅ 优点:

• 语法优雅、简单

• 支持 every().minutes, every().day.at(“10:00”) 这种写法

• 适合小型任务管理

✅ 更强大的方案:使用 APScheduler

from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    print("每5秒执行一次")

scheduler = BlockingScheduler() # 创建一个阻塞型调度器实例 
scheduler.add_job(job, 'interval', seconds=5) # 每隔5秒调度一次 job 函数
scheduler.start()  # 启动调度器(这个会阻塞主线程)

内部原理确实 本质上就是一个“定时任务调度器 + 任务轮询器”

APScheduler 提供了多种调度器,比如:

• BlockingScheduler: 启动后会阻塞主线程,适合简单脚本

• BackgroundScheduler: 在后台启动,不阻塞主线程

• AsyncIOScheduler, TornadoScheduler, TwistedScheduler: 分别适配不同异步框架

类型含义
interval固定时间间隔
cron类似 crontab 表达式,支持精确到秒
date只运行一次,指定时间点

调用 start() 后,它会启动一个内部的 循环调度线程(或事件循环):

• 持续维护一个 “任务执行计划表”(内部是优先队列)

• 每一轮 tick(可能是 0.5~1 秒级别),检查是否有任务到了执行时间

• 有就执行(用线程池或进程池执行)

特性说明
支持多种调度类型interval / cron / date
有任务注册中心管理所有待运行的任务
有时间轮询机制类似事件循环,周期性检查是否“触发”
可以并发执行默认使用线程池
可持久化任务支持 SQLite、Redis 等存储后恢复

基于asyncio + 自定义时间

优点:

• 自由度高:可以灵活计算下一次执行时间。

• 无需额外依赖,原生 asyncio 就能跑。

• 跟 Redis 结合很好,可以做跨进程/跨机器任务协调。

缺点:

• 手动管理定时逻辑(get_next_run_time + asyncio.sleep())。

• 多任务可能不好管理,比如暂停/重启某个 job。

• 不支持 cron 表达式等复杂调度。

import asyncio
import logging
import os
from datetime import datetime, timedelta

import redis.asyncio as redis

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ScheduleService:
    def __init__(self):
        self.redis_subscribe_key = "demo:subscription"
        self.exist_subscribe_key = "demo:exist_push"
        self.push_interval_seconds = 600  # 推送间隔时间
        self.fixed_times = ["08:00", "12:00", "18:00"]  # 固定调度时间列表

    def get_next_run_time(self):
        now_time = datetime.now()
        today_str = now_time.strftime("%Y-%m-%d")
        
        # 今日的所有调度时间点
        run_times = [datetime.strptime(f"{today_str} {t}", "%Y-%m-%d %H:%M") for t in self.fixed_times]
        future_times = [t for t in run_times if t > now_time]

        if future_times:
            return min(future_times)
        else:
            # 如果今天已经过了所有调度时间,则返回明天的第一个时间点
            next_day = (now_time + timedelta(days=1)).strftime('%Y-%m-%d')
            return datetime.strptime(f"{next_day} {self.fixed_times[0]}", "%Y-%m-%d %H:%M")

    async def redis_client(self):
        return await redis.from_url("redis://localhost:6379", decode_responses=True)

    async def fixed_time_task(self):
        while True:
            next_run_time = self.get_next_run_time()
            sleep_seconds = max(1, (next_run_time - datetime.now()).total_seconds())
            logger.info(f"[定时任务] 下次执行时间: {next_run_time}, sleep {sleep_seconds:.0f} 秒")

            await asyncio.sleep(sleep_seconds)
            logger.info("[定时任务] 执行具体逻辑...✅")
            # TODO: 添加你自己的定时任务逻辑

    async def check_redis_and_push(self):
        while True:
            try:
                redis = await self.redis_client()
                now_score = int(datetime.now().strftime("%H%M"))
                trigger_score = now_score + 4

                subscriptions = await redis.zrangebyscore(
                    self.redis_subscribe_key, now_score, trigger_score, withscores=True
                )

                if subscriptions:
                    logger.info(f"[推送检查] 检测到 {len(subscriptions)} 条订阅")
                    for sub_key, _ in subscriptions:
                        user_id, tag = self.parse_key(sub_key)
                        push_key = f"{self.exist_subscribe_key}:{user_id}|{tag}"

                        if not await redis.exists(push_key):
                            logger.info(f"[推送中] 推送消息给用户 {user_id},标签:{tag}")
                            # TODO: 实际的推送逻辑
                            await redis.setex(push_key, self.push_interval_seconds, "1")

                await redis.aclose()
            except Exception as e:
                logger.error(f"[推送异常] {str(e)}")

            await asyncio.sleep(60)

    def parse_key(self, key):
        """解析订阅键 user:xxx|tag:xxx"""
        try:
            parts = key.split("|")
            user_id = parts[0].split(":")[1]
            tag = parts[1].split(":")[1]
            return user_id, tag
        except Exception as e:
            logger.error(f"解析 key 失败: {key} -> {e}")
            return "", ""

if __name__ == "__main__":
    async def main():
        service = ScheduleService()
        await asyncio.gather(
            service.fixed_time_task(),
            service.check_redis_and_push(),
        )

    asyncio.run(main())

方案是否异步优点缺点推荐场景
上述自定义写法灵活,Redis 任务配合好需手动维护时间逻辑任务量少 + redis调度系统
schedule简单、易用不支持异步、不适合服务部署脚本类、一次性任务
APScheduler支持异步 + cron/interval,易维护初学者需要学习下语法推荐服务常驻型场景

支持异步”,就是指在任务调度器中,能直接运行这种:

async def job():
    # 这里可以有 await,例如操作数据库、访问 Redis、发 HTTP 请求等
    await some_async_operation()
    print("异步任务完成")

而 APScheduler 的 AsyncIOScheduler 会让你 不需要 while True,你只要注册一次 job 函数,它会自动在对应时间点调度、支持并发、支持异步、支持 cron 等复杂逻辑,例如:

from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def job():
    await do_something_async()

scheduler = AsyncIOScheduler()
scheduler.add_job(job, 'interval', seconds=10)
scheduler.start()

这个 wrapper() 是普通函数,apscheduler 就可以调度它,而它内部通过 asyncio.create_task() 启动了异步任务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2326657.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

pytorch中dataloader自定义数据集

前言 在深度学习中我们需要使用自己的数据集做训练,因此需要将自定义的数据和标签加载到pytorch里面的dataloader里,也就是自实现一个dataloader。 数据集处理 以花卉识别项目为例,我们分别做出图片的训练集和测试集,训练集的标…

SQL Server:触发器

在 SQL Server Management Studio (SSMS) 中查看数据库触发器的方法如下: 方法一:通过对象资源管理器 连接到 SQL Server 打开 SSMS,连接到目标数据库所在的服务器。 定位到数据库 在左侧的 对象资源管理器 中,展开目标数据库&a…

标题:利用 Rork 打造定制旅游计划应用程序:一步到位的指南

引言: 在数字化时代,旅游计划应用程序已经成为旅行者不可或缺的工具。但开发一个定制的旅游应用可能需要耗费大量时间与精力。好消息是,Rork 提供了一种快捷且智能的解决方案,让你能轻松实现创意。以下是使用 Rork 创建一个定制旅…

WebSocket原理详解(二)

WebSocket原理详解(一)-CSDN博客 目录 1.WebSocket协议的帧数据详解 1.1.帧结构 1.2.生成数据帧 2.WebSocket协议控制帧结构详解 2.1.关闭帧 2.2.ping帧 2.3.pong帧 3.WebSocket心跳机制 1.WebSocket协议的帧数据详解 1.1.帧结构 WebSocket客户端与服务器通信的最小单…

计算声音信号波形的谐波

计算声音信号波形的谐波 1、效果 2、定义 在振动分析中,谐波通常指的是信号中频率是基频整数倍的成分。基频是振动的主要频率,而谐波可能由机械系统中的非线性因素引起。 3、流程 1. 信号生成:生成或加载振动信号数据(模拟或实际数据)。 2. 预处理:预处理数据,如去噪…

RepoReporter 仿照`TortoiseSVN`项目监视器,能够同时支持SVN和Git仓库

RepoReporter 项目地址 RepoReporter 一个仓库监视器,仿照TortoiseSVN项目监视器,能够同时支持SVN和Git仓库。 工作和学习会用到很多的仓库,每天都要花费大量的时间在频繁切换文件夹来查看日志上。 Git 的 GUI 工具琳琅满目,Git…

UI设计系统:如何构建一套高效的设计规范?

UI设计系统:如何构建一套高效的设计规范? 1. 色彩系统的建立与应用 色彩系统是设计系统的基础之一,它不仅影响界面的整体美感,还对用户体验有着深远的影响。首先,设计师需要定义主色调、辅助色和强调色,并…

【计算机网络】记录一次校园网无法上网的解决方法

问题现象 环境:实训室教室内时间:近期突然出现 (推测是学校在施工,部分设备可能出现问题)症状: 连接校园网 SWXY-WIFI 后: 连接速度极慢偶发无 IP 分配(DHCP 失败)即使分…

第二十一章:Python-Plotly库实现数据动态可视化

Plotly是一个强大的Python可视化库,支持创建高质量的静态、动态和交互式图表。它特别擅长于绘制三维图形,能够直观地展示复杂的数据关系。本文将介绍如何使用Plotly库实现函数的二维和三维可视化,并提供一些优美的三维函数示例。资源绑定附上…

系统思考反馈

最近交付的都是一些持续性的项目,越来越感觉到,系统思考和第五项修炼不只是简单的一门课程,它们能真正融入到我们的日常工作和业务中,帮助我们用更清晰的思维方式解决复杂问题,推动团队协作,激发创新。 特…

【C++】vector常用方法总结

📝前言: 在C中string常用方法总结中我们讲述了string的常见用法,vector中许多接口与string类似,作者水平有限,所以这篇文章我们主要通过读vector官方文档的方式来学习vector中一些较为常见的重要用法。 🎬个…

2025年数智化电商产业带发展研究报告260+份汇总解读|附PDF下载

原文链接:https://tecdat.cn/?p41286 在数字技术与实体经济深度融合的当下,数智化产业带正成为经济发展的关键引擎。 从云南鲜花产业带的直播热销到深圳3C数码的智能转型,数智化正重塑产业格局。2023年数字经济规模突破53.9万亿元&#xff…

Linux中常用服务器监测命令(性能测试监控服务器实用指令)

1.查看进程 ps -ef|grep 进程名以下指令需要先安装:sysstat,安装指令: yum install sysstat2.查看CPU使用情况(间隔1s打印一个,打印6次) sar -u 1 63.#查看内存使用(间隔1s打印一个,打印6次) sar -r 1 6

基于 GEE 的区域降水数据可视化:从数据处理到等值线绘制

目录 1 引言 2 代码功能概述 3 代码详细解析 3.1 几何对象处理与地图显示 3.2 加载 CHIRPS 降水数据 3.3 筛选不同时间段的降水数据 3.4 绘制降水时间序列图 3.5 计算并可视化短期和长期降水总量 3.6 绘制降水等值线图 4 总结 5 完整代码 6 运行结果 1 引言 在气象…

曲线拟合 | Matlab基于贝叶斯多项式的曲线拟合

效果一览 代码功能 代码功能简述 目标:实现贝叶斯多项式曲线拟合,动态展示随着数据点逐步增加,模型后验分布的更新过程。 核心步骤: 数据生成:在区间[0,1]生成带噪声的正弦曲线作为训练数据。 参数设置&#xff1a…

Qt6调试项目找不到Bluetooth Component蓝牙组件

错误如图所示 Failed to find required Qt component "Bluetooth" 解决方法:搜索打开Qt maintenance tool 工具 打开后,找到这个Qt Connectivity,勾选上就能解决该错误

JAVA- 锁机制介绍 进程锁

进程锁 基于文件的锁基于Socket的锁数据库锁分布式锁基于Redis的分布式锁基于ZooKeeper的分布式锁 实际工作中都是集群部署,通过负载均衡多台服务器工作,所以存在多个进程并发执行情况,而在每台服务器中又存在多个线程并发的情况,…

Java Spring Boot 与前端结合打造图书管理系统:技术剖析与实现

目录 运行展示引言系统整体架构后端技术实现后端代码文件前端代码文件1. 项目启动与配置2. 实体类设计3. 控制器设计4. 异常处理 前端技术实现1. 页面布局与样式2. 交互逻辑 系统功能亮点1. 分页功能2. 搜索与筛选功能3. 图书操作功能 总结 运行展示 引言 本文将详细剖析一个基…

深入剖析JavaScript多态:从原理到高性能实践

摘要 JavaScript多态作为面向对象编程的核心特性,在动态类型系统的支持下展现了独特的实现范式。本文深入解析多态的三大实现路径:参数多态、子类型多态与鸭子类型,详细揭示它们在动态类型系统中的理论基础与实践意义。结合V8引擎的优化机制…

GalTransl开源程序支持GPT-4/Claude/Deepseek/Sakura等大语言模型的Galgame自动化翻译解决方案

一、软件介绍 文末提供程序和源码下载 GalTransl是一套将数个基础功能上的微小创新与对GPT提示工程(Prompt Engineering)的深度利用相结合的Galgame自动化翻译工具,用于制作内嵌式翻译补丁。支持GPT-4/Claude/Deepseek/Sakura等大语言模型的…