Apache Airflow (九) :Airflow Operators及案例之BashOperator及调度Shell命令及脚本

news2025/1/8 1:22:10

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. BashOperator 调度Shell命令案例

2. BashOperator 调度Shell脚本案例


Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。关于BaseOperator的参数可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator

BaseOperator中常用参数如下:

task_id(str) : 唯一task_id标记



owner(str):任务的所有者,建议使用linux用户名



email(str or list[str]):出问题时,发送报警Email的地址,可以填写多个,用逗号隔开。



email_on_retry(bool):当任务重试时是否发送电子邮件



email_on_failure(bool):当任务执行失败时是否发送电子邮件



retries(int):在任务失败之前应该重试的次数



retry_delay(datetime.timedelta):重试间隔,必须是timedelta对象



start_date(datetime.datetime):DAG开始执行时间,这个参数必须是datetime对象,不可以使用字符串。



end_date(datetime.datetime):DAG运行结束时间,任务启动后一般都会一直执行下去,一般不设置此参数。



depends_on_past(bool,默认False):是否依赖于过去,如果为True,那么必须之前的DAG调度成功了,现在的DAG调度才能执行。



dag(airflow.models.DAG):指定的dag。



execution_timeout(datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败。



trigger_rule(str):定义依赖的触发规则,包括选项如下:{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。

BashOperator主要执行bash脚本或命令,BashOperator参数如下:

bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)

1. BashOperator 调度Shell命令案例

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'email':'kettle_test1@163.com', #pwd:kettle123456
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_cmd',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

t1=BashOperator(
    task_id='print_date',
    bash_command='date',
    dag = dag
)

t2=BashOperator(
    task_id='print_helloworld',
    bash_command='echo "hello world!"',
    dag=dag
)

t3=BashOperator(
    task_id='tempplated',
    bash_command="""
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ params.name}}"
        echo "{{ params.age}}"
    {% endfor %}
    """,
    params={'name':'wangwu','age':10},
    dag=dag
)

t1 >> t2 >> t3

注意在t3中使用了Jinja模板,“{% %}”内部是for标签,用于循环操作,但是必须以{% endfor %}结束。“{{}}”内部是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。

在default_args中的email是指当DAG执行失败时,发送邮件到指定邮箱,想要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置如下内容:

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user =kettle_test2
# Example: smtp_password = airflow
smtp_password =VIOFSYMFDIKKIUEA
smtp_port = 25
smtp_mail_from =kettle_test2@163.com
smtp_timeout = 30
smtp_retry_limit = 5

此外,配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务,设置如下:

2. BashOperator 调度Shell脚本案例

准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bash

dt=$1

echo "==== execute first shell ===="

echo "---- first : time is ${dt}"

second_shell.sh

#!/bin/bash

dt=$1

echo "==== execute second shell ===="

echo "---- second : time is ${dt}"

编写airflow python 配置:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_sh',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=BashOperator(
    task_id='first',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag = dag
)

second=BashOperator(
    task_id='second',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag=dag
)

first >> second

执行结果:

特别注意:在“bash_command”中写执行脚本时,一定要在脚本后跟上空格,有没有参数都要跟上空格,否则会找不到对应的脚本。如下:


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1222988.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

手机LiDAR-based激光雷达标定板提高无人汽车智能化程度

手机LiDAR-based 3D扫描和建模测试系统是一种利用激光雷达(LiDAR)技术进行三维扫描和模型创建的工具,它可以在手机上运行。这种测试系统可以用于各种应用,如地形测绘、建筑物建模、机器人视觉、无人驾驶汽车导航等。 手机LiDAR-ba…

【Linux网络编程】高级I/O

目录 五种I/O模型 阻塞和非阻塞 非阻塞I/O I/O多路复用之Select、Poll、与Epoll 本文目的是深入浅出理解高级I/O相关的知识,结尾附上代码加深理解相关知识。 五种I/O模型 1.阻塞I/O:在内核将数据准备好之前,系统调用会一直等待。所有的套…

CTFHub | Cookie注入,UA注入,Refer注入,过滤空格(利用hackbar插件)

Cookie注入 Cookie 注入原理 Cookie 注入的原理也和其他注入一样,只不过是将提交的参数以 Cookie 方式提交,而一般的注入是使用 GET 或者 POST 方式提交,GET 方式提交就是直接在网址后面加上需要注入的语句,POST 方式则是通过表单…

【计算机基础】优雅的PPT就应该这样设计

📢:如果你也对机器人、人工智能感兴趣,看来我们志同道合✨ 📢:不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 📢:文章若有幸对你有帮助,可点赞 👍…

【Java SE】继承

学习完了类之后,我们将继续学习一个Java中的重点内容“继承” 继承 1.1 为什么需要继承 举例: 在Cat类中和Dog类中我们发现有很多一样的地方,这样写太浪费空间和内存了 我们可以把它相同的地方都用一个类来表示,并且使用它1.2 继…

PCL 半径滤波剔除噪点(二)

目录 一、算法原理二、注意事项三、代码实现一、算法原理 PCL半径滤波是删除在输入的点云一定范围内没有达到足够多领域的所有数据点。通俗的讲:就是以一个点p给定一个范围r,领域点要求的个数为m,r若在这个点的r范围内部的个数大于m则保留,小于m则删除。因此,使用该算法时…

【MATLAB源码-第82期】基于matlab的OFDM系统载波频移偏差(CFO)估计,对比三种不同的方法。

操作环境: MATLAB 2013b 1、算法描述 正交频分复用(OFDM)系统中的载波频率偏移(CFO)估计是一项关键技术,用于确保数据传输的准确性和效率。CFO通常由于振荡器频率不匹配和多普勒频移引起。不同的CFO估计…

Docker Volume: 实现容器间数据共享与持久化的利器

文章目录 Docker Volume的作用Docker Volume与容器内数据的比较优势劣势 Docker Volume的创建和管理创建Docker Volume管理Docker Volume 演示Docker Volume的挂载Docker Volume的生命周期安全性考虑与Docker Volume应用场景Docker Volume与多容器协作容器迁移与Docker Volume未…

电子学会2023年6月青少年软件编程(图形化)等级考试试卷(二级)真题,含答案解析

青少年软件编程(图形化)等级考试试卷(二级) 一、单选题(共25题,共50分) 1. 运行下列哪段程序,可以让狗狗走到木屋门口?( ) A.

vite vue3安装element-plus

准备 参考 安装 官网 yarn add element-plus完整引入 如果你对打包后的文件大小不是很在乎,那么使用完整导入会更方便。 main.ts // main.ts import { createApp } from vue import ElementPlus from element-plus import element-plus/dist/index.css import…

数据资产到底如何入表?

2024年1月1日起,财政部《企业数据资源相关会计处理暂行规定》正式施行,距离现在只有一个多月的时间。 数据资源入表意味着企业可以将数据资源确认为企业资产负债表中“资产”一项。对于拥有丰富数据资源的企业来说,有望在财务报表中体现其真…

C++软件开发面试场景题

自己在秋招过程中遇到的一些场景题 海量数据N取Top K个元素,复杂度是多少 在处理海量数据中获取前K个元素(Top K)的问题中,通常会使用一些高效的算法来减少时间和空间复杂度。以下是两种常见的解决方案和它们的复杂度&#xff1…

VS中修改解决方案名称和项目名称

如何修改visual studio2019中的项目名 - 知乎 (zhihu.com) 查了很多,还是这个可行。虽然文中说不是最简单的,但在所查找资料中是可行且最简单的。 要点主要是: 1、比如我们复制一个解决方案,最好是带代码哈,也就是添…

Python 爬虫入门

文章目录 Python 爬虫入门requests 库beautifulsoup4库函数findall(),find()函数get() 爬虫实例 1:抓小说爬虫实例 2:抓豆瓣 top 250 的电影信息后记 Python 爬虫入门 Python 的爬虫功能使得程序员可以快速抓取并分析网页中的信息&#xff0…

【Nacos】配置管理、微服务配置拉取、实现配置热更新、多环境配置

🐌个人主页: 🐌 叶落闲庭 💨我的专栏:💨 c语言 数据结构 javaEE 操作系统 Redis 石可破也,而不可夺坚;丹可磨也,而不可夺赤。 Nacos 一、nacos实现配置管理1.1 统一配置管…

C#WPF用户控件及自定义控件实例

本文演示C#WPF自定义控件实例 用户控件(UserControl)和自定义控件(CustomControl)都是对UI控件的一种封装方式,目的都是实现封装后控件的重用。 只不过各自封装的实现方式和使用的场景上存在差异。 1 基于UserControl 创建 创建控件最简单一个方法就是基于UserControl …

基于Vue+SpringBoot的高校学生管理系统 开源项目

项目编号: S 029 ,文末获取源码。 \color{red}{项目编号:S029,文末获取源码。} 项目编号:S029,文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 学生管理模块2.2 学院课程模块2.3 学…

java智慧校园信息管理系统源码带微信小程序

一、智慧校园的定义 智慧校园指的是以云计算和物联网为基础的智慧化的校园工作、学习和生活一体化环境。以各种应用服务系统为载体,将教学、科研、管理和校园生活进行充分融合,让校园实现无处不在的网络学习、融合创新的网络科研、透明高效的校务治理、…

利用X6 制作一个简单的流程图工具

介绍 项目模版使用 我自己基于 arco-design 封装的一个 B 端项目模版 。 地址:https://github.com/duKD/antv-x6-org 运用 antv/X6 : https://x6.antv.antgroup.com/ 来实现 一个简单的流程图工具 项目预览: 功能 支持框选 alt鼠标左键…

linux课程第一课------命令的简单的介绍

作者前言 🎂 ✨✨✨✨✨✨🍧🍧🍧🍧🍧🍧🍧🎂 ​🎂 作者介绍: 🎂🎂 🎂 🎉🎉&#x1f389…