The following demo shows how DLedger writes a message and replicates it to the other peers.
/**
* Create a DLedgerServer for testing
*/
public DLedgerServer create(String selfId, String peers, String leaderId) {
DLedgerConfig dLedgerConfig = new DLedgerConfig();
dLedgerConfig.setMappedFileSizeForEntryData(1024);
dLedgerConfig.setEnableLeaderElector(false);
dLedgerConfig.setEnableDiskForceClean(false);
dLedgerConfig.setStoreType(DLedgerConfig.FILE);
dLedgerConfig.setPeers(peers);
dLedgerConfig.setSelfId(selfId);
dLedgerConfig.setGroup("test");
DLedgerServer dLedgerServer = new DLedgerServer(dLedgerConfig);
MemberState memberState = dLedgerServer.getMemberState();
memberState.setCurrTermForTest(0);
if (selfId.equals(leaderId)) {
memberState.changeToLeader(0);
} else {
memberState.changeToFollower(0, leaderId);
}
dLedgerServer.startup();
return dLedgerServer;
}
private String joinPeers(List<String> nodes, String separator) {
return String.join(separator, nodes);
}
@Test
public void appendTest() throws Exception {
//create a DLedgerServer group with two peers; three would make debugging harder
List<String> nodes = Arrays.asList(
"n1-localhost:1001",
"n2-localhost:1002"
);
String peers = joinPeers(nodes, ";");
DLedgerServer server1 = create("n1", peers, "n1");
DLedgerServer server2 = create("n2", peers, "n1");
Thread.sleep(1000L);
//append one entry and replicate it to the other peer
AppendEntryRequest request = new AppendEntryRequest();
request.setRemoteId(server1.getMemberState().getLeaderId());
request.setGroup("test");
request.setBody(new byte[500]);
CompletableFuture<AppendEntryResponse> handleAppend = server1.handleAppend(request);
AppendEntryResponse response = handleAppend.get();
System.out.println(String.format("resp: index=%s, pos=%s", response.getIndex(), response.getPos()));
}
handleAppend
handleAppend is the flow in RocketMQ's DLedger mode that handles message appends: the entry sent by the producer is appended to the Broker's storage files.
Inside the method, isPendingFull checks whether too many entries are still waiting to be replicated to the peers; the default cap is 10000. Anything beyond that would let messages pile up inside the system.
When the pending queue has room, dLedgerStore.appendAsLeader is called first to write the entry into local storage.
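The check itself is small: it compares the number of pending futures registered under the current term against dLedgerConfig.getMaxPendingRequestsNum(). A paraphrased sketch (not a verbatim copy of the DLedger source):
//paraphrased: is the pending-ack queue for this term over the configured cap (10000 by default)?
public boolean isPendingFull(long currTerm) {
checkTermForPendingMap(currTerm, "isPendingFull");
return pendingAppendResponsesByTerm.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum();
}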
public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest request) throws IOException {
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING);
long currTerm = memberState.currTerm();
if (dLedgerEntryPusher.isPendingFull(currTerm)) {
//the pending queue is full
AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
appendEntryResponse.setGroup(memberState.getGroup());
appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode());
appendEntryResponse.setTerm(currTerm);
appendEntryResponse.setLeaderId(memberState.getSelfId());
return AppendFuture.newCompletedFuture(-1, appendEntryResponse);
} else {
//the pending queue has room
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(request.getBody());
//the leader persists the entry locally
DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);
//waitAck registers a pending future and waits for follower acks
return dLedgerEntryPusher.waitAck(resEntry, false);
}
}
appendAsLeader
appendAsLeader and appendAsFollower (covered below) contain the detailed code for writing to local storage; both write the message data and then the index.
Like the CommitLog, dataFileList is designed as an append-only message log. Because entries vary in size, looking one up by index would otherwise require scanning the entries one by one. To speed lookups up, indexFileList records each entry's position and size inside dataFileList. Every index unit has a fixed length of 32 bytes: magic(4) + pos(8) + size(4) + index(8) + term(8).
At query time, the entry's index directly locates its index unit; with the pos and size read from that unit, dataFileList can read the entry directly.
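A minimal sketch of this two-step lookup, assuming a hypothetical FileList with a read(offset, length) helper (not DLedger's real API):
import java.nio.ByteBuffer;
//sketch only: FileList and its read(offset, length) are assumptions for illustration
class IndexedLogReader {
//magic(4) + pos(8) + size(4) + index(8) + term(8) = 32 bytes per index unit
static final int INDEX_UNIT_SIZE = 4 + 8 + 4 + 8 + 8;
interface FileList {
ByteBuffer read(long offset, int length);
}
final FileList dataFileList;
final FileList indexFileList;
IndexedLogReader(FileList data, FileList index) {
this.dataFileList = data;
this.indexFileList = index;
}
byte[] readEntry(long index) {
//1. fixed-size units make the index file random-access: offset = index * 32
ByteBuffer unit = indexFileList.read(index * INDEX_UNIT_SIZE, INDEX_UNIT_SIZE);
unit.getInt(); //skip magic(4)
long pos = unit.getLong(); //pos(8): where the entry starts in dataFileList
int size = unit.getInt(); //size(4): how many bytes the entry occupies
//2. one direct read from the data log, no per-entry scanning
byte[] body = new byte[size];
dataFileList.read(pos, size).get(body);
return body;
}
}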
@Override
public DLedgerEntry appendAsLeader(DLedgerEntry entry) {
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
PreConditions.check(!isDiskFull, DLedgerResponseCode.DISK_FULL);
ByteBuffer dataBuffer = localEntryBuffer.get();
ByteBuffer indexBuffer = localIndexBuffer.get();
DLedgerEntryCoder.encode(entry, dataBuffer);
int entrySize = dataBuffer.remaining();
synchronized (memberState) {
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER, null);
PreConditions.check(memberState.getTransferee() == null, DLedgerResponseCode.LEADER_TRANSFERRING, null);
//assign the entry's sequence number
long nextIndex = ledgerEndIndex + 1;
entry.setIndex(nextIndex);
entry.setTerm(memberState.currTerm());
entry.setMagic(CURRENT_MAGIC);
DLedgerEntryCoder.setIndexTerm(dataBuffer, nextIndex, memberState.currTerm(), CURRENT_MAGIC);
long prePos = dataFileList.preAppend(dataBuffer.remaining());
entry.setPos(prePos);
PreConditions.check(prePos != -1, DLedgerResponseCode.DISK_ERROR, null);
DLedgerEntryCoder.setPos(dataBuffer, prePos);
for (AppendHook writeHook : appendHooks) {
writeHook.doHook(entry, dataBuffer.slice(), DLedgerEntry.BODY_OFFSET);
}
//append the entry data
long dataPos = dataFileList.append(dataBuffer.array(), 0, dataBuffer.remaining());
PreConditions.check(dataPos != -1, DLedgerResponseCode.DISK_ERROR, null);
PreConditions.check(dataPos == prePos, DLedgerResponseCode.DISK_ERROR, null);
DLedgerEntryCoder.encodeIndex(dataPos, entrySize, CURRENT_MAGIC, nextIndex, memberState.currTerm(), indexBuffer);
//append the index unit
long indexPos = indexFileList.append(indexBuffer.array(), 0, indexBuffer.remaining(), false);
PreConditions.check(indexPos == entry.getIndex() * INDEX_UNIT_SIZE, DLedgerResponseCode.DISK_ERROR, null);
if (logger.isDebugEnabled()) {
logger.info("[{}] Append as Leader {} {}", memberState.getSelfId(), entry.getIndex(), entry.getBody().length);
}
ledgerEndIndex++;
ledgerEndTerm = memberState.currTerm();
if (ledgerBeginIndex == -1) {
ledgerBeginIndex = ledgerEndIndex;
}
updateLedgerEndIndexAndTerm();
return entry;
}
}
waitAck
After writing locally, the leader still has to replicate the entry to the peers; the waitAck method waits for that replication to finish. The wait is asynchronous: a CompletableFuture is created, registered in pendingAppendResponsesByTerm, and listened on until replication completes.
pendingAppendResponsesByTerm is a two-level Map keyed by term and then index. When replication finishes, the CompletableFuture is looked up by term and index and CompletableFuture.complete is called to deliver the result.
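Its declaration in DLedgerEntryPusher looks roughly like this (exact generics may differ between versions):
//term -> (entry index -> pending future)
private Map<Long, ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>> pendingAppendResponsesByTerm = new ConcurrentHashMap<>();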
public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry entry, boolean isBatchWait) {
updatePeerWaterMark(entry.getTerm(), memberState.getSelfId(), entry.getIndex());
checkTermForPendingMap(entry.getTerm(), "waitAck");
AppendFuture<AppendEntryResponse> future;
if (isBatchWait) {
future = new BatchAppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
} else {
future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());
}
future.setPos(entry.getPos());
//register in pendingAppendResponsesByTerm and wait asynchronously for the followers to catch up
CompletableFuture<AppendEntryResponse> old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);
if (old != null) {
logger.warn("[MONITOR] get old wait at index={}", entry.getIndex());
}
return future;
}
Why a two-level Map? The first level is the term: if the leader crashes and a re-election bumps the term by 1, everything registered under the old term can be removed in one sweep. For example, when the term-0 leader dies and a term-1 leader is elected, all term-0 futures are completed with TERM_CHANGED and dropped.
That cleanup happens in QuorumAckChecker's doWork:
public void doWork() {
.....
long currTerm = memberState.currTerm();
checkTermForPendingMap(currTerm, "QuorumAckChecker");
checkTermForWaterMark(currTerm, "QuorumAckChecker");
if (pendingAppendResponsesByTerm.size() > 1) {
for (Long term : pendingAppendResponsesByTerm.keySet()) {
if (term == currTerm) {
continue;
}
//stale term: complete its pending futures with TERM_CHANGED
for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) {
AppendEntryResponse response = new AppendEntryResponse();
response.setGroup(memberState.getGroup());
response.setIndex(futureEntry.getKey());
response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());
response.setLeaderId(memberState.getLeaderId());
logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm);
futureEntry.getValue().complete(response);
}
//remove the stale term
pendingAppendResponsesByTerm.remove(term);
}
}
......
}
QuorumAckChecker
The futures in pendingAppendResponsesByTerm are processed on the QuorumAckChecker thread; waitAck only returns the future for the caller to await asynchronously.
QuorumAckChecker's doWork method keeps checking pendingAppendResponsesByTerm for futures that can now be completed.
private class QuorumAckChecker extends ShutdownAbleThread {
private long lastPrintWatermarkTimeMs = System.currentTimeMillis();
private long lastCheckLeakTimeMs = System.currentTimeMillis();
private long lastQuorumIndex = -1;
public QuorumAckChecker(Logger logger) {
super("QuorumAckChecker-" + memberState.getSelfId(), logger);
}
@Override
public void doWork() {
...
}
}
The most important part of doWork is the quorum check below:
//peerWaterMarksByTerm records the highest index each peer has written
Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);
List<Long> sortedWaterMarks = peerWaterMarks.values()
.stream()
.sorted(Comparator.reverseOrder())
.collect(Collectors.toList());
//the index that a majority of peers have already written
long quorumIndex = sortedWaterMarks.get(sortedWaterMarks.size() / 2);
final Optional<StateMachineCaller> fsmCaller = DLedgerEntryPusher.this.fsmCaller;
dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
boolean needCheck = false;
int ackNum = 0;
for (Long i = quorumIndex; i > lastQuorumIndex; i--) {
try {
CompletableFuture<AppendEntryResponse> future = responses.remove(i);
if (future == null) {
needCheck = true;
break;
} else if (!future.isDone()) {
//complete the future returned by handleAppend: the write succeeded
AppendEntryResponse response = new AppendEntryResponse();
response.setGroup(memberState.getGroup());
response.setTerm(currTerm);
response.setIndex(i);
response.setLeaderId(memberState.getSelfId());
response.setPos(((AppendFuture) future).getPos());
System.out.println(String.format("complete: index=%s, pos=%s", response.getIndex(), response.getPos()));
future.complete(response);
}
ackNum++;
} catch (Throwable t) {
logger.error("Error in ack to index={} term={}", i, currTerm, t);
}
}
if (ackNum == 0) {
checkResponseFuturesTimeout(quorumIndex + 1);
waitForRunning(1);
}
if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000 || needCheck) {
updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
checkResponseFuturesElapsed(quorumIndex);
lastCheckLeakTimeMs = System.currentTimeMillis();
}
lastQuorumIndex = quorumIndex;
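A quick worked example of the quorum calculation, using made-up watermarks for a hypothetical three-peer group:
import java.util.*;
import java.util.stream.Collectors;
public class QuorumIndexDemo {
public static void main(String[] args) {
//made-up data: the highest index each peer has acknowledged
Map<String, Long> peerWaterMarks = Map.of("n1", 100L, "n2", 95L, "n3", 90L);
List<Long> sorted = peerWaterMarks.values().stream()
.sorted(Comparator.reverseOrder())
.collect(Collectors.toList()); //[100, 95, 90]
long quorumIndex = sorted.get(sorted.size() / 2); //element at 3/2=1 -> 95
//entries up to index 95 exist on n1 and n2, a majority of the 3 peers
System.out.println("quorumIndex = " + quorumIndex);
}
}
In the two-peer demo at the top of this article, the same formula picks sorted.get(1), i.e. the follower's watermark, so the leader cannot ack until the follower has caught up.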
EntryDispatcher
peerWaterMarksByTerm is maintained by the EntryDispatcher threads, which replicate entries to the followers and call updatePeerWaterMark after each successful push.
When DLedgerServer starts, one EntryDispatcher is created for every peer other than itself, each dedicated to replicating entries to that peer.
public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,
DLedgerRpcService dLedgerRpcService) {
this.dLedgerConfig = dLedgerConfig;
this.memberState = memberState;
this.dLedgerStore = dLedgerStore;
this.dLedgerRpcService = dLedgerRpcService;
//create one EntryDispatcher per peer (excluding self)
for (String peer : memberState.getPeerMap().keySet()) {
if (!peer.equals(memberState.getSelfId())) {
dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
}
}
this.entryHandler = new EntryHandler(logger);
this.quorumAckChecker = new QuorumAckChecker(logger);
this.fsmCaller = Optional.empty();
}
private void doAppend() throws Exception {
while (true) {
if (!checkAndFreshState()) {
break;
}
if (type.get() != PushEntryRequest.Type.APPEND) {
break;
}
if (writeIndex > dLedgerStore.getLedgerEndIndex()) {
doCommit();
doCheckAppendResponse();
break;
}
if (pendingMap.size() >= maxPendingSize || DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000) {
long peerWaterMark = getPeerWaterMark(term, peerId);
for (Long index : pendingMap.keySet()) {
if (index < peerWaterMark) {
pendingMap.remove(index);
}
}
lastCheckLeakTimeMs = System.currentTimeMillis();
}
if (pendingMap.size() >= maxPendingSize) {
//too many in-flight pushes: check for timed-out appends and retry
doCheckAppendResponse();
break;
}
//replicate the entry to the follower
doAppendInner(writeIndex);
writeIndex++;
}
}
private void doAppendInner(long index) throws Exception {
//load the entry the leader wrote at this index
DLedgerEntry entry = getDLedgerEntryForAppend(index);
if (null == entry) {
return;
}
checkQuotaAndWait(entry);
PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND);
//send the replication request
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);
pendingMap.put(index, System.currentTimeMillis());
responseFuture.whenComplete((x, ex) -> {
try {
PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
switch (responseCode) {
case SUCCESS:
pendingMap.remove(x.getIndex());
//update peerWaterMarksByTerm
updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());
quorumAckChecker.wakeup();
break;
case INCONSISTENT_STATE:
logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());
changeState(-1, PushEntryRequest.Type.COMPARE);
break;
default:
logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
break;
}
} catch (Throwable t) {
logger.error("", t);
}
});
lastPushCommitTimeMs = System.currentTimeMillis();
}
On the follower, NettyRpcServer receives the request and dispatches it to handlePush:
public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
//The timeout should be smaller than the remoting layer's request timeout
CompletableFuture<PushEntryResponse> future = new TimeoutFuture<>(1000);
switch (request.getType()) {
case APPEND:
if (request.isBatch()) {
PreConditions.check(request.getBatchEntry() != null && request.getCount() > 0, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
} else {
PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
}
long index = request.getFirstEntryIndex();
//stash the request into writeRequestMap
Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future));
if (old != null) {
logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo());
future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));
}
break;
case COMMIT:
compareOrTruncateRequests.put(new Pair<>(request, future));
break;
case COMPARE:
case TRUNCATE:
PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
writeRequestMap.clear();
compareOrTruncateRequests.put(new Pair<>(request, future));
break;
default:
logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo());
future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
break;
}
wakeup();
return future;
}
EntryHandler
The APPEND request sent by dLedgerRpcService.push is queued by handlePush on the follower; the EntryHandler thread then consumes and processes it:
@Override
public void doWork() {
try {
if (!memberState.isFollower()) {
waitForRunning(1);
return;
}
if (compareOrTruncateRequests.peek() != null) {
Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = compareOrTruncateRequests.poll();
PreConditions.check(pair != null, DLedgerResponseCode.UNKNOWN);
switch (pair.getKey().getType()) {
case TRUNCATE:
handleDoTruncate(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
break;
case COMPARE:
handleDoCompare(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());
break;
case COMMIT:
handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue());
break;
default:
break;
}
} else {
long nextIndex = dLedgerStore.getLedgerEndIndex() + 1;
Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(nextIndex);
if (pair == null) {
checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());
waitForRunning(1);
return;
}
PushEntryRequest request = pair.getKey();
if (request.isBatch()) {
handleDoBatchAppend(nextIndex, request, pair.getValue());
} else {
//handle the append request
handleDoAppend(nextIndex, request, pair.getValue());
}
}
} catch (Throwable t) {
DLedgerEntryPusher.logger.error("Error in {}", getName(), t);
DLedgerUtils.sleep(100);
}
}
appendAsFollower
The appendAsFollower flow mirrors appendAsLeader. On the follower, handleDoAppend validates the index and writes the pushed entry into local storage:
private void handleDoAppend(long writeIndex, PushEntryRequest request,
CompletableFuture<PushEntryResponse> future) {
try {
PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE);
//write the entry into the local store
DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());
PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
System.out.println(String.format("handleDoAppend: thread=%s, self=%s, nextIndex=%s, indexPos=%s",
Thread.currentThread().getName(), memberState.getSelfId(), entry.getIndex(), entry.getPos()));
updateCommittedIndex(request.getTerm(), request.getCommitIndex());
} catch (Throwable t) {
logger.error("[HandleDoWrite] writeIndex={}", writeIndex, t);
future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
}
}
Flow diagram
1. EntryDispatcher thread: replicates data to the followers
2. QuorumAckChecker thread: completes pending responses once a majority of peers have written successfully
3. EntryHandler thread: receives the replication requests sent by EntryDispatcher and writes the data
4. WaterMark (peerWaterMarksByTerm): the index each peer has replicated under the current term; a paraphrased update sketch follows the declaration below
In code it is stored as the map:
Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm
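updatePeerWaterMark only ever raises a peer's watermark. A paraphrased sketch of its effect (the real method differs in details such as precondition checks):
//paraphrased: raise peerId's watermark for this term, never lower it
private void updatePeerWaterMark(long term, String peerId, long index) {
synchronized (peerWaterMarksByTerm) {
ConcurrentMap<String, Long> marks = peerWaterMarksByTerm.computeIfAbsent(term, t -> new ConcurrentHashMap<>());
marks.merge(peerId, index, Math::max);
}
}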