Airflow:TimeSensor感知时间条件

news2025/1/12 14:01:32

在数据管道工作流中,任务可能需要在特定的时间执行,或者在继续之前等待一定的时间。为了满足这些需求,Apache Airflow提供了TimeSensor,这是一种内置Sensor,可以监控当前时间,并在达到指定时间时触发后续任务。在这篇博文中,我们将深入研究TimeSensor,涵盖它的特性、用例、实现、定制和最佳实践。

介绍TimeSensor

  • 时间感知与触发:Airflow 的TimesSensor是一种传感器(Sensor)类型的操作符(Operator)。它的主要作用是按照指定的时间规则来感知时间,并且根据时间条件来触发后续的任务流程。例如,它可以等待到某个特定的时间点,如每天的下午 3 点,然后再启动相关的数据处理任务。
  • 阻塞任务执行:在满足时间条件之前,TimesSensor会阻塞当前任务实例(Task Instance)所在的分支流程。这意味着在指定时间到来之前,后续依赖于该传感器输出的任务不会被执行,从而确保任务是在期望的时间之后才开始运行。
    在这里插入图片描述

TimesSensor应用场景

定时数据提取与加载(ETL)

场景描述:在数据仓库的 ETL(Extract - Transform - Load)流程中,假设我们需要每天在特定时间从源数据库提取数据。例如,业务要求每天凌晨 2 点开始提取数据,因为这个时候源系统的负载较低,数据更新也基本完成。

实现方式:可以使用TimesSensor来设置等待时间为凌晨 2 点。在 Airflow 的 DAG(Directed Acyclic Graph)中,TimeSensor任务会在开始时检查时间,如果当前时间还没到凌晨 2 点,它会一直等待。一旦到达凌晨 2 点,它就会触发后续的数据提取任务,然后依次执行数据转换和加载任务,确保整个 ETL 流程按照预定的时间计划执行。

定期报表生成

场景描述:对于企业的定期报表任务,比如每周五下午 5 点生成本周的销售报表。销售数据在一天内不断更新,需要在特定时间点整合并生成报表。

实现方式:利用TimeSensor设置等待时间为每周五下午 5 点。当时间到达时,传感器触发报表生成任务,该任务可以从数据库中获取本周的销售数据,进行统计和格式化处理,最终生成销售报表,满足企业对数据时效性和定期性的要求。

系统维护任务调度

场景描述:在系统维护场景中,例如需要在每月的第一天凌晨进行系统日志清理和备份。这个任务需要在特定的日期和时间启动,以避免对日常业务产生影响。

实现方式:通过TimesSensor设定为每月 1 日凌晨的具体时间,如 0 点。在到达设定时间后,触发日志清理和备份任务,对系统日志进行清理和备份操作,保证系统的稳定性和数据的安全性。

TimeSensor举例

首先,假设已经安装并配置好 Airflow。在 Airflow 中,ETL 流程通常是通过定义 DAG(Directed Acyclic Graph)来实现的。DAG 是一组任务的集合,这些任务按照依赖关系和执行顺序排列。

例如,我们有一个简单的 ETL DAG,它包含三个任务:extract_data(提取数据)、transform_data(转换数据)和load_data(加载数据)。其中extract_data任务依赖于TimesSensor的触发。

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.time_sensor import TimesSensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG('etl_with_timesensor', default_args=default_args, schedule_interval='0 2 * * *')

这里的schedule_interval='0 2 * * *'表示 DAG 每天在凌晨 2 点(按照 cron 表达式的格式)被调度。但我们还希望在 DAG 内部使用TimesSensor来更精细地控制任务启动时间。

wait_for_two_am = TimesSensor(
    task_id='wait_for_two_am',
    target_time='02:00:00',
    dag=dag
)

这个TimesSensor任务(wait_for_two_am)会等待直到时间到达凌晨 2 点(target_time='02:00:00')。

下面定义数据提取任务:

def extract_data():
    # 这里可以是实际的从数据库或其他数据源提取数据的代码
    print("Extracting data...")
    return "extracted_data"
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag
)

数据转换和加载任务:

def transform_data(data):
    # 实际的数据转换逻辑,如清洗、聚合等
    print("Transforming data...")
    return "transformed_data"
transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    op_args=[extract_task.output],
    dag=dag
)
def load_data(data):
    # 实际的加载数据到目标位置的逻辑,如数据库插入
    print("Loading data...")
load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    op_args=[transform_task.output],
    dag=dag
)

任务之间的依赖关系通过以下方式设置:

wait_for_two_am >> extract_task >> transform_task >> load_task

这意味着extract_data任务会在wait_for_two_am任务完成(即到达凌晨 2 点)后才开始执行,然后依次执行transform_dataload_data任务。

当 Airflow 调度器启动这个 DAG 时,TimeSensor首先检查当前时间。如果当前时间还没到凌晨 2 点,它会定期检查(由 Airflow 的内部机制控制检查间隔)。

一旦到达凌晨 2 点,TimesSensor任务完成,触发extract_data任务。extract_data任务提取数据后,将数据传递给transform_data任务进行转换,转换后的数据再传递给load_data任务进行加载,从而完成整个 ETL 流程。这种方式确保了 ETL 流程在每天凌晨 2 点准时开始,符合业务对数据提取时间的要求。

自定义TimeSensor行为

TimeSensor提供了几个参数,你可以使用它们来定制它的行为:

  • target_time:Sensor定义一天中触发条件为成功的时间(作为datetime.time对象)。
  • mode:Sensor的工作模式。默认情况下,它使用“poke”模式,定期检查所需的条件。
  • timeout:传感器在失败前等待所需条件满足的最大时间(以秒为单位)。缺省情况下,没有超时。
  • poke_interval:检查所需条件的时间间隔(以秒为单位)。默认值是60秒。

最佳实践

  • 使用描述性的task_id:为TimeSensor定义清晰和有意义的task_id,以提高dag的可读性和可维护性。

  • 设置适当的超时时间:为TimeSensor设置合理的超时时间,以避免任务无限期地等待特定的时间到达。这有助于防止资源耗尽,并确保如果在预期的时间范围内没有满足所需的条件,管道可以正常失败。

  • 调整戳间隔:根据您的具体用例自定义poke_interval。如果目标时间很遥远,您可能希望使用更长的时间间隔来避免过多的轮询。相反,如果您希望很快达到目标时间,那么较短的间隔可能更合适。

  • •考虑时区差异:当处理依赖于时间敏感事件或数据的任务时,请确保考虑到Airflow实例和时间敏感数据来源之间的任何时区差异。

总结

Apache Airflow TimeSensor是强大而通用的工具,用于管理数据管道中基于时间的依赖关系。通过了解它的各种用例和参数,你可以创建在特定时间执行任务的高效工作流,或者在继续之前等待一定数量的时间。当你继续使用Apache Airflow时,请记住利用TimeSensor的强大功能,来有效地监控和管理dag中时间驱动的依赖项,并构建健壮的、可扩展的数据管道。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2275515.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

libusb学习——简单介绍

文章目录 libusb 简介libusb 编译libusb 源码目录介绍核心代码文件平台支持例子 API使用libusb初始化和去初始化libusb设备处理和枚举libusb 杂项libusb USB描述符libusb 设备热插拔事件通知libusb 异步设备I/Olibusb 同步设备I/Olibusb 轮询与定时 libusb 涉及技术参考 libusb…

HTML5 网站模板

HTML5 网站模板 参考 HTML5 Website Templates

阿里云ios镜像源

阿里云镜像源:阿里巴巴开源镜像站-OPSX镜像站-阿里云开发者社区 下载centos7

【大数据】Apache Superset:可视化开源架构

Apache Superset是什么 Apache Superset 是一个开源的现代化数据可视化和数据探索平台,主要用于帮助用户以交互式的方式分析和展示数据。有不少丰富的可视化组件,可以将数据从多种数据源(如 SQL 数据库、数据仓库、NoSQL 数据库等&#xff0…

【2024年华为OD机试】 (A卷,100分)- 端口合并(Java JS PythonC/C++)

一、问题描述 题目描述 有 M 个端口组 (1 < M < 10)&#xff0c; 每个端口组是长度为 N 的整数数组 (1 < N < 100)&#xff0c; 如果端口组间存在 2 个及以上不同端口相同&#xff0c;则认为这 2 个端口组互相关联&#xff0c;可以合并。 输入描述 第一行输入端…

【灵码助力安全3】——利用通义灵码辅助智能合约漏洞检测的尝试

前言 随着区块链技术的快速发展&#xff0c;智能合约作为去中心化应用&#xff08;DApps&#xff09;的核心组件&#xff0c;其重要性日益凸显。然而&#xff0c;智能合约的安全问题一直是制约区块链技术广泛应用的关键因素之一。由于智能合约代码一旦部署就难以更改&#xf…

AOP实现操作日志记录

文章目录 1.common-log4j2-starter1.目录2.pom.xml 引入依赖3.LogAspect.java4.Log4j2AutoConfiguration.java Log4j2自动配置类条件注入切面类 2.common-log4j2-starter-demo 测试1.目录2.application.yml 启用日志切面3.TraceController.java4.结果 1.common-log4j2-starter …

分布式ID—雪花算法

背景 现在的服务基本是分布式、微服务形式的&#xff0c;而且大数据量也导致分库分表的产生&#xff0c;对于水平分表就需要保证表中 id 的全局唯一性。 对于 MySQL 而言&#xff0c;一个表中的主键 id 一般使用自增的方式&#xff0c;但是如果进行水平分表之后&#xff0c;多…

JavaEE之定时器及自我实现

在生活当中&#xff0c;有很多事情&#xff0c;我们不是立马就去做&#xff0c;而是在规定了时间之后&#xff0c;在到该时间时&#xff0c;再去执行&#xff0c;比如&#xff1a;闹钟、定时关机等等&#xff0c;在程序的世界中&#xff0c;有些代码也不是立刻执行&#xff0c;…

深入Android架构(从线程到AIDL)_23 活用IBinder接口于近程通信01

1、 在同一进程里活用IBinder接口 议题 1. myActivity对象是谁创建的呢? 2. myService对象是谁创建的呢? 3. 当myService类里有个f1()函数&#xff0c;如何去调用它呢? 4. 必须先取得myService对象的指针&#xff0c;才能调用f1()函数去存取对象的属性(Attribute)值。 …

vue3后台系统动态路由实现

动态路由的流程&#xff1a;用户登录之后拿到用户信息和token&#xff0c;再去请求后端给的动态路由表&#xff0c;前端处理路由格式为vue路由格式。 1&#xff09;拿到用户信息里面的角色之后再去请求路由表&#xff0c;返回的路由为tree格式 后端返回路由如下&#xff1a; …

如何开启苹果手机(IOS)系统的开发者模式?

如何开启开发者模式&#xff1f; 一、打开设置二、隐私与安全性三、找到开发者模式四、开启开发者模式------------------------------------------------------------如果发现没有开发者模式的选项一、电脑下载爱思助手二、连接手机三、工具箱——虚拟定位——打开虚拟定位——…

国产编辑器EverEdit - 扩展脚本:在当前文件目录下新建同类型文件

1 扩展脚本&#xff1a;在当前文件目录下新建同类型文件 1.1 应用场景 用户在进行编程语言学习时&#xff0c;比如&#xff1a;Python&#xff0c;经常做完一个小练习后&#xff0c;又需要新建一个文件&#xff0c;在新建文件的时候&#xff0c;不但要选择文件类型&#xff0c…

011:利用大津算法完成图片分割

本文为合集收录&#xff0c;欢迎查看合集/专栏链接进行全部合集的系统学习。 合集完整版请参考这里。 上一篇文章介绍了大津算法可以完成图片的前景和背景分割。 总的来说&#xff0c;大津算法的核心思想就两个&#xff1a; 数学上&#xff0c;通过确定一个像素阈值&#xf…

Jenkins触发器--在其他项目执行后构建

前言&#xff1a; jenkins中有多种触发器可用&#xff0c;可以方便的控制构建的启动 这里简单介绍下项目后构建的配置方法 1. 解释&#xff1a; Build after other projects are built Set up a trigger so that when some other projects finish building, a new build is…

PowerApps助力PowerBI实现数据写回

原文发布日期: 2019-08-01 06:03:50 0000 注&#xff1a;本文旨在介绍Power BI如何利用PowerApps实现用户在前端对数据源进行增删查改&#xff0c;关于此&#xff0c;你也可以在Google上找到更详细但较零散的资料 正文 在SSAS多维数据集中&#xff0c;开发者可以给数据开启&q…

oracle 19c安装

文章目录 一 环境配置1、更换yum源2、文件配置 二 oracle环境配置1、下载依赖包2、创建用户和用户组3、创建目录并赋予权限4、修改资源限制参数5、修改内核参数6、配置安全7、配置Oracle环境变量 三 安装Oracle数据库四 创建Oracle实例五 启动数据库 一 环境配置 1、更换yum源…

LabVIEW启动时Access Violation 0xC0000005错误

问题描述 在启动LabVIEW时&#xff0c;可能出现程序崩溃并提示以下错误&#xff1a;Error 0xC0000005 (Access Violation) ​ Access Violation错误通常是由于权限不足、文件冲突或驱动问题引起的。以下是解决此问题的全面优化方案&#xff1a; 解决步骤 1. 以管理员身份运行…

xilinx平台使用多个 FIFO 拼接

Xilinx FIFO IP 输入 的最大位宽 是 1024 bit &#xff0c;当需要缓存的数据是 1280bit 又或者是 1536等 。怎么办呢&#xff1f; 有一个办法就是拆数据&#xff0c;将1280拆5个 256bit输入&#xff0c;也就是可以使用 5个 256位宽输入的FIFO拼接起来。&#xff08;其它位宽也…

Ceph分布式存储集群,不仅仅是一个简单的对象存储解决方案

Ceph 作为 OpenStack 的存储后端 块存储&#xff08;Cinder 后端&#xff09; Ceph 的 RBD&#xff08;RADOS Block Device&#xff09;模块作为 OpenStack Cinder 服务的后端&#xff0c;为虚拟机提供块级别的存储资源。RBD 支持快照、克隆和恢复等功能&#xff0c;能够满足虚…