如何为 Elasticsearch 创建自定义连接器

news2025/1/12 23:00:27

了解如何为 Elasticsearch 创建自定义连接器以简化数据摄取过程。

作者:JEDR BLASZYK

Elasticsearch 拥有一个摄取工具库,可以从多个来源获取数据。 但是,有时你的数据源可能与 Elastic 现有的提取工具不兼容。 在这种情况下,你可能需要创建自定义连接器以将数据与 Elasticsearch 连接。

在你的应用程序中使用 Elastic 连接器有多种原因。 例如,你可能想要:

  • 将数据从自定义或遗留应用程序引入 Elasticsearch
  • 为你的组织数据引入语义搜索
  • 从 PDF、MS Office 文档等文件中提取文本内容
  • 使用 Kibana UI 管理你的数据源(包括配置、过滤规则、设置定期同步计划规则)
  • 你想要在自己的基础设施上部署 Elastic 连接器(一些 Elastic 支持的连接器可作为 Elastic Cloud 中的本机连接器使用)

用于创建定制连接器的开放代码框架

如果创建你自己的连接器是满足你需求的解决方案,那么连接器框架将帮助你创建一个。 我们创建的框架是为了支持创建自定义连接器并帮助用户将独特的数据源连接到 Elasticsearch。 连接器的代码可在 GitHub 上找到,并且我们有可以帮助你入门的文档。

该框架设计简单且高性能。 它旨在对开发人员友好,因此它是开放代码且高度可定制的。 你创建的连接器可以在你自己的基础设施上进行自我管理。 目标是让开发人员能够轻松地将自己的数据源与 Elasticsearch 集成。

使用连接器框架之前你需要了解什么

该框架是用 async-python 编写的

有几门课程可以学习 async-python。 如果你需要推荐,我们认为这个 LinkedIn 学习课程非常好,但需要订阅。 我们喜欢的一个免费替代方案是这个。

为什么我们选择异步 Python?

摄取受 IO 限制(而非 CPU 限制),因此从资源利用的角度构建连接器时,异步编程是最佳方法。 在 I/O 密集型应用程序中,大部分时间都花在等待外部资源上,例如读取文件、发出网络请求或查询数据库。 在这些等待期间,传统的同步代码会阻塞整个程序,导致资源利用效率低下。

还有其他先决条件吗?

这不是先决条件。 在开始之前,绝对值得阅读《连接器开发人员指南》! 希望你觉得这个有用。

使用连接器框架构建定制连接器

入门很容易。 在与框架相关的术语中,我们将自定义连接器称为源。 我们通过创建一个新类来实现一个新的源,该类的职责是将文档从自定义数据源发送到Elasticsearch。

作为一种可选的入门方式,用户还可以查看此目录源 (directory source) 示例。 这是一个很好但基本的示例,可以帮助你了解如何编写自定义连接器。

步骤概要

一旦你知道要为其创建连接器的自定义数据源,以下是编写新源的步骤概述:

  • 在 connectors/sources 中添加模块或目录
  • 在 requirements.txt 中声明你的依赖项。 确保固定这些依赖项
  • 实现一个类,该类实现 connectors.source.BaseDataSource 中描述的方法
  • (可选,在为 repo 做出贡献时)在 connectors/sources/tests 中添加单元测试,覆盖率 +90%
  • 在源部分声明你的连接器 connectors/config.py
  • 就是这样。 我们完成了! 现在你应该能够运行连接器

在编写定制连接器之前你需要了解什么

为了使 Elasticsearch 用户能够获取数据并在该数据的基础上构建搜索体验,我们提供了一个轻量级的连接器协议。 该协议允许用户轻松获取数据、使用企业搜索功能来操作该数据并创建搜索体验,同时在 Kibana 中为他们提供无缝的用户体验。 为了与企业搜索兼容并充分利用 Kibana 中提供的连接器功能,连接器必须遵守该协议。

关于连接器协议你需要了解的内容

该文档页面提供了该协议的良好概述。 以下是你需要了解的内容:

  • 连接器和系统其他部分之间的所有通信都通过 Elasticsearch 索引异步进行
  • 连接器将其状态传达给 Elasticsearch 和 Kibana,以便用户可以为其提供配置并诊断任何问题
  • 这允许简单、开发人员友好的连接器部署。 connectors 服务是无状态的,并且不关心你的 Elastic 部署在哪里运行,只要它可以通过网络连接到它就可以正常工作。 该服务还具有容错能力,可以在重新启动或发生故障后在不同的主机上恢复操作。 一旦与 Elasticsearch 重新建立连接,它将继续正常运行。
  • 在底层,该协议使用 Elasticsearch 索引来跟踪连接器状态
    • .elastic-connectors 和 .elastic-connectors-sync-jobs (在上面链接的文档中描述)

托管自定义连接器的位置

连接器本身不依赖于 Elasticsearch,它可以托管在你自己的环境中

如果你有 Elasticsearch 部署,无论它是自我管理还是位于 Elastic Cloud 中:

  • 作为开发人员/公司,你可以为你的数据源编写自定义连接器
  • 在你自己的基础设施上管理连接器并根据你的需求配置连接器服务
  • 只要连接器可以通过网络发现 Elasticsearch,它就能够对数据建立索引
  • 作为管理员,你可以通过 Kibana 控制连接器

示例:使用连接器框架的 Google Drive 连接器

我们使用连接器框架为 Google Drive 编写了一个简单的连接器。 我们通过创建一个新类来实现新的源,该类的职责是将文档从目标源发送到 Elasticsearch。

注意:本教程与 Elastic stack 版本 8.10 兼容。 对于更高版本,请务必检查连接器发行说明以获取更新并参考 Github 存储库。

我们从具有 BaseDataSource 预期方法签名的 GoogleDriveDataSource 类开始,以配置数据源、检查其可用性(ping)并检索文档。 为了使这个连接器发挥作用,我们需要实现这些方法。

class GoogleDriveDataSource(BaseDataSource):
    """Google Drive"""

    name = "Google Drive"
    service_type = "google_drive"

    @classmethod
    def get_default_configuration(cls):
        """Returns a dict with a default configuration"""
        raise NotImplementedError

    async def ping(self):
        """When called, pings the backend

        If the backend has an issue, raises an exception
        """
        raise NotImplementedError

   async def get_docs(self, filtering=None):
        """Returns an iterator on all documents present in the backend

        Each document is a tuple with:
        - a mapping with the data to index
        - a coroutine to download extra data (attachments)

        The mapping should have least an `id` field
        and optionally a `timestamp` field in ISO 8601 UTC

        The coroutine is called if the document needs to be synced
        and has attachments. It needs to return a mapping to index.

        It has two arguments: doit and timestamp
        If doit is False, it should return None immediately.
        If timestamp is provided, it should be used in the mapping.

        Example:

           async def get_file(doit=True, timestamp=None):
               if not doit:
                   return
               return {'TEXT': 'DATA', 'timestamp': timestamp,
                       'id': 'doc-id'}
        """
        raise NotImplementedError

这个 GoogleDriveDataSource 类是编写 Google Drive 源代码的起点。 通过执行以下步骤,你将实现与 Google Drive 同步数据所需的逻辑:

  • 我们需要将此文件添加到 connectors/sources 中
  • 设置新的连接器名称和 service_type,例如 Google Drive 作为名称,google_drive 作为服务类型 (service type)
  • 要从源获取连接器同步数据,你需要实现:
    • get_default_configuration - 此函数应返回 RichConfigurableFields 的集合。 这些字段允许你从 Kibana UI 配置连接器。 这包括传递身份验证详细信息、凭据和其他特定于源的设置。 Kibana 巧妙地呈现这些配置。 例如,如果你将某个字段标记为 "sensitive": true, Kibana 会出于安全原因屏蔽它。
    • ping - 对数据源的简单调用,验证其状态,将其视为健康检查。
    • get_docs - 此方法需要实现实际从源获取数据的逻辑。 此函数应返回一个异步迭代器,该迭代器返回一个包含以下内容的元组:(document, lazy_download),其中:
      • document - 是远程源中项目的 JSON 表示形式。 (如 name, location, table, author, size 等)
      • lazy_download - 是一个协程,用于下载框架处理的内容提取的对象/附件(例如从 PDF 文档中提取文本)

BaseDataSource 类中还有其他抽象方法。 请注意,如果你只想支持内容同步(例如从谷歌驱动器获取所有数据),则不需要实现这些方法。 它们指的是其他连接器功能,例如:

  • 文档级安全性(get_access_control、access_control_query)
  • 高级过滤规则(advanced_rules_validators)
  • 增量同步 (get_docs_incrementally)
  • 将来可能会添加其他功能

我们如何编写官方 Elasticsearch Google Drive 连接器

首先实现 BaseDataSource 类所需的方法

我们需要实现方法 get_default_configuration、ping 和 get_docs 以使连接器同步数据。 因此,让我们更深入地了解实现。

首先要考虑的是:如何与Google Drive “对话” 来获取数据?

Google 提供了官方的 python 客户端,但它是同步的,因此同步内容可能会很慢。 我们认为更好的选择是 aiogoogle 库,它提供了用异步 python 编写的完整客户端功能。 一开始这可能并不直观,但使用异步操作来提高性能非常重要。 因此,在此示例中,我们选择不使用官方谷歌库,因为它不支持异步模式。

如果你在异步框架中使用同步或阻塞代码,可能会对性能产生重大影响。 任何异步框架的核心都是事件循环。 事件循环允许通过连续轮询已完成的任务并调度新任务来并发执行异步任务。 如果引入阻塞操作,它将停止循环的执行,从而阻止它管理其他任务。 这本质上否定了异步架构提供的并发优势。

下一个问题是连接器身份验证

我们将 Google Drive 连接器验证为服务帐户。 有关身份验证的更多信息可以在这些连接器文档页面中找到。

  • 服务帐户可以使用密钥进行身份验证
  • 我们通过 Elasticsearch 中的 Kibana UI 将身份验证密钥传递给服务帐户

让我们看一下 get_default_configuration 实现,它允许最终用户传递凭证密钥,该凭证密钥将存储在索引中以在同步期间进行身份验证:

class GoogleDriveDataSource(BaseDataSource):
    """Google Drive"""

    name = "Google Drive"
    service_type = "google_drive"

    {...}

    @classmethod
    def get_default_configuration(cls):
        """Get the default configuration for Google Drive.

        Returns:
            dict: Default configuration.
        """
        return {
            "service_account_credentials": {
                "display": "textarea",
                "label": "Google Drive service account JSON",
                "sensitive": True,
                "order": 1,
                "tooltip": "This connectors authenticates as a service account to synchronize content from Google Drive.",
                "type": "str",
                "value": "",
            },
        }

接下来我们来实现一个简单的 ping 方法

我们将对 google Drive api 进行简单的调用,例如 /about 端点。

对于此步骤,我们考虑 GoogleDriveClient 的简化表示。 我们的主要目标是指导你完成连接器创建,因此我们不关注 Google Drive 客户端的实现细节。 然而,最少的客户端代码对于连接器的操作至关重要,因此我们将依赖 GoogleDriveClient 类表示的伪代码。

class GoogleDriveClient(GoogleAPIClient):
    """A google drive client to handle api calls made to Google Drive API."""

    {... google drive client implementation}

    async def ping(self):
        return await self.api_call(resource="about", method="get", fields="kind")



class GoogleDriveDataSource(BaseDataSource):
    """Google Drive"""

    name = "Google Drive"
    service_type = "google_drive"

    {...}

    @cached_property
    def google_drive_client(self):
        """Initialize and return the GoogleDriveClient

        Returns:
            GoogleDriveClient: An instance of the GoogleDriveClient.
        """
        self._validate_service_account_json()

        json_credentials = json.loads(self.configuration["service_account_credentials"])

        return GoogleDriveClient(json_credentials=json_credentials)


    async def ping(self):
        """Verify the connection with Google Drive"""
        try:
            await self.google_drive_client.ping()
            self._logger.info("Successfully connected to the Google Drive.")
        except Exception:
            self._logger.exception("Error while connecting to the Google Drive.")
            raise

异步 iterator 从 google drive 返回文件以进行内容提取

下一步是编写 get_docs 异步迭代器,该迭代器将从 Google drive 和协程返回文件以下载它们以进行内容提取。 根据个人经验,开始将 get_docs 作为一个简单的独立 python 脚本来实现并获取一些数据通常更简单。 一旦 get_docs 代码正常工作,我们就可以将其移动到数据源类。

我们看一下 api 文档,我们可以:

  • 使用文 files/list 端点通过分页迭代 drive 中的文档
  • 使用 files/get 和 files/export 下载文件(或将 google 文档导出为特定文件格式)
class GoogleDriveDataSource(BaseDataSource):
    """Google Drive"""

    name = "Google Drive"
    service_type = "google_drive"

    {...}

    async def get_content(self, file, timestamp=None, doit=None):
        """Extracts the content from a file file.

        Args:
            file (dict): Formatted file document.
            timestamp (timestamp, optional): Timestamp of file last_modified. Defaults to None.
            doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None.

        Returns:
            dict: Content document with id, timestamp & text
        """

        # Code details have been omitted here for brevity. For a complete implementation,
        # please refer to the connector code on GitHub.


    async def get_docs(self, filtering=None):
        """Executes the logic to fetch Google Drive objects in an async manner.

        Args:
            filtering (optional): Advenced filtering rules. Defaults to None.

        Yields:
            dict, partial: dict containing meta-data of the Google Drive objects,
                                partial download content function
        """


        async for files_page in self.google_drive_client.list_files():
            async for file in self.prepare_files(files_page=files_page):
                yield file, partial(self.get_content, file)

那么这段代码中发生了什么?

  • list_files 对驱动器中的文件进行分页。
  • prepare_files 将文件元数据格式化为预期模式
  • get_content 是一个下载文件并对其内容进行 Base64 编码的协程(内容提取的兼容格式)

为了简洁起见,省略了一些代码细节。 有关完整的实现,请参阅 GitHub 上的当前连接器实现。

让我们运行连接器!

要将自定义连接器集成到框架中,你需要注册其实现。 通过在 connectors/config.py 的源部分中添加自定义连接器的条目来执行此操作。 对于 Google Drive 示例,添加内容将如下所示:

"sources": {
  ...,
  "google_drive": "connectors.sources.google_drive:GoogleDriveDataSource",
  ...
}

现在在 Kibana 界面中:

  • 转到 Search -> Indices -> Create a new index -> Use a Connector
  • 选择 Customized connector(使用自定义连接器时)
  • 配置你的连接器。 生成 Elasticsearch API 密钥和连接器 ID,并按照说明将这些详细信息放入 config.yml 中,然后启动连接器。

此时,Kibana 应该检测到您的连接器! 安排定期数据同步或只需单击 “Sync” 即可开始完全同步。

连接器可以配置为使用 Elasticsearch 的摄取管道在将数据存储到索引之前对数据执行转换。 一个常见的用例是通过机器学习丰富文档。 例如,你可以:

  • 使用文本嵌入模型分析文本字段,该模型将生成数据的密集向量表示
  • 运行文本分类以进行情感分析
  • 使用命名实体识别 (NER) 从文本中提取关键信息

同步完成后,你的数据将在搜索优化的 Elasticsearch 索引中可用。 此时,你可以深入构建搜索体验或深入分析。

你想创建并贡献一个新的连接器吗?

如果你为可能对 Elasticsearch 社区有所帮助的源创建自定义连接器,请考虑贡献它。 以下是使定制连接器成为 Elastic 支持的连接器的升级路径指南。

贡献连接器的验收标准

此外,在开始花一些时间开发连接器之前,你应该创建一个问题并寻求有关连接器及其将使用哪些库的一些初步反馈。 一旦你的连接器想法得到一些初步反馈,请确保您的项目满足一些验收标准:

  • 在 connectors/sources 中添加模块或目录
  • 实现一个类,该类实现 connectors.source.BaseDataSource 中描述的所有方法
  • 在 connectors/sources/tests 中添加覆盖率 +90% 的单元测试
  • 在源部分的 connectors/config.py 中声明你的连接器
  • 在 requirements.txt 中声明你的依赖项。 确保固定这些依赖项
  • 对于你要添加的每个依赖项(包括间接依赖项),列出所有许可证并在补丁中提供该列表。
  • 确保你的源使用异步库。 如果不可能,请确保没有阻塞循环
  • 如果可能,请提供运行后端服务的 docker 映像,以便我们测试连接器。 如果您无法提供 Docker 映像,请提供针对在线服务运行所需的凭据。
  • 由于 Elasticsearch 分页的默认大小限制为 10k,测试后端需要返回超过 10k 的文档。 从测试后端返回超过 10k 文档将有助于测试连接器

用于测试连接器的支持工具

我们还有一些支持工具来分析连接器代码并运行性能测试。 你可以在这里找到这些资源:

  • Perf8 - 性能库和仪表板,用于分析 python 代码的质量,以评估资源利用率并检测阻塞调用
  • E-2-E 功能测试,利用 perf8 库来分析每个连接器

总结

我们希望这个博客和示例对你有用。

以下是 Elasticsearch 可用的 native connectors 和 connector clients 的完整列表。 如果你没有找到列出的数据源,是否可以创建一个自定义连接器?

以下是与本文相关的一些有用资源:

  • 连接器 GitHub 存储库和文档页面
  • 异步 Python 学习课程
  • 新的自定义连接器社区指南
  • Elastic 连接器框架的许可详细信息(在此链接中搜索 'Connector Framework')

如果你没有 Elastic 帐户,你可以随时启动试用帐户来开始!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1108746.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

推荐 3 个国外的自由职业者/兼职网站,大家有空可以去淘淘金

推荐 3 个国外的自由职业者/兼职网站,大家有空可以去淘淘金 1. Upwork 这个是全球最大的外包网站之一,很多知名公司都会在这里找外包员工 upwork.com 2. fiverr.com 这个平台也是侧重于技能变现,除了专业的职业技能,还有很多稀奇古怪的…

SpringCloud之Ribbon负载均衡解读

目录 基本介绍 概述 LoadBalanced理解 简单源码解读 1)LoadBalancerIntercepor 2)LoadBalancerClient 3)负载均衡策略IRule 4)总结 负载均衡策略 负载均衡策略 自定义负载均衡策略 基本介绍 概述 Ribbon是Netflix发布…

Nginx集群负载均衡配置完整流程

今天,良哥带你来做一个nginx集群的负载均衡配置的完整流程。 一、准备工作 本次搭建的操作系统环境是win11,linux可配置类同。 1)首先,下载nginx。 下载地址为:http://nginx.org/en/download.html 良哥下载的是&am…

浅谈余压监控系统在住宅小区的应用方案

【摘要】: 本文分析了火灾发生时人员伤亡的主要原因——烟雾,并针对该原因提供切实可靠的系统应用解决方案,并通过具体案例,从设计依据、产品选型、系统组网、现场安装等方式介绍余压监控系统,希望可以在火灾发生时较大…

如何使用前端绘图库(D3.js、Chart.js等)?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 欢迎来到前端入门之旅!感兴趣的可以订阅本专栏哦!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

紫光同创FPGA实现PCIE测速试验,提供PDS工程和Linux QT上位机源码和技术支持

目录 1、前言免责声明 2、我这里已有的 GT 高速接口解决方案3、设计思路框架PCIE硬件设计PCIE IP核添加和配置驱动文件和驱动安装QT上位机和源码 4、PDS工程详解5、上板调试验证并演示6、福利:工程代码的获取 紫光同创FPGA实现PCIE测速试验,提供PDS工程和…

spring 提前编译:AOT

文章目录 AOT概述GraalvmNative Image演示Native Image构建过程GraalVM安装(1)下载GraalVM 安装C的编译环境 Native Image构建 AOT概述 JIT与AOT的区别 JIT和AOT 这个名词是指两种不同的编译方式,这两种编译方式的主要区别在于是否在“运行时…

vue源码笔记之——响应系统

vue是一种声明式范式编程,使用vue者只需要告诉其想要什么结果,无需关心具体实现(vue内部做了,底层是利用命令式范式) 1. reactive为什么只能操作对象,对于基本数据类型,需要用ref? …

Nginx 代理

目录 正向代理 反向代理 负载均衡 负载均衡的工作原理 优势和好处 算法和策略 应用领域 Nginx 的反向代理 应用场景 在网络通信中,代理服务器扮演着重要的角色,其中正向代理和反向代理是两种常见的代理服务器模式。它们在网络安全、性能优化和…

vue3后台管理系统之pinia及持久化集成使用

安装依赖 pnpm i pinia 在src目录下创建store 创建大仓库 //仓库大仓库 import { createPinia } from pinia //创建大仓库 const pinia createPinia() //对外暴露:入口文件需要安装仓库 export default pinia 全局注册pinia 配置用户仓库pinia管理数据 // 创建用…

JavaSE入门---认识Java数组

文章目录 一. 数组的基本概念1.1 为什么要使用数组?1.2 什么是数组?1.3 数组的使用 二. 数组是引用类型三. 数组的应用场景四. 数组中的常用方法五. 二维数组 一. 数组的基本概念 1.1 为什么要使用数组? 想象这样的一个场景:期末…

混淆技术研究笔记(七)Ant扩展介绍

ant 扩展官方文档:https://ant.apache.org/manual/develop.html Writing Your Own Task 编写你自己的任务 1. 创建一个XXTask类 创建一个Java类继承org.apache.tools.ant.Task ,实际上不继承也可以,定义一个 execute() 方法就可以&#xf…

【python】制作一个windows端自动化工具!

作为一名自动化工程师,这一章,带大家来看看我是如何制作一个windows端的自动化工具,本章节内容我会从基础的环境配置、基础模块介绍、框架设计、实际运用等方面来讲解,对于想要未来从事该行业的人来说,希望这篇文章能给…

公司重要文件防泄密

公司重要文件防泄密是企业管理中一项非常重要的任务,今天分享几个可以防止公司重要文件泄密的方式: 1、建立完善的文件管理制度 企业应该制定严格的文件管理制度,包括文件分类、加密、访问权限的管理等。确保每个员工都了解文件管理制度并严…

【力扣1528】重新排列字符串

👑专栏内容:力扣刷题⛪个人主页:子夜的星的主页💕座右铭:前路未远,步履不停 目录 一、题目描述二、题目分析1、Java代码2、C代码 一、题目描述 给你一个字符串 s 和一个长度相同的整数数组 indices。 请你…

GLIP DetCLIP

1 GLIP: 十分钟解读GLIP:Grounded Language-Image Pre-training - 知乎 Grounded Language-Image Pre-training(GLIP)论文笔记 - 知乎 GLIP的主要贡献如下: 将phrase grounding和目标检测任务统一,将image和text pr…

Docker 快速入门体验

Docker 是什么? Docker 是一个开源项目,它能够自动化部署应用程序,通过所谓的容器来实现。这些容器允许开发者将自己的应用以及依赖打包到一个可移植的容器中,然后发布到任何流行的 Linux 或 Windows 机器上也可以实现虚拟化。Do…

Linux系统中配置系统

在Linux系统中配置系统设置->网络设置代理的详细教程如下: 首先,确保您已经安装了NetworkManager和nmtui。在终端中输入以下命令: sudo apt-get update sudo apt-get install network-manager nmtui 打开系统设置。在桌面上点击“设置”…

文件和命令的查找与处理

1.命令查找 which which 接命令 2.文件查找 find 按文件名字查找 准确查找 find / -name "hosts" 粗略查找 find / -name "ho*ts" 扩展名查找 find / -name "*.txt" 按文件类型查找 find / -type f 文件查找 find / -ty…

哈夫曼树实现哈夫曼编码(C++)

题目要求:根据哈夫曼编码的原理,编写一个程序,在用户输入结点权值的基础上求赫夫曼编码,并能把给定的编码进行译码。 (1)初始化:从键盘输入一字符串(或读入一文件)&…