Airflow任务流调度

news2025/1/17 2:56:27

前言

Airflow是Airbnb内部发起的一个工作流管理平台。使用Python编写实现的任务管理、调度、监控工作流平台。Airflow的调度依赖于crontab命令,与crontab相比,Airflow可以方便地查看任务的执行状况(执行是否成功、执行时间、执行依赖等),可追踪任务历史执行情况,任务执行失败时可以收到邮件通知、查看错误日志。对于管理调度任务有很大的帮助。

crontab命令管理调度的方式总结来看存在以下几个方面的弊端:

1)在多任务调度执行的情况下,难以厘清任务间的依赖关系;

2)不便于查看当前执行到哪一个任务;

3)不便于查看调度流下每个任务执行的起止消耗时间,而这对于优化task作业是非常重要的;

4)不便于记录历史调度任务的执行情况,而这对于优化作业和排查错误是非常重要的;

5)执行任务失败时不便于查看执行日志,不方便定位报错的任务和接收错误告警邮件。

Airflow的官方文档地址是👇

http://airflow.apache.org/index.html,想使用Airflow管理调度任务的读者可反复研读官网文章,深入了解Airflow。

下面介绍在工程开发中如何去应用Airflow。

1 基础概念

在介绍Airflow这个调度工具前先介绍几个相关的基础概念。

DAG(Directed Acyclic Graph,有向无环图):

用于描述数据流的计算过程。

Operators:

描述了DAG中一个具体的task要执行的任务,如BashOperator为执行一条bash命令,EmailOperator用于发送邮件,HTTPOperator用于发送HTTP请求,PythonOperator用于调用任意的Python函数。

Task:

是Operator的一个实例,也就是DAG中的一个节点。

Task Instance:

记录task的一次运行。Task Instance有自己的状态,包括“running”“success”“failed”“skipped”“up for retry”等。

Triggher Rules:

指task的触发条件。

每一个节点可视为一个task,每个task用于执行一条任务,比如执行某个表的ETL加工。这些task调度任务按执行顺序的先后连接起来形成一个有向无环图

2 Airflow服务构成

一个正常运行的Airflow系统一般由以下几个服务构成。

1.WebServer

Airflow提供了一个可视化的Web界面,启动WebServer后,可以在Web界面上查看定义好的DAG并监控及改变其运行状况。也可以在Web界面中对一些变量进行配置。

2.Worker(Celery模式)

一般地,我们使用Celery Worker来执行具体作业。Worker可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker就会接收这个作业任务并开始执行。Airflow会自动在每个部署Worker的机器上同时部署一个Server Logs服务,这样就可以在Web界面上方便地查看分布在不同机器上的日志了。

3.Scheduler

整个Airflow的调度由Scheduler负责发起,每隔一段时间Scheduler就会检查所有定义完成的DAG和定义在其中的作业,如果有符合运行条件的作业,Scheduler就会发起相应的作业任务以供Worker接收。

4.Flower(Celery模式)

Flower提供了一个可视化界面用于监控所有Celery Worker的运行状况。

主要模块功能

通过Airflow的管理界面,可以了解其主要覆盖的功能模块。下面介绍Airflow主要覆盖的功能模块,这些模块在Airflow官网上有详细介绍。Airflow的工作流设计是有向无环图(DAG),在编写工作流时,需要考虑如何将任务划分为多个可独立执行的任务,然后将这些任务合并为一个逻辑整体,从而实现任务调度的结果。

1.DAG任务列表

首页中的DAG模块可以查看当前DAG的任务列表,包括当前有哪些DAG调度任务、哪些任务运行成功、哪些任务运行失败、哪些任务正在运行中。

图片

2.DAG调度状态图

在Tree View模块可以查看当前DAG每个task任务的调度状态,是执行成功、正在执行、执行失败还是等待执行等,便于快速定位到执行失败的任务,重新调启执行。

3.DAG有向无环图

在Graph View模块可以看到当前DAG中各task任务之间的依赖关系,以及各任务的执行状态。

图片

4.DAG执行脚本

在Code模块中可以查看当前DAG任务的执行脚本,包括任务的起始调度时间、调度失败后重试机制、各task任务之间的依赖关系等。当某个task执行出现问题时可通过查看该调度脚本定位原因。

图片

5.Gantt图

在Gantt模块中可以查看DAG调度的甘特图,通过甘特图可以查看每个task调度任务的起止时间、持续时长。方便查找到调度时间长的task任务,以便后续进行优化。

图片

3 脚本实例

在Airflow中,简单地说,task脚本是需要被一个个调起执行的脚本,DAG脚本是管理task脚本执行顺序、执行触发条件的。在Airflow调度开发中主要需要维护的是DAG脚本。下面通过一个具体的例子来了解:

在该脚本中,首先定义了需要引入的依赖包,定义了默认的参数配置及DAG参数和调度时间。其中default_args的默认配置中主要定义了如下参数。

Python
from airflow.operators.bash_operator import BashOperator
import airflow
from airflow.models import DAG
from airflow import operators
from airflow.contrib.hooks import SSHHook
from airflow.models import BaseOperator
from airflow.contrib.operators import SSHExecuteOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
import os
import sys
from datetime import timedelta,date,datetime
import pendulum
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'userprofile',
    'depends_on_past': False,
    'start_date': datetime(2023, 12, 01),
    'email': ['administer@testemail.com'],
    'email_on_failure': True ,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
os.environ['SPARK_HOME'] = '/usr/local/spark-2.1.1-bin-hadoop2.6'
sys.path.Append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

dag = DAG(
    'user_dag',
    default_args=default_args,
    description='A user test',
    schedule_interval='00 07  * * *'
)
 

depends_on_past:是否依赖上游任务,即上一个调度任务执行失败时,是否执行该任务。可选项包括True和False,False表示当前执行脚本不依赖上游执行任务是否成功;

start_date:表示首次任务的执行日期;

email:设定当任务执行失败时,用于接收失败报警邮件的邮箱地址;

email_on_failure:当任务执行失败时,是否发送邮件。可选项包括True和False,True表示失败时将发送邮件;

retries:表示执行失败时是否重新调起任务执行,1表示会重新调起;

retry_delay:表示重新调起执行任务的时间间隔。

在DAG的定义中,除了引入上述的默认配置(default_args=default_args)外,还定义了该DAG脚本的dag_id为user_dag,定时调度时间为每天早上7点。中间两行参数为配置脚本运行的环境变量。

4 常用命令行

Airflow通过可视化界面的方式实现了调度管理的界面操作,但在测试脚本或界面操作失败的时候,可通过命令行的方式调起任务。下面介绍几个常用的命令。

airflow dags list:列出所有DAG

airflow list_tasks user:该命令用于查看当前DAG任务下的所有task列表,其中user是DAG名称。

airflow test user age_task 20230701:该命令用于测试DAG下面某个task是否能正常执行,其中user是DAG名称,age_task是其中一个task的名称。

5

Airflow常用Operator介绍

  • Python
    """
    ### Tutorial Documentation
    Documentation that goes along with the Airflow tutorial located
    [here](http://pythonhosted.org/airflow/tutorial.html)
    """
    import airflow
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import timedelta
     
     
    # these args will get passed on to each operator
    # you can override them on a per-task basis during operator initialization
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': airflow.utils.dates.days_ago(2),
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
     
    dag = DAG(
        'tutorial',
        default_args=default_args,
        description='A simple tutorial DAG',
        schedule_interval=timedelta(days=1))
     
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',   #这里也可以是一个 bash 脚本文件
        bash_command='date',
        dag=dag)
     
    t1.doc_md = """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    """
     
    dag.doc_md = __doc__
     
    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        dag=dag)
     
    templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
     
    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)
     
    t2.set_upstream(t1)
    t3.set_upstream(t1)

这里 t1 和 t2 都很容易理解,直接调用的是 bash 命令,其实也可以传入带路径的 bash 脚本, t3 使用了 Jinja 模板,"{% %}" 内部是 for 标签,用于循环操作。"{{ }}" 内部是变量,其中 ds 是执行日期,是 airflow 的宏变量,params.my_param 是自定义变量。根据官方提供的模板,稍加修改即可满足我们的日常工作所需。

PythonOperator

Python
from __future__ import print_function
from builtins import range
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
 
import time
from pprint import pprint
 
args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}
 
dag = DAG(
    dag_id='example_python_operator', default_args=args,
    schedule_interval=None)
 
 
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)
 
 
def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'
 
run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)
 
# Generate 10 sleeping tasks, sleeping from 0 to 9 seconds respectively
for i in range(10):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag)
 
    task.set_upstream(run_this)

通过以上代码我们可以看到,任务 task 及依赖关系都是可以动态生成的,这在实际使用中会减少代码编写数量,逻辑也非常清晰,非常方便使用。PythonOperator 与 BashOperator 基本类似,不同的是 python_callable 传入的是 Python 函数,而后者传入的是 bash 指令或脚本。通过 op_kwargs 可以传入任意多个参数

SqoopOperator

SqoopOperator允许用户在 Airflow 工作流中集成 Apache Sqoop 作业,以便于在 Hadoop 分布式文件系统(HDFS)、关系型数据库管理系统(RDBMS)如 MySQL、PostgreSQL 或 Oracle 之间导入和导出数据。使用 SqoopOperator可以自动化数据迁移任务,提高数据处理流程的可维护性和灵活性

Python
sqoop_import = SqoopOperator(
    task_id='sqoop_import_data',
    conn_id='my_postgres_conn',  # Airflow中预先配置的数据库连接ID
    cmd_type='import',  # Sqoop操作类型,这里是导入
    table='example_table',  # 要导入的表名
    target_dir='/user/hadoop/sqoop_imports/example_table',  # HDFS目标目录
    num_mappers=2,  # 使用的Mapper数量
    splits_by='id',  # 分割数据的列名
    dag=dag,
)  

参数说明

conn_id: 引用Airflow中预先配置的数据库连接信息。

cmd_type: 指定Sqoop命令类型,如import或export。

table: 要操作的数据库表名。

target_dir: 导入数据的目标HDFS目录。

num_mappers: 并行执行任务的Mapper数量。

splits_by: 用于数据切分的列名,有助于提升导入效率。

其他可选参数如 where 用于指定导入数据的筛选条件,--direct 用于直接模式导入等,可根据需要添加。

BranchPythonOperator

BranchPythonOperator允许用户通过函数返回下一步要执行的task的id,从而根据条件选择执行的分支。它用于在工作流中根据特定条件动态选择下一个执行的任务。这个操作符通过执行一个Python函数来决定接下来执行哪一个任务,从而实现工作流的动态分支逻辑。

DummyOperator

作为一个虚拟的任务节点,使得DAG有一个起点,但实际不执行任务;或者是在上游几个分支任务的合并节点,为了清楚的现实数据逻辑。

HiveOperator

可以通过HiveOperato执行Hive SQL语句或脚本。它允许用户在Airflow工作流中方便地集成Hive作业,例如创建表、加载数据、执行查询等。

ExternalTaskSensor

Airflow中可以通过ExternalTaskSensor来完成跨DAG依赖。

跨DAG依赖管理:ExternalTaskSensor用于处理不同DAG之间的依赖关系。如果你的业务流程包含多个相互依赖的DAG,可以使用 ExternalTaskSensor 来确保上游DAG或其特定任务完成后,下游DAG的任务才开始执行。

SqlSensor

SqlSensor是 Apache Airflow 中的一个传感器(Sensor)操作符,它用于在工作流中等待直到特定的SQL查询返回预期的结果,之后才允许工作流继续执行。这种操作符非常适合于基于数据库状态的依赖控制,比如在执行下一步骤之前确保数据已就绪或满足特定条件。

MySqlOperator

MySqlOperator 是 Apache Airflow 中的一个操作符,它允许用户在 Airflow 工作流中执行 MySQL 数据库的相关操作,比如执行 SQL 查询、插入数据、更新表结构等。通过使用 MySqlOperator,你可以将数据库操作集成到自动化的工作流程中,实现数据处理、ETL 任务的编排与执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1832836.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Elixir学习笔记——自定义符号

Elixir 提供双引号字符串以及一个称为 charlists 的概念,它们使用 ~c“hello world”符号语法定义。在本章中,我们将了解有关符号的更多信息以及如何定义我们自己的符号。 Elixir 的目标之一是可扩展性:开发人员应该能够扩展语言以适应任何特…

Vatee万腾平台,让智能更懂你

在数字化浪潮席卷全球的今天,智能科技已经渗透到我们生活的方方面面。然而,真正的智能不仅仅是技术的堆砌,更是对人性需求的深刻理解和满足。Vatee万腾平台,正是这样一个让智能更懂你的平台,它以其独特的方式&#xff…

select 下拉框不可选

select 下拉框不可选 disabled和readonlyselect 下拉框不可选择CSS pointer-events 属性 通常情况下,设置表单输入框不可操作的时候会选择使用disabled或者readonly,那么disabled和readonly有什么区别呢? disabled和readonly 首先来说这两个…

vscode插件开发之 - 消息通信

在开发vscode插件过程中,有一个典型场景是webview与extension.ts进行通信,例如,webview上的某些信息发送改变时,需要发送消息传递给extension.ts. 如果使用react框架构建vscode插件的webview,如何实现webview与extensi…

怎么把两个音频合成一个?将两个音频合成一个的四种方法

怎么把两个音频合成一个?在当今数字化的时代,音频处理已经成为我们生活中不可或缺的一部分。有时候,我们会希望将两段音频合成为一个,无论是为了制作音乐混音、创作声音效果,还是为了编辑播客节目或视频配音。合成音频…

硕思闪客精灵软件最新版下载及详细安装教程

闪客精灵(Sothink SWF Decompiler)是一款先进的SWF反编译软件,它不但能捕捉、反编译、查看和提取Shock Wave Flash影片(.swf和.exe格式文件),而且可以将SWF格式文件转化为FLA格式文件。 安 装 包 获 取 地 …

CentOS搭建kubernetes集群详细过程(yum安装方式)

kubernetes集群搭建详细过程(yum安装方式) Kubernetes,也被称为K8s,是一个多功能的容器管理工具,它不仅能够协调和调度容器的部署,而且还能监控容器的健康状况并自动修复常见问题。这个平台是在谷歌十多年…

性能测试、负载测试、压力测试、稳定性测试简单区分【超详细】

🍅 视频学习:文末有免费的配套视频可观看 🍅 点击文末小卡片 ,免费获取软件测试全套资料,资料在手,涨薪更快 性能测试是一个总称,可细分为性能测试、负载测试、压力测试、稳定性测试。 性能测试…

XZ后门故事:初始分析

2024年3月29日,Openwall OSS安全邮件列表上的一条消息“炸醒”了整个信息安全、开源和Linux社区:XZ出现了一个CVSS评分10.0的恶意后门。 这个后门库的特殊危险在于OpenSSH服务器进程sshd使用它。在多个基于systemd的发行版上(包括Ubuntu、De…

二叉树-根据先序遍历和中序遍历序列重建二叉树

目录 一、问题描述 二、解题思路 1.首先明确先序遍历和中序遍历的性质: 2.确定根节点及左右子树 3.对子树进行递归操作 4.递归返回条件 三、代码实现 四、刷题链接 一、问题描述 二、解题思路 1.首先明确先序遍历和中序遍历的性质: 先序遍历&am…

基于ChatGPT-4o自然科学研究全流程实践技术应用

自然科学研究遵循严谨的科学方法论,包括文献调研、问题综述、试验设计、提出假设、数据清洗、统计诊断、大数据分析、经典统计模型(回归模型、混合效应模型、结构方程模型、Meta分析模型)、参数优化、机器/深度学习、大尺度模型构建与模拟、论…

Centos7 安装oracle 11.2.0.4

荆轲刺秦王 1. 准备工作 需要下载 Oracle 11g 安装包 2.HostName修改: hostnamectl set-hostname oracle 3. 配置hostname(本机IP映射)注意:192.168.116.129 需要换乘本地ip vi /etc/hosts 192.168.116.129 oracle # 测试hos…

企业用户使用OV SSL证书趋势增长

随着网络安全的需求度日益提高,https证书也成为了当下最受欢迎的数字证书之一,主要是用于保护网站和应用程序的安全,并提升用户对网站的信任度,且只有企业或组织才可申请。 OV SSL证书全称Organization Validation SSL(组织验证性…

【前端】Nesj 学习笔记

1、前置知识 1.1 装饰器 装饰器的类型 declare type ClassDecorator <TFunction extends Function>(target: TFunction) > TFunction | void; declare type PropertyDecorator (target: Object, propertyKey: string | symbol) > void; declare type MethodDe…

CMSIS-RTOS2简介

本文介绍CMSIS-RTOS2。 1.引入 CMSIS-RTOS2在基于Arm Cortex处理器的设备上运行的实时操作系统内核上指定了通用RTOS接口。应用程序和中间件组件可以使用CMSIS-RTOS2 API在各种软件生态系统中实现更好的代码重用和更简单的集成。 CMSIS-RTOS2还指定了RTOS内核使用的标准OS T…

机械师电脑文件丢失怎么办?6个恢复方法,希望能帮到您

机械师电脑作为高性能的计算机品牌&#xff0c;受到众多用户的青睐。然而&#xff0c;即便是品质卓越的电脑&#xff0c;也难免会遇到文件丢失的困扰。无论是由于误操作、系统故障还是硬盘损坏&#xff0c;文件丢失都可能给用户带来不小的麻烦。当您发现机械师电脑上的文件突然…

使用Midjourney为产品创建出色效果图-关键词

使用MJ为产品创建效果图并不难&#xff0c;可以使用这个固定提示词公式。 Mockup empty, blank [ product ], [ decorating items ] [ background or context ], [ 1- 3 descriptive style], [ color palette ] 创建产品形象 首先&#xff0c;你需要准备一个透明背景的产品。…

基于JSP的二手车交易网站

开头语&#xff1a; 你好呀&#xff0c;我是计算机学长猫哥&#xff01;如果你对二手车交易网站感兴趣或有相关开发需求&#xff0c;欢迎随时联系我。我的联系方式可以在文末找到。 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;JSPJava 工具&#…

IPA清洁棉签 IPA清洁擦拭棒:打印机头、电子设备等清洁的有力工具!

在数字化快速发展的今天&#xff0c;打印机头、电子设备等已经成为了我们日常生活和工作中不可或缺的一部分。然而&#xff0c;随着使用时间的增长&#xff0c;这些设备往往会因为灰尘、油渍等污染物的积累而影响其性能。此时&#xff0c;一款高效、便捷的清洁工具就显得尤为重…

可通过小球进行旋转的十字光标(vtkResliceCursor)

前一段事件看到VTK的一个例子&#xff1a; 该案例是vtk.js写的&#xff0c;觉得很有意思&#xff0c;个人正好也要用到&#xff0c;于是萌生了用C修改VTK源码来实现该功能的想法。原本以为很简单&#xff0c;只需要修改一下vtkResliceCursor就可以了&#xff0c;加上小球&#…