基于“Doris”的type2拉链表的Mysql实现
需求说明
基于Doris实现Type2缓慢变化维(拉链表):监控上游系统里的面积字段,一旦发现变化,就在维度表里关闭旧记录并新增一条记录进行跟踪。
解决方案
type2相关概念见如下链接:
SCD缓慢变化维拉链表
这里特别需要注意的是:
1、Doris不支持多表关联的update和delete,而type2实现时又需要保证旧版本记录的endtime与新版本记录的starttime一致(即前后两条记录首尾衔接),因此直接在Doris里实现不现实。
2、因此该方法的type2处理实际是在MySQL里完成的:先把Doris的数据同步到MySQL,执行存储过程生成type2结果后,再同步回Doris。
实施步骤
编写Type2存储过程
#Step1 在Mysql里编写Type2的存储过程
CREATE PROCEDURE `type2pro`()
BEGIN
    -- Maintains dim_station as a Type 2 (history-keeping) dimension from the
    -- snapshot in g_station:
    --   1. stage the current dim rows whose areas no longer match g_station,
    --   2. close those rows (endtime = now, iscurrent = 0),
    --   3. insert a new current row for each staged station still in g_station,
    --   4. insert a current row for every station not yet present in dim_station.
    -- Takes no parameters and returns no result set.

    -- One timestamp is reused everywhere so that the endtime of a closed row is
    -- exactly the starttime of the row that replaces it.
    DECLARE currtime datetime DEFAULT now();

    -- On any SQL error undo the partial close/insert work and re-raise the
    -- error, so dim_station is never left with closed rows that have no successor.
    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        ROLLBACK;
        RESIGNAL;
    END;

    -- DDL commits implicitly in MySQL, so the staging table is rebuilt before
    -- the transaction starts.
    DROP TABLE IF EXISTS tmp_stationdate;
    CREATE TABLE `tmp_stationdate`
    (
        `src` varchar(30),
        `stationid` varchar(100) COLLATE utf8_bin DEFAULT NULL,
        `stationname` varchar(100) COLLATE utf8_bin DEFAULT NULL,
        `heatingarea` decimal(12,2) DEFAULT NULL,
        `normalheatingarea` decimal(12,2) DEFAULT NULL,
        `extraheatingarea` decimal(12,2) DEFAULT NULL,
        `constructionarea` decimal(12,2) DEFAULT NULL,
        `chargearea` decimal(12,2) DEFAULT NULL,
        `ispush` varchar(10) COLLATE utf8_bin DEFAULT NULL,
        `iscalculate` varchar(10) COLLATE utf8_bin DEFAULT NULL,
        `participationcalculation` varchar(10) COLLATE utf8_bin DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

    START TRANSACTION;

    -- Stage the current dimension rows that have no g_station row with the same
    -- id and the same areas. The comparison uses the NULL-safe operator <=> so
    -- that an area that is NULL on both sides counts as "unchanged"; with plain
    -- "=" such a station would be re-versioned on every run.
    -- NOTE(review): only heatingarea and normalheatingarea are compared; changes
    -- to the other area columns alone do not create a new version - confirm.
    INSERT INTO tmp_stationdate (
        src,
        stationid,
        stationname,
        heatingarea,
        normalheatingarea,
        extraheatingarea,
        constructionarea,
        chargearea,
        ispush,
        iscalculate,
        participationcalculation
    )
    SELECT
        'hefei',
        C.stationid,
        C.stationname,
        C.heatingarea,
        C.normalheatingarea,
        C.extraheatingarea,
        C.constructionarea,
        C.chargearea,
        C.ispush,
        C.iscalculate,
        C.participationcalculation
    FROM dim_station C
    LEFT JOIN g_station B
        ON B.uniqueid = C.stationid
        AND B.heatingarea <=> C.heatingarea
        AND B.normalheatingarea <=> C.normalheatingarea
    WHERE B.uniqueid IS NULL   -- no matching source row
        AND C.iscurrent = 1;

    -- Close the same set of rows: the previously current version ends now.
    UPDATE dim_station C
    LEFT JOIN g_station B
        ON B.uniqueid = C.stationid
        AND B.heatingarea <=> C.heatingarea
        AND B.normalheatingarea <=> C.normalheatingarea
    SET C.endtime = currtime,
        C.iscurrent = 0
    WHERE B.uniqueid IS NULL
        AND C.iscurrent = 1;

    -- Insert the new current version for each staged station that still exists
    -- in g_station (stations that vanished from g_station are only closed).
    -- NOTE(review): stationname is carried over from the closed row, not taken
    -- from g_station - confirm this is intended.
    INSERT INTO dim_station (
        stationid,
        stationname,
        starttime,
        endtime,
        iscurrent,
        heatingarea,
        normalheatingarea,
        extraheatingarea,
        constructionarea,
        chargearea,
        ispush,
        iscalculate,
        participationcalculation
    )
    SELECT
        E.uniqueid AS stationid,
        D.stationname,
        currtime AS starttime,
        STR_TO_DATE('9999-12-31 23:59:59','%Y-%m-%d %H:%i:%s') AS endtime,
        1 AS iscurrent,
        E.heatingarea,
        E.normalheatingarea,
        E.extraheatingarea,
        E.constructionarea,
        E.chargearea,
        E.ispush,
        E.iscalculate,
        E.participationcalculation
    FROM tmp_stationdate D
    JOIN g_station E
        ON D.stationid = E.uniqueid;

    -- Insert stations that have never appeared in dim_station as current rows.
    INSERT INTO dim_station (
        stationid,
        stationname,
        starttime,
        endtime,
        iscurrent,
        heatingarea,
        normalheatingarea,
        extraheatingarea,
        constructionarea,
        chargearea,
        ispush,
        iscalculate,
        participationcalculation
    )
    SELECT
        A.uniqueid AS stationid,
        A.name AS stationname,
        currtime AS starttime,
        STR_TO_DATE('9999-12-31 23:59:59','%Y-%m-%d %H:%i:%s') AS endtime,
        1 AS iscurrent,
        A.heatingarea,
        A.normalheatingarea,
        A.extraheatingarea,
        A.constructionarea,
        A.chargearea,
        A.ispush,
        A.iscalculate,
        A.participationcalculation
    FROM g_station A
    LEFT JOIN dim_station B
        ON A.uniqueid = B.stationid
    WHERE B.stationid IS NULL;

    COMMIT;
END
编写调用Shell脚本
#Step2 定义Type2调用及数据同步的Shell
#!/bin/bash
# Round-trips dim_station through MySQL to build the Type 2 history:
# dump from Doris -> load into MySQL -> call type2pro() -> export -> reload Doris.
#
# Abort on the first failing command, unset variable or failed pipeline stage,
# so a failed dump or procedure call can never be followed by the TRUNCATE and
# reload steps further down.
set -euo pipefail

# Doris FE connection (MySQL protocol port) and credentials.
port="9030"
username="root"
hostname="192.168.2.50"
doris_passwd="888888"
doris_db="dorisdb"

# Local MySQL credentials and database.
passwd="Mysql#2023"
dbname="businessdb"

dump_dir="/root/workspace"
export_file="/var/lib/mysql-files/dim_station.txt"

# Timestamp used to give every load a distinct label.
labelval=$(date "+%Y%m%d-%H%M%S")

# 1 Dump the Doris tables (data only) to local files.
# NOTE(review): the stored procedure documented with this script reads
# g_station, while the table refreshed here is named station - confirm.
mysqldump -h"$hostname" -P"$port" -u"$username" -p"$doris_passwd" --no-create-info --no-tablespaces --databases "$doris_db" --tables station > "$dump_dir/station.sql"
mysqldump -h"$hostname" -P"$port" -u"$username" -p"$doris_passwd" --no-create-info --no-tablespaces --databases "$doris_db" --tables dim_station > "$dump_dir/dim_station.sql"

# 2 Empty the corresponding MySQL tables.
mysql -u"$username" -p"$passwd" -D"$dbname" -e 'TRUNCATE TABLE station;'
mysql -u"$username" -p"$passwd" -D"$dbname" -e 'TRUNCATE TABLE dim_station;'

# 3 Load the dumped data into MySQL.
mysql -u"$username" -p"$passwd" -D"$dbname" < "$dump_dir/station.sql"
mysql -u"$username" -p"$passwd" -D"$dbname" < "$dump_dir/dim_station.sql"

# 4 Run the stored procedure that produces the Type 2 rows.
mysql -u"$username" -p"$passwd" -D"$dbname" -e 'call type2pro()'

# 5 Remove the previous export; INTO OUTFILE refuses to overwrite an existing file.
rm -f "$export_file"

# 6 Export dim_station from MySQL; the default field separator is a tab ("\t").
# Uses the same MySQL password as every other step (the original line carried a
# different hard-coded literal here).
mysql -u"$username" -p"$passwd" -D"$dbname" -e "SELECT * FROM dim_station INTO OUTFILE '$export_file'"

# 7 Empty the Doris table before reloading it.
# NOTE(review): the original remark on this step said the table is no longer
# truncated after a schema change to dim_station, yet the TRUNCATE is still
# executed - confirm which one is intended.
mysql -h"$hostname" -P"$port" -u"$username" -p"$doris_passwd" -D"$doris_db" -e "TRUNCATE TABLE dim_station;"

# 8 Load the export into Doris; %09 is the URL-encoded tab character.
# The URL must be quoted: an unquoted "&" makes the shell run curl in the
# background and drops the column_separator parameter from the request.
# NOTE(review): the URL targets database "zaozhuang" while steps 1 and 7 use
# "dorisdb" - confirm which database is intended.
curl --location-trusted -u "$username:$doris_passwd" -T "$export_file" "http://$hostname:8030/api/zaozhuang/dim_station/_load?label=dim_station_$labelval&column_separator=%09"
编辑Crontab调度
#Step3 Crontab里定义调度
crontab -e新增调度,这里是每隔2个小时执行一次。
# Run /root/type2 at minute 0 of every second hour; stdout and stderr both go to /root/type2.log.
0 */2 * * * /root/type2 >/root/type2.log 2>&1