Flink电商实时数仓(三)

news2024/11/24 15:09:16

DIM层代码流程图

维度层的重点和难点在于实时电商数仓需要的维度信息一般是动态的变化的,并且由于实时数仓一般需要一直运行,无法使用常规的配置文件重启加载方式来修改需要读取的ODS层数据,因此需要通过Flink-cdc实时监控MySql中的维度数据配置信息表,实时动态的发布广播信息。主流数据根据广播数据及时调整处理逻辑,并自动在HBase中创建相应的维度表和写入相应的维度数据。

  1. 消费Kafka ods业务主题数据
  2. 数据清洗:是否为JSON格式
  3. 使用flink-cdc读取监控配置表数据
  4. 在HBase中创建维度表
  5. 做成广播流
  6. 连接主流和广播流
  7. 筛选出需要写出的字段
  8. 写出到Hbase

在这里插入图片描述

整体架构

  • realtime-common模块
    • base: 所有Flink程序的基类,负责搭建Flink运行环境和设置并行度和检查点等相关参数。其中我们的数据来源也确定为Kafka,故数据源代码也写在这里。每个Flink程序的具体处理逻辑由handle()函数来负责处理。
    • bean:负责存放项目运行过程中需要用到的bean对象,比如当前flink-cdc程序中需要用到的TableProcessDim类,配置信息表对象。
    • constant:负责存放程序中需要使用到常量参数
    • function:负责存放一些通用的函数方法
    • util:一般存放和数据连接相关的工具类
    • test目录: 用来在写正式代码前测试连接是否通畅,数据是否可以正常发送。
  • realtime-dim模块
    • app:DimApp里面写的是dim层的具体实现,具体步骤如上述流程图所示。
    • function:负责存放数据处理的实现类,一般会继承相应的父类,在dim层可以直接调用这里的子类来实现父类接口,让dim层的代码逻辑更加清晰。
  • realtime-dwd模块:如上
  • realtime-dws模块:如上

在这里插入图片描述

数据清洗ETL

数据清洗,简单来说就是对数据进行简单的转换筛选。首先如果在转换过程中出现异常,直接过滤掉。注意这里无需抛出异常,因为如果throw a exception会导致整个程序异常终止,而在数据处理过程中出现部分数据格式错误而无法正常进行格式转换是很常见的,只需将异常信息打印到控制台即可。如果转换正常,再判断是否满足以下三个条件:

  1. 数据库名为gmall
  2. 数据类型不是bootstrap-start或者bootstrap-complete
  3. data字段不是null且长度不为0

Flink-cdc读取配置表的数据

Flink中获取数据主要有两个步骤:

  1. 获取相应的数据源Source
    • 注意:在构建Flink-cdc对应的MySQLSource时,tableList参数必须是库表.表名结构
  2. 调用env.fromSource()方法将数据源的发送过来的数据转换Ds数据流,在该方法中可以设置数据的水位线。
  3. 获取到数据后,建议先打印到控制台查看数据的具体结构。
  4. 注意读取配置信息表的并发度必须设置为1;如果不为1,只能读取r操作数据,其他更新数据无法读取。
public static MySqlSource<String> getMySqlSource(String databaseName, String tableName){
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(Constant.MYSQL_HOST)
                .port(Constant.MYSQL_PORT)
                .username(Constant.MYSQL_USER_NAME)
                .password(Constant.MYSQL_PASSWORD)
                .databaseList(databaseName) // set captured database
                .tableList(databaseName+"."+tableName) // set captured table

                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .startupOptions(StartupOptions.initial())
                .build();

        return mySqlSource;
    }

在HBase中创建维度表

数据库中的配置表数据经过Flink-cdc处理后发送到这里是json格式的字符串,这里根据数据的四种类型op在HBase中进行不同的建表删表操作,同时对数json字符数据进行转换映射处理,转换为对应的bean对象数据流。这里一个数据产生一个处理后的对象,故使用Map算子或FlatMap算子都可以。

  • op类型
    • d 代表delete,需要删除before字段中对应的表
    • c 代表create,r 代表 read,需要创建after字段中对应的表
    • u 代表update,需要先删除掉旧表,然后根据新表的字段创建一个新表
  • 创建HBase连接,创建连接是很耗费资源的行为,因此新建连接和关闭连接需要写在open和close方法中
  • HBase中想要对表进行创建和删除等DDL操作,都由Admin对象管理;如果需要对数据进行插入删除等DML操作,需要创建Table对象。详细操作细节请看相应代码即可。
public static SingleOutputStreamOperator<TableProcessDim> createHbaseTable(DataStreamSource<String> mysqlSource) {
        SingleOutputStreamOperator<TableProcessDim> createHBaseTable = mysqlSource.flatMap(
                new RichFlatMapFunction<String, TableProcessDim>() {


                    public Connection connection ;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //获取连接
                        connection = HBaseUtil.getHBaseConnection();
                    }

                    @Override
                    public void close() throws Exception {
                        //关闭连接
                        HBaseUtil.closeHBaseConn(connection);
                    }

                    @Override
                    public void flatMap(String s, Collector<TableProcessDim> out){
                        //使用读取的配置表数据,到HBase中创建与之对应的表格

                        try {
                            JSONObject jsonObject = JSONObject.parseObject(s);
                            String op = jsonObject.getString("op");
                            TableProcessDim dim;//维度表
                            if ("d".equals(op)) {
                                dim = jsonObject.getObject("before", TableProcessDim.class);
                                dim.setOp(op);
                                //当配置表发送一个D类型的数据,对应的HBase需要删除一张维度表
                                deleteTable(dim);
                            } else if ("c".equals(op) || "r".equals(op)) {
                                dim = jsonObject.getObject("after", TableProcessDim.class);
                                createTable(dim);
                                dim.setOp(op);
                            } else {//op = 'u', 即修改
                                dim = jsonObject.getObject("after", TableProcessDim.class);
                                deleteTable(dim);
                                createTable(dim);
                            }
                            dim.setOp(op);
                            out.collect(dim);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                    private void createTable(TableProcessDim dim) {
                        String sinkFamily = dim.getSinkFamily();
                        String[] split = sinkFamily.split(",");
                        try {
                            HBaseUtil.createHBaseTable(connection,Constant.HBASE_NAMESPACE,dim.getSinkTable(),split);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }

                    }

                    private void deleteTable(TableProcessDim dim) {
                        try {
                            HBaseUtil.dropHBaseTable(connection, Constant.HBASE_NAMESPACE, dim.getSinkTable());
                        } catch (IOException e) {
                            e.printStackTrace();
                        }

                    }
                }
        );
        return createHBaseTable;
    }

主流连接广播流

从Flink-cdc获取的数据(gmall2023_config)是作为一个参数来控制我们对于主流即ODS层数据(gmall数据库的业务数据)的处理逻辑。gmall2023)_config库中的Table_process_dim表决定了后续程序筛选哪个表作为维度信息,并且定义了表中有哪些字段。

  1. 转换为广播流只需要调用上述得到的TableProcessDimStream的broadcast方法
  2. 使用的主流(gmall业务数据)的connect方法,得到一个连接流,然后对连接流进行process处理。
  3. 创建BroadcastProcessFunction,在里面分别有两个函数
    • processBroadcastElement():处理广播流数据
    • processElement():处理主流数据
  4. 广播流处理逻辑:
    • 读取广播状态
    • 将配置表信息写到广播状态中
    • 根据广播状态数据的op对状态做相应的修改
  5. 主流处理逻辑:
    • 查询广播状态,判断当前数据对应的表是否存在于状态中
    • 如果数据比状态来的更早,造成状态为空,需要对状态做预处理(提前从mysql中读取维表配置表信息)
    • 如果根据当前表的表名查询的状态不为空,说明该表为维度数据,使用收集器收集起来。

筛选出需要的字段

在这里插入图片描述
在维度配置信息表中的sink_column字段里定义了维度表需要的字段,使用filter算子对JsonObj里面的data字段进行过滤即可获取到想要的字段数据。

写出到Hbase

过滤后的数据流调用它的addSink方法,方法中需要传入一个SinkFunction接口类。该接口需要实现三个方法分别是:

  • open方法:获取HBase连接
  • close方法:关闭HBase连接
  • invoke方法:写入数据时调用的方法,根据jsonObj中的type做不同处理,如果是delete,需要删除对应的维度表数据;否则都是直接覆盖写入。

代码的Gitee仓库地址:https://gitee.com/langpaian/gmall2023-realtime.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1327681.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

GraphPad Prism 10 for Mac v10.0.0.3 安装教程

GraphPad Prism GraphPad Prism是一款非常专业强大的科研医学生物数据处理绘图软件&#xff0c;它可以将科学图形、综合曲线拟合&#xff08;非线性回归&#xff09;、可理解的统计数据、数据组织结合在一起&#xff0c;除了最基本的数据统计分析外&#xff0c;还能自动生成统…

ansible(二)

模块七&#xff1a; hostname模块&#xff0c;修改主机名 模块八&#xff1a; copy模块&#xff1a;用于复制指定主机的文件到远程主机的模块&#xff08;必须要用绝对路径&#xff09; 常用的参数&#xff1a; Dest:指出要复制的文件在哪&#xff08;去哪&#xff09;&am…

C语言—每日选择题—Day59

指针相关博客 打响指针的第一枪&#xff1a;指针家族-CSDN博客 深入理解&#xff1a;指针变量的解引用 与 加法运算-CSDN博客 第一题 1. 以下关于 typedef 正确的描述是&#xff08;&#xff09;【多选】 A&#xff1a;用typedef可以定义各种类型别名&#xff0c;但不能定义变量…

CSS:元素显示模式与背景

CSS&#xff1a;元素显示模式与背景 元素显示模式什么是元素显示模式块级元素 block行内元素 inline行内块元素 inline-block元素显示模式对比元素显示模式转换 display 背景背景颜色 background-color背景图片 background-image背景平铺 background-repeat背景图片位置 backgr…

使用Swift Package Manager (SPM)实现xcframework分发

Swift Package Manager (SPM) 是苹果官方提供的用于管理 Swift 项目的依赖关系和构建过程的工具。它是一个集成在 Swift 编程语言中的包管理器&#xff0c;用于解决在开发过程中管理和构建包依赖项的需求。 Package结构 一个 Package&#xff08;包&#xff09;由 Swift 源码…

CSS 网页制作-学成在线

1、 准备工作 1.1 项目目录 网站根目录是指存放网站的第一层文件夹&#xff0c;内部包含当前网站的所有素材&#xff0c;包含HTML、CSS、图片、JavaScript等等。 1.2 版心效果 可以发现都是呈现版心居中的效果&#xff0c;但是每次都写一次太麻烦了&#xff0c;可以把版心居中…

Android应用-flutter使用Positioned将控件定位到底部中间

文章目录 场景描述示例解释 场景描述 要将Positioned定位到屏幕底部中间的位置&#xff0c;你可以使用MediaQuery来获取屏幕的高度&#xff0c;然后设置Positioned的bottom属性和left或right属性&#xff0c;一般我们left和right都会设置一个值让控制置于合适的位置&#xff0…

鸿蒙应用开发初体验 HelloWorld

9 月 25 日&#xff0c;华为常务董事、终端 BG CEO、智能汽车解决方案 BU 董事长余承东华为秋季全场景新品发布会上介绍了鸿蒙系统的最新进展&#xff1a;HarmonyOS 4 发布后&#xff0c;短短一个多月升级用户已经超过 6000 万&#xff0c;成为史上升级速度最快的 HarmonyOS 版…

Python数据科学视频讲解:包裹法—递归特征消除

4.4 包裹法—递归特征消除 视频为《Python数据科学应用从入门到精通》张甜 杨维忠 清华大学出版社一书的随书赠送视频讲解4.4节内容。本书已正式出版上市&#xff0c;当当、京东、淘宝等平台热销中&#xff0c;搜索书名即可。内容涵盖数据科学应用的全流程&#xff0c;包括数据…

【已解决】vs2015下c++对sqlite的操作

本博文源于笔者操作sqlite3&#xff0c;借鉴了很多文章的思路&#xff0c;这里并整理了c常用的对数据库的操作供大家点赞收藏以后备用。包含了&#xff1a;c对sqlite3的创建数据库、创建数据表、写入数据表、读取数据表、删除数据表。也包括了最基础的让c运行sqlite3.内容供读者…

gem5 RubyPort: mem_request_port作用与连接 simple-MI_example.py

简介 回答这个问题&#xff1a;RubyPort的口下&#xff0c;一共定义了六个口&#xff0c;分别是mem_request_port&#xff0c;mem_response_port&#xff0c;pio_request_port&#xff0c;pio_response_port&#xff0c;in_ports, interrupt_out_ports&#xff0c;他们分别有什…

长三角安防行业盛会“2024杭州安博会”4月份在杭州博览中心召开

作为中国安防行业的盛会&#xff0c;2024杭州安博会将于4月份在杭州国际博览中心隆重召开。本届安博会将汇聚全球最先进的安防技术和产品&#xff0c;为来自世界各地的安防从业者、爱好者以及投资者提供一个交流、展示和合作的平台。 据了解&#xff0c;2024杭州安博会将会展示…

one wire(单总线)FPGA代码篇

一.引言 单总线&#xff08;OneWire&#xff09;是一种串行通信协议&#xff0c;它允许多个设备通过一个单一的数据线进行通信。这个协议通常用于低速、短距离的数字通信&#xff0c;特别适用于嵌入式系统和传感器网络。 二.one wire通信优点缺点 优点&#xff1a; 单一数据线…

[CVPR-23] Instant Volumetric Head Avatars

[paper | code | proj] 本文提出INSTA。INSTA是一种backward mapping方法。该方法基于NeRF建立标准空间&#xff0c;形变空间&#xff08;任意表情&#xff09;通过映射回标准空间&#xff0c;实现渲染。为实现形变空间中任意点向标准空间的映射&#xff0c;对形变空间中的任意…

PySpark中DataFrame的join操作

内容导航 类别内容导航机器学习机器学习算法应用场景与评价指标机器学习算法—分类机器学习算法—回归机器学习算法—聚类机器学习算法—异常检测机器学习算法—时间序列数据可视化数据可视化—折线图数据可视化—箱线图数据可视化—柱状图数据可视化—饼图、环形图、雷达图统…

Vue 保留富文本中包含指定字符串所在的行

需求描述 如下图所示&#xff0c;想保留所有包含『张三』所在的行 最终实现效果 先看一下富文本的源码 <p>任务1 张三</p> <p>任务2 张三</p> <p>任务3 李四</p> <p>任务4 李四</p> &l…

「微服务模式」七种微服务反模式

什么是微服务 流行语经常为进化的概念提供背景&#xff0c;并且需要一个良好的“标签”来促进对话。微服务是一个新的“标签”&#xff0c;它定义了我个人一直在发现和使用的领域。文章和会议描述了一些事情&#xff0c;我慢慢意识到&#xff0c;过去几年我一直在发展自己的个人…

JMeter常见配置及常见问题修改

一、设置JMeter默认打开字体 1、进入安装目录&#xff1a;apache-jmeter-x.x.x\bin\ 2、找到 jmeter.properties&#xff0c;打开。 3、搜索“ languageen ”&#xff0c;前面带有“#”号.。 4、去除“#”号&#xff0c;并修改为&#xff1a;languagezh_CN 或 直接新增一行&…

Envoy

一. Envoy ). Envoy Envoy 于 2017 年 9 月作为孵化项目加入 CNCF。从孵化到毕业&#xff0c;Envoy 都是 CNCF 增长最快的项目之一 Envoy 在吞吐量和延迟方面都表现良好。这在大型云原生部署中至关重要 Envoy 是专为大型现代 SOA&#xff08;面向服务架构&#xff09;架构设计…

Java|IDEA 中添加编译参数 --add-exports

方法1 File > Settings > Build, Execution, Deployment > Compiler > Java Compiler > Javac Options > Override compiler parameters per-module 点击&#xff1a; 点击OK 双击Compliation options&#xff0c;输入后回车&#xff1a; 方法2 找到出错…