Airflow:HttpSensor实现API驱动数据流程

news2025/1/6 22:26:02

数据管道工作流通常依赖于api来访问、获取和处理来自外部系统的数据。为了处理这些场景,Apache Airflow提供了HttpSensor,这是一个内置的Sensor,用于监视HTTP请求的状态,并在满足指定条件时触发后续任务。在这篇博文中,我们将深入探讨HttpSensor,涵盖它的特性、用例、实现、自定义和最佳实践。

介绍HttpSensor

定义HttpSensor是 Airflow 中的一个传感器(Sensor)。传感器在 Airflow 中用于等待某些条件满足后再继续执行后续任务。HttpSensor专门用于检查 HTTP 端点是否返回预期的状态码,以此来判断某个 HTTP 服务是否可用或者某个网页是否可以正常访问等。
在这里插入图片描述

工作原理:它会发送 HTTP 请求到指定的端点(URL),然后检查响应的状态码。如果状态码符合预期(默认是 200),则传感器任务完成,允许工作流继续执行后续任务;如果状态码不符合预期或者请求出现错误(如连接超时、网络问题等),则传感器会按照一定的策略(如重试策略)不断地重新检查,直到满足条件或者达到重试上限。

应用场景:

HttpSensor 典型应用场景主要有以下几类:

  • 在数据管道方面,用于 ETL 流程中,在开始阶段检查数据来源(如 Web 服务、API)是否可用,确保数据能顺利提取,还用于数据仓库更新时监控外部数据源。
  • 在微服务架构里,用于服务依赖监控,检查一个微服务所依赖的其他微服务是否正常运行,同时在容器化环境的服务发现中,确定新部署或扩展后的微服务是否可访问。
  • 对于网页和 Web 应用,一是进行网站可用性监测,定期检查网站主要页面状态码,及时发现故障;二是用于内容更新验证,检查内容更新后页面是否可访问、内容是否更新成功,以保障 Web 应用正常运行。

HttpSensor示例

  • 环境准备

    假设已经安装并配置好 Airflow。如果没有,请先安装 Airflow 并初始化数据库(例如,使用airflow initdb命令)。

  • 定义 DAG(有向无环图)和 HttpSensor 任务

    以下是简单的 Airflow DAG 示例,其中包含一个HttpSensor任务,用于检查一个示例网站(这里以https://www.example.com为例)是否可以正常访问。

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.sensors.http_sensor import HttpSensor
from datetime import datetime, timedelta

# 设置默认参数
default_args = {
    'owner': 'airflow',
   'start_date': datetime(2023, 1, 1),
   'retries': 3,
   'retry_delay': timedelta(minutes=5)
}
# 创建DAG对象
dag = DAG(
    'http_sensor_example',
    default_args=default_args,
    schedule_interval='@daily'
)
# 创建HttpSensor任务
check_website = HttpSensor(
    task_id='check_website',
    http_conn_id='http_default',
    endpoint='https://www.example.com',
    poke_interval=60,  # 每60秒检查一次
    dag=dag
)

# 创建一个简单的后续任务,用于在网站可访问后执行
def print_message():
    print("网站可以正常访问!")
    
send_message = SimpleHttpOperator(
    task_id='send_message',
    http_conn_id='http_default',
    endpoint='/',
    method='GET',
    python_callable=print_message,
    dag=dag
)
# 设置任务依赖关系
check_website >> send_message

代码解释

  • 导入模块

    首先导入必要的 Airflow 模块,包括DAG用于定义工作流,SimpleHttpOperator用于发送简单的 HTTP 请求操作(这里用于后续的简单演示),HttpSensor用于检查 HTTP 端点的传感器,以及日期时间相关的模块用于设置工作流的开始日期等参数。

  • 设置默认参数

    定义default_args字典,其中包含工作流的所有者(owner)、开始日期(start_date)、重试次数(retries)和重试延迟(retry_delay)等参数。这些参数将被应用到 DAG 中的所有任务。

  • 创建 DAG 对象

    使用DAG类创建一个名为http_sensor_example的 DAG 对象,指定了之前定义的默认参数和调度间隔(schedule_interval),这里设置为每天执行一次(@daily)。

  • 创建 HttpSensor 任务

    创建HttpSensor任务,命名为check_websitehttp_conn_id通常用于指定 Airflow 中预定义的 HTTP 连接配置(这里使用http_default,可以根据实际情况修改和配置),endpoint是要检查的 HTTP 端点的 URL。poke_interval表示检查的间隔时间,单位是秒。

  • 创建后续任务

    定义一个简单的函数print_message,用于在网站可以正常访问后打印一条消息。然后使用SimpleHttpOperator创建一个名为send_message的任务,这个任务在check_website任务成功后执行,它会发送一个简单的 GET 请求到网站根目录(/),并且在请求时调用print_message函数。

  • 设置任务依赖关系

    通过check_website >> send_message设置任务的依赖关系,确保check_website任务成功完成后才会执行send_message任务。

运行和监控 DAG

  • 启动 Airflow 服务

    启动 Airflow 的 Web 服务器(airflow webserver)和调度器(airflow scheduler),以便可以在 Web 界面中查看和管理 DAG。

  • 在 Web 界面中操作

    打开 Airflow 的 Web 界面(通常是http://localhost:8080),在 DAG 列表中找到http_sensor_example,可以手动触发 DAG 执行,或者等待按照调度间隔自动执行。在执行过程中,可以在任务实例页面查看HttpSensor任务的状态,观察它是否成功检查到网站可以正常访问,以及后续任务是否正确执行。

请注意,在实际使用中,可能需要根据具体的网络环境、要检查的 HTTP 端点的特性以及业务需求等因素,调整HttpSensor的参数(如poke_intervalretriesretry_delay等)和其他相关任务的设置。同时,要确保 Airflow 的配置正确,并且具有访问目标 HTTP 端点的网络权限。

自定义HttpSensor 行为

HttpSensor提供了几个参数,你可以用它们来定制它的行为:

  • http_conn_id: HTTP服务器的连接ID,引用Airflow界面中建立的连接。

  • endpoint:发送请求的API端点(路径)。

  • method:用于请求的HTTP方法(例如,‘GET’、‘POST’、‘PUT’等)。

  • headers:要包含在HTTP请求中的header字典。

  • data:在POST和PUT等方法的请求体中发送的数据。

  • response_check:一个Python可调用函数(例如lambda函数),它接受HTTP响应作为参数并返回bool值

  • mode:传感器的工作模式。默认情况下,它使用“poke”模式,定期检查所需的条件。

  • timeout:传感器在失败前等待所需条件满足的最大时间(以秒为单位)。缺省情况下,没有超时。

  • poke_interval:检查所需条件之间的时间间隔(以秒为单位)。默认值是60秒。
    在这里插入图片描述

最佳实践

  • 使用描述性的task_id:确保为HttpSensors使用清晰且有意义的task_id,以提高dag的可读性和可维护性。

  • 设置适当的超时:为HttpSensor设置合理的超时,以避免让任务无限期地等待API可用或完成处理。这有助于防止资源耗尽,并确保如果在预期的时间范围内没有满足所需的条件,管道可以正常失败。

  • 调整间隔:根据具体用例自定义poke_interval。如果API的响应时间不确定,你可能希望使用更长的间隔来避免过多的轮询。相反,如果希望API快速响应,则较短的间隔可能更合适。

  • 处理API身份验证:如果你的API需要身份验证,请确保在HTTP连接设置中设置适当的身份验证方法(例如,基本身份验证,令牌身份验证等)。

  • 使用response_check可调用对象:始终定义一个response_check可调用对象,它准确地反映了HttpSensor所需的条件。这允许传感器在继续执行下一个任务之前确定API的响应是否满足要求。

总结

Apache Airflow HttpSensor是功能强大的通用工具,用于监控数据管道中外部api的状态。通过了解它的各种用例和参数,你可以创建高效的工作流,可以在继续之前等待特定的API条件得到满足。在继续使用Apache Airflow 时,请记住利用HttpSensor的强大功能来有效地监视和管理dag中api驱动的依赖项。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2271299.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

活动预告 | Microsoft Power Platform 在线技术公开课:实现业务流程自动化

课程介绍 参加“Microsoft Power Platform 在线技术公开课:实现业务流程自动化”活动,了解如何更高效地开展业务。参加我们举办的本次免费培训活动,了解如何借助 Microsoft AI Builder 和 Power Automate 优化工作流。结合使用这些工具可以帮…

【SpringBoot教程】搭建SpringBoot项目之编写pom.xml

🙋大家好!我是毛毛张! 🌈个人首页: 神马都会亿点点的毛毛张 👏今天毛毛张分享的内容主要是Maven 中 pom 文件🆕,涵盖基本概念、标签属性、配置等内容 文章目录 1.前言🥭2.项目基本…

职场常用Excel基础04-二维表转换

大家好,今天和大家一起分享一下excel的二维表转换相关内容~ 在Excel中,二维表(也称为矩阵或表格)是一种组织数据的方式,其中数据按照行和列的格式进行排列。然而,在实际的数据分析过程中,我们常…

ASA第六天笔记

Botnet Traffic Filter简介 1.僵死网络流量过滤特性是一个基于名誉的机制,用于阻止流量源自于或者去往已知的感染主机。 2.僵死网络流量过滤比较每一个连接中的源和目的IP地址。 动态SensorBase数据库,被Cisco动态更新。静态数据库,需要手动…

【ArcGISPro/GeoScenePro】检查多光谱影像的属性并优化其外观

数据 https://arcgis.com/sharing/rest/content/items/535efce0e3a04c8790ed7cc7ea96d02d/data 操作 其他数据 检查影像的属性 熟悉检查您正在使用的栅格属性非常重要。

MySQL图形化界面工具--DataGrip

之前介绍了在命令行进行操作,但是不够直观,本次介绍图形化界面工具–DataGrip。 安装DataGrip 官网链接:官网下载链接 常规的软件安装流程。 参考链接:DataGrip安装 使用DataGrip 添加数据源: 第一次使用最下面会…

企业微信——智能表格学习

智能表格 应用限制条件 获取 token https://developer.work.weixin.qq.com/document/10013#%E5%BC%80%E5%8F%91%E6%AD%A5%E9%AA%A4 开发步骤 你可以通过以下步骤,使用access_token来访问企业微信的接口。需要注意的是,所有的接口需使用Https协议、Js…

调试:用电脑开发移动端网页,然后用手机真机调试

一、背景 电脑开发移动端,然后想真机调试... 二、实现 2.1、电脑和手机链接相同局域网 2.2、pnpm run dev 启动项目 2.3、浏览器访问 localhost:3001/login 2.4、Windowsr 输入cmd,在cmd输入 ipconfig 2.5、浏览器访问 ip地址加/login 2.6、手机端…

华为ensp-BGP路由过滤

学习新思想,争做新青年,今天学习的是BGP路由过滤 实验目的: 掌握利用BGP路由属性AS_Path进行路由过滤的方法 掌握利用BGP路由属性Community进行路由过滤的方法 掌握利用BGP路由属性Next_Hop进行路由过滤的方法 实验内容: 本实…

【书籍连载】《软件测试架构实践与精准测试》| 有关软件测试模型的调查结果

各位软件领域的精英们,今天小编邀请你继续深入学习《软件测试架构实践与精准测试》。 《软件测试架构实践与精准测试》是作者李龙(安畅检测首席技术专家)基于软件测试“川模型”的著作。本书结合作者首次提出的软件测试新的模型“川模型”测试…

nginx学习之路-windows系统安装nginx

文章目录 1. 下载2. 启动3. 验证参考文档 1. 下载 官方下载地址:https://nginx.org/en/download.html 可以下载windows版本,如nginx-1.26.2.zip。解压后,加入系统变量。 2. 启动 可以使用命令行启动(windows系统自带的cmd可能…

word中编号统一格式

不要手敲编号,要利用工具来。要善于利用多级编号和编号,分别对标题和段落进行组织 尤其是段落和标题特别多的时候,像毕设、标书这些 为什么呢?因为这样更方便修改,后续的增加和删除段落,编号会自动排列&am…

MQ-导读

什么是MQ? MQ是一款消息中间件,通常被称为"消息队列",用于分布式架构中上下文的异步通信, 由三个角色组成: 1. 消息提供者:发送消息的人 2. 消息接收者:接收、处理消息的人 3. 消息代理者&#x…

深入剖析MySQL数据库架构:核心组件、存储引擎与优化策略(四)

慢查询日志,顾名思义,就是查询慢的日志,是指mysql记录所有执行超过long_query_time(默认的时间10秒)参数设定的时间阈值的SQL语句的日志。该日志能为SQL语句的优化带来很好的帮助。默认情况下,慢查询日志是…

MySQL数据库笔记——版本号机制和CAS(Compare And Swap)

大家好,这里是Good Note,关注 公主号:Goodnote,本文详细介绍乐观锁的两种实现方式:版本号机制和CAS(Compare And Swap)。 文章目录 MySQL 内置的并发控制机制MVCC(多版本并发控制&am…

使用 commitlint 和 husky 检查提交描述是否符合规范要求

在上一小节中,我们了解了 Git hooks 的概念,那么接下来我们就使用 Git hooks 来去校验我们的提交信息。 要完成这么个目标,那么我们需要使用两个工具: 注意:npm 需要在 7.x 以上版本。 1. commitlint 用于检查提交信…

使用函数求e的近似值(PTA)C语言

自然常数e可以用级数11/1!1/2!⋯1/n!来近似计算。本题要求实现一个计算阶乘的简单函数,使得可以利用该函数,对给定的非负整数n,求该级数的前n1项和。 函数接口定义: double fact( int n ); 其中n是用户传入的参数,函…

使用Clion在ubuntu上进行交叉编译,并在Linux上远程编译五子棋

目录 1.工具以及概念介绍 (1)Clion软件简介 (2)交叉编译 (3)远程编译 2.操作原理 3.详细操作步骤 (1)配置Clion与虚拟机ubuntu的ssh连接 CLion远程开发Ubuntu,并显…

ubuntu如何禁用 Snap 更新

.禁用 Snap 更新(通过修改 snapd 配置) 打开并编辑 /etc/apt/apt.conf.d/50unattended-upgrades文件。 这个文件控制自动更新的行为。 sudo vim /etc/apt/apt.conf.d/50unattended-upgrades 里面有一行将里面的auto改为false即可禁用更新:…

SpringBoot - Spring Profiles 详解

文章目录 Pre官方文档Spring Profiles 详解1. 基本用法2. 激活 Profiles3. 添加 Active Profiles4. Profile Groups5. 在代码中设置 Profiles6. Profile 特定的配置文件 总结 Pre SpringBoot - Spring Boot 中的配置体系Profile全面解读 SpringBoot - spring.profiles.active…