kafka晋升之路-理论+场景

news2024/12/28 8:06:18

kafka晋升之路

  • 一:故事背景
  • 二:核心概念
    • 2.1 系统架构
    • 2.2 生产者(Producer)
      • 2.2.1 生产者分区
      • 2.2.2 生产者分区策略
    • 2.3 经纪人(Broker)
      • 2.3.1 主题(Topic)
      • 2.3.2 分区(Partition)
      • 2.3.2 消息
    • 2.4 消费者(Consumer)
      • 2.4.1 消费者组(ConsumerGroup)
      • 2.4.2 协调者(Coordinator)
      • 2.4.3 消费者策略
      • 2.4.4 重平衡 Rebalance
  • 三:进阶概念
    • 3.1 消息幂等
    • 3.2 事务
    • 3.3 拦截器
    • 3.4 控制器(Controller)
    • 3.4 日志存储
    • 3.5 常用参数配置
      • 3.5.1 broker.id
      • 3.5.2 port
      • 3.5.3 zookeeper.connect
      • 3.5.4 log.dirs
      • 3.5.5 auto.create.topics.enable
      • 3.5.6 num.partitions
      • 3.5.7 default.replication.factor
      • 3.5.8 log.retention.ms
      • 3.5.9 message.max.bytes
  • 四:常见问题
    • 4.1 消息丢失问题
      • 4.1.1 生产者消息丢失
      • 4.1.2 消费者消息丢失
    • 4.2 重复消费问题
    • 4.3 消息顺序问题
  • 五:总结提升

一:故事背景

一直在使用消息队列中间件,今天进行梳理其对应的知识。本篇文章将会带你系统梳理常用消息中间件kafka。主要侧重于核心知识、应用场景、常见问题。希望读者能够通过本篇文章系统了解,应用kafka。

二:核心概念

Kafka是一种分布式的,基于发布/订阅的消息系统。消息系统的作用,我们就不在这里讲了,大家可以网上自行查阅。接下来我们主要讲一讲kafka的系统架构,整个架构的各个角色。

2.1 系统架构

通常情况下,一个kafka体系架构包括 「多个Producer」、「多个Consumer」、「多个broker」以及「一个Zookeeper集群」。
一张图胜过千言万语,首先让我们开看一下,kafka的基本的架构。
在这里插入图片描述

这里我们列出了kafka的基本架构,整张图宏观看下来分为了四大部分,Zookeeper是用来做集群管理、元数据以及控制器选择的,抛出它以外剩下的三部分分别是:

  • 生产者
  • 队列
  • 消费者
    这三部分其实就是kafka的核心逻辑,生产、暂存、消费。
    接下来我会围绕这张基本架构的图,详细展开讲解kafka架构的各个部分。

2.2 生产者(Producer)

生产者,负责将消息发送到kafka中。生产者是整个流程的起始位置,如果没有生产者,就没有接下来的流程。针对生产者我们只需要考虑以下两个问题。

  1. 谁来生产
  2. 生产出来放在那

谁来进行消息的生产呢?在kafka中没有对谁来进行生产进行限制,也就是说我不用关注这条消息是谁生产的,我们只需要关注,生产出来的数据应该放在哪里,这就引入了生产者分区的概念

2.2.1 生产者分区

Kafka的消息组织方式实际上是三级结构:主题-分区-消息。主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
在逻辑上一个Topic就是一个队列、但是实际上一个Topic内分了多个区,每个区都是一个队列。生产者生产的数据也都是放到分区内的。
在这里插入图片描述
实际上的消息是放到分区的某个块上的,读写操作是针对分区的粒度上进行的。这样每个节点下每个分区都能独立的处理各自的读写请求、增加了系统的效率。

2.2.2 生产者分区策略

分区策略指的就是生产者生产出来的消息要放到具体的topic的哪一个分区下的策略,主要有以下三种:

  1. 轮询策略(Round-robin)
    顺序分配,例如一个topic下有3个分区,第一条消息发送到分区0,第二条发送到1,第三条发送到2。第四条又会重新开始。是kafka默认的分区策略
  2. 随机策略
    将消息放到任意一个分区上,一般是先算出topic的总分区数,返回一个小于它的随机数即可,但是可能会造成数据分布不均匀的情况。
  3. 按消息键保存策略
    Kafka允许为每条消息定义消息键,简称为Key。一旦消息被定义了Key,那么你就可以保证同一个Key的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略
    如果没有指定key的话,会走默认的轮询策略。

2.3 经纪人(Broker)

Kafka服务节点,一个或多个Broker组成了一个Kafka集群。每一个broker都是一个单独提供服务的节点,每个broker内可以放置多个topic。

2.3.1 主题(Topic)

Topic是Producer和Consumer订阅的对象,可以为每个业务、每个应用、每类数据创造相应的Topic。我们可以把一个topic看成一个逻辑上的队列,Produce生产消息放入队列,Consumer从队列取出消息进行消费。
在这里插入图片描述

2.3.2 分区(Partition)

kafka支持分区机制,所谓分区指的就是一个Topic内可以有多个分区,虽然这些消息同属一个Topic,但是却有不同的分区,并且只会存在一份。通过分区设置,等于一个Topic内拥有了多个队列,这些Partition可以进行单独支持接收消息,取消息。

2.3.2 消息

在 Kafka 中,“消息”(Message)是指一段数据,它可以是任何形式的信息,例如文本、图像、日志记录等。消息一般包含以下几个关键部分:

  • 主题(Topic): 主题是消息的分类标签。消息会被发布到一个特定的主题中,而消费者可以订阅这些主题来接收消息。主题可以看作是消息的逻辑容器,帮助对消息进行组织和分区。

  • 消息键(Message Key): 每条消息可以有一个可选的键,它用于在发布消息时指定分区。如果消息键被提供,Kafka 会使用哈希算法将所有具有相同键的消息分配到同一个分区中,以确保具有相同键的消息始终位于同一个分区内。

  • 消息值(Message Value): 消息值是实际的数据内容,它可以是任何字节序列,通常是文本或二进制数据。

  • 时间戳(Timestamp): 每条消息可以有一个时间戳,表示消息的创建时间。时间戳可以用于数据处理和分析,以及确保消息的顺序性。

  • 分区(Partition): 主题可以被分成多个分区,每个分区是一个有序、不可变的消息序列。分区可以帮助实现消息的水平扩展,使 Kafka 集群能够处理大量消息。

  • 偏移量(Offset): 每条消息在其所属分区内都有一个唯一的偏移量,用于标识消息在分区中的位置。消费者可以根据偏移量来跟踪已经处理过的消息。

2.4 消费者(Consumer)

消费者Consumer是一种应用程序,它订阅一个或多个 Kafka 主题并从这些主题中拉取消息进行处理
消费者通常的工作流程如下:
在这里插入图片描述
我们可以根据需求,创建多个消费者实例进行并行消费,以实现并行处理。
偏移量(offset)指的是队列中的一个标记变量,其记录了某个消费者组消费到了什么位置,每个消费者组都在其订阅的主题的分区内有对应的偏移量。

2.4.1 消费者组(ConsumerGroup)

  • Consumer Group是Kafka提供的可扩展且具有容错性的消费者机制。一个组内可以有多个消费者实例,他们共享同一个公共的GroupID。
  • 组内的所有消费者一起消费组订阅的主题内的所有分区。每个分区只能由消费者组内一个Consumer实例进行消费。
  • Consumer Group之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。这就是上文所讲每个消费者组都在其订阅的主题的分区内有对应的偏移量

2.4.2 协调者(Coordinator)

  • 协调者,它专门为Consumer Group服务,负责为Group执行Rebalance以及提供位移管理和组成员管理等。
  • Consumer端应用程序在提交位移时,其实是向Coordinator所在的Broker提交位移,同样地,当Consumer应用启动时,也是向Coordinator所在的Broker发送各种请求,然后由Coordinator负责执行消费者组的注册、成员管理记录等元数据管理操作。
  • 所有Broker都有各自的Coordinator组件。

2.4.3 消费者策略

上文我们说到,一个消费者组内有多个消费者实例,这些实例一起消费订阅的主题内的所有分区。那么这些分区应该怎么分呢?一个实例具体应该去消费那个或那几个分区呢?针对这个问题,kafka有3种对应的消费者策略。

  1. Round
    默认的轮循方式、决定消费者分区。假设有7个分区,分区将会如下分配:
    在这里插入图片描述

  2. Range
    对一个消费者组来说决定消费方式是以分区总数除以消费者总数来决定,一般如果不能整除,往往是从头开始将剩余的分区分配开。
    在这里插入图片描述
    一个分区放两个,剩下的轮循。

  3. Sticky
    它是在Range上的一种升华,且前面两个当同组内有新的消费者加入或者旧的消费者退出的时候,会从新开始决定消费者消费方式,但是Sticky,在同组中有新的新的消费者加入或者旧的消费者退出时,不会直接开始新的Range分配,而是保留现有消费者原来的消费策略,将退出的消费者所消费的分区平均分配给现有消费者,新增消费者同理,同其他现存消费者的消费策略中分离。

2.4.4 重平衡 Rebalance

上文说了消费者的分配策略,但是我们的消费者往往都不是静态的,会有消费者上线、下线的情况。这时候就引入了Rebalance机制,Rebalance触发的条件有3个分别是:

  1. 组成员数发生变更。比如有新的Consumer实例加入组或者离开组,或是有Consumer实例崩溃被踢出组。
  2. 订阅主题数发生变更。Consumer Group可以使用正则表达式的方式订阅主题,比如consumer.subscribe(Pattern.compile(“t.*c”))就表明该Group订阅所有以字母t开头、字母c结尾的主题,在Consumer Group的运行过程中,你新创建了一个满足这样条件的主题,那么该Group就会发生Rebalance。
  3. 订阅主题的分区数发生变更。Kafka当前只能允许增加一个主题的分区数,当分区数增加时,就会触发订阅该主题的所有Group开启Rebalance。

三:进阶概念

3.1 消息幂等

在Kafka中,Producer默认不是幂等性的,但我们可以创建幂等性Producer。
设置方法:

props.put(“enable.idempotence”, ture);
或
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIGtrue);

通过设置幂等,kafka会自动对你的消息进行去重,确保重复的消息只会有一条。
一个幂等性Producer能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。它只能实现单会话上的幂等性,不能实现跨会话的幂等性

3.2 事务

事务型Producer能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。
设置方式:
和幂等性Producer一样,开启enable.idempotence = true。
设置Producer端参数transactional. id,最好为其设置一个有意义的名字。

producer.initTransactions();
try {
            producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
            producer.commitTransaction();
} catch (KafkaException e) {
            producer.abortTransaction();
}

3.3 拦截器

拦截器一共有两类四种,分别是:

  1. onSend:该方法会在消息发送之前被调用。
  2. onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用。
  3. onConsume:该方法在消息返回给Consumer程序之前调用。
  4. onCommit:Consumer在提交位移之后调用该方法。

3.4 控制器(Controller)

在kafka中,控制器会由某个Broker节点担任,每个Broker启动时,都会尝试成为控制器,但是只有第一个会成为控制器,当运行中的控制器宕机或中止的时候,kafka会进行Failover(故障转移),选择新的控制器。
控制器主要有以下几个职责:

  • 主题管理(创建、删除、增加分区)
  • 分区重分配
  • Preferred领导者选举
  • 集群成员管理(新增Broker、Broker主动关闭、Broker宕机)
  • 数据服务

3.4 日志存储

Kafka中的消息是以主题为基本单位进行归类的,每个主题在逻辑上相互独立。
每个主题又可以分为一个或多个分区,在不考虑副本的情况下,一个分区会对应一个日志。
在kafka中引入的日志分段的概念,默认最大是1G,超过1G,日志文件就会分出一个新的段。

3.5 常用参数配置

3.5.1 broker.id

每个 kafka broker 都有一个唯一的标识来表示
这个唯一的标识符即是 broker.id,它的默认值是 0
这个值在 kafka 集群中必须是唯一的,可以任意设定

3.5.2 port

默认为9092端口,可以修改

3.5.3 zookeeper.connect

用来指定zookeeper的连接,其中有如下几个指定参数

  • hostname 是 Zookeeper 服务器的机器名或者 ip 地址。
  • port 是 Zookeeper 客户端的端口号
  • /path 是可选择的 Zookeeper 路径,Kafka 路径是使用了 chroot 环境,如果不指定默认使用跟路径。

3.5.4 log.dirs

Kafka 把所有的消息都保存到磁盘上,存放这些日志片段的目录是通过 log.dirs 来指定的

3.5.5 auto.create.topics.enable

自动创建主题、默认情况下,kafka自动创建主题

3.5.6 num.partitions

num.partitions 参数指定了新创建的主题需要包含多少个分区,该参数的默认值是 1。

3.5.7 default.replication.factor

kafka保存消息的副本数。

3.5.8 log.retention.ms

决定数据可以保留多久,默认是 168 个小时,也就是一周

3.5.9 message.max.bytes

限制单个消息的大小,默认是 1000 000, 也就是 1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误消息。

四:常见问题

4.1 消息丢失问题

4.1.1 生产者消息丢失

问题描述:
Kafka Producer是异步发送消息的,也就是说如果你调用producer.send(msg)这个API,那么它通常会立即返回。实际上消息并没有发送成功,消息有可能因为网络抖动、消息过大、等原因没有实际发送成功。
解决方式:
**不要使用producer.send(msg),而要使用producer.send(msg, callback)。**通过回调确认消息是否真正发送成功

4.1.2 消费者消息丢失

问题描述:
Consumer端丢失数据主要体现在Consumer端要消费的消息不见了。Consumer程序从Kafka获取到消息后开启了多个线程异步处理消息,而Consumer程序自动地向前更新位移。假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于Consumer而言实际上是丢失了。
解决方案:
这里问题的关键存在于kafka的自动提交。如果是多线程异步处理消费消息,Consumer程序不要开启自动提交位移,而是要应用程序手动提交位移线程执行失败了,不提交位移,将这组消息视为失败。
但这里有另外一个问题,**可能一组50条的数据,已经成功处理了20条,但是我们又无法去提交成功了20条,位移20条的信息。**我们可以在每个消费者的处理逻辑中,根据业务去验证拿到的消费是否已经消费过,避免重复消费。

4.2 重复消费问题

问题描述:
上文已经提到一种重复消费问题,还有另外一种场景即:
消费者消费时间过长
max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起离开组的请求,Coordinator 也会开启新一轮 Rebalance。
解决方案:

  1. 提高消费能力,提高单条消息的处理速度;根据实际场景可讲max.poll.interval.ms值设置大一点,避免不必要的rebalance;可适当减小max.poll.records的值,默认值是500,可根据实际消息速率适当调小。
  2. 生成消息时,可加入唯一标识符如消息id,在消费端,保存最近的1000条消息id存入到redis或mysql中,消费的消息时通过前置去重。

4.3 消息顺序问题

问题描述
由于kafka的设计中一个Topic可以包含多个Partition,每个parttition内部是有序的,kafka只能保证partition内部有序,所以业务设计到需要有序的情况时,需要在parttition层面上进行控制,已达到有序。
解决方案

  1. 可以设置topic,有且只有一个partition
  2. 根据业务需要,需要顺序的 指定为同一个partition
  3. 根据业务需要,比如同一个订单,使用同一个key,可以保证分配到同一个partition上

上述只能从生产的过程中保证生产消息是有序的,但是如果消费者是通过多线程进行消息消费的,任然可能出现顺序不符问题。如果这种情况我们可以在消费者内部,使用队列维护从kafka获得的消息,然后通过锁的方式,控制消息的取出。

五:总结提升

本文讲解了kafka的基本概念、常见问题、通过此篇文章,相信你对kafka已经有了一定的了解,赶紧实验起来吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/891943.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

人工智能与云计算实训室建设方案

一、 人工智能与云计算系统概述 人工智能(Artificial Intelligence,简称AI)是一种模拟人类智能的科学和工程,通过使用计算机系统来模拟、扩展和增强人类的智能能力。人工智能涉及多个领域,包括机器学习、深度学习、自然…

k8s简介及虚拟机快速搭建k8s集群

文章目录 1、k8s简介1.1、部署方式的变迁1.2、定义1.3、Kubernetes提供的功能 2、虚拟机快速搭建k8s集群2.1、虚拟机配置(centos7 2G内存2个处理器)2.2、基础环境准备2.3、docker安装(易踩坑)2.4、安装k8s组件2.5、master节点部署…

英飞凌在车辆信息安全方面上应用

如今,网络安全在多个层面影响着我们每个人。我们的专业工作、个人生活,甚至我们的汽车,都依赖于复杂软件上运行的连接和技术。随着信息技术日益融入我们的日常生活,我们对后续信息系统的依赖性也与日俱增。反过来,这些…

Win10+anaconda+CUDA+pytorch+vscode配置

Win10anacondaCUDApytorchvscode配置 1.安装anaconda2.安装CUDA确认CUDA版本确认CUDA和pytorch版本安装CUDA 3.安装cudnn4.安装Pytorch5.vscode配置安装VScodevscode配置pytorch环境 1.安装anaconda 官网https://www.anaconda.com 下载安装,路径全英文然后记得有一…

【微服务】springboot 整合mysql实现版本管理通用解决方案

目录 一、前言 1.1 单独执行初始化sql 1.2 程序自动执行 二、数据库版本升级管理问题 三、spring 框架sql自动管理机制 3.1 jdbcTemplate 方式 3.1.1 创建数据库 3.1.2 创建 springboot 工程 3.1.3 初始化sql脚本 3.1.4 核心配置类 3.1.5 执行sql初始化 3.2 配置文…

numpy基础知识

文章目录 安装numpynumpy的ndarray对象ndarray 和 list 效率比较创建一/二维数组ndarray的常用属性调整数组形状ndarray转list numpy的数据类型数组的运算数组和数的计算数组和数组的计算 数组的轴数组的索引和切片数组的与或非和三目运算符numpy的插入、删除、去重插入删除去重…

财务数据分析用什么软件好?财务数据分析的几个重要数据是什么?

财务的数据分析也分很多种的,就拿最粗略的划分来说,也可以分为3大领域—— 财务数据处理类工具财务数据挖掘类工具财务数据可视化工具 01 数据处理类 在财务数据处理这一块儿,不用说,当然是以excel为主力的数据处理类工具—— …

22年电赛B题——具有自动泊车功能的电动车——做题记录以及经验分享

前言 这道题目也是小车类电赛题目,十月份的电赛题,由于之前积累了一些经验,这道题目在做下来的感觉还行,但是我们看题目没有仔细审题,和题目要求有一些些偏差,但是基础大功能还是做出来辽,大家还是可以参考…

SASS 学习笔记 II

SASS 学习笔记 II 上篇笔记,SASS 学习笔记 中包含: 配置 变量 嵌套 这里加一个扩展,嵌套中有一个 & 的用法,使用 & 可以指代当前 block 中的 selector,后面可以追加其他的选择器。如当前的 scope 是 form&a…

zotero在不同系统的安装(win/linux)

1 window系统安装 zotero 官网: https://www.zotero.org/ 官方文档 :https://www.zotero.org/support/ (官方)推荐常用的插件: https://www.zotero.org/support/plugins 入门视频推荐: Zotero 文献管理与知识整理最佳实践 点击 exe文件自…

30.Netty源码服务端启动主要流程

highlight: arduino-light 服务端启动主要流程 •创建 selector •创建 server socket channel •初始化 server socket channel •给 server socket channel 从 boss group 中选择一个 NioEventLoop •将 server socket channel 注册到选择的 NioEventLoop 的 selector •…

如何利用 ChatGPT 进行自动数据清理和预处理

推荐:使用 NSDT场景编辑器助你快速搭建可二次编辑的3D应用场景 ChatGPT 已经成为一把可用于多种应用的瑞士军刀,并且有大量的空间将 ChatGPT 集成到数据科学工作流程中。 如果您曾经在真实数据集上训练过机器学习模型,您就会知道数据清理和预…

Linux debian12解压和压缩.rar文件教程

一、Debian12安装rar命令 sudo apt install rar二、使用rar软件 1.解压文件 命令格式: rar x 文件名.rar实力测试: [rootdoudou tmp]# rar x test.rar2.压缩文件 test是一个文件夹 命令格式: rar a 文件名.rar 文件夹名实例测试&#x…

Java的AQS框架是如何支撑起整个并发库的

如何设计一个抽象队列同步器 引言AQS需要解决哪些场景下的问题互斥模式获取锁抢锁失败入队 释放锁小总结 共享模式获取共享资源释放共享资源唤醒丢失问题 小总结 混合模式获取写锁释放写锁获取读锁读锁是否应该阻塞 释放读锁小总结 栅栏模式等待递减计数 条件变量模式等待条件成…

如何将图片应用于所有的PPT页面?

问题:如何快速将图片应用到所有PPT页面? 解答:有两种方法可以解决这个问题。第一种用母板。第二种用PPT背景功能。 解决有时候汇报的时候,ppt中背景图片修改不了以及不知道如何查找,今天按照逆向过程进行操作 方法1…

Intelij IDEA 配置Tomcat解决Application Server不显示的问题

今天搭建war工程时部署项目发现,IDEA的控制台没有Application Servers,在网上查了一下,总结几个比较好的解决方法,为了方便自己和其他人以后碰到相同的问题,不再浪费时间再次寻找解决办法。 Intelij IDEA 配置Tomcat时…

PyMuPDF`库实现PDF旋转功能

本文介绍了一个简单的Python应用程序,用于将PDF文件转换为旋转90度的PDF文件。主要用于csdn网站中导出的博客pdf是横向的,看起来不是很方便,才想到用python编制一个将pdf从横向转为纵向的功能。 功能 该PDF转换工具具有以下功能&#xff1a…

pdf转word最简单方法~

pdf转word最简单方法!pdf转word最简单方法我们都知道,PDF文件是一种只读文件格式,无法按照需求对PDF文件进行更改与编辑,从而影响到了PDF文件的使用。所以,我们需要将PDF文件转换为word文档,以此来保证文件…

js 小程序限流函数 return闭包函数执行不了

问题: 调用限流 ,没走闭包的函数: checkBalanceReq() loadsh.js // 限流 const throttle (fn, context, interval) > {console.log(">>>>cmm throttle", context, interval)let canRun…

Webgl 存储限定符attribute、gl.getAttribLocation、gl.vertexAttrib3f及其同族函数和矢量版本的介绍

目录 attribute变量规范 获取attribute变量的存储位置 gl.getAttribLocation()函数的规范: 向attribute变量赋值 gl.vertexAttrib3f()的规范。 gl.vertexAttrib3f()的同族函数 示例代码…