(5) Elasticsearch Source Code: Analysis of the Query Flow

news2025/1/5 1:39:46

https://www.cnblogs.com/darcy-yuan/p/17039526.html

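1. Overview

The previous article covered the indexing flow in ES (Elasticsearch, likewise below). This article covers the ES query flow. The basic flow diagram is as follows.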

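2. Query flow

To make the code easier to debug, I started two nodes on my machine and created the index shown below. It has two primary shards and no replica shards.

I then sent the following request with Postman:

Next, let's look at the code (this series is based on the 7.4.0 source). A search is also a REST request, so it enters through TransportAction: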

// org.elasticsearch.action.support.TransportAction        

        public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
            int i = index.getAndIncrement();
            try {
                if (i < this.action.filters.length) {
                    this.action.filters[i].apply(task, actionName, request, listener, this); // run the filters first
                } else if (i == this.action.filters.length) {
                    this.action.doExecute(task, request, listener); // then execute the action itself
                } else {
                    listener.onFailure(new IllegalStateException("proceed was called too many times"));
                }
            } catch(Exception e) {
                logger.trace("Error during transport action execution.", e);
                listener.onFailure(e);
            }
        }
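
The chain above advances one step per call to proceed: each filter gets a turn, and only after the last filter does the action itself run. A minimal standalone sketch of that pattern follows. FilterChainSketch, its Filter interface, and the string log are hypothetical names for illustration and are not Elasticsearch classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class FilterChainSketch {

    // Hypothetical stand-in for an action filter: it does its work and then
    // hands control back to the chain by calling proceed again.
    interface Filter {
        void apply(String request, List<String> log, FilterChainSketch chain);
    }

    private final Filter[] filters;
    private final AtomicInteger index = new AtomicInteger();

    FilterChainSketch(Filter... filters) {
        this.filters = filters;
    }

    void proceed(String request, List<String> log) {
        int i = index.getAndIncrement();
        if (i < filters.length) {
            filters[i].apply(request, log, this);   // next filter in line
        } else if (i == filters.length) {
            log.add("execute:" + request);          // all filters passed: run the action
        } else {
            throw new IllegalStateException("proceed was called too many times");
        }
    }

    // Builds a two-filter chain, runs it once, and returns the call order.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Filter first = (request, l, chain) -> { l.add("filter-1"); chain.proceed(request, l); };
        Filter second = (request, l, chain) -> { l.add("filter-2"); chain.proceed(request, l); };
        new FilterChainSketch(first, second).proceed("search", log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A filter that never calls proceed stops the request, which is how a filter can reject it before the action runs.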

The action that actually runs is TransportSearchAction. It applies some optimizations to the order in which indices are searched. The search type used here is QUERY_THEN_FETCH.

// org.elasticsearch.action.search.TransportSearchAction    

    protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        final long relativeStartNanos = System.nanoTime();
        final SearchTimeProvider timeProvider =
             new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
        ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
            if (source != searchRequest.source()) {
                // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
                // situations when source is rewritten to null due to a bug
                searchRequest.source(source);
            }
            final ClusterState clusterState = clusterService.state();
            final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
                searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
            OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
            if (remoteClusterIndices.isEmpty()) {
                executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener); // no remote clusters: search the local cluster only
            } else {
                if (shouldMinimizeRoundtrips(searchRequest)) { // cross-cluster search with minimized round trips
                    ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider, searchService::createReduceContext,
                        remoteClusterService, threadPool, listener,
                        (r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l));
                } else {
                    AtomicInteger skippedClusters = new AtomicInteger(0);
                    collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
                        skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
                        ActionListener.wrap(
                            searchShardsResponses -> {
                                List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
                                Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
                                BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(
                                    searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
                                int localClusters = localIndices == null ? 0 : 1;
                                int totalClusters = remoteClusterIndices.size() + localClusters;
                                int successfulClusters = searchShardsResponses.size() + localClusters;
                                executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
                                    remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
                                    new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));
                            },
                            listener::onFailure));
                }
            }
        }, listener::onFailure);
        if (searchRequest.source() == null) {
            rewriteListener.onResponse(searchRequest.source());
        } else {
            Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
                rewriteListener); // rewriteListener is called back once the rewrite completes
        }
    }

...
    private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
                               OriginalIndices localIndices, List<SearchShardIterator> remoteShardIterators,
                               BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
                               Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener,
                               SearchResponse.Clusters clusters) {

        clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); // fail if the cluster has a global READ block
        // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
        // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
        // of just for the _search api
        final Index[] indices = resolveLocalIndices(localIndices, searchRequest.indicesOptions(), clusterState, timeProvider);
        Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
        Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
            searchRequest.indices());
        routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
        Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);

        if (shouldSplitIndices(searchRequest)) { // search read-only indices and write indices separately, write indices first
            //Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible.
            //Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other
            //indices (possibly slower) being searched at the same time.
            List<String> writeIndicesList = new ArrayList<>();
            List<String> readOnlyIndicesList = new ArrayList<>();
            splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList);
            String[] writeIndices = writeIndicesList.toArray(new String[0]);
            String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]);

            if (readOnlyIndices.length == 0) {
                executeSearch(task, timeProvider, searchRequest, localIndices, writeIndices, routingMap,
                    aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
            } else if (writeIndices.length == 0 && remoteShardIterators.isEmpty()) {
                executeSearch(task, timeProvider, searchRequest, localIndices, readOnlyIndices, routingMap,
                    aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
            } else {
                //Split the search in two whenever throttled indices are searched together with ordinary indices (local or remote), so
                //that we don't keep the search context open for too long between query and fetch for ordinary indices due to slow indices.
                CountDown countDown = new CountDown(2);
                AtomicReference<Exception> exceptions = new AtomicReference<>();
                SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider,
                    searchService::createReduceContext);
                CountDownActionListener<SearchResponse, SearchResponse> countDownActionListener =
                    new CountDownActionListener<SearchResponse, SearchResponse>(countDown, exceptions, listener) {
                        @Override
                        void innerOnResponse(SearchResponse searchResponse) {
                            searchResponseMerger.add(searchResponse);
                        }

                        @Override
                        SearchResponse createFinalResponse() {
                            return searchResponseMerger.getMergedResponse(clusters);
                        }
                    };

                //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
                //will be provided separately to executeSearch.
                SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices,
                    RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
                executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap,
                    aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, countDownActionListener,
                    SearchResponse.Clusters.EMPTY);

                //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
                //will be provided separately to executeSearch.
                SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices,
                    RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
                executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap,
                    aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, countDownActionListener,
                    SearchResponse.Clusters.EMPTY);
            }
        } else {
            String[] concreteIndices = Arrays.stream(indices).map(Index::getName).toArray(String[]::new);
            executeSearch(task, timeProvider, searchRequest, localIndices, concreteIndices, routingMap,
                aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
        }
    }

...
    private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
                               OriginalIndices localIndices, String[] concreteIndices, Map<String, Set<String>> routingMap,
                               Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
                               List<SearchShardIterator> remoteShardIterators, BiFunction<String, String, DiscoveryNode> remoteConnections,
                               ClusterState clusterState, ActionListener<SearchResponse> listener, SearchResponse.Clusters clusters) {

        Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
        GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
                concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
        GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
            searchRequest.getLocalClusterAlias(), remoteShardIterators);

        failIfOverShardCountLimit(clusterService, shardIterators.size());

        // optimize search type for cases where there is only one shard group to search on
        if (shardIterators.size() == 1) {
            // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
            searchRequest.searchType(QUERY_THEN_FETCH); // a single shard group: no DFS needed
        }
        if (searchRequest.allowPartialSearchResults() == null) {
            // No user preference defined in search request - apply cluster service default
            searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
        }
        if (searchRequest.isSuggestOnly()) {
            // disable request cache if we have only suggest
            searchRequest.requestCache(false);
            if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) {
                // convert to Q_T_F if we have only suggest
                searchRequest.searchType(QUERY_THEN_FETCH);
            }
        }

        final DiscoveryNodes nodes = clusterState.nodes();
        BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
            nodes::get, remoteConnections, searchTransportService::getConnection);
        boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
        // builds the async action (SearchQueryThenFetchAsyncAction in our case) and starts it asynchronously
        searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
            Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
    }
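
The last executeSearch overload adjusts the requested search type before starting the async action. The decision can be condensed into one small function, sketched below. SearchTypeSketch and effectiveSearchType are hypothetical names, and the enum only mirrors the two search types mentioned in the listing.

```java
final class SearchTypeSketch {

    // Mirrors the two search types that appear in the listing above.
    enum SearchType { QUERY_THEN_FETCH, DFS_QUERY_THEN_FETCH }

    // Returns the search type that would actually be used.
    static SearchType effectiveSearchType(SearchType requested, int shardGroups, boolean suggestOnly) {
        if (shardGroups == 1) {
            // only one shard group to hit: the DFS round adds nothing
            return SearchType.QUERY_THEN_FETCH;
        }
        if (suggestOnly && requested == SearchType.DFS_QUERY_THEN_FETCH) {
            // suggest-only requests are converted as well
            return SearchType.QUERY_THEN_FETCH;
        }
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(effectiveSearchType(SearchType.DFS_QUERY_THEN_FETCH, 1, false));
        System.out.println(effectiveSearchType(SearchType.DFS_QUERY_THEN_FETCH, 2, false));
    }
}
```

With the two-shard index used in this article and the default search type, the request stays QUERY_THEN_FETCH either way.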

Next comes the QUERY_THEN_FETCH logic. As the sequence diagram above shows, QUERY_THEN_FETCH consists of four main phases: init, query, fetch, and send response.

// org.elasticsearch.action.search.AbstractSearchAsyncAction    

    private void executePhase(SearchPhase phase) {
        try {
            phase.run(); // run the phase
        } catch (Exception e) {
            if (logger.isDebugEnabled()) {
                logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
            }
            onPhaseFailure(phase, "", e);
        }
    }

First comes the init phase.

// org.elasticsearch.action.search.AbstractSearchAsyncAction

    public final void run() {
        for (final SearchShardIterator iterator : toSkipShardsIts) {
            assert iterator.skip();
            skipShard(iterator);
        }
        if (shardsIts.size() > 0) {
            assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
            if (request.allowPartialSearchResults() == false) {
                final StringBuilder missingShards = new StringBuilder();
                // Fail-fast verification of all shards being available
                for (int index = 0; index < shardsIts.size(); index++) {
                    final SearchShardIterator shardRoutings = shardsIts.get(index);
                    if (shardRoutings.size() == 0) {
                        if(missingShards.length() > 0){
                            missingShards.append(", ");
                        }
                        missingShards.append(shardRoutings.shardId());
                    }
                }
                if (missingShards.length() > 0) {
                    //Status red - shard is missing all copies and would produce partial results for an index search
                    final String msg = "Search rejected due to missing shards ["+ missingShards +
                            "]. Consider using `allow_partial_search_results` setting to bypass this error.";
                    throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
                }
            }
            for (int index = 0; index < shardsIts.size(); index++) { // iterate over the shards and search each one
                final SearchShardIterator shardRoutings = shardsIts.get(index);
                assert shardRoutings.skip() == false;
                performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
            }
        }
    }
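
The first half of run() is a fail-fast check: when partial results are not allowed, the search is rejected before any shard request is sent if some shard has no available copy. A rough standalone sketch of that check follows. MissingShardCheck and the map from shard id to copy count are hypothetical simplifications of the shard iterators used in the real code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class MissingShardCheck {

    // Collects the ids of all shards that have no copy able to serve the search.
    static String findMissingShards(Map<String, Integer> availableCopiesPerShard) {
        StringJoiner missing = new StringJoiner(", ");
        for (Map.Entry<String, Integer> entry : availableCopiesPerShard.entrySet()) {
            if (entry.getValue() == 0) {
                missing.add(entry.getKey());
            }
        }
        return missing.toString();
    }

    // Rejects the search up front instead of returning partial results later.
    static void checkBeforeSearch(Map<String, Integer> availableCopiesPerShard, boolean allowPartialResults) {
        if (allowPartialResults) {
            return; // missing shards are tolerated and simply reported as failures
        }
        String missing = findMissingShards(availableCopiesPerShard);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Search rejected due to missing shards [" + missing + "]");
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> shards = new LinkedHashMap<>();
        shards.put("[idx][0]", 1);
        shards.put("[idx][1]", 0);
        checkBeforeSearch(shards, true);                 // passes
        System.out.println(findMissingShards(shards));   // the shard without copies
    }
}
```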

Then comes the query phase, which uses the transport service to query the local node or other nodes for documents that match the query.

// org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction    

    protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
                                       final SearchActionListener<SearchPhaseResult> listener) {
        getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
            buildShardSearchRequest(shardIt), getTask(), listener);
    }

When a node receives the request, it hands it to the handler registered for that action.

// org.elasticsearch.action.search.SearchTransportService 

        // register the request handler for the query action
        transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
            (request, channel, task) -> {
                searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>(
                    channel, QUERY_ACTION_NAME, request));
            });
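
The registration above boils down to a lookup table from action name to handler: the sender names the action, and the receiving node looks up the handler for that name. A minimal sketch of the idea follows, assuming string requests and responses. HandlerRegistrySketch, dispatch, and the action name "demo/query" are hypothetical and are not the TransportService API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class HandlerRegistrySketch {

    // action name -> handler that turns a request into a response
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    void registerRequestHandler(String action, Function<String, String> handler) {
        if (handlers.putIfAbsent(action, handler) != null) {
            throw new IllegalArgumentException("handler already registered for [" + action + "]");
        }
    }

    // Called on the receiving side once the action name has been read from the message.
    String dispatch(String action, String request) {
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalArgumentException("no handler for action [" + action + "]");
        }
        return handler.apply(request);
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.registerRequestHandler("demo/query", request -> "query result for " + request);
        System.out.println(registry.dispatch("demo/query", "shard 0"));
    }
}
```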

The handler builds a SearchContext and runs the query.

// org.elasticsearch.search.SearchService

    private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception {
        final SearchContext context = createAndPutContext(request);
        context.incRef();
        try {
            context.setTask(task);
            final long afterQueryTime;
            try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
                contextProcessing(context);
                loadOrExecuteQueryPhase(request, context); // main query logic
                if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
                    freeContext(context.id());
                } else {
                    contextProcessedSuccessfully(context);
                }
                afterQueryTime = executor.success();
            }
            if (request.numberOfShards() == 1) {
                return executeFetchPhase(context, afterQueryTime); // only one shard: run the fetch logic right away
            }
            return context.queryResult();
        } catch (Exception e) {
            // execution exception can happen while loading the cache, strip it
            if (e instanceof ExecutionException) {
                e = (e.getCause() == null || e.getCause() instanceof Exception) ?
                    (Exception) e.getCause() : new ElasticsearchException(e.getCause());
            }
            logger.trace("Query phase failed", e);
            processFailure(context, e);
            throw e;
        } finally {
            cleanContext(context);
        }
    }
...
    private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
        final boolean canCache = indicesService.canCache(request, context);
        context.getQueryShardContext().freezeContext();
        if (canCache) { // cacheable request: load from the request cache, executing the query only on a miss
            indicesService.loadIntoContext(request, context, queryPhase);
        } else {
            queryPhase.execute(context);
        }
    }
...
// org.elasticsearch.search.query.QueryPhase
    public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
        if (searchContext.hasOnlySuggest()) {
            suggestPhase.execute(searchContext);
            searchContext.queryResult().topDocs(new TopDocsAndMaxScore(
                    new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
                    new DocValueFormat[0]);
            return;
        }

        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("{}", new SearchContextSourcePrinter(searchContext));
        }

        // Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F
        // request, preProcess is called on the DFS phase phase, this is why we pre-process them
        // here to make sure it happens during the QUERY phase
        aggregationPhase.preProcess(searchContext);
        final ContextIndexSearcher searcher = searchContext.searcher();
        boolean rescore = execute(searchContext, searchContext.searcher(), searcher::setCheckCancelled); // main query logic

        if (rescore) { // only if we do a regular search
            rescorePhase.execute(searchContext); // rescore the top hits
        }
        suggestPhase.execute(searchContext); // run suggesters, then aggregations
        aggregationPhase.execute(searchContext);

        if (searchContext.getProfilers() != null) {
            ProfileShardResult shardResults = SearchProfileShardResults
                    .buildShardResults(searchContext.getProfilers());
            searchContext.queryResult().profileResults(shardResults);
        }
    }
...
    static boolean execute(SearchContext searchContext,
                           final IndexSearcher searcher,
                           Consumer<Runnable> checkCancellationSetter) throws QueryPhaseExecutionException {
        final IndexReader reader = searcher.getIndexReader();
        QuerySearchResult queryResult = searchContext.queryResult();
        queryResult.searchTimedOut(false);
        try {
            queryResult.from(searchContext.from());
            queryResult.size(searchContext.size());
            Query query = searchContext.query();
            assert query == searcher.rewrite(query); // already rewritten

            final ScrollContext scrollContext = searchContext.scrollContext();
            if (scrollContext != null) {
                if (scrollContext.totalHits == null) {
                    // first round
                    assert scrollContext.lastEmittedDoc == null;
                    // there is not much that we can optimize here since we want to collect all
                    // documents in order to get the total number of hits

                } else {
                    final ScoreDoc after = scrollContext.lastEmittedDoc;
                    if (returnsDocsInOrder(query, searchContext.sort())) {
                        // now this gets interesting: since we sort in index-order, we can directly
                        // skip to the desired doc
                        if (after != null) {
                            query = new BooleanQuery.Builder()
                                .add(query, BooleanClause.Occur.MUST)
                                .add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER)
                                .build();
                        }
                        // ... and stop collecting after ${size} matches
                        searchContext.terminateAfter(searchContext.size());
                    } else if (canEarlyTerminate(reader, searchContext.sort())) {
                        // now this gets interesting: since the search sort is a prefix of the index sort, we can directly
                        // skip to the desired doc
                        if (after != null) {
                            query = new BooleanQuery.Builder()
                                .add(query, BooleanClause.Occur.MUST)
                                .add(new SearchAfterSortedDocQuery(searchContext.sort().sort, (FieldDoc) after), BooleanClause.Occur.FILTER)
                                .build();
                        }
                    }
                }
            }

            final LinkedList<QueryCollectorContext> collectors = new LinkedList<>();
            // whether the chain contains a collector that filters documents
            boolean hasFilterCollector = false;
            if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
                // add terminate_after before the filter collectors
                // it will only be applied on documents accepted by these filter collectors
                collectors.add(createEarlyTerminationCollectorContext(searchContext.terminateAfter()));
                // this collector can filter documents during the collection
                hasFilterCollector = true;
            }
            if (searchContext.parsedPostFilter() != null) {
                // add post filters before aggregations
                // it will only be applied to top hits
                collectors.add(createFilteredCollectorContext(searcher, searchContext.parsedPostFilter().query()));
                // this collector can filter documents during the collection
                hasFilterCollector = true;
            }
            if (searchContext.queryCollectors().isEmpty() == false) {
                // plug in additional collectors, like aggregations
                collectors.add(createMultiCollectorContext(searchContext.queryCollectors().values()));
            }
            if (searchContext.minimumScore() != null) {
                // apply the minimum score after multi collector so we filter aggs as well
                collectors.add(createMinScoreCollectorContext(searchContext.minimumScore()));
                // this collector can filter documents during the collection
                hasFilterCollector = true;
            }

            boolean timeoutSet = scrollContext == null && searchContext.timeout() != null &&
                searchContext.timeout().equals(SearchService.NO_TIMEOUT) == false;

            final Runnable timeoutRunnable;
            if (timeoutSet) {
                final long startTime = searchContext.getRelativeTimeInMillis();
                final long timeout = searchContext.timeout().millis();
                final long maxTime = startTime + timeout;
                timeoutRunnable = () -> {
                    final long time = searchContext.getRelativeTimeInMillis();
                    if (time > maxTime) {
                        throw new TimeExceededException();
                    }
                };
            } else {
                timeoutRunnable = null;
            }

            final Runnable cancellationRunnable;
            if (searchContext.lowLevelCancellation()) {
                SearchTask task = searchContext.getTask();
                cancellationRunnable = () -> { if (task.isCancelled()) throw new TaskCancelledException("cancelled"); };
            } else {
                cancellationRunnable = null;
            }

            final Runnable checkCancelled;
            if (timeoutRunnable != null && cancellationRunnable != null) {
                checkCancelled = () -> {
                    timeoutRunnable.run();
                    cancellationRunnable.run();
                };
            } else if (timeoutRunnable != null) {
                checkCancelled = timeoutRunnable;
            } else if (cancellationRunnable != null) {
                checkCancelled = cancellationRunnable;
            } else {
                checkCancelled = null;
            }

            checkCancellationSetter.accept(checkCancelled);

            // add cancellable
            // this only performs segment-level cancellation, which is cheap and checked regardless of
            // searchContext.lowLevelCancellation()
            collectors.add(createCancellableCollectorContext(searchContext.getTask()::isCancelled));

            final boolean doProfile = searchContext.getProfilers() != null;
            // create the top docs collector last when the other collectors are known
            final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, reader, hasFilterCollector);
            // add the top docs collector, the first collector context in the chain
            collectors.addFirst(topDocsFactory);

            final Collector queryCollector;
            if (doProfile) {
                InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
                searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
                queryCollector = profileCollector;
            } else {
                queryCollector = QueryCollectorContext.createQueryCollector(collectors);
            }

            try {
                searcher.search(query, queryCollector); // call into the Lucene API
            } catch (EarlyTerminatingCollector.EarlyTerminationException e) {
                queryResult.terminatedEarly(true);
            } catch (TimeExceededException e) {
                assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";

                if (searchContext.request().allowPartialSearchResults() == false) {
                    // Can't rethrow TimeExceededException because not serializable
                    throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
                }
                queryResult.searchTimedOut(true);
            } finally {
                searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
            }
            if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER
                    && queryResult.terminatedEarly() == null) {
                queryResult.terminatedEarly(false);
            }

            final QuerySearchResult result = searchContext.queryResult();
            for (QueryCollectorContext ctx : collectors) {
                ctx.postProcess(result);
            }
            ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
            if (executor instanceof QueueResizingEsThreadPoolExecutor) {
                QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor;
                queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
                queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
            }
            if (searchContext.getProfilers() != null) {
                ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());
                result.profileResults(shardResults);
            }
            return topDocsFactory.shouldRescore();
        } catch (Exception e) {
            throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e);
        }
    }
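
One detail of the listing worth isolating is how the timeout check and the cancellation check are folded into a single Runnable that is invoked periodically during collection. Either check may be absent, so the combined check can be null. The sketch below shows the same composition in isolation. CancellationCheckSketch is a hypothetical name, the injected clock is an assumption made for testability, and IllegalStateException stands in for the TimeExceededException and TaskCancelledException used by the real code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;

final class CancellationCheckSketch {

    // Throws once the clock has moved past startTime + timeout.
    static Runnable timeoutCheck(LongSupplier clockMillis, long timeoutMillis) {
        final long maxTime = clockMillis.getAsLong() + timeoutMillis;
        return () -> {
            if (clockMillis.getAsLong() > maxTime) {
                throw new IllegalStateException("time exceeded");
            }
        };
    }

    // Throws once the task has been flagged as cancelled.
    static Runnable cancellationCheck(AtomicBoolean cancelled) {
        return () -> {
            if (cancelled.get()) {
                throw new IllegalStateException("cancelled");
            }
        };
    }

    // Combines the two checks; returns null when neither is configured.
    static Runnable combine(Runnable timeout, Runnable cancellation) {
        if (timeout != null && cancellation != null) {
            return () -> {
                timeout.run();
                cancellation.run();
            };
        }
        return timeout != null ? timeout : cancellation;
    }

    public static void main(String[] args) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Runnable check = combine(timeoutCheck(System::currentTimeMillis, 1_000L), cancellationCheck(cancelled));
        check.run(); // passes: not timed out, not cancelled
        System.out.println("check passed");
    }
}
```

Returning null instead of a no-op Runnable lets the caller skip the periodic check entirely when nothing needs checking.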

At this point the query logic on the data node is done. The node that issued the query then stores the results it receives.

// org.elasticsearch.action.search.AbstractSearchAsyncAction

    public final void onShardSuccess(Result result) {
        successfulOps.incrementAndGet();
        results.consumeResult(result); // store the query result
        if (logger.isTraceEnabled()) {
            logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
        }
        // clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
        // so its ok concurrency wise to miss potentially the shard failures being created because of another failure
        // in the #addShardFailure, because by definition, it will happen on *another* shardIndex
        AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
        if (shardFailures != null) {
            shardFailures.set(result.getShardIndex(), null);
        }
    }

// org.elasticsearch.action.search.InitialSearchPhase

    void consumeResult(Result result) {
        assert results.get(result.getShardIndex()) == null : "shardIndex: " + result.getShardIndex() + " is already set";
        results.set(result.getShardIndex(), result); // store the result in the slot for its shard index
    }
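
Because every shard writes only to the slot of its own shard index, results arriving concurrently from different shards never touch the same slot. A small sketch of that idea follows, using java.util.concurrent.atomic.AtomicReferenceArray and string results. ShardResultsSketch is a hypothetical name, and the compareAndSet guard is my substitute for the assert in the original.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

final class ShardResultsSketch {

    // one slot per shard, indexed by shard index
    private final AtomicReferenceArray<String> results;

    ShardResultsSketch(int numShards) {
        this.results = new AtomicReferenceArray<>(numShards);
    }

    // Stores the result of one shard; each slot may be written only once.
    void consumeResult(int shardIndex, String result) {
        if (!results.compareAndSet(shardIndex, null, result)) {
            throw new IllegalStateException("shardIndex: " + shardIndex + " is already set");
        }
    }

    String get(int shardIndex) {
        return results.get(shardIndex);
    }

    int length() {
        return results.length();
    }

    public static void main(String[] args) {
        ShardResultsSketch results = new ShardResultsSketch(2);
        results.consumeResult(1, "hits from shard 1"); // arrival order does not matter
        results.consumeResult(0, "hits from shard 0");
        System.out.println(results.get(0) + " / " + results.get(1));
    }
}
```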

The next step is the fetch phase.

// org.elasticsearch.action.search.FetchSearchPhase

    private void innerRun() throws IOException {
        final int numShards = context.getNumShards();
        final boolean isScrollSearch = context.getRequest().scroll() != null;
        List<SearchPhaseResult> phaseResults = queryResults.asList();
        String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null;
        final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce(); // reduce the query results from the previous step, mainly doc IDs
        final boolean queryAndFetchOptimization = queryResults.length() == 1;
        final Runnable finishPhase = ()
            -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
            queryResults : fetchResults);
        if (queryAndFetchOptimization) {
            assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty [" + phaseResults.isEmpty()
                + "], single result: " +  phaseResults.get(0).fetchResult();
            // query AND fetch optimization
            finishPhase.run();
        } else {
            ScoreDoc[] scoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
            final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, scoreDocs); // which docs to fetch from each shard
            if (scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
                phaseResults.stream()
                    .map(SearchPhaseResult::queryResult)
                    .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
                finishPhase.run();
            } else {
                final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
                    searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
                    : null;
                final CountedCollector<FetchSearchResult> counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),
                    docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
                    finishPhase, context);
                for (int i = 0; i < docIdsToLoad.length; i++) {
                    IntArrayList entry = docIdsToLoad[i];
                    SearchPhaseResult queryResult = queryResults.get(i);
                    if (entry == null) { // no results for this shard ID
                        if (queryResult != null) {
                            // if we got some hits from this shard we have to release the context there
                            // we do this as we go since it will free up resources and passing on the request on the
                            // transport layer is cheap.
                            releaseIrrelevantSearchContext(queryResult.queryResult());
                        }
                        // in any case we count down this result since we don't talk to this shard anymore
                        counter.countDown();
                    } else {
                        SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
                        Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),
                            searchShardTarget.getNodeId());
                        ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
                            lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
                        executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
                            connection); // fetch the document content
                    }
                }
            }
        }
    }
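
The fan-out in `innerRun` can be pictured with a small self-contained sketch: group the globally sorted hits by shard, then count down once per shard, whether or not that shard has anything to fetch. Every name here (`FetchFanOutSketch`, `Hit`, `CountDown`) is hypothetical, and the asynchronous transport call is replaced by a comment, so this is only a model of the control flow, not Elasticsearch code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplified model of the fetch fan-out; not Elasticsearch code.
class FetchFanOutSketch {

    // One globally sorted hit: the shard it came from and its doc ID on that shard.
    static final class Hit {
        final int shardIndex;
        final int docId;

        Hit(int shardIndex, int docId) {
            this.shardIndex = shardIndex;
            this.docId = docId;
        }
    }

    // Groups doc IDs by shard. A shard without hits keeps a null entry,
    // mirroring the `entry == null` branch in the listing above.
    static List<List<Integer>> groupDocIdsByShard(int numShards, List<Hit> sortedHits) {
        List<List<Integer>> docIdsToLoad = new ArrayList<>();
        for (int i = 0; i < numShards; i++) {
            docIdsToLoad.add(null);
        }
        for (Hit hit : sortedHits) {
            List<Integer> ids = docIdsToLoad.get(hit.shardIndex);
            if (ids == null) {
                ids = new ArrayList<>();
                docIdsToLoad.set(hit.shardIndex, ids);
            }
            ids.add(hit.docId);
        }
        return docIdsToLoad;
    }

    // Runs onFinish exactly once, after countDown() has been called `expected` times.
    static final class CountDown {
        private final AtomicInteger remaining;
        private final Runnable onFinish;

        CountDown(int expected, Runnable onFinish) {
            this.remaining = new AtomicInteger(expected);
            this.onFinish = onFinish;
        }

        void countDown() {
            if (remaining.decrementAndGet() == 0) {
                onFinish.run();
            }
        }
    }

    // Walks every shard and returns how many shards would receive a fetch request.
    static int run(int numShards, List<Hit> sortedHits, Runnable onFinish) {
        List<List<Integer>> docIdsToLoad = groupDocIdsByShard(numShards, sortedHits);
        CountDown counter = new CountDown(docIdsToLoad.size(), onFinish);
        int fetchRequests = 0;
        for (List<Integer> entry : docIdsToLoad) {
            if (entry != null) {
                fetchRequests++; // a real implementation would send an async fetch request here
            }
            counter.countDown(); // count down every shard, with or without hits
        }
        return fetchRequests;
    }
}
```

Counting down for shards with no hits is what guarantees that the finish callback fires even when only some shards contribute to the top results.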

Finally, the results are collected and returned:

// org.elasticsearch.action.search.AbstractSearchAsyncAction    

    protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
        ShardSearchFailure[] failures = buildShardFailures();
        Boolean allowPartialResults = request.allowPartialSearchResults();
        assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
        if (allowPartialResults == false && failures.length > 0){
            raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
        }
        return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
            skippedOps.get(), buildTookInMillis(), failures, clusters);
    }

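3. Callbacks in elasticsearch

es makes heavy use of listener callbacks, which can take some getting used to for readers accustomed to sequential logic. Here is an example.

As you can see, the doExecute method defines a long rewriteListener, which is then called back inside Rewriteable.

Note that doExecute also takes a listener as a parameter; it is called back once the search started by executeLocalSearch completes. Some callbacks are nested several layers deep and have to be traced back up layer by layer.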

// org.elasticsearch.action.search.TransportSearchAction

    protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        final long relativeStartNanos = System.nanoTime();
        final SearchTimeProvider timeProvider =
             new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
        ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> { // 1. define the listener first
            if (source != searchRequest.source()) {
                // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
                // situations when source is rewritten to null due to a bug
                searchRequest.source(source);
            }
            final ClusterState clusterState = clusterService.state();
            final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
                searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
            OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
            if (remoteClusterIndices.isEmpty()) {
                executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener); // no remote indices: search the local cluster only
            } else {
                if (shouldMinimizeRoundtrips(searchRequest)) { // cross-cluster search with minimized round trips
                    ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider, searchService::createReduceContext,
                        remoteClusterService, threadPool, listener,
                        (r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l));
                } else {
                    AtomicInteger skippedClusters = new AtomicInteger(0);
                    collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
                        skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
                        ActionListener.wrap(
                            searchShardsResponses -> {
                                List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
                                Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
                                BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(
                                    searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
                                int localClusters = localIndices == null ? 0 : 1;
                                int totalClusters = remoteClusterIndices.size() + localClusters;
                                int successfulClusters = searchShardsResponses.size() + localClusters;
                                executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
                                    remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
                                    new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));
                            },
                            listener::onFailure));
                }
            }
        }, listener::onFailure);
        if (searchRequest.source() == null) {
            rewriteListener.onResponse(searchRequest.source());
        } else {
            Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
                rewriteListener); // 2. rewriteAndFetch
        }
    }

// org.elasticsearch.index.query.Rewriteable

    static <T extends Rewriteable<T>> void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener<T>
        rewriteResponse, int iteration) {
        T builder = original;
        try {
            for (T rewrittenBuilder = builder.rewrite(context); rewrittenBuilder != builder;
                 rewrittenBuilder = builder.rewrite(context)) {
                builder = rewrittenBuilder;
                if (iteration++ >= MAX_REWRITE_ROUNDS) {
                    // this is some protection against user provided queries if they don't obey the contract of rewrite we allow 16 rounds
                    // and then we fail to prevent infinite loops
                    throw new IllegalStateException("too many rewrite rounds, rewriteable might return new objects even if they are not " +
                        "rewritten");
                }
                if (context.hasAsyncActions()) {
                    T finalBuilder = builder;
                    final int currentIterationNumber = iteration;
                    context.executeAsyncActions(ActionListener.wrap(n -> rewriteAndFetch(finalBuilder, context, rewriteResponse,
                        currentIterationNumber), rewriteResponse::onFailure));
                    return;
                }
            }
            rewriteResponse.onResponse(builder); // 3. call back rewriteListener
        } catch (IOException|IllegalArgumentException|ParsingException ex) {
            rewriteResponse.onFailure(ex);
        }
    }
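
The three numbered steps above can be reduced to a small stand-alone sketch. `ListenerSketch`, its nested `Listener` interface, and `rewriteAndNotify` are hypothetical miniatures written for this illustration; they mimic the shape of the pattern and are not the Elasticsearch `ActionListener` or `Rewriteable` APIs. The "rewrite" here is just a string normalization, chosen so that it reaches a fixed point after one round.

```java
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical miniature of the listener pattern; not the Elasticsearch API.
class ListenerSketch {

    interface Listener<T> {
        void onResponse(T response);

        void onFailure(Exception e);

        // Builds a listener from two lambdas. An exception thrown by the
        // success handler is routed to the failure handler.
        static <T> Listener<T> wrap(Consumer<T> responseHandler, Consumer<Exception> failureHandler) {
            return new Listener<T>() {
                @Override
                public void onResponse(T response) {
                    try {
                        responseHandler.accept(response);
                    } catch (Exception e) {
                        onFailure(e);
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    failureHandler.accept(e);
                }
            };
        }
    }

    static final int MAX_ROUNDS = 16;

    // Applies `rewrite` until the value stops changing, then calls the
    // listener back (step 3). Too many rounds is reported as a failure.
    static void rewriteAndNotify(String original, UnaryOperator<String> rewrite, Listener<String> listener) {
        String current = original;
        int rounds = 0;
        try {
            for (String next = rewrite.apply(current); !next.equals(current); next = rewrite.apply(current)) {
                current = next;
                if (rounds++ >= MAX_ROUNDS) {
                    throw new IllegalStateException("too many rewrite rounds");
                }
            }
        } catch (RuntimeException e) {
            listener.onFailure(e);
            return;
        }
        listener.onResponse(current);
    }

    // Defines the inner listener first (step 1), starts the rewrite (step 2),
    // and lets the inner listener report to the caller's listener.
    static void doExecute(String query, Listener<String> listener) {
        Listener<String> rewriteListener = Listener.wrap(
            rewritten -> listener.onResponse("hits for [" + rewritten + "]"),
            listener::onFailure);
        rewriteAndNotify(query, q -> q.trim().toLowerCase(Locale.ROOT), rewriteListener);
    }

    public static void main(String[] args) {
        doExecute("  Hello ", Listener.wrap(
            System.out::println,
            e -> System.out.println("failed: " + e.getMessage())));
    }
}
```

Reading such code is easier from the inside out: find where `onResponse` is finally invoked, then follow each wrapped listener back to the one the caller supplied.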

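4. Summary

This article briefly described how es executes a document search: es first collects the IDs of matching documents (along with related information such as scores) from each shard, and then fetches the document content. It did not cover dfs; later posts will continue to explore these topics.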
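
To make the two phases concrete, here is a toy model over in-memory "shards". It is only a sketch under loud assumptions: `QueryThenFetchSketch` and `ScoredDoc` are hypothetical names, each shard is a plain list of strings, and the score is a made-up term-frequency ratio rather than the scoring es actually uses.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical toy model of query-then-fetch; not Elasticsearch code.
class QueryThenFetchSketch {

    // What the query phase returns: where the doc lives and how well it matched.
    static final class ScoredDoc {
        final int shard;
        final int docId;
        final double score;

        ScoredDoc(int shard, int docId, double score) {
            this.shard = shard;
            this.docId = docId;
            this.score = score;
        }
    }

    // Query phase on one shard: return only (docId, score) pairs, never the content.
    static List<ScoredDoc> queryShard(int shard, List<String> docs, String term) {
        List<ScoredDoc> hits = new ArrayList<>();
        for (int docId = 0; docId < docs.size(); docId++) {
            String[] words = docs.get(docId).split(" ");
            int count = 0;
            for (String word : words) {
                if (word.equals(term)) {
                    count++;
                }
            }
            if (count > 0) {
                // Toy score (assumption): fraction of words equal to the term.
                hits.add(new ScoredDoc(shard, docId, (double) count / words.length));
            }
        }
        return hits;
    }

    // Coordinator: merge the per-shard hits, keep the global top `size`,
    // then fetch content only for those docs from the shards that own them.
    static List<String> search(List<List<String>> shards, String term, int size) {
        List<ScoredDoc> all = new ArrayList<>();
        for (int s = 0; s < shards.size(); s++) {
            all.addAll(queryShard(s, shards.get(s), term));
        }
        all.sort(Comparator.comparingDouble((ScoredDoc d) -> d.score).reversed());
        List<String> fetched = new ArrayList<>();
        for (ScoredDoc d : all.subList(0, Math.min(size, all.size()))) {
            fetched.add(shards.get(d.shard).get(d.docId)); // fetch phase
        }
        return fetched;
    }
}
```

The point of the split is that document content crosses the network only for the final top hits, not for every match on every shard.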
