sparkRDD编程实战

news2024/11/23 12:18:23

文章目录

  • sparkRDD编程实战
    • 1、Spark RDD 实现单词计数
    • 2、Spark RDD 实现分组求TopN
    • 3、Spark RDD 实现二次排序
    • 4、Spark RDD 计算平均成绩
    • 5、Spark RDD 倒排索引统计每日新增用户
    • 6、Spark案例实操
    • 7、Spark RDD 综合应用
      • 需求1:Top10热门品类
        • 需求说明
        • 实现方案一
        • 实现方案二
        • 实现方案三
        • 实现方案四
      • 需求2:Top10热门品类中每个品类的Top10活跃Session统计
      • 需求3:页面单跳转换率统计

在这里插入图片描述

sparkRDD编程实战

1、Spark RDD 实现单词计数

实现思路:

先使用flatmap对一行的单词进行切分,然后构造成为二元组(单词,计数),然后按照单词进行聚合。

代码如下:

RDD实现:

package test3.wordcount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 思路;
 * 1. flapMap(_.split(" "))
 * 2. map(x => (x, 1))
 * 3. reduceByKey(_ + _)
 */
object wordCount {
  def main(args: Array[String]): Unit = {
    // 环境准备
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    // 读取数据
    val rdd: RDD[String] = sc.textFile("datas/1.txt")

    val word = rdd
      .flatMap(x => x.split(" ")) //单词切分
      .map((_, 1)) //构成二元组
      .reduceByKey(_ + _) //根据key聚合
    // 输出结果
    word.collect().foreach(println)
  }
}

2、Spark RDD 实现分组求TopN

分组求TopN是大数据领域常见的需求,主要是根据数据的某一列进行分组,然后将分组后的每一组数据按照指定的列进行排序,最后取每一组的前N行数据。

有以下学生成绩数据,grade.txt

Andy,98
Jack,87
Bill,99
Andy,78
Jack,85
Bill,86
Andy,90
Jack,88
Bill,76
Andy,58
Jack,67
Bill,79

同一个学生有多门成绩,现需要计算每个学生分数最高的前3个成绩,期望的输出结果如下:

image-20230317235127889

实现思路:

因为每一行为一条数据,所以先构成(姓名,成绩)二元组,然后根据姓名进行分组,对组内数据按照降序排列,取前3个,最后按照输出语句打印结果。

代码实现:

Spark RDD实现:

package test3.topn

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object getTopN {
  def main(args: Array[String]): Unit = {
    // 环境准备
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TopN")
    val sc = new SparkContext(sparkConf)
    // 读取数据
    val rdd: RDD[String] = sc.textFile("datas/grade.txt")

    rdd
      .map(line => {
        val fields = line.split(",") //每一行按照,进行切分
        (fields(0), fields(1)) //返回(姓名,成绩)二元组
      })
      .groupBy(_._1) //根据姓名进行分组,(Andy,CompactBuffer((Andy,98), (Andy,78), (Andy,90), (Andy,58)))
      .mapValues(x => { //((Andy,98), (Andy,78), (Andy,90), (Andy,58))
        x.map(_._2).toList.sortWith(_ > _).take(3) //根据值进行排序,取前三个
      })
      .collect().foreach( //打印成绩
      x => {
        println("姓名:" + x._1)
        x._2.foreach(y => {
          println("成绩:" + y)
        })
        println("********************")
      })
  }
}

3、Spark RDD 实现二次排序

二次排序是指对需要排序的元素首先按照第一个字段进行排序,若第一个字段相等,则按照第二个字段排序。例如,文件sort.txt中有以下内容:

6 7
5 8
2 9
7 5
4 3
8 3
2 7
6 1

首先按照第一个字段升序排列,若第一个字段相等,则按照第二个字段降序排列,期望的输出结果如下:

2 9
2 7
4 3
5 8
6 7
6 1
7 5
8 3

实现思路:

先对数据进行切分构成二元组,根据第一个数字进行分组,按照第二个数字进行组内排序,然后按照第一个数据进行排序,最后打印输出。

代码实现:

package test3.twiceorder

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object twiceOrder {
  def main(args: Array[String]): Unit = {
    //环境准备
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TwoOrder")
    val sc = new SparkContext(sparkConf)

    //读取数据
    val rdd: RDD[String] = sc.textFile("datas/sort.txt")

    rdd
      .map( //构建二元组
        line => {
          val fields = line.split(" ")
          (fields(0), fields(1))
        }
      )
      .groupBy(_._1) //根据第一个数分组,(6,CompactBuffer((6,7), (6,1)))
      .mapValues(x => {
        x.toList.sortWith(_._2 > _._2) //降序,(6,List((6,7), (6,1)))
      })
      .sortByKey() //默认为升序
      .collect().foreach(_._2.foreach(println))
  }
}

输出结果为:

image-20230602161055511

4、Spark RDD 计算平均成绩

对输入文件中的学生3科成绩进行计算,得出每个学生的平均成绩。输入文件中的每行内容均为一个学生的姓名和其相应的成绩,每门学科为一个文件。要求输出结果中每行有两列数据,其中第一列代表学生的姓名,第二列代表其平均成绩。输入的3个文件内容如下:

math.txt文件内容如下:

张三  88
李四  99
王五  66
赵六  77

chinese.txt文件内容如下:

张三  78
李四  89
王五  96
赵六  67

english.txt文件内容如下:

张三  80
李四  82
王五  84
赵六  86

期望输出结果如下:

张三  82
李四  90
王五  82
赵六  76

实现思路:

首先根据数据切分成为二元组,然后将数据按照姓名进行聚合,再进行遍历得到平均成绩,最后输出结果。

代码实现:

package test3.getavg

import org.apache.spark.{SparkConf, SparkContext}

object getAvg {
  def main(args: Array[String]): Unit = {
    //环境准备
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ScoreAvg")
    val sc = new SparkContext(sparkConf)
    //读取文件
    val math = sc.textFile("datas/math.txt")
    val chinese = sc.textFile("datas/chinese.txt")
    val english = sc.textFile("datas/english.txt")
    //数据合并
    val score = math.union(chinese).union(english)

    score
      .map(line => {
        val fields = line.split("  ") //两个空格
        (fields(0), fields(1).toInt)
      })
      .reduceByKey(_ + _)
      .map(x => {
        val name = x._1
        val avg = (x._2 / 3).toString.format("%.2f")
        (name, avg)
      })
      .collect().foreach(y => {
      println(y._1 + "  " + y._2)
    })
  }
}

5、Spark RDD 倒排索引统计每日新增用户

已知有以下用户访问历史数据,第一列为用户访问网站的日期,第二列为用户名:

2020-01-01,user1
2020-01-01,user2
2020-01-01,user3
2020-01-02,user1
2020-01-02,user2
2020-01-02,user4
2020-01-03,user2
2020-01-03,user5
2020-01-03,user6

现需要根据上述数据统计每日新增的用户数量,期望的统计结果为:

2020-01-01,3 
2020-01-02,1 
2020-01-03,2

即2020-01-01新增了3个用户(分别为user1、user2、user3),2020-01-02新增了1个用户(user4),2020-01-03新增了两个用户(分别为user5、user6)。

实现思路:

由于一行为一条记录,先对数据进行切分构成二元组(时间,用户),然后按照用户进行分组,得到分组后的数据,取第一条数据为该用户第一次出现的数据,然后按照时间进行分组,最后输出结果。

最重要的是要理解每日新增用户,即该用户在改天第一次出现。

代码实现:

package test3

import org.apache.spark.{SparkConf, SparkContext}

object NewVisiter {
  def main(args: Array[String]): Unit = {
    //环境准备
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ScoreAvg")
    val sc = new SparkContext(sparkConf)
    //读取数据
    val rdd = sc.textFile("datas/date_user.txt")
    rdd.map(line => {
      val split = line.split(",")
      (split(0), split(1))
    })
      .groupBy(_._2) //根据用户分组
      .map(x => {
        val first = x._2.toList(0)
        first
      }) //取第一次出现的日期
      .groupByKey() //根据日期分组
      .sortByKey() //对日期排序
      .collect().foreach(y => { //打印输出
      println(y._1 + "," + y._2.size)
    })
  }
}

输出结果为:

image-20230318113528083

6、Spark案例实操

1)数据准备

agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。

2)需求描述

统计出每一个省份每个广告被点击数量排行的 Top3

3) 思路分析

首先我们先对原始数据进行结构的转换,只需要省份和广告即可,接着我们按照(省份,广告)进行聚合,然后按照省份进行分组,再进行组内排序,取前三个,最后打印输出。

4)代码实现

package test3.topn

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object getTop3 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 案例实操

    // 1. 获取原始数据:时间戳,省份,城市,用户,广告
    val dataRDD = sc.textFile("datas/agent.log")

    // 2. 将原始数据进行结构的转换。方便统计
    //    时间戳,省份,城市,用户,广告 => ((省份,广告),1)
    val mapRDD = dataRDD.map(
      line => {
        val datas = line.split(" ")
        ((datas(1), datas(4)), 1)
      }
    )

    // 3. 将转换结构后的数据,进行分组聚合
    //    ( ( 省份,广告 ), 1 ) => ( ( 省份,广告 ), sum )
    val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _)

    // 4. 将聚合的结果进行结构的转换
    //    ( ( 省份,广告 ), sum ) => ( 省份, ( 广告, sum ) )
    val newMapRDD = reduceRDD.map {
      case ((prv, ad), sum) => {
        (prv, (ad, sum))
      }
    }

    // 5. 将转换结构后的数据根据省份进行分组
    //    ( 省份, 【( 广告A, sumA ),( 广告B, sumB )】 )
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()

    // 6. 将分组后的数据组内排序(降序),取前3名
    val resultRDD = groupRDD.mapValues(
      iter => {
        iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
      }
    )

    // 7. 采集数据打印在控制台
    resultRDD.collect().foreach(println)

    sc.stop()


  }
}

输出结果为:

image-20230602171618311

7、Spark RDD 综合应用

image-20230319102825000

上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的4种行为:搜索,点击,下单,支付。数据规则如下:

  • 数据文件中每行数据采用下划线分隔数据
  • 每一行数据表示用户的一次行为,这个行为只能是4种行为的一种
  • 如果搜索关键字为null,表示数据不是搜索数据
  • 如果点击的品类ID和产品ID为-1,表示数据不是点击数据
  • 针对于下单行为,一次可以下单多个商品,所以品类ID和产品ID可以是多个,id之间采用逗号分隔,如果本次不是下单行为,则数据采用null表示
  • 支付行为和下单行为类似

详细字段说明:

编号字段名称字段类型字段含义
1dateString用户点击行为的日期
2user_idLong用户的ID
3session_idStringSession的ID
4page_idLong某个页面的ID
5action_timeString动作的时间点
6search_keywordString用户搜索的关键词
7click_category_idLong某一个商品品类的ID
8click_product_idLong某一个商品的ID
9order_category_idsString一次订单中所有品类的ID集合
10order_product_idsString一次订单中所有商品的ID集合
11pay_category_idsString一次支付中所有品类的ID集合
12pay_product_idsString一次支付中所有商品的ID集合
13city_idLong城市 id

样例类:

//用户访问动作表
case class UserVisitAction(
    date: String,//用户点击行为的日期
    user_id: Long,//用户的ID
    session_id: String,//Session的ID
    page_id: Long,//某个页面的ID
    action_time: String,//动作的时间点
    search_keyword: String,//用户搜索的关键词
    click_category_id: Long,//某一个商品品类的ID
    click_product_id: Long,//某一个商品的ID
    order_category_ids: String,//一次订单中所有品类的ID集合
    order_product_ids: String,//一次订单中所有商品的ID集合
    pay_category_ids: String,//一次支付中所有品类的ID集合
    pay_product_ids: String,//一次支付中所有商品的ID集合
    city_id: Long
)//城市 id

需求1:Top10热门品类

image-20230319103515409

需求说明

​ 品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。

鞋			 点击数 下单数  支付数
衣服		    点击数 下单数  支付数
电脑		    点击数 下单数  支付数

例如,综合排名 = 点击数20%+下单数30%+支付数*50%

本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数

实现方案一

需求分析

分别统计每个品类点击的次数,下单的次数和支付的次数:(品类,点击总数)(品类,下单总数)(品类,支付总数)

需求实现

package test3.req

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_Req1_HotCategoryTop10Analysis {

  def main(args: Array[String]): Unit = {

    // TODO : Top10热门品类
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(sparConf)

    // 1. 读取原始日志数据
    val actionRDD = sc.textFile("datas/user_visit_action.txt")

    // 2. 统计品类的点击数量:(品类ID,点击数量)
    val clickActionRDD = actionRDD.filter(
      action => {
        val datas = action.split("_")
        datas(6) != "-1" //过滤不是点击的品类ID
      }
    )

    val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
      action => {
        val datas = action.split("_")
        (datas(6), 1)
      }
    ).reduceByKey(_ + _)

    // 3. 统计品类的下单数量:(品类ID,下单数量)
    val orderActionRDD = actionRDD.filter(
      action => {
        val datas = action.split("_")
        datas(8) != "null"
      }
    )

    // orderid => 1,2,3 进行扁平化操作
    // 【(1,1),(2,1),(3,1)】
    val orderCountRDD = orderActionRDD.flatMap(
      action => {
        val datas = action.split("_")
        val cid = datas(8)
        val cids = cid.split(",")
        cids.map(id => (id, 1))
      }
    ).reduceByKey(_ + _)

    // 4. 统计品类的支付数量:(品类ID,支付数量)
    val payActionRDD = actionRDD.filter(
      action => {
        val datas = action.split("_")
        datas(10) != "null"
      }
    )

    // orderid => 1,2,3
    // 【(1,1),(2,1),(3,1)】
    val payCountRDD = payActionRDD.flatMap(
      action => {
        val datas = action.split("_")
        val cid = datas(10)
        val cids = cid.split(",")
        cids.map(id => (id, 1))
      }
    ).reduceByKey(_ + _)

    // 5. 将品类进行排序,并且取前10名
    //    点击数量排序,下单数量排序,支付数量排序
    //    元组排序:先比较第一个,再比较第二个,再比较第三个,依此类推
    //    ( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )
    //
    //  cogroup = connect + group
    val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =
    clickCountRDD.cogroup(orderCountRDD, payCountRDD)
    val analysisRDD = cogroupRDD.mapValues {
      case (clickIter, orderIter, payIter) => {

        var clickCnt = 0
        val iter1 = clickIter.iterator
        if (iter1.hasNext) {
          clickCnt = iter1.next()
        }
        var orderCnt = 0
        val iter2 = orderIter.iterator
        if (iter2.hasNext) {
          orderCnt = iter2.next()
        }
        var payCnt = 0
        val iter3 = payIter.iterator
        if (iter3.hasNext) {
          payCnt = iter3.next()
        }

        (clickCnt, orderCnt, payCnt)
      }
    }
    // 根据元组进行降序排序
    val resultRDD = analysisRDD.sortBy(_._2, false).take(10)

    // 6. 将结果采集到控制台打印出来
    resultRDD.foreach(println)

    sc.stop()
  }
}

输出结果为:

image-20230603104649516

实现方案二

代码优化:

cogroup性能可能较低
actionRDD重复使用
package test3.req

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_Req1_HotCategoryTop10Analysis1 {

  def main(args: Array[String]): Unit = {

    // TODO : Top10热门品类
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(sparConf)

    // Q : actionRDD重复使用
    // Q : cogroup性能可能较低

    // 1. 读取原始日志数据
    val actionRDD = sc.textFile("datas/user_visit_action.txt")
    actionRDD.cache()

    // 2. 统计品类的点击数量:(品类ID,点击数量)
    val clickActionRDD = actionRDD.filter(
      action => {
        val datas = action.split("_")
        datas(6) != "-1"
      }
    )

    val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
      action => {
        val datas = action.split("_")
        (datas(6), 1)
      }
    ).reduceByKey(_ + _)

    // 3. 统计品类的下单数量:(品类ID,下单数量)
    val orderActionRDD = actionRDD.filter(
      action => {
        val datas = action.split("_")
        datas(8) != "null"
      }
    )

    // orderid => 1,2,3
    // 【(1,1),(2,1),(3,1)】
    val orderCountRDD = orderActionRDD.flatMap(
      action => {
        val datas = action.split("_")
        val cid = datas(8)
        val cids = cid.split(",")
        cids.map(id => (id, 1))
      }
    ).reduceByKey(_ + _)

    // 4. 统计品类的支付数量:(品类ID,支付数量)
    val payActionRDD = actionRDD.filter(
      action => {
        val datas = action.split("_")
        datas(10) != "null"
      }
    )

    // orderid => 1,2,3
    // 【(1,1),(2,1),(3,1)】
    val payCountRDD = payActionRDD.flatMap(
      action => {
        val datas = action.split("_")
        val cid = datas(10)
        val cids = cid.split(",")
        cids.map(id => (id, 1))
      }
    ).reduceByKey(_ + _)

    // (品类ID, 点击数量) => (品类ID, (点击数量, 0, 0))
    // (品类ID, 下单数量) => (品类ID, (0, 下单数量, 0))
    //                    => (品类ID, (点击数量, 下单数量, 0))
    // (品类ID, 支付数量) => (品类ID, (0, 0, 支付数量))
    //                    => (品类ID, (点击数量, 下单数量, 支付数量))
    // ( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )

    // 5. 将品类进行排序,并且取前10名
    //    点击数量排序,下单数量排序,支付数量排序
    //    元组排序:先比较第一个,再比较第二个,再比较第三个,依此类推
    //    ( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )
    //
    val rdd1 = clickCountRDD.map {
      case (cid, cnt) => {
        (cid, (cnt, 0, 0))
      }
    }
    val rdd2 = orderCountRDD.map {
      case (cid, cnt) => {
        (cid, (0, cnt, 0))
      }
    }
    val rdd3 = payCountRDD.map {
      case (cid, cnt) => {
        (cid, (0, 0, cnt))
      }
    }

    // 将三个数据源合并在一起,统一进行聚合计算
    val soruceRDD: RDD[(String, (Int, Int, Int))] = rdd1.union(rdd2).union(rdd3)

    val analysisRDD = soruceRDD.reduceByKey(
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      }
    )

    val resultRDD = analysisRDD.sortBy(_._2, false).take(10)

    // 6. 将结果采集到控制台打印出来
    resultRDD.foreach(println)

    sc.stop()
  }
}

实现方案三

需求分析

存在大量的shuffle操作(reduceByKey)

一次性统计每个品类点击的次数,下单的次数和支付的次数:(品类,(点击总数,下单总数,支付总数))

需求实现

package test3.req

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark03_Req1_HotCategoryTop10Analysis2 {

  def main(args: Array[String]): Unit = {

    // TODO : Top10热门品类
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(sparConf)

    // Q : 存在大量的shuffle操作(reduceByKey)
    // reduceByKey 聚合算子,spark会提供优化,缓存

    // 1. 读取原始日志数据
    val actionRDD = sc.textFile("datas/user_visit_action.csv")

    // 2. 将数据转换结构
    //    点击的场合 : ( 品类ID,( 1, 0, 0 ) )
    //    下单的场合 : ( 品类ID,( 0, 1, 0 ) )
    //    支付的场合 : ( 品类ID,( 0, 0, 1 ) )
    val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
      action => {
        val datas = action.split(",")
        if (datas(6) != "-1") {
          // 点击的场合
          List((datas(6), (1, 0, 0)))
        } else if (datas(8) != "null") {
          // 下单的场合
          val ids = datas(8).split("-")
          ids.map(id => (id, (0, 1, 0)))
        } else if (datas(10) != "null") {
          // 支付的场合
          val ids = datas(10).split("-")
          ids.map(id => (id, (0, 0, 1)))
        } else {
          Nil
        }
      }
    )

    // 3. 将相同的品类ID的数据进行分组聚合
    //    ( 品类ID,( 点击数量, 下单数量, 支付数量 ) )
    val analysisRDD = flatRDD.reduceByKey(
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      }
    )

    // 4. 将统计结果根据数量进行降序处理,取前10名
    val resultRDD = analysisRDD.sortBy(_._2, false).take(10)

    // 5. 将结果采集到控制台打印出来
    resultRDD.foreach(println)

    sc.stop()
  }
}

实现方案四

需求分析

使用累加器的方式聚合数据

需求实现

package test3.req

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Spark04_Req1_HotCategoryTop10Analysis3 {

  def main(args: Array[String]): Unit = {

    // TODO : Top10热门品类
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(sparConf)

    // 1. 读取原始日志数据
    val actionRDD = sc.textFile("datas/user_visit_action.csv")

    val acc = new HotCategoryAccumulator
    sc.register(acc, "hotCategory")

    // 2. 将数据转换结构
    actionRDD.foreach(
      action => {
        val datas = action.split(",")
        if (datas(6) != "-1") {
          // 点击的场合
          acc.add((datas(6), "click"))
        } else if (datas(8) != "null") {
          // 下单的场合
          val ids = datas(8).split("-")
          ids.foreach(
            id => {
              acc.add((id, "order"))
            }
          )
        } else if (datas(10) != "null") {
          // 支付的场合
          val ids = datas(10).split("-")
          ids.foreach(
            id => {
              acc.add((id, "pay"))
            }
          )
        }
      }
    )

    val accVal: mutable.Map[String, HotCategory] = acc.value
    val categories: mutable.Iterable[HotCategory] = accVal.map(_._2)

    val sort = categories.toList.sortWith(
      (left, right) => {
        if (left.clickCnt > right.clickCnt) {
          true
        } else if (left.clickCnt == right.clickCnt) {
          if (left.orderCnt > right.orderCnt) {
            true
          } else if (left.orderCnt == right.orderCnt) {
            left.payCnt > right.payCnt
          } else {
            false
          }
        } else {
          false
        }
      }
    )

    // 5. 将结果采集到控制台打印出来
    sort.take(10).foreach(println)

    sc.stop()
  }

  case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)

  /**
   * 自定义累加器
   * 1. 继承AccumulatorV2,定义泛型
   * IN : ( 品类ID, 行为类型 )
   * OUT : mutable.Map[String, HotCategory]
   * 2. 重写方法(6)
   */
  class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] {

    private val hcMap = mutable.Map[String, HotCategory]()

    override def isZero: Boolean = {
      hcMap.isEmpty
    }

    override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = {
      new HotCategoryAccumulator()
    }

    override def reset(): Unit = {
      hcMap.clear()
    }

    override def add(v: (String, String)): Unit = {
      val cid = v._1
      val actionType = v._2
      val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0, 0, 0))
      if (actionType == "click") {
        category.clickCnt += 1
      } else if (actionType == "order") {
        category.orderCnt += 1
      } else if (actionType == "pay") {
        category.payCnt += 1
      }
      hcMap.update(cid, category)
    }

    override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
      val map1 = this.hcMap
      val map2 = other.value

      map2.foreach {
        case (cid, hc) => {
          val category: HotCategory = map1.getOrElse(cid, HotCategory(cid, 0, 0, 0))
          category.clickCnt += hc.clickCnt
          category.orderCnt += hc.orderCnt
          category.payCnt += hc.payCnt
          map1.update(cid, category)
        }
      }
    }

    override def value: mutable.Map[String, HotCategory] = hcMap
  }
}

需求2:Top10热门品类中每个品类的Top10活跃Session统计

需求说明

在需求一的基础上,增加每个品类用户session的点击统计

package test3.req

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark05_Req2_HotCategoryTop10SessionAnalysis {

  def main(args: Array[String]): Unit = {

    // TODO : Top10热门品类
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(sparConf)

    val actionRDD = sc.textFile("datas/user_visit_action.csv")
    actionRDD.cache()
    val top10Ids: Array[String] = top10Category(actionRDD)

    // 1. 过滤原始数据,保留点击和前10品类ID
    val filterActionRDD = actionRDD.filter(
      action => {
        val datas = action.split(",")
        if (datas(6) != "-1") {
          top10Ids.contains(datas(6))
        } else {
          false
        }
      }
    )

    // 2. 根据品类ID和sessionid进行点击量的统计
    val reduceRDD: RDD[((String, String), Int)] = filterActionRDD.map(
      action => {
        val datas = action.split(",")
        ((datas(6), datas(2)), 1)
      }
    ).reduceByKey(_ + _)

    // 3. 将统计的结果进行结构的转换
    //  (( 品类ID,sessionId ),sum) => ( 品类ID,(sessionId, sum) )
    val mapRDD = reduceRDD.map {
      case ((cid, sid), sum) => {
        (cid, (sid, sum))
      }
    }

    // 4. 相同的品类进行分组
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()

    // 5. 将分组后的数据进行点击量的排序,取前10名
    val resultRDD = groupRDD.mapValues(
      iter => {
        iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
      }
    )

    resultRDD.collect().foreach(println)


    sc.stop()
  }

  def top10Category(actionRDD: RDD[String]) = {
    val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
      action => {
        val datas = action.split(",")
        if (datas(6) != "-1") {
          // 点击的场合
          List((datas(6), (1, 0, 0)))
        } else if (datas(8) != "null") {
          // 下单的场合
          val ids = datas(8).split("-")
          ids.map(id => (id, (0, 1, 0)))
        } else if (datas(10) != "null") {
          // 支付的场合
          val ids = datas(10).split("-")
          ids.map(id => (id, (0, 0, 1)))
        } else {
          Nil
        }
      }
    )

    val analysisRDD = flatRDD.reduceByKey(
      (t1, t2) => {
        (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
      }
    )

    analysisRDD.sortBy(_._2, false).take(10).map(_._1)
  }
}

输出结果为:

(20,List((710373f5-3a2e-4fec-9ddb-623d779273e6,6), (f632bbe4-e5ba-4dcf-91ef-89f9688376a9,6), (7d0014a4-c501-456f-9d30-fe6af79d933c,6), (3208994d-5867-4c8f-841f-42700aeabfb3,6), (c34d6503-b62b-4bab-9b85-7b0aefd8a5e0,5), (78bc6698-ac26-4f1b-aaa0-05232f3322e5,5), (8b9a0f5a-bff9-4f37-b3bb-6aa3c24d24f7,5), (c9f87d6f-6a62-4e20-a2bb-aa0b690c134c,5), (7ea420e7-eaa4-42dd-b517-470656fb87b6,5), (1c5e83e4-370c-4163-a387-6d61a0233e24,5)))
(15,List((8b9a0f5a-bff9-4f37-b3bb-6aa3c24d24f7,8), (e01a941e-08e7-4fd6-aa77-eb10b5c2644b,8), (4c840047-6302-4e7d-8505-f5a0ac11da2f,5), (dd89c2b4-d6f1-4112-b705-6ce263abdfd4,5), (1a163c8d-3d58-4a76-8b28-d732bb51aec8,5), (0f0c33f2-a9c9-4264-befe-dce0d892526a,5), (f9936a33-3cad-4ac0-8153-77234c6cdb09,5), (57500425-a1d6-4dad-8ec4-224e443e567f,5), (75cf4d14-87f4-4e39-8050-c3d164a002bc,4), (b3fa6eb2-9028-46d0-9222-df5e649238bb,4)))
(2,List((36003baf-6d64-49be-a047-9fc8c0e76ca8,6), (0d10c830-c48c-418c-b51b-66d1130b4b57,5), (233e790a-1d96-4559-9205-471dc235161d,5), (1ecc15fb-85e4-4c0f-b68f-7351d9f77895,5), (e222b39c-8b13-4865-a8ba-892bd8919b5b,5), (72b227ad-aaad-4c6c-965f-d2ed3428aa20,5), (e5a2e03a-e650-446e-bff2-4845dc726278,5), (285278ad-affb-4b17-a442-3b83dae12576,5), (fd1471b0-1b95-465f-95c7-256ec3e65e26,5), (fd6b98aa-6662-4d30-b63d-683264bf4a3d,5)))
(17,List((6b53902b-93e2-4e08-be23-da9e12699dc5,7), (39b5df21-1c7f-4a8a-9766-84ee5f6954e0,6), (f632bbe4-e5ba-4dcf-91ef-89f9688376a9,6), (b21551c7-40e1-405c-81e2-d7caf53e6c07,5), (36003baf-6d64-49be-a047-9fc8c0e76ca8,5), (6501d135-cf17-4020-9852-51b6c3d5f1d7,5), (595f8ca5-7d03-4a32-956e-894821cd17f9,5), (5a841cd1-99ed-47fa-af9b-72183fd37b67,5), (7328ce83-2c98-4d39-a1bf-1af472bb8c68,5), (6bc0e641-e0ec-49ee-a41c-71913ab10373,5)))
(13,List((d4125ad2-454d-40b6-b0d6-769ff58a26b0,6), (23087483-70c1-4e5f-9d53-5f7020032929,6), (a550b6e4-6e9f-45f8-86ec-040919295efd,5), (7a20dbce-58a5-464b-86e6-7abef1cedd64,5), (0f0c33f2-a9c9-4264-befe-dce0d892526a,5), (70e2e082-08e6-4e14-bec6-6ac266e3bf90,5), (0031c47d-1bcf-43d9-ad6a-252339b47082,5), (8b9a0f5a-bff9-4f37-b3bb-6aa3c24d24f7,5), (fe75781f-0aa1-4cfd-a11f-128ee35707db,5), (9584d531-47b6-455c-9843-24e45f47939b,5)))
(14,List((6220a3ac-6474-4aba-ae86-85c4f3c65314,6), (6e37815c-76d5-4899-a85e-3cc91dc60529,6), (d87cf8b6-67ad-4883-9f1e-d4eb50d50158,6), (dd4b853e-9e00-40d1-a003-4e328561995c,6), (5c319dfc-0b9d-41a7-b20a-2a819145e888,5), (157a3539-83e2-4948-84df-265988a648fd,5), (36003baf-6d64-49be-a047-9fc8c0e76ca8,5), (7ec2504f-b5b6-45ed-ae16-c54ea59d4e47,5), (ce061e15-48d2-44c0-b923-efd7a704c513,5), (b2b38179-9918-4a67-b652-345af23f442d,5)))
(7,List((870e7817-4f91-4db8-a091-08c2945d4bed,6), (5024e7c8-3210-4d97-8b9a-55827b4a4273,5), (e39459f1-213d-4f98-91e8-ecb79093b9d0,5), (4674e216-220a-4cbe-b0c0-479ebe5a6a52,5), (c1d705bf-bfb2-4a19-94af-c32ea0d5885d,5), (83f0b201-158f-452e-8a4a-903de6ec9744,5), (8d912b31-76d4-461b-aa9f-26a574e9dcd0,4), (fce37a59-9b2a-4051-a207-01ae3503fa4f,4), (395525cf-eea1-4dcb-b718-89226f31184e,4), (839d451e-f58e-4a82-a394-963fa6e6212b,4)))
(5,List((c34d6503-b62b-4bab-9b85-7b0aefd8a5e0,6), (9a6b1b56-8b8b-45d0-b422-ebb45c5ff927,6), (a3c9bfa8-81de-4dc3-a1a2-c958eb3374e5,6), (9355860b-ce62-466f-8dfd-94b668ec7931,6), (d4125ad2-454d-40b6-b0d6-769ff58a26b0,5), (9dd9044d-e14b-454a-9cc4-d100bc56859a,5), (d540f8d1-4e5e-4fea-bb0e-f69c02cd338d,5), (ad553fdd-b68a-461a-b6c2-b7a5ded07da8,5), (e0c6ee82-27f8-4190-807c-a739662bc8f7,5), (088cae29-b669-4604-b110-e436883dcab7,4)))
(3,List((18834b31-772b-4675-a1f9-da1cf2e2de7e,5), (397d9b65-5a74-47a8-b583-515c30c7cd00,5), (a550b6e4-6e9f-45f8-86ec-040919295efd,4), (99913a44-56dd-4d27-b87d-4173b60562f6,4), (60bc8718-2e40-40d1-9062-6bc116f38d7f,4), (0eca6613-bb30-42a8-b1f3-4b0515c165e2,4), (ce061e15-48d2-44c0-b923-efd7a704c513,4), (58ea579d-fe43-4ddf-9fb7-212316cdfbd7,4), (e72fd989-370d-49b8-ab1e-6078b0dfa592,4), (7d0014a4-c501-456f-9d30-fe6af79d933c,4)))
(10,List((710373f5-3a2e-4fec-9ddb-623d779273e6,6), (8c2f5cc7-ebb2-45c3-a712-13471326c550,6), (e2e81c3a-839f-438f-b8bb-414c318baf33,5), (47084fc3-c314-4713-b7f0-928dbeddb2c3,5), (c9e0a14f-f6e5-4ea1-8841-f91ff88b1510,4), (593fbab5-d874-419d-b362-b5cd3d82f106,4), (fe13c870-bfcc-4e9c-8a2b-faa105b67aa5,4), (b6b10d25-6be7-452a-a266-f67c2d96556b,4), (65a90a47-d8cd-4df0-a4a8-4b66a9b209b4,4), (d2855c58-0cb4-4748-86a4-0abaabcccd8c,4)))

需求3:页面单跳转换率统计

需求说明

1)页面单跳转化率

计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率。

比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率。

image-20230319104116598

2)统计页面单跳转化率意义

​ 产品经理和运营总监,可以根据这个指标,去尝试分析,整个网站,产品,各个页面的表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。

​ 数据分析师,可以此数据做更深一步的计算和分析。

​ 企业管理层,可以看到整个公司的网站,各个页面的之间的跳转的表现如何,可以适当调整公司的经营战略或策略。

需求分析

功能实现

package test3.req

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_Req3_PageflowAnalysis {

  def main(args: Array[String]): Unit = {

    // TODO : Top10热门品类
    val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    val sc = new SparkContext(sparConf)

    val actionRDD = sc.textFile("datas/user_visit_action.csv")

    val actionDataRDD = actionRDD.map(
      action => {
        val datas = action.split(",")
        UserVisitAction(
          datas(0),
          datas(1).toLong,
          datas(2),
          datas(3).toLong,
          datas(4),
          datas(5),
          datas(6).toLong,
          datas(7).toLong,
          datas(8),
          datas(9),
          datas(10),
          datas(11),
          datas(12).toLong
        )
      }
    )
    actionDataRDD.cache()

    // TODO 对指定的页面连续跳转进行统计
    // 1-2,2-3,3-4,4-5,5-6,6-7
    val ids = List[Long](1, 2, 3, 4, 5, 6, 7)
    val okflowIds: List[(Long, Long)] = ids.zip(ids.tail)

    // TODO 计算分母
    val pageidToCountMap: Map[Long, Long] = actionDataRDD.filter(
      action => {
        ids.init.contains(action.page_id)
      }
    ).map(
      action => {
        (action.page_id, 1L)
      }
    ).reduceByKey(_ + _).collect().toMap

    // TODO 计算分子

    // 根据session进行分组
    val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionDataRDD.groupBy(_.session_id)

    // 分组后,根据访问时间进行排序(升序)
    val mvRDD: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(
      iter => {
        val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time)

        // 【1,2,3,4】
        // 【1,2】,【2,3】,【3,4】
        // 【1-2,2-3,3-4】
        // Sliding : 滑窗
        // 【1,2,3,4】
        // 【2,3,4】
        // zip : 拉链
        val flowIds: List[Long] = sortList.map(_.page_id)
        val pageflowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail)

        // 将不合法的页面跳转进行过滤
        pageflowIds.filter(
          t => {
            okflowIds.contains(t)
          }
        ).map(
          t => {
            (t, 1)
          }
        )
      }
    )
    // ((1,2),1)
    val flatRDD: RDD[((Long, Long), Int)] = mvRDD.map(_._2).flatMap(list => list)
    // ((1,2),1) => ((1,2),sum)
    val dataRDD = flatRDD.reduceByKey(_ + _)

    // TODO 计算单跳转换率
    // 分子除以分母
    dataRDD.foreach {
      case ((pageid1, pageid2), sum) => {
        val lon: Long = pageidToCountMap.getOrElse(pageid1, 0L)

        println(s"页面${pageid1}跳转到页面${pageid2}单跳转换率为:" + (sum.toDouble / lon))
      }
    }


    sc.stop()
  }

  //用户访问动作表
  case class UserVisitAction(
                              date: String, //用户点击行为的日期
                              user_id: Long, //用户的ID
                              session_id: String, //Session的ID
                              page_id: Long, //某个页面的ID
                              action_time: String, //动作的时间点
                              search_keyword: String, //用户搜索的关键词
                              click_category_id: Long, //某一个商品品类的ID
                              click_product_id: Long, //某一个商品的ID
                              order_category_ids: String, //一次订单中所有品类的ID集合
                              order_product_ids: String, //一次订单中所有商品的ID集合
                              pay_category_ids: String, //一次支付中所有品类的ID集合
                              pay_product_ids: String, //一次支付中所有商品的ID集合
                              city_id: Long
                            ) //城市 id
}

spark core编程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/610549.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Python SMTP/POP3/IMAP】零基础也能轻松掌握的学习路线与参考资料

Python是一种高级编程语言,广泛应用于Web开发、人工智能、数据科学、自动化等领域。SMTP/POP3/IMAP是与邮件相关的三个协议,分别用于发送邮件、接收邮件和访问邮件。使用Python可以轻松实现这些功能,本文将介绍Python SMTP/POP3/IMAP的学习路…

【Python】Python系列教程-- Python3 条件控制(十六)

文章目录 前言if 语句if 嵌套match...case 前言 往期回顾: Python系列教程–Python3介绍(一)Python系列教程–Python3 环境搭建(二)Python系列教程–Python3 VScode(三)Python系列教程–Pytho…

Spring Boot整合Swagger2 Swagger2配置

目录 什么是Swagger? Swagger如何使用 如何使用Swagger 查看SwaggerAPI文档 什么是Swagger? Swagger是一款流行的RESTful API文档生成工具,它支持多种编程语言和多种框架,包括但不限于Java、Python、Node.js、Go等,Spring Boot也提供了…

【机器学习】第二章:K近邻(分类)

系列文章目录 第二章:K近邻(分类) 相关代码地址:https://github.com/wzybmw888/MachineLearning.git 文章目录 系列文章目录一、最近邻算法二、最近邻算法的缺陷(1)策略一:K近邻(k‐…

java源码为什么需要编译成字节码?

作用1: jvm支持多语言,需要字节码作为统一的规范 作用2: 字节码转成机器的指令会更快 作用3: 如果没有对应的反编译器,字节码还具有一定的安全保密作用

【Rust日报】2023-06-02 Rust 1.70.0 稳定版发布

Rust 1.70.0 稳定版发布 Rust 团队很高兴地宣布 Rust 的新版本 1.70.0。Rust 是一种编程语言,它使每个人都能构建可靠、高效的软件。 最大的特性是,OnceCell稳定版可用啦。 如果你通过 rustup 安装了以前版本的 Rust,你可以通过以下方式获得 …

Linux 之大数据定制篇-Shell 编程

Linux 之大数据定制篇-Shell 编程 为什么要学习Shell 编程 Linux 运维工程师在进行服务器集群管理时,需要编写Shell 程序来进行服务器管理。对于JavaEE 和Python 程序员来说,工作的需要,你的老大会要求你编写一些Shell 脚本进行程序或者是服…

Mocha AE:图层相关面板

Mocha AE 左侧的图层面板、图层属性面板以及边缘属性面板提供了与图层、样条、跟踪等相关的选项。 Layers 图层 图层的上下顺序相当重要。 上方所有图层的样条区域将被自动排除出跟踪遮罩 Track Mattes。 也可在同一图层上绘制多个样条形状。相交的样条区域将被排除出遮罩。 Vi…

【MySQL】一文带你了解MySQL中的子查询

文章目录 1. 需求分析与问题解决1. 1实际问题1.2 子查询的基本使用1.3 子查询的分类 2. 单行子查询2.1 单行比较操作符2.2 代码示例2.3 HAVING 中的子查询2.4 注意的问题 3. 多行子查询3.1 多行比较操作符3.2 代码示例 4. 相关子查询4.1 相关子查询执行流程4.2 代码示例 子查询…

图论学习(六)

图的连通度 删去任意一条边后便不连通 删去任意一条边后仍连通,但删去点u后不连通。 G3和G4删去任意一条边或任意一个点后仍连通,但从直观上看,G4的连通程度比G3高。 割边 设e是图G的一条边,若ω(G-e)>ω(G),则…

uniapp微信一键登录微信授权

前言 现在小程序逐渐成为主流,常用的微信授权登录很重要很常见的一个功能,今天自己总结了一下。 准备工作 1.如果你想自己想试一下这个功能首先你需要有一个开发中的项目并且你在开发成员里面。 2.配置自己的微信开发者工具的appid码 3.在hbuilderx的…

echarts 如何实现图例单个数据项加上背景颜色和饼图中的背景图自适应

需求: 实现效果如下: ECharts中,可以通过设置legend中的formatter属性来自定义图例项的显示格式。以下是一个示例: option = {// ...legend: {data: [A, B, C],formatter: function (name) {var color = #fff;if (name === A) {color = #ff0000; // 设置A的背景颜色为红色…

如何使用ArcGIS计算容积率

字段计算 为建筑图层新建一个area字段,用于记录单层建筑的面积,如下图所示。 单层建筑面积 为建筑图层新建一个areaAll字段,用于记录总建筑面积,areaAllarea*floor,如下图所示。 计算总面积 为小区图层新建一个area…

chatgpt赋能python:Python大于0的SEO

Python大于0的SEO Python是一种高级编程语言,被广泛用于数据科学、机器学习、Web应用程序和网络爬虫等领域。Python大于0的SEO是指使用Python编写程序来优化网站的排名。在本文中,我们将介绍Python大于0的SEO的基础知识和一些实用技巧。 什么是Python大…

【redis基础】哨兵

hi,这里是redis系列文章,本篇是【redis基础】哨兵,上一篇链接:【redis】redis主从复制_努力努力再努力mlx的博客-CSDN博客 目录 概念 作用 如何使用哨兵(案例演示实战步骤) redis sentinel架构提前说明 重点参数…

【Java】Java(四十九):注解及自定义注解

文章目录 什么是注解?概述注解的作用自定义注解注解的定义格式带有属性的注解 注解的使用注解的使用格式 元注解元注解的作用:常用元注解: 注解解析 什么是注解? 注解(Annotation)也称为元数据,是一种代码级别的说明注…

数据库管理-第八十期 Exadata to RAC(x86) ADG(20230605)

数据库管理 2023-06-05 第八十期 Exadata to RAC(x86) ADG1 环境2 搭建流程2.1配置静态监听-主库2.2配置静态监听-备库2.3配置本地命名-主备库2.4数据库配置-主库2.5生成参数文件和密码文件-主库2.6创建目录并上传密码文件-备库2.7添加数据库服务-备库2.8修改参数文件-备库2.9复…

超级智能的治理

原文链接:https://openai.com/blog/governance-of-superintelligence#SamAltman 作者丨Sam Altman,Greg Brockman,Ilya Sutskever 译者 | Ted Liu 审校 | LsssY 编辑丨肖钰雯 现在是开始思考超级智能治理的好时机--未来的人工智能系统甚至比通…

基于SpringBoot+vue的租房网站设计与实现

博主介绍: 大家好,我是一名在Java圈混迹十余年的程序员,精通Java编程语言,同时也熟练掌握微信小程序、Python和Android等技术,能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架下…

Python中的Time和DateTime

Python在处理与时间相关的操作时有两个重要模块:time和datetime。在本文中,我们介绍这两个模块并为每个场景提供带有代码和输出的说明性示例。 time模块主要用于处理时间相关的操作,例如获取当前时间、时间的计算和格式化等。它提供了一些函数…