kafka的生产者和消费者
- 四、 生产者
- 4.1 分区分配策略
- 4.2 副本和消息消费
- 4.2.1 副本(AR、ISR、OSR)
- 4.2.2 HW与LEO
- 4.2.3 ISR 集合和 HW、LEO的关系
- 五、消费者
- 5.1 分区分配策略
- 5.2 消费者offset的存储
四、 生产者
4.1 分区分配策略
(1)分区的原因
- 方便在集群中扩展:每个partition通过调整以适应它所在的机器,而一个Topic又可以有多个partition组成,因此整个集群可以适应适合的数据。
- 可以提高并发:以Partition为单位进行读写。类似于多路。
(2)分区的原则
- 指明partition(这里的指明是指第几个分区)的情况下,直接将指明的值作为partition的值。
- 没有指明partition的情况下,但是存在值key,此时将key的hash值与topic的partition总数进行取余得到partition值。
- 值与partition均无的情况下,第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与topic可用的partition总数取余得到partition值,即round-robin算法。
4.2 副本和消息消费
4.2.1 副本(AR、ISR、OSR)
(1)AR:Assigned Replicas,某分区的所有副本(这里所说的副本包括leader和follower)统称为 AR。
(2)ISR:In Sync Replicas,所有与leader副本保持"一定程度同步"的副本(包括leader副本在内)组成 ISR 。生产者发送消息时,只有leader与客户端发生交互,follower只是同步备份leader的数据,以保障高可用,所以生产者的消息会先发送到leader,然后follower才能从leader中拉取消息进行同步,同步期间,follower的数据相对leader而言会有一定程度的滞后,前面所说的"一定程度同步"就是指可忍受的滞后范围,这个范围可以通过server.properties中的参数进行配置。
(3)OSR :Out-of-Sync Replied,在上面的描述中,相对leader滞后过多的follower将组成OSR 。由此可见,AR = ISR + OSR,理想情况下,所有的follower副本都应该与leader 保持一定程度的同步,即AR=ISR,OSR集合为空。
leader负责跟踪维护 ISR 集合中所有follower副本的滞后状态,当follower副本"落后太多" 或 "follower超过一定时间没有向leader发送同步请求"时,leader副本会把它从 ISR 集合中剔除。如果 OSR 集合中有follower副本"追上"了leader副本,那么leader副本会把它从 OSR 集合转移至 ISR 集合。
上面描述的"落后太多"是指follower复制的消息落后于leader的条数超过预定值,这个预定值可在server.properties中通过replica.lag.max.messages配置,其默认值是4000。“超过一定时间没有向leader发送同步请求”,这个"一定时间"可以在server.properties中通过replica.lag.time.max.ms来配置,其默认值是10000。
默认情况下,当leader发生故障时,只有 ISR 集合中的follower副本才有资格被选举为新的leader,而在 OSR 集合中的副本则没有任何机会。
4.2.2 HW与LEO
(1)HW
HW (High Watermark)俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能消费HW之前的消息。
(2)LEO
LEO (Log End Offset),标识当前日志文件中下一条待写入的消息的offset。图4.1中offset为9的位置即为当前日志文件的 LEO,分区 ISR 集合中的每个副本都会维护自身的 LEO ,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。
4.2.3 ISR 集合和 HW、LEO的关系
producer在发布消息到partition时,只会与该partition的leader发生交互将消息发送给leader,leader会将该消息写入其本地log,每个follower都从leader上pull数据做同步备份,follower在pull到该消息并写入其log后,会向leader发送ack,一旦leader收到了ISR中的所有follower的ack(只关注ISR中的所有follower,不考虑OSR,一定程度上提升了吞吐),该消息就被认为已经commit了,leader将增加HW,然后向producer发送ack。
也就是说,在ISR中所有的follower还没有完成数据备份之前,leader不会增加HW,也就是这条消息暂时还不能被消费者消费,只有当ISR中所有的follower都备份完成后,leader才会将HW后移。ISR集合中LEO最小的副本,即同步数据同步的最慢的一个,这个最慢副本的LEO即leader的HW,消费者只能消费HW之前的消息。
图4.1 ISR、LEO和HW的关系示意图
五、消费者
5.1 分区分配策略
一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由那个consumer消费的问题。Kafka的两种分配策略:
(1)round-robin循环
按照分区的字典对分区和消费者进行排序,然后对分区进行循环遍历,遇到自己订阅的则消费,否则向下轮询下一个消费者。即按照分区轮询消费者,继而消息被消费。
轮询的方式会导致每个Consumer所承载的分区数量不一致,从而导致各个Consumer压力不均。
(2)range
根据topic的分区数来进行分配,按照订阅该topic的consumer数进行平均分配,多出来的则按照consumer的字典序挨个分配。
这种方式会导致在前面的consumer得到更多的分区,导致各个consumer的压力不均衡。
5.2 消费者offset的存储
由于Consumer在消费过程中可能会出现断电宕机等故障,Consumer恢复以后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到了那个offset,以便故障恢复后继续消费。
(1)kafka自动保存提交
(2)消费者手动保存提交