Broker Master-Slave Synchronization in RocketMQ 5.0
I. How Master-Slave Synchronization Works
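To make message consumption highly available, RocketMQ has a Broker master-slave (HA) mechanism. It prevents a single Broker failure from leaving the messages stored on that Broker unconsumable. Once a message reaches the master, it is replicated to the slave. If the master Broker goes down, consumers can still pull messages from the slave.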
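The figure below shows the Broker's HA interaction flow and class diagram. Replication runs in one of two modes: synchronous or asynchronous.
- step1: the master starts and listens for slave connections on a dedicated port;
- step2: the slave actively connects to the master; the master accepts the connection, and a TCP connection is established;
- step3: the slave sends the master the offset from which it wants to pull messages; the master parses the request and returns messages to the slave;
- step4: the slave stores the messages and sends a new synchronization request.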
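You can make the four steps concrete by looking at what travels over the HA socket. The sketch below assumes the framing visible in the code later in this article. The slave sends an 8-byte offset (reportSlaveMaxOffset). The master replies with a 12-byte header, made of an 8-byte physical offset and a 4-byte body size, followed by the body (WriteSocketService). The class HaWireFormat and its methods are hypothetical helpers written for illustration, not RocketMQ APIs.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the HA framing; not part of RocketMQ.
final class HaWireFormat {
    static final int HEADER_SIZE = 8 + 4; // physical offset (long) + body size (int)

    // Slave -> master: the 8-byte offset the slave wants to pull from.
    static ByteBuffer encodeSlaveReport(long maxOffset) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(maxOffset);
        buf.flip(); // switch to read mode so the buffer can be written to a channel
        return buf;
    }

    // Master -> slave: header (offset + size) followed by the CommitLog bytes.
    static ByteBuffer encodeMasterTransfer(long physicalOffset, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length);
        buf.putLong(physicalOffset);
        buf.putInt(body.length);
        buf.put(body);
        buf.flip();
        return buf;
    }

    // Slave side: reads the header and returns {physicalOffset, bodySize},
    // leaving the buffer positioned at the start of the body.
    static long[] decodeHeader(ByteBuffer buf) {
        long physicalOffset = buf.getLong();
        int bodySize = buf.getInt();
        return new long[] {physicalOffset, bodySize};
    }
}
```

In this framing, a heartbeat is simply a header with a body size of 0.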
1. Master-Slave Configuration
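The conf directory of the rocketmq-distribution project provides two ready-made setups: a 2-master/2-slave asynchronous HA configuration (2m-2s-async) and a 2-master/2-slave synchronous HA configuration (2m-2s-sync). Below is an example 1-master/1-slave asynchronous HA configuration.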
Master Broker configuration:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# Enable SQL filtering (property filter)
enablePropertyFilter = true
namesrvAddr=192.168.1.55:9876;172.17.0.3:9876
brokerIP1=192.168.1.55
Slave Broker configuration:
Note: brokerName must be the same as the master's. brokerId = 0 means master, and brokerId > 0 means slave. brokerRole is SLAVE, and the flush type is ASYNC_FLUSH (asynchronous flush).
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
# Enable SQL filtering (property filter)
enablePropertyFilter = true
namesrvAddr=192.168.1.55:9876;172.17.0.3:9876
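The pairing rules in the note above can be checked mechanically: the same brokerName, brokerId 0 for the master and greater than 0 for the slave, and brokerRole SLAVE on the slave. The class below is a hypothetical sanity check written for this article, not RocketMQ's own validation. It reads the key = value format with java.util.Properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical config check for a master/slave pair; not part of RocketMQ.
final class BrokerPairCheck {
    static Properties parse(String conf) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(conf)); // handles "key = value" lines and "#" comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    static String get(Properties p, String key) {
        String v = p.getProperty(key);
        return v == null ? null : v.trim();
    }

    // True when both configs describe the same broker group with a valid master and slave.
    static boolean isValidPair(String masterConf, String slaveConf) {
        Properties m = parse(masterConf);
        Properties s = parse(slaveConf);
        String name = get(m, "brokerName");
        boolean sameGroup = name != null && name.equals(get(s, "brokerName"));
        boolean masterOk = "0".equals(get(m, "brokerId"))
            && !"SLAVE".equals(get(m, "brokerRole"));
        String slaveId = get(s, "brokerId");
        boolean slaveOk = slaveId != null && Long.parseLong(slaveId) > 0
            && "SLAVE".equals(get(s, "brokerRole"));
        return sameGroup && masterOk && slaveOk;
    }
}
```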
2. Starting HA
org.apache.rocketmq.store.DefaultMessageStore#start starts the message store when the Broker starts. The figure below shows its call chain; the HA-related code is as follows.
/**
 * Starts the message store services when the Broker starts
 * Called from BrokerController#startBasicService()
 * @throws Exception
 */
@Override
public void start() throws Exception {
    // Initialize HA (master-slave replication) unless DLedger CommitLog or duplication is enabled
    if (!messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
        this.haService.init(this);
    }
    ......
    if (this.haService != null) {
        this.haService.start();
    }
    ......
}
org.apache.rocketmq.store.ha.DefaultHAService#init initializes the HAService, as shown below. Note that the HAClient (which the slave uses to register with the master) is created only when brokerRole in the slave's broker.conf is SLAVE.
@Override
public void init(final DefaultMessageStore defaultMessageStore) throws IOException {
    this.defaultMessageStore = defaultMessageStore;
    this.acceptSocketService = new DefaultAcceptSocketService(defaultMessageStore.getMessageStoreConfig());
    this.groupTransferService = new GroupTransferService(this, defaultMessageStore);
    if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
        this.haClient = new DefaultHAClient(this.defaultMessageStore);
    }
    this.haConnectionStateNotificationService = new HAConnectionStateNotificationService(this, defaultMessageStore);
}
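org.apache.rocketmq.store.ha.DefaultHAService#start starts the HAService. Note:
- org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService: on the master, accepts connection events from slaves;
- org.apache.rocketmq.store.ha.GroupTransferService: on the master, waits until replication reaches the required offset and then wakes up the producer threads blocked on synchronous replication (the data itself is sent by HAConnection);
- org.apache.rocketmq.store.ha.HAClient: on the slave, sends the connection to the master;
At startup, the Broker decides from brokerRole (ASYNC_MASTER, SYNC_MASTER, SLAVE) whether it is a master or a slave. For a SLAVE, haMasterAddress is read from the Broker config file and stored as the HA client's masterHaAddress. If haMasterAddress is not configured, the Broker still starts successfully. However, the slave does not connect to the master until a master HA address is obtained some other way, for example from the master information returned when the slave registers with the NameServer.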
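/**
 * Starts the HAService
 * step1: {@link AcceptSocketService} starts accepting slave connections, see {@link AcceptSocketService#beginAccept()}
 * step2: starts the {@link AcceptSocketService} thread, which handles slave connection events
 * step3: starts the {@link GroupTransferService} thread, which wakes producers waiting for synchronous replication
 * step4: on a slave, starts {@link HAClient}, which connects to the master and reports replication progress
 */
@Override
public void start() throws Exception {
    // Master: listen for slave connections (SelectionKey.OP_ACCEPT, connection event)
    this.acceptSocketService.beginAccept();
    // Start the master's accept thread
    this.acceptSocketService.start();
    // Start the thread that checks replication progress and wakes waiting producers
    this.groupTransferService.start();
    this.haConnectionStateNotificationService.start();
    // Slave: start {@link HAClient}, which connects to the master and reports its offset
    if (haClient != null) {
        this.haClient.start();
    }
}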
II. How Master-Slave Synchronization Is Implemented
1. The Slave Broker Connects to the Master
org.apache.rocketmq.store.ha.DefaultHAClient is the core class the slave Broker uses to connect to the master Broker; it runs as a thread. Its main fields are shown below.
// Socket read buffer size: 4 MB
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
// Master HA address and master Broker address
private final AtomicReference<String> masterHaAddress = new AtomicReference<>();
private final AtomicReference<String> masterAddress = new AtomicReference<>();
// Buffer holding the offset the slave reports to the master
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
// Network channel
private SocketChannel socketChannel;
// NIO selector
private Selector selector;
/**
 * Timestamp of the last read from the master
 * last time that slave reads data from master.
 */
private long lastReadTimestamp = System.currentTimeMillis();
/**
 * Timestamp of the last write to the master
 * last time that slave reports offset to master.
 */
private long lastWriteTimestamp = System.currentTimeMillis();
// Reported replication progress (the max offset of the slave's CommitLog)
private long currentReportedOffset = 0;
// Position up to which the read buffer has been processed
private int dispatchPosition = 0;
// Read buffer
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
// Backup read buffer, swapped with byteBufferRead
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private DefaultMessageStore defaultMessageStore;
// HA connection state
private volatile HAConnectionState currentState = HAConnectionState.READY;
// Flow monitor
private FlowMonitor flowMonitor;
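org.apache.rocketmq.store.ha.DefaultHAClient#run is the task the HAClient thread executes. Its call chain and code are shown below.
- DefaultHAClient#connectMaster(): the slave connects to the master.
- DefaultHAClient#transferFromMaster(): reports replication progress to the master, processes the messages sent by the master, and appends them to the memory-mapped CommitLog file.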
/**
 * Runs the HAClient
 * {@link DefaultHAClient#connectMaster()}: the slave connects to the master
 * {@link DefaultHAClient#transferFromMaster()}: reports replication progress to the master, processes messages sent by the master and appends them to the memory-mapped CommitLog file
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    this.flowMonitor.start();
    while (!this.isStopped()) {
        try {
            switch (this.currentState) {
                case SHUTDOWN:
                    return;
                case READY:
                    if (!this.connectMaster()) {
                        log.warn("HAClient connect to master {} failed", this.masterHaAddress.get());
                        this.waitForRunning(1000 * 5);
                    }
                    continue;
                case TRANSFER:
                    // Report replication progress, then process and append the messages sent by the master
                    if (!transferFromMaster()) {
                        // Transfer failed: close the connection and fall back to READY
                        closeMasterAndWait();
                        continue;
                    }
                    break;
                default:
                    this.waitForRunning(1000 * 2);
                    continue;
            }
            long interval = this.defaultMessageStore.now() - this.lastReadTimestamp;
            if (interval > this.defaultMessageStore.getMessageStoreConfig().getHaHousekeepingInterval()) {
                log.warn("AutoRecoverHAClient, housekeeping, found this connection[" + this.masterHaAddress
                    + "] expired, " + interval);
                this.closeMaster();
                log.warn("AutoRecoverHAClient, master not response some time, so close connection");
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.closeMasterAndWait();
        }
    }
    log.info(this.getServiceName() + " service end");
}
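Note that once the HAClient thread starts, it cycles between the READY and TRANSFER states. In READY, the slave sends its connection to the master and opens a socket connection. In TRANSFER, the two sides exchange data. The slave reports its replication progress to the master (currentReportedOffset, the max offset of the slave's CommitLog), and the master sends the messages to be replicated to the slave.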
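The READY/TRANSFER cycle described above is a small state machine. The sketch below models only the transitions. It simplifies connecting and transferring to success/failure flags. HaClientStates is a hypothetical class. In the real DefaultHAClient, the transitions happen inside connectMaster() and closeMasterAndWait().

```java
// Hypothetical model of the HAClient state transitions; not RocketMQ code.
final class HaClientStates {
    enum State { READY, TRANSFER, SHUTDOWN }

    private State state = State.READY;

    State state() {
        return state;
    }

    // One loop iteration: READY tries to connect, TRANSFER moves data,
    // and a failed transfer drops back to READY so the next iteration reconnects.
    State step(boolean connectSucceeded, boolean transferSucceeded) {
        switch (state) {
            case READY:
                if (connectSucceeded) {
                    state = State.TRANSFER;
                }
                break;
            case TRANSFER:
                if (!transferSucceeded) {
                    state = State.READY;
                }
                break;
            default:
                break; // SHUTDOWN is terminal
        }
        return state;
    }

    void shutdown() {
        state = State.SHUTDOWN;
    }
}
```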
org.apache.rocketmq.store.ha.DefaultHAClient#connectMaster is the core method the slave uses to connect to the master. Its code is as follows.
/**
 * The slave connects to the master
 * Notes:
 * a. At startup, a SLAVE Broker reads haMasterAddress from its config file and stores it in masterHaAddress;
 * b. If no master HA address is set (addr == null below), the Broker still starts, but no HA connection is made
 * @return true if connected; false otherwise
 */
public boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        // Master HA address
        String addr = this.masterHaAddress.get();
        if (addr != null) {
            // Build a SocketAddress from the address string
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            // Open the SocketChannel
            this.socketChannel = RemotingUtil.connect(socketAddress);
            if (this.socketChannel != null) {
                // Register OP_READ (network read event) on the SocketChannel
                this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                log.info("HAClient connect to master {}", addr);
                this.changeCurrentState(HAConnectionState.TRANSFER);
            }
        }
        // Max CommitLog offset (replication progress)
        this.currentReportedOffset = this.defaultMessageStore.getMaxPhyOffset();
        this.lastReadTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}
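connectMaster() turns the "ip:port" string in masterHaAddress into a SocketAddress and opens a channel. The sketch below shows one way to do both steps with the JDK alone. It is an assumption about what RemotingUtil.string2SocketAddress and RemotingUtil.connect do, not their actual source, and HaAddress is a hypothetical class. Splitting on the last ':' keeps the parser tolerant of host names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;

// Hypothetical helpers; the real code uses RemotingUtil.string2SocketAddress / RemotingUtil.connect.
final class HaAddress {
    // "192.168.1.55:10912" -> InetSocketAddress (a literal IP needs no DNS lookup).
    static InetSocketAddress parse(String addr) {
        int split = addr.lastIndexOf(':');
        if (split <= 0 || split == addr.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got " + addr);
        }
        String host = addr.substring(0, split);
        int port = Integer.parseInt(addr.substring(split + 1));
        return new InetSocketAddress(host, port);
    }

    // Blocking connect with a timeout, then non-blocking mode so the channel can be
    // registered with a Selector for OP_READ. Returns null on failure, mirroring
    // the null check in connectMaster().
    static SocketChannel connect(SocketAddress remote, int timeoutMillis) {
        SocketChannel sc = null;
        try {
            sc = SocketChannel.open();
            sc.socket().setTcpNoDelay(true);
            sc.socket().connect(remote, timeoutMillis);
            sc.configureBlocking(false);
            return sc;
        } catch (IOException e) {
            if (sc != null) {
                try {
                    sc.close();
                } catch (IOException ignored) {
                    // nothing more to do
                }
            }
            return null;
        }
    }
}
```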
2. The Master Broker Accepts Connection Events
org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService is the class with which the master accepts slave connection events; it runs as a thread. Its main fields are shown below.
// Local address the master listens on (local IP + port)
private final SocketAddress socketAddressListen;
// NIO server socket channel
private ServerSocketChannel serverSocketChannel;
// NIO selector
private Selector selector;
org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#beginAccept makes the master start listening for slave connection events.
/**
 * Starts listening to slave connections.
 *
 * @throws Exception If fails.
 */
public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true); // allow address reuse (SO_REUSEADDR)
    this.serverSocketChannel.socket().bind(this.socketAddressListen); // bind the listen port
    if (0 == messageStoreConfig.getHaListenPort()) {
        messageStoreConfig.setHaListenPort(this.serverSocketChannel.socket().getLocalPort());
        log.info("OS picked up {} to listen for HA", messageStoreConfig.getHaListenPort());
    }
    this.serverSocketChannel.configureBlocking(false); // non-blocking mode
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); // register OP_ACCEPT (connection event)
}
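org.apache.rocketmq.store.ha.DefaultHAService.AcceptSocketService#run handles the slave connection events it detects. For each connection it creates and starts an org.apache.rocketmq.store.ha.HAConnection object, which implements the master-slave data transfer.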
/**
 * Standard NIO accept loop
 * step1: the selector waits up to 1s for ready events
 * step2: for an accept event, obtain the {@link SocketChannel}
 * step3: create an {@link HAConnection} for each connection (master-slave data transfer)
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // Wait up to 1s for ready events
            this.selector.select(1000);
            Set<SelectionKey> selected = this.selector.selectedKeys();
            if (selected != null) {
                for (SelectionKey k : selected) {
                    // Is it an accept (connection) event?
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        // Accept the connection and obtain the SocketChannel
                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                        if (sc != null) {
                            DefaultHAService.log.info("HAService receive new connection, "
                                + sc.socket().getRemoteSocketAddress());
                            try {
                                // Create and start an HAConnection per connection (master-slave data transfer)
                                HAConnection conn = createConnection(sc);
                                conn.start();
                                DefaultHAService.this.addConnection(conn);
                            } catch (Exception e) {
                                log.error("new HAConnection exception", e);
                                sc.close();
                            }
                        }
                    } else {
                        log.warn("Unexpected ops in select " + k.readyOps());
                    }
                }
                selected.clear();
            }
        } catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
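Together, beginAccept() and run() form a textbook NIO accept loop. The self-contained sketch below reproduces that pattern with JDK classes. It binds to a port (port 0 lets the OS pick one, like haListenPort = 0 above), registers OP_ACCEPT, and accepts whatever the selector reports. HaAcceptor is a hypothetical name. Instead of creating an HAConnection, it just counts and closes the accepted channels.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical accept loop; the real code wraps each channel in an HAConnection.
final class HaAcceptor implements AutoCloseable {
    private final ServerSocketChannel server;
    private final Selector selector;
    final int port;

    private HaAcceptor(int listenPort) throws IOException {
        server = ServerSocketChannel.open();
        selector = Selector.open();
        server.socket().setReuseAddress(true);
        server.socket().bind(new InetSocketAddress(listenPort)); // 0 = let the OS choose
        port = server.socket().getLocalPort();
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    static HaAcceptor open(int listenPort) {
        try {
            return new HaAcceptor(listenPort);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // One iteration of the run() loop: wait up to timeoutMillis, accept ready connections.
    int acceptOnce(long timeoutMillis) {
        try {
            selector.select(timeoutMillis);
            int accepted = 0;
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey k = it.next();
                it.remove(); // same effect as selected.clear() in the original loop
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                    if (sc != null) {
                        accepted++;
                        sc.close();
                    }
                }
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            selector.close();
            server.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```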
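When org.apache.rocketmq.store.ha.DefaultHAConnection is created and started, it starts its read and write service threads. Its key fields are shown below.
- private WriteSocketService writeSocketService: writes data from the master to the slave
- private ReadSocketService readSocketService: reads the data the slave sends to the master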
private final DefaultHAService haService;
private final SocketChannel socketChannel;
// Address of the HA client (slave)
private final String clientAddress;
// Writes data from the master to the slave
private WriteSocketService writeSocketService;
// Reads the data the slave sends to the master
private ReadSocketService readSocketService;
private volatile HAConnectionState currentState = HAConnectionState.TRANSFER;
// Offset the slave asked to pull from (set from its first report)
private volatile long slaveRequestOffset = -1;
// Offset the slave has acknowledged as replicated
private volatile long slaveAckOffset = -1;
private FlowMonitor flowMonitor;
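3. The Slave Broker Reports Replication Progress
org.apache.rocketmq.store.ha.DefaultHAClient#transferFromMaster is the core method for data transfer between slave and master. As the code below shows, it does two things:
- From slave to master: it reports replication progress, i.e. currentReportedOffset (the max offset of the slave's CommitLog), via org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset.
- From master to slave: it receives the replicated message content, via org.apache.rocketmq.store.ha.DefaultHAClient#processReadEvent.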
/**
 * Reports replication progress to the master; processes messages sent by the master and appends them to the memory-mapped CommitLog file
 * {@link DefaultHAClient#reportSlaveMaxOffset(long)}: reports replication progress, i.e. currentReportedOffset (the max offset of the slave's CommitLog)
 * {@link DefaultHAClient#processReadEvent()}: processes messages sent by the master and appends them to the memory-mapped CommitLog file
 */
private boolean transferFromMaster() throws IOException {
    boolean result;
    // Is it time to report the current pull offset to the master?
    if (this.isTimeToReportOffset()) {
        log.info("Slave report current offset {}", this.currentReportedOffset);
        // Report the pull offset to the master
        result = this.reportSlaveMaxOffset(this.currentReportedOffset);
        if (!result) {
            return false;
        }
    }
    this.selector.select(1000);
    // Process the message data sent by the master
    result = this.processReadEvent();
    if (!result) {
        return false;
    }
    return reportSlaveMaxOffsetPlus();
}
org.apache.rocketmq.store.ha.DefaultHAClient#reportSlaveMaxOffset reports the replication progress to the master, as follows.
/**
 * Reports the pull offset to the master
 * Notes:
 * a. The reported maxOffset is, for the slave, the offset of the next message to pull;
 *    for the master, it is the offset requested by this pull, which can also be read as a replication ACK
 * b. The ByteBuffer is switched between write mode and read mode manually;
 * c. {@link Buffer#hasRemaining()} tells whether the buffer has been fully written to the SocketChannel (a typical NIO write pattern)
 * @param maxOffset offset to pull next
 * @return whether the ByteBuffer content was fully written
 */
private boolean reportSlaveMaxOffset(final long maxOffset) {
    // Put the offset into the ByteBuffer
    this.reportOffset.position(0); // write position
    this.reportOffset.limit(8); // 8 bytes to write
    this.reportOffset.putLong(maxOffset); // write the offset into the ByteBuffer
    // Switch the ByteBuffer from write mode to read mode
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    // Try up to 3 times to write the whole buffer to the SocketChannel
    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }
    lastWriteTimestamp = this.defaultMessageStore.getSystemClock().now();
    return !this.reportOffset.hasRemaining();
}
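The write-mode/read-mode juggling in reportSlaveMaxOffset is the standard ByteBuffer put/flip/write pattern. The sketch below isolates it. It encodes the offset, flips the buffer, makes at most three write attempts, and reports whether the buffer drained. OffsetReporter is a hypothetical class. It writes to any WritableByteChannel, so you can exercise it without a socket. Calling clear() and then flip() leaves position = 0 and limit = 8, the same state the original reaches with explicit position()/limit() calls.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical re-creation of the reportSlaveMaxOffset pattern; not RocketMQ code.
final class OffsetReporter {
    private static final int MAX_WRITE_ATTEMPTS = 3;
    private final ByteBuffer reportOffset = ByteBuffer.allocate(8);

    boolean report(WritableByteChannel channel, long maxOffset) {
        reportOffset.clear();          // write mode: position = 0, limit = 8
        reportOffset.putLong(maxOffset);
        reportOffset.flip();           // read mode: position = 0, limit = 8
        for (int i = 0; i < MAX_WRITE_ATTEMPTS && reportOffset.hasRemaining(); i++) {
            try {
                channel.write(reportOffset); // a non-blocking channel may write only part of it
            } catch (IOException e) {
                return false;
            }
        }
        return !reportOffset.hasRemaining(); // true only if all 8 bytes were written
    }
}
```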
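4. The ReadSocketService Thread Reads the Slave's Replication Progress
org.apache.rocketmq.store.ha.DefaultHAConnection.ReadSocketService#processReadEvent is where the master reads the slave's pull requests; their content is the slave's replication progress. The code below shows how the master uses the reported offset. It stores the offset in DefaultHAConnection#slaveAckOffset, and on the first report also in DefaultHAConnection#slaveRequestOffset. It then immediately calls notifyTransferSome to wake the GroupTransferService thread, which re-checks the requests waiting for synchronous replication.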
/**
 * The master reads the slave's pull requests
 * step1: if byteBufferRead has no space left, {@link Buffer#flip()} it and start over
 * step2: read from the SocketChannel into the remaining space; the content is the offset the slave wants to pull from
 * step3: notify the threads waiting for the synchronous replication result
 * @return
 */
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    /*
    When byteBufferRead has no space left: position == limit == capacity.
    After flip(): position == 0 and limit == capacity; together with processPosition = 0, processing restarts from the beginning
    */
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip(); // reset the ByteBuffer
        this.processPosition = 0;
    }
    // Keep reading while byteBufferRead has space left
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // Read from the SocketChannel into the buffer
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0; // reset
                this.lastReadTimestamp = DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                // At least 8 unprocessed bytes: a pull request (an offset) from the slave has arrived
                if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPosition = pos;
                    // Offset the slave has acknowledged as replicated
                    DefaultHAConnection.this.slaveAckOffset = readOffset;
                    // Record the slave's requested offset on its first report
                    if (DefaultHAConnection.this.slaveRequestOffset < 0) {
                        DefaultHAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + DefaultHAConnection.this.clientAddress + "] request offset " + readOffset);
                    }
                    // Notify the threads waiting for the synchronous replication result
                    DefaultHAConnection.this.haService.notifyTransferSome(DefaultHAConnection.this.slaveAckOffset);
                }
            } else if (readSize == 0) {
                // Stop this round after 3 consecutive zero-byte reads
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.error("read socket[" + DefaultHAConnection.this.clientAddress + "] < 0");
                return false;
            }
        } catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }
    return true;
}
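The offset arithmetic in processReadEvent is easy to misread. Since the last read, the slave may have sent several 8-byte offsets, or only part of one. The master therefore rounds the write position down to a multiple of 8 and takes the last complete long before it. AckDecoder below is a hypothetical, socket-free model of that logic, where receive() stands in for socketChannel.read(). Like the original, it assumes offsets are always 8 bytes and the buffer capacity is a multiple of 8.

```java
import java.nio.ByteBuffer;

// Hypothetical model of ReadSocketService's offset parsing; not RocketMQ code.
final class AckDecoder {
    private final ByteBuffer byteBufferRead;
    private int processPosition = 0;

    AckDecoder(int capacity) {
        byteBufferRead = ByteBuffer.allocate(capacity);
    }

    // Stand-in for socketChannel.read(byteBufferRead); restarts from 0 when the buffer is full,
    // like the original. Assumes the incoming bytes fit in the remaining space.
    void receive(byte[] bytes) {
        if (!byteBufferRead.hasRemaining()) {
            byteBufferRead.flip(); // position = 0, limit = capacity
            processPosition = 0;
        }
        byteBufferRead.put(bytes);
    }

    // Returns the latest complete offset, or -1 if fewer than 8 new bytes have arrived.
    long latestOffset() {
        if (byteBufferRead.position() - processPosition < 8) {
            return -1;
        }
        int pos = byteBufferRead.position() - (byteBufferRead.position() % 8);
        long readOffset = byteBufferRead.getLong(pos - 8);
        processPosition = pos;
        return readOffset;
    }
}
```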
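5. WriteSocketService Transfers the Replicated Messages
Once ReadSocketService has read the replication progress sent by the slave, org.apache.rocketmq.store.ha.DefaultHAConnection.WriteSocketService starts from DefaultHAConnection#slaveRequestOffset. It replicates all messages on the master that the slave has not yet received, tracking its position in nextTransferFromWhere. The WriteSocketService#run method below contains the core replication logic.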
/**
 * Transfers message content to the HA client
 * step1: if slaveRequestOffset is -1, the master has not yet received a pull request from the slave; skip this round
 * step2: if nextTransferFromWhere is -1, this is the first transfer; compute nextTransferFromWhere (the offset to transfer from)
 * step3: check whether the previous transfer has finished
 *        finished: if the time since the last write exceeds the heartbeat interval,
 *                  send a heartbeat header: nextTransferFromWhere + size 0 (keeps the idle long connection from being closed)
 *        not finished: continue it and skip the rest of this round
 * step4: look up all readable messages from the slave's pull offset onward
 * step5: if the pending data exceeds one transfer batch (32 KB by default), truncate it; a single transfer may then carry a partial message
 * step6: transfer the message content
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            this.selector.select(1000);
            // slaveRequestOffset == -1: no pull request from the slave yet, skip this round
            if (-1 == DefaultHAConnection.this.slaveRequestOffset) {
                Thread.sleep(10);
                continue;
            }
            /*
            nextTransferFromWhere == -1 means this is the first transfer;
            on the first transfer, compute nextTransferFromWhere (the offset to transfer from)
            */
            // First transfer
            if (-1 == this.nextTransferFromWhere) {
                // == 0: start from the beginning of the CommitLog file that contains the master's max offset
                if (0 == DefaultHAConnection.this.slaveRequestOffset) {
                    long masterOffset = DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
                    masterOffset =
                        masterOffset
                            - (masterOffset % DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                            .getMappedFileSizeCommitLog());
                    if (masterOffset < 0) {
                        masterOffset = 0;
                    }
                    this.nextTransferFromWhere = masterOffset;
                }
                // != 0: start from the offset requested by the slave
                else {
                    this.nextTransferFromWhere = DefaultHAConnection.this.slaveRequestOffset;
                }
                log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + DefaultHAConnection.this.clientAddress
                    + "], and slave request " + DefaultHAConnection.this.slaveRequestOffset);
            }
            /*
            Check whether the previous transfer has finished:
            finished: if the time since the last write exceeds the heartbeat interval,
                      send a heartbeat header: nextTransferFromWhere + size 0 (keeps the idle long connection from being closed)
            not finished: continue it and skip the rest of this round
            */
            // lastWriteOver == true: the previous transfer has finished
            if (this.lastWriteOver) {
                // Time elapsed since the last write
                long interval =
                    DefaultHAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                // Elapsed time exceeds the heartbeat interval
                if (interval > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaSendHeartbeatInterval()) {
                    // Build Header for a heartbeat: nextTransferFromWhere + size 0 (keeps the idle long connection from being closed)
                    this.byteBufferHeader.position(0);
                    this.byteBufferHeader.limit(headerSize);
                    this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                    this.byteBufferHeader.putInt(0);
                    this.byteBufferHeader.flip();
                    // Transfer data
                    this.lastWriteOver = this.transferData();
                    if (!this.lastWriteOver)
                        continue;
                }
            }
            // lastWriteOver == false: the previous transfer is unfinished, so continue it
            else {
                // Continue the previous transfer; if it is still unfinished, skip the rest of this round
                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver)
                    continue;
            }
            // Look up all readable messages from the slave's pull offset onward
            SelectMappedBufferResult selectResult =
                DefaultHAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
            if (selectResult != null) {
                int size = selectResult.getSize();
                // Cap one transfer at haTransferBatchSize (32 KB by default)
                if (size > DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                    size = DefaultHAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                }
                // Flow control: never send more than the current byte budget allows
                int canTransferMaxBytes = flowMonitor.canTransferMaxByteNum();
                if (size > canTransferMaxBytes) {
                    if (System.currentTimeMillis() - lastPrintTimestamp > 1000) {
                        log.warn("Trigger HA flow control, max transfer speed {}KB/s, current speed: {}KB/s",
                            String.format("%.2f", flowMonitor.maxTransferByteInSecond() / 1024.0),
                            String.format("%.2f", flowMonitor.getTransferredByteInSecond() / 1024.0));
                        lastPrintTimestamp = System.currentTimeMillis();
                    }
                    size = canTransferMaxBytes;
                }
                long thisOffset = this.nextTransferFromWhere;
                this.nextTransferFromWhere += size; // offset for the next transfer
                selectResult.getByteBuffer().limit(size);
                this.selectMappedBufferResult = selectResult;
                // Build Header: transfer size bytes of message content (not necessarily whole messages)
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(headerSize);
                this.byteBufferHeader.putLong(thisOffset);
                this.byteBufferHeader.putInt(size);
                this.byteBufferHeader.flip();
                // Transfer data
                this.lastWriteOver = this.transferData();
            } else {
                DefaultHAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
            }
        } catch (Exception e) {
            DefaultHAConnection.log.error(this.getServiceName() + " service has exception.", e);
            break;
        }
    }
    DefaultHAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
    if (this.selectMappedBufferResult != null) {
        this.selectMappedBufferResult.release();
    }
    changeCurrentState(HAConnectionState.SHUTDOWN);
    this.makeStop();
    readSocketService.makeStop();
    haService.removeConnection(DefaultHAConnection.this);
    SelectionKey sk = this.socketChannel.keyFor(this.selector);
    if (sk != null) {
        sk.cancel();
    }
    try {
        this.selector.close();
        this.socketChannel.close();
    } catch (IOException e) {
        DefaultHAConnection.log.error("", e);
    }
    DefaultHAConnection.log.info(this.getServiceName() + " service end");
}
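Two calculations in WriteSocketService#run decide what goes on the wire: where the first transfer starts, and how large each chunk is. The hypothetical helpers below restate them as pure functions so you can check them by hand. The 1 GiB CommitLog file size and 32 KB batch size in the test values are assumed for illustration, not read from a real configuration. The original caps the size by the batch size and then by the flow-control budget with two if statements. That is the same as taking the minimum of the three values.

```java
import java.nio.ByteBuffer;

// Hypothetical helpers restating WriteSocketService's arithmetic; not RocketMQ code.
final class TransferPlanner {
    static final int HEADER_SIZE = 8 + 4;

    // slaveRequestOffset == 0 (empty slave): start at the beginning of the CommitLog
    // file containing the master's max offset; otherwise resume where the slave asked.
    static long firstTransferOffset(long slaveRequestOffset, long masterMaxOffset, int mappedFileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset;
        }
        long start = masterMaxOffset - (masterMaxOffset % mappedFileSize);
        return Math.max(start, 0);
    }

    // Bytes sent in one round: readable data capped by batch size and flow-control budget.
    static int chunkSize(int readableBytes, int batchSize, int flowControlBudget) {
        return Math.min(readableBytes, Math.min(batchSize, flowControlBudget));
    }

    // 12-byte header: physical offset of the chunk + chunk size (0 for a heartbeat).
    static ByteBuffer header(long offset, int size) {
        ByteBuffer h = ByteBuffer.allocate(HEADER_SIZE);
        h.putLong(offset);
        h.putInt(size);
        h.flip();
        return h;
    }
}
```

Take a 1 GiB file size and a master max offset of 2,500,000,000. An empty slave would then start at 2,147,483,648, the first byte of the third CommitLog file.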
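6. The GroupTransferService Thread Reports the HA Result
org.apache.rocketmq.store.ha.GroupTransferService notifies the blocked producer threads once master-slave replication has completed. In synchronous master-slave mode (SYNC_MASTER), flushing the message to disk is not enough. The Broker also waits until the message has been transferred to the slave(s), and then reports the transfer result to the producer thread.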
1): The Set of Requests Waiting for HA
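org.apache.rocketmq.store.CommitLog#asyncPutMessage stores a message that a producer has sent to the Broker (see "RocketMQ5.0.0 Message Storage <2>_Message Storage Flow"). Depending on the synchronous or asynchronous (default) mode, it calls org.apache.rocketmq.store.CommitLog#handleDiskFlushAndHA, which handles flushing and HA replication. The call chain is as follows.
After the producer sends a message, the Broker commits it, i.e. appends it to the memory-mapped file, and then performs flushing and HA according to the synchronous/asynchronous mode. For HA, the commit request is added to org.apache.rocketmq.store.ha.GroupTransferService.requestsWrite, the master's set of requests waiting for HA. The code of org.apache.rocketmq.store.CommitLog#handleHA follows.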
private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,
    int needAckNums) {
    if (needAckNums >= 0 && needAckNums <= 1) {
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
    HAService haService = this.defaultMessageStore.getHaService();
    long nextOffset = result.getWroteOffset() + result.getWroteBytes();
    // Wait enough acks from different slaves
    GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);
    haService.putRequest(request);
    haService.getWaitNotifyObject().wakeupAll();
    return request.future();
}
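handleHA returns a CompletableFuture that stays incomplete until GroupTransferService decides the outcome. The sketch below models that hand-off. The hypothetical PendingReplication class stands in for CommitLog.GroupCommitRequest. It records nextOffset (wroteOffset + wroteBytes), a deadline, and the required ack count, and it completes the future when wakeupCustomer is called. The Status enum is a simplified stand-in for PutMessageStatus, and the timeout is a parameter rather than getSlaveTimeout().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for CommitLog.GroupCommitRequest; not RocketMQ code.
final class PendingReplication {
    enum Status { PUT_OK, FLUSH_SLAVE_TIMEOUT }

    final long nextOffset;
    final long deadlineNanos;
    final int ackNums;
    private final CompletableFuture<Status> future = new CompletableFuture<>();

    PendingReplication(long nextOffset, long timeoutMillis, int ackNums) {
        this.nextOffset = nextOffset;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        this.ackNums = ackNums;
    }

    CompletableFuture<Status> future() {
        return future;
    }

    // Called by the notifier thread once the outcome is known.
    void wakeupCustomer(Status status) {
        future.complete(status);
    }

    // Mirrors handleHA: 0 or 1 required acks means the master alone suffices.
    static CompletableFuture<Status> handleHa(long wroteOffset, int wroteBytes, int needAckNums,
                                              long timeoutMillis, List<PendingReplication> pending) {
        if (needAckNums >= 0 && needAckNums <= 1) {
            return CompletableFuture.completedFuture(Status.PUT_OK);
        }
        PendingReplication req = new PendingReplication(wroteOffset + wroteBytes, timeoutMillis, needAckNums);
        pending.add(req); // the real code calls haService.putRequest(request)
        return req.future();
    }
}
```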
2): Notifying the Producer Threads
When HAService starts, it also starts the GroupTransferService thread. The task it runs, GroupTransferService#run, is shown below.
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // Wait up to 10 ms (returns earlier if woken up)
            this.waitForRunning(10);
            // Once master-slave replication completes, wake the blocked producer threads
            this.doWaitTransfer();
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
When waitForRunning() runs, it calls org.apache.rocketmq.store.ha.GroupTransferService#swapRequests, which swaps the requestsWrite and requestsRead lists:
- private volatile List<CommitLog.GroupCommitRequest> requestsWrite: requests on the master waiting for HA
- private volatile List<CommitLog.GroupCommitRequest> requestsRead: HA requests the master is currently processing
private void swapRequests() {
    List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}
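Swapping requestsWrite and requestsRead is a double-buffering pattern. Producers keep appending to one list while the service thread drains the other, so a lock is needed only for the swap itself. The generic sketch below shows the idea; DoubleBufferQueue is a hypothetical name. It assumes a single draining thread, as with GroupTransferService, and it synchronizes on the queue object rather than on the lists themselves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical double-buffered request queue; assumes exactly one draining thread.
final class DoubleBufferQueue<T> {
    private List<T> requestsWrite = new ArrayList<>();
    private List<T> requestsRead = new ArrayList<>();

    // Producer side: only ever touches requestsWrite, under the lock.
    synchronized void put(T request) {
        requestsWrite.add(request);
    }

    // Consumer side: swap under the lock, then process the batch without holding it.
    int drain(Consumer<? super T> handler) {
        List<T> batch;
        synchronized (this) {
            List<T> tmp = requestsWrite;
            requestsWrite = requestsRead;
            requestsRead = tmp;
            batch = requestsRead;
        }
        for (T request : batch) {
            handler.accept(request);
        }
        int processed = batch.size();
        batch.clear(); // becomes the write list again on the next swap
        return processed;
    }
}
```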
org.apache.rocketmq.store.ha.GroupTransferService#doWaitTransfer notifies the blocked producer threads once master-slave replication completes, as shown below.
/**
 * Notifies the blocked producer threads once master-slave replication completes
 * step1: iterate over the commit requests (messages committed to the memory-mapped CommitLog)
 * step2: replication succeeds when the max offset replicated by the slave(s) >= the next offset returned after the producer's message was written
 */
private void doWaitTransfer() {
    // Lock
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            // Commit requests, i.e. messages committed to the memory-mapped CommitLog
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                boolean transferOK = false;
                long deadLine = req.getDeadLine(); // deadline of this request
                final boolean allAckInSyncStateSet = req.getAckNums() == MixAll.ALL_ACK_IN_SYNC_STATE_SET;
                for (int i = 0; !transferOK && deadLine - System.nanoTime() > 0; i++) { // until success or timeout
                    if (i > 0) {
                        // Wait up to 1s
                        this.notifyTransferObject.waitForRunning(1000);
                    }
                    if (!allAckInSyncStateSet && req.getAckNums() <= 1) {
                        // Success: max offset replicated to the slave >= next offset after the producer's message
                        transferOK = haService.getPush2SlaveMaxOffset().get() >= req.getNextOffset();
                        continue;
                    }
                    if (allAckInSyncStateSet && this.haService instanceof AutoSwitchHAService) {
                        // In this mode, we must wait for all replicas that in InSyncStateSet.
                        final AutoSwitchHAService autoSwitchHAService = (AutoSwitchHAService) this.haService;
                        final Set<String> syncStateSet = autoSwitchHAService.getSyncStateSet();
                        if (syncStateSet.size() <= 1) {
                            // Only master
                            transferOK = true;
                            break;
                        }
                        // Include master
                        int ackNums = 1;
                        for (HAConnection conn : haService.getConnectionList()) {
                            final AutoSwitchHAConnection autoSwitchHAConnection = (AutoSwitchHAConnection) conn;
                            if (syncStateSet.contains(autoSwitchHAConnection.getSlaveAddress()) && autoSwitchHAConnection.getSlaveAckOffset() >= req.getNextOffset()) {
                                ackNums++;
                            }
                            if (ackNums >= syncStateSet.size()) {
                                transferOK = true;
                                break;
                            }
                        }
                    } else {
                        // Include master
                        int ackNums = 1;
                        for (HAConnection conn : haService.getConnectionList()) {
                            // TODO: We must ensure every HAConnection represents a different slave
                            // Solution: Consider assign a unique and fixed IP:ADDR for each different slave
                            if (conn.getSlaveAckOffset() >= req.getNextOffset()) {
                                ackNums++;
                            }
                            if (ackNums >= req.getAckNums()) {
                                transferOK = true;
                                break;
                            }
                        }
                    }
                }
                if (!transferOK) {
                    log.warn("transfer message to slave timeout, offset : {}, request acks: {}",
                        req.getNextOffset(), req.getAckNums());
                }
                // Replication finished (or timed out): wake the producer thread
                req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }
            this.requestsRead.clear();
        }
    }
}
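Without the waiting and timeouts, doWaitTransfer's success test is a counting rule. The master counts as one ack, and each slave whose ack offset has reached req.getNextOffset() adds one more. The request succeeds when the count reaches the required number, or, in the AutoSwitch branch, the size of syncStateSet. The hypothetical AckRules class below restates both rules as pure functions over plain lists and maps. It does not model HAConnection or the wait loop.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical restatement of doWaitTransfer's ack-counting rules; not RocketMQ code.
final class AckRules {
    // Default branch: master + slaves whose ack offset >= nextOffset must reach needAckNums.
    static boolean enoughAcks(List<Long> slaveAckOffsets, long nextOffset, int needAckNums) {
        int ackNums = 1; // include master
        if (ackNums >= needAckNums) {
            return true;
        }
        for (long ackOffset : slaveAckOffsets) {
            if (ackOffset >= nextOffset) {
                ackNums++;
            }
            if (ackNums >= needAckNums) {
                return true;
            }
        }
        return false;
    }

    // AutoSwitch branch: every member of syncStateSet (master included) must have acked.
    static boolean allInSyncStateSetAcked(Map<String, Long> slaveAckByAddr, Set<String> syncStateSet,
                                          long nextOffset) {
        if (syncStateSet.size() <= 1) {
            return true; // only the master
        }
        int ackNums = 1; // include master
        for (Map.Entry<String, Long> e : slaveAckByAddr.entrySet()) {
            if (syncStateSet.contains(e.getKey()) && e.getValue() >= nextOffset) {
                ackNums++;
            }
            if (ackNums >= syncStateSet.size()) {
                return true;
            }
        }
        return false;
    }
}
```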
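III. Read/Write Separation
RocketMQ implements read/write separation quite differently from other middleware. A consumer first sends its pull request to the master, and the master returns a batch of messages. Then, based on its own load and on the replication status, the master suggests whether the next pull should go to the master or to a slave.
RocketMQ locates the Brokers of a MessageQueue solely by brokerName. All Brokers in one group (master and slaves) share the same brokerName but have different brokerIds: the master's brokerId is 0, and a slave's brokerId is greater than 0. The lookup method is org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInSubscribe.
How read/write separation works when consumers pull messages is covered in a later article, "RocketMQ5.0.0 Message Consumption <1> _ Message Pulling in PUSH Mode".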
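Every member of a broker group shares one brokerName, so the client's lookup reduces to choosing a brokerId from a brokerId-to-address table. The sketch below is a simplified, hypothetical take on that choice. BrokerAddrChooser is not a RocketMQ class, and the real findBrokerAddressInSubscribe may differ in its fallback details. The sketch uses the suggested brokerId if it is present. Otherwise, unless restricted to that broker, it falls back to the lowest brokerId in the group, which is the master when the master is registered.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified broker selection within one brokerName group; not RocketMQ code.
final class BrokerAddrChooser {
    static final long MASTER_ID = 0L;

    // brokerAddrs: brokerId -> address for a single brokerName (master has id 0, slaves > 0).
    static String choose(Map<Long, String> brokerAddrs, long suggestedBrokerId, boolean onlyThisBroker) {
        String addr = brokerAddrs.get(suggestedBrokerId);
        if (addr != null || onlyThisBroker || brokerAddrs.isEmpty()) {
            return addr; // null when the suggested broker is absent and no fallback is allowed
        }
        // Fallback: the lowest brokerId, i.e. the master (id 0) if it is registered.
        return new TreeMap<>(brokerAddrs).firstEntry().getValue();
    }

    static boolean isMaster(long brokerId) {
        return brokerId == MASTER_ID;
    }
}
```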
IV. References
- https://blog.csdn.net/sinat_14840559/article/details/115970738
- https://www.cnblogs.com/shanml/p/16950178.html