大数据面试重点之kafka(七)
Kafka的分区器、拦截器、序列化器?
问过的一些公司:ebay 参考答案:
Kafka中,先执行拦截器对消息进行相应的定制化操作,然后执行序列化器将消息序列化,最后执行分 区器选择对应分区
拦截器 -> 序列化器 -> 分区器
1、拦截器
Kafka有两种拦截器:生产者拦截器和消费者拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规定过滤不符合要求的消息、修 改消息内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
生产者拦截器的实现,主要是自定义实现 org.apache.kafka.clients.producer. ProducerInterceptor 接口。
ProducerInterceptor接口包含3个方法:
-
ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
-
void onAcknowledgement(RecordMetadata var1, Exception var2);
-
void close(); 4
KafkaProducer 在将消息序列化和计算分区之前会调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、key 和 pritition 等信息,如果修改需要保证对其有准确判断,否则会出现与预想不一致的偏差。比如修改 key 不仅会影响分区的计算还会影响 broker 端日志压缩(Log Compaction)功能。
KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的onAcknowledgement() 方法,优先于用户设定的 Callback 之前执行。这个方法运行在 Producer的I/O线程中,所以这个方法的实现逻辑约简单越好,否则会影响消息的发送。
close()方法主要用于在关闭拦截器时执行一些资源的清理工作。在这3个方法中抛出的异常都会被捕获并 记录到日志中,但并不会再向上传递。
ProducerInterceptor接口与Protitioner 接口一样都有一个父接口Configurable。
Kafka中不仅可以指定一个拦截器还可以指定多个拦截器形成一个拦截器链。拦截器链会根据配置时配 置的拦截器顺序来执行(配置的时候,各个拦截器之间使用逗号隔开)。
如果拦截器链中的某个拦截器的执行需要依赖上一个拦截器的输出,那么就有可能产生“副作用”。如果 第一个拦截器因为异常执行失败,那么第二个也就不能继续执行。在拦截链中,如果某个拦截器执行失 败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。
2、序列化
生产者需要用序列化器(Serializer)将key和value序列化成字节数组才可以将消息传入Kafka。消费者需 要用反序列化器(Deserializer)把从Kafka中收到的字节数组转化成相应的对象。在代码清单3-1中,key 和value都使用了字符字符串,对应程序中的序列化器也使用了客户端自带的 StringSerializer,除了字符串类型的序列化器,还有 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 这几种类型,它们都实现了 org.apache.kafka.common.serialization.Serializer 接口,此接口有3个方法: -
// 用来配置当前类
-
public void configure(Map<String, ?> configs, boolean isKey)
-
// 用来执行序列化操作
-
public byte[] serializer(String topic, T data)
-
// 用来关闭当前的序列化器
-
public void close() 7
一般情况下 close() 是个空方法,如果实现了此方法,则必须确保此方法的幂等性(一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源 本身所产生的影响均与一次执行的影响相同。),因为这个方法可能会被 KafkaProducer调用多次
(Serializer和KafkaProducer 所实现的接口都继承了 Closeable 接口)。
生产者使用的序列化和消费者使用的序列化是一一对应的,如果生产者使用了 StringSerializer,而消费者使用了另一种序列化器,那么是无法解析出相要数据的。
如果Kafka客户端提供的几种序列化器都无法满足应用需求,可以选择如 Avro、JSON等通用的序列化工具实现,或者使用自定义类型的序列化器来实现。
3、分区器
消息在通过 send() 发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器
(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必须的,而序列化器是必须的。消息经过序列化之后就需要确定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分区器的作用,因为 partition 代表的就是要发往的分区号。
如果 ProducerRecode 中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算
partition 的值。分区器的作用就是为消息分配分区。
Kafka 中提供的默认分区器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了org.apache.kafka.clients.producer.Partitioner 接口,这个接口中定义了2个方法,具体如下所示。 -
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
-
public void close(); 3
partition() 方法用来计算分区号,返回值为ini类型。方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值以及集群的元数据信息,通过这些信息可以实现分区器。close()方法在关闭分区器的 时候回收一些资源。
Partitioner接口还有一个父接口 org.apache.kafka.common.Configurable,该接口只有一个方法
1 void configure(Map<String, ?> var1); 2
Configurable 接口中的 configure() 方法主要是用来获取配置信息及初始化数据。
在默认分区器 DefaultPartitioner 的实现中,close()是空方法,而 partition() 方法中定义了主要的分区分配逻辑。如果 key 不为 null,那么默认分区器会对 key 进行哈希(采用 MurmurHash2 算法,具备高运算性能及低碰撞率)根据最终得到的哈希值,与分区的数量取模运算得到分区编号来匹配分区,相同key 得到的哈希值是一样的,所以当key一致,分区数量不变的情况下,会将消息写入同一个分区(注意: 在不改变主题分区数量的情况下,key 与分区之间的映射可以保持不变。不过,一旦主题增加了分区, 那么就难以保证key与分区的映射关系)。如果,key 是 null,那么消息会以轮询的方式写入分区。(注意:如果 key 不为null,那么计算得到的分区号会是所有分区中的一个。如果 key 为 null 并且有可用的分区的时候,那么计算得到的分区号仅为可用分区中的任意一个。)
除了使用 Kafka 提供的默认分区器进行分配,还可以使用自定义的分区器,只需要和 DefaultPartitioner 一样 实现 Partitioner 接口即可。默认的分区器在 key 为 null 不会选择不可用的分区,我们通过自定义分区器来实现。
Kafka连接Spark Streaming的几种方式
问过的一些公司:作业帮参考答案:
Spark Streaming获取kafka数据的两种方式:Receiver与Direct的方式,可以从代码中简单理解成
Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据。
1、基于Receiver的方式
这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据 零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失
败,也可以使用预写日志中的数据进行恢复。
注意:
Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程 的数量。不会增加Spark处理数据的并行度。
可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预 写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。
2、基于Direct的方式
这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
这种方式有如下优点:
-
简化并行读取
如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。 -
高性能
如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数 据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份 到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制, 那么就可以通过Kafka的副本进行恢复。 -
一次且仅一次的事务机制
基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数 据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理 一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。 -
降低资源
Direct不需要Receivers,其申请的Executors全部参与到计算任务中;而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。 -
降低内存
Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据,对于小业务量的场景还好, 如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。实际应用中我们可以把原先的10G降至现在的2-4G左右。 -
鲁棒性更好
Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实 时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。Direct 则没有这种顾虑,其Driver在触发batch 计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。
基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。
Kafka的生成者客户端有几个线程?
问过的一些公司:昆仑万维参考答案:
2个,主线程和Sender线程。
主线程负责创建消息,然后通过分区器、序列化器、拦截器作用之后缓存到累加器RecordAccumulator 中。
Sender线程负责将RecordAccumulator中消息发送到kafka中。
Kafka怎么防止脑裂
问过的一些公司:网易参考答案:
1、先说下什么是kafka controller
控制器(controller)其实就是一个broker ,只不过它除了具有一般broker的功能之外,还负责分区首领的选举,相当于整个kafka集群的master,负责topic的创建、删除、以及partition的状态机转换,broker 的上线、下线等。集群里第一个启动的broker通过在Zookeeper里创建一个临时节点/controller 让自己成为控制器。其它broker在启动时也会尝试创建这个节点,不过它们会收到一个“节点已存在”的异常,然 后“意识”到控制器节点已存在,也就是说集群里已经有一个控制器了。其他broker在控制器节点上创建Zookeeper watch对象,这样它们就可以收到这个节点的变更通知。这种方式可以确保集群次只有一个控制器存在。
2、什么是脑裂
kafka中只有一个控制器controller 负责分区的leader选举,同步broker的新增或删除消息,但有时由于网络问题,可能同时有两个broker认为自己是controller,这时候其他的broker就会发生脑裂,不知道该听 从谁的。
3、如何解决脑裂
通过controller epoch来解决。
每当新的controller产生时就会通过Zookeeper生成一个全新的、数值更大的controller epoch标识。其他broker在知道当前controller epoch后,如果收到由控制器发出的包含较旧epoch的消息,就会忽略它们。
Kafka高可用体现在哪里
问过的一些公司: 参考答案:
高可性(High Availability),指系统无间断地执其功能。
Kafka从0.8版本开始提供高可机制,可保障单个或多个Broker宕机后,其他Broker及所有Partition都能继 续提供服务,避免存储的消息丢失。
对分布式系来说,当集群规模上升到一定程度后,一台或者多台机宕机的可能性大增加;Kafka采多机 备份和消息应答确认方式解决了数据丢失问题,并通过一套失败恢复机制解决服务可问题。
Zookeeper在Kafka的作用
可回答:Kafka在什么地方需要用到Zookeeper 问过的一些公司:字节x2,阿里,小米,有赞参考答案:
简洁版:
Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分 配和leader选举等工作。
Controller的管理工作都是依赖于Zookeeper的。也就是说ZK是辅助Controller的管理工作的。 以下为partition的leader选举过程:
详细版:
1、Broker注册
Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理 起来,此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节 点:
1 /brokers/ids 2
每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点, 如/brokers/ids/[0…N]。
Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建 的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。
2、Topic注册
在 Kafka 中,所有 Topic 与 Broker 的对应关系都由 ZooKeeper 来维护,在 ZooKeeper 中,通过建立专属的节点来存储这些信息,其路径为 :/brokers/topics/{topic_name}
在 Kafka 中,所有 Topic 与 Broker 的对应关系都由 ZooKeeper 来维护,在 ZooKeeper 中,通过建立专属的节点来存储这些信息,其路径为 :/brokers/topics/{topic_name}
为了保障数据的一致性,ZooKeeper 机制得以引入。基于 ZooKeeper,Kafka 为每一个 Partition 找一个节点作为 Leader,其余备份作为 Follower;接续上图的例子,就 TopicA 的 Partition1 而言,如果位于Broker2(Kafka 节点)上的 Partition1 为 Leader,那么位于 Broker1 和 Broker4 上面的 Partition1 就充当Follower,则有下图:
基于上图的架构,当 Producer Push 的消息写入 Partition(分区)时,作为 Leader 的 Broker(Kafka 节点)会将消息写入自己的分区,同时还会将此消息复制到各个 Follower,实现同步。如果某个 Follower 挂掉,Leader 会再找一个替代并同步消息;如果 Leader 挂了,Follower 们会选举出一个新的 Leader 替代,继续业务,这些都是由 ZooKeeper 完成的。
3、生产者负载均衡
由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些 分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。
1)四层负载均衡,根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常,一个生产者只会 对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同 其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接即可。但是,其无法做到真正的负载均 衡,因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,如果有些生 产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大,同时, 生产者也无法实时感知到Broker的新增和删除。
2))使用Zookeeper进行负载均衡,由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该 节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制。
4、消费者负载均衡
与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器 上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消 费者分组消费自己特定的Topic下面的消息,互不干扰。
5、分区与消费者的关系
消费组 (Consumer Group):
consumer group下有多个Consumer(消费者)。对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group内部的所有消费者共享该ID。订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)。
同时,Kafka为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。
在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在Zookeeper上记录消息分区与Consumer之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID写入到Zookeeper对应消息分区的临时节点上,例如:
1 /consumers/[group_id]/owners/[topic]/[broker_id-partition_id] 2
其中,[broker_id-partition_id]就是一个消息分区的标识,节点内容就是该消息分区上消费者的Consumer ID。
6、消费者注册
消费者服务器在初始化启动时加入消费者分组的步骤如下:
注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消 费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。
7、记录Partition与Consumer的关系
Consumer Group 在 ZooKeeper 上的注册节点为 /consumers/[group_id],而 Consumer Group 中的Consumer 在 ZooKeeper 上的注册节点为 /consumers/[group_id] 下的子节点 owners,它们共享一个Group ID。为了 Consumer 负载均衡,同一个 Group 订阅的 Topic 下的任一 Partition 都只能分配给一个Consumer。Partition 与 Consumer 的对应关系也需要在 ZooKeeper 中记录,路径为:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] 2
这个路径也是一个临时节点,进行 Rebalance 时会被删除,而后依据新的对应关系重建。此外, [broker_id-partition_id] 是一个消息分区的标识,其内容就是该消息分区消费者的 Consumer ID,通常采用 hostname:UUID 形式表示。