使用 Logstash 丰富你的 Elasticsearch 文档

news2025/1/11 21:59:11

作者:来自 Elastic David Pilato

我们在上一篇文章中看到,我们可以使用摄取管道中的 Elasticsearch Enrich Processor 在 Elasticsearch® 中进行数据丰富。 但有时,你需要执行更复杂的任务,或者你的数据源不是 Elasticsearch,而是另一个源。 或者,你可能希望存储在 Elasticsearch 和第三方系统中,在这种情况下,将管道的执行转移到 Logstash® 很有意义。

使用 Elasticsearch 丰富 Elasticsearch 数据

使用 Logstash,使用类似于以下的管道,这非常容易:

input {
  # Read all documents from Elasticsearch
  elasticsearch {
    hosts => ["${ELASTICSEARCH_URL}"]
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
    index => "kibana_sample_data_logs"
    docinfo => true
    ecs_compatibility => "disabled"
  }
}

filter {
  # Enrich every document with Elasticsearch
  elasticsearch {
    hosts => ["${ELASTICSEARCH_URL}"]
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
    index => "vip"
    query => "ip:%{[clientip]}"
    sort => "ip:desc"
    fields => {
      "[name]" => "[name]"
      "[vip]" => "[vip]"
    }
  }
  mutate { 
    remove_field => ["@version", "@timestamp"] 
  }
}

output {
  if [name] {
    # Write all modified documents to Elasticsearch
    elasticsearch {
      manage_template => false
      hosts => ["${ELASTICSEARCH_URL}"]
      user => "elastic"
      password => "${ELASTIC_PASSWORD}"
      index => "%{[@metadata][_index]}"
      document_id => "%{[@metadata][_id]}"
    }
  }
}

总共,我们有 14074 个事件需要解析。 虽然不是很多,但对于这个演示来说已经足够了。 这是一个示例事件:

{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  }
}

正如我们在上一篇文章中看到的,vip 索引包含有关我们客户的信息:

{ 
  "ip" : "30.156.16.164", 
  "vip": true, 
  "name": "David P" 
}

我们可以通过以下方式运行管道:

docker run \
  --name=logstash \
  --rm -it \
  -v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \
  -e XPACK_MONITORING_ENABLED=false \
  -e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \
  -e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \
  docker.elastic.co/logstash/logstash:8.12.0

丰富的文档现在看起来像这样:

{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  },
  "vip": true,
  "name": "David P"
}

实际上很简单,但有一个问题:速度很慢。 通过网络进行查找,尽管 Elasticsearch 速度极快,但仍然会减慢整个管道的速度。

使用静态 JDBC 过滤器

我最近在 ParisJUG 遇到了 Laurent,他来自令人惊叹的 Elastic Consulting 团队,我们讨论了这个问题。 他告诉我,他的一位客户必须面对这个问题。 他建议改用 Logstash 中的 Elasticsearch 缓存。

问题是:Logstash 中没有这样的过滤器缓存插件。 他找到了一种非常聪明的方法来解决该问题,即利用静态 JDBC 过滤器插件和 Elasticsearch JDBC 驱动程序。

请注意,这需要拥有白金许可证(或试用版)。

添加 Elasticsearch JDBC 驱动程序

我们首先需要将 JDBC 驱动程序添加到 Logstash 实例中。

mdir -p logstash-config/lib
wget https://artifacts.elastic.co/maven/org/elasticsearch/plugin/x-pack-sql-jdbc/8.12.0/x-pack-sql-jdbc-8.12.0.jar
mv x-pack-sql-jdbc-8.12.0.jar logstash-config/lib

我们只需要与 Logstash docker 实例共享此目录:

time docker run \
  --name=logstash \
  --rm -it \
  -v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \
  -v $(pwd)/logstash-config/lib/:/tmp/lib/ \
  -e XPACK_MONITORING_ENABLED=false \
  -e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \
  -e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \
  docker.elastic.co/logstash/logstash:8.12.0

更新管道

input 部分不变。 但现在,我们要在内存中创建一个名为 vip 的临时表(为了保持一致性)。 该表结构是使用 local_db_objects 参数定义的:

jdbc_static {
  local_db_objects => [ {
    name => "vip"
    index_columns => ["ip"]
    columns => [
      ["name", "VARCHAR(255)"],
      ["vip", "BOOLEAN"],
      ["ip", "VARCHAR(64)"]
    ]
  } ]
}

当 jdbc_static 启动时,我们要首先从 Elasticsearch vip索引中读取所有数据集。 这是在 loaders 选项中完成的:

jdbc_static {
  loaders => [ {
    query => "select name, vip, ip from vip"
    local_table => "vip"
  } ]
  jdbc_user => "elastic"
  jdbc_password => "${ELASTIC_PASSWORD}"
  jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"
  jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"
  jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"
}

每次我们需要进行查找时,我们都希望使用以下语句来执行它:

SELECT name, vip FROM vip WHERE ip = "THE_IP"

这可以使用 local_lookups 参数定义:

jdbc_static {
  local_lookups => [ {
    query => "SELECT name, vip FROM vip WHERE ip = :ip"
    parameters => { "ip" => "clientip" }
    target => "vip"
  } ]
}

如果没有找到数据,我们可以使用 default_hash 选项提供默认值:

jdbc_static {
  local_lookups => [ {
    query => "SELECT name, vip FROM vip WHERE ip = :ip"
    parameters => { "ip" => "clientip" }
    target => "vip" 
    default_hash => {
      name => nil
      vip => false
    }
  } ]
}

最后,这将在事件中生成 vip.name 和 vip.vip 字段。

我们现在可以定义我们想要对这些临时字段执行的操作:

jdbc_static {
  add_field => { name => "%{[vip][0][name]}" }
  add_field => { vip => "%{[vip][0][vip]}" }
  remove_field => ["vip"]
}

这给出了以下过滤器:

filter {
  # Enrich every document with Elasticsearch via static JDBC
  jdbc_static {
    loaders => [ {
      query => "select name, vip, ip from vip"
      local_table => "vip"
    } ]
    local_db_objects => [ {
      name => "vip"
      index_columns => ["ip"]
      columns => [
        ["name", "VARCHAR(255)"],
        ["vip", "BOOLEAN"],
        ["ip", "VARCHAR(64)"]
      ]
    } ]
    local_lookups => [ {
      query => "SELECT name, vip FROM vip WHERE ip = :ip"
      parameters => { "ip" => "clientip" }
      target => "vip" 
      default_hash => {
        name => nil
        vip => false
      }
    } ]
    add_field => { name => "%{[vip][0][name]}" }
    add_field => { vip => "%{[vip][0][vip]}" }
    remove_field => ["vip"]
    jdbc_user => "elastic"
    jdbc_password => "${ELASTIC_PASSWORD}"
    jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"
    jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"
    jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"
  }
  mutate { 
    remove_field => ["@version", "@timestamp"] 
  }
}

将修改后的文档写入Elasticsearch

在第一个管道中,我们测试事件中是否确实存在名称字段:

if [name] {
  # Index to Elasticsearch
}

我们仍然可以使用类似的东西,但因为我们提供了默认值,以防在 Elasticsearch vip 索引中找不到 ip,所以现在它会在标签表中生成一个新的 _jdbcstaticdefaultsused 标签。

我们可以用它来知道我们是否发现了某些东西,如果是前者,则将我们的数据发送到 Elasticsearch:

output {
  if "_jdbcstaticdefaultsused" not in [tags] {
    # Write all the modified documents to Elasticsearch
    elasticsearch {
      manage_template => false
      hosts => ["${ELASTICSEARCH_URL}"]
      user => "elastic"
      password => "${ELASTIC_PASSWORD}"
      index => "%{[@metadata][_index]}"
      document_id => "%{[@metadata][_id]}"
    }
  }
}

更快吗?

因此,当我们在这个小数据集上运行测试时,我们可以看到,使用 Elasticsearch 过滤器方法,需要两分钟多一点的时间来丰富我们的数据集:

real    2m3.146s
user    0m0.077s
sys     0m0.042s

当使用 JDBC 静态过滤器方法运行管道时,现在只需不到一分钟:

real    0m48.575s
user    0m0.064s
sys     0m0.039s

正如我们所看到的,我们显着减少了该丰富管道的执行时间(增益约为 60%)。

如果你有一个可以轻松放入 Logstash JVM 内存的小型 Elasticsearch 索引,你可以尝试此策略(或类似的策略)。 如果你有数亿个文档,你仍然应该使用 Elasticsearch Filter Plugin。

结论

在这篇文章中,我们了解了当我们需要在 Elasticsearch 中执行一些查找时,如何使用 JDBC 静态过滤器插件来加速数据丰富管道。 在下一篇文章中,我们将了解如何使用 Elastic Agent 在边缘进行类似的丰富。

本文中描述的任何特性或功能的发布和时间安排均由 Elastic 自行决定。 当前不可用的任何特性或功能可能无法按时交付或根本无法交付

更多阅读:

  • Logstash:Jdbc static filter plugin 介绍

  • Logstash:运用 jdbc_streaming 来丰富我们的数据

原文:Enrich your Elasticsearch documents with Logstash | Elastic Blog

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1499007.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android14音频进阶:AIDL数据转换关键图解(五十九)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒体系统工程师系列【原创干货持续更新中……】🚀 人生格言: 人生从来没有捷径,只…

光线追踪12 - Defocus Blur(虚焦模糊)

现在我们的最后一个特性是虚化模糊。注意,摄影师通常称之为景深,所以请确保在光线追踪的朋友中只使用虚化模糊这个术语。 真实相机具有虚化模糊是因为它们需要一个大孔(而不仅仅是针孔)来收集光线。一个大孔会导致所有物体失去焦点…

[HackMyVM]Quick 2

kali:192.168.56.104 主机发现 arp-scan -l # arp-scan -l Interface: eth0, type: EN10MB, MAC: 00:0c:29:d2:e0:49, IPv4: 192.168.56.104 Starting arp-scan 1.10.0 with 256 hosts (https://github.com/royhills/arp-scan) 192.168.56.1 0a:00:27:00:00:05 (Un…

MySQl基础入门⑥

上一章知识内容 1.数据类型的属性 2.MySql的约束 mysql的约束时指对数据表中数据的一种约束行为,约束主要完成对数据的检验,如果有互相依赖数据,保证该数据不被删除。它能够帮助数据库管理员更好地管理数据库,并且能够确保数据库…

算法打卡day11|栈与队列篇03|Leetcode 239. 滑动窗口最大值、347.前 K 个高频元素

小顶堆和大顶堆 小顶堆(Min Heap)和大顶堆(Max Heap)是两种特殊的完全二叉树,它们遵循特定的堆属性,即父节点的值总是小于或等于(小顶堆)或者大于或等于(大顶堆&#xf…

微信小程序开发系列(二十二)·wxml语法·双向数据绑定model:的用法

目录 1. 单向数据绑定 2. 双向数据绑定 3. 代码 在 WXML 中&#xff0c;普通属性的绑定是单向的&#xff0c;例如&#xff1a;<input value"((value))"/> 如果希望用户输入数据的同时改变 data 中的数据&#xff0c;可以借助简易双向绑定机制。在对应属性…

文心一言 VS 讯飞星火 VS chatgpt (210)-- 算法导论16.1 1题

一、根据递归式(16.2)为活动选择问题设计一个动态规划算法。算法应该按前文定义计算最大兼容活动集的大小 c[i,j]并生成最大集本身。假定输入的活动已按公式(16.1)排好序。比较你的算法和GREEDY-ACTIVITY-SELECTOR的运行时间。如何要写代码&#xff0c;请用go语言。 文心一言&…

阿里云和腾讯云区别价格表,云服务器费用对比2024年最新

2024年阿里云服务器和腾讯云服务器价格战已经打响&#xff0c;阿里云服务器优惠61元一年起&#xff0c;腾讯云服务器61元一年&#xff0c;2核2G3M、2核4G、4核8G、4核16G、8核16G、16核32G、16核64G等配置价格对比&#xff0c;阿腾云atengyun.com整理阿里云和腾讯云服务器详细配…

利用tree命令自动保存文件层级结构

tree命令的使用 为了将上图左侧的文件目录&#xff0c;生成上图右侧中的文件夹结构列表&#xff0c;保存在txt中&#xff0c;使用了如下cmd命令&#xff1a; C:\armadillo-12.8.0>tree .>list.txt以上tree命令分为3部分&#xff1a; tree 命令. 在当前目录>list.tx…

ChatGPT:人工智能的革命与未来

引言 随着人工智能技术的飞速发展&#xff0c;ChatGPT作为OpenAI推出的一款语言模型&#xff0c;已经引起了广泛的关注和讨论。它不仅改变了我们与机器交流的方式&#xff0c;还为众多行业的发展带来了革命性的影响。本文将深入探讨ChatGPT的技术原理、应用场景以及它对未来的…

一些硬件知识(六)

防反接设计&#xff1a; 同步电路和异步电路的区别: 同步电路:存储电路中所有触发器的时钟输入端都接同一个时钟脉冲源&#xff0c;因而所有触发器的状态的变化都与所加的时钟脉冲信号同步。 异步电路:电路没有统一的时钟&#xff0c;有些触发器的时钟输入端与时钟脉冲源相连…

【HarmonyOS】ArkTS-箭头函数

箭头函数 箭头函数是 比普通函数 更简洁 的一种函数写法 () > {}() > {// 函数体 }let 函数名 () > {// 函数体 }let 函数名 () > {// 函数体 } 函数名(实参1, 实参2)let 函数名 (形参1: 类型, 形参2: 类型) > {// 函数体 } 函数名(实参1, 实参2)let 函数名 …

【嵌入式】嵌入式系统稳定性建设:静态代码扫描的稳定性提升术

1. 概述 在嵌入式系统开发过程中&#xff0c;代码的稳定性和可靠性至关重要。静态代码扫描工具作为一种自动化的代码质量检查手段&#xff0c;能够帮助开发者在编译前发现潜在的缺陷和错误&#xff0c;从而增强系统的稳定性。本文将介绍如何在嵌入式C/C开发中使用静态代码扫描…

嵌入式学习第二十五天!(网络的概念、UDP编程)

网络&#xff1a; 可以用来&#xff1a;数据传输、数据共享 1. 网络协议模型&#xff1a; 1. OSI协议模型&#xff1a; 应用层实际收发的数据表示层发送的数据是否加密会话层是否建立会话连接传输层数据传输的方式&#xff08;数据包&#xff0c;流式&#xff09;网络层数据的…

Day22:安全开发-PHP应用留言板功能超全局变量数据库操作第三方插件引用

目录 开发环境 数据导入-mysql架构&库表列 数据库操作-mysqli函数&增删改查 数据接收输出-html混编&超全局变量 第三方插件引用-js传参&函数对象调用 完整源码 思维导图 PHP知识点&#xff1a; 功能&#xff1a;新闻列表&#xff0c;会员中心&#xff0…

Python爬虫——scrapy-3

目录 免责声明 任务 文件简介 爬取当当网内容单管道 pipelines.py items.py setting dang.py 当当网多管道下载图片 pipelines.py settings 当当网多页下载 dang.py pielines.py settings items.py 总结 免责声明 该文章用于学习&#xff0c;无任何商业用途 …

深度学习-2.3损失函数

文章目录 损失函数深度学习优化思想回归&#xff1a;误差平方和SSE二分类交叉熵损失函数1. 极大似然函数估计求解二分类交叉熵函数2.用tensor实现二分类交叉熵损失 多分类交叉熵损失函数1.由二分类推广到多分类2.用PyTorch实现多分类交叉熵损失 损失函数 在之前的文章中&#…

OpenAI劲敌吹新风! Claude 3正式发布,Claude3使用指南

Claude 3是什么&#xff1f; 是Anthropic 实验室近期推出的 Claude 3 大规模语言模型&#xff08;Large Language Model&#xff0c;LLM&#xff09;系列&#xff0c;代表了人工智能技术的一个显著飞跃。 该系列包括三个不同定位的子模型&#xff1a;Claude 3 Haiku、Claude 3…

Chapter20-Ideal gases-CIE课本要点摘录、总结(编辑中)

20.1 Particles of a gas Brownian motion Fast modules 速率的数值大概了解下&#xff1a; average speed of the molecules:400m/s speed of sound:approximately 330m/s at STP&#xff08;standard temperature and pressure&#xff09; Standard Temperature and Pres…

如何使用WinSCP结合Cpolar实现公网远程访问内网Linux服务器

文章目录 1. 简介2. 软件下载安装&#xff1a;3. SSH链接服务器4. WinSCP使用公网TCP地址链接本地服务器5. WinSCP使用固定公网TCP地址访问服务器 1. 简介 ​ Winscp是一个支持SSH(Secure SHell)的可视化SCP(Secure Copy)文件传输软件&#xff0c;它的主要功能是在本地与远程计…