2023-01-10 clickhouse-聚合函数的源码再梳理

news2024/11/26 13:38:01

https://cloud.tencent.com/developer/article/1815441

1.IAggregateFunction接口梳理

话不多说,直接上代码,笔者这里会将所有聚合函数的核心接口代码全部列出,一一梳理各个部分:

构造函数

 IAggregateFunction(const DataTypes & argument_types_, const Array & parameters_)
        : argument_types(argument_types_), parameters(parameters_) {}

复制

上面的代码实现了IAggregateFunction接口的构造函数,初始化了该接口的两个成员变量:

  • argument_type:函数的参数类型,比如函数select sum(a), sum(b), c from test group by c, 这里a, b分别是UInt16类型与Double类型,那么这个sum(a)sum(b)的参数就不同。
  • parameters: 参数,实际类型为std::vector<Field>。它代表着函数的除了数据的输入参数之外的其他参数。比如聚合函数topk,其中需要传入的k的值就在parameters之中。

内存分配接口

在Clickhouse的聚合执行过程之中,所有的聚合函数都是通过列来进行的。而这里有两个重要的问题:

  • 列内存从哪里分配
  • 分配的内存结构,长度是如何的 笔者在梳理下面代码的过程之中给出解答,
    /** Create empty data for aggregation with `placement new` at the specified location.
      * You will have to destroy them using the `destroy` method.
      */
    virtual void create(AggregateDataPtr place) const = 0;

    /// Delete data for aggregation.
    virtual void destroy(AggregateDataPtr place) const noexcept = 0;

复制

IAggregateFunction定义的两个接口createdestory接口完成了内存结构与长度的确定,这里可能描述的不是很明白,这里了解Doris聚合实现的同学可以这样理解。create函数本身就是完成了Doris聚合函数之中init函数所完成的工作。这里通过子类IAggregateFunctionDataHelper的实现代码来进一步理解它做了什么事情:

    void create(AggregateDataPtr place) const override
    {
        new (place) Data;
    }

    void destroy(AggregateDataPtr place) const noexcept override
    {
        data(place).~Data();
    }

复制

这部分代码很简单,Data就是模板派生的类型,然后通过placement newplacement delete的方式完成了Data类型的构造与析构。而这个Data类型就是聚合函数存储中间结果的类型,比如sum的聚合函数的派生类型是类AggregateFunctionSumData的内存结构,它不仅包含了聚合结果的数据sum同时也包含了一组进行聚合计算的函数接口add,merge等:

template <typename T>
struct AggregateFunctionSumData
{
    T sum{};

    void add(T value)
    {
        sum += value;
    }

    void merge(const AggregateFunctionSumData & rhs)
    {
        sum += rhs.sum;
    }

    T get() const
    {
        return sum;
    }
};

复制

这里就是通过createdestory函数调用AggregateFunctionSumData的构造函数与析构函数。而问题又绕回第一个问题了,这部分内存是在哪里分配的呢?

 aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
 createAggregateStates(aggregate_data);

复制

在进行聚合运算时,通过Aggregator之中的内存池进行单行所有的聚合函数的数据结果的内存分配。并且调用createAggregateStates依次调用各个聚合函数的create方法进行构造函数的调用。这部分可能有些难理解,我们接着看下面的流程图,来更好的帮助理解:

create函数在聚合的流程之中的作用

通过上述流程图可以看到,create这部分就是在构造聚合hash表时,进行内存初始化工作的,而这部分内存不仅仅包含了聚合函数的结果数据,还包含了对应聚合算子的函数指针。后文我们分析计算接口的时候也会同样看到。接下来,来看destory就很容易理解了,就是在聚合计算结束或取消时,遍历hash表,并调用析构函数对hash表中存储的Data类型调用析构函数,而最终的内存伴随着aggregates_pool内存池的析构而同时释放。

detory函数在聚合流程之中的作用

函数计算接口

接下来就是聚合函数最核心的部分,聚合函数的计算。

/** Adds a value into aggregation data on which place points to.
     *  columns points to columns containing arguments of aggregation function.
     *  row_num is number of row which should be added.
     *  Additional parameter arena should be used instead of standard memory allocator if the addition requires memory allocation.
     */
    virtual void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const = 0;

    /// Merges state (on which place points to) with other state of current aggregation function.
    virtual void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;

    /** Contains a loop with calls to "add" function. You can collect arguments into array "places"
      *  and do a single call to "addBatch" for devirtualization and inlining.
      */
    virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;

复制

IAggregateFunction定义的3个接口:

  • add函数将对应AggregateDataPtr指针之中数据取出,与列columns中的第row_num的数据进行对应的聚合计算
  • addBatch函数:这是函数也是非常重要的,虽然它仅仅实现了一个for循环调用add函数。它通过这样的方式来减少虚函数的调用次数,并且增加了编译器内联的概率,同样,它实现了高效的向量化。
  • merge函数:将两个聚合结果进行合并的函数,通常用在并发执行聚合函数的过程之中,需要将对应的聚合结果进行合并。

这里的两个函数类似Doris之中聚合函数的updatemerge。接下来我们看它是如何完成工作的。

首先看聚合节点Aggregetor是如何调用addBatch函数:

   /// Add values to the aggregate functions.
    for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
        inst->that->addBatch(rows, places.data(), inst->state_offset, inst->arguments, aggregates_pool);

复制

这里依次遍历AggregateFunction,并调用addBatch接口。而addBatch接口就是一行行的遍历列,将参数列inst->arguments与上文提到create函数构造的聚合数据结构的两列列数据进行聚合计算:

    void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const override
    {
        for (size_t i = 0; i < batch_size; ++i)
            static_cast<const Derived *>(this)->add(places[i] + place_offset, columns, i, arena);
    }

复制

这里还是调用了add函数,我们通过AggregateFunctionSum作为子类来具体看一下add的具体实现:

   void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        const auto & column = static_cast<const ColVecType &>(*columns[0]);
        this->data(place).add(column.getData()[row_num]);
    }

复制

这里其实还是调用上文提到的AggregateFunctionSumData的内存结构的add函数完成聚合计算。而这个add函数就是一个简单的相加逻辑,这样就完成了简单的一次聚合运算。

   void add(T value)
    {
        sum += value;
    }

复制

merge函数的实现逻辑类似于add函数,这里就不展开再次分析了。

函数结果输出接口

最后就是聚合函数结果输出接口,将聚合计算的结果重新组织为列存。

  /// Inserts results into a column.
    virtual void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const = 0;

复制

首先看聚合节点Aggregator是如何调用insertResultInto函数的

 data.forEachValue([&](const auto & key, auto & mapped)
    {
        method.insertKeyIntoColumns(key, key_columns, key_sizes);

        for (size_t i = 0; i < params.aggregates_size; ++i)
            aggregate_functions[i]->insertResultInto(
                mapped + offsets_of_aggregate_states[i],
                *final_aggregate_columns[i]);
    });

复制

Aggregetor同样是遍历hash表之中的结果,将key列先组织成列存,然后调用insertResultInto函数将聚合计算的结果也转换为列存。 这里我们找一个sum函数的实现,来看看insertResultInto函数接口是如何工作的:

    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
    {
        auto & column = static_cast<ColVecResult &>(to);
        column.getData().push_back(this->data(place).get());
    }

复制

其实很简单,就是调用AggregateDataPtr,也就是AggregateFunctionSumDataget()函数获取sum计算的结果,然后添加到列内存之中。

get函数接口的实现如下:

    T get() const
    {
        return sum;
    }

复制

2.聚合函数的注册流程

有了上述的背景知识,我们接下来举个栗子。来看看一个聚合函数的实现细节,以及它是如何被使用的。

AggregateFunctionSum

这里选取了一个很简单的聚合算子Sum,我们来看看它实现的代码细节。 这里我们可以看到AggregateFunctionSum是个final类,无法被继承了。而它继承IAggregateFunctionHelp类与IAggregateFunctionDataHelper类。

  • IAggregateFunctionHelp类 通过CRTP让父类可以直接调用子类的add函数指针而避免了虚函数调用的开销。
  • IAggregateFunctionHelper类则包含了Data的模板数据类型,也就是上文提及的AggregateFunctionSumData进行内存结构的createdestory等等。

这里我们就重点看,这个类override了getName方法,返回了对应的名字时sum。并且实现了我们上文提到核心方法。

template <typename T, typename TResult, typename Data>
class AggregateFunctionSum final : public IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>
{
public:
    using ResultDataType = std::conditional_t<IsDecimalNumber<T>, DataTypeDecimal<TResult>, DataTypeNumber<TResult>>;
    using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
    using ColVecResult = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<TResult>, ColumnVector<TResult>>;

    String getName() const override { return "sum"; }

    AggregateFunctionSum(const DataTypes & argument_types_)
        : IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>(argument_types_, {})
        , scale(0)
    {}

    AggregateFunctionSum(const IDataType & data_type, const DataTypes & argument_types_)
        : IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>(argument_types_, {})
        , scale(getDecimalScale(data_type))
    {}

    DataTypePtr getReturnType() const override
    {
        if constexpr (IsDecimalNumber<T>)
            return std::make_shared<ResultDataType>(ResultDataType::maxPrecision(), scale);
        else
            return std::make_shared<ResultDataType>();
    }

    void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        const auto & column = static_cast<const ColVecType &>(*columns[0]);
        this->data(place).add(column.getData()[row_num]);
    }

    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
    {
        this->data(place).merge(this->data(rhs));
    }

    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
    {
        this->data(place).write(buf);
    }

    void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
    {
        this->data(place).read(buf);
    }

    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
    {
        auto & column = static_cast<ColVecResult &>(to);
        column.getData().push_back(this->data(place).get());
    }

private:
    UInt32 scale;
};

复制

之前我们讲到AggregateFunction的函数就是通过AggregateDataPtr指针来获取AggregateFunctionSumData的地址,来调用add实现聚合算子的。我们可以看到AggregateFunctionSumData实现了前文提到的add, merge, write,read四大方法,正好与接口IAggregateFunction一一对应上了。

template <typename T>
struct AggregateFunctionSumData
{
    T sum{};

    void add(T value)
    {
        sum += value;
    }

    void merge(const AggregateFunctionSumData & rhs)
    {
        sum += rhs.sum;
    }

    void write(WriteBuffer & buf) const
    {
        writeBinary(sum, buf);
    }

    void read(ReadBuffer & buf)
    {
        readBinary(sum, buf);
    }

    T get() const
    {
        return sum;
    }
};

复制

ClickHouse在Server启动时。main函数之中会调用registerAggregateFunction的初始化函数注册所有的聚合函数。 然后调用到下面的函数注册sum的聚合函数:

void registerAggregateFunctionSum(AggregateFunctionFactory & factory)
{
    factory.registerFunction("sum", createAggregateFunctionSum<AggregateFunctionSumSimple>, AggregateFunctionFactory::CaseInsensitive);
}

复制

也就是完成了这个sum聚合函数的注册,后续我们get出来就可以愉快的调用啦。(这部分有许多模板派生的复杂代码,建议与源码结合梳理才能事半功倍~~)

3.要点梳理

第二小节解析了一个聚合函数与接口意义对应的流程,这里重点梳理聚合函数实现的源码要点:

  1. 各个聚合函数核心的实现add,merge与序列化,内存结构初始化,内存结构释放的接口。
  2. 各个函数的实现需要继承IAggregateFunctionDataHelper的接口,而它的父类是IAggregateFunctionHelperIAggregateFunction接口。
  3. ClickHouse的聚合函数保证了每次循环遍历一个Block只调用一个IAggregateFunction的聚合函数,这样最大程度上确保了向量化执行的可能性,减少了数据偏移与依赖。

4. 小结

好了,到这里也就把ClickHouse聚合函数部分的代码梳理完了。 除了sum函数外,其他的函数的执行也是同样通过类似的方式依次来实现和处理的,源码阅读的步骤也可以参照笔者的分析流程来参考。 笔者是一个ClickHouse的初学者,对ClickHouse有兴趣的同学,欢迎多多指教,交流。

5. 参考资料

官方文档 ClickHouse源代码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/153829.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android设置本地字体文件ttf

目录 前言 ①使用typeface 方式 一、创建加载字体实例 二、使用步骤 1.在Application中加载字体 2.在xml中使用 ②使用fontFamily 方式 1、在res/font下导入ttf文件 2、在xml中使用 总结 前言 产品告诉UI设计设计图时要使用炫酷字体。因为Android不像网页项目可以使用…

如何使用Jasper导出用户列表数据?

场景说明在使用JasperjaspersoftStudio导出用户列表数据导出(如下图)是比较简单的&#xff0c;就是把用户列表数据&#xff0c;一个List集合放到 JRBeanCollectionDataSource中即可。但是如果有多个List集合需要导出呢&#xff0c;这个应该怎么办?比如&#xff1a;一个用户的集…

用Python发邮件(附完整源代码)

目录 一、背景 1.1、前言 1.2、说明 二、SMTP协议 2.1、SMTP协议作用 2.2、SSL作用 三、步骤 3.1、开启QQ邮箱SMTP 四、代码 4.1、完整源代码 五、结果 5.1、代码运行结果 六、总结 6.1、总结 一、背景 1.1、前言 写了一个简陋的2023年12306自动化购票程序&…

微服务保护 - Sentinel

1.概念&#xff1a; 一&#xff1a;sentinel 介绍 随着微服务的流行&#xff0c;服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式服务架构的轻量级流量控制产品&#xff0c;主要以流量为切入点&#xff0c;从流量控制、熔断降级、系统负载保护等多个维度来帮助…

黑马学ElasticSearch(六)

目录&#xff1a; &#xff08;1&#xff09;搜索结果处理-排序 &#xff08;2&#xff09;搜索结果处理-分页 &#xff08;3&#xff09;搜索结果处理-高亮 &#xff08;1&#xff09;搜索结果处理-排序 评分降序、价格升序查询 第二 第三 一旦进行了排序_score就没有值了 …

Qt 自定义流程图 diagram

Qt 自定义流程图 diagram前言程序执行效果程序源码下载图形视图框架成员介绍重写QGraphicsItem程序源码介绍重点代码前言 本文将对QGraphicsScene, QGraphicsView,QGraphicsItem这三个类进行简单介绍&#xff0c;并通过diagram流程图项目对自定义QGraphicsItem操作进行演示讲解…

Docker从无到有

随着各个软件的版本越来越多&#xff0c;软件开发、使用环境愈发复杂&#xff0c;Docker日益受到广泛应用。本文记录下从零开始了解、使用docker的各个步骤。 Docker有3个基本概念: Image&#xff0c;镜像。镜像就是系统的快照。静态。每个Image以<Repo Name>:<Tag …

求助:程序员得了结膜炎+干眼症怎么办?

大概是2022年12月初开始&#xff0c;我就感觉眼睛有看东西有点肿胀和模糊&#xff0c;还有就是总想眯眼。本来以为就是用眼过度导致的疲劳&#xff0c;想着周六周日好好休息一下应该就好了&#xff0c;但是没想到不仅没好还加重了。不得已去了医院求助医生。 我去的是杭州的浙…

TS:镜像构建过程中go下载第三方包失败-2023.1.8(已解决)

title: TS&#xff1a;镜像构建过程中go下载第三方包失败-2023.1.8(已解决) date: 2023-1-10 categories: Golang tags:Golang TS&#xff1a;镜像构建过程中go下载第三方包失败-2023.1.8(已解决) 注意&#xff1a;一定要注意项目代码里go版本和自己机器go版本是否一致&#x…

5.2中断系统中的设备树——Linux对中断处理的框架及代码流程简述

当发生中断时&#xff0c;CPU会跳到一个固定的地址去执行代码&#xff0c;这个固定的地址就被称为中断向量。 以ARM920T为例&#xff0c;它的中断向量默认是地址24&#xff08;0x18&#xff09;的地方。那么&#xff0c;就可以在这里放一条跳转指令。 一系列的跳转指令用来处…

基于配置系统和流水线的热更新方案

文章目录背景方案调研具体方案方案优缺点背景 最近我们要在一个新的 App 上增加热更新的能力&#xff0c;按照以往的设计思路&#xff0c;需要后台一起参与&#xff0c;并提供对应的接口&#xff0c;具体的接口如下&#xff1a; 接口参数返回值备注uploadBasePkgappVersion&a…

接口管理工具YApi怎么用?颜值高、易管理、超好用

众多接口管理工具如雨后春笋搬冒出。让人欣慰的是&#xff0c;有许多优秀作品来自国内&#xff0c;包含YApi和rap。看着中文的官网&#xff0c;熟悉的汉语&#xff0c;不禁让人暗爽。当然这也就带来另一个弊端&#xff0c;因为使用基数少&#xff0c;所以参考资料少。我们想学习…

Linux时间的获取与使用

Linux系统时间有两种。 &#xff08;1&#xff09;日历时间。该值是自协调世界时(UTC)1970年1月1日00:00:00这个特定时间以来所经过的秒数累计值。基本数据类型用time_t保存。最后通过转换才能得到我们平时所看到的24小时制或者12小时间制的时间。 &#xff08;2&#xff09;…

使用WSL获得Ubuntu系统环境

文章目录使用WSL获得Ubuntu系统环境为什么要用WSL什么是WSLWSL部署安装Windows Terminal软件使用WSL获得Ubuntu系统环境 为什么要用WSL WSL作为Windows10系统带来的全新特性&#xff0c;正在逐步颠覆开发人员既有的选择。 传统方式获取Linux操作系统环境&#xff0c;是安装完…

凯撒加密Caesar cipher

凯撒加密的由来凯撒加密正是凯撒大帝发明的&#xff0c;是一种古典的加密凯撒率军征服高卢&#xff0c;袭击日耳曼和不列颠&#xff0c;古罗马开启了走出意大利&#xff0c;征服全欧洲的征程仅用8年时间征服高卢后&#xff0c;凯撒率军越过卢比孔河&#xff0c;驱赶政敌&#x…

振弦采集模块参数配置工具的使用

振弦采集模块参数配置工具的使用 通常情况下&#xff0c;在计算机端对模块进行测试、读写时&#xff0c;可使用一些通用的免费工具完成&#xff0c;如基于 MODBUS 通讯协议的调试工具 MODSCAN、通用串口调试助手等&#xff0c; 这些工具可以通过网络搜索下载使用&#xff0c;在…

03【Response、ServletContext】

文章目录03【Response、ServletContext】一、HTTP响应概述1.1 什么是HTTP响应&#xff1a;1.2 响应信息的组成&#xff1a;1.2.1 响应行1.2.2 响应头1.2.3 响应体1.3 Http协议小结二、HttpServletResponse对象2.1 设置响应行2.2.1 设置响应状态码2.2.2 常见响应码1&#xff09;…

入选IDC报告,美创科技数据安全管理平台实力领跑

近日&#xff0c;国际权威研究咨询机构IDC发布《中国数据安全基础设施管理平台市场洞察&#xff0c;2022》报告。本次报告对行业用户以及技术提供商深入访谈&#xff0c;挑选出具有代表性的数据安全基础设施管理平台产品和方案&#xff0c;美创数据安全管理平台入选&#xff0c…

软件测试---概念篇

本文主要介绍软件测试相关的一些基础概念.主要内容包括 : 什么是需求 什么是bug 什么是测试用例 开发模型和测试模型 配置管理和软件测试 一 : 什么是需求 满足用户期望或正式规定文档&#xff08;合同、标准、规范&#xff09;所具有的条件和权能&#xff0c;包含用户需求和软…

【Kubernetes 企业项目实战】02、基于 Prometheus 和 K8s 构建智能化监控告警系统(中)

目录 一、安装和配置 node-exporter 1.1 node-exporter介绍&#xff1f; 1.2 安装 node-exporter 二、Prometheus server 安装和配置 2.1 创建 sa 账号&#xff0c;对 sa 做 rbac 授权 2.2 创建 prometheus 数据存储目录 2.3 安装 Prometheus server 服务 &#xff08;…