Kafka 分布式消息系统详细介绍

news2025/1/22 20:59:02

Kafka 分布式消息系统

  • 一、Kafka 概述
    • 1.1 Kafka 定义
    • 1.2 Kafka 设计目标
    • 1.3 Kafka 特点
  • 二、Kafka 架构设计
    • 2.1 基本架构
    • 2.2 Topic 和 Partition
    • 2.3 消费者和消费者组
    • 2.4 Replica 副本
  • 三、Kafka 分布式集群搭建
    • 3.1 下载解压
      • 3.1.1 上传解压
    • 3.2 修改 Kafka 配置文件
      • 3.2.1 修改zookeeper.properties配置文件
      • 3.2.2 修改consumer.properties配置文件
      • 3.2.3 修改producer.properties配置
      • 3.2.4 修改server.properties配置
    • 3.3 修改 Kafka 配置同步到其他节点
    • 3.4 修改 Kafka Server 编号
    • 3.5 启动Kafka 集群
    • 3.5.1 启动Zookeeper集群
    • 3.5.1 启动 Kafka 集群
    • 3.6 Kafka 集群测试
      • 3.6.1 创建Topic
      • 3.6.2 查看Topic列表
      • 3.6.2 查看Topic详情
      • 3.6.3 消费者消费Topic
      • 3.6.4 生产者向Topic发送消息
  • 四、案例实践:Flume 与 Kafka 集成开发
    • 4.1 配置Flume聚合服务
    • 4.2 Flume与Kafka集成测试
      • 4.2.1 启动Flume聚合服务
      • 4.2.2 启动 Flume 采集服务
      • 4.2.3 启动 Kafka 消费者服务
      • 4.2.4 准备测试数据

一、Kafka 概述

1.1 Kafka 定义

Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala语言编写,它以可水平扩展和高吞吐率的特点而被广泛使用。目前越来越多的开源分布式处理系统,如Spark、Flink都支持与Kafka集成。比如一个实时日志分析系统,Flume采集数据通过接口传输到Kafka集群(多台Kafka服务器组成的集群称为Kafka集群),然后Flink或者Spark直接调用接口从Kafka实时读取数据并进行统计分析。

1.2 Kafka 设计目标

  • 以时间复杂度为O(1)的方式提供消息持久化(Kafka)能力,即使对TB级以上数据也能保证常数时间的访问性能。持久化是将程序数据在持久状态和瞬时状态间转换的机制。通俗地讲,就是瞬时数据(比如内存中的数据是不能永久保存的)持久化为持久数据(比如持久化至磁盘中能够长久保存)。
  • 保证高吞吐率,即使在非常廉价的商用机器上,也能做到单机支持每秒100,000条消息的传输速度。
  • 支持Kafka Server间的消息分区,以及分布式消息消费,同时保证每个Partition内的消息顺序传输。
  • 支持离线数据处理和实时数据处理。

1.3 Kafka 特点

  • 高吞吐量、低延迟:Kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。
  • 可扩展性:Kafka集群同Hadoop集群一样,支持横向扩展。
  • 持久性、可靠性:Kafka消息可以被持久化到本地磁盘,并且支持Partition数据备份,防止数据丢失。
  • 容错性:允许Kafka集群中的节点失败,如果Partition(分区)副本数量为n,则最多允许n-1个节点失败。
  • 高并发:单节点支持上千个客户端同时读写,每秒钟有上百MB的吞吐量,基本上达到了网卡的极限。

二、Kafka 架构设计

2.1 基本架构

在这里插入图片描述
生产者将数据写入 Kafka,消费者从 Kafka 中读取数据,Zookeeper 提供协调服务,如生产者和消费者的负载均衡

2.2 Topic 和 Partition

在这里插入图片描述
生产者将数据写入主题,实际写入分区(轮询,随机等),一个分区只能对应一个消费者组中的一个消费组,而一个消费者可以对应多个分区。

2.3 消费者和消费者组

在这里插入图片描述
一个分区只能对应一个消费者组中的一个消费者,消费者组相互独立,一个分区可以对应多个不同消费者组中的消费者,一个消费者可以对应多个分区。

2.4 Replica 副本

  • Leader:每个Replica集合中的分区都会选出一个唯一的Leader,所有的读写请求都由Leader处理,其他副本从Leader处把数据更新同步到本地。

  • Follower:是副本中的另外一个角色,可以从Leader中复制数据。

  • ISR:Kafka集群通过数据冗余来实现容错。每个分区都会有一个Leader,以及零个或多个Follower,Leader加上Follower总和就是副本因子。Follower与Leader之间的数据同步是通过Follower主动拉取Leader上面的消息来实现的。所有的Follower不可能与Leader中的数据一直保持同步,那么与Leader数据保持同步的这些Follower称为IS(In Sync Replica)。Zookeeper维护着每个分区的Leader信息和ISR信息。

三、Kafka 分布式集群搭建

3.1 下载解压

下载地址:https://archive.apache.org/dist/kafka/

此处使用的下载的版本式:kafka_2.12_2.8.2.tgz

3.1.1 上传解压

[root@hadoop1 local]# tar -zxvf kafka_2.12-2.8.2.tgz 

添加软连接

[root@hadoop1 local]# ln -s kafka_2.12-2.8.2 kafka

在这里插入图片描述

3.2 修改 Kafka 配置文件

3.2.1 修改zookeeper.properties配置文件

进入Kafka的config目录下,修改zookeeper. properties配置文件,具体内容如下:

[root@hadoop1 local]# vim /usr/local/kafka/config/zookeeper.properties 

修改如下内容:

dataDir=/usr/local/data/zookeeper/zkdata
clientPort=2181

3.2.2 修改consumer.properties配置文件

进入Kafka的config目录下,修改consumer. properties配置文件,具体内容如下:

[root@hadoop1 local]# vim /usr/local/kafka/config/consumer.properties

修改如下内容:

bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092

备注:hadoop1:9092,hadoop2:9092,hadoop3:9092 为集群hadoop地址

3.2.3 修改producer.properties配置

进入Kafka的config目录中,修改producer. properties配置文件,具体内容如下:

[root@hadoop1 local]# vim /usr/local/kafka/config/producer.properties 

修改内容如下:

bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092

3.2.4 修改server.properties配置

进入Kafka的config目录下,修改server. properties配置文件,具体内容如下:

[root@hadoop1 local]# vim /usr/local/kafka/config/server.properties 

修改内容如下:

zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181

3.3 修改 Kafka 配置同步到其他节点

将hadoop1节点中配置好的Kafka安装目录分发给hadoop2和hadoop3节点,具体操作如下所示:

[root@hadoop1 local]# deploy.sh /usr/local/kafka_2.12-2.8.2 /usr/local/ slave

给从节点创建软链接:

[root@hadoop1 local]# runRemoteCmd.sh "ln -s /usr/local/kafka_2.12-2.8.2 /usr/local/kafka" slave

备注:deploy.sh 是集群推送脚本,可以参考《ZooKeeper 集群的详细部署》

3.4 修改 Kafka Server 编号

登录hadoop1、hadoop2和hadoop3节点,分别进入Kafka的config目录下,修改server.properties配置文件中的broker.id项,具体操作如下所示:
[root@hadoop1 local]# vim /usr/local/kafka/config/server.properties
#标识hadoop1节点
broker.id=1
[root@hadoop2 local]# vim /usr/local/kafka/config/server.properties
#标识hadoop2节点
broker.id=2
[root@hadoop3 local]# vim /usr/local/kafka/config/server.properties
#标识hadoop3节点
broker.id=3

3.5 启动Kafka 集群

Zookeeper管理着Kafka Broker集群,同时Kafka将元数据信息保存在Zookeeper中,说明Kafka集群依赖Zookeeper提供协调服务,所以需要先启动Zookeeper集群,然后再启动Kafka集群。

3.5.1 启动Zookeeper集群

在集群各个节点中进入Zookeeper安装目录,使用如下命令启动Zookeeper集群。

# 启动集群
[root@hadoop1 local]# runRemoteCmd.sh "/usr/local/zookeeper/bin/zkServer.sh start" all
# 查看zookeeper 集群状态
[root@hadoop1 local]# runRemoteCmd.sh "/usr/local/zookeeper/bin/zkServer.sh status" all

在这里插入图片描述

3.5.1 启动 Kafka 集群

在集群各个节点中进入Kafka安装目录,使用如下命令启动Kafka集群。

[root@hadoop1 local]# runRemoteCmd.sh "/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties" all

在这里插入图片描述
显示 Kafka 已经启动。

3.6 Kafka 集群测试

Kafka自带有很多种Shell脚本供用户使用,包含生产消息、消费消息、Topic管理等功能。接下来利用Kafka Shell脚本测试使用Kafka集群。

3.6.1 创建Topic

使用Kafka的bin目录下的kafka-topics.sh脚本,通过create命令创建名为test的Topic,具体操作如下所示。

[root@hadoop1 local]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3

上述命令中,–zookeeper 指定 Zookeeper 集群;–create 是创建 Topic 命令;–topic指定Topic名称;–replication-factor 指定副本数量;–partitions指定分区个数。

在这里插入图片描述

3.6.2 查看Topic列表

通过list命令可以查看Kafka 的Topic列表,具体操作如下所示。

[root@hadoop1 kafka]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper hadoop1:2181  --list

在这里插入图片描述

3.6.2 查看Topic详情

通过describe命令查看Topic内部结构,具体操作如下所示。

[root@hadoop1 kafka]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper hadoop1:2181 --describe --topic test

在这里插入图片描述

3.6.3 消费者消费Topic

在hadoop1节点上,通过Kafka自带的kafka-console-consumer.sh脚本,开启消费者消费 test中的消息。

[root@hadoop1 kafka]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic test

在这里插入图片描述

3.6.4 生产者向Topic发送消息

在hadoop1节点上,通过Kafka自带的kafka-console-producer.sh脚本启动生产者,然后向 test发送3条消息,具体操作如下所示。

[root@hadoop1 logs]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list  hadoop1:9092 --topic test

生成者输入:
在这里插入图片描述
消费者展示:
在这里插入图片描述

四、案例实践:Flume 与 Kafka 集成开发

在 《Flume 日志采集系统》 的基础上进行 kafka 集成开发

4.1 配置Flume聚合服务

在 hadoop2 和 hadoop3 服务器配置分配配置 Flume 聚合服务

[root@hadoop1 conf]# vim /usr/local/flume/conf/avro-file-selector-kafka.properties
[root@hadoop2 conf]# vim /usr/local/flume/conf/avro-file-selector-kafka.properties

分别写入如下内容并保存:

#定义source、channel、sink的名称
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
# 定义和配置一个avro Source
agent1.sources.r1.type = avro
agent1.sources.r1.channels = c1
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 1234
# 定义和配置一个file channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /usr/local/data/flume/checkpointDir
agent1.channels.c1.dataDirs = /usr/local/data/flume/dataDirs
# 定义和配置一个kafka sink
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = test
agent1.sinks.k1.brokerList = hadoop1:9092,hadoop2:9092,hadoop3:9092
agent1.sinks.k1.producer.acks = 1
agent1.sinks.k1.channel = c1

4.2 Flume与Kafka集成测试

4.2.1 启动Flume聚合服务

在 采集服务器 hadoop2 和 hadoop3 分别启动聚合服务

[root@hadoop2 conf]# /usr/local/flume/bin/flume-ng agent -n agent1 -c conf -f /usr/local/flume/conf/avro-file-selector-kafka.properties -Dflume.root.logger=INFO,console

[root@hadoop3 local]# /usr/local/flume/bin/flume-ng agent -n agent1 -c conf -f /usr/local/flume/conf/avro-file-selector-kafka.properties -Dflume.root.logger=INFO,console

在这里插入图片描述

4.2.2 启动 Flume 采集服务

在 Hadoop1 启动 Flume 采集脚本:

[root@hadoop1 conf]# /usr/local/flume/bin/flume-ng agent -n agent1 -c conf -f /usr/local/flume/conf/taildir-file-selector-avro.properties -Dflume.root.logger=INFO,console

在这里插入图片描述
正常启动 Flume 采集脚本

4.2.3 启动 Kafka 消费者服务

在 hadoop1 启动 Kafka 消费者服务脚本

[root@hadoop1 data]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic test

在这里插入图片描述

4.2.4 准备测试数据

在 hadoop1 另开连接,执行如下脚本:

[root@hadoop1 logs]# echo '00:00:100971413028304674[火炬传递路线时间]1 2www.olympic.cn/news/beijing/2008-03-19/1417291.html' >> /usr/local/data/flume/logs/sogou.log

输入三条测试数据
在这里插入图片描述

消费者打印三条测试数据:
在这里插入图片描述
至此,案例测试成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2114593.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java操作Elasticsearch的实用指南

Java操作Elasticsearch的实用指南 一、创建索引二、增删改查 一、创建索引 在ElasticSearch中索引相当于mysql中的表,mapping相当于表结构,所以第一步我们要先创建索引。 假设我们有一张文章表的数据需要同步到ElasticSearch,首先需要根据数据库表创建…

DisplayManagerService启动及主屏添加-Android13

DisplayManagerService启动及主屏添加-Android13 1、DisplayManagerService启动1.1 简要时序图 2、DEFAULT_DISPLAY主屏幕添加2.1 物理屏热插拔监听2.2 物理屏信息 3、默认屏幕亮度 1、DisplayManagerService启动 1.1 简要时序图 代码位置:frameworks/base/service…

git:基本操作(2)

目录 git操作(2) 1.版本回退 2.撤销修改 3.删除文件 git操作(2) 1.版本回退 git能够管理文件的历史版本,这也是版本控制器的重要能力,因此,git也提供了版本回退这样的功能。 执行git reset…

QT6聊天室项目 网络通信实现逻辑分析

实现逻辑 模块话网络通信设计分析 NetClient类 功能:负责与服务器进行通信httpClient:处理HTTP请求websocketClient:处理WebSocket通信 HTTP请求封装 设计请求和服务器响应的接口设计函数测试网络连接性设计处理的函数处理HTTP请求(后期实现…

C#/.NET/.NET Core推荐学习路线文档文章

前言 专门为C#/.NET/.NET Core推荐学习路线&文档&文章提供的一个Issues,各位小伙伴可以把自己觉得不错的学习路线、文档、文章相关地址分享出来🤞。 https://github.com/YSGStudyHards/DotNetGuide/issues/10 🏷️C#/.NET/.NET Cor…

智慧工地解决方案-2

### 1. 智慧工地解决方案概述 《智慧工地解决方案》针对传统工地的低效率和高风险问题,提出了一套集成现代技术的智能管理系统,以提升工地安防和生产效率。 ### 2. 工地现状与挑战 当前工地存在安全意识薄弱、管理粗放、环境污染监测困难等问题&#…

数据分析面试题:客户投保问题分析

目录 0 场景描述 1 数据准备 2 问题分析 2.1 计算小微公司的平均经营时长 2.2 计算小微公司且角色为投保人,保险起期在18年的总保费 2.3 假设,DWD_CUSTOMER_REL客户关联关系表中,存在部分客户保单数很多,部分客户保单数很少的情况,此时DWD_CUSTOMER_BASE表关联,程序…

Learn ComputeShader 10 HUD Overlay

前言: 1. HUD Overlay (Head-Up Display Overlay) 定义: HUD 是指游戏或应用程序中的一类叠加界面元素,通常显示在屏幕上,用于向用户提供实时信息。它通常显示关键信息而不会打断用户的主要活动或视线。应用场景: 常见于游戏、飞行模拟器和…

[项目][CMP][Page Cache]详细讲解

目录 1.申请内存2.释放内存3.框架 1.申请内存 当Central Cache向Page Cache申请内存时,Page Cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个 比如:申请的是4页page,4页page后面…

【MRI基础】TI反转时间概念

在磁共振成像 (MRI) 中,反转时间 (TI) 是反转恢复脉冲序列中的一个特定参数。它表示施加 180 度反转脉冲(将纵向磁化翻转到相反方向)与随后的 90 度激励脉冲(将磁化翻转到横向平面以创建 MR 信号)之间的时间间隔。 MRI…

常见概念 -- 电层业务调制谱宽与光层通道谱宽

本文介绍了“电层业务调制谱宽”和“光层通道谱宽”这两个概念,并结合网管的配置界面解释二者的配置方法。 电层业务调制谱宽 电层业务调制谱宽与光线路码型唯一相关,光线路码型确定后谱宽随之确定。 电层业务调制谱宽是指某业务信号的损耗谱从峰值下…

C++解决:求排列数

描述 输入两个整数m,n&#xff0c;求m个数字中选n个数的排列数。&#xff08;1<n<m<50&#xff09; 输入描述 两个正整数m和n。 输出描述 一个正整数表示排列数。 用例输入 1 6 5 用例输出 1 720 AC code #include<bits/stdc.h> using namespace s…

[linux 驱动]platform总线设备驱动详解与实战

目录 1 描述 2 结构体 2.1 bus_type 2.2 platform_bus_type 2.2.1 platform_match 2.2.2 platform_uevent 2.2.3 platform_dma_configure 2.2.4 platform_dev_pm_ops 2.3 platform_driver 2.4 platform_device 3 platform注册 3.1 platform_driver_register 3.1.1 …

【python因果推断库11】工具变量回归与使用 pymc 验证工具变量4

目录 Wald 估计与简单控制回归的比较 CausalPy 和 多变量模型 感兴趣的系数 复杂化工具变量公式 Wald 估计与简单控制回归的比较 但现在我们可以将这个估计与仅包含教育作为控制变量的简单回归进行比较。 naive_reg_model, idata_reg make_reg_model(covariate_df.assign…

C语言:刷题日志(1)

一.阶乘计算升级版 本题要求实现一个打印非负整数阶乘的函数。 其中n是用户传入的参数&#xff0c;其值不超过1000。如果n是非负整数&#xff0c;则该函数必须在一行中打印出n!的值&#xff0c;否则打印“Invalid input”。 首先&#xff0c;知道阶乘是所有小于及等于该数的…

halcon 自定义距离10的一阶导数幅图,摆脱sobel的3掩码困境

一&#xff0c;为什么要摆脱3的掩码 在处理图像的过程中&#xff0c;会用到平滑算子&#xff0c;很容易破坏边际&#xff0c;所谓的一阶导数sobel只计算掩码为3的差分&#xff0c;在幅度图分割中&#xff0c;往往是很难把握的。 举个例子-现在图像头平滑好了&#xff0c;缺陷…

【Python 千题 —— 算法篇】寻找两个正序数组的中位数

Python 千题持续更新中 …… 脑图地址 &#x1f449;&#xff1a;⭐https://twilight-fanyi.gitee.io/mind-map/Python千题.html⭐ 题目背景 在处理大规模数据时&#xff0c;我们经常需要对数据进行排序和分析。一个常见问题是如何高效地从两个正序数组中找出它们的中位数。…

今天又学到了——图编号关联章节号,QGIS下载文件存储的瓦片

记录教程来源&#xff1a;​​​​​​【Word图编号关联章节号】图片分章节 编号&#xff0c;图1-1、图2-1_哔哩哔哩_bilibili 上面链接这个实现的是这个效果&#xff1a; word自动目录及章节自动编号教程_哔哩哔哩_bilibili&#xff0c;这个的效果是自己设计多级列表&#xf…

Pr:首选项 - 音频

Pr菜单&#xff1a;编辑/首选项 Edit/Preferences Premiere Pro 首选项中的“音频” Audio选项卡主要作用是控制音频的处理设置&#xff0c;包括音量调整、波形生成、音频渲染等选项&#xff0c;这些设置有助于优化音频的处理和编辑工作&#xff0c;适用于不同的剪辑需求和项目…

【Qt】Qt与Html网页进行数据交互

前言&#xff1a;此项目使用达梦数据库&#xff0c;以Qt制作服务器&#xff0c;Html制作网页客户端界面&#xff0c;可以通过任意浏览器访问。 1、Qt与网页进行数据交互 1.1、第一步&#xff1a;准备qwebchannel.js文件 直接在qt的安装路径里复制即可 1.2、第二步&#xf…