举个简单栗子,如上图
replication.factory = 2
副本因子是2- 一个Leader副本,一个Follower副本
- 初始情况Leader和Follower 副本都是空
- 我们逐步看当producer写入消息时,
- broker端的副本会做什么,副本的HW和LEO是如何被更新的
步骤一:当Producer开始向Leader写入数据
- Leader 接收到producer发送消息后,将消息写入log文件
- 自动更新自己的LEO值为1,即第一条消息写入0的位置,下一条写入位置为1
- 尝试更新HW值,此时没有Fetch请求发起,Remote LEO 值还是0
- Leader 用最新LEO值(1)与Remote LEO值(0)进行比较,取最小值
- 最小值还是0,所以不用更新
步骤二: 当Follower 开始向Leader发送Fetch请求,获取数据
Follower在发送Fetch请求时,携带了Fetch offset,这个offset代表Follower要请求的下一条消息,其实也就是Follower的LEO
Leader 在返回数据时,携带了Leader HW,这是当前分区最新的HW值
在Follower Fetch请求时,Leader 和 Follower之间会互相同步信息
- Leader 收到Follower 发送fetch请求后,
- 先读取log文件获取数据
更新Remote LEO
=0(因为此时Follower还没有写入这条消息)尝试更新HW
,逻辑同上,min(Leader LEO,Remote LEO) = 0,不用更新- 将数据和当前HW值一起发送给Follower
- Follower 收到Leader返回的数据后
- 先将消息写入本地log文件
更新自己的LEO
值为1更新HW值
,逻辑同Leader,min(Leader返回的HW,自己的LEO)=0,不用更新
步骤三: Follower再次向Leader发起Fetch请求,获取数据
- Leader 收到Follower 发送fetch请求后,
- 先读取log文件获取数据
更新Remote LEO
=1(Follower 传过来的offset=1)尝试更新HW
,min(Leader LEO,Remote LEO) = 1,更新HW=1- 将数据和当前HW值一起发送给Follower
- Follower 收到Leader返回的数据后
- 返回数据为空,不用写入log文件
- 不更新自己的LEO值
尝试更新HW值
,min(Leader返回的HW,自己的LEO)=1,更新HW=1
上图有个炼狱,是什么呢?
当Leader无法满足fetch要求的数据,Leader就一条数据,Follower请求第二条数据,
此时Fetch请求会被暂存到purgatory(炼狱)中,待有数据再次处理,默认超时时间500ms,
当到达超时时间,返回空数据,
未到达超时时间,有生产者发送数据,会唤醒在炼狱中FETCH请求,开始上述处理过程