Elasticsearch 8.9 flush刷新缓存中的数据到磁盘源码

news2025/1/20 3:51:46

  • 一、相关API的handler
    • 1、接收HTTP请求的hander
    • 2、每一个数据节点(node)执行分片刷新的action是TransportShardFlushAction
  • 二、对indexShard执行刷新请求
    • 1、首先获取读锁,再获取刷新锁,如果获取不到根据参数决定是否直接返回还是等待
    • 2、在刷新之后translog需要滚动生成新的,这样不会影响正在进行的写入和删除时方便
    • 3、把IndexWriter中的数据持久化磁盘
    • 4、开始处理translog.log
      • (1) 首先把内存中的translog全部写入磁盘
      • (2) 删除磁盘中的对应的translog文件
    • 5、更新最后刷新时间和刷新最后提交的段信息
  • 三、通过源码得到一些结论

下面的图来自ElasticSearch——刷盘原理流程,这篇文章主要讲的是人工通过flush命令把批量写入内存segment的数据刷新进磁盘,不涉及到在translog.logLucene的数据结构。
通过这个流程知道ES如何把数据刷新进磁盘的,主要是下图的下半部分中fsync部分

在这里插入图片描述

一、相关API的handler

ActionModule.java

registerHandler.accept(new RestFlushAction()); 
actions.register(FlushAction.INSTANCE, TransportFlushAction.class); 
actions.register(TransportShardFlushAction.TYPE, TransportShardFlushAction.class);

1、接收HTTP请求的hander

public class RestFlushAction extends BaseRestHandler {

    @Override
    public List<Route> routes() {
        return List.of(
            new Route(GET, "/_flush"),
            new Route(POST, "/_flush"),
            new Route(GET, "/{index}/_flush"),
            new Route(POST, "/{index}/_flush")
        );
    }

    @Override
    public String getName() {
        return "flush_action";
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        FlushRequest flushRequest = new FlushRequest(Strings.splitStringByCommaToArray(request.param("index")));
        flushRequest.indicesOptions(IndicesOptions.fromRequest(request, flushRequest.indicesOptions()));
        flushRequest.force(request.paramAsBoolean("force", flushRequest.force()));
        flushRequest.waitIfOngoing(request.paramAsBoolean("wait_if_ongoing", flushRequest.waitIfOngoing()));
        return channel -> client.admin().indices().flush(flushRequest, new RestToXContentListener<>(channel));
    }
}

上面会执行下面这个,至于怎么到这里的,可以看Elasticsearch 8.9 Master节点处理请求源码

/**
 * Flush Action.
 */
public class TransportFlushAction extends TransportBroadcastReplicationAction<
    FlushRequest,
    FlushResponse,
    ShardFlushRequest,
    ReplicationResponse> {

    @Inject
    public TransportFlushAction(
        ClusterService clusterService,
        TransportService transportService,
        NodeClient client,
        ActionFilters actionFilters,
        IndexNameExpressionResolver indexNameExpressionResolver
    ) {
        super(
            FlushAction.NAME,
            FlushRequest::new,
            clusterService,
            transportService,
            client,
            actionFilters,
            indexNameExpressionResolver,
            TransportShardFlushAction.TYPE, //注意这个
            ThreadPool.Names.FLUSH
        );
    }

  //省略代码
}

这里需要注意上面的TransportShardFlushAction.TYPE

这里看一下它的父类TransportBroadcastReplicationAction

public abstract class TransportBroadcastReplicationAction<
    Request extends BroadcastRequest<Request>,
    Response extends BaseBroadcastResponse,
    ShardRequest extends ReplicationRequest<ShardRequest>,
    ShardResponse extends ReplicationResponse> extends HandledTransportAction<Request, Response> {

    private final ActionType<ShardResponse> replicatedBroadcastShardAction;
	 public TransportBroadcastReplicationAction(
        String name,
        Writeable.Reader<Request> requestReader,
        ClusterService clusterService,
        TransportService transportService,
        NodeClient client,
        ActionFilters actionFilters,
        IndexNameExpressionResolver indexNameExpressionResolver,
        ActionType<ShardResponse> replicatedBroadcastShardAction,
        String executor
    ) {
       	//省略代码
       	//这里即上面的TransportShardFlushAction.TYPE
        this.replicatedBroadcastShardAction = replicatedBroadcastShardAction;
       
    }
	@Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        clusterService.threadPool().executor(executor).execute(ActionRunnable.wrap(listener, createAsyncAction(task, request)));
    }
	private CheckedConsumer<ActionListener<Response>, Exception> createAsyncAction(Task task, Request request) {
        return new CheckedConsumer<ActionListener<Response>, Exception>() {
			//省略代码
            @Override
            public void accept(ActionListener<Response> listener) {
              	
                final ClusterState clusterState = clusterService.state();
                final List<ShardId> shards = shards(request, clusterState);
                final Map<String, IndexMetadata> indexMetadataByName = clusterState.getMetadata().indices();
				//遍历分片
                try (var refs = new RefCountingRunnable(() -> finish(listener))) {
                    for (final ShardId shardId : shards) {
                        // NB This sends O(#shards) requests in a tight loop; TODO add some throttling here?
                        shardExecute(
                            task,
                            request,
                            shardId,
                            ActionListener.releaseAfter(new ReplicationResponseActionListener(shardId, indexMetadataByName), refs.acquire())
                        );
                    }
                }
            }
			//省略代码
        };
    }
    protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
        assert Transports.assertNotTransportThread("may hit all the shards");
        ShardRequest shardRequest = newShardRequest(request, shardId);
        shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());
        //通过执行replicatedBroadcastShardAction,即TransportShardFlushAction.class来实现分片的刷新
        client.executeLocally(replicatedBroadcastShardAction, shardRequest, shardActionListener);
    }
}

2、每一个数据节点(node)执行分片刷新的action是TransportShardFlushAction

public class TransportShardFlushAction extends TransportReplicationAction<ShardFlushRequest, ShardFlushRequest, ReplicationResponse> {
	//主分片执行刷新
    @Override
    protected void shardOperationOnPrimary(
        org.elasticsearch.action.admin.indices.flush.ShardFlushRequest shardRequest,
        IndexShard primary,
        ActionListener<PrimaryResult<ShardFlushRequest, ReplicationResponse>> listener
    ) {
        ActionListener.completeWith(listener, () -> {
            primary.flush(shardRequest.getRequest());
            logger.trace("{} flush request executed on primary", primary.shardId());
            return new PrimaryResult<>(shardRequest, new ReplicationResponse());
        });
    }
    //副本分片执行刷新
  @Override
    protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
        ActionListener.completeWith(listener, () -> {
            replica.flush(request.getRequest());
            logger.trace("{} flush request executed on replica", replica.shardId());
            return new ReplicaResult();
        });
    }  
}

其中shardOperationOnReplicaTransportReplicationAction下的onResponse方法触发

public abstract class TransportReplicationAction<
    Request extends ReplicationRequest<Request>,
    ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
    Response extends ReplicationResponse> extends TransportAction<Request, Response> {
    //省略代码
private final class AsyncReplicaAction extends AbstractRunnable implements ActionListener<Releasable> {

	 @Override
        public void onResponse(Releasable releasable) {
            assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
            try {
                shardOperationOnReplica(
                    replicaRequest.getRequest(),
                    replica,
                    ActionListener.wrap((replicaResult) -> replicaResult.runPostReplicaActions(ActionListener.wrap(r -> {
                        final ReplicaResponse response = new ReplicaResponse(
                            replica.getLocalCheckpoint(),
                            replica.getLastSyncedGlobalCheckpoint()
                        );
                        releasable.close(); // release shard operation lock before responding to caller
                        if (logger.isTraceEnabled()) {
                            logger.trace(
                                "action [{}] completed on shard [{}] for request [{}]",
                                transportReplicaAction,
                                replicaRequest.getRequest().shardId(),
                                replicaRequest.getRequest()
                            );
                        }
                        setPhase(task, "finished");
                        onCompletionListener.onResponse(response);
                    }, e -> {
                        Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
                        responseWithFailure(e);
                    })), e -> {
                        Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
                        AsyncReplicaAction.this.onFailure(e);
                    })
                );
                // TODO: Evaluate if we still need to catch this exception
            } catch (Exception e) {
                Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
                AsyncReplicaAction.this.onFailure(e);
            }
        }
}



}

二、对indexShard执行刷新请求

/**
     * 对引擎执行给定的刷新请求。
     *
     * @param request the flush request
     * @return <code>false</code> if <code>waitIfOngoing==false</code> and an ongoing request is detected, else <code>true</code>.
     *         If <code>false</code> is returned, no flush happened.
     */
    public boolean flush(FlushRequest request) {
        final boolean waitIfOngoing = request.waitIfOngoing();
        final boolean force = request.force();
        logger.trace("flush with {}", request);
        verifyNotClosed();
        final long time = System.nanoTime();
        // TODO: Transition this method to async to support async flush 将此方法转换为异步以支持异步刷新
        PlainActionFuture<Engine.FlushResult> future = PlainActionFuture.newFuture();
        getEngine().flush(force, waitIfOngoing, future);
        Engine.FlushResult flushResult = future.actionGet();
        flushMetric.inc(System.nanoTime() - time);
        return flushResult.flushPerformed();
    }
/**
   刷新引擎的状态,包括事务日志、清除内存以及将 Lucene 索引中的文档写入磁盘。此方法将在调用线程上同步刷新。但是,根据引擎实现的不同,在触发侦听器之前,无法保证完全的耐用性。
     *
     * @param force         如果为 true则即使不需要提交任何更改,也会执行 Lucene 提交
     * @param waitIfOngoing 如果为 true,则此调用将阻止,直到所有当前正在运行的刷新都完成。否则,此调用将返回而不阻塞。
     * @param listener      在达到完全耐久性后通知。如果 waitIfOngoing==false 并且检测到正在进行的请求,则不会发生刷新,侦听器将完成,并带有指示没有刷新和未知生成的标记
     */
    public abstract void flush(boolean force, boolean waitIfOngoing, ActionListener<FlushResult> listener) throws EngineException;

其中实际调用的是Engine的子类InternalEngine

1、首先获取读锁,再获取刷新锁,如果获取不到根据参数决定是否直接返回还是等待

 @Override
    public void flush(boolean force, boolean waitIfOngoing, ActionListener<FlushResult> listener) throws EngineException {
     	//省略代码
        final long generation;
        //获取读锁(readLock.acquire())
        try (ReleasableLock lock = readLock.acquire()) {
            ensureOpen();
            //尝试获取刷新锁(flushLock)
            if (flushLock.tryLock() == false) {
                //如果无法立即获取到锁,则根据 waitIfOngoing 的值决定是等待刷新完成还是立即返回。
                // if we can't get the lock right away we block if needed otherwise barf
                if (waitIfOngoing == false) {
                    logger.trace("detected an in-flight flush, not blocking to wait for it's completion");
                    listener.onResponse(FlushResult.NO_FLUSH);
                    return;
                }
                logger.trace("waiting for in-flight flush to finish");
                //如果成功获取到刷新锁
                flushLock.lock();
                logger.trace("acquired flush lock after blocking");
            } else {
                logger.trace("acquired flush lock immediately");
            }

            try {
               //仅当下面这四种情况才进行刷新操作
               //(1) Lucene 有未提交的文档,
               //或 (2) 被调用方强制执行,
               //或 (3) 新创建的提交指向不同的 translog 生成(可以释放 translog),
               //或 (4) 上次提交中的本地检查点信息过时
                boolean hasUncommittedChanges = hasUncommittedChanges(); 检查是否有未提交的文档变更(hasUncommittedChanges())
                if (hasUncommittedChanges
                    || force  //是否需要强制刷新(force)
                    || shouldPeriodicallyFlush() //是否需要定期刷新(shouldPeriodicallyFlush())
                    //或者本地检查点信息是否过时
                    || getProcessedLocalCheckpoint() > Long.parseLong(
                        lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
                    )) {
                    //确保可以执行刷新(ensureCanFlush())
                    ensureCanFlush();
                    //获取最后一个写入的事务日志位置(getTranslogLastWriteLocation())
                    Translog.Location commitLocation = getTranslogLastWriteLocation();
                    try {
                        //如果当前 translog 不为空,则将其滚动到新生成。这不会提交 translog。这样正在进行的写入和刷新磁盘用的translog就不冲突
                        translog.rollGeneration();
                        logger.trace("starting commit for flush; commitTranslog=true");
                        long lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong();
                        //记录即将生成的区段,以便实时版本地图存档记录在刷新时转到存档的文档 ID 的正确区段生成。
                        // 否则,如果在提交 IndexWriter 后立即对新文档进行索引更新,并且刷新将它们移动到存档中,则一旦我们在搜索分片上看到该段生成,
                        // 我们就会将它们从存档中清除,但这些更改不包括在提交中,因为它们发生在提交之后
                        //提交索引写入器
                        preCommitSegmentGeneration.set(lastCommittedSegmentInfos.getGeneration() + 1);
                        //执行刷新操作
                        commitIndexWriter(indexWriter, translog);
                        logger.trace("finished commit for flush");
                        //我们需要刷新以清除旧版本值
                        refresh("version_table_flush", SearcherScope.INTERNAL, true);
                        translog.trimUnreferencedReaders();
                        //更新最后一次刷新的时间戳
                        this.lastFlushTimestamp = lastFlushTimestamp;
                    } catch (AlreadyClosedException e) {
                        failOnTragicEvent(e);
                        throw e;
                    } catch (Exception e) {
                        throw new FlushFailedEngineException(shardId, e);
                    }
                    //刷新最后提交的段信息(refreshLastCommittedSegmentInfos()
                    refreshLastCommittedSegmentInfos();
                    //获取刷新后的段信息
                    generation = lastCommittedSegmentInfos.getGeneration();
                    //调用刷新监听器的 afterFlush 方法
                    flushListener.afterFlush(generation, commitLocation);
                } else {
                    //如果不满足刷新条件,则直接获取最后提交的段信息的代数。
                    generation = lastCommittedSegmentInfos.getGeneration();
                }
            } catch (FlushFailedEngineException ex) {
                maybeFailEngine("flush", ex);
                listener.onFailure(ex);
                return;
            } catch (Exception e) {
                listener.onFailure(e);
                return;
            } finally {
                //释放刷新锁
                flushLock.unlock();
                logger.trace("released flush lock");
            }
        }
        if (engineConfig.isEnableGcDeletes()) {
            pruneDeletedTombstones();
        }
        //等待提交的持久性完成,并通过回调函数返回刷新结果
        waitForCommitDurability(generation, listener.map(v -> new FlushResult(true, generation)));
    }

2、在刷新之后translog需要滚动生成新的,这样不会影响正在进行的写入和删除时方便

/*
     *如果当前 translog 生成不为空,则将其滚动到新生成。这不会提交 translog。
     * @throws IOException if an I/O exception occurred during any file operations
     */
    public void rollGeneration() throws IOException {
        syncBeforeRollGeneration();
        //检查当前操作数是否为0,并且主要期限与当前的主要期限相同。如果满足条件,则直接返回,不执行后续操作
        if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) {
            return;
        }
        //使用writeLock.acquire()获取写锁,确保代码块中的操作是以独占方式进行的。
        try (Releasable ignored = writeLock.acquire()) {
            //调用ensureOpen()方法确保资源处于打开状态
            ensureOpen();
            try {
                //将当前的Translog关闭并转换为TranslogReader对象。
                final TranslogReader reader = current.closeIntoReader();
                //,将reader添加到readers集合中
                readers.add(reader);
                //使用断言确认检查点文件中的generation与当前的generation相匹配。
                assert Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)).generation == current.getGeneration();
                //将检查点文件复制到指定的位置
                copyCheckpointTo(location.resolve(getCommitCheckpointFileName(current.getGeneration())));
                //创建一个新的Translog文件,并更新检查点数据
                current = createWriter(current.getGeneration() + 1);
                logger.trace("current translog set to [{}]", current.getGeneration());
            } catch (final Exception e) {
                tragedy.setTragicException(e);
                closeOnTragicEvent(e);
                throw e;
            }
        }
    }

3、把IndexWriter中的数据持久化磁盘

/**
     * 把索引写入器(IndexWriter)数据持久化到磁盘,
     * 用到了lucene 索引与 translog 关联的 translog uuid,这样会把这个uuid及之前的数据写入到磁盘
     * @param writer   the index writer to commit
     * @param translog the translog
     */
    protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException {
        //确保可以执行刷新操作
        ensureCanFlush();
        try {
            //获取本地检查点(localCheckpoint)
            final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
            //设置一个回调函数,用来设置索引写入器的提交数据,
            writer.setLiveCommitData(() -> {
                /*
                 * 上面捕获的用户数据(例如本地检查点)包含在 Lucene 刷新段之前必须评估的数据,包括本地检查点和其他值。
                 * 最大序列号是不同的,我们永远不希望最大序列号小于进入 Lucene 提交的最后一个序列号,否则在从此提交点恢复并随后将新文档写入索引时,我们将面临对两个不同文档重复使用序列号的风险。
                 * 由于我们只知道哪些 Lucene 文档在 {@link IndexWritercommit()} 调用刷新所有文档后进入最终提交,因此我们将最大序列号的计算推迟到提交数据迭代器的调用时间(在所有文档刷新到 Lucene 之后发生)。
                 */
                //创建一个包含提交数据的映射(commitData),包括事务日志的UUID、本地检查点、最大序列号等信息。
                final Map<String, String> commitData = Maps.newMapWithExpectedSize(8);
                // translog.getTranslogUUID()返回用于将 lucene 索引与 translog 关联的 translog uuid。
                commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
                commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
                commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
                commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
                commitData.put(HISTORY_UUID_KEY, historyUUID);
                final String currentForceMergeUUID = forceMergeUUID;
                if (currentForceMergeUUID != null) {
                    commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
                }
                commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
                commitData.put(ES_VERSION, Version.CURRENT.toString());
                logger.trace("committing writer with commit data [{}]", commitData);
                //将提交数据返回给索引写入器
                return commitData.entrySet().iterator();
            });
            //禁用大合并后的定期刷新(shouldPeriodicallyFlushAfterBigMerge)
            shouldPeriodicallyFlushAfterBigMerge.set(false);
            //提交索引写入器,将数据写入磁盘
            writer.commit();
        } catch (final Exception ex) {
            try {
                failEngine("lucene commit failed", ex);
            } catch (final Exception inner) {
                ex.addSuppressed(inner);
            }
            throw ex;
        } catch (final AssertionError e) {
          //省略代码
          }
    }

在Bulk批量给索引增加数据源码 中,后面的文档添加到Lucene的这个方法indexIntoLucene 里面就是写入了indexWriter 这里commit会直接写入到磁盘

其中indexWriter的全路径org.apache.lucene.index.IndexWriter;

4、开始处理translog.log

在上面rollGeneration()方法就把最后一个translog放入了readers,也为删除做准备

   //Translog 列表保证按 Translog 生成的顺序排列,
    private final List<TranslogReader> readers = new ArrayList<>();
 /**
     * Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum
     * required generation
     * 通过要求 {@link TranslogDeletionPolicy} 提供所需的最小生成来修剪未引用的 translog 生成
     */
    public void trimUnreferencedReaders() throws IOException {
        //首先获取读取锁,并检查是否有可以修剪的读取器
        try (ReleasableLock ignored = readLock.acquire()) {
            //如果已关闭或者最小引用的代数与最小文件代数相同,则不进行修剪操作。
            if (closed.get()) {
                // 我们可能会因一些悲惨事件而关闭,不要删除任何内容
                return;
            }
            if (getMinReferencedGen() == getMinFileGeneration()) {
                return;
            }
        }
        // 将大部分数据写入磁盘,以减少写入锁的持有时间
        sync();
        //获取写入锁,并再次检查是否已关闭或者最小引用的代数与最小文件代数相同。
        try (ReleasableLock ignored = writeLock.acquire()) {
            if (closed.get()) {
                // we're shutdown potentially on some tragic event, don't delete anything
                //我们可能会因一些悲惨事件而关闭,不要删除任何内容
                return;
            }
            final long minReferencedGen = getMinReferencedGen();
            //代码遍历读取器列表,
            for (Iterator<TranslogReader> iterator = readers.iterator(); iterator.hasNext();) {
                TranslogReader reader = iterator.next();
                if (reader.getGeneration() >= minReferencedGen) {
                    break;
                }
                //删除不再被引用的读取器
                iterator.remove();
                IOUtils.closeWhileHandlingException(reader);
                //translogPath是translog
                final Path translogPath = reader.path();
                logger.trace("delete translog file [{}], not referenced and not current anymore", translogPath);
                //打开translog时使用检查点来了解应从哪些文件中恢复。现在,我们更新检查点以忽略要删除的文件。
                // 请注意,recoverFromFiles 中有一个规定,允许我们同步检查点但在删除文件之前崩溃的情况。立即同步,以确保最多有一个未引用的生成。
                //将所有缓冲的运算写入磁盘和 fsync 文件。current是TranslogWriter
                current.sync();
                //并删除相关文件
                deleteReaderFiles(reader);
            }
            assert readers.isEmpty() == false || current.generation == minReferencedGen
                : "all readers were cleaned but the minReferenceGen ["
                    + minReferencedGen
                    + "] is not the current writer's gen ["
                    + current.generation
                    + "]";
        } catch (final Exception ex) {
            closeOnTragicEvent(ex);
            throw ex;
        }
    }

(1) 首先把内存中的translog全部写入磁盘

/**
     * Sync's the translog.
     */
    public void sync() throws IOException {
        try (ReleasableLock lock = readLock.acquire()) {
            if (closed.get() == false) {
                current.sync();
            }
        } catch (final Exception ex) {
            closeOnTragicEvent(ex);
            throw ex;
        }
    }

    /**
     *将所有缓冲的运算写入磁盘和 fsync 文件。
     * 同步过程中的任何异常都将被解释为悲剧性异常,写入器将在引发异常之前关闭。
     */
    public void sync() throws IOException {
    	//这里一个是最大,一个是-2L,直接就是强制把translog从内存中刷进磁盘
        syncUpTo(Long.MAX_VALUE, SequenceNumbers.UNASSIGNED_SEQ_NO);
    }
 /**
     *将数据同步到指定的偏移量和全局检查点,确保数据的持久性。如果满足一定条件,会执行同步操作
     * @return <code>true</code> if this call caused an actual sync operation
     */
    final boolean syncUpTo(long offset, long globalCheckpointToPersist) throws IOException {
        //检查lastSyncedCheckpoint的偏移量和全局检查点是否小于指定的偏移量和全局检查点,并且需要进行同步操作
        if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist) && syncNeeded()) {
          	//省略代码
            //获取一个同步锁
            synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait
                if ((lastSyncedCheckpoint.offset < offset || lastSyncedCheckpoint.globalCheckpoint < globalCheckpointToPersist)
                    && syncNeeded()) {
                    //双重检查锁定 - 除非我们必须这样做,否则我们不想 fsync,现在我们有了锁,我们应该再次检查,因为如果这个代码很忙,我们可能已经足够 fsync 了
                    final Checkpoint checkpointToSync;
                    final List<Long> flushedSequenceNumbers;
                    final ReleasableBytesReference toWrite;
                    //再获取一个写锁
                    try (ReleasableLock toClose = writeLock.acquire()) {
                        synchronized (this) {
                            ensureOpen();
                            checkpointToSync = getCheckpoint();
                            //获取最新的写入数据buffer。
                            toWrite = pollOpsToWrite();
                            //如果没有未同步的序列号,则设置flushedSequenceNumbers为null;
                            if (nonFsyncedSequenceNumbers.isEmpty()) {
                                flushedSequenceNumbers = null;
                            } else {
                                //否则,将nonFsyncedSequenceNumbers赋值给flushedSequenceNumbers
                                flushedSequenceNumbers = nonFsyncedSequenceNumbers;
                                //nonFsyncedSequenceNumbers重新初始化为一个空的列表
                                nonFsyncedSequenceNumbers = new ArrayList<>(64);
                            }
                        }

                        try {      
                            //写入管道操作,下面channel.force会强制刷盘
                            writeAndReleaseOps(toWrite);
                            assert channel.position() == checkpointToSync.offset;
                        } catch (final Exception ex) {
                            closeWithTragicEvent(ex);
                            throw ex;
                        }
                    }
                    //现在在同步块之外执行实际的 fsync,以便我们可以继续写入缓冲区等。
                    try {
                        assert lastSyncedCheckpoint.offset != checkpointToSync.offset || toWrite.length() == 0;
                        if (lastSyncedCheckpoint.offset != checkpointToSync.offset) {
                            //则调用channel.force(false)方法来强制刷新通道的数据到磁盘。
                            channel.force(false);
                        }
                        //更新检查点
                        //将checkpointToSync写入到指定的checkpointChannel和checkpointPath中
                        Checkpoint.write(checkpointChannel, checkpointPath, checkpointToSync);
                    } catch (final Exception ex) {
                        closeWithTragicEvent(ex);
                        throw ex;
                    }
                    if (flushedSequenceNumbers != null) {
                        //则遍历flushedSequenceNumbers列表,并对每个元素调用persistedSequenceNumberConsumer::accept方法处理。
                        flushedSequenceNumbers.forEach(persistedSequenceNumberConsumer::accept);
                    }
                    assert lastSyncedCheckpoint.offset <= checkpointToSync.offset
                        : "illegal state: " + lastSyncedCheckpoint.offset + " <= " + checkpointToSync.offset;
                    lastSyncedCheckpoint = checkpointToSync; // write protected by syncLock
                    return true;
                }
            }
        }
        return false;
    }

其中 toWrite = pollOpsToWrite(); 就是下面这个,至于buffer是什么?
可以看一下Elasticsearch 8.9 Bulk批量给索引增加数据源码 中的 public Translog.Location add(final BytesReference data, final long seqNo)方法体中会把给this.buffer赋值

 private synchronized ReleasableBytesReference pollOpsToWrite() {
        ensureOpen();
        if (this.buffer != null) {
            //则将 buffer 赋值给 toWrite 变量,并将 buffer 置为 null,同时将 bufferedBytes 置为 0。
            ReleasableBytesStreamOutput toWrite = this.buffer;
            this.buffer = null;
            this.bufferedBytes = 0;
            //创建一个新的 ReleasableBytesReference 对象,该对象的字节为 toWrite.bytes(),并将 toWrite 作为释放引用
            return new ReleasableBytesReference(toWrite.bytes(), toWrite);
        } else {
            //返回一个空的 ReleasableBytesReference 对象
            return ReleasableBytesReference.empty();
        }
    }

下面就是正常的把数据写入到writeToFile

//将给定的 ReleasableBytesReference 对象写入到文件中,上面有channel.force会强制把Channels刷盘
    private void writeAndReleaseOps(ReleasableBytesReference toWrite) throws IOException {
        try (ReleasableBytesReference toClose = toWrite) {
            assert writeLock.isHeldByCurrentThread();
            final int length = toWrite.length();
            //检查 toWrite 的长度是否为0,如果是,则直接返回
            if (length == 0) {
                return;
            }
            //试从 diskIoBufferPool 中获取一个 ioBuffer,用于写入操作。
            ByteBuffer ioBuffer = diskIoBufferPool.maybeGetDirectIOBuffer();
            //如果获取不到 ioBuffer,则说明当前线程不使用直接缓冲区,代码会直接将数据写入到文件中,而不是先复制到 ioBuffer 中。
            if (ioBuffer == null) {
                // not using a direct buffer for writes from the current thread so just write without copying to the io buffer
                BytesRefIterator iterator = toWrite.iterator();
                BytesRef current;
                while ((current = iterator.next()) != null) {
                    Channels.writeToChannel(current.bytes, current.offset, current.length, channel);
                }
                return;
            }
            //如果成功获取到 ioBuffer,代码会使用迭代器遍历 toWrite 中的数据,并将数据逐个写入到 ioBuffer 中。
            BytesRefIterator iterator = toWrite.iterator();
            BytesRef current;
            while ((current = iterator.next()) != null) {
                int currentBytesConsumed = 0;
                while (currentBytesConsumed != current.length) {
                    int nBytesToWrite = Math.min(current.length - currentBytesConsumed, ioBuffer.remaining());
                    ioBuffer.put(current.bytes, current.offset + currentBytesConsumed, nBytesToWrite);
                    currentBytesConsumed += nBytesToWrite;
                    //如果 ioBuffer 的空间已满,
                    if (ioBuffer.hasRemaining() == false) {
                        //则将 ioBuffer 翻转(从写模式切换到读模式)
                        ioBuffer.flip();
                        //然后调用 writeToFile 方法将数据写入到文件中,
                        writeToFile(ioBuffer);
                        //并清空 ioBuffer。
                        ioBuffer.clear();
                    }
                }
            }
            //再次翻转 ioBuffer
            ioBuffer.flip();
            //并调用 writeToFile 方法将剩余的数据写入到文件中。
            writeToFile(ioBuffer);
        }
    }

(2) 删除磁盘中的对应的translog文件

/**
     * 删除与读取器关联的所有文件。package-private,以便此时能够模拟节点故障
     */
    void deleteReaderFiles(TranslogReader reader) {
        IOUtils.deleteFilesIgnoringExceptions(
            reader.path(),
            reader.path().resolveSibling(getCommitCheckpointFileName(reader.getGeneration()))
        );
    }
 /** 删除所有给定的文件,禁止所有抛出的 {@link IOException}。某些文件可能为 null,如果是这样,则忽略它们。
     *
     * @param files the paths of files to delete
     */
    public static void deleteFilesIgnoringExceptions(final Path... files) {
        for (final Path name : files) {
            if (name != null) {
                // noinspection EmptyCatchBlock
                try {
                    Files.delete(name);
                } catch (final IOException ignored) {

                }
            }
        }
    }

5、更新最后刷新时间和刷新最后提交的段信息

//刷新最后提交的段信息(refreshLastCommittedSegmentInfos()
refreshLastCommittedSegmentInfos();
//获取刷新后的段信息
generation = lastCommittedSegmentInfos.getGeneration();
//调用刷新监听器的 afterFlush 方法
 flushListener.afterFlush(generation, commitLocation);
private void refreshLastCommittedSegmentInfos() {
        /*
         * 在某些情况下,如果引擎由于意外事件关闭,我们无法获取写锁并等待独占访问。
         * 这可能会减少对存储的引用计数,从而关闭存储。为了保证能够使用存储,我们需要增加引用计数
         */
        store.incRef();
        try {
            //读取存储中的最后提交的段信息,并将结果赋给lastCommittedSegmentInfos变量。
            lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
        } catch (Exception e) {
            //读取过程中发生异常,代码会检查引擎是否已关闭。如果引擎未关闭,则记录一个警告日志,并检查异常是否是Lucene的损坏异常。
            // 如果是损坏异常,代码会抛出FlushFailedEngineException异常
            if (isClosed.get() == false) {
                logger.warn("failed to read latest segment infos on flush", e);
                if (Lucene.isCorruptionException(e)) {
                    throw new FlushFailedEngineException(shardId, e);
                }
            }
        } finally {
            //代码会减少对存储的引用计数,以确保引用计数的正确性和资源释放。
            store.decRef();
        }
    }

三、通过源码得到一些结论

1、translog也会刷进磁盘,不只是在内存中存在,异步方式会定时刷translog到磁盘
2、等segment刷进磁盘后,会把对应translog.log磁盘文件删除,所以translog说是临时文件,没有问题,
3、translog的删除方式是先刷到磁盘,直接通过删除文件的方式删除translog

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1293433.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Jquery easyui异步提交表单的两种方式

这篇文章分享一下easyui常用的两种表单异步提交的方式。 目录 第一种&#xff1a;利用ajax提交 $.post() $.ajax() 第二种&#xff1a;使用easyui提供的表单提交方式 首先&#xff0c;准备一个简单的表单&#xff0c;包含三个输入框&#xff0c;在页面引入easyui的js文件。…

使用智能AI文心一言处理采集数据

简数采集器支持调用百度智能AI文心一言大模型API接口&#xff0c;可对采集的数据进行研究分析&#xff0c;内容创作。 文心一言API使用方法如下&#xff1a; 目录 1. 采集数据 2. 申请API 3. 对接文心一言API 4. 设置文心一言API的执行指令 5. 使用文心一言API处理采集数…

苹果mac电脑如何彻底删除卸载软件?

在苹果电脑上安装和使用软件非常容易&#xff0c;但是卸载软件却可能会变得复杂和困难。不像在Windows上&#xff0c;你不能简单地在控制面板中找到已安装的程序并卸载它们。因此&#xff0c;在这篇文章中&#xff0c;我们将讨论苹果电脑怎么彻底删除软件。 CleanMyMac X全新版…

『 C++ 』BinarySearchTree搜索二叉树

文章目录 前言 &#x1f995;二叉搜索树的概念 &#x1f995;搜索二叉树的初始化 &#x1f995;Insert( )插入函数 &#x1f995;&#x1f47e; InsertR( ) 插入函数(递归) InOrder( ) 中序遍历打印 &#x1f995;Find( ) 查找函数 &#x1f995;&#x1f47e; Find( ) 查找函数…

系列学习前端之第 2 章:一文精通 HTML

全套学习 HTMLCSSJavaScript 代码和笔记请下载网盘的资料&#xff1a; 链接: https://pan.baidu.com/s/1-vY2anBdrsBSwDZfALZ6FQ 提取码: 6666 HTML 全称&#xff1a;HyperText Markup Language&#xff08;超文本标记语言&#xff09; 1、 HTML 标签 1. 标签又称元素&#…

第二十一章总结博客

网络程序设计基础 局域网与互联网 为了实现两台计算机的通信&#xff0c;必须用一个网络线路连接两台计算机。如下图所示 网络协议 1.IP协议 IP是Internet Protocol的简称&#xff0c;是一种网络协议。Internet 网络采用的协议是TCP/IP协议&#xff0c;其全称是Transmission …

区块链媒体:Web3.0时代的推广创新10爆款策略概览-华媒舍

随着Web3.0时代的到来&#xff0c;互联网推广正经历着一场创新的革命。在这个新的时代背景下&#xff0c;一系列全新的推广策略正在兴起&#xff0c;引领着市场的变革。本文将基于这一背景&#xff0c;为大家介绍Web3.0时代中的10大爆款推广策略概览。 1. 个性化推广 在Web3.0…

Linux:缓冲区的概念理解

文章目录 缓冲区什么是缓冲区&#xff1f;缓冲区的意义是什么&#xff1f;缓冲区的刷新方式 理解缓冲区用户缓冲区和内核缓冲区缓冲区在哪里&#xff1f; 本篇主要总结的是关于缓冲区的概念理解&#xff0c;以及再次基础上对文件的常用接口进行一定程度的封装 缓冲区 什么是缓…

基于ssm家庭理财系统源码和论文

基于ssm家庭理财系统源码和论文743 idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 环境&#xff1a; jdk8 tomcat8.5 开发技术 ssm 摘要 随着Internet的发展&#xff0c;人们的日常生活已经离不开网络。未来人们的生活与工作将变得越来越数字化&#xff…

Linux_CentOS_7.9配置oracle sqlplus、rman实现上下按键切换历史命令等便捷效率功能之简易记录

配置oracle sqlplus以及rman可以上下按键切换历史命令等便捷效率功能 设置前提是已经yum安装了rlwrap软件具体软件下载及配置参考文章http://t.csdnimg.cn/iXuVK su - oracleVim .bash_profile ## 文件中增加如下的别名设置 ---------------- alias sqlplusrlwrap sqlplus…

Android音量调节参考一

基于android 9平台分析。 在Android系统中&#xff0c;默认的设备(phone等)音量都是分开控制的&#xff0c;这些包括媒体、铃声、闹铃、蓝牙、通话通过音频流来区别不同的音量类型。每种流类型都定义最大音量、最小音量及默认音量&#xff0c;Android 9定了了11中音频流类型&am…

【MATLAB】MVMD信号分解+FFT+HHT组合算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 MVMD信号分解FFTHHT组合算法是一种强大的分析方法&#xff0c;结合了变分模态分解(MVMD)、快速傅里叶变换(FFT)和希尔伯特-黄变换(HHT)。 首先&#xff0c;使用MVMD将原始信号分解成多个…

几分钟在Ubuntu搭建本地Emlog博客网站并发布至公网无需购买域名服务器

文章目录 前言1. 网站搭建1.1 Emolog网页下载和安装1.2 网页测试1.3 cpolar的安装和注册 2. 本地网页发布2.1 Cpolar临时数据隧道2.2.Cpolar稳定隧道&#xff08;云端设置&#xff09;2.3.Cpolar稳定隧道&#xff08;本地设置&#xff09; 3. 公网访问测试总结 前言 博客作为使…

Python-封装配置文件

Code [url] baidu http://www.baidu.com[value] send_value 百度[server] ip 220.181.111.188封装的格式可以套用 # 封装,类似函数调用 import configparserclass ReadConfigIni():def __init__(self,filename):self.cf configparser.ConfigParser()self.cf.read(filenam…

分布式搜索引擎03

1.数据聚合 聚合(aggregations)可以让我们极其方便的实现对数据的统计、分析、运算。例如: 什么品牌的手机最受欢迎? 这些手机的平均价格、最高价格、最低价格? 这些手机每月的销售情况如何? 实现这些统计功能的比数据库的sql要方便的多,而且查询速度非常快,可以实现近…

[Linux] nginx配置的主配置文件

一、六个模块的作用 全局块&#xff1a;全局配置&#xff0c;对全局生效&#xff1b; events块&#xff1a;配置影响 Nginx 服务器与用户的网络连接&#xff1b; http块&#xff1a;配置代理&#xff0c;缓存&#xff0c;日志定义等绝大多数功能和第三方模块的配置&#xff1b;…

STM32 cubeMX 呼吸灯实验

文章代码使用 HAL 库。 文章目录 一、1.PWM原理二、LED 原理图三、使用cubemx 配置 led四、PWM 相关函数五、PWM占空比占空比计算六、PWM 呼吸灯重要代码总结 呼吸灯 一、1.PWM原理 PWM全称为脉冲宽度调制&#xff08;Pulse Width Modulation&#xff09;&#xff0c;是一种常…

【Linux】进程通信之命名管道mkfifo

1.认识命名管道 匿名管道应用的一个限制就是只能在具有共同祖先&#xff08;具有亲缘关系&#xff09;的进程间通信。如果我们想在不相关的进程之间交换数据&#xff0c;可以使用FIFO文件来做这项工作&#xff0c;它经常被称为命名管道。命名管道是一种特殊类型的文件 2.在命…

华清远见嵌入式学习——QT——作业1

作业要求&#xff1a; 代码&#xff1a; ①&#xff1a;头文件 #ifndef LOGIN_H #define LOGIN_H#include <QWidget> #include <QLineEdit> //行编辑器类 #include <QPushButton> //按钮类 #include <QLabel> //标签类 #include <QM…

(五) Python 代理模式

文章目录 5.1 代理模式概述5.1.1 代理介绍5.1.2 代理模式的作用 5.2 代理模式的UML类图5.3 了解不同类型的代理5.3.1虚拟代理5.3.2 远程代理5.3.3 保护代理5.3.4 智能代理 5.4 现实世界中的代理模式5.5 代理模式的优点5.6 门面模式和代理模式之间的比较 5.1 代理模式概述 5.1.…