Spark Explain: Viewing Execution Plans

2024/12/19 9:28:27

The Spark SQL explain method supports five modes: simple, extended, codegen, cost, and formatted. Details follow.

Contents

    • 1. Basic Syntax
    • 2. Execution Plan Processing Flow
    • 3. Worked Example

1. Basic Syntax

Starting with Spark 3.0, explain takes a new mode parameter that specifies the format in which the execution plan is shown:

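  • Show only the physical plan; simple is the default mode
    • spark.sql(sqlstr).explain()
  • Show the logical plans (parsed, analyzed, optimized) and the physical plan
    • spark.sql(sqlstr).explain(mode="extended")
  • Show the executable Java code generated by Codegen
    • spark.sql(sqlstr).explain(mode="codegen")
  • Show the optimized logical plan together with its statistics, if they are available
    • spark.sql(sqlstr).explain(mode="cost")
  • Show a formatted, more readable physical plan with the details of each node
    • spark.sql(sqlstr).explain(mode="formatted")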
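To compare the five modes side by side, it can be handy to capture each explain call as a string instead of reading it off the console. The sketch below assumes PySpark 3.x, where DataFrame.explain writes the plan with Python's print, so contextlib.redirect_stdout can capture it. The helper names explain_to_string and explain_all_modes are hypothetical and not part of Spark.

```python
import contextlib
import io

# The five modes accepted by explain(mode=...) since Spark 3.0
EXPLAIN_MODES = ("simple", "extended", "codegen", "cost", "formatted")


def explain_to_string(df, mode="simple"):
    """Capture what df.explain(mode=...) prints.

    Assumes PySpark 3.x, where DataFrame.explain writes the plan with
    Python's print(); anything the JVM writes directly is not captured.
    This sketch only accepts the five lowercase mode names.
    """
    if mode not in EXPLAIN_MODES:
        raise ValueError(f"unsupported explain mode: {mode!r}")
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain(mode=mode)
    return buffer.getvalue()


def explain_all_modes(df):
    """Return {mode: plan text} for every mode, e.g. to diff them or save them to files."""
    return {mode: explain_to_string(df, mode) for mode in EXPLAIN_MODES}
```

With a live session this would be called as plans = explain_all_modes(spark.sql(sqlstr)), after which plans["formatted"] holds the formatted plan text.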

2. Execution Plan Processing Flow

(Figure: execution plan processing flow)

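  • Flow
    • The SQL statement or DataFrame operation first produces an Unresolved Logical Plan. At this stage the query is only checked syntactically, for example whether the keywords are valid.
    • The plan is then analyzed against the Catalog, which verifies that the referenced tables and columns exist, producing a resolved Logical Plan. At this point the plan could already be executed as is.
    • However, the same result can be computed with different operations and in different orders, so the Logical Optimization phase automatically rewrites the plan into an Optimized Logical Plan.
    • The optimized logical plan is then converted into Physical Plans, and the Cost Model (cost-based selection, i.e. CBO) is used to pick a physical plan with a lower cost.
    • Finally, code generation produces executable Java code, and the plan is run as RDDs.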
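The Catalog check during analysis, and one typical optimizer rewrite (column pruning, visible as the Project nodes directly above each Relation in the Optimized Logical Plan below), can be mimicked on plain Python data. This is a toy sketch only: the in-memory CATALOG dict and the analyze / prune_columns functions are hypothetical stand-ins for Spark's Catalog and Catalyst rules, not Spark APIs. The column lists are copied from the Relation nodes of the two example tables.

```python
# Hypothetical catalog: table name -> column names, copied from the
# Relation nodes of the two example tables in this article.
CATALOG = {
    "sale_course": [
        "chapterid", "chaptername", "courseid", "coursemanager", "coursename",
        "edusubjectid", "edusubjectname", "majorid", "majorname", "money",
        "pointlistid", "status", "teacherid", "teachername", "dt", "dn",
    ],
    "course_shopping_cart": [
        "courseid", "coursename", "createtime", "discount",
        "orderid", "sellmoney", "dt", "dn",
    ],
}


def analyze(table, referenced, catalog=CATALOG):
    """Analysis step: fail unless the table and every referenced column exist."""
    if table not in catalog:
        raise LookupError(f"table not found: {table}")
    missing = [c for c in referenced if c not in catalog[table]]
    if missing:
        raise LookupError(f"cannot resolve columns {missing} in {table}")
    # A resolved relation still exposes every column of the table.
    return {"table": table, "output": list(catalog[table])}


def prune_columns(relation, referenced):
    """Optimization step: keep only the columns the query actually uses."""
    wanted = set(referenced)
    return {"table": relation["table"],
            "output": [c for c in relation["output"] if c in wanted]}


sc = analyze("sale_course", ["courseid", "coursename", "dt", "dn"])
print(len(sc["output"]))  # 16
print(prune_columns(sc, ["courseid", "coursename", "dt", "dn"])["output"])
# ['courseid', 'coursename', 'dt', 'dn']
```

The pruned list matches Project [courseid#3L, coursename#5, dt#15, dn#16] in the optimized plan; Spark additionally assigns each attribute an expression ID (the #3L, #5 suffixes), which this sketch leaves out.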

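In more detail, the process can be divided into 5 steps:
(Figure: the 5 steps of plan processing)

The plans that explain prints for each stage of the following query are shown below.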

select
  sc.courseid,
  sc.coursename,
  sum(sellmoney) as totalsell
from sale_course sc join course_shopping_cart csc
  on sc.courseid=csc.courseid and sc.dt=csc.dt and sc.dn=csc.dn
group by sc.courseid,sc.coursename
 
# Unresolved logical plan
== Parsed Logical Plan ==
'Aggregate ['sc.courseid, 'sc.coursename], ['sc.courseid, 'sc.coursename, 'sum('sellmoney) AS totalsell#38]
+- 'Join Inner, ((('sc.courseid = 'csc.courseid) AND ('sc.dt = 'csc.dt)) AND ('sc.dn = 'csc.dn))
   :- 'SubqueryAlias sc
   :  +- 'UnresolvedRelation [sale_course], [], false
   +- 'SubqueryAlias csc
      +- 'UnresolvedRelation [course_shopping_cart], [], false
 
# Resolved logical plan
== Analyzed Logical Plan ==
courseid: bigint, coursename: string, totalsell: double
Aggregate [courseid#3L, coursename#5], [courseid#3L, coursename#5, sum(cast(sellmoney#22 as double)) AS totalsell#38]
+- Join Inner, (((courseid#3L = courseid#17L) AND (dt#15 = dt#23)) AND (dn#16 = dn#24))
   :- SubqueryAlias sc
   :  +- SubqueryAlias spark_catalog.spark_optimize.sale_course
   :     +- Relation spark_optimize.sale_course[chapterid#1L,chaptername#2,courseid#3L,coursemanager#4,coursename#5,edusubjectid#6L,edusubjectname#7,majorid#8L,majorname#9,money#10,pointlistid#11L,status#12,teacherid#13L,teachername#14,dt#15,dn#16] parquet
   +- SubqueryAlias csc
      +- SubqueryAlias spark_catalog.spark_optimize.course_shopping_cart
         +- Relation spark_optimize.course_shopping_cart[courseid#17L,coursename#18,createtime#19,discount#20,orderid#21,sellmoney#22,dt#23,dn#24] parquet
 
# Optimized logical plan
== Optimized Logical Plan ==
Aggregate [courseid#3L, coursename#5], [courseid#3L, coursename#5, sum(cast(sellmoney#22 as double)) AS totalsell#38]
+- Project [courseid#3L, coursename#5, sellmoney#22]
   +- Join Inner, (((courseid#3L = courseid#17L) AND (dt#15 = dt#23)) AND (dn#16 = dn#24))
      :- Project [courseid#3L, coursename#5, dt#15, dn#16]
      :  +- Filter ((isnotnull(courseid#3L) AND isnotnull(dt#15)) AND isnotnull(dn#16))
      :     +- Relation spark_optimize.sale_course[chapterid#1L,chaptername#2,courseid#3L,coursemanager#4,coursename#5,edusubjectid#6L,edusubjectname#7,majorid#8L,majorname#9,money#10,pointlistid#11L,status#12,teacherid#13L,teachername#14,dt#15,dn#16] parquet
      +- Project [courseid#17L, sellmoney#22, dt#23, dn#24]
         +- Filter ((isnotnull(courseid#17L) AND isnotnull(dt#23)) AND isnotnull(dn#24))
            +- Relation spark_optimize.course_shopping_cart[courseid#17L,coursename#18,createtime#19,discount#20,orderid#21,sellmoney#22,dt#23,dn#24] parquet
 
# Physical plan
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
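# HashAggregate performs aggregation and usually appears in pairs: the first one partially aggregates the data local to each node, and the second one merges the partial results from all partitions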
+- HashAggregate(keys=[courseid#3L, coursename#5], functions=[sum(cast(sellmoney#22 as double))], output=[courseid#3L, coursename#5, totalsell#38])
  # Exchange is a shuffle that moves data across the cluster; the two HashAggregate nodes are often separated by an Exchange
   +- Exchange hashpartitioning(courseid#3L, coursename#5, 200), ENSURE_REQUIREMENTS, [id=#127]
      +- HashAggregate(keys=[courseid#3L, coursename#5], functions=[partial_sum(cast(sellmoney#22 as double))], output=[courseid#3L, coursename#5, sum#44])
        # Project is the SQL projection operation, i.e. selecting columns
         +- Project [courseid#3L, coursename#5, sellmoney#22]
          # BroadcastHashJoin performs a hash join by broadcasting one side
            +- BroadcastHashJoin [courseid#3L, dt#15, dn#16], [courseid#17L, dt#23, dn#24], Inner, BuildLeft, false
               :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false], input[2, string, true], input[3, string, true]),false), [id=#122]
               :  +- Filter isnotnull(courseid#3L)
               :     +- FileScan parquet spark_optimize.sale_course[courseid#3L,coursename#5,dt#15,dn#16] Batched: true, DataFilters: [isnotnull(courseid#3L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/sale_course..., PartitionFilters: [isnotnull(dt#15), isnotnull(dn#16)], PushedFilters: [IsNotNull(courseid)], ReadSchema: struct<courseid:bigint,coursename:string>
               +- Filter isnotnull(courseid#17L)
                  +- FileScan parquet spark_optimize.course_shopping_cart[courseid#17L,sellmoney#22,dt#23,dn#24] Batched: true, DataFilters: [isnotnull(courseid#17L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/course_shop..., PartitionFilters: [isnotnull(dt#23), isnotnull(dn#24)], PushedFilters: [IsNotNull(courseid)], ReadSchema: struct<courseid:bigint,sellmoney:string>
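The HashAggregate / Exchange / HashAggregate pattern in the physical plan above can be simulated in plain Python to see why the aggregation is split in two: the partial step shrinks each partition to one row per key before the shuffle, so less data has to move. This is a minimal sketch assuming the rows are already divided into input partitions and every sellmoney string is a valid number (NULL handling is not modeled). Python's built-in hash stands in for Spark's own hash function, and 200 mirrors the partition count in hashpartitioning(..., 200), which is the default of spark.sql.shuffle.partitions.

```python
from collections import defaultdict


def partial_sum(rows):
    """Lower HashAggregate (partial_sum): aggregate one partition's rows locally.

    rows: iterable of ((courseid, coursename), sellmoney_as_string).
    float() stands in for cast(sellmoney as double); this sketch assumes
    every value parses as a number.
    """
    acc = defaultdict(float)
    for key, sellmoney in rows:
        acc[key] += float(sellmoney)
    return acc


def exchange(partials, num_partitions=200):
    """Exchange hashpartitioning(key, num_partitions): route each partial result by key hash."""
    buckets = [[] for _ in range(num_partitions)]
    for acc in partials:
        for key, subtotal in acc.items():
            buckets[hash(key) % num_partitions].append((key, subtotal))
    return buckets


def final_sum(bucket):
    """Upper HashAggregate (sum): merge the partial sums that landed in one bucket."""
    acc = defaultdict(float)
    for key, subtotal in bucket:
        acc[key] += subtotal
    return dict(acc)


def two_phase_sum(partitions, num_partitions=200):
    partials = [partial_sum(p) for p in partitions]       # once per input partition
    result = {}
    for bucket in exchange(partials, num_partitions):      # the shuffle
        result.update(final_sum(bucket))                   # once per shuffle partition
    return result
```

For example, two_phase_sum([[((1, "Spark"), "10.0"), ((2, "Flink"), "5")], [((1, "Spark"), "2.5")]], num_partitions=4) returns {(1, "Spark"): 12.5, (2, "Flink"): 5.0}, with key order depending on the bucket each key hashes to. Because all partial results for one key hash to the same bucket, each final_sum call sees complete groups.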

3. Worked Example

Test SQL

select
  sc.courseid,
  sc.coursename,
  sum(sellmoney) as totalsell
from sale_course sc join course_shopping_cart csc
  on sc.courseid=csc.courseid and sc.dt=csc.dt and sc.dn=csc.dn
group by sc.courseid,sc.coursename
  • Show only the physical plan; simple is the default mode
    • spark.sql(sqlstr).explain() / spark.sql(sqlstr).explain("simple")
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[courseid#3L, coursename#5], functions=[sum(cast(sellmoney#22 as double))])
   +- Exchange hashpartitioning(courseid#3L, coursename#5, 200), ENSURE_REQUIREMENTS, [id=#43]
      +- HashAggregate(keys=[courseid#3L, coursename#5], functions=[partial_sum(cast(sellmoney#22 as double))])
         +- Project [courseid#3L, coursename#5, sellmoney#22]
            +- BroadcastHashJoin [courseid#3L, dt#15, dn#16], [courseid#17L, dt#23, dn#24], Inner, BuildLeft, false
               :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false], input[2, string, true], input[3, string, true]),false), [id=#38]
               :  +- Filter isnotnull(courseid#3L)
               :     +- FileScan parquet spark_optimize.sale_course[courseid#3L,coursename#5,dt#15,dn#16] Batched: true, DataFilters: [isnotnull(courseid#3L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/sale_course..., PartitionFilters: [isnotnull(dt#15), isnotnull(dn#16)], PushedFilters: [IsNotNull(courseid)], ReadSchema: struct<courseid:bigint,coursename:string>
               +- Filter isnotnull(courseid#17L)
                  +- FileScan parquet spark_optimize.course_shopping_cart[courseid#17L,sellmoney#22,dt#23,dn#24] Batched: true, DataFilters: [isnotnull(courseid#17L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/course_shop..., PartitionFilters: [isnotnull(dt#23), isnotnull(dn#24)], PushedFilters: [IsNotNull(courseid)], ReadSchema: struct<courseid:bigint,sellmoney:string>
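In the plan above, BroadcastHashJoin ... BuildLeft means the left side (the sale_course scan, wrapped in BroadcastExchange) is the build side: it is turned into a hash table keyed on the join keys and broadcast to every executor, which then streams its own course_shopping_cart rows through that table without shuffling them. The single-process sketch below shows only the build/probe idea, assuming rows are plain dicts; the function and the sample rows in the usage note are hypothetical.

```python
from collections import defaultdict


def broadcast_hash_join(build_rows, stream_rows, build_keys, stream_keys):
    """Inner equi-join: hash the (small) build side once, then probe it row by row.

    Returns (build_row, stream_row) pairs so that columns with the same
    name on both sides (e.g. coursename) are not overwritten.
    """
    # Build phase: this table is what would be broadcast to every executor.
    table = defaultdict(list)
    for row in build_rows:
        key = tuple(row[k] for k in build_keys)
        if None in key:  # NULL keys never match in an inner equi-join
            continue
        table[key].append(row)

    # Probe phase: each stream-side row looks up its key; this side is never shuffled.
    joined = []
    for row in stream_rows:
        key = tuple(row[k] for k in stream_keys)
        for match in table.get(key, []):
            joined.append((match, row))
    return joined
```

Called with the join keys of the test SQL, broadcast_hash_join(sale_rows, cart_rows, ("courseid", "dt", "dn"), ("courseid", "dt", "dn")) returns one pair per matching (sale_course, course_shopping_cart) row combination. Broadcasting only pays off when the build side is small enough to fit in each executor's memory.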
  • Show the logical plans and the physical plan
    • spark.sql(sqlstr).explain(mode="extended")
== Parsed Logical Plan ==
'Aggregate ['sc.courseid, 'sc.coursename], ['sc.courseid, 'sc.coursename, 'sum('sellmoney) AS totalsell#0]
+- 'Join Inner, ((('sc.courseid = 'csc.courseid) AND ('sc.dt = 'csc.dt)) AND ('sc.dn = 'csc.dn))
   :- 'SubqueryAlias sc
   :  +- 'UnresolvedRelation [sale_course], [], false
   +- 'SubqueryAlias csc
      +- 'UnresolvedRelation [course_shopping_cart], [], false
 
== Analyzed Logical Plan ==
courseid: bigint, coursename: string, totalsell: double
Aggregate [courseid#3L, coursename#5], [courseid#3L, coursename#5, sum(cast(sellmoney#22 as double)) AS totalsell#0]
+- Join Inner, (((courseid#3L = courseid#17L) AND (dt#15 = dt#23)) AND (dn#16 = dn#24))
   :- SubqueryAlias sc
   :  +- SubqueryAlias spark_catalog.spark_optimize.sale_course
   :     +- Relation spark_optimize.sale_course[chapterid#1L,chaptername#2,courseid#3L,coursemanager#4,coursename#5,edusubjectid#6L,edusubjectname#7,majorid#8L,majorname#9,money#10,pointlistid#11L,status#12,teacherid#13L,teachername#14,dt#15,dn#16] parquet
   +- SubqueryAlias csc
      +- SubqueryAlias spark_catalog.spark_optimize.course_shopping_cart
         +- Relation spark_optimize.course_shopping_cart[courseid#17L,coursename#18,createtime#19,discount#20,orderid#21,sellmoney#22,dt#23,dn#24] parquet
 
== Optimized Logical Plan ==
Aggregate [courseid#3L, coursename#5], [courseid#3L, coursename#5, sum(cast(sellmoney#22 as double)) AS totalsell#0]
+- Project [courseid#3L, coursename#5, sellmoney#22]
   +- Join Inner, (((courseid#3L = courseid#17L) AND (dt#15 = dt#23)) AND (dn#16 = dn#24))
      :- Project [courseid#3L, coursename#5, dt#15, dn#16]
      :  +- Filter ((isnotnull(courseid#3L) AND isnotnull(dt#15)) AND isnotnull(dn#16))
      :     +- Relation spark_optimize.sale_course[chapterid#1L,chaptername#2,courseid#3L,coursemanager#4,coursename#5,edusubjectid#6L,edusubjectname#7,majorid#8L,majorname#9,money#10,pointlistid#11L,status#12,teacherid#13L,teachername#14,dt#15,dn#16] parquet
      +- Project [courseid#17L, sellmoney#22, dt#23, dn#24]
         +- Filter ((isnotnull(courseid#17L) AND isnotnull(dt#23)) AND isnotnull(dn#24))
            +- Relation spark_optimize.course_shopping_cart[courseid#17L,coursename#18,createtime#19,discount#20,orderid#21,sellmoney#22,dt#23,dn#24] parquet
 
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[courseid#3L, coursename#5], functions=[sum(cast(sellmoney#22 as double))], output=[courseid#3L, coursename#5, totalsell#0])
   +- Exchange hashpartitioning(courseid#3L, coursename#5, 200), ENSURE_REQUIREMENTS, [id=#43]
      +- HashAggregate(keys=[courseid#3L, coursename#5], functions=[partial_sum(cast(sellmoney#22 as double))], output=[courseid#3L, coursename#5, sum#30])
         +- Project [courseid#3L, coursename#5, sellmoney#22]
            +- BroadcastHashJoin [courseid#3L, dt#15, dn#16], [courseid#17L, dt#23, dn#24], Inner, BuildLeft, false
               :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false], input[2, string, true], input[3, string, true]),false), [id=#38]
               :  +- Filter isnotnull(courseid#3L)
               :     +- FileScan parquet spark_optimize.sale_course[courseid#3L,coursename#5,dt#15,dn#16] Batched: true, DataFilters: [isnotnull(courseid#3L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/sale_course..., PartitionFilters: [isnotnull(dt#15), isnotnull(dn#16)], PushedFilters: [IsNotNull(courseid)], ReadSchema: struct<courseid:bigint,coursename:string>
               +- Filter isnotnull(courseid#17L)
                  +- FileScan parquet spark_optimize.course_shopping_cart[courseid#17L,sellmoney#22,dt#23,dn#24] Batched: true, DataFilters: [isnotnull(courseid#17L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/course_shop..., PartitionFilters: [isnotnull(dt#23), isnotnull(dn#24)], PushedFilters: [IsNotNull(courseid)], ReadSchema: struct<courseid:bigint,sellmoney:string>
  • Show the executable Java code generated by Codegen
    • spark.sql(sqlstr).explain(mode="codegen")
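In the codegen output that follows, the operators marked *(1) (ColumnarToRow, Filter, Project) are fused into a single generated class: its processNext() walks each columnar batch row by row, skips rows whose courseid is null, and writes the remaining values into an UnsafeRow, with no per-operator iterator calls in between. The Python sketch below contrasts that fused loop with a chain of separate iterator operators, the style that whole-stage codegen avoids. It illustrates only the control flow, assuming a batch is a dict of equal-length column lists; the function names are hypothetical.

```python
COLUMNS = ("courseid", "coursename", "dt", "dn")


def volcano_style(batch):
    """One generator per operator; each row is pulled through three iterator layers."""
    n = len(batch["courseid"])
    to_rows = (tuple(batch[c][i] for c in COLUMNS) for i in range(n))  # ColumnarToRow
    filtered = (row for row in to_rows if row[0] is not None)          # Filter isnotnull(courseid)
    projected = (row[:4] for row in filtered)                           # Project keeps the 4 scanned columns
    return list(projected)


def fused_stage(batch):
    """Shape of the generated processNext(): one loop with the operators inlined."""
    out = []
    courseid = batch["courseid"]
    for i in range(len(courseid)):
        value = courseid[i]
        if value is None:  # the Filter becomes a plain `continue`
            continue
        out.append((value, batch["coursename"][i], batch["dt"][i], batch["dn"][i]))
    return out
```

Both functions return the same rows; the benefit of fusion in Spark comes from the JIT-compiled tight loop and the absence of virtual iterator calls, which a Python sketch cannot reproduce.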
Found 3 WholeStageCodegen subtrees.
== Subtree 1 / 3 (maxMethodCodeSize:409; maxConstantPoolSize:139(0.21% used); numInnerClasses:0) ==
*(1) Project [courseid#3L, coursename#5, dt#15, dn#16]
+- *(1) Filter isnotnull(courseid#3L)
   +- *(1) ColumnarToRow
      +- FileScan parquet spark_optimize.sale_course[courseid#3L,coursename#5,dt#15,dn#16] Batched: true, DataFilters: [isnotnull(courseid#3L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/sale_course/dt=20190..., PartitionFilters: [isnotnull(dt#15), isnotnull(dn#16)], PushedFilters: [IsNotNull(courseid)], ReadSchema: struct<courseid:bigint,coursename:string>
 
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private int columnartorow_batchIdx_0;
/* 010 */   private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] columnartorow_mutableStateArray_2 = new org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[4];
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] columnartorow_mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[3];
/* 012 */   private org.apache.spark.sql.vectorized.ColumnarBatch[] columnartorow_mutableStateArray_1 = new org.apache.spark.sql.vectorized.ColumnarBatch[1];
/* 013 */   private scala.collection.Iterator[] columnartorow_mutableStateArray_0 = new scala.collection.Iterator[1];
/* 014 */
/* 015 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 016 */     this.references = references;
/* 017 */   }
/* 018 */
/* 019 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 020 */     partitionIndex = index;
/* 021 */     this.inputs = inputs;
/* 022 */     columnartorow_mutableStateArray_0[0] = inputs[0];
/* 023 */
/* 024 */     columnartorow_mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 96);
/* 025 */     columnartorow_mutableStateArray_3[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 96);
/* 026 */     columnartorow_mutableStateArray_3[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 96);
/* 027 */
/* 028 */   }
/* 029 */
/* 030 */   private void columnartorow_nextBatch_0() throws java.io.IOException {
/* 031 */     if (columnartorow_mutableStateArray_0[0].hasNext()) {
/* 032 */       columnartorow_mutableStateArray_1[0] = (org.apache.spark.sql.vectorized.ColumnarBatch)columnartorow_mutableStateArray_0[0].next();
/* 033 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numInputBatches */).add(1);
/* 034 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(columnartorow_mutableStateArray_1[0].numRows());
/* 035 */       columnartorow_batchIdx_0 = 0;
/* 036 */       columnartorow_mutableStateArray_2[0] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(0);
/* 037 */       columnartorow_mutableStateArray_2[1] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(1);
/* 038 */       columnartorow_mutableStateArray_2[2] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(2);
/* 039 */       columnartorow_mutableStateArray_2[3] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(3);
/* 040 */
/* 041 */     }
/* 042 */   }
/* 043 */
/* 044 */   protected void processNext() throws java.io.IOException {
/* 045 */     if (columnartorow_mutableStateArray_1[0] == null) {
/* 046 */       columnartorow_nextBatch_0();
/* 047 */     }
/* 048 */     while ( columnartorow_mutableStateArray_1[0] != null) {
/* 049 */       int columnartorow_numRows_0 = columnartorow_mutableStateArray_1[0].numRows();
/* 050 */       int columnartorow_localEnd_0 = columnartorow_numRows_0 - columnartorow_batchIdx_0;
/* 051 */       for (int columnartorow_localIdx_0 = 0; columnartorow_localIdx_0 < columnartorow_localEnd_0; columnartorow_localIdx_0++) {
/* 052 */         int columnartorow_rowIdx_0 = columnartorow_batchIdx_0 + columnartorow_localIdx_0;
/* 053 */         do {
/* 054 */           boolean columnartorow_isNull_0 = columnartorow_mutableStateArray_2[0].isNullAt(columnartorow_rowIdx_0);
/* 055 */           long columnartorow_value_0 = columnartorow_isNull_0 ? -1L : (columnartorow_mutableStateArray_2[0].getLong(columnartorow_rowIdx_0));
/* 056 */
/* 057 */           boolean filter_value_2 = !columnartorow_isNull_0;
/* 058 */           if (!filter_value_2) continue;
/* 059 */
/* 060 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1);
/* 061 */
/* 062 */           boolean columnartorow_isNull_1 = columnartorow_mutableStateArray_2[1].isNullAt(columnartorow_rowIdx_0);
/* 063 */           UTF8String columnartorow_value_1 = columnartorow_isNull_1 ? null : (columnartorow_mutableStateArray_2[1].getUTF8String(columnartorow_rowIdx_0));
/* 064 */           boolean columnartorow_isNull_2 = columnartorow_mutableStateArray_2[2].isNullAt(columnartorow_rowIdx_0);
/* 065 */           UTF8String columnartorow_value_2 = columnartorow_isNull_2 ? null : (columnartorow_mutableStateArray_2[2].getUTF8String(columnartorow_rowIdx_0));
/* 066 */           boolean columnartorow_isNull_3 = columnartorow_mutableStateArray_2[3].isNullAt(columnartorow_rowIdx_0);
/* 067 */           UTF8String columnartorow_value_3 = columnartorow_isNull_3 ? null : (columnartorow_mutableStateArray_2[3].getUTF8String(columnartorow_rowIdx_0));
/* 068 */           columnartorow_mutableStateArray_3[2].reset();
/* 069 */
/* 070 */           columnartorow_mutableStateArray_3[2].zeroOutNullBytes();
/* 071 */
/* 072 */           if (false) {
/* 073 */             columnartorow_mutableStateArray_3[2].setNullAt(0);
/* 074 */           } else {
/* 075 */             columnartorow_mutableStateArray_3[2].write(0, columnartorow_value_0);
/* 076 */           }
/* 077 */
/* 078 */           if (columnartorow_isNull_1) {
/* 079 */             columnartorow_mutableStateArray_3[2].setNullAt(1);
/* 080 */           } else {
/* 081 */             columnartorow_mutableStateArray_3[2].write(1, columnartorow_value_1);
/* 082 */           }
/* 083 */
/* 084 */           if (columnartorow_isNull_2) {
/* 085 */             columnartorow_mutableStateArray_3[2].setNullAt(2);
/* 086 */           } else {
/* 087 */             columnartorow_mutableStateArray_3[2].write(2, columnartorow_value_2);
/* 088 */           }
/* 089 */
/* 090 */           if (columnartorow_isNull_3) {
/* 091 */             columnartorow_mutableStateArray_3[2].setNullAt(3);
/* 092 */           } else {
/* 093 */             columnartorow_mutableStateArray_3[2].write(3, columnartorow_value_3);
/* 094 */           }
/* 095 */           append((columnartorow_mutableStateArray_3[2].getRow()));
/* 096 */
/* 097 */         } while(false);
/* 098 */         if (shouldStop()) { columnartorow_batchIdx_0 = columnartorow_rowIdx_0 + 1; return; }
/* 099 */       }
/* 100 */       columnartorow_batchIdx_0 = columnartorow_numRows_0;
/* 101 */       columnartorow_mutableStateArray_1[0] = null;
/* 102 */       columnartorow_nextBatch_0();
/* 103 */     }
/* 104 */   }
/* 105 */
/* 106 */ }
 
== Subtree 2 / 3 (maxMethodCodeSize:541; maxConstantPoolSize:351(0.54% used); numInnerClasses:1) ==
*(2) HashAggregate(keys=[courseid#3L, coursename#5], functions=[partial_sum(cast(sellmoney#22 as double))], output=[courseid#3L, coursename#5, sum#30])
+- *(2) Project [courseid#3L, coursename#5, sellmoney#22]
   +- *(2) BroadcastHashJoin [courseid#3L, dt#15, dn#16], [courseid#17L, dt#23, dn#24], Inner, BuildLeft
      :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true], input[2, string, true], input[3, string, true])), [id=#57]
      :  +- *(1) Project [courseid#3L, coursename#5, dt#15, dn#16]
      :     +- *(1) Filter isnotnull(courseid#3L)
      :        +- *(1) ColumnarToRow
      :           +- FileScan parquet spark_optimize.sale_course[courseid#3L,coursename#5,dt#15,dn#16] Batched: true, DataFilters: [isnotnull(courseid#3L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/sale_course/dt=20190..., PartitionFilters: [isnotnull(dt#15), isnotnull(dn#16)], PushedFilters: [IsNotNull(courseid)], ReadSchema: struct<courseid:bigint,coursename:string>
      +- *(2) Project [courseid#17L, sellmoney#22, dt#23, dn#24]
         +- *(2) Filter isnotnull(courseid#17L)
            +- *(2) ColumnarToRow
               +- FileScan parquet spark_optimize.course_shopping_cart[courseid#17L,sellmoney#22,dt#23,dn#24] Batched: true, DataFilters: [isnotnull(courseid#17L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/course_shopping_cart..., PartitionFilters: [isnotnull(dt#23), isnotnull(dn#24)], PushedFilters: [IsNotNull(courseid)], ReadSchema: struct<courseid:bigint,sellmoney:string>
 
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean agg_initAgg_0;
/* 010 */   private boolean agg_bufIsNull_0;
/* 011 */   private double agg_bufValue_0;
/* 012 */   private agg_FastHashMap_0 agg_fastHashMap_0;
/* 013 */   private org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> agg_fastHashMapIter_0;
/* 014 */   private org.apache.spark.unsafe.KVIterator agg_mapIter_0;
/* 015 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap_0;
/* 016 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter_0;
/* 017 */   private int columnartorow_batchIdx_0;
/* 018 */   private org.apache.spark.sql.execution.joins.UnsafeHashedRelation bhj_relation_0;
/* 019 */   private boolean agg_agg_isNull_8_0;
/* 020 */   private boolean agg_agg_isNull_10_0;
/* 021 */   private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] columnartorow_mutableStateArray_2 = new org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[4];
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] columnartorow_mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[9];
/* 023 */   private org.apache.spark.sql.vectorized.ColumnarBatch[] columnartorow_mutableStateArray_1 = new org.apache.spark.sql.vectorized.ColumnarBatch[1];
/* 024 */   private scala.collection.Iterator[] columnartorow_mutableStateArray_0 = new scala.collection.Iterator[1];
/* 025 */
/* 026 */   public GeneratedIteratorForCodegenStage2(Object[] references) {
/* 027 */     this.references = references;
/* 028 */   }
/* 029 */
/* 030 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */     partitionIndex = index;
/* 032 */     this.inputs = inputs;
/* 033 */     wholestagecodegen_init_0_0();
/* 034 */     wholestagecodegen_init_0_1();
/* 035 */
/* 036 */   }
/* 037 */
/* 038 */   public class agg_FastHashMap_0 {
/* 039 */     private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
/* 040 */     private int[] buckets;
/* 041 */     private int capacity = 1 << 16;
/* 042 */     private double loadFactor = 0.5;
/* 043 */     private int numBuckets = (int) (capacity / loadFactor);
/* 044 */     private int maxSteps = 2;
/* 045 */     private int numRows = 0;
/* 046 */     private Object emptyVBase;
/* 047 */     private long emptyVOff;
/* 048 */     private int emptyVLen;
/* 049 */     private boolean isBatchFull = false;
/* 050 */     private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 051 */
/* 052 */     public agg_FastHashMap_0(
/* 053 */       org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
/* 054 */       InternalRow emptyAggregationBuffer) {
/* 055 */       batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
/* 056 */       .allocate(((org.apache.spark.sql.types.StructType) references[1] /* keySchemaTerm */), ((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */), taskMemoryManager, capacity);
/* 057 */
/* 058 */       final UnsafeProjection valueProjection = UnsafeProjection.create(((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */));
/* 059 */       final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
/* 060 */
/* 061 */       emptyVBase = emptyBuffer;
/* 062 */       emptyVOff = Platform.BYTE_ARRAY_OFFSET;
/* 063 */       emptyVLen = emptyBuffer.length;
/* 064 */
/* 065 */       agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
/* 066 */         2, 32);
/* 067 */
/* 068 */       buckets = new int[numBuckets];
/* 069 */       java.util.Arrays.fill(buckets, -1);
/* 070 */     }
/* 071 */
/* 072 */     public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(long agg_key_0, UTF8String agg_key_1) {
/* 073 */       long h = hash(agg_key_0, agg_key_1);
/* 074 */       int step = 0;
/* 075 */       int idx = (int) h & (numBuckets - 1);
/* 076 */       while (step < maxSteps) {
/* 077 */         // Return bucket index if it's either an empty slot or already contains the key
/* 078 */         if (buckets[idx] == -1) {
/* 079 */           if (numRows < capacity && !isBatchFull) {
/* 080 */             agg_rowWriter.reset();
/* 081 */             agg_rowWriter.zeroOutNullBytes();
/* 082 */             agg_rowWriter.write(0, agg_key_0);
/* 083 */             agg_rowWriter.write(1, agg_key_1);
/* 084 */             org.apache.spark.sql.catalyst.expressions.UnsafeRow agg_result
/* 085 */             = agg_rowWriter.getRow();
/* 086 */             Object kbase = agg_result.getBaseObject();
/* 087 */             long koff = agg_result.getBaseOffset();
/* 088 */             int klen = agg_result.getSizeInBytes();
/* 089 */
/* 090 */             UnsafeRow vRow
/* 091 */             = batch.appendRow(kbase, koff, klen, emptyVBase, emptyVOff, emptyVLen);
/* 092 */             if (vRow == null) {
/* 093 */               isBatchFull = true;
/* 094 */             } else {
/* 095 */               buckets[idx] = numRows++;
/* 096 */             }
/* 097 */             return vRow;
/* 098 */           } else {
/* 099 */             // No more space
/* 100 */             return null;
/* 101 */           }
/* 102 */         } else if (equals(idx, agg_key_0, agg_key_1)) {
/* 103 */           return batch.getValueRow(buckets[idx]);
/* 104 */         }
/* 105 */         idx = (idx + 1) & (numBuckets - 1);
/* 106 */         step++;
/* 107 */       }
/* 108 */       // Didn't find it
/* 109 */       return null;
/* 110 */     }
/* 111 */
/* 112 */     private boolean equals(int idx, long agg_key_0, UTF8String agg_key_1) {
/* 113 */       UnsafeRow row = batch.getKeyRow(buckets[idx]);
/* 114 */       return (row.getLong(0) == agg_key_0) && (row.getUTF8String(1).equals(agg_key_1));
/* 115 */     }
/* 116 */
/* 117 */     private long hash(long agg_key_0, UTF8String agg_key_1) {
/* 118 */       long agg_hash_0 = 0;
/* 119 */
/* 120 */       long agg_result_0 = agg_key_0;
/* 121 */       agg_hash_0 = (agg_hash_0 ^ (0x9e3779b9)) + agg_result_0 + (agg_hash_0 << 6) + (agg_hash_0 >>> 2);
/* 122 */
/* 123 */       int agg_result_1 = 0;
/* 124 */       byte[] agg_bytes_0 = agg_key_1.getBytes();
/* 125 */       for (int i = 0; i < agg_bytes_0.length; i++) {
/* 126 */         int agg_hash_1 = agg_bytes_0[i];
/* 127 */         agg_result_1 = (agg_result_1 ^ (0x9e3779b9)) + agg_hash_1 + (agg_result_1 << 6) + (agg_result_1 >>> 2);
/* 128 */       }
/* 129 */
/* 130 */       agg_hash_0 = (agg_hash_0 ^ (0x9e3779b9)) + agg_result_1 + (agg_hash_0 << 6) + (agg_hash_0 >>> 2);
/* 131 */
/* 132 */       return agg_hash_0;
/* 133 */     }
/* 134 */
/* 135 */     public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
/* 136 */       return batch.rowIterator();
/* 137 */     }
/* 138 */
/* 139 */     public void close() {
/* 140 */       batch.close();
/* 141 */     }
/* 142 */
/* 143 */   }
/* 144 */
/* 145 */   private void agg_doAggregate_sum_0(boolean agg_exprIsNull_2_0, org.apache.spark.sql.catalyst.InternalRow agg_unsafeRowAggBuffer_0, org.apache.spark.unsafe.types.UTF8String agg_expr_2_0) throws java.io.IOException {
/* 146 */     agg_agg_isNull_8_0 = true;
/* 147 */     double agg_value_9 = -1.0;
/* 148 */     do {
/* 149 */       boolean agg_isNull_9 = true;
/* 150 */       double agg_value_10 = -1.0;
/* 151 */       agg_agg_isNull_10_0 = true;
/* 152 */       double agg_value_11 = -1.0;
/* 153 */       do {
/* 154 */         boolean agg_isNull_11 = agg_unsafeRowAggBuffer_0.isNullAt(0);
/* 155 */         double agg_value_12 = agg_isNull_11 ?
/* 156 */         -1.0 : (agg_unsafeRowAggBuffer_0.getDouble(0));
/* 157 */         if (!agg_isNull_11) {
/* 158 */           agg_agg_isNull_10_0 = false;
/* 159 */           agg_value_11 = agg_value_12;
/* 160 */           continue;
/* 161 */         }
/* 162 */
/* 163 */         if (!false) {
/* 164 */           agg_agg_isNull_10_0 = false;
/* 165 */           agg_value_11 = 0.0D;
/* 166 */           continue;
/* 167 */         }
/* 168 */
/* 169 */       } while (false);
/* 170 */       boolean agg_isNull_13 = agg_exprIsNull_2_0;
/* 171 */       double agg_value_14 = -1.0;
/* 172 */       if (!agg_exprIsNull_2_0) {
/* 173 */         final String agg_doubleStr_0 = agg_expr_2_0.toString();
/* 174 */         try {
/* 175 */           agg_value_14 = Double.valueOf(agg_doubleStr_0);
/* 176 */         } catch (java.lang.NumberFormatException e) {
/* 177 */           final Double d = (Double) Cast.processFloatingPointSpecialLiterals(agg_doubleStr_0, false);
/* 178 */           if (d == null) {
/* 179 */             agg_isNull_13 = true;
/* 180 */           } else {
/* 181 */             agg_value_14 = d.doubleValue();
/* 182 */           }
/* 183 */         }
/* 184 */       }
/* 185 */       if (!agg_isNull_13) {
/* 186 */         agg_isNull_9 = false; // resultCode could change nullability.
/* 187 */
/* 188 */         agg_value_10 = agg_value_11 + agg_value_14;
/* 189 */
/* 190 */       }
/* 191 */       if (!agg_isNull_9) {
/* 192 */         agg_agg_isNull_8_0 = false;
/* 193 */         agg_value_9 = agg_value_10;
/* 194 */         continue;
/* 195 */       }
/* 196 */
/* 197 */       boolean agg_isNull_15 = agg_unsafeRowAggBuffer_0.isNullAt(0);
/* 198 */       double agg_value_16 = agg_isNull_15 ?
/* 199 */       -1.0 : (agg_unsafeRowAggBuffer_0.getDouble(0));
/* 200 */       if (!agg_isNull_15) {
/* 201 */         agg_agg_isNull_8_0 = false;
/* 202 */         agg_value_9 = agg_value_16;
/* 203 */         continue;
/* 204 */       }
/* 205 */
/* 206 */     } while (false);
/* 207 */
/* 208 */     if (!agg_agg_isNull_8_0) {
/* 209 */       agg_unsafeRowAggBuffer_0.setDouble(0, agg_value_9);
/* 210 */     } else {
/* 211 */       agg_unsafeRowAggBuffer_0.setNullAt(0);
/* 212 */     }
/* 213 */   }
/* 214 */
/* 215 */   private void agg_doAggregateWithKeysOutput_0(UnsafeRow agg_keyTerm_0, UnsafeRow agg_bufferTerm_0)
/* 216 */   throws java.io.IOException {
/* 217 */     ((org.apache.spark.sql.execution.metric.SQLMetric) references[11] /* numOutputRows */).add(1);
/* 218 */
/* 219 */     boolean agg_isNull_16 = agg_keyTerm_0.isNullAt(0);
/* 220 */     long agg_value_17 = agg_isNull_16 ?
/* 221 */     -1L : (agg_keyTerm_0.getLong(0));
/* 222 */     boolean agg_isNull_17 = agg_keyTerm_0.isNullAt(1);
/* 223 */     UTF8String agg_value_18 = agg_isNull_17 ?
/* 224 */     null : (agg_keyTerm_0.getUTF8String(1));
/* 225 */     boolean agg_isNull_18 = agg_bufferTerm_0.isNullAt(0);
/* 226 */     double agg_value_19 = agg_isNull_18 ?
/* 227 */     -1.0 : (agg_bufferTerm_0.getDouble(0));
/* 228 */
/* 229 */     columnartorow_mutableStateArray_3[8].reset();
/* 230 */
/* 231 */     columnartorow_mutableStateArray_3[8].zeroOutNullBytes();
/* 232 */
/* 233 */     if (agg_isNull_16) {
/* 234 */       columnartorow_mutableStateArray_3[8].setNullAt(0);
/* 235 */     } else {
/* 236 */       columnartorow_mutableStateArray_3[8].write(0, agg_value_17);
/* 237 */     }
/* 238 */
/* 239 */     if (agg_isNull_17) {
/* 240 */       columnartorow_mutableStateArray_3[8].setNullAt(1);
/* 241 */     } else {
/* 242 */       columnartorow_mutableStateArray_3[8].write(1, agg_value_18);
/* 243 */     }
/* 244 */
/* 245 */     if (agg_isNull_18) {
/* 246 */       columnartorow_mutableStateArray_3[8].setNullAt(2);
/* 247 */     } else {
/* 248 */       columnartorow_mutableStateArray_3[8].write(2, agg_value_19);
/* 249 */     }
/* 250 */     append((columnartorow_mutableStateArray_3[8].getRow()));
/* 251 */
/* 252 */   }
/* 253 */
/* 254 */   private void wholestagecodegen_init_0_1() {
/* 255 */     columnartorow_mutableStateArray_3[7] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 32);
/* 256 */     columnartorow_mutableStateArray_3[8] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 32);
/* 257 */
/* 258 */   }
/* 259 */
/* 260 */   private void columnartorow_nextBatch_0() throws java.io.IOException {
/* 261 */     if (columnartorow_mutableStateArray_0[0].hasNext()) {
/* 262 */       columnartorow_mutableStateArray_1[0] = (org.apache.spark.sql.vectorized.ColumnarBatch)columnartorow_mutableStateArray_0[0].next();
/* 263 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[7] /* numInputBatches */).add(1);
/* 264 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[6] /* numOutputRows */).add(columnartorow_mutableStateArray_1[0].numRows());
/* 265 */       columnartorow_batchIdx_0 = 0;
/* 266 */       columnartorow_mutableStateArray_2[0] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(0);
/* 267 */       columnartorow_mutableStateArray_2[1] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(1);
/* 268 */       columnartorow_mutableStateArray_2[2] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(2);
/* 269 */       columnartorow_mutableStateArray_2[3] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(3);
/* 270 */
/* 271 */     }
/* 272 */   }
/* 273 */
/* 274 */   private void agg_doConsume_0(long agg_expr_0_0, boolean agg_exprIsNull_0_0, UTF8String agg_expr_1_0, boolean agg_exprIsNull_1_0, UTF8String agg_expr_2_0, boolean agg_exprIsNull_2_0) throws java.io.IOException {
/* 275 */     UnsafeRow agg_unsafeRowAggBuffer_0 = null;
/* 276 */     UnsafeRow agg_fastAggBuffer_0 = null;
/* 277 */
/* 278 */     if (true) {
/* 279 */       if (!agg_exprIsNull_0_0 && !agg_exprIsNull_1_0) {
/* 280 */         agg_fastAggBuffer_0 = agg_fastHashMap_0.findOrInsert(
/* 281 */           agg_expr_0_0, agg_expr_1_0);
/* 282 */       }
/* 283 */     }
/* 284 */     // Cannot find the key in fast hash map, try regular hash map.
/* 285 */     if (agg_fastAggBuffer_0 == null) {
/* 286 */       // generate grouping key
/* 287 */       columnartorow_mutableStateArray_3[7].reset();
/* 288 */
/* 289 */       columnartorow_mutableStateArray_3[7].zeroOutNullBytes();
/* 290 */
/* 291 */       if (agg_exprIsNull_0_0) {
/* 292 */         columnartorow_mutableStateArray_3[7].setNullAt(0);
/* 293 */       } else {
/* 294 */         columnartorow_mutableStateArray_3[7].write(0, agg_expr_0_0);
/* 295 */       }
/* 296 */
/* 297 */       if (agg_exprIsNull_1_0) {
/* 298 */         columnartorow_mutableStateArray_3[7].setNullAt(1);
/* 299 */       } else {
/* 300 */         columnartorow_mutableStateArray_3[7].write(1, agg_expr_1_0);
/* 301 */       }
/* 302 */       int agg_unsafeRowKeyHash_0 = (columnartorow_mutableStateArray_3[7].getRow()).hashCode();
/* 303 */       if (true) {
/* 304 */         // try to get the buffer from hash map
/* 305 */         agg_unsafeRowAggBuffer_0 =
/* 306 */         agg_hashMap_0.getAggregationBufferFromUnsafeRow((columnartorow_mutableStateArray_3[7].getRow()), agg_unsafeRowKeyHash_0);
/* 307 */       }
/* 308 */       // Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
/* 309 */       // aggregation after processing all input rows.
/* 310 */       if (agg_unsafeRowAggBuffer_0 == null) {
/* 311 */         if (agg_sorter_0 == null) {
/* 312 */           agg_sorter_0 = agg_hashMap_0.destructAndCreateExternalSorter();
/* 313 */         } else {
/* 314 */           agg_sorter_0.merge(agg_hashMap_0.destructAndCreateExternalSorter());
/* 315 */         }
/* 316 */
/* 317 */         // the hash map had be spilled, it should have enough memory now,
/* 318 */         // try to allocate buffer again.
/* 319 */         agg_unsafeRowAggBuffer_0 = agg_hashMap_0.getAggregationBufferFromUnsafeRow(
/* 320 */           (columnartorow_mutableStateArray_3[7].getRow()), agg_unsafeRowKeyHash_0);
/* 321 */         if (agg_unsafeRowAggBuffer_0 == null) {
/* 322 */           // failed to allocate the first page
/* 323 */           throw new org.apache.spark.memory.SparkOutOfMemoryError("No enough memory for aggregation");
/* 324 */         }
/* 325 */       }
/* 326 */
/* 327 */     }
/* 328 */
/* 329 */     // Updates the proper row buffer
/* 330 */     if (agg_fastAggBuffer_0 != null) {
/* 331 */       agg_unsafeRowAggBuffer_0 = agg_fastAggBuffer_0;
/* 332 */     }
/* 333 */
/* 334 */     // common sub-expressions
/* 335 */
/* 336 */     // evaluate aggregate functions and update aggregation buffers
/* 337 */     agg_doAggregate_sum_0(agg_exprIsNull_2_0, agg_unsafeRowAggBuffer_0, agg_expr_2_0);
/* 338 */
/* 339 */   }
/* 340 */
/* 341 */   private void agg_doAggregateWithKeys_0() throws java.io.IOException {
/* 342 */     if (columnartorow_mutableStateArray_1[0] == null) {
/* 343 */       columnartorow_nextBatch_0();
/* 344 */     }
/* 345 */     while ( columnartorow_mutableStateArray_1[0] != null) {
/* 346 */       int columnartorow_numRows_0 = columnartorow_mutableStateArray_1[0].numRows();
/* 347 */       int columnartorow_localEnd_0 = columnartorow_numRows_0 - columnartorow_batchIdx_0;
/* 348 */       for (int columnartorow_localIdx_0 = 0; columnartorow_localIdx_0 < columnartorow_localEnd_0; columnartorow_localIdx_0++) {
/* 349 */         int columnartorow_rowIdx_0 = columnartorow_batchIdx_0 + columnartorow_localIdx_0;
/* 350 */         do {
/* 351 */           boolean columnartorow_isNull_0 = columnartorow_mutableStateArray_2[0].isNullAt(columnartorow_rowIdx_0);
/* 352 */           long columnartorow_value_0 = columnartorow_isNull_0 ? -1L : (columnartorow_mutableStateArray_2[0].getLong(columnartorow_rowIdx_0));
/* 353 */
/* 354 */           boolean filter_value_2 = !columnartorow_isNull_0;
/* 355 */           if (!filter_value_2) continue;
/* 356 */
/* 357 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[8] /* numOutputRows */).add(1);
/* 358 */
/* 359 */           boolean columnartorow_isNull_2 = columnartorow_mutableStateArray_2[2].isNullAt(columnartorow_rowIdx_0);
/* 360 */           UTF8String columnartorow_value_2 = columnartorow_isNull_2 ? null : (columnartorow_mutableStateArray_2[2].getUTF8String(columnartorow_rowIdx_0));
/* 361 */           boolean columnartorow_isNull_3 = columnartorow_mutableStateArray_2[3].isNullAt(columnartorow_rowIdx_0);
/* 362 */           UTF8String columnartorow_value_3 = columnartorow_isNull_3 ? null : (columnartorow_mutableStateArray_2[3].getUTF8String(columnartorow_rowIdx_0));
/* 363 */
/* 364 */           // generate join key for stream side
/* 365 */           columnartorow_mutableStateArray_3[3].reset();
/* 366 */
/* 367 */           columnartorow_mutableStateArray_3[3].zeroOutNullBytes();
/* 368 */
/* 369 */           if (false) {
/* 370 */             columnartorow_mutableStateArray_3[3].setNullAt(0);
/* 371 */           } else {
/* 372 */             columnartorow_mutableStateArray_3[3].write(0, columnartorow_value_0);
/* 373 */           }
/* 374 */
/* 375 */           if (columnartorow_isNull_2) {
/* 376 */             columnartorow_mutableStateArray_3[3].setNullAt(1);
/* 377 */           } else {
/* 378 */             columnartorow_mutableStateArray_3[3].write(1, columnartorow_value_2);
/* 379 */           }
/* 380 */
/* 381 */           if (columnartorow_isNull_3) {
/* 382 */             columnartorow_mutableStateArray_3[3].setNullAt(2);
/* 383 */           } else {
/* 384 */             columnartorow_mutableStateArray_3[3].write(2, columnartorow_value_3);
/* 385 */           }
/* 386 */           // find matches from HashedRelation
/* 387 */           UnsafeRow bhj_matched_0 = (columnartorow_mutableStateArray_3[3].getRow()).anyNull() ? null: (UnsafeRow)bhj_relation_0.getValue((columnartorow_mutableStateArray_3[3].getRow()));
/* 388 */           if (bhj_matched_0 != null) {
/* 389 */             {
/* 390 */               ((org.apache.spark.sql.execution.metric.SQLMetric) references[10] /* numOutputRows */).add(1);
/* 391 */
/* 392 */               boolean bhj_isNull_3 = bhj_matched_0.isNullAt(0);
/* 393 */               long bhj_value_3 = bhj_isNull_3 ?
/* 394 */               -1L : (bhj_matched_0.getLong(0));
/* 395 */               boolean bhj_isNull_4 = bhj_matched_0.isNullAt(1);
/* 396 */               UTF8String bhj_value_4 = bhj_isNull_4 ?
/* 397 */               null : (bhj_matched_0.getUTF8String(1));
/* 398 */               boolean columnartorow_isNull_1 = columnartorow_mutableStateArray_2[1].isNullAt(columnartorow_rowIdx_0);
/* 399 */               UTF8String columnartorow_value_1 = columnartorow_isNull_1 ? null : (columnartorow_mutableStateArray_2[1].getUTF8String(columnartorow_rowIdx_0));
/* 400 */
/* 401 */               agg_doConsume_0(bhj_value_3, bhj_isNull_3, bhj_value_4, bhj_isNull_4, columnartorow_value_1, columnartorow_isNull_1);
/* 402 */
/* 403 */             }
/* 404 */           }
/* 405 */
/* 406 */         } while(false);
/* 407 */         // shouldStop check is eliminated
/* 408 */       }
/* 409 */       columnartorow_batchIdx_0 = columnartorow_numRows_0;
/* 410 */       columnartorow_mutableStateArray_1[0] = null;
/* 411 */       columnartorow_nextBatch_0();
/* 412 */     }
/* 413 */
/* 414 */     agg_fastHashMapIter_0 = agg_fastHashMap_0.rowIterator();
/* 415 */     agg_mapIter_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).finishAggregate(agg_hashMap_0, agg_sorter_0, ((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* spillSize */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[5] /* avgHashProbe */));
/* 416 */
/* 417 */   }
/* 418 */
/* 419 */   protected void processNext() throws java.io.IOException {
/* 420 */     if (!agg_initAgg_0) {
/* 421 */       agg_initAgg_0 = true;
/* 422 */       agg_fastHashMap_0 = new agg_FastHashMap_0(((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getTaskMemoryManager(), ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).getEmptyAggregationBuffer());
/* 423 */       agg_hashMap_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
/* 424 */       long wholestagecodegen_beforeAgg_0 = System.nanoTime();
/* 425 */       agg_doAggregateWithKeys_0();
/* 426 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[12] /* aggTime */).add((System.nanoTime() - wholestagecodegen_beforeAgg_0) / 1000000);
/* 427 */     }
/* 428 */     // output the result
/* 429 */
/* 430 */     while (agg_fastHashMapIter_0.next()) {
/* 431 */       UnsafeRow agg_aggKey_0 = (UnsafeRow) agg_fastHashMapIter_0.getKey();
/* 432 */       UnsafeRow agg_aggBuffer_0 = (UnsafeRow) agg_fastHashMapIter_0.getValue();
/* 433 */       agg_doAggregateWithKeysOutput_0(agg_aggKey_0, agg_aggBuffer_0);
/* 434 */
/* 435 */       if (shouldStop()) return;
/* 436 */     }
/* 437 */     agg_fastHashMap_0.close();
/* 438 */
/* 439 */     while ( agg_mapIter_0.next()) {
/* 440 */       UnsafeRow agg_aggKey_0 = (UnsafeRow) agg_mapIter_0.getKey();
/* 441 */       UnsafeRow agg_aggBuffer_0 = (UnsafeRow) agg_mapIter_0.getValue();
/* 442 */       agg_doAggregateWithKeysOutput_0(agg_aggKey_0, agg_aggBuffer_0);
/* 443 */       if (shouldStop()) return;
/* 444 */     }
/* 445 */     agg_mapIter_0.close();
/* 446 */     if (agg_sorter_0 == null) {
/* 447 */       agg_hashMap_0.free();
/* 448 */     }
/* 449 */   }
/* 450 */
/* 451 */   private void wholestagecodegen_init_0_0() {
/* 452 */     columnartorow_mutableStateArray_0[0] = inputs[0];
/* 453 */     columnartorow_mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 96);
/* 454 */     columnartorow_mutableStateArray_3[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 96);
/* 455 */     columnartorow_mutableStateArray_3[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(4, 96);
/* 456 */
/* 457 */     bhj_relation_0 = ((org.apache.spark.sql.execution.joins.UnsafeHashedRelation) ((org.apache.spark.broadcast.TorrentBroadcast) references[9] /* broadcast */).value()).asReadOnlyCopy();
/* 458 */     incPeakExecutionMemory(bhj_relation_0.estimatedSize());
/* 459 */
/* 460 */     columnartorow_mutableStateArray_3[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 64);
/* 461 */     columnartorow_mutableStateArray_3[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(8, 192);
/* 462 */     columnartorow_mutableStateArray_3[5] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 64);
/* 463 */     columnartorow_mutableStateArray_3[6] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 64);
/* 464 */
/* 465 */   }
/* 466 */
/* 467 */ }
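Here is a plain-Python sketch of what codegen stage 2 above does for each input row. It drops stream-side rows whose `courseid` is null. It builds the join key `(courseid, dt, dn)` and probes the broadcast `sale_course` relation; per the plan this is `BuildLeft`, and keys containing a null are skipped, as `anyNull()` does. It then folds `cast(sellmoney as double)` into a partial sum per `(courseid, coursename)`.

This is a model for reading the generated code, not Spark's implementation, and it rests on a few assumptions:
- The function names are hypothetical.
- Python's `float()` only approximates Spark's string-to-double cast.
- The build side is assumed to have unique keys, because the generated code calls `getValue`, which returns a single row.
- The fast hash map / regular hash map / spill-to-sorter path is collapsed into one `dict`.

```python
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical row shapes, following the column order the generated code reads.
SaleCourse = Tuple[Optional[int], Optional[str], Optional[str], Optional[str]]  # courseid, coursename, dt, dn
CartRow = Tuple[Optional[int], Optional[str], Optional[str], Optional[str]]     # courseid, sellmoney, dt, dn
GroupKey = Tuple[Optional[int], Optional[str]]                                   # courseid, coursename


def cast_to_double(s: Optional[str]) -> Optional[float]:
    """Approximates cast(sellmoney as double): unparseable strings become NULL (None)."""
    if s is None:
        return None
    try:
        return float(s)
    except ValueError:
        return None


def sum_update(buffer: Optional[float], value: Optional[float]) -> Optional[float]:
    """Mirrors agg_doAggregate_sum_0: a NULL input leaves the buffer unchanged,
    otherwise buffer = coalesce(buffer, 0.0) + value."""
    if value is None:
        return buffer
    return (buffer if buffer is not None else 0.0) + value


def stage2(build_side: Iterable[SaleCourse],
           stream_side: Iterable[CartRow]) -> Dict[GroupKey, Optional[float]]:
    # BroadcastExchange + HashedRelation: join key (courseid, dt, dn) -> (courseid, coursename).
    # Assumes unique build keys, matching the single-row getValue() lookup.
    relation = {(cid, dt, dn): (cid, name) for cid, name, dt, dn in build_side}

    partial: Dict[GroupKey, Optional[float]] = {}
    for cid, sellmoney, dt, dn in stream_side:
        if cid is None:                    # Filter isnotnull(courseid#17L)
            continue
        key = (cid, dt, dn)
        if any(k is None for k in key):    # anyNull() -> treated as no match
            continue
        matched = relation.get(key)
        if matched is None:                # inner join: drop unmatched rows
            continue
        # The group's buffer exists (initially NULL) before the sum is updated.
        buffer = partial.setdefault(matched, None)
        partial[matched] = sum_update(buffer, cast_to_double(sellmoney))
    return partial


if __name__ == "__main__":
    sale_course = [(1, "Spark", "20190722", "webA"), (2, "Flink", "20190722", "webA")]
    cart = [(1, "10.5", "20190722", "webA"), (1, "4.5", "20190722", "webA"),
            (2, "abc", "20190722", "webA"), (None, "100", "20190722", "webA"),
            (3, "7", "20190722", "webA"), (1, "1", "20190722", None)]
    print(stage2(sale_course, cart))  # {(1, 'Spark'): 15.0, (2, 'Flink'): None}
```

The `(2, 'Flink')` group shows that a group whose only input fails the cast still produces an output row, with a NULL partial sum.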
 
== Subtree 3 / 3 (maxMethodCodeSize:206; maxConstantPoolSize:232(0.35% used); numInnerClasses:0) ==
*(3) HashAggregate(keys=[courseid#3L, coursename#5], functions=[sum(cast(sellmoney#22 as double))], output=[courseid#3L, coursename#5, totalsell#0])
+- Exchange hashpartitioning(courseid#3L, coursename#5, 200), true, [id=#67]
   +- *(2) HashAggregate(keys=[courseid#3L, coursename#5], functions=[partial_sum(cast(sellmoney#22 as double))], output=[courseid#3L, coursename#5, sum#30])
      +- *(2) Project [courseid#3L, coursename#5, sellmoney#22]
         +- *(2) BroadcastHashJoin [courseid#3L, dt#15, dn#16], [courseid#17L, dt#23, dn#24], Inner, BuildLeft
            :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true], input[2, string, true], input[3, string, true])), [id=#57]
            :  +- *(1) Project [courseid#3L, coursename#5, dt#15, dn#16]
            :     +- *(1) Filter isnotnull(courseid#3L)
            :        +- *(1) ColumnarToRow
            :           +- FileScan parquet spark_optimize.sale_course[courseid#3L,coursename#5,dt#15,dn#16] Batched: true, DataFilters: [isnotnull(courseid#3L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/sale_course/dt=20190..., PartitionFilters: [isnotnull(dt#15), isnotnull(dn#16)], PushedFilters: [IsNotNull(courseid)], ReadSchema: struct<courseid:bigint,coursename:string>
            +- *(2) Project [courseid#17L, sellmoney#22, dt#23, dn#24]
               +- *(2) Filter isnotnull(courseid#17L)
                  +- *(2) ColumnarToRow
                     +- FileScan parquet spark_optimize.course_shopping_cart[courseid#17L,sellmoney#22,dt#23,dn#24] Batched: true, DataFilters: [isnotnull(courseid#17L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/course_shopping_cart..., PartitionFilters: [isnotnull(dt#23), isnotnull(dn#24)], PushedFilters: [IsNotNull(courseid)], ReadSchema: struct<courseid:bigint,sellmoney:string>
 
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage3(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=3
/* 006 */ final class GeneratedIteratorForCodegenStage3 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean agg_initAgg_0;
/* 010 */   private org.apache.spark.unsafe.KVIterator agg_mapIter_0;
/* 011 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap_0;
/* 012 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter_0;
/* 013 */   private scala.collection.Iterator inputadapter_input_0;
/* 014 */   private boolean agg_agg_isNull_4_0;
/* 015 */   private boolean agg_agg_isNull_6_0;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] agg_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 017 */
/* 018 */   public GeneratedIteratorForCodegenStage3(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */
/* 026 */     inputadapter_input_0 = inputs[0];
/* 027 */     agg_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 32);
/* 028 */     agg_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(3, 32);
/* 029 */
/* 030 */   }
/* 031 */
/* 032 */   private void agg_doAggregate_sum_0(boolean agg_exprIsNull_2_0, org.apache.spark.sql.catalyst.InternalRow agg_unsafeRowAggBuffer_0, double agg_expr_2_0) throws java.io.IOException {
/* 033 */     agg_agg_isNull_4_0 = true;
/* 034 */     double agg_value_4 = -1.0;
/* 035 */     do {
/* 036 */       boolean agg_isNull_5 = true;
/* 037 */       double agg_value_5 = -1.0;
/* 038 */       agg_agg_isNull_6_0 = true;
/* 039 */       double agg_value_6 = -1.0;
/* 040 */       do {
/* 041 */         boolean agg_isNull_7 = agg_unsafeRowAggBuffer_0.isNullAt(0);
/* 042 */         double agg_value_7 = agg_isNull_7 ?
/* 043 */         -1.0 : (agg_unsafeRowAggBuffer_0.getDouble(0));
/* 044 */         if (!agg_isNull_7) {
/* 045 */           agg_agg_isNull_6_0 = false;
/* 046 */           agg_value_6 = agg_value_7;
/* 047 */           continue;
/* 048 */         }
/* 049 */
/* 050 */         if (!false) {
/* 051 */           agg_agg_isNull_6_0 = false;
/* 052 */           agg_value_6 = 0.0D;
/* 053 */           continue;
/* 054 */         }
/* 055 */
/* 056 */       } while (false);
/* 057 */
/* 058 */       if (!agg_exprIsNull_2_0) {
/* 059 */         agg_isNull_5 = false; // resultCode could change nullability.
/* 060 */
/* 061 */         agg_value_5 = agg_value_6 + agg_expr_2_0;
/* 062 */
/* 063 */       }
/* 064 */       if (!agg_isNull_5) {
/* 065 */         agg_agg_isNull_4_0 = false;
/* 066 */         agg_value_4 = agg_value_5;
/* 067 */         continue;
/* 068 */       }
/* 069 */
/* 070 */       boolean agg_isNull_10 = agg_unsafeRowAggBuffer_0.isNullAt(0);
/* 071 */       double agg_value_10 = agg_isNull_10 ?
/* 072 */       -1.0 : (agg_unsafeRowAggBuffer_0.getDouble(0));
/* 073 */       if (!agg_isNull_10) {
/* 074 */         agg_agg_isNull_4_0 = false;
/* 075 */         agg_value_4 = agg_value_10;
/* 076 */         continue;
/* 077 */       }
/* 078 */
/* 079 */     } while (false);
/* 080 */
/* 081 */     if (!agg_agg_isNull_4_0) {
/* 082 */       agg_unsafeRowAggBuffer_0.setDouble(0, agg_value_4);
/* 083 */     } else {
/* 084 */       agg_unsafeRowAggBuffer_0.setNullAt(0);
/* 085 */     }
/* 086 */   }
/* 087 */
/* 088 */   private void agg_doAggregateWithKeysOutput_0(UnsafeRow agg_keyTerm_0, UnsafeRow agg_bufferTerm_0)
/* 089 */   throws java.io.IOException {
/* 090 */     ((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /* numOutputRows */).add(1);
/* 091 */
/* 092 */     boolean agg_isNull_11 = agg_keyTerm_0.isNullAt(0);
/* 093 */     long agg_value_11 = agg_isNull_11 ?
/* 094 */     -1L : (agg_keyTerm_0.getLong(0));
/* 095 */     boolean agg_isNull_12 = agg_keyTerm_0.isNullAt(1);
/* 096 */     UTF8String agg_value_12 = agg_isNull_12 ?
/* 097 */     null : (agg_keyTerm_0.getUTF8String(1));
/* 098 */     boolean agg_isNull_13 = agg_bufferTerm_0.isNullAt(0);
/* 099 */     double agg_value_13 = agg_isNull_13 ?
/* 100 */     -1.0 : (agg_bufferTerm_0.getDouble(0));
/* 101 */
/* 102 */     agg_mutableStateArray_0[1].reset();
/* 103 */
/* 104 */     agg_mutableStateArray_0[1].zeroOutNullBytes();
/* 105 */
/* 106 */     if (agg_isNull_11) {
/* 107 */       agg_mutableStateArray_0[1].setNullAt(0);
/* 108 */     } else {
/* 109 */       agg_mutableStateArray_0[1].write(0, agg_value_11);
/* 110 */     }
/* 111 */
/* 112 */     if (agg_isNull_12) {
/* 113 */       agg_mutableStateArray_0[1].setNullAt(1);
/* 114 */     } else {
/* 115 */       agg_mutableStateArray_0[1].write(1, agg_value_12);
/* 116 */     }
/* 117 */
/* 118 */     if (agg_isNull_13) {
/* 119 */       agg_mutableStateArray_0[1].setNullAt(2);
/* 120 */     } else {
/* 121 */       agg_mutableStateArray_0[1].write(2, agg_value_13);
/* 122 */     }
/* 123 */     append((agg_mutableStateArray_0[1].getRow()));
/* 124 */
/* 125 */   }
/* 126 */
/* 127 */   private void agg_doConsume_0(InternalRow inputadapter_row_0, long agg_expr_0_0, boolean agg_exprIsNull_0_0, UTF8String agg_expr_1_0, boolean agg_exprIsNull_1_0, double agg_expr_2_0, boolean agg_exprIsNull_2_0) throws java.io.IOException {
/* 128 */     UnsafeRow agg_unsafeRowAggBuffer_0 = null;
/* 129 */
/* 130 */     // generate grouping key
/* 131 */     agg_mutableStateArray_0[0].reset();
/* 132 */
/* 133 */     agg_mutableStateArray_0[0].zeroOutNullBytes();
/* 134 */
/* 135 */     if (agg_exprIsNull_0_0) {
/* 136 */       agg_mutableStateArray_0[0].setNullAt(0);
/* 137 */     } else {
/* 138 */       agg_mutableStateArray_0[0].write(0, agg_expr_0_0);
/* 139 */     }
/* 140 */
/* 141 */     if (agg_exprIsNull_1_0) {
/* 142 */       agg_mutableStateArray_0[0].setNullAt(1);
/* 143 */     } else {
/* 144 */       agg_mutableStateArray_0[0].write(1, agg_expr_1_0);
/* 145 */     }
/* 146 */     int agg_unsafeRowKeyHash_0 = (agg_mutableStateArray_0[0].getRow()).hashCode();
/* 147 */     if (true) {
/* 148 */       // try to get the buffer from hash map
/* 149 */       agg_unsafeRowAggBuffer_0 =
/* 150 */       agg_hashMap_0.getAggregationBufferFromUnsafeRow((agg_mutableStateArray_0[0].getRow()), agg_unsafeRowKeyHash_0);
/* 151 */     }
/* 152 */     // Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
/* 153 */     // aggregation after processing all input rows.
/* 154 */     if (agg_unsafeRowAggBuffer_0 == null) {
/* 155 */       if (agg_sorter_0 == null) {
/* 156 */         agg_sorter_0 = agg_hashMap_0.destructAndCreateExternalSorter();
/* 157 */       } else {
/* 158 */         agg_sorter_0.merge(agg_hashMap_0.destructAndCreateExternalSorter());
/* 159 */       }
/* 160 */
/* 161 */       // the hash map had be spilled, it should have enough memory now,
/* 162 */       // try to allocate buffer again.
/* 163 */       agg_unsafeRowAggBuffer_0 = agg_hashMap_0.getAggregationBufferFromUnsafeRow(
/* 164 */         (agg_mutableStateArray_0[0].getRow()), agg_unsafeRowKeyHash_0);
/* 165 */       if (agg_unsafeRowAggBuffer_0 == null) {
/* 166 */         // failed to allocate the first page
/* 167 */         throw new org.apache.spark.memory.SparkOutOfMemoryError("No enough memory for aggregation");
/* 168 */       }
/* 169 */     }
/* 170 */
/* 171 */     // common sub-expressions
/* 172 */
/* 173 */     // evaluate aggregate functions and update aggregation buffers
/* 174 */     agg_doAggregate_sum_0(agg_exprIsNull_2_0, agg_unsafeRowAggBuffer_0, agg_expr_2_0);
/* 175 */
/* 176 */   }
/* 177 */
/* 178 */   private void agg_doAggregateWithKeys_0() throws java.io.IOException {
/* 179 */     while ( inputadapter_input_0.hasNext()) {
/* 180 */       InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
/* 181 */
/* 182 */       boolean inputadapter_isNull_0 = inputadapter_row_0.isNullAt(0);
/* 183 */       long inputadapter_value_0 = inputadapter_isNull_0 ?
/* 184 */       -1L : (inputadapter_row_0.getLong(0));
/* 185 */       boolean inputadapter_isNull_1 = inputadapter_row_0.isNullAt(1);
/* 186 */       UTF8String inputadapter_value_1 = inputadapter_isNull_1 ?
/* 187 */       null : (inputadapter_row_0.getUTF8String(1));
/* 188 */       boolean inputadapter_isNull_2 = inputadapter_row_0.isNullAt(2);
/* 189 */       double inputadapter_value_2 = inputadapter_isNull_2 ?
/* 190 */       -1.0 : (inputadapter_row_0.getDouble(2));
/* 191 */
/* 192 */       agg_doConsume_0(inputadapter_row_0, inputadapter_value_0, inputadapter_isNull_0, inputadapter_value_1, inputadapter_isNull_1, inputadapter_value_2, inputadapter_isNull_2);
/* 193 */       // shouldStop check is eliminated
/* 194 */     }
/* 195 */
/* 196 */     agg_mapIter_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).finishAggregate(agg_hashMap_0, agg_sorter_0, ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* spillSize */), ((org.apache.spark.sql.execution.metric.SQLMetric) references[3] /* avgHashProbe */));
/* 197 */   }
/* 198 */
/* 199 */   protected void processNext() throws java.io.IOException {
/* 200 */     if (!agg_initAgg_0) {
/* 201 */       agg_initAgg_0 = true;
/* 202 */
/* 203 */       agg_hashMap_0 = ((org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[0] /* plan */).createHashMap();
/* 204 */       long wholestagecodegen_beforeAgg_0 = System.nanoTime();
/* 205 */       agg_doAggregateWithKeys_0();
/* 206 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[5] /* aggTime */).add((System.nanoTime() - wholestagecodegen_beforeAgg_0) / 1000000);
/* 207 */     }
/* 208 */     // output the result
/* 209 */
/* 210 */     while ( agg_mapIter_0.next()) {
/* 211 */       UnsafeRow agg_aggKey_0 = (UnsafeRow) agg_mapIter_0.getKey();
/* 212 */       UnsafeRow agg_aggBuffer_0 = (UnsafeRow) agg_mapIter_0.getValue();
/* 213 */       agg_doAggregateWithKeysOutput_0(agg_aggKey_0, agg_aggBuffer_0);
/* 214 */       if (shouldStop()) return;
/* 215 */     }
/* 216 */     agg_mapIter_0.close();
/* 217 */     if (agg_sorter_0 == null) {
/* 218 */       agg_hashMap_0.free();
/* 219 */     }
/* 220 */   }
/* 221 */
/* 222 */ }
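Subtree 3 is the second half of a two-phase aggregation. `Exchange hashpartitioning(courseid#3L, coursename#5, 200)` routes every partial `(courseid, coursename, sum)` row to a partition chosen by hashing the grouping key. Codegen stage 3 then merges the partial sums per key, using the same NULL-skipping update seen in its `agg_doAggregate_sum_0`.

The Python sketch below models those two steps under a few assumptions:
- `exchange` and `final_aggregate` are hypothetical names.
- Python's built-in `hash` stands in for the Murmur3 hash Spark uses for hash partitioning, so the partition a given key lands in will differ from Spark's.
- The spill-to-sorter fallback is not modeled.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple

GroupKey = Tuple[int, str]                   # (courseid, coursename)
Partial = Tuple[GroupKey, Optional[float]]   # grouping key plus partial sum (may be NULL)

NUM_PARTITIONS = 200  # matches hashpartitioning(..., 200) in the plan


def exchange(map_outputs: Iterable[Iterable[Partial]],
             num_partitions: int = NUM_PARTITIONS) -> Dict[int, List[Partial]]:
    """Shuffle: send each partial row to hash(key) mod num_partitions.
    Python's hash() is only a stand-in for Spark's Murmur3 hash."""
    partitions: Dict[int, List[Partial]] = defaultdict(list)
    for task_output in map_outputs:
        for key, partial_sum in task_output:
            partitions[hash(key) % num_partitions].append((key, partial_sum))
    return dict(partitions)


def merge_sum(buffer: Optional[float], value: Optional[float]) -> Optional[float]:
    """Same update rule as stage 3's agg_doAggregate_sum_0: NULL inputs are skipped."""
    if value is None:
        return buffer
    return (buffer if buffer is not None else 0.0) + value


def final_aggregate(rows: Iterable[Partial]) -> Dict[GroupKey, Optional[float]]:
    """Final HashAggregate(keys=[courseid, coursename], functions=[sum(...)]) for one partition."""
    result: Dict[GroupKey, Optional[float]] = {}
    for key, partial_sum in rows:
        result[key] = merge_sum(result.get(key), partial_sum)
    return result


if __name__ == "__main__":
    # Hypothetical partial_sum outputs from two stage-2 tasks.
    task0 = [((1, "Spark"), 15.0), ((2, "Flink"), None)]
    task1 = [((1, "Spark"), 5.0), ((2, "Flink"), 3.0)]
    totals: Dict[GroupKey, Optional[float]] = {}
    for rows in exchange([task0, task1]).values():
        totals.update(final_aggregate(rows))
    print(sorted(totals.items()))  # [((1, 'Spark'), 20.0), ((2, 'Flink'), 3.0)]
```

Because every row with the same key hashes to the same partition, each partition can finish its groups independently. That is why the final aggregation needs no further shuffle.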
  • Show the optimized logical plan along with its statistics
    • spark.sql(sqlstr).explain(mode="cost")
Aggregate [courseid#3L, coursename#5], [courseid#3L, coursename#5, sum(cast(sellmoney#22 as double)) AS totalsell#0], Statistics(sizeInBytes=1288.1 GiB)
+- Project [courseid#3L, coursename#5, sellmoney#22], Statistics(sizeInBytes=1639.4 GiB)
   +- Join Inner, (((courseid#3L = courseid#17L) AND (dt#15 = dt#23)) AND (dn#16 = dn#24)), Statistics(sizeInBytes=4.1 TiB)
      :- Project [courseid#3L, coursename#5, dt#15, dn#16], Statistics(sizeInBytes=40.9 KiB)
      :  +- Filter ((isnotnull(courseid#3L) AND isnotnull(dt#15)) AND isnotnull(dn#16)), Statistics(sizeInBytes=137.9 KiB)
      :     +- Relation spark_optimize.sale_course[chapterid#1L,chaptername#2,courseid#3L,coursemanager#4,coursename#5,edusubjectid#6L,edusubjectname#7,majorid#8L,majorname#9,money#10,pointlistid#11L,status#12,teacherid#13L,teachername#14,dt#15,dn#16] parquet, Statistics(sizeInBytes=137.9 KiB)
      +- Project [courseid#17L, sellmoney#22, dt#23, dn#24], Statistics(sizeInBytes=103.0 MiB)
         +- Filter ((isnotnull(courseid#17L) AND isnotnull(dt#23)) AND isnotnull(dn#24)), Statistics(sizeInBytes=211.4 MiB)
            +- Relation spark_optimize.course_shopping_cart[courseid#17L,coursename#18,createtime#19,discount#20,orderid#21,sellmoney#22,dt#23,dn#24] parquet, Statistics(sizeInBytes=211.4 MiB)
 
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[courseid#3L, coursename#5], functions=[sum(cast(sellmoney#22 as double))], output=[courseid#3L, coursename#5, totalsell#0])
   +- Exchange hashpartitioning(courseid#3L, coursename#5, 200), ENSURE_REQUIREMENTS, [id=#43]
      +- HashAggregate(keys=[courseid#3L, coursename#5], functions=[partial_sum(cast(sellmoney#22 as double))], output=[courseid#3L, coursename#5, sum#30])
         +- Project [courseid#3L, coursename#5, sellmoney#22]
            +- BroadcastHashJoin [courseid#3L, dt#15, dn#16], [courseid#17L, dt#23, dn#24], Inner, BuildLeft, false
               :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false], input[2, string, true], input[3, string, true]),false), [id=#38]
               :  +- Filter isnotnull(courseid#3L)
               :     +- FileScan parquet spark_optimize.sale_course[courseid#3L,coursename#5,dt#15,dn#16] Batched: true, DataFilters: [isnotnull(courseid#3L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/sale_course..., PartitionFilters: [isnotnull(dt#15), isnotnull(dn#16)], PushedFilters: [IsNotNull(courseid)], ReadSchema: struct<courseid:bigint,coursename:string>
               +- Filter isnotnull(courseid#17L)
                  +- FileScan parquet spark_optimize.course_shopping_cart[courseid#17L,sellmoney#22,dt#23,dn#24] Batched: true, DataFilters: [isnotnull(courseid#17L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/course_shop..., PartitionFilters: [isnotnull(dt#23), isnotnull(dn#24)], PushedFilters: [IsNotNull(courseid)], ReadSchema: struct<courseid:bigint,sellmoney:string>
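The `Statistics(sizeInBytes=...)` values above are size-only estimates. No row counts are shown, which suggests column-level CBO statistics were not in use. They can be reproduced by hand with the following rules, which appear to match what Spark's size-only estimator (`SizeInBytesOnlyStatsPlanVisitor`) does:
- `Filter` keeps its child's size.
- `Project` and a grouped `Aggregate` scale the child's size by the ratio of output row width to input row width. A row's width is 8 bytes of overhead plus each column type's default size: 8 for `bigint` and `double`, 20 for `string`.
- An inner `Join` multiplies the sizes of its two children.

The Python sketch below applies these rules under two assumptions. First, the column types are inferred from the plan: attributes with an `L` suffix are `bigint`, and the others are assumed to be `string`. Second, the two scan sizes are the display-rounded values, so the results agree with the printed ones only to within about 0.1%. The trailing comments quote the values printed in the plan.

```python
# Size-only statistics propagation following the rules described above.
DEFAULT_SIZE = {"bigint": 8, "double": 8, "string": 20}
ROW_OVERHEAD = 8
KIB, MIB, GIB, TIB = 1024, 1024 ** 2, 1024 ** 3, 1024 ** 4


def row_size(types):
    """Estimated bytes per row: fixed overhead plus each column's default size."""
    return ROW_OVERHEAD + sum(DEFAULT_SIZE[t] for t in types)


def unary(child_bytes, child_types, output_types):
    """Project / grouped Aggregate: same row count as the child, different row width.
    Integer division mirrors BigInt arithmetic; the result is never 0."""
    return max(1, child_bytes * row_size(output_types) // row_size(child_types))


# Inferred schemas: "L"-suffixed attributes are bigint, the rest assumed string.
sale_course_types = ["bigint"] * 6 + ["string"] * 10   # 16 columns of sale_course
cart_types = ["bigint"] + ["string"] * 7                # 8 columns of course_shopping_cart
projected = ["bigint", "string", "string", "string"]    # courseid, coursename|sellmoney, dt, dn

sale_course = round(137.9 * KIB)   # display-rounded scan sizes from the plan
cart = round(211.4 * MIB)

# Filter keeps the relation's size, so each Project scales the scan size directly.
left = unary(sale_course, sale_course_types, projected)
right = unary(cart, cart_types, projected)
join = left * right                                        # inner join: product of children
after_join = unary(join, projected * 2, ["bigint", "string", "string"])
aggregate = unary(after_join, ["bigint", "string", "string"], ["bigint", "string", "double"])

if __name__ == "__main__":
    print(f"left Project   {left / KIB:.1f} KiB")        # plan: 40.9 KiB
    print(f"right Project  {right / MIB:.1f} MiB")       # plan: 103.0 MiB
    print(f"Join           {join / TIB:.1f} TiB")        # plan: 4.1 TiB
    print(f"Project        {after_join / GIB:.1f} GiB")  # plan: 1639.4 GiB
    print(f"Aggregate      {aggregate / GIB:.1f} GiB")   # plan: 1288.1 GiB
```

Because the join estimate is a plain product of the two input sizes, it can be wildly pessimistic (4.1 TiB here from inputs of about 41 KiB and 103 MiB). That is one reason to collect table and column statistics when relying on cost-based decisions.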
  • Print the physical plan in a more readable format, with details for each node
    • spark.sql(sqlstr).explain(mode="formatted")
== Physical Plan ==
* HashAggregate (14)
+- Exchange (13)
   +- * HashAggregate (12)
      +- * Project (11)
         +- * BroadcastHashJoin Inner BuildLeft (10)
            :- BroadcastExchange (5)
            :  +- * Project (4)
            :     +- * Filter (3)
            :        +- * ColumnarToRow (2)
            :           +- Scan parquet spark_optimize.sale_course (1)
            +- * Project (9)
               +- * Filter (8)
                  +- * ColumnarToRow (7)
                     +- Scan parquet spark_optimize.course_shopping_cart (6)
 
 
(1) Scan parquet spark_optimize.sale_course
Output [4]: [courseid#3L, coursename#5, dt#15, dn#16]
Batched: true
Location: InMemoryFileIndex [hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/sale_course/dt=20190722/dn=webA]
PartitionFilters: [isnotnull(dt#15), isnotnull(dn#16)]
PushedFilters: [IsNotNull(courseid)]
ReadSchema: struct<courseid:bigint,coursename:string>
 
(2) ColumnarToRow [codegen id : 1]
Input [4]: [courseid#3L, coursename#5, dt#15, dn#16]
 
(3) Filter [codegen id : 1]
Input [4]: [courseid#3L, coursename#5, dt#15, dn#16]
Condition : isnotnull(courseid#3L)
 
(4) Project [codegen id : 1]
Output [4]: [courseid#3L, coursename#5, dt#15, dn#16]
Input [4]: [courseid#3L, coursename#5, dt#15, dn#16]
 
(5) BroadcastExchange
Input [4]: [courseid#3L, coursename#5, dt#15, dn#16]
Arguments: HashedRelationBroadcastMode(List(input[0, bigint, true], input[2, string, true], input[3, string, true])), [id=#57]
 
(6) Scan parquet spark_optimize.course_shopping_cart
Output [4]: [courseid#17L, sellmoney#22, dt#23, dn#24]
Batched: true
Location: InMemoryFileIndex [hdfs://hybrid01:9000/user/hive/warehouse/spark_optimize.db/course_shopping_cart/dt=20190722/dn=webA]
PartitionFilters: [isnotnull(dt#23), isnotnull(dn#24)]
PushedFilters: [IsNotNull(courseid)]
ReadSchema: struct<courseid:bigint,sellmoney:string>
 
(7) ColumnarToRow
Input [4]: [courseid#17L, sellmoney#22, dt#23, dn#24]
 
(8) Filter
Input [4]: [courseid#17L, sellmoney#22, dt#23, dn#24]
Condition : isnotnull(courseid#17L)
 
(9) Project
Output [4]: [courseid#17L, sellmoney#22, dt#23, dn#24]
Input [4]: [courseid#17L, sellmoney#22, dt#23, dn#24]
 
(10) BroadcastHashJoin [codegen id : 2]
Left keys [3]: [courseid#3L, dt#15, dn#16]
Right keys [3]: [courseid#17L, dt#23, dn#24]
Join condition: None
 
(11) Project [codegen id : 2]
Output [3]: [courseid#3L, coursename#5, sellmoney#22]
Input [8]: [courseid#3L, coursename#5, dt#15, dn#16, courseid#17L, sellmoney#22, dt#23, dn#24]
 
(12) HashAggregate [codegen id : 2]
Input [3]: [courseid#3L, coursename#5, sellmoney#22]
Keys [2]: [courseid#3L, coursename#5]
Functions [1]: [partial_sum(cast(sellmoney#22 as double))]
Aggregate Attributes [1]: [sum#29]
Results [3]: [courseid#3L, coursename#5, sum#30]
 
(13) Exchange
Input [3]: [courseid#3L, coursename#5, sum#30]
Arguments: hashpartitioning(courseid#3L, coursename#5, 200), true, [id=#67]
 
(14) HashAggregate [codegen id : 3]
Input [3]: [courseid#3L, coursename#5, sum#30]
Keys [2]: [courseid#3L, coursename#5]
Functions [1]: [sum(cast(sellmoney#22 as double))]
Aggregate Attributes [1]: [sum(cast(sellmoney#22 as double))#25]
Results [3]: [courseid#3L, coursename#5, sum(cast(sellmoney#22 as double))#25 AS totalsell#0]
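Read bottom-up, the formatted plan says: the `sale_course` side is broadcast (5) and used as the hash table (`BuildLeft`) for the join (10) on `(courseid, dt, dn)`; each task computes a partial sum per `(courseid, coursename)` (12, `partial_sum`); the partial results are shuffled by `hashpartitioning(courseid, coursename, 200)` (13), where 200 is the default of `spark.sql.shuffle.partitions`; and the final `HashAggregate` (14) adds the partial sums into `totalsell`. The pure-Python sketch below imitates that data flow on made-up rows. It is a model, not Spark's implementation: the two "map tasks" over the cart table, the use of Python's `hash()` for partitioning, and `NUM_PARTITIONS = 4` in place of 200 are all assumptions.

```python
from collections import defaultdict

NUM_PARTITIONS = 4  # assumption: stands in for spark.sql.shuffle.partitions (200 in the plan)

# Hypothetical rows: (courseid, coursename, dt, dn) and (courseid, sellmoney, dt, dn).
sale_course = [(1, "spark", "20190722", "webA"), (2, "flink", "20190722", "webA")]
# The cart table split into two hypothetical map-side partitions (one per task).
cart_partitions = [
    [(1, "10.0", "20190722", "webA"), (2, "5.5", "20190722", "webA")],
    [(1, "2.5", "20190722", "webA"), (3, "7.0", "20190722", "webA")],
]

# (5) BroadcastExchange + BuildLeft: hash the small side on the join keys.
broadcast = defaultdict(list)
for courseid, coursename, dt, dn in sale_course:
    broadcast[(courseid, dt, dn)].append((courseid, coursename))


def map_task(cart_rows):
    """(10) BroadcastHashJoin, (11) Project and (12) partial HashAggregate for one task."""
    partial = defaultdict(float)
    for courseid, sellmoney, dt, dn in cart_rows:
        # Probe the broadcast hash table; unmatched rows (courseid 3) are dropped.
        for sc_courseid, coursename in broadcast.get((courseid, dt, dn), []):
            partial[(sc_courseid, coursename)] += float(sellmoney)  # partial_sum(cast(... as double))
    return partial


# (13) Exchange: route each partial result to a reducer by hashing the grouping keys.
shuffled = [defaultdict(list) for _ in range(NUM_PARTITIONS)]
for part in cart_partitions:
    for group_key, subtotal in map_task(part).items():
        shuffled[hash(group_key) % NUM_PARTITIONS][group_key].append(subtotal)

# (14) Final HashAggregate: sum the partial sums of each group.
totalsell = {}
for reducer in shuffled:
    for group_key, subtotals in reducer.items():
        totalsell[group_key] = sum(subtotals)

print(sorted(totalsell.items()))  # [((1, 'spark'), 12.5), ((2, 'flink'), 5.5)]
```

Aggregating partially before the Exchange means each task sends at most one row per group across the network instead of every joined row, which is why Spark splits the `sum` into `partial_sum` (12) and a final `sum` (14).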
