⭐️ airflow调度概述
Apache Airflow 是一个开源的工作流调度和监控平台,广泛用于数据工程、ETL(提取、转换、加载)管道以及各种自动化任务。下面我将详细说明 Airflow 的调度算法。
1. DAG(有向无环图)
Airflow 的核心是 DAG(Directed Acyclic Graph),它定义了一组有序的任务,其中每个任务称为一个 “task”。DAG 是调度的基本单位,Airflow 通过 DAG 定义任务的依赖关系和执行顺序。
2. DAG调度器
Airflow 中的调度器(Scheduler)负责监控 DAG,决定何时触发 DAG 中的任务。
3. 调度策略(Scheduling Policy)
在 Airflow 中,调度策略定义了 DAG 应该何时被触发。调度策略通常基于以下两个因素:
-
时间表(Schedule Interval): 这是一个时间间隔,定义了 DAG 被调度的频率。例如,每小时调度一次、每天调度一次等。时间表可以通过 Cron 表达式、
timedelta
对象等来定义。 -
开始时间(Start Date): DAG 从这个时间开始调度。
start_date
与schedule_interval
一起决定了 DAG 的首次执行时间和后续的执行时间点。
4. Catchup
默认情况下,Airflow 会自动执行所有未执行的 DAG 实例。如果你设置了一个 DAG 的 start_date 为过去的某个时间,且 schedule_interval 为每天执行一次,Airflow 会在下次调度器运行时尝试执行所有中间日期的任务,直到赶上当前日期。
如果 DAG 不是为了处理其追赶而编写的,那么可以关闭catchup。这可以通过在DAG中设置catchup=False或在配置文件中设置catchup_by_default=False来实现。关闭catchup时,调度程序仅为最新间隔创建DAG运行。
5. External Triggers
Airflow 还支持外部事件触发,例如通过 REST API 或 Sensor 任务来触发 DAG 或特定任务的执行。这个机制使得 Airflow 能够适应动态和复杂的调度需求,而不仅仅依赖于固定的时间表。
⭐️ airflow内部是怎样计算调度时间的
Airflow 的开始时间(start_date)和调度时间(schedule_interval)是密切相关的,理解它们之间的关系对正确配置 DAG 至关重要。
1. 开始时间(start_date)
start_date 是指 DAG 或任务首次被调度的参考时间点。它表示任务计划从何时开始执行。重要的是,Airflow 将在 start_date 后的第一个调度间隔完成时实际开始执行任务。换句话说,start_date 指的是任务应该开始考虑执行的时间,而不是实际执行的时间。
2. 调度时间(schedule_interval)
schedule_interval 是指 DAG 定期运行的时间间隔。例如,如果你设置了 schedule_interval=‘@daily’,Airflow 会每天触发一次 DAG。Airflow 调度任务的实际时间通常是 start_date 加上 schedule_interval 的长度。
3. 两者的关系
Airflow 的调度器总是会在 start_date 之后的调度间隔结束时调度任务,这意味着任务实际运行的时间通常是 start_date 加上一个 schedule_interval。
举例说明,假设你有一个 DAG,其 start_date 设置为 2024-08-14 00:00:00,schedule_interval 设置为 @daily(每天运行一次):start_date 是 2024-08-14 00:00:00。
第一次调度的实际执行时间(即第一个 DAG run)会是 2024-08-15 00:00:00。
为什么会这样? 因为 Airflow 认为每个 DAG run 是在上一个时间区间的末尾处理数据。例如,在这个例子中,2024-08-15 00:00:00 运行的是 2024-08-14 的数据。
4. 调度时间的具体计算
Airflow 采用了如下逻辑计算调度时间:
初始调度时间计算:
初始调度时间是基于 start_date 和 schedule_interval 计算得出的第一个调度时间。例如,如果 start_date 是 2024-08-14 00:00:00,schedule_interval 是 1 小时,那么第一个调度时间是 2024-08-14 01:00:00。
后续调度时间计算:
Airflow 会从 start_date 开始逐个计算调度时间,直到当前时间。例如,若 schedule_interval 是 1 小时,Airflow 会逐小时计算每个调度时间点。
Missed Schedules(错过的调度):
如果一个 DAG 处于 paused 状态或因为某些原因未被调度,Airflow 会在 DAG 恢复为 active 状态时补齐所有错过的调度时间点。
5. 实际运行中的情况
在实际应用中,start_date 和 schedule_interval 的组合可能会导致以下情况:
迟到执行:由于 schedule_interval 的存在,第一次 DAG 任务通常在 start_date 后一个完整的调度间隔结束时执行。
调度延迟:如果 DAG 配置或系统资源不够,可能会出现调度延迟。
6. 示例说明
假设你有一个 DAG:
start_date: 2024-08-14 00:00:00
schedule_interval: 每 1 小时一次 (timedelta(hours=1))
那么 Airflow 会按如下时间点调度任务:
2024-08-14 01:00:00 调度 2024-08-14 00:00:00 到 01:00:00 这一小时的数据
2024-08-14 02:00:00 调度 2024-08-14 01:00:00 到 02:00:00 这一小时的数据
以此类推
7. 具体代码实现
在 Airflow 的源码中,调度时间的计算主要依赖于以下几个组件:
DagRun: 每次 DAG 运行都会创建一个 DagRun 对象,其中记录了调度时间、开始时间、结束时间等信息。
next_dagrun_info: 这个函数会基于当前的 schedule_interval 计算下一个调度时间。
catchup: 如果 catchup=True,Airflow 会尝试补齐所有错过的调度时间。
⭐️ 一个场景
假如开始时间为当天0点,每隔1小时执行一次,而且开始时DAG是被paused的,当在凌晨3点手动从paused状态变成acitve状态,那么后面的运行是怎么调度的,分析如下:
在 Airflow 中,DAG 被 paused
状态解除并变为 active
状态时,调度的逻辑将根据 start_date
和 schedule_interval
来决定接下来的任务调度时间。让我们详细讨论一下这种情况下的调度行为。
1. 情景描述
start_date
: 当天的凌晨 00:00(例如 2024-08-14 00:00)。schedule_interval
: 每 1 小时运行一次('0 * * * *'
或timedelta(hours=1)
)。- DAG 状态: 初始时 DAG 处于
paused
状态,凌晨 3 点手动将其设置为active
状态。
2. 关键点
-
DAG 的调度逻辑:
- Airflow 的调度是基于
start_date
和schedule_interval
的,并且会计算所有可能的调度时间点。 - 当 DAG 处于
paused
状态时,Airflow 不会调度新的任务实例,但会“记住”错过的调度窗口。 - 当 DAG 被从
paused
变为active
时,Airflow 会立即尝试补齐所有错过的调度实例,除非你手动跳过这些实例。
- Airflow 的调度是基于
-
调度行为:
- 你在凌晨 3 点将 DAG 从
paused
变为active
状态后,Airflow 将立即调度凌晨 1 点、2 点和 3 点的实例,因为它们都是基于start_date
和schedule_interval
计算出来的调度点。 - 这意味着,当你在凌晨 3 点将 DAG 设为
active
,Airflow 会依次调度并执行 00:00-01:00、01:00-02:00、02:00-03:00 这几个时间段的 DAG 任务。
- 你在凌晨 3 点将 DAG 从
3. 示例说明
假设 start_date
是 2024-08-14 00:00,DAG 设定为每 1 小时执行一次,调度时间点依次是:
- 2024-08-14 01:00:00
- 2024-08-14 02:00:00
- 2024-08-14 03:00:00
- 2024-08-14 04:00:00
- …
如果 DAG 在 2024-08-14 03:00:00 从 paused
状态变为 active
,Airflow 会立即调度前面错过的任务:
- 2024-08-14 01:00:00(任务处理 00:00 到 01:00 的数据)
- 2024-08-14 02:00:00(任务处理 01:00 到 02:00 的数据)
- 2024-08-14 03:00:00(任务处理 02:00 到 03:00 的数据)
Airflow 会按照这些顺序来执行这些任务,确保所有错过的调度时间点都得到处理。
下图是一个在实际生产中的案例,开始时间是凌晨0点5分,每5个小时调度一次,但开始时DAG是paused的,上午9点多手动将DAG恢复,此图是从airflow ui 中截取的,
笔者水平有限,若有不对的地方欢迎评论指正!