Airflow用于ETL的四种基本运行模式, 2022-11-20

news2025/1/20 1:02:05

(2022.11.20 Sun)
基本运行模式(pattern)是data pipeline使用Airflow的DAG的不同结构,基本模式有如下四种 :

  • 序列Sequence
  • 平行拆分Parallel split
  • 同步Synchronisation
  • 单选Exclusive choice

序列模式

序列模式即若干task按先后顺序依次执行,在运行代码上 表示为task_1 >> task_2 >> ...

dag = DAG(
    dag_id='sequential_pattern',
    default_args={ 
        'start_date': utils.dates.days_ago(1),
    },
    schedule_interval=None,
)

with dag:
    read_input = DummyOperator(task_id='read_input')

    aggregate_data = DummyOperator(task_id='generate_data')

    write_to_redshift = DummyOperator(task_id='write_to_redshift')

    read_input >> aggregate_data >> write_to_redshift

Parallel split

parallel split

parallel split模式用于在分支的情况。比如当数据集备好之后,需要被加载进入多个不同的tasks,且都是同一个pipeline中,如同数据进入不同的分支。

分支在DAG中的表示为task_1 >> [task_2, task_3]
案例如

dag = DAG(
    dag_id='pattern_parallel_split',
    default_args={
        'start_date': utils.dates.days_ago(1),
    },
    schedule_interval=None,
)

with dag:
    read_input = DummyOperator(task_id='read_input')

    aggregate_data = DummyOperator(task_id='generate_data')

    convert_to_parquet = DummyOperator(task_id='convert_to_parquet')

    convert_to_avro = DummyOperator(task_id='convert_to_avro')

    read_input >> aggregate_data >> [convert_to_parquet, convert_to_avro]

Sychronisation

与parallel split相似,在同步模式中,不同branch的结果汇聚(reconciliation)在一个task中,不同的branch执行并行计算,并将结果整合。

synchronization

DAG的代码表达中,同步模式可拆解为在每个for loop中执行顺序模式,即

for xx in xxx:
    task_0 >> task_i >> task_2

代码实例

dag = DAG(
    dag_id='pattern_synchronization',
    default_args={
        'start_date': utils.dates.days_ago(1),
    },
    schedule_interval=None,
)

with dag:
    convert_to_parquet = DummyOperator(task_id='convert_to_parquet')
    for hour in range(0, 24):
        read_input = DummyOperator(task_id='read_input_hour_{}'.format(hour))

        aggregate_data = DummyOperator(task_id='generate_data_hour_{}'.format(hour))

        read_input >> aggregate_data >> convert_to_parquet

单选

根据预先设定的条件,在分支部分选择不同的task执行。

exclusive choice

在Apache Airflow中,可通过BranchOpertor对象执行分支单选命令。BranchOperator对象指定的方法,其返回值可用于指定对分支的选择,而task_id用于标识分支的名字。参考如下案例。

dag = DAG(
    dag_id='pattern_exclusive_choice',
    default_args={
        'start_date': utils.dates.days_ago(1),
    },
    schedule_interval=None,
)

with dag:
    def route_task():
        execution_date = context['execution_date']
        return 'convert_to_parquet'if execution_date.minute % 2 == 0 else 'convert_to_avro'
 

    read_input = DummyOperator(task_id='read_input')

    aggregate_data = DummyOperator(task_id='generate_data')

    route_to_format = BranchPythonOperator(task_id='route_to_format', python_callable=route_task)

    convert_to_parquet = DummyOperator(task_id='convert_to_parquet')

    convert_to_avro = DummyOperator(task_id='convert_to_avro')

    read_input >> aggregate_data >> route_to_format >> [convert_to_parquet, convert_to_avro]

Reference

1 ETL data patterns with Apache Airflow, waitingforcode, by BARTOSZ KONIECZNY

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/41080.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中远通在创业板IPO过会:前三季度收入11亿元,罗厚斌为董事长

近日,深圳证券交易所创业板披露的新显示,深圳市核达中远通电源技术股份有限公司(下称“中远通”)获得上市委会议通过。据贝多财经了解,中远通于2021年6月30日在创业板递交申请。 本次冲刺创业板上市,中远通…

以go rabbitmq为例子--用最少的时间最好的掌握消息队列

为什么要使用消息队列? 流量削峰 举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的…

向QTableView单元格插入窗体小部件的功能实现

1.前言 我们知道:QTableWidget类有如下函数: void QTableWidget::setCellWidget(int row, int column, QWidget *widget) 可以实现在指定的单元格插入窗体部件QWidget对象,如下代码: setCellWidget(row, column, new QLineEdi…

2023年天津财经大学珠江学院专升本管理学原理专业考试大纲

天津财经大学珠江学院2023年高职升本科专业课考试《管理学原理》考试大纲一、本大纲系天津财经大学珠江学院2023年高职升本科《管理学原理》课程考试大纲。所列考试范围出自徐碧琳主编的教材《管理学原理(第二版)》,机械工业出版社&#xff0…

刨根问底 Kafka,面试过程真好使

大家好,这里是 菜农曰,欢迎来到我的频道。 充满寒气的互联网如何在面试中脱颖而出,平时积累很重要,八股文更不能少!下面带来的这篇 Kafka 问答希望能够在你的 offer 上增添一把🔥。 Kafka最初是由Linkedin公…

【掌握K8S集群部署】手把手真正实现Kubernetes集群的配置与部署(附问题解决方法)

1、环境准备 IPHOSTNAME10.10.20.15k8s110.10.20.16k8s210.10.20.17k8s3 注意hostname不要用下划线、小数点与字母。 2、环境配置(所有节点) # stop firewalld systemctl stop firewalld systemctl disable firewalld# disable selinux sed -i s/enfo…

读 RocketMQ 源码,学习并发编程三大神器

笔者是 RocketMQ 的忠实粉丝,在阅读源码的过程中,学习到了很多编程技巧。 这篇文章,笔者结合 RocketMQ 源码,分享并发编程三大神器的相关知识点。 1 CountDownLatch 实现网络同步请求 CountDownLatch 是一个同步工具类&#xff…

高性能MySQL-创建高性能索引

什么是索引 MySQL并没有统一的索引标准,不同存储引擎的索引的工作方式并不一样,也不是所有的存储引擎都支持所有类型的索引。即使多个存储引擎支持同一种类型的索引,其底层的实现也可能不同。 索引是存储引擎用于快速找到记录的一种数据结构&…

LeetCode 图解 | 206.反转链表(附有知识点回顾)

206.反转链表题目描述思路分析递归方式代码实现迭代方式(非递归)代码实现知识点回顾题目描述 给你单链表的头节点 head ,请你反转链表,并返回反转后的链表。 /*** Definition for singly-linked list.* public class ListNode {*…

AtCoder Beginner Contest 279 G. At Most 2 Colors(计数/组合数学/dp递推)

题目 n(2<n<1e6)个格子&#xff0c;从左到右一字排开&#xff0c; 现在需要给格子涂色&#xff0c;有c(1<c<1e9)种颜色&#xff0c; 要求连续的k(2<k<n)个格子的颜色数最多只有2种&#xff0c; 求方案数&#xff0c;答案对998244353取模 思路来源 TOYO…

3. HTML的语法规范

3. HTML的语法规范 3.1.2 注释的作用和写法 ➢ 注释的作用&#xff1a; ​ •为代码添加的具有解释性、描述性的信息&#xff0c;主要用来帮助开发人员理解代码 ​ •浏览器执行代码时会忽略所有的注释 ➢ 注释的快捷键&#xff1a; ​ • 在VS Code中&#xff1a;ctrl / 3.1.…

中国的LPR改革及其意义

中国的LPR改革及其意义 – 潘登同学的宏观经济学笔记 文章目录中国的LPR改革及其意义 -- 潘登同学的宏观经济学笔记LPR的两次改革为什么需要LPR改革LPR改革的意义LPR的两次改革 LPR&#xff1a;商业银行对其最优质的客户执行的贷款利率 LPR在我国经历了两次改革&#xff0c;一…

PyTorch中torch.gather()函数

一. torch.gather()函数 官方文档&#xff1a;torch.gather函数&#xff0c;定义&#xff1a;从原tensor中获取指定dim和指定index的数据。 看到这个核心定义&#xff0c;我们很容易想到gather()的基本想法其实就类似从完整数据中按索引取值般简单&#xff0c;比如下面从列表中…

Kafka - 07 Zookeeper中存储的 Kafka 信息

我们在前面的文章中搭建了 Kafka 集群&#xff0c;分别是伪集群和真实的集群&#xff1a; Kafka - 03 Kafka安装 | 单机环境搭建 | 伪集群环境搭建 (一台虚拟机) Kafka - 06 Kafka 集群环境搭建&#xff08;三台虚拟机&#xff09; 两种方式中&#xff0c;Zookeeper 存储的 …

微机-------8086/8088寻址方式

目录 8086/8088寻址方式8086/8088寻址方式 1、立即寻址 立即数只能作为源操作数 MOV AL,80H MOV AX,306AH2、寄存器寻址 8位操作数的寄存器可以是:AH、AL、BH、BL、CH、CL、DH、DL 16位操作数的寄存器可以是:

如何录制视频?有了这款视频录制软件,粉丝多了,转发量也起来了

一款好用的视频录制软件&#xff0c;可以为视频录制工作带来便捷&#xff0c;高效的解决如何录制视频的难题。视频经济时代&#xff0c;很多人都用视频录制软件来进行视频录制&#xff0c;并分享到社交平台。但是很多时候&#xff0c;往往是两极分化&#xff0c;有的人粉丝几百…

[操作系统笔记]内存管理1

内容系听课复习所做笔记&#xff0c;图例多来自课程截图 覆盖与交换 下图为覆盖技术图示&#xff1a; 交换技术和中级调度密切相关&#xff1a; 中级调度&#xff08;内存调度&#xff09;&#xff1a;就是要决定将哪个处于挂起状态的进程重新调入内存 在哪保存被换出的进程 具…

m基于遗传优化算法的公式参数拟合matlab仿真

目录 1.算法描述 2.仿真效果预览 3.MATLAB核心程序 4.完整MATLAB 1.算法描述 遗传算法的原理 遗传算法GA把问题的解表示成“染色体”&#xff0c;在算法中也即是以二进制编码的串。并且&#xff0c;在执行遗传算法之前&#xff0c;给出一群“染色体”&#xff0c;也即是假…

MySQL窗口函数

窗口函数在统计类的需求中很常见&#xff0c;稍微复杂一点的查询需求就有可能用到它&#xff0c;使用窗口函数可以极大的简化我们的 SQL 语句。像 Oracle、SQL Server 这些数据库在较早的版本就支持窗口函数了&#xff0c;MySQL 直到 8.0 版本后才支持它。 一般来说涉及复杂的分…

pytorch深度学习实战lesson29

第二十九课 深度学习硬件 这节课讲一下深度学习的硬件。具体来讲一下所谓的 CPU 和 GPU有什么区别&#xff0c;为什么 GPU 会快&#xff1f; 目录 CPU GPU 首先大家如果学习深度学习的话基本上都有一个GPU的电脑&#xff0c;如果你自己装一台机器的话&#xff0c;很有可能是…