Hudi(四)集成Flink(2)

news2024/12/25 9:29:38

6、读取方式

6.1、流读(Streaming Query)

        当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。

1、WITH参数

名称

Required

默认值

说明

read.streaming.enabled

false

false

设置 true 开启流读模式

read.start-commit

false

最新 commit

指定 'yyyyMMddHHmmss' 格式的起始 commit(闭区间)

read.streaming.skip_compaction

false

false

流读时是否跳过 compaction 的 commits,跳过 compaction 有两个用途:

1)避免 upsert 语义下重复消费 (compaction 的 instant 为重复数据,如果不跳过,有小概率会重复消费)

2) changelog 模式下保证语义正确性

0.11开始,以上两个问题已经通过保留 compaction instant time 修复

clean.retain_commits

false

10

cleaner最多保留的历史commits数,大于此数量的历史commits会被清理掉,changelog模式下,这个参数可以控制changelog的保留时间,例如checkpoint周期为5分钟一次,默认最少保留50分钟的时间。

        注意:当参数read.streaming.skip_compaction打开并且streaming reader消费落后于clean.retain_commits数时,流读可能会丢失数据。从0.11开始,compaction不会再变更record的 instant time,因此理论上数据不会再重复消费,但是还是会重复读取并丢弃,因此额外的开销还是无法避免,对性能有要求的话还是可以开启此参数。 

CREATE TABLE t5(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t5',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4'   -- 默认60s
);


insert into t5 select * from sourceT;

select * from t5;

6.2、增量读取(Incremental Query)

        从 0.10.0 开始支持。

        如果有增量读取 batch 数据的需求,增量读取包含三种场景。

        (1)Stream 增量消费,通过参数 read.start-commit 指定起始消费位置;

        (2)Batch 增量消费,通过参数 read.start-commit 指定起始消费位置,通过参数 read.end-commit 指定结束消费位置,区间为闭区间,即包含起始、结束的 commit

        (3)TimeTravel:Batch 消费某个时间点的数据:通过参数 read.end-commit 指定结束消费位置即可(由于起始位置默认从最新,所以无需重复声明)

        WITH 参数

名称

Required

默认值

说明

read.start-commit

false

默认从最新 commit

支持 earliest 从最早消费

read.end-commit

false

默认到最新 commit

7、限流

        如果将全量数据(百亿数量级)和增量先同步到kafka,再通过flink流式消费的方式将库表数据直接导成hoodie表,因为直接消费全量部分数据:量大(吞吐高)、乱序严重(写入的partition随机),会导致写入性能退化,出现吞吐毛刺,这时候可以开启限速参数,保证流量平稳写入。
        WITH参数

名称

Required

默认值

说明

write.rate.limit

false

0

默认关闭限速

8、写入方式

8.1、CDC数据同步

        CDC数据保存了完整的数据库变更,当前可通过两种途径将数据导入hudi:

        第一种:通过cdc-connector直接对接DB的binlog将数据导入hudi,优点是不依赖消息队列,缺点是对db server造成压力。
        第二种:对接cdc format消费kafka数据导入hudi,优点是可扩展性强,缺点是依赖kafka。
        注意:如果上游数据无法保证顺序,需要指定write.precombine.field字段

8.2、离线批量导入

        如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。

1、原理

  1. 批量导入省去了avro的序列化以及数据的merge过程,后续不会再有去重操作,数据的唯一性需要自己来保证。
  2. bulk_insert需要在Batch Execuiton Mode下执行更高效,Batch模式默认会按照partition path排序输入消息再写入Hoodie,避免file handle频繁切换导致性能下降。
    1. SET execution.runtime-mode=batch;
    2. SET execution.checkpointing.interval=0;
  3. bulk_insert write task的并发通过参数write.tasks指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task的并发数就是划分的bucket数,当然每个bucket在写到文件大小上限(parquet 120MB)的时候会rollover到新的文件句柄,所以最后:写文件数量>=bulk_insert write task数。

2、WITH参数

名称

Required

默认值

说明

write.operation

true

upsert

配置 bulk_insert 开启该功能

write.tasks

false

4

bulk_insert 写 task 的并发,最后的文件数 >=write.tasks

write.bulk_insert.shuffle_by_partition

write.bulk_insert.shuffle_input

(从 0.11 开始)

false

true

是否将数据按照 partition 字段 shuffle 再通过 write task 写入,开启该参数将减少小文件的数量 但是可能有数据倾斜风险

write.bulk_insert.sort_by_partition

write.bulk_insert.sort_input

(从 0.11 开始)

false

true

是否将数据线按照 partition 字段排序再写入,当一个 write task 写多个 partition,开启可以减少小文件数量

write.sort.memory

128

sort 算子的可用 managed memory(单位 MB)

8.3、全量接增量

        如果已经有全量的离线Hoodie表,需要接上实时写入,并且保证数据不重复,可以开启index bootstrap功能。
        如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)。

        WITH参数

名称

Required

默认值

说明

index.bootstrap.enabled

true

false

开启索引加载,会将已存表的最新数据一次性加载到 state 中

index.partition.regex

false

*

设置正则表达式进行分区筛选,默认为加载全部分区

9、写入模式

9.1、Changelog模式

        如果希望Hoodie保留消息的所有变更(I/-U/U/D),之后接上Flink引擎的有状态计算实现全链路近实时数仓生产(增量计算),Hoodie的MOR表通过行存原生支持保留消息的所有变更(format层面的集成),通过流读MOR表可以消费到所有的变更记录。

1、WITH参数

名称

Required

默认值

说明

changelog.enabled

false

false

默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被 merge 掉;改成 true 支持消费所有变更。

        批(快照)读仍然会合并所有的中间结果,不管format是否已存储中间状态。

        开启changelog.enabled参数后,中间的变更也只是Best Effort:异步的压缩任务会将中间变更合并成1条,所以如果流读消费不够及时,被压缩后只能读到最后一条记录。当然,通过调整压缩的buffer时间可以预留一定的时间buffer给reader,比如调整压缩的两个参数:

        compaction.delta_commits:5 

        compaction.delta_seconds: 3600

        说明:
        Changelog 模式开启流读的话,要在 sql-client 里面设置参数:
        set sql-client.execution.result-mode=tableau; 
        或者
        set sql-client.execution.result-mode=changelog;
        否则中间结果在读的时候会被直接合并。(参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#running-sql-queries)

2、流读 changelog

        仅在0.10.0支持,本feature为实验性。

        开启changelog模式后,hudi会保留一段时间的changelog供下游consumer消费,我们可以通过流读ODS层changelog接上ETL逻辑写入到DWD层,如下图的pipeline:

        流读的时候我们要注意changelog有可能会被compaction合并掉,中间记录会消除,可能会影响计算结果,需要关注sql-client的属性(result-mode)同上。

3、案例演示

-- 使用changelog
CREATE TABLE t6(
  id int PRIMARY KEY NOT ENFORCED,
  age INT
) WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t6',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4',
  'changelog.enabled' = 'true'
);


insert into t6 values(1,1);
insert into t6 values(1,2);

select * from t6;  可以获取最新的数据,一条
select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/; 可以获取2条


-- 不使用changelog
CREATE TABLE t6_v(
  id int PRIMARY KEY NOT ENFORCED,
  age INT
) WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t6',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4'
);
select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/; 只会获取1条,读时合并

9.2、Append模式

        从0.10开始支持
        对于INSERT模式:

  • MOR默认会apply小文件策略:会追加写avro log文件
  • COW每次直接写新的parquet文件,没有小文件策略

        Hudi支持丰富的Clustering策略,优化INSERT模式下的小文件问题:

1、Inline Clustering

        只有Copy On Write表支持该模式

名称

Required

默认值

说明

write.insert.cluster

false

false

是否在写入时合并小文件,COW 表默认 insert 写不合并小文件,开启该参数后,每次写入会优先合并之前的小文件(不会去重),吞吐会受影响

2、Async Clustering

        从 0.12 开始支持

(1)WITH参数

名称

Required

默认值

说明

clustering.schedule.enabled

false

false

是否在写入时定时异步调度 clustering plan,默认关闭

clustering.delta_commits

false

4

调度 clsutering plan 的间隔 commits,

clustering.schedule.enabled 为 true 时生效

clustering.async.enabled

false

false

是否异步执行 clustering plan,默认关闭

clustering.tasks

false

4

Clustering task 执行并发

clustering.plan.strategy.target.file.max.bytes

false

1024 * 1024 * 1024

Clustering 单文件目标大小,默认 1GB

clustering.plan.strategy.small.file.limit

false

600

小于该大小的文件才会参与 clustering,默认600MB

clustering.plan.strategy.sort.columns

false

N/A

支持指定特殊的排序字段

clustering.plan.partition.filter.mode

false

NONE

支持

NONE:不做限制

RECENT_DAYS:按时间(天)回溯

SELECTED_PARTITIONS:指定固定的 partition

clustering.plan.strategy.daybased.lookback.partitions

false

2

RECENT_DAYS 生效,默认 2 天

(2)Clustering Plan Strategy

        支持定制化的 clustering 策略。

名称

Required

默认值

说明

clustering.plan.partition.filter.mode

false

NONE

支持

  • NONE:不做限制
  • RECENT_DAYS:按时间(天)回溯
  • SELECTED_PARTITIONS:指定固定的 partition

clustering.plan.strategy.daybased.lookback.partitions

false

2

RECENT_DAYS 生效,默认 2 天

clustering.plan.strategy.cluster.begin.partition

false

N/A

SELECTED_PARTITIONS 生效,

指定开始 partition(inclusive)

clustering.plan.strategy.cluster.end.partition

false

N/A

SELECTED_PARTITIONS 生效,

指定结束 partition(incluseve)

clustering.plan.strategy.partition.regex.pattern

false

N/A

正则表达式过滤 partitions

clustering.plan.strategy.partition.selected

false

N/A

显示指定目标 partitions,支持逗号 , 分割多个 partition

10、Bucket索引

        从0.11开始支持

        默认的flink流式写入使用state存储索引信息:primarykey到fileId的映射关系。当数据量比较大的时候,state的存储开销可能成为瓶颈,bucket索引通过固定的hash策略,将相同key的数据分配到同一个fileGroup中,避免了索引的存储和查询开销。

1、WITH参数

名称

Required

默认值

说明

index.type

false

FLINK_STATE

设置BUCKET开启Bucket索引功能

hoodie.bucket.index.hash.field

false

主键

可以设置成主键的子集

hoodie.bucket.index.num.buckets

false

4

默认每个partition的bucket数,当前设置后则不可再变更。

2、和state索引的对比:
(1)bucket index没有state的存储计算开销,性能较好。
(2)bucket index无法扩buckets,state index则可以依据文件的大小动态扩容。
(3)bucket index不支持跨partition的变更(如果输入是cdc流则没有这个限制),state index没有限制。

11、Hudi Catalog

        将表的元数据持久化。从0.12.0开始支持,通过catalog可以管理flink创建的表,避免重复建表操作,另外hms模式的catalog支持自动补全hive同步参数。

        DFS模式Catalog SQL样例:

CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '${catalog 的默认路径}',
    'mode'='dfs' 
  );

         Hms模式Catalog SQL样例:

CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '${catalog 的默认路径}',
    'hive.conf.dir' = '${hive-site.xml 所在的目录}',
    'mode'='hms' -- 支持 'dfs' 模式通过文件系统管理表属性
  );	

1、With参数

名称

Required

默认值

说明

catalog.path

true

--

默认的 catalog 根路径,用作表路径的自动推导,默认的表路径:${catalog.path}/${db_name}/${table_name}

default-database

false

default

默认的 database 名

hive.conf.dir

false

--

hive-site.xml 所在的目录,只在 hms 模式下生效

mode

false

dfs

支持 hms模式通过 hive 管理元数据

table.external

false

false

是否创建外部表,只在 hms 模式下生效

2、使用dfs方式 

1、创建sql-client初始化sql文件

vim /opt/module/flink-1.13.2/conf/sql-client-init.sql

CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '/tmp/hudi_catalog',
    'mode'='dfs' 
  );

USE CATALOG hoodie_catalog;

2、 指定sql-client启动时加载sql文件

hadoop fs -mkdir /tmp/hudi_catalog

bin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session

需要先创建库 

3、建库建表插入

create database test;
use test;

create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20),
primary key (uuid) not enforced
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_catalog/default/t2',
  'table.type' = 'MERGE_ON_READ'
);

insert into t2 values('1','zs',18,TIMESTAMP '1970-01-01 00:00:01','a');

4、退出sql-client,重新进入,表信息还在

use test;
show tables;
select * from t2;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/595164.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于双分支残差结构的低光照图像增强研究与应用实现

1.摘要 在低光条件下拍摄的照片会因曝光不足而产生一系列的视觉问题,如亮度低、信息丢失、噪声和颜色失真等。为了解决上述问题,提出一个结合注意力的双分支残差低光照图像增强网络。首先,采用改进InceptionV2提取浅层特征;其次&a…

AndroidStudio插件 - Json转Class、实体类(JSON To Kotlin Class)

Kotlin用挺长时间了,最近网络请求时因为接收、解析实体类的问题,后台直接给到了json文件,客户端可直接将json转为对应的model,故此我们需要用到一些插件来提升工作效率 为了提升工作效率,一键转换json为吾所需实体类是…

详解Comparable和Comparator

目录 Comparable接口 Comparator接口 Comparable接口 Comparable接口在源码中的声明&#xff1a; public interface Comparable<T> {public int compareTo(T o); } 可以看到&#xff0c;只要一个compareTo方法&#xff0c;也就是说&#xff0c;实现Comparable接口的类…

网站留言板的功能

开发环境&#xff1a;dreamweaverCC html jscirpt php 前置条件&#xff1a;1、一个简单的网站已经搭建完毕&#xff0c;支持用户登录网站。 2、用户已登录网站。 实现步骤&#xff1a; 一、新建留言板网页 1、新建网页&#xff1a;whiteboard.html 留言板&#xff08;j…

基于AT89C51单片机的十字路口交通灯设计

点击链接获取Keil源码与Project Backups仿真图&#xff1a; https://download.csdn.net/download/qq_64505944/87849986?spm1001.2014.3001.5503 源码获取 主要内容&#xff1a; 本项目中采用单片机 AT89C51为中心器件来设计交通信号灯控制器&#xff0c; 系统实用性强、操…

丢失mfc100u.dll修复,总结mfc100u.dll丢失的四个解决方法

mfc100u.dll是 Microsoft Visual C 2010 可再发行组件包的一部分系统文件。它通常位于 Windows 系统文件夹中&#xff0c;用于支持各种应用程序的运行时库。如果出现缺失或损坏的情况可能会影响应用程序的正常运行。打开软件或者游戏程序的时候&#xff0c;会提示‘由于找不到m…

[Java Web]Cookie,Session,Filter,Listener,Thymeleaf模板

文章目录 CookieSessionFilterListener了解JSP页面与加载规则使用Thymeleaf模板引擎Thymeleaf语法基础为标签添加内容 Thymeleaf流程控制语法Thymeleaf模板布局提取重复内容参数传递 探讨Tomcat类加载机制 Cookie 它可以在浏览器中保存一些信息&#xff0c;并且在下次请求时&a…

layui框架学习(26:弹出层模块_tips框输入框)

弹出层模块layer中的tips框和输入框函数也是其底层核心函数open的特定应用实现&#xff0c;其中tips框是可以将弹出层与具体元素绑定&#xff0c;能出现在指定元素周围&#xff0c;而输入框则是弹出信息框获取用户的输入&#xff0c;这两类弹出框的说明如下&#xff1a;   ti…

JAVA开发(如何学习一门IT技术)

无论是初学者还是有经验的专业人士&#xff0c;在学习一门新的IT技术时&#xff0c;都需要采取一种系统性的学习方法。那么作为一名技术er&#xff0c;你是如何系统的学习it技术的呢。 一、it技术介绍 IT技术包含了几个方向&#xff0c;一个是软件工程&#xff0c;一个网络工程…

怎么把老旧图片变清晰?分享三个方法给大家!

老旧照片常常因为时间的流逝而失去清晰度&#xff0c;给人们带来了遗憾。然而&#xff0c;随着图像处理技术的进步&#xff0c;我们现在有多种方法可以提高老旧照片的清晰度。本文将介绍三种常用的方法&#xff0c;帮助您使老旧照片焕然一新。 第一种方法&#xff1a;使用图像…

leetcode--N 皇后 II(java)

N 皇后 II leetcode 52 题 - N 皇后 II (困难)题目描述解题思路代码演示动态规划专题 leetcode 52 题 - N 皇后 II (困难) 原题链接: https://leetcode.cn/problems/n-queens-ii/ 题目描述 n 皇后问题 研究的是如何将 n 个皇后放置在 n n 的棋盘上&#xff0c;并且使皇后彼此之…

C++——菱形继承和虚继承

0.关注博主有更多知识 C知识合集 目录 1.什么是菱形继承和虚继承 2.菱形继承所带来的问题 3.虚继承的解决方案 3.1虚基表 4.继承与组合 菱形继承和虚继承本身就是一个"bug"&#xff0c;甚至在C程序员当中有"谁用谁尚阿比"的说法。至于为什么要谈菱…

[bugfix]解决visual studio installer双击后进度条一闪而过之后无反应的问题

问题描述&#xff1a; 源于跑一个神经网络代码&#xff0c;跑着跑着说需要microsoft visual C 14.0版本及其以上&#xff0c;然而我苦于根本下不了microsoft visuall C包的状态啊&#xff0c;而且点它没反应这件事已经持续了1年左右&#xff0c;因为没太耽误我做事我就一直没管…

21 条法则助你“玩转”分库分表

好好的系统&#xff0c;为什么要分库分表&#xff1f; 我们结合具体业务场景&#xff0c;以t_order表为例进行架构优化。由于数据量已经达到亿级别&#xff0c;查询性能严重下降&#xff0c;因此我们采用了分库分表技术来处理这个问题。具体而言&#xff0c;我们将原本的单库分…

java生成随机数

文章目录 java生成随机数导入包生成一个随机数生成一个 [ 0 , b o u n d ) \color{red}{[0,bound)} [0,bound)的随机数生成一个 20 \color{red}{20} 20到 90 \color{red}{90} 90的随机数总结现在尝试生成 − 10 \color{red}{-10} −10到 10 \color{red}{10} 10之间的随机数 ja…

《最新出炉》Python+Playwright自动化测试-2-playwright的API及其他知识

一.简介 上一篇我已经将PythonPlaywright的环境搭建好了&#xff0c;而且也简单的演示了一下三款浏览器的启动和关闭&#xff0c;是不是很简单啊。今天主要是把一篇的中的代码进行一次详细的注释&#xff0c;然后说一下playwright的API和其他相关知识点。那么首先将上一篇中的…

MyBatis——MyBatis注解开发

MyBatis编写SQL除了使用Mapper.xml还可以使用注解完成。当可以使用Auto Mapping时使用注解非常简单&#xff0c;不需要频繁的在接口和mapper.xml两个文件之间进行切换。但是必须配置resultMap时使用注解将会变得很麻烦&#xff0c;这种情况下推荐使用mapper.xml进行配置。 MyB…

问题解决:cmd中创建文件夹被拒绝访问。

问题&#xff1a; 在cmd中准备创建一个B盘node.js文件夹下的一个node_global文件被拒绝访问出错。 Microsoft Windows [版本 10.0.19045.2965] (c) Microsoft Corporation。保留所有权利。C:\Users\SueMagic>md B:\nodejs\node_global 拒绝访问。C:\Users\SueMagic>原因…

Learning C++ No.26 【深入学习位图】

引言&#xff1a; 北京时间&#xff1a;2023/5/30/15:30&#xff0c;刚睡醒&#xff0c;两点的闹钟&#xff0c;硬是睡到了2点40&#xff0c;那种睡不醒的感觉&#xff0c;真的很难受&#xff0c;但是没办法&#xff0c;欠的课越来越多&#xff0c;压的我喘不过气了都&#xf…

华为OD机试真题B卷 Java 实现【整理扑克牌】,附详细解题思路

一、题目描述 给定一组数字&#xff0c;表示扑克牌的牌面数字&#xff0c;忽略扑克牌的花色&#xff0c;请按如下规则对这一组扑克牌进行整理&#xff1a; 步骤1 对扑克牌进行分组&#xff0c;形成组合牌&#xff0c;规则如下&#xff1a; 当牌面数字相同张数大于等于4时&a…