深入Kafka client

news2025/1/11 19:50:19

分区分配策略

客户端可以自定义分区分配策略, 当然也需要考虑分区消费之后的offset提交, 是否有冲突。

消费者协调器和组协调器

a. 消费者的不同分区策略, 消费者之间的负载均衡(新消费者加入或者存量消费者退出), 需要broker做必要的协调。
b. Kafka按照消费组管理消费者, 鉴于offset提交最终都是在某个broker节点上完成。该broker扮演GroupCoordinator角色, 具体的选择则是通过hash快速定位。
c. client端存在一个ClientCoordinator与目标的GroupCoordinator进行通信实现最终协调;
d. 具体过程如下

ClientCoordinator Broker(Min Load) Broker(GroupCoordinator) Broker(To consumer) 1. Find_Coordinator request Find_Coordinator response 2. Join_Group request 3.1 calculate brokerId 3.2 Elect leader consumer 3.3 Elect partition strategy . Join_Group response, isLeader 4. Sync_Group Request Sync_Group Response 5. Poll offset/message, HeartBeat response offset/heartbeat/message ClientCoordinator Broker(Min Load) Broker(GroupCoordinator) Broker(To consumer)

关于__consumer_offset

__consumer_offset是一个特殊的topic, 用于存储每个topic中partition中client提交的offset。其中的数据保留时间通过offset.retention.minutes配置。如果consumer消费消息的间隔超过了配置时间, 则offset会丢失, consumer再次获取offset时会因为没有存量的offset而自动重置(auto.offset.reset)。该topic下的消息清理采用压缩策略(仅保留最新消息)。Kafka中会有定时清理任务清理过期的消费位移。

消息发送QoS

  1. at-least-once, 至少一次, 消息不会丢失, 但消息会重复;
  2. at-most-once, 至多一次, 消息不会重复, 但可能会丢失;
  3. exact-once, 恰好一次, 消息肯定被传输且只传输一次;(如果开发即时消息系统, 那么这个语义就是我们的目标)
    默认情况下, Kafka producer在发送时, 如果消息发送失败会自动进行重试, 重试过程可能会导致消息重复。而一旦发送成功, Kafka通过多副本机制保证消息一定会被保存。因此从consumer角度观察, producer发送的结果, 其QoS是at-least-once。如果需要exact-once, 则需要启用Kafka的幂等特性。

幂等

  1. 配置参数
    enable.idompotence=true
    retrics > 0
    max.in.flight.requests.per.connection <=5
    ack=-1

  2. 实现细节
    首先幂等是partition级别, broker端自动为producer分配一个PID, 并维护PID->分区(序列号 lastSeq) 的状态。当producer发送消息时, 必须携带该序列号newSeq。broker端收到消息时做校验:
    a. newSeq = lastSeq+1, broker接收;
    b. newSeq > lastSeq+1, 中间存在消息丢失, 抛出OutOfOrderException;
    c. newSeq < lastSeq+1, 消息存在重复, 直接丢弃即可.

事务消息

如果要实现跨parition的exact-once语义, 则需要基于事务消息。一般来说事务有ACID的特性, 但这个是数据库事务的通用场景。Kafka下消息需要考虑生产和消费, 这里的事务消息更多是生产端的事务消息。消费端可能会因为某些原因无法以事务的形式消费。比如:

  1. 对于采用日志压缩策略的主题而言, 事务中的消息被清理(对相同key的消息后写入的消息会覆盖之前写入的消息);
  2. 事务涉及的分区多个日志段, 如果老的日志分段被删除, 对应的消息也会消失;
  3. 消费者通过seek消费消息, 造成消息遗漏;
  4. 消费者在消费时没有消费到事务涉及的所有分区, 因此不能读取事务中的所有消息;
    总的来说, 事务保证了生产者可以以事务的方式实现消息发送的exact
    -once语义, 但消息清理和消费并未引入事务约束。

实现原理

  1. 开启幂等;
  2. 设置事务ID, transactional.id;
  3. 生产者通过事务ID得到PID和producer epoch, 进而实现跨生产者会话的消息发送和事务恢复。前者保证相同transactionId的生产者仅有1个可以有效发送消息, 后者保证如果事务消息发送后宕机新恢复出来的生产者可以继续提交或者终止事务。其中包含2个方面, 生产者的唯一性, 其关联的在途事务的可见性和可操作性。
  4. broker端为支持事务消息引入了事务协调器, 与组协调器类似, 用于处理事务的提交和终止。
  5. 具体交互流程如下
    发送事务消息交互细节

事务存储

  1. 日志存储按Topic, Partition和LogSegment层级存储, 事务消息也不例外;
  2. 与普通消息的区别是, 事务消息更多适用于发送一组消息的场景, 具体到LogSegment就是有一组连续的消息, 因此Kafka引入了ControlBatch消息来标志消息结束。
  3. 事务消息的开始在哪里呢? 严格来说, producer跨分区发送成功后, consumer是无法恢复出原有的顺序, 在分区级别仅可以做到与某个事务关联的一组消息(通过消息的属性标志是否为事务消息), 结束通过ControlBatch标志一组消息结束。

小结

本文讨论了Kafka发送消息的三种语义at-least-once, at-most-once, exact-once,并针对exact-once的单分区实现(幂等控制)和跨分区实现(事务消息)做简要介绍, 希望能帮助你梳理出Kafka broker端对消息发送QoS实现的基本脉络, 为进一步学习打基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1488372.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

为什么国内很多MCU公司,都在仿STM32?

做了单片机开发十多年了&#xff0c;STM32是我用过的单片机里面&#xff0c;最省心的。 用STM32做过的产品&#xff0c;至少10几个以上了。 其实不仅仅是STM32&#xff0c;还有STM8系列&#xff0c;也很稳。 我们无际单片机特训营好几个项目&#xff0c;都用了STM8和STM32系列。…

MySQL中有事务无法回滚的语句?

目录 0.从修改表结构语句开始 1.DDL(Data Definition Language) 数据定义语言 2.DCL(Data Control Language) 数据控制语言 3.在该事务还没提交时开启新事务 4.锁操作 5.行政声明语句 6.主从复制的从机操作 7.如何避免出现隐式提交导致的错误 0.从修改表结构语句开始 试…

Nuxt3:useFetch在服务端及客户端重复请求问题

一、问题描述 在页面setup中调用$http.get&#xff08;封装了useFetch&#xff09;&#xff0c;发现不仅在服务端发送了接口请求&#xff0c;而且在客户端也重新发送了一遍接口请求&#xff0c;造成资源浪费及页面加载缓慢。 二、问题原因 首先看一下Nuxt 的useFetch文档&…

高光谱遥感学习入门丨高光谱数据处理基础、Python和Matlab高光谱遥感数据处理

目录 ①Python高光谱遥感数据处理与高光谱遥感机器学习方法深度应用 ②Matlab高光谱遥感、数据处理与混合像元分解实践技术应用 ③高光谱遥感数值建模技术及在植被、水体、土壤信息提取领域应用 更多应用 高光谱遥感信息对于我们认识世界具有重要意义。尽管大部分物质在人眼…

(C语言)qsort函数详解

目录 1. qsort解释 2. qsort实例 2.1 qsort排列整形数组类型&#xff1a; 2.2 qsort排列结构体类型数据&#xff08;字符串&#xff09;&#xff1a; 2.3 qsort排列结构体类型数据&#xff08;整形&#xff09;&#xff1a; 1. qsort解释 我们可以进入网站&#xff1a;qso…

局域网如何远程?

局域网远程一直是许多用户在处理远程连接需求时面临的一个难题。随着技术的不断进步&#xff0c;一种名为“天联”的组网解决方案应运而生。天联组网具有操作简单、跨平台应用、无网络要求以及独创的安全加速方案等独特优势&#xff0c;在解决各行业客户的远程连接需求方面发挥…

【JAVA重要知识 | 第二篇】一篇文章读懂Java锁机制(含CAS思想、AQS机制)

文章目录 2.一篇文章读懂Java常用的锁机制2.1锁介绍2.1.1定义2.1.2相关概念 2.2锁的种类2.2.1按功能层面分&#xff08;1&#xff09;共享锁/排他锁/读写锁 2.2.2按性能和线程安全分&#xff08;1&#xff09;乐观锁/悲观锁&#xff08;2&#xff09;偏向锁/轻量级锁(自旋锁)/重…

盘点:国家智能算力中心

文章目录 1. Main2. My thoughtsReference 1. Main 按照《中国算力白皮书&#xff08;2022年&#xff09;》的定义&#xff0c;算力主要分为四部分&#xff1a;通用算力、智能算力、超算算力、边缘算力。通用算力以CPU芯片输出的计算能力为主&#xff1b;智能算力以GPU、FPGA、…

如何本地创建websocket服务端并发布到公网实现远程访问

文章目录 1. Java 服务端demo环境2. 在pom文件引入第三包封装的netty框架maven坐标3. 创建服务端,以接口模式调用,方便外部调用4. 启动服务,出现以下信息表示启动成功,暴露端口默认99995. 创建隧道映射内网端口6. 查看状态->在线隧道,复制所创建隧道的公网地址加端口号7. 以…

使用easyexcel填充模板数据,并导出excel

文章目录 前言一、制作模板二、前端代码三、后端代码总结 前言 导出excel功能非常场景&#xff0c;本片文章记录如何使用模板填充数据后再导出。因直接导出excel数据样式不符合要求&#xff0c;所以做了模板填充然后再导出excel。 效果如下&#xff1a; 一、制作模板 注意&a…

2024年腾讯云服务器优惠券领取入口及使用教程

随着云计算技术的不断发展&#xff0c;越来越多的企业和个人选择将业务迁移到云端。腾讯云作为国内领先的云计算服务提供商&#xff0c;为了吸引用户上云&#xff0c;经常推出多种优惠活动&#xff0c;其中就包括服务器优惠券&#xff0c;本文将为大家分享腾讯云服务器优惠券的…

华为配置基于VLAN限速示例

华为配置基于VLAN限速示例 组网图形 图1 流量监管配置组网图 表1 Switch为上行流量提供的QoS保障 流量类型 CIR(kbps) PIR(kbps) DSCP优先级 语音 2000 10000 46 视频 4000 10000 30 数据 4000 10000 14 ^^^ 流分类简介配置注意事项组网需求配置思路操作步…

恒峰-智能高压森林应急消防泵:守护森林安全的绿色战士

在茂密的森林中&#xff0c;每一棵树木都是大自然的精灵&#xff0c;它们为我们提供氧气、净化空气、保持水土&#xff0c;是地球上不可或缺的生命之源。然而&#xff0c;当火灾肆虐时&#xff0c;这些树木也会成为我们的噩梦。为了保护森林资源&#xff0c;我们需要一种高效、…

TikTok外贸系统的核心功能及其源代码分享!

随着全球化的不断推进&#xff0c;外贸业务成为越来越多企业的增长动力&#xff0c;TikTok作为一个全球性的社交媒体平台&#xff0c;其用户基数庞大、活跃度高&#xff0c;为外贸业务提供了无限的商机。 为了帮助企业在TikTok上更好地开展外贸业务&#xff0c;TikTok外贸系统…

数据持久层框架:MyBatis

数据持久层框架&#xff1a;MyBatis 前言入门SqlSessionFactorySqlSession作用域&#xff08;Scope&#xff09;和生命周期 配置properties&#xff08;属性&#xff09;settings&#xff08;设置&#xff09;typeAliases&#xff08;别名&#xff09;typeHandlers&#xff08;…

Deep Learning相关概念介绍

工具&#xff1a; Anaconda: anaconda.com/products/individual。我理解是一个基于Python的AI程序开发环境&#xff0c;其作用类似于google notebook。区别是google notebook是在网页上&#xff0c;而Anaconda一般是安装在自己的服务器上。Jupyter Notebooks Anaconda激活深度…

Linux - 安装 maven(详细教程)

目录 一、下载二、安装三、配置环境变量四、镜像资源配置 一、下载 官网&#xff1a;https://maven.apache.org/download.cgi 打开 maven 的官网下载页面&#xff0c;点击 bin.tar.gz 文件链接 即可下载最新版本的 maven 如果想要下载旧版本的 meven&#xff0c;则点击 Maven…

JavaScript中call和apply函数方法

看下下面这个代码示例&#xff1a; javascript const lufthansa {airline: Lufthansa,iataCode: LH,bookings: [],book(flightNum, name) {console.log(${name} booked a seat on ${this.airline} flight ${this.iataCode}${flightNum});}, };lufthansa.book(239, ‘IT知识一…

python实现有限域GF(2^8)上的乘法运算

有限域GF(2^8)上的乘法运算可以看成多项式的乘法 5e转换成二进制为0101 1110&#xff0c;对应的多项式为x^6x^4x^3x^2x 3f转换成二进制为0011 1111&#xff0c;对应的多项式为x^5x^4x^3x^2x1 将这两个多项式相乘再模多项式x^8x^4x^3x1得到结果为1110 0101&#xff0c;转换为…

【扩散模型】生成模型中的Residual Self-Attention UNet 以及 DDPM的pytorch代码

参考&#xff1a; [1] https://github.com/xiaohu2015/nngen/blob/main/models/diffusion_models/ddpm_cifar10.ipynb [2] https://www.bilibili.com/video/BV1we4y1H7gG/?spm_id_from333.337.search-card.all.click&vd_source9e9b4b6471a6e98c3e756ce7f41eb134 TOC 1 UNe…