ELK 企业级日志分析系统(四)

news2025/1/18 11:49:42

ELK

  • 一、部署Kafka集群
  • 二、Kafka的命令行操作
  • 三、Kafka架构深入
  • 四、Filebeat+Kafka+ELK部署

一、部署Kafka集群

1.下载安装包
官方下载地址:http://kafka.apache.org/downloads.html

cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz

2.安装 Kafka
cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka

//修改配置文件
cd /usr/local/kafka/config/
cp server.properties{,.bak}

vim server.properties
broker.id=0 ●21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.80.10:9092 ●31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
num.network.threads=3 #42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
num.io.threads=8 #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
socket.send.buffer.bytes=102400 #48行,发送套接字的缓冲区大小
socket.receive.buffer.bytes=102400 #51行,接收套接字的缓冲区大小
socket.request.max.bytes=104857600 #54行,请求套接字的缓冲区大小
log.dirs=/usr/local/kafka/logs #60行,kafka运行日志存放的路径,也是数据存放的路径
num.partitions=1 #65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir=1 #69行,用来恢复和清理data下数据的线程数量
log.retention.hours=168 #103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除
log.segment.bytes=1073741824 #110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件
zookeeper.connect=192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 ●123行,配置连接Zookeeper集群地址

//修改环境变量
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH= P A T H : PATH: PATH:KAFKA_HOME/bin

source /etc/profile

//配置 Zookeeper 启动脚本
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME=‘/usr/local/kafka’
case $1 in
start)
echo “---------- Kafka 启动 ------------”
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo “---------- Kafka 停止 ------------”
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
0 s t a r t ; ; s t a t u s ) e c h o " − − − − − − − − − − K a f k a 状态 − − − − − − − − − − − − " c o u n t = 0 start ;; status) echo "---------- Kafka 状态 ------------" count= 0start;;status)echo"Kafka状态"count=(ps -ef | grep kafka | egrep -cv “grep|$$”)
if [ “$count” -eq 0 ];then
echo “kafka is not running”
else
echo “kafka is running”
fi
;;
*)
echo “Usage: $0 {start|stop|restart|status}”
esac

//设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka

//分别启动 Kafka
service kafka start

二、Kafka的命令行操作

3.Kafka 命令行操作
//创建topic
kafka-topics.sh --create --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --replication-factor 2 --partitions 3 --topic test
在这里插入图片描述

–zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可
–replication-factor:定义分区副本数,1 代表单副本,建议为 2
–partitions:定义分区数
–topic:定义 topic 名称

//查看当前服务器中的所有 topic
kafka-topics.sh --list --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181
在这里插入图片描述
//查看某个 topic 的详情
kafka-topics.sh --describe --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181
在这里插入图片描述
//发布消息
kafka-console-producer.sh --broker-list 192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092 --topic test
在这里插入图片描述
//消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092 --topic test --from-beginning
在这里插入图片描述
–from-beginning:会把主题中以往所有的数据都读取出来

//修改分区数
kafka-topics.sh --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --alter --topic test --partitions 6
在这里插入图片描述
在这里插入图片描述
这里的修改只能是增加,不能减少,减少只能删除这个topic

//删除 topic
kafka-topics.sh --delete --zookeeper 192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181 --topic test
在这里插入图片描述

三、Kafka架构深入

//Kafka 工作流程及文件存储机制
Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。

topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。 消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment 对应两个文件:“.index” 文件和 “.log” 文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,test 这个 topic 有三个分区, 则其对应的文件夹为 test-0、test-1、test-2。

index 和 log 文件以当前 segment 的第一条消息的 offset 命名。

“.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

//数据可靠性保证
为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后, 都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

//数据一致性问题
LEO:指的是每个副本最大的 offset;
HW:指的是消费者能见到的最大的 offset,所有副本中最小的 LEO。

(1)follower 故障
follower 发生故障后会被临时踢出 ISR(Leader 维护的一个和 Leader 保持同步的 Follower 集合),待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。

(2)leader 故障
leader 发生故障之后,会从 ISR 中选出一个新的 leader, 之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。

注:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

//ack 应答机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡选择。

当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:
●0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。当broker故障时有可能丢失数据。

●1(默认配置):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果在follower同步成功之前leader故障,那么将会丢失数据。

●-1(或者是all):producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是如果在 follower 同步完成后,broker 发送ack 之前,leader 发生故障,那么会造成数据重复。

三种机制性能依次递减,数据可靠性依次递增。

注:在 0.11 版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。在 0.11 及以后版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。

四、Filebeat+Kafka+ELK部署

1.部署 Zookeeper+Kafka 集群

2.部署 Filebeat
cd /usr/local/filebeat

vim filebeat.yml
filebeat.prospectors:
- type: log
enabled: true
paths:
- /var/log/httpd/access_log
tags: [“access”]

- type: log
enabled: true
paths:
- /var/log/httpd/error_log
tags: [“error”]
在这里插入图片描述


#添加输出到 Kafka 的配置
output.kafka:
enabled: true
hosts: [“192.168.80.10:9092”,“192.168.80.11:9092”,“192.168.80.12:9092”] #指定 Kafka 集群配置
topic: “httpd” #指定 Kafka 的 topic
在这里插入图片描述

#启动 filebeat
./filebeat -e -c filebeat.yml

3.部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件
cd /etc/logstash/conf.d/

vim kafka.conf
input {
kafka {
bootstrap_servers => “192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092” #kafka集群地址
topics => “httpd” #拉取的kafka的指定topic
type => “httpd_kafka” #指定 type 字段
codec => “json” #解析json格式的日志数据
auto_offset_reset => “latest” #拉取最近数据,earliest为从头开始拉取
decorate_events => true #传递给elasticsearch的数据额外增加kafka的属性数据
}
}

output {
if “access” in [tags] {
elasticsearch {
hosts => [“192.168.80.30:9200”]
index => “httpd_access-%{+YYYY.MM.dd}”
}
}

if “error” in [tags] {
elasticsearch {
hosts => [“192.168.80.30:9200”]
index => “httpd_error-%{+YYYY.MM.dd}”
}
}

stdout { codec => rubydebug }
}

#启动 logstash
logstash -f kafka.conf

4.浏览器访问 http://192.168.80.30:5601 登录 Kibana,单击“Create Index Pattern”按钮添加索引“filebeat_test-*”,单击 “create” 按钮创建,单击 “Discover” 按钮可查看图表信息及日志信息。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/755170.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Halcon会慢慢被淘汰吗?

作为一个语言模型,我不能提供预测未来的确凿答案。然而,我可以提供一些关于Halcon和机器视觉市场的观察和趋势。 我这里刚好有嵌入式、单片机、plc的资料需要可以私我或在评论区扣个6 Halcon是一款商业机器视觉软件,具有广泛的应用和用户群…

“AI in the Alps“:身体与精神的一场盛宴

作者:Christofer Dutz 得益于 Timecho 的组织和安排,我最近参加了一个精彩绝伦的活动 “AI in the Alps”,并从中收获颇丰。 这次活动是由德国工业界知名博客 “Industrial AI Podcast”(http://aipod.de)的组织者 Ro…

metersphere数据库SQL断言应用

在使用metersphere的时候,需要查询数据库数据来验证接口是否正常,在查看使用手册时,发现不是很明确,在研究一点时间后,终于明白,在此写下心得。 metersphere使用手册地址:接口测试 - 接口用例操…

【探索AI未来】人工智能技术在软件开发中的应用与革新

自我介绍⛵ 📣我是秋说,研究人工智能、大数据等前沿技术,传递Java、Python等语言知识。 🙉主页链接:秋说的博客 📆 学习专栏推荐:人工智能:创新无限、MySQL进阶之路、C刷题集、网络安…

第一个Matplotlib绘图程序

本节学习第一个 Matplotlib 绘图程序,如何使用 Matplotlib 绘制一个简单的折线图。下面绘制一个简单正弦曲线图,它显示了角度与正弦函数值之间的关系。 第一个绘图程序 首先导入 Matplotlib 包中的 Pyplot 模块,并以 as 别名的形式简化引入…

【C#】并行编程实战:同步原语(4)

在第4章中讨论了并行编程的潜在问题,其中之一就是同步开销。当将工作分解为多个工作项并由任务处理时,就需要同步每个线程的结果。线程局部存储和分区局部存储,某种程度上可以解决同步问题。但是,当数据共享时,就需要用…

使用 EMQX 和 eKuiper 进行 MQTT 流处理:快速教程

引言 MQTT 协议是一种专为物联网应用而设计的轻量级消息传输协议。它具有简单、开放、易于实现的特点,是物联网应用的理想选择。MQTT 数据以连续实时的方式进行传输,非常适合由流处理引擎进行处理。 EMQX 是一款大规模分布式物联网 MQTT Broker&#x…

队列--C语言实现数据结构

本期带大家一起用C语言实现队列🌈🌈🌈 文章目录 1、队列的概念2、队列的操作流程3 、队列的结构4、队列的实现4.1 队列的结构设计4.2 队列的初始化4.3 入队4.4 判断队列是否为空4.5 出队4.6 获取队头数据4.7 获取队尾数据4.8 获取队列当中数据…

HTTP1和HTTP2和HTTP3的区别

超文本传输协议是一个简单的请求-响应协议,它通常运行在TCP之上。 目录 HTTP1.1: HTTP2 HTTP3 参考文献 HTTP1.1: 特点: 1.一条链接只能一次请求一次返回这样子来回。一般的我们浏览器会帮我们一次次请求和收到。…

安卓UI:SearchView

目录 一、SearchView介绍 二、常用方法 (一)、监听器: (二)、常用方法: (三)、其他常用方法 三、例子: MainActivity2 : ChatListAdapter : item_people_view: activity_main2: 运行结果…

043、TiDB特性_缓存表和分区表

针对于优化器在索引存在时依然使⽤全表扫描的情况下,使⽤缓存表和分区表是提升查询性能的有效⼿段。 缓存表 缓存表是将表的内容完全缓存到 TiDB Server 的内存中表的数据量不⼤,⼏乎不更改读取很频繁缓存控制: ALTER TABLE table_name CACHE|NOCACHE; # 使用tr…

【Ubuntu】安装docker-compose

要在Ubuntu上安装Docker Compose,可以按照以下步骤进行操作: 下载 Docker Compose 二进制文件: sudo curl -L "https://github.com/docker/compose/releases/latest/download/docker-compose-$(uname -s)-$(uname -m)" -o /usr/loc…

产业大模型刚开卷,京东跑进“最后半公里”

点击关注 文|姚 悦 编|王一粟 “京东一直在探索哪些产品、技术、场景可以真正把大模型用起来,在我们内部的场景中反复验证后,才决定在7月份对外发布,现在我们在零售、健康、物流、金融等业务场景里已经积累了一些经…

架构训练营:3-3设计备选方案与架构细化

3架构中期 什么是备选架构? 备选架构定义了系统可行的架构模式和技术选型 备选方案筛选过程 头脑风暴 :对可选技术进行排列组合,得到可能的方案 红线筛选:根据系统明确的约束和限定,一票否决某些方案(主要…

Java分布式项目常用技术栈简介

Spring-Cloud-Gateway : 微服务之前架设的网关服务,实现服务注册中的API请求路由,以及控制流速控制和熔断处理都是常用的架构手段,而这些功能Gateway天然支持 运用Spring Boot快速开发框架,构建项目工程;并结合Spring…

java错误:不支持发行版本5或java: 不再支持源选项 5。请使用 6 或更高版本的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

Docker本地镜像发布到私有库

Docker Registry(Docker镜像仓库) 使用Docker Registry,可以创建私有或公共的镜像仓库,以存储Docker镜像。私有仓库可以用于存储公司内部的镜像,或者用于个人项目的镜像。公共仓库则会将发布的镜像分享到全世界。 1 …

【C++】string模拟实现

个人主页🍖:在肯德基吃麻辣烫 文章目录 前言一、string的成员变量二、string默认成员函数1.构造函数1.1 无参构造(默认构造)1.2 普通构造1.3无参构造和全缺省构造可以合并 浅拷贝和深拷贝2.拷贝构造3.赋值运算符重载4.析构函数 三、[]的下标访问和iterat…

P5 第二章 电阻电路的等效变换

1、电阻的Y形联结和△形联结的等效变换 可以发现电阻的Y形联结和△形联结可以刻画成下图模型: 如果Y形联结和△形联结i1,i2,i3都相等,则可以列公式解出R1,R2,R3之间的大小关系。 电路普遍存在对偶关系,可以将上图的电阻换成电导&#xff0c…

干货 | 一个漏洞利用工具仓库

0x00 Awesome-Exploit 一个漏洞证明/漏洞利用工具仓库 不定期更新 部分漏洞对应POC/EXP详情可参见以下仓库: https://github.com/Threekiii/Awesome-POC https://github.com/Threekiii/Vulhub-Reproduce 0x01 项目导航 ActiveMQ CVE-2015-5254 Apisix CVE-2…