Hive SQL ——窗口函数源码阅读

news2025/1/21 9:32:46

前言

   使用Starrocks引擎中的窗口函数 row_number() over( )对10亿的数据集进行去重操作,BE内存溢出问题频发(忘记当时指定的BE内存上限是多少了.....),此时才意识到,开窗操作,如果使用 不当,反而更容易引发性能问题。 下文是对Hive中的窗口函数底层源码进行初步学习,若有问题,请指正!

一、窗口函数的执行步骤

(1)将数据分割成多个分区;

(2)在各个分区上调用窗口函数。

   由于窗口函数的返回结果不是一个聚合值,而是另一张表的格式(table-in, table-out),因此Hive社区引入分区表函数  Partitioned Table Function(PTF)。

  简略的代码流转图:

  hive会把QueryBlock,翻译成执行操作数OperatorTree,其中每个operator都会有三个重要的方法:

  • initializeOp() :初始化算子
  • process() :执行每一行数据
  • forward() :把处理好的每一行数据发送到下个Operator

   当遇到窗口函数时,会生成PTFOperator,PTFOperator依赖PTFInvocation 读取已经排好序的数据,创建相应的输入分区:PTFPartition inputPart;

   WindowTableFunction 负责管理窗口帧、调用窗口函数(UDAF)、并将结果写入输出分区: PTFPartition outputPart。

二、源码分析

2.1 PTFOperator 类

   是PartitionedTableFunction的运算符,继承Operator抽象类(Hive运算符基类)

重写process(Object row, int tag) 方法,该方法来处理一行数据Row

@Override
    public void process(Object row, int tag) throws HiveException {
        if (!isMapOperator) {
            /*
             * check if current row belongs to the current accumulated Partition:
             * - If not:
             *  - process the current Partition
             *  - reset input Partition
             * - set currentKey to the newKey if it is null or has changed.
             */
            newKeys.getNewKey(row, inputObjInspectors[0]);
            //会判断当前row所属的Key(newKeys)是否等于当前正在累积数据的partition所属的key(currentKeys)
            boolean keysAreEqual = (currentKeys != null && newKeys != null) ?
                    newKeys.equals(currentKeys) : false;
            // 如果不相等,就结束当前partition分区的数据累积,触发窗口计算
            if (currentKeys != null && !keysAreEqual) {
                // 关闭正在积累的分区
                ptfInvocation.finishPartition();
            }
            // 如果currentKeys为空或者被改变,就将newKeys赋值给currentKeys
            if (currentKeys == null || !keysAreEqual) {
                // 开启一个新的分区partition
                ptfInvocation.startPartition();
                if (currentKeys == null) {
                    currentKeys = newKeys.copyKey();
                } else {
                    currentKeys.copyKey(newKeys);
                }
            }
        } else if (firstMapRow) { // 说明当前row是进入的第一行
            ptfInvocation.startPartition();
            firstMapRow = false;
        }
        // 将数据row添加到分区中,积累数据
        ptfInvocation.processRow(row);
    }

   上面的代码可以看出,所有数据应该是按照分区排好了序,排队进入process方法,当遇到进入的row和当前分区不是同一个key时,当前分区就可以关闭了,然后在打开下一个分区。

2.2 PTFInvocation类

  PTFInvocationPTFOperator类 的内部类

 在PTFOperator的初始化方法中创建了实例。

@Override
  protected void initializeOp(Configuration jobConf) throws HiveException {
    ...
    ptfInvocation = setupChain();
    ptfInvocation.initializeStreaming(jobConf, isMapOperator);
    ...
  }

   它的主要作用是负责PTF 数据链中行( row)的流动,通过 ptfInvocation.processRow(row) 方法调用传递链中的每一行,并且通过ptfInvocation.startPartition()、ptfInvocation.finishPartition()方法来通知分区何时开始何时结束。

 该类中包含TableFunction,用来处理分区数据。

PTFPartition inputPart; // inputPart理解为:分区对象,一直是在复用一个inputPart
TableFunctionEvaluator tabFn; // tabFn理解为:窗口函数的实例

//向分区中添加一行数据
void processRow(Object row) throws HiveException {
    if (isStreaming()) {
            // tabFn是窗口函数的实例
        handleOutputRows(tabFn.processRow(row));
    } else {
        // inputPart就是当前正在累积数据的分区
        inputPart.append(row);
    }
}

// 开启一个分区
void startPartition() throws HiveException {
    if (isStreaming()) {
        tabFn.startPartition();
    } else {
        if (prev == null || prev.isOutputIterator()) {
            if (inputPart == null) {
                // 创建新分区对象:PTFPartition对象
                createInputPartition();
            } else {
                // 重置分区
                inputPart.reset();
            }
        }
    }
    if (next != null) {
        next.startPartition();
    }
}

// 关闭一个分区
void finishPartition() throws HiveException {
    if (isStreaming()) {
        handleOutputRows(tabFn.finishPartition());
    } else {
        if (tabFn.canIterateOutput()) {
            outputPartRowsItr = inputPart == null ? null :
                    tabFn.iterator(inputPart.iterator());
        } else {
            // tabFn是窗口函数的实例,execute方法:执行窗口函数逻辑的计算,返回outputPart依旧是一个分区对象
            outputPart = inputPart == null ? null : tabFn.execute(inputPart);
            outputPartRowsItr = outputPart == null ? null : outputPart.iterator();
        }
        if (next != null) {
            if (!next.isStreaming() && !isOutputIterator()) {
                next.inputPart = outputPart;
            } else {
                if (outputPartRowsItr != null) {
                    while (outputPartRowsItr.hasNext()) {
                        next.processRow(outputPartRowsItr.next());
                    }
                }
            }
        }

    if (next != null) {
        next.finishPartition();
    } else {
        if (!isStreaming()) {
            if (outputPartRowsItr != null) {
                while (outputPartRowsItr.hasNext()) {
                    // 将窗口函数计算结果逐条输出到下一个Operator中
                    forward(outputPartRowsItr.next(), outputObjInspector);
                }
            }
        }
    }
}

2.3 PTFPartition类

   该类表示由TableFunctionWindowFunction来处理的行集合,使用PTFRowContainer来保存数据。

private final PTFRowContainer<List<Object>> elems; // 存放数据的容器

public void append(Object o) throws HiveException {
  //在往PTFPartition中添加数据时,如果当前累计条数超过了Int最大值(21亿),会抛异常。
    if (elems.rowCount() == Integer.MAX_VALUE) {
        throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition",
                Integer.MAX_VALUE));
    }

    @SuppressWarnings("unchecked")
    List<Object> l = (List<Object>)
            ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE);
    elems.addRow(l);
}

2.4 TableFunctionEvaluator类

   该类负责对分区内的数据做实际的窗口计算

public abstract class TableFunctionEvaluator {
transient protected PTFPartition outputPartition; // transient瞬态变量,该属性可以不参与序列化


// iPart理解为:分区对象
public PTFPartition execute(PTFPartition iPart)
        throws HiveException {
    if (ptfDesc.isMapSide()) {
        return transformRawInput(iPart);
    }
    PTFPartitionIterator<Object> pItr = iPart.iterator();
    PTFOperator.connectLeadLagFunctionsToPartition(ptfDesc.getLlInfo(), pItr);

    if (outputPartition == null) {
        outputPartition = PTFPartition.create(ptfDesc.getCfg(),
                tableDef.getOutputShape().getSerde(),
                OI, tableDef.getOutputShape().getOI());
    } else {
        outputPartition.reset();
    }
	// 入参1:输入PTFPartition转换的迭代器;入参2:输出PTFPartition
    execute(pItr, outputPartition);
    return outputPartition;
}

protected abstract void execute(PTFPartitionIterator<Object> pItr, PTFPartition oPart) throws HiveException;
}

 抽象方法 execute(PTFPartitionIterator pItr, PTFPartition oPart) 方法的具体实现在子类WindowingTableFunction

public class WindowingTableFunction extends TableFunctionEvaluator {

@Override
public void execute(PTFPartitionIterator<Object> pItr, PTFPartition outP) throws HiveException {
    ArrayList<List<?>> oColumns = new ArrayList<List<?>>();
    PTFPartition iPart = pItr.getPartition();
    StructObjectInspector inputOI = iPart.getOutputOI();

    WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) getTableDef();
    for (WindowFunctionDef wFn : wTFnDef.getWindowFunctions()) {
    	// 这里是判断逻辑:如果该窗口定义是一个从第一行到最后一行的全局无限窗口就返回false,反之true
        boolean processWindow = processWindow(wFn.getWindowFrame());
        pItr.reset();
        if (!processWindow) {
            Object out = evaluateFunctionOnPartition(wFn, iPart);
            if (!wFn.isPivotResult()) {
                out = new SameList(iPart.size(), out);
            }
            oColumns.add((List<?>) out);
        } else {
            oColumns.add(executeFnwithWindow(wFn, iPart));
        }
    }

    /*
     * Output Columns in the following order
     * - the columns representing the output from Window Fns
     * - the input Rows columns
     */
    for (int i = 0; i < iPart.size(); i++) {
        ArrayList oRow = new ArrayList();
        Object iRow = iPart.getAt(i);

        for (int j = 0; j < oColumns.size(); j++) {
            oRow.add(oColumns.get(j).get(i));
        }
        for (StructField f : inputOI.getAllStructFieldRefs()) {
            oRow.add(inputOI.getStructFieldData(iRow, f));
        }
        //最终将处理好的数据逐条添加到输出PTFPartition中
        outP.append(oRow);
    }
}

// Evaluate the function result for each row in the partition
ArrayList<Object> executeFnwithWindow(
        WindowFunctionDef wFnDef,
        PTFPartition iPart)
        throws HiveException {
    ArrayList<Object> vals = new ArrayList<Object>();
    for (int i = 0; i < iPart.size(); i++) {
    	// 入参:1.窗口函数、2.当前行的行号、3.输入PTFPartition对象
        Object out = evaluateWindowFunction(wFnDef, i, iPart);
        vals.add(out);
    }
    return vals;
}

// Evaluate the result given a partition and the row number to process
private Object evaluateWindowFunction(WindowFunctionDef wFn, int rowToProcess, PTFPartition partition)
        throws HiveException {
    BasePartitionEvaluator partitionEval = wFn.getWFnEval()
            .getPartitionWindowingEvaluator(wFn.getWindowFrame(), partition, wFn.getArgs(), wFn.getOI(), nullsLast);
    // 给定当前行,获取窗口的聚合
    return partitionEval.iterate(rowToProcess, ptfDesc.getLlInfo());
}


}

注:WindowingTableFunction类中的execute方法 ,没怎么理解清楚,待补充~

三、Hive SQL窗口函数实现原理

    window Funtion的使用语法:

select 
       col1,
       col2,
       row_number() over (partition by col1 order by col2) as rn
  from tableA

上面的语句主要分两部分

  • window函数部分(window_func)

  • 窗口定义部分

3.1 window函数部分

   windows函数部分即是:在窗口上执行的函数。主要有count 、sum、avg聚合类窗口函数、还有常用的row_number、rank这样的排序函数。

3.2  窗口定义部分

 j即为: over里面的内容

  • partition by

  • order by

  • rows | range between

3.3  window Function实现原理

   窗口函数的实现,主要借助 Partitioned Table Function (即PTF);

(1)PTF的输入可以是:表、子查询或另一个PTF函数输出;

(2)PTF输出是一张表。

写一个相对复杂的sql,来看一下执行窗口函数时,数据的流转情况:

select 
    id,
    sq,
    cell_type,
    rank,
    row_number() over(partition by id  order by rank ) ,
    rank() over(partition by id order by rank) as r,
    dense_rank() over(partition by  cell_type order by id) as dr  
 from window_test_table 
 group by
    id,
    sq,
    cell_type,
    rank;

数据流转如下图:

以上代码实现主要有三个阶段:

  • 计算除窗口函数以外所有的其他运算,如:group by,join ,having等。上面的代码的第一阶段即为:

select
    id, 
    sq, 
    cell_type, 
    rank
from window_test_table
group by
    id, 
    sq, 
    cell_type, 
    rank;
  • 将第一步的输出作为第一个 PTF 的输入,计算对应的窗口函数值。上面代码的第二阶段即为:

select 
    id,
    sq,
    cell_type,
    rank,
    rn,
    r 
from 
window(
   <w>,--将第一阶段输出记为w
   partition by id, --分区
   order by rank, --窗口函数的order
   [rn:row_number(),r:rank()] --窗口函数调用
 )

   由于row_number(),rank() 两个函数对应的窗口是相同的(partition by id  order by rank),因此,这两个函数可以在一次shuffle中完成。

  • 将第二步的输出结果作为 第二个PTF 的输入,计算对应的窗口函数值。上面代码的第三阶段即为:

select 
    id,
    sq,
    cell_type,
    rank,
    rn,
    r,
    dr
from 
window(
   <w1>,--将第二阶段输出记为w1
   partition by cell_type, --分区
   order by id, --窗口函数的order
   [dr:dense_rank()] --窗口函数调用
 )

    由于dense_rank()的窗口与前两个函数不同,因此需要再partition一次,得到最终的输出结果。

     总结:上述代码显示需要shuffle三次才能得到最终的结果(第一阶段的group by ,第二阶段,第三阶段的开窗操作)。对应到MapReduce程序,即需要经历三次 map->reduce组合;对应到spark sql上,需要Exchange三次,再加上中间排序操作,在数据量很大的情况下,效率上确实会有较大的影响。

四、窗口函数的性能问题

   在使用Hive进行数据处理时,借助窗口函数可以对数据进行分组、排序等操作,但是在使用row_number这类窗口函数时,可能会遇到性能较慢的问题,比普通的聚合函数( sum,min,max等)运行成本更高,为啥?

4.1 性能问题产生原因

4.1.1 第一个版本

  小破站一个up主给出的答案:

 原因:

(1)开窗函数不能做预聚合 ,数据量很多,shuffle慢,计算慢,并且会有

数据倾斜的风险。

(2)开窗多一步order by ,更耗时间。

4.1.2 第二个版本

原因:

(1)普通的聚合函数语句,可以根据函数不同,采用partial + merge 的方式运行,也即是map端预聚合;但那是window 窗口语句只能在reduce 端一次性聚合,即只有complete 执行模式。

(2)普通聚合函数的物理执行计划分为SortBased和HashBased的;而window则都是SortBased。

(3)window语句作用于 对行,并为每行返回一个聚合结果,这决定了window在执行过程中需要更大的buffer 进行汇总。

4.2 性能问题的优化方法

4.2.1 用聚合函数替代 排序开窗函数

     例如:假设需要求出历史至今用户粒度末次交易的sku名称或者交易金额等,这种情况下,可以将 交易时间和sku名称拼接起来,取max ,之后再将sku名称拆解开,即能达到预期效果。

    在Hive 中,row_number是一个常用的窗口函数,用于为结果集中的每一行分配一个唯一的数字。通常会搭配over子句来指定窗口的范围和排序方式。例如:

select 
       col1,
       col2,
       row_number() over (partition by col1 order by col2) as rn
  from tableA

   上述示例row_number 函数将根据col1进行分组,并按照col2的值进行排序,为每一组数据分配一个唯一的行号。然而,在处理大规模数据时,使用row_number可能会导致性能下降,这是因为row_number 需要对数据进行排序和标记,而这些操作在大数据量下会消耗较多的计算资源。

 以下 都是row_n   umber开窗函数性能优化的几种方式:

4.2.2 减少数据量

   一种最直接的优化方法是减少需要进行row_number计算的数据量。可以通过在where子句中添加条件、对数据进行分区等方式来减小数据规模,从而提升计算性能。

   ps: 这种方式在生产环境中用过。

4.2.3 避免多次排序

   在使用row_number时,尽量避免多次排序操作。可以将row_number 函数应用在子查询中,然后再进行排序操作,避免重复的排序过程。

select
     col1,
     col2,
     rn
from 
( select 
       col1,
       col2,
       row_number() over (partition by col1 order by col2) as rn
  from tableA) tmp1
order by col1,col2;

参考文章:

常用的SQL优化方式, 用聚合函数替代排序开窗求最值, sparksql, hivesql_哔哩哔哩_bilibili

https://blog.51cto.com/u_16213435/9877979

Hive学习(一)窗口函数源码阅读_hive 源码阅读-CSDN博客

https://mp.weixin.qq.com/s/WBryrbpHGO9jmzMp0e7jhw

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1978537.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

stm32工程配置

目录 STM32F103 start&#xff1a;启动文件、内核寄存器文件、外设寄存器文件、时钟配置文件 library&#xff1a;标准库函数&#xff08;内核及外设驱动&#xff09; user&#xff1a;用户文件、库函数配置文件、中断程序文件 添加宏定义 STM32F407 start目录 启动文件…

实战:使用Certbot签发免费ssl泛域名证书(主域名及其它子域名共用同一套证书)-2024.8.4(成功测试)

1、使用Certbot签发免费ssl泛域名证书 | One实战&#xff1a;使用Certbot签发免费ssl泛域名证书(主域名及其它子域名共用同一套证书)-2024.8.4(成功测试)https://wiki.onedayxyy.cn/docs/docs/Certbot-install/

Transformer相关介绍

1 Transformer 介绍 Transformer的本质上是一个Encoder-Decoder的结构。 1.1 编码器 在Transformer模型中&#xff0c;编码器&#xff08;Encoder&#xff09; 的主要作用是将输入序列&#xff08;例如文本、语音等&#xff09;转换为隐藏表示&#xff08;或者称为特征表示…

24军dui文职联勤保障部报名照规格要求

24军dui文职联勤保障部报名照规格要求 #军队文职 #文职 #文职备考 #联勤保障部队 #文职考试 #文职上岸 #2024军队文职

python-查找元素3(赛氪OJ)

[题目描述] 有n个不同的数&#xff0c;从小到大排成一列。现在告诉你其中的一个数x&#xff0c;x不一定是原先数列中的数。你需要输出最后一个<x的数在此数组中的下标。输入&#xff1a; 输入共两行第一行为两个整数n、x。第二行为n个整数&#xff0c;代表a[i]。输出&#x…

练习2.30

2.29题目没有理解,暂时没有做出来,先把2.30做了 上代码 (defn square [x](* x x)) ;第一版,直接定义 (defn square-tree[tree](cond (not (seq? tree)) (square tree)(empty? tree) nil:else (cons (square-tree (first tree)) (square-tree (rest tree)))) ) ;使用map …

LeetCode刷题笔记 | 283 | 移动零 | 双指针 |Java | 详细注释

&#x1f64b;大家好&#xff01;我是毛毛张! &#x1f308;个人首页&#xff1a; 神马都会亿点点的毛毛张 原地移除元素2 LeetCode链接&#xff1a;283. 移动零 1.题目描述 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元…

Nextjs——国际化那些事儿

背景&#xff1a; 某一天&#xff0c;产品经理跟我说&#xff0c;我们的产品需要搞国际化 国际化的需求说白了就是把项目中的文案翻译成不同的语言&#xff0c;用户想用啥语言来浏览网页就用啥语言&#xff0c;虽然说英语是通用语言&#xff0c;但国际化了嘛&#xff0c;产品才…

学习编程的第二十天,加油!

3&#xff1a;递归与迭送&#xff08;循环是一种迭代&#xff09; &#xff01;&#xff01;&#xff01;递归算有些东西时计算量会很大导致运行时间过久&#xff0c;而使用循环会大大节省时间&#xff0c;但需要注意溢出的情况。 递归的练习&#xff0c;第一张呢不符合我们的…

刷题——不同路径的数目

不同路径的数目(一)_牛客题霸_牛客网 我第一眼&#xff0c;觉得是没有思路的&#xff0c;我也是看别人代码反应过来&#xff0c; 画图可以看出来 外边沿的只有一种到达方式&#xff0c;全部赋值1&#xff0c; 如果有两个方块相接&#xff0c;那就让此方块的左邻和右邻相加&…

线程池ThreadPoolExecutor使用

文章目录 一、基础-Java中线程创建的方式1.1、继承Thread类创建线程1.2、实现Runnable接口创建线程1.3、实现Calable接口创建线程1.4、使用线程池创建线程二、概念-线程池基本概念2.1、并发和井行的主要区别2.1.1、处理任务不同2.1.2、存在不同2.1.3、CPU资源不同2.2、什么是线…

网页保护用户 小tips

在使用创建web开发的过程中&#xff0c;直接使用用户名url&#xff0c;容易造成用户信息的被攻击&#xff0c;例如对方直接访问 ../../.../username 的网页&#xff0c;可以窃取用户信息&#xff0c;然而把usename变成一堆乱码就安全的多 效果&#xff1a; 代码&#xff1a;…

一个 .NET 开源的地图组件库 - Mapsui

前言 今天大姚给大家分享一个.NET开源&#xff08;MIT License&#xff09;、免费、同时支持多平台框架&#xff08;MAUI、WPF、Avalonia、Uno、Blazor、WinUI、Eto、.NET Android 和 .NET iOS&#xff09;地图组件库&#xff1a;Mapsui。 项目源代码 支持的UI框架的NuGet包 …

JavaFX布局-TabPane

JavaFX布局-TabPane 常用属性paddingsidetabClosingPolicytabDragPolicy 实现方式Java实现fxml实现 组织一组tab的容器&#xff0c;可以设置关闭&#xff0c;拖拽等每个tab内容可以设置不同容器数据 常用属性 padding 内边距&#xff0c;可以单独设置上、下、左、右的内边距 …

webfunny埋点系统如何进行部署?

hello 大家webfunny埋点系统做了不少功能更新&#xff0c;平常给大家分享比较多的是**webfunny前端监控系统**&#xff0c;最近有不少技术同学来了解webfunny埋点系统&#xff0c;今天主要给大家分享下webfunny埋点系统部署&#xff0c;分为本地部署和线上部署。 还没有试用和…

翻译: 可视化深度学习反向传播原理一

本期我们来讲反向传播 也就是神经网络学习的核心算法 稍微回顾一下我们之前讲到哪里之后 首先我要撇开公式不提 直观地过一遍 这个算法到底在做什么 然后如果你们有人想认真看里头的数学 下一期影片我会解释这一切背后的微积分 如果你看了前两期影片 或者你已经有足够背景知…

【文件系统】磁盘的物理结构 | 存储结构

目录 0.前言 1.磁盘的物理结构 1.1什么是二进制&#xff08;0/1&#xff09; 1.2磁盘的存放位置 1.3磁盘的结构 2.磁盘的存储结构 2.1❓数据是怎样在磁盘上存储 2.2❓读写的基本单位是什么 2.3❓如何找到一个指定位置的扇区 2.4❓为什么磁盘的机械结构是这样的 0.前…

Audio Spectrogram Transformer (AST)工作介绍

Audio Spectrogram Transformer (AST)&#xff0c;是一种基于 Transformer 模型的音频分类方法。AST 利用了 Transformer 模型在捕获全局特征方面的优势&#xff0c;将音频信号转换为频谱图进行处理。下面是对 AST 及其相关研究工作的详细介绍&#xff1a; 1.研究背景 传统的音…

SpringAOP-底层实现源码解析

目录 1. Spring AOP原理流程图 2. 动态代理 3. ProxyFactory 4. Advice的分类 5. Advisor的理解 6. 创建代理对象的方式 ProxyFactoryBean BeanNameAutoProxyCreator DefaultAdvisorAutoProxyCreator 7. 对Spring AOP的理解 8. AOP中的概念 9. Advice在Spring AOP中…

文心智能体【MBTI速测小精灵】:趣味速测,精准解析你的性格密码!

文章目录 一、文心智能体平台是什么&#xff1f;二、创建文心智能体智能体创建智能体调试分析智能体基础配置智能体高级配置智能体高级调试 三、文心智能体发布四、文心智能体体验总结 一、文心智能体平台是什么&#xff1f; AgentBuilder文心智能体平台是基于文心大模型的智能…