【实时数仓】DWM层跳出明细计算之需求分析、读取数据、通过Flink的CEP完成跳出判断、写回kafka、测试

news2025/1/25 4:44:49

文章目录

  • 一 DWM层-跳出明细计算
    • 1 需求分析与思路
      • (1)什么是跳出
      • (2)计算跳出行为的思路
      • (3)实现思路
    • 2 读取数据
      • (1)代码编写
      • (2)测试
    • 3 通过Flink的CEP完成跳出判断
      • (1)确认添加了CEP的依赖包
      • (2)设定时间语义为事件时间并指定数据中的ts字段为事件时间
      • (3)根据日志数据的mid进行分组
      • (4)配置CEP表达式
      • (5)根据表达式筛选流
      • (6)提取命中的数据
        • a 从模式中提取
        • b 处理超时的部分匹配
        • c 便捷的API
      • (7)利用测试数据完成测试
    • 4 写回kafka
    • 5 测试

一 DWM层-跳出明细计算

1 需求分析与思路

(1)什么是跳出

跳出: 用户成功访问了网站的一个页面后就退出,不再继续访问网站的其它页面。而跳出率就是用跳出次数除以访问次数。

关注跳出率,可以看出从某几个网站引流过来的访客是否能很快的被吸引,渠道引流过来的用户之间的质量对比,对于应用优化前后跳出率的对比也能看出优化改进的成果。

(2)计算跳出行为的思路

首先要识别哪些是跳出行为,要把这些跳出的访客最后一个访问的页面识别出来。那么要抓住几个特征:

  • 该页面是用户近期访问的第一个页面(新的会话)

    这个可以通过该页面是否有上一个页面(last_page_id)来判断,如果这个表示为空,就说明这是这个访客这次访问的第一个页面。

  • 首次访问之后很长一段时间(自己设定,一般为30min),用户没继续再有其他页面的访问。

这第一个特征的识别很简单,保留last_page_id为空的就可以了。但是第二个访问的判断,其实有点麻烦,首先这不是用一条数据就能得出结论的,需要组合判断,要用一条存在的数据和不存在的数据进行组合判断。而且要通过一个不存在的数据求得一条存在的数据。更麻烦的它并不是永远不存在,而是在一定时间范围内不存在。那么如何识别有一定失效的组合行为呢?

最简单的办法就是Flink自带的复杂事件处理(CEP)技术。CEP非常适合通过多条数据组合来识别某个事件。

用户跳出事件,本质上就是一个条件事件加一个超时事件的组合。

(3)实现思路

实现思路如下:

在这里插入图片描述

  • 从kafka中读取数据。
  • 使用CEP对数据进行过滤。
    • 该页面是用户近期访问的第一个页面。
    • 如果在指定时间内,有当前设备对网站其他页面的访问,说明发生了跳转。
    • 反之,则发生了跳出。
    • 可以使用within指定匹配的时间;涉及到了时间,flink1.12默认的时间语义就是事件时间语义,需要指定watermark以及提取事件时间字段。
  • 使用CEP编程步骤
    • 定义pattern
    • 将pattern应用到流上
    • 从流中按照指定的模式提取数据

在这里插入图片描述

2 读取数据

从kafka的dwd_page_log主题中读取页面日志。

(1)代码编写

/**
 * 用户跳出明细统计
 */
public class UserJumpDetailAPP {
    public static void main(String[] args) throws Exception {
        //TODO 1 基本环境准备
        //1.1 流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度
        env.setParallelism(4);
        //TODO 2 检查点设置(略)

        //TODO 3 从kafka中读取数据
        //3.1 声明消费主题以及消费者组
        String topic = "dwd_page_log";
        String groupId = "user_jump_detail_app_group";
        //3.2 获取kafka消费者对象
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        //3.3 读取数据封装流
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

        //TODO 4 对读取的数据进行类型转换 String -> JSONObject
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);
        jsonObjDS.print(">>>");

        env.execute();
    }
}

(2)测试

启动相关进程,模拟日志生成,查看是否可以正常接收到数据。

3 通过Flink的CEP完成跳出判断

(1)确认添加了CEP的依赖包

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

(2)设定时间语义为事件时间并指定数据中的ts字段为事件时间

由于这里涉及到时间的判断,所以必须设定数据流的EventTime和水位线。这里没有设置延迟时间,实际生产情况可以视乱序情况增加一些延迟。

增加延迟把forMonotonousTimestamps换为forBoundedOutOfOrderness即可。

注意:flink1.12默认的时间语义就是事件时间,所以不需要执行。

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// TODO 5.指定watermark以及提取事件时间字段
SingleOutputStreamOperator<JSONObject> jsonObjWithWatermark = jsonObjDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
                .withTimestampAssigner(
                        new SerializableTimestampAssigner<JSONObject>() {
                            @Override
                            public long extractTimestamp(JSONObject jsonObj, long recordTimestamp) {
                                return jsonObj.getLong("ts");
                            }
                        }
                )
);

(3)根据日志数据的mid进行分组

因为用户的行为都是要基于相同的Mid的行为进行判断,所以要根据Mid进行分组。

// TODO 6 按照mid进行分组
KeyedStream<JSONObject, String> keyedDS = jsonObjWithWatermark.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));

(4)配置CEP表达式

跳出 or 跳转的3个条件

  • 必须是一个新会话,lastPageId为空
  • 访问了其他页面
  • 不能超过一定时间
// TODO 7 定义pattern
Pattern<JSONObject, JSONObject> pattern = Pattern.<JSONObject>begin("first").where(
        // 条件1:开启一个新的会话访问
        new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObj) throws Exception {
                String lastPageId = jsonObj.getJSONObject("page").getString("last_page_id");
                if (lastPageId == null || lastPageId.length() == 0) {
                    return true;
                }
                return false;
            }
        }
).next("second").where(
        // 条件2:访问了网站的其他页面
        new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObj) throws Exception {
                String pageId = jsonObj.getJSONObject("page").getString("page_id");
                if (pageId != null && pageId.length() > 0) {
                    return true;
                }
                return false;
            }
        }
)
        // within方法:定文匹配模式的事件序列出现的最大时间间隔。
        // 如果未完成的事件序列超过了这个事件,就会被丢弃:
        .within(Time.seconds(10));

(5)根据表达式筛选流

// TODO 8 将pattern应用到流上
PatternStream<JSONObject> patternDS = CEP.pattern(keyedDS, pattern);

(6)提取命中的数据

a 从模式中提取

在获得到一个PatternStream之后,可以应用各种转换来发现事件序列。推荐使用PatternProcessFunction

PatternProcessFunction有一个processMatch的方法在每找到一个匹配的事件序列时都会被调用。 它按照Map<String, List<IN>>的格式接收一个匹配,映射的键是模式序列中的每个模式的名称,值是被接受的事件列表(IN是输入事件的类型)。 模式的输入事件按照时间戳进行排序。为每个模式返回一个接受的事件列表的原因是当使用循环模式(比如oneToMany()times())时, 对一个模式会有不止一个事件被接受。

官网说明:从模式中提取。

b 处理超时的部分匹配

当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。可以使用TimedOutPartialMatchHandler接口 来处理超时的部分匹配。这个接口可以和其它的混合使用。也就是说你可以在自己的PatternProcessFunction里另外实现这个接口。 TimedOutPartialMatchHandler提供了另外的processTimedOutMatch方法,这个方法对每个超时的部分匹配都会调用。

c 便捷的API

前面提到的PatternProcessFunction是在Flink 1.8之后引入的,从那之后推荐使用这个接口来处理匹配到的结果。 用户仍然可以使用像select/flatSelect这样旧格式的API,它们会在内部被转换为PatternProcessFunction

使用方法:

  • 设定超时时间标识 timeoutTag。
  • flatSelect方法中,实现PatternFlatTimeoutFunction中的timeout方法。
  • 所有out.collect的数据都被打上了超时标记。
  • 本身的flatSelect方法因为不需要未超时的数据所以不接受数据。
  • 通过SideOutput侧输出流输出超时数据。
        // TODO 9 从流中提取数据
        //9.1 定义侧输出流标记,FlinkCDC会将超时数据匹配放到侧数据流中
        OutputTag<String> timeoutTag = new OutputTag<>("timeoutTag");
        // 9.2 提取数据
//        patternDS.select(
//                timeoutTag,
//                new PatternTimeoutFunction<JSONObject, String>() {
//                    @Override
//                    public String timeout(Map<String, List<JSONObject>> pattern, long timestamp) throws Exception {
//                        return null;
//                    }
//                },
//                new PatternSelectFunction<JSONObject, String>() {
//                    @Override
//                    public String select(Map<String, List<JSONObject>> map) throws Exception {
//                        return null;
//                    }
//                }
//        );

        SingleOutputStreamOperator<String> resDS = patternDS.flatSelect(
                timeoutTag,
                // 处理超时数据 -- 跳出,需要进行统计
                new PatternFlatTimeoutFunction<JSONObject, String>() {
                    @Override
                    public void timeout(Map<String, List<JSONObject>> pattern, long timestamp, Collector<String> out) throws Exception {
                        List<JSONObject> jsonObjectList = pattern.get("first");
                        for (JSONObject jsonObj : jsonObjectList) {
                            out.collect(jsonObj.toJSONString());
                        }
                    }
                },
                // 处理完全匹配的数据 -- 跳转,不在此需求统计范围之内
                new PatternFlatSelectFunction<JSONObject, String>() {
                    @Override
                    public void flatSelect(Map<String, List<JSONObject>> map, Collector<String> collector) throws Exception {

                    }
                }
        );

        // 9.3 从侧输出流中获取超时数据(跳出)
        DataStream<String> jumpDS = resDS.getSideOutput(timeoutTag);
        
        jumpDS.print(">>>>");

(7)利用测试数据完成测试

{common:{mid:101},page:{page_id:home},ts:10000} ,
{common:{mid:102},page:{page_id:home},ts:12000},
{common:{mid:102},page:{page_id:good_list,last_page_id:home},ts:15000},
{common:{mid:102},page:{page_id:good_list,last_page_id:detail},ts:30000} 
        //3.3 读取数据封装流
//        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        DataStream<String> kafkaDS = env
                .fromElements(
                        "{\"common\":{\"mid\":\"101\"},\"page\":{\"page_id\":\"home\"},\"ts\":10000} ",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"home\"},\"ts\":12000}",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                                "\"home\"},\"ts\":15000} ",
                        "{\"common\":{\"mid\":\"102\"},\"page\":{\"page_id\":\"good_list\",\"last_page_id\":" +
                                "\"detail\"},\"ts\":30000} "
                );

输出结果:

>>>>:3> {"common":{"mid":"101"},"page":{"page_id":"home"},"ts":10000}

4 写回kafka

将跳出数据写回到kafka的DWM层。

// TODO 10 将跳出明细写到kafka的dwm层主题
jumpDS.addSink(MyKafkaUtil.getKafkaSink("dwm_user_jump_detail"));

5 测试

将测试数据注释掉,打开kafka数据源。

打开zookeeper、kafka、日志采集服务、kafka消费者、BaseLogApp、UserJumpDetailAPP,查看输出结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/98247.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【MATLAB100个实用小技巧】——数值分析(85-100)

文章目录前言系列文章85.86. 三次样条插值法87. NEWTON 插值88. hermite 插值89. newton 形式的 hermite 插值90. 平方根法91. gauss 消去法92. 三角分解法93. jacobi 迭代法94. gauss 迭代法95. sor 迭代法96. 最速下降法97. 共额梯度法98. newton 迭代法99. broyden 迭代法10…

前端媒体查询@media示例详解和calc()函数的使用

媒体查询media media 可以针对不同的屏幕尺寸设置不同的样式&#xff0c;特别是如果需要设置设计响应式的页面&#xff0c;media 是非常有用的。当重置浏览器大小的过程中&#xff0c;页面也会根据浏览器的宽度和高度重新渲染页面。 eg&#xff1a;如果文档宽度小于 500 像素…

pytorch 自编码器实现图像的降噪

自编码器 自动编码器是一种无监督的深度学习算法&#xff0c;它学习输入数据的编码表示&#xff0c;然后重新构造与输出相同的输入。它由编码器和解码器两个网络组成。编码器将高维输入压缩成低维潜在(也称为潜在代码或编码空间) &#xff0c;以从中提取最相关的信息&#xff…

SpringCloud之Hystrix

复杂分布式体系结构中的应用程序有数十个依赖关系&#xff0c;每个依赖关系在某些时候将不可避免失败&#xff01; 服务雪崩 多个微服务之间调用的时候&#xff0c;假设微服务A调用微服务B和微服务C&#xff0c;微服务B和微服务C又调用其他的微服务&#xff0c;这就是所谓的“…

HttpRunner3.x 安装与使用

HttpRunner3.x 安装与使用HttpRunner3.x 安装与使用安装使用运行脚手架项目方式一&#xff1a;录制生成用例步骤1&#xff1a;导出har文件步骤2&#xff1a;转化成测试用例文件har2casehmake步骤3&#xff1a;执行测试用例方式二&#xff1a;手工编写测试用例HttpRunner3.x 安装…

Jenkins自动部署springboot的Docker镜像,解决Status [1]问题

Jenkins凡是要指定路径的命令&#xff0c;一定要写绝对路径&#xff0c;不能写相对路径&#xff01;不要以为配置了Remote directory&#xff0c;那么命令就会在Remote directory下执行&#xff01;这种想法是错误的&#xff01;&#xff01;&#xff01; 《jenkins自动化发布到…

机器学习实战教程(五):朴素贝叶斯实战篇

一、前言 上篇文章机器学习实战教程&#xff08;四&#xff09;&#xff1a;朴素贝叶斯基础篇_M_Q_T的博客-CSDN博客讲解了朴素贝叶斯的基础知识。本篇文章将在此基础上进行扩展&#xff0c;你将看到以下内容&#xff1a; 拉普拉斯平滑垃圾邮件过滤(Python3)新浪新闻分类(skle…

毕业设计 - 基于Java的敬老院管理系统设计与实现【源码+论文】

文章目录前言一、项目设计1. 模块设计系统的主要功能性需求2. 实现效果二、部分源码项目源码前言 今天学长向大家分享一个 java web项目: 基于Java的敬老院管理系统设计与实现 一、项目设计 1. 模块设计 站在护工角度来看&#xff0c;他们迫切希望&#xff0c;在运用该系统…

【Flink】Flink Starting Offset 启动消费位置 指定时间消费

文章目录 1.概述2.测试3.源码1.概述 首先参考文章:【Flink】Flink 1.14.0 全新的 Kafka Connector Kafka Source 能够通过指定 OffsetsInitializer来消费从不同偏移量开始的消息。内置的初始值设定项包括: KafkaSource.builder()// Start from committed offset of the co…

【YOLOv7-环境搭建③】PyCharm安装和环境、解释器配置

下载链接&#xff1a; 来源&#xff1a;&#xff08;博主&#xff09;唐三. 链接:https://pan.baidu.com/s/1y6s_EScOqvraFcx7iPSy1g 提取码:m1oa 安装&#xff1a; 以管理员身份打开安装完成后&#xff0c;打开软件到达以下界面&#xff0c;框框全选到达以下界面&#xf…

【指纹识别】指纹识别匹配门禁系统【含GUI Matlab源码 587期】

⛄一、指纹识别简介 1 指纹识别的引入和原理 1.1 指纹的基本知识 指纹&#xff0c;由于其具有终身不变性、唯一性和方便性&#xff0c;已几乎成为生物特征识别的代名词。指纹是指人的手指末端正面皮肤上凸凹不平产生的纹线。纹线有规律的排列形成不同的纹型。纹线的起点、终点…

kotlin基础学习笔记第九章——泛型

实化类型参数允许你在运行时的内联函数中引用作为类型实参的具体类型&#xff08;对普通的类和函数来说&#xff0c;这样行不通&#xff0c;因为类型实参在运行时会被擦除&#xff09;。 声明点变型可以说明一个带类型参数的泛型类型&#xff0c;是否是另一个泛型类型的子类型或…

什么是MySQL插入意向锁?

Insert Intention Lock&#xff0c;中文我们也称之为插入意向锁。 这个可以算是对我们之前所讲的 Gap Lock 的一个补充&#xff0c;关于 Gap Lock&#xff0c;如果还有小伙伴不懂&#xff0c;可以参考&#xff1a;记录锁、间隙锁与 Next-Key Locks。 1. 为什么需要插入意向锁…

吃透Chisel语言.40.Chisel实战之单周期RISC-V处理器实现(下)——具体实现和最终测试

Chisel实战之单周期RISC-V处理器实现&#xff08;下&#xff09;——具体实现和最终测试 上一篇文章中我们对本项目的需求进行了分析&#xff0c;并得到了初步的设计&#xff0c;这一篇文章我们就可以基于该设计来实现我们的单周期RISC-V处理器了。实现之后也必须用实际代码来…

[ 数据结构 -- 手撕排序算法第三篇 ] 希尔排序

文章目录前言一、常见的排序算法二、希尔排序2.1 希尔排序(缩小增量排序)2.1.1 预排序阶段2.1.2 插入排序阶段2.2 单趟希尔排序2.2.1 思路分析三、希尔排序实现代码四、希尔排序的时间复杂度五、希尔排序和直接插入排序效率测试5.1 测试5.2 结论5.2.1 随机数比较5.2.2 有序数组…

【二维码识别】灰度+二值化+校正二维码生成与识别【含GUI Matlab源码 635期】

⛄一、二维码生成与识别简介 如今,移动互联网技术日新月异,随着5G时代的来临,广泛应用于数据处理过程中的二维码信息安全日益成为人们越来越关注的问题。以QR码为代表的二维码,以其在信息存储、传输和识别技术领域优异的表现,成为信息共享、移动支付等领域的宠儿。不可避免地,…

利用深度学习生成数据的时间序列预测(Matlab代码实现)

目录 &#x1f4a5;1 概述 &#x1f4da;2 运行结果 &#x1f389;3 参考文献 &#x1f468;‍&#x1f4bb;4 Matlab代码 &#x1f4a5;1 概述 数据分析研究目前仍是行业热点,相关学者从数据分析关键技术中的异常检测、入侵检测、时间序列预测等角度展开研究。然而,现有研…

Go环境搭建与IDE开发工具配置

安装Go语言编译器 Go语言编译器》编译器将源代码编译为可执行程序》源代码程序员使用高级语言所书写的代码文件》高级语言c/c/go…》机器语言0和1构成&#xff0c;机器能直接识别》汇编语言比机器语言稍微可读一点点的指令集 编译器下载地址 根据系统下载对应的go编译器版本…

微信小程序保存相册授权全过程:第一次授权、已授权、拒绝后再授权

微信小程序部分功能需要使用授权&#xff08;也就是需要用户显式同意&#xff0c;系统会阻止开发者任何静默获取授权行为&#xff09;&#xff0c;以存储相册为例&#xff0c;用户需要获得"scope.writePhotosAlbum"权限 微信系统接口wx.getSetting可以获取已经获得的…

MySQL连接数据库

①MySQLpymysql ②django开发操作数据库&#xff0c;orm框架 安装第三方模块&#xff1a;orm pip install mysqlclient ORM Django链接数据库 在settings.py中修改 查看创建的数据库的端口号和用户名&#xff1a; Django操作表&#xff1a; 创建表 models.py from django…