基于Flink SQL的实时指标多维分析模型

news2025/4/21 4:42:40

数据流程介绍

1.创建源表kafka接入消息队列数据,定义字段映射规则;
2.创建目标表es_sink配置Elasticsearch输出;
3.通过多级视图(tmp→tmp_dedup→tmp1/tmp2→tmp3→tmp_groupby)实现数据清洗、去重、状态计算;
4.使用ROLLUP进行多维聚合统计;
5.最终计算结果写入ES,包含成功率等衍生指标。
在这里插入图片描述

Flink SQL 逻辑

SET table.exec.state.ttl=2592000s; --30 days,默认: 0 ms
--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;
-- 单位:ms, 10天
--SET table.exec.state.ttl = 864000000

CREATE TABLE kafkaTable (
       mid bigint,
       db string,
       sch string,
       tab string,
       opt string,
       ts bigint,
       ddl string,
       err string,
       src map<string,string>,
       cur map<string,string>,
       cus map<string,string>,
       id AS IF(cur['id'] IS NOT NULL , cur['id'], src ['id']),
       task_id AS IF(cur['task_id'] IS NOT NULL , cur['task_id'], src ['task_id']),
       account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),
       publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),
       msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),
       send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type']),
       retry_status AS IF(cur['retry_status'] IS NOT NULL , cur['retry_status'], src ['retry_status']),
       update_time as IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']),
       event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)
       proctime AS PROCTIME()
--                           WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH (
      'connector' = 'kafka',
      'topic' = 'xxx',
      'jdq.client.id' = 'xxx',
      'jdq.password' = 'xxx',
      'jdq.domain' = 'xxx',
      'scan.startup.mode' = 'group-offsets', --  default: group-offsets,other: latest-offset,earliest-offset
      --  'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交
      'format' = 'binlog'
      );


CREATE TABLE es_sink(
     send_type      STRING
    ,task_id        STRING
    ,month_dim      STRING
    ,day_dim        STRING
    ,grouping_id    INTEGER
    ,init           INTEGER
    ,cancel         INTEGER
    ,succ           INTEGER
    ,fail           INTEGER
    ,cancel_rate    float
    ,succ_rate      float
    ,fail_rate      float
    ,update_date    STRING
    ,PRIMARY KEY (grouping_id,send_type,month_dim,day_dim,task_id) NOT ENFORCED
)
    with (
        'connector' = 'elasticsearch-6',
        'index' = 'index01',
        'document-type' = 'type01',
        'hosts' = 'xx',
        'format' = 'json',
        'filter.null-value'='true',
        'sink.bulk-flush.max-actions' = '1000',
        'sink.bulk-flush.max-size' = '10mb'
        );
-- 维度:
--   - send_type, 发送类型
--   - month_dim,月份维度
--   - day_dim,天维度
--   - task_id,任务ID

CREATE view  tmp as
select
    send_type,
    task_id,
    publish_time,
    msg_status,
    case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,
    case when UPPER(opt) = 'UPDATE' and msg_status='4' then 1 else 0 end AS cancel,
    case when UPPER(opt) = 'UPDATE' and msg_status='1' then 1 else 0 end AS succ,
    case when UPPER(opt) = 'UPDATE' and msg_status='2' then 1 else 0 end AS fail,
    update_time,
    opt,
    ts,
    id,
    proctime,
    SUBSTRING(publish_time,1,7) as month_dim,
    SUBSTRING(publish_time,1,10) as day_dim
FROM kafkaTable
where trim(retry_status) = '0'
  and publish_time >= '2025-01-01 00:00:00'
  and
    (    (UPPER(opt) = 'INSERT' and msg_status='0' and position( '_R' in task_id) = 0)
        or   (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4') and position( '_R' in task_id) = 0)
        or   (UPPER(opt) = 'UPDATE' and msg_status='1' and position( '_R' in task_id) > 0)
        );

--去重模式,去重是指对在列的集合内重复的行进行删除,只保留第一行或最后一行数据。在聚合sum或count时,Flink回撤流会对数据进行回撤处理
create view tmp_dedup as
select * from
    (
        select *,
               row_number() over(partition by id,msg_status order by proctime desc) as rn
        from tmp
    ) t
where rn=1;

CREATE view tmp1 as
select
    send_type
     ,task_id
     ,month_dim
     ,day_dim
     ,init
     ,case when cancel = 1 and update_time <= publish_time then 1 else 0 end AS cancel
     ,succ
     ,case when cancel = 1 and update_time > publish_time then 1 else fail end AS fail
     ,update_time
from tmp_dedup
where position( '_R' in task_id) = 0;

CREATE view tmp2 as
select
    send_type
     ,SPLIT_INDEX(task_id,'_R',0) AS task_id
     ,month_dim
     ,day_dim
     ,init
     ,cancel
     ,succ
     ,-1 AS fail
     ,update_time
from tmp_dedup
where position( '_R' in task_id) > 0
and   succ = 1 ;

CREATE view tmp3 as
select
      send_type
     ,task_id
     ,month_dim
     ,day_dim
     ,init
     ,cancel
     ,succ
     ,fail
from tmp1
UNION ALL
select
    send_type
     ,task_id
     ,month_dim
     ,day_dim
     ,init
     ,cancel
     ,succ
     ,fail
from tmp2;


CREATE view  tmp_groupby as
select
--/*+ STATE_TTL('tmp' = '10d') */
    COALESCE(send_type,'N') AS send_type
     ,COALESCE(month_dim,'N') AS month_dim
     ,COALESCE(day_dim,'N') AS day_dim
     ,COALESCE(task_id,'N') AS task_id
     ,case when send_type is null and month_dim is null and day_dim is null and task_id is null then 1
           when send_type is not null and month_dim is null and day_dim is null and task_id is null then 2
           when send_type is not null and month_dim is not null and day_dim is null and task_id is null then 3
           when send_type is not null and month_dim is not null and day_dim is not null and task_id is null then 4
           when send_type is not null and month_dim is not null and day_dim is not null and task_id is not null then 5
    end grouping_id
     ,sum(init) as init
     ,sum(cancel) as cancel
     ,sum(succ) as succ
     ,sum(fail) as fail
from tmp3
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,month_dim,day_dim,task_id); --等同于以上

INSERT INTO es_sink
select
    case when trim(send_type) = '1'  then '发送类型1'
         when trim(send_type) = '2'  then '发送类型2'
         else send_type end AS send_type
     ,task_id
     ,month_dim
     ,day_dim
     ,grouping_id
     ,init
     ,cancel
     ,succ
     ,fail
     ,ROUND(cancel*100.0/init,2) AS cancel_rate
     ,ROUND(succ*100.0/(init - cancel),2) AS succ_rate
     ,ROUND(fail*100.0/(init - cancel),2) AS fail_rate
     ,CAST(LOCALTIMESTAMP AS STRING) as update_date
from tmp_groupby
where init > 0
and (init - cancel) > 0;

es mapping

#POST index01/type01/_mapping
{
    "type01": {
        "properties": {
            "grouping_id": {
                "type": "byte"
            },
            "send_type": {
                "type": "keyword",
                "ignore_above": 256
            },
           "month_dim": {
           	"type": "keyword",
           	"fields": {
           		"text": {
           			"type": "keyword"
           		},
           		"date": {
           			"type": "date",
           			"format": "yyyy-MM",
           			"ignore_malformed":"true" --忽略错误的各式
           		}
           	}
           },
            "day_dim": {
           	"type": "keyword",
           	"fields": {
           		"text": {
           			"type": "keyword"
           		},
           		"date": {
           			"type": "date",
           			"format": "yyyy-MM-dd",
           			"ignore_malformed":"true"
           		}
           	}
           },
            "task_id": {
                "type": "keyword"
            },
            "init": {
                "type": "integer"
            },
            "cancel": {
                "type": "integer"
            },
            "succ": {
                "type": "integer"
            },
            "fail": {
                "type": "integer"
            },
            "cancel_rate": {
                "type": "float"
            },
            "succ_rate": {
                "type": "float"
            },
            "fail_rate": {
                "type": "float"
            },
            "update_date": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2314860.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【从零开始学习计算机科学】数据库系统(二)关系数据库 与 关系代数

【从零开始学习计算机科学】数据库系统(二)关系数据库 与 关系代数 关系数据库结构化查询语言SQL数据定义语言(DDL)数据查询语言(Data Query Language, DQL)数据操纵语言(Data Manipulation Language, DML)数据控制语言(Data Control Language, DCL)关系型数据库的优…

Linux驱动开发实战(四):设备树点RGB灯

Linux驱动开发实战&#xff08;四&#xff09;&#xff1a;设备树点RGB灯 文章目录 Linux驱动开发实战&#xff08;四&#xff09;&#xff1a;设备树点RGB灯前言一、驱动实现1.1 驱动设计思路1.2 关键数据结构1.3 字符设备操作函数1.4 平台驱动探测函数1.5 匹配表和平台驱动结…

vue中,watch里,this为undefined的两种解决办法

提示&#xff1a;vue中&#xff0c;watch里&#xff0c;this为undefined的两种解决办法 文章目录 [TOC](文章目录) 前言一、问题二、方法1——使用function函数代替箭头函数()>{}三、方法2——使用that总结 前言 ‌‌‌‌‌尽量使用方法1——使用function函数代替箭头函数()…

设计模式C++

针对一些经典的常见的场景, 给定了一些对应的解决方案&#xff0c;这个就叫设计模式。 设计模式的作用&#xff1a;使代码的可重用性高&#xff0c;可读性强&#xff0c;灵活性好&#xff0c;可维护性强。 设计原则&#xff1a; 单一职责原则&#xff1a;一个类只做一方面的…

前端构建工具进化论:从Grunt到Turbopack的十年征程

前端构建工具进化论&#xff1a;从Grunt到Turbopack的十年征程 一、石器时代&#xff1a;任务自动化工具&#xff08;2012-2014&#xff09; 1.1 Grunt&#xff1a;首个主流构建工具 // Gruntfile.js 典型配置 module.exports function(grunt) {grunt.initConfig({concat: {…

设备预测性维护:企业降本增效的关键密码​

在当今竞争激烈的商业战场中&#xff0c;企业犹如一艘在波涛汹涌大海上航行的巨轮&#xff0c;要想乘风破浪、稳步前行&#xff0c;降本增效便是那至关重要的 “船锚”&#xff0c;帮助企业在复杂的市场环境中站稳脚跟。而设备预测性维护&#xff0c;正是开启企业降本增效大门的…

css基本功

为什么 ::first-letter 是伪元素&#xff1f; ::first-letter 的作用是选择并样式化元素的第一个字母&#xff0c;它创建了一个虚拟的元素来包裹这个字母&#xff0c;因此属于伪元素。 grid布局 案例一 <!DOCTYPE html> <html lang"zh-CN"><head&…

信号处理抽取多项滤波的数学推导与仿真

昨天的《信号处理之插值、抽取与多项滤波》&#xff0c;已经介绍了插值抽取的多项滤率&#xff0c;今天详细介绍多项滤波的数学推导&#xff0c;并附上实战仿真代码。 一、数学变换推导 1. 多相分解的核心思想 将FIR滤波器的系数 h ( n ) h(n) h(n)按相位分组&#xff0c;每…

C++双端队列知识点+习题

在C中&#xff0c;双端队列&#xff08;Deque&#xff0c;发音为“deck”&#xff09;是标准模板库&#xff08;STL&#xff09;中的一种容器适配器&#xff0c;其全称为Double-Ended Queue。它结合了队列和栈的特点&#xff0c;允许在容器的两端&#xff08;前端和后端&#x…

【递归、搜索和回溯算法】专题二 :二叉树中的深搜

二叉树中的深搜 深度优先遍历&#xff08;DFS&#xff09;&#xff1a;一种沿着树或图的深度遍历节点的算法&#xff0c;尽可能深地搜索树或图的分支&#xff0c;如果一条路径上的所有结点都被遍历完毕&#xff0c;就会回溯到上一层&#xff0c;继续找一条路遍历。 在二叉树中…

【vue3学习笔记】(第150-151节)computed计算属性;watch监视ref定义的数据

尚硅谷Vue2.0Vue3.0全套教程丨vuejs从入门到精通 本篇内容对应课程第150-151节 课程 P150节 《computed计算属性》笔记 写一个简单的 姓、名输入框效果&#xff1a; 用vue2的形式定义一个计算属性 fullName&#xff1a; 测试页面展示无问题&#xff1a; 但是&#xff0c;在vue…

MySQL 8 设置允许远程连接(Windows环境)

&#x1f31f; MySQL 8 设置允许远程连接&#xff08;Windows环境&#xff09; 在开发和部署应用时&#xff0c;经常需要从远程主机连接到MySQL数据库。默认情况下&#xff0c;MySQL仅允许本地连接&#xff0c;因此需要进行一些配置才能允许远程访问。今天&#xff0c;我将详细…

我又又又又又又更新了~~纯手工编写C++画图,有注释~~~

再再再次感谢Ttcofee提的问题 本次更新内容&#xff1a; 鼠标图案&#xff08;切换&#xff09;&#xff0c;版本号获取&#xff0c;输入框复制剪切板 提前申明&#xff1a;如果运行不了&#xff0c;请到主页查看RedpandaDevc下载&#xff0c;若还是不行就卸了重装。 版本号&…

全面解析:将采购入库单数据集成到MySQL的技术实施

旺店通旗舰版-采购入库单集成到MySQL的技术案例分享 在数据驱动的业务环境中&#xff0c;如何高效、准确地实现系统间的数据对接是企业面临的重要挑战。本文将聚焦于一个具体的系统对接集成案例&#xff1a;将旺店通旗舰奇门平台上的采购入库单数据集成到MySQL数据库中&#x…

12. Pandas :使用pandas读Excel文件的常用方法

一 read_excel 函数 其他参数根据实际需要进行查找。 1.接受一个工作表 在 11 案例用到的 Excel 工作簿中&#xff0c;数据是从第一张工作表的 A1 单元格开始的。但在实际场景中&#xff0c; Excel 文件可能并没有这么规整。所以 panda 提供了一些参数来优化读取过程。 比如 s…

记录致远OA服务器硬盘升级过程

前言 日常使用中OA系统突然卡死&#xff0c;刷新访问进不去系统&#xff0c;ping服务器地址正常&#xff0c;立马登录服务器检查&#xff0c;一看磁盘爆了。 我大脑直接萎缩了&#xff0c;谁家OA系统配400G的空间啊&#xff0c;过我手的服务器没有50也是30台&#xff0c;还是…

Java网络多线程

网络相关概念: 关于访问: IP端口 因为一个主机上可能有多个服务, 一个服务监听一个端口,当你访问的时候主机通过端口号就能知道要和哪个端口发生通讯.因此一个主机上不能有两个及以上的服务监听同一个端口. 协议简单来说就是数据的组织形式 好像是两个人交流一样,要保证自己说…

VScode 运行LVGL

下载vscode解压 环境安装 安装mingw64&#xff0c;gcc 版本必须8.3以上 安装cmak 系统环境变量Path中添加&#xff08;以实际安装目录为准&#xff09; C:\Program Files\mingw64\bin C:\Program Files\CMake\bin 将GUI-Guider生成的代码目录拷贝一份放到vscode项目目录…

React Next项目中导入Echart世界航线图 并配置中文

公司业务要求做世界航线图&#xff0c;跑了三个ai未果&#xff0c;主要是引入world.json失败&#xff0c;echart包中并不携带该文件&#xff0c;源码的world.json文件页面404找不到。需要自己寻找。这是整个问题卡壳的关键点&#xff0c;特此贴出资源网址。 目录 一、安装 二…

QT与网页显示数据公式的方法

一.网页中显示数学公式通常有三种主要方法 1.图片方式 原理&#xff1a;将公式转换为图片&#xff08;如 PNG、SVG&#xff09;&#xff0c;通过 <img> 标签嵌入网页。 实现步骤&#xff1a; 使用工具&#xff08;如 LaTeX dvipng、在线生成工具&#xff09;将公式渲…