Presto(Trino)分布式(物理)执行计划的生成和调度

news2024/11/13 13:29:01

文章目录

  • 1.前言
  • 2.物理执行生成(Stage)的生成
    • 2.1不同的调度分区策略
      • 2.1.1 Connector自己提供的分区策略
      • 2.1.2 Presto提供的Partition策略(SystemPartitioningHandle):
    • 2.2 为Stage创建StageScheduler
      • 2.2.1 普通的非bucket表的TableScan Stage
        • Split 放置策略解析
      • 2.2.2 有splitSource但不是SOURCE_DISTRIBUTION的Stage的调度
        • RemoteSourceNode的exchange类型为replicate 或者 没有RemoteSourceNode但是直接读bucket表
        • RemoteSourceNode的exchange类型不是replicate
      • 2.2.3 其它没有splitSource的Stage
    • 3.1 FixedCountScheduler
    • 3.2 SourcePartitionedScheduler
    • 3.3 FixedSourcePartitionedScheduler
  • 总结

1.前言

在我的上一篇文章《Presto(Trino)的逻辑执行计划和Fragment生成过程》介绍了Presto从Query提交到生成逻辑执行计划、逻辑执行计划的分段全过程。本文书接上文,开始讲解物理执行计划的生成,以及基于物理执行计划的task 和 split 的调度过程。
逻辑执行计划的整个生成和优化是基于用户的SQL和定义好的规则以及外围数据的基本元信息来进行,但是,与逻辑执行计划的生成和优化不同,物理执行计划的生成和调度需要依赖集群和外围数据本身的特性, 比如, 集群当前的动态运行状态,表(本文只讨论Hive)特征(是否是bucket表?是partition还是flat 表),甚至Presto worker和HDFS之间的位置关系来确定。

NOTE:

  • 在本文中经常会出现上游下游,很容易引起部分读者的相反的理解,在这里明确,本文中,下游的输入会依赖上游的输出,因此典型的下游是执行计划树的root节点,典型的上游是直接读表的TableScanNode
  • 在Presto物理执行计划的生成和调度这一层,很多代码涉及到了写数据的调度,以及grouped execution这个特性, 本文均不讨论。

2.物理执行生成(Stage)的生成

----------------------------------------------------------------
// 一个Hive的partition表(无bucket),作为join的左侧表(probe table)
CREATE TABLE `default.zipcodes_orc_partitioned`(
  `recordnumber` int,
  `country` string,
  `city` string,
  `zipcode` int)
PARTITIONED BY (
  `state` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
----------------------------------------------------------------
// 一个Hive的partition且做了bucket表,作为join的右侧表(build table)
CREATE TABLE `zipcodes_orc_partitioned_bucket`(
  `recordnumber` int,
  `country` string,
  `city` string,
  `zipcode` int)
PARTITIONED BY (
  `state` string)
CLUSTERED BY (
  zipcode)
SORTED BY (
  city ASC)
INTO 4 BUCKETS
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
----------------------------------------------------------------
SELECT *
	FROM hive.default.zipcodes_orc_partitioned_bucket t1
	LEFT JOIN hive.default.zipcodes_orc_partitioned t2 
	ON t1.country = t2.country;
----------------------------------------------------------------

对于以上query,生成的分段逻辑执行计划如下图所示:
在这里插入图片描述

SqlQueryExecution.start()中可以看到1) 构建逻辑执行计划以后 2) 开始构建物理执行计划 3) 对物理执行计划进行调度的代码逻辑:

    // 构建逻辑执行计划,并对计划进行Fragment处理
    PlanRoot plan = planQuery();
     // 构建物理执行计划
    planDistribution(plan);
    .......
    // if query is not finished, start the scheduler, otherwise cancel it
    SqlQueryScheduler scheduler = queryScheduler.get();

    if (!stateMachine.isDone()) {
        scheduler.start(); // 开始调度物理执行计划
   }

物理执行计划的生成自顶向下进行,即递归遍历刚刚生成的fragment计划,自顶向下生成整个stage执行树。
物理执行计划树的生成和调度都是由SqlQueryScheduler来完成的。
Presto中,每一个Stage是一个SqlStageExecution的对象。Presto会首先构造一个SqlQueryScheduler对象,在构造函数中构建了rootStage, 然后递归调用createStages()方法生成并构造完成stage 树。
请添加图片描述

在构造每一个stage的时候,会做下面几件事情:
根据当前不同的PartitionHandle 构建

  • 不同的split placement policy以确定split的调度方式
  • 不同的stageScheduler以确定stage内部task的调度方式

2.1不同的调度分区策略

注意,这里的分区并不是指Hive表的分区,而是Presto中的分区概念,指Presto中每一个Fragment中的 task 和 split的分布策略,比如,

  • 我们下文会详细分析,读原始非bucket hive 表的时候,这个策略叫做SOURCE_DISTRIBUTION,
  • 而读bucket表的时候,一个bucket中的数据不会被打散,而是调度到一起,这时候的partition策略则是根据bucket的数量进行,
  • 最后的计算结果会集中到最终的OutputNode, 这个fragment的partition策略就叫做SINGLE

不同调度策略决定于逻辑执行计划分段(PlanFragment)阶段,到了创建Stage的阶段,就需要根据不同的partition 策略,来确定具体的调度细节。
所有的根据partition策略来创建stage的过程,都体现在SqlQueryScheduler.createStages()中。

所以,每一个Presto 的 Fragment都有一个对应的partition策略,信息都封装在PartitioningHandle.java中:

public class PartitioningHandle
{
    private final Optional<CatalogName> connectorId;
    private final Optional<ConnectorTransactionHandle> transactionHandle;
    private final ConnectorPartitioningHandle connectorHandle; // 比如对应 Bucket Hive表的ScanNode,partitioninHandle是HivePartitioningHandle

不同的PartitioningHandle的区别主要体现在connectorHandle上。我们可以把connectorHandle 分成两类,一种是Connector自己提供的分区策略,一种是Presto内置的处理各种分区方式的分区策略。具体介绍如下:

2.1.1 Connector自己提供的分区策略

如果Connector 自己提供了对应的PartitionHandle,那么就用connector自己提供的ConnectorPartitioningHandle来构造PartitioningHandle。这里最典型的例子是,Hive的bucket表. 在构建Fragment阶段,如果遇到了TableScanNode, Presto会尝试使用Hive Connector自己的ConnectorPartitioningHandle实现, 查看代码HiveMetadata.getTableProperties

 if (isBucketExecutionEnabled(session) && hiveTable.getBucketHandle().isPresent()) {
    tablePartitioning = hiveTable.getBucketHandle().map(bucketing -> new ConnectorTablePartitioning(
            new HivePartitioningHandle(
                    bucketing.getBucketingVersion(),
                    bucketing.getReadBucketCount(),
                    bucketing.getColumns().stream()
                            .map(HiveColumnHandle::getHiveType)
                            .collect(toImmutableList()),
                    OptionalInt.empty()),
            bucketing.getColumns().stream()
                    .map(ColumnHandle.class::cast)
                    .collect(toImmutableList())));

可以看到,hive层面的处理策略是,如果是bucket表,那么会提供一个叫做HivePartitioningHandleConnectorPartitioningHandle实现,这个实现的最大功能是提供了bucket number的数量信息, 后期,Presto将根据bucket 的数量来确定调度的分区,即NodePartitionMap,本文后面会具体讲解。

2.1.2 Presto提供的Partition策略(SystemPartitioningHandle):

这发生在我们的Connector没有提供对应的PartitioningHandle的情况下,Presto就在生成Fragment的时候为这个Fragment构造合理的ConnectorPartitioningHandle并组装为PartitioningHandle。这种Partition策略的ConnectorPartitioningHandle实现类是都是SystemPartitioningHandle

    public SystemPartitioningHandle(
           @JsonProperty("partitioning") SystemPartitioning partitioning,
           @JsonProperty("function") SystemPartitionFunction function)
	   {
	       this.partitioning = requireNonNull(partitioning, "partitioning is null");
	       this.function = requireNonNull(function, "function is null");
	   }
	   private enum SystemPartitioning
	    {
	        SINGLE,
	        FIXED,
	        SOURCE,
	        SCALED,
	        COORDINATOR_ONLY,
	        ARBITRARY
	    }

可以看到,SystemPartitioningHandle可以封装不同的Partition的策略,我们从改名字能够大致知道这些Partition策略所代表的partition的方式。

比如:对于非Partition的Hive 表,由于Hive没有提供ConnectorPartitioningHandle实现,因此, Presto使用Partition策略为SystemPartitioning.SOURCE作为PartitioningHandle的策略,封装为一个叫做SOURCE_DISTRIBUTIONPartitioningHandle的对象:

public static final PartitioningHandle SOURCE_DISTRIBUTION = createSystemPartitioning(SystemPartitioning.SOURCE, SystemPartitionFunction.UNKNOWN);  

下面的代码显示,在逻辑执行计划生成阶段,会将非bucket表对应的Fragment的Partitioning设置为SOURCE_DISTRIBUTION

   @Override
   public PlanNode visitTableScan(TableScanNode node, RewriteContext<FragmentProperties> context)
   {
       PartitioningHandle partitioning = metadata.getTableProperties(session, node.getTable())
               .getTablePartitioning()
               .map(TablePartitioning::getPartitioningHandle)
               .orElse(SOURCE_DISTRIBUTION);

2.2 为Stage创建StageScheduler

上面简单介绍了Presto中partition的策略,基于对partition策略的理解,我们开始来看Presto是怎么根据各个Stage的partition的特性,来为各个Stage准备好对应的StageScheduler实现,进而使用这些Scheduler实现来对Stage进行调度的。
下图是创建StageScheduler的基本流程。
请添加图片描述

我们说Stage的调度,指的是对Stage中的Split和Task进行节点分配, 即让对应的Task在合适的节点上运行起来,同时,让这些Task知道从哪些上游(Children)的task中拉取数据,也知道有哪些下游(Parent)的task会从自己这里拉取数据因此自己需要把这些数据准备好。

Presto在这一层的代码实现并不是特别容易理解,Presto大致是这样考虑的:

  1. 对于那些直接读取Hive数据的Stage(有splitSource,并且是非partition表)
    • 如果不是bucket表, 那么,我们会把读取这些split的task均匀分布到整个集群的所有节点,让每个节点均匀地负责一部分splits
    • 如果bucket表,那么会为每一个bucket独立分配一个task去处理数据
  2. 如果是既直接读取Hive数据(有splitSource,并且是bucket表),同时又通过RemoteSourceNode指向了子Stage,那么这时候需要根据子Stage的exchange类型来确定调度形式
    • 如果子Stage的ExchangeTypereplicate,意味着上游(Children)Stage产生的数据需要无差别地复制到下游(Parent)Stage中去,不存在进行分区的问题
    • 如果子Stage的ExchangeType不是replicate(repartition或者gather),意味着上游(Children)Stage产生的数据需要根据对应的分区策略,只交付给对应的分区
  3. 没有splitSource的stage,这时候需要根据具体的分区情况来构建对应的Scheduler, 但是这时候由于没有直接的TableScanNode,因此全部都是RemoteSplit,不存在Split的调度,只存在Task的调度,因此很简单。

2.2.1 普通的非bucket表的TableScan Stage

这种Stage是指这个 Stage中有TableScanNode, 并且这个TableScanNode对应的表不是bucket表,比如下面两种情况:

  1. 直接读取某一个非bucket 表(然后进行聚合或者不聚合都可以),比如: SELECT * FROM unbucketed_table
  2. 一个Join操作中,任何对应于读取非bucket表的一侧的Stage(或者两侧表都是非bucket 表),那么这种Stage的partitioning就会是SOURCE_DISTRIBUTION的,比如: SELECT * FROM unbucketed_table ut left join bucketed_table bt on ut.row = bt.row 中的左侧读表Stage或者 SELECT * FROM bucketed_table bt left join unbucketed_table ut on bt.row = ut.row 的右侧读表Stage

综上,如果这个Stage含有直接读非bucket表的TableScanNode, 那么这个Stage的partitioning就会在planFragment 阶段被设置为SOURCE_DISTRIBUTION,这意味着对于这个Stage 的split和task的调度采用一种类似于均匀的调度策略DynamicSplitPlacementPolicy,先把split均匀的分配(注意,不是调度)到整个集群的所有节点,让各个节点尽可能有均匀地workload,然后,把task调度到这些节点上去,task调度上去以后,就可以开始调度对应的split了。

上面描述的调度逻辑,发生在下面的代码中(SqlQueryScheduler.java)

        if (partitioningHandle.equals(SOURCE_DISTRIBUTION)) {
            // 在这种情况下,节点是通过DynamicSplitPlacementPolicy动态选择的
            // nodes are selected dynamically based on the constraints of the splits and the system load
            Entry<PlanNodeId, SplitSource> entry = Iterables.getOnlyElement(plan.getSplitSources().entrySet());
            PlanNodeId planNodeId = entry.getKey();
            SplitSource splitSource = entry.getValue();
            Optional<CatalogName> catalogName = Optional.of(splitSource.getCatalogName())
                    .filter(catalog -> !isInternalSystemConnector(catalog));
            NodeSelector nodeSelector = nodeScheduler.createNodeSelector(catalogName); // UniformNodeSelector
            // 使用DynamicPlacementPolicy作为底层Scan Data的策略。即,只要节点上还有空闲,就可以使用这个节点,不存在node map
            SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stage::getAllTasks);

            checkArgument(!plan.getFragment().getStageExecutionDescriptor().isStageGroupedExecution());
            // 把SourcePartitionedScheduler作为StageScheduler而不是SourceScheduler
            stageSchedulers.put(stageId, newSourcePartitionedSchedulerAsStageScheduler(stage, planNodeId, splitSource, placementPolicy, splitBatchSize));
            bucketToPartition = Optional.of(new int[1]);
        }

对于这种表的调度,在逻辑执行计划生成阶段,Presto已经从表的元数据信息中获取到该表不是bucket表,因此设置了partitionHandle为SOURCE_DISTRIBUTION, 对于这种stage, Presto的调度逻辑可以描述为:

  • 根据当前的SplitPlacementPolicy和split本身的特性(split是否允许远程访问,split是否携带了地址),来确定split的分配表, 即splitAssignment
  • 确定了split的分配表以后,开始逐个节点的进行调度和分配(因为一个节点上可能分配了多个split,因此逐个节点进行分配和调度明显优于逐个split进行调度)
    • 如果该节点当前还没有创建task,那么就在该节点上创建task并把task在这个节点调度起来
    • 如果task创建好了, 那么只需要把这批分配到该节点的split给attach给这个task,让这个task开始处理这些split
      请添加图片描述

Split 放置策略解析

对于Split的放置策略,默认使用DynamicSplitPlacementPolicy,该DynamicSplitPlacementPolicy绑定的是UniformNodeSelector, 其节点选择逻辑为:

  • 对于某一轮调度的一批splits, 先把所有的可以远程访问并且没有地址信息的split先进行worker节点分配, 即
    • 该split是远程可访问并且split中没有地址信息, 那么为该split在当前所有的节点中选择已调度split数量最小的并且总的split数量没有超过最大允许运行的单节点的split数量的节点,从而实现worker节点所负责的split的数量的基本均衡
    • 对于剩下的还没有分配到节点的split(这时候没有分配成功,可能是因为split不允许远程访问,也有可能是所有节点上运行的split已经超过了最大允许运行的split数量),这样做
      • 如果该节点不是远程可访问,那么将这个split分配到这个split的address节点中去。很明显,这个split的address很可能根本不是presto的节点,那么这时候就分配失败。否则就分配成功
      • 如果该节点可以远程访问,则尝试为该节点重新随机选择节点(不再考虑节点的worker上运行的split是否已经超过限制)

2.2.2 有splitSource但不是SOURCE_DISTRIBUTION的Stage的调度

这种Stage有splitSource,就意味着这个Stage有读表的TableScanNode,但是它的partitioning并不是SOURCE_DISTRIBUTION, 这发生在对于bucket 表进行读取的Stage (在2.2.1的Stage也有splitSource ,因为也读表了)。
这种Stage有splitSource,在Presto中意味着这个Stage是有读表的TableScanNode的,因此,上一章节中的Stage也是有splitSource的,但是仅限于**非bucket **表。但是,这种有TableScanNode的Stage可能还有其它类型,比如:

  1. 直接读取某一个bucket 表(然后进行聚合或者不聚合都可以),比如: SELECT * FROM bucketed_table
  2. 一个Join操作中,任何对应于读取bucket表的一侧的Stage(或者两侧表都是bucket 表),那么这种Stage就是有splitSource(因为读表了)却不是SOURCE_DISTRIBUTION的,比如: SELECT * FROM unbucketed_table ut left join bucketed_table bt on ut.row = bt.row 中的右侧读表Stage或者 SELECT * FROM bucketed_table bt left join unbucketed_table ut on bt.row = ut.row 的左侧读表Stage

这种Stage有splitSource,我们需要根据RemoteSourceNode的partition 类型来分别计算从split到node之间的对应关系,然后进行split和task的调度

RemoteSourceNode的exchange类型为replicate 或者 没有RemoteSourceNode但是直接读bucket表

这种情况有两个前提,

  • 一个是这个Stage有splitSource,即这个Stage有直接读表(TableScanNode)的操作;另一个前提是整个Stage有RemoteSourceNode(即有子Stage)并且子Stage的exchange类型为replicate, 即当前StageRemoteSourceNode所指向的远程的Stage的exchange类型为replicate,而不是指当前Stage的exchange 类型为replicate

或者

  • 或者,这个Stage没有RemoteSourceNode(即没有上游(Children)Stage),只有读表TableScanNode,并且这个表是bucket 表
    Presto把这两种情况放在一个case中去处理,因为它们都不要求上游(Children)Stage有分区的概念,都有bucket表

这里的replicate, 和 spark中的BROADCAST是一个概念,发生在比如大表join小表,那么把小表的数据整个广播到所有join的task上面去构建一个哈希表。
这是ExchangeType的定义,除了REPLICATE,还有REPARTITIONGATHER。Presto将REPLICATE单独处理,而将REPARTITIONGATHER放在一起处理,因为REPLICATE是没有PARTITION的概念的,而GATHER是一种特殊情况下的REPARTITION,即下游Stage只有一个task的REPARTITION

 public enum Type
 {
     GATHER, // 只是一种特殊情况下的repartition,即parent stage只有一个partition的REPARTITION
     REPARTITION,
     REPLICATE
 }

下图分别释义了三种ExchangeType 的数据交换:
REPARTITION的exchange,上游(Children)的task会为下游(Parent)的每一个partition准备一个OutputBuffer, 根据比如hash算法,将对应的数据写入指定的partition 的OutputBuffer中,而下游(Parent)的task只会找上游(Children)的task拉取自己的partition的数据。
请添加图片描述

GATHER exchange其实是一种特殊的Exchange类型,即下游(Parent)只有一个task,因此上游(Children)每个Task的OutputBuffer只有一个。
请添加图片描述
REPLICATE 的exchange类型意味着上游(Children)的每个task的所有Output都放到一个Buffer中,下游(Parent)的每一个task会轮训上游(Children) Stage的所有task获取对应的这个task输出的全量数据。
请添加图片描述

可以想见,如果自己依赖的下游的Stage的数据是会全量复制(replicate),那么我当前Stage的task的分布节点的调度可以只需要考虑到节点的数量。由于当前Stage的TableScanNode对应的表是bucket表,那么我完全可以为每一个bucket刚好调度一个task,即task与bucket一一对应,那么,上游的小表就为每一个task复制一份过去就行了。这其实就是这种Stage的调度思想。
这一部分调度处理逻辑发生在SqlStageScheduler.java中:

                // 如果所有的节点的exchange type是REPLICATE,或者,它没有remoteSourceNodes,即只有读表的TableScanNode,显然,这张表是bucket表,因为在前面已经处理了非bucket表的读取了 
                // 检查RemoteSourceNode的exchangeType是否都是replicate. 注意,一个Stage如果只读表,是没有RemoteSourceNode的
                if (plan.getFragment().getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)) {
                    // no remote source
                    boolean dynamicLifespanSchedule = plan.getFragment().getStageExecutionDescriptor().isDynamicLifespanSchedule();
                    bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, dynamicLifespanSchedule); // 获取了从split -> bucket -> node的映射关系
                    // stageNodeList是所有的node // 需要调度到多少个节点上。stageNodeList的数量就是partition的数量
                    stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(catalogName).allNodes()); //stageNodeList代表这个stage的所有节点
                    Collections.shuffle(stageNodeList);
                    //  用来创建对应的 StageExecutionPlan
                    bucketToPartition = Optional.empty(); // 这时候bucketToPartition是空的,即 没有什么partition的概念
                }

从上面的代码可以看到,其实是构建了一个BucketNodeMap, 即构建完成了从split -> bucket -> node的映射关系,有了这一层映射关系,对于任何一个从connector层面读出来的split,我们都知道应该把这个split调度到哪个node上去。
那么,这个BucketNodeMap的映射关系是怎样构建的,构建原理如下图所示:
请添加图片描述

从代码可以看到,NodePartitionManager负责生成对应的BucketNodeMap, 看方法nodePartitioningManager.getBucketNodeMap(),可以看到,它还是委托接口ConnectorNodePartitioningProvider来提供对应的bucket信息,对于Hive, 接口ConnectorNodePartitioningProvider的实现类是HiveNodePartitioningProviderHiveNodePartitioningProvider提供的bucket的相关信息,构建split -> bucket的映射关系,然后根据当前集群的节点状况,构建split -> bucket的映射关系,最终完成split -> bucket的映射。
我们看接口ConnectorNodePartitioningProvider的代码,能够看到ConnectorNodePartitioningProvider的具体意图:

public interface ConnectorNodePartitioningProvider
{
   /**
     * 在Connector层面提供bucket到node的映射关系
     * @param transactionHandle
     * @param session
     * @param partitioningHandle
     * @return
     */
    ConnectorBucketNodeMap getBucketNodeMap(ConnectorTransactionHandle transactionHandle, 
                                            ConnectorSession session, 
                                            ConnectorPartitioningHandle partitioningHandle);

    /**
     * 在Connector层面提供split到bucket的映射关系
     * @param transactionHandle
     * @param session
     * @param partitioningHandle
     * @return
     */
    ToIntFunction<ConnectorSplit> getSplitBucketFunction(ConnectorTransactionHandle transactionHandle, 
                                                         ConnectorSession session,
                                                         ConnectorPartitioningHandle partitioningHandle);

    /**
     * 提供BucketFunction, 即给定任意一个Page和对应的position,返回这条数据对应的bucket
     * @param transactionHandle
     * @param session
     * @param partitioningHandle
     * @param partitionChannelTypes
     * @param bucketCount
     * @return
     */
    BucketFunction getBucketFunction(
            ConnectorTransactionHandle transactionHandle,
            ConnectorSession session,
            ConnectorPartitioningHandle partitioningHandle,
            List<Type> partitionChannelTypes,
            int bucketCount);
}
  1. 构建split -> bucket的映射关系
    从代码HiveNodePartitioningProvider.getSplitBucketFunction()可以看到,逻辑很简单,就是提取split中的bucket编号即可:
    @Override
    public ToIntFunction<ConnectorSplit> getSplitBucketFunction(
            ConnectorTransactionHandle transactionHandle,
            ConnectorSession session,
            ConnectorPartitioningHandle partitioningHandle)
    {
        return value -> ((HiveSplit) value).getBucketNumber() // 这个function是用来把ConnectionSplit转成int,转换的function就是提取这个split中的bucketNumber,由此可见,一个split是不可能跨多个split的
                .orElseThrow(() -> new IllegalArgumentException("Bucket number not set in split"));
    }
    
  2. 构建bucket -> node的映射关系
    从下面的代码可以看到,bucket -> node的映射关系就是根据bucket到数量随机选择了对应数量的节点,bucket和node一一对应,就完成了bucket到node对应关系
     return new FixedBucketNodeMap( // 通过每次随机选择节点,创建构建bucket到node的对应关系
             getSplitToBucket(session, partitioningHandle), // 每一个split都对应了一个bucket
             createArbitraryBucketToNode(
                     new ArrayList<>(nodeScheduler.createNodeSelector(catalogName).allNodes()),
                     connectorBucketNodeMap.getBucketCount()));
    

split -> bucketbucket -> node的对应关系对应好了,那么就封装成 FixedBucketNodeMap,从而完成了从split -> node的对应关系。有了这一层对应关系,StageScheduler就知道应该把每一个split调度到哪个node,当然,对应的node会创建好对应的task。这种有splitSource但是不是SOURCE_DISTRIBUTION的Stage,Presto用SourcePartitionedScheduler作为splitSource scheduler, FixedSourcePartitionedScheduler作为StageScheduler,构建好Scheduler, 就可以开始调度。

RemoteSourceNode的exchange类型不是replicate

从上文中的Exchange.Type定义可以看出来,除了replicate,就是repartition和gather, Presto把它们作为同一种case进行处理,因为gather是下游stage只有一个partition的特殊情况下的repartition exchange。
这种情况的前提是需要有splitSource,即这个Stage有直接读表的操作,有TableScanNode操作。
RemoteSourceNode的exchange类型为repartition或者gather, 这意味着当前Stage的RemoteSourceNode所指向的远程的Stage的exchange类型为repartition或者gather, 而不是上面的replicate.
这种情况下,在Presto层面,会生成一个NodePartitionMap信息,这个信息的结构是这样的:

public class NodePartitionMap
{
    // 根据partition获取对应的node
    private final List<InternalNode> partitionToNode;
    // 根据bucket index获取partition
    private final int[] bucketToPartition;
    // 根据split获取bucket的index
    private final ToIntFunction<Split> splitToBucket;

这个类其实维护了split -> bucket -> partition -> node的对应关系。在这里,其实partition与task是一一对应的。维护这样一个关系的意义是:
对于一个Stage调度了一些task出去并且获得了调度结果,我们需要把这些task所属的partition更新到上游的Stage(这个stage所依赖的stage)的所有的task中去,保证上游的Stage的每个task都为上游的每一个partition准备好对应的output buffer,并将生成的需要传输给下游的数据准备好并放置在正确的对应的partition的OutputBuffer中,这样,下游的task通过拉取的方式来获取数据的时候,就可以拉取到正确的属于自己的partition的数据。
那么,NodePartitionMap是怎样构建出来的呢?

跟上一节讲到的BucketNodeMap的映射关系构建一样,这个NodePartitionMap的构建也是NodePartitioningManager来进行的,并且内部也是通过hive的ConnectorNodePartitioningProvider实现类HiveNodePartitioningProvider来提供bucket到信息,进而完成split -> bucket -> partition -> node的对应关系的构建的。
整个步骤如下图所示:


    public NodePartitionMap getNodePartitioningMap(Session session, PartitioningHandle partitioningHandle)
    {
        // 这个PartitionHandle的connectorHandle为SystemPartitioningHandle, 说明是非源头fragment, 依赖SystemPartitioningHandle,
        if (partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle) {
            return ((SystemPartitioningHandle) partitioningHandle.getConnectorHandle()).getNodePartitionMap(session, nodeScheduler);
        }

        // 对于源头PartitionHandle,应该是有对应的catalog信息的
        CatalogName catalogName = partitioningHandle.getConnectorId()
                .orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle));
        ConnectorNodePartitioningProvider partitioningProvider = partitioningProviders.get(catalogName);
        
        ConnectorBucketNodeMap connectorBucketNodeMap = getConnectorBucketNodeMap(session, partitioningHandle);

        List<InternalNode> bucketToNode;
        // 已经有了bucket到node的对应关系
        if (connectorBucketNodeMap.hasFixedMapping()) {
            bucketToNode = getFixedMapping(connectorBucketNodeMap);
        }
        else {
            // 还没有bucket到node的对应关系,创建随机对应关系
            bucketToNode = createArbitraryBucketToNode(
                    nodeScheduler.createNodeSelector(Optional.of(catalogName)).allNodes(),
                    connectorBucketNodeMap.getBucketCount());
        }

        int[] bucketToPartition = new int[connectorBucketNodeMap.getBucketCount()];
        BiMap<InternalNode, Integer> nodeToPartition = HashBiMap.create();
        int nextPartitionId = 0;
        for (int bucket = 0; bucket < bucketToNode.size(); bucket++) {
            // 不同的bucket可能属于同一个node,而node是跟partition一一对应的
            // 比如,Presto cluster 有 5个节点,那么会有5个partition,但是hive table有256个bucket, 或者有2个bucket
            InternalNode node = bucketToNode.get(bucket);
            Integer partitionId = nodeToPartition.get(node);
            // 还没有任何的partition分配到这个node, 那么为这个node分配partition
            if (partitionId == null) {
                partitionId = nextPartitionId++;
                nodeToPartition.put(node, partitionId);
            }
            // 这个bucket属于了这个分区
            bucketToPartition[bucket] = partitionId;
        }

        List<InternalNode> partitionToNode = IntStream.range(0, nodeToPartition.size())
                .mapToObj(partitionId -> nodeToPartition.inverse().get(partitionId))
                .collect(toImmutableList());

        return new NodePartitionMap(partitionToNode, bucketToPartition, getSplitToBucket(session, partitioningHandle));
    }

当有了NodePartitionMap以后,就可以构建对应的Scheduler, 这里,Presto用SourcePartitionedScheduler作为source scheduler, 用FixedSourcePartitionedScheduler,用FixedSourcePartitionedScheduler作为StageScheduler,构建好Scheduler, 开始调度。
请添加图片描述

2.2.3 其它没有splitSource的Stage

这种stage没有splitsSource,说明这种Stage不包含TableScanNode, 在Presto中,这种Stage的split叫做RemoteSplit。 我的理解,叫做RemoteSplit的原因是,这种Stage的split都是上游Stage生成的,因此,这种Stage不用调度Split, 只需要调度好Task, 所有的split都需要task从上游Stage去拉取。比如,本文中的Stage 0.
这种Stage的调度使用的Scheduler是FixedCountScheduler,因为task的数量是明确的,它跟上一节** RemoteSourceNode的exchange类型不是replicate** 一样,也是通过NodePartitioningManager.getNodePartitioningMap()构建NodePartitionMap, 只不过这个Stage的ConnectorPartitioningHandle实现不再是HivePartitioningHandle, 而是 SystemPartitioningHandle(因为这个Stage没有TableScanNode, 不跟具体的split source打交道),因此是SystemPartitioningHandle提供对应的NodePartitionMap, 代码如下所示:

    public NodePartitionMap getNodePartitionMap(Session session, NodeScheduler nodeScheduler)
    {
        // UniformNodeSelector
        NodeSelector nodeSelector = nodeScheduler.createNodeSelector(Optional.empty());
        List<InternalNode> nodes;
        // 只支持COORDINATOR_ONLY, SINGLE, FIXED三种
        if (partitioning == SystemPartitioning.COORDINATOR_ONLY) {
            nodes = ImmutableList.of(nodeSelector.selectCurrentNode()); // 当前节点即coordinator节点
        }
        else if (partitioning == SystemPartitioning.SINGLE) {
            nodes = nodeSelector.selectRandomNodes(1); // 比如select count(*), 这时候就任意选择一个节点就行
        }
        else if (partitioning == SystemPartitioning.FIXED) {
            nodes = nodeSelector.selectRandomNodes(getHashPartitionCount(session));
        }
        // 对于system_partitioning, 是不需要split to bucket mapping的
        return new NodePartitionMap(nodes, split -> {
            throw new UnsupportedOperationException("System distribution does not support source splits");
        });
    }

当构建完了NodePartitionMap, 即确定了task的调度位置,那么就可以构建对应的StageScheduler了,这里的实现是FixedCountScheduler

### 2.2.4 为每一个Stage构建好StageScheduler,准备开始调度 到目前为止,无论是对于SourceSplit, 还是没有SourceSplit而全部都是远程Split的Stage,我们都构建完成了NodePartitionMap, 即我们知道了当前系统的分区信息(如果有分区),我们知道了具体的调度位置了,因此可以构建具体的调度器`SqlStageScheduler`了。 在构建完成了整个分配策略以后,就使用`FixedSourcePartitionedScheduler`作为对应的`StageScheduler`,而委托`SourcePartitionedScheduler`作为`SourceScheduler`,准备开始调度了。 # 3. 物理执行计划的调度 物理执行计划生成以后,调度是发生在SqlQueryScheduler.schedule()中的。在这个方法运行前,每一个stage都生成了对应的StageScheduler实现,所有的调度信息已经准备好了。 在SqlQueryScheduler.schedule()中,它会进行多轮的调度,每一轮调度,它都会委托具体的ExecutionPolicy接口的实现,来提供需要调度的Stage以及调度顺序,然后就按照返回的`List`的顺序依次调度。

必须清楚,某个Stage的调度并不是一次性就可以完成的(比如,我们底层的读表操作,这些Split的读取是下层connector通过异步方式提供给上层scheduler的,并不是一次性全部提供的),ExecutionPolicy会维护对应的Stage的调度状态,在每一轮调度开始的时候,只提供还没有完成调度的Stage。

对于ExecutionPolicy, Presto默认使用AllAtOnceExecutionPolicy, 它会把所有的Stage一次性调度出去。另外一种ExecutionPolicy的实现是PhasedExecutionPolicy, 本文不具体讨论。从代码可以看到,AllAtOnceExecutionPolicy会按照从底向上的顺序返回Stage,因此,SqlQueryScheduler.schedule()是按照从地向上的顺序开始调度的,即,往往含有TableScanNode的Stage会先调度出去,顶层返回结果给用户的OutputNode最后调度。但是必须清楚,这里的顺序仅仅是调度的顺序,一个后调度的Stage只是后调度,并不一定后执行,也并不需要等到前面先调度的Stage执行完才能开始执行或者被调度。

每一个 Stage调度完成以后,有可能产生了新调度出去的task,这时候,Presto都需要通过StageLinkage信息来维护和更新上游(children)和下游(parent)的相关信息,这是在StageLinkage.processScheduleResults()中做的,即,StageLinkage的作用非常一目了然,就是根据当前Stage不断更新的调度结果,维护上下游Stage之间的依赖关系, 包括:

  1. 告知下游(parent)的Stage的所有task,我这里有新的task了,从我这里pull数据吧。
    这是通过调用下游(parent)的Stage的SqlStageExecution.addExchangeLocations()方法来实现的。查看代码可以看到,它会遍历下游(Parent) Stage的每一个task,把当前Stage的新的Task的信息封装成RemoteSplit,通知给Parent Stage的每一个Task:
     /**
     *
     * @param fragmentId 当前stage的fragment id
     * @param sourceTasks 为这个stage生成的新的task
     * @param noMoreExchangeLocations
     */
    public synchronized void addExchangeLocations(PlanFragmentId fragmentId, Set<RemoteTask> sourceTasks, boolean noMoreExchangeLocations)
    {
        requireNonNull(fragmentId, "fragmentId is null");
        requireNonNull(sourceTasks, "sourceTasks is null");
        // 获取RemoteSourceNode,这个RemoteSourceNode对应的fragment是当前的SqlStageExecution的子节点, sourceTasks是这个fragmentId对应的分布式执行计划的source task
        RemoteSourceNode remoteSource = exchangeSources.get(fragmentId); // 这个fragment只是这个parent的众多remote source中的一个
        // sourceTasks 是一个MultiMap, 对于一个key为remoteSourceId, value为这个id对应的搜有的sourceTasks
        this.sourceTasks.putAll(remoteSource.getId(), sourceTasks);
    
        // 对于 parent的每一个task,构建parent的task和source task之间的依赖关系
        for (RemoteTask task : getAllTasks()) { // 由于SqlStageExection指代当前的parent,因此getAllTasks()代表当前的parent的所有的task
            ImmutableMultimap.Builder<PlanNodeId, Split> newSplits = ImmutableMultimap.builder();
            for (RemoteTask sourceTask : sourceTasks) {
                URI exchangeLocation = sourceTask.getTaskStatus().getSelf();
                newSplits.put(remoteSource.getId(), createRemoteSplitFor(task.getTaskId(), exchangeLocation));
            }
            task.addSplits(newSplits.build()); // 为这个parent fragment的task设置remote split信息
        }
    
  2. 告知上游(children)的一个或者多个Stage的所有task,我有新的task需要从你那儿pull数据,因此你要准备好对应的OutputBuffer,存放好对应的数据,我会过来取。这是通过调用子Stage的对应的OutputBufferManager.addOutputBuffers()来实现的。
 private static class StageLinkage
 {
     private final PlanFragmentId currentStageFragmentId;
     private final ExchangeLocationsConsumer parent;
     private final Set<OutputBufferManager> childOutputBufferManagers; // 所有的childStage的output buffer
     private final Set<StageId> childStageIds;
        public StageLinkage(PlanFragmentId fragmentId, ExchangeLocationsConsumer parent, Set<SqlStageExecution> children)
        {
            this.currentStageFragmentId = fragmentId;
            this.parent = parent;
            this.childOutputBufferManagers = children.stream() // child stage的outputBufferManager创建好,这样,当前stage有了新的task,需要更新child stage的outputbuffer
                    .map(childStage -> {
                        PartitioningHandle partitioningHandle = childStage.getFragment().getPartitioningScheme().getPartitioning().getHandle();
                        if (partitioningHandle.equals(FIXED_BROADCAST_DISTRIBUTION)) {
                            return new BroadcastOutputBufferManager(childStage::setOutputBuffers);
                        }
                        else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
                            return new ScaledOutputBufferManager(childStage::setOutputBuffers);
                        }
                        else {
                            // index + 1
                            int partitionCount = Ints.max(childStage.getFragment().getPartitioningScheme().getBucketToPartition().get()) + 1;
                            return new PartitionedOutputBufferManager(partitioningHandle, partitionCount, childStage::setOutputBuffers);
                        }
                    })

从上面StageLinkage的成员变量中可以看到,每一个Stage的StageLinkage, 都维护了下游(Parent)的信息,和上游(Children)的信息,这样,通过StageLinkage, 我们就可以把一个Stage的信息变更告知给它的下游(Parent)和上游(Children).
在上面的StageLinkage的构造方法中,最重要的childOutputBufferManagers是给当前 Stage的每一个子Stage维护一个OutputBufferManager, 如果当前Stage调度出了新的Task,就会遍历childOutputBufferManagers,调用OutputBufferManager.addOutputBuffers()方法,将新的task的信息告诉子Stage,这样,子Stage 的所有在运行的task就会为这些新的task构建好OutputBuffer, 并将输出的数据放到对应的OutputBuffer中,等待当前Stage的task来取。

下面是OutputBufferManager接口的代码,调度过程中就是通过调用addOutputBuffers()方法,来更新对应的Task的outputBuffer信息的。

interface OutputBufferManager
{
    void addOutputBuffers(List<OutputBufferId> newBuffers, boolean noMoreBuffers);
}

从StageLinkage的构造方法可以看到,StageLinkage会根据子Stage的PartitioningHandle, 绑定对应的OutputBufferManager接口实现,主要有这两种实现::

  • BroadcastOutputBufferManager,用在FIXED_BROADCAST_DISTRIBUTION的PartitioningHandle情况下,比如,broadcast join。BroadcastOutputBufferManger所管理的outputBuffer没有partition的概念,或者,我们可以认为只有一个id=0的partition
  • PartitionedOutputBufferManager,用来需要进行partition的PartitioningHandle的Stage,比如,partitioned join。 跟BroadcastOutputBufferManager相比,我们可以看到BroadcastOutputBufferManager.addOutputBuffers()的实现,PartitionedOutputBufferManager的outputBuffer是在PartitionedOutputBufferManager构造的时候就确定的,后续调用addOutputBuffers的时候如果发现这个bufferId已经存在并且不一样,就会抛出异常,因为基于partition策略构建的stage,这个OutputBuffer应该在构造的时候就是明确的。因此我们可以看到,构造PartitionedOutputBufferManager的时候调用withNoMoreBufferIds,即不会有更多的bufferId过来了。

3.1 FixedCountScheduler

从上文的讲解可以看到,FixedCountScheduler用来给不存在读表TableScanNode的Stage的调度使用的,因此不存在splitSource,所以,所有的split都是RemoteSplit,把task调度出去就行,不需要调度split。
FixedCountScheduler的调度不依赖split的调度,因此,它的实现是所有StageScheduler中最简单的,就是把每个partition上对应的task调度出去就完成了(注意,调度完成不代表运行完成)。具体的task的调度是taskScheduler.scheduleTask(),从代码可以看到,这里taskScheduler.scheduleTask()是一个lambda, 即调用SqlStageExecution.scheduleTask()来进行调度。

    @Override
    public ScheduleResult schedule()
    {
        OptionalInt totalPartitions = OptionalInt.of(partitionToNode.size());
        List<RemoteTask> newTasks = IntStream.range(0, partitionToNode.size()) // 往每一个partitionToNode上调度task。
                .mapToObj(partition -> taskScheduler.scheduleTask(partitionToNode.get(partition), partition, totalPartitions)) // taskScheduler是一个lambda, 具体的taskScheduler是对应的Stage的Scheduler
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(toImmutableList());
        // 可以看到,finished=true, 表示这个FixedCountScheduler的调度已经结束
        return new ScheduleResult(true, newTasks, 0);
    }

3.2 SourcePartitionedScheduler

从上文的讲解可以看到,SourcePartitionedScheduler是用来给读非bucket表的Stage进行调度的StageScheduler实现。
这个Scheduler会不断的从后端的Connector取出splits,然后通过DynamicSplitPlacementPolicy来确定调度的目标节点,即准备好assignment信息,assignment表达的是节点和准备调度到该节点的所有splits。
然后开始委托SqlStageScheduler.scheduleSplit()进行调度。虽然名字叫scheduleSplit, 但是处理逻辑是,如果目标节点上还没有创建好task,那么必须先创建好task,因为,任何时候,不存在单独的split调度,我们说split调度,其实指的是,把split调度到对应节点上运行的task上去,即,Coordinator将split给attach到对应的task对象(这里是HttpRemoteTask),task会通过http到方式将split信息发送给worker端的task, 完成split的调度

3.3 FixedSourcePartitionedScheduler

SourcePartitionedScheduler一样,FixedSourcePartitionedScheduler也是用来进行含有splitSource的stage的调度的。
但是,FixedSourcePartitionedScheduler是委托SourcePartitionedScheduler来进行splitSource的调度,而自己来负责Task的调度。上面说过,不存在单独的split的调度,我们讲split的调度,其实指的是把split给attach到已经调度好的task上去。因此,FixedSourcePartitionedScheduler其实会先把task调度出去,然后再开始委托SourcePartitionedScheduler来进行splitSource的调度。由于SourcePartitionedScheduler在调度split的时候会检查对应的task是否创建好,如果创建好了就不会再创建task,因此,通过这种方式,来让SourcePartitionedScheduler仅仅调度split,而不再创建task。

那么,既然SourcePartitionedScheduler也能调度task, 为什么我们还需要FixedSourcePartitionedScheduler呢?因为FixedSourcePartitionedScheduler在构建的时候就已经知道调度的目标节点(存放在NodePartitionMap或者BucketNodeMap中),比如在读取bucket表的时候,已经根据bucket的数量确定了分区数量并且为每个分区选择了节点,所以直接把task一次性调度出去,并且由于是bucket表,因此进行split的调度的时候也不是使用DynamicSplitPlacementPolicy而是使用BucketedSplitPlacementPolicy(当然,split必须调度到task所在的节点中)。而SourcePartitionedScheduler读取的表为非bucket表,Connector这一层返回的split会被均匀调度到几乎所有的worker node(使用的是DynamicSplitPlacementPolicy),即SourcePartitionedScheduler在分配task到时候是将task调度到有split的节点上,并且对此并不预先知道,而是在有了split的assignment以后,按照这个split的assignment去调度task。

总结

从本文的整个分析来看,作为一个计算引擎,Presto在调度层的实现是非常复杂的,这是因为Presto作为计算引擎所需要处理的情况很复杂。
本文涉及到的处理逻辑、代码量非常大, 因此难免会有一些理解错误。如果有兴趣,可以一起交流。
WeChat: Vico_Wu

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/693480.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

UE5.1.1 c++从0开始(14.用C++写UMG类)

先在这里放一个链接防止第一次看的朋友们不知道我在讲什么&#xff1a;https://www.bilibili.com/video/BV1nU4y1X7iQ/ 这一段的教程不难&#xff0c;唯一新建的C类是UMG的一个类。这个类用来写绑定在ai身上的血条。 总结一下一共做了什么事情&#xff1a; 给ai写了一个血条…

LeetCode Java两个单链表相交的一系列问题

题目描述 单链表可能有环&#xff0c;也可能无环。给定两个单链表的头节点 head1和head2&#xff0c;这两个链表可能相交&#xff0c;也可能不相交。 请实现一个函数&#xff0c;如果两个链表相交&#xff0c;请返回相交的第一个节点&#xff1b;如果不相交&#xff0c;返回n…

Android 渐变背景色

目录 一、背景 二、渐变 2.1 线性渐变背景色 1.新建资源文件 2.编辑样式文件 3.使用 4.编辑样式参数说明 2.2 圆角按钮渐变背景色 2.3 放射渐变 2.4 扫描线渐变 一、背景 单纯的颜色背景已经不能够满足UI大佬们的发挥&#xff0c;渐变色背景无疑成了一个炫技的方向。现在…

chatgpt赋能python:Python调用同一个类中方法详解

Python调用同一个类中方法详解 在Python编程中&#xff0c;类是一种非常重要的概念&#xff0c;可用于组织和管理代码。在同一个类中&#xff0c;可以定义多个方法。本文将详细介绍如何调用同一个类中的方法。 什么是类方法&#xff1f; 在Python中&#xff0c;类方法是指类…

魔兽世界自己架设任务

在魔兽世界中&#xff0c;玩家可以使用游戏内的任务编辑器自己架设任务来增加游戏的乐趣和挑战性。以下是详细的步骤&#xff1a; 第一步&#xff1a;打开任务编辑器 玩家可以在游戏中按下“ESC”键&#xff0c;进入游戏设置页面。在这个页面中&#xff0c;有一个“编辑器”选…

DSL查询分类与全文检索查询

DSL查询分类 Elasticsearch提供了基于JSON的DSL&#xff08;Domain Specific Language&#xff09;来定义查询。常见的查询类型包括&#xff1a; 查询所有&#xff1a;查询出所有数据&#xff0c;一般测试用。例如&#xff1a;match_all全文检索&#xff08;full text&#x…

Idea新建springboot项目遇到的问题及解决

1.更换阿里云 方法&#xff1a; 找到文件路径&#xff1a;Settings > Build,Execution,Deployment > Build Tools > Maven 如下图&#xff1a; 找到相应的settings文件 如果没有就新建一个同名文件&#xff0c;内容如下&#xff1a; <settings xmlns"h…

Gitlab回退到指定版本的方法与步骤

一、先根据分支获取代码 如下&#xff1a; 下载好后&#xff0c;通过右键菜单进入git bash here 就进入下面界面 去gitlab上面去寻找需要的faf0af86d24f7de73b024785ad864f36da4284e2 git reset --hard cf2a5283b9a79f8cf04b003d05cdd94b2b3ff166 执行命令“git push -f”&…

vue中对语句的语义进行比较

一、安装 string-similarity库 npm install string-similarity二、html <div><input type"text" v-model"string1" placeholder"文本1" /> </div> <div><input type"text" v-model"string2" p…

[计算机入门]了解键盘

2.1 了解键盘 键盘一般可以根据按键的功能进行分区&#xff0c;一般分为&#xff1a;主键盘区、小键盘区、控制键区、功能键区、指示灯区。下面介绍键盘的各个分区按键及功能。 2.1.1 主键盘区 主键盘区又叫打字键盘区或字符键区&#xff0c;具有标准英文打字机键盘的格式。…

【Java】Java 中的栈和堆内存

本文仅供学习参考&#xff01; 相关教程地址&#xff1a; https://www.cnblogs.com/whgw/archive/2011/09/29/2194997.html https://www.developer.com/java/stack-heap-java-memory/ https://zhuanlan.zhihu.com/p/529280783 Java 数据类型在执行过程中存储在两种不同形式的内…

HOT24-回文链表

leetcode原题链接&#xff1a;回文链表 题目描述 给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为回文链表。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,2,1] 输出&#xff1a…

Leetcode 刷题 动态规划

198. 打家劫舍 1. 确定dp数组&#xff08;dp table&#xff09;以及下标的含义 dp[i]&#xff1a;考虑下标i&#xff08;包括i&#xff09;以内的房屋&#xff0c;最多可以偷窃的金额为dp[i] 2. 确定递推公式 dp[i] max(dp[i - 2] nums[i], dp[i - 1]); 3. dp数组如何初始…

分类预测 | MATLAB实现GA-BiLSTM遗传算法优化双向长短期记忆网络的数据多输入分类预测

分类预测 | MATLAB实现GA-BiLSTM遗传算法优化双向长短期记忆网络的数据多输入分类预测 目录 分类预测 | MATLAB实现GA-BiLSTM遗传算法优化双向长短期记忆网络的数据多输入分类预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 MATLAB实现GA-BiLSTM遗传算法优化双向长短…

【数据库】MySQL慢查询常用分析方法

系统慢慢越来越卡了&#xff0c;怎么定位系统慢的原因&#xff0c;大部分是因为服务器资源占用耗费高引起的&#xff0c;如CPU&#xff0c;内存和带宽等等。MySQL在日常开发工作中可能会遇到某个新功能在测试时需要很久才返回结果&#xff0c;这时就应该分析是不是慢查询导致的…

【javascript】2048小游戏

目录 什么是2048 游戏状态机 游戏界面绘制 3.1 界面 3.2 数字的背景颜色 分数逻辑 4.1 加分 4.2 更新最高分 方向控制逻辑 5.1 数组 5.2 随机数 5.3 初始化 5.4 判断数组是否全部填满 5.5 判断方格是否还能移动 5.6 上下左右的监听事件 5.7 移动 完整代码 …

微服务划分的姿势

我们知道微服务是一种理念&#xff0c;没有确切的定义和边界&#xff0c;好比设计原则&#xff0c;是属于抽象的概念。在定义不明确的情况下谈划分也是一种各说各话&#xff0c;具体问题需要具体分析&#xff0c;所以这篇文章谈到的划分也不是绝对标准&#xff0c;仅供参考。 有…

final, finally和finalize的区别

final、finally和finalize是Java中用于异常处理的关键字。每个关键字都有不同的功能。final是一个访问修饰符&#xff0c;finally是异常处理中的代码块&#xff0c;而finalize是Object类的方法。 除此之外&#xff0c;final、finally和finalize之间还存在许多区别。下面是final…

netty学习(1):1个客户端与服务器通信

1. 新建maven工程&#xff0c;添加netty依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"…

Avoid adding reactive properties to a Vue instance or its root $da

避免在运行时向Vue实例或其根$data添加反应性属性-在数据选项中预先声明它。 在页面中声明对象&#xff0c;直接修改即可。 data(){return{addressInfo:{}}}