Flink 实时数仓(五)【DWD 层搭建(三)交易域事实表】

news2025/1/25 4:23:28

前言

        今天开始交易域事实表的创建,上一节流量域中的表(其实就是一个 kafka 主题)数据来自于日志,而交易域的数据来自于业务系统,业务表之间是有关联性的。

        我们之前在离线数仓中(声明粒度(最细粒度) -> 确认维度(维度外键 + 退化字段) -> 确认事实(度量值)),往往都是会选择一个最细粒度的业务表作为主表(比如订单明细表,因为订单明细表的粒度是sku),然后和其他相关的业务表(比如订单表(订单表中可以获得订单的状态),订单明细活动关联表,订单明细优惠券关联表等)进行 join,保留维度外键,选择需要退化的字段,最后添加度量字段;

        那么,在事实表这里其实也是一样的,我们也不可避免需要对事实数据流之间进行 join,实时数据和维表之间的 join,所以我们现在需要熟悉一下 Flink 中如何使用 DataStream API 或者 Flink SQL 实现 join:

0、前置知识回顾

0.1、DataStream API  实现 join

        从官网可以看到,Flink 的窗口 API 分为两类:window join 和 interval join;我们可以分析一下在下面事实表的创建中应该选择哪种 join 方式:

  • window join(官网并不推荐)
    • 滚动窗口:可能存在数据丢失(两条需要关联的数据刚好被窗口隔开)
    • 滑动窗口:可能存在数据重复
    • 会话窗口:可能存在窗口迟迟无法关闭,导致实时性变差
  • interval join

关于 interval join,我们在学习 Flink 时间语义和水位线 的时候讲过:

间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作 A)中的任意一个数据元素 a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫 B)中的数据元素 b,如果它的时间戳落在了这个区间范围内,a 和 b 就可以成功配对,进而进行计算输出结果。

所以匹配的条件为:a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

这里需要注意:

  • 做间隔联结的两条流 A 和 B,也必须基于相同的 key;
  • 下界 lowerBound应该小于等于上界 upperBound,两者都可正可负;
  • 间隔联结目前只支持事件时间语义。

0.2、Flink SQL 实现 join

这一块我们之前在学Flink SQL 查询的时候详细介绍过了,这里不多废话,回顾一下即可;

1、交易域加购事务事实表

        主要任务:提取订单表中的加购操作生成加购表,并将字典表中的相关维度退化到加购表中,写出到 Kafka 对应主题。

        前两节我们对日志数据的事实表进行构建的时候,考虑到日志表数据量大,于是我们将日志表根据不同类型拆分到了 5 个不同的 Kafka 主题当中。而这里我们处理业务系统中的数据并不需要那么做,毕竟数据量没那么大,这里我们只需要从 Maxwell 同步过来的 JSON 数据中过滤出我们需要的数据即可,不需要像日志数据那样多经过一次 Kafka ,影响效率;

1.1、实现思路

        事实表表需要通过和字典表进行 join 来做维度退化,所以这里我们需要使用 JDBC 连接器,将字典表中的数据封装成 Flink SQL 表;

        同样,我们需要用到 Kafka 连接器来将 Kafka 中所有的数据封装为一个 Flink SQL 表;

这个交易域加购事务事实表中,需要用到的业务表只有购物车表和订单表(用于对商品来源做一个维度退化,因为购物车表中只有来源 id,没有来源 value);

1.2、代码实现

1.2.1、topic_db 表结构设计

首先,我们需要考虑如何将 Kafka 的 topic_db 中的数据保存到 Flink SQL 表中去?因为 topic_db 中存储的是 Maxwell 同步过来的 46 张表 JSON 格式的数据:

{
 "database":"gmall-211126-flink",
 "table":"base_trademark",
 "xid" : 188,
 "commit" : true,
 "type":"update",
 "ts":1652499295,
 "data":{
     "id":1,
     "tm_name":"三星",
     "icon_url":"/bbb/cccc"
     },
 "old":{
     "tm_name":"小米"
     }
}

        所以,我们需要考虑如何设计表结构?这里的 ts 字段并没有必要保留(ts 是 Maxwell 给加的,而且业务表中一般都有 create_time 和 operator_time 等字段),还有 xid、commit 等字段也不需要(都是 Maxwell 加的,对后面分析没有用);

最后我们的表结构大概就是这样:

CREATE TABLE topic_db(
    `database` STRING,
    `table` STRING,
    `type` STRING,
    `data` MAP<STRING,STRING>,
    `old` MAP<STRING,STRING>,
    `pt` AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic_db',
    'properties.bootstrap.servers' = 'hadoop102:9092',
    'properties,group.id' = 'cartAdd',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)

对于这里面的 data 和 old 是 JSON 对象类型,为了考虑到以后可以直接操作而不需要再做转换,我们可以考虑使用 Flink SQL 中的复杂数据类型 Map;

此外,我们还增加了一个时间字段 pt,因为后面我们需要做 lookup join,它需要我们提供一个处理时间字段;

1.2.2、topic_db 表实现

首先,Flink Table API 的执行环境有一点变化:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // 生产环境中设置为kafka主题的分区数
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

其它代码比如设置检查点和状态后端这里省略;

然后,我们在之前的 MyKafkaUtil 中再封装两个方法用来创建建表语句:

     /**
     * Kafka-Source DDL 的配置信息
     *
     * @param topic   数据源主题
     * @param groupId 消费者组
     * @return with 配置信息
     */
    public static String getKafkaDDL(String topic, String groupId) {
        return " with ('connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + KAFKA_SERVER + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'group-offsets')";
    }

    /**
     * topic_db 建表语句
     * @param groupId 消费者组
     * @return 完整的建表语句
     */
    public static String getTopicDb(String groupId) {
        return "CREATE TABLE topic_db(\n" +
                "    `database` STRING,\n" +
                "    `table` STRING,\n" +
                "    `type` STRING,\n" +
                "    `data` MAP<STRING,STRING>,\n" +
                "    `old` MAP<STRING,STRING>,\n" +
                "    `pt` AS PROCTIME()\n" +
                ")" + getKafkaDDL("topic_db",groupId);
    }

执行 DDL :

        // TODO 2. 使用 DDL 方式读取 topic_db 创建表
        tableEnv.executeSql(MyKafkaUtil.getTopicDb("cart_add"));

        这样,我们的 topic_db 表就通过 Kafka 连接器和 Table API 创建完成了,之后我们就可以直接使用 Flink SQL 去查询我们想要的表的变更数据了;

1.2.3、过滤出加购数据

业务表中加购只两种情况,在 Maxwell 传递过来的数据中体现出来就是:

  • type 字段为 insert
    • date 中的 sku_num 即为该次加购操作的 sku_num
  • type 字段值为 update 且 data[sku_num] > old[sku_num]
    • data[sku_num] - old[sku_num] 即为该次加购操作的 sku_num
SELECT
     data['id'] id,
     data['user_id'] user_id,
     data['sku_id'] sku_id,
     data['cart_price'] cart_price,
     if(`type`='insert',`data`['sku_num'],CAST(CAST(`data`['sku_num'] AS INT) - CAST(`old`['sku_num'] AS INT)) AS STRING)) sku_num,
     data['sku_name'] sku_name,
     data['is_checked'] is_checked,
     data['create_time'] create_time,
     data['operate_time'] operate_time,
     data['is_ordered'] is_ordered,
     data['order_time'] order_time,
     data['source_id'] source_id,
     data['source_type'] source_type,
     pt
FROM
     topic_db
WHERE
     `database` = 'gmall'
AND
     `table` = 'cart_info'
AND
     `type` = 'insert'
OR (
        `type` = 'update'
        AND
            'old'['sku_num'] is not null
        AND
            CAST('old'['sku_num'] AS INT) < CAST('data'['sku_num'] AS INT)
    )

        这样,我们就可以从 topic_db 中读取到加购数据了,这些字段全部来自于 cart_info,每行数据代表一个加购操作;

        // TODO 3. 过滤加购数据
        Table cartAddTable = tableEnv.sqlQuery("SELECT\n" +
        "     data['id'] id,\n" +
        "     data['user_id'] ,\n" +
        "     data['sku_id'],\n" +
        "     data['cart_price'],\n" +
        "     if(`type`='insert',`data`['sku_num'],CAST(CAST(`data`['sku_num'] AS INT) - CAST(`old`['sku_num'] AS INT)) AS STRING)) sku_num,\n" +
        "     data['sku_name'] sku_name,\n" +
        "     data['is_checked'] is_checked,\n" +
        "     data['create_time'] create_time,\n" +
        "     data['operate_time'] operate_time,\n" +
        "     data['is_ordered'] is_ordered,\n" +
        "     data['order_time'] order_time,\n" +
        "     data['source_id'] source_id,\n" +
        "     data['source_type'] source_type,\n" +
        "     pt\n" +
        "FROM\n" +
        "     topic_db\n" +
        "WHERE\n" +
        "     `database` = 'gmall'\n" +
        "AND\n" +
        "     `table` = 'cart_info'\n" +
        "AND\n" +
        "     `type` = 'insert'\n" +
        "OR (\n" +
        "        `type` = 'update'\n" +
        "        AND\n" +
        "            'old'['sku_num'] is not null\n" +
        "        AND\n" +
        "            CAST('old'['sku_num'] AS INT) < CAST('data'['sku_num'] AS INT)\n" +
        "    )");
        tableEnv.createTemporaryView("cart_info_table",cartAddTable);

这里我们把提取出来的加购表创建成了一张临时表,这样下面我们才能用 FlinkSQL 对它和其他表进行关联; 

1.2.4、创建 lookup 表

        我们需要对字典表(base_dic)创建一张 lookup 表,用于和事实表去做 join。这里我们直接抽象出一个工具类,毕竟之后还有可能有其它表会和事实表去关联:

public class MysqlUtil {

    public static String getBaseDicLookUpDDL() {
        return "create table `base_dic`(\n" +
                "`dic_code` string,\n" +
                "`dic_name` string,\n" +
                "`parent_code` string,\n" +
                "`create_time` timestamp,\n" +
                "`operate_time` timestamp,\n" +
                "primary key(`dic_code`) not enforced\n" +
                ")" + MysqlUtil.mysqlLookUpTableDDL("base_dic");
    }

    public static String mysqlLookUpTableDDL(String tableName) {
        return "WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'url' = 'jdbc:mysql://hadoop102:3306/gmall',\n" +
                "'table-name' = '" + tableName + "',\n" +
                "'lookup.cache.max-rows' = '10',\n" +
                "'lookup.cache.ttl' = '1 hour',\n" +
                "'username' = 'root',\n" +
                "'password' = '123456',\n" +
                "'driver' = 'com.mysql.cj.jdbc.Driver'\n" +
                ")";
    }
}
        // TODO 4. 读取 MySQL 的 base_dic 作为 lookup 表
        tableEnv.executeSql(MysqlUtil.getBaseDicLookUpDDL());

这样,业务系统中字典表的 lookup 表就创建好了;

1.2.5、关联加购事实表和字典表

用 SQL 关联加购事实表和字典表,为的就是对加购来源(比如广告/用户查询/促销活动/只能推荐)做一个维度退化:

SEELCT
        ci.id,
        ci.user_id,
        ci.sku_id,
        ci.cart_price,
        ci.sku_num,
        ci.sku_name,
        ci.is_checked,
        ci.create_time,
        ci.operate_time,
        ci.is_ordered,
        ci.order_time,
        dic.dic_name source_type_name,
        ci.source_id,
        dic.dic_code source_type_id
FROM
    cart_info ci
JOIN
    base_dic dic FOR SYSTEM_TOME AS OF ci.pt AS dic
ON
    ci.source_type = dic.dic_code

关联后的表我们同样创建一张 Flink 临时表:

tableEnv.createTemporaryView("cart_with_dic_table",cartAddWithDicTable);

1.2.6、创建 DWD 层加购事务事实表

        数据现在已经有了,就是上面关联后的 cart_with_dic_table,现在我们需要创建 DWD 层的加购事务事实表:

注意:这张表最终是要写入到 Kafka 的 dwd_trade_cart_add 主题中的,所以我们需要在建表的时候带上配置信息(只需要指定 topic,不需要 groupId,因为我们是作为生产者向主题中写入数据)。

在 MyKafkaUtil 中添加方法:

public static String getKafkaSinkDDL(String topic) {
        return "WITH ( " +
                "  'connector' = 'kafka', " +
                "  'topic' = '" + topic + "', " +
                "  'properties.bootstrap.servers' = '" + KAFKA_SERVER + "', " +
                "  'format' = 'json' " +
                ")";
    }

创建 dwd_trade_cart_add 表:

// TODO 6. 使用 DDL 方式创建加购事实表
        // 前面创建的表是查询结果(Flink表,不包含配置信息比如Kafka主题等),
        // 这张表是要写入到Kafka主题中,所以只需要指定主题(指定主题就意味着写入该表的数据被写入到Kafka),
        // 这张表是Flink表它不会被消费,所以不需要指定消费者组
        tableEnv.executeSql("CREATE TABLE dwd_trade_cart_add(\n" +
                "        `id` STRING,\n" +
                "        `user_id` STRING,\n" +
                "        `sku_id` STRING,\n" +
                "        `cart_price` STRING,\n" +
                "        `sku_num` STRING,\n" +
                "        `sku_name` STRING,\n" +
                "        `is_checked` STRING,\n" +
                "        `create_time` STRING,\n" +
                "        `operate_time` STRING,\n" +
                "        `is_ordered` STRING,\n" +
                "        `order_time` STRING,\n" +
                "        `source_type_name` STRING,\n" +
                "        `source_id` STRING,\n" +
                "        `source_type_id` STRING\n" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_trade_cart_add"));

 1.2.7、写出数据

这一步我们只需要直接把关联后的临时表数据写入到上面的 dwd_trade_cart_add 表中即可:


        // TODO 7. 写出数据到 Kafka
        tableEnv.executeSql("INSERT INTO dwd_trade_cart_add SELECT * FROM cart_with_dic_table");

        // TODO 8. 启动任务
        env.execute("DwdTradeCartAdd");

2、交易域订单预处理

在离线数仓中,我们针对订单这一类型创建的是累积快照事实表:

        累计快照事实表是基于一个业务流程(区别于业务过程,业务过程指的是一个业务的原子操作,而业务流程是由多个有关联的业务过程组成的)中的多个关键业务过程联合处理而构建的事实表,如交易流程中的下单、支付、发货、确认收货业务过程。

        累积型快照事实表通常具有多个日期字段,每个日期对应业务流程中的一个关键业务过程(里程碑)比如下面的 下单日期  -> 支付日期  -> 发货日期 -> 收货日期。

        这里,我们经过分析,订单明细表和取消订单明细表的数据来源、表结构都相同,差别只在业务过程和过滤条件,为了减少重复计算(重复关联多张表的数据),我们可以将两张表公共的关联过程提取出来,形成订单预处理表。

        关联订单明细表订单表订单明细活动关联表订单明细优惠券关联表四张事实业务表和字典表(维度业务表)形成订单预处理表,写入 Kafka 对应主题。

        预处理表中要保留订单表的 type 和 old 字段,用于过滤订单明细数据和取消订单明细数据。

注意,预处理表在写出的时候不能再使用之前普通的 Kafka 连接器了,而是应该使用 Upsert Kafka 连接器(支持回撤流); 

2.1、实现思路

        先把大致的框架列出来,很多内容比如创建 topic_db 表和 base_dic 的 lookup 表可以直接复用:

public class DwdTradeOrderPreProcess {
    public static void main(String[] args) throws Exception {
        // TODO 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 生产环境中设置为kafka主题的分区数
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1.1 开启checkpoint
        env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 设置最大共存的checkpoint数量
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); // 固定频率重启: 尝试3次重启,每5s重启一次

        // 1.2 设置状态后端
        env.setStateBackend(new HashMapStateBackend());

        // TODO 2. 使用 DDL 方式读取 topic_db 创建表
        tableEnv.executeSql(MyKafkaUtil.getTopicDb("order_pre_process"));

        // TODO 3. 过滤 订单明细表

        // TODO 4. 过滤 订单表
        // TODO 5. 过滤 订单明细活动关联表
        // TODO 6. 过滤 订单明细优惠券关联表
        // TODO 7. 创建 base_dic 的 lookup 表
        tableEnv.executeSql(MysqlUtil.getBaseDicLookUpDDL());
        // TODO 8. 关联这 5 张表
        // TODO 9. 创建 upsert-kafka 表
        // TODO 10. 数据写出
        // TODO 11. 启动任务
        env.execute("DwdTradeOrderPreProcess");
    }

2.1.1、过滤订单明细表

注意:订单明细表不同于订单表,订单表只有新增(type 只有 insert)!但是订单明细表会有订单的修改(type 有 insert,也有 update);

除了明显用不到的比如 img_url 等字段,别的以防万一全部保留(此外,还有 topic_db 的 pt 字段也需要查出来,后面要和 lookup 表 join 使用): 

        // TODO 3. 过滤 订单明细表
        Table orderDetailTable = tableEnv.sqlQuery("SELECT\n" +
                "        data['id'] id,\n" +
                "        data['user_id'] user_id,\n" +
                "        data['sku_id'] sku_id,\n" +
                "        data['sku_name'] sku_name,\n" +
                "        data['order_price'] order_price,\n" +
                "        data['sku_num'] sku_num,\n" +
                "        data['create_time'] create_time,\n" +
                "        data['source_type'] source_type,\n" +
                "        data['source_id'] source_id,\n" +
                "        data['split_total_amount'] split_total_amount,\n" +
                "        data['split_activity_amount'] split_activity_amount,\n" +
                "        data['split_coupon_amount'] split_coupon_amount,\n" +
                "        pt\n" +
                "FROM\n" +
                "      topic_db\n" +
                "WHERE\n" +
                "      `database` = 'gmall'\n" +
                "AND\n" +
                "      `table` = 'order_detail'\n");
        tableEnv.createTemporaryView("order_detail_table",orderDetailTable);

2.1.2、过滤订单表

        订单表粒度大,所以没有明细的数据(商品的增删改),但是订单表中有订单的状态变化!所以我们还应该保存 type 和 old 字段,方便我们识别订单状态的变化:

        // TODO 4. 过滤 订单表
        Table orderInfoTable = tableEnv.sqlQuery("SELECT\n" +
                "        data['id'] id,\n" +
                "        data['user_id'] user_id,\n" +
                "        data['province_id'] province_id,\n" +
                "        data['order_status'] order_status,\n" +
                "        data['operate_time'] operate_time,\n" +
                "        `type`,\n" +
                "        `old`,\n" +
                "        ts\n" +
                "FROM\n" +
                "      topic_db\n" +
                "WHERE\n" +
                "      `database` = 'gmall'\n" +
                "AND\n" +
                "      `table` = 'order_info'\n" +
                "AND (`type` = 'insert' OR `type` = 'update')");
        tableEnv.createTemporaryView("order_info_table",orderInfoTable);

2.1.3、过滤订单明细活动关联表

订单明细活动关联表只有新增,所以这里不需要对 type 进行过滤;

        // TODO 5. 过滤 订单明细活动关联表
        Table orderActivityTable = tableEnv.sqlQuery("SELECT\n" +
                "        data['order_detail_id'] order_detail_id,\n" +
                "        data['activity_id'] activity_id,\n" +
                "        data['activity_rule_id'] activity_rule_id\n" +
                "FROM\n" +
                "      topic_db\n" +
                "WHERE\n" +
                "      `database` = 'gmall'\n" +
                "AND\n" +
                "      `table` = 'order_detail_activity'");
        tableEnv.createTemporaryView("order_activity_table",orderActivityTable);

2.1.4、过滤订单明细优惠券表

同样订单明细优惠券表只有新增,不需要过滤 type:

Table orderDetailCoupon = tableEnv.sqlQuery("select\n" +
                "data['order_detail_id'] order_detail_id,\n" +
                "data['coupon_id'] coupon_id\n" +
                "from `topic_db`\n" +
                "where `table` = 'order_detail_coupon'\n");
        tableEnv.createTemporaryView("order_detail_coupon", orderDetailCoupon);

2.1.5、关联 5 张表

        // TODO 8. 关联这 5 张表
        Table resultTable = tableEnv.sqlQuery("SELECT \n" +
                "od.id,\n" +
                "od.order_id,\n" +
                "oi.user_id,\n" +
                "oi.order_status,\n" +
                "od.sku_id,\n" +
                "od.sku_name,\n" +
                "oi.province_id,\n" +
                "oa.activity_id,\n" +
                "oa.activity_rule_id,\n" +
                "oc.coupon_id,\n" +
                "date_format(od.create_time, 'yyyy-MM-dd') date_id,\n" +
                "od.create_time,\n" +
                "date_format(oi.operate_time, 'yyyy-MM-dd') operate_date_id,\n" +
                "oi.operate_time,\n" +
                "od.source_id,\n" +
                "od.source_type source_type_id,\n" +
                "dic.dic_name source_type_name,\n" +
                "od.sku_num,\n" +
                "od.split_original_amount,\n" +
                "od.split_activity_amount,\n" +
                "od.split_coupon_amount,\n" +
                "od.split_total_amount,\n" +
                "oi.`type`,\n" +
                "oi.`old`,\n" +
                "od.od_ts,\n" +
                "oi.oi_ts,\n" +
                "current_row_timestamp() row_op_ts\n" +
                "FROM\n" +
                "    order_detail_table od\n" +
                "JOIN\n" +
                "    order_info_table oi\n" +
                "ON\n" +
                "    od.order_id = oi.id\n" +
                "LEFT JOIN\n" +
                "    order_activity_table oa\n" +
                "ON\n" +
                "    oa.order_detail_id = od.id\n" +
                "LEFT JOIN\n" +
                "    order_coupon_table oc\n" +
                "ON\n" +
                "    oa.order_detail_id = oc.id\n" +
                "JOIN\n" +
                "    base_dic FOR SYSTEM_TIME AS OF od.pt AS dic\n" +
                "ON\n" +
                "    od.source_type = dic.dic_code");
        tableEnv.createTemporaryView("result_table", resultTable);

2.1.6、创建 Upsert Kafka 表

        实时数据流在 left join 的过程中很可能出现数据的撤回,而之前我们使用的普通 Kafka 连接器是仅追加模式的,如果想要将有更新操作的结果表写入到 Kafka,就需要使用 Upsert Kafka 连接器了:

在 MyKafkaUtil 中创建方法(用来给建表时,添加 Kafka 参数):

public static String getUpsertKafkaDDL(String topic) {
        return "WITH ( " +
                "  'connector' = 'upsert-kafka', " +
                "  'topic' = '" + topic + "', " +
                "  'properties.bootstrap.servers' = '" + KAFKA_SERVER + "', " +
                "  'key.format' = 'json', " +
                "  'value.format' = 'json' " +
                ")";
    }

创建表格,同样相当于是生产者,需要配置 Kafka 参数(topic,k,v类型): 

        // TODO 9. 建立 Upsert-Kafka dwd_trade_order_pre_process 表
        tableEnv.executeSql("" +
                "create table dwd_trade_order_pre_process(\n" +
                "id string,\n" +
                "order_id string,\n" +
                "user_id string,\n" +
                "order_status string,\n" +
                "sku_id string,\n" +
                "sku_name string,\n" +
                "province_id string,\n" +
                "activity_id string,\n" +
                "activity_rule_id string,\n" +
                "coupon_id string,\n" +
                "date_id string,\n" +
                "create_time string,\n" +
                "operate_date_id string,\n" +
                "operate_time string,\n" +
                "source_id string,\n" +
                "source_type string,\n" +
                "source_type_name string,\n" +
                "sku_num string,\n" +
                "split_original_amount string,\n" +
                "split_activity_amount string,\n" +
                "split_coupon_amount string,\n" +
                "split_total_amount string,\n" +
                "`type` string,\n" +
                "`old` map<string,string>,\n" +
                "od_ts string,\n" +
                "oi_ts string,\n" +
                "row_op_ts timestamp_ltz(3),\n" +
                "primary key(id) not enforced\n" +
                ")" + MyKafkaUtil.getUpsertKafkaDDL("dwd_trade_order_pre_process"));

2.1.7、写出到 Kafka

        // TODO 10. 数据写出
        tableEnv.executeSql("INSERT INTO dwd_trade_order_pre_process SELECT * FROM result_table");

        // TODO 11. 启动任务
        env.execute("DwdTradeOrderPreProcess");

总结

今天创建了 1 张事实表和一张辅助表(优化之后查询订单相关表的查询效率)

对于交易域加购事务事实表来说,需要根据公司业务逻辑来确定逻辑,比如有的公司的加购操作就是新增或者更新操作(更新商品的件数),但是有的公司的加购操作都是新增;

        我们创建了 topic_db 这张包含了全部业务系统 binlog 的数据表,并从中过滤出出了购物车相关的数据(type=insert 或者 update 的)。得到加购数据之后,我们需要对加购表中的加购来源字段(source_id)进行维度退化处理,这就需要去关联 base_dic 字典表。于是我们创建了 base_dic 的 lookup 表;

对于交易域订单预处理表,它并不是一个事实表,我们创建它是因为:和订单相关的业务过程(下单和取消订单)都需要从 订单明细表、订单表、订单明细活动关联表,订单明细优惠券关联表等去做关联,所以为了减少重复计算,我们可以这两张表的公共关联部分提取出来;创建的预处理表需要保留订单表的 type 和 old 字段(因为订单表的粒度是一个订单,它的变化就是订单状态的变化),所以为了方便后面过滤订单明细数据和取消订单明细数据(根据 old 中 order_status 字段的值和数据中 order_status 值的变化进行判断),

        和离线数仓一样,我们最终得到的预处理表的粒度也是最细粒度(订单明细表的粒度),一行代表一个商品(因为我们把订单明细表作为主表,并且和订单表进行了 join),同时具有丰富的维度属性(对订单活动、订单优惠券表进行了 left join);

        最终得到的预处理表中,包含订单表的订单id、省份id、用户id、订单状态、操作时间、type 和 old 等字段;活动表的活动id和活动规则(维度退化);优惠券表的优惠券id;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1975579.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

oracle数据库监控数据库中某个表是否正常生成

事情经过&#xff1a; 公司某业务系统每月25日0点会自动生成下个月的表&#xff0c;表名字是tabname_202407的格式。由于7月25日0点做系统保养的时候重启了应用系统服务&#xff0c;导致8月份的表没有生成。最终操作业务影响&#xff0c;为此决定对这个表进行监控&#xff0c;…

Depth Anything——强大的单目深度估计模型

概述 单目深度估计&#xff08;Monocular Depth Estimation, MDE&#xff09;是一项在计算机视觉领域中非常重要的技术&#xff0c;它旨在从单张图像中恢复出场景的三维结构。这项技术对于机器人导航、自动驾驶汽车、增强现实&#xff08;AR&#xff09;和虚拟现实&#xff08…

DVWA(SQL注入)medium、high

medium &#xff08;1&#xff09;判断注入是字符型还是数值型 数值型&#xff0c;获得了用户信息。 id 1 or 11 &#xff08;2&#xff09;查询字段数 为3时报错&#xff0c;代表字段数为2。 1 order by 3 &#xff08;3&#xff09;显示字段顺序 1 union select 1,2 &…

大数据与人工智能:数据隐私与安全的挑战_ai 和 数据隐私

前言 1.背景介绍 随着人工智能(AI)和大数据技术的不断发展&#xff0c;我们的生活、工作和社会都在不断变化。这些技术为我们提供了许多好处&#xff0c;但同时也带来了一系列挑战&#xff0c;其中数据隐私和安全是最为关键的之一。数据隐私和安全问题的出现&#xff0c;主要…

Redis7.x安装系列教程(一)单机部署

1、前言&环境准备说明 本文及接下来3篇将详细介绍在linux环境Redis7.X源码安装系列教程&#xff0c;从最简单的单机部署开始&#xff0c;逐步升级主从部署、哨兵部署和集群部署。 环境准备&#xff1a;如果有条件的用云服务器&#xff0c;如果没有的使用VMware 虚拟机&am…

【精通Redis】Redis事务

文章目录 前言一、标准事务1.1 标准事务的特性1.2 标准事务的生命周期1.3 事务的作用 二、Redis事务2.1 Redis事务的特性2.2 Redis事务与普通事务的区别 三、Redis事务常用命令总结 前言 我们在使用Redis的时候&#xff0c;有时为了处理多个结构&#xff0c;需要向Redis中一次…

Python数据结构实战:列表、字典与集合的高效使用

前言 在编程中&#xff0c;选择合适的数据结构对于提高程序效率至关重要。本文将介绍Python中最常用的数据结构——列表&#xff08;list&#xff09;、字典&#xff08;dict&#xff09;和集合&#xff08;set&#xff09;&#xff0c;并探讨它们的内部实现以及如何高效地使用…

The operation was rejected by your operating system. code CERT_HAS_EXPIRED报错解决

各种报错&#xff0c;试了清缓存&#xff0c;使用管理员权限打开命令行工具&#xff0c;更新npm&#xff0c;都不好使 最终解决&#xff1a;删除 c:/user/admin/ .npmrc

我的最爱之《达明一派》

达明一派&#xff0c;是我最爱。刘以达(Tats)与黄耀明(Anthony Wong)在1980年代的香港组成的二人流行音乐组合&#xff0c;在90年代&#xff0c;网络还没兴起时&#xff0c;那是卡带流行的岁月。90年代&#xff0c;我与好友&#xff0c;同考大学&#xff0c;他留在了南充读读书…

世媒讯带您了解什么是媒体邀约

什么是媒体邀约&#xff1f;其实媒体邀约是一种公关策略&#xff0c;旨在通过邀请媒体记者和编辑参加特定的活动、发布会或其他重要事件&#xff0c;以确保这些活动能够得到广泛的报道和关注。通过这种方式&#xff0c;企业和组织希望能够传达重要信息&#xff0c;提高品牌知名…

网络监控软件的作用是什么|企业用的六款网络监控软件

网络监控软件是干什么的呢&#xff1f;它是用来管理网络安全的&#xff0c;尤其是对于企业而言至关重要&#xff0c;下面我为你推荐六款知名的网络监控软件。 1. 安企神 功能特点&#xff1a; 全面监控&#xff1a;提供电脑屏幕监控、文件操作监控、聊天记录监控等功能&#…

全开源图床系统源码

一款专为个人需求设计的高效图床解决方案&#xff0c;集成了强大的图片压缩功能与优雅的前台后台管理界面。 项目结构精简高效&#xff0c;提供自定义图片压缩率与尺寸设置&#xff0c;有效降低存储与带宽成本。 支持上传JPEG、PNG、GIF格式图片并转换为WEBP格式&#xff0c;…

算法:BFS 解决多源最短路问题

目录 多源最短路 题目一&#xff1a;矩阵 题目二&#xff1a;飞地的数量 题目三&#xff1a;地图中的最高点 题目四&#xff1a;地图分析 多源最短路 首先想要知道多源最短路&#xff0c;就先要明白单源最短路&#xff0c;bfs解决单源最短路问题前面学习过&#xff0c;单…

leetcode-二叉树oj题1(共三道)--c语言

目录 a. 二叉树的概念以及实现参照博客&#xff1a; 一、三道题的oj链接 二、每题讲解 1.单值二叉树 a. 题目&#xff1a; b. 题目所给代码 c. 思路 d. 代码&#xff1a; 2. 相同的树 a. 题目 b. 题目所给代码 c. 思路 d. 代码 3. 二叉树的前序遍历 a. 题目 b.…

J029_UDP通信

一、需求描述 实现UDP的通信 1.1 一发一收 1.1.1 ClientTest1 package com.itheima.udp;import java.net.*;import static java.net.InetAddress.*;//完成udp通信快速入门&#xff0c;实现一收一发 public class ClientTest1 {public static void main(String[] args) thro…

递归 35

方法递归 递归算法 package File;public class digui {public static void main(String[] args) {//猴子吃桃//f(10)1//f(n)-f(n)/2—1f&#xff08;n1&#xff09;//f(n)F(n1)2System.out.println(f(3));}public static int f(int n){if (n10){return 1;}else {return 2*f(n1)…

MEME币热潮结束了?上市成功率仅1.4%!迷因暴富梦醒?洗量超容易,热潮都是假?

近年来&#xff0c;随着加密货币行业的蓬勃发展&#xff0c;各种迷因币(meme coins)也在此浪潮之中纷纷崛起。然而&#xff0c;在专门用于创造迷因币的平台"pump.fun"上&#xff0c;绝大多数迷因币都无法真正成功发行和上市。 最新的数据显示&#xff0c;近日Solana迷…

golang国内proxy设置

go env -w GOPROXYhttps://goproxy.cn,direct经常使用的两个, goproxy.cn 和 goproxy.io 连接分别是 https://goproxy.cn https://goproxy.io 如果遇到某些包下载不下来的情况&#xff0c;可尝试更换数据源 更推荐使用https://goproxy.cn 速度快&#xff0c;缓存的包多 提醒…

树莓派5进行YOLOv8部署的4种不同部署方式检测速度对比:pytorch、onnx、ncnn、tflite

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

关于k8s集群的资源发布方式(灰度/滚动发布)

目录 1.常见的发布方式 2.实现蓝绿发布 3.实现金丝雀发布&#xff08;Canary Release&#xff09; 4.声明式管理方法 1.常见的发布方式 蓝绿发布:两套环境交替升级&#xff0c;旧版本保留一定时间便于回滚优点&#xff1a;用户无感知&#xff0c;部署和回滚速度较快&#…