一、目的
用Flume采集Kafka写入到Hive的ODS层在HDFS路径下的JSON数据,需要在DWD层进行解析并清洗
(一)Hive的ODS层建静态分区外部表
create external table if not exists ods_queue( queue_json string ) comment '静态排队数据表——静态分区' partitioned by (day string) row format delimited fields terminated by '\x001' lines terminated by '\n' stored as SequenceFile tblproperties("skip.header.line.count"="1");
(二)Flume将Kafka数据写入ODS层表在HDFS的路径下
(三)ODS层表数据样例
(四)ODS层表的JSON数据样例
JSON数据字段queue_json中除了单个字段如deviceNo外,还包括json数组queueList
{
"deviceNo": "radar-1020",
"createTime": "2023-10-16 10:18:55",
"laneNum": 5,
"queueList": [
{
"laneNo": 8,
"queueLen": 28.0,
"queueHead": 24.0,
"queueTail": 23.0,
"queueCount": 88
},
{
"laneNo": 5,
"queueLen": 12.0,
"queueHead": 45.0,
"queueTail": 2.0,
"queueCount": 91
},
{
"laneNo": 7,
"queueLen": 79.0,
"queueHead": 1.0,
"queueTail": 78.0,
"queueCount": 71
},
{
"laneNo": 7,
"queueLen": 87.0,
"queueHead": 99.0,
"queueTail": 38.0,
"queueCount": 42
},
{
"laneNo": 14,
"queueLen": 51.0,
"queueHead": 41.0,
"queueTail": 52.0,
"queueCount": 36
},
{
"laneNo": 1,
"queueLen": 10.0,
"queueHead": 81.0,
"queueTail": 27.0,
"queueCount": 57
},
{
"laneNo": 5,
"queueLen": 40.0,
"queueHead": 81.0,
"queueTail": 19.0,
"queueCount": 42
},
{
"laneNo": 5,
"queueLen": 80.0,
"queueHead": 96.0,
"queueTail": 34.0,
"queueCount": 100
}
]
}
二、所需Hive函数介绍
除了用到get_json_object函数或json_tuple函数外,json数组还需要用到explode函数、regexp_replace函数以及lateral view函数
(一)get_json_object函数(解析json字段,不包含json数组)
1、语法:get_json_object(string json_string, string path)
2、说明:解析 json 的字符串 json_string,返回 path 指定的内容。如果输入的 json 字符串无效,那么返回 NULL。
3、这个函数每次只能返回一个数据项。
4、SQL样例
select a.timestamp, get_json_object(a.appevents,
'$.eventid'
), get_json_object(a.appenvets,
'$.eventname'
) from log a;
(二)json_tuple函数(解析json字段,不包含json数组)
1、语法:json_tuple(json_string, k1, k2 ...)
2、
说明:解析json的字符串json_string,可指定多个json数据中的key,返回对应的value。如果输入的json字符串无效,那么返回NULL。
3、一次解析出多个数据项的函数
4、json_tuple
函数不能加$.,否则会解析不到
5、SQL样例
select a.timestamp, b.*
from log a lateral view json_tuple(a.appevent,
'eventid'
,
'eventname'
) b as f1, f2;
(三)
explode函数
1、语法:explode(Array OR Map)
2、说明:explode()函数接收一个array或者map类型的数据作为输入,然后将array或map里面的元素按照每行的形式输出,即将hive一列中复杂的array或者map结构拆分成多行显示,也被称为列转行函数。
(四)regexp_replace函数
1、语法: regexp_replace(string A, string B, string C)
2、说明:将字符串A中的符合java正则表达式B的部分替换为C。注意,在有些情况下要使用转义字符,类似oracle中的regexp_replace函数。
(五)lateral view函数
1、说明
在用UDTF比如explode的时候,由于SELECT 只支持一个字段,因此需要lateral view函数!
lateral view函数一般用于和split、explode等UDTF一起使用的,它能将一行数据拆分成多行数据,在并此基础上可以对拆分的数据进行聚合。
2、SQL样例
select col_A,col_B,tmp_table.tmp_col
from test_table
lateral view explode(split(col_C,'分隔符')) tmp_table as tmp_col
where partition_name='xxx';
3、SQL样例说明
col_A,col_B,col_C: 都是原表 test_table 的列(字段);
tmp_table:explode形成的新虚拟表,可以不写;
tmp_col:explode 形成的新列(字段);
三、解析JSON数据的HiveSQL(包含json数组)
(一)SQL语句
with t1 as(
select
get_json_object(queue_json,'$.deviceNo') device_no,
get_json_object(queue_json,'$.createTime') create_time,
get_json_object(queue_json,'$.laneNum') lane_num,
get_json_object(queue_json,'$.queueList') queue_list,
day
from hurys_dc_ods.ods_queue)
select
t1.device_no,
t1.lane_num,
t1.create_time,
get_json_object(list_json,'$.laneNo') lane_no,
get_json_object(list_json,'$.queueCount') queue_count,
get_json_object(list_json,'$.queueLen') queue_len,
get_json_object(list_json,'$.queueHead') queue_head,
get_json_object(list_json,'$.queueTail') queue_tail,
date(t1.create_time) day
from t1
lateral view explode(split(regexp_replace(regexp_replace(queue_list,'\\[|\\]','') , --将json数组两边的中括号去掉
'\\}\\,\\{','\\}\\;\\{'), --将json数组元素之间的逗号换成分号
'\\;') --以分号作为分隔符(split函数以分号作为分隔)
)list_queue as list_json
;
(二)HQL执行结果
json字段解析成功!