Elasticsearch 8.9 Bulk: source code for adding data to an index in bulk


  • I. The handlers for the relevant APIs
  • II. RestBulkAction: assemble the BulkRequest and call TransportBulkAction
  • III. TransportBulkAction dispatches the data to the data nodes it belongs on
    • 1. Group the data by shard, then send each group to its target data node
      • (1) Compute which shard this document goes to
        • 1) Return a different index routing object depending on whether the index is partitioned
        • 2) A document without an id gets one generated automatically
        • 3) The routing object, id, and routing value decide which shard the document goes to
      • (2) Register a Task via the taskManager and execute the action to send the request to the data node
  • IV. The data node (TransportShardBulkAction) handles the data from the coordinating node
    • 1. Operate on the index shard hosted on this node
      • (1) Assemble the Engine.Index
      • (2) Add to Lucene first, then to the translog on success

The figure below comes from the article "ElasticSearch——刷盘原理流程" (ElasticSearch flush-to-disk flow). This post walks through the source code from the client sending the bulk command up to the data being saved in Lucene and the translog; it does not cover flushing the data to disk, nor the data structures Lucene uses for storage.

(Figure: Elasticsearch write path, from the article referenced above)

I. The handlers for the relevant APIs

ActionModule.java

//coordinating-node logic that dispatches the bulk items to the different data nodes
 actions.register(BulkAction.INSTANCE, TransportBulkAction.class);
 //handling on a data node once it receives the per-shard requests from the coordinating node
 actions.register(TransportShardBulkAction.TYPE, TransportShardBulkAction.class);
 //REST handler that receives the client's bulk request
 registerHandler.accept(new RestBulkAction(settings));

II. RestBulkAction: assemble the BulkRequest and call TransportBulkAction

public class RestBulkAction extends BaseRestHandler {
 
    @Override
    public List<Route> routes() {
        return List.of(
            new Route(POST, "/_bulk"),
            new Route(PUT, "/_bulk"),
            new Route(POST, "/{index}/_bulk"),
            new Route(PUT, "/{index}/_bulk"),
            Route.builder(POST, "/{index}/{type}/_bulk").deprecated(TYPES_DEPRECATION_MESSAGE, RestApiVersion.V_7).build(),
            Route.builder(PUT, "/{index}/{type}/_bulk").deprecated(TYPES_DEPRECATION_MESSAGE, RestApiVersion.V_7).build()
        );
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
            request.param("type");
        }
        BulkRequest bulkRequest = new BulkRequest();
        String defaultIndex = request.param("index");
        String defaultRouting = request.param("routing");
        FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
        String defaultPipeline = request.param("pipeline");
        String waitForActiveShards = request.param("wait_for_active_shards");
        if (waitForActiveShards != null) {
            bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
        }
        Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
        bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
        bulkRequest.setRefreshPolicy(request.param("refresh"));
        bulkRequest.add(
            request.requiredContent(),
            defaultIndex,
            defaultRouting,
            defaultFetchSourceContext,
            defaultPipeline,
            defaultRequireAlias,
            allowExplicitIndex,
            request.getXContentType(),
            request.getRestApiVersion()
        );
		
        return channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));
    }
}
@Override
public void bulk(final BulkRequest request, final ActionListener<BulkResponse> listener) {
    execute(BulkAction.INSTANCE, request, listener);
}

Here client.bulk executes BulkAction.INSTANCE, which the actions registration shown in section I maps to TransportBulkAction.
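As a quick orientation before moving to the server side, below is a hedged sketch of driving the same entry point from Java: it builds a BulkRequest by hand (the equivalent of what RestBulkAction parses out of the NDJSON body) and passes it to client.bulk as shown above. The index name, id, and field are made-up examples, and the import paths are from memory and may differ slightly between 8.x versions.

import java.util.Map;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.internal.node.NodeClient;

class BulkClientSketch {
    static void sendBulk(NodeClient client) {
        //build the same kind of BulkRequest that RestBulkAction assembles from the NDJSON body
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.add(new IndexRequest("my-index").id("1").source(Map.of("field", "value")));
        //same entry point as the RestChannelConsumer returned by prepareRequest
        client.bulk(bulkRequest, ActionListener.wrap(
            (BulkResponse response) -> System.out.println("bulk took " + response.getTook()),
            e -> e.printStackTrace()
        ));
    }
}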

III. TransportBulkAction dispatches the data to the data nodes it belongs on

public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {

 @Override
    protected void doExecute(Task task, org.elasticsearch.action.bulk.BulkRequest bulkRequest, ActionListener<org.elasticsearch.action.bulk.BulkResponse> listener) {
        /*
         * This is called on the transport thread, so we can quickly check indexing memory pressure here,
         * but we do not want to keep the transport thread busy. Once the indexing pressure has been
         * accounted for, we fork onto one of the write thread pools. We do this because handling a bulk
         * request can become expensive: when dispatching the sub-requests to the shards we may need to
         * compress them, and while LZ4 is very fast, it is slow enough that it is better not to do it on
         * the transport thread, especially for large sub-requests. We could detect those cases and fork
         * only then, but that is hard to get right, and the overhead of forking is fairly low.
         */
        final int indexingOps = bulkRequest.numberOfActions();
        final long indexingBytes = bulkRequest.ramBytesUsed();
        final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
        final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
        final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
        final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
        //fork onto the write thread pool
        threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
            @Override
            protected void doRun() {
                doInternalExecute(task, bulkRequest, executorName, releasingListener);
            }
        });
    }

    protected void doInternalExecute(Task task, org.elasticsearch.action.bulk.BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
       //code omitted
        //before we start, try to create all the indices we will need during the bulk
        // Step 1: collect all the indices in the request
        final Map<String, Boolean> indices = bulkRequest.requests.stream()  
            //  delete requests should not attempt to create the index (if it is missing), unless external versioning is in use
            .filter(
                request -> request.opType() != DocWriteRequest.OpType.DELETE
                    || request.versionType() == VersionType.EXTERNAL
                    || request.versionType() == VersionType.EXTERNAL_GTE
            )
            .collect(Collectors.toMap(DocWriteRequest::index, DocWriteRequest::isRequireAlias, (v1, v2) -> v1 || v2));

        // Step 2: filter that list down to the indices that do not currently exist.
        final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
        Set<String> autoCreateIndices = new HashSet<>();
        ClusterState state = clusterService.state();
        for (Map.Entry<String, Boolean> indexAndFlag : indices.entrySet()) {
            final String index = indexAndFlag.getKey();
            boolean shouldAutoCreate = indexNameExpressionResolver.hasIndexAbstraction(index, state) == false;
            //only auto-create the index when we are not requiring it to be an alias
            if (shouldAutoCreate && (indexAndFlag.getValue() == false)) {
                autoCreateIndices.add(index);
            }
        }

        // Step 3: create the missing indices (if there are any). Start the bulk after all creations have returned.
        if (autoCreateIndices.isEmpty()) {
            executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
        } else {
            //code omitted
            for (String index : autoCreateIndices) {
                //code omitted: iterate and create each index
            }
        }

	}
	 void executeBulk(
        Task task,
        BulkRequest bulkRequest,
        long startTimeNanos,
        ActionListener<BulkResponse> listener,
        String executorName,
        AtomicArray<BulkItemResponse> responses,
        Map<String, IndexNotFoundException> indicesThatCannotBeCreated
    ) {
        //create a BulkOperation and run it (its doRun method)
        new BulkOperation(task, bulkRequest, listener, executorName, responses, startTimeNanos, indicesThatCannotBeCreated).run();
    }

}

1. Group the data by shard, then send each group to its target data node

 private final class BulkOperation extends ActionRunnable<BulkResponse> {
	@Override
        protected void doRun() {
           //code omitted
            Metadata metadata = clusterState.metadata();
        
            //group the requests into a ShardId -> operations map
            Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
            //iterate over every item in the bulk request
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
                //code omitted
               
                IndexAbstraction ia = null;
                //data streams should only be resolved for create operations
                boolean includeDataStreams = docWriteRequest.opType() == DocWriteRequest.OpType.CREATE;
                try {
                    //resolve the index abstraction for this request
                    ia = concreteIndices.resolveIfAbsent(docWriteRequest);
                    //get the concrete write index
                    final Index concreteIndex = docWriteRequest.getConcreteWriteIndex(ia, metadata);
                    //skip this item if the index is closed
                    if (addFailureIfIndexIsClosed(docWriteRequest, concreteIndex, i, metadata)) {
                        continue;
                    }
                    //get the index's routing; for an ordinary index the returned indexRouting is an Unpartitioned instance
                    IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
                    //if the document has no id, one is generated for it here
                    docWriteRequest.process(indexRouting);
                    //compute the shard id; IdAndRoutingOnly delegates to Unpartitioned (or Partitioned) to pick it
                    int shardId = docWriteRequest.route(indexRouting);
                    //computeIfAbsent creates the per-shard request list if it does not exist yet
                    List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(
                        new ShardId(concreteIndex, shardId),
                        shard -> new ArrayList<>()
                    );
                    //wrap the request and its slot as a BulkItemRequest and add it to that shard's list in requestsByShard
                    shardRequests.add(new BulkItemRequest(i, docWriteRequest));
                } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException | ResourceNotFoundException e) {
                    String name = ia != null ? ia.getName() : docWriteRequest.index();
                    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(name, docWriteRequest.id(), e);
                    BulkItemResponse bulkItemResponse = BulkItemResponse.failure(i, docWriteRequest.opType(), failure);
                    responses.set(i, bulkItemResponse);
                    // make sure the request gets never processed again
                    bulkRequest.requests.set(i, null);
                }
            }
            //nothing to send, so respond immediately
            if (requestsByShard.isEmpty()) {
                listener.onResponse(
                    new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
                );
                return;
            }
            //from here on, one BulkShardRequest is dispatched per shard id
            final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
            String nodeId = clusterService.localNode().getId();
            for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
                final ShardId shardId = entry.getKey();
                final List<BulkItemRequest> requests = entry.getValue();
                BulkShardRequest bulkShardRequest = new BulkShardRequest(
                    shardId,
                    bulkRequest.getRefreshPolicy(),
                    requests.toArray(new BulkItemRequest[requests.size()])
                );
                bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
                bulkShardRequest.timeout(bulkRequest.timeout());
                bulkShardRequest.routedBasedOnClusterVersion(clusterState.version());
                if (task != null) {
                    bulkShardRequest.setParentTask(nodeId, task.getId());
                }
                client.executeLocally(TransportShardBulkAction.TYPE, bulkShardRequest, new ActionListener<>() {
                    //handle the successful response
                    @Override
                    public void onResponse(BulkShardResponse bulkShardResponse) {
                        for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {    
                            if (bulkItemResponse.getResponse() != null) {
                                bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                            }
                            responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                        }
                      //once every shard request has completed, assemble the final response
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }
                    //failure handling
                    @Override
                    public void onFailure(Exception e) {
                        // create failures for all relevant requests
                        for (BulkItemRequest request : requests) {
                            final String indexName = request.index();
                            DocWriteRequest<?> docWriteRequest = request.request();
                            BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e);
                            responses.set(request.id(), BulkItemResponse.failure(request.id(), docWriteRequest.opType(), failure));
                        }
                        //once every shard request has completed, assemble the final response
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    private void finishHim() {
                        listener.onResponse(
                            new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
                        );
                    }
                });
            }
            bulkRequest = null; // allow memory for bulk request items to be reclaimed before all items have been completed
        }
}
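As an aside, the computeIfAbsent call above is the standard group-by-key pattern. Here is a tiny standalone illustration with made-up documents and a placeholder shard function (plain Java, not Elasticsearch code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupByShardSketch {
    public static void main(String[] args) {
        //the strings stand in for BulkItemRequest items; the shard id below is a
        //placeholder for what IndexRouting computes from the id/routing
        List<String> docs = List.of("doc-a", "doc-b", "doc-c", "doc-d");
        Map<Integer, List<String>> requestsByShard = new HashMap<>();
        for (String doc : docs) {
            int shardId = Math.floorMod(doc.hashCode(), 2); //pretend the index has 2 shards
            requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>()).add(doc);
        }
        System.out.println(requestsByShard); //each shard id maps to its own list of requests
    }
}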

(1) Compute which shard this document goes to

The key lines in the code above are these:

 //get the index's routing; for an ordinary index the returned indexRouting is an Unpartitioned instance
 IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
 //validate the document id: an empty id is rejected, a missing (null) id is auto-generated
 docWriteRequest.process(indexRouting);
 //compute the shard id; IdAndRoutingOnly delegates to Unpartitioned (or Partitioned) to pick it
 int shardId = docWriteRequest.route(indexRouting);

First, the indexRouting object is one of the implementations below, Partitioned or Unpartitioned.

1) Return a different index routing object depending on whether the index is partitioned

public static IndexRouting fromIndexMetadata(IndexMetadata metadata) {
        if (false == metadata.getRoutingPaths().isEmpty()) {
            return new ExtractFromSource(metadata);
        }
        //check whether the index metadata marks this as a routing-partitioned index;
        //if so, create and return a Partitioned routing object
        if (metadata.isRoutingPartitionedIndex()) {
            return new Partitioned(metadata);
        }
        //if none of the above apply, create and return an Unpartitioned routing object
        return new Unpartitioned(metadata);
    }
2) A document without an id gets one generated automatically

 @Override
    public void process(IndexRouting indexRouting) {
        indexRouting.process(this);
    }

Both Partitioned and Unpartitioned extend IdAndRoutingOnly.

private abstract static class IdAndRoutingOnly extends IndexRouting {
 		@Override
        public void process(IndexRequest indexRequest) {
            //a document id must not be an empty string, but it may be null, in which case an id is auto-generated below
            if ("".equals(indexRequest.id())) {
                throw new IllegalArgumentException("if _id is specified it must not be empty");
            }

            // generate id if not already provided
            if (indexRequest.id() == null) {
                indexRequest.autoGenerateId();
            }
        }
}
public void autoGenerateId() {
        assert id == null;
        assert autoGeneratedTimestamp == UNSET_AUTO_GENERATED_TIMESTAMP : "timestamp has already been generated!";
        assert ifSeqNo == UNASSIGNED_SEQ_NO;
        assert ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM;
        autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis());
        String uid = UUIDs.base64UUID();
        id(uid);
    }
3) The routing object, id, and routing value decide which shard the document goes to

The route method looks like this:

@Override
    public int route(IndexRouting indexRouting) {
        return indexRouting.indexShard(id, routing, contentType, source);
    }
    private abstract static class IdAndRoutingOnly extends IndexRouting {
	   protected abstract int shardId(String id, @Nullable String routing);
	    @Override
        public int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source) {
            if (id == null) {
                throw new IllegalStateException("id is required and should have been set by process");
            }
            checkRoutingRequired(id, routing);
            return shardId(id, routing);
        }
}

shardId has two implementations, Partitioned and Unpartitioned:

 private static class Unpartitioned extends IdAndRoutingOnly {
        Unpartitioned(IndexMetadata metadata) {
            super(metadata);
        }
        //routing takes precedence; if it is absent, fall back to the id
        @Override
        protected int shardId(String id, @Nullable String routing) {
            return hashToShardId(effectiveRoutingToHash(routing == null ? id : routing));
        }

        @Override
        public void collectSearchShards(String routing, IntConsumer consumer) {
            consumer.accept(hashToShardId(effectiveRoutingToHash(routing)));
        }
    }

    private static class Partitioned extends IdAndRoutingOnly {
        private final int routingPartitionSize;

        Partitioned(IndexMetadata metadata) {
            super(metadata);
            this.routingPartitionSize = metadata.getRoutingPartitionSize();
        }
        //here routing must not be null
        @Override
        protected int shardId(String id, @Nullable String routing) {
            if (routing == null) {
                throw new IllegalArgumentException("A routing value is required for gets from a partitioned index");
            }
            int offset = Math.floorMod(effectiveRoutingToHash(id), routingPartitionSize);
            return hashToShardId(effectiveRoutingToHash(routing) + offset);
        }

        @Override
        public void collectSearchShards(String routing, IntConsumer consumer) {
            int hash = effectiveRoutingToHash(routing);
            for (int i = 0; i < routingPartitionSize; i++) {
                consumer.accept(hashToShardId(hash + i));
            }
        }
    }

Below we only follow the Unpartitioned path.

    /**
     * Convert a routing value into a hash.
     */
    private static int effectiveRoutingToHash(String effectiveRouting) {
        return Murmur3HashFunction.hash(effectiveRouting);
    }
/**
 * Hash function based on the Murmur3 algorithm, which is the default as of Elasticsearch 2.0.
 */
public final class Murmur3HashFunction {

    private Murmur3HashFunction() {
        // no instance
    }

    public static int hash(String routing) {
        final byte[] bytesToHash = new byte[routing.length() * 2];
        for (int i = 0; i < routing.length(); ++i) {
            final char c = routing.charAt(i);
            final byte b1 = (byte) c, b2 = (byte) (c >>> 8);
            assert ((b1 & 0xFF) | ((b2 & 0xFF) << 8)) == c; // no information loss
            bytesToHash[i * 2] = b1;
            bytesToHash[i * 2 + 1] = b2;
        }
        return hash(bytesToHash, 0, bytesToHash.length);
    }

    public static int hash(byte[] bytes, int offset, int length) {
        return StringHelper.murmurhash3_x86_32(bytes, offset, length, 0);
    }
}
    /**
     * Convert a hash generated from an {@code (id, routing)} pair into a
     * shard id.
     */
    protected final int hashToShardId(int hash) {
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }

That is how the document's shard id is determined.
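Putting those pieces together, here is a standalone sketch of the shard selection math for both implementations. A placeholder hash stands in for Murmur3HashFunction, and routingNumShards/routingFactor are the values the index metadata would supply (the invariant is routingNumShards / routingFactor == numberOfShards):

class ShardRoutingSketch {
    //stands in for Murmur3HashFunction.hash(String); any deterministic int hash works for the illustration
    static int hash(String effectiveRouting) {
        return effectiveRouting.hashCode();
    }

    //the same formula as IndexRouting#hashToShardId above
    static int hashToShardId(int hash, int routingNumShards, int routingFactor) {
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }

    //Unpartitioned: the routing value wins over the id
    static int unpartitionedShard(String id, String routing, int routingNumShards, int routingFactor) {
        return hashToShardId(hash(routing == null ? id : routing), routingNumShards, routingFactor);
    }

    //Partitioned: the id picks an offset inside the routing partition
    static int partitionedShard(String id, String routing, int routingPartitionSize,
                                int routingNumShards, int routingFactor) {
        int offset = Math.floorMod(hash(id), routingPartitionSize);
        return hashToShardId(hash(routing) + offset, routingNumShards, routingFactor);
    }

    public static void main(String[] args) {
        //hypothetical metadata values: 640 / 128 == 5 shards, so the result is always in [0, 5)
        System.out.println(unpartitionedShard("my-doc-id", null, 640, 128));
    }
}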

(2) Register a Task via the taskManager and execute the action to send the request to the data node

 client.executeLocally(TransportShardBulkAction.TYPE, bulkShardRequest, new ActionListener<>() { ... });

NodeClient.executeLocally registers the task with the taskManager and executes the transport action:

 public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(
        ActionType<Response> action,
        Request request,
        ActionListener<Response> listener
    ) {
        return taskManager.registerAndExecute(
            "transport",
            transportAction(action),
            request,
            localConnection,
            new SafelyWrappedActionListener<>(listener)
        );
    }

The rest of that path is not traced here; let's look directly at the handler registered for TransportShardBulkAction.TYPE.

IV. The data node (TransportShardBulkAction) handles the data from the coordinating node

public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
 //this handles the items belonging to shards on the current node; the request came from the coordinating node
    @Override
    protected void dispatchedShardOperationOnPrimary(
        BulkShardRequest request,
        IndexShard primary,
        ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
    ) {
        ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
        performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {
            assert update != null;
            assert shardId != null;
            mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), update, mappingListener);
        }, mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() {
            //code omitted
        }), listener, threadPool, executor(primary), postWriteRefresh, postWriteAction);
    }


}

Let's go straight to performOnPrimary:

  public static void performOnPrimary(
        org.elasticsearch.action.bulk.BulkShardRequest request,
        IndexShard primary,
        UpdateHelper updateHelper,
        LongSupplier nowInMillisSupplier,
        MappingUpdatePerformer mappingUpdater,
        Consumer<ActionListener<Void>> waitForMappingUpdate,
        ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener,
        ThreadPool threadPool,
        String executorName,
        @Nullable PostWriteRefresh postWriteRefresh,
        @Nullable Consumer<Runnable> postWriteAction
    ) {
        new ActionRunnable<>(listener) {

            private final Executor executor = threadPool.executor(executorName);

            private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);

            final long startBulkTime = System.nanoTime();

            @Override
            protected void doRun() throws Exception {
                //keep going while there are operations left to execute
                while (context.hasMoreOperationsToExecute()) {
                    if (executeBulkItemRequest(
                        context,
                        updateHelper,
                        nowInMillisSupplier,
                        mappingUpdater,
                        waitForMappingUpdate,
                        ActionListener.wrap(v -> executor.execute(this), this::onRejection)
                    ) == false) {
                        //we are waiting for a mapping update on another thread; once it completes it will invoke this again, so we bail out here
                        return;
                    }
                    assert context.isInitial(); // either completed and moved to next or reset
                }
                primary.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime); 
                finishRequest();
            }

            @Override
            public void onRejection(Exception e) {
               //code omitted
            }

            private void finishRequest() {
                //code omitted
            }
        }.run();
    }

1. Operate on the index shard hosted on this node

static boolean executeBulkItemRequest(
       BulkPrimaryExecutionContext context,
        UpdateHelper updateHelper,
        LongSupplier nowInMillisSupplier,
        MappingUpdatePerformer mappingUpdater,
        Consumer<ActionListener<Void>> waitForMappingUpdate,
        ActionListener<Void> itemDoneListener
    ) throws Exception {
            //get the IndexRequest, build a SourceToParse from it, and hand it to primary.applyIndexOperationOnPrimary to perform the index operation
            final IndexRequest request = context.getRequestToExecute();
            final SourceToParse sourceToParse = new SourceToParse(
                request.id(),
                request.source(),
                request.getContentType(),
                request.routing(),
                request.getDynamicTemplates()
            );
            //write the document to the shard and capture the result
            result = primary.applyIndexOperationOnPrimary(
                version,
                request.versionType(),
                sourceToParse,
                request.ifSeqNo(),
                request.ifPrimaryTerm(),
                request.getAutoGeneratedTimestamp(),
                request.isRetry()
            );
      
        //if the result says a mapping update is required, update the index mapping first
        if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
            //code omitted
        }
        return true;
    }

(1) Assemble the Engine.Index

public Engine.IndexResult applyIndexOperationOnPrimary(
        long version,
        VersionType versionType,
        SourceToParse sourceToParse,
        long ifSeqNo,
        long ifPrimaryTerm,
        long autoGeneratedTimestamp,
        boolean isRetry
    ) throws IOException {
        assert versionType.validateVersionForWrites(version);
        //perform the index operation, which also updates the translog
        return applyIndexOperation(
            getEngine(),
            UNASSIGNED_SEQ_NO,
            getOperationPrimaryTerm(),
            version,
            versionType,
            ifSeqNo,
            ifPrimaryTerm,
            autoGeneratedTimestamp,
            isRetry,
            Engine.Operation.Origin.PRIMARY,
            sourceToParse
        );
    }
private Engine.IndexResult applyIndexOperation(
        Engine engine,
        long seqNo,
        long opPrimaryTerm,
        long version,
        @Nullable VersionType versionType,
        long ifSeqNo,
        long ifPrimaryTerm,
        long autoGeneratedTimeStamp,
        boolean isRetry,
        Engine.Operation.Origin origin,
        SourceToParse sourceToParse
    ) throws IOException {
        assert opPrimaryTerm <= getOperationPrimaryTerm()
            : "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";
        ensureWriteAllowed(origin);
        Engine.Index operation;
        try {
            //build the Engine.Index operation
            operation = prepareIndex(
                mapperService,
                sourceToParse,
                seqNo,
                opPrimaryTerm,
                version,
                versionType,
                origin,
                autoGeneratedTimeStamp,
                isRetry,
                ifSeqNo,
                ifPrimaryTerm,
                getRelativeTimeInNanos()
            );
            Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
            if (update != null) {
                return new Engine.IndexResult(update, operation.parsedDoc().id());
            }
        } catch (Exception e) {
           //code omitted
        }

        return index(engine, operation);
    }
 private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
        active.set(true);
        final Engine.IndexResult result;
        final Engine.Index preIndex = indexingOperationListeners.preIndex(shardId, index);
        try {
            //code omitted
            //InternalEngine.index writes the docs one by one
            //the Engine wraps the Lucene and translog calls and exposes the read/write interface
            result = engine.index(preIndex);
            //code omitted
        } catch (Exception e) {
            //code omitted
            indexingOperationListeners.postIndex(shardId, preIndex, e);
            throw e;
        }
        indexingOperationListeners.postIndex(shardId, preIndex, result);
        return result;
    }

The engine here is an InternalEngine, so engine.index dispatches to InternalEngine.index:

(2) Add to Lucene first, then to the translog on success

@Override
    public IndexResult index(Index index) throws IOException {
        assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
        final boolean doThrottle = index.origin().isRecovery() == false;
        try (ReleasableLock releasableLock = readLock.acquire()) {
          
                    //code omitted (the indexing plan is computed here)
                    //if this operation runs on the primary shard, rebuild the Index with a newly generated sequence number
                    if (index.origin() == Operation.Origin.PRIMARY) {
                        index = new Index(
                            index.uid(),
                            index.parsedDoc(),
                            generateSeqNoForOperationOnPrimary(index),
                            index.primaryTerm(),
                            index.version(),
                            index.versionType(),
                            index.origin(),
                            index.startTime(),
                            index.getAutoGeneratedIdTimestamp(),
                            index.isRetry(),
                            index.getIfSeqNo(),
                            index.getIfPrimaryTerm()
                        );

                        final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
                        if (toAppend == false) {
                            advanceMaxSeqNoOfUpdatesOnPrimary(index.seqNo());
                        }
                    } else {
                        //non-primary origins just mark the sequence number as seen
                        markSeqNoAsSeen(index.seqNo());
                    }

                    if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
                        //write the data into Lucene
                        indexResult = indexIntoLucene(index, plan);
                    } else {
                        indexResult = new IndexResult(
                            plan.versionForIndexing,
                            index.primaryTerm(),
                            index.seqNo(),
                            plan.currentNotFoundOrDeleted,
                            index.id()
                        );
                    }
                }
                if (index.origin().isFromTranslog() == false) {
                    final Translog.Location location;
                    //if the Lucene write succeeded, add the operation to the translog
                    if (indexResult.getResultType() == Result.Type.SUCCESS) {
                        location = translog.add(new Translog.Index(index, indexResult));
                    } 
                    //code omitted
                    indexResult.setTranslogLocation(location);
                }
               //code omitted
                indexResult.setTook(relativeTimeInNanosSupplier.getAsLong() - index.startTime());
                indexResult.freeze();
                return indexResult;
         
       
    }
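The ordering in the method above matters: only operations that Lucene actually accepted get a translog entry, so replaying the translog during recovery never replays an operation that was rejected (for example because of a parse failure). Below is a minimal standalone sketch of that contract, using plain collections instead of Elasticsearch classes:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WriteThenLogSketch {
    private final Map<String, String> lucene = new HashMap<>(); //stands in for the Lucene index
    private final List<String> translog = new ArrayList<>();    //stands in for the translog

    //returns true only when both the in-memory index and the log were updated
    boolean index(String id, String source) {
        if (source == null || source.isEmpty()) {
            //a parse or mapping failure in the real engine: nothing reaches the translog
            return false;
        }
        lucene.put(id, source);          //"indexIntoLucene"
        translog.add(id + "=" + source); //"translog.add(...)" runs only after the Lucene write succeeded
        return true;
    }
}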

Adding the document to Lucene:

import org.apache.lucene.index.IndexWriter;

 private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws IOException {
        try {
            if (plan.addStaleOpToLucene) { //add (stale operation)
                addStaleDocs(index.docs(), indexWriter);
            } else if (plan.useLuceneUpdateDocument) { //update
                assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), index.seqNo(), true, true);
                updateDocs(index.uid(), index.docs(), indexWriter);
            } else {
                // document does not exists, we can optimize for create, but double check if assertions are running
                assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
                addDocs(index.docs(), indexWriter);
            }
            return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted, index.id());
        } catch (Exception ex) {
          //code omitted
        }
    }
     private void addStaleDocs(final List<LuceneDocument> docs, final IndexWriter indexWriter) throws IOException {
        for (LuceneDocument doc : docs) {
            doc.add(softDeletesField); // soft-deleted every document before adding to Lucene
        }
        if (docs.size() > 1) {
            indexWriter.addDocuments(docs);
        } else {
            indexWriter.addDocument(docs.get(0));
        }
    }

When writing to the translog, the operation is first wrapped in a new Translog.Index and then added to the translog.

    public Location add(final Operation operation) throws IOException {
         final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
		try {
            writeOperationWithSize(out, operation);
            final BytesReference bytes = out.bytes();
            try (ReleasableLock ignored = readLock.acquire()) {
                ensureOpen();
               //code omitted
                return current.add(bytes, operation.seqNo());
            }
        }
        //catch/finally blocks omitted
    }

    private ReleasableBytesStreamOutput buffer;

    /**
     * Add the given bytes to the translog with the specified sequence number; returns the location the bytes were written to.
     *
     * @param data  the bytes to write
     * @param seqNo the sequence number associated with the operation
     * @return the location the bytes were written to
     * @throws IOException if writing to the translog resulted in an I/O exception
     */
    public Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
        //first check whether the buffered bytes exceed the forceWriteThreshold; if so, write the buffered operations out via writeBufferedOps
        long bufferedBytesBeforeAdd = this.bufferedBytes;
        if (bufferedBytesBeforeAdd >= forceWriteThreshold) {
            writeBufferedOps(Long.MAX_VALUE, bufferedBytesBeforeAdd >= forceWriteThreshold * 4);
        }

        final Translog.Location location;
        synchronized (this) {
            ensureOpen();
            //make sure the buffer exists
            if (buffer == null) {
                buffer = new ReleasableBytesStreamOutput(bigArrays);
            }
            //write the data into the buffer, then update minSeqNo and maxSeqNo
            assert bufferedBytes == buffer.size();
            final long offset = totalOffset;
            totalOffset += data.length();
            data.writeTo(buffer);

            assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
            assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;

            minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
            maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);
            //record seqNo among the not-yet-fsynced sequence numbers and increment the operation counter
            nonFsyncedSequenceNumbers.add(seqNo);

            operationCounter++;

            assert assertNoSeqNumberConflict(seqNo, data);
            //build a Translog.Location from the generation, the offset, and the data length
            location = new Translog.Location(generation, offset, data.length());
            //notify the operation listener that an operation was added, then update bufferedBytes
            operationListener.operationAdded(data, seqNo, location);
            bufferedBytes = buffer.size();
        }

        return location;
    }

A brief introduction to the Translog:

/** A Translog is a per-index-shard component that records all non-committed index operations in a durable manner.
In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}.
In addition, since Elasticsearch 2.0 the engine also records a {@link #TRANSLOG_UUID_KEY} with each commit to ensure a strong
association between the Lucene index and the transaction log files. This UUID is used to prevent accidental recovery from a
transaction log that belongs to a different engine.

Each Translog has only one translog file open for writes at any time, referenced by a translog generation ID.
This ID is written to a {@code translog.ckp} file that is designed to fit in a single disk block, so that a write of the file is atomic.
The checkpoint file is written on every fsync of the translog and records the number of operations written, the current translog
file generation, its fsync offset in bytes, and other important statistics.

When the current translog file reaches a certain size ({@link IndexSettings#INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING}),
or when a clear separation between old and new operations is needed (on a primary term change), the current file is reopened
as read-only and a new write-only file is created.
Any non-current, read-only translog file always has a {@code translog-{gen}.ckp} associated with it, which is an fsynced copy of its
last {@code translog.ckp}, so that in disaster recovery the last fsynced offset, the number of operations, and other statistics are still preserved.
**/
public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
}
