文章目录
- Acknowledgment
- 读消息指定分区
- 批量消费
- 消息拦截
// 采用监听得方式接收 @Payload标记消息体内容.
@KafkaListener(topics = {"test"},groupId = "hello")
public void onEvent(@Payload String event,
@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(value = KafkaHeaders.RECEIVED_PARTITION_ID) String partition){
System.out.println("读取到了时间消息: " + event);
}
Acknowledgment
开启手动确认模式;
listener:
ack-mode: manual
// 采用监听得方式接收 @Payload标记消息体内容.
@KafkaListener(topics = {"test"},groupId = "hello")
public void onEvent(@Payload String event,
@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(value = KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
ConsumerRecord<String,String> record,
Acknowledgment ack){
ack.acknowledge(); // 手动确认,告诉kafka服务器该消息我已经收到了.
System.out.println("读取到了时间消息: " + event);
}
读消息指定分区
@KafkaListener(groupId = "hello",
topicPartitions = {
@TopicPartition(
topic = "${kafka.topic.test}",
partitions = {"0","1","2"}, // 0 1 2分区不限制偏移量
partitionOffsets = { // 3 分区只读 3偏移量之后的; 4分区只读 4偏移量之后的
@PartitionOffset(partition = "3",initialOffset = "3"),
@PartitionOffset(partition = "4",initialOffset = "3")
})
}
)
批量消费
修改配置
kafka:
bootstrap-servers: 192.168.225.128:9092
listener:
type: batch
# 每次读取20条
consumer:
max-poll-records: 20
消费者端接收一个List即可
@KafkaListener(topics = {"hi"},groupId = "batchGroup2")
public void onEvent3(List<ConsumerRecord<String,String>> records){
System.out.println(records.size());
}