Kafka 架构深入介绍 及搭建Filebeat+Kafka+ELK

news2025/1/22 15:59:00

目录

一        架构深入介绍

(一)Kafka 工作流程及文件存储机制

(二)数据可靠性保证

(三)数据一致性问题

(四)故障问题

(五)ack 应答机制

二      实验搭建Filebeat+Kafka+ELK

(一)实验环境

(二)架构图

(三)实验模拟

1,部署 Zookeeper+Kafka 集群

2,66机器部署 Filebeat 

3,  66机器安装httpd

4,部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件

5,生产黑屏操作es时查看所有的索引


一        架构深入介绍

(一)Kafka 工作流程及文件存储机制

1,Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。

2,topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。 消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

3,由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment 对应两个文件:“.index” 文件和 “.log” 文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,test 这个 topic 有三个分区, 则其对应的文件夹为 test-0、test-1、test-2。

4,ndex 和 log 文件以当前 segment 的第一条消息的 offset 命名。

5,“.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

(二)数据可靠性保证

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后, 都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
 

(三)数据一致性问题

LEO:指的是每个副本最大的 offset; 
HW:指的是消费者能见到的最大的 offset,所有副本中最小的 LEO。
 

(四)故障问题

(1)follower 故障 
follower 发生故障后会被临时踢出 ISR(Leader 维护的一个和 Leader 保持同步的 Follower 集合),待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。

(2)leader 故障 
leader 发生故障之后,会从 ISR 中选出一个新的 leader, 之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。

注:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。 

(五)ack 应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡选择。

当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:

●0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。当broker故障时有可能丢失数据。

●1(默认配置):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果在follower同步成功之前leader故障,那么将会丢失数据。

●-1(或者是all):producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是如果在 follower 同步完成后,broker 发送ack 之前,leader 发生故障,那么会造成数据重复。

三种机制性能依次递减,数据可靠性依次递增。

 注:在 0.11 版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。在 0.11 及以后版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。

二      实验搭建Filebeat+Kafka+ELK

(一)实验环境

Node1节点(2C/4G):node1/192.168.217.77                   Elasticsearch  Kibana
Node2节点(2C/4G):node2/192.168.217.88                    Elasticsearch
Apache节点:apache/192.168.217.99                                  Logstash  Apache
Filebeat节点:filebeat/192.168.217.66                                   Filebeat

zookeeper 集群;         192.168.217.22 /44/55

kafka 集群 :               192.168.217.22 /44/55

(二)架构图

(三)实验模拟

1,部署 Zookeeper+Kafka 集群

上章有详细介绍

2,66机器部署 Filebeat 

写filebeat 的配置文件:

vim /etc/filebeat/filebeat.yml

重启 filebeat

3,  66机器安装httpd

4,部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件

99机器:

代码如下:

input {
    kafka {
        bootstrap_servers => "192.168.217.22:9092,192.168.217.44:9092,192.168.217.55:9092"  #kafka集群地址
        topics  => "httpd"     #拉取的kafka的指定topic
        type => "httpd_kafka"  #指定 type 字段
        codec => "json"        #解析json格式的日志数据
		auto_offset_reset => "latest"  #拉取最近数据,earliest为从头开始拉取
		decorate_events => true   #传递给elasticsearch的数据额外增加kafka的属性数据
    }
}

output {
  if "access" in [tags] {
    elasticsearch {
      hosts => ["192.168.217.77:9200"]
      index => "httpd_access-%{+YYYY.MM.dd}"
    }
  }
  
  if "error" in [tags] {
    elasticsearch {
      hosts => ["192.168.217.77:9200"]
      index => "httpd_error-%{+YYYY.MM.dd}"
    }
  }
  
  stdout { codec => rubydebug }
}

启动logstash

5,生产黑屏操作es时查看所有的索引

去到77 es node1 

curl -X GET "localhost:9200/_cat/indices?v"

三    常见问题报错

 例如如图所示:  在logstash 节点 启动 logstash时,报以下错误:

[root@logstash logstash]/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka.conf 


ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
16:23:10.325 [LogStash::Runner] FATAL logstash.runner - Logstash could not be started because there is already another instance using the configured data directory.  If you wish to run multiple instances, you must change the "path.data" setting.

解决方法:

ps -aux |grep logstash 找到进程号   然后kill -9 关闭

再重新启动logstash

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1597414.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【max材质addtive叠加模式特效渲染不出通道的解决办法】

max材质addtive叠加模式特效渲染不出通道的解决办法 2021-12-22 18:15 max的scanline扫描线,vray渲染可以,红移不行(只支持它自己的材质,它自己的材质没有additive模式)。据说mr是可以的。 右侧的球体使用附加不透明度。 附加不透明度通过将…

关于机器学习/深度学习的一些事-答知乎问(六)

如何使用频率域变换对序列数据进行增强? 时频变换是常见的信号分析思路,同样可用于数据增强。在频率域添加噪声是方法之一。比如可以对传感器信号应用短时傅里叶变换STFT得到具有时序关系的谱特征,再在谱特征上应用两种数据增强方法。一是对…

Amazon MemoryDB for Redis的探索和实践

一、背景 由于当下项目的日益增长的数据量,单机Redis已经远远不能满足我们的要求。考虑转成集群,但是直接在服务器中搭建Redis集群的话,EC2挂掉则Redis也会随之挂掉,耦合性太强。所以将Redis相关的服务全部抽取在单独的服务器上或…

论文笔记:Time Travel in LLMs: Tracing Data Contamination in Large Language Models

iclr 2024 spotlight reviewer评分 688 1 intro 论文认为许多下游任务(例如,总结、自然语言推理、文本分类)上观察到的LLMs印象深刻的表现可能因数据污染而被夸大 所谓数据污染,即这些下游任务的测试数据出现在LLMs的预训练数据…

ins视频批量下载,instagram批量爬取视频信息

简介 Instagram 是目前最热门的社交媒体平台之一,拥有大量优质的视频内容。但是要逐一下载这些视频往往非常耗时。在这篇文章中,我们将介绍如何使用 Python 编写一个脚本,来实现 Instagram 视频的批量下载和信息爬取。 我们使用selenium获取目标用户的 HTML 源代码,并将其保存…

MySQL模糊查询

一、MySQL通配符模糊查询(%,_) 1.1.通配符的分类 1.“%”百分号通配符:表示任何字符出现任意次数(可以是0次) 2.“_”下划线通配符:表示只能匹配单个字符,不能多也不能少,就是一个字符。当然…

计算机组成原理【CO】Ch3 存储系统

文章目录 考纲3.1 存储系统概述3.2 主存储器3.3 主存储器与CPU的连接3.4 外部存储器3.5 高速缓冲存储器3.6 虚拟存储器【※】存储系统总体流程图【※】各个部件的存储位置计算机存储相关硬件与数据结构说明进程控制块(PCB)页表页表始址页表始址寄存器(PTR)MMU(内存管理单元…

Springboot Gateway 报错Failed to resolve “bogon”的原因及解决办法

一、问题出现原因及初步分析 今天遇到一个奇怪的错误,一个一直正确运行的微服务后台,突然无法访问,如何重启都会报错。 想到近期有人在服务器上安装过其它服务,因此,考虑可能是配置问题,可配置问题修复后…

实时数据同步之Maxwell和Canal

文章目录 一、概述1、实时同步工具概述1.1 Maxwell 概述1.2 Canal概述 2、数据同步工作原理2.1 MySQL 主从复制过程2.2 两种工具工作原理 3、MySQL 的 binlog详解3.1 什么是 binlog3.2 binlog 的开启3.3 binlog 的分类设置 4、Maxwell和Canal对比5、环境安装 二、Maxwell 使用1…

信也科技网络自动化实践-网络策略管理

1、背景 随着各种法律法规和行业标准的出台和更新,企业或组织需要遵守各种安全合规性要求。网络安全策略管理需要符合这些要求,从而保障企业或组织的安全和合规性。网络安全策略管理需要涵盖企业或组织的整个网络生命周期,包括网络规划、设计…

halcon 3.2标定相机

参考《solution_guide_iii_c_3d_vision.pdf》 3.2.2.2 Which Distortion Model to Use 选用何种畸变模型 对于面阵相机,halcon中两种畸变模型:The division model and the polynomial model(差分模型和多项式模型),前…

MLOps

参考: 什么是MLOps?与DevOps有何异同?有什么价值?https://baijiahao.baidu.com/s?id1765071998288593530&wfrspider&forpcMLOps简介_AI开发平台ModelArts_WorkflowMLOps(Machine Learning Operation)是机器学习&#xf…

kafka(六)——存储策略

存储机制 kafka通过topic作为主题缓存数据,一个topic主题可以包括多个partition,每个partition是一个有序的队列,同一个topic的不同partiton可以分配在不同的broker(kafka服务器)。 关系图 partition分布图 名称为t…

Unity 扩展自定义编辑器窗口

在Assets文件夹路径下任意位置创建Editor文件夹,将扩展编辑器的代码放在Editor文件夹下 生成编辑器窗口 代码中首先引用命名空间 using UnityEditor; 然后将创建的类继承自EditorWindow public class MenuEditor : EditorWindow 然后通过扩展编辑器菜单功能调用…

AndroidStudio 导出aar包,并使用

打包 1、确认当前选项是否勾选,如未勾选请先勾选。 2、勾选完成后重启Android Studio。 3、重启完成后,选中要打包的module 4、打包完成 使用 1.在项目中新建libs,放入aar文件。 2.修改配置 添加如下代码 flatDir {dirs("libs")}3.修改app…

【BEVHeight论文阅读】自动驾驶车路协同车端感知算法

论文名称:BEVHeight: A Robust Framework for Vision-based Roadside 3D Object Detection 论文地址:https://arxiv.org/pdf/2303.08498.pdf 代码地址:https://github.com/ADLab-AutoDrive/BEVHeight 总结:这篇文章比较有意思的点…

单元测试四大过程

单元测试四大过程(蓝桥课学习笔记) 单元测试过程 单元测试是软件测试过程中的一个关键环节,它与集成测试、系统测试一样,分为测试策划、测试设计、测试执行和测试总结几个阶段。 单元测试过程中每个阶段需要完成的主要工作如下&…

ActiveMQ主从架构和集群架构的介绍及搭建

一、主从和集群架构的特点 1.1 主从架构的-Master/slave模式特点 读写分离,纵向扩展,所有的写操作一般在master上完成,slave只提供一个热备 1.2 集群架构-Cluster模式特点 分布式的一种存储,水平的扩展,消息的分布…

基于WOA优化的CNN-LSTM-Attention的时间序列回归预测matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1卷积神经网络(CNN)在时间序列中的应用 4.2 长短时记忆网络(LSTM)处理序列依赖关系 4.3 注意力机制(Attention) 4…

聚类能代替分类吗?

聚类和分类是两种不同的机器学习方法,它们在处理数据时有着不同的目的和应用场景。 分类:分类是一种监督学习方法,它需要已标记的训练数据集。在分类中,算法会学习如何将输入数据映射到预定义的类别中。例如,给定一组包…