旅游集市数仓建设
小白如何从0到1成为大数据工程师
目录
旅游集市数仓建设
1.上传数据
2.可能用到的UDF函数
3.创建所需数据库及表
1)ODS层
①ods_oidd
②ods_wcdr
③ods_ddr
④ods_dpi
2)DWD层
①dwd_res_regn_mergelocation_msk_d
②dwm_staypoint_msk_d
③dws_province_tourist_msk_d
④dws_city_tourist_msk_d
⑤dws_county_tourist_msk_d
3)DIM层
①dim_usertag_msk_m
4)ADS层
1)需求矩阵
2)根据区县游客表计算如下指标
1.上传数据
# Create the local staging directory for the uploaded data files and show its path.
# -p makes the step re-runnable: it does not fail when the directory already exists.
mkdir -p /usr/local/soft/ctyun/
cd /usr/local/soft/ctyun/
pwd
2.可能用到的UDF函数
# Create the local directory that holds the UDF jar and show its path.
# -p makes the step re-runnable: it does not fail when the directory already exists.
mkdir -p /usr/local/soft/jars/
cd /usr/local/soft/jars/
pwd
添加资源并注册函数
-- Add the UDF jar to the current Hive session, then bind each temporary
-- function name to its implementing class.
ADD JARS /usr/local/soft/jars/jtxy_hdfs-1.0-SNAPSHOT.jar;
CREATE TEMPORARY FUNCTION get_points          AS 'ctyun.udf.getPointsUDF';
CREATE TEMPORARY FUNCTION dateBetweenUDF      AS 'ctyun.udf.dateBetweenUDF';
CREATE TEMPORARY FUNCTION calLength           AS 'ctyun.udf.calLength';
CREATE TEMPORARY FUNCTION get_city_or_prov_id AS 'ctyun.udf.getCityIdOrProvID';
3.创建所需数据库及表
-- Create the ODS database if it is missing (safe to re-run) and switch to it.
CREATE DATABASE IF NOT EXISTS ods;
USE ods;
1)ODS层
①ods_oidd
OIDD是采集A接口的信令数据,包括手机在发生业务时的位置信息。OIDD信令类型数据分为三大类:呼叫记录、短信记录和用户位置更新记录。
-- OIDD location records: external, tab-delimited text table with one
-- partition per day, stored under /data/tour/ods/ods_oidd.
CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_oidd (
    mdn         STRING COMMENT '手机号码',
    start_time  STRING COMMENT '业务开始时间',
    county_id   STRING COMMENT '区县编码',
    longi       STRING COMMENT '经度',
    lati        STRING COMMENT '纬度',
    bsid        STRING COMMENT '基站标识',
    grid_id     STRING COMMENT '网格号',
    biz_type    STRING COMMENT '业务类型',
    event_type  STRING COMMENT '事件类型',
    data_source STRING COMMENT '数据源'
)
COMMENT 'oidd位置数据表'
PARTITIONED BY (day_id STRING COMMENT '天分区')
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/data/tour/ods/ods_oidd';
-- Register the 2018-05-03 partition (no-op if it already exists).
ALTER TABLE ods.ods_oidd ADD IF NOT EXISTS PARTITION (day_id = '20180503');
-- Load the local source files into that partition. The table name is
-- database-qualified so the statement does not depend on a prior USE.
-- INTO appends: re-running this statement loads the same files again.
LOAD DATA LOCAL INPATH '/usr/local/soft/ctyun/ods_oidd/day_id=20180503/*'
INTO TABLE ods.ods_oidd PARTITION (day_id = '20180503');
-- Preview the loaded rows.
SELECT * FROM ods.ods_oidd LIMIT 10;
-- HDFS helper commands for the ods_oidd table location (run from the Hive CLI).
dfs -mkdir -p /data/tour/ods/ods_oidd;
dfs -ls /data/tour/ods/ods_oidd;
dfs -ls /data/tour/ods/;
-- WARNING: recursively deletes the table's data directory; run it only to
-- reset the table. `-rm -r` replaces the deprecated `-rmr`.
dfs -rm -r /data/tour/ods/ods_oidd;
②ods_wcdr
WCDR采集网络中ABIS接口的数据,基于业务发生过程中三个扇区的测量信息,通过三角定位法确定用户的位置信息。
-- WCDR location records: external, tab-delimited text table with one
-- partition per day, stored under /data/tour/ods/ods_wcdr.
CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_wcdr (
    mdn         STRING COMMENT '手机号码',
    start_time  STRING COMMENT '业务开始时间',
    county_id   STRING COMMENT '区县编码',
    longi       STRING COMMENT '经度',
    lati        STRING COMMENT '纬度',
    bsid        STRING COMMENT '基站标识',
    grid_id     STRING COMMENT '网格号',
    biz_type    STRING COMMENT '业务类型',
    event_type  STRING COMMENT '事件类型',
    data_source STRING COMMENT '数据源'
)
COMMENT 'wcdr位置数据表'
PARTITIONED BY (day_id STRING COMMENT '天分区')
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/data/tour/ods/ods_wcdr';
-- Register the 2018-05-03 partition (no-op if it already exists).
ALTER TABLE ods.ods_wcdr ADD IF NOT EXISTS PARTITION (day_id = '20180503');
-- Load the local source files into that partition. The table name is
-- database-qualified so the statement does not depend on a prior USE.
-- INTO appends: re-running this statement loads the same files again.
LOAD DATA LOCAL INPATH '/usr/local/soft/ctyun/ods_wcdr/day_id=20180503/*'
INTO TABLE ods.ods_wcdr PARTITION (day_id = '20180503');
-- Preview the loaded rows.
SELECT * FROM ods.ods_wcdr LIMIT 10;
③ods_ddr
当前DDR中只有移动数据详单可以提取基站标识,其他语音、短信、增值等业务没有位置信息,不作为数据融合的基础数据。
-- DDR location records: external, tab-delimited text table with one
-- partition per day, stored under /data/tour/ods/ods_ddr.
CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_ddr (
    mdn         STRING COMMENT '手机号码',
    start_time  STRING COMMENT '业务开始时间',
    county_id   STRING COMMENT '区县编码',
    longi       STRING COMMENT '经度',
    lati        STRING COMMENT '纬度',
    bsid        STRING COMMENT '基站标识',
    grid_id     STRING COMMENT '网格号',
    biz_type    STRING COMMENT '业务类型',
    event_type  STRING COMMENT '事件类型',
    data_source STRING COMMENT '数据源'
)
COMMENT 'ddr位置数据表'
PARTITIONED BY (day_id STRING COMMENT '天分区')
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/data/tour/ods/ods_ddr';
-- Register the 2018-05-03 partition (no-op if it already exists).
ALTER TABLE ods.ods_ddr ADD IF NOT EXISTS PARTITION (day_id = '20180503');
-- Load the local source files into that partition. The table name is
-- database-qualified so the statement does not depend on a prior USE.
-- INTO appends: re-running this statement loads the same files again.
LOAD DATA LOCAL INPATH '/usr/local/soft/ctyun/ods_ddr/day_id=20180503/*'
INTO TABLE ods.ods_ddr PARTITION (day_id = '20180503');
-- Preview the loaded rows.
SELECT * FROM ods.ods_ddr LIMIT 10;
④ods_dpi
移动DPI数据采集的是移动用户上网时移动核心网与PDSN之间接口的数据。
-- DPI location records: external, tab-delimited text table with one
-- partition per day, stored under /data/tour/ods/ods_dpi.
CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_dpi (
    mdn         STRING COMMENT '手机号码',
    start_time  STRING COMMENT '业务开始时间',
    county_id   STRING COMMENT '区县编码',
    longi       STRING COMMENT '经度',
    lati        STRING COMMENT '纬度',
    bsid        STRING COMMENT '基站标识',
    grid_id     STRING COMMENT '网格号',
    biz_type    STRING COMMENT '业务类型',
    event_type  STRING COMMENT '事件类型',
    data_source STRING COMMENT '数据源'
)
COMMENT 'dpi位置数据表'
PARTITIONED BY (day_id STRING COMMENT '天分区')
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/data/tour/ods/ods_dpi';
-- Register the 2018-05-03 partition (no-op if it already exists).
ALTER TABLE ods.ods_dpi ADD IF NOT EXISTS PARTITION (day_id = '20180503');
-- Load the local source files into that partition. The table name is
-- database-qualified so the statement does not depend on a prior USE.
-- INTO appends: re-running this statement loads the same files again.
LOAD DATA LOCAL INPATH '/usr/local/soft/ctyun/ods_dpi/day_id=20180503/*'
INTO TABLE ods.ods_dpi PARTITION (day_id = '20180503');
-- Preview the loaded rows.
SELECT * FROM ods.ods_dpi LIMIT 10;
2)DWD层
-- Create the DWD database if it is missing (safe to re-run) and switch to it.
CREATE DATABASE IF NOT EXISTS dwd;
USE dwd;
①dwd_res_regn_mergelocation_msk_d
在ODS层中,由于数据来源不同,原始位置数据被分成了好几张表加载到了我们的ODS层。为了方便大家的使用,我们在DWD层做了一张位置数据融合表:将oidd、wcdr、ddr、dpi位置数据汇聚到一张表里面,统一字段名,提升数据质量,这样就有了一张可供大家方便使用的明细表了。
-- Merged location detail table: same column layout as the four ODS location
-- tables, one partition per day, stored as ORC under
-- /data/tour/dwd/dwd_res_regn_mergelocation_msk_d.
CREATE EXTERNAL TABLE IF NOT EXISTS dwd.dwd_res_regn_mergelocation_msk_d (
    mdn         STRING COMMENT '手机号码',
    start_time  STRING COMMENT '业务开始时间',
    county_id   STRING COMMENT '区县编码',
    longi       STRING COMMENT '经度',
    lati        STRING COMMENT '纬度',
    bsid        STRING COMMENT '基站标识',
    grid_id     STRING COMMENT '网格号',
    biz_type    STRING COMMENT '业务类型',
    event_type  STRING COMMENT '事件类型',
    data_source STRING COMMENT '数据源'
)
COMMENT '位置数据融合表'
PARTITIONED BY (day_id STRING COMMENT '天分区')
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS ORCFILE
LOCATION '/data/tour/dwd/dwd_res_regn_mergelocation_msk_d';
-- Register the 2018-05-03 partition (no-op if it already exists).
ALTER TABLE dwd.dwd_res_regn_mergelocation_msk_d
    ADD IF NOT EXISTS PARTITION (day_id = '20180503');
手动下载数据
-- Running the merge through Hive is slow, so the partition can instead be
-- filled by loading/putting pre-built files by hand.
-- NOTE: the table is declared STORED AS ORCFile and LOAD DATA does not convert
-- file formats, so the load below cannot be used against it as-is. It is kept
-- commented out for reference only.
-- LOAD DATA LOCAL INPATH '/usr/local/soft/ctyun/dwd_merge/part-00000*'
-- INTO TABLE dwd.dwd_res_regn_mergelocation_msk_d PARTITION (day_id = '20180503');
-- Merge the four ODS location sources for 2018-05-03 into the DWD table.
-- UNION ALL keeps every row from every source (no de-duplication).
INSERT INTO TABLE dwd.dwd_res_regn_mergelocation_msk_d
PARTITION (day_id = '20180503')
SELECT
    mdn,
    start_time,
    county_id,
    longi,
    lati,
    bsid,
    grid_id,
    biz_type,
    event_type,
    data_source
FROM ods.ods_oidd
WHERE day_id = '20180503'
UNION ALL
SELECT
    mdn,
    start_time,
    county_id,
    longi,
    lati,
    bsid,
    grid_id,
    biz_type,
    event_type,
    data_source
FROM ods.ods_wcdr
WHERE day_id = '20180503'
UNION ALL
SELECT
    mdn,
    start_time,
    county_id,
    longi,
    lati,
    bsid,
    grid_id,
    biz_type,
    event_type,
    data_source
FROM ods.ods_dpi
WHERE day_id = '20180503'
UNION ALL
SELECT
    mdn,
    start_time,
    county_id,
    longi,
    lati,
    bsid,
    grid_id,
    biz_type,
    event_type,
    data_source
FROM ods.ods_ddr
WHERE day_id = '20180503';
②dwm_staypoint_msk_d
计算一个人在一个网格内的停留时间,按手机号,网格id,区县id分组
1、对所有时间进行排序
2、取第一个点的开始时间和最后一个点的结束时间
-- Create the DWM database if it is missing (safe to re-run) and switch to it.
CREATE DATABASE IF NOT EXISTS dwm;
USE dwm;
-- Stay-point table: external, tab-delimited text table with one partition per
-- day, stored under /data/tour/dwm/dwm_staypoint_msk_d.
CREATE EXTERNAL TABLE IF NOT EXISTS dwm.dwm_staypoint_msk_d (
    mdn             STRING COMMENT '用户手机号码',
    longi           STRING COMMENT '网格中心点经度',
    lati            STRING COMMENT '网格中心点纬度',
    grid_id         STRING COMMENT '停留点所在电信内部网格号',
    county_id       STRING COMMENT '停留点区县',
    duration        STRING COMMENT '机主在停留点停留的时间长度(分钟),lTime-eTime',
    grid_first_time STRING COMMENT '网格第一个记录位置点时间(秒级)',
    grid_last_time  STRING COMMENT '网格最后一个记录位置点时间(秒级)'
)
COMMENT '停留点表'
PARTITIONED BY (day_id STRING COMMENT '天分区')
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/data/tour/dwm/dwm_staypoint_msk_d';
通过grid_id 网格id 获取 网格中心点经纬度 longi、lati
该SQL执行会出现问题: 执行流程一直处于 0% Map 0% reduce
/**
 * Earlier single-pass GROUP BY version of the stay-point load, kept for
 * reference only: it was observed to stay at 0% map / 0% reduce when run.
insert into table dwm.dwm_staypoint_msk_d partition(day_id=20180503)
select t1.mdn
,get_points(grid_id)[0] as longi
,get_points(grid_id)[1] as lati
,t1.grid_id
,t1.county_id
,dateBetweenUDF(t1.grid_first_time,t1.grid_last_time) as duration
,t1.grid_first_time
,t1.grid_last_time
from (
select mdn
,grid_id
,county_id
,min(split(start_time,',')[0]) as grid_first_time
,max(split(start_time,',')[1]) as grid_last_time
from dwd.dwd_res_regn_mergelocation_msk_d
where day_id="20180503"
group by mdn, grid_id, county_id
)t1;
*/
优化后的SQL:
-- Build one stay-point row per (mdn, grid_id, county_id) for 2018-05-03.
-- Fix: the first-record time is now the MIN and the last-record time the MAX
-- within each group. The previous MAX/MIN pairing was inverted relative to the
-- grid_first_time / grid_last_time column definitions and could yield a
-- "first" time later than the "last" time.
-- NOTE(review): start_time is read as two comma-separated timestamps with
-- index 1 taken as the start and index 0 as the end, exactly as this query did
-- before; confirm the field order against the source data.
WITH split_table AS (
    -- Split the combined time field into its two timestamps.
    SELECT
        mdn,
        grid_id,
        county_id,
        split(start_time, ',')[1] AS grid_first_time,
        split(start_time, ',')[0] AS grid_last_time
    FROM dwd.dwd_res_regn_mergelocation_msk_d
    WHERE day_id = '20180503'
),
max_min_table AS (
    -- Earliest start and latest end for each user / grid / county.
    SELECT
        mdn,
        grid_id,
        county_id,
        MIN(grid_first_time) OVER (PARTITION BY mdn, grid_id, county_id) AS grid_first_time,
        MAX(grid_last_time)  OVER (PARTITION BY mdn, grid_id, county_id) AS grid_last_time
    FROM split_table
)
INSERT INTO TABLE dwm.dwm_staypoint_msk_d PARTITION (day_id = '20180503')
SELECT
    t1.mdn,
    get_points(t1.grid_id)[0] AS longi,
    get_points(t1.grid_id)[1] AS lati,
    t1.grid_id,
    t1.county_id,
    dateBetweenUDF(t1.grid_first_time, t1.grid_last_time) AS duration,
    t1.grid_first_time,
    t1.grid_last_time
FROM (
    -- The window values repeat on every input row; collapse to one row per group.
    SELECT
        mdn,
        grid_id,
        county_id,
        grid_first_time,
        grid_last_time
    FROM max_min_table
    GROUP BY
        mdn,
        grid_id,
        county_id,
        grid_first_time,
        grid_last_time
) t1;
③dws_province_tourist_msk_d
游客定义 出行距离大于300km 常住地在用户画像表中 在省内停留时间大于3个小时
-- Create the DWS database if it is missing (safe to re-run) and switch to it.
CREATE DATABASE IF NOT EXISTS dws;
USE dws;
-- Province-level daily tourist table: external, one partition per day, stored
-- as Parquet under /data/tour/dws/dws_province_tourist_msk_d.
CREATE EXTERNAL TABLE IF NOT EXISTS dws.dws_province_tourist_msk_d (
    mdn              STRING COMMENT '手机号大写MD5加密',
    source_county_id STRING COMMENT '游客来源区县',
    d_province_id    STRING COMMENT '旅游目的地省代码',
    d_stay_time      DOUBLE COMMENT '游客在该省停留的时间长度(小时)',
    d_max_distance   DOUBLE COMMENT '游客本次出游距离'
)
COMMENT '旅游应用专题数据省级别-天'
PARTITIONED BY (day_id STRING COMMENT '日分区')
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS PARQUET
LOCATION '/data/tour/dws/dws_province_tourist_msk_d';
停留点表dwm_staypoint_msk_d与用户画像维表dim_usertag_msk_m通过mdn关联,使用get_city_or_prov_id(county_id,"province")方法,传入county_id,返回province_id,然后按mdn、province_id、resi_county_id分组,使用calLength(grid_id, resi_grid_id)传入网格id、居住地网格id,算出出行距离,并计算每个用户到每个省的累计停留时间,最后取出累计停留时间超过3小时(180分钟)且最大出行距离大于300km的用户
④dws_city_tourist_msk_d
出行距离大于100km 在市内停留时间大于3个小时
-- City-level daily tourist table: external, one partition per day, stored as
-- Parquet under /data/tour/dws/dws_city_tourist_msk_d.
CREATE EXTERNAL TABLE IF NOT EXISTS dws.dws_city_tourist_msk_d (
    mdn              STRING COMMENT '手机号大写MD5加密',
    source_county_id STRING COMMENT '游客来源区县',
    d_city_id        STRING COMMENT '旅游目的地市代码',
    d_stay_time      DOUBLE COMMENT '游客在该省市停留的时间长度(小时)',
    d_max_distance   DOUBLE COMMENT '游客本次出游距离'
)
COMMENT '旅游应用专题数据城市级别-天'
PARTITIONED BY (day_id STRING COMMENT '日分区')
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS PARQUET
LOCATION '/data/tour/dws/dws_city_tourist_msk_d';
停留点表dwm_staypoint_msk_d与用户画像维表dim_usertag_msk_m通过mdn关联,使用get_city_or_prov_id(county_id,"city")方法,传入county_id,返回city_id,然后按mdn、city_id、resi_county_id分组,使用calLength(grid_id, resi_grid_id)传入网格id、居住地网格id,算出出行距离,并计算每个用户到每个市的累计停留时间,最后取出累计停留时间超过3小时(180分钟)且最大出行距离大于100km的用户
⑤dws_county_tourist_msk_d
出行距离大于10km 在县内停留时间大于3个小时
-- County-level daily tourist table: external, one partition per day, stored as
-- Parquet under /data/tour/dws/dws_county_tourist_msk_d.
CREATE EXTERNAL TABLE IF NOT EXISTS dws.dws_county_tourist_msk_d (
    mdn              STRING COMMENT '手机号大写MD5加密',
    source_county_id STRING COMMENT '游客来源区县',
    d_county_id      STRING COMMENT '旅游目的地县代码',
    d_stay_time      DOUBLE COMMENT '游客在该县停留的时间长度(小时)',
    d_max_distance   DOUBLE COMMENT '游客本次出游距离'
)
COMMENT '旅游应用专题数据县级别-天'
PARTITIONED BY (day_id STRING COMMENT '日分区')
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS PARQUET
LOCATION '/data/tour/dws/dws_county_tourist_msk_d';
停留点表dwm_staypoint_msk_d与用户画像维表dim_usertag_msk_m通过mdn关联,按mdn、county_id、resi_county_id分组,使用calLength(grid_id, resi_grid_id)传入网格id、居住地网格id,算出出行距离,并计算每个用户到每个县的累计停留时间,最后取出累计停留时间超过3小时(180分钟)且最大出行距离大于10km的用户
运行SQL时报错 我们要创建一下UDF函数
-- Temporary functions exist only for the current session, so the jar and the
-- function bindings are registered again before running the DWS query.
ADD JARS /usr/local/soft/jars/jtxy_hdfs-1.0-SNAPSHOT.jar;
CREATE TEMPORARY FUNCTION get_points          AS 'ctyun.udf.getPointsUDF';
CREATE TEMPORARY FUNCTION dateBetweenUDF      AS 'ctyun.udf.dateBetweenUDF';
CREATE TEMPORARY FUNCTION calLength           AS 'ctyun.udf.calLength';
CREATE TEMPORARY FUNCTION get_city_or_prov_id AS 'ctyun.udf.getCityIdOrProvID';
-- County-level tourists for 2018-05-03: join each stay point to the user's
-- profile row, aggregate per (mdn, destination county, home county), and keep
-- the groups whose total duration exceeds 180 and whose maximum distance
-- exceeds 10000.
-- NOTE(review): d_stay_time receives SUM(duration); the stay-point DDL
-- documents duration in minutes while this table documents d_stay_time in
-- hours — confirm which unit downstream consumers expect.
-- NOTE(review): the 10000 threshold presumably means metres returned by
-- calLength (10 km) — confirm against the UDF.
WITH stay AS (
    -- Stay points of the target day.
    SELECT
        mdn,
        grid_id,
        county_id,
        duration
    FROM dwm.dwm_staypoint_msk_d
    WHERE day_id = '20180503'
),
usertag AS (
    -- Home grid and home county from the monthly user-profile partition.
    SELECT
        mdn,
        resi_county_id,
        resi_grid_id
    FROM dim.dim_usertag_msk_m
    WHERE month_id = '201805'
),
trip AS (
    SELECT
        s.mdn,
        u.resi_county_id                          AS source_county_id,
        s.county_id                               AS d_county_id,
        SUM(s.duration)                           AS d_stay_time,
        MAX(calLength(s.grid_id, u.resi_grid_id)) AS d_max_distance
    FROM stay s
    INNER JOIN usertag u
        ON s.mdn = u.mdn
    GROUP BY
        s.mdn,
        s.county_id,
        u.resi_county_id
)
INSERT INTO TABLE dws.dws_county_tourist_msk_d PARTITION (day_id = "20180503")
SELECT
    mdn,
    source_county_id,
    d_county_id,
    d_stay_time,
    d_max_distance
FROM trip
WHERE d_stay_time > 180
  AND d_max_distance > 10000
;
3)DIM层
-- Create the DIM database if it is missing (safe to re-run) and switch to it.
CREATE DATABASE IF NOT EXISTS dim;
USE dim;
①dim_usertag_msk_m
-- User-profile dimension table: external, one partition per month, stored as
-- Parquet under /data/tour/dim/dim_usertag_msk_m.
CREATE EXTERNAL TABLE IF NOT EXISTS dim.dim_usertag_msk_m (
    mdn            STRING COMMENT '手机号大写MD5加密',
    name           STRING COMMENT '姓名',
    gender         STRING COMMENT '性别,1男2女',
    age            STRING COMMENT '年龄',
    id_number      STRING COMMENT '证件号码',
    number_attr    STRING COMMENT '号码归属地',
    trmnl_brand    STRING COMMENT '终端品牌',
    trmnl_price    STRING COMMENT '终端价格',
    packg          STRING COMMENT '套餐',
    conpot         STRING COMMENT '消费潜力',
    resi_grid_id   STRING COMMENT '常住地网格',
    resi_county_id STRING COMMENT '常住地区县'
)
COMMENT '用户画像表'
PARTITIONED BY (month_id STRING COMMENT '月分区')
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS PARQUET
LOCATION '/data/tour/dim/dim_usertag_msk_m';
-- Register the 2018-05 partition (no-op if it already exists).
ALTER TABLE dim.dim_usertag_msk_m ADD IF NOT EXISTS PARTITION (month_id = '201805');
-- Load the local source files into that partition.
-- INTO appends: re-running this statement loads the same files again.
-- NOTE(review): the table is declared STORED AS PARQUET and LOAD DATA does not
-- convert file formats — confirm the source files are Parquet.
LOAD DATA LOCAL INPATH '/usr/local/soft/ctyun/dim_usertag_msk_m/month_id=201805/*'
INTO TABLE dim.dim_usertag_msk_m PARTITION (month_id = '201805');
-- Preview the loaded rows. The table name is database-qualified so the query
-- does not depend on a prior USE.
SELECT * FROM dim.dim_usertag_msk_m LIMIT 10;
4)ADS层
根据需求建设
1)需求矩阵
2)根据区县游客表计算如下指标
客流量按天 [区县id,客流量]
-- Daily visitor count per destination county.
-- Fix: the partition filter referenced the alias t1 inside the subquery that
-- defines t1, where that alias is not in scope; the filter now uses the
-- table's own day_id column and the redundant subquery is removed.
SELECT
    d_county_id,
    COUNT(*) AS d_county_cnt
FROM dws.dws_county_tourist_msk_d
WHERE day_id = '20180503'
GROUP BY d_county_id;
性别按天 [区县id,性别,客流量]
-- Daily visitor count per destination county and gender.
-- Fix: the user-profile table is partitioned by month (month_id = '201805');
-- filtering it with the day value 20180503 matched no partition, so every
-- visitor was counted under a NULL gender.
SELECT
    t1.d_county_id,
    t2.gender,
    COUNT(*) AS d_county_gender_cnt
FROM (
    SELECT
        mdn,
        d_county_id
    FROM dws.dws_county_tourist_msk_d
    WHERE day_id = '20180503'
) t1
LEFT JOIN (
    SELECT
        mdn,
        gender
    FROM dim.dim_usertag_msk_m
    WHERE month_id = '201805'
) t2
    ON t1.mdn = t2.mdn
GROUP BY
    t1.d_county_id,
    t2.gender;
年龄按天 [区县id,年龄,客流量]
常住地按天 [区县id,常住地市,客流量]
归属地按天 [区县id,归属地市,客流量]
-- Daily visitor count per destination county and number home location.
-- Fix: the user-profile table is partitioned by month (month_id = '201805');
-- filtering it with the day value 20180503 matched no partition, so every
-- visitor was counted under a NULL number_attr.
SELECT
    t1.d_county_id,
    t2.number_attr,
    COUNT(*) AS d_county_number_attr_cnt
FROM (
    SELECT
        mdn,
        d_county_id
    FROM dws.dws_county_tourist_msk_d
    WHERE day_id = '20180503'
) t1
LEFT JOIN (
    SELECT
        mdn,
        number_attr
    FROM dim.dim_usertag_msk_m
    WHERE month_id = '201805'
) t2
    ON t1.mdn = t2.mdn
GROUP BY
    t1.d_county_id,
    t2.number_attr;
终端型号按天 [区县id,终端型号,客流量]
消费等级按天 [区县id,消费等级,客流量]
停留时长按天 [区县id,停留时长,客流量]