kafka是一个常用的分布式消息中间件,与RabbitMQ对比,特点是可以无限横向扩容,并保持高可靠性、高吞吐量和低延迟,因此比RabbitMQ有更高的市场占有率(网上搜了一下,kafka大约41%,RabbitMQ大约29%)。
一、kafka常见概念
一般正常的开发,了解到前6个概念就好,其余的概念更多用于kafka运维配置,或问题排查。
1、producer生产者
指生产消息,并把消息投递到kafka的外部应用程序 ,它不是kafka的组成部分
2、consumer消费者
指连接到kafka,接收/订阅消息,并进行后续逻辑处理的外部应用程序,它也不是kafka的组成部分。
一个消费者可以同时消费kafka的多个队列(主题)
3、Consumer Group消费者组
连接到kafka的消费者,必须指定消费者组,多个消费者可以指定相同的消费者组,这样可以避免同一个消息被重复消费。
如果2个消费者绑定了同一个队列(主题),指定了不同的消费者组,则每条消息,都会同时投递给这2个消费者。
4、topic 主题
kafka里,收发消息的逻辑集合,每个主题,都可以认为是一个队列;
生产者和消费者,都是通过连接主题 来处理消息。
5、partition分区
kafka里存储消息的物理集合,一个主题可以划分为1个或多个分区,可以理解为子队列;
每个分区只属于一个主题,且只能被一个消费者消费(同一个组)。
该主题收到的所有消息,会根据消息的key选择对应的分区进行投递;
如果消息未指定key,且没有定义分区规则时,则kafka会随机平均投递到主题的多个分区里。
注意:每个分区里的消息,一定是按队列的规则,保证先进先出;但是不同的分区的消息,则无法保证。
因此,如果要确保消费者能按消息的投递顺序进行消费:
- 每个主题只建一个分区(这个不推荐)
- 同一批需要保证顺序的消息,指定相同的key,比如使用用户ID作为消息的key,相同key的消息会投递到同一个分区
6、offset偏移量
指每个分区里的消息的唯一编号,并且是从0开始递增的。主题+分区+偏移量,可以唯一定位一条消息。
注:每个分区里的每条消息,offset一定是不同的;不同分区的offset是会重复的。
消费者会也记录每次消费的offset值,来标识自己当前处理到哪一条消息了,以便断开重连时,继续消费,消费者的offset也是存储在kafka中。
7、broker集群节点
kafka集群里的某个节点,通常是一台服务器上的kafka实例。
8、replica副本
主题的每个分区,可以指定多个副本,每个副本都存储一份完全相同的消息数据。
一般建议同一个分区的不同副本,要保存在不同的broker上,避免broker故障导致该分区数据丢失。
9、leader/follower
主题的每个分区,如果有多个副本,那么其中一个副本会作为leader对外提供读写服务,其余的作为follower只同步数据。
如果leader出现故障时,会从follower中选举一个副本作为leader重新提供服务。
10、ISR(in-sync replicas)
分区的同步副本集合,每个分区都会维护一个ISR列表,内容是那些与leader保持同步的follower清单。
如果某个follower跟不上同步的进度,或无法保持同步时,会从ISR列表中移除。
只有在ISR列表里的follower,才有机会提升为leader.
注:已经成为leader的副本,也在ISR中
11、LEO日志末端偏移量
LEO指Log End Offset,即当前分区中下一条待写入消息的偏移量,该条消息是未指向具体的消息。
分区的每个副本,都有自己的LEO。
12、HW高水位线
HW指High Watermark Offset,即当前分区中已经被提交并复制到所有副本的最高消息偏移量(offset),
leader接收到消息,但是还未同步完时,不会更新HW值。
leader会比对自己和所有follower的LEO,用其中的较小值,来更新HW值。
13、LAG滞后消息数
一个消费者组,在消费 主题的每个分区时,每个分区都会计算一个LAG值,指该分区的消息总数与消费者已消费的消息数的差值。
通常是 该分区的HW 减 消费者组的offset。
实践中,运维人员应当对LAG进行监控,比如超出10000时进行告警和处理。
理解了这些概念,网上找了一张kafka工作原理图:
二、kafka与RabbitMQ对比
相对于RabbitMQ,Kafka有如下特点:
- kafka的消息消费完并不会立即删除,而是保存一定时间后才删除,默认是7天。而RabbitMQ是消费完就删除。
kafka不删消息这点我很喜欢,尤其是排查问题需要数据恢复时。 - kafka消费者只支持pull模式,不支持push模式,即消费者只能主动轮询kafka获取消息,默认是每500ms拉一次,每次最多拉500条数据,轮询的优点就是灵活,缺点就是没消息时空耗性能。
RabbitMQ默认只支持push模式,主动推送消息给消费者,实时性更好。 - kafka通过topic主题连接生产者和消费者,多个消费者连接到同一个topic,即可消费该主题的所有消息,如果有不需要的消息,也只能由消费者自行判断和抛弃处理。
RabbitMQ则是通过Exchange接收消息,再通过指定的规则转发到具体的Queue,由消费者消费,可以参考我以前写的文章:https://youbl.blog.csdn.net/article/details/80401945
RabbitMQ可以通过配置很多路由,避免消息投递给不必要的消费者,
不过RabbitMQ也支持直接通过Queue接收和投递消息。 - 性能上,RabbitMQ是单线程模型,大数据上会有瓶颈;而kafka可以几乎无限扩展。
- 有序性,kafka对于主题的每个分区,因为有且只能有一个消费者,所以能保证消息的有序性,不同分区则无法保证;
而RabbitMQ在多消费者时,会平均分配消息,无法保证有序,并且在消息消费失败重新投递时,也会破坏消息顺序。
三、kafka最佳实践
1、生产者配置
-
生产者发送时,有个acks配置,说明如下:
- 为0,生产者发消息后,不等borker响应就返回成功,性能最高,丢数据概率也最高;
- 为1,生产者发消息后,leader节点返回成功,就算成功;但是leader未同步给其它副本前就挂了,也会丢数据;
- 为all 或 -1,则必须等所有副本都同步成功,才返回成功,保证数据不丢失,但性能最低。
生产环境,建议配置为-1,另外2个配置都有丢数据的可能性。
-
min.insync.replicas 最小副本数要求,默认值1,建议为2(当然要求每个topic副本数都在3以上)
因为如果配置为1,当leader收到数据,还未同步就故障了,会丢失数据。 -
retries: 重试次数,设置为较大值,默认值为
Integer.MAX_VALUE
,确保发送成功。
注:虽然重试次数默认很大,但是重试还受到另一个时间配置的影响:delivery.timeout.ms
(默认2分钟),retries还没用完,这个超时时间就到了,也会中断发送。
另外,设置了retries,很显示,请使用异步发消息的方式,避免同步导致线程堵塞,影响用户体验,或其它业务问题。 -
配置参考:
spring:
kafka:
producer:
bootstrap-servers: 10.1.1.1:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 10000
properties:
delivery.timeout.ms: 2000 # 发送消息上报成功或失败的最大时间,默认120000,两分钟
linger.ms: 0 # 生产者把数据组合到一个批处理进行请求的最大延迟时间,默认0
# 参考 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
request.timeout.ms: 1000 # 批处理就绪后到响应的等待时长,含网络+服务器复制时间
batch.size: 1000
2、消费者配置
- 为了避免消息丢失,消费者需要开启手动ack,消息业务逻辑处理完成再提交偏移量
- 参考后面的死循环问题,建议使用String反序列化器
- 根据业务情况,配置合适的批量拉取数量
max-poll-records
,默认值500 - 根据业务情况,配置合适的
auto-offset-reset
值,默认值latest- latest:消费者在消费主题的某个分区时,如果没有之前的消费记录(以前提交的偏移量),则只拉取最新消息,忽略历史消息。
- earliest:与latest相反,没有之前的消费记录时,从最早的消息开始处理。
- none:没有之前的消费记录时,抛出异常。
- 配置参考:
spring:
kafka:
consumer:
bootstrap-servers: 10.1.1.1:9092
max-poll-records: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest
listener:
type: batch
ack-mode: manual_immediate
3、其它
- 配置多个 bootstrap server url,避免单节点故障,导致连接失败
- 如果是异步发消息,不要在kafkaTemplate的成功回调和失败回调方法里有太多业务逻辑,回调方法是单线程处理,里面的业务逻辑会占用
delivery.timeout.ms
的超时时间配置,可能导致后续消息发送超时。 - 同理,消费者也是单线程处理,消费逻辑太重的话,可能导致
session.timeout.ms
超时,从而被认为消费者离线,导致问题
四、kafka工具介绍
1、图形化工具
推荐 OffsetExplorer
,下载地址:https://www.kafkatool.com/download.html
2、命令行工具
kafka安装包内置了很多脚本工具,可以方便的查询kafka的状态,这些工具只需要下载就可以使用,无需安装。
- 下载地址: https://kafka.apache.org/downloads
下载后解压,在bin目录下有很多sh文件,这些是在linux上使用的;
如果在Windows下使用,要用 bin\windows\ 下的那些bat文件。
下面用windows的bat文件命令举例(linux改用对应的sh文件执行即可) - 使用说明,请参考官方文档:https://kafka.apache.org/documentation/
查询某个消费者组下有哪些消费者,以及这些消费者对主题的消费状态:
d:\kafka_2.13-3.4.0\bin\windows\kafka-consumer-groups.bat --describe --group=cb_consumers --bootstrap-server=10.0.0.1:9092
字段说明:
- GROUP 消费者分组
- TOPIC 消费的主题
- PARTITION 消费的分区
- CURRENT-OFFSET 当前消费到的消息偏移量
- LOG-END-OFFSET 当前分区的最大消息偏移量
- LAG 滞后消息条数
- CONSUMER-ID 消费者的ID
- HOST 消费者所在的主机
- CLIENT-ID 客户端ID
注:LAG可以简单理解为LOG-END-OFFSET 减 CURRENT-OFFSET
,但是实际上LAG=HW 减 CURRENT-OFFSET
四、springboot项目使用
1、生产者Demo代码:
1.1、添加pom依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
1.2、添加application.yml配置:
spring:
kafka:
producer:
bootstrap-servers: 10.1.1.1:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 2 # 失败重发次数
1.3、Java发送代码:
private final KafkaTemplate kafkaTemplate; // 注入的Bean
// 同步发送消息
String topic = "beinetTest111";
Object result = kafkaTemplate.send(topic, "我是key", objData).get();
2、消费者Demo代码:
2.1、添加pom依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2.2、添加application.yml配置:
spring:
kafka:
consumer:
bootstrap-servers: 10.1.1.1:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
type: batch
ack-mode: manual_immediate
2.3、Java发送代码:
@KafkaListener(topics = "${kafka-topic.reports}")
public void consumerCreateTask(List<ConsumerRecord<String, Object>> consumerRecordList, Acknowledgment ack) {
if (consumerRecordList == null || consumerRecordList.size() <= 0)
return;
long start = System.nanoTime();
ConsumerRecord lastRecord = consumerRecordList.get(0);
try {
// 转换dto,并进行业务逻辑处理
long elapsedTime = System.nanoTime() - start;
log.debug("Topic:{} 分区:{} 偏移:{} 条数:{} 耗时:{}ns",
lastRecord.topic(),
lastRecord.partition(),
lastRecord.offset(),
dtos.size(),
elapsedTime);
} catch (Exception exp) {
long elapsedTime = System.nanoTime() - start;
log.error("Topic:{} 分区:{} 偏移:{} 耗时:{}ns 出错:",
lastRecord.topic(),
lastRecord.partition(),
lastRecord.offset(),
elapsedTime,
exp);
} finally {
// 不论成败,都提交,避免出错导致死循环,避免丢消息的逻辑,可以在catch里备份
ack.acknowledge();
}
}
五、kafka常见问题
1、有多个消费者,但是总会有一个消费者拿不到消息数据
对一个消费者组而言,一个主题有几个分区,就最多接受几个消费者;
比如主题有2个分区,那么每个分区只能分配给组里的一个消费者,最多只有2个消费者连接上来,如果组里有3个消费者,那么肯定会有一个消费者处于空闲状态,没活干。
如果主题有2个分区,但是组里只有一个消费者,那么2个分区的消息,都会投递给这一个消费者。
2、主题的分区分配策略是怎么样的?
当主题存在多个分区和多个消费者时,kafka的源码实现里有如下几种分区分配策略:
- Range策略(默认策略):
把当前消费者组消费的每个主题的所有分区,逐个分配给消费者,注意是每个主题单独处理,所以会出现不均衡的情况。
例如:a主题有3个分区a0/a1/a2,b主题3个分区b0/b1/b2,有2个消费者C0/C1,分配过程大致是:
第1步分配a主题: a0->C0, a1->C1, a2->C0
第2步分配b主题: b0->C0, b1->C1, b2->C0
这样可以看出,【消费者C0要维护4个分区的数据,而C1只要维护2个分区的数据】,出现了明显的不均衡问题。 - Round-Robin策略:
把所有的分区,排序后,轮询方式逐一分配给所有的消费者,
例如:a主题有3个分区a0/a1/a2,b主题3个分区b0/b1/b2,有2个消费者C0/C1,分配过程大致是:
第1步分配a主题: a0->C0, a1->C1, a2->C0
第2步分配b主题: b0->C1, b1->C0, b2->C1
注意:第2步不是从头开始,而是接着第1步,继续分配,所以排除了Range方案的不均衡问题,
最终的分配结果是【2个消费者,各自负责3个分区】。
但是,如果2个消费者消费的主题,只有部分交集,并不完全相同时,还是会出现不均衡的情况。
如果希望改用这种策略,目前暂时没有配置方法,要在代码里修改partition.assignment.strategy
属性,参考代码:
@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration {
private final KafkaProperties kafkaProperties;
private final ConcurrentKafkaListenerContainerFactory<String, Object> kafkaFactory;
@Bean("myKafkaFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> batchFactory() {
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
kafkaFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
return kafkaFactory;
}
}
然后在消费者代码上指定使用这个工厂Bean:
@KafkaListener(id = "beinetHandler1", groupId = "beinetGroup", topicPattern = "beinetTest.*",
containerFactory = "myKafkaFactory")
public void msgHandler(List<ConsumerRecord> message, Acknowledgment ack) {
- 在kafka的源码里还有2种实现,本文暂不深入介绍:
org.apache.kafka.clients.consumer.CooperativeStickyAssignor
org.apache.kafka.clients.consumer.StickyAssignor
3、消费者加入或退出时,消息还能正常消费吗?
结论:只要有存活的消费者存在,那么所有的消息都能正常消费。
当有新的消费者加入组,或组中有消费者下线/退出,都会触发消费者重新平衡的动作,就是重新为所有的消费者分配分区。
重平衡发生时,默认停止所有消费者工作,直到分配结束。
4、有兄弟说已经往kafka写入消息了,但是消费者那边没有数据入库
- 首先,确认kafka有消息,用上面的图形化工具offset explorer,去对应的topic主题查找数据,发现确实有数据
- 再在工具里,查看Consumers下的对应Group,发现Lag为0,说明消息已经被正常消费了
- 查看消费者的应用日志,没有消费日志产生
- 再继续排查消费者的应用日志,发现有如下日志:
cb_consumers: partitions assigned: []
这表示,该消费者没有分配到分区,不在工作中。
初步判断,是不是有人在其它地方启动了消费者,把数据给消费掉了。 - offset explorer这个工具,不能展示消费者IP信息,只能使用上面的命令
kafka-consumer-groups.bat
查看消费者IP,
再找运维看看这个IP是谁的。 - 最后定位到是测试环境配置错误,把开发环境的数据消费掉了。
5、反序列化失败,导致消费者死循环问题
某天发布到测试环境后,发现程序启动后,一直抛如下异常,且持续几十分钟也不会中断:
<#6d8d6458> j.l.IllegalStateException: No type information in headers and no default type provided
at o.s.util.Assert.state(Assert.java:76)
at o.s.k.s.s.JsonDeserializer.deserialize(JsonDeserializer.java:535)
at o.a.k.c.c.i.Fetcher.parseRecord(Fetcher.java:1387)
at o.a.k.c.c.i.Fetcher.access$3400(Fetcher.java:133)
at o.a.k.c.c.i.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1618)
at o.a.k.c.c.i.Fetcher$CompletedFetch.access$1700(Fetcher.java:1454)
at o.a.k.c.c.i.Fetcher.fetchRecords(Fetcher.java:687)
at o.a.k.c.c.i.Fetcher.fetchedRecords(Fetcher.java:638)
at o.a.k.c.c.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272)
at o.a.k.c.c.KafkaConsumer.poll(KafkaConsumer.java:1233)
at o.a.k.c.c.KafkaConsumer.poll(KafkaConsumer.java:1206)
at j.i.r.GeneratedMethodAccessor109.invoke(Unknown Source)
at j.i.r.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at j.l.reflect.Method.invoke(Unknown Source)
at o.s.a.s.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at o.s.a.f.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
at c.s.proxy.$Proxy186.poll(Unknown Source)
at o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1413)
at o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1250)
at o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1162)
at j.u.c.Executors$RunnableAdapter.call(Unknown Source)
at j.u.c.FutureTask.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
放google搜索了一下,说是反序列化找不到类型信息导致的,并建议不要使用JSON反序列化。
查了一下配置变化记录,确实加了一个kafka的反序列化配置变更:
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
把spring.consumer.value-deserializer 改成:org.apache.kafka.common.serialization.StringDeserializer
就恢复了。
了解了一下,同事希望在消费者的方法参数上,直接使用对象,而不是使用String,所以做了这个修改。
而正好出错的这个消费者,消费的是其它项目生产的消息,里面并不包含类型信息。
而这个异常,是在spring的底层抛出的,业务代码上无法进行try 捕捉,代码同时又设置了手工提交ack,导致代码进入了死循环。
为了避免这种问题,还是建议使用StringDeserializer反序列化,自己在代码里反序列化比较好。
6、broker单个故障,导致消费者无法提交偏移量的问题
生产环境,为了性能和故障转移,部署了6个broker,某天有一个broker故障下线了,按理应该会自动切换,实际上,所有的消费者都开始抛异常:“error when storing group assignment during syncgroup”
直到人工恢复broker并上线,故障才恢复。
最终排查结果:
- 运维把kafka的一个内部topic:
__consumer_offsets
副本数配置为2, - 同时配置
min.insync.replicas=2
,该配置的含义是ISR列表最小同步副本数不得少于2个 - 故障下线的broker,正好包含该topic:
__consumer_offsets
的一个副本,导致该主题的副本数只剩下一个,不符合min.insync.replicas=2
配置要求,从而停止工作 - topic:
__consumer_offsets
的作用,是接收并存储所有消费者组的消费偏移量,该主题不工作,就会导致消费者无法提交偏移量,从而导致所有消费不正常,会重复消费数据。
知道问题了,调整就是把 topic: __consumer_offsets
的副本数调整为3(默认值就是3,运维改错了)
6、所有消费者都不消费任何消息
如果消费者先启动,然后才创建topic,会导致消费者消费不到数据,可以重启消费者试试
7、spring中的kafka,是否存在线程安全问题
生产者使用的KafkaTemplate是线程安全的,经测试,都是使用同一个线程进行消息发送。
同样,消费者也是线程安全的,每个消费者也是单线程处理所有接收到的消息。