Flink 优化(六) --------- FlinkSQL 调优

news2025/1/9 1:30:59

目录

  • 一、设置空闲状态保留时间
  • 二、开启 MiniBatch
  • 三、开启 LocalGlobal
  • 四、开启 Split Distinct
  • 五、多维 DISTINCT 使用 Filter
  • 六、设置参数总结


FlinkSQL 官网配置参数:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html

一、设置空闲状态保留时间

Flink SQL 新手有可能犯的错误,其中之一就是忘记设置空闲状态保留时间导致状态爆炸。列举两个场景:

➢ FlinkSQL 的 regular join(inner、left、right),左右表的数据都会一直保存在状态里,不会清理!要么设置 TTL,要么使用 FlinkSQL 的 interval join。

➢ 使用 Top-N 语法进行去重,重复数据的出现一般都位于特定区间内 (例如一小时或一天内),过了这段时间之后,对应的状态就不再需要了。

Flink SQL 可以指定空闲状态(即未更新的状态)被保留的最小时间,当状态中某个 key 对应的状态未更新的时间达到阈值时,该条状态被自动清理:

#API 指定
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
#参数指定
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "1 h");

二、开启 MiniBatch

MiniBatch 是微批处理,原理是缓存一定的数据后再触发处理,以减少对 State 的访问,从而提升吞吐并减少数据的输出量。MiniBatch 主要依靠在每个 Task 上注册的 Timer 线程来触发微批,需要消耗一定的线程调度性能。

➢ MiniBatch 默认关闭,开启方式如下:

// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");

➢ 适用场景

微批处理通过增加延迟换取高吞吐,如果有超低延迟的要求,不建议开启微批处理。通常对于聚合的场景,微批处理可以显著的提升系统性能,建议开启。

在这里插入图片描述
➢ 注意事项:

1)目前,key-value 配置项仅被 Blink planner 支持。
2)1.12 之前的版本有 bug,开启 miniBatch,不会清理过期状态,也就是说如果设置状态的 TTL,无法清理过期状态。1.12 版本才修复这个问题。

参考 ISSUE:https://issues.apache.org/jira/browse/FLINK-17096

三、开启 LocalGlobal

原理概述

LocalGlobal 优化将原先的 Aggregate 分成 Local+Global 两阶段聚合 , 即 MapReduce 模型中的 Combine+Reduce 处理模式。第一阶段在上游节点本地攒一批数据进行聚合 (localAgg) ,并输出这次微批的增量值 (Accumulator)。第二阶段再将收到的 Accumulator 合并 (Merge) ,得到最终的结果 (GlobalAgg) 。

LocalGlobal 本质上能够靠 LocalAgg 的聚合筛除部分倾斜数据,从而降低 GlobalAgg的热点,提升性能。结合下图理解 LocalGlobal 如何解决数据倾斜的问题。

在这里插入图片描述
由上图可知:

  • 未开启 LocalGlobal 优化,由于流中的数据倾斜,Key 为红色的聚合算子实例需要处理更多的记录,这就导致了热点问题。
  • 开启 LocalGlobal 优化后,先进行本地聚合,再进行全局聚合。可大大减少 GlobalAgg 的热点,提高性能。

➢ LocalGlobal 开启方式:

1)LocalGlobal 优化需要先开启 MiniBatch,依赖于 MiniBatch 的参数。
2)table.optimizer.agg-phase-strategy: 聚合策略。默认 AUTO,支持参数 AUTO、TWO_PHASE(使用 LocalGlobal 两阶段聚合)、ONE_PHASE(仅使用 Global 一阶段聚合)。

// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");

// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");

➢ 注意事项:
1)需要先开启 MiniBatch
2)开启 LocalGlobal 需要 UDAF 实现 Merge 方法。

四、开启 Split Distinct

LocalGlobal 优化针对普通聚合 (例如 SUM、COUNT、MAX、MIN 和 AVG ) 有较好的效果,对于 DISTINCT 的聚合 (如 COUNT DISTINCT) 收效不明显,因为 COUNT DISTINCT 在 Local 聚合时,对于 DISTINCT KEY 的去重率不高,导致在 Global 节点仍然存在热点。

原理概述

之前,为了解决 COUNT DISTINCT 的热点问题,通常需要手动改写为两层聚合 (增加按 Distinct Key 取模的打散层 )。

从 Flink1.9.0 版本开始,提供了 COUNT DISTINCT 自动打散功能 , 通过HASH_CODE(distinct_key) % BUCKET_NUM 打散,不需要手动重写。Split Distinct 和
LocalGlobal 的原理对比参见下图。

在这里插入图片描述
Distinct 举例:

SELECT a, COUNT(DISTINCT b)
FROM T
GROUP BY a

手动打散举例:

SELECT a, SUM(cnt)
FROM (
 SELECT a, COUNT(DISTINCT b) as cnt
 FROM T
 GROUP BY a, MOD(HASH_CODE(b), 1024)
)
GROUP BY a

➢ Split Distinct 开启方式

默认不开启,使用参数显式开启:

  • table.optimizer.distinct-agg.split.enabled: true,默认 false。
  • table.optimizer.distinct-agg.split.bucket-num: Split Distinct 优化在第一层聚合中,被打散的 bucket 数目。默认 1024。
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:(要结合 minibatch 一起使用)
// 开启 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");

➢ 注意事项:

(1)目前不能在包含 UDAF 的 Flink SQL 中使用 Split Distinct 优化方法。
(2)拆分出来的两个 GROUP 聚合还可参与 LocalGlobal 优化。
(3)该功能在 Flink1.9.0 版本及以上版本才支持。

五、多维 DISTINCT 使用 Filter

原理概述

在某些场景下,可能需要从不同维度来统计 count(distinct)的结果(比如统计 uv、app 端的 uv、web 端的 uv),可能会使用如下 CASE WHEN 语法。

SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT CASE WHEN c IN ('A', 'B') THEN b ELSE NULL END) AS AB_b,
COUNT(DISTINCT CASE WHEN c IN ('C', 'D') THEN b ELSE NULL END) AS CD_b
FROM T
GROUP BY a

在这种情况下,建议使用 FILTER 语法, 目前的 Flink SQL 优化器可以识别同一唯一键上的不同 FILTER 参数。如,在上面的示例中,三个 COUNT DISTINCT 都作用在 b 列上。

此时,经过优化器识别后,Flink 可以只使用一个共享状态实例,而不是三个状态实例,可减少状态的大小和对状态的访问。

将上边的 CASE WHEN 替换成 FILTER 后,如下所示:

SELECT
a,
COUNT(DISTINCT b) AS total_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('A', 'B')) AS AB_b,
COUNT(DISTINCT b) FILTER (WHERE c IN ('C', 'D')) AS CD_b
FROM T
GROUP BY a

六、设置参数总结

总结以上的调优参数,代码如下:

// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
// 开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", "true");
// 批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
// 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
// 开启 Split Distinct
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");
// 第一层打散的 bucket 数目
configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024");
// 指定时区
configuration.setString("table.local-time-zone", "Asia/Shanghai");

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/431042.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Zookeeper源码分析——算法基础

Zookeeper高级 Paxos 算法 Paxos算法:一种基于消息传递且具有高度容错特性的一致性算法。 Paxos算法解决的问题:就是如何快速正确的在一个分布式系统中对某个数据值达成一致,并且保证不论发生任何异常, 都不会破坏整个系统的一…

回溯递归(例题+思路+代码)

题目描述 leetcode 77 思路 组合问题适合用回溯求解。 经典解法&#xff1a;for循环 内部回溯。 每次进入回溯方法时&#xff0c;先判断终止条件&#xff0c;再进行当前层的循环&#xff0c;循环进行下一层递归。 代码 class Solution {public List<List<Integer&…

【C++入门必备知识:缺省参数+函数重载+函数名修饰规则】

【C入门必备知识&#xff1a;缺省参数函数重载函数名修饰规则】 ①.缺省参数Ⅰ.概念1.全缺省参数2.半缺省参数3.使用规则4.应用场景再现 ②.函数重载Ⅰ.概念1.参数个数不同2.参数类型不同3.参数类型顺序不同4.对返回值没有要求 ③.函数名修饰规则Ⅰ.C/C的不同 ①.缺省参数 Ⅰ.…

ubuntu22.04安装pyCUDA

更多内容请查看 www.laowubiji.com 笔者近期想使用GPU进行并行计算&#xff0c;搜索之后看到需要用到pyCUDA库函数&#xff0c;所以需要在所使用的ubuntu22.04系统中部署pyCUDA库&#xff0c;没想到在部署过程中折腾了好几回&#xff0c;总算是安装成功了。简单记录过程如下&a…

从输入URL到页面展示到底发生了什么

刚开始写这篇文章还是挺纠结的&#xff0c;因为网上搜索“从输入url到页面展示到底发生了什么”&#xff0c;你可以搜到一大堆的资料。而且面试这道题基本是必考题&#xff0c;二月份面试的时候&#xff0c;虽然知道这个过程发生了什么&#xff0c;不过当面试官一步步追问下去的…

AI智能改写-文本改写人工智能

随着信息技术的不断发展&#xff0c;互联网上各种信息的海量涌现&#xff0c;万千信息竞相呈现&#xff0c;如何让自己的内容独领风骚&#xff0c;引起用户的注意和眼球&#xff1f;这时&#xff0c;一款强大的文章智能改写神器便应运而生&#xff0c;可以让您的内容变得更加独…

可能你已经刷了很多01背包的题,但是真的对01背包领悟透彻了吗?,看我这一篇,使君对01背包的理解更进一步【代码+图解+文字描述】

一.概念理解&#xff1a;什么是01背包 关于01背包的概念理解如下&#xff1a;01背包是在M件物品取出若干件放在空间为W的背包里&#xff0c;每件物品的体积为W1&#xff0c;W2至Wn&#xff0c;与之相对应的价值为P1,P2至Pn。001背包的约束条件是给定几种物品&#xff0c;每种物…

SpringAMQP的使用

目录一、什么是SpringAMQP二、基本消息队列消息发送消息接收三、WorkQueue队列四、发布订阅模型FanoutExchangeDirectExchangeTopicExchange五、消息转换器一、什么是SpringAMQP 它可以大大的简化我们的开发&#xff0c;不用我们再自己创建连接写一堆代码&#xff0c;具有便捷…

【MySQL--05】表的约束

文章目录 1.表的约束1.1空属性1.2默认值default vs null1.3列描述1.4 zerofill1.5主键primary key1.6 自增长auto_increment1.7唯一键 unique如何设计主键&#xff1f;1.8 外键 foreign key 1.表的约束 真正的约束字段的是数据类型&#xff0c;但是数据类型约束很单一&#xf…

安捷伦E4405B

18320918653 E4405B E4405B|Agilent|ESA-E系列|10G|频谱分析仪|9kHz至13.2GHz 安捷伦 Agilent 惠普 HP 测量速度&#xff1a;28次更新/秒 测量精度&#xff1a;1dB 可选用的10Hz分辨事宽滤波器 机箱可容纳6插槽选件卡 97dB三阶动态范围 能在现场使用的坚固&#xff0c…

mycat2安装配置,mycat2分库分表,mycat2一库多表,mycat2自增id

1、官网下载&#xff08;官网下载地址&#xff09; 官网下载地址 Index of /2.0/ 下载模板 下载jdk包 下载好后吧jdk包房到mycat的lib目录下 2、配置启动 配置结构 mycat配置文件夹 clusters- prototype.cluster.json //无集群的时候自动创建- c0.cluster.json- c1.cluster…

UML与代码的对应关系

五种关系的耦合强弱比较&#xff1a;依赖<关联<聚合<组合<继承 依赖 虚线箭头 可描述为&#xff1a;Uses a 依赖是类的五种关系中耦合最小的一种关系。 因为在生成代码的时候&#xff0c;这两个关系类都不会增加属性。 注意1&#xff1a; Water类的生命期&…

【机器学习】独立成分分析(ICA)及Matlab实现

独立成分分析及Matlab实现1.问题引入2.ICA原理3.ICA算法步骤4.性质与优点5.程序代码6.程序分析7.运行结果1.问题引入 独立成分分析&#xff08;ICA&#xff09;最初由Aapo Hyvrinen等人于1980年代提出&#xff0c;其起源可以追溯到对神经科学和信号处理领域的研究需求。ICA的提…

C语言判断一个日期是在该年的第几天案例讲解

今天是2023年4月11号&#xff0c;我们就用今天举例得出是2023年的第几天。 思路分析 1&#xff09;我们想知道2023年4月11号是2023年的第几天&#xff0c;只需要把1到3月份的天数累加求和然后加上今天日期也就是11就可以算出2023年4月11号是2023年的第几天。 推广&#xff1a;…

kafka集群节点重启后未被topic识别

1.案例 kafka集群的节点重启后,topic为apex的主题识别不到重启后的broker节点id,但是还能识别到副本集还在原来的broker节点上 在kafka manager上查看 继续往下查看 2.查看kafka日志报错原因 以下是两个不同的broker节点报错的报错日志 tail -f /etc/kafka/kafka/logs/ka…

(排序9)非比较排序之计数排序

非比较排序之计数排序 之前讲的七种排序方法的话&#xff0c;都是比较排序&#xff1b;除此之外还有三种非比胶排序&#xff1a;计数排序&#xff0c;基数排序&#xff0c;桶排序。后面两个实际应用没啥&#xff0c;没啥价值。非比较排序的话&#xff0c;他的条件都比较苛刻&a…

HTTP 和 HTTPS(请求响应报文格式 + 请求方法 + 响应状态码 + HTTPS 加密流程 + Cookie 和 Session)

文章目录1. HTTP 是什么2. HTTP 请求报文和响应报文的格式1&#xff09;请求报文格式2&#xff09;响应报文格式3&#xff09;报文中空行的作用3. HTTP 的长连接和短连接4. URL1&#xff09;在浏览器中输入 www.baidu.com 后执行的全部过程5. HTTP 常用的请求方法6. GET 和 POS…

自媒体都在用的5个素材网站,视频、音效、图片全部免费下载~

推荐几个自媒体必备的素材库&#xff0c;免费可商用&#xff0c;建议收藏&#xff01; 1、菜鸟图库 视频素材下载_mp4视频大全 - 菜鸟图库 国内超大的素材库&#xff0c;在这里你可以找到设计、办公、图片、视频、音频等各种素材。视频素材就有上千个&#xff0c;全部都很高清…

后端应用架构

微服务架构划分 ⚠️ 生产环境实际部署中&#xff0c;基础能力、公共基础能力将分别在国内、美国集群部署。在没有引入数据同步的场景下&#xff0c;数据是隔离的。 接入层&#xff08;交互层&#xff09; 接入层主要完成协议转换、熔断限流、统一鉴权等能力&#xff0c;起到保…

Linux网络服务之ftp

目录 一.ftp的相关知识1.1 ftp的简介1.2 ftp的数据连接模式 二.svftpd的安装和配置2.1 svftpd安装3.2 设置本地用户验证访问ftp3.2.1 设置本地用户可以访问ftp&#xff0c;禁止匿名用户登录3.2.2 对本地用户访问切换目录进行限制 3.3 黑名单和白名单的使用3.3.1 黑名单的使用3.…