Flink Hudi DataStream API代码示例

news2025/1/18 19:01:40

前言

总结Flink通过DataStream API读写Hudi Demo示例,主要是自己备忘用。

  • 最开始学习Flink时都是使用Flink SQL,所以对于Flink SQL读写Hudi比较熟悉。但是对于写代码实现并不熟悉,而有些需求是基于Flink代码实现的,所以需要学习总结一下。
  • 仅为了实现用代码读写Hudi的需求,其实有两种方式,一种是在代码里通过Flink SQL API,也就是代码中执行Flink SQL,这种方式其实和通过SQL实现差不多,另一种方式是通过DataStream API实现。(现实中包括网上教程使用最多的应该是Flink SQL API)
  • 本文主要是总结DataStream API方式
  • DataStream API方式有一种好处是方便IDEA本地调试Hudi源码,便于学习,当然SQL API也是可以进行本地调试源码的,但是因为我对Flink SQL源码不熟悉,调试起来比较费劲。SQL API调试源码的难点在于我不知道从Flink SQL的源码到Hudi源码的入口在哪,因为这里牵扯到SQL解析的源码,可能比较麻烦(没有研究过)。比如我之前总结的Hudi Spark SQL源码相关的文章:Hudi Spark SQL源码学习总结-Create Table

代码

GitHub地址:https://github.com/dongkelun/hudi-demo/tree/master/hudi0.13_flink1.15

官网地址:https://hudi.apache.org/docs/flink-quick-start-guide/

package com.dkl.hudi.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

public class HudiDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String targetTable = "t1";
        if (args.length > 0) {
            targetTable = args[0];
        }
        String basePath = "/tmp/flink/hudi/" + targetTable;

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), basePath);
//        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
//        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
//        options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
        options.put("hive_sync.mode", "hms");
        options.put("hive_sync.conf.dir", "/usr/hdp/3.1.0.0-78/hive/conf");
        options.put("hive_sync.db", "hudi");
        options.put("hive_sync.table", targetTable);
        options.put("hive_sync.partition_fields", "dt");
        options.put("hive_sync.partition_extractor_class", "org.apache.hudi.hive.HiveStylePartitionValueExtractor");
        options.put("hoodie.datasource.write.hive_style_partitioning", "true");
        options.put("hoodie.datasource.hive_sync.create_managed_table", "true");

//        options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); // this option enable the streaming read
//        options.put(FlinkOptions.READ_START_COMMIT.key(), "'20210316134557'"); // specifies the start commit instant time

        DataStream<RowData> dataStream = env.fromElements(
                GenericRowData.of(1, StringData.fromString("hudi1"), 1.1, 1000L, StringData.fromString("2023-04-07")),
                GenericRowData.of(2, StringData.fromString("hudi2"), 2.2, 2000L, StringData.fromString("2023-04-08"))
        );
//        dataStream.print();
        HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
                .column("id int")
                .column("name string")
                .column("price double")
                .column("ts bigint")
                .column("dt string")
                .pk("id")
                .partition("dt")
                .options(options);

        builder.sink(dataStream, false); // The second parameter indicating whether the input data stream is bounded
        env.execute("Hudi_Api_Sink");
        DataStream<RowData> rowDataDataStream = builder.source(env);
        rowDataDataStream.print();
        env.execute("Hudi_Api_Source");
    }
}
  • 因为本地连接服务器上的hive比较麻烦,所以本地运行的话,需要把同步hive关掉,如果在服务器上运行,把同步hive的配置项打开就可以了
  • 这里的代码和官方文档是差不多的,主要是官方文档没有提供如何构造写Hudi的数据集DataStream<RowData>,这里给出简单的示例

pom

我在GitHub上提交pom的引用的依赖比较多,是因为在Idea本地调试和在服务器上运行需要的依赖不太一样,本地运行需要的依赖比较多,而且还有很多依赖冲突。如果只需要在服务器上运行,则只需要下面三个依赖:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink1.15-bundle</artifactId>
            <version>${hudi.version}</version>
        </dependency>
    </dependencies>

github上的依赖既可以在本地进行调试,也可以打包直接在服务器上运行。因为打包时没有将依赖打到包里面,这需要在服务器上面的flink lib下提前配置好相应的jar包。

服务器运行

bin/flink run -c com.dkl.hudi.flink.HudiDemo /opt/dkl/hudi0.13_flink1.15-1.0.jar flink_hudi_dmeo

本地运行调试

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/554626.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ZYNQ无SD卡配置Linux系统到QSPI Flash和eMMC

硬件&#xff1a;黑金AX7450开发板、zynq7100、QSPI Flash、eMMC Flash 软件&#xff1a;Vivado 2017.4、Petalinux 我用了一台Windows主机&#xff0c;用于设计Vivado和烧写QSPI Flash&#xff0c;一台Ubuntu主机&#xff0c;用于运行Petalinux配置Linux系统。 硬件设计 新建…

10 工具Bootchart的使用(windows)

Bootchart的使用方法&#xff08;windows&#xff09; 下载bootchart.jar并拷贝到windows, 然后保证windows也安装了open jdk 1.8; 下载地址&#xff1a;https://download.csdn.net/download/Johnny2004/87807973 打开设备开机启动bootchart的开关: adb shell touch /data/boo…

生成VLC 及其模块的全面帮助

使用vlc.exe -H命令生成VLC帮助文件vlc-help.txt -h, --help, --no-help 打印 VLC 帮助 (可以和 --advanced 和 --help-verbose 一起使用) (默认禁用) -H, --full-help, --no-full-help VLC 及…

如何创造一个属于自己的springboot stater

如何创造一个属于自己的springboot stater 什么是staterstater是怎么实现注入进来的如何进行约定 基于上述理论的demo实现功能代码目录核心实现spring.factoriesSpringMessageSubscribe&#xff08;扫描所有Subscribe注解生成消息订阅&#xff09;基于Redis的消息订阅基于redis…

对封装好的Vue组件库进行打包,并发布到npm上

1. 新建vue 项目 并且在根目录创建两个文件夹 packages和examples。 packages&#xff1a;用于存放所有的组件 examples&#xff1a;用于进行测试组件&#xff0c;把src改为examples 2.配置vue.config.js 并设置入口文件 如果没有vue.config.js文件 就需要在项目根目录下创…

数说故事@FBIC丨首发食饮SMI社媒心智品牌榜,为品牌支招紧跟健康新风尚

第八届Foodaily创博会&#xff08;FBIC全球食品饮料创新大会&#xff09;于5月14-16日在上海跨国采购会展中心圆满落幕&#xff0c;呈现了一场食品饮料行业盛会。数说故事与众多食饮健康品牌一起&#xff0c;走过了一段大数据AI加持的创新之旅。 数说故事VP孙淑娟Jessie受邀分享…

Android APP 集成系统签名

由于android 系统权限限制&#xff0c;很多时候普通APP权限无法完成&#xff0c;需要系统APP才有足够的权限&#xff0c; 比如&#xff1a;安装、卸载应用&#xff0c;重启设备&#xff0c;恢复出厂设置&#xff0c;以及设置里面的一些功能&#xff0c;都是需要系统权限才能调…

【WLAN网络故障,带你搞定它!】

01 无线网卡搜索不到 AP的无线信号 01 问题现象 无线网卡搜索不到 AP 的无线信号 02 问题分析 无线网卡搜索不到 AP 的信号 ,原因可以从两方面着手&#xff1a; 1.无线网卡 AP本身 在遇到该问题的时候&#xff0c;我们可以从以上两个方面进行处理。 03 处理过程 1.无线…

Python GUI编程:使用wxPython处理长文本

这段代码的应用场景有&#xff1a; 在文本编辑器和IDE等应用程序中&#xff0c;可以使用这个示例代码来处理长文本&#xff0c;以便用户更好地查看和编辑文本。在数据分析和科学计算等领域中&#xff0c;可以使用这个示例代码来显示和处理大量的数据和结果。在日志分析和系统监…

解决方案 TestCenter自动测试软件平台

方案概述 TestCenter是一个专为加速您的测试系统软件开发而设计的自动测试系统软件平台&#xff0c;主要应用于测试程序的开发、运行和管理。TestCenter实现了对测试资源管理、测试程序开发与调试、测试数据管理以及测试程序发布等功能的无缝集成和统一部署&#xff0c;这将帮…

Google I/O 2023 推出Flutter 3.10 快来看看都有哪些变化

本文首发自[慕课网] &#xff0c;想了解更多IT干货内容&#xff0c;程序员圈内热闻&#xff0c;欢迎关注"慕课网"及“慕课网公众号”&#xff01; 作者&#xff1a; CrazyCodeBoy |慕课网名师 今年的Google I/O满满的 AI与狠活&#xff0c;而且还推出 Flutter 3…

Flutter一天一控件之ListTile(列表的实现)

ListTile简介 Flutter中的ListTile控件是一种常用的列表项控件&#xff0c;它可以用于显示列表中的每一个项&#xff0c;通常包含标题、副标题、图标等内容。ListTile控件的外观和行为类似于Android中的ListView中的列表项。 一个简单的ListTile示例&#xff1a; ListTile(l…

大流量卡介绍:网上的大流量卡都是怎么来的?

大流量卡介绍&#xff0c;你知道网上的大流量卡都是怎么来的&#xff1f; 其实&#xff0c;网上29元155G、39元180G的优惠套餐&#xff0c;本身都是我们常见的流量卡如电信星卡、联通王卡、移动花卡等等&#xff0c;之所以这么便宜&#xff0c;只不过运营商在这些套餐上面增加…

【新星计划】数据库 排名函数 初识

数据库 排名函数 初识 查询排序初识排名函数row_number()rank()dense_rank()ntile()percent_rank() 开窗函数为聚合函数使用开窗函数 小结 查询排序 在日常工作中&#xff0c;我们对所有需要的数据都会进行一个排序操作&#xff0c;以获得我们最需要的数据。 排序指令 order …

Unreal Niagara粒子入门1

记录下学习Niagara粒子的过程&#xff0c;这次调的是比较简单的一个效果&#xff1a; 使用了随粒子生命的缩放、打开速率解算、基本的发射器和Niagara容器。 1.创建Niagara Niagara中&#xff0c;发射器和NiagaraSystem文件是可以分开创建的&#xff1a; 通常直接点Niagara…

GPT-2(Transformer Decoder)的TensorFlow实现(附源码)

文章目录 一、GPT2实现步骤二、源码 一、GPT2实现步骤 机器学习模型的开发实现步骤一般都包含以下几个部分&#xff1a;   1. 遵照模型的网络架构&#xff0c;实现每一层&#xff08;Layer/Block&#xff09;的函数&#xff1b;   2. 将第1步中的函数组合在一起&#xff0c…

微信小程序nodejs+vue校园快递代拿系统uniapp校园互助系统

语言 node.js 框架&#xff1a;Express 前端:Vue.js 数据库&#xff1a;mysql 数据库工具&#xff1a;Navicat 开发软件&#xff1a;VScode 平台旨在解决目前大学生找人帮忙&#xff0c;难&#xff0c;慢&#xff0c;不可靠以及想兼职同学找不到好的平台的问题。对于招人帮忙的…

应急演练脚本编写的几个步骤

应急演练是一项非常重要的活动&#xff0c;对于保障企业的安全和稳定运行至关重要。而一个完整的应急演练需要编写一个详细的脚本来指导演练过程。以下是应急演练脚本编写的几个步骤。 定义演练场景 首先&#xff0c;需要定义演练场景&#xff0c;这将决定演练的目标和方向。在…

美国原装二手 SR560 低噪声电压前置放大器

Stanford Research SR560低噪声电压前置放大器 ​Stanford Research SR560 是一款高性能、低噪声前置放大器&#xff0c;适用于各种应用&#xff0c;包括低温测量、光学检测和音频工程。 SR560 具有一个具有 4 nV/√Hz 输入噪声和 100 MΩ 输入阻抗的差分前端。完整的噪声系数…

三招教你图片文字转语音怎么转

随着数字化时代的到来&#xff0c;人们对于数字信息的获取和处理需求越来越大&#xff0c;而图片文字转语音技术正是为了满足这一需求而诞生的。这项技术不仅可以辅助视力障碍者&#xff0c;让他们能更轻松地获取信息和理解内容&#xff0c;而且也可以帮助正在学习外语的人们练…