Kafka核心原理第二弹——更新中

news2025/1/11 20:06:37

架构原理

一、高吞吐机制:Batch打包、缓冲区、acks

1. Kafka Producer怎么把消息发送给Broker集群的?

需要指定把消息发送到哪个topic去

首先需要选择一个topic的分区,默认是轮询来负载均衡,但是如果指定了一个分区key,那么根据这个key的hash值来分发到指定的分区,这样可以让相同的key分发到同一个分区里去,还可以自定义partitioner来实现分区策略

producer.send(msg); // 用类似这样的方式去发送消息,就会把消息给你均匀的分布到各个分区上去
producer.send(key, msg); // 订单id,或者是用户id,他会根据这个key的hash值去分发到某个分区上去,他可以保证相同的key会路由分发到同一个分区上去

知道要发送到哪个分区之后,还得找到这个分区的leader副本所在的机器,然后跟那个机器上的Broker通过Socket建立连接来进行通信,发送Kafka自定义协议格式的请求过去,把消息就带过去了

如果找到了partition的leader所在的broker之后,就可以通过socket跟那台broker建立连接,接着发送消息过去

Producer(生产者客户端),起码要知道两个元数据,每个topic有几个分区,每个分区的leader是在哪台broker上,会自己从broker上拉取kafka集群的元数据,缓存在自己client本地客户端上

kafka使用者的层面来考虑一下,我如果要把数据写入kafka集群,应该如何来做,怎么把数据写入kafka集群,以及他背后的一些原理还有使用过程中需要设置的一些参数,到底应该怎么来弄

在这里插入图片描述

2. 用一张图告诉你Producer发送消息的内部实现原理

每次发送消息都必须先把数据封装成一个ProducerRecord对象,里面包含了要发送的topic,具体在哪个分区,分区key,消息内容,timestamp时间戳,然后这个对象交给序列化器,变成自定义协议格式的数据

接着把数据交给partitioner分区器,对这个数据选择合适的分区,默认就轮询所有分区,或者根据key来hash路由到某个分区,这个topic的分区信息,都是在客户端会有缓存的,当然会提前跟broker去获取

接着这个数据会被发送到producer内部的一块缓冲区里

然后producer内部有一个Sender线程,会从缓冲区里提取消息封装成一个一个的batch,然后每个batch发送给分区的leader副本所在的broker
在这里插入图片描述

3. 基于Java API写一个Kafka Producer发送消息的代码示例

package com.zhss.demo.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerDemo {
	
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();

		// 这里可以配置几台broker即可,他会自动从broker去拉取元数据进行缓存
		props.put("bootstrap.servers", "hadoop03:9092,hadoop04:9092,hadoop05:9092");  
		// 这个就是负责把发送的key从字符串序列化为字节数组
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// 这个就是负责把你发送的实际的message从字符串序列化为字节数组
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("acks", "-1");
		props.put("retries", 3);
		props.put("batch.size", 323840);
		props.put("linger.ms", 10);
		props.put("buffer.memory", 33554432);
		props.put("max.block.ms", 3000);
		
		// 创建一个Producer实例:线程资源,跟各个broker建立socket连接资源
		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

		ProducerRecord<String, String> record = new ProducerRecord<>(
				"test-topic", "test-key", "test-value");
		
		// 这是异步发送的模式
		producer.send(record, new Callback() {

			@Override
			public void onCompletion(RecordMetadata metadata, Exception exception) {
				if(exception == null) {
					// 消息发送成功
					System.out.println("消息发送成功");  
				} else {
					// 消息发送失败,需要重新发送
				}
			}

		});

		Thread.sleep(10 * 1000); 
		
		// 这是同步发送的模式
//		producer.send(record).get(); 
		// 你要一直等待人家后续一系列的步骤都做完,发送消息之后
		// 有了消息的回应返回给你,你这个方法才会退出来

		producer.close();
	}
	
}

4. 发送消息给Broker时遇到的各种异常该如何处理?

之前我们看到不管是异步还是同步,都可能让你处理异常,常见的异常如下:

LeaderNotAvailableException:这个就是如果某台机器挂了,此时leader副本不可用,会导致你写入失败,要等待其他follower副本切换为leader副本之后,才能继续写入,此时可以重试发送即可

如果说你平时重启kafka的broker进程,肯定会导致leader切换,一定会导致你写入报错,是LeaderNotAvailableException

NotControllerException:这个也是同理,如果说Controller所在Broker挂了,那么此时会有问题,需要等待Controller重新选举,此时也是一样就是重试即可

NetworkException:网络异常,重试即可

我们之前配置了一个参数,retries,他会自动重试的,但是如果重试几次之后还是不行,就会提供Exception给我们来处理了

5. 发送消息的缓冲区应该如何优化来提升发送的吞吐量?

buffer.memory:设置发送消息的缓冲区,默认值是33554432,就是32MB

如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住

compression.type,默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销

6. 消息批量发送的核心参数batch.size是如何优化吞吐量?

batch.size,设置meigebatch的大小,如果batch太小,会导致频繁网络请求,吞吐量下降;如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里

默认值是:16384,就是16kb,也就是一个batch满了16kb就发送出去,一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量,可以自己压测一下

还有一个参数,linger.ms,这个值默认是0,意思就是消息必须立即被发送,但是这是不对的,一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去

但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力

7. 如何根据业务场景对消息大小以及请求超时进行合理的设置?

max.request.size:这个参数用来控制发送出去的消息的大小,默认是1048576字节,也就1mb,这个一般太小了,很多消息可能都会超过1mb的大小,所以需要自己优化调整,把他设置更大一些

你发送出去的一条大数据,超大的JSON串,超过1MB,就不让你发了

request.timeout.ms:这个就是说发送一个请求出去之后,他有一个超时的时间限制,默认是30秒,如果30秒都收不到响应,那么就会认为异常,会抛出一个TimeoutException来让我们进行处理

8. 基于Kafka内核架构原理深入分析acks参数到底是干嘛的

acks参数,其实是控制发送出去的消息的持久化机制的

如果acks=0,那么producer根本不管写入broker的消息到底成功没有,发送一条消息出去,立马就可以发送下一条消息,这是吞吐量最高的方式,但是可能消息都丢失了,你也不知道的,但是说实话,你如果真是那种实时数据流分析的业务和场景,就是仅仅分析一些数据报表,丢几条数据影响不大的

会让你的发送吞吐量会提升很多,你发送弄一个batch出,不需要等待人家leader写成功,直接就可以发送下一个batch了,吞吐量很大的,哪怕是偶尔丢一点点数据,实时报表,折线图,饼图

acks=all,或者acks=-1:这个leader写入成功以后,必须等待其他ISR中的副本都写入成功,才可以返回响应说这条消息写入成功了,此时你会收到一个回调通知

min.insync.replicas = 2,ISR里必须有2个副本,一个leader和一个follower,最最起码的一个,不能只有一个leader存活,连一个follower都没有了

acks = -1,每次写成功一定是leader和follower都成功才可以算做成功,leader挂了,follower上是一定有这条数据,不会丢失

retries = Integer.MAX_VALUE,无限重试,如果上述两个条件不满足,写入一直失败,就会无限次重试,保证说数据必须成功的发送给两个副本,如果做不到,就不停的重试,除非是面向金融级的场景,面向企业大客户,或者是广告计费,跟钱的计算相关的场景下,才会通过严格配置保证数据绝对不丢失

acks=1:只要leader写入成功,就认为消息成功了,默认给这个其实就比较合适的,还是可能会导致数据丢失的,如果刚写入leader,leader就挂了,此时数据必然丢了,其他的follower没收到数据副本,变成leader

9. 针对瞬间异常的消息重试参数有哪些需要考虑的点

有的时候一些leader切换之类的问题,需要进行重试,设置retries即可,而且还可以跟消息不丢失结合起来,但是消息重试会导致重复发送的问题,比如说网络抖动一下导致他以为没成功,就重试了,其实人家都成功了

所以消息重试导致的消费重复,需要你在下游consumer做幂等性处理,但是kafka已经支持了一次且仅一次的消息语义

另外一个,消息重试是可能导致消息的乱序的,因为可能排在你后面的消息都发送出去了,你现在收到回调失败了才在重试,此时消息就会乱序,所以可以使用“max.in.flight.requests.per.connection”参数设置为1,这样可以保证producer同一时间只能发送一条消息

两次重试的间隔默认是100毫秒,用“retry.backoff.ms”来进行设置

一般来说,某台broker重启导致的leader切换,是最常见的异常,所以尽可能把重试次数和间隔,设置的可以cover住新leader切换过来

10. Kafka Producer高阶用法(一):自定义分区

public class HotDataPartitioner implements Partitioner {

private Random random;

@Override
public void configure(Map<String, ?> configs) {
random = new Random();
}

@Override
public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String key = (String)keyObj;
List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
int partitionCount = partitionInfoList.size();
int hotDataPartition = partitionCount - 1;
return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;
}

}

props.put(“partitioner.class, “com.zhss.HotDataPartitioner”);

测试发送

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test-topic

11. Kafka Producer高阶用法(二):自定义序列化

12. Kafka Producer高阶用法(三):自定义拦截器

二、Kafka Consumer选举与Rebalance实现原理

1. 一张图画清Kafka基于Consumer Group的消费者组的模型

每个consumer都要属于一个consumer.group,就是一个消费组,topic的一个分区只会分配给一个消费组下的一个consumer来处理,每个consumer可能会分配多个分区,也有可能某个consumer没有分配到任何分区

分区内的数据是保证顺序性的

group.id = “membership-consumer-group”

如果你希望实现一个广播的效果,你的每台机器都要消费到所有的数据,每台机器启动的时候,group.id可以是一个随机生成的UUID也可以,你只要让不同的机器的KafkaConsumer的group.id是不一样的

如果consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,那么又会把一些分区重新交还给他,这个就是所谓的消费者rebalance的过程

2. 消费者offset的记录方式以及基于内部topic的提交模式

每个consumer内存里数据结构保存对每个topic的每个分区的消费offset,定期会提交offset,老版本是写入zk,但是那样高并发请求zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储

所以后来就是提交offset发送给内部topic:__consumer_offsets,提交过去的时候,key是group.id+topic+分区号,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact

也就是每个group.id+topic+分区号就保留最新的那条数据即可

而且因为这个__consumer_offsets可能会接收高并发的请求,所以默认分区50个,这样如果你的kafka部署了一个大的集群,比如有50台机器,就可以用50台机器来抗offset提交的请求压力,就好很多
在这里插入图片描述

3. 基于Java API写一个Kafka Consumer消费消息的代码示例

String topicName = “test-topic”;
String groupId = “test-group”;

Properties props = new Properties();
props.put(“bootstrap.servers”, “localhost:9092);
props.put(“group.id”, “groupId”);
props.put(“enable.auto.commit”,true);
props.put(“auto.commit.ineterval.ms”,1000);
// 每次重启都是从最早的offset开始读取,不是接着上一次
props.put(“auto.offset.reset”, “earliest”); 
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.SttringDeserializer”);

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));

try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000); // 超时时间
for(ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() +,+ record.key() +,+ record.value());
}
}
} catch(Exception e) {

}

4. Kafka感知消费者故障是通过哪三个参数来实现的?

heartbeat.interval.ms:consumer心跳时间,必须得保持心跳才能知道consumer是否故障了,然后如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作

session.timeout.ms:kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒

max.poll.interval.ms:如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别人去消费,一遍来说结合你自己的业务处理的性能来设置就可以了

5. 对消息进行消费时有哪几个参数需要注意以及设置呢?

fetch.max.bytes:获取一条消息最大的字节数,一般建议设置大一些

max.poll.records:一次poll返回消息的最大条数,默认是500条

connection.max.idle.ms:consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,这个建议设置为-1,不要去回收

6. 消费者offset相关的参数设置会对运行产生什么样的影响?

auto.offset.reset:这个参数的意思是,如果下次重启,发现要消费的offset不在分区的范围内,就会重头开始消费;但是如果正常情况下会接着上次的offset继续消费的

enable.auto.commit:这个就是开启自动提交位移

7. Group Coordinator是什么以及主要负责什么?

每个consumer group都会选择一个broker作为自己的coordinator,他是负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance的,那么这个如何选择呢?

就是根据group.id来进行选择,他有内部的一个选择机制,会给你挑选一个对应的Broker,总会把你的各个消费组均匀分配给各个Broker作为coordinator来进行管理的

他负责的事情只要就是rebalance,说白了你的consumer group中的每个consumer刚刚启动就会跟选举出来的这个consumer group对应的coordinator所在的broker进行通信,然后由coordinator分配分区给你的这个consumer来进行消费

coordinator会尽可能均匀的分配分区给各个consumer来消费

8. 为消费者选择Coordinator的算法是如何实现的?

首先对groupId进行hash,接着对__consumer_offsets的分区数量取模,默认是50,可以通过offsets.topic.num.partitions来设置,找到你的这个consumer group的offset要提交到__consumer_offsets的哪个分区

比如说:groupId,“membership-consumer-group” -> hash值(数字)-> 对50取模 -> 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,大家可以找到__consumer_offsets的一个分区

__consumer_offset的分区的副本数量默认来说1,只有一个leader

然后对这个分区找到对应的leader所在的broker,这个broker就是这个consumer group的coordinator了,接着就会维护一个Socket连接跟这个Broker进行通信

9. Coordinator和Consume Leader如何协作制定分区方案?

每个consumer都发送JoinGroup请求到Coordinator,然后Coordinator从一个consumer group中选择一个consumer作为leader,把consumer group情况发送给这个leader,接着这个leader会负责制定分区方案,通过SyncGroup发给Coordinator

接着Coordinator就把分区方案下发给各个consumer,他们会从指定的分区的leader broker开始进行socket连接以及消费消息
在这里插入图片描述

10. rebalance的三种策略分别有哪些优劣势?

这里有三种rebalance的策略:range、round-robin、sticky

0~8

order-topic-0
order-topic-1
order-topic-2

range策略就是按照partiton的序号范围,比如partitioin02给一个consumer,partition35给一个consumer,partition6~8给一个consumer,默认就是这个策略;

round-robin策略,就是轮询分配,比如partiton0、3、6给一个consumer,partition1、4、7给一个consumer,partition2、5、8给一个consumer

但是上述的问题就在于说,可能在rebalance的时候会导致分区被频繁的重新分配,比如说挂了一个consumer,然后就会导致partition04分配给第一个consumer,partition58分配给第二个consumer

这样的话,原本是第二个consumer消费的partition3~4就给了第一个consumer,实际上来说未必就很好

最新的一个sticky策略,就是说尽可能保证在rebalance的时候,让原本属于这个consumer的分区还是属于他们,然后把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配的策略

consumer1:0~2 + 6~7
consumer2:3~5 + 8

11. Consumer内部单线程处理一切事务的核心设计思想

其实就是在一个while循环里不停的去调用poll()方法,其实是我们自己的一个线程,就是我们自己的这个线程就是唯一的KafkaConsumer的工作线程,新版本的kafka api,简化,减少了线程数量

Consumer自己内部就一个后台线程,定时发送心跳给broker;但是其实负责进行拉取消息、缓存消息、在内存里更新offset、每隔一段时间提交offset、执行rebalance这些任务的就一个线程,其实就是我们调用Consumer.poll()方法的那个线程

就一个线程调用进去,会负责把所有的事情都干了

为什么叫做poll呢?因为就是你可以监听N多个Topic的消息,此时会跟集群里很多Kafka Broker维护一个Socket连接,然后每一次线程调用poll(),就会监听多个socket是否有消息传递过来

可能一个consumer会消费很多个partition,每个partition其实都是leader可能在不同的broker上,那么如果consumer要拉取多个partition的数据,就需要跟多个broker进行通信,维护socket

每个socket就会跟一个broker进行通信

每个Consumer内部会维护多个Socket,负责跟多个Broker进行通信,我们就一个工作线程每次调用poll()的时候,他其实会监听多个socket跟broker的通信,是否有新的数据可以去拉取
在这里插入图片描述

12. 消费过程中的各种offset之间的关系是什么?

上一次提交offset,当前offset(还未提交),高水位offset,LEO

内存里记录这么几个东西:上一次提交offset,当前消费到的offset,你不断的在消费消息,不停的在拉取新的消息,不停的更新当前消费的offset,HW offset,你拉取的时候,是只能看到HW他前面的数据

LEO,leader partition已经更新到了一个offset了,但是HW在前面,你只能拉取到HW的数据,HW后面的数据,意味着不是所有的follower都写入进去了,所以不能去读取的

13. 自动提交offset的语义以及导致消息丢失和重复消费的问题

默认是自动提交

auto.commit.inetrval.ms:5000,默认是5秒提交一次

如果你提交了消费到的offset之后,人家kafka broker就可以感知到了,比如你消费到了offset = 56987,下次你的consumer再次重启的时候,就会自动从kafka broker感知到说自己上一次消费到的offset = 56987

这次重启之后,就继续从offset = 56987这个位置继续往后去消费就可以了

他的语义是一旦消息给你poll到了之后,这些消息就认为处理完了,后续就可以提交了,所以这里有两种问题:

第一,消息丢失,如果你刚poll到消息,然后还没来得及处理,结果人家已经提交你的offset了,此时你如果consumer宕机,再次重启,数据丢失,因为上一次消费的那批数据其实你没处理,结果人家认为你处理了

poll到了一批数据,offset = 65510~65532,人家刚好就是到了时间提交了offset,offset = 65532这个地方已经提交给了kafka broker,接着你准备对这批数据进行消费,但是不巧的是,你刚要消费就直接宕机了

其实你消费到的数据是没处理的,但是消费offset已经提交给kafka了,下次你重启的时候,offset = 65533这个位置开始消费的,之前的一批数据就丢失了

第二,重复消费,如果你poll到消息,都处理完毕了,此时还没来得及提交offset,你的consumer就宕机了,再次重启会重新消费到这一批消息,再次处理一遍,那么就是有消息重复消费的问题

poll到了一批数据,offset = 65510~65532,你很快的处理完了,都写入数据库了,结果还没来得及提交offset就宕机了,上一次提交的offset = 65509,重启,他会再次让你消费offset = 65510~65532,一样的数据再次重复消费了一遍,写入数据库

重启kafka consumer,修改了他的代码

14. 如何实现Consumer Group的状态机流转机制?

刚开始Consumer Group状态是:Empty

接着如果部分consumer发送了JoinGroup请求,会进入:PreparingRebalance的状态,等待一段时间其他成员加入,这个时间现在默认就是max.poll.interval.ms来指定的,所以这个时间间隔一般可以稍微大一点

接着如果所有成员都加入组了,就会进入AwaitingSync状态,这个时候就不能允许任何一个consumer提交offset了,因为马上要rebalance了,进行重新分配了,这个时候就会选择一个leader consumer,由他来制定分区方案

然后leader consumer制定好了分区方案,SyncGroup请求发送给coordinator,他再下发方案给所有的consumer成员,此时进入stable状态,都可以正常基于poll来消费了

所以如果说在stable状态下,有consumer进入组或者离开崩溃了,那么都会重新进入PreparingRebalance状态,重新看看当前组里有谁,如果剩下的组员都在,那么就进入AwaitingSync状态

leader consumer重新制定方案,然后再下发

15. 最新设计的rebalance分代机制可以有什么作用?

大家设想一个场景,在rebalance的时候,可能你本来消费了partition3的数据,结果有些数据消费了还没提交offset,结果此时rebalance,把partition3分配给了另外一个cnosumer了,此时你如果提交partition3的数据的offset,能行吗?

必然不行,所以每次rebalance会触发一次consumer group generation,分代,每次分代会加1,然后你提交上一个分代的offset是不行的,那个partiton可能已经不属于你了,大家全部按照新的partiton分配方案重新消费数据

consumer group generation = 1
consumer group generation = 2

16. Consumer端的自定义反序列化器是什么?

17. 自行指定每个Consumer要消费哪些分区有用吗?

List partitions = consumer.partitionsFor(“order-topic”);

new TopicPartition(partitionInfo.topic(), partitionInfo.partition());

consumer.assign(partitions); //指定每个consumer要消费哪些分区,你就不是依靠consumer的自动的分区分配方案来做了

18. 老版本的high-level consumer的实现原理是什么?

producer和consumer api原理,都是新版本的kafka api

老版本的kafka consumer api分成两种,high-level和low-level,都是基于zk实现的,只不过前者有consumer group的概念,后者没有

high-level的api,比如说consumer启动就是在zk里写一个临时节点,但是如果自己宕机了,那么zk临时节点就没了,别人就会发现,然后就会开启rebalance

然后在消费的时候,可以指定多个线程取消费一个topic,比如说你和这个consumer分配到了5个分区,那么你可以指定最多5个线程,每个线程消费一个分区的数据,但是新版本的就一个线程负责消费所有分区

在提交offset,就是向zk写入对某个分区现在消费到了哪个offset了,默认60秒才提交一次

新版本的api就不基于zk来实现了呢,zk主要是做轻量级的分布式协调,元数据存储,并不适合高并发大量连接的场景,cnosumer可能有成百上千个,成千上万个,zk来做的,连接的压力,高并发的读写

broker内部基于zk来进行协调

19. 老版本的low-level consumer的实现原理是什么?

老版本的low-level消费者,是可以自己控制offset的,实现很底层的一些控制,但是需要自己去提交offset,还要自己找到某个分区对应的leader broker,跟他进行连接获取消息,如果leader变化了,也得自己处理,非常的麻烦

比如说storm-kafka这个插件,在storm消费kafka数据的时候,就是使用的low-level api,自己获取offset,提交写入zk中自己指定的znode中,但是在未来基本上老版本的会越来越少使用

三、Kafka的时间轮延时调度机制与架构原理总结

1. Producer的缓冲区内部数据结构是什么样子的?

producer会创建一个accumulator缓冲区,他里面是一个HashMap数据结构,每个分区都会对应一个batch队列,因为你打包成出来的batch,那必须是这个batch都是发往同一个分区的,这样才能发送一个batch到这个分区的leader broker

{
“order-topic-0” -> [batch1, batch2],
“order-topic-1” -> [batch3]
}

batch.size

每个batch包含三个东西,一个是compressor,这是负责追加写入batch的组件;第二个是batch缓冲区,就是写入数据的地方;第三个是thunks,就是每个消息都有一个回调Callback匿名内部类的对象,对应batch里每个消息的回调函数

每次写入一条数据都对应一个Callback回调函数的

2. 消息缓冲区满的时候是阻塞住还是抛出异常?

max.block.ms,其实就是说如果写缓冲区满了,此时是阻塞住一段时间,然后什么时候抛异常,默认是60000,也就是60秒

3. 负责IO请求的Sender线程是如何基于缓冲区发送数据的?

Sender线程会不停的轮询缓冲区内的HashMap,看batch是否满了,或者是看linger.ms时间是不是到了,然后就得发送数据去,发送的时候会根据各个batch的目标leader broker来进行分组

因为可能不同的batch是对应不同的分区,但是不同的分区的Leader是在一个broker上的,<Node, List>,接着会进一步封装为<Node, Request>,每个broker一次就是一个请求,但是这里可能包含很多个batch,接着就是将分组好的batch发送给leader broker,并且处理response,来反过来调用每个batch的callback函数

发送出去的Request会被放入InFlighRequests里面去保存,Map<NodeId, Deque>,这里就代表了发送出去的请求,但是还没接收到响应的

4. 同时可以接受有几个发送到Broker的请求没收到响应?

Map<NodeId, Deque> => 给这个broker发送了哪些请求过去了?

max.in.flight.requests.per.connection:5

这个参数默认值是5,默认情况下,每个Broker最多只能有5个请求是发送出去但是还没接收到响应的,所以这种情况下是有可能导致顺序错乱的,大家一定要搞清楚这一点,先发送的请求可能后续要重发

5. Kafka自定义的基于TCP的二进制协议深入探秘一番(一)

kafka自定义了一组二进制的协议,现在一共是包含了43种协议类型,每种协议都有对应的请求和响应,Request和Response,其实说白了,如果大家现在看咱们的那个自研分布式存储系统的课,里面用到了gRPC

你大概可以认为就是定义了43种接口,每个接口就是一种协议,然后每个接口都有自己对应的Request和Response,就这个意思

每个协议的Request都有相同的请求头(RequestHeader),也有不同的请求体(RequestBody),请求头包含了:api_key、api_version、correlation_id、client_id,这里的api_key就类似于“PRODUCE”、“FETCH”,你可以认为是接口的名字吧

“PRODUCE”就是发送消息的接口,“FETCH”就是拉取消息的接口,就这个意思

api_version,就是这个API的版本号

correlation_id,就是类似客户端生成的一次请求的唯一标志位,唯一标识一次请求

client_id,就是客户端的id

每个协议的Response也有相同的响应头,就是一个correlation_id,就是对某个请求的响应

6. Kafka自定义的基于TCP的二进制协议深入探秘一番(二)

比如说发送消息,就是ProduceRequest和ProduceResponse,代表“PRODUCE”这个接口的请求和响应,api_key=0,其实就是“PRODUCE”接口的代表

他的RequestBody,包含了:transactional_id,acks,timeout,topic_data(topic,data(partition,record_set)),acks就是客户端自己指定的acks参数,这个会指示leader和follower副本的写入方式,timeout就是超时时间,默认就是30秒,request.timeout.ms

然后就是要写入哪个topic,哪个分区,以及对应数据集合,里面是多个batch

ProduceResponse,ResponseBody,包含了responses(topic,partition_responses(partition,error_code,base_offset,log_append_time,log_start_offset)),throttle_time_ms,简单来说就是当前响应是对哪个topic写入的响应

包含了每个topic的各个分区的响应,每个partition的写入响应,包括error_code错误码,base_offset是消息集合的起始offset,log_append_time是写入broker端的时间,log_start_offset是分区的起始offset

其实各种接口大体上来说就是如此,所以现在大家就知道了,协议就是一种规定,你发送过来的请求是什么格式的,他可能有请求头还有请求体,分别包含哪些字段,按什么格式放数据,响应也是一样的

然后大家就可以按一样的协议来发送请求和接收响应

7. 盘点一下在Broker内部有哪些不同场景下会有延时任务?

比如说acks=-1,那么必须等待leader和follower都写完才能返回响应,而且有一个超时时间,默认是30秒,也就是request.timeout.ms,那么在写入一条数据到leader磁盘之后,就必须有一个延时任务,到期时间是30秒

延时任务会被放到DelayedOperationPurgatory,延时操作管理器中

这个延时任务如果因为所有follower都写入副本到本地磁盘了,那么就会被自动触发苏醒,那么就可以返回响应结果给客户端了,否则的话,这个延时任务自己指定了最多是30秒到期,如果到了超时时间都没等到,那么就直接超时返回异常了

还有一种是延时拉取任务,也就是说follower往leader拉取消息的时候,如果发现是空的,那么此时会创建一个延时拉取任务,然后延时时间到了之后,就会再次读取一次消息,如果过程中leader写入了消息那么也会自动执行这个拉取任务

8. Kafka的时间轮延时调度机制(一):O(1)时间复杂度

Kafka内部有很多延时任务,没有基于JDK Timer来实现,那个插入和删除任务的时间复杂度是O(nlogn),而是基于了自己写的时间轮来实现的,时间复杂度是O(1),其实Netty、ZooKeeper、Quartz很多中间件都会实现时间轮

延时任务是很多很多的,大量的发送消息以及拉取消息,都会涉及到延时任务,任务数量很多,如果基于传统的JDK Timer把大量的延时任务频繁的插入和删除,时间复杂度是O(nlogn)性能比较低的

时间轮的机制,延时任务插入和删除,O(1)

简单来说,一个时间轮(TimerWheel)就是一个数组实现的存放定时任务的环形队列,数组每个元素都是一个定时任务列表(TimerTaskList),这个TimerTaskList是一个环形双向链表,链表里的每个元素都是定时任务(TimerTask)

时间轮是有很多个时间格的,一个时间格就是时间轮的时间跨度tickMs,wheelSize就是时间格的数量,时间轮的总时间跨度就是tickMs * wheelSize(interval),然后还有一个表盘指针(currentTime),就是时间轮当前所处的时间

currentTime指向的时间格就是到期,需要执行里面的定时任务

比如说tickMs = 1ms,wheelSize = 20,那么时间轮跨度(inetrval)就是20ms,刚开始currentTime = 0,这个时候如果有一个延时2ms之后执行的任务插入进来,就会基于数组的index直接定位到时间轮底层数组的第三个元素

因为tickMs = 1ms,所以第一个元素代表的是0ms,第二个元素代表的是1ms的地方,第三个元素代表的就是2ms的地方,直接基于数组来定位就是O(1)是吧,然后到数组之后把这个任务插入其中的双向链表,这个时间复杂度也是O(1)

所以这个插入定时任务的时间复杂度就是O(1)

然后currentTime会随着时间不断的推移,1ms之后会指向第二个时间格,2ms之后会指向第三个时间格,这个时候就会执行第三个时间格里刚才插入进来要在2ms之后执行的那个任务了

这个时候如果插入进来一个8ms之后要执行的任务,那么就会放到第11个时间格上去,相比于currentTime刚好是8ms之后,对吧,就是个意思,然后如果是插入一个19ms之后执行的呢?那就会放在第二个时间格

每个插入进来的任务,他都会依据当前的currentTime来放,最后正好要让currentTime转动这么多时间之后,正好可以执行那个时间格里的任务

9. Kafka的时间轮延时调度机制(二):多层级时间轮

接着上一讲的内容,那如果这个时候来一个350毫秒之后执行的定时任务呢?已经超出当前这个时间轮的范围了,那么就放到上层时间轮,上层时间轮的tickMs就是下层时间轮的interval,也就是20ms

wheelSize是固定的,都是20,那么上层时间轮的inetrval周期就是400ms,如果再上一层的时间轮他的tickMs是400ms,那么interval周期就是8000ms,也就是8s,再上一层时间轮的tickMs是8s,interval就是160s,也就是好几分钟了,以此类推即可

反正有很多层级的时间轮,一个时间轮不够,就往上开辟一个新的时间轮出来,每个时间轮的tickMs是下级时间轮的interval,而且currentTime就跟时钟的指针一样是不停的转动的,你只要根据定时周期把他放入对应的轮子即可

每个轮子插入的时候根据currentTime,放到对应时间之后的时间格即可

比如定时350ms后执行的任务,就可以放到interval位400ms的时间轮内,currentTime自然会转动到那个时间格来执行他

10. Kafka的时间轮延时调度机制(三):时间轮层级的下滑

接着上一讲的内容,那如果这个时候来一个350毫秒之后执行的定时任务呢?已经超出当前这个时间轮的范围了,那么就放到上层时间轮,上层时间轮的tickMs就是下层时间轮的interval,也就是20ms

wheelSize是固定的,都是20,那么上层时间轮的inetrval周期就是400ms,如果再上一层的时间轮他的tickMs是400ms,那么interval周期就是8000ms,也就是8s,再上一层时间轮的tickMs是8s,interval就是160s,也就是好几分钟了,以此类推即可

反正有很多层级的时间轮,一个时间轮不够,就往上开辟一个新的时间轮出来,每个时间轮的tickMs是下级时间轮的interval,而且currentTime就跟时钟的指针一样是不停的转动的,你只要根据定时周期把他放入对应的轮子即可

每个轮子插入的时候根据currentTime,放到对应时间之后的时间格即可

比如定时350ms后执行的任务,就可以放到interval位400ms的时间轮内,currentTime自然会转动到那个时间格来执行他

11. Kafka的时间轮延时调度机制(四):基于DelayQueue推动

基于数组和双向链表来O(1)时间度可以插入任务

但是推进时间轮怎么做呢?搞一个线程不停的空循环判断是否进入下一个时间格吗?那样很浪费CPU资源,所以采取的是DelayQueue

每个时间轮里的TimerTaskList作为这个时间格的任务列表,都会插入DelayQueue中,设置一个延时出队时间,DelayQueue会自动把过期时间最短的排在队头,然后专门有一个线程来从DelayQueue里获取到期任务列表

某个时间格对应的TimerTaskList到期之后,就会被线程获取到,这种方式就可以实现时间轮推进的效果,推进时间轮基于DelayQueue,时间复杂度也是O(1),因为只要从队头获取即可
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/969604.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第五章 树与二叉树 六、树的存储结构

一、双亲表示法&#xff08;顺序存储&#xff09; 结点的指针指向父节点的位置 #define MAX_Tree_Size 100 //双亲表示法 typedef struct {int data;//数据域int parent;//双亲位置域 }PTNode;typedef struct { //树的定义PTNode nodes[MAX_Tree_Size]; //双亲表示int …

【Java实战项目】【超详细过程】—大饼的图片服务器2

代码操作数据库 pom.xml1.juit2.gson3.mysql依赖4.servlet DBUtil类Image类ImageDao类 本文主要讲述了用代码对数据库的操作&#xff0c;maven项目创建后进行如下操作&#xff1a; maven项目创建可以看这里&#xff1a;Servlet-day01 这里面有详细介绍 pom.xml pom.xml中放置了…

Vulnhub: Inferno: 1.1靶机

kali&#xff1a;192.168.111.111 靶机&#xff1a;192.168.111.233 信息收集 端口扫描 nmap -A -sC -v -sV -T5 -p- --scripthttp-enum 192.168.111.23380端口目录爆破 gobuster dir -u http://192.168.111.233 -w /usr/share/wordlists/dirbuster/directory-list-2.3-me…

使用Docker配置深度学习的运行环境

文章目录 推荐实验环境前言docker安装docker操作docker配置常见方法&#xff08;安装包、联网、程序管理器&#xff09;安装驱动的前提要求传统方法安装驱动程序程序管理器安装联网安装deb包安装 安装完成后的设置非传统方法安装-通过容器安装驱动的前提要求安装NVIDIA-Contain…

UG\NX CAM二次开发 获取工序类型 UF_OPER_ask_oper_type

文章作者:代工 来源网站:NX CAM二次开发专栏 简介: UG\NX CAM二次开发 获取工序类型 UF_OPER_ask_oper_type 效果: 代码: void MyClass::do_it() { int count=0;tag_t * objects;UF_UI_ONT_ask_selected_nodes(&count, &objects);for (int i=0; i<co…

Kotlin语言基础(一)Kotlin语言特性

Kotlin语言概述&#xff08;一&#xff09;- 语言特性 Kotlin是一种高级强类型语言&#xff0c;可以在Java VM上运行。 它具有如下特性&#xff1a; 1.代码简洁&#xff0c;代码量少&#xff0c;易学易懂。 例如&#xff1a; /*** 定义数据类Student* property id String 学…

14. Loadrunner 详细介绍以及使用

目录 1. Loadrunner 环境搭建 2. Loadrunner 介绍 3. Loadrunner 三大组件之间的关系 4. Loadrunner 脚本录制 4.1 WebTours 项目 4.1.1 如何启动项目 4.1.2 如何访问项目 4.1.3 相关配置 4.2 脚本录制 4.3 脚本加强 4.3.1 插入事务 4.3.2 插入集合点 4.3.3 插入检…

Marin说PCB之TDK和Murata电容哪家强?

不是各位朋友是否听说华为新款手机MATE60Pro已经开始发布销售了&#xff0c;小编我听到这个消息后很是震惊啊&#xff0c;这两年老美一直打压我们中国芯片行业的发展&#xff0c;而且拿华为开刀&#xff0c;搞了一些很恶心的手段来限制和打压华为的发展。所以当我听到新款的MAT…

JavaScript基础代码

以下是一些 JavaScript 基础代码示例&#xff1a; 变量定义 // 定义字符串变量 var name "John";// 定义数字变量 var age 30;// 定义布尔变量 var isMarried false;控制语句 // if 语句 if (age > 18) {console.log("成年人"); } else {console.…

点云处理库Open3D基础操作1

文章目录 基础介绍Open3D读取文件Voxel降采样裁剪点云Open3D点云和numpy数组的转换点云的bounding box 欢迎访问个人网络日志&#x1f339;&#x1f339;知行空间&#x1f339;&#x1f339; 基础介绍 Open3D是英特尔公司于2015年发布的开源3D视觉算法库&#xff0c;截至2023年…

QT(9.3)定时器,绘制事件

作业&#xff1a; 自定义一个闹钟 pro文件&#xff1a; QT core gui texttospeechgreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c11# The following define makes your compiler emit warnings if you use # any Qt feature that has been marked deprecat…

C#,《小白学程序》第八课:列表(List)其二,编制《高铁列车时刻表》与时间DateTime

1 文本格式 /// <summary> /// 车站信息类 class /// </summary> public class Station { /// <summary> /// 编号 /// </summary> public int Id { get; set; } 0; /// <summary> /// 车站名 /// </summary&g…

2023年项目进度管理平台排行榜

项目进度管理是项目管理学科中的一门重要课程&#xff0c;通过合理的项目计划&#xff0c;有效控制项目进度&#xff0c;保障项目能够按时交付。 不过&#xff0c;项目进度管理并不是一件简单的工作&#xff0c;不仅需要面对项目过程中各种突发情况&#xff0c;还需要做好团队协…

函数指针与std::function<>对回调函数的实现与性能分析

函数指针与std::function<>都可以作为函数形参&#xff0c;通过接受另一个函数的地址来实现回调函数的功能&#xff0c;但是它们之间的性能差异非常明显。下面分别介绍对回调函数的实现与各自的性能分析。 函数指针与std::function<>对回调函数的实现 先在类中分…

植物大战僵尸植物僵尸表(一)

前言 此文章为“植物大战僵尸”专栏中的第005刊&#xff08;2023年9月第一刊&#xff09;。 警告&#xff1a; 1.仅适用于无名版&#xff1b; 2.可能存在遗漏&#xff0c;如发现请联系作者。 植物表 植物名称植物图片豌豆射手寒冰射手双发豌豆射手机枪射手裂荚射手三线射手…

IDEA提示忽略大小写

IDEA提示忽略大小写 IDEA代码提示默认是区分大小写的&#xff0c;设置为提示忽略大小写&#xff0c;编译我们后期的开发

【Unity编辑器扩展】 | 编辑器扩展 特性(Attribute) 整理总结 | 建议收藏

前言【Unity编辑器扩展】 | 编辑器扩展 特性(Attribute) 整理总结 | 建议收藏Unity中Attribute的作用一、System命名空间SerializableNonSerialized二、UnityEngine命名空间AddComponentMenuAssemblyIsEditorAssemblyColorUsageContextMenuContextMenuItemCreateAssetMenu

单片机第二季:温度传感器DS18B20

目录 1&#xff0c;DS18B20介绍 2&#xff0c;DS18B20数据手册 2.1&#xff0c;初始化时序 2.2&#xff0c;读写时序 3&#xff0c;DS18B20工作流程 4&#xff0c;代码 1&#xff0c;DS18B20介绍 DS18B20的基本特征&#xff1a; (1)内置集成ADC&#xff0c;外部数字接…

nginx-error错误页面

客户访问发生错误时&#xff0c;nginx返回给客户的错误页面。 例&#xff1a;客户访问的资源不存在时&#xff0c;返回302并跳转到一个网站。 2.可以返回200&#xff0c;给客户返回一个401.html的页面&#xff0c;这个页面可以做的友好一点&#xff0c;返回给客户。

【Java实战项目】【超详细过程】—大饼的图片服务器01

目录 一、该图片服务器的功能二、设计数据库设计前后端交互设计新增图片查看所有图片属性查看指定图片属性删除指定图片查看部分图片内容 一、该图片服务器的功能 1.可以选择文件上传或删除 2.拥有防盗功能 3.展示图片 二、设计 数据库设计 图片表中需要储存的数据有&#…