【实时数仓】DWM层设计模式、独立访客(UV)的计算

news2024/9/30 23:31:34

文章目录

  • 一 DWS层与DWM层的设计
    • 1 设计思路
    • 2 DWS层需求分析
  • 二 DWM层-UV计算
    • 1 需求分析与思路
    • 2 从kafka中读取数据
      • (1)代码实现
      • (2)测试
      • (3)总结
    • 3 UV过滤 -- 独立访客计算
      • (1)实现思路
      • (2)代码实现
    • 4 写入kafka
    • 5 测试

一 DWS层与DWM层的设计

1 设计思路

之前通过分流等手段,把数据分拆成了独立的kafka topic。那么接下来如何处理数据,就要思考一下到底要通过实时计算出哪些指标项。

因为实时计算与离线不同,实时计算的开发和运维成本都是非常高的,要结合实际情况考虑是否有必要像离线数仓一样,建一个大而全的中间层。

如果没有必要大而全,这时候就需要大体规划一下要实时计算出的指标需求了。把这些指标以主题宽表的形式输出,就是DWS层。

2 DWS层需求分析

统计主题需求指标输出方式计算来源来源层级
访客pv可视化大屏page_log直接可求dwd
uv可视化大屏需要用page_log过滤去重dwm
跳出明细可视化大屏需要通过page_log行为判断dwm
进入页面数可视化大屏需要识别开始访问标识dwd
连续访问时长可视化大屏page_log直接可求dwd
商品点击多维分析page_log直接可求dwd
收藏多维分析收藏表dwd
加入购物车多维分析购物车表dwd
下单可视化大屏订单宽表dwm
支付多维分析支付宽表dwm
退款多维分析退款表dwd
评论多维分析评论表dwd
地区pv多维分析page_log直接可求dwd
uv多维分析需要用page_log过滤去重dwm
下单可视化大屏订单宽表dwm
关键词搜索关键词可视化大屏页面访问日志 直接可求dwd
点击商品关键词可视化大屏商品主题下单再次聚合dws
下单商品关键词可视化大屏商品主题下单再次聚合dws

当然实际需求还会有更多,这里主要以为可视化大屏为目的进行实时计算的处理。

DWM层的定位是主要服务于DWS,因为部分需求直接从DWD层到DWS层中间会有一定的计算量,而且这部分计算的结果很有可能被多个DWS层主题复用,所以部分DWD层会形成一层DWM,这里涉及业务主要包括:访问UV计算、 跳出明细计算、订单宽表、支付宽表。

二 DWM层-UV计算

1 需求分析与思路

UV,全称是Unique Visitor,即独立访客,对于实时计算中,也可以称为DAU(Daily Active User),即每日活跃用户,因为实时计算中的uv通常是指当日的访客数。

那么如何从用户行为日志中识别出当日的访客,有以下两点:

  • 其一,是识别出该访客打开的第一个页面,表示这个访客开始进入应用。
  • 其二,由于访客可以在一天中多次进入应用,所以要在一天的范围内进行去重。

在这里插入图片描述

2 从kafka中读取数据

工作流程如下:

在这里插入图片描述

(1)代码实现

public class UnionVistorApp {
    public static void main(String[] args) throws Exception {
        //TODO 1 基本环境准备
        //1.1 流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度
        env.setParallelism(4);

        //TODO 2 检查点设置
//        //2.1 开启检查点
//        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
//        //2.2 设置检查点超时时间
//        env.getCheckpointConfig().setCheckpointTimeout(60000L);
//        //2.3 设置重启策略
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
//        //2.4 设置job取消后,检查点是否保留
//        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        //2.5 设置状态后端 -- 基于内存 or 文件系统 or RocksDB
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
//        //2.6 指定操作HDFS的用户
//        System.setProperty("HADOOP_USER_NAME","hzy");

        //TODO 3 从kafka中读取数据
        //3.1 声明消费主题以及消费者组
        String topic = "dwd_page_log";
        String groupId = "union_visitor_app_group";
        //3.2 获取kafka消费者对象
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        //3.3 读取数据封装流
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

        //TODO 4 对读取的数据进行类型转换 String -> JSONObject
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

        jsonObjDS.print(">>>");
        env.execute();
    }
}

(2)测试

需要启动的进程:zookeeper、kafka、模拟生成日志jar包,logger.sh、UnionVistorApp、BaseLogApp。

  • 启动logger.sh、zk、kafka
  • 运行Idea中的BaseLogApp
  • 运行Idea中的UniqueVisitApp
  • 查看控制台输出
  • 执行流程

模拟生成数据->日志处理服务器->写到kafka的ODS层(ods_base_log)->BaseLogApp分流->dwd_page_log->UniqueVisitApp读取输出

输出信息如下:

BaseLogApp

启动流::3> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"start":{"entry":"install","open_ad_skip_ms":0,"open_ad_ms":5918,"loading_time":1480,"open_ad_id":11},"ts":1670158358000}
曝光流::1> {"display_type":"query","page_id":"good_detail","item":"10","item_type":"sku_id","pos_id":4,"order":4,"ts":1670158358000}
曝光流::3> {"display_type":"query","page_id":"good_detail","item":"1","item_type":"sku_id","pos_id":5,"order":6,"ts":1670158358000}
曝光流::1> {"display_type":"query","page_id":"good_detail","item":"5","item_type":"sku_id","pos_id":1,"order":5,"ts":1670158358000}
主流::3> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"cart","during_time":15330,"last_page_id":"good_detail"},"ts":1670158358000}

UnionVistorApp

>>>:2> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"good_detail","item":"3","during_time":9775,"item_type":"sku_id","last_page_id":"good_list","source_type":"query"},"displays":[{"display_type":"recommend","item":"10","item_type":"sku_id","pos_id":4,"order":1},{"display_type":"recommend","item":"3","item_type":"sku_id","pos_id":1,"order":2},{"display_type":"promotion","item":"2","item_type":"sku_id","pos_id":4,"order":3},{"display_type":"query","item":"8","item_type":"sku_id","pos_id":1,"order":4},{"display_type":"query","item":"10","item_type":"sku_id","pos_id":5,"order":5},{"display_type":"query","item":"1","item_type":"sku_id","pos_id":5,"order":6}],"actions":[{"item":"3","action_id":"favor_add","item_type":"sku_id","ts":1670158362887}],"ts":1670158358000}
>>>:4> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"trade","item":"4,6,10","during_time":5294,"item_type":"sku_ids","last_page_id":"cart"},"ts":1670158358000}
>>>:3> {"common":{"ar":"110000","uid":"45","os":"Android 11.0","ch":"360","is_new":"1","md":"Xiaomi Mix2 ","mid":"mid_13","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"cart","during_time":15330,"last_page_id":"good_detail"},"ts":1670158358000}

(3)总结

执行流程:

  • 模拟生成日志jar包
  • 将模拟生成的日志数据发送给Nginx进行负载均衡
  • Nginx将请求转发给三台日志采集服务
  • 三台日志采集服务接收到日志数据,将日志数据发送给kafka的ods_base_log主题中
  • BaseLogApp应用程序从ods_base_log中读取数据,进行分流
    • 启动日志:dwd_start_log
    • 曝光日志:dwd_display_log
    • 页面日志:dwd_page_log
  • UnionVistorApp从dwd_page_log主题中读取数据

3 UV过滤 – 独立访客计算

(1)实现思路

  • 首先用keyby按照mid进行分组,每组表示当前设备的访问情况
  • 分组后使用keystate状态,记录用户进入时间,实现RichFilterFunction完成过滤
  • 重写open 方法用来初始化状态
  • 重写filter方法进行过滤
    • 可以直接筛掉last_page_id不为空的字段,因为只要有上一页,说明这条不是这个用户进入的首个页面。
    • 状态用来记录用户的进入时间,只要这个lastVisitDate是今天,就说明用户今天已经访问过了所以筛除掉。如果为空或者不是今天,说明今天还没访问过,则保留。
    • 因为状态值主要用于筛选是否今天来过,所以这个记录过了今天基本上没有用了,这里enableTimeToLive 设定了1天的过期时间,避免状态过大。

(2)代码实现

		//TODO 5 按照设备id对数据进行分组
        KeyedStream<JSONObject, String> keyedDS = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));

        //TODO 6 实现过滤
        //实现目的:如有一个用户在6月访问一次,11月访问一次,6-11月共访问两次,
        // 如果一直保留其6月的访问状态,直到11月才去更新,会消耗很多资源,
        // 所以需要将其访问时间放入状态中,定时进行更新。
        SingleOutputStreamOperator<JSONObject> filterDS = keyedDS.filter(
                new RichFilterFunction<JSONObject>() {
                    // 声明状态变量,用于存放上次访问日期
                    private ValueState<String> lastVistDateState;
                    // 声明日期格式工具类
                    private SimpleDateFormat sdf;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        sdf = new SimpleDateFormat("yyyyMMdd");
                        ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("lastVistDateState", String.class);
                        // 注意:UV可以延伸为日活统计,其状态值主要用于筛选当天是否访问过
                        // 那么状态超过今天就没有存在的意义
                        // 所以设置状态的失效时间为1天
                        // 粒度为天,不记录时分秒
                        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1))
                                // 默认值,当状态创建或者写入的时候会更新状态失效时间
//                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                // 默认值,状态过期后,如果还没有被清理,是否返回给状态调用者
//                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                                .build();
                        valueStateDescriptor.enableTimeToLive(ttlConfig);
                        lastVistDateState = getRuntimeContext().getState(valueStateDescriptor);
                    }

                    @Override
                    public boolean filter(JSONObject jsonObj) throws Exception {
                        // 如果从其他页面跳转过来,直接过滤掉
                        String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
                        if (lastPageId != null && lastPageId.length() > 0) {
                            return false;
                        }
                        // 获取状态中的上次访问日期
                        String lastVisitDate = lastVistDateState.value();
                        String curVisitDate = sdf.format(jsonObj.getLong("ts"));
                        if (lastVisitDate != null && lastVisitDate.length() > 1 && lastVisitDate.equals(curVisitDate)) {
                            // 今天已经访问过
                            return false;
                        } else {
                            // 今天还没访问过
                            lastVistDateState.update(curVisitDate);
                            return true;
                        }
                    }
                }
        );


        filterDS.print(">>>");
        env.execute();

4 写入kafka

将过滤处理后的UV写入到Kafka的dwm_unique_visitor。

//TODO 7 将过滤后的uv数据,写回到kafka的dwm层
filterDS.map(jsonObj -> jsonObj.toJSONString()).addSink(
        MyKafkaUtil.getKafkaSink("dwm_unique_visitor")
);

5 测试

# 启动logger.sh、zk、kafka
# 运行Idea中的BaseLogApp
# 运行Idea中的UniqueVisitApp
# 查看控制台输出以及kafka的dwm_unique_visit主题
# 执行流程 
模拟生成数据->日志处理服务器->写到kafka的ODS层(ods_base_log)->BaseLogApp分流->dwd_page_log->UniqueVisitApp读取并处理->写回到kafka的dwm层

程序运行整体流程如下:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/95982.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring+SpringMVC+MP登录案例(含拦截器)

技术框架 后端&#xff1a;Spring、Spring MVC、Mybatis-Plus 前端&#xff1a;HTML、CSS、Layui、JS、Jquery 功能模块技术 1、用户的每一个请求使用了SpringMVC 拦截器技术&#xff0c;没有登录的用户自动重定向到登录页 2、统一请求模式&#xff0c;使用Restful风格对后端…

贤鱼的刷题日常(数据结构栈学习)-1551:Sumsets--题目详解

&#x1f3c6;今日学习目标&#xff1a; &#x1f340;例题讲解1551:Sumsets ✅创作者&#xff1a;贤鱼 ⏰预计时间&#xff1a;25分钟 &#x1f389;个人主页&#xff1a;贤鱼的个人主页 &#x1f525;专栏系列&#xff1a;c &#x1f341;贤鱼的个人社区&#xff0c;欢迎你的…

学Python的理由有哪些?这四大理由足够了

学Python的理由有哪些&#xff1f;可能有人会说Python是一种计算机语言&#xff0c;具有简洁性、易读性、及可扩展性&#xff0c;相对于其他语言学起来会更加容易&#xff0c;目前应用也非常广泛等等。其实总结起来&#xff0c;学Python的理由不外乎四点&#xff0c;即丰富免费…

Python数据分析主要功能是什么?可以用来做什么?

Python是一种计算机程序设计语言&#xff0c;具有简洁性、易读性以及可扩展性&#xff0c;相较于其他语言学习起来更加容易。随着互联网的发展&#xff0c;Python知识也被越来越多的人所熟知。但还是有很多人不了解它究竟可以用来做什么&#xff0c;接下来就跟随我了解一下吧&a…

【轻量级开源ROS 的机器人设备(5)】--(2)拟议的框架——µROS节点

接上文&#xff1a; 【轻量级开源ROS 的机器人设备&#xff08;5&#xff09;】--&#xff08;1&#xff09;拟议的框架——ROS节点 四、开发工具 为了方便用户应用程序的开发&#xff0c;一个代码生成器&#xff0c;一个 堆栈使用分析器和演示项目包含在框架中包裹。 4.1 代…

截止12.17 bitahub踩坑,mask无数次更改,lama代码的那些痛,羊了个羊

前面那篇跑出了STCN&#xff0c;倒是STCN熟悉了很多了 对bitahub&#xff0c;需要注意一个问题 要进ssh请用debug卡&#xff01;&#xff01;&#xff01;&#xff01; 要进ssh请用debug卡&#xff01;&#xff01;&#xff01;&#xff01; 要进ssh请用debug卡&#xff01;&…

数据库文档展示工具

实用工具&#xff1a;数据库文档展示工具 简介 数据库文档展示工具&#xff08;database doc&#xff09;&#xff0c;又叫数据库注释浏览工具&#xff0c;是一个简单的数据库展示各个字段注释的开源工具。在日常开发工作中&#xff0c;您有否这样的体验&#xff1f; 想给前…

干货 | 数字经济创新创业——数字技术创造新经济

下文整理自清华大学大数据能力提升项目能力提升模块课程“Innovation & Entrepreneurship for Digital Economy”&#xff08;数字经济创新创业课程)的精彩内容。主讲嘉宾&#xff1a;Kris Singh: CEO at SRII, Palo Alto, CaliforniaVisiting Professor of Tsinghua Unive…

Elasticsearch 多索引搜索 多条件筛选 去除重复数据

Elasticsearch 多索引搜索 多条件筛选先看结构 分别是索引media_data_es,live_room_essearch_type :dfs_query_then_fetch 不重复复合查询 复合查询就是把一些简单查询组合在一起实现更复杂的查询需求&#xff0c;除此之外&#xff0c;复合查询还可以控制另外一个查询的行为。 …

Spring MVC介绍

Spring MVC 简介什么是Spring MVC了解 MVCMVC 和Spring MVC的联系如何创建一个Spring Web项目在Spring Web 项目中&#xff0c;如何连接Http请求Controller注解可以用其他类注解代替吗连接其他类型的请求如何获取请求中的数据获取单个请求参数获取多个请求参数获取对象获取表单…

高通平台开发系列讲解(DSI篇)DSI层在拨号中的调用逻辑

沉淀、分享、成长,让自己和他人都能有所收获!😄 📢DSI层在拨号中起到的是承上启下的作用。 拨号初始化: 通过mcm_data_init_srv接口调用dsi_init_ex接口,而dsi_init_ex接口进一步通过依次调用dsi_init_cb_func来初始化注册回调、dsi_init_cb_data来初始化数据回调及dsi…

XXL-Job分布式任务调度框架-- 介绍和服务搭建1

一 xxl-job介绍 1.1 xxl-job介绍 xxl-job是轻量级的分布式任务调度框架&#xff0c;目标是开发迅速、简单、清理、易扩展; 老版本是依赖quartz的定时任务触发&#xff0c;在v2.1.0版本开始 移除quartz依 。 分布式任务调度平台XXL-JOB/ 分布式任务调度平台XXL-JOB 二 xxl-…

Prometheus之集成Flink

目录1. 基本介绍2. 拷贝Flink jar包3. 修改Flink的配置文件flink-conf.yaml4. 重启Flink集群5. 验证是否集成成功1. 基本介绍 Flink提供的Metrics可以在Flink内部收集一些指标&#xff0c;通过这些指标让开发人员更 好地查看作业或集群的状态 2. 拷贝Flink jar包 在Flink集群…

[附源码]Node.js计算机毕业设计互联网在线笔记管理系统Express

项目运行 环境配置&#xff1a; Node.js最新版 Vscode Mysql5.7 HBuilderXNavicat11Vue。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等等。 环境需要 1.运行环境&#xff1a;最好是Nodejs最新版&#xff0c;我…

【C++】list的介绍和使用

​&#x1f320; 作者&#xff1a;阿亮joy. &#x1f386;专栏&#xff1a;《吃透西嘎嘎》 &#x1f387; 座右铭&#xff1a;每个优秀的人都有一段沉默的时光&#xff0c;那段时光是付出了很多努力却得不到结果的日子&#xff0c;我们把它叫做扎根 目录&#x1f449;list 的介…

操作系统原理和接口

这个阶段的课程讲授操作系统的原理和Linux系统给应用层提供的C编程接口。操作系统通过系统调用提供的抽象层是一切中间层和应用软件的根本。 课程建设思路-传统误区 长久以来这个阶段的课程是按照《UNIX环境高级编程》这本书进行讲解的。这个环节的课程甚至曾因此被称为"高…

新能源电动汽车充电桩收费平台

安科瑞 华楠 一、业务模式 平台客户 两种合作方式 1&#xff09;数据托管方式 安科瑞指导用户完成充电桩的安装&#xff0c;用户的充电桩将数据上传至安科瑞充电桩收费运营云平台&#xff0c;委托安科瑞管理&#xff0c;按规定/约定收取托管费用。 2&#xff09;用户自建平…

D-028 DDR3硬件电路设计

DDR3硬件电路设计1 简介2 电路设计3 设计要点1 简介 RAM&#xff08;Random Access Memory&#xff09;是随机存储器&#xff0c;存储单元中的内容可以按需任意去除或者存入&#xff0c;并且存取的速度与存储单元的位置无关。这种存储器在断电时&#xff0c;将丢失其存储的内容…

@Pointcut 的 12 种用法

这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注…

数据结构---鸡尾酒排序

鸡尾酒排序是基于冒泡排序的一种升级排序法&#xff08;双向冒泡排序&#xff09;冒泡排序&#xff1a;每一轮都是从左到右来比较元素&#xff0c;进行单向的位置交换的。鸡尾酒排序的元素比较和交换过程是双向的。解决的问题如下&#xff1a;从小到大排序{2,3,4,5,6,7,8,1} 如…