Airflow Official Site + Self-Test Source Code Examples
- 1. Preparation
  - 1.1 Installation
  - 1.2 Locating the DAG directory
- 2. Official example
- 3. Self-test
- 4. Summary
Official documentation: https://airflow.apache.org/docs/apache-airflow/2.7.2/. This article is based on version 2.7.2.
1. Preparation
1.1 Installation
The previous post's Quick Start covered installation in detail; here is the minimal install and startup:
```bash
# 1. Set the installation directory
export AIRFLOW_HOME=~/airflow

# 2. Create a virtual environment and install
# Create and activate the airflow environment
conda create -n airflow python=3.8
conda activate airflow
pip install "apache-airflow==2.7.2"

# 3. Start in the foreground (inside the virtual environment)
airflow standalone
```
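Once `standalone` is up (it initializes the metadata database, creates an admin user, and prints the generated login credentials to the console), a quick sanity check of the install could look like this sketch:

```bash
# Confirm the installed version and inspect paths/providers
airflow version
airflow info
```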
1.2 Locating the DAG directory
- Python DAG files go into the `dags_folder` directory:

```bash
[root@tcloud airflow]# cat airflow.cfg | grep dags_folder
dags_folder = /root/airflow/dags
```
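As a sketch of an alternative to grepping the config file, the effective value can also be queried from the CLI directly (inside the activated environment):

```bash
# Print the effective dags_folder setting
airflow config get-value core dags_folder
```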
2. Official example
- Install the dependencies required by the task (i.e., whatever the executed code imports):

```bash
conda install scikit-learn
```

- Place the official example file `demo.py` under the `dags_folder` path. The file contents are:
```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:
    # Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    # Set dependencies between tasks
    hello >> airflow()
```
- Refresh the web UI and the demo DAG appears; clicking it triggers a run.
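Besides clicking in the UI, the DAG can be exercised from the command line; `airflow dags test` performs a single in-process run without needing the scheduler (the date below is just an example logical date):

```bash
# Run one in-process test run of the demo DAG (no scheduler required)
airflow dags test demo 2022-01-01
```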
3. Self-test
- Create the test file `airflow_test.py` and place it in the `dags_folder` directory. The file contents are:
```python
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

# Default arguments applied to every task in this DAG
default_args = {
    'owner': 'airFlowTest',
    'depends_on_past': False,
    'start_date': days_ago(31),
    # Fill in real addresses so failure/retry emails can be sent
    'email': ['xxxxx@qq.com'],
    # Send an alert email when a task fails
    'email_on_failure': True,
    'email_on_retry': False,
    # Number of retries
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    'trigger_rule': 'all_success'
}

# Define the DAG
dag = DAG(
    'air_test',
    default_args=default_args,
    description='A simple airflow test',
    schedule_interval=timedelta(days=1),
)

step_one = BashOperator(
    task_id='step_one',
    bash_command='echo step_one over! >> /root/airflow/file/airflowtest.log',
    dag=dag,
)

step_two = BashOperator(
    task_id='step_two',
    depends_on_past=False,
    bash_command='echo step_two over! >> /root/airflow/file/airflowtest.log',
    retries=3,
    dag=dag,
)

step_three = BashOperator(
    task_id='step_three',
    depends_on_past=False,
    bash_command='echo step_three over! >> /root/airflow/file/airflowtest.log',
    retries=3,
    dag=dag,
)

step_four = BashOperator(
    task_id='step_four',
    depends_on_past=False,
    bash_command='echo step_four over! >> /root/airflow/file/airflowtest.log',
    retries=3,
    dag=dag,
)

# step_three is deliberately left out of the chain, so it has no upstream
step_one >> step_two >> step_four
```
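Before looking at the log, it can be worth confirming that the scheduler parsed the new file cleanly; a sketch using standard CLI subcommands:

```bash
# Check that air_test was picked up and has no import errors
airflow dags list | grep air_test
airflow dags list-import-errors
```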
- Execution result:

```bash
[root@tcloud file]# pwd
/root/airflow/file
[root@tcloud file]# cat airflowtest.log
step_one over!
step_three over!
step_two over!
step_four over!
```
Explanation of the result: step_one and step_three start at the same time (step_three has no upstream dependency), while step_one, step_two, and step_four execute in sequence.
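If step_three were instead meant to join the chain, the standard list syntax for `>>` fans the dependency out and back in. A sketch (a hypothetical variant, not part of the test above):

```python
# Variant: step_two and step_three run in parallel after step_one,
# and step_four waits for both (per the 'all_success' trigger rule)
step_one >> [step_two, step_three] >> step_four
```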
4. Summary
- `bash_command` leaves plenty of room to operate: anything expressible as a shell command can be scheduled this way.
- Airflow's DAG syntax takes some dedicated learning.
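As an illustration of that flexibility, `bash_command` can call an external script with templated arguments; this is a sketch, and the script path is a made-up example rather than part of the tests above:

```python
from airflow.operators.bash import BashOperator

# Hypothetical task: run an external script, passing the logical date;
# '{{ ds }}' is rendered by Airflow's Jinja templating at runtime
run_script = BashOperator(
    task_id='run_script',
    bash_command='bash /root/airflow/scripts/etl.sh {{ ds }}',
    dag=dag,
)
```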