[数据质量]手动实现 阿里云DataWorks 的数据质量监控告警功能

news2025/4/26 4:51:05

目录

      • 手动实现 DataWorks 的数据质量监控告警功能
        • 1. 简介:
        • 2. 数据表准备
          • 2.1 tmp_monitor_tbl_info (数据监控信息表)
          • 2.2 tmp_monitor_rule_info (数据质量监控规则表)
          • 2.3 tmp_monitor_tbl_info_log_di (数据监控信息记录表)
        • 3. 程序开发
          • 3.1 数据检查程序
          • 3.2 告警信息推送程序
          • 3.3 告警样例
        • 4. 规则代码
          • 4.1 表行数,上周期差值
          • 4.2 表行数, 固定值
          • 4.3 表行数, n天波动率
        • 5. 结语
        • end

手动实现 DataWorks 的数据质量监控告警功能

1. 简介:

使用Python 实现对数据库表的监控告警功能, 并将告警信息通过钉钉机器人发送到钉钉群
实现DataWorks中数据质量的基本功能, 当然 DW的数据质量的规则类型很多, 用起来比较方便, 这里目前简单实现了其中三个规则类型的功能, 仅供参考, 欢迎交流;
初次使用Python, 请多指教
使用工具: MaxCompute

2. 数据表准备
2.1 tmp_monitor_tbl_info (数据监控信息表)

创建 数据监控信息表 (事务表)

CREATE TABLE IF NOT EXISTS tmp_monitor_tbl_info (
      `id`					STRING      COMMENT '表编号id'
	, `tbl_name`			STRING      COMMENT '表名'
	, `pt_format`			STRING      COMMENT '分区格式: yyyy-MM-dd,yyyyMMdd 等'
	, `val_type`			STRING      COMMENT '值类型: 表行数,周期值等'
    , `monitor_flag` 		int         COMMENT '监控标识: 0:不监控, 1:监控;'
    , `rule_code` 			int         COMMENT '规则编码: 1:表行数,上周期差值, 2:表行数,固定值 等'
    , `rule_type`			STRING      COMMENT '规则类型: 表行数,上周期差值; 表行数,固定值; 与固定值比较 等'
    , `expect_val` 			int         COMMENT '期望值'
    , `tbl_sort_code`       int         COMMENT '表类型编码: 0:其它(维表类), 1:亚马逊, 2:中小平台, 3:市场数据 等'
    , `tbl_sort_name`       STRING      COMMENT '表类型名字: 0:其它(维表类), 1:亚马逊, 2:中小平台, 3:市场数据 等'
    , `pt_num`				INT         COMMENT '分区日期差值'
    , `monitor_freq`		STRING      COMMENT '监控频率: h:小时监控; d:天监控;'
    , `monitor_hour`		STRING      COMMENT '监控时间: 默认值:every hour'
    , `rule_parameters`		STRING      COMMENT '规则参数: 不同规则的补充参数(默认:无)'
    , `tbl_id`			    STRING      COMMENT '表ID: 一个表对应一个ID'
) COMMENT '数据监控表信息' 
tblproperties ("transactional"="true", 
               "write.bucket.num" = "10", 
               "acid.data.retain.hours"="120") 
;

更新数据DML

-- 插入几条数据
INSERT INTO TABLE tmp_monitor_tbl_info (id, tbl_name, pt_format, val_type, monitor_flag, rule_code, rule_type, expect_val
                                                            , tbl_sort_code, tbl_sort_name, pt_num, monitor_freq, monitor_hour, rule_parameters, tbl_id) 
VALUES   
  ('7583f809d87a3743d1bbc151372edc74',   'ods_amazon_amz_order_df',              'yyyyMMdd', '表行数',  1,  1,  '表行数,上周期差值',   0,   1, '亚马逊-退款分析',  -1, 'd', '03', '无', 149)
, ('be970b38a04b6ab054f5c1f9ccb91b07',   'ods_dawnwin_erp_op_order_df',          'yyyyMMdd', '表行数',  1,  1,  '表行数,上周期差值',   0,   1, '亚马逊-退款分析',  -1, 'd', '03', '无', 150)
, ('1f18a778e6631176bab2a7c4d3415915',   'dws_amazon_market_refund_analysis_di', 'yyyyMMdd', '表行数',  1,  2,  '表行数,固定值',     200,   1, '亚马逊-退款分析',  -1, 'd', '08', '无', 151)
;

-- id 生成 MD5(CONCAT(tbl_name, '_', rule_code, '_', rule_parameters))
SELECT MD5(CONCAT('dwd_gmv_scm_book_balance_di', '_', 3, '_', '7')) AS id ;

-- 更新一条数据
UPDATE tmp_monitor_tbl_info SET tbl_id = 29 WHERE tbl_id = 33 ;
-- 删除一条数据
DELETE FROM tmp_monitor_tbl_info  WHERE tbl_id = 30 ;

-- 查看数据
SELECT * FROM tmp_monitor_tbl_info ;

-- 添加列 添加字段
ALTER TABLE tmp_monitor_tbl_info ADD COLUMNS pt_num INT COMMENT '分区日期差值' ;
-- 更新数据, 给新增字段设置值
UPDATE tmp_monitor_tbl_info SET pt_num = -1 ;
-- 更新数据, 修改其中一条数据一个字段值
UPDATE tmp_monitor_tbl_info SET pt_num = -2 WHERE tbl_id = 38 ;

-- 更新数据
UPDATE tmp_monitor_tbl_info SET expect_val = 580 WHERE tbl_id in (26, 27) ;

-- 添加列 添加字段
ALTER TABLE tmp_monitor_tbl_info ADD COLUMNS tbl_id INT COMMENT '表ID: 一个表对应一个ID' ;

ALTER TABLE tmp_monitor_tbl_info_log_di ADD COLUMNS actual_val DECIMAL(20,6) COMMENT '真实值: 与期望值对应的真实值' ;

-- 更新数据, 给新增字段设置值
UPDATE tmp_monitor_tbl_info SET rule_parameters = '无' ;
UPDATE tmp_monitor_tbl_info SET tbl_id = CAST(id AS INT) ;
UPDATE tmp_monitor_tbl_info SET id = MD5(CONCAT(tbl_name, '_', rule_code, '_', rule_parameters)) ;

-- 更新数据, 修改其中一条数据一个字段值
UPDATE tmp_monitor_tbl_info SET monitor_freq = 'd' WHERE tbl_id = 3 ;
UPDATE tmp_monitor_tbl_info SET monitor_freq = 'd' WHERE tbl_id = 4 ;

-- 删除一条数据
DELETE FROM tmp_monitor_tbl_info WHERE tbl_id = 72 AND rule_parameters IN ('1', '7') ;

-- 插入几条数据
INSERT INTO TABLE tmp_monitor_tbl_info (id, tbl_name, pt_format, val_type, monitor_flag, rule_code, rule_type, expect_val, tbl_sort_code, tbl_sort_name, pt_num, monitor_freq, monitor_hour, rule_parameters, tbl_id) 
VALUES   
  ('5dc8f336c5c3b4c222c3215910e12400',	'dwd_gmv_scm_book_balance_di', 'yyyyMMdd',	'表行数',	1,	3,  '表行数,n天波动率',     5,		    5,	'gmv',	0, 'd', '15,17,19', '1', 72)
, ('2394d8dc9954381bfe3f3d29a02bd263',	'dwd_gmv_scm_book_balance_di', 'yyyyMMdd',	'表行数',	1,	3,  '表行数,n天波动率',     10,		5,	'gmv',	0, 'd', '15,17,19', '7', 72)
;

-- 查看 监控表信息
SELECT * FROM tmp_monitor_tbl_info
ORDER BY tbl_id DESC
;
2.2 tmp_monitor_rule_info (数据质量监控规则表)

创建表DDL

-- 规则表
CREATE TABLE IF NOT EXISTS tmp_monitor_rule_info(
      `rule_id`                       INT             COMMENT '规则id'
    , `rule_name`                     STRING          COMMENT '规则名字' -- 表行数,上周期差值
    , `rule_code`		              STRING          COMMENT '规则代码'
    , `rule_sql`                      STRING          COMMENT '规则sql'
)
COMMENT '数据质量监控规则表'
tblproperties ("transactional"="true", 
               "write.bucket.num" = "10", 
               "acid.data.retain.hours"="120") 
--lifecycle 365
;

插入数据/更新数据 DML

INSERT INTO TABLE tmp_monitor_rule_info (rule_id, rule_name, rule_code, rule_sql) 
VALUES   
    (1, '表行数,上周期差值',   'upper_period_diff', '')
  , (2, '表行数,固定值',       'line_fixed_val', '')
  , (3, '表行数,n天波动率',    'cycle_volatility', '')
;

UPDATE tmp_monitor_rule_info 
SET rule_sql = "set odps.sql.hive.compatible=true ; INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}') SELECT a.id , a.tbl_name , a.stat_time , a.pt_format , a.stat_pt , a.val_type , a.val , a.rule_code , a.rule_type , a.expect_val , IF (a.val = 0, 1, (IF ((a.val - NVL(b.val,0)) >= {expect_val}, 0, 1 ))) AS is_exc , a.tbl_sort_code , a.tbl_sort_name , a.val - NVL(b.val,0) AS actual_val FROM ( SELECT concat( md5(concat('{tbl_name}', '_', date_format('{date_str}' ,'{pt_format}')) ), '_', {rule_code}, '_', HOUR('{now_time}') ) AS id , '{tbl_name}' AS tbl_name , '{now_time}' AS stat_time , '{pt_format}' AS pt_format , date_format('{date_str}' ,'{pt_format}') AS stat_pt , '{val_type}' AS val_type , COUNT(1) AS val , '{rule_code}' AS rule_code , '{rule_type}' AS rule_type , {expect_val} AS expect_val , {tbl_sort_code} AS tbl_sort_code , '{tbl_sort_name}' AS tbl_sort_name FROM {tbl_name} WHERE pt = date_format('{date_str}' ,'{pt_format}') ) a LEFT JOIN ( SELECT tbl_name, val FROM ( SELECT tbl_name, val , ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC ) AS rn FROM {res_tbl_name} WHERE pt = DATE_ADD('{pt}', -1) AND tbl_name = '{tbl_name}' AND rule_code = 1 ) WHERE rn = 1 ) b ON a.tbl_name = b.tbl_name ;"
WHERE rule_id = 1 ;

UPDATE tmp_monitor_rule_info 
SET rule_sql = "set odps.sql.hive.compatible=true ; INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}') SELECT concat( md5(concat('{tbl_name}', '_', date_format('{date_str}' ,'{pt_format}')) ), '_', {rule_code}, '_', HOUR('{now_time}') ) AS id , '{tbl_name}' AS tbl_name , '{now_time}' AS stat_time , '{pt_format}' AS pt_format , date_format('{date_str}' ,'{pt_format}') AS stat_pt , '{val_type}' AS val_type , ct AS val , '{rule_code}' AS rule_code , '{rule_type}' AS rule_type , {expect_val} AS expect_val , IF (ct >= {expect_val}, 0, 1 ) AS is_exc , {tbl_sort_code} AS tbl_sort_code , '{tbl_sort_name}' AS tbl_sort_name , ct AS actual_val FROM ( SELECT COUNT(1) AS ct FROM {tbl_name} WHERE pt = date_format('{date_str}' ,'{pt_format}') ) a ;"
WHERE rule_id = 2 ;

UPDATE tmp_monitor_rule_info 
SET rule_sql = "set odps.sql.hive.compatible=true ; INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}') SELECT id , tbl_name , stat_time , pt_format , stat_pt , val_type , val_now AS val , rule_code , rule_type , expect_val , IF(val_last > 0, IF(ABS(val_now - val_last) / val_last * 100 > {expect_val}, 1, 0), IF(val_last = 0, 1, 0)) AS is_exc , tbl_sort_code , tbl_sort_name , IF(val_last > 0, ABS(val_now - val_last) / val_last * 100, -1) AS actual_val FROM ( SELECT concat( md5(concat('{tbl_name}', '_', date_format('{date_str}' ,'{pt_format}')) ), '_', {rule_code}, '_', '{rule_parameters}', '_', HOUR('{now_time}') ) AS id , '{tbl_name}' AS tbl_name , '{now_time}' AS stat_time , '{pt_format}' AS pt_format , date_format('{date_str}' ,'{pt_format}') AS stat_pt , '{val_type}' AS val_type , COUNT(1) AS val_now , '{rule_code}' AS rule_code , REPLACE('{rule_type}', 'n', '{rule_parameters}') AS rule_type , {expect_val} AS expect_val , {tbl_sort_code} AS tbl_sort_code , '{tbl_sort_name}' AS tbl_sort_name , ( SELECT val FROM ( SELECT tbl_name, val , ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC ) AS rn FROM {res_tbl_name} WHERE pt = DATE_ADD('{pt}', - CAST('{rule_parameters}' AS INT)) AND tbl_name = '{tbl_name}' AND rule_code = 3 ) WHERE rn = 1 ) AS val_last FROM {tbl_name} WHERE pt = date_format('{date_str}' ,'{pt_format}') ) a ;"
WHERE rule_id = 3 ;

-- 查看数据
SELECT * FROM tmp_monitor_rule_info ;
2.3 tmp_monitor_tbl_info_log_di (数据监控信息记录表)

创建表

CREATE TABLE IF NOT EXISTS puture_bigdata_dev.tmp_monitor_tbl_info_log_di (
	  `id`					STRING          COMMENT '监控id编码:md5(表名_分区)_小时'
	, `tbl_name`			STRING          COMMENT '表名'
	, `stat_time`			STRING          COMMENT '统计时间'
	, `pt_format`			STRING          COMMENT '分区格式: yyyy-MM-dd,yyyyMMdd 等'
	, `stat_pt`				STRING          COMMENT '统计分区'
	, `val_type`			STRING          COMMENT '值类型: 表行数,周期值等'
    , `val` 				int             COMMENT '统计值'
    , `rule_code` 			int             COMMENT '规则编码: 1:表行数,上周期差值, 2:表行数,固定值 等'
    , `rule_type`			STRING          COMMENT '规则类型: 表行数,上周期差值; 表行数,固定值; 与固定值比较 等'
    , `expect_val` 			int             COMMENT '期望值'
    , `is_exc` 				int             COMMENT '是否异常: 0:否,1:是,默认值0'
    , `tbl_sort_code`       int             COMMENT '表类型编码: 0:其它(维表类), 1:亚马逊, 2:中小平台, 3:市场数据 等'
    , `tbl_sort_name`       STRING          COMMENT '表类型名字: 0:其它(维表类), 1:亚马逊, 2:中小平台, 3:市场数据 等'
    , `actual_val`          DECIMAL(20,6)   COMMENT '真实值: 与期望值对应的真实值' 
) COMMENT '数据监控信息记录表'
PARTITIONED BY (pt STRING COMMENT '数据日期, yyyy-MM-dd') 
;
3. 程序开发
3.1 数据检查程序

程序代码

'''PyODPS 3
请确保不要使用从 MaxCompute下载数据来处理。下载数据操作常包括Table/Instance的open_reader以及 DataFrame的to_pandas方法。 
推荐使用 PyODPS DataFrame(从 MaxCompute 表创建)和MaxCompute SQL来处理数据。
更详细的内容可以参考:https://help.aliyun.com/document_detail/90481.html
'''
import os
from odps import ODPS, DataFrame
from datetime import datetime, timedelta
from dateutil import parser
options.tunnel.use_instance_tunnel = True

# 获取当前时间
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print("now_time:{}".format(now_time) )
pt = args['date']
hh = args['hour']
print("pt:{}".format(pt) )
print("hh:{}".format(hh) )
date = datetime.strptime(pt, "%Y-%m-%d") 

# 结果表
res_tbl_name = "tmp_monitor_tbl_info_log_di"  

# 监控表列表
# 小时任务里有天级别的监控表,做过滤,减少重复查询
sql_tbl_info = f"""
SELECT * FROM tmp_monitor_tbl_info    
WHERE monitor_flag = 1 AND tbl_sort_code = 5
    AND (monitor_freq = 'h' OR FIND_IN_SET('{hh}', monitor_hour) > 0)	
"""

# 查出规则sql
def rule():
    rule = f"""
    SELECT rule_sql FROM tmp_monitor_rule_info    
    WHERE rule_id = {rule_code} ;
    """
    return rule

# 具体规则(规则信息匹配之后, 执行规则sql)
def sql_rule():
    sql = eval(f'f"{rule_sql}"') 
    return sql

# 执行监控统计代码
def ex_monitor(sql: str):
    try :
        #print (sql)
        o.execute_sql(sql, hints={'odps.sql.hive.compatible': True , "odps.sql.submit.mode":"script"})
        print("{}: 运行成功".format(tbl_name) )

    except Exception as e:
        print('{}: 运行异常 ======> '.format(tbl_name) + str(e))

# 执行sql, 打印结果

if __name__ == '__main__':
    try :
        with o.execute_sql(sql_tbl_info, hints={'odps.sql.hive.compatible': True}).open_reader() as reader:

            for row_record in reader:
                #print(row_record) # 打印一条数据值
                tbl_name = row_record.tbl_name
                pt_format = row_record.pt_format
                val_type = row_record.val_type
                monitor_flag = row_record.monitor_flag
                rule_code = row_record.rule_code
                rule_type = row_record.rule_type
                expect_val = row_record.expect_val
                tbl_sort_code = row_record.tbl_sort_code
                tbl_sort_name = row_record.tbl_sort_name
                pt_num = row_record.pt_num
                rule_parameters = row_record.rule_parameters
                date_str = (date + timedelta(days=pt_num)).strftime('%Y-%m-%d')
                # 根据 rule_code 查找对应的规则代码
                with o.execute_sql(rule(), hints={'odps.sql.hive.compatible': True}).open_reader(tunnel=True, limit=False) as reader:
                    for record in reader:
                        rule_sql = record.rule_sql
                        ex_monitor(sql_rule())
                           
    except Exception as e:
        print('异常 ======> ' + str(e))
3.2 告警信息推送程序

程序代码

'''PyODPS 3
请确保不要使用从 MaxCompute下载数据来处理。下载数据操作常包括Table/Instance的open_reader以及 DataFrame的to_pandas方法。 
推荐使用 PyODPS DataFrame(从 MaxCompute 表创建)和MaxCompute SQL来处理数据。
更详细的内容可以参考:https://help.aliyun.com/document_detail/90481.html
'''

import json
import requests 
from datetime import datetime
import os
from odps import ODPS, DataFrame

date_str = args['date']

# 接口地址和token信息
url = 'https://oapi.dingtalk.com/robot/send?access_token=*****' # 修改为自己的token

now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(now_time)

# 结果表
tbl_info = "tmp_monitor_tbl_info" 
res_tbl_name = "tmp_monitor_tbl_info_log_di" 

# 查出最新数据中, 存在异常的数据表信息, 根据不同业务流程配置 tbl_sort_code
sql_query = f"""
SELECT tbl_name, stat_time, stat_pt, val_type, val, rule_type, expect_val, is_exc, actual_val, rule_code
FROM (
    SELECT tbl_name, stat_time, stat_pt, val_type, val, rule_type, expect_val, is_exc, actual_val, rule_code
        , ROW_NUMBER() OVER(PARTITION BY tbl_name, rule_code, rule_type ORDER BY stat_time DESC) AS rn 
    FROM {res_tbl_name} 
    WHERE pt = '{date_str}' 
         AND tbl_sort_code IN (10, 13) -- 表种类
         AND tbl_name IN (SELECT tbl_name FROM {tbl_info} WHERE monitor_flag = 1 AND tbl_sort_code IN (10, 13) ) -- 表种类
) a
WHERE rn = 1 AND is_exc = 1 
"""

# 钉钉机器人,发送消息
def dd_robot(url:str, content: str):
  HEADERS = {"Content-Type": "application/json;charset=utf-8"}
  #content里面要设置关键字
  data_info = {
    "msgtype": "text",
    "text": {
    "content": content
    },
    "isAtAll": False
    #这是配置需要@的人
     # ,"at": {"atMobiles": ["15xxxxxx06",'18xxxxxx1']}
  }
  value = json.dumps(data_info)
  response = requests.post(url,data=value,headers=HEADERS)
  if response.json()['errmsg']!='ok':
    print(response.text)

# 主函数
if __name__ == '__main__': # py3可以省略
    try :
        with o.execute_sql(sql_query, hints={'odps.sql.hive.compatible': True}).open_reader() as reader:
            result_rows = list(reader) # 读取所有的结果行
            result_count = len(result_rows) # 获取结果条数
            #print("结果条数:", result_count) # 打印结果条数

            if result_count > 0 :
                for row in result_rows:
                    tbl_name = row.tbl_name
                    stat_time = row.stat_time
                    stat_pt = row.stat_pt
                    val_type = row.val_type
                    val = row.val
                    rule_type = row.rule_type
                    expect_val = row.expect_val
                    actual_val = row.actual_val
                    rule_code = row.rule_code
                    #print (tbl_name)
                    content = "数据质量(DQC)校验告警 \n  "
                    content = content + "【对象名称】:" + tbl_name + " \n  "
                    content = content + "【实际分区】:pt=" + stat_pt + " \n  "
                    content = content + "【触发规则】: " + rule_type + " \n  "
                    # 根据不同的规则, 输出不同的告警信息样式
                    if rule_code == '2' :
                        content = content + "【规则明细】: 当前样本值: " + val + " | 阈值: " + expect_val + " \n  "
                    else :
                        content = content + "【规则明细】: 当前样本值: " + val + " | 阈值: " + expect_val + " | 实际值: " + actual_val + " \n  "
                    
                    content = content + now_time 
                    dd_robot(url, content)
            else :
                print("无异常情况;")
    except Exception as e:
        print('异常 ========>' + str(e) )
3.3 告警样例
数据质量(DQC)校验告警 
  【对象名称】:dwd_gmv_scm_book_balance_di 
  【实际分区】:pt=20240624 
  【触发规则】: 表行数,1天波动率 
  【规则明细】: 当前样本值: 1244 | 阈值: 5 | 实际值: 14.548803 
  2024-06-24 16:48:56

在这里插入图片描述

4. 规则代码
  • 写入规则表里的 rule_sql 部分
4.1 表行数,上周期差值
# 1. 表行数,上周期差值
def sql_upper_period_diff():
    sql = f"""
    set odps.sql.hive.compatible=true ;

    INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}')
    SELECT 
          a.id
        , a.tbl_name
        , a.stat_time
        , a.pt_format
        , a.stat_pt
        , a.val_type
        , a.val
        , a.rule_code
        , a.rule_type
        , a.expect_val
        , IF (a.val = 0, 1, (IF ((a.val - NVL(b.val,0)) >= {expect_val}, 0, 1 ))) AS is_exc
        , a.tbl_sort_code
        , a.tbl_sort_name 
        , a.val - NVL(b.val,0) AS actual_val
    FROM (
        SELECT 
              concat( md5(concat('{tbl_name}', '_', date_format('{date_str}' ,'{pt_format}')) ), '_', {rule_code}, '_', HOUR('{now_time}') ) AS id
            , '{tbl_name}' AS tbl_name
            , '{now_time}' AS stat_time
            , '{pt_format}' AS pt_format
            , date_format('{date_str}' ,'{pt_format}') AS stat_pt
            , '{val_type}' AS val_type
            , COUNT(1) AS val 
            , '{rule_code}' AS rule_code
            , '{rule_type}' AS rule_type
            , {expect_val} AS expect_val
            , {tbl_sort_code} AS tbl_sort_code
            , '{tbl_sort_name}' AS tbl_sort_name
        FROM {tbl_name}
        WHERE pt = date_format('{date_str}' ,'{pt_format}')
    ) a 
    LEFT JOIN 
    (
        SELECT tbl_name, val FROM (
            SELECT tbl_name, val
                , ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC ) AS rn 
            FROM {res_tbl_name}
            WHERE pt = DATE_ADD('{pt}', -1)
                AND tbl_name = '{tbl_name}'
                AND rule_code = 1
        ) WHERE rn = 1
    ) b
    ON a.tbl_name = b.tbl_name
    ;
    """
    return sql
4.2 表行数, 固定值
# 2. 表行数, 固定值
def sql_line_fixed_val():
    sql = f"""
    set odps.sql.hive.compatible=true ;

    INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}')
    SELECT 
          concat( md5(concat('{tbl_name}', '_', date_format('{date_str}' ,'{pt_format}')) ), '_', {rule_code}, '_', HOUR('{now_time}') ) AS id
        , '{tbl_name}' AS tbl_name
        , '{now_time}' AS stat_time
        , '{pt_format}' AS pt_format
        , date_format('{date_str}' ,'{pt_format}') AS stat_pt
        , '{val_type}' AS val_type
        , ct AS val 
        , '{rule_code}' AS rule_code
        , '{rule_type}' AS rule_type
        , {expect_val} AS expect_val
        , IF (ct >= {expect_val}, 0, 1 ) AS is_exc
        , {tbl_sort_code} AS tbl_sort_code
        , '{tbl_sort_name}' AS tbl_sort_name
        , ct AS actual_val
    FROM (
        SELECT COUNT(1) AS ct
        FROM {tbl_name}
        WHERE pt = date_format('{date_str}' ,'{pt_format}') 
    ) a ;
    """
    return sql
4.3 表行数, n天波动率
# 3. 表行数, n天波动率
def sql_cycle_volatility():
    sql = f"""
    set odps.sql.hive.compatible=true ;

    INSERT INTO TABLE {res_tbl_name} PARTITION (pt='{pt}')
    SELECT 
          id
        , tbl_name
        , stat_time
        , pt_format
        , stat_pt
        , val_type
        , val_now AS val
        -- , val_last
        , rule_code
        , rule_type
        , expect_val
        , IF(val_last > 0, IF(ABS(val_now - val_last) / val_last * 100 > {expect_val}, 1, 0), IF(val_last = 0, 1, 0)) AS is_exc
        , tbl_sort_code
        , tbl_sort_name 
        , IF(val_last > 0, ABS(val_now - val_last) / val_last * 100, -1) AS actual_val
    FROM (
        SELECT 
          concat( md5(concat('{tbl_name}', '_', date_format('{date_str}' ,'{pt_format}')) ), '_', {rule_code}, '_', '{rule_parameters}', '_', HOUR('{now_time}') ) AS id 
        , '{tbl_name}' AS tbl_name
        , '{now_time}' AS stat_time
        , '{pt_format}' AS pt_format
        , date_format('{date_str}' ,'{pt_format}') AS stat_pt
        , '{val_type}' AS val_type
        , COUNT(1) AS val_now 
        , '{rule_code}' AS rule_code
        , REPLACE('{rule_type}', 'n', '{rule_parameters}')  AS rule_type
        , {expect_val} AS expect_val
        , {tbl_sort_code} AS tbl_sort_code
        , '{tbl_sort_name}' AS tbl_sort_name
        , (
            SELECT val FROM (
                SELECT tbl_name, val
                    , ROW_NUMBER() OVER(PARTITION BY tbl_name ORDER BY stat_time DESC ) AS rn 
                FROM {res_tbl_name}                                                                             
                WHERE pt = DATE_ADD('{pt}', - CAST('{rule_parameters}' AS INT))
                    AND tbl_name = '{tbl_name}'
                    AND rule_code = 3
            ) WHERE rn = 1
        ) AS val_last
        FROM {tbl_name}
        WHERE pt = date_format('{date_str}' ,'{pt_format}')
    ) a ;
    """
    return sql
5. 结语
  • 代码可以直接copy, 可开箱即用(部分内容, 如分区层级, 可根据你自己公司的数据表进行调整);
  • 规则内容如有不懂, 欢迎咨询讨论;
  • DataWorks的数据质量需要另外收费, 自己实现免费使用(当然DW可选择的规则会有很多, 目前只实现了常用的几个);
  • 初次学习使用 python, 如若代码中有可精简的操作, 或方法, 欢迎指教;
  • 后续会持续优化操作, 更新功能, 持续关注, 欢迎交流;
  • 本博文是之前的升级版本: Python常用日期函数和日期处理方法;
  • 欢迎留言讨论;
end

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1882577.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Navcat Premium17破解安装及数据库连接教程

一、前言 Navicat Premium 是一套数据库开发工具,是一个可多重连接数据库的管理工具。Navicat Premium让你从单一应用程序中同时连接 MySQL、MariaDB、MongoDB、SQL Server、Oracle、PostgreSQL 和 SQLite 数据库。目前17已经支持了很久都没有支持的Redis数据库了。…

大数据之Zookeeper部署

文章目录 集群规划环境准备集群部署参考资料 集群规划 确定使用Hadoop101、hadoop102和hadoop103三台服务器来构建Zookeeper集群。 hadoop101hadoop102hadoop103zookeeperzookeeperzookeeper 环境准备 安装zookeeper前需要确保下面的环境配置成功,具体可以参考大…

七一建党节|热烈庆祝中国共产党成立103周年!

时光荏苒,岁月如梭。 在这热情似火的夏日, 我们迎来了中国共产党成立103周年的重要时刻。 这是一个值得全体中华儿女共同铭记和庆祝的日子, 也是激励我们不断前进的重要时刻。 103年, 风雨兼程,砥砺前行。 从嘉兴…

五种肉苁蓉属植物叶绿体基因组-文献精读25

Structural mutations of small single copy (SSC) region in the plastid genomes of five Cistanche species and inter-species identification 五种肉苁蓉属植物叶绿体基因组中小单拷贝 (SSC) 区域的结构突变及物种间鉴定 摘要 背景 肉苁蓉属是列当科的重要属类&#xf…

小型气象站:便携、高效的气象监测新选择

在气象监测领域,一款小巧而功能全面的设备正逐渐受到广泛关注——那就是小型气象站。它不仅体积小巧、重量轻,而且采用了众多先进技术,使其在气象数据的采集、传输和分析中展现出强大的能力。 小型气象站之所以备受青睐,首先得益于…

error LNK2019: 无法解析的外部符号 _SDL_main,该符号在函数 _main_getcmdline 中被引用

VC MFC情况下出现此问题, 网上搜索了很多文章无法解决。 error LNK2019: 无法解析的外部符号 _SDL_main,该符号在函数 _main_utf8 中被引用_sdl2main.lib出现无法解析的外部符号-CSDN博客 字符集必须设置为:

交叉编译 tcpdump libpcap

文章目录 交叉编译 tcpdump & libpcap概述源码下载交叉编译 libpcap交叉编译 tcpdump 交叉编译 tcpdump & libpcap 概述 tcpdump 是一个强大的命令行包分析器,libpcap 是一个可移植的用于网络流量捕获的 C/C 库。tcpdump 依赖于 libpcap 库,同…

Jenkins接口自动化项目的工程创建

jenkins的下载安装 jenkins下载的官网地址 https://www.jenkins.io/download/ java环境变量的配置下载 jenkins是用java语言编写的所以要配置java环境 需要安装java的JDK 推荐安装JDK17(https://blog.csdn.net/wochunyang/article/details/138520209) JDK17的下载地址 ht…

红黑树模拟

1.红黑树概念 红黑树,是一种二叉搜索树,但每个节点上增加了一个存储位表示结点的颜色,可以是RED或BLACK。通过任何一条根到叶子节点的途径上各个节点的着色方式的限制,红黑树确保没有一条路径会超过其他路径的二倍,因…

基于Cisco模拟器的组网实验

课程目的 综合运用所学的网络原理、网络规划和网络集成等知识理论,按照下图所示,完成网络的规划、集成与配置,并利用ACL实现对网络的管理。 实验内容 连接并配置路由器,配置路由协议(RIP或OSPF)&#xf…

[Microsoft Office]Word设置页码从第二页开始为1

目录 第一步:设置页码格式 第二步:设置“起始页码”为0 第三步:双击页码,出现“页脚”提示 第四步:选中“首页不同” 第一步:设置页码格式 第二步:设置“起始页码”为0 第三步:双…

现在电气真的比不过计算机吗 ?

电气工程和计算机科学在今天的科技和工业领域中各有其重要性和发展空间,并不存在简单的比较谁“比不过”谁的情况。我收集制作一份plc学习包,对于新手而言简直不要太棒,里面包括了新手各个时期的学习方向,包括了编程教学&#xff…

【爱上C++】详解string类2:模拟实现、深浅拷贝

在上一篇文章中我们介绍了string类的基本使用,本篇文章我们将讲解string类一些常用的模拟实现,其中有很多细小的知识点值得我们深入学习。Let’s go! 文章目录 类声明默认成员函数构造函数析构函数拷贝构造函数深浅拷贝问题传统写法现代写法…

哪个量化软件最好用?散户也可以很快上手!QMT!

一、QMT是什么 QMT(Quantitative Multi-market Trading System)是一款专为高净值客户、量化爱好者及专业量化投资者设计的量化交易软件。它集行情显示、策略研究、交易执行和风控管理于一体,为投资者提供全方位的量化交易解决方案 二、QMT量化…

SAP配置发布WebService接口并调用(超级详细)

文章目录 前言一、案例介绍/笔者需求二、WebService是什么? a.传输协议 b.数据协议 c.WSDL d.UDDI 三、WebService 和 WebApi 的区别以及优缺点 a.主要区别 b.优缺点 四、SAP如何发布一个webser…

Perforce网络研讨会预告 | HelixCore vs SVN vs ClearCase:嵌入式开发中的数据管理趋势及工具对比分析

现如今,开发嵌入式软件涉及的规模比以往任何时候都庞大。团队在全球范围内不断扩展,文件数量呈指数级增长,项目每年所涉及的数字资产和元数据也更多,并且越来越多的团队成员要在相同的复杂项目中并行工作。 面对如此庞大的开发规…

《梦醒蝶飞:释放Excel函数与公式的力量》7.4 MID函数

第七章:文本处理函数 第4节:7.4 MID函数 7.4.1.MID函数简介 MID函数用于从文本字符串的指定位置开始提取指定数量的字符。它特别适用于从字符串中提取子字符串或处理具有特定格式的数据。 语法: MID(text, start_num, num_chars) - **t…

聚焦政企人才培养,打造多元化课程体系

树莓集团在课程内容上展现了显著的多元化特点,通过广泛的合作、创新的课程内容和产教融合的教学模式,为广大学子提供了全面、系统、专业的教育资源和实践机会。 1、专业广度:树莓集团旗下拥有树莓教育成都王老师摄影培训学校,作为…

Elasticsearch:Painless scripting 语言(一)

Painless 是一种高性能、安全的脚本语言,专为 Elasticsearch 设计。你可以使用 Painless 在 Elasticsearch 支持脚本的任何地方安全地编写内联和存储脚本。 Painless 提供众多功能,这些功能围绕以下核心原则: 安全性:确保集群的…

ZXL-2000砌体砂浆强度点荷仪

一、产品简介: 砌体砂浆强度点荷仪(又名:砂浆点荷仪),是根据GB/T50315-2000《砌体工程现场检验技术规程》而研制生产的。是砌体砂浆强度检测的专用仪器,其特点是能在现场或试验室直接测试,不影…