第 31 章 - 源码篇 - Elasticsearch 写入流程深入分析

news2025/1/8 6:55:24

写入源码分析

接收与处理

请求首先会被 Netty4HttpServerTransport 接收,接着交由 RestController 进行路由分发。

private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
    
    // 从 tier 树中,找到该请求路径对应的 RestHandler
    Iterator<MethodHandlers> allHandlers = getAllHandlers(request.params(), rawPath);
    while (allHandlers.hasNext()) {
        final RestHandler handler;
        final MethodHandlers handlers = allHandlers.next();
        if (handlers == null) {
            handler = null;
        } else {
            handler = handlers.getHandler(requestMethod, restApiVersion);
        }
        if (handler == null) {
            if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {
                return;
            }
        } else {
            // 找到后,将本次请求转发给该 RestHandler
            dispatchRequest(request, channel, handler, threadContext);
            return;
        }
    }
}

那么 ES 如何知道对应的路由应该由谁处理呢?
Node 初始化时,会执行 ActionModule#initRestHandlers(...)

public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
    ...
    ...
    // 注册路由
    registerHandler.accept(new RestIndexAction());
    ...
    ...
}

RestIndexAction 注册的路由如下所示

public List<Route> routes() {
    return List.of(
        new Route(POST, "/{index}/_doc/{id}"),
        new Route(PUT, "/{index}/_doc/{id}"),
        Route.builder(POST, "/{index}/{type}/{id}").deprecated(TYPES_DEPRECATION_MESSAGE, RestApiVersion.V_7).build(),
        Route.builder(PUT, "/{index}/{type}/{id}").deprecated(TYPES_DEPRECATION_MESSAGE, RestApiVersion.V_7).build()
    );
}

每个 RestHandlerprepareRequest(final RestRequest request, final NodeClient client) 都会声明与之绑定的 TransportAction,之后所有逻辑会交由 TransportAction 处理。
其绑定的 TransportActionTransportIndexAction
RestIndexAction#prepareRequest(...)

public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    ...
    ...
    return channel -> client.index(
            indexRequest,
            new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing()))
        );
}

AbstractClient#index(final IndexRequest request, final ActionListener<IndexResponse> listener)

@Override
public void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {
    execute(IndexAction.INSTANCE, request, listener);
}

对于写入类型的 TransportAction 在内部又会通过协调节点(接收客户端请求的就是协调节点)先将请求转发给对应主分片所在的节点,主分片节点写入后,主分片节点又会转发给副本分片,副本分片写入后,返回给主分片,主分片再返回给协调节点,最后协调节点返回给客户端。

整体流程如下图所示:

协调节点分发请求

上文 search 读流程有提到,TansportAction 定义了基本流程,每个子类实现 doExecute(...) 方法,自定义执行逻辑,因此我们只需要看 TransportIndexAction#doExecute(...) 即可。

不存在索引则创建

当索引不存在时,则会先创建索引,接着再执行写入操作。如果索引存在,则直接执行写入操作。

TransportBulkAction#doInternalExecute(...)

protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
    for (String index : autoCreateIndices) {
        // 创建索引
        createIndex(index, bulkRequest.timeout(), minNodeVersion, new ActionListener<>() {
            @Override
            public void onResponse(CreateIndexResponse result) {
                // 创建索引成功回调函数,
                if (counter.decrementAndGet() == 0) {
                    // 执行写入操作
                    threadPool.executor(executorName).execute(new ActionRunnable<>(listener) {
                        @Override
                        protected void doRun() {
                            executeBulk(
                                task,
                                bulkRequest,
                                startTime,
                                listener,
                                executorName,
                                responses,
                                indicesThatCannotBeCreated
                            );
                        }
                    });
                }
            }
        }
    }
}

executeBulk(..) 方法内部会创建 BulkOperation 交由该类做处理

void executeBulk(
        Task task,
        BulkRequest bulkRequest,
        long startTimeNanos,
        ActionListener<BulkResponse> listener,
        String executorName,
        AtomicArray<BulkItemResponse> responses,
        Map<String, IndexNotFoundException> indicesThatCannotBeCreated
    ) {
        new BulkOperation(task, bulkRequest, listener, executorName, responses, startTimeNanos, indicesThatCannotBeCreated).run();
    }

BulkOperation 继承自 AbstractRunnableAbstractRunnable 定义了执行的基本流程,子类需要实现 doRun() 方法,因此,只需要关注 BulkOperation#doRun() 方法。

路由计算

BulkOperation#doRun()

protected void doRun() {
    // 获取路由计算规则
    IndexRouting indexRouting = concreteIndices.routing(concreteIndex);
}

IndexRouting#fromIndexMetadata(...)

public static IndexRouting fromIndexMetadata(IndexMetadata indexMetadata) {
    // 索引配置上是否设置 routing_path
    if (false == indexMetadata.getRoutingPaths().isEmpty()) {
            if (indexMetadata.isRoutingPartitionedIndex()) {
                throw new IllegalArgumentException("routing_partition_size is incompatible with routing_path");
            }
            return new ExtractFromSource(
                indexMetadata.getRoutingNumShards(),
                indexMetadata.getRoutingFactor(),
                indexMetadata.getIndex().getName(),
                indexMetadata.getRoutingPaths()
            );
        }
    // 索引配置上是否设置了分区索引相关参数
    if (indexMetadata.isRoutingPartitionedIndex()) {
        return new Partitioned(
            indexMetadata.getRoutingNumShards(),
            indexMetadata.getRoutingFactor(),
            indexMetadata.getRoutingPartitionSize()
        );
    }
    // 正常写入
    return new Unpartitioned(indexMetadata.getRoutingNumShards(), indexMetadata.getRoutingFactor());
}

上诉 3 个路由算法,底层算法都是类似的,都是基于一致性 hash 计算对应的路由。
hash 计算函数 Murmur3HashFunction#hash(String routing)

我们可以简单将路由算法理解为如下:

  1. 先计算 hash
  2. 再根据 hash 计算路由

计算 hash 可以又分为以下几种情况:

  1. 如果索引配置了 routing_path,则 hash = Murmur3HashFunction#hash(routing_path_value)
  2. 如果路径上有路由参数,则 hash = Murmur3HashFunction#hash(routing)
  3. 否则 hash = Murmur3HashFunction#hash(_id)

根据 hash 计算路由的规则如下:
IndexRouting#hashToShardId(...)

protected final int hashToShardId(int hash) {
    return Math.floorMod(hash, routingNumShards) / routingFactor;
}
  • routingNumShards
    值默认依赖主分片数(number_of_shards),如果创建索引时未指定,默认按因子2拆分,并且最多可拆分为1024个分片。例如原索引主分片数为1,则可拆分为1~1024中的任意数;原索引主分片为5,则支持拆分的分片数为:10、20、40、80、160、320以及最大数640(不能超过1024)。 可通过索引的 index.number_of_routing_shards 配置,但不建议配置。
  • routingFactor
    默认为 routingNumShards / number_of_shards

简单说,你可以将 number_of_routing_shards 理解为虚拟的分片数、 number_of_shards 则为物理的分片数。其本质就是 一致性 hash。

分发请求至主分片

TransportReplicationAction#doExecute(...)

@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
    assert request.shardId() != null : "request shardId must be set";
    runReroutePhase(task, request, listener, true);
}

ReroutePhase#doRun()

protected void doRun() {
    if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {       
        // 主分片节点在协调节点上
        performLocalAction(state, primary, node, indexMetadata);
    } else {
        // 主分片节点不在协调节点上
        performRemoteAction(state, primary, node);
    }
}
主分片写入
接收请求

TransportReplicationAction 构造函数,注册了主分片写入的处理函数

protected TransportReplicationAction(
    ...
    ...
) {
    transportService.registerRequestHandler(
        transportPrimaryAction,
        executor,
        forceExecutionOnPrimary,
        true,
        in -> new ConcreteShardRequest<>(requestReader, in),
        this::handlePrimaryRequest
    );
}
主分片写入

TransportShardBulkAction#dispatchedShardOperationOnPrimary(...)

@Override
protected void dispatchedShardOperationOnPrimary(
    BulkShardRequest request,
    IndexShard primary,
    ActionListener<PrimaryResult<BulkShardRequest, BulkShardResponse>> listener
) {
    ...
    ...
    // 在主分片上执行
    performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, (update, shardId, mappingListener) -> {
        ...
    }), listener, threadPool, executor(primary));
}
异步转发请求至副本分片

转发请求至副本分片,是在主分片写入数据后,才执行的

ReplicationOperation#execute(...)

public void execute() throws Exception {
    ...
    ...
    // 执行主分片写入
    primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, this::finishAsFailed));
}

handlePrimaryResult() 方法是写入主分片后的回调函数
ReplicationOperation#handlePrimaryResult(..)

private void handlePrimaryResult(final PrimaryResultT primaryResult) {
    ...
    // 异步发送同步副本分片请求
    performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup, pendingReplicationActions);
    ...
}
副本分片写入
接收请求

类似的,TransportReplicationAction 构造函数,注册了副本分片写入的处理函数

transportService.registerRequestHandler(
    transportReplicaAction,
    executor,
    true,
    true,
    in -> new ConcreteReplicaRequest<>(replicaRequestReader, in),
    this::handleReplicaRequest
);
数据写入

而后请求交给 AsyncReplicaAction#doRun() 处理

@Override
protected void doRun() throws Exception {
    ...
    ...
    // 获取写入许可后,会回调至 AsyncReplicaAction#onResponse() 
    acquireReplicaOperationPermit(
        replica,
        replicaRequest.getRequest(),
        this,
        replicaRequest.getPrimaryTerm(),
        replicaRequest.getGlobalCheckpoint(),
        replicaRequest.getMaxSeqNoOfUpdatesOrDeletes()
    );
}

AsyncReplicaAction#onResponse()

@Override
public void onResponse(Releasable releasable) {
    ...
    // 执行写入
    shardOperationOnReplica(...);
    ...  
}

调用该函数后,最后代码会走到 TransportShardBulkAction#dispatchedShardOperationOnReplica(...)

@Override
protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
    ActionListener.completeWith(listener, () -> {
        final long startBulkTime = System.nanoTime();
        
        // 执行写入
        final Translog.Location location = performOnReplica(request, replica);
        replica.getBulkOperationListener().afterBulk(request.totalSizeInBytes(), System.nanoTime() - startBulkTime);
        return new WriteReplicaResult<>(request, location, null, replica, logger);
    });
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2272495.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言----指针

目录 1.概念 2.格式 3.指针操作符 4.初始化 1. 将普通变量的地址赋值给指针变量 a. 将数组的首地址赋值给指针变量 b. 将指针变量里面保存的地址赋值给另一个指针变量 5.指针运算 5.1算术运算 5.2 关系运算 指针的大小 总结&#xff1a; 段错误 指针修饰 1. con…

Java高频面试之SE-09

hello啊&#xff0c;各位观众姥爷们&#xff01;&#xff01;&#xff01;本牛马baby今天又来了&#xff01;哈哈哈哈哈嗝&#x1f436; final关键字有什么作用&#xff1f; 在 Java 中&#xff0c;final 关键字有多个用途&#xff0c;它可以用于类、方法和变量。根据使用的上…

ChatGPT 主流模型GPT-4/GPT-4o mini的参数规模是多大?

微软论文又把 OpenAI 的机密泄露了&#xff1f;&#xff1f;在论文中明晃晃写着&#xff1a; o1-preview 约 300B&#xff1b;o1-mini 约 100BGPT-4o 约 200B&#xff1b;GPT-4o-mini 约 8BClaude 3.5 Sonnet 2024-10-22 版本约 175B微软自己的 Phi-3-7B&#xff0c;这个不用约…

某纪检工作委员会视频监控网络综合运维项目

随着某纪检工作委员会信息化建设的不断深入&#xff0c;网络基础设施的数量持续增加&#xff0c;对网络设备的运维管理提出了更为复杂和艰巨的要求。为了确保这些关键信息基础设施能够安全稳定地运行&#xff0c;该纪检工作委员会决定引入智能化运维管理系统&#xff0c;以科技…

显示器太薄怎么用屏幕挂灯?使用前先了解屏幕挂灯的最佳角度

人们对用眼健康的重视以及数字化办公和娱乐的普及&#xff0c;屏幕挂灯作为一种能够有效减少屏幕反光、保护眼睛的照明设备&#xff0c;受到了越来越多消费者的青睐。随着科技的进步&#xff0c;显示器设计日益轻薄&#xff0c;为我们的桌面节省了空间并带来了美观的视觉效果。…

HTTP/HTTPS ②-Cookie || Session || HTTP报头

这里是Themberfue 上篇文章介绍了HTTP报头的首行信息 本篇我们将更进一步讲解HTTP报头键值对的含义~~~ ❤️❤️❤️❤️ 报头Header ✨再上一篇的学习中&#xff0c;我们了解了HTTP的报头主要是通过键值对的结构存储和表达信息的&#xff1b;我们已经了解了首行的HTTP方法和UR…

excel快速计算周数的方法

业务中经常要通过周汇总计算&#xff0c;为方便后续汇总在源数据引入“周”列 公式&#xff1a; "W"&IF((ROW()1)/7<1,1,ROUNDUP((ROW()1)/7,0))函数释义&#xff1a; ①一周有7天&#xff0c;如果1月1号刚好是从周一开始&#xff0c;那么计算周数可以简单得…

redis各种数据类型介绍

Redis 是一种高性能的键值存储数据库&#xff0c;它支持多种数据类型&#xff0c;使得开发者可以灵活地存储和操作数据。以下是 Redis 支持的主要数据类型及其介绍&#xff1a; 1. 字符串&#xff08;String&#xff09; 字符串是 Redis 中最基本的数据类型&#xff0c;它可以存…

Python 模块,包(详解)

一. 引用变量 引用变量&#xff1a;值的传递通常可以分为两种方式&#xff0c;一种是值的传递&#xff0c;一种是引用地址传递&#xff0c;在Python中一般都是用引用地址传递 变量名和对象&#xff1a;变量名&#xff08;如 a&#xff09;和它指向的对象&#xff08;如整数 5&a…

RabbitMQ发布确认高级篇(RabbitMQ Release Confirmation Advanced Edition)

系统学习消息队列——RabbitMQ的发布确认高级篇 简介 ‌RabbitMQ是一个开源的消息代理软件&#xff0c;实现了‌高级消息队列协议&#xff08;AMQP&#xff09;‌&#xff0c;主要用于在分布式系统中进行消息传递。RabbitMQ由‌‌Erlang语言编写&#xff0c;具有高性能、健壮…

封装/前线修饰符/Idea项目结构/package/impore

目录 1. 封装的情景引入 2. 封装的体现 3. 权限修饰符 4. Idea 项目结构 5. package 关键字 6. import 关键字 7. 练习 程序设计&#xff1a;高内聚&#xff0c;低耦合&#xff1b; 高内聚&#xff1a;将类的内部操作“隐藏”起来&#xff0c;不需要外界干涉&#xff1b…

【代码随想录】刷题记录(89)-分发糖果

题目描述&#xff1a; n 个孩子站成一排。给你一个整数数组 ratings 表示每个孩子的评分。 你需要按照以下要求&#xff0c;给这些孩子分发糖果&#xff1a; 每个孩子至少分配到 1 个糖果。相邻两个孩子评分更高的孩子会获得更多的糖果。 请你给每个孩子分发糖果&#xff0…

Ae:合成设置 - 3D 渲染器

Ae菜单&#xff1a;合成/合成设置 Composition/Composition Settings 快捷键&#xff1a;Ctrl K After Effects “合成设置”对话框中的3D 渲染器 3D Renderer选项卡用于选择和配置合成的 3D 渲染器类型&#xff0c;所选渲染器决定了合成中的 3D 图层可以使用的功能&#xff0…

掌握RabbitMQ:全面知识点汇总与实践指南

前言 RabbitMQ 是基于 AMQP 高级消息队列协议的消息队列技术。 特点&#xff1a;它通过发布/订阅模型&#xff0c;实现了服务间的高度解耦。因为消费者不需要确保提供者的存在。 作用&#xff1a;服务间异步通信&#xff1b;顺序消费&#xff1b;定时任务&#xff1b;请求削…

react构建项目报错 `npm install --no-audit --save @testing-l

这应该是我们想构建 react18 的项目&#xff0c;但是 通过 npx create-react-app my-app进行构建时&#xff0c;给我们安装的依赖是 react 19 下面提供一下我的解决方法&#xff1a; 第一步&#xff1a;在 package.json 中把依赖 react19 改为 react 18 第二步&#xff1a;添…

App窗口创建流程(Android12 )

有关的窗口对象 PhoneWindowActivityThread#performLaunchActivity {Activity.attach}Surface new ViewRootImpl 创建null对象mSurface.transferFrom(getOrCreateBLASTSurface())//填充内容创建native层的SurfaceLayerSurfaceFlinger::createLayerRenderSurfaceSurfaceFlinger…

LLM之Agent(十三)| 使用 PydanticAI 框架构建多代理LLM 系统(保姆教程)

Pydantic 是 Python 生态系统中的强大平台,每月下载量超过 2.85 亿次。现在,Pydantic的创始人也正在通过 Pydantic AI 涉足 AI 的前沿领域,Pydantic AI 是一个专为构建由生成式 AI 提供支持的生产级应用程序的框架。在本文中,我们将深入探讨 Pydantic AI 的独特之处、它的主…

常用的数据结构API概览

List ArrayList 1、在初始化一个ArrayList的时候&#xff0c;如果我想同时set一些值 比如存放int[ ] List<int[]> list new ArrayList(Arrays.asList(new int[]{intervals[0][0],intervals[0][1]}));//或者int[] temp new int[]{intervals[0][0],intervals[0][1]}…

年会游戏大全 完整版见考试宝

企业年会游戏大全&#xff08;35个&#xff09; 1.泡泡糖 游戏准备&#xff1a;主持人召集若干人上台&#xff0c;人数最好是奇数。 游戏规则&#xff1a;当大家准备好时&#xff0c;主持人喊“泡泡糖”大家要回应“粘什么”&#xff0c;主持人随机想到身体的某个部位&#x…

用豆包去除文章Ai味和重复率,实操教程

用AI生成的文章总是有“AI味”或者重复率高的问题&#xff1f; 今天就教你如何使用豆包轻松去除这些问题 让你的文章更自然、更具个人风格&#xff01;✍️✨ 详细版指令教程都整理了&#xff0c;纯粹F享啦~