Logistics Real-Time Data Warehouse: Building the Warehouse (DWS), Part 2

news2025/1/10 3:18:12

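Series Index

Logistics Real-Time Data Warehouse: Building the Ingestion Pipeline
Logistics Real-Time Data Warehouse: Building the Warehouse
Logistics Real-Time Data Warehouse: Building the Warehouse (DIM)
Logistics Real-Time Data Warehouse: Building the Warehouse (DWD), Part 1
Logistics Real-Time Data Warehouse: Building the Warehouse (DWD), Part 2
Logistics Real-Time Data Warehouse: Building the Warehouse (DWS), Part 1
Logistics Real-Time Data Warehouse: Building the Warehouse (DWS), Part 2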


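Contents

  • Series Index
  • Preface
  • I. Writing the Code
    • 1. Fixing a bug
    • 2. Trade domain: daily order summary table by cargo type
      • 1. Trade domain: cargo-type order aggregation entity class
      • 2. Trade domain: aggregating order count and order amount by cargo type
      • 3. Creating the ClickHouse tables
    • 3. Trade domain: daily order summary table by organization
      • 1. Trade domain: organization order aggregation entity class
      • 2. Trade domain: order statistics by organization
      • 3. Creating the ClickHouse tables
    • 4. Logistics domain: daily transfer-completion summary table
      • 1. Logistics domain: transfer-completion entity class
      • 2. Logistics domain: transfer-completion statistics
      • 3. Creating the ClickHouse tables
    • 5. Logistics domain: daily dispatch summary table
      • 1. Logistics domain: dispatch statistics entity class
      • 2. Logistics domain: dispatch aggregation
      • 3. Creating the ClickHouse tables
    • 6. Logistics domain: daily delivery-success summary table by organization
      • 1. Logistics domain: organization delivery-success statistics entity class
      • 2. Logistics domain: organization delivery-success statistics
      • 3. Creating the ClickHouse tables
    • 7. Logistics domain: daily pickup summary table by organization
      • 1. Logistics domain: organization pickup statistics entity class
      • 2. Logistics domain: organization pickup aggregation
      • 3. Creating the ClickHouse tables
    • 8. Logistics domain: daily transport-completion summary table by organization and truck type
      • 1. Logistics domain: organization and truck-type statistics entity class
      • 2. Logistics domain: organization and truck-type aggregation
      • 3. Creating the ClickHouse tables
  • II. Testing the Code
    • 1. Changing the topic partition count
    • 2. Starting the cluster
    • 3. Testing the code
      • 1. Trade domain: daily order summary table by cargo type
      • 2. Trade domain: daily order summary table by organization
      • 3. Logistics domain: daily transfer-completion summary table
      • 4. Logistics domain: daily dispatch summary table
      • 5. Logistics domain: daily delivery-success summary table by organization
      • 6. Logistics domain: daily pickup summary table by organization
      • 7. Logistics domain: daily transport-completion summary table by organization and truck type
  • Summary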


Preface

In the previous post we wrote a number of utility classes, so building the remaining tables is simpler.


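I. Writing the Code

1. Fixing a bug

While testing the later code, I found a bug in code from an earlier post.
In the DWD-layer file DwdTransTransFinish, the computed TransportTime came out negative. The cause was that the two operands of the subtraction were in the wrong order: the code subtracted the end time from the start time.
The line is at around line 67 of that file.

Original code
finishBean.setTransportTime(Long.parseLong(finishBean.getActualStartTime()) - Long.parseLong(finishBean.getActualEndTime()));
Fixed code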
finishBean.setTransportTime(Long.parseLong(finishBean.getActualEndTime()) - Long.parseLong(finishBean.getActualStartTime()));
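
A bug like this is easy to miss because a negative duration is still a valid long. The following is a minimal sketch of a guard that fails loudly instead, assuming both times are epoch-millisecond strings as in the bean above. The class and method names are hypothetical and are not part of the project.

```java
public class TransportTimeDemo {
    /**
     * Returns end - start in milliseconds.
     * Both arguments are epoch-millisecond strings (an assumption based on the parseLong calls above).
     */
    public static long transportTime(String actualStartTime, String actualEndTime) {
        long start = Long.parseLong(actualStartTime);
        long end = Long.parseLong(actualEndTime);
        long duration = end - start;
        if (duration < 0) {
            // A negative value means the operands were swapped or the data is bad
            throw new IllegalArgumentException(
                    "end time " + end + " is before start time " + start);
        }
        return duration;
    }

    public static void main(String[] args) {
        // Two timestamps exactly one hour apart
        System.out.println(transportTime("1704067200000", "1704070800000")); // 3600000
    }
}
```

Whether to throw, log, or route the record to a side output is a design choice; in a streaming job, throwing would restart the job, so a side output may be the safer option.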

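Because the buggy job has already written wrong records to Kafka, the old topic has to be deleted and generated again. The first command below deletes the topic; the second starts a console consumer on it so the regenerated records can be checked (assuming the broker allows automatic topic creation, this also recreates the topic).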

kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic tms_dwd_trans_trans_finish
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_trans_finish

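2. Trade domain: daily order summary table by cargo type

Requirement: count today's orders and order amount per cargo type.

1. Trade domain: cargo-type order aggregation entity class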

DwsTradeCargoTypeOrderDayBean.java

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Trade domain: cargo-type order aggregation entity class
 */
@Data
@Builder
public class DwsTradeCargoTypeOrderDayBean {
    // Current date
    String curDate;

    // Cargo type ID
    String cargoType;

    // Cargo type name
    String cargoTypeName;

    // Order amount
    BigDecimal orderAmountBase;

    // Order count
    Long orderCountBase;

    // Timestamp
    Long ts;
}

2. Trade domain: aggregating order count and order amount by cargo type

DwsTradeCargoTypeOrderDay.java

package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTradeOrderDetailBean;
import com.atguigu.tms.realtime.beans.DwsTradeCargoTypeOrderDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

// Trade domain: aggregate order count and order amount by cargo type
public class DwsTradeCargoTypeOrderDay {
    public static void main(String[] args) throws Exception {
        // Set up the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // Read data from Kafka
        String topic = "tms_dwd_trade_order_detail";
        String groupId = "dws_trade_cargo_type_order_group";

        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");


        // Convert each record in the stream: JSON string -> entity object
        SingleOutputStreamOperator<DwsTradeCargoTypeOrderDayBean> mapDS = kafkaStrDS.map(
                new MapFunction<String, DwsTradeCargoTypeOrderDayBean>() {
                    @Override
                    public DwsTradeCargoTypeOrderDayBean map(String JsonStr) throws Exception {
                        DwdTradeOrderDetailBean dwdTradeOrderDetailBean = JSON.parseObject(JsonStr, DwdTradeOrderDetailBean.class);
                        DwsTradeCargoTypeOrderDayBean bean = DwsTradeCargoTypeOrderDayBean.builder()
                                .cargoType(dwdTradeOrderDetailBean.getCargoType())
                                .orderAmountBase(dwdTradeOrderDetailBean.getAmount())
                                .orderCountBase(1L)
                                .ts(dwdTradeOrderDetailBean.getTs() + 8 * 60 * 60 * 1000)
                                .build();
                        return bean;
                    }
                }
        );

        // Assign watermarks and extract the event-time field
        SingleOutputStreamOperator<DwsTradeCargoTypeOrderDayBean> withWatermarkDS = mapDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<DwsTradeCargoTypeOrderDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<DwsTradeCargoTypeOrderDayBean>() {
                                    @Override
                                    public long extractTimestamp(DwsTradeCargoTypeOrderDayBean element, long l) {
                                        return element.getTs();
                                    }
                                }
                        )
        );

        // Key by cargo type
        KeyedStream<DwsTradeCargoTypeOrderDayBean, String> keyedDS = withWatermarkDS.keyBy(DwsTradeCargoTypeOrderDayBean::getCargoType);

        // Open a one-day tumbling window
        WindowedStream<DwsTradeCargoTypeOrderDayBean, String, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.days(1)));

        // Attach the custom trigger
        WindowedStream<DwsTradeCargoTypeOrderDayBean, String, TimeWindow> triggerDS = windowDS.trigger(new MyTriggerFunction<DwsTradeCargoTypeOrderDayBean>());

        // Aggregate
        SingleOutputStreamOperator<DwsTradeCargoTypeOrderDayBean> aggregateDS = triggerDS.aggregate(
                new MyAggregationFunction<DwsTradeCargoTypeOrderDayBean>() {
                    @Override
                    public DwsTradeCargoTypeOrderDayBean add(DwsTradeCargoTypeOrderDayBean value, DwsTradeCargoTypeOrderDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setOrderAmountBase(value.getOrderAmountBase().add(accumulator.getOrderAmountBase()));
                        accumulator.setOrderCountBase(value.getOrderCountBase() + accumulator.getOrderCountBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTradeCargoTypeOrderDayBean, DwsTradeCargoTypeOrderDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<DwsTradeCargoTypeOrderDayBean, DwsTradeCargoTypeOrderDayBean, String, TimeWindow>.Context context, Iterable<DwsTradeCargoTypeOrderDayBean> elements, Collector<DwsTradeCargoTypeOrderDayBean> out) throws Exception {
                        long sst = context.window().getStart() - 8 * 60 * 60 * 1000L;
                        for (DwsTradeCargoTypeOrderDayBean bean : elements) {
                            String curDate = DateFormatUtil.toDate(sst);
                            bean.setCurDate(curDate);
                            bean.setTs(System.currentTimeMillis());
                            out.collect(bean);
                        }
                    }
                }
        );

        // Join the cargo-type dimension
        SingleOutputStreamOperator<DwsTradeCargoTypeOrderDayBean> withCargoTypeDS = AsyncDataStream.unorderedWait(
                aggregateDS,
                new DimAsyncFunction<DwsTradeCargoTypeOrderDayBean>("dim_base_dic") {
                    @Override
                    public void join(DwsTradeCargoTypeOrderDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setCargoTypeName(dimInfoJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTradeCargoTypeOrderDayBean bean) {
                        return Tuple2.of("id",bean.getCargoType());
                    }
                },
                60,
                TimeUnit.SECONDS
        );


        // Write the result to ClickHouse
        withCargoTypeDS.print(">>>>");
        withCargoTypeDS.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trade_cargo_type_order_day_base values(?,?,?,?,?,?)")
        );

        env.execute();
    }
}
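
The job above adds 8 hours to ts before windowing and subtracts 8 hours from the window start afterwards. The reason is that one-day tumbling windows without an offset are aligned to the epoch, that is, to midnight UTC, while the statistics date should follow UTC+8. The following plain-Java sketch reproduces that arithmetic without Flink. The class and method names are hypothetical, and it assumes DateFormatUtil.toDate formats the timestamp in UTC+8.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

public class DayWindowShiftDemo {
    static final long DAY_MS = 24 * 60 * 60 * 1000L;
    static final long SHIFT_MS = 8 * 60 * 60 * 1000L;

    /** Start of the epoch-aligned one-day window that contains ts (non-negative ts assumed). */
    public static long windowStart(long ts) {
        return ts - (ts % DAY_MS);
    }

    /** Statistics date (UTC+8) that the job would stamp on a record with this event time. */
    public static LocalDate curDate(long eventTs) {
        long shifted = eventTs + SHIFT_MS;            // what the map step stores in ts
        long start = windowStart(shifted) - SHIFT_MS; // what the window function computes
        return Instant.ofEpochMilli(start).atOffset(ZoneOffset.ofHours(8)).toLocalDate();
    }

    public static void main(String[] args) {
        // 23:30 on 2024-01-01 and 00:30 on 2024-01-02, both in UTC+8
        long lateEvening = Instant.parse("2024-01-01T15:30:00Z").toEpochMilli();
        long afterMidnight = Instant.parse("2024-01-01T16:30:00Z").toEpochMilli();
        System.out.println(curDate(lateEvening));   // 2024-01-01
        System.out.println(curDate(afterMidnight)); // 2024-01-02
    }
}
```

Flink's TumblingEventTimeWindows.of also accepts an offset as a second argument, for example Time.hours(-8), which may give the same alignment without modifying the timestamps. The code in this post does not use that variant.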

3. Creating the ClickHouse tables

Create the table and the materialized view in the tms_realtime database created earlier.

CREATE TABLE IF NOT EXISTS dws_trade_cargo_type_order_day_base
(
    `cur_date` Date COMMENT '统计日期',
    `cargo_type` String COMMENT '货物类型ID',
    `cargo_type_name` String COMMENT '货物类型名称',
    `order_amount_base` Decimal(38, 20) COMMENT '下单金额',
    `order_count_base` UInt64 COMMENT '下单次数',
    `ts` UInt64 COMMENT '时间戳'
)
ENGINE = MergeTree
ORDER BY (cur_date, cargo_type, cargo_type_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trade_cargo_type_order_day
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, cargo_type, cargo_type_name) AS
SELECT
    cur_date,
    cargo_type,
    cargo_type_name,
    argMaxState(order_amount_base, ts) AS order_amount,
    argMaxState(order_count_base, ts) AS order_count
FROM dws_trade_cargo_type_order_day_base
GROUP BY
    cur_date,
    cargo_type,
    cargo_type_name;
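
The view uses argMaxState(value, ts) rather than a sum. Assuming MyTriggerFunction (from the previous post) fires the window repeatedly during the day, each firing writes a new cumulative row to the base table with a fresh ts, so summing the rows would double count. argMax keeps only the value from the row with the largest ts for each key. The following sketch imitates that "latest row wins" behavior in plain Java; the Row class and all names in it are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ArgMaxDemo {
    /** One row of the base table, reduced to the fields that matter here. */
    public static final class Row {
        final String key;      // stands in for (cur_date, cargo_type, cargo_type_name)
        final long orderCount; // cumulative count emitted by one window firing
        final long ts;         // time at which the row was emitted

        public Row(String key, long orderCount, long ts) {
            this.key = key;
            this.orderCount = orderCount;
            this.ts = ts;
        }
    }

    /** For each key, keep the orderCount of the row with the largest ts. */
    public static Map<String, Long> latestPerKey(List<Row> rows) {
        Map<String, Row> latest = new HashMap<>();
        for (Row row : rows) {
            latest.merge(row.key, row, (old, cur) -> cur.ts > old.ts ? cur : old);
        }
        Map<String, Long> result = new HashMap<>();
        latest.forEach((k, v) -> result.put(k, v.orderCount));
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("2024-01-01|A", 3, 100L),  // first firing
                new Row("2024-01-01|A", 7, 200L),  // later firing, cumulative
                new Row("2024-01-01|B", 2, 150L));
        // A resolves to 7 (not 3 + 7), B resolves to 2
        System.out.println(latestPerKey(rows));
    }
}
```

When reading from the materialized view, the stored states have to be finalized with the matching -Merge combinator (argMaxMerge) in a query grouped by the same key columns.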

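3. Trade domain: daily order summary table by organization

Requirement: count today's orders and order amount per organization, and add city dimension information.

1. Trade domain: organization order aggregation entity class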

DwsTradeOrgOrderDayBean.java

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

import java.math.BigDecimal;

/**
 * Trade domain: organization order aggregation entity class
 */
@Data
@Builder
public class DwsTradeOrgOrderDayBean {

    // Date
    String curDate;

    // Organization ID
    String orgId;

    // Organization name
    String orgName;

    // City ID
    String cityId;

    // City name
    String cityName;

    // Sender district ID (not written to ClickHouse)
    @TransientSink
    String senderDistrictId;

    // Order amount
    BigDecimal orderAmountBase;

    // Order count
    Long orderCountBase;

    // Timestamp
    Long ts;
}

2. Trade domain: order statistics by organization

DwsTradeOrgOrderDay.java

package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTradeOrderDetailBean;
import com.atguigu.tms.realtime.beans.DwsTradeOrgOrderDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

// Trade domain: aggregate orders by organization
public class DwsTradeOrgOrderDay {
    public static void main(String[] args) throws Exception {
        // Set up the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // Read from the real-time order detail topic in Kafka
        String topic = "tms_dwd_trade_order_detail";
        String groupId = "dws_trade_org_order_group";

        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // Convert each record to the entity type
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> mapDS = kafkaStrDS.map(
                new MapFunction<String, DwsTradeOrgOrderDayBean>() {
                    @Override
                    public DwsTradeOrgOrderDayBean map(String jsonStr) throws Exception {
                        DwdTradeOrderDetailBean dwdTradeOrderDetailBean = JSON.parseObject(jsonStr, DwdTradeOrderDetailBean.class);
                        DwsTradeOrgOrderDayBean bean = DwsTradeOrgOrderDayBean.builder()
                                .senderDistrictId(dwdTradeOrderDetailBean.getSenderDistrictId())
                                .cityId(dwdTradeOrderDetailBean.getSenderCityId())
                                .orderAmountBase(dwdTradeOrderDetailBean.getAmount())
                                .orderCountBase(1L)
                                .ts(dwdTradeOrderDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                                .build();

                        return bean;
                    }
                }
        );

        // Join the organization dimension
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> withOrgDS = AsyncDataStream.unorderedWait(
                mapDS,
                new DimAsyncFunction<DwsTradeOrgOrderDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTradeOrgOrderDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setOrgId(dimInfoJsonObj.getString("id"));
                        bean.setOrgName(dimInfoJsonObj.getString("org_name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTradeOrgOrderDayBean bean) {
                        return Tuple2.of("region_id", bean.getSenderDistrictId());
                    }
                },
                60, TimeUnit.SECONDS
        );


        // Assign watermarks
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> withWatermarkDS = withOrgDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<DwsTradeOrgOrderDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<DwsTradeOrgOrderDayBean>() {
                                    @Override
                                    public long extractTimestamp(DwsTradeOrgOrderDayBean element, long l) {
                                        return element.getTs();
                                    }
                                }
                        )

        );

        // Key by organization ID
        KeyedStream<DwsTradeOrgOrderDayBean, String> keyedDS = withWatermarkDS.keyBy(DwsTradeOrgOrderDayBean::getOrgId);

        // Open a one-day tumbling window
        WindowedStream<DwsTradeOrgOrderDayBean, String, TimeWindow> windowDS = keyedDS.window(TumblingEventTimeWindows.of(Time.days(1)));

        // Attach the custom trigger
        WindowedStream<DwsTradeOrgOrderDayBean, String, TimeWindow> triggerDS = windowDS.trigger(new MyTriggerFunction<DwsTradeOrgOrderDayBean>());

        // Aggregate
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> aggregateDS = triggerDS.aggregate(
                new MyAggregationFunction<DwsTradeOrgOrderDayBean>() {
                    @Override
                    public DwsTradeOrgOrderDayBean add(DwsTradeOrgOrderDayBean value, DwsTradeOrgOrderDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setOrderAmountBase(value.getOrderAmountBase().add(accumulator.getOrderAmountBase()));
                        accumulator.setOrderCountBase(value.getOrderCountBase() + accumulator.getOrderCountBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTradeOrgOrderDayBean, DwsTradeOrgOrderDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<DwsTradeOrgOrderDayBean, DwsTradeOrgOrderDayBean, String, TimeWindow>.Context context, Iterable<DwsTradeOrgOrderDayBean> elements, Collector<DwsTradeOrgOrderDayBean> out) throws Exception {
                        long stt = context.window().getStart() - 8 * 60 * 60 * 1000L;
                        String curDate = DateFormatUtil.toDate(stt);
                        for (DwsTradeOrgOrderDayBean bean : elements) {
                            bean.setCurDate(curDate);
                            bean.setTs(System.currentTimeMillis());
                            out.collect(bean);
                        }
                    }
                }
        );

        // Add city dimension information
        SingleOutputStreamOperator<DwsTradeOrgOrderDayBean> withCityDS = AsyncDataStream.unorderedWait(
                aggregateDS,
                new DimAsyncFunction<DwsTradeOrgOrderDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTradeOrgOrderDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setCityName(dimInfoJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTradeOrgOrderDayBean bean) {
                        return Tuple2.of("id", bean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS
        );

        // Write the result to ClickHouse

        withCityDS.print(">>>");
        withCityDS.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trade_org_order_day_base values(?,?,?,?,?,?,?,?)")
        );

        env.execute();

    }
}
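
In this job the organization lookup runs before keyBy, because orgId, the grouping key, only exists after the dim_base_organ row has been found through the sender's district. The city name is looked up after aggregation, so that lookup runs once per emitted result instead of once per order. The sketch below illustrates the getCondition/join pattern with an in-memory map and a CompletableFuture. It is a hypothetical stand-in, not the project's DimAsyncFunction, and every class, field value, and map in it is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class DimLookupDemo {
    /** Hypothetical lookup helper: subclasses say what to look up and how to merge the result. */
    public abstract static class DimLookup<T> {
        private final Map<String, Map<String, String>> table; // lookup value -> dimension row

        protected DimLookup(Map<String, Map<String, String>> table) {
            this.table = table;
        }

        /** Value used to find the dimension row for this record. */
        protected abstract String getCondition(T bean);

        /** Copies dimension columns into the record. */
        protected abstract void join(T bean, Map<String, String> dimRow);

        /** Runs the lookup off the calling thread and completes with the enriched record. */
        public CompletableFuture<T> enrich(T bean) {
            return CompletableFuture.supplyAsync(() -> {
                Map<String, String> row = table.get(getCondition(bean));
                if (row != null) {
                    join(bean, row);
                }
                return bean;
            });
        }
    }

    public static final class OrderBean {
        public String senderDistrictId;
        public String orgId;
        public String orgName;
    }

    /** Fills orgId and orgName from an organization table indexed by region ID. */
    public static OrderBean enrichWithOrg(OrderBean bean, Map<String, Map<String, String>> organByRegion) {
        DimLookup<OrderBean> lookup = new DimLookup<OrderBean>(organByRegion) {
            @Override
            protected String getCondition(OrderBean b) {
                return b.senderDistrictId;
            }

            @Override
            protected void join(OrderBean b, Map<String, String> dimRow) {
                b.orgId = dimRow.get("id");
                b.orgName = dimRow.get("org_name");
            }
        };
        try {
            // Same 60-second budget as the job's async wait
            return lookup.enrich(bean).get(60, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("dimension lookup failed", e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> organ = new HashMap<>();
        organ.put("id", "12");
        organ.put("org_name", "Demo Transfer Center");
        Map<String, Map<String, String>> organByRegion = new HashMap<>();
        organByRegion.put("110105", organ);

        OrderBean bean = new OrderBean();
        bean.senderDistrictId = "110105";
        OrderBean enriched = enrichWithOrg(bean, organByRegion);
        System.out.println(enriched.orgId + " " + enriched.orgName);
    }
}
```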

3. Creating the ClickHouse tables

CREATE TABLE IF NOT EXISTS dws_trade_org_order_day_base
(
    `cur_date` Date COMMENT '统计日期',
    `org_id` String COMMENT '机构ID',
    `org_name` String COMMENT '机构名称',
    `city_id` String COMMENT '城市ID',
    `city_name` String COMMENT '城市名称',
    `order_amount_base` Decimal(38, 20) COMMENT '下单金额',
    `order_count_base` UInt64 COMMENT '下单次数',
    `ts` UInt64 COMMENT '时间戳'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trade_org_order_day
(
    `cur_date` Date, 
    `org_id` String, 
    `org_name` String, 
    `city_id` String, 
    `city_name` String, 
    `order_amount` AggregateFunction(argMax, Decimal(38, 20), UInt64), 
    `order_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name)
SETTINGS index_granularity = 8192 AS
SELECT 
    cur_date, 
    org_id, 
    org_name, 
    city_id, 
    city_name, 
    argMaxState(order_amount_base, ts) AS order_amount, 
    argMaxState(order_count_base, ts) AS order_count
FROM dws_trade_org_order_day_base
GROUP BY 
    cur_date, 
    org_id, 
    org_name, 
    city_id, 
    city_name;

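4. Logistics domain: daily transfer-completion summary table

Requirement: count the waybills whose transfer completed today and write the result to the corresponding ClickHouse table.

1. Logistics domain: transfer-completion entity class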

DwsTransBoundFinishDayBean.java

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;
/**
 * Logistics domain: transfer-completion entity class
 */
@Data
@Builder
public class DwsTransBoundFinishDayBean {
    // Statistics date
    String curDate;

    // Number of completed transfers
    Long boundFinishOrderCountBase;

    // Timestamp
    Long ts;
}

2. Logistics domain: transfer-completion statistics

DwsTransBoundFinishDay.java

package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransDispatchDetailBean;
import com.atguigu.tms.realtime.beans.DwsTransBoundFinishDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
/**
 * Logistics domain: transfer-completion statistics
 */
public class DwsTransBoundFinishDay {
    public static void main(String[] args) throws Exception {
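        // TODO 1. Set up the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);

        // Parallelism setting; comment this out when deploying and set the global parallelism through args instead
        env.setParallelism(4);

        // TODO 2. Read from the Kafka topic tms_dwd_trans_bound_finish_detail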
        String topic = "tms_dwd_trans_bound_finish_detail";
        String groupId = "dws_trans_bound_finish_day";

        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // TODO 3. Convert the data structure
        SingleOutputStreamOperator<DwsTransBoundFinishDayBean> mappedStream = source.map(jsonStr -> {
            DwdTransDispatchDetailBean dispatchDetailBean = JSON.parseObject(jsonStr, DwdTransDispatchDetailBean.class);
            return DwsTransBoundFinishDayBean.builder()
                    .boundFinishOrderCountBase(1L)
                    .ts(dispatchDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                    .build();
        });

        // TODO 4. Assign watermarks
        SingleOutputStreamOperator<DwsTransBoundFinishDayBean> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTransBoundFinishDayBean>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTransBoundFinishDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTransBoundFinishDayBean element, long recordTimestamp) {
                                return element.getTs();
                            }
                        })
        ).uid("watermark_stream");

        // TODO 5. Open the window
        AllWindowedStream<DwsTransBoundFinishDayBean, TimeWindow> windowedStream =
                withWatermarkStream.windowAll(TumblingEventTimeWindows.of(
                        org.apache.flink.streaming.api.windowing.time.Time.days(1L)));

        // TODO 6. Attach the trigger
        AllWindowedStream<DwsTransBoundFinishDayBean, TimeWindow> triggerStream = windowedStream.trigger(
                new MyTriggerFunction<DwsTransBoundFinishDayBean>()
        );

        // TODO 7. Aggregate
        SingleOutputStreamOperator<DwsTransBoundFinishDayBean> aggregatedStream = triggerStream.aggregate(
                new MyAggregationFunction<DwsTransBoundFinishDayBean>() {
                    @Override
                    public DwsTransBoundFinishDayBean add(DwsTransBoundFinishDayBean value, DwsTransBoundFinishDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setBoundFinishOrderCountBase(
                                accumulator.getBoundFinishOrderCountBase() + value.getBoundFinishOrderCountBase()
                        );
                        return accumulator;
                    }
                },
                new ProcessAllWindowFunction<DwsTransBoundFinishDayBean, DwsTransBoundFinishDayBean, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<DwsTransBoundFinishDayBean> elements, Collector<DwsTransBoundFinishDayBean> out) throws Exception {
                        for (DwsTransBoundFinishDayBean element : elements) {
                            String curDate = DateFormatUtil.toDate(context.window().getStart() - 8 * 60 * 60 * 1000L);
                            // Fill in the statistics date
                            element.setCurDate(curDate);
                            // Fill in the timestamp
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        ).uid("aggregate_stream");

        // TODO 8. Write to ClickHouse
        aggregatedStream.print(">>>>");
        aggregatedStream.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trans_bound_finish_day_base values(?,?,?)")
        ).uid("clickhouse_sink");

        env.execute();
    }
}
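
This job uses forBoundedOutOfOrderness with a 5-second bound, while the two trade-domain jobs use forMonotonousTimestamps, which behaves like a bound of zero. The plain-Java sketch below shows how such a watermark is derived from the largest timestamp seen so far; it follows the formula Flink documents (largest timestamp minus the bound minus 1 ms), but the class itself is hypothetical and is not Flink code.

```java
public class WatermarkDemo {
    /** Tracks the largest timestamp seen and derives a watermark that lags it by a fixed delay. */
    public static final class BoundedOutOfOrderness {
        private final long delayMs;
        private long maxTs;

        public BoundedOutOfOrderness(long delayMs) {
            this.delayMs = delayMs;
            // Start value chosen so the subtraction below cannot underflow before the first event
            this.maxTs = Long.MIN_VALUE + delayMs + 1;
        }

        public void onEvent(long ts) {
            maxTs = Math.max(maxTs, ts);
        }

        public long currentWatermark() {
            return maxTs - delayMs - 1;
        }

        /** True if a record with this timestamp is at or behind the current watermark. */
        public boolean isBehindWatermark(long ts) {
            return ts <= currentWatermark();
        }
    }

    public static void main(String[] args) {
        BoundedOutOfOrderness bounded = new BoundedOutOfOrderness(5_000L);
        BoundedOutOfOrderness monotonous = new BoundedOutOfOrderness(0L);
        for (long ts : new long[] {10_000L, 12_000L}) {
            bounded.onEvent(ts);
            monotonous.onEvent(ts);
        }
        System.out.println(bounded.currentWatermark());         // 6999
        System.out.println(monotonous.currentWatermark());      // 11999
        // A record that arrives 3 seconds out of order
        System.out.println(bounded.isBehindWatermark(9_000L));    // false
        System.out.println(monotonous.isBehindWatermark(9_000L)); // true
    }
}
```

With one-day windows, a record behind the watermark is only at risk of being dropped when its window has already closed, so the choice of bound mostly matters for records arriving around midnight.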

3. Creating the ClickHouse tables

CREATE TABLE IF NOT EXISTS dws_trans_bound_finish_day_base
(
    `cur_date` Date COMMENT '统计日期',
    `bound_finish_order_count_base` UInt64 COMMENT '转运完成次数',
    `ts` UInt64 COMMENT '时间戳'
)
ENGINE = MergeTree
ORDER BY cur_date;

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_bound_finish_day
(
    `cur_date` Date, 
    `bound_finish_order_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY cur_date
SETTINGS index_granularity = 8192 AS
SELECT 
    cur_date, 
    argMaxState(bound_finish_order_count_base, ts) AS bound_finish_order_count
FROM dws_trans_bound_finish_day_base
GROUP BY cur_date;

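5. Logistics domain: daily dispatch summary table

Requirement: count today's dispatched orders and write the result to ClickHouse.

1. Logistics domain: dispatch statistics entity class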

DwsTransDispatchDayBean.java

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * Logistics domain: dispatch statistics entity class
 */
@Data
@Builder
public class DwsTransDispatchDayBean {
    // Statistics date
    String curDate;

    // Number of dispatched orders
    Long dispatchOrderCountBase;

    // Timestamp
    Long ts;
}

2. Logistics domain: dispatch aggregation

DwsTransDispatchDay.java

package com.atguigu.tms.realtime.app.dws;


import com.alibaba.fastjson.JSON;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransDispatchDetailBean;
import com.atguigu.tms.realtime.beans.DwsTransDispatchDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;


/**
 * Logistics domain: dispatch aggregation
 */
public class DwsTransDispatchDay {
    public static void main(String[] args) throws Exception {
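        // TODO 1. Set up the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);

        // Parallelism setting; comment this out when deploying and set the global parallelism through args instead
        env.setParallelism(4);

        // TODO 2. Read from the Kafka topic tms_dwd_trans_dispatch_detail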
        String topic = "tms_dwd_trans_dispatch_detail";
        String groupId = "dws_trans_dispatch_day";

        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // TODO 3. Convert the data structure
        SingleOutputStreamOperator<DwsTransDispatchDayBean> mappedStream = source.map(jsonStr -> {
            DwdTransDispatchDetailBean dispatchDetailBean = JSON.parseObject(jsonStr, DwdTransDispatchDetailBean.class);
            return DwsTransDispatchDayBean.builder()
                    .dispatchOrderCountBase(1L)
                    .ts(dispatchDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                    .build();
        });

        // TODO 4. Assign watermarks
        SingleOutputStreamOperator<DwsTransDispatchDayBean> withWatermarkStream = mappedStream.assignTimestampsAndWatermarks(
//                WatermarkStrategy.<DwsTransDispatchDayBean>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                WatermarkStrategy.<DwsTransDispatchDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTransDispatchDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTransDispatchDayBean element, long recordTimestamp) {
                                return element.getTs();
                            }
                        })
        ).uid("watermark_stream");

        // TODO 5. Open the window
        AllWindowedStream<DwsTransDispatchDayBean, TimeWindow> windowedStream =
                withWatermarkStream.windowAll(TumblingEventTimeWindows.of(
                        org.apache.flink.streaming.api.windowing.time.Time.days(1L)));

        // TODO 6. Attach the trigger
        AllWindowedStream<DwsTransDispatchDayBean, TimeWindow> triggerStream = windowedStream.trigger(
                new MyTriggerFunction<DwsTransDispatchDayBean>()
        );

        // TODO 7. Aggregate
        SingleOutputStreamOperator<DwsTransDispatchDayBean> aggregatedStream = triggerStream.aggregate(
                new MyAggregationFunction<DwsTransDispatchDayBean>() {
                    @Override
                    public DwsTransDispatchDayBean add(DwsTransDispatchDayBean value, DwsTransDispatchDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setDispatchOrderCountBase(
                                accumulator.getDispatchOrderCountBase() + value.getDispatchOrderCountBase()
                        );
                        return accumulator;
                    }
                },
                new ProcessAllWindowFunction<DwsTransDispatchDayBean, DwsTransDispatchDayBean, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<DwsTransDispatchDayBean> elements, Collector<DwsTransDispatchDayBean> out) throws Exception {
                        for (DwsTransDispatchDayBean element : elements) {
                            String curDate = DateFormatUtil.toDate(context.window().getStart() - 8 * 60 * 60 * 1000L);
                            // Fill in the statistics date field
                            element.setCurDate(curDate);
                            // Fill in the timestamp field with the emit time
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        ).uid("aggregate_stream");

        // TODO 8. Write to ClickHouse
        aggregatedStream.print(">>>>");
        aggregatedStream.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trans_dispatch_day_base values(?,?,?)")
        ).uid("clickhouse_stream");

        env.execute();
    }
}

3. Create the ClickHouse tables

CREATE TABLE IF NOT EXISTS dws_trans_dispatch_day_base
(
    `cur_date` Date COMMENT 'Statistics date',
    `dispatch_order_count_base` UInt64 COMMENT 'Number of dispatched orders',
    `ts` UInt64 COMMENT 'Timestamp'
)
ENGINE = MergeTree
ORDER BY cur_date;

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_dispatch_day
(
    `cur_date` Date, 
    `dispatch_order_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY cur_date
SETTINGS index_granularity = 8192 AS
SELECT 
    cur_date, 
    argMaxState(dispatch_order_count_base, ts) AS dispatch_order_count
FROM dws_trans_dispatch_day_base
GROUP BY cur_date;

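6. Logistics domain: daily summary table of successful deliveries at organization granularity

Requirement: count each organization's successful deliveries (number of waybills) for the current day and write the result to ClickHouse.

1. Entity class for organization-level successful-delivery statistics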

DwsTransOrgDeliverSucDayBean.java

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;
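/**
 * Entity class for organization-level successful-delivery statistics (logistics domain)
 */
@Data
@Builder
public class DwsTransOrgDeliverSucDayBean {
    // Statistics date
    String curDate;

    // Organization ID
    String orgId;

    // Organization name
    String orgName;

    // District ID (used for the dimension lookup; marked @TransientSink so it is left out of the ClickHouse insert)
    @TransientSink
    String districtId;

    // City ID
    String cityId;

    // City name
    String cityName;

    // Province ID
    String provinceId;

    // Province name
    String provinceName;

    // Number of successful deliveries
    Long deliverSucCountBase;

    // Timestamp
    Long ts;
}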

2. Organization-level successful-delivery statistics job

DwsTransOrgDeliverSucDay.java

package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransDeliverSucDetailBean;
import com.atguigu.tms.realtime.beans.DwsTransOrgDeliverSucDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
 * Organization-level successful-delivery statistics (logistics domain)
 */
public class DwsTransOrgDeliverSucDay {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);

        // Parallelism setting; comment this out for deployment and pass the global parallelism through args instead
        env.setParallelism(4);

        // TODO 2. Read data from the Kafka topic tms_dwd_trans_deliver_detail
        String topic = "tms_dwd_trans_deliver_detail";
        String groupId = "dws_trans_org_deliver_suc_day";
        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // TODO 3. Convert the data structure
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> mappedStream = source.map(jsonStr -> {
            DwdTransDeliverSucDetailBean dwdTransDeliverSucDetailBean = JSON.parseObject(jsonStr, DwdTransDeliverSucDetailBean.class);
            return DwsTransOrgDeliverSucDayBean.builder()
                    .districtId(dwdTransDeliverSucDetailBean.getReceiverDistrictId())
                    .cityId(dwdTransDeliverSucDetailBean.getReceiverCityId())
                    .provinceId(dwdTransDeliverSucDetailBean.getReceiverProvinceId())
                    .deliverSucCountBase(1L)
                    .ts(dwdTransDeliverSucDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                    .build();
        });

        // TODO 4. Look up dimension information
        // Look up the organization ID by district (region_id)
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> withOrgIdStream = AsyncDataStream.unorderedWait(
                mappedStream,
                new DimAsyncFunction<DwsTransOrgDeliverSucDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgDeliverSucDayBean bean, JSONObject dimJsonObj)  {
                        bean.setOrgId(dimJsonObj.getString("id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgDeliverSucDayBean bean) {
                        return Tuple2.of("region_id", bean.getDistrictId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_org_id_stream");

        // TODO 5. Assign timestamps and watermarks
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> withWatermarkStream = withOrgIdStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTransOrgDeliverSucDayBean>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                        .withTimestampAssigner(new SerializableTimestampAssigner<DwsTransOrgDeliverSucDayBean>() {
                            @Override
                            public long extractTimestamp(DwsTransOrgDeliverSucDayBean element, long recordTimestamp) {
                                return element.getTs();
                            }
                        })
                        .withIdleness(Duration.ofSeconds(20))
        ).uid("watermark_stream");

        // TODO 6. Key by organization ID
        KeyedStream<DwsTransOrgDeliverSucDayBean, String> keyedStream = withWatermarkStream.keyBy(DwsTransOrgDeliverSucDayBean::getOrgId);

        // TODO 7. Open a one-day tumbling event-time window
        WindowedStream<DwsTransOrgDeliverSucDayBean, String, TimeWindow> windowStream =
                keyedStream.window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.days(1L)));

        // TODO 8. Attach the custom trigger
        WindowedStream<DwsTransOrgDeliverSucDayBean, String, TimeWindow> triggerStream = windowStream.trigger(new MyTriggerFunction<>());

        // TODO 9. Aggregate
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> aggregatedStream = triggerStream.aggregate(
                new MyAggregationFunction<DwsTransOrgDeliverSucDayBean>() {
                    @Override
                    public DwsTransOrgDeliverSucDayBean add(DwsTransOrgDeliverSucDayBean value, DwsTransOrgDeliverSucDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setDeliverSucCountBase(
                                accumulator.getDeliverSucCountBase() + value.getDeliverSucCountBase()
                        );
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTransOrgDeliverSucDayBean, DwsTransOrgDeliverSucDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<DwsTransOrgDeliverSucDayBean> elements, Collector<DwsTransOrgDeliverSucDayBean> out) throws Exception {
                        for (DwsTransOrgDeliverSucDayBean element : elements) {
                            long stt = context.window().getStart();
                            element.setCurDate(DateFormatUtil.toDate(stt - 8 * 60 * 60 * 1000L));
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        ).uid("aggregate_stream");

        // TODO 10. Complete the dimension information
        // 10.1 Add the organization name
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> withOrgNameAndRegionIdStream = AsyncDataStream.unorderedWait(
                aggregatedStream,
                new DimAsyncFunction<DwsTransOrgDeliverSucDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgDeliverSucDayBean bean, JSONObject dimJsonObj){
                        bean.setOrgName(dimJsonObj.getString("org_name"));
                    }

                    @Override
                    public Tuple2<String,String> getCondition(DwsTransOrgDeliverSucDayBean bean) {
                        return Tuple2.of("id",bean.getOrgId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_org_name_and_region_id_stream");

        // 10.2 Add the city name
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> withCityNameStream = AsyncDataStream.unorderedWait(
                withOrgNameAndRegionIdStream,
                new DimAsyncFunction<DwsTransOrgDeliverSucDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgDeliverSucDayBean bean, JSONObject dimJsonObj) {
                        bean.setCityName(dimJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String,String> getCondition(DwsTransOrgDeliverSucDayBean bean) {
                        return Tuple2.of("id",bean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_city_name_stream");

        // 10.3 Add the province name
        SingleOutputStreamOperator<DwsTransOrgDeliverSucDayBean> fullStream = AsyncDataStream.unorderedWait(
                withCityNameStream,
                new DimAsyncFunction<DwsTransOrgDeliverSucDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgDeliverSucDayBean bean, JSONObject dimJsonObj) {
                        bean.setProvinceName(dimJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String,String> getCondition(DwsTransOrgDeliverSucDayBean bean) {
                        return Tuple2.of("id",bean.getProvinceId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_province_name_stream");

        // TODO 11. Write to ClickHouse
        fullStream.print(">>>");
        fullStream.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trans_org_deliver_suc_day_base values(?,?,?,?,?,?,?,?,?)")
        ).uid("clickhouse_stream");

        env.execute();
    }
}
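
The job above adds 8 hours to each event timestamp before windowing and subtracts 8 hours from the window start before formatting the date. The following is a minimal sketch of that arithmetic in plain Java, without Flink. The class and method names are hypothetical, and it assumes that `DateFormatUtil.toDate` renders dates in UTC+8; it only illustrates why the shift makes a one-day, epoch-aligned window line up with the Beijing calendar day.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

// Hypothetical illustration class; not part of the project.
public class DayWindowShiftSketch {
    static final long DAY_MS = 24 * 60 * 60 * 1000L;
    static final long SHIFT_MS = 8 * 60 * 60 * 1000L;

    // Start of the epoch-aligned tumbling window that contains ts.
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Mirrors the job: shift forward 8 h, find the window start, shift back 8 h.
    static long localDayStart(long eventTs) {
        return windowStart(eventTs + SHIFT_MS, DAY_MS) - SHIFT_MS;
    }

    // Assumption: the date is rendered in UTC+8, as DateFormatUtil.toDate is presumed to do.
    static LocalDate statDate(long eventTs) {
        return Instant.ofEpochMilli(localDayStart(eventTs))
                .atZone(ZoneId.of("Asia/Shanghai"))
                .toLocalDate();
    }

    public static void main(String[] args) {
        // 17:30 UTC on 2024-01-09 is 01:30 on 2024-01-10 in Beijing.
        long ts = Instant.parse("2024-01-09T17:30:00Z").toEpochMilli();
        System.out.println(statDate(ts)); // prints 2024-01-10
    }
}
```

Without the shift, the same event would land in the window that starts at 00:00 UTC on 2024-01-09 and would be counted under the previous day. Flink's `TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))` should give the same alignment through a window offset, which may be an alternative to adjusting the timestamps by hand.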

3. Create the ClickHouse tables

CREATE TABLE IF NOT EXISTS dws_trans_org_deliver_suc_day_base
(
    `cur_date` Date COMMENT 'Statistics date',
    `org_id` String COMMENT 'Organization ID',
    `org_name` String COMMENT 'Organization name',
    `city_id` String COMMENT 'City ID',
    `city_name` String COMMENT 'City name',
    `province_id` String COMMENT 'Province ID',
    `province_name` String COMMENT 'Province name',
    `deliver_suc_count_base` UInt64 COMMENT 'Number of successful deliveries',
    `ts` UInt64 COMMENT 'Timestamp'
) 
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_org_deliver_suc_day
(
    `cur_date` Date, 
    `org_id` String, 
    `org_name` String, 
    `city_id` String, 
    `city_name` String, 
    `province_id` String, 
    `province_name` String, 
    `deliver_suc_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name)
SETTINGS index_granularity = 8192 AS
SELECT 
    cur_date, 
    org_id, 
    org_name, 
    city_id, 
    city_name, 
    province_id, 
    province_name, 
    argMaxState(deliver_suc_count_base, ts) AS deliver_suc_count
FROM dws_trans_org_deliver_suc_day_base
GROUP BY 
    cur_date, 
    org_id, 
    org_name, 
    city_id, 
    city_name, 
    province_id, 
    province_name;
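
The materialized view stores `argMaxState(deliver_suc_count_base, ts)` rather than a sum. A plausible reading, assuming the custom trigger fires the same window several times a day and each firing emits the running total, is that only the row with the latest `ts` should count for each key. The sketch below imitates that selection in plain Java; the `Row` class and `latestCounts` method are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of the project.
public class ArgMaxSketch {
    // One row as the job would write it: a running total plus the emit time.
    static final class Row {
        final String orgId;
        final long count;
        final long ts;

        Row(String orgId, long count, long ts) {
            this.orgId = orgId;
            this.count = count;
            this.ts = ts;
        }
    }

    // For each orgId, keep the count carried by the row with the largest ts.
    static Map<String, Long> latestCounts(List<Row> rows) {
        Map<String, Row> latest = new HashMap<>();
        for (Row r : rows) {
            latest.merge(r.orgId, r, (old, cur) -> cur.ts > old.ts ? cur : old);
        }
        Map<String, Long> result = new HashMap<>();
        latest.forEach((orgId, row) -> result.put(orgId, row.count));
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("org-1", 3, 1_000L),
                new Row("org-1", 7, 2_000L),
                new Row("org-2", 4, 1_500L));
        System.out.println(latestCounts(rows).get("org-1")); // prints 7, not 3 + 7
    }
}
```

When reading from the view in ClickHouse, the stored state has to be finalized with the matching `-Merge` combinator, for example `argMaxMerge(deliver_suc_count)` together with a `GROUP BY` on the dimension columns.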

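7. Logistics domain: daily summary table of pickups at organization granularity

Requirement: count each organization's pickups for the current day and write the result to ClickHouse.

1. Entity class for organization-level pickup statistics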

DwsTransOrgReceiveDayBean.java

package com.atguigu.tms.realtime.beans;
import lombok.Builder;
import lombok.Data;

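/**
 * Entity class for organization-level pickup statistics (logistics domain)
 */
@Data
@Builder
public class DwsTransOrgReceiveDayBean {

    // Statistics date
    String curDate;

    // Transfer station ID
    String orgId;

    // Transfer station name
    String orgName;

    // District ID (used for the dimension lookup; marked @TransientSink so it is left out of the ClickHouse insert)
    @TransientSink
    String districtId;

    // City ID
    String cityId;

    // City name
    String cityName;

    // Province ID
    String provinceId;

    // Province name
    String provinceName;

    // Number of pickups (one order counts once)
    Long receiveOrderCountBase;

    // Timestamp
    Long ts;
}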

2. Organization-level pickup aggregation job

DwsTransOrgReceiveDay.java

package com.atguigu.tms.realtime.app.dws;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransReceiveDetailBean;
import com.atguigu.tms.realtime.beans.DwsTransOrgReceiveDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
 * Organization-level pickup aggregation (logistics domain)
 */
public class DwsTransOrgReceiveDay {
    public static void main(String[] args) throws Exception {
        // TODO 1. Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);

        // Parallelism setting; comment this out for deployment and pass the global parallelism through args instead
        env.setParallelism(4);

        // TODO 2. Read data from the specified topic
        String topic = "tms_dwd_trans_receive_detail";
        String groupId = "dws_trans_org_receive_day";
        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // TODO 3. Convert the data structure
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> mappedStream = source.map(
                jsonStr -> {
                    DwdTransReceiveDetailBean dwdTransReceiveDetailBean = JSON.parseObject(jsonStr, DwdTransReceiveDetailBean.class);
                    return DwsTransOrgReceiveDayBean.builder()
                            .districtId(dwdTransReceiveDetailBean.getSenderDistrictId())
                            .provinceId(dwdTransReceiveDetailBean.getSenderProvinceId())
                            .cityId(dwdTransReceiveDetailBean.getSenderCityId())
                            .receiveOrderCountBase(1L)
                            .ts(dwdTransReceiveDetailBean.getTs() + 8 * 60 * 60 * 1000L)
                            .build();
                }
        );

        // TODO 4. Join dimension information
        // Look up the organization ID by district (region_id)
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> withOrgIdStream = AsyncDataStream.unorderedWait(
                mappedStream,
                new DimAsyncFunction<DwsTransOrgReceiveDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgReceiveDayBean bean, JSONObject dimJsonObj) {
                        bean.setOrgId(dimJsonObj.getString("id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgReceiveDayBean bean) {
                        return Tuple2.of("region_id", bean.getDistrictId());
                    }
                }, 5 * 60,
                TimeUnit.SECONDS
        ).uid("with_org_id_stream");

        // TODO 5. Assign timestamps and watermarks
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> withWatermarkStream = withOrgIdStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTransOrgReceiveDayBean>forBoundedOutOfOrderness(Duration.ofSeconds(5L))
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<DwsTransOrgReceiveDayBean>() {
                                    @Override
                                    public long extractTimestamp(DwsTransOrgReceiveDayBean bean, long recordTimestamp) {
                                        return bean.getTs();
                                    }
                                }
                        )
        ).uid("watermark_stream");

        // TODO 7. Key by orgId
        KeyedStream<DwsTransOrgReceiveDayBean, String> keyedStream = withWatermarkStream.keyBy(DwsTransOrgReceiveDayBean::getOrgId);

        // TODO 8. Open a one-day tumbling event-time window
        WindowedStream<DwsTransOrgReceiveDayBean, String, TimeWindow> windowStream =
                keyedStream.window(TumblingEventTimeWindows.of(
                        org.apache.flink.streaming.api.windowing.time.Time.days(1L)));

        // TODO 9. Attach the custom trigger
        WindowedStream<DwsTransOrgReceiveDayBean, String, TimeWindow> triggerStream = windowStream.trigger(
                new MyTriggerFunction<DwsTransOrgReceiveDayBean>()
        );

        // TODO 10. Aggregate
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> aggregatedStream = triggerStream.aggregate(
                new MyAggregationFunction<DwsTransOrgReceiveDayBean>() {
                    @Override
                    public DwsTransOrgReceiveDayBean add(DwsTransOrgReceiveDayBean value, DwsTransOrgReceiveDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setReceiveOrderCountBase(
                                accumulator.getReceiveOrderCountBase() + value.getReceiveOrderCountBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTransOrgReceiveDayBean, DwsTransOrgReceiveDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<DwsTransOrgReceiveDayBean> elements, Collector<DwsTransOrgReceiveDayBean> out) throws Exception {
                        for (DwsTransOrgReceiveDayBean element : elements) {
                            // Fill in the statistics date field
                            String curDate = DateFormatUtil.toDate(context.window().getStart() - 8 * 60 * 60 * 1000L);
                            element.setCurDate(curDate);
                            // Fill in the timestamp with the emit time
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        ).uid("aggregate_stream");

        // TODO 11. Complete the dimension information
        // 11.1 Add the transfer station name
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> withOrgNameStream = AsyncDataStream.unorderedWait(
                aggregatedStream,
                new DimAsyncFunction<DwsTransOrgReceiveDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgReceiveDayBean bean, JSONObject dimJsonObj) {
                        bean.setOrgName(dimJsonObj.getString("org_name"));
                    }

                    @Override
                    public Tuple2<String,String> getCondition(DwsTransOrgReceiveDayBean bean) {
                        return Tuple2.of("id",bean.getOrgId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_org_name_stream");

        // 11.2 Add the city name
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> withCityNameStream = AsyncDataStream.unorderedWait(
                withOrgNameStream,
                new DimAsyncFunction<DwsTransOrgReceiveDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgReceiveDayBean bean, JSONObject dimJsonObj) {
                        bean.setCityName(dimJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String,String> getCondition(DwsTransOrgReceiveDayBean bean) {
                        return Tuple2.of("id",bean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_city_name_stream");

        // 11.3 Add the province name
        SingleOutputStreamOperator<DwsTransOrgReceiveDayBean> fullStream = AsyncDataStream.unorderedWait(
                withCityNameStream,
                new DimAsyncFunction<DwsTransOrgReceiveDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgReceiveDayBean bean, JSONObject dimJsonObj) {
                        bean.setProvinceName(dimJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String,String> getCondition(DwsTransOrgReceiveDayBean bean) {
                        return Tuple2.of("id",bean.getProvinceId());
                    }
                },
                60, TimeUnit.SECONDS
        ).uid("with_province_name_stream");

        // TODO 12. Write to ClickHouse
        fullStream.print(">>>");
        fullStream.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trans_org_receive_day_base values(?,?,?,?,?,?,?,?,?)")
        ).uid("clickhouse_stream");

        env.execute();
    }
}
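
Both organization-level jobs build their watermarks with `forBoundedOutOfOrderness(Duration.ofSeconds(5L))`. As a rough sketch of what that strategy computes, the class below tracks the largest timestamp seen so far and reports a watermark that trails it by the allowed delay minus one millisecond. It is a simplified model written for illustration (the class name is hypothetical) and leaves out periodic emission and idleness handling.

```java
// Hypothetical illustration class; a simplified model, not Flink's own code.
public class BoundedWatermarkSketch {
    private final long delayMs;
    private long maxTs;

    BoundedWatermarkSketch(long delayMs) {
        this.delayMs = delayMs;
        // Start low enough that the subtraction below cannot underflow.
        this.maxTs = Long.MIN_VALUE + delayMs + 1;
    }

    // Track the largest event timestamp seen so far.
    void onEvent(long ts) {
        maxTs = Math.max(maxTs, ts);
    }

    // The watermark trails the largest timestamp by the allowed delay, minus 1 ms.
    long currentWatermark() {
        return maxTs - delayMs - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(5_000L);
        wm.onEvent(10_000L);
        wm.onEvent(7_000L);   // out of order, but within the 5 s bound
        wm.onEvent(12_000L);
        System.out.println(wm.currentWatermark()); // prints 6999
    }
}
```

An event whose timestamp is at or below the current watermark counts as late. Note that the delivery job also calls `withIdleness(Duration.ofSeconds(20))`, while the pickup job above does not, so an idle Kafka partition could hold back the pickup job's watermark.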

3. Create the ClickHouse tables

CREATE TABLE IF NOT EXISTS dws_trans_org_receive_day_base
(
    `cur_date`                 Date COMMENT 'Statistics date',
    `org_id`                   String COMMENT 'Transfer station ID',
    `org_name`                 String COMMENT 'Transfer station name',
    `city_id`                  String COMMENT 'City ID',
    `city_name`                String COMMENT 'City name',
    `province_id`              String COMMENT 'Province ID',
    `province_name`            String COMMENT 'Province name',
    `receive_order_count_base` UInt64 COMMENT 'Number of pickups',
    `ts`                       UInt64 COMMENT 'Timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_org_receive_day
(
    `cur_date` Date, 
    `org_id` String, 
    `org_name` String, 
    `city_id` String,
    `city_name` String,
    `province_id` String, 
    `province_name` String, 
    `receive_order_count` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, city_id, city_name, province_id, province_name)
SETTINGS index_granularity = 8192 AS
SELECT 
    cur_date, 
    org_id, 
    org_name, 
    city_id,
    city_name,
    province_id, 
    province_name, 
    argMaxState(receive_order_count_base, ts) AS receive_order_count
FROM dws_trans_org_receive_day_base
GROUP BY 
    cur_date, 
    org_id, 
    org_name,
    city_id,
    city_name, 
    province_id, 
    province_name;

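8. Logistics domain: daily summary table of completed transports at organization and truck-category granularity

Requirement: for each organization and truck category, compute the current day's number of completed transports, total distance, and total duration, then write the result to ClickHouse.

1. Entity class for organization and truck-category statistics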

DwsTransOrgTruckModelTransFinishDayBean.java

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

import java.math.BigDecimal;

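/**
 * Entity class for organization and truck-category statistics (logistics domain)
 */
@Data
@Builder
public class DwsTransOrgTruckModelTransFinishDayBean {
    // Statistics date
    String curDate;

    // Organization ID
    String orgId;

    // Organization name
    String orgName;

    // Truck ID (used for the dimension lookup; marked @TransientSink so it is left out of the ClickHouse insert)
    @TransientSink
    String truckId;

    // Truck model ID
    String truckModelId;

    // Truck model name
    String truckModelName;

    // ID of the first-level organization used to look up city information
    @TransientSink
    String joinOrgId;

    // City ID
    String cityId;

    // City name
    String cityName;

    // Number of completed transports
    Long transFinishCountBase;

    // Distance of completed transports
    BigDecimal transFinishDistanceBase;

    // Duration of completed transports
    Long transFinishDurTimeBase;

    // Timestamp
    Long ts;
}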

2. Organization and truck-category aggregation job

DwsTransOrgTruckModelTransFinishDay.java

package com.atguigu.tms.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.app.func.DimAsyncFunction;
import com.atguigu.tms.realtime.app.func.MyAggregationFunction;
import com.atguigu.tms.realtime.app.func.MyTriggerFunction;
import com.atguigu.tms.realtime.beans.DwdTransTransFinishBean;
import com.atguigu.tms.realtime.beans.DwsTransOrgTruckModelTransFinishDayBean;
import com.atguigu.tms.realtime.utils.ClickHouseUtil;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

// Organization and truck-category aggregation (logistics domain)
public class DwsTransOrgTruckModelTransFinishDay {
    public static void main(String[] args) throws Exception {
        // Prepare the environment
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // Read data from the transport-finish fact topic in Kafka
        String topic = "tms_dwd_trans_trans_finish";
        String groupId = "dws_trans_org_truck_model_group";
        KafkaSource<String> kafkaConsumer = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaDS = env
                .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // Convert each record from a JSON string to the entity class, keeping truckId for the truck dimension join
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> mapDS = kafkaDS.map(
                new MapFunction<String, DwsTransOrgTruckModelTransFinishDayBean>() {
                    @Override
                    public DwsTransOrgTruckModelTransFinishDayBean map(String jsonStr) throws Exception {
                        DwdTransTransFinishBean finishBean = JSON.parseObject(jsonStr, DwdTransTransFinishBean.class);
                        DwsTransOrgTruckModelTransFinishDayBean bean = DwsTransOrgTruckModelTransFinishDayBean.builder()
                                .orgId(finishBean.getStartOrgId())
                                .orgName(finishBean.getStartOrgName())
                                .truckId(finishBean.getTruckId())
                                .transFinishCountBase(1L)
                                .transFinishDistanceBase(finishBean.getActualDistance())
                                .transFinishDurTimeBase(finishBean.getTransportTime())
                                .ts(finishBean.getTs() + 8 * 60 * 60 * 1000L)
                                .build();
                        return bean;
                    }
                }
        );


        // Join the truck dimension to get the truck model ID
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withTruckDS = AsyncDataStream.unorderedWait(
                mapDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_truck_info") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setTruckModelId(dimInfoJsonObj.getString("truck_model_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getTruckId());
                    }
                },
                60, TimeUnit.SECONDS
        );


        // Specify the watermark strategy and extract the event-time field
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withWatermarkDS = withTruckDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<DwsTransOrgTruckModelTransFinishDayBean>forMonotonousTimestamps()
                        .withTimestampAssigner(
                                new SerializableTimestampAssigner<DwsTransOrgTruckModelTransFinishDayBean>() {
                                    @Override
                                    public long extractTimestamp(DwsTransOrgTruckModelTransFinishDayBean element, long l) {
                                        return element.getTs();
                                    }
                                }
                        )
        );


        // Key by organization ID + truck model ID
        KeyedStream<DwsTransOrgTruckModelTransFinishDayBean, String> keyDS = withWatermarkDS.keyBy(
                new KeySelector<DwsTransOrgTruckModelTransFinishDayBean, String>() {
                    @Override
                    public String getKey(DwsTransOrgTruckModelTransFinishDayBean bean) throws Exception {
                        return bean.getOrgId() + "+" + bean.getTruckModelId();
                    }
                }
        );

        // Open a one-day tumbling event-time window
        WindowedStream<DwsTransOrgTruckModelTransFinishDayBean, String, TimeWindow> windowDS = keyDS.window(TumblingEventTimeWindows.of(Time.days(1)));

        // Attach the custom trigger
        WindowedStream<DwsTransOrgTruckModelTransFinishDayBean, String, TimeWindow> triggerDS = windowDS.trigger(new MyTriggerFunction<DwsTransOrgTruckModelTransFinishDayBean>());

        // Aggregate
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> aggregateDS = triggerDS.aggregate(
                new MyAggregationFunction<DwsTransOrgTruckModelTransFinishDayBean>() {
                    @Override
                    public DwsTransOrgTruckModelTransFinishDayBean add(DwsTransOrgTruckModelTransFinishDayBean value, DwsTransOrgTruckModelTransFinishDayBean accumulator) {
                        if (accumulator == null) {
                            return value;
                        }
                        accumulator.setTransFinishCountBase(value.getTransFinishCountBase() + accumulator.getTransFinishCountBase());
                        accumulator.setTransFinishDistanceBase(value.getTransFinishDistanceBase().add(accumulator.getTransFinishDistanceBase()));
                        accumulator.setTransFinishDurTimeBase(value.getTransFinishDurTimeBase() + accumulator.getTransFinishDurTimeBase());
                        return accumulator;
                    }
                },
                new ProcessWindowFunction<DwsTransOrgTruckModelTransFinishDayBean, DwsTransOrgTruckModelTransFinishDayBean, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<DwsTransOrgTruckModelTransFinishDayBean> elements, Collector<DwsTransOrgTruckModelTransFinishDayBean> out) throws Exception {
                        Long stt = context.window().getStart() - 8 * 60 * 60 * 1000L;
                        String curDate = DateFormatUtil.toDate(stt);
                        for (DwsTransOrgTruckModelTransFinishDayBean element : elements) {
                            element.setCurDate(curDate);
                            element.setTs(System.currentTimeMillis());
                            out.collect(element);
                        }
                    }
                }
        );


        // Join dimension data
        // Look up the truck model name
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withTruckModelDS = AsyncDataStream.unorderedWait(
                aggregateDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_truck_model") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setTruckModelName(dimInfoJsonObj.getString("model_name"));

                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getTruckModelId());
                    }
                },
                60, TimeUnit.SECONDS
        );


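        // Look up the id of the organization's transfer center (its parent organization, or the organization itself if it has no parent)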
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withJoinOrgIdDS = AsyncDataStream.unorderedWait(
                withTruckModelDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        String orgParentId = dimInfoJsonObj.getString("org_parent_id");
                        bean.setJoinOrgId(orgParentId != null ? orgParentId : bean.getOrgId());

                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getOrgId());
                    }
                },
                60, TimeUnit.SECONDS
        );

        // Use the transfer center id to look up the city id in the organization table
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withCityIdDS = AsyncDataStream.unorderedWait(
                withJoinOrgIdDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_base_organ") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setCityId(dimInfoJsonObj.getString("region_id"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getJoinOrgId());
                    }
                },
                60, TimeUnit.SECONDS
        );


        // Use the city id to look up the city name in the region table
        SingleOutputStreamOperator<DwsTransOrgTruckModelTransFinishDayBean> withCityNameDS = AsyncDataStream.unorderedWait(
                withCityIdDS,
                new DimAsyncFunction<DwsTransOrgTruckModelTransFinishDayBean>("dim_base_region_info") {
                    @Override
                    public void join(DwsTransOrgTruckModelTransFinishDayBean bean, JSONObject dimInfoJsonObj) {
                        bean.setCityName(dimInfoJsonObj.getString("name"));
                    }

                    @Override
                    public Tuple2<String, String> getCondition(DwsTransOrgTruckModelTransFinishDayBean bean) {
                        return Tuple2.of("id", bean.getCityId());
                    }
                },
                60, TimeUnit.SECONDS
        );


        // Write the result to ClickHouse
        withCityNameDS.print(">>>");
        withCityNameDS.addSink(
                ClickHouseUtil.getJdbcSink("insert into dws_trans_org_truck_model_trans_finish_day_base values(?,?,?,?,?,?,?,?,?,?,?)")
        );


        env.execute();
    }
}
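
The three chained dimension lookups above (organization to transfer center, transfer center to city id, city id to city name) can be hard to follow inside the async operators. The following is a minimal, self-contained sketch of the same lookup order using in-memory maps. The class name, the sample ids, and the map contents are hypothetical and stand in for the `dim_base_organ` and `dim_base_region_info` tables; it does not use Flink or the `DimAsyncFunction` helper.

```java
import java.util.HashMap;
import java.util.Map;

public class DimChainSketch {
    // Hypothetical in-memory stand-ins for dim_base_organ and dim_base_region_info.
    static final Map<String, Map<String, String>> ORGAN = new HashMap<>();
    static final Map<String, Map<String, String>> REGION = new HashMap<>();

    static {
        // Organization 10 is a branch whose parent (its transfer center) is organization 1.
        ORGAN.put("10", row("org_parent_id", "1", "region_id", "999999"));
        // Organization 1 has no parent, so it acts as its own transfer center.
        ORGAN.put("1", row("org_parent_id", null, "region_id", "110000"));
        REGION.put("110000", row("name", "Beijing"));
    }

    // Builds one dimension row from alternating column names and values.
    static Map<String, String> row(String... kv) {
        Map<String, String> m = new HashMap<>();
        for (int i = 0; i < kv.length; i += 2) {
            m.put(kv[i], kv[i + 1]);
        }
        return m;
    }

    // Mirrors the join order of the job: org -> joinOrgId -> cityId -> cityName.
    static String resolveCityName(String orgId) {
        String parentId = ORGAN.get(orgId).get("org_parent_id");
        // Same fallback as in the job: use the organization itself when it has no parent.
        String joinOrgId = parentId != null ? parentId : orgId;
        String cityId = ORGAN.get(joinOrgId).get("region_id");
        return REGION.get(cityId).get("name");
    }

    public static void main(String[] args) {
        System.out.println(resolveCityName("10")); // prints "Beijing" (city of the parent, not of the branch)
        System.out.println(resolveCityName("1"));  // prints "Beijing" (no parent, falls back to itself)
    }
}
```

Note that the city is taken from the transfer center's row, not from the branch's own `region_id`, which is why the second organization lookup has to run after the first one has set `joinOrgId`.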

3. Create the ClickHouse tables

CREATE TABLE IF NOT EXISTS dws_trans_org_truck_model_trans_finish_day_base
(
    `cur_date` Date COMMENT 'Statistics date',
    `org_id` String COMMENT 'Organization ID',
    `org_name` String COMMENT 'Organization name',
    `truck_model_id` String COMMENT 'Truck model ID',
    `truch_model_name` String COMMENT 'Truck model name',
    `city_id` String COMMENT 'City ID',
    `city_name` String COMMENT 'City name',
    `trans_finish_count_base` UInt64 COMMENT 'Number of completed transports',
    `trans_finish_distance_base` Decimal(38, 20) COMMENT 'Distance of completed transports',
    `trans_finish_dur_time_base` UInt64 COMMENT 'Duration of completed transports',
    `ts` UInt64 COMMENT 'Timestamp'
)
ENGINE = MergeTree
ORDER BY (cur_date, org_id, org_name, truck_model_id, truch_model_name, city_id, city_name);

CREATE MATERIALIZED VIEW IF NOT EXISTS dws_trans_org_truck_model_trans_finish_day
(
    `cur_date` Date, 
    `org_id` String, 
    `org_name` String, 
    `truck_model_id` String, 
    `truch_model_name` String, 
    `city_id` String, 
    `city_name` String, 
    `trans_finish_count` AggregateFunction(argMax, UInt64, UInt64), 
    `trans_finish_distance` AggregateFunction(argMax, Decimal(38, 20), UInt64), 
    `trans_finish_dur_time` AggregateFunction(argMax, UInt64, UInt64)
)
ENGINE = AggregatingMergeTree()
ORDER BY (cur_date, org_id, org_name, truck_model_id, truch_model_name, city_id, city_name)
SETTINGS index_granularity = 8192 AS
SELECT 
    cur_date, 
    org_id, 
    org_name, 
    truck_model_id, 
    truch_model_name, 
    city_id, 
    city_name, 
    argMaxState(trans_finish_count_base, ts) AS trans_finish_count, 
    argMaxState(trans_finish_distance_base, ts) AS trans_finish_distance, 
    argMaxState(trans_finish_dur_time_base, ts) AS trans_finish_dur_time
FROM dws_trans_org_truck_model_trans_finish_day_base
GROUP BY 
    cur_date, 
    org_id, 
    org_name, 
    truck_model_id, 
    truch_model_name, 
    city_id, 
    city_name;
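
The materialized view stores `argMaxState(value, ts)`, and the test queries later read it back with `argMaxMerge`. In plain terms, for each grouping key this keeps the value from the row with the largest `ts`. The sketch below imitates that behavior in plain Java. It assumes that the custom trigger makes the Flink job write several running totals per key into the base table within one day (the trigger is defined in an earlier post of this series, so this is an assumption here), and the class name, keys, and numbers are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ArgMaxSketch {
    /** One row as written to the base table: grouping key, running total, emit timestamp. */
    static final class Row {
        final String key;
        final long value;
        final long ts;

        Row(String key, long value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    /** Keeps, for every key, the value carried by the row with the largest ts. */
    static Map<String, Long> argMaxByKey(Row[] rows) {
        Map<String, Row> latest = new LinkedHashMap<>();
        for (Row r : rows) {
            Row current = latest.get(r.key);
            if (current == null || r.ts > current.ts) {
                latest.put(r.key, r);
            }
        }
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, Row> e : latest.entrySet()) {
            result.put(e.getKey(), e.getValue().value);
        }
        return result;
    }

    public static void main(String[] args) {
        Row[] rows = {
                new Row("org1+modelA", 1L, 1000L), // first firing of the window
                new Row("org1+modelA", 3L, 2000L), // later firing with a larger running total
                new Row("org2+modelB", 2L, 1500L),
                new Row("org1+modelA", 2L, 1200L)  // arrives late but has an older ts, so it is ignored
        };
        System.out.println(argMaxByKey(rows)); // prints {org1+modelA=3, org2+modelB=2}
    }
}
```

Because the job sets `ts` to the processing time of each firing, the latest running total wins regardless of the order in which the rows are inserted or merged.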

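II. Code Testing

1. Change the topic partition count

Make sure that each Kafka topic has the same number of partitions as the parallelism configured in the Flink jobs. Both should be 4.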

kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trade_order_detail --partitions 4
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trans_bound_finish_detail --partitions 4
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trans_dispatch_detail --partitions 4
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trans_deliver_detail --partitions 4
kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic tms_dwd_trans_trans_finish --partitions 4
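
A likely reason the partition count matters: with a parallelism of 4 and fewer than 4 partitions, some source subtasks never receive data and therefore never advance their watermark. A downstream operator takes the minimum watermark over all of its inputs, so the event-time windows would not progress. The sketch below only simulates that minimum rule in plain Java; the class and method names are hypothetical and it does not call any Flink API.

```java
public class IdleSubtaskSketch {
    // A subtask that has not seen any record yet has effectively no watermark.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** The watermark of a downstream operator is the minimum over all of its input channels. */
    static long combinedWatermark(long[] perSubtaskWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : perSubtaskWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        // Parallelism 4, topic with 1 partition: three subtasks stay idle.
        long[] onePartition = {1_704_400_000_000L, NO_WATERMARK, NO_WATERMARK, NO_WATERMARK};
        // Parallelism 4, topic with 4 partitions: every subtask receives data.
        long[] fourPartitions = {1_704_400_000_000L, 1_704_400_001_000L, 1_704_400_002_000L, 1_704_400_003_000L};

        System.out.println(combinedWatermark(onePartition) == NO_WATERMARK); // prints true: the watermark is stuck
        System.out.println(combinedWatermark(fourPartitions));               // prints 1704400000000
    }
}
```

Flink's `WatermarkStrategy` also offers `withIdleness(...)` to mark silent inputs as idle. This series instead keeps the partition count equal to the parallelism, which is what the commands above do.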

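2. Start the cluster

Start HDFS, ZooKeeper, Kafka, HBase, Redis, and ClickHouse.
Because we test all seven tables in one go, also start all four ODS-layer and DWD-layer applications:
OdsApp, DwdBoundRelevantApp, DwdOrderRelevantApp, and DwdTransTransFinish.

3. Test the code

You can start one DWS-layer application at a time, generate data, and then check the ClickHouse database. Alternatively, start all seven applications at once so that the data only has to be generated once. Some kinds of records occur rarely, so you may need to generate data several times.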

1. Trade domain: daily order summary at cargo-type granularity

Test query

select 
    cur_date,
    cargo_type,
    cargo_type_name,
    argMaxMerge(order_amount) as order_amount,
    argMaxMerge(order_count) as order_count 
from dws_trade_cargo_type_order_day
group by cur_date,
    cargo_type,
    cargo_type_name
LIMIT 10;


2. Trade domain: daily order summary at organization granularity

Test query

select 
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    argMaxMerge(order_amount) as order_amount,
    argMaxMerge(order_count) as order_count 
from dws_trade_org_order_day
group by cur_date,
    org_id,
    org_name,
    city_id,
    city_name
LIMIT 10;


3. Logistics domain: daily transfer-completion summary

Test query

select 
    cur_date,
    argMaxMerge(bound_finish_order_count) as bound_finish_order_count
from dws_trans_bound_finish_day
group by cur_date
LIMIT 10;


4. Logistics domain: daily dispatch summary

Test query

select 
    cur_date,
    argMaxMerge(dispatch_order_count) as dispatch_order_count
from dws_trans_dispatch_day
group by cur_date
LIMIT 10;


5. Logistics domain: daily successful-delivery summary at organization granularity

Test query

SELECT
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    province_id,
    province_name,
    argMaxMerge(deliver_suc_count) AS deliver_suc_count
FROM dws_trans_org_deliver_suc_day
GROUP BY
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    province_id,
    province_name
LIMIT 10;


6. Logistics domain: daily pickup summary at organization granularity

Test query

SELECT
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    province_id,
    province_name,
    argMaxMerge(receive_order_count) AS receive_order_count
FROM dws_trans_org_receive_day
GROUP BY
    cur_date,
    org_id,
    org_name,
    city_id,
    city_name,
    province_id,
    province_name
LIMIT 10;


7. Logistics domain: daily transport-completion summary at organization and truck-model granularity

Test query

SELECT 
    cur_date, 
    org_id, 
    org_name, 
    truck_model_id, 
    truch_model_name, 
    city_id, 
    city_name, 
    argMaxMerge(trans_finish_count) AS trans_finish_count, 
    argMaxMerge(trans_finish_distance) AS trans_finish_distance, 
    argMaxMerge(trans_finish_dur_time) AS trans_finish_dur_time
FROM dws_trans_org_truck_model_trans_finish_day
GROUP BY 
    cur_date, 
    org_id, 
    org_name, 
    truck_model_id, 
    truch_model_name, 
    city_id, 
    city_name
LIMIT 10;



Summary

This completes the DWS layer of the real-time data warehouse. All of the code has been pushed to GitHub.
URL:https://github.com/lcc-666/tms-parent
