Apache Arrow Acero执行引擎

news2025/1/23 17:38:25

Apache Arrow流执行引擎

对于许多复杂的计算,在内存或计算时间内,连续的计算函数的直接调用都是不可行的。为了更加有效的提高资源使用率、促进多批数据的消费,Arrow提供了一套流式执行引擎,称为Acero。

目前支持算子有:Source、Sink、HashJoin、Project、Filter、Sort、 Agg。

如果自己要在Arrow里面实现一个新算子,如物化、MergeJoin等算子,我们需要如何实现?

本节将以最复杂的HashJoin算子为例,拆解其实现原理,便于快速上手。

1.Acero Plan

以两表join为例,假设Student、Score表,其列字段分别如下:

Student表

Column  |  Type   | Collation | Nullable | Default 
---------+---------+-----------+----------+---------
 id      | integer |           |          | 
 stu_id  | integer |           |          | 
 subject | text    |           |          | 
 score   | integer |           |          |

Score表

Column |  Type   | Collation | Nullable | Default 
--------+---------+-----------+----------+---------
 id     | integer |           |          | 
 name   | text    |           |          | 
 age    | integer |           |          |

SQL语句

select subject, name, score from student st join score s on st.id = s.stu_id and st.name != s.subject;

我们大概可以得到类似的Plan:

->  Hash Join 
     Hash Cond: (student.id = score.stu_id)
     Join Filter: (student.name <> score.subject)
     ->  Seq Scan on student 
     ->  Hash  (cost=431.00..431.00 rows=1 width=8)
          ->  Seq Scan on score

对于这样的Plan我们可以构建一个Acero计划,如下图所示:

7f9b91c7b1be161c38eb8e26e3fcaa78.png

2.拓扑排序

对于一个Plan,我们可以把它想象成算法中的图,使用拓扑排序便可以得到节点执行的顺序。

在Acero中,便是这么做的,通过拓扑排序算法,得到先后顺序,对于上面的图我们可以得到拓扑排序的结果为:

Left SourceNode->Left ProjectNode->HashJoinNode(probe 端)->Right SourceNode->Right ProjectNode->HashJoinNode(build 端)->ProjectNode->SinkNode。

注意:对于HashjoinNode其实是一个节点,在节点内部去分叉build/probe。

当得到这么一个执行顺序节点之后,我们需要关注几个问题?

  • 如何初始化这些节点?

  • 如何停止/结束、什么时候发送数据/接受数据?

  • 对于多条路径,像Hashjoin这种既有build/probe端,如何识别哪一端?

  • 如何管理Schema?特别是Filter、Output这些的列如何与Input的Schema关联起来?

  • 如何使用Filter过滤数据?

除了这些问题,还有特别多,例如:

  • 多线程调度

  • 任务调度

  • 异步处理

  • BloomFilter细节

  • SwissJoin细节

等等。

涉及的内容非常庞杂,可以说把HashjoinNode实现出来,对于Arrow的整个框架基本可以覆盖了(当然还有ipc/kernel等)。

我们先来熟悉一下,整个Acero的模型是怎样的,这很重要,因为一不小心,写出来的Plan就没法停了,死循环了就尴尬了。

还是以上述两个表Join为例,在得到节点的拓扑排序后,plan会对收集好的节点进行倒序遍历,这样做的目的是初始化节点、收集节点异步future。

倒序的逻辑涉及两点:

  • StartProducing

开始生产数据,对于像Project、Sink之类的节点,基本是不做事的,完成当前节点的初始化工作,所有节点最重要的初始化便是执行完成的标记。

  • finished()

返回什么时候当前异步任务可以结束工作,在节点倒序遍历过程中会把每个节点finished()返回的Future对象收集起来,最后统一等待所有任务完成。

目前有两种方式来判断是否已经完成当前节点:

第一种是:通过Future对象控制

finished_.MarkFinished(status);

另外一种是:任务组

task_group_.End();

3.执行框架

以上述的Plan为例子,我们可以得到如下执行流程。

图中蓝色这条线我们称之为倒序遍历初始化各个节点,左虚线框我们称之为probe端,右侧虚线框我们称之为build端。

1)第一个执行的节点是SinkNode,然后按照StartProducing、finished流程执行,没啥好说的,没什么特殊逻辑。

2)第二个执行的节点是ProjectNode,同上。

3)第三个执行的节点是HashJoinNode,HashJoin支持BloomFilter,所以在内部有一个Context去StartProducing、finished。

4)第四个执行的节点是Build端的Right ProjectNode,同上。

5)第五个执行的节点是Build端的Right SourceNode,这里到了精彩的部分,此时的StartProducing会真正的干活,对于Source节点是数据的来源,那么它会负责把数据Push下去,那么就会依次调用各个节点的InputReceived、InputFinished接口。

  • InputReceived

每个输入的节点必须要实现的接口,当然SourceNode是不需要实现的,因为它是没有输入的,如果当前节点实现了InputReceived接口,那么数据便会从上游Push下来,然后当前节点处理即可。

  • InputFinished

当处理完当前节点的任务后,我们需要停止,这个节点可太重要了,因为没它,你的plan就死循环了。因为在最外面一直在等当前节点处理完,可以没处理完,就死循环了,不过arrow的Future有超时控制。InputeFinished需要做两件事情:

第一:通知下游节点你可以结束了。当前节点处理了一堆事情之后,会产生Batch,产生多少个,那么当前节点完成的话,下游节点也得拿到这些完成的数据去做处理,就得一层层的InputeFinished掉。

第二:当前节点结束设置finished()接口的标志,例如:

finished_.MarkFinished(status);
或者
task_group_.End();

6)第六个执行的节点是Probe端的Left ProjectNode,不做什么事情,StartProducing、finished。

7)第七个执行的节点是Probe端的Left SourceNode,跟前面的Build端SourceNode类似,负责Probe表的数据输入,注意两者在HashJoinNode节点内部处理的区别,分别会调用各自的逻辑。

0c26dc96ae9918f33a3b43fa61f32826.png

4.Schema管理

HashJoin的Schema管理是一门艺术,设计的非常优雅。

首先来讨论一下为什么要Schema管理呢?

假设输入了两个表的schema,left schema、right schema,这个我们称之为INPUT schema,对于下面这样的query,引出几个问题。

select subject, name, score from student st join score s on st.id = s.stu_id and st.name != s.subject;
  • PayLoad部分需要?如果需要,如何与输入的Schema进行关联?

  • Filter时,我可能只需要Left+Right的部分列,怎么快速获取?

  • 如何快速判断Hash Key中是否含有Filter列?或者PayLoad是否含有Filter列?

对于第三个问题,HashJoin在Probe阶段会得到一些匹配的行、不匹配的行,对于Filter来说需要Batch数据,而这个Batch是由n列组合而来,那么可能一部分列来自于PayLoad、一部分来自于Key。

对于前面两个问题比较常见了,就是我扫描的时候记下用了Left/Right哪些列即可。

于是,我猜测,为了这些目标,arrow实现了一套schema管理机制,在HashJoin里面分为几类:

  • INPUT

  • OUTPUT

  • KEY

  • PAYLOAD

  • FILTER

分别是输入、输出、等值条件Key、不进行输出的列、进行过滤的列。

其实现原理比较好理解,记录两个mapping,一个是正向、另一个是反向。

正向:用来记录其他类型在INPUT中是否存在,具体的位置是哪里。

反向:用来记录INPUT类型在其他类型是否存在,具体的位置是哪里。

不存在用-1来标记。

c50b863b55cc7a8eb57f3c8fd3b4f998.png

于是我们便可以通过map来得到任意两者之间的关系,例如:

  • 查询filter类型在input类型的位置

  • 查询input类型在filter类型的位置

  • 查询filter类型在payload类型的位置

04f5f92dd82cead1c7f2b1163fb2c9a4.png

以上便是本节的内容,欢迎大家转发~

更多硬核内容,欢迎订阅知识星球~

ae267eeff37d49d9cee63d95bf974d95.jpeg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/840952.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jvm-程序计数器

1、是什么 4 学习路线 类加载器 内存结构方法区 类堆 对象虚拟机栈程序计数器本地方法栈 执行引擎解释器编译器 热点代码 5 程序计数器–作用 java源代码编译蛏二进制字节码 jvm指令。 对所有平台保持一致性。记住下一条jvm指令的执行地址。寄存器&#xff0c;cpu中读取速度…

CI/CD—Docker中深入学习

1 容器数据卷 什么是容器数据卷&#xff1a; 将应用和环境打包成一个镜像&#xff01;数据&#xff1f;如果数据都在容器中&#xff0c;那么我们容器删除&#xff0c;数据就会丢失&#xff01;需求&#xff1a;数据可以持久 化。MySQL容器删除了&#xff0c;删容器跑路&#…

网络框架重构之路plain2.0(c++23 without module) 环境

开发环境 主开发环境 1、系统 plain因为支持跨平台&#xff0c;所以主要的两个操作系统是linux和windows&#xff0c;而linux中我选择了中小企业中常用的centos 7&#xff08;centos 8 已经停止支持了&#xff0c;但是7还有一段时间才会停&#xff0c;估计之后大家可能会被迫使…

C++QT教程2——创建QT项目

文章目录 2 创建Qt项目2.1 使用向导创建2.2 手动创建2.3 .pro文件2.4 一个最简单的Qt应用程序main入口函数中&#xff08;main.cpp&#xff09;arnold_widget.h函数arnold_widget.cpp 参考文章 2 创建Qt项目 2.1 使用向导创建 打开Qt Creator 界面选择 New Project或者选择菜…

CTF Crypto --- 七八月份比赛杂题记录

文章目录 前言第一届交通运输行业网络安全大赛决赛---CryptoeasyRSAMypow baby_RSAEasyRSA你懂RSA吗 前言 哥们终于想起账号密码了(尊嘟忘了)。 鸽了快两个星期辣&#xff0c;下次一定不鸽(x)。 第一届交通运输行业网络安全大赛决赛—Crypto easyRSA 题目&#xff1a; f…

APP外包开发的开发语言对比

在开发iOS APP时有两种语言可以选择&#xff0c;Swift&#xff08;Swift Programming Language&#xff09;和 Objective-C&#xff08;Objective-C Programming Language&#xff09;&#xff0c;它们是两种不同的编程语言&#xff0c;都被用于iOS和macOS等苹果平台的软件开发…

Kafka入门,保姆级教学

文章目录 Kafka概念消息中间件对比消息中间件对比-选择建议Kafka常用名词介绍Kafka入门1. Kafka安装配置2.Kafka生产者与消费者关系3.Kafka依赖4.生产者发消息5.消费者接受消息6.Kafka高可用性设计6.1集群Kafka备份机制(Reolication) 7.kafka生产者详解7.1 发送类型7.2参数详解…

五分钟帮您理解Linux网络核心知识点——socket和epoll

关于linux网络相关的基础知识点&#xff0c;最热的两个就是socket和epoll&#xff0c;接下来我就用最简单的方式把他俩说清楚便于大家理解&#xff01; Socket Socket 是一种进程间通信的方法&#xff0c;它允许位于同一主机&#xff08;计算机&#xff09;或使用网络连接起来…

【链表OJ 3】链表的中间结点

前言: 本文收录于http://t.csdn.cn/n6UEP数据结构刷题的博客中,首先欢迎大家的来访&#xff0c;其次如有错误&#xff0c;非常欢迎大家的指正&#xff01;我会及时更正错误&#xff01; 目录 一.链表的中间结点 1.1原理:快慢指针的使用 链表元素个数为奇数时 链表元素个数…

只会用插件可不行,这些前端动画技术同样值得收藏-JavaScript篇(下)

目录 前言 介绍 基本使用 关键帧 KeyframeEffect的三种类的声明 keyframes options 动画对象 全局Animation类 标签中的animate函数 总结 相关代码&#xff1a; 前言 接着上文往下介绍&#xff0c;上篇文章我们对JS原生动画和贝塞尔曲线有了一个详细的认识&#x…

了解IL汇编异常处理语法

从网上拷过来一个IL汇编程序&#xff0c;编译时先报如下错&#xff0c; 看它是把空格识别为了下注红线的字符&#xff0c;这是字符编码的问题&#xff0c;用记事本替换功能替换了&#xff1b; 然后又报如下的错&#xff0c; 看不出来问题&#xff0c;拷一句正确的来&#xff0…

Netty面试题3

讲一讲你在网络通讯中遇到的坑或者比较棘手的问题 1、网络延迟问题 2、网络拥塞问题 某公司的Java项目需要向远程服务器发送大量的HTTP请求并获取响应&#xff0c;由于请求量较大&#xff0c;导致网络拥塞&#xff0c;请求响应延迟较高。针对这个问题&#xff0c;我们可以采取…

「2024」预备研究生mem-等差等比数列片段和 一般数列

一、等差数列 片段和 二、等比数列 片段和 三、一般数列

数据结构 | 树的定义及实现

目录 一、树的术语及定义 二、树的实现 2.1 列表之列表 2.2 节点与引用 一、树的术语及定义 节点&#xff1a; 节点是树的基础部分。它可以有自己的名字&#xff0c;我们称作“键”。节点也可以带有附加信息&#xff0c;我们称作“有效载荷”。有效载荷信息对于很多树算法…

AcWing 379. 捉迷藏(最小路径点覆盖匈牙利算法)

输入样例&#xff1a; 7 5 1 2 3 2 2 4 4 5 4 6输出样例&#xff1a; 3 #include<bits/stdc.h> using namespace std; typedef long long ll; const int N220; int n,m,t; int d[N][N],vis[N]; int match[N]; bool find(int x){for(int i1;i<n;i){if(d[x][i]&&…

Mac unsupported architecture

&#xff08;瓜是长大在营养肥料里的最甜&#xff0c;天才是长在恶性土壤中的最好。——培根&#xff09; unsupported architecture 在mac的m系列芯片中容易出现此类问题&#xff0c;因为m系列是arm64的芯片架构&#xff0c;而有些nodejs版本或npm包的芯片架构是x86的&#x…

Visual Studio配置PCL库

Visual Studio配置PCL库 Debug和Release配置新建项目配置属性表测试参考 Debug和Release Debug和Release的配置过程一模一样&#xff0c;唯一区别就在于最后一步插入的附加依赖项不同&#xff0c;因此下面以debug为例。 配置新建项目 1、新建一个C空项目&#xff0c;模式设置…

Linux ——实操篇

Linux ——实操篇 前言vi 和 vim 的基本介绍vi和vim常用的三种模式正常模式插入模式命令行模式 vi和vim基本使用各种模式的相互切换vi和vim快捷键关机&重启命令基本介绍注意细节 用户登录和注销基本介绍使用细节 用户管理基本介绍添加用户基本语法应用案例细节说明 指定/修…

ROS实现机器人移动

开源项目 使用是github上六合机器人工坊的项目。 https://github.com/6-robot/wpr_simulation.git 机器人运动模型 运动模型如下所示&#xff1a;&#x1f447; 机器人运动的消息包&#xff1a; 实现思路&#xff1a;&#x1f447;   为什么要使用/cmd_vel话题。因为这…

Spring Cloud +UniApp 智慧工地云平台源码,智能监控和AI分析系统,危大工程管理、视频监控管理、项目人员管理、绿色施工管理

一套智慧工地云平台源码&#xff0c;PC管理端APP端平板端可视化数据大屏端源码 智慧工地可视化系统利用物联网、人工智能、云计算、大数据、移动互联网等新一代信息技术&#xff0c;通过工地中台、三维建模服务、视频AI分析服务等技术支撑&#xff0c;实现智慧工地高精度动态仿…