spark应用----统计分析电商网站的用户行为数据

news2024/9/20 1:01:21

目录

项目说明

题目一:Top5热门品类

题目二:Top5热门品类中每个品类的Top5活跃Session统计

 scala实现

 新建maven项目结构如下

 配置pom.xml文件

scala代码

python实现


项目说明

本项目的数据是采集电商网站的用户行为数据,主要包含用户的4种行为:搜索、点击、下单和支付。需求说明:品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。

数据格式如下:

编号

字段名称

字段类型

字段含义

1

date

String

用户点击行为的日期

2

user_id

Long

用户的ID

3

session_id

String

Session的ID

4

page_id

Long

某个页面的ID

5

action_time

String

动作的时间点

6

search_keyword

String

用户搜索的关键词

7

click_category_id

Long

点击某一个商品品类的ID

8

click_product_id

Long

某一个商品的ID

9

order_category_ids

String

一次订单中所有品类的ID集合

10

order_product_ids

String

次订单中所有商品的ID集合

11

pay_category_ids

String

一次支付中所有品类的ID集合

12

pay_product_ids

String

次支付中所有商品的ID集合

13

city_id

Long

城市 id

题目一:Top5热门品类

需求:品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。本项目需求为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。输出结果如下:

 

  1. 通过使用groupby实现,Top5热门品类,将聚合后的数据排序,取前5名
  2. 通过使用reduceByKey实现,Top5热门品类,将聚合后的数据排序,取前5名

题目二:Top5热门品类中每个品类的Top5活跃Session统计

需求:对于排名前5的品类,分别获取每个品类点击次数排名前5的sessionId。(注意: 这里我们只关注点击次数,不关心下单和支付次数)这个就是说,对于top5的品类,每一个都要获取对它点击次数排名前5的sessionId。这个功能,可以让我们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的session的行为。

  1. 输出结果


 scala实现

 新建maven项目结构如下

 配置pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>sparkrdd-141</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.1.0</spark.version>
    </properties>

    <dependencies>

        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Scala 标准库 -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

    </dependencies>


</project>

scala代码

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object Main {
        def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.WARN)

        val conf = new SparkConf().setMaster("local[*]").setAppName("WordCountHelloWorld")
        val sc = new SparkContext(conf)

        // 加载数据
        val rdd01 = sc.textFile("src/main/resources/user_visit_action.txt")
        val rdd02 = rdd01.map(_.split("_"))
        val rdd03 = rdd02.map(x => x.slice(6, x.length)) // ['-1', '-1', 'null', 'null', 'null', 'null', '18'], ['9', '51', 'null', 'null', 'null', 'null', '6']

        // 过滤点击、订单和支付数据
        val click_RDD = rdd03.filter(_(0) != "-1")
        val order_RDD = rdd03.filter(_(2) != "null")
        val pay_RDD = rdd03.filter(_(4) != "null")
        // ==================================================================================================================
        // 题目1
        // 使用groupBy
        val click_RDD11 = click_RDD.map(x => (x(0), 1))
        val click_RDD12 = click_RDD11.groupBy(_._1)
        val click_RDD13 = click_RDD12.mapValues(_.map(_._2).sum)
        // println(click_RDD13.collect())

        val order_RDD11 = order_RDD.flatMap(x => x(2).split(','))
        val order_RDD12 = order_RDD11.map(x => (x, 1))
        val order_RDD13 = order_RDD12.groupBy(_._1)
        val order_RDD14 = order_RDD13.mapValues(_.map(_._2).sum)
        // println(order_RDD14.collect())

        val pay_RDD11 = pay_RDD.flatMap(x => x(4).split(','))
        val pay_RDD12 = pay_RDD11.map(x => (x, 1))
        val pay_RDD13 = pay_RDD12.groupBy(_._1)
        val pay_RDD14 = pay_RDD13.mapValues(_.map(_._2).sum)
        // println(pay_RDD14.collect())

        val end_RDD10 = click_RDD13.join(order_RDD14).join(pay_RDD14)  // ('16', ((5928, 1782), 1233)), ('19', ((6044, 1722), 1158)),
        val end_RDD11 = end_RDD10.map(x => (x._1.toInt, x._2._1._1, x._2._1._2, x._2._2))

        // 缓存一下
        // end_RDD11.cache()
        end_RDD11.persist(StorageLevel.DISK_ONLY)

        // 法1 使用top
        val end_RDD_top = end_RDD11.top(5)(Ordering[(Int, Int, Int)].on(x => (x._2, x._3, x._4)))
        println('\n' + end_RDD_top.mkString("\n"))

        // 法2 使用sortBy
        val end_RDD_sort = end_RDD11.sortBy(x => (-x._2, -x._3, -x._4))
        println('\n' + end_RDD_sort.take(5).mkString("\n"))

        // ==================================================================================================================
        // 题目2
        // reduceByKey实现
        // 点击
        val click_RDD01 = click_RDD.map(x => (x(0), 1))
        val click_RDD02 = click_RDD01.reduceByKey(_ + _)
        // println('\n' + click_RDD02.collect())

        // 订单
        val order_RDD01 = order_RDD.flatMap(x => x(2).split(','))
        val order_RDD02 = order_RDD01.map(x => (x, 1))
        val order_RDD03 = order_RDD02.reduceByKey(_ + _)
        // println('\n' + order_RDD03.collect())

        // 支付
        val pay_RDD01 = pay_RDD.flatMap(x => x(4).split(','))
        val pay_RDD02 = pay_RDD01.map(x => (x, 1))
        val pay_RDD03 = pay_RDD02.reduceByKey(_ + _)
        // println('\n' + pay_RDD03.collect())

        val end_RDD = click_RDD02.join(order_RDD03).join(pay_RDD03)
        val end_RDD01 = end_RDD.map(x => (x._1.toInt, x._2._1._1, x._2._1._2, x._2._2))

        // 缓存一下
        // end_RDD01.cache()
        end_RDD01.persist(StorageLevel.DISK_ONLY)

        // 法1 使用top
        val end_RDD02 = end_RDD01.top(5)(Ordering[(Int, Int, Int)].on(x => (x._2, x._3, x._4)))
        println('\n' + end_RDD02.mkString("\n"))

        // 法2 使用sortBy
        val end_RDD03 = end_RDD01.sortBy(x => (-x._2, -x._3, -x._4))
        println('\n' + end_RDD03.take(5).mkString("\n"))


        // ==================================================================================================================
        // 题目3
        //     过滤出top5点击品类id 中的会话id
        val list_clickID_top = end_RDD_top.map(_._1).toList
        println(list_clickID_top)
        val session_RDD = rdd02.map(x => (x(6), x(2)))
        val session_RDD01 = session_RDD.filter(x => list_clickID_top.contains(x._1.toInt))

        //    构造 (clickid_sessionid,1)
        val session_RDD02 = session_RDD01.map(x => (x._1 + "_" + x._2, 1))
        //    聚合
        val session_RDD03 = session_RDD02.reduceByKey(_ + _)

        //    构造 (clickid,(sessionid,sum))
        val session_RDD04 = session_RDD03.map(x => (x._1.split("_"), x._2)).map(x => (x._1(0), (x._1(1), x._2)))
        //    根据clickid分组
        val session_RDD05 = session_RDD04.groupBy(_._1)

        // 排序取Top5热门品类中每个品类的Top5活跃Session统计
        val sorted_rdd = session_RDD05.mapValues(x => x.toList.sortBy(_._2._2).reverse.take(5))
        val sorted_rdd2 = sorted_rdd.mapValues(x => x.map(_._2))

        sorted_rdd2.foreach(x => println("(" + x._1 + ", " + x._2.toList + ")"))
        // println(sorted_rdd.collect())
        // val session_RDD06 = session_RDD05.map(x => (x._1, x._2.toList)).mapValues(x => (x.sorted(Ordering.by((_: (String, Int))._2).reverse).take(5)))  // [('12', ('6502cdc9-cf95-4b08-8854-f03a25baa917', 1)),
        // println(sorted_rdd.collect())
        }
}

python实现

from pyspark import SparkConf, SparkContext, StorageLevel

if __name__ == '__main__':
    conf = SparkConf().setSparkHome('spark_hw').setMaster("local[*]")
    sc = SparkContext(conf=conf)

    rdd01 = sc.textFile(r"D:\TASK\PythonProject\pythonSpark\01_RDD\user_visit_action.txt")
    rdd02 = rdd01.map(lambda x: x.split('_'))
    rdd03 = rdd02.map(lambda x: x[
                                6:])  # ['-1', '-1', 'null', 'null', 'null', 'null', '18'], ['9', '51', 'null', 'null', 'null', 'null', '6']]

    click_RDD = rdd03.filter(lambda x: x[0] != '-1')
    order_RDD = rdd03.filter(lambda x: x[2] != 'null')
    pay_RDD = rdd03.filter(lambda x: x[4] != 'null')

    # ==================================================================================================================
    # reduceByKey实现

    # 点击
    click_RDD01 = click_RDD.map(lambda x: (x[0], 1))
    click_RDD02 = click_RDD01.reduceByKey(lambda sum1, x: sum1 + x)
    # print('\n', click_RDD02.collect())

    # 订单
    order_RDD01 = order_RDD.flatMap(lambda x: x[2].split(','))
    order_RDD02 = order_RDD01.map(lambda x: (x, 1))
    order_RDD03 = order_RDD02.reduceByKey(lambda sum1, x: sum1 + x)
    # print('\n', order_RDD03.collect())

    # 支付
    pay_RDD01 = pay_RDD.flatMap(lambda x: x[4].split(','))
    pay_RDD02 = pay_RDD01.map(lambda x: (x, 1))
    pay_RDD03 = pay_RDD02.reduceByKey(lambda sum1, x: sum1 + x)
    # print('\n', pay_RDD03.collect())

    end_RDD = click_RDD02.join(order_RDD03).join(
        pay_RDD03)  # ('16', ((5928, 1782), 1233)), ('19', ((6044, 1722), 1158)),
    end_RDD01 = end_RDD.map(lambda x: (int(x[0]), x[1][0][0], x[1][0][1], x[1][1]))

    # 缓存一下
    # end_RDD01.cache()
    end_RDD01.persist(StorageLevel.DISK_ONLY)

    # 法1 使用top
    end_RDD02 = end_RDD01.top(5, key=lambda x: (x[1], x[2], x[3]))
    print('\n', end_RDD02)
    # 法2 使用sortBy
    end_RDD02 = end_RDD01.sortBy(lambda x: (-x[1], -x[2], -x[3]))
    print('\n', end_RDD02.take(5))

    # ==================================================================================================================
    # 使用sortBy
    click_RDD11 = click_RDD.map(lambda x: (x[0], 1))
    click_RDD12 = click_RDD11.groupBy(lambda x: x[0])
    click_RDD13 = click_RDD12.mapValues(lambda x: sum(i for j, i in x))
    # print(click_RDD13.collect())

    order_RDD11 = order_RDD.flatMap(lambda x: x[2].split(','))
    order_RDD12 = order_RDD11.map(lambda x: (x, 1))
    order_RDD13 = order_RDD12.groupBy(lambda x: x[0])
    order_RDD14 = order_RDD13.mapValues(lambda x: sum(i for j, i in x))
    # print(order_RDD14.collect())

    pay_RDD11 = pay_RDD.flatMap(lambda x: x[4].split(','))
    pay_RDD12 = pay_RDD11.map(lambda x: (x, 1))
    pay_RDD13 = pay_RDD12.groupBy(lambda x: x[0])
    pay_RDD14 = pay_RDD13.mapValues(lambda x: sum(i for j, i in x))
    # print(pay_RDD14.collect())

    end_RDD10 = click_RDD13.join(order_RDD14).join(
        pay_RDD14)  # ('16', ((5928, 1782), 1233)), ('19', ((6044, 1722), 1158)),
    end_RDD11 = end_RDD10.map(lambda x: (int(x[0]), x[1][0][0], x[1][0][1], x[1][1]))

    # 缓存一下
    # end_RDD11.cache()
    end_RDD11.persist(StorageLevel.DISK_ONLY)

    # 法1 使用top
    end_RDD_top = end_RDD11.top(5, key=lambda x: (x[1], x[2], x[3]))
    print('\n', end_RDD_top)

    # 法2 使用sortBy
    end_RDD_sort = end_RDD11.sortBy(lambda x: (-x[1], -x[2], -x[3]))
    print('\n', end_RDD_sort.take(5))

    # ==================================================================================================================
    # 过滤出top5点击品类id 中的会话id
    list_clickID_top = [x[0] for x in end_RDD_top]
    print(list_clickID_top)
    session_RDD = rdd02.map(lambda x: (x[6], x[2]))
    session_RDD01 = session_RDD.filter(lambda x: int(x[0]) in list_clickID_top)

    # 构造 (clickid_sessionid,1)
    session_RDD02 = session_RDD01.map(lambda x: (x[0] + '_' + x[1], 1))

    # 聚合
    session_RDD03 = session_RDD02.reduceByKey(lambda sum1, x: sum1 + x)

    # 构造 (clickid,(sessionid,sum))
    session_RDD04 = session_RDD03.map(lambda x: (x[0].split('_'), x[1])).map(lambda x: (x[0][0], (x[0][1], x[1])))
    # 根据 clcikid分组
    session_RDD05 = session_RDD04.groupBy(lambda x: x[0])
    session_RDD05.cache()
    session_RDD05.foreach(print)
    # for i in session_RDD05.collect():
    #     for j in i:
    #         print(list(j))
    # session_RDD05.saveAsTextFile("../my.txt")

    # x为迭代器对象 y为迭代器对象中的每一个元素
    sorted_rdd = session_RDD05.mapValues(lambda x: sorted(x, key=lambda y: y[1][1], reverse=True)[:5])
    sorted_rdd = sorted_rdd.mapValues(lambda x: (j for i, j in x))

    # modified_rdd = sorted_rdd.mapValues(lambda values: [y for _, y in values])

    sorted_rdd.foreach(lambda x: print("(", x[0], list(x[1]), ")"))
    # print(sorted_rdd.collect())
    # session_RDD06 = session_RDD05.map(lambda x: (x[0], list(x[1]))).mapValues(lambda x: (sorted(-i[1][1] for i in x)[:5]))  # [('12', ('6502cdc9-cf95-4b08-8854-f03a25baa917', 1)),
    # print(sorted_rdd.collect())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/691374.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

自制GPD Win2底壳

直接看效果吧&#xff0c;壳子做了一个月&#xff0c;算是从0开始吧&#xff0c; 打样就打了好几套&#xff0c;最后还差点小细节没做好&#xff0c;整体效果还算满意。

资深老鸟整理,性能测试平均负载详情,一篇足够...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 uptime 命令 每次…

6.用python写网络爬虫,表单交互

在前面几章中&#xff0c;我们下载的静态网页总是返回相同的内容。而在本章中&#xff0c;我们将与网页进行交互 根据用户输入返回对应的内容。本章将包含如下几个主题&#xff1a; 发送 POST 请求提交表单&#xff1a; 使用 cookie 登录网站&#xff1a; 用于简化表单提交的高…

EasyExcel概述

首先导入依赖 <dependencies><dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>3.9</version></dependency><dependency><groupId>org.apache.poi</groupId><…

element 表格复选框设置禁用

禁用之后的效果&#xff0c;移入显示不可选中 <el-table :data"tableData" :row-class-name"tableRowClassName" border height"500" style"width: 100%" selection-change"handleSelectionChange"><el-table-colu…

项目风险管理6大黄金法则

在软件项目管理过程中&#xff0c;风险无处不在。风险的不确定性&#xff0c;往往导致项目延期、费用增加对项目保质保量交付造成极大影响。 如何更好地进行风险管理&#xff0c;以积极的态度处理项目风险&#xff0c;最大程度减轻风险对项目的威胁&#xff0c;就显得尤为重要。…

2023年,推荐这5款主流低代码开发平台

近几年&#xff0c;在技术领域低代码是比较热门的话题&#xff0c;低代码是基于可视化和模型驱动理念&#xff0c;结合云原生与多端体验技术&#xff0c;它能够在多数业务场景下实现大幅度的提效降本&#xff0c;为专业开发者提供了一种全新的高生产力开发范式。 低代码平台对…

uCOSiii的默认任务

uCOS有uCOSii和uCOSiii,这两个都是一个可裁剪、可剥夺型的多任务内核。 uCOSiii没有任务数限制&#xff0c;uCOSiii内部任务有5个&#xff1a; 中断服务服务管理任务&#xff0c;时钟节拍任务&#xff0c;定时器任务 &#xff0c;统计任务&#xff0c;空闲任务。 1、优先级…

C语言程序环境和预处理(1)

本章主要以图片和文字的形式给大家讲解 程序的翻译环境和程序的执行环境 在ANSI C的任何一种实现中&#xff0c;存在两个不同的环境。 第1种是翻译环境&#xff0c;在这个环境中源代码被转换为可执行的机器指令。 第2种是执行环境&#xff0c;它用于实际执行代码 2. 详解编译…

ModbusRTU协议封装,控制RJ45报警器,复制一下就能用哦~

本文只对 写保持寄存器 HoldingRegister 做操作,其他类型的寄存器方法方法也在ModbusWriteOrRead类中,可自行测试。 报警器设备型号(USB版):JD01AX07 01 设备外观及亮灯: 文档说明-部分: 注: 以下图第一个绿灯开启的二进制命令为例: 01 06:寄存器类型 00 00:…

机器学习6:使用 TensorFlow 的训练线性回归模型

纸上得来终觉浅&#xff0c;绝知此事要躬行。前面 5 篇文章介绍了机器学习相关的部分基础知识&#xff0c;在本章&#xff0c;笔者将讲解基于 TensorFlow 实现一个简单的线性回归模型&#xff0c;以便增强读者对机器学习的体感。 目录 1.环境准备 1.1 安装 Python3 1.2 安装…

MySQL 卸载与安装

卸载 先打开控制面板>>>程序>>>程序和功能 里卸载mysql的所有程序。 然后去计算机文件里查看有没有mysql文件残留的&#xff0c;全部删除。 在系统变量Path中删除mysql的路径。 再去删除服务&#xff0c;以管理员身份运行终端。 最后再去注册表里删除关于my…

搭建selenoid环境

1、拉取浏览器镜像 docker pull selenoid/vnc:chrome_103.02、拉取selenoid-ui容器镜像 docker pull aerokube/selenoid-ui:1.10.43、拉取selenoid容器镜像 docker pull aerokube/selenoid4、编写配置文件 vi /selenoid/config/browsers.json volumes可以做容器路径映射&…

【Linux 驱动篇(二)】LED 驱动开发

文章目录 一、Linux 下 LED 灯驱动原理1. 地址映射1.1 ioremap 函数1.2 iounmap 函数 2. I/O 内存访问函数2.1 读操作函数2.2 写操作函数 二、实验程序编写1. LED 灯驱动程序编写2. 编写测试 APP 三、运行测试1. 编译驱动程序和测试 APP1.1 编译驱动程序1.2 编译测试 APP 2. 运…

云端安全由繁到简,亚马逊云科技护航业务创新新局面

数字化愿景与现实存在的差距困扰着诸多企业&#xff0c;但造成这种差距的一个重要因素却一直被很多管理者所忽视&#xff0c;那就是企业未能建立应有的数字安全与合规体系。应用迭代的速度加快、数据快速膨胀、企业云原生道路上遭遇的种种困境&#xff0c;与数字安全部门有限的…

6款高质量国产软件,让你办公舒适度拉满,高效完成工作

布丁扫描——强大的文档扫描器 布丁扫描是一款可以用手机进行扫描的国产软件&#xff0c;可以快速、方便地转换纸质文件为电子文件&#xff0c;提高工作效率。 它可以将手机的摄像头用作扫描仪&#xff0c;将纸质文件、照片、证件等物品转换成数字格式的文件&#xff0c;还可对…

vue3-实战-14-管理后台-数据大屏-男女比例-年龄比例-地图以及轨迹-趋势折线图等

目录 1-男女比例【柱状图】 1.1-大屏男女比例原型需求 1.2-结构样式逻辑开发 2-年龄比例-饼图 2.1-原型需求分析 2.2-结构样式逻辑开发 3-中国地图和运行轨迹 3.1-地图组件需求原型 3.2-结构样式逻辑开发 4-未来7天游客数量趋势图-折线图 5-右侧的相关图 6-总结 1-…

数字空间-服务器应用监控系统

完整资料进入【数字空间】查看——baidu搜索"writebug" 随着各行业信息化建设的不断深入发展&#xff0c;单独的服务器已经再无法满足企业的需求。网络和应用规模日趋扩大&#xff0c;服务器网络集群大量应用于中、小型企业中&#xff0c;服务器性能监控和日常维护变…

金属元素螯合剂:(S)-DOTAGA-(COOt-Bu)4,1023889-20-4,可应用于制备纳米材料

文章关键词&#xff1a;金属元素螯合剂&#xff0c;大环配体&#xff0c;DOTA标记(COOt-Bu)4 ●中文名&#xff1a;(S)-DOTAGA-四叔丁酯 ●英文名&#xff1a;(S)-DOTAGA-(COOt-Bu)4 ●外观以及性质&#xff1a; 西安凯新生物科技有限公司供应的​(S)-DOTAGA-(COOt-Bu)4中DOT…

vue项目 ‘npm run dev‘ 报错 npm ERR! errno 134

npm ERR! errno 134 表示 npm 执行出现了致命错误&#xff0c;通常是由于内存不足或程序崩溃导致的。 这时需要我们分配更多的内存给vue-cli-service serve 解决方案 1. 安装increase-memory-limit cross-env依赖 npm install increase-memory-limit cross-env increase-mem…