kafka再浅析

news2025/1/11 22:46:00

在日常开发中,经常使用kafka,对它是既熟悉又陌生,下面继续聊,继续总结。
1、消息中间件
分布式消息是一种通信机制,和RPC、HTTP不一样,消息中间件采用分布式中间代理的方式进行通信。采用消息中间件后,上游业务系统发送消息,消息先存储在消息中间件,然后由消息中间件将消息分发到对应的下游业务系统,这种异步的方式减少了服务之间的耦合程度。
在系统架构中引用额外的组件,必然提高系统的架构复杂度和运行的难度,那么消息中间件又带来了哪些优势呢?

  • 解耦
  • 冗余
  • 扩展性
  • 削峰
  • 顺序性
  • 异步通信

2、kafka基本概念
在这里插入图片描述
(摘自网上)

  • producer:生产者,也就是发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka;
  • consumer:消费者,也就是接受消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理;
  • consumer group:一个消费者组可以包含一个或多个消费者。使用多分区 + 多消费者方式可以极大提高数据下游的处理速度,同一消费组中的消费者不会重复消费消息,同样的,不同消费组中的消费者消费消息时互不影响。Kafka 就是通过消费组的方式来实现消息 P2P 模式和广播模式;
  • broker:服务代理节点。Broker 是 Kafka 的服务节点,即 Kafka 的服务器;
  • topic:Kafka 中的消息以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费;
  • partition:Topic 是一个逻辑的概念,它可以细分为多个分区,每个分区只属于单个主题。同一个主题下不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset);
  • offset:offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序性而不是主题有序性;
  • replication:副本,是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在 broker 崩溃或发生网络异常,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务;
  • record:实际写入 Kafka 中并可以被读取的消息记录。每个 record 包含了 key、value 和 timestamp;

自问自答:

  • 简单讲下 Kafka 的架构?
    答:Producer、Consumer、Consumer Group、Topic、Partition
  • Kafka 是推模式还是拉模式,推拉的区别是什么?
    答:Kafka Producer 向 Broker 发送消息使用 Push 模式,Consumer 消费采用的 Pull 模式。拉取模式,让 consumer 自己管理 offset,可以提供读取性能。
  • Kafka 如何广播消息?
    答:Consumer group
  • Kafka 的消息是否是有序的?
    答:Topic 级别无序,Partition 有序;
  • Kafka 是否支持读写分离?
    答:不支持,只有 Leader 对外提供读写服务;
  • Kafka 如何保证数据高可用?
    答:副本,ack,HW
  • Kafka 中 zookeeper 的作用?
    答:集群管理,元数据管理

3、kafka命令行工具
Kafka 的命令行工具在 Kafka 包的/bin目录下,主要包括服务和集群管理脚本,配置脚本,信息查看脚本,Topic 脚本,客户端脚本等。

  • kafka-configs.sh:配置管理脚本
  • kafka-console-consumer.sh:kafka 消费者控制台
  • kafka-console-producer.sh:kafka 生产者控制台
  • kafka-consumer-groups.sh:kafka 消费者组相关信息
  • kafka-delete-records.sh:删除低水位的日志文件
  • kafka-log-dirs.sh:kafka 消息日志目录信息
  • kafka-mirror-maker.sh:不同数据中心 kafka 集群复制工具
  • kafka-preferred-replica-election.sh:触发 preferred replica 选举
  • kafka-producer-perf-test.sh:kafka 生产者性能测试脚本
  • kafka-reassign-partitions.sh:分区重分配脚本
  • kafka-replica-verification.sh:复制进度验证脚本
  • kafka-server-start.sh:启动 kafka 服务
  • kafka-server-stop.sh:停止 kafka 服务
  • kafka-topics.sh:topic 管理脚本
  • kafka-verifiable-consumer.sh:可检验的 kafka 消费者
  • kafka-verifiable-producer.sh:可检验的 kafka 生产者
  • zookeeper-server-start.sh:启动 zk 服务
  • zookeeper-server-stop.sh:停止 zk 服务
  • zookeeper-shell.sh:zk 客户端

通常可以使用kafka-console-consumer.sh和kafka-console-producer.sh脚本来测试 Kafka 生产和消费,kafka-consumer-groups.sh可以查看和管理集群中的 Topic,kafka-topics.sh通常用于查看 Kafka 的消费组情况。

4、kafka producer
Kafka producer 的正常生产逻辑包含以下几个步骤:

  1. 配置生产者客户端参数;
  2. 构建待发送的消息;
  3. 发送消息;
  4. 关闭生产者实例;

producer发送消息的过程如下,需要经过拦截器、序列化器和分区器,最终由累加器批量发送至broker。
在这里插入图片描述
(摘自网上)
kafka producer参数:

  • bootstrap.server:指定 Kafka 的 Broker 的地址;
  • key.serializer:key 序列化器;
  • value.serializer:value 序列化器;
  • batch.num.messages:默认值:200,每次批量消息的数量,只对 asyc 起作用;
  • request.required.acks:默认值:0,0 表示 producer 毋须等待 leader 的确认,1 代表需要 leader 确认写入它的本地 log 并立即确认,-1 代表所有的备份都完成后确认。只对 async 模式起作用,这个参数的调整是数据不丢失和发送效率的 tradeoff,如果对数据丢失不敏感而在乎效率的场景可以考虑设置为 0,这样可以大大提高 producer 发送数据的效率;
  • request.timeout.ms:默认值:10000,确认超时时间;
  • partitioner.class:默认值:kafka.producer.DefaultPartitioner,必须实现 kafka.producer.Partitioner,根据 Key 提供一个分区策略。有时候我们需要相同类型的消息必须顺序处理,这样我们就必须自定义分配策略,从而将相同类型的数据分配到同一个分区中;
  • producer.type:默认值:sync,指定消息发送是同步还是异步。异步 asyc 成批发送用 kafka.producer.AyncProducer, 同步 sync 用 kafka.producer.SyncProducer。同步和异步发送也会影响消息生产的效率;
  • compression.topic:默认值:none,消息压缩,默认不压缩。其余压缩方式还有,“gzip”、“snappy"和"lz4”。对消息的压缩可以极大地减少网络传输量、降低网络 IO,从而提高整体性能;
  • compressed.topics:默认值:null,在设置了压缩的情况下,可以指定特定的 topic 压缩,未指定则全部压缩;
  • message.send.max.retries:默认值:3,消息发送最大尝试次数;
  • retry.backoff.ms:默认值:300,每次尝试增加的额外的间隔时间;
  • topic.metadata.refresh.interval.ms:默认值:600000,定期的获取元数据的时间。当分区丢失,leader 不可用时 producer 也会主动获取元数据,如果为 0,则每次发送完消息就获取元数据,不推荐。如果为负值,则只有在失败的情况下获取元数据;
  • queue.buffering.max.ms:默认值:5000,在 producer queue 的缓存的数据最大时间,仅仅 for asyc;
  • queue.buffering.max.message:默认值:10000,producer 缓存的消息的最大数量,仅仅 for asyc;
  • queue.enqueue.timeout.ms:默认值:-1,0 当 queue 满时丢掉,负值是 queue 满时 block, 正值是 queue 满时 block 相应的时间,仅仅 for asyc;

5、kafka comsumer
Kafka 有消费组的概念,每个消费者只能消费所分配到的分区的消息,每一个分区只能被一个消费组中的一个消费者所消费,所以同一个消费组中消费者的数量如果超过了分区的数量,将会出现有些消费者分配不到消费的分区。
Kafka Consumer Client 消费消息通常包含以下步骤:

  1. 配置客户端,创建消费者;
  2. 订阅主题;
  3. 拉取消息并消费;
  4. 提交消费位移;
  5. 关闭消费者实例;

因为 Kafka 的 Consumer 客户端是线程不安全的,为了保证线程安全,并提升消费性能,可以在 Consumer 端采用类似 Reactor 的线程模型来消费数据。
在这里插入图片描述
(摘自网上)
kafka consumer参数:

  • bootstrap.servers:连接 broker 地址,host:port 格式;
  • group.id:消费者隶属的消费组;
  • key.deserializer:与生产者的key.serializer对应,key 的反序列化方式;
  • value.deserializer:与生产者的value.serializer对应,value 的反序列化方式;
  • session.timeout.ms:coordinator 检测失败的时间。默认 10s 该参数是 Consumer Group 主动检测 (组内成员 comsummer) 崩溃的时间间隔,类似于心跳过期时间;
  • 默认值是 latest,也就是从最新记录读取数据(消费者启动之后生成的记录),另一个值是 earliest,意思是在偏移量无效的情况下,消费者从起始位置开始读取数据;
  • enable.auto.commit:否自动提交位移,如果为false,则需要在程序中手动提交位移。对于精确到一次的语义,最好手动提交位移;
  • fetch.max.bytes:单次拉取数据的最大字节数量;
  • max.poll.records:单次 poll 调用返回的最大消息数,如果处理逻辑很轻量,可以适当提高该值。但是max.poll.records条数据需要在 session.timeout.ms 这个时间内处理完 。默认值为 500;
  • request.timeout.ms:一次请求响应的最长等待时间。如果在超时时间内未得到响应,kafka 要么重发这条消息,要么超过重试次数的情况下直接置为失败;

6、kafka rebalance
rebalance 本质上是一种协议,规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic 的每个分区。比如某个 group 下有 20 个 consumer,它订阅了一个具有 100 个分区的 topic。正常情况下,Kafka 平均会为每个 consumer 分配 5 个分区。这个分配的过程就叫 rebalance。
rebalance 的触发条件有三种:

  • 组成员发生变更(新 consumer 加入组、已有 consumer 主动离开组或已有 consumer 崩溃了)
  • 订阅主题数发生变更
  • 订阅主题的分区数发生变更

Kafka 默认提供了两种分配策略:

  • Range
  • Round-Robin

7、kafka高可用
在分布式数据系统中,通常使用分区来提高系统的处理能力,通过副本来保证数据的高可用性。多分区意味着并发处理的能力,这多个副本中,只有一个是 leader,而其他的都是 follower 。仅有 leader 副本可以对外提供服务。多个 follower 副本通常存放在和 leader 副本不同的 broker 中。通过这样的机制实现了高可用,当某台机器挂掉后,其他 follower 副本也能迅速”转正“,开始对外提供服务。
follower 副本不提供读服务,试想一下,如果 follower 副本也对外提供服务那会怎么样呢?首先,性能是肯定会有所提升的。但同时,会出现一系列问题。类似数据库事务中的幻读,脏读。比如你现在写入一条数据到 kafka 主题 a,消费者 b 从主题 a 消费数据,却发现消费不到,因为消费者 b 去读取的那个分区副本中,最新消息还没写入。而这个时候,另一个消费者 c 却可以消费到最新那条数据,因为它消费了 leader 副本。Kafka 通过 WH 和 Offset 的管理来决定 Consumer 可以消费哪些数据,已经当前写入的数据。
只有 Leader 可以对外提供读服务,那如何选举 Leader。kafka 会将与 leader 副本保持同步的副本放到 ISR 副本集合中。当然,leader 副本是一直存在于 ISR 副本集合中的,在某些特殊情况下,ISR 副本中甚至只有 leader 一个副本。当 leader 挂掉时,kakfa 通过 zookeeper 感知到这一情况,在 ISR 副本中选取新的副本成为 leader,对外提供服务。但这样还有一个问题,前面提到过,有可能 ISR 副本集合中,只有 leader,当 leader 副本挂掉后,ISR 集合就为空,这时候怎么办呢?这时候如果设置 unclean.leader.election.enable 参数为 true,那么 kafka 会在非同步,也就是不在 ISR 副本集合中的副本中,选取出副本成为 leader。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/118182.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MarkDown语法(自用)

目录结构展示 tree -a 显示所有tree -d 只显示文档夹tree -L n 显示项目的层级,n表示层级数,比如想要显示项目三层结构,可以用tree -l 3;tree -I pattern 用于过滤不想要显示的文档或者文档夹。比如你想要过滤项目中的 node_modu…

Linux之(17)系统服务

Linux之(17)系统服务 Author:onceday Date:2022年12月24日 漫漫长路,有多少人对你笑过… 参考文档: Systemd 入门教程:命令篇 - 阮一峰的网络日志 (ruanyifeng.com)可能是史上最全面易懂的 Systemd 服务管理教程&a…

MergeTree原理之一级索引

一级索引 MergeTree的主键使用PRIMARY KEY定义,待主键定义之后,MergeTree会依据index_granularity间隔(默认8192行),为数据表生成一级索引并保存至primary.idx文件内,索引数据按照PRIMARY KEY排序。相比使…

【PotPlayer】采集Switch图像及录制

【PotPlayer】采集Switch图像及录制下载potplayer使用方法连接设备录制视频无边框设置阳,休,懂?QAQ。阳之前买了个Switch,正好有好玩的想录下来,然后就…自行某宝,某东去买个采集卡。本文只管连软件&#x…

【运维有小邓】ADSelfService Plus身份管理

一、身份管理挑战: 由于企业需要越来越高的安全性,以保护用户帐户免遭入侵者的任何恶意攻击,因此身份管理正日益变得重要。在所有密码相关的身份挑战中,帮助台工作单量成为重中之重,它们在组织的年同比财务预算中不堪…

(二)ElasticSearch使用

一、ES的基本使用 1.创建索引 创建一个test索引http://localhost:9200/test 2.删除索引 http://localhost:9200/test 3.查看索引 http://localhost:9200/_all 4.向索引中新增数据 http://localhost:9200/person/_doc/ 5.搜索数据 http://localhost:9200/person/_doc/_sear…

全志Tina Linux SPINAND UBI 离线烧录 开发指南 支持百问网T113 D1-H哪吒 DongshanPI-D1s V853-Pro等开发板

1 概述 编写目的: 介绍Sunxi SPINand 烧写时的数据布局 2 名词解释 词义UBIunsorted block imagePEBphysical erase blockLEBlogical erase block PEB 和logical block 关系 1 PEB 1 logical block 1 logical block 2 physical blocks3 总体数据布局 ubi 方案…

必读,一文普及MES系统知识

MES系统的基本概念制造执行系统(MES)是一套面向制造企业的信息管理系统。MES系统可以为企业提供管理模块,包括制造数据管理、计划与调度管理、生产调度管理、库存管理、质量管理、生产过程控制、底层数据集成分析、上层数据集成与分解&#x…

LVGL学习笔记5 - Display, Screen和Layer

目录 1. Display 2. Screen 2.1 创建Screen 2.2 加载Screen 2.3 获取活动的Screen 2.4 实例 2.4.1 定义2个Screen全局变量 2.4.2 初始化Screen 2.4.3 循环更替 3. Layer图层 3.1 切换顺序 3.2 顶层和系统层 3.3 实例 3.3.1 创建全局变量 3.3.2 初始化 3.3.3 …

【OpenFOAM】-olaFlow-算例5- oppositeSolitariesFlume

算例路径: olaFlow\tutorials\oppositeSolitariesFlume 算例描述: 两列反向的孤立波相互作用 学习目标: 熟练掌握olaFlow的造波设置,波浪方向与消波方向设置 算例快照: 图1 两列反向孤立波相互作用文件结构&#xff1…

Linux Kernel 远程代码执行漏洞(CVE-2022-47939)

Linux Kernel 远程代码执行漏洞(CVE-2022-47939) CVE-2022-47939 据Security Affairs消息,近期披露的一个严重 Linux 内核漏洞会影响 SMB 服务器,可能导致远程代码执行。 Linux Kernel SMB2_TREE_DISCONNECT 命令处理中存在远程…

C进阶:征服指针之指针与数组强化笔试题练习(2)

🐲🦖 本篇文章是接上篇文章的,上篇文章链接:http://t.csdn.cn/RogqL 目录 🐇🐱一.关于 strlen 函数与数组、指针的综合笔试题 😸T1. 🐆 T2. 🐅T3. 🐲一.关…

网络和通信安全中的SSL/TLS国密改造

2021年3月,国家市场监管总局、国家标准化管理委员会发布公告,国家密码应用与安全性评估的关键标准GB/T39786—2021《信息安全技术信息系统密码应用基本要求》(以下简称“GB/T39786”)正式发布,GB/T 39786是贯彻落实《中…

CSS浮动与CSS定位装饰 Day3

结构伪类选择器 结构伪类 公式 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>伪类</title><style>li:nth-child(4n){background-color: red;}</style> </head> <body>…

【记录贴】项目经理的进阶日常:靠年终总结获得了核心项目的机会

进入项目经理这个岗位已经三年了&#xff0c;之前决定转行做项目经理是因为它涉及的知识面广&#xff0c;对个人的成长非常有帮助&#xff1b;也期望未来能积累一些大型且复杂的项目经验、获得更好的升职空间。但现在做了这么久&#xff0c;好像遇到了职业瓶颈&#xff0c;仿佛…

ESP8266 WIFI模块的使用

ESP8266 wiFI 可以用作连接周边的无线设备&#xff0c;也可以作为发送器供其他设备连接通常在产品中&#xff0c;我们经常用作无线的接收使用&#xff0c;也可能会用作在线升级使用等。 说点题外话&#xff1a;虽然在线升级已经较为成熟&#xff0c;但我不推荐在一些重要的产品…

Shell程序编写猜数字的小游戏

文章目录 目录 文章目录 前言 一、设计思路 二、代码编写 三、效果图 总结 前言 在学习Linux课程中学习了一点简单的shell语法&#xff0c;实现了一个猜数字功能的程序。感兴趣的可以看完后自己手动编写玩玩~这个小游戏的编写也是把基础的shell语法基本上都用到了&#…

在QQ音乐巅峰榜年度榜单中,听懂国人2022年的音乐记忆!

拐左为夏&#xff0c;拐右为冬&#xff0c;时至年末&#xff0c;各类形式的年终报告层出不穷。2022年&#xff0c;是特殊的一年&#xff0c;音乐作为不可或缺的一部分&#xff0c;也陪伴广大用户走过了日日夜夜。一首首歌曲拨动着心弦、同时也承载着回忆&#xff0c;熟悉的旋律…

【软件测试】不好,事搞大了,APP测试的血泪教训总结......

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 听说&#xff0c;人…

C语言链表-记录学生信息

题目要求&#xff1a; 创建一个 单向链表 来记录学生信息&#xff0c;人数3–5人&#xff1b;链表结点为结构变量&#xff0c;结构的要求如下&#xff1a; struct stu_info{char stu_num[10]; //学号char stu_name[8]; //姓名char stu_sex[2]; //性别int stu_score /…