Hudi系列10:Flink流式插入

news2025/1/10 2:40:31

文章目录

  • 流式插入概述
  • 一. Hudi流式插入案例1(datagen)
    • 1.1 准备工作
    • 1.2 源端准备
    • 1.3 目标端表准备
    • 1.4 ETL准备
    • 1.5 数据验证
    • 1.6 通过SPARK SQL查看数据
  • 二. Hudi流式插入案例2(Kafka)
    • 2.1 准备工作
    • 2.2 源端准备
      • 2.2.1 创建kafka的topic (hudi_flink)
      • 2.2.2 Flink SQL Client消费kafka数据
    • 2.3 目标端表准备
    • 2.4 ETL准备
    • 2.5 验证数据
  • 参考:

流式插入概述

kafka (映射为一个flink table source_table)-> flink (insert into target_table select * from source_table) -> hudi (映射为一个 flink table target_table)

一. Hudi流式插入案例1(datagen)

1.1 准备工作

# 启动yarn session(非root账户)
/home/flink-1.14.5/bin/yarn-session.sh -d  2>&1 &

# 在yarn session模式下启动Flink SQL
/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session

1.2 源端准备

这里我们使用了数据生成器,datagen,下面有参数控制数据生成的频率。

-- sets up the result mode to tableau to show the results directly in the CLI
set execution.result-mode=tableau;

-- 设置checkpoint,不然会一直卡住
set execution.checkpointing.interval=10sec;

create table my_sourceT_12 (
   uuid varchar(200),
   name varchar(100),
   age  int,
   ts   timestamp(3)
) with (
   'connector' = 'datagen',
   'rows-per-second' =  '1'
)
;

image.png

1.3 目标端表准备

CREATE TABLE my_targetT_12(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3)
)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hp5:8020/user/hudi_data/my_targetT_12',
  'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);

1.4 ETL准备

insert into my_targetT_12 (uuid, name, age, ts)
select uuid, name, age, ts
  from my_sourceT_12 ;

select * from my_targetT_12;

image.png

1.5 数据验证

在页面运行这个,依旧会有一个报错

select * from my_targetT_12;

报错:
这个报错是我测试环境的CPU资源不够导致的

org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Skipping monitoring container  since CPU usage is not yet available.

HDFS查看数据:
image.png

1.6 通过SPARK SQL查看数据

连接Spark SQL:

# Spark 3.3
spark-sql --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

创建Hudi表:
建表的语法存在差异,需要进行调整,有的字段类型都不对应

CREATE TABLE my_targetT_12(
  uuid VARCHAR(20) ,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP
)
using hudi
location 'hdfs://hp5:8020/user/hudi_data/my_targetT_12';

查询Hudi表数据:

select * from my_targetT_12 limit 10;

测试记录:
image.png

image.png

二. Hudi流式插入案例2(Kafka)

2.1 准备工作

# 启动yarn session(非root账户)
/home/flink-1.15.2/bin/yarn-session.sh -d  2>&1 &

# 在yarn session模式下启动Flink SQL
/home/flink-1.15.2/bin/sql-client.sh embedded -s yarn-session

2.2 源端准备

这里我们使用Kafka作为源端

2.2.1 创建kafka的topic (hudi_flink)

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/
./kafka-topics.sh --zookeeper hp2:2181 --create --replication-factor 3 --partitions 3 --topic hudi_flink4

2.2.2 Flink SQL Client消费kafka数据

  1. 将Flink连接Kafka的jar包放到Flink的lib目录
    https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/kafka/

  2. 创建kafka表

-- sets up the result mode to tableau to show the results directly in the CLI
set execution.result-mode=tableau;

-- 设置checkpoint,不然会一直卡住
set execution.checkpointing.interval=10sec;

CREATE TABLE hudi_flink_kafka_source4 (
  orderId STRING,
  userId STRING,
  orderTime STRING,
  ip STRING,
  orderMoney DOUBLE,
  orderStatus INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'hudi_flink4',
  'properties.bootstrap.servers' = 'hp2:9092',
  'properties.group.id' = 'zqs-1004',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
);
  1. 往kafka的topic插入数据
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/kafka-console-producer.sh --broker-list hp2:9092 --topic hudi_flink4

{"orderId": "20211122103434136000001","userId": "300000971","orderTime": "2021-11-22 10:34:34.136","ip": "123.232.118.98","orderMoney": 485.48,"orderStatus": 0}
{"orderId": "20211122103434136000002","userId": "300000972","orderTime": "2021-11-22 10:34:34.136","ip": "123.232.118.98","orderMoney": 485.48,"orderStatus": 0}
{"orderId": "20211122103434136000003","userId": "300000973","orderTime": "2021-11-22 10:34:34.136","ip": "123.232.118.98","orderMoney": 485.48,"orderStatus": 0}
  1. 在flink sql客户端查看数据消费
select * from hudi_flink_kafka_source4 ;

image.png

2.3 目标端表准备

CREATE TABLE hudi_flink_kafka_sink4 (
  orderId STRING PRIMARY KEY NOT ENFORCED,
  userId STRING,
  orderTime STRING,
  ip STRING,
  orderMoney DOUBLE,
  orderStatus INT,
  ts STRING,
  partition_day STRING
)
PARTITIONED BY (partition_day) 
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hp5:8020/user/hudi_data/hudi_flink_kafka_sink4',
  'table.type' = 'MERGE_ON_READ',
  'write.operation' = 'upsert',
  'hoodie.datasource.write.recordkey.field'= 'orderId',
  'write.precombine.field' = 'ts',
  'write.tasks'= '1',
  'compaction.tasks' = '1', 
  'compaction.async.enabled' = 'true', 
  'compaction.trigger.strategy' = 'num_commits', 
  'compaction.delta_commits' = '1'
);

2.4 ETL准备

INSERT INTO hudi_flink_kafka_sink4 
SELECT
  orderId, userId, orderTime, ip, orderMoney, orderStatus,
  substring(orderId, 0, 17) AS ts, substring(orderTime, 0, 10) AS partition_day 
FROM hudi_flink_kafka_source4 ;

2.5 验证数据

HDFS:
image.png

参考:

  1. https://blog.csdn.net/NC_NE/article/details/125705845

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/189645.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

卷积神经网络中的权值共享和局部连接

卷积神经网络中的权值共享和局部连接卷积神经网络的两大特点权值共享全连接卷积神经网络的两大特点 权值共享,就是输入一张图,用一个filter去扫这张图,filter里面的数就叫权重,这张图每个位置都是被同样的filter扫的,…

Flink官方例子解析:WordCount

1. 简介 今天介绍的是官方子项目flink-examples-streaming里面的WordCount例子。 WordCount ,中文:单词统计,是大数据计算常用的例子。 2. WordCount需要实现的功能 监听指定目录下的文件,读取文件的文本内容;如果未…

Python继承机制及其使用

Python 类的封装、继承、多态 3 大特性,前面章节已经详细介绍了 Python 类的封装,本节继续讲解 Python 类的继承机制。继承机制经常用于创建和现有类功能类似的新类,又或是新类只需要在现有类基础上添加一些成员(属性和方法&#…

RASP技术进阶系列(三):重大漏洞自动化热修复

在上篇文章《RASP技术进阶系列(二):东西向Web流量智能检测防御》中提到,在企业日常安全运营以及HW场景下,应用漏洞攻击应急响应和恶意流量溯源分析是安全团队的重点工作。在恶意流量溯源方面,指向攻击来源的…

趁着你对象吃泡面的功夫,我修复了误删除的文件

文章目录前言一. linux下文件删除原理1.1 文件删除原理的简单介绍1.2 测试inode号是否容易被覆盖?二. 实验测试过程2.1 实验环境:2.2 新增一块硬盘测试2.3 对磁盘分区2.3.1 分区(使用fdisk分区)2.3.2 格式化,创建目录挂…

网络化多智能体系统的共识与合作

在所有参与者之间提供快速协议和团队合作的算法通过自组织网络系统实现有效的任务执行。By Reza Olfati-Saber, Member IEEE, J. Alex Fax, and Richard M. Murray, Fellow IEEE小于 翻译摘要:本文提供了一个理论框架,用于分析多智能体网络系统的共识算法…

Linux文件与目录的查看:ls

前言 ls作为我们在Linux系统中最常用的命令,因为我们常常需要去知道文件或是目录的相关信息,但我们Linux的文件所记录的信息实在是太多了,ls也没有需要全部都列出来,所以,当我们执行ls命令时,默认显示的只…

【数据结构】基础:二叉搜索树

【数据结构】基础:二叉搜索树 摘要:本文为二叉树的进阶,主要介绍其概念与基本实现(递归与非递归),再介绍其应用,主要介绍内容为KV模型。最后为简单的性能分析。 文章目录【数据结构】基础&#…

【数据结构】1.1 数据结构的研究内容

文章目录数据结构的研究内容数据结构研究的内容小结数据结构的研究内容 早期,计算机主要用于数值计算: 首先,分析问题、提取操作对象,然后,找出操作对象之间的关系,用数学语言加以描述,建立相应数学方程。…

Java日志门面技术 SLF4J

文章目录背景SLF4J概述切换日志框架实际应用配合自身简单日志实现(slf4j-simple)配置logback日志实现配置Log4J日志实现(需适配器)配置JUL日志实现(需适配器)添加slf4j-nop依赖(日志开关)桥接旧的日志实现框架背景 随着系统开发的进行,可能会更新不同的日志框架&am…

TF数据流图图与TensorBoard

2.1 TF数据流图 学习目标 目标 说明TensorFlow的数据流图结构应用 无内容预览 2.1.1 案例:TensorFlow实现一个加法运算 1 代码2 TensorFlow结构分析2.1.2 数据流图介绍 2.1.1 案例:TensorFlow实现一个加法运算 2.1.1.1 代码 def tensorflow_demo():&…

CMMI对企业有什么价值,如何高效落地?

1、获得权威认证 CMMI是全球性软件与系统工程行业的唯一权威认证,是对企业软件研发与能力服务的认可。 CMMI企业价值 CoCode项目管理全面支持CMMI3-5级高效落地​ 2、降本增效,提高企业能力。 CMMI对软件开发过程进行规范化梳理,保证软…

虚拟机ubuntu系统内存满,无法进入桌面,扩展内存

1、 关闭虚拟机,在虚拟机设置中将原先20GB扩展到30GB 注意:有快照需要删除快照后才能扩展 2、命令行进入ubuntu 内存满了,无法进入Ubuntu图形界面 按下ctrlaltf2~f6组合键 输入用户名和密码进入命令行模式 3、删除一些东西 删除回收站…

vuex的modules和辅助函数

一、回顾:vuex状态管理器1、版本问题:vue2对应的是vuex3;vue3对应的vuex42、vuex作用:每个vuex中都有一个Store(仓库),用于管理vue项目中用到的状态变量(属性)。vuex维护的是一个单一的状态树vu…

工作常用cron总结

一、cron表达式详解 corn从左到右(用空格隔开): 秒 分 小时 日 月 周 (星期中的日期,1代表周日,7代表周六) 年 定时任务统计 数据同步 0 0 10 * * ? 每天上午10点触发…

Spring 整合Mybatis。

目录 一、环境准备 1、Mybatis 环境 2、整合思路分析 二、Spring整合Mybatis 三、Spring整合Junit 一、环境准备 1、Mybatis 环境 ▶ 步骤1 : 准备数据库表 Mybatis是来操作数据库表,所以先创建一个数据库及表 create database spring_db character set utf8; …

LeetCode刷题系列 -- 1008. 前序遍历构造二叉搜索树

给定一个整数数组,它表示BST(即 二叉搜索树 )的 先序遍历 ,构造树并返回其根。保证 对于给定的测试用例,总是有可能找到具有给定需求的二叉搜索树。二叉搜索树 是一棵二叉树,其中每个节点, Node.left 的任何后代的值 严…

JVM的理解(垃圾回收算法和类加载过程)

文章目录1、JVM的位置2、JVM的体系结构3、JVM组件3.1、类加载器(加载class文件)3.1.1、类加载器的执行步骤3.2、PC寄存器3.3、方法区3.4、栈3.5、堆4、GC算法4.1、引用计数法4.2、复制算法1、模型2、原理图4.3、标记清除4.4、标记压缩总结:1、…

2023年了学Java还能找到工作么?

Java人才需求缺口巨大 为何还有人找不到工作? 近两年,传统企业开始数字化转型,各企业对互联网IT技术人才呈现井喷趋势。对于进可攻前端、后可守后端的Java程序员而言,市场对他们青睐有加,薪资更是水涨船高。然而在…

Cesium 本地化部署和新增sandcastle案例

源码下载git clone https://gitee.com/mirrors-gis/cesium.gitcd cesium npm install // or yarn install构建 因为下载的源码,还没有构建出cesium的api,以及api对应的文档 ,如果此时直接运行 npm start ,会启动一个8080端口的一个服务,通过 http://localhost:8080 可以看…