Table of Contents
- I DWM Layer: Order Wide Table
- 1 Dimension Join Implementation
- (1) Optimization 2: Asynchronous Queries
- a Joining the Province Dimension
- b Joining the SKU Dimension
- c Joining the SPU Dimension
- d Joining the Category Dimension
- e Joining the Trademark Dimension
- f Final Results
- (2) Writing the Results to a Kafka Sink
- II DWM Layer: Payment Wide Table
- 1 Requirements Analysis and Approach
- 2 Implementation
- (1) Create the Payment Entity Class PaymentInfo
- (2) Create the Payment Wide Table Entity Class PaymentWide
- (3) Reading the Data
- (4) Assigning Watermarks and Extracting the Event-Time Field (To Be Optimized)
- (5) A Date Conversion Utility Class
- (6) Revised Version
- (7) A Thread-Safe Date Conversion Utility Class
- (8) Extracting a Flink Application Base Class
- (9) Interval-Joining the Payment and Order Wide Table Streams and Writing to Kafka
- (10) Testing
I DWM Layer: Order Wide Table
1 Dimension Join Implementation
(1) Optimization 2: Asynchronous Queries
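All five dimension joins below go through AsyncDataStream.unorderedWait with the DimAsyncFunction built earlier in this series. As a reminder of its shape, here is a minimal sketch; DimUtil (the HBase/Phoenix lookup) and ThreadPoolUtil (a shared executor) are names assumed from the wider project, not defined in this section.
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.util.Collections;
import java.util.concurrent.ExecutorService;

// Assumed helper interface: join and getKey are what the anonymous subclasses below override
interface DimJoinFunction<T> {
    void join(T obj, JSONObject dimJsonObj) throws Exception;
    String getKey(T obj);
}

public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T> {
    private final String tableName;                    // dimension table to query
    private transient ExecutorService executorService; // shared thread pool for async lookups

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) {
        executorService = ThreadPoolUtil.getInstance(); // assumed utility
    }

    @Override
    public void asyncInvoke(T obj, ResultFuture<T> resultFuture) {
        executorService.submit(() -> {
            try {
                // 1. look up the dimension row by key, 2. enrich the element, 3. emit it
                JSONObject dimJsonObj = DimUtil.getDimInfo(tableName, getKey(obj)); // assumed utility
                join(obj, dimJsonObj);
                resultFuture.complete(Collections.singleton(obj));
            } catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
    }
}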
a Joining the Province Dimension
// TODO 9 Join with the province dimension
SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDS = AsyncDataStream.unorderedWait(
        orderWideWithUserDS,
        new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
            // Dimension table columns: name, area_code, iso_code, iso_3166_2
            @Override
            public void join(OrderWide orderWide, JSONObject dimJsonObj) throws Exception {
                orderWide.setProvince_name(dimJsonObj.getString("NAME"));
                orderWide.setProvince_area_code(dimJsonObj.getString("AREA_CODE"));
                orderWide.setProvince_iso_code(dimJsonObj.getString("ISO_CODE"));
                orderWide.setProvince_3166_2_code(dimJsonObj.getString("ISO_3166_2"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return orderWide.getProvince_id().toString();
            }
        },
        60,
        TimeUnit.SECONDS
);
orderWideWithProvinceDS.print();
Test: initialize the province dimension data into HBase via Maxwell's bootstrap
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2022 --table base_province --client_id maxwell_1
Generate mock business data and verify the result.
b Joining the SKU Dimension
// TODO 10 Join with the SKU dimension
SingleOutputStreamOperator<OrderWide> orderWideWithSkuDS = AsyncDataStream.unorderedWait(
        orderWideWithProvinceDS,
        new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                orderWide.setSku_name(jsonObject.getString("SKU_NAME"));
                orderWide.setCategory3_id(jsonObject.getLong("CATEGORY3_ID"));
                orderWide.setSpu_id(jsonObject.getLong("SPU_ID"));
                orderWide.setTm_id(jsonObject.getLong("TM_ID"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getSku_id());
            }
        }, 60, TimeUnit.SECONDS);
Initialize the SKU dimension data into HBase (via Maxwell's bootstrap)
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2022 --table sku_info --client_id maxwell_1
c Joining the SPU Dimension
// TODO 11 Join with the SPU dimension
SingleOutputStreamOperator<OrderWide> orderWideWithSpuDS = AsyncDataStream.unorderedWait(
        orderWideWithSkuDS,
        new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                orderWide.setSpu_name(jsonObject.getString("SPU_NAME"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getSpu_id());
            }
        }, 60, TimeUnit.SECONDS);
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2022 --table spu_info --client_id maxwell_1
d Joining the Category Dimension
// TODO 12 Join with the level-3 category dimension
SingleOutputStreamOperator<OrderWide> orderWideWithCategory3DS = AsyncDataStream.unorderedWait(
        orderWideWithSpuDS,
        new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                orderWide.setCategory3_name(jsonObject.getString("NAME"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getCategory3_id());
            }
        }, 60, TimeUnit.SECONDS);
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2022 --table base_category3 --client_id maxwell_1
e Joining the Trademark Dimension
// TODO 13 Join with the trademark dimension
SingleOutputStreamOperator<OrderWide> orderWideWithTmDS = AsyncDataStream.unorderedWait(
        orderWideWithCategory3DS,
        new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
            @Override
            public void join(OrderWide orderWide, JSONObject jsonObject) throws Exception {
                orderWide.setTm_name(jsonObject.getString("TM_NAME"));
            }

            @Override
            public String getKey(OrderWide orderWide) {
                return String.valueOf(orderWide.getTm_id());
            }
        }, 60, TimeUnit.SECONDS);
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2022 --table base_trademark --client_id maxwell_1
f Final Results
Generate mock data and inspect the result.
The order wide table is now complete; the final step is to write it back to Kafka.
(2) Writing the Results to a Kafka Sink
// TODO 14 Write the order wide table back to the Kafka topic dwm_order_wide
// JSON.parseObject(jsonStr)        : parse a JSON string into a JSONObject
// JSON.parseObject(jsonStr, clazz) : parse a JSON string into an object of the given type
// JSON.toJSONString(orderWide)     : serialize an object into a JSON string
orderWideWithTmDS
        // Convert each OrderWide object into a JSON string
        .map(orderWide -> JSON.toJSONString(orderWide))
        .addSink(
                // All enriched records go to the same topic
                MyKafkaUtil.getKafkaSink("dwm_order_wide")
        );
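For reference, MyKafkaUtil.getKafkaSink can be as thin as a FlinkKafkaProducer over a SimpleStringSchema. A sketch under that assumption (the broker address is a placeholder consistent with the hosts used above):
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class MyKafkaUtil {
    private static final String KAFKA_SERVER = "hadoop101:9092"; // assumed broker address

    // Serialize each record as a plain string and send it to the given topic
    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
        return new FlinkKafkaProducer<>(KAFKA_SERVER, topic, new SimpleStringSchema());
    }
}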
Start a Kafka console consumer:
kfkcon.sh dwm_order_wide
Generate mock data; the result appears in Kafka as follows.
The order wide table is complete. The overall flow is as follows.
II DWM Layer: Payment Wide Table
1 Requirements Analysis and Approach
A payment wide table is needed because the payment table carries no order-detail information: payment amounts are not broken down to the individual item, so item-level payment statistics cannot be computed.
The core task of the payment wide table is therefore to join the payment records with the order details.
There are two possible solutions:
- Write the order detail table (or the order wide table) to HBase and query HBase while computing the payment wide table; this treats the order details as just another dimension.
- Consume the order details as a stream and merge the two streams with a stream-stream join. Because there is a time gap between placing an order and paying for it, an interval join must be used to manage how long each stream's state is kept, guaranteeing that the order detail is still in state when the payment arrives.
The overall flow is as follows.
2 Implementation
(1) Create the Payment Entity Class PaymentInfo
package com.hzy.gmall.realtime.beans;

import lombok.Data;
import java.math.BigDecimal;

/**
 * Desc: Payment info entity class
 */
@Data
public class PaymentInfo {
    Long id;
    Long order_id;
    Long user_id;
    BigDecimal total_amount;
    String subject;
    String payment_type;
    String create_time;
    String callback_time; // The payment success time is the callback time, not the creation time
}
(2) Create the Payment Wide Table Entity Class PaymentWide
package com.hzy.gmall.realtime.beans;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.beanutils.BeanUtils;
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;

/**
 * Desc: Payment wide table entity class
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class PaymentWide {
    Long payment_id;
    String subject;
    String payment_type;
    String payment_create_time;
    String callback_time;
    Long detail_id;
    Long order_id;
    Long sku_id;
    BigDecimal order_price;
    Long sku_num;
    String sku_name;
    Long province_id;
    String order_status;
    Long user_id;
    BigDecimal total_amount;
    BigDecimal activity_reduce_amount;
    BigDecimal coupon_reduce_amount;
    BigDecimal original_total_amount;
    BigDecimal feight_fee;
    BigDecimal split_feight_fee;
    BigDecimal split_activity_amount;
    BigDecimal split_coupon_amount;
    BigDecimal split_total_amount;
    String order_create_time;
    String province_name; // populated from the dimension tables
    String province_area_code;
    String province_iso_code;
    String province_3166_2_code;
    Integer user_age;
    String user_gender;
    Long spu_id; // dimension data to be joined in
    Long tm_id;
    Long category3_id;
    String spu_name;
    String tm_name;
    String category3_name;

    public PaymentWide(PaymentInfo paymentInfo, OrderWide orderWide) {
        mergeOrderWide(orderWide);
        mergePaymentInfo(paymentInfo);
    }

    public void mergePaymentInfo(PaymentInfo paymentInfo) {
        if (paymentInfo != null) {
            try {
                BeanUtils.copyProperties(this, paymentInfo);
                payment_create_time = paymentInfo.create_time;
                payment_id = paymentInfo.id;
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
        }
    }

    public void mergeOrderWide(OrderWide orderWide) {
        if (orderWide != null) {
            try {
                BeanUtils.copyProperties(this, orderWide);
                order_create_time = orderWide.create_time;
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
        }
    }
}
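Note the argument order: Apache Commons' BeanUtils.copyProperties(dest, orig) copies from the second argument into the first (the reverse of Spring's BeanUtils.copyProperties(source, target)), so the calls above copy the fields of paymentInfo and orderWide into this.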
(3) Reading the Data
package com.hzy.gmall.realtime.app.dwm;

import com.alibaba.fastjson.JSON;
import com.hzy.gmall.realtime.beans.OrderWide;
import com.hzy.gmall.realtime.beans.PaymentInfo;
import com.hzy.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/**
 * Prepares the payment wide table
 */
public class PaymentWideApp {
    public static void main(String[] args) throws Exception {
        //TODO 1. Basic environment setup
        //1.1 Stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 Set the parallelism
        env.setParallelism(4);

        //TODO 2. Checkpoint settings (omitted)

        //TODO 3. Read data from Kafka
        //3.1 Declare the topics to consume and the consumer group
        String paymentInfoSourceTopic = "dwd_payment_info";
        String orderWideSourceTopic = "dwm_order_wide";
        String groupId = "payment_wide_app_group";
        //3.2 Obtain the Kafka consumers
        FlinkKafkaConsumer<String> paymentInfoSource = MyKafkaUtil.getKafkaSource(paymentInfoSourceTopic, groupId);
        FlinkKafkaConsumer<String> orderWideSource = MyKafkaUtil.getKafkaSource(orderWideSourceTopic, groupId);
        //3.3 Read the data and wrap it in streams
        DataStreamSource<String> paymentInfoStrDS = env.addSource(paymentInfoSource);
        DataStreamSource<String> orderWideStrDS = env.addSource(orderWideSource);

        //TODO 4. Convert the stream elements: String -> entity objects
        // Payments
        SingleOutputStreamOperator<PaymentInfo> paymentInfoDS = paymentInfoStrDS.map(jsonStr -> JSON.parseObject(jsonStr, PaymentInfo.class));
        // Order wide table
        SingleOutputStreamOperator<OrderWide> orderWideInfoDS = orderWideStrDS.map(jsonStr -> JSON.parseObject(jsonStr, OrderWide.class));
        paymentInfoDS.print("payment:");
        orderWideInfoDS.print("orderWide:");

        env.execute();
    }
}
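MyKafkaUtil.getKafkaSource is a similarly thin wrapper. Extending the MyKafkaUtil sketch from the order wide table section, it might look like this (java.util.Properties and org.apache.kafka.clients.consumer.ConsumerConfig are the extra imports; the broker address is again assumed):
// Hypothetical companion to getKafkaSink in the MyKafkaUtil sketch shown earlier
public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092"); // assumed broker address
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    // Deserialize record values as plain strings
    return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
}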
Start BaseDBApp, OrderWideApp, and PaymentWideApp, generate mock business data, and check the output.
(4) Assigning Watermarks and Extracting the Event-Time Field (To Be Optimized)
//TODO 5. Assign watermarks and extract the event-time field
// Payments
SingleOutputStreamOperator<PaymentInfo> paymentInfoWithWatermarkDS = paymentInfoDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<PaymentInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(
                        new SerializableTimestampAssigner<PaymentInfo>() {
                            @Override
                            public long extractTimestamp(PaymentInfo paymentInfo, long recordTimestamp) {
                                String callbackTimeStr = paymentInfo.getCallback_time();
                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                Long ts = null;
                                try {
                                    ts = sdf.parse(callbackTimeStr).getTime();
                                } catch (ParseException e) {
                                    e.printStackTrace();
                                }
                                return ts;
                            }
                        }
                )
);
// Order wide table
SingleOutputStreamOperator<OrderWide> orderWideWithWatermarkDS = orderWideInfoDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<OrderWide>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(
                        new SerializableTimestampAssigner<OrderWide>() {
                            @Override
                            public long extractTimestamp(OrderWide orderWide, long recordTimestamp) {
                                String createTimeStr = orderWide.getCreate_time();
                                // A new SimpleDateFormat is created on every call -- wasteful
                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                Long ts = null;
                                try {
                                    ts = sdf.parse(createTimeStr).getTime();
                                } catch (ParseException e) {
                                    e.printStackTrace();
                                }
                                return ts;
                            }
                        }
                )
);
(5) A Date Conversion Utility Class
package com.hzy.gmall.realtime.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;

/**
 * Date conversion utility class
 */
public class DateTimeUtil {
    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static Long toTs(String dateStr) {
        Long ts = null;
        try {
            ts = sdf.parse(dateStr).getTime();
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return ts;
    }
}
(6) Revised Version
//TODO 5. Assign watermarks and extract the event-time field
// Payments
SingleOutputStreamOperator<PaymentInfo> paymentInfoWithWatermarkDS = paymentInfoDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<PaymentInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(
                        new SerializableTimestampAssigner<PaymentInfo>() {
                            @Override
                            public long extractTimestamp(PaymentInfo paymentInfo, long recordTimestamp) {
                                return DateTimeUtil.toTs(paymentInfo.getCallback_time());
                            }
                        }
                )
);
// Order wide table
SingleOutputStreamOperator<OrderWide> orderWideWithWatermarkDS = orderWideInfoDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<OrderWide>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(
                        new SerializableTimestampAssigner<OrderWide>() {
                            @Override
                            public long extractTimestamp(OrderWide orderWide, long recordTimestamp) {
                                return DateTimeUtil.toTs(orderWide.getCreate_time());
                            }
                        }
                )
);
(7) A Thread-Safe Date Conversion Utility Class
SimpleDateFormat is not thread-safe; since JDK 1.8 it can be replaced with DateTimeFormatter.
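A quick way to see the hazard (an illustrative test, not part of the project): several threads sharing one SimpleDateFormat will intermittently fail or return corrupted dates, because the formatter mutates an internal Calendar during parse.
import java.text.SimpleDateFormat;

public class SdfRaceDemo {
    private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        // Ten threads parse concurrently through the shared formatter: some runs throw
        // NumberFormatException or ArrayIndexOutOfBoundsException, others print wrong timestamps.
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    System.out.println(SDF.parse("2022-04-01 10:20:30").getTime());
                } catch (Exception e) {
                    System.err.println("corrupted parse: " + e);
                }
            }).start();
        }
    }
}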
package com.hzy.gmall.realtime.utils;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;

/**
 * Date conversion utility class
 */
public class DateTimeUtil {
    // private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Convert a Date object to a string
    public static String toYMDHMS(Date date) {
        LocalDateTime localDateTime = LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault());
        return dtf.format(localDateTime);
    }

    // Convert a date string to epoch milliseconds
    public static Long toTs(String dateStr) {
        // Roughly: Date <-> LocalDateTime, Calendar <-> Instant
        LocalDateTime localDateTime = LocalDateTime.parse(dateStr, dtf);
        return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
    }
}
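A quick sanity check of the utility. Note that toTs hard-codes the +8 offset (China Standard Time) while toYMDHMS uses the system default zone, so the round trip below only holds on a JVM running in UTC+8:
Long ts = DateTimeUtil.toTs("2022-04-01 10:20:30");          // 1648779630000 (epoch millis, UTC+8)
String back = DateTimeUtil.toYMDHMS(new java.util.Date(ts)); // "2022-04-01 10:20:30" if the system zone is +8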
(8) Extracting a Flink Application Base Class
package com.hzy.gmall.realtime.app.dwd;

public abstract class Base {
    public void entry() throws Exception {
        //TODO 1 Basic environment setup
        // Stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the parallelism
        env.setParallelism(4);

        //TODO 2 Checkpoint settings
        // Enable checkpointing
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        // Restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 3000L));
        // Retain externalized checkpoints after the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // State backend -- in-memory, filesystem, or RocksDB
        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
        // User for HDFS access
        System.setProperty("HADOOP_USER_NAME", "hzy");

        executeBiz(env);
        env.execute();
    }

    // Subclasses implement their business logic here
    public abstract void executeBiz(StreamExecutionEnvironment env);
}
package com.hzy.gmall.realtime.app.dwd;

public class DwsApp extends Base {
    public static void main(String[] args) throws Exception {
        Base base = new DwsApp();
        base.entry();
    }

    @Override
    public void executeBiz(StreamExecutionEnvironment env) {
        // Placeholder topic and group id -- fill in for a real application
        env.addSource(MyKafkaUtil.getKafkaSource("", ""));
    }
}
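As an illustration of the intended use (this refactor is not applied in the code above), PaymentWideApp could extend Base and move its pipeline into executeBiz, inheriting the checkpointing setup for free:
public class PaymentWideApp extends Base {
    public static void main(String[] args) throws Exception {
        new PaymentWideApp().entry();
    }

    @Override
    public void executeBiz(StreamExecutionEnvironment env) {
        // The Kafka sources, watermark assignment, interval join, and Kafka sink
        // from TODO 3 onward would move here; entry() already calls env.execute().
    }
}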
(9) Interval-Joining the Payment and Order Wide Table Streams and Writing to Kafka
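The join below assumes both streams have already been keyed by order_id (TODO 6 in the numbering, not shown in this post). Under that assumption, the missing step would look like:
// Hypothetical TODO 6: key both streams by order_id so the interval join can pair them
KeyedStream<PaymentInfo, Long> paymentInfoKeyedDS =
        paymentInfoWithWatermarkDS.keyBy(paymentInfo -> paymentInfo.getOrder_id());
KeyedStream<OrderWide, Long> orderWideKeyedDS =
        orderWideWithWatermarkDS.keyBy(orderWide -> orderWide.getOrder_id());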
// TODO 7 Interval-join the payment stream with the order wide table stream
SingleOutputStreamOperator<PaymentWide> paymentWideDS = paymentInfoKeyedDS
        .intervalJoin(orderWideKeyedDS)
        // Match each payment with orders whose event time lies in
        // [payment time - 30 minutes, payment time]; the order always precedes its payment
        .between(Time.minutes(-30), Time.minutes(0))
        .process(
                new ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>() {
                    @Override
                    public void processElement(PaymentInfo paymentInfo, OrderWide orderWide, Context ctx, Collector<PaymentWide> out) throws Exception {
                        out.collect(new PaymentWide(paymentInfo, orderWide));
                    }
                }
        );
paymentWideDS.print(">>>>");

// TODO 8 Write the payment wide table to the Kafka topic dwm_payment_wide
paymentWideDS
        .map(paymentWide -> JSON.toJSONString(paymentWide))
        .addSink(MyKafkaUtil.getKafkaSink("dwm_payment_wide"));
(10) Testing
Start the three applications and a Kafka consumer, generate mock business data, and check the final result.