FlinkSQL【分组聚合-多维分析-性能调优】应用实例分析

news2024/11/15 16:00:53

FlinkSQL处理如下实时数据需求:
实时聚合不同 类型/账号/发布时间 的各个指标数据,比如:初始化/初始化后删除/初始化后取消/推送/成功/失败 的指标数据。要求实时产出指标数据,数据源是mysql cdc binlog数据。

代码实例

--SET table.exec.state.ttl=86400s; --24 hour,默认: 0 ms
SET table.exec.state.ttl=2592000s; --30 days,默认: 0 ms
--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;

CREATE TABLE kafka_table (
     mid bigint,
     db string,
     sch string,
     tab string,
     opt string,
     ts bigint,
     ddl string,
     err string,
     src map<string,string>,
     cur map<string,string>,
     cus map<string,string>,
     account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),
     publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),
     msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),
     send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type'])
     --event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)
     --WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 't1',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
   --  'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交
  'format' = 'json'
);



CREATE TABLE es_sink(
     send_type      STRING
    ,account_id     STRING
    ,publish_time   STRING
    ,grouping_id       INTEGER
    ,init           INTEGER
    ,init_cancel    INTEGER
    ,push          INTEGER
    ,succ           INTEGER
    ,fail           INTEGER
    ,init_delete    INTEGER
    ,update_time    STRING
    ,PRIMARY KEY (group_id,send_type,account_id,publish_time) NOT ENFORCED
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'es_sink',
    'document-type' = 'es_sink',
    'hosts' = 'http://xxx:9200',
    'format' = 'json',
    'filter.null-value'='true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);

CREATE view  tmp as
select
    send_type,
    account_id,
    publish_time,
    msg_status,
    case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,
    case when UPPER(opt) = 'UPDATE' and send_type='1' and msg_status='4' then 1 else 0 end AS init_cancel,
    case when UPPER(opt) = 'UPDATE' and msg_status='3' then 1 else 0 end AS push,
    case when UPPER(opt) = 'UPDATE' and (msg_status='1' or msg_status='5') then 1 else 0 end AS succ,
    case when UPPER(opt) = 'UPDATE' and (msg_status='2' or msg_status='6') then 1 else 0 end AS fail,
    case when UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0' then  1 else 0 end AS init_delete,
    event_time,
    opt,
    ts
FROM kafka_table
where (UPPER(opt) = 'INSERT' and msg_status='0' )
or        (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4','5','6'))
or        (UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0');


--send_type=1          send_type=0
--初始化->0             初始化->0
--取消->4
--推送->3               推送->3
--成功->1               成功->5
--失败->2               失败->6

CREATE view  tmp_groupby as
select
 COALESCE(send_type,'N') AS send_type
,COALESCE(account_id,'N') AS account_id
,COALESCE(publish_time,'N') AS publish_time
,case when send_type is null and account_id is null and publish_time is null then 1
         when send_type is not null and account_id is null and publish_time is null then 2
         when send_type is not null and account_id is not null and publish_time is null then 3
         when send_type is not null and account_id is not null and publish_time is not null then 4
         end grouping_id
,sum(init) as init
,sum(init_cancel) as init_cancel
,sum(push) as push
,sum(succ) as succ
,sum(fail) as fail
,sum(init_delete) as init_delete
from tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,account_id,publish_time); --等同于以上

INSERT INTO es_sink
select
     send_type
    ,account_id
    ,publish_time
    ,grouping_id
    ,init
    ,init_cancel
    ,push
    ,succ
    ,fail
    ,init_delete
    ,CAST(LOCALTIMESTAMP AS STRING) as update_time
from tmp_groupby

其他配置

  • flink集群参数
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.localdir: /export/io_tmp_dirs/rocksdb
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
rest.flamegraph.enabled: true
pipeline.operator-chaining: false
taskmanager.memory.managed.fraction: 0.7
taskmanager.memory.network.min: 128 mb
taskmanager.memory.network.max: 128 mb
taskmanager.memory.framework.off-heap.size: 32mb
taskmanager.memory.task.off-heap.size: 32mb
taskmanager.memory.jvm-metaspace.size: 256mb
taskmanager.memory.jvm-overhead.fraction: 0.03
  • 检查点配置
    在这里插入图片描述

  • job运行资源
    管理节点(JM) 1 个, 节点规格 1 核 4 GB内存, 磁盘 10Gi
    运行节点(TM)10 个, 节点规格 1 核 4 GB内存, 磁盘 80Gi
    单TM槽位数(Slot): 1
    默认并行度:8

  • es mapping

#POST app_cust_syyy_private_domain_syyy_group_msg/app_cust_syyy_private_domain_syyy_group_msg/_mapping
{
    "app_cust_syyy_private_domain_syyy_group_msg": {
        "properties": {
            "send_type": {
                "type": "keyword",
                "ignore_above": 256
            },
            "account_id": {
                "type": "keyword"
            },
           "publish_time": {
           	"type": "keyword",
           	"fields": {
           		"text": {
           			"type": "keyword"
           		},
           		"date": {
           			"type": "date",
           			"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
           			"ignore_malformed":"true" # 忽略错误的各式
           		}
           	}
           },
            "grouping_id": {
                "type": "integer"
            },
            "init": {
                "type": "integer"
            },
            "init_cancel": {
                "type": "integer"
            },
            "query": {
                "type": "integer"
            },
            "succ": {
                "type": "integer"
            },
            "fail": {
                "type": "integer"
            },
            "init_delete": {
                "type": "integer"
            },
            "update_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            }
        }
    }
}

性能调优

是否开启【MiniBatch 聚合】和【Local-Global 聚合】对分组聚合场景影响巨大,尤其是在数据量大的场景下。

  • 如果未开启,在分组聚合,数据更新状态时,每条数据都会触发聚合运算,进而更新StateBackend (尤其是对于 RocksDB StateBackend,火焰图上反映就是一直在update rocksdb),造成上游算子背压特别大。此外,生产中非常常见的数据倾斜会使这个问题恶化,并且容易导致 job 发生反压。
    在这里插入图片描述

  • 在开启【MiniBatch 聚合】和【Local-Global 聚合】后,配置如下:

--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;

开启配置好会在DAG上添加两个环节MiniBatchAssignerLocalGroupAggregate
在这里插入图片描述

对结果的影响

开启了【MiniBatch 聚合】和【Local-Global 聚合】后,一天处理不完的数据,在10分钟内处理完毕

输出结果

在这里插入图片描述在这里插入图片描述

参考:
Group Aggregation
Streaming Aggregation Performance Tuning

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1391154.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【GCC】6 接收端实现:周期构造RTCP反馈包

基于m98代码。GCC涉及的代码,可能位于:webrtc/modules/remote_bitrate_estimator webrtc/modules/congestion_controller webrtc/modules/rtp_rtcp/source/rtcp_packet/transport_feedback.cc webrtc 之 RemoteEstimatorProxy 对 remote_bitrate_estimator 的 RemoteEstimato…

java注释详解

1、Java 中的注释详解 概括&#xff1a;注释是增加一些说明&#xff0c;在编译后&#xff0c;注释会被抹掉&#xff0c;不起任何租用&#xff0c;只在书写代码的时候&#xff0c;对代码进行的一个说明 不管是那种编程语言&#xff0c; 代码的注释都是必备的语法功能&#xff…

初识 Elasticsearch 应用知识,一文读懂 Elasticsearch 知识文集(5)

&#x1f3c6;作者简介&#xff0c;普修罗双战士&#xff0c;一直追求不断学习和成长&#xff0c;在技术的道路上持续探索和实践。 &#x1f3c6;多年互联网行业从业经验&#xff0c;历任核心研发工程师&#xff0c;项目技术负责人。 &#x1f389;欢迎 &#x1f44d;点赞✍评论…

GitLab Runner 实现项目 CI/CD 发布

Gitlab Runner简介 Gitlab实现CICD的方式有很多&#xff0c;比如通过Jenkins&#xff0c;通过Gitlab Runner等&#xff0c;今天主要介绍后者。Gitlab在安装的时候&#xff0c;就默认包含了Gitlab CI的能力&#xff0c;但是该能力只是用于协调作业&#xff0c;并不能真的去执行…

Vulnhub-tr0ll-1

一、信息收集 端口收集 PORT STATE SERVICE VERSION 21/tcp open ftp vsftpd 3.0.2 | ftp-anon: Anonymous FTP login allowed (FTP code 230) |_-rwxrwxrwx 1 1000 0 8068 Aug 09 2014 lol.pcap [NSE: writeable] | ftp-syst: | STAT: | FTP …

STM32F103标准外设库——GPIO 输入、输出 (五)

个人名片&#xff1a; &#x1f981;作者简介&#xff1a;一名喜欢分享和记录学习的在校大学生 &#x1f42f;个人主页&#xff1a;妄北y &#x1f427;个人QQ&#xff1a;2061314755 &#x1f43b;个人邮箱&#xff1a;2061314755qq.com &#x1f989;个人WeChat&#xff1a;V…

Qt第二周周二作业

代码&#xff1a; widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent nullptr);~Widget();void paintEvent(…

苹果MAC怎么清理内存?苹果MAC清理内存的方法

很多使用苹果电脑的用户都喜欢在同时运行多个软件&#xff0c;不过这样会导致在运行一些大型软件的时候出现不必要的卡顿现象&#xff0c;这时候我们就可以去清理下内存&#xff0c;不过很多人可能并不知道正确的清内存方式&#xff0c;下面就和小编一起来看看吧。 苹果MAC清理…

微信小程序防止截屏录屏

一、使用css添加水印 使用微信小程序原生的view和css给屏幕添加水印这样可以防止用户将小程序内的隐私数据进行截图或者录屏分享导致信息泄露&#xff0c;给小程序添加一个水印浮层。这样即使被截图或者拍照&#xff0c;也能轻松地确定泄露的源头。效果图如下&#xff1a; 代码…

从云计算到物联网:虚拟化技术的演变与嵌入式系统的融合

文章目录 一、硬件性能提升&#xff1a;摩尔定律与嵌入式虚拟化二、CPU多核技术&#xff1a;为嵌入式虚拟化提供支持三、业务负载整合&#xff1a;嵌入式虚拟化的核心需求四、降低硬件成本&#xff1a;虚拟化技术的经济效益五、软件重用与移植&#xff1a;虚拟化技术的优势六、…

计算机网络-甘晴void学习笔记

计算机网络 计科210X 甘晴void 202108010XXX 文章目录 计算机网络期中复习1计算机网络和因特网1.1 因特网1.2 网络边缘1.3 网络核心1.4 分组交换的时延/丢包和吞吐量1.5 协议层次与服务模型 2 应用层原理2.1 应用层协议原理2.2 Web和Http2.3 因特网中的电子邮件2.4 DNS&#x…

每日一练:LeeCode-107、199、637、429、515、116、117题-层序遍历模版通解 【二叉树】

本文是力扣LeeCode-107、199、637、429、515、116、117题的层序遍历模版通解 学习与理解过程&#xff0c;本文仅做学习之用&#xff0c;对本题感兴趣的小伙伴可以出门左拐LeeCode。 以下题目都是套用层序遍历的模版解决的: 大家可以去参考我的 每日一练&#xff1a;LeeCode-10…

接口防刷方案

1、前言 本文为描述通过Interceptor以及Redis实现接口访问防刷Demo 2、原理 通过ip地址uri拼接用以作为访问者访问接口区分 通过在Interceptor中拦截请求&#xff0c;从Redis中统计用户访问接口次数从而达到接口防刷目的 如下图所示 3、案例工程 项目地址&#xff1a; htt…

【Linux】Linux 系统编程——which 命令

文章目录 1.命令概述2.命令格式3.常用选项4.相关描述5.参考示例 1.命令概述 which 命令用于定位执行文件的路径。当输入一个命令时&#xff0c;which 会在环境变量 PATH 所指定的路径中搜索每个目录&#xff0c;以查找指定的可执行文件。 2.命令格式 which [选项] 命令名3.常…

STM32——DMA知识点及实战总结

1.DMA概念介绍 DMA&#xff0c;全称Direct Memory Access&#xff0c;即直接存储器访问。 DMA传输 将数据从一个地址空间复制到另一个地址空间。 注意&#xff1a;DMA传输无需CPU直接控制传输 2.DMA框图 3.DMA处理过程 外设的 8 个请求独立连接到每个通道&#xff0c;由 DMA_…

推挽输出、开漏输出、上拉输入、下拉输入、浮空输入。

一、推挽输出 推挽输出的内部电路大概如上图中黄色部分&#xff0c;输出控制内有反相器&#xff0c;由一个P-MOS和一个N-MOS组合而成&#xff0c;同一时间只有一个管子能够进行导通。 当写入1时&#xff0c;经过反向器后为0&#xff0c;P-MOS导通&#xff0c;N-MOS截至&#xf…

kafka简单介绍和代码示例

“这是一篇理论文章&#xff0c;给大家讲一讲kafka” 简介 在大数据领域开发者常常会听到MQ这个术语&#xff0c;该术语便是消息队列的意思&#xff0c; Kafka是分布式的发布—订阅消息系统。它最初由LinkedIn(领英)公司发布&#xff0c;使用Scala语言编写&#xff0c;与2010年…

gin-vue-admin二开使用雪花算法生成唯一标识 id

场景介绍 需求场景&#xff1a; 总部采集分支的数据&#xff0c;由于分支的 id 是子增的主键 id&#xff0c;所以会出现重复的 id&#xff0c;但是这个 id 需要作为标识&#xff0c;没有实际作用&#xff0c;这里选择的是分布式 id 雪花算法生成 id 存储用来标识&#xff0c;这…

交通流量预测:T-GCN A Temporal Graph Convolutional Network for Traffic Prediction

摘要 为了同时捕捉时空相关性&#xff0c;将图卷积网络(GCN)和门控递归单元(GRU)相结合&#xff0c;提出了一种新的基于神经网络的流量预测方法–时态图卷积网络(T-GCN)模型。具体地&#xff0c;GCN用于学习复杂的拓扑结构以捕获空间相关性&#xff0c;而门控递归单元用于学习…

20240112-补 制作两个字符串字谜的最少步骤数

题目要求 给你两个长度相同的字符串 s 和 t。在一个步骤中&#xff0c;你可以选择 t 中的任意一个字符并用另一个字符替换它。 返回将 t 变为 s 的变位所需的最少步数。 字符串的 "字谜 "是指字符串中的相同字符具有不同&#xff08;或相同&#xff09;的排列顺序…