Elasticsearch 8.9 refresh刷Es缓冲区的数据到Lucene,更新segemnt,使数据可见

news2025/1/19 17:17:25

  • 一、相关API的handler
    • 1、接受HTTP请求的hander(RestRefreshAction)
    • 2、往数据节点发送刷新请求的action(TransportRefreshAction)
    • 3、数据节点接收主节点refresh传输的action(TransportShardRefreshAction)
  • 二、在IndexShard执行refresh操作
    • 1、根据入参决定是使用lucene提供的阻塞还是非阻塞API刷新数据
      • (1)、maybeRefresh和maybeRefreshBlocking的简单介绍
  • 三、lucene源码中执行逻辑
    • 1、判断是否需要刷新

下面的图来自ElasticSearch——刷盘原理流程,这篇文章主要讲的是refresh命令把ES写入索引缓冲区的数据刷进Lucene,使数据可供查询,搜索,否则,在索引缓冲区是不可见的,不涉及到在translog.logLucene的数据结构。
通过这个流程知道ES如何把索引缓冲区的数据刷进Lucene的,主要是下面左中部分refresh部分

在这里插入图片描述

其他部分源码梳理
1、主节点同时写入ES缓冲区和translog这一部分,请看Elasticsearch 8.9 Bulk批量给索引增加数据源码
2、下半边fsync的源码逻辑,请看Elasticsearch 8.9 flush刷新缓存中的数据到磁盘源码

一、相关API的handler

ActionModule.java

 registerHandler.accept(new RestRefreshAction());
 actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class);
 actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class);

1、接受HTTP请求的hander(RestRefreshAction)

public class RestRefreshAction extends BaseRestHandler {

    @Override
    public List<Route> routes() {
        return List.of(
            new Route(GET, "/_refresh"),
            new Route(POST, "/_refresh"),
            new Route(GET, "/{index}/_refresh"),
            new Route(POST, "/{index}/_refresh")
        );
    }

    @Override
    public String getName() {
        return "refresh_action";
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        RefreshRequest refreshRequest = new RefreshRequest(Strings.splitStringByCommaToArray(request.param("index")));
        refreshRequest.indicesOptions(IndicesOptions.fromRequest(request, refreshRequest.indicesOptions()));
        return channel -> client.admin().indices().refresh(refreshRequest, new RestToXContentListener<RefreshResponse>(channel) {
            @Override
            protected RestStatus getStatus(RefreshResponse response) {
                return response.getStatus();
            }
        });
    }
}

client.admin().indices().refresh()会执行到下面的父类TransportBroadcastReplicationActiondoExecute方法

2、往数据节点发送刷新请求的action(TransportRefreshAction)

public class TransportRefreshAction extends TransportBroadcastReplicationAction<
    RefreshRequest,
    RefreshResponse,
    BasicReplicationRequest,
    ReplicationResponse> {

    @Inject
    public TransportRefreshAction(
        ClusterService clusterService,
        TransportService transportService,
        ActionFilters actionFilters,
        IndexNameExpressionResolver indexNameExpressionResolver,
        NodeClient client
    ) {
        super(
            RefreshAction.NAME,
            RefreshRequest::new,
            clusterService,
            transportService,
            client,
            actionFilters,
            indexNameExpressionResolver,
            TransportShardRefreshAction.TYPE,
            ThreadPool.Names.REFRESH
        );
    }

   //省略代码
}
public abstract class TransportBroadcastReplicationAction<
    Request extends BroadcastRequest<Request>,
    Response extends BaseBroadcastResponse,
    ShardRequest extends ReplicationRequest<ShardRequest>,
    ShardResponse extends ReplicationResponse> extends HandledTransportAction<Request, Response> {
 @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        clusterService.threadPool().executor(executor).execute(ActionRunnable.wrap(listener, createAsyncAction(task, request)));
    }

    private CheckedConsumer<ActionListener<Response>, Exception> createAsyncAction(Task task, Request request) {
        return new CheckedConsumer<ActionListener<Response>, Exception>() {

            private int totalShardCopyCount;
            private int successShardCopyCount;
            private final List<DefaultShardOperationFailedException> allFailures = new ArrayList<>();

            @Override
            public void accept(ActionListener<Response> listener) {
                assert totalShardCopyCount == 0 && successShardCopyCount == 0 && allFailures.isEmpty() : "shouldn't call this twice";

                final ClusterState clusterState = clusterService.state();
                final List<ShardId> shards = shards(request, clusterState);
                final Map<String, IndexMetadata> indexMetadataByName = clusterState.getMetadata().indices();

                try (var refs = new RefCountingRunnable(() -> finish(listener))) {
                //遍历所有的分片
                    for (final ShardId shardId : shards) {
                        // NB This sends O(#shards) requests in a tight loop; TODO add some throttling here?
                        shardExecute(
                            task,
                            request,
                            shardId,
                            ActionListener.releaseAfter(new ReplicationResponseActionListener(shardId, indexMetadataByName), refs.acquire())
                        );
                    }
                }
            }
        };
    }

    protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
        assert Transports.assertNotTransportThread("may hit all the shards");
        ShardRequest shardRequest = newShardRequest(request, shardId);
        shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());
        client.executeLocally(replicatedBroadcastShardAction, shardRequest, shardActionListener);
    }

}    

3、数据节点接收主节点refresh传输的action(TransportShardRefreshAction)

public class TransportShardRefreshAction extends TransportReplicationAction<
    BasicReplicationRequest,
    ShardRefreshReplicaRequest,
    ReplicationResponse> {

    private static final Logger logger = LogManager.getLogger(TransportShardRefreshAction.class);

    public static final String NAME = RefreshAction.NAME + "[s]";
    public static final ActionType<ReplicationResponse> TYPE = new ActionType<>(NAME, ReplicationResponse::new);
    public static final String SOURCE_API = "api";

    @Inject
    public TransportShardRefreshAction(
        Settings settings,
        TransportService transportService,
        ClusterService clusterService,
        IndicesService indicesService,
        ThreadPool threadPool,
        ShardStateAction shardStateAction,
        ActionFilters actionFilters
    ) {
        super(
            settings,
            NAME,
            transportService,
            clusterService,
            indicesService,
            threadPool,
            shardStateAction,
            actionFilters,
            BasicReplicationRequest::new,
            ShardRefreshReplicaRequest::new,
            ThreadPool.Names.REFRESH
        );
        // registers the unpromotable version of shard refresh action
        new TransportUnpromotableShardRefreshAction(clusterService, transportService, shardStateAction, actionFilters, indicesService);
    }

  
    @Override
    protected void shardOperationOnPrimary(
        BasicReplicationRequest shardRequest,
        IndexShard primary,
        ActionListener<PrimaryResult<ShardRefreshReplicaRequest, ReplicationResponse>> listener
    ) {
        primary.externalRefresh(SOURCE_API, listener.delegateFailure((l, refreshResult) -> {
            ShardRefreshReplicaRequest replicaRequest = new ShardRefreshReplicaRequest(shardRequest.shardId(), refreshResult);
            replicaRequest.setParentTask(shardRequest.getParentTask());
            logger.trace("{} refresh request executed on primary", primary.shardId());
            l.onResponse(new PrimaryResult<>(replicaRequest, new ReplicationResponse()));
        }));
    }
}    

primary.externalRefresh执行分片的刷新

二、在IndexShard执行refresh操作

 public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {
        verifyNotClosed();
        getEngine().externalRefresh(source, listener);
    }
 public void externalRefresh(String source, ActionListener<Engine.RefreshResult> listener) {
        ActionListener.completeWith(listener, () -> {
            logger.trace("external refresh with source [{}]", source);
            return refresh(source);
        });
    }   

getEngine()的实现是InternalEngine

  @Override
    public RefreshResult refresh(String source) throws EngineException {
        return refresh(source, SearcherScope.EXTERNAL, true);
    }

1、根据入参决定是使用lucene提供的阻塞还是非阻塞API刷新数据

   protected final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {
        //这两种刷新类型都会导致内部刷新,但只有外部刷新类型也会将新的读取器引用传递给外部读取器管理器。
        //获取当前的本地检查点。
        final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
        boolean refreshed;
        long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;
        try {
            //refresh 不需要按住 readLock,因为如果引擎在中途关闭,ReferenceManager 可以正确处理。
            if (store.tryIncRef()) {
                try {
                    //尽管我们保留了 2 managers,但我们实际上只做过一次繁重的工作。第二次刷新只会做我们必须做的额外工作,以预热缓存等。
                    ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
                    long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();
                    //根据参数决定是进行阻塞刷新还是非阻塞刷新
                    if (block) { 
                        //刷新可能会导致阻塞
                        referenceManager.maybeRefreshBlocking();
                        refreshed = true;
                    } else {
                    	//刷新不会导致阻塞
                        refreshed = referenceManager.maybeRefresh();
                    }
                    //如果刷新成功,获取当前的读取器,并更新段的生成号
                    if (refreshed) {
                    	//获取当前的目录
                        final ElasticsearchDirectoryReader current = referenceManager.acquire();
                        try {
                            //更新segment信息
                            segmentGeneration = Math.max(current.getIndexCommit().getGeneration(), generationBeforeRefresh);
                        } finally {
                            referenceManager.release(current);
                        }
                    }
                } finally {
                    store.decRef();
                }
                if (refreshed) {
                    lastRefreshedCheckpointListener.updateRefreshedCheckpoint(localCheckpointBeforeRefresh);
                }
            } else {
                refreshed = false;
            }
        } catch (AlreadyClosedException e) {
            failOnTragicEvent(e);
            throw e;
        } catch (Exception e) {
            try {
                failEngine("refresh failed source[" + source + "]", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw new RefreshFailedEngineException(shardId, e);
        }
        assert refreshed == false || lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh
            : "refresh checkpoint was not advanced; "
                + "local_checkpoint="
                + localCheckpointBeforeRefresh
                + " refresh_checkpoint="
                + lastRefreshedCheckpoint();
        // TODO: maybe we should just put a scheduled job in threadPool?
        // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
        // for a long time:
        maybePruneDeletes();
        mergeScheduler.refreshConfig();
        return new RefreshResult(refreshed, segmentGeneration);
    }

其中referenceManager 根据入参是 SearcherScope.EXTERNAL 获得的实现是ExternalReaderManager

    private final ExternalReaderManager externalReaderManager;
  @Override
    protected final ReferenceManager<ElasticsearchDirectoryReader> getReferenceManager(SearcherScope scope) {
        return switch (scope) {
            case INTERNAL -> internalReaderManager;
            case EXTERNAL -> externalReaderManager;
        };
    }

根据入参中的block=true 实际执行的是referenceManager.maybeRefreshBlocking(); 来刷新,是异步非阻塞的,
并且根据下图ExternalReaderManager继承了ReferenceManager,所以没有重写maybeRefreshBlocking 所以执行的是父类ReferenceManager

import org.apache.lucene.search.ReferenceManager;

 @SuppressForbidden(reason = "reference counting is required here")
    private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
       
        @Override
        protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
           //省略代码
        }

        @Override
        protected boolean tryIncRef(ElasticsearchDirectoryReader reference) {
            return reference.tryIncRef();
        }

        @Override
        protected int getRefCount(ElasticsearchDirectoryReader reference) {
            return reference.getRefCount();
        }

        @Override
        protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
            reference.decRef();
        }
    }

(1)、maybeRefresh和maybeRefreshBlocking的简单介绍

下面是lucene源码中关于这两个API的实现,

//这个是会尝试获取刷新锁,如果没有则不执行刷新操作
  public final boolean maybeRefresh() throws IOException {
        this.ensureOpen();
        boolean doTryRefresh = this.refreshLock.tryLock();
        if (doTryRefresh) {
            try {
                this.doMaybeRefresh();
            } finally {
                this.refreshLock.unlock();
            }
        }

        return doTryRefresh;
    }
	//这里会等待获取刷新锁,所以会阻塞
    public final void maybeRefreshBlocking() throws IOException {
        this.ensureOpen();
        this.refreshLock.lock();

        try {
            this.doMaybeRefresh();
        } finally {
            this.refreshLock.unlock();
        }

    }

但是实际上最后执行刷新还是执行的this.doMaybeRefresh() 方法

三、lucene源码中执行逻辑

private void doMaybeRefresh() throws IOException {
        this.refreshLock.lock();
        boolean refreshed = false;

        try {
            Object reference = this.acquire();

            try {
            	//通知刷新监听器。
                this.notifyRefreshListenersBefore();
                //调用 refreshIfNeeded(reference) 返回一个新的引用 (newReference)
                //用来判断是否需要刷新,如果不需要刷新,refreshIfNeeded 应返回 null
                G newReference = this.refreshIfNeeded(reference);
                if (newReference != null) {
                    assert newReference != reference : "refreshIfNeeded should return null if refresh wasn't needed";
                    try {
                    //调用 swapReference(newReference) 方法来交换旧的引用为新的引用。
                        this.swapReference(newReference);
                     //设置 refreshed 为 true 表示刷新成功。   
                        refreshed = true;
                    } finally {
                    //如果刷新失败,释放新的引用
                        if (!refreshed) {
                            this.release(newReference);
                        }
                    }
                }
            } finally {
                //释放旧的引用
                this.release(reference);
                //通知刷新监听器刷新完成
                this.notifyRefreshListenersRefreshed(refreshed);
            }

            this.afterMaybeRefresh();
        } finally {
        	//最后释放刷新锁
            this.refreshLock.unlock();
        }

    }

1、判断是否需要刷新

其中refreshIfNeeded用的是子类ExternalReaderManager的实现方法

private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
 		@Override
        protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
            internalReaderManager.maybeRefreshBlocking();
             //获取其reader对象。
            final ElasticsearchDirectoryReader newReader = internalReaderManager.acquire();
            //isWarmedUp为false或者获取到的新reader对象与传入的referenceToRefresh对象不相等,说明需要刷新
            if (isWarmedUp == false || newReader != referenceToRefresh) {
                boolean success = false;
                try {
                    refreshListener.accept(newReader, isWarmedUp ? referenceToRefresh : null);
                    isWarmedUp = true;
                    success = true;
                } finally {
                    if (success == false) {
                        internalReaderManager.release(newReader);
                    }
                }
            }
            //没有任何变化 - 两个 ref 管理器共享同一个实例,因此我们可以使用引用相等性,不需要执行刷新操作
            if (referenceToRefresh == newReader) {
                internalReaderManager.release(newReader);
                return null;
            } else {
                return newReader; // steal the reference
            }
        }
}        

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1298767.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Http请求(bug)——路径变量传参遇到特殊符号的问题 URL中的#,?,符号作用

前言 本篇博客分析路径变量传参遇到特殊符号的问题&#xff0c;阐述了URL中的#&#xff0c;&#xff1f;&#xff0c;&符号作用。 目录 前言引出路径变量传参遇到特殊符号的问题问题描述问题分析 URL中的 #&#xff0c;&#xff1f;&#xff0c;&符号的作用URL中# 的作…

【探索Linux】—— 强大的命令行工具 P.21(多线程 | 线程同步 | 条件变量 | 线程安全)

阅读导航 引言一、线程同步1. 竞态条件的概念2. 线程同步的概念 二、条件变量1. 条件变量函数⭕使用前提&#xff08;1&#xff09;初始化条件变量&#xff08;2&#xff09;等待条件满足&#xff08;3&#xff09;唤醒等待pthread_cond_broadcast()pthread_cond_signal() &…

Qexo博客后台管理部署

Qexo博客后台管理部署 个人主页 个人博客 参考文档 https://www.oplog.cn/qexo/本地部署 采用本地Docker部署管理本地Hexo 下载代码包 若无法下载使用科学工具下载到本地在上传到服务器 wget https://github.com/Qexo/Qexo/archive/refs/tags/3.0.1.zip# 解压 unzip Qexo…

SQL命令---修改字段的排列位置

介绍 使用sql语句表字段的排列顺序。 命令 alter table 表名 modify 字段名1 数据类型 first|after 字段名2;例子 将a表中的age字段改为表的第一个字段。 alter table a modify age int(12) first;下面是执行命令后的表结构&#xff1a; 将a表中的age字段放到name字段之…

【linux】查看CPU和内存信息

之前咱们一起学习了查看内存的和CPU的命令。 ​mpstat &#xff1a; 【linux】 mpstat 使用 uptime&#xff1a;【Linux】 uptime命令使用 CPU的使用率&#xff1a;【linux】查看CPU的使用率 nmon &#xff1a;【linux】nmon 工具使用 htop &#xff1a;【linux】htop 命令…

学习Linux(2)-学习Linux命令

Linux目录结构 Linux目录结构-菜鸟教程 /bin&#xff1a;bin 是 Binaries (二进制文件) 的缩写, 这个目录存放着最经常使用的命令。 /boot&#xff1a;这里存放的是启动 Linux 时使用的一些核心文件&#xff0c;包括一些连接文件以及镜像文件。 /dev &#xff1a;dev 是 De…

Cocos Creator:创建棋盘

Cocos Creator&#xff1a;创建棋盘 创建地图三部曲&#xff1a;1. 创建layout组件2. 创建预制体Prefab&#xff0c;做好精灵贴图&#xff1a;3. 创建脚本LayoutSprite.ts收尾工作&#xff1a; 创建地图三部曲&#xff1a; 1. 创建layout组件 使用layout进行布局&#xff0c;…

数据表记录的操作

一、数据添加 1、打开SSMS&#xff0c;附加数据库&#xff08;数据库文件在自己的文件夹下面&#xff09;&#xff0c;并进行下面的设置&#xff1a; &#xff08;1&#xff09;设置“部门信息”表中的“编号”为主键&#xff08;SSMS&#xff09; 首先建立好所需的数据库库…

HNU计算机视觉作业三

前言 选修的是蔡mj老师的计算机视觉&#xff0c;上课还是不错的&#xff0c;但是OpenCV可能需要自己学才能完整把作业写出来。由于没有认真学&#xff0c;这门课最后混了80多分&#xff0c;所以下面作业解题过程均为自己写的&#xff0c;并不是标准答案&#xff0c;仅供参考 …

单臂路由与三层交换机

单臂路由 划分VLAN后同一VLAN的计算机属于同一个广播域&#xff0c;同一VLAN的计算机之间的通信是不成问题的。然而&#xff0c;处于不同VLAN的计算机即使是在同一交换机上&#xff0c;它们之间的通信也必须使用路由器。 图&#xff08;a&#xff09;是一种实现VLAN间路由的方…

ubuntu上搭建bazel编译环境,构建Android APP

背景是github上下载的工程&#xff0c;说明仅支持bazel编译&#xff0c;折腾了一天Android studio&#xff0c;失败。 不得不尝试单价bazel编译环境&#xff0c;并不复杂&#xff0c;过程记录如下 说明&#xff1a;ubuntu环境是20.04&#xff0c;pve虚拟机安装 1.安装jdk sudo…

docker-compose安装教程

1.确认docker-compose是否安装 docker-compose -v如上图所示表示未安装&#xff0c;需要安装。 如上图所示表示已经安装&#xff0c;不需要再安装&#xff0c;如果觉得版本低想升级&#xff0c;也可以继续安装。 2.离线安装 下载docker-compose安装包&#xff0c;上传到服务…

如何将html网页免费转为excel?

一、直接复制。 直接复制是最简单有效、快捷的解决方案&#xff0c;操作方法如下&#xff1a; 1、用鼠标像平常复制文本一样&#xff0c;将整个网页表格选中。 2、点击右键&#xff0c;点击“复制”。 3、打开excel软件&#xff0c;鼠标点击任意单元格。 4、点击右键&#…

leetcode7 移除列表中特定元素

给你一个数组 nums 和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val 的元素&#xff0c;并返回移除后数组的新长度。 不要使用额外的数组空间&#xff0c;你必须仅使用 O(1) 额外空间并 原地 修改输入数组。 元素的顺序可以改变。你不需要考虑数组中超出新长度后面…

【Cisco Packet Tracer】路由器 NAT实验

NAT的实现方式有三种&#xff0c;即静态转换Static Nat、动态转换Dynamic Nat和端口多路复用OverLoad。 静态转换是指内部本地地址一对一转换成内部全局地址&#xff0c;相当内部本地的每一台PC都绑定了一个全局地址。一般用于在内网中对外提供服务的服务器。 [3] 动态转换是指…

springboot+java医保付费绩效管理平台ssm

随着社会的飞速发展&#xff0c;特别是信息技术的迅猛发展&#xff0c;各行各业都在努力与现代先进技术接轨&#xff0c;通过科技手段提高自身的优势。当然&#xff0c;也不能排除医保付费及绩效管理行业。随着网络技术的不断成熟&#xff0c;医保付费及绩效管理的发展得到了促…

Wireshark添加自定义协议解析

最终效果如下&#xff1a; 参考文档&#xff1a;https://mika-s.github.io/topics/ 此参考文档中7个例子教我们如何编写lua脚本去识别我们自定义的协议 安装Wireshark https://www.wireshark.org/上下载安装包安装即可。我的安装路径是D:\Install\Wireshark&#xff0c;在W…

nodejs+vue+微信小程序+python+PHP北京地铁票务APP-计算机毕业设计推荐 -安卓

根据现实中在北京地铁票务方面的需求&#xff0c;并对该系统进行了仔细的研究&#xff0c;将系统权限按照管理者和用户这两种类型进行了区分。 &#xff08;1&#xff09;用户功能需求   用户进入APP可以进行系统首页、地铁线路、我的等操作&#xff0c;在我的页面可以对我的…

解释Spring中一个bean的注入过程

目录 1、定义Bean&#xff1a; XML配置方式&#xff1a; 2、注入方式&#xff1a; 构造器注入&#xff08;Constructor Injection&#xff09;&#xff1a; Setter方法注入&#xff08;Setter Injection&#xff09;&#xff1a; 字段注入&#xff08;Field Injection&…

Linux数据库修改密码的三种方式

1、正常修改密码 [rootzaotounan ~]# mysqladmin -uroot -p原密码 password 新密码 2、忘记mysql root用户密码 更改 vim /etc/my.cnf //进入my.cnf文件 skip-grant-tables //添加语句&#xff0c;跳过授权表 systemctl restar…