将 OneLake 数据索引到 Elasticsearch - 第二部分

news2025/1/26 16:29:59

作者:来自 Elastic Gustavo Llermaly 及 Jeffrey Rengifo

本文分为两部分,第二部分介绍如何使用自定义连接器将 OneLake 数据索引并搜索到 Elastic 中。

在本文中,我们将利用第 1 部分中学到的知识来创建 OneLake 自定义 Elasticsearch 连接器。

我们已经上传了一些 OneLake 文档并将其索引到 Elasticsearch 中以供搜索。但是,这仅适用于一次性上传。如果我们想要同步数据,那么我们需要开发一个更复杂的系统。

幸运的是,Elastic 有一个连接器框架可用于开发满足我们需求的自定义连接器:

我们现在将根据本文制作一个 OneLake 连接器:如何为 Elasticsearch 创建自定义连接器。

步骤

  • 连接器引导
  • 实现 BaseDataSource 类
  • 身份验证
  • 运行连接器
  • 配置计划

连接器引导

背景信息:Elastic 连接器分为两种类型:

  1. Elastic 托管连接器:完全由 Elastic Cloud 托管和运行。
  2. 自托管连接器:由用户自行托管,必须部署在你的基础设施中。

自定义连接器属于 “连接器客户端” 类别,因此我们需要下载并部署连接器框架。

首先,克隆连接器的代码库:

git clone https://github.com/elastic/connectors

现在在 requirements/framework.txt 文件末尾添加你将使用的依赖项。在本例中:

azure-identity==1.19.0
azure-storage-file-datalake==12.17.0

这样,存储库就完成了,我们可以开始编码了。

实现 BaseDataSource 类

你可以在此存储库中找到完整的工作代码。

我们将介绍 onelake.py 文件中的核心部分。

在导入和类声明之后,我们必须定义将捕获配置参数的 __init__ 方法。

"""OneLake connector to retrieve data from datalakes"""

from functools import partial

from azure.identity import ClientSecretCredential
from azure.storage.filedatalake import DataLakeServiceClient

from connectors.source import BaseDataSource

ACCOUNT_NAME = "onelake"


class OneLakeDataSource(BaseDataSource):
    """OneLake"""

    name = "OneLake"
    service_type = "onelake"
    incremental_sync_enabled = True

    # Here we can enter the data that we'll later need to connect our connector to OneLake.
    def __init__(self, configuration):
        """Set up the connection to the azure base client

        Args:
            configuration (DataSourceConfiguration): Object of DataSourceConfiguration class.
        """
        super().__init__(configuration=configuration)
        self.tenant_id = self.configuration["tenant_id"]
        self.client_id = self.configuration["client_id"]
        self.client_secret = self.configuration["client_secret"]
        self.workspace_name = self.configuration["workspace_name"]
        self.data_path = self.configuration["data_path"]

然后,你可以配置 UI 将显示的表单,使用返回配置字典的 get_default_configuration 方法填充这些参数。

    # Method to generate the Enterprise Search UI fields for the variables we need to connect to OneLake.
    @classmethod
    def get_default_configuration(cls):
        """Get the default configuration for OneLake

        Returns:
            dictionary: Default configuration
        """
        return {
            "tenant_id": {
                "label": "OneLake tenant id",
                "order": 1,
                "type": "str",
            },
            "client_id": {
                "label": "OneLake client id",
                "order": 2,
                "type": "str",
            },
            "client_secret": {
                "label": "OneLake client secret",
                "order": 3,
                "type": "str",
                "sensitive": True, # To hide sensitive data like passwords or secrets
            },
            "workspace_name": {
                "label": "OneLake workspace name",
                "order": 4,
                "type": "str",
            },
            "data_path": {
                "label": "OneLake data path",
                "tooltip": "Path in format <DataLake>.Lakehouse/files/<Folder path>",
                "order": 5,
                "type": "str",
            },
            "account_name": {
                "tooltip": "In the most cases is 'onelake'",
                "default_value": ACCOUNT_NAME,
                "label": "Account name",
                "order": 6,
                "type": "str",
            },
        }

然后我们配置下载方法,并从 OneLake 文档中提取内容。

async def download_file(self, file_client):
        """Download file from OneLake

        Args:
            file_client (obj): File client

        Returns:
            generator: File stream
        """

        try:
            download = file_client.download_file()
            stream = download.chunks()

            for chunk in stream:
                yield chunk
        except Exception as e:
            self._logger.error(f"Error while downloading file: {e}")
            raise

    async def get_content(self, file_name, doit=None, timestamp=None):
        """Obtains the file content for the specified file in `file_name`.

        Args:
            file_name (obj): The file name to process to obtain the content.
            timestamp (timestamp, optional): Timestamp of blob last modified. Defaults to None.
            doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None.

        Returns:
            str: Content of the file or None if not applicable.
        """

        if not doit:
            return

        file_client = await self._get_file_client(file_name)
        file_properties = file_client.get_file_properties()
        file_extension = self.get_file_extension(file_name)

        doc = {
            "_id": f"{file_client.file_system_name}_{file_properties.name}",  # workspacename_data_path
            "name": file_properties.name.split("/")[-1],
            "_timestamp": file_properties.last_modified,
            "created_at": file_properties.creation_time,
        }

        can_be_downloaded = self.can_file_be_downloaded(
            file_extension=file_extension,
            filename=file_properties.name,
            file_size=file_properties.size,
        )

        if not can_be_downloaded:
            return doc

        extracted_doc = await self.download_and_extract_file(
            doc=doc,
            source_filename=file_properties.name.split("/")[-1],
            file_extension=file_extension,
            download_func=partial(self.download_file, file_client),
        )

        return extracted_doc if extracted_doc is not None else doc

为了让我们的连接器对框架可见,我们需要在 connectors/config.py 文件中声明它。为此,我们将以下代码添加到源中:

 "sources": {
   ...
    "onelake": "connectors.sources.onelake:OneLakeDataSource",
    ...
 }

身份验证

在测试连接器之前,我们需要获取 client_id, tenant_id 和 client_secret,我们将使用它们从连接器访问工作区。

我们将使用 service principals 作为身份验证方法。

Azure service principal 是为与应用程序、托管服务和自动化工具一起使用以访问 Azure 资源而创建的身份。

步骤如下:

  1. 创建应用程序并收集 client_id、tenant_id 和 client_secret
  2. 在工作区中启用 service principal
  3. 将 service principal 添加到工作区

你可以逐步遵循本教程。

准备好了吗?现在是测试连接器的时候了!

运行连接器

连接器准备好后,我们现在可以连接到我们的 Elasticsearch 实例。

转到: Search > Content > Connectors > New connector 并选择 Customized Connector

选择要创建的名称,然后选择 “Create and attach an index” 以创建与连接器同名的新索引。

你现在可以使用 Docker 运行它或从源代码运行它。在此示例中,我们将使用 “Run from source”。

单击 “Generate Configuration”,然后将框中的内容粘贴到项目根目录中的 config.yml 文件中。在字段 service_type 上,你必须匹配 Connectors/config.py 中的连接器名称。在本例中,将 changeme 替换为 onelake。

现在,你可以使用以下命令运行连接器:

make install
make run

如果连接器正确初始化,你应该在控制台中看到如下消息:

注意:如果出现兼容性错误,请检查你的连接器/版本文件并与你的 Elasticsearch 集群版本进行比较:与 Elasticsearch 的版本兼容性。我们建议保持连接器版本和 Elasticsearch 版本同步。在本文中,我们使用 Elasticsearch 和连接器版本 8.15。

如果一切顺利,我们的本地连接器将与我们的 Elasticsearch 集群通信,我们将能够使用我们的 OneLake 凭据对其进行配置:

我们现在将索引来自 OneLake 的文档。为此,请单击 Sync > Full Content,运行完整内容同步:

同步完成后,你应该在控制台中看到以下内容:

在企业搜索 UI 中,你可以单击 “Documents” 来查看已索引的文档:

配置计划

你可以根据需要使用 UI 安排定期内容同步,以使索引保持更新并与 OneLake 同步。

要配置计划同步,请转到 “Search > Content > Connectors,然后选择你的连接器。然后单击 “scheduling”:

或者,你可以使用允许 CRON 表达式的更新连接器调度 API。

结论

在第二部分中,我们通过使用 Elastic 连接器框架并开发我们自己的 OneLake 连接器来轻松与我们的 Elastic Cloud 实例通信,将我们的配置更进一步。

想要获得 Elastic 认证?了解下一次 Elasticsearch 工程师培训何时开始!

Elasticsearch 包含新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在你的本地机器上试用 Elastic。

原文:Indexing OneLake data into Elasticsearch - Part II - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2283530.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android Studio:视图绑定的岁月变迁(2/100)

一、博文导读 本文是基于Android Studio真实项目&#xff0c;通过解析源码了解真实应用场景&#xff0c;写文的视角和读者是同步的&#xff0c;想到看到写到&#xff0c;没有上帝视角。 前期回顾&#xff0c;本文是第二期。 private Unbinder mUnbinder; 只是声明了一个 接口…

私有包上传maven私有仓库nexus-2.9.2

一、上传 二、获取相应文件 三、最后修改自己的pom文件

Qt监控系统辅屏预览/可以同时打开4个屏幕预览/支持5x64通道预览/onvif和rtsp接入/性能好

一、前言说明 在监控系统中&#xff0c;一般主界面肯定带了多个通道比如16/64通道的画面预览&#xff0c;随着电脑性能的增强和多屏幕的发展&#xff0c;再加上现在监控摄像头数量的增加&#xff0c;越来越多的用户希望在不同的屏幕预览不同的实时画面&#xff0c;一个办法是打…

LabVIEW心音心电同步采集与实时播放

开发了一个基于LabVIEW开发的心音心电同步采集与实时播放系统。该系统可以同时采集心音和心电信号&#xff0c;并通过LabVIEW的高级功能实现这些信号的实时显示和播放。系统提升心脏疾病诊断的准确性和效率&#xff0c;使医生能够在观察心音图的同时进行听诊。 ​ 项目背景 心…

深入解析ncnn::Net类——高效部署神经网络的核心组件

最近在学习ncnn推理框架&#xff0c;下面整理了ncnn::Net 的使用方法。 在移动端和嵌入式设备上进行高效的神经网络推理&#xff0c;要求框架具备轻量化、高性能以及灵活的扩展能力。作为腾讯开源的高性能神经网络推理框架&#xff0c;ncnn在这些方面表现出色。而在ncnn的核心…

前端【8】HTML+CSS+javascript实战项目----实现一个简单的待办事项列表 (To-Do List)

目录 一、功能需求 二、 HTML 三、CSS 四、js 1、绑定事件与初始设置 2.、绑定事项 &#xff08;1&#xff09;添加操作&#xff1a; &#xff08;2&#xff09;完成操作 &#xff08;3&#xff09;删除操作 &#xff08;4&#xff09;修改操作 3、完整js代码 总结…

程序诗篇里的灵动笔触:指针绘就数据的梦幻蓝图<1>

大家好啊&#xff0c;我是小象٩(๑ω๑)۶ 我的博客&#xff1a;Xiao Xiangζั͡ޓއއ 很高兴见到大家&#xff0c;希望能够和大家一起交流学习&#xff0c;共同进步。 这一节我们来学习指针的相关知识&#xff0c;学习内存和地址&#xff0c;指针变量和地址&#xff0c;包…

積分方程與簡單的泛函分析6.有連續對稱核的弗雷德霍姆積分算子的特徵值

1)def弗雷德霍姆算子的核函數定義 定义: 设 是定义在矩形区域 上的函数。 若满足以下条件,则称 为弗雷德霍姆算子的核函数: 实值性: 是实值函数,即对于任意的 ,。 这是因为在许多实际的物理和数学问题中,所涉及的量往往是实数值,例如在积分方程描述的物理模型中,…

AI编程工具使用技巧:在Visual Studio Code中高效利用阿里云通义灵码

AI编程工具使用技巧&#xff1a;在Visual Studio Code中高效利用阿里云通义灵码 前言一、通义灵码介绍1.1 通义灵码简介1.2 主要功能1.3 版本选择1.4 支持环境 二、Visual Studio Code介绍1.1 VS Code简介1.2 主要特点 三、安装VsCode3.1下载VsCode3.2.安装VsCode3.3 打开VsCod…

kettle与Springboot的集成方法,完整支持大数据组件

目录 概要整体架构流程技术名词解释技术细节小结 概要 在现代数据处理和ETL&#xff08;提取、转换、加载&#xff09;流程中&#xff0c;Kettle&#xff08;Pentaho Data Integration, PDI&#xff09;作为一种强大的开源ETL工具&#xff0c;被广泛应用于各种数据处理场景。…

GitHub Actions 使用需谨慎:深度剖析其痛点与替代方案

在持续集成与持续部署&#xff08;CI/CD&#xff09;领域&#xff0c;GitHub Actions 曾是众多开发者的热门选择&#xff0c;但如今&#xff0c;其弊端逐渐显现&#xff0c;让不少人在使用前不得不深思熟虑。 团队由大约 15 名工程师组成&#xff0c;采用基于主干的开发方式&am…

<iframe>标签和定时调用函数setInterval

iframe 标签和定时调用函数 setInterval 问题描述&#xff1a;解决方法&#xff1a; 问题描述&#xff1a; 今天遇到一个前端问题&#xff0c;在浏览器页面上传Excel文件后&#xff0c;然后点击导入按钮&#xff0c;经后端Java类读取文件内容校验后&#xff0c;将校验结果返回…

Qt——界面优化

一.QSS 1.背景 在网页前端开发领域中&#xff0c; CSS 是⼀个至关重要的部分。 描述了⼀个网页的 "样式"。 从而起到对网页美化的作用。 所谓样式&#xff0c;包括不限于大小&#xff0c;位置&#xff0c;颜色&#xff0c;背景&#xff0c;间距&#xff0c;字体等等…

java基础学习——jdbc基础知识详细介绍

引言 数据的存储 我们在开发 java 程序时&#xff0c;数据都是存储在内存中的&#xff0c;属于临时存储&#xff0c;当程序停止或重启时&#xff0c;内存中的数据就会丢失&#xff0c;我们为了解决数据的长期存储问题&#xff0c;有以下解决方案&#xff1a; 通过 IO流书记&…

CukeTest使用 | 1 CukeTest是什么?如何下载安装?

CukeTest使用 | 1 CukeTest是什么&#xff1f;如何下载安装&#xff1f; 1 CukeTest是什么&#xff1f;2 关于开发者3 CukeTest有哪些特性&#xff1f;4 都支持哪些自动化技术类型&#xff1f;5 版本区别6 下载安装 特殊说明&#xff1a;学习内容主要来自官网的教程、以及网上公…

An OpenGL Toolbox

3.An OpenGL Toolbox 声明&#xff1a;该代码来自&#xff1a;Computer Graphics Through OpenGL From Theory to Experiments&#xff0c;仅用作学习参考 3.1 Vertex Arrays and Their Drawing Commands 顶点数组及其绘制命令&#xff1a;将几何数据存储在一个位置&#xff0c…

R语言学习笔记之高效数据操作

一、概要 数据操作是R语言的一大优势&#xff0c;用户可以利用基本包或者拓展包在R语言中进行复杂的数据操作&#xff0c;包括排序、更新、分组汇总等。R数据操作包&#xff1a;data.table和tidyfst两个扩展包。 data.table是当前R中处理数据最快的工具&#xff0c;可以实现快…

Linux的权限和一些shell原理

目录 shell的原理 Linux权限 sudo命令提权 权限 文件的属性 ⽂件类型&#xff1a; 基本权限&#xff1a; chmod改权限 umask chown 该拥有者 chgrp 改所属组 最后&#xff1a; 目录权限 粘滞位 shell的原理 我们广义上的Linux系统 Linux内核Linux外壳 Linux严格…

LearnOpenGL——光照

教程地址&#xff1a;简介 - LearnOpenGL CN 前言 这篇开始光照的学习。 颜色 原文链接&#xff1a; 颜色 - LearnOpenGL CN总结&#xff1a; 重新搭建了一个简单场景&#xff0c;为后面的学习做准备。 现实世界中有无数种颜色&#xff0c;每一个物体都有它们自己的颜色。我…

步入响应式编程篇(二)之Reactor API

步入响应式编程篇&#xff08;二&#xff09;之Reactor API 前言回顾响应式编程Reactor API的使用Stream引入依赖Reactor API的使用流源头的创建 reactor api的背压模式发布者与订阅者使用的线程查看弹珠图查看形成新流的日志 前言 对于响应式编程的基于概念&#xff0c;以及J…