二百三十七、Hive——DWS层生成每个清洗字段的异常情况记录

news2025/2/24 2:55:03

一、目的

在Hive中对每种业务数据的清洗字段的异常数据进行记录

例如这张图,上面是原始数据,下面是每台雷达每天的异常字段的记录

二、实施步骤

(一)建表

create  table  if not exists  dws_data_clean_record_queue(
    data_type      int        comment '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',
    device_no      string     comment '设备编号',
    create_time    timestamp  comment '创建时间',
    field_name     string     comment '字段名',
    field_value    string     comment '字段值'
)
comment '静态排队数据清洗记录表'
partitioned by (day string)
stored as orc
;

(二)SQL

1、运行比较简单,但代码比较难

with t3 as(
select
       device_no,
       case when device_no is null then CONCAT('device_no:','null')  END AS device_no_value,
       create_time,
       case when lane_no < 0 or lane_no >255 then CONCAT('lane_no:', CAST(lane_no AS STRING)) END AS lane_no_value,
       case when queue_len < 0 or queue_len > 500 then CONCAT('queue_len:', CAST(queue_len AS STRING))  END AS queue_len_value,
       case when queue_head < 0 or queue_head > 500 then  CONCAT('queue_head:', CAST(queue_head AS STRING))  END AS queue_head_value,
       case when queue_tail < 0 or queue_tail > 500 then  CONCAT('queue_tail:', CAST(queue_tail AS STRING))  END AS queue_tail_value,
       case when queue_count < 0 or queue_count > 100  then  CONCAT('queue_count:', CAST(queue_count AS STRING))  END AS queue_count_value,
       concat_ws(',',
                case when device_no is null then CONCAT('device_no:','null') end ,
                case when lane_no < 0 or lane_no >255 then CONCAT('lane_no:', CAST(lane_no AS STRING)) END ,
                case when queue_len < 0 or queue_len > 500 then CONCAT('queue_len:', CAST(queue_len AS STRING))  END,
                case when queue_head < 0 or queue_head > 500 then  CONCAT('queue_head:', CAST(queue_head AS STRING))  END,
                case when queue_tail < 0 or queue_tail > 500 then  CONCAT('queue_tail:', CAST(queue_tail AS STRING))  END,
                case when queue_count < 0 or queue_count > 100  then  CONCAT('queue_count:', CAST(queue_count AS STRING))  END
                ) AS kv_pairs  ,
       day
from (select
        t1.device_no,
        substr(create_time,1,19)                        create_time ,
        get_json_object(list_json,'$.laneNo')           lane_no,
        get_json_object(list_json,'$.queueLen')         queue_len,
        get_json_object(list_json,'$.queueHead')        queue_head,
        get_json_object(list_json,'$.queueTail')        queue_tail,
        get_json_object(list_json,'$.queueCount')       queue_count,
        date(t1.create_time)                            day
    from (
    select
       get_json_object(queue_json,'$.deviceNo')         device_no,
       get_json_object(queue_json,'$.createTime')       create_time,
       get_json_object(queue_json,'$.queueList')        queue_list
    from hurys_dc_ods.ods_queue
        where day='2024-05-15'
        ) as t1
lateral view explode(split(regexp_replace(regexp_replace(queue_list,
                                                '\\[|\\]','') ,
                        '\\}\\,\\{','\\}\\;\\{'),
                  '\\;')
          )list_queue as list_json
    )  as t2
        )
insert  overwrite  table  hurys_dc_dws.dws_data_clean_record_queue partition(day)
select
    '6' data_type,
    t3.device_no,
    create_time,
    split(pair, ':')[0] AS field_name,
    split(pair, ':')[1] AS field_value,
    day
from t3
lateral view explode(split(t3.kv_pairs , ',')) exploded_table AS pair
where device_no_value is not null or queue_len_value is not null or lane_no_value is not null
or queue_head_value is not null or queue_tail_value is not null or queue_count_value is not null
;

注意 

(1)t1是解析JSON一级原始数据

(2)t2是解析JSON二级原始数据,得到所有的字段

(3)t3是利用case when和CONCAT,对每个检测字段的字段名和异常值进行拼接。最重要的是,利用concat_ws生成各种检测字段的键值对kv_pairs

(4)最后,则是利用lateral view explode(split)对键值对进行炸开,然后切分每个字段,形成field_name和field_value

(5)另外,最后where指定条件,键值对里面的字段总要非空

2、运行比较耗资源、但代码简单

with t2 as(
    select
        t1.device_no,
        substr(create_time,1,19)                        create_time ,
        get_json_object(list_json,'$.laneNo')           lane_no,
        get_json_object(list_json,'$.queueLen')         queue_len,
        get_json_object(list_json,'$.queueHead')        queue_head,
        get_json_object(list_json,'$.queueTail')        queue_tail,
        get_json_object(list_json,'$.queueCount')       queue_count,
        date(t1.create_time)                            day
from (
    select
       get_json_object(queue_json,'$.deviceNo')         device_no,
       get_json_object(queue_json,'$.createTime')       create_time,
       get_json_object(queue_json,'$.queueList')        queue_list
    from hurys_dc_ods.ods_queue
        where day='2024-05-15'
) as t1
lateral view explode(split(regexp_replace(regexp_replace(queue_list,
                                                '\\[|\\]','') ,
                        '\\}\\,\\{','\\}\\;\\{'),
                  '\\;')
          )list_queue as list_json
)
insert  overwrite  table  hurys_dc_dws.dws_data_clean_record_queue partition(day)
select
       '6' data_type,
       device_no,
       create_time,
       'device_no' field_name ,
       case when device_no is null then device_no END AS field_value ,
       day
from t2
where device_no is null
union all
select
       '6' data_type,
       device_no,
       create_time,
       'lane_no'  field_name ,
       case when lane_no < 0 or lane_no >255 then lane_no END AS field_value ,
       day
from t2
where lane_no < 0 or lane_no >255
union all
select
       '6' data_type,
       device_no,
       create_time,
       'queue_len'  field_name ,
       case when queue_len < 0 or queue_len > 500 then queue_len END AS field_value ,
       day
from t2
where  queue_len < 0 or queue_len > 500
union all
select
       '6' data_type,
       device_no,
       create_time,
       'queue_head'  field_name ,
       case when queue_head < 0 or queue_head > 500 then queue_head END AS field_value ,
       day
from t2
where  queue_head < 0 or queue_head > 500
union all
select
       '6' data_type,
       device_no,
       create_time,
       'queue_tail'  field_name ,
       case when queue_tail < 0 or queue_tail > 500 then queue_tail END AS field_value ,
       day
from t2
where  queue_tail < 0 or queue_tail > 500
union all
select
       '6' data_type,
       device_no,
       create_time,
       'queue_count'  field_name ,
       case when queue_count < 0 or queue_count > 100 then queue_count END AS field_value ,
       day
from t2
where queue_count < 0 or queue_count > 100
;

(1)特点:每个字段union判断 笨方法

(三)查看表结果

花了一天时间终于搞定,又前进了一小步!加油啊,少年

2024年5月22日续写

(四)海豚调度任务T+1插入

#! /bin/bash
source /etc/profile

nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`

hive -e "
use hurys_dc_dws;

set hive.vectorized.execution.enabled=false;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.exec.max.dynamic.partitions=2000;


with t3 as(

select
       device_no,
       case when device_no is null then CONCAT('device_no:','null')  END AS device_no_value,
       create_time,
       case when lane_no < 0 or lane_no >255 then CONCAT('lane_no:', CAST(lane_no AS STRING)) END AS lane_no_value,
       case when queue_len < 0 or queue_len > 500 then CONCAT('queue_len:', CAST(queue_len AS STRING))  END AS queue_len_value,
       case when queue_head < 0 or queue_head > 500 then  CONCAT('queue_head:', CAST(queue_head AS STRING))  END AS queue_head_value,
       case when queue_tail < 0 or queue_tail > 500 then  CONCAT('queue_tail:', CAST(queue_tail AS STRING))  END AS queue_tail_value,
       case when queue_count < 0 or queue_count > 100  then  CONCAT('queue_count:', CAST(queue_count AS STRING))  END AS queue_count_value,
       concat_ws(',',
                case when device_no is null then CONCAT('device_no:','null') end ,
                case when lane_no < 0 or lane_no >255 then CONCAT('lane_no:', CAST(lane_no AS STRING)) END ,
                case when queue_len < 0 or queue_len > 500 then CONCAT('queue_len:', CAST(queue_len AS STRING))  END,
                case when queue_head < 0 or queue_head > 500 then  CONCAT('queue_head:', CAST(queue_head AS STRING))  END,
                case when queue_tail < 0 or queue_tail > 500 then  CONCAT('queue_tail:', CAST(queue_tail AS STRING))  END,
                case when queue_count < 0 or queue_count > 100  then  CONCAT('queue_count:', CAST(queue_count AS STRING))  END
                ) AS kv_pairs  ,
       day
from (select
        t1.device_no,
        substr(create_time,1,19)                        create_time ,
        get_json_object(list_json,'$.laneNo')           lane_no,
        get_json_object(list_json,'$.queueLen')         queue_len,
        get_json_object(list_json,'$.queueHead')        queue_head,
        get_json_object(list_json,'$.queueTail')        queue_tail,
        get_json_object(list_json,'$.queueCount')       queue_count,
        date(t1.create_time)                            day
    from (
    select
       get_json_object(queue_json,'$.deviceNo')         device_no,
       get_json_object(queue_json,'$.createTime')       create_time,
       get_json_object(queue_json,'$.queueList')        queue_list
    from hurys_dc_ods.ods_queue
        where day= '$yesdate'
        ) as t1
lateral view explode(split(regexp_replace(regexp_replace(queue_list,
                                                '\\\\[|\\\\]','') ,    
                                 '\\\\}\\\\,\\\\{','\\\\}\\\\;\\\\{'), 
                   '\\\\;')
          )list_queue as list_json
    )  as t2
where day = '$yesdate'
        )
insert  overwrite  table  hurys_dc_dws.dws_data_clean_record_queue partition(day)
select
    '6' data_type,
    t3.device_no,
    create_time,
    split(pair, ':')[0] AS field_name,
    split(pair, ':')[1] AS field_value,
    day
from t3
lateral view explode(split(t3.kv_pairs , ',')) exploded_table AS pair
where device_no_value is not null or queue_len_value is not null or lane_no_value is not null
or queue_head_value is not null or queue_tail_value is not null or queue_count_value is not null
"

(五)海豚任务执行以及表数据验证

1、海豚任务执行

2、表数据验证

2024-05-21表分区的数据已经有啦!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1691376.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Go团队:Go是什么

2024年的Google I/O大会[1]如期而至。 这届大会的核心主旨毫无疑问是坚定不移的以AI为中心&#xff1a;Google先是发布了上下文长度将达到惊人的200万token的Gemini 1.5 Pro[2]&#xff0c;然后面对OpenAI GPT-4o的挑衅&#xff0c;谷歌在大会上直接甩出大杀器Project Astra[3]…

关于pytest中用例名称使用中文乱码的解决

场景&#xff1a;使用pytest.mark.parametrize装饰器为用例自定义名称时&#xff0c;运行显示乱码。如下图所示&#xff1a; 解决方案&#xff1a; 1.在根目录 pytest.ini中增加一行代码 [pytest] disable_test_id_escaping_and_forfeit_all_rights_to_community_supportTrue…

学习使用博客记录生活

学习使用博客记录生活 新的改变 今天新的开始&#xff0c;让我用图片开始记录吧 看这个背景图片怎么样

微信小程序实现容器图片流式布局功能,配合小程序原生框架使用。

小程序实现容器图片流式布局功能&#xff0c;因为目前论坛上也有很多博主出过类似的文章&#xff0c;这里我就以一个小白角度去讲一下如何实现的吧。给作者一点点鼓励&#xff0c;先点个赞赞吧&#x1f44d;&#xff0c;蟹蟹&#xff01;&#xff01; 目标 实现下方效果图 技术…

基金/证券项目如何进行非交易日数据补全(实战)

一些大数据开发的项目&#xff0c;特别是基金/证券公司的项目&#xff0c;都经常会涉及到交易日与非交易日的概念。 如果要让你对一张交易日跑批的主表&#xff0c;怎么去补全非交易日的数据呢&#xff1f; 在遇到这种情况的时候&#xff0c;我们要去怎么处理&#xff1f;来&…

模板编译之入口分析

Vue 是一个渐进式 JavaScript 框架&#xff0c;提供了简单易用的模板语法&#xff0c;帮助开发者以声明式的方式构建用户界面。Vue 的模板编译原理是其核心之一&#xff0c;它将模板字符串编译成渲染函数&#xff0c;并在运行时高效地更新 DOM。本文将深入探讨 Vue 模板编译的原…

图片分类模型训练及Web端可视化预测(下)——Web端实现可视化预测

Web端实现可视化预测 基于Flask搭建Web框架&#xff0c;实现HTML登录页面&#xff0c;编写图片上传并预测展示页面。后端实现上一篇文章所训练好的模型&#xff0c;进行前后端交互&#xff0c;选择一张图片&#xff0c;并将预测结果展示到页面上。 文章目录 Web端实现可视化预测…

前端基于word模板导出word文档

项目环境 vue2 js vue-cli等 依赖包都可以在npm官网找到对应文档 npm官网(英文) 1、依赖 安装依赖 docxtemplater npm i docxtemplaterfile-saver npm i file-saverjszip-utils npm i jszip-utilsjszip npm i jszip在对应页面或模块中引入依赖 import Docxtemplater …

探索AI写作工具:五款推荐

在现实生活中&#xff0c;除了专业的文字工作者&#xff0c;各行各业都避免不了需要写一些东西&#xff0c;比如策划案、论文、公文、讲话稿、总结计划……等等。而随着科技的进步&#xff0c;数字化时代的深入发展&#xff0c;AI已经成为日常工作中必不可少的工具了&#xff0…

智慧之选:开源与闭源大模型的未来探索

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

有一个3x4的矩阵,要求编写程序求出其中值最大的那个元素,以及其所在的行号和列号

解题思路&#xff1a; 先考虑解此问题的思路。从若干数中求最大数的方法很多&#xff0c;现在采用"打擂台"的算法。如果有若干人比武&#xff0c;先有一人站在台上&#xff0c;再上去一人与其交手&#xff0c;败者下台&#xff0c;胜者留在台上。第3个人再上…

selenium安装出错

selenium安装步骤&#xff08;法1&#xff09;&#xff1a; 安装失败法1 第一次实验&#xff0c;失败 又试了一次&#xff0c;失败 安装法2-失败&#xff1a; ERROR: Could not install packages due to an EnvironmentError: [WinError 5] 拒绝访问。: c:\\programdata\\a…

【C++题解】1697. 请输出n~1之间所有的整数

问题:1697. 请输出n~1之间所有的整数 类型&#xff1a;循环 题目描述&#xff1a; 从键盘读入一个整数 n &#xff0c;请输出 n∼1 之间所有的整数&#xff0c;每行输出 1 个。 比如&#xff0c;假设读入 n5 &#xff0c;输出结果如下&#xff1a; 5 4 3 2 1 输入&#xff1…

UE5 像素流与web 交互

总结下虚幻与网页的交互&#xff0c;这里将ue5 与js 交互传递参数记录下&#xff0c;其它的博主写的就是缺胳膊少腿的要么就是封闭收费&#xff0c;这个是在官方可以查询到。这里记录下&#xff1a; 点个关注不迷路&#xff1a; 具体的使用如下&#xff1a; 在你的游戏玩家类…

推荐几款新手学习编程的网站

免费在线开发平台 介绍一款编程平台&#xff0c;专为学生和开发者量身打造&#xff01;平台拥有近4000道编程题目&#xff0c;支持多种编程语言&#xff08;包括C、C、JavaScript、TypeScript、Go、Rust、PHP、Java、Ruby、Python3和C#&#xff09;&#xff0c;为您提供全面的学…

【Oracle篇】rman标准化全库备份策略:完整备份or增量备份(第三篇,总共八篇)

&#x1f4ab;《博主介绍》&#xff1a;✨又是一天没白过&#xff0c;我是奈斯&#xff0c;DBA一名✨ &#x1f4ab;《擅长领域》&#xff1a;✌️擅长Oracle、MySQL、SQLserver、阿里云AnalyticDB for MySQL(分布式数据仓库)、Linux&#xff0c;也在扩展大数据方向的知识面✌️…

YoloV8改进策略:蒸馏改进|CWDLoss|使用蒸馏模型实现YoloV8无损涨点|特征蒸馏

摘要 在本文中&#xff0c;我们成功应用蒸馏策略以实现YoloV8小模型的无损性能提升。我们采用了CWDLoss作为蒸馏方法的核心&#xff0c;通过对比在线和离线两种蒸馏方式&#xff0c;我们发现离线蒸馏在效果上更为出色。因此&#xff0c;为了方便广大读者和研究者应用&#xff…

Kubernetes Service 之原理与 ClusterIP 和 NodePort 用法

Kubernetes Service 之原理与 ClusterIP 和 NodePort 用法 Service 定义 在 Kubernetes 中&#xff0c;由于Pod 是有生命周期的&#xff0c;如果 Pod 重启它的 IP 可能会发生变化以及升级的时候会重建 Pod&#xff0c;我们需要 Service 服务去动态的关联这些 Pod 的 IP 和端口…

六个免费的AI制图网站的介绍

六个免费的AI制图网站的介绍 以下是六个免费的AI制图网站的介绍&#xff0c;包括官网、特点、缺点以及使用时的注意事项&#xff1a; 海鲸AI 官网&#xff1a;AI绘画创作平台 - 海鲸AI | 智能艺术生成器特点&#xff1a;支持PC和移动端&#xff0c;集成了Midjourney AI模型&am…

【Spring】SSM介绍_SSM整合

1、SSM介绍 1.1简介 SSM&#xff08;Spring SpringMVC MyBatis&#xff09;整合是一种流行的Java Web应用程序框架组合&#xff0c;它将Spring框架的核心特性、SpringMVC作为Web层框架和MyBatis作为数据访问层框架结合在一起。这种整合方式提供了从数据访问到业务逻辑处理再…