Flink TableAPI Aggregation And DataType

news2025/1/22 20:53:04

序言

这里整理下聚合的优化选项 以及 数据类型

Stream Aggregation

SQL 是数据分析中使用最广泛的语言。Flink Table API 和 SQL 使用户能够以更少的时间和精力定义高效的流分析应用程序。此外,Flink Table API 和 SQL 是高效优化过的,它集成了许多查询优化和算子优化。但并不是所有的优化都是默认开启的,因此对于某些工作负载,可以通过打开某些选项来提高性能。

虽然在介绍TableApi 但是在StreamApi的算子上也可以借鉴他们优化的处理方式cuiyaonan2000@163.com

MiniBatch 聚合 #

默认情况下,无界聚合算子是逐条处理输入的记录,即:(1)从状态中读取累加器,(2)累加/撤回记录至累加器,(3)将累加器写回状态,(4)下一条记录将再次从(1)开始处理。这种处理模式可能会增加 StateBackend 开销(尤其是对于 RocksDB StateBackend )。此外,生产中非常常见的数据倾斜会使这个问题恶化,并且容易导致 job 发生反压。

MiniBatch 聚合的核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个 key 只需一个操作即可访问状态。这样可以大大减少状态开销并获得更好的吞吐量。但是,这可能会增加一些延迟,因为它会缓冲一些记录而不是立即处理它们。这是吞吐量和延迟之间的权衡。---这中跟windows很像,就是根据一段时间来处理,那跟window是否有冲突呢cuiyaonan2000@163.com????

下图说明了 mini-batch 聚合如何减少状态操作。

 

代码中开启优化

// instantiate table environment
TableEnvironment tEnv = ...;

// access flink configuration
TableConfig configuration = tEnv.getConfig();
// set low-level key-value options
configuration.set("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
configuration.set("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
configuration.set("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task

Local-Global 聚合 #

Local-Global 聚合是为解决数据倾斜问题提出的,通过将一组聚合分为两个阶段,首先在上游进行本地聚合,然后在下游进行全局聚合,类似于 MapReduce 中的 Combine + Reduce 模式。例如,就以下 SQL 而言:

SELECT color, sum(id)
FROM T
GROUP BY color

数据流中的记录可能会倾斜,因此某些聚合算子的实例必须比其他实例处理更多的记录,这会产生热点问题。本地聚合可以将一定数量具有相同 key 的输入数据累加到单个累加器中。全局聚合将仅接收 reduce 后的累加器,而不是大量的原始输入数据。这可以大大减少网络 shuffle 和状态访问的成本。每次本地聚合累积的输入数据量基于 mini-batch 间隔。这意味着 local-global 聚合依赖于启用了 mini-batch 优化。

下图显示了 local-global 聚合如何提高性能。

 代码开启,这个我们在streamApi中可以借鉴

// instantiate table environment
TableEnvironment tEnv = ...;

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation

拆分 distinct 聚合 #

Local-Global 优化可有效消除常规聚合的数据倾斜,例如 SUM、COUNT、MAX、MIN、AVG。但是在处理 distinct 聚合时,其性能并不令人满意。---是丢Local-Global的补充优化

例如,如果我们要分析今天有多少唯一用户登录。我们可能有以下查询:

SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day

如果 distinct key (即 user_id)的值分布稀疏,则 COUNT DISTINCT 不适合减少数据。即使启用了 local-global 优化也没有太大帮助。因为累加器仍然包含几乎所有原始记录,并且全局聚合将成为瓶颈(大多数繁重的累加器由一个任务处理,即同一天)。

这个优化的想法是将不同的聚合(例如 COUNT(DISTINCT col))分为两个级别。第一次聚合由 group key 和额外的 bucket key 进行 shuffle。bucket key 是使用 HASH_CODE(distinct_key) % BUCKET_NUM 计算的。BUCKET_NUM 默认为1024,可以通过 table.optimizer.distinct-agg.split.bucket-num 选项进行配置。第二次聚合是由原始 group key 进行 shuffle,并使用 SUM 聚合来自不同 buckets 的 COUNT DISTINCT 值。由于相同的 distinct key 将仅在同一 bucket 中计算,因此转换是等效的。bucket key 充当附加 group key 的角色,以分担 group key 中热点的负担。bucket key 使 job 具有可伸缩性来解决不同聚合中的数据倾斜/热点。

拆分 distinct 聚合后,以上查询将被自动改写为以下查询:

SELECT day, SUM(cnt)
FROM (
    SELECT day, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day

下图显示了拆分 distinct 聚合如何提高性能(假设颜色表示 days,字母表示 user_id)。

 

开启代码

// instantiate table environment
TableEnvironment tEnv = ...;

tEnv.getConfig()
  .set("table.optimizer.distinct-agg.split.enabled", "true");  // enable distinct agg split

在 distinct 聚合上使用 FILTER 修饰符 #

在某些情况下,用户可能需要从不同维度计算 UV(独立访客)的数量,例如来自 Android 的 UV、iPhone 的 UV、Web 的 UV 和总 UV。很多人会选择 CASE WHEN,例如:

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day

但是,在这种情况下,建议使用 FILTER 语法而不是 CASE WHEN。因为 FILTER 更符合 SQL 标准,并且能获得更多的性能提升。FILTER 是用于聚合函数的修饰符,用于限制聚合中使用的值。将上面的示例替换为 FILTER 修饰符,如下所示:

SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day

Flink SQL 优化器可以识别相同的 distinct key 上的不同过滤器参数。例如,在上面的示例中,三个 COUNT DISTINCT 都在 user_id 一列上。Flink 可以只使用一个共享状态实例,而不是三个状态实例,以减少状态访问和状态大小。在某些工作负载下,可以获得显著的性能提升。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/645006.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springcloud-Nacos-注册表结构

// Map(nameSpace:: Group,Service) Map<String,Map<String,Service>> service new ConcurrentHashMap<>(); //Service 中有个属性 clusterMap // Map(服务名称,集群) Map<groupNameserviceName,Cluster> clusterMap new HashMap<>(); //而Clus…

局部聚集系数

最近在打一个图数据库算法的比赛&#xff0c;分到了计算局部聚集系数这道题&#xff0c;要求速度快&#xff0c;空间复杂度可以不首要考虑。这对我是一个全新的知识&#xff0c;用此博客记录我的学习历程。 搜了一圈视频教程&#xff0c;b站没有这块的知识&#xff0c;只有yout…

系统码的编译码与汉明码

本专栏包含信息论与编码的核心知识&#xff0c;按知识点组织&#xff0c;可作为教学或学习的参考。markdown版本已归档至【Github仓库&#xff1a;https://github.com/timerring/information-theory 】或者公众号【AIShareLab】回复 信息论 获取。 文章目录 系统码的编译码线性…

Android使用WebView与Native交互的三种方式 ( 附源码 )

先附上assets目录中html的源代码文件内容&#xff0c;下面的demo都是使用这几个文件&#xff1a; javascript.html: <!DOCTYPE html> <html> <head><meta charset"utf-8"><title>Carson</title><script>function callAn…

深入理解多层感知机(MLP):原理与代码解析

文章目录 1. MLP的原理1.1 结构1.2 激活函数1.3 前向传播1.4 反向传播算法 2.MLP分类任务应用3.参考文献&#xff1a; 多层感知机&#xff08;MLP&#xff09;是一种经典的神经网络模型&#xff0c;由多个神经元层组成。它的结构和功能使其成为深度学习中的重要组成部分。MLP在…

【Java算法题】剑指offer_算法之01搜索算法

前言 刷题链接&#xff1a; https://www.nowcoder.com/exam/oj/ta?page2&tpId13&type265 1. 搜索算法 JZ53 数字在升序数组中出现的次数 思路&#xff1a;遍历数组&#xff0c;count记录k值出现次数&#xff0c;返回count public class Solution {public int GetN…

【word wps文字】目录页码中的格式在打印或打印预览时变为和正文页码格式一样,如何调整?

一、问题背景 之前在闲鱼上&#xff0c;有个人找我改word排版&#xff0c;有一个需求就是正文页码两边需要横杠。 但是目录中显示的页码&#xff0c;不需要横杠。 我当时是一个一个在目录中删除横杠的&#xff0c;借助了查找与替换功能。 更改后&#xff0c;目录页码如下所…

Java与SpringBoot对redis的使用方式

目录 1.Java连接redis 1.1 使用Jedis1.2 使用连接池连接redis1.3 java连接redis集群模式 2.SpringBoot整合redis 2.1 StringRedisTemplate2.2 RedisTemplate 1.Java连接redis redis支持哪些语言可以操作 &#xff08;去redis官网查询&#xff09; 1.1 使用Jedis (1)添加jedis…

HTML+CSS实训——Day14——项目其他页面的完善

仓库地址&#xff1a;HTML实训 前言 今天我们继续用老师提供的api&#xff0c;完善一些剩余的功能&#xff0c;因为我的git push好像传乱了&#xff0c;所以仓库大家看看最新的就好&#xff0c;最新的一天一定包括前一天所做的内容。 Collect.htmlcss 收藏界面 <!DOCT…

Dijkstra迪杰斯特拉算法求最短路径(C++实现)

名人说&#xff1a;一花独放不是春&#xff0c;百花齐放花满园。——《增广贤文》 作者&#xff1a;Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 目录 〇、Dijkstra迪杰斯特拉算法介绍1、Dijkstra算法是什么&#xff1f;2、Dijkstra算法…

低代码工具:jvs-list(列表引擎)2.1.7功能清单及新增功能介绍

在低代码开发平台中&#xff0c;列表页是一个用于显示数据列表的页面。它通常用于展示数据库中的多条记录&#xff0c;并提供搜索、排序和筛选等功能&#xff0c;以方便用户对数据进行查找和浏览。 jvs-list是jvs快速开发平台的列表页的配置引擎&#xff0c;它和普通的crud 具…

Rocky Linux9.的系统中安装MySQL8 实战

前言 Centos7 已经停止维护&#xff0c;学习其他linux系统势在必行&#xff0c;今天我们要探讨的是&#xff1a; 在Rocky linux9的系统上安装MySQL8 文章目录 前言1. 从Appstream中进行安装1.1 更新系统中的所有软件包1.2 安装MySQL1.3 启动并测试1.4 查看MySQL版本 2. 初始化操…

获取地理位置请求免费天气接口

需求&#xff1a;根据地理位置信息去请求免费的天气接口数据&#xff0c;拿到数据后进行展示&#xff0c;这边我用到了俩个key&#xff0c;一个是高德天气的key和心知天气的key&#xff0c;为什么要这么麻烦呢&#xff0c;是因为之前写过一版不需要获取地理位置&#xff0c;直接…

嵌入式系统开发中的常见挑战和困难

当涉及嵌入式系统开发时&#xff0c;可能会遇到以下一些常见的挑战和困难&#xff1a; 复杂的硬件和软件集成&#xff1a;嵌入式系统通常涉及硬件和软件的紧密集成&#xff0c;需要同时理解和处理硬件和软件层面的问题。这种复杂性可能导致调试和故障排除变得更加困难。 有限…

【哈佛积极心理学笔记】第19讲 让爱情天长地久

第19讲 让爱情天长地久 What makes relationship thrive, some characteristic: work hard the fix mindset: “you are so smart, you are so intelligent” the malleable mindset: “you work so hard” Finding mindset: “finding the right partner” (fix) some thin…

PLC领域从业者的工作待遇现状如何?

目前从事可编程逻辑控制器&#xff08;PLC&#xff09;领域的人员在工作待遇方面相对较好。PLC是工业自动化中广泛使用的控制设备&#xff0c;用于监控和控制各种工业过程和机械设备。以下是关于从事PLC的人员工作待遇的一些常见情况和趋势&#xff1a; 薪资水平&#xff1a;P…

【算法与数据结构】242、LeetCode有效的字母异位词

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;题目要求判断两个字符串是否为字母异位词。什么是字母异位词呢&#xff1f;顾名思义&#xff0c;就是字…

深入理解相机硬件抽象层

和你一起终身学习&#xff0c;这里是程序员Android 经典好文推荐&#xff0c;通过阅读本文&#xff0c;您将收获以下知识点: 一、概览二、Camera HIDL 接口三 、Camera Provider 主程序四、Camera HAL3 接口 一、概览 始于谷歌的Treble开源项目&#xff0c;基于接口与实现的分离…

【AUTOSAR】UDS协议的代码分析与解读(二)----ECU诊断协议概述

UDSO诊断服务技术规范 1 范围 本规范规定了增强型诊断需求的诊断服务部分的内容&#xff0c;定义了通用电子系统需遵循的UDS通用执行 规则。 本规范适用于集团x事业部所有平台车型&#xff0c;所有电子控 制单元(ECU) 的诊断需求&#xff0c;均需按此规范执行。 本规范定义的…

Python神经网络编程学习笔记

文章目录 神经网络基本原理线性分类器学习率一个线性分类器的局限性逻辑AND、逻辑OR逻辑XOR 神经元sigmoid function的logistic function(逻辑函数) 多层神经元演示只有两层&#xff0c;每层两个神经元的神经网络的工作矩阵大法(点乘)使用矩阵乘法的三层神经网络示例反向传播误…