Broker主从同步流程
配置数据同步流程
配置数据包含4种类型:Topic配置、消费者位点、延迟位点、订阅关系配置。每种配置数据由一个继承自ConfigManager的类来管理,继承关系如图。Slave如何从Master同步这些配置呢?我们先来看一下初始化服务的步骤
- 第一步:Master Broker在启动时,初始化一个BrokerOuterAPI,这个服务的功能包含Broker注册到Namesrv、Broker从Namesrv解绑、获取Topic配置信息、获取消费者位点信息、获取延迟位点信息及订阅关系等。
- 第二步:Slave Broker在初始化Controller的定时任务时,会初始化SlaveSynchronize服务,每60s调用一次SlaveSynchronize.syncAll()方法
- 第三步:syncAll()方法依次调用4种配置数据(Topic配置、消费者位点、延迟位点、订阅关系配置)的同步方法同步全量数据
- 第四步:syncAll()中执行的4个方法都通过Remoting模块同步调用BrokerOuterAPI,并从Master Broker获取数据,保存到Slave中
- 第五步:Topic配置和订阅关系配置随着保存内存信息的同时持久化到磁盘上;消费者位点通过BrokerController初始化定时任务持久化到磁盘上;延迟位点信息通过ScheduleMessageService定时将内存持久化到磁盘上
CommitLog数据同步流程
CommitLog的数据同步分为同步复制和异步复制两种。
同步复制是指生产者生产消息后,等待Master Broker将数据同步到Slave Broker后,再返回生产者数据存储状态;异步复制是指生产者在生产消息后,不用等待Slave同步,直接返回Master存储结果
- 异步复制
Master Broker启动时会启动HAService.AcceptSocketService服务,当监听到来自Slave的注册请求时会创建一个HAConnection,同时HAConnection会创建ReadSocketService和WriteSocketService
两个服务并启动,开始主从数据同步。ReadSocketService接收Slave同步数据请求,并将这些信息保存在HAConnection中WriteSocketService根据HAConnection中保存的Slave同步请求,从CommitLog中查询数据,并发送给Slave.
注:ReadSocketService和WriteSocketService是两个独立工作的线程服务,它们通过HAConnection中的公共变量将CommitLog同步给Slave
slaveRequestOffset表示Slave请求同步的位点值;
slaveAckOffset表示slave已经保存的位点值
- 同步复制
在CommitLog将消息存储到PageCache后,会调用CommitLog的handleHA()/submitReplicaRequest方法处理同步复制。
当BrokerRole配置为SYNC_MASTER时表示当前Master Broker需要同步将消息"发送"到Slave.根据Master Broker CommitLog
的存储结果构造一个GroupCommitRequest放入HAService中,再将GroupComitRequest放入GroupTransferService服务中,
等待GroupTransferService同步成功的锁。如果同步成功那么GroupCommit中的锁会被唤醒,并设置flushOK为True,表示生产
者发送的消息被Master Broker和Slave Broker 同时保存。
一个Master Broker可以配置多个Slave Broker,当需要同步数据时,通过service.getWaitNotifyObject().wakeupAll()来唤醒全部的Slave同步。虽然多个Slave都同步了数据,但是一旦Master Broker不可用时,消费者只会从一个Slave中拉取消息,所以生产环境建议Slave不要配置太多。
注:Slave在发送请求数据的Request时,会带上Slave请求的位点HAConnection.slaveRequestOffset,该值如果等于-1(默认),则表示没有Slave请求过位点数据
- ReadSocketService后台服务不断接收Slave Broker上报的offset,每上报一次都通知HAService.notifyTransferSome()方法,判断Slave同步的位点是否大于Master标记的已同步位点,如果大于则更新标记值,同时通知同步复制服务GroupTransferService.
GroupTransferService扫描所有的同步请求,依次判断哪些GroupCommitRequest的待同步复制的位点是比已同步位点小的,
释放GroupCommitRequest中的锁,消息处理线程可以将消息存储成功的结果返回给生产者 - 消费队列文件(ConsumeQueue)和索引文件(IndexFile)这两个文件是在SlaveBroker上追加CommitLog后由ReputMessageService进行创建的,所以不需要同步