部署Filebeat+Kafka+Logstash+Elasticsearch+Kibana集群详解
- 1. Kafka
- 1.1 Kafka概述
- 1.1.1 为什么需要消息队列(MQ)
- 1.1.2 使用消息队列的好处
- 1.2 消息队列的两种模式
- 1.3 Kafka定义
- 1.3.1 Kafka简介
- 1.3.2 Kafka的特性
- 1.3.3 Kafka系统架构
- 1.3.4 Partation数据路由规则
- 1.3.5 分区的原因
- 1.4 消息队列如何选择?
- 2.中间件
- 3.部署kafka集群
- 4.部署Filebeat+Kafka+ELK集群
- 5.知识点总结
- 5.1 中间件
- 5.2 消息队列
- 5.3 kafka架构
- 5.4 kafka的ack应答机制
参见安装与部署ELK详解
参见安装与部署EFLK详解
参见安装与部署Zookeeper集群详解
1. Kafka
1.1 Kafka概述
1.1.1 为什么需要消息队列(MQ)
主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发too many connection错误,引发雪崩效应。
我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队列常应用于异步处理,流量削峰,应用解耦,消息通讯等场景。
当前比较常见的MQ中间件有ActiveMQ、RabbitMQ、RocketMQ、Kafka、pulsar等。
1.1.2 使用消息队列的好处
(1)解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
(2)可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
(3)缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
(4)灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
(5)异步通信
很多时候,用户不想也不需要立即处理消息。息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
1.2 消息队列的两种模式
(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
消息生产者生产消息发送到消息队列中,然后消息消费者从消息队列中取出并且消费消息。消息被消费以后,消息队列中不再有存储,所以消息消费者不可能消费到已经被消费的消息。消息队列支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
(2)发布/订阅模式(一对多,又叫观察者模式,消费者消费数据之后不会清除消息)
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
发布/订阅模式是定义对象间一种一对多的依赖关系,使得每当一个对象(目标对象)的状态发生改变,则所有依赖于它的对象(观察者对象)都会得到通知并自动更新。
1.3 Kafka定义
Kafka是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据领域的实时计算以及日志收集。
1.3.1 Kafka简介
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于Zookeeper协调的分布式消息中间件系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于hadoop的批处理系统、低延迟的实时系统、Spark/Flink流式处理引擎,nginx访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
1.3.2 Kafka的特性
高吞吐量、低延迟
Kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒。每个topic可以分多个Partition,Consumer Group对Partition进行消费操作,提高负载均衡能力和消费能力。
可扩展性
kafka集群支持热扩展
持久性、可靠性
消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性
允许集群中节点失败(多副本情况下,若副本数量为n,则允许 n-1个节点失败)
高并发
支持数千个客户端同时读写
1.3.3 Kafka系统架构
(1)Broker
一台kafka服务器就是一个broker。一个集群由多个broke 组成。一个broker可以容纳多个 topic。
(2)Topic
可以理解为一个队列,生产者和消费者面向的都是一个topic。
类似于数据库的表名或者ES的index
物理上不同topic的消息分开存储
(3)Partition
为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分割为一个或多个partition,每个partition是一个有序的队列。Kafka只保证partition内的记录是有序的,而不保证topic中不同partition的顺序。
每topic至少有一个partition,当生产者产生数据的时候,会根据分配策略选择分区,然后将消息追加到指定的分区的队列末尾。
1.3.4 Partation数据路由规则
1.指定了patition,则直接使用;
2.未指定patition但指定key(相当于消息中某个属性),通过对key的value进行hash取模,选出一个patition;
3.patition和key都未指定,使用轮询选出一个patition。
每条消息都会有一个自增的编号,用于标识消息的偏移量,标识顺序从0开始。
每个partition中的数据使用多个segment文件存储。
如果topic有多个partition,消费数据时就不能保证数据的顺序。严格保证消息的消费顺序的场景下(例如商品秒杀、 抢红包),需要将partitio数目设为1。
- broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
- 如果某topic有N个partition,集群有 (N+M) 个broker,那么其中有N个broker存储topic的一个 partition, 剩下的M个broker不存储该topic的partition数据。
- 如果某topic有N个partition,集群中broker数目少于N 个,那么一个broker存储该topi 的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
-
1.3.5 分区的原因
- 方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
- 可以提高并发,因为可以以Partition为单位读写了。
(4)Replica
副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个 follower。
(5)Leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
(6)Follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower 与Leader保持数据同步。Follower只负责备份,不负责数据的读写。
如果Leader故障,则从Follower中选举出一个新的Leader。
当Follower挂掉、卡住或者同步太慢,Leader会把这个Follower从 ISR(Leader 维护的一个和 Leader保持同步的Follower集合) 列表中删除,重新创建一个Follower。
(7)Producer
生产者即数据的发布者,该角色将消息push发布到Kafka的topic中。
broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。
生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
(8)Consumer
消费者可以从broker中pull拉取数据。消费者可以消费多个topic中的数据。
(9)Consumer Group(CG)
消费者组,由多个consumer组成。
所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。可为每个消费者指定组名,若不指定组名则属于默认的组。
将多个消费者集中到一起去处理某一个Topic的数据,可以更快的提高数据的消费能力。
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,防止数据被重复读取。
消费者组之间互不影响。
(10)offset偏移量
可以唯一的标识一条消息。
偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息(即消费位置)。
消息被消费之后,并不被马上删除,这样多个业务就可以重复使用Kafka的消息。
某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制。
消息最终还是会被删除的,默认生命周期为 1 周(7*24小时)。
(11)Zookeeper
Kafka通过Zookeeper来存储集群的meta信息。
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中;从0.9版本开始,consumer默认将offset保存在Kafka 一个内置的topic 中,该topic为__consumer_offsets。
也就是说,zookeeper的作用就是,生产者push数据到kafka集群,就必须要找到kafka集群的节点在哪里,这些都是通过zookeeper去寻找的。消费者消费哪一条数据,也需要zookeeper的支持,从zookeeper获得offset,offset记录上一次消费的数据消费到哪里,这样就可以接着下一条数据进行消费。
1.4 消息队列如何选择?
综上所述,选择哪一种消息中间件需要根据具体的应用场景和需求来决定。
-
ActiveMQ作为老牌的消息队列,吞吐量比较低,也缺少大规模吞吐量场景的验证、社区活跃度也很低,数据持久化的支持一般,目前渐渐被淘汰,已经不是主流了,不太建议选择了。
-
RabbitMQ和RocketMQ社区比较活跃,吞吐量比较高,支持AMQP,稳定性也比较好,如果你的场景是应用需要可靠性消息传递和较高的并发,那么这两者是比较好的选择。
要注意,rabbitMQ是使用Erlang语言开发的,而RocketMQ则使用Java语言开发,所以如果是需要深度研究掌握的话,要考虑团队中是否有Erlang工程师,如果不具备相关的人才储备的话,更建议选择RocketMQ。当然,如果只是小团队简单使用,则
rabbitMQ是一个挺好的选择。
如果是大数据领域的实时计算、日志采集等场景,则选择Kafka和Pulsar都是一个不错的选择,Kafka经历了超大规模应用的验证,社区活跃度很高,性能也非常高,几乎是全世界这个领域的事实性的标准。
Pulsar作为新兴的分布式消息传递系统,可扩展性强、性能高、社区活跃度也很高,最重要的是支持存储和计算分离,这在云原生下是非常出色的一项能力,并且天然支持跨数据中心的容灾,目前的应用也越来越广泛,如果集群对于持久化要求高,数据级别是超大规模,对于机器成本敏感,且支持多数据中心容灾,则建议选择Pulsar。
2.中间件
中间件是一种独立的系统软件或服务程序,分布式应用软件借助这种软件在不同的技术之间共享资源中间件位于客户机/服务器的操作系统之上,管理计算机资源和网络通讯是连接两个独立应用程序或独立系统的软件。相连接的系统,即使它们具有不同的接口
但通过中间件相互之间仍能交换信息。执行中间件的一个关键途径是信息传递通过中间件,应用程序可以工作于多平台或OS环境。
jdk: jdk是java的开发工具包,它是一种用于构建在Java平台上发布的应用程序、applet和组件的开发环境
3.部署kafka集群
###关闭和禁止防火墙开机自启功能
systemctl stop firewalld
systemctl disable firewalld
setenforce 0
sed -i 's/enforcing/disabled/' /etc/selinux/config
(1)安装kafka,将安装包上传到/opt目录中
cd /opt/
rz -E
#kafka_2.13-2.7.1.tgz
tar xf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka
(2)修改kafka的配置文件server.properties
cd /usr/local/kafka/config/
cp server.properties{,.bak}
vim server.properties
###修改以下内容
broker.id=0 #21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.80.10:9092 #31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
num.network.threads=3 #42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
num.io.threads=8 #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
socket.send.buffer.bytes=102400 #48行,发送套接字的缓冲区大小
socket.receive.buffer.bytes=102400 #51行,接收套接字的缓冲区大小
socket.request.max.bytes=104857600 #54行,请求套接字的缓冲区大小
log.dirs=/usr/local/kafka/logs #60行,kafka运行日志存放的路径,也是数据存放的路径
num.partitions=1 #65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir=1 #69行,用来恢复和清理data下数据的线程数量
log.retention.hours=168 #103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除
log.segment.bytes=1073741824 #110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件
zookeeper.connect=192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 #123行,配置连接Zookeeper集群地址
(3)修改环境变量
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
(4)配置kafka启动脚本
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
echo "---------- Kafka 启动 ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka 状态 ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ];then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
(5)设置kafka开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka
(6)将kafka集群节点1中配置好的/usr/local/kafka目录,上传到另外两个kafka集群中
scp -r /usr/local/kafka 192.168.80.60:`pwd`
scp -r /usr/local/kafka 192.168.80.70:`pwd`
(7)kafka集群中的各节点,分别启动kafka服务
service kafka start
service kafka status
netstat -lntp | grep 9092
(8)Kafka命令行操作
- 创建topic
kafka-topics.sh --create --zookeeper 192.168.80.50:2181,192.168.80.60:2181,192.168.80.70:2181 --replication-factor 2 --partitions 3 --topic gzy
-------------------------------------------------------------------------------------
--zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可
--replication-factor:定义分区副本数,1 代表单副本,建议为 2
--partitions:定义分区数
--topic:定义 topic 名称
- 查看当前服务器中的所有topic
kafka-topics.sh --list --zookeeper 192.168.80.50:2181,192.168.80.60:2181,192.168.80.70:2181
- 查看某个topic的详情
kafka-topics.sh --describe --zookeeper 192.168.80.50:2181,192.168.80.60:2181,192.168.80.70:2181
- kafka节点1模拟生产者发布消息
kafka-console-producer.sh --broker-list 192.168.80.50:9092,192.168.80.60:9092,192.168.80.70:9092 --topic gzy
- kafka节点1模拟消费者消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.80.50:9092,192.168.80.60:9092,192.168.80.70:9092 --topic gzy --from-beginning
-------------------------------------------------------------------------------------
--from-beginning:会把主题中以往所有的数据都读取出来
-------------------------------------------------------------------------------------
- 修改分区数
kafka-topics.sh --zookeeper 192.168.80.50:2181,192.168.80.60:2181,192.168.80.70:2181 --alter --topic gzy --partitions 6
- 创建名为test的topic分区,并设置分区数为6
kafka-topics.sh --zookeeper 192.168.80.50:2181,192.168.80.60:2181,192.168.80.70:2181 --alter --topic test --partitions 6
###可以添加指定分区的分区数但是不可以减少分区数
kafka-topics.sh --zookeeper 192.168.80.50:2181,192.168.80.60:2181,192.168.80.70:2181 --alter --topic test --partitions 7
###减少分区数无法实现
kafka-topics.sh --zookeeper 192.168.80.50:2181,192.168.80.60:2181,192.168.80.70:2181 --alter --topic test --partitions 5
- 删除topic
kafka-topics.sh --delete --zookeeper 192.168.80.50:2181,192.168.80.60:2181,192.168.80.70:2181 --topic test
kafka-topics.sh --list --zookeeper 192.168.80.50:2181,192.168.80.60:2181,192.168.80.70:2181
4.部署Filebeat+Kafka+ELK集群
(1)部署Zookeeper+Kafka集群
参见安装与部署Zookeeper集群详解
(2)部署Filebeat
cd /opt/filebeat-6.7.2-linux-x86_64/
vim filebeat.yml
###在21行左右,修改如下参数内容
- type: log
enabled: true
paths:
- /var/log/httpd/access_log
tags: ["httpd_access"]
- type: log
enabled: true
paths:
- /var/log/httpd/error_log
tags: ["httpd_error"]
......
#添加输出到Kafka的配置,(将原先输出到logstash的配置注释)
###在173行左右添加如下内容
output.kafka:
enabled: true
hosts: ["192.168.80.50:9092","192.168.80.60:9092","192.168.80.70:9092"] #指定 Kafka 集群配置
topic: "httpd" #指定 Kafka 的 topic
#启动 filebeat
./filebeat -e -c filebeat.yml
(3)部署ELK,在Logstash组件所在节点上新建一个Logstash配置文件
cd /etc/logstash/conf.d/
vim kafka.conf
input {
kafka {
bootstrap_servers => "192.168.80.50:9092,192.168.80.60:9092,192.168.80.70:9092" #kafka集群地址
topics => "httpd" #拉取的kafka的指定topic
type => "httpd_kafka" #指定 type 字段
codec => "json" #解析json格式的日志数据
auto_offset_reset => "latest" #拉取最近数据,earliest为从头开始拉取
decorate_events => true #传递给elasticsearch的数据额外增加kafka的属性数据
}
}
output {
if "httpd_access" in [tags] {
elasticsearch {
hosts => ["192.168.80.10:9200","192.168.80.20:9200"]
index => "httpd_access-%{+YYYY.MM.dd}"
}
}
if "httpd_error" in [tags] {
elasticsearch {
hosts => ["192.168.80.10:9200","192.168.80.20:9200"]
index => "httpd_error-%{+YYYY.MM.dd}"
}
}
stdout { codec => rubydebug }
}
(4)启动logstash,检查自定义的kafka.conf配置文件
cd /etc/logstash/conf.d/
logstash -f /etc/logstash/conf.d/kafka.conf
(5)浏览器访问kibana页面,查看日志数据
http://192.168.80.10:5601
登录Kibana,单击"Create Index Pattern" 按钮,添加索引"httpd_access-*“和"httpd_error-*”,单击 “create” 按钮创建,单击 “Discover” 按钮可查看图表信息及日志信息。
5.知识点总结
5.1 中间件
消息队列型(MQ) ActiveMQ、RabbitMQ、RocketNQ、Kafka、Pulsar、Redis
web应用型(代理服务器) Nginx、Haproxy、LVS、Tomcat、php
5.2 消息队列
作用:应用解耦、异步处理、流量削峰、消息缓冲
模式:点对点模式(一对一,消费者消费消息后会删除消息)
发布/订阅模式(又称为观察者模式。一对多,消费者消费后不会删除消息)
5.3 kafka架构
broker kafka服务器节点
producer 生产者,发布消息到topic
consumer 消费者
consumer group 消费者组,是消息的实际订阅者,一个消费者组包含一个或多个消费者(组内成员不能重复消费同一个partition数据)
producer ——> topic消息队列 ——> patition分区 ——> replica副本(leader负责数据读写、follower只负责同步leader的数据)
consumer ——> offset偏移量(用来记录消费者上一次消费的位置)
zookeeper 存储kafka集群的元数据信息,生产者和消费者的动作都需要zookeeper的管理和支持。
比如生产者推送数据到kafka集群需要通过zookeeper去循环找kafka服务器节点的位置,需要从zookeeper获取offiset记录的上一次消费位置继续往后消费
kafka只能保证partition分区内的消息顺序,消费时无法保证partition之间的顺序,
如需要严格保证消息的消费顺序(商品秒杀、抢红包等场景)要把partition数量设置为1。
5.4 kafka的ack应答机制
kafka 会通过 ack 应答机制保证数据的可靠性
ack 配置参数 0 (效果类似于异步复制,不等待follower同步完成即可让生产者发下一条信息)
1 (效果类似于半同步复制,至少等待一个follower同步完成才让生产者发下一条消息)
-1 (效果类似于全同步复制,要等待所有follower同步完成才让生产者发下一条消息>)