物流实时数仓:数仓搭建(DWD)二

news2025/1/11 23:44:06

系列文章目录

物流实时数仓:采集通道搭建
物流实时数仓:数仓搭建
物流实时数仓:数仓搭建(DIM)
物流实时数仓:数仓搭建(DWD)一
物流实时数仓:数仓搭建(DWD)二


文章目录

  • 系列文章目录
  • 前言
  • 一、代码编写
    • 1.文件创建
    • 2.物流域运输完成事实表
      • 1.DwdTransTransFinishBean
      • 2.DwdTransTransFinish
    • 3.中转域中转流程分流应用
      • 1.DwdOrderOrgBoundOriginBean
      • 2.DwdBoundInboundBean
      • 3.DwdBoundSortBean
      • 4.DwdBoundOutboundBean
      • 5.DwdBoundRelevantApp
  • 二、代码测试
    • 1.环境启动
    • 2.运行flink程序
  • 总结


前言

这次博客我们完成剩下的DWD层的建设
在这里插入图片描述
由流程图可知,我们还需要编写两个Flink程序


提示:以下是本篇文章正文内容,下面案例可供参考

一、代码编写

1.文件创建

我们需要在添加5个bean文件
在这里插入图片描述
还有两个app文件。
在这里插入图片描述

2.物流域运输完成事实表

1.DwdTransTransFinishBean

package com.atguigu.tms.realtime.beans;

import lombok.Data;

import java.math.BigDecimal;

/**
 * 物流域:运输完成事实表实体类
 */
@Data
public class DwdTransTransFinishBean {
    // 编号(主键)
    String id;

    // 班次ID
    String shiftId;

    // 线路ID
    String lineId;

    // 起始机构ID
    String startOrgId;

    // 起始机构名称
    String startOrgName;

    // 目的机构id
    String endOrgId;

    // 目的机构名称
    String endOrgName;

    // 运单个数
    Integer orderNum;

    // 司机1 ID
    String driver1EmpId;

    // 司机1名称
    String driver1Name;

    // 司机2 ID
    String driver2EmpId;

    // 司机2名称
    String driver2Name;

    // 卡车ID
    String truckId;

    // 卡车号牌
    String truckNo;

    // 实际启动时间
    String actualStartTime;

    // 实际到达时间
    String actualEndTime;

    // 运输时长
    Long transportTime;

    // 实际行驶距离
    BigDecimal actualDistance;

    // 时间戳
    Long ts;
}

2.DwdTransTransFinish

package com.atguigu.tms.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.DwdTransTransFinishBean;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// 物流域:运输完成事实表
public class DwdTransTransFinish {
    public static void main(String[] args) throws Exception {
        // 准备环境
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // 从kafka读取数据
        String topic = "tms_ods";
        String groupId = "dwd_trans_tran_finish_group";
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka_source")
                .uid("Kafka_source");

        // 筛选出运输完成的数据
        SingleOutputStreamOperator<String> filterDS = kafkaStrDS.filter(
                new FilterFunction<String>() {
                    @Override
                    public boolean filter(String jsonStr) throws Exception {
                        JSONObject jsonObj = JSON.parseObject(jsonStr);
                        // 将transport_task 表的操作数据过滤处理
                        String table = jsonObj.getJSONObject("source").getString("table");
                        if (!"transport_task".equals(table)) {
                            return false;
                        }
                        String op = jsonObj.getString("op");
                        JSONObject beforeJsonObj = jsonObj.getJSONObject("before");
                        if (beforeJsonObj == null) {
                            return false;
                        }
                        JSONObject afterJsonObj = jsonObj.getJSONObject("after");
                        String oldActualEndTime = beforeJsonObj.getString("actual_end_time");
                        String newActualEndTime = afterJsonObj.getString("actual_end_time");

                        return "u".equals(op) && oldActualEndTime == null && newActualEndTime != null;
                    }
                }
        );


        // 筛选出的数据进行处理
        SingleOutputStreamOperator<String> processDS = filterDS.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
                        JSONObject jsonObj = JSON.parseObject(jsonStr);
                        DwdTransTransFinishBean finishBean = jsonObj.getObject("after", DwdTransTransFinishBean.class);

                        // 补充运输时常字段
                        finishBean.setTransportTime(Long.parseLong(finishBean.getActualStartTime()) - Long.parseLong(finishBean.getActualEndTime()));

                        // 将运输结束时间转换为毫秒-8小时 赋值给事件时间字段ts
                        finishBean.setTs(Long.parseLong(finishBean.getActualEndTime()) - 8 * 60 * 60 * 1000L);

                        // 处理时间问题
                        finishBean.setActualStartTime(DateFormatUtil.toYmdHms(Long.parseLong(finishBean.getActualStartTime()) - 8 * 60 * 60 * 1000L));
                        finishBean.setActualEndTime(DateFormatUtil.toYmdHms(Long.parseLong(finishBean.getActualEndTime()) - 8 * 60 * 60 * 1000L));

                        // 脱敏
                        String driver1Name = finishBean.getDriver1Name();
                        String driver2Name = finishBean.getDriver2Name();
                        String truckNo = finishBean.getTruckNo();

                        driver1Name = driver1Name.charAt(0) +
                                driver1Name.substring(1).replaceAll(".", "\\*");
                        driver2Name = driver2Name == null ? driver2Name : driver2Name.charAt(0) +
                                driver2Name.substring(1).replaceAll(".", "\\*");
                        truckNo = DigestUtils.md5Hex(truckNo);

                        finishBean.setDriver1Name(driver1Name);
                        finishBean.setDriver2Name(driver2Name);
                        finishBean.setTruckNo(truckNo);

                        out.collect(JSON.toJSONString(finishBean));
                    }
                }
        );
        // 处理后的数据写入kafka
        String sinkTopic = "tms_dwd_trans_trans_finish";
        processDS.sinkTo(KafkaUtil.getKafkaSink(sinkTopic,args)).uid("kafka_sink");

        env.execute();
    }
}

3.中转域中转流程分流应用

1.DwdOrderOrgBoundOriginBean

package com.atguigu.tms.realtime.beans;

import lombok.Data;

/**
 * 中转实体类
 */
@Data
public class DwdOrderOrgBoundOriginBean {
    // 编号(主键)
    String id;

    // 运单编号
    String orderId;

    // 机构id
    String orgId;

    // 状态 出库 入库
    String status;

    // 入库时间
    String inboundTime;

    // 入库人员id
    String inboundEmpId;

    // 分拣时间
    String sortTime;

    // 分拣人员id
    String sorterEmpId;

    // 出库时间
    String outboundTime;

    // 出库人员id
    String outboundEmpId;

    // 创建时间
    String createTime;

    // 修改时间
    String updateTime;

    // 删除标志
    String isDeleted;
}

2.DwdBoundInboundBean

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * 中转域:入库实体类
 */
@Data
@Builder
public class DwdBoundInboundBean {
    // 编号(主键)
    String id;

    // 运单编号
    String orderId;

    // 机构id
    String orgId;

    // 入库时间
    String inboundTime;

    // 入库人员id
    String inboundEmpId;

    // 时间戳
    Long ts;
}

3.DwdBoundSortBean

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * 中转域:分拣实体类
 */
@Data
@Builder
public class DwdBoundSortBean {
    // 编号(主键)
    String id;

    // 运单编号
    String orderId;

    // 机构id
    String orgId;

    // 分拣时间
    String sortTime;

    // 分拣人员id
    String sorterEmpId;

    // 时间戳
    Long ts;
}

4.DwdBoundOutboundBean

package com.atguigu.tms.realtime.beans;

import lombok.Builder;
import lombok.Data;

/**
 * 中转域:出库实体类
 */
@Data
@Builder
public class DwdBoundOutboundBean {
    // 编号(主键)
    String id;

    // 运单编号
    String orderId;

    // 机构id
    String orgId;

    // 出库时间
    String outboundTime;

    // 出库人员id
    String outboundEmpId;

    // 时间戳
    Long ts;
}

5.DwdBoundRelevantApp

package com.atguigu.tms.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.DwdBoundInboundBean;
import com.atguigu.tms.realtime.beans.DwdBoundOutboundBean;
import com.atguigu.tms.realtime.beans.DwdBoundSortBean;
import com.atguigu.tms.realtime.beans.DwdOrderOrgBoundOriginBean;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DwdBoundRelevantApp {
    public static void main(String[] args) throws Exception {
        // 环境准备
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // 从kafka读取数据
        String topic = "tms_ods";
        String groupId = "dwd_bound_group";

        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // 筛选出订单机构中转表
        SingleOutputStreamOperator<String> filterDS = kafkaStrDS.filter(
                new FilterFunction<String>() {
                    @Override
                    public boolean filter(String jsonStr) throws Exception {
                        JSONObject jsonObj = JSON.parseObject(jsonStr);
                        String table = jsonObj.getJSONObject("source").getString("table");
                        return "order_org_bound".equals(table);
                    }
                }
        );


        // 定义侧输出流标签
        OutputTag<String> sortTag = new OutputTag<String>("sortTag") {
        };
        OutputTag<String> outboundTag = new OutputTag<String>("outboundTag") {
        };



        // 分流 入库->主流  分流->分拣侧输出流 出库->出库侧输出流
        SingleOutputStreamOperator<String> inboundDS = filterDS.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
                JSONObject jsonObj = JSON.parseObject(jsonStr);
                String op = jsonObj.getString("op");

                DwdOrderOrgBoundOriginBean after = jsonObj.getObject("after", DwdOrderOrgBoundOriginBean.class);
                DwdOrderOrgBoundOriginBean before = jsonObj.getObject("before", DwdOrderOrgBoundOriginBean.class);

                // 获取中转数据id
                String id = after.getId();
                // 获取运单id
                String orderId = after.getOrderId();
                // 获取机构id
                String orgId = after.getOrgId();

                if ("c".equals(op)) {
                    long ts = Long.parseLong(after.getInboundEmpId()) - 8 * 60 * 60 * 1000L;
                    String inboundTime = DateFormatUtil.toYmdHms(ts);
                    String inboundEmpId = after.getInboundEmpId();

                    // 入库
                    DwdBoundInboundBean inboundBean = DwdBoundInboundBean.builder()
                            .id(id)
                            .orderId(orderId)
                            .orgId(orgId)
                            .inboundTime(inboundTime)
                            .inboundEmpId(inboundEmpId)
                            .ts(ts)
                            .build();
                    out.collect(JSON.toJSONString(inboundBean));
                } else {
                    // 将分拣数据放到侧输出流
                    String beforeSortTime = before.getSortTime();
                    String afterSortTime = after.getSortTime();
                    if (beforeSortTime == null && afterSortTime != null) {
                        long ts = Long.parseLong(after.getSortTime()) - 8 * 60 * 60 * 1000L;
                        String sortTime = DateFormatUtil.toYmdHms(ts);
                        String sorterEmpId = after.getSorterEmpId();
                        DwdBoundSortBean sortBean = DwdBoundSortBean.builder()
                                .id(id)
                                .orderId(orderId)
                                .orgId(orgId)
                                .sortTime(sortTime)
                                .sorterEmpId(sorterEmpId)
                                .ts(ts)
                                .build();
                        ctx.output(sortTag, JSON.toJSONString(sortBean));
                    }

                    // 筛选储库操作 将数据库放到出库侧输出流
                    String beforeOutboundTime = before.getOutboundTime();
                    String afterOutboundTime = after.getOutboundTime();

                    if (beforeOutboundTime == null && afterOutboundTime != null) {
                        long ts = Long.parseLong(after.getOutboundTime()) - 8 * 60 * 60 * 1000L;
                        String outboundTime = DateFormatUtil.toYmdHms(ts);
                        String outboundEmpId = after.getOutboundEmpId();

                        DwdBoundOutboundBean outboundBean = DwdBoundOutboundBean.builder()
                                .id(id)
                                .orderId(orderId)
                                .orgId(orgId)
                                .outboundTime(outboundTime)
                                .outboundEmpId(outboundEmpId)
                                .ts(ts)
                                .build();
                        ctx.output(outboundTag, JSON.toJSONString(outboundBean));
                    }

                }

            }
        });

        // 从主流中提取侧输出流
        // 分拣流
        SideOutputDataStream<String> sortDS = inboundDS.getSideOutput(sortTag);
        // 出库流
        SideOutputDataStream<String> outboundDS = inboundDS.getSideOutput(outboundTag);

        // 将不同流数据写到kafka主题
        //中转域入库事实主题
        String inboundTopic = "tms_dwd_bound_inbound";
        //中转域分拣事实主题
        String sortTopic = "tms_dwd_bound_sort";
        //中转域出库事实主题
        String outboundTopic = "tms_dwd_bound_outbound";

        inboundDS.sinkTo(KafkaUtil.getKafkaSink(inboundTopic, args)).uid("inbound_sink");
        sortDS.sinkTo(KafkaUtil.getKafkaSink(sortTopic, args)).uid("sort_sink");
        outboundDS.sinkTo(KafkaUtil.getKafkaSink(outboundTopic, args)).uid("outbound_sink");

        env.execute();
    }
}

二、代码测试

1.环境启动

hadoop,zk,kf和odsapp全部启动

2.运行flink程序

启动DwdTransTransFinish
然后开一个kafka消费者

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_trans_finish

然后继续生产数据

在这里插入图片描述
kafka消费到数据之后,程序就可以关闭了。

启动DwdBoundRelevantApp
然后开启3个消费者。

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_bound_inbound
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_bound_sort
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_bound_outbound

在这里插入图片描述在这里插入图片描述
在这里插入图片描述


总结

至此数仓Dwd层搭建完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1319326.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【赠书第11期】Unity 3D游戏开发

文章目录 前言 1 Unity 3D简介 2 Unity 3D基本概念 2.1 场景&#xff08;Scene&#xff09; 2.2 游戏对象&#xff08;Game Object&#xff09; 2.3 组件&#xff08;Component&#xff09; 2.4 资源&#xff08;Asset&#xff09; 3 Unity 3D重要组件 3.1 物理引擎 …

基于SSM的图书馆预约座位系统的设计与实现(部署+源码+LW)

项目描述 临近学期结束&#xff0c;还是毕业设计&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。今天给大家介绍一篇基于SSM的图书馆预约座位…

如何使用JavaScript 将数据网格绑定到 GraphQL 服务

前言 作为一名前端开发人员&#xff0c;GraphQL对于我们来说是令人难以置信的好用。它可以用来简化数据访问&#xff0c;这让我们的工作变得更加容易。 什么是 GraphQL&#xff1f;它是一个抽象层&#xff0c;位于任意数量的数据源之上&#xff0c;并为您提供一个简单的 API …

【深度学习】注意力机制(三)

本文介绍一些注意力机制的实现&#xff0c;包括EMHSA/SA/SGE/AFT/Outlook Attention。 【深度学习】注意力机制&#xff08;一&#xff09; 【深度学习】注意力机制&#xff08;二&#xff09; 【深度学习】注意力机制&#xff08;四&#xff09; 【深度学习】注意力机制&a…

PCB设计规则中的经验公式_笔记

PCB设计规则中的经验公式 规则1 - 临界长度规则2 - 信号带宽与上升时间规则3- 时钟信号带宽规则4-信号传输速度规则5- 集肤 (效应) 深度规则6 - 50Ω传输线电容规则7 - 50Ω传输线电感规则8 - 回流路径电感规则9 - 地弹噪声规则10- 串行传输比特率与信号带宽规则11- PCB走线直流…

HR人才测评,招聘企业中高层管理的岗位胜任力测评方案

不管是哪一个企业&#xff0c;中高层管理都是企业的核心层&#xff0c;在对这部分人才进行测评方案制定的时候&#xff0c;则要更加细致谨慎一些&#xff0c;避免出现人才录用失误的情况。 中高层管理人员是公司的支柱&#xff0c;需要具备的素质主要偏向于管理能力、综合素质…

CyclicBarrier学习一

一、定义 CyclicBarrier 字面意思回环栅栏&#xff08;循环屏障&#xff09;&#xff0c;通过它可以实现让一组线程等待至某个状态&#xff08;屏障点&#xff09;之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后&#xff0c;CyclicBarrier可以被重用。 CyclicB…

vue写了这么久了您是否知道:为什么data属性是一个函数而不是一个对象?

一、实例和组件定义data的区别 vue实例的时候定义data属性既可以是一个对象&#xff0c;也可以是一个函数 const app new Vue({el:"#app",// 对象格式data:{foo:"foo"},// 函数格式data(){return {foo:"foo"}} })组件中定义data属性&#xff…

信号与线性系统翻转课堂笔记4——连续LTI系统的微分方程模型与求解

信号与线性系统翻转课堂笔记4——连续LTI系统的微分方程模型与求解 The Flipped Classroom4 of Signals and Linear Systems 对应教材&#xff1a;《信号与线性系统分析&#xff08;第五版&#xff09;》高等教育出版社&#xff0c;吴大正著 一、要点 &#xff08;1&#x…

Zotero攻略

给大家分享一下我对于Zotero的使用。 1、下载链接 Zotero | Your personal research assistant 进入后直接下载即可 2、一些好用的插件 &#xff08;1&#xff09;Zotero Connector 下载地址&#xff1a;Zotero | Connectors 超级好用&#xff01;不用一篇一篇下PDF了&am…

geemap学习笔记028:Landsat8计算时间序列NDVI并导出

前言 本节则是以Landsat8影像数据为例&#xff0c;进行NDVI时间序列计算&#xff0c;并将得到的时间序列NDVI进行展示并导出。 1 导入库并显示地图 import ee import geemap import datetime import pandas as pd import os ee.Initialize()2 定义时间范围 # 定义日期范围 …

记录 | Visual Studio报错:const char*类型的值不能用于初始化char*类型

Visual Studio 报错&#xff1a; const char *”类型的值不能用于初始化“char *”类型的实体错误 解决办法&#xff1a; 1&#xff0c;强制类型转换&#xff0c;例如&#xff1a; char * Singer::pv[] {(char*)"other", (char*)"alto", (char*)"c…

【网络协议】网络运维管理神经-SNMP协议

文章目录 什么是SNMP&#xff1f;SNMP的组件SNMP的历史版本SNMP端口SNMP配置案例SNMP工作原理SNMP的基本工作原理SNMP的操作类型SNMP TrapsSNMP Inform SNMP的应用场景推荐阅读 什么是SNMP&#xff1f; SNMP&#xff08;Simple Network Management Protocol&#xff0c;简单网…

学习黑马vue

项目分析 项目下载地址&#xff1a;vue-admin-template-master: 学习黑马vue 项目下载后没有环境可参考我的篇文章&#xff0c;算是比较详细&#xff1a;vue安装与配置-CSDN博客 安装这两个插件可格式化代码&#xff0c;vscode这个软件是免费的&#xff0c;官网&#xff1a;…

2023年OceanBase开发者大会-核心PPT资料下载

一、峰会简介 2023年OceanBase开发者大会主要涵盖了OceanBase的最新技术进展、产品更新以及开发者工具的发布。大会发布了OceanBase 4.1版本&#xff0c;公布了两大友好工具&#xff0c;升级了文档的易用性&#xff0c;并统一了企业版和社区版的代码分支。这些举措全面呈现了O…

2017年AMC8数学竞赛中英文真题典型考题、考点分析和答案解析

从战争中学习战争最有效。前几天&#xff0c;六分成长分析了2023年、2022年、2020、2019、2018年的AMC8的典型考题、考点和详细答案解析。 今天继续为大家分享2017年的AMC8的五道典型考题&#xff0c;所有的这些试题六分成长独家制作了在线版本&#xff0c;适合各种终端和设备…

Android Compose Transition 动画

Transition 是一种动画效果&#xff0c;用于在组件的状态之间进行平滑的过渡。它可以帮助我们在状态变化时&#xff0c;以一种流畅的方式更新 UI。通过使用 Compose 的 Transition API&#xff0c;您可以在应用中创建各种各样的动画效果&#xff0c;从而增强用户体验并提高应用…

智能五子棋1

*一、项目需求* 五子棋是一种简单的黑白棋&#xff0c;历史悠久&#xff0c;起源于中国&#xff0c;后传入日本&#xff0c;在日本被称为“连珠”&#xff0c;是一种老少皆宜的益智游戏。 人工智能五子棋系统的目标用户是一切想致力于研究人机对弈算法理论的相关研究者和一切…

I2C总线(一)核心

基于linux-3.14.16 一、简介 硬件上&#xff0c;i2c总线由&#xff0c;i2c控制器、i2c总线、i2c设备组成。 驱动代码将通过设置i2c寄存器&#xff0c;从而在总线上产生数据信息&#xff0c;来和i2c设备通信&#xff08;读/写&#xff09;。 i2c核心&#xff0c;主要的功能包…