文章目录
- 1.修改filebeat配置文件filebeat.yml收集日志转发(生产)给kafka
- 2.修改logstash配置从kafka中消费日志,并输出到kibana前端展示
在ELFK的基础上,添加kafka做数据缓冲
附kafka消息队列
nginx服务器配置filebeat收集日志:192.168.17.50,修改配置将采集到的日志转发给kafka;
kafka集群:192.168.17.51,192.168.17.52,192.168.17.53(生产和消费端口9092);
logstash+kibana:192.168.17.54,修改配置从kafka中消费日志,并输出到kibana前端展示;
elasticsearch群集:192.168.17.55,192.168.17.56,对格式化后的数据进行索引和存储。
1.修改filebeat配置文件filebeat.yml收集日志转发(生产)给kafka
2.修改logstash配置从kafka中消费日志,并输出到kibana前端展示
input {
kafka {
bootstrap_servers => "192.168.17.50:9092,192.168.17.51:9092,192.168.17.52:9092"
topics => "nginx_log"
type => "nginx_log"
codec => "json"
auto_offset_reset => "latest"
decorate_events => true
}
}
filter {
grok {
match => ["message", "(?<remote_addr>%{IPV4}|%{IPV6})[\s-]+\[(?<logTime>.+)\] \"(?<http_method>.+) (?<url_path>/.*) (?<http_ver>.+)\" (?<rev_code>\d+) \d+ \".*\" \"(?<User_agent>.+)\" \".*\""]
}
mutate {
replace => { "host" => "nginx_server" }
}
date {
match => ["logTime","dd/MMM/yyyy:HH:mm:ss Z"]
timezone => "Asia/Shanghai"
}
}
output {
if [source] == "/var/log/nginx/access.log" {
elasticsearch {
hosts => ["192.168.116.60:9200","192.168.116.70:9200"]
index => "nginx_access-%{+YYYY.MM.dd}"
}
}
if [source] == "/var/log/nginx/error.log" {
elasticsearch {
hosts => ["192.168.116.60:9200","192.168.116.70:9200"]
index => "nginx_error-%{+YYYY.MM.dd}"
}
}
stdout {
codec => rubydebug
}
}
开启logstash,此时访问web测试页面,就可以在kibana对日志收集分析了
在kibana中收集和分析