Logstash笔记

news2025/1/15 16:49:15

目录​​​​​​​

一、简介

二、单个输入和输出插件

三、多个输入和输出插件

四、pipeline结构

五、队列和数据弹性

六、内存队列

七、持久化队列

八、死信队列 (DLQ)

九、输入插件

1)、beats

2)、dead_letter_queue

 3)、elasticsearch 

 4)、file

5)、redis

十、输出插件

1)、elasticsearch

2)、file

 3)、kafka

4)、redis

十一、过滤插件

1)、age

2)、bytes

3)、date

4)、grok


一、简介

Logstash 是一个具有实时管道功能的开源数据收集引擎。 Logstash 可以动态地统一来自不同来源的数据,并将数据规范化到您选择的目的地。

Logstash 管道有两个必需元素(输入和输出)以及一个可选元素(过滤器)。输入插件使用来自源的数据,过滤器插件根据您的指定修改数据,输出插件将数据写入目标。
例如:

image.png


最基本的 Logstash 管道:

cd logstash-8.12.2
# 示例中的管道从标准输入 stdin 获取输入,并以结构化格式将该输入移动到标准输出 stdout。
bin/logstash -e 'input { stdin { } } output { stdout {} }'

二、单个输入和输出插件

在创建 Logstash 管道之前,您将配置 Filebeat 以将日志行发送到 Logstash。 Filebeat 客户端是一个轻量级、资源友好型工具,它从服务器上的文件收集日志并将这些日志转发到 Logstash 实例进行处理。 Filebeat 专为可靠性和低延迟而设计。 Filebeat 在主机上的资源占用很少,Beats 输入插件最大限度地减少了 Logstash 实例的资源需求。

安装Filebeat后,需要对其进行配置。打开位于 Filebeat 安装目录中的 filebeat.yml 文件:

#输出logstash中filebeat.inputs:
- type: log
  paths:
    - /path/to/file/logstash-tutorial.log    # Filebeat 处理的一个或多个文件的绝对路径
output.logstash:
  hosts: ["localhost:5044"]   #输出logstash中

创建一个 Logstash 配置管道,该管道使用 Beats 输入插件从 Beats 接收事件,first-pipeline.conf 的文件:

input {
    beats {
        port => "5044"   # 使用 Filebeat 将 Apache Web 日志作为输入
    }
}
 filter {   # 解析这些日志以从日志中创建特定的命名字段
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]   # 将解析后的数据写入 Elasticsearch 集群
    }
}

三、多个输入和输出插件

创建一个 Logstash 管道,该管道从 Twitter 源和 Filebeat 客户端获取输入,然后将信息发送到 Elasticsearch 集群以及将信息直接写入文件。

input {
    twitter {
        consumer_key => "enter_your_consumer_key_here"
        consumer_secret => "enter_your_secret_here"
        keywords => ["cloud"]
        oauth_token => "enter_your_access_token_here"
        oauth_token_secret => "enter_your_access_token_secret_here"
    }
    beats {
        port => "5044"
    }
}
output {
    elasticsearch {
        hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
    }
    file {
        path => "/path/to/target/file"
    }
}

四、pipeline结构

input {
  ...
}

filter {
  ...
}

output {
  ...
}

每个部分都包含一个或多个插件的配置选项。如果指定多个过滤器,它们将按照它们在配置文件中出现的顺序应用。如果指定多个输出,事件将按照它们在配置文件中出现的顺序按顺序发送到每个目标。

五、队列和数据弹性

默认情况下,Logstash 在管道阶段(输入 → 管道工作线程)之间使用内存中的有界队列来缓冲事件如果 Logstash 遇到临时机器故障,内存队列的内容将会丢失。(临时机器故障是指 Logstash 或其主机异常终止但能够重新启动的情况。)为了防止数据丢失并确保事件在管道中不间断地流动,Logstash 提供了数据弹性功能。

  • 持久队列 (PQ) 通过将事件存储在磁盘上的内部队列中来防止数据丢失。
  • 死信队列 (DLQ) 为 Logstash 无法处理的事件提供磁盘存储,以便您可以评估它们。您可以使用 dead_letter_queue 输入插件轻松地重新处理死信队列中的事件。

默认情况下,这些弹性功能处于禁用状态。要打开这些功能,您必须在 Logstash 设置文件中显式启用它们。

六、内存队列

内存队列大小不是直接配置的。相反,这取决于您如何调整 Logstash。

其上限由 pipeline.workers (默认值:CPU 数量)乘以 pipeline.batch.size (默认值:125)事件定义。该值称为“飞行计数”,确定每个内存队列中可以保存的最大事件数。

当队列已满时,Logstash 对输入施加反压,以阻止数据流入 Logstash。

七、持久化队列

Logstash 持久队列通过将运行中的消息队列存储到磁盘来帮助防止异常终止期间的数据丢失。

  • 有助于防止正常关闭期间和 Logstash 异常终止期间消息丢失。如果在事件正在进行时 Logstash 重新启动,Logstash 会尝试传送存储在持久队列中的消息,直到传送至少成功一次。
  • 可以吸收突发事件,而不需要 Redis 或 Apache Kafka 等外部缓冲机制。

默认情况下禁用持久队列。

持久队列不能解决以下问题:

  • 不使用请求-响应协议的输入插件无法防止数据丢失。 Tcp、udp、zeromq 推+拉和许多其他输入没有向发送方确认收到的机制。 (beats和http等插件确实具有确认功能,因此受到该队列的良好保护。)
  • 如果在提交检查点文件之前发生异常关闭,则可能会丢失数据。
  • 持久队列不处理永久性机器故障,例如磁盘损坏、磁盘故障和机器丢失。保留到磁盘的数据不会被复制。

八、死信队列 (DLQ)

死信队列(DLQ)被设计为临时写入无法处理的事件的地方。 DLQ 使您能够灵活地调查有问题的事件,而不会阻塞管道或丢失事件。可以使用 dead_letter_queue 输入插件处理来自 DLQ 的事件。

默认情况下,当 Logstash 遇到由于数据包含映射错误或其他问题而无法处理的事件时,Logstash 管道会挂起或删除不成功的事件。为了防止这种情况下的数据丢失,您可以配置 Logstash 将不成功的事件写入死信队列而不是丢弃它们。

默认情况下禁用死信队列。要启用死信队列,请在logstash.yml设置文件中设置dead_letter_queue_enable选项:

dead_letter_queue.enable: true

当您准备好处理死信队列中的事件时,您可以创建一个使用 dead_letter_queue 输入插件从死信队列中读取事件的管道。

input {
  dead_letter_queue {
    path => "/path/to/data/dead_letter_queue"   # 包含死信队列的顶级目录的路径
    commit_offsets => true   # 当 true 时,保存偏移量。当管道重新启动时,它将继续从上次停止的位置读取,而不是重新处理队列中的所有项目
    pipeline_id => "main"   # 写入死信队列的管道的 ID。默认为“main”
    clean_consumed => true   # 删除已完全消耗的段,从而在处理时释放空间
    start_timestamp => "2017-06-06T23:40:37"  # 使用 start_timestamp 选项在队列中的特定点开始处理事件
  }
}

output {
  stdout {
    codec => rubydebug { metadata => true }   # 将事件(包括元数据)写入标准输出
  }
}

九、输入插件

1)、beats

在端口 5044 上侦听传入的 Beats 连接并为 Elasticsearch 建立索引:

input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}"  # %{[@metadata][beat]} 将索引名称的第一部分设置为元数据字段的值,%{[@metadata][version]} 将第二部分设置为 Beat 版本。例如:metricbeat-6.1.6。
  }
}

2)、dead_letter_queue

用于从 Logstash 的死信队列中读取事件: 

input {
  dead_letter_queue {
    path => "/var/logstash/data/dead_letter_queue"
    start_timestamp => "2017-04-04T23:40:37"
  }
}

 3)、elasticsearch 

input {
      # Read all documents from Elasticsearch matching the given query
      elasticsearch {
        hosts => "localhost"
        query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
      }
    }

 4)、file

input {
    file {
        path => "/path/log/*.log"
    }
}

5)、redis

input {
    redis {
        # Redis的key的类型。值可以是以下任意值:list、channel、pattern_channel
        data_type => "list"
        # Redis 数据库编号
        db => 5
        # Redis的数据库地址,默认为127.0.0.1
        host => "10.0.0.10"
        # Redis的端口号
        port => 6379
        # Redis的认证密码
        password => "admin"
        # Redis 列表或通道的名称
        key => "logstash-input-redis"
    }
}

十、输出插件

输出插件将事件数据发送到特定目的地。输出是事件管道的最后阶段。

1)、elasticsearch

使用 mutate 过滤器和条件来添加 [@metadata] 字段来设置每个事件的目标索引:

filter {
      if [log_type] in [ "test", "staging" ] {
        mutate { add_field => { "[@metadata][target_index]" => "test-%{+YYYY.MM}" } }
      } else if [log_type] == "production" {
        mutate { add_field => { "[@metadata][target_index]" => "prod-%{+YYYY.MM.dd}" } }
      } else {
        mutate { add_field => { "[@metadata][target_index]" => "unknown-%{+YYYY}" } }
      }
    }
    output {
      elasticsearch {
        index => "%{[@metadata][target_index]}"
      }
    }

2)、file

output {
 file {
   path => ...
   codec => line { format => "custom format: %{message}"}
 }
}

 3)、kafka

output {
      kafka {
        codec => json
        topic_id => "mytopic"   # 生成消息的主题
      }
    }

4)、redis

output {
  redis {
     # 指定写入数据的类型
    data_type => "list"
    # Redis 数据库编号
    db => 5
    # Redis主机地址
    host => "10.0.0.10"
    # Redis的端口号
    port => 6379
    # Redis的认证密码
    password => "admin"
    # Redis写入的key值
    key => "logstash-input-redis"
  }
}

十一、过滤插件

过滤器插件对事件执行中间处理。通常根据事件的特征有条件地应用过滤器。

1)、age

用于计算事件年龄的简单过滤器。
此过滤器通过从当前时间戳减去事件时间戳来计算事件的年龄。您可以将此插件与删除过滤器插件一起使用来删除早于某个阈值的 Logstash 事件。 

filter {
  age {
    add_field => {   # 一次添加多个字段
          "foo_%{somefield}" => "Hello world, from %{host}"
          "new_field" => "new_static_value"
        }
  }
  if [@metadata][age] > 86400 {
    drop {}
  }
}

2)、bytes

将计算机存储大小的字符串表示形式(例如“123 MB”或“5.6gb”)解析为其以字节为单位的数值。

filter {
      bytes {
        source => "my_bytes_string_field"   # 包含存储大小的源字段的名称
        target => "my_bytes_numeric_field"  # 将包含存储大小(以字节为单位)的目标字段的名称
      }
    }

3)、date

日期过滤器用于解析字段中的日期,然后使用该日期或时间戳作为事件的 Logstash 时间戳。 

 filter {
      date {
        match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
      }
    }

4)、grok

解析任意文本并对其进行结构化。Grok 是将非结构化日志数据解析为结构化且可查询的数据的好方法。

虚构的 http 请求日志:

55.3.244.1 GET /index.html 15824 0.043
filter {
      grok {
        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
      }
    }

 在 grok 过滤器之后,事件中将有一些额外的字段:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1691188.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LESS IS MORE: ONE-SHOT-SUBGRAPH LINK PREDICTION ON LARGE-SCALE KNOWLEDGE GRAPHS

LESS IS MORE: ONE-SHOT-SUBGRAPH LINK PREDICTION ON LARGE-SCALE KNOWLEDGE GRAPHS(ICLR2024) 论文地址:https://arxiv.org/html/2403.10231v1 源码地址:https://github.com/tmlr-group/one-shot-subgraph ABSTRACT 为了推断知…

2024最新 Jenkins + Docker 实战教程(六)- Jenkins配置邮箱接收构建通知

😄 19年之后由于某些原因断更了三年,23年重新扬帆起航,推出更多优质博文,希望大家多多支持~ 🌷 古之立大事者,不惟有超世之才,亦必有坚忍不拔之志 🎐 个人CSND主页——Mi…

软件设计:基于 python 代码快速生成 UML 图

1. 官方文档 PlantUML Language Reference Guide Comate | 百度研发编码助手 百度 Comate (Coding Mate Powered by AI) 是基于文心大模型的智能代码助手,结合百度积累多年的编程现场大数据和外部优秀开源数据,可以生成更符合实际研发场景的优质代码。…

element-plus表格的表单校验如何实现,重点在model和prop

文章目录 vue&#xff1a;3.x element-plus&#xff1a;2.7.3 重点&#xff1a; 1) tableData放到form对象里 2) form-item的prop要写成tableData.序号.属性 <!--table-表单校验--> <template><el-form ref"forms" :model"form"><e…

【Linux系列】软链接使用

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

基于自抗扰控制器和线性误差反馈控制律(ADRC-LSEF)的控制系统simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 4.1 ADRC原理 4.2 线性误差反馈控制律(LSEF) 4.3 ADRC-LSEF融合系统 5.完整工程文件 1.课题概述 基于自抗扰控制器和线性误差反馈控制律(ADRC-LSEF)的控制系统simulink建模与仿真。 2.系统仿真结果 …

基于Java的地震震中附近城市分析实战

目录 前言 一、空间数据说明 1、空间查询 二、Java后台开发 1、模型层设计与实现 2、控制层设计与实现 三、Leaflet地图开发 1、地震震中位置展示 2、附近城市展示 3、成果展示 总结 前言 随着全球气候变化和地壳活动的不断演变&#xff0c;地震作为一种自然灾害&…

“AIGC行业投资时机分析:评估当前市场发展阶段与未来需求趋势“

文章目录 每日一句正能量前言行业前景当前发展前景相关领域的发展趋势行业潜力竞争情况结论 市场需求人才需求情况机会挑战结论 选择与规划自我评估行业调研职业规划风险管理个人陈述示例 后记 每日一句正能量 胖了就减&#xff0c;没钱就赚&#xff0c;不会就学&#xff0c;不…

rtk技术的使用, test ok

1. 什么是gnss 2 rtk定位

记录使用 Vue3 过程中的一些技术点

1、自定义组件&#xff0c;并使用 v-model 进行数据双向绑定。 简述&#xff1a; 自定义组件使用 v-model 进行传参时&#xff0c;遵循 Vue 3 的 v-model 机制。在 Vue 3 中&#xff0c;v-model 默认使用了 modelValue 作为 prop 名称&#xff0c;以及 update:modelValue 作为…

Linux-笔记 应用编程函数总结

之前一直没做总结&#xff0c;这里总结一下。 一、文件I/O open #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> int open(const char *pathname, int flags); 例子&#xff1a; int fd; fd open("./test_kondon", O_WRONLY …

10.2.k8s的附加组件-Metrics-server组件与hpa资源pod水平伸缩

目录 一、概述 二、安装部署Metrics-Server组件 1.下载Metrics-Server资源清单 2.编辑Metrics-Server的资源清单 3.验证Metrics-Server是否成功安装 4.使用top命令测试是否管用 三、hpa资源实现pod水平伸缩&#xff08;自动扩缩容&#xff09; 1.编写deploy资源清单 2.…

java实现List对象转geojson文本返回前端

1.业务需求 查询带有经纬度数据的list列表&#xff0c;将其转为geojson格式给前端。 2.GeoJson格式说明 GeoJSON是一种对各种地理数据结构进行编码的格式&#xff0c;基于Javascript对象表示法(JavaScript Object Notation, 简称JSON)的地理空间信息数据交换格式。GeoJSON对…

每日练习之深度优先搜索——最大的湖

最大的湖 题目描述 运行代码 #include<iostream> using namespace std; bool mp[102][102]; int sum,num; int N,M,K; int dfs(int x,int y ) {if( mp[x][y] ){mp[x][y]0;sum;dfs(x1,y);dfs(x-1,y);dfs(x,y1);dfs(x,y-1);}return 0; } int main() {cin>>N>>…

ubuntu 源码安装 cloudcompare

1.系统环境&#xff1a; ubuntu18 cmake&#xff1a;3.10.2 官方安装指导&#xff1a;https://github.com/CloudCompare/CloudCompare/blob/master/BUILD.md (注&#xff1a;查看cmake版本&#xff1a; cmake --version) 2.安装依赖 sudo apt-get update sudo apt-get insta…

Android 屏保开关

设置-显示-屏保&#xff0c; 打开关闭 设置代码在 ./packages/apps/Settings/src/com/android/settings/dream/DreamMainSwitchPreferenceController.java &#xff0c; Overridepublic boolean isChecked() {return mBackend.isEnabled();}Overridepublic boolean setChecke…

【云原生】Kubernetes----POD基本管理

目录 引言 一、Pod基础概念 &#xff08;一&#xff09;Pod简介 &#xff08;二&#xff09;Pod的分类 1.自主式Pod 2.控制器管理的Pod &#xff08;三&#xff09;Pod使用方式 1.单容器pod 2.多容器Pod 3. 注意事项 二、Pod容器的分类 &#xff08;一&#xff09;…

FL Studio2025新功能大揭秘,你准备好了吗?

FL Studio&#xff0c;常被音乐制作者亲切地称为“水果”编曲软件&#xff0c;是比利时的Image-Line公司研发的一款完整的软件音乐生产环境或数字音频工作站&#xff08;DAW&#xff09;。自从1997年推出以来&#xff0c;它已经成为全世界众多电子音乐制作者和DJ的首选工具&…

【中霖教育口碑】什么情况下不允许参加注册会计师考试?

对于某些特殊情况&#xff0c;存在明确的禁止性规定&#xff0c;是不能参加注册会计师考试的&#xff0c;中霖为大家分享一下!关于注册会计师全国统一考试的资格条件&#xff0c;需明确以下要点&#xff1a; 1. 针对在前期注册会计师统一考试中因违反规定而受到禁止参加考试的…