Flink 实时数仓(九)【DWS 层搭建(三)交易域汇总表创建】

news2025/1/23 9:15:39

前言

        今天立秋,任务是完成 DWS 剩余的表,不知道今天能不能做完,欲速则不达,学不完就明天继续,尽量搞懂每一个需求;

1、交易域下单各窗口汇总表

任务:从 Kafka 订单明细主题读取数据,对数据去重,统计当日下单独立用户数新增下单用户数,封装为实体类,写入 ClickHouse。

1.1、思路分析

首先,现在 DWS 这张表依赖于下单事务事实表,而下单事务事实表又依赖于订单预处理表,订单预处理表正是我们所讨论的会发生数据迟到的表(因为订单明细活动表和订单明细优惠券表要和主表进行 left join);

这个需求和昨天的交易域支付各窗口汇总表几乎一样,只不过这次我们不再对数据做去重了,因为上游订单明细活动表和订单明细优惠券表 left join 迟到不会影响我们表中的字段数据(表中字段的数据全部来自订单明细),所以下面我们在对 order_detail_id 分组后,我们只需要等到第一条数据来(不管乱序不乱序,因为我们要的字段在主表里,而撤回流写入 kafka 的null我们可以在分组前转 json 流的时候直接过滤掉),后面的数据直接丢弃即可(都是 left join 活动和优惠券的数据,不需要);

1.2、代码实现

1.2.1、创建 ck 表及 Bean

​
create table if not exists dws_trade_order_window
(
    stt                          DateTime,
    edt                          DateTime,
    order_unique_user_count      UInt64,
    order_new_user_count         UInt64,
    order_activity_reduce_amount Decimal(38, 20),
    order_coupon_reduce_amount   Decimal(38, 20),
    order_origin_total_amount    Decimal(38, 20),
    ts                           UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt);

​

 表中的 order_activity_reduce_amount 和 order_coupon_reduce_amount 字段并不需要订单明细活动表和订单明细优惠券表参与,因为 order_detail 和 order_info 表中都有关于优惠券和活动减免金额的字段;所以我们不需要考虑去重,因为对于这个需求,并不需要担心因为 订单明细活动表和订单明细优惠券表 的 left join 数据迟到,它俩迟到对这张表字段并不影响;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

@Data
@AllArgsConstructor
@Builder
public class TradeOrderBean {
    // 窗口起始时间
    String stt;

    // 窗口关闭时间
    String edt;

    // 下单独立用户数
    Long orderUniqueUserCount;

    // 下单新用户数
    Long orderNewUserCount;

    // 下单活动减免金额
    Double orderActivityReduceAmount;

    // 下单优惠券减免金额
    Double orderCouponReduceAmount;

    // 下单原始金额
    Double orderOriginalTotalAmount;

    // 时间戳
    Long ts;
}

1.2.2、读取下单事务事实表并转为 JSON 流

// TODO 2. 读取 kafka dwd_trade_order_detail
        String groupId = "dws_trade_order_window";
        DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_detail", groupId));

        // TODO 3. 转为 JSON 对象
        SingleOutputStreamOperator<JSONObject> jsonDS = orderDetailDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSONObject.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    // 可以选择输出到侧输出流
                    e.printStackTrace();
                }
            }
        });

1.2.3、分组第一次去重

去重一共分为两步,第一步按照最细粒度 order_detail_id 进行,但是我们要求的是用户数,所以其实这里这一步只是为了多练习一下数据有撤回流时数据重复以及完整性的问题;

这里用的富函数版本的 filter 算子,因为涉及状态、ttl(上下午)等

// TODO 4. 第一次去重(根据 order_detail_id 进行分组)
        KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getString("id"));

        // TODO 5. 针对 order_detail_id 进行去重(保留第一条数据即可,因为要使用状态编程所以使用Rich)
        SingleOutputStreamOperator<JSONObject> filterDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {

            // 用来存储第一条数据,之后的数据来了全部丢弃(上游 left join 迟到的数据)
            private ValueState<String> state;

            @Override
            public void open(Configuration parameters) throws Exception {
                StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(5))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build();

                ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("first-value", String.class);
                stateDescriptor.enableTimeToLive(ttlConfig);
                state = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public boolean filter(JSONObject value) throws Exception {
                String data = state.value();
                if (data == null) {
                    state.update("1"); //  随便存就行
                    return true;
                }
                return false;
            }
        });

1.2.4、提取事件时间生成水位线

这里依然选择提取 create_time 字段,更贴近事件时间;

这里的定时器可以设置 5s 或者稍微久一点,因为我们上游订单预处理表在生成的过程中需要 join,我们当时给了 5s,也就是说我们能保证 5s 数据就能完整,所以这里的状态保存 5 s我们就能保证数据肯定是完整的,订单明细活动表和订单明细优惠券表的 left join 肯定已经完成了,不会有迟到数据了。而事实上,当第一条数据来的时候我们就会输出,因为我们所需要的字段数据是在 order_detail 中的,是订单预处理表的主表(订单明细表),而我们这里的下单事务事实表又是直接依赖于订单预处理表,所以订单明细活动表和订单明细优惠券表 的 left join 即使迟到,也不会受影响。

// TODO 6. 提取事件时间生成 watermark
        SingleOutputStreamOperator<JSONObject> jsonWithWmDS = filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                        return DateFormatUtil.toTs(element.getString("create_time"));
                    }
                }));

1.2.5、按照 uid 分组第二次去重

// TODO 7. 按照 user_id 分组
        KeyedStream<JSONObject, String> keyedByUidDS = jsonWithWmDS.keyBy(json -> json.getString("user_id"));

        // TODO 8. 提取独立下单用户
        SingleOutputStreamOperator<TradeOrderBean> tradeOrderDS = keyedByUidDS.map(new RichMapFunction<JSONObject, TradeOrderBean>() {

            private ValueState<String> lastOrderDtState;

            @Override
            public void open(Configuration parameters) throws Exception {
                lastOrderDtState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-order-dt", String.class));
            }

            @Override
            public TradeOrderBean map(JSONObject value) throws Exception {
                String lastOrderDt = lastOrderDtState.value();
                String curDt = value.getString("create_time").split(" ")[0];

                // 下单独立用户数
                long orderUniqueUserCount = 0;
                // 下单新用户数
                long orderNewUserCount = 0;
                if (lastOrderDt == null) {
                    orderUniqueUserCount = 1L;
                    orderNewUserCount = 1L;
                    lastOrderDtState.update(curDt);
                } else if (!lastOrderDt.equals(curDt)) {
                    orderUniqueUserCount = 1L;
                    lastOrderDtState.update(curDt);
                }

                // 取出下单件数和单价
                Integer sku_num = value.getInteger("sku_num");
                Double order_price = value.getDouble("order_price");

                return new TradeOrderBean("", "",
                        orderUniqueUserCount,
                        orderNewUserCount,
                        value.getDouble("split_activity_amount"),
                        value.getDouble("split_coupon_amount"),
                        sku_num * order_price,
                        null); // ts 后面开窗的时候都会给当前时间,到时候再补充
            }
        });

1.2.6、开窗聚合写出到 clickhouse

开窗为了时效性,聚合是因为我们的指标是一天的统计量,所以对一天的数据需要进行聚合;同样使用增量聚合函数 + 全量聚合函数来补全字段;

// TODO 9. 开窗聚合
        SingleOutputStreamOperator<TradeOrderBean> resultDS = tradeOrderDS.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<TradeOrderBean>() {
                    @Override
                    public TradeOrderBean reduce(TradeOrderBean value1, TradeOrderBean value2) throws Exception {
                        value1.setOrderUniqueUserCount(value1.getOrderUniqueUserCount() + value2.getOrderUniqueUserCount());
                        value1.setOrderNewUserCount(value1.getOrderNewUserCount() + value2.getOrderNewUserCount());
                        value1.setOrderOriginalTotalAmount(value1.getOrderOriginalTotalAmount() + value2.getOrderOriginalTotalAmount());
                        value1.setOrderNewUserCount(value1.getOrderNewUserCount() + value2.getOrderNewUserCount());
                        value1.setOrderCouponReduceAmount(value1.getOrderCouponReduceAmount() + value2.getOrderCouponReduceAmount());
                        value1.setOrderActivityReduceAmount(value1.getOrderActivityReduceAmount() + value2.getOrderActivityReduceAmount());
                        return value1;
                    }
                }, new AllWindowFunction<TradeOrderBean, TradeOrderBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<TradeOrderBean> values, Collector<TradeOrderBean> out) throws Exception {
                        TradeOrderBean next = values.iterator().next();
                        next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                        next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        next.setTs(System.currentTimeMillis());
                        out.collect(next);
                    }
                });

        // TODO 10. 写入 clickhouse
        resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_order_window values(?,?,?,?,?,?,?,?)"));

        // TODO 11. 启动任务
        env.execute("DwsTradeOrderWindow");

2、交易域用户-SPU粒度下单各窗口汇总表

        这应该是目前为止最难的一个需求了,从 Kafka 订单明细主题读取数据(dwd_trade_order_detail),过滤 null 数据(因为下单事务事实表依赖于的订单预处理表,而这张表的生成需要 left join 活动和优惠券表,所以会形成回撤流)并按照唯一键对数据去重,关联维度信息(因为 order_info 和 order_detail 中都没有 spu 信息),按照维度(user,spu,trademark,category)分组,统计各维度各窗口的订单数和订单金额,将数据写入 ClickHouse 交易域品牌-品类-用户-SPU粒度下单各窗口汇总表;

2.1、需求分析

        可以看到,我们的建表语句中不仅保留了我们需求中要求的用户和sku_id,我们还同时保留了品牌、品类等字段,为的是之后可以扩展更多的需求而不用再去创建;其实我们设置可以将这个需求的粒度再做小一点,做到用户-sku粒度,同时保留 spu信息,这样我们就可以实现更多细粒度的需求了;

        我们需要关联 6 张维表:首先根据 sku_id 去读取 sku_info ,得到该 sku_id 的 spu_id、3个category_id以及trademark_id ,然后再去分别关联这 5 张表(通过 category3 去关联 category2,再用category2去获取category1);

create table if not exists dws_trade_user_spu_order_window
(
    stt            DateTime,
    edt            DateTime,
    trademark_id   String,
    trademark_name String,
    category1_id   String,
    category1_name String,
    category2_id   String,
    category2_name String,
    category3_id   String,
    category3_name String,
    user_id        String,
    spu_id         String,
    spu_name       String,
    order_count    UInt64,
    order_amount   Decimal(38, 20),
    ts             UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt, spu_id, spu_name, user_id);

        我们在创建该表的 Java Bean 的时候需要额外补充一些字段——sku_id 和 一个 Set 类型的 orderset,因为没有 sku_id 字段就无法关联任一维表,而 order_set 是我们将来做去重的重要依据;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.util.Set;

@Data
@AllArgsConstructor
@Builder
public class TradeUserSpuOrderBean {
    // 窗口起始时间
    String stt;
    // 窗口结束时间
    String edt;
    // 品牌 ID
    String trademarkId;
    // 品牌名称
    String trademarkName;
    // 一级品类 ID
    String category1Id;
    // 一级品类名称
    String category1Name;
    // 二级品类 ID
    String category2Id;
    // 二级品类名称
    String category2Name;
    // 三级品类 ID
    String category3Id;
    // 三级品类名称
    String category3Name;

    // 订单 ID
    @TransientSink
    Set<String> orderIdSet;

    // sku_id
    @TransientSink
    String skuId;

    // 用户 ID
    String userId;
    // spu_id
    String spuId;
    // spu 名称
    String spuName;
    // 下单次数
    Long orderCount;
    // 下单金额
    Double orderAmount;
    // 时间戳
    Long ts;
}

在上面的 Java Bean 中我们用到了构造者模式,因为我们在把数据转为 JavaBean 类型的数据流的时候,很多字段需要关联维表才能得到,而初始化构造太烦人了,所以我们通过构造者模式来不断丰富对象的属性值; 

表中的 order_count 字段是指对某一品类商品下单的次数,也就是指用户在不同 order_id 中下单相同的 spu 数;而用户在一个 order_id 中下单的多个相同 spu 是只能计数为 1 的;

所以考虑到一个 order_id 中的 spu_id 可能重复:

 而且 sku_id 也可能发生重复(可能一个订单中的一个商品买了多件,虽然 order_id 和 sku_id 肯定相同,但是每件用了不同的券,有的平台会视为一条(sku_id相同,sku_num=2),但是有的平台可能会视为两条sku_id相同,sku_num=1的数据)。

所以我们必须做去重,那应该怎么去重?

  • 读取数据源(下单事务事实表)
  • 转为 JSON 流
  • 按照唯一键去重(除去上游表 left join 导致的回撤流的重复数据)
  • 转为 JavaBean 类型的流
  • 关联维表(只补充 tm_id,spu_id 和 category3_id)
  • 添加水位线
  • 分组(按照维度 user,spu,trademark,category)
  • 开窗
    • 创建一个 set 集合存储 order_id
  • 聚合(根据 order_id 进行去重,set 的大小就是 order_count)
  • 关联维表(补充 tm_name,spu_name,category的name 以及 category2 和 category1 的 id 和 name)

为什么我们要在开窗聚合前后去关联两次维表呢?

  • 首先,在开窗前我们只关联了分组所必须依赖的字段
  • 开窗后的字段都是一些对分组没有影响的字段,所以聚合后再去关联不仅省去了没必要的数据传输,而且聚合后的数据量更小,关联时读取 hbase 的数据量小(数据量小就减少了关联的次数)

2.2、代码实现

2.2.1、读取下单事务事实表 + 转换JSON + 分组 + 去重

这部分和上面的需求一样,直接拿来:

        // TODO 2. 读取 kafka dwd_trade_order_detail
        String groupId = "dws_trade_trademark_category_user_order_window";
        DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_detail", groupId));

        // TODO 3. 转为 JSON 格式
        SingleOutputStreamOperator<JSONObject> jsonDS = orderDetailDS.flatMap(new RichFlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSONObject.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    // 可以选择输出到侧输出流
                    e.printStackTrace();
                }
            }
        });

        // TODO 4. 第一次去重(根据 order_detail_id 进行分组)
        KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getString("id"));

        // TODO 5. 针对 order_detail_id 去重上游 left join 导致的重复数据(保留第一条数据即可,因为要使用状态编程所以使用Rich)
        SingleOutputStreamOperator<JSONObject> filterDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {

            // 用来存储第一条数据,之后的数据来了全部丢弃(上游 left join 迟到的数据)
            private ValueState<String> state;

            @Override
            public void open(Configuration parameters) throws Exception {
                StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(5))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build();

                ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("first-value", String.class);
                stateDescriptor.enableTimeToLive(ttlConfig);
                state = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public boolean filter(JSONObject value) throws Exception {
                String data = state.value();
                if (data == null) {
                    state.update("1"); //  随便存就行
                    return true;
                }
                return false;
            }
        });

2.2.2、转为 Java Bean 类型的流

        // TODO 6. 将流转为 JavaBean 类型
        SingleOutputStreamOperator<TradeUserSpuOrderBean> tradeUserSpuDS = filterDS.map(line -> {
                    HashSet<String> orderIds = new HashSet<>();
                    orderIds.add(line.getString("order_id"));
                    return TradeUserSpuOrderBean.builder()
                            .skuId(line.getString("sku_id"))
                            .userId(line.getString("user_id"))
                            .orderAmount(line.getDouble("split_total_amount"))
                            .orderIdSet(orderIds)
                            .ts(DateFormatUtil.toTs(line.getString("create_time"), true))
                            .build();
                }
        );

2.2.3、维表查询工具类封装

之后我们要将数据流和维表进行关联,那我们就必须去 Phoenix 查询数据,所以我们这里封装一个工具类:

/**
 * 适用于任何 JDBC 方式访问的数据库中的任何查询语句
 */
public class JdbcUtil {

    /**
     * @param connection 连接对象
     * @param sql sql语句
     * @param clz 返回类型对象
     * @param underScoreToCamel 是否下划线转为驼峰
     * @param <T> 返回类型
     * @return 结果集列表
     */
    public static <T> List<T> queryList(Connection connection, String sql, Class<T> clz, boolean underScoreToCamel) throws SQLException, InstantiationException, IllegalAccessException, InvocationTargetException {
        // 创建集合用于存放结果
        ArrayList<T> result = new ArrayList<>();

        // 预编译 SQL
        PreparedStatement preparedStatement = connection.prepareStatement(sql);

        // 执行查询
        ResultSet resultSet = preparedStatement.executeQuery();

        // 获取查询的元数据信息
        ResultSetMetaData metaData = resultSet.getMetaData();
        int columnCount = metaData.getColumnCount();

        // 遍历结果集
        while (resultSet.next()) {
            T t = clz.newInstance();

            for (int i = 0; i < columnCount; i++) {
                // 获取列名和列值
                String columnName = metaData.getColumnName(i);
                Object value = resultSet.getObject(columnName);

                // 判断是否需要进行下划线与驼峰命名的转换
                if (underScoreToCamel) {
                    columnName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, columnName.toLowerCase());
                }

                // 赋值
                BeanUtils.setProperty(t, columnName, value);
            }

            resultSet.close();
            preparedStatement.close();

            result.add(t);
        }
        return result;
    }
}
public class DimUtil {

    public static JSONObject getDimInfo(Connection connection,String tableName,String key) throws SQLException, InvocationTargetException, InstantiationException, IllegalAccessException {
        // 拼接 SQL
        String querySql = "select * from " + GmallConfig.HBASE_SCHEMA + "." + tableName + " where id = '" + key + "'";
        // 查询数据
        List<JSONObject> queryList = JdbcUtil.queryList(connection, querySql, JSONObject.class, false);
        // 返回结果
        return queryList.get(0);
    }

}

现在有个问题是:Phoenix 查询数据的速度每条平均 7 ms 左右,这显然不能满足大数据的场景,关于 HBase 的查询数据流程我们很熟悉:

  • 客户端向 zookeeper 查询元数据表所在的 regionServer 地址
  • zookeeper 返回 metastore 地址
  • 客户端去 regionServer 查询出 metastore 信息并保存
  • 根据 metastore 中的信息去 regionServer 去读取数据同时缓存一份到读缓存

所以我们说 HBase 的连接是一个重量级的连接,过程很长很繁琐,那我们就得考虑怎么优化这个查询速度了;

2.2.4、旁路缓存

为了提高查询效率,我们需要对数据做一个缓存,但是并不是把所有维表缓存起来,那样就没有必要把海量数据存到 HBase 了,我们只缓存热点数据;

旁路缓存注意事项:

  • 缓存要设过期时间,不然冷数据会常驻缓存,浪费资源

  • 要考虑维度数据是否会发生变化,如果发生变化要主动清除缓存(在写入 Phoenix 前判断 Maxwell 传过来的数据中 type 字段是否为 update,如果是删除缓存中对应的数据)

关于 Redis 的表设计:

  • 数据类型:String
  • Key:String(TableName+id)使用 String 做 key 只能存一条数据,方便管理每条数据的生命周期
public class JedisUtil {
    private static JedisPool jedisPool;

    private static void initJedisPool() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(100);
        poolConfig.setMaxIdle(5);
        poolConfig.setMinIdle(5);
        poolConfig.setBlockWhenExhausted(true);
        poolConfig.setMaxWaitMillis(2000);
        poolConfig.setTestOnBorrow(true);
        jedisPool = new JedisPool(poolConfig, "hadoop102", 6379, 10000);
    }

    public static Jedis getJedis() {
        if (jedisPool == null) {
            initJedisPool();
        }
        // 获取Jedis客户端
        Jedis jedis = jedisPool.getResource();
        return jedis;
    }

    public static void main(String[] args) {
        Jedis jedis = getJedis();
        String pong = jedis.ping();
        System.out.println(pong);
    }

}

重写 DimUtil:

    public static JSONObject getDimInfo(Connection connection,String tableName,String key) throws SQLException, InvocationTargetException, InstantiationException, IllegalAccessException {
        // 先查询 Redis
        Jedis jedis = JedisUtil.getJedis();
        String redisKey = "DIM:"+tableName+":"+key;
        String dimJsonStr = jedis.get(redisKey);
        if (dimJsonStr != null){
            // 重置过期时间
            jedis.expire(redisKey,24 * 60 * 60);
            // 归还连接
            jedis.close();
            // 返回维表数据
            return JSONObject.parseObject(dimJsonStr);
        }

        // 拼接 SQL
        String querySql = "select * from " + GmallConfig.HBASE_SCHEMA + "." + tableName + " where id = '" + key + "'";
        // 查询数据
        List<JSONObject> queryList = JdbcUtil.queryList(connection, querySql, JSONObject.class, false);

        // 写入到 Redis
        JSONObject dimInfo = queryList.get(0);
        jedis.set(redisKey,dimInfo.toJSONString());
        // 设置过期时间
        jedis.expire(redisKey,24 * 60 * 60);
        // 归还连接
        jedis.close();

        // 返回结果
        return dimInfo;
    }

    public static void deleteDimInfo(String tableName,String key){
        // 获取连接
        Jedis jedis = JedisUtil.getJedis();
        // 删除数据
        jedis.del("DIM"+tableName+":"+key);
        // 归还连接
        jedis.close();
    }

修改 DimSinkFunction 的 invoke 方法(如果是 update 数据就去删除旧的缓存) : 

    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        // 获取连接
        DruidPooledConnection connection = druidDataSource.getConnection();
        // 写出数据(需要知道写出的表名、字段)
        String sinkTable = value.getString("sinkTable");
        JSONObject data = value.getJSONObject("data");

        // 获取数据类型
        String type = value.getString("type");
        if (type!=null && type.equals("update")) {
            // Phoenix 都是大写
            DimUtil.deleteDimInfo(sinkTable.toUpperCase(),data.getString("id"));
        }

        // 如果插入数据失败 invoke 方法抛出的 Exception 会导致程序停止
        PhoenixUtil.upsertValues(connection,sinkTable,data);
        // 归还连接
        connection.close();
    }

2.2.5、异步 IO 优化查询速度

在Flink 流处理过程中,经常需要和外部系统进行交互,如通过维度表补全事实表中的维度字段。

默认情况下,在Flink 算子中,单个并行子任务只能以同步方式与外部系统交互:将请求发送到外部存储,IO阻塞,等待请求返回,然后继续发送下一个请求。这种方式将大量时间耗费在了等待结果上。

为了提高处理效率,可以有两种思路。

(1)增加算子的并行度,但需要耗费更多的资源。

(2)异步 IO。

Flink 在1.2中引入了Async I/O,将IO操作异步化。在异步模式下,单个并行子任务可以连续发送多个请求,按照返回的先后顺序对请求进行处理,发送请求后不需要阻塞式等待,省去了大量的等待时间,大幅提高了流处理效率,解决了与外部系统交互时网络延迟成为系统瓶颈的问题。

异步查询实际上是把维表的查询操作托管给单独的线程池完成,这样不会因为某一个查询造成阻塞,因此单个并行子任务可以连续发送多个请求,从而提高并发效率。对于涉及网络IO的操作,可以显著减少因为请求等待带来的性能损耗。

构建一个线程池工具类:

public class ThreadPoolUtil {

    private static ThreadPoolExecutor threadPoolExecutor;

    private ThreadPoolUtil(){

    }

    // 单例模式
    public ThreadPoolExecutor getThreadPoolExecutor() {
        if (threadPoolExecutor == null){
            synchronized (ThreadPoolUtil.class){
                if (threadPoolExecutor == null){
                    threadPoolExecutor = new ThreadPoolExecutor(4,
                            20,
                            100,
                            TimeUnit.SECONDS,
                            new LinkedBlockingDeque<>());
                }
            }
        }
        return threadPoolExecutor;
    }
}

2.2.6、编写异步函数

这个异步函数类我们使用了泛型,为了方便扩展之后的其它需求(不同流的数据类型不一样),并且添加了 2 个抽象方法:

  • getKey:因为我们的流中的数据(读取自下单事务事实表)已经携带了读取 Phoenix 维表所需要的 id 主键,但是我们又不能在代码中写死,所以我们通过这个抽象方法来让调用者实现获取主键字段的方法
  • addAttribute():同样由于不同流的数据类型不同,我们把给流字段补充这个操作(从 hbase 读到 jsonObject 数据返回)交给调用者去完成

在异步函数中我们来创建 Phoenix 客户端线程去读取数据;

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> {

    private DruidDataSource dataSource;
    private ThreadPoolExecutor threadPoolExecutor;

    private String tableName;

    public DimAsyncFunction(){

    }

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // 创建 phoenix 连接池
        dataSource = DruidDSUtil.createDataSource();
        threadPoolExecutor = ThreadPoolUtil.getThreadPoolExecutor();
    }

    @Override
    public void asyncInvoke(T input, ResultFuture<T> resultFuture) throws Exception {

        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                // 获取连接
                DruidPooledConnection connection = null;
                try {
                    connection = dataSource.getConnection();
                    // 查询维表 获取维度信息
                    String key = getKey(input);
                    JSONObject dimInfo = DimUtil.getDimInfo(connection, tableName, key);
                    // 将维度信息补充至当前数据
                    if (dimInfo != null){
                        addAttribute(input,dimInfo);
                    }
                    // 归还连接
                    connection.close();
                    // 将结果写出
                    resultFuture.complete(Collections.singletonList(input));
                } catch (Exception e) {
                    System.out.println("关联维表失败: "+input+","+tableName);
                    e.printStackTrace();
                }
            }
        });

    }

    @Override
    public void timeout(T input, ResultFuture<T> resultFuture) throws Exception {
        System.out.println("timeout"+input);
    }

    public abstract String getKey(T input);
    public abstract void addAttribute(T pojo,JSONObject dimInfo);
}

2.2.7、使用异步IO关联 sku_info 

我们的关联分为两步:

  1. 在开窗聚合前只关联 sku_info 只补充维度字段(spu_id、user_id、category3_id、tm_id),为的是减少不必要的数据传输,而且聚合后主流数据量变小,这时候再去关联其它维表字段可以减少查询的数量
  2. 在聚合后关联 spu、tm、category3、category2、category1 补充其它字段(spu_name、tm_name、category3_name、category2_id、category2_name、category1_id、category1_name)
// TODO 7. 关联 sku_info 维表补充 spu_id、tm_id、category3_id
        SingleOutputStreamOperator<TradeUserSpuOrderBean> tradeUserSpuWithSkuDS = AsyncDataStream.unorderedWait(tradeUserSpuDS,
                new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_SKU_INFO") {
                    @Override
                    public String getKey(TradeUserSpuOrderBean input) {
                        return input.getSkuId();
                    }

                    @Override
                    public void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {
                        pojo.setSpuId(dimInfo.getString("SPU_ID"));
                        pojo.setTrademarkId(dimInfo.getString("TM_ID"));
                        pojo.setCategory3Id(dimInfo.getString("CATEGORY3_ID"));
                    }
                },
                100,
                TimeUnit.SECONDS);

2.2.8、提取水位线

        // TODO 8. 提取事件时间并生成 watermark
        SingleOutputStreamOperator<TradeUserSpuOrderBean> tradeUserSpuWithWmDS = tradeUserSpuWithSkuDS.assignTimestampsAndWatermarks(WatermarkStrategy.<TradeUserSpuOrderBean>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<TradeUserSpuOrderBean>() {
                    @Override
                    public long extractTimestamp(TradeUserSpuOrderBean element, long recordTimestamp) {
                        return element.getTs();
                    }
                }));

2.2.9、分组开窗聚合

        按照粒度分组,之后增量聚合把相同粒度的数据的订单总数和订单金额进行累加,在全量聚合中再补充窗口起止时间、ck版本时间戳、订单总数等字段;

// TODO 9. 分组开窗聚合
        KeyedStream<TradeUserSpuOrderBean, Tuple4<String, String, String, String>> keyStream = tradeUserSpuWithWmDS.keyBy(line -> Tuple4.of(line.getSpuId(), line.getCategory3Id(), line.getTrademarkId(), line.getUserId()));
        SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceDS = keyStream.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<TradeUserSpuOrderBean>() {
                    @Override
                    public TradeUserSpuOrderBean reduce(TradeUserSpuOrderBean value1, TradeUserSpuOrderBean value2) throws Exception {
                        value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                        value1.setOrderAmount(value1.getOrderAmount() + value2.getOrderAmount());
                        return value1;
                    }
                }, new WindowFunction<TradeUserSpuOrderBean, TradeUserSpuOrderBean, Tuple4<String, String, String, String>, TimeWindow>() {
                    @Override
                    public void apply(Tuple4<String, String, String, String> stringStringStringStringTuple4, TimeWindow window, Iterable<TradeUserSpuOrderBean> input, Collector<TradeUserSpuOrderBean> out) throws Exception {
                        TradeUserSpuOrderBean next = input.iterator().next();

                        next.setTs(System.currentTimeMillis());
                        next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                        next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        next.setOrderCount((long) next.getOrderIdSet().size());

                        out.collect(next);
                    }
                });

2.2.10、异步IO关联其它维表并写出到 ck

// TODO 10. 关联 spu、tm、category 维表的 name 字段以及 category2 和 category1 的 id 和 name 字段
        // TODO 10.1 关联 Spu
        SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithSpuDS = AsyncDataStream.unorderedWait(reduceDS,
                new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_SPU_INFO") {
                    @Override
                    public String getKey(TradeUserSpuOrderBean input) {
                        return input.getSpuId();
                    }

                    @Override
                    public void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {
                        pojo.setSpuName(dimInfo.getString("SPU_NAME"));
                    }
                },
                100, TimeUnit.SECONDS);

        // TODO 10.2 关联 tm
        SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithTmDS = AsyncDataStream.unorderedWait(reduceWithSpuDS,
                new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_TRADEMARK") {
                    @Override
                    public String getKey(TradeUserSpuOrderBean input) {
                        return input.getTrademarkId();
                    }

                    @Override
                    public void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {
                        pojo.setTrademarkName(dimInfo.getString("TM_NAME"));
                    }
                },
                100, TimeUnit.SECONDS);
        // TODO 10.3 关联 category3
        SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithCate3DS = AsyncDataStream.unorderedWait(reduceWithTmDS,
                new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_CATEGORY3") {
                    @Override
                    public String getKey(TradeUserSpuOrderBean input) {
                        return input.getCategory3Id();
                    }

                    @Override
                    public void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {
                        pojo.setCategory3Name(dimInfo.getString("NAME"));
                        pojo.setCategory2Id("CATEGORY2_ID");
                    }
                },
                100, TimeUnit.SECONDS);
        // TODO 10.4 关联 category2
        SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithCate2DS = AsyncDataStream.unorderedWait(reduceWithCate3DS,
                new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_CATEGORY2") {
                    @Override
                    public String getKey(TradeUserSpuOrderBean input) {
                        return input.getCategory2Id();
                    }

                    @Override
                    public void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {
                        pojo.setCategory2Name(dimInfo.getString("NAME"));
                        pojo.setCategory1Id("CATEGORY1_ID");
                    }
                },
                100, TimeUnit.SECONDS);
        // TODO 10.5 关联 category1
        SingleOutputStreamOperator<TradeUserSpuOrderBean> reduceWithCate1DS = AsyncDataStream.unorderedWait(reduceWithCate2DS,
                new DimAsyncFunction<TradeUserSpuOrderBean>("DIM_BASE_CATEGORY1") {
                    @Override
                    public String getKey(TradeUserSpuOrderBean input) {
                        return input.getCategory1Id();
                    }

                    @Override
                    public void addAttribute(TradeUserSpuOrderBean pojo, JSONObject dimInfo) {
                        pojo.setCategory1Name(dimInfo.getString("NAME"));
                    }
                },
                100, TimeUnit.SECONDS);

        // TODO 11. 写出到 clickhouse
        reduceWithCate1DS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_trademark_category_user_order_window values " +
                "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));

        // TODO 12. 启动
        env.execute("DwsTradeUserSpuOrderWindow");

总结

        至此,DWS 层这两张表终于完成,耗时一天,第二张表是目前为止最复杂的一张,也是 DWS 层第一次关联维表;

        DWS 层的需求只剩两个,明天应该一上午就能完成,毕竟和这个需求差不多;ADS 层就简单了,明天下午应该就能结束;

        关于异步IO,我把链接放这,之后复习再来查看:异步 I/O | Apache Flink

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1989028.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux-WMware Tools安装失败“segmentation fault”解决方法】

VMware版本&#xff1a;17 Ubuntu版本: 22.04 安装常规办法&#xff0c;通过vmware安装Tool&#xff0c;安装显示报错&#xff1a;“segmentation fault”&#xff0c;查了下可能是tool和ubuntu版本不兼容导致的。解决办法&#xff1a;通过命令行逐次安装。 1、sudo apt insta…

PostgreSQL(二十五)PG_FDW的使用

目录 一、FDW的简介与特性 二、pg_fdw的部署与使用 1、编译postgres_fdw 2、添加postgres_fdw 3、创建FDW服务器 ​4、授权并创建用户映射 5、客户端创建FDW测试表 6、访问外部表 ​7、可能出现的问题 三、FDW的执行原理 1、PG-PG访问过程描述 2、PG-PG访问过程查看…

一部分优化算法

一、优化问题 1、优化目标 &#xff08;1&#xff09;优化和深度学习的目标是根本不同的。前者主要关注的是最小化目标&#xff0c;后者则关注在给定有限数据量的情况下寻找合适的模型。 &#xff08;2&#xff09;优化算法的目标函数通常是基于训练数据集的损失函数&#x…

springboot 定义类导入爆红,@Autowried自动注入失败

springboot 定义类导入爆红&#xff0c;Autowried自动注入失败 根据提供的异常信息&#xff0c;分析如下&#xff1a; 异常起因&#xff1a;UnsatisfiedDependencyException 表示在创建名为 ‘a1001Service’ 的 bean 时存在依赖问题&#xff0c;具体是在字段 ‘a1001Mapper’ …

算法日记day 32(贪心之划分字母区间|合并区间|单调递增的数字|监控二叉树)

一、划分字母区间 题目&#xff1a; 给你一个字符串 s 。我们要把这个字符串划分为尽可能多的片段&#xff0c;同一字母最多出现在一个片段中。 注意&#xff0c;划分结果需要满足&#xff1a;将所有划分结果按顺序连接&#xff0c;得到的字符串仍然是 s 。 返回一个表示每…

nginx 405错误是什么意思

405错误&#xff1a;方法不被允许 当Web服务器收到一个它不支持的HTTP请求方法时&#xff0c;就会返回405错误。 原因 405错误通常是由于客户端发出了不兼容或不支持的HTTP请求方法。例如&#xff0c;客户端可能请求一个只能通过GET方法访问的资源&#xff0c;但使用了POST方…

C代码做底层及Matlab_SimuLink做应用层设计单片机程序

前言:SimuLink工具极其强大,但是能直接支持单片机自主开发的很少,造成这个问题的原因主要是我们使用的芯片底层多是C代码工程,芯片厂家也只提供C代码库,很少能提供SimuLink的支持库,即使提供也不是很不完善,如NXP的一些芯片提供的SimuLink库不含盖高级应用,再比如意法半…

视创云展:轻松构建出独一无二的元宇宙空间

视创云展作为一款前沿的元宇宙数字营销平台&#xff0c;集成了多项核心技术&#xff0c;旨在为用户提供低门槛、高效能的元宇宙体验与创作工具。其核心技术主要包括&#xff1a; 1、低门槛、模块化&#xff0c;3D场景创作工具 视创云展集成了海量的元宇宙场景模板&#xff0c;…

SpringBoot中如何自定义自己的过滤器Filter(简易版)

本文不再说SpringMVC中的写法&#xff0c;毕竟现在项目都是SpringBoot&#xff0c;我们还是尽量使用SpringBoot的写法&#xff0c;首先了解一下Filter。 说白了&#xff0c;就是在请求到达服务器之前进行拦截&#xff0c;一般使用场景是拦截登录进行权限校验&#xff0c;当然一…

跟李沐学AI:GoogLeNet含并行连结的网络

Inception块 GoogleNet中的基本卷积块&#xff0c;从4个路径从不同层面抽取信息&#xff0c;然后再输出通道维合并。 数据输入后共有四条路径&#xff1a;第一个路径为1x1的卷积层&#xff1b;第二个路径先用1x1的卷积层修改通道数&#xff0c;在输入到3x3的卷积层&#xff0c…

腾讯云AI代码助手助力软件开发体验分享

引言 现在&#xff0c;AI工具在软件开发中变得越来越重要&#xff0c;它们能显著提升效率和代码质量。本文就来分享一下我用腾讯云AI代码助手的经历&#xff0c;看看它是怎么在开发中帮了大忙的。 开发环境介绍 这次的项目用的是JavaScript&#xff0c;开发环境是Windows 10…

用于胰腺癌自动化综述报告和可切除性分类的大型语言模型| 文献速递-基于深度学习的乳房、前列腺疾病诊断系统

Title 题目 Large Language Models for Automated Synoptic Reports and Resectability Categorization in Pancreatic Cancer 用于胰腺癌自动化综述报告和可切除性分类的大型语言模型 Background 背景 Structured radiology reports for pancreatic ductal adenocarcinom…

condition字符串匹配问题

概述 freeswitch是一款简单好用的VOIP开源软交换平台。 fs使用dialplan配置文件执行业务流程&#xff0c;condition条件变量的配置是必然会使用的&#xff0c;这里记录一次配置过程中的错误示范。 环境 CentOS 7.9 freeswitch 1.10.7 问题描述 dialplan配置如下&#xf…

如何做到项目真实性优化?保姆级写简历指南第五弹!

大家好&#xff0c;我是程序员鱼皮。做知识分享这些年来&#xff0c;我看过太多简历、也帮忙修改过很多的简历&#xff0c;发现很多同学是完全不会写简历的、会犯很多常见的问题&#xff0c;不能把自己的优势充分展示出来&#xff0c;导致措施了很多面试机会&#xff0c;实在是…

pdf拆分需要怎么做?6个软件帮助你快速拆分pdf文件

pdf拆分需要怎么做&#xff1f;6个软件帮助你快速拆分pdf文件 拆分PDF文件可以让你更方便地处理和管理文档内容&#xff0c;无论是提取特定页面还是将文件分成更小的部分。以下是六款帮助你快速拆分PDF文件的软件&#xff0c;每款软件都有其独特的功能和优势&#xff0c;供你选…

4章4节:临床数据科学中如何用R来进行缺失值的处理

在临床科研中,由于失访、无应答或记录不清等各种原因,经常会遇到数据缺失的问题。本文将深入探讨医学科研中数据缺失的成因、分类、影响以及应对方法,结合R语言的实际应用,为医学研究人员提供全面的解决方案。 一、认识缺失数据 其实,很多医学的纵向研究因获取数据资料时…

铜山金杏·打响区域公用品牌,助力乡村振兴新征程

为进一步提升徐州市铜山区农业产业发展的重要战略布局&#xff0c;从而更好地助力乡村振兴&#xff0c;徐州市铜山区农业农村局借助“铜山金杏地理标志农产品保护工程项目”联合山东百仕达地标产业有限公司打造铜山农特产品区域公用品牌——“铜山金杏”&#xff0c;充分挖掘铜…

大搜罗2024年数据恢复软件TOP3,互联网人士的年度推荐!

不管是咱们普通人&#xff0c;还是大大小小的公司&#xff0c;都怕数据一去不复返。好在科技给力&#xff0c;现在数据恢复软件越来越牛&#xff0c;帮我们解决了这块心病。今儿个&#xff0c;就给大家聊聊几款2024年超火的数据恢复软件&#xff0c;如转转大师数据恢复软件等&a…

防盗、防泄露、防篡改,我们把 ZooKeeper 的这种认证模式玩明白了

作者&#xff1a;子葵 你的 ZooKeeper 安全吗&#xff1f; 在当下网络安全事件频发的背景下&#xff0c;安全防护的构建成为日常开发与运维工作中的重中之重。ZooKeeper 存储着系统敏感实例信息与配置数据&#xff0c;但传统的使用方式并未为 ZooKeeper 配备强制身份验证机制…

Ubuntu查看IP地址

Ubuntu查看IP地址 文章目录 Ubuntu查看IP地址查看IP地址的命令遇到的问题解决方案 查看IP地址的命令 1.使用ifconfig 2.使用hostname -I&#xff08;-之前一个空格&#xff09; 3.使用ip addr show 遇到的问题 1.使用ifconfig只显示一个127.0.0.1地址 2.使用hostname -I …