Elasticsearch 8.9 Master节点处理请求源码

news2025/1/23 11:27:55

大家看可以看ElasticSearch源码:Rest请求与Master节点处理流程(1)

在这里插入图片描述

这个图非常好,下午的讲解代码在各个类和方法之间流转,都体现这个图上

  • 一、Master节点处理请求的逻辑
    • 1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest
    • 2、Master节点处理来自客户端的请求(以创建索引请求举例)
      • (1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)
      • (2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法
  • 二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)
    • 1、首先通过nodeClient执行doExecute()
    • 2、创建一个task任务异步执行TransportAction
    • 3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法
    • 4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引

一、Master节点处理请求的逻辑

不是所有的请求都需要Master节点处理,但是有些请求必须让Master节点处理,比如创建index,下面的3就是用创建索引做的示例

1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest

主节点在 Elasticsearch 集群中负责集群的管理和协调工作。当节点需要执行某些操作时,它将创建相应的 MasterNodeRequest 实现类的实例,填充请求的参数和数据,并将其发送给主节点。主节点根据不同的 MasterNodeRequest 实现类的类型,执行相应的操作

/**
 * A based request for master based operation.
 * 在master上
 */
public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ActionRequest {

    public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT = TimeValue.timeValueSeconds(30);

    protected TimeValue masterNodeTimeout = DEFAULT_MASTER_NODE_TIMEOUT;

    protected MasterNodeRequest() {}

    protected MasterNodeRequest(StreamInput in) throws IOException {
        super(in);
        masterNodeTimeout = in.readTimeValue();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeTimeValue(masterNodeTimeout);
    }

    /**
     * A timeout value in case the master has not been discovered yet or disconnected.
     */
    @SuppressWarnings("unchecked")
    public final Request masterNodeTimeout(TimeValue timeout) {
        this.masterNodeTimeout = timeout;
        return (Request) this;
    }

    /**
     * A timeout value in case the master has not been discovered yet or disconnected.
     */
    public final Request masterNodeTimeout(String timeout) {
        return masterNodeTimeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".masterNodeTimeout"));
    }

    public final TimeValue masterNodeTimeout() {
        return this.masterNodeTimeout;
    }
}

这里有点模糊,后面学到数据节点向主节点请求或者同步什么时,我再挂个链接

2、Master节点处理来自客户端的请求(以创建索引请求举例)

(1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)

至于请求如何找到RestCreateIndexAction的,可以参考Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码

@ServerlessScope(Scope.PUBLIC)
public class RestCreateIndexAction extends BaseRestHandler {
 	//省略代码  
    @Override
    public List<Route> routes() {
        return List.of(new Route(PUT, "/{index}"));
    }

    @Override
    public String getName() {
        return "create_index_action";
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        CreateIndexRequest createIndexRequest;
        if (request.getRestApiVersion() == RestApiVersion.V_7) {
            createIndexRequest = prepareRequestV7(request);
        } else {
            createIndexRequest = prepareRequest(request);
        }
        return channel -> client.admin().indices().create(createIndexRequest, new RestToXContentListener<>(channel));
    }
  //省略代码  
}    

(2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法

TransportMasterNodeAction 主要用于处理来自节点的各种管理操作请求,如创建索引、删除索引、更新集群设置等。
当节点(数据节点)发送请求到主节点时,请求会被传递给相应的 TransportMasterNodeAction 实现类进行处理。实现类会根据请求的类型,执行相应的操作逻辑,并返回执行结果给主节点。


/**
 * 需要在主节点上执行的操作的基类。
 * A base class for operations that needs to be performed on the master node.
 *
 */
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extends
    HandledTransportAction<Request, Response>
    implements
        ActionWithReservedState<Request> {
   //省略代码     

}
/**
 * 创建索引操作
 */
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {
    @Override
    protected void masterOperation(
        Task task,
        final CreateIndexRequest request,
        final ClusterState state,
        final ActionListener<CreateIndexResponse> listener
    ) {
    //省略代码
     createIndexService.createIndex(
            updateRequest,
            listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName))
        );
    }
}    

二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)

这个场景为主节点和数据节点分离的情况

1、首先通过nodeClient执行doExecute()

client.admin().indices().createcreate方法调用IndicesAdmin类的create方法,再调用execute方法的入参是 CreateIndexAction.INSTANCE

static class IndicesAdmin implements IndicesAdminClient {
	  @Override
      public void create(final CreateIndexRequest request, final ActionListener<CreateIndexResponse> listener) {
            execute(CreateIndexAction.INSTANCE, request, listener);
	 }

	
	 @Override
	public <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(
	    ActionType<Response> action,
	    Request request
	) {
	    return client.execute(action, request);
	 }
	
}

调用的是AbstractClientexecute方法

  /**
     * This is the single execution point of *all* clients.
     * 这是所有客户端的单个执行点。
     */
    @Override
    public final <Request extends ActionRequest, Response extends ActionResponse> void execute(
        ActionType<Response> action,
        Request request,
        ActionListener<Response> listener
    ) {
        try {
            doExecute(action, request, listener);
        } catch (Exception e) {
            assert false : new AssertionError(e);
            listener.onFailure(e);
        }
    }

doExecute方法调用的是NodeClient类的方法

  @Override
    public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
        ActionType<Response> action,
        Request request,
        ActionListener<Response> listener
    ) {
        // Discard the task because the Client interface doesn't use it.
        try {
            executeLocally(action, request, listener);
        } catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {
            listener.onFailure(e);
        }
    }
 	/**
     *在本地执行 {@link ActionType},返回用于跟踪它的 {@link Task},并链接 {@link ActionListener}。如果在侦听响应时不需要访问任务,则首选此方法。这是用于实现 {@link 客户端} 接口的方法。
     */
    public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(
        ActionType<Response> action,
        Request request,
        ActionListener<Response> listener
    ) {
    	//注册并执行任务
        return taskManager.registerAndExecute(
            "transport",
            transportAction(action),
            request,
            localConnection,
            new SafelyWrappedActionListener<>(listener)
        );
    }   

之后调用TaskManager.java的方法

2、创建一个task任务异步执行TransportAction

public <Request extends ActionRequest, Response extends ActionResponse> Task registerAndExecute(
        String type,
        TransportAction<Request, Response> action,
        Request request,
        Transport.Connection localConnection,
        ActionListener<Response> taskListener
    ) { 
        //检查请求是否有父任务,如果有,则注册子连接。
        final Releasable unregisterChildNode;
        if (request.getParentTask().isSet()) {
            unregisterChildNode = registerChildConnection(request.getParentTask().getId(), localConnection);
        } else {
            unregisterChildNode = null;
        }
        //创建一个新的跟踪上下文
        try (var ignored = threadPool.getThreadContext().newTraceContext()) {
            final Task task;
            //注册一个任务,并捕获可能的取消任务异常。
            try {
                task = register(type, action.actionName, request);
            } catch (TaskCancelledException e) {
                Releasables.close(unregisterChildNode);
                throw e;
            }
            //执行操作,并在操作完成时调用相应的监听器。
            action.execute(task, request, new ActionListener<>() {
                @Override
                public void onResponse(Response response) {
                    try {
                        release();
                    } finally {
                        taskListener.onResponse(response);
                    }
                }
                //根据操作的成功或失败情况,取消子任务并释放资源。
                @Override
                public void onFailure(Exception e) {
                    try {
                        if (request.getParentTask().isSet()) {
                            cancelChildLocal(request.getParentTask(), request.getRequestId(), e.toString());
                        }
                        release();
                    } finally {
                        taskListener.onFailure(e);
                    }
                }

                @Override
                public String toString() {
                    return this.getClass().getName() + "{" + taskListener + "}{" + task + "}";
                }

                private void release() {
                    Releasables.close(unregisterChildNode, () -> unregister(task));
                }
            });
            //返回任务对象。
            return task;
        }
    }

下面是TransportAction.java类中的方法

    /**
     * Use this method when the transport action should continue to run in the context of the current task
     * 当传输操作应继续在当前任务的上下文中运行时,请使用此方法
     */
    public final void execute(Task task, Request request, ActionListener<Response> listener) {
        final ActionRequestValidationException validationException;
        //对请求进行验证,如果验证过程中出现异常,则记录错误日志并通知监听器执行失败。
        try {
            validationException = request.validate();
        } catch (Exception e) {
            assert false : new AssertionError("validating of request [" + request + "] threw exception", e);
            logger.warn("validating of request [" + request + "] threw exception", e);
            listener.onFailure(e);
            return;
        }
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }
        //检查是否存在任务且请求需要存储结果,如果满足条件,则创建一个TaskResultStoringActionListener实例,用于在任务完成后将结果存储起来。
        if (task != null && request.getShouldStoreResult()) {
            listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
        }
        //创建一个请求过滤器链(RequestFilterChain),然后调用proceed方法,将任务、动作名称、请求和监听器传递给过滤器链进行处理。
        RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
        requestFilterChain.proceed(task, actionName, request, listener);
    }
 @Override
        public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
            int i = index.getAndIncrement();
            try {
                if (i < this.action.filters.length) {
                    this.action.filters[i].apply(task, actionName, request, listener, this);
                } else if (i == this.action.filters.length) {
                //`this.action.doExecute(task, request, listener);` 中`action`对应的是`TransportMasterNodeAction`。
                    this.action.doExecute(task, request, listener);
                } else {
                    listener.onFailure(new IllegalStateException("proceed was called too many times"));
                }
            } catch (Exception e) {
                logger.trace("Error during transport action execution.", e);
                listener.onFailure(e);
            }
        }

this.action.doExecute(task, request, listener);action对应的是TransportMasterNodeAction

3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法

TransportMasterNodeAction继承HandledTransportAction
HandledTransportAction继承自TransportAction

public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extends
    HandledTransportAction<Request, Response>
    implements
        ActionWithReservedState<Request> {
 @Override
    protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
       //省略代码
        new AsyncSingleAction(task, request, listener).doStart(state);
    }
}    
 protected void doStart(ClusterState clusterState) {
	  threadPool.executor(executor).execute(ActionRunnable.wrap(delegate, l -> executeMasterOperation(task, request, clusterState, l)));
  }
private void executeMasterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)
        throws Exception {
       //调用子类实现
        masterOperation(task, request, state, listener);
    }
//子类实现   
protected abstract void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)
        throws Exception;
   

4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引

其中创建索引的actionTransportCreateIndexAction

 @Override
    protected void masterOperation(
        Task task,
        final CreateIndexRequest request,
        final ClusterState state,
        final ActionListener<CreateIndexResponse> listener
    ) {
	createIndexService.createIndex(
            updateRequest,
            listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName))
        );
}

之后调用createIndexService.createIndex创建索引

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1119518.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

H5随机短视频滑动版带打赏源码,可封装APP软件或嵌入式观看

H5随机短视频滑动版带打赏源码&#xff0c;可封装APP软件或嵌入式观看&#xff0c;网站引流必备源码&#xff01; 数据来源抖音和快手官方短视频链接&#xff0c;无任何违规内容&#xff01;可自行添加广告等等&#xff01; 手机端完美支持滑动屏幕观看&#xff08;向上或向右…

STM32不使用 cubeMX实现外部中断

这篇文章将介绍如何不使用 cubeMX完成外部中断的配置和实现。 文章目录 前言一、文件加入工程二、代码解析exti.cexti.hmain.c 注意&#xff1a;总结 前言 实验开发板&#xff1a;STM32F103C8T6。所需软件&#xff1a;keil5 &#xff0c; cubeMX 。实验目的&#xff1a;如何不…

【【萌新的FPGA学习之分频器的介绍】】

萌新的FPGA学习之分频器的介绍 分频器的介绍 分频就是生成一个新时钟&#xff0c;该新时钟的频率是原有时钟频率的整数分之一倍&#xff0c;新周期是原有周期的整数倍。再简单来说&#xff0c;让你手撕一个四分频电路&#xff0c;就是写代码生成一个周期是原来四倍的时钟&…

PMP的智慧(2) - 系统性思考及复杂性

PMP的智慧(2) - 系统性思考及复杂性 在2021年推出的第七版《管理专业知识体系指南》中&#xff0c;PMI在传统的过程和ITTO的基础上&#xff0c;重新增加了12大项目管理原则。 管家式管理 stewardship团队 team干系人 stakeholders价值 value系统思考 system thinking领导力 l…

Excel函数中单元格的引用方式

如下图在D列第一行输入sum(A1:C1)&#xff1b; 回车之后结果如下&#xff1b;先要输入等号&#xff0c;然后输入sum&#xff0c;以及左括号&#xff0c;这是调用了sum求和函数&#xff1b; A1表示A列第一行&#xff0c;C1表示C列第一行&#xff1b; A1:C1&#xff0c;中间是冒号…

实践DDD模拟电商系统总结

目录 一、事件风暴 二、系统用例 三、领域上下文 四、架构设计 &#xff08;一&#xff09;六边形架构 &#xff08;二&#xff09;系统分层 五、系统实现 &#xff08;一&#xff09;项目结构 &#xff08;二&#xff09;提交订单功能实现 &#xff08;三&#xff0…

【Objective-C】浅析Block及其捕获机制

目录 Block的基本使用Block的声明Block的实现Block的调用 Block作为形参使用Block作为属性使用给Block起别名Block的copy Block的捕获机制auto类型的局部变量__block浅析static类型的局部变量全局变量 其他问题 Block的基本使用 什么是Block&#xff1f; Block &#xff08;块…

【深度学习实验】循环神经网络(五):基于GRU的语言模型训练(包括自定义门控循环单元GRU)

文章目录 一、实验介绍二、实验环境1. 配置虚拟环境2. 库版本介绍 三、实验内容&#xff08;一&#xff09;自定义门控循环单元&#xff08;GRU&#xff0c;Gated Recurrent Unit&#xff09;1. get_params2. init_gru_state3. gru &#xff08;二&#xff09;创建模型0. 超参数…

[AUTOSAR][诊断管理][$11] 复位服务

文章目录 一、简介(1) 应用场景&#xff08;2&#xff09; 请求格式&#xff08;3&#xff09; 重启类型 二、示例代码(1) 11_ecu_reset.c 一、简介 ECU复位服务就是可以此诊断指令来命令ECU执行自复位&#xff0c;复位有多种形式&#xff0c;依据子功能参数来区分&#xff08…

【Javascript】构造函数之new的作用

目录 new的作用 把对象返回了回来 无new 有new 把构造函数的this指向了要返回的对象 无new​编辑 有new new的执行流程 new的作用 创建了新空对象将构造函数的作用域赋值给新对象(this指向新对象)执行构造函数代码 &#xff08;为这个新对象添加属性&#xff09;返回新对…

网络协议--Ping程序

7.1 引言 “ping”这个名字源于声纳定位操作。Ping程序由Mike Muuss编写&#xff0c;目的是为了测试另一台主机是否可达。该程序发送一份ICMP回显请求报文给主机&#xff0c;并等待返回ICMP回显应答&#xff08;图6-3列出了所有的ICMP报文类型&#xff09;。 一般来说&#x…

HTML+CSS+JS+Django 实现前后端分离的科学计算器、利率计算器

&#x1f9ee;前后端分离计算器 &#x1f4da;git仓库链接和代码规范链接&#x1f4bc;PSP表格&#x1f387;成品展示&#x1f3c6;&#x1f3c6;科学计算器&#xff1a;1. 默认界面与页面切换2. 四则运算、取余、括号3. 清零Clear 回退Back4. 错误提示 Error5. 读取历史记录Hi…

​CUDA学习笔记(三)CUDA简介

本篇博文转载于https://www.cnblogs.com/1024incn/tag/CUDA/&#xff0c;仅用于学习。 前言 线程的组织形式对程序的性能影响是至关重要的&#xff0c;本篇博文主要以下面一种情况来介绍线程组织形式&#xff1a; 2D grid 2D block 线程索引 矩阵在memory中是row-major线性…

Cannot load from short array because “sun.awt.FontConfiguration.head“ is null

错误描述 在使用Easyexcel时发生了报错&#xff0c;请求返回空白 但是只在Linux上出现了该报错&#xff0c;在本地windows环境没有出现 JDK都使用的是17版本 错误原因 由于在linux上缺失Easyexcel使用的字体导致 解决办法 下载一个jdk1.8 在其jre/lib目录里复制fontconfi…

聊聊昨日ChatGPT全球宕机事件,带给我们的警示

作者 | 卖萌酱&#xff0c;王二狗 昨日&#xff0c;ChatGPT崩了&#xff01; 大模型研究测试传送门 GPT-4传送门&#xff08;免墙&#xff0c;可直接测试&#xff0c;遇浏览器警告点高级/继续访问即可&#xff09;&#xff1a;https://gpt4test.com 许多人发现无论是 ChatGPT…

腾讯云创建了jenkins容器,但无法访问

1、首先&#xff0c;查看本机能不能ping通你的腾讯云服务器 如果ping的通那就下一步 2、查看腾讯云服务器的防火墙关了没&#xff0c;没关关掉、 firewall-cmd --state not running 3、那就在云服务器的控制台开放端口

自然语言处理---注意力机制

注意力概念 观察事物时&#xff0c;之所以能够快速判断一种事物(当然允许判断是错误的)&#xff0c;是因为大脑能够很快把注意力放在事物最具有辨识度的部分从而作出判断&#xff0c;而并非是从头到尾的观察一遍事物后&#xff0c;才能有判断结果。正是基于这样的理论&#xf…

【数据结构】线性表(九)队列:链式队列及其基本操作(初始化、判空、入队、出队、存取队首元素)

文章目录 一、队列1. 定义2. 基本操作 二、顺序队列三、链式队列0. 链表1. 头文件2. 队列结构体3. 队列的初始化4. 判断队列是否为空5. 入队6. 出队7. 存取队首元素8. 主函数9. 代码整合 堆栈Stack 和 队列Queue是两种非常重要的数据结构&#xff0c;两者都是特殊的线性表&…

面试二总结

bean的生命周期&#xff1a; 数据库采用行级锁索引&#xff08;使用排他锁&#xff09;&#xff1a; mysql事务隔离级别 未提交读(Read uncommitted)是最低的隔离级别。通过名字我们就可以知道&#xff0c;在这种事务隔离级别下&#xff0c;一个事务可以读到另外一个事务未提交…

微信小程序自定义组件及会议管理与个人中心界面搭建

一、自定义tabs组件 1.1 创建自定义组件 新建一个components文件夹 --> tabs文件夹 --> tabs文件 创建好之后win7 以上的系统会报个错误&#xff1a;提示代码分析错误&#xff0c;已经被其他模块引用&#xff0c;只需要在 在project.config.json文件里添加两行配置 &…