目录
一、简介
二、单个输入和输出插件
三、多个输入和输出插件
四、pipeline结构
五、队列和数据弹性
六、内存队列
七、持久化队列
八、死信队列 (DLQ)
九、输入插件
1)、beats
2)、dead_letter_queue
3)、elasticsearch
4)、file
5)、redis
十、输出插件
1)、elasticsearch
2)、file
3)、kafka
4)、redis
十一、过滤插件
1)、age
2)、bytes
3)、date
4)、grok
一、简介
Logstash 是一个具有实时管道功能的开源数据收集引擎。 Logstash 可以动态地统一来自不同来源的数据,并将数据规范化到您选择的目的地。
Logstash 管道有两个必需元素(输入和输出)以及一个可选元素(过滤器)。输入插件使用来自源的数据,过滤器插件根据您的指定修改数据,输出插件将数据写入目标。
例如:
最基本的 Logstash 管道:
cd logstash-8.12.2
# 示例中的管道从标准输入 stdin 获取输入,并以结构化格式将该输入移动到标准输出 stdout。
bin/logstash -e 'input { stdin { } } output { stdout {} }'
二、单个输入和输出插件
在创建 Logstash 管道之前,您将配置 Filebeat 以将日志行发送到 Logstash。 Filebeat 客户端是一个轻量级、资源友好型工具,它从服务器上的文件收集日志并将这些日志转发到 Logstash 实例进行处理。 Filebeat 专为可靠性和低延迟而设计。 Filebeat 在主机上的资源占用很少,Beats 输入插件最大限度地减少了 Logstash 实例的资源需求。
安装Filebeat后,需要对其进行配置。打开位于 Filebeat 安装目录中的 filebeat.yml 文件:
#输出logstash中filebeat.inputs:
- type: log
paths:
- /path/to/file/logstash-tutorial.log # Filebeat 处理的一个或多个文件的绝对路径
output.logstash:
hosts: ["localhost:5044"] #输出logstash中
创建一个 Logstash 配置管道,该管道使用 Beats 输入插件从 Beats 接收事件,first-pipeline.conf 的文件:
input {
beats {
port => "5044" # 使用 Filebeat 将 Apache Web 日志作为输入
}
}
filter { # 解析这些日志以从日志中创建特定的命名字段
grok {
match => { "message" => "%{COMBINEDAPACHELOG}"}
}
geoip {
source => "clientip"
}
}
output {
elasticsearch {
hosts => [ "localhost:9200" ] # 将解析后的数据写入 Elasticsearch 集群
}
}
三、多个输入和输出插件
创建一个 Logstash 管道,该管道从 Twitter 源和 Filebeat 客户端获取输入,然后将信息发送到 Elasticsearch 集群以及将信息直接写入文件。
input {
twitter {
consumer_key => "enter_your_consumer_key_here"
consumer_secret => "enter_your_secret_here"
keywords => ["cloud"]
oauth_token => "enter_your_access_token_here"
oauth_token_secret => "enter_your_access_token_secret_here"
}
beats {
port => "5044"
}
}
output {
elasticsearch {
hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
}
file {
path => "/path/to/target/file"
}
}
四、pipeline结构
input {
...
}
filter {
...
}
output {
...
}
每个部分都包含一个或多个插件的配置选项。如果指定多个过滤器,它们将按照它们在配置文件中出现的顺序应用。如果指定多个输出,事件将按照它们在配置文件中出现的顺序按顺序发送到每个目标。
五、队列和数据弹性
默认情况下,Logstash 在管道阶段(输入 → 管道工作线程)之间使用内存中的有界队列来缓冲事件。如果 Logstash 遇到临时机器故障,内存队列的内容将会丢失。(临时机器故障是指 Logstash 或其主机异常终止但能够重新启动的情况。)为了防止数据丢失并确保事件在管道中不间断地流动,Logstash 提供了数据弹性功能。
- 持久队列 (PQ) 通过将事件存储在磁盘上的内部队列中来防止数据丢失。
- 死信队列 (DLQ) 为 Logstash 无法处理的事件提供磁盘存储,以便您可以评估它们。您可以使用 dead_letter_queue 输入插件轻松地重新处理死信队列中的事件。
默认情况下,这些弹性功能处于禁用状态。要打开这些功能,您必须在 Logstash 设置文件中显式启用它们。
六、内存队列
内存队列大小不是直接配置的。相反,这取决于您如何调整 Logstash。
其上限由 pipeline.workers (默认值:CPU 数量)乘以 pipeline.batch.size (默认值:125)事件定义。该值称为“飞行计数”,确定每个内存队列中可以保存的最大事件数。
当队列已满时,Logstash 对输入施加反压,以阻止数据流入 Logstash。
七、持久化队列
Logstash 持久队列通过将运行中的消息队列存储到磁盘来帮助防止异常终止期间的数据丢失。
- 有助于防止正常关闭期间和 Logstash 异常终止期间消息丢失。如果在事件正在进行时 Logstash 重新启动,Logstash 会尝试传送存储在持久队列中的消息,直到传送至少成功一次。
- 可以吸收突发事件,而不需要 Redis 或 Apache Kafka 等外部缓冲机制。
默认情况下禁用持久队列。
持久队列不能解决以下问题:
- 不使用请求-响应协议的输入插件无法防止数据丢失。 Tcp、udp、zeromq 推+拉和许多其他输入没有向发送方确认收到的机制。 (beats和http等插件确实具有确认功能,因此受到该队列的良好保护。)
- 如果在提交检查点文件之前发生异常关闭,则可能会丢失数据。
- 持久队列不处理永久性机器故障,例如磁盘损坏、磁盘故障和机器丢失。保留到磁盘的数据不会被复制。
八、死信队列 (DLQ)
死信队列(DLQ)被设计为临时写入无法处理的事件的地方。 DLQ 使您能够灵活地调查有问题的事件,而不会阻塞管道或丢失事件。可以使用 dead_letter_queue 输入插件处理来自 DLQ 的事件。
默认情况下,当 Logstash 遇到由于数据包含映射错误或其他问题而无法处理的事件时,Logstash 管道会挂起或删除不成功的事件。为了防止这种情况下的数据丢失,您可以配置 Logstash 将不成功的事件写入死信队列而不是丢弃它们。
默认情况下禁用死信队列。要启用死信队列,请在logstash.yml设置文件中设置dead_letter_queue_enable选项:
dead_letter_queue.enable: true
当您准备好处理死信队列中的事件时,您可以创建一个使用 dead_letter_queue 输入插件从死信队列中读取事件的管道。
input {
dead_letter_queue {
path => "/path/to/data/dead_letter_queue" # 包含死信队列的顶级目录的路径
commit_offsets => true # 当 true 时,保存偏移量。当管道重新启动时,它将继续从上次停止的位置读取,而不是重新处理队列中的所有项目
pipeline_id => "main" # 写入死信队列的管道的 ID。默认为“main”
clean_consumed => true # 删除已完全消耗的段,从而在处理时释放空间
start_timestamp => "2017-06-06T23:40:37" # 使用 start_timestamp 选项在队列中的特定点开始处理事件
}
}
output {
stdout {
codec => rubydebug { metadata => true } # 将事件(包括元数据)写入标准输出
}
}
九、输入插件
1)、beats
在端口 5044 上侦听传入的 Beats 连接并为 Elasticsearch 建立索引:
input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}" # %{[@metadata][beat]} 将索引名称的第一部分设置为元数据字段的值,%{[@metadata][version]} 将第二部分设置为 Beat 版本。例如:metricbeat-6.1.6。
}
}
2)、dead_letter_queue
用于从 Logstash 的死信队列中读取事件:
input {
dead_letter_queue {
path => "/var/logstash/data/dead_letter_queue"
start_timestamp => "2017-04-04T23:40:37"
}
}
3)、elasticsearch
input {
# Read all documents from Elasticsearch matching the given query
elasticsearch {
hosts => "localhost"
query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
}
}
4)、file
input {
file {
path => "/path/log/*.log"
}
}
5)、redis
input {
redis {
# Redis的key的类型。值可以是以下任意值:list、channel、pattern_channel
data_type => "list"
# Redis 数据库编号
db => 5
# Redis的数据库地址,默认为127.0.0.1
host => "10.0.0.10"
# Redis的端口号
port => 6379
# Redis的认证密码
password => "admin"
# Redis 列表或通道的名称
key => "logstash-input-redis"
}
}
十、输出插件
输出插件将事件数据发送到特定目的地。输出是事件管道的最后阶段。
1)、elasticsearch
使用 mutate 过滤器和条件来添加 [@metadata] 字段来设置每个事件的目标索引:
filter {
if [log_type] in [ "test", "staging" ] {
mutate { add_field => { "[@metadata][target_index]" => "test-%{+YYYY.MM}" } }
} else if [log_type] == "production" {
mutate { add_field => { "[@metadata][target_index]" => "prod-%{+YYYY.MM.dd}" } }
} else {
mutate { add_field => { "[@metadata][target_index]" => "unknown-%{+YYYY}" } }
}
}
output {
elasticsearch {
index => "%{[@metadata][target_index]}"
}
}
2)、file
output {
file {
path => ...
codec => line { format => "custom format: %{message}"}
}
}
3)、kafka
output {
kafka {
codec => json
topic_id => "mytopic" # 生成消息的主题
}
}
4)、redis
output {
redis {
# 指定写入数据的类型
data_type => "list"
# Redis 数据库编号
db => 5
# Redis主机地址
host => "10.0.0.10"
# Redis的端口号
port => 6379
# Redis的认证密码
password => "admin"
# Redis写入的key值
key => "logstash-input-redis"
}
}
十一、过滤插件
过滤器插件对事件执行中间处理。通常根据事件的特征有条件地应用过滤器。
1)、age
用于计算事件年龄的简单过滤器。
此过滤器通过从当前时间戳减去事件时间戳来计算事件的年龄。您可以将此插件与删除过滤器插件一起使用来删除早于某个阈值的 Logstash 事件。
filter {
age {
add_field => { # 一次添加多个字段
"foo_%{somefield}" => "Hello world, from %{host}"
"new_field" => "new_static_value"
}
}
if [@metadata][age] > 86400 {
drop {}
}
}
2)、bytes
将计算机存储大小的字符串表示形式(例如“123 MB”或“5.6gb”)解析为其以字节为单位的数值。
filter {
bytes {
source => "my_bytes_string_field" # 包含存储大小的源字段的名称
target => "my_bytes_numeric_field" # 将包含存储大小(以字节为单位)的目标字段的名称
}
}
3)、date
日期过滤器用于解析字段中的日期,然后使用该日期或时间戳作为事件的 Logstash 时间戳。
filter {
date {
match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
}
}
4)、grok
解析任意文本并对其进行结构化。Grok 是将非结构化日志数据解析为结构化且可查询的数据的好方法。
虚构的 http 请求日志:
55.3.244.1 GET /index.html 15824 0.043
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
在 grok 过滤器之后,事件中将有一些额外的字段:
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043