Flink实时数仓(六)【DWD 层搭建(四)交易域、互动域、用户域实现】

news2025/1/17 13:56:32

前言

        今天的任务是完成 DWD 层剩余的事实表;今年的秋招开得比往年早,所以要抓紧时间了,据了解,今年的 hc 还是不多,要是晚点投铁定寄中寄了;

        今天还是个周末,不过记忆里我好像整个大学都没有好好放松过过一个周末,毕竟只有周末的空教室才是最多的,可以抢到带插座的位置;emm... 等一切尘埃落定一定好好放松放松~

1、交易域下单事务事实表

        昨天我们创建了订单与处理表(把订单明细、订单、订单明细活动、订单明细优惠券等订单相关和表和 MySQL lookup 字典表关联到了一起),就是为了之后做关于订单的事实表的时候不用再去频繁关联造成重复计算;

1.1、实现思路

  • 从 Kafka dwd_trade_order_pre_process 主题读取订单预处理数据
  • 筛选下单明细数据:新增数据(type = 'insert')
  • 写入 Kafka 下单明细主题

1.2、代码实现

1.2.1、读取订单与处理表

这里只需要保留有用的数据,其它字段(比如 old 、order_status 这些字段这里完全没用,是给取消订单表用的)直接丢弃掉 

// TODO 2. 读取订单预处理表
        tableEnv.executeSql("create table dwd_trade_order_pre_process(" +
                "id ," +
                "order_id ," +
                "user_id ," +
                "sku_id ," +
                "sku_name ," +
                "province_id ," +
                "activity_id ," +
                "activity_rule_id ," +
                "coupon_id ," +
                "date_id ," +
                "create_time ," +
                "operate_date_id ," +
                "operate_time ," +
                "source_id ," +
                "source_type_id ," +
                "source_type_name ," +
                "sku_num ," +
                "split_original_amount ," +
                "split_activity_amount ," +
                "split_coupon_amount ," +
                "split_total_amount ," +
                "`type`" +
                ")" + MyKafkaUtil.getKafkaDDL("dwd_trade_order_pre_process", "trade_detail"));

1.2.2、过滤出下单数据

订单预处理表的粒度是商品,所以一旦有订单被下单,这里新增记录数 = 订单内商品件数

// TODO 3. 过滤出下单数据,即新增数据
        Table filterTable = tableEnv.sqlQuery("SELECT " +
                "id ," +
                "order_id ," +
                "user_id ," +
                "sku_id ," +
                "sku_name ," +
                "sku_num ," +
                "province_id ," +
                "activity_id ," +
                "activity_rule_id ," +
                "coupon_id ," +
                "create_time ," +
                "operate_date_id ," +
                "operate_time ," +
                "source_id ," +
                "source_type_id ," +
                "source_type_name ," +
                "split_activity_amount ," +
                "split_coupon_amount ," +
                "split_total_amount" +
                ") " +
                "FROM dwd_trade_order_pre_process " +
                "WHERE `type`='insert'"
        );
        tableEnv.createTemporaryView("filter_table",filterTable);

1.2.3、创建下单数据表

// TODO 4. 创建 dwd 层下单数据表
        tableEnv.executeSql("CREATE TABLE dwd_trade_order_detail (" +
                "        id STRING," +
                "        order_id STRING," +
                "        user_id STRING," +
                "        sku_id STRING," +
                "        sku_name STRING," +
                "        sku_num STRING," +
                "        province_id STRING," +
                "        activity_id STRING," +
                "        activity_rule_id STRING," +
                "        coupon_id STRING," +
                "        create_time STRING," +
                "        operate_date_id STRING," +
                "        operate_time STRING," +
                "        source_id STRING," +
                "        source_type_id STRING," +
                "        source_type_name STRING," +
                "        split_activity_amount STRING," +
                "        split_coupon_amount STRING," +
                "        split_total_amount STRING" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_trade_order_detail"));

1.2.4、将数据写出到 Kafka

// TODO 5. 将数据写出到 kafka
        tableEnv.executeSql("INSERT INTO dwd_trade_order_detail SELECT * FROM filter_table");

2、交易域取消订单事务事实表

这个需求也非常简单

 思路

  • 读取预处理表(上面的下单表不需要保留 old、order_status字段,但是这里必须保留)
  • 过滤出取消订单数据
    • type = 'update'
    • order_status = '1003'
    • old['order_status'] is not null(这个条件可有可无)
  • 写出到 Kafka
public class DwdTradeCancelDetail {
    public static void main(String[] args) throws Exception {
        // TODO 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 生产环境中设置为kafka主题的分区数
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1.1 开启checkpoint
        env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 设置最大共存的checkpoint数量
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); // 固定频率重启: 尝试3次重启,每5s重启一次

        // 1.2 设置状态后端
        env.setStateBackend(new HashMapStateBackend());

        // TODO 2. 读取订单预处理表
        tableEnv.executeSql("create table dwd_trade_order_pre_process(" +
                "id ," +
                "order_id ," +
                "user_id ," +
                "order_status ," +
                "sku_id ," +
                "sku_name ," +
                "province_id ," +
                "activity_id ," +
                "activity_rule_id ," +
                "coupon_id ," +
                "date_id ," +
                "create_time ," +
                "operate_date_id ," +
                "operate_time ," +
                "source_id ," +
                "source_type_id ," +
                "source_type_name ," +
                "sku_num ," +
                "split_original_amount ," +
                "split_activity_amount ," +
                "split_coupon_amount ," +
                "split_total_amount ," +
                "`type` ," +
                "`old` " +
                ")" + MyKafkaUtil.getKafkaDDL("dwd_trade_order_pre_process", "trade_detail"));

        // TODO 3. 过滤出取消订单的数据
        Table filterTable = tableEnv.sqlQuery("" +
                "SELECT" +
                "       id ," +
                "       order_id ," +
                "       user_id ," +
                "       order_status ," +
                "       sku_id ," +
                "       sku_name ," +
                "       province_id ," +
                "       activity_id ," +
                "       activity_rule_id ," +
                "       coupon_id ," +
                "       date_id ," +
                "       create_time ," +
                "       operate_date_id ," +
                "       operate_time ," +
                "       source_id ," +
                "       source_type_id ," +
                "       source_type_name ," +
                "       sku_num ," +
                "       split_original_amount ," +
                "       split_activity_amount ," +
                "       split_coupon_amount ," +
                "       split_total_amount ," +
                "       `type` ," +
                "       `old` " +
                "FROM dwd_trade_order_pre_process " +
                "WHERE `type`='update' " +
                "old['order_status'] is not null" +
                "AND order_status='1003'"
        );
        tableEnv.createTemporaryView("filter_table",filterTable);

        // TODO 4. 创建 dwd 层取消订单表 
        tableEnv.executeSql("CREATE TABLE dwd_trade_cancel_detail" +
                "id STRING," +
                "order_id STRING," +
                "user_id STRING," +
                "order_status STRING," +
                "sku_id STRING," +
                "sku_name STRING," +
                "province_id STRING," +
                "activity_id STRING," +
                "activity_rule_id STRING," +
                "coupon_id STRING," +
                "date_id STRING," +
                "create_time STRING," +
                "operate_date_id STRING," +
                "operate_time STRING," +
                "source_id STRING," +
                "source_type_id STRING," +
                "source_type_name STRING," +
                "sku_num STRING," +
                "split_original_amount STRING," +
                "split_activity_amount STRING," +
                "split_coupon_amount STRING," +
                "split_total_amount STRING," +
                "`type` STRING," +
                "`old` map<String,String> " + MyKafkaUtil.getKafkaSinkDDL("dwd_trade_cancel_detail")
        );
        
        // TODO 5. 写出数据 
        tableEnv.executeSql("INSERT INTO dwd_trade_cancel_detail SELECT * FROM filter_table");
        // TODO 6. 启动任务
        env.execute("DwdTradeCancelDetail");
    }
}

3、交易域支付成功事务事实表

        支付成功表的主表并不是订单表或者订单明细表,而是支付表,在业务系统中存在支付表(payment_info),每行代表一个订单的支付情况(成功或失败)。

        但是我们应该拿它去和下单明细表做关联,因为我们如果之后需要对支付记录做一个数据分析,就应该尽可能保证支付事实表应该有一个细的粒度和丰富的维度,而关联下单明细表之后不仅粒度商品,还增加了很多sku维度(比如商品类型、)

3.1、实现思路

1)设置 ttl

        支付成功事务事实表需要将业务数据库中的支付信息表 payment_info 数据与订单明细表关联。订单明细数据是在下单时生成的,经过一系列的处理进入订单明细主题,通常支付操作在下单后 15min 内完成即可,因此,支付明细数据可能比订单明细数据滞后 15min。考虑到可能的乱序问题,ttl 设置为 15min + 5s。

2)获取订单明细数据

        用户必然要先下单才有可能支付成功,因此支付成功明细数据集必然是订单明细数据集的子集,所以我们直接从 dwd 层的下单明细表去读取;

3)筛选支付表数据

        获取支付类型、回调时间(支付成功时间)、支付成功时间戳。

        生产环境下,用户支付后,业务数据库的支付表会插入一条数据,此时的回调时间和回调内容为空。通常底层会调用第三方支付接口,接口会返回回调信息,如果支付成功则回调信息不为空,此时会更新支付表,补全回调时间和回调内容字段的值,并将 payment_status 字段的值修改为支付成功对应的状态码(本项目为 1602)。支付成功之后,支付表数据不会发生变化。因此,只要操作类型为 update 且状态码为 1602 即为支付成功数据。

由上述分析可知,支付成功对应的业务数据库变化日志应满足两个条件:

(1)payment_status 字段的值为 1602;

(2)操作类型为 update。

4)构建 MySQL-LookUp 字典表

        为的是对支付方式字段做维度退化;

5)关联上述三张表形成支付成功宽表,写入 Kafka 支付成功主题

        支付成功业务过程的最细粒度为一个 sku 的支付成功记录,payment_info 表的粒度与最细粒度相同,将其作为主表。

(1) payment_info 表在订单明细表中必然存在对应数据,主表不存在独有数据,因此通过内连接与订单明细表关联;

(2) 与字典表的关联是为了获取 payment_type 对应的支付类型名称,主表不存在独有数据,通过内连接与字典表关联。下文与字典表的关联同理,不再赘述。

3.2、代码实现

3.2.1、读取 topic_db 数据

 注意:这个程序中我们需要消费三张表,其中两张来组 Kafka 需要我们指定消费者组,所以我们这里可以把它们的组设置为一样的,毕竟都在一个程序里:

// TODO 2. 读取 topic_db 数据(这里的消费者组id尽量和下面保持一致)
        tableEnv.executeSql(MyKafkaUtil.getTopicDb("dwd_trade_pay_detail_suc"));

3.2.2、过滤出支付成功数据

// TODO 3. 过滤出支付成功的数据
        Table paymentInfo = tableEnv.sqlQuery("SELECT " +
                "data['user_id'] user_id, " +
                "data['order_id'] order_id, " +
                "data['payment_type'] payment_type, " +
                "data['callback_time'] callback_time, " +
                "pt " + // 用来和 lookup 关联
                "FROM topic_db " +
                "WHERE `database` = `gmall` " +
                "AND `table` = `payment_info` " +
                "AND `type` = 'update' " +
                "AND data['payment_status'] = '1602'"
        );
        tableEnv.createTemporaryView("payment_info",paymentInfo);

3.2.3、消费下单明细事务事实表

// TODO 3. 过滤出支付成功的数据
        Table paymentInfo = tableEnv.sqlQuery("SELECT " +
                "data['user_id'] user_id, " +
                "data['order_id'] order_id, " +
                "data['payment_type'] payment_type, " +
                "data['callback_time'] callback_time, " +
                "pt " + // 用来和 lookup 关联
                "FROM topic_db " +
                "WHERE `database` = `gmall` " +
                "AND `table` = `payment_info` " +
                "AND `type` = 'update' " +
                "AND data['payment_status'] = '1602'"
        );
        tableEnv.createTemporaryView("payment_info",paymentInfo);

3.2.4、读取 MySQL lookup 表

// TODO 5. 读取 mysql lookup 表
        tableEnv.executeSql(MysqlUtil.getBaseDicLookUpDDL());

3.2.5、关联 3 张表

关联支付成功数据、下单明细和字典表:

        // TODO 6. 关联 3 张表
        Table resultTable = tableEnv.sqlQuery("SELECT " +
                "       od.id order_detail_id," +
                "       od.order_id," +
                "       od.user_id," +
                "       od.sku_id," +
                "       od.sku_name," +
                "       od.province_id," +
                "       od.activity_id," +
                "       od.activity_rule_id," +
                "       od.coupon_id," +
                "       pi.payment_type payment_type_code," +
                "       dic.dic_name payment_type_name," +
                "       pi.callback_time," +
                "       od.source_id," +
                "       od.source_type_code," +
                "       od.source_type_name," +
                "       od.sku_num," +
                "       od.split_activity_amount," +
                "       od.split_coupon_amount," +
                "       od.split_total_amount split_payment_amount " +
                "FROM payment_info pi" +
                "join dwd_trade_order_detail od" +
                "on pi.order_id = od.order_id" +
                "join `base_dic` for system_time as of pi.proc_time as dic" +
                "on pi.payment_type = dic.dic_code ");
        tableEnv.createTemporaryView("result_table",resultTable);

3.2.6、创建 Kafka 支付成功事务事实表并写入数据

// TODO 7. 创建 kafka 支付成功表
        tableEnv.executeSql(
                "CREATE TABLE dwd_trade_pay_detail_suc (" +
                "       order_detail_id string," +
                "       order_id string," +
                "       user_id string," +
                "       sku_id string," +
                "       sku_name string," +
                "       province_id string," +
                "       activity_id string," +
                "       activity_rule_id string," +
                "       coupon_id string," +
                "       payment_type_code string," +
                "       payment_type_name string," +
                "       callback_time string," +
                "       source_id string," +
                "       source_type_code string," +
                "       source_type_name string," +
                "       sku_num string," +
                "       split_activity_amount string," +
                "       split_coupon_amount string," +
                "       split_payment_amount string," +
                "       primary key(order_detail_id) not enforced "
                + MyKafkaUtil.getUpsertKafkaDDL("dwd_trade_pay_detail_suc")
        );

这里加主键是为了方便让相同 key 到一个分区中,方便下游做去重(也可以不加);

最后的 env 不需要执行,因为我们执行的是 tableEnv ,如果执行 env 的话会有警告:没有操作可执行;

4、交易域退单事务事实表

退单指的是支付成功后取消订单的操作,而取消订单指的是还没支付时取消订单的操作;

4.1、思路

        退单并不需要关联下单事务事实表,因为退单表的粒度本身就是商品(退单表包含 sku_id,有了这个 sku_id 足够之后去DIM关联 sku 维表了);我们这里需要关联订单表,为的是从中获得一些分析维度(比如 province_id,我认为还可以添加活动id和优惠券id等维度外键)。

退单操作会影响到订单表(order_status 从 1002['已支付'] 变为 1005['退款中']),所以我们可以在过滤订单表的时候,同时过滤出退单的数据,减少性能的消耗;

注意:这里我们关联的是订单表,因为退单肯定会触发修改订单表,它俩是几乎同时发生的,所以我们可以一次性从 topic_db 中读取出来。也没必要去订单预处理表去读(订单预处理表还依赖订单表,所以根本没必要)。

所以,大致思路就是:

  • 创建 topic_db 表
  • 过滤出订单表中的退单数据和退单表
    • 退单表过滤(table = 'order_refund_info')
    • 订单表中的退单数据(order_status='1005' & type='update' & order_status字段发生更新)
  • 创建 mysql 字典表的 lookup 表(获得退款类型和退款原因类型)
  • 对 3 张表进行关联
  • 创建 kafak dwd_trade_order_refund 表
  • 写入数据

4.2、实现

4.2.1、创建 topic_db 表并过滤出退单数据

// TODO 2. 读取 topic_db 数据
        tableEnv.executeSql(MyKafkaUtil.getTopicDb("dwd_trade_order_refund"));

        // TODO 3. 过滤出退单表
        Table refundTable = tableEnv.sqlQuery("SELECT " +
                "data['d'] id, " +
                "data['user_id'] user_id, " +
                "data['order_id'] order_id, " +
                "data['sku_id'] sku_id, " +
                "data['refund_type'] refund_type, " +
                "data['refund_num'] refund_num, " +
                "data['refund_amount'] refund_amount, " +
                "data['refund_reason_type'] refund_reason_type, " +
                "data['refund_reason_txt'] refund_reason_txt, " +
                "data['create_time'] create_time, " +
                "pt " +
                "FROM topic_db " +
                "WHERE `database`='gmall' " +
                "AND `table`='order_refund_info' "
        );
        tableEnv.createTemporaryView("order_refund_info",refundTable);

4.2.2、过滤出订单表中的退单数据

// TODO 4. 过滤出订单表中的退单数据
        Table orderInfoRefund = tableEnv.sqlQuery("SELECT " +
                "data['id'] id," +
                "data['province_id'] province_id," +
                "`old` " +
                "FROM topic_db " +
                "WHERE `database`='gmall' " +
                "AND `table`='order_info ' " +
                "AND `type` = 'update' " +
                "AND order_status = '1005' " +
                "AND old['order_status'] is not null"
        );
        tableEnv.createTemporaryView("order_info_refund",orderInfoRefund);

4.2.3、创建 MySQL lookup 退单表

为的是获得退款类型和退款原因类型

// TODO 5. 读取 mysql lookup 表
        tableEnv.executeSql(MysqlUtil.getBaseDicLookUpDDL());

4.2.4、关联 3 张表的数据

// TODO 6. 关联 3 张表
        Table resultTable = tableEnv.sqlQuery("SELECT " +
                "ri.id," +
                "ri.user_id," +
                "ri.order_id," +
                "ri.sku_id," +
                "oi.province_id," +
                "date_format(ri.create_time,'yyyy-MM-dd') date_id," +
                "ri.create_time," +
                "ri.refund_type," +
                "type_dic.dic_name," +
                "ri.refund_reason_type," +
                "reason_dic.dic_name," +
                "ri.refund_reason_txt," +
                "ri.refund_num," +
                "ri.refund_amount " +
                "from order_refund_info ri" +
                "join " +
                "order_info_refund oi" +
                "on ri.order_id = oi.id" +
                "join " +
                "base_dic for system_time as of ri.proc_time as type_dic" +
                "on ri.refund_type = type_dic.dic_code" +
                "join" +
                "base_dic for system_time as of ri.proc_time as reason_dic" +
                "on ri.refund_reason_type=reason_dic.dic_code"
        );
        tableEnv.createTemporaryView("result_table",resultTable);

4.2.5、创建退单事务事实表并写入数据

// TODO 7. 创建 Kafka 退单事务事实表
        tableEnv.executeSql("CREATE TABLE dwd_trade_order_refund (" +
                "id string," +
                "user_id string," +
                "order_id string," +
                "sku_id string," +
                "province_id string," +
                "date_id string," +
                "create_time string," +
                "refund_type_code string," +
                "refund_type_name string," +
                "refund_reason_type_code string," +
                "refund_reason_type_name string," +
                "refund_reason_txt string," +
                "refund_num string," +
                "refund_amount string," +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_trade_order_refund")
        );

        // TODO 8. 写入数据
        tableEnv.executeSql("INSERT INTO dwd_trade_order_refund SELECT * FROM result_table");

5、交易域退款成功事务事实表

退款成功这个业务过程会影响到退款表(插入数据)、订单表(订单状态)和退单表(退款状态)

主要任务

  • 从退款表中提取退款成功的数据,关联字典表对支付类型做维度退化
  • 从订单表中提取退款成功的订单数据
  • 从退单表中提取退款成功的商品明细数据

补充

  • 退款表的粒度同样是商品(包含 sku_id)
  • 关联订单表为的是获取(province_id 和 user_id)
  • 关联退单表是为的获得商品的 sku_num
  • 关联字典表是为了对退款表的支付类型做维度退化

5.1、思路

思路和前面都是一样的:

  • 创建 topic_db 表
  • 过滤出退款表中退款成功的数据
    • refund_status = '0705'
  • 过滤出订单表和退单表中退款成功的数据
    • 订单表:order_status = '1006'
    • 退单表:redund_status = '0705'
  • 创建字典表的 lookup 表(为的是对支付类型字段做维度退化)
  • 关联 4 张表
    • 根据 order_id 进行 join
  • 创建 Kafka 主题并写入数据

5.2、实现

一样的套路,不解释了

public class DwdTradeRefundPaySuc {

    public static void main(String[] args) {
        // TODO 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 生产环境中设置为kafka主题的分区数
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1.1 开启checkpoint
        env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 设置最大共存的checkpoint数量
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); // 固定频率重启: 尝试3次重启,每5s重启一次

        // 1.2 设置状态后端
        env.setStateBackend(new HashMapStateBackend());

        // 1.3 设置状态 ttl
        tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5));

        // TODO 2. 读取 topic_db 数据
        tableEnv.executeSql(MyKafkaUtil.getTopicDb("dwd_trade_refund_pay_suc"));

        // TODO 3. 过滤出退款表
        Table refundPayment = tableEnv.sqlQuery("SELECT " +
                "data['id'] id," +
                "data['order_id'] order_id," +
                "data['sku_id'] sku_id," +
                "data['payment_type'] payment_type," +
                "data['callback_time'] callback_time," +
                "data['total_amount'] total_amount," +
                "pt " +
                "FROM topic_db " +
                "WHERE `database`='gmall' " +
                "AND `table` = 'refund_payment' " +
                "AND data['refund_status']='0705' " +
                "AND `type` = 'update' " +
                "AND old['refund_status'] is not null"
        );
        tableEnv.createTemporaryView("refund_payment",refundPayment);

        // TODO 4. 过滤出订单表中退款成功的数据
        Table orderRefundSuc = tableEnv.sqlQuery("SELECT " +
                "data['id'] id, " +
                "data['user_id'] user_id, " +
                "data['province_id'] province_id, " +
                "pt " +
                "FROM topic_db " +
                "WHERE `database`='gmall' " +
                "AND `table`='order_info' " +
                "AND `type` = 'update'" +
                "AND data['order_status'] = '1006' " +
                "AND old['order_status'] is not null"
        );
        tableEnv.createTemporaryView("order_refund_suc",orderRefundSuc);

        // TODO 5. 过滤出退单表中退款成功的数据
        Table refundSuc = tableEnv.sqlQuery("SELECT " +
                "data['order_id'] order_id, " +
                "data['sku_id'] sku_id, " +
                "data['refund_num'] refund_num, " +
                "pt " +
                "FROM topic_db " +
                "WHERE `database`='gmall' " +
                "AND `table`='order_refund_info' " +
                "AND `type` = 'update'" +
                "AND data['order_status'] = '0705' " +
                "AND old['refund_status'] is not null"
        );
        tableEnv.createTemporaryView("order_refund_info",refundSuc);

        // TODO 6. 创建 MySQL lookup表
        tableEnv.executeSql(MysqlUtil.getBaseDicLookUpDDL());


        // TODO 7. 关联 4 张表
        Table resultTable = tableEnv.sqlQuery("select" +
                "rp.id," +
                "oi.user_id," +
                "rp.order_id," +
                "rp.sku_id," +
                "oi.province_id," +
                "rp.payment_type," +
                "dic.dic_name payment_type_name," +
                "date_format(rp.callback_time,'yyyy-MM-dd') date_id," +
                "rp.callback_time," +
                "ri.refund_num," +
                "rp.total_amount," +
                "from refund_payment rp " +
                "join " +
                "order_info oi" +
                "on rp.order_id = oi.id" +
                "join" +
                "order_refund_info ri" +
                "on rp.order_id = ri.order_id" +
                "and rp.sku_id = ri.sku_id" +
                "join " +
                "base_dic for system_time as of rp.proc_time as dic" +
                "on rp.payment_type = dic.dic_code");
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 8. 创建 Kafka 退款成功事务事实表
        tableEnv.executeSql("create table dwd_trade_refund_pay_suc(" +
                "id string," +
                "user_id string," +
                "order_id string," +
                "sku_id string," +
                "province_id string," +
                "payment_type_code string," +
                "payment_type_name string," +
                "date_id string," +
                "callback_time string," +
                "refund_num string," +
                "refund_amount string "+
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_trade_refund_pay_suc"));

        // TODO 8. 写入数据
        tableEnv.executeSql("INSERT INTO dwd_trade_refund_pay_suc SELECT * FROM result_table");
    }
}

6、工具域优惠券类事务事实表

我们先看看业务系统中 coupon_use 中的数据:

        这张表的粒度是一个订单,每行数据代表的哪个用户在什么时候领取的券,什么时候使用了券( using_time 表示使用时间,used_time 表示支付时间),什么时候过期等信息;

6.1、思路

        每发生优惠券领取这样一个业务过程,coupon_use 数据库中就会 insert 一条新的数据,所以我们只需要直接从 topic_db 中过滤出 type = 'insert' 的数据即可;

        而关于优惠券下单使用(下单)和优惠券使用(支付)这两个业务过程除了过滤出 type = 'update' 之外,只需要分别对 using_time(下单) 和 used_time(支付) 是否为 null 进行过滤即可;

总结

  • 优惠券领取
    • type = ''insert'
  • 优惠券使用(下单)
    • type = 'update' AND using_time is not null AND used_time is null
    • 或者 type = 'update' AND data[coupon_status] = '1402' AND old[coupon_status] = '1401'
  • 优惠券使用(支付)
    • type = 'update' AND used_time is not null
    • 或者 type = 'update' AND data[coupon_status] = '1402' AND old[coupon_status] = '1401'

6.2、实现

6.2.1、工具域优惠券领取事务事实表

很简单,不多废话了

public class DwdToolCouponGet {
public static void main(String[] args) throws Exception {

        // TODO 1. 环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. 状态后端设置
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "lyh");

        // TODO 3. 从 Kafka 读取业务数据,封装为 Flink SQL 表
        tableEnv.executeSql("create table `topic_db`(" +
                "`database` string," +
                "`table` string," +
                "`data` map<string, string>," +
                "`type` string," +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_tool_coupon_get"));

        // TODO 4. 读取优惠券领用数据,封装为表
        Table resultTable = tableEnv.sqlQuery("select" +
                "data['id']," +
                "data['coupon_id']," +
                "data['user_id']," +
                "date_format(data['get_time'],'yyyy-MM-dd') date_id," +
                "data['get_time']," +
                "ts" +
                "from topic_db" +
                "where `table` = 'coupon_use'" +
                "and `type` = 'insert'");
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 5. 建立 Kafka-Connector dwd_tool_coupon_get 表
        tableEnv.executeSql("create table dwd_tool_coupon_get (" +
                "id string," +
                "coupon_id string," +
                "user_id string," +
                "date_id string," +
                "get_time string," +
                "ts string" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_tool_coupon_get"));

        // TODO 6. 将数据写入 Kafka-Connector 表
        tableEnv.executeSql("insert into dwd_tool_coupon_get select * from result_table");
    }
}

6.2.2、工具域优惠券使用(下单)事务事实表

public class DwdToolCouponOrder {
    public static void main(String[] args) throws Exception {

        // TODO 1. 环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. 状态后端设置
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "lyh");

        // TODO 3. 从 Kafka 读取业务数据,封装为 Flink SQL 表
        tableEnv.executeSql("create table `topic_db` (" +
                "`database` string," +
                "`table` string," +
                "`data` map<string, string>," +
                "`type` string," +
                "`old` map<string, string>," +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_tool_coupon_order"));

        // TODO 4. 读取优惠券领用表数据,筛选满足条件的优惠券下单数据
        Table couponUseOrder = tableEnv.sqlQuery("select" +
                "data['id'] id," +
                "data['coupon_id'] coupon_id," +
                "data['user_id'] user_id," +
                "data['order_id'] order_id," +
                "date_format(data['using_time'],'yyyy-MM-dd') date_id," +
                "data['using_time'] using_time," +
                "ts" +
                "from topic_db" +
                "where `table` = 'coupon_use'" +
                "and `type` = 'update'" +
                "and data['coupon_status'] = '1402'" +
                "and `old`['coupon_status'] = '1401'");

        tableEnv.createTemporaryView("result_table", couponUseOrder);

        // TODO 5. 建立 Kafka-Connector dwd_tool_coupon_order 表
        tableEnv.executeSql("create table dwd_tool_coupon_order(" +
                "id string," +
                "coupon_id string," +
                "user_id string," +
                "order_id string," +
                "date_id string," +
                "order_time string," +
                "ts string" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_tool_coupon_order"));

        // TODO 6. 将数据写入 Kafka-Connector 表
        tableEnv.executeSql("" +
                "insert into dwd_tool_coupon_order select " +
                "id," +
                "coupon_id," +
                "user_id," +
                "order_id," +
                "date_id," +
                "using_time order_time," +
                "ts from result_table");
    }
}

6.2.3、工具域优惠券使用(支付)事务事实表

public class DwdToolCouponPay {
    public static void main(String[] args) throws Exception {

        // TODO 1. 环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. 状态后端设置
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "lyh");

        // TODO 3. 从 Kafka 读取业务数据,封装为 Flink SQL 表
        tableEnv.executeSql("create table `topic_db` (" +
                "`database` string," +
                "`table` string," +
                "`data` map<string, string>," +
                "`type` string," +
                "`old` string," +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_tool_coupon_pay"));

        // TODO 4. 读取优惠券领用表数据,筛选优惠券使用(支付)数据
        Table couponUsePay = tableEnv.sqlQuery("select" +
                "data['id'] id," +
                "data['coupon_id'] coupon_id," +
                "data['user_id'] user_id," +
                "data['order_id'] order_id," +
                "date_format(data['used_time'],'yyyy-MM-dd') date_id," +
                "data['used_time'] used_time," +
                "`old`," +
                "ts" +
                "from topic_db" +
                "where `table` = 'coupon_use'" +
                "and `type` = 'update'" +
                "and data['used_time'] is not null");

        tableEnv.createTemporaryView("coupon_use_pay", couponUsePay);

        // TODO 5. 建立 Kafka-Connector dwd_tool_coupon_order 表
        tableEnv.executeSql("create table dwd_tool_coupon_pay(" +
                "id string," +
                "coupon_id string," +
                "user_id string," +
                "order_id string," +
                "date_id string," +
                "payment_time string," +
                "ts string" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_tool_coupon_pay"));

        // TODO 6. 将数据写入 Kafka-Connector 表
        tableEnv.executeSql("" +
                "insert into dwd_tool_coupon_pay select " +
                "id," +
                "coupon_id," +
                "user_id," +
                "order_id," +
                "date_id," +
                "used_time payment_time," +
                "ts from coupon_use_pay");
    }
}

7、互动域

7.1、思路分析

7.2、实现

7.2.1、互动域收藏商品事务事实表

 我可以看一下业务系统中的收藏表:

        可以看到,收藏表的粒度是商品,每收藏一件商品 favor_info 表就会 insert 一条数据,每取消收藏一件商品,并不会删除记录,而是将收藏表的 is_cancel 字段设为 1 并给 cancel_time 字段补充值;所以过滤条件很简单:

  1. insert 
  2. update 并且 is_cancel = 1 (取消收藏后又再次收藏)
public class DwdInteractionFavorAdd {
public static void main(String[] args) throws Exception {

        // TODO 1. 环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // TODO 2. 状态后端设置
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "lyh");

        // TODO 3. 从 Kafka 读取业务数据,封装为 Flink SQL 表
        tableEnv.executeSql("create table topic_db(" +
                "`database` string," +
                "`table` string," +
                "`type` string," +
                "`data` map<string, string>," +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_interaction_favor_add"));

        // TODO 4. 读取收藏表数据
        Table favorInfo = tableEnv.sqlQuery("select" +
                "data['id'] id," +
                "data['user_id'] user_id," +
                "data['sku_id'] sku_id," +
                "date_format(data['create_time'],'yyyy-MM-dd') date_id," +
                "data['create_time'] create_time," +
                "ts" +
                "from topic_db" +
                "where `table` = 'favor_info'" +
                "and (`type` = 'insert' or (`type` = 'insert' and data['is_cancel'] = '1'))");
        tableEnv.createTemporaryView("favor_info", favorInfo);

        // TODO 5. 创建 Kafka-Connector dwd_interaction_favor_add 表
        tableEnv.executeSql("create table dwd_interaction_favor_add (" +
                "id string," +
                "user_id string," +
                "sku_id string," +
                "date_id string," +
                "create_time string," +
                "ts string" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_interaction_favor_add"));

        // TODO 6. 将数据写入 Kafka-Connector 表
        tableEnv.executeSql("" +
                "insert into dwd_interaction_favor_add select * from favor_info");
    }
}

7.2.2、互动域评价事务事实表

任务:建立 MySQL-Lookup 字典表,读取评论表数据,关联字典表以获取评价(好评、中评、差评、自动),将结果写入 Kafka 评价主题

所以这张表的逻辑页很简单:

  • 从 topic_db 中过滤出评价表的数据
  • 从评价表中过滤出 type='insert' 的数据(其实也只有 insert 类型了)
  • 创建 mysql lookup 字典表(为的是对评价等级进行维度退化)
  • 关联两张表
  • 写入Kafka
public class DwdInteractionComment {
public static void main(String[] args) throws Exception {

        // TODO 1. 环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 获取配置对象
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        // 为表关联时状态中存储的数据设置过期时间
        configuration.setString("table.exec.state.ttl", "5 s");

        // TODO 2. 状态后端设置
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
        System.setProperty("HADOOP_USER_NAME", "lyh");

        // TODO 3. 从 Kafka 读取业务数据,封装为 Flink SQL 表
        tableEnv.executeSql("create table topic_db(" +
                "`database` string," +
                "`table` string," +
                "`type` string," +
                "`data` map<string, string>," +
                "`proc_time` as PROCTIME()," +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_interaction_comment"));

        // TODO 4. 读取评论表数据
        Table commentInfo = tableEnv.sqlQuery("select" +
                "data['id'] id," +
                "data['user_id'] user_id," +
                "data['sku_id'] sku_id," +
                "data['order_id'] order_id," +
                "data['create_time'] create_time," +
                "data['appraise'] appraise," +
                "proc_time," +
                "ts" +
                "from topic_db" +
                "where `table` = 'comment_info'" +
                "and `type` = 'insert'");
        tableEnv.createTemporaryView("comment_info", commentInfo);

        // TODO 5. 建立 MySQL-LookUp 字典表
        tableEnv.executeSql(MysqlUtil.getBaseDicLookUpDDL());

        // TODO 6. 关联两张表
        Table resultTable = tableEnv.sqlQuery("select" +
                "ci.id," +
                "ci.user_id," +
                "ci.sku_id," +
                "ci.order_id," +
                "date_format(ci.create_time,'yyyy-MM-dd') date_id," +
                "ci.create_time," +
                "ci.appraise," +
                "dic.dic_name," +
                "ts" +
                "from comment_info ci" +
                "join" +
                "base_dic for system_time as of ci.proc_time as dic" +
                "on ci.appraise = dic.dic_code");
        tableEnv.createTemporaryView("result_table", resultTable);

        // TODO 7. 建立 Kafka-Connector dwd_interaction_comment 表
        tableEnv.executeSql("create table dwd_interaction_comment(" +
                "id string," +
                "user_id string," +
                "sku_id string," +
                "order_id string," +
                "date_id string," +
                "create_time string," +
                "appraise_code string," +
                "appraise_name string," +
                "ts string" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_interaction_comment"));

        // TODO 8. 将关联结果写入 Kafka-Connector 表
        tableEnv.executeSql("" +
                "insert into dwd_interaction_comment select * from result_table");
    }
}

7.2.3、用户域用户注册事务事实表

主要任务:读取用户表数据,获取注册时间,将用户注册信息写入 Kafka 用户注册主题

如何分辨是新用户还是老用户很简单:新 insert 进来的都是新用户,所以思路很简单:

  • 从 topic_db 中过滤出用户表的数据
  • 从用户表中过滤出 type = 'insert' 的数据
  • 写入 Kafka
public class DwdUserRegister {
public static void main(String[] args) throws Exception {

        // TODO 1. 环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.getConfig().setLocalTimeZone(ZoneId.of("GMT+8"));

        // TODO 2. 启用状态后端
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.setRestartStrategy(
                RestartStrategies.failureRateRestart(3, Time.days(1L), Time.minutes(3L))
        );
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck");
        System.setProperty("HADOOP_USER_NAME", "lyh");

        // TODO 3. 从 Kafka 读取业务数据,封装为 Flink SQL 表
        tableEnv.executeSql("create table topic_db(" +
                "`database` string," +
                "`table` string," +
                "`type` string," +
                "`data` map<string, string>," +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaDDL("topic_db", "dwd_trade_order_detail"));

        // TODO 4. 读取用户表数据
        Table userInfo = tableEnv.sqlQuery("select" +
                "data['id'] user_id," +
                "data['create_time'] create_time," +
                "ts" +
                "from topic_db" +
                "where `table` = 'user_info'" +
                "and `type` = 'insert'");
        tableEnv.createTemporaryView("user_info", userInfo);

        // TODO 5. 创建 Kafka-Connector dwd_user_register 表
        tableEnv.executeSql("create table `dwd_user_register`(" +
                "`user_id` string," +
                "`date_id` string," +
                "`create_time` string," +
                "`ts` string" +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_user_register"));

        // TODO 6. 将输入写入 Kafka-Connector 表
        tableEnv.executeSql("insert into dwd_user_register" +
                "select " +
                "user_id," +
                "date_format(create_time, 'yyyy-MM-dd') date_id," +
                "create_time," +
                "ts" +
                "from user_info");

    }
}

总结

        至此,DWD 层搭建完毕,实时数仓的 DWD 层没有离线数仓那么多分类(周期快照、累积快照等),所以做起来虽然不是那么简单,但是有迹可循;

        明天开始就是 DWS 层的搭建了,终于可以用到 clickhouse 了,浅浅期待一下~

        这次做实时数仓项目比离线快很多,但是并不是囫囵吞枣,看视频三年了,是在水视频过任务还是自己真心想理解一个项目自己很清楚。这个实时数仓和之前学的离线数仓的数据源都是一样的,对这个模拟出来的业务系统比较熟悉了,而且离线和实时有很多共同点所以学的很快,但是毕竟时间也花费了不少,每天的早八晚八;

        很多人做项目也好,学新技术也好,看到几十个小时的时长就想着过任务一样,自己骗自己看过视频就算学会了,就算过去了,好像再也不用看了一样。前期基础不好好学,后期很难有那种"灵光乍现"、"触类旁通"的感觉,因为知识储备就不够。就像我自己每天说要看书(今天下单一本《乡土中国》),但是老想想着刷刷抖音,骗自己抖音的地摊文化更有营养,到头来现在在写作或者表达的时候还是经常词穷;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1979384.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何实现ElementUI表单项label的文字提示?

在Vue和ElementUI的丰富组件库中,定制化表单是常见的需求之一。那么如何在表单项label后添加文字提示,以提升用户体验呢? 首先我们来看一下效果图: 这里我们鼠标移动到❓图标上就会出现提示 在 ElementUI 中,el-form-item 组件允许使用 slot 自定义 label。通过在 el-fo…

Boost:asio网络编程从同步到异步

文章目录 同步服务器客户端服务端 异步服务器(有问题)异步服务器优化 在学TCP的时候&#xff0c;写的第一个服务器就是一个echo服务器&#xff0c;那在Boost网络编程中&#xff0c;自然也先写一个echo服务器练练手 同步服务器 客户端 #include <iostream> #include &l…

<数据集>BDD100K人车识别数据集<目标检测>

数据集格式&#xff1a;VOCYOLO格式 图片数量&#xff1a;15807张 标注数量(xml文件个数)&#xff1a;15807 标注数量(txt文件个数)&#xff1a;15807 标注类别数&#xff1a;7 标注类别名称&#xff1a; [pedestrian, car, bus, rider, motorcycle, truck, bicycle] 序号…

Java-变量,运算符,输入与输出

目录 一&#xff0c;语法基础 1.基本Java程序 2.语法基础 2.1 变量 2.2 常量限制(fiinal)类比C中的const 2.3 类型转化 2.4 运算符 2.5 表达式 2.5 输入与输出 2.5.1 输入 2.5.2 输出 一&#xff0c;语法基础 1.基本Java程序 public class Main{public static void…

差分放大电路

目录 引出 复合管 直接耦合放大电路 问题: 怎么抑制 初代电路(已引入负反馈之后) 分析 怎么解决 镜像电路 两个概念 分析直流通路: 分析交流电路: 差分放大电路的分析 交流通路 简化 H参数等效 可以得到 其他接法 引出 复合管 目的:获得更大的放大倍数 多只…

3个二创文案生成器,让文案创作变简单

在当今数字时代&#xff0c;内容创作已经成为了一项非常重要的工作。无论是为了推广产品、营销服务&#xff0c;还是仅仅为了吸引读者&#xff0c;优质的文案都是至关重要的。然而&#xff0c;对于许多人来说&#xff0c;写出令人印象深刻的文案并不容易。这就是为什么二创文案…

基于Django框架的挂号诊疗系统(源码+论文+部署讲解等)

博主介绍&#xff1a;✌全网粉丝10W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流✌ 技术栈介绍&#xff1a;我是程序员阿龙&#xff…

【优秀python系统案例】基于python Flask的电影票房数据爬取与可视化系统的设计与实现

第1章 绪论 1.1 研究背景及意义 进入21世纪&#xff0c;特别是第二个十年&#xff0c;世界互联网取得了惊人的快速发展。根据分析师玛丽米克尔 (Mary Mikel) 2016年发布的一份互联网趋势报告&#xff0c;到2016年年中&#xff0c;全球互联网用户超过30亿&#xff0c;约占全球…

职场上的人情世故,你知多少?

对于职场新人来说&#xff0c;在学习人情世故时&#xff0c;不仅要学会哪些事情该做&#xff0c;还需要知道哪些事情千万不能做&#xff0c;这样才能让自己起码不会得罪别人&#xff0c;甚至得到更多的晋升机会。学会下面的四大职场规则&#xff0c;能让你的职场生涯更顺利。 …

MUSE Multi-View Contrastive Learningfor Heterophilic Graphs

发表于:CIKM 推荐指数: #paper/⭐ 一句话总结:融合了GCN(A,X)和GCN(A,I),创新性不足,因此只能B会 流程: 融合部分: h i f h i s λ i h i c h_i^fh_i^s\lambda_ih_i^c hif​his​λi​hic​ 由于有n个 λ \lambda λ.因此作者加了如下优化: L ϕ ∑ i 1 N λ i s ( h i …

贪心算法之货仓选址问题

#include<stdio.h> #include<stdlib.h> #include<math.h>//贪心算法之货仓选址问题/*** void* p是万能指针&#xff0c;可以和其它任意类型的指针进行转换&#xff0c;前提是确保转换是合法的*/ //写好用于qsort的比较函数&#xff0c;这里写的函数一般用于…

【K8S】为什么需要Kubernetes?

文章目录 1 什么是Kubernetes&#xff1f;2 三种常见的应用部署方式2.1 传统部署2.2 虚拟化部署2.3 容器化部署 3 Kubernetes的特点写在最后 1 什么是Kubernetes&#xff1f; Kubernetes是 一个开源的&#xff0c;用于管理云平台中多个主机上的容器化应用&#xff0c;Kubernet…

分享6款有助于写论文能用到的软件app!

在学术写作中&#xff0c;选择合适的软件和工具可以大大提高效率和质量。以下是六款有助于写论文的软件app推荐&#xff0c;其中特别重点介绍千笔-AIPassPaPer这款AI原创论文写作平台。 1. 千笔-AIPassPaPer 千笔-AIPassPaPer是一款功能全面且高效的AI原创论文写作平台。它能…

RabbitMQ高级特性 - 非持久化 / 持久化(交换机、队列、消息)

文章目录 RabbitMQ 持久化机制概述实现非持久化&#xff08;交换机、队列、消息&#xff09;实现持久化&#xff08;交换机、队列、消息&#xff09; RabbitMQ 持久化机制 概述 前面讲到了 生产者消息确认机制 和 消费者消息确认机制&#xff0c;保证了消息传输的可靠性&#…

断电引起redo和数据文件不一致故障恢复---惜分飞

有些时候故障总是来的让人非常意外,这个在准备停机迁移数据库之前的几分钟由于某种原因直接导致主机掉电,再次开机数据库无法启动 Sat Aug 03 23:10:37 2024 Successful mount of redo thread 1, with mount id 3696805928 Database mounted in Exclusive Mode Lost write prot…

【考研高数】tan(arcsin x)、tan(arccos x)、sin(arccos x)、cos(arcsin x) 等于多少?

在做题的时候&#xff0c;我们可能会用到下面这几个式子的值&#xff0c;在这里&#xff0c;「荒原之梦考研数学」先给出结论&#xff0c;在这些结论的后面&#xff0c;也给同学们放上了具体的证明过程&#xff1a; tan ⁡ ( arcsin ⁡ x ) x 1 − x 2 \tan(\arcsin x) \frac…

深度分析AI大模型赋能后,内容生产行业的发展现状

国内首部由AI全流程参与制作的微短剧《补天》近日正式发布&#xff0c;从剧本构思、角色设定到场景设计以及后期剪辑&#xff0c;各个环节均深度融入了AI技术&#xff0c;展现了人工智能在内容创作方面的全面能力。 AI大模型的兴起正在深刻地改变着内容生产的方式&#xff0c;…

Animate软件基本概念:帧及关键帧

FlashASer&#xff1a;AdobeAnimate2021软件零基础入门教程https://zhuanlan.zhihu.com/p/633230084 FlashASer&#xff1a;实用的各种Adobe Animate软件教程https://zhuanlan.zhihu.com/p/675680471 FlashASer&#xff1a;Animate教程及作品源文件https://zhuanlan.zhihu.co…

VBA读取不带后缀名文本文件的方法(解决Unix文本文件在Windows下变成一行的读取)

VBA在操作Excel等Office软件方面有天然的优势&#xff0c;虽说现在Python的Pandas&#xff0c;openpyxl和Java的poi包都可以处理Excel文件&#xff0c;但有两个问题&#xff1a;首先&#xff0c;目标电脑上必须先按照Java或python环境&#xff0c;如果在一些机构内部处于安全原…

库存超卖问题解决方式

文章目录 超卖问题解决方式什么是库存超卖问题&#xff1f;乐观锁和悲观锁的定义超卖问题解决方式一、悲观锁1.jvm单机锁2.通过使用mysql的行锁&#xff0c;使用一个sql解决并发访问问题3.使用mysql的悲观锁解决4. 使用redis分布式锁来解决 二、乐观锁解决1.版本号2. CAS法&…