Logstash:在 Logstash 管道中的定制的 Elasticsearch update by query

news2024/10/6 22:23:39

我们知道 Elasticsearch output plugin 为我们在 Logstash 的 pipeline 中向 Elasticsearch 的写入提供了可能。我们可以使用如下的格式向 Elasticsearch 写入数据:

  elasticsearch {
    hosts => ["https://localhost:9200"]
    index => "data-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "NtC7cM-GKQWOxqamHd1R"
    ssl => true
    ca_trusted_fingerprint => "d464eed5d00a20908318b6a1de38f88daf3a867177123def4c34aa2272571aaf"
  }

在向 Elasticsearch 写入数据的时候,目前它有四种操作:

  • index:索引文档(来自 Logstash 的事件)。
  • delete:通过 id 删除文档(此操作需要一个id)
  • create:索引文档,如果索引中已存在该 id 的文档,则失败。
  • update:通过 id 更新文档。 Update 有一个特殊情况,你可以 upsert — 更新文档(如果不存在)。 请参阅 doc_as_upsert 选项。 注意:这在 Elasticsearch 1.x 中不起作用且不受支持。 请升级到 ES 2.x 或更高版本以将此功能与 Logstash 一起使用!

一个 sprintf 样式的字符串,用于根据事件的内容更改操作。 值 %{[foo]} 将使用 foo 字段进行操作。 如果 resolved action 不在 [index, delete, create, update] 中,事件将不会发送到 Elasticsearch。 相反,事件将被发送到管道的死信队列 (DLQ)(如果启用),或者将被记录并删除。

在实际的使用中,假如我们的操作不是 index,delete create 或 update 其中的一种,那么我们该怎么办呢?比如我们想根据一定的条件来更新文档,就像 update by query 那样?我们该怎么办呢?

幸运的是,Logstash 提供了一个叫做 HTTP output plugin。它可以帮我解决这个问题。

准备数据

首先,我们来创建如下的一个索引:

PUT customer/_doc/2
{
  "id": 2,
  "timestamp": "2019-08-11T17:55:56Z",
  "paymentType": "Visa",
  "name": "Darby Dacks",
  "gender": "Female",
  "ip_address": "77.72.239.47",
  "purpose": "Shoes",
  "country": "Poland",
  "age": 55,
  "offer": false 
}

我们在 Kibana 中输入上面的命令来创建一个叫做 customer 的索引。它的 id 为 2。

更新数据

接下来,我们需要按照一定的条件来更新我们的数据。比如,我们想把 paymentType 为 Visa,并且年龄大于或等于 55 岁的人的 offer 设置为 true。在 Kibana 中正常的命令是这样的:

POST customer/_update_by_query
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "paymentType.keyword": "Visa"
          }
        },
        {
          "range": {
            "age": {
              "gte": 50
            }
          }
        }
      ]
    }
  },
  "script": {
    "source": "ctx._source.offer = params.offer",
    "lang": "painless",
    "params": {
      "offer": true
    }
  }
}

我们可以对 Logsthash 做如下的 pipeline 设计:

logstash.conf

input {
  generator {
    message => '{"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}'
    count => 1
  }
}
 
filter {
    json {
        source => "message"
    }
 
    if [paymentType] == "Mastercard" {
        drop {}
    }
 
    mutate {
        remove_field => ["message", "@timestamp", "path", "host", "@version", "log", "event"]
    }
 
}
 
output {
  stdout {
    codec => rubydebug
  }
  http {
      url => "https://localhost:9200/customer/_update_by_query"
      user => "elastic"
      password => "Y+6tv9jejPl=W4IGrTD="
      http_method => "post"
      format => "message"
      content_type => "application/json"
      message => '{"query":{"bool":{"must":[{"match":{"paymentType.keyword":"%{paymentType}"}},{"range":{"age":{"gte":"%{age}"}}}]}},"script":{"source":"ctx._source.offer = params.offer","lang":"painless","params":{"offer":true}}}' 
      cacert => "/Users/liuxg/elastic/elasticsearch-8.6.1/config/certs/http_ca.crt"
  }
}

在上面,我们在 message 中通过一个查询,匹配到 paymentType.keyword 为 Visa,并且 age 为大于等于 55 的文档,我们设置该用户为促销对象。把他的 offer 值设置为 true。这个在实际的使用中,依据自己的条件来进行配置。在上面,cacert 为我们的 Elasticsearch 的证书文件位置。具体使用,请参考文档。

我们接下来运行 Logstash 的 pipeline:

./bin/logstash -f logstash.conf

 

在上面我们可以看出来信息的输出。我们在 Kibana 中使用如下的命令来检查更新后的文档:

GET customer/_search
{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "customer",
        "_id": "2",
        "_score": 1,
        "_source": {
          "offer": true,
          "country": "Poland",
          "gender": "Female",
          "purpose": "Shoes",
          "name": "Darby Dacks",
          "id": 2,
          "ip_address": "77.72.239.47",
          "age": 55,
          "timestamp": "2019-08-11T17:55:56Z",
          "paymentType": "Visa"
        }
      }
    ]
  }
}

很显然,我们上面的 offer 值现在变为 true,而不是之前的 false。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/363077.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ROS2手写自定义点云(PointCloud2)数据并发布

目录前言实现前言 继续学习ROS2,最近把navigation2的路径规划部分学习了一遍,但是还没有进行测试,于是先把这个部分先空出来后面再总结。先写一个与避障有关系的如何自己发点云数据。 在nav2里面有一个非常重要的部分就是costmap部分&#…

Python是未来的编程语言?学Python前景如何?薪资高吗?

Python是一种强大的语言,为世界各地的开发人员提供了多种用途。根据TIOBE指数,Python的排名还在继续攀升。开发人员和技术专业人员也不断发现Python的新用途,包括数据分析和机器学习等。 Python现在有着庞大的用户基础,并且它深深…

经纬度坐标点和距离之间的转换

1.纬度相同,经度不同 在纬度相同的情况下: 经度每隔0.00001度,距离相差约1米; 每隔0.0001度,距离相差约10米; 每隔0.001度,距离相差约100米; 每隔0.01度,距离相差约1000米…

Linux 远程登录

Linux 一般作为服务器使用,而服务器一般放在机房,你不可能在机房操作你的 Linux 服务器。 这时我们就需要远程登录到Linux服务器来管理维护系统。 Linux 系统中是通过 ssh 服务实现的远程登录功能,默认 ssh 服务端口号为 22。 Window 系统…

SpringCloud+Dubbo3 = 王炸 !

前言 全链路异步化的大趋势来了 随着业务的发展,微服务应用的流量越来越大,使用到的资源也越来越多。 在微服务架构下,大量的应用都是 SpringCloud 分布式架构,这种架构总体上是全链路同步模式。 全链路同步模式不仅造成了资源…

第二章 runtime-core初始化核心流程和runtime-core更新核心流程

runtime-core初始化核心流程 1 创建app 2 进行初始化 2.1 基于组件生成虚拟节点 2.2 进行render 调用patch 根据不同的vnode类型进行不同类型的组件处理 组件 2.2.1 创建component instance对象 2.2.2 setup component 初始化props slots 各种 2.2.3 setupRenderEffect…

通过Docker部署rancher

先创建k8s集群 https://blog.csdn.net/weixin_44371237/article/details/123974335 环境准备 一台linux主机,4G内存 通过Docker部署rancher 启动rancher docker run --privileged -d --restartunless-stopped -p 80:80 -p 443:443 rancher/rancher查看本地镜像…

python基础:简单实现从网页中获取小说名单列表并存入文件中

python基础:简单实现从网页中获取小说名单列表并存入文件中1.技术储备 requests:requests是使用Apache2 licensed 许可证的HTTP库,可以用于网页数据请求 requests.get():发起网络请求的一种方式,类似的还有post、 put、 delete、…

[MySQL]基本数据类型及表的基本操作

哈喽,大家好!我是保护小周ღ,本期为大家带来的是 MySQL 数据库常用的数据类型,数据表的基本操作:创建、删除、修改表,针对修改表的结构进行了讲解,随后是如何向数据表中添加数据,浅浅…

Vulnhub_Venom

目录 一 测试 (一)信息收集 1 端口服务探测 2 目录扫描 3 前端源码信息收集 (二)漏洞发现 1 前端注释敏感信息泄露 2 CVE-2018-19422-Subrion CMS v 4.2.1任意文件上传 (三)提权 1 sudo…

4万字c++讲解+区分c和c++,不来可惜了(含代码+解析)

目录 1 C简介 1.1 起源 1.2 应用范围 1.3 C和C 2开发工具 3 基本语法 3.1 注释 3.2关键字 3.3标识符 4 数据类型 4.1基本数据类型 4.2 数据类型在不同系统中所占空间大小 4.3 typedef声明 4.4 枚举类型 5 变量 5.1 变量的声明和定义 5.2 变量的作用域 6 运算符…

面试之设计模式(简单工厂模式)

案例 在面试时,面试官让你通过面对对象语言,用Java实现计算器控制台程序,要求输入两个数和运算符号,得出结果。大家可能想到是如下: public static void main(String[] args) {Scanner scanner new Scanner(System.…

一文让你了解SpringCloud五大核心组件

🏆今日学习目标: 🍀SpringCloud五大核心组件 ✅创作者:林在闪闪发光 ⏰预计时间:30分钟 🎉个人主页:林在闪闪发光的个人主页 🍁林在闪闪发光的个人社区,欢迎你的加入: 林…

2021-08-29

服务器 主:172.17.0.2 master 备:172.17.0.3 slave1 lvs虚拟IP:172.17.0.100 #nginx下载地址 http://nginx.org/download/ 本地文件路径 1.dockerfile构建nginx FROM centos:7 ADD nginx-1.6.0.tar.gz /usr/local COPY nginx_install.sh /usr/local RUN sh …

毕业设计(1)-AFLGO的安装

AFLGO是一个模糊测试工具,在CSDN上的安装教程不多,自己在安装过程中也出现了很多教程之外的错误,最后反复安装了2天终于安装成功这里记录一下安装工程中的错误 使用的平台:Ubuntu18.04 配置: 内存:6G&…

StreamAPI

StreamAPI 最近开发用上了 Java8的StreamAPI,(咋现在才用?嗯哼,项目需要)自己也不怎么会,来总结一波吧! 别认为好抽象!!!干他就完事 一.StreamAPI介绍 就是用来处理集合的数据 其实到后面会发现和SQL的语句是差不多的~哈哈?你不信?往下面看 Stream:英文翻译叫做流 举个粟子…

华为OD机试 - 最多等和不相交连续子序列(C++) | 附带编码思路 【2023】

刷算法题之前必看 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,通过率才会高。 华为 OD 清单查看地址:https://blog.csdn.net/hihell/category_12199283.html 华为OD详细说明:https://dream.blog.csdn.net/article/details/128980730 华为OD机试题…

老学长的浙大MPA现场复试经验分享

作为一名在浙大MPA项目已经毕业的考生来说,很荣幸受到杭州达立易考周老师的邀请,给大家分享下我的复试经验,因为听周老师说是这几年浙大MPA因疫情情况,已经连续几年都是线上个人复试了,而今年疫情社会面较为平稳的情况…

【LoRa模块】关键参数记录

记录lora以及lorawa关键射频参数 这里写目录标题1. LoRa LoRaWAN LPWAN 三者区分2. LoRaWAN网路架构的特点3.关键参数4.参数定义1. LoRa LoRaWAN LPWAN 三者区分 2. LoRaWAN网路架构的特点 终端点的通讯是双向的 (bi-directional)LoRaWAN 数据速率可以从 0.3 kbps 到 50 kbps扩…

7 个 JavaScript Web API 来构建你不知道的未来网站

随着技术的日新月异,为开发人员提供了令人难以置信的新工具和API。但据了解,在100 多个 API中,只有5%被开发人员积极使用。让我们来看看一些有用的Web API,它们可以帮助您将网站推向月球!🌕🚀1.…