elasticsearch pipelineI详解:原理与使用

news2025/1/9 21:01:08
码到三十五 : 个人主页

在Elasticsearch的数据处理流程中,Pipeline API为数据的预处理和转换提供了强大的工具。随着Elasticsearch 5.x版本之后Ingest Node的引入,Pipeline API的引入为开发者们提供了更多的灵活性和便利性。本文将对Pipeline API的原理、具体使用方法及其在实际场景中的应用进行更加详细的探讨。

目录

    • 一、Pipeline 背景和原理
      • Elasticsearch 5.0之前的文档预处理
      • Ingest Node的引入
      • Ingest Node的工作原理
      • Ingest Node的配置与灵活性
      • Elasticsearch对Logstash的替代
    • 二、Pipeline API使用
      • 1. 定义 Pipeline
      • 2. 使用 Pipeline
      • 3. 获取 Pipeline 信息
      • 4. 删除 Pipeline
      • 5. 模拟 Pipeline
      • 6. 引用其他 Pipeline
    • 三、Pipeline API应用场景
    • 四、Pipeline 应用方式
    • 五、内置 Processors

一、Pipeline 背景和原理

Elasticsearch 5.0之前的文档预处理

在 Elasticsearch 5.0 版本之前,如果用户希望在文档被索引到 Elasticsearch 之前进行预处理,他们通常需要依赖外部工具,如 Logstash,或者以编程方式/手动进行预处理。这是因为早期的 Elasticsearch 版本并不提供文档预处理或转换的能力,它仅仅是将文档按原样索引。

Ingest Node的引入

从 Elasticsearch 5.x 版本开始,为了解决这个问题,Elasticsearch 引入了一个名为 ingest node 的功能。Ingest node 为 Elasticsearch 本身提供了文档预处理和丰富的轻量级解决方案。这意味着用户可以在 Elasticsearch 内部直接对文档进行预处理,而无需依赖外部工具。

Ingest Node的工作原理

当数据进入 Elastic 集群并指定了特定的 Pipeline 时,Elasticsearch 中的 ingest node 会按照定义好的处理器(processor)顺序对数据进行操作和处理。这种预处理是通过截取批量和索引请求在 ingest node 上执行的,处理完成后将文档传递回索引或批量 API。

在这里插入图片描述

要在索引之前预处理文档,用户必须定义一个 Pipeline。Pipeline 是一系列处理器的集合,用于转换传入的文档。每个处理器都以某种方式转换文档,并且它们按照在 Pipeline 中定义的顺序执行。

要使用 Pipeline,用户只需在索引或批量请求上指定 pipeline 参数,告诉 ingest node 使用哪个 Pipeline。

Ingest Node的配置与灵活性

如果使用默认配置实现 Elasticsearch 节点,默认情况下将启用 master、data 和 ingest 功能,这意味着节点将充当主节点、数据节点和提取节点。但是,如果用户在 elasticsearch.yml 文件中配置了 node.ingest: false,则该节点上的 ingest 功能将被禁用。

与 Logstash 相比,Elasticsearch 的 ingest node 提供了更高的灵活性。因为用户可以通过编程的方式随时修改 Pipeline,而无需重启整个 Logstash 集群。

Elasticsearch对Logstash的替代

随着新的 ingest 功能的发布,Elasticsearch 已经取出了 Logstash 的部分功能,特别是其过滤器部分。这意味着用户现在可以在 Elasticsearch 中直接处理原始日志,而无需先通过 Logstash 进行过滤和预处理。这进一步简化了数据处理流程,并提高了系统的整体性能。

二、Pipeline API使用

要使用Pipeline API,首先需要定义Pipeline。Pipeline由两部分组成:描述(description)和处理器列表(processor list)。

  • 描述(Description):这是一个非必需字段,用于存储关于Pipeline的一些描述性信息,如用途、作者等。虽然这个字段不是必需的,但它对于理解和维护Pipeline非常有帮助。
  • 处理器列表(Processor List):这是Pipeline的核心部分,它定义了用于转换文档的处理器序列。每个处理器以某种方式转换文档,如替换文本、转换数据类型、删除字段等。处理器按照在Pipeline中定义的顺序执行。

Elasticsearch提供了大约20个内置的处理器,这些处理器可以在构建Pipeline时使用。此外,还可以使用一些插件提供的处理器,如Ingest Attachment用于处理附件数据、Ingest Geo-IP用于根据IP地址提取地理位置信息等。这些插件增强了Pipeline的数据处理能力。

定义好Pipeline后,就可以通过在索引或批量请求上指定Pipeline参数来使用它。例如,当通过POST请求将数据发送到指定索引时,可以带上pipeline参数来指定使用的Pipeline。

1. 定义 Pipeline

使用 PUT 请求和 _ingest/pipeline/<pipeline_id> 端点来定义一个新的 Pipeline 或更新一个已存在的 Pipeline。Pipeline 的定义包含了一个可选的 description 字段和一个 processors 列表。

例如,定义一个名为 firstpipeline 的 Pipeline,它将消息字段(message)中的值转换为大写:

PUT _ingest/pipeline/firstpipeline
{
  "description": "将 message 字段中的值转换为大写",
  "processors": [
    {
      "uppercase": {
        "field": "message"
      }
    }
  ]
}

2. 使用 Pipeline

要在索引文档之前使用定义的 Pipeline,只需在索引或批量请求的 URL 中添加 ?pipeline=<pipeline_id> 参数。

例如,使用之前定义的 firstpipeline 来索引一个文档:

PUT my_index/_doc/1?pipeline=firstpipeline
{
  "name": "pipeline",
  "message": "this is so cool!"
}

执行上述请求后,索引到 my_index 中的文档将具有大写形式的 message 字段。

3. 获取 Pipeline 信息

使用 GET 请求和 _ingest/pipeline 端点可以检索现有 Pipeline 的定义。

例如,要获取所有 Pipeline 的定义:

GET _ingest/pipeline

或者,要获取特定 Pipeline(如 secondpipeline)的定义:

GET _ingest/pipeline/secondpipeline

4. 删除 Pipeline

使用 DELETE 请求和 _ingest/pipeline/<pipeline_id> 端点可以删除一个 Pipeline。

例如,删除名为 firstpipeline 的 Pipeline:

DELETE _ingest/pipeline/firstpipeline

5. 模拟 Pipeline

使用 _simulate 端点可以模拟 Pipeline 的执行,而不实际索引文档。这对于测试 Pipeline 定义和查看预期结果非常有用。

例如,模拟 secondpipeline 对提供的文档集的执行:

POST _ingest/pipeline/secondpipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "name": "pipeline",
        "message": "this is so cool!"
      }
    },
    {
      "_source": {
        "name": "nice",
        "message": "this is nice!"
      }
    }
  ]
}

上述请求将返回模拟执行后的文档,并显示每个文档经过 Pipeline 处理后的结果。

6. 引用其他 Pipeline

在 Pipeline 的定义中,还可以引用其他已存在的 Pipeline。这允许用户创建复杂的文档处理流程,通过组合多个 Pipeline 来实现。

例如,先定义一个 pipelineA,然后在 pipelineB 中引用它:

PUT _ingest/pipeline/pipelineA
{
  "description": "内部 Pipeline",
  "processors": [
    {
      "set": {
        "field": "inner_pipeline_set",
        "value": "inner"
      }
    }
  ]
}

PUT _ingest/pipeline/pipelineB
{
  "description": "外部 Pipeline",
  "processors": [
    {
      "pipeline": {
        "name": "pipelineA"
      }
    },
    {
      "set": {
        "field": "outer_pipeline_set",
        "value": "outer"
      }
    }
  ]
}

在上述示例中,当使用 pipelineB 索引文档时,首先会执行 pipelineA 的处理器,然后再执行 pipelineB 中定义的其他处理器。

三、Pipeline API应用场景

Pipeline API在数据预处理方面有着广泛的应用。以下是一些具体的应用场景:

  1. 数据清洗:通过Pipeline API,可以在数据索引到Elasticsearch之前对数据进行清洗,去除无用的字段、转换数据类型、处理缺失值等。这有助于确保数据的准确性和一致性。

  2. 日志处理:对于日志数据,Pipeline API非常有用。它可以用于解析和格式化日志数据,提取出有用的字段进行索引,以便于后续的查询和分析。例如,可以使用Grok处理器来解析复杂的日志行。

  3. 数据增强:除了基本的数据清洗和转换外,Pipeline API还可以用于数据增强。例如,通过Ingest Geo-IP插件,可以根据IP地址提取出地理位置信息并添加到文档中;通过Ingest User-Agent插件,可以解析用户代理字符串并提取出浏览器、操作系统等信息。

  4. 动态修改Pipeline:由于Pipeline API支持编程方式修改,因此可以根据实际需求动态地修改Pipeline。这意味着当数据格式或处理需求发生变化时,无需修改源代码或重启Elasticsearch集群,只需通过API调用即可更新Pipeline。

四、Pipeline 应用方式

  1. 在 Bulk API 中使用

    使用 Bulk API 时,可以指定 pipeline 来预处理批量文档。例如:

    POST _bulk
    {"index": {"_index": "my_index", "_id" : "1", "pipeline": "my_pipeline"}}
    {"name": "zhang san", "category": "sports"}
    

    对于 Bulk API 请求,可以包含多个操作(如 index, update, delete 等),并为每个操作指定不同的 pipeline(如果需要)。

  2. 在 Beats 中使用

    在 Filebeat 或其他 Beats 中,可以通过配置 pipeline processor 来预处理事件数据。这允许在数据发送到 Elasticsearch 之前进行必要的转换和增强。具体可参阅 Elastic 官方文档中关于 Beats 和 pipeline processor 的部分。

  3. 在 Reindex API 中使用

    当从一个索引重新索引到另一个索引时,可以使用 pipeline 来预处理数据。例如:

    POST _reindex
    {
      "source": {
        "index": "source_index"
      },
      "dest": {
        "index": "destination_index",
        "pipeline": "some_ingest_pipeline"
      }
    }
    

    这样,从 source_index 重新索引到 destination_index 的所有文档都将通过 some_ingest_pipeline 进行预处理。

  4. 在 Enrich Processors 中使用

    Elasticsearch 的 enrich processor 允许你根据其他索引中的数据进行数据丰富。结合 ingest pipeline,可以在数据丰富之前对文档进行预处理。例如,可以在 enrich processor 之前使用 pipeline 来提取或转换字段,以确保它们可用于 enrich processor。

  5. 在 Update By Query API 中使用

    使用 Update By Query API 更新索引中的文档时,可以通过指定 pipeline 来预处理这些文档。例如:

    POST my_index/_update_by_query?pipeline=my_pipeline
    {
      "query": {
        "match": {
          "some_field": "some_value"
        }
      }
    }
    

    上述请求将更新 my_index 中满足 some_field: some_value 条件的文档,并在更新前通过 my_pipeline 对它们进行预处理。

  6. 在索引中设置 Default Pipeline

    对于特定索引,可以通过设置默认 pipeline 来确保所有新索引的文档都经过该 pipeline 的处理。例如:

    PUT my_index
    {
      "settings": {
        "index.default_pipeline": "my_pipeline"
      }
    }
    

    此后,任何索引到 my_index 的新文档都将默认通过 my_pipeline 进行预处理。注意,在较新版本的 Elasticsearch 中,设置方式可能有所变化,请查阅相应版本的官方文档。

五、内置 Processors

默认情况下,Elasticsearch 提供大量的ingest处理器。 可以在地址https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html 找到已经为我设计好的内置的 processors。下面是一些常见的一些 processor :

在这里插入图片描述


关注以下公众号获取更多深度内容,纯干货 !

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2055554.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode322. 零钱兑换,完全背包最值问题,附背包问题模板

leetcode322. 零钱兑换 给你一个整数数组 coins &#xff0c;表示不同面额的硬币&#xff1b;以及一个整数 amount &#xff0c;表示总金额。 计算并返回可以凑成总金额所需的 最少的硬币个数 。如果没有任何一种硬币组合能组成总金额&#xff0c;返回 -1 。 你可以认为每种…

人机环境系统智能已经超越了传统的空间智能和物理世界的概念

人机环境系统智能已经超越了传统的空间智能和物理世界的概念&#xff0c;进入了更为复杂的层次。在人机环境系统中&#xff0c;智能不仅涉及对物理世界的感知和理解&#xff0c;还包括对人类语言、情感、意图等的理解和生成。人工智能技术的应用&#xff0c;如自然语言处理、机…

基于UE5和ROS2的激光雷达+深度RGBD相机小车的仿真指南(三)---创建自定义激光雷达Componet组件

前言 本系列教程旨在使用UE5配置一个具备激光雷达深度摄像机的仿真小车&#xff0c;并使用通过跨平台的方式进行ROS2和UE5仿真的通讯&#xff0c;达到小车自主导航的目的。本教程默认有ROS2导航及其gazebo仿真相关方面基础&#xff0c;Nav2相关的学习教程可以参考本人的其他博…

Kubernetes的快速安装

一、kubernetes的基本概念 1.kubernetes Kubernetes 是一个开源的开源的分布式编排技术&#xff0c;Kubernetes 致力于提供跨主机集群的自动部署、扩展、高可用以及运行应用程序容器的平台&#xff0c;其遵循主从式架构设计、组件可以分为工作节点 (Node) 组件&#xff0c;和控…

基础第3关:LangGPT结构化提示词编写实践

提示词&#xff1a; # Role: 伟大的数学家 ## Profile - author: LangGPT - version: 1.0 - language: 中文 - description: 一个伟大的数学家&#xff0c;能够解决任何的数学难题 ## Goals: 根据关键词进行描述&#xff0c;避免与已有描述重复。 ## Background: 你正在被…

2024网安创新大赛,美创科技产品方案双获奖!

2024年网络安全优秀创新成果大赛 “2024年网络安全优秀创新成果大赛”是国家网络安全宣传周重要活动之一。大赛由中央网信办指导、中国网络安全产业联盟&#xff08;CCIA&#xff09;主办。 近日&#xff0c;“2024年网络安全优秀创新成果大赛-杭州分站赛” 正式公布评选结果。…

强!小目标检测全新突破!检测速度快10倍,GPU使用减少73.4%

强&#xff01;小目标检测全新突破&#xff0c;提出Mamba-in-Mamba结构&#xff0c;通过内外两层Mamba模块&#xff0c;同时提取全局和局部特征&#xff0c;实现了检测速度快10倍&#xff0c;GPU使用减少73.4&#xff05;的显著效果&#xff01; 【小目标检测】是近年来在深度…

点灯案例练习(基于寄存器)

目录 一、需求描述 二、工程创建 二、硬件电路设计 三、软件设计 1、main.c 1、开启时钟 2、配置GPIOA的工作模式 3、设置PA1、PA8端口低电平 4、给死循环保持状态 2、最终代码如下 四、实验现象 前面&#xff0c;我们耗费大量时间&#xff0c;终于点亮了STM32板子上的…

WLAN基础知识(1)

WLAN&#xff1a; 无线局域网&#xff0c;无线技术&#xff1a;Wi-Fi、红外、蓝牙等 WLAN设备&#xff1a; 胖AP&#xff1a; 适用于家庭等小型网络&#xff0c;可独立配置&#xff0c;如&#xff1a;家用Wi-Fi路由器 瘦AP&#xff1a; 适用于大中型企业&#xff0c;需要配合AC…

【Kettle】新建转换工程

目录 一、新建一个转换工程1. 创建【转换】工程2. 创建输入对象并编辑步骤3. 创建输出对象并编辑步骤 二、运行转换工程和查看执行结果1. 运行转换工程2. 查看执行结果 一、新建一个转换工程 1. 创建【转换】工程 在 Kettle 欢迎界面中&#xff0c;依次点击【新建】->【转…

其实你就学不会 Python

标题党一下&#xff0c;Python 程序员成千上万&#xff0c;当然有很多人学得会。这里说的“你”&#xff0c;是指职场中的非专业人员。 职场人员一般会用 Excel 处理数据&#xff0c;但也会有很多无助的情况&#xff0c;比如复杂计算、重复计算、自动处理等&#xff0c;再遇上个…

中石油笔试25届秋招考什么?如何通过在线测评|附真题库面试攻略

职小豚 一、中石油公司介绍 嘿&#xff0c;小伙伴们&#xff01;今天咱们来聊聊大名鼎鼎的中石油。 中石油&#xff0c;那可是能源领域的巨无霸&#xff01;它就像一座庞大的能源宝库&#xff0c;为我们的生活和国家的发展源源不断地输送着动力。 中石油在国内外的油气勘探…

如何优雅的薅羊毛之Flux.1免费使用还支持中文prompt

我看硅基流动&#xff0c;现在免费用Flux.1的模型了&#xff0c;就注册了一个账号 但是Flux和之前的sd一样&#xff0c;中文理解力有问题 换哪个模型都不成&#xff0c;直接换英文提示词还行 放DIFY里串一下 我看tool里没有&#xff0c;那就自定义一个 DIFY要求schema要满足op…

SpringCloud天机学堂:分布式任务调度

SpringCloud天机学堂&#xff1a;分布式任务调度 文章目录 SpringCloud天机学堂&#xff1a;分布式任务调度1、分布式任务调度2、分布式任务调度原理3、分布式任务调度技术对比4、XXL-JOB介绍部署调度中心定义任务注册执行器配置任务调度执行一次 1、分布式任务调度 一般的定时…

43.x86游戏实战-XXX寻找吸怪坐标

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 本次游戏没法给 内容参考于&#xff1a;微尘网络安全 工具下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1rEEJnt85npn7N38Ai0_F2Q?pwd6tw3 提…

( Neurocomputing,2023)Relphormer:用于知识图谱表示的关系图Transformer

Relphormer:Relational Graph Transformer for Knowledge Graph Representations 资料 论文&#xff1a;Relphormer:Relational Graph Transformer for Knowledge Graph Representations 代码&#xff1a;https://github.com/zjunlp/Relphormer 摘要 Transformer在包括自然…

提高网站并发量的有效策略有哪些?

提高网站并发量的有效策略有哪些&#xff1f; 1. 静态化 & 模板引擎2. 分离静态资源3. 数据库优化4. 缓存技术5. 镜像部署6. 负载均衡7. CDN加速 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 1. 静态化 & 模板引擎 HTML静态化&a…

8月19日笔记

http隧道搭建(续) ABPTTS安装使用 一款基于 SSL 加密的 HTTP 端口转发工具&#xff0c;全程通信数据加密&#xff0c;比 reGerog 都要稳定。使用 python2 编写&#xff0c;但是该工具只支持 aspx 和 jsp 脚本的网站。 下载地址&#xff1a;https://github.com/nccgroup/ABPTT…

CentOS7上安装RabbitMQ

在 CentOS 7 上安装 RabbitMQ 需要一些步骤&#xff0c;包括安装必要的依赖项、启用 RabbitMQ 源以及安装 RabbitMQ 服务器。以下是详细的步骤&#xff1a; 1. 更新系统 首先&#xff0c;确保系统是最新的&#xff1a; sudo yum update -y2. 安装 Erlang RabbitMQ 依赖于 E…

【Python】成功解决 ModuleNotFoundError: No module named ‘PIL‘

【Python】成功解决 ModuleNotFoundError: No module named ‘PIL’ 下滑即可查看博客内容 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我静心耕耘深度学习领域、真诚分享知识与智慧的小天地&#xff01;&#x1f387; &#x1f393; 博主简介&#xff1a;985高…