1、前言
考虑下面几个条件下如何提升kafka的消费速度
- 消息要求严格有序,如chat聊天消息
- 业务处理速度慢,如处理一条数据需要100ms
- 分片不合理,如有的分区很闲,有的分区消息数量积压
2、解决方案
1、顺序问题
关于消息消费时存在先后顺序问题,在chat聊天系统消息消费时遇到的问题及优化思路(一)中已经加以说明。具体实现大致为对同一会话、帖子Id等维度放入同一分区中,如使用Id % 分区数,如下图:
上面的解决方案是将同一会话的消息发送至同一分区进行消费,但是但消费者的消费能力大概率是不够的,因此,需要并发处理,详见
chat聊天系统消息消费时遇到的问题及优化思路(一)
注意:此时需要注意一个问题就是,当单个消费者消费分区,并将获取到的消息放入不同的队列中,此时还可能存在乱序问题,所以,可以考虑复用Id%队列数的方式,将同一个会话的消息放入到相同的队列中,让协程进行消费。
2、消息不丢失问题
上面的方案,当消费者从kafka拉到消息后,并没有等待处理完成就继续从kafka拉取消息然后缓存到内存中,等待消费队列慢慢消费,这个时候如果机器宕机,则内存中的消息将会丢失。
基于上面描述的问题,考虑使用手动提交offset。但是这样其实还存在一个问题就是:各个协程处理的offset值其实是不一样的,如下图:
此时goroutine1和goroutine2 的消息的offset不一致,为了保证消息不丢失,采用以下策略:定期手动提交当前的offset信息,提交的offset值选当前分区的最小的offset,如上面的就选1001这个offset值。可以采用在内存中缓存处理的offset列表的实现方式,如下:
当内存中待处理的最大offset与最小的offset差值>= M时,阻塞消费小城继续从kafka拉取消息,控制异常情况下的数据最多不多于M条
但是此时会引进一个新问题:消息重复消费
此时要保证消息消费的幂等性,如可通过消息唯一标识放入如redis中判断
3、消息堆积问题
堆积原因:
- 生产者短时间生产大量消息到broker,消费者无法及时消费,如大促
- 生产者无法感知消息堆积,仍继续生产消息,导致消息堆积进一步加剧
- 消费者能力不足,消费时间长,消费者宕机、网络异常与broker无法通信
- 业务功能存在bug,导致消费者无法消费
解决方案
消费者端
- 增加消费者数量,并采用并发消费
- 提高消费速度,避免消费时间过长。如果单条消息消费时间无法优化,可以提高批次拉取的数量(当批次拉取的数量较少时,拉取数据量/处理时间 < 生产速度时就容易堆积)
- 消费消息时尽量减少耗时操作,尽量减少三方接口调用、读写库等
- 合理设置消费组服务数量,合理增加topic的partition=个数,消费者数 >= 分区数
- 补偿消费,即消费跳过积压数据,直接消费最新的数据,同时启动补偿数据进程消费积压数据
生产者端
- 支持熔断与隔离,当broker消息积压时,对生产者熔断
- 根据key采用合适的算法,将消息均匀分不到对应的分区中
服务端
- 进行预估,设置合理的分区数
在电商中经常大促,因此很容易出现短时间内产生大量消息的问题,因此在大促前可根据历史情况进行容量预估和相关的扩容策略。
参考:
我是如何将一个老系统的Kafka消费者服务的性能提升近百倍的