尚硅谷大数据项目《在线教育之实时数仓》笔记005

news2025/1/22 18:46:56

视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili

目录

第9章 数仓开发之DWD层

P031

P032

P033

P034

P035

P036

P037

P038

P039

P040


第9章 数仓开发之DWD层

P031

DWD层设计要点:

(1)DWD层的设计依据是维度建模理论,该层存储维度模型的事实表。

(2)DWD层表名的命名规范为dwd_数据域_表名

存放事实表,从kafka的topic_log和topic_db中读取需要用到的业务流程相关数据,将业务流程关联起来做成明细数据写回kafka当中。

尚硅谷大数据学科全套教程\3.尚硅谷大数据学科--项目实战\尚硅谷大数据项目之在线教育数仓\尚硅谷大数据项目之在线教育数仓-3实时\资料\13.总线矩阵及指标体系

在线教育实时业务总线矩阵.xlsx

9.1.3 图解

P032

package com.atguigu.edu.realtime.app.dwd.log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.DateFormatUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author 
 * @create 2023-04-21 14:01
 */
public class BaseLogApp {
    public static void main(String[] args) throws Exception {
        //TODO 1 创建环境设置状态后端
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);

        //TODO 2 从kafka中读取主流数据
        String topicName = "topic_log";
        String groupId = "base_log_app";
        DataStreamSource<String> baseLogSource = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, groupId),
                WatermarkStrategy.noWatermarks(),
                "base_log_source"
        );

        //TODO 3 对数据进行清洗转换
        // 3.1 定义侧输出流
        OutputTag<String> dirtyStreamTag = new OutputTag<String>("dirtyStream") {
        };
        // 3.2 清洗转换
        SingleOutputStreamOperator<JSONObject> cleanedStream = baseLogSource.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    ctx.output(dirtyStreamTag, value);
                }
            }
        });
        // 3.3 将脏数据写出到kafka对应的主题
        SideOutputDataStream<String> dirtyStream = cleanedStream.getSideOutput(dirtyStreamTag);
        String dirtyTopicName = "dirty_data";
        dirtyStream.sinkTo(KafkaUtil.getKafkaProducer(dirtyTopicName, "dirty_trans"));

        //TODO 4 新老访客标记修复

        //TODO 5 数据分流

        //TODO 6 写出到kafka不同的主题

        //TODO 7 执行任务
    }
}

P033

KafkaUtil.java

P034

新老访客逻辑介绍

P035

BaseLogApp.java

//TODO 4 新老访客标记修复

[atguigu@node001 log]$ pwd
/opt/module/data_mocker/01-onlineEducation/log
[atguigu@node001 log]$ cat -n 200 app.2023-09-19.log
{"common":{"ar":"26","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_188","os":"iOS 13.3.1","sc":"1","sid":"b4d6c8eb-d025-4855-af0a-fe351ff16ef9","uid":"20","vc":"v2.1.134"},"page":{"during_time":901000,"item":"173","item_type":"paper_id","last_page_id":"course_detail","page_id":"exam"},"ts":1645456489411}
{
    "common":{
        "ar":"26",
        "ba":"iPhone",
        "ch":"Appstore",
        "is_new":"0",
        "md":"iPhone 8",
        "mid":"mid_188",
        "os":"iOS 13.3.1",
        "sc":"1",
        "sid":"b4d6c8eb-d025-4855-af0a-fe351ff16ef9",
        "uid":"20",
        "vc":"v2.1.134"
    },
    "page":{
        "during_time":901000,
        "item":"173",
        "item_type":"paper_id",
        "last_page_id":"course_detail",
        "page_id":"exam"
    },
    "ts":1645456489411
}

P036

BaseLogApp.java

//TODO 5 数据分流

P037

//TODO 6 写出到kafka不同的主题

hadoop、zookeeper、kafka。

  1. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic page_topic

  2. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic action_topic

  3. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic display_topic

  4. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic start_topic

  5. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic error_topic

  6. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic appVideo_topic

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic page_topic
[2023-11-01 14:36:17,581] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] Error while fetching metadata with correlation id 2 : {page_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2023-11-01 14:36:18,710] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] Error while fetching metadata with correlation id 6 : {page_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2023-11-01 14:36:18,720] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] The following subscribed topics are not assigned to any members: [page_topic]  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

[atguigu@node001 ~]$ f1.sh start
 -------- 启动 node001 采集flume启动 -------
[atguigu@node001 ~]$ cd /opt/module/data
data/        data_mocker/ datax/       
[atguigu@node001 ~]$ cd /opt/module/data
data/        data_mocker/ datax/       
[atguigu@node001 ~]$ cd /opt/module/data_mocker/
[atguigu@node001 data_mocker]$ cd 01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ ll
总用量 30460
-rw-rw-r-- 1 atguigu atguigu     2223 9月  19 10:43 application.yml
-rw-rw-r-- 1 atguigu atguigu  4057995 7月  25 10:28 edu0222.sql
-rw-rw-r-- 1 atguigu atguigu 27112074 7月  25 10:28 edu2021-mock-2022-06-18.jar
drwxrwxr-x 2 atguigu atguigu     4096 10月 26 14:01 log
-rw-rw-r-- 1 atguigu atguigu     1156 7月  25 10:44 logback.xml
-rw-rw-r-- 1 atguigu atguigu      633 7月  25 10:45 path.json
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar 
SLF4J: Class path contains multiple SLF4J bindings.

P038

9.2 流量域独立访客事务事实表

P039

package com.atguigu.edu.realtime.app.dwd.log;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONAware;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.DateFormatUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author yhm
 * @create 2023-04-21 16:24
 */
public class DwdTrafficUniqueVisitorDetail {
    public static void main(String[] args) throws Exception {
        // TODO 1 创建环境设置状态后端
        StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(4);

        // TODO 2 读取kafka日志主题数据
        String topicName = "dwd_traffic_page_log";
        DataStreamSource<String> pageLogStream = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, "dwd_traffic_unique_visitor_detail"), WatermarkStrategy.noWatermarks(), "unique_visitor_source");

        // TODO 3 转换结构,过滤last_page_id不为空的数据
        SingleOutputStreamOperator<JSONObject> firstPageStream = pageLogStream.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    String lastPageID = jsonObject.getJSONObject("page").getString("last_page_id");
                    if (lastPageID == null) {
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // TODO 4 安装mid分组
        KeyedStream<JSONObject, String> keyedStream = firstPageStream.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject value) throws Exception {
                return value.getJSONObject("common").getString("mid");
            }
        });

        // TODO 5 判断独立访客
        SingleOutputStreamOperator<JSONObject> filteredStream = keyedStream.filter(new RichFilterFunction<JSONObject>() {
            ValueState<String> lastVisitDtState;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                ValueStateDescriptor<String> stringValueStateDescriptor = new ValueStateDescriptor<>("last_visit_dt", String.class);
                // 设置状态的存活时间
                stringValueStateDescriptor.enableTimeToLive(StateTtlConfig
                        .newBuilder(Time.days(1L))
                        // 设置状态的更新模式为创建及写入
                        // 每次重新写入的时候记录时间  到1天删除状态
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build());
                lastVisitDtState = getRuntimeContext().getState(stringValueStateDescriptor);
            }

            @Override
            public boolean filter(JSONObject jsonObject) throws Exception {
                String visitDt = DateFormatUtil.toDate(jsonObject.getLong("ts"));
                String lastVisitDt = lastVisitDtState.value();
                // 对于迟到的数据,last日期会大于visit日期,数据也不要
                if (lastVisitDt == null || (DateFormatUtil.toTs(lastVisitDt) < DateFormatUtil.toTs(visitDt))) {
                    lastVisitDtState.update(visitDt);
                    return true;
                }
                return false;
            }
        });

        // TODO 6 将独立访客数据写出到对应的kafka主题
        String targetTopic = "dwd_traffic_unique_visitor_detail";
        SingleOutputStreamOperator<String> sinkStream = filteredStream.map((MapFunction<JSONObject, String>) JSONAware::toJSONString);
        sinkStream.sinkTo(KafkaUtil.getKafkaProducer(targetTopic, "unique_visitor_trans"));

        // TODO 7 运行任务
        env.execute();
    }
}

P040

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_traffic_unique_visitor_detail
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_traffic_page_log

[atguigu@node001 01-onlineEducation]$ cd /opt/module/data_mocker/01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1162228.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Azure 机器学习 - 使用 Visual Studio Code训练图像分类 TensorFlow 模型

了解如何使用 TensorFlow 和 Azure 机器学习 Visual Studio Code 扩展训练图像分类模型来识别手写数字。 关注TechLead&#xff0c;分享AI全维度知识。作者拥有10年互联网服务架构、AI产品研发经验、团队管理经验&#xff0c;同济本复旦硕&#xff0c;复旦机器人智能实验室成员…

免费(daoban)gpt,同时去除广告

一. 内容简介 免费(daoban)gpt&#xff0c;同时去除广告&#xff0c;https://chat18.aichatos.xyz/&#xff0c;也可当gpt用&#xff0c;就是有点广告&#xff0c;大家也可以支持一下 二. 软件环境 2.1 Tampermonkey 三.主要流程 3.1 创建javascript脚本 点击添加新脚本 …

超详细!!!顺序表的实现

顺链表的实现 顺序表的概念及结构概念顺序表与数组的区别顺序表的结构 动态顺序表的实现头文件 "SeqList.h"定义结构体 SL 源文件顺列表的实现初始化顺列表 void SLInit(SL* ps)检查顺列表空间大小 void SLCheckCapacity(SL* ps)尾插数据 void SLPushBack(SL* ps,SLD…

java强转实验

不存在继承关系时&#xff0c;强转会出现编译时异常。即&#xff1a;无法将两个不同类型的对象做转换 当存在继承关系时&#xff0c;强转正常。备注&#xff1a;同名字段&#xff0c;类型一致&#xff0c;可以强转替代getset。同名字段&#xff0c;类型不一致&#xff0c;强转会…

网络安全之XSS漏洞

一. 引言 Cross-Site Scripting&#xff08;跨站脚本攻击&#xff09;简称XSS&#xff0c;是一种代码注入攻击。XSS 攻击通常指的是利用网页的漏洞&#xff0c;攻击者通过巧妙的方法注入 XSS 代码到网页&#xff0c;因为浏览器无法分辨哪些脚本是可信的&#xff0c;导致 XSS 脚…

matlab求解时变系统的Riccati矩阵微分方程

对于代数Riccati方程的求解网上能找到很多的资源&#xff0c;matlab也有成熟的函数&#xff0c;但是对于时变系统的Riccati矩阵微分方程&#xff0c;能找到的资料还比较少。 一、求解代数Riccati方程 可以在网上找到很多资料&#xff0c;如 https://blog.csdn.net/m0_622999…

python中有哪些你觉得超级牛的模块?

之前在做数据分析的时候&#xff0c;用过一个自动化生成数据探索报告的Python库&#xff1a;ydata_profiling 一般我们在做数据处理前会进行数据探索&#xff0c;包括看统计分布、可视化图表、数据质量情况等&#xff0c;这个过程会消耗很多时间&#xff0c;可能需要上百行代码…

Linux--线程--互斥锁

1.互斥量 a&#xff09;互斥量&#xff08;mutex&#xff09;从本质上来说是一把锁&#xff0c;一般在主线程中定义一个互斥量&#xff0c;就是定义一把锁。然后根据我们的需求来对线程操作这把锁。 b&#xff09;如果给所有的线程都加上锁了&#xff0c;线程们会去争取内存空…

2018年第三届 美亚杯电子取证 个人赛题解

1 Victor的笔记本电脑己成功取证并制作成法证映像档 (Forensic Image)&#xff0c;下列哪个是其MD5哈希值? (2分) A. FC20782C21751AB76B2A93F3A17922D0 B. 5F1BDEB87EE9F710C90CFB3A0BB01616 C. A0BB016160CFB3A0BB0161661670CFB3 D. 917ED59083C8B35C54D3FCBFE4C4BB0B E. F…

当你在浏览器地址栏输入一个URL后,将会发生的事情?个人笔记

客户端 在浏览器输入 URL 回车之后发生了什么&#xff08;超详细版&#xff09; - 知乎 (zhihu.com) 大致流程是&#xff1a; URL 解析DNS 查询TCP 连接处理请求接受响应渲染页面 1.URL解析 地址解析&#xff1a; 首先判断你输入是否是一个合法的URL还是一个待搜索的关键…

上市公司-供应链效率数据集(2000-2022年)

参照张倩肖&#xff08;2023&#xff09;、Feng&#xff08;2015&#xff09;、张树山&#xff08;2023&#xff09;的做法&#xff0c;团队以库存周转天数来衡量供应链效率 库存周转天数有效克服了因企业保留安全库存而导致供应链效率较低的测算误差&#xff0c;体现供应链上…

回归预测 | Matlab实现POA-CNN-SVM鹈鹕算法优化卷积神经网络-支持向量机多变量回归预测

Matlab实现POA-CNN-SVM鹈鹕算法优化卷积神经网络-支持向量机多变量回归预测 目录 Matlab实现POA-CNN-SVM鹈鹕算法优化卷积神经网络-支持向量机多变量回归预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.POS-CNN-SVM鹈鹕算法优化卷积神经网络-支持向量机的多变量回归…

好用的CRM软件都有哪些功能?

好用的CRM软件不仅仅是将客户资料存档&#xff0c;更大的作用还在于充分发挥数据的价值提升客户管理效率。如果您了解过多款CRM软件就一定会发现它们的功能都不尽相同&#xff0c;但是好用的CRM工具离不开这些功能&#xff1a; 一、客户视图 客户视图主要由4类数据组成&#…

基于springboot实现游戏分享网站系统项目【项目源码+论文说明】

基于springboot实现游戏分享网站演示 摘要 网络的广泛应用给生活带来了十分的便利。所以把游戏分享管理与现在网络相结合&#xff0c;利用java技术建设游戏分享网站&#xff0c;实现游戏分享的信息化。则对于进一步提高游戏分享管理发展&#xff0c;丰富游戏分享管理经验能起到…

跨境商城源码价格

在当今数字商务的时代&#xff0c;跨境电商已经成为了越来越多企业的选择。然而&#xff0c;要建立一个高效、便捷、全球化的跨境商城并不是一件简单的事情。所幸&#xff0c;现在有一个开源的解决方案&#xff0c;给企业提供了无限的可能性。跨境商城源码价格合乎实际&#xf…

浅谈AcrelEMS-CB商业建筑能源管理系统解决方案-安科瑞 蒋静

1概述 AcrelEMS-CB商业建筑能源管理系统&#xff0c;集电力监控、电能质量监测与治理、电气安全预警、能耗分析、照明控制、新能源使用、能源收费以及设备运维等功能于一体&#xff0c;通过一套系统对商业建筑的能源进行统一监控、统一运维和调度&#xff0c;系统可以通过WEB和…

对比学习(contrastive Learning)

起源和定义 自监督学习又可以分为对比学习(contrastive learning)和生成学习(generative learning)两条主要的技术路线。 比学习的核心思想是将正样本和负样本在特征空间对比&#xff0c;从而学习样本的特征表示&#xff0c;使得样本与正样本的特征表示尽可能接近。正样本和负…

webase编译合约一直转圈卡住解决方案

问题:webase编译合约一直转圈卡住,等再久也没反应 解决方案: 进入webase-web目录,然后进入static\js目录,执行以下命令: curl -#L https://osp-1257653870.cos.ap-guangzhou.myqcloud.com/WeBASE/download/solidity/wasm/v0.4.25.js -o v0.4.25.js curl -#L https://os…

Unity AssetBundle打包

1&#xff0c;AssetBundle的概念与作用 AssetBundle是一个存档文件&#xff0c;是Unity提供的一种用于存储资源的资源压缩包&#xff0c;可以包含模型、贴图、音频、预制体等。 Unity中的AssetBundle系统是对资源管理的一种扩展&#xff0c;通过将资源分布在不同的AB包中可以最…

SpringBoot--Web开发篇:含enjoy模板引擎整合,SpringBoot整合springMVC;及上传文件至七牛云;restFul

SpringBoot的Web开发 官网学习&#xff1a; 进入spring官网 --> projects --> SpringBoot --> LEARN --> Reference Doc. --> Web --> 就能看到上述页面 静态资源映射规则 官方文档 总结&#xff1a; 只要是静态资源&#xff0c;放在类路径下&#xff1…