文章目录
- 前言
- 一、群起集群
- 1. sc 脚本
- 2. cluster 脚本
- 3. myhadoop 脚本
- 4. zk.sh 脚本
- 5. kf.sh 脚本
- 6. f1.sh 脚本
- 7. f2.sh 脚本
- 二、简便使用脚本
- 1. xsync 脚本
- 2. jpsall 脚本
- 3. xcall.sh 脚本
- 4. lg.sh 脚本
- 三、数据传输相关脚本
- 1. mysql_to_hdfs.sh 脚本
- 2. hdfs_to_ods_db.sh 脚本
- 3. hdfs_to_ods_log.sh 脚本
- 4. ods_to_dwd_db.sh 脚本
- 5. ods_to_dwd_log.sh 脚本
- 6. dwd_to_dws.sh 脚本
- 7. dws_to_dwt.sh 脚本
- 8. dwt_to_ads.sh 脚本
前言
本文的19个自定义脚本是另一篇大数据电商数仓项目
https://blog.csdn.net/m0_48170265/article/details/130031285
所用到的一些自定义脚本,简化开发流程。
一、群起集群
1. sc 脚本
脚本篇幅少,但关联的相关内容最多。
一般用这个脚本就可以启动hadoop集群、hive数据库、kafka等服务(脚本嵌套脚本),但不含ke.sh等几个对这个电商数仓项目来说可用可不用的脚本内容
#!/bin/bash
case $1 in
"start"){
#启动 cluster相关集群
cluster start
#启动 hiveservices相关集群
hiveservices start
};;
"stop"){
#停止 cluster.sh相关集群
cluster stop
#停止 hiveservices相关集群
hiveservices stop
};;
esac
注:sc 脚本中的 hiveservices 脚本不是自定义脚本,是hive解压后 bin 目录下自带的脚本
2. cluster 脚本
#!/bin/bash
case $1 in
"start"){
echo ================== 启动 集群 ==================
#启动 Zookeeper集群
zk.sh start
#启动 Hadoop集群
myhadoop start
#启动 Kafka采集集群
kf.sh start
#启动 Flume采集集群
f1.sh start
#启动 Flume消费集群
f2.sh start
};;
"stop"){
echo ================== 停止 集群 ==================
#停止 Flume消费集群
f2.sh stop
#停止 Flume采集集群
f1.sh stop
#停止 Kafka采集集群
kf.sh stop
#停止 Hadoop集群
myhadoop stop
#停止 Zookeeper集群
zk.sh stop
};;
esac
3. myhadoop 脚本
启动 hadoop集群
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit ;
fi
case $1 in
"start")
echo " =================== 启动 hadoop集群 ==================="
echo " --------------- 启动 hdfs ---------------"
ssh hadoop105 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
echo " --------------- 启动 yarn ---------------"
ssh hadoop106 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
echo " --------------- 启动 historyserver ---------------"
ssh hadoop105 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;
"stop")
echo " =================== 关闭 hadoop集群 ==================="
echo " --------------- 关闭 historyserver ---------------"
ssh hadoop105 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"
echo " --------------- 关闭 yarn ---------------"
ssh hadoop106 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
echo " --------------- 关闭 hdfs ---------------"
ssh hadoop105 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
*)
echo "Input Args Error..."
;;
esac
4. zk.sh 脚本
#!/bin/bash
case $1 in
"start"){
for i in hadoop105 hadoop106 hadoop107
do
echo ---------- zookeeper $i 启动 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
done
};;
"stop"){
for i in hadoop105 hadoop106 hadoop107
do
echo ---------- zookeeper $i 停止 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
done
};;
"status"){
for i in hadoop105 hadoop106 hadoop107
do
echo ---------- zookeeper $i 状态 ------------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
done
};;
esac
5. kf.sh 脚本
#!/bin/bash
case $1 in
"start"){
for i in hadoop105 hadoop106 hadoop107
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties "
done
};;
"stop"){
for i in hadoop105 hadoop106 hadoop107
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"
done
};;
esac
6. f1.sh 脚本
#! /bin/bash
case $1 in
"start"){
for i in hadoop105 hadoop106
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &"
done
};;
"stop"){
for i in hadoop105 hadoop106
do
echo " --------停止 $i 采集flume-------"
ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
done
};;
esac
7. f2.sh 脚本
#! /bin/bash
case $1 in
"start"){
for i in hadoop107
do
echo " --------启动 $i 消费flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt 2>&1 &"
done
};;
"stop"){
for i in hadoop107
do
echo " --------停止 $i 消费flume-------"
ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
done
};;
esac
二、简便使用脚本
1. xsync 脚本
进行文件或者目录的分发
#!/bin/bash
#1. 判断参数个数
if [ $# -lt 1 ]
then
echo Not Enough Arguement!
exit;
fi
#2. 遍历集群所有机器
for host in hadoop105 hadoop106 hadoop107
do
echo ==================== $host ====================
#3. 遍历所有目录,挨个发送
for file in $@
do
#4. 判断文件是否存在
if [ -e $file ]
then
#5. 获取父目录
pdir=$(cd -P $(dirname $file); pwd)
#6. 获取当前文件的名称
fname=$(basename $file)
ssh $host "mkdir -p $pdir"
rsync -av $pdir/$fname $host:$pdir
else
echo $file does not exists!
fi
done
done
2. jpsall 脚本
#!/bin/bash
for host in hadoop105 hadoop106 hadoop107
do
echo =============== $host ===============
ssh $host jps
done
3. xcall.sh 脚本
#! /bin/bash
for i in hadoop105 hadoop106 hadoop107
do
echo --------- $i ----------
ssh $i "$*"
done
如:
4. lg.sh 脚本
#!/bin/bash
for i in hadoop105 hadoop106; do
echo "========== $i =========="
ssh $i "cd /opt/module/applog/; java -jar gmall2020-mock-log-2020-05-10.jar >/dev/null 2>&1 &"
done
三、数据传输相关脚本
1. mysql_to_hdfs.sh 脚本
#! /bin/bash
APP=gmall
sqoop=/opt/module/sqoop/bin/sqoop
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d '-1 day' +%F`
fi
import_data(){
$sqoop import \
--connect jdbc:mysql://hadoop105:3306/$APP \
--username root \
--password 111111 \
--target-dir /origin_data/$APP/db/$1/$do_date \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec lzop \
--null-string '\\N' \
--null-non-string '\\N'
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /origin_data/$APP/db/$1/$do_date
}
import_order_info(){
import_data order_info "select
id,
final_total_amount,
order_status,
user_id,
out_trade_no,
create_time,
operate_time,
province_id,
benefit_reduce_amount,
original_total_amount,
feight_fee
from order_info
where (date_format(create_time,'%Y-%m-%d')='$do_date'
or date_format(operate_time,'%Y-%m-%d')='$do_date')"
}
import_coupon_use(){
import_data coupon_use "select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from coupon_use
where (date_format(get_time,'%Y-%m-%d')='$do_date'
or date_format(using_time,'%Y-%m-%d')='$do_date'
or date_format(used_time,'%Y-%m-%d')='$do_date')"
}
import_order_status_log(){
import_data order_status_log "select
id,
order_id,
order_status,
operate_time
from order_status_log
where date_format(operate_time,'%Y-%m-%d')='$do_date'"
}
import_activity_order(){
import_data activity_order "select
id,
activity_id,
order_id,
create_time
from activity_order
where date_format(create_time,'%Y-%m-%d')='$do_date'"
}
import_user_info(){
import_data "user_info" "select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time
from user_info
where (DATE_FORMAT(create_time,'%Y-%m-%d')='$do_date'
or DATE_FORMAT(operate_time,'%Y-%m-%d')='$do_date')"
}
import_order_detail(){
import_data order_detail "select
od.id,
order_id,
user_id,
sku_id,
sku_name,
order_price,
sku_num,
od.create_time,
source_type,
source_id
from order_detail od
join order_info oi
on od.order_id=oi.id
where DATE_FORMAT(od.create_time,'%Y-%m-%d')='$do_date'"
}
import_payment_info(){
import_data "payment_info" "select
id,
out_trade_no,
order_id,
user_id,
alipay_trade_no,
total_amount,
subject,
payment_type,
payment_time
from payment_info
where DATE_FORMAT(payment_time,'%Y-%m-%d')='$do_date'"
}
import_comment_info(){
import_data comment_info "select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
comment_txt,
create_time
from comment_info
where date_format(create_time,'%Y-%m-%d')='$do_date'"
}
import_order_refund_info(){
import_data order_refund_info "select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from order_refund_info
where date_format(create_time,'%Y-%m-%d')='$do_date'"
}
import_sku_info(){
import_data sku_info "select
id,
spu_id,
price,
sku_name,
sku_desc,
weight,
tm_id,
category3_id,
create_time
from sku_info where 1=1"
}
import_base_category1(){
import_data "base_category1" "select
id,
name
from base_category1 where 1=1"
}
import_base_category2(){
import_data "base_category2" "select
id,
name,
category1_id
from base_category2 where 1=1"
}
import_base_category3(){
import_data "base_category3" "select
id,
name,
category2_id
from base_category3 where 1=1"
}
import_base_province(){
import_data base_province "select
id,
name,
region_id,
area_code,
iso_code
from base_province
where 1=1"
}
import_base_region(){
import_data base_region "select
id,
region_name
from base_region
where 1=1"
}
import_base_trademark(){
import_data base_trademark "select
tm_id,
tm_name
from base_trademark
where 1=1"
}
import_spu_info(){
import_data spu_info "select
id,
spu_name,
category3_id,
tm_id
from spu_info
where 1=1"
}
import_favor_info(){
import_data favor_info "select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from favor_info
where 1=1"
}
import_cart_info(){
import_data cart_info "select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time,
source_type,
source_id
from cart_info
where 1=1"
}
import_coupon_info(){
import_data coupon_info "select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from coupon_info
where 1=1"
}
import_activity_info(){
import_data activity_info "select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from activity_info
where 1=1"
}
import_activity_rule(){
import_data activity_rule "select
id,
activity_id,
condition_amount,
condition_num,
benefit_amount,
benefit_discount,
benefit_level
from activity_rule
where 1=1"
}
import_base_dic(){
import_data base_dic "select
dic_code,
dic_name,
parent_code,
create_time,
operate_time
from base_dic
where 1=1"
}
case $1 in
"order_info")
import_order_info
;;
"base_category1")
import_base_category1
;;
"base_category2")
import_base_category2
;;
"base_category3")
import_base_category3
;;
"order_detail")
import_order_detail
;;
"sku_info")
import_sku_info
;;
"user_info")
import_user_info
;;
"payment_info")
import_payment_info
;;
"base_province")
import_base_province
;;
"base_region")
import_base_region
;;
"base_trademark")
import_base_trademark
;;
"activity_info")
import_activity_info
;;
"activity_order")
import_activity_order
;;
"cart_info")
import_cart_info
;;
"comment_info")
import_comment_info
;;
"coupon_info")
import_coupon_info
;;
"coupon_use")
import_coupon_use
;;
"favor_info")
import_favor_info
;;
"order_refund_info")
import_order_refund_info
;;
"order_status_log")
import_order_status_log
;;
"spu_info")
import_spu_info
;;
"activity_rule")
import_activity_rule
;;
"base_dic")
import_base_dic
;;
"first")
import_base_category1
import_base_category2
import_base_category3
import_order_info
import_order_detail
import_sku_info
import_user_info
import_payment_info
import_base_province
import_base_region
import_base_trademark
import_activity_info
import_activity_order
import_cart_info
import_comment_info
import_coupon_use
import_coupon_info
import_favor_info
import_order_refund_info
import_order_status_log
import_spu_info
import_activity_rule
import_base_dic
;;
"all")
import_base_category1
import_base_category2
import_base_category3
import_order_info
import_order_detail
import_sku_info
import_user_info
import_payment_info
import_base_trademark
import_activity_info
import_activity_order
import_cart_info
import_comment_info
import_coupon_use
import_coupon_info
import_favor_info
import_order_refund_info
import_order_status_log
import_spu_info
import_activity_rule
import_base_dic
;;
esac
2. hdfs_to_ods_db.sh 脚本
#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi
sql1="
load data inpath '/origin_data/$APP/db/order_info/$do_date' OVERWRITE into table ${APP}.ods_order_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_detail/$do_date' OVERWRITE into table ${APP}.ods_order_detail partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/sku_info/$do_date' OVERWRITE into table ${APP}.ods_sku_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/user_info/$do_date' OVERWRITE into table ${APP}.ods_user_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/payment_info/$do_date' OVERWRITE into table ${APP}.ods_payment_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category1/$do_date' OVERWRITE into table ${APP}.ods_base_category1 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category2/$do_date' OVERWRITE into table ${APP}.ods_base_category2 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_category3/$do_date' OVERWRITE into table ${APP}.ods_base_category3 partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_trademark/$do_date' OVERWRITE into table ${APP}.ods_base_trademark partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/activity_info/$do_date' OVERWRITE into table ${APP}.ods_activity_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/activity_order/$do_date' OVERWRITE into table ${APP}.ods_activity_order partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/cart_info/$do_date' OVERWRITE into table ${APP}.ods_cart_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/comment_info/$do_date' OVERWRITE into table ${APP}.ods_comment_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/coupon_info/$do_date' OVERWRITE into table ${APP}.ods_coupon_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/coupon_use/$do_date' OVERWRITE into table ${APP}.ods_coupon_use partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/favor_info/$do_date' OVERWRITE into table ${APP}.ods_favor_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_refund_info/$do_date' OVERWRITE into table ${APP}.ods_order_refund_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/order_status_log/$do_date' OVERWRITE into table ${APP}.ods_order_status_log partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/spu_info/$do_date' OVERWRITE into table ${APP}.ods_spu_info partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/activity_rule/$do_date' OVERWRITE into table ${APP}.ods_activity_rule partition(dt='$do_date');
load data inpath '/origin_data/$APP/db/base_dic/$do_date' OVERWRITE into table ${APP}.ods_base_dic partition(dt='$do_date');
"
sql2="
load data inpath '/origin_data/$APP/db/base_province/$do_date' OVERWRITE into table ${APP}.ods_base_province;
load data inpath '/origin_data/$APP/db/base_region/$do_date' OVERWRITE into table ${APP}.ods_base_region;
"
case $1 in
"first"){
$hive -e "$sql1$sql2"
};;
"all"){
$hive -e "$sql1"
};;
esac
3. hdfs_to_ods_log.sh 脚本
#!/bin/bash
# 定义变量方便修改
APP=gmall
hive=/opt/module/hive/bin/hive
hadoop=/opt/module/hadoop-3.1.3/bin/hadoop
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
echo ================== 日志日期为 $do_date ==================
sql="
load data inpath '/origin_data/$APP/log/topic_log/$do_date' into table ${APP}.ods_log partition(dt='$do_date');
"
$hive -e "$sql"
$hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer -Dmapreduce.job.queuename=default /warehouse/$APP/ods/ods_log/dt=$do_date
4. ods_to_dwd_db.sh 脚本
#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$2" ] ;then
do_date=$2
else
do_date=`date -d "-1 day" +%F`
fi
sql1="
set mapreduce.job.queuename=default;
set hive.exec.dynamic.partition.mode=nonstrict;
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_dim_sku_info partition(dt='$do_date')
select
sku.id,
sku.spu_id,
sku.price,
sku.sku_name,
sku.sku_desc,
sku.weight,
sku.tm_id,
ob.tm_name,
sku.category3_id,
c2.id category2_id,
c1.id category1_id,
c3.name category3_name,
c2.name category2_name,
c1.name category1_name,
spu.spu_name,
sku.create_time
from
(
select * from ${APP}.ods_sku_info where dt='$do_date'
)sku
join
(
select * from ${APP}.ods_base_trademark where dt='$do_date'
)ob on sku.tm_id=ob.tm_id
join
(
select * from ${APP}.ods_spu_info where dt='$do_date'
)spu on spu.id = sku.spu_id
join
(
select * from ${APP}.ods_base_category3 where dt='$do_date'
)c3 on sku.category3_id=c3.id
join
(
select * from ${APP}.ods_base_category2 where dt='$do_date'
)c2 on c3.category2_id=c2.id
join
(
select * from ${APP}.ods_base_category1 where dt='$do_date'
)c1 on c2.category1_id=c1.id;
insert overwrite table ${APP}.dwd_dim_coupon_info partition(dt='$do_date')
select
id,
coupon_name,
coupon_type,
condition_amount,
condition_num,
activity_id,
benefit_amount,
benefit_discount,
create_time,
range_type,
spu_id,
tm_id,
category3_id,
limit_num,
operate_time,
expire_time
from ${APP}.ods_coupon_info
where dt='$do_date';
insert overwrite table ${APP}.dwd_dim_activity_info partition(dt='$do_date')
select
id,
activity_name,
activity_type,
start_time,
end_time,
create_time
from ${APP}.ods_activity_info
where dt='$do_date';
insert overwrite table ${APP}.dwd_fact_order_detail partition(dt='$do_date')
select
id,
order_id,
user_id,
sku_id,
sku_num,
order_price,
sku_num,
create_time,
province_id,
source_type,
source_id,
original_amount_d,
if(rn=1,final_total_amount-(sum_div_final_amount-final_amount_d),final_amount_d),
if(rn=1,feight_fee-(sum_div_feight_fee-feight_fee_d),feight_fee_d),
if(rn=1,benefit_reduce_amount-(sum_div_benefit_reduce_amount-benefit_reduce_amount_d),benefit_reduce_amount_d)
from
(
select
od.id,
od.order_id,
od.user_id,
od.sku_id,
od.sku_name,
od.order_price,
od.sku_num,
od.create_time,
oi.province_id,
od.source_type,
od.source_id,
round(od.order_price*od.sku_num,2) original_amount_d,
round(od.order_price*od.sku_num/oi.original_total_amount*oi.final_total_amount,2) final_amount_d,
round(od.order_price*od.sku_num/oi.original_total_amount*oi.feight_fee,2) feight_fee_d,
round(od.order_price*od.sku_num/oi.original_total_amount*oi.benefit_reduce_amount,2) benefit_reduce_amount_d,
row_number() over(partition by od.order_id order by od.id desc) rn,
oi.final_total_amount,
oi.feight_fee,
oi.benefit_reduce_amount,
sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.final_total_amount,2)) over(partition by od.order_id) sum_div_final_amount,
sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.feight_fee,2)) over(partition by od.order_id) sum_div_feight_fee,
sum(round(od.order_price*od.sku_num/oi.original_total_amount*oi.benefit_reduce_amount,2)) over(partition by od.order_id) sum_div_benefit_reduce_amount
from
(
select * from ${APP}.ods_order_detail where dt='$do_date'
) od
join
(
select * from ${APP}.ods_order_info where dt='$do_date'
) oi
on od.order_id=oi.id
)t1;
insert overwrite table ${APP}.dwd_fact_payment_info partition(dt='$do_date')
select
pi.id,
pi.out_trade_no,
pi.order_id,
pi.user_id,
pi.alipay_trade_no,
pi.total_amount,
pi.subject,
pi.payment_type,
pi.payment_time,
oi.province_id
from
(
select * from ${APP}.ods_payment_info where dt='$do_date'
)pi
join
(
select id, province_id from ${APP}.ods_order_info where dt='$do_date'
)oi
on pi.order_id = oi.id;
insert overwrite table ${APP}.dwd_fact_order_refund_info partition(dt='$do_date')
select
id,
user_id,
order_id,
sku_id,
refund_type,
refund_num,
refund_amount,
refund_reason_type,
create_time
from ${APP}.ods_order_refund_info
where dt='$do_date';
insert overwrite table ${APP}.dwd_fact_comment_info partition(dt='$do_date')
select
id,
user_id,
sku_id,
spu_id,
order_id,
appraise,
create_time
from ${APP}.ods_comment_info
where dt='$do_date';
insert overwrite table ${APP}.dwd_fact_cart_info partition(dt='$do_date')
select
id,
user_id,
sku_id,
cart_price,
sku_num,
sku_name,
create_time,
operate_time,
is_ordered,
order_time,
source_type,
source_id
from ${APP}.ods_cart_info
where dt='$do_date';
insert overwrite table ${APP}.dwd_fact_favor_info partition(dt='$do_date')
select
id,
user_id,
sku_id,
spu_id,
is_cancel,
create_time,
cancel_time
from ${APP}.ods_favor_info
where dt='$do_date';
insert overwrite table ${APP}.dwd_fact_coupon_use partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.coupon_id is null,old.coupon_id,new.coupon_id),
if(new.user_id is null,old.user_id,new.user_id),
if(new.order_id is null,old.order_id,new.order_id),
if(new.coupon_status is null,old.coupon_status,new.coupon_status),
if(new.get_time is null,old.get_time,new.get_time),
if(new.using_time is null,old.using_time,new.using_time),
if(new.used_time is null,old.used_time,new.used_time),
date_format(if(new.get_time is null,old.get_time,new.get_time),'yyyy-MM-dd')
from
(
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ${APP}.dwd_fact_coupon_use
where dt in
(
select
date_format(get_time,'yyyy-MM-dd')
from ${APP}.ods_coupon_use
where dt='$do_date'
)
)old
full outer join
(
select
id,
coupon_id,
user_id,
order_id,
coupon_status,
get_time,
using_time,
used_time
from ${APP}.ods_coupon_use
where dt='$do_date'
)new
on old.id=new.id;
insert overwrite table ${APP}.dwd_fact_order_info partition(dt)
select
if(new.id is null,old.id,new.id),
if(new.order_status is null,old.order_status,new.order_status),
if(new.user_id is null,old.user_id,new.user_id),
if(new.out_trade_no is null,old.out_trade_no,new.out_trade_no),
if(new.tms['1001'] is null,old.create_time,new.tms['1001']),--1001对应未支付状态
if(new.tms['1002'] is null,old.payment_time,new.tms['1002']),
if(new.tms['1003'] is null,old.cancel_time,new.tms['1003']),
if(new.tms['1004'] is null,old.finish_time,new.tms['1004']),
if(new.tms['1005'] is null,old.refund_time,new.tms['1005']),
if(new.tms['1006'] is null,old.refund_finish_time,new.tms['1006']),
if(new.province_id is null,old.province_id,new.province_id),
if(new.activity_id is null,old.activity_id,new.activity_id),
if(new.original_total_amount is null,old.original_total_amount,new.original_total_amount),
if(new.benefit_reduce_amount is null,old.benefit_reduce_amount,new.benefit_reduce_amount),
if(new.feight_fee is null,old.feight_fee,new.feight_fee),
if(new.final_total_amount is null,old.final_total_amount,new.final_total_amount),
date_format(if(new.tms['1001'] is null,old.create_time,new.tms['1001']),'yyyy-MM-dd')
from
(
select
id,
order_status,
user_id,
out_trade_no,
create_time,
payment_time,
cancel_time,
finish_time,
refund_time,
refund_finish_time,
province_id,
activity_id,
original_total_amount,
benefit_reduce_amount,
feight_fee,
final_total_amount
from ${APP}.dwd_fact_order_info
where dt
in
(
select
date_format(create_time,'yyyy-MM-dd')
from ${APP}.ods_order_info
where dt='$do_date'
)
)old
full outer join
(
select
info.id,
info.order_status,
info.user_id,
info.out_trade_no,
info.province_id,
act.activity_id,
log.tms,
info.original_total_amount,
info.benefit_reduce_amount,
info.feight_fee,
info.final_total_amount
from
(
select
order_id,
str_to_map(concat_ws(',',collect_set(concat(order_status,'=',operate_time))),',','=') tms
from ${APP}.ods_order_status_log
where dt='$do_date'
group by order_id
)log
join
(
select * from ${APP}.ods_order_info where dt='$do_date'
)info
on log.order_id=info.id
left join
(
select * from ${APP}.ods_activity_order where dt='$do_date'
)act
on log.order_id=act.order_id
)new
on old.id=new.id;
"
sql2="
insert overwrite table ${APP}.dwd_dim_base_province
select
bp.id,
bp.name,
bp.area_code,
bp.iso_code,
bp.region_id,
br.region_name
from ${APP}.ods_base_province bp
join ${APP}.ods_base_region br
on bp.region_id=br.id;
"
sql3="
insert overwrite table ${APP}.dwd_dim_user_info_his_tmp
select * from
(
select
id,
name,
birthday,
gender,
email,
user_level,
create_time,
operate_time,
'$do_date' start_date,
'9999-99-99' end_date
from ${APP}.ods_user_info where dt='$do_date'
union all
select
uh.id,
uh.name,
uh.birthday,
uh.gender,
uh.email,
uh.user_level,
uh.create_time,
uh.operate_time,
uh.start_date,
if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt,-1), uh.end_date) end_date
from ${APP}.dwd_dim_user_info_his uh left join
(
select
*
from ${APP}.ods_user_info
where dt='$do_date'
) ui on uh.id=ui.id
)his
order by his.id, start_date;
insert overwrite table ${APP}.dwd_dim_user_info_his
select * from ${APP}.dwd_dim_user_info_his_tmp;
"
case $1 in
"first"){
$hive -e "$sql1$sql2"
};;
"all"){
$hive -e "$sql1$sql3"
};;
esac
5. ods_to_dwd_log.sh 脚本
#!/bin/bash
hive=/opt/module/hive/bin/hive
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
SET mapreduce.job.queuename=default;
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
insert overwrite table ${APP}.dwd_start_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.ts')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.start') is not null;
insert overwrite table ${APP}.dwd_action_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(action,'$.action_id'),
get_json_object(action,'$.item'),
get_json_object(action,'$.item_type'),
get_json_object(action,'$.ts')
from ${APP}.ods_log lateral view ${APP}.explode_json_array(get_json_object(line,'$.actions')) tmp as action
where dt='$do_date'
and get_json_object(line,'$.actions') is not null;
insert overwrite table ${APP}.dwd_display_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.ts'),
get_json_object(display,'$.displayType'),
get_json_object(display,'$.item'),
get_json_object(display,'$.item_type'),
get_json_object(display,'$.order')
from ${APP}.ods_log lateral view ${APP}.explode_json_array(get_json_object(line,'$.displays')) tmp as display
where dt='$do_date'
and get_json_object(line,'$.displays') is not null;
insert overwrite table ${APP}.dwd_page_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.ts')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.page') is not null;
insert overwrite table ${APP}.dwd_error_log partition(dt='$do_date')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.actions'),
get_json_object(line,'$.displays'),
get_json_object(line,'$.ts'),
get_json_object(line,'$.err.error_code'),
get_json_object(line,'$.err.msg')
from ${APP}.ods_log
where dt='$do_date'
and get_json_object(line,'$.err') is not null;
"
$hive -e "$sql"
6. dwd_to_dws.sh 脚本
#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
set mapreduce.job.queuename=default;
with
tmp_start as
(
select
mid_id,
brand,
model,
count(*) login_count
from ${APP}.dwd_start_log
where dt='$do_date'
group by mid_id,brand,model
),
tmp_page as
(
select
mid_id,
brand,
model,
collect_set(named_struct('page_id',page_id,'page_count',page_count)) page_stats
from
(
select
mid_id,
brand,
model,
page_id,
count(*) page_count
from ${APP}.dwd_page_log
where dt='$do_date'
group by mid_id,brand,model,page_id
)tmp
group by mid_id,brand,model
)
insert overwrite table ${APP}.dws_uv_detail_daycount partition(dt='$do_date')
select
nvl(tmp_start.mid_id,tmp_page.mid_id),
nvl(tmp_start.brand,tmp_page.brand),
nvl(tmp_start.model,tmp_page.model),
tmp_start.login_count,
tmp_page.page_stats
from tmp_start
full outer join tmp_page
on tmp_start.mid_id=tmp_page.mid_id
and tmp_start.brand=tmp_page.brand
and tmp_start.model=tmp_page.model;
with
tmp_login as
(
select
user_id,
count(*) login_count
from ${APP}.dwd_start_log
where dt='$do_date'
and user_id is not null
group by user_id
),
tmp_cart as
(
select
user_id,
count(*) cart_count
from ${APP}.dwd_action_log
where dt='$do_date'
and user_id is not null
and action_id='cart_add'
group by user_id
),tmp_order as
(
select
user_id,
count(*) order_count,
sum(final_total_amount) order_amount
from ${APP}.dwd_fact_order_info
where dt='$do_date'
group by user_id
) ,
tmp_payment as
(
select
user_id,
count(*) payment_count,
sum(payment_amount) payment_amount
from ${APP}.dwd_fact_payment_info
where dt='$do_date'
group by user_id
),
tmp_order_detail as
(
select
user_id,
collect_set(named_struct('sku_id',sku_id,'sku_num',sku_num,'order_count',order_count,'order_amount',order_amount)) order_stats
from
(
select
user_id,
sku_id,
sum(sku_num) sku_num,
count(*) order_count,
cast(sum(final_amount_d) as decimal(20,2)) order_amount
from ${APP}.dwd_fact_order_detail
where dt='$do_date'
group by user_id,sku_id
)tmp
group by user_id
)
insert overwrite table ${APP}.dws_user_action_daycount partition(dt='$do_date')
select
tmp_login.user_id,
login_count,
nvl(cart_count,0),
nvl(order_count,0),
nvl(order_amount,0.0),
nvl(payment_count,0),
nvl(payment_amount,0.0),
order_stats
from tmp_login
left outer join tmp_cart on tmp_login.user_id=tmp_cart.user_id
left outer join tmp_order on tmp_login.user_id=tmp_order.user_id
left outer join tmp_payment on tmp_login.user_id=tmp_payment.user_id
left outer join tmp_order_detail on tmp_login.user_id=tmp_order_detail.user_id;
with
tmp_order as
(
select
sku_id,
count(*) order_count,
sum(sku_num) order_num,
sum(final_amount_d) order_amount
from ${APP}.dwd_fact_order_detail
where dt='$do_date'
group by sku_id
),
tmp_payment as
(
select
sku_id,
count(*) payment_count,
sum(sku_num) payment_num,
sum(final_amount_d) payment_amount
from ${APP}.dwd_fact_order_detail
where (dt='$do_date'
or dt=date_add('$do_date',-1))
and order_id in
(
select
id
from ${APP}.dwd_fact_order_info
where (dt='$do_date'
or dt=date_add('$do_date',-1))
and date_format(payment_time,'yyyy-MM-dd')='$do_date'
)
group by sku_id
),
tmp_refund as
(
select
sku_id,
count(*) refund_count,
sum(refund_num) refund_num,
sum(refund_amount) refund_amount
from ${APP}.dwd_fact_order_refund_info
where dt='$do_date'
group by sku_id
),
tmp_cart as
(
select
item sku_id,
count(*) cart_count
from ${APP}.dwd_action_log
where dt='$do_date'
and user_id is not null
and action_id='cart_add'
group by item
),tmp_favor as
(
select
item sku_id,
count(*) favor_count
from ${APP}.dwd_action_log
where dt='$do_date'
and user_id is not null
and action_id='favor_add'
group by item
),
tmp_appraise as
(
select
sku_id,
sum(if(appraise='1201',1,0)) appraise_good_count,
sum(if(appraise='1202',1,0)) appraise_mid_count,
sum(if(appraise='1203',1,0)) appraise_bad_count,
sum(if(appraise='1204',1,0)) appraise_default_count
from ${APP}.dwd_fact_comment_info
where dt='$do_date'
group by sku_id
)
insert overwrite table ${APP}.dws_sku_action_daycount partition(dt='$do_date')
select
sku_id,
sum(order_count),
sum(order_num),
sum(order_amount),
sum(payment_count),
sum(payment_num),
sum(payment_amount),
sum(refund_count),
sum(refund_num),
sum(refund_amount),
sum(cart_count),
sum(favor_count),
sum(appraise_good_count),
sum(appraise_mid_count),
sum(appraise_bad_count),
sum(appraise_default_count)
from
(
select
sku_id,
order_count,
order_num,
order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_order
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
payment_count,
payment_num,
payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_payment
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
refund_count,
refund_num,
refund_amount,
0 cart_count,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_refund
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
cart_count,
0 favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_cart
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
favor_count,
0 appraise_good_count,
0 appraise_mid_count,
0 appraise_bad_count,
0 appraise_default_count
from tmp_favor
union all
select
sku_id,
0 order_count,
0 order_num,
0 order_amount,
0 payment_count,
0 payment_num,
0 payment_amount,
0 refund_count,
0 refund_num,
0 refund_amount,
0 cart_count,
0 favor_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from tmp_appraise
)tmp
group by sku_id;
with
tmp_login as
(
select
area_code,
count(*) login_count
from ${APP}.dwd_start_log
where dt='$do_date'
group by area_code
),
tmp_op as
(
select
province_id,
sum(if(date_format(create_time,'yyyy-MM-dd')='$do_date',1,0)) order_count,
sum(if(date_format(create_time,'yyyy-MM-dd')='$do_date',final_total_amount,0)) order_amount,
sum(if(date_format(payment_time,'yyyy-MM-dd')='$do_date',1,0)) payment_count,
sum(if(date_format(payment_time,'yyyy-MM-dd')='$do_date',final_total_amount,0)) payment_amount
from ${APP}.dwd_fact_order_info
where (dt='$do_date' or dt=date_add('$do_date',-1))
group by province_id
)
insert overwrite table ${APP}.dws_area_stats_daycount partition(dt='$do_date')
select
pro.id,
pro.province_name,
pro.area_code,
pro.iso_code,
pro.region_id,
pro.region_name,
nvl(tmp_login.login_count,0),
nvl(tmp_op.order_count,0),
nvl(tmp_op.order_amount,0.0),
nvl(tmp_op.payment_count,0),
nvl(tmp_op.payment_amount,0.0)
from ${APP}.dwd_dim_base_province pro
left join tmp_login on pro.area_code=tmp_login.area_code
left join tmp_op on pro.id=tmp_op.province_id;
with
tmp_op as
(
select
activity_id,
sum(if(date_format(create_time,'yyyy-MM-dd')='$do_date',1,0)) order_count,
sum(if(date_format(create_time,'yyyy-MM-dd')='$do_date',final_total_amount,0)) order_amount,
sum(if(date_format(payment_time,'yyyy-MM-dd')='$do_date',1,0)) payment_count,
sum(if(date_format(payment_time,'yyyy-MM-dd')='$do_date',final_total_amount,0)) payment_amount
from ${APP}.dwd_fact_order_info
where (dt='$do_date' or dt=date_add('$do_date',-1))
and activity_id is not null
group by activity_id
),
tmp_display as
(
select
item activity_id,
count(*) display_count
from ${APP}.dwd_display_log
where dt='$do_date'
and item_type='activity_id'
group by item
),
tmp_activity as
(
select
*
from ${APP}.dwd_dim_activity_info
where dt='$do_date'
)
insert overwrite table ${APP}.dws_activity_info_daycount partition(dt='$do_date')
select
nvl(tmp_op.activity_id,tmp_display.activity_id),
tmp_activity.activity_name,
tmp_activity.activity_type,
tmp_activity.start_time,
tmp_activity.end_time,
tmp_activity.create_time,
tmp_display.display_count,
tmp_op.order_count,
tmp_op.order_amount,
tmp_op.payment_count,
tmp_op.payment_amount
from tmp_op
full outer join tmp_display on tmp_op.activity_id=tmp_display.activity_id
left join tmp_activity on nvl(tmp_op.activity_id,tmp_display.activity_id)=tmp_activity.id;
"
$hive -e "$sql"
7. dws_to_dwt.sh 脚本
#!/bin/bash
APP=gmall
hive=/opt/module/hive/bin/hive
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
set mapreduce.job.queuename=default;
insert overwrite table ${APP}.dwt_uv_topic
select
nvl(new.mid_id,old.mid_id),
nvl(new.model,old.model),
nvl(new.brand,old.brand),
if(old.mid_id is null,'$do_date',old.login_date_first),
if(new.mid_id is not null,'$do_date',old.login_date_last),
if(new.mid_id is not null, new.login_count,0),
nvl(old.login_count,0)+if(new.login_count>0,1,0)
from
(
select
*
from ${APP}.dwt_uv_topic
)old
full outer join
(
select
*
from ${APP}.dws_uv_detail_daycount
where dt='$do_date'
)new
on old.mid_id=new.mid_id;
insert overwrite table ${APP}.dwt_user_topic
select
nvl(new.user_id,old.user_id),
if(old.login_date_first is null and new.login_count>0,'$do_date',old.login_date_first),
if(new.login_count>0,'$do_date',old.login_date_last),
nvl(old.login_count,0)+if(new.login_count>0,1,0),
nvl(new.login_last_30d_count,0),
if(old.order_date_first is null and new.order_count>0,'$do_date',old.order_date_first),
if(new.order_count>0,'$do_date',old.order_date_last),
nvl(old.order_count,0)+nvl(new.order_count,0),
nvl(old.order_amount,0)+nvl(new.order_amount,0),
nvl(new.order_last_30d_count,0),
nvl(new.order_last_30d_amount,0),
if(old.payment_date_first is null and new.payment_count>0,'$do_date',old.payment_date_first),
if(new.payment_count>0,'$do_date',old.payment_date_last),
nvl(old.payment_count,0)+nvl(new.payment_count,0),
nvl(old.payment_amount,0)+nvl(new.payment_amount,0),
nvl(new.payment_last_30d_count,0),
nvl(new.payment_last_30d_amount,0)
from
${APP}.dwt_user_topic old
full outer join
(
select
user_id,
sum(if(dt='$do_date',login_count,0)) login_count,
sum(if(dt='$do_date',order_count,0)) order_count,
sum(if(dt='$do_date',order_amount,0)) order_amount,
sum(if(dt='$do_date',payment_count,0)) payment_count,
sum(if(dt='$do_date',payment_amount,0)) payment_amount,
sum(if(login_count>0,1,0)) login_last_30d_count,
sum(order_count) order_last_30d_count,
sum(order_amount) order_last_30d_amount,
sum(payment_count) payment_last_30d_count,
sum(payment_amount) payment_last_30d_amount
from ${APP}.dws_user_action_daycount
where dt>=date_add( '$do_date',-30)
group by user_id
)new
on old.user_id=new.user_id;
insert overwrite table ${APP}.dwt_sku_topic
select
nvl(new.sku_id,old.sku_id),
sku_info.spu_id,
nvl(new.order_count30,0),
nvl(new.order_num30,0),
nvl(new.order_amount30,0),
nvl(old.order_count,0) + nvl(new.order_count,0),
nvl(old.order_num,0) + nvl(new.order_num,0),
nvl(old.order_amount,0) + nvl(new.order_amount,0),
nvl(new.payment_count30,0),
nvl(new.payment_num30,0),
nvl(new.payment_amount30,0),
nvl(old.payment_count,0) + nvl(new.payment_count,0),
nvl(old.payment_num,0) + nvl(new.payment_num,0),
nvl(old.payment_amount,0) + nvl(new.payment_amount,0),
nvl(new.refund_count30,0),
nvl(new.refund_num30,0),
nvl(new.refund_amount30,0),
nvl(old.refund_count,0) + nvl(new.refund_count,0),
nvl(old.refund_num,0) + nvl(new.refund_num,0),
nvl(old.refund_amount,0) + nvl(new.refund_amount,0),
nvl(new.cart_count30,0),
nvl(old.cart_count,0) + nvl(new.cart_count,0),
nvl(new.favor_count30,0),
nvl(old.favor_count,0) + nvl(new.favor_count,0),
nvl(new.appraise_good_count30,0),
nvl(new.appraise_mid_count30,0),
nvl(new.appraise_bad_count30,0),
nvl(new.appraise_default_count30,0) ,
nvl(old.appraise_good_count,0) + nvl(new.appraise_good_count,0),
nvl(old.appraise_mid_count,0) + nvl(new.appraise_mid_count,0),
nvl(old.appraise_bad_count,0) + nvl(new.appraise_bad_count,0),
nvl(old.appraise_default_count,0) + nvl(new.appraise_default_count,0)
from
(
select
sku_id,
spu_id,
order_last_30d_count,
order_last_30d_num,
order_last_30d_amount,
order_count,
order_num,
order_amount ,
payment_last_30d_count,
payment_last_30d_num,
payment_last_30d_amount,
payment_count,
payment_num,
payment_amount,
refund_last_30d_count,
refund_last_30d_num,
refund_last_30d_amount,
refund_count,
refund_num,
refund_amount,
cart_last_30d_count,
cart_count,
favor_last_30d_count,
favor_count,
appraise_last_30d_good_count,
appraise_last_30d_mid_count,
appraise_last_30d_bad_count,
appraise_last_30d_default_count,
appraise_good_count,
appraise_mid_count,
appraise_bad_count,
appraise_default_count
from ${APP}.dwt_sku_topic
)old
full outer join
(
select
sku_id,
sum(if(dt='$do_date', order_count,0 )) order_count,
sum(if(dt='$do_date',order_num ,0 )) order_num,
sum(if(dt='$do_date',order_amount,0 )) order_amount ,
sum(if(dt='$do_date',payment_count,0 )) payment_count,
sum(if(dt='$do_date',payment_num,0 )) payment_num,
sum(if(dt='$do_date',payment_amount,0 )) payment_amount,
sum(if(dt='$do_date',refund_count,0 )) refund_count,
sum(if(dt='$do_date',refund_num,0 )) refund_num,
sum(if(dt='$do_date',refund_amount,0 )) refund_amount,
sum(if(dt='$do_date',cart_count,0 )) cart_count,
sum(if(dt='$do_date',favor_count,0 )) favor_count,
sum(if(dt='$do_date',appraise_good_count,0 )) appraise_good_count,
sum(if(dt='$do_date',appraise_mid_count,0 ) ) appraise_mid_count ,
sum(if(dt='$do_date',appraise_bad_count,0 )) appraise_bad_count,
sum(if(dt='$do_date',appraise_default_count,0 )) appraise_default_count,
sum(order_count) order_count30 ,
sum(order_num) order_num30,
sum(order_amount) order_amount30,
sum(payment_count) payment_count30,
sum(payment_num) payment_num30,
sum(payment_amount) payment_amount30,
sum(refund_count) refund_count30,
sum(refund_num) refund_num30,
sum(refund_amount) refund_amount30,
sum(cart_count) cart_count30,
sum(favor_count) favor_count30,
sum(appraise_good_count) appraise_good_count30,
sum(appraise_mid_count) appraise_mid_count30,
sum(appraise_bad_count) appraise_bad_count30,
sum(appraise_default_count) appraise_default_count30
from ${APP}.dws_sku_action_daycount
where dt >= date_add ('$do_date', -30)
group by sku_id
)new
on new.sku_id = old.sku_id
left join
(select * from ${APP}.dwd_dim_sku_info where dt='$do_date') sku_info
on nvl(new.sku_id,old.sku_id)= sku_info.id;
insert overwrite table ${APP}.dwt_activity_topic
select
nvl(new.id,old.id),
nvl(new.activity_name,old.activity_name),
nvl(new.activity_type,old.activity_type),
nvl(new.start_time,old.start_time),
nvl(new.end_time,old.end_time),
nvl(new.create_time,old.create_time),
nvl(new.display_count,0),
nvl(new.order_count,0),
nvl(new.order_amount,0.0),
nvl(new.payment_count,0),
nvl(new.payment_amount,0.0),
nvl(new.display_count,0)+nvl(old.display_count,0),
nvl(new.order_count,0)+nvl(old.order_count,0),
nvl(new.order_amount,0.0)+nvl(old.order_amount,0.0),
nvl(new.payment_count,0)+nvl(old.payment_count,0),
nvl(new.payment_amount,0.0)+nvl(old.payment_amount,0.0)
from
(
select
*
from ${APP}.dwt_activity_topic
)old
full outer join
(
select
*
from ${APP}.dws_activity_info_daycount
where dt='$do_date'
)new
on old.id=new.id;
insert overwrite table ${APP}.dwt_area_topic
select
nvl(old.id,new.id),
nvl(old.province_name,new.province_name),
nvl(old.area_code,new.area_code),
nvl(old.iso_code,new.iso_code),
nvl(old.region_id,new.region_id),
nvl(old.region_name,new.region_name),
nvl(new.login_day_count,0),
nvl(new.login_last_30d_count,0),
nvl(new.order_day_count,0),
nvl(new.order_day_amount,0.0),
nvl(new.order_last_30d_count,0),
nvl(new.order_last_30d_amount,0.0),
nvl(new.payment_day_count,0),
nvl(new.payment_day_amount,0.0),
nvl(new.payment_last_30d_count,0),
nvl(new.payment_last_30d_amount,0.0)
from
(
select
*
from ${APP}.dwt_area_topic
)old
full outer join
(
select
id,
province_name,
area_code,
iso_code,
region_id,
region_name,
sum(if(dt='$do_date',login_count,0)) login_day_count,
sum(if(dt='$do_date',order_count,0)) order_day_count,
sum(if(dt='$do_date',order_amount,0.0)) order_day_amount,
sum(if(dt='$do_date',payment_count,0)) payment_day_count,
sum(if(dt='$do_date',payment_amount,0.0)) payment_day_amount,
sum(login_count) login_last_30d_count,
sum(order_count) order_last_30d_count,
sum(order_amount) order_last_30d_amount,
sum(payment_count) payment_last_30d_count,
sum(payment_amount) payment_last_30d_amount
from ${APP}.dws_area_stats_daycount
where dt>=date_add('$do_date',-30)
group by id,province_name,area_code,iso_code,region_id,region_name
)new
on old.id=new.id;
"
$hive -e "$sql"
8. dwt_to_ads.sh 脚本
#!/bin/bash
hive=/opt/module/hive/bin/hive
APP=gmall
# 如果是输入的日期按照取输入日期;如果没输入日期取当前时间的前一天
if [ -n "$1" ] ;then
do_date=$1
else
do_date=`date -d "-1 day" +%F`
fi
sql="
set mapreduce.job.queuename=default;
insert into table ${APP}.ads_uv_count
select
'$do_date' dt,
daycount.ct,
wkcount.ct,
mncount.ct,
if(date_add(next_day('$do_date','MO'),-1)='$do_date','Y','N') ,
if(last_day('$do_date')='$do_date','Y','N')
from
(
select
'$do_date' dt,
count(*) ct
from ${APP}.dwt_uv_topic
where login_date_last='$do_date'
)daycount join
(
select
'$do_date' dt,
count (*) ct
from ${APP}.dwt_uv_topic
where login_date_last>=date_add(next_day('$do_date','MO'),-7)
and login_date_last<= date_add(next_day('$do_date','MO'),-1)
) wkcount on daycount.dt=wkcount.dt
join
(
select
'$do_date' dt,
count (*) ct
from ${APP}.dwt_uv_topic
where date_format(login_date_last,'yyyy-MM')=date_format('$do_date','yyyy-MM')
)mncount on daycount.dt=mncount.dt;
insert into table ${APP}.ads_new_mid_count
select
login_date_first,
count(*)
from ${APP}.dwt_uv_topic
where login_date_first='$do_date'
group by login_date_first;
insert into table ${APP}.ads_silent_count
select
'$do_date',
count(*)
from ${APP}.dwt_uv_topic
where login_date_first=login_date_last
and login_date_last<=date_add('$do_date',-7);
insert into table ${APP}.ads_back_count
select
'$do_date',
concat(date_add(next_day('$do_date','MO'),-7),'_', date_add(next_day('$do_date','MO'),-1)),
count(*)
from
(
select
mid_id
from ${APP}.dwt_uv_topic
where login_date_last>=date_add(next_day('$do_date','MO'),-7)
and login_date_last<= date_add(next_day('$do_date','MO'),-1)
and login_date_first<date_add(next_day('$do_date','MO'),-7)
)current_wk
left join
(
select
mid_id
from ${APP}.dws_uv_detail_daycount
where dt>=date_add(next_day('$do_date','MO'),-7*2)
and dt<= date_add(next_day('$do_date','MO'),-7-1)
group by mid_id
)last_wk
on current_wk.mid_id=last_wk.mid_id
where last_wk.mid_id is null;
insert into table ${APP}.ads_wastage_count
select
'$do_date',
count(*)
from
(
select
mid_id
from ${APP}.dwt_uv_topic
where login_date_last<=date_add('$do_date',-7)
group by mid_id
)t1;
insert into table ${APP}.ads_user_retention_day_rate
select
'$do_date',--统计日期
date_add('$do_date',-1),--新增日期
1,--留存天数
sum(if(login_date_first=date_add('$do_date',-1) and login_date_last='$do_date',1,0)),--$do_date的1日留存数
sum(if(login_date_first=date_add('$do_date',-1),1,0)),--$do_date新增
sum(if(login_date_first=date_add('$do_date',-1) and login_date_last='$do_date',1,0))/sum(if(login_date_first=date_add('$do_date',-1),1,0))*100
from ${APP}.dwt_uv_topic
union all
select
'$do_date',--统计日期
date_add('$do_date',-2),--新增日期
2,--留存天数
sum(if(login_date_first=date_add('$do_date',-2) and login_date_last='$do_date',1,0)),--$do_date的2日留存数
sum(if(login_date_first=date_add('$do_date',-2),1,0)),--$do_date新增
sum(if(login_date_first=date_add('$do_date',-2) and login_date_last='$do_date',1,0))/sum(if(login_date_first=date_add('$do_date',-2),1,0))*100
from ${APP}.dwt_uv_topic
union all
select
'$do_date',--统计日期
date_add('$do_date',-3),--新增日期
3,--留存天数
sum(if(login_date_first=date_add('$do_date',-3) and login_date_last='$do_date',1,0)),--$do_date的3日留存数
sum(if(login_date_first=date_add('$do_date',-3),1,0)),--$do_date新增
sum(if(login_date_first=date_add('$do_date',-3) and login_date_last='$do_date',1,0))/sum(if(login_date_first=date_add('$do_date',-3),1,0))*100
from ${APP}.dwt_uv_topic;
insert into table ${APP}.ads_continuity_wk_count
select
'$do_date',
concat(date_add(next_day('$do_date','MO'),-7*3),'_',date_add(next_day('$do_date','MO'),-1)),
count(*)
from
(
select
mid_id
from
(
select
mid_id
from ${APP}.dws_uv_detail_daycount
where dt>=date_add(next_day('$do_date','monday'),-7)
and dt<=date_add(next_day('$do_date','monday'),-1)
group by mid_id
union all
select
mid_id
from ${APP}.dws_uv_detail_daycount
where dt>=date_add(next_day('$do_date','monday'),-7*2)
and dt<=date_add(next_day('$do_date','monday'),-7-1)
group by mid_id
union all
select
mid_id
from ${APP}.dws_uv_detail_daycount
where dt>=date_add(next_day('$do_date','monday'),-7*3)
and dt<=date_add(next_day('$do_date','monday'),-7*2-1)
group by mid_id
)t1
group by mid_id
having count(*)=3
)t2;
insert into table ${APP}.ads_continuity_uv_count
select
'$do_date',
concat(date_add('$do_date',-6),'_','$do_date'),
count(*)
from
(
select mid_id
from
(
select mid_id
from
(
select
mid_id,
date_sub(dt,rank) date_dif
from
(
select
mid_id,
dt,
rank() over(partition by mid_id order by dt) rank
from ${APP}.dws_uv_detail_daycount
where dt>=date_add('$do_date',-6) and dt<='$do_date'
)t1
)t2
group by mid_id,date_dif
having count(*)>=3
)t3
group by mid_id
)t4;
insert into table ${APP}.ads_user_topic
select
'$do_date',
sum(if(login_date_last='$do_date',1,0)),
sum(if(login_date_first='$do_date',1,0)),
sum(if(payment_date_first='$do_date',1,0)),
sum(if(payment_count>0,1,0)),
count(*),
sum(if(login_date_last='$do_date',1,0))/count(*),
sum(if(payment_count>0,1,0))/count(*),
sum(if(login_date_first='$do_date',1,0))/sum(if(login_date_last='$do_date',1,0))
from ${APP}.dwt_user_topic;
with
tmp_uv as
(
select
'$do_date' dt,
sum(if(array_contains(pages,'home'),1,0)) home_count,
sum(if(array_contains(pages,'good_detail'),1,0)) good_detail_count
from
(
select
mid_id,
collect_set(page_id) pages
from ${APP}.dwd_page_log
where dt='$do_date'
and page_id in ('home','good_detail')
group by mid_id
)tmp
),
tmp_cop as
(
select
'$do_date' dt,
sum(if(cart_count>0,1,0)) cart_count,
sum(if(order_count>0,1,0)) order_count,
sum(if(payment_count>0,1,0)) payment_count
from ${APP}.dws_user_action_daycount
where dt='$do_date'
)
insert into table ${APP}.ads_user_action_convert_day
select
tmp_uv.dt,
tmp_uv.home_count,
tmp_uv.good_detail_count,
tmp_uv.good_detail_count/tmp_uv.home_count*100,
tmp_cop.cart_count,
tmp_cop.cart_count/tmp_uv.good_detail_count*100,
tmp_cop.order_count,
tmp_cop.order_count/tmp_cop.cart_count*100,
tmp_cop.payment_count,
tmp_cop.payment_count/tmp_cop.order_count*100
from tmp_uv
join tmp_cop
on tmp_uv.dt=tmp_cop.dt;
insert into table ${APP}.ads_product_info
select
'$do_date' dt,
sku_num,
spu_num
from
(
select
'$do_date' dt,
count(*) sku_num
from
${APP}.dwt_sku_topic
) tmp_sku_num
join
(
select
'$do_date' dt,
count(*) spu_num
from
(
select
spu_id
from
${APP}.dwt_sku_topic
group by
spu_id
) tmp_spu_id
) tmp_spu_num
on
tmp_sku_num.dt=tmp_spu_num.dt;
insert into table ${APP}.ads_product_sale_topN
select
'$do_date' dt,
sku_id,
payment_amount
from
${APP}.dws_sku_action_daycount
where
dt='$do_date'
order by payment_amount desc
limit 10;
insert into table ${APP}.ads_product_favor_topN
select
'$do_date' dt,
sku_id,
favor_count
from
${APP}.dws_sku_action_daycount
where
dt='$do_date'
order by favor_count desc
limit 10;
insert into table ${APP}.ads_product_cart_topN
select
'$do_date' dt,
sku_id,
cart_count
from
${APP}.dws_sku_action_daycount
where
dt='$do_date'
order by cart_count desc
limit 10;
insert into table ${APP}.ads_product_refund_topN
select
'$do_date',
sku_id,
refund_last_30d_count/payment_last_30d_count*100 refund_ratio
from ${APP}.dwt_sku_topic
order by refund_ratio desc
limit 10;
insert into table ${APP}.ads_appraise_bad_topN
select
'$do_date' dt,
sku_id,
appraise_bad_count/(appraise_good_count+appraise_mid_count+appraise_bad_count+appraise_default_count) appraise_bad_ratio
from
${APP}.dws_sku_action_daycount
where
dt='$do_date'
order by appraise_bad_ratio desc
limit 10;
insert into table ${APP}.ads_order_daycount
select
'$do_date',
sum(order_count),
sum(order_amount),
sum(if(order_count>0,1,0))
from ${APP}.dws_user_action_daycount
where dt='$do_date';
insert into table ${APP}.ads_payment_daycount
select
tmp_payment.dt,
tmp_payment.payment_count,
tmp_payment.payment_amount,
tmp_payment.payment_user_count,
tmp_skucount.payment_sku_count,
tmp_time.payment_avg_time
from
(
select
'$do_date' dt,
sum(payment_count) payment_count,
sum(payment_amount) payment_amount,
sum(if(payment_count>0,1,0)) payment_user_count
from ${APP}.dws_user_action_daycount
where dt='$do_date'
)tmp_payment
join
(
select
'$do_date' dt,
sum(if(payment_count>0,1,0)) payment_sku_count
from ${APP}.dws_sku_action_daycount
where dt='$do_date'
)tmp_skucount on tmp_payment.dt=tmp_skucount.dt
join
(
select
'$do_date' dt,
sum(unix_timestamp(payment_time)-unix_timestamp(create_time))/count(*)/60 payment_avg_time
from ${APP}.dwd_fact_order_info
where dt='$do_date'
and payment_time is not null
)tmp_time on tmp_payment.dt=tmp_time.dt;
with
tmp_order as
(
select
user_id,
order_stats_struct.sku_id sku_id,
order_stats_struct.order_count order_count
from ${APP}.dws_user_action_daycount lateral view explode(order_detail_stats) tmp as order_stats_struct
where date_format(dt,'yyyy-MM')=date_format('$do_date','yyyy-MM')
),
tmp_sku as
(
select
id,
tm_id,
category1_id,
category1_name
from ${APP}.dwd_dim_sku_info
where dt='$do_date'
)
insert into table ${APP}.ads_sale_tm_category1_stat_mn
select
tm_id,
category1_id,
category1_name,
sum(if(order_count>=1,1,0)) buycount,
sum(if(order_count>=2,1,0)) buyTwiceLast,
sum(if(order_count>=2,1,0))/sum( if(order_count>=1,1,0)) buyTwiceLastRatio,
sum(if(order_count>=3,1,0)) buy3timeLast ,
sum(if(order_count>=3,1,0))/sum( if(order_count>=1,1,0)) buy3timeLastRatio ,
date_format('$do_date' ,'yyyy-MM') stat_mn,
'$do_date' stat_date
from
(
select
tmp_order.user_id,
tmp_sku.category1_id,
tmp_sku.category1_name,
tmp_sku.tm_id,
sum(order_count) order_count
from tmp_order
join tmp_sku
on tmp_order.sku_id=tmp_sku.id
group by tmp_order.user_id,tmp_sku.category1_id,tmp_sku.category1_name,tmp_sku.tm_id
)tmp
group by tm_id, category1_id, category1_name;
insert into table ${APP}.ads_area_topic
select
'$do_date',
id,
province_name,
area_code,
iso_code,
region_id,
region_name,
login_day_count,
order_day_count,
order_day_amount,
payment_day_count,
payment_day_amount
from ${APP}.dwt_area_topic;
"
$hive -e "$sql"