一文理解Kafka

news2025/1/23 7:04:33

概述
        Kafka是一个基于Zookeeper的分布式消息中间件,支持消息分区,提供发布和订阅功能。使用Scala编写,主要特点是可水平扩展,高吞吐率以及高并发。

        常见的使用场景:

  • 企业级别活动数据和运营数据的消息传递,活动数据一般包括页面的访问,搜索。运营数据包括服务器上CPU,IO,用户活跃度等数据。
  • 日志收集,收集的日志对接hadoop,Hbase,Elasticsearch等系统。
  • 流式处理,支持spark streaming和storm。

 
 

基本架构以及概念
        Kafka的主要工作原理是多个Producer发送Topic消息体到Kafka集群上,消息首先会存放在不同Broker对应的Leader分区上,Follower分区拉取Leader分区消息并写入日志,Consumer客户端同时也拉取Leader分区消息,完成消息消费。

         上图中,Kafka集群中有3台Broker,Kafka集群在启动的时候会将自身信息注册到Zookeeper集群中,保证信息的一致性。Producer有3个,分别发送Topic为A,B,C的消息体道Kafka集群中。Kafka集群中Topic A的Partition数为2,Replication数为3,Topic B的Partition数为1,Replication数为3,Topic C的Partition数为1,Replication数为2.每个Partition有主从之分,主Partition会接收Producer消息并共Consumer消费,从Partition只会从主Partition接收数据,不会和Producer以及Cosumer有直接联系。多个Consumer可以组成一个Group,同一group下不同的Consumer只能消费同一Topic下不同Partition的消息。例如Consumer Group A下的Consumer0和Consumer1只能分别消费Topic A中Partition0和Partition1的消息。

        以下是Kafka部分概念解析

  •  Producer:消息生产者。
  •  Consumer:消息消费者。
  •  Consumer Group: 消费者群组,包含多个消费者,同组消费者消费同一个Topic下不同的分区的消息。
  •  Broker: Kafka实例,可以理解为不同的kafka服务器,每个都有一个唯一的编号。
  •  Message: 生产者传递给消费者的消息体。
  •  Topic: 消息主题,Broker上有不同的Topic, Message发送到不同的Topic供消费者消费。
  •  Partition: 相当于将消息进行了分发,一个Topic可以分为多个分区,消费者群组里面的消费者可以同时消费不同分区里面的消息,提高了吞吐量。
  •  Replication: 分区副本,默认最大为10个,不能大于Broker的数量,当分区的Leader挂掉之后,Follower继续工作,提供可靠性保证。
  • Offset:消息持久化中消息的位置偏移信息。
  •  zookeeper: 保存Kafka集群的信息的Metadata,同样提供了可靠性保证

具体工作流程

  1. Producer发送数据到Broker

        同一Topic下的消息在集群中有多个分区,Producer发送数据的时候总会发送给Leader分区,Leader分区再将数据同步给其他Follower分区,等待所有的Follower同步完成之后向Leader分区返回ack消息,Leader分区接收到所有的Follower分区ack之后向Producer发送ack,确认消息接收完成。

        Leader分区的选择是首先所有Broker选取出一个Controller,由Controller指定分区的leader。

        其中ACK应答机制是有参数可以设置的,值为0,1,all;来确定kafka是否有接收到数据,这3个参数的含义如下:

  • 0:Producer发送完数据后直接返回,不会等待集群的ack消息
  • 1:Producer只要leader分区应答ack即可,不用其他follower应答ack
  • all: Producer要等待集群中所有分区都回复ack才会继续发送下一条数据,否则发送失败

        ACK应答机制能够确保消息的可靠性。但是可靠性和消息交互速率是一对矛盾体。消息越可靠,相对传输速率就会降低。

        同样,Producer发送消息到broker,到底发送到了那一个分区,通常遵循以下规则:

  • Producer在发送时指定
  • 如果没有指定但是设置了数据key, 就会对数据key进行hash,根据hash之后的值选定分区
  • 如果上述两者都没有设定,则轮询选择分区

     2. Broker保存数据

        Kafka的数据是保存在磁盘的,之所以采用文件追加的方式进行存放,实际是采用了顺序IO的方式,避免随机IO造成大量的耗时。一个Topic有多个Partition,每个partition相当于一个有序的队列。每个parition以文件夹的形式存储在Broker上。

       a) Partition存储结构

         Partiion采用分段(segment)存储的方式,每段有3个文件:.log, .index, .timeindex。

.log数据存储文件,存放位置position和消息对应关系
.indexoffset索引文件,存放offset和position对应关系,offset代表消息顺序,position代表消息在磁盘中的位置
.timeindex时间索引文件, 存放时间戳和offset的对应关系

        以下是Partition存放文件夹对应示意图。

       b) Message存储结构

        Message在.log文件中存放,具体字段和含义如下

字节描述
8Position
4消息体大小
4CRC32校验值
1kafka版本号
1attributes
4key的长度
mkey的内容
4payload长度
npayload内容

        c) 两个概念LEO和HW

         LEO(Log End Offset):  表示每个Partiotion log中最后一条message的offset位置。
         HW(High Water Mark): 是统一Partiotion中各个Replicas数据同步一直的offset位置,该位置前的数据consumer可见,该位置之后不可见。

 

        d)通过索引定位消息

       以下是一个例子: 找出offset为7的消息内容

        1)首先通过offset值7确定文件在哪个segment中,显然在00000000000000000.index,这一步是offset值和index文件名进行比对。

        2)index文件索引采用的是稀疏索引进行存储,有可能恰好没有对应的offset值,所以这里是利用二分查找找到小于等于offset值的那条记录,这里找到offset=6,取出Message在log文件中的位置为9807。

        3) 在log文件中从position为9807的位置顺序检索,首先找到的是offset为6的数据,然后加上消息体大小,定位出offset为7的数据位置,然后读取该message数据。

    d) 数据清理策略

        清理策略:时间和大小阈值(时间默认超过7天或者大小超过1G,清除日志)

 #清理超过指定时间的消息,默认是168小时,7天,
 #还有log.retention.ms, log.retention.minutes, log.retention.hours,优先级高到低
 log.retention.hours=168​
 #超过指定大小后,删除旧的消息,下面是1G的字节数,-1就是没限制
 log.retention.bytes=1073741824


    3. Consumer消费数据:
        消费者通常会有一个消费者群组,同一消费组中的消费者可以消费一个Topic不同分区的数据。不会有两个同组消费者消费同一topic下同一分区的消息。
   

 

        消费者记录消费消息的信息在早期版本会记录在zookeeper中,后边的版本统一记录在_consumer_offsets topic下。

集群搭建
        本文采用docker-compose部署kafka集群以及UI页面,docker版本:18.06.3-ce  docker-compose版本:1.24.1。下图中的10.232.112.13为宿主机的IP,注意需要替换

version: "3"

services:
  zookeeper:
    image: 'bitnami/zookeeper:3.6'
    container_name: zookeeper
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - ./zookeeper:/bitnami/zookeeper
    restart: always

  kafka1:
    image: 'bitnami/kafka:3.0'
    container_name: kafka1
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.232.112.13:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    # restart: always
    depends_on:
      - zookeeper
  kafka2:
    image: 'bitnami/kafka:3.0'
    container_name: kafka2
    ports:
      - '9093:9093'
    environment:
      - KAFKA_BROKER_ID=2
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.232.112.13:9093
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    # restart: always
    depends_on:
      - zookeeper

  kafka3:
    image: 'bitnami/kafka:3.0'
    container_name: kafka3
    ports:
      - '9094:9094'
    environment:
      - KAFKA_BROKER_ID=3
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.232.112.13:9094
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    # restart: always
    depends_on:
      - zookeeper
  kafka-ui:
    image: 'provectuslabs/kafka-ui'
    container_name: kafka-ui
    ports:
      - "18080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=CLUUSTER001
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=10.232.112.13:9092
    # restart: always
    depends_on:
      - zookeeper
      - kafka1
      - kafka2
      - kafka3

Demo代码

        Producer代码

public class KProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException{
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","10.232.112.13:9192");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer",StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> stringStringProducerRecord = new ProducerRecord<String, String>("Test",3,"testKey","hello");
        Future<RecordMetadata> send = kafkaProducer.send(stringStringProducerRecord);
        RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata);

    }
}

        Consumer代码

public class KConsumer {
    public static void main(String[] args) throws InterruptedException {

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","10.232.112.13:9192");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer",StringDeserializer.class.getName());
        properties.setProperty("group.id","1111");
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList("Test"));

        while (true){
            ConsumerRecords<String, String> poll = consumer.poll(500);
            for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {
                System.out.println("**********" + stringStringConsumerRecord.key() + stringStringConsumerRecord.value());
            }

        }   

    }
}

Kafka的优缺点

        优点:        

        1、高吞吐量:Kafka支持高吞吐量的传输,可以支持数千个客户端和每秒数百万条消息。

        2、可扩展性:Kafka支持水平扩展,可以添加更多的节点来支持多客户端和更多的消息。

        3、可靠性:Kafka支持消息的可靠传输,可以确保消息不会丢失。

        4、低延迟:Kafka支持低延迟的消息传输,可以确保消息能够及时到达消费者。

        缺点:

        1、管理复杂性:Kafka的管理比较复杂,需要对Kafka集群进行维护和监控。

         2、消息顺序:Kafka不能保证消息的顺序,因为消息可能会被分发到不同的分区中。

到这里,你了解Kafka了吗

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/537133.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年自动化测试如何学?从头开始自动化测试指南,一路晋升...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 Python自动化测试&…

❤ 用JS 从零开始开发一个 Chrome 提示插件(简单易学 10分钟搞定)

❤ 为自己量身手写一个chrome暖心插件&#xff08;资源文章最后&#xff09; ❤ 最近看到了一个很温馨的提示代码,于是想着为自己的浏览器做一款chrome插件 1、chrome 插件理解&#xff1a; 一个html js css image的一个web应用 不同于普通的web应用&#xff0c; chrome插…

怎么把视频转换成gif动图,5个超强工具分享

在我们平时的聊天中&#xff0c;会经常遇到需要将视频转换成gif动图的情况。这样一来&#xff0c;我们可以轻松将视频中的经典片段转换成gif动图&#xff0c;方便分享和娱乐。同时&#xff0c;这种方式不仅能够传播视频内容&#xff0c;还能带来很多趣味。 然而&#xff0c;许…

1,Hadoop的基本概念和架构

Hadoop的基本概念和架构 学习路线 hadoop的基本概念和架构hadoop的安装和配置hadoop的HDFS文件系统hadoop的MapReduce计算框架hadoop的YARN资源管理器hadoop的高级特效&#xff0c;如HBase&#xff0c;Hive&#xff0c;Pig等hadoop的优化和调优hadoop的应用场景&#xff0c;如…

Qt中的互斥锁(QMutex和QMutexLocker)

QMutex和QMutexLocker 类 QMutex 的主要函数有&#xff1a; lock ()&#xff1b; 加锁&#xff0c;如果该互斥锁被占用&#xff0c;该函数阻塞&#xff0c;直到互斥锁被释放。unlock ()&#xff1b; 解锁bool tryLock (int timeout 0)&#xff1b; 表示尝试去加锁&#xff0…

如何用R语言分析COVID-19相关数据

一、概述 COVID-19是当前全球面临的一项重大挑战。 本文将介绍如何使用R语言分析COVID-19相关数据&#xff0c;探索其感染率、死亡率和人口特征的相关性&#xff0c;以及使用统计建模方法预测COVID-19的死亡率。 二、数据导入与筛选 COVID-19 Data Repository by the Center…

CSS的使用

CSS 概述 CSS 是一门语言&#xff0c;用于控制网页表现。我们之前介绍过W3C标准。W3C标准规定了网页是由以下组成&#xff1a; 结构&#xff1a;HTML表现&#xff1a;CSS行为&#xff1a;JavaScript CSS也有一个专业的名字&#xff1a;Cascading Style Sheet&#xff08;层…

一起了解大数据可视化开发

在办公自动化快速发展的今天&#xff0c;大数据可视化开发的应用价值普遍增高。借助它的灵活、便捷、易操作等特性&#xff0c;可以助力企业实现办公自动化提质增效&#xff0c;数字化进程快速发展&#xff0c;因而得到了大家的信赖与支持。那么&#xff0c;什么是大数据可视化…

Spring Boot 使用SSL-HTTPS

Spring Boot 使用SSL-HTTPS HTTPS协议可以理解为HTTPSSL/TLS&#xff0c;可以理解为HTTP下加入了SSL层&#xff0c;通过SSL证书来验证服务器的身份&#xff0c;并为浏览器和服务器之间的通信进行加密。 SSL(Secure Socket Layer安全套接字层)&#xff1a;SSL协议位于TCP/IP协…

【Jenkins】Jenkins拉取Github代码(windows)

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化 &#x1f449;专__注&#x1f448;&#xff1a;专注主流机器人、人工智能等相关领域的开发、…

虹科新品 | 高可靠性、可适用于高磁/压的线性传感器!

PART 1 什么是线性传感器&#xff1f; 基本上&#xff0c;线性传感器是一种用于测量位移和距离的设备&#xff0c;具有高可靠性。测量网格通过光学传感器移动测量数据&#xff0c;数据被光学记录并通过控制器转换为电气数据&#xff0c;而控制器又可以转换为路径。 因此&…

怎么删除文件?分享3个文件删除的正确方法!

案例&#xff1a;怎么删除文件 【我每次想要删除文件时都感觉好麻烦啊&#xff01;想问问大家在删除文件时都是怎么进行操作的呢&#xff1f;】 在日常使用电脑的过程中&#xff0c;删除文件是一个很常见的操作&#xff0c;但是并不是每个人都知道删除文件的正确方式。正确的删…

企业做网站需要什么条件?

随着互联网的不断发展&#xff0c;企业做网站已成为市场营销的必要手段。但是&#xff0c;要想让一个网站达到预期效果&#xff0c;需要具备一定的条件和技巧。本文将从以下几个方面介绍企业做网站的条件和优化方法。 第一步&#xff1a;明确目标 企业做网站的第一步就是要明确…

【JAVAEE】线程安全的集合类及死锁

目录 1.多线程环境使用集合类 2.多线程环境使用队列 3.多线程环境使用哈希表 3.1HashTable 3.2ConcurrentHashMap 4.死锁 4.1死锁是什么 4.2死锁的代码示例 4.3产生死锁的原因 4.4如何避免死锁 这里有一个代码示例&#xff1a; 定义一个普通的集合类&#xff0c;通过…

动态规划之背包模型

文章目录 采药&#xff08;01背包&#xff09;装箱问题&#xff08;01背包&#xff09;宠物小精灵之收服(二维费用01背包&#x1f44d;&#x1f618;)数字组合(01背包)买书&#xff08;完全背包&#xff09;货币系统&#xff08;完全背包&#xff09; 采药&#xff08;01背包&a…

ROS:yaml文件解析:base_local_planner、global_costmap、local_costmap、base_local_planner

一.costmap_common_params.yaml # 设置了代价地图中障碍物信息的阀值 # obstacle_range&#xff1a;确定了最大范围传感器读数&#xff0c;这将导致障碍物被放入代价地图中。 # 此处设置为2.5m&#xff0c;意为着机器人只会更新其地图包含距离移动基座2.5m以内的障碍物信息 obs…

Python学习之用QTimer计时器实现摄像头视频的播放和暂停

在上一篇文章《Python学习之简易视频播放器》中&#xff0c;通过python-opencv-pyqt5&#xff0c;实现了有界面的视频播放。但是&#xff0c;上文代码只有播放&#xff0c;却无法让播放的视频暂停。这是因为&#xff0c;我们在播放中使用的是while(self.cap.isOpened())循环。若…

上海亚商投顾:沪指震荡调整跌0.21% 两市成交金额不足8000亿

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 市场情绪 三大指数今日震荡调整&#xff0c;上证50午后一度跌超1%&#xff0c;以保险为首的权重板块走低。军工股逆市大涨&a…

玩机搞机----电脑端几种反编译apk工具操作步骤解析

经常玩机的友友避免不了有时候需要反编译有些app或者JAR文件等等。目前各种反编译工具很多。各有所长吧。很多都是就过工具结合使用。而且很多app涉及到加密加壳。由于有些工具没有及时更新。老版本的底层还是apktool_2.4这些。对于新款的app反编译有点吃力且兼容性不太好。当然…

yolov2

yolov2相对于yolov1的改进&#xff1a; 1、加入Batch Normalization 2、yolov2使用更大的分辨率图片 V1训练使用图片分辨率为224*224&#xff0c;测试图片分辨率为448*448。 V2在V1上的改进为&#xff1a;V2训练时额外又进行了10次448*448的微调。 3、yolov2的网络结构 相…