Meetup 回顾|Data Infra 研究社第十六期(含资料发布)

news2024/9/20 0:56:36

本文整理于上周六(10月21日)Data Infra 第 16 期的活动内容。本次活动由 Databend 研发工程师-王旭东为大家带来了一场主题为《Databend hash join spill 设计与实现》的分享,让我们一起回顾一下吧~

以下是本次活动的相关视频、资料及文字:

通过本次分享,我们能更加了解 Databend 的 hash join spill 的设计与实现,以及学习如何使用 spill 功能。

本次活动回放也可在 B 站上找到:

🔗  Databend hash join spill 设计与实现|Data Infra 研究社第16期_哔哩哔哩_bilibili

《 Databend hash join spill 设计与实现 》

此次活动的讲稿和相关资料都可以在 Data Infra 第 16 期的 PDF 文件中找到:🔗 https://github.com/databendcn/data-infra/tree/main/第16期-20231021

Hash join 在 pipeline 架构下的设计

左侧是一个典型的两表 join plan,通过 pipeline builder 会生成右侧的 pipeline,包括 main pipeline 和一条子 pipeline ( build pipeline )。

probe pipeline 和 build pipeline 之间通过 bridge 结构关联,hash table 以及 build 和 probe 共用的一些 states 都会存在 bridge 里面,等 hash join build 侧生成 hash table 后会通过 bridge 把 hash table 给 probe 侧用。

Hash join 是多线程的,假设 build side 有 N 个 threads,probe side 有 M 个 threads。Probe 需要等待 build 完成后才能开始。因为两条 pipeline 是同时开始的,我们没法确定 build 先到达还是 probe 先到达,所以 probe 可能先于 build 发生,又因为是多线程执行,可能所有 probe 的线程都先与build 线程到达,也可能发生交错,这时提前到达的 probe 线程需要异步等待状态。

最直观的想法是用 notify 来控制 build 和 probe 之间的等待,因为是多线程的,所以考虑 notify waiters(),但是 notify 不知道预知有多少 waiters,它只会唤醒 register 过的 waiters,在 build 和 probe 这种模式下找到合适的地方进行注册不太可能的,所以不考虑 notify 而是用 tokio 的 watch channel 来解决 Hash join 的多线程模型。

channel 中的初始值是 0,当 build 侧完成后,最后一个 build 线程把 1 发送到 channel 中来唤醒所有的 probe 线程。probe 在开始等待 build 的时候会订阅 watcher channel,得到一个 receiver,如果此时已经是 1,可以直接进行 probe, 否则就要等待 channel 中发生 change,及 build 的最后一个线程把 1 写到 channel 里。

pub async fn wait_first_round_build_done(&self) -> Result<()> {
    let mut rx = self.build_done_watcher.subscribe();
    if *rx.borrow() == 1_u8 {
        return Ok(());
    }
    rx.changed()
        .await
        .map_err(|_| ErrorCode::TokioError("build_done_watcher's sender is dropped"))?;
    debug_assert!(*rx.borrow() == 1_u8);
    Ok(())
}

梳理完 build 和 probe 之间的交互后,看一下 build 的状态。不考虑 spill 的时候,它的状态比较简单,只有三个 steps,不同的 step 对应不同的 event,触发不同的行为,有异步的有同步的,一些比较重的 IO 会进行异步,还有线程之间的等待也会异步,比如在 finalize 之前需要等所有的 threads 都完成 running step (即搜集完所有的 data )。

enum HashJoinBuildStep {
    // The running step of the build phase.
    Running,
    // The finalize step is waiting all build threads to finish and build the hash table.
    Finalize,
    // The fast return step indicates there is no data in build side,
    // so we can directly finish the following steps for hash join and return empty result.
    FastReturn,
    // Wait to spill
    WaitSpill,
    // Start the first spill
    FirstSpill,
    // Following spill after the first spill
    FollowSpill,
    // Wait probe
    WaitProbe,
    // The whole build phase is finished.
    Finished,
}

首先所有的线程都开始运行,进入第一个 step—running,这一步主要收集 input data,到 chunk 里面,一个线程完成当前任务后需要等待其他完成,这里我们可以用 Barrier 这个 sync 结构。最后一个线程负责切分 finalize tasks 和初始化 hash table,之后所有的线程进入 finalize 阶段,并行的写 hash table。

FastReturn 是一个 fast path,如果 build side 数据为空,那么对于一些特定的 join 类型,probe 可以直接返回,不需要 probe 一个 空的 hash table。

接下来看下 probe 的状态

enum HashJoinProbeStep {
    // The step is to wait build phase finished.
    WaitBuild,
    // The running step of the probe phase.
    Running,
    // The final scan step is used to fill missing rows for non-inner join.
    FinalScan,
    // The fast return step indicates we can directly finish the probe phase.
    FastReturn,
    // Spill step is used to spill the probe side data.
    Spill,
    // Async running will read the spilled data, then go to probe
    AsyncRunning,
}

第一个 step 就是我们之前提到的:等待 build 的阶段。这个阶段完成后,进入 probe 阶段。等所有的线程都完成了 probe,对于 non-inner join 要进行 Final Scan,来进行 补 NULL。

Spiller 模块的设计

Spiller 是一个比较独立的模块,也就是说不局限在某一个 operator 上,所有有 spill 需求的 operator 都可以利用 Spiller 模块完成 spill 操作。

具体来说,spiller 负责以下工作:

  1. 收集需要 spill 的数据
  2. partition 需要 spill 的数据
  3. 序列化和反序列化数据
  4. 与存储进行读写交互

每一个 partition 都有一个 file lists,通过 opendal 把对应的 files 写到存储上。

Hash join spill 设计与实现

首先看一下 build 侧,80% 的工作量都在 build 侧,probe 只需要根据 build 的 spill 信息进行 spill 就可以。

enum HashJoinBuildStep {
    // The running step of the build phase.
    Running,
    // The finalize step is waiting all build threads to finish and build the hash table.
    Finalize,
    // The fast return step indicates there is no data in build side,
    // so we can directly finish the following steps for hash join and return empty result.
    FastReturn,
    // Wait to spill
    WaitSpill,
    // Start the first spill
    FirstSpill,
    // Following spill after the first spill
    FollowSpill,
    // Wait probe
    WaitProbe,
    // The whole build phase is finished.
    Finished,
}

引入 spill 后,build step 多了四个主要的 step,WaitSpill、FirstSpill 以及 FollowSpill 和 WaitProbe。

每个线程都有自己的 Spiller,否则这个线程的 spill 工作,不同线程的 spill 通过 BuildSpillCoordinator 来协调。

如果一个线程对当前内存数据大小进行判断,发现需要 spill 后,会进入 WaitSpill 状态,BuildSpillCoordinator会记录当前等待 spill 的线程数量,最后一个线程不会进入等待状态,而是直接作为 coordinator,来协调第一次 spill,它会把 buffer 中所有等待 spill 的数据收集起来,进行 partition,均匀的生成 tasks,分发给每个线程,每个线程的 partition set 都是一样的。完成第一次 spill 后,之后的 spill 不需要再 buffer 数据,如果数据有对应的 partition 可以直接进行 spill,否则 buffer 起来,看后续是否还需要 spill,如果内存够用,可以直接生成 hash table。

等所有的 spill 工作完成后对内存中的数据,进行正常的 hash join build 过程,生成 hash table,通过 bridge 发给 probe 后进入 wait probe 状态。

接下来先看下 hash join probe 侧 spill 的工作,然后再回到 build。

probe 和 build 一样,每个线程都有一个 Spiller。

enum HashJoinProbeStep {
    // The step is to wait build phase finished.
    WaitBuild,
    // The running step of the probe phase.
    Running,
    // The final scan step is used to fill missing rows for non-inner join.
    FinalScan,
    // The fast return step indicates we can directly finish the probe phase.
    FastReturn,
    // Spill step is used to spill the probe side data.
    Spill,
    // Async running will read the spilled data, then go to probe
    AsyncRunning,
}

有了 spill 后,当 WaitBuild 阶段结束后,就要进入 Spill 阶段了。

build 会通过 bridge 把它的 partition set 发过来,比如 {0, 1, 2 3},probe 也会利用 Spiller 对数据计算 partition,如果 partition id 在 build 的 partition set 中,会下刷,对于不在的数据,如果是第一轮,会跟 build 发送过来的 hash table 进行 probe。

spill 完成后,会选出一个 partition id,发送给 build,build 拿到 id 后,会把相关 partition 的数据读上来,进行正常的 hash join build 流程,生成 hash table 给 probe,probe 也会读取对应 id 的数据进行 probe,这就是正常的 hash join 过程。每完成一轮,就取一个 partition id,直到没有需要读取的 partition。

未来规划

  1. 支持递归 spill
  2. 应用具体的场景
  3. 进一步优化

Connect With Us

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。

  • Databend Website
  • GitHub Discussions
  • Twitter
  • Slack Channel

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1133016.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【算法】模拟退火算法(SAA,Simulated Annealing Algorithm)

模拟退火算法&#xff08;SAA&#xff09;简介 模拟退火算法&#xff08;SAA&#xff0c;Simulated Annealing Algorithm&#xff09;的灵感来源于工艺铸造流程中的退火处理&#xff0c;随着铸造温度升高&#xff0c;分子运动趋于无序&#xff0c;徐徐冷却后&#xff0c;分子运…

【数据分享】2014-2022年我国淘宝村点位数据(Excel格式/Shp格式)

电子商务是过去一二十年我国发展最快的行业&#xff0c;其中又以淘宝为代表&#xff0c;淘宝的发展壮大带动了一大批服务淘宝电子商务的村庄&#xff0c;这些村庄被称为淘宝村&#xff01; 截至到目前&#xff0c;阿里研究院梳理并公布了2014-2022年共9个年份的淘宝村名单&…

2.AUTOSAR SWC设计概述

1.SWC概述 SWC,全称Software Components,运行在RTE之上,属于应用算法逻辑这一层,如下图: 由1.AUTOSAR的架构及方法论中我们了解到该框架的提出就是为了减少平台移植成本、加快研发效率;这也就是说在AUTOSAR框架下,SWC作为组件是需要被重用的,意味着一个…

数据预处理(超详细)

import pandas as pd import numpy as np【例5-1】使用read_csv函数读取CSV文件。 df1 pd.read_csv("sunspots.csv")#读取CSV文件到DataFrame中 print(df1.sample(5))df2 pd.read_table("sunspots.csv",sep ",")#使用read_table&#xff0c;…

人工智能基础_机器学习003_有监督机器学习_sklearn中线性方程和正规方程的计算_使用sklearn解算八元一次方程---人工智能工作笔记0042

然后我们再来看看,如何使用sklearn,来进行正规方程的运算,当然这里 首先要安装sklearn,这里如何安装sklearn就不说了,自己查一下 首先我们还是来计算前面的八元一次方程的解,但是这次我们不用np.linalg.solve这个 解线性方程的方式,也不用 直接 解正规方程的方式: 也就是上面…

接口自动化测试实践

接口自动化概述 Python接口自动化测试零基础入门到精通&#xff08;2023最新版&#xff09; 众所周知&#xff0c;接口自动化测试有着如下特点&#xff1a; 低投入&#xff0c;高产出。 比较容易实现自动化。 和UI自动化测试相比更加稳定。 如何做好一个接口自动化测试项目呢…

华媒舍:怎样利用KOL出文营销推广打造出超级影响力?

利用KOL&#xff08;Key Opinion Leader&#xff09;出文营销推广已成为很多个人和企业提高影响力的重要方法。根据恰当的思路与技巧&#xff0c;你也能轻松吸引大批粉丝并打造无敌的存在影响力。下面我们就以科谱的形式详细介绍怎样利用KOL 出文营销推广&#xff0c;帮助自己做…

SD-WAN让跨境网络访问更快、更安全!

目前许多外贸企业都面临着跨境网络不稳定、不安全的问题&#xff0c;给业务合作带来了很多困扰。但是&#xff0c;现在有一个解决方案能够帮助您解决这些问题&#xff0c;让您的跨境网络访问更快、更安全&#xff0c;那就是SD-WAN&#xff01; 首先&#xff0c;让我们来看看SD-…

微机原理:逻辑运算指令、移位指令

文章目录 一、逻辑运算指令1、取反运算指令2、与运算指令3、或运算指令4、异或运算 二、移位指令1、开环移位指令算术左移&#xff1a;SHL、SAL算术右移&#xff1a;SAR逻辑右移&#xff1a;SHR 2、闭环移位指令含进位的循环左移&#xff1a;RCL含进位的循环右移&#xff1a;RC…

lunar-1.5.jar

公历农历转换包 https://mvnrepository.com/artifact/com.github.heqiao2010/lunar <!-- https://mvnrepository.com/artifact/com.github.heqiao2010/lunar --> <dependency> <groupId>com.github.heqiao2010</groupId> <artifactId>l…

使用文件附件

文件附件在peoplesoft中非常常见 This chapter provides an overview of the file attachment functions and discusses: Developing applications that use file attachment functions. Application development considerations. Application deployment and system configu…

基于 Appium 的 Android UI 自动化测试!

自动化测试是研发人员进行质量保障的重要一环&#xff0c;良好的自动化测试机制能够让开发者及早发现编码中的逻辑缺陷&#xff0c;将风险前置。日常研发中&#xff0c;由于快速迭代的原因&#xff0c;我们经常需要在各个业务线上进行主流程回归测试&#xff0c;目前这种测试大…

Kafka入门04——原理分析

目录 01理解Topic和Partition Topic(主题) Partition(分区) 02理解消息分发 消息发送到分区 消费者订阅和消费指定分区 总结 03再均衡(rebalance) 再均衡的触发 分区分配策略 RangeAssignor(范围分区) RoundRobinAssignor(轮询分区) StickyAssignor(粘性分区) Re…

软件测试面试1000问(含文档)

前前后后面试了有20多家的公司吧&#xff0c;最近抽空把当时的录音整理了下&#xff0c;然后给大家分享下 开头都是差不多&#xff0c;就让做一个自我介绍&#xff0c;这个不用再给大家普及了吧 同时&#xff0c;我也准备了一份软件测试视频教程&#xff08;含接口、自动化、…

进阶课4——随机森林

1.定义 随机森林是一种集成学习方法&#xff0c;它利用多棵树对样本进行训练并预测。 随机森林指的是利用多棵树对样本进行训练并预测的一种分类器&#xff0c;每棵树都由随机选择的一部分特征进行训练和构建。通过多棵树的集成&#xff0c;可以增加模型的多样性和泛化能力。…

MTK AEE_EXP调试方法及user版本打开方案

一、AEE介绍 AEE (Android Exception Engine)是安卓的一个异常捕获和调试信息生成机制。 手机发生错误(异常重启/卡死)时生成db文件(一种被加密过的二进制文件)用来保存和记录异常发生时候的全部内存信息,经过调试和仿真这些信息,能够追踪到异常的缘由。 二、调试方法…

深度学习_6_实战_点集最优直线解_代码解析

问题描述&#xff1a; 上述题目的意思为&#xff0c;人工造出一些数据点&#xff0c;对我们的模型y Xw b ∈进行训练&#xff0c;其中标准模型如下&#xff1a; 其中W和X都为张量&#xff0c;我们训练的模型越接近题目给出的标准模型越好 训练过程如下&#xff1a; 人造数…

订水商城H5实战教程-04用户注册

目录 1 用户注册2 创建模型应用3 开发审核功能4 配置菜单5 发布预览最终效果 我们上一篇讲解了用户协议的功能&#xff0c;如果用户同意协议&#xff0c;就可以跳转到注册页面&#xff0c;要求用户录入个人基本信息&#xff0c;本篇我们介绍一下用户注册功能。 1 用户注册 用户…

Python-自动化绘制股票价格通道线

常规方案 通过将高点/低点与其 2 个或 3 个相邻点进行比较来检测枢轴点,并检查它是否是其中的最高/最低点。对所有枢轴点进行线性回归以获得上方和下方趋势线。价格离开通道后建仓。通过这样做,我们得到如下所示的价格通道。我认为我们可以利用给定的数据取得更好的结果。

OkHttp网络框架深入理解-SSL握手与加密

OkHttp简介 由Square公司贡献的一个处理网络请求的开源项目&#xff0c;是目前Android使用最广泛的网络框架。从Android4.4开始HttpURLConnection的底层实现采用的是OkHttp。 特点&#xff1a; 支持HTTP/2并允许对同一主机的所有请求共享一个套接字通过连接池,减少了请求延迟…