Flink 实时数仓(四)【DWD 层搭建(二)流量域事实表】

news2024/9/20 16:30:40

前言

        昨天刚搬到新校区,新校区小的可怜,好在之后出去实习交通可以方便点;待在学院太受限了,早点离开!

        今天开始完成 DWD 层剩余的需求,上一节我们把日志数据根据不同类型分流写入到了不同的主题;

1、流量域独立访客事务事实表

        独立访客指的其实就是我们 web 端日志分析指标中常说的 UV,上一节我们已经把页面日志写入到 dwd_page_traffic_log 主题当中了,所以这里我们直接对这个主题进行消费处理;

1.1、实现思路

        既然是独立访客,就必须对日志中的数据做去重(独立访客数一般用来做日活指标,因为我们的机器一般都是 24 小时全年无休的,所以我们实时数仓也可以做这种日级别的指标需求,通过状态来存储历史就可以实现),而怎么判断访客是否重复?这就又用到了 Flink 中的状态编程(状态就是历史);和上一节我们判断新老访客一样,我们这里也可以给每个 mid 维护一个名为 lastVisitDate 的 ValueState(对 mid 进行 keyby),存储上一次访问的日期(注意是日期,只精确到天),每来一条数据就判断它的 lastVisitDate:

  • 如果 lastVisitDate 为 null 或者 不是今天,则保留数据,否则丢弃

一旦进入第二天,lastVisitDate 状态就应该被清空(设置状态 TTL 为 1 天)

此外,对于 0 点的数据我们这里需要明确统计规则:

  • 独立访客数据对应的页面必然是会话起始页面,last_page_id 必为 null;所以对于跨天的访问不能计算在内(昨天到今天访问了多个页面,而今天页面的 last_page_id 必然不为 null),我们需要在消费数据后的第一步就需要进行过滤;

1.2、代码实现 

public class DwdTrafficUniqueVisitorDetail {

    public static void main(String[] args) throws Exception {
        // TODO 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 生产环境中设置为kafka主题的分区数

        // 1.1 开启checkpoint
        env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 设置最大共存的checkpoint数量
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); // 固定频率重启: 尝试3次重启,每5s重启一次

        // 1.2 设置状态后端
        env.setStateBackend(new HashMapStateBackend());

        // TODO 2. 消费 kafka dwd_traffic_page_log 主题
        String topic = "dwd_traffic_page_log";
        String groupId = "uvDetail";
        DataStreamSource<String> pageDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        // TODO 3. 过滤 last_page_id != Null 的数据
        // 使用 flatMap 而没用 filter,因为 flatMap 可以把过滤和转json 两步都一起完成
        SingleOutputStreamOperator<JSONObject> jsonDS = pageDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSONObject.parseObject(value);
                    // 获取 last_page_id
                    String last_page_id = jsonObject.getJSONObject("page").getString("last_page_id");
                    if (last_page_id == null) {
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // TODO 4. 按照 mid 分组
        KeyedStream<JSONObject, String> keyedStream = jsonDS.keyBy(json -> json.getJSONObject("common").getString("mid"));

        // TODO 5. 使用状态编程实现按照 mid 的日期进行去重
        // 使用富函数,因为富函数提供更多的信息如上下文等
        SingleOutputStreamOperator<JSONObject> uvDS = keyedStream.filter(new RichFilterFunction<JSONObject>() {
            private ValueState<String> lastVisitDate = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("lastVisit", String.class);
                lastVisitDate = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public boolean filter(JSONObject value) throws Exception {
                // 获取状态数据 & 当前数据中的时间并转为日期
                String lastDate = lastVisitDate.value();
                Long ts = value.getLong("ts");
                String curDate = DateFormatUtil.toDate(ts);
                if (lastDate == null || !lastDate.equals(curDate)) {
                    // 更新状态
                    lastVisitDate.update(curDate);
                    return true;
                }
                return false;
            }
        });

        // TODO 6. 数据写入 kafka
        String targetTopic = "dwd_traffic_unique_visitor_detail";
        uvDS.map(data -> data.toJSONString()).addSink(MyKafkaUtil.getFlinkKafkaProducer(targetTopic));

        // TODO 7. 执行任务
        env.execute("DwdTrafficUniqueVisitorDetail");
    }

}

1.3、TTL 优化

        上面我们的代码逻辑看起来已经没什么问题了,但是我们可以设想:假设一个用户,2024-01-01 首次登录之后,它的 lastVisitDate 状态会一直存储 2024-01-01,如果他下一次登录是在 2024-12-31,那么期间的 364 天我们依然要一直存储它的状态;而我们判断用户是否已经登录的逻辑是:lastVisitDate 是否为null 或者 lastVisitDate<今天,所以我们完全可以在一天之后把该用户的 lastVisitDate 状态清空,来减少状态的保存开销!

TTL 是给状态描述器设置的,而状态描述器是构造状态对象的必须参数!

TTL 是状态的一个属性,当我们修改状态值的时候,TTL 本身并不会更新!这里,我们需要在状态描述器中设置 TTL 的更新策略为创建或更新状态值的时候就更新 TTL ,重新开始过期倒计时;

我们只需要修改上面第 5 步,在初始化状态时,在状态描述器中给状态添加 TTL 属性:

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("lastVisit", String.class);
                // 给状态添加 TTL
                stateDescriptor.enableTimeToLive(new StateTtlConfig.Builder(Time.days(1))
                        // 设置 TTL 可更新,并且在创建或更新状态的时候更新
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build()
                );
                lastVisitDate = getRuntimeContext().getState(stateDescriptor);
            }

2、流量域用户跳出事务事实表

        跳出的概念:跳出指的是用户在一次会话中只访问了一个页面的情况(注意:粒度是会话),我们在之前做离线数仓的时候做过跳出率的指标,对于离线数仓,我们可以在 DWS 层构建一张流量域近1日会话粒度页面浏览表(dws_traffic_session_page_view_1d),通过下面的 SQL 就可以统计出该指标:

SELECT 
CAST(SUM(IF(page_count=1,1,0))/COUNT(*)) AS DECIMAL(16,2) AS bounce_rate
FROM dws_traffic_session_page_view_1d

        在这里的实时数仓中,我们不可能等到一天结束最后才去计算跳出率;但是我们这里又没有 session_id,所以我们只能换一种思路:

思路1(会话窗口)

  • 使用会话窗口,为每个 mid 开启一个会话窗口并指定间隔为 10 s;一旦到了 10s 触发窗口关闭,计算窗口内的数据条数,> 1 条则说明这次会话没有发生跳出;

        这种思路的问题很明显:① 如果我短时间(10s内)发生多个跳出,但是正好这些跳出都在一个会话,这会导致窗口结束时误以为这不是跳出,毕竟窗口内有多条数据;② 可能我的一次正常的会话,被会话窗口切分到两个不同的会话窗口,结果把一个非跳出访问计算为 2 个跳出访问;

思路2(状态编程)

        在离线数仓中,当我们没有 session_id 时,我们可以一天的数据按照 mid 进行分组,然后根据时间戳字段进行排序,这样来计算一个 session;但是这里是实时数仓,我们不知道什么时候一个 session 会结束,所以我们可以设置一个定时器,定时器时间范围内的数据如果没数据来就视作一个会话结束,触发计算;并结合状态编程,把新会话的首页存入状态

  • 遇到 last_page 为 null 的数据就试着取出状态
    • 如果状态为 null,则该页面是新的会话起始页,开启定时器将数据自身写入状态
    • 如果状态不为 null,说明刚跳出一次,并且在定时器时间范围内又进来一次;这种情况需要将第一条数据(跳出的数据,也就是写入状态中的数据)输出,然后将自身写入状态,定时器依然存在,等时间到了触发计算
  • 如果 last_page 不为 null,则状态中的数据和该条数据都丢弃

这种思路同样存在问题,当数据是乱序的时候一切都就乱套了;

思路3(Flink CEP)

Flink CEP 其实就是使用 状态编程 + within 开窗 来处理这种复杂事件

Flink CEP 定义的规则之间的连续策略

  • 严格连续: 期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。对应方法为 next()
  • 松散连续: 忽略匹配的事件之间的不匹配的事件。对应方法为followedBy();
  • 不确定的松散连续: 更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。对应方法为followedByAny()

定义模式之前的代码

        这里需要注意:因为我们后面要保证数据有序,所以我们最好指定事件时间的提取字段,并添加水位线设置合理的超时时间(理论上可以保证数据绝对有序):

public class DwdTrafficUserJumpDetail {

    public static void main(String[] args) {
        // TODO 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 生产环境中设置为kafka主题的分区数

        // 1.1 开启checkpoint
        env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 设置最大共存的checkpoint数量
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); // 固定频率重启: 尝试3次重启,每5s重启一次

        // 1.2 设置状态后端
        env.setStateBackend(new HashMapStateBackend());

        // TODO 2. 消费 kafka dwd_traffic_page_log 主题
        String topic = "dwd_traffic_page_log";
        String groupId = "user_jump_detail";
        DataStreamSource<String> pageDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        // TODO 3. 将数据转为 JSON
        SingleOutputStreamOperator<JSONObject> jsonDS = pageDS.map(JSON::parseObject);

        // TODO 4. 提取事件时间 & 按照 mid 分组
        KeyedStream<JSONObject, String> keyedStream = jsonDS
                .assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                    .withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
                        @Override
                        public long extractTimestamp(JSONObject element, long recordTimestamp) {
                            return element.getLong("ts");
                        }
                    }))
                .keyBy(json -> json.getJSONObject("common").getString("mid"));

接下来是核心的定义 CEP 模式的代码:

        // TODO 5. 定义 CEP 模式序列
        // 泛型方法类型指的是流的类型(下面的 start 和 next 作为提取事件的 key)
        Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                return value.getJSONObject("page").getString("last_page_id") == null;
            }
        }).next("next").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                return value.getJSONObject("page").getString("last_page_id") == null;
            }
        }).within(Time.seconds(10L));

        // 等价于 循环模式 共用一个 key: start 
        Pattern.<JSONObject>begin("start").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject value) throws Exception {
                return value.getJSONObject("page").getString("last_page_id") == null;
            }
        })
                .times(2) // 默认是宽松近邻 followedBy
                .consecutive() // 严格近邻 next
                .within(Time.seconds(10L));

        // TODO 6. 建模式序列作用到流上
        PatternStream<JSONObject> patternStream = CEP.pattern(keyedStream, pattern);

        // TODO 7. 提取事件(匹配上的时间 和 超时时间)
        OutputTag<String> timeoutTag = new OutputTag<>("timeout");
        SingleOutputStreamOperator<String> selectDS = patternStream.select(timeoutTag,
                // 超时数据
                new PatternTimeoutFunction<JSONObject, String>() {
                    // 对于超时数据来说,当前的数据第一个规则匹配上了,第二个没有匹配上导致超时,那么我们要提取的就是当前数据(第一个数据,第二个数据没来)
                    // 这里的 Map 的 v 是 List 数据类型,因为考虑到我们可能使用的是循环模式(只有一个key)
                    @Override
                    public String timeout(Map<String, List<JSONObject>> map, long l) throws Exception {
                        return map.get("start").get(0).toJSONString();
                    }
                }, 
                    // 匹配上的数据
                    new PatternSelectFunction<JSONObject, String>() {
                    // 匹配上的数据,我们只要第一个数据,因为只能证明第一个数据是跳出数据
                    @Override
                    public String select(Map<String, List<JSONObject>> map) throws Exception {
                        return map.get("start").get(0).toJSONString();
                    }
                });
        DataStream<String> timeoutDS = selectDS.getSideOutput(timeoutTag);

        // TODO 8. 合并两种事件
        DataStream<String> unionDS = selectDS.union(timeoutDS);

        // TODO 9. 合并后的数据写入 kafka
        String targetTopic = "dwd_traffic_user_jump_detail";
        unionDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(targetTopic));

        // TODO 10. 启动任务
        env.execute("DwdTrafficUserJumpDetail");
    }
}

上面我们定义了两种匹配规则:

  1. 第一条数据的 last_page_id 为 null ,且超时没有收到第二条数据,认定该条数据为跳出数据
  2. 第二条数据的 last_page_id 为 null ,则认定第一条数据是跳出数据

        超时时间内规则一被满足,未等到第二条数据则会被判定为超时数据。所以我们只要把超时数据和 满足连续两条数据的 last_page_id 均为 null 中的第一条数据 union 起来,得到的即为答案所需数据;

总结

        至此,流量域的三个需求都已经完成;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1972963.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LangChain +Streamlit+ Llama :将对话式人工智能引入您的本地设备成为可能(上篇)

&#x1f99c;️ LangChain Streamlit&#x1f525; Llama &#x1f999;&#xff1a;将对话式人工智能引入您的本地设备&#x1f92f; 将开源LLMs和LangChain集成以进行免费生成式问答&#xff08;不需要API密钥&#xff09; 在过去的几个月中&#xff0c;大型语言模型(LLMs)得…

云计算学习——5G网络技术

系列文章目录 提示&#xff1a;仅用于个人学习&#xff0c;进行查漏补缺使用。 Day1 网络参考模型 Day2 网络综合布线与应用 Day3 IP地址 Day4 华为eNSP网络设备模拟器的基础安装及简单使用 Day5 交换机的基本原理与配置 Day6 路由器的原理与配置 Day7 网络层协议介绍一 Day8 …

【Pyhton】数据类型之详讲字符串(上)

本篇文章将详细讲解字符串&#xff1a; 1、定义 定义字符串时&#xff0c;字符串的内容被双引号&#xff0c;单引号&#xff0c;三单引号&#xff0c;三双引号中的其中一个被括住。 例如&#xff1a; 双引号&#xff1a; v1"haha" 单引号&#xff1a; v1hahah…

数据结构-1.初始包装类与泛型

本节目标:学习包装类与泛型为阅读java集合源码打下基础. 1.包装类 在 Java 中&#xff0c;由于基本类型不是继承自 Object &#xff0c;为了在泛型代码中可以支持基本类型&#xff0c; Java 给每个基本类型都对应了一个包装类型. 1.1基本数据类型对应包装类 基本数据类型包装类…

编码的理解

人类的文字信息由各种各样的符号构成 文字-----符号 这些符号是不能在磁盘或内存当中存储的&#xff0c;计算机最早是由老美发明出来的&#xff0c;内存和磁盘当中只有 0&#xff0c;1&#xff0c;严格来说内存和磁盘中只能表示整型&#xff0c;那么我们应该怎么表示符号----…

linux进程控制——进程替换——exec函数接口

前言&#xff1a; 本节内容进入linux进程控制板块的最后一个知识点——进程替换。 通过本板块的学习&#xff0c; 我们了解了进程的基本控制方法——进程创建&#xff0c; 进程退出&#xff0c; 进程终止&#xff0c; 进程替换。 进程控制章节和上一节进程概念板块都是在谈进程…

智算新风向丨趋动科技获中国信通院泰尔实验室首张智算资源池化能力泰尔测评证书

近日&#xff0c;趋动科技“OrionX AI算力资源池化软件”经中国泰尔实验室依据《FG-Z14-0172-01智算资源池化平台测试方案》评估测试&#xff0c;获得智算资源池化能力泰尔测评证书&#xff0c;成为该领域首个完成此评价的产品。 图1.OrionX通过智算资源池化平台评测 随着AI大…

智能巡检企业级域名 SSL 证书

一、概述 SSL 证书是一种数字证书&#xff0c;用于在用户和服务器之间建立加密链接&#xff0c;确保数据传输的安全性&#xff0c;防止数据在传输过程中被截获或篡改。SSL 证书不仅保护了数据传输过程中的隐私和完整性&#xff0c;还可以帮助验证网站的身份&#xff0c;防止钓…

基于SpringBoot+Vue的科研管理系统(带1w+文档)

基于SpringBootVue的科研管理系统(带1w文档) 基于SpringBootVue的科研管理系统(带1w文档) 科研的管理系统设计过程中采用Java开发语言,B/S结构&#xff0c;采取springboot框架&#xff0c;并以MySql为数据库进行开发。结合以上技术&#xff0c;对本系统的整体、数据库、功能模块…

鸿蒙系统开发【加解密算法库框架】安全

加解密算法库框架 介绍 本示例使用ohos.security.cryptoFramework相关接口实现了对文本文件的加解密、签名验签操作。 实现场景如下&#xff1a; 1&#xff09;软件需要加密存储本地文本文件&#xff0c;需要调用加解密算法库框架选择密钥文本文件&#xff0c;对本地文本文…

PostgreSQL11 | 触发器

本文章代码已在pgsql11.22版本上运行且通过&#xff0c;展示页由pgAdmin8.4版本提供 上一篇总结了原著的第十章有关pgsql的视图的用法&#xff0c;本篇将总结pgsql的触发器的用法。 触发器 使用触发器可以自动化完成一些在插入数据或修改数据时&#xff0c;某些需要同期同步的…

【vulnhub】Depth靶机

靶机安装 下载地址&#xff1a;https://download.vulnhub.com/depth/DepthB2R.ova 运行环境&#xff1a;VMware运行该虚拟机 信息收集 靶机IP发现 nmap 192.168.93.0/24 端口扫描 nmap -A 192.168.93.154 -p- 只开起了8080端口 进行目录扫描8080端口 dirsearch -u http…

学习记录(9):Prompt提示词技巧

依旧照例先感谢前辈们的慷慨分享 今天学习的是这篇文章↓ 原文&#xff1a;转自公主号“博金斯的AI笔记” —《4篇Prompt论文的提示词技巧, 3 个 GPTs 实例测试》 文章目录 一、提示词框架二、逻辑链&#xff08;Chain of thought&#xff09;三、思维树&#xff08;Tree of th…

华为Atlas 500 智能小站+鸿蒙AI大算力边缘计算解决方案

Atlas 500 智能小站鸿蒙大算力解决方案 技术规格 表3-1 技术规格 项目 说明 处理器 海思Hi3559A处理器 双核ARM Cortex A731.6GHz&#xff0c;32KB I-Cache&#xff0c;64KB D-Cache /512KB L2 cache双核ARM Cortex A531.2GHz&#xff0c;32KB I-Cache&#xff0c;32KB D-C…

如何确保PLC系统安全的可靠性,这几个注意事项你需要牢记

PLC&#xff08;可编程逻辑控制器&#xff09;是现代工业自动化系统中的关键组成部分。在设计 PLC 系统时&#xff0c;安全性是至关重要的考虑因素。本文将介绍 PLC 系统设计中的一些安全注意事项&#xff0c;包括电源设计、接地设计、关键数字量输入输出设计和报警设计。 一.…

阿一网络安全学院之绕过CDN发现服务器真实IP

方法一&#xff1a; 探测次级域名 方法二&#xff1a; 访问文件 使用访问错误目录获得报错信息成功 方法三&#xff1a; 去掉 www 查询 方法四&#xff1a; 使用网络测绘搜索引擎查询 和方法二获得的 IP 一样 方法五&#xff1a; 使用外国节点查询&#xff1a; 用了无数…

关于使用php的mpdf插件遇到的一些问题

一.插件版本 "mpdf/mpdf": "^8.0", 二.报错&#xff1a;Undefined index: list_style_type 这个是插件无法识别 li 标签导致&#xff0c;生成pdf是加入下面代码 <style> li { list-style-type: none; list-style-image: none; list-style-positi…

HTML-03.新浪新闻-标题-样式2

1.<span>标签 <span>是一个在开发网页时大量使用的没有语义的布局标签 特点&#xff1a;一行可以显示多个&#xff08;行内组合元素&#xff09;&#xff0c;宽度和高度默认由内容撑开 2.CSS选择器 2.1 元素选择器&#xff1a;标签名{...} 元素名称{ color:red; }…

如何在 Debian 上安装运行极狐GitLab Runner?【一】

极狐GitLab 是 GitLab 在中国的发行版&#xff0c;专门面向中国程序员和企业提供企业级一体化 DevOps 平台&#xff0c;用来帮助用户实现需求管理、源代码托管、CI/CD、安全合规&#xff0c;而且所有的操作都是在一个平台上进行&#xff0c;省事省心省钱。可以一键安装极狐GitL…

04-Fastjson反序列化漏洞

免责声明 本文仅限于学习讨论与技术知识的分享&#xff0c;不得违反当地国家的法律法规。对于传播、利用文章中提供的信息而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;本文作者不为此承担任何责任&#xff0c;一旦造成后果请自行承担&…