PostgreSQL数据库FDW——Parquet S3 MultifileMergeExecutionStateBaseS3

news2025/1/24 5:30:45

在这里插入图片描述
MultifileMergeExecutionStateBaseS3和SingleFileExecutionStateS3、MultifileExecutionStateS3类不同,reader成员被替换为ParquetReader *类型的readers vector。新增slots_initialized布尔变量指示slots成员是否已经初始化。slots成员是Heap类,Heap用于以优先级方式存储元组以及文件号。优先级被赋予具有最小key的元组。一旦请求了下一个元组,它将从堆的顶部取出,并从同一个文件中读取一个新元组并将其插入回堆中。然后重建堆以维持其属性。这个想法来自PostgreSQL中的nodeGatherMerge.c,但使用STL重新实现。slots Heap中存储的是ReaderSlot元素。

class MultifileMergeExecutionStateBaseS3 : public ParquetS3FdwExecutionState{
protected:
    std::vector<ParquetReader *> readers;
    /* Heap is used to store tuples in prioritized manner along with file
     * number. Priority is given to the tuples with minimal key. Once next
     * tuple is requested it is being taken from the top of the heap and a new
     * tuple from the same file is read and inserted back into the heap. Then
     * heap is rebuilt to sustain its properties. The idea is taken from
     * nodeGatherMerge.c in PostgreSQL but reimplemented using STL. */
    struct ReaderSlot{
        int             reader_id;
        TupleTableSlot *slot;
    };
    Heap<ReaderSlot>    slots;
    bool                slots_initialized;

首先看ParallelCoordinator相关函数,set_coordinator向ParquetReader *类型的readers vector中所有ParquetReader都设置coord成员,也就是这些coord都共享coord。

    void set_coordinator(ParallelCoordinator *coord) {
        this->coord = coord;
        for (auto reader : readers) reader->set_coordinator(coord);
    }
    Size estimate_coord_size() {
        return sizeof(ParallelCoordinator) + readers.size() * sizeof(int32);
    }
    void init_coord() {
        coord->init_multi(readers.size());
    }

compare_slots函数用于通过sort keys比较两个slot。如果a大于b,返回true;否则返回false。

    /* compare_slots
     *      Compares two slots according to sort keys. Returns true if a > b,
     *      false otherwise. The function is stolen from nodeGatherMerge.c
     *      (postgres) and adapted.  */
    bool compare_slots(const ReaderSlot &a, const ReaderSlot &b) {
        TupleTableSlot *s1 = a.slot; TupleTableSlot *s2 = b.slot;
        Assert(!TupIsNull(s1)); Assert(!TupIsNull(s2));

        for (auto sort_key: sort_keys) {
            AttrNumber  attno = sort_key.ssup_attno;
            Datum       datum1, datum2;  bool        isNull1, isNull2;          
            if (this->schemaless) { /* In schemaless mode, presorted column data available on each reader. TupleTableSlot just have a jsonb column. */
                auto reader_a = readers[a.reader_id]; auto reader_b = readers[b.reader_id];
                std::vector<ParquetReader::preSortedColumnData> sorted_cols_data_a = reader_a->get_current_sorted_cols_data();
                std::vector<ParquetReader::preSortedColumnData> sorted_cols_data_b = reader_b->get_current_sorted_cols_data();
                datum1 = sorted_cols_data_a[attno].val; isNull1 = sorted_cols_data_a[attno].is_null;
                datum2 = sorted_cols_data_b[attno].val; isNull2 = sorted_cols_data_b[attno].is_null;
            }  else {
                datum1 = slot_getattr(s1, attno, &isNull1); datum2 = slot_getattr(s2, attno, &isNull2);
            }

            int  compare = ApplySortComparator(datum1, isNull1, datum2, isNull2, &sort_key);
            if (compare != 0)
                return (compare > 0);
        }
        return false;
    }

get_schemaless_sortkeys函数主要用途是判别如果sorted_cols中包含的列,而reader list中的该列没有数据,则需要将其剔除出sort_keys列表。

    /* get_schemaless_sortkeys
     *      - Get sorkeys list from reader list.
     *      - The sorkey is create when create column mapping on each reader
     */
    void get_schemaless_sortkeys() {
        this->sort_keys.clear();
        for (size_t i = 0; i < this->sorted_cols.size(); i++) {           
            for (auto reader: readers) {  /* load sort key from all reader */
                ParquetReader::preSortedColumnData sd = reader->get_current_sorted_cols_data()[i];
                if (sd.is_available) {
                    this->sort_keys.push_back(sd.sortkey);
                    break;
                }
            }
        }
    }

MultifileMergeExecutionStateS3

MultifileMergeExecutionStateS3继承自MultifileMergeExecutionStateBaseS3类,没有新增成员,包含的成员函数和上节的ExecutionStateS3子类类似。add_file函数创建ParquetReader的流程和上节的ExecutionStateS3子类类似,不同之处在于需要将新创建的reader加入readers vector,且每个reader拥有自己的标识,以每次添加前的readers vector大小作为reader_id标识。

    void add_file(const char *filename, List *rowgroups) {
        ListCell           *lc; std::vector<int>    rg;        
        foreach (lc, rowgroups)
            rg.push_back(lfirst_int(lc));
            
        int32_t             reader_id = readers.size(); // 以readers vector大小作为reader_id标识
        ParquetReader *r = create_parquet_reader(filename, cxt, reader_id);
        r->set_rowgroups_list(rg); r->set_options(use_threads, use_mmap);
        if (s3_client)  r->open(dirname, s3_client);
        else r->open();
        r->set_schemaless_info(schemaless, slcols, sorted_cols);
        r->create_column_mapping(tuple_desc, attrs_used);
        
        readers.push_back(r); // 将新创建的reader加入readers vector
    }

initialize_slots函数在第一次 调用next函数时被调用,用于初始化slots binary heap。

    /* initialize_slots      Initialize slots binary heap on the first run. */
    void initialize_slots(){
        std::function<bool(const ReaderSlot &, const ReaderSlot &)> cmp = [this] (const ReaderSlot &a, const ReaderSlot &b) { return compare_slots(a, b); }; // 确定比较函数
        int i = 0;

        slots.init(readers.size(), cmp);
        
        for (auto reader: readers) { // 循环对每个reader创建ReaderSlot
            ReaderSlot    rs;
            PG_TRY_INLINE(
                {
                    MemoryContext oldcxt = MemoryContextSwitchTo(cxt); rs.slot = MakeTupleTableSlotCompat(tuple_desc); MemoryContextSwitchTo(oldcxt);
                }, "failed to create a TupleTableSlot"
            );

            if (reader->next(rs.slot) == RS_SUCCESS) { 
                ExecStoreVirtualTuple(rs.slot);
                rs.reader_id = i;
                slots.append(rs); // 如果next调用成功,则保留该ReaderSlot
            }
            ++i;
        }
        if (this->schemaless) get_schemaless_sortkeys();
        PG_TRY_INLINE({ slots.heapify(); }, "heapify failed");
        slots_initialized = true;
    }

next函数在slots_initialized为false,即未初始化slots binary heap时,初始化heap。获取slots heap中最小的ReaderSlot,从其中读取一条记录作为返回slot。尝试从与head slot相同的读取器读取另一条记录。如果成功,新记录将其放入堆中,堆将被重新初始化。否则,如果读取器中没有更多的记录,那么当前头将从堆中移除,堆将重新被初始化。

    bool next(TupleTableSlot *slot, bool /* fake=false */) {
        if (unlikely(!slots_initialized)) initialize_slots();
        if (unlikely(slots.empty())) return false;

        /* Copy slot with the smallest key into the resulting slot */
        const ReaderSlot &head = slots.head(); // 获取slots heap中最小的ReaderSlot,即head处的
        PG_TRY_INLINE(
            {
                ExecCopySlot(slot, head.slot);  ExecClearTuple(head.slot);
            }, "failed to copy a virtual tuple slot"
        );

        /* Try to read another record from the same reader as in the head slot. In case of success the new record makes it into the heap and the heap gets reheapified. Else if there are no more records in the reader then current head is removed from the heap and heap gets reheapified. */
        if (readers[head.reader_id]->next(head.slot) == RS_SUCCESS){
            ExecStoreVirtualTuple(head.slot);
            PG_TRY_INLINE({ slots.heapify_head(); }, "heapify failed");
        } else {
#if PG_VERSION_NUM < 110000
            /* Release slot resources */
            PG_TRY_INLINE(
                {
                    ExecDropSingleTupleTableSlot(head.slot);
                }, "failed to drop a tuple slot"
            );
#endif
            slots.pop();
        }
        return true;
    }

CachingMultifileMergeExecutionStateS3

CachingMultifileMergeExecutionStateS3是MultifileMergeExecutionState的一个专门版本,它能够合并大量文件,而不会同时打开所有文件。为此,它利用CachingParqueReader将所有读取数据存储在内部缓冲区中。其新增std::vector<uint64_t> ts_active存储每个reader activation的时间戳,用于获取最近最少使用active的reader。int num_active_readers用于存储标识为active的reader的数量,int max_open_files用于存储最大打开文件的数量。
activate_reader函数的流程如下:如果reader尚未激活,则打开它。如果active readers的数量超过限制,函数将关闭最近最少使用的reader。

    ParquetReader *activate_reader(ParquetReader *reader) {             
        if (ts_active[reader->id()] > 0) return reader; /* If reader's already active then we're done here */        
        if (max_open_files > 0 && num_active_readers >= max_open_files) { /* Does the number of active readers exceeds limit? */
            uint64_t    ts_min = -1;  /* initialize with max uint64_t */ int         idx_min = -1;
            /* Find the least recently used reader */
            for (std::vector<ParquetReader *>::size_type i = 0; i < readers.size(); ++i) {
                if (ts_active[i] > 0 && ts_active[i] < ts_min) {
                    ts_min = ts_active[i]; idx_min = i;
                }
            }
            if (idx_min < 0) throw std::runtime_error("failed to find a reader to deactivate");
            
            readers[idx_min]->close(); // 关闭最近最少使用的reader
            ts_active[idx_min] = 0;  num_active_readers--;
        }
        
        struct timeval tv; gettimeofday(&tv, NULL);
        /* Reopen the reader and update timestamp */        
        ts_active[reader->id()] = tv.tv_sec*1000LL + tv.tv_usec/1000;        
        if (s3_client) reader->open(dirname, s3_client);
        else reader->open();
        num_active_readers++;
        return reader;
    }

initialize_slots函数初始化slots binary heap,和上一个类的函数不同之处在于其初始化ts_active,并调用activate_reader函数对reader进行了激活,并且调用了reader对象的set_schemaless_info和create_column_mapping函数。

    /* initialize_slots      Initialize slots binary heap on the first run. */
    void initialize_slots() {
        std::function<bool(const ReaderSlot &, const ReaderSlot &)> cmp =
            [this] (const ReaderSlot &a, const ReaderSlot &b) { return compare_slots(a, b); };
        int i = 0;

        this->ts_active.resize(readers.size(), 0);

        slots.init(readers.size(), cmp);
        for (auto reader: readers)
        {
            ReaderSlot    rs;

            PG_TRY_INLINE(
                {
                    MemoryContext oldcxt;

                    oldcxt = MemoryContextSwitchTo(cxt);
                    rs.slot = MakeTupleTableSlotCompat(tuple_desc);
                    MemoryContextSwitchTo(oldcxt);
                }, "failed to create a TupleTableSlot"
            );

            activate_reader(reader);
            reader->set_schemaless_info(schemaless, slcols, sorted_cols);
            reader->create_column_mapping(tuple_desc, attrs_used);

            if (reader->next(rs.slot) == RS_SUCCESS)
            {
                ExecStoreVirtualTuple(rs.slot);
                rs.reader_id = i;
                slots.append(rs);
            }
            ++i;
        }
        if (this->schemaless)
            get_schemaless_sortkeys();
        PG_TRY_INLINE({ slots.heapify(); }, "heapify failed");
        slots_initialized = true;
    }

next函数用于获取下一个solt,和上一个类的函数不同之处在于ReadStatus增加了RS_INACTIVE类型的处理,也就是调用activate_reader函数激活对应的reader。

    bool next(TupleTableSlot *slot, bool /* fake=false */) {
        if (unlikely(!slots_initialized)) initialize_slots();
        if (unlikely(slots.empty())) return false;

        /* Copy slot with the smallest key into the resulting slot */
        const ReaderSlot &head = slots.head();
        PG_TRY_INLINE(
            {
                ExecCopySlot(slot, head.slot); ExecClearTuple(head.slot);
            }, "failed to copy a virtual tuple slot"
        );

        /* Try to read another record from the same reader as in the head slot.
         * In case of success the new record makes it into the heap and the
         * heap gets reheapified. If next() returns RS_INACTIVE try to reopen
         * reader and retry. If there are no more records in the reader then
         * current head is removed from the heap and heap gets reheapified.
         */
        while (true) {
            ReadStatus status = readers[head.reader_id]->next(head.slot);
            switch(status)  {
                case RS_SUCCESS:
                    ExecStoreVirtualTuple(head.slot);
                    PG_TRY_INLINE({ slots.heapify_head(); }, "heapify failed");
                    return true;
                case RS_INACTIVE: /* Reactivate reader and retry */
                    activate_reader(readers[head.reader_id]);  break;
                case RS_EOF:
#if PG_VERSION_NUM < 110000
                    /* Release slot resources */
                    PG_TRY_INLINE(
                        {
                            ExecDropSingleTupleTableSlot(head.slot);
                        }, "failed to drop a tuple slot"
                    );
#endif
                    slots.pop();
                    return true;
            }
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/164823.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

重装系统Windows10纯净版操作步骤(微pe)

目录 前言 操作步骤 第一步&#xff1a;格式化硬盘 第二步&#xff1a;硬盘重新分区 固态硬盘分区 机械硬盘分区 完成效果展示 第三步&#xff1a;把ISO镜像文件写入固态硬盘 第四步&#xff1a;关机拔u盘 第五步&#xff1a;开机重装系统成功 前言 1.要重装系统&am…

Webpack提取页面公共资源

1. 利用html-webpack-externals-plugin 分离基础库 在做React开发时&#xff0c;经常需要引入react和react-dom基础库&#xff0c;这样在打包的时候速度就会比较慢&#xff0c;这种情况下我们可以将这些基础库忽略掉&#xff0c;将它们通过CDN的方式直接引入&#xff0c;而不打…

apache和IIS区别?内网本地服务器项目怎么让外网访问?

Apache和IIS是比较常用的搭建服务器的中间件&#xff0c;它们之间还是有一些区别差异的&#xff0c;下面就详细说说 Apache和IIS有哪些区别&#xff0c;以及如何利用快解析实现内网主机应用让外网访问。 1.安全性 首先说说apache和IIS最基本的区别。Apache运行的操作系统通常为…

Python数学建模问题总结(3)数据可视化Cookbook指南·下

概括总结&#xff1a;五、样式&#xff1a;优化图表、数据可视1.形状&#xff1a;形状的精确程度&#xff1b;2.颜色&#xff1a;区分类别、表示数量、突出特定数据、表示含义&#xff1b;3.线&#xff1a;点划线或不同的不透明度&#xff1b;4.文字排版&#xff1a;应用于图表…

IOC/DI配置管理第三方bean及注解开发。

目录 一、IOC/DI 配置管理第三方bean 1、配置第三方bean 2、加载properties 文件 3、核心容器 二、注解开发 1、注解开发定义bean 2、纯注解开发模式 3、注解开发bean作用范围与生命周期管理 4、注解开发依赖注入 三、IOC/DI注解开发管理第三方bean 1、注解开发管…

深度学习中有哪些从数学模型或相关理论出发, 且真正行之有效的文章?

自深度学习兴起后&#xff0c;深层网路对图像进行特征学习&#xff0c;将低层次的基础特征聚合成更高级的语义特征&#xff0c;取得突出的识别效果&#xff0c;在图像识别、分割及目标检测三大领域得到了众多应用。深度学习算法基本上是由多个网络层搭建&#xff0c;每个网络层…

SpringBoot自动装配

前言 Spring翻译为中文是“春天”&#xff0c;的确&#xff0c;在某段时间内&#xff0c;它给Java开发人员带来过春天&#xff0c;但是随着我们项目规模的扩大&#xff0c;Spring需要配置的地方就越来越多&#xff0c;夸张点说&#xff0c;“配置两小时&#xff0c;Coding五分…

Open3D Usage

Open3D UsageWhat is open3Dopen3D 核心功能包括&#xff1a;python quick start交互指令显示点云**read_point_cloud** ParametersReturnPointCloud的属性&#xff1a;加载ply点云&#xff1a;显示单帧点云&#xff1a;批量单帧显示点云可视化**draw_geometries** Parameters含…

Uniswap v3 详解(三):交易过程

交易过程 v3 的 UniswapV3Pool 提供了比较底层的交易接口&#xff0c;而在 SwapRouter 合约中封装了面向用户的交易接口&#xff1a; exactInput&#xff1a;指定交易对路径&#xff0c;付出的 x token 数和预期得到的最小 y token 数&#xff08;x, y 可以互换&#xff09;e…

Studio One2023新版本更新功能介绍

Studio One 6是一款非常专业的音乐创作编辑软件。为用户提供了所有一切你所需要创作的功能&#xff0c;包括所有的歌曲、项目、仪表板等动能&#xff0c;而且还自定义添加配置文件&#xff0c;良好的界面交互和丰富的功能板块&#xff0c;再结合优秀的性能&#xff0c;能够满足…

基于SpringBoot的SSMP整合(数据层)

模块创建 新建&#xff1a; 添加依赖项&#xff1a; 由于parent没有版本维护&#xff0c;还需在pom.xml文件再次添加&#xff1a; <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version…

Django REST framework--类视图

Django REST framework--类视图基于类的视图APIView类视图generics通用类视图视图集DRF框架路由基于类的视图 项目开发中要不断思考如何让代码保持高内聚&#xff0c;低耦合&#xff0c;因此优化代码的道路上一直都不停歇。目前开发的视图是基于函数形式的&#xff0c;特点是灵…

Effective Objective-C 2.0学习记录(四)

学习记录15.用前缀避免命名空间冲突16.提供“全能初始化方法”17.实现description方法debugDescription&#xff1a;18.尽量使用不可变对象19.使用清晰而协调的命名方式类与协议的命名20.为私有方法名加前缀21.理解OC错误模型22.理解NSCopying协议深拷贝和浅拷贝15.用前缀避免命…

【1-神经网络计算】北京大学TensorFlow2.0

课程地址&#xff1a;【北京大学】Tensorflow2.0_哔哩哔哩_bilibiliPython3.7和TensorFlow2.1六讲&#xff1a;神经网络计算&#xff1a;神经网络的计算过程&#xff0c;搭建第一个神经网络模型神经网络优化&#xff1a;神经网络的优化方法&#xff0c;掌握学习率、激活函数、损…

ArcGIS基础实验操作100例--实验99三维爆炸分析

本实验专栏参考自汤国安教授《地理信息系统基础实验操作100例》一书 实验平台&#xff1a;ArcGIS 10.6 实验数据&#xff1a;请访问实验1&#xff08;传送门&#xff09; 空间分析篇--实验99 三维爆炸分析 目录 一、实验背景 二、实验数据 三、实验步骤 &#xff08;1&…

Open3D ICP精配准(使用鲁棒性核函数,Python版本)

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 标准的ICP算法(点到平面)是使得下面这个目标函数最小化: 其中 p 、 q p、q p、q是相对应的匹配点,

RocketMQ5.0.0路由中心NameServer

一、NameServer概览NameServer是RocketMQ的注册中心&#xff0c;是消息存储Broker、生产者、消费者沟通的桥梁。NameServer集群之间是相互独立的&#xff0c;Broker启动时向所有NameServer注册中心注册。通过DLedger构建NameServer集群&#xff0c;实现如主从切换等功能。启动N…

【笔记】大话设计模式24-28

【笔记】大话设计模式24-28 文章目录【笔记】大话设计模式24-2824 职责链模式24.1 Example24.2 定义24.3 Show me the code24.4 总结25 中介者模式25.1 Example25.2 定义25.3 Show me the code25.4 总结26 享元模式26.1 Example26.2 定义26.3 Show me the code26.4 总结27 解释…

aws s3 参与s3game寻找宝藏游戏挑战学习s3对象存储

参考资料 Pirates S3game workshop http://s3game-level1.s3-website.us-east-2.amazonaws.com/level1.html https://blog.benclmnt.com/notes/s3-game/ https://awscli.amazonaws.com/v2/documentation/api/latest/reference/s3/index.html 强烈推荐这种寓教于乐的方式学…

【ROS2 入门】ROS 2 actions 概述

大家好&#xff0c;我是虎哥&#xff0c;从今天开始&#xff0c;我将花一段时间&#xff0c;开始将自己从ROS1切换到ROS2&#xff0c;在上一篇中&#xff0c;我们一起了解ROS 2中Parameters&#xff0c; 这一篇&#xff0c;我们主要会围绕ROS中另外一个重要的概念“Actions ”来…