python 的APScheduler配置的定时任务会被Miss掉

news2024/9/29 1:15:22

背景

python 的APScheduler配置的定时任务会被Miss掉,经常在控制台收到一些Miss的告警信息,就觉得是任务太多导致的,为了定位到具体的原因,看了一些源码,了解到了定时任务的6大模块的协同工作模式。

异常信息及来源

异常信息
Run time of job “check_job_status (trigger: interval[0:00:05], next run at: 2023-04-11 12:05:01 CST)” was missed by 0:00:03.012525
异常信息解析
异常日志表示一个任务(名称为 “check_job_status”)的运行时间被错过了。任务使用了一个 interval 触发器,即每隔 5 秒触发一次。该任务的下一次运行时间应该是在 2023-04-11 12:05:01,CST 时区(中国标准时间)
异常日志来源
任务到了被触发执行的时候,执行器将该任务提交给子线程,执行方法run_job,run_job方法内部会再次校验当前时间和需要触发执行时间的时间差是否超过配置的超时时间(misfire_grace_time),如果超过了,则打印该告警信息。
在这里插入图片描述在这里插入图片描述

APScheduler 的六大模块

路径下的 executors、jobstores、schedulers、triggers、events、job 这几个模块是协同工作来实现调度任务的。下面是它们之间的关系和工作流程:
在这里插入图片描述

  1. jobstores 模块:任务存储器,用于存储任务信息和执行计划,包括 MemoryJobStore、SQLAlchemyJobStore、MongoDBJobStore 和 RedisJobStore 等多种实现。jobstores 模块将任务定义和计划信息存储在一个任务列表中,供 schedulers 模块调度任务。

  2. triggers 模块:触发器,用于确定任务的执行时间,包括 SimpleTrigger、IntervalTrigger、CronTrigger 等多种实现。triggers 模块提供了不同类型的触发器,用于根据不同的时间或事件条件触发任务的执行。

  3. job 模块:任务,用于封装要执行的任务内容,包括 Job 和 AsyncJob 两种实现。job 模块将任务的执行方法和参数封装在一个 Job 对象中,供 schedulers 模块调度。

  4. executors 模块:任务执行器,用于执行任务,包括 ThreadPoolExecutor、ProcessPoolExecutor、BlockingScheduler 等多种实现。executors 模块会根据 Job 对象中的执行方法和参数来执行任务,执行结果会被传递给 schedulers 模块。

  5. schedulers 模块:调度器,用于调度任务的执行,包括 BlockingScheduler、BackgroundScheduler、AsyncIOScheduler 和 GeventScheduler 等多种实现。schedulers 模块会从 jobstores 模块获取任务列表,并根据 triggers 模块提供的触发器信息,将需要执行的任务交给 executors 模块执行。

  6. events 模块:事件管理器,用于管理和触发各种事件,包括 JobExecutionEvent、JobSubmissionEvent 等多种实现。events 模块可以检测任务的执行结果,并触发相应的事件,供开发者进行监控和处理。

下面根据部分业务代码和源码来捋一下定时任务的全流程

一、初始化定时任务对象时,我这里配置ThreadPoolExecutoe, 最多10个线程的线程池(实际不配置的话,默认也是10)在这里插入图片描述

二、 在业务逻辑里添加定时任务:

#  最大实例数为1, interval触发器
cls.schedule.add_job(id=job_id, args=(func,), trigger="interval", max_instances=1, **params)

add_job方法的底层实现如下
/venv/lib/python3.8/site-packages/apscheduler/schedulers/base.py
在这里插入图片描述
这段源码的自我分析:将任务信息封装成Job对象,然后调用_real_add_job方法将任务添加到任务管理器中,添加时会携带任务信息(执行时间,执行次数,时间间隔等)以及触发器,调度器和下次期望被触发的时间。

三、 调度器会循环校验每个任务的执行时间,如果执行时间到,则获取到任务的触发器,触发器将任务转给执行器,执行器执行任务。具体逻辑在如下的源码中

    def _process_jobs(self):
        """
        Iterates through jobs in every jobstore, starts jobs that are due and figures out how long
        to wait for the next round.

        If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least
        ``jobstore_retry_interval`` seconds.

        """
        if self.state == STATE_PAUSED:
            self._logger.debug('Scheduler is paused -- not processing jobs')
            return None

        self._logger.debug('Looking for jobs to run')
        now = datetime.now(self.timezone)
        next_wakeup_time = None
        events = []

        with self._jobstores_lock:
            for jobstore_alias, jobstore in six.iteritems(self._jobstores):
                try:
                    due_jobs = jobstore.get_due_jobs(now)
                except Exception as e:
                    # Schedule a wakeup at least in jobstore_retry_interval seconds
                    self._logger.warning('Error getting due jobs from job store %r: %s',
                                         jobstore_alias, e)
                    retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
                    if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
                        next_wakeup_time = retry_wakeup_time
					# 如果任务还没到触发执行的时候,就pass
                    continue

                for job in due_jobs:
                    # Look up the job's executor
                    try:
                        executor = self._lookup_executor(job.executor)
                    except BaseException:
                        self._logger.error(
                            'Executor lookup ("%s") failed for job "%s" -- removing it from the '
                            'job store', job.executor, job)
                        self.remove_job(job.id, jobstore_alias)
                        continue

                    run_times = job._get_run_times(now)
                    run_times = run_times[-1:] if run_times and job.coalesce else run_times
                    if run_times:
                        try:
                            # 这里将任务和任务要触发的时间交给执行器,任务的实例值+1; 如果是线程池执行器,submit_job方法内部会将任务提交到线程池中执行,执行成功后,会将该任务的实例值-1。
                            executor.submit_job(job, run_times)
                        except MaxInstancesReachedError:
                            self._logger.warning(
                                'Execution of job "%s" skipped: maximum number of running '
                                'instances reached (%d)', job, job.max_instances)
                            # 将任务封装成事件,将事件添加到事件列表中
                            event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
                                                       jobstore_alias, run_times)
                            events.append(event)
                        except BaseException:
                            self._logger.exception('Error submitting job "%s" to executor "%s"',
                                                   job, job.executor)
                        else:
                            # 将任务封装成事件,将事件添加到事件列表中
                            event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
                                                       run_times)
                            events.append(event)

                        # Update the job if it has a next execution time.
                        # Otherwise remove it from the job store.
                        # 触发器获取到该任务下次需要被触发的时间,并更新任务管理器
                        job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)
                        if job_next_run:
                            job._modify(next_run_time=job_next_run)
                            jobstore.update_job(job)
                        else:
                            self.remove_job(job.id, jobstore_alias)

                # Set a new next wakeup time if there isn't one yet or
                # the jobstore has an even earlier one
                jobstore_next_run_time = jobstore.get_next_run_time()
                if jobstore_next_run_time and (next_wakeup_time is None or
                                               jobstore_next_run_time < next_wakeup_time):
                    next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)

        # Dispatch collected events
        # 遍历事件列表 `events`,并分别调用 `_dispatch_event` 方法来处理每个事件。
        # 具体地说,`_dispatch_event` 方法会将事件分发给所有已注册的监听器进行处理。遍历监听器列表 `self._listeners`,
        # 并分别调用# 每个监听器的 `handle_event` 方法来处理事件。
        # 如果监听器处理事件时出现异常,会将异常记录到日志中。处理完所有监听器后,`_dispatch_event` 方法会返回。
        # 这样,整个事件分发过程就完成了。通过这个过程,我们可以实现在任务执行成功、失败、被取消等情况下触发相应的事件,并对这些事件进行监听和处理。
        #(配置事件管理器的是add_listener方法,可以全局配置的事件管理器,也可以为每个任务配置不同的管理器。)
        for event in events:
            self._dispatch_event(event)

        # Determine the delay until this method should be called again
        if self.state == STATE_PAUSED:
            wait_seconds = None
            self._logger.debug('Scheduler is paused; waiting until resume() is called')
        elif next_wakeup_time is None:
            wait_seconds = None
            self._logger.debug('No jobs; waiting until a job is added')
        else:
            wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
            self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
                               wait_seconds)
		# 这里的 wait_seconds 就是再次触发_process_jobs方法被调用时,需要阻塞的时间,单位 秒。
        return wait_seconds

这段源码的自我分析:调度器会轮训任务管理器中的每一个任务,如果任务要被触发,会将任务交给执行器,同时将任务添加到事件管理列表中。然后获取任务的下一次触发时间,如果没有获取到,则将该任务移除。最后获取一个该方法被循环调用的阻塞时间。

四、 _process_jobs方法实际在调度器启动时就已经被注册调用了,具体的源码如下图:
在这里插入图片描述
wakeup 方法仅仅是调用了 _event.set() 方法,该方法会设置一个事件。当调度器处于休眠状态时,等待 _event 事件被设置,一旦事件被设置,调度器就会立即执行 _process_jobs 方法,检查所有的任务是否到了执行时间。

总结整个流程

  1. 调用start方法启动定时任务
  2. 调用add_job方法添加定时任务,定时任务被存入任务管理器,此时会调用wakeup方法触发_process_jobs方法执行
  3. _process_jobs方法做的事情比较多
    a. 会将任务和任务要触发的时间交给执行器,任务的实例值+1;
    b. 如果是线程池执行器,submit_job方法内部会将任务提交到线程池中执行,执行成功后,会将该任务的实例值-1.
    c. 最后将任务封装成事件,将事件添加到事件列表中
    d. 如果任务配置了事件管理器,则执行注册的事件
    e. 获取阻塞_process_jobs方法的时间
  4. 下次调用_process_jobs方法继续监控任务列表

架构图

引用这位博主的一张架构图, 感谢~
在这里插入图片描述

任务被Miss的原因

通过整个流程的梳理,项目中任务被Miss掉的真正原因如下:

  1. 任务量太大,大概500个定时任务
  2. 任务触发间隔时间太短, 5秒钟触发一次
  3. 线程池配置比较小。感觉这里也不应该配置太多,太多的话,资源消耗肯定会增加
  4. 任务里有对第三方服务发起http请求,比较耗时。这里也许可以用

解决方案

批处理:下游服务提供批量查询的功能,我的服务就不用一个个查询了,一下子解决了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/570930.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

01.GATK人种系变异最佳实践SnakeMake流程:WorkFlow简介

<~生~信~交~流~与~合~作~请~关~注~公~众~号生信探索> 学习的第一个GATK找变异流程&#xff0c;人的种系变异的短序列变异&#xff0c;包括SNP和INDEL。写了一个SnakeMake分析流程&#xff0c;从fastq文件到最后的vep注释后的VCF文件&#xff0c;关于VCF的介绍可以参考上一…

Windows 安装 GCC

文章目录 GCC 是什么&#xff1f;GCC 和 gcc 什么关系&#xff1f;Windows 安装 GCC选型下载安装配置环境变量验证 参考文献 GCC 是什么&#xff1f; GCC&#xff08;GNU Compiler Collection&#xff09;是一个开源的编译器套件&#xff0c;由 GNU 项目开发和维护。 GNU 编译…

悠可集团再获金鼠标3项大奖,自研营销工具助推全渠道数字营销

5月19日,第14届金鼠标数字营销大赛评选结果揭晓,悠可集团斩获3项大奖,其中悠可集团被评为“年度数字营销杰出代理商”,悠可DTC团队自主研发的智能广告投放引擎Turbo Media及点正科技申报的KOL优先自动化工具均荣获“年度最佳数字营销工具”奖项。 据主办方介绍,本届金鼠标数字营…

java基础 - jvm 堆、栈、方法区 java 内存模型

一、 概览 在进入主题前&#xff0c;我们先了解一些相关的知识&#xff0c;方面后面对运行时数据区进行分类。 进程中&#xff0c;有很多数据是多线程之间共享的&#xff0c;线程在执行时&#xff0c;会先从主存中读取数据&#xff0c;然后复制一份到高速缓存中&#xff0c;当…

【Docker容器】Docker安装Kibana详细步骤(看完这一篇就够啦!)

前言 安装Kibana的版本一定要跟ElaticSearch的版本是一样的&#xff0c;这样才不会容易出问题。 安装ElasticSearch请点击以下链接前去学习。 【Docker容器】Docker安装ElasticSearch详细步骤(看这一篇就够啦&#xff01;) 安装 1.访问镜像官网 https://hub.docker.com/ 搜索…

Java的内存模型(Java Memory Mode,JMM)

并发编程模型的两个关键问题 线程之间如何通信及线程之间如何同步。 线程之间如何通信&#xff1a;共享内存&#xff0c;消息传递线程之间如何同步通信是指线程之间以何种机制来 交换信息同步是指程序中用于控制不同线程间 操作发生相对顺序 的机制在共享内存的并发模型里&a…

【PHP图片托管】免费CFimagehost图床源码搭建私人图床 - 无需数据库

文章目录 1.前言2. CFImagehost网站搭建2.1 CFImagehost下载和安装2.2 CFImagehost网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar临时数据隧道3.2 Cpolar稳定隧道&#xff08;云端设置&#xff09;3.3.Cpolar稳定隧道&#xff08;本地设置&#xff09; 4.公网访问测…

C#入门:编写运行第一个Windows窗体应用程序Helloworld_WinForm

下载及安装Visual Studio的链接&#xff0c;具体见C#入门&#xff1a;编写运行第一个C#程序Helloworld 目录 下载.NET桌面开发组件创建新项目添加控件编辑程序运行查看效果补充 下载.NET桌面开发组件 Visual Studio中需下载安装.NET桌面开发组件。 创建新项目 创建项目&…

【2】Jmeter获取token,模拟多用户并发及token存储文本文件

按以下步骤来&#xff0c;以下各种处理程序的放置位置很重要&#xff01;&#xff01;不然会出现各种问题&#xff1a; 1、setup线程组 前置获取token的接口&#xff08;login&#xff0c;以下都用login表示获取token接口&#xff09;请求&#xff0c;获取类似token等后续需要…

10 个对 Android 开发者有用的 Kotlin 扩展函数 #2

10 个对 Android 开发者有用的 Kotlin 扩展函数 #2 通过出色的 Kotlin 扩展提高您的工作效率 EditText 通过EditText的“text”属性&#xff0c;您可以快速获取EditText中的文本。但是&#xff0c;这个文本是“可编辑”的&#xff0c;因此每次都需要将其转换为字符串才能获…

前端食堂技术周刊第 83 期:TS 5.1 RC、Nuxt 3.5、INP、Knip、管理 GitHub 通知、WebXR

By Midjournery 美味值&#xff1a;&#x1f31f;&#x1f31f;&#x1f31f;&#x1f31f;&#x1f31f; 口味&#xff1a;杏花乌龙拿铁 食堂技术周刊仓库地址&#xff1a;https://github.com/Geekhyt/weekly 本期摘要 TypeScript 5.1 RCNuxt 3.5INP 将成为新的 Core Web…

SSM 如何使用 Seata 框架实现分布式事务?

SSM 如何使用 Seata 框架实现分布式事务&#xff1f; 分布式事务是现代分布式系统中必不可少的一部分&#xff0c;而 Seata 框架是一种常用的分布式事务处理方式。在 SSM 框架中&#xff0c;我们可以使用 Seata 框架来管理分布式事务。本文将介绍如何在 SSM 框架中使用 Seata …

好用工具第2期:手机电脑传文件LocalSend

1. 概要 LocalSend 是一个跨平台的 AirDrop替工具软件。 适用于手机电脑之间的数据文件传输&#xff0c;不需要互联网连接或第三方服务器&#xff0c;是局域网本地通信的快速可靠解决方案。 LocalSend 是一个开源项目, 项目地址是: https://github.com/localsend/localsend …

webpack简单的搭建和使用

随便创建一个空的文件夹&#xff0c;例如说&#xff1a;explore 然后我们测试一下我们的node是否存在 可以正确打印出版本 我们再次输入&#xff1a;npm init -y 创建一个package.json文件 出现这样的情况就成功了 然后我们要安装webpack在终端上输入命令&#xff1a; npm i …

辅助生成: 低延迟文本生成的新方向

大型语言模型如今风靡一时&#xff0c;许多公司投入大量资源来扩展它们规模并解锁新功能。然而&#xff0c;作为注意力持续时间不断缩短的人类&#xff0c;我们并不喜欢大模型缓慢的响应时间。由于延迟对于良好的用户体验至关重要&#xff0c;人们通常使用较小的模型来完成任务…

Kotlin SOLID 原则

Kotlin SOLID 原则 许多 Kotlin 开发者并不完全了解 SOLID 原理&#xff0c;即使他们知道&#xff0c;他们也不知道为什么要使用它。您准备好了解所有细节了吗&#xff1f; 介绍 亲爱的 Kotlin 爱好者&#xff0c;您好&#xff01;欢迎来到我的新文章。今天我要讲的是 Kotli…

Qt编程基础 | 第六章-窗体 | 6.2、VS导入资源文件

一、VS导入资源文件 1.1、导入资源文件 步骤一&#xff1a; 将所有图片放到各自文件夹下&#xff0c;并将文件夹拷贝到资源文件&#xff08;.qrc文件&#xff09;的同级目录下&#xff0c;如下&#xff1a; 步骤二&#xff1a; 新建VS项目的时候&#xff0c;系统会自动建好一…

如何在华为OD机试中获得满分?Java实现【最差产品奖】一文详解!

✅创作者&#xff1a;陈书予 &#x1f389;个人主页&#xff1a;陈书予的个人主页 &#x1f341;陈书予的个人社区&#xff0c;欢迎你的加入: 陈书予的社区 &#x1f31f;专栏地址: Java华为OD机试真题&#xff08;2022&2023) 文章目录 1. 题目描述2. 输入描述3. 输出描述…

NetApp AFF C 系列——可持续、可扩展且安全可靠的全闪存解决方案

NetApp AFF C 系列 采用全新的闪存技术&#xff0c;同时辅以智能科技加持&#xff0c;将为您带来一个更为经济实惠的全闪存解决方案&#xff0c;它重新定义了安全性、可扩展性和可持续性。 为什么选择 AFF C 系列的新一代全闪存解决方案&#xff1f; 实现现代化&#xff0c;打…

chatgpt+mj+did生成会说话的头像

chatgptmjdid生成会说话的头像 当我们有了 ChatGPT 生成的内容&#xff0c;有了 stable-diffusion、midjourney 或者 dalle2 生成的人像&#xff0c;如果还能让人像动起来&#xff0c;替我们朗诵或者播报一下 ChatGPT 的内容&#xff0c;不就是一个数字人了么&#xff1f; D-…