文章目录
- ELK整合实战
- 使用FileBeats将日志发送到Logstash
- 配置Logstash接收FileBeat收集的数据并打印
- Logstash输出数据到Elasticsearch
- 利用Logstash过滤器解析日志
- Grok插件
- Grok语法
- 用法
- 输出到Elasticsearch指定索引
前文:FileBeats详解
前文:logstash详解
ELK整合实战
案例:采集SpringBoot应用日志
一个springboot应用,打了一个jar包,使用nohup java -jar XXX.jar &
运行
日志内容如下所示
2024-08-16 16:04:04.605 INFO 9164 --- [nio-8081-exec-1] com.hs.single.controller.UserController : 请求参数为:1
Creating a new SqlSession
SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4196ffc2] was not registered for synchronization because synchronization is not active
JDBC Connection [com.alibaba.druid.proxy.jdbc.ConnectionProxyImpl@790df42b] will not be managed by Spring
==> Preparing: SELECT id,username,password,name,description,status,create_time,update_time FROM sys_user WHERE id=?
==> Parameters: 1(Long)
<== Total: 0
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@4196ffc2]
使用FileBeats将日志发送到Logstash
创建配置文件filebeat-logstash.yml,配置FileBeats将数据发送到Logstash
vim filebeat-logstash.yml
#因为Tomcat的web log日志都是以IP地址开头的,我们应用日志一般是以日期时间开头。所以我们需要修改下匹配字段。
# 不以日期时间开头的行追加到上一行
filebeat.inputs:
- type: log
enabled: true
paths:
- /root/application/nohup.out
multiline.pattern: '^\d{4}-\d{2}-\d{2}'
multiline.negate: true
multiline.match: after
output.logstash:
enabled: true
hosts: ["192.168.75.65:5044"]
- pattern:正则表达式
- negate:true 或 false;默认是false,匹配pattern的行合并到上一行;true,不匹配pattern的行合并到上一行
- match:after 或 before,合并到上一行的末尾或开头
# 启动可能会报错 因为安全原因不要其他用户写的权限,去掉写的权限就可以了,
chmod 644 filebeat-logstash.yml
启动FileBeat,并指定使用指定的配置文件
# -e 前台运行 -c 指定配置文件
./filebeat -e -c filebeat-logstash.yml
可能出现的异常:
Exiting: error loading config file: config file ("filebeat-logstash.yml") can only be writable by the owner but the permissions are "-rw-rw-r--" (to fix the permissions use: 'chmod go-w /home/es/filebeat-7.17.3-linux-x86_64/filebeat-logstash.yml')
因为安全原因不要其他用户写的权限,去掉写的权限就可以了
Failed to connect to backoff(async(tcp://192.168.75.65:5044)): dial tcp 192.168.75.65:5044: connect: connection refused
FileBeat将尝试建立与Logstash监听的IP和端口号进行连接。但此时,我们并没有开启并配置Logstash,所以FileBeat是无法连接到Logstash的。
配置Logstash接收FileBeat收集的数据并打印
输入的使用5044端口接收输入,输出是直接在控制台打印,查看是否能正常接收数据
vim config/filebeat-console.conf
# 配置从FileBeat接收数据
input {
beats {
port => 5044
}
}
output {
stdout {
codec => rubydebug
}
}
启动logstash
# -t检查配置文件是否存在语法错误
[root@hs-es-node1 logstash-7.17.3]# bin/logstash -t -f config/filebeat-console.conf
[root@hs-es-node1 logstash-7.17.3]# bin/logstash -f config/filebeat-console.conf
调用接口,filebeat成功将数据发送至logstash中
Logstash输出数据到Elasticsearch
如果我们需要将数据输出值ES而不是控制台的话,我们修改Logstash的output配置。
vim config/filebeat-elasticSearch.conf
input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => ["http://192.168.75.65:9200"]
user => "elastic"
password => "123456"
}
stdout{
codec => rubydebug
}
}
启动logstash
# -t检查配置文件是否存在语法错误
[root@hs-es-node1 logstash-7.17.3]# bin/logstash -t -f config/filebeat-elasticSearch.conf
[root@hs-es-node1 logstash-7.17.3]# bin/logstash -f config/filebeat-elasticSearch.conf
ES中会生成一个以logstash开头的索引,测试日志是否保存到了ES。
思考:日志信息都保证在message字段中,是否可以把日志进行解析一个个的字段?例如:时间、日志级别、哪个类打印的日志、日志具体内容。
利用Logstash过滤器解析日志
在Logstash中可以配置过滤器Filter对采集到的数据进行过滤处理,Logstash中有大量的插件可以供我们使用。
# 查看Logstash已经安装的插件,默认已经安装了grok插件
[root@hs-es-node1 logstash-7.17.3]# bin/logstash-plugin list
Grok插件
Grok是一种将非结构化日志解析为结构化的插件。这个工具非常适合用来解析系统日志、Web服务器日志、MySQL或者是任意其他的日志格式。
https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-grok.html
Grok语法
Grok是通过模式匹配的方式来识别日志中的数据,可以把Grok插件简单理解为升级版本的正则表达式。它拥有更多的模式,默认Logstash拥有120个模式。如果这些模式不满足我们解析日志的需求,我们可以直接使用正则表达式来进行匹配。
grok模式的语法是:
# SYNTAX(语法)指的是Grok模式名称,SEMANTIC(语义)是给模式匹配到的文本字段名。
# 注意这中间不要有空格
%{SYNTAX:SEMANTIC}
# 案例 duration表示:匹配一个数字,client表示匹配一个IP地址。
%{NUMBER:duration} %{IP:client}
默认在Grok中,所有匹配到的的数据类型都是字符串,如果要转换成int类型(目前只支持int和float),可以这样:
%{NUMBER:duration:int} %{IP:client}
常用的Grok模式
https://help.aliyun.com/document_detail/129387.html?scm=20140722.184.2.173
用法
比如,tomacat日志
192.168.65.103 - - [23/Jun/2022:22:37:23 +0800] "GET /docs/images/docs-stylesheet.css HTTP/1.1" 200 5780
字段名 | 说明 |
---|---|
client IP | 浏览器端IP |
date | 请求的时间戳 |
method | 请求方式(GET/POST) |
uri | 请求的链接地址 |
status | 服务器端响应状态 |
length | 响应的数据长度 |
%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA:protocol}\" %{INT:status} %{INT:length}
为了方便测试,我们可以使用Kibana来进行Grok开发:
我们现在的日志和对应的grok语法:
2024-08-16 16:04:04.605 INFO 9164 --- [nio-8081-exec-1] com.hs.single.controller.UserController : 请求参数为:1
%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{INT:thread} --- \[%{NOTSPACE:serverPort}\] %{JAVACLASS:class} : %{GREEDYDATA:logResult}
修改Logstash配置文件
vim config/filebeat-elasticSearch.conf
input {
beats {
port => 5044
}
}
# 添加下面的filter
# 因为我们的消息是保存在message字段中,所以下面主要就是匹配message字段中的值
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{INT:thread} --- \[%{NOTSPACE:serverPort}\] %{JAVACLASS:class} : %{GREEDYDATA:logResult}"
}
}
}
output {
stdout {
codec => rubydebug
}
}
启动logstash测试
# --config.reload.automatic修改了配置文件能热加载
bin/logstash -f config/filebeat-elasticSearch.conf --config.reload.automatic
使用mutate插件过滤掉不需要的字段
mutate {
enable_metric => "false"
remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]
}
如果要将日期格式进行转换,我们可以使用Date插件来实现。该插件专门用来解析字段中的日期,官方说明文档:https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-date.html
用法如下:
将date字段转换为「年月日 时分秒」格式。默认字段经过date插件处理后,会输出到@timestamp字段,所以,我们可以通过修改target属性来重新定义输出字段。
# [字段名,当前时间格式,修改后的时间格式] 我们这里可以不用改,但是需要了解这个功能
date {
match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]
target => "date"
}
输出到Elasticsearch指定索引
index来指定索引名称,默认输出的index名称为:logstash-%{+yyyy.MM.dd}。
但注意,要在index中使用时间格式化,filter的输出必须包含 @timestamp字段,否则将无法解析日期。
output {
elasticsearch {
hosts => ["http://192.168.75.65:9200"]
index => "app_web_log_%{+YYYY-MM-dd}"
user => "elastic"
password => "123456"
}
stdout{
codec => rubydebug
}
}
注意:index名称中,不能出现大写字符
完整的Logstash配置文件
input {
beats {
port => 5044
}
}
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{INT:thread} --- \[%{NOTSPACE:serverPort}\] %{JAVACLASS:class} : %{GREEDYDATA:logResult}"
}
}
mutate {
enable_metric => "false"
remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]
}
# 我们这里可以不用改,但是需要了解这个功能
#date {
# match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]
# target => "date"
#}
}
output {
elasticsearch {
hosts => ["http://192.168.75.65:9200"]
index => "app_web_log_%{+YYYY-MM-dd}"
user => "elastic"
password => "123456"
}
stdout{
codec => rubydebug
}
}