【实时数仓】DWD层需求分析及实现思路、idea环境搭建、实现DWD层处理用户行为日志的功能

news2025/1/16 16:45:38

文章目录

  • 一 DWD层需求分析及实现思路
    • 1 分层需求分析
    • 2 每层的职能
    • 3 DWD层职能详细介绍
      • (1)用户行为日志数据
      • (2)业务数据
        • 4 DWD层数据准备实现思路
  • 二 环境搭建
    • 1 创建maven工程
    • 2 修改配置文件
      • (1)添加依赖
      • (2) 添加配置文件
    • 3 创建如下包结构
  • 三 准备用户行为日志-DWD层
    • 1 主要任务
    • 2 分区、分组和分流
    • 3 代码实现
      • (1)接收Kafka数据,并进行转换
        • a 封装操作Kafka的工具类,并提供获取kafka消费者的方法(读)
        • b Flink调用工具类读取数据的主程序
        • c 测试

一 DWD层需求分析及实现思路

1 分层需求分析

建设实时数仓的目的,主要是增加数据计算的复用性。每次新增加统计需求时,不至于从原始数据进行计算,而是从半成品继续加工而成。

这里从kafka的ods层读取用户行为日志以及业务数据,并进行简单处理,写回到kafka作为dwd层。

在这里插入图片描述

2 每层的职能

分层数据描述生成计算工具存储媒介
ODS原始数据,日志和业务数据日志服务器,maxwellkafka
DWD根据数据对象为单位进行分流,比如订单、页面访问等等。FLINKkafka
DWM对于部分数据对象进行进一步加工,比如独立访问、跳出行为。依旧是明细数据。FLINKkafka
DIM维度数据FLINKHBase
DWS根据某个维度主题将多个事实数据轻度聚合,形成主题宽表。FLINKClickhouse
ADS把Clickhouse中的数据根据可视化需要进行筛选聚合。Clickhouse SQL可视化展示

3 DWD层职能详细介绍

(1)用户行为日志数据

根据日志的不同类别做分流。

前端埋点中的数据全部放在kafka中ods_base_log主题中,如启动日志,页面访问日志,曝光日志等。虽然同是日志,但是却分为不同的种类,将来做数据统计时,全部从这一个主题中获取数据不方便。所以需要从ods_base_log主题中将数据取出来,根据日志的类型,将不同类型的数据放到不同的主题中,进行分流操作,如启动日志放到启动主题中,曝光日志放到曝光主题中,页面日志放到日志主题中。

(2)业务数据

根据业务数据的类型(维度 or 事实)做分流。

MySQL存储的业务数据中有很多张表,这些表分为两类,一类是事实表,一类是维度表。在采集数据时,只要业务数据发生变化就会通过maxwell采集到kafka的ods_base_db_m主题中,并没有区分事实和维度。如果是事实数据,希望将其放到kafka的不同单独主题中,如订单主题,订单明细主题,支付主题等。对于维度数据,不适合存放在kafka中,kafka不适合做长期存储,默认存储7天。海量数据的分析计算,同样不适合存放到MySQL中,因为在做分析计算时要不停的进行查询操作,给业务数据库造成很大的压力,且MySQL对于大量数据的查询,性能也较差。

在使用维度数据时,需要根据维度id查询出具体的数据,K-V型数据库比较适合存储维度数据,根据K获取V效率较高,KV数据库包括Redis和Hbase,Redis对于长期存储压力比较大,最终选择Hbase存储维度数据。

4 DWD层数据准备实现思路

  • 功能1:环境搭建。
  • 功能2:计算用户行为日志DWD层。
  • 功能3:计算业务数据DWD层。

二 环境搭建

1 创建maven工程

创建maven工程,gmall2022-realtime。

在这里插入图片描述

2 修改配置文件

(1)添加依赖

<properties>
    <java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <flink.version>1.12.0</flink.version>
    <scala.version>2.12</scala.version>
    <hadoop.version>3.1.3</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>
    <!--如果保存检查点到hdfs上,需要引入此依赖-->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!--Flink默认使用的是slf4j记录日志,相当于一个日志的接口,这里使用log4j作为具体的日志实现-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

(2) 添加配置文件

在resources目录下创建log4j.properties配置文件

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

3 创建如下包结构

在这里插入图片描述

目录作用
app产生各层数据的flink任务
bean数据对象
common公共常量
utils工具类

三 准备用户行为日志-DWD层

前面采集的日志数据已经保存到Kafka中,作为日志数据的ODS层,从kafka的ODS层读取的日志数据分为3类,页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回Kafka不同主题中,作为日志DWD层。

页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光侧输出流。

1 主要任务

  • 识别新老用户:本身客户端业务有新老用户的标识,但是不够准确,需要用实时计算再次确认(不涉及业务操作,只是单纯的做个状态确认)。
  • 利用侧输出流实现数据拆分:根据日志数据内容,将日志数据分为3类, 页面日志、启动日志和曝光日志。页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光日志侧输出流。
  • 将不同流的数据推送下游的kafka的不同Topic中。

整体流程如下图:

在这里插入图片描述

2 分区、分组和分流

三者之间的关系和区别如下图:

在这里插入图片描述

3 代码实现

(1)接收Kafka数据,并进行转换

a 封装操作Kafka的工具类,并提供获取kafka消费者的方法(读)

/**
 * 操作kafka工具类
 */
public class MyKafkaUtil {
    private static final String KAFKA_SERVER = "hadoop101:9092,hadoop102:9092,hadoop103:9092";

    // 获取kafka的消费者
    public static FlinkKafkaConsumer<String> getKafkaSource(String topic,String groupId){
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_SERVER);
        // 定义消费者组
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        return new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),props);
    }
}

b Flink调用工具类读取数据的主程序

/**
 * 对日志数据进行分流操作
 *  启动、曝光、页面
 *    启动日志放到启动侧输出流中
 *    曝光日志放到曝光侧输出流中
 *    页面日志放到主流中
 *  将不同流的数据写回到kafka的dwd主题中
 */
public class BaseLogApp {
    public static void main(String[] args) throws Exception{
        // TODO 1 基本环境准备
        // 流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(4);
        // TODO 2 检查点相关设置
        // 开启检查点
        // 每5S中开启一次检查点,检查点模式为EXACTLY_ONCE
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        //   设置检查点超时时间
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        //   设置重启策略
        // 重启三次,每次间隔3s钟
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
        //   设置job取消后,检查点是否保留
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //   设置状态后端 -- 基于内存 or 文件系统 or RocksDB
        //   内存:状态存在TaskManager内存中,检查点存在JobManager内存中
        //   文件系统:状态存在TaskManager内存中,检查点存在指定的文件系统路径中
        //   RocksDB:看做和Redis类似的数据库,状态存在TaskManager内存中,检查点存在JobManager内存和本地磁盘上
        //   hadoop中nm的地址
        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
        //   指定操作HDFS的用户
        System.setProperty("HADOOP_USER_NAME","hzy");
        // TODO 3 从kafka读取数据
        // 声明消费的主题和消费者组
        String topic = "ods_base_log";
        String groupId = "base_log_app_group";

        // 获取kafka消费者
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        // 读取数据,封装为流
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        // TODO 4 对读取的数据进行结构的转换 jsonStr -> jsonObj
//        // 匿名内部类实现
//        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(
//                new MapFunction<String, JSONObject>() {
//                    @Override
//                    public JSONObject map(String jsonStr) throws Exception {
//                        return JSON.parseObject(jsonStr);
//                    }
//                }
//        );
//        // lambda表达式实现
//        kafkaDS.map(
//                jsonStr -> JSON.parse(jsonStr)
//        );

        // 方法的默认调用,注意导入的是alibaba JSON包
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

        jsonObjDS.print(">>>");

        // TODO 5 修复新老访客状态

        // TODO 6 按照日志类型对日志进行分流

        // TODO 7 将不同流的数据写到kafka的dwd不同主题中


        env.execute();
    }
}

c 测试

# 启动zookeeper
# 启动kafka
# 启动采集服务
logger.sh start
# 启动nm以将检查点保存到hdfs上
start-dfs.sh
# 等待安全模式关闭,启动主程序,如果出现权限问题,可以将权限放开
hdfs dfs -chmod -R 777 /
# 或者增加以下代码到主程序中
System.setProperty("HADOOP_USER_NAME","hzy");
# 程序运行起来后,启动模拟生成日志数据jar包,在主程序中可以接收到数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/81830.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Faster RCNN精读

Faster R-CNN: Towards Real-Time Object Detection with Region Proposal Networks Faster R-CNN&#xff1a;使用区域建议网络实现实时目标检测 优异的网络模型总是经得起时间的推敲&#xff0c;Faster RCNN便是其中一员。 目录 一、摘要 二、结论 三、介绍和相关工作 …

R语言析因设计分析:线性模型中的对比

对比度可用于对线性模型中的处理进行比较。 常见的用途是使用析因设计时&#xff0c;除析因设计外还使用控制或检查处理。在下面的第一个示例中&#xff0c;有两个级别&#xff08;1和2&#xff09;的两个处理&#xff08;D和C&#xff09;&#xff0c;然后有一个对照 处理。此…

周末来哥家小聚一下

欢迎关注勤于奋 每天12点准时更新国外LEAD相关技术 是的&#xff0c;周末来哥家小聚&#xff0c;他们这儿人不多&#xff0c;就是一些认识的人&#xff0c;没有外人&#xff0c;加上疫情&#xff0c;一般不聚集&#xff0c;大家都清楚没事。 在他家没事&#xff0c;就陪小朋友…

灵活的类加载器OSGI

灵活的类加载器OSGI 简介 OSGi中的每个模块&#xff08;称为Bundle&#xff09;与普通的Java类库区别并不太大&#xff0c;两者一般都以JAR格式进行 封装[2]&#xff0c;并且内部存储的都是Java的Package和Class。但是一个Bundle可以声明它所依赖的Package&#xff08;通 过I…

最近的一点杂感

这是学习笔记的第 2444篇文章最近居家办公几周了&#xff0c;除了工作也能想想生活的事情&#xff0c;说说最近自己比较深的几四点感受吧。熵增和待办事项最初居家办公的时候&#xff0c;我们也有日会&#xff0c;也会有一些频繁的沟通&#xff0c;但是总是感觉目标的达成效果上…

JavaScript大作业 基于HTML+CSS+JavaScript站酷静态页面官网7页

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

超高质量堆排序详细讲解,图文并茂,看不懂打我

目录 一,题目 二&#xff0c;堆排序 1、堆排序基本介绍 2、堆排序基本思想 3、堆排序步骤图解说明 四&#xff0c;总结堆排序的整体思路 五&#xff0c;整体代码实现 1.普通版 2&#xff0c;STL版 一,题目 给定你一个长度为 n 的整数数列。 请你使用快堆排序对这个数列…

斐波那契数列问题【Java实现】

目录 题目&#xff1a; 背景&#xff1a; 方法一&#xff1a;暴力递归 复杂度O(2^N) 方法二&#xff1a;复杂度O(N) 方法三&#xff1a;O&#xff08;logN&#xff09;复杂度 两个矩阵相乘&#xff1a; 求矩阵m的p次方的代码实现&#xff1a; 用矩阵乘法求斐波那契数列第…

Linux | Posix信号量(semaphore) | 环形队列实现生产消费模型 | 线程池实现

文章目录POSIX信号量信号量接口讲解基于信号量和环形队列实现生产消费模型线程池的实现Posix信号量和System V信号量作用相同&#xff0c;都是用于共享资源的同步访问&#xff0c;Posix信号量通常用于线程间通信&#xff0c;而System V信号量常用于进程间通信&#xff0c;这篇博…

2022-12-09 Redis 学习

Redis简介 Nosql NOSQL&#xff1a;即Not-OnlySQL&#xff08;泛指非关系型的数据库&#xff09;&#xff0c;作为关系型数据库的补充。 作用&#xff1a;应对基于海量用户海量数据前提下的数据处理问题 特征&#xff1a; 可扩容&#xff0c;可伸缩大量数据下高性能灵活的数…

记录在使用git进行上传本地文件到github上遇到的一些问题以及解决办法

1.warning: in the working copy of ‘XXX’, LF will be replaced by 这是一个警告错误&#xff0c;其实可以不用管他。 如果想要不出现这个警告可以参考这个博客&#xff1a;https://blog.csdn.net/Babylonxun/article/details/126598477 2.fatal: ‘origin’ does not app…

阳了别怕,保护好自己

关注、星标公众号&#xff0c;直达精彩内容图片素材来源&#xff1a;网络素材作者&#xff1a;技术让梦想更伟大 | 李肖遥最近已经陆续有四五个同事&#x1f40f;了&#xff0c;如果是上一周&#xff0c;那我现在也是妥妥的密接了&#xff0c;不过现在放开的趋势愈发明显&#…

Web 服务的概述

Web 服务的概述 由于能够提供图形、声音等多媒体数据&#xff0c;再加上可以交互的动态 Web 语言的广泛普及&#xff0c;WWW&#xff08;World Wide Web&#xff0c;万维网&#xff09;深受Internet用户欢迎。一个最重要的证明就是&#xff0c;当前的绝大部分Internet流量都…

用SQL语句进行数据库查询(简单查询)

前言 &#x1f388;个人主页:&#x1f388; :✨✨✨初阶牛✨✨✨ &#x1f43b;推荐专栏: &#x1f354;&#x1f35f;&#x1f32f; c语言初阶 &#x1f511;个人信条: &#x1f335;知行合一 &#x1f349;本篇简介:>: 讲解使用SQL语句进行简单的数据查询、条件查询等. ✨…

考的好不如选的好?中国石油大学(华东)计算机考研报录比

中国石油大学&#xff08;华东&#xff09;是一所211大学&#xff0c;位于山东省青岛市。中国石油大学计算机学科评估B-&#xff0c;计算机实力在211大学中还算可以。前段时间&#xff0c;中国石油大学&#xff08;华东&#xff09;公布了今年考研的报名人数&#xff0c;区分出…

c#入门-枚举和数字互相转化

枚举的数值绑定 枚举的本质是数字&#xff0c;可以声明为常量。 同类的枚举之间&#xff0c;可以直接使用关系运算符&#xff08;大于小于等于这些&#xff09; 同类的枚举之间还可以直接使用数学运算符&#xff08;但不能直接和数字进行数学运算&#xff09; 枚举的默认数值…

app自动化环境配置mac版

android sdk 试了各种sdk tools下载、ADT bundle tools总有各种报错&#xff0c;最后直接官网android studio全套&#xff0c;记住下载页面的https改为http 没有试 brew install android-sdkandroid模拟器 能适配mac M1芯片的模拟器好像只有夜神&#xff0c;但是安装后一直卡…

微服务框架 SpringCloud微服务架构 微服务保护 32 隔离和降级 32.5 熔断策略【慢调用】

微服务框架 【SpringCloudRabbitMQDockerRedis搜索分布式&#xff0c;系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】 微服务保护 文章目录微服务框架微服务保护32 隔离和降级32.5 熔断策略【慢调用】32.5.1 熔断策略【慢调用】32.5.2 案例32 隔离和降级 32.5…

为什么APM飞控装不上mission planner双旋翼三旋翼倾转旋翼机固件以及apm飞控红黄绿颜色LED灯的含义

双旋翼bicopter三旋翼tricopter教程本来就不多&#xff0c;看几遍安装博主用的同版本地面站还是无法加装固件&#xff0c;全部参数表也无法搜出你需要的参数尤其是frame class 问题出在那❓还是硬件兼容问题 打开官方网站 点击frame class 这里列出了所有支持的固件 问题就出…

Cybertec PostgreSQL透明加密解析

目前PostgreSQL官方并未推出透明加密功能&#xff0c;但是cybertec开源了一个分支&#xff0c;支持透明加密。感兴趣的同学可以参考&#xff1a;https://www.cybertec-postgresql.com/en/products/postgresql-transparent-data-encryption/它支持对数据和WAL进行透明加密。本文…