文章目录
- 【Flink实时数仓】数据仓库项目实战 《四》日志数据分流-流量域 【DWD】
- 1.流量域未经加工的事务事实表
- 1.1主要任务
- 1.1.1数据清洗(ETL)
- 1.1.2新老访客状态标记修复
- 1.1.3分流
- 1.2图解
- 1.3代码
- 1.4数据测试
- 1.4.1 测试脏数据
- 1.4.2 测试err 和 start 数据
- 1.4.3 输入数据Display Action Page 数据
【Flink实时数仓】数据仓库项目实战 《四》日志数据分流-流量域 【DWD】
DWD层设计要点:
(1)DWD层的设计依据是维度建模理论,该层存储维度模型的事实表。
(2)DWD层表名的命名规范为dwd_数据域_表名
1.流量域未经加工的事务事实表
1.1主要任务
1.1.1数据清洗(ETL)
数据传输过程中可能会出现部分数据丢失的情况,导致 JSON 数据结构不再完整,因此需要对脏数据进行过滤。
1.1.2新老访客状态标记修复
日志数据 common 字段下的 is_new 字段是用来标记新老访客状态的,1 表示新访客,0 表示老访客。前端埋点采集到的数据可靠性无法保证,可能会出现老访客被标记为新访客的问题,因此需要对该标记进行修复。
1.1.3分流
本节将通过分流对日志数据进行拆分,生成以下五张事务事实表并写入 Kafka:
流量域页面浏览事务事实表
流量域启动事务事实表
流量域动作事务事实表
流量域曝光事务事实表
流量域错误事务事实表
1.2图解
1.3代码
代码来自尚硅谷,微信关注尚硅谷公众号 回复: 大数据 即可获取源码及资料。
展示主流程代码。具体工具类及实现请下载源码。
package com.atguigu.app.dwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
 * Traffic-domain DWD job: cleans the raw log stream, repairs the new/returning visitor flag and
 * splits the logs into five Kafka topics (page, start, display, action, error).
 *
 * <p>Data flow: web/app -> Nginx -> log server (.log) -> Flume -> Kafka (ODS) -> FlinkApp -> Kafka (DWD)
 *
 * <p>Programs:  Mock (lg.sh) -> Flume (f1) -> Kafka (ZK) -> BaseLogApp -> Kafka (ZK)
 */
public class BaseLogApp {

    /**
     * Builds and runs the pipeline.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job cannot be built or fails while executing
     */
    public static void main(String[] args) throws Exception {

        // TODO 1. Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // in production, set this to the partition count of the Kafka topic

        // 1.1 Enable checkpointing
        //env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));

        // 1.2 Configure the state backend
        //env.setStateBackend(new HashMapStateBackend());
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/211126/ck");
        //System.setProperty("HADOOP_USER_NAME", "atguigu");

        // TODO 2. Consume the Kafka topic_log topic to create the source stream
        String topic = "topic_log";
        String groupId = "base_log_app_211126";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        // TODO 3. Filter out dirty records and convert every remaining line into a JSON object
        OutputTag<String> dirtyTag = new OutputTag<String>("Dirty") {
        };
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
                JSONObject log = parseValidLog(value);
                if (log == null) {
                    // Keep the raw text so the broken record can be inspected later.
                    ctx.output(dirtyTag, value);
                } else {
                    // Emitted outside of any try/catch so that only parsing/validation problems
                    // can ever classify a record as dirty.
                    out.collect(log);
                }
            }
        });

        // Fetch the dirty side output and print it
        DataStream<String> dirtyDS = jsonObjDS.getSideOutput(dirtyTag);
        dirtyDS.print("Dirty>>>>>>>>>>>>");

        // TODO 4. Key the stream by mid (step 3 guarantees common.mid is present)
        KeyedStream<JSONObject, String> keyedStream = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"));

        // TODO 5. Use keyed state to verify the new/returning visitor flag
        SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = keyedStream.map(new RichMapFunction<JSONObject, JSONObject>() {

            // Date string of the first visit recorded for the current mid; null if none is recorded yet.
            private ValueState<String> lastVisitState;

            @Override
            public void open(Configuration parameters) throws Exception {
                lastVisitState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-visit", String.class));
            }

            @Override
            public JSONObject map(JSONObject value) throws Exception {
                // Read the is_new flag and ts, and convert the timestamp into a date string
                String isNew = value.getJSONObject("common").getString("is_new");
                Long ts = value.getLong("ts");
                String curDate = DateFormatUtil.toDate(ts);

                // Date currently held in state
                String lastDate = lastVisitState.value();

                if ("1".equals(isNew)) {
                    if (lastDate == null) {
                        // First record seen for this mid: remember the date, keep the flag.
                        lastVisitState.update(curDate);
                    } else if (!lastDate.equals(curDate)) {
                        // A different date is already recorded, so the flag is wrong: correct it to "0".
                        value.getJSONObject("common").put("is_new", "0");
                    }
                } else if (lastDate == null) {
                    // Flagged as returning but nothing recorded yet: store the previous day so a
                    // later is_new = "1" record from this mid gets corrected.
                    lastVisitState.update(DateFormatUtil.toDate(ts - 24 * 60 * 60 * 1000L));
                }
                return value;
            }
        });

        // TODO 6. Split with side outputs: page logs stay on the main stream;
        //         start, display, action and error logs go to side outputs
        OutputTag<String> startTag = new OutputTag<String>("start") {
        };
        OutputTag<String> displayTag = new OutputTag<String>("display") {
        };
        OutputTag<String> actionTag = new OutputTag<String>("action") {
        };
        OutputTag<String> errorTag = new OutputTag<String>("error") {
        };
        SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {

                // Error info, if any: the complete record (payload included) goes to the error side output
                String err = value.getString("err");
                if (err != null) {
                    ctx.output(errorTag, value.toJSONString());
                }

                // Strip the error info so it is not repeated in the start/page record
                value.remove("err");

                String start = value.getString("start");
                if (start != null) {
                    // Start log: write the record to the start side output
                    ctx.output(startTag, value.toJSONString());
                } else {
                    // Page log (step 3 guarantees that "page" exists when "start" does not).
                    // NOTE(review): common is copied in its JSON *string* form, so it shows up as an
                    // escaped string inside display/action records - confirm this is what consumers expect.
                    String common = value.getString("common");
                    String pageId = value.getJSONObject("page").getString("page_id");
                    Long ts = value.getLong("ts");

                    // Displays: enrich each entry and write it to the display side output
                    JSONArray displays = value.getJSONArray("displays");
                    if (displays != null && displays.size() > 0) {
                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject display = displays.getJSONObject(i);
                            display.put("common", common);
                            display.put("page_id", pageId);
                            display.put("ts", ts);
                            ctx.output(displayTag, display.toJSONString());
                        }
                    }

                    // Actions: enrich each entry and write it to the action side output.
                    // No ts is added here; the sample action entries already carry their own ts.
                    JSONArray actions = value.getJSONArray("actions");
                    if (actions != null && actions.size() > 0) {
                        for (int i = 0; i < actions.size(); i++) {
                            JSONObject action = actions.getJSONObject(i);
                            action.put("common", common);
                            action.put("page_id", pageId);
                            ctx.output(actionTag, action.toJSONString());
                        }
                    }

                    // Drop displays and actions, then emit the page log on the main stream
                    value.remove("displays");
                    value.remove("actions");
                    out.collect(value.toJSONString());
                }
            }
        });

        // TODO 7. Extract the individual side outputs
        DataStream<String> startDS = pageDS.getSideOutput(startTag);
        DataStream<String> displayDS = pageDS.getSideOutput(displayTag);
        DataStream<String> actionDS = pageDS.getSideOutput(actionTag);
        DataStream<String> errorDS = pageDS.getSideOutput(errorTag);

        // TODO 8. Print the streams and write each one to its topic
        pageDS.print("Page>>>>>>>>>>");
        startDS.print("Start>>>>>>>>");
        displayDS.print("Display>>>>");
        actionDS.print("Action>>>>>>");
        errorDS.print("Error>>>>>>>>");

        String pageTopic = "dwd_traffic_page_log";
        String startTopic = "dwd_traffic_start_log";
        String displayTopic = "dwd_traffic_display_log";
        String actionTopic = "dwd_traffic_action_log";
        String errorTopic = "dwd_traffic_error_log";
        pageDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(pageTopic));
        startDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(startTopic));
        displayDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(displayTopic));
        actionDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(actionTopic));
        errorDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(errorTopic));

        // TODO 9. Launch the job
        env.execute("BaseLogApp");
    }

    /**
     * Parses one raw log line and verifies that it carries every field the rest of the pipeline
     * dereferences unconditionally: a {@code common} object with a {@code mid}, a {@code ts}, and
     * either a {@code start} or a {@code page} payload.
     *
     * <p>Without this check a truncated or blank record would reach {@code keyBy} or the visitor-flag
     * step and fail there with a {@link NullPointerException} instead of being reported as dirty.
     *
     * @param raw the raw record read from Kafka
     * @return the parsed log, or {@code null} when the record is unparsable or structurally incomplete
     */
    private static JSONObject parseValidLog(String raw) {
        try {
            // parseObject yields null rather than throwing for blank or literal-null input.
            JSONObject log = JSON.parseObject(raw);
            if (log == null) {
                return null;
            }
            JSONObject common = log.getJSONObject("common");
            boolean hasKeyFields = common != null
                    && common.getString("mid") != null
                    && log.getLong("ts") != null;
            boolean hasPayload = log.get("start") != null || log.getJSONObject("page") != null;
            return hasKeyFields && hasPayload ? log : null;
        } catch (Exception ignored) {
            // Malformed JSON or a field of the wrong type: the caller routes the raw text to the
            // dirty side output, so the failure is reported there rather than rethrown.
            return null;
        }
    }
}
1.4数据测试
1.4.1 测试脏数据
[root@hadoop102 kafka]# bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_log
>{"common":{"ar":
>
测试结果:IDEA 控制台打印出脏数据(Dirty 侧输出流),Kafka 的各 DWD 主题均无输出。
1.4.2 测试err 和 start 数据
输入数据
[root@hadoop102 kafka]# bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_log
>{"common":{"ar":
>{"common":{"ar":"110000","ba":"Xiaomi","ch":"xiaomi","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_1818969","os":"Android 11.0","uid":"513","vc":"v2.1.134"},"err":{"error_code":2633,"msg":" Exception in thread \\ java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)"},"start":{"entry":"notice","loading_time":12438,"open_ad_id":7,"open_ad_ms":4407,"open_ad_skip_ms":0},"ts":1651217959000}
>
输出数据
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_start_log
{"common":{"ar":"110000","uid":"513","os":"Android 11.0","ch":"xiaomi","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_1818969","vc":"v2.1.134","ba":"Xiaomi"},"start":{"entry":"notice","open_ad_skip_ms":0,"open_ad_ms":4407,"loading_time":12438,"open_ad_id":7},"ts":1651217959000}
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_error_log
{"common":{"ar":"110000","uid":"513","os":"Android 11.0","ch":"xiaomi","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_1818969","vc":"v2.1.134","ba":"Xiaomi"},"err":{"msg":" Exception in thread \\ java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)","error_code":2633},"start":{"entry":"notice","open_ad_skip_ms":0,"open_ad_ms":4407,"loading_time":12438,"open_ad_id":7},"ts":1651217959000}
idea打印数据
1.4.3 输入数据Display Action Page 数据
输入数据
{"common":{"ar":"500000","uid":"981","os":"iOS 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone"},"err":{"msg":" Exception in thread \\ java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)","error_code":1559},"page":{"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity"},"displays":[{"display_type":"query","item":"15","item_type":"sku_id","pos_id":1,"order":1},{"display_type":"query","item":"26","item_type":"sku_id","pos_id":3,"order":2},{"display_type":"query","item":"31","item_type":"sku_id","pos_id":2,"order":3},{"display_type":"promotion","item":"29","item_type":"sku_id","pos_id":5,"order":4},{"display_type":"query","item":"9","item_type":"sku_id","pos_id":2,"order":5},{"display_type":"recommend","item":"1","item_type":"sku_id","pos_id":1,"order":6}],"actions":[{"item":"5","action_id":"favor_add","item_type":"sku_id","ts":1651217964522}],"ts":1651217961000}
输出数据
dwd_traffic_page_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_page_log
{"common":{"ar":"500000","uid":"981","os":"iOS 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone"},"page":{"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity"},"ts":1651217961000}
dwd_traffic_page_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_page_log
{"common":{"ar":"500000","uid":"981","os":"iOS 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone"},"page":{"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity"},"ts":1651217961000}
dwd_traffic_display_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_display_log
{"display_type":"query","page_id":"good_detail","item":"15","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":1,"order":1,"ts":1651217961000}
{"display_type":"query","page_id":"good_detail","item":"26","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":3,"order":2,"ts":1651217961000}
{"display_type":"query","page_id":"good_detail","item":"31","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":2,"order":3,"ts":1651217961000}
{"display_type":"promotion","page_id":"good_detail","item":"29","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":5,"order":4,"ts":1651217961000}
{"display_type":"query","page_id":"good_detail","item":"9","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":2,"order":5,"ts":1651217961000}
{"display_type":"recommend","page_id":"good_detail","item":"1","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","item_type":"sku_id","pos_id":1,"order":6,"ts":1651217961000}
dwd_traffic_action_log
[root@hadoop102 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic dwd_traffic_action_log
{"page_id":"good_detail","item":"5","common":"{\"ar\":\"500000\",\"uid\":\"981\",\"os\":\"iOS 13.3.1\",\"ch\":\"Appstore\",\"is_new\":\"1\",\"md\":\"iPhone Xs Max\",\"mid\":\"mid_7030190\",\"vc\":\"v2.0.1\",\"ba\":\"iPhone\"}","action_id":"favor_add","item_type":"sku_id","ts":1651217964522}
dwd_traffic_error_log
{"common":{"ar":"500000","uid":"981","os":"iOS 13.3.1","ch":"Appstore","is_new":"1","md":"iPhone Xs Max","mid":"mid_7030190","vc":"v2.0.1","ba":"iPhone"},"err":{"msg":" Exception in thread \\ java.net.SocketTimeoutException\\n \\tat com.atgugu.gmall2020.mock.bean.log.AppError.main(AppError.java:xxxxxx)","error_code":1559},"page":{"page_id":"good_detail","item":"5","during_time":7045,"item_type":"sku_id","last_page_id":"good_list","source_type":"activity"},"displays":[{"display_type":"query","item":"15","item_type":"sku_id","pos_id":1,"order":1},{"display_type":"query","item":"26","item_type":"sku_id","pos_id":3,"order":2},{"display_type":"query","item":"31","item_type":"sku_id","pos_id":2,"order":3},{"display_type":"promotion","item":"29","item_type":"sku_id","pos_id":5,"order":4},{"display_type":"query","item":"9","item_type":"sku_id","pos_id":2,"order":5},{"display_type":"recommend","item":"1","item_type":"sku_id","pos_id":1,"order":6}],"actions":[{"item":"5","action_id":"favor_add","item_type":"sku_id","ts":1651217964522}],"ts":1651217961000}
idea打印数据