目录
高级功能
高效读写
涉及技术
ZooKeeper
自定义拦截器
监控
延迟消费
一些改进手段
高级功能
高效读写
涉及技术
- 高吞吐量:Kafka 每秒可以处理数百万消息。这是因为 Kafka 消息的处理是以批处理(Batching)的方式来完成的,生产者可以将多个消息一起发送到 Kafka 集群,以减少网络开销以及加速处理速度。
- 低延迟:Kafka 利用磁盘存储加缓存,可以在微秒级别内完成消息处理。Kafka 具有高效的消息传递能力,也可以在微秒级别内完成消息处理。这是由于 Kafka 的消息存储设计是基于磁盘的,但同时消息缓存也是放在内存里的。也就是说,在处理消息时,Kafka 集群会先将消息写入到磁盘中进行持久化存储,并且在内存中缓存一份消息以便进行更快的消息传递和读取。
- 分布式架构:Kafka 采用分布式的架构设计,可以通过水平扩展增加集群规模和负载容量。集群中的每个节点都可以独立完成消息处理和写入,可以有效地提高整个系统的吞吐量。
- 高可靠性:Kafka 在存储消息时,使用了多副本机制,可以保证消息的高可靠性。当消息发送失败或者其中一个节点失效时,可以通过复制副本来实现自动故障转移,以确保消息的可靠性、可用性与一致性。
- 顺序写:Kafka 内部的消息存储结构是一个连续的、顺序写入的日志文件(Log File)集合,也称“分区”(Partition)。分区中的每一条消息都被分配一个唯一的偏移量(Offset),并且保留在磁盘上直到被消费。通过这种消息存储方式,Kafka 可以实现高效的顺序写入操作。因为 Kafka 可以将流式的消息按顺序追加到 Log 文件的末尾,这避免了随机写入所产生的磁盘寻址和寻道时间,从而大大提高了写入性能,并降低了延迟。此外,由于只有新的消息会追加到 Log 文件中,而没有数据被修改或删除,因此,读取数据时,Kafka 也可以通过顺序扫描磁盘获取最新的消息,这样也大大提高了读取数据的效率。
- 数据压缩:Kafka 提供了数据压缩功能,可以将传输的消息进行压缩和解压,减少了磁盘和网络带宽的使用。
- 零拷贝:Kafka 零拷贝技术可以避免在传输数据时进行数据缓冲和复制,从而减少了 CPU 和内存的使用,提高了性能。
ZooKeeper
Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线、所有topic的分区副本分配和leader的选举等工作。Controller的工作管理是依赖于zookeeper的。
Partition的Leader的选举过程
自定义拦截器
拦截器原理
Producer拦截器interceptor是在Kafka0.10版本引入的,主要用于Clients端的定制化控制逻辑。对于Producer而言,interceptor使得用户在消息发送之前以及Producer回调逻辑之前有机会对消息做一些定制化需求,比如修改消息的展示样式等,同时Producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链interceptor chain,Interceptor实现的接口为ProducerInterceptor,主要有四个方法:
- configure(Map<String, ?> configs):获取配置信息和初始化数据时调用
- onSend(ProducerRecord record):该方法封装在KafkaProducer.send()方法中,运行在用户主线程中,Producer确保在消息被序列化之前及计算分区前调用该方法,并且通常都是在Producer回调逻辑出发之前。
- onAcknowledgement(RecordMetadata metadata, Exception exception):onAcknowledgement运行在Producer的IO线程中,因此不要再该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。
- close():关闭inteceptor,主要用于执行资源清理工作。
Inteceptor可能被运行到多个线程中,在具体使用时需要自行确保线程安全,另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并紧紧是捕获每个interceptor可能抛出的异常记录到错误日志中而非向上传递。
自定义加入时间戳拦截器
/**
* @author caoduanxi
* @Date 2021/1/13 14:15
* @Motto Keep thinking, keep coding!
*/
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
"TimeInterceptor:" + System.currentTimeMillis() + "," + record.value());
}
// 其余方法省略
}
自定义消息发送统计拦截器
/**
* @author caoduanxi
* @Date 2021/1/13 14:18
* @Motto Keep thinking, keep coding!
*/
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private int errorCounter = 0;
private int successCounter = 0;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 输出结果,结束输出
System.out.println("Sent successful:" + successCounter);
System.out.println("Sent failed:" + errorCounter);
}
}
在CustomProducer中加入拦截器
// 加入拦截器
List<Object> interceptors = new ArrayList<>();
interceptors.add(TimeInterceptor.class);
interceptors.add(CounterInterceptor.class);
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
注意:拦截器的close()方法是收尾的,一定要调用Producer.close()方法,否则拦截器的close()方法不会被调用。
监控
Eagle
Eagle是开源的额可视化和管理软件,允许查询、可视化、提醒和探索存储在任何地方的指标,简而言之,Eagle为您提供了将Kafka集群数据转换为漂亮的图形和可视化的工具。
实质: 一个运行在tomcat上的web应用。
延迟消费
kafka目前默认可支持1h以内的延迟消费。
使用方式:consumer启动参数增加 --delay-time-seconds n 设置消费延迟时间,单位秒,默认不延迟消费。仅能拉取到消费延迟时间之前的消息。
注意:此参数默认限制最大值为3600s,超过限制可能导致consumer启动失败。如有调整最大延迟时间的需求,请联系李锦涛(KIM:lijintao)
注意:消息拉取可能有分钟级别的误差。
注意:由于目前每4kb数据构建一次时间索引,如果最后一批数据的size不够4kb,可能导致这些数据不能被延迟消费到。
一些改进手段
- Rebalance优化
- Federation架构应用
- 存算分离等等
相关推荐文章:
Kafka学习笔记(基础篇)_Cat凯94的博客-CSDN博客
看完这篇Kafka,你也许就会了Kafka_心的步伐的博客-CSDN博客