Kafka使用规范(纯技术和实战建议)

news2024/9/29 21:31:53

概述:

1、kafka使用规范主要从,生产可靠性、和消费为轴线定义使用规范,另外Kafka建议核心业务系统不要使用(对数据可靠性要求高),因为Kafka高效性能源于批量设计思想,要充分利于Kafka高效性能,前提是要允许部分数据丢失。

2、kafka使用核心:削峰解耦、向下游并行广播通知(无可靠性保证)和分布式事务,本规范仅从削峰解耦、向下游并行广播通知论述

1、可靠性(强制)

可靠性包括Producer发送消息机制的可靠性,Kafka Server(Broker)消息持久化刷盘机制和Broker主从节点消息同步机制,Consumer消息的消费机制。

1.1、Producer发送消息的可靠性:

1.1.1、核心参数设置:

acks:用于Producer指明Broker主从节点消息同步的机制,有如下三个设置:

  1. acks=0,表示生产者在成功写入消息之前不会等待任何来自服务器的响应。说白了就是Producer只负责消息发送,不管消息是否成功到达Broker消息可靠性极低,但发送效率极高;
  2. acks=1,表示只要集群的Leader分区接收到了消息,就会向生产者发送一个成功响应的ack。说白了就是Producer只确保消息发送到了Leader消息可靠性不太高,发送效率一般
  3. acks=all,表示只有所有参与复制的节点(ISRmin.insync.replicas综合决定)全部收到消息时,生产者才会接收到来自服务器的响应ack。说白了就是Producer发送的消息会从Leader同步到Slave,具体同步多少Slave节点?可以通过min.insync.replicas指定;

min.insync.replicas:用于指明Producer发送的消息,Leader收到消息后,会同步到Slave节点的个数,该值默认是1,值越大,消息可告性越高,但发送效率极低。同时该参数控制消息至少被写入到多少个Leader才算是"真正写入",acks=all需要考虑真正写入

replica.lag.time.max.msKafka判断ISR中的FollowerLeader是否需要同步?根据是参数 replica.lag.time.max.ms (主从之间同步落后时间差),首先ISR 的全称是:In-Sync Replicas ISR是一个Follower的列表,里面存储的是能跟Leader数据同步一致的Follower,确定一个FollowerISR列表中,有3个判断条件:

  1. 根据FollowerLeader的交互时间差,如果大于某个时间差就认定这个Follower不行了,就把此FollowerISR中剔除,此时间差根据rerplica.lag.time.max.ms指定如:rerplica.lag.time.max.ms=10000,单位ms,也就是默认10s,ISR中的Follower没有向ISR发送心跳包就会被移除;
  2. 根据LeaderFollower的消息条数差值决定是否从ISR中剔除此Follower,此消息条数差值根据配置参数。如:rerplica.lag.max.messages=4000  ,即:消息条数差大于4000会被移除。该参数Kafka 0.10.0已弃用;
  3. Follower所在的Broker节点的确不可用,如:网络不可达,或直接宕机。就把此FollowerISR中剔除;

注意:剔除不是意味着不可用,Follower还是会去默默同步数据,随着Follower不断与Leader进行消息同步, Leader副本的 LEO也会逐渐后移 ,并最终追赶上Leader,此时该Follower就有资格进入ISR集合。另外从消息投递的效率和可靠性综合考虑,建议asks设置为1。如果设置为all(或-1),建议min.insync.replicasTopic分区数(Partition)的1/2或者1/3,replica.lag.time.max.ms可以使用默认10s

retries:用于指明生产者可以重发消息的次数,如果达到这个次数,最终还是失败,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms ,可以通过retry.backoff.ms 参数来配置时间间隔。

1.1.2、刷盘机制(broker节点配置):

kafka的刷盘机制是通过以下三个参数确定:

  1. log.flush.interval.ms:在刷新到磁盘之前,日志分区上消息保留在内存中的最长时间;
  2. log.flush.interval.messages:在将消息刷新到磁盘之前,日志分区上累积的消息数量;
  3. log.flush.scheduler.interval.ms:日志刷新器检查是否需要将所有日志刷新到磁盘的频率(一个Broker上可能有很Partition);

我们可以把log.flush.interval.messages值设为1,实现同步刷盘,同步刷盘对性能影响极大,而且现在Kafka统一由集团管理,应该不会随意改配置。

注:如果未设置log.flush.interval.ms,则使用log.flush.scheduler.interval.ms中的值。

1.1.3、消息生产(producer):

消息生产,指Kafka生产投递消息的方式,分为同步和异步两种方式。

1.1.3.1、同步发送:

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。同步发送效率不高数据可靠性高

1.1.3.2、异步发送:

异步发送数据可靠性不高,异步发送效率较高,不会阻塞发送工作线程,但有其它开销。因此在谈异步发送方式之前,先看看异步发送的底层原理。

Kafka的Producer发送消息采用异步发送的方式时,在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator【记录累计器,充当一个队列】。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。

 

相关参数:

  • batch.size: 只有数据积累到batch.size之后,sender才会发送数据,batch.size以字节为单位;
  • linger.ms:如果数据迟迟未到达batch.sizesender等待linger.ms 之后就会发送数据;

1.1.3.2.1、异步发送不带回调:

异步发送不带回调,指发送了就不管了,直接返回后续不再捕获发送结果。

1.1.3.2.2、异步发送带回调:

异步发送带回调,指发送了,可以设置一个回调函数捕获发送执行结果,编码可以根据发送执行结果(success/fail)做补偿。

注:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

1.1.4、消息消费(consumer):

消息消费包话消费方式,和消息消费提交方式

1.1.4.1、消费方式:

消费方式包括消息拉取方式点对点消费广播消费

1.1.4.1.1:消息拉取方式:

Kafka目前已发布的版本仅支持,pull方式获取消息。

1.1.4.1.2:点对点消费:

Kafka其实不支持点对点对消费,它是以消费组的发布订阅模式消费,即:消费组消费模式是点对点

注:关于消费组的个数,与Topic分区数的关系,具体一点来说是主分区数

消费组由多个consumer组成,每一个消费组,只能有一个消费者消费同一topic下的的主分区,复制分区在Kafka里,只做备份数据的功能,只有当主挂了,选举成主时,才提供消费服务。

  1. 同一组中当消费者数大于分区数时,多余的消费者不会接收消息,但可以作为备用消费者,当处理的消费者挂掉后,备用消费者可以继续进行处理;
  2. 同一组中当消费者数小于分区数时,一个消费者将会消费多个主分区,此时Kafka会尽量负载消费;
  3. 对于消费者来说,在每个分区上实际上是单线程消费;

1.1.4.1.3:广播消费:

Kafka不支持广播消费,若要实现,消费端可以用动态生成消费组实现

注:动态生成消费组,很多Kafka生产环境是禁止的,主要以下三点不足:

  1. 消费组每次动态生成,不好管理维护;
  2. Kafka后端要维护消费组消费的Offset,但重启后又无意义,记而不用(因为每次重启应用都会生成新的消费组);
  3. Kafka要明配置 auto.offset.reset,配置为 earliest 会有重复消费的可能,需要实现消费逻辑幂等,配置为 latest 会有漏消费的可能;

auto.offset.reset有以下三个可选值:

  1. latest(默认):对于同一个消费者组,若没有提交过offset,则只消费消费者连接topic后,新产生的数据;
  2. earliest:对于同一个消费者组,若没有提交过offset,则从头开始消费;
  3. none:对于同一个消费者组,若没有提交过offset,会抛异常直接抛出异常;

其实可以为后台应用硬编码死不同的消费组,但这样一来应用扩展性和维护性就降低了。

1.1.4.1、消费提交方式:

消费提交方式指,消息被消费者Pull以后,是手动提交,还是自动提交,可以通过如下两个参数配置:

enable.auto.commit:是否开启自动提交offset功能;
auto.commit.interval.ms:自动提交offset的时间间隔;

1.1.4.1.1、自动提交:

自动提交对于编码来说是不可控的,如果消费者在执行消费业务逻辑时,出现异常时,是不能回滚的,直接后果就是消息丢失。如果要使用此种提交方式,请确认异常补救方式。

1.1.4.1.2、手动提交:

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。

  1. 两者的相同点是:都会将本次pull的一批数据最高的偏移量(offset)提交(可以批量消费);
  2. 两者的不同点是:commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败),而commitAsync则没有失败重试机制,故有可能提交失败;

2、缓冲区和消息体大小限制(非强制):

缓冲区和消息体大小限制,主要由:max.request.sizebuffer.memorybatch.size、linger.ms、message.max.bytes、max.message.bytes、fetch.max.bytes指定。

2.1、Producer:

生产端缓冲区和消息体大小的配置。

2.1.1、max.request.size:

  1. 限制单条消息大小(以字节为单位),即每条消息最大允许的大小;
  2. 限制发送请求大小(以字节为单位),即每次发送到Broker最大允许的大小;

注:max.request.size,建议不超过1024*2 Kb,超过2Kb开启压缩机制。

2.1.2、buffer.memory:

buffer.memory的本质就是用来约束Producer能够使用的内存缓冲区的大小的,内存缓冲区的作用就是预分配内存,且在使用上不会被GC回收。

2.1.3、batch.size:

通过这个参数来设置批量发送的数据大小,当积压的消息达到这个值的时候就会统一发送(发往同一分区的消息)。

2.1.4、linger.ms:

这个是设置消息发送延迟,这样可以收集更多的消息后批量发送(发往同一分区的消息)。

注:当 batch.size 和 linger.ms 同时设置的时候,只要两个条件中满足一个就会发送。比如:说batch.size设置16kb,linger.ms设置50ms,那么当消息积压达到16kb就会发送,如果没有到达16kb,那么在第一个消息到来之后的50ms之后消息将会发送。

2.1.5、batch.size <  buffer.memory :

二者大小的限制最好: batch.size <  buffer.memory,如果:发送的真实消息体大小(以字节为单位)> batch.size,可能会导致频繁GC。如果:batch.size >  buffer.memory,可能会导致消息发不出去。

2.2、Broker(服务端):

Broker配置的参数,开发人员不能控制修改,建议使用前向运维人员问清楚。

2.2.1、message.max.bytes:

这个参数决定了  Broker 能够接收到的最大消息的大小,限制Broker上的所有Topic,如果:max.request.size > message.max.bytes,可能会导致消息发送异常

2.2.2、max.message.bytes:

这个参数决定了  Broker 能够接收到的最大消息的大小,它只针对某个主题生效,可动态配置,可覆盖全局的 message.max.bytes。如果:max.request.size > max.message.bytes,可能会导致消息发送异常

2.3、Consumer(消费端):

消费端消息体的大小,主要指拉取消息的大小。

2.3.1、fetch.max.bytes:

fetch.max.bytes 这个参数决定消费者单次从 Broker 获取消息的最大字节数。如果:fetch.max.bytes < max.request.size,可能会导致消费者消费不了消息。

3、常见建议操作(非强制)

常见建议操作,包括消息生产溯源,消息积压告警阈值设置,消息集压处理策略。

3.1、消息生产溯源:

消息生产溯源,指生产者向下游生产投递消息后,防止下游消息丢失,无法找回。同时考虑消息投递的效率和降级异常补尝处理,建议Producer如下操作发送消息。

  1. 发送消息之前先落库记录,投递之前此条记录标识为未发送状态;
  2. 异步发送机制投递消息;
  3. 异步回调处理,投递结果,成功、失败、还是异常;
  4. 定时任务降级异常补尝处理未发送、发送失败,或者异常的记录;

3.2、消息集压告警阈值设置:

消息积压告警阈值设置,一种是与业务相关性不大,完全是从消息中间件特性设置的阈值。另一种是与业务相关性很大,即:上游系统投递的消息,下游系统必需在某一个时差处理,否则会影响业务。

3.2.1、业务相关性不大:

业务相关性不大,直接找运维提供一个阈值即可。

3.2.2、业务相关性很大:

业务相关性很大,阈值的设置:

  1. 明确下游系统的消费速率;
  2. 明确上下系统业务最大允许的时差;
  3. 根据1和2算出一个合理的积压告警阈值;

例如:下游系统的消费速率是1 Second,上下系统业务最大允许的时差5 Minute,则积压告警阈值是:300,考虑提前告警,可以设为280。

3.3、消息集压处理策略:

消息集压原因:

  1. Kafak中间件自身问题导致;
  2. 下游系统因代码原因,导致不能消费;
  3. 穷尽现有能力优化仍然消费不过来;

对于1和2得找出原因解决,对于3得动态横向扩展消费端扩大消费能力,分为无序消息的扩展有序消息的扩展

3.3.1、无序消息的扩展:

无序消息的扩展,直接加应用服务器即可。

3.3.2、有序消息的扩展:

有序消息的扩展:

  1. 加应用服务器;
  2. 消费端做二次分发,即:做好备用topic(做好开关控制),当阻塞时,二次分发,扩大分区分摊消费能力;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/348957.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Python小游戏】智商爆棚,推荐一款益智类亲子娱乐首选—某程序员老爸:成语编成填空“游戏”,贪玩女儿1天牢记500词(厉害了我的Python)

前言 成语填空想必大家都是十分熟悉的了&#xff0c;特别是有在上小学的家长肯定都有十分深刻的印象。 在我们的认知里看图猜成语不就是一些小儿科的东西吗&#xff1f; 当然了你也别小看了成语调控小游戏&#xff0c;有的时候知识储备不够&#xff0c;你还真的不一定猜得出…

嵌入式STM32F767BGT6规格STM32F767BIT6引脚图 32Bit MCU+FPU

ARM Cortex-M7 STM32 F7 Microcontroller IC 32-Bit 216MHz 1MB (1M x 8) FLASH 208-LQFP (28x28)产品信息型号&#xff1a;STM32F767BGT6 / STM32F767BIT6类型&#xff1a;ARM微控制器 - MCU封装&#xff1a;LQFP-208明佳达电子下面是产品中文规格&#xff0c;仅供参考&#x…

云帆文档易用性功能设计之文档查阅

云帆文档管理系统是一款基于 SpringBootVue 开发的电子文档管理系统。系统集成了用户管理、角色管理、部门管理、文档管理、新闻管理、问答管理、通告管理、文档全文检索。 支持常用的 office 文档&#xff0c;视频文件、PDF 文档在线预览&#xff0c;下载&#xff0c;笔记&…

ChatGPT中文免费小程序(AI GPGT智能助手) - ChatGPT国内小程序版在线使用

ChatGPT中文网是一个面向中国用户的聊天机器人网站&#xff0c;旨在为国内用户提供一个自然的环境、有趣、实用的聊天体验。它使用最新的自然语言处理技术来帮助用户更好地理解他们的聊天对话&#xff0c;还可以帮助用户解决日常生活中的问题&#xff0c;提供有趣的谈话内容以及…

最新版EasyRecovery数据恢复软件使用测评介绍

我们在逐渐适应信息电子化的同时&#xff0c;也有一些潜在的麻烦接踵而来&#xff0c;其中较为常见的就是文件和数据的保存问题。显然&#xff0c;设备的存储空间是有限的&#xff0c;这就不可避免地会出现数据被删除、覆盖或丢失的现象&#xff0c;如果丢失的是重要数据&#…

【MyBatis】源码学习 01 - 泛型解析器 TypeParameterResolver

文章目录前言参考目录问题引入流程分析TypeParameterResolver#resolveReturnTypeTypeParameterResolver#resolveTypeTypeParameterResolver#resolveParameterizedTypeTypeParameterResolver#resolveTypeVar前言 最近结合着源码书学习 MyBatis&#xff0c;毫不夸张的说&#xf…

Kubernetes基本概念与组件

Kubernetes基本概念与组件 基本概念 Kubernetes 中的绝大部分概念都抽象成 Kubernetes 管理的一种资源对象&#xff0c;下面我们一起学习一下常遇到的一些资源对象&#xff1a; Master&#xff1a;Master 节点是 Kubernetes 集群的控制节点&#xff0c;负责整个集群的管理和控…

Camera | 4.瑞芯微平台MIPI摄像头应用程序编写

前面3篇我们讲解了camera的基础概念&#xff0c;MIPI协议&#xff0c;CSI2&#xff0c;常用命令等&#xff0c;本文带领大家入门&#xff0c;如何用c语言编写应用程序来操作摄像头。 Linux下摄像头驱动都是基于v4l2架构&#xff0c;要基于该架构编写摄像头的应用程序&#xff…

Java 反射深入浅出

Java 反射深入浅出&#x1f4c8; 反射的概述&#xff1a;&#x1f4d1; Java Reflection(反射) 被视为动态语言的关键&#xff0c;Java并不是动态语言&#xff0c;但因为反射Java可以被称为准动态语言 反射机制允许程序在执行期 借助于Reflection API取得任何类的内部信息&a…

哪款蓝牙耳机性价比最高?无线蓝牙耳机性价比排行榜

我酷爱音乐&#xff0c;也是游戏发烧友&#xff0c;平时耳机从不离身。用的耳机多了&#xff0c;在选择上也有了自己的一些心得&#xff0c;通常来说&#xff0c;音乐耳机注重音效&#xff0c;游戏耳机注重低延迟&#xff0c;当前蓝牙耳机市场琳琅满目&#xff0c;下面推荐以下…

100M网口客户电脑插上网线就断线,自己工厂正常,是什么问题导致?

Hqst&#xff08;华强盛科技&#xff09;导读&#xff1a;物联工程师100M网口产品出现客户电脑插上网线就显示断线&#xff0c;无法通信&#xff0c;在自己工厂又正常使用&#xff0c;是什么问题&#xff1f;问&#xff1a;100M 网口&#xff0c; 使用改电路&#xff0c; 产品出…

Learning C++ No.10【STL No.2】

引言&#xff1a; 北京时间&#xff1a;2023/2/14/23:18&#xff0c;放假两个月&#xff0c;没有锻炼&#xff0c;今天去跑了几圈&#xff0c;一个字&#xff0c;累&#xff0c;感觉人都要原地升天了&#xff0c;所以各位小伙伴&#xff0c;准确的说是各位卷王&#xff0c;一定…

与其被行业内卷,还不如主动出击,打破困境~

如今的 “互联行业内卷”这事好像成为了一种常态&#xff0c;尤其是在一些已处于饱和状态和即将处于饱和状态的行业比较突出&#xff0c;比如&#xff1a;Android 开发行业、前端、……等等 造成行业内卷原因是什么&#xff1f; 市场竞争加剧&#xff0c;企业更加注重成本控制…

大数据---Hadoop集群搭建

Hadoop集群搭建 再起启动一台虚拟机并且安装jdk&#xff0c;开启免密登录 不需要安装zookeeper 文章目录Hadoop集群搭建时间同步4台机器安装npdate设置定时任务集群配置图将Hadoop安装包上传到zk1zk1---解压到soft目录下zk1---更名zk1---修改配置文件core-site.xmlhdfs-site.x…

第四届宁波网安市赛训练题

Crypto 散乱的密文 8fd4a4c94gf15{50}l72d3提示了2 1 6 5 3 4&#xff0c;我们直接横向排列 2165348fd4a4c94gf15{50}l72d3 按顺序竖着抄下来fc1l84f}a45dg034{2d957,然后栅栏解密&#xff0c;注意这里是W型栅栏解密&#xff0c;行数6 flag:flag{52048c453d794df1} 综合解密…

stm32f429FMC外设学习

功能框图 这是一种型号为 W9825G6KH 的 SDRAM 芯片内部结构框图&#xff0c;以它为模型进行学习 CLK -- FMC_SDCLK //同步时钟信号 CKE -- FMC_SDCKE[1:0] //SDCKE0&#xff1a; SDRAM 存储区域 1 时钟使能,;SDCKE1&#xff1a; SDRAM 存储区域 2 时钟使能.这里表示的…

QT(16)- QFileDevice

QT&#xff08;16&#xff09;- QFileDevice1 简介2 公有类型2.1 enum QFileDevice::FileError2.2 enum QFileDevice&#xff1a;&#xff1a;FileHandleFlag2.3 enum QFileDevice::FileTime2.4 enum QFileDevice&#xff1a;&#xff1a;MemoryMapFlags2.5 enum QFileDevice::…

【OJ】计数的梦

&#x1f4da;Description: Bessie 处于半梦半醒的状态。过了一会儿&#xff0c;她意识到她好像在数羊&#xff0c;不能入睡。Bessie的大脑反应灵敏&#xff0c;仿佛真实地看到了她数过的一个又一个数。她开始注意每一个数码&#xff1a;每一个数码在计数的过程中出现过多少次…

华为OD机试 - 箱子之形摆放(Python)| 真题+思路+考点+代码+岗位

箱子之形摆放 题目 有一批箱子(形式为字符串,设为str), 要求将这批箱子按从上到下以之字形的顺序摆放在宽度为 n 的空地,请输出箱子的摆放位置。 例如:箱子ABCDEFG,空地宽度为3,摆放结果如图: 则输出结果为: AFG BE CD 输入 输入一行字符串,通过空格分隔,前面部…

关于tensorboard --logdir=logs的报错解决办法记录

我在运行tensorboard --logdirlogs时&#xff0c;产生了如下的报错&#xff0c;找遍全网后&#xff0c;解决办法如下 先卸载 pip uninstall tensorboard再安装 pip install tensorboard最后出现如下报错 Traceback (most recent call last): File “d:\newanaconda\envs\imo…