使用 Meltano 将数据从 Snowflake 导入到 Elasticsearch:开发者之旅

news2024/10/6 20:27:28

作者:来自 Elastic Dmitrii Burlutskii

在 Elastic 的搜索团队中,我们一直在探索不同的 ETL 工具以及如何利用它们将数据传输到 Elasticsearch,并在传输的数据上实现 AI 助力搜索。今天,我想与大家分享我们与 Meltano 生态系统以及 Meltano Elasticsearch 加载器的故事。

Meltano 是一个声明式的代码优先数据集成引擎,允许你在不同的存储之间同步数据。在 hub.meltano.com 上有许提取器 (extractors) 和加载器 (loaders) 可用。如果你的数据存储在 Snowflake 中,并且想要为你的客户构建一个开箱即用的搜索体验,你可能会考虑使用 Elasticsearch,在那里你可以基于你拥有的数据为客户构建语义搜索。今天,我们将重点介绍如何将数据从 Snowflake 同步到 Elasticsearch。

要求

Snowflake 账号。 你在注册后将收到以下所有账号信息,或者你可以从 Snowflake 面板中获取它们。

  1. 账户用户名
  2. 账户密码
  3. 账户标识符(查看此处的说明以获取它)

Snowflake 数据集

如果你创建了一个新的 Snowflake 账户,你将拥有用于实验的示例数据。

然而,我将使用一个公共空气质量数据集,其中包含二氧化氮(NO2)的测量数据。

Elastic 账号

访问 https://cloud.elastic.co 并注册账号。

点击 “Create deployment”。在弹出窗口中,你可以更改或保留默认设置。

一旦准备好部署,请点击 “Continue”(或点击 “Open Kibana”)。它将重定向你到 Kibana 仪表板。

转到 Stack Management -> Security -> API keys,并生成一个新的 API 密钥。

安装 Meltano

在我的示例中,我将使用 Meltano Python 包,但你也可以将其作为 Docker 容器安装。

pip install "meltano"

添加 Snowflake 提取器

meltano add extractor tap-snowflake --variant=meltanolabs

验证提取器

meltano invoke tap-snowflake --test

添加 Elasticsearch 加载器

meltano add loader target-elasticsearch

配置提取器和加载器:

有多种方法可以配置 Meltano 提取器和加载器:

  • 编辑 meltano.yml
  • 使用 CLI 命令,例如
meltano config {loader} set config_name config_value

使用 CLI 交互模式

meltano config {loader} set --interactive

我将使用交互模式。

要配置 Snowflake 提取器,请运行以下命令并至少提​​供帐户标识符、用户名、密码和数据库。

meltano config tap-snowflake set --interactive

你应该会看到以下屏幕,你可以在其中选择要配置的选项。

配置提取后,你可以测试连接。 只需运行以下命令:

配置 Elasticsearch 加载器并提供主机、端口、架构和 API 密钥,

meltano config target-elasticsearch set --interactive

如果你想更改索引名称,可以运行以下命令并更改它:

meltano config target-elasticsearch set index_format my-index-name
meltano config target-elasticsearch set index_format my-index-name

比如, 默认索引字符串定义为 ecs-{{ stream_name }}-{{ current_timestamp_daily}} ,结果为 ecs-animals-2022-12-25,其中流名称为 animals。

配置完所有内容后,我们就可以开始同步数据。

meltano run tap-snowflake target-elasticsearch

同步开始后,你可以转到 Kibana 并看到创建了一个新索引并且有一些索引文档。

你可以通过单击索引名称来查看文档。 你应该查看你的文件。

使用你的索引设置(或映射)

如果我们开始同步数据,加载器将自动创建一个具有动态映射的新索引,这意味着 Elasticsearch 将处理索引中的字段及其类型。 如果我们愿意,我们可以通过提前创建索引并应用我们需要的设置来更改此行为。 咱们试试吧。

导航到 Kibana -> DevTools 并运行以下命令:

创建新的摄入管道

PUT _ingest/pipeline/drop-values-10
{
  "processors": [
    {
      "drop": {
      "description": "Drop documents with the value < 10",
      "if": "ctx.datavalue < 10"
      }
    }
  ]
}

这将删除 datavalue < 10 的所有文档。

创建新索引

PUT my-snowflake-data

应用索引设置

PUT my-snowflake-data/_settings
{
  "index": {
    "default_pipeline": "_ingest/pipeline/drop-values-10"
  }
}

更改 Meltano 中的索引名称

meltano config target-elasticsearch set index_format my-snowflake-data

开始同步作业

meltano run tap-snowflake target-elasticsearch

工作完成后,你可以看到索引中的文档比我们之前创建的要少

结论

我们已经成功地将数据从 Snowflake 同步到 Elastic Cloud。我们让 Meltano 为我们创建了一个新索引,并负责索引映射,我们将数据同步到了一个具有预定义管道的现有索引中。

我想强调在我旅程中记下的一些关键点:

Elasticsearch 加载器(Meltano Hub 上的页面)

  • 它尚未准备好处理大量的数据。你需要调整默认的 Elasticsearch 配置,使其更加健壮。我已经提交了一个 Pull Request,以暴露 “request_timeout” 和 “retry_on_timeout” 选项,这将会有所帮助。
  • 它使用 Elasticsearch Python 客户端的 8.x 分支,因此你可以确保它支持最新的 Elasticsearch 功能。
  • 它同步发送数据(不使用 Python AsyncIO),因此当您需要传输大量数据时可能会相当慢。

Meltano CLI

  • 它非常棒。你不需要 UI,所以一切都可以在终端中配置,这为工程师提供了大量的自动化选项。
  • 你可以仅通过一个命令即可运行按需同步。不需要其他正在运行的服务。

复制/增量同步

  • 如果你的管道需要数据复制或增量同步,你可以访问这个页面信息。

另外,我想提一下 Meltano Hub 真的很棒。它易于导航并找到你需要的内容。此外,你可以通过查看有多少客户使用它们来轻松比较不同的加载器或抽取器。

如果你对构建基于 AI 的应用程序感兴趣,请在以下博客文章中查找更多信息:

  • 在你的数据集上实现全文和语义搜索能力。
  • 连接你的数据与 LLMs,构建问题 - 答案。
  • 构建一个使用检索增强生成(RAG)模式的聊天机器人。

准备将 RAG 构建到你的应用中了吗?想要尝试不同的 LLMs 与向量数据库吗? 查看我们在 Github 上关于 LangChain、Cohere 等的示例 notebooks,并加入即将开始的 Elasticsearch 工程师培训!

原文:Ingest Data from Snowflake to Elasticsearch using Meltano: A developer’s journey — Elastic Search Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1579228.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

矩阵链乘法问题

描述 输入 输入共n1行 第一行输入矩阵的总个数n[2,1000] 后n行分别输入矩阵的维数[1,100] 输出 最后一行输出少乘法次数 输入样例 1 6 30 35 35 15 15 5 5 10 10 20 20 25 输出样例1 15125 代码实现 #include<iostream> #include<vector> #include<…

设计模式之观察者模式讲解

概念&#xff1a;定义对象间一种一对多的依赖关系&#xff0c;使得当每一个对象改变状态&#xff0c;则所有依赖于它的对象都会得到通知并被自动更新。 抽象主题&#xff1a;或者叫被观察者&#xff0c;可以持有、增加、删除观察者对象。具体主题&#xff1a;实现抽象主题定义的…

yolov5旋转目标检测遥感图像检测-无人机旋转目标检测(代码和原理)

YOLOv5&#xff08;You Only Look Once version 5&#xff09;是一个流行且高效的实时目标检测深度学习模型&#xff0c;最初设计用于处理图像中的水平矩形边界框目标。然而&#xff0c;对于旋转目标检测&#xff0c;通常需要对原始YOLOv5架构进行扩展或修改&#xff0c;以便能…

【经典算法】LCR187:破冰游戏(约瑟夫问题,Java/C/Python3/JavaScript实现含注释说明,Easy)

目录 题目思路及实现方式一&#xff1a;迭代模拟&#xff08;用链表模拟这个游戏&#xff09;思路代码实现Java版本C语言版本Python3版本 复杂度分析 方式二&#xff1a;数学迭代思路代码实现Java版本C语言版本Python3版本 复杂度分析 方式三&#xff1a;递归思路代码实现Java版…

数字化智慧养老:引领老年人融入科技时代新生活

hello宝子们...我们是艾斯视觉擅长ui设计和前端开发10年经验&#xff01;希望我的分享能帮助到您&#xff01;如需帮助可以评论关注私信我们一起探讨&#xff01;致敬感谢感恩&#xff01; 人类社会已经步入了一个全新的数字时代。在这个时代&#xff0c;互联网、大数据、人工智…

学习操作系统之单道批处理系统

较之前操作的改进&#xff1a; 在原先的工作基础上&#xff0c;扩大存储&#xff0c;一次放入多个作业再进行处理。 单道&#xff1a;内存中始终只有一道作业 批处理&#xff1a;磁带上有多道作业&#xff0c;安装一次磁带&#xff0c;可以处理一批作业 1953年诞生了第一代…

【C语言】指针篇(指针数组,数组指针,函数指针,一级、二级指针)

文章目录 一、指针基础1.什么是指针2.指针的定义和初始化3.指针的解引用4.野指针和空指针5.指针的类型6.指针的大小7.指针的运算8.指针和数组9.指针和字符串10.二级指针 二、指针数组和数组指针1.指针数组2.数组指针3.练习 三、数组传参和指针传参1.一维数组传参2.二维数组传参…

开源区块链系统/技术 总结(欢迎补充,最新)

1. FISCO BCOS FISCO BCOS 2.0 技术文档 — FISCO BCOS 2.0 v2.9.0 文档https://fisco-bcos-documentation.readthedocs.io/ 2. ChainMaker&#xff08;长安链&#xff09; 文档导航 — chainmaker-docs v2.3.2 documentationhttps://docs.chainmaker.org.cn/v2.3.2/html/in…

你们是如何保证消息不丢失的?

1、什么是死信 在 RabbitMQ 中充当主角的就是消息&#xff0c;在不同场景下&#xff0c;消息会有不同地表现。 死信就是消息在特定场景下的一种表现形式&#xff0c;这些场景包括&#xff1a; 1. 消息被拒绝访问&#xff0c;即 RabbitMQ返回 basicNack 的信号时 或者拒绝basi…

CKA 基础操作教程(五)

Kubernetes Ingress 理论学习 Ingress 提供从集群外部到集群内服务的 HTTP 和 HTTPS 路由。 流量路由由 Ingress 资源所定义的规则来控制。 Ingress 资源示例&#xff1a; apiVersion: networking.k8s.io/v1 # 指定 Kubernetes 中使用的 API 版本 kind: Ingress # 指定对象…

【日常记录】【JS】填充数组的三种方案

文章目录 1、for 循环填充2、new Array、fill、map 三者配合填充3、Array.from 填充数组参考链接 一般在开发中需要生成一个数组&#xff0c;用于测试等其他情况&#xff0c;以下介绍三种常见方案 1、for 循环填充 如果需要对这个数组的内容做一些特殊处理&#xff0c;写起来就…

Mysql底层原理七:InnoDB 行记录

1.行格式 1.1 Compact行格式 1.1.1 示意图 1.1.2 准备一下 1&#xff09;建表 mysql> CREATE TABLE record_format_demo (-> c1 VARCHAR(10),-> c2 VARCHAR(10) NOT NULL,-> c3 CHAR(10),-> c4 VARCHAR(10)-> ) CHARSETascii ROW_FORMATCOM…

企业网络安全运营能力的多维度评价及优化策略

网络安全是企业面临的一个日益重要的问题&#xff0c;安全运营能力的强弱直接关系到企业的健康可持续发展和综合竞争力的提升。为推动企业网络安全工作的标准化建设&#xff0c;提升企业的网络安全运营能力&#xff0c;本文从安全建设、安全应对和安全效果三个角度出发&#xf…

【迅为iTOP-4412-linux 系统制作(4)】ADB 或者 TF 卡烧写测试

准备工作 编译生成的内核镜像uImage 和设备树 dtb 文件“exynos4412-itop-elite.dtb”已经可以使用了。 把编译生成的uimage和dtb文件。拷贝fastboot工具。官方的u-boot-iTOP-4412.bin 也拷贝到 platform-tools 文件夹目录内。system.img 也拷贝到 platform-tools 文件夹目录…

阿里通义千问开源 320 亿参数模型;文字和音频自动翻译成手语Hand Talk拉近人与人的距离

✨ 1: Qwen1.5-32B Qwen1.5-32B是Qwen1.5系列中性能与效率兼顾的最新语言模型&#xff0c;内存占用低&#xff0c;运行速度快。 Qwen1.5-32B是Qwen1.5语言模型系列的最新成员&#xff0c;这个模型是基于先进的技术研发的&#xff0c;旨在提供一种既高效又经济的AI语言理解和生…

JS--demo2录入学生信息

实现学生信息录取。 效果图: <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><meta http-equiv"X-U…

景联文科技:为AI大模型提供高质海量训练数据

在全球AI浪潮的推动下&#xff0c;大量训练数据已成为AI算法模型发展和演进中的关键一环。 艾瑞咨询数据显示&#xff0c;包括数据采集、数据处理&#xff08;标注&#xff09;、数据存储、数据挖掘等模块在内的AI基础数据服务市场&#xff0c;将在未来数年内持续增长。 预计到…

【LeetCode题解】2009. 使数组连续的最少操作数

文章目录 [2009. 使数组连续的最少操作数](https://leetcode.cn/problems/minimum-number-of-operations-to-make-array-continuous/)思路&#xff1a;一、排序去重滑动窗口代码&#xff1a; 2009. 使数组连续的最少操作数 思路&#xff1a;一、排序去重滑动窗口 1.对数组进行…

SpringBoot + Dobbo + nacos

SpringBoot Dobbo nacos 一、nacos https://nacos.io/zh-cn/docs/quick-start.html 1、下载安装包 https://github.com/alibaba/nacos/releases/下载后在主目录下&#xff0c;创建一个logs的文件夹&#xff1a;用来存日志 2、启动nacos 在bin目录下打开cmd运行启动命令&a…

[StartingPoint][Tier2]Oopsie

Task 1 With what kind of tool can intercept web traffic? (哪种工具可以拦截web数据包) proxy Task 2 What is the path to the directory on the webserver that returns a login page? (路径到返回登录页面的 Web 服务器目录是什么&#xff1f;) /cdn-cgi/login Tas…