ModuleNotFoundError: No module named ‘dags’
原因:airflow默认是从dags目录下开始搜所有模块,如果你加上dags目录名,就相当于在dags目录下找dags包。
解决方法:导入的时候,去掉dags,详细可以参考下面案例
.
└── airflow_project
├── airflow.cfg
├── dags
│ ├── ceshi
│ │ ├── data_sensor.py
│ │ ├── first.py
│ │ ├── __init__.py
│ │ ├── test.sh
│ │ └── virtual.py
│ ├── __init__.py
│ ├── modules # 自定义模块或者函数
│ │ ├── __init__.py
│ │ └── utils.py
├── logs
│ └── scheduler
│ ├── 2023-10-19
├── plugins
│ ├── date_utils.py
│ ├── event_listener.py
└── webserver_config.py
目录结构如上面所示,我们只需要把自定义模块放dags目录里,然后调用就可以,代码如下:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# 导入自定义函数
from modules.utils import get_next_monday
def get_func():
a = get_next_monday()
print(a)
return a
with DAG('module_test',tags=['test'], schedule='* */1 * * *', start_date=datetime(2023, 10,18), catchup=False) as dag:
python_openrator = PythonOperator(task_id='py_ceshi', python_callable=get_func)
python_openrator