Flink 通过 paimon 关联维表,内存降为原来的1/4

news2025/1/12 4:49:20

你好,我是 shengjk1,多年大厂经验,努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注!你会有如下收益:

  1. 了解大厂经验
  2. 拥有和大厂相匹配的技术等

希望看什么,评论或者私信告诉我!

文章目录

  • 一、前言
  • 二、优化
    • 2.1 分析 Iceberg lookup 部分源码
    • 2.2 切换到 paimon 维表
    • 2.3 paimon 维表原理分析
    • 2.4 是不是一定要通过 iceberg 替换 paimon 才能降低内存
  • 三、总结


一、前言

线上实时任务,通过 FlinkSQL 关联 Iceberg 维表,维表大搞有 60w,首先通过 FlinkSQL关联 Iceberg 维表上线了,经过一番调优后:TaskManager Memory 给到了 16G,但通过监控可以轻易的发现 Heap 没下来过 10GB

图片.png

二、优化

2.1 分析 Iceberg lookup 部分源码

因为 Iceberg 的 lookup 是公司内部自己实现的,就不贴源码了,但核心一点就是,look up 维表 cache 的数据会存在内存中,这就是为什么堆内存没有下来过 10GB

2.2 切换到 paimon 维表

TaskManager Memory 给到了 4G,程序运行的轻轻松松,另外为了增加 rocksdb 性能,也适当的增加了 rocksdb 的内存

图片.png

为了替换 paimon 后内存可以下降那么多呢?

2.3 paimon 维表原理分析

首先来看一下 FlinkSQL look up paimon 的维表的源码,这里我们以 flink1.15 为例。
下载完 paimon 源码后,找到 moudle paimon-flink-1.15

图片.png
通过 OldLookupFunction 类中的

public void eval(Object... values) {
    function.lookup(GenericRowData.of(values)).forEach(this::collect);
}

可以知道调用的 FileStoreLookupFunction.lookup 方法

public Collection<RowData> lookup(RowData keyRow) {
    try {
        checkRefresh();

        InternalRow key = new FlinkRowWrapper(keyRow);
        if (partitionLoader != null) {
            InternalRow partition = refreshDynamicPartition(true);
            if (partition == null) {
                return Collections.emptyList();
            }
            key = JoinedRow.join(key, partition);
        }

        List<InternalRow> results = lookupTable.get(key);
        List<RowData> rows = new ArrayList<>(results.size());
        for (InternalRow matchedRow : results) {
            rows.add(new FlinkRowData(matchedRow));
        }
        return rows;
    } catch (OutOfRangeException e) {
        reopen();
        return lookup(keyRow);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

通过 checkRefresh 方法,一路跟踪到 FileStoreLookupFunction.refresh 方法

private void refresh() throws Exception {
    lookupTable.refresh();
}

这里呢,我们就以没有主键的 paimon 表为例,继续追踪,追踪到 FullCacheLookupTable.refresh 方法,让,后继续追踪,最后到了 FullCacheLookupTable.refreshRow 方法 ,继续追踪直到 NoPrimaryKeyLookupTable.refreshRow 方法

protected void refreshRow(InternalRow row, Predicate predicate) throws IOException {
    joinKeyRow.replaceRow(row);
    if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
        if (predicate == null || predicate.test(row)) {
            state.add(joinKeyRow, row);
        }
    } else {
        throw new RuntimeException(
                String.format(
                        "Received %s message. Only INSERT/UPDATE_AFTER values are expected here.",
                        row.getRowKind()));
    }
}

在这里我们可以看到 cache 的数据存到的 state 中,继续看 state 是如何实现的

RocksDBListState<InternalRow, InternalRow> state

也就是说,维表的cache被存到了 rocksdb 中,这一块内存在 Flink 中属于 off-heap,并且通过 manager menory 控制。
rocksdb这一块,如果不太了解的话,可以理解为 mysql,mysql 里面可以存放 TB 级的数据,但它的占用的内存却很少,rocksdb 也是类似的

2.4 是不是一定要通过 iceberg 替换 paimon 才能降低内存

答案是否定了,开头提到了之所以 iceberg 维表占用内存大,主要的原因是因为内部的实现方式:cache 到内存中了。

三、总结

本文通过实际案例,详细介绍了如何通过替换维表实现FlinkSQL任务内存占用的优化。作者通过分析Iceberg lookup部分源码,发现其cache的数据会存在内存中,导致内存占用过大。作者将维表替换为paimon,通过分析paimon维表的原理,发现其cache的数据存储在rocksdb中,从而实现了内存占用的降低。本文对于需要进行FlinkSQL任务内存优化的读者具有一定的参考价值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1710187.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数字化农业新时代:图扑农林牧综合监控平台

利用图扑自研 HT for Web GIS 产品&#xff0c;结合遥感技术&#xff0c;构建可交互式的农林牧数据分析平台。该平台围绕地块总览、播种分析、牛只管理、设备查询四个维度&#xff0c;对地区的全貌、农场、村集体分布以及相应的环境进行多样化的可视化展示和进行数据支持&#…

等保三级-MySQL 加固

1、身份鉴别 要求&#xff1a;建议身份密码登录&#xff0c;身份标识具有唯一性&#xff0c;身份鉴别信息具有复杂度要求&#xff0c;密码长度最少为8位&#xff0c;密码由数字、字母大小写、特殊符号组成、并设置定期更换&#xff0c;更换时间最长位90天 &#xff08;1&#…

php反序列化学习(1)

1、php面向对象基本概念 类的定义&#xff1a; 类是定义了一件事物的抽象特征&#xff0c;它将数据的形式以及这些数据上的操作封装住在一起。&#xff08;对象是具有类类型的变量&#xff0c;是对类的实例&#xff09; 构成&#xff1a; 成员变量&#xff08;属性&#xf…

《QT实用小工具·六十七》QTabWidget实现的炫酷标签工具栏

1、概述 源码放在文章末尾 该项目基于QTabWidget和QTabBar实现了灵活的标签工具栏&#xff0c;主要包含如下功能&#xff1a; 1、标签栏可以收起&#xff0c;可以展开 2、可以在标签栏中添加新的标签界面 3、可以从标签工具栏中把界面拖出来&#xff0c;也可以拖回去 4、关闭拖…

详解Java ThreadLocal

个人博客 详解Java ThreadLocal | iwts’s blog Java ThreadLocal ThreadLocal提供了线程内存储变量的能力&#xff0c;这些变量不同之处在于每一个线程读取的变量是对应的互相独立的。通过get和set方法就可以得到当前线程对应的值。 TreadLocal存储模型 ThreadLocal的静态…

vue3通过ElementPlus的tooltip组件实现自定义指令文字提示

vue3自定义指令实现tooltip文字提示&#xff0c;通过ElementPlus的tooltip组件 简介步骤1&#xff1a;定义指令tooltip步骤2&#xff1a;createTooltip函数步骤3&#xff1a;autoShowToolTip步骤4&#xff1a;注册指令步骤5&#xff1a;测试 简介 之前的项目中&#xff0c;有些…

基于springboot实现华府便利店信息管理系统项目【项目源码+论文说明】计算机毕业设计

基于springboot实现华府便利店信息管理系统演示 摘要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本华府便利店信息管理系统就是在这样的大环境下诞生&#xff…

NXP i.MX8系列平台开发讲解 - 3.13 Linux 之Audio子系统(二)

专栏文章目录传送门&#xff1a;返回专栏目录 目录 1. Linux ALSA 内核框架 2. Linux ALSA 代码分析 2.1 声卡驱动初始化 2.2 声卡创建注册 2.3 PCM设备创建 3. ALSA ASoC 3.1 Machine 3.2 Platform 3.3 Codec 上一章节&#xff0c;对于Linux Audio子系统有了大概的了解…

做视频号小店遇到差评怎么处理?如何规避差

大家好&#xff0c;我是喷火龙。 大家在做店的时候应该都会遇到品退、中差评这些问题&#xff0c;这对我们的店铺影响还是非常大的&#xff0c;差评过多就会影响店铺的体验分&#xff0c;从而影响店铺的流量&#xff0c;还会间接的影响商品的转化率&#xff0c;如果太低的话&a…

@RequestBody注解

文章目录 RequestBody注解基本概念在postman里如何发送接收端带有RequestBody的请求&#xff1f; RequestBody注解 基本概念 扩展&#xff1a; http报文会包含四部分&#xff0c;第一部分是请求行&#xff0c;第二部分是请求头&#xff0c;第三部分是空行&#xff0c;第四部分…

示教编程操作QA

示教器的连接问题 Q1&#xff1a;示教器显示连接断开&#xff0c;该如何解决&#xff1f; A1&#xff1a;原因&#xff1a;示教器与控制器之间断开连接&#xff0c;需要调整控制器来解决该问题。解决方法1&#xff1a;重启控制器&#xff1b;解决方法2&#xff1a;将控制器与P…

Python: 使用pyotp实现OTP一次性密码验证

使用pyotp实现OTP一次性密码验证 OTP的基本原理 生成一个共享秘钥作为随机数的种子服务端通过种子计算出当前的密码客户端也通过相同的种子计算出当前的密码验证客户端生成的密码和服务端生成的密码是否匹配 服务端和客户端计算的方式一样 共享密钥 时间因子 算法 > 密…

XDebug配置几件教程,phpstorm实现http请求断点调试

写这篇的文章的初衷:网络上配置XDebug的文章有很多,XDebug也有官方的文档, PhpStorm也有官方的文档,为什么还要写那? 相信不少人,都有一种感觉,虽然教程很多,但是按教程走一遍,自己的确不能正常调试。 问题出在下面几个方面: 1. 对调试过程中,没有一定的认识,因此…

100个 Unity小游戏系列五 -Unity 抽奖游戏专题三老虎机游戏

一、演示效果 二、知识点讲解 2.1 布局 public void CreateItems(SlotsData[] slotsData){isInited false;slotsPrizeList new List<SlotsData>();for (int i 0; i < slotsData.Length; i){var item slotsData[i];slotsPrizeList.Add(item);}float bottomY -it…

探索Python的包与模块:构建项目的基石

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、模块与包的基础认知 1. 模块的定义与创建 2. 包的组织与管理 二、模块与包的进阶使用…

【分支控制(if-else判断)】单分支-双分支-多分支-嵌套分支

程序流程控制 在程序中, 程序运行的流程控制决定程序是如何执行的, 是我们必须掌握的, 主要有三大流程控制语句. 顺序控制 (简单)分支控制 (判断)循环控制 (循环) 一. 顺序控制 顺序控制介绍 程序从上到下逐行地执行, 中间没有任何判断和跳转. 顺序控制举例和注意事项 Java中…

AUS GLOBAL 与 UNICEF 联合国儿童基金会共同帮助叙利亚和土耳其地震受灾居民

2023年2月6日,土耳其东南部和叙利亚发生两次强烈地震和数十次余震,数以千计的儿童和家庭面临危机。 成千上万的房屋被毁,许多家庭被迫流离失所,而在一年中的这个时候,气温经常低于冰点,雪和冻雨很常见。许多学校、医院以及其他医疗和教育设施被地震破坏或摧毁,这对儿童造成了巨…

马斯克开启军备竞赛,xAI筹集60亿美元

大模型技术论文不断&#xff0c;每个月总会新增上千篇。本专栏精选论文重点解读&#xff0c;主题还是围绕着行业实践和工程量产。若在某个环节出现卡点&#xff0c;可以回到大模型必备腔调重新阅读。而最新科技&#xff08;Mamba&#xff0c;xLSTM,KAN&#xff09;则提供了大模…

蓝桥杯—SysTick中断精准定时实现闪烁灯

在嵌入式系统中&#xff0c;SysTick_Handler 是一个中断服务例程&#xff08;Interrupt Service Routine, ISR&#xff09;&#xff0c;用于处理 SysTick 定时器的中断。SysTick 定时器通常用于提供一个周期性的定时中断&#xff0c;可以用来实现延时或者周期性任务。 SysTick…

AWS联网和内容分发之Transit Gateway

将Amazon VPC、AWS账户和本地网络连接到一个网关中。AWS Transit Gateway通过中央枢纽连接Amazon虚拟私有云&#xff08;VPC&#xff09;和本地网络。此连接简化了您的网络&#xff0c;并且结束了复杂的对等关系。Transit Gateway充当高度可扩展的云路由器&#xff0c;每个新的…