【Flink实战】flink消费http数据并将数组展开多行

news2024/9/22 22:12:21

文章目录

  • 一. 需求描述
  • 二. 方案思路
    • 1. 解决思路
    • 2. flink json 解析
      • 2.1. 通过json path解析非array数据
      • 2.2. 通过json path解析array数据
    • 3. CROSS JOIN逻辑
  • 三. 方案实现
    • 1. http json数据样例
    • 2. flink sql 说明

一. 需求描述

flink消费http接口的数据,将json中的数组展开多行

如下样例数据以及要求处理的数据效果

{  
  "name": "John Doe",  
  "age": 30,  
  "address": {  
    "street": {  
      "street": "123 Main St",  
      "city": "New York",  
      "state": "NY"  
    },  
    "city": "New York",  
    "state": "NY"  
  },  
  "phoneNumbers": [  
    {  
      "type": "home",  
      "number": "212-555-1234"  
    },  
    {  
      "type": "fax",  
      "number": "646-555-4567"  
    }  
  ],  
  "children": [],  
  "spouse": null  
}

nameagestreetcitystatephone_typephone_number
John Doe30123 Main StNew YorkNYhome212-555-1234
John Doe30123 Main StNew YorkNYfax646-555-4567

二. 方案思路

1. 解决思路

  1. flink 消费http接口的数据(json),发送到下游
  2. 下游算子解析json数据,当遇到数组时,算子解析返回array
  3. 通过使用CROSS JOIN 将数组数据拍平,如上表格展现

2. flink json 解析

2.1. 通过json path解析非array数据

如下通过flink内置函数:JSON_VALUE 进行数据解析,支持多种类型的输出,默认输出为string。

在这里插入图片描述

这里使用 cast转换,如下举例

cast(JSON_VALUE(json_string,'$.id') as int) as id ,  
JSON_VALUE(json_string,'$.name')  as name,  
cast(JSON_VALUE(json_string,'$.details.age.real') as int) as  `real`  ,  
JSON_VALUE(json_string,'$.details.address') as address,

 

2.2. 通过json path解析array数据

官网:目前JSON_QUERY虽然能够包装为array但实际上总是会返回为string,不符合要求。

在这里插入图片描述

如下:

<dependencies>  
    <dependency>  
        <groupId>com.jayway.jsonpath</groupId>  
        <artifactId>json-path</artifactId>  
        <version>2.6.0</version>  
    </dependency>  
    <dependency>  
        <groupId>org.apache.flink</groupId>  
        <artifactId>flink-table-common</artifactId>  
        <version>${flink.version}</version>  
        <scope>provided</scope>  
    </dependency>

通过udf解决

package com.dtstack.chunjun.local.test;  
  
import com.jayway.jsonpath.JsonPath;  
import org.apache.flink.table.functions.ScalarFunction;  
  
import java.util.ArrayList;  
import java.util.List;  
  
public class JsonArrayFieldExtractor extends ScalarFunction {  
  
    public List<String> eval(String jsonString, String jsonPath) {  
        if (jsonString == null || jsonString.isEmpty()) {  
            return new ArrayList<String>();  
        }  
        try {  
            List<?> result = JsonPath.read(jsonString, jsonPath);   
            List<String> stringList = new ArrayList<>();  
            for (Object obj : result) {  
                stringList.add(obj.toString());  
            }  
            return stringList;  
        } catch (Exception e) {  
            return new ArrayList<String>();  
        }  
    }  
  
}

3. CROSS JOIN逻辑

Array Expansion

在这里插入图片描述

注意:CROSS JOIN 返回两个连接表的笛卡尔积,当有多个数组时会产生笛卡尔积。比如:两个数组,分别有100个元素,那么如果使用两次CROSS JOIN 则会产生1万行数据。

 

三. 方案实现

1. http json数据样例

{  
  "id": 1,  
  "name": "Alice",  
  "details": {  
    "age": {"real":11},  
    "address": "123Mainst",  
    "contacts": [  
      {  
        "type": "email",  
        "value": "alice@example.com"  
      },  
      {  
        "type": "phone",  
        "value": "123-456-7890"  
      }  
    ],  
    "grade": [  
      {  
        "grade": [{"zz":11},{"zz":11}],  
        "bb": {"rr":{"yy":"alice@example.com"}}  
      },  
      {  
        "grade": [{"zz":22}],  
        "bb": {"rr":{"yy":"alice@example.com"}}  
      }  
    ]  
  }  
}

 

2. flink sql 说明

CREATE TEMPORARY SYSTEM FUNCTION get_json_array AS 'com.dtstack.chunjun.local.test.JsonArrayFieldExtractor';

CREATE TABLE source
(
       json_string varchar
) WITH (
      'connector' = 'http-x'
      ,'url' = 'http://localhost:8088/api/arraypage'
      ,'intervalTime'= '3000'
      ,'method'='get'                              --请求方式:get 、post
      ,'decode'='text'                             -- 数据格式:只支持json模式
                                                   -- 以下4个参数要同时存在:
      ,'page-param-name'='pagenum'                          -- 多次请求参数1:分页参数名:例如:pageNum
      ,'start-index'='1'                             -- 多次请求参数2:开始的位置
      ,'end-index'='4'                               -- 多次请求参数3:结束的位置
      ,'step'='1'                                  -- 多次请求参数4:步长:默认值为1
      );

CREATE TABLE sink
(
    id               int,
    name             varchar,
    `real`               int,
    address                varchar,
    zz                int,
    yy                varchar
) WITH (
      'connector' = 'print'
      );


insert into sink   SELECT
                      cast(JSON_VALUE(json_string,'$.id') as int) as id ,
                       JSON_VALUE(json_string,'$.name')  as name,
                       cast(JSON_VALUE(json_string,'$.details.age.real') as int) as  `real`  ,
                        JSON_VALUE(json_string,'$.details.address') as address,
                        cast(`$.grade[*].grade[*].zz` as int ) as zz,
                        `$.details.grade[*].bb.rr.yy` as yy
                      FROM source
                    CROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].grade[*].zz' )) AS T(`$.grade[*].grade[*].zz`)
                    CROSS JOIN UNNEST(get_json_array(json_string, '$.details.grade[*].bb.rr.yy'   )) AS T1(`$.details.grade[*].bb.rr.yy`);





--{
--  "id": 1,
--  "name": "Alice",
--  "details": {
--    "age": {"real":11},
--    "address": "123Mainst",
--    "contacts": [
--      {
--        "type": "email",
--        "value": "alice@example.com"
--      },
--      {
--        "type": "phone",
--        "value": "123-456-7890"
--      }
--    ],
--    "grade": [
--      {
--        "grade": [{"zz":11},{"zz":11}],
--        "bb": {"rr":{"yy":"alice@example.com"}}
--      },
--      {
--        "grade": [{"zz":22}],
--        "bb": {"rr":{"yy":"alice@example.com"}}
--      }
--    ]
--  }
--}

消费结果
在这里插入图片描述

具体逻辑描述

  1. http连接器消费http接口数据 具体使用chunjun的http连接器,相关代码见:我提供的相关pr:
    [feature-DTStack#1775][connector][http] http supports offline mode

  2. 使用JSON_VALUE、get_json_array解析为string和array<string>,之后使用cast进行类型转换

  3. CROSS JOIN 生成笛卡尔积

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2155993.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【JavaEE初阶】多线程7(面试要点)

欢迎关注个人主页&#xff1a;逸狼 创造不易&#xff0c;可以点点赞吗~ 如有错误&#xff0c;欢迎指出~ 目录 常见的锁策略 乐观锁vs悲观锁 重量级锁vs轻量级锁 挂起等待锁vs自旋锁 公平锁vs非公平锁 可重入锁vs不可重入锁 读写锁 synchronized的加锁过程 锁升级的过程 偏向锁 …

博途TIA v18下载时,需要重启才能安装下载路径是灰色改不了

一、需要重启才能安装 删除下面注册表P开头的文件&#xff1a; 二、下载路径是灰色改不了 注册表HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows\CurrentVersion里找到C:\Program Files或者C:\Program Files&#xff08;x86&#xff09;&#xff0c;具体哪个看安装的时候对应…

【面向对象】设计原则

单一职责&#xff1a;低耦合&#xff0c;高内聚。一个类不要负责太多的功能&#xff0c;否则会导致类内部耦合度高&#xff0c;不利于扩展&#xff1b;开闭原则&#xff1a;对扩展开放&#xff0c;对修改关闭。扩展新功能时&#xff0c;不建议修改原有的代码&#xff0c;建议通…

【CSS】样式

文本color 颜色font-size 大小font-family 字体font-style 样式font-weight 加粗text-decoration 下划线text-shadow 阴影text-transform 大小写变换text-indent 缩进text-align 水平对齐 、vertical-align垂直对齐text-overflow 溢出word-wrap 换行 、word-break 截断 、white…

GAMES101(15节)

Irradiance辐射度量学 辐射度量学在渲染领域&#xff0c;可以帮助理解基于物理的光照模型 radiant energy辐射能量Q&#xff0c;累计总能量&#xff08;单位J joule焦耳&#xff09;&#xff0c;就像太阳能板&#xff0c;光照时间越长接收能量越多&#xff0c;收到的能量总和…

Vue点击按钮生成pdf文件/Vue点击按钮生成png图片

本次案例是vue的点击生成pdf文件和png格式的图片 一、生成pdf文件案例 看代码之前&#xff0c;我们肯定得需要看看&#xff0c;效果图是什么的啦&#xff0c;这样子才能先看看自己想要实现的效果是不是这样子的&#xff01;上效果图嘿嘿嘿~ A、实现的效果图 这是页面&#…

【M-LOAM学习】

M-LOAM(INITIALIZATION) Article Analysis Scan-Based Motion Estimation 通过在consecutive frame (each LiDAR)&#xff08;因为omp parallel&#xff09;中寻找correspondences然后通过最小化所有考虑feature之间residual error的transformation between frame to frame 针…

java(3)数组的定义与使用

目录 1.前言 2.正文 2.1数组的概念 2.2数组的创建与初始化 2.2.1数组的创建 2.2.1数组的静态初始化 2.2.2数组的动态初始化 2.3数组是引用类型 2.3.1引用类型与基本类型区别 2.3.2认识NULL 2.4二维数组 2.5数组的基本运用 2.5.1数组的遍历 2.5.2数组转字符串 2.…

ETCD学习使用

一、介绍 etcd&#xff08;分布式键值存储&#xff09;是一个开源的分布式系统工具&#xff0c;用于可靠地存储和提供键值对数据。etcd 通常通过 HTTP 或 gRPC 提供 API&#xff0c;允许应用程序通过简单的接口与其交互。由于其可靠性和稳定性&#xff0c;etcd 在构建可扩展、分…

基于springboot的在线视频点播系统

文未可获取一份本项目的java源码和数据库参考。 国外研究现状&#xff1a; 与传统媒体不同的是&#xff0c;新媒体在理念和应用上都采用了新颖的媒介或媒体。新媒体是指应用在数字技术、在传统媒体基础上改造、或者更新换代而来的媒介或媒体。新兴媒体与传统媒体在理念和应用…

UML——统一建模语言

序言&#xff1a; 是统一建模语言的简称&#xff0c;它是一种由一整套图表组成的标准化建模语言。UML用于帮助系统开发人员阐明&#xff0c;展示&#xff0c;构建和记录软件系统的产出。UML代表了一系列在大型而复杂系统建模中被证明是成功的做法&#xff0c;是开发面向对象软件…

ModbusTCP通讯错误的排查

Modbus是一种由MODICON公司开发的工业现场总线协议标准&#xff0c;是一项应用层报文传输协议。该协议用于传输数字和模拟变量[1]。有关该协议的报文具体格式&#xff0c;以及一些基本概念&#xff0c;见[1]。 本文以一个例子&#xff0c;阐述当ModbusTCP通讯出现错误的时候&a…

文件上传、重定向、Gin路由

文件上传 单个文件上传 index.html 文件上传前端页面代码&#xff1a; <!DOCTYPE html> <html lang"zh-CN"> <head><title>index</title> </head> <body> <form action"/upload" method"post"…

MySQL学习(索引)

文章目录 基本概念单列索引普通索引&#xff08;index&#xff09;唯一索引&#xff08;unique&#xff09;主键索引 组合索引全文索引&#xff08;fulltext&#xff09;空间索引&#xff08;spatial&#xff09;MySQL存储引擎 基本概念 通过某种算法&#xff0c;构建数据模型&…

云手机的海外原生IP有什么用?

在全球数字化进程不断加快的背景下&#xff0c;企业对网络的依赖程度日益加深。云手机作为一项创新的工具&#xff0c;正逐步成为企业优化网络结构和全球业务拓展的必备。尤其是云手机所具备的海外原生IP功能&#xff0c;为企业进入国际市场提供了独特的竞争优势。 什么是海外原…

高等数学——微分学

1. 一元函数微分学 1.1. 导数概念 1.2. 导数运算 1.3. 导数与几何 2. 多元函数微分学 2.1. 多元函数的极限 2.1.1. 计算 直接代入法 无穷小乘有界 有理化型 等价无穷小型 ……总结 2.1.2. 是否存在 考试中,判断极限是否存在的问题,答案一般都是不存在。因为,证明一个…

视频怎么剪切掉一部分?6款视频剪切软件,零基础也能快速学会!

您是否也曾遇到了这样的一个问题&#xff1a;在录制完视频之后&#xff0c;发现视频中存在一些多余或者不想要的片段&#xff0c;想要将它剪切掉却不知道具体要怎么操作&#xff1f;别担心&#xff0c;几乎所有视频都会需要这样的调整才能更加出色。如果您是刚入门的视频剪辑初…

MATLAB中多张fig图合并为一个图

将下列两个图和为一个图 打开查看-----绘图浏览器 点击第一幅图中曲线右键复制&#xff0c;到第二幅图中粘贴即可完成

设计模式之组合模式例题

答案&#xff1a;C A 知识点&#xff1a;组合模式的意图&#xff1a;将对象组合成树型结构以表示“整体-部分”的层次结构&#xff0c;使得用户对单个对象和组合对象的使用具有一致性

TMS320F28335的RS232 通信实验

TMS320F28335 内部含有非常多的通信接口,其中串口是通信接口中应用 非常广泛之一,开发板上集成了一个 RS232 模块,其中串口就是接在 F28335 芯 片的 SCIA 接口。 F28335 通过 SCIA 实现与 PC 机对话,F28335 的 SCIA 收到 PC 机发来的数据后 原封不动的返回给 PC 机显示,定…