kafka集群和Filebeat+Kafka+ELK

news2025/1/23 12:18:35

一、Kafka 概述

1.1 为什么需要消息队列(MQ)

主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发 too many connection 错误,引发雪崩效应。
我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队列常应用于异步处理,流量削峰,应用解耦,消息通讯等场景。

当前比较常见的 MQ 中间件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka、Pulsar 等。

1.2 使用消息队列的好处

(1)解耦
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

(2)可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

(3)缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

(4)灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

(5)异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

1.3 消息队列的两种模式

(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
消息生产者生产消息发送到消息队列中,然后消息消费者从消息队列中取出并且消费消息。消息被消费以后,消息队列中不再有存储,所以消息消费者不可能消费到已经被消费的消息。消息队列支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

(2)发布/订阅模式(一对多,又叫观察者模式,消费者消费数据之后不会清除消息)
消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。
发布/订阅模式是定义对象间一种一对多的依赖关系,使得每当一个对象(目标对象)的状态发生改变,则所有依赖于它的对象(观察者对象)都会得到通知并自动更新。

1.4 Kafka 定义

Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQ,Message Queue),主要应用于大数据领域的实时计算以及日志收集。

1.5 Kafka 简介

Kafka 是最初由 Linkedin 公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于 Zookeeper 协调的分布式消息中间件系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景,比如基于 hadoop 的批处理系统、低延迟的实时系统、Spark/Flink 流式处理引擎,nginx 访问日志,消息服务等等,用 scala 语言编写,
Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目。

1.6 Kafka 的特性

●高吞吐量、低延迟
Kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。每个 topic 可以分多个 Partition,Consumer Group 对 Partition 进行消费操作,提高负载均衡能力和消费能力。

●可扩展性
kafka 集群支持热扩展

●持久性、可靠性
消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

●容错性
允许集群中节点失败(多副本情况下,若副本数量为 n,则允许 n-1 个节点失败)

●高并发
支持数千个客户端同时读写

1.7 Kafka 系统架构

(1)Broker
一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。

(2)Topic
可以理解为一个队列,生产者和消费者面向的都是一个 topic。
类似于数据库的表名或者 ES 的 index
物理上不同 topic 的消息分开存储

(3)Partition
为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分割为一个或多个 partition,每个 partition 是一个有序的队列。Kafka 只保证 partition 内的记录是有序的,而不保证 topic 中不同 partition 的顺序。

每个 topic 至少有一个 partition,当生产者产生数据的时候,会根据分配策略选择分区,然后将消息追加到指定的分区的队列末尾。
##Partation 数据路由规则:
1.指定了 patition,则直接使用;
2.未指定 patition 但指定 key(相当于消息中某个属性),通过对 key 的 value 进行 hash 取模,选出一个 patition;
3.patition 和 key 都未指定,使用轮询选出一个 patition。

每条消息都会有一个自增的编号,用于标识消息的偏移量,标识顺序从 0 开始。

每个 partition 中的数据使用多个 segment 文件存储。

如果 topic 有多个 partition,消费数据时就不能保证数据的顺序。严格保证消息的消费顺序的场景下(例如商品秒杀、 抢红包),需要将 partition 数目设为 1。

●broker 存储 topic 的数据。如果某 topic 有 N 个 partition,集群有 N 个 broker,那么每个 broker 存储该 topic 的一个 partition。
●如果某 topic 有 N 个 partition,集群有 (N+M) 个 broker,那么其中有 N 个 broker 存储 topic 的一个 partition, 剩下的 M 个 broker 不存储该 topic 的 partition 数据。
●如果某 topic 有 N 个 partition,集群中 broker 数目少于 N 个,那么一个 broker 存储该 topic 的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。

1.8 分区的原因

●方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
●可以提高并发,因为可以以Partition为单位读写了。

(4)Replica
副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。

(5)Leader
每个 partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 partition。

(6)Follower
Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步。Follower 只负责备份,不负责数据的读写。
如果 Leader 故障,则从 Follower 中选举出一个新的 Leader。
当 Follower 挂掉、卡住或者同步太慢,Leader 会把这个 Follower 从 ISR(Leader 维护的一个和 Leader 保持同步的 Follower 集合) 列表中删除,重新创建一个 Follower。

(7)Producer
生产者即数据的发布者,该角色将消息 push 发布到 Kafka 的 topic 中。
broker 接收到生产者发送的消息后,broker 将该消息追加到当前用于追加数据的 segment 文件中。
生产者发送的消息,存储到一个 partition 中,生产者也可以指定数据存储的 partition。

(8)Consumer
消费者可以从 broker 中 pull 拉取数据。消费者可以消费多个 topic 中的数据。

(9)Consumer Group(CG)
消费者组,由多个 consumer 组成。
所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。可为每个消费者指定组名,若不指定组名则属于默认的组。
将多个消费者集中到一起去处理某一个 Topic 的数据,可以更快的提高数据的消费能力。
消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,防止数据被重复读取。
消费者组之间互不影响。

(10)offset 偏移量
可以唯一的标识一条消息。
偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息(即消费位置)。
消息被消费之后,并不被马上删除,这样多个业务就可以重复使用 Kafka 的消息。
某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制。
消息最终还是会被删除的,默认生命周期为 1 周(7*24小时)。

(11)Zookeeper
Kafka 通过 Zookeeper 来存储集群的 meta 信息。

由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中;从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为 __consumer_offsets。

也就是说,zookeeper的作用就是,生产者push数据到kafka集群,就必须要找到kafka集群的节点在哪里,这些都是通过zookeeper去寻找的。消费者消费哪一条数据,也需要zookeeper的支持,从zookeeper获得offset,offset记录上一次消费的数据消费到哪里,这样就可以接着下一条数据进行消费。

二、部署 kafka 集群

1.下载安装包

官方下载地址:http://kafka.apache.org/downloads.html

cd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz

2.安装 Kafka

cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka

3. 修改配置文件

cd /usr/local/kafka/config/
cp server.properties{,.bak}

vim server.properties


broker.id=0    ●21行,broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=1、broker.id=2
listeners=PLAINTEXT://192.168.80.10:9092    ●31行,指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
num.network.threads=3    #42行,broker 处理网络请求的线程数量,一般情况下不需要去修改
num.io.threads=8         #45行,用来处理磁盘IO的线程数量,数值应该大于硬盘数
socket.send.buffer.bytes=102400       #48行,发送套接字的缓冲区大小
socket.receive.buffer.bytes=102400    #51行,接收套接字的缓冲区大小
socket.request.max.bytes=104857600    #54行,请求套接字的缓冲区大小
log.dirs=/usr/local/kafka/logs        #60行,kafka运行日志存放的路径,也是数据存放的路径
num.partitions=1    #65行,topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir=1    #69行,用来恢复和清理data下数据的线程数量
log.retention.hours=168    #103行,segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除
log.segment.bytes=1073741824    #110行,一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件
zookeeper.connect=192.168.80.10:2181,192.168.80.11:2181,192.168.80.12:2181    ●123行,配置连接Zookeeper集群地址

 

4. 修改环境变量

vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin


source /etc/profile

5. 配置 Zookeeper 启动脚本

vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/usr/local/kafka'
case $1 in
start)
	echo "---------- Kafka 启动 ------------"
	${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
	echo "---------- Kafka 停止 ------------"
	${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
	$0 stop
	$0 start
;;
status)
	echo "---------- Kafka 状态 ------------"
	count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
	if [ "$count" -eq 0 ];then
        echo "kafka is not running"
    else
        echo "kafka is running"
    fi
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

//设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka

//分别启动 Kafka
service kafka start

6. Kafka 命令行操作

创建topic

kafka-topics.sh --zookeeper 192.168.136.196:2181,192.168.136.197:2181,192.168.136.198:2181 --partitions 1 --replication-factor 2 --create --topic CXK

--zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可
--replication-factor:定义分区副本数,1 代表单副本,建议为 2 
--partitions:定义分区数 
--topic:定义 topic 名称
 

查看当前服务器中的所有 topic
kafka-topics.sh --zookeeper 192.168.136.196:2181,192.168.136.197:2181,192.168.136.198:2181 --list

查看某个 topic 的详情
kafka-topics.sh --zookeeper 192.168.136.196:2181,192.168.136.197:2181,192.168.136.198:2181 --describe

发布消息
kafka-console-producer.sh --broker-list 192.168.80.10:9092,192.168.80.11:9092,192.168.80.12:9092  --topic test

 

消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.136.196:9092,192.168.136.197:9092,192.168.136.198:9092 --topic CXK --from-beginning

 --from-beginning:会把主题中以往所有的数据都读取出来 

 修改分区数
kafka-topics.sh --zookeeper 192.168.136.196:2181,192.168.136.197:2181,192.168.136.198:2181 --alter --topic CXK --partitions 3

删除 topic
kafka-topics.sh --zookeeper 192.168.136.196:2181,192.168.136.197:2181,192.168.136.198:2181 --delete --topic CXK

 三、Filebeat+Kafka+ELK

1.部署 Zookeeper+Kafka 集群

2.部署 Filebeat 

cd /usr/local/filebeat

vim filebeat.yml
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /var/log/httpd/access_log
  tags: ["access"]
  
- type: log
  enabled: true
  paths:
    - /var/log/httpd/error_log
  tags: ["error"]

........
#添加输出到 Kafka 的配置
output.kafka:
  enabled: true
  hosts: ["192.168.80.10:9092","192.168.80.11:9092","192.168.80.12:9092"]    #指定 Kafka 集群配置
  topic: "httpd"    #指定 Kafka 的 topic
  
#启动 filebeat
./filebeat -e -c filebeat.yml

 3.部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件

cd /etc/logstash/conf.d/

input {
   kafka {
      bootstrap_servers => "192.168.136.196:9092,192.168.136.197:9092,192.168.136.198:9092"
      topics => "nginx_logs"
      type => "nginx-kafka"
      codec => "json"
      auto_offset_reset => "latest"
      decorate_events => true
   }
}
#filter {}

output {
    if "nginx_access" in [tags] {
        elasticsearch {
            hosts => ["192.168.136.180:9200" ,"192.168.136.190:9200" ,"192.168.136.195:9200"]
            index => "nginx-access-%{+YYYY.MM.dd}"
        }
    }
    if "nginx_error" in [tags] {
        elasticsearch {
            hosts => ["192.168.136.180:9200" ,"192.168.136.190:9200" ,"192.168.136.195:9200"]
            index => "nginx-error-%{+YYYY.MM.dd}"
        }
    }
stdout { codec => rubydebug }
}

 4.浏览器访问

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1404934.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

专业ScrumMaster(高级)- PSM II 认证班,Scrum.org认证PSM II官方认证班

课程简介 Scrum是目前运用最为广泛的敏捷开发方法,是一个轻量级的项目管理和产品研发管理框架,旨在最短时间内交付最大价值。根据2022年全球敏捷状态报告,Scrum的应用占比已经达到87%。 Scrum.org 由 Scrum 的联合创始人 Ken Schwaber 创立…

[亲测有效]CentOS7下安装mysql5.7

前言 近期项目需要搭配mysql一起存储相关数据,但对mysql的版本有要求,于是在服务器搭建了mysql5.7,顺便记录一下搭建步骤和踩坑解决步骤。 目录 前言 一、清除旧安装包 二、安装YUM 三、使用yum命令即可完成安装 四、重新设置密码 五、…

C#winform上位机开发学习笔记5-串口助手的定时发送功能添加

1.功能描述 选择自动发送功能后,按照设定的发送时间发送发送框中的信息数据,设定时间可以手动输入,当手动输入信息无效(非数字)时,系统弹出错误提示,并将其设置为默认定时时间。 2.代码部分 步…

【Gene Expression Prediction】Part3 Deep Learning in Gene Expression Analysis

文章目录 6 第二个讲座:Deep Learning in Gene Expression Analysis6.1 Introduction6.2 D-GEX6.2.1 Connectivity map project6.2.2 Predicting gene expression from landmark genes 6.3 Deep generative models for genomics6.3.1 Manifold hypothesis6.3.2 Auto…

【C++】Qt:QCustomPlot图表绘制库配置与示例

😏★,:.☆( ̄▽ ̄)/$:.★ 😏 这篇文章主要介绍QCustomPlot图表绘制库配置与示例。 学其所用,用其所学。——梁启超 欢迎来到我的博客,一起学习,共同进步。 喜欢的朋友可以关注一下,下次…

2024/1/17 DFS BFS + Div 3 a,b

目录 Lake Counting S 求细胞数量 海战 组合的输出 div3 A. Square div3 B. Arranging Cats Lake Counting S P1596 [USACO10OCT] Lake Counting S - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 感谢大佬的指点!!!! 思…

k8s使用ingress实现应用的灰度发布升级

v1是1.14.0版本nginx ,实操时候升级到v2是1.20.0版本nginx,来测试灰度发布实现过程 一、方案:使用ingress实现应用的灰度发布 1、服务端:正常版本v1,灰度升级版本v2 2、客户端:带有请求头versionv2标识的请求访问版…

老套的购物车效果

一、项目文件结构 二、各个文件内容 index.html <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta http-equiv"X-UA-Compatible" content"IEedge" /><meta name"viewport…

【C语言进阶】预处理详解

引言 对预处理的相关知识进行详细的介绍 ✨ 猪巴戒&#xff1a;个人主页✨ 所属专栏&#xff1a;《C语言进阶》 &#x1f388;跟着猪巴戒&#xff0c;一起学习C语言&#x1f388; 目录 引言 预定义符号 #define定义常量 #define定义宏 带有副作用的宏参数 宏替换的规则 …

算法基础学习|双指针算法

双指针算法 代码模板 for (int i 0, j 0; i < n; i ){while (j < i && check(i, j)) j ;// 具体问题的逻辑 } 常见问题分类&#xff1a;(1) 对于一个序列&#xff0c;用两个指针维护一段区间(2) 对于两个序列&#xff0c;维护某种次序&#xff0c;比如归并…

echarts绘制饼图,部分数据隐藏指示线和文本,hover时隐藏指示线和文本的类别也不显示tooltip提示

option {tooltip: {trigger: item,formatter: (p) > {if (p.name) {return ${p.name}&#xff1a;${p.value}个;}},backgroundColor: #ffffff,textStyle: { color: #666666 } // 提示标签字体颜色},legend: {top: 5%,left: center},series: [{name: Access From,type: pie,…

第三方控价服务商怎么选

用对了方法&#xff0c;事半功倍&#xff0c;品牌控价也是如此&#xff0c;品牌方在治理工作中&#xff0c;如果选择自建团队进行处理&#xff0c;需要包含对数据技术的抓取团队&#xff0c;还要有对治理规则熟悉的操作团队&#xff0c;涉及人员和系统&#xff0c;费用成本相应…

TA百人计划学习笔记 3.1.1模板测试

资料 源视频 【技术美术百人计划】图形 3.1 深度与模板测试 传送门效果示例_哔哩哔哩_bilibili ppt 3100-模板测试与深度测试(1) 参考 Unity Shader: 理解Stencil buffer并将它用于一些实战案例&#xff08;描边&#xff0c;多边形填充&#xff0c;反射区域限定&#xff0c;阴影…

EXECL 单元格字符串链接 CONCAT :应用:将一行数据转为json

源&#xff1a; 目标 函数表示 CONCAT("data", CHAR(10), "{", CHAR(10), " ", "ulAlarmId : ", A5, CHAR(10), " ", "ulAlarmLevel : ", D5, CHAR(10)," ", "bBo…

远程桌面--虚拟机与主机的文件传输

注意&#xff1a; 确保VMware开头的服务全部在运行进入虚拟机打开文件管理器点击计算机右键选择属性在选择远程管理选择允许 1.winR 输入mstsc 2.输入虚拟机的ip地址 2.输入虚拟机的密码 上面的Administrator是虚拟机的用户名&#xff0c;有时会需要我们手动输入 3.验证…

【华为 ICT HCIA eNSP 习题汇总】——题目集7

1、一台 PC 的 MAC 地址是 5489-98FB-65D8 &#xff0c;管理员希望该 PC 从 DHCP 服务器获得指定的 IP 地址为192.168.1.11/24&#xff0c;以下命令配置正确的是&#xff08;&#xff09;。 A、dhcp static-bind ip-address 192.168.1.11 24 mac- address 5489-98FB-65D8 B、dh…

java(渣哇)------输入与输出语句(详解) (๑•̌.•๑)

目录 一.java的输出语句&#xff1a; System.out.println() -----输出并换行 System.out.print() -----输出但不换行 System.out.printf() -----类似C语言的printf()输出语句,按格式进行输出 二.java的输入语句&#xff1a; 2.1-----Scanner的基础用法&#xff1a; 2.2…

【江科大】STM32:外部中断(Extern Interrupt)

文章目录 EXTI&#xff08;Extern Interrupt&#xff09;外部中断EXIT的基本结构EXIT框图 旋转编码器简介库函数&#xff1a;对射式红外传感器计次&#xff1a;代码展示&#xff1a;旋转编码器计次注意&#xff1a; EXTI&#xff08;Extern Interrupt&#xff09;外部中断 功能…

vue2中CesiumV1.113.0加载离线地形数据

离线地形数据可以放在vue项目下的public/data/sjzTerrain文件下 由于地形离线数据数量太大&#xff0c;在vue项目编译时会报如下错误&#xff1a; ERROR in EMFILE: too many open files, open D:\test_project\vue_cesium_demo\public\data\sjzTerrain\.tmp\14\26787\11669.h…

macos pip3 install pycryptodome导入from Crypto.Cipher import AES报错

问题&#xff1a; 已经使用pip3 install pycryptodome安装成功了&#xff0c;但是导入from Crypto.Cipher import AES还是提示Unresolved reference Crypto 原因&#xff1a; 一句话&#xff1a;安装文件大小写问题&#xff08;这只是我遇到的一种情况&#xff09;。 修改&am…