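Cluster State Publishing
The cluster module encapsulates the tasks that run at the cluster level, such as cluster health, cluster-level metadata management, allocating shards to nodes, and node management. Executing a cluster task may produce a new cluster state; when it does, the master node broadcasts the new state to the other nodes.
The cluster state is encapsulated in the ClusterState class and supports incremental (diff-based) synchronization.
Cluster tasks are submitted mainly on the following occasions:
- Creating, deleting, opening, or closing an index
- Changes to index templates, mappings, or aliases
- The gateway module submitting the recovered cluster state after an election
- Snapshots
- Shard allocation
- Changes to the set of cluster nodes, and so on
The entry point for submitting a cluster task is the submitStateUpdateTask method of ClusterService. The first parameter is the source of the event and the second is the task to execute.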
public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener>
void submitStateUpdateTask(String source, T updateTask) {
submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);
}
public <T> void submitStateUpdateTask(String source, T task,
ClusterStateTaskConfig config,
ClusterStateTaskExecutor<T> executor,
ClusterStateTaskListener listener) {
submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);
}
public <T> void submitStateUpdateTasks(final String source,
final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor) {
masterService.submitStateUpdateTasks(source, tasks, config, executor);
}
The most representative task is ClusterStateUpdateTask, which implements ClusterStateTaskConfig, ClusterStateTaskExecutor, and ClusterStateTaskListener
public abstract class ClusterStateUpdateTask
implements ClusterStateTaskConfig, ClusterStateTaskExecutor<ClusterStateUpdateTask>, ClusterStateTaskListener {
ClusterStateTaskConfig carries the task's configuration: its timeout and its priority
TimeValue timeout();
Priority priority();
ClusterStateTaskExecutor defines the work to perform; its most important method is execute
ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;
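The current cluster state is passed in when the task runs. If the task produces a new cluster state it returns that new state; otherwise it returns the original state object unchanged.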
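The "return the same state when nothing changed" contract can be sketched with a toy immutable state. This is only an illustration: ToyState and CreateIndexSketch are hypothetical names, not Elasticsearch classes. The point is that the caller can detect a change with a reference comparison, which is what the `previousClusterState != newClusterState` checks later in this walkthrough rely on.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for an immutable cluster state (hypothetical, not the real ClusterState). */
final class ToyState {
    final long version;
    final List<String> indices;

    ToyState(long version, List<String> indices) {
        this.version = version;
        this.indices = List.copyOf(indices); // defensive, immutable copy
    }
}

/** Hypothetical task body that adds an index name if it is missing. */
final class CreateIndexSketch {
    static ToyState execute(ToyState current, String indexName) {
        if (current.indices.contains(indexName)) {
            return current; // no change: hand back the very same object
        }
        List<String> updated = new ArrayList<>(current.indices);
        updated.add(indexName);
        // The task does not touch the version; the caller bumps it later.
        return new ToyState(current.version, updated);
    }

    /** A change is detected by reference comparison, not by equals(). */
    static boolean changed(ToyState before, ToyState after) {
        return before != after;
    }
}
```

Because the unchanged case returns the identical object, the caller needs no deep comparison to decide whether anything must be published.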
ClusterStateTaskListener provides the callbacks that are invoked after a task has been submitted
/**
* A callback called when execute fails.
*/
void onFailure(String source, Exception e);
/**
* called when the task was rejected because the local node is no longer master.
* Used only for tasks submitted to {@link MasterService}.
*/
default void onNoLongerMaster(String source) {
onFailure(source, new NotMasterException("no longer master. source: [" + source + "]"));
}
/**
* Called when the result of the {@link ClusterStateTaskExecutor#execute(ClusterState, List)} have been processed
* properly by all listeners.
*/
default void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
}
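MasterService manages and runs cluster tasks. Submitted tasks go into an internal queue, and only the elected master node actually executes the queued tasks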
public <T> void submitStateUpdateTasks(final String source,
final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor) {
if (!lifecycle.started()) {
return;
}
final ThreadContext threadContext = threadPool.getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
threadContext.markAsSystemContext();
// wrap each task in an UpdateTask
List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream()
.map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor))
.collect(Collectors.toList());
// submit the tasks
taskBatcher.submitTasks(safeTasks, config.timeout());
} catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
// to be done here...
if (!lifecycle.stoppedOrClosed()) {
throw e;
}
}
}
public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
if (tasks.isEmpty()) {
return;
}
final BatchedTask firstTask = tasks.get(0);
assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) :
"tasks submitted in a batch should share the same batching key: " + tasks;
// convert to an identity map to check for dups based on task identity
final Map<Object, BatchedTask> tasksIdentity = tasks.stream().collect(Collectors.toMap(
BatchedTask::getTask,
Function.identity(),
(a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
IdentityHashMap::new));
synchronized (tasksPerBatchingKey) {
// get the set of tasks queued under this batching key, creating it if absent
LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey,
k -> new LinkedHashSet<>(tasks.size()));
// reject any task that is already queued under the same batching key
for (BatchedTask existing : existingTasks) {
// check that there won't be two tasks with the same identity for the same batching key
BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
if (duplicateTask != null) {
throw new IllegalStateException("task [" + duplicateTask.describeTasks(
Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued");
}
}
existingTasks.addAll(tasks);
}
// schedule execution
if (timeout != null) {
threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
} else {
threadExecutor.execute(firstTask);
}
}
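Duplicates are detected here by task identity, not by content. If the same task object appears twice in one submission, or is already queued under the same batching key and has not run yet, submitTasks throws an IllegalStateException rather than silently dropping it.
Tasks that share the same ClusterStateTaskExecutor instance (the batching key) are collected in the same set. A batch can grow in two ways: several tasks are submitted together, or a task arrives while earlier tasks for the same executor are still waiting. In both cases the whole batch is handed to the executor in a single execute call.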
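The batching and duplicate check can be sketched in isolation. The class below is a simplified, hypothetical model (BatcherSketch is not an Elasticsearch class, and it uses plain Object for tasks and keys); it only assumes what the code above shows: tasks are grouped by the identity of their batching key, and a task that is already queued is rejected.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class BatcherSketch {
    // batching key (the executor instance) -> tasks waiting for that executor
    private final Map<Object, List<Object>> pendingByKey = new IdentityHashMap<>();

    synchronized void submit(Object batchingKey, List<Object> tasks) {
        List<Object> queued = pendingByKey.computeIfAbsent(batchingKey, k -> new ArrayList<>());
        // Identity-based set: two tasks are "the same" only if they are the same object.
        Set<Object> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        seen.addAll(queued);
        for (Object task : tasks) {
            if (!seen.add(task)) {
                throw new IllegalStateException("task is already queued: " + task);
            }
        }
        queued.addAll(tasks); // only reached when no duplicate was found
    }

    /** Removes and returns the whole batch for one key, as a single unit of work. */
    synchronized List<Object> drain(Object batchingKey) {
        List<Object> batch = pendingByKey.remove(batchingKey);
        return batch == null ? new ArrayList<>() : batch;
    }
}
```

Draining the whole list at once is what lets one executor call process every task that accumulated for it.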
Each task is wrapped in an UpdateTask
class UpdateTask extends BatchedTask {
final ClusterStateTaskListener listener;
UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,
ClusterStateTaskExecutor<?> executor) {
super(priority, source, executor, task);
this.listener = listener;
}
@Override
public String describeTasks(List<? extends BatchedTask> tasks) {
return ((ClusterStateTaskExecutor<Object>) batchingKey).describeTasks(
tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));
}
}
When the task is handed to the thread pool, its run method is called
@Override
public void run() {
// run the task unless it has already been processed
runIfNotProcessed(this);
}
void runIfNotProcessed(BatchedTask updateTask) {
// tasks sharing a batching key run together in one batch, so each task is processed only once
if (updateTask.processed.get() == false) {
final List<BatchedTask> toExecute = new ArrayList<>();
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
synchronized (tasksPerBatchingKey) {
// take the pending tasks for this batching key
LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
if (pending != null) {
for (BatchedTask task : pending) {
if (task.processed.getAndSet(true) == false) {
logger.trace("will process {}", task);
// build the list of tasks to execute
toExecute.add(task);
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
} else {
logger.trace("skipping {}, already processed", task);
}
}
}
}
if (toExecute.isEmpty() == false) {
final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
// execute the batch
run(updateTask.batchingKey, toExecute, tasksSummary);
}
}
}
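The role of the processed flag can be shown with a minimal sketch. ProcessedFlagSketch and its Task class are hypothetical; the only thing taken from the code above is the `getAndSet(true) == false` idiom, which lets exactly one caller claim each task even if several runnables from the same batch reach this point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class ProcessedFlagSketch {
    static final class Task {
        final String name;
        final AtomicBoolean processed = new AtomicBoolean();

        Task(String name) {
            this.name = name;
        }
    }

    /** Claims every task that nobody has claimed yet and returns the claimed ones. */
    static List<Task> claim(List<Task> batch) {
        List<Task> toExecute = new ArrayList<>();
        for (Task task : batch) {
            // getAndSet returns the previous value: false means we are the first to claim it.
            if (task.processed.getAndSet(true) == false) {
                toExecute.add(task);
            }
        }
        return toExecute;
    }
}
```

A second call over the same batch returns an empty list, which mirrors why a later runnable of an already-drained batch does nothing.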
The logic that executes the tasks and publishes the cluster state lives in MasterService
@Override
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {
ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
// run the tasks and publish the cluster state
runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));
}
private void runTasks(TaskInputs taskInputs) {
final String summary = taskInputs.summary;
if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, master service not started", summary);
return;
}
logger.debug("executing cluster state update for [{}]", summary);
// the cluster state before the tasks run
final ClusterState previousClusterState = state();
// only run on the elected master
if (!previousClusterState.nodes().isLocalNodeElectedMaster() && taskInputs.runOnlyWhenMaster()) {
logger.debug("failing [{}]: local node is no longer master", summary);
taskInputs.onNoLongerMaster();
return;
}
final long computationStartTime = threadPool.relativeTimeInMillis();
// execute the tasks to compute the new cluster state
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);
taskOutputs.notifyFailedTasks();
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", summary);
if (taskOutputs.clusterStateUnchanged()) {
final long notificationStartTime = threadPool.relativeTimeInMillis();
taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
final TimeValue executionTime = getTimeSince(notificationStartTime);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
} else { // the cluster state changed
final ClusterState newClusterState = taskOutputs.newClusterState;
if (logger.isTraceEnabled()) {
logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
}
final long publicationStartTime = threadPool.relativeTimeInMillis();
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String nodesDeltaSummary = nodesDelta.shortSummary();
if (nodesDeltaSummary.length() > 0) {
logger.info("{}, term: {}, version: {}, delta: {}",
summary, newClusterState.term(), newClusterState.version(), nodesDeltaSummary);
}
}
logger.debug("publishing cluster state version [{}]", newClusterState.version());
// publish the cluster state
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
} catch (Exception e) {
handleException(summary, publicationStartTime, newClusterState, e);
}
}
}
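Before executing anything, runTasks checks that the local node is the elected master, because only the master may run cluster tasks. It then executes the tasks against the cluster state as it was before execution to produce the new cluster state.
calculateTaskOutputs executes the tasks, collects their results, and builds the new cluster state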
private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) {
// execute the submitted tasks and obtain the resulting cluster state
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState);
// bump the version numbers on the resulting state if it changed
ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
return new TaskOutputs(taskInputs, previousClusterState, newClusterState, getNonFailedTasks(taskInputs, clusterTasksResult),
clusterTasksResult.executionResults);
}
executeTasks extracts the task list and calls the executor's execute method
private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) {
ClusterTasksResult<Object> clusterTasksResult;
try {
List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
// execute the tasks and return the resulting cluster state
clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);
if (previousClusterState != clusterTasksResult.resultingState &&
previousClusterState.nodes().isLocalNodeElectedMaster() &&
(clusterTasksResult.resultingState.nodes().isLocalNodeElectedMaster() == false)) {
throw new AssertionError("update task submitted to MasterService cannot remove master");
}
} catch (Exception e) {
......
clusterTasksResult = ClusterTasksResult.builder()
.failures(taskInputs.updateTasks.stream().map(updateTask -> updateTask.task)::iterator, e)
.build(previousClusterState);
}
......
return clusterTasksResult;
}
Take the gateway recovering the cluster state as an example
ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;
@Override
public final ClusterTasksResult<ClusterStateUpdateTask> execute(ClusterState currentState, List<ClusterStateUpdateTask> tasks)
throws Exception {
// run the cluster state update task and return the resulting cluster state
ClusterState result = execute(currentState);
return ClusterTasksResult.<ClusterStateUpdateTask>builder().successes(tasks).build(result);
}
@Override
public void onSuccess(final ClusterState recoveredState) {
logger.trace("successful state recovery, importing cluster state...");
clusterService.submitStateUpdateTask("local-gateway-elected-state",
new RecoverStateUpdateTask() {
@Override
public ClusterState execute(final ClusterState currentState) {
final ClusterState updatedState = ClusterStateUpdaters.mixCurrentStateAndRecoveredState(currentState, recoveredState);
return super.execute(ClusterStateUpdaters.recoverClusterBlocks(updatedState));
}
});
}
@Override
public ClusterState execute(final ClusterState currentState) {
if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
logger.debug("cluster is already recovered");
return currentState;
}
// state recovery is complete
final ClusterState newState = Function.<ClusterState>identity()
.andThen(ClusterStateUpdaters::updateRoutingTable)
.andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock)
.apply(currentState);
// start allocating shards
return allocationService.reroute(newState, "state recovered");
}
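Once the task has produced its result (here, after shard allocation has been triggered), patchVersions builds the final new cluster state from the previous state and that result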
private ClusterState patchVersions(ClusterState previousClusterState, ClusterTasksResult<?> executionResult) {
// the state produced by the tasks
ClusterState newClusterState = executionResult.resultingState;
if (previousClusterState != newClusterState) {
// only the master controls the version numbers
// generate the new, incremented cluster state version
Builder builder = incrementVersion(newClusterState);
// the routing table changed, i.e. the shard-to-node assignment changed
if (previousClusterState.routingTable() != newClusterState.routingTable()) {
builder.routingTable(RoutingTable.builder(newClusterState.routingTable())
.version(newClusterState.routingTable().version() + 1).build());
}
// the cluster metadata changed
if (previousClusterState.metadata() != newClusterState.metadata()) {
builder.metadata(Metadata.builder(newClusterState.metadata()).version(newClusterState.metadata().version() + 1));
}
newClusterState = builder.build();
}
return newClusterState;
}
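The version-patching rule can be reduced to a small sketch. PatchVersionsSketch and its State class are hypothetical simplifications with a single versioned sub-structure standing in for the routing table; the rule itself (bump the top-level version on any change, bump a sub-version only if that part was replaced) is taken from the method above.

```java
final class PatchVersionsSketch {
    /** Toy state: a top-level version plus one versioned sub-structure. */
    static final class State {
        final long version;
        final Object routing;      // stands in for the routing table
        final long routingVersion;

        State(long version, Object routing, long routingVersion) {
            this.version = version;
            this.routing = routing;
            this.routingVersion = routingVersion;
        }
    }

    static State patch(State previous, State result) {
        if (previous == result) {
            return result; // nothing changed, keep every version as it is
        }
        // Bump the sub-version only when that part was replaced by a new object.
        long routingVersion = previous.routing != result.routing
            ? result.routingVersion + 1
            : result.routingVersion;
        return new State(result.version + 1, result.routing, routingVersion);
    }
}
```

Keeping version assignment in one place means individual tasks never have to manage version numbers themselves.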
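Back in MasterService's runTasks, the new cluster state has now been built and returned. runTasks checks whether it differs from the previous state; if it does, the publishing phase begins and the latest cluster state is broadcast to all nodes
// publish the cluster state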
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) {
final PlainActionFuture<Void> fut = new PlainActionFuture<Void>() {
@Override
protected boolean blockingAllowed() {
return isMasterUpdateThread() || super.blockingAllowed();
}
};
// publish the cluster state
clusterStatePublisher.publish(clusterChangedEvent, fut, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()));
// indefinitely wait for publication to complete
try {
FutureUtils.get(fut);
onPublicationSuccess(clusterChangedEvent, taskOutputs);
} catch (Exception e) {
onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeMillis, e);
}
}
@Override
public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void> publishListener, AckListener ackListener) {
// the new cluster state
ClusterState newState = clusterChangedEvent.state();
assert newState.getNodes().isLocalNodeElectedMaster() : "Shouldn't publish state when not master " + clusterChangedEvent.source();
try {
// state got changed locally (maybe because another master published to us)
if (clusterChangedEvent.previousState() != this.committedState.get()) {
throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update");
}
pendingStatesQueue.addPending(newState);
// publish the cluster state
publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
} catch (FailedToCommitClusterStateException t) {
// cluster service logs a WARN message
logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",
newState.version(), electMaster.minimumMasterNodes());
synchronized (stateMutex) {
pendingStatesQueue.failAllStatesAndClear(
new ElasticsearchException("failed to publish cluster state"));
rejoin("zen-disco-failed-to-publish");
}
publishListener.onFailure(t);
return;
}
final DiscoveryNode localNode = newState.getNodes().getLocalNode();
final AtomicBoolean processedOrFailed = new AtomicBoolean();
pendingStatesQueue.markAsCommitted(newState.stateUUID(),
new PendingClusterStatesQueue.StateProcessedListener() {
@Override
public void onNewClusterStateProcessed() {
processedOrFailed.set(true);
publishListener.onResponse(null);
ackListener.onNodeAck(localNode, null);
}
@Override
public void onNewClusterStateFailed(Exception e) {
processedOrFailed.set(true);
publishListener.onFailure(e);
ackListener.onNodeAck(localNode, e);
logger.warn(() -> new ParameterizedMessage(
"failed while applying cluster state locally [{}]", clusterChangedEvent.source()), e);
}
});
synchronized (stateMutex) {
if (clusterChangedEvent.previousState() != this.committedState.get()) {
publishListener.onFailure(
new FailedToCommitClusterStateException("local state was mutated while CS update was published to other nodes")
);
return;
}
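// the two-phase commit has published the state to the cluster, although not every node is guaranteed to have succeeded; now process the committed state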
boolean sentToApplier = processNextCommittedClusterState("master " + newState.nodes().getMasterNode() +
" committed version [" + newState.version() + "] source [" + clusterChangedEvent.source() + "]");
if (sentToApplier == false && processedOrFailed.get() == false) {
assert false : "cluster state published locally neither processed nor failed: " + newState;
logger.warn("cluster state with version [{}] that is published locally has neither been processed nor failed",
newState.version());
publishListener.onFailure(new FailedToCommitClusterStateException("cluster state that is published locally has neither " +
"been processed nor failed"));
}
}
}
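publish first builds the list of target nodes, excluding the local node. It then prepares either a diff or a full copy of the cluster state, and serializes and compresses it so that it can be sent out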
public void publish(final ClusterChangedEvent clusterChangedEvent, final int minMasterNodes,
final Discovery.AckListener ackListener) throws FailedToCommitClusterStateException {
final DiscoveryNodes nodes;
final SendingController sendingController;
final Set<DiscoveryNode> nodesToPublishTo;
final Map<Version, BytesReference> serializedStates;
final Map<Version, BytesReference> serializedDiffs;
final boolean sendFullVersion;
try {
// the nodes to publish to
nodes = clusterChangedEvent.state().nodes();
nodesToPublishTo = new HashSet<>(nodes.getSize());
DiscoveryNode localNode = nodes.getLocalNode();
final int totalMasterNodes = nodes.getMasterNodes().size();
for (final DiscoveryNode node : nodes) {
if (node.equals(localNode) == false) {
nodesToPublishTo.add(node);
}
}
sendFullVersion = !discoverySettings.getPublishDiff() || clusterChangedEvent.previousState() == null;
// full states
serializedStates = new HashMap<>();
// diffs
serializedDiffs = new HashMap<>();
// we build these early as a best effort not to commit in the case of error.
// sadly this is not water tight as it may that a failed diff based publishing to a node
// will cause a full serialization based on an older version, which may fail after the
// change has been committed.
// build the serialized payloads
buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(),
nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs);
// handler for the publish responses
final BlockingClusterStatePublishResponseHandler publishResponseHandler =
new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener);
sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes,
totalMasterNodes, publishResponseHandler);
} catch (Exception e) {
throw new FailedToCommitClusterStateException("unexpected error while preparing to publish", e);
}
try {
// publish
innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, ackListener, sendFullVersion, serializedStates,
serializedDiffs);
} catch (FailedToCommitClusterStateException t) {
throw t;
} catch (Exception e) {
// try to fail committing, in cause it's still on going
if (sendingController.markAsFailed("unexpected error", e)) {
// signal the change should be rejected
throw new FailedToCommitClusterStateException("unexpected error", e);
} else {
throw e;
}
}
}
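Full states are stored in serializedStates and diffs in serializedDiffs, each keyed by the target node's version. Every cluster state has its own version number, which allows only the diff to be sent between adjacent versions.
buildDiffAndSerializeStates builds the payloads to send. If full publishing is configured, or the target node was not part of the previous cluster state, it builds the full state; otherwise it builds a diff. Either way the result is serialized and compressed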
private void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState, Set<DiscoveryNode> nodesToPublishTo,
boolean sendFullVersion, Map<Version, BytesReference> serializedStates,
Map<Version, BytesReference> serializedDiffs) {
Diff<ClusterState> diff = null;
for (final DiscoveryNode node : nodesToPublishTo) {
try {
// send the full state
if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
// will send a full reference
if (serializedStates.containsKey(node.getVersion()) == false) {
serializedStates.put(node.getVersion(), serializeFullClusterState(clusterState, node.getVersion()));
}
} else {
// will send a diff
if (diff == null) {
diff = clusterState.diff(previousState);
}
if (serializedDiffs.containsKey(node.getVersion()) == false) {
serializedDiffs.put(node.getVersion(), serializeDiffClusterState(diff, node.getVersion()));
}
}
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster_state for publishing to node {}", e, node);
}
}
}
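The "serialize once per node version" idea can be sketched as follows. Everything here is a hypothetical simplification: SerializeOnceSketch is not an Elasticsearch class, node versions are plain strings, and the payload is a placeholder string instead of compressed bytes. It only illustrates the full-versus-diff decision and the per-version caches.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SerializeOnceSketch {
    static final class Node {
        final String id;
        final String wireVersion;

        Node(String id, String wireVersion) {
            this.id = id;
            this.wireVersion = wireVersion;
        }
    }

    int serializations = 0; // how many times we actually "serialized"
    final Map<String, String> fullByVersion = new HashMap<>();
    final Map<String, String> diffByVersion = new HashMap<>();

    void prepare(List<Node> targets, Set<String> previousNodeIds, boolean sendFull) {
        for (Node node : targets) {
            // A node unknown to the previous state has no base to apply a diff to.
            boolean full = sendFull || !previousNodeIds.contains(node.id);
            Map<String, String> cache = full ? fullByVersion : diffByVersion;
            if (!cache.containsKey(node.wireVersion)) {
                serializations++;
                cache.put(node.wireVersion, (full ? "full@" : "diff@") + node.wireVersion);
            }
        }
    }
}
```

With three targets on the same version, two of them known to the previous state, this serializes one diff and one full state instead of three payloads.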
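Elasticsearch publishes the state with a two-phase commit. The first phase is the send (push) phase: the state is sent to the nodes but not applied. Once enough master-eligible nodes have acknowledged it (the minimum_master_nodes quorum, normally a majority of master-eligible nodes), the second phase begins and a commit request is sent. Two-phase commit cannot guarantee that a node applies the state correctly after receiving the commit: it only guarantees that the commit request was sent, not whether applying the state on an individual node succeeds or fails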
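A synchronous, heavily simplified sketch of the two phases follows. TwoPhaseSketch is hypothetical: real publishing is asynchronous and goes through the transport layer, and the parameter requiredMasterAcks is an assumed stand-in for the number of remote master-eligible acknowledgements still needed. The sketch only shows that a state is held as pending until the commit arrives, and that no commit is sent without enough master-eligible acks.

```java
import java.util.ArrayList;
import java.util.List;

final class TwoPhaseSketch {
    static final class Node {
        final boolean masterEligible;
        final boolean reachable;
        String pending; // received in the send phase, not applied yet
        String applied; // set only when the commit arrives

        Node(boolean masterEligible, boolean reachable) {
            this.masterEligible = masterEligible;
            this.reachable = reachable;
        }

        boolean onSend(String state) {
            if (!reachable) {
                return false; // no ack from this node
            }
            pending = state;
            return true;
        }

        void onCommit() {
            applied = pending;
            pending = null;
        }
    }

    /** Returns true if the state was committed. */
    static boolean publish(String state, List<Node> nodes, int requiredMasterAcks) {
        List<Node> acked = new ArrayList<>();
        int masterAcks = 0;
        for (Node node : nodes) { // phase 1: send, collect acks
            if (node.onSend(state)) {
                acked.add(node);
                if (node.masterEligible) {
                    masterAcks++;
                }
            }
        }
        if (masterAcks < requiredMasterAcks) {
            return false; // not committed: pending copies are never applied
        }
        for (Node node : acked) { // phase 2: commit to every node that acked
            node.onCommit();
        }
        return true;
    }
}
```

Only acks from master-eligible nodes count toward the commit decision, while every node that acked receives the commit.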
- Send (push) phase: sending the cluster state data
private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,
final SendingController sendingController, final Discovery.AckListener ackListener,
final boolean sendFullVersion, final Map<Version, BytesReference> serializedStates,
final Map<Version, BytesReference> serializedDiffs) {
final ClusterState clusterState = clusterChangedEvent.state();
final ClusterState previousState = clusterChangedEvent.previousState();
// publish timeout
final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
// publish start time
final long publishingStartInNanos = System.nanoTime();
// iterate over the nodes and asynchronously send the full state or the diff
for (final DiscoveryNode node : nodesToPublishTo) {
// try and serialize the cluster state once (or per version), so we don't serialize it
// per node when we send it over the wire, compress it while we are at it...
// we don't send full version if node didn't exist in the previous version of cluster state
// send the full state
if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
} else {
// send the diff
sendClusterStateDiff(clusterState, serializedDiffs, serializedStates, node, publishTimeout, sendingController);
}
}
// wait for the commit: the first phase ends when enough acks have arrived or the commit timeout expires
sendingController.waitForCommit(discoverySettings.getCommitTimeout());
final long commitTime = System.nanoTime() - publishingStartInNanos;
ackListener.onCommit(TimeValue.timeValueNanos(commitTime));
try {
long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - commitTime);
final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler();
sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));
if (sendingController.getPublishingTimedOut()) {
DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes();
// everyone may have just responded
if (pendingNodes.length > 0) {
logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})",
clusterState.version(), publishTimeout, pendingNodes);
}
}
// The failure is logged under debug when a sending failed. we now log a summary.
Set<DiscoveryNode> failedNodes = publishResponseHandler.getFailedNodes();
if (failedNodes.isEmpty() == false) {
logger.warn("publishing cluster state with version [{}] failed for the following nodes: [{}]",
clusterChangedEvent.state().version(), failedNodes);
}
} catch (InterruptedException e) {
// ignore & restore interrupt
Thread.currentThread().interrupt();
}
}
Whether the full state or a diff is sent, the call ends up in sendClusterStateToNode
private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes,
final DiscoveryNode node,
final TimeValue publishTimeout,
final SendingController sendingController,
final boolean sendDiffs, final Map<Version, BytesReference> serializedStates) {
try {
// send through the underlying transport layer
transportService.sendRequest(node, SEND_ACTION_NAME,
new BytesTransportRequest(bytes, node.getVersion()),
stateRequestOptions,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
// the publish had already timed out
if (sendingController.getPublishingTimedOut()) {
logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node,
clusterState.version(), publishTimeout);
}
// record the ack; once enough master-eligible nodes have acked, the commit is sent
sendingController.onNodeSendAck(node);
}
@Override
public void handleException(TransportException exp) {
if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
} else {
logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp);
sendingController.onNodeSendFailed(node, exp);
}
}
});
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
sendingController.onNodeSendFailed(node, e);
}
}
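The data is sent asynchronously through transportService's sendRequest method. The transport action is internal:discovery/zen/publish/send, and the handler registered on the receiving node is SendClusterStateRequestHandler
// register the handler for the send action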
transportService.registerRequestHandler(SEND_ACTION_NAME, ThreadPool.Names.SAME, false, false, BytesTransportRequest::new,
new SendClusterStateRequestHandler());
private class SendClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {
@Override
public void messageReceived(BytesTransportRequest request, final TransportChannel channel, Task task) throws Exception {
// handle the incoming cluster state
handleIncomingClusterStateRequest(request, channel);
}
}
protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {
Compressor compressor = CompressorFactory.compressor(request.bytes());
StreamInput in = request.bytes().streamInput();
final ClusterState incomingState;
synchronized (lastSeenClusterStateMutex) {
try {
if (compressor != null) {
in = compressor.streamInput(in);
}
in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);
in.setVersion(request.version());
// If true we received full cluster state - otherwise diffs
// true: full state, false: diff
if (in.readBoolean()) {
incomingState = ClusterState.readFrom(in, transportService.getLocalNode());
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),
request.bytes().length());
} else if (lastSeenClusterState != null) {
Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode());
incomingState = diff.apply(lastSeenClusterState);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",
incomingState.version(), incomingState.stateUUID(), request.bytes().length());
} else {
logger.debug("received diff for but don't have any local cluster state - requesting full state");
throw new IncompatibleClusterStateVersionException("have no local cluster state");
}
} catch (IncompatibleClusterStateVersionException e) {
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw e;
} catch (Exception e) {
logger.warn("unexpected error while deserializing an incoming cluster state", e);
throw e;
} finally {
IOUtils.close(in);
}
// notify the listener
incomingClusterStateListener.onIncomingClusterState(incomingState);
lastSeenClusterState = incomingState;
}
// send back an empty response
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
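The node stores the received cluster state and returns an empty response. If a diff arrives but cannot be applied, it throws IncompatibleClusterStateVersionException and the master resends the full state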
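The receiver's full-versus-diff handling and the sender's fallback can be sketched together. IncomingStateSketch is hypothetical: states are plain strings, "applying a diff" is modelled as string concatenation, and IllegalStateException stands in for IncompatibleClusterStateVersionException.

```java
final class IncomingStateSketch {
    private String lastSeen; // null until a full state has been received

    /** Rebuilds the incoming state, or throws if a diff arrives without a base. */
    String receive(boolean isFull, String payload) {
        String incoming;
        if (isFull) {
            incoming = payload;
        } else if (lastSeen != null) {
            incoming = lastSeen + payload; // toy "apply diff": append the delta
        } else {
            throw new IllegalStateException("have no local cluster state");
        }
        lastSeen = incoming; // remember it as the base for the next diff
        return incoming;
    }

    /** Sender side: try the diff first, fall back to the full state on rejection. */
    static String sendWithFallback(IncomingStateSketch receiver, String full, String diff) {
        try {
            return receiver.receive(false, diff);
        } catch (IllegalStateException e) {
            return receiver.receive(true, full);
        }
    }
}
```

A fresh receiver rejects the first diff and gets the full state; after that, diffs apply against the remembered base.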
Back on the master, the callback for the send request checks whether enough responses have arrived
public synchronized void onNodeSendAck(DiscoveryNode node) {
if (committed) { // already committed: send the commit to this node right away
assert sendAckedBeforeCommit.isEmpty();
sendCommitToNode(node, clusterState, this);
} else if (committedOrFailed()) {
logger.trace("ignoring ack from [{}] for cluster state version [{}]. already failed", node, clusterState.version());
} else {
// we're still waiting
sendAckedBeforeCommit.add(node);
if (node.isMasterNode()) {
checkForCommitOrFailIfNoPending(node);
}
}
}
// count acks from master-eligible nodes; once enough have arrived, commit
private synchronized void checkForCommitOrFailIfNoPending(DiscoveryNode masterNode) {
logger.trace("master node {} acked cluster state version [{}]. processing ... (current pending [{}], needed [{}])",
masterNode, clusterState.version(), pendingMasterNodes, neededMastersToCommit);
neededMastersToCommit--;
if (neededMastersToCommit == 0) {
if (markAsCommitted()) {
for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) {
sendCommitToNode(nodeToCommit, clusterState, this);
}
sendAckedBeforeCommit.clear();
}
}
decrementPendingMasterAcksAndChangeForFailure();
}
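The ack bookkeeping can be sketched as a small counter. CommitCounterSketch is a hypothetical reduction of the controller above: it assumes at least one master-eligible ack is required, records node ids as strings, and replaces the actual commit RPC with an entry in a list. Timeouts and failure handling are left out.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitCounterSketch {
    private int neededMasterAcks; // assumed to be at least 1 in this sketch
    private boolean committed;
    private final List<String> ackedBeforeCommit = new ArrayList<>();
    final List<String> commitsSent = new ArrayList<>(); // stands in for commit RPCs

    CommitCounterSketch(int neededMasterAcks) {
        this.neededMasterAcks = neededMasterAcks;
    }

    synchronized void onAck(String nodeId, boolean masterEligible) {
        if (committed) {
            commitsSent.add(nodeId); // late ack: commit to this node immediately
            return;
        }
        ackedBeforeCommit.add(nodeId); // still waiting: remember who acked
        if (masterEligible && --neededMasterAcks == 0) {
            committed = true;
            commitsSent.addAll(ackedBeforeCommit); // flush the buffered nodes
            ackedBeforeCommit.clear();
        }
    }
}
```

Buffering early acks and flushing them at commit time means every acking node gets exactly one commit, regardless of whether it answered before or after the threshold was reached.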
- Commit phase
Once enough responses have been received, the commit logic runs
private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) {
try {
logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]",
clusterState.stateUUID(), clusterState.version(), node);
transportService.sendRequest(node, COMMIT_ACTION_NAME,
new CommitClusterStateRequest(clusterState.stateUUID()),
stateRequestOptions,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
if (sendingController.getPublishingTimedOut()) {
logger.debug("node {} responded to cluster state commit [{}]", node, clusterState.version());
}
sendingController.getPublishResponseHandler().onResponse(node);
}
@Override
public void handleException(TransportException exp) {
logger.debug(() -> new ParameterizedMessage("failed to commit cluster state (uuid [{}], version [{}]) to {}",
clusterState.stateUUID(), clusterState.version(), node), exp);
sendingController.getPublishResponseHandler().onFailure(node, exp);
}
});
} catch (Exception t) {
logger.warn(() -> new ParameterizedMessage("error sending cluster state commit (uuid [{}], version [{}]) to {}",
clusterState.stateUUID(), clusterState.version(), node), t);
sendingController.getPublishResponseHandler().onFailure(node, t);
}
}
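The commit is likewise sent as an RPC through transportService; the transport action name is internal:discovery/zen/publish/commit
The handler registered on the receiving node is CommitClusterStateRequestHandler
// register the handler for the commit action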
transportService.registerRequestHandler(COMMIT_ACTION_NAME, ThreadPool.Names.SAME, false, false, CommitClusterStateRequest::new,
new CommitClusterStateRequestHandler());
// handles commit requests for a cluster state
private class CommitClusterStateRequestHandler implements TransportRequestHandler<CommitClusterStateRequest> {
@Override
public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel, Task task) throws Exception {
handleCommitRequest(request, channel);
}
}
The node applies the cluster state
@Override
public void onClusterStateCommitted(String stateUUID, ActionListener<Void> processedListener) {
// mark the pending state as committed
final ClusterState state = pendingStatesQueue.markAsCommitted(stateUUID,
new PendingClusterStatesQueue.StateProcessedListener() {
@Override
public void onNewClusterStateProcessed() {
processedListener.onResponse(null);
}
@Override
public void onNewClusterStateFailed(Exception e) {
processedListener.onFailure(e);
}
});
if (state != null) {
synchronized (stateMutex) {
// apply the new cluster state
processNextCommittedClusterState("master " + state.nodes().getMasterNode() +
" committed version [" + state.version() + "]");
}
}
}
// hand the new cluster state to the cluster applier
clusterApplier.onNewClusterState("apply cluster state (from master [" + reason + "])",
this::clusterState,
new ClusterApplyListener() {
@Override
public void onSuccess(String source) {
try {
pendingStatesQueue.markAsProcessed(newClusterState);
} catch (Exception e) {
onFailure(source, e);
}
}
@Override
public void onFailure(String source, Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected failure applying [{}]", reason), e);
try {
// TODO: use cluster state uuid instead of full cluster state so that we don't keep reference to CS around
// for too long.
pendingStatesQueue.markAsFailed(newClusterState, e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.error(() -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
}
}
});
The call eventually reaches the runTask method of ClusterApplierService
private void runTask(UpdateTask task) {
if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
return;
}
logger.debug("processing [{}]: execute", task.source);
// the previous cluster state
final ClusterState previousClusterState = state.get();
// task start time
long startTimeMS = currentTimeInMillis();
// a simple stopwatch that can time multiple tasks
final StopWatch stopWatch = new StopWatch();
final ClusterState newClusterState;
try {
try (Releasable ignored = stopWatch.timing("running task [" + task.source + ']')) {
newClusterState = task.apply(previousClusterState);
}
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
logger.trace(() -> new ParameterizedMessage(
"failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",
executionTime, previousClusterState.version(), task.source, previousClusterState), e);
warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
task.listener.onFailure(task.source, e);
return;
}
if (previousClusterState == newClusterState) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
task.listener.onSuccess(task.source);
} else {
if (logger.isTraceEnabled()) {
logger.debug("cluster state updated, version [{}], source [{}]\n{}", newClusterState.version(), task.source,
newClusterState);
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);
}
try {
// apply the state change
applyChanges(task, previousClusterState, newClusterState, stopWatch);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source,
executionTime, newClusterState.version(),
newClusterState.stateUUID());
warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
task.listener.onSuccess(task.source);
} catch (Exception e) {
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
assert applicationMayFail();
task.listener.onFailure(task.source, e);
}
}
}
applyChanges iterates over all cluster state appliers and calls each applier's applyClusterState method
// call the cluster state appliers
callClusterStateAppliers(clusterChangedEvent, stopWatch);
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
clusterStateAppliers.forEach(applier -> {
logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());
try (Releasable ignored = stopWatch.timing("running applier [" + applier + "]")) {
applier.applyClusterState(clusterChangedEvent);
}
});
}
It then iterates over all cluster state listeners and calls each listener's clusterChanged callback
// call the cluster state listeners
callClusterStateListeners(clusterChangedEvent, stopWatch);
// run the callbacks after the cluster state has changed
private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {
Stream.concat(clusterStateListeners.stream(), timeoutClusterStateListeners.stream()).forEach(listener -> {
try {
logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
try (Releasable ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
listener.clusterChanged(clusterChangedEvent);
}
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateListener", ex);
}
});
}
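The difference in error handling between the two loops above is easy to miss: an exception from an applier propagates, while an exception from a listener is caught and logged. The sketch below models only that difference. ApplySketch is hypothetical, uses Consumer of String in place of the real applier and listener interfaces, and counts listener failures instead of logging them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ApplySketch {
    final List<Consumer<String>> appliers = new ArrayList<>();
    final List<Consumer<String>> listeners = new ArrayList<>();
    int listenerFailures = 0;

    void applyChanges(String newState) {
        // Appliers run first; an exception here aborts the whole application.
        for (Consumer<String> applier : appliers) {
            applier.accept(newState);
        }
        // Listeners run afterwards; one failing listener must not stop the others.
        for (Consumer<String> listener : listeners) {
            try {
                listener.accept(newState);
            } catch (RuntimeException e) {
                listenerFailures++;
            }
        }
    }
}
```

Appliers do the work of bringing the node in line with the new state, so their failures matter to the caller, whereas listeners are notifications and are isolated from each other.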
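Back on the master, the handleResponse and handleException callbacks of the commit request do essentially the same thing: each counts the latch down by one. If a node fails to apply the state, no repair logic is run.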
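The latch behaviour can be sketched with java.util.concurrent.CountDownLatch. AckLatchSketch is a hypothetical stand-in for the publish response handler: success and failure both count down, failures are only recorded, and the waiter just learns whether every node answered within the timeout.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class AckLatchSketch {
    private final CountDownLatch latch;
    private final Set<String> failedNodes = ConcurrentHashMap.newKeySet();

    AckLatchSketch(int nodeCount) {
        this.latch = new CountDownLatch(nodeCount);
    }

    void onResponse(String node) {
        latch.countDown();
    }

    void onFailure(String node, Exception e) {
        failedNodes.add(node); // remembered for logging only, nothing is retried
        latch.countDown();
    }

    /** Returns true if every node responded (successfully or not) before the timeout. */
    boolean awaitAllNodes(long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return false;
        }
    }

    Set<String> failedNodes() {
        return failedNodes;
    }
}
```

Counting down on failure as well keeps the master from waiting for the full timeout on a node that has already reported an error.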