🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客
🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。
🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频
目录
1. DAG任务依赖设置一
2. DAG任务依赖设置二
3. DAG任务依赖设置三
4. DAG任务依赖设置四
5. DAG任务依赖设置五
1. DAG任务依赖设置一
- DAG调度流程图
- task执行依赖
A >> B >>C
- 完整代码
'''
airflow 任务依赖关系设置一
'''
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # 拥有者名称
'start_date': datetime(2021, 9, 22), # 第一次开始执行的时间,为 UTC 时间
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5), # 失败重试间隔
}
dag = DAG(
dag_id = 'dag_relation_1', #DAG id ,必须完全由字母、数字、下划线组成
default_args = default_args, #外部定义的 dic 格式的参数
schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)
A = BashOperator(
task_id='A',
bash_command='echo "run A task"',
dag=dag
)
B = BashOperator(
task_id='B',
bash_command='echo "run B task"',
dag=dag
)
C = BashOperator(
task_id='C',
bash_command='echo "run C task"',
dag=dag,
retries=3
)
A >> B >>C
2. DAG任务依赖设置二
- DAG调度流程图
- task执行依赖
[A,B] >>C >>D
- 完整代码
'''
airflow 任务依赖关系设置二
'''
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # 拥有者名称
'start_date': datetime(2021, 9, 22), # 第一次开始执行的时间,为 UTC 时间
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5), # 失败重试间隔
}
dag = DAG(
dag_id = 'dag_relation_2', #DAG id ,必须完全由字母、数字、下划线组成
default_args = default_args, #外部定义的 dic 格式的参数
schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)
A = BashOperator(
task_id='A',
bash_command='echo "run A task"',
dag=dag
)
B = BashOperator(
task_id='B',
bash_command='echo "run B task"',
dag=dag
)
C = BashOperator(
task_id='C',
bash_command='echo "run C task"',
dag=dag,
retries=3
)
D = BashOperator(
task_id='D',
bash_command='echo "run D task"',
dag=dag
)
[A,B] >>C >>D
3. DAG任务依赖设置三
- DAG调度流程图
- task执行依赖
[A,B,C] >>D >>[E,F]
- 完整代码
'''
airflow 任务依赖关系设置三
'''
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # 拥有者名称
'start_date': datetime(2021, 9, 22), # 第一次开始执行的时间,为 UTC 时间
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5), # 失败重试间隔
}
dag = DAG(
dag_id = 'dag_relation_3', #DAG id ,必须完全由字母、数字、下划线组成
default_args = default_args, #外部定义的 dic 格式的参数
schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)
A = BashOperator(
task_id='A',
bash_command='echo "run A task"',
dag=dag
)
B = BashOperator(
task_id='B',
bash_command='echo "run B task"',
dag=dag
)
C = BashOperator(
task_id='C',
bash_command='echo "run C task"',
dag=dag,
retries=3
)
D = BashOperator(
task_id='D',
bash_command='echo "run D task"',
dag=dag
)
E = BashOperator(
task_id='E',
bash_command='echo "run E task"',
dag=dag
)
F = BashOperator(
task_id='F',
bash_command='echo "run F task"',
dag=dag
)
[A,B,C] >>D >>[E,F]
4. DAG任务依赖设置四
- DAG调度流程图
- task执行依赖
A >>B>>C>>D
A >>E>>F
- 完整代码
'''
airflow 任务依赖关系设置四
'''
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # 拥有者名称
'start_date': datetime(2021, 9, 22), # 第一次开始执行的时间,为 UTC 时间
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5), # 失败重试间隔
}
dag = DAG(
dag_id = 'dag_relation_4', #DAG id ,必须完全由字母、数字、下划线组成
default_args = default_args, #外部定义的 dic 格式的参数
schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)
A = BashOperator(
task_id='A',
bash_command='echo "run A task"',
dag=dag
)
B = BashOperator(
task_id='B',
bash_command='echo "run B task"',
dag=dag
)
C = BashOperator(
task_id='C',
bash_command='echo "run C task"',
dag=dag,
retries=3
)
D = BashOperator(
task_id='D',
bash_command='echo "run D task"',
dag=dag
)
E = BashOperator(
task_id='E',
bash_command='echo "run E task"',
dag=dag
)
F = BashOperator(
task_id='F',
bash_command='echo "run F task"',
dag=dag
)
A >>[B,C,D]
A >>[E,F]
5. DAG任务依赖设置五
- DAG调度流程图
- task执行依赖
A >>B>>E
C >>D>>E
- 完整代码
'''
airflow 任务依赖关系设置五
'''
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow', # 拥有者名称
'start_date': datetime(2021, 9, 22), # 第一次开始执行的时间,为 UTC 时间
'retries': 1, # 失败重试次数
'retry_delay': timedelta(minutes=5), # 失败重试间隔
}
dag = DAG(
dag_id = 'dag_relation_5', #DAG id ,必须完全由字母、数字、下划线组成
default_args = default_args, #外部定义的 dic 格式的参数
schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
)
A = BashOperator(
task_id='A',
bash_command='echo "run A task"',
dag=dag
)
B = BashOperator(
task_id='B',
bash_command='echo "run B task"',
dag=dag
)
C = BashOperator(
task_id='C',
bash_command='echo "run C task"',
dag=dag,
retries=3
)
D = BashOperator(
task_id='D',
bash_command='echo "run D task"',
dag=dag
)
E = BashOperator(
task_id='E',
bash_command='echo "run E task"',
dag=dag
)
A >>B>>E
C >>D>>E