概述:
1、kafka使用规范主要从,生产、可靠性、和消费为轴线定义使用规范,另外Kafka建议核心业务系统不要使用(对数据可靠性要求高),因为Kafka高效性能源于批量设计思想,要充分利于Kafka高效性能,前提是要允许部分数据丢失。
2、kafka使用核心:削峰、解耦、向下游并行广播通知(无可靠性保证)和分布式事务,本规范仅从削峰、解耦、向下游并行广播通知论述。
1、可靠性(强制):
可靠性包括Producer发送消息机制的可靠性,Kafka Server(Broker)消息持久化刷盘机制和Broker主从节点消息同步机制,Consumer消息的消费机制。
1.1、Producer发送消息的可靠性:
1.1.1、核心参数设置:
acks:用于Producer指明Broker主从节点消息同步的机制,有如下三个设置:
- acks=0,表示生产者在成功写入消息之前不会等待任何来自服务器的响应。说白了就是Producer只负责消息发送,不管消息是否成功到达Broker,消息可靠性极低,但发送效率极高;
- acks=1,表示只要集群的Leader分区接收到了消息,就会向生产者发送一个成功响应的ack。说白了就是Producer只确保消息发送到了Leader,消息可靠性不太高,发送效率一般;
- acks=all,表示只有所有参与复制的节点(ISR和min.insync.replicas综合决定)全部收到消息时,生产者才会接收到来自服务器的响应ack。说白了就是Producer发送的消息会从Leader同步到Slave,具体同步多少Slave节点?可以通过min.insync.replicas指定;
min.insync.replicas:用于指明Producer发送的消息,Leader收到消息后,会同步到Slave节点的个数,该值默认是1,值越大,消息可告性越高,但发送效率极低。同时该参数控制消息至少被写入到多少个Leader才算是"真正写入",acks=all需要考虑真正写入;
replica.lag.time.max.ms:Kafka判断ISR中的Follower和Leader是否需要同步?根据是参数 replica.lag.time.max.ms (主从之间同步落后时间差),首先ISR 的全称是:In-Sync Replicas ISR是一个Follower的列表,里面存储的是能跟Leader数据同步一致的Follower,确定一个Follower在ISR列表中,有3个判断条件:
- 根据Follower和Leader的交互时间差,如果大于某个时间差就认定这个Follower不行了,就把此Follower从ISR中剔除,此时间差根据rerplica.lag.time.max.ms指定,如:rerplica.lag.time.max.ms=10000,单位ms,也就是默认10s,ISR中的Follower没有向ISR发送心跳包就会被移除;
- 根据Leader和Follower的消息条数差值决定是否从ISR中剔除此Follower,此消息条数差值根据配置参数。如:rerplica.lag.max.messages=4000 ,即:消息条数差大于4000会被移除。该参数Kafka 0.10.0已弃用;
- Follower所在的Broker节点的确不可用,如:网络不可达,或直接宕机。就把此Follower从ISR中剔除;
注意:剔除不是意味着不可用,Follower还是会去默默同步数据,随着Follower不断与Leader进行消息同步, Leader副本的 LEO也会逐渐后移 ,并最终追赶上Leader,此时该Follower就有资格进入ISR集合。另外从消息投递的效率和可靠性综合考虑,建议asks设置为1。如果设置为all(或-1),建议min.insync.replicas取Topic分区数(Partition)的1/2或者1/3,replica.lag.time.max.ms可以使用默认10s。
retries:用于指明生产者可以重发消息的次数,如果达到这个次数,最终还是失败,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms ,可以通过retry.backoff.ms 参数来配置时间间隔。
1.1.2、刷盘机制(broker节点配置):
kafka的刷盘机制是通过以下三个参数确定:
- log.flush.interval.ms:在刷新到磁盘之前,日志分区上消息保留在内存中的最长时间;
- log.flush.interval.messages:在将消息刷新到磁盘之前,日志分区上累积的消息数量;
- log.flush.scheduler.interval.ms:日志刷新器检查是否需要将所有日志刷新到磁盘的频率(一个Broker上可能有很Partition);
我们可以把log.flush.interval.messages值设为1,实现同步刷盘,同步刷盘对性能影响极大,而且现在Kafka统一由集团管理,应该不会随意改配置。
注:如果未设置log.flush.interval.ms,则使用log.flush.scheduler.interval.ms中的值。
1.1.3、消息生产(producer):
消息生产,指Kafka生产投递消息的方式,分为同步和异步两种方式。
1.1.3.1、同步发送:
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。同步发送效率不高,数据可靠性高。
1.1.3.2、异步发送:
异步发送数据可靠性不高,异步发送效率较高,不会阻塞发送工作线程,但有其它开销。因此在谈异步发送方式之前,先看看异步发送的底层原理。
Kafka的Producer发送消息采用异步发送的方式时,在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator【记录累计器,充当一个队列】。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
相关参数:
- batch.size: 只有数据积累到batch.size之后,sender才会发送数据,batch.size以字节为单位;
- linger.ms:如果数据迟迟未到达batch.size,sender等待linger.ms 之后就会发送数据;
1.1.3.2.1、异步发送不带回调:
异步发送不带回调,指发送了就不管了,直接返回后续不再捕获发送结果。
1.1.3.2.2、异步发送带回调:
异步发送带回调,指发送了,可以设置一个回调函数捕获发送执行结果,编码可以根据发送执行结果(success/fail)做补偿。
注:消息发送失败会自动重试,不需要我们在回调函数中手动重试。
1.1.4、消息消费(consumer):
消息消费包话消费方式,和消息消费提交方式。
1.1.4.1、消费方式:
消费方式包括消息拉取方式,点对点消费和广播消费。
1.1.4.1.1:消息拉取方式:
Kafka目前已发布的版本仅支持,pull方式获取消息。
1.1.4.1.2:点对点消费:
Kafka其实不支持点对点对消费,它是以消费组的发布订阅模式消费,即:消费组消费模式是点对点。
注:关于消费组的个数,与Topic分区数的关系,具体一点来说是主分区数。
消费组由多个consumer组成,每一个消费组,只能有一个消费者消费同一topic下的的主分区,复制分区在Kafka里,只做备份数据的功能,只有当主挂了,选举成主时,才提供消费服务。
- 同一组中当消费者数大于分区数时,多余的消费者不会接收消息,但可以作为备用消费者,当处理的消费者挂掉后,备用消费者可以继续进行处理;
- 同一组中当消费者数小于分区数时,一个消费者将会消费多个主分区,此时Kafka会尽量负载消费;
- 对于消费者来说,在每个分区上实际上是单线程消费;
1.1.4.1.3:广播消费:
Kafka不支持广播消费,若要实现,消费端可以用动态生成消费组实现。
注:动态生成消费组,很多Kafka生产环境是禁止的,主要以下三点不足:
- 消费组每次动态生成,不好管理维护;
- Kafka后端要维护消费组消费的Offset,但重启后又无意义,记而不用(因为每次重启应用都会生成新的消费组);
- Kafka要明配置 auto.offset.reset,配置为 earliest 会有重复消费的可能,需要实现消费逻辑幂等,配置为 latest 会有漏消费的可能;
auto.offset.reset有以下三个可选值:
- latest(默认):对于同一个消费者组,若没有提交过offset,则只消费消费者连接topic后,新产生的数据;
- earliest:对于同一个消费者组,若没有提交过offset,则从头开始消费;
- none:对于同一个消费者组,若没有提交过offset,会抛异常直接抛出异常;
其实可以为后台应用硬编码死不同的消费组,但这样一来应用扩展性和维护性就降低了。
1.1.4.1、消费提交方式:
消费提交方式指,消息被消费者Pull以后,是手动提交,还是自动提交,可以通过如下两个参数配置:
enable.auto.commit:是否开启自动提交offset功能;
auto.commit.interval.ms:自动提交offset的时间间隔;
1.1.4.1.1、自动提交:
自动提交对于编码来说是不可控的,如果消费者在执行消费业务逻辑时,出现异常时,是不能回滚的,直接后果就是消息丢失。如果要使用此种提交方式,请确认异常补救方式。
1.1.4.1.2、手动提交:
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。
- 两者的相同点是:都会将本次pull的一批数据最高的偏移量(offset)提交(可以批量消费);
- 两者的不同点是:commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败),而commitAsync则没有失败重试机制,故有可能提交失败;
2、缓冲区和消息体大小限制(非强制):
缓冲区和消息体大小限制,主要由:max.request.size、buffer.memory、batch.size、linger.ms、message.max.bytes、max.message.bytes、fetch.max.bytes指定。
2.1、Producer:
生产端缓冲区和消息体大小的配置。
2.1.1、max.request.size:
- 限制单条消息大小(以字节为单位),即每条消息最大允许的大小;
- 限制发送请求大小(以字节为单位),即每次发送到Broker最大允许的大小;
注:max.request.size,建议不超过1024*2 Kb,超过2Kb开启压缩机制。
2.1.2、buffer.memory:
buffer.memory的本质就是用来约束Producer能够使用的内存缓冲区的大小的,内存缓冲区的作用就是预分配内存,且在使用上不会被GC回收。
2.1.3、batch.size:
通过这个参数来设置批量发送的数据大小,当积压的消息达到这个值的时候就会统一发送(发往同一分区的消息)。
2.1.4、linger.ms:
这个是设置消息发送延迟,这样可以收集更多的消息后批量发送(发往同一分区的消息)。
注:当 batch.size 和 linger.ms 同时设置的时候,只要两个条件中满足一个就会发送。比如:说batch.size设置16kb,linger.ms设置50ms,那么当消息积压达到16kb就会发送,如果没有到达16kb,那么在第一个消息到来之后的50ms之后消息将会发送。
2.1.5、batch.size < buffer.memory :
二者大小的限制最好: batch.size < buffer.memory,如果:发送的真实消息体大小(以字节为单位)> batch.size,可能会导致频繁GC。如果:batch.size > buffer.memory,可能会导致消息发不出去。
2.2、Broker(服务端):
Broker配置的参数,开发人员不能控制修改,建议使用前向运维人员问清楚。
2.2.1、message.max.bytes:
这个参数决定了 Broker 能够接收到的最大消息的大小,限制Broker上的所有Topic,如果:max.request.size > message.max.bytes,可能会导致消息发送异常。
2.2.2、max.message.bytes:
这个参数决定了 Broker 能够接收到的最大消息的大小,它只针对某个主题生效,可动态配置,可覆盖全局的 message.max.bytes。如果:max.request.size > max.message.bytes,可能会导致消息发送异常。
2.3、Consumer(消费端):
消费端消息体的大小,主要指拉取消息的大小。
2.3.1、fetch.max.bytes:
fetch.max.bytes 这个参数决定消费者单次从 Broker 获取消息的最大字节数。如果:fetch.max.bytes < max.request.size,可能会导致消费者消费不了消息。
3、常见建议操作(非强制):
常见建议操作,包括消息生产溯源,消息积压告警阈值设置,消息集压处理策略。
3.1、消息生产溯源:
消息生产溯源,指生产者向下游生产投递消息后,防止下游消息丢失,无法找回。同时考虑消息投递的效率和降级异常补尝处理,建议Producer如下操作发送消息。
- 发送消息之前先落库记录,投递之前此条记录标识为未发送状态;
- 异步发送机制投递消息;
- 异步回调处理,投递结果,成功、失败、还是异常;
- 定时任务降级异常补尝处理未发送、发送失败,或者异常的记录;
3.2、消息集压告警阈值设置:
消息积压告警阈值设置,一种是与业务相关性不大,完全是从消息中间件特性设置的阈值。另一种是与业务相关性很大,即:上游系统投递的消息,下游系统必需在某一个时差处理,否则会影响业务。
3.2.1、业务相关性不大:
业务相关性不大,直接找运维提供一个阈值即可。
3.2.2、业务相关性很大:
业务相关性很大,阈值的设置:
- 明确下游系统的消费速率;
- 明确上下系统业务最大允许的时差;
- 根据1和2算出一个合理的积压告警阈值;
例如:下游系统的消费速率是1 Second,上下系统业务最大允许的时差5 Minute,则积压告警阈值是:300,考虑提前告警,可以设为280。
3.3、消息集压处理策略:
消息集压原因:
- Kafak中间件自身问题导致;
- 下游系统因代码原因,导致不能消费;
- 穷尽现有能力优化仍然消费不过来;
对于1和2得找出原因解决,对于3得动态横向扩展消费端扩大消费能力,分为无序消息的扩展和有序消息的扩展。
3.3.1、无序消息的扩展:
无序消息的扩展,直接加应用服务器即可。
3.3.2、有序消息的扩展:
有序消息的扩展:
- 加应用服务器;
- 消费端做二次分发,即:做好备用topic(做好开关控制),当阻塞时,二次分发,扩大分区分摊消费能力;