2023-01-28 clickhouse-聚合函数的源码再梳理

news2025/1/9 2:54:18

笔者在源码笔记1之中分析过ClickHouse的聚合函数的实现,但是对于各个接口函数的实际如何共同工作的源码,回头看并没有那么明晰,主要原因是没有结合Aggregator的类来一起分析聚合函数的是如果工作起来的。所以决定重新再完成一篇聚合函数的源码梳理的文章,帮助大家进一步的理解ClickHouse之中聚合函数的工作原理。 本系列文章的源码分析基于ClickHouse v19.16.2.2的版本。

1.IAggregateFunction接口梳理

话不多说,直接上代码,笔者这里会将所有聚合函数的核心接口代码全部列出,一一梳理各个部分:

构造函数

 IAggregateFunction(const DataTypes & argument_types_, const Array & parameters_)
        : argument_types(argument_types_), parameters(parameters_) {}

复制

上面的代码实现了IAggregateFunction接口的构造函数,初始化了该接口的两个成员变量:

  • argument_type:函数的参数类型,比如函数select sum(a), sum(b), c from test group by c, 这里a, b分别是UInt16类型与Double类型,那么这个sum(a)sum(b)的参数就不同。
  • parameters: 参数,实际类型为std::vector<Field>。它代表着函数的除了数据的输入参数之外的其他参数。比如聚合函数topk,其中需要传入的k的值就在parameters之中。

内存分配接口

在Clickhouse的聚合执行过程之中,所有的聚合函数都是通过列来进行的。而这里有两个重要的问题:

  • 列内存从哪里分配
  • 分配的内存结构,长度是如何的 笔者在梳理下面代码的过程之中给出解答,
    /** Create empty data for aggregation with `placement new` at the specified location.
      * You will have to destroy them using the `destroy` method.
      */
    virtual void create(AggregateDataPtr place) const = 0;

    /// Delete data for aggregation.
    virtual void destroy(AggregateDataPtr place) const noexcept = 0;

复制

IAggregateFunction定义的两个接口createdestory接口完成了内存结构与长度的确定,这里可能描述的不是很明白,这里了解Doris聚合实现的同学可以这样理解。create函数本身就是完成了Doris聚合函数之中init函数所完成的工作。这里通过子类IAggregateFunctionDataHelper的实现代码来进一步理解它做了什么事情:

    void create(AggregateDataPtr place) const override
    {
        new (place) Data;
    }

    void destroy(AggregateDataPtr place) const noexcept override
    {
        data(place).~Data();
    }

复制

这部分代码很简单,Data就是模板派生的类型,然后通过placement newplacement delete的方式完成了Data类型的构造与析构。而这个Data类型就是聚合函数存储中间结果的类型,比如sum的聚合函数的派生类型是类AggregateFunctionSumData的内存结构,它不仅包含了聚合结果的数据sum同时也包含了一组进行聚合计算的函数接口add,merge等:

template <typename T>
struct AggregateFunctionSumData
{
    T sum{};

    void add(T value)
    {
        sum += value;
    }

    void merge(const AggregateFunctionSumData & rhs)
    {
        sum += rhs.sum;
    }

    T get() const
    {
        return sum;
    }
};

复制

这里就是通过createdestory函数调用AggregateFunctionSumData的构造函数与析构函数。而问题又绕回第一个问题了,这部分内存是在哪里分配的呢?

 aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
 createAggregateStates(aggregate_data);

复制

在进行聚合运算时,通过Aggregator之中的内存池进行单行所有的聚合函数的数据结果的内存分配。并且调用createAggregateStates依次调用各个聚合函数的create方法进行构造函数的调用。这部分可能有些难理解,我们接着看下面的流程图,来更好的帮助理解:

create函数在聚合的流程之中的作用

通过上述流程图可以看到,create这部分就是在构造聚合hash表时,进行内存初始化工作的,而这部分内存不仅仅包含了聚合函数的结果数据,还包含了对应聚合算子的函数指针。后文我们分析计算接口的时候也会同样看到。接下来,来看destory就很容易理解了,就是在聚合计算结束或取消时,遍历hash表,并调用析构函数对hash表中存储的Data类型调用析构函数,而最终的内存伴随着aggregates_pool内存池的析构而同时释放。

detory函数在聚合流程之中的作用

函数计算接口

接下来就是聚合函数最核心的部分,聚合函数的计算。

/** Adds a value into aggregation data on which place points to.
     *  columns points to columns containing arguments of aggregation function.
     *  row_num is number of row which should be added.
     *  Additional parameter arena should be used instead of standard memory allocator if the addition requires memory allocation.
     */
    virtual void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const = 0;

    /// Merges state (on which place points to) with other state of current aggregation function.
    virtual void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;

    /** Contains a loop with calls to "add" function. You can collect arguments into array "places"
      *  and do a single call to "addBatch" for devirtualization and inlining.
      */
    virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;

复制

IAggregateFunction定义的3个接口:

  • add函数将对应AggregateDataPtr指针之中数据取出,与列columns中的第row_num的数据进行对应的聚合计算
  • addBatch函数:这是函数也是非常重要的,虽然它仅仅实现了一个for循环调用add函数。它通过这样的方式来减少虚函数的调用次数,并且增加了编译器内联的概率,同样,它实现了高效的向量化。
  • merge函数:将两个聚合结果进行合并的函数,通常用在并发执行聚合函数的过程之中,需要将对应的聚合结果进行合并。

这里的两个函数类似Doris之中聚合函数的updatemerge。接下来我们看它是如何完成工作的。

首先看聚合节点Aggregetor是如何调用addBatch函数:

   /// Add values to the aggregate functions.
    for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
        inst->that->addBatch(rows, places.data(), inst->state_offset, inst->arguments, aggregates_pool);

复制

这里依次遍历AggregateFunction,并调用addBatch接口。而addBatch接口就是一行行的遍历列,将参数列inst->arguments与上文提到create函数构造的聚合数据结构的两列列数据进行聚合计算:

    void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const override
    {
        for (size_t i = 0; i < batch_size; ++i)
            static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
    }

复制

这里还是调用了add函数,我们通过AggregateFunctionSum作为子类来具体看一下add的具体实现:

   void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        const auto & column = static_cast<const ColVecType &>(*columns[0]);
        this->data(place).add(column.getData()[row_num]);
    }

复制

这里其实还是调用上文提到的AggregateFunctionSumData的内存结构的add函数完成聚合计算。而这个add函数就是一个简单的相加逻辑,这样就完成了简单的一次聚合运算。

   void add(T value)
    {
        sum += value;
    }

复制

merge函数的实现逻辑类似于add函数,这里就不展开再次分析了。

函数结果输出接口

最后就是聚合函数结果输出接口,将聚合计算的结果重新组织为列存。

  /// Inserts results into a column.
    virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0;

复制

首先看聚合节点Aggregator是如何调用insertResultInto函数的

 data.forEachValue([&](const auto & key, auto & mapped)
    {
        method.insertKeyIntoColumns(key, key_columns, key_sizes);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->insertResultInto(
                mapped + offsets_of_aggregate_states[i],
                *final_aggregate_columns[i]);
    });

复制

Aggregetor同样是遍历hash表之中的结果,将key列先组织成列存,然后调用insertResultInto函数将聚合计算的结果也转换为列存。 这里我们找一个sum函数的实现,来看看insertResultInto函数接口是如何工作的:

    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
    {
        auto & column = static_cast<ColVecResult &>(to);
        column.getData().push_back(this->data(place).get());
    }

复制

其实很简单,就是调用AggregateDataPtr,也就是AggregateFunctionSumDataget()函数获取sum计算的结果,然后添加到列内存之中。

get函数接口的实现如下:

    T get() const
    {
        return sum;
    }

复制

2.聚合函数的注册流程

有了上述的背景知识,我们接下来举个栗子。来看看一个聚合函数的实现细节,以及它是如何被使用的。

AggregateFunctionSum

这里选取了一个很简单的聚合算子Sum,我们来看看它实现的代码细节。 这里我们可以看到AggregateFunctionSum是个final类,无法被继承了。而它继承IAggregateFunctionHelp类与IAggregateFunctionDataHelper类。

  • IAggregateFunctionHelp类 通过CRTP让父类可以直接调用子类的add函数指针而避免了虚函数调用的开销。
  • IAggregateFunctionHelper类则包含了Data的模板数据类型,也就是上文提及的AggregateFunctionSumData进行内存结构的createdestory等等。

这里我们就重点看,这个类override了getName方法,返回了对应的名字时sum。并且实现了我们上文提到核心方法。

template <typename T, typename TResult, typename Data>
class AggregateFunctionSum final : public IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>
{
public:
    using ResultDataType = std::conditional_t<IsDecimalNumber<T>, DataTypeDecimal<TResult>, DataTypeNumber<TResult>>;
    using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
    using ColVecResult = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<TResult>, ColumnVector<TResult>>;

    String getName() const override { return "sum"; }

    AggregateFunctionSum(const DataTypes & argument_types_)
        : IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>(argument_types_, {})
        , scale(0)
    {}

    AggregateFunctionSum(const IDataType & data_type, const DataTypes & argument_types_)
        : IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>(argument_types_, {})
        , scale(getDecimalScale(data_type))
    {}

    DataTypePtr getReturnType() const override
    {
        if constexpr (IsDecimalNumber<T>)
            return std::make_shared<ResultDataType>(ResultDataType::maxPrecision(), scale);
        else
            return std::make_shared<ResultDataType>();
    }

    void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        const auto & column = static_cast<const ColVecType &>(*columns[0]);
        this->data(place).add(column.getData()[row_num]);
    }

    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
    {
        this->data(place).merge(this->data(rhs));
    }

    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
    {
        this->data(place).write(buf);
    }

    void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
    {
        this->data(place).read(buf);
    }

    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
    {
        auto & column = static_cast<ColVecResult &>(to);
        column.getData().push_back(this->data(place).get());
    }

private:
    UInt32 scale;
};

复制

之前我们讲到AggregateFunction的函数就是通过AggregateDataPtr指针来获取AggregateFunctionSumData的地址,来调用add实现聚合算子的。我们可以看到AggregateFunctionSumData实现了前文提到的add, merge, write,read四大方法,正好与接口IAggregateFunction一一对应上了。

template <typename T>
struct AggregateFunctionSumData
{
    T sum{};

    void add(T value)
    {
        sum += value;
    }

    void merge(const AggregateFunctionSumData & rhs)
    {
        sum += rhs.sum;
    }

    void write(WriteBuffer & buf) const
    {
        writeBinary(sum, buf);
    }

    void read(ReadBuffer & buf)
    {
        readBinary(sum, buf);
    }

    T get() const
    {
        return sum;
    }
};

复制

ClickHouse在Server启动时。main函数之中会调用registerAggregateFunction的初始化函数注册所有的聚合函数。 然后调用到下面的函数注册sum的聚合函数:

void registerAggregateFunctionSum(AggregateFunctionFactory & factory)
{
    factory.registerFunction("sum", createAggregateFunctionSum<AggregateFunctionSumSimple>, AggregateFunctionFactory::CaseInsensitive);
}

复制

也就是完成了这个sum聚合函数的注册,后续我们get出来就可以愉快的调用啦。(这部分有许多模板派生的复杂代码,建议与源码结合梳理才能事半功倍~~)

3.要点梳理

第二小节解析了一个聚合函数与接口意义对应的流程,这里重点梳理聚合函数实现的源码要点:

  1. 各个聚合函数核心的实现add,merge与序列化,内存结构初始化,内存结构释放的接口。
  2. 各个函数的实现需要继承IAggregateFunctionDataHelper的接口,而它的父类是IAggregateFunctionHelperIAggregateFunction接口。
  3. ClickHouse的聚合函数保证了每次循环遍历一个Block只调用一个IAggregateFunction的聚合函数,这样最大程度上确保了向量化执行的可能性,减少了数据偏移与依赖。

4. 小结

好了,到这里也就把ClickHouse聚合函数部分的代码梳理完了。 除了sum函数外,其他的函数的执行也是同样通过类似的方式依次来实现和处理的,源码阅读的步骤也可以参照笔者的分析流程来参考。 笔者是一个ClickHouse的初学者,对ClickHouse有兴趣的同学,欢迎多多指教,交流。

5. 参考资料

官方文档 ClickHouse源代码

ClickHouse源码笔记5:聚合函数的源码再梳理 - 腾讯云开发者社区-腾讯云

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/182696.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

梦熊杯-十二月月赛-白银组题解-A.自由

A. Problem A.自由&#xff08;freedom.cpp&#xff09; 内存限制&#xff1a;256 MiB 时间限制&#xff1a;1000 ms 标准输入输出 题目类型&#xff1a;传统 评测方式&#xff1a;文本比较 题目描述: 「蒙德」是「自由」的国度。 巴巴托斯认为&#xff0c;如果一个数的…

ch1_2 计算机的基本组成

计算机的基本组成 1. 冯 诺依曼计算机的特点 计算机由五大部件组成指令和数据 以同等地位 存于存储器&#xff0c; 可按地址寻访。指令和数据用二进制 表示指令由操作码 和 地址码 组成&#xff1b;存储程序&#xff1b;以运算器 为中心&#xff1b; 2. 硬件框图 存储器&am…

【Java集合】HashSet源码分析

目录 一、Set简介 二、HashSet简介 2.1 简介 2.2 HashSet继承关系 三、源码分析 3.1 成员属性 3.2 构造方法 3.3 添加元素 3.3.1 add()方法 3.3.2 addAll()方法 3.4 删除元素 3.4.1 remove()方法 3.4.2 removeAll()方法 3.5 查询元素 3.5.1 contains()方法 3.5.2 containsAll方…

项目管理:如何编写高质量的Makefile?

文章目录背景熟练掌握 Makefile 语法规划 Makefile 要实现的功能设计合理的 Makefile 结构掌握 Makefile 编写技巧技巧 1&#xff1a;善用通配符和自动变量技巧 2&#xff1a;善用函数技巧 3&#xff1a;依赖需要用到的工具技巧 4&#xff1a;把常用功能放在 /Makefile 中&…

nodeJS - 切换使用淘宝镜像【临时切换、 长期切换】

一、文章引导 #mermaid-svg-zWQadgqvTsLhAes4 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-zWQadgqvTsLhAes4 .error-icon{fill:#552222;}#mermaid-svg-zWQadgqvTsLhAes4 .error-text{fill:#552222;stroke:#55222…

自动驾驶感知——视觉感知经典算法

文章目录1. 车道线检测技术1.1 基于规则的车道线检测技术1.1.1 流程框架1.1.2 预处理模块1.1.3 车道线识别感兴趣区域提取1.1.4 灰度图转化1.1.5 灰度图去噪1.1.6 二值化操作1.1.7 鲁棒性参数估计——RANSAC1.1.8 后处理模块1.1.9 输出1.2 车道线检测技术发展路线2. 目标检测技…

10.图和树基础

一、基本介绍 1.图 图描述的是一些个体之间的关系。这些个体之间既不是前驱后继的顺序关系&#xff0c;也不是祖先后代的层次关系&#xff0c;而是错综复杂的网状关系。我们一般用图G(V,E)G(V,E)G(V,E)来表示&#xff0c;VVV表示结点&#xff0c;EEE表示边。 根据边是否有权值…

爱快软路由安装Docker插件

在爱快云 插件应用中开启Docker插件 在爱快web端页面的[系统设置]->[磁盘管理]->[磁盘分区]设置磁盘分区&#xff0c;选择普通存储&#xff0c;挂载路径名可以随便取。 点击[高级应用]->[插件管理] 点击页面的Docker图标。 启用Docker服务 点击中间的[镜像管理]&…

n皇后问题

n皇后问题 题目&#xff1a; 按照国际象棋的规则&#xff0c;皇后可以攻击与之处在同一行 或同一列 或同一斜线上的棋子。 n 皇后问题 研究的是如何将 n 个皇后放置在 nn 的棋盘上&#xff0c;并且使皇后彼此之间不能相互攻击。 给你一个整数 n &#xff0c;返回所有不同的…

基于java的大理旅游系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目介绍…

professional issue复习

Legal concepts Development of UK law • The Kingdom of England was established in 927. • The Principality of Wales was established in 1216. Common law • Following 1066, a unified system of law (English common law) slowly came into existence. It was “c…

漫谈cgroup

什么是cgroup cgroup 是linux内核的一个功能&#xff0c;用来限制、控制与分离一个进程组的资源&#xff08;如CPU、内存、磁盘I/O等&#xff09;。它是由 Google 的两位工程师进行开发的&#xff0c;自 2008 年 1 月正式发布的 Linux 内核 v2.6.24 开始提供此能力。 cgroup …

代码随想录算法训练营第30天 二叉树 java :39. 组合总和 40.组合总和II 131.分割回文串

文章目录LeetCode 39. 组合总和本题题解思路LeetCode 40.组合总和II本题题解思路LeetCode 131.分割回文串本题题解思路那么在代码里什么是切割线呢&#xff1f;那么在代码里什么是切割线呢&#xff1f;总结LeetCode 39. 组合总和 本题题解 思路 根据递归三部曲来分析 递归函…

单板硬件设计:存储器

在单板设计中&#xff0c;无论是涉及到一个简易的CPU、MCU小系统或者是复杂的单板设计&#xff0c;都离不开存储器设计&#xff1a; 1、存储器介绍 存储器的分类大致可以划分如下&#xff1a; ROM和RAM指的都是半导体存储器&#xff0c;ROM在系统停止供电的时候仍然可以保持…

visudo配置sudo权限

visudo配置sudo权限配置visudo仅允许字符终端登陆(tty)--授权localhost允许图形和tty登陆--授权all用户组提权-示例配置在sudoers.d目录下创建授权文件--推荐五段式配置三段式配置检查sudoers配置是否有误如何在sudo运行的命令中防止使用参数结果验证配置visudo https://blog.…

【数据结构】8.2 插入排序

文章目录前言1. 直接插入排序直接插入排序算法直接插入排序性能分析2. 折半插入排序3. 希尔排序希尔排序算法希尔排序算法分析排序方法比较前言 类似于俺们打牌时的插入&#xff0c;每抓来一张牌的时候&#xff0c;就将它放在合适的位置上&#xff0c;插入一张牌之后手里的牌仍…

MQ相关概念

1) 队列管理器 队列管理器是MQ系统中最上层的一个概念&#xff0c;由它为我们提供基于队列的消息服务。 2) 消息 在MQ中&#xff0c;我们把应用程序交由MQ传输的数据定义为消息&#xff0c;我们可以定义消息的内容并对消息进行广义的理解&#xff0c;比如&#xff1a;用户的各种…

JavaWeb-FilterListener

JavaWeb-Filter&Listener 1&#xff0c;Filter 1.1 Filter概述 Filter 表示过滤器&#xff0c;是 JavaWeb 三大组件(Servlet、Filter、Listener)之一。 过滤器可以把对资源的请求拦截下来&#xff0c;从而实现一些特殊的功能。 如下图所示&#xff0c;浏览器可以访问服…

JAVA性能统计项目

一、项目背景&#xff1a;我们希望设计开发一个小的框架&#xff0c;能够获取接口调用的各种统计信息&#xff0c;比如&#xff0c;响应时间的最大值&#xff08;max&#xff09;、最小值&#xff08;min&#xff09;、平均值&#xff08;avg&#xff09;、百分位值&#xff08…

力扣OJ(2000+)

目录 2032. 至少在两个数组中出现的值 2037. 使每位学生都有座位的最少移动次数 2042. 检查句子中的数字是否递增 2097. 合法重新排列数对 2180. 统计各位数字之和为偶数的整数个数 2185. 统计包含给定前缀的字符串 2283. 判断一个数的数字计数是否等于数位的值 2287. …