Elk+Filebeat+Kafka实现日志收集(本机nginx)
部署Zookeeper
1.实验组件
#准备3台服务器做Zookeeper集群
20.0.0.10
20.0.0.20
20.0.0.30
2.安装前准备
#关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
setenforce 0
#安装JDK
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version
#将apache-zookeeper-3.5.7-bin.tar.gz压缩包上传至/opt目录
3.安装Zookeeper
#三台服务器一齐操作
cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /opt/zookeeper
#修改配置文件
cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
--2--
tickTime=2000
#通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
--5--
initLimit=10
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s
--8--
syncLimit=5
#Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
--12--修改
dataDir=/opt/zookeeper/data
#指定保存Zookeeper中的数据的目录,目录需要单独创建
--添加--
dataLogDir=/opt/zookeeper/logs
#指定存放日志的目录,目录需要单独创建
--15--
clientPort=2181
#客户端连接端口
--末尾添加集群信息--
server.1=20.0.0.10:3188:3288
server.2=20.0.0.20:3188:3288
server.3=20.0.0.30:3188:3288
#在每个节点上创建数据目录和日志目录
mkdir /opt/zookeeper/data
mkdir /opt/zookeeper/logs
#在每个节点的dataDir指定的目录下创建一个 myid 的文件,不同节点分配1、2、3
echo 1 > /opt/zookeeper/data/myid
echo 2 > /opt/zookeeper/data/myid
echo 3 > /opt/zookeeper/data/myid
#配置 Zookeeper 启动脚本
vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/opt/zookeeper'
case $1 in
start)
echo "---------- zookeeper 启动 ------------"
$ZK_HOME/bin/zkServer.sh start
;;
stop)
echo "---------- zookeeper 停止 ------------"
$ZK_HOME/bin/zkServer.sh stop
;;
restart)
echo "---------- zookeeper 重启 ------------"
$ZK_HOME/bin/zkServer.sh restart
;;
status)
echo "---------- zookeeper 状态 ------------"
$ZK_HOME/bin/zkServer.sh status
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
#设置开机自启
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper
#分别启动 Zookeeper
service zookeeper start
#查看当前状态
service zookeeper status
部署Kafka(3.4.1版本)
1.安装Kafka
cd /opt
--上传kafka_2.13-3.4.1.tgz--
tar -xf kafka_2.13-3.4.1.tgz
mv kafka_2.13-3.4.1 kafka
cd kafka/config/
cp server.properties server.properties.bak
vim server.properties
--24--
broker.id=1
#broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=2、broker.id=3
--34--
listeners=PLAINTEXT://20.0.0.10:9092
#每台服务器分别为10、20、30,不用加地址映射
--62--
log.dirs=/var/log/kafka
#kafka运行日志存放的路径,也是数据存放的路径
--125--
zookeeper.connect=20.0.0.10:2181,20.0.0.20:2181,20.0.0.30:2181
#配置连接Zookeeper集群地址
#修改全局配置
vim /etc/profile
--添加--
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
#配置Zookeeper启动脚本
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/opt/kafka'
case $1 in
start)
echo "---------- Kafka 启动 ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka 状态 ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ];then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
#设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka
#分别启动Kafka
service kafka start
2.命令行测试
#创建topic
kafka-topics.sh --create --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --replication-factor 2 --partitions 3 --topic test1
#查看当前服务器中的所有 topic
kafka-topics.sh --list --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092
#发布消息
kafka-console-producer.sh --broker-list 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1
#消费消息
kafka-console-consumer.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1 --from-beginning
#修改分区数
kafka-topics.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --alter --topic test1 --partitions 6
#删除 topic
kafka-topics.sh --delete --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1
部署Filebeat
1.安装Filebeat
#10
cd /opt/
--上传filebeat-6.7.2-linux-x86_64.tar.gz--
tar -xf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64 filebeat
vim /etc/logstash/logstash.yml
--64--
path.config: /opt/log
systemctl restart logstash
2.时间同步
#所有节点
yum -y install ntpdate
ntpdate ntp.aliyun.com
date
3.配置filebeat
#给nginx日志文件赋权
cd /var/log/nginx/
chmod 777 access.log error.log
#配置filebeat
cd /usr/local/filebeat
vim filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
- /var/log/nginx/error.log
tags: ["nginx"]
fields:
service_name: 20.0.0.10_nginx
log_type: nginx
from: 20.0.0.10
output.kafka:
enabled: true
hosts: ["20.0.0.10:9092","20.0.0.20:9092","20.0.0.30:9092"]
topic: "nginx"
--------------Elasticsearch output-------------------
(全部注释掉)
----------------Logstash output---------------------
(全部注释掉)
nohup ./filebeat -e -c filebeat.yml > filebeat.out &
#启动filebeat
4.配置logstash
cd /opt/log/
vim kafka.conf
input {
kafka {
bootstrap_servers => "20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092"
topics => "nginx"
type => "nginx_kafka"
codec => "json"
auto_offset_reset => "earliest"
decorate_events => true
}
}
output {
if "nginx" in [tags] {
elasticsearch {
hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
index => "nginx_access-%{+YYYY.MM.dd}"
}
}
stdout { codec => rubydebug }
}
logstash -f kafka.conf --path.data /opt/test1
日志收集(远程Apache+Mysql)
部署Filebeat
1.安装配置filebeat
#收集81服务器上的mysql和apache日志
cd /opt/
--上传filebeat-6.7.2-linux-x86_64.tar.gz--
tar -xf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64 filebeat
cd filebeat/
vim filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /etc/httpd/logs/access_log
- /etc/httpd/logs/error_log
tags: ["httpd_81"]
fields:
service_name: 20.0.0.81_httpd
log_type: httpd
from: 20.0.0.81
- type: log
enabled: true
paths:
- /usr/local/mysql/data/mysql_general.log
tags: ["mysql_81"]
fields:
service_name: 20.0.0.81_mysql
log_type: mysql
from: 20.0.0.81
output.kafka:
enabled: true
hosts: ["20.0.0.10:9092","20.0.0.20:9092","20.0.0.30:9092"]
topic: "httpdmysql"
--------------Elasticsearch output-------------------
(全部注释掉)
----------------Logstash output---------------------
(全部注释掉)
nohup ./filebeat -e -c filebeat.yml > filebeat.out &
#启动filebeat
2.配置logstash
10:
cd /opt/log/
vim 81_a+m.conf
input {
kafka {
bootstrap_servers => "20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092"
topics => "httpdmysql"
type => "httpd+mysql_kafka"
codec => "json"
auto_offset_reset => "earliest"
decorate_events => true
}
}
output {
if "httpd_81" in [tags] {
elasticsearch {
hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
index => "nginx_access-%{+YYYY.MM.dd}"
}
}
if "mysql_81" in [tags] {
elasticsearch {
hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
index => "nginx_access-%{+YYYY.MM.dd}"
}
}
stdout { codec => rubydebug }
}
logstash -f 81_a+m.conf --path.data /opt/test2