文章目录
- sparkRDD编程实战
- 1、Spark RDD 实现单词计数
- 2、Spark RDD 实现分组求TopN
- 3、Spark RDD 实现二次排序
- 4、Spark RDD 计算平均成绩
- 5、Spark RDD 倒排索引统计每日新增用户
- 6、Spark案例实操
- 7、Spark RDD 综合应用
- 需求1:Top10热门品类
- 需求说明
- 实现方案一
- 实现方案二
- 实现方案三
- 实现方案四
- 需求2:Top10热门品类中每个品类的Top10活跃Session统计
- 需求3:页面单跳转换率统计
sparkRDD编程实战
1、Spark RDD 实现单词计数
实现思路:
先使用flatmap对一行的单词进行切分,然后构造成为二元组(单词,计数),然后按照单词进行聚合。
代码如下:
RDD实现:
package test3.wordcount
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 思路;
* 1. flapMap(_.split(" "))
* 2. map(x => (x, 1))
* 3. reduceByKey(_ + _)
*/
object wordCount {
def main(args: Array[String]): Unit = {
// 环境准备
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
// 读取数据
val rdd: RDD[String] = sc.textFile("datas/1.txt")
val word = rdd
.flatMap(x => x.split(" ")) //单词切分
.map((_, 1)) //构成二元组
.reduceByKey(_ + _) //根据key聚合
// 输出结果
word.collect().foreach(println)
}
}
2、Spark RDD 实现分组求TopN
分组求TopN是大数据领域常见的需求,主要是根据数据的某一列进行分组,然后将分组后的每一组数据按照指定的列进行排序,最后取每一组的前N行数据。
有以下学生成绩数据,grade.txt
Andy,98
Jack,87
Bill,99
Andy,78
Jack,85
Bill,86
Andy,90
Jack,88
Bill,76
Andy,58
Jack,67
Bill,79
同一个学生有多门成绩,现需要计算每个学生分数最高的前3个成绩,期望的输出结果如下:
实现思路:
因为每一行为一条数据,所以先构成(姓名,成绩)二元组,然后根据姓名进行分组,对组内数据按照降序排列,取前3个,最后按照输出语句打印结果。
代码实现:
Spark RDD实现:
package test3.topn
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object getTopN {
def main(args: Array[String]): Unit = {
// 环境准备
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TopN")
val sc = new SparkContext(sparkConf)
// 读取数据
val rdd: RDD[String] = sc.textFile("datas/grade.txt")
rdd
.map(line => {
val fields = line.split(",") //每一行按照,进行切分
(fields(0), fields(1)) //返回(姓名,成绩)二元组
})
.groupBy(_._1) //根据姓名进行分组,(Andy,CompactBuffer((Andy,98), (Andy,78), (Andy,90), (Andy,58)))
.mapValues(x => { //((Andy,98), (Andy,78), (Andy,90), (Andy,58))
x.map(_._2).toList.sortWith(_ > _).take(3) //根据值进行排序,取前三个
})
.collect().foreach( //打印成绩
x => {
println("姓名:" + x._1)
x._2.foreach(y => {
println("成绩:" + y)
})
println("********************")
})
}
}
3、Spark RDD 实现二次排序
二次排序是指对需要排序的元素首先按照第一个字段进行排序,若第一个字段相等,则按照第二个字段排序。例如,文件sort.txt中有以下内容:
6 7
5 8
2 9
7 5
4 3
8 3
2 7
6 1
首先按照第一个字段升序排列,若第一个字段相等,则按照第二个字段降序排列,期望的输出结果如下:
2 9
2 7
4 3
5 8
6 7
6 1
7 5
8 3
实现思路:
先对数据进行切分构成二元组,根据第一个数字进行分组,按照第二个数字进行组内排序,然后按照第一个数据进行排序,最后打印输出。
代码实现:
package test3.twiceorder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object twiceOrder {
def main(args: Array[String]): Unit = {
//环境准备
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TwoOrder")
val sc = new SparkContext(sparkConf)
//读取数据
val rdd: RDD[String] = sc.textFile("datas/sort.txt")
rdd
.map( //构建二元组
line => {
val fields = line.split(" ")
(fields(0), fields(1))
}
)
.groupBy(_._1) //根据第一个数分组,(6,CompactBuffer((6,7), (6,1)))
.mapValues(x => {
x.toList.sortWith(_._2 > _._2) //降序,(6,List((6,7), (6,1)))
})
.sortByKey() //默认为升序
.collect().foreach(_._2.foreach(println))
}
}
输出结果为:
4、Spark RDD 计算平均成绩
对输入文件中的学生3科成绩进行计算,得出每个学生的平均成绩。输入文件中的每行内容均为一个学生的姓名和其相应的成绩,每门学科为一个文件。要求输出结果中每行有两列数据,其中第一列代表学生的姓名,第二列代表其平均成绩。输入的3个文件内容如下:
math.txt文件内容如下:
张三 88
李四 99
王五 66
赵六 77
chinese.txt文件内容如下:
张三 78
李四 89
王五 96
赵六 67
english.txt文件内容如下:
张三 80
李四 82
王五 84
赵六 86
期望输出结果如下:
张三 82
李四 90
王五 82
赵六 76
实现思路:
首先根据数据切分成为二元组,然后将数据按照姓名进行聚合,再进行遍历得到平均成绩,最后输出结果。
代码实现:
package test3.getavg
import org.apache.spark.{SparkConf, SparkContext}
object getAvg {
def main(args: Array[String]): Unit = {
//环境准备
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ScoreAvg")
val sc = new SparkContext(sparkConf)
//读取文件
val math = sc.textFile("datas/math.txt")
val chinese = sc.textFile("datas/chinese.txt")
val english = sc.textFile("datas/english.txt")
//数据合并
val score = math.union(chinese).union(english)
score
.map(line => {
val fields = line.split(" ") //两个空格
(fields(0), fields(1).toInt)
})
.reduceByKey(_ + _)
.map(x => {
val name = x._1
val avg = (x._2 / 3).toString.format("%.2f")
(name, avg)
})
.collect().foreach(y => {
println(y._1 + " " + y._2)
})
}
}
5、Spark RDD 倒排索引统计每日新增用户
已知有以下用户访问历史数据,第一列为用户访问网站的日期,第二列为用户名:
2020-01-01,user1
2020-01-01,user2
2020-01-01,user3
2020-01-02,user1
2020-01-02,user2
2020-01-02,user4
2020-01-03,user2
2020-01-03,user5
2020-01-03,user6
现需要根据上述数据统计每日新增的用户数量,期望的统计结果为:
2020-01-01,3
2020-01-02,1
2020-01-03,2
即2020-01-01新增了3个用户(分别为user1、user2、user3),2020-01-02新增了1个用户(user4),2020-01-03新增了两个用户(分别为user5、user6)。
实现思路:
由于一行为一条记录,先对数据进行切分构成二元组(时间,用户),然后按照用户进行分组,得到分组后的数据,取第一条数据为该用户第一次出现的数据,然后按照时间进行分组,最后输出结果。
最重要的是要理解每日新增用户,即该用户在改天第一次出现。
代码实现:
package test3
import org.apache.spark.{SparkConf, SparkContext}
object NewVisiter {
def main(args: Array[String]): Unit = {
//环境准备
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ScoreAvg")
val sc = new SparkContext(sparkConf)
//读取数据
val rdd = sc.textFile("datas/date_user.txt")
rdd.map(line => {
val split = line.split(",")
(split(0), split(1))
})
.groupBy(_._2) //根据用户分组
.map(x => {
val first = x._2.toList(0)
first
}) //取第一次出现的日期
.groupByKey() //根据日期分组
.sortByKey() //对日期排序
.collect().foreach(y => { //打印输出
println(y._1 + "," + y._2.size)
})
}
}
输出结果为:
6、Spark案例实操
1)数据准备
agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔。
2)需求描述
统计出每一个省份每个广告被点击数量排行的 Top3
3) 思路分析
首先我们先对原始数据进行结构的转换,只需要省份和广告即可,接着我们按照(省份,广告)进行聚合,然后按照省份进行分组,再进行组内排序,取前三个,最后打印输出。
4)代码实现
package test3.topn
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object getTop3 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// TODO 案例实操
// 1. 获取原始数据:时间戳,省份,城市,用户,广告
val dataRDD = sc.textFile("datas/agent.log")
// 2. 将原始数据进行结构的转换。方便统计
// 时间戳,省份,城市,用户,广告 => ((省份,广告),1)
val mapRDD = dataRDD.map(
line => {
val datas = line.split(" ")
((datas(1), datas(4)), 1)
}
)
// 3. 将转换结构后的数据,进行分组聚合
// ( ( 省份,广告 ), 1 ) => ( ( 省份,广告 ), sum )
val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _)
// 4. 将聚合的结果进行结构的转换
// ( ( 省份,广告 ), sum ) => ( 省份, ( 广告, sum ) )
val newMapRDD = reduceRDD.map {
case ((prv, ad), sum) => {
(prv, (ad, sum))
}
}
// 5. 将转换结构后的数据根据省份进行分组
// ( 省份, 【( 广告A, sumA ),( 广告B, sumB )】 )
val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()
// 6. 将分组后的数据组内排序(降序),取前3名
val resultRDD = groupRDD.mapValues(
iter => {
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
}
)
// 7. 采集数据打印在控制台
resultRDD.collect().foreach(println)
sc.stop()
}
}
输出结果为:
7、Spark RDD 综合应用
上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的4种行为:搜索,点击,下单,支付。数据规则如下:
- 数据文件中每行数据采用下划线分隔数据
- 每一行数据表示用户的一次行为,这个行为只能是4种行为的一种
- 如果搜索关键字为null,表示数据不是搜索数据
- 如果点击的品类ID和产品ID为-1,表示数据不是点击数据
- 针对于下单行为,一次可以下单多个商品,所以品类ID和产品ID可以是多个,id之间采用逗号分隔,如果本次不是下单行为,则数据采用null表示
- 支付行为和下单行为类似
详细字段说明:
编号 | 字段名称 | 字段类型 | 字段含义 |
---|---|---|---|
1 | date | String | 用户点击行为的日期 |
2 | user_id | Long | 用户的ID |
3 | session_id | String | Session的ID |
4 | page_id | Long | 某个页面的ID |
5 | action_time | String | 动作的时间点 |
6 | search_keyword | String | 用户搜索的关键词 |
7 | click_category_id | Long | 某一个商品品类的ID |
8 | click_product_id | Long | 某一个商品的ID |
9 | order_category_ids | String | 一次订单中所有品类的ID集合 |
10 | order_product_ids | String | 一次订单中所有商品的ID集合 |
11 | pay_category_ids | String | 一次支付中所有品类的ID集合 |
12 | pay_product_ids | String | 一次支付中所有商品的ID集合 |
13 | city_id | Long | 城市 id |
样例类:
//用户访问动作表
case class UserVisitAction(
date: String,//用户点击行为的日期
user_id: Long,//用户的ID
session_id: String,//Session的ID
page_id: Long,//某个页面的ID
action_time: String,//动作的时间点
search_keyword: String,//用户搜索的关键词
click_category_id: Long,//某一个商品品类的ID
click_product_id: Long,//某一个商品的ID
order_category_ids: String,//一次订单中所有品类的ID集合
order_product_ids: String,//一次订单中所有商品的ID集合
pay_category_ids: String,//一次支付中所有品类的ID集合
pay_product_ids: String,//一次支付中所有商品的ID集合
city_id: Long
)//城市 id
需求1:Top10热门品类
需求说明
品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量来统计热门品类。
鞋 点击数 下单数 支付数
衣服 点击数 下单数 支付数
电脑 点击数 下单数 支付数
例如,综合排名 = 点击数20%+下单数30%+支付数*50%
本项目需求优化为:先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数。
实现方案一
需求分析
分别统计每个品类点击的次数,下单的次数和支付的次数:(品类,点击总数)(品类,下单总数)(品类,支付总数)
需求实现
package test3.req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_Req1_HotCategoryTop10Analysis {
def main(args: Array[String]): Unit = {
// TODO : Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparConf)
// 1. 读取原始日志数据
val actionRDD = sc.textFile("datas/user_visit_action.txt")
// 2. 统计品类的点击数量:(品类ID,点击数量)
val clickActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(6) != "-1" //过滤不是点击的品类ID
}
)
val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
action => {
val datas = action.split("_")
(datas(6), 1)
}
).reduceByKey(_ + _)
// 3. 统计品类的下单数量:(品类ID,下单数量)
val orderActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(8) != "null"
}
)
// orderid => 1,2,3 进行扁平化操作
// 【(1,1),(2,1),(3,1)】
val orderCountRDD = orderActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(8)
val cids = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
// 4. 统计品类的支付数量:(品类ID,支付数量)
val payActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(10) != "null"
}
)
// orderid => 1,2,3
// 【(1,1),(2,1),(3,1)】
val payCountRDD = payActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(10)
val cids = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
// 5. 将品类进行排序,并且取前10名
// 点击数量排序,下单数量排序,支付数量排序
// 元组排序:先比较第一个,再比较第二个,再比较第三个,依此类推
// ( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )
//
// cogroup = connect + group
val cogroupRDD: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] =
clickCountRDD.cogroup(orderCountRDD, payCountRDD)
val analysisRDD = cogroupRDD.mapValues {
case (clickIter, orderIter, payIter) => {
var clickCnt = 0
val iter1 = clickIter.iterator
if (iter1.hasNext) {
clickCnt = iter1.next()
}
var orderCnt = 0
val iter2 = orderIter.iterator
if (iter2.hasNext) {
orderCnt = iter2.next()
}
var payCnt = 0
val iter3 = payIter.iterator
if (iter3.hasNext) {
payCnt = iter3.next()
}
(clickCnt, orderCnt, payCnt)
}
}
// 根据元组进行降序排序
val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
// 6. 将结果采集到控制台打印出来
resultRDD.foreach(println)
sc.stop()
}
}
输出结果为:
实现方案二
代码优化:
cogroup性能可能较低
actionRDD重复使用
package test3.req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_Req1_HotCategoryTop10Analysis1 {
def main(args: Array[String]): Unit = {
// TODO : Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparConf)
// Q : actionRDD重复使用
// Q : cogroup性能可能较低
// 1. 读取原始日志数据
val actionRDD = sc.textFile("datas/user_visit_action.txt")
actionRDD.cache()
// 2. 统计品类的点击数量:(品类ID,点击数量)
val clickActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(6) != "-1"
}
)
val clickCountRDD: RDD[(String, Int)] = clickActionRDD.map(
action => {
val datas = action.split("_")
(datas(6), 1)
}
).reduceByKey(_ + _)
// 3. 统计品类的下单数量:(品类ID,下单数量)
val orderActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(8) != "null"
}
)
// orderid => 1,2,3
// 【(1,1),(2,1),(3,1)】
val orderCountRDD = orderActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(8)
val cids = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
// 4. 统计品类的支付数量:(品类ID,支付数量)
val payActionRDD = actionRDD.filter(
action => {
val datas = action.split("_")
datas(10) != "null"
}
)
// orderid => 1,2,3
// 【(1,1),(2,1),(3,1)】
val payCountRDD = payActionRDD.flatMap(
action => {
val datas = action.split("_")
val cid = datas(10)
val cids = cid.split(",")
cids.map(id => (id, 1))
}
).reduceByKey(_ + _)
// (品类ID, 点击数量) => (品类ID, (点击数量, 0, 0))
// (品类ID, 下单数量) => (品类ID, (0, 下单数量, 0))
// => (品类ID, (点击数量, 下单数量, 0))
// (品类ID, 支付数量) => (品类ID, (0, 0, 支付数量))
// => (品类ID, (点击数量, 下单数量, 支付数量))
// ( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )
// 5. 将品类进行排序,并且取前10名
// 点击数量排序,下单数量排序,支付数量排序
// 元组排序:先比较第一个,再比较第二个,再比较第三个,依此类推
// ( 品类ID, ( 点击数量, 下单数量, 支付数量 ) )
//
val rdd1 = clickCountRDD.map {
case (cid, cnt) => {
(cid, (cnt, 0, 0))
}
}
val rdd2 = orderCountRDD.map {
case (cid, cnt) => {
(cid, (0, cnt, 0))
}
}
val rdd3 = payCountRDD.map {
case (cid, cnt) => {
(cid, (0, 0, cnt))
}
}
// 将三个数据源合并在一起,统一进行聚合计算
val soruceRDD: RDD[(String, (Int, Int, Int))] = rdd1.union(rdd2).union(rdd3)
val analysisRDD = soruceRDD.reduceByKey(
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
}
)
val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
// 6. 将结果采集到控制台打印出来
resultRDD.foreach(println)
sc.stop()
}
}
实现方案三
需求分析
存在大量的shuffle操作(reduceByKey)
一次性统计每个品类点击的次数,下单的次数和支付的次数:(品类,(点击总数,下单总数,支付总数))
需求实现
package test3.req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_Req1_HotCategoryTop10Analysis2 {
def main(args: Array[String]): Unit = {
// TODO : Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparConf)
// Q : 存在大量的shuffle操作(reduceByKey)
// reduceByKey 聚合算子,spark会提供优化,缓存
// 1. 读取原始日志数据
val actionRDD = sc.textFile("datas/user_visit_action.csv")
// 2. 将数据转换结构
// 点击的场合 : ( 品类ID,( 1, 0, 0 ) )
// 下单的场合 : ( 品类ID,( 0, 1, 0 ) )
// 支付的场合 : ( 品类ID,( 0, 0, 1 ) )
val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
action => {
val datas = action.split(",")
if (datas(6) != "-1") {
// 点击的场合
List((datas(6), (1, 0, 0)))
} else if (datas(8) != "null") {
// 下单的场合
val ids = datas(8).split("-")
ids.map(id => (id, (0, 1, 0)))
} else if (datas(10) != "null") {
// 支付的场合
val ids = datas(10).split("-")
ids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
}
)
// 3. 将相同的品类ID的数据进行分组聚合
// ( 品类ID,( 点击数量, 下单数量, 支付数量 ) )
val analysisRDD = flatRDD.reduceByKey(
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
}
)
// 4. 将统计结果根据数量进行降序处理,取前10名
val resultRDD = analysisRDD.sortBy(_._2, false).take(10)
// 5. 将结果采集到控制台打印出来
resultRDD.foreach(println)
sc.stop()
}
}
实现方案四
需求分析
使用累加器的方式聚合数据
需求实现
package test3.req
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
object Spark04_Req1_HotCategoryTop10Analysis3 {
def main(args: Array[String]): Unit = {
// TODO : Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparConf)
// 1. 读取原始日志数据
val actionRDD = sc.textFile("datas/user_visit_action.csv")
val acc = new HotCategoryAccumulator
sc.register(acc, "hotCategory")
// 2. 将数据转换结构
actionRDD.foreach(
action => {
val datas = action.split(",")
if (datas(6) != "-1") {
// 点击的场合
acc.add((datas(6), "click"))
} else if (datas(8) != "null") {
// 下单的场合
val ids = datas(8).split("-")
ids.foreach(
id => {
acc.add((id, "order"))
}
)
} else if (datas(10) != "null") {
// 支付的场合
val ids = datas(10).split("-")
ids.foreach(
id => {
acc.add((id, "pay"))
}
)
}
}
)
val accVal: mutable.Map[String, HotCategory] = acc.value
val categories: mutable.Iterable[HotCategory] = accVal.map(_._2)
val sort = categories.toList.sortWith(
(left, right) => {
if (left.clickCnt > right.clickCnt) {
true
} else if (left.clickCnt == right.clickCnt) {
if (left.orderCnt > right.orderCnt) {
true
} else if (left.orderCnt == right.orderCnt) {
left.payCnt > right.payCnt
} else {
false
}
} else {
false
}
}
)
// 5. 将结果采集到控制台打印出来
sort.take(10).foreach(println)
sc.stop()
}
case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)
/**
* 自定义累加器
* 1. 继承AccumulatorV2,定义泛型
* IN : ( 品类ID, 行为类型 )
* OUT : mutable.Map[String, HotCategory]
* 2. 重写方法(6)
*/
class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] {
private val hcMap = mutable.Map[String, HotCategory]()
override def isZero: Boolean = {
hcMap.isEmpty
}
override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = {
new HotCategoryAccumulator()
}
override def reset(): Unit = {
hcMap.clear()
}
override def add(v: (String, String)): Unit = {
val cid = v._1
val actionType = v._2
val category: HotCategory = hcMap.getOrElse(cid, HotCategory(cid, 0, 0, 0))
if (actionType == "click") {
category.clickCnt += 1
} else if (actionType == "order") {
category.orderCnt += 1
} else if (actionType == "pay") {
category.payCnt += 1
}
hcMap.update(cid, category)
}
override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
val map1 = this.hcMap
val map2 = other.value
map2.foreach {
case (cid, hc) => {
val category: HotCategory = map1.getOrElse(cid, HotCategory(cid, 0, 0, 0))
category.clickCnt += hc.clickCnt
category.orderCnt += hc.orderCnt
category.payCnt += hc.payCnt
map1.update(cid, category)
}
}
}
override def value: mutable.Map[String, HotCategory] = hcMap
}
}
需求2:Top10热门品类中每个品类的Top10活跃Session统计
需求说明
在需求一的基础上,增加每个品类用户session的点击统计
package test3.req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_Req2_HotCategoryTop10SessionAnalysis {
def main(args: Array[String]): Unit = {
// TODO : Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparConf)
val actionRDD = sc.textFile("datas/user_visit_action.csv")
actionRDD.cache()
val top10Ids: Array[String] = top10Category(actionRDD)
// 1. 过滤原始数据,保留点击和前10品类ID
val filterActionRDD = actionRDD.filter(
action => {
val datas = action.split(",")
if (datas(6) != "-1") {
top10Ids.contains(datas(6))
} else {
false
}
}
)
// 2. 根据品类ID和sessionid进行点击量的统计
val reduceRDD: RDD[((String, String), Int)] = filterActionRDD.map(
action => {
val datas = action.split(",")
((datas(6), datas(2)), 1)
}
).reduceByKey(_ + _)
// 3. 将统计的结果进行结构的转换
// (( 品类ID,sessionId ),sum) => ( 品类ID,(sessionId, sum) )
val mapRDD = reduceRDD.map {
case ((cid, sid), sum) => {
(cid, (sid, sum))
}
}
// 4. 相同的品类进行分组
val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()
// 5. 将分组后的数据进行点击量的排序,取前10名
val resultRDD = groupRDD.mapValues(
iter => {
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
}
)
resultRDD.collect().foreach(println)
sc.stop()
}
def top10Category(actionRDD: RDD[String]) = {
val flatRDD: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
action => {
val datas = action.split(",")
if (datas(6) != "-1") {
// 点击的场合
List((datas(6), (1, 0, 0)))
} else if (datas(8) != "null") {
// 下单的场合
val ids = datas(8).split("-")
ids.map(id => (id, (0, 1, 0)))
} else if (datas(10) != "null") {
// 支付的场合
val ids = datas(10).split("-")
ids.map(id => (id, (0, 0, 1)))
} else {
Nil
}
}
)
val analysisRDD = flatRDD.reduceByKey(
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3)
}
)
analysisRDD.sortBy(_._2, false).take(10).map(_._1)
}
}
输出结果为:
(20,List((710373f5-3a2e-4fec-9ddb-623d779273e6,6), (f632bbe4-e5ba-4dcf-91ef-89f9688376a9,6), (7d0014a4-c501-456f-9d30-fe6af79d933c,6), (3208994d-5867-4c8f-841f-42700aeabfb3,6), (c34d6503-b62b-4bab-9b85-7b0aefd8a5e0,5), (78bc6698-ac26-4f1b-aaa0-05232f3322e5,5), (8b9a0f5a-bff9-4f37-b3bb-6aa3c24d24f7,5), (c9f87d6f-6a62-4e20-a2bb-aa0b690c134c,5), (7ea420e7-eaa4-42dd-b517-470656fb87b6,5), (1c5e83e4-370c-4163-a387-6d61a0233e24,5)))
(15,List((8b9a0f5a-bff9-4f37-b3bb-6aa3c24d24f7,8), (e01a941e-08e7-4fd6-aa77-eb10b5c2644b,8), (4c840047-6302-4e7d-8505-f5a0ac11da2f,5), (dd89c2b4-d6f1-4112-b705-6ce263abdfd4,5), (1a163c8d-3d58-4a76-8b28-d732bb51aec8,5), (0f0c33f2-a9c9-4264-befe-dce0d892526a,5), (f9936a33-3cad-4ac0-8153-77234c6cdb09,5), (57500425-a1d6-4dad-8ec4-224e443e567f,5), (75cf4d14-87f4-4e39-8050-c3d164a002bc,4), (b3fa6eb2-9028-46d0-9222-df5e649238bb,4)))
(2,List((36003baf-6d64-49be-a047-9fc8c0e76ca8,6), (0d10c830-c48c-418c-b51b-66d1130b4b57,5), (233e790a-1d96-4559-9205-471dc235161d,5), (1ecc15fb-85e4-4c0f-b68f-7351d9f77895,5), (e222b39c-8b13-4865-a8ba-892bd8919b5b,5), (72b227ad-aaad-4c6c-965f-d2ed3428aa20,5), (e5a2e03a-e650-446e-bff2-4845dc726278,5), (285278ad-affb-4b17-a442-3b83dae12576,5), (fd1471b0-1b95-465f-95c7-256ec3e65e26,5), (fd6b98aa-6662-4d30-b63d-683264bf4a3d,5)))
(17,List((6b53902b-93e2-4e08-be23-da9e12699dc5,7), (39b5df21-1c7f-4a8a-9766-84ee5f6954e0,6), (f632bbe4-e5ba-4dcf-91ef-89f9688376a9,6), (b21551c7-40e1-405c-81e2-d7caf53e6c07,5), (36003baf-6d64-49be-a047-9fc8c0e76ca8,5), (6501d135-cf17-4020-9852-51b6c3d5f1d7,5), (595f8ca5-7d03-4a32-956e-894821cd17f9,5), (5a841cd1-99ed-47fa-af9b-72183fd37b67,5), (7328ce83-2c98-4d39-a1bf-1af472bb8c68,5), (6bc0e641-e0ec-49ee-a41c-71913ab10373,5)))
(13,List((d4125ad2-454d-40b6-b0d6-769ff58a26b0,6), (23087483-70c1-4e5f-9d53-5f7020032929,6), (a550b6e4-6e9f-45f8-86ec-040919295efd,5), (7a20dbce-58a5-464b-86e6-7abef1cedd64,5), (0f0c33f2-a9c9-4264-befe-dce0d892526a,5), (70e2e082-08e6-4e14-bec6-6ac266e3bf90,5), (0031c47d-1bcf-43d9-ad6a-252339b47082,5), (8b9a0f5a-bff9-4f37-b3bb-6aa3c24d24f7,5), (fe75781f-0aa1-4cfd-a11f-128ee35707db,5), (9584d531-47b6-455c-9843-24e45f47939b,5)))
(14,List((6220a3ac-6474-4aba-ae86-85c4f3c65314,6), (6e37815c-76d5-4899-a85e-3cc91dc60529,6), (d87cf8b6-67ad-4883-9f1e-d4eb50d50158,6), (dd4b853e-9e00-40d1-a003-4e328561995c,6), (5c319dfc-0b9d-41a7-b20a-2a819145e888,5), (157a3539-83e2-4948-84df-265988a648fd,5), (36003baf-6d64-49be-a047-9fc8c0e76ca8,5), (7ec2504f-b5b6-45ed-ae16-c54ea59d4e47,5), (ce061e15-48d2-44c0-b923-efd7a704c513,5), (b2b38179-9918-4a67-b652-345af23f442d,5)))
(7,List((870e7817-4f91-4db8-a091-08c2945d4bed,6), (5024e7c8-3210-4d97-8b9a-55827b4a4273,5), (e39459f1-213d-4f98-91e8-ecb79093b9d0,5), (4674e216-220a-4cbe-b0c0-479ebe5a6a52,5), (c1d705bf-bfb2-4a19-94af-c32ea0d5885d,5), (83f0b201-158f-452e-8a4a-903de6ec9744,5), (8d912b31-76d4-461b-aa9f-26a574e9dcd0,4), (fce37a59-9b2a-4051-a207-01ae3503fa4f,4), (395525cf-eea1-4dcb-b718-89226f31184e,4), (839d451e-f58e-4a82-a394-963fa6e6212b,4)))
(5,List((c34d6503-b62b-4bab-9b85-7b0aefd8a5e0,6), (9a6b1b56-8b8b-45d0-b422-ebb45c5ff927,6), (a3c9bfa8-81de-4dc3-a1a2-c958eb3374e5,6), (9355860b-ce62-466f-8dfd-94b668ec7931,6), (d4125ad2-454d-40b6-b0d6-769ff58a26b0,5), (9dd9044d-e14b-454a-9cc4-d100bc56859a,5), (d540f8d1-4e5e-4fea-bb0e-f69c02cd338d,5), (ad553fdd-b68a-461a-b6c2-b7a5ded07da8,5), (e0c6ee82-27f8-4190-807c-a739662bc8f7,5), (088cae29-b669-4604-b110-e436883dcab7,4)))
(3,List((18834b31-772b-4675-a1f9-da1cf2e2de7e,5), (397d9b65-5a74-47a8-b583-515c30c7cd00,5), (a550b6e4-6e9f-45f8-86ec-040919295efd,4), (99913a44-56dd-4d27-b87d-4173b60562f6,4), (60bc8718-2e40-40d1-9062-6bc116f38d7f,4), (0eca6613-bb30-42a8-b1f3-4b0515c165e2,4), (ce061e15-48d2-44c0-b923-efd7a704c513,4), (58ea579d-fe43-4ddf-9fb7-212316cdfbd7,4), (e72fd989-370d-49b8-ab1e-6078b0dfa592,4), (7d0014a4-c501-456f-9d30-fe6af79d933c,4)))
(10,List((710373f5-3a2e-4fec-9ddb-623d779273e6,6), (8c2f5cc7-ebb2-45c3-a712-13471326c550,6), (e2e81c3a-839f-438f-b8bb-414c318baf33,5), (47084fc3-c314-4713-b7f0-928dbeddb2c3,5), (c9e0a14f-f6e5-4ea1-8841-f91ff88b1510,4), (593fbab5-d874-419d-b362-b5cd3d82f106,4), (fe13c870-bfcc-4e9c-8a2b-faa105b67aa5,4), (b6b10d25-6be7-452a-a266-f67c2d96556b,4), (65a90a47-d8cd-4df0-a4a8-4b66a9b209b4,4), (d2855c58-0cb4-4748-86a4-0abaabcccd8c,4)))
需求3:页面单跳转换率统计
需求说明
1)页面单跳转化率
计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率。
比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率。
2)统计页面单跳转化率意义
产品经理和运营总监,可以根据这个指标,去尝试分析,整个网站,产品,各个页面的表现怎么样,是不是需要去优化产品的布局;吸引用户最终可以进入最后的支付页面。
数据分析师,可以此数据做更深一步的计算和分析。
企业管理层,可以看到整个公司的网站,各个页面的之间的跳转的表现如何,可以适当调整公司的经营战略或策略。
需求分析
功能实现
package test3.req
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_Req3_PageflowAnalysis {
def main(args: Array[String]): Unit = {
// TODO : Top10热门品类
val sparConf = new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
val sc = new SparkContext(sparConf)
val actionRDD = sc.textFile("datas/user_visit_action.csv")
val actionDataRDD = actionRDD.map(
action => {
val datas = action.split(",")
UserVisitAction(
datas(0),
datas(1).toLong,
datas(2),
datas(3).toLong,
datas(4),
datas(5),
datas(6).toLong,
datas(7).toLong,
datas(8),
datas(9),
datas(10),
datas(11),
datas(12).toLong
)
}
)
actionDataRDD.cache()
// TODO 对指定的页面连续跳转进行统计
// 1-2,2-3,3-4,4-5,5-6,6-7
val ids = List[Long](1, 2, 3, 4, 5, 6, 7)
val okflowIds: List[(Long, Long)] = ids.zip(ids.tail)
// TODO 计算分母
val pageidToCountMap: Map[Long, Long] = actionDataRDD.filter(
action => {
ids.init.contains(action.page_id)
}
).map(
action => {
(action.page_id, 1L)
}
).reduceByKey(_ + _).collect().toMap
// TODO 计算分子
// 根据session进行分组
val sessionRDD: RDD[(String, Iterable[UserVisitAction])] = actionDataRDD.groupBy(_.session_id)
// 分组后,根据访问时间进行排序(升序)
val mvRDD: RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(
iter => {
val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time)
// 【1,2,3,4】
// 【1,2】,【2,3】,【3,4】
// 【1-2,2-3,3-4】
// Sliding : 滑窗
// 【1,2,3,4】
// 【2,3,4】
// zip : 拉链
val flowIds: List[Long] = sortList.map(_.page_id)
val pageflowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail)
// 将不合法的页面跳转进行过滤
pageflowIds.filter(
t => {
okflowIds.contains(t)
}
).map(
t => {
(t, 1)
}
)
}
)
// ((1,2),1)
val flatRDD: RDD[((Long, Long), Int)] = mvRDD.map(_._2).flatMap(list => list)
// ((1,2),1) => ((1,2),sum)
val dataRDD = flatRDD.reduceByKey(_ + _)
// TODO 计算单跳转换率
// 分子除以分母
dataRDD.foreach {
case ((pageid1, pageid2), sum) => {
val lon: Long = pageidToCountMap.getOrElse(pageid1, 0L)
println(s"页面${pageid1}跳转到页面${pageid2}单跳转换率为:" + (sum.toDouble / lon))
}
}
sc.stop()
}
//用户访问动作表
case class UserVisitAction(
date: String, //用户点击行为的日期
user_id: Long, //用户的ID
session_id: String, //Session的ID
page_id: Long, //某个页面的ID
action_time: String, //动作的时间点
search_keyword: String, //用户搜索的关键词
click_category_id: Long, //某一个商品品类的ID
click_product_id: Long, //某一个商品的ID
order_category_ids: String, //一次订单中所有品类的ID集合
order_product_ids: String, //一次订单中所有商品的ID集合
pay_category_ids: String, //一次支付中所有品类的ID集合
pay_product_ids: String, //一次支付中所有商品的ID集合
city_id: Long
) //城市 id
}
spark core编程