一、目的
实时数仓用的是ClickHouse,为了避免Hive还要清洗数据,因此就直接把ClickHouse中清洗数据同步到Hive中就行
二、所需工具
ClickHouse:clickhouse-client-21.9.5.16
Kettle:kettle9.2
Hadoop:hadoop-3.1.3
Hive:hive-3.1.2
海豚调度器:dolphinscheduler-2.0.5
三、技术路径
由于Hive中DWD层是静态分区表,而无法在Kettle中动态指定分区日期
因此只能每日执行kettle任务,从ClickHouse同步到HDFS中,然后到Hive清洗表的每日分区下
四、表结构
4.1 clickhouse
create table if not exists hurys_jw.dwd_statistics( id String comment '唯一ID', device_no String comment '设备编号', source_device_type Nullable(String) comment '设备类型', sn Nullable(String) comment '设备序列号 ', model Nullable(String) comment '设备型号', create_time DateTime comment '创建时间', cycle Nullable(Int32) comment '统计数据周期' , lane_no Nullable(Int32) comment '车道编号', lane_type Nullable(Int32) comment '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道', section_no Nullable(Int32) comment '断面编号', coil_no Nullable(Int32) comment '线圈编号', volume_sum Nullable(Int32) comment '不区分车型机动车总流量', volume_person Nullable(Int32) comment '行人流量', volume_car_non Nullable(Int32) comment '非机动车流量', volume_car_small Nullable(Int32) comment '小车流量', volume_car_middle Nullable(Int32) comment '中车流量', volume_car_big Nullable(Int32) comment '大车流量', speed_avg Nullable(Decimal(10, 2)) comment '平均速度(km/h)', speed_85 Nullable(Decimal(10, 2)) comment '85位速度(km/h)', time_occupancy Nullable(Decimal(10, 2)) comment '时间占有率(%)', average_headway Nullable(Decimal(10, 2)) comment '平均车头时距(s)', average_gap Nullable(Decimal(10, 2)) comment '平均车间时距(s)', day Date comment '日期' ) ENGINE = MergeTree PARTITION BY day PRIMARY KEY (day,id) ORDER BY (day,id) TTL day + toIntervalDay(7) SETTINGS index_granularity = 8192;
4.2 hive
create external table if not exists hurys_db.dwd_statistics( id string comment '唯一ID', device_no string comment '设备编号', source_device_type string comment '设备类型', sn string comment '设备序列号 ', model string comment '设备型号', create_time string comment '创建时间', cycle int comment '统计数据周期' , lane_no int comment '车道编号', lane_type int comment '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道', section_no int comment '断面编号', coil_no int comment '线圈编号', volume_sum int comment '不区分车型机动车总流量', volume_person int comment '行人流量', volume_car_non int comment '非机动车流量', volume_car_small int comment '小车流量', volume_car_middle int comment '中车流量', volume_car_big int comment '大车流量', speed_avg decimal(10,2) comment '平均速度(km/h)', speed_85 decimal(10,2) comment '85位速度(km/h)', time_occupancy decimal(10,2) comment '时间占有率(%)', average_headway decimal(10,2) comment '平均车头时距(s)', average_gap decimal(10,2) comment '平均车间时距(s)' ) comment '统计数据外部表——静态分区' partitioned by (day string) row format delimited fields terminated by ',' tblproperties("skip.header.line.count"="1") ;
五、实施步骤
5.1 Kettle任务(clickhouse到hdfs)
5.1.1 获取系统信息
5.1.2 字段选择1
5.1.3 自动获取当前日期1
//Script here
var currentDate = date; // 这里 date 应该是从输入流中获取的 Date 对象
// 计算前一天的日期
var previousDate = new Date(currentDate.getTime() - 24*60*60*1000);
5.1.4 字段选择2
5.1.5 clickhouse输入
注意:day字段类型转换
5.1.6 字段选择3
5.1.7 Hadoop file output
5.1.8 运行kettle任务
5.1.9 HDFS文件
5.2 海豚任务(从HDFS到Hive表分区中)
5.2.1 配置海豚任务
#! /bin/bash
source /etc/profile
nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`
hadoop fs -test -e /user/hive/warehouse/hurys_db.db/dwd_statistics/day=$yesdate
if [ $? -ne 0 ]; then
echo "文件不存在"
else
hdfs dfs -rm -r /user/hive/warehouse/hurys_db.db/dwd_statistics/day=$yesdate
fi
/usr/local/hurys/dc_env/kettle/data-integration/pan.sh -rep=hurys_linux_kettle_repository -user=admin -pass=admin -dir=/clickhouse_to_hive/ -trans=01_ClickHouse_to_Hive_dwd_statistics
hdfs dfs -mkdir -p /user/hive/warehouse/hurys_db.db/dwd_statistics/day=$yesdate
hdfs dfs -mv /user/hive/warehouse/hurys_db.db/dwd_statistics/statistics.gz /user/hive/warehouse/hurys_db.db/dwd_statistics/day=$yesdate/statistics.gz
5.2.2 执行海豚任务
5.2.3 Hive分区表
5.2.4 刷新表分区,查看分区数据
--刷新表分区 msck repair table hurys_db.dwd_statistics; --查看表分区 show partitions hurys_db.dwd_statistics; --查看表数据 select * from hurys_db.dwd_statistics where day = '2024-10-16';
搞定!