数仓开发之DWS层(二)

news2025/1/21 0:50:43

目录

五:用户域用户注册各窗口汇总表

5.1 主要任务

5.2 思路分析

5.3 图解

5.4 ClickHouse建表语句

六:交易域加购各窗口汇总表

6.1 主要任务

6.2 思路分析

6.3 图解

6.4 ClickHouse建表语句

七:交易域支付各窗口汇总表

7.1 主要任务

7.2 思路分析

7.3 图解

7.4 ClickHouse建表语句

八:交易域下单各窗口汇总表

8.1 主要任务

8.2 思路分析

8.3 图解

8.4 ClickHouse建表语句


五:用户域用户注册各窗口汇总表

5.1 主要任务

从 DWD 层用户注册表中读取数据,统计各窗口注册用户数,写入 ClickHouse。

5.2 思路分析

1)读取 Kafka 用户注册主题数据

2)转换数据结构

String 转换为 JSONObject。

3)设置水位线

4)开窗、聚合

5)写入 ClickHouse

5.3 图解

 

5.4 ClickHouse建表语句

drop table if exists dws_user_user_register_window;
create table if not exists dws_user_user_register_window
(
    stt         DateTime,
    edt         DateTime,
    register_ct UInt64,
    ts          UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);

六:交易域加购各窗口汇总表

6.1 主要任务

从 Kafka 读取用户加购明细数据,统计每日各窗口加购独立用户数,写入 ClickHouse。

6.2 思路分析

1)从 Kafka 加购明细主题读取数据

2)转换数据结构

将流中数据由 String 转换为 JSONObject。

3)设置水位线

4)按照用户 id 分组

5)过滤独立用户加购记录

运用 Flink 状态编程,将用户末次加购日期维护到状态中。

如果末次登陆日期为 null 或者不等于当天日期,则保留数据并更新状态,否则丢弃,不做操作。

6)开窗、聚合

统计窗口中数据条数即为加购独立用户数,补充窗口起始时间、关闭时间,将时间戳字段置为当前系统时间,发送到下游。

7)将数据写入 ClickHouse。

6.3 图解

 

6.4 ClickHouse建表语句

drop table if exists dws_trade_cart_add_uu_window;
create table if not exists dws_trade_cart_add_uu_window
(
    stt            DateTime,
    edt            DateTime,
    cart_add_uu_ct UInt64,
    ts             UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);

七:交易域支付各窗口汇总表

7.1 主要任务

从 Kafka 读取交易域支付成功主题数据,统计支付成功独立用户数和首次支付成功用户数。

7.2 思路分析

我们在 DWD 层提到,订单明细表数据生成过程中会形成回撤流。left join 生成的数据集中,相同唯一键的数据可能会有多条。上文已有讲解,不再赘述。回撤数据在 Kafka 中以 null 值的形式存在,只需要简单判断即可过滤。我们需要考虑的是如何对其余数据去重。

对回撤流数据生成过程进行分析,可以发现,字段内容完整数据的生成一定晚于不完整数据的生成,要确保统计结果的正确性,我们应保留字段内容最全的数据,基于以上论述,内容最全的数据生成时间最晚。要想通过时间筛选这部分数据,首先要获取数据生成时间。

1)知识储备

FlinkSQL 提供了几个可以获取当前时间戳的函数

    • localtimestamp:返回本地时区的当前时间戳,返回类型为 TIMESTAMP(3)。在流处理模式下会对每条记录计算一次时间。而在批处理模式下,仅在查询开始时计算一次时间,所有数据使用相同的时间。
    • current_timestamp:返回本地时区的当前时间戳,返回类型为 TIMESTAMP_LTZ(3)。在流处理模式下会对每条记录计算一次时间。而在批处理模式下,仅在查询开始时计算一次时间,所有数据使用相同的时间。
    • now():与 current_timestamp 相同。
    • current_row_timestamp():返回本地时区的当前时间戳,返回类型为 TIMESTAMP_LTZ(3)。无论在流处理模式还是批处理模式下,都会对每行数据计算一次时间。

函数测试。查询语句如下。

tableEnv.sqlQuery("select localtimestamp," +
                "current_timestamp," +
                "now()," +
                "current_row_timestamp()")
                .execute()
                .print();

查询结果如下。

+----+-------------------------+-------------------------+-------------------------+-------------------------+
| op |          localtimestamp |       current_timestamp |                  EXPR$2 |                  EXPR$3 |
+----+-------------------------+-------------------------+-------------------------+-------------------------+
| +I | 2022-04-13 20:42:28.529 | 2022-04-13 20:42:28.529 | 2022-04-13 20:42:28.529 | 2022-04-13 20:42:28.529Z |
+----+-------------------------+-------------------------+-------------------------+-------------------------+

动态表属于流处理模式,所以四种函数任选其一即可。此处选择 current_row_timestamp()。

2)时间比较工具类

动态表中获取的数据生成时间精确到毫秒,前文提供的日期格式化工具类无法实现此类日期字符串向时间戳的转化,也就不能通过直接转化为时间戳的方式比较两条数据的生成时间。因此,单独封装工具类用于比较 TIME_STAMP(3) 类型的时间。比较逻辑是将时间拆分成两部分:小数点之前和小数点之后的。小数点之前的日期格式为 yyyy-MM-dd HH:mm:ss,这部分可以直接转化为时间戳比较,如果这部分时间相同,再比较小数点后面的部分,将小数点后面的部分转换为整型比较,从而实现 TIME_STAMP(3) 类型时间的比较。

3)去重思路

获取了数据生成时间,接下来要考虑的问题就是如何获取生成时间最晚的数据。此处提供两种思路。

(1)按照唯一键分组,开窗,在窗口闭合前比较窗口中所有数据的时间,将生成时间最晚的数据发送到下游,其它数据舍弃。

(2)按照唯一键分组,对于每一个唯一键,维护状态和定时器,当状态中数据为 null 时注册定时器,把数据维护到状态中。此后每来一条数据都比较它与状态中数据的生成时间,状态中只保留生成最晚的数据。如果两条数据生成时间相同(系统时间精度不足),则保留后进入算子的数据。因为我们的 Flink 程序并行度和 Kafka 分区数相同,可以保证数据有序,后来的数据就是最新的数据。

两种方案都可行,此处选择方案二。

本节的数据来源于 Kafka dwd_trade_pay_detail_suc 主题,后者的数据由 payment_info、dwd_trade_order_detail、base_dic 三张表通过内连接关联获得,这一过程不会产生重复数据,因此,该表的重复数据由订单明细表决定。而 dwd_trade_order_detail 表的数据来源于 dwd_trade_order_pre_process,后者数据生成过程中使用了 left join,因此包含 null 数据和重复数据。订单明细表读取数据使用的 Kafka Connector 会过滤掉 null 数据,程序内只做了过滤没有去重,因此该表不存在 null 数据,但对于相同唯一键 order_detail_id 存在重复数据。综上,支付成功明细表存在唯一键 order_detail_id 相同的数据,但不存在 null 数据,因此仅须去重。

4)实现步骤

(1)从 Kafka 支付成功明细主题读取数据

(2)转换数据结构

String 转换为 JSONObject。

(3)按照唯一键分组

(4)去重

与前文同理。

(5)设置水位线,按照 user_id 分组

(6)统计独立支付人数和新增支付人数

运用 Flink 状态编程,在状态中维护用户末次支付日期。

若末次支付日期为 null,则将首次支付用户数和支付独立用户数均置为 1;否则首次支付用户数置为 0,判断末次支付日期是否为当日,如果不是当日则支付独立用户数置为 1,否则置为 0。最后将状态中的支付日期更新为当日。

(7)开窗、聚合

度量字段求和,补充窗口起始时间和结束时间字段,ts 字段置为当前系统时间戳。

(8)写出到 ClickHouse

7.3 图解

 

7.4 ClickHouse建表语句

drop table if exists dws_trade_payment_suc_window;
create table if not exists dws_trade_payment_suc_window
(
    stt                           DateTime,
    edt                           DateTime,
    payment_suc_unique_user_count UInt64,
    payment_new_user_count        UInt64,
    ts                            UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);

八:交易域下单各窗口汇总表

8.1 主要任务

从 Kafka 订单明细主题读取数据,对数据去重,统计当日下单独立用户数新增下单用户数,封装为实体类,写入 ClickHouse。

8.2 思路分析

1)从 Kafka订单明细主题读取数据

2)转换数据结构

Kafka 订单明细主题的数据是通过 Kafka-Connector 从订单预处理主题读取后进行过滤获取的,Kafka-Connector 会过滤掉主题中的 null 数据,因此订单明细主题不存在为 null 的数据,直接转换数据结构即可。

3)按照 order_detail_id 分组

order_detail_id 为数据唯一键。

4)对 order_detail_id 相同的数据去重

按照上文提到的方案对数据去重。

5)设置水位线

6)按照用户 id 分组

7)计算度量字段的值

(1)当日下单独立用户数和新增下单用户数

运用 Flink 状态编程,在状态中维护用户末次下单日期。

若末次下单日期为 null,则将首次下单用户数和下单独立用户数均置为 1;否则首次下单用户数置为 0,判断末次下单日期是否为当日,如果不是当日则下单独立用户数置为 1,否则置为 0。最后将状态中的下单日期更新为当日。

(2)其余度量字段直接取流中数据的对应值即可。

8)开窗、聚合

度量字段求和,补充窗口起始时间和结束时间字段,ts 字段置为当前系统时间戳。

9)写出到 ClickHouse。

8.3 图解

 

8.4 ClickHouse建表语句

drop table if exists dws_trade_order_window;
create table if not exists dws_trade_order_window
(
    stt                          DateTime,
    edt                          DateTime,
    order_unique_user_count      UInt64,
    order_new_user_count         UInt64,
    order_activity_reduce_amount Decimal(38, 20),
    order_coupon_reduce_amount   Decimal(38, 20),
    order_origin_total_amount    Decimal(38, 20),
    ts                           UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/43872.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

AppAnalytics设备的分析服务

AppAnalytics设备的分析服务 EmbarcaderoAppAnalytics是一项针对移动、桌面和可穿戴设备的分析服务。它允许您跟踪和测量应用程序的使用频率、运行的平台、客户使用的功能、查找和记录崩溃等。它通过从最终用户获取匿名使用统计数据,帮助您了解用户行为。AppAnalyti…

应届生如何做好一份简历?

找工作是痛苦的,尤其是投简历的过程。 下面分享下自己最近投简历的一些感悟。 一定要避免的错误 在投简历的过程中一定要避免如下几个主要错误: 一份简历打天下就当前的经济形势,我相信大多数人找工作都是海投,但是在海投的过程中…

靠这一份面试文档,我花了2个通宵看完,最终拿到阿里offer

Java基础部分 请你描述JDK、JRE、JVM的关系!如果main方法被声明成private会怎样?&和&&的区别!char型变量中能否存储一个中文汉字,why?a、a的区别!ab、aab的区别!浮点型计算为什么会…

Web(六)CSS3语法-CSS样式规则

第1关&#xff1a;CSS基础知识 第2关&#xff1a;初识CSS 编程要求 请在右侧编辑器中的Begin - End区域内补充代码&#xff0c;具体要求是&#xff1a; 1.按照要求的效果在标签中运用CSS样式&#xff0c;编辑唐诗“静夜思” 2.标题文本“静夜思”采用<h1>标签作用&am…

【前端】HTML认知

一、基础认知 1.1基础概念铺垫&#xff08;了解&#xff09; 1.1.1认识网页&#xff08;了解&#xff09; 问题1&#xff1a;网页由哪些部分组成&#xff1f; 文字、图片、音频、视频、超链接 问题2&#xff1a;我们看到的网页背后本质是什么&#xff1f; 前端程序员写的…

(02)Cartographer源码无死角解析-(29) LocalTrajectoryBuilder2D::AddRangeData()→多雷达数据时间同步

讲解关于slam一系列文章汇总链接:史上最全slam从零开始&#xff0c;针对于本栏目讲解(02)Cartographer源码无死角解析-链接如下: (02)Cartographer源码无死角解析- (00)目录_最新无死角讲解&#xff1a;https://blog.csdn.net/weixin_43013761/article/details/127350885 文末…

uni-app 超详细教程(从菜鸟到大佬)

一&#xff0c;uni-app 介绍 &#xff1a; 官方网页 uni-app 是一个使用 Vue.js 开发所有前端应用的框架&#xff0c;开发者编写一套代码&#xff0c;可发布到iOS、Android、Web&#xff08;响应式&#xff09;、以及各种小程序&#xff08;微信/支付宝/百度/头条/飞书/QQ/快手…

基于51单片机温度火灾烟雾报警器程序仿真资料

资料编号&#xff1a;190 下面是该资料仿真演示视频&#xff1a; 190-基于51单片机温度火灾烟雾报警器(仿真源程序全套资料)功能介绍&#xff1a; 采用51单片机作为主控CPU&#xff0c;采用ds18b20来采集温度&#xff0c;采用MQ2来采集烟雾浓度&#xff0c;使用ADC0832来进行…

(十一)Java算法:计数排序(详细图解)

目录一、前言1.1、概念1.2、算法步骤二、maven依赖三、流程解析3.1、计数流程图3.2、计数数组变形3.3、排序过程四、编码实现一、前言 1.1、概念 计数排序&#xff1a;核心在于将输入的数据值转化为键存储在额外开辟的数组空间中。作为一种线性时间复杂度的排序&#xff0c;计…

[附源码]Python计算机毕业设计Django财务管理系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

DEJA_VU3D - Cesium功能集 之 087-完美状态栏组件

前言 编写这个专栏主要目的是对工作之中基于Cesium实现过的功能进行整合,有自己琢磨实现的,也有参考其他大神后整理实现的,初步算了算现在有差不多实现小130个左右的功能,后续也会不断的追加,所以暂时打算一周2-3更的样子来更新本专栏(尽可能把代码简洁一些)。博文内容…

MacBookPro M2芯片下如何搭建React-Native环境

MacBookPro M2芯片下如何搭建React-Native环境目录软件下载环境配置目录 写在最前&#xff1a;整体流程直接看的rn中文网的搭建开发环境&#xff1a;https://www.react-native.cn/docs/environment-setup 软件下载 1、xcode 2、android studio / vscode 环境配置 1、jdk1.8…

快速上手 TypeScript

快速上手 TypeScript ypeScript 简称 TS &#xff0c;既是一门新语言&#xff0c;也是 JS 的一个超集&#xff0c;它是在 JavaScript 的基础上增加了一套类型系统&#xff0c;它支持所有的 JS 语句&#xff0c;为工程化开发而生&#xff0c;最终在编译的时候去掉类型和特有的语…

Metabase学习教程:仪表盘-2

在仪表板中链接筛选器 链接仪表板中的筛选器&#xff0c;根据另一个筛选器的当前选择限制一个筛选器中可用的选项&#xff08;内容联动&#xff09;。 我们先用一个问题设置一个简单的仪表板。这里的目标是设置一个带有两个链接过滤器的仪表板。每个筛选器根据另一个筛选器的…

PIC单片机4——定时器方波

#include <p18cxxx.h>/*18F系列单片机头文件*/ void PIC18F_High_isr(void);/*中断服务函数声明*/ void PIC18F_Low_isr(void); #pragma code high_vector_section0x8 void high_vector (void) { _asm goto PIC18F_High_isr _endasm/*通过一条跳转指令(汇编指令&am…

设备树覆盖:概念与术语

前面我们讲过设备树相关的东西&#xff0c;其实你应该知道。 但是昨天一个FDT当时我还是有点懵。于是再在android的角度我们来看看这个东西。 1、概览 设备树 (DT) 是用于描述“不可发现”硬件的命名节点和属性构成的一种数据结构。 操作系统&#xff08;例如在 Android 中使…

MyBatis是如何为Dao接口创建实现类的

本文是我的MyBatis源码分析专栏中第三节的一小部分&#xff0c;作为试读部分&#xff0c;详细讲述了MyBatis是如何通过动态代理创建Dao接口的实现类的。 专栏地址&#xff1a;MyBatis源码分析 专栏字数:14w 专栏目录&#xff1a; 文章目录SqlSession.getMapper如何设计的&#…

MySQL----存储过程

目录 一、存储过程的介绍 二、存储过程的基本语法 三、变量 &#xff08;1&#xff09;系统变量 &#xff08;2&#xff09;用户自定义变量 &#xff08;3&#xff09;局部变量 四、存储过程的语法详解 &#xff08;1&#xff09;if判断 &#xff08;3&#xff09;条件…

数据要想管理得好,不得不提开源大数据处理解决方案

在很多企业里&#xff0c;内部数据的管理几乎是一团糟的。在大数据时代的环境中&#xff0c;不少企业急需要提升数据管理的效率&#xff0c;因此想通过一些有利途径来实现这一目的。开源大数据处理解决方案就是其中一个有效途径&#xff0c;是助力企业做好数据管理&#xff0c;…

07 ConfigMap/Secret:怎样配置、定制我的应用

文章目录1. ConfigMap/Secret 介绍1.1 为什么kubernets 要使用应用的配置管理&#xff1f;1.2 有什么类别的配置信息&#xff1f;2. 什么是 ConfigMap&#xff1f;2.1 创建ConfigMap模板文件2.1.1 ConfigMap 怎么生成带data 字段的 模板2.2 创建ConfigMap 对象2.3 查看ConfigMa…