【实时计算 Flink】SQL作业大状态导致反压的调优原理与方法

news2025/1/18 11:03:41

状态管理不仅影响应用的性能,还关系到系统的稳定性和资源的有效利用。如果状态管理不当,可能会导致性能下降、资源耗尽,甚至系统崩溃。本文为您介绍SQL作业大状态导致反压的调优原理与方法。

运行原理:状态算子的产生

作为一种特定领域语言,SQL的设计初衷是隐藏底层数据处理的复杂性,可以通过声明式语言来进行数据操作。而Flink SQL由于其架构的特殊性,在实现层面通常需要引入状态后端配合系统检查点(Checkpoint)来保证计算结果的最终一致性。目前Flink SQL由优化器根据配置项以及SQL语句来推导生成状态算子,想要高效处理有状态的大规模数据和性能调优,需要对SQL状态算子生成机制和管理策略有一定了解。

基于优化器推导产生的状态算子

主要有如下三种状态算子:

状态算子

状态清理机制

ChangelogNormalize

生命周期TTL

SinkUpsertMaterlizer

LookupJoin(*)

ChangelogNormalize

ChangelogNormalize旨在对涉及主键语义的数据变更日志进行标准化处理。通过该算子,可以有效地整合和优化数据变更记录,确保数据的一致性和准确性。该状态算子会在以下两种场景出现:

  • 使用了带有主键的upsert源表

    upsert源表特指在保持主键顺序一致性的前提下,仅产生基于主键的UPDATE(包括INSERT和 UPDATE_AFTER)及DELETE操作的变更数据表。例如,Upsert Kafka便是支持这类操作的典型连接器之一。此外,您也可以通过重写自定义源表连接器中的getChangelogMode方法,实现upsert功能。

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.upsert();
    }
  • 显式设置'table.exec.source.cdc-events-duplicate' = 'true'

    在使用at-least-once语义进行CDC事件处理时,可能会产生重复的变更日志。在需要exactly-once语义时,您需要开启此配置项来对变更日志进行去重。例如

  • 当出现该算子时,上游数据将按照Flink SQL源表DDL中定义的主键做一次hash shuffle操作后使用ValueState来存储当前主键下最新的整行记录。更新状态并向下游发送变更的过程如下图所示。处理第二条-U(2, 'Jerry', 77)时State已经empty,说明截止目前+I/+UA和-D/-UB已经两两抵销,当前这条retract消息是重复的,可以丢弃。

SinkUpsertMaterializer

专门用于处理具有主键定义的结果表,并确保数据的物化操作符合upsert语义。在数据流更新过程中,如果无法保证upsert的特定要求,即按照主键进行更新时保持数据的唯一性和有序性,优化器会自动引入此算子。它通过维护基于结果表主键的状态信息,来确保这些约束得到满足。更多信息及常见场景请参见Flink SQL中Changelog事件乱序处理原理。

LookupJoin

在处理LookupJoin操作时,若主动配置了系统优化选项'table.optimizer.non-deterministic-update.strategy'='TRY_RESOLVE',且优化器识别到潜在的非确定性更新问题(如何消除流查询的不确定性影响),则系统会尝试采取特殊措施以解决这一问题。具体而言,若通过引入一个状态算子能够消除非确定性,优化器便会自动创建一个带状态的LookupJoin算子。

带状态的LookupJoin算子主要适用于以下情况:结果表被定义了主键,而这些主键完全或部分来自于维表,同时维表中的数据可能会发生变化(例如通过变更数据捕获,即CDC Lookup Source机制)。此外,用于Join操作的字段在维表中并非主键。在这种情况下,带状态的LookupJoin算子能够有效地处理数据的动态变化,确保查询结果的准确性和一致性。

基于SQL操作产生的状态算子

基于SQL操作产生的状态算子,按状态清理机制可以分为TTL过期和依赖watermark推进两类。具体说来,Flink SQL里有部分状态算子的生命周期不是由TTL来控制,例如Window相关的状态计算(WindowAggregate、WindowDeduplicate、WindowJoin、WindowTopN等)。它们的状态清理主要依赖于watermark的推进,当watermark超过窗口结束时间时,内置的定时器就会触发状态清理。

状态算子

如何产生

状态清理机制

Deduplicate

使用row_number语句,order by的字段必须为时间属性(time attribute)字段(事件时间event time或处理时间processing time),且只取第一条。

TTL

RegularJoin

使用join语句,等值条件里不包含时间属性字段。

GroupAggregate

使用group by语句进行分组聚合,如sum、count、min、max、first_value、last_value,或使用distinct关键字。

GlobalGroupAggregate

分组聚合开启local-global优化。

IncrementalGroupAggregate

当存在两层分组聚合操作并开启两阶段优化时,内层聚合对应的状态算子GlobalGroupAggregate和外层聚合对应的状态算子LocalGroupAggregate被合并成一个IncrementalGroupAggregate。

Rank

使用row_number语句,order by的字段必须为非时间属性字段。

GlobalRank

使用row_number语句,order by的字段必须为非时间属性字段,并开启local-global优化。

IntervalJoin

使用join语句,等值条件里包含时间属性字段(事件时间或处理时间)。例如:

L.time between R.time + X and R.time + Y 
  -- 或 
R.time between L.time - Y and L.time - X

watermark

TemporalJoin

使用基于事件时间的inner或left join语句。

WindowDeduplicate

基于Window TVF的去重操作。

WindowAggregate

基于Window TVF聚合。

GlobalWindowAggregate

基于Window TVF聚合,并开启两阶段优化。

WindowJoin

基于Window TVF的Join。

WindowRank

基于Window TVF的排序。

GroupWindowAggregate

基于legacy语法的Window聚合。

问题诊断方法

在Flink作业遭遇性能瓶颈时,系统往往表现出明显的反压现象。这种反压可能由多种因素引起,但主要的原因之一是作业状态规模的持续膨胀,直至超出内存限制。此时,状态存储引擎会将部分不频繁使用的状态数据移至磁盘,而磁盘与内存在数据存取速度上的巨大差异,使得磁盘IO操作成为数据处理效率的瓶颈。尤其在Flink的计算过程中,如果算子频繁地从磁盘读取状态数据,将显著增加作业的延迟,降低整体处理速度,成为性能问题的根源。

为了准确识别是否由状态访问引发反压,需要对作业的运行状态和算子行为进行深入分析。利用监控工具追踪和诊断性能瓶颈,可以有效地发现并解决由状态访问引起的性能问题,从而提升Flink作业的性能,具体方法请参见问题诊断方法。

调优方法

主动避免生成不必要的状态算子

基于SQL操作产生的状态算子一般很难避免,因此主要针对优化器自动推导的算子进行讨论。

  • ChangelogNormalize

    在使用upsert source进行数据处理时,需注意其ChangelogNormalize状态节点的生成。通常情况下,除了事件时间的时态关联(event time temporal join)外,其他upsert source应用场景都会产生该状态节点。因此,在选择Upsert Kafka或类似的Upsert连接器时,应首先评估具体的使用场景,对于非事件时间关联场景,应特别关注状态算子的状态指标(state metrics)。由于状态节点是基于KeyedState的,当源表的主键数量庞大时,状态节点的规模也会相应增加。如果物理表的主键更新频繁,状态节点也将频繁地被访问和修改。从实践角度而言,像数据同步类的场景,建议避免使用Upsert Kafka作为源表连接器,同时也最好选择能够保证exactly-once语义的数据同步工具。

  • SinkUpsertMaterializer

    auto作为table.exec.sink.upsert-materialize配置项的默认值,表明系统会自动判断数据的一致性,尤其是在变更日志(changelog)出现无序的情况下。该机制确保了通过引入SinkUpsertMaterializer来维持数据处理的准确性。但并不意味着每当该算子被激活,数据就一定存在无序问题。例如,将多个分组键(group by key)合并的操作,这种情况下优化器无法准确推导出upsert键,因此出于安全考虑,会默认添加SinkUpsertMaterializer。如果对数据的分布有充分的了解,不使用该算子也能够确保输出结果的正确性,可以将参数设置为none,从而在数据正确性和性能上都得到保证。

    您可以通过检查作业的最后一个节点来确认SinkUpsertMaterializer是否被激活使用。在作业的运行拓扑图中(如下所示),该算子通常会与sink算子一起显示,形成一个算子链。通过这种方式,可以直观地监控和评估SinkUpsertMaterializer在数据处理过程中的实际应用情况,从而做出更加合理的优化决策。

    image.png

    image.png

    在检测到生成了特定算子且数据计算无误的情况下,可以调整配置项为 'table.exec.sink.upsert-materialize'='none'(配置步骤请参见如何配置作业运行参数?),以避免自动添加SinkUpsertMaterializer。实时计算引擎VVR 8.0及以上版本中引入了SQL执行计划智能分析功能,协助您更好地识别此类问题,如下图所示。

    image.png

减少状态访问频次:开启mini-batch

在对延时要求不高(比如分钟级别更新)的场景下,开启mini-batch攒批优化将会减少State的访问和更新频率(具体操作请参见开启MiniBatch),提升吞吐。

实时计算Flink版可以应用mini-batch的状态算子如下:

状态算子

说明

ChangelogNormalize

无。

Deduplicate

可配置table.exec.deduplicate.mini-batch.compact-changes-enable,在基于事件时间去重时是否压缩Changelog。

GroupAggregate

GlobalGroupAggregate

IncrementalGroupAggregate

无。

RegularJoin

需额外配置table.exec.stream.join.mini-batch-enabled开启mini-batch join优化。适用于更新流和outer join场景。

减少状态大小设置合理生命周期

说明

开启或关闭TTL不能保证完全兼容。当尝试在已开启TTL的作业上关闭TTL配置时,或者反过来操作时,将会导致兼容性失败并引发StateMigrationException异常。

在优化计算系统时,关键在于精简状态数据以提高性能。您可以在作业运维页面配置State数据过期时间(参数详情请参见运行参数配置)来控制作业状态的生命周期,以满足不同的运维需求和策略。

image.png

过短的TTL可能导致数据未能及时处理,从而产生不符合预期的计算结果,例如,在聚合或连接操作时,部分数据晚到,而相关状态已过期,导致结果异常。相反,过长的TTL会消耗资源,降低作业的稳定性。因此,在对Flink SQL作业进行TTL配置时,建议根据数据特性和业务需求进行恰当的TTL设置。例如,如果计算周期以自然天为单位,并且数据跨天漂移不会超过1小时,那么将TTL设定为25小时即可满足需求。数据开发人员应深入了解业务场景和计算逻辑,以实现最佳的平衡。

此外,针对双流连接场景,Flink SQL自实时计算引擎VVR 8.0.1版本起,支持通过JOIN_STATE_TTL Hint为左流和右流分别设置不同的生命周期。这一改进允许为各自数据流定制生命周期,有效减少不必要的状态存储开销,从而优化作业性能。您可以根据左右流数据的实际生命周期需求,灵活配置,以达到节省资源和提高作业效率的目的,具体操作请参见查询提示。

SELECT /*+ JOIN_STATE_TTL('left_table' = '..', 'right_table' = '..') */ *
FROM left_table [LEFT | RIGHT | INNER] JOIN right_table ON ...

下面是一个作业使用JOIN_STATE_TTL Hint前后的State大小对比示例。

对比

作业情况

状态大小

优化前

  • 双流join操作,左流数据量大,约为右流的20至50倍。右流需长期保存数据,原定为18天。为提升性能,实际将右流的保存周期缩短至10天,导致数据正确性受损。

  • join操作的状态大小约为5.8 TB。

  • 单作业所需资源高达700 CU。

22

优化后

  • 通过合理设置JOIN_STATE_TTL Hint,左流可缩短至12小时,右流保持18天的保存周期,无需牺牲数据完整性。

  • join操作的状态大幅减少至约590 GB,仅约为原来的十分之一。

  • 资源消耗显著降低,从700 CU降至200-300 CU,节省了50%-70%的资源。

23e

减少状态大小:命中更优的执行计划

在生成执行计划时,优化器会结合输入SQL和配置选择相应的State实现。

  • 利用主键优化双流连接

    • 当连接键(Join Key)包含主键时,系统采用ValueState<RowData>进行数据存储,这样可以为每个连接键仅保留一条最新记录,实现存储空间的最大化节省。

    • 如果连接操作使用了非主键字段,即使已定义主键,系统会使用MapState<RowData, RowData>进行存储,以便为每个连接键保存来自源表的、基于主键的最新记录。

    • 在未定义主键的情况下,系统将使用MapState<RowData, Integer>存储数据,记录每个连接键对应的整行数据及其出现次数。

    因此,建议在建表DDL中声明主键,并在双流连接时优先使用主键,以优化存储效率。

  • 优化append_only流去重操作

    使用ROW_NUMBER函数替代FIRST_VALUE或LAST_VALUE函数进行去重,可以更有效地保留首次(ROW_NUMBER函数生成的Deduplicate算子仅保留出现过的Key)或最新出现的记录(保留Key及其最后一次出现的记录)。

  • 提升聚合查询性能

    在进行多维度统计,例如计算全网UV、手机客户端UV、PC端UV等,推荐使用AGG WITH FILTER语法替代传统的CASE WHEN语法。SQL优化器能够识别Filter参数,使得在同一个字段上根据不同条件计算COUNT DISTINCT时能够共享状态信息,减少状态的读写次数。根据性能测试结果,采用AGG WITH FILTER语法相比CASE WHEN可以提升高达一倍的性能。

减少状态大小:调整多流Join顺序,缓解State放大

Flink在处理数据流时,采用了二进制哈希连接(Binary Hash Join)的方式。在下图示例中,A与B的连接结果会导致数据存储的冗余,这种冗余程度与连接操作的频率成正比。随着加入连接的流数量增加,State的冗余问题会变得更加严重。

image.png

您可以策略性地调整连接的顺序来优化该问题。具体来说,可以先将数据量较小的流进行连接,而将数据量大的流放在最后进行。这样的顺序调整有助于减轻状态冗余带来的放大效应,从而提高数据处理的效率和性能。

尽可能减少读盘

为了提升系统性能,可以通过减少磁盘读取次数并优化内存使用来实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2198573.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

面试题:Redis(二)

1. 面试题 2. MoreKey案列 事故案例 2.1 生成上如何限制key*/flushdb/flushall等危险命令的使用&#xff1f; 通过redis.conf配置文件中在SECURITY选项中禁用这些命令 2.2 不用key*避免卡顿那用什么&#xff1f; 用scan命令&#xff0c;类似mysql中的limit命令 语法&…

数学建模算法与应用 第2章 整数规划及其求解方法

目录 2.1 概述 2.2 0-1整数规划模型 2.3 分枝定界法&#xff08;Branch and Bound&#xff09; 2.4 蒙特卡洛法&#xff08;随机取样法&#xff09; Matlab代码示例&#xff1a;蒙特卡洛法求解简单整数规划 2.5 整数规划的计算机求解工具 习题 2 总结 整数规划是线性规…

Window11 安装Java21教程

随着Java版本的迭代&#xff0c;最新的长期支持版本已经更新到Java21了&#xff0c;虽然笔者许多代码还是当年用Java8写的&#xff0c;但抱残守缺从来不适合IT人员&#xff0c;该来的我们始终要欣然面对。 其实随着各项技术的发展&#xff0c;Java许多组件现在其实都不需要或者…

Authentication Lab | Client Side Auth

关注这个靶场的其它相关笔记&#xff1a;Authentication Lab —— 靶场笔记合集-CSDN博客 0x01&#xff1a;Client Side Auth 前情提要 有些时候&#xff0c;开发人员会将身份验证的逻辑写于前端&#xff0c;这样写是十分不安全的&#xff0c;因为前端的代码几乎全部都是可见的…

借助微软 Teams 中的 Tableau,加速数据驱动型决策与协作流程

Tableau 应用已正式上线 Teams 商店&#xff01;如此&#xff0c;企业就能以一种更可靠和安全的方式在 Teams 中共享数据见解。 让团队能快速协作做出数据驱动型决策不再是可选项&#xff0c;而是业务成功的必备条件。 在几个月前的TC24 大会上&#xff0c;Tableau 再次表明了…

关于Amazon Linux 2023的版本及包管理器

在亚马逊上创建EC2实例时&#xff0c;会看到有一个Amazon Linux镜像。 那这个镜像与其他Linux有什么关系和区别呢&#xff1f; 网站是介绍&#xff1a;Amazon Linux 2023 是基于 Linux 的现代化通用操作系统&#xff0c;提供 5 年的长期支持。它针对 AWS 进行了优化&#xff0…

Stable Diffusion最新版nowebui的api使用详解

最近在使用stable diffusion最新版的Stable Diffusion WebUI Forge进行api调用,下面来一步一步的进行展开吧!!! 1、下载lllyasviel/stable-diffusion-webui-forge GitHub - lllyasviel/stable-diffusion-webui-forgeContribute to lllyasviel/stable-diffusion-webui-for…

gaussdb hccdp认证思考题01 GaussDB数据库介绍

01_GaussDB数据库介绍 1. &#xff08;判断题&#xff09;gsql是一款运行在Windows操作系统上的图形界面SQL客户端工具&#xff0c;用于连接GaussDB集群中的数据库以及管理数据库对象。 --错。

120页满分PPT | 企业级业务架构和IT架构规划方案

方案内容综述 方案涵盖了从战略分析到具体实施路径的内容。提出了IT架构规划的工作思路&#xff0c;包括项目启动、部门访谈、资料收集、内部数据库搜索与先进实践研究等步骤&#xff0c;旨在通过这些步骤完成现状及差距分析&#xff0c;并基于此设计未来的应用架构、数据架构…

应用UX体验标准

1、应用导航 标准编号 2.1.1.1 系统返回 标准描述 所有界面都可以执行系统返回操作。 除一级界面外&#xff0c;所有全屏界面均需要提供返回/关闭/取消按钮。(全屏沉浸式场景除外) 测试方法 使用侧边返回手势&#xff0c;验证当前应用界面是否可以执行系统返回操作。检查…

ML 系列:机器学习和深度学习的深层次总结(14) — 逻辑回归(第 3 部分 — 实施)

目录 一、说明 二、数据集说明 三、探索性数据分析 3.1. 查找 null 值 3.2. 数据预处理 3.3. 独特价值 3.4. 两种类型&#xff08;恶性、良性&#xff09;之间的数据传播 3.5. 特征选择和降维 3.5.1.特征选择 3.5.2 降维 &#xff08;PCA&#xff09; 3.6. 选择数据的两个重要特…

【测试】——Loadrunner 介绍与使用

&#x1f4d6; 前言&#xff1a;LoadRunner是一款开源桌面应用软件&#xff0c;可用来模拟用户负载完成性能测试工作&#xff0c;LoadRunner的功能在版本不断升级的过程中已经十分强大&#xff0c;现在很多互联网公司都在使用LoadRunner来完成产品或者Loadrunner是业界公认的权…

纠删码参数自适应匹配问题ECP-AMP实验方案(二)

6.方法设计 6.1.数据获取 为了收集不同的文件大小和纠删码参数对性能指标的影响&#xff0c;本文在Hadoop平台上进行了模拟实验。Hadoop是一种开源的分布式存储和计算框架&#xff0c;它可以支持不同类型的纠删码&#xff0c;并提供了一些应用程序接口和工具来测试和评估纠删…

最大异或对(每周一类)

今天我们来看这个最大异或类这道题 最大异或对 1.首先&#xff0c;我们先来了解一下异或是什么&#xff0c;之后还要讲一下同或。 众所周知&#xff0c;数字在计算机中是由二进制来表示的&#xff0c;比如十进制的7&#xff0c;用二进制表示就是 111&#xff0c;十进制的3&…

SpringBoot+Activiti7工作流使用进阶实例-高亮显示BPMN流程图( SpringBoot+Activiti+mybatis+shiro实现)

文章目录 说明绘制流程图排他网关设置任务节点设置创建工程修改 pom.xml 文件准备数据库的表和测试数据修改 application.yml 文件配置静态资源Shiro 相关配置ShiroConfiguration.javaMyShiroRealm.java流程控制器添加静态的资源和模板页面运行结果截图源码地址说明 使用 Spri…

量子数字签名概述

我们都知道&#xff0c;基于量子力学原理研究密钥生成和使用的学科称为量子密码学。其内容包括了量子密钥分发、量子秘密共享、量子指纹识别、量子比特承诺、量子货币、秘密通信扩展量子密钥、量子安全计算、量子数字签名、量子隐性传态等。虽然各种技术发展的状态不同&#xf…

45岁被裁员的程序员,何去何从?

在当今快速变化的技术行业&#xff0c;职业生涯的稳定性受到挑战。在45岁被裁员&#xff0c;对很多程序员来说&#xff0c;可能是一种惊慌失措的体验。然而&#xff0c;这个阶段也可以被视为一个重新审视和调整方向的机会。本文将对可能的出路进行全方位的分析&#xff0c;并提…

springboot 整合 rabbitMQ(1)

目录 一、MQ概述 二、MQ的优势和劣势 三、常见的MQ产品 RabbitMQ使用步骤 第一步&#xff1a;确保rabbitmq启动并且可以访问15672 第二步&#xff1a;导入依赖 第三步&#xff1a;配置 auto自动确认 manual手工确认&#xff08;推荐使用&#xff01;可以防止消息丢失&a…

网站集群批量管理-Ansible-(playbook)

1.剧本概述 1. playbook 文件,用于长久保存并且实现批量管理,维护,部署的文件. 类似于脚本存放命令和变量 2. 剧本yaml格式,yaml格式的文件:空格,冒号 2. 区别 ans-playbookans ad-hoc共同点批量管理,使用模块批量管理,使用模块区别重复调用不是很方便,不容易重复场景部署服务…

裸眼3D巨幕视频演示Pr城市广告显示屏样机模板

震撼大气超强视觉冲击力3D城市数字广告牌视频演示pr模板工程文件。 5个城市街景裸眼3D巨幕户外广告显示屏样机模板。每个场景提供2个不同的相机视图。 下载地址&#xff1a;https://prmuban.com/40595.html