二百二十九、离线数仓——离线数仓Hive从Kafka、MySQL到ClickHouse的完整开发流程

news2024/12/28 5:11:19

一、目的

为了整理离线数仓开发的全流程,算是温故知新吧

离线数仓的数据源是Kafka和MySQL数据库,Kafka存业务数据,MySQL存维度数据

采集工具是Kettle和Flume,Flume采集Kafka数据,Kettle采集MySQL数据

离线数仓是Hive

目标数据库是ClickHouse

任务调度器是海豚

二、数据采集

(一)Flume采集Kafka数据

1、Flume配置文件

## agent a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1

## configure source s1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.27:9092
a1.sources.s1.kafka.topics = topic_b_queue
a1.sources.s1.kafka.consumer.group.id = queue_group
a1.sources.s1.kafka.consumer.auto.offset.reset = latest
a1.sources.s1.batchSize = 1000

## configure channel c1
## a1.channels.c1.type = memory
## a1.channels.c1.capacity = 10000
## a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/flumeData/checkpoint/queue
a1.channels.c1.dataDirs = /home/data/flumeData/flumedata/queue

## configure sink k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = queue
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 1200000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 60
a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.fileType = SequenceFile
a1.sinks.k1.hdfs.codeC = gzip

## Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

2、用海豚调度Flume任务

#!/bin/bash
source /etc/profile

/usr/local/hurys/dc_env/flume/flume190/bin/flume-ng agent -n a1 -f /usr/local/hurys/dc_env/flume/flume190/conf/queue.properties

3、目标路径

(二)Kettle采集MySQL维度数据

1、Kettle任务配置

2、用海豚调度Kettle任务

#!/bin/bash
source /etc/profile

/usr/local/hurys/dc_env/kettle/data-integration/pan.sh -rep=hurys_linux_kettle_repository -user=admin -pass=admin -dir=/mysql_to_hdfs/ -trans=23_MySQL_to_HDFS_tb_radar_lane level=Basic >>/home/log/kettle/23_MySQL_to_HDFS_tb_radar_lane_`date +%Y%m%d`.log 

3、目标路径

三、ODS层

(一)业务数据表

use hurys_dc_ods;

create external table  if not exists  ods_queue(
    queue_json  string
)
comment '静态排队数据表——静态分区'
partitioned by (day string)
stored as SequenceFile
;
--刷新表分区
msck repair table ods_queue;
--查看表分区
show partitions ods_queue;
--查看表数据
select * from ods_queue;

(二)维度数据表

use hurys_dc_basic;

create  external  table  if not exists  tb_device_scene(
    id        int      comment '主键id',
    device_no string   comment '设备编号',
    scene_id  string   comment '场景编号'
)
comment '雷达场景表'
row format delimited fields terminated by ','
stored as  textfile  location '/data/tb_device_scene'
tblproperties("skip.header.line.count"="1") ;
--查看表数据
select * from hurys_dc_basic.tb_device_scene;

四、DWD层

(一)业务数据清洗

1、业务数据的JSON有多层

--1、静态排队数据内部表——动态分区  dwd_queue
create  table  if not exists  dwd_queue(
    device_no    string          comment '设备编号',
    lane_num     int             comment '车道数量',
    create_time  timestamp       comment '创建时间',
    lane_no      int             comment '车道编号',
    lane_type    int             comment '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',
    queue_count  int             comment '排队车辆数',
    queue_len    decimal(10,2)   comment '排队长度(m)',
    queue_head   decimal(10,2)   comment '排队第一辆车距离停止线距离(m)',
    queue_tail   decimal(10,2)   comment '排队最后一辆车距离停止线距离(m)'
)
comment '静态排队数据表——动态分区'
partitioned by (day string)
stored as orc
;
--动态插入数据

with t1 as(
select
       get_json_object(queue_json,'$.deviceNo')   device_no,
       get_json_object(queue_json,'$.createTime') create_time,
       get_json_object(queue_json,'$.laneNum')    lane_num,
       get_json_object(queue_json,'$.queueList')  queue_list
from hurys_dc_ods.ods_queue
    )
insert  overwrite  table  hurys_dc_dwd.dwd_queue partition(day)
select
        t1.device_no,
        t1.lane_num,
        substr(create_time,1,19)                                               create_time ,
        get_json_object(list_json,'$.laneNo')                                  lane_no,
        get_json_object(list_json,'$.laneType')                                lane_type,
        get_json_object(list_json,'$.queueCount')                              queue_count,
        cast(get_json_object(list_json,'$.queueLen')   as decimal(10,2))       queue_len,
        cast(get_json_object(list_json,'$.queueHead')  as decimal(10,2))       queue_head,
        cast(get_json_object(list_json,'$.queueTail')  as decimal(10,2))       queue_tail,
        date(t1.create_time) day
from t1
lateral view explode(split(regexp_replace(regexp_replace(queue_list,
                                                '\\[|\\]','') ,   --将json数组两边的中括号去掉
                            '\\}\\,\\{','\\}\\;\\{'),  --将json数组元素之间的逗号换成分号
                  '\\;') --以分号作为分隔符(split函数以分号作为分隔)
          )list_queue as list_json
where  device_no is not null  and create_time is not null and  get_json_object(list_json,'$.queueLen') between 0 and 500
and  get_json_object(list_json,'$.queueHead')  between 0 and 500 and  get_json_object(list_json,'$.queueTail')  between 0 and 500 and  get_json_object(list_json,'$.queueCount') between 0 and 100
group by t1.device_no, t1.lane_num, substr(create_time,1,19), get_json_object(list_json,'$.laneNo'), get_json_object(list_json,'$.laneType'), get_json_object(list_json,'$.queueCount'), cast(get_json_object(list_json,'$.queueLen')   as decimal(10,2)), cast(get_json_object(list_json,'$.queueHead')  as decimal(10,2)), cast(get_json_object(list_json,'$.queueTail')  as decimal(10,2)), date(t1.create_time)
;
--查看分区
show partitions dwd_queue;
--查看数据
select * from dwd_queue
where day='2024-03-11';
--删掉表分区
alter table hurys_dc_dwd.dwd_queue drop partition (day='2024-03-11');

2、业务数据的JSON只有一层

--2、转向比数据内部表——动态分区  dwd_turnratio
create  table  if not exists  dwd_turnratio(
    device_no       string        comment '设备编号',
    cycle           int           comment '转向比数据周期' ,
    create_time     timestamp     comment '创建时间',
    volume_sum      int           comment '指定时间段内通过路口的车辆总数',
    speed_avg       decimal(10,2) comment '指定时间段内通过路口的所有车辆速度的平均值',
    volume_left     int           comment '指定时间段内通过路口的左转车辆总数',
    speed_left      decimal(10,2) comment '指定时间段内通过路口的左转车辆速度的平均值',
    volume_straight int           comment '指定时间段内通过路口的直行车辆总数',
    speed_straight  decimal(10,2) comment '指定时间段内通过路口的直行车辆速度的平均值',
    volume_right    int           comment '指定时间段内通过路口的右转车辆总数',
    speed_right     decimal(10,2) comment '指定时间段内通过路口的右转车辆速度的平均值',
    volume_turn     int           comment '指定时间段内通过路口的掉头车辆总数',
    speed_turn      decimal(10,2) comment '指定时间段内通过路口的掉头车辆速度的平均值'
)
comment '转向比数据表——动态分区'
partitioned by (day string)   --分区字段不能是表中已经存在的数据,可以将分区字段看作表的伪列。
stored as orc                 --表存储数据格式为orc
;
--动态插入数据
--解析json字段、去重、非空、volumeSum>=0
--speed_avg、speed_left、speed_straight、speed_right、speed_turn 等字段保留两位小数
--0<=volume_sum<=1000、0<=speed_avg<=150、0<=volume_left<=1000、0<=speed_left<=100、0<=volume_straight<=1000
--0<=speed_straight<=150、0<=volume_right<=1000、0<=speed_right<=100、0<=volume_turn<=100、0<=speed_turn<=100
with t1 as(
select
        get_json_object(turnratio_json,'$.deviceNo')        device_no,
        get_json_object(turnratio_json,'$.cycle')           cycle,
        get_json_object(turnratio_json,'$.createTime')      create_time,
        get_json_object(turnratio_json,'$.volumeSum')       volume_sum,
        cast(get_json_object(turnratio_json,'$.speedAvg')     as decimal(10,2))    speed_avg,
        get_json_object(turnratio_json,'$.volumeLeft')      volume_left,
        cast(get_json_object(turnratio_json,'$.speedLeft')    as decimal(10,2))    speed_left,
        get_json_object(turnratio_json,'$.volumeStraight')  volume_straight,
        cast(get_json_object(turnratio_json,'$.speedStraight')as decimal(10,2))    speed_straight,
        get_json_object(turnratio_json,'$.volumeRight')     volume_right,
        cast(get_json_object(turnratio_json,'$.speedRight')   as decimal(10,2))    speed_right ,
        case when  get_json_object(turnratio_json,'$.volumeTurn')  is null then 0 else get_json_object(turnratio_json,'$.volumeTurn')  end as   volume_turn ,
        case when  get_json_object(turnratio_json,'$.speedTurn')   is null then 0 else cast(get_json_object(turnratio_json,'$.speedTurn')as decimal(10,2))   end as   speed_turn
from hurys_dc_ods.ods_turnratio)
insert overwrite table hurys_dc_dwd.dwd_turnratio partition (day)
select
       t1.device_no,
       cycle,
       substr(create_time,1,19)              create_time ,
       volume_sum,
       speed_avg,
       volume_left,
       speed_left,
       volume_straight,
       speed_straight ,
       volume_right,
       speed_right ,
       volume_turn,
       speed_turn,
       date(create_time) day
from t1
where device_no is not null and volume_sum between 0 and 1000 and speed_avg between 0 and 150 and volume_left  between 0 and 1000
and speed_left between 0 and 100 and volume_straight between 0 and 1000 and speed_straight between 0 and 150
and volume_right between 0 and 1000 and speed_right between 0 and 100 and volume_turn between 0 and 100 and speed_turn between 0 and 100
group by t1.device_no, cycle, substr(create_time,1,19), volume_sum, speed_avg, volume_left, speed_left, volume_straight, speed_straight, volume_right, speed_right, volume_turn, speed_turn, date(create_time)
;
--查看分区
show partitions dwd_turnratio;
--查看数据
select * from hurys_dc_dwd.dwd_turnratio
where day='2024-03-11';
--删掉表分区
alter table hurys_dc_dwd.dwd_turnratio drop partition (day='2024-03-11');

(二)维度数据清洗

create table if not exists  dwd_radar_lane(
    device_no         string  comment '雷达编号',
    lane_no           string  comment '车道编号',
    lane_id           string  comment '车道id',
    lane_direction    string  comment '行驶方向',
    lane_type         int     comment '车道类型 0渠化,1来向路段,2出口,3去向路段,4路口,5非路口路段,6其他',
    lane_length       float   comment '车道长度',
    lane_type_name    string  comment '车道类型名称'
)
comment '雷达车道信息表'
stored as orc
;
--create table if not exists  dwd_radar_lane  stored as orc as
--加载数据
insert overwrite table  hurys_dc_dwd.dwd_radar_lane
select
device_no, lane_no, lane_id, lane_direction, lane_type,lane_length ,
       case when lane_type='0' then '渠化'
            when lane_type='1' then '来向路段'
            when lane_type='2' then '出口'
            when lane_type='3' then '去向路段'
       end as lane_type_name
from hurys_dc_basic.tb_radar_lane
where lane_length is not null
group by device_no, lane_no, lane_id, lane_direction, lane_type, lane_length
;
--查看表数据
select * from hurys_dc_dwd.dwd_radar_lane;

五、DWS层

create  table  if not exists  dws_statistics_volume_1hour(
    device_no        string         comment '设备编号',
    scene_name       string         comment '场景名称',
    lane_no          int            comment '车道编号',
    lane_direction   string         comment '车道流向',
    section_no       int            comment '断面编号',
    device_direction string         comment '雷达朝向',
    sum_volume_hour  int            comment '每小时总流量',
    start_time       timestamp      comment '开始时间'
)
comment '统计数据流量表——动态分区——1小时周期'
partitioned by (day string)
stored as orc
;
--动态加载数据  --两个一起 1m41s 、 convert.join=false  1m43s、
--注意字段顺序  查询语句中字段顺序与建表字段顺序一致
insert  overwrite  table  hurys_dc_dws.dws_statistics_volume_1hour  partition(day)
select
       dwd_st.device_no,
       dwd_sc.scene_name,
       dwd_st.lane_no,
       dwd_rl.lane_direction,
       dwd_st.section_no,
       dwd_rc.device_direction,
       sum(volume_sum) sum_volume_hour,
       concat(substr(create_time, 1, 14), '00:00') start_time,
       day
from hurys_dc_dwd.dwd_statistics as dwd_st
    right join hurys_dc_dwd.dwd_radar_lane as dwd_rl
              on dwd_rl.device_no=dwd_st.device_no and dwd_rl.lane_no=dwd_st.lane_no
    right join hurys_dc_dwd.dwd_device_scene as dwd_ds
              on dwd_ds.device_no=dwd_st.device_no
    right join hurys_dc_dwd.dwd_scene as dwd_sc
              on dwd_sc.scene_id = dwd_ds.scene_id
    right join hurys_dc_dwd.dwd_radar_config as dwd_rc
              on dwd_rc.device_no=dwd_st.device_no
where dwd_st.create_time is not null
group by dwd_st.device_no, dwd_sc.scene_name, dwd_st.lane_no, dwd_rl.lane_direction, dwd_st.section_no, dwd_rc.device_direction, concat(substr(create_time, 1, 14), '00:00'), day
;
--查看分区
show partitions dws_statistics_volume_1hour;
--查看数据
select * from hurys_dc_dws.dws_statistics_volume_1hour
where day='2024-02-29';

六、ADS层

这里的ADS层,其实就是用Kettle把Hive的DWS层结果数据同步到ClickHouse中,也是一个Kettle任务而已

这样用海豚进行调度每一层的任务,整个离线数仓流程就跑起来了

七、海豚调度任务(除了2个采集任务外)

(一)delete_stale_data(根据删除策略删除ODS层原始数据)

#! /bin/bash
source /etc/profile

nowdate=`date --date='0 days ago' "+%Y%m%d"`
day_30_ago_date=`date -d "30 day ago " +%Y-%m-%d`

#静态排队数据
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "文件不存在"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=${day_30_ago_date}
fi

#轨迹数据
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_track/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "文件不存在"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_track/day=${day_30_ago_date}
fi

#动态排队数据
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_queue_dynamic/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "文件不存在"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_queue_dynamic/day=${day_30_ago_date}
fi

#区域数据
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_area/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "文件不存在"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_area/day=${day_30_ago_date}
fi

#事件数据
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_event/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "文件不存在"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_event/day=${day_30_ago_date}
fi

#删除表分区
hive -e "
use hurys_dc_ods;

alter table hurys_dc_ods.ods_area drop partition (day='$day_30_ago_date');
alter table hurys_dc_ods.ods_event drop partition (day='$day_30_ago_date');
alter table hurys_dc_ods.ods_queue drop partition (day='$day_30_ago_date');
alter table hurys_dc_ods.ods_queue_dynamic drop partition (day='$day_30_ago_date');
alter table hurys_dc_ods.ods_track drop partition (day='$day_30_ago_date')
"

(二)flume(Flume采集Kafka业务数据)

(三)create_database_table(自动创建Hive和ClickHouse的库表)

1、创建Hive库表

#! /bin/bash
source /etc/profile

hive -e "
source  1_dws.sql
"

2、创建ClickHouse库表

#! /bin/bash
source /etc/profile

clickhouse-client --user default --password hurys@123 -d default --multiquery <1_ads.sql

(四)hive_dws(DWS层任务)

#! /bin/bash
source /etc/profile

nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`

hive -e "
use hurys_dc_dws;

set hive.vectorized.execution.enabled=false;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.exec.max.dynamic.partitions=2000;
    
            
insert  overwrite  table  hurys_dc_dws.dws_statistics_volume_1hour  partition(day='$yesdate')
select
       dwd_st.device_no,
       dwd_sc.scene_name,
       dwd_st.lane_no,
       dwd_rl.lane_direction,
       dwd_st.section_no,
       dwd_rc.device_direction,
       sum(volume_sum) sum_volume_hour,
       concat(substr(create_time, 1, 14), '00:00') start_time
from hurys_dc_dwd.dwd_statistics as dwd_st
    right join hurys_dc_dwd.dwd_radar_lane as dwd_rl
              on dwd_rl.device_no=dwd_st.device_no and dwd_rl.lane_no=dwd_st.lane_no
    right join hurys_dc_dwd.dwd_device_scene as dwd_ds
              on dwd_ds.device_no=dwd_st.device_no
    right join hurys_dc_dwd.dwd_scene as dwd_sc
              on dwd_sc.scene_id = dwd_ds.scene_id
    right join hurys_dc_dwd.dwd_radar_config as dwd_rc
              on dwd_rc.device_no=dwd_st.device_no
where dwd_st.create_time is not null  and  day= '$yesdate'
group by dwd_st.device_no, dwd_sc.scene_name, dwd_st.lane_no, dwd_rl.lane_direction, dwd_st.section_no, dwd_rc.device_direction, concat(substr(create_time, 1, 14), '00:00')    
"

(五)hive_basic(维度表基础库)

#! /bin/bash
source /etc/profile

hive -e "
set hive.vectorized.execution.enabled=false;

use hurys_dc_basic
"

(六)dolphinscheduler_log(删除海豚日志文件)

#! /bin/bash
source /etc/profile

nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`

cd  /usr/local/hurys/dc_env/dolphinscheduler/dolphin/logs/

rm -rf dolphinscheduler-api.$yesdate*.log
rm -rf dolphinscheduler-master.$yesdate*.log
rm -rf dolphinscheduler-worker.$yesdate*.log

(七)Kettle_Hive_to_ClickHouse(Kettle采集Hive的DWS层数据同步到ClickHouse的ADS层中)

#!/bin/bash
source /etc/profile

/usr/local/hurys/dc_env/kettle/data-integration/pan.sh -rep=hurys_linux_kettle_repository -user=admin -pass=admin -dir=/hive_to_clickhouse/ -trans=17_Hive_to_ClickHouse_ads_avg_volume_15min level=Basic >>/home/log/kettle/17_Hive_to_ClickHouse_ads_avg_volume_15min_`date +%Y%m%d`.log 

(八)Kettle_MySQL_to_HDFS(Kettle采集MySQL维度表数据到HDFS中)

(九)hive_dwd(DWD层任务)

1、业务数据的JSON有多层

#! /bin/bash
source /etc/profile

nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`

hive -e "
use hurys_dc_dwd;

set hive.vectorized.execution.enabled=false;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.exec.max.dynamic.partitions=1500;

with t1 as(
select
       get_json_object(queue_json,'$.deviceNo')   device_no,
       get_json_object(queue_json,'$.createTime') create_time,
       get_json_object(queue_json,'$.laneNum')    lane_num,
       get_json_object(queue_json,'$.queueList')  queue_list
from hurys_dc_ods.ods_queue
where date(get_json_object(queue_json,'$.createTime')) = '$yesdate'
    )
insert  overwrite  table  hurys_dc_dwd.dwd_queue partition(day='$yesdate')
select
        t1.device_no,
        t1.lane_num,
        substr(create_time,1,19)                                               create_time ,
        get_json_object(list_json,'$.laneNo')                                  lane_no,
        get_json_object(list_json,'$.laneType')                                lane_type,
        get_json_object(list_json,'$.queueCount')                              queue_count,
        cast(get_json_object(list_json,'$.queueLen')   as decimal(10,2))       queue_len,
        cast(get_json_object(list_json,'$.queueHead')  as decimal(10,2))       queue_head,
        cast(get_json_object(list_json,'$.queueTail')  as decimal(10,2))       queue_tail
from t1
lateral view explode(split(regexp_replace(regexp_replace(queue_list,
                                                '\\\\[|\\\\]','') ,      --将json数组两边的中括号去掉
                                 '\\\\}\\\\,\\\\{','\\\\}\\\\;\\\\{'),   --将json数组元素之间的逗号换成分号
                   '\\\\;')   --以分号作为分隔符(split函数以分号作为分隔)
          )list_queue as list_json
where  device_no is not null  and  get_json_object(list_json,'$.queueLen') between 0 and 500 and  get_json_object(list_json,'$.queueHead')  between 0 and 500 and  get_json_object(list_json,'$.queueTail')  between 0 and 500 and  get_json_object(list_json,'$.queueCount') between 0 and 100
group by t1.device_no, t1.lane_num, substr(create_time,1,19), get_json_object(list_json,'$.laneNo'), get_json_object(list_json,'$.laneType'), get_json_object(list_json,'$.queueCount'), cast(get_json_object(list_json,'$.queueLen')   as decimal(10,2)), cast(get_json_object(list_json,'$.queueHead')  as decimal(10,2)), cast(get_json_object(list_json,'$.queueTail')  as decimal(10,2))
"

2、业务数据的JSON单层

#! /bin/bash
source /etc/profile

nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`

hive -e "
use hurys_dc_dwd;

set hive.vectorized.execution.enabled=false;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.exec.max.dynamic.partitions=1500;

with t1 as(
select
        get_json_object(turnratio_json,'$.deviceNo')        device_no,
        get_json_object(turnratio_json,'$.cycle')           cycle,
        get_json_object(turnratio_json,'$.createTime')      create_time,
        get_json_object(turnratio_json,'$.volumeSum')       volume_sum,
        cast(get_json_object(turnratio_json,'$.speedAvg')     as decimal(10,2))    speed_avg,
        get_json_object(turnratio_json,'$.volumeLeft')      volume_left,
        cast(get_json_object(turnratio_json,'$.speedLeft')    as decimal(10,2))    speed_left,
        get_json_object(turnratio_json,'$.volumeStraight')  volume_straight,
        cast(get_json_object(turnratio_json,'$.speedStraight')as decimal(10,2))    speed_straight,
        get_json_object(turnratio_json,'$.volumeRight')     volume_right,
        cast(get_json_object(turnratio_json,'$.speedRight')   as decimal(10,2))    speed_right ,
        case when  get_json_object(turnratio_json,'$.volumeTurn')  is null then 0 else get_json_object(turnratio_json,'$.volumeTurn')  end as   volume_turn ,
        case when  get_json_object(turnratio_json,'$.speedTurn')   is null then 0 else cast(get_json_object(turnratio_json,'$.speedTurn')as decimal(10,2))   end as   speed_turn
from hurys_dc_ods.ods_turnratio
where date(get_json_object(turnratio_json,'$.createTime')) = '$yesdate'
)
insert overwrite table hurys_dc_dwd.dwd_turnratio partition (day='$yesdate')
select
       t1.device_no,
       cycle,
       substr(create_time,1,19)              create_time ,
       volume_sum,
       speed_avg,
       volume_left,
       speed_left,
       volume_straight,
       speed_straight ,
       volume_right,
       speed_right ,
       volume_turn,
       speed_turn
from t1
where device_no is not null and volume_sum between 0 and 1000 and speed_avg between 0 and 150 and volume_left  between 0 and 1000 and speed_left between 0 and 100 and volume_straight between 0 and 1000 and speed_straight between 0 and 150 and volume_right between 0 and 1000 and speed_right between 0 and 100 and volume_turn between 0 and 100 and speed_turn between 0 and 100
group by t1.device_no, cycle, substr(create_time,1,19), volume_sum, speed_avg, volume_left, speed_left, volume_straight, speed_straight, volume_right, speed_right, volume_turn, speed_turn
"

3、维度数据

#! /bin/bash
source /etc/profile

hive -e "
use hurys_dc_dwd;

set hive.vectorized.execution.enabled=false;

insert overwrite table hurys_dc_dwd.dwd_holiday
select
day, holiday,year
from hurys_dc_basic.tb_holiday
group by day, holiday, year
"

(十)hive_ods(ODS层任务)

#! /bin/bash
source /etc/profile

hive -e "
use hurys_dc_ods;

msck repair table ods_queue;

msck repair table ods_turnratio;

msck repair table ods_queue_dynamic;

msck repair table ods_statistics;

msck repair table ods_area;

msck repair table ods_pass;

msck repair table ods_track;

msck repair table ods_evaluation;

msck repair table ods_event;
"

目前,整个离线数仓的流程大致就是这样,有问题的后面再完善!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1564667.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Topaz Video AI for Mac v5.0.0激活版 视频画质增强软件

Topaz Video AI for Mac是一款功能强大的视频处理软件&#xff0c;专为Mac用户设计&#xff0c;旨在通过人工智能技术为视频编辑和增强提供卓越的功能。这款软件利用先进的算法和深度学习技术&#xff0c;能够自动识别和分析视频中的各个元素&#xff0c;并进行智能修复和增强&…

llama.cpp运行qwen0.5B

编译llama.cp 参考 下载模型 05b模型下载 转化模型 创建虚拟环境 conda create --prefixD:\miniconda3\envs\llamacpp python3.10 conda activate D:\miniconda3\envs\llamacpp安装所需要的包 cd G:\Cpp\llama.cpp-master pip install -r requirements.txt python conver…

docker容器环境安装记录(MAC M1)(完善中)

0、背景 在MAC M1中搭建商城项目环境时&#xff0c;采用docker统一管理开发工具&#xff0c;期间碰到了许多环境安装问题&#xff0c;做个总结。 1、安装redis 在宿主机新建redis.conf文件运行创建容器命令&#xff0c;进行容器创建、端口映射、文件挂载、以指定配置文件启动…

《QT实用小工具·八》数据库通用翻页类

1、概述 源码放在文章末尾 该项目实现数据库通用翻页类&#xff0c;主要包含如下功能&#xff1a; 1:自动按照设定的每页多少行数据分页 2:只需要传入表名/字段集合/每页行数/翻页指示按钮/文字指示标签 3:提供公共静态方法绑定字段数据到下拉框 4:建议条件字段用数字类型的主…

37.HarmonyOS鸿蒙系统 App(ArkUI) 创建第一个应用程序hello world

HarmonyOS App(ArkUI) 创建第一个应用程序helloworld 线性布局 1.鸿蒙应用程序开发app_hap开发环境搭建 3.DevEco Studio安装鸿蒙手机app本地模拟器 打开DevEco Studio,点击文件-》新建 双击打开index.ets 复制如下代码&#xff1a; import FaultLogger from ohos.faultL…

SpringBoot+ECharts+Html 字符云/词云案例详解

1. 技术点 SpringBoot、MyBatis、thymeleaf、MySQL、ECharts 等 2. 准备条件 在mysql中创建数据库echartsdb&#xff0c;数据库中创建表t_comment表&#xff0c;表中设置两个字段word与count&#xff0c;添加表中的数据。如&#xff1a;附件中的 echartsdb.sql 3. SpringBoot…

将excel数据拆分成多个excel文件

一、背景&#xff1a; 平时在日常工作中&#xff0c;经常需要将excel的文件数据进行拆分&#xff0c;拆分成多个excel文件&#xff0c;然而用人工来处理这个既耗时&#xff0c;又费精力&#xff0c;眼睛会疲劳&#xff0c;时间长了操作上会出现失误&#xff0c;导致数据拆分错…

BetterZip2024Mac上一款功能强大的Mac平台解压压缩软件

一、软件概述 BetterZip是一款Mac平台上的压缩解压缩工具&#xff0c;它为用户提供了一个方便的方式来处理各种压缩文件&#xff0c;包括但不限于ZIP、TAR、GZIP等格式。除了基本的压缩解压缩功能外&#xff0c;BetterZip还具备文件预览、文件加密、分卷压缩等高级功能&#x…

JUC:double-checked locking(DCL) 懒汉单例模式

文章目录 double-checked locking(DCL) 问题解决方法 volatile作用 double-checked locking(DCL) 问题 第一个if用于后续进入的线程&#xff0c;不用再获取锁来判断是否已经创建了对象。第二个if&#xff0c;为的是第一个进入的线程创建对象&#xff0c;以及防止卡在第一个if之…

nodeJs 实现视频的转换(超详细教程)

前段时间拿到一个视频是4k的&#xff0c;没法播放&#xff0c;于是通过 node.js 和 ffmpeg 实现了视频的转换。在win10 系统下实现。 所需工具 node 16.19 直接安装 ffmpeg-5.1.1-essentials_build 解压后重名 ffmpeg 放到C盘 然后配置下环境变量 Git-2.42.0.2-64-bit 直接…

【HTML】注册页面制作 案例二

&#xff08;大家好&#xff0c;今天我们将通过案例实战对之前学习过的HTML标签知识进行复习巩固&#xff0c;大家和我一起来吧&#xff0c;加油&#xff01;&#x1f495;&#xff09; 案例复习 通过综合案例&#xff0c;主要复习&#xff1a; 表格标签&#xff0c;可以让内容…

linux通过进程pid查询容器docker

我遇到的问题是在docker中启动了进行&#xff0c;占用显卡&#xff0c;如下nvidis-smi查看&#xff1a; 现在要查询pid16325属于哪个容器ID&#xff0c;指令&#xff1a; ps -e -o pid,cmd,comm,cgroup | grep 16325查到如下结果&#xff0c;其中12:cpuset:/docker/ 后面的 8…

算法打卡day32|贪心算法篇06|Leetcode 738.单调递增的数字、968.监控二叉树

算法题 Leetcode 738.单调递增的数字 题目链接:738.单调递增的数字 大佬视频讲解&#xff1a;单调递增的数字视频讲解 个人思路 这个题目就是从例子中找规律&#xff0c;例如 332&#xff0c;从后往前遍历&#xff0c;32不是单调递增将2变为9,3减1&#xff0c;变成了329&…

浏览器工作原理与实践--WebAPI:XMLHttpRequest是怎么实现的

在上一篇文章中我们介绍了setTimeout是如何结合渲染进程的循环系统工作的&#xff0c;那本篇文章我们就继续介绍另外一种类型的WebAPI——XMLHttpRequest。 自从网页中引入了JavaScript&#xff0c;我们就可以操作DOM树中任意一个节点&#xff0c;例如隐藏/显示节点、改变颜色、…

HarmonyOS 应用开发之通过数据管理服务实现数据共享静默访问

场景介绍 典型跨应用访问数据的用户场景下&#xff0c;数据提供方会存在多次被拉起的情况。 为了降低数据提供方拉起次数&#xff0c;提高访问速度&#xff0c;OpenHarmony提供了一种不拉起数据提供方直接访问数据库的方式&#xff0c;即静默数据访问。 静默数据访问通过数据…

社交媒体市场:揭示Facebook的商业模式

在数字化时代&#xff0c;社交媒体已经成为人们生活中不可或缺的一部分。Facebook作为全球最大的社交媒体平台之一&#xff0c;其商业模式的运作方式对于了解社交媒体市场的发展趋势和影响力至关重要。本文将深入探讨Facebook的商业模式&#xff0c;剖析其运作机制&#xff0c;…

用户体验:探讨Facebook如何优化用户体验

在数字化时代&#xff0c;用户体验是社交媒体平台成功与否的关键因素之一。作为全球最大的社交媒体平台之一&#xff0c;Facebook一直在努力优化用户体验&#xff0c;从功能设计到内容呈现再到隐私保护&#xff0c;不断提升用户满意度。本文将深入探讨Facebook如何优化用户体验…

Codeforces Round 928 (Div. 4)F. Vlad and Avoiding X 二维转一维成为线性,然后dfs就可以线性暴力

当所有的都是Black时&#xff0c;只需要8个点就可以不出现“X”型。 ——题解 Problem - F - Codeforces 思路&#xff1a; 如标题。此题还是值得思考练习下暴力写法的。 **为什么上图有的被粉色标记了呢&#xff0c;因为白色和粉色之间互不干扰。** 所以题解把两种…

【测试篇】接口测试

接口测试&#xff0c;可以用可视化工具 postman。 如何做接口测试&#xff1f;&#xff1f; 我们可以先在浏览器中随机进入一个网页&#xff0c;打开开发者工具&#xff08;F12&#xff09;。 随便找一个接口Copy–>Copy as cURL(bash) 打开postman 复制地址 进行发送。 …

django-haystack,具有全文搜索功能的 Python 库!

目录 前言 安装与配置 全文搜索基础 搜索引擎配置 索引配置 搜索视图与模板 过滤器与排序 自定义搜索逻辑 应用场景 1. 电子商务网站的商品搜索 2. 新闻网站的文章搜索 3. 社交网站的用户搜索 4.企业内部系统的文档搜索 总结 前言 大家好&#xff0c;今天为大家分享…