Spark 分析计算连续三周登录的用户数

news2024/9/29 3:29:34

前言:本文用到了窗口函数 range between,可以参考这篇博客进行了解——窗口函数rows between 、range between的使用

创建数据环境

在 MySQL 中创建数据测试表 log_data

create table if not exists  log_data(
log_id varchar(200) comment '日志id',
user_id  varchar(200) comment '用户id', 
log_time datetime NULL DEFAULT NULL comment '登录时间');

插入数据:

insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('1', '2022-03-10 10:08:13', '1000');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('2', '2022-03-18 10:33:22', '1000');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('3', '2022-03-26 18:59:19', '1000');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('4', '2022-03-03 20:59:13', '1001');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('5', '2022-03-10 05:53:49', '1001');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('6', '2022-02-26 02:27:51', '1001');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('7', '2022-03-01 20:59:13', '1002');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('8', '2022-03-07 05:53:49', '1002');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('9', '2022-02-28 02:27:51', '1002');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('10', '2022-02-27 20:59:13', '1003');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('11', '2022-03-05 05:53:49', '1003');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('12', '2022-03-12 02:27:51', '1003');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('13', '2022-02-28 20:59:13', '1004');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('14', '2022-03-05 05:53:49', '1004');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('15', '2022-03-18 02:27:51', '1004');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('16', '2022-02-25 20:59:13', '1005');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('17', '2022-03-04 05:53:49', '1005');
insert into `log_data` (`log_id`, `log_time`, `user_id`) values ('18', '2022-03-11 02:27:51', '1005');

需求介绍

login_time 字段值为 2022-03-10 的最近连续三周登录的用户数,最终使用 Spark 中的 show 算子输出如下字段:

字段名介绍备注
end_date数据统计日期2022-03-10
active_total活跃用户数
date_range统计周期格式:统计开始时间_结束时间

需求分析

根据 login_time 字段值为 2022-03-10 的最近连续三周登录的用户数,想要完成这个需求,我们得先拿到最近三周的开始时间和结束时间,然后筛选范围数据,最后利用窗口函数计算其连续性。

  1. 确定当前 2022-03-10 是周几,然后求得周日的日期(也就是这三周的最后一天)。
  2. 拿到 2022-03-10 这周的周日时间后,获取两周前的开始日期(也就是这三周的第一天)。
  3. 筛选范围,计算每位用户是否符合三周的连续性。

需求实现

import org.apache.spark.sql.SparkSession

object Get3WeekUserCnt {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("Get3WeekUserCnt").master("local[*]").getOrCreate()

    // 1.读取 MySQL 数据
    spark.read.format("jdbc")
      .option("driver","com.mysql.jdbc.Driver")
      .option("url","jdbc:mysql://master:3306/test")
      .option("user","root")
      .option("password","123456")
      .option("dbtable","log_data")
      .load()
      .createTempView("data")

    // 2.获取 2022-03-10 这周的周日时间
    spark.sql(
      """
        |select
        |   user_id,
        |   log_time, 
        |   date_add("2022-03-10",if(pmod(datediff("2022-03-10","1970-01-01")-3,7)=0,0,7-pmod(datediff("2022-03-10","1970-01-01")-3,7))) date_end
        |from
        |   data
        |""".stripMargin).createTempView("first_data")

    // 3.获取两周前的日期,筛选符合要求的数据
    spark.sql(
      """
        |select
        |   *
        |from
        |   (select
        |      user_id,
        |      date(log_time) log_date,
        |      date_end,
        |      date_sub(date_end,20) date_begin
        |   from
        |      first_data)t1
        |where
        |   log_date <= date_end
        |   and
        |   log_date >= date_begin
        |""".stripMargin).createTempView("second_data")

    // 4.计算各个用户三周连续性
    spark.sql(
      """
        |select
        |   end_date,
        |   count(distinct user_id) active_total,
        |   date_range
        |from
        |   (select
        |      "2022-03-10" end_date,
        |       user_id,
        |       concat(date_begin,"_",date_end) date_range,
        |       count(user_id) over(partition by user_id order by unix_timestamp(log_date,"yyyy-MM-dd") range between current row and 6*3600*24 following) cnt_1week,
        |       count(user_id) over(partition by user_id order by unix_timestamp(log_date,"yyyy-MM-dd") range between 7*3600*24 following and 13*3600*24 following) cnt_2week,
        |       count(user_id) over(partition by user_id order by unix_timestamp(log_date,"yyyy-MM-dd") range between 14*3600*24 following and 20*3600*24 following) cnt_3week
        |   from
        |      second_data)t1
        |where
        |   cnt_1week >= 1
        |   and
        |   cnt_2week >= 1
        |   and
        |   cnt_3week >= 1
        |group by
        |   end_date,
        |   date_range
        |""".stripMargin).show()
    
    spark.stop()
    
  }

}

输出结果如下:

结果验证:

输出结果有误!

通过对日历与日期进行对比,我们可以发现,实际上一共有3位用户满足连续登录三个月的请求。那么这是什么原因呢?首先,我们主要来看一下日期计算的这部分代码:

count(user_id) over(partition by user_id order by unix_timestamp(log_date,"yyyy-MM-dd") range between current row and 6*3600*24 following) cnt_1week,

count(user_id) over(partition by user_id order by unix_timestamp(log_date,"yyyy-MM-dd") range between 7*3600*24 following and 13*3600*24 following) cnt_2week,

count(user_id) over(partition by user_id order by unix_timestamp(log_date,"yyyy-MM-dd") range between 14*3600*24 following and 20*3600*24 following) cnt_3week

其中,第一行 cnt_1week 统计的是各个用户第一周登录的次数,后面两行以此类推。我们将用户 1001 代入其中,很快就发现了问题,如下:

我们设置的时间跨度为一周,但是没有考虑到该日期为本周的第几天。通过用户 1001 可以看出,如果是这样算的话,就相当于我们把每个日期都当成了每周的第一天。就比如我们这里将 26 号作为了第一天,而六天后,也就是3月4号,变成了这周的最后一天。它将2.26号——3.4号作为了一个自然周。很明显,我们这样计算是错误的。

解决方法也很简单,我们可以通过一个字段来记录每个用户登录日期的那一周的周天日期,然后在日期计算的时候将该字段作为计算的考量日期。

修改后代码如下:

import org.apache.spark.sql.SparkSession

object Get3WeekUserCnt {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("Get3WeekUserCnt").master("local[*]").getOrCreate()

    // 1.读取 MySQL 数据
    spark.read.format("jdbc")
      .option("driver","com.mysql.jdbc.Driver")
      .option("url","jdbc:mysql://master:3306/test")
      .option("user","root")
      .option("password","123456")
      .option("dbtable","log_data")
      .load()
      .createTempView("data")

    // 2.获取 2022-03-10 这周的周日时间
    spark.sql(
      """
        |select
        |   user_id,
        |   date(log_time) log_date,
        |   date_add("2022-03-10",if(pmod(datediff("2022-03-10","1970-01-01")-3,7)=0,0,7-pmod(datediff("2022-03-10","1970-01-01")-3,7))) date_end
        |from
        |   data
        |""".stripMargin).createTempView("first_data")

    // 3.获取两周前的日期与当前日期周天的日期,筛选符合要求的数据
    spark.sql(
      """
        |select
        |   *
        |from
        |   (select
        |      user_id,
        |      date_add(log_date,if(pmod(datediff(log_date,"1970-01-01")-3,7)=0,0,7-pmod(datediff(log_date,"1970-01-01")-3,7))) log_week_end_date,
        |      date_end,
        |      date_sub(date_end,20) date_begin
        |   from
        |      first_data)t1
        |where
        |   log_date <= date_end
        |   and
        |   log_date >= date_begin
        |""".stripMargin).createTempView("second_data")

    // 4.计算各个用户三周连续性
    spark.sql(
      """
        |select
        |   end_date,
        |   count(distinct user_id) active_total,
        |   date_range
        |from
        |   (select
        |      "2022-03-10" end_date,
        |	   user_id,
        |      concat(date_begin,"_",date_end) date_range,
        |      count(user_id) over(partition by user_id order by unix_timestamp(log_week_end_date,"yyyy-MM-dd") range between 6*3600*24 preceding and current row) cnt_1week,
        |      count(user_id) over(partition by user_id order by unix_timestamp(log_week_end_date,"yyyy-MM-dd") range between 1*3600*24 following and 7*3600*24 following) cnt_2week,
        |      count(user_id) over(partition by user_id order by unix_timestamp(log_week_end_date,"yyyy-MM-dd") range between 8*3600*24 following and 14*3600*24 following) cnt_3week
        |   from
        |      second_data)t1
        |where
        |   cnt_1week >= 1
        |   and
        |   cnt_2week >= 1
        |   and
        |   cnt_3week >= 1
        |group by
        |   end_date,
        |   date_range
        |""".stripMargin).show()

    spark.stop()

  }

}

注意,最后统计数量的时候不要忘记对用户的 ID 进行去重!

输出结果如下:

Bingo~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/375486.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

能在软路由docker给部署搭建teamsperk服务器么?并且设置好ddns

参考链接(4条消息) 【个人学习总结】使用docker搭建Teamspeak服务器_blcurtain的博客-CSDN博客_teamspeak3 docker(⊙﹏⊙)哎呀&#xff0c;崩溃啦&#xff01; (tdeh.top)TeamSpeak服务器搭建与使用 - 缘梦の镇 (cmsboy.cn)Openwrt X86 docker运行甜糖-软路由,x86系统,openwrt…

(四)K8S 安装 Nginx Ingress Controller

ingress-nginx 是 Kubernetes 的入口控制器&#xff0c;使用NGINX作为反向代理和负载均衡器 版本介绍 版本1&#xff1a;Ingress NGINX Controller(k8s社区的ingres-nginx) 以 NGINX 开源技术为基础&#xff08;kubernetes.io&#xff09;&#xff0c;可在GitHub的 kubernet…

如何创建并管理一个刷题小组?

“如何收回用户对题库的使用权”&#xff0c;这是一个大多数题库创建人都会碰到的管理问题&#xff0c;也是日常咨询频繁的问题。土著刷题在v1.10版本已经上线了小组模块功能&#xff0c;小组拥有丰富的用户管理功能&#xff0c;可以管理组员对于题库的使用权进行有效的管理。咱…

高压放大器在应力波法套筒灌浆密实度检测研究中的应用

实验名称&#xff1a;高压放大器在应力波法套筒灌浆密实度检测研究中的应用研究方向&#xff1a;无损检测测试目的&#xff1a;钢筋套筒灌浆连接技术被广泛应用于装配式建筑节点连接中&#xff0c;但灌浆不密实将导致节点失效的风险。因此&#xff0c;施工中对套筒灌浆的密实度…

使用xca工具生成自签证书

本文使用 xca 生成自签证书。 概述 之前使用 openssl 生成证书&#xff0c;在 golang 中测试&#xff0c;发现客户端连接失败&#xff0c;经查发现是Subject Alternative Name不支持导致的。因虚拟机 openssl 版本较低&#xff0c;有个功能无法实现&#xff0c;且升级麻烦&…

SAP SD模块学习总结2 2023.2.27

https://www.cnblogs.com/jiangzhengjun/p/7264657.html#_Toc410466840 首先是表&#xff1a; VBAK: 销售订单抬头 VBAP: 销售订单项目 VBUK: 抬头状态 VBUP: 行项目状态 VBKD:销售凭证&#xff1a; 业务数据 VBPA: 销售凭证: 合作伙伴 VBEP&#xff1a;销售凭证&#xff1a;…

【论文速递】COLING 2022 - 带有事件论元相关性的事件因果关系抽取

【论文速递】COLING 2022 - 带有事件论元相关性的事件因果关系抽取 【论文原文】&#xff1a;Event Causality Extraction with Event Argument Correlations 【作者信息】&#xff1a;Cui, Shiyao and Sheng, Jiawei and Cong, Xin and Li, Quangang and Liu, Tingwen and S…

Android NDK动态加载SO库

背景对于一个普通的android应用来说&#xff0c;so库的占比通常都是巨高不下的&#xff0c;因为我们无可避免的在开发中遇到各种各样需要用到native的需求&#xff0c;所以so库的动态化可以减少极大的包体积&#xff0c;自从2020腾讯的bugly团队发部关于动态化so的相关文章后&a…

fuse:纠结的page下刷流程之fuse_writepage_in_flight

fuse&#xff1a;纠结的page下刷流程细节之fuse_writepage_in_flightfuse_writepage_in_flight硬爬代码自己理解消化作者本人如是说fuse_writepage_in_flight 先说下这个函数&#xff0c;位于fs/fuse/file.c&#xff0c;这里以4.19内核来分析。因为这个函数里面藏了很多小细节…

华为OD机试模拟题 用 C++ 实现 - 删除指定目录(2023.Q1)

最近更新的博客 【华为OD机试模拟题】用 C++ 实现 - 最多获得的短信条数(2023.Q1)) 文章目录 最近更新的博客使用说明删除指定目录题目输入输出示例一输入输出说明Code使用说明 参加华为od机试,一定要注意不要完全背诵代码,需要理解之后模仿写出,通过率才会高。 华为…

Macbook M1 安装PDI(Kettle) 9.3

Macbook M1 安装PDI(Kettle) 9.3 当前 PDI&#xff08;Kettle&#xff09;最新版为9.3&#xff0c;依赖Java JDK 11。因为没有专门用于 M1的程序&#xff0c;需要下载并安装x86_64架构的JDK及依赖软件&#xff0c;并 “强制在Intel模式下运行shell” 的方式来实现 Kettle 的正…

骨传导蓝牙耳机排行,盘点几款性能不错的骨传导耳机

随着蓝牙耳机的普及&#xff0c;骨传导耳机也越来越受到欢迎&#xff0c;很多人也都开始在了解并尝试骨传导耳机。相比于其他类型耳机&#xff0c;在舒适度、安全方面有一定优势。尤其是在户外运动时&#xff0c;或者长时间佩戴运动时&#xff0c;使用骨传导耳机可以避免耳朵因…

从“入门”到“专家”,一份3000字完整的性能测试体系的知识分享

随着科技的飞速发展&#xff0c;软件产品广泛应用于各个行业领域&#xff0c;人们对计算机和网络的依赖性越来越大&#xff0c;对新奇事物也越来越感兴趣&#xff0c;成千上万的用户活跃在庞大的网络系统中&#xff0c;这给提供服务的系统带来严重的负荷&#xff0c;"高并…

QT之图形视图框架概述——Graphics View Framework

QT之图形视图框架概述——Graphics View Framework1. 概述2. 核心类3. 事件传递4. Graphics View 坐标系统5. 参考1. 概述 Graphics View Framework是子Qt 4.2引入的&#xff0c;用来取代之前版本中的QCanvas。Graphics View Framework提拱了用于大量2D图形项的管理和交互的能…

Spring Boot 统一功能处理(用户登录权限效验-拦截器、异常处理、数据格式返回)

文章目录1. 统一用户登录权限效验1.1 最初用户登录权限效验1.2 Spring AOP 统一用户登录验证1.3 Spring 拦截器1.4 练习&#xff1a;登录拦截器1.5 拦截器实现原理1.6 统一访问前缀添加2. 统一异常处理3. 统一数据格式返回3.1 统一数据格式返回的实现3.2 ControllerAdvice 源码…

day21_IO

今日内容 上课同步视频:CuteN饕餮的个人空间_哔哩哔哩_bilibili 同步笔记沐沐霸的博客_CSDN博客-Java2301 零、 复习昨日 一、作业 二、File 三、IO流 四、字节输入&输出流 零、 复习昨日 见晨考 一、作业 见答案二、File 2.1 介绍 File,通过一个路径代表文件或者文件夹 …

Panda Farm:首个部署在 Arbitrum 上的轻量化 GameFi 游戏

在2月16日&#xff0c;Bitget平台宣布 Launchpad 重新启动&#xff0c;并推出了重启后的首个项目 Panda Farm&#xff08;BBO&#xff09;&#xff0c;该 Launchpad 启动后得到了较高的关注。 Panda Farm 是部署在 Arbitrum 上的 GameFi应用&#xff0c;这可能首先意味着 Bitge…

技术干货 | Modelica建模秘籍之状态变量

在很多领域都有“系统”这个概念&#xff0c;它描述的往往是一些复杂关系的总和。假如我们将系统看做一个黑箱&#xff0c;那么&#xff0c;在系统的作用下&#xff0c;外界的输入有时会产生令人意想不到的输出&#xff0c;“蝴蝶效应”就是其中的典型案例。图1 一只南美洲亚马…

RPC编程:RPC框架设计目标

一&#xff1a;前导知识 Http是超文本传输协议&#xff0c;跨平台性非常好。Http可以传输文本&#xff0c;更多的时候传输的是文本&#xff0c;我们也是可以传输二进制的&#xff0c;我们基于Http进行下载的时候&#xff0c;就是走的Http协议。 Tcp协议&#xff0c;处理的时候…

OpenShift 4 - 使用辅助安装器安装单节点 OpenShift

文章目录单节点 OpenShift 和 OpenShift 辅助安装器单节点 OpenShiftOpenShift 辅助安装器使用辅助安装器安装单节点 OpenShift本文使用的安装环境准备环境在宿主机上安装 KVM 环境创建 SSH 证书根据集群配置&#xff0c;用辅助安装器生成 Discovery ISO用 Discovery ISO 启动 …