Elasticsearch 索引数据预处理

news2025/1/22 16:11:30

pipeline

在文档写入 ES 之前,对数据进行预处理(ingest)工作通过定义 pipeline 和 processors 实现。

注意:数据预处理必须在 Ingest node 节点处理,ES 默认所有节点都是 Ingest node。

如果需要禁用 Ingest ,可以在 elasticsearch.yaml 配置:

node.ingest: false

pipeline Demo

创建 pipeline

# 创建名称为:pipeline_uppercase pipeline
# processors 包含一个 processor :将 message 字段的内容转换为大写
PUT _ingest/pipeline/pipeline_uppercase
{
  "description": "uppercase field message",
  "processors": [
    {
      "uppercase": {
        "field": "message",
        "ignore_missing": true
      }
    }
  ]
}

写入数据

# 指定使用的 pipeline 名字
POST index_data/_doc?pipeline=pipeline_uppercase
{
  "name": "pipeline",
  "message": "this is so cool!"
}

查看数据

GET index_data/_search

# 结果显示
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "index_data",
        "_type" : "_doc",
        "_id" : "lIiXsnYBv8g5MehmL98X",
        "_score" : 1.0,
        "_source" : {
          "name" : "pipeline",
          "message" : "THIS IS SO COOL!"
        }
      }
    ]
  }
}

模拟pipeline

GET /_ingest/pipeline/pipeline_uppercase/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "message": "this is so cool!"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "message": "elasticsearch"
      }
    }
  ]
}

# 测试结果
{
  "docs" : [
    {
      "doc" : {
        "_index" : "index",
        "_type" : "_doc",
        "_id" : "id",
        "_source" : {
          "message" : "THIS IS SO COOL!"
        },
        "_ingest" : {
          "timestamp" : "2020-12-30T07:44:47.1443329Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "index",
        "_type" : "_doc",
        "_id" : "id",
        "_source" : {
          "message" : "ELASTICSEARCH"
        },
        "_ingest" : {
          "timestamp" : "2020-12-30T07:44:47.1443329Z"
        }
      }
    }
  ]
}

内置 processor

ES 内置了大量 processor。

常用 processor :

  • convert
  • Date
  • Drop
  • Grok
  • Dissect
  • Remove
  • Rename
  • Set
  • URI

参考官方文档:

ES 还提供了 ingest 插件,需要自己安装

  • Ingest-geoip: 是地理位置处理的数据库插件,在最新的版本已经不作为插件发布,合并到geoip processor。
  • ingest-user-agent:扩展浏览器请求信息,在最新的版本已经不作为插件发布,合并到User agent processor。
  • ingest-attachment:该插件扩展ES处理文本文件的能力, 使用它可以实现对(PDF,DOC,EXCEL等)主流格式文件的文本抽取及自动导入。处理的Field必须是Base64格式的二进制编码。

使用 Ingest API

# 创建pipeline
PUT /_ingest/pipeline/<pipeline> 

# 查询pipeline
GET /_ingest/pipeline/<pipeline> 
GET /_ingest/pipeline

# 删除pipeline
DELETE /_ingest/pipeline/<pipeline>

# 模拟pipeline
POST /_ingest/pipeline/<pipeline>/_simulate
GET /_ingest/pipeline/<pipeline>/_simulate
POST /_ingest/pipeline/_simulate
GET /_ingest/pipeline/_simulate

自定义 processor 插件

自定义 processor 可以仿照 ES 官方插件实现自己的 processor 处理。直接完毕直接安装插件就可以使用。

**初始化 IngestPlugin 插件 **

public class CoordinateConvertPlugin extends Plugin implements IngestPlugin {    			
    @Override    
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {        
        return Collections.singletonMap(CoordinateProcessor.TYPE, new CoordinateProcessor.Factory());    
    }
}

IngestPlugin内我们只需要实现 getProcessors 创建 processsor 工厂方法,其中 CoordinateProcessor.TYPE 是 processor 名称不能重复。ES框架会遍历所有的 IngestPlugin,加载 Processor。

new CoordinateProcessor.Factory() 是初始化 Processor.Factory 工厂类。

Processor.Factory工厂实现

Processor.Factory主要是定义processor的命令行格式和初始化CoordinateProcessor。

public static final class Factory implements Processor.Factory {
        static final Set<Property> DEFAULT_PROPERTIES = EnumSet.allOf(Property.class);

        @Override
        public Processor create(Map<String, Processor.Factory> processorFactories, String tag,
                                Map<String, Object> config) throws Exception {
            String field = readStringProperty(TYPE, tag, config, "field");
            String targetField = readStringProperty(TYPE, tag, config, "target_field", "coordinate");
            List<String> propertyNames = readOptionalList(TYPE, tag, config, "properties");
            boolean ignoreMissing = readBooleanProperty(TYPE, tag, config, "ignore_missing", false);

            final Set<Property> properties;
            if (propertyNames != null) {
                properties = EnumSet.noneOf(Property.class);
                for (String fieldName : propertyNames) {
                    try {
                        properties.add(Property.parse(fieldName));
                    } catch (Exception e) {
                        throw newConfigurationException(TYPE, tag, "properties", "illegal field option [" +
                            fieldName + "]. valid values are " + Arrays.toString(Property.values()));
                    }
                }
            } else {
                properties = DEFAULT_PROPERTIES;
            }

            return new CoordinateProcessor(tag, field, targetField, properties, ignoreMissing);
        }
    }

Processor实现

CoordinateProcessor 是AbstractProcessor 的实现,所有的 processor 都是实现 Processor 接口,ES 框架定义了抽象类AbstractProcessor,不同的 Processor 实现各自不同的处理逻辑。

public class CoordinateProcessor extends AbstractProcessor {
    public static final String TYPE = "location_coordinate";
    public CoordinateProcessor(String tag, String field, String targetField, Set<Property> properties, boolean ignoreMissing) {
        super(tag);
        this.field = field;
        this.targetField = targetField;
        this.properties = properties;
        this.ignoreMissing = ignoreMissing;
        this.mapIDRevertLonLat = new MapIDRevertLonLat();
    }

    @Override
    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
        Map<String, Object> additionalFields = new HashMap<>();
		String input = ingestDocument.getFieldValue(field, String.class);
		// 此处省略...
		additionalFields.put("longitude", value[0]);
        additionalFields.put("latitude", value[1]);
		
        return ingestDocument;
    }

    @Override
    public String getType() {
        return TYPE;
    }
}

这样我们只需要在 execute 中实现自己的 processor 处理逻辑就实现了自己的 processor 了。

效果测试

创建 pipeline,process 指定为 location_coordinate。

PUT _ingest/pipeline/coordinate
{
  "description": "Extract single location coordinate information",
  "processors": [
    {
      "location_coordinate": {
        "field": "url",
        "ignore_missing": true
      }
    }
  ]
}

使用 pipeline

POST /coordinate_test/_doc?pipeline=coordinate
{
  "url": "x=25357&y=6538&level=15"
}

测试效果

{
        "_index" : "coordinate_test",
        "_type" : "_doc",
        "_id" : "l4i4snYBv8g5MehmA9_v",
        "_score" : 1.0,
        "_source" : {
          "coordinate" : {
            "level" : 15,
            "latitude" : "18.17",
            "x" : 25357,
            "y" : 6538,
            "longitude" : "98.59"
          },
          "url" : "x=25357&y=6538&level=15"
        }
      }

小结

ES的 Ingest node pipeline 功能很强大,具有强大的数据处理的能力,官方提供了丰富的 processes,用户可以灵活选择,也可以通过自定义 IngestPlugin 实现更为复杂的操作。既可以灵活的变更索引的结构和数据,又可以减少对业务代码的侵入。对数据清洗处理的任务提供了一种轻量级的解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2204286.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java中的拦截器、过滤器及监听器

过滤器&#xff08;Filter&#xff09;监听器&#xff08;Listener&#xff09;拦截器&#xff08;Interceptor&#xff09;关注点web请求系统级别参数、对象Action&#xff08;部分web请求&#xff09;如何实现函数回调事件Java反射机制&#xff08;动态代理&#xff09;应用场…

《大道平渊》· 廿贰 —— 杀心篇:独立人格的形成

《大道平渊》 独立人格的形成&#xff0c;在杀心的过程中会越来越完备。 在这个漫长的过程中&#xff0c;你会一次次击碎自己固有的三观&#xff0c;慢慢再修复你的三观。 . 不要认为一个人的明白&#xff0c;都是恍然大悟&#xff0c;都是碰到了高人指点。 并不是这样的&a…

使用 Raspberry Pi Pico W 的基于 MQTT 的分布式网络自适应估计

英文论文标题&#xff1a;MQTT based Adaptive Estimation over Distributed Network using Raspberry Pi Pico W 中文论文标题&#xff1a;使用 Raspberry Pi Pico W 的基于 MQTT 的分布式网络自适应估计 作者信息&#xff1a; Prantaneel DebnathAnshul GusainParth Sharm…

46 C 语言文件的打开与关闭、写入与读取函数:fopen、fclose、fputc、fputs、fprintf、fgetc、fgets、fscanf

目录 1 文件的存储形式 2 打开文件——fopen() 函数 2.1 功能描述 2.2 函数原型 2.3 文件打开方式&#xff08;模式&#xff09; 3 关闭文件——fclose() 函数 3.1 功能描述 3.2 函数原型 4 常见的文件写入方式 4.1 fputc() 函数 4.1.1 功能描述 4.1.2 函数原型 4…

第四范式发布全新一代文档数字化管理平台Smart Archive 2.0

产品上新 Product Release 今日&#xff0c;第四范式正式推出全新一代文档数字化管理平台——Smart Archive 2.0。该产品基于第四范式自研的文档处理大模型&#xff0c;实现零样本下对企业文档的精准识别及信息提取。文档处理大模型利用二十多个行业&#xff0c;上百种场景下的…

【华为】默认路由配置

1.配置接入层&#xff1a; LSW1&#xff08;LSW3同理&#xff09;: vlan batch 10 20 in g0/0/1 port link-type ac port default vlan 10 in g0/0/2 port link-type ac port default vlan 20 in g0/0/24 port link-type tr port tr allow-pass vlan 10 20 2.配置汇聚层&#x…

Spring Boot 集成 LiteFlow 实现业务流程编排

LiteFlow 是一款轻量级的流程编排框架,它允许开发者通过简单的配置方式,将复杂的业务流程分解为多个独立的节点,然后通过定义规则来编排节点,达到解耦业务逻辑、提高代码可维护性的目的 1. LiteFlow 的基本概念 在 LiteFlow 中,主要有以下几个概念: 节点 (Node):代表一…

2015年国赛高教杯数学建模C题月上柳梢头解题全过程文档及程序

2015年国赛高教杯数学建模 C题 月上柳梢头 月上柳梢头&#xff0c;人约黄昏后”是北宋学者欧阳修的名句&#xff0c;写的是与佳人相约的情景。请用天文学的观点赏析该名句&#xff0c;并进行如下的讨论&#xff1a;   1. 定义“月上柳梢头”时月亮在空中的角度和什么时间称为…

SKG未来健康校招社招入职测评:综合能力及性格问卷SHL测评题库

SKG未来健康科技股份有限公司在校招和社招过程中使用的SHL测评题库主要考察应聘者的综合能力和性格特征。以下是对这些测评的简要分析&#xff1a; 综合能力测评&#xff1a; 测评时间&#xff1a;46分钟&#xff08;实际答题时间36分钟&#xff09; 题目数量&#xff1a;30题…

多jdk版本环境下,jenkins系统设置需指定JAVA_HOME环境变量

一、背景 由于不同项目对jdk版本的要求不同&#xff0c;有些是要求jdk11&#xff0c;有些只需要jdk8即可。 而linux机器上安装jdk的方式又多种多样&#xff0c;最后导致jenkins打包到底使用的是哪个jdk&#xff0c;比较混乱。 1、java在哪 > whereis java java: /usr/bin/…

不到千元的自动猫砂盆是智商税吗?这四大选购技巧不看就亏大了

虽然现在的人都说&#xff0c;猫砂盆等上班一天回来再清理也没有任何关系&#xff0c;但实际上在这一天里&#xff0c;猫咪的粪便已经在猫砂盆里滋生了很多无法察觉的细菌&#xff0c;久而久之就会影响猫咪的健康&#xff0c;导致尿闭&#xff0c;放了一天的便便臭味也让人无法…

数据结构与算法——Java实现 32.堆

人的想法和感受是会随着时间的认知改变而改变&#xff0c; 原来你笃定不会变的事&#xff0c;也会在最后一刻变得释然 —— 24.10.10 堆 堆是基于二叉树实现的数据结构 大顶堆每个分支的上一个节点的权值要大于它的孩子节点 小顶堆每个分支的上一个节点的权值要小于它的孩子…

PyQt5写好的py文件生成可执行的exe文件【Pyinstaller】

文章目录 pyinstaller介绍特点 1.单个py文件2.多个py文件3.程序图标设置4.打包形式(1)单个exe文件(2)文件夹 5.程序开始前的加载中图片6.UPX打包压缩问题解决办法 7.指令总结 pyinstaller介绍 pyinstaller 属于Python第三方库&#xff0c;可以将py文件在不同平台上打包为exe可执…

鸿蒙NEXT开发-沉浸式导航和键盘避让模式(基于最新api12稳定版)

注意&#xff1a;博主有个鸿蒙专栏&#xff0c;里面从上到下有关于鸿蒙next的教学文档&#xff0c;大家感兴趣可以学习下 如果大家觉得博主文章写的好的话&#xff0c;可以点下关注&#xff0c;博主会一直更新鸿蒙next相关知识 专栏地址: https://blog.csdn.net/qq_56760790/…

架构与思维:漫谈高并发业务的CAS及ABA

1 高并发场景下的难题 1.1 典型支付场景 这是最经典的场景。支付过程&#xff0c;要先查询买家的账户余额&#xff0c;然后计算商品价格&#xff0c;最后对买家进行进行扣款&#xff0c;像这类的分布式操作&#xff0c;如果是并发量低的情况下完全没有问题的&#xff0c;但如果…

企业级私有化即时通讯软件:高效沟通与安全保障的优选

在当今这个信息化高速发展的时代&#xff0c;企业面临着前所未有的沟通挑战与信息安全压力。为了应对这些挑战&#xff0c;企业级私有化即时通讯软件应运而生&#xff0c;它不仅优化了内部沟通流程&#xff0c;还确保了数据的安全性与管理的深度需求得到满足。以下将从“助力大…

k8s部署jenkins集群,配置集群kubernetes plugin的pod模板

一、配置集群 填写k8s地址&#xff1a;https://kubernetes.default.svc.cluster.local 命名空间&#xff1a;kubernetes-plugin Jenkins地址&#xff1a;http://jenkins:18080 Jenkins通道&#xff1a;jenkins:50000 jenkins是容器别名 设置jenkinsslave的标签属性 二、…

2024年CSS @规则(At-rules)新增数量超过过去十年的总和,CSS @规则(At-rules)详解系列目录

2024年CSS 规则(At-rules)新增数量超过过去十年的总和&#xff0c; CSS 规则(At-rules)详解系列目录 本文目录&#xff1a; 零、时光宝盒 一、CSS 规则(At-rules)发展状况 二、什么是CSS 规则(At-rules) 2.1、一些背景 2.2、概念 2.3、CSS规则(At-rules) 规则(At-rules…

快速解决urllib3.exceptions.MaxRetryError: HTTPSConnectionPool

正题 使用pip命令查看urllib3版本 pip list发现版本为 1.26.9 urllib3 v1.26.9此时如下报错&#xff0c;无法正常使用&#xff08;使用了代理&#xff09; urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(hostxxx.xxxxx.com, port443): Max retries exceeded wit…

充电宝租赁管理系统网站毕业设计SpringBootSSM框架开发

目录 1. 概述 2. 技术选择与介绍 3. 系统设计 4. 功能实现 5. 需求分析 1. 概述 充电宝租赁管理系统网站是一个既实用又具有挑战性的项目。 随着移动设备的普及和人们日常生活对电力的持续依赖&#xff0c;充电宝租赁服务已成为现代都市生活中的一项重要便利设施。它不仅为…