Hive 巡检工具-对表数据量、主键重复数量以及每个字段标签的空值检测

news2024/11/24 14:06:13

目录

背景

巡检工具

数据准备

1、准备一张配置信息表,该表保存需要巡检的数据信息(规则code不可重复)

 2、pyspark代码编写

结果表数据展示

规则自动检测并自增

数据准备


背景

该需求是利用pyspark对部分重点产出表进行数据质量监控。主要是对

1、数据表的数据量,该指标后续可计算该表的数据量波动等

2、主键重复数量,用来保障主键的唯一性

3、每个字段的空值量,该指标后续可计算字段的空值波动率以及检测出该字段是否为空,从而提醒去上游查看数据源是否正常。

检测的后续结果可以结合BI/邮件等,对结果进行展示。

巡检工具

数据准备

准备一张配置信息表,该表保存需要巡检的数据信息(规则code不可重复)

 其中

(1)规则code用来保证该条规则的唯一性,可以和t-2分区进行关联,从而计算波动率等。

(2)规则类型用来区分是表or字段 的检测规则

(3)规则名称row_cnt代表表行数检测

举例

表行数检测配置-将要检测的表的行数进行配置

 字段空值检测配置-将要检测的字段进行配置

 主键重复检测配置-配置主键

 pyspark代码编写

1、读取上边我们在表里的规则配置信息

2、利用pyspark分别对各个规则进行检测

3、讲结果写入数据表

# encoding=utf8
import smtplib
import re
import findspark

findspark.init()
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import warnings
import sys

warnings.filterwarnings('ignore')
spark = SparkSession.builder.appName('cdp_label_check').getOrCreate()


def get_config():
    """
    获取配置
    """
    df_config = pd.DataFrame()
    try:
        exec_sql = """select * from dm_subject.dwd_upf_monitor_label_config where  rule_name<>'empty_cnt'"""
        print(exec_sql)
        df_config = spark.sql(exec_sql).toPandas()

    except Exception as e:
        print("df_config exception ", e)

    return df_config

def get_empty_stat_config(bizdate):
    """
    获取配置 批量统计空值数量
    """
    df_config = pd.DataFrame()
    try:
        exec_sql = """select 
                concat('select ',"'",'column',"'",' ','rule_type,',"'",'empty_cnt',"'", ' ','rule_name,',"'",table_name,"'",' table_name, ','split(b.column_name,',"'",'####',"'",')[0]  column_name, ',partition_name, ' partition_name, ','coalesce( row_cnt,0) row_cnt',',split(b.column_name,',"'",'####',"'",')[1] rule_code', ' from ( select ',partition_name ,', ',clause_map,' map_cnt ',' from ( select ',partition_name ,', ',clause_sum,' from ',table_name,' where ',partition_name,'={bizdate} group by ',partition_name ,')a)a lateral view explode(map_cnt) b as column_name,row_cnt')
                caluse_select
                from 
                (
                select 
                table_name,partition_name
                ,concat_ws(',',collect_set(concat('sum(if (coalesce(cast(',column_name,' as string),', "'","')" ,'=', "'","'," ,'1,','0)) ',column_name)))clause_sum
                ,concat('map(',concat_ws(',', collect_set(concat("'",concat(column_name,'####',rule_code),"',",column_name))),')')clause_map
                from dm_subject.dwd_upf_monitor_label_config a  
                where a.rule_type='column'
                and rule_name='empty_cnt'
                group by table_name,partition_name
                )a """
        print(exec_sql)
        df_config = spark.sql(exec_sql).toPandas()

    except Exception as e:
        print("df_config exception ", e)

    return df_config

def cal_table_row_cnt(rule_code, rule_type, rule_name, table_name, column_name, partition_name, bizdate):
    """
    计算数据表数量
    """

    try:
        exec_sql = """insert overwrite table dm_subject.dwd_upf_monitor_label_result_di PARTITION(dt={bizdate},rule_code='{rule_code}')
                select 
                    '{rule_type}' rule_type,
                    '{rule_name}' rule_name,
                    '{table_name}' table_name,
                    '' column_name,
                    '{partition_name}' partition_name,
                    count(1) row_cnt
                     from {table_name} 
                    where {partition_name}={bizdate}
                    """.format(rule_code=rule_code, rule_type=rule_type, rule_name=rule_name, table_name=table_name,
                               column_name=column_name, partition_name=partition_name, bizdate=bizdate)

        print("sql execute begin " + exec_sql)
        df = spark.sql(exec_sql)
        print("sql execute end " + exec_sql)


    except Exception as e:
        print("table_row_cnt exception ", e)


def cal_column_duplicate_cnt(rule_code, rule_type, rule_name, table_name, column_name, partition_name, bizdate):
    """
    计算列重复值数量
    """
    try:
        exec_sql = """insert overwrite table dm_subject.dwd_upf_monitor_label_result_di PARTITION(dt={bizdate},rule_code='{rule_code}')
                select 
                    '{rule_type}' rule_type,
                    '{rule_name}' rule_name,
                    '{table_name}' table_name,
                    '{column_name}' column_name,
                    '{partition_name}' partition_name,
                    count(1) row_cnt
                from
                    (select {column_name}
                     from {table_name} 
                    where {partition_name}={bizdate}
                    group by {column_name}
                    having count(1)>1
                    )a 

                    """.format(rule_code=rule_code, rule_type=rule_type, rule_name=rule_name, table_name=table_name,
                               column_name=column_name, partition_name=partition_name, bizdate=bizdate)

        print("sql execute begin " + exec_sql)
        df = spark.sql(exec_sql)
        print("sql execute end " + exec_sql)


    except Exception as e:
        print("column_duplicate_cnt exception ", e)


def cal_column_empty_cnt(rule_code, rule_type, rule_name, table_name, column_name, partition_name, bizdate):
    """
    计算列空值数量
    """
    try:
        exec_sql = """insert overwrite table dm_subject.dwd_upf_monitor_label_result_di PARTITION(dt={bizdate},rule_code='{rule_code}')
                select 
                    '{rule_type}' rule_type,
                    '{rule_name}' rule_name,
                    '{table_name}' table_name,
                    '{column_name}' column_name,
                    '{partition_name}' partition_name,
                    count(1) row_cnt
                     from {table_name} 
                    where {partition_name}={bizdate}
                    and ({column_name} is null or {column_name} ='')
                    """.format(rule_code=rule_code, rule_type=rule_type, rule_name=rule_name, table_name=table_name,
                               column_name=column_name, partition_name=partition_name, bizdate=bizdate)

        print("sql execute begin " + exec_sql)
        df = spark.sql(exec_sql)
        print("sql execute end " + exec_sql)
    except Exception as e:
        print("column_empty_cnt exception ", e)
############
##############
def cal_column_empty_cnt_batch( clause_select, bizdate):
    """
    计算列空值数量
    """
    try:
        clause_insert="""insert overwrite table dm_subject.dwd_upf_monitor_label_result_di PARTITION(dt={bizdate},rule_code) """
        clause_prepare=clause_insert+clause_select
        exec_sql = clause_prepare.format(clause_select=clause_select, bizdate=bizdate)

        print("sql execute begin " + exec_sql)
        df = spark.sql(exec_sql)
        print("sql execute end " + exec_sql)
    except Exception as e:
        print("column_empty_cnt exception ", e)
if __name__ == "__main__":
    #分区日期传入
    bizdate = sys.argv[1]

    print("cdp_label_check execute begin " + bizdate)
    #读取配置表 获取所有的规则
    df_config = get_config()

    df = df_config.values.tolist()
    for conf in df:
        print(conf)
        rule_code, rule_type, rule_name, table_name, column_name, partition_name = conf
        #主键唯一性检测
        if rule_name == 'duplicate_cnt':
            cal_column_duplicate_cnt(rule_code, rule_type, rule_name, table_name, column_name, partition_name, bizdate)
        #表行数检测
        if rule_name == 'row_cnt':
            cal_table_row_cnt(rule_code, rule_type, rule_name, table_name, column_name, partition_name, bizdate)

    #循环检测监控列表中每个字段的空值情况
    empty_stat_config = get_empty_stat_config(bizdate)
    df = empty_stat_config.values.tolist()
    for conf in df:
        print(conf)
        clause_select = conf[0]
        #执行结果写入
        cal_column_empty_cnt_batch(clause_select, bizdate)
    print("cdp_label_check execute end " + bizdate)


结果表数据展示

规则自动检测并自增

思考,如果我们表经常增加字段,那么每次都要往表里手动维护规则进去会很麻烦,所以我们编写自增规则的一个脚本

数据准备

1、准备一个表,里边存储已经不需要监控的字段or已经失效的字段

2、准备一个表,存储需要自增字段的来源表的元数据信息,可以获取到该表里的字段即可

pyspark代码编写

#coding:utf-8

import findspark 

findspark.init()
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np

spark = SparkSession.builder.appName('cdp_label_add_column').getOrCreate()

def get_cur_col_list():
    """
    获取当前监控的字段列表
    """
    try:
        exec_sql = """
            select * from dm_subject.tmp_dwd_upf_monitor_label_config01
            """
        print ("exec_sql:", exec_sql)
        cur_col_list = spark.sql(exec_sql).toPandas().values.tolist()
    except Exception as e:
        print ("get_cur_col_list error: ", e)
        exit()
        
    return cur_col_list

def get_lost_label_list():
    """
    获取获取已下线或失效标签列表
    """
    lost_label_list = []
    try:
        exec_sql = """
            select * from dm_subject.dm_subject_cdp_lose_label_list
            """
        print ("exec_sql:", exec_sql)
        for i in spark.sql(exec_sql).toPandas().values.tolist(): lost_label_list.append(i[0])
    except Exception as e:
        print ("get_lost_label_list error: ", e)
        exit()
        
    return lost_label_list

def get_all_columns(table_name, db_name):
    """
    获取该表所有字段信息
    """
    all_columns_list = pd.DataFrame()
    try:
        exec_sql = """
            SELECT t3.db_name,t1.table_name,t1.table_alias,t2.column_index,t2.column_name,t2.column_type,t2.column_comment FROM(
                SELECT id,table_name,table_alias,db_id FROM ods_dsp.ds_metadata_dsmetadata_meta_table
                WHERE table_name REGEXP %s AND hoodie_record_type <> 'DELETE')t1 
            INNER JOIN(SELECT * FROM ods_dsp.ds_metadata_dsmetadata_meta_column WHERE hoodie_record_type <> 'DELETE')t2
            ON t1.id = t2.table_id
            INNER JOIN(SELECT id,db_name FROM ods_dsp.ds_metadata_dsmetadata_meta_db WHERE db_name REGEXP %s AND hoodie_record_type <> 'DELETE')t3
            ON t1.db_id = t3.id ORDER BY t1.table_name,t2.column_index ASC
        """%("\'%s\'"%(table_name), "\'%s\'"%(db_name))
        all_columns_list = spark.sql(exec_sql).toPandas()
    except Exception as e:
        print ("get_all_columns error: ", e)
        exit()
    #print ("all_columns_list:", all_columns_list)
    return all_columns_list.values.tolist()
 	
def main(check_list, db_name):
    #获取当前在监控的标签列表
    col_list, table_indx = [], {}
    cur_col_list = get_cur_col_list()
    for i in cur_col_list:
        #contract_agg_empty_001	column	empty_cnt	dm_cdp.dm_cdp_be_contract_agg	contr_no	dt
        rule_code, rule_type, rule_name, table_name, column_name, partition_nam = i 
        #剔除不在监控范围的数据
        if rule_type!='column' and rule_name!='empty_cnt':continue
        #记录最大的rulecode
        table_indx[table_name.split('.')[1].strip()]=rule_code if table_name not in table_indx.keys() else max(table_indx[table_name], rule_code)
        col_list.append(column_name)
    #print ('col_list: ', col_list, 'table_indx: ', table_indx)

    #获取已下线或失效标签列表
    lost_label_list = get_lost_label_list()

    #获取线上所有字段
    add_col = []
    for table_name in check_list:
        all_columns_list = get_all_columns(table_name, db_name)
        #[['dm_cdp', 'dm_cdp_ue_user_agg', '用户实体的聚合属性', 0, 'one_id', 'bigint', 'one_id'],]
        for i in all_columns_list:
            _, _, table_comment, col_index, col_name, col_type, col_comment = i 
            #如果字段不在当前监控列表或者不在失效列表中
            if col_name not in col_list and col_name not in lost_label_list:
                add_col.append('%s,%s'%(table_name, col_name))
    #print (add_col)

    import datetime 
    #如果没有检测到新增字段
    if not add_col:
        print ("%s not need add column!", datetime.date.today().strftime('%Y%m%d'))
        exit()
    #检测到之后执行新增                  
    res = "insert into table dm_subject.tmp_dwd_upf_monitor_label_config01 values"
    for col in add_col:
        tb_name, col_name = col.split(",")
        max_rule_code = table_indx[tb_name]
        #计算自增1的rule_code
        new_rule_code = '_'.join(max_rule_code.split('_')[:-1])+'_%s'%('%03d'%(int(max_rule_code.split('_')[-1])+1))
        table_indx[tb_name] = new_rule_code
        res += '(\'%s\',\'column\',\'empty_cnt\',\'%s\',\'%s\',\'dt\'),'%(new_rule_code,'.'.join([db_name, tb_name]), col_name)
    try:
        print ('res sql: ', res.strip(','))
        spark.sql(res.strip(','))
    except Exception as e:
        print ("exec res sql error: ", e)

    

if __name__ == '__main__':
    check_list = [
        'dm_cdp_ue_user_bas'
        ,'dm_cdp_ue_user_agg'
        ,'dm_cdp_ue_user_ext'
        ,'dm_cdp_be_contract_bas'
        ,'dm_cdp_be_contract_agg'
        ,'dm_cdp_be_contract_ext'
    ]
    main(check_list, 'dm_cdp')

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/628149.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何使用二三层仪表模拟无状态的DDOS攻击测试

什么是DDOS攻击 分布式拒绝服务攻击(Distributed Denial of Service&#xff0c;简称DDoS)是指通过大规模互联网流量淹没目标服务器或其周边基础设施&#xff0c;以破坏目标服务器、服务或网络正常流量的恶意行为。 大量虚假的用户占用网络资源&#xff0c;把资源耗尽&#x…

PREP黄金沟通法则

PREP黄金沟通法则 掌握PREP黄金沟通四步法则&#xff0c;改善沟通困局&#xff0c;让交流更高效&#xff01; 模型介绍 Point: 结论先行让对方第一时间知道你想表达的观点。Reason: 摆出依据摆出你观点的依据&#xff0c;要做到客观公正、统一度量、表达准确、不出现歧义。Exa…

软件测试的案例分析 - 闰年4.2

这篇博客的目录 文章目的正文错误之一出错后怎么改正&#xff1f;正确而简明的算法 文章目的 显示不同的博客能获得多少博客质量分 &#xff08;这是关于博客质量分的测试 https://www.csdn.net/qc) 这个博客得了 60 分。 希望在新的质量分系统中&#xff0c;获得 80 - 90 分左…

Goby 漏洞更新 |MDT KNX 管理面板默认口令

漏洞名称&#xff1a;MDT KNX 管理面板默认口令 English Name&#xff1a;MDT KNX manager panel default credentials vulnerability CVSS core: 7.5 影响资产数&#xff1a;1135 漏洞描述&#xff1a; MDT是一家智能楼宇自动化服务商&#xff0c;基于KNX技术进行产品制造…

互联网产品的帮助中心页面制作方法?

帮助中心&#xff08;Help Center&#xff09;是企业或组织为了向客户提供技术支持和解决方案而设立的一个资源库&#xff0c;为客户提供常见问题解答、使用指南、教程等信息&#xff0c;旨在提高客户满意度和降低客户支持成本。帮助中心通常提供多种服务方式&#xff0c;包括在…

企业级微服务架构实战项目--xx优选2

一 常用核心功能 1.1 mp返回分页工具类 1.2 返回统一的数据格式 第2部分 1.3 异常统一的处理 系统在运行过程中如果出现了异常&#xff0c;默认会直接返回异常信息&#xff0c;比如500错误提示。但是我们想让异常结果也显示为统一的返回结果对象&#xff0c;并且统一处理系统的…

Vue 组件化: 计算属性、内容分发、自定义事件

目录 1. 计算属性 1.1 计算属性的特点 2. 内容分发 2.1 使用插槽的示例 3. 自定义事件 1. 计算属性 什么是计算属性 ? 计算属性的重点突出在属性两字, 首先它是个属性, 其次这个属性有计算的能力, 这里的计算就是个函数; 简单来说, 它就是一个能够将计算结果缓存起来的属…

认识 SpringCloud 核心组件

✅作者简介&#xff1a;大家好&#xff0c;我是Cisyam&#xff0c;热爱Java后端开发者&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;Cisyam-Shark的博客 &#x1f49e;当前专栏&#xff1a; 微服务探索之旅 ✨特色专…

Linux进程间通信【共享内存】

✨个人主页&#xff1a; 北 海 &#x1f389;所属专栏&#xff1a; Linux学习之旅 &#x1f383;操作环境&#xff1a; CentOS 7.6 阿里云远程服务器 文章目录 &#x1f307;前言&#x1f3d9;️正文1、什么是共享内存&#xff1f;2、共享内存的相关知识2.1、共享内存的数据结构…

人工智能十大新星揭晓,华人学者占90%

人工智能领域著名杂志 IEEE Intelligent Systems发布了 2022 年度“人工智能十大新星”&#xff08;AIs 10 to Watch&#xff09;名单 &#xff0c;其中有九位都是华人研究者。知识人网小编推荐给大家。 近日&#xff0c;人工智能领域著名杂志 IEEE Intelligent Systems公布了 …

在JavaScript中的栈数据结构(Stack )

文章目录 导文什么是Stack 类&#xff1f;如何创建一个Stack如何修改Stack中的值栈声明方法举例添加移除查看查看栈顶元素检查栈是否为空检查栈的长度 清空栈元素打印栈元素 完整的Stack函数&#xff1a;创建Stack的其他方法-用 ES6 语法声明 Stack 类 使用Stack类在 JavaScrip…

关于GDPR体系文件介绍,介绍GDPR体系文件的内容和意义

随着数字化时代的到来&#xff0c;个人数据保护成为了一个日益受到关注的问题。欧盟于2018年5月25日颁布了“通用数据保护条例”&#xff08;GDPR&#xff09;&#xff0c;旨在加强对欧洲公民个人数据的保护。GDPR对企业和组织的数据保护和处理流程提出了严格的要求&#xff0c…

自助化打印面单教程

我们都知道&#xff0c;这几年快递行业&#xff0c;从传统纸质面单过渡到了电子面单。以往企业寄快递&#xff0c;能够自行填写纸质面单&#xff0c;等待收件员上门收件&#xff0c;现如今&#xff0c;企业寄件能否自行打印电子面单&#xff1f; 首先我们要先对比一下传统面单和…

云智研发笔试编程题(一):图像相似度

题目描述 给出两幅相同大小的黑白图像 (用0-1矩阵) 表示求它们的相似度。若两幅图像在相同位置上的像素点颜色相同&#xff0c;则称它们在该位置具有相同的像素点。两幅图像的相似度定义为相同像素点数占总像素点数的百分比。 输入描述 第一行包含两个整数m和n&#xff0c;表…

C++概述——浅谈C++对C的拓展

纵有疾风起&#xff0c;人生不言弃。本文篇幅较长&#xff0c;如有错误请不吝赐教&#xff0c;感谢支持。 &#x1f4ac;C核心编程一 一.C简介二.第一个程序Hello,world&#xff01;三.C的特点四.C对C的扩展1️⃣作用域运算符::2️⃣C命名空间(namespace)①名字控制②为什么有…

golang性能分析 pprof的使用 graphviz

golang性能分析 pprof的使用 graphviz 1 参考文档2 pprof、Graphviz介绍3 Graphviz下载 安装4 使用 1 参考文档 参考1&#xff1a;golang性能分析&#xff0c;pprof的使用&#xff0c;graphviz&#xff0c;火焰图 参考2&#xff1a;Golang中的pprof分析环境搭建【Windows环境】…

隆重共建开放,共享未来 | 2023 开放原子全球开源峰会 OpenAtom OpenHarmony 分论坛即将启幕

在全球数字化进程快速发展的背景下&#xff0c;OpenAtom OpenHarmony&#xff08;以下简称“OpenHarmony”&#xff09;旨在通过面向全场景、全连接、全智能时代、基于开源的方式&#xff0c;搭建下一代智能终端设备操作系统的框架和平台&#xff0c;为消费、金融、能源、教育、…

光模块失效的原因及预防措施汇总

光模块从生产到使用都必须有规范化的操作方法&#xff0c;任何不规范的动作都可能造成光模块隐性的损伤或者永久的失效&#xff0c;下面就跟着小编来学习一下预防光模块失效的方法吧! 一、光模块失效的主要原因 1、光模块受到静电伤害&#xff08;ESD损伤&#xff09;&#xf…

2023 年最新版 Java 后端最全面试攻略,全面对标 BATJ互联网大厂

前言 小编分享的这份 Java 后端开发面试总结包含了 JavaOOP、Java 集合容器、Java 异常、并发编程、Java 反射、Java 序列化、JVM、Redis、Spring MVC、MyBatis、MySQL 数据库、消息中间件 MQ、Dubbo、Linux、ZooKeeper、 分布式 &数据结构与算法等 25 个专题技术点&#…

爬虫案例-使用Session登录某知名网站(JS逆向AES-CBC加密+MD5加密)

总体概览&#xff1a;使用Session登录该网站&#xff0c;其中包括对password参数进行js逆向破解 &#xff08;涉及加密&#xff1a;md5加密AES-CBC加密&#xff09; 难度&#xff1a;两颗星 目标网址&#xff1a;aHR0cHM6Ly93d3cuZnhiYW9nYW8uY29tLw 下面文章将分为四个部分…