Flink 实时数仓(八)【DWS 层搭建(二)流量域、用户域、交易域搭建】

news2024/11/25 8:20:05

前言

        今天的任务是完成流量域最后一个需求、用户域的两个需求以及交易域的部分需求;

1、流量域页面浏览各窗口汇总表

任务:从 Kafka 页面日志主题读取数据,统计当日的首页和商品详情页独立访客数。

注意:一般我们谈到访客,指的是 mid;而用户才是 uid;

1.1、思路

  • 消费 Kafka dwd_traffic_page_log
  • 过滤出 page_id = 'home' 或 'good_detail' 的数据
  • 按照 mid 分组
  • 使用状态编程为每个 mid 维护两个状态:首页的末次访问日期,商品详情页的末次访问日期
    • 每新来一条数据就判断它的两个状态是否为 null
      • 如果为 null,则给状态赋值
      • 如果不为 null,则不做操作
    • 当两个状态中有一个不为 null 时,发送数据到下游
  • 开窗聚合(实时计算更新报表,这里开窗用的是 windowAll() ,因为上一步发送下来的数据已经不再是键控流了)
  • 写出到 clickhouse

1.2、实现

1.2.1、创建 ck 表并创建 Java Bean

首先创建 ck 表结构,和前面的表一样,主要的字段就是:维度 + 度量值 (这里没有粒度,因为我们统计的是一个宏观的统计结果信息,到 ADS 都不用加工),这里的 stt 和 edt 依然是作为 ck 表的 order by 字段防止数据重复;ts 字段作为 ck 的版本字段;这里 order by 字段取窗口起止时间,因为窗口是基于事件时间的,所以不用担心任务挂了之后重复消费造成数据重复的问题,ck 会自动根据 order by 字段进行去重;

create table if not exists dws_traffic_page_view_window
(
    stt               DateTime,
    edt               DateTime,
    home_uv_ct        UInt64,
    good_detail_uv_ct UInt64,
    ts                UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);

 创建 ck 表对应的 JavaBean:

@Data
@AllArgsConstructor
public class TrafficHomeDetailPageViewBean {
    // 窗口起始时间
    String stt;

    // 窗口结束时间
    String edt;

    // 首页独立访客数
    Long homeUvCt;

    // 商品详情页独立访客数
    Long goodDetailUvCt;

    // 时间戳
    Long ts;
}

1.2.2、读取页面日志并过滤出首页与商品详情页

这里我们不仅要过滤还希望尽量顺便把数据转换为 JSONObject 格式,所以选用 flatMap 最为合适:

  • 过滤出 page_id 为 home 或者 good_detail 的数据
// TODO 3. 读取 dwd_traffic_page_log 的数据
        String groupId = "dws_traffic_page_view_window";
        DataStreamSource<String> pageLog = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_traffic_page_log", groupId));

        // TODO 4. 转为 json 并过滤出首页和商品详情页
        SingleOutputStreamOperator<JSONObject> filterDS = pageLog.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                JSONObject jsonObject = JSON.parseObject(value);
                String page_id = jsonObject.getJSONObject("page").getString("page_id");
                if (page_id != null) {
                    if (page_id.equals("home") || page_id.equals("good_detail")) {
                        out.collect(jsonObject);
                    }
                }
            }
        });

1.2.3、提取事件时间并生成水位线

// TODO 5. 提取事件时间生成水位线
        filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                        return element.getLong("ts");
                    }
                })
        );

1.2.4、状态编程过滤出独立访客

这里使用富函数的 flatMap,因为富函数中才有 open(在 open 方法中初始化状态)、close等方法,以及获取上下文对象(通过上下文对象给状态描述器设置ttl并初始化)等高级操作;

这里 flatMap 的输出类型我们设置为之前写好的 ck 表对应的 JavaBean ,方便直接插入到 ck中;

这里我们同样可以给状态设置一个 TTL 防止长时间访客未访问状态存储浪费;这里两个状态任意一个不为 null 即可输出:

        // TODO 6. 状态编程(按照mid分组)过滤出独立访客
        KeyedStream<JSONObject, String> keyedStream = filterDS.keyBy(json -> json.getJSONObject("common").getString("mid"));
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> trafficHomeDetailDS = keyedStream.flatMap(new RichFlatMapFunction<JSONObject, TrafficHomeDetailPageViewBean>() {

            private ValueState<String> homeLastVisit;
            private ValueState<String> detailLastVisit;

            @Override
            public void open(Configuration parameters) throws Exception {

                StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.days(1))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build();

                ValueStateDescriptor<String> homeStateDescriptor = new ValueStateDescriptor<>("home-state", String.class);
                ValueStateDescriptor<String> detailStateDescriptor = new ValueStateDescriptor<>("detail-state", String.class);

                // 设置 TTL
                homeStateDescriptor.enableTimeToLive(ttlConfig);
                detailStateDescriptor.enableTimeToLive(ttlConfig);

                homeLastVisit = getRuntimeContext().getState(homeStateDescriptor);
                detailLastVisit = getRuntimeContext().getState(detailStateDescriptor);
            }

            @Override
            public void flatMap(JSONObject value, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {

                // 获取状态数据以及当前数据中的日期
                String curDt = DateFormatUtil.toDate(value.getLong("ts"));
                String homeLastDt = homeLastVisit.value();
                String detailLastDt = detailLastVisit.value();

                long homeUvCt = 0;
                long goodDetailUvCt = 0;

                if (homeLastDt == null || !homeLastDt.equals(curDt)) {
                    homeUvCt = 1;
                    homeLastVisit.update(curDt);
                }
                if (detailLastDt == null || !detailLastDt.equals(curDt)) {
                    goodDetailUvCt = 1;
                    detailLastVisit.update(curDt);
                }

                if (homeUvCt == 1 || goodDetailUvCt == 1) {
                    out.collect(new TrafficHomeDetailPageViewBean("", "",
                            homeUvCt,
                            goodDetailUvCt,
                            value.getLong("ts")));
                }
            }
        });

1.2.5、开窗聚合并写入到 clickhouse

这里的窗口函数依旧是先用增量聚合函数,再用全量聚合函数(获得窗口信息);

注意:这里的 ts 字段是 clickhouse 表数据的版本字段,取系统时间即可;

// TODO 7. 开窗(windowAll聚合)聚合
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> resultDS = trafficHomeDetailDS.windowAll(
                TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10))
        ).reduce(new ReduceFunction<TrafficHomeDetailPageViewBean>() {
            @Override
            public TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean value1, TrafficHomeDetailPageViewBean value2) throws Exception {
                value1.setHomeUvCt(value1.getHomeUvCt() + value2.getHomeUvCt());
                value1.setGoodDetailUvCt(value1.getGoodDetailUvCt() + value2.getGoodDetailUvCt());
                return value1;
            }
        }, new AllWindowFunction<TrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<TrafficHomeDetailPageViewBean> values, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                TrafficHomeDetailPageViewBean next = values.iterator().next();
                next.setTs(System.currentTimeMillis());
                next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                out.collect(next);
            }
        });

        // TODO 8. 写入到 clickhouse
        resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_traffic_page_view_window values(?,?,?,?,?)"));

        // TODO 9. 启动任务
        env.execute("DwsTrafficPageViewWindow");

2、用户域用户登陆各窗口汇总表

任务:从 Kafka 页面日志主题读取数据,统计七日回流用户当日独立用户数

当日独立用户数很好求,和上面差不多,也是使用状态编程对 uid 保存状态去重即可。接下来我们主要分析七日回流用户怎么求:

2.1、思路分析

        回流用户定义:之前的活跃用户,一段时间未活跃(流失),今日又活跃了。这里要求统计回流用户总数,规定当日登陆,且自上次登陆之后至少 7 日未登录的用户为回流用户。

1、消费页面浏览主题(dwd_traffic_page_log)登录用户过滤

  • 用户打开应用自动登录(cookie)
    • uid != null && last_page_id = null (后面这个条件可以过滤掉没必要的数据)
  • 用户在登录页登录
    • uid != null && last_page_id = login

2、设置水位线、uid 分组之后进行状态编程

  • 判断 lastLoginDt 是否为 null
    • null:是今天的独立用户,但不是回流用户
    • !=null
      • 判断和今天是否相同
        • 相同:丢弃
        • 不同:是今天的独立用户,再判断今天-lastLoginDt >= 8?是回流用户:不是

2.2、代码实现

2.2.1、创建 ck 表并创建对应 JavaBean

这张表依然没有粒度,直接就是统计结果;我们去重的字段依然是窗口的起止时间: 

create table if not exists dws_user_user_login_window
(
    stt     DateTime,
    edt     DateTime,	
    back_ct UInt64,
    uu_ct   UInt64,
    ts      UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);
import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class UserLoginBean {
    // 窗口起始时间
    String stt;

    // 窗口终止时间
    String edt;

    // 回流用户数
    Long backCt;

    // 独立用户数
    Long uuCt;

    // 时间戳
    Long ts;
}

2.2.2、 消费 dwd_traffic_page_log 主题

// TODO 3. 读取 dwd_traffic_page_log 的数据
        String groupId = "dws_user_user_login_window";
        DataStreamSource<String> pageLog = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_traffic_page_log", groupId));

2.2.3、转换数据流为 JSON 格式并过滤出独立用户

// TODO 4. 转换为 json 格式 & 过滤出独立用户(uid!=null & last_page_id=null 或者 uid!=null & last_page_id=login)
        SingleOutputStreamOperator<JSONObject> filterDS = pageLog.flatMap(new RichFlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(value);
                String uid = jsonObject.getJSONObject("common").getString("uid");
                String lastPageId = jsonObject.getJSONObject("page").getString("last_page_id");

                if (uid != null) {
                    if (lastPageId == null || lastPageId.equals("login")) {
                        out.collect(jsonObject);
                    }
                }
            }
        });

2.2.4、提取事件时间生成水位线

        // TODO 5. 提取事件时间生成水位线
        filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                        return element.getLong("ts");
                    }
                })
        );

2.2.5、使用状态编程过滤出独立用户

// TODO 6. 状态编程过滤出独立用户
        KeyedStream<JSONObject, String> keyedStream = filterDS.keyBy(json -> json.getJSONObject("common").getString("uid"));
        SingleOutputStreamOperator<UserLoginBean> userLoginDS = keyedStream.flatMap(new RichFlatMapFunction<JSONObject, UserLoginBean>() {

            private ValueState<String> lastLoginDtState;

            @Override
            public void open(Configuration parameters) throws Exception {
                StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.days(7))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build();

                ValueStateDescriptor<String> lastLoginStateDescriptor = new ValueStateDescriptor<String>("last-login", String.class);
                lastLoginStateDescriptor.enableTimeToLive(ttlConfig);
                lastLoginDtState = getIterationRuntimeContext().getState(lastLoginStateDescriptor);
            }

            @Override
            public void flatMap(JSONObject value, Collector<UserLoginBean> out) throws Exception {
                // 本次登录日期
                Long curTs = value.getLong("ts");
                String curDt = DateFormatUtil.toDate(curTs);
                // 上次登录日期
                String lastLoginDt = lastLoginDtState.value();

                long uuCt = 0L;
                long backCt = 0L;

                if (lastLoginDt == null) {
                    uuCt = 1;
                    lastLoginDtState.update(curDt);
                } else if (!lastLoginDt.equals(curDt)) {
                    uuCt = 1;
                    lastLoginDtState.update(curDt);
                    // 判断相差是否 >= 8 天
                    Long lastTs = DateFormatUtil.toTs(lastLoginDt);
                    long days = (curTs - lastTs) / 1000 / 3600 / 24;
                    backCt = days >= 8 ? 1 : 0;
                }

                if (uuCt != 0) {
                    out.collect(new UserLoginBean("", "", backCt, uuCt, curTs));
                }
            }
        });

2.2.6、窗口聚合

和上一个需求一样,增量聚合函数和全量聚合函数配合着使用;

// TODO 6. 窗口聚合
        SingleOutputStreamOperator<UserLoginBean> resultDS = userLoginDS.windowAll(
                TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10))
        ).reduce((record1, record2) -> {
            record1.setUuCt(record1.getUuCt() + record2.getUuCt());
            record2.setBackCt(record1.getBackCt() + record2.getBackCt());
            return record1;
        }, new AllWindowFunction<UserLoginBean, UserLoginBean, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<UserLoginBean> values, Collector<UserLoginBean> out) throws Exception {
                UserLoginBean next = values.iterator().next();
                next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                next.setTs(System.currentTimeMillis());
                out.collect(next);
            }
        });

2.2.7、写出到 clickhouse 


        // TODO 7. 写入到 clickhouse
        resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_user_user_login_window values(?,?,?,?,?)"));

        // TODO 8. 启动任务
        env.execute("DwsUserUserLoginWindow");

3、用户域用户注册各窗口汇总表

任务:从 DWD 层用户注册表中读取数据,统计各窗口注册用户数,写入 ClickHouse。

这个需求比较简单,因为我们之前在 DWD 层已经创建了用户注册事务事实表(包含字段:user_id,date_id,create_time,ts)

3.1、代码实现

这里教程中用的是 DataStream API ,但是我这里想用 Flink SQL 实现:

3.1.1、创建 dwd_user_register 表并生成水位线

注意:当原表中有更贴近事件时间的字段时,我们就尽量少用 Maxwell 的 ts 字段!

// TODO 3. 消费 Kafka dwd_user_register 主题(生成水位线)
        String groupId = "dws_user_user_register_window";
        tableEnv.executeSql("CREATE TABLE dwd_user_register " +
                        "`user_id` string," +
                        "`date_id` string," +
                        "`create_time` string," +
                        "`ts` string" +
                        "time_ltz AS TO_TIMESTAMP(FROM_UNIXTIME(create_time/1000)), " +
                        "WATERMARK FOR time_ltz AS time_ltz - INTERVAL '2' SECOND " +
                        ")" + MyKafkaUtil.getKafkaDDL("dwd_user_register",groupId)
        );

3.1.2、分组开窗聚合

用 Flink SQL 实现就简单多了,这里的聚合逻辑更简单,直接 count(*):

// TODO 4. 分组,开窗,聚合
        Table resultTable = tableEnv.sqlQuery("SELECT " +
                "    date_format(tumble_start(time_ltz,interval '10' second),'yyyy-MM-dd HH:mm:ss') stt," +
                "    date_format(tumble_end(time_ltz,interval '10' second),'yyyy-MM-dd HH:mm:ss') edt," +
                "    count(*) register_ct," +
                "    unix_timestamp() ts" +
                "FROM dwd_user_register " +
                "GROUP BY tumble(time_ltz,interval '10' second)");
        tableEnv.createTemporaryView("result_table",resultTable);

3.1.3、创建 ck 表及其 Bean

create table if not exists dws_user_user_register_window
(
    stt         DateTime,
    edt         DateTime,
    register_ct UInt64,
    ts          UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);

这里需要把动态表转为流,所以我们需要创建一个 Java Bean,对应上 ck 表的每个字段:

@Data
@AllArgsConstructor
public class UserRegisterBean {
    // 窗口起始时间
    String stt;
    // 窗口终止时间
    String edt;
    // 注册用户数
    Long registerCt;
    // 时间戳
    Long ts;
}

3.1.4、将动态表转为流并写入到 clickhouse

 // TODO 5. 将动态表转为流并写入到 clickhouse
        DataStream<UserRegisterBean> dataStream = tableEnv.toAppendStream(resultTable, UserRegisterBean.class);
        dataStream.addSink(ClickHouseUtil.getSinkFunction("insert into dws_user_user_register_window values (?,?,?,?)"));

        // TODO 6. 启动任务
        env.execute("DwsUserUserRegisterWindow");

4、交易域加购各窗口汇总表

任务:从 Kafka 读取用户加购明细数据,统计每日各窗口加购独立用户数,写入 ClickHouse。

4.1、思路分析

思路很简单,还是根据 uid 进行 keyby,然后使用状态编程维护一个 lastCartAddDate,对数据进行判断:

  • 如果 lastCartAddDate = null
    • 写入状态
  • 如果 lastCartAddDate != null
    • 如果 lastCartAddDate != curDate
      • 更新状态
    • 否则丢弃

4.2、代码实现

这里不多介绍,和前面的逻辑都是一样的,只说明部分点:

  • 我们在生成水位线的时候,应该尽可能的生成贴近事件时间的,而这里对于加购操作来说,它有两种情况:
    • insert:就是加购,会影响的到 create_time 字段
    • update:可能是加购,会影响到 operate_time 字段,我们在 DWD 层已经过滤过了:只要 sku_num 变大就是加购
  • 所以这里我们的水位线可以取 operate_time 字段,取不到再取 create_time
// TODO 3. 读取 dwd_traffic_card_add 的数据
        String groupId = "dws_trade_cart_add_uu_window";
        DataStreamSource<String> cartAddLog = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_cart_add", groupId));

        //TODO 4. 转为 json 格式并
        SingleOutputStreamOperator<JSONObject> jsonDS = cartAddLog.map(JSONObject::parseObject);

        // TODO 5. 提取事件时间生成水位线
        jsonDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                        String operate_time = element.getString("operate_time");
                        if (operate_time != null){
                            return DateFormatUtil.toTs(operate_time,true);
                        }
                        return DateFormatUtil.toTs(element.getString("create_time"));
                    }
                })
        );

        // TODO 6. 按照用户id进行分组 & 过滤出独立用户
        KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getJSONObject("common").getString("uid"));
        SingleOutputStreamOperator<CartAddUuBean> filterDS = keyedStream.flatMap(new RichFlatMapFunction<JSONObject, CartAddUuBean>() {

            private ValueState<String> lastCartAddDateState;

            @Override
            public void open(Configuration parameters) throws Exception {
                StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.days(1))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build();

                ValueStateDescriptor<String> lastCartAddStateDescriptor = new ValueStateDescriptor<String>("last-cart-add", String.class);
                lastCartAddStateDescriptor.enableTimeToLive(ttlConfig);
                lastCartAddDateState = getRuntimeContext().getState(lastCartAddStateDescriptor);
            }

            @Override
            public void flatMap(JSONObject value, Collector<CartAddUuBean> out) throws Exception {
                // 当前的时间戳
                Long curTs = value.getLong("ts");
                String curDate = DateFormatUtil.toDate(curTs);
                String lastCartAddDate = lastCartAddDateState.value();

                if (lastCartAddDate == null || !lastCartAddDate.equals(curDate)) {
                    lastCartAddDateState.update(curDate);
                    out.collect(new CartAddUuBean("","",1L,curTs));
                }
            }
        });

        // TODO 7. 开窗聚合(补充字段)
        SingleOutputStreamOperator<CartAddUuBean> resultDS = filterDS.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<CartAddUuBean>() {
                    @Override
                    public CartAddUuBean reduce(CartAddUuBean value1, CartAddUuBean value2) throws Exception {
                        value1.setCartAddUuCt(value1.getCartAddUuCt() + value2.getCartAddUuCt());
                        return value1;
                    }
                }, new AllWindowFunction<CartAddUuBean, CartAddUuBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<CartAddUuBean> values, Collector<CartAddUuBean> out) throws Exception {
                        CartAddUuBean next = values.iterator().next();
                        next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                        next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        next.setTs(System.currentTimeMillis());
                        out.collect(next);
                    }
                });

        // TODO 8. 写出到 clickhouse
        resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_cart_add_uu_window values (?,?,?,?)"));

        // TODO 9. 启动任务
        env.execute("DwsTradeCartAddUuWindow");

5、交易域支付各窗口汇总表

任务:从 Kafka 读取交易域支付成功主题数据,统计支付成功独立用户数首次支付成功用户数(第一次在平台消费)。

5.1、思路分析

如果一个用户是首次支付成功用户(既然是历史第一次下单操作,必然也是今天的第一次下单),那么他必然是今天的支付成功独立用户;所以我们只需要通过状态过滤出 lastPayDate = null 或者 lastPayDate != curDt 的用户(注意:这里的 lastPayDate 不能设置 TTL ,因为我们需要知道这个用户历史上有没有支付过,所以就不允许状态失效)

left join 实现过程

        假设 A 表作为主表与 B 表做等值左外联。当 A 表数据进入算子,而 B 表数据未至时会先生成一条 B 表字段均为 null 的关联数据ab1,其标记为 +I。其后,B 表数据到来,会先将之前的数据撤回,即生成一条与 ab1 内容相同,但标记为 -D 的数据,再生成一条关联后的数据,标记为 +I。这样生成的动态表对应的流称之为回撤流。

在 DWD 层的订单预处理表(dwd_trade_order_pre_process)生成过程中会形成回撤流,因为它需要对订单明细活动表和订单明细优惠券表进行 left join。而我们这里的支付成功依赖于 DWD 层支付成功事务事实表(dwd_trade_pay_detail_suc),该表又依赖于 DWD 层的下单事务事实表(dwd_trade_order_detail),所以这里我们需要考虑回撤流的问题:

回撤数据在 Kafka 中以 null 值的形式存在,只需要简单判断即可过滤。我们需要考虑的是如何对其余数据去重:

order_id = 1001
order_detail_id = 1001-a
order_detail_activity_id: a1


SELECT ...
FROM
order_detail od 
join 
    order_info oi
on
    od.order_id = oi.id
left join 
    order_detail_activity oa
on
    od.id = oa.order_detail_id

上面我们有一个订单(id=1001),这个订单内只有一个商品并且参与了活动,那么由于 order_detail_activity 来得肯定要晚一些,所以可能会出现下面这种情况:

+/-    order_id    order_detail_id     order_detail_activity_id

+        1001        1001-a                    null
-        null        null                      null
+        1001        1001-a                    a1

我们过滤 null 值指的是过滤上面操作是 '-' 的数据,因为回撤数据在 Kafka 中以 null 值的形式存在。而除了 null 值之外,我们还应该过滤掉旧的错误数据,由于 order_detail_activity 数据来得晚一些,导致flink 直接给字段 order_detail_activity_id 一个 null,所以我们应该把这个字段值删除;

但是,对于这个需求(求支付成功的用户数),其实我们也可以不做去重,放到最后再做去重,为什么呢?设想如果一个用户下了多个订单,而我们的支付成功表的粒度是商品,所以数据即使在 left join 之后对相同 order_detail_id 的数据做了去重,但是多个订单的话最终还有重复。

考虑到之后还可能遇到需要去重的需求(尤其是设计到金额的),这里我们还是练习一下如何实现去重:

5.2、代码实现

5.2.1、创建 clickhouse 表格及对应的 JavaBean

create table if not exists dws_trade_payment_suc_window
(
    stt                           DateTime,
    edt                           DateTime,
    payment_suc_unique_user_count UInt64,
    payment_new_user_count        UInt64,
    ts                            UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);
import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class TradePaymentWindowBean {
    // 窗口起始时间
    String stt;

    // 窗口终止时间
    String edt;

    // 支付成功独立用户数
    Long paymentSucUniqueUserCount;

    // 支付成功新用户数
    Long paymentSucNewUserCount;

    // 时间戳
    Long ts;
}

5.2.2、创建时间工具类

为了去重,我们需要对每一条数据都设置一个时间,因为对于重复数据,它们在原始表中的时间字段值都是一样的。

FlinkSQL 提供了几个可以获取当前时间戳的函数

  • localtimestamp():返回本地时区的当前时间戳,返回类型为 TIMESTAMP(3)。在流处理模式下会对每条记录计算一次时间。而在批处理模式下,仅在查询开始时计算一次时间,所有数据使用相同的时间。
  • current_timestamp():返回本地时区的当前时间戳,返回类型为 TIMESTAMP_LTZ(3)。在流处理模式下会对每条记录计算一次时间。而在批处理模式下,仅在查询开始时计算一次时间,所有数据使用相同的时间。
  • now():与 current_timestamp 相同。
  • current_row_timestamp():返回本地时区的当前时间戳,返回类型为 TIMESTAMP_LTZ(3)。无论在流处理模式还是批处理模式下,都会对每行数据计算一次时间

这里,我们使用current_row_timestamp 来作为时间,我们需要给订单预处理表中添加:

current_row_timestamp() as row_op_ts

-- 在建表语句中添加
row_op_ts TIMESTAMP_LTZ(3)

那么,下单事务事实表来源于订单预处理表,支付成功事务事实表依赖于下单事务事实表,搜易当然也应该添加该字段。

import java.util.Comparator;

public class TimestampLtz3CompareUtil {

    public static int compare(String timestamp1, String timestamp2) {
        // 数据格式 2022-04-01 10:20:47.302Z
        // 1. 去除末尾的时区标志,'Z' 表示 0 时区
        String cleanedTime1 = timestamp1.substring(0, timestamp1.length() - 1);
        String cleanedTime2 = timestamp2.substring(0, timestamp2.length() - 1);
        // 2. 提取小于 1秒的部分
        String[] timeArr1 = cleanedTime1.split("\\.");
        String[] timeArr2 = cleanedTime2.split("\\.");
        String microseconds1 = new StringBuilder(timeArr1[timeArr1.length - 1])
                .append("000").toString().substring(0, 3);
        String microseconds2 = new StringBuilder(timeArr2[timeArr2.length - 1])
                .append("000").toString().substring(0, 3);
        int micro1 = Integer.parseInt(microseconds1);
        int micro2 = Integer.parseInt(microseconds2);
        // 3. 提取 yyyy-MM-dd HH:mm:ss 的部分
        String date1 = timeArr1[0];
        String date2 = timeArr2[0];
        Long ts1 = DateFormatUtil.toTs(date1, true);
        Long ts2 = DateFormatUtil.toTs(date2, true);
        // 4. 获得精确到毫秒的时间戳
        long microTs1 = ts1 * 1000 + micro1;
        long microTs2 = ts2 * 1000 + micro2;

        long divTs = microTs1 - microTs2;

        return divTs < 0 ? -1 : divTs == 0 ? 0 : 1;
    }

    public static void main(String[] args) {
        System.out.println(compare("2022-04-01 11:10:55.040Z",
                "2022-04-01 11:10:55.04Z"));
    }
}

 5.2.3、读取DWD支付成功事务事实表

        读取DWD支付成功事务事实表并转为 JSON 格式,然后按照订单明细id进行分组(为了对回撤流的数据进行去重,根据相同明细id的时间进行判断)

// TODO 3. 读取 dwd_trade_pay_detail_suc 的数据
        String groupId = "dws_trade_payment_suc_window";
        DataStreamSource<String> paymentSucDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_pay_detail_suc", groupId));

        // TODO 4. 将数据转为JSON格式
        SingleOutputStreamOperator<JSONObject> jsonDS = paymentSucDS.flatMap(new RichFlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSONObject.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    // 可以选择输出到侧输出流
                    e.printStackTrace();
                }
            }
        });

        // TODO 5. 按照订单明细id分组
        KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getString("order_detail_id"));

5.2.4、状态编程对回撤流中的数据去重

这里的回撤流是因为支付成功事务事实表需要用 订单明细 innner join 订单表 left join 订单明细活动 left join 订单明细活动造成的;

// TODO 6. 使用状态编程过滤最新数据输出(需要使用状态和定时器所以使用 process)
        SingleOutputStreamOperator<JSONObject> filterDS = keyedStream.process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {

            private ValueState<JSONObject> lastPaySucDateState;

            @Override
            public void open(Configuration parameters) throws Exception {
                lastPaySucDateState = getRuntimeContext().getState(new ValueStateDescriptor<>("last-pay-suc", JSONObject.class));
            }

            @Override
            public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
                JSONObject state = lastPaySucDateState.value();
                if (state == null) {
                    lastPaySucDateState.update(value);
                    // 注册定时器
                    ctx.timerService().registerEventTimeTimer(ctx.timerService().currentProcessingTime() + 5000L);
                } else {
                    String stateRt = state.getString("row_op_ts");
                    String curRt = value.getString("row_op_ts");
                    int compare = TimestampLtz3CompareUtil.compare(stateRt, curRt);

                    if (compare != 1) { // 状态里的时间小
                        lastPaySucDateState.update(value);
                    }
                }
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<JSONObject> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
                // 输出并清空状态数据
                JSONObject value = lastPaySucDateState.value();
                out.collect(value);

                lastPaySucDateState.clear();
            }
        });

5.2.5、提取事件时间并生成水位线

这里选择 callback_time ,它是支付成功后的回调时间; 

// TODO 7. 提取事件时间生成水位线
        SingleOutputStreamOperator<JSONObject> jsonWithWmDS = filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                        return DateFormatUtil.toTs(element.getString("callback_time"), true);
                    }
                }));

5.2.6、按照 user_id 分组并提取支付成功独立用户数和首次支付成功用户数

// TODO 8. 按照 user_id 分组
        KeyedStream<JSONObject, String> keyedByUidDS = jsonWithWmDS.keyBy(json -> json.getString("user_id"));

        // TODO 9. 提取独立支付成功用户数和首次支付成功用户数
        SingleOutputStreamOperator<TradePaymentWindowBean> tradePaymentDS = keyedByUidDS.flatMap(new RichFlatMapFunction<JSONObject, TradePaymentWindowBean>() {

            private ValueState<String> lastDtState;

            @Override
            public void open(Configuration parameters) throws Exception {
                lastDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>("lastDt", String.class));
            }

            @Override
            public void flatMap(JSONObject value, Collector<TradePaymentWindowBean> out) throws Exception {
                String lastDt = lastDtState.value();
                String curDt = value.getString("callback_time").split(" ")[0];

                // 当日支付人数
                long pay = 0L;
                // 首次支付人数
                long newPay = 0L;

                // 判断状态是否为null
                if (lastDt == null) {
                    pay = 1;
                    newPay = 1;
                    lastDtState.update(curDt);
                } else if (!lastDt.equals(curDt)) {
                    pay = 1;
                    lastDtState.update(curDt);
                }

                // 写出
                if (pay == 1) {
                    out.collect(new TradePaymentWindowBean("", "", newPay, pay, DateFormatUtil.toTs(curDt)));
                }
            }
        });

5.2.7、开窗聚合并写出到 clickhouse

开窗是为了实时刷新到报表,聚合依然是那两个函数:增量聚合(聚合结果),全量聚合(补充窗口起止字段);

// TODO 10. 开窗,聚合
        SingleOutputStreamOperator<TradePaymentWindowBean> resultDS = tradePaymentDS.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<TradePaymentWindowBean>() {
                    @Override
                    public TradePaymentWindowBean reduce(TradePaymentWindowBean value1, TradePaymentWindowBean value2) throws Exception {
                        value1.setPaymentSucNewUserCount(value1.getPaymentSucNewUserCount() + value2.getPaymentSucNewUserCount());
                        value1.setPaymentSucUniqueUserCount(value1.getPaymentSucUniqueUserCount() + value2.getPaymentSucUniqueUserCount());
                        return value1;
                    }
                }, new AllWindowFunction<TradePaymentWindowBean, TradePaymentWindowBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<TradePaymentWindowBean> values, Collector<TradePaymentWindowBean> out) throws Exception {
                        TradePaymentWindowBean next = values.iterator().next();
                        next.setTs(System.currentTimeMillis());
                        next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                        next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        out.collect(next);
                    }
                });

        // TODO 11. 写出到 clickhouse
        resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_payment_suc_window values(?,?,?,?,?)"));

        // TODO 12. 启动任务
        env.execute("DwsTradePaymentSucWindow");

总结

        今天的 DWS 层到此为止,剩下了还有几个需求估计还得 1~2 天完成,这一块要比之前都难一些,争取这周日前把实时数仓完结;然后下周开始把离线和实时再好好复习一遍;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1986145.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

广东省造林绿化施工丙级资质2024年9月开通申报

关于广东省造林绿化施工丙级资质2024年9月的申报情况&#xff0c;可以归纳如下&#xff1a; 一、申报时间 具体时间&#xff1a;2024年9月1日至9月30日。在此期间&#xff0c;相关企业可以在网上提交申请。 二、申报条件 资历和信誉 1、独立企事业法人资格&#xff1a;申请…

程序跟随系统主题色切换主题

如果程序要跟随系统主题色进行切换&#xff0c;需监听当前系统的主题色&#xff0c;下面介绍Windows和MacOS下获取当前系统主题的方法 Windows 系统切换主题 以win10为例&#xff0c;点击右键选择个性化&#xff0c;进入个性化页面&#xff0c;选择左侧颜色的Tab&#xff0c…

mp3格式转换器哪个好用?汇总七款音频格式转换方法(无损转换)

音乐已经成为我们生活中不可或缺的一部分。但是在播放的时候&#xff0c;可能会遇到音频格式不兼容的情况。特别是在一些下载站或音乐平台获取的音频&#xff0c;有些特殊格式在播放器上无法正常播放&#xff0c;一般这种情况我们需要借助mp3转换器解决。 mp3是一种常见的数字音…

三更的springsecurity课程个人笔记总计4万字,全部测试通过,代码cv即可

SpringSecurity b站 40.源码讲解部分说明_哔哩哔哩_bilibili BV1mm4y1X7Hc 以下全为个人总结&#xff0c;不能代表官方&#xff0c;有错误还请指出&#xff08;全部测试通过&#xff09;&#xff08;1刷视频&#xff09; 1-简介 tip 接下来的所有类不会包含import信息&am…

爬虫代理教程:爬虫代理池部署+高并发实现方法

在数据爬取的世界里&#xff0c;代理IP就像是爬虫的隐身衣&#xff0c;帮助我们在网络上自由穿梭&#xff0c;避免被目标网站识别封禁。今天我就来分享一下爬虫代理池的部署和高并发实现的技巧&#xff0c;希望能对大家有所帮助。 什么是爬虫代理池&#xff1f; 首先&#xf…

nginx下载安装及使用教程

一、打开下载官网&#xff1a;nginx 选择稳定版本&#xff08;windows&#xff09; 然后就是解压安装到指定目录下 二、启动nginx 使用cmd命令提示符进入&#xff0c;输入一下命令(注意&#xff1a;回车确认是会出现一闪&#xff0c;这是正常现象&#xff09; 查看任务进程是否…

Flink学习之Flink SQL(补)

Flink SQL 1、SQL客户端 1.1 基本使用 启动yarn-session yarn-session.sh -d启动Flink SQL客户端 sql-client.sh--退出客户端 exit;测试 重启SQL客户端之后&#xff0c;需要重新建表 -- 构建Kafka Source -- 无界流 drop table if exists students_kafka_source; CREATE TABL…

SourceTree配置多个不同Remote地址的仓库

需求 在我们开发过程中&#xff0c;有可能需要拉取的地址仓库不在同一个仓库中&#xff0c;有些可能在Github上&#xff0c;有些可能在Gitlab上。 所以我们需要配置Github的仓库的配置和Gitlab仓库的配置。 现在&#xff0c;我们来配置两个不同的仓库的地址。 假设&#xf…

快速体验LLaMA-Factory 私有化部署和高效微调Llama3模型FAQ

序言 之前已经介绍了在超算互联网平台SCNet上使用异构加速卡AI 显存64GB PCIE&#xff0c;私有化部署Llama3模型&#xff0c;并对 Llama3-8B-Instruct 模型进行 LoRA 微调、推理和合并 &#xff0c;详细内容请参考另一篇博客&#xff1a;快速体验LLaMA-Factory 私有化部署和高…

Android14音频进阶之命令行播放音频(八十)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 新书发布:《Android系统多媒体进阶实战》🚀 优质专栏: Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏: 多媒体系统工程师系列【原创干货持续更…

SpringDI(依赖注入)的理解

目录 一、什么是SpringDI&#xff1f; 二、SpringDI&#xff08;依赖注入&#xff09;的作用是什么&#xff1f; 三、DI的实现方式 3.1 set注入 1.项目结构&#xff1a; 2.set注入对象 2.1applicationContext.xml&#xff1a; 2.2 Student 实体类 2.3 controller: 2.4 da…

SpringIOC容器对Bean管理

一、bean实例化 1.通过构造方法&#xff08;默认&#xff09; 2.通过工厂方法 3.通过静态工厂方法 项目结构&#xff1a; 1. 通过构造方法&#xff08;默认&#xff09; 1.1 pojo实体类&#xff1a; Student: public class Student {public Student() {System.out.println…

软件安全测试报告内容和作用简析,软件测试服务供应商推荐

在数字化时代&#xff0c;软件安全问题愈发凸显&#xff0c;安全测试显得尤为重要。软件安全测试报告是对软件系统在安全性方面进行评估和分析后的书面文件。该报告通常包含测试过程、测试发现、漏洞描述、风险评估及改进建议等重要信息。报告的目的是为了帮助开发团队及时发现…

各种排序算法【持续更新中.....】

1.归并排序 归并排序 &#xff0c;归并排序是采用分治法(Divide and Conquer&#xff09;的一个非常典型的应用&#xff0c;所以我们先来说一下什么是分治法。 分治法 定义 分治&#xff08;英语&#xff1a;Divide and Conquer&#xff09;&#xff0c;字面上的解释是「分…

C++初阶大全

目录 一.命名空间 1.命名空间定义 2.命名空间使用 二.C输入&输出 三.缺省参数 四. 函数重载 五.引用 1.常引用 2.传值、传引用效率比较 3.引用和指针的区别 4.引用和指针的不同点: 小知识点: 六.内联函数 七.auto关键字(C11) 1.auto的使用细则 八.基于范围…

为何你的进销存系统买来却成了摆设?教你7大招解决!

我做企业数字化系统9年&#xff0c;调研的企业大大小小也有几十家了。我发现个挺扎心的现象&#xff1a;有三成企业买的进销存系统&#xff0c;最后都成了闲置的“高科技装饰品”。为啥这些企业舍得掏腰包&#xff0c;到头来却让它们吃灰呢&#xff1f; 说到底&#xff0c;就一…

开源免费的wiki知识库

开源的Wiki知识库有多种选择&#xff0c;它们各自具有不同的特点和优势&#xff0c;适用于不同的场景和需求。以下是一些主流的开源Wiki知识库系统&#xff1a; MediaWiki 简介&#xff1a;MediaWiki是使用PHP编写的免费开源Wiki软件包&#xff0c;是Wikipedia和其他Wikimedia…

达梦数据库的系统视图v$mem_reginfo

达梦数据库的系统视图v$mem_reginfo 达梦数据库的V$MEM_REGINFO视图提供了系统当前已分配但未释放的内存信息。这个视图在MEMORY_LEAK_CHECK参数设置为1时才会注册信息&#xff0c;用于监控内存的申请和释放情况。通过查询V$MEM_REGINFO视图&#xff0c;可以关注REFNUM字段&am…

跟《经济学人》学英文:2024年08月03日这期 Britain’s railways go from one extreme to another

Britain’s railways go from one extreme to another Departing: privatisation. Destination: centralisation 出发:私有化。目的地:集中化 depart&#xff1a;出发 privatisation&#xff1a;美 [ˌpraɪvətaɪ’zeɪʃən] 私有化&#xff1b;民营化&#xff1b;私营化…

IO进程—进程间的通信—共享内存具体函数操作

创建唯一key值—— ftok key_t ftok(const char *pathname, int proj_id); #include <sys/types.h> #include <sys/ipc.h> 功能&#xff1a; ftok函数是用于生成一个键值&#xff08;key_t类型&#xff09;的函数&#xff0c;这个键值通常用于进程间通信&#xff0…