点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(正在更新…)
章节内容
上节我们完成了如下内容:
- 消费组测试,消费者变动对消费的影响
- 消费者的心跳机制
- 消费者的相关配置参数
主题和分区
- Topic:Kafka用于分类管理消息的逻辑单元,类似于MySQL的数据库
- Partition:是Kafka下数据存储的基本单元,这个是物理上的概念,同一个Topic的数据,会被分散的存储到多个Partition中,这些Partition可以在同一台机器上,也可以在多台机器上。优势在于可以进行水平扩展,通常Partition的数量是BrokerServer数量的整数倍
- ConsumerGroup,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。保证一个消费组获取到特定主题的全部消息。在消息组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。
- Consumer 采用 PULL 模式从 Broker 中读取数据,采用PULL模式 Consumer可以自行控制消费的速度。
反序列化
- Kafka的Broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化处理,然后才能交由给用户程序消费。
- 消费者的反序列化器包括Key和Value。
自定义反序列化
如果要实现自定义的反序列化器,需要实现 Deserializer 接口:
public class UserDeserializer implements Deserializer<User> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Deserializer.super.configure(configs, isKey);
}
@Override
public User deserialize(String topic, byte[] data) {
ByteBuffer buffer = ByteBuffer.allocate(data.length);
buffer.put(data);
buffer.flip();
int userId = buffer.getInt();
int usernameLen = buffer.getInt();
String username = new String(data, 8, usernameLen);
int passwordLen = buffer.getInt();
String password = new String(data, 8 + usernameLen, passwordLen);
int age = buffer.getInt();
User user = new User();
user.setUserId(userId);
user.setUsername(username);
user.setPassword(password);
user.setAge(age);
return user;
}
@Override
public User deserialize(String topic, Headers headers, byte[] data) {
return Deserializer.super.deserialize(topic, headers, data);
}
@Override
public void close() {
Deserializer.super.close();
}
}
消费者拦截器
消费者在拉取了分区消息之后,要首先经过反序列化器对Key和Value进行反序列化操作。
消费端定义消息拦截器,要实现 ConsumerInterceptor接口:
- 一个可插拔的接口,允许拦截、更改消费者接收到的消息,首要的用例在于将第三方组件引入消费者应用程序,用于定制监控、日志处理等
- 该接口的实现类通过configure方法获取消费者配置的属性,如果消费者配置中没有指定ClientID,还可以获取KafkaConsumer生成的ClientID,获取这个配置跟其他拦截器是共享的,需要保证不会在各个拦截器之间产生冲突。
- ConsumerInterceptor方法抛出异常会被捕获,但不会向下传播,如果配置了错误的参数类型,消费者不会抛出异常而是记录下来。
- ConsumerInterceptor回调发生在KafkaConsumer.poll()方法的同一个线程
public class ConsumerInterceptor01 implements ConsumerInterceptor<String, String> {
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
System.out.println("=== 消费者拦截器 01 onConsume ===");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
System.out.println("=== 消费者拦截器 01 onCommit ===");
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
System.out.println("消费者设置的参数");
configs.forEach((k, v) -> {
System.out.println(k + ", " + v);
});
}
}
位移提交
相关概念
- Consumer 需要向Kafka记录自己的位移数据,这个汇报过程称为:提交位移(Committing Offsets)
- Consumer 需要为分配给它的每个分区提交各自的位移数据
- 位移提交的由Consumer端负责的,Kafka只负责保管,存到 __consumer_offsets 中
- 位移提交:自动提交和手动提交
- 位移提交:同步提交和异步提交
自动提交
Kafka Consumer后台提交
- 开启自动提交 enable.auto.commit=true
- 配置启动提交间隔:auto.commit.interval.ms,默认是5秒
位移顺序
自动提交位移的顺序:
- 配置 enable.auto.commit=true
- Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息的
- 因此自动提交不会出现消息丢失,但是会重复消费
重复消费
重复消费的场景:
- Consumer设置5秒提交offset
- 假设提交offset后3秒发生了Rebalance
- Rebalance之后所有的Consumer从上一次提交的Offset的地方继续消费
- 因为Rebalance发生前3秒的内的提交就丢失了
异步提交
- 使用 KafkaConsumer#commitSync,会提交所有poll返回的最新Offset
- 该方法为同步操作 等待直到 offset 被成功提交才返回
- 手动同步提交可以控制offset提交的时机和频率
位移管理
Kafka中,消费者根据消息的位移顺序消费消息,消费者的位移由消费者者管理,Kafka提供了消费者的API,让消费者自行管理位移。
重平衡
重平衡可以说是Kafka中诟病最厉害的一部分。
重平衡是一个协议,它规定了如何让消费者组下的所有消费者来分配Topic中每一个分区。
比如一个Topic中有100个分区,一个消费组内有20个消费者,在协调者的控制下可以让每一个消费者能分配到5个分区,这个分配过程就是重平衡。
重平衡的出发条件主要有三个:
- 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。
- 主题的分区数发生变化,Kafka目前只能增加分区数,当增加的时候就会触发重平衡
- 订阅的主题发生变化,当消费组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会重平衡
为什么说重平衡让人诟病呢?因为重平衡过程中,消费者无法从Kafka消费消息,对Kafka的TPS影响极大,而如果Kafka集群内节点较多,比如数百个,重平衡耗时会很久。
避免重平衡
要完全避免重平衡做不到,但是要尽量避免重平衡。
在分布式系统中,由于网络问题没有接收到心跳,此时不确认是挂了还是负载没过来还是网络阻塞。
- session.timeout.ms 规定超时时间是多久
- heartbeat.interval.ms 规定心跳的频率 越高越不容易误判 但是会消耗更多资源
- max.poll.interval.ms 消费者poll数据后,需要处理在进行拉取,如果两次拉取时间超过间隔就会被剔除,默认是5分钟。
这里给出一些推荐参数的配置:
- session.timeout.ms 设置为6秒
- heaertbeat.interval.ms 设置2秒
- max.poll.interval.ms 推荐消费者处理消息最长耗时再加1分钟