物流实时数仓:数仓搭建(DWD)一

news2025/1/12 0:58:31

系列文章目录

物流实时数仓:采集通道搭建
物流实时数仓:数仓搭建
物流实时数仓:数仓搭建(DIM)
物流实时数仓:数仓搭建(DWD)一


文章目录

  • 系列文章目录
  • 前言
  • 一、文件编写
    • 1.目录创建
    • 2.bean文件
      • 1.DwdOrderDetailOriginBean
      • 2.DwdOrderInfoOriginBean
      • 3.DwdTradeCancelDetailBean
      • 4.DwdTradeOrderDetailBean
      • 5.DwdTradePaySucDetailBean
      • 6.DwdTransBoundFinishDetailBean
      • 7.DwdTransDeliverSucDetailBean
      • 8.DwdTransDispatchDetailBean
      • 9.DwdTransReceiveDetailBean
      • 10.DwdTransSignDetailBean
    • 3.DwdOrderRelevantApp
  • 二、代码测试
    • 1.环境启动
    • 2.kafka消费者
    • 3.修改配置
    • 4.测试结果
  • 总结


前言

这次博客我们进行DWD层的搭建,内容比较多,一次可能写不完。
在这里插入图片描述
以上就是本次博客需要完成的内容,简单来说就是,从kafka读取数据,然后根据不同的关键字,将其从主流中进行分离,然后在写入各自的kafka中以便后续的操作


一、文件编写

1.目录创建

我们现在beans中创建后边需要的的bean
在这里插入图片描述
然后在dwd目录中创建此次需要的app
在这里插入图片描述

2.bean文件

1.DwdOrderDetailOriginBean

package com.atguigu.tms.realtime.beans;

import lombok.Data;

import java.math.BigDecimal;

/**
 *订单货物明细实体类
 */

@Data
public class DwdOrderDetailOriginBean {
    // 编号(主键)
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumnLength;

    // 宽cm
    Integer volumnWidth;

    // 高cm
    Integer volumnHeight;

    // 重量 kg
    BigDecimal weight;

    // 创建时间
    String createTime;

    // 更新时间
    String updateTime;

    // 是否删除
    String isDeleted;
}

2.DwdOrderInfoOriginBean

package com.atguigu.tms.realtime.beans;

import lombok.Data;

import java.math.BigDecimal;

/**
 * 订单实体类
 */
@Data
public class DwdOrderInfoOriginBean {
    // 编号(主键)
    String id;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    Long estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 创建时间
    String createTime;

    // 更新时间
    String updateTime;

    // 是否删除
    String isDeleted;
}

3.DwdTradeCancelDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 * 交易域:取消运单事务事实表实体类
 */
@Data
public class DwdTradeCancelDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 取消时间
    String cancelTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
        this.cancelTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

4.DwdTradeOrderDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 *交易域:下单事务事实表实体类
 */
@Data
public class DwdTradeOrderDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 下单时间
    String orderTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;
        this.orderTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        detailOriginBean.createTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                detailOriginBean.createTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
    }
}

5.DwdTradePaySucDetailBean

package com.atguigu.tms.realtime.beans;
import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 *交易域:支付成功事务事实表实体类
 */
@Data
public class DwdTradePaySucDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 支付时间
    String payTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;

        this.payTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

6.DwdTransBoundFinishDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;
/**
 *物流域:转运完成事务事实表实体类
 */
@Data
public class DwdTransBoundFinishDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 发单时间
    String boundFinishTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
        this.boundFinishTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

7.DwdTransDeliverSucDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;
/**
 *物流域:派送成功事务事实表实体类
 */
@Data
public class DwdTransDeliverSucDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 派送成功时间
    String deliverTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
        this.deliverTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

8.DwdTransDispatchDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;

import lombok.Data;

import java.math.BigDecimal;

/**
 *物流域:发单事务事实表实体类
 */
@Data
public class DwdTransDispatchDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 发单时间
    String dispatchTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
        this.dispatchTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

9.DwdTransReceiveDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 *物流域:揽收(接单)事务事实表实体类
 */
@Data
public class DwdTransReceiveDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 揽收时间
    String receiveTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
        this.receiveTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

10.DwdTransSignDetailBean

package com.atguigu.tms.realtime.beans;

import com.atguigu.tms.realtime.utils.DateFormatUtil;
import lombok.Data;

import java.math.BigDecimal;

/**
 * 物流域:签收事务事实表实体类
 */
@Data
public class DwdTransSignDetailBean {
    // 运单明细ID
    String id;

    // 运单id
    String orderId;

    // 货物类型
    String cargoType;

    // 长cm
    Integer volumeLength;

    // 宽cm
    Integer volumeWidth;

    // 高cm
    Integer volumeHeight;

    // 重量 kg
    BigDecimal weight;

    // 签收时间
    String signTime;

    // 运单号
    String orderNo;

    // 运单状态
    String status;

    // 取件类型,1为网点自寄,2为上门取件
    String collectType;

    // 客户id
    String userId;

    // 收件人小区id
    String receiverComplexId;

    // 收件人省份id
    String receiverProvinceId;

    // 收件人城市id
    String receiverCityId;

    // 收件人区县id
    String receiverDistrictId;

    // 收件人姓名
    String receiverName;

    // 发件人小区id
    String senderComplexId;

    // 发件人省份id
    String senderProvinceId;

    // 发件人城市id
    String senderCityId;

    // 发件人区县id
    String senderDistrictId;

    // 发件人姓名
    String senderName;

    // 支付方式
    String paymentType;

    // 货物个数
    Integer cargoNum;

    // 金额
    BigDecimal amount;

    // 预计到达时间
    String estimateArriveTime;

    // 距离,单位:公里
    BigDecimal distance;

    // 时间戳
    Long ts;

    public void mergeBean(DwdOrderDetailOriginBean detailOriginBean, DwdOrderInfoOriginBean infoOriginBean) {
        // 合并原始明细字段
        this.id = detailOriginBean.id;
        this.orderId = detailOriginBean.orderId;
        this.cargoType = detailOriginBean.cargoType;
        this.volumeLength = detailOriginBean.volumnLength;
        this.volumeWidth = detailOriginBean.volumnWidth;
        this.volumeHeight = detailOriginBean.volumnHeight;
        this.weight = detailOriginBean.weight;

        // 合并原始订单字段
        this.orderNo = infoOriginBean.orderNo;
        this.status = infoOriginBean.status;
        this.collectType = infoOriginBean.collectType;
        this.userId = infoOriginBean.userId;
        this.receiverComplexId = infoOriginBean.receiverComplexId;
        this.receiverProvinceId = infoOriginBean.receiverProvinceId;
        this.receiverCityId = infoOriginBean.receiverCityId;
        this.receiverDistrictId = infoOriginBean.receiverDistrictId;
        this.receiverName = infoOriginBean.receiverName;
        this.senderComplexId = infoOriginBean.senderComplexId;
        this.senderProvinceId = infoOriginBean.senderProvinceId;
        this.senderCityId = infoOriginBean.senderCityId;
        this.senderDistrictId = infoOriginBean.senderDistrictId;
        this.senderName = infoOriginBean.senderName;
        this.paymentType = infoOriginBean.paymentType;
        this.cargoNum = infoOriginBean.cargoNum;
        this.amount = infoOriginBean.amount;
        this.estimateArriveTime = DateFormatUtil.toYmdHms(
                infoOriginBean.estimateArriveTime - 8 * 60 * 60 * 1000);
        this.distance = infoOriginBean.distance;
        this.signTime =
                DateFormatUtil.toYmdHms(DateFormatUtil.toTs(
                        infoOriginBean.updateTime.replaceAll("T", " ")
                                .replaceAll("Z", ""), true)
                        + 8 * 60 * 60 * 1000);
        this.ts = DateFormatUtil.toTs(
                infoOriginBean.updateTime.replaceAll("T", " ")
                        .replaceAll("Z", ""), true)
                + 8 * 60 * 60 * 1000;
    }
}

3.DwdOrderRelevantApp

package com.atguigu.tms.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.beans.*;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DwdOrderRelevantApp {
    public static void main(String[] args) throws Exception {
        // 1.环境准备
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // 2.从Kafka读数据
        String topic = "tms_ods";
        String groupId = "dwd_order_relevant_group";
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId, args);
        SingleOutputStreamOperator<String> kafkaStrDS = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source")
                .uid("kafka_source");

        // 3.筛选订单和订单明细数据
        SingleOutputStreamOperator<String> filterDS = kafkaStrDS.filter((FilterFunction<String>) jsonStr -> {
            JSONObject jsonObj = JSON.parseObject(jsonStr);
            String tableName = jsonObj.getJSONObject("source").getString("table");
            return "order_info".equals(tableName) || "order_cargo".equals(tableName);
        });
//        filterDS.print(">>>");
        // 4.对流中的数据类型进行转换 jsonStr->jsonObj
        SingleOutputStreamOperator<JSONObject> jsonObjDS = filterDS.map((MapFunction<String, JSONObject>) jsonStr -> {
            JSONObject jsonObj = JSON.parseObject(jsonStr);
            String tableName = jsonObj.getJSONObject("source").getString("table");
            jsonObj.put("table", tableName);
            jsonObj.remove("source");
            jsonObj.remove("transaction");
            return jsonObj;
        });

//        jsonObjDS.print(">>>");

        // 5.按照order_id进行分组
        KeyedStream<JSONObject, String> keyDS = jsonObjDS.keyBy((KeySelector<JSONObject, String>) jsonObj -> {
            String table = jsonObj.getString("table");
            if ("order_info".equals(table)) {
                return jsonObj.getJSONObject("after").getString("id");

            }
            return jsonObj.getJSONObject("after").getString("order_id");
        });
//        keyDS.print(">>>");

        // 6.定义侧输出流标签 下单放到主流,支付成功、取消运单、揽收(接单)、发单 转运完成、派送成功、签收放到侧输出流
        // 支付成功明细流标签
        OutputTag<String> paySucTag = new OutputTag<String>("dwd_trade_pay_suc_detail") {
        };
        // 取消运单明细流标签
        OutputTag<String> cancelDetailTag = new OutputTag<String>("dwd_trade_cancel_detail") {
        };
        // 揽收明细流标签
        OutputTag<String> receiveDetailTag = new OutputTag<String>("dwd_trans_receive_detail") {
        };
        // 发单明细流标签
        OutputTag<String> dispatchDetailTag = new OutputTag<String>("dwd_trans_dispatch_detail") {
        };
        // 转运完成明细流标签
        OutputTag<String> boundFinishDetailTag = new OutputTag<String>("dwd_trans_bound_finish_detail") {
        };
        // 派送成功明细流标签
        OutputTag<String> deliverSucDetailTag = new OutputTag<String>("dwd_trans_deliver_detail") {
        };
        // 签收明细流标签
        OutputTag<String> signDetailTag = new OutputTag<String>("dwd_trans_sign_detail") {
        };

        // 7.分流
        SingleOutputStreamOperator<String> orderDetailDS = keyDS.process(
                new KeyedProcessFunction<String, JSONObject, String>() {

                    private ValueState<DwdOrderInfoOriginBean> infoBeanState;
                    private ValueState<DwdOrderDetailOriginBean> detailBeanState;

                    @Override
                    public void open(Configuration parameters) {
                        ValueStateDescriptor<DwdOrderInfoOriginBean> InfoOriginBeanStateDescriptor
                                = new ValueStateDescriptor<>("infoBeanState", DwdOrderInfoOriginBean.class);
                        InfoOriginBeanStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(5)).build());

                        infoBeanState = getRuntimeContext().getState(InfoOriginBeanStateDescriptor);

                        ValueStateDescriptor detailBeanStateDescriptor
                                = new ValueStateDescriptor<>("detailBeanState", DwdOrderDetailOriginBean.class);
                        detailBeanState = getRuntimeContext().getState(detailBeanStateDescriptor);

                    }

                    @Override
                    public void processElement(JSONObject jsonObj, KeyedProcessFunction<String, JSONObject, String>.Context ctx, Collector<String> out) throws Exception {
                        String table = jsonObj.getString("table");
                        String op = jsonObj.getString("op");
                        JSONObject data = jsonObj.getJSONObject("after");
                        if ("order_info".equals(table)) {
                            //处理的是订单数据
                            DwdOrderInfoOriginBean infoOriginBean = data.toJavaObject(DwdOrderInfoOriginBean.class);

                            // 脱敏
                            String senderName = infoOriginBean.getSenderName();
                            String receiverName = infoOriginBean.getReceiverName();

                            senderName = senderName.charAt(0) + senderName.substring(1).replaceAll(".", "\\*");
                            receiverName = receiverName.charAt(0) + receiverName.substring(1).replaceAll(".", "\\*");

                            infoOriginBean.setSenderName(senderName);
                            infoOriginBean.setReceiverName(receiverName);

                            DwdOrderDetailOriginBean detailOriginBean = detailBeanState.value();
                            if ("c".equals(op)) {
                                // 下单操作
                                if (detailOriginBean == null) {
                                    // 订单数据 比明细数据先到,将订单数据放到状态中
                                    infoBeanState.update(infoOriginBean);
                                } else {
                                    // 说明订单数据来之前,明细数据已经来到了,直接关联
                                    DwdTradeOrderDetailBean dwdTradeOrderDetailBean = new DwdTradeOrderDetailBean();
                                    dwdTradeOrderDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                    // 将下单业务过程数据 放到主流中
                                    out.collect(JSON.toJSONString(dwdTradeOrderDetailBean));
                                }

                            } else if ("u".equals(op) && detailOriginBean != null) {
                                // 其它操作
                                // 获取修改前的数据
                                JSONObject oldData = jsonObj.getJSONObject("before");
                                // 获取修改前的状态值
                                String oldStatus = oldData.getString("status");
                                String status = infoOriginBean.getStatus();
                                if (!oldStatus.equals(status)) {
                                    // 说明修改的是status字段
                                    String changeLog = oldStatus + " -> " + status;
                                    switch (changeLog) {
                                        case "60010 -> 60020":
                                            // 处理支付成功数据
                                            DwdTradePaySucDetailBean dwdTradePaySucDetailBean = new DwdTradePaySucDetailBean();
                                            dwdTradePaySucDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                            ctx.output(paySucTag, JSON.toJSONString(dwdTradePaySucDetailBean));
                                            break;
                                        case "60020 -> 60030":
                                            // 处理揽收明细数据
                                            DwdTransReceiveDetailBean dwdTransReceiveDetailBean = new DwdTransReceiveDetailBean();
                                            dwdTransReceiveDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                            ctx.output(receiveDetailTag, JSON.toJSONString(dwdTransReceiveDetailBean));
                                            break;
                                        case "60040 -> 60050":
                                            // 处理发单明细数据
                                            DwdTransDispatchDetailBean dispatchDetailBean = new DwdTransDispatchDetailBean();
                                            dispatchDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                            ctx.output(dispatchDetailTag, JSON.toJSONString(dispatchDetailBean));
                                            break;
                                        case "60050 -> 60060":
                                            // 处理转运完成明细数据
                                            DwdTransBoundFinishDetailBean boundFinishDetailBean = new DwdTransBoundFinishDetailBean();
                                            boundFinishDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                            ctx.output(boundFinishDetailTag, JSON.toJSONString(boundFinishDetailBean));
                                            break;
                                        case "60060 -> 60070":
                                            // 处理派送成功数据
                                            DwdTransDeliverSucDetailBean dwdTransDeliverSucDetailBean = new DwdTransDeliverSucDetailBean();
                                            dwdTransDeliverSucDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                            ctx.output(deliverSucDetailTag, JSON.toJSONString(dwdTransDeliverSucDetailBean));
                                            break;
                                        case "60070 -> 60080":
                                            // 处理签收明细数据
                                            DwdTransSignDetailBean dwdTransSignDetailBean = new DwdTransSignDetailBean();
                                            dwdTransSignDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                            ctx.output(signDetailTag, JSON.toJSONString(dwdTransSignDetailBean));
                                            // 签收后订单数据不会再发生变化,状态可以清除
                                            detailBeanState.clear();
                                            break;
                                        default:
                                            if (status.equals("60999")) {
                                                DwdTradeCancelDetailBean dwdTradeCancelDetailBean = new DwdTradeCancelDetailBean();
                                                dwdTradeCancelDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                                ctx.output(cancelDetailTag, JSON.toJSONString(dwdTradeCancelDetailBean));
                                                // 取消后订单数据不会再发生变化,状态可以清除
                                                detailBeanState.clear();
                                            }
                                            break;
                                    }
                                }
                            }
                        } else {
                            // 处理订单明细
                            DwdOrderDetailOriginBean detailOriginBean = data.toJavaObject(DwdOrderDetailOriginBean.class);
                            if ("c".equals(op)) {
                                detailBeanState.update(detailOriginBean);
                                // 获取状态中存放的订单数据 注意:只有下单操作,并且订单数据先到,明细数据后到的情况,才会从状态中拿到订单数据
                                DwdOrderInfoOriginBean infoOriginBean = infoBeanState.value();
                                if (infoOriginBean != null) {
                                    //属于下单业务过程
                                    DwdTradeOrderDetailBean dwdTradeOrderDetailBean = new DwdTradeOrderDetailBean();
                                    dwdTradeOrderDetailBean.mergeBean(detailOriginBean, infoOriginBean);
                                    // 将下单业务过程数据 放到主流中
                                    out.collect(JSON.toJSONString(dwdTradeOrderDetailBean));
                                }
                            }
                        }
                    }
                }
        ).uid("process_data");

        // 8.从主流中提取侧输出流
        // 支付成功明细流
        //8.1 支付成功明细流
        SideOutputDataStream<String> paySucDS = orderDetailDS.getSideOutput(paySucTag);

        // 8.2 取消运单明细流
        SideOutputDataStream<String> cancelDetailDS = orderDetailDS.getSideOutput(cancelDetailTag);
        // 8.3 揽收明细流
        SideOutputDataStream<String> receiveDetailDS = orderDetailDS.getSideOutput(receiveDetailTag);
        // 8.4 发单明细流
        SideOutputDataStream<String> dispatchDetailDS = orderDetailDS.getSideOutput(dispatchDetailTag);
        // 8.5 转运成功明细流
        SideOutputDataStream<String> boundFinishDetailDS = orderDetailDS.getSideOutput(boundFinishDetailTag);
        // 8.6 派送成功明细流
        SideOutputDataStream<String> deliverSucDetailDS = orderDetailDS.getSideOutput(deliverSucDetailTag);
        // 8.7 签收明细流
        SideOutputDataStream<String> signDetailDS = orderDetailDS.getSideOutput(signDetailTag);

        // 9.将不同流的数据写到kafka的不同主题中
        // 9.1.1 交易域下单明细主题
        String detailTopic = "tms_dwd_trade_order_detail";
        // 9.1.2 交易域支付成功明细主题
        String paySucDetailTopic = "tms_dwd_trade_pay_suc_detail";
        // 9.1.3 交易域取消运单明细主题
        String cancelDetailTopic = "tms_dwd_trade_cancel_detail";
        // 9.1.4 物流域接单(揽收)明细主题
        String receiveDetailTopic = "tms_dwd_trans_receive_detail";
        // 9.1.5 物流域发单明细主题
        String dispatchDetailTopic = "tms_dwd_trans_dispatch_detail";
        // 9.1.6 物流域转运完成明细主题
        String boundFinishDetailTopic = "tms_dwd_trans_bound_finish_detail";
        // 9.1.7 物流域派送成功明细主题
        String deliverSucDetailTopic = "tms_dwd_trans_deliver_detail";
        // 9.1.8 物流域签收明细主题
        String signDetailTopic = "tms_dwd_trans_sign_detail";

        // 9.2 发送数据到 Kafka
        // 9.2.1 运单明细数据
        KafkaSink<String> kafkaProducer = KafkaUtil.getKafkaSink(detailTopic, args);
        orderDetailDS.print("~~");
        orderDetailDS
                .sinkTo(kafkaProducer)
                .uid("order_detail_sink");

        // 9.2.2 支付成功明细数据
        KafkaSink<String> paySucKafkaProducer = KafkaUtil.getKafkaSink(paySucDetailTopic, args);
        paySucDS.print("!!");
        paySucDS
                .sinkTo(paySucKafkaProducer)
                .uid("pay_suc_detail_sink");

        // 9.2.3 取消运单明细数据
        KafkaSink<String> cancelKafkaProducer = KafkaUtil.getKafkaSink(cancelDetailTopic, args);
        cancelDetailDS.print("@@");
        cancelDetailDS
                .sinkTo(cancelKafkaProducer)
                .uid("cancel_detail_sink");

        // 9.2.4 揽收明细数据
        KafkaSink<String> receiveKafkaProducer = KafkaUtil.getKafkaSink(receiveDetailTopic, args);
        receiveDetailDS.print("##");
        receiveDetailDS
                .sinkTo(receiveKafkaProducer)
                .uid("reveive_detail_sink");

        // 9.2.5 发单明细数据
        KafkaSink<String> dispatchKafkaProducer = KafkaUtil.getKafkaSink(dispatchDetailTopic, args);
        dispatchDetailDS.print("$$");
        dispatchDetailDS
                .sinkTo(dispatchKafkaProducer)
                .uid("dispatch_detail_sink");

        // 9.2.6 转运完成明细主题
        KafkaSink<String> boundFinishKafkaProducer = KafkaUtil.getKafkaSink(boundFinishDetailTopic, args);
        boundFinishDetailDS.print("%%");
        boundFinishDetailDS
                .sinkTo(boundFinishKafkaProducer)
                .uid("bound_finish_detail_sink");

        // 9.2.7 派送成功明细数据
        KafkaSink<String> deliverSucKafkaProducer = KafkaUtil.getKafkaSink(deliverSucDetailTopic, args);
        deliverSucDetailDS.print("^^");
        deliverSucDetailDS
                .sinkTo(deliverSucKafkaProducer)
                .uid("deliver_suc_detail_sink");

        // 9.2.8 签收明细数据
        KafkaSink<String> signKafkaProducer = KafkaUtil.getKafkaSink(signDetailTopic, args);
        signDetailDS.print("&&");
        signDetailDS
                .sinkTo(signKafkaProducer)
                .uid("sign_detail_sink");
        env.execute();
    }
}

二、代码测试

1.环境启动

hadoop,zk,kf全部启动
根据流程图可以看到,流程中没有使用到dim层的内容,所以我们不需要启动hbase。

2.kafka消费者

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_order_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_pay_suc_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_cancel_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_receive_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_dispatch_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_bound_finish_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_deliver_detail
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_sign_detail

一共需要查看8个消费者主题,你可以开8个窗口,也可以一个一个看,kafka如果没有消费者,会先将数据保存,等待消费,所以不需要8个主题同时消费。

3.修改配置

在这里插入图片描述

4.测试结果

先启动OdsApp和DwdOrderRelevantApp,然后生成模拟数据,之后查看kakfa消费者,有些数据可能要多生成几次才行。

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_order_detail
在这里插入图片描述

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_pay_suc_detail
在这里插入图片描述
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trade_cancel_detail

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_receive_detail
在这里插入图片描述
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_dispatch_detail
这个主题是特殊情况,正常可能没有输出。

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_bound_finish_detail
在这里插入图片描述

kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_deliver_detail
在这里插入图片描述
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_dwd_trans_sign_detail
在这里插入图片描述


总结

至此这篇博客的内容结束。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1309802.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode 162. 寻找峰值(优质解法)

代码&#xff1a; class Solution {public int findPeakElement(int[] nums) {int left0,rightnums.length-1;while (left<right){int midleft(right-left)/2;if(nums[mid]>nums[mid1]){rightmid;}else {leftmid1;}}return left;} } 题解&#xff1a; 通过题意进行分析…

CRM是怎样帮助企业从激烈竞争中脱颖而出的?

有限的市场机会与资源推动了市场竞争。市场竞争是在所难免的&#xff0c;但企业可以借助CRM管理系统调整其业务策略&#xff0c;在市场上很多竞争者中突围。CRM系统怎样帮助企业赢得市场竞争&#xff1f; 以下五个功能点是关键&#xff1a;数据分析、客户管理、合作伙伴、营销自…

XCP详解「4.2·问题-加载信号过多导致FIFO buffer overflow」

APE write报问题 报文也提示80 溢出 检查测量配置

【噪音控制 】 铁氧体磁珠

1. 片状铁氧体磁珠的直流重叠特性 片状铁氧体磁珠是一种使用铁氧体的电感器。因此&#xff0c;当大电流通过时&#xff0c;需要特别注意由于磁饱和所造成的性能改变。图1是电流通过片状铁氧体磁珠时的阻抗值的变化示例。 图1 片状铁氧体磁珠的直流重叠特性示例 正因如此&am…

2023一起益企广东省中小企业数字化赋能活动(深圳站)成功举办

12月12日&#xff0c;由广东工业和信息化厅指导&#xff0c;广东省中小企业服务中心、深圳市中小企业服务局主办&#xff0c;深圳联通承办的2023年“一起益企”广东省中小企业数字化赋能专项对接志愿服务活动&#xff08;深圳站&#xff09;在深圳成功举办。 本次活动涵盖中小企…

Failed to open the referenced table ‘qrtz_job_details‘

依赖【表】或者【其他对象】执行拉到最前面即可。

RocketMQ 总体概括

目录 概述RocketMQ 领域模型MQ 解决的问题电商平台案例初步设计引入中间件设计 MQ 选型结束 概述 官网地址 RocketMQ 领域模型 官方领域模型概述 下面图&#xff0c;是在自己理解的基础上&#xff0c;对官方的模型图添加了一些。 Topic&#xff1a;主题&#xff0c;可以理解…

用栈解决迷宫问题

思想 使用栈来解决迷宫问题的思想是通过深度优先搜索算法来探索迷宫中的路径。栈的特点是后进先出&#xff0c;这正好符合深度优先搜索的思想&#xff0c;即先探索一个方向直到无法继续为止&#xff0c;然后回溯到上一个节点&#xff0c;再探索其他方向。 具体来说&#xff0…

DC电源模块:为您的电子设备提供稳定可靠的能量

DC电源模块&#xff1a;为您的电子设备提供稳定可靠的能量 BOSHIDA DC电源模块是一种电子设备&#xff0c;用于为其他电子设备提供稳定可靠的直流电能。它通常由一个输入端&#xff0c;一个输出端和一些内部电路组成。输入端通常接收来自交流电源或其他电源的电能&#xff0c;经…

前端走向未来:真相还是焦虑的贩卖?

目录 一、为什么会出现“前端已死”的言论 二、你如何看待“前端已死” 三、前端技术的未来发展趋势 四、前端人&#xff0c;该如何打好这场职位突围战&#xff1f; 我的其他博客 一、为什么会出现“前端已死”的言论 近来&#xff0c;IT圈内流传着“Java 已死、前端已凉”…

冷链托盘四向穿梭车|适用于-18℃~-25℃海格里斯HEGERLS冷库型托盘四向车系统

近年来随着物流行业的迅猛发展&#xff0c;托盘式四向穿梭车在电力、食品、医用、冷链等等行业得到了广泛应用&#xff0c;尤其在冷链物流场景中应用较多&#xff0c;目前设备已具备在-20℃至-25℃的环境中运行&#xff0c;尤其是-18℃及以下的冷链系统&#xff0c;采用四向穿梭…

mysql8 windows下修改my.ini配置 this is incompatible with sql_mode=only_full_group_by

1、找到安装路径 show variables like %sql_mode;SHOW VARIABLES LIKE config_file;SHOW VARIABLES LIKE %datadir%;SHOW VARIABLES; 2、修改 sql_modeSTRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION

Android取消深色适配

从Android10&#xff08;API 29&#xff09;开始&#xff0c;在原有的主题适配的基础上&#xff0c;Google开始提供了Force Dark机制&#xff0c;在系统底层直接对颜色和图片进行转换处理&#xff0c;原生支持深色模式。当系统设置深色主题背景或者进入省电模式情况下会进入深色…

cgteamwork与shotgrid对比

最近有项目接触使用并二开cgteamwork&#xff0c; 也重新认识了cgteamwork&#xff0c;感受到国产软件的强大&#xff0c;国内中小CG公司的首选&#xff0c;原因&#xff1a; 1 上手容易&#xff0c;不会的有售前工程师教&#xff0c;他们全国各地城市到处跑。 感概业务的强大…

Educoder--Linux实验--FTP服务器搭建(第1关~第4关)

第1关 sudo apt-get update sudo apt-get install vsftpd 过程中出现中断时&#xff0c;按“Y”继续。 第2关 sudo service vsftpd start sudo service vsftpd status 第3关 启动vsftpd服务&#xff1b; service vsftpd start 初始化嘎嘎重要&#xff01;&#xff01; …

台式扫描电镜与落地式扫描电镜详细对比

随着材料科学和微纳技术的快速发展&#xff0c;扫描电子显微镜已成为研究微观结构的一种重要工具。根据外形体积的不同&#xff0c;扫描电镜可以分为两大类:落地式扫描电镜和台式扫描电镜。本文将从探测器、易操作性、安装环境和价格等多个方面对两者进行比较。 一、发展历史 …

产品经理必掌握自定义元件流程图泳道图

&#x1f3ac; 艳艳耶✌️&#xff1a;个人主页 &#x1f525; 个人专栏 &#xff1a; 《产品经理管理项目周期及【Axure RP9】简介&安装&基本使用》 ⛺️ 越努力 &#xff0c;越幸运 目录 一、什么是自定义元件 1.1如何自定义元件 二、什么是流程图 &…

可替代LM5145,5.5V-100V Vin同步降压控制器_SCT82A30

SCT82A30是一款100V电压模式控制同步降压控制器&#xff0c;具有线路前馈。40ns受控高压侧MOSFET的最小导通时间支持高转换比&#xff0c;实现从48V输入到低压轨的直接降压转换&#xff0c;降低了系统复杂性和解决方案成本。如果需要&#xff0c;在低至6V的输入电压下降期间&am…

销售技巧培训之如何提升金融销售技巧

销售技巧培训之如何提升金融销售技巧 在金融行业&#xff0c;销售技巧是决定业绩成败的关键因素之一。无论是销售保险、股票、债券&#xff0c;还是提供投资咨询服务&#xff0c;都需要掌握一定的销售技巧。本文将探讨如何提升金融销售技巧&#xff0c;通过案例分析&#xff0…

【虹科分享】基于Redis Enterprise,LangChain,OpenAI 构建一个电子商务聊天机器人

如何构建你自己的商务聊天机器人&#xff1f;注意哦&#xff0c;是你自己的聊天机器人。一起来看看Redis Enterprise的向量检索是怎么帮你实现这个愿望的吧。 鉴于最近人工智能支持的API和网络开发工具的激增&#xff0c;似乎每个人都在将聊天机器人集成到他们的应用程序中。 …