文章目录
- 一 DWS层与DWM层的设计
- 1 设计思路
- 2 DWS层需求分析
- 二 DWM层-UV计算
- 1 需求分析与思路
- 2 从kafka中读取数据
- (1)代码实现
- (2)测试
- (3)总结
- 3 UV过滤 -- 独立访客计算
- (1)实现思路
- (2)代码实现
- 4 写入kafka
- 5 测试
一 DWS层与DWM层的设计
1 设计思路
之前通过分流等手段,把数据分拆成了独立的kafka topic。那么接下来如何处理数据,就要思考一下到底要通过实时计算出哪些指标项。
因为实时计算与离线不同,实时计算的开发和运维成本都是非常高的,要结合实际情况考虑是否有必要像离线数仓一样,建一个大而全的中间层。
如果没有必要大而全,这时候就需要大体规划一下要实时计算出的指标需求了。把这些指标以主题宽表的形式输出,就是DWS层。
2 DWS层需求分析
统计主题 | 需求指标 | 输出方式 | 计算来源 | 来源层级 |
---|---|---|---|---|
访客 | pv | 可视化大屏 | page_log直接可求 | dwd |
uv | 可视化大屏 | 需要用page_log过滤去重 | dwm | |
跳出明细 | 可视化大屏 | 需要通过page_log行为判断 | dwm | |
进入页面数 | 可视化大屏 | 需要识别开始访问标识 | dwd | |
连续访问时长 | 可视化大屏 | page_log直接可求 | dwd | |
商品 | 点击 | 多维分析 | page_log直接可求 | dwd |
收藏 | 多维分析 | 收藏表 | dwd | |
加入购物车 | 多维分析 | 购物车表 | dwd | |
下单 | 可视化大屏 | 订单宽表 | dwm | |
支付 | 多维分析 | 支付宽表 | dwm | |
退款 | 多维分析 | 退款表 | dwd | |
评论 | 多维分析 | 评论表 | dwd | |
地区 | pv | 多维分析 | page_log直接可求 | dwd |
uv | 多维分析 | 需要用page_log过滤去重 | dwm | |
下单 | 可视化大屏 | 订单宽表 | dwm | |
关键词 | 搜索关键词 | 可视化大屏 | 页面访问日志 直接可求 | dwd |
点击商品关键词 | 可视化大屏 | 商品主题下单再次聚合 | dws | |
下单商品关键词 | 可视化大屏 | 商品主题下单再次聚合 | dws |
当然实际需求还会有更多,这里主要以为可视化大屏为目的进行实时计算的处理。
DWM层的定位是主要服务于DWS,因为部分需求直接从DWD层到DWS层中间会有一定的计算量,而且这部分计算的结果很有可能被多个DWS层主题复用,所以部分DWD层会形成一层DWM,这里涉及业务主要包括:访问UV计算、 跳出明细计算、订单宽表、支付宽表。
二 DWM层-UV计算
1 需求分析与思路
UV,全称是Unique Visitor,即独立访客,对于实时计算中,也可以称为DAU(Daily Active User),即每日活跃用户,因为实时计算中的uv通常是指当日的访客数。
那么如何从用户行为日志中识别出当日的访客,有以下两点:
- 其一,是识别出该访客打开的第一个页面,表示这个访客开始进入应用。
- 其二,由于访客可以在一天中多次进入应用,所以要在一天的范围内进行去重。
2 从kafka中读取数据
工作流程如下:
(1)代码实现
public class UnionVistorApp {
public static void main(String[] args) throws Exception {
//TODO 1 基本环境准备
//1.1 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1.2 设置并行度
env.setParallelism(4);
//TODO 2 检查点设置
// //2.1 开启检查点
// env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
// //2.2 设置检查点超时时间
// env.getCheckpointConfig().setCheckpointTimeout(60000L);
// //2.3 设置重启策略
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
// //2.4 设置job取消后,检查点是否保留
// env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// //2.5 设置状态后端 -- 基于内存 or 文件系统 or RocksDB
// env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
// //2.6 指定操作HDFS的用户
// System.setProperty("HADOOP_USER_NAME","hzy");
//TODO 3 从kafka中读取数据
//3.1 声明消费主题以及消费者组
String topic = "dwd_page_log";
String groupId = "union_visitor_app_group";
//3.2 获取kafka消费者对象
FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
//3.3 读取数据封装流
DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
//TODO 4 对读取的数据进行类型转换 String -> JSONObject
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);
jsonObjDS.print(">>>");
env.execute();
}
}
(2)测试
需要启动的进程:zookeeper、kafka、模拟生成日志jar包,logger.sh、UnionVistorApp、BaseLogApp。
- 启动logger.sh、zk、kafka
- 运行Idea中的BaseLogApp
- 运行Idea中的UniqueVisitApp
- 查看控制台输出
- 执行流程
模拟生成数据->日志处理服务器->写到kafka的ODS层(ods_base_log)->BaseLogApp分流->dwd_page_log->UniqueVisitApp读取输出
输出信息如下:
BaseLogApp
启动流::3> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"start":{"entry":"install","open_ad_skip_ms":0,"open_ad_ms":5918,"loading_time":1480,"open_ad_id":11},"ts":1670158358000}
曝光流::1> {"display_type":"query","page_id":"good_detail","item":"10","item_type":"sku_id","pos_id":4,"order":4,"ts":1670158358000}
曝光流::3> {"display_type":"query","page_id":"good_detail","item":"1","item_type":"sku_id","pos_id":5,"order":6,"ts":1670158358000}
曝光流::1> {"display_type":"query","page_id":"good_detail","item":"5","item_type":"sku_id","pos_id":1,"order":5,"ts":1670158358000}
主流::3> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"cart","during_time":15330,"last_page_id":"good_detail"},"ts":1670158358000}
UnionVistorApp
>>>:2> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"good_detail","item":"3","during_time":9775,"item_type":"sku_id","last_page_id":"good_list","source_type":"query"},"displays":[{"display_type":"recommend","item":"10","item_type":"sku_id","pos_id":4,"order":1},{"display_type":"recommend","item":"3","item_type":"sku_id","pos_id":1,"order":2},{"display_type":"promotion","item":"2","item_type":"sku_id","pos_id":4,"order":3},{"display_type":"query","item":"8","item_type":"sku_id","pos_id":1,"order":4},{"display_type":"query","item":"10","item_type":"sku_id","pos_id":5,"order":5},{"display_type":"query","item":"1","item_type":"sku_id","pos_id":5,"order":6}],"actions":[{"item":"3","action_id":"favor_add","item_type":"sku_id","ts":1670158362887}],"ts":1670158358000}
>>>:4> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"trade","item":"4,6,10","during_time":5294,"item_type":"sku_ids","last_page_id":"cart"},"ts":1670158358000}
>>>:3> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"cart","during_time":15330,"last_page_id":"good_detail"},"ts":1670158358000}
(3)总结
执行流程:
- 模拟生成日志jar包
- 将模拟生成的日志数据发送给Nginx进行负载均衡
- Nginx将请求转发给三台日志采集服务
- 三台日志采集服务接收到日志数据,将日志数据发送给kafka的ods_base_log主题中
- BaseLogApp应用程序从ods_base_log中读取数据,进行分流
- 启动日志:dwd_start_log
- 曝光日志:dwd_display_log
- 页面日志:dwd_page_log
- UnionVistorApp从dwd_page_log主题中读取数据
3 UV过滤 – 独立访客计算
(1)实现思路
- 首先用keyby按照mid进行分组,每组表示当前设备的访问情况
- 分组后使用keystate状态,记录用户进入时间,实现RichFilterFunction完成过滤
- 重写open 方法用来初始化状态
- 重写filter方法进行过滤
- 可以直接筛掉last_page_id不为空的字段,因为只要有上一页,说明这条不是这个用户进入的首个页面。
- 状态用来记录用户的进入时间,只要这个lastVisitDate是今天,就说明用户今天已经访问过了所以筛除掉。如果为空或者不是今天,说明今天还没访问过,则保留。
- 因为状态值主要用于筛选是否今天来过,所以这个记录过了今天基本上没有用了,这里enableTimeToLive 设定了1天的过期时间,避免状态过大。
(2)代码实现
//TODO 5 按照设备id对数据进行分组
KeyedStream<JSONObject, String> keyedDS = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
//TODO 6 实现过滤
//实现目的:如有一个用户在6月访问一次,11月访问一次,6-11月共访问两次,
// 如果一直保留其6月的访问状态,直到11月才去更新,会消耗很多资源,
// 所以需要将其访问时间放入状态中,定时进行更新。
SingleOutputStreamOperator<JSONObject> filterDS = keyedDS.filter(
new RichFilterFunction<JSONObject>() {
// 声明状态变量,用于存放上次访问日期
private ValueState<String> lastVistDateState;
// 声明日期格式工具类
private SimpleDateFormat sdf;
@Override
public void open(Configuration parameters) throws Exception {
sdf = new SimpleDateFormat("yyyyMMdd");
ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("lastVistDateState", String.class);
// 注意:UV可以延伸为日活统计,其状态值主要用于筛选当天是否访问过
// 那么状态超过今天就没有存在的意义
// 所以设置状态的失效时间为1天
// 粒度为天,不记录时分秒
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1))
// 默认值,当状态创建或者写入的时候会更新状态失效时间
// .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
// 默认值,状态过期后,如果还没有被清理,是否返回给状态调用者
// .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
valueStateDescriptor.enableTimeToLive(ttlConfig);
lastVistDateState = getRuntimeContext().getState(valueStateDescriptor);
}
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
// 如果从其他页面跳转过来,直接过滤掉
String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
if (lastPageId != null && lastPageId.length() > 0) {
return false;
}
// 获取状态中的上次访问日期
String lastVisitDate = lastVistDateState.value();
String curVisitDate = sdf.format(jsonObj.getLong("ts"));
if (lastVisitDate != null && lastVisitDate.length() > 1 && lastVisitDate.equals(curVisitDate)) {
// 今天已经访问过
return false;
} else {
// 今天还没访问过
lastVistDateState.update(curVisitDate);
return true;
}
}
}
);
filterDS.print(">>>");
env.execute();
4 写入kafka
将过滤处理后的UV写入到Kafka的dwm_unique_visitor。
//TODO 7 将过滤后的uv数据,写回到kafka的dwm层
filterDS.map(jsonObj -> jsonObj.toJSONString()).addSink(
MyKafkaUtil.getKafkaSink("dwm_unique_visitor")
);
5 测试
# 启动logger.sh、zk、kafka
# 运行Idea中的BaseLogApp
# 运行Idea中的UniqueVisitApp
# 查看控制台输出以及kafka的dwm_unique_visit主题
# 执行流程
模拟生成数据->日志处理服务器->写到kafka的ODS层(ods_base_log)->BaseLogApp分流->dwd_page_log->UniqueVisitApp读取并处理->写回到kafka的dwm层
程序运行整体流程如下: