文章目录
- 一 DWS层-访客主题计算
- 1 写入OLAP数据库
- (1)增加ClickhouseUtil
- a JdbcSink.<T>sink( )的四个参数说明
- b ClickhouseUtil中获取JdbcSink函数的实现
- c 构造者设计模式
- d 赋值给问号占位符并创建TransientSink注解
- e 在GmallConfig中配置ClickHouse的连接地址
- f 为主程序增加写入ClickHouse的Sink
- (2)整体测试
- 二 DWS层-商品主题计算
- 1 需求分析与思路
- 2 封装商品统计实体类ProductStats
- 3 创建ProductStatsApp,从Kafka主题中获得数据流
- 4 把JSON字符串数据流转换为统一数据对象的数据流
- (1)转换点击以及曝光流数据
- (2)转化收藏流数据
- (3)转换加购流数据
- (4)转换退款流数据
- (5)创建电商业务常量类GmallConstant
- (6)转换评价流数据
一 DWS层-访客主题计算
1 写入OLAP数据库
(1)增加ClickhouseUtil
a JdbcSink.sink( )的四个参数说明
- 参数1: 传入Sql,格式如:insert into xxx values(?,?,?,?)
- 参数2: 可以用lambda表达实现(jdbcPreparedStatement, t) -> t为数据对象,要装配到语句预编译器的参数中。
- 参数3:设定一些执行参数,比如重试次数,批次大小。
- 参数4:设定连接参数,比如地址,端口,驱动名。
b ClickhouseUtil中获取JdbcSink函数的实现
package com.hzy.gmall.realtime.utils;
/**
* 操作ClickHouse的工具类
*/
public class ClickhouseUtil {
public static <T> SinkFunction<T> getJdbcSink(String sql){
// "insert into visitor_stats_2022 values(?,?,?,?,?,?,?,?,?,?,?,?)"
SinkFunction<T> sinkFunction = JdbcSink.<T>sink(
sql,
new JdbcStatementBuilder<T>() {
// 参数 T obj:就是流中的一条数据
@Override
public void accept(PreparedStatement ps, T obj) throws SQLException {
// 获取流中obj属性值,赋值给问号
}
},
// 构造者设计模式
new JdbcExecutionOptions.Builder()
// 5条数据为一批,一批处理一次
// 4个并行度,每一个slot数量到5,才会保存到ClickHouse
.withBatchSize(5)
// // 2s后直接保存到ClickHouse
// .withBatchIntervalMs(2000)
// // 最大重试次数为3
// .withMaxRetries(3)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUrl(GmallConfig.CLICKHOUSE_URL)
.build()
);
return sinkFunction;
}
}
c 构造者设计模式
在设置一个对象属性值时,通常方式有两种:
第一种
Animal animal=new Animal("XX","XXkg","XX");
第二种
Animal animal=new Animal();
animal.setAge("XX");
animal.setweight("XXkg");
animal.setFood("XX");
- 第一种方式::相当于在构造函数里传递参数,但这样加入参数的时候,如果属性过多,不能明确的知道往这个对象里加入了哪种属性的内容。
- 第二种方式:虽然可以根据set函数名看到将要设置的值是什么值,但是这种写法,略显冗余。
在设计模式中有构造者模式(builder),在类的构造器或静态工厂具有多个参数。设计这种类时,builder模式就是个不错的选择。
构造者就是在外部类中创建的一个内部类,然后在内部类中对属性赋值,一次赋值完成后,返回的类型是内部类自己,而上面的第二种赋值方式没有返回值,不可以链式赋值,而构造者模式可以进行链式赋值。在经过许多步赋值操作之后,通过.build()方法通过内部类去创建外部类对象。
详情见Java的构造者模式(builder)。
d 赋值给问号占位符并创建TransientSink注解
该注解标记不需要保存的字段。
由于之前的ClickhouseUtil工具类的写入机制就是把该实体类的所有字段按次序一次写入数据表。但是实体类有时会用到一些临时字段,计算中有用但是并不需要最终保存在临时表中。我们可以把这些字段做一些标识,然后再写入的时候判断标识来过滤掉这些字段。
为字段打标识通常的办法就是给字段加个注解,这里就增加一个自定义注解@TransientSink来标识该字段不需要保存到数据表中。
new JdbcStatementBuilder<T>() {
// 参数 T obj:就是流中的一条数据
// 获取流中对象obj的属性值,赋值给问号占位符
@Override
public void accept(PreparedStatement ps, T obj) throws SQLException {
// 获取流中对象所属类的属性
Field[] fields = obj.getClass().getDeclaredFields();
int skipNum = 0;
// 对属性数组进行遍历
for (int i = 0; i < fields.length; i++) {
// 获取每一个属性对象
Field field = fields[i];
// 判断该属性是否有@trannsient注解修饰
TransientSink transientSink = field.getAnnotation(TransientSink.class);
if (transientSink != null){
skipNum++;
continue;
}
// 设置私有属性的访问权限
field.setAccessible(true);
try {
// 获取对象的属性值
Object fieldValue = field.get(obj);
// 将属性的值赋值给问号占位符
// JDBC相关操作和查询结果集的列从1开始
ps.setObject(i+1-skipNum,fieldValue);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
}
e 在GmallConfig中配置ClickHouse的连接地址
public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop101:8123/default";
f 为主程序增加写入ClickHouse的Sink
// TODO 9 将聚合统计之后的数据写到ClickHouse
reduceDS.addSink(
ClickhouseUtil.getJdbcSink("insert into visitor_stats_2022 values(?,?,?,?,?,?,?,?,?,?,?,?)")
);
(2)整体测试
- 启动ZK、Kafka、logger.sh、ClickHouse、【HDFS】
- 运行BaseLogApp
- 运行UniqueVisitApp
- 运行UserJumpDetailApp
- 运行VisitorStatsApp
- 运行rt_applog目录下的jar包
- 查看控制台输出
- 查看ClickHouse中visitor_stats_2022表数据
ClickHouse中数据如下:
二 DWS层-商品主题计算
统计主题 | 需求指标 | 输出方式 | 计算来源 | 来源层级 |
---|---|---|---|---|
商品 | 点击 | 多维分析 | page_log | dwd |
曝光 | 多维分析 | page_log | dwd | |
收藏 | 多维分析 | favor_info | dwd | |
加入购物车 | 多维分析 | cart_info | dwd | |
下单 | 可视化大屏 | order_wide | dwm | |
支付 | 多维分析 | payment_wide | dwm | |
退款 | 多维分析 | order_refund_info | dwd | |
评价 | 多维分析 | comment_info | dwd |
与访客的dws层的宽表类似,也是把多个事实表的明细数据汇总起来组合成宽表
1 需求分析与思路
- 从Kafka主题中获得数据流
- 把Json字符串数据流转换为统一数据对象的数据流
- 把统一的数据结构流合并为一个流
- 设定事件时间与水位线
- 分组、开窗、聚合
- 写入ClickHouse
整体流程如下图:
2 封装商品统计实体类ProductStats
使用构造者模式定义实体类。
package com.hzy.gmall.realtime.beans;
/**
* Desc: 商品统计实体类
* @Builder注解
* 可以使用构造者方式创建对象,给属性赋值
* @Builder.Default
* 在使用构造者方式给属性赋值的时候,属性的初始值会丢失
* 该注解的作用就是修复这个问题
* 例如在属性上赋值了初始值为0L,如果不加这个注解,通过构造者创建的对象属性值会变为null
*/
@Data
@Builder
public class ProductStats {
String stt;//窗口起始时间
String edt; //窗口结束时间
Long sku_id; //sku编号
String sku_name;//sku名称
BigDecimal sku_price; //sku单价
Long spu_id; //spu编号
String spu_name;//spu名称
Long tm_id; //品牌编号
String tm_name;//品牌名称
Long category3_id;//品类编号
String category3_name;//品类名称
@Builder.Default
Long display_ct = 0L; //曝光数
@Builder.Default
Long click_ct = 0L; //点击数
@Builder.Default
Long favor_ct = 0L; //收藏数
@Builder.Default
Long cart_ct = 0L; //添加购物车数
@Builder.Default
Long order_sku_num = 0L; //下单商品个数
@Builder.Default //下单商品金额
BigDecimal order_amount = BigDecimal.ZERO;
@Builder.Default
Long order_ct = 0L; //订单数
@Builder.Default //支付金额
BigDecimal payment_amount = BigDecimal.ZERO;
@Builder.Default
Long paid_order_ct = 0L; //支付订单数
@Builder.Default
Long refund_order_ct = 0L; //退款订单数
@Builder.Default
BigDecimal refund_amount = BigDecimal.ZERO;
@Builder.Default
Long comment_ct = 0L;//评论数
@Builder.Default
Long good_comment_ct = 0L; //好评数
@Builder.Default
@TransientSink
Set orderIdSet = new HashSet(); //用于统计订单数
@Builder.Default
@TransientSink
Set paidOrderIdSet = new HashSet(); //用于统计支付订单数
@Builder.Default
@TransientSink
Set refundOrderIdSet = new HashSet();//用于退款支付订单数
Long ts; //统计时间戳
}
3 创建ProductStatsApp,从Kafka主题中获得数据流
package com.hzy.gmall.realtime.app.dws;
/**
* 商品主题统计DWS
*/
public class ProductStatsApp {
public static void main(String[] args) throws Exception {
// TODO 1 基本环境准备
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// TODO 2 检查点相关配置(略)
// TODO 3 从kafka主题中读取数据
// 3.1 声明消费主题及消费者组
String groupId = "product_stats_app";
String pageViewSourceTopic = "dwd_page_log";
String orderWideSourceTopic = "dwm_order_wide";
String paymentWideSourceTopic = "dwm_payment_wide";
String cartInfoSourceTopic = "dwd_cart_info";
String favorInfoSourceTopic = "dwd_favor_info";
String refundInfoSourceTopic = "dwd_order_refund_info";
String commentInfoSourceTopic = "dwd_comment_info";
// 3.2 创建消费者对象
FlinkKafkaConsumer<String> pageViewSource = MyKafkaUtil.getKafkaSource(pageViewSourceTopic,groupId);
FlinkKafkaConsumer<String> orderWideSource = MyKafkaUtil.getKafkaSource(orderWideSourceTopic,groupId);
FlinkKafkaConsumer<String> paymentWideSource = MyKafkaUtil.getKafkaSource(paymentWideSourceTopic,groupId);
FlinkKafkaConsumer<String> favorInfoSourceSouce = MyKafkaUtil.getKafkaSource(favorInfoSourceTopic,groupId);
FlinkKafkaConsumer<String> cartInfoSource = MyKafkaUtil.getKafkaSource(cartInfoSourceTopic,groupId);
FlinkKafkaConsumer<String> refundInfoSource = MyKafkaUtil.getKafkaSource(refundInfoSourceTopic,groupId);
FlinkKafkaConsumer<String> commentInfoSource = MyKafkaUtil.getKafkaSource(commentInfoSourceTopic,groupId);
// 3.3 读取数据,封装为流
DataStreamSource<String> pageViewStrDS = env.addSource(pageViewSource);
DataStreamSource<String> favorInfoStrDS = env.addSource(favorInfoSourceSouce);
DataStreamSource<String> orderWideStrDS= env.addSource(orderWideSource);
DataStreamSource<String> paymentWideStrDS= env.addSource(paymentWideSource);
DataStreamSource<String> cartInfoStrDS= env.addSource(cartInfoSource);
DataStreamSource<String> refundInfoStrDS= env.addSource(refundInfoSource);
DataStreamSource<String> commentInfoStrDS= env.addSource(commentInfoSource);
env.execute();
}
}
4 把JSON字符串数据流转换为统一数据对象的数据流
点击流字符串格式
{
"page": {
"page_id": "good_detail",
"item":"9",
"during_time":15839,
"item_type":"sku_id",
"last_page_id":"home",
"source_type":"query"
},
曝光流字符串格式
{
"displays": [
{
"display_type": "activity",
"item": "1",
"item_type": "activity_id",
"pos_id": 5,
"order": 1
}
],
(1)转换点击以及曝光流数据
对点击流和曝光流分别做处理。
// TODO 4 对流中的数据进行类型转换 jsonStr -> ProductStats
// 4.1 转换点击以及曝光流数据
SingleOutputStreamOperator<ProductStats> clickAndDisplayStatsDS = pageViewStrDS.process(
new ProcessFunction<String, ProductStats>() {
@Override
public void processElement(String jsonStr, Context ctx, Collector<ProductStats> out) throws Exception {
JSONObject jsonObj = JSON.parseObject(jsonStr);
Long ts = jsonObj.getLong("ts");
// 判断是否为点击行为
JSONObject pageJsonObj = jsonObj.getJSONObject("page");
String pageId = pageJsonObj.getString("page_id");
if (pageId.equals("good_detail")) {
// 如果当前日志记录的页面是商品的详情页,认为这条日志记录的是点击行为
Long skuId = pageJsonObj.getLong("item");
// 下面代码等同于 new 外部类.内部类().build();
ProductStats productStats = ProductStats.builder()
.sku_id(skuId)
.click_ct(1L)
.ts(ts)
.build();
out.collect(productStats);
}
// 判断是否为曝光行为
JSONArray displays = jsonObj.getJSONArray("displays");
if (displays != null && displays.size() > 0) {
// 如果displays数组不为空,说明页面上有曝光行为,对所有曝光行为进行遍历
for (int i = 0; i < displays.size(); i++) {
JSONObject displaysJsonObj = displays.getJSONObject(i);
// 判断曝光的是否为商品
if (displaysJsonObj.getString("item_type").equals("sku_id")) {
Long skuId = displaysJsonObj.getLong("item");
ProductStats productStats1 = ProductStats.builder()
.sku_id(skuId)
.display_ct(1L)
.ts(ts)
.build();
out.collect(productStats1);
}
}
}
}
}
);
(2)转化收藏流数据
// 4.2 转化收藏流数据
SingleOutputStreamOperator<ProductStats> favorStatsDS = favorInfoStrDS.map(
new MapFunction<String, ProductStats>() {
@Override
public ProductStats map(String jsonStr) throws Exception {
JSONObject jsonObj = JSON.parseObject(jsonStr);
ProductStats productStats = ProductStats.builder()
.sku_id(jsonObj.getLong("sku_id"))
.favor_ct(1L)
.ts(DateTimeUtil.toTs(jsonObj.getString("create_time")))
.build();
return productStats;
}
}
);
(3)转换加购流数据
// 4.3 转换加购流数据
SingleOutputStreamOperator<ProductStats> cartStatsDS = cartInfoStrDS.map(
new MapFunction<String, ProductStats>() {
@Override
public ProductStats map(String jsonStr) throws Exception {
JSONObject jsonObj = JSON.parseObject(jsonStr);
ProductStats productStats = ProductStats.builder()
.sku_id(jsonObj.getLong("sku_id"))
.cart_ct(1L)
.ts(DateTimeUtil.toTs(jsonObj.getString("create_time")))
.build();
return productStats;
}
}
);
(4)转换退款流数据
一个订单中有多个商品需要退款,需要去重,使用HashSet,实体类中定义如下。
@Builder.Default
@TransientSink
Set refundOrderIdSet = new HashSet();//用于退款支付订单数
// 4.4 转换退款流数据
SingleOutputStreamOperator<ProductStats> refundStatsDS = refundInfoStrDS.map(
new MapFunction<String, ProductStats>() {
@Override
public ProductStats map(String jsonStr) throws Exception {
JSONObject jsonObj = JSON.parseObject(jsonStr);
ProductStats productStats = ProductStats.builder()
.sku_id(jsonObj.getLong("sku_id"))
.refundOrderIdSet(
new HashSet(Collections.singleton(jsonObj.getLong("order_id")))
)
.refund_amount(jsonObj.getBigDecimal("refund_amout"))
.ts(DateTimeUtil.toTs(jsonObj.getString("create_time")))
.build();
return productStats;
}
}
);
(5)创建电商业务常量类GmallConstant
为转换评价流数据做准备
package com.hzy.gmall.realtime.beans;
/**
* Desc: 电商业务常量
*/
public class GmallConstant {
//10 单据状态
public static final String ORDER_STATUS_UNPAID="1001"; //未支付
public static final String ORDER_STATUS_PAID="1002"; //已支付
public static final String ORDER_STATUS_CANCEL="1003";//已取消
public static final String ORDER_STATUS_FINISH="1004";//已完成
public static final String ORDER_STATUS_REFUND="1005";//退款中
public static final String ORDER_STATUS_REFUND_DONE="1006";//退款完成
//11 支付状态
public static final String PAYMENT_TYPE_ALIPAY="1101";//支付宝
public static final String PAYMENT_TYPE_WECHAT="1102";//微信
public static final String PAYMENT_TYPE_UNION="1103";//银联
//12 评价
public static final String APPRAISE_GOOD="1201";// 好评
public static final String APPRAISE_SOSO="1202";// 中评
public static final String APPRAISE_BAD="1203";// 差评
public static final String APPRAISE_AUTO="1204";// 自动
//13 退货原因
public static final String REFUND_REASON_BAD_GOODS="1301";// 质量问题
public static final String REFUND_REASON_WRONG_DESC="1302";// 商品描述与实际描述不一致
public static final String REFUND_REASON_SALE_OUT="1303";// 缺货
public static final String REFUND_REASON_SIZE_ISSUE="1304";// 号码不合适
public static final String REFUND_REASON_MISTAKE="1305";// 拍错
public static final String REFUND_REASON_NO_REASON="1306";// 不想买了
public static final String REFUND_REASON_OTHER="1307";// 其他
//14 购物券状态
public static final String COUPON_STATUS_UNUSED="1401";// 未使用
public static final String COUPON_STATUS_USING="1402";// 使用中
public static final String COUPON_STATUS_USED="1403";// 已使用
//15退款类型
public static final String REFUND_TYPE_ONLY_MONEY="1501";// 仅退款
public static final String REFUND_TYPE_WITH_GOODS="1502";// 退货退款
//24来源类型
public static final String SOURCE_TYPE_QUREY="2401";// 用户查询
public static final String SOURCE_TYPE_PROMOTION="2402";// 商品推广
public static final String SOURCE_TYPE_AUTO_RECOMMEND="2403";// 智能推荐
public static final String SOURCE_TYPE_ACTIVITY="2404";// 促销活动
//购物券范围
public static final String COUPON_RANGE_TYPE_CATEGORY3="3301";//
public static final String COUPON_RANGE_TYPE_TRADEMARK="3302";//
public static final String COUPON_RANGE_TYPE_SPU="3303";//
//购物券类型
public static final String COUPON_TYPE_MJ="3201";//满减
public static final String COUPON_TYPE_DZ="3202";// 满量打折
public static final String COUPON_TYPE_DJ="3203";// 代金券
public static final String ACTIVITY_RULE_TYPE_MJ="3101";
public static final String ACTIVITY_RULE_TYPE_DZ ="3102";
public static final String ACTIVITY_RULE_TYPE_ZK="3103";
public static final String KEYWORD_SEARCH="SEARCH";
public static final String KEYWORD_CLICK="CLICK";
public static final String KEYWORD_CART="CART";
public static final String KEYWORD_ORDER="ORDER";
}
(6)转换评价流数据
需要统计总评论数、好评数。
// 4.5 转换评价流数据
SingleOutputStreamOperator<ProductStats> commentStatsDS = commentInfoStrDS.map(
new MapFunction<String, ProductStats>() {
@Override
public ProductStats map(String jsonStr) throws Exception {
JSONObject jsonObj = JSON.parseObject(jsonStr);
long goodct = GmallConstant.APPRAISE_GOOD.equals(jsonObj.getString("appraise")) ? 1L : 0L;
ProductStats productStats = ProductStats.builder()
.sku_id(jsonObj.getLong("sku_id"))
.ts(DateTimeUtil.toTs(jsonObj.getString("create_time")))
.comment_ct(1L)
.good_comment_ct(goodct)
.build();
return productStats;
}
}
);