前言
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
一、Zookeeper 概述
官方下载地址:archive.apache.org/dist/zookee…
1.1 Zookeeper 定义
zookeeper是一个开源的分布式的,为分布式框架提供协调服务的Apache项目。
1.2 Zookeeper 工作机制
二。leader的选举:
第一次选举:
启动1
se 1 不够半数无法完成 looking
启动2 : 交换选票
启动3: 选票超过半数为leading状态
启动4: 启动已经没用了,leader已经固定下来了
启动5:。。。。。。。。
非第一次选举机制:(leader挂了后)
源leader挂了之后选leader
1.看 epoch 任期
2.(性能)看事务id
3.最后看服务id
Zookeeper 应用场景
(1)1个leader,多个follower跟随者
(2)集群中只要有半数存货,zookeeper
(3)树状
(4)软负载均衡,
(5)服务器动态上下线
kafka架构:
分布式消息队列中间件:支持分区,多副本,基于zookeeper
scale语言编写 通过zookeeper来存储元数据
1.高吞吐,低延时。
三、部署Filebeat+Kafka+ELK
三台服务器已搭建好Zookeeper 集群:
192.168.6.151
192.168.6.157
192.168.6.152
两台elasticsearch
192.168.6.155
192.168.6.156
一台filebeat
192.168.6.170
一台kiban同时部署logstash
192.168.6.152
ELFK+kafka的实现过程
firebeat轻量级收集工具,由firebeat进行日志收集,经过kafka进行流量的削峰,异步处理,缓冲,
再把数据进行logstash进行input,filter,output,输出到elastisearch上,master节点监控其它节点状态
data节点去进行数据读写,client节点分担data节点的压力。在kibana上进行输出展示。
实验步骤:
部署 Filebeat
cd /usr/local/filebeat
vim filebeat.yml
filebeat.prospectors:
- type: log
enabled: true
paths:
- /var/log/messages
- /var/log/*.log
......
#添加输出到 Kafka 的配置
output.kafka:
enabled: true
hosts: ["192.168.6.154:9092","192.168.6.157:9092","192.168.6.:9092","192.168.6.151:9092"] #指定 Kafka 集群配置
topic: "filebeat_test" #指定 Kafka 的 topic
#启动 filebeat
./filebeat -e -c filebeat.yml
部署 ELK
在 Logstash 组件所在节点上新建一个 Logstash 配置文件。
cd /etc/logstash/conf.d/
vim filebeat.conf
input {
kafka {
bootstrap_servers => "192.168.6.151:9092,192.168.6.154:9092,192.168.6.157:9092"
topics => "filebeat_test"
group_id => "test123"
auto_offset_reset => "earliest"
}
}
filter {
grok {
match => ["message", "(?<remote_addr>%{IPV4}|%{IPV6})[\s-]+\[(?<logTime>.+)\] \"(?<http_mpthod>.+) (?<url_path>/.*) (?<http_ver>.+)\" (?<rev_code>\d+) \d+ \".*\" \"(?<User_agent>.+)\" \".*\""]
}
mutate {
add_field => ["location", "nanjing"]
replace => { "host" => "httpd_server" }
rename => { "path" => "LogPath" }
remove_field => ["message","@version"]
}
date {
match => ["logTime", "dd/MM/yyyy:HH:mm:ss Z"]
timezone => "Asia/Shanghai"
}
}
output {
elasticsearch {
hosts => ["192.168.6.155:9200"]
index => "filebeat_test-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug
}
}
#启动 logstash
logstash -f filebeat.conf
浏览器访问测试
浏览器访问 [http://192.168.6.152:5601] 登录 Kibana,单击“Create Index Pattern”按钮添加索引“filebeat_test-*”,单击 “create” 按钮创建,单击 “Discover” 按钮可查看图表信息及日志信息。
概述
EFLFK架构:ELK + Filebeat + Kafka。
部署 kafka 需要先部署 zookeeper。(kafka从3.0版本之后,不再依赖zookeeper)
zookeeper
zookeeper : 分布式的系统管理框架, 作用: 文件系统 + 通知机制
本质: 存储和管理 分布式应用的元数据,如果应用服务状态发生变化则会通知客户端。
消息队列 MQ
web应用中间件 : nginx tomcat apache haproxy squid varnish
MQ消息队列中间件 : redis kafka rabbitMQ rocketMQ activeMQ
kafka 架构
broker: kafka服务器,一个kafka由多个broker组成。
topic: 一个消息队列,生产者和消费者面向的都是topic。
producer: 生产者push 推送消息数据到broker 的topic中。
consumer: 消费者pull 从broker的topic中拉取消息数据。
partition: 分区,一个topic可以被分成一个或者多个partition分区,用来加快消息的传输(读写)。
- partition中的消息数据是有序的,partition之间是无序。在秒杀、红包等要求有序场景中,只能使用一个partition。
副本: 对partition进行备份,leader负责读写,follow负责备份。
offset: 偏移量,记录消费者消费消息的位置,记录消费者上一次消费的数据到哪里了,这样就可以接着下-a条数据继续进行消费。
zookeeper: 保存kafka集群的元信息,保存offset。 结合kafka,生产者推送数据到kafka集群时需要通过zk去寻找kafka的位置,消费者消费哪条数据也需要zk的支持,因为可以从zk中获得offset。