Flink电商实时数仓(六)

news2025/2/8 21:21:24

交易域支付成功事务事实表

  1. 从topic_db业务数据中筛选支付成功的数据
  2. 从dwd_trade_order_detail主题中读取订单事实数据、LookUp字典表
  3. 关联三张表形成支付成功宽表
  4. 写入 Kafka 支付成功主题
执行步骤
  1. 设置ttl,通过Interval join实现左右流的状态管理
  2. 获取下单明细数据:用户必然要先下单才有可能支付成功,因此支付成功明细数据集必然是订单明细数据集的子集。要注意:Interval Join要求表中均为Append数据,即“只能新增,不能修改”,订单明细表数据生成过程中用到了left join,生成了回撤流,看似不满足Interval Join的条件。但是,回撤数据进入Kafka会以null值形式存在,如果用Kafka Connector将订单明细封装为动态表,null值会被过滤,最终得到的是相同主键存在重复数据的Append流(动态表本质上就是流),满足Interval Join的条件。
    • Interval join只支持事件时间,因此数据必须携带水位线;建表时水位线的相关语法为 water for order_time as order_time - interval '5' second,这里要求数据是timestamp(3)
    • 原有的时间数据类型是bigint类型的ts,使用row_time as TO_TIMESTAMP_LTZ(ts,3)这个函数即可将原有的时间数据转换为水位线所需的数据类型
  3. 筛选支付数据:
    • 支付状态为支付成功
    • 操作类型为update
  4. 构建 LookUp 字典表
  5. 联上述三张表形成支付成功宽表,写入 Kafka 支付成功主题

核心代码如下

 public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {
        //核心业务逻辑
        //1. 读取TopicDB主题数据
        createTopicDb(groupId,tableEnv);

        //2. 筛选支付成功的数据,从业务数据topic_db中
        filterPaymentTable(tableEnv);

        //3. 读取下单详情表数据, 从kafka读取数据
        createOrderDetailTable(tableEnv, groupId);

        //4. 创建base.dic字典表,从HBase维度数据中读取
        createBaseDic(tableEnv);

        //tableEnv.executeSql("select * from order_detail").print();
        //tableEnv.executeSql("select * from base_dic").print();

        //tableEnv.executeSql("select to_timestamp_ltz(ts,3) from order_detail");

        //5. 使用interval join 完成支付成功流和订单详情数据关联
        intervalJoin(tableEnv);

        //6. 使用lookup join完成维度退化
        Table resultTable = lookupJoin(tableEnv);

        //7. 创建upsert kafka连接器写出
        createKafkaSink(tableEnv);

        resultTable.insertInto(Constant.TOPIC_DWD_TRADE_ORDER_PAYMENT_SUCCESS).execute();

    }

事实表动态分流

在这里插入图片描述

dwd层其他的事实表都是从topic_db中去业务数据库一张表的变更数据,按照某些过滤后写入kafka的对应主题,它们处理逻辑相似且较为简单,可以结合配置表动态分流在同一个程序中处理。有点类似我们前面实现DIM层的动态配置。

  1. 清洗过滤和转换:判断是否满足json格式,如果满足转换为jsonObj对象
  2. 读取配置表数据,使用flink-cdc读取
  3. 转换数据格式,转换到对应bean对象中
  4. 配置信息广播话,然后跟主流数据进行连接
  5. 筛选出需要的字段
  6. 根据表中的sink table字段来动态写出到对应的kafka主题中

核心代码如下

public static void main(String[] args) {
        new DwdBaseDb().start(10019, 4, "dwd_base_db", Constant.TOPIC_DB);
    }


    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {
        //核心业务逻辑

        //1. 读取topic_db数据
        //stream.print();

        //2. 清洗过滤和转换, jsonObjStream是主流数据
        SingleOutputStreamOperator<JSONObject> jsonObjStream = filterJson(stream);

        //jsonObjStream.print();

        //3. 读取配置表数据,使用flink-cdc读取,读取配置文件时并发度最好为1
        DataStreamSource<String> tableProcessDwd = getTableProcessDwd(env);

        //tableProcessDwd.print();

        4. 转换数据格式 string -> TableProcessDwd -> broadcastStream,广播流数据
        SingleOutputStreamOperator<TableProcessDwd> processDwdStream = getProcessDwdStream(tableProcessDwd);

        MapStateDescriptor<String, TableProcessDwd> mapStateDescriptor = new MapStateDescriptor<>("process_state", String.class, TableProcessDwd.class);
        BroadcastStream<TableProcessDwd> broadcastStream = processDwdStream.broadcast(mapStateDescriptor);

        //5. 连接主流和广播流,对主流数据进行判断是否需要保留
        SingleOutputStreamOperator<Tuple2<JSONObject, TableProcessDwd>> processStream = processBaseDb(jsonObjStream, broadcastStream, mapStateDescriptor);

        //processStream.print();

        //6. 筛选最后需要写出的字段
        SingleOutputStreamOperator<JSONObject> dataStream = filterColumns(processStream);

        //7. 通过sink_table的表名来动态写出到对应kafka主题
        //在setRecordSerializer()设置
        dataStream.sinkTo(FlinkSinkUtil.getKafkaSinkWithTopicName());

    }

gitee地址 :https://gitee.com/langpaian/gmall2023-realtime

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1336007.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OGG-MySQL无法正常同步数据问题分析

问题背景: 用户通过OGG从源端一个MySQL从库将数据同步到目标端的另一个MySQL数据库里面&#xff0c;后面由于源端的从库出现了长时间的同步延时&#xff0c;由于延时差距过大最后选择通过重建从库方式进行了修复 从库重建之后&#xff0c;源端的OGG出现了报错ERROR OGG-0014…

电商数据分析-02-电商业务介绍及表结构

参考 电商业务简介 大数据项目之电商数仓、电商业务简介、电商业务流程、电商常识、业务数据介绍、电商业务表、后台管理系统 举个例子:&#x1f330; 1.1 电商业务流程 电商的业务流程可以以一个普通用户的浏览足迹为例进行说明&#xff0c;用户点开电商首页开始浏览&…

蓝桥杯备赛 day 1 —— 递归 、递归、枚举算法(C/C++,零基础,配图)

目录 &#x1f308;前言 &#x1f4c1; 枚举的概念 &#x1f4c1;递归的概念 例题&#xff1a; 1. 递归实现指数型枚举 2. 递归实现排列型枚举 3. 递归实现组合型枚举 &#x1f4c1; 递推的概念 例题&#xff1a; 斐波那契数列 &#x1f4c1;习题 1. 带分数 2. 反硬币 3. 费解的…

小程序面试题 | 18.精选小程序面试题

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

大师计划1.0 - log2 CRTO笔记

CRTOⅠ笔记 log2 这个笔记是我在2023年11月23日-12月22日中&#xff0c;学习CRTO所做的一些笔记。 事实上TryHackMe的路径和htb学院包含了许多CRTO的知识并且甚至还超出了CRTO&#xff08;CS除外&#xff09;&#xff0c;所以很多东西在THM和htb学院学过&#xff0c;这次CRTO等…

RK3588平台开发系列讲解(AI 篇)RKNN rknn_query函数详细说明

文章目录 一、查询 SDK 版本二、查询输入输出 tensor 个数三、查询输入 tensor 属性(用于通用 API 接口)四、查询输出 tensor 属性(用于通用 API 接口)五、查询模型推理的逐层耗时六、查询模型推理的总耗时七、查询模型的内存占用情况八、查询模型里用户自定义字符串九、查询原…

往年面试精选题目(前50道)

常用的集合和区别&#xff0c;list和set区别 Map&#xff1a;key-value键值对&#xff0c;常见的有&#xff1a;HashMap、Hashtable、ConcurrentHashMap以及TreeMap等。Map不能包含重复的key&#xff0c;但是可以包含相同的value。 Set&#xff1a;不包含重复元素的集合&#…

第四周:机器学习知识点回顾

前言&#xff1a; 讲真&#xff0c;复习这块我是比较头大的&#xff0c;之前的线代、高数、概率论、西瓜书、樱花书、NG的系列课程、李宏毅李沐等等等等…那可是花了三年学习佳实践下来的&#xff0c;现在一想脑子里就剩下几个名词就觉得废柴一个了&#xff0c;朋友们有没有同感…

Linux操作系统基础知识点

Linux是一种计算机操作系统&#xff0c;其内核由林纳斯本纳第克特托瓦兹&#xff08;Linus Benedict Torvalds&#xff09;于1991年首次发布。Linux操作系统通常与GNU套件一起使用&#xff0c;因此也被称为GNU/Linux。它是一种类UNIX的操作系统&#xff0c;设计为多用户、多任务…

滤波器(Filter)

滤波器 常用滤波器元器件 馈通电容滤波器NFM18PC104R1C3 \SDCW2012-2-900TF \ 0603 0.1UF(104) 16V 文章目录 滤波器前言一、滤波器是什么二、两路 0805共模滤波器 阻抗90Ω@100MHz三、0603 0.1UF(104) 16V四、馈通电容滤波器NFM18PC104R1C3总结前言 滤波器在电子系统中具有…

车载网络 - BootLoader - UDS刷写闲聊

聊升级的话,我们不得不聊一下MCU升级的一些基础概念;我们今天就简单说下,如果大家有兴趣,可以评论区留言,我后续继续补充内容或者私聊都可以的。 目录 一、MCU内存说明 二、常见的2类BOOT段 三、常见的APP段

机器人制作开源方案 | 森林管理员

​作者&#xff1a;李佳骏、常睿康、张智斌、李世斌、高华耸 单位&#xff1a;山西能源学院 指导老师&#xff1a;赵浩成、郜敏 1. 研究背景 森林作为地球上可再生自然资源及陆地生态的主体&#xff0c;在人类生存和发展的历史中起着不可代替的作用&#xff0c;它不仅能提供…

比宜德停业,奥乐齐死磕,硬折扣该怎样长硬不衰?

作者 | 楚文龙 来源 | 洞见新研社 刚刚过去的周末&#xff0c;让零售行业的从业者神经紧绷。因为&#xff0c;12月23日多个信源曝出&#xff0c;社区硬折扣超市比宜德已公告于12月22日起暂停营业。 作为中国第一家&#xff0c;也是唯一一家规模最大的硬折扣社区连锁店零售商&…

FLStudio21中文版水果编曲软件好用吗?如何下载最新版本

FL Studio21版是一款在国内非常受欢迎的多功能音频处理软件&#xff0c;我们可以通过这款软件来对多种不同格式的音频文件来进行编辑处理。而且FL Studio 21版还为用户们准备了超多的音乐乐器伴奏&#xff0c;我们可以直接一键调取自己需要的音调。 FL Studio21版不仅拥有非常…

leetcode——打家劫舍问题汇总

本章汇总一下leetcode中的打家劫舍问题&#xff0c;使用经典动态规划算法求解。 1、梦开始的地方——打家劫舍&#xff08;★&#xff09; 本题关键点就是不能在相邻房屋偷东西。 采用常规动态规划做法&#xff1a; 根据题意设定dp数组&#xff0c;dp[i]的含义为&#xff1a…

【WPF.NET开发】创建样式

本文内容 创建样式隐式应用样式显式应用样式以编程方式应用样式扩展样式TargetType 属性与 x:Key 属性之间的关系 使用 Windows Presentation Foundation (WPF)&#xff0c;可以使用自己的可重用样式自定义现有控件的外观。 可以对应用、窗口和页面全局应用样式&#xff0c;也…

【自定义磨砂动态背景】前端及pyqt6实现

如何实现一个自定义的磨砂动态背景呢&#xff1f; 这种效果看起来特别的高端&#xff0c;很新颖美观。 具体的效果可以看这里的演示&#xff1a;https://www.bilibili.com/video/BV1zj411H7wd/ 其实原理就是底层有多个多彩多边形在移动&#xff0c;然后再盖上一层模糊滤镜。 前…

【DevOps 工具链】搭建 项目管理软件 禅道

文章目录 1、简介2、环境要求3、搭建部署环境3.1. 安装Apache服务3.2. 安装PHP环境&#xff08;以php7.0为例 &#xff09;3.3. 安装MySQL服务 4、搭建禅道4.1、下载解压4.2、 配置4.2.1、 启动4.2.2、自启动4.2.3、确认是否开机启动 5、成功安装 1、简介 禅道是国产开源项目管…

React Router有几种模式?实现原理?

面试官&#xff1a;说说React Router有几种模式&#xff1f;实现原理&#xff1f; 一、是什么 在单页应用中&#xff0c;一个web项目只有一个html页面&#xff0c;一旦页面加载完成之后&#xff0c;就不用因为用户的操作而进行页面的重新加载或者跳转&#xff0c;其特性如下&a…

C++设计模式 #7 工厂方法(Factory Method)

“对象创建”模式 通过“对象创建”模式绕开new&#xff0c;来避免对象创建&#xff08;new&#xff09;过程中所导致的紧耦合&#xff08;依赖具体类&#xff09;&#xff0c;从而支持创建的稳定。它是接口抽象之后的第一步工作。 动机 在软件系统中&#xff0c;经常面临着创…