流式数据处理与高吞吐消息传递:深入探索Kafka技术的奥秘

news2025/1/13 13:36:02

Kafka 是一种高吞吐量、分布式、基于发布/订阅的消息系统,最初由 LinkedIn 公司开发,使用Scala 语言编写,目前是 Apache 的开源项目。

Kafka 概念

Zookeeper 集群是一个基于主从复制的高可用集群,每个服务器承担如下三种角色中的一种

ZooKeeper中常见的角色:

领导者(Leader): 在ZooKeeper集群中,只有一个服务器被选举为领导者。领导者负责处理所有的写请求,例如创建、修改、删除数据节点等操作。它还负责为每个事务分配全局唯一的递增事务ID。

追随者(Follower): 其他服务器可以成为追随者。追随者只负责处理读请求,如读取数据节点的内容。它们会从领导者同步数据并保持与领导者的通信,以确保数据的一致性。

观察者(Observer): 观察者也是处理读请求的角色,类似于追随者。但观察者不参与选举过程,不会成为领导者。观察者不会影响集群的一致性,通常用于减轻领导者的负载。

客户端(Client): 客户端是与ZooKeeper集群进行交互的应用程序。客户端可以连接到任何服务器,不仅限于领导者。客户端可以读取和写入数据,监视数据节点的变化,并获取有关集群状态的信息。

数据节点(ZNode): 数据节点是ZooKeeper中的基本数据单元,类似于文件系统中的文件或目录。每个数据节点可以包含一些数据,以及关联的元数据。客户端可以对数据节点进行读写操作,而监视数据节点可以帮助客户端实时了解节点内容的变化

Zookeeper 工作原理

Kafka是一种高吞吐量、分布式、可持久化的消息传递系统,用于处理实时流数据和构建实时数据流架构。以下是Kafka的核心概念:

Producer(生产者): 生产者是向Kafka主题发送消息的应用程序或系统。它负责将数据发布到Kafka集群中的指定主题。消息可以是任意格式的数据,如日志、事件等。

Consumer(消费者): 消费者是从Kafka主题订阅消息并处理它们的应用程序。消费者可以以不同的组进行分组,每个组只能消费主题中的一部分消息,这样可以实现消息的分发和负载均衡。

Broker(代理服务器): 代理服务器是Kafka集群中的节点,负责存储数据并处理生产者和消费者之间的消息交互。多个代理服务器组成一个Kafka集群,数据会在不同的代理之间进行复制和分发。

Topic(主题): 主题是消息的逻辑容器,生产者发布消息到主题,消费者从主题订阅消息。Kafka集群可以包含多个主题,每个主题可以有多个分区。

Partition(分区): 分区是主题的物理分片,每个分区是一个有序的、不可变的消息序列。分区允许数据水平分割和存储,以提高数据吞吐量和并行处理能力。

Offset(偏移量): 偏移量是每个消息在分区中的唯一标识,用于跟踪消息的消费进度。消费者可以指定从特定的偏移量开始消费消息。

Replication(复制): 复制是Kafka提供的高可用性机制,每个分区可以有多个副本。一个副本是主副本,其他副本是从副本。主副本负责处理写操作,从副本用于数据冗余和故障转移。

Consumer Group(消费者组): 消费者组是一组消费者的集合,它们共同消费同一个主题。Kafka会将主题中的消息分发给不同的消费者组,从而实现消息的负载均衡和并行处理。

Zookeeper: Kafka过去依赖于ZooKeeper来管理集群的元数据和状态,但从Kafka 0.11版本开始,可以选择使用内部的元数据管理系统。

Kafka 数据存储设计

Kafka的数据存储设计是为了支持高吞吐量、持久性、可扩展性和容错性。它将数据存储在主题(Topic)的分区(Partition)中,每个分区又被分为多个消息段(Segment)。以下是Kafka数据存储设计的核心要点:

主题和分区: 每个主题可以被划分为多个分区,每个分区都是一个有序、不可变的消息日志。分区的数量可以根据数据负载和需求进行动态调整。

分区的消息段: 每个分区内的数据被分为多个消息段,每个消息段包含一定数量的连续消息。消息段的大小可以配置,一旦达到大小限制,就会创建一个新的消息段。

索引和偏移量: 每个分区维护着一个索引,将消息的偏移量映射到消息段中的位置。偏移量用于唯一标识每个消息,消费者可以根据偏移量来消费消息。

日志文件格式:每个消息段以日志文件的形式存储在磁盘上。Kafka的日志文件格式采用顺序写入的方式,减少了磁盘的随机写入,提高了写入性能。

复制和副本: 每个分区可以有多个副本,其中一个是主副本,其他是从副本。主副本负责处理写操作,从副本用于数据冗余和故障转移,提供高可用性。

消息存储策略: Kafka支持两种消息存储策略:日志段滚动(Log Segment Rolling)和时间驱动(Time-based)滚动。日志段滚动在消息段达到一定大小时触发,时间驱动滚动在一段时间后触发,这两种策略结合了数据的容量和时效性。

清理策略: Kafka使用清理策略来删除旧的消息段,以释放磁盘空间。默认情况下,Kafka保留一段时间内的所有数据,可以根据配置来调整数据的保留时间。

索引和内存映射: Kafka使用内存映射来加速消息段的索引访问。每个消息段的索引都保存在内存中,以支持快速的偏移量到物理位置的查找。

Kafka生产者设计

负载均衡(partition 会均衡分布到不同 broker 上)

由于消息 topic 由多个 partition 组成,且 partition 会均衡分布到不同 broker 上,因此,为了有效利用 broker 集群的性能,提高消息的吞吐量,producer 可以通过随机或者 hash 等方式,将消息平均发送到多个 partition 上,以实现负载均衡。

批量发送

是提高消息吞吐量重要的方式,Producer 端可以在内存中合并多条消息后,以一次请求的方式发送了批量的消息给 broker,从而大大减少 broker 存储消息的 IO 操作次数。但也一定程度上影响了消息的实时性,相当于以时延代价,换取更好的吞吐量。

压缩(GZIP 或 Snappy)

Producer 端可以通过 GZIP 或 Snappy 格式对消息集合进行压缩。Producer 端进行压缩之后,在Consumer 端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是 CPU(压缩和解压会耗掉部分 CPU 资源)。

图片

Kafka消费者设计

Consumer Group 

同一 Consumer Group 中的多个 Consumer 实例,不同时消费同一个 partition,等效于队列模式。partition 内消息是有序的,Consumer 通过 pull 方式消费消息。Kafka 不删除已消费的消息对于 partition,顺序读写磁盘数据,以时间复杂度 O(1)方式提供消息持久化能力。
Kafka消费者并进行消息处理:

import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;import java.util.Collections;import java.util.Properties;
public class SimpleKafkaConsumer {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";    private static final String GROUP_ID = "my-consumer-group";    private static final String TOPIC = "my-topic";
    public static void main(String[] args) {        Properties props = new Properties();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);        consumer.subscribe(Collections.singletonList(TOPIC));
        try {            while (true) {                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));                for (ConsumerRecord<String, String> record : records) {                    System.out.println("Received message: " + record.value());                    // TODO: Add your message processing logic here                }                consumer.commitAsync(); // Commit the offsets after processing            }        } catch (Exception e) {            e.printStackTrace();        } finally {            consumer.close();        }    }}

结语END

Kafka是一个分布式的、高吞吐量的消息传递系统,它具有持久性、可扩展性和容错性,适用于处理实时流数据和构建实时数据流架构。以下是对Kafka的总结:

特点与优势:

高吞吐量:Kafka的设计目标是高吞吐量的消息传递,能够处理大量的实时数据流。

持久性:Kafka将消息持久化存储在磁盘上,确保数据不会丢失,即使消费者未及时消费。

分布式架构:Kafka采用分布式架构,可以横向扩展以适应不断增长的数据流量。

可扩展性:Kafka的分区机制和分布式部署允许动态地增加主题、分区和副本。

容错性:Kafka支持分区和副本,可以实现数据冗余和高可用性。

多语言支持:Kafka提供多种语言的客户端,如Java、Python、C++等,便于开发者集成和使用。

核心概念:

主题和分区:每个消息被发布到一个特定的主题,主题可以划分为多个分区。

分区副本:每个分区可以有多个副本,提供数据冗余和高可用性。

生产者和消费者:生产者将消息发布到主题,消费者从主题中订阅并消费消息。

分区选择和偏移量:消费者可以选择消费特定分区,并跟踪已消费的偏移量。

消息处理:消费者负责处理从主题中拉取的消息,实现数据处理逻辑。

适用场景:

日志收集和处理:Kafka广泛用于实时日志收集、存储和分析。

事件流处理:Kafka能够处理大量的事件流,适用于实时数据分析和监控。

数据流架构:Kafka作为数据流架构的核心,可以构建实时的数据流处理平台。

分布式协调:Kafka可以用于实现分布式系统中的协调和通信。

总之,Kafka作为一个高性能的分布式消息传递系统,适用于处理实时数据流和构建实时数据流架构。它的设计理念和特性使得它在大规模数据处理、事件驱动架构等场景中发挥着重要作用,为实时数据流处理提供了强大的支持和解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/981093.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【网络编程】C++实现网络通信服务器程序||计算机网络课设||Linux系统编程||TCP协议(附源码)

TCP网络服务器 &#x1f40d; 1.程序简洁&#x1f98e;2. 服务端ServerTcp程序介绍&#x1f996;3.线程池ThreadPool介绍&#x1f995; 4.任务类Task介绍&#x1f419;5. 客户端Client介绍&#x1f991;6.运行结果&#xff1a;&#x1f990; 7. 源码&#x1f99e;7.1 serverTcp…

C++内存管理(3)——内存池

1. 默认内存管理函数的不足&#xff08;为什么使用内存池&#xff09; 利用默认的内存管理操作符 new/delete 和函数 malloc()/free() 在堆上分配和释放内存会有一些额外的开销。 系统在接收到分配一定大小内存的请求时&#xff0c;首先查找内部维护的内存空闲块表&#xff0…

纯css制作常见的图形

1.正方形 <div class"square"></div> .square {width: 100px;height: 100px;background-color: #ffff00;} 效果&#xff1a; 2.长方形 <div class"rectangle"></div> .rectangle{width: 200px;height: 100px;background-color:…

用huggingface.Accelerate进行分布式训练

诸神缄默不语-个人CSDN博文目录 本文属于huggingface.transformers全部文档学习笔记博文的一部分。 全文链接&#xff1a;huggingface transformers包 文档学习笔记&#xff08;持续更新ing…&#xff09; 本部分网址&#xff1a;https://huggingface.co/docs/transformers/m…

Layui快速入门之第一节Layui的基本使用

目录 一&#xff1a;Layui的基本概念 二&#xff1a;Layui使用的基本步骤 1.在官网下载layui的基本文件&#xff0c;引入css和js文件 ①&#xff1a;普通方式引入 ②&#xff1a;第三方 CDN 方式引入 2.在script标签体中编写代码 3.测试 一&#xff1a;Layui的基本概念 …

Mac m1 安装rabbitmq+php-amqplib

rabbitmq 官方地址 https://www.rabbitmq.com mac 软件包 Downloading and Installing RabbitMQ — RabbitMQ 一.这里我选择 homebrew brew updatebrew install rabbitmq二.php代码 用composer 安装 10年软件开发经验,结交朋友! 分销商城系统开发,App商城开发 商务合作 s…

eclipse进入断点之后,一直卡死,线程一直在运行【记录一种情况】

问题描述: 一直卡死在某个断点处&#xff0c;取消断点也是卡死在这边的进程处。 解决方式&#xff1a; 将JDK的使用内存进行了修改 ① 打开eclipse&#xff0c;window->preference->Java->Installed JREs&#xff0c;选中使用的jdk然后点击右侧的edit&#xff0c;在…

打造基于终端命令行的IDE,Termux配置Vim C++开发环境

Termux配置Vim C开发环境&#xff0c;打造基于终端命令行的IDE 主要利用VimCoc插件&#xff0c;配置C的代码提示等功能。 Termux换源 打开termux&#xff0c;输入termux-change-repo 找到mirrors.tuna.tsinghua.edu.cn&#xff0c;清华源&#xff0c;空格选中&#xff0c;回…

LeetCode(力扣)40. 组合总和 IIPython

LeetCode40. 组合总和 II 题目链接代码 题目链接 https://leetcode.cn/problems/combination-sum-ii/ 代码 class Solution:def backtrackingz(self, candidates, target, result, total, path, startindex):if target total:result.append(path[:])return for i in range…

elasticsearch访问9200端口 提示需要登陆

项目场景&#xff1a; 提示&#xff1a;这里简述项目相关背景&#xff1a; elasticsearch访问9200端口 提示需要登陆 问题描述 提示&#xff1a;这里描述项目中遇到的问题&#xff1a; 在E:\elasticsearch-8.9.1-windows-x86_64\elasticsearch-8.9.1\bin目录下输入命令 ela…

手写Spring:第5章-注入属性和依赖对象

文章目录 一、目标&#xff1a;注入属性和依赖对象二、设计&#xff1a;注入属性和依赖对象三、实现&#xff1a;注入属性和依赖对象3.0 引入依赖3.1 工程结构3.2 注入属性和依赖对象类图3.3 定义属性值和属性集合3.3.1 定义属性值3.3.2 定义属性集合 3.4 Bean定义补全3.5 Bean…

Flutter实用工具Indexer列表索引和Search搜索帮助。

1.列表索引 效果图&#xff1a; indexer.dart import package:json_annotation/json_annotation.dart;abstract class Indexer {///用于排序的字母JsonKey(includeFromJson: false, includeToJson: false)String? sortLetter;///用于排序的拼音JsonKey(includeFromJson: fal…

学习笔记|计数器|Keil软件中 0xFD问题|I/O口配置|STC32G单片机视频开发教程(冲哥)|第十二集:计数器的作用和意义

文章目录 1.计数器的用途2.计数器的配置官方例程开始Tips&#xff1a;编译时提示错误FILE DOES NOT EXIST&#xff1a; 3.计数器的应用本例完整代码&#xff1a;总结课后练习&#xff1a; 1.计数器的用途 直流有刷的电机,后面两个一正一负的电接上,电机就可以转 到底是转子个…

NLP(六十八)使用Optimum进行模型量化

本文将会介绍如何使用HuggingFace的Optimum&#xff0c;来对微调后的BERT模型进行量化&#xff08;Quantization&#xff09;。   在文章NLP&#xff08;六十七&#xff09;BERT模型训练后动态量化&#xff08;PTDQ&#xff09;中&#xff0c;我们使用PyTorch自带的PTDQ&…

李宏毅-机器学习hw4-self-attention结构-辨别600个speaker的身份

一、慢慢分析学习pytorch中的各个模块的参数含义、使用方法、功能&#xff1a; 1.encoder编码器中的nhead参数&#xff1a; self.encoder_layer nn.TransformerEncoderLayer( d_modeld_model, dim_feedforward256, nhead2) 所以说&#xff0c;这个nhead的意思&#xff0c;就…

使用Maven创建父子工程

&#x1f4da;目录 创建父工程创建子模块创建子模块示例创建认证模块(auth) 结束 创建父工程 选择空项目&#xff1a; 设置&#xff1a;项目名称&#xff0c;组件名称&#xff0c;版本号等 创建完成后的工程 因为我们需要设置这个工程为父工程所以不需要src下的所有文件 在pom…

WPF Flyout风格动画消息弹出消息提示框

WPF Flyout风格动画消息弹出消息提示框 效果如图&#xff1a; XAML: <Window x:Class"你的名称控件.FlyoutNotication"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xam…

java八股文面试[数据库]——索引覆盖

覆盖索引是一种避免回表查询的优化策略: 只需要在一棵索引树上就能获取SQL所需的所有列数据&#xff0c;无需回表&#xff0c;速度更快。 具体的实现方式: 将被查询的字段建立普通索引或者联合索引&#xff0c;这样的话就可以直接返回索引中的的数据&#xff0c;不需要再通过聚…

肖sir__设计测试用例方法之因果图07_(黑盒测试)

设计测试用例方法之因果图 一、定义&#xff1a;因果图提供了一个把规格转化为判定表的系统化方法&#xff0c;从该图中可以产生测试数据。其 中&#xff0c;原因是表示输入条件&#xff0c;结果是对输入执 行的一系列计算后得到的输出。 二、因果图方法最终生成的就是判定表。…

rhcsa4 进程和SSH

tree命令。用于以树状结构显示目录和文件。通过运行 “tree” 命令可视化地查看文件系统中的目录结构。 tree / systemd是第一个系统进程&#xff08;pid1&#xff09;不启动&#xff0c;其他进程也没法启动&#xff0c; 用pstree查看进程树 我们可以看到所有进程都是syste…