kafka入门到精通

news2024/10/3 6:32:08

文章目录

  • 一、kafka概述?
    • 1.定义
    • 1.2消息队列
      • 1.2.1 传统消息队列的使用场景
      • 1.2.2 消息队列好处
      • 1.2.3 消息队列两种模式
    • 1.3 kafka基础架构
  • 二、kafka快速入门
    • 1.1使用docker-compose安装kafka
    • 1.2测试访问kafka-manager
    • 1.3 查看kafka版本号
    • 1.4 查看zookeeper版本号
    • 1.5 扩展kafka的broker
    • 1.6 使用kafka
    • 1.7 测试生产者和消费者
    • 1.8 到zk 中查看节点信息,如下
    • 1.9 kafka群起脚本
  • 三、kafka架构深入
    • 1.kafka数据文件的存储
    • 2.kafka生产者
      • 2.1 分区策略
      • 2.2 数据可靠性保证
        • 2.2.1.**副本数据同步策略**
        • 2.2.2.ISR队列
        • 2.2.3 ack应答机制
        • 2.2.4 故障处理细节
      • 2.3Exactly Once语义(精准一次)
    • 3.kafka消费者
      • 3.1消费方式
      • 3.2分区分配策略(针对group)
        • 1.round robin
        • 2.range(默认分区分配策略)
      • 3.3offet的维护
      • 3.4消费者组案例
    • 4.kafka高效读写数据
    • 5.zk在kafka中的作用
    • 6.kafka事务
      • 6.1producer事务
      • 6.2comsumer事务(很少聊)
  • 四、kafka的API
    • 1.producer API
      • 1.1消息发送流程
      • 1.2.异步发送api
      • 1.3.同步发送api
    • 2.consumer API
    • 3.自定义interceptor
  • 五、kafka监控
    • **kafka eagle**
  • 六、flume对接kafka
  • 七、kafka面试题


文章整理自:尚硅谷kafka教程

一、kafka概述?

1.定义

kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。

1.2消息队列

1.2.1 传统消息队列的使用场景

在这里插入图片描述

1.2.2 消息队列好处

1.解耦
允许程序独立的扩展或两边的处理过程,只要确保它们遵守相同的接口约束

2.可恢复性
*系统一部分组件失效时,不会影响整个系统。消息队列降低了进程间的耦合度,所以即使一个消息处理进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理

3.缓冲
有助于控制和优化数据流经过系统的速度,解决生产消息远大于消费消息处理速度不一致的问题

4.灵活性(kafka服务扩缩容)&削峰处理
在访问量突增的情况下,应用仍然要继续发挥作用,但这样的突发流量并不常见。
如果因为以处理这类峰值的标准来部署应用则会造成巨大的资源浪费。使用消息队列能够使关键应用顶住突发访问压力,而不会因为突发的超负荷请求使整个服务崩溃。

5.异步通信
很多情况,用户也不需要立即处理消息。消息队列提供了异步处理机制,允许程序把一个消息放入队列,但并不需要立即处理。向消息队列中放入大量消息,在需要的时候程序再去进行处理

1.2.3 消息队列两种模式

点对点模式(一对一,消费者主动拉取数据,消息收到后清除消息)

消息生产者发送消息到queue中,消息消费者从queue中取出消息并消费消息。
消息被消费后,queue中不再存储该消息,所以消费者不可能消费到已经被消费的消息。
queue支持存在多个消费者,但对于一个消息,只会有一个消费者进行消费。

在这里插入图片描述

发布/订阅模式(一对多,消费者消费数据后不会清除消息)

消息生产者(发布)将消息发布到topic中,同时有多个消费者(订阅)消费该消息。
和点对点模式不同,发布到topic的消息会被所有订阅者消费
在这里插入图片描述
发布订阅模式也有两种:
1.消费者主动拉取;
缺点:消费者不知道topic中是否有消息,需要定时去轮询,比较浪费资源,所以出现了主动推送的模式
2.消息队列主动推送(如微信公众号)

1.3 kafka基础架构

在这里插入图片描述
topic:主题,对消息进行分类
partition:分区,主要提高kafka集群负载均衡,同时也提高了并发量
leader:针对于分区,消费者连接kafa只会连接leader
follower:针对于分区,仅仅数据备份,同一个partition的leader和follower一定不在同一台kafka服务上
comsumer goup:消费组,提高消费能力。多个消费者在同一个group时,消费消息时,一个分区的消息只能被同一个消费组的同一个消费者消费;消费者group中消费者的个数等于topic的partition数时,消费能力最合理,group中消费者数量大于partition分区数时,会造成资源浪费,多余消费者依然无法消费到消息。
zookeeper:管理kafa集群信息,存储消费位置信息(0.9版本前);
如:消费者A需要消费topic-partition0的10条消息,在消费到第5条时挂了,这时候消费进度的信息保存在zk里和内存中;
0.9版本后,offset存储在kafka集群的系统级topic中(默认存储磁盘7天),有kafka集群维护,主动拉取时高并发情况下对zk访问压力较大。

二、kafka快速入门

kafka的jar下载

1.1使用docker-compose安装kafka

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.58.100 # 不能通过hostname来配置(消费者和生产者识别不到)
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  kafka-manager:
    image: sheepkiller/kafka-manager
    environment:
      ZK_HOSTS: zookeeper:2181
    ports:
      - "19000:9000"

1.2测试访问kafka-manager

192.168.253.100:19000
在这里插入图片描述
出现以上问题 一般是docker-compose.yml文件中的kafka配置ip地址有误

添加kafka节点
在这里插入图片描述

1.3 查看kafka版本号

docker exec kafka_compose-kafka-1 find / -name *kafka_* | head -1 | grep -o ‘\kafka[^\n]*’
在这里插入图片描述

1.4 查看zookeeper版本号

docker exec kafka_compose-zookeeper-1 pwd
在这里插入图片描述

1.5 扩展kafka的broker

启动时指定kafka的broker数量

#端口一致所以只能启动一个broker,可能需要配置swarm网络。未亲测
docker-compose up --scale kafka=3 -d 

查看kafka扩展后的容器名称

docker ps;

CONTAINER ID   IMAGE                       COMMAND                   CREATED         STATUS         PORTS                                                                   NAMES
0b32ee5a3744   wurstmeister/kafka          "start-kafka.sh"          9 minutes ago   Up 9 minutes   0.0.0.0:32777->9092/tcp, :::32777->9092/tcp                             kafka_compose-kafka-4
0a655887b672   wurstmeister/kafka          "start-kafka.sh"          9 minutes ago   Up 9 minutes   0.0.0.0:32778->9092/tcp, :::32778->9092/tcp                             kafka_compose-kafka-1
e8fcd6cafa2c   sheepkiller/kafka-manager   "./start-kafka-manag…"   9 minutes ago   Up 9 minutes   0.0.0.0:19000->9000/tcp, :::19000->9000/tcp                             kafka_compose-kafka-manager-1
402829709dc9   wurstmeister/zookeeper      "/bin/sh -c '/usr/sb…"   9 minutes ago   Up 9 minutes   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp   kafka_compose-zookeeper-1
fbee5d80662b   wurstmeister/kafka          "start-kafka.sh"          9 minutes ago   Up 9 minutes   0.0.0.0:32779->9092/tcp, :::32779->9092/tcp                             kafka_compose-kafka-2
8c78ffde4538   wurstmeister/kafka          "start-kafka.sh"          9 minutes ago   Up 9 minutes   0.0.0.0:32780->9092/tcp, :::32780->9092/tcp                             kafka_compose-kafka-3

1.6 使用kafka

需要进入一个容器

docker exec -it kafka_compose-kafka-1 /bin/sh

创建topic:

kafka-topics.sh --create --topic topic001 --partitions 4 --zookeeper zookeeper:2181 --replication-factor 2

查看topic

#查看一个topic
kafka-topics.sh --list --zookeeper zookeeper:2181 topic001
#查看所有topic
kafka-topics.sh --list --zookeeper zookeeper:2181

删除topic

# 需要配置server.properties的delete.topic.enable=true
kafka-topics.sh --delete --topic topic002 --zookeeper zookeeper:2181

查看刚刚创建的topic的情况,borker和副本情况

kafka-topics.sh --describe --zookeeper zookeeper:2181 topic001

1.7 测试生产者和消费者

(1) 启动消费者客户端

kafka-console-consumer.sh --topic topic001 --bootstrap-server kafka_compose-kafka-1:9092,kafka_compose-kafka-2:9092,kafka_compose-kafka-3:9092,kafka_compose-kafka-4:9092
# --from-beginning 参数会从topic开始的位置消费,如果不指定,不会消费当前时刻之前的信息

启动后控制台不会打印消息,因为没有生产者生产消息。
(2) 启动生产者并且发送消息客户端

kafka-console-producer.sh --topic topic001 --broker-list kafka_compose-kafka-1:9092,kafka_compose-kafka-2:9092,kafka_compose-kafka-3:9092,kafka_compose-kafka-4:9092

现在已经进入了生产消息的命令行模式,输入一些字符串然后回车,再去消费消息的控制台窗口看看,已经有消息打印出来,说明消息的生产和消费都成功了。

创建kafka集群时,需要勾选启动JMX PORT,broker通讯需要用到
在这里插入图片描述

1.8 到zk 中查看节点信息,如下

1.安装prettyZoo可视化软件
prettyZoo可视化安装

2.连接并查看zookeeper情况
在这里插入图片描述

1.9 kafka群起脚本

#/bin/bash
case #1 in
"start"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo "***********************$1***************************"
		ssh $1 "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/mudole/kafka/config/server.properties"	
	done
};;

"stop"){
	for i in hadoop102 hadoop103 hadoop104
	do
		echo "***********************$1***************************"
		ssh $1 "/opt/module/kafka/bin/kafka-server-stop.sh "	
	done
};;

esac

kafka无法启动时,主要是看server.log日志

三、kafka架构深入

生产者生产消息时,如主题不存在,默认会创建一个分区和一个副本。在server.properties中进行修改
kafka集群可以保证分区有序,不能保证全局数据有序

1.kafka数据文件的存储

.log文件和.index文件

kafka分区的两个重要文件
0000000000000000.log只存储数据
0000000000000000.index记录消费数据offet

1.log文件默认存储大小1g,超过1个g了,生成新文件讲如何命名?
.index 和 .log 文件的命名规则就是当前文件的最小offset值(偏移量值)

2.怎么快速定位到想要消费的位置?
引入分片和索引机制。
分片规则:log文件1g后会新增分片,每片段都包含一个对应的.log和.index文件
索引机制:index文件中,存储了每条消息的id、起始偏移量和消息的大小。
文件索引原理:index文件通过二分查找找到是哪个消息,通过消息去log文件中找到消息内容

2.kafka生产者

2.1 分区策略

1.分区原因
方便在集群扩展
提高并发,可以以partition为单位进行读写

2.分区原则(3种)
在这里插入图片描述
1).在指定partition分区时,直接将数据存放在指定的分区中
2).未指定partition分区时,根据key的值hash后取余topic的分区数,得到存放数据的partition
3).未指定partition分区和key时,第一次调用时随机生成一个整数(后面如果没有指定partition和key时,会在这个整数的基础上自增),根据这个值与topic的partion数量取余得到存放数据的partition。
这是round-robin算法。

2.2 数据可靠性保证

topic的每个partition收到producer发送的数据后,都会向producer发送akc(acknowledgment确认收到),如果producer收到ack,就会发送下一轮数据,否则重复发送

kafka何时发送akc方案:
在这里插入图片描述

2.2.1.副本数据同步策略

在这里插入图片描述

全同步策略:
5台机器最多容忍4台机器故障,topicA分区下的5个副本(leader+follower)分别位于5台机器上且都是全量数据,5个副本中只要保证1个正常,那么kafka数据就是稳定的。则需要n+1个副本

超半数同步策略:
同样如果要容忍4台机器故障且kafka可用,则至少有一个副本在4台故障后依然是有topicA的全量数据,最坏情况下,这4台故障机器都是半数已经同步的机器,那么为了保证kafka数据稳定且可用需要4*2+1个副本。

kafka最终选择的方案:所有fowoller同步完成后发送ack
1.优点:
解决数据冗余:同样为了容忍n台节点故障,超半数同步策略需要2n+1个副本,而全同步策略只需要n+1个副本,针对kafka的使用场景,每个分区都有大量的数据,第一种方案会造成大量数据冗余
2.缺点:
网络延迟:虽然全量同步方案网络延迟高,但对kafka的影响较小

2.2.2.ISR队列

针对kafka选择全量同步方案时,网络延迟较高的问题,kafka做了优化
ISR队列(in-sync replicats):作用1在生产消息时快速响应、2在leader选举时做了优化
在0.9版本以前:ISR队列中存储的副本取决于两个因素,即同步数据快的follower、同步数据多的follower将会优先存储在ISR队列中,以备leader选举时进行挑选,那些同步慢、同步数据小的副本将会被剔除,不被考虑作为下一任leader

在0.9版本及以后:ISR队列只考同步快的副本进入队列

  • 为什么0.9版本后ISR队列取消了follower同步数量的条件?
    producer批量发送消息时,如果这个batch数量大于ISR同步的数量,那么久会造成延迟且数据不在范围内,可能就会从ISR中批量剔除副本,会造成频繁的ISR队列进出和zk的写入

现任版本:Leader维护了一个动态的ISR队列,意为和leader保持同步的follower集合。当ISR中的leader存储完producer的消息后,leader会给follower发送ack,如果follower长时间没有从leader同步数据,将会被剔除ISR队列(该时间阈值由replicat.lag.time.max.ms参数设定),一旦收到了ISR队列中所有follower的ACK,该消息就被确认commit了,leader将增加HW并且想producer发送ACK。
leader故障后,会从ISR队列中重新选区leader。

2.2.3 ack应答机制

对于某些不太重要的数据,对数据可靠性不是很高,能够容忍少量数据丢失,所以没有必要等ISR中所有follower全部接收成功。
kafka提供了3中可靠性的级别,用户根据可靠性和延迟的要求进行权衡进行选择
acks参数配置
acks=0时,producer不等待broker的ack,这种操作延迟最低,broker接收到消息还没写入磁盘就返回,当broker发生故障时有可能丢失数据
acks=1时,producer等待broker的ack,partition的leader落盘成功后返回ack,如果follower在同步成功前leader发生故障有可能丢失数据
acks=-1(all)时,producer等待broker的ack,partition的leader和follower全部落盘成功后才会返回ack。但是如果follower同步完成后,broker返回producer的ack之前,leader发生故障,那么会造成数据重复;当ISR中没有可用的follower时,leader返回ack后此时leader宕机并未将消息同步给其他ISR之外的follower时,就会丢失数据

2.2.4 故障处理细节

在这里插入图片描述
LEO:每个副本最大的offset
HW:高水位,是指消费者能见到的最大offset,ISR队列中最小的LEO;保证消费者获取数据一致性;保证副本之间的数据一致性

1.follower故障
follower发生故障后会被临时剔除ISR,等待follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件中高于HW的部分截取掉,从HW开始想leader同步。等该follower的LEO大于等于该topic的HW,即follower追上leader后,就可以重新加入ISR
2.leader故障
leader发生故障后,会从ISR中选举一个新的leader,之后为保证副本之前的一致性,其余的follower会先将各自的log文件中高于HW部分截取掉,然后重新想leader同步数据

这里只能保证副本之间数据的一致性。并不能保证数据不丢失或重复(ack决定)。

2.3Exactly Once语义(精准一次)

将kafkaserver的ack级别设置为-1,可以保证producer到server之间不会丢失数据,即是at least once语义;
相对的将ack级别设置为0,可以保证生产者每条消息只会发送一次,即at most once语义;
at least once可以保证数据不丢失,但是不能保证数据不重复;
at most once可以保证数据不重复,但是不能保证数据不丢失;
但对于一些非常重要的消息,比如说交易数据,下游的消费者要求数据即不能重复也不能丢失,即是Exactly Once语义;

kafka在0.11以前,对此是无能为力,只能保证数据不丢失,在消费者对数据做全局去重。对于多个下游应用的情况下,每个应用都要单独去重,这对性能造成了很大影响。

0.11之后,引入了幂等性;指的是producer无论向server发送多少次重复数据,server端都只会持久化一条。幂等性+at least once = exactly once

3.kafka消费者

3.1消费方式

1.comsumer采用pull的方式从broker拉去数据;pull可以根据消费能力调整消费速率。
2.broker向comsumer push的方式很难适应comsumer,因为push的频率是由broker决定的。push频率过快会comsumer来不及消费,表现为拒绝服务或网络阻塞。

pull不足:kafka没有消息被消费时,会陷入循环中,一直返回空数据,会造成资源浪费。kafka引入了timeout参数,comsumer pull返回空后,在timeout时间之后再去拉去。

3.2分区分配策略(针对group)

一个消费组有多个comsumer,一个topic有多个partition,消费时会涉及到partiion分配问题。

1.round robin

按照消费组来进行轮训分配
使用前提:消费组消费的是一个topic

2.range(默认分区分配策略)

根据topic来进行范围分配,多个主题被消费时,消费者消费数据不对等问题

当comsumer个数发生变化时,都会触发分配策略,重新分配消费分区。
当comsumer个数大于partition个数时,也会触发重新分配,会有多余的comsumer分配不到partition。

3.3offet的维护

comsumer group+topic+partition确定一个offset

3.4消费者组案例

4.kafka高效读写数据

1.顺序写磁盘
2.零拷贝

5.zk在kafka中的作用

controller:broker抢占zk中的controller,谁先来谁就是controller,controller是哪个broker无所谓,kafka的数据都是共享的,只是由这个身份为controller的broker来维护与zk的信息

在这里插入图片描述

6.kafka事务

kafka从0.11版本开始支持事务。事务可以保证kafka在exactly once语义的基础上,生产者和消费者可以跨分区和会话,要么全部成功,要么全部失败。

6.1producer事务

为了实现跨分区会话的事务,需要引入一个全局的transactionID(producer客户端自己生成),并且producer的pid和tid绑定,这样当producer重启后就可以通过正在进行的tid来获得原来的pid;
为了管理transaction,kafka引入了一个新的组件transaction coordinator。producer就是通过transaction coordinator交互获得TID对应的任务状态;coordinator还负责将事务所有写入kafka内部的一个topic,这样即使整个服务重启,由于事务状态可以保存,进行中的事务状态可以恢复,从而继续运行。

6.2comsumer事务(很少聊)

上述的事务机制主要是从producer方面去考虑,对于comsumer而言,事务的保证相对较弱,尤其是无法保证commit的信息被精确消费。这是由于comsumer可以通过offset访问任何信息,而且segmentFile的生命周期不同,同一事务的消息可能会出现重启后被删除的情况。(如segmnetFile01刚好7天过期,consumer批量消费时,恰好跨了两个segmentFile,重启后发现segmentFile01刚好过期,则无法读取到过期的数据)

四、kafka的API

1.producer API

1.1消息发送流程

kafka的producer消息时异步发送的。在消息发送过程中涉及到了两个线程main和sender,以及线程共享的一个变量RecordAccumulator。main线程将消息发送给RecordAccumulator,sender线程不断从RecordAccumulator中拉去并发送到broker。

在这里插入图片描述
相关参数:
batch.size: 只有数据累计到这个值的时候,sender才会发送数据
linger.ms:当batch.size迟迟没有累计到这个值,到了linger.ms时间是,sender也会发送

1.2.异步发送api

1.导入依赖
在这里插入图片描述
2.编写代码
需要用到的类:
KafkaProducer:创建一个生产者对象用来发送数据
ProducerConfig:获取所需的一系列参数
ProducerRecord:每一条消息封装成为一个对象

1.3.同步发送api

2.consumer API

3.自定义interceptor

五、kafka监控

kafka eagle

六、flume对接kafka

七、kafka面试题

kafka基础架构

1.Kafka中的ISR(InSyncRepli)、OSR(OutSyncRepli)、AR(AllRepli)代表什么?

ISR:副本同步队列,速率和leader相差小于10秒的follower集合
OSR:非副本同步队列,速率和leader相差大于10秒的follower集合
AR:所有分区的follower,AR=ISR+OSR

2.Kafka 中的 HW、LEO 等分别代表什么?

HW:高水位,根据同一分区中,最低的LEO所决定,是消费者能见可消费的数据
LEO: 每个副本最高的offset
在leader、follower节点故障后,partition会对HW和每个副本的LEO进行调整

3.Kafka的用途有哪些?使用场景如何?

1.用户追踪:根据用户在web或app上的操作,将这些操作记录到topic中,消费者订阅这些消息做实时的分析和数据挖掘
2.日志收集:通过kafka对各个服务的日志进行收集,再开放给comsumer
3.系统消息:缓存消息
运营指标:记录运营监控数据,搜集操作应用数据的集中反馈,如报错和报告

4.Kafka中是怎么体现消息顺序性的?

每个分区内,每条消息都有offset,所以只能在同一分区内有序;不同的分区无法做到消息的顺序性

5.“消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?

是。超过分区数的消费者是接受不到数据的,只要有消费者接入 就会触发分区分配策略

6.有哪些情形会造成重复消费?或丢失信息?

重复消费:先处理业务后提交offset,可能会造成重复消费
丢失信息:先提交offset再处理业务,可能会造成信息丢失

7.Kafka 分区的目的?

对kafka集群来说,分区做到负载均衡;对于消费者来说,可以提高并发度,提高读取效率

8.Kafka 的高可靠性是怎么实现的?

为了实现高可靠性,kafka使用了订阅模式,并且使用ISR和ack应答机制
能进入ISR中的follower和leadeer之间同步速率相差小于10秒
当ack=0时, producr不等待broker的ack,不管数据有没有写成,都不再重发这个数据
当ack=1时,broker会等leader写完数据后想producer发送ack,但不会等follower同步数据;在follower数据未同步前,leader挂掉,producer会再次发送新的消息到新的leader中,old的leader未同步的消息就会丢失
当ack=all(-1)时,broker会等到leader和isr中的所有follower都同步完成后再想producer发送ack;当follower数据同步完成返回producer ack前,leader挂掉,producer数据重发就会造成数据重复。

9.topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?

可以增加

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3

10.topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?

不可以,现有的分区数据难以处理

11.简述Kafka的日志目录结构?

每一个分区对应一个文件夹,命名为topic-0或topic-1,,每个文件夹内有.index文件和.log文件;
.log文件存储数据
.index文件存储.log文件的数据id,起始偏移量和数据大小;通过index

12.如何解决消费者速率低的问题?

增加topic分区的数量、增加消费者个数

13.Kafka的那些设计让它有如此高的性能??

1.kafka是分布式的消息队列
2.对log文件进行了segment,并对segment文件进行索引
3.对于单节点使用了顺序读写,速度可达600M/S
4.引入了零拷贝,在os系统上就能完成读写操作,无需进入kafka应用(用户态)

14.kafka启动不起来的原因?

在关闭kafka时,先关闭了zk,zk中保留了kafka的id信息,会导致kafka下一次启动时报节点已经存在
把zk中的zkdata/version-2的文件删除就可以了

15.聊一聊Kafka Controller的作用?

负责kafka集群上下线工作,所有topic的副本分区分配和leader选举

16.Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?

1.kafka的controller选举,先到先得
2.partitioin的leader选举,从isr中随机选取

17.失效副本是指什么?有那些应对措施?

失效副本:同步速率比leader相差大于10秒的副本
将失效的副本剔除ISR队列,进入OSR队列
失效副本等于leader同步速率小于10秒后从新进入ISR队列

18.Kafka消息是采用Pull模式,还是Push模式?

在producer阶段,采用的是push模式
在comsumer阶段,采用的是pull模式
comsumer在pull模式下:
优点:
comsumer可以根据自己消费能力调整消费速率,避免了broker push到comsumer时消费不及时而导致的崩溃问题;
缺点:consumer要时不时的去询问broker是否有新数据,容易发生死循环,内存溢出
解决办法:拉去不到数据时,增加下次拉去的时间,有api

19.Kafka创建Topic时如何将分区放置到不同的Broker中?

1.副本数不能超过broker数量
2.第一个分区是controller从broker中随机选取一个,然后其他分区相对0号分区依次向后移,第一个分区是用nextReplicatShift决定,而这个数也是随机产生

20.Kafka中的事务是怎么实现的?☆☆☆☆☆

kafka有两种事务:producer事务和consumer事务
producer事务是为了解决kafka跨分区跨会话的问题
kafka早起版本不能跨分区是因为producer的pid是kafka server根据producer生成的
为了解决这个问题,在java代码中给producer指定id,也就是transaction id,简称TID
我们将TID和PID进行绑定,在producer带着TID和PID第一次想broker注册时,broker会记录TID,并生成一个新的组件_transaction_state用来保存TID的事务状态信息
当producer重启后,就会带着TID和新的PID想broker发起请求,当发现TID一致时,producer就会获取之前的PID,将新的PID覆盖掉,并且去上一次事务的状态信息,从而继续上次的工作;
consumer事务相对于producer的事务相对弱一点,需要先确保consumer的消费和提交位置为一致且具有事务功能,才能保证数据的完成,不然就会造成数据的丢失或重复

21.Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?

拦截器>序列化器>分区器
拦截器拦截处理无效信息
序列化加密数据
分区分配原则

22.Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?
在这里插入图片描述

使用了2个线程:main线程和sender线程
main线程会依次经过拦截器、序列化器、分区器、将数据发送到RecordAccumlator(x线程共享变量),再有sender线程从RecordAccumlate中拉取数据并发送到kafka broker,
batch.size:只有数据累计到batch.size后,sender才会发送数据。
linger.ms:如果数据迟迟未达到batch.size,sender线程等待linger.ms之后发送数据

23.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

答案是:offset+1;测试证明:

生产者发送数据offset是从0开始的:如下
在这里插入图片描述
消费者消费的数据offset是从offset+1开始的:如下
在这里插入图片描述

24.当你使用 kafka-topics.sh 创建(删除)了一个 topic 之后,Kafka 背后会执行什么逻辑?

1)会在 zookeeper 中的/brokers/topics 节点下创建一个新的 topic 节点,如:/brokers/topics/first
2)触发 Controller 的监听程序
3)kafka Controller 负责 topic 的创建工作,并更新 metadata cache

25.Kafka 有内部的 topic 吗?如果有是什么?有什么所用?

有 __consumer_offsets,保存消费者offset

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/369552.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

win11右键新建菜单添加选项

需要操作 2 处注册表, 以下以在右键新建菜单中添加 .html 为例 在主键 HKEY_CLASSES_ROOT 中,搜索 .html 找到后 ,右键点击它,选 新建 ->项, 在这里插入图片描述 项目名字是:ShellNew 新建后&#x…

【Linux学习笔记】4.Linux 文件基本属性及文件与目录管理

前言 本章介绍Linux的文件基本属性和文件与目录管理。 Linux 文件基本属性 Linux 系统是一种典型的多用户系统,不同的用户处于不同的地位,拥有不同的权限。 为了保护系统的安全性,Linux 系统对不同的用户访问同一文件(包括目录…

Linux命令及CPU占用过高的定位分析思路

一、vim命令不要使用vim打开大文件,vim会一次性读取所有内容到内存,容易造成宿主机内存溢出。 打开文件前,可以使用du -h命令查看文件大小。一般,100MB以下为宜。1、普通模式j 向下30j 向下移动30行k 向上h 向左l 向右0 到行首^ 到…

3.15版本poi导致FileMagic文件找不到问题解决过程记录

maven中的dependencies和dependencyManagement的区别_shenzhou_yh的博客-CSDN博客 maven 中 dependencies 与 dependencyManagement 的区别_Jaemon的博客-CSDN博客_snapshot dependencies和artifact dependencies的区别 com.alibaba.excel.exception.ExcelAnalysisException: …

ubuntu/linux系统知识(37)systemd管理临时文件的方法systemd-tmpfiles

1、systemd-tmpfiles Linux产生大量的临时文件和目录,例如/tmp、/run 。systemd提供了一个结构化的可配置方法来管理临时文件和目录,即systemd-tmpfiles工具和配套的几个服务,以实现创建、删除和管理临时文件。 systemd创建了几个调用syste…

React(一):初识React、类组件、jsx的基础语法

React(一)一、初识React1.简单介绍2.React的三个依赖3.Hello React案例二、类组件1.定义类组件并渲染2.绑定事件函数(奇怪的this问题)3.数组形式数据的展示(电影案例)4.计数器案例三、jsx语法详解1.jsx的书…

【GUI】用于电动助力车性能分析的GUI(Matlab代码实现)

👨‍🎓个人主页:研学社的博客💥💥💞💞欢迎来到本博客❤️❤️💥💥🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密…

202302-第四周资讯

山川软件愿为您提供最优质的服务。 您的每一个疑问都会被认真对待,您的每一个建议都将都会仔细思考。 我们希望人人都能分析大数据,人人都能搭建应用。 因此我们将不断完善我们的DEMO、文档、以及视频,期望能在最大程度上快速帮助用户快速…

最新OpenMVG编译安装与逐命令运行增量式和全局式SfM教程

openmvg是一个轻便的可以逐步运行的SfM开源库,它同时实现了增量式和全局式两种算法。 说明文档地址:https://openmvg.readthedocs.io/en/latest/ github主页地址:https://github.com/openMVG/openMVG 1 编译安装 openmvg的安装比较简单&…

【centos7下部署mongodb】

一.安装环境 CentOS7MongoDB4.0.13正式版。 二.下载MongoDB 1.1 官网下载地址:https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-4.0.13.tgz 1.2 将压缩包通过xftp上传到服务器/opt目录,然后解压、改名 三. 配置环境变量及配置文件 3.1配置系…

8000+字,就说一个字Volatile

简介 volatile是Java提供的一种轻量级的同步机制。Java 语言包含两种内在的同步机制:同步块(或方法)和 volatile 变量,相比于synchronized(synchronized通常称为重量级锁),volatile更轻量级&…

2023最新版网络安全保姆级指南,手把手带你从零基础进阶渗透攻防工程师

前言 一份网络攻防渗透测试的学习路线,不藏私了! 1、学习编程语言(phpmysqljshtml) 原因: phpmysql可以帮助你快速的理解B/S架构是怎样运行的,只有理解了他的运行原理才能够真正的找到问题/漏洞所在。所以对于国内那些上来就说…

Java实现在线沟通功能

文章目录1、介绍 和 特点2、整合SpringBoot2.1、导入依赖2.2、websocket 配置类2.3、消息处理类2.4、启动服务2.5、前端代码:张三2.6、前端代码:李四3、效果4、小结1、介绍 和 特点 t-io是基于JVM的网络编程框架,和netty属同类,所…

MES系统需求误区,一文告诉你需求分析有哪些

在企业的实际应用中,对MES系统需求的分析常常会出现六个错误。 要求广泛,目标不明确由于对MES系统的概念和企业的实际运作不了解,导致企业在提出MES系统的要求时,常常会笼统而不明确,有时会混淆目标和需要。比如&#…

LabVIEW主VI前面板中显示或使用多个子VI

LabVIEW主VI前面板中显示或使用多个子VI想在程序中连接一个或多个子VI的前面板,但是当调用它们时,每个子VI在计算机屏幕上显示为一个新窗口。那么怎么能让每个子VI作为主VI前面板的一部分进行显示,而不是在屏幕上显示多个窗口?正在…

python读取tif图像+经纬度

python读取tif的包很多,但大都只能读出图像像素值,不能读取到经纬度信息。原因:TIFF 简单理解就是一种图像格式,类似于 jpg、png 等。GeoTIFF 就是在普通 TIFF 文件上增加了地理位置、投影信息、坐标信息等,常用于遥感…

BBS系统的设计与实现

技术:Java、JSP等摘要:BBS全称为Bulletin Board System,中文就是“电子公告板”。 BBS是一种电子信息服务系统。它向用户提供了一块公共电子白板,每个用户都可以在上面发布信息或提出问题,早期的BBS由教育机构或研究机…

电脑硬盘如何重新分区 ?教你两招磁盘分区方法

摘要:对于刚刚购买的电脑来说,有些厂商在装机的时候没有根据用户需求,就给硬盘随意分区了,有的分区划分的不是很合理,在使用过程中会遇到一些麻烦,那么电脑硬盘如何重新分区 ?本文将给大家详细介…

OpenShift 简介

OpenShift 是红帽 Red Hat 公司基于开源的云平台,是平台即服务(PaaS),是一种容器应用平台。允许开发人员构建、测试和部署云应用。该系统是在 K8S 核心之上添加工具,从而实现更快的应用开发、部署及扩展。 在 OpenShi…

leetcode 1675. Minimize Deviation in Array(最小化数组偏差)

数组里面有n个正整数,里面的数字可以无限次进行如下操作: 1.偶数可以除以2 2.奇数可以乘以2 数组中任意两元素差的最大值称为偏差。 把数组中的元素进行上面2种操作,使偏差最小。 思路: 数组中现有2种数字,一种是奇数…