一、环境要求
Hadoop + hive + spark + hbase开发环境
开启hadoop:
start-all.sh
开启zookeeper:
zkServer.sh start
开启hive:
nohup hive --service metastore &
nohup hive --service hiveserver2 &
打开hive界面:
beeline -u jdbc:hive2://192.168.153.139:10000
开启hbase:
start-hbase.sh
打开hbase界面:
hbase shell
二、数据描述
ex_exam_record表结构如下:
三、功能要求
1、数据准备
yqRDD
.map(x=>("allWorld",x._2.toInt))
.reduceByKey(_+_)
.collect.foreach(println)
在HDFS中创建目录/app/data/exam,并将 countrydata.csv文件传到该目录。
[root@hadoop02 testdata]# hdfs dfs -put /opt/testdata/countrydata.csv /app/data/exam
2、在spark-shell中,加载HDFS文件系统countrydata.csv文件,并使用RDD完成以下统计计算。
①统计每个国家在数据截止统计时的累计确诊人数。
val fileRDD = sc.textFile("/app/data/exam/countrydata.csv")
val yqRDD = fileRDD
.map(x=>x.split(","))
.map(x=>(x(4),x(1)))
.groupByKey()
.map(x=>(x._1,x._2.toList.max))
.collect.foreach(println)
②统计全世界在数据截止统计时的总感染人数。
需要注意的是,这里的累计确诊人数、当日新增人数这两列的数据不一定对的上。比如存在治愈人数,会在累积确认人数里面减掉。
fileRDD
.map(x=>x.split(","))
.map(x=>(x(4),x(1).toInt))
.groupByKey()
.map(x => (x._1, x._2.toList.max))
.map(x => ("all world", x._2))
.reduceByKey(_+_)
.foreach(println)
③统计每个大洲中每日新增确诊人数最多的国家及确诊人数,并输出 20200408 这一天各大洲当日新增确诊人数最多的国家及确诊人数。
fileRDD
.map(x=>x.split(","))
.map(x=>((x(6),x(3)),(x(2),x(4))))
.reduceByKey((x,y)=>if(x._1>y._1) x else y)
.foreach(println)
fileRDD
.map(x=>x.split(","))
.map(x=>((x(6),x(3)),(x(2).toInt,x(4))))
.reduceByKey((x,y)=>if(x._1>y._1) x else y)
.filter(x=>x._1._2=="20200408")
.foreach(println)
④统计每个大洲中每日累计确诊人数最多的国家及确诊人数,并输出 20200607 这一天各大洲当日累计确诊人数最多的国家及确诊人数。
fileRDD
.map(x=>x.split(","))
.map(x=>((x(6),x(3)),(x(2).toInt,x(4))))
.reduceByKey((x,y)=>if(x._1>y._1) x else y)
.filter(x=>x._1._2=="20200607")
.foreach(println)
⑤ 统计每个大洲每月累计确诊人数,显示 202006 这个月每个大洲的累计确诊人数。
fileRDD
.map(x=>x.split(","))
.map(x=>((x(6),x(3).substring(0,6)),x(2).toInt))
.filter(x=>x._1._2=="202006")
.reduceByKey(_+_)
.foreach(println)
3、创建hbase数据表
hbase(main):004:0> create "exam:covid19_world","record"
4、请在 Hive 中创建数据库 exam
在该数据库中创建外部表 ex_exam_record 指向 /app/data/exam 下的疫情数据。
use exam;
create external table ex_exam_record(
id string,
confirmedCount int,
confirmedIncr int,
recordDate string,
countryName string,
countryShortCode string,
continent string
)
row format delimited fields terminated by ","
stored as textfile location "/app/data/exam"
创建外部表 ex_exam_covid19_record 映射至 HBase 中的 exam:covid19_world 表的 record 列族。
ex_exam_covid19_record 表结构如下:
create external table ex_exam_covid19_record(
key string,
maxIncreaseCountry string,
maxIncreaseCount int
)
stored by "org.apache.hadoop.hive.hbase.HBaseStorageHandler" with
serdeproperties ("hbase.columns.mapping"=":key,record:maxIncreaseCountry,record:maxIncreaseCount")
tblproperties ("hbase.table.name"="exam:covid19_world")
5、使用ex_exam_record表中的数据
insert into ex_spu_hbase(
select concat(continent,'-',recordDate) as rowkey,countryName,confirmedIncr from(
select recordDate,continent,countryName ,confirmedIncr,row_number() over (partition by recordDate order by confirmedIncr desc) as rk from ex_exam_record
) t where t.rk=='1')
hbase(main):006:0> scan 'exam:covid19_world'