Elasticsearch底层原理分析——新建、索引文档

news2024/11/18 9:40:44

es版本

8.1.0

重要概念回顾

Elasticsearch Node的角色

与下文流程相关的角色介绍:

Node Roles配置主要功能说明
masternode.roles: [ master ]有资格参与选举成为master节点,从而进行集群范围的管理工作,如创建或删除索引、跟踪哪些节点是集群的一部分以及决定将哪些分片分配给哪些节点等
datanode.roles: [ data ]数据节点保存已索引的文档的分片。处理数据相关操作,例如 CRUD、搜索和聚合。
node.roles: [ ]节点不填任何角色,则是协调节点,换言之每个节点,也都有协调节点功能。具备路由请求、对搜索结果合并和分发批量索引等功能。本质上,协调节点的行为就像智能负载均衡器

详见:https://www.elastic.co/guide/en/elasticsearch/reference/8.9/modules-node.html

分片

  • 一个分片是一个 Lucene 的实例,是一个完整的搜索引擎
  • 主分片的数量决定了索引最多能存储多少数据,多分片机制,带来存储量级提升
  • 主分片数不可更改,和数据路由算法有关
  • 副本分片可以防止硬件故障导致的数据丢失,同时可以提供读请求,增大能处理的搜索吞吐量
  • 对文档的新建、索引和删除请求都是写操作,必须在主分片上面完成,之后才能被复制到相关的副本分片

https://www.elastic.co/guide/cn/elasticsearch/guide/current/_add-an-index.html

新建、索引和删除文档

以官网(https://www.elastic.co/guide/cn/elasticsearch/guide/current/distrib-write.html)例子分析,es集群有3个节点,其中有个索引有两分片(P0、P1),两副本(P0、R0、R0,P1、R1、R1),如创建索引时:

PUT /blogs
{
   "settings" : {
      "number_of_shards" : 2,
      "number_of_replicas" : 2
   }
}

在这里插入图片描述

再对一些前提知识回顾一下:

  • 每个节点都具备协调节点功能,也即路由请求、对搜索结果合并和分发批量索引等功能
  • 对文档的新建、索引和删除请求等写操作,必须在主分片上面完成,之后才能被复制到相关的副本分片

这个例子中的两个假设:

  • 请求集群时,es采用的是随机轮询方法进行负载均衡,每个节点都有可能被请求到。在这个例子中,假设先请求到node1
  • 节点使用文档的 _id 确定文档属于分片 0

所以(直接引用官网步骤):

  • 客户端向 Node 1 发送新建、索引或者删除请求。
  • 节点使用文档的 _id 确定文档属于分片 0 。请求会被转发到 Node 3,因为分片 0 的主分片目前被分配在 Node 3 上。
  • Node 3 在主分片上面执行请求。如果成功了,它将请求并行转发到 Node 1 和 Node 2 的副本分片上。一旦所有的副本分片都报告成功, Node 3 将向协调节点报告成功,协调节点向客户端报告成功。

源码理解

如何确定文档属于哪个分片,请求转发哪个节点

获取分片ID是从TransportBulkAction类中开始调用开始

int shardId = docWriteRequest.route(indexRouting);

具体实现在IndexRouting类中。简述步骤就是:

  1. 对routing值进行Murmur3Hash运算(如果没有设置routing,值默认是doc id值)
  2. 对hash后的值进行取模运算,routingNumShards默认1024,routingFactor默认512
protected int shardId(String id, @Nullable String routing) {
    return hashToShardId(effectiveRoutingToHash(routing == null ? id : routing));
}

protected final int hashToShardId(int hash) {
   return Math.floorMod(hash, routingNumShards) / routingFactor;
}

private static int effectiveRoutingToHash(String effectiveRouting) {
    return Murmur3HashFunction.hash(effectiveRouting);
}
为何需要路由,以及路由带来什么问题
  1. 为何需要路由
    总的来说,就是多分片设计,可以承载更大量级数据,而分片预分配设计,可以简单的获取文档位置,能减少数据分裂风险,以及对数据重新索引友好
    https://www.elastic.co/guide/cn/elasticsearch/guide/current/overallocation.html
  2. 带来的问题:

创建索引的时候就需要确定好主分片的数量,并且永远不会改变这个数量。因为如果数量变化了,那么所有之前路由的值都会无效,文档也再也找不到了。
https://www.elastic.co/guide/cn/elasticsearch/guide/current/routing-value.html

如何根据分片ID确定节点

代码在TransportReplicationAction#doRun方法中,简单概括就是state中存有集群信息,通过传入分片ID,先获取主分片信息,再通过主分片节点ID,获取对应节点信息。

final ShardRouting primary = state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
if (primary == null || primary.active() == false) {
    logger.trace(
        "primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
            + "cluster state version [{}]",
        request.shardId(),
        actionName,
        request,
        state.version()
    );
    retryBecauseUnavailable(request.shardId(), "primary shard is not active");
    return;
}
if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
    logger.trace(
        "primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
            + "cluster state version [{}]",
        request.shardId(),
        primary.currentNodeId(),
        actionName,
        request,
        state.version()
    );
    retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
    return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
    performLocalAction(state, primary, node, indexMetadata);
} else {
    performRemoteAction(state, primary, node);
}
主分片执行流程
1. 写一致性

默认写成功一个主分片即可,源码在ActiveShardCount#enoughShardsActive方法中

https://www.elastic.co/guide/en/elasticsearch/client/curator/current/option_wait_for_active_shards.html

    public boolean enoughShardsActive(final IndexShardRoutingTable shardRoutingTable) {
        final int activeShardCount = shardRoutingTable.activeShards().size();
        if (this == ActiveShardCount.ALL) {
            // adding 1 for the primary in addition to the total number of replicas,
            // which gives us the total number of shard copies
            return activeShardCount == shardRoutingTable.replicaShards().size() + 1;
        } else if (this == ActiveShardCount.DEFAULT) {
            return activeShardCount >= 1;
        } else {
            return activeShardCount >= value;
        }
    }
2. 具体写流程

参考官网(https://www.elastic.co/guide/cn/elasticsearch/guide/current/translog.html)理解。图片所示是一个lucene索引,lucene索引下面有三个段(segment),图中Searchable表示从内存(In-memory buffer,也叫Indexing Buffer)刷新到磁盘,写入物理文件,不可更改,其中fsync操作将新文档刷新到磁盘的操作,性能代价是很大的。所以会先将文档写入文件系统缓存中,也即图中In-memory buffer中,对应的是 Indexing Buffer(https://www.elastic.co/guide/en/elasticsearch/reference/8.10/indexing-buffer.html)。
所以流程是:

  • 将文档写入Indexing Buffer中
  • 将操作追加写入 translog 中,以便确保即便在刷盘时异常,也能从失败中恢复数据
  • 将内存中的数据刷新持久化到磁盘中(默认情况下每个分片会每秒自动刷新一次)
  • 在刷新(flush)之后,段被全量提交,并且事务日志被清空

在这里插入图片描述

在这里插入图片描述

相关源码

主要在InternalEngine类中:

  • index方法包含写入In-memory buffer对应生成IndexResult(?)和写translog;
  • 将内存中的数据刷新持久化到磁盘中在refresh方法;
  • 段被全量提交,和事务日志被清空在flush方法。
index方法
    public IndexResult index(Index index) throws IOException {
        // 确保传入的文档的唯一标识是 IdFieldMapper
        assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
        // 检查 index 的来源是否不是恢复操作
        final boolean doThrottle = index.origin().isRecovery() == false;
        // 获取读锁
        try (ReleasableLock releasableLock = readLock.acquire()) {
            // 确保引擎处于打开状态
            ensureOpen();
            // 断言传入的 index 的序列号符合预期
            assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
            int reservedDocs = 0;
            try (
                Releasable ignored = versionMap.acquireLock(index.uid().bytes());
                Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}
            ) {
                lastWriteNanos = index.startTime();
                // 代码中有一段注释,描述了关于追加(append-only)优化的注意事项。根据注释所述,如果引擎接收到一个带有自动生成的ID的文档,
                // 可以优化处理并直接使用 addDocument 而不是 updateDocument,从而跳过版本和索引查找。此外,还使用文档的时间戳来检测是否可能已经添加过该文档。
                // 获取索引策略
                final IndexingStrategy plan = indexingStrategyForOperation(index);
                reservedDocs = plan.reservedDocs;

                final IndexResult indexResult;
                if (plan.earlyResultOnPreFlightError.isPresent()) {
                    assert index.origin() == Operation.Origin.PRIMARY : index.origin();
                    indexResult = plan.earlyResultOnPreFlightError.get();
                    assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
                } else {
                    // generate or register sequence number
                    // 生成或注册文档的序列号。对于主分片的操作,会生成新的序列号。
                    if (index.origin() == Operation.Origin.PRIMARY) {
                        index = new Index(
                            index.uid(),
                            index.parsedDoc(),
                            // 生成新的序列号
                            generateSeqNoForOperationOnPrimary(index),
                            index.primaryTerm(),
                            index.version(),
                            index.versionType(),
                            index.origin(),
                            index.startTime(),
                            index.getAutoGeneratedIdTimestamp(),
                            index.isRetry(),
                            index.getIfSeqNo(),
                            index.getIfPrimaryTerm()
                        );

                        // 检查了当前操作是否应该追加到 Lucene 索引中
                        final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
                        if (toAppend == false) {
                            // 更新主分片的最大序列号
                            advanceMaxSeqNoOfUpdatesOnPrimary(index.seqNo());
                        }
                    } else {
                        // 对于副本分片的操作,会标记已经见过的序列号,序列号已经被使用。
                        markSeqNoAsSeen(index.seqNo());
                    }

                    assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();

                    if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
                        // 写到 Lucene 中
                        indexResult = indexIntoLucene(index, plan);
                    } else {
                        indexResult = new IndexResult(
                            plan.versionForIndexing,
                            index.primaryTerm(),
                            index.seqNo(),
                            plan.currentNotFoundOrDeleted
                        );
                    }
                }
                // 判断索引操作是否来自 Translog。如果是来自 Translog 的操作,就不再处理,因为这已经是一个已经被记录的操作
                if (index.origin().isFromTranslog() == false) {
                    final Translog.Location location;
                    if (indexResult.getResultType() == Result.Type.SUCCESS) {
                        // 如果索引操作成功, 将该操作添加到 Translog 中,并获取 Translog 的位置
                        location = translog.add(new Translog.Index(index, indexResult));
                    } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                        // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
                        // 如果索引操作失败,并且具有序列号, 则将失败的操作记录为一个 no-op 操作
                        final NoOp noOp = new NoOp(
                            indexResult.getSeqNo(),
                            index.primaryTerm(),
                            index.origin(),
                            index.startTime(),
                            indexResult.getFailure().toString()
                        );
                        location = innerNoOp(noOp).getTranslogLocation();
                    } else {
                        // 如果索引操作失败,并且没有序列号,将 location 设置为 null
                        location = null;
                    }
                    // 设置Translog 位置
                    indexResult.setTranslogLocation(location);
                }
                // 如果索引操作成功且需要写入 Lucene, 则获取 Translog 的位置信息,用于更新版本映射
                if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
                    final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
                    versionMap.maybePutIndexUnderLock(
                        index.uid().bytes(),
                        new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
                    );
                }
                // 本地 Checkpoint 的更新, 标记当前序列号已经被处理
                localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
                if (indexResult.getTranslogLocation() == null) {
                    // the op is coming from the translog (and is hence persisted already) or it does not have a sequence number
                    // 如果 Translog 的位置信息为 null,说明该操作来自于 Translog,已经被持久化,或者该操作没有序列号。
                    // 在这种情况下,标记当前序列号已经被持久化
                    assert index.origin().isFromTranslog() || indexResult.getSeqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO;
                    localCheckpointTracker.markSeqNoAsPersisted(indexResult.getSeqNo());
                }
                indexResult.setTook(System.nanoTime() - index.startTime());
                // 将操作结果冻结,确保其不可变
                indexResult.freeze();
                return indexResult;
            } finally {
                releaseInFlightDocs(reservedDocs);
            }
        } catch (RuntimeException | IOException e) {
            try {
                if (e instanceof AlreadyClosedException == false && treatDocumentFailureAsTragicError(index)) {
                    failEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
                } else {
                    maybeFailEngine("index id[" + index.id() + "] origin[" + index.origin() + "] seq#[" + index.seqNo() + "]", e);
                }
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw e;
        }
    }
自动sync条件translog条件:

相关配置:

  • index.translog.sync_interval: 默认5s
  • index.translog.durability:默认配置的是request,即每次写请求完成之后执行(e.g. index, delete, update, bulk)
  • index.translog.flush_threshold_size:默认512mb

https://www.elastic.co/guide/cn/elasticsearch/guide/current/translog.html
https://www.elastic.co/guide/en/elasticsearch/reference/8.11/index-modules-translog.html

private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
    if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST
        && indexShard.getLastSyncedGlobalCheckpoint() < indexShard.getLastKnownGlobalCheckpoint()) {
        indexShard.sync();
    }
}
refresh源码:
    final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException {
        // both refresh types will result in an internal refresh but only the external will also
        // pass the new reader reference to the external reader manager.
        // 获取当前的本地检查点
        final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
        boolean refreshed;
        try {
            // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way.
            // 尝试增加存储的引用计数,以确保在刷新期间没有人关闭存储
            if (store.tryIncRef()) {
                // increment the ref just to ensure nobody closes the store during a refresh
                try {
                    // even though we maintain 2 managers we really do the heavy-lifting only once.
                    // the second refresh will only do the extra work we have to do for warming caches etc.
                    ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
                    // it is intentional that we never refresh both internal / external together
                    if (block) {
                        referenceManager.maybeRefreshBlocking();
                        refreshed = true;
                    } else {
                        refreshed = referenceManager.maybeRefresh();
                    }
                } finally {
                    // 减少存储的引用计数
                    store.decRef();
                }
                if (refreshed) {
                    lastRefreshedCheckpointListener.updateRefreshedCheckpoint(localCheckpointBeforeRefresh);
                }
            } else {
                refreshed = false;
            }
        } catch (AlreadyClosedException e) {
            failOnTragicEvent(e);
            throw e;
        } catch (Exception e) {
            try {
                failEngine("refresh failed source[" + source + "]", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw new RefreshFailedEngineException(shardId, e);
        }
        assert refreshed == false || lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh
            : "refresh checkpoint was not advanced; "
                + "local_checkpoint="
                + localCheckpointBeforeRefresh
                + " refresh_checkpoint="
                + lastRefreshedCheckpoint();
        // TODO: maybe we should just put a scheduled job in threadPool?
        // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
        // for a long time:
        maybePruneDeletes();
        mergeScheduler.refreshConfig();
        return refreshed;
    }
flush源码:

执行条件主要在这段注释里面:

// Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the
// newly created commit points to a different translog generation (can free translog),
// or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries.
    @Override
    public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
        // 确保引擎是打开的
        ensureOpen();
        if (force && waitIfOngoing == false) {
            // 如果强制执行 flush 但不等待正在进行的 flush 操作,抛出异常
            assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
            throw new IllegalArgumentException(
                "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing
            );
        }
        // 获取读锁
        try (ReleasableLock lock = readLock.acquire()) {
            ensureOpen();
            if (flushLock.tryLock() == false) {
                // if we can't get the lock right away we block if needed otherwise barf
                if (waitIfOngoing == false) {
                    return;
                }
                logger.trace("waiting for in-flight flush to finish");
                flushLock.lock();
                logger.trace("acquired flush lock after blocking");
            } else {
                logger.trace("acquired flush lock immediately");
            }
            try {
                /**
                 * 1. Lucene 有未提交的文档: 如果 Lucene 索引中存在未提交的文档,即有尚未写入磁盘的更改。
                 * 2. 被调用者强制执行: 如果调用者明确要求执行 flush 操作,即 force 参数为 true。
                 * 3. 新创建的提交指向不同的 translog 生成: 当新创建的提交(commit)指向不同的 translog 生成时,执行 flush 操作。
                 * 这可能是因为 translog 已经占用了一定的空间,需要释放这些旧的 translog。
                 * 4. 上一次提交的本地检查点信息已过期: 如果上一次提交的段信息中的本地检查点信息已过期,这可能会导致未来的恢复操作变慢。
                 * 因此,需要执行 flush 操作来更新本地检查点信息。
                 */
                // 检查 Lucene 是否有未提交的更改。
                boolean hasUncommittedChanges = indexWriter.hasUncommittedChanges();
                // 检查是否应定期执行 flush 操作
                boolean shouldPeriodicallyFlush = shouldPeriodicallyFlush();
                if (hasUncommittedChanges
                    || force
                    || shouldPeriodicallyFlush
                    // 检查是否本地检查点信息在上一次提交的段信息中过期,如果是,则触发 flush
                    || getProcessedLocalCheckpoint() > Long.parseLong(
                        lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
                    )) {
                    ensureCanFlush();
                    try {
                        // 滚动 translog 的生成
                        translog.rollGeneration();
                        logger.trace("starting commit for flush; commitTranslog=true");
                        // 提交索引写入器,包括在 Lucene 中提交未提交的文档,并将 translog 提交到持久存储。
                        commitIndexWriter(indexWriter, translog);
                        logger.trace("finished commit for flush");

                        // a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved
                        logger.debug(
                            "new commit on flush, hasUncommittedChanges:{}, force:{}, shouldPeriodicallyFlush:{}",
                            hasUncommittedChanges,
                            force,
                            shouldPeriodicallyFlush
                        );

                        // we need to refresh in order to clear older version values
                        // 强制刷新索引以清除旧的版本信息。
                        refresh("version_table_flush", SearcherScope.INTERNAL, true);
                        translog.trimUnreferencedReaders();
                    } catch (AlreadyClosedException e) {
                        failOnTragicEvent(e);
                        throw e;
                    } catch (Exception e) {
                        throw new FlushFailedEngineException(shardId, e);
                    }
                    // 刷新最后提交的段信息
                    refreshLastCommittedSegmentInfos();

                }
            } catch (FlushFailedEngineException ex) {
                maybeFailEngine("flush", ex);
                throw ex;
            } finally {
                flushLock.unlock();
            }
        }
        // We don't have to do this here; we do it defensively to make sure that even if wall clock time is misbehaving
        // (e.g., moves backwards) we will at least still sometimes prune deleted tombstones:
        if (engineConfig.isEnableGcDeletes()) {
            pruneDeletedTombstones();
        }
    }
    protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException {
        // 确保引擎的状态是允许刷新的
        ensureCanFlush();
        try {
            // 获取已处理的本地检查点
            final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
            writer.setLiveCommitData(() -> {
                
                final Map<String, String> commitData = new HashMap<>(8);
                // 添加 translog 的 UUID 到提交数据中
                commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
                // 添加本地检查点到提交数据中
                commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
                // 添加最大序列号到提交数据中
                commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
                // 添加最大不安全自动生成的 ID 时间戳到提交数据中
                commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
                // 添加历史 UUID 到提交数据中
                commitData.put(HISTORY_UUID_KEY, historyUUID);
                final String currentForceMergeUUID = forceMergeUUID;
                if (currentForceMergeUUID != null) {
                    //  如果强制合并 UUID 存在,则添加到提交数据中
                    commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
                }
                // 添加最小保留序列号到提交数据中
                commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
                commitData.put(ES_VERSION, Version.CURRENT.toString());
                logger.trace("committing writer with commit data [{}]", commitData);
                return commitData.entrySet().iterator();
            });
            shouldPeriodicallyFlushAfterBigMerge.set(false);
            // 调用Lucene 会将所有未提交的文档写入磁盘,生成新的段
            writer.commit();
        } catch (final Exception ex) {
            try {
                failEngine("lucene commit failed", ex);
            } catch (final Exception inner) {
                ex.addSuppressed(inner);
            }
            throw ex;
        } catch (final AssertionError e) {
            /*
             * If assertions are enabled, IndexWriter throws AssertionError on commit if any files don't exist, but tests that randomly
             * throw FileNotFoundException or NoSuchFileException can also hit this.
             */
            if (ExceptionsHelper.stackTrace(e).contains("org.apache.lucene.index.IndexWriter.filesExist")) {
                final EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
                try {
                    failEngine("lucene commit failed", engineException);
                } catch (final Exception inner) {
                    engineException.addSuppressed(inner);
                }
                throw engineException;
            } else {
                throw e;
            }
        }
    }
写副本

副本在写入数据到 translog 后就可以返回了。源码主要在ReplicationOperation类中

@Override
public void tryAction(ActionListener<ReplicaResponse> listener) {
    replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener);
}

处理结束给协调节点返回消息

 @Override
 public void onResponse(Void aVoid) {
     successfulShards.incrementAndGet();
     try {
         updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
     } finally {
         decPendingAndFinishIfNeeded();
     }
 }

参考:
https://www.elastic.co/guide/cn/elasticsearch/guide/current/translog.html
https://www.golangblogs.com/read/elasticsearch/date-2023.05.24.16.58.36?wd=Elasticsearch
《Elasticsearch源码解析与优化实战》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1264392.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Nacos2.x配置中心源码分析

概述 源码注释参考 git 仓库&#xff0c;对应流程图后续补充&#xff1b; 启动 nacos nacos 启动类&#xff1a; // com.alibaba.nacos.NacosSpringBootApplication(scanBasePackages "com.alibaba.nacos") ServletComponentScan EnableScheduling public class…

Django < 2.0.8 在 CommonMiddleware 中打开重定向的可能性 (CVE-2018-14574)

漏洞描述 如果django.middleware.common.CommonMiddleware和APPEND_SLASH设置都已启用&#xff0c;并且项目的 URL 模式接受任何以斜杠结尾的路径&#xff0c;则对该网站的恶意制作的 URL 的请求可能会导致重定向到另一个网站&#xff0c;从而启用网络钓鱼和其他攻击。 漏洞环…

带着GPT-4V(ision)上路,自动驾驶新探索

On the Road with GPT-4V(ision): Early Explorations of Visual-Language Model on Autonomous Driving GitHub | https://github.com/PJLab-ADG/GPT4V-AD-Exploration arXiv | https://arxiv.org/abs/2311.05332 自动驾驶技术的追求取决于对感知、决策和控制系统的复杂集成。…

第八节HarmonyOS @Component自定义组件的生命周期

在开始之前&#xff0c;我们先明确自定义组件和页面的关系&#xff1a; 1、自定义组件&#xff1a;Component装饰的UI单元&#xff0c;可以组合多个系统组件实现UI的复用。 2、页面&#xff1a;即应用的UI页面。可以由一个或者多个自定义组件组成&#xff0c;Entry装饰的自定…

消息队列进阶-1.消息队列的应用场景与选型

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱吃芝士的土豆倪&#xff0c;24届校招生Java选手&#xff0c;很高兴认识大家&#x1f4d5;系列专栏&#xff1a;Spring源码、JUC源码、Kafka原理&#x1f525;如果感觉博主的文章还不错的话&#xff0c;请&#x1f44…

Nacos源码本地搭建流程及目录结构解读

下载地址 https://github.com/alibaba/nacos 目录结构 本地单机启动 首先maven编译完成之后在console下面找到Nacos 这个就是主启动类 然后再vm中配置参数-Dnacos.standalonetrue表示单机启动 当控制台没有报错 访问 http://localhost:8848/nacos 控制台界面登录进来之后显…

LLM能力与应用全解析

一、简介 经过几年时间的发展&#xff0c;大语言模型&#xff08;LLM&#xff09;已经从新兴技术发展为主流技术。而以大模型为核心技术的产品将迎来全新迭代。大模型除了聊天机器人应用外&#xff0c;能否在其他领域产生应用价值&#xff1f;在回答这个问题前&#xff0c;需要…

澳大利亚访问学者子女入学政策-附实例体会

很多访问学者出国交流时&#xff0c;希望子女携签&#xff0c;一起到异国体验不同的生活方式&#xff0c;拓宽视野&#xff0c;增加认知。如果能免费入读当地的公立中小学&#xff0c;还可以获得自然习得英语的机会。那么澳大利亚的访问学者能否达到这一目的&#xff1f;需要准…

TiDB 7.x 源码编译之 TiDB Server 篇,及新特性详解

本文将介绍如何编译 TiDB Server 源码。以及阐释 TiDB Server 7.x 的部分新特性。 TiDB v7.5.0 LTS 计划于 2023 年 11 月正式 Release&#xff0c;目前代码虽未冻结&#xff0c;但已经可以看到 Alpha 版本的 Code 了&#xff0c;本文代码将以 v7.5.0-alpha 为基准。 TiDB Se…

【substance painter】如何制作一个生锈磨损的枪

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;Uni…

智能优化算法应用:基于花授粉算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于花授粉算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于花授粉算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.花授粉算法4.实验参数设定5.算法结果6.参考文献7.…

C++ string类(二)

insert&#xff1a; erase&#xff1a; 常见用法&#xff1a; int main() {string s1("hello world");string s2("gm");s1.insert(5,"x");cout << s1 << endl;s1.insert(6,s1,0);cout << s1 << endl;s1.insert(0,&qu…

conda环境下 ERROR: CMake must be installed to build dlib问题解决

1 问题描述 pip install -r requirements.txt 在构建video_retalking项目过程中&#xff0c;使用命令安装依赖包时&#xff0c;出现如下错误&#xff1a; Building wheels for collected packages: face-alignment, dlib, ffmpy, futureBuilding wheel for face-alignment …

Unity中Shader的BRDF解析(三)

文章目录 前言一、BRDF中的镜面反射项二、分别解析每一个参数1、D、G函数&#xff1a;speclarTerm2、其他中间步骤3、光照颜色4、F函数&#xff08;菲涅尔函数&#xff09; &#xff1a;FresnelTermIBL在下篇文章中继续解析 三、最终代码.cginc文件:Shader文件&#xff1a; 前言…

ASCII值对照表

ASCII码是一种7位编码&#xff0c;但它存放时必须占全1个字节&#xff0c;也即占用8位&#xff0c;最高位为0&#xff0c;其余7位表示ASCII码。 ASCII 码使用指定的7 位或8 位二进制数组合来表示128 或256 种可能的字符包括所有的大写和小写字母&#xff0c;数字0 到9、标点符…

富富集网络图绘制教程

本期教程 前言 今天学习aPEAR包&#xff0c;绘制KEGG和GO功能富集网络图&#xff0c;用起来还是比较方便的&#xff0c;直接将clusterProfiler富集结果进行绘制&#xff0c;对人类、动物等分析结果非常方便。对于模式植物&#xff0c;使用自己制作的GO或KEGG背景文件进行富集分…

5款最常用的Android测试框架(含代码示例)

前言 今天&#xff0c;我们就要说说5款最常用的Android测试框架&#xff0c;并且每个框架都给出了基本的代码示例。 在这我为大家准备了一份软件测试视频教程&#xff08;含面试、接口、自动化、性能测试等&#xff09;&#xff0c;就在下方&#xff0c;需要的可以直接去观看…

Django二转Day02

http #1 http 是什么#2 http特点#3 请求协议详情 -请求首行---》请求方式&#xff0c;请求地址&#xff0c;请求协议版本 -请求头---》key:value形式 -referer&#xff1a;上一次访问的地址 -user-agenet&#xff1a;客户端类型 -name&#x…

Python编程控制Android手机操作技巧示例代码

文章目录 你应该拥有的东西截图TemplateMatching 滑动打电话给某人手机录屏打开手机发送 Whatsapp 消息关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例③Python小游…

HttpRunner原来还能这么用,大开眼界!!!

hook机制 Httprunner 框架中的 hook 机制相当于unittest框架中的 setup , teardown 函数&#xff0c;用来进行测试用例执行之前的环境初始化以及测试用例执行完毕之后的环境清理操作。 httprunner 中的 hooks 机制可以用在测试用例层级也可以用在测试步骤层级&#xff0c;其关键…