如何通过 Apache Airflow 将数据导入 Elasticsearch

news2025/4/14 2:19:29

作者:来自 Elastic Andre Luiz

了解如何通过 Apache Airflow 将数据导入 Elasticsearch。

Apache Airflow

Apache Airflow 是一个旨在创建、安排(schedule)和监控工作流的平台。它用于编排 ETL(Extract-Transform-Load) 流程、数据管道和其他复杂工作流,提供灵活性和可扩展性。它的可视化界面和实时监控功能使管道管理更易于访问和高效,让你可以跟踪执行的进度和结果。以下是它的四个主要支柱:

  • 动态:管道以 Python 定义,允许动态灵活地生成工作流。
  • 可扩展:Airflow 可以与各种环境集成,可以创建自定义运算符,并可以根据需要执行特定代码。
  • 优雅:管道以干净明确的方式编写。
  • 可扩展:其模块化架构使用消息队列来编排任意数量的工作器。

在实践中,Airflow 可用于以下场景:

  • 数据导入:编排将数据每日提取到 Elasticsearch 等数据库中。
  • 日志监控:管理日志文件的收集和处理,然后在 Elasticsearch 中进行分析以识别错误或异常。
  • 多种数据源集成:将来自不同系统(API、数据库、文件)的信息合并到 Elasticsearch 中的单个层中,简化搜索和报告。

DAG:Directed Acyclic Graphs - 有向无环图

在 Airflow 中,工作流由 DAG(有向无环图)表示。DAG 是一种定义任务执行顺序的结构。DAG 的主要特征是:

  • 由独立任务组成:每个任务代表一个工作单元,旨在独立执行。
  • 排序:任务的执行顺序在 DAG 中明确定义。
  • 可重用性:DAG 旨在重复执行,促进流程自动化。

Airflow 的主要组件

Airflow 生态系统由多个组件组成,它们共同协作以协调任务:

  • 调度程序 - scheduler:负责调度 DAG 并发送任务以供工作人员执行。
  • 执行器 - Exectutor:管理任务的执行,将其委托给工作人员。
  • Web 服务器 - Webserver:提供与 DAG 和任务交互的图形界面。
  • Dags 文件夹 - Dags folder:我们存储用 Python 编写的 DAG 的文件夹。
  • 元数据 - Metadata:作为工具存储库的数据库,由调度程序和执行器用于存储执行状态。

Apache Airflow 和 Elasticsearch

我们将演示如何使用 Apache Airflow 和 Elasticsearch 来协调任务并在 Elasticsearch 中索引结果。此演示的目标是创建一个任务管道来更新 Elasticsearch 索引中的记录。此索引包含电影数据库,用户可以在其中进行评分和分配评级。想象一个每天有数百个评级的场景,有必要保持评级记录更新。为此,将开发一个 DAG,它将每天执行,负责检索新的合并评级并更新索引中的记录。

在 DAG 流程中,我们将有一个获取评级的任务,然后是一个验证结果的任务。如果数据不存在,DAG 将被定向到失败任务。否则,数据将在 Elasticsearch 中编入索引。目标是通过一种带有负责计算分数的机制的方法检索评级,以更新索引中电影的评级字段。

使用 Apache Airflow 和 Elasticsearch 以及 Docker

要创建容器化环境,我们将使用 Apache Airflow 和 Docker。按照 “在 Docker 中运行 Airflow” 指南中的说明实际设置 Airflow。

至于 Elasticsearch,我将使用 Elastic Cloud 上的集群,但如果你愿意,也可以使用 Docker 配置 Elasticsearch。已经创建了一个包含电影目录的索引,其中电影数据已编入索引。这些电影的 “rating” 字段将被更新。

创建 DAG

通过 Docker 安装后,将创建一个文件夹结构,其中包括 dags 文件夹,我们必须将 DAG 文件放在该文件夹中,以便 Airflow 识别它们。

在此之前,我们需要确保安装了必要的依赖项。以下是此项目的依赖项:

pip install apache-airflow apache-airflow-providers-elasticsearch

我们将创建文件 update_ratings_movies.py 并开始编写任务代码。

现在,让我们导入必要的库:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

我们将使用 ElasticsearchPythonHook,这是一个通过抽象连接和使用外部 API 来简化 Airflow 和 Elasticsearch 集群之间集成的组件。

接下来,我们定义 DAG,并指定其主要参数:

  • dag_id:DAG 的名称。
  • start_date:DAG 的启动时间。
  • schedule:定义周期(在我们的例子中是每日)。
  • doc_md:将导入并显示在 Airflow 界面中的文档。

定义任务

现在,让我们定义 DAG 的任务。第一个任务将负责检索电影评级数据。我们将使用 PythonOperator,并将 task_id 设置为“get_movie_ratings”。python_callable 参数将调用负责获取 ratings 的函数。

get_ratings_operator = PythonOperator(
   task_id='get_movie_ratings',
   python_callable=get_movie_ratings_task
)

接下来,我们需要验证结果是否有效。为此,我们将使用带有 BranchPythonOperator 的条件。task_id 将为 “validate_result”,python_callable 将调用验证函数。op_args 参数将用于将上一个任务 “get_movie_ratings” 的结果传递给验证函数。

validate_result = BranchPythonOperator(
   task_id='validate_result',
   python_callable=validate_result,
   op_args=["{
  
  { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

如果验证成功,我们将从 “get_movie_ratings” 任务中获取数据并将其索引到 Elasticsearch 中。为此,我们将创建一个新任务 “index_movie_ratings”,它将使用 PythonOperator。op_args 参数将 “get_movie_ratings” 任务的结果传递给索引函数。

index_ratings_operator = PythonOperator(
   task_id='index_movie_ratings',
   python_callable=index_movie_ratings_task,
   op_args=["{
  
  { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

如果验证表明失败,DAG 将继续执行失败通知任务。在此示例中,我们只是打印一条消息,但在实际场景中,我们可以配置警报来通知失败。

failed_get_rating_operator = PythonOperator(
   task_id='failed_get_rating_operator',
   python_callable=lambda: print('Ratings were False, skipping indexing.')
)

最后,我们定义任务依赖关系,确保它们以正确的顺序执行:

get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

以下是我们 DAG 的完整代码:

"""
DAG update Rating Movies
"""
import ast
import random

from airflow import DAG
from datetime import datetime

from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook


def index_movie_ratings_task(movies):
   es_hook = ElasticsearchPythonHook(hosts=None,
                                     es_conn_args={
                                         "cloud_id": "cloud_id"
                                         "api_key": "api-key"
                                     })
   es_client = es_hook.get_conn
   actions = []
   for movie in ast.literal_eval(movies):
       actions.append(
           {
               "update": {
                   "_id": movie["id"],
                   "_index": "movies"
               }
           }
       )
       actions.append(
           {
               "doc": {
                   "rating": movie["rating"]
               },
               "doc_as_upsert": True
           }
       )
   result = es_client.bulk(operations=actions)
   print(f"Ingestion completed.")
   print(result)
   return True


def get_movie_ratings_task():
   movies = [
       {"id": i, "rating": round(random.uniform(1, 10), 1)}
       for i in range(1, 100)
   ]
   return movies

def validate_result(result):
   if not result:
       return 'failed_get_rating_operator'
   else:
       return 'index_movie_ratings'


with DAG(
       dag_id="update_ratings_movies_2024",
       start_date=datetime(2024, 12, 29),
       schedule="@daily",
       doc_md=__doc__,
):
   get_ratings_operator = PythonOperator(
       task_id='get_movie_ratings',
       python_callable=get_movie_ratings_task
   )

   validate_result = BranchPythonOperator(
       task_id='validate_result',
       python_callable=validate_result,
       op_args=["{
  
  { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"],
       provide_context=True
   )

   index_ratings_operator = PythonOperator(
       task_id='index_movie_ratings',
       python_callable=index_movie_ratings_task,
       op_args=["{
  
  { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
   )

   failed_get_rating_operator = PythonOperator(
       task_id='failed_get_rating_operator',
       python_callable=lambda: print('Ratings were False, skipping indexing.')
   )

get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

可视化 DAG 执行

在 Apache Airflow 界面中,我们可以可视化 DAG 的执行。只需转到 “DAG” 选项卡并找到你创建的 DAG 即可。

下面,我们可以直观地看到任务的执行情况及其各自的状态。通过选择特定日期的执行,我们可以访问每个任务的日志。请注意,在 index_movie_ratings 任务中,我们可以在索引中看到索引结果,并且它已成功完成。

在其他选项卡中,可以访问有关任务和 DAG 的其他信息,以协助分析和解决潜在问题。

结论

在本文中,我们演示了如何将 Apache Airflow 与 Elasticsearch 集成以创建数据提取解决方案。我们展示了如何配置 DAG、定义负责检索、验证和索引电影数据的任务,以及如何在 Airflow 界面中监控和可视化这些任务的执行。

这种方法可以轻松适应不同类型的数据和工作流,使 Airflow 成为在各种场景中编排数据管道的有用工具。

参考资料:

Apache AirFlow

  • https://airflow.apache.org/

使用 Docker 安装 Apache Airflow

  • https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

Elasticsearch Python Hook

  • https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/hooks/elasticsearch_python_hook.html

Python 运算符

  • https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

想要获得 Elastic 认证?了解下一期 Elasticsearch 工程师培训何时开始!

Elasticsearch 包含许多新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在吗的本地机器上试用 Elastic。

原文:How to ingest data to Elasticsearch through Apache Airflow - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2278922.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STM32 学习笔记【补充】(十)硬件I2C读写MPU6050

该系列为笔者在学习STM32过程(主线是江科大的视频)中的记录与发散思考。 初学难免有所纰漏、错误,还望大家不吝指正,感谢~ 一、I2C 外设简介 I2C(Inter-Integrated Circuit)是一种多主多从的串行通信协议…

QT信号槽 笔记

信号与槽就是QT中处理计算机外设响应的一种机制 比如敲击键盘、点击鼠标 // 举例: 代码: connect(ls,SIGNAL(sig_chifanla()),ww,SLOT(slot_quchifan())); connect(ls,SIGNAL(sig_chifanla()),zl,SLOT(slot_quchifan()));connect函数:这是…

【React】插槽渲染机制

目录 通过 children 属性结合条件渲染通过 children 和 slot 属性实现具名插槽通过 props 实现具名插槽 在 React 中,并没有直接类似于 Vue 中的“插槽”机制(slot)。但是,React 可以通过 props和 children 来实现类似插槽的功能…

openharmony电源管理子系统

电源管理子系统 简介目录使用说明相关仓 简介 电源管理子系统提供如下功能: 重启服务:系统重启和下电。系统电源管理服务:系统电源状态管理和休眠运行锁管理。显示相关的能耗调节:包括根据环境光调节背光亮度,和根…

数据库(中)11讲

用颜色、有否下划线对应! E-R图

图像去雾数据集的下载和预处理操作

前言 目前,因为要做对比实验,收集了一下去雾数据集,并且建立了一个数据集的预处理工程。 这是以前我写的一个小仓库,我决定还是把它用起来,下面将展示下载的路径和数据处理的方法。 下面的代码均可以在此找到。Auo…

STM32入门教程-示例程序(按键控制LED光敏传感器控制蜂鸣器)

1. LED Blink(闪烁) 代码主体包含:LED.c key.c main.c delay.c(延时防按键抖动) 程序代码如下(涉及RCC与GPIO两个外设): 1.使用RCC使能GPIO时钟 RCC_APB2PeriphClockC…

一本书揭秘程序员如何培养架构思维!

在程序员的职业规划中,成为软件架构师是一个非常有吸引力的选择。但是对于如何才能成为一名架构师,不少同学认为只要代码写得好,就能得到公司提拔,晋升为架构师。 还真不是这样的,如果不具备架构思维,即使…

Flink(十):DataStream API (七) 状态

1. 状态的定义 在 Apache Flink 中,状态(State) 是指在数据流处理过程中需要持久化和追踪的中间数据,它允许 Flink 在处理事件时保持上下文信息,从而支持复杂的流式计算任务,如聚合、窗口计算、联接等。状…

Vue2+OpenLayers实现点位拖拽功能(提供Gitee源码)

目录 一、案例截图 二、安装OpenLayers库 三、代码实现 3.1、初始化变量 3.2、创建一个点 3.3、将点添加到地图上 3.4、实现点位拖拽 3.5、完整代码 四、Gitee源码 一、案例截图 可以随意拖拽点位到你想要的位置 二、安装OpenLayers库 npm install ol 三、代码实现…

2024年博客之星年度评选—创作影响力评审入围名单公布

2024年博客之星活动地址https://www.csdn.net/blogstar2024 TOP 300 榜单排名 用户昵称博客主页 身份 认证 评分 原创 博文 评分 平均 质量分评分 互动数据评分 总分排名三掌柜666三掌柜666-CSDN博客1001002001005001wkd_007wkd_007-CSDN博客1001002001005002栗筝ihttps:/…

NVIDIA发布个人超算利器project digital,标志着ai元年的开启

上图NVIDIA公司创始人兼首席执行官 黄仁勋(Jensen Huang) 这些年被大家熟知的赛博朋克风格一直都是未来的代言词,可以承载人类记忆的芯片,甚至能独立思考的仿生人,现在,随着NVIDIA的project digital发布之后…

(一)afsim第三方库编译

注意:防止奇怪的问题,源码编译的路径最好不要有中文,请先检查各文件夹名 AFSIM版本 Version: 2.9 Plugin API Version: 11 软件环境 操作系统: Kylin V10 SP1 项目构建工具: cmake-3.26.0-linux-aarch6…

2025.1.17——三、SQLi regexp正则表达式|

题目来源:buuctf [NCTF2019]SQLi1 目录 一、打开靶机,整理信息 二、解题思路 step 1:正常注入 step 2:弄清关键字黑名单 1.目录扫描 2.bp爆破 step 3:根据过滤名单构造payload step 4:regexp正则注…

docker的数据卷与dockerfile自定义镜像

docker的数据卷与dockerfile自定义镜像 一. docker的数据卷数据卷容器 二. dockerfile自定义镜像2.1 dockerfile的命令格式镜像的操作命令add和copy的区别 容器启动的命令 2.2 run命令2.3 其它端口映射 三. 练习 一. docker的数据卷 容器于宿主机之间,或者容器和容…

【python_钉钉群发图片】

需求: **在钉钉群发图片,需要以图片的形式展示,如图所示:**但是目前影刀里面没有符合条件的指令 解决方法: 1、在钉钉开发者后台新建一个自建应用,发版,然后获取里面的appkey和appsecret&am…

新星杯-ESP32智能硬件开发--ESP32的I/O组成-系统中断矩阵

本博文内容导读📕🎉🔥 ESP32开发板的中断矩阵、功能描述与实现、相关API和示例程序进行介绍 ESP32中断矩阵将任一外部中断源单独分配到每个CPU的任一外部中断上,提供了强大的灵活性,能适应不同的应用需求。 ESP32中断主…

软路由系统iStoreOS 一键安装 docker compose

一键安装命令 大家好!今天我来分享一个快速安装 docker-compose 的方法。以下是我常用的命令,当前版本是 V2.32.4。如果你需要最新版本,可以查看获取docker compose最新版本号 部分,获取最新版本号后替换命令中的版本号即可。 w…

CSRF攻击XSS攻击

概述 ​在 HTML 中&#xff0c;<a>, <form>, <img>, <script>, <iframe>, <link> 等标签以及 Ajax 都可以指向一个资源地址&#xff0c;而所谓的跨域请求就是指&#xff1a;当前发起请求的域与该请求指向的资源所在的域不一样。这里的域指…

企业分类相似度筛选实战:基于规则与向量方法的对比分析

文章目录 企业表相似类别筛选实战项目背景介绍效果展示基于规则的效果基于向量相似的效果 说明相关文章推荐 企业表相似类别筛选实战 项目背景 在当下RAG&#xff08;检索增强生成&#xff09;技术应用不断发展的背景下&#xff0c;掌握文本相似算法不仅能够助力信息检索&…