kafka个人笔记

news2025/1/17 3:07:23

大部分内容源于https://segmentfault.com/a/1190000038173886, 本人手敲一边加强印象方便复习

消息系统的作用

解耦
冗余
扩展性
灵活性(峰值处理
可恢复
顺序保证
缓冲
异步

  • 解耦:扩展两边处理过程,只需要让他们遵守约束即可
  • 冗余:持久化数据:规避丢失风险。采用 插入-获取-删除范式明确指出消息被处理完毕
  • 扩展性:解耦处理过程,容易扩展处理过程增大消息处理频率
  • 灵活性(峰值处理:访问激增情况不常见,无需投入过多标准资源。使用消息队列顶住访问压力
  • 可恢复:系统失效时仍可保证队列消息在系统恢复后处理
  • 顺序保证:kafka保证partition内消息有序
  • 缓冲:控制和优化 数据经过系统的速度,解决生产、消费速度不一致的问题
  • 异步:允许用户把一个或若干个消息放入队列,且不立即被处理

架构

在这里插入图片描述

  1. producer,消息生产者
  2. broker:kafka集群的服务器
  3. topic:消息的类别
  4. partition:kafka分配单位,一个topic包含一个或多个partition
  5. consumer:消息消费者,终端或服务
  6. comsumer group:
    high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
  7. replica:partition副本
  8. leader:特殊的replica,producer和consumer只和leader交互
  9. follower:除了leader的replica都为follwer,复制数据
  10. controller:服务器:用于leader选举和failover
  11. zookepper,存储集群meta信息

发布消息

producer用push发布到broker,消息被append到partition,顺序写磁盘

消息路由

//构造函数
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
     if (topic == null)
          throw new IllegalArgumentException("Topic cannot be null");
     if (timestamp != null && timestamp < 0)
          throw new IllegalArgumentException("Invalid timestamp " + timestamp);
     this.topic = topic;
     this.partition = partition;
     this.key = key;
     this.value = value;
     this.timestamp = timestamp;
}


private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
     Integer partition = record.partition();
     if (partition != null) {//指定了 partition 则直接使用
          List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
          int lastPartition = partitions.size() - 1;
          if (partition < 0 || partition > lastPartition) {
               throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
          }
          return partition;
     }//否则使用 key 计算
     return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
     int numPartitions = partitions.size();
     if (keyBytes == null) {//轮询
          int nextValue = counter.getAndIncrement();
          List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
          if (availablePartitions.size() > 0) {
               int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
               return availablePartitions.get(part).partition();
          } else {
               return DefaultPartitioner.toPositive(nextValue) % numPartitions;
          }
     } else {
          //对 keyBytes 进行 hash 选出一个 patition
          return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
     }
}

  1. 指定partition直接用
  2. 未指定partition但指定了key,对key进行hash得到partition
  3. 都未指定,使用轮询

写入流程

在这里插入图片描述

  1. producer从zk的/brokers/…/stateleader
  2. producer发消息给leader
  3. leader把消息写入log
  4. follower从leader拉取消息写入log后发送ACK给leader
  5. leader收到所有replica的ACK后,增加high watermark(位置信息,即位移(offset))给producer发送ack

投递保证

    ① At most once 消息可能会丢,但绝不会重复传递

    ② At least one  消息绝不会丢,但可能会重复传递

    ③ Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户想要的

默认 at least one

接收消息的行为

  1. comsumer从broker读取消息后,可以选择commit或处理消息
    1. 如果commit
      1. zookeeper存在comsumer在partition下读取消息的offset
      2. comsumer下次读取partition从下一条开始读取
    2. 未commit
      1. 下次读取位置和上次commit后开始位置相同

at most once

读完消息先commit再处理消息。
若commit后未处理消息系统崩坏,下次重新开始工作无法读到已提交但未处理的消息

At least once

读完消息先处理再commit消费状态(保存offset)
若处理消息后未commit系统崩坏,重新工作的时候会处理未commit的消息(处理两次)

Exactly once 两阶段提交

协调offset和实际操作的输出。但由于许多输出系统不支持两阶段提交,更为通用的方式是将offset和操作输入存在同一个地方

  1. consumer拿到数据后可能把数据放到HDFS
  2. 最新的offset和数据一起写到HDFS
  3. 保证offset更新和数据输出同时完成

(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)。

消息保存

topic分为多个partition,每个partition对应一个文件夹

无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据

  • 基于时间:log.retention.hours=168
  • 基于大小:log.retention.bytes=1073741824
log.cleanup.policy=delete启用删除策略
直接删除,删除后的消息不可恢复。可配置以下两个策略:

清理超过指定时间清理: 
log.retention.hours=16

超过指定大小后,删除旧的消息:
log.retention.bytes=1073741824

请添加图片描述

topic的创造

  1. controller在ZK的/brokers/topics 节点上注册 watcher
    ,topic被创建的时候,controller 会通过 watch 得到该 topic 的 partition/replica 分配
  2. controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
    1. 分配给partition的所有replica(称为AR)任选一个可用的broker作为leader并将AR设置为ISR
    2. 新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
  3. controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

请添加图片描述
删除 topic 的序列

  1. controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher
  2. topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配
  3. 若 delete.topic.enable=false,结束;反之controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest

kafka HA 高可用性

replica

同一个 partition 可能会有多个 replica —— erver.properties 配置中的 default.replication.factor=N

若没有replica,broker死机

  • patition 的数据都不可被消费
  • producer 也不能再将数据存于其上的 patition

引入replica,需要选取leader,leader与producer和consumer交互,其他replica与leader复制数据

分配规则

  1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
  2. 将第 i 个 partition 分配到第(i mod n)个 broker 上
  3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上

leader failover

partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader

新的 leader 必须拥有旧 leader commit 过的所有消息

zookeeper 中(/brokers/…/state)动态维护了一个 ISR(in-sync replicas)。只有 ISR 里面的成员才能选为 leader。若有f个replica,partition可以保证f-1个replica失效情况下消息不丢失

failover方案

  • 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
  • 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短
    多用第二种方式

broker failover

在这里插入图片描述

  1. controller在zookeeper的/brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
  2. controller从/brokers/ids 节点读取可用broker
  3. controller决定set_p,集合包含死机broker上所有partition
  4. 对set_p所有partition进行: 1. 读取/brokers/ids 节点读取可用broker的ISR 2. 决定新leader, 新leader ISR controller_epoch和leader_epoch信息写入state结点
  5. 通过RPC给broker发送 leaderAndISRRequest 命令

controller failover

controller 宕机时会触发 controller failover

  1. broker在zookeeper的controller节点注册watcher
  2. controller宕机时,zookeeper临时节点消失
  3. 所有存活broker收到fire通知
  4. 每个broker尝试创建新的controller path,其中一个竞选成功为controller
  5. 当选成功触发KafkaController.onControllerFailover
1. 读取并增加 Controller Epoch。
2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
5. 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
6. 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
7. 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
8. 启动 replicaStateMachine 和 partitionStateMachine。
9. 将 brokerState 状态设置为 RunningAsController。
10. 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。
11. 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

消费

kafka 提供了两套 consumer API:

The high-level Consumer API
The SimpleConsumer API

consumer API

high-level提供kafka消费数据的抽象

  1. 提供了 consumer group 的语义
  2. 消息只能被group内一个consumer消费
  3. 消费的时候不关注offset
  4. 最后一个offset由zookeeper保存

使用high-level consumer API可以是多线程应用

if(消费线程 > partition){
	部分线程收不到消息
}
if(消费线程 < partition){
	有些线程收到多个partition消息
}

if(一个线程消费多个 patition){
	无法保证收到消息的顺序
}

** SimpleConsumer API**

适用以下情况

  • 多次读取一个消息
  • 只消费一个 patition 中的部分消息
  • 使用事务来保证一个消息仅被消费一次

partition, offset, broker, leader不透明,需要自己管理

  • 追踪offset确定下一条消费的信息
  • 找出每个partition的follower
  • 处理leader变更

流程如下

  1. 查找到一个“活着”的 broker,并且找出每个 partition 的 leader
  2. 找到partition的follower
  3. 定义好请求,该请求应该能描述应用程序需要哪些数据
  4. fetch数据
  5. 识别leader变化并做出响应

consumer group
kafka分配单位是partition,consumer属于一个group
一个partition被一个group内的一个consumer消费(但是多个group可以同时消费这个partition)

实现离线处理与实时处理

  • spark 实时处理
  • hadoop 离线处理

消费方法

consumer用pull模式从broker读数据

push 模式很难适应消费速率不同的消费者

  • 消息发送速率是由 broker 决定的
  • 尽可能以最快速度传递消息
  • 容易造成 consumer 来不及处理消息(拒绝服务、网络拥塞

pull模式,consumer根据自己的能力消费信息

pull的优点

  • 简化broker设计
  • consumer自主控制消费速率
  • consumer自主控制消费方式 —— 批量/逐条
  • 选择不同提交方式

消费者递送保证

consumer 设置为 autocommit,consumer 一旦读到数据立即自动 commit(Exactly once

实际使用过程中,并不是consumer读完消息就结束了,还需要进一步处理。
处理和commit顺序决定了 consumer delivery guarantee

  • 先commit,后处理消息(At most once
    • consumer 在 commit 后还没来得及处理消息就 crash
    • 重新开始工作后就无法读到刚刚已提交而未处理的消息
  • 先处理再commit( At least once
    • 处理完消息之后 commit 之前 consumer crash
    • 恢复工作:处理刚刚未 commit 的消息
  • 两阶段提交
    (offset 和操作输入存在同一个地方,会更简洁和通用)
    (若不支持,consumer 拿到数据后可能把数据放到 HDFS,如果把最新的 offset 和数据本身一起写到 HDFS,那就可以保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once) —— high-level API里面offset存于zookeeper中,无法存于HDFS,simple可以存于HDFS

consumer rebalance

触发机制

  • consumer加入退出
  • partition改变(broker 加入退出

算法如下

  1. 目标topic的partition排序,存于PT
  2. 选择consumer group下所有consumer排序, 存于CG
  3. N = ⌈ s i z e ( P T ) / s i z e ( C G ) ⌉ N = \lceil size(PT)/size(CG)\rceil N=size(PT)/size(CG)⌉
  4. 对group内原本的分配partition解除关系
  5. 然后每N个partition分配给一个consumer

consumer调整了单个partition后,为了保证一致性,group内其他consumer也应触发balance

导致以下问题

herd effect

  • broker,comsumer增减触发rebalance

split brain

  • 每个consumer单独通过zk判断broker和consumer宕机,不同的consumer同时从zookeeper看到的view可能不一致 —— 导致不正确的rebalance
  • 所有consumer不知道其他consumer的rebalance是否成功,导致kafka工作状态不正确
  • 因此0.9开始使用中心coordinator空值rebalance,计划在consumer客户端分配方案

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1215894.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ubuntu 20通过docker安装onlyoffice,并配置https访问

目录 一、安装docker &#xff08;一&#xff09;更新包列表和安装依赖项 &#xff08;二&#xff09;添加Docker的官方GPG密钥 &#xff08;三&#xff09;添加Docker存储库 &#xff08;四&#xff09;安装Docker &#xff08;五&#xff09;启动Docker服务并设置它随系…

MySQL覆盖索引的含义

覆盖索引&#xff1a;SQL只需要通过索引就可以返回查询所需要的数据&#xff0c;而不必通过二级索引查到主键之后再去查询数据&#xff0c;因为查询主键索引的 B 树的成本会比查询二级索引的 B 的成本大。 也就是说我select的列就是我的索引列&#xff08;或者主键&#xff0c;…

整理笔记——MOS管、三极管、IGBT

一、MOS管 在实际生活要控制点亮一个灯&#xff0c;例如家里的照明能&#xff0c;灯和电源之间就需要一个开关需要人为的打开和关闭。 再设计电路板时&#xff0c;如果要使用MCU来控制一个灯的开关&#xff0c;通常会用mos管或是三极管来做这个开关元件。这样就可以通过MCU的信…

填充每个节点的下一个右侧节点指针

题目链接 填充每个节点的下一个右侧节点指针 题目描述 注意点 给定一个 完美二叉树 解答思路 广度优先遍历一层层的遍历二叉树&#xff0c;将每一层节点的next指针都指向右侧节点 代码 class Solution {public Node connect(Node root) {if (root null) {return null;}…

【YOLOX简述】

YOLOX的简述 一、 原因1. 背景2. 概念 二、 算法介绍2.1 YOLOX算法结构图&#xff1a;2.2 算法独特点2.3 Focus网络结构2.4 FPN&#xff0c;PAN2.5 BaseConv2.6 SPP2.7 CSPDarknet2.8 YOlO Head 三、预测曲线3.1 曲线 一、 原因 1. 背景 工业的缺陷检测是计算机视觉中不可缺少…

2022年第八届美亚杯个人赛复盘

以学生的身份最后一次打美亚杯了还是要记录一下的写个wp告别哈哈。 1.[单选题] 王晓琳在这本电子书籍里最后对哪段文字加入了重点标示效果(Highlight)?(2分) A. 卿有何妙计 B. 宝玉已是三杯过去了 C. 武松那日早饭罢 D. 就除他做个强马温罢 2.[多选题] 王晓的手机里有一个 …

c#之反射详解

总目录 文章目录 总目录一、反射是什么&#xff1f;1、C#编译运行过程2、反射与元数据3、反射的优缺点 二、反射的使用1、反射相关的类和命名空间1、System.Type类的应用2、System.Activator类的应用3、System.Reflection.Assembly类的应用4、System.Reflection.Module类的应用…

销售管道管理软件推荐:提升销售业绩与效率

在企业中销售部门扮演着锐意进取的尖刀部队的角色&#xff0c;肩负着拓展公司发展领土的重要责任。销售管理是一个漫长而复杂的过程&#xff0c;需要经历潜在的商机、联系跟进、签订合同以及赢得订单等关键里程碑&#xff0c;无论是面向C端用户的销售还是面向企业复杂产品的销售…

TSINGSEE青犀AI智能分析+视频监控工业园区周界安全防范方案

一、背景需求分析 在工业产业园、化工园或生产制造园区中&#xff0c;周界防范意义重大&#xff0c;对园区的安全起到重要的作用。常规的安防方式是采用人员巡查&#xff0c;人力投入成本大而且效率低。周界一旦被破坏或入侵&#xff0c;会影响园区人员和资产安全&#xff0c;…

前台页面从数据库中获取下拉框值

后端&#xff1a;查询所有信息 前台&#xff1a;elementUI <el-select v-model"searchData.stationName" clearable> <el-option :label"item.stationName" :value"item.stationName" v-for"item in stationNameList&quo…

根据店铺ID/店铺链接/店铺昵称获取京东店铺所有商品数据接口|京东店铺所有商品数据接口|京东API接口

要获取京东店铺的所有商品数据&#xff0c;您需要使用京东开放平台提供的API接口。以下是一些可能有用的API接口&#xff1a; 商品SKU列表接口&#xff1a;该接口可以获取指定店铺下的所有商品SKU列表&#xff0c;包括商品ID、名称、价格等信息。您可以使用该接口来获取店铺中…

SpringBoot3新特性

本篇文章参考尚硅谷springboot3课程: https://www.bilibili.com/video/BV1Es4y1q7Bf?p94&vd_sourced6deb2b69988de2ae72087817e5143d7 原版笔记: https://www.yuque.com/leifengyang/springboot3/xy9gqc2ezocvz4wn 1.自动配置包位置变化 现在指定自动配置类放在了下面这…

俄罗斯方块小游戏

框架 package 框架;import java.awt.image.BufferedImage; import java.util.Objects;/*** author xiaoZhao* date 2022/5/7* describe* 小方块类* 方法&#xff1a; 左移、右移、下落*/ public class Cell {// 行private int row;// 列private int col;private BufferedIm…

kubernetes集群编排——etcd

备份 从镜像中拷贝etcdctl二进制命令 [rootk8s1 ~]# docker run -it --rm reg.westos.org/k8s/etcd:3.5.6-0 sh 输入ctrlpq快捷键&#xff0c;把容器打入后台 获取容器id [rootk8s1 ~]# docker ps 从容器拷贝命令到本机 docker container cp c7e28b381f07:/usr/local/bin/etcdc…

python爬虫概述及简单实践:获取豆瓣电影排行榜

目录 前言 Python爬虫概述 简单实践 - 获取豆瓣电影排行榜 1. 分析目标网页 2. 获取页面内容 3. 解析页面 4. 数据存储 5. 使用代理IP 总结 前言 Python爬虫是指通过程序自动化地对互联网上的信息进行抓取和分析的一种技术。Python作为一门易于学习且强大的编程语言&…

mysql数据模型

创建数据库 命令 create database hellox &#xff1a; &#xff08; hellox名字&#xff09; sql语句 创建 数据库 命令 create database hell&#xff1b; 也是创建但是有数据库不创建 命令 create database if not exists hell ; 切换数据库 命令 use hello&…

Facebook内容的类型

随着人们日益依赖的社交媒体来进行信息获取与交流&#xff0c;Facebook作为全球最大的社交媒体平台之一&#xff0c;那么Facebook的内容都有哪些类型呢&#xff1f;下面小编来讲讲吧&#xff01; 1、实时发生的事 我们需要实时了解时事动态&#xff0c;这样可以使用户对品牌发…

android PopupWindow设置

记录一个小功能&#xff0c;使用场景&#xff0c;列表项点击弹出 如图&#xff1a; java类代码&#xff1a; public class PopupUtil extends PopupWindow {private Activity context;private View view;private ListView listView;private TextView m_tv_reminderm, m_tv_Wa…

React-Router源码分析-History库

history源码 history 在 v5 之前使用单独的包&#xff0c; v6 之后再 router 包中单独实现。 history源码 Action 路由切换的动作类型&#xff0c;包含三种类型&#xff1a; POPREPLACEPUSH Action 枚举&#xff1a; export enum Action {Pop "POP",Push &quo…

BI 数据可视化平台建设(2)—筛选器组件升级实践

作者&#xff1a;vivo 互联网大数据团队-Wang Lei 本文是vivo互联网大数据团队《BI数据可视化平台建设》系列文章第2篇 -筛选器组件。 本文主要介绍了BI数据可视化平台建设中比较核心的筛选器组件&#xff0c; 涉及组件分类、组件库开发等升级实践经验&#xff0c;通过分享一些…