二百七十一、Kettle——ClickHouse增量导入数据清洗记录表

news2025/1/11 2:00:03

一、目的

在完成错误数据表任务后,需要对每条错误数据的错误字段及其字段值进行分析

Hive中原有SQL语句和ClickHouse现有SQL语句很大不同

二、Hive中原有代码

2.1 表结构

--31、静态排队数据清洗记录表
create  table  if not exists  hurys_db.dwd_data_clean_record_queue(
    id             string     comment '唯一ID',
    data_type      int        comment '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',
    device_no      string     comment '设备编号',
    create_time    string  comment '创建时间',
    field_name     string     comment '字段名',
    field_value    string     comment '字段值'
)
comment '静态排队数据清洗记录表'
partitioned by (day string)
stored as orc
;

2.2 SQL代码

with t3 as(
select
       id,
       device_no,
       case when device_no is null then CONCAT('device_no:','null')  END AS device_no_value,
       create_time,
       case when lane_no < 0 or lane_no >255 then CONCAT('lane_no:', CAST(lane_no AS STRING)) END AS lane_no_value,
       case when queue_len < 0 or queue_len > 500 then CONCAT('queue_len:', CAST(queue_len AS STRING))  END AS queue_len_value,
       case when queue_head < 0 or queue_head > 500 then  CONCAT('queue_head:', CAST(queue_head AS STRING))  END AS queue_head_value,
       case when queue_tail < 0 or queue_tail > 500 then  CONCAT('queue_tail:', CAST(queue_tail AS STRING))  END AS queue_tail_value,
       case when queue_count < 0 or queue_count > 100  then  CONCAT('queue_count:', CAST(queue_count AS STRING))  END AS queue_count_value,
       concat_ws(',',
                case when device_no is null then CONCAT('device_no:','null') end ,
                case when lane_no < 0 or lane_no >255 then CONCAT('lane_no:', CAST(lane_no AS STRING)) END ,
                case when queue_len < 0 or queue_len > 500 then CONCAT('queue_len:', CAST(queue_len AS STRING))  END,
                case when queue_head < 0 or queue_head > 500 then  CONCAT('queue_head:', CAST(queue_head AS STRING))  END,
                case when queue_tail < 0 or queue_tail > 500 then  CONCAT('queue_tail:', CAST(queue_tail AS STRING))  END,
                case when queue_count < 0 or queue_count > 100  then  CONCAT('queue_count:', CAST(queue_count AS STRING))  END
                ) AS kv_pairs  ,
       day
from hurys_db.dwd_queue_error
    where day='2024-09-10'
)
insert  overwrite  table  hurys_db.dwd_data_clean_record_queue partition(day)
select
    id,
    '6' data_type,
    t3.device_no,
    create_time,
    split(pair, ':')[0] AS field_name,
    split(pair, ':')[1] AS field_value,
    day
from t3
lateral view explode(split(t3.kv_pairs , ',')) exploded_table AS pair
where device_no_value is not null or queue_len_value is not null or lane_no_value is not null
or queue_head_value is not null or queue_tail_value is not null or queue_count_value is not null
;

三、ClickHouse中现有代码

3.1 表结构

--31、静态排队数据清洗记录表(长期存储)
create  table  if not exists  hurys_jw.dwd_data_clean_record_queue(
    id             String            comment '唯一ID',
    data_type      Nullable(Int32)      comment '1:转向比,2:统计,3:评价,4:区域,5:过车,6:静态排队,7:动态排队,8:轨迹,9:事件数据,10:事件资源',
    device_no      Nullable(String)     comment '设备编号',
    create_time    DateTime          comment '创建时间',
    field_name     Nullable(String)     comment '字段名',
    field_value    Nullable(String)     comment '字段值',
    day            Date                 comment '日期'
)
ENGINE = MergeTree
PARTITION BY day
PRIMARY KEY (day,id)
ORDER BY (day,id)
SETTINGS index_granularity = 8192;

3.2 SQL代码

SELECT
    id,
    '6' AS data_type,
    device_no,
    create_time,
    splitByString(':', pair)[1] AS field_name,
    splitByString(':', pair)[2] AS field_value,
    day
FROM (SELECT
        id,
        device_no,
        create_time,
        day,
        arrayConcat(
            if(device_no IS NULL, ['device_no:null'], []),
            if(lane_no < 0 OR lane_no > 255, [concat('lane_no:', toString(lane_no))], []),
            if(queue_len < 0 OR queue_len > 500, [concat('queue_len:', toString(queue_len))], []),
            if(queue_head < 0 OR queue_head > 500, [concat('queue_head:', toString(queue_head))], []),
            if(queue_tail < 0 OR queue_tail > 500, [concat('queue_tail:', toString(queue_tail))], []),
            if(queue_count < 0 OR queue_count > 100, [concat('queue_count:', toString(queue_count))], [])
        ) AS pairs
    FROM hurys_jw.dwd_queue_error
    WHERE device_no IS NULL OR
          lane_no < 0 OR lane_no > 255 OR   queue_len < 0 OR queue_len > 500 OR
          queue_head < 0 OR queue_head > 500 OR  queue_tail < 0 OR queue_tail > 500 OR
          queue_count < 0 OR queue_count > 100
) AS subquery
array join pairs AS pair
;

注意:1、错误数据表dwd_queue_error的清洗字段不能设置nullable,这是一大坑

           2、如果错误数据表中的清洗字段是Decimal(10,1),那么相关字段就要调整

arrayConcat(
    if(device_no IS NULL, ['device_no:null'], []),
    if(lane_no < 0 OR lane_no > 255, [concat('lane_no:', toString(lane_no))], []),
    if(azimuth < 0 OR azimuth > toDecimal32(359.9,1), [concat('azimuth:', toString(azimuth))], []),
    if(rcs < -64 OR rcs > toDecimal32(63.5,1), [concat('rcs:', toString(rcs))], []),
    if(prob < 0 OR prob > 100, [concat('prob:', toString(prob))], [])
) AS pairs

3.3 Kettle任务

3.3.1 newtime

3.3.2 替换NULL值

3.3.3 clickhouse输入

3.3.4 字段选择

3.3.5 clickhouse输出

3.3.6 执行任务

3.3.7 海豚调度

由于不需要实时记录,因为把所有数据的清洗记录任务放在一个海豚工作流里面,T+1执行即可!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2227666.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

kubernetes——part2-3 使用RKE构建企业生产级Kubernetes集群

使用RKE构建企业生产级Kubernetes集群 一、RKE工具介绍 RKE是一款经过CNCF认证的开源Kubernetes发行版&#xff0c;可以在Docker容器内运行。 它通过删除大部分主机依赖项&#xff0c;并为部署、升级和回滚提供一个稳定的路径&#xff0c;从而解决了Kubernetes最常见的安装复杂…

软件测试学习笔记丨Selenium学习笔记:css定位

本文转自测试人社区&#xff0c;原文链接&#xff1a;https://ceshiren.com/t/topic/22511 本文为霍格沃兹测试开发学社的学习经历分享&#xff0c;写出来分享给大家&#xff0c;希望有志同道合的小伙伴可以一起交流技术&#xff0c;一起进步~ 说明&#xff1a;本篇博客基于sel…

【瑞吉外卖】-day01

目录 前言 第一天项目启动 获取资料 创建项目 ​编辑 连接本地数据库 连接数据库 修改用户名和密码 ​编辑创建表 创建启动类来进行测试 导入前端页面 创建项目所需目录 检查登录功能 登录界面 登录成功 登录失败 代码 退出功能 易错点 前言 尝试一下企业级项…

【论文阅读】ESRGAN

学习资料 论文题目&#xff1a;增强型超分辨率生成对抗网络&#xff08;ESRGAN: Enhanced Super-Resolution Generative Adversarial Networks&#xff09;论文地址&#xff1a;[1809.00219] ESRGAN&#xff1a;增强型超分辨率生成对抗网络代码&#xff1a;xinntao / ESRGAN&am…

【HarmonyOS】判断应用是否已安装

【HarmonyOS】判断应用是否已安装 前言 在鸿蒙中判断应用是否已安全&#xff0c;只是通过包名是无法判断应用安装与否。在鸿蒙里新增了一种判断应用安装的工具方法&#xff0c;即&#xff1a;canOpenLink。 使用该工具函数的前提是&#xff0c;本应用配置了查询标签querySch…

Linux内核-tmpfs虚拟文件系统

作者介绍&#xff1a;简历上没有一个精通的运维工程师。希望大家多多关注作者&#xff0c;下面的思维导图也是预计更新的内容和当前进度(不定时更新)。 我们的Linux进阶部分&#xff0c;到目前为止&#xff0c;已经讲过&#xff1a;硬件&#xff0c;日常运维&#xff0c;基础软…

机器视觉运动控制一体机在DELTA并联机械手视觉上下料应用

市场应用背景 DELTA并联机械手是由三个相同的支链所组成&#xff0c;每个支链包含一个转动关节和一个移动关节&#xff0c;具有结构紧凑、占地面积小、高速高灵活性等特点&#xff0c;可在有限的空间内进行高效的作业&#xff0c;广泛应用于柔性上下料、包装、分拣、装配等需要…

基于SSM+小程序的购物管理系统1

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1、项目介绍 基于SSM小程序的购物管理系统1&#xff0c;可以实现首页、个人中心、商品分类管理、商品信息管理、特价商品管理、用户管理、留言板管理、系统管理、订单管理等功能。方便用户对首页、商品…

Redis 事务 总结

前言 相关系列 《Redis & 目录》&#xff08;持续更新&#xff09;《Redis & 事务 & 源码》&#xff08;学习过程/多有漏误/仅作参考/不再更新&#xff09;《Redis & 事务 & 总结》&#xff08;学习总结/最新最准/持续更新&#xff09;《Redis & 事务…

正点原子阿尔法ARM开发板-IMX6ULL(十一)——IIC协议和SPI协议--AP3216C环境光传感器和ICM20608六轴传感器

文章目录 一、前言二、 IIC协议2.1 协议解读2.1.1 起始位、停止位、数据传输2.1.2 写时序2.1.3 读时序 2.2 代码分析2.3 AP3216C环境光传感器的代码分析 三、SPI协议3.1 协议解读3.2 代码分析3.3 ICM-20608六轴传感器代码分析 一、前言 看了IIC&#xff0c;我之前毕设用过这个…

Vmware虚拟机解决摄像头无效,相机失效

问题&#xff1a; 使用vmware虚拟机&#xff0c;打开windows的虚拟机&#xff0c;发现找不到摄像头&#xff0c;打开自带的相机软件报错&#xff1a; 解决方法如下&#xff0c;依次点击vmware状态栏的 虚拟机-可移动设备-chicony integrated camera-连接&#xff08;断开与主…

MySQL用户权限管理属于SQL语句中的DCL语句

1.用户授权 语法&#xff1a;grant 权限&#xff0c;权限&#xff0c;on 库名&#xff0c;表名 to 用户名 [identified by 密码] MySQL5的版本&#xff0c;如果这个用户事先不存在&#xff0c;这个grant命令去给用户授权的时候&#xff0c;会将用户一起创建出来&#xff0…

已解决 django.db.utils.OperationalError: (1051, “Unknown table

报错信息&#xff1a; django.db.utils.OperationalError: (1051, "Unknown table bjybolg.tool_submission")python manage.py migrate --fake 命令用于告诉 Django 假装已经应用某个迁移&#xff0c;而不实际执行该迁移的操作。这通常在以下情况下非常有用&#x…

Linux shell编程学习笔记87:blkid命令——获取块设备信息

0 引言 在进行系统安全检测时&#xff0c;我们需要收集块设备的信息&#xff0c;这些可以通过blkid命令来获取。 1 blkid命令的安装 blkid命令是基于libblkid库的命令行工具&#xff0c;可以在大多数Linux发行版中使用。 如果你的Linux系统中没有安装blkid命令&#xff0c;…

堆的应用——堆排序和TOP-K问题

1.堆排序 想法⼀&#xff1a; 基于已有数组建堆、取堆顶元素完成排序。也就是利用写好的堆数据结构&#xff08;之前的文章有讲解&#xff09;&#xff0c;去实现排序。 void HeapSort(int* a, int n){HP hp;for(int i 0; i < n; i){HPPush(&hp,a[i]);}int i 0;whi…

HexForge:一款用于扩展安全汇编和十六进制视图的IDA插件

关于HexForge HexForge是一款用于扩展安全汇编和十六进制视图的IDA插件&#xff0c;在该工具的帮助下&#xff0c;广大研究人员可以方便地直接从 IDA Pro 界面数据解码、解密或执行安全数据审计任务。 功能介绍 1、从 IDA 的反汇编或十六进制视图复制原始十六进制&#xff1b;…

00 DSA-- 入门、实现动态数组、实现链表、栈和队列、环形数组、哈希表

两种代码模式 核心代码模式 核心代码模式&#xff1a;就是给你一个函数框架&#xff0c;你需要实现函数逻辑&#xff0c;这种模式一般称之为。 目前大部分刷题平台和技术面试/笔试场景都是核心代码模式。 比如力扣第一题两数之和&#xff0c;就是给出 twoSum 函数的框架如下…

Jmeter压力测试简单教程(包括服务器状态监控)

前段时间公司需要对服务器进行压力测试&#xff0c;包括登录前的页面和登录后的页面&#xff0c;主要目的是测试负载均衡的实现效果。不知道是不是因为Jmeter不如loadRunner火爆还是什么&#xff0c;网上关于Jmeter的资料有很多但是大多千篇一律&#xff0c;要么简单弄个页面测…

Android 开发 调节声音 SeekBar自定义样式

效果图 xml布局 mipmap/seekbar图片随意一张图都可以&#xff0c;这里我的图就不贴出来了 <SeekBarandroid:id"id/seekBar"android:layout_marginLeft"8dp"android:layout_width"377dp"android:layout_height"8dp"android:layou…

循序渐进丨openGauss / MogDB 数据库内存占用相关SQL

一、内存总体分布 数据库总体内存使用分布 select * from gs_total_memory_detail; 当dynamic_used_memory大于max_dynamic_memory就会报内存不足&#xff1b;如果此时dynamic_used_memory小于max_dynamic_memory&#xff0c;而dynamic_peak_memory大于max_dynamic_memory表…