Impala4.x源码阅读笔记(二)——Impala如何高效读取Iceberg表

news2024/11/8 22:01:05

前言

本文为笔者个人阅读Apache Impala源码时的笔记,仅代表我个人对代码的理解,个人水平有限,文章可能存在理解错误、遗漏或者过时之处。如果有任何错误或者有更好的见解,欢迎指正。

Iceberg表是一种用于存储大规模结构化数据的开源表格式,旨在提供高效的数据存储和丰富的查询能力。不同于Parquet,Orc等文件格式定义了数据如何在文件中存储和索引,Iceberg作为一种表格式定义的是数据文件如何组织,换句话说就是一系列的数据文件如何构成一张表以及我们如何从大量数据文件中找到我们需要的。Iceberg表支持事务性写入和多版本并发控制,这使得它适合于需要大规模数据存储和高效查询的数据湖或数据仓库环境。Iceberg表还提供了对数据架构变化的良好支持,使得数据架构的演进变得更加容易。

Iceberg表的基本结构包括数据文件和元数据文件两部分,这些文件被组织在目录结构中,其中数据文件是实际存储数据的地方,支持Parquet、Orc等业界主流数据文件格式。元数据文件除了记录表的结构信息和演进记录外,还有一种Manifest文件用于索引数据文件,支持Iceberg使用快照(Snapshot)的概念来维护表的版本信息,可以快速定位某个版本的表包括了哪些数据文件,这使得我们可以方便地在Iceberg表上进行时间旅行查询和回滚操作。另外在Iceberg V2格式的表中,还在之前V1格式表的基础之上增加了数据的行级更新与删除能力,通过写入专门的Delete File和MOR(Merge On Read)技术,可以在不重写现有数据文件的前提下实现行级别的删除。

在impala步入4.x的大版本后,对Iceberg表的支持一直是社区关注的重点。在社区的重点投入下,截止Apache Impala 4.3.0版本,Impala对Iceberg表的支持度已经相当高了,除了建表删表修改字段等常规表支持的操作外,对Iceberg特有的时间旅行、版本回滚、清理快照也进行了支持,另外还提供了从Hive表迁移到Iceberg的功能。社区目前正在大力推进对于Iceberg V2表的支持,目前已经完整支持了对Iceberg表的行级Delete和MOR读取。Impala在实现MOR时没有使用Iceberg API提供的读取方法,而是使用了自身由C++实现的执行引擎进行读取。得益于Impala本身对于HDFS+Parquet表的长期优化,使得Impala在Iceberg的MOR读取性能方面也具有优势。

本文主要根据源码就Impala如何高效读取地Iceberg表进行分析,这里的读取包括了同时包括了对Iceberg表的时间旅行和MOR的支持。分析的过程中着重于扫描Iceberg表的执行计划如何制定,以及期间做了哪些优化。

准备工作

为了分析Impala如何读取Iceberg表,我们可以从一个简单但是功能覆盖充分的例子入手。首先我们使用Impala创建一张Iceberg V2表,并进行一些数据写入和删除操作:

-- 创建一张V2格式的Iceberg表
CREATE TABLE ice_v2 (id int, name string) STORED BY ICEBERG TBLPROPERTIES('format-version'='2');
-- 插入最初的两条数据
INSERT INTO ice_v2 VALUES (1, 'a'), (2, 'b');
-- 删除id为2的一条数据
DELETE FROM ice_v2 WHERE id = 2;
-- 再插入一条数据
INSERT INTO ice_v2 VALUES (3, 'c');

现在我们就准备好了一张包含Delete数据的Iceberg表了,因为我们总共进行了三次DML操作,现在这张Iceberg表就有3个快照了。在Impala中可以通过执行DESCRIBE HISTORY语句查看Iceberg表历史版本:

在这里插入图片描述

也可以看见数据文件中有两个Insert文件和一个Delete文件,都使用Parquet格式储存:

在这里插入图片描述

为了后续一些概念的理解,我们再看下这三个文件中的数据:

在这里插入图片描述

可以发现Insert文件中的数据和我们执行的Insert语句是对应的,而Delete文件中的数据这是另外的Schema,记录的是删除的行所在的文件和位置。在有了这些信息之后,我们就可以开始分析Impala是如何读取一张Iceberg表的了。

执行计划

首先我们先看一下扫描Iceberg表的执行计划长什么样,执行如下SQL:

-- Result: (3, 'c'), (1, 'a')
SELECT * FROM ice_v2 FOR SYSTEM_VERSION AS OF 5109113003992490801

其中FOR SYSTEM_VERSION AS OF子句就是对Iceberg表进行时间旅行查询使用的,5109113003992490801DESCRIBE HISTORY中显示的Iceberg表最新的快照ID。这样我们可以指定一个Iceberg快照版本进行查询,当然不指定时Impala会默认查询最新的快照,因此实际上这个查询子句加与不加结果是一样的,只是为了后面触发时间旅行查询相关的逻辑。我们也可以用FOR SYSTEM_TIME AS OF子句来指定一个时间点来查询这个时间点时这张Iceberg表的数据,就像进行了时间旅行穿越回过去直接查询这张表一样,也就是所谓的时间旅行查询了。虽然这只是一个简单的SELECT *查询,但是它在Impala中的执行计划却不简单:

在这里插入图片描述

从SQL角度来说,一条SQL在Impala中的处理过程逻辑上大体可以分为四个阶段,解析、分析、计划与执行。其中解析就是解析SQL,通过SQL Parser将SQL字符串转换为语句类StatementBase的子类对象,比如说上文中的查询会被转换为SelectStmt,其中又包含SelectListFromClause等子句部分,FromClause又包括了一个TableRef的列表表示FROM子句后面的各个表或者类似于表的对象,在TableRef中又包括一个TimeTravelSpec对象对应了我们的时间旅行子句。这些语句对象都实现了自己的analyze方法,会在后面的分析过程中调用。比如说SelectStmtanalyze会调用FromClauseanalyze,同时自己还会进行星号展开、注册Slot等工作。FromClauseanalyze则会分析路径、建立别名并调用各个TableRefanalyze方法。在TableRefanalyze方法中除了分析Join、Hints之外也会调用TimeTravelSpecanalyze方法。TimeTravelSpecanalyze方法则会计算AS OF后的表达式得到目标时间旅行的版本或者时间,为后续索引数据文件做准备。

经过了复杂的分析之后就可以进入计划阶段了,计划阶段的任务就是根据语句类对象和先前的分析结果给查询制定一个执行计划。执行计划是一个由各种PlanNode的子类对象构成的一个树状结构PlanTree,它会指导查询如何进行执行,具体包括执行需要哪些结点参与,结点间的数据流向等等。执行计划会被转为Thrift结构体传递给Impala的C++执行引擎进行执行,执行时PlanNode会转变为对应ExecNode的子类对象,每个结点都负责各自对应的工作,比如说ScanNode负责一张表的扫描任务,是树中的叶子结点。而JoinNodeUnionNode分别负责连接任务与合并任务,都有多个输入和一个输出。数据在结点间以行批的形式传递,最终汇聚到根结点并返回给客户端完成查询执行。

计划阶段又可以大体分为两部分,首先是制定单点执行计划,给出一颗完整的执行计划树。但是单点执行计划还不足以指导查询执行,Impala作为一个MPP架构的执行引擎可以在分布式集群的多个执行者上并发执行查询,因此还需要将单点执行计划转变为分布式执行计划。分布式执行计划实际上就是将单点执行计划切分为多个片段Fragment,并在其间插入一些交换节点ExchangeNode用于在Fragment之间传递数据。在上面的执行计划树图中我们可以看到结点间有虚线和实线两种连接方式,实线连接的结点就是同属于一个Fragment的,而虚线连接的是不同的FragmentFragment是查询执行阶段可以调度的最小执行单元,调度器可以根据Fragment的性质将其调度到一个或多个执行者上创建执行实例Instance并开始实际执行。比如说对于只有一个文件的扫描结点的Fragment,我们只需要将其调度到一个执行者上执行就够了,而那些扫描文件很多或者需要高并发执行的Fragment,我们则需要将其调度到多个执行者并发加速执行。

在Impala的执行计划中一般都是一个ScanNode对应负责一张表的扫描任务,而从图中可以发现,我们为了扫描这张Iceberg表却出动了三个ScanNode,甚至还有一个JoinNode和一个UnionNode,这主要是因为Iceberg表中有Delete文件出现了。我们可以对比看一下没有Delete文件时的Iceberg表扫描查询计划是怎样的,执行SQL:

-- Result: (1, 'a'), (2, 'b')
SELECT * FROM ice_v2 FOR SYSTEM_TIME AS OF '2023-12-07 16:00:05.5';

这次我们使用时间旅行回到表进行了第一次插入但是还没有进行Delete的时间点进行查询,执行计划如下:

在这里插入图片描述

这回就是简简单单一个ScanNode了。Impala究竟是如何为Iceberg表制定扫描计划的?我们可以从代码中找到答案。

代码分析

本文要分析的Iceberg表的扫描计划的制定是单点执行计划的制定的一部分,完整的执行计划制定代码是十分庞大和复杂的,幸运的是我们只需要关注其中的一小部分就能得到需要的答案。首先我们先快速深入的目标代码的位置,调用路径如下:

getPlannedExecRequest() -> createExecRequest() -> createPlans() -> createPlanFragments()
-> createSingleNodePlan() -> createQueryPlan() -> createSelectPlan() -> createTableRefsPlan()
-> createTableRefNode() -> createScanNode() {
  // 如果TableRef是实际的表就会调用createScanNode()方法为其创建扫描结点
  ...
  FeTable table = tblRef.getTable();
  // 通过表的类型选择对应的方法创建ScanNode
  // Iceberg表是FeIcebergTable同时也实现了FeFsTable接口,同时也是一种文件系统表(Hdfs表)
  if (table instanceof FeFsTable) {
    if (table instanceof FeIcebergTable) {
      // 对于Iceberg表有专门的IcebergScanPlanner创建扫描结点
      // IcebergScanPlanner构造时需要传入分析信息、查询上下文、表引用、连词和聚合信息
      // 同时还会在构造方法中完成谓词抽取,即将conjuncts中可以下推到Iceberg API中的谓词抽取出来并转换为Iceberg的谓词对象
      IcebergScanPlanner icebergPlanner = new IcebergScanPlanner(analyzer, ctx_, tblRef,
          conjuncts, aggInfo);
      return icebergPlanner.createIcebergScanPlan();
    }
    return createHdfsScanPlan(tblRef, aggInfo, conjuncts, analyzer);
  } else if (table instanceof FeDataSourceTable) {
  ...
}

接下来就进入正题了,我们看下IcebergScanPlanner::createIcebergScanPlan()是如何给Iceberg表创建扫描计划的:

public PlanNode createIcebergScanPlan() throws ImpalaException {
  // 首先是一个重要的判断,决定是否需要调用Iceberg的planFiles() API来列出待扫描文件
  // needIcebergForPlanning()等价于 !impalaIcebergPredicateMapping_.isEmpty() || tblRef_.getTimeTravelSpec() != null;
  // 即如果有谓词下推或者有时间旅行子句的话,我们是需要调用planFiles()来列出待扫描文件的
  // 因为这种情况下要扫描的文件往往不是最新快照的全部数据文件,而是其的一个子集或者是其他快照版本包含的数据文件
  if (!needIcebergForPlanning()) {
    // 为连词中引用到的字段进行槽位物化,槽位物化可以理解为在数据行的内存定义中保留一块内存存放对应字段的数据
    // 因为谓词评估需要使用这些字段进行计算,所以即使select list中没有它们,我们必须在数据行定义中给它们预留位置
    analyzer_.materializeSlots(conjuncts_);
    // 对于不需要planFiles()的扫描,我们可以直接调用setFileDescriptorsBasedOnFileStore()方法
    // 使用缓存的最新快照的文件集合FileStore来设置文件描述符FileDescriptors,用于之后的扫描计划创建
    setFileDescriptorsBasedOnFileStore();
    // 然后调用createIcebergScanPlanImpl()进行具体的扫描计划创建
    return createIcebergScanPlanImpl();
  }

  // 对于进行了谓词下推或包含时间旅行的查询,我们无法根据缓存的最新快照的文件集合来得到需要扫描的FileDescriptors
  // 需要调用filterFileDescriptors()来根据谓词或时间旅行子句来过滤得到需要FileDescriptors
  filterFileDescriptors();
  // 调用filterConjuncts()来过滤谓词,因为在filterFileDescriptors()中我们可能已经将部分谓词下推到Iceberg中了
  // 这部分谓词不需要在执行时再次计算,因为Iceberg API已经在文件层面将不符合谓词的数据文件都过滤了
  // 剩余需要扫描的文件中的数据肯定符合这些已经下推的谓词,因此我们需要将其过滤掉,避免扫描时多余的谓词计算
  // 只将那些没有被下推成功的谓词保留,并传递给扫描结点,由Impala的执行引擎负责计算和过滤数据
  filterConjuncts();
  analyzer_.materializeSlots(conjuncts_);
  return createIcebergScanPlanImpl();
}

对于上文中有时间旅行子句查询,没法进入createIcebergScanPlan()中的if分支,需要调用filterFileDescriptors()来得到应该扫描的文件,我们继续看下其是如何实现的:

private void filterFileDescriptors() throws ImpalaException {
  TimeTravelSpec timeTravelSpec = tblRef_.getTimeTravelSpec();
  // 调用IcebergUtil::planFiles()并传递Iceberg表、之前抽取并转换为Iceberg谓词的谓词列表和时间旅行描述对象
  // 其封装了Iceberg API的TableScan::planFiles()方法,可以将谓词和时间旅行版本传递给Iceberg API
  // 其返回的fileScanTasks就包含了所有我们需要扫描的数据文件,包括Delete文件
  try (CloseableIterable<FileScanTask> fileScanTasks =
      IcebergUtil.planFiles(getIceTable(),
          new ArrayList<>(impalaIcebergPredicateMapping_.keySet()), timeTravelSpec)) {
    long dataFilesCacheMisses = 0;
    for (FileScanTask fileScanTask : fileScanTasks) {
      // 遍历每个FileScanTask,首先处理残余表达式,也就是没有被下推到Iceberg的谓词
      // 这些谓词无法在文件级别应用,需要Impala的扫描结点在行级别应用,将其先全部加入到集合residualExpressions_中
      Expression residualExpr = fileScanTask.residual();
      if (residualExpr != null && !(residualExpr instanceof True)) {
        residualExpressions_.add(residualExpr);
      }
      // 调用getFileDescriptor()来获取文件描述符FileDescriptor和是否命中缓存的标志
      // getFileDescriptor()中封装了一层缓存文件描述符的逻辑,即使未命中缓存也会创建新的并添加到缓存
      Pair<FileDescriptor, Boolean> fileDesc = getFileDescriptor(fileScanTask.file());
      if (!fileDesc.second) ++dataFilesCacheMisses;
      // 如果这个文件没有对应的Delete文件,我们将其加入到无Delete文件的数据文件列表dataFilesWithoutDeletes_中
      if (fileScanTask.deletes().isEmpty()) {
        dataFilesWithoutDeletes_.add(fileDesc.first);
      } else {
        // 否则将其加入到有Delete文件的数据文件列表dataFilesWithDeletes_中
        dataFilesWithDeletes_.add(fileDesc.first);
        // 同时还需要遍历其所有的Delete文件,将其加入到Delete文件列表deleteFiles_中
        for (DeleteFile delFile : fileScanTask.deletes()) {
          // 截止本文撰写时,Impala还未支持等值删除EQUALITY_DELETES,只支持位置删除POSITION_DELETES
          // 如果发现目标表有等值删除文件,我们无法扫描,需要引发一个异常
          if (delFile.content() == FileContent.EQUALITY_DELETES) {
            throw new ImpalaRuntimeException(String.format(
                "Iceberg table %s has EQUALITY delete file which is currently " +
                "not supported by Impala, for example: %s", getIceTable().getFullName(),
                delFile.path()));
          }
          Pair<FileDescriptor, Boolean> delFileDesc = getFileDescriptor(delFile);
          if (!delFileDesc.second) ++dataFilesCacheMisses;
          deleteFiles_.add(delFileDesc.first);
        }
      }
    }
    if (dataFilesCacheMisses > 0) {
      Preconditions.checkState(timeTravelSpec != null);
      LOG.info("File descriptors had to be loaded on demand during time travel: " +
          String.valueOf(dataFilesCacheMisses));
    }
  } catch (IOException | TableLoadingException e) {
    throw new ImpalaRuntimeException(String.format(
        "Failed to load data files for Iceberg table: %s", getIceTable().getFullName()),
        e);
  }
  // 最后更新下Delete文件的统计信息,包括数据行数和行数据量,用于后续预估内存消耗等
  updateDeleteStatistics();
}

对谓词的处理并非本文的关注的重点,所以我们跳过filterConjuncts()方法,继续看关键的createIcebergScanPlanImpl()方法最后如何为Iceberg扫描创建扫描结点的:

private PlanNode createIcebergScanPlanImpl() throws ImpalaException {
  // 对于没有Delete文件的情况,我们只需要一个扫描结点来扫描数据文件就行了,也就是上文中第二个查询的情况
  if (deleteFiles_.isEmpty()) {
    Preconditions.checkState(dataFilesWithDeletes_.isEmpty());
    // 创建一个新的Iceberg扫描结点对象,并传递结点ID、表引用、连词列表、聚合信息、文件列表
    // 非IDENTITY列的连词列表和需要跳过连词列表,然后调用init()方法进行初始化
    // IcebergScanNode继承了HdfsScanNode,大部分逻辑也是复用的HdfsScanNode的
    // 所以很多Hdfs Scan的优化和功能也能在Iceberg扫描中使用
    PlanNode ret = new IcebergScanNode(ctx_.getNextNodeId(), tblRef_, conjuncts_,
        aggInfo_, dataFilesWithoutDeletes_, nonIdentityConjuncts_,
        getSkippedConjuncts());
    ret.init(analyzer_);
    return ret;
  }
  // 如果有Delete文件,那么一个扫描结点就不够用了,我们需要另外一个扫描结点专门负责扫描Delete文件
  // 然后还需要一个反连接结点将两者的数据进行ANTI JOIN,实现删除行的效果
  // 这部分逻辑由createPositionJoinNode()方法实现,我们后面分析,总之它会返回一个JoinNode
  PlanNode joinNode = createPositionJoinNode();

  // 如果先前的分析阶段发现这是可以进行优化的针对Iceberg V2表的count(*)查询
  // 则剩余的无Delete文件的数据文件的不需要实际扫描了,可从元数据中得到行数
  // 因此这里可以直接返回所有数据文件和所有删除文件之间的ANTI JOIN
  // 不需要处理后面的无Delete文件对应的数据文件,它们的行数会被一个ArithmeticExpr直接加到结果中
  if (ctx_.getQueryCtx().isOptimize_count_star_for_iceberg_v2()) return joinNode;

  // 当然,当所有数据文件都有相应的删除文件,即dataFilesWithoutDeletes_为空时
  // 我们也只需要返回所有数据文件和所有删除文件之间的ANTI JOIN
  if (dataFilesWithoutDeletes_.isEmpty()) return joinNode;

  // 如果还有无Delete文件对应的数据文件的话,我们则还需要这些文件创建一个扫描结点
  IcebergScanNode dataScanNode = new IcebergScanNode(
      ctx_.getNextNodeId(), tblRef_, conjuncts_, aggInfo_, dataFilesWithoutDeletes_,
      nonIdentityConjuncts_, getSkippedConjuncts());
  dataScanNode.init(analyzer_);
  // 然后根据表引用的槽位描述符创建输出表达式,并依此创建一个合并结点UnionNode来合并数据
  List<Expr> outputExprs = tblRef_.getDesc().getSlots().stream().map(
      entry -> new SlotRef(entry)).collect(Collectors.toList());
  UnionNode unionNode = new UnionNode(ctx_.getNextNodeId(), tblRef_.getId(),
      outputExprs, false);
  // 将之前创建的负责无Delete文件的数据扫描结点和负责有Delete文件的连接结点作为UnionNode的输入
  // 并完成初始化后返回,这样这张Iceberg表的扫描计划就创建完成了
  // 根结点为一个UnionNode,也就对应了上文中第一个查询的情况
  unionNode.addChild(dataScanNode, outputExprs);
  unionNode.addChild(joinNode, outputExprs);
  unionNode.init(analyzer_);
  Preconditions.checkState(unionNode.getChildCount() == 2);
  Preconditions.checkState(unionNode.getFirstNonPassthroughChildIndex() == 2);
  return unionNode;
}

看完关键的createIcebergScanPlanImpl()方法后,Iceberg表的扫描计划是如何制定的就已经很清晰了,但是还有最后一块也是最核心的逻辑还需要再进一步分析,也就是期间我们调用的createPositionJoinNode()方法,它被用来创建实现Delete文件和数据文件之间数据行删除逻辑的ANTI JOIN结点,代码分析如下:

  private PlanNode createPositionJoinNode() throws ImpalaException {
    Preconditions.checkState(deletesRecordCount_ != 0);
    Preconditions.checkState(dataFilesWithDeletesSumPaths_ != 0);
    Preconditions.checkState(dataFilesWithDeletesMaxPath_ != 0);
	// 我们需要为数据文件和Delete文件分别创建一个扫描结点,不过在此之前还有些准备工作
    PlanNodeId dataScanNodeId = ctx_.getNextNodeId();
    PlanNodeId deleteScanNodeId = ctx_.getNextNodeId();
    // 首先需要为Delete文件先创建一张虚拟表,因为扫描结点都是以表为基础的,而Delete文件并不是实际存在的表
    IcebergPositionDeleteTable deleteTable = new IcebergPositionDeleteTable(getIceTable(),
        getIceTable().getName() + "-POSITION-DELETE-" + deleteScanNodeId.toString(),
        deleteFiles_, deletesRecordCount_, getFilePathStats());
    analyzer_.addVirtualTable(deleteTable);
    // 有了对应Delete文件的虚拟表IcebergPositionDeleteTable后,再为其创建表引用对象TableRef
    TableRef deleteDeltaRef = TableRef.newTableRef(analyzer_,
        Arrays.asList(deleteTable.getDb().getName(), deleteTable.getName()),
        tblRef_.getUniqueAlias() + "-position-delete");
    // 为了数据文件和Delete文件之间能够进行ANTI JOIN,我们还需要为数据文件的表添加相关的虚拟列
    // 这些虚拟列并不是实际存在于表中的列,而是为了作为Join Key存在的
    // addDataVirtualPositionSlots()方法会为数据表添加两个虚拟列,分别为INPUT__FILE__NAME和FILE__POSITION
    // Impala已经为Parquet和Orc文件实现了这两个虚拟列,其数据可以在扫描时填充,含义分别是文件名和行号
    addDataVirtualPositionSlots(tblRef_);
    // addDeletePositionSlots()方法会为Delete文件的虚拟表添加两个实际列,分别为file_path和pos
    // 这两个列是在Delete文件中实际存在的,可以看上文中解析Delete文件得到的结果
    addDeletePositionSlots(deleteDeltaRef);
    // 然后就可以为数据文件和Delete文件分别创建一个扫描结点并初始化了
    IcebergScanNode dataScanNode = new IcebergScanNode(
        dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_,
        nonIdentityConjuncts_, getSkippedConjuncts(), deleteScanNodeId);
    dataScanNode.init(analyzer_);
    IcebergScanNode deleteScanNode = new IcebergScanNode(
        deleteScanNodeId,
        deleteDeltaRef,
        Collections.emptyList(), /*conjuncts*/
        aggInfo_,
        Lists.newArrayList(deleteFiles_),
        Collections.emptyList(), /*nonIdentityConjuncts*/
        Collections.emptyList()); /*skippedConjuncts*/
    deleteScanNode.init(analyzer_);

    // 然后是ANTI JOIN结点的创建,首先需要调用createPositionJoinConjuncts()创建连接谓词
    // 它会生成两个EQ谓词,相当于INPUT__FILE__NAME = file_path AND FILE__POSITION = pos
    // 这样数据文件中的行和Delete文件中的行匹配上时就可以在ANTI JOIN中被剔除了,实现了删除的效果
    List<BinaryPredicate> positionJoinConjuncts = createPositionJoinConjuncts(
            analyzer_, tblRef_.getDesc(), deleteDeltaRef.getDesc());
    // Impala还实现了一种专门针对Iceberg Delete文件DELETE JOIN结点IcebergDeleteNode
    // 如果没设置query option disable_optimized_iceberg_v2_read的话
    // 就会使用IcebergDeleteNode代替LEFT_ANTI_JOIN的HashJoinNode来进行ANTI JOIN
    // IcebergDeleteNode继承了HashJoinNode,同时专门针对Iceberg Delete进行了优化
    TQueryOptions queryOpts = analyzer_.getQueryCtx().client_request.query_options;
    JoinNode joinNode = null;
    if (queryOpts.disable_optimized_iceberg_v2_read) {
      joinNode = new HashJoinNode(dataScanNode, deleteScanNode,
          /*straight_join=*/true, DistributionMode.NONE, JoinOperator.LEFT_ANTI_JOIN,
          positionJoinConjuncts, /*otherJoinConjuncts=*/Collections.emptyList());
    } else {
      joinNode =
          new IcebergDeleteNode(dataScanNode, deleteScanNode, positionJoinConjuncts);
    }
    // ANTI JOIN结点的创建好之后再进行一些初始化工作就可以返回了
    joinNode.setId(ctx_.getNextNodeId());
    joinNode.init(analyzer_);
    joinNode.setIsDeleteRowsJoin();
    return joinNode;
  }

至此,Iceberg表的扫描计划的制定就结束了,后续再完成一些别的必须的工作就可以下发到执行引擎中进行执行了。

总结

这篇文章主要是在执行计划的制定方面分析了Iceberg表在Impala中是如何扫描的,总的来说为了实现高性能的Iceberg的MOR功能,对于一张包含Delete数据的Iceberg表Impala会可能会使用多个扫描结点以及ANTI JOIN结点和UNION结点协同工作,配合完成任务,这样的设计和实现既能复用很多现有的Impala代码,也能利用起现有的很多针对HDFS表的扫描优化。但是文章篇幅有限,实际上代码中还有许多内容无法详细展开分析,对于扫描的实际执行也还未能涉猎,有机会的话会在后续的文章中继续介绍。有兴趣的读者也可以自行阅读,Impala的代码注释还是比较丰富的,代码风格规范也比较好,阅读难度不大。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1308892.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

市场全局复盘 20231213

昨日回顾&#xff1a; SELECT TOP 10000 CODE,成交额排名,净流入排名,代码,名称,DDE大单金额,涨幅 ,主力净额,DDE大单净量,CONVERT(DATETIME, 最后封板, 120) AS 最后封板 FROM dbo.全部&#xff21;股20231213_ALL WHERE 连板天 > 1AND DDE大单净量 > 0AND DDE散户数量…

bugku--source

dirsearch扫一下 题目提示源代码&#xff08;source&#xff09; 也就是源代码泄露&#xff0c;然后发现有.git 猜到是git泄露 拼接后发现有文件 但是点开啥也没有 kali里面下载下来 wegt -r 下载网站的所有内容 ls 查看目录 cd 进入到目录里面 gie reflog 引用日志使用…

Cent OS7 磁盘挂载:扩展存储空间和自动挂载

文章目录 &#xff08;1&#xff09;概述&#xff08;2&#xff09;查看磁盘使用情况&#xff08;3&#xff09;VMware虚拟机挂载磁盘&#xff08;4&#xff09;物理机磁盘挂载&#xff08;5&#xff09;ntfs硬盘处理 &#xff08;1&#xff09;概述 在Linux系统中&#xff0c…

ubuntu 自动安装 MKL Intel fortran 编译器 ifort 及完美平替

首先据不完全观察&#xff0c;gfortran 与 openblas是 intel fortran 编译器 ifotr和mkl的非常优秀的平替&#xff0c;openblas连函数名都跟mkl一样&#xff0c;加了一个下划线。 1&#xff0c; 概况 https://www.intel.com/content/www/us/en/developer/tools/oneapi/base-too…

【Hadoop】Hadoop基础架构的变化

1.x版本架构2.x版本架构3.x版本架构参考 1.x版本架构 NameNode&#xff1a;&#xff0c;负责文件系统的名字空间(Namespace)管理以及客户端对文 件的访问。NameNode负责文件元数据的管理和操作。是单节点。 Secondary NameNode&#xff1a;它的职责是合并NameNode的edit logs到…

SpringBoot进行自然语言处理,利用Hanlp进行文本情感分析

. # &#x1f4d1;前言 本文主要是SpringBoot进行自然语言处理&#xff0c;利用Hanlp进行文本情感分析&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风…

人工智能导论习题集(3)

第五章&#xff1a;不确定性推理 题1题2题3题4题5题6题7题8 题1 题2 题3 题4 题5 题6 题7 题8

过滤(删除)迭代对象中满足指定条件的元素itertools.filterfalse()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 过滤(删除)迭代对象中 满足指定条件的元素 itertools.filterfalse() [太阳]选择题 请问以下代码输出的结果是&#xff1f; a [1, 2, 3, 4, 5] print("【显示】a ",a) import ite…

关于Cython生成的so动态链接库逆向

来个引子&#xff1a;TPCTF的maze题目 如何生成这个so文件 为了研究逆向&#xff0c;我们先搞个例子感受一下生成so的整个过程&#xff0c;方便后续分析 创建对应python库文件 testso.py def test_add(a,b):a int(a)b int(b)return a bdef test_calc(li):for i in range…

redis-学习笔记(Jedis zset 简单命令)

zadd & zrange zadd , 插入的第一个参数是 zset , 第二个参数是 score, 第三个参数是 member 成员 内部依据 score 排序 zrange 返回 key 对应的 对应区间内的值 zrangeWithScore 返回 key 对应的 对应区间内的值和分数 示例代码 zcard 返回 key 对应的 zset 的长度 示例代…

05-命令模式

意图&#xff08;GOF定义&#xff09; 将一个请求封装为一个对象&#xff0c;从而使你可用不同的请求对客户端进行参数化&#xff0c;对请求排队或者记录日志&#xff0c;以及可支持撤销的操作。 理解 命令模式就是把一些常用的但比较繁杂的工作归类为成一组一组的动作&…

使用 Taro 开发鸿蒙原生应用 —— 当 Taro 遇到纯血鸿蒙 | 京东云技术团队

纯血鸿蒙即将到来 在今年 8 月的「2023年华为开发者大会&#xff08;HDC.Together&#xff09;」上&#xff0c;华为正式官宣「鸿蒙Next」&#xff0c;这个更新的版本将移除所有的 AOSP 代码&#xff0c;彻底与 Android 切割&#xff0c;使其成为一个完全自主研发的操作系统&a…

【视觉SLAM十四讲学习笔记】第四讲——指数映射

专栏系列文章如下&#xff1a; 【视觉SLAM十四讲学习笔记】第一讲——SLAM介绍 【视觉SLAM十四讲学习笔记】第二讲——初识SLAM 【视觉SLAM十四讲学习笔记】第三讲——旋转矩阵 【视觉SLAM十四讲学习笔记】第三讲——旋转向量和欧拉角 【视觉SLAM十四讲学习笔记】第三讲——四元…

LeetCode008之字符串转换整数 (相关话题:状态机)

题目描述 请你来实现一个 myAtoi(string s) 函数&#xff0c;使其能将字符串转换成一个 32 位有符号整数&#xff08;类似 C/C 中的 atoi 函数&#xff09;。 函数 myAtoi(string s) 的算法如下&#xff1a; 读入字符串并丢弃无用的前导空格检查下一个字符&#xff08;假设还…

以太网协议与DNS

以太网协议 以太网协议DNS 以太网协议 以太网用于在计算机和其他网络设备之间传输数据,以太网既包含了数据链路层的内容,也包含了物理层的内容. 以太网数据报: 其中目的IP和源IP不是网络层的目的IP和源IP,而是mac地址.网络层的主要负责是整体的转发过程,数据链路层负责的是局…

分库分表及ShardingShpere-proxy数据分片

为什么需要分库&#xff1f; 随着数据量的急速上升&#xff0c;单个数据库可能会QPS过高导致读写耗时过长而出现性能瓶颈&#xff0c;所以需要考虑拆分数据库&#xff0c;将数据库分布在不同实例上提升数据库可用性。主要的原因有如下&#xff1a; 磁盘存储。业务量剧增&…

bugku--Simple_SSTI_1---2

第一题 看到一句话&#xff0c;需要传入一个传参为flag 设置一个变量为 secret_key 构造paykoad /?flagsecret_key 但是发现什么都没有 SSTI模版注入嘛 这里使用的是flask模版 Flask提供了一个名为config的全局对象&#xff0c;可以用来设置和获取全局变量。 继续构造pa…

揭示 ETL 系统架构中的 OLAP、OLTP 和 HTAP

探索 ETL 系统设计需要了解 OLAP、OLTP 和不断发展的 HTAP。让我们试图剖析这些范式的复杂性。 1. OLAP&#xff08;联机分析处理&#xff09;&#xff1a; OLAP 是商业智能的中流砥柱&#xff0c;通过 OLAP 立方体进行多维数据分析。这些立方体封装了预先聚合、预先计算的数据…

关键点检测之修改labelme标注的json中类别名

import json import os import shutil#source_dir表示数据扩增之后的文件夹路径&#xff0c;此时标注的是多分类的标签 #new_dir表示转化之后得到的二分类文件夹def to2class():#json存放路径source_dir r1#json保存路径new_dir r1for i in os.listdir(source_dir):if i.ends…

人工智能_机器学习065_SVM支持向量机KKT条件_深度理解KKT条件下的损失函数求解过程_公式详细推导_---人工智能工作笔记0105

之前我们已经说了KKT条件,其实就是用来解决 如何实现对,不等式条件下的,目标函数的求解问题,之前我们说的拉格朗日乘数法,是用来对 等式条件下的目标函数进行求解. KKT条件是这样做的,添加了一个阿尔法平方对吧,这个阿尔法平方肯定是大于0的,那么 可以结合下面的文章去看,也…