一、环境要求
Hadoop + hive + spark + hbase开发环境
开启hadoop:
start-all.sh
开启zookeeper:
zkServer.sh start
开启hive:
nohup hive --service metastore &
nohup hive --service hiveserver2 &
打开hive界面:
beeline -u jdbc:hive2://192.168.153.139:10000
开启hbase:
start-hbase.sh
打开hbase界面:
hbase shell
二、数据描述
meituan_waimai_meishi.csv 是美团外卖平台的部分外卖SPU(Standard Product Unit,标准产品单元)数据,包含了外卖平台某地区某一时间的外卖信息。具体字段说明如下:
表1:美团外卖SPU商品数据字典
三、功能要求
1、数据准备
在HDFS中创建目录/app/data/exam,并将 meituan_waimai_meishi.csv文件传到该目录。并通过HDFS命令查询出文档有多少行数据。
1、hdfs中创建目录/app/data/exam
[root@hadoop02 ~]# hdfs dfs -mkdir -p /app/data/exam
2、上传文件
[root@hadoop02 ~]# hdfs dfs -put /opt/testdata/meituan_waimai_meishi.csv /app/data/exam
3、查询出文档有多少行数据
[root@hadoop02 ~]# hdfs dfs -cat /app/data/exam/meituan_waimai_meishi.csv | wc -l
2、使用Spark加载HDFS文件系统 meituan_waimai_meishi.csv文件。分别使用RDD和SparkSQL完成以下分析(不用考虑数据去重)
①统计每个店铺分别有多少商品(SPU)
②统计每个店铺的总销售额
③统计每个店铺销售额最高的前三个商品。输出内容包括店铺名、商品名和销售额,其中销售额为0 的商品不进行统计计算。例如:如果某个店铺销售为0,则不进行统计。
加载文件:
[root@hadoop02 ~]# spark-shell
RDD:
scala> val fileRDD =sc.textFile("/app/data/exam/meituan_waimai_meishi.csv").filter(x=>x.startsWith("spu_id")==false).map(x=>x.split(",",-1)).filter(x=>x.size==12)
1、统计每个店铺分别有多少商品(SPU)
scala> fileRDD.map(x=>(x(2),1)).reduceByKey(_+_).collect.foreach(println)
2、统计每个店铺的总销售额
scala> fileRDD.map(x=>(x(2),x(5).toDouble*x(7).toInt)).reduceByKey(_+_).collect.foreach(println)
3、统计每个店铺销售额最高的前三个商品
scala> fileRDD.map(x=>(x(2),x(4),x(5).toDouble*x(7).toInt)).filter(x=>x._3!=0).groupBy(x=>x._1).mapValues(v=>v.toList.sortBy(x=>0-x._3).take(3)).flatMap(x=>x._2).collect.foreach(println)
SparkSQL:
scala> val fileRDD2 = spark.read.format("csv").option("header",true).option("inferSchema",true).load("/app/data/exam/meituan_waimai_meishi.csv")
scala> fileRDD2.createOrReplaceTempView("tbl")
1、统计每个店铺分别有多少商品(SPU)
scala> spark.sql("select shop_name,count(*) from tbl group by shop_name").show
2、统计每个店铺的总销售额
scala> spark.sql("select shop_name,sum(spu_price*month_sales) as money from tbl group by shop_name").show
3、统计每个店铺销售额最高的前三个商品
scala> spark.sql("select shop_name, spu_price*month_sales as sales from (select shop_name,spu_price,month_sales,row_number() over(partition by shop_name order by spu_price*month_sales desc) as rk from tbl where month_sales!=0 )as table where table.rk<=3").show(100)
RDD:
SparkSQL:
3、创建HBase数据表
在hbase中创建命名空间(namespace)exam,在该命名空间下创建spu表,该表下有一个列族result。
进入hbase:
[root@hadoop02 ~]# hbase shell
创建exam下的spu表,spu表有一个列族
hbase(main):007:0> create 'exam:spu','result'
4、在hive中创建数据库spu_db,在该数据库中创建外部表ex_spu指向/app/data/exam下的测试数据;
创建外部表ex_spu_hbase,映射至HBase中的exam:spu表中的result列族
create database spu_db;
use spu_db;
create 'exam:spu','result'
创建hive表
create external table ex_spu(
spu_id string,
shop_id string,
shop_name string,
category_name string,
spu_name string,
spu_price double,
spu_originprice double,
month_sales int,
praise_num int,
spu_unit string,
spu_desc string,
spu_image string
)
row format delimited fields terminated by ","
stored as textfile location "/app/data/exam"
tblproperties ("skip.header.line.count"="1")
创建hbase表
create external table ex_spu_hbase (
key string,
sales double,
praise int
)
stored by "org.apache.hadoop.hive.hbase.HBaseStorageHandler"
with serdeproperties ("hbase.columns.mapping"=":key,result:sales,result:praise")
tblproperties ("hbase.table.name"="exam:spu")
5、统计查询
①统计每个店铺的总销售额sales,店铺的商品总点赞数praise,并将shop_id和shop_name的作何作为RowKey,并将结果映射到HBase。
②完成统计后,分别在hive和HBase中查询结果数据。
第一步:统计数据
select concat(shop_id,shop_name) key,sum(spu_price*month_sales) as sales,sum(praise_num) as praise from ex_spu group by shop_id,shop_name
第二步:映射hbase
insert into ex_spu_hbase
(select concat(shop_id,shop_name) key,sum(spu_price*month_sales) as sales,sum(praise_num) as praise from ex_spu group by shop_id,shop_name );