Hudi Flink SQL代码示例及本地调试

news2024/11/26 10:02:07

前言

之前在Flink Hudi DataStream API代码示例中总结了Hudi Flink DataStream API的代码及本地调试,并且在文中提到其实大家用Table API更多一些,但是我感觉Table API调试源码可能会比较难一点,因为可能会涉及到SQL解析,不清楚Table API的入口在哪里。
但是在我总结的上篇文章Flink用户自定义连接器(Table API Connectors)学习总结中知道了其实Flink Table API读写Hudi是通过自定义实现了一个Hudi的Table API Connectors(‘connector’ = ‘hudi’),相关类为HoodieTableFactoryHoodieTableSinkHoodieTableSource,入口为HoodieTableFactory

代码

GitHub地址:https://github.com/dongkelun/hudi-demo/tree/master/hudi0.13_flink1.15

package com.dkl.hudi.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.hudi.configuration.FlinkOptions;

import static org.apache.hudi.examples.quickstart.utils.QuickstartConfigurations.sql;

public class HudiFlinkSQL {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
        String tableName = "t1";
        if (args.length > 0) {
            tableName = args[0];
        }
        String tablePath = "/tmp/flink/hudi/" + tableName;
        String hoodieTableDDL = sql(tableName)
                .field("id int")
                .field("name string")
                .field("price double")
                .field("ts bigint")
                .field("dt string")
                .option(FlinkOptions.PATH, tablePath)
//                .option(FlinkOptions.READ_AS_STREAMING, true)
//                .option(FlinkOptions.TABLE_TYPE, "COPY_ON_WRITE")
                .partitionField("dt")
                .pkField("id")
                .end();
        tableEnv.executeSql(hoodieTableDDL);
        tableEnv.executeSql(String.format("insert into %s values (1,'hudi',10,100,'2023-05-28')", tableName));
        tableEnv.executeSql(String.format("select * from %s", tableName)).print();
    }
}

其实就是通过tableEnv.executeSql执行Flink SQL,其中创建Hudi表的SQL语句是利用Hudi源码中模块hudi-examples-flink里面的sql方法生成的,它会根据参数返回对应的创建Hudi表的SQL语句,示例:

        String tableName = "t1";
        String tablePath = "/tmp/flink/hudi/" + tableName;
        String hoodieTableDDL = sql(tableName)
                .option(FlinkOptions.PATH, tablePath)
                .option(FlinkOptions.READ_AS_STREAMING, true)
                .option(FlinkOptions.TABLE_TYPE, "COPY_ON_WRITE")
                .end();
        System.out.println(hoodieTableDDL);

输出

create table t1(
  `uuid` VARCHAR(20),
  `name` VARCHAR(10),
  `age` INT,
  `ts` TIMESTAMP(3),
  `partition` VARCHAR(10),
  PRIMARY KEY(uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
with (
  'connector' = 'hudi',
  'read.streaming.enabled' = 'true',
  'path' = '/tmp/flink/hudi/t1',
  'connector' = 'hudi',
  'table.type' = 'COPY_ON_WRITE'
)

如果不指定字段的话,会有官方文档示例中几个默认字段,默认主键为uuid,默认为分区表,分区字段为partition

        hoodieTableDDL = sql(tableName)
                .field("id int")
                .field("name string")
                .field("price double")
                .field("ts bigint")
                .field("dt string")
                .option(FlinkOptions.PATH, tablePath)
                .noPartition()
                .pkField("id")
                .end();
        System.out.println(hoodieTableDDL);

可以通过field方法指定表的字段,pkField指定表的主键,noPartition设置表为非分区表,partitionField指定表的分区字段

create table t1(
  id int,
  name string,
  price double,
  ts bigint,
  dt string,
  PRIMARY KEY(id) NOT ENFORCED
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/flink/hudi/t1',
  'connector' = 'hudi'
)

本地运行调试

运行结果

调试

我们根据开头提到的文章Flink用户自定义连接器(Table API Connectors)学习总结可知入口为HoodieTableFactory,其中sink的入口为createDynamicTableSink,source的入口为createDynamicTableSource



本地调试源码的时候可能会遇到在Idea中源码下载不下来的情况,我们可以直接去官网下载对应的源码jar包,然后放到自己本地的仓库中,方便我们调试的时候阅读源码。比如hudi-flink对应的源码的下载地址:https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink/0.13.0/hudi-flink-0.13.0-sources.jar,然后将hudi-flink-0.13.0-sources.jar放到路径 m2\repository\org\apache\hudi\hudi-flink\0.13.0 中就可以了

我们发现Table API的入口和DataStream API的入口差不多,DataStream API的入口是在HoodiePipelinesinksource方法里,而这两个方法也是也是分别调用了HoodieTableFactorycreateDynamicTableSinkcreateDynamicTableSource

  /**
   * Returns the data stream sink with given catalog table.
   *
   * @param input        The input datastream
   * @param tablePath    The table path to the hoodie table in the catalog
   * @param catalogTable The hoodie catalog table
   * @param isBounded    A flag indicating whether the input data stream is bounded
   */
  private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
    FactoryUtil.DefaultDynamicTableContext context = Utils.getTableContext(tablePath, catalogTable, Configuration.fromMap(catalogTable.getOptions()));
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
        .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
        .consumeDataStream(input);
  }

  /**
   * Returns the data stream source with given catalog table.
   *
   * @param execEnv      The execution environment
   * @param tablePath    The table path to the hoodie table in the catalog
   * @param catalogTable The hoodie catalog table
   */
    private static DataStream<RowData> source(StreamExecutionEnvironment execEnv, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable) {
    FactoryUtil.DefaultDynamicTableContext context = Utils.getTableContext(tablePath, catalogTable, Configuration.fromMap(catalogTable.getOptions()));
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) ((ScanTableSource) hoodieTableFactory
        .createDynamicTableSource(context))
        .getScanRuntimeProvider(new ScanRuntimeProviderContext());
    return dataStreamScanProvider.produceDataStream(execEnv);
  }

源码调试

我们还可以直接在源码里进行调试,这样方便我们直接看到修改源码后的效果。直接在源码里进行调试可能配置环境会比较麻烦一点,每个版本也不太一样。比如我在hudi 0.13.0源码中模块hudi-examples-flink里进行调试,需要做如下修改:

详细的pom修改可以看我提到github上的commit:https://github.com/dongkelun/hudi/commit/558910d4cab189d0cbfa9c69332f3e4e74e56b41

这样我们就可以本地直接运行源码中的类:HoodieFlinkQuickstart,也可以直接修改源码查看效果:

源码里的demo和测试用例比较全,我们可以多看一看多调试一下,利于提升对源码的理解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/587423.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【单片机 TB作品】基于STM32F103C8T6单片机的甲醛监测与报警系统

文章目录 原理图ZE08-CH2O甲醛传感器A9短信模块实物 原理图 ZE08-CH2O甲醛传感器 ZE08-CH2O是一种甲醛&#xff08;甲醛&#xff09;传感器&#xff0c;用于检测环境中的甲醛浓度。甲醛是一种有毒气体&#xff0c;广泛用于建筑材料、家具、地毯、化妆品和其他消费品的生产中。…

STM32单片机(三)第二节:GPIO输出练习(LED闪烁、LED流水灯、蜂鸣器)

❤️ 专栏简介&#xff1a;本专栏记录了从零学习单片机的过程&#xff0c;其中包括51单片机和STM32单片机两部分&#xff1b;建议先学习51单片机&#xff0c;其是STM32等高级单片机的基础&#xff1b;这样再学习STM32时才能融会贯通。 ☀️ 专栏适用人群 &#xff1a;适用于想要…

案例挑战——MVVM框架理解和实践

MVVM框架理解和实践 一、背景介绍二、 什么是MVVM架构&#xff1f;1.架构示意图2.MVVM概念总结3.实现VM的框架 三、通过案例来理解MVVM框架1.没有使用MVVM架构的程序2.使用了MVVM架构的程序3.对比 四、总结 一、背景介绍 最近正在做新版项目的MVVM架构的结合业务的具体落地&am…

报表控件FastReport使用指南——如何打开WebP格式的图片

FastReport 是功能齐全的报表控件&#xff0c;可以帮助开发者可以快速并高效地为.NET&#xff0c;VCL&#xff0c;COM&#xff0c;ActiveX应用程序添加报表支持&#xff0c;由于其独特的编程原则&#xff0c;现在已经成为了Delphi平台最优秀的报表控件&#xff0c;支持将编程开…

SWUST软件技术基础实验笔记

目录 前言 堆栈的操作 实验目的 实验要求 单链表操作 实验目的 实验要求 二叉树操作 实验目的 实验要求 查找与排序 实验目的 实验要求 查找算法 排序算法 实验总结 前言 软件技术基础实验分为四个部分&#xff0c;涵盖了堆栈的操作、单链表操作、二叉树操作以…

微前端——qiankun配置方法

什么是微前端 微前端是指存在于浏览器中的微服务&#xff0c;其借鉴了微服务的架构理念&#xff0c;将微服务的概念扩展到了前端。 如果对微服务的概念比较陌生的话&#xff0c;可以简单的理解为微前端就是将一个大型的前端应用拆分成多个模块&#xff0c;每个微前端模块可以…

Qt文件系统源码分析—第八篇QFileSystemWatcher

深度 本文主要分析Windows平台&#xff0c;Mac、Linux暂不涉及 本文只分析到Win32 API/Windows Com组件/STL库函数层次&#xff0c;再下层代码不做探究 本文QT版本5.15.2 类关系图 QTemporaryFile继承QFile QFile、QSaveFile继承QFileDevice QFileDevice继承QIODevice Q…

RK最强ARM系列之RK3588+AI+Ethercat(linux +xenomai+igh)实时解决方案

RK3588是瑞芯微新一代旗舰级高端处理器&#xff0c;具有高算力、低功耗、超强多媒体、丰富数据接口等特点。搭载四核A76四核A55的八核CPU和ARM G610MP4 GPU&#xff0c;内置6.0TOPs算力的NPU。 有五大技术优势 1. 内置多种功能强大的嵌入式硬件引擎&#xff0c;支持8K60fps 的…

5.29-kubernetes learning

文章目录 HomeGet StartedThe kubernetes network model First of all &#xff0c;we should understand the layout of this official website page. Home The Home chapter is that the official website has manuals for different versions of k8s ,and then generally…

chatgpt赋能python:Python中单词排序的方法—从入门到精通

Python中单词排序的方法—从入门到精通 Python是一门很流行的编程语言&#xff0c;它是一门被广泛使用的高级编程语言&#xff0c;为开发者提供了丰富的工具和库&#xff0c;在处理字符串、文本信息时也有着广泛的应用。本文主要介绍在Python中进行单词排序的方法。 什么是单…

python pycharm的安装教程

pycharm安装教程&#xff0c;超详细_皮小孩ls的博客-CSDN博客目录 前言 python的安装教程&#xff1a; 1.下载地址&#xff1a; 2. 安装 1&#xff09;customize installation 勾选 use 2&#xff09;.默认 . 3&#xff09;. 选择安装位置 4&#xff09;.耐心等待&…

【CSSpart4--盒子模型】

CSSpart4--盒子模型 网页布局的三大核心&#xff1a;盒子模型&#xff0c;浮动&#xff0c;定位网页布局的过程&#xff08;本质&#xff09;&#xff1a;盒子模型的组成四部分&#xff1a;边框&#xff0c;内容&#xff0c;内边距&#xff0c;外边距 一 、盒子边框border:1.1 …

Queue 队列的实现与应用

目录 1.概念2.常用的队列方法2.1 方法2.2 代码 3.自己实现队列3.1 构造MyQueue3.2 入队列offer()3.3 出队列poll()3.4 获得队头peek()3.5 是否为空isEmpty()3.6 获得队列大小size() 4.循环队列4.1 概念4.2 解析4.3 如何判断队列满4.4 代码&#xff08;保留一个位置实现&#xf…

vue+nodejs校园二手物品交易市场网站_xa1i4

。为满足如今日益复杂的管理需求&#xff0c;各类管理系统程序也在不断改进。本课题所设计的校园二手交易市场&#xff0c;使用vue框架&#xff0c;Mysql数据库、nodejs语言进行开发&#xff0c;它的优点代码不能从浏览器查看&#xff0c;保密性非常好&#xff0c;比其他的管理…

轻松实现动态人脸识别,AidLux加速智慧城市场景化应用落地

随着AI技术进入全新发展阶段&#xff0c;智能物联网&#xff08;AIoT&#xff09;的渗透率进一步加深&#xff0c;应用场景不断拓展&#xff0c;人脸识别也迅速走进了人们的日常生活&#xff0c;在手机解锁、公司考勤、支付验证、天网抓捕在逃嫌犯等场景中发挥着重要作用。 人脸…

dataV教程-浅用dataV

一别多日&#xff0c;好久没有和大家相见了。其一的原因是因为公司的项目&#xff0c;其二就是因为太懒了。现在给大家浅浅的介绍一下这个好用的大屏展示框架吧。如果后续有深入的话&#xff0c;我会出一个详解版本的。 一、dataV介绍 前言:由于当前的大数据时代&#xff0c;…

Github标星60K!mall前台商城系统正式发布,支持完整订单流程!

之前有很多小伙伴问我&#xff0c;mall项目有没有前台商城系统&#xff0c;可见大家对mall项目的前台商城系统还是非常期待的。最近抽空把前台商城系统的功能给完善了&#xff0c;目前已经可以支持完整的订单流程。我已经把前台商城系统开源了&#xff0c;项目地址也放在文末了…

重磅发布!面向装备制造业服务化转型白皮书(私信获取)

《面向装备制造业服务化转型白皮书》 关于白皮书 《面向装备制造业服务化转型白皮书》通过调研160余家装备制造企业的服务化路径及模式&#xff0c;研讨支持企业开展服务型制造的系统化方案&#xff0c;希望为装备制造业服务化转型&#xff0c;探索切实有效的路径以供参考。 …

【MySQL】- 02 MySQL explain执行

目录 1.使用explain语句去查看分析结果2.MYSQL中的组合索引3.使用慢查询分析&#xff08;实用&#xff09;4.MYISAM和INNODB的锁定explain用法详解关于MySQL执行计划的局限性&#xff1a;备注&#xff1a; 1.使用explain语句去查看分析结果 如explain select * from test1 whe…

nSoftware IPWorks 2022 C++ Crack

nSoftware IPWorks 2022 C最全面的互联网组件套件&#xff0c;PKI 代理远程签署代码和文档&#xff0c;无需暴露您的私钥&#xff0c;一种安全的自托管解决方案&#xff0c;可使用集中存储的密钥实现远程代码和文档签名&#xff0c;随附的 PKCS#11 驱动程序允许与 Jarsigner、S…