- 配置file beat输出到 Kafka
- logstash服务器从kafka获取数据并输出到es集群
- 在es集群上查看索引
- kibana界面添加索引查看数据
1.配置file beat输出到 Kafka
1.1 Filebeat机器配置数据采集和输出目标
做好域名解析
# vim /usr/local/filebeat/filebeat.yml
# 修改输出目标为kafka集群
output.kafka:
# initial brokers for reading cluster metadata
hosts: ["kafka01:9092", "kafka02:9092", "kafka03:9092"]topic: 'nginx'
partition.round_robin:
reachable_only: falserequired_acks: 1
compression: gzip
max_message_bytes: 1000000
1.2 启动file beat
1.3 kafka集群上验证kafka是否生成topic
[root@es03 kafka]# ./bin/kafka-topics.sh --zookeeper 192.168.19.20:2181 --list
2.logstash服务器从kafka获取数据并输出到es集群
2.1 配置logstash
# vim /usr/local/logstash/config/first-pipeline.conf
input {
kafka {
type => "nginx_log"
codec => "json"
topics => ["nginx"]
decorate_events => true
bootstrap_servers => "192.168.19.20:9092, 192.168.19.21:9092, 192.168.19.22:9092"
}
}filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" } }
}output {
stdout {}
if [type] == "nginx_log" { #和上面的type一致
elasticsearch {
index => "nginx-%{+YYYY.MM.dd}"
codec => "json"
hosts => ["192.168.19.20:9200","192.168.19.21:9200","192.168.19.22:9200"]
}
}
}
2.2 配置解析
decorate_events => true
默认是 false` 这将向logstash 事件添加一个名为kafka的字段 ,这包含以下属性。
topic 主题:与此消息相关联的主题
consumer_group
使用者群组:此事件中用来读取的使用者群组
partition
分区:与此消息关联的分区
offset
偏移量:与此消息关联的分区的偏移量
key
:包含message key
的ByteBuffer
2.2 启动 logstash
[root@logstash logstash]# ./bin/logstash -f config/first-pipeline.conf --config.reload.automatic
3.在es集群上查看索引
[root@es01 kafka]# curl -X GET "192.168.19.20:9200/_cat/indices"
4. kibana界面添加索引查看数据