如何通过 AWS Managed Apache Flink 实现 Iceberg 的实时同步

news2025/1/6 18:16:09

AWS Managed Apache Flink (以下以 MAF 代指)是 AWS 提供的一款 Serverless 的 Flink 服务。

1. 问题

大家在使用 MAF 的时候,可能遇到最大的一个问题就是 MAF 的依赖管理,很多时候在 Flink 上运行的代码,托管到 MAF 上之后发现有很多依赖问题需要解决,大体上感觉就是 MAF 一定需要一个纯洁的环境,纯洁的 Flink 代码包。
而我们在使用 MAF 向 Iceberg 表写入数据时候更是如此。在使用 MAF 向 Iceberg 写入数据时,使用 Glue Data Catalog,会遇到如下报错:

Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:211)
at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:139)
at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:406)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1356)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1111)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:701)

分析上面的错误,发现是在执行 Craete catalog 的时候,调用了 clusterHadoopConf 方法。我们在继续分析源码,在Iceberg 的源码 FlinkCatalogFactory 中,找到报错的代码位置,如下:

public static Configuration clusterHadoopConf() {
  return HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration());
}

而 HadoopUtils 这个类是来自于 org.apache.flink.runtime.util.HadoopUtils,我怀疑可能是 MAF 的环境是依赖于 EKS,因此镜像中并没有包含和 hadoop 相关的依赖,导致这里方法加载默认配置的时候,找不到 org/apache/hadoop/conf/Configuration 类,但是当我尝试在 maven 中加入 hadoop-client 依赖后,仍然存在这个问题。

2. 解决方案

通过上面的分析,我们知道了问题是出在了 org.apache.flink.runtime.util.HadoopUtils这个类,查找了很多资料,终于在 github 的 issue 中发现也有人遇到过这样的问题【#3044】,并且给出了一个绕行的方法,就是在自己的代码工程中重写 org.apache.flink.runtime.util.HadoopUtils这个类,不得不承认这是一个高明的方法。

重写HadoopUtils
在我们的代码工程中创建一个 package,并且添加一个名为 HadoopUtils 的 class,填入如下代码:

package org.apache.flink.runtime.util;

import org.apache.hadoop.conf.Configuration;

public class HadoopUtils {

    public static Configuration getHadoopConfiguration(
            org.apache.flink.configuration.Configuration flinkConfiguration) {
        return new Configuration(false);
    }
}

然后重新打包代码。
也可以参考 github 上的代码,链接🔗 github code
然后我们就可以编译打包代码。

3. Demo

下面我们通过一个完整的 Demo 来了解如何在 MAF 上实现 Iceberg 表的实时摄入。Demo 中会使用一个数据生成工具 Datafaker ,生成数据并且写入 MSK(kafka)中。

3.1 编译代码

获取 Demo代码,直接编译打包。

3.2 创建 MAF Application

  1. 将打包的 jar 上传至S3
  2. 进入 MAF 控制台,创建 Application,版本选择 Flink 1.18。
  3. 在 Application code location 部份填写在第1步上传的 jar 位置。
  4. MAF 会自动创建一个 IAM Role,在完成 Application 创建之后,请记得给这个 IAM Role 添加 Glue 读和写 Data Catalog 的权限,因为 Demo 代码工程会使用 Glue data catalog 作为 Iceberg catalog。
  5. 创建完 Application 就可以直接点击 Run 运行了。

3.3 生成数据

export MYBROKERS=<kafka-server>
export KAFKA_HOME=/home/ec2-user/environment/kafka_2.12-2.8.1
export TOPIC=datafaker_user_order_list_01
export IMPORT_ROWS=100000
#写入一条记录的间隔时间,也可以不设置
export INTERVAL=0.01
datafaker kafka $MYBROKERS $TOPIC $IMPORT_ROWS --meta dataformat_01.txt --interval $INTERVAL

这里就不详细介绍 datafaker 的使用了,如果想了解 datafaker 的参数配置可以从这个 github datafaker 获取。

3.4 在 Athena 中查询数据写入的结果

注意,如果 Athena 开启了 Reuse query results,可能会导致 count(*) 查询的不是最新的结果。
在这里插入图片描述

  1. 运维监控
    4.1 Metrics
    由于写入 Iceberg 表,不会在 Flink UI 看到 Records Recevied 以及 Records Send 等指标,因此如果想查看 Iceberg Sink 写入的数据量,需要进入Flink UI Sink 算子中,查看 Metrics 的 committedDataFilesRecordCount 指标。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1672784.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[Algorithm][回溯][找出所有子集的异或总和再求和][全排列 II][电话号码的字母组合][括号生成]详细讲解

目录 1.找出所有子集的异或总和再求和1.题目链接2.算法原理详解3.代码实现 2.全排列 II1.题目链接2.算法原理详解3.代码实现 3.电话号码的字母组合1.题目链接2.算法原理详解3.代码实现 4.括号生成1.题目链接2.算法原理详解3.代码实现 1.找出所有子集的异或总和再求和 1.题目链…

PCIE协议-2-事务层规范-TLP Prefix Rules

2.2.10 TLP前缀规则 以下规则适用于任何包含TLP前缀的TLP&#xff1a; 对于任何TLP&#xff0c;TLP中byte0的Fmt[2:0]字段中的值100b表示存在TLP前缀&#xff0c;并且Type[4]位指示TLP前缀的类型。 Type[4]位中的值0b表示存在本地TLP前缀。Type[4]位中的值1b表示存在端到端TL…

数据结构与算法-排序算法1-冒泡排序

本文先介绍排序算法&#xff0c;然后具体写冒泡排序。 目录 1.排序算法简介 2.常见的排序算法分类如下图&#xff1a; 3.冒泡排序&#xff1a; 1.介绍&#xff1a; 2.动态图解 3.举例 4.小结冒泡排序规则 5.冒泡排序代码 6.优化 7.优化后时间 代码&#xff1a; 运…

Java | Leetcode Java题解之第88题合并两个有序数组

题目&#xff1a; 题解&#xff1a; class Solution {public void merge(int[] nums1, int m, int[] nums2, int n) {int p1 m - 1, p2 n - 1;int tail m n - 1;int cur;while (p1 > 0 || p2 > 0) {if (p1 -1) {cur nums2[p2--];} else if (p2 -1) {cur nums1[p…

Vue的学习 —— <vue指令>

目录 前言 正文 内容渲染指令 内容渲染指令的使用方法 v-text v-html 属性绑定指令 双向数据绑定指令 事件绑定指令 条件渲染指令 循环列表渲染指令 侦听器 前言 在完成Vue开发环境的搭建后&#xff0c;若想将Vue应用于实际项目&#xff0c;首要任务是学习Vue的基…

黑马基于Web-socket的java聊天室基本解析

要是用Web-socket协议&#xff0c;我们要前端upgrade升级成web-socket协议 首先我们要引入springboot的websocket起步依赖&#xff0c;这样子方便使用&#xff0c;自己指定版本注意 <dependency><groupId>org.springframework.boot</groupId><artifactId&…

绘唐3启动器怎么启动一键追爆款3正式版

绘唐3启动器怎么启动一键追爆款3正式版 工具入口 一.文案助手&#xff1a; 【注意&#xff01;&#xff01;】如果图片无显示&#xff0c;一般情况下被杀毒拦截&#xff0c;需关闭杀毒软件或者信任文件路径。 win10设置排除文件&#xff1a; 1.【新建工程】使用前先新建工程…

std::ref和std::cref的使用和原理分析

目录 1.用法 2.std::reference_wrapper介绍 3.std::ref原理分析 4.std::cref原理分析 5.总结 1.用法 它的定义如下&#xff1a; std::ref&#xff1a;用于包装按引用传递的值。 std::cref&#xff1a;用户包装按const引用传递的值。 C本身就有引用&#xff08;&&#…

使用 Python 中的 TensorFlow 检测垃圾短信

前言 系列专栏&#xff1a;机器学习&#xff1a;高级应用与实践【项目实战100】【2024】✨︎ 在本专栏中不仅包含一些适合初学者的最新机器学习项目&#xff0c;每个项目都处理一组不同的问题&#xff0c;包括监督和无监督学习、分类、回归和聚类&#xff0c;而且涉及创建深度学…

【鸿蒙开发】第二十四章 IPC与RPC进程间通讯服务

1 IPC与RPC通信概述 IPC&#xff08;Inter-Process Communication&#xff09;与RPC&#xff08;Remote Procedure Call&#xff09;用于实现跨进程通信&#xff0c;不同的是前者使用Binder驱动&#xff0c;用于设备内的跨进程通信&#xff0c;后者使用软总线驱动&#xff0c;…

一个基于servlet的MVC项目-登录验证

一、MVC的概念 MVC是Model、View、Controller的缩写&#xff0c;分别代表 Web 应用程序中的3种职责1 模型:用于存储数据以及处理用户请求的业务逻辑。 2视图:向控制器提交数据&#xff0c;显示模型中的数据。 3控制器:根据视图提出的请求&#xff0c;判断将请求和数据交给哪个…

Linux下网络命令

目录 需求1-查看本机是否存在22端口解法1解法2解法3 需求2-查看其他主机是否存在22端口解法1解法2解法3 需求3-查看TCP连接解法1/2 需求4-统计80端口tcp连接次数解法 需求5-查看总体网络速度解法 需求6-查看进程流量解法 需求7-dns解法 需求8-traceroute到baidu解法 需求9-查看…

git仓库使用

git仓库是会限制空间大小限制的 git网络库的容量限制_github仓库大小限制-CSDN博客 git是用于管理github的工具 电脑左下角搜索git打开GitBash.exe 进入到要下载到本地的目录 下载到本地的文件不要更改&#xff01; 如果要使用请务必把文件复制到别的空间去再在这个别的空间…

centos7中查询Nacos的安装路径和配置信息如何查找?

在 CentOS 7 上查询 Nacos 的安装路径和配置信息通常涉及几个步骤。这些步骤主要依赖于你是如何安装 Nacos 的&#xff08;比如使用压缩包还是 Docker 等方式&#xff09;。下面是一些通用的方法来帮助你找到 Nacos 的安装路径和配置信息&#xff1a; 1. 查找 Nacos 的安装路径…

智能自助终端主板RK3288/RK3568在酒店前台自助机方案的应用,支持鸿蒙,支持免费定制

酒店前台自助机解决方案是一款基于自助服务终端&#xff0c;能够让客人通过简单的操作完成入住登记/退房的解决方案&#xff0c;大幅提高酒店的工作效率&#xff0c;提升客人体验&#xff0c;降低人力成本。 该方案解决了以下传统前台登记入住方式的痛点&#xff1a; 1、人流量…

elasticsearch 动态映射

文章目录 动态映射动态映射的弊端静态映射实战&#xff1a;映射创建后还可以更新吗 动态映射 动态映射的核心是在自动检测字段类型后添加新字段 哪些字段类型支持动态检测呢&#xff1f; 答&#xff1a;boolean类型、float类型、long类型、Object类型、Array类型、date类型、…

Hive-表设计优化

Hive-表设计优化 1.Hive查询基本原理 Hive的设计思想是通过元数据解析描述将HDFS上的文件映射成表。 基本的查询原理是当用户通过HQL语句对Hive中的表进行复杂数据处理和计算时&#xff0c;默认将其转换为分布式计算MapReduce程序对HDFS中的数据进行读取处理的过程。 当执行…

黑马甄选离线数仓项目day02(数据采集)

datax介绍 官网&#xff1a; https://github.com/alibaba/DataX/blob/master/introduction.md DataX 是阿里云 DataWorks数据集成 的开源版本&#xff0c;在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。 DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre…

48-Qt控件详解:Buttons Containers2

一 Group Box:组合框 #include "widget.h"#include<QGroupBox> #include<QRadioButton> #include<QPushButton> #include<QVBoxLayout>//可以在水平方向和垂直方向进行排列的控件&#xff0c;QHBoxLayout/QVBoxLayout #include <QGridLa…

Adobe Media Encoder ME v24.3.0 解锁版 (视频和音频编码渲染工具)

Adobe系列软件安装目录 一、Adobe Photoshop PS 25.6.0 解锁版 (最流行的图像设计软件) 二、Adobe Media Encoder ME v24.3.0 解锁版 (视频和音频编码渲染工具) 三、Adobe Premiere Pro v24.3.0 解锁版 (领先的视频编辑软件) 四、Adobe After Effects AE v24.3.0 解锁版 (视…