Elasticsearch:使用 pipelines 路由文档到想要的 Elasticsearch 索引中去

news2025/1/16 17:39:02

路由文件

当应用程序需要向 Elasticsearch 添加文档时,它们首先要知道目标索引是什么。在很多的应用案例中,特别是针对时序数据,我们想把每个月的数据写入到一个特定的索引中。一方面便于管理索引,另外一方面在将来搜索的时候可以按照每个月的索引来进行搜索,这样速度更快,更便捷。

当你处于某种类型的文档总是转到特定索引的琐碎情况时,这似乎很明显,但当你的索引名称可能根据杂项参数(无论它们是否在你的系统外部 - 当前例如日期 - 或者你尝试存储的文档的固有属性 - 大多数时候是文档字段之一的值)。

当发生最后一种情况时(我们指的是索引名称可以变化的情况),在向 Elasticsearch 发出索引命令之前,你的应用程序需要计算目标索引的名称。

此外 —— 即使一开始这看起来像是一种反模式 —— 你可以有多个应用程序需要在索引中索引相同类型的文档,这些文档的名称可能会发生变化。 现在你必须维护跨多个组件重复的索引名称计算逻辑:就可维护性和敏捷性而言,这不是好消息。

Logstash —— Elastic Stack 的一个知名成员 —— 可以帮助集中这样的逻辑,但代价是维护另一个正在运行的软件,这需要配置、知识等。

我们想要在本文中展示的是通过将索引名称计算委托给 Elasticsearch 而不是我们的应用程序来解决此问题的解决方案。

Date index name processor 介绍

该处理器的目的是通过使用日期数学索引名称支持,根据文档中的日期或时间戳字段将文档指向基于正确时间的索引。

处理器根据提供的索引名称前缀、正在处理的文档中的日期或时间戳字段以及提供的日期舍入,使用日期数学索引名称表达式设置 _index 元数据字段。

首先,此处理器从正在处理的文档中的字段中获取日期或时间戳。 或者,可以根据字段值应如何解析为日期来配置日期格式。 然后这个日期、提供的索引名称前缀和提供的日期舍入被格式化为日期数学索引名称表达式。 此处还可以选择日期格式指定日期应如何格式化为日期数学索引名称表达式。

将文档指向每月索引的示例管道,该索引以基于 date1 字段中的日期的 my-index-prefix 开头:

PUT _ingest/pipeline/monthlyindex
{
  "description": "monthly date-time index naming",
  "processors" : [
    {
      "date_index_name" : {
        "field" : "date1",
        "index_name_prefix" : "my-index-",
        "date_rounding" : "M"
      }
    }
  ]
}

使用该管道进行索引请求:

PUT /my-index/_doc/1?pipeline=monthlyindex
{
  "date1" : "2016-04-25T12:02:01.789Z"
}

上面命令运行的结果是:

{
  "_index": "my-index-2016-04-01",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 0,
  "_primary_term": 1
}

上面的请求不会将这个文档索引到 my-index 索引中,而是索引到 my-index-2016-04-01 索引中,因为它是按月取整的。 这是因为 date-index-name-processor 覆盖了文档的 _index 属性。

要查看导致上述文档被索引到 my-index-2016-04-01 中的实际索引请求中提供的索引的日期数学(date-math)值,我们可以使用模拟请求检查处理器的效果。

POST _ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "monthly date-time index naming",
    "processors" : [
      {
        "date_index_name" : {
          "field" : "date1",
          "index_name_prefix" : "my-index-",
          "date_rounding" : "M"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "date1": "2016-04-25T12:02:01.789Z"
      }
    }
  ]
}

上面命令返回结果:

{
  "docs": [
    {
      "doc": {
        "_index": "<my-index-{2016-04-25||/M{yyyy-MM-dd|UTC}}>",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "date1": "2016-04-25T12:02:01.789Z"
        },
        "_ingest": {
          "timestamp": "2023-02-23T01:15:52.214364Z"
        }
      }
    }
  ]
}

以上示例显示 _index 设置为 <my-index-{2016-04-25||/M{yyyy-MM-dd|UTC}}>。 Elasticsearch 将其理解为 2016-04-01,如日期数学索引名称文档中所述。

日期索引名称选项

以下是使用 date index name 的一些选项

名称必要项默认描述
fieldyes-从中获取日期或时间戳的字段。
index_name_prefixno-要在打印日期之前添加的索引名称的前缀。 支持模板片段。
date_roundingyes-将日期格式化为索引名称时如何舍入日期。 有效值是:y(年)、M(月)、w(星期)、d(日)、h(小时)、m(分钟)和 s(秒)。 支持模板片段。
date_formatsnoyyyy-MM-dd'T'HH:mm:ss.SSSXX用于解析正在预处理的文档中的日期/时间戳的预期日期格式的数组。 可以是 Java 时间模式或以下格式之一:ISO8601、UNIX、UNIX_MS 或 TAI64N。
timezonenoUTC解析日期时使用的时区以及日期数学索引支持的时间将表达式解析为具体的索引名称。
localenoENGLISH从正在预处理的文档中解析日期时使用的语言环境,在解析月份名称或工作日时相关。
index_name_formatnoyyyy-MM-dd将解析日期打印到索引名称中时使用的格式。 此处需要一个有效的 Java 时间模式。 支持模板片段。
descriptionno-处理器的描述。 用于描述处理器的用途或其配置。
ifno-有条件地执行处理器。 请参阅有条件地运行处理器。
ignore_failurenofalse忽略处理器的故障。 请参阅处理管道故障。
on_failureno-处理处理器的故障。 请参阅处理管道故障。
tagno-处理器的标识符。 用于调试和指标

使用案例 - 基于时间的时序索引

这是一个众所周知的用例,通常在您要处理日志时发现。这个想法是索引文档,索引的名称由根名称和根据日志事件的日期计算的值组成。 该日期实际上是你要索引的文档的一个字段。

这不是本文的重点,但以这种方式索引文档有几个优点,包括更容易的数据管理、启用冷/暖架构等。让我们举个例子。

假设我们必须处理来自多个来源的数据 —— 例如物联网。 我们的每个对象每分钟都会向不同的后端发送一些数据(是的,这真的很可悲,但我们的对象不通过相同的网络进行通信,因此选择通过多个系统来处理这个问题)。

对象发送的数据被转换成如下所示的 JSON 文档:

POST data/_doc?pipeline=compute-index-name
{
  "objectId": 1234,
  "manufacturer": "SHIELD",
  "payload": "some_data",
  "date": "2019-04-01T12:10:30.000Z"
}

我们有一个用于传输数据的对象的 UID、一个制造商 ID、一个有效负载部分和一个日期字段。

索引名称计算

假设我们要将对象的数据存储在名为 data-{YYYYMMDD} 的索引中,其中根名称是数据后跟日期模式。基于上面的例子,后端收到这个文档应该怎么办呢?

首先它必须解析它以提取日期字段的值,然后它必须根据它在文档中找到的日期计算目标索引名称。 最后,它向 Elasticsearch 向刚刚计算出名称的索引发出索引请求。

document.date = "2019-04-01T12:10:30Z"
index.name = "data-" + "20190401"

在我们的例子中,我们有几个后端必须知道如何计算索引名称,因此必须知道索引的命名逻辑。

如果索引名的计算直接由 Elasticsearch 进行,岂不是更聪明?

Ingest pipeline 的力量

从 Elasticsearch 的第 5 版开始,我们现在有了一种称为摄取的节点。默认情况下,集群的所有节点都具有 ingest 类型。这些节点有权在索引文档之前执行所谓的管道。 管道是一组处理器,每个处理器都可以以某种特定方式转换输入文档。当一个文档被摄入到 Elasticsearch 集群时,它的工作流是这样的。

从上面,我们可以看出来,在文档被写入之前,它必须经过 ingest node 进行处理。我们可以通过 date index name processor 来获得我们想要的 index 名称,进而写入到我们想要的索引中去。 这里有用的是,管道不仅可以转换文档的固有数据,还可以修改文档元数据,特别是它的 _index 属性。

现在让我们回到我们的例子。我们建议定义一个管道来完成这项工作,而不是将索引名称计算委托给应用程序。根据文档,此处理器允许你定义包含日期的字段名称、索引的根名称(前缀)以及计算附加到此前缀的日期的舍入方法。

如果我们想将 IoT 数据添加到模式为 data-{YYYYMMDD} 的索引中,我们只需创建如下所示的管道:

PUT _ingest/pipeline/compute-index-name
{
  "description": "Set the document destination index by appending a prefix and its 'date' field",
  "processors": [
    {
      "date_index_name": {
        "field": "date",
        "index_name_prefix": "data-",
        "date_rounding": "d",
        "index_name_format": "yyyyMMdd"
      }
    }
  ]
}

一个索引 = 一个管道?

好的,现在我们知道如何定义一个管道来为特定的目标索引建立一个名称。 但是我们可以通过操纵文档元数据来做更多的事情!

假设我们有不同类型的文档,每个文档都有一个日期字段,但需要在不同的索引中进行索引。计算目标索引名称的逻辑对于每种文档类型都是相同的,但使用上述策略将导致创建多个管道。

让我们试着做一些更简单和可重用的东西。

回到我们的示例,我们现在有两种文档类型:一种需要在 adata-{YYYYMMDD} 索引(和以前一样)中建立索引,另一种其目的地是名为 new_data-{YYYYMMDD} 的索引。

目标为 new_data 的文档具有以下结构:

{
  "newObjectId": 1234,
  "source": "HYDRA",
  "payload": "some_data",
  "date": "2019-04-02T13:10:30.000Z"
}

该结构与标准 IoT 文档略有不同,但重要的是日期字段存在于两个映射中。

现在我们要定义一个管道来计算我们两种文档类型的目标索引。 我们所要做的就是通过分析通过索引 API 发出的请求目的地来构建目的地索引名称。

PUT _ingest/pipeline/compute-index-name
{
  "description": "Set the document destination index by appending the requested index and its 'date' field",
  "processors": [
    {
      "date_index_name": {
        "field": "date",
        "index_name_prefix": "{{ _index }}-",
        "date_rounding": "d",
        "index_name_format": "yyyyMMdd"
      }
    }
  ]
}

请注意,索引名称前缀现在位于名为_index 的索引元数据字段中。 通过使用这个字段,我们的管道现在是通用的并且可以与任何索引一起使用 —— 假设目标索引是根据相同的规则计算的。

使用我们的 “路由” 管道

现在我们有了一个能够根据文档的日期字段计算目标索引名称的通用管道,让我们看看如何让 Elasticsearch 使用它。

我们可以通过两种方式告诉 Elasticsearch 使用管道,让我们评估一下。

Index API 调用

第一个 —— 也是直接的解决方案——是使用 Index API 的管道参数。

换句话说:每次你想索引一个文档,你必须告诉 Elasticsearch 要使用的管道。

POST data/_doc?pipeline=compute-index-name
{
  "objectId": 1234,
  "manufacturer": "SHIELD",
  "payload": "some_data",
  "date": "2019-04-01T12:10:30.000Z"
}

现在,每次我们通过指示 compute-index-name 管道将文档添加到索引中时,该文档将被添加到正确的基于时间的索引中。 在此示例中,目标索引将为 data-20190401 。

我们提供给 Index API 的 data 索引名称呢? 它可以被看作是一个索引:它只是用来执行 API 调用并且是真正目标索引的根,它不一定存在!

默认管道:引入 “虚拟索引”

索引默认管道(default pipeline)是使用管道的另一种有用方式:当你创建索引时,有一个名为 index.default_pipeline 的设置可以设置为管道的名称,只要你将文档添加到相应的索引就会执行该管道并且没有管道被添加到 API 调用中。 你还可以在索引文档时使用特殊管道名称 _none 来绕过此默认索引。 通过使用此功能,你可以定义我称之为 “虚拟索引” 的内容,并将其与默认管道相关联,该默认管道将充当我们上面看到的路由管道。

让我们将其应用到我们的示例中。

我们假设我们的通用路由管道 compute-index-name 已经创建。 我们现在可以创建一个名为 data 的索引,它将使用此管道作为其默认管道。

PUT data
{
  "settings" : {
    "index" : {
      "number_of_shards" : 3, 
      "number_of_replicas" : 1,
      "default_pipeline" : "compute-index-name"
    }
  }
}

现在,每次我们要求 Elasticsearch 为数据索引中的文档编制索引时,计算索引名称管道将负责该文档的实际路由。 因此,数据索引中永远不会有单个文档被索引,但我们将调用管道的责任完全委托给 Elasticsearch。

运行完上面的命令后,我们来尝试写入一个文档:

POST data/_doc
{
  "objectId": 1234,
  "manufacturer": "SHIELD",
  "payload": "some_data",
  "date": "2019-04-01T12:10:30.000Z"
}

上面的命令返回的结果是:

{
  "_index": "data-20190401",
  "_id": "2DMGfIYBaog4blQ55Qr7",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 1,
  "_primary_term": 1
}

结论

我们刚刚在这里看到了如何利用 Elasticsearch 中的管道功能根据文档的固有属性来路由文档。Ingest pipeline 不仅仅可以替代 Logstash 过滤器:你可以定义复杂的管道,使用多个处理器(一个特定的处理器甚至允许你调用另一个管道)、条件等。有关 ingest pipeline 的更多文章,请参阅 “Elastic:开发者上手指南” 文章中的 “Ingest pipeline” 章节。

在我看来,本文末尾看到的 “虚拟索引” 非常有趣。 包含创建这样一个并非真正的索引的索引只是为了创建路由管道的入口点的功能甚至可以成为 Elasticsearch 的一个新的和有用的功能,为什么不呢?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/365165.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从0开始学python -37

Python3 错误和异常 作为 Python 初学者&#xff0c;在刚学习 Python 编程时&#xff0c;经常会看到一些报错信息&#xff0c;在前面我们没有提及&#xff0c;这章节我们会专门介绍。 Python 有两种错误很容易辨认&#xff1a;语法错误和异常。 Python assert&#xff08;断…

C语言实现用堆解决 TOP-K 问题

目录 TopK函数实现 如何测试 完整源码 生活中我们经常能见到TopK问题&#xff0c;例如&#xff1a;专业前10名、世界500强、富豪榜、游戏中前100的活跃玩家等。 所以&#xff0c;TopK问题即求出一组数据中前K个最大或最小的元素&#xff0c;一般情况下&#xff0c;数据量都…

[ Java ] 时间API在更新,传奇已经谢幕,但技术永远不死

&#xff08;Bill Joy(左一)&#xff0c;Vinod Khosla(左二)&#xff0c;Andy Bechtolsheim(右二)&#xff0c;Scott McNealy(右一) &#xff09; CSDN 博文征集活动&#xff08;和日期相关的代码和bug&#xff09;&#xff1a;点击这里 各位 “big guys”&#xff0c;这篇博文…

【数据结构】顺序表的深度剖析

&#x1f307;个人主页&#xff1a;平凡的小苏 &#x1f4da;学习格言&#xff1a;别人可以拷贝我的模式&#xff0c;但不能拷贝我不断往前的激情 &#x1f6f8;C语言专栏&#xff1a;https://blog.csdn.net/vhhhbb/category_12174730.html &#x1f680;数据结构专栏&#xff…

Dart的安装及环境变量配置

本文介绍dart的安装步骤及环境变量配置&#xff0c;以及如何在vscode中进行开发环境配置。一、dart的安装访问dart官网https://dart.cn/&#xff0c;点击网站右上角的获取DART SDK进行下载页面。如下图&#xff0c;选择下载SDK的zip压缩文件。根据自己的操作系统情况选择合适版…

DOM 文档对象模型

目录 一、简介 二、节点Node 三、document 1、简介 2、document对象的原型链 3、部分属性 四、元素节点 1、如何获取元素节点对象 通过document对象来获取已存在的元素节点 通过document对象来创建元素节点 2、原型链 3、通过元素节点对象获取其他节点的方法 五、…

如何备份网站到本地电脑(适用虚拟主机)

一、mysql数据库备份 登陆主机控制面板&#xff0c;点击左侧的数据库。 在数据库管理页面最下方有备份数据库的操作项目。点击【通过SQL文件导入导出】&#xff0c;进入到导出和导入的页面。 选择【导出/备份】这个选项导出。会在在wwwroot目录生成以时间命名的sql文件。 导出…

ADC模数转换器(基于STM32F407)

简介 Analog-to-digital converters&#xff08;模拟数字转换器&#xff09;&#xff0c;我的STM32F407中内置3个ADC&#xff0c;每个 ADC 有 12 位、10 位、8 位和 6 位可选&#xff0c;ADC 具有独立模式、双重模式和三重模式&#xff0c;对于不同 AD 转换要求几乎都有合适的…

list链表,结点

目录 1.链表 2.list构造函数 3.list的赋值和交换&#xff0c;&#xff0c;assign,swap 4.list大小的操作,size,empty,resize 5.list插入和删除&#xff0c;push_back,pop_back,push_front,pop_front,insert,clear,erase,remove 6.list容器数据存取,front,back 7.list反转…

数字孪生加持,水利水电工程或将实现全生命周期管理

水利水电工程在数字孪生技术的加持&#xff0c;使得建设和运营更加高效和智能化&#xff0c;将工程中各种元素、过程和系统数字化&#xff0c;并建立数字孪生模型&#xff0c;以实现工程建设和运营的智能化管理。数字孪生对水利水电实现对工程建设的全生命周期管理&#xff0c;…

Bean的生命周期和作用域

Bean的生命周期Bean的执行流程&#xff1a;Bean 执行流程&#xff1a;启动Spring 容器 -> 实例化 Bean&#xff08;分配内存空间&#xff0c;从无到有&#xff09;-> Bean 注册到 Spring 中&#xff08;存操作&#xff09; -> 将 Bean 装配到需要的类中&#xff08;取…

《计算机网络:自顶向下方法》实验2:常用网络命令的使用

使用Ping实用程序来测试计算机的网络连通性 登录到Windows中。单击开始,然后将鼠标指针移到程序上,再移到Windows系统,然后单击命令提示符。在命令提示窗口键入ping 127.0.0.1。问题1:发送了多少数据包?接受了多少数据包?丢失了多少数据包? 发送了4个数据包;接受了4个数…

JavaScript Web API实战:7个小众技巧让你的网站瞬间提升用户体验

随着技术的日新月异&#xff0c;为开发人员提供了令人难以置信的新工具和API。但据了解&#xff0c;在100 多个 API中&#xff0c;只有5%被开发人员积极使用。 让我们来看看一些有用的Web API&#xff0c;它们可以帮助您将网站推向月球&#xff01; 1、 截屏接口 Screen Capt…

Blockchain gold经测试完美兼容EVM虚拟机

尽管对于行业人士来说&#xff0c;有关寻找更快更便宜的基础层区块链的对话并不是什么新鲜事。 但随着 Defi Summer 持续一年有余的繁荣增长&#xff0c;更实际的需求——以太坊上高昂的 gas 费用使得开发者时间尤为昂贵。 可以看到的是&#xff0c;作为有着以太坊 CPU 之称的 …

Halcon 拟合直线

本文用 Halcon 的矩阵操作实现最小二乘拟合直线 *首先随机生成一组数据 Mx : [100:10:500] tuple_length(Mx, len) tuple_gen_const(len, 5, r) Ma : 2 Mb : 40 tuple_rand(len, noise) My : Ma * Mx Mb * noise gen_circle(ContCircle, My, Mx, r)接下来用矩阵进行最小二乘求…

一次Linux系统密码修改失败事件

一、事件描述 某业务系统采用移动云主机&#xff0c;某次因误操作导致移动云内嵌密码管理相关Pga进程导致页面无法修改密码&#xff0c;东移动云主机web终端登录也无法修改&#xff0c;密码错误次数最大已无法登录&#xff0c;无奈只能重启主机&#xff0c;修改密码&#xff1b…

如何保证线程的原子性

线程的原子性是指&#xff1a;一个或者一系列指令&#xff0c;它的操作是不可中断的&#xff0c;一旦开始是不允许被其他CPU或线程来中断。 我们来看下述代码&#xff1a;ThreadAtomicityDemo类中有个初始值为0的Integer类型的静态变量count&#xff0c;还有一个每次sleep一毫…

Vue3快速上手

Vue3快速上手 1.Vue3简介 2020年9月18日&#xff0c;Vue.js发布3.0版本&#xff0c;代号&#xff1a;One Piece&#xff08;海贼王&#xff09;耗时2年多、2600次提交、30个RFC、600次PR、99位贡献者github上的tags地址&#xff1a;https://github.com/vuejs/vue-next/release…

微信上制作投票链接在线制作投票链接如果制作投票链接

现在来说&#xff0c;公司、企业、学校更多的想借助短视频推广自己。通过微信投票小程序&#xff0c;网友们就可以通过手机拍视频上传视频参加活动&#xff0c;而短视频微信投票评选活动既可以给用户发挥的空间激发参与的热情&#xff0c;又可以让商家和企业实现推广的目的&…

华为OD机试 - 知识图谱新词挖掘 1(Python)【2023-Q1 新题】

华为OD机试300题大纲 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,通过率才会高。 华为 OD 清单查看地址:blog.csdn.net/hihell/category_12199275.html 华为OD详细说明:https://dream.blog.csdn.net/article/details/128980730 知识图谱新词挖掘…