airflow调度时间详解

news2025/1/20 12:16:38

⭐️ airflow调度概述

Apache Airflow 是一个开源的工作流调度和监控平台,广泛用于数据工程、ETL(提取、转换、加载)管道以及各种自动化任务。下面我将详细说明 Airflow 的调度算法。

1. DAG(有向无环图)

Airflow 的核心是 DAG(Directed Acyclic Graph),它定义了一组有序的任务,其中每个任务称为一个 “task”。DAG 是调度的基本单位,Airflow 通过 DAG 定义任务的依赖关系和执行顺序。

在这里插入图片描述

2. DAG调度器

Airflow 中的调度器(Scheduler)负责监控 DAG,决定何时触发 DAG 中的任务。

3. 调度策略(Scheduling Policy)

在 Airflow 中,调度策略定义了 DAG 应该何时被触发。调度策略通常基于以下两个因素:

  • 时间表(Schedule Interval): 这是一个时间间隔,定义了 DAG 被调度的频率。例如,每小时调度一次、每天调度一次等。时间表可以通过 Cron 表达式、timedelta 对象等来定义。

  • 开始时间(Start Date): DAG 从这个时间开始调度。start_dateschedule_interval 一起决定了 DAG 的首次执行时间和后续的执行时间点。

4. Catchup

默认情况下,Airflow 会自动执行所有未执行的 DAG 实例。如果你设置了一个 DAG 的 start_date 为过去的某个时间,且 schedule_interval 为每天执行一次,Airflow 会在下次调度器运行时尝试执行所有中间日期的任务,直到赶上当前日期。

如果 DAG 不是为了处理其追赶而编写的,那么可以关闭catchup。这可以通过在DAG中设置catchup=False或在配置文件中设置catchup_by_default=False来实现。关闭catchup时,调度程序仅为最新间隔创建DAG运行。

5. External Triggers

Airflow 还支持外部事件触发,例如通过 REST API 或 Sensor 任务来触发 DAG 或特定任务的执行。这个机制使得 Airflow 能够适应动态和复杂的调度需求,而不仅仅依赖于固定的时间表。

在这里插入图片描述

⭐️ airflow内部是怎样计算调度时间的

Airflow 的开始时间(start_date)和调度时间(schedule_interval)是密切相关的,理解它们之间的关系对正确配置 DAG 至关重要。

1. 开始时间(start_date)

start_date 是指 DAG 或任务首次被调度的参考时间点。它表示任务计划从何时开始执行。重要的是,Airflow 将在 start_date 后的第一个调度间隔完成时实际开始执行任务。换句话说,start_date 指的是任务应该开始考虑执行的时间,而不是实际执行的时间。

2. 调度时间(schedule_interval)

schedule_interval 是指 DAG 定期运行的时间间隔。例如,如果你设置了 schedule_interval=‘@daily’,Airflow 会每天触发一次 DAG。Airflow 调度任务的实际时间通常是 start_date 加上 schedule_interval 的长度。

3. 两者的关系

Airflow 的调度器总是会在 start_date 之后的调度间隔结束时调度任务,这意味着任务实际运行的时间通常是 start_date 加上一个 schedule_interval。

举例说明,假设你有一个 DAG,其 start_date 设置为 2024-08-14 00:00:00,schedule_interval 设置为 @daily(每天运行一次):start_date 是 2024-08-14 00:00:00。
第一次调度的实际执行时间(即第一个 DAG run)会是 2024-08-15 00:00:00。

为什么会这样? 因为 Airflow 认为每个 DAG run 是在上一个时间区间的末尾处理数据。例如,在这个例子中,2024-08-15 00:00:00 运行的是 2024-08-14 的数据。

4. 调度时间的具体计算

Airflow 采用了如下逻辑计算调度时间:

初始调度时间计算:
初始调度时间是基于 start_date 和 schedule_interval 计算得出的第一个调度时间。例如,如果 start_date 是 2024-08-14 00:00:00,schedule_interval 是 1 小时,那么第一个调度时间是 2024-08-14 01:00:00。

后续调度时间计算:
Airflow 会从 start_date 开始逐个计算调度时间,直到当前时间。例如,若 schedule_interval 是 1 小时,Airflow 会逐小时计算每个调度时间点。

Missed Schedules(错过的调度):
如果一个 DAG 处于 paused 状态或因为某些原因未被调度,Airflow 会在 DAG 恢复为 active 状态时补齐所有错过的调度时间点。

5. 实际运行中的情况

在实际应用中,start_date 和 schedule_interval 的组合可能会导致以下情况:
迟到执行:由于 schedule_interval 的存在,第一次 DAG 任务通常在 start_date 后一个完整的调度间隔结束时执行。
调度延迟:如果 DAG 配置或系统资源不够,可能会出现调度延迟。

6. 示例说明

假设你有一个 DAG:

start_date: 2024-08-14 00:00:00
schedule_interval: 每 1 小时一次 (timedelta(hours=1))

那么 Airflow 会按如下时间点调度任务:

2024-08-14 01:00:00 调度 2024-08-14 00:00:00 到 01:00:00 这一小时的数据
2024-08-14 02:00:00 调度 2024-08-14 01:00:00 到 02:00:00 这一小时的数据
以此类推

7. 具体代码实现

在 Airflow 的源码中,调度时间的计算主要依赖于以下几个组件:
DagRun: 每次 DAG 运行都会创建一个 DagRun 对象,其中记录了调度时间、开始时间、结束时间等信息。
next_dagrun_info: 这个函数会基于当前的 schedule_interval 计算下一个调度时间。
catchup: 如果 catchup=True,Airflow 会尝试补齐所有错过的调度时间。

在这里插入图片描述

⭐️ 一个场景

假如开始时间为当天0点,每隔1小时执行一次,而且开始时DAG是被paused的,当在凌晨3点手动从paused状态变成acitve状态,那么后面的运行是怎么调度的,分析如下:

在 Airflow 中,DAG 被 paused 状态解除并变为 active 状态时,调度的逻辑将根据 start_dateschedule_interval 来决定接下来的任务调度时间。让我们详细讨论一下这种情况下的调度行为。

1. 情景描述

  • start_date: 当天的凌晨 00:00(例如 2024-08-14 00:00)。
  • schedule_interval: 每 1 小时运行一次('0 * * * *'timedelta(hours=1))。
  • DAG 状态: 初始时 DAG 处于 paused 状态,凌晨 3 点手动将其设置为 active 状态。

2. 关键点

  1. DAG 的调度逻辑

    • Airflow 的调度是基于 start_dateschedule_interval 的,并且会计算所有可能的调度时间点。
    • 当 DAG 处于 paused 状态时,Airflow 不会调度新的任务实例,但会“记住”错过的调度窗口。
    • 当 DAG 被从 paused 变为 active 时,Airflow 会立即尝试补齐所有错过的调度实例,除非你手动跳过这些实例。
  2. 调度行为

    • 你在凌晨 3 点将 DAG 从 paused 变为 active 状态后,Airflow 将立即调度凌晨 1 点、2 点和 3 点的实例,因为它们都是基于 start_dateschedule_interval 计算出来的调度点。
    • 这意味着,当你在凌晨 3 点将 DAG 设为 active,Airflow 会依次调度并执行 00:00-01:00、01:00-02:00、02:00-03:00 这几个时间段的 DAG 任务。

3. 示例说明

假设 start_date 是 2024-08-14 00:00,DAG 设定为每 1 小时执行一次,调度时间点依次是:

  • 2024-08-14 01:00:00
  • 2024-08-14 02:00:00
  • 2024-08-14 03:00:00
  • 2024-08-14 04:00:00

如果 DAG 在 2024-08-14 03:00:00 从 paused 状态变为 active,Airflow 会立即调度前面错过的任务:

  • 2024-08-14 01:00:00(任务处理 00:00 到 01:00 的数据)
  • 2024-08-14 02:00:00(任务处理 01:00 到 02:00 的数据)
  • 2024-08-14 03:00:00(任务处理 02:00 到 03:00 的数据)

Airflow 会按照这些顺序来执行这些任务,确保所有错过的调度时间点都得到处理。

下图是一个在实际生产中的案例,开始时间是凌晨0点5分,每5个小时调度一次,但开始时DAG是paused的,上午9点多手动将DAG恢复,此图是从airflow ui 中截取的,

在这里插入图片描述

笔者水平有限,若有不对的地方欢迎评论指正!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2067015.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

萌拉数据需要下载吗,萌啦数据使用需不需要下载

在数字化浪潮席卷全球的今天,数据成为了驱动企业决策、优化产品服务、精准市场营销的关键要素。而提到数据处理与分析的利器,“萌拉数据”这一名字逐渐在业界崭露头角。面对这样一个功能强大的数据平台,不少用户心中或许会产生疑问&#xff1…

前端:内嵌微信扫码登陆在chrome浏览器失败

前端:内嵌微信二维码登录。 官方文档: 关于微信快速登录功能的说明 | 微信开放文档 按照官方文档书写后,二维码出现在了页面上。但是扫码登录时,浏览器控制台报错 Unsafe JavaScript attempt to initiate navigation for frame w…

无需多部备用机,云手机方便又便宜!

云手机,是云计算技术的又一创新应用,它通过在云服务器上虚拟出带有原生安卓操作系统的手机实例,为用户提供了一种全新的手机使用体验。无需携带多部手机,只需通过云手机,便可轻松实现多账号管理、应用运行及数据存储等…

用这个项目管理工具创建管理表,轻松实现项目管理!

在项目管理中,时间表不仅仅是一个简单的计划工具,而是确保项目按时完成的关键 许多项目经理和团队成员在推进项目时,常常因为缺乏明确的时间表而陷入混乱,导致进度拖延、资源浪费,甚至项目失败。 那么,如何…

无人机遥控器里的接收器工作原理解析!

无人机遥控器中的接收器工作方式主要基于无线信号传输技术 信号发射 遥控器上的发射器将用户的操作指令(如推动操纵杆的动作)转化为无线电信号。这些信号通过特定的频率(如常用的2.4GHz)在空气中传输。 信号接收 无人机上的接…

2024.8.23(docker)

一、打包传输 1、打包 [rootdocker ~]# docker save -o centos.tar centos:latest [rootdocker ~]# ls 2、删除镜像 [rootdocker ~]# docker ps -all CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 372f7f49e6df …

固废检测算法实际应用方案固废检测算法源码解析

固废检测算法是利用机器视觉和深度学习技术,对地面上的垃圾进行自动识别、分类和定位的一种算法。这类算法在环境管理、智慧城市、自动化清洁等领域具有广泛的应用前景。以下是对固废检测算法的详细介绍: 一、算法原理 固废检测算法主要基于深度学习和计…

小米14的射频芯片高通SDR753全景图

这周末,除非外面下钞票,否则谁也拦不住我玩《黑神话悟空》(附:两款可以玩转悟空的显卡推荐) 天玑助力联发科力压高通~探秘MTK 5G旗舰智能手机SoC芯片——MT6989(天玑9300) 原创 芯愿景软件 集成电路大数据平台 2024年05月08日 19:18 北京 封装图(正面) 封装图

第21周:中文文本分类-Pytorch实现

目录 前言 一、前期准备 1.1 数据示例 1.2 文本分类的流程图 1.3 导入所需库 1.4 加载数据 二、数据预处理 2.1 构建词典 2.2 生成数据批次和迭代器 三、模型构建 3.1 搭建模型 3.2 初始化模型和超参数定义 3.3 训练函数与评估函数定义 四、训练模型 4.1 拆分数…

整合sentinel遇到的小问题

1.运行jar包 ,端口为默认8080 正确命令 java -Dserver.port8090 -Dcsp.sentinel.dashboard.server127.0.0.1:8090 -Dproject.namesentinel-dashboard -jar sentinel-dashboard-1.8.6.jar -D这些指令要在 -jar前面 在宝塔部署时,直接复制到运行命令后…

深入探索分布式任务调度框架:MySQL实现高效锁机制

本文主要介绍项目中怎么使用 MySQL 实现分布式锁的 背景 假如我们现在要做一个高性能、可扩展的分布式任务调度框架,要怎么设计呢?下面是我之前自己设计的一个架构图。 为了方便后续的分布式锁的设计,我们大致描述下各个角色都做了哪些事情…

搭建springboot项目,并解决项目出现红色J问题

搭建springboot的2种方式 搭建springboot项目有多种方式,这里简单介绍2种,看您使用哪个更方便。 第一种:在idea里创建 第二种,在官网创建下载 spring官网地址:https://start.spring.io/ 解决项目出现红J问题 搭建好…

【OpenHarmony】openharmony移植到RK3568------获取源码编译OpenHarmony源码

一、源码获取 源码获取有好几种方式,在这里直接在镜像网站下载源码,点击下面连接下载全量版本的OpenHarmony4.1 https://repo.huaweicloud.com/openharmony/os/4.1-Release/code-v4.1-Release.tar.gz 将源码放到自己建立的目录下解压,我放…

[JavaEE] 工作流- Activiti7 框架详解

目录 1、Activiti介绍 1.1、BPMN设计器 1.2、常见流程符号 1.2.1、事件event 1.2.2、活动activiti 1.2.3、流向flow 2、入门案例 2.1、需求说明 2.2、初始环境 2.2.1、添加依赖 2.2.2、添加配置 2.2.3、添加引导类 2.2.4、启动项目 2.2.5、表结构 2.2.6、常见ap…

【解析几何笔记】5.仿射坐标系与二阶行列式

5.仿射坐标系与二阶行列式 5.1 定义 【定义1.2】空间中一点 O O O与三个不共面向量 e 1 , e 2 , e 3 \pmb{e}_{1},\pmb{e}_{2},\pmb{e}_{3} e1​,e2​,e3​一起构成空间的一个仿射标架,记作 [ O ; e 1 , e 2 , e 3 ] [O;\pmb{e}_{1},\pmb{e}_{2},\pmb{e}_{3}] [O;…

Android CCodec Codec2 (三)C2Param - Ⅰ

在Codec2框架中,对组件的配置(Setting)、微调(Tuning)以及组件回传的信息(Info)都是通过参数的形式进行传递的。无论是简单参数(只包含一个值)还是复杂参数(包…

HEIC批量格式转化JPG怎么转?这四种方法很好用

HEIC批量格式转化JPG怎么转?随着智能手机技术的不断发展,HEIC(High Efficiency Image Container)作为一种高效的图像压缩格式,逐渐被广泛应用于iOS设备中。然而,由于HEIC格式的兼容性问题,许多非…

Echarts 散点图的 tooltip 自定义formatter方法(展示X、Y、value之外的数据)

1.效果展示,如图,tooltip的构成是指标名实际值【目标值】 2.后端的数据结构 3.完整代码:主要就是将需要展示的字段数据拼好放到tooltipInfo里 initLeftEcharts() {const now new Date();const year now.getFullYear();const month …

oracle共享池(shared pool):一、工作原理、组成部分 二、软硬解析过程

文章目录 oracle整体结构图共享池(shared pool)shared pool的作用shared pool的组成查询 shared pool 各组成部分大小硬解析和软解析 oracle整体结构图 共享池(shared pool) shared pool的作用 1、 将 sql 语句解析成执行计划 …

统一服务入口-Gateway(一)

目录 1.网关介绍 1.1含有问题 1.2什么是API网关 网关核心功能: 2.Spring Cloud Gateway 2.1什么是Spring Cloud Gateway 2.2快速上手 2.2.1创建网关项目 2.2.2引入网关依赖 2.2.3添加Gateway的路由配置 2.2.4测试 2.3Predicate 2.3.1Predicate的其他写…