浅谈Flink批模式Adaptive Hash Join

news2024/9/27 23:23:46

Flink批Hash Join递归超限问题

随着Flink流批一体能力的迅速发展以及Flink SQL易用性的提升,越来越多的厂商开始将Flink作为离线批处理引擎使用。在我们使用Flink进行大规模join操作时,也许会发生如下的异常,导致任务失败:

Hash join exceeded maximum number of recursions, without reducing partitions enough to be memory resident.

字面意思即为Hash Join的递归次数超出限制。Flink批模式下的join算法有两种,即Hybrid Hash Join和Sort-Merge Join。顾名思义,Hybrid Hash Join就是Simple Hash Join和Grace Hash Join两种算法的结合(关于它们,看官可参考这篇文章)。引用一张Flink官方博客中的手绘图来说明。

Flink的Hybrid Hash Join在build阶段会积极地利用TaskManager的托管内存,并将内存无法容纳的哈希分区spill到磁盘中。在probe阶段,当内存中的哈希分区处理完成后,会释放掉对应的MemorySegment,并将先前溢写到磁盘的分区读入,以提升probe效率。特别注意,如果溢写分区对空闲的托管内存而言仍然过大(特别是存在数据倾斜的情况时),就会将其递归拆分成更小的分区,原理如下图所示。

当然,递归拆分也不能是无限制的。在Blink Runtime中,如果递归拆分3次仍然不能满足内存需求,就会抛出前文所述的异常了。

笔者在今年7月ApacheCon Asia 2022流处理专场的分享内容里谈到了这个问题,并且将其归咎于Flink SQL的CBO优化器的代价模型不太科学,导致其十分偏向选择Hash Join。由于修改的难度很大,所以暂时的workaround就是在任务失败后,自动设置table.exec.disabled-operators参数来禁用掉ShuffleHashJoin算子,从而强制使用Sort-Merge Join。

当然这仍然不算优雅的解决方法,接下来简要看看Flink 1.16版本中提出的更好一点的方案:Adaptive Hash Join。

Adaptive Hash Join的实现

所谓adaptive(自适应),就是指Hash Join递归超限时,不必让任务失败,而是将这些大分区自动转为Sort-Merge Join来处理。

Blink Runtime中的哈希表有两种,即BinaryHashTable(key的类型为BinaryRowData)和LongHybridHashTable(key的类型为Long)。以前者为例,查看其prepareNextPartition()方法,该方法负责递归地取得下一个要处理的哈希分区。

private boolean prepareNextPartition() throws IOException {
        // finalize and cleanup the partitions of the current table
        // ......

        // there are pending partitions
        final BinaryHashPartition p = this.partitionsPending.get(0);
        // ......

        final int nextRecursionLevel = p.getRecursionLevel() + 1;
        if (nextRecursionLevel == 2) {
            LOG.info("Recursive hash join: partition number is " + p.getPartitionNumber());
        } else if (nextRecursionLevel > MAX_RECURSION_DEPTH) {
            LOG.info(
                    "Partition number [{}] recursive level more than {}, process the partition using SortMergeJoin later.",
                    p.getPartitionNumber(),
                    MAX_RECURSION_DEPTH);
            // if the partition has spilled to disk more than three times, process it by sort merge
            // join later
            this.partitionsPendingForSMJ.add(p);
            // also need to remove it from pending list
            this.partitionsPending.remove(0);
            // recursively get the next partition
            return prepareNextPartition();
        }

        // build the next table; memory must be allocated after this call
        buildTableFromSpilledPartition(p, nextRecursionLevel);

        // set the probe side
        setPartitionProbeReader(p);

        // unregister the pending partition
        this.partitionsPending.remove(0);
        this.currentRecursionDepth = p.getRecursionLevel() + 1;

        // recursively get the next
        return nextMatching();
    }

注意当递归深度超过MAX_RECURSION_DEPTH(常量定义即为3)时,会将分区直接放入一个名为partitionsPendingForSMJ的容器中,等待做Sort-Merge Join。另外,在该方法调用的buildTableFromSpilledPartition()方法(对溢写分区执行build操作)开头,去掉了对递归超限的判断,也就是说Hash join exceeded maximum number of recursions异常已经成为历史。

那么等待做Sort-Merge Join的分区是如何被处理的?查看Blink Runtime中的HashJoinOperator算子,在构造该算子时,需要比原来多传入一个SortMergeJoinFunction的实例:

private final SortMergeJoinFunction sortMergeJoinFunction;

SortMergeJoinFunction实际上是将旧版的SortMergeJoinOperator处理逻辑抽离出来的类,算法本身没有任何变化。然后从哈希表中读取前述的partitionsPendingForSMJ容器,对每个分区的build侧和probe侧分别执行Sort-Merge Join操作即可。

/**
     * If here also exists partitions which spilled to disk more than three time when hash join end,
     * means that the key in these partitions is very skewed, so fallback to sort merge join
     * algorithm to process it.
     */
    private void fallbackSMJProcessPartition() throws Exception {
        if (!table.getPartitionsPendingForSMJ().isEmpty()) {
            // release memory to MemoryManager first that is used to sort merge join operator
            table.releaseMemoryCacheForSMJ();
            // initialize sort merge join operator
            LOG.info("Fallback to sort merge join to process spilled partitions.");
            initialSortMergeJoinFunction();
            fallbackSMJ = true;

            for (BinaryHashPartition p : table.getPartitionsPendingForSMJ()) {
                // process build side
                RowIterator<BinaryRowData> buildSideIter =
                        table.getSpilledPartitionBuildSideIter(p);
                while (buildSideIter.advanceNext()) {
                    processSortMergeJoinElement1(buildSideIter.getRow());
                }

                // process probe side
                ProbeIterator probeIter = table.getSpilledPartitionProbeSideIter(p);
                BinaryRowData probeNext;
                while ((probeNext = probeIter.next()) != null) {
                    processSortMergeJoinElement2(probeNext);
                }
            }

            // close the HashTable
            closeHashTable();

            // finish build and probe
            sortMergeJoinFunction.endInput(1);
            sortMergeJoinFunction.endInput(2);
            LOG.info("Finish sort merge join for spilled partitions.");
        }
    }

    private void initialSortMergeJoinFunction() throws Exception {
        sortMergeJoinFunction.open(
                true,
                this.getContainingTask(),
                this.getOperatorConfig(),
                (StreamRecordCollector) this.collector,
                this.computeMemorySize(),
                this.getRuntimeContext(),
                this.getMetricGroup());
    }

    private void processSortMergeJoinElement1(RowData rowData) throws Exception {
        if (leftIsBuild) {
            sortMergeJoinFunction.processElement1(rowData);
        } else {
            sortMergeJoinFunction.processElement2(rowData);
        }
    }

    private void processSortMergeJoinElement2(RowData rowData) throws Exception {
        if (leftIsBuild) {
            sortMergeJoinFunction.processElement2(rowData);
        } else {
            sortMergeJoinFunction.processElement1(rowData);
        }
    }

与BinaryHashTable不同,LongHybridHashTable的join逻辑全部是代码生成的,在对应的生成器LongHashJoinGenerator中,可以看到与上文类似的代码,看官可以自行找来读读。

The End

民那晚安晚安。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/130899.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Es进阶检索

本文用到的测试数据及所有代码链接&#xff1a; https://blog.csdn.net/m0_62436868/article/details/128505566?spm1001.2014.3001.5501 1、SearchAPI ES 支持两种基本方式检索 : 一个是通过使用 REST request URI 发送搜索参数&#xff08;uri检索参数&#xff09; 另…

基于华为eNSP的中小企业办公园区网络规划与设计

目录一、需求分析&#xff08;一&#xff09;项目背景&#xff08;二&#xff09;网络业务需求&#xff08;三&#xff09;网络应用需求二、网络结构设计三、网络拓扑图四、网络设备基本配置五、项目测试结语运用到的技术有&#xff1a; 1、虚拟局域网&#xff08;VLAN&#xf…

人工智能--你需要知道的一切

人工智能--你需要知道的一切 人工智能是当今最受关注的技术之一。但它究竟是什么&#xff1f;你为什么要关心&#xff1f; 人工智能是当今最受关注的技术之一。但它到底是什么&#xff1f;你为什么要关心&#xff1f;在这里&#xff0c;我们将介绍你需要知道的关于人工智能的…

java开发的美妆化妆品电商商城系统

简介 Java基于ssm(可以转springboot项目哦)开发的美妆商城系统&#xff0c;主要是卖化妆品的系统&#xff0c;用户可以浏览商品&#xff0c;加入购物车&#xff0c;下单&#xff0c;在个人中心管理自己的订单。管理员可以管理自己的商品&#xff0c;发布商品&#xff0c;上下架…

2023年留学基金委(CSC)公派访问学者博士后项目选派办法及解读

2023年即将伊始。知识人网祝大家新年快乐&#xff0c;心想事成&#xff01;同时提醒申请者关注国家留学基金委&#xff08;CSC&#xff09;的申报政策。目前CSC官网已经发布了2023年公派访问学者、博士后的项目通知&#xff0c;知识人网小编现将其选派工作流程及选派办法原文转…

C语言 自定义类型

结构体内存对齐 解释1 从内存开始位置存放 解释二 int对齐数是4 vs默认对齐数是8 取其较小值的倍数 那就是4的位置存放 char 对数1 vs是8默认 谁的较小值对数是1 那就是任意数 所以c2防砸8 如下图绿色部分 struct S2 {char c1;int i;double d;//8 };输出结果16 struct S4 …

当我们身边没有示波器就无法测量频率与占空比了?一招教你解决身边没有示波器时如何测量STM32定时器产生PWM的频率与占空比

当我们身边没有示波器就无法测量频率与占空比了&#xff1f;这篇文件小编就教大家如何使用定时器输入捕获功能测量频率与占空比。 原理解析 定时器输入捕获一般应用在两个方面&#xff0c;一个方面是脉冲跳变沿时间测量&#xff0c;另一方面是 PWM输入测量。下面将要使用就是测…

LaTeX快速入门

文章目录LaTeX快速入门一、 概述1、 简介2、 环境配置3、文件结构4、 文档结构二、 基本概念1、 第一个LaTeX程序2、 宏包和文档类2.1 宏包2.2 文档类3、 文件组织的方式4、 相关术语和概念三、 排版文字1、 文字编码2、 排版中文3、 LaTeX中的字符3.1 空格和分行3.2 注释3.3 特…

Redis(Ⅰ)【学习笔记】

&#xff08;仅作为个人学习笔记&#xff09; 1.什么是Redis&#xff1f; 1.Redis 是用C语言开发的一个开源的高性能键值对&#xff08; key-value &#xff09;内存数据库&#xff0c;它是一种 NoSQL 数据库。 2.它是【单进程单线程】的内存数据库&#xff0c;所以说不存在线…

charAt()方法的使用

charAt()函数 Java charAt() 方法属于Java String类 charAt() 方法用于返回指定索引处的字符。索引范围为从 0 到 length() - 1。 语法 public char charAt(int index) 参数 index – 字符的索引。 返回值 返回指定索引处的字符。 举个例子&#xff1a; package 做题;…

基于思维进化算法优化BP神经网络(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️❤️&#x1f4a5;&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑…

Kubernetes之PV与PVC

1. 综述 当前&#xff0c;存储的方式和种类有很多&#xff0c;并且各种存储的参数也需要非常专业的技术人员才能够了解。在Kubernetes集群中&#xff0c;放了方便我们的使用和管理&#xff0c;Kubernetes提出了PV和PVC的概念&#xff0c;这样Kubernetes集群的管理人员就可以将注…

linux系统中利用设备树完成对LED的控制

大家好&#xff0c;今天主要和大家聊一聊&#xff0c;如何使用linux系统中的设备树控制led。 目录 第一&#xff1a;设备树LED基本驱动原理 第二&#xff1a;LED灯驱动程序的实现 第一&#xff1a;设备树LED基本驱动原理 本次实验采用设备树向linux内核传递相关的寄存器物理…

MATLAB演示梯度上升寻找极值

MATLAB演示梯度上升寻找极值 梯度 梯度的本意是一个向量&#xff08;矢量&#xff09;&#xff0c;表示某一函数在该点处的方向导数沿着该方向取得最大值&#xff0c;即函数在该点处沿着该方向&#xff08;此梯度的方向&#xff09;变化最快&#xff0c;变化率最大&#xff08…

dubbo源码解析-SPI机制

SPI&#xff0c;Service Provider Interface&#xff0c;服务提供者接口&#xff0c;是一种服务发现机制。 JDK 的 SPI 规范 JDK 的 SPI 规范规定&#xff1a;  接口名&#xff1a;可随意定义  实现类名&#xff1a;可随意定义  提供者配置文件路径&#xff1a;其查…

剑指offer(简单)

目录 数组中重复的数字 替换空格 从尾到头打印链表 用两个栈实现队列 斐波那契数列 青蛙跳台阶问题 旋转数组的最小数字 二进制中的1的个数 打印从1到最大的n位数 删除链表的节点 调整数组顺序使奇数位于偶数前面 链表中倒数第k个节点 反转链表 合并两个排序的链…

一文读懂tensorflow: 基本概念和API

文章目录前言tensorflow发展历程基本概念张量神经网络、层模型超参数损失函数交叉熵函数激活函数梯度和自动微分优化器tensorflow 2.x 和 tensorflow 1.xtensorflow开发流程tensorflow API张量的定义和运算示例张量的初始化方式梯度计算模型的搭建示例&#xff1a;MINST手写数字…

Casting out Primes: Bignum Arithmetic for Zero-Knowledge Proofs学习笔记

1. 引言 Polygon zero团队 Daniel Lubarov 和 Polygon zkEVM团队 Jordi Baylina 2022年10月联合发表的论文 《Casting out Primes: Bignum Arithmetic for Zero-Knowledge Proofs》。 受“casting out nines” 技术——做对9取模运算并提供概率性结果&#xff0c;启发&#x…

RocketMQ 的存储模型

文章目录1.整体概览2.数据文件3.消费文件4.索引文件1.整体概览 Producer &#xff1a;消息发布的角色&#xff0c;Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递&#xff0c;投递的过程支持快速失败并且低延迟。 Consumer &#xff1a;消息消费的角…

如何应用人工智能和机器学习来预测消费者的行为

应用AI和机器学习来预测消费者行为 在这篇文章中&#xff0c;我们将学习和分析一般的消费者行为。我们还将了解人工智能是如何帮助发现有价值的见解的&#xff0c;从而使公司做出正确的决定&#xff0c;以实现提供更好的价值和创造更好的收入的愿景。 我们还将通过一个案例进…