一、ELFK集群部署(Filebeat+ELK)
在搭建ELK的基础上安装Filebeat服务,Filebeat服务可以布置在以下任意一台主机,本次实验将布置在apache服务器的节点上
步骤一:安装 Filebeat(在apache节点操作)
#上传软件包 filebeat-6.7.2-linux-x86_64.tar.gz 到/opt目录
tar zxvf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64/ /usr/local/filebeat
步骤二:设置 filebeat 的主配置文件
cd /usr/local/filebeat
vim filebeat.yml
filebeat.prospectors:
- type: log #指定 log 类型,从日志文件中读取消息
enabled: true
paths:
#指定监控的日志文件
- /var/log/httpd/access_log
tags: ["filebeat"] #设置索引标签
fields: #可以使用 fields 配置选项设置一些参数字段添加到 output 中
service_name: apache
log_type: access
from: 192.168.73.107
--------------Elasticsearch output-------------------
(全部注释掉)
----------------Logstash output---------------------
output.logstash:
hosts: ["192.168.73.107:5044"] #指定 logstash 的 IP 和端口
步骤三: 新建 Logstash 配置文件
cd /etc/logstash/conf.d
cp system.conf filebeat.conf
vim filebeat.conf
input {
beats { port => "5044" }
}
output {
elasticsearch {
hosts => ["192.168.73.105:9200","192.168.73.106:9200"]
index =>"%{[fields][service_name]}-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug
}
}
filebeat 功能的引入测试
1)创建两个访问页面
echo "<h1>this is filebeat test </h1>" >/var/www/html/test1.html
echo "h1> this is a test </h1>" >/var/www/html/test2.html
2)启动filebeat和logstash
#启动 filebeat
nohup ./filebeat -e -c filebeat.yml > filebeat.out &
#-e:输出到标准输出,禁用syslog/文件输出
#-c:指定配置文件
#nohup:在系统后台不挂断地运行命令,退出终端不会影响程序的运行
#启动 logstash
cd /etc/logstash/conf.d
logstash -f filebeat.conf
3)进行访问测试
此时:屏幕输出的日志以及发生改变(Logstash的新建配置文件中output模块中stdout)
4)kibana中创建于apache相关的索引,并且通过时序,可以查看到访问日志的信息
解决日志与kibana时间不一致的方法
二、 Logstash的过滤模块
2.1 Logstash配置文件中的模块
(1)input {}
- 指定输入流,通过file、beats、kafka、redis中获取数据
(2)filter {}
常用插件:
- grok:对若干个大文本字段进行再分割,分割成一些小字段 (?<字段名>正则表达式) 字段名:正则表示匹配到的内容
- date:对数据中的时间进行统一格式化
- mutate:对一些无用的字段进行剔除,或增加字段
- mutiline:对多行数据进行统一编排,多行合并和拆分
(3)ourput {}
- elasticsearch stdout
2.2 Filter(过滤模块)中的插件
而对于 Logstash 的 Filter,这个才是 Logstash 最强大的地方。Filter 插件也非常多,我们常用到的 grok、date、mutate、mutiline 四个插件。
对于 filter 的各个插件执行流程,可以看下面这张图:
grok插件(通过grok插件实现对字段的分割,使用通配符)
这里就要用到 logstash 的 filter 中的 grok 插件。filebeat 发送给 logstash 的日志内容会放到message 字段里面,logstash 匹配这个 message 字段就可以了。
2.3 kibana 的DEV Tools 中 Grok Debugger工具的运用
在此我们以apache访问日志索引中的message字段为例 ,进行数据切片:
假设我们要进行的切片信息为messge中的
192.168.73.105 - - [13/Nov/2022:02:57:13 +0800] "GET /test1.html HTTP/1.1" 304 - "-" "Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0"
数据切片的格式为:
匹配格式:(?<字段名>正则表达式)
切片案例一:分离访问用户的IP
(?<remote_addr>%{IPV6}|%{IPV4} )(?<other_info>.+)
切片案例二:以上面的为基础,将时间分离出来
(?<remote_addr>%{IPV4}|%{IPV6})[\s-]+\[(?<logTime>.+)](?<other_info>.+)
切片案例三:取出状态码和http方法
(?<remote_addr>%{IPV4}|%{IPV6})[\s-]+\[(?<logTime>.+)] "GET (?<http_method>.+")[\s](?<status_code>\d+) -\s"-"\s(?<other_info>.+)
2.4 数据切片的实战演练
切片前的操作
vim apache_log.conf
input {
file{
path => "/etc/httpd/logs/access_log"
type => "access"
start_position => "beginning"
}
}
output {
if [type] == "access" {
elasticsearch {
hosts => ["192.168.73.105:9200","192.168.73.105:9200"]
index => "apache_access-%{+YYYY.MM.dd}"
}
}
}
logstash -f apache_log.conf
对apche服务进行访问
进行切片分离
修改logstash的控制conf
vim apache_log.conf
input {
file{
path => "/etc/httpd/logs/access_log"
type => "access"
start_position => "beginning"
}
}
filter {
grok {
match => ["message","(?<remote_addr>%{IPV4}|%{IPV6})[\s-]+\[(?<logTime>.+)] \"GET (?<http_method>.+\")[\s](?<status_code>\d+) -\s\"-\"\s(?<other_info>.+)"]
}
}
output {
if [type] == "access" {
elasticsearch {
hosts => ["192.168.73.105:9200","192.168.73.105:9200"]
index => "apache_access-%{+YYYY.MM.dd}"
}
}
}
logstash -f apache_log.conf
再次进行访问测试
三、filebeat中multiline 插件的引入
java错误日志一般都是一条日志很多行的,会把堆栈信息打印出来,当经过 logstash 解析后,每一行都会当做一条记录存放到 EslaticSearch中,那这种情况肯定是需要处理的。这里就需要使用 multiline 插件,对属于同一个条日志的记录进行拼接。
3.1 安装 multiline 插件
在线安装插件
cd /usr/share/logstash
bin/logstash-plugin install logstash-filter-multiline
离线安装插件
先在有网的机器上在线安装插件,然后打包,拷贝到服务器,执行安装命令
bin/logstash-plugin install file:///usr/share/logstash/logstash-offline-plugins-6.7.2.zip
检查下插件是否安装成功,可以执行以下命令查看插件列表
bin/logstash-plugin list
3.2 使用 multiline 插件 进行日志合并
日志合并的过程:
-
第一步:每一条日志的第一行开头都是一个时间,可以用时间的正则表达式匹配到第一行。
-
第二步:然后将后面每一行的日志与第一行合并。
-
第三步:当遇到某一行的开头是可以匹配正则表达式的时间的,就停止第一条日志的合并,开始合并第二条日志。
-
第四步:重复第二步和第三步
filter {
multiline {
pattern => "^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}.\d{3}"
negate => true
what => "previous"
}
}
pattern: 用来匹配文本的表达式,也可以是grok表达式
what: 如果pattern匹配成功的话,那么匹配行是归属于上一个事件,还是归属于下一个事件。previous: 归属于上一个事件,向上合并。next: 归属于下一个事件,向下合并
negate:是否对 pattern 的结果取反。false: 不取反,是默认值。true: 取反。将多行事件扫描过程中的行匹配逻辑取反(如果pattern匹配失败,则认为当前行是多行事件的组成部分)
3.3 多行被拆分
Java 堆栈日志太长了,有 100 多行,被拆分了两部分,一部分被合并到了原来的那一条日志中,另外一部分被合并到了不相关的日志中。
为了解决这个问题,可以通过配置 filebeat 的 multiline 插件来截断日志。至于为什么不用 logstash 的 multiline 插件呢?因为在 filter 中使用 multiline 没有截断的配置项。filebeat 的 multiline 配置项如下:
multiline.type: pattern
multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}.\d{3}'
multiline.negate: true
multiline.match: after
multiline.max_lines: 50
配置项说明:
-
multiline.pattern:希望匹配到的结果(正则表达式)
-
multiline.negate:值为 true 或 false。使用 false 代表匹配到的行合并到上一行;使用 true 代表不匹配的行合并到上一行
-
multiline.match:值为 after 或 before。after 代表合并到上一行的末尾;before 代表合并到下一行的开头
-
multiline.max_lines:合并的最大行数,默认 500
-
multiline.timeout:一次合并事件的超时时间,默认为 5s,防止合并消耗太多时间导致 filebeat 进程卡死
四、mutate 插件
当我们将日志解析出来后,Logstash 自身会传一些不相关的字段到 ES 中,这些字段对我们排查线上问题帮助不大。可以直接剔除掉。这里我们就要用到 mutate 插件了。它可以对字段进行转换,剔除等。
一般会把把 log.offset 字段去掉,这个字段可能会包含很多无意义内容。
filter{
mutate {
remove_field => ["host" "[log][offset]"]
}
}