Apache Airflow 快速入门教程

news2024/12/26 1:50:39

Apache Airflow已经成为Python生态系统中管道编排的事实上的库。与类似的解决方案相反,由于它的简单性和可扩展性,它已经获得了普及。在本文中,我将尝试概述它的主要概念,并让您清楚地了解何时以及如何使用它。

Airflow应用场景

想象一下,你想要构建一个机器学习管道,它由以下几个步骤组成:

  • 从基于云的存储中读取图像数据集
  • 处理图像
  • 使用下载的图像训练深度学习模型
  • 将训练好的模型上传到云端
  • 部署模型

你将如何安排和自动化这个工作流程?Cron作业是一个简单的解决方案,但它也带来了许多问题。最重要的是,它们不允许你有效地扩展。Airflow提供了轻松调度和扩展复杂数据流程编排的能力,另一方面,它还能够在故障后自动重新运行它们,管理它们的依赖关系,并使用日志和仪表板监视它们。

在构建上述数据流之前,让我们先了解Apache Airflow 的基本概念。

Airflow 简介

Apache Airflow 是一个开源的平台,用于编排、调度和监控工作流,工作流是由一系列任务(Tasks)组成的,这些任务可以是数据处理、数据分析、机器学习模型训练、文件传输等各种操作。因此,它是ETL和MLOps用例的理想解决方案。示例用例包括:

  • 从多个数据源提取数据,对其进行聚合、转换,并将其存储在数据仓库中。
  • 从数据中提取见解并将其显示在分析仪表板中
  • 训练、验证和部署机器学习模型

核心组件

在默认版本中安装Apache Airflow 时,你将看到四个不同的组件。

  • Webserver: Webserver是Airflow的用户界面(UI),它允许您在不需要CLI或API的情况下与之交互。从那里可以执行和监视管道,创建与外部系统的连接,检查它们的数据集等等。
  • 执行器:执行器是管道运行的机制。有许多不同类型的管道在本地运行,在单个机器中运行,或者以分布式方式运行。一些例子是LocalExecutor, SequentialExecutor, CeleryExecutor和KubernetesExecutor
  • 调度器:调度器负责在正确的时间执行不同的任务,重新运行管道,回填数据,确保任务完成等。
  • PostgreSQL:存储所有管道元数据的数据库。这通常是Postgres,但也支持其他SQL数据库。

安装Airflow最简单的方法是使用docker compose。你可以从这里下载官方的docker撰写文件:

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.5.1/docker-compose.yaml'

在这里插入图片描述

基本概念

要学习Apache Airflow,必须熟悉它的主要概念,这些概念可能有点难理解,让我们试着揭开它们的神秘面纱。

DAGs

所有管道都定义为有向无环图(dag)。每次执行DAG时,都会创建一个单独的运行。每个DAG运行都是独立的,并且包含一个关于DAG执行阶段的状态。这意味着相同的dag可以并行执行多次。

要实例化DAG,可以使用DAG函数或与上下文管理器一起使用,如下所示:

from airflow import DAG
with DAG(
    "mlops",
    default_args={
        "retries": 1,
     },
    schedule=timedelta(days=1),
    start_date=datetime(2023, 1, 1)
) as dag:

# dag code goes here

上下文管理器接受一些关于DAG的全局变量和一些默认参数。默认参数被传递到所有任务中,并且可以在每个任务的基础上重写。完整的参数列表可以在官方文档中找到。

在本例中,我们定义DAG将从2023年1月1日开始,并且每天执行一次。retries参数确保在可能出现故障后重新运行一次。

task(任务)

DAG的每个节点表示一个Task,即一段单独的代码。每个任务可能有一些上游和下游依赖项。这些依赖关系表示任务如何相互关联以及它们应该以何种顺序执行。每当初始化一个新的DAG运行时,所有任务都初始化为Task实例。这意味着每个Task实例都是给定任务的特定运行。

在这里插入图片描述

operator(任务模板)

操作符可以被视为预定义任务的模板,因为它们封装了样板代码并抽象了它们的大部分逻辑。常见的操作符有BashOperator、PythonOperator、MySqlOperator、S3FileTransformOperator。我们看到,操作符可以定义遵循特定模式的任务。例如,MySqlOperator创建任务来执行SQL查询,而BashOperator执行bash脚本。

操作符在DAG上下文管理器中定义,如下所示。下面的代码创建了两个任务,一个执行bash命令,另一个执行MySQL查询。

with DAG(
	"tutorial"
) as dag:

    task1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    task2 = MySqlOperator(
        task_id="load_table",
        sql="/scripts/load_table.sql"
    )

任务依赖

为了形成DAG的结构,我们需要定义每个任务之间的依赖关系。一种方法是使用>>符号,如下所示:

task1 >> task2 >> task3
# 一个任务有多个依赖
task1 >> [task2, task3]
# 也可以使用set_downstream, set_upstream
t1.set_downstream([t2, t3])

xcom

xcom,或相互通信,负责任务之间的通信。xcom对象可以在任务之间推拉数据。更具体地说,它们将数据推入元数据数据库,其他任务可以从中提取数据。这就是为什么可以通过它们传递的数据量是有限的。但是,如果需要传输大数据,则可以使用合适的外部数据存储,例如对象存储或NoSQL数据库。

看看下面的代码。这两个任务使用ti参数(任务实例的缩写)通过xcom进行通信。train_model任务将model_path推入元数据数据库,元数据由deploy_model任务拉出。

dag = DAG(
    'mlops_dag',
)

def train_model(ti):
    model_path = train_and_save_model()
    ti.xcom_push(key='model_path', value=model_path)

def deploy_model(ti):
    model_path = ti.xcom_pull(key='model_path', task_ids='train_model')
    deploy_trained_model(model_path)

train_model_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)

deploy_model_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag
)

train_model_task >> deploy_model_task

Taskflow

Taskflow API是一种使用Python装饰器@task来定义任务的简单方法。如果所有任务的逻辑都可以用Python编写,那么一个简单的注释就可以定义一个新任务。Taskflow自动管理其他任务之间的依赖关系和通信。

使用Taskflow API,我们可以用@dag装饰器初始化DAG。下面是使用Tashflow示例:

@dag(
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
)
def mlops():

    @task
    def load_data():
        . . .
        return df

    @task
    def preprocessing(data):
       . . .
       return data

    @task
    def fit(data): 
        return None

    df = load_data()
    data = preprocessing(df)
    model = fit(data)
    
dag = mlops()

注意,任务之间的依赖关系是通过每个函数参数隐含的。这里我们是简单的连接顺序,但实际可以变得复杂得多。Taskflow API还解决了任务之间的通信问题,因此使用xcom的需求有限。

调度

作业调度是Airflow的核心功能之一。这可以使用schedule_interval参数完成,该参数接收cron表达式,表示日期时间对象,或预定义变量,如@hour, @daily等。更灵活的方法是使用最近添加的时间表,它支持使用Python定义自定义时间表。

下面是如何使用schedule_interval参数的示例。以下DAG将每天执行。

@dag(
    start_date=datetime(2023,1,1),
    schedule_interval = '@daily',
    catchup =False
)
def my_dag():
    pass

关于调度,需要了解两个非常重要的概念:回填(backfill)和追赶(catchup)。

一旦我们定义了DAG,我们就设置了开始日期和计划间隔。如果catchup=True,则Airflow 将为从开始日期到当前日期的所有计划间隔创建DAG运行。如果catchup=False,气流将只从当前日期调度运行。

回填扩展了这个想法,使我们能够在CLI中创建过去的运行,而不管catchup参数的值:

$ airflow backfill  -s <START_DATE> -e <END_DATE> <DAG_NAME>

连接

Airflow 提供了一种简单的方法来配置与外部系统或服务的连接。可以使用UI、作为环境变量或通过配置文件创建连接。它们通常需要URL、身份验证信息和唯一id。钩子(Hooks )是一种API,它抽象了与这些外部系统的通信。例如,我们可以通过如下的UI定义一个PostgreSQL连接:

在这里插入图片描述

然后使用PostgresHook来建立连接并执行我们的查询:

pg_hook = PostgresHook(postgres_conn_id='postgres_custom')

conn = pg_hook.get_conn()
cursor = conn.cursor()
cursor.execute('create table _mytable (ModelID int, ModelName varchar(255)')
cursor.close()
conn.close()

高级概念

为了使本教程尽可能完整,我需要提到一些更高级的概念。我不会详细介绍每一个,但我强烈建议你看看他们,如果你想深入掌握Airflow 。

  • 分支:分支允许你将任务划分为许多不同的任务,如:支持条件处理不同任务的工作流。最常见的方法是BranchPythonOperator。
  • 任务组:任务组可以在单个组中组织多个任务。它是简化图形视图和重复模式的好工具。
  • 动态包:包和任务也可以以动态的方式构造。从Airflow 2.3开始,可以在运行时创建包和任务,这对于并行和依赖输入的任务来说是理想的。气流也支持Jinja模板,并且是对动态包非常有用的补充。
  • 单元测试和日志记录:气流具有运行单元测试和记录信息的专用功能.

Airflow最佳实践

在我们看到实际操作的示例之前,让我们讨论一下大多数从业者使用的一些最佳实践。

  • 幂等性:dag和任务应该是幂等的。使用相同的输入重新执行相同的DAG运行应该始终具有与执行一次相同的效果。
  • 原子性:任务应该是原子性的。每个任务应该负责一个操作,并且独立于其他任务
  • 增量过滤:每个DAG运行应该只处理一批支持增量提取和加载的数据。这样,可能出现的故障就不会影响整个数据集。
  • 顶级代码:如果不是用于创建操作符或标记,则应避免使用顶级代码,因为它会影响性能和加载时间。所有代码都应该在任务内部,包括导入包、数据库访问和繁重的计算。
  • 复杂性:dag应尽可能保持简单,因为高复杂性可能会影响性能或调度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2253006.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【OpenAI库】从0到1深入理解Python调用OpenAI库的完整教程:从入门到实际运用

文章目录 Moss前沿AI一、初识OpenAI API1.1 获取API-Key&#xff08;两种方案&#xff09;1.2 安装OpenAI库 二、Python调用OpenAI API的基础设置2.1 设置API密钥和Base URL2.2 参数详解 三、构建一个简单的聊天应用3.1 创建聊天请求3.2 参数详解3.3 处理响应 四、完整代码示例…

42 基于单片机的智能浇花系统

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于51单片机&#xff0c;采样DHT11温湿度传感器检测温湿度&#xff0c;通过LCD1602显示 4*4按键矩阵可以设置温度湿度阈值&#xff0c;温度大于阈值则开启水泵&#xff0c;湿度大于阈值则开启风扇…

typecho 添加主题备份及恢复功能

typecho 换主题很简单&#xff0c;但是确有一个比较麻烦的事情&#xff0c;就是主题配置在切换主题的同时也就被删除了。于是&#xff0c;今天我下决心要弄一个备份恢复的功能出来。网上查了很久&#xff0c;都没有找到适合的&#xff08;不过还是有参考价值的&#xff09;。最…

docker部署RustDesk自建服务器

客户端&#xff1a; Releases rustdesk/rustdesk GitHub 服务端&#xff1a; 项目官方地址&#xff1a;GitHub - rustdesk/rustdesk-server: RustDesk Server Program 1、拉取RustDesk库 docker pull rustdesk/rustdesk-server:latest 阿里云库&#xff1a; docker pu…

从零开始了解推荐系统(算法构建、召回、粗排、精排、重排、冷启动、衡量标准)

算法构建 推荐算法流程 实际上是一种信息处理逻辑&#xff0c;当获取了用户与内容的信息之后&#xff0c;按照一定的逻辑处理信息后&#xff0c;产生推荐结果。热度排行榜就是最简单的一种推荐方法&#xff0c;依赖的逻辑是当一个内容被大多数用户喜欢&#xff0c;那么大概率…

【第 1 章 初识 C 语言】1.8 使用 C 语言的 7 个步骤

目录 1.8 使用 C 语言的 7 个步骤 1.8.1 第 1 步&#xff1a;定义程序的目标 1.8.2 第 2 步&#xff1a;设计程序 1.8.3 第 3 步&#xff1a;编写代码 1.8.4 第 4 步&#xff1a;编译 1.8.5 第 5 步&#xff1a;运行程序 1.8.6 第 6 步&#xff1a;测试和调试程序 1.8.…

基于Matlab卡尔曼滤波的GPS/INS集成导航系统研究与实现

随着智能交通和无人驾驶技术的迅猛发展&#xff0c;精确可靠的导航系统已成为提升车辆定位精度与安全性的重要技术。全球定位系统&#xff08;GPS&#xff09;和惯性导航系统&#xff08;INS&#xff09;在导航应用中各具优势&#xff1a;GPS提供全球定位信息&#xff0c;而INS…

C++知识整理day3类与对象(下)——赋值运算符重载、取地址重载、列表初始化、友元、匿名对象、static

文章目录 1.赋值运算符重载1.1 运算符重载1.2 赋值运算符重载 2.取地址重载2.1 const成员函数2.2 取地址运算符重载 3.类与对象的补充3.1 再探构造函数---初始化列表3.2 类型转换3.3 static成员3.4 友元3.5 内部类3.6 匿名对象3.7 对象拷贝时的编译器优化 1.赋值运算符重载 赋…

深入解析级联操作与SQL完整性约束异常的解决方法

目录 前言1. 外键约束与级联操作概述1.1 什么是外键约束1.2 级联操作的实际应用场景 2. 错误分析&#xff1a;SQLIntegrityConstraintViolationException2.1 错误场景描述2.2 触发错误的根本原因 3. 解决方法及优化建议3.1 数据库级别的解决方案3.2 应用层的解决方案 4. 友好提…

dns实验3:主从同步-完全区域传输

服务器192.168.234.111&#xff08;主服务器&#xff09;&#xff0c;打开配置文件&#xff1a; 打开配置文件&#xff1a; 关闭防火墙&#xff0c;改宽松模式&#xff1a; 重启服务&#xff1a; 服务器192.168.234.112&#xff08;从服务器&#xff09;&#xff0c;打开配置文…

LeetCode刷题 -- 分治快排

目录 颜色分类题目解析算法原理代码 排序数组题目解析算法原理代码 数组中第K个最大元素题目解析算法原理代码 LCR 159. 库存管理 III题目解析算法原理代码 颜色分类 题目链接 题目解析 数组分为三块 算法原理 1.如果nums[i] 0&#xff0c;left, i下标对应元素交换&#xff0c…

【论文笔记】Leveraging the Power of MLLMs for Gloss-Free Sign Language Translation

&#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&#xff0c;为生民立命&#xff0c;为往圣继绝学&#xff0c;为万世开太平。 基本信息 标题: Leveraging the Power of …

TsingtaoAI具身智能高校实训方案通过华为昇腾技术认证

日前&#xff0c;TsingtaoAI推出的“具身智能高校实训解决方案-从AI大模型机器人到通用具身智能”基于华为技术有限公司AI框架昇思MindSpore&#xff0c;完成并通过昇腾相互兼容性技术认证。 TsingtaoAI&华为昇腾联合解决方案 本项目“具身智能高校实训解决方案”以实现高…

如何抓取亚马逊页面动态加载的内容:Python爬虫实践指南

引言 在现代电商领域&#xff0c;数据的重要性不言而喻。亚马逊作为全球领先的电商平台&#xff0c;其页面上动态加载的内容包含了丰富的商品信息。然而&#xff0c;传统的爬虫技术往往难以应对JavaScript动态加载的内容。本文将详细介绍如何使用Python结合Selenium工具来抓取…

tcpdump抓包wireshark分析

背景 分析特定协议的数据包&#xff0c;如 HTTP、DNS、TCP、UDP 等&#xff0c;诊断网络问题&#xff0c;例如连接故障、延迟和数据包丢失。 大概过程 1.安装tcpdump yum update yum install tcpdump2.抓包&#xff0c;从当前时间起&#xff0c;一小时后停止&#xff0c…

如何进行Appium实现移动端UI自动化测试呢?

&#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 Appium是一个开源跨平台移动应用自动化测试框架。 既然只是想学习下Appium如何入门&#xff0c;那么我们就直奔主题。文章结构如下&#xff1a; 为什么要使用…

骨架行为识别-论文复现

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

Unity 设计模式-观察者模式(Observer Pattern)详解

观察者模式 观察者模式&#xff08;Observer Pattern&#xff09;是一种行为型设计模式&#xff0c;它定义了对象之间的一对多依赖关系。当一个对象的状态发生变化时&#xff0c;它的所有依赖者&#xff08;观察者&#xff09;都会收到通知并自动更新。这种模式用于事件处理系…

【webApp之h5端实战】首页评分组件的原生实现

关于评分组件,我们经常在现代前端框架中用到,UI美观效果丰富,使用体验是非常不错的。现在自己动手使用原生js封装下评分组件,可以用在自己的项目中。 组件实现原理 点击的❤左侧包括自己都是高亮的样式,右侧都是灰色的样式,这样就能把组件的状态区分开了。右边再加上辅…

unity与android拓展

一.AndroidStudio打包 1.通过Unity导出Android Studio能够打开的工程 步骤 1.设置导出基本信息&#xff1a;公司名、游戏名、图标、包名等关键信息 2.在File——>Build Settings中&#xff0c;勾选 Export Project 选项 3.点击Export 导出按钮 2.在Android Studio中打开Un…