Apache Hudi 在袋鼠云数据湖平台的设计与实践

news2025/2/23 5:37:00

在大数据处理中,实时数据分析是一个重要的需求。随着数据量的不断增长,对于实时分析的挑战也在不断加大,传统的批处理方式已经不能满足实时数据处理的需求,需要一种更加高效的技术来解决这个问题。Apache Hudi(Hadoop Upserts Deletes and Incremental Processing)就是这样一种技术,提供了高效的实时数据仓库管理功能。

本文将介绍袋鼠云基于 Hudi 构建数据湖的整体方案架构及其在实时数据仓库处理方面的特点,并且为大家展示一个使用 Apache Hudi 的简单示例,便于新手上路。

Apache Hudi 介绍

Apache Hudi 是一个开源的数据湖存储系统,可以在 Hadoop 生态系统中提供实时数据仓库处理功能。Hudi 最早由 Uber 开发,后来成为 Apache 顶级项目。

Hudi 主要特性

· 支持快速插入和更新操作,以便在数据仓库中实时处理数据;

· 提供增量查询功能,可有效提高数据分析效率;

· 支持时间点查询,以便查看数据在某一时刻的状态;

· 与 Apache Spark、Hive 等大数据分析工具兼容。

Hudi 架构

Apache Hudi 的架构包括以下几个主要组件:

· Hudi 数据存储:Hudi 数据存储是 Hudi 的核心组件,负责存储数据,数据存储有两种类型:Copy-On-Write(COW)和 Merge-On-Read(MOR);

· Copy-On-Write:COW 存储类型会在对数据进行更新时,创建一个新的数据文件副本,将更新的数据写入副本中,之后,新的数据文件副本会替换原始数据文件;

· Merge-On-Read:MOR 存储类型会在查询时,将更新的数据与原始数据进行合并,这种方式可以减少数据存储的写入延迟,但会增加查询的计算量;

· Hudi 索引:Hudi 索引用于维护数据记录的位置信息,索引有两种类型:内置索引(如 Bloom 过滤器)和外部索引(如 HBase 索引);

· Hudi 查询引擎:Hudi 查询引擎负责处理查询请求,Hudi 支持多种查询引擎,如 Spark SQL、Hive、Presto 等。

file

Hudi 的使用场景

Apache Hudi 可以帮助企业和组织实现实时数据处理和分析。实时数据处理需要快速地处理和查询数据,同时还需要保证数据的一致性和可靠性。

Apache Hudi 的增量数据处理、ACID 事务性保证、写时合并等技术特性可以帮助企业更好地实现实时数据处理和分析,基于 Hudi 的特性可以在一定程度上在实时数仓的构建过程中承担上下游数据链路的对接(类似 Kafka 的角色)。既能实现增量的数据处理,也能为批流一体的处理提供存储基础。

Hudi 的优势和劣势

● 优势

· 高效处理大规模数据集;

· 支持实时数据更新和查询;

· 实现了增量写入机制,提高了数据访问效率;

· Hudi 可以与流处理管道集成;

· Hudi 提供了时间旅行功能,允许回溯数据的历史版本。

● 劣势

· 在读写数据时需要付出额外的代价;

· 操作比较复杂,需要使用专业的编程语言和工具。

Hudi 在袋鼠云数据湖平台上的实践

Hudi 在袋鼠云数据湖的技术架构

Hudi 在袋鼠云的数据湖平台上主要对数据湖管理提供助力:

· 元数据的接入,让用户可以快速的对表进行管理;

· 数据快速接入,包括对符合条件的原有表数据进行转换,快速搭建数据湖能力;

· 湖表的管理,监控小文件定期进行合并,提升表的查询性能,内在丰富的表操作功能,包括 time travel ,孤儿文件清理,过期快照清理等;

· 索引构建,提供多种索引包括 bloom filter,zorder 等,提升计算引擎的查询性能。

file

Hudi 使用示例

在介绍了 Hudi 的基本信息和袋鼠云数据湖平台的结构之后,我们来看一个使用示例,替换 Flink 在内存中的 join 过程。

在 Flink 中对多流 join 往往是比较头疼的场景,需要考虑 state ttl 时间设置,设置太小数据经常关联不上,设置太大内存又需要很高才能保留,我们通过 Hudi 的方式来换个思路实现。

● 构建 catalog

public String createCatalog(){
    String createCatalog = "CREATE CATALOG hudi_catalog WITH (\n" +
            "    'type' = 'hudi',\n" +
            "    'mode' = 'hms',\n" +
            "    'default-database' = 'default',\n" +
            "    'hive.conf.dir' = '/hive_conf_dir',\n" +
            "    'table.external' = 'true'\n" +
            ")";

    return createCatalog;
}

● 创建 hudi 表

public String createHudiTable(){

    String createTable = "CREATE TABLE if not exists hudi_catalog.flink_db.test_hudi_flink_join_2 (\n" +
            "  id int ,\n" +
            "  name VARCHAR(10),\n" +
            "  age int ,\n" +
            "  address VARCHAR(10),\n" +
            "  dt VARCHAR(10),\n" +
            "  primary key(id) not enforced\n" +
            ")\n" +
            "PARTITIONED BY (dt)\n" +
            "WITH (\n" +
            "  'connector' = 'hudi',\n" +
            "  'table.type' = 'MERGE_ON_READ',\n" +
            "  'changelog.enabled' = 'true',\n" +
            "  'index.type' = 'BUCKET',\n" +
            "  'hoodie.bucket.index.num.buckets' = '2',\n" +
            String.format("  '%s' = '%s',\n", FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.NO_PRE_COMBINE) +
            "  'write.payload.class' = '" + PartialUpdateAvroPayload.class.getName() + "'\n" +

    ");";

    return createTable;
}

● 更新 hudi 表的 flink_db.test_hudi_flink_join_2 的 id, name, age, dt 列

01 从 kafka 中读取 topic1

public String createKafkaTable1(){
    String kafkaSource1 = "CREATE TABLE source1\n" +
            "(\n" +
            "    id        INT,\n" +
            "    name      STRING,\n" +
            "    age        INT,\n" +
            "    dt        String,\n" +
            "    PROCTIME AS PROCTIME()\n" +
            ") WITH (\n" +
            "      'connector' = 'kafka'\n" +
            "      ,'topic' = 'join_topic1'\n" +
            "      ,'properties.bootstrap.servers' = 'localhost:9092'\n" +
            "      ,'scan.startup.mode' = 'earliest-offset'\n" +
            "      ,'format' = 'json'\n" +
            "      ,'json.timestamp-format.standard' = 'SQL'\n" +
            "      )";

    return kafkaSource1;
}

02 从 kafka 中读取 topic2

public String createKafkaTable2(){
    String kafkaSource2 = "CREATE TABLE source2\n" +
            "(\n" +
            "    id        INT,\n" +
            "    name      STRING,\n" +
            "    address        string,\n" +
            "    dt        String,\n" +
            "    PROCTIME AS PROCTIME()\n" +
            ") WITH (\n" +
            "      'connector' = 'kafka'\n" +
            "      ,'topic' = 'join_topic2'\n" +
            "      ,'properties.bootstrap.servers' = 'localhost:9092'\n" +
            "      ,'scan.startup.mode' = 'earliest-offset'\n" +
            "      ,'format' = 'json'\n" +
            "      ,'json.timestamp-format.standard' = 'SQL'\n" +
            "      )";

    return kafkaSource2;
}

● 执行插入逻辑1

String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,age,dt) " +
        "select id, name,age,dt from source1";

● 通过 spark 查询数据

20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 NULL 1

20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 NULL 1

● 执行插入逻辑2

String insertSQL = "insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,address,dt) " +
        "select id, name, address,dt from source2";

● 运行成功

运行成功后在 spark 中查询对应的表数据:

20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 xc:address45 1

20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 xc:address30 1

可以发现在第二次数据运行之后,表数据的对应字段 address 已经更新,达到了类似在 Flink 中直接执行 join 的效果。

insert into hudi_catalog.flink_db.test_hudi_flink_join_2 select a.id, a.name, a.age,b.address a.dt from source1 a left join source2 b on a.id = b.id

《数栈产品白皮书》:https://www.dtstack.com/resources/1004?src=szsm

《数据治理行业实践白皮书》下载地址:https://www.dtstack.com/resources/1001?src=szsm

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szcsdn

同时,欢迎对大数据开源项目有兴趣的同学加入我们,一起交流最新开源技术信息,号码:30537511,项目地址:https://github.com/DTStack

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/562078.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

安科瑞对于热继电器对电动机保护的探讨

安科瑞 徐浩竣 江苏安科瑞电器制造有限公司 zx acrelxhj 摘要:电动机烧毁是每一个生产企业都无法回避的现象,怎样加强电动机保护,使生产工艺系统的稳定,减少企业非正常停机时间。电动机保护成为电气技术人员一个重要课题。因此&#xff0c…

嵌入式 QT QListWidget 显示列表视图的小部件类

目录 1. 添加对象 2. 设置间距 3. 获取内容 4. 删除对象 5.更改对象内容 在Qt框架中,QListWidget是一个用于显示列表视图的小部件类。它提供了一种方便的方式来显示和管理项目列表。QListWidget可以显示文本、图像和其他自定义的项目项,并允许用户进…

Parrot OS 5.3已经发布并可普遍下载

导读Parrot Security近日宣布,Parrot OS 5.3已经发布并可普遍下载,这是这个基于Debian的、面向红客和渗透测试者的、以安全为重点的发行版的最新稳定版本。 Parrot OS 5.3是Parrot OS 5 “Electro Ara “系列的第三部,在Parrot OS 5.2之后两个…

私有化部署即时通讯为什么更安全

即时通讯作为企业沟通工具,在企业的内部沟通和外部交流中发挥着越来越重要的作用。同时,企业即时通讯在提升企业内部效率的同时,也面临着巨大的安全威胁。 根据数据显示,全球有超过4亿人在使用 IM。而其中因用户隐私泄露导致的数据…

十一、数据仓库详细介绍(应用)

这是数据仓库详细介绍的最后一篇,后续还会在补充一些,把遗漏的或者没讲清楚的追加进来。 1. 前言 数据仓库是一种数据管理的方法论,理论概念很早就提出来了,而且各个行业都有广泛深入的应用。因此到目前为止该方法论的理论和实践体…

Taro小程序富文本解析4种方法

1. Taro组件rich-text 优点:使用极其方便,引用一下就行了。缺点:不支持视频,放弃!2. wxParse https://github.com/icindy/wxParse 优点:支持样式,视频缺点:进入页面图片会有由大变正常,太影响了吧。3. taro-parse https://taro-ext.jd.com/plugin/view/5e61f2acb33351…

【Netty】Netty 概述(一)

文章目录 前言一、Java原生API之痛二、Netty的优势2.1 非阻塞 I/O2.2 丰富的协议2.3 异步和事件驱动2.4 精心设计的API2.5 丰富的缓冲实现2.6 高效的网络传输 三、Netty 核心概念3.1 核心组件3.1.1 事件模型3.1.2 字节缓冲区3.1.3 通信API 3.2 传输服务3.2.1 NIO3.2.2 epoll3.2…

让数据背后的那些话创造价值 | 数据增长

从行业背景而言,流量红利逐渐消失,野蛮生长的互联网时代接近尾声。传统的烧钱模式、靠体力投放的形式日渐乏力。但是,企业总是要追求增长的。所以在行业大背景下,依靠技术和数据的力量寻求更科学、更高效的方法达成营销目标&#…

Windows系统数据结构——最小生成树、Prim算法和Kruskal算法

我是荔园微风,作为一名在IT界整整25年的老兵,今天总结一下Windows系统数据结构——最小生成树、Prim算法和Kruskal算法。 我在各在论坛看了很多相关帖子,发现一个简单的问题都被复杂化了。最小生成树、Prim算法和Kruskal算法真的没有大家想的…

【JavaSE】Java基础语法(五):数组详解

文章目录 🍸1.1 数组介绍🍸1.2 数组的动态初始化1.2.1 什么是动态初始化1.2.2 动态初始化格式🍸1.3 数组元素访问1.3.1 什么是索引1.3.2 访问数组元素格式1.3.3 示例代码 🍸1.4 内存分配1.4.1 内存概述1.4.2 java中的内存分配 &am…

Ubuntu crontab 遇到的sh脚本一些问题(手动执行可以,自动执行不行)

问题一: 问题描述: 在写一个脚本循环时候,出现“let:not found”,这是因为在ubuntu默认是指向bin/dash解释器的,dash是阉割版的bash,其功能远没有bash强大和丰富.并且dash不支持let和i等功能. 解决办法: 打开一个终端输入&#xf…

springboot基于Java的校园二手物品交易平台jspm9qw4i

本基于Java的校园二手物品交易平台采用Java语言和Jsp技术,框架采用SPRINGBOOT,搭配Mysql数据库,运行在Idea里。本系统针对校园二手商品的交易而开发,提供管理员、学生、学生二手三种角色的服务。总的功能包括商品的查询、商品的购…

基于html+css的图展示89

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

【六袆 - Redis】Redis内存数据库;redis数据结构;redis文档

Redis 关于redis 官方文档: https://redis.io/docs/about/ https://redis.com/redis-enterprise/data-structures/ 关于redis Redis: 是一个开源(BSD 许可)内存数据结构存储,用作数据库、缓存、消息代理和流引擎。Redis提供数据结…

CNVD - 5000w通用产品的收集方法

本文转载于:https://mp.weixin.qq.com/s?__bizMzg5OTY2NjUxMw&mid2247507214&idx1&sn0e6df46ee930cb35ab0650867cef8af5&chksmc04d5a30f73ad3261a6fa6a8cb8c4ddc4ee8fac2a58f495c05030adc2d27e3ead65264f24f75&mpshare1&scene23&srcid…

MySQL业务并发减数量,数量未减

业务背景 最近在折腾老系统,折腾了好久,发现一个数据库问题,用户点赞数量,如果用户取消点赞情况下,正常情况10次取消数据库都返回成功,但其中有2次没有取消。 数据库场景 在MySQL中看下面一个场景。 业务…

jQuery操作练习-隔行变色

<!DOCTYPE HTML> <html> <head> <meta http-equiv"Content-Type" content"text/html; charsetUTF-8"> <title>jQuery操作练习-隔行变色</title> <script type"text/javascript&q…

越来越好玩,用ChatGPT+Python 做有声小说!

菜鸟学Python-第623篇原创 现在我们几百人的会员群已经玩的越来越高级了&#xff0c;利用chatgpt花色玩法&#xff01;有玩百度问一问每天早上6点多起来抢单的&#xff0c;有玩微信机器人帮人部署接单的&#xff0c;也有玩咸鱼去给大学生指导论文的&#xff01; 利用chatgpt4玩…

USB设备连接和枚举

https://space.bilibili.com/489340606/channel/collectiondetail?sid896957 以下图片来自于沁恒微电子蔡亮工程师的讲课&#xff0c;对USB开发入门很有好处。 1. USB主设备和从设备 2. USB设备按功能分类 3. USB功能设备内部架构 可以有多个配置&#xff0c;但同一个时间只…

华为OD机试真题 Java 实现【字母组合】【2023Q1 200分】

一、题目描述 每个数字对应多个字母&#xff0c;对应关系如下&#xff1a; 0&#xff1a;a,b,c 1&#xff1a;d,e,f 2&#xff1a;g,h,i 3&#xff1a;j,k,l 4&#xff1a;m,n,o 5&#xff1a;p,q,r 6&#xff1a;s,t 7&#xff1a;u,v 8&#xff1a;w,x 9&#xff1a;y, z …