与生产者对应的是消费者,应用程序可以通过 KafkaConsumer 来订阅主题,并从订阅主题中拉取消息。
消息者与消费组
消费者(Consumer)负责订阅 Kafka 中的主题(Topic),并且从订阅的主题上拉取消息。
每个消费者都有一个对应消费组。(不指定消费组kafka 会默认加一个随机ID 分配一个消费组,每次启动这个随机 ID 不一样。)
当消息发布到主题后,只会投递给订阅它的每个消费组中的一个消费者。
如图示,topicA 主题共有4个分区。有两个消费者组 A 和 B 都订阅了这个主题,消费组 A 中有4个消费者(C0、C1、C2、C3),消费组 B 中有2个消费者(C4和 C5)。
按照 Kafka 默认规则,分配结果是消费组 A 中的每人上消费者分配到了1个分区,消费组 B 中的每一个消费者分配到了2个分区,两个消费组之间互不影响。每个消费者只能消费所分配到的分区中的消息。
每一个分区只能被一个消费组中的一个消费者所消费。
消费者分区分配逻辑默认策略
如下图示,主题 A有7个分区,消费组内只有一个消费者A。
如果再加入一个消费者 B
再加一个消费者 C,那么依次分配如下:
消费者 A(p0,p1,p2),消费者B(p3,p4),消费者C(p5,p6)。
消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或者减少)消费者的个数来提高(或者降低)整体的消费能力。对于分区固定的情况,一味的增加消费者并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费区分配不到任何分区。
消费组内有过多的消费者。
以上分配逻辑都是基于默认的分区策略进行的,可以通过消费者客户端参数 partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。
两种消息投递模式
点对点(P2P,Point-to-Point)模式
点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。
发布/订阅(Pub/Sub)模式
发布/订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播采用。
Kafka 同时支持两种消息投递模式。
- 如果所有的消费者都属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消息者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
- 如果所有的消费者都属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。
消费组是一个逻辑上的概念,它将消费者归为一类,每一个消费者只属于一个消费组。每个组都有固定的名字,消费者在消费前都会有一个固定的名称,这个通过消费者客户端参数 group.id 来配置,默认为空字符串。
消费者并非逻辑上的概念,它是实际的应用实例,可以是一个线程也可以是一个进程。同一个消费组内的消费者可以在同一台机器也可以在不同的机器 。
客户端开发
一个正常的消费逻辑需要具备以下几个步骤:
- 配置消费者客户端参数及创建相应的消费者实例。
- 订阅主题
- 拉取消息并消息
- 提交消费位移
- 关闭消费者实例
简单代码示例
public static final String brokerList = "localhost:9092";
public static final String topic = "test";
public static final String groupId = "group.demo";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("group.id", groupId);
props.put("client.id", "consumer.client.id.demo");
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
try {
while( isRunning.get() ) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset() );
System.out.println("Key=" + record.key() + ", value = " + record.value() );
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
基础参数配置
bootstrap.servers
指定连接 Kafka 集群所需的 broker 地址清单,格式:host1:prop1, host2:prop2, 可以设置一个或者多个地址,中间用逗号隔开
group.id
消费者所属的消费组名称,默认值为"", 如果设置为空会报出异常。(结合业务情况设置成有意义的名称)
key.deserializer 和 value.deserializer
这两个参数分别用来指定消息中的 key 和 value 所需反序列化操作的反序列化器,无默认值。
与生产者客户端序列化参数对应。消费者组从broker 端获取的消息格式都是字节数组(byte[])类型,所以需要执行相应的反序列化操作才能还原成原有的对象格式。
client.id
这个参数用来指定 KafkaConsumer 对应的客户端 id,默认值也为"“。如果客户端不设置,则 KafkaConsumer 会自动生成一个非空字符串,内容形式为"consumer-1”,-2即字符串"consumer-"与数字 的拼接。
订阅主题与分区
一个消费者可以订阅一个或者多个主题。
订阅主题 subscribe
subscribe()订阅主题,可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。
// 集合模式
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
// 正则表达式模式
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
集合模式
如示例:
// 订阅主题1
consumer.subscribe(Arrays.asList(topic1));
// 订阅主题2
consumer.subscribe(Arrays.asList(topic2));
那么最终消费者订阅的是 topic2,不是 topic1。
正则表达式模式
采用正则表达式模式订阅,在之后的过程中,如果又创建了新的主题,且主题名称与正则表达式匹配,那么这个消费者就可以消费到新添加的主题中的消息。
应用场景:如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就比较有效。
在 Kafka 和其它系统之间进行数据复制时,这种模式就很常见。
如示例:
// 正则模式订阅
consumer.subscribe(Pattern.compile("topic-.*"));
assign 订阅特定分区
KafkaConsumer 中提供了一个 assign()方法来实现订阅某些主题特定的分区。
// 订阅主题下特定的分区
consumer.assign(Arrays.asList(new TopicPartition("test", 0)) );
这里只订阅test主题下的0分区。
取消订阅 unsubscribe
示例代码:
// 方法1
consumer.unsubscribe();
// 方法2
consumer.subscribe(new ArrayList<String>());
// 方法3
consumer.assign(new ArrayList<TopicPartition>());
三种方法都可以取消订阅。
订阅状态
三种不同的订阅状态,分别是:
- AUTO_TOPICS: 集合订阅方式 subscribe(Collection)
- AUTO_PATTERN: 正则表达式方式 subscribe(Pattern)
- USER_ASSIGNED:指定分区 assign( Collection )
如果没有订阅状态为 NONE,这三种状态是互斥的,在一个消费者中只能使用其中一种,否则会异常:
java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
通过 subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或者减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。
通过 assign()方法订阅分区时,是不具备消费者自动均衡的功能。
反序列化
KafkaProducer 生产者有对应的序列化器,消费者也有着与之对应的反序列化器。
- ByteBufferDeserializer
- ByteArrayDeserializer
- BytesDeserializer
- DoubleDeserializer
- FloatDeserializer
- IntegerDeserializer
- LongDeserializer
- ShortDeserializer
- StringDeserializer
它们分别用于 ByteBuffer、ByteArray、Bytes、Double、Float、Integer、Long、Short、String 类型的反序列化。
没有特殊需求,一般不建议使用自定义的序列化器和反序列化器,这样会增加消费者和生产者之间的耦合度,通用性不好。
推荐使用:
Avro、JSON、Thift、ProtoBuf
消息消费
消息的消费一般有2种模式:推和拉。推是服务端主动推送给消费者,拉是消费者主动向服务端发起请求来拉取消息。
Kafka 中的消费是基于拉模式的。
poll() 方法
Kafka 中的消息消费是一个不断轮循的过程,消费者重复的调用 poll ()方法,而 poll () 方法返回的是所订阅的主题上的一组消息。
poll()方法没有拉到可供消费的消息,对应的拉取结果就是空。
poll()方法的具体定义如下:
public ConsumerRecords<K, V> poll(long timeoutMs) {
}
public ConsumerRecords<K, V> poll(Duration timeout) {
}
private ConsumerRecords<K, V> poll(Timer timer, boolean includeMetadataInTimeout) {
}
在消费者的缓冲区里面没有可用数据时会发生阻塞,timeout 参数用来控制 poll()方法阻塞时间。
timeout 参数的设置值取决于应用程序对响应速度的要求,比如需要多长时间内将控制权移交给执行轮循的应用线程。
如果设置为0,这样 poll()方法会立刻返回,而不管是否已经拉取到了消息。
消费者消费到的每条消息的类型为 ConsumerRecord和生产者发送的消息类型 ProducerRecord 相对应。
public static final long NO_TIMESTAMP = -1L;
public static final int NULL_SIZE = -1;
public static final int NULL_CHECKSUM = -1;
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final int serializedKeySize;
private final int serializedValueSize;
private final Headers headers;
private final K key;
private final V value;
private final Optional<Integer> leaderEpoch;
System.out.println(record);
// 打印结构
ConsumerRecord(topic = test, partition = 1, leaderEpoch = 0, offset = 2, CreateTime = 1674975229308, serialized key size = 2, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = k5, value = v5)
poll()方法返回的是一个消息合集,类型是 ConsumerRecords,内部包含了 N 个 ConsumerRecord,它提供了一个 iterator()方法来循环遍历消息集内部的消息。
代码示例
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// 获取消息集中每一个 ConsumerRecord
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
ConsumerRecords 类
在 consumerRecords 类中还有一些比较有用的方法
- partitions() 方法用来获取消息集中所有分区
- count() 方法用来计算出消息集中的消息个数,返回类型是 int
- isEmpty() 方法用来判断消息集是否为空,返回类型是 boolean
- empty() 方法用来获取一个空的消息集,返回类型是 ConsumerRecord<K, V>
poll小结
简单的理解和使用,可以理解为 poll()就是拉取消息。
但其内部逻辑非常复杂,涉及消费位移、消费者协调器、组协调器、消费者选举、分区分区的分发、再均衡的逻辑、心跳。
offset 位移提交
每条消息都有唯一的offset(偏移量 or 位移),用来表示消息在分区中对应的位置。
在每次调用 poll()方法时,它返回的是还没有被消费过的消息集。
Kafka 中对 offset 做了执久化保存,记录了上一次消费时的消费位移 offset。
消费者的位移存储在 Kafka中目录中__consumer_offsets下。
把消费位移存储起来(持久化)的动作称为"提交",消费者在消费完消息之后需要执行消费位移的提交。
**lastConsumedOffset:**表示当前消费位置
position : 当前消费者需要提交的消费位移并不是 M,还是 M+1,它表示下一条需要拉取的消息的位置。
committed offset: 表示已经提交过的消费位移。
代码示例
校验一下这三个概念,数据如下,共6条数据,
使用 assign 订阅这个固定的1号分区,先向分区1发送这6条消息,再创建一个消费者去消费这些消息,消费完之后同步提交消费位移(commitSync() ),再输出 lastConsumedOffset、committed offset、position 结果值。
TopicPartition tp = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(tp)); // 使用 assign 订阅主题的 1分区
long lastConsumedOffset = -1; // 当前消费位移
try {
while( isRunning.get() ) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
if(records.isEmpty()) {
break;
}
List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitAsync(); // 同步提交消费位移
}
System.out.println("消费位移 lastConsumedOffset : " + lastConsumedOffset);
OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
System.out.println("提交位移 committed offset : " + offsetAndMetadata.offset());
long posititon = consumer.position(tp);
System.out.println("下一次要拉取的消息的起始偏移量 positition : " + posititon);
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
输出结果:
消费位移 lastConsumedOffset : 5
提交位移 committed offset : 6
下一次要拉取的消息的起始偏移量 positition : 6
位移自动提交
在 Kafka 中默认的消费位移提交方式是自动提交,这个由消费者客户端参数 enable.auto.commit 配置,默认为 true。这个不是每消费一条就提交一次,而是定期提交,定期的周期时间由客户端参数 auto.commit.interval.ms 配置,默认值是5秒,此参数生效的前提是 enable.auto.commit 配置为 true。
在默认的方式下,消费者每5秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑中完成,在每次向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以就是会提交上一次轮循的位移。
重复消费和消息丢失问题
自动提交消费位移的方式非常简单,免去了复杂的位移提交逻辑,编码更简洁。但会产生重复消费和消息丢失问题。
重复消费
假设刚提交完了一次消费位移(positition = 6),然后拉取一批消息进行消费(拉取到了7,8,9),在下一次自动提交消费位移之前,消费者崩溃了(此时7,8,9的消费逻辑已经消费了更改了数据库业务状态),那么又得从上一次位移提交的地方(positition=6,拉取7,8,9)重新开始消费,这样便发生了重复消费的现象。
解决方法:
- 在Kafka 中减少位移提交的时间间隔(比如5秒调2秒或者更小,但这样不能避免重稍微晚点消费的发送,而会使位移提交更加频繁),建议在业务中处理这类重复消费问题。
- 具体问题具体分析,先从业务逻辑上处理看看业务的消费是不是可以弄成消费冥等性,只能消费一次,多次消费结果一样,或者MYSQL中判断这条数据是不是已经处理了处理过了不要进行重复消费。
消息丢失
消息丢失在什么情况下产生?
拉取消息是批次拉取的,假设有线程 A不断的拉取消息交给线程B去处理拉取到的消息,此时线程 A 已经拉取到了一批次消息(1,2,3,4,5),消费位移已经确认提交了,但处理线程 B 还没有处理完成已经拉到的消息(只处理了1,2,3),此时发生了异常,重新恢复之后,只会从消费位移确认之后的位置开始重新拉取消费,这时消息4和5就丢失了。
手动位移提交
自动位移在正常情况下不会发生消息丢失和重复消费的情况,但不能避免有些异常情况的发生,同时,自动位移提交也无法做到精确的位移管理。Kafka 提供了手动位移提交的管理方式,可以使开发人员对消费位移的管理控制更加灵活。
很多时候并不是拉到消息就算消费完成,还需要一系列的业务处理,计算,入库,或者更复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息消费成功,手动的提交方式可以让开发人员根据业务逻辑在合适的地方手动位移提交。
开启手动提交的功能需要配置消费者客户端参数:enable.auto.commit 配置为 false
示例:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
手动提交可以分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync() 和 commitAsync() 两种类型的方法。
commitSync() 同步提交
commitSync重载方法,可以分为2类,有参数和无参数。
public void commitSync()
public void commitSync(Duration timeout)
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout)
无参数 commitSync()
无参数示例
while( isRunning.get() ) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//todo 业务逻辑
}
consumer.commitSync(); // 同步提交消费位移
}
示例代码中对先拉取到的每一条消息做相应的业务逻辑处理,然后对整个消息集做同步提交。
也可以做批量处理+ 批量提交,对相应的消息加一个累加记,累加到相应的次数再做一次批量提交。
比如有时拉到了2条,3条,N 条,累计到了100条再做同步提交,而不用频繁的做同步提交,
commitSync() 方法会根据 poll()方法拉取最新的位移来提交,只要没有发生不可恢复的错误,它就会阻塞消费者线程直至位移提交成功,对于不可恢复的错误需要将其捕获并做针对性的处理。
无参数调用,提交的消费位移的频率和拉取批次消息、处理批次消息的频率是一样的。
有参数 commitSync()
方法参数中提供了一个 offset 参数,用来提交指定分区的位移。无参的 commitSync()只能提交当前批次对应的 positition 值。如果需要提交一个中间值,比如业务每消费一条消息就提交一次位移,那就必须使用有参的这种调用方法。
示例,带参数的同步位移提交
while( isRunning.get() ) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// todo 业务处理
long offset = record.offset();
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1)));
}
}
实际应用中,很少会有这种每消费一条就提交一次消费位移的应用场景。
**commitSync()是同步执行阻塞的,每调用一次会有一定性能损耗。**更多时候是按照分区的粒度划分提交位移。
示例,按分区粒度同步提交消费位移
try {
while( isRunning.get() ) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
if(records.isEmpty()) {
break;
}
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
for(ConsumerRecord<String, String> record : partitionRecords) {
// todo 业务逻辑处理
}
// 按分区粒度提交消费位移
lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastConsumedOffset + 1)));
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
commitAsync() 异步提交
异步提交在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回时就开始了新一轮的拉取。性能优于同步提交。
commitAsync() 有三个不同的重载方法。
public void commitAsync()
public void commitAsync(OffsetCommitCallback callback)
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
第一个无参和第3个 offset 和同步方法一致。第2和第3中的 callback 参数,它提供了一个异步提交的回调方法,当位移提交完成后会回调 OffsetCommitCallback中的 onComplete()方法
代码示例
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if ( e == null) {
System.out.println(map);
} else {
System.out.printf("fail to commit offsets {}", map, e);
}
}
});
commitAsync()同样会有失败的情况发生导致重复消费,如果有重复消费尽量交给业务代码处理(冥等性,mysql 唯一索引,等)。
控制或者关闭消费
KafkaConsume 提供了对消费速度进行控制的方法、在有些应用场景我们可能需要暂停某些分区的消费而先消费其它分区,当达到一定条件再恢复这些分区消费。
KafkaConsume 中使用 pause()和 resume()方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作。
public void pause(Collection<TopicParition> partitions)
public void resume(Collection<TopicPartition> partitions)
KafkaConsume 还提供了一个无参的 paused()方法来返回被暂停的分区集合,此方法的具体定义如下:
public Set<TopicPartition> paused()
如何优雅的退出
while(IsRunning.get()),这样可以通过在其他地方设定 isRunning.set(false)来退出 while 循环。
还有一种方法就是调用 KafkaConsumer 的 wakeup()方法,wakeup()方法是 KafkaConsumer 中唯一可能从其他线程里安全调用的方法。调用后可以退出 poll()的逻辑,并抛出 WakeupException的异常,我们也不需要处理 WakeupException 的异常,它只是一个跳出循环的方式。
释放资源
退出循环后一定要显式的执行关闭动作及释放各种系统资源,包括内存资源,socker 连接等。
KafkaConsumer 提供了 close() 方法执行关闭,它有三种重载方法:
public void close()
public void close(Duration timeout)
@Deprecated
public void close(long timeout, TimeUnit timeUnit)
第二种方法通过 timeout 参数来设定关闭方法的最长执行时间,有些内部的关闭逻辑会破费一定时间,比如设置了自动提交消费位移,这里还会做一次移位提交的动作;
第一种方法没有 timeout 参数,并不意味着会无限制等待,它内部设定了最长等待时间(30秒);
第三种方法已被标记为@Deprecated,不推荐使用,不用管它了。
代码示例
一个对应完整的消费程序逻辑伪代码示例:
// 订阅主题
consumer.subscribe(Arrays.asList(topic));
try {
while (isRunning.get()) {
// consumer.poll(***)
// process the record.
// commit offset.
}
} catch (WakeupException e) {
// ingore the error
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
当关闭这个消费逻辑的时候,可以调用 consumer.wakeup(), 也可以调用 isRunning.set(false)
指定位移消费
在 Kafka 中每当消费者找不到记录的消息位移时,就会根据客户端参数 auto.offset.reset 的配置来决定从何处开始进行消费,这个参数默认值是"latest",
- latest: 表示从分区末尾开始消费信息
- earliest: 表示从起始处开始消费,也就是从0开始消费
- none: 出现查找不到的消费位移的时候,即不从最新的消息位置处开始,也不从最早的消息位置开始消费,会报出 NoOffsetForPartitionException 异常,如果能找到则不会报异常。
如果配置的不是以上三个,则会报出 ConfigException 异常。
seek()
有一种特殊的需求,比如我们不想从头拉也不想从末尾拉,在业务端已经记录了消费位置的情况下,想从指定位置开始拉取。
KafkaConsumer 提供了一个 seek() 方法。
方法定义如下:
public void seek(TopicPartition partition, long offset)
- partition 表示分区
- offset 参数用来指定从分区的哪个位置开始消费
seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll()方法的调用过程中实现的。也就是说,在执行 seek() 方法之前需要先执行一次 poll()方法,等到分配到分区之后才可以重置消费位置。
通过这个方法可以把位移存储到 DB 或者其它地方,开启消费者的时候读取这个位移通过 seek()方法来进行精准位移的消费。
再均衡
再均衡是指分区的所属权从一个消费者转移到另一个消费才的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以即方便又安全地删除消费组内的消费者或者往消费组内添加消费者。 - 在再均衡发生期间,消费组内的消费者是无法读取消息的(消费组会变的不可用)。
- 另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。(比如消费者A 消费完某个分区的一部分消息还没有来得及提交消费位移就发生了再均衡操作,之后这个分区被分配给了消费组内的另一个消费者,原来被消费的消息又被消费者B消费了一次,发生了重复消费。)
就当尽量避免不必要的再均衡的发生。
ConsumerRebalanceListener
consumerRebalanceListener 是一个接口,包含2个方法。
void onPartitionsRevoked(Collection<TopicPartition> partitions)
这个方法会在再均衡开始之前和消费者停止读取消息之后被调用。可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。(参数 partitions 表示再均衡前所分配到的分区)
void onPartitionsAssigned(Collection<TopicPartition> partition)
这个方法会在重新分配分区之后和消费者开始读取消费之前被调用。(参数 partitions 表示再均衡前所分配到的分区)
代码示例
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class demo {
public static final String brokerList = "localhost:9092";
public static final String topic = "test";
public static final String groupId = "group.demo";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("group.id", groupId);
props.put("client.id", "consumer.client.id.demo");
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
// 常规订阅
// consumer.subscribe(Arrays.asList(topic));
// 再均衡监听器
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
// 再均衡开始之前和消费者停止读取消息之后
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
// 发生了再均衡,执行同步提交消费位移
consumer.commitSync(currentOffsets);
currentOffsets.clear();
}
// 重新分配分区之后和消费者开始读取消费之前
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
}
});
try {
while( isRunning.get() ) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// 消费位移暂存到 currentOffets
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
// 正常同步提交
consumer.commitAsync(currentOffsets, null);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}
将消费位移暂存到一个局部变量 currentOffsets 中,正常消费的时候可以通过 commitAsync()方法异步提交消费位移,在发生再均衡动作之前可以通过再均衡监听器的 onPartitionRevoked()回调执行 commitSync()方法同步提交消费位移,尽量避免一些不必要的重复消费。
消费者拦截器
消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作。
需要自定义实现。
多线程实现
KafkaProducer 是线程安全的,然后 KafkaConsumer 却是非线程安全的。
KafkaConsumer 中定义了一个 acquire() 方法,用来检测当前是否只有一个线程在操作,若有其它线程正在操作则会抛出ConcurrentModifcationException异常
acquire()
KafkaConsumer 中的每个公用方法在执行所有执行的动作之前都会调用这个 acquire() 方法,只有 wakeup()方法例外。
acquire 和锁(synchronized, Lock)不同,它不会造成阻塞等待,可以看作一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。
acquire 和 release 方法成对出现,表示相应的加锁和解锁操作。
方法定义如下:
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
} else {
this.refcount.incrementAndGet();
}
}
private void release() {
if (this.refcount.decrementAndGet() == 0) {
this.currentThread.set(-1L);
}
}
他们都是私有方法,实际应用中不需要我们显示调用,了解期内部机制即可。
多线程的实现方式
方法一:
线程封闭:即为每一个线程实例化一个 KafkaConsumer 对象
一个线程对应一个 KafkaConsumer 实例,可以称之为消费线。一个消费线程可以消费一个或者多个分区中的消息,所有的消费线程都属于同一个消费组。
代码示例:
package cn.litchicloud.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
public class FirstMultConsumerThreadDemo {
public static final String brokerList = "localhost:9092";
public static final String topic = "test";
public static final String groupId = "group.demo";
public static final AtomicBoolean isRunning = new AtomicBoolean(true);
public static Properties initConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return props;
}
public static void main(String[] args) {
Properties props = initConfig();
int consumerThreadNum = 4; // 开4个线程
for(int i = 0; i<consumerThreadNum; i ++) {
new KafkaConsumerThread(props, topic).start();
}
}
// 多线程处理
public static class KafkaConsumerThread extends Thread {
private KafkaConsumer<String, String> kafkaConsumer;
public KafkaConsumerThread(Properties props, String topic) {
this.kafkaConsumer = new KafkaConsumer(props);
this.kafkaConsumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record: records) {
System.out.println(record);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaConsumer.close();
}
}
}
}
方法二:
多个消费线程同时消费同一个分区,通过 assign()、seek() 等方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制,进一步提高了消费能力。
缺点是:对位移提交和顺序控制会变得非常复杂。
不推荐,了解一下即可。
重要的消费者参数
fetch.min.bytes
该参数用来配置 Consumer 在一次拉取请求(调用 poll()方法)中能从 Kafka 中拉取的最小数据,默认值为1(B)。Kafka 在收到 Consumer 的拉取请求中,如果返回给 Consumer 的数据量小于这个参数所配置的值,那么它就需要进行等待,走到数据量满足这个参数的配置大小。可以适当调大这个参数的值以提高一定的吞量,不过会造成额外的延迟,对于延迟敏感的应用可能就不可取了。
fetch.max.bytes
与 fetch.min.bytes 对应,用来配置 Consumer 在一次拉取请求中从 Kafka 中拉取的最大数据量,默认值是52428800(B),也就是50MB。
featch.max.wait.ms
和 fetch.min.bytes 参数有关,如果 Kafka 仅仅参考 featch.min.bytes 参数的要求,那么有可能会一直阻塞等待而无法发送响应给 Consumer。
featch.max.wait.ms 参数用于指定 Kafka 的等待时间,默认值为500(ms),如果 Kafka 中没有足够多的消息而满足不了 featch.min.bytes 参数的要求,那么最终会等待500ms。
如果业务延迟敏感可以适当调小这个参数。
max.partition.featch.bytes
这个参数用来配置从每个分区里返回给 Consumer 的最大数据量,默认值为1048576(B),即1MB。这个参数与 featch.max.bytes 参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者用来限制一次拉取中整体消息的大小。
max.poll.records
这个参数用来配置 Consumer 在一次拉取请求中拉取的最大消息数,默认值为500(条)。如果消息的大小都比较小,可以适当调大这个参数值来提升消息速度。
connections.max.idle.ms
指定在多久之后关闭限制的边接,默认是540000(ms),即9分钟
receive.buffer.bytes
设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为65536(B),即64KB。如果设置为-1,则使用操作系统的默认值。
send.buffer.bytes
设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。
request.timeout.ms
配置 Consumer 等待请求响应的最长时间,默认值为30000(ms),即5分钟。
metadata.max.age.ms
配置元数据的过期时间,默认值为30000(ms),即5分钟。如果元数据在此参数所限定的时间范围内没有更新,则会被强制更新,即使没有任何分区变化或者新的 broker 加入。