Hudi(三)集成Flink

news2025/1/11 15:03:45

1、环境准备

        将编译好的jar包放到Flink的lib目录下。

cp hudi-flink1.13-bundle-0.12.0.jar /opt/module/flink-1.13.2/lib

2、sql-client方式

2.1、修改flink-conf.yaml配置

vim /opt/module/flink-1.13.2/conf/flink-conf.yaml

state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://hadoop1:9000/ckps
state.backend.incremental: true

2.2、yarn-session模式启动

1、启动

1、先启动hadoop集群,然后通过yarn-session启动flink:
/opt/module/flink-1.13.2/bin/yarn-session.sh -d
2、再启动sql-client
/opt/module/flink-1.13.2/bin/sql-client.sh embedded -s yarn-session

2、写入数据

表格模式(table mode)在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:
SET 'sql-client.execution.result-mode' = 'table'; --默认

变更日志模式(changelog mode)不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。
SET 'sql-client.execution.result-mode' = 'changelog';

Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业执行模式的不同(execution.type):
SET 'sql-client.execution.result-mode' = 'tableau';

-- 创建hudi表
CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/t1',   --hudi表的基本路径
  'table.type' = 'MERGE_ON_READ'   --默认是COW
);

-- 插入数据
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

3、IDEA编码方式

3.1、环境准备

1、手动install依赖

mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.11 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar

2、编写代码

import org.apache.flink.contrib.streaming.state.{EmbeddedRocksDBStateBackend, PredefinedOptions}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

import java.util.concurrent.TimeUnit



object HudiExample {
  def main(args: Array[String]): Unit = {

    // val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 设置状态后端RocksDB
    val embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true)

    //  embeddedRocksDBStateBackend.setDbStoragePath("file:///E:/rocksdb")
    embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)
    env.setStateBackend(embeddedRocksDBStateBackend)
    // checkpoint配置
    env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10), CheckpointingMode.EXACTLY_ONCE)
    val checkpointConfig = env.getCheckpointConfig
    checkpointConfig.setCheckpointStorage("hdfs://hadoop1:9000/ckps")
    checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(10))
    checkpointConfig.setTolerableCheckpointFailureNumber(5)
    checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1))
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    val sTableEnv = StreamTableEnvironment.create(env)
    sTableEnv.executeSql("CREATE TABLE sourceT (\n" +
      "  uuid varchar(20),\n" +
      "  name varchar(10),\n" +
      "  age int,\n" + "  ts timestamp(3),\n" +
      "  `partition` varchar(20)\n" +
      ") WITH (\n" +
      "  'connector' = 'datagen',\n" +
      "  'rows-per-second' = '1'\n" +
      ")")
    sTableEnv.executeSql("create table t2(\n" +
      "  uuid varchar(20),\n" +
      "  name varchar(10),\n" +
      "  age int,\n" +
      "  ts timestamp(3),\n" +
      "  `partition` varchar(20)\n" +
      ")\n" +
      "with (\n" +
      "  'connector' = 'hudi',\n" +
      "  'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/t2',\n" +
      "  'table.type' = 'MERGE_ON_READ'\n" +
      ")")
    sTableEnv.executeSql("insert into t2 select * from sourceT")
  }
}

3、提交运行

bin/flink run -t yarn-per-job -c com.my.example.HudiExample ./myjars/HudiExample-1.0-SNAPSHOT-jar-with-dependencies.jar

4、核心参数设置

        Flink可配参数:https://hudi.apache.org/docs/configurations#FLINK_SQL

4.1、去重参数

        通过如下语法设置主键:

-- 设置单个主键
create table hoodie_table (
  f0 int primary key not enforced,
  f1 varchar(20),
  ...
) with (
  'connector' = 'hudi',
  ...
)

-- 设置联合主键
create table hoodie_table (
  f0 int,
  f1 varchar(20),
  ...
  primary key(f0, f1) not enforced
) with (
  'connector' = 'hudi',
  ...
)

名称

说明

默认值

备注

hoodie.datasource.write.recordkey.field

主键字段

--

支持主键语法 PRIMARY KEY 设置,支持逗号分隔的多个字段

precombine.field

(0.13.0 之前版本为

 write.precombine.field)

去重时间字段

--

record 合并的时候会按照该字段排序,选值较大的 record 为合并结果;不指定则为处理序:选择后到的 record

4.2、并发参数

名称

说明

默认值

备注

write.tasks

writer 的并发,每个 writer 顺序写 1~N buckets

4

增加并发对小文件个数没影响

write.bucket_assign.tasks

bucket assigner 的并发

Flink的并行度

增加并发同时增加了并发写的 bucekt 数,也就变相增加了小文件( bucket)

write.index_bootstrap.tasks

Index bootstrap 算子的并发,增加并发可以加快 bootstrap 阶段的效率,bootstrap 阶段会阻塞 checkpoint,因此需要设置多一些的 checkpoint 失败容忍次数

Flink的并行度

只在 index.bootstrap.enabled true 时生效

read.tasks

读算子的并发(batch stream

4

compaction.tasks

online compaction 算子的并发

writer 的并发

online compaction 比较耗费资源,建议走 offline compaction

案例演示

可以flink建表时在with中指定,或Hints临时指定参数的方式:在需要调整的表名后面加上 /*+ OPTIONS() */

CREATE TABLE sourceT (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t2',
  'table.type' = 'MERGE_ON_READ'
);

insert into t2 /*+ OPTIONS('write.tasks'='2','write.bucket_assign.tasks'='3','compaction.tasks'='4') */ 
select * from sourceT;

执行如下图所示:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/564453.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringCloud Gateway高级应用

目录 1 SpringCloud技术栈1.1 SpringCloud技术栈1.2 SpringCloud经典技术介绍1.3 SpringCloud项目场景 2 SpringCloud Gateway2.1 Gateway工作原理2.2 Gateway路由2.2.1 业务说明2.2.2 基于配置路由设置2.2.3 基于代码路由配置2.2.4 Gateway-Predicate2.2.5 断言源码剖析 2.3 G…

Settings apk进行系统签名覆盖安装

由于AndroidStudio中Settings编译出来的包是未签名的,不能将设备覆盖安装替换原先签名的包,故需要将AndroidStudio打包出来的apk进行签名 一.拷贝未签名的apk 注意签名过程需要在ubuntu中进行,所以需要将未签名的apk拷贝到ubuntu中,如下: 二.拷贝libconscrypt_openjd…

Sketch文件用什么软件打开

现在,想要在线打开 Sketch 文件只需要 2 步就能搞定了! 第一步,访问Windows 也能用的「协作版 Sketch」——即时设计官网并点击免费使用,即可进入即时设计工作台。 第二步,进入即时设计工作台后,点击【文件…

【软件分析/静态分析】学习笔记01——Introduction

🔗 课程链接:李樾老师和谭天老师的:南京大学《软件分析》课程01(Introduction)_哔哩哔哩_bilibili 目录 一、静态程序分析介绍 1.1 PL and Static Analysis 程序语言和静态分析 1.2 为什么要学 Static Analysis? …

JavaScript 基础 DOM (三)

日期对象 实例化 获得当前时间 const date new Date() 获得指定时间 const date1 new Date( 指定时间) 方法 // 1. 实例化const date new Date();// 2. 调用时间对象方法// 通过方法分别获取年、月、日,时、分、秒const year date.getFullYear(); // 四位年份 时…

JDK8以后接口的新特性

JDK8以前,接口内只能定义抽象方法; JDK8,接口内允许定义默认方法、静态方法; JDK9,接口内允许定义私有方法 default:默认方法 public interface Essay01 {/*** 在接口内定义默认方法*/public default v…

CMU - FarPlanning 代码速读

https://github.com/MichaelFYang/far_planner https://www.cmu-exploration.com/ 系统结构 Far Planner 属于 High-level planning module,进行全局规划,找到可行路径;将 way_point发布给 Local planner和 path following KeyPoint Local-la…

帮公司面了个要21K的测试,结果.....

深耕IT行业多年,我们发现,对于一个程序员而言,能去到一线互联网公司,会给我们以后的发展带来多大的影响。 很多人想说,这个我也知道,但是进大厂实在是太难了,简历投出去基本石沉大海&#xff0…

arm嵌入式系统下,手把手教你移植pppoe拨号客户端,使用pppoe拨号上网

移植pppoe拨号客户端 一、概述二、移植过程1、内核配置2、pppd工具编译3、pppoe工具编译 三、配置pppoe参数四、创建节点信息五、pppoe服务器搭建 一、概述 PPPoE(英语:Point-to-Point Protocol Over Ethernet),以太网上的点对点协…

Windows GUI自动化控制工具之python uiAutomation

对 Windows GUI进行自动化控制的工具有很多,比如pywinauto、pyautogui、pywin32、Autoit、airtest、UIAutomation等,UI Automation API是微软提供的自动化框架,可在支持 Windows Presentation Foundation (WPF) 的所有操作系统上使用&#xf…

Niagara—— Niagara Editor界面

目录 一,菜单栏 二,工具栏 三,预览面板 四,参数面板 五,系统总览面板 六,暂存区面板 七,选择面板 打开Niagara Editor: 双击Niagara发射器或系统;右击Niagara发射…

Qt--事件分发器

写在前面 在 Qt 中,事件分发器(Event Dispatcher)是一个核心概念,用于处理 GUI 应用程序中的事件。事件分发器负责将事件从一个对象传递到另一个对象,直到事件被处理或被取消。 每个继承自QObject或QObject的类都可以在本类中重写bool even…

基于 Amazon API Gatewy 的跨账号跨网络的私有 API 集成

一、背景介绍 本文主要讨论的问题是在使用 Amazon API Gateway,通过 Private Integration、Private API 来完成私有网络环境下的跨账号或跨网络的 API 集成。API 管理平台会被设计在单独的账号中(亚马逊云科技提供的是多租户的环境),因为客观上不同业务…

生于零售的亚马逊云科技,如何加速中国跨境电商企业出海?

导读:跨境电商进入精耕细作的新阶段。 作为中国企业出海的重要领域之一,近几年跨境电商行业处在快速发展中。商务部数据显示,2022年中国跨境电商出口达1.55万亿,同比增长11.7%。2023年1-2月,跨境电商进出口总额同比增长…

【wpf】视觉树上找元素的注意事项

前言 我们通过 VisualTreeHelper类 可以在视觉树上找元素,下面提供几个封装好的方法: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Media; using Sy…

分析| Flutter 3.10版本有哪些变化?

Flutter是Google推出的一款用于构建高性能、高保真度移动应用程序、Web和桌面应用程序的开源UI工具包。Flutter使用自己的渲染引擎绘制UI,为用户提供更快的性能和更好的体验。Flutter还提供了丰富的构建工具、库和插件,使开发人员能够更快地构建应用程序…

从浅入深理解序列化和反序列化

文章目录 什么是java序列化什么情况需要使用 Java 序列化为什么要序列化序列化和反序列化过程如下RPC 框架为什么需要序列化序列化用途序列化机制可以让对象地保存到硬盘上,减轻内存压力的同时,也起了持久化的作用序列化机制让Java对象可以在网络传输 实…

LINUX 提权 脏牛CVE-2016-5195

这里写复现过程,不写原理 Linux内核 > 2.6.22(2007年发行,到2016年10月18日才修复) 靶场环境是vluhub上的。网卡自己配置好 nmap扫一下 80端口开的,上去 52.136 再扫 1898开放 访问开干 是个cms msf上线找这…

【VictoriaMetrics】VictoriaMetrics单机版批量和单条数据写入(opentsdb格式)

VictoriaMetrics单机版支持以opentsdb格式的数据写入包含linux形式和postman形式,写入支持单条数据写入以及多条数据写入,下面操作演示下如何使用 1、首先需要启动VictoriaMetrics单机版服务 注意,如果支持opentsdb协议需要在启动单机版VictoriaMetrics的时候加上opentsdbH…

一、尚医通微信登录

文章目录 一、登录需求1、登录需求 二、微信登录1、OAuth21.1OAuth2解决什么问题1.1.1 开放系统间授权1.1.2图例1.1.3方式一:用户名密码复制1.1.4方式二:通用开发者key1.1.5方式三:颁发令牌 1.2 OAuth2最简向导1.2.1 OAuth主要角色1.2.2最简向…