一、环境要求
Hadoop+Hive+Spark+HBase 开发环境。
二、数据描述
本数据集包含了2017-09-11至2017-12-03之间有行为的约5458位随机用户的所有行为(行为包括点击、购买、加购、喜欢)。数据集的每一行表示一条用户行为,由用户ID、商品ID、商品类目ID、行为类型和时间戳组成,并以逗号分隔。关于数据集中每一列的详细描述如下具体字段说明如下:
列名称 | 列中文名称 | 说明 |
user_id | 用户 ID | 整数类型,序列化后的用户 ID |
item_id | 商品 ID | 整数类型,序列化后的商品 ID |
category_id | 商品类目 ID | 整数类型,序列化后的商品所属类目 ID |
behavior_type | 行为类型 | 字符串,枚举类型,包括 ('pv', 'buy', 'cart', 'fav') |
time | 时间戳 | 行为发生的时间戳 |
用户行为类型共有四种,它们分别是:
行为类型 | 说明 |
pv | 商品详情页 pv,等价于点击 |
buy | 商品购买 |
cart | 将商品加入购物车 |
fav | 收藏商品 |
三、功能要求
1.数据准备
(1)在 HDFS 中创建目录/data/userbehavior,并将 UserBehavior.csv 文件传到该目录。
[root@kb135 examdata]# hdfs dfs -mkdir -p /data/userbehavior
[root@kb135 examdata]# hdfs dfs -put ./UserBehavior.csv /data/userbehavior
(2)通过 HDFS 命令查询出文档有多少行数据。
[root@kb135 examdata]# hdfs dfs -cat /data/userbehavior/UserBehavior.csv | wc -l
2.数据清洗
(1)在 Hive 中创建数据库 exam
hive (default)> create database exam;
hive (default)> use exam;
(2)在 exam 数据库中创建外部表 userbehavior,并将 HDFS 数据映射到表中
create external table userbehavior(
user_id int,
item_id int,
category_id int,
behavior_type string,
`time` bigint
)
row format delimited fields terminated by ","
stored as textfile location '/data/userbehavior';
(3)在 HBase 中创建命名空间 exam,并在命名空间 exam 创建 userbehavior 表,包
含一个列簇 info
hbase(main):002:0> create_namespace 'exam202010'
hbase(main):003:0> create 'exam202010:userbehavior','info'
(4)在 Hive 中创建外部表 userbehavior_hbase,并映射到 HBase 中,并将数
据加载到 HBase 中
create external table userbehavior_hbase(
user_id int,
item_id int,
category_id int,
behavior_type string,
`time` bigint
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' with
serdeproperties("hbase.columns.mapping"=":key,info:item_id,info:category_id,info:behavior_type,info:time")
tblproperties ("hbase.table.name"="exam202010:userbehavior");
insert into userbehavior_hbase select * from userbehavior;
(5)在 exam 数据库中创建内部分区表 userbehavior_partitioned(按照日期进行分区),
并通过查询 userbehavior 表将时间戳格式化为”年-月-日时:分:秒”格式,将数据插入至 userbehavior_partitioned 表中,例如下图:
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
insert into table userbehavior_partition partition (dt)
select user_id,item_id,category_id,behavior_type,
from_unixtime(`time`) as `time`,
from_unixtime(`time`,'yyyy-MM-dd') dt
from userbehavior;
3.用户行为分析
使用 Spark,加载 HDFS 文件系统 UserBehavior.csv 文件,并分别使用 RDD 完成以下分析。
加载文件:
scala> val fileRdd = sc.textFile("hdfs://kb135:9000/data/userbehavior")
(1)统计 uv 值(一共有多少用户访问淘宝)
scala> fileRdd.map(x=>x.split(","))
.filter(_.length==5)
.map(x=>x(0))
.distinct().count
res1: Long = 5458
scala> fileRdd.map(x=>x.split(","))
.filter(_.length==5)
.groupBy(x=>x(0)).count
res2: Long = 5458
(2)分别统计浏览行为为点击,收藏,加入购物车,购买的总数量
scala> fileRdd.map(x=>x.split(","))
.filter(_.length==5)
.map(x=>(x(3),1))
.reduceByKey(_+_)
.collect.foreach(println)
(cart,30888)
(buy,11508)
(pv,503881)
(fav,15017)
scala> fileRdd.map(x=>x.split(","))
.filter(_.length==5)
.map(x=>(x(3),1))
.groupByKey()
.map(x=>(x._1,x._2.toList.size))
.collect.foreach(println)
(cart,30888)
(buy,11508)
(pv,503881)
(fav,15017)
4.找出有价值的用户
(1)使用 SparkSQL 统计用户最近购买时间。以 2017-12-03 为当前日期,计算时间范围为一个月,计算用户最近购买时间,时间的区间为 0-30 天,将其分为 5 档,0-6 天,7-12 天,13-18 天,19-24 天,25-30 天分别对应评分 4 到 0
with
tb as
(select user_id,
datediff('2017-12-03',max(dt)) as diff,
max(dt)
from userbehavior_partition
where dt>'2017-11-03' and behavior_type='buy'
group by user_id),
tb2 as
(select user_id,
(case when diff between 0 and 6 then 4
when diff between 7 and 12 then 3
when diff between 13 and 18 then 2
when diff between 19 and 24 then 1
when diff between 25 and 30 then 0
else null end ) tag
from tb)
select * from tb2 where tag=3;
(2)使用 SparkSQL 统计用户的消费频率。以 2017-12-03 为当前日期,计算时间范围为一个月,计算用户的消费次数,用户中消费次数从低到高为 1-161 次,将其分为 5 档,1-32,33-64,65-96,97-128,129-161 分别对应评分 0 到 4
with
tb as
(select user_id,
count(user_id) as num from userbehavior_partition
where dt between '2017-11-03' and '2017-12-03' and behavior_type='buy'
group by user_id)
select
user_id,
(case when num between 129 and 161 then 4
when num between 97 and 128 then 3
when num between 65 and 96 then 2
when num between 33 and 64 then 1
when num between 1 and 32 then 0
else null end) tag
from tb;