Kafka技术基础

news2024/12/23 10:15:30

Apache Kafka发源于LinkedIn,于2011年成为Apache的孵化项目,随后于2012年成为Apache的主要项目之一,是消息队列的一种实现方式,提供消息的持久化。Kafka使用Scala和Java进行编写。Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

  • 对机械硬盘的高效利用

Kafka的消息保存在本地磁盘,所以收集消息的高性能来源于对磁盘写入性能的优化。传统的机械硬盘结构决定了顺序读写性能最好,kafka 在将消息写入磁盘时全是顺序写操作,要比随机写的效率高很多,避免了大量缓慢的机械运动。

过于频繁的小 I/O 操作会拖慢速度,所以 kafka 会将一批次消息打包到一起批量写回磁盘。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-j9dHeFHW-1682174778580)(C:\Users\hp\Pictures\gitee\Hadoop生态体系\image-20210228064205689.png)]

使你能够将消息从一个端点传递到另一个端点。Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在集群内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。

Kafka是一个统一的平台,用于处理所有实时数据Feed。 Kafka支持低延迟消息传递,并在出现机器故障时提供对容错的保证。 它具有处理大量不同消费者的能力。 Kafka非常快,执行2百万写/秒。 Kafka将所有数据保存到磁盘,这实质上意味着所有写入都会进入操作系统(RAM)的页面缓存。 这使得将数据从页面缓存传输到网络套接字非常有效。

关键术语:

  • broker:Kafka 集群中有很多台 Server,其中每一台 Server 都可以存储消息,将每一台 Server 称为一个 kafka 实例,也叫做 broker。

  • 主题(topic):一个 topic 里保存的是同一类消息,相当于对消息的分类,每个 producer 将消息发送到 kafka 中,都需要指明要存的 topic 是哪个,也就是指明这个消息属于哪一类。

  • 分区(partition):每个 topic 都可以分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部。为什么要进行分区呢?最根本的原因就是:kafka基于文件进行存储,当文件内容大到一定程度时,很容易达到单个磁盘的上限,因此,采用分区的办法,一个分区对应一个文件,这样就可以将数据分别存储到不同的server上去,另外这样做也可以负载均衡,容纳更多的消费者。

  • 偏移量(Offset):一个分区对应一个磁盘上的文件,而消息在文件中的位置就称为 offset(偏移量),offset 为一个 long 型数字,它可以唯一标记一条消息。由于kafka 并没有提供其他额外的索引机制来存储 offset,文件只能顺序的读写,所以在kafka中几乎不允许对消息进行“随机读写”。

综上,我们总结一下 Kafka 的几个要点:

  • kafka 是一个基于发布-订阅的分布式消息系统(消息队列)
  • Kafka 面向大数据,消息保存在主题中,而每个 topic 有分为多个分区
  • kafka 的消息数据保存在磁盘,每个 partition 对应磁盘上的一个文件,消息写入就是简单的文件追加,文件可以在集群内复制备份以防丢失
  • 即使消息被消费,kafka 也不会立即删除该消息,可以通过配置使得过一段时间后自动删除以释放磁盘空间
  • kafka依赖分布式协调服务Zookeeper,适合离线/在线信息的消费,与 storm 和 spark 等实时流式数据分析常常结合使用

一个 topic 对应的多个 partition 分散存储到集群中的多个 broker 上,存储方式是一个 partition 对应一个文件,每个 broker 负责存储在自己机器上的 partition 中的消息读写。

kafka 还可以配置 partitions 需要备份的个数(replicas),每个 partition 将会被备份到多台机器上,以提高可用性,备份的数量可以通过配置文件指定。

每个 partition 选举一个 server 作为“leader”,由 leader 负责所有对该分区的读写,其他 server 作为 follower 只需要简单的与 leader 同步,保持跟进即可。如果原来的 leader 失效,会重新选举由其他的 follower 来成为新的 leader。Kafka 使用 ZK 在 Broker 中选出一个 Controller,用于 Partition 分配和 Leader 选举。

从逻辑上讲,Kafka设计非常简单,它只有一种类似JMS的Topic的消息通道:

                              ┌──────────┐
                          ┌──>│Consumer-1│
                          │   └──────────┘
┌────────┐      ┌─────┐   │   ┌──────────┐
│Producer│─────>│Topic│───┼──>│Consumer-2│
└────────┘      └─────┘   │   └──────────┘
                          │   ┌──────────┐
                          └──>│Consumer-3│
                              └──────────┘

那么Kafka如何支持十万甚至百万的并发呢?答案是分区。Kafka的一个Topic可以有一个至多个Partition,并且可以分布到多台机器上:

            ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
             Topic
            │                   │
                ┌───────────┐        ┌──────────┐
            │┌─>│Partition-1│──┐│┌──>│Consumer-1│
             │  └───────────┘  │ │   └──────────┘
┌────────┐  ││  ┌───────────┐  │││   ┌──────────┐
│Producer│───┼─>│Partition-2│──┼─┼──>│Consumer-2│
└────────┘  ││  └───────────┘  │││   └──────────┘
             │  ┌───────────┐  │ │   ┌──────────┐
            │└─>│Partition-3│──┘│└──>│Consumer-3│
                └───────────┘        └──────────┘
            └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘

整体数据流程

Kafka 的总体数据流满足下图

img
(1)数据生产过程(Produce)

对于生产者要写入的一条记录,可以指定四个参数:分别是 topic、partition、key 和 value,其中 topic 和 value(要写入的数据)是必须要指定的,而 key 和 partition 是可选的。

对于一条记录,先对其进行序列化,然后按照 Topic 和 Partition,放进对应的发送队列中。如果 Partition 没填,那么情况会是这样的:a、Key 有填。按照 Key 进行哈希,相同 Key 去一个 Partition。b、Key 没填。Round-Robin 来选 Partition。

img

producer 将会和Topic下所有 partition leader 保持 socket 连接,消息由 producer 直接通过 socket 发送到 broker。其中 partition leader 的位置( host : port )注册在 zookeeper 中,producer 作为 zookeeper client,已经注册了 watch 用来监听 partition leader 的变更事件,因此,可以准确的知道谁是当前的 leader。

producer 端采用异步发送:将多条消息暂且在客户端 buffer 起来,并将他们批量的发送到 broker,小数据 IO 太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。

(2)数据消费过程(Consume)

对于消费者,不是以单独的形式存在的,每一个消费者属于一个 consumer group,一个 group 包含多个 consumer。特别需要注意的是:订阅 Topic 是以一个消费组来订阅的,发送到 Topic 的消息,只会被订阅此 Topic 的每个 group 中的一个 consumer 消费。

如果所有的 Consumer 都具有相同的 group,那么就像是一个点对点的消息系统;如果每个 consumer 都具有不同的 group,那么消息会广播给所有的消费者。

具体说来,这实际上是根据 partition 来分的,一个 Partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费,消费组里的每个消费者是关联到一个 partition 的,因此有这样的说法:对于一个 topic,同一个 group 中不能有多于 partitions 个数的 consumer 同时消费,否则将意味着某些 consumer 将无法得到消息。

同一个消费组的两个消费者不会同时消费一个 partition。

img

在 kafka 中,采用了 pull 方式,即 consumer 在和 broker 建立连接之后,主动去 pull(或者说 fetch )消息,首先 consumer 端可以根据自己的消费能力适时的去 fetch 消息并处理,且可以控制消息消费的进度(offset)。

partition 中的消息只有一个 consumer 在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见 kafka broker 端是相当轻量级的。当消息被 consumer 接收之后,需要保存 Offset 记录消费到哪,以前保存在 ZK 中,由于 ZK 的写性能不好,以前的解决方法都是 Consumer 每隔一分钟上报一次,在 0.10 版本后,Kafka 把这个 Offset 的保存,从 ZK 中剥离,保存在一个名叫 consumeroffsets topic 的 Topic 中,由此可见,consumer 客户端也很轻量级。

4、消息传送机制

Kafka 支持 3 种消息投递语义,在业务中,常常都是使用 At least once 的模型。

  • At most once:最多一次,消息可能会丢失,但不会重复。
  • At least once:最少一次,消息不会丢失,可能会重复。
  • Exactly once:只且一次,消息不丢失不重复,只且消费一次。
  • 这又是那里的概念?其他消息队列有吗?

Kafka的集群图。

Cluster Architecture

下表描述了上图中显示的每个组件。

组件和说明
**Broker(代理) **Kafka集群通常由多个代理组成以保持负载平衡。 Kafka代理是无状态的,所以他们使用ZooKeeper来维护它们的集群状态。 一个Kafka代理实例可以每秒处理数十万次读取和写入,每个Broker可以处理TB的消息,而没有性能影响。 Kafka经纪人领导选举可以由ZooKeeper完成。
ZooKeeperZooKeeper用于管理和协调Kafka代理。 ZooKeeper服务主要用于通知生产者和消费者Kafka系统中存在任何新代理或Kafka系统中代理失败。 根据Zookeeper接收到关于代理的存在或失败的通知,然后生产者和消费者采取决定并开始与某些其他代理协调他们的任务。
***Producers(**生产者**)****生产者将数据推送给经纪人。 当新代理启动时,所有生产者搜索它并自动向该新代理发送消息。 Kafka生产者不等待来自代理的确认,并且发送消息的速度与代理可以处理的一样快。
****Consumers(**消费者**)****因为Kafka代理是无状态的,这意味着消费者必须通过使用分区偏移来维护已经消耗了多少消息。 如果消费者确认特定的消息偏移,则意味着消费者已经消费了所有先前的消息。 消费者向代理发出异步拉取请求,以具有准备好消耗的字节缓冲区。 消费者可以简单地通过提供偏移值来快退或跳到分区中的任何点。 消费者偏移值由ZooKeeper通知。

Kafka 分区是消息的线性有序序列,其中每个消息由它们的索引(称为偏移)来标识。Kafka 集群中的所有数据都是不相连的分区联合。 传入消息写在分区的末尾,消息由消费者顺序读取。

工作流程

发布 - 订阅消息的工作流程

以下是 Pub-Sub 消息的逐步工作流程 -

  • 生产者定期向主题发送消息。
  • Kafka 代理存储为该特定主题配置的分区中的所有消息。 它确保消息在分区之间平等共享。 如果生产者发送两个消息并且有两个分区,Kafka 将在第一分区中存储一个消息,在第二分区中存储第二消息。
  • 消费者订阅特定主题。
  • 一旦消费者订阅主题,Kafka 将向消费者提供主题的当前偏移,并且还将偏移保存在 Zookeeper 系统中。
  • 消费者将定期请求 Kafka (如100 Ms)新消息。
  • 一旦 Kafka 收到来自生产者的消息,它将这些消息转发给消费者。
  • 消费者将收到消息并进行处理。
  • 一旦消息被处理,消费者将向 Kafka 代理发送确认。
  • 一旦 Kafka 收到确认,它将偏移更改为新值,并在 Zookeeper 中更新它。 由于偏移在 Zookeeper 中维护,消费者可以正确地读取下一封邮件,即使在服务器暴力期间。
  • 以上流程将重复,直到消费者停止请求。
  • 消费者可以随时回退/跳到所需的主题偏移量,并阅读所有后续消息。

队列消息/用户组的工作流

在队列消息传递系统而不是单个消费者中,具有相同组 ID 的一组消费者将订阅主题。 简单来说,订阅具有相同 Group ID 的主题的消费者被认为是单个组,并且消息在它们之间共享。 让我们检查这个系统的实际工作流程。

  • 生产者以固定间隔向某个主题发送消息。
  • Kafka存储在为该特定主题配置的分区中的所有消息,类似于前面的方案。
  • 单个消费者订阅特定主题,假设 Topic-01 为 Group ID 为 Group-1 。
  • Kafka 以与发布 - 订阅消息相同的方式与消费者交互,直到新消费者以相同的组 ID 订阅相同主题Topic-01 1 。
  • 一旦新消费者到达,Kafka 将其操作切换到共享模式,并在两个消费者之间共享数据。 此共享将继续,直到用户数达到为该特定主题配置的分区数。
  • 一旦消费者的数量超过分区的数量,新消费者将不会接收任何进一步的消息,直到现有消费者取消订阅任何一个消费者。 出现这种情况是因为 Kafka 中的每个消费者将被分配至少一个分区,并且一旦所有分区被分配给现有消费者,新消费者将必须等待。
  • 此功能也称为使用者组。 同样,Kafka 将以非常简单和高效的方式提供两个系统中最好的。

ZooKeeper 的作用

Apache Kafka 的一个关键依赖是 Apache Zookeeper,它是一个分布式配置和同步服务。Zookeeper 是 Kafka 代理和消费者之间的协调接口。Kafka 服务器通过 Zookeeper 集群共享信息。Kafka 在 Zookeeper 中存储基本元数据,例如关于主题,代理,消费者偏移(队列读取器)等的信息。

由于所有关键信息存储在 Zookeeper 中,并且它通常在其整体上复制此数据,因此Kafka代理/ Zookeeper 的故障不会影响 Kafka 集群的状态。Kafka 将恢复状态,一旦 Zookeeper 重新启动。 这为Kafka带来了零停机时间。Kafka 代理之间的领导者选举也通过使用 Zookeeper 在领导者失败的情况下完成。

基本操作

启动zk->启动kafka

用各种提供的脚本文件启动topic\启动生产者节点连接topic\

生产者将等待来自 stdin 的输入并发布到 Kafka 集群。 默认情况下,每个新行都作为新消息发布,然后在 config / producer.properties 文件中指定默认生产者属性。

消费者

与生产者类似,在config / consumer.proper-ties 文件中指定了缺省使用者属性。 打开一个新终端并键入以下消息消息语法。

语法

bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name --from-beginning

然后可以在终端看到生产者提供的消息

单节点多代理配置

指定Offset消费

auto.offset.reset = earlist | latest | none

当没有初始偏移量(第一次消费)或服务器上不再存在带你给钱偏移量时(如该数据已被删除)

  • earlist 自动将偏移量重置为最早的偏移量 --from-begining
  • latest 自动将偏移量重置为最新偏移量
  • none 如果未找到消费者组之前的偏移量,就直接像消费者抛出异常

如果想指定某一位置开始消费

  • 配置信息
    • 反序列化 Key_DESERIALIZER,VALUE_DE
    • 组id GROUP_ID_CONFIG

创建消费者

订阅主题

消费数据

保证分区分配方案制定完毕

        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        while(assignment.size() == 0){
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        for(TopicPartition topicPartition: assignment){
            kafkaConsumer.seek(topicPartition, 600);
        }

包装列表

package org.apache.kafka.clients.consumer;

public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {   
	public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
     List<ConsumerRecord<K, V>> recs = (List)this.records.get(partition);
     return recs == null ? Collections.emptyList() : Collections.unmodifiableList(recs);
 }

笔记

必须指定序列化类

kafka在发送和接受消息的时候,都是以byte[]字节型数组发送或者接受的。

https://blog.csdn.net/shirukai/article/details/82152172

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/450727.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IEEE-TMI:张孝勇团队开发小鼠精细脑结构自动分割的深度学习算法

近日&#xff0c;复旦大学类脑智能科学与技术研究院青年研究员张孝勇课题组联合德国亥姆霍兹慕尼黑研究中心&#xff0c;在医学图像处理领域顶尖期刊《IEEE医学影像汇刊》(IEEE Transactions on Medical Imaging&#xff0c;TMI) 发表了题为《MouseGAN&#xff1a;用于小鼠大脑…

OpenCV 模板匹配 matchTemplate

一、模板匹配概念 模板匹配是一项在一副图像中寻找与另一幅模板图像最匹配&#xff08;相似&#xff09;部分的技术。模板匹配不是基于直方图的&#xff0c;而是通过在输入图像上滑动图像块&#xff08;模板&#xff09;同时对比相似度&#xff0c;来对模板和输入图像进行匹配的…

【IoT】如何使用软件加密(文件夹加密工具.exe),并破解工具

目录 第一步&#xff1a;显示隐藏的文件。 第二步&#xff1a;将隐藏文件变成文件夹。 第三步&#xff1a;解密文件。 有时候出差或者有些商务场合&#xff0c;需要对一些敏感文件做一下简单的加密&#xff0c;这样在分享内容的时候&#xff0c;可以起到初步的保护作用。 当…

Windows利用easyBCD装Ubuntu双系统

一、准备材料&#xff1a; 1、easyBCD软件&#xff08;我用V2.3版本&#xff09; 2、Ubuntu系统&#xff08;我用的12.04版本ubuntu-12.04-desktop-i386.iso&#xff09; 二、配置空闲分区 1、右击“计算机”--->管理--->右击某个自己分配的分区--->选择“压缩卷”…

Deep Bidirectional Language-Knowledge Graph Pretraining论文阅读

Deep Bidirectional Language-Knowledge Graph Pretraining github代码 摘要 最近的工作表明&#xff0c;知识图(KG)可以补充文本数据&#xff0c;提供结构化的背景知识&#xff0c; 为推理提供有用的支架。然而&#xff0c;这些作品并没有经过预先的训练来学习大规模的两种…

央视推荐的护眼灯是哪款?盘点央视推荐的护眼灯排名

护眼灯是生活中最常见的照明工具&#xff0c;许多人担心品质不过关 不合格的护眼灯会造成视损害 建议在选购护眼灯时&#xff1a; 首先看清楚产品的具体标识&#xff0c;其中就包括有产品的生产厂家&#xff0c;生产地址以及他们的产品型号 看产品规格和是否获得了国家的相关…

博睿数据蝉联中国APM市场份额第一,Bonree ONE春季正式版重磅发布

日前&#xff0c;IDC发布《中国IT统一运维软件产品市场跟踪报告&#xff0c;2022H2》,2022下半年中国APM市场环比增长近10%。博睿数据以市场份额达18.28%蝉联APM应用性能监控市场份额第一。 追求卓越&#xff0c;顺势而为 博睿数据作为中国领先的一体化智能可观测平台&#xf…

今天主要谈谈关于申请美国专利的一些问题

进入2021年&#xff0c;国家更多的开始鼓励在国外开展业务的企业去布局国外专利&#xff0c;提升企业海外竞争力。无他&#xff0c;着实是我们在知识产权保护方面起步太晚&#xff0c;已经吃亏了太久&#xff0c;专利掣肘&#xff0c;技术卡脖子&#xff0c;勤勉的为别人打工。…

Domino自带的JSON校验工具

大家好&#xff0c;才是真的好。 JSON数据在Notes/Domino已经变得非常重要。从Domino 10开始&#xff0c;在LotusScript语言中就加入了对JSON数据处理功能。在管理中&#xff0c;我们知道&#xff0c;从Domino 12版本开始就支持Domino自动化配置&#xff0c;也是使用JSON数据作…

利用ESP32-C3实现一个风扇PWM控制器,可网页操作

1简介 这段代码是一个基于ESP32开发板的PWM控制器&#xff0c;可以通过网页输入控制参数并显示在屏幕上&#xff0c;通过PWM输出引脚控制风扇的转速&#xff0c;还可以测量风扇的转速并在屏幕上显示。此外&#xff0c;代码还具备显示当前时间、显示Wi-Fi连接信息等功能。 2函数…

【Git基础】常用git命令(三)

文章目录 1.版本回退1.1 没有commit的情况1.2 已经commit但没有push的情况1.3 已经push到远端仓库的情况 2. 删除文件2.1 从工作区删除文件2.2 使用git rm命令删除文件2.3 永久删除文件2.4 永久删除文件的步骤拆解 3. 查看指定文件的修改3.1 查看文件的所有commit3.2 查看所有c…

亚控组态王与EXCEL通信

先创建一个IO设备&#xff1a;DDE类型 创建一个变量&#xff1a; 创建一个窗口&#xff0c;建立一个文本显示并关联前面建立的变量 先打开一个EXCEL文件&#xff08;注意&#xff1a;WPS是不兼容的&#xff0c;必须先打开EXCEL文件&#xff0c;否则会报错&#xff09; …

云原生|kubernetes|rancher-2.6.4安装部署简明手册

前言: rancher是一个比较特殊的开源的kubernetes管理工具&#xff0c;特殊在它是一个名称为k3s的简单kubernetes集群&#xff0c;而该集群是在kubernetes集群内的。 OK&#xff0c;本文将讲述如何在centos7服务器上&#xff0c;在已有的kubernetes-1.23.15集群内&#xff0c;…

基于STM32的电阻、电容测量(NE555芯片RC振荡法)

文章目录 前言一、电路图1.电阻测量公式2.电容测量公式 二、代码实现1.外部中断代码2.定时器中断处理数据 总结 前言 做的一个关于电阻和电容的测量电路&#xff0c;都是比较通用的。经过实际测试&#xff0c;电容测量电路还是可以的&#xff0c;电阻测量电路有一个缺点就是&a…

Prometheus/Metrics监控dubbo线程池状态

网上找了些文章&#xff0c;发现挺少的&#xff0c;下面一些文章有些帮助 https://cloud.tencent.com/developer/article/1800906 https://github.com/apache/dubbo/issues/6625 其中第二篇文有写到&#xff0c;通过dubbo的spi获取DataStore&#xff0c;然后再拿到dubbo的线程…

关于jvm-sandbox-repeater dubbo回放异常的问题处理

还是引流回放的问题&#xff0c;今天测试的同学反馈说他做了流量回放&#xff0c;但是回放的好几个接口报错了&#xff0c;都是抛出来的服务器错误&#xff0c;请联系管理员&#xff0c;与预期的结果不符&#xff0c;但是实际这块的逻辑是没有改动的&#xff0c;所以也只能是du…

Ingonyama团队的ZKP加速

1. PipeMSM&#xff08;cloud-ZK&#xff09;&#xff1a;ZKPFPGA Ingonyama团队2022年发表了论文《PipeMSM: Hardware Acceleration for Multi-Scalar Multiplication》&#xff0c;尝试将ZK操作与FPGA结合&#xff0c;并为未来ZK与ASIC&#xff08;Application Specific Int…

无法解析的外部符号 __mingw_vsprintf

windows下的ffmpeg是采取mingw平台上编译&#xff0c;本人采用的是msys2&#xff0c;本人需要h264&#xff0c;于是先在msys2里面编译了x264静态库&#xff0c;注意这里是静态库&#xff0c;动态库经过了链接&#xff0c;不会出现下面的问题&#xff0c;然后在ffmpeg里面用下面…

【C++类】

目录 前言类的定义类的访问限定符及封装访问限定符封装 类的大小为什么需要内存对齐为什么成员函数不占用类的内存&#xff1f;为什么空类的大小是1个字节&#xff1f; 前言 今天是少年正式认识"对象"的第一天,虽然此"对象"非彼对象&#xff0c;但是少年也…

使用Jmeter进行http接口测试

前言&#xff1a; 本文主要针对http接口进行测试&#xff0c;使用Jmeter工具实现。 Jmter工具设计之初是用于做性能测试的&#xff0c;它在实现对各种接口的调用方面已经做的比较成熟&#xff0c;因此&#xff0c;本次直接使用Jmeter工具来完成对Http接口的测试。 一、开发接口…