实时数仓: Hudi 表管理、Flink 性能调优或治理工具脚本

news2025/1/7 18:23:06

1. Hudi 表管理

1.1 Hudi 表基础管理

创建 Hudi 表
在 HDFS 上创建一个 Hudi 表(以 Merge-on-Read 为例):

CREATE TABLE real_time_dw.dwd_order_fact (
    order_id STRING,
    user_id STRING,
    product_id STRING,
    amount DOUBLE,
    order_date STRING,
    update_time TIMESTAMP
)
PARTITIONED BY (order_date)
STORED AS PARQUET
TBLPROPERTIES (
    'type'='MERGE_ON_READ',
    'primaryKey'='order_id',
    'preCombineField'='update_time'
);
1.2 数据操作

插入/更新数据
利用 Hudi 写入工具(如 Spark)进行批量或实时插入更新:

from pyspark.sql import SparkSession
from datetime import datetime

spark = SparkSession.builder \
    .appName("Hudi Example") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

# 加载数据
data = [
    {"order_id": "1", "user_id": "101", "product_id": "201", "amount": 99.99, "order_date": "2025-01-01", "update_time": datetime.now()},
    {"order_id": "2", "user_id": "102", "product_id": "202", "amount": 199.99, "order_date": "2025-01-01", "update_time": datetime.now()}
]
df = spark.createDataFrame(data)

# 写入 Hudi
hudi_options = {
    "hoodie.table.name": "dwd_order_fact",
    "hoodie.datasource.write.recordkey.field": "order_id",
    "hoodie.datasource.write.precombine.field": "update_time",
    "hoodie.datasource.write.partitionpath.field": "order_date",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.hive.sync.enable": "true",
    "hoodie.datasource.hive.database": "real_time_dw",
    "hoodie.datasource.hive.table": "dwd_order_fact",
    "hoodie.datasource.hive.partition_fields": "order_date"
}

df.write.format("hudi").options(**hudi_options).mode("append").save("hdfs://path/to/hudi/dwd_order_fact")
1.3 Hudi 表维护

表清理

  • 配置清理策略,清理过期版本:
    hoodie.cleaner.commits.retained=10
    hoodie.cleaner.policy=KEEP_LATEST_COMMITS
    
    保留最近 10 个提交版本。

表压缩

  • 针对 MOR 表,定期运行 compaction 任务:
    spark-submit --class org.apache.hudi.utilities.HoodieCompactor \
      --master yarn \
      --table-path hdfs://path/to/hudi/dwd_order_fact \
      --table-name dwd_order_fact
    

元数据管理

  • 更新 Hive 元数据:
    MSCK REPAIR TABLE real_time_dw.dwd_order_fact;
    

2. Flink 性能调优

2.1 Checkpoint 性能优化

增量 Checkpoint
启用 RocksDB 增量检查点,减少状态存储大小:

env.getCheckpointConfig().enableIncrementalCheckpoints(true);

异步快照
减少 Checkpoint 对性能的影响:

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000); // 60秒超时
env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 优先使用Checkpoint恢复
2.2 Watermark 优化

如果数据有延迟,可以允许一定的 out-of-order 数据处理:

WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 最大延迟5秒
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());
2.3 状态管理优化

状态后端选择

  • 优先选择 RocksDB 状态后端,支持更大的状态数据:
    env.setStateBackend(new RocksDBStateBackend("hdfs://path/to/checkpoints", true));
    

TTL(Time-to-Live)设置

  • 自动清理无用状态:
    stateDescriptor.enableTimeToLive(StateTtlConfig
        .newBuilder(Time.hours(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .build());
    
2.4 Task Slot 配置

根据并发优化 TaskManager:

  • 每个 TaskManager 提供更多 slots:
    taskmanager.numberOfTaskSlots: 4
    

3. 治理工具脚本

3.1 数据质量治理(Great Expectations)

脚本自动化
以下 Python 脚本可以实现自动化数据校验(如字段非空和值域校验):

from great_expectations.core.batch import BatchRequest
from great_expectations.data_context import DataContext

context = DataContext()

batch_request = BatchRequest(
    datasource_name="my_s3_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="dwd_order_fact",
    runtime_parameters={"path": "s3://path/to/hudi/dwd_order_fact/"},
    batch_identifiers={"default_identifier_name": "2025-01-01"}
)

validator = context.get_validator(batch_request=batch_request)

# 非空校验
validator.expect_column_values_to_not_be_null("order_id")
# 值域校验
validator.expect_column_values_to_be_in_set("order_status", ["CREATED", "PAID", "SHIPPED", "CANCELLED"])
# 保存结果
validator.save_expectation_suite("order_fact_suite")

context.run_validation_operator(
    "action_list_operator",
    assets_to_validate=[validator]
)
3.2 数据权限管理(Apache Ranger)

策略 JSON 配置
以下为权限策略 JSON 文件的示例,适用于 Ranger API 批量添加策略:

{
  "policyName": "dwd_order_fact_policy",
  "serviceType": "hive",
  "resources": {
    "database": {
      "values": ["real_time_dw"],
      "isExcludes": false,
      "isRecursive": false
    },
    "table": {
      "values": ["dwd_order_fact"],
      "isExcludes": false,
      "isRecursive": false
    }
  },
  "policyItems": [
    {
      "accesses": [{"type": "select", "isAllowed": true}],
      "users": ["bi_user"],
      "groups": ["BI_Group"]
    },
    {
      "accesses": [{"type": "select", "isAllowed": true}, {"type": "insert", "isAllowed": true}],
      "users": ["etl_user"],
      "groups": ["ETL_Team"]
    }
  ]
}

通过 Ranger REST API 部署该策略:

curl -u admin:admin -H "Content-Type: application/json" -X POST -d @policy.json http://<RANGER_HOST>:6080/service/public/v2/api/policy
3.3 数据血缘治理(Apache Atlas)

Flink 血缘注册脚本
通过 REST API 自动将 Flink 作业的输入输出血缘关系上传到 Atlas:

curl -X POST http://<ATLAS_HOST>:21000/api/atlas/v2/entity \
-H "Content-Type: application/json" \
-d '{
  "entity": {
    "typeName": "process",
    "attributes": {
      "name": "flink_order_job",
      "inputs": [
        {"typeName": "kafka_topic", "uniqueAttributes": {"qualifiedName": "order_topic"}}
      ],
      "outputs": [
        {"typeName": "hdfs_path", "uniqueAttributes": {"qualifiedName": "hdfs://path/to/hudi/dwd_order_fact"}}
      ]
    }
  }
}'

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2272118.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

cursor 使用技巧

一、创建项目前期步骤 1.先给AI设定一个对应项目经理角色&#xff0c; 2.然后跟AI沟通项目功能&#xff0c;生成功能设计文件&#xff1a;README.md README.md项目功能 3.再让AI总结写出开发项目规则文件&#xff1a; .cursorrules 是技术栈进行限定&#xff0c;比如使用什…

xinput1_3.dll丢失修复方法。方法1-方法6

总结 xinput1_3.dll的核心作用 xinput1_3.dll作为Microsoft DirectX库的关键组件&#xff0c;对于游戏控制器的支持起着至关重要的作用。它不仅提供了设备兼容性、多控制器管理和反馈机制等核心功能&#xff0c;还通过XInput API简化了开发人员对控制器状态的检索和设备特性的…

【C++】P2550 [AHOI2001] 彩票摇奖

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;题目描述输入格式&#xff1a;输出格式&#xff1a;输入输出样例&#xff1a; &#x1f4af;题解思路1. 问题解析 &#x1f4af;我的实现实现逻辑问题分析 &#x1f4af;老…

01:C语言的本质

C语言的本质 1、ARM架构与汇编2、局部变量初始化与空间分配2.1、局部变量的初始化2.1、局部变量数组初始化 3、全局变量/静态变量初始化化与空间分配4、堆空间 1、ARM架构与汇编 ARM简要架构如下&#xff1a;CPU&#xff0c;ARM(能读能写)&#xff0c;Flash&#xff08;能读&a…

8086汇编(16位汇编)学习笔记10.寄存器总结

8086汇编(16位汇编)学习笔记10.寄存器总结-C/C基础-断点社区-专业的老牌游戏安全技术交流社区 - BpSend.net 寄存器 8086CPU有14个寄存器 它们的名称为&#xff1a; AX、BX、CX、DX、SI、DI、SP、BP、 IP**、CS、DS、ES、**SS、PSW。 8086CPU所有的寄存器都是16位的&#…

如何在 Ubuntu 22.04 上安装 Cassandra NoSQL 数据库教程

简介 本教程将向你介绍如何在 Ubuntu 22.04 上安装 Cassandra NoSQL 数据库。 Apache Cassandra 是一个分布式的 NoSQL 数据库&#xff0c;旨在处理跨多个普通服务器的大量数据&#xff0c;并提供高可用性&#xff0c;没有单点故障。Apache Cassandra 是一个高度可扩展的分布…

uni-app:实现普通选择器,时间选择器,日期选择器,多列选择器

效果 选择前效果 1、时间选择器 2、日期选择器 3、普通选择器 4、多列选择器 选择后效果 代码 <template><!-- 时间选择器 --><view class"line"><view classitem1><view classleft>时间</view><view class"right&quo…

centos,789使用mamba快速安装R及语言包devtools

如何进入R语言运行环境请参考&#xff1a;Centos7_miniconda_devtools安装_R语言入门之R包的安装_r语言devtools包怎么安装-CSDN博客 在R里面使用安装devtools经常遇到依赖问题&#xff0c;排除过程过于费时&#xff0c;使用conda安装包等待时间长等。下面演示centos,789都是一…

STM32第十一课:STM32-基于标准库的42步进电机的简单IO控制(附电机教程,看到即赚到)

一&#xff1a;步进电机简介 步进电机又称为脉冲电机&#xff0c;简而言之&#xff0c;就是一步一步前进的电机。基于最基本的电磁铁原理,它是一种可以自由回转的电磁铁,其动作原理是依靠气隙磁导的变化来产生电磁转矩&#xff0c;步进电机的角位移量与输入的脉冲个数严格成正…

kafka使用以及基于zookeeper集群搭建集群环境

一、环境介绍 zookeeper下载地址&#xff1a;https://zookeeper.apache.org/releases.html kafka下载地址&#xff1a;https://kafka.apache.org/downloads 192.168.142.129 apache-zookeeper-3.8.4-bin.tar.gz kafka_2.13-3.6.0.tgz 192.168.142.130 apache-zookee…

JSON结构快捷转XML结构API集成指南

JSON结构快捷转XML结构API集成指南 引言 在当今的软件开发世界中&#xff0c;数据交换格式的选择对于系统的互操作性和效率至关重要。JSON&#xff08;JavaScript Object Notation&#xff09;和XML&#xff08;eXtensible Markup Language&#xff09;是两种广泛使用的数据表…

Android14 CTS-R6和GTS-12-R2不能同时测试的解决方法

背景 Android14 CTS r6和GTS 12-r1之后&#xff0c;tf-console默认会带起OLC Server&#xff0c;看起来olc server可能是想适配ATS(android-test-station)&#xff0c;一种网页版可视化、可配置的跑XTS的方式。这种网页版ATS对测试人员是比较友好的&#xff0c;网页上简单配置下…

BurpSuite工具安装

BurpSuite介绍&#xff1a; BurpSuite是由PortSwigger开发的一款集成化的Web应用安全检测工具&#xff0c;广泛应用于Web应用的漏洞扫描和攻击模拟&#xff0c;主要用于抓包该包(消息拦截与构造) 一、Burp suite安装 windows系统需要提前配置好java环境&#xff0c;前面博客…

Win11+WLS Ubuntu 鸿蒙开发环境搭建(一)

参考文章 Windows11安装linux子系统 WSL子系统迁移、备份与导入全攻略 如何扩展 WSL 2 虚拟硬盘的大小 Win10安装的WSL子系统占用磁盘空间过大如何释放 《Ubuntu — 调整文件系统大小命令resize2fs》 penHarmony南向开发笔记&#xff08;一&#xff09;开发环境搭建 一&a…

flink cdc oceanbase(binlog模式)

接上文&#xff1a;一文说清flink从编码到部署上线 环境&#xff1a;①操作系统&#xff1a;阿里龙蜥 7.9&#xff08;平替CentOS7.9&#xff09;&#xff1b;②CPU&#xff1a;x86&#xff1b;③用户&#xff1a;root。 预研初衷&#xff1a;现在很多项目有国产化的要求&#…

和为0的四元组-蛮力枚举(C语言实现)

目录 一、问题描述 二、蛮力枚举思路 1.初始化&#xff1a; 2.遍历所有可能的四元组&#xff1a; 3.检查和&#xff1a; 4.避免重复&#xff1a; 5.更新计数器&#xff1a; 三、代码实现 四、运行结果 五、 算法复杂度分析 一、问题描述 给定一个整数数组 nums&…

某xx到家app逆向

去官网下载app即可 https://www.dongjiaotn.com/#/home查壳 360的壳子 直接脱壳即可 抓包 请求地址 https://api.gateway.znjztfn.cn/server/user/index 请求参数 {"lng": "xxxx","lat": "xxxx","city_id": "1376&…

docker搭建gitlab和jenkins

搭建gitlab 搭建gitlab首先需要一个gitlab的镜像 其次最好为他设置一个单独的目录 然后编写一个docker-compose文件 version: 3.1 services:gitlab:image: gitlab_zh:latest //此处为你的镜像名称container_name: gitlab //容器名称restart: always …

嵌入式linux中socket控制与实现

一、概述 1、首先网络,一看到这个词,我们就会想到IP地址和端口号,那IP地址和端口各有什么作用呢? (1)IP地址如身份证一样,是标识的电脑的,一台电脑只有一个IP地址。 (2)端口提供了一种访问通道,服务器一般都是通过知名端口号来识别某个服务。例如,对于每个TCP/IP实…

推荐系统重排:MMR 多样性算法

和谐共存&#xff1a;相关性与多样性在MMR中共舞 推荐系统【多样性算法】系列文章&#xff08;置顶&#xff09; 1.推荐系统重排&#xff1a;MMR 多样性算法 2.推荐系统重排&#xff1a;DPP 多样性算法 引言 在信息检索和推荐系统中&#xff0c;提供既与用户查询高度相关的文…