- 一、相关API的handler
- 1、接受HTTP请求的handler(RestRefreshAction)
- 2、往数据节点发送刷新请求的action(TransportRefreshAction)
- 3、数据节点接收主节点refresh传输的action(TransportShardRefreshAction)
- 二、在IndexShard执行refresh操作
- 1、根据入参决定是使用lucene提供的阻塞还是非阻塞API刷新数据
- (1)、maybeRefresh和maybeRefreshBlocking的简单介绍
- 三、lucene源码中执行逻辑
- 1、判断是否需要刷新
下面的图来自《ElasticSearch——刷盘原理流程》。这篇文章主要讲的是refresh命令如何把写入ES索引缓冲区的数据刷进Lucene,使数据可以被查询、搜索(否则数据只停留在索引缓冲区中,对搜索是不可见的),不涉及
translog.log
和Lucene
的数据结构。
通过这个流程可以了解ES是如何把索引缓冲区的数据刷进Lucene的,对应的主要是下图左侧中间的refresh部分
其他部分源码梳理
1、主节点同时写入ES缓冲区和translog这一部分,请看Elasticsearch 8.9 Bulk批量给索引增加数据源码
2、下半边fsync的源码逻辑,请看Elasticsearch 8.9 flush刷新缓存中的数据到磁盘源码
一、相关API的handler
在ActionModule.java
中
// Register the REST handler that serves the "/_refresh" and "/{index}/_refresh" routes.
registerHandler.accept(new RestRefreshAction());
// Bind the refresh action to its transport-level implementation.
actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);
// Bind the shard-level refresh action type to its transport-level implementation.
actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class);
1、接受HTTP请求的handler(RestRefreshAction)
/**
 * REST handler for the refresh API.
 *
 * <p>Accepts GET and POST on {@code /_refresh} and {@code /{index}/_refresh}, builds a
 * {@link RefreshRequest} from the {@code index} path parameter and forwards it to the
 * indices admin client.
 */
public class RestRefreshAction extends BaseRestHandler {

    @Override
    public String getName() {
        return "refresh_action";
    }

    @Override
    public List<Route> routes() {
        return List.of(
            new Route(GET, "/_refresh"),
            new Route(POST, "/_refresh"),
            new Route(GET, "/{index}/_refresh"),
            new Route(POST, "/{index}/_refresh")
        );
    }

    /**
     * Translates the REST request into a {@link RefreshRequest} and returns a consumer that
     * executes it against the given client once a channel is available.
     *
     * @param request the incoming REST request; the {@code index} parameter is split on commas
     * @param client  the node client used to dispatch the refresh
     * @return a consumer that sends the refresh and reports the response status on the channel
     */
    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
        final RefreshRequest refreshRequest = new RefreshRequest(indices);

        // Use the request's current options as defaults; the REST parameters may override them.
        final IndicesOptions defaultOptions = refreshRequest.indicesOptions();
        refreshRequest.indicesOptions(IndicesOptions.fromRequest(request, defaultOptions));

        return channel -> {
            final RestToXContentListener<RefreshResponse> responseListener = new RestToXContentListener<RefreshResponse>(channel) {
                @Override
                protected RestStatus getStatus(RefreshResponse response) {
                    // The REST status is taken from the refresh response itself.
                    return response.getStatus();
                }
            };
            client.admin().indices().refresh(refreshRequest, responseListener);
        };
    }
}
client.admin().indices().refresh()
会执行到下面的父类TransportBroadcastReplicationAction
的doExecute
方法
2、往数据节点发送刷新请求的action(TransportRefreshAction)
/**
 * Transport action registered under {@code RefreshAction.NAME}.
 *
 * <p>It extends {@code TransportBroadcastReplicationAction} and hands the parent
 * {@code TransportShardRefreshAction.TYPE} as the per-shard action and
 * {@code ThreadPool.Names.REFRESH} as the executor name. Only the constructor is shown in this excerpt.
 */
public class TransportRefreshAction extends TransportBroadcastReplicationAction<
RefreshRequest,
RefreshResponse,
BasicReplicationRequest,
ReplicationResponse> {
@Inject
public TransportRefreshAction(
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
NodeClient client
) {
super(
RefreshAction.NAME,
RefreshRequest::new,
clusterService,
transportService,
client,
actionFilters,
indexNameExpressionResolver,
TransportShardRefreshAction.TYPE,
ThreadPool.Names.REFRESH
);
}
// remaining code omitted
}
/**
 * Base class for broadcast actions that execute one shard-level request per shard.
 *
 * <p>This is an excerpt: fields such as {@code clusterService}, {@code executor}, {@code client} and
 * {@code replicatedBroadcastShardAction}, and helpers such as {@code shards}, {@code finish},
 * {@code newShardRequest} and {@code ReplicationResponseActionListener} are declared outside the shown code.
 */
public abstract class TransportBroadcastReplicationAction<
Request extends BroadcastRequest<Request>,
Response extends BaseBroadcastResponse,
ShardRequest extends ReplicationRequest<ShardRequest>,
ShardResponse extends ReplicationResponse> extends HandledTransportAction<Request, Response> {
/**
 * Entry point of the action: submits the asynchronous fan-out built by
 * {@link #createAsyncAction} to the executor identified by {@code executor}.
 */
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
clusterService.threadPool().executor(executor).execute(ActionRunnable.wrap(listener, createAsyncAction(task, request)));
}
/**
 * Builds the consumer that resolves the target shards from the current cluster state and
 * issues one shard-level request per shard.
 */
private CheckedConsumer<ActionListener<Response>, Exception> createAsyncAction(Task task, Request request) {
return new CheckedConsumer<ActionListener<Response>, Exception>() {
// Bookkeeping for the overall response; not updated in the code shown here
// (presumably by ReplicationResponseActionListener / finish -- confirm against the full class).
private int totalShardCopyCount;
private int successShardCopyCount;
private final List<DefaultShardOperationFailedException> allFailures = new ArrayList<>();
@Override
public void accept(ActionListener<Response> listener) {
assert totalShardCopyCount == 0 && successShardCopyCount == 0 && allFailures.isEmpty() : "shouldn't call this twice";
final ClusterState clusterState = clusterService.state();
final List<ShardId> shards = shards(request, clusterState);
final Map<String, IndexMetadata> indexMetadataByName = clusterState.getMetadata().indices();
// NOTE(review): finish(listener) presumably runs once every acquired ref (one per shard request)
// and the try-with-resources ref itself have been released -- confirm against RefCountingRunnable.
try (var refs = new RefCountingRunnable(() -> finish(listener))) {
// iterate over all target shards
for (final ShardId shardId : shards) {
// NB This sends O(#shards) requests in a tight loop; TODO add some throttling here?
shardExecute(
task,
request,
shardId,
ActionListener.releaseAfter(new ReplicationResponseActionListener(shardId, indexMetadataByName), refs.acquire())
);
}
}
}
};
}
/**
 * Creates the shard-level request for {@code shardId}, marks the given task on the local node
 * as its parent, and executes {@code replicatedBroadcastShardAction} through the local client.
 */
protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
assert Transports.assertNotTransportThread("may hit all the shards");
ShardRequest shardRequest = newShardRequest(request, shardId);
shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());
client.executeLocally(replicatedBroadcastShardAction, shardRequest, shardActionListener);
}
}
3、数据节点接收主节点refresh传输的action(TransportShardRefreshAction)
/**
 * Shard-level refresh action, registered under {@code RefreshAction.NAME + "[s]"}.
 *
 * <p>On the primary it triggers an external refresh of the shard and then produces the
 * {@code ShardRefreshReplicaRequest} that carries the refresh result. Only the constructor and the
 * primary-side operation are shown in this excerpt.
 */
public class TransportShardRefreshAction extends TransportReplicationAction<
BasicReplicationRequest,
ShardRefreshReplicaRequest,
ReplicationResponse> {
private static final Logger logger = LogManager.getLogger(TransportShardRefreshAction.class);
// Action name of the shard-level refresh.
public static final String NAME = RefreshAction.NAME + "[s]";
public static final ActionType<ReplicationResponse> TYPE = new ActionType<>(NAME, ReplicationResponse::new);
// Source label passed to IndexShard#externalRefresh for refreshes triggered through this action.
public static final String SOURCE_API = "api";
@Inject
public TransportShardRefreshAction(
Settings settings,
TransportService transportService,
ClusterService clusterService,
IndicesService indicesService,
ThreadPool threadPool,
ShardStateAction shardStateAction,
ActionFilters actionFilters
) {
super(
settings,
NAME,
transportService,
clusterService,
indicesService,
threadPool,
shardStateAction,
actionFilters,
BasicReplicationRequest::new,
ShardRefreshReplicaRequest::new,
ThreadPool.Names.REFRESH
);
// registers the unpromotable version of shard refresh action
new TransportUnpromotableShardRefreshAction(clusterService, transportService, shardStateAction, actionFilters, indicesService);
}
/**
 * Refreshes the primary shard and, on success, responds with a replica request built from the
 * refresh result. A failure of the refresh is delegated to {@code listener}.
 */
@Override
protected void shardOperationOnPrimary(
BasicReplicationRequest shardRequest,
IndexShard primary,
ActionListener<PrimaryResult<ShardRefreshReplicaRequest, ReplicationResponse>> listener
) {
primary.externalRefresh(SOURCE_API, listener.delegateFailure((l, refreshResult) -> {
// The replica request carries the primary's refresh result and inherits the parent task.
ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), refreshResult);
replicaRequest.setParentTask(shardRequest.getParentTask());
logger.trace("{} refresh request executed on primary", primary.shardId());
l.onResponse(new PrimaryResult<>(replicaRequest, new ReplicationResponse()));
}));
}
}
primary.externalRefresh
执行分片的刷新
二、在IndexShard执行refresh操作
/**
 * Shard-level entry point for an externally requested refresh: calls {@code verifyNotClosed()}
 * first and then delegates to the shard's engine.
 *
 * @param source   label describing who requested the refresh
 * @param listener receives the engine's refresh result
 */
public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {
verifyNotClosed();
getEngine().externalRefresh(source, listener);
}
/**
 * Engine-level entry point for an externally requested refresh: logs the source at trace level and
 * completes {@code listener} with whatever {@code refresh(source)} produces.
 *
 * @param source   label describing who requested the refresh
 * @param listener completed via {@code ActionListener.completeWith} (presumably notified of a
 *                 failure if {@code refresh} throws -- confirm against ActionListener)
 */
public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {
ActionListener.completeWith(listener, () -> {
logger.trace("external refresh with source [{}]", source);
return refresh(source);
});
}
getEngine()
的实现是InternalEngine
类
/**
 * Refreshes with external searcher scope and {@code block = true}, i.e. through the
 * blocking branch of {@code refresh(String, SearcherScope, boolean)}.
 */
@Override
public RefreshResult refresh(String source) throws EngineException {
return refresh(source, SearcherScope.EXTERNAL, true);
}
1、根据入参决定是使用lucene提供的阻塞还是非阻塞API刷新数据
/**
 * Refreshes the reader manager selected by {@code scope} and reports whether a refresh happened
 * together with the resulting segment generation.
 *
 * @param source label used in the failure message when the refresh fails
 * @param scope  selects the reference manager to refresh (see {@code getReferenceManager})
 * @param block  {@code true} to use {@code maybeRefreshBlocking()}, {@code false} to use {@code maybeRefresh()}
 * @return the refresh outcome; the generation stays {@code RefreshResult.UNKNOWN_GENERATION} when nothing was refreshed
 * @throws EngineException a {@code RefreshFailedEngineException} when the refresh fails with a
 *                         non-{@code AlreadyClosedException} error
 */
protected final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {
// both refresh types will result in an internal refresh but only the external will also
// pass the new reader reference to the external reader manager.
// capture the processed local checkpoint before the refresh starts
final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
boolean refreshed;
long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;
try {
// refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way.
if (store.tryIncRef()) {
try {
// even though we maintain 2 managers we really do the heavy-lifting only once.
// the second refresh will only do the extra work we have to do for warming caches etc.
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();
// choose between the blocking and the non-blocking refresh based on the parameter
if (block) {
// this call may block
referenceManager.maybeRefreshBlocking();
refreshed = true;
} else {
// non-blocking attempt; the return value tells whether the refresh was attempted
refreshed = referenceManager.maybeRefresh();
}
// if a refresh happened, look at the current reader to update the segment generation
if (refreshed) {
// acquire the current reader (released again in the finally block below)
final ElasticsearchDirectoryReader current = referenceManager.acquire();
try {
// take the larger of the reader's commit generation and the generation seen before the refresh
segmentGeneration = Math.max(current.getIndexCommit().getGeneration(), generationBeforeRefresh);
} finally {
referenceManager.release(current);
}
}
} finally {
store.decRef();
}
if (refreshed) {
lastRefreshedCheckpointListener.updateRefreshedCheckpoint(localCheckpointBeforeRefresh);
}
} else {
// the store reference could not be acquired, so nothing was refreshed
refreshed = false;
}
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (Exception e) {
try {
failEngine("refresh failed source[" + source + "]", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
}
assert refreshed == false || lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh
: "refresh checkpoint was not advanced; "
+ "local_checkpoint="
+ localCheckpointBeforeRefresh
+ " refresh_checkpoint="
+ lastRefreshedCheckpoint();
// TODO: maybe we should just put a scheduled job in threadPool?
// We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
// for a long time:
maybePruneDeletes();
mergeScheduler.refreshConfig();
return new RefreshResult(refreshed, segmentGeneration);
}
其中referenceManager
根据入参是 SearcherScope.EXTERNAL
获得的实现是ExternalReaderManager
// Reader manager returned for SearcherScope.EXTERNAL.
private final ExternalReaderManager externalReaderManager;
/**
 * Maps a searcher scope to its reader manager: {@code INTERNAL} to {@code internalReaderManager},
 * {@code EXTERNAL} to {@code externalReaderManager}.
 */
@Override
protected final ReferenceManager<ElasticsearchDirectoryReader> getReferenceManager(SearcherScope scope) {
return switch (scope) {
case INTERNAL -> internalReaderManager;
case EXTERNAL -> externalReaderManager;
};
}
根据入参中的block=true
实际执行的是referenceManager.maybeRefreshBlocking();
来刷新,该方法会一直等待直到获取到刷新锁,因此是同步阻塞的(非阻塞的是maybeRefresh),
并且根据下图ExternalReaderManager
继承了ReferenceManager
,而maybeRefreshBlocking
在ReferenceManager中被声明为final,子类无法重写,
所以执行的是父类ReferenceManager的实现
import org.apache.lucene.search.ReferenceManager;
/**
 * {@code ReferenceManager} specialization for {@code ElasticsearchDirectoryReader}.
 *
 * <p>The reference-counting hooks delegate directly to the reader; the body of
 * {@code refreshIfNeeded} is elided in this excerpt.
 */
@SuppressForbidden(reason = "reference counting is required here")
private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
@Override
protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
// code omitted
}
// Delegates to the reader's own tryIncRef().
@Override
protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
return reference.tryIncRef();
}
// Delegates to the reader's own getRefCount().
@Override
protected int getRefCount(ElasticsearchDirectoryReader reference) {
return reference.getRefCount();
}
// Delegates to the reader's own decRef().
@Override
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
reference.decRef();
}
}
(1)、maybeRefresh和maybeRefreshBlocking的简单介绍
下面是lucene源码中关于这两个API的实现,
/**
 * Non-blocking refresh attempt: tries to take the refresh lock and, if it is not available,
 * returns immediately without refreshing.
 *
 * @return {@code true} if the lock was obtained and a refresh was attempted, {@code false} otherwise
 * @throws IOException if the refresh itself fails
 */
public final boolean maybeRefresh() throws IOException {
    ensureOpen();
    if (!refreshLock.tryLock()) {
        // The lock is held elsewhere, so skip the refresh.
        return false;
    }
    try {
        doMaybeRefresh();
    } finally {
        refreshLock.unlock();
    }
    return true;
}
/**
 * Blocking refresh: waits until the refresh lock is available, then runs the refresh while
 * holding it.
 *
 * @throws IOException if the refresh itself fails
 */
public final void maybeRefreshBlocking() throws IOException {
    ensureOpen();
    // Unlike maybeRefresh(), this waits for the lock instead of giving up.
    refreshLock.lock();
    try {
        doMaybeRefresh();
    } finally {
        refreshLock.unlock();
    }
}
但是实际上最后执行刷新还是执行的this.doMaybeRefresh()
方法
三、lucene源码中执行逻辑
/**
 * Performs the refresh itself; both {@code maybeRefresh()} and {@code maybeRefreshBlocking()}
 * end up here.
 *
 * <p>Acquires the current reference, asks {@code refreshIfNeeded} for a newer one and, if one is
 * returned, swaps it in. Refresh listeners are notified before the attempt and afterwards with
 * the outcome.
 *
 * @throws IOException if refreshing, swapping or releasing a reference fails
 */
private void doMaybeRefresh() throws IOException {
    // The callers shown in this file already hold refreshLock when they get here. Locking again
    // protects against this method being called outside that scope; it relies on refreshLock being
    // reentrant, with every lock() matched by an unlock().
    refreshLock.lock();
    boolean refreshed = false;
    try {
        // Typed as G (not Object) so it can be passed to refreshIfNeeded(G) and release(G).
        final G reference = acquire();
        try {
            // Tell the refresh listeners that a refresh attempt is about to start.
            notifyRefreshListenersBefore();
            // refreshIfNeeded returns a new reference, or null when no refresh was needed.
            G newReference = refreshIfNeeded(reference);
            if (newReference != null) {
                assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";
                try {
                    // Replace the old reference with the new one.
                    swapReference(newReference);
                    refreshed = true;
                } finally {
                    // If the swap failed, release the new reference.
                    if (!refreshed) {
                        release(newReference);
                    }
                }
            }
        } finally {
            // Release the reference acquired above.
            release(reference);
            // Tell the refresh listeners whether a new reference was swapped in.
            notifyRefreshListenersRefreshed(refreshed);
        }
        afterMaybeRefresh();
    } finally {
        // Always pair the lock() at the top with an unlock().
        refreshLock.unlock();
    }
}
1、判断是否需要刷新
其中refreshIfNeeded
用的是子类ExternalReaderManager
的实现方法
/**
 * External reader manager whose refresh is driven by the internal reader manager.
 *
 * <p>Excerpt: {@code internalReaderManager}, {@code refreshListener} and {@code isWarmedUp} are
 * declared outside the shown code.
 */
private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
/**
 * Runs a blocking refresh on the internal reader manager and acquires its reader.
 *
 * @param referenceToRefresh the reader currently held by this manager
 * @return the internal manager's reader when it differs from {@code referenceToRefresh},
 *         or {@code null} when both are the same instance (no refresh needed)
 */
@Override
protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
internalReaderManager.maybeRefreshBlocking();
// acquire the internal manager's current reader
final ElasticsearchDirectoryReader newReader = internalReaderManager.acquire();
// notify the refresh listener when not warmed up yet or when the reader instance has changed
if (isWarmedUp == false || newReader != referenceToRefresh) {
boolean success = false;
try {
// the previous reader is only passed along once a first warm-up has happened
refreshListener.accept(newReader, isWarmedUp ? referenceToRefresh : null);
isWarmedUp = true;
success = true;
} finally {
// the listener failed: give back the reference acquired above before propagating
if (success == false) {
internalReaderManager.release(newReader);
}
}
}
// nothing has changed - both ref managers share the same instance so we can use reference equality
if (referenceToRefresh == newReader) {
internalReaderManager.release(newReader);
return null;
} else {
return newReader; // steal the reference
}
}
}