【实时数仓】DWM层订单宽表之需求分析、订单和订单明细关联源码

news2024/10/7 16:27:07

文章目录

  • 一 DWM层-订单宽表
    • 1 需求分析与思路
    • 2 订单和订单明细关联代码实现
      • (1)从Kafka的dwd层接收订单和订单明细数据
        • a 创建订单实体类
        • b 创建订单明细实体类
        • c 在dwm包下创建OrderWideApp读取订单和订单明细数据
        • d 测试
      • (2)订单和订单明细关联(双流join)
        • a 设定事件时间水位线
        • b 创建合并后的宽表实体类
        • c 设定关联的key
        • d 订单和订单明细关联 intervalJoin
        • e 测试

一 DWM层-订单宽表

1 需求分析与思路

订单是统计分析的重要的对象,围绕订单有很多的维度统计需求,比如用户、地区、商品、品类、品牌等等。

为了之后统计计算更加方便,减少大表之间的关联,所以在实时计算过程中将围绕订单的相关数据整合成为一张订单的宽表。

在这里插入图片描述

如上图,由于在之前的操作已经把数据分拆成了事实数据和维度数据,事实数据(绿色)进入kafka数据流(DWD层)中,维度数据(蓝色)进入hbase中长期保存。那么在DWM层中要把实时和维度数据进行整合关联在一起,形成宽表。那么这里就要处理有两种关联,事实数据和事实数据关联、事实数据和维度数据关联。

  • 事实数据和事实数据关联,其实就是流与流之间的关联。
  • 事实数据与维度数据关联,其实就是流计算中查询外部数据源。

2 订单和订单明细关联代码实现

(1)从Kafka的dwd层接收订单和订单明细数据

实现思路如下:

在这里插入图片描述

a 创建订单实体类

package com.hzy.gmall.realtime.beans;

import lombok.Data;
import java.math.BigDecimal;

/**
 * Desc: 订单实体类
 */
@Data
public class OrderInfo {
    // 属性名需要与数据库中字段名相同
    Long id;
    Long province_id;
    String order_status;
    Long user_id;
    BigDecimal total_amount;    //实际付款金额
    BigDecimal activity_reduce_amount;
    BigDecimal coupon_reduce_amount;
    BigDecimal original_total_amount;
    BigDecimal feight_fee;
    String expire_time;
    String create_time;
    String operate_time;
    String create_date; // 把其他字段处理得到
    String create_hour;
    Long create_ts; // 通过create_time转换
}

b 创建订单明细实体类

package com.hzy.gmall.realtime.beans;

import lombok.Data;
import java.math.BigDecimal;

/**
 * Desc:订单明细实体类
 */
@Data
public class OrderDetail {
    Long id;
    Long order_id ; // 无外键约束
    Long sku_id;
    BigDecimal order_price ;
    Long sku_num ;
    String sku_name; // 冗余字段,可以减少连接查询的次数
    String create_time;
    BigDecimal split_total_amount;
    BigDecimal split_activity_amount;
    BigDecimal split_coupon_amount;
    Long create_ts; // 通过create_time转换
}

c 在dwm包下创建OrderWideApp读取订单和订单明细数据

package com.hzy.gmall.realtime.app.dwm;
/**
 * 订单宽表的准备
 */
public class OrderWideApp {
    public static void main(String[] args) throws Exception {
        //TODO 1 基本环境准备
        //1.1 流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度
        env.setParallelism(4);

        //TODO 2 检查点设置(略)

        //TODO 3 从kafka中读取数据
        //3.1 声明消费主题以及消费者组
        String orderInfoSourceTopic = "dwd_order_info";
        String orderDetailSourceTopic = "dwd_order_detail";
        String groupId = "order_wide_app_group";

        //3.2 获取kafka消费者对象
        // 订单
        FlinkKafkaConsumer<String> orderInfoKafkaSource = MyKafkaUtil.getKafkaSource(orderInfoSourceTopic, groupId);
        // 订单明细
        FlinkKafkaConsumer<String> orderDetailKafkaSource = MyKafkaUtil.getKafkaSource(orderDetailSourceTopic, groupId);
        //3.3 读取数据,封装为流
        // 订单流
        DataStreamSource<String> orderInfoStrDS = env.addSource(orderInfoKafkaSource);
        // 订单明细流
        DataStreamSource<String> orderDetailStrDS = env.addSource(orderDetailKafkaSource);

        //TODO 4 对流中数据类型进行转换 String -> 实体对象
        //订单
        SingleOutputStreamOperator<OrderInfo> orderInfoDS = orderInfoStrDS.map(
                new RichMapFunction<String, OrderInfo>() {
                    private SimpleDateFormat sdf;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    }

                    @Override
                    public OrderInfo map(String jsonStr) throws Exception {
                        OrderInfo orderInfo = JSON.parseObject(jsonStr, OrderInfo.class);
                        orderInfo.setCreate_ts(sdf.parse(orderInfo.getCreate_time()).getTime());
                        return orderInfo;
                    }
                }
        );
        // 订单明细
        SingleOutputStreamOperator<OrderDetail> orderDetailDS = orderDetailStrDS.map(
                new RichMapFunction<String, OrderDetail>() {
                    private SimpleDateFormat sdf;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    }

                    @Override
                    public OrderDetail map(String jsonStr) throws Exception {
                        OrderDetail orderDetail = JSON.parseObject(jsonStr, OrderDetail.class);
                        orderDetail.setCreate_ts(sdf.parse(orderDetail.getCreate_time()).getTime());
                        return orderDetail;
                    }
                }
        );

        orderInfoDS.print("订单信息:");
        orderDetailDS.print("订单明细:");

        env.execute();
    }
}

d 测试

启动zookeeper、kafka、maxwell、hdfs,等待退出安全模式,启动Hbase

在配置表table_process中添加两条数据,如下

source_table    operate_type  sink_type  sink_table          sink_columns  sink_pk  sink_extend  
     
order_detail    insert        kafka      dwd_order_detail    id,order_id,sku_id,sku_name,img_url,order_price,sku_num,create_time,source_type,source_id,split_total_amount,split_activity_amount,split_coupon_amount                                                                id       (NULL)       
order_info      insert        kafka      dwd_order_info      id,consignee,consignee_tel,total_amount,order_status,user_id,payment_way,delivery_address,province_id,activity_reduce_amount,coupon_reduce_amount,original_total_amount,feight_fee,feight_fee_reduce,refundable_time  id       (NULL)          

启动BaseDBApp、OrderWideApp,模拟生成业务数据,观察结果。

  • 执行流程

业务数据生成**->Maxwell同步->Kafka的ods_base_db_m主题->BaseDBApp分流写回kafka->dwd_order_info和dwd_order_detail->**OrderWideApp从kafka的dwd层读数据,打印输出

(2)订单和订单明细关联(双流join)

在flink中的流join大体分为两种,一种是基于时间窗口的join(Time Windowed Join),比如join、coGroup等。另一种是基于状态缓存的join(Temporal Table Join),比如intervalJoin。

这里选用intervalJoin,因为相比较窗口join,intervalJoin使用更简单,而且避免了应匹配的数据处于不同窗口的问题。intervalJoin目前只有一个问题,就是还不支持left join。

但是这里订单主表与订单从表之间的关联不需要left join,所以intervalJoin是较好的选择。

详情见官网关于窗口的说明。

Interval join 组合元素的条件为:两个流(我们暂时称为 A 和 B)中 key 相同且 B 中元素的 timestamp 处于 A 中元素 timestamp 的一定范围内。

这个条件可以更加正式地表示为 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

这里的 a 和 b 为 A 和 B 中共享相同 key 的元素。上界和下界可正可负,只要下界永远小于等于上界即可。 Interval join 目前仅执行 inner join。

当一对元素被传递给 ProcessJoinFunction,他们的 timestamp 会从两个元素的 timestamp 中取最大值 (timestamp 可以通过 ProcessJoinFunction.Context 访问)。

Interval join 目前仅支持 event time。

intervalJoin连接数据的方式如下图:

在这里插入图片描述

上例中,我们 join 了橙色和绿色两个流,join 的条件是:以 -2 毫秒为下界、+1 毫秒为上界。 默认情况下,上下界也被包括在区间内,但 .lowerBoundExclusive().upperBoundExclusive() 可以将它们排除在外。

图中三角形所表示的条件也可以写成更加正式的表达式:

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

a 设定事件时间水位线

// TODO 5 指定Watermark并提取事件时间字段
//订单
SingleOutputStreamOperator<OrderInfo> orderInfoWithWatermarkDS = orderInfoDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(
                        new SerializableTimestampAssigner<OrderInfo>() {
                            @Override
                            public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
                                return orderInfo.getCreate_ts();
                            }
                        }
                )
);
// 订单明细
SingleOutputStreamOperator<OrderDetail> orderDetailWithWatermarkDS = orderDetailDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(
                        new SerializableTimestampAssigner<OrderDetail>() {
                            @Override
                            public long extractTimestamp(OrderDetail orderDetail, long recordTimestamp) {
                                return orderDetail.getCreate_ts();
                            }
                        }
                )
);

b 创建合并后的宽表实体类

package com.hzy.gmall.realtime.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.commons.lang3.ObjectUtils;

import java.math.BigDecimal;

/**
 * Desc: 订单和订单明细关联宽表对应实体类
 */
@Data
@AllArgsConstructor
public class OrderWide {
    Long detail_id;
    Long order_id ;
    Long sku_id;
    BigDecimal order_price ;
    Long sku_num ;
    String sku_name;
    Long province_id;
    String order_status;
    Long user_id;

    BigDecimal total_amount;
    BigDecimal activity_reduce_amount;
    BigDecimal coupon_reduce_amount;
    BigDecimal original_total_amount;
    BigDecimal feight_fee;
    BigDecimal split_feight_fee;
    BigDecimal split_activity_amount;
    BigDecimal split_coupon_amount;
    BigDecimal split_total_amount;

    String expire_time;
    String create_time;
    String operate_time;
    String create_date; // 把其他字段处理得到
    String create_hour;

    String province_name;//查询维表得到
    String province_area_code;
    String province_iso_code;
    String province_3166_2_code;

    Integer user_age ;
    String user_gender;

    Long spu_id;     //作为维度数据 要关联进来
    Long tm_id;
    Long category3_id;
    String spu_name;
    String tm_name;
    String category3_name;

    public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail){
        mergeOrderInfo(orderInfo);
        mergeOrderDetail(orderDetail);

    }

    // 将订单的信息赋值给订单宽表
    public void  mergeOrderInfo(OrderInfo orderInfo  )  {
        if (orderInfo != null) {
            this.order_id = orderInfo.id;
            this.order_status = orderInfo.order_status;
            this.create_time = orderInfo.create_time;
            this.create_date = orderInfo.create_date;
            this.activity_reduce_amount = orderInfo.activity_reduce_amount;
            this.coupon_reduce_amount = orderInfo.coupon_reduce_amount;
            this.original_total_amount = orderInfo.original_total_amount;
            this.feight_fee = orderInfo.feight_fee;
            this.total_amount =  orderInfo.total_amount;
            this.province_id = orderInfo.province_id;
            this.user_id = orderInfo.user_id;
        }
    }

    // 将订单明细的信息赋值给订单宽表
    public void mergeOrderDetail(OrderDetail orderDetail  )  {
        if (orderDetail != null) {
            this.detail_id = orderDetail.id;
            this.sku_id = orderDetail.sku_id;
            this.sku_name = orderDetail.sku_name;
            this.order_price = orderDetail.order_price;
            this.sku_num = orderDetail.sku_num;
            this.split_activity_amount=orderDetail.split_activity_amount;
            this.split_coupon_amount=orderDetail.split_coupon_amount;
            this.split_total_amount=orderDetail.split_total_amount;
        }
    }

    // firstNonNull获取参数中第一个不为空的值
    public void mergeOtherOrderWide(OrderWide otherOrderWide){
        this.order_status = ObjectUtils.firstNonNull( this.order_status ,otherOrderWide.order_status);
        this.create_time =  ObjectUtils.firstNonNull(this.create_time,otherOrderWide.create_time);
        this.create_date =  ObjectUtils.firstNonNull(this.create_date,otherOrderWide.create_date);
        this.coupon_reduce_amount =  ObjectUtils.firstNonNull(this.coupon_reduce_amount,otherOrderWide.coupon_reduce_amount);
        this.activity_reduce_amount =  ObjectUtils.firstNonNull(this.activity_reduce_amount,otherOrderWide.activity_reduce_amount);
        this.original_total_amount =  ObjectUtils.firstNonNull(this.original_total_amount,otherOrderWide.original_total_amount);
        this.feight_fee = ObjectUtils.firstNonNull( this.feight_fee,otherOrderWide.feight_fee);
        this.total_amount =  ObjectUtils.firstNonNull( this.total_amount,otherOrderWide.total_amount);
        this.user_id =  ObjectUtils.<Long>firstNonNull(this.user_id,otherOrderWide.user_id);
        this.sku_id = ObjectUtils.firstNonNull( this.sku_id,otherOrderWide.sku_id);
        this.sku_name =  ObjectUtils.firstNonNull(this.sku_name,otherOrderWide.sku_name);
        this.order_price =  ObjectUtils.firstNonNull(this.order_price,otherOrderWide.order_price);
        this.sku_num = ObjectUtils.firstNonNull( this.sku_num,otherOrderWide.sku_num);
        this.split_activity_amount=ObjectUtils.firstNonNull(this.split_activity_amount);
        this.split_coupon_amount=ObjectUtils.firstNonNull(this.split_coupon_amount);
        this.split_total_amount=ObjectUtils.firstNonNull(this.split_total_amount);
    }
}

c 设定关联的key

// TODO 6 通过分组指定两流的关联字段 -- order_id
// 订单
KeyedStream<OrderInfo, Long> orderInfoKeyedDS = orderInfoWithWatermarkDS.keyBy(OrderInfo::getId);
// 订单明细
KeyedStream<OrderDetail, Long> orderDetailkeyedDS = orderDetailWithWatermarkDS.keyBy(OrderDetail::getOrder_id);

d 订单和订单明细关联 intervalJoin

这里设置了正负5秒,以防止在业务系统中主表与从表保存的时间差。

// TODO 7 双流join,使用intervalJoin
// 用订单(一)join订单明细(多)
SingleOutputStreamOperator<OrderWide> orderWideDS = orderInfoKeyedDS
        .intervalJoin(orderDetailkeyedDS)
        .between(Time.seconds(-5), Time.seconds(5))
        .process(
                new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                    @Override
                    public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, Context ctx, Collector<OrderWide> out) throws Exception {
                        out.collect(new OrderWide(orderInfo, orderDetail));
                    }
                }
        );
orderWideDS.print(">>>");

e 测试

目前以完成工作

  • 基本环境准备

  • 设置检查点

  • 从kafka中读取两条流数据并在转换结构的时候补充创建时间(create_ts)

  • 通过keyby设置关联字段 – order_id

  • 双流join

    A.intervalJoin(B)
     .between(下界,上界)
     .process()
    
  • 测试,同2(1)d

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/101790.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot中的starter:whywhatmake

1、为什么用starter 我们知道&#xff0c;springboot框架是为了简化spring框架开发推出的&#xff0c;那么&#xff0c;在之前的spring框架开发中&#xff0c;如果我们需要连接数据库&#xff0c;引入mybatis框架&#xff0c;需要怎么操作呢&#xff1f;我们需要去maven仓库找…

当 chatGPT 被职场 PUA ,笑麻了

大家最近是不是被 chatGPT 刷屏了&#xff1f;简单来说&#xff0c;chatGPT 是一个智能聊天引擎。 那 chatGPT 和小爱同学、 siri 有什么区别呢&#xff1f; 如果体验过的朋友&#xff0c;能感受到区别还是很大&#xff0c;chatGPT 的智能表现过于优秀&#xff0c;远远超过了这…

深入浅出DPDK KNI核心技术

一、KNI KNI全称&#xff1a;Kernel NIC Interface&#xff0c;内核网卡接口&#xff0c;允许用户态程序访问linux控制平面。 在DPDK报文处理中&#xff0c;有些报文需要发送到内核协议栈进行处理&#xff0c;如GTP-C控制报文 如果报文数量较少&#xff0c;可以使用内核提供…

Lua热更新

Lua热更新解决方案 文章目录Lua热更新解决方案1.AB包1.1 AB包概述1. 从0开始的Lua语法1.1 HelloWorld1.2 数据类型1.3 字符串1.4 运算符1.5 条件语句1.6 循环语句1.7 函数1.8 数组1.9 迭代器1.10 字典&#xff0c;类&#xff0c;对象1.11 多脚本执行1.12 特殊语法1.13 协程1.14…

hive时间和字符串互转,时间函数

hive里经常需要将字符串转化为date或timestamp 或者转化为日期格式的string 先说一个简单的 cast(xx as date/string/timestamp) 这个大多情况都可以用 1.to_date to_date只保留年月日,参数必须是string类型的yyyy-MM-dd HH:mm:ss或者date或timestamp类型,返回值是date类型,…

尚太科技开启招股:预计募资总额22亿元,业绩增长迅猛

12月19日&#xff0c;石家庄尚太科技股份有限公司&#xff08;下称“尚太科技”&#xff0c;SZ:001301&#xff09;开启招股&#xff0c;将在深圳证券交易所主板上市。本次冲刺上市&#xff0c;尚太科技的发行价格为33.88元/股&#xff0c;发行数量为6494.37万股&#xff0c;募…

MySQL Binlog温故知新

目录 一、什么是Binlog 二、Binlog文件记录模式 三、Binlog 日志内容 四、常用的binlog日志操作命令 五、binlog日志中间件 一、什么是Binlog Binlog &#xff08;Binary log&#xff09;是MySQL的二进制日志&#xff0c;以二进制的形式记录了对于数据库的变更&#xff0…

Tesseract-OCR 和cnocr/cnstd

Tesseract-OCR学习系列 Tesseract-OCR学习系列&#xff08;四&#xff09;API - 简书 参考文档&#xff1a;https://github.com/tesseract-ocr/tesseract/wiki/APIExample 这篇文章介绍了GetComponentImages等基础api的用法 Python 自动识别图片文字—保姆级OCR实战教程 Py…

使用界面配置静态路由(保姆级教程)

啰嗦几句 因为写的很详细&#xff0c;保姆级别的&#xff0c;所以看起来内容很多&#xff0c;但是东西就那一点&#xff0c;你自己配个2、3遍就会了。期末考试也不用担心这个实验了。 使用Cisco Packet Tracer这个软件。 原文件下载 补充&#xff1a;下载后使用Cisco Packet T…

有源电力滤波器——低压配电系统

安科瑞 华楠 一、谐波的危害 ● 使公用电网中的设备产生附加谐波损耗&#xff0c;降低电网、输电及用电设备的使用效率。 ● 在三相四线制系统中&#xff0c;由于零线流过大量的3n次谐波电流&#xff0c;造成零线过热。 ● 谐波会产生额外的热效应&#xff0c;引起用电设备…

GitLab创建新分支并同步其它分支的内容(IDEA)

拉取项目到本地 选择对应项目并复制clone地址 打开IDEA&#xff0c;选择File–》New–》Project from Version Control 在弹出的对话框中粘贴刚才复制的地址 完成后项目就被拉取到本地对应目录中了 创建新分支 这里以master分支为例&#xff0c;其它分支同理 首先在GitL…

Linux Red Hat 8.0 find命令练习

find&#xff1a;主要用来查找字符串 常用参数&#xff1a; -name 搜索文件名 -iname 搜索文件名时大小写不敏感 -type d 目录、 f 普通文件、 l 符号链接 -size 指定搜索文件大小 -perm 按执行权限来查找 -user 按…

在线考试网站

开发工具(eclipse/idea/vscode等)&#xff1a; 数据库(sqlite/mysql/sqlserver等)&#xff1a; 功能模块(请用文字描述&#xff0c;至少200字)&#xff1a; 模块划分&#xff1a;老师模块、班级模块、学生模块、课程模块、试题模块、试卷模块、 组卷模块、考试模块、答题模块 …

什么是项目风险管理?要如何执行风险管理?

项目风险管理是项目管理中重要组成部分之一&#xff0c;其影响着项目是否能成功最终交付的关键。虽然项目经理都无法准确地预测未来项目结果&#xff0c;但可以应用一种精简的风险管理流程来预测项目管理中的不确定性&#xff0c;并将这些不确定因素的影响降到最低。因此如何执…

基于Python+Django+Vue+MYSQL的邯郸地标美食导游平台

项目介绍 我的家乡是邯郸市。邯郸市我国为数不多的3000年没有改变过名字城市。我的家乡就是邯郸。在我的家乡有非常多的旅游景点和美食。为了让更多的人了解到邯郸的旅游景点和好吃的美食信息&#xff0c;我通过pythonvueelementui开发了本次的邯郸家乡网红旅游景点美食导游平…

CSS高级 DAY2

目录 选择器进阶 复合选择器 后代选择器 子代选择器 并集选择器 交集选择器 hover伪类选择器&#xff08;就是鼠标悬停选择器&#xff09; 背景有关属性 背景颜色 背景图片 背景平铺 背景位置 背景的复合属性 元素显示模式 块级元素 行内元素 行内块元素 元素显示…

SimpleDateFormat线程不安全解析以及解决方案

我们都知道SimpleDateFormat是Java中用来日期格式化的一个类&#xff0c;可以将特定格式的字符转转化成日期&#xff0c;也可将日期转化成特定格式的字符串。比如 将特定的字符串转换成日期 public static void main(String[] args) throws ParseException {SimpleDateFormat…

windows11打开隐藏的gpedit.msc本地组策略编辑器以及禁止自动更新系统

目录打开隐藏的gpedit.msc本地组策略编辑器windows11禁止自动更新系统打开隐藏的gpedit.msc本地组策略编辑器 1.新建.txt文本文件&#xff0c;输入如下代码 echo off pushd “%~dp0” dir /b C:\Windows\servicing\Packages\Microsoft-Windows-GroupPolicy-ClientExtension…

a股行情接口功能特点

a股行情接口功能特点&#xff0c;如下&#xff1a; 1、支持A股历史交易查询&#xff1b; 2、包括股票数据&#xff1b; 3、每日A股收盘后更新当日交易数据&#xff0c;停牌不更新&#xff1b; 4、支持多股历史数据一次查询&#xff1b; 5、支持任何时间段查询&#xff1b;…

Linux进程概念(二)

Linux进程概念进程状态普通操作系统层面理解运行与阻塞挂起与阻塞Linux是怎么做的孤儿进程进程优先级什么是优先级如何改变优先级其他概念进程状态 进程状态分有&#xff1a; 运行 新建 就绪 挂起 阻塞 等待 停止 挂机 死亡… 状态其实就是返回的整形&#xff0c;就像某个数字…