Apache Zeppelin结合Apache Airflow使用1

news2025/1/23 10:28:33

Apache Zeppelin结合Apache Airflow使用1

文章目录

  • Apache Zeppelin结合Apache Airflow使用1
  • 前言
  • 一、安装Airflow
  • 二、使用步骤
    • 1.目标
    • 2.编写DAG
    • 2.加载、执行DAG
  • 总结


前言

之前学了Zeppelin的使用,今天开始结合Airflow串任务。

Apache Airflow和Apache Zeppelin是两个不同的工具,各自用于不同的目的。Airflow用于编排和调度工作流,而Zeppelin是一个交互式数据分析和可视化的笔记本工具。虽然它们有不同的主要用途,但可以结合使用以满足一些复杂的数据处理和分析需求。

下面是一些结合使用Airflow和Zeppelin的方式:

  1. Airflow调度Zeppelin Notebooks:

    • 使用Airflow编写调度任务,以便在特定时间或事件触发时运行Zeppelin笔记本。
    • 在Airflow中使用Zeppelin的REST API或CLI命令来触发Zeppelin笔记本的执行。
  2. 数据流管道:

    • 使用Airflow编排数据处理和转换任务,例如从数据源提取数据、清理和转换数据。
    • 在Zeppelin中创建笔记本,用于进一步的数据分析、可视化和报告生成。
    • Airflow任务完成后,触发Zeppelin笔记本执行以基于最新数据执行分析。
  3. 参数传递:

    • 通过Airflow参数传递,将一些参数值传递给Zeppelin笔记本,以便在不同任务之间共享信息。
    • Zeppelin笔记本可以从Airflow任务中获取参数值,以适应特定的数据分析需求。
  4. 日志和监控:

    • 使用Airflow监控工作流的运行情况,查看任务的日志和执行状态。
    • 在Zeppelin中记录和可视化Airflow工作流的关键指标,以获得更全面的工作流性能洞察。
  5. 整合数据存储:

    • Airflow可以用于从不同数据源中提取数据,然后将数据传递给Zeppelin进行进一步的分析。
    • Zeppelin可以使用Airflow任务生成的数据,进行更深入的数据挖掘和分析。

结合使用Airflow和Zeppelin能够充分发挥它们各自的优势,实现更全面、可控和可视化的数据处理和分析工作流。


一、安装Airflow

安装参考:
https://airflow.apache.org/docs/apache-airflow/stable/start.html

CentOS 7.9安装后启动会报错,还需要配置下sqlite,参考:https://airflow.apache.org/docs/apache-airflow/2.8.0/howto/set-up-database.html#setting-up-a-sqlite-database

[root@slas bin]# airflow standalone
Traceback (most recent call last):
  File "/root/.pyenv/versions/3.9.10/bin/airflow", line 5, in <module>
    from airflow.__main__ import main
  File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/__init__.py", line 52, in <module>
    from airflow import configuration, settings
  File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 2326, in <module>
    conf.validate()
  File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 718, in validate
    self._validate_sqlite3_version()
  File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 824, in _validate_sqlite3_version
    raise AirflowConfigException(
airflow.exceptions.AirflowConfigException: error: SQLite C library too old (< 3.15.0). See https://airflow.apache.org/docs/apache-airflow/2.8.0/howto/set-up-database.html#setting-up-a-sqlite-database

二、使用步骤

1.目标

我想做个简单的demo,包括两个节点,实现如图所示功能,读取csv,去重:
在这里插入图片描述
csv文件输入在airflow上实现,去重在zeppelin上实现。

2.编写DAG

先实现extract_data_script.py,做个简单的读取csv指定列数据写入新的csv文件。

import argparse
import pandas as pd

def extract_and_write_data(date, output_csv, columns_to_extract):
    # 读取指定列的数据
    csv_file_path = f"/home/works/datasets/data_{date}.csv"
    df = pd.read_csv(csv_file_path, usecols=columns_to_extract)

    # 将数据写入新的 CSV 文件
    df.to_csv(output_csv, index=False)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--date", type=str, required=True, help="Date parameter passed by Airflow")
    args = parser.parse_args()

    # 输出 CSV 文件路径(替换为实际的路径)
    output_csv_path = "/home/works/output/extracted_data.csv"

    # 指定要提取的列
    columns_to_extract = ['column1', 'column2', 'column3']

    # 调用函数进行数据提取和写入
    extract_and_write_data(args.date, output_csv_path, columns_to_extract)

然后在 Zeppelin 中创建一个 Python 笔记本(Notebook),其中包含被 Airflow DAG 调用的代码。加载先前从 output/extracted_data.csv 文件中提取的数据:

%python

# 导入必要的库
import pandas as pd

# 加载先前从 CSV 文件中提取的数据
csv_file_path = "/home/works/output/extracted_data.csv"
# 读取 CSV 文件
df = pd.read_csv(csv_file_path)

# 过滤掉 column1 为空的行
df = df[df['column1'].notnull()]

# 去重,以 column2、column3 字段为联合去重依据
deduplicated_df = df.drop_duplicates(subset=["column2", "column3"])

# 保存去重后的结果到新的 CSV 文件
deduplicated_df.to_csv("/home/works/output/dd_data.csv", index=False)

将这个 Zeppelin 笔记本保存,并记住笔记本的paragraph ID, Airflow DAG 需要使用这个 ID 来调用 Zeppelin 笔记本。

接下来,用VSCode编写zeppelin_integration.py代码如下,上传到$AIRFLOW_HOME/dags目录下:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'zeppelin_integration',
    default_args=default_args,
    schedule=timedelta(days=1),
)

extract_data_task = BashOperator(
    task_id='extract_data',
    bash_command='python /home/works/z/extract_data_script.py --date {{ ds }}',
    dag=dag,
)


run_zeppelin_notebook_task = BashOperator(
    task_id='run_zeppelin_notebook',
    bash_command='curl -X POST -HContent-Type:application/json http://IP:PORT/api/notebook/run/2JND7T68E/paragraph_1705372327640_1111015359',
    dag=dag,
)


# Set the task dependencies
extract_data_task >> run_zeppelin_notebook_task

2.加载、执行DAG

如下命令进行测试,先执行下代码看看语法是否都正确,然后list出tasks,并逐一test:

# python zeppelin_integration.py 

# airflow tasks list zeppelin_integration
extract_data
run_zeppelin_notebook

# airflow tasks test zeppelin_integration extract_data 20240122
[2024-01-22T08:57:45.805+0800] {dagbag.py:538} INFO - Filling up the DagBag from /root/airflow/dags
[2024-01-22T08:57:47.853+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: zeppelin_integration.extract_data __airflow_temporary_run_2024-01-22T00:57:47.740537+00:00__ [None]>
[2024-01-22T08:57:47.860+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: zeppelin_integration.extract_data __airflow_temporary_run_2024-01-22T00:57:47.740537+00:00__ [None]>
[2024-01-22T08:57:47.861+0800] {taskinstance.py:2171} INFO - Starting attempt 1 of 2
[2024-01-22T08:57:47.861+0800] {taskinstance.py:2250} WARNING - cannot record queued_duration for task extract_data because previous state change time has not been saved
[2024-01-22T08:57:47.862+0800] {taskinstance.py:2192} INFO - Executing <Task(BashOperator): extract_data> on 2024-01-20T00:00:00+00:00
[2024-01-22T08:57:47.900+0800] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='zeppelin_integration' AIRFLOW_CTX_TASK_ID='extract_data' AIRFLOW_CTX_EXECUTION_DATE='2024-01-20T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2024-01-22T00:57:47.740537+00:00__'
[2024-01-22T08:57:47.904+0800] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-01-22T08:57:47.905+0800] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'python /home/works/z/extract_data_script.py --date 2024-01-20']
[2024-01-22T08:57:47.914+0800] {subprocess.py:86} INFO - Output:
[2024-01-22T08:57:48.553+0800] {subprocess.py:97} INFO - Command exited with return code 0
[2024-01-22T08:57:48.632+0800] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=zeppelin_integration, task_id=extract_data, execution_date=20240120T000000, start_date=, end_date=20240122T005748

# airflow tasks test zeppelin_integration run_zeppelin_notebook 20240122
[2024-01-22T09:01:43.665+0800] {dagbag.py:538} INFO - Filling up the DagBag from /root/airflow/dags
[2024-01-22T09:01:45.835+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: zeppelin_integration.run_zeppelin_notebook __airflow_temporary_run_2024-01-22T01:01:45.733341+00:00__ [None]>
[2024-01-22T09:01:45.843+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: zeppelin_integration.run_zeppelin_notebook __airflow_temporary_run_2024-01-22T01:01:45.733341+00:00__ [None]>
[2024-01-22T09:01:45.844+0800] {taskinstance.py:2171} INFO - Starting attempt 1 of 2
[2024-01-22T09:01:45.844+0800] {taskinstance.py:2250} WARNING - cannot record queued_duration for task run_zeppelin_notebook because previous state change time has not been saved
[2024-01-22T09:01:45.845+0800] {taskinstance.py:2192} INFO - Executing <Task(BashOperator): run_zeppelin_notebook> on 2024-01-22T00:00:00+00:00
[2024-01-22T09:01:45.904+0800] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='zeppelin_integration' AIRFLOW_CTX_TASK_ID='run_zeppelin_notebook' AIRFLOW_CTX_EXECUTION_DATE='2024-01-22T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2024-01-22T01:01:45.733341+00:00__'
[2024-01-22T09:01:45.909+0800] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-01-22T09:01:45.910+0800] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'curl -X POST -HContent-Type:application/json http://100.100.30.220:8181/api/notebook/run/2JND7T68E/paragraph_1705372327640_1111015359']
[2024-01-22T09:01:45.921+0800] {subprocess.py:86} INFO - Output:
[2024-01-22T09:01:45.931+0800] {subprocess.py:93} INFO -   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
[2024-01-22T09:01:45.931+0800] {subprocess.py:93} INFO -                                  Dload  Upload   Total   Spent    Left  Speed
100    50  100    50    0     0      8      0  0:00:06  0:00:06 --:--:--    12
[2024-01-22T09:01:52.003+0800] {subprocess.py:93} INFO - {"status":"OK","body":{"code":"SUCCESS","msg":[]}}
[2024-01-22T09:01:52.003+0800] {subprocess.py:97} INFO - Command exited with return code 0
[2024-01-22T09:01:52.098+0800] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=zeppelin_integration, task_id=run_zeppelin_notebook, execution_date=20240122T000000, start_date=, end_date=20240122T010152

最后用命令airflow scheduler将它添加到airflow里。

# airflow scheduler
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2024-01-22T09:28:21.829+0800] {task_context_logger.py:63} INFO - Task context logging is enabled
[2024-01-22T09:28:21.831+0800] {executor_loader.py:115} INFO - Loaded executor: SequentialExecutor
[2024-01-22T09:28:21.868+0800] {scheduler_job_runner.py:808} INFO - Starting the scheduler
[2024-01-22T09:28:21.869+0800] {scheduler_job_runner.py:815} INFO - Processing each file at most -1 times
。。。

页面上会增加一个DAG,如图:
在这里插入图片描述
在Actions里可以点击执行。


总结

以上就是今天要讲的内容,总体来说集成两个工具还是很方便的,期待后面更多的应用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1404944.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DAY07_SpringBoot—用法整合MyBatis

目录 1 SpringBoot 用法1.1 环境切换1.1.1 业务需求1.1.2 多环境编辑 1.2 热部署1.2.1 需求说明1.2.2 引入jar包1.2.3 配置IDEA环境 2 SpringBoot整合Mybatis2.1 导入数据库2.2 创建SpringBoot项目2.2.1 创建项目2.2.2 生成POM.xml文件如下2.2.3 Mavenjar包作用范围2.2.4 数据源…

面试经验分享 | 工控安全研究员

本文由掌控安全学院 - 徐浩洋 投稿 笔试 进程注入有几种简单描述一下? Windows异常分发流程。 Windows下反调试和反反调试的手段。 使用分页机制下虚拟地址怎么翻译为物理地址。 符号延时加载(DeferredSymbolLoading)的原理? 漏洞触发流程是怎么样的,怎么定位触发点如何进行…

服务器数据恢复—服务器进水导致阵列中磁盘同时掉线的数据恢复案例

服务器数据恢复环境&#xff1a; 数台服务器数台存储阵列柜&#xff0c;共上百块硬盘&#xff0c;划分了数十组lun。 服务器故障&检测&#xff1a; 外部因素导致服务器进水&#xff0c;进水服务器中一组阵列内的所有硬盘同时掉线。 北亚数据恢复工程师到达现场后发现机房内…

OpenMP和MPI环境配置

目录 OpenMP和MPI环境配置OpenMP环境配置MPI环境配置&#xff08;Windows&#xff09;MPI环境配置&#xff08;Ubuntu&#xff09; 参考资料 OpenMP和MPI环境配置 OpenMP环境配置 首先创建项目&#xff0c;选择C 控制台应用。 选择项目&#xff0c;属性。 在C/C —— 语言中&…

老师家访的目的是什么

家访&#xff0c;对于老师来说&#xff0c;是工作中必不可少的一部分。有人会问&#xff0c;老师家访的目的是什么呢&#xff1f;是为了了解学生的家庭情况&#xff0c;还是为了和家长建立良好的沟通关系&#xff1f;其实&#xff0c;老师家访的目的远不止于此。 老师家访是为…

【MySQL】计算日期是当前月份的第几周

力扣题 1、题目地址 2993. 发生在周五的交易 I 2、模拟表 表&#xff1a;Purchases Column NameTypeuser_idintpurchase_datedateamount_spendint (user_id, purchase_date, amount_spend) 是该表的主键(具有唯一值的列)。purchase_date 的范围从 2023 年 11 月 1 日到 2…

对称图做法,全程动图演示

最终效果&#xff1a; 实现步骤&#xff1a; 其他图形画法&#xff1a; 点线对比图做法&#xff0c;全程动图演示 气泡图做法&#xff0c;全程动图演示 重叠柱状图做法&#xff0c;全程动图演示 瀑布图做法&#xff0c;全程动图演示 对称图做法&#xff0c;全程动图演示

轻松上手:通过阿里云PAI QuickStart微调部署Qwen-72B-Chat模型

作者&#xff1a;熊兮、求伯、一耘 引言 通义千问-72B&#xff08;Qwen-72B&#xff09;是阿里云研发的通义千问大模型系列的720亿参数规模模型。Qwen-72B的预训练数据类型多样、覆盖广泛&#xff0c;包括大量网络文本、专业书籍、代码等。Qwen-72B-Chat是在Qwen-72B的基础上…

第一篇【传奇开心果系列】beeware的toga开发移动应用:轮盘抽奖移动应用

系列博文目录 beeware的toga开发移动应用示例系列博文目录一、项目目标二、开发传奇开心果轮盘抽奖安卓应用编程思路三、传奇开心果轮盘抽奖安卓应用示例代码四、补充抽奖逻辑实现五、开发传奇开心果轮盘抽奖苹果手机应用编程思路六、开发传奇开心果轮盘抽奖苹果手机应用示例代…

kafka集群和Filebeat+Kafka+ELK

一、Kafka 概述 1.1 为什么需要消息队列&#xff08;MQ&#xff09; 主要原因是由于在高并发环境下&#xff0c;同步请求来不及处理&#xff0c;请求往往会发生阻塞。比如大量的请求并发访问数据库&#xff0c;导致行锁表锁&#xff0c;最后请求线程会堆积过多&#xff0c;从…

专业ScrumMaster(高级)- PSM II 认证班,Scrum.org认证PSM II官方认证班

课程简介 Scrum是目前运用最为广泛的敏捷开发方法&#xff0c;是一个轻量级的项目管理和产品研发管理框架&#xff0c;旨在最短时间内交付最大价值。根据2022年全球敏捷状态报告&#xff0c;Scrum的应用占比已经达到87%。 Scrum.org 由 Scrum 的联合创始人 Ken Schwaber 创立…

[亲测有效]CentOS7下安装mysql5.7

前言 近期项目需要搭配mysql一起存储相关数据&#xff0c;但对mysql的版本有要求&#xff0c;于是在服务器搭建了mysql5.7&#xff0c;顺便记录一下搭建步骤和踩坑解决步骤。 目录 前言 一、清除旧安装包 二、安装YUM 三、使用yum命令即可完成安装 四、重新设置密码 五、…

C#winform上位机开发学习笔记5-串口助手的定时发送功能添加

1.功能描述 选择自动发送功能后&#xff0c;按照设定的发送时间发送发送框中的信息数据&#xff0c;设定时间可以手动输入&#xff0c;当手动输入信息无效&#xff08;非数字&#xff09;时&#xff0c;系统弹出错误提示&#xff0c;并将其设置为默认定时时间。 2.代码部分 步…

【Gene Expression Prediction】Part3 Deep Learning in Gene Expression Analysis

文章目录 6 第二个讲座&#xff1a;Deep Learning in Gene Expression Analysis6.1 Introduction6.2 D-GEX6.2.1 Connectivity map project6.2.2 Predicting gene expression from landmark genes 6.3 Deep generative models for genomics6.3.1 Manifold hypothesis6.3.2 Auto…

【C++】Qt:QCustomPlot图表绘制库配置与示例

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍QCustomPlot图表绘制库配置与示例。 学其所用&#xff0c;用其所学。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜欢的朋友可以关注一下&#xff0c;下次…

2024/1/17 DFS BFS + Div 3 a,b

目录 Lake Counting S 求细胞数量 海战 组合的输出 div3 A. Square div3 B. Arranging Cats Lake Counting S P1596 [USACO10OCT] Lake Counting S - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 感谢大佬的指点&#xff01;&#xff01;&#xff01;&#xff01; 思…

k8s使用ingress实现应用的灰度发布升级

v1是1.14.0版本nginx ,实操时候升级到v2是1.20.0版本nginx&#xff0c;来测试灰度发布实现过程 一、方案&#xff1a;使用ingress实现应用的灰度发布 1、服务端&#xff1a;正常版本v1&#xff0c;灰度升级版本v2 2、客户端&#xff1a;带有请求头versionv2标识的请求访问版…

老套的购物车效果

一、项目文件结构 二、各个文件内容 index.html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta http-equiv"X-UA-Compatible" content"IEedge" /><meta name"viewport…

【C语言进阶】预处理详解

引言 对预处理的相关知识进行详细的介绍 ✨ 猪巴戒&#xff1a;个人主页✨ 所属专栏&#xff1a;《C语言进阶》 &#x1f388;跟着猪巴戒&#xff0c;一起学习C语言&#x1f388; 目录 引言 预定义符号 #define定义常量 #define定义宏 带有副作用的宏参数 宏替换的规则 …

算法基础学习|双指针算法

双指针算法 代码模板 for (int i 0, j 0; i < n; i ){while (j < i && check(i, j)) j ;// 具体问题的逻辑 } 常见问题分类&#xff1a;(1) 对于一个序列&#xff0c;用两个指针维护一段区间(2) 对于两个序列&#xff0c;维护某种次序&#xff0c;比如归并…