# Consume events from Kafka. Each record is decoded as JSON by the codec, so
# the fields available to the filter stage are whatever keys the JSON carries.
input {
  kafka {
    topics => ["xxxx"]
    bootstrap_servers => "ip:port"
    auto_offset_reset => "xxxx"
    group_id => "xxxx"
    # Number of consumer threads started by this input.
    consumer_threads => 3
    codec => "json"
  }
}
# Parse an access-log style line held in "message" and regroup the captured
# values under a nested [parsed] object, dropping the flat top-level copies.
filter {
  grok {
    # The square brackets around the timestamp are escaped so they match the
    # literal "[" and "]" in the log line instead of opening a regex character
    # class. Quotes inside the pattern are plain ASCII double quotes, matching
    # what an access log actually contains.
    # NOTE(review): the pattern requires a URI referer and a trailing "-"
    # field; lines whose referer is "-" will not match. Confirm this is intended.
    match => {
      "message" => '%{IP:client_ip} - - \[%{HTTPDATE:timestamp}\] "%{WORD:http_method} %{URIPATHPARAM:request_path} HTTP/%{NUMBER:http_version}" %{NUMBER:http_response_code} %{NUMBER:response_size} "%{URI:http_referer}" "%{DATA:user_agent}" "-"'
    }
  }
  mutate {
    # Copy each grok capture into the nested [parsed] object.
    add_field => {
      "[parsed][client_ip]" => "%{client_ip}"
      "[parsed][timestamp]" => "%{timestamp}"
      "[parsed][http_method]" => "%{http_method}"
      "[parsed][request_path]" => "%{request_path}"
      "[parsed][http_version]" => "%{http_version}"
      "[parsed][http_response_code]" => "%{http_response_code}"
      "[parsed][response_size]" => "%{response_size}"
      "[parsed][http_referer]" => "%{http_referer}"
      "[parsed][user_agent]" => "%{user_agent}"
    }
    # NOTE(review): this merges the "message" value into "client_ip". The mutate
    # filter appears to run merge before the common add_field option, so
    # [parsed][client_ip] may end up containing the raw message as well.
    # Kept as in the original; confirm whether it is actually wanted.
    merge => { "client_ip" => "message" }
    # Drop the flat top-level captures once they have been copied to [parsed].
    remove_field => ["client_ip", "timestamp", "http_method", "request_path", "http_version", "http_response_code", "response_size", "http_referer", "user_agent"]
  }
}
# Ship the processed events to Elasticsearch using basic authentication.
output {
  elasticsearch {
    user => "elastic"
    password => "xxxxx"
    hosts => ["ip:port"]
    # The index name is built from the event's [type] field and
    # [@metadata][index_day].
    # NOTE(review): nothing in this pipeline sets either field; they must come
    # from the incoming JSON or from configuration not shown here, otherwise
    # the placeholders stay unresolved in the index name. Verify.
    index => "%{[type]}-%{[@metadata][index_day]}"
    # NOTE(review): document_type is deprecated in newer versions of the
    # Elasticsearch output plugin; confirm it is supported by the deployed version.
    document_type => "%{[type]}"
  }
}
最终实现的效果: