在 Elasticsearch 中丰富你的 Elasticsearch 文档

news2024/12/26 13:40:51

作者:David Pilato

对于 Elasticsearch®,我们知道联接应该在 “索引时” 而不是查询时完成。 本博文是一系列三篇博文的开始,因为我们可以在 Elastic® 生态系统中采取多种方法。 我们将介绍如何在 Elasticsearch 中做到这一点。 下一篇博文将介绍如何使用集中式组件 Logstash 来实现这一点,上一篇博文将展示如何使用 Elastic Agent/Beats 在边缘实现这一点。

举一个简单的例子,假设我们是一个电子商务网站,在 kibana_sample_data_logs 中收集日志:

{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  }
}

请注意,你可以通过单击 “Sample web blogs” 框中的 “Add data”按钮,使用 Kibana® 示例数据集轻松导入此数据集:

我们还有一个 VIP 索引,其中包含有关我们客户的信息:

{ 
  "ip" : "30.156.16.164", 
  "vip": true, 
  "name": "David P" 
}

要导入此示例数据集,我们只需运行:

DELETE /vip
PUT /vip
{
  "mappings": {
    "properties": {
      "ip": { "type": "keyword" },
      "name": { "type": "text" },
      "vip": { "type": "boolean" }
    }
  }
}
POST /vip/_bulk
{ "index" : { } }
{ "ip" : "30.156.16.164", "vip": true, "name": "David P" }
{ "index" : { } }
{ "ip" : "164.85.94.243", "vip": true, "name": "Philipp K" }
{ "index" : { } }
{ "ip" : "50.184.59.162", "vip": true, "name": "Adrienne V" }
{ "index" : { } }
{ "ip" : "236.212.255.77", "vip": true, "name": "Carly R" }
{ "index" : { } }
{ "ip" : "16.241.165.21", "vip": true, "name": "Naoise R" }
{ "index" : { } }
{ "ip" : "246.106.125.113", "vip": true, "name": "Iulia F" }
{ "index" : { } }
{ "ip" : "81.194.200.150", "vip": true, "name": "Jelena Z" }
{ "index" : { } }
{ "ip" : "111.237.144.54", "vip": true, "name": "Matt R" }

要执行 “joins at index time”,我们需要丰富我们的数据集以获得如下所示的最终日志:

{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  },
  "vip": true, 
  "name": "David P" 
}

你可以使用摄取管道中的 Elasticsearch Enrich Processor 开箱即用地执行此操作。 让我们看看如何做到这一点。

在 Elasticsearch 中丰富 Elasticsearch 数据

摄取管道 - ingest pipeline

让我们首先使用摄取管道。

我们可以从一个空的开始,我们将用它来模拟我们想要的行为。 我们不需要原始数据集的完整字段集,因此我们对其进行了简化:

POST /_ingest/pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "clientip": "30.156.16.164"
      }
    }
  ],
  "pipeline": {
    "processors": []
  }
}

我们现在需要向我们的管道添加一个 enrich processor。 但为此,我们需要首先创建一个丰富的策略 (enrich policy):

PUT /_enrich/policy/vip-policy
{
  "match": {
    "indices": "vip",
    "match_field": "ip",
    "enrich_fields": ["name", "vip"]
  }
}

创建丰富策略后,我们可以使用执行丰富策略 API 来执行它:

PUT /_enrich/policy/vip-policy/_execute

我们现在可以模拟它:

POST /_ingest/pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "clientip": "30.156.16.164"
      }
    }
  ],
  "pipeline": {
    "processors": [{
      "enrich": {
        "policy_name": "vip-policy",
        "field": "clientip",
        "target_field": "enriched"
      }
    }]
  }
}

这给出如下的响应:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "enriched": {
            "name": "David P",
            "vip": true,
            "ip": "30.156.16.164"
          },
          "clientip": "30.156.16.164"
        },
        "_ingest": {
          "timestamp": "2023-04-06T17:14:29.127569953Z"
        }
      }
    }
  ]
}

我们只需清理一下数据即可获得我们期望的结构:

POST /_ingest/pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "clientip": "30.156.16.164"
      }
    }
  ],
  "pipeline": {
    "processors": [{
      "enrich": {
        "policy_name": "vip-policy",
        "field": "clientip",
        "target_field": "enriched"
      }
    },{
      "rename": {
        "field": "enriched.name",
        "target_field": "name"
      }
    },{
      "rename": {
        "field": "enriched.vip",
        "target_field": "vip"
      }
    },{
      "remove": {
        "field": "enriched"
      }
    }
    ]
  }
}

现在给出了预期的结果:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_id": "_id",
        "_version": "-3",
        "_source": {
          "name": "David P",
          "vip": true,
          "clientip": "30.156.16.164"
        },
        "_ingest": {
          "timestamp": "2023-04-06T17:16:08.175186282Z"
        }
      }
    }
  ]
}

我们现在可以存储最终的管道:

PUT /_ingest/pipeline/vip
{
  "processors": [{
    "enrich": {
      "policy_name": "vip-policy",
      "field": "clientip",
      "target_field": "enriched"
    }
  },{
    "rename": {
      "field": "enriched.name",
      "target_field": "name",
      "ignore_failure": true
    }
  },{
    "rename": {
      "field": "enriched.vip",
      "target_field": "vip",
      "ignore_failure": true
    }
  },{
    "remove": {
      "field": "enriched",
      "ignore_failure": true
    }
  }
  ]
}

请注意,我们通过添加一些 ignore_failure 指令对其进行了一些更改,因为我们可能在 vip 索引中找不到任何相关数据。

我们可以使用与源索引相同的映射来创建目标索引:

# Get the source mapping
GET /kibana_sample_data_logs/_mapping

# Create the destination index
PUT /kibana_sample_data_logs_new
{
  // Paste the source mappings structure
  "mappings": {
    "properties": {
      // And add the properties we are adding
      "name": {
        "type": "keyword"
      },
      "vip": {
        "type": "boolean"
      }
    }
  }
}

并调用重建索引 API:

POST _reindex
{
  "source": {
    "index": "kibana_sample_data_logs"
  },
  "dest": {
    "index": "kibana_sample_data_logs_new",
    "pipeline": "vip"
  }
}

让我们检查一下工作是否已完成:

GET /kibana_sample_data_logs_new/_search?filter_path=aggregations.by_name.buckets
{
  "size": 0,
  "aggs": {
    "by_name": {
      "terms": {
        "field": "name"
      }
    }
  }
}

上述命令给出如下类似的响应:

{
  "aggregations": {
    "by_name": {
      "buckets": [
        {
          "key": "David P",
          "doc_count": 100
        },
        {
          "key": "Philipp K",
          "doc_count": 29
        },
        {
          "key": "Adrienne V",
          "doc_count": 26
        },
        {
          "key": "Carly R",
          "doc_count": 26
        },
        {
          "key": "Iulia F",
          "doc_count": 25
        },
        {
          "key": "Naoise R",
          "doc_count": 25
        },
        {
          "key": "Jelena Z",
          "doc_count": 24
        },
        {
          "key": "Matt R",
          "doc_count": 24
        }
      ]
    }
  }
}

运行时字段丰富

丰富数据的另一种方法是在搜索时而不是索引时执行此操作。 这与本文的第一句话相悖,但有时,你需要进行一些权衡。 在这里,我们想用搜索速度来交换灵活性。

运行时字段功能 (runtime field feature) 允许丰富搜索响应对象,但不能用于查询或聚合数据。 此功能的一个简单示例:

GET kibana_sample_data_logs/_search?filter_path=hits.hits.fields
{
  "size": 1,
  "query": {
    "match": {
      "clientip": "30.156.16.164"
    }
  }, 
  "runtime_mappings": {
    "enriched": {
        "type": "lookup", 
        "target_index": "vip", 
        "input_field": "clientip", 
        "target_field": "ip", 
        "fetch_fields": ["name", "vip"] 
    }
  },
  "fields": [
    "clientip",
    "enriched"
  ],
  "_source": false
}

上述命令给出如下的响应:

{
  "hits": {
    "hits": [
      {
        "fields": {
          "enriched": [
            {
              "name": [
                "David P"
              ],
              "vip": [
                true
              ]
            }
          ],
          "clientip": [
            "30.156.16.164"
          ]
        }
      }
    ]
  }
}

请注意,这也可以添加为映射的一部分:

PUT kibana_sample_data_logs/_mappings
{
  "runtime": {
    "enriched": {
      "type": "lookup", 
      "target_index": "vip", 
      "input_field": "clientip", 
      "target_field": "ip", 
      "fetch_fields": ["name", "vip"] 
    }
  }
}

GET kibana_sample_data_logs/_search
{
  "size": 1,
  "query": {
    "match": {
      "clientip": "30.156.16.164"
    }
  }, 
  "fields": [
    "clientip",
    "enriched"
  ]
}

但是,如果你希望能够搜索或聚合这些字段,则需要在搜索时实际发出 (emit) 一些内容。

请注意,我们不能使用此方法在另一个索引中进行查找。 因此,因为且仅仅因为列表的长度很小,我们可以使用脚本来动态进行 “丰富”:

PUT kibana_sample_data_logs/_mappings
{
  "runtime": {
    "name": {
      "type": "keyword",
      "script": {
        "source": 
        """
        def name=params.name;
        for (int i=0; i< params.lookup.length; i++) {
          if (params.lookup[i].ip == doc['clientip'].value) {
            emit(params.lookup[i].name);
            break;
          }
        }
        """,
        "lang": "painless",
        "params": {
          "name": "David P",
          "lookup": [
            { "ip" : "30.156.16.164", "vip": true, "name": "David P" },
            { "ip" : "164.85.94.243", "vip": true, "name": "Philipp K" },
            { "ip" : "50.184.59.162", "vip": true, "name": "Adrienne V" },
            { "ip" : "236.212.255.77", "vip": true, "name": "Carly R" },
            { "ip" : "16.241.165.21", "vip": true, "name": "Naoise R" },
            { "ip" : "246.106.125.113", "vip": true, "name": "Iulia F" },
            { "ip" : "81.194.200.150", "vip": true, "name": "Jelena Z" },
            { "ip" : "111.237.144.54", "vip": true, "name": "Matt R" }
          ]
        }
      }
    },
    "vip": {
      "type": "boolean",
      "script": {
        "source": 
        """
        def name=params.name;
        for (int i=0; i< params.lookup.length; i++) {
          if (params.lookup[i].ip == doc['clientip'].value) {
            emit(params.lookup[i].vip);
            break;
          }
        }
        """,
        "lang": "painless",
        "params": {
          "name": "David P",
          "lookup": [
            { "ip" : "30.156.16.164", "vip": true, "name": "David P" },
            { "ip" : "164.85.94.243", "vip": true, "name": "Philipp K" },
            { "ip" : "50.184.59.162", "vip": true, "name": "Adrienne V" },
            { "ip" : "236.212.255.77", "vip": true, "name": "Carly R" },
            { "ip" : "16.241.165.21", "vip": true, "name": "Naoise R" },
            { "ip" : "246.106.125.113", "vip": true, "name": "Iulia F" },
            { "ip" : "81.194.200.150", "vip": true, "name": "Jelena Z" },
            { "ip" : "111.237.144.54", "vip": true, "name": "Matt R" }
          ]
        }
      }
    }
  }
}

我们可以再次聚合这些运行时字段:

GET /kibana_sample_data_logs/_search?filter_path=aggregations.by_name.buckets
{
  "size": 0,
  "aggs": {
    "by_name": {
      "terms": {
        "field": "name"
      }
    }
  }
}

这给出了与我们之前看到的相同的结果,但当然有点慢:

{
  "aggregations": {
    "by_name": {
      "buckets": [
        {
          "key": "David P",
          "doc_count": 100
        },
        {
          "key": "Philipp K",
          "doc_count": 29
        },
        {
          "key": "Adrienne V",
          "doc_count": 26
        },
        {
          "key": "Carly R",
          "doc_count": 26
        },
        {
          "key": "Iulia F",
          "doc_count": 25
        },
        {
          "key": "Naoise R",
          "doc_count": 25
        },
        {
          "key": "Jelena Z",
          "doc_count": 24
        },
        {
          "key": "Matt R",
          "doc_count": 24
        }
      ]
    }
  }
}

同样,此方法不适用于大索引,因此如我们在第一部分中看到的那样重新索引数据将是首选方法。

本文中描述的任何特性或功能的发布和时间安排均由 Elastic 自行决定。 当前不可用的任何特性或功能可能无法按时交付或根本无法交付。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1144009.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

node实战——后端koa结合jwt连接mysql实现权限登录(node后端就业储备知识)

文章目录 ⭐前言⭐ 环境准备⭐ 实现过程⭐ mysql 配置⭐路由前的准备⭐账号注册生成token⭐账号登录生成token⭐token登录 ⭐ 自测过程截图⭐总结⭐结束 ⭐前言 大家好&#xff0c;我是yma16&#xff0c;本文分享关于node实战——后端koa项目配置jwt实现登录注册&#xff08;n…

1230. K倍区间(前缀和)

题目&#xff1a; 1230. K倍区间 - AcWing题库 突破口&#xff1a; 区间遍历枚举一般先枚举右端点&#xff0c;再枚举左端点&#xff0c;注意由右端点限制左端点 思路&#xff1a;1.暴力 #include<cstdio> #include<iostream> #include<algorithm> #incl…

Win 7 VPN拨号错误734.

正在验证用户名和密码错误 734: PPP 链接控制协议终止。 如果您继续收到错误信息&#xff0c;您可以启用日志记录来做分析。 其他电脑拨号都成功.就这个电脑不行.找了很久,修改之后报好成功 ************************** 找到是跟下面两个两个注册表信息有关,尤其是第一个我…

基于Pytest+Requests+Allure实现接口自动化测试!

一、整体结构 框架组成&#xff1a;pytestrequestsallure设计模式&#xff1a; 关键字驱动项目结构&#xff1a; 工具层&#xff1a;api_keyword/参数层&#xff1a;params/用例层&#xff1a;case/数据驱动&#xff1a;data_driver/数据层&#xff1a;data/逻辑层&#xff1a…

75 寻找旋转排序数组中的最小值

寻找旋转排序数组中的最小值 题解1 一次循环(正确理解题意)题解2 二分 已知一个长度为 n 的数组&#xff0c;预先按照 升序排列&#xff0c;经由 1 到 n 次 旋转 后&#xff0c;得到输入数组。例如&#xff0c;原数组 nums [0,1,2,4,5,6,7] 在变化后可能得到&#xff1a; …

刚刚:2023阿里云双十一优惠活动上线了!

2023阿里云双十一优惠活动「金秋云创季」开始啦&#xff0c;10月27日到10月31日可以领满减优惠&#xff0c;到11月1日和11月11日之间可以购买云服务器等产品&#xff0c;11.12到11.30日赢最高百万上云抵扣金&#xff0c;阿里云百科aliyunbaike.com分享2023阿里云双十一优惠活动…

Xray的简单使用

xray 简介 xray 是一款功能强大的安全评估工具&#xff0c;由多名经验丰富的一线安全从业者呕心打造而成&#xff0c;主要特性有: 检测速度快。发包速度快; 漏洞检测算法效率高。支持范围广。大至 OWASP Top 10 通用漏洞检测&#xff0c;小至各种 CMS 框架 POC&#xff0c;均…

前端实现打印功能Print.js

前端实现打印的方式有很多种&#xff0c;本人恰好经历了几个项目都涉及到了前端打印&#xff0c;目前较为推荐Print.js来实现前端打印 话不多说&#xff0c;直接上教程 官方链接: Print.js官网 在项目中如何下载Print.js 使用npm下载&#xff1a;npm install print-js --sav…

python 从mssql取出datetime2类型之后格式化

我mssql是datetime2类型&#xff0c;用df取出之后发现是个纳秒的int&#xff08;1698419713000000000 这种&#xff09; 所以格式化的话就需要变成秒为单位&#xff0c;他们之间是10的9次方倍。所以先除以1e9之后用datetime.datetime.fromtimestamp()转换之后再format就行了 l…

CCF CSP认证历年题目自练 Day39

题目 试题编号&#xff1a; 201312-5 试题名称&#xff1a; I’m stuck! 时间限制&#xff1a; 1.0s 内存限制&#xff1a; 256.0MB 问题描述&#xff1a; 问题描述   给定一个R行C列的地图&#xff0c;地图的每一个方格可能是’#‘, ‘’, ‘-’, ‘|’, ‘.’, ‘S’, ‘…

系列十九、循环依赖(一)

一、概述 循环依赖是指&#xff0c;多个bean之间相互依赖&#xff0c;形成了一个闭环。比如A依赖于B、B依赖于C、C依赖于A&#xff0c;形成了一个圈。 二、两种方式对循环依赖的影响 2.1、官网说明 2.2、结论 我们AB循环依赖问题只要A的注入方式是setter、并且是singleton&am…

Android14 WMS启动流程

一 概述 本文Android14源代码可参考&#xff1a;Search 在 Android 系统中&#xff0c;从设计的角度来看&#xff0c;窗口管理系统是基于 C/S 模式的。整个窗口系统分为服务端和客户端两大部分&#xff0c;客户端负责请求创建窗口和使用窗口&#xff0c;服务端完成窗口的维护…

智慧工地管理系统源码-数字孪生智慧工地可视化解决方案

一、智慧工地建设背景 我国经济发展正从传统粗放式的高速增长阶段&#xff0c;进入高效率、低成本、可持续的中高速增长阶段。随着现代建筑的复杂度和体量等不断增加&#xff0c;施工现场管理的内容越来越多&#xff0c;管理的技术难度和要求在不断提高。传统的施工现场管理模…

用超声波清洗机洗眼镜的有哪些?清洁力强的超声波清洗机不能错过

超声波清洗机在清洗眼镜方面表现出色&#xff0c;其强大的清洁能力可以彻底清除眼镜上的污垢和细菌。这种清洗方式被认为是一种高效且卫生的清洁方式&#xff0c;因为它利用高频振动和微射流打击力来清除污垢和细菌&#xff0c;而不是使用化学物质。对于那些长时间佩戴眼镜或者…

windows + ubuntu + vscode开发环境配置安装

一、卸载WSL/WSL2 如果安装了windows子系统的朋友&#xff0c;可以选择继续使用。或者提前卸载WSL&#xff0c;再选择安装虚拟机。虚拟机占用内存较大&#xff0c;WSL可能对于开发的一些需求还有欠缺。根据自己的实际情况进行选择。 WIN10/11安装WSL(请参考官方资料&#xff0c…

TSINGSEE青犀智慧仓储可视化视频智能监管系统方案

一、背景与需求 对于现在很多大型工厂或者物流基地来说&#xff0c;仓库无疑是存放物品的重点场所。仓储存放着大量货物&#xff0c;同时存在大量的辅助设备&#xff0c;需要进行全方位的监管&#xff0c;以避免发生安全事故&#xff0c;造成财产损失。原有的人工巡检方式已无…

Ubuntu安装AdGuardhome(树莓派安装AdGuardhome)

Ubuntu安装AdGuardhome&树莓派安装AdGuardhome 1.什么是AdGuardhome2.设备情况3.3.1.下载AdGuardhome3.2.解压3.3.安装3.4.仪表盘配置3.5.dns黑名单添加3.6.DNS白名单设置3.7常规设置3.8. dns设置3.9.加密设置 4.客户端设置 1.什么是AdGuardhome AdGuard Home 是网络范围的…

传奇服务器配置如何搭建

传奇服务器在中国页游发展中作为一个经典制作吸引了很多玩家的喜欢&#xff0c;很多人也想搭建一个属于自己团队的传奇游戏服务器&#xff0c;今天就让小编来讲一讲该如何搭建吧&#xff01; 首先是硬件配置&#xff0c;传奇游戏的服务器需要较高的硬件配置&#xff0c;选择双路…

Kubernetes数据卷Volume和数据卷分类(emptyDir、nfs、hostPath、ConfigMap)详解

Kubernetes数据卷Volume和数据卷分类详解 数据卷概述 Kubernetes Volume&#xff08;数据卷&#xff09;主要解决了如下两方面问题&#xff1a; 数据持久性&#xff1a;通常情况下&#xff0c;容器运行起来之后&#xff0c;写入到其文件系统的文件暂时性的。当容器崩溃后&am…

window系统修改rabbitmq 默认端口

安装完rabbitmq之后&#xff0c;默认的client端口是5672, 控制台访问端口是15672&#xff0c;rabbitmq管理工具启动之后在浏览器中输入地址&#xff1a; ​ ​http://localhost:15672/​​​ 就可以访问后台​ ​​​&#xff0c; 默认管理员账号&#xff1a;guest 密码&#x…