flink datastream api实现数据实时写入hudi

news2025/1/10 2:23:24

Apache Hudi(发音为“hoodie”)是下一代流数据湖平台。 Apache Hudi 将核心仓库和数据库功能直接引入数据湖。 Hudi 提供表、事务、高效的更新插入/删除、高级索引、流式摄取服务、数据集群/压缩优化和并发性,同时将您的数据保持为开源文件格式。

Hudi目前支持Flink、Spark与Java引擎实现数据写入。今天我们挑选其中一种,来看一下Flink引擎中的DataStream API写入方式。

根据官网以及hudi相关代码,目前基于Flink DataStream API写入hudi的方式也可分为hudi官网所述的如下方式(https://hudi.apache.org/docs/flink-quick-start-guide#insert-data):

image.png

以及hudi源码中HoodieFlinkStreamer类提供的示例。

imagea744f2a9f121831a.png

image283f77a9ca3fe255.png

大概地,我们将上述两种方式分别成为HoodiePipeline方式和HoodieFlinkStreamer方式,两种方式本质上还是大同小异。下面我们简要分析一下这两种方式。

HoodiePipeline方式

Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");

DataStream<RowData> dataStream = env.addSource(...);
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
    .column("uuid VARCHAR(20)")
    .column("name VARCHAR(10)")
    .column("age INT")
    .column("ts TIMESTAMP(3)")
    .column("`partition` VARCHAR(20)")
    .pk("uuid")
    .partition("partition")
    .options(options);

builder.sink(dataStream, false); // The second parameter indicating whether the input data stream is bounded
env.execute("Api_Sink");    

以数据写入为例,在读取数据源数据之后,基于表字段和表路径构建HoodiePipeline.Builder,进而在sink函数中传入数据源。

sink函数为具体执行者。其内容如下:

    public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
      TableDescriptor tableDescriptor = getTableDescriptor();
      return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getResolvedCatalogTable(), bounded);
    }

HoodiePipeline.sink内容为:

  /**
   * Returns the data stream sink with given catalog table.
   *
   * @param input        The input datastream
   * @param tablePath    The table path to the hoodie table in the catalog
   * @param catalogTable The hoodie catalog table
   * @param isBounded    A flag indicating whether the input data stream is bounded
   */
  private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
    FactoryUtil.DefaultDynamicTableContext context = Utils.getTableContext(tablePath, catalogTable, Configuration.fromMap(catalogTable.getOptions()));
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
        .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
        .consumeDataStream(input);
  }

分析该return语句,其调用的方法为HoodieTableFactory#createDynamicTableSink,HoodieTableSink#getSinkRuntimeProvider,上述代码即hudi扩展flink的动态表的相关方法。

其中,HoodieTableSink#getSinkRuntimeProvider内容为:

  @Override
  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    return (DataStreamSinkProviderAdapter) dataStream -> {

      // setup configuration
      long ckpTimeout = dataStream.getExecutionEnvironment()
          .getCheckpointConfig().getCheckpointTimeout();
      conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
      // set up default parallelism
      OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism());
      // set up client id
      OptionsInference.setupClientId(conf);

      RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType();

      // bulk_insert mode
      final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
      if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
        return Pipelines.bulkInsert(conf, rowType, dataStream);
      }

      // Append mode
      if (OptionsResolver.isAppendMode(conf)) {
        DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream, context.isBounded());
        if (OptionsResolver.needsAsyncClustering(conf)) {
          return Pipelines.cluster(conf, rowType, pipeline);
        } else {
          return Pipelines.dummySink(pipeline);
        }
      }

      DataStream<Object> pipeline;
      // bootstrap
      final DataStream<HoodieRecord> hoodieRecordDataStream =
          Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
      // write pipeline
      pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
      // compaction
      if (OptionsResolver.needsAsyncCompaction(conf)) {
        // use synchronous compaction for bounded source.
        if (context.isBounded()) {
          conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
        }
        return Pipelines.compact(conf, pipeline);
      } else {
        return Pipelines.clean(conf, pipeline);
      }
    };
  }

HoodieFlinkStreamer方式

相信分析完HoodiePipeline方式,HoodieFlinkStreamer方式也就死一目了然了,其直接使用的是HoodieTableSink#getSinkRuntimeProvider方法中的代码构造DataStream。

Flink DataStream API实现Hudi数据写入

官方给了HoodiePipeline方式写入hudi的示例,但是HoodieFlinkStreamer方式给的并不全。下面我们以HoodieFlinkStreamer方式为例,读取kafka数据进而写入Hudi。

kafka发送数据

数据结构

package com.zh.ch.bigdata.examples.kafka;

import java.io.Serializable;

public class HudiSource implements Serializable {

    private int uuid;

    private String name;

    private int age;

    private int ts;

    public HudiSource() {
    }

    public HudiSource(int uuid, String name, int age, int ts) {
        this.uuid = uuid;
        this.name = name;
        this.age = age;
        this.ts = ts;
    }

    public int getUuid() {
        return uuid;
    }

    public void setUuid(int uuid) {
        this.uuid = uuid;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public int getTs() {
        return ts;
    }

    public void setTs(int ts) {
        this.ts = ts;
    }
}

producer类

package com.zh.ch.bigdata.examples.kafka;

import com.alibaba.fastjson2.JSON;
import com.zh.ch.bigdata.examples.utils.PropertiesUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KafkaProducerExample implements Runnable {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducerExample.class);

    @Override
    public void run() {
        Properties kafkaConfig = PropertiesUtil.load("kafka/src/main/resources/kafkaConfig.properties");
        try (KafkaProducer<String, Object> producer = new KafkaProducer<>(kafkaConfig)) {
            for (int i = 500; i < 600; i++) {
                producer.send(new ProducerRecord<>("hudi_topic_20230619_2", Integer.toString(i), JSON.toJSONString(new HudiSource(i, "name" + i, i, i))));
            }
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                4,
                8,
                10,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10));
        executor.execute(new KafkaProducerExample());
        executor.shutdown();
    }
}

相关参数:

bootstrap.servers        =10.8.0.1:30092
linger.ms                =1
acks                     =1
key.serializer           =org.apache.kafka.common.serialization.StringSerializer
value.serializer         =org.apache.kafka.common.serialization.StringSerializer

key.deserializer         =org.apache.kafka.common.serialization.StringDeserializer
value.deserializer       =org.apache.kafka.common.serialization.StringDeserializer
group.id                 =consumer-group-1

Flink消费数据写入Hudi

package com.zh.ch.bigdata.examples.hudi;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hudi.com.beust.jcommander.JCommander;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.transform.Transformer;
import org.apache.hudi.sink.utils.Pipelines;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.StreamerUtil;

import java.util.ArrayList;
import java.util.List;

/**
 * 运行参数
 * <p>
 * --table-type MERGE_ON_READ
 * --kafka-bootstrap-servers kafka:30092
 * --kafka-topic hudi_topic_20230619_2
 * --target-table hudi_tbl
 * --target-base-path file:///data/hudi/hudidb/hudi_tbl
 * --kafka-group-id consumer-group
 * --source-avro-schema
 * "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ {\"name\": \"name\",\"type\": \"string\"},{\"name\":
 * \"age\", \"type\": \"int\"},{\"name\":\"uuid\",\"type\": \"int\"},{\"name\":\"ts\",\"type\": \"long\"}]}"
 */

public class HudiFlinkStreamer {

    public static void main(String[] args) throws Exception {
        // 创建flink DataStream执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final FlinkStreamerConfig cfg = new FlinkStreamerConfig();
        JCommander cmd = new JCommander(cfg, null, args);
        if (cfg.help || args.length == 0) {
            cmd.usage();
            System.exit(1);
        }

        // 设置并行度
        env.setParallelism(4);

        // 设置checkpoint
        env.enableCheckpointing(30000);

        cfg.setString("rest.port", "8081");

        env.getConfig().setGlobalJobParameters(cfg);
        // We use checkpoint to trigger write operation, including instant generating and committing,
        // There can only be one checkpoint at one time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        env.setStateBackend(cfg.stateBackend);
        if (cfg.flinkCheckPointPath != null) {
            env.getCheckpointConfig().setCheckpointStorage(cfg.flinkCheckPointPath);
        }

        Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);

        TypedProperties kafkaProps = DFSPropertiesConfiguration.getGlobalProps();
        kafkaProps.putAll(StreamerUtil.appendKafkaProps(cfg));

        // Read from kafka source

        RowType.RowField rowField = new RowType.RowField("name", new VarCharType(VarCharType.MAX_LENGTH));
        RowType.RowField rowField1 = new RowType.RowField("age", new IntType());
        RowType.RowField rowField2 = new RowType.RowField("uuid", new IntType());
        RowType.RowField rowField3 = new RowType.RowField("ts", new IntType());

        List<RowType.RowField> rowFields = new ArrayList<>();
        rowFields.add(rowField);
        rowFields.add(rowField1);
        rowFields.add(rowField2);
        rowFields.add(rowField3);

        RowType rowType = new RowType(rowFields);

        DataStream<RowData> dataStream = env
            .addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new JsonRowDataDeserializationSchema(rowType,
                InternalTypeInfo.of(rowType), false, true, TimestampFormat.ISO_8601), kafkaProps))
            .name("kafka_source").uid("uid_kafka_source");

        if (cfg.transformerClassNames != null && !cfg.transformerClassNames.isEmpty()) {
            Option<Transformer> transformer = StreamerUtil.createTransformer(cfg.transformerClassNames);
            if (transformer.isPresent()) {
                dataStream = transformer.get().apply(dataStream);
            }
        }

        OptionsInference.setupSinkTasks(conf, env.getParallelism());
        DataStream<Object> pipeline;
        // Append mode
        if (OptionsResolver.isAppendMode(conf)) {
            pipeline = Pipelines.append(conf, rowType, dataStream, false);
            if (OptionsResolver.needsAsyncClustering(conf)) {
                Pipelines.cluster(conf, rowType, pipeline);
            }
            else {
                Pipelines.dummySink(pipeline);
            }
        }
        else {
            DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, dataStream);
            pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
            if (OptionsResolver.needsAsyncCompaction(conf)) {
                Pipelines.compact(conf, pipeline);
            }
            else {
                Pipelines.clean(conf, pipeline);
            }
        }
        pipeline.print();

        env.execute(cfg.targetTableName);

    }

}

总结

针对上述两种方式,我们可以发现其实都是大同小异的,最后都是调用的一段相同代码,都是相当灵活的。在使用过程中,可结合自己的业务场景分别选择。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/682052.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

项目经理一直盲目跟风的PMP认证,到底还剩多少含金量?

早上好&#xff0c;我是老原。 到底有没有必要考证&#xff1f; 到底考啥证&#xff1f; PMP&#xff1f;软考&#xff1f;还是NPDP&#xff1f; …… 这是老原的粉丝们亘古不变的话题。 我这有不少朋友就是这样&#xff0c;前两年就在问我要不要考证&#xff0c;直到现在…

为什么你的手机号需要二次实名,这几个原因你想过没有?

尊敬的客户&#xff1a;您的手机卡存在安全风险&#xff0c;为保护您的权益&#xff0c;您的电话呼出、短信和流量使用功能被限制。请机主本人使用被保护的手机号码登录网页链接 进行实名核验&#xff0c;核验通过后&#xff0c;将自动恢复正常通信服务。如核验未通过&#xff…

vite构建工具初识

一、什么是vite vite官网地址&#xff1a;https://cn.vitejs.dev/ Vite 是一个由 Vue.js 作者尤雨溪开发的新一代前端构建工具&#xff0c;它相比于传统的 webpack&#xff0c;具有更快的启动速度、更高的开发效率和更简洁的配置方式。 Vite的主要特点包括&#xff1a; 快速…

2023年上海/广州/深圳DAMA-CDGA/CDGP数据治理认证班

DAMA认证为数据管理专业人士提供职业目标晋升规划&#xff0c;彰显了职业发展里程碑及发展阶梯定义&#xff0c;帮助数据管理从业人士获得企业数字化转型战略下的必备职业能力&#xff0c;促进开展工作实践应用及实际问题解决&#xff0c;形成企业所需的新数字经济下的核心职业…

Java 设计模式实战系列—策略模式

从优惠打折活动说起 电商平台为了增加销量经常搞一些活动&#xff0c;比如 618、双十一&#xff0c;还有一些节假日活动&#xff0c;根据销量的变化又经常更新不同的活动。最开始为了增加销量&#xff0c;全场都六折&#xff1a; // 打六折 public BigDecimal sixDiscount(Bi…

Spring Cloud 之注册中心 Eureka 精讲

&#x1f353; 简介&#xff1a;java系列技术分享(&#x1f449;持续更新中…&#x1f525;) &#x1f353; 初衷:一起学习、一起进步、坚持不懈 &#x1f353; 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正&#x1f64f; &#x1f353; 希望这篇文章对你有所帮助,欢…

【单周期CPU】LoongArch | 立即数扩展模块Ext | 32位算术逻辑运算单元(ALU)

前言&#xff1a;本章内容主要是演示在vivado下利用Verilog语言进行单周期简易CPU的设计。一步一步自己实现模型机的设计。本章先介绍单周期简易CPU中基本组合逻辑部件的设计。 &#x1f4bb;环境&#xff1a;一台内存4GB以上&#xff0c;装有64位Windows操作系统和Vivado 201…

HarmonyOS学习路之开发篇—AI功能开发(文档检测校正)

基本概念 文档校正提供了文档翻拍过程的辅助增强功能&#xff0c;包含两个子功能&#xff1a; 文档检测&#xff1a;能够自动识别图片中的文档&#xff0c;返回文档在原图中的位置信息。这里的文档泛指外形方正的事物&#xff0c;比如书本、相片、画框等。文档校正&#xff1a…

java中如何实现字符串反转

java中如何实现字符串反转 方式1&#xff1a;通过创建StringBuilder或StringBuffer对象&#xff0c;并使用其reverse()方法实现字符串的反转 上代码&#xff1a; /*** 给定一个字符串&#xff0c;通过创建SpringBuilder对象的方式将字符串进行反转* return*/public static …

cadence从原理图到pcb

完成原理图设计后&#xff0c;需要进行如下步骤才能开始画PCB&#xff1a; 原理图规制检测(DRC)生成网表新建PCB文件&#xff0c;设置封装路径导入网表设置原点和栅格绘制PCB板框将器件导入PCB 原理图规制检测(DRC) 选中原理图文件&#xff0c;运行Tools->Design Rules C…

synchronized锁升级详细过程

目录 一、锁升级基础 1&#xff09;偏向锁 2&#xff09;轻量级锁&#xff08;自旋锁&#xff09; 3&#xff09;重量级锁 二、为什么要有锁升级过程&#xff1f; 1&#xff09;减少无竞争情况下的同步操作开销 2&#xff09;尽量避免线程切换的开销 3&#xff09;降低…

MySQL 数据库

文章目录 数据库的基本概念数据表数据库数据库管理系统数据库系统 数据库的发展史当今主流数据库介绍SQL Server &#xff08;微软公司产品&#xff09;Oracle &#xff08;甲骨文公司产品&#xff09;DB2 &#xff08;IBM公司产品&#xff09;MySQL &#xff08;甲骨文公司收购…

语法篇·Servlet基础

一、初识Servlet 1.1简介 Servlet是一种使用Java语言来开发动态网站的技术。Servlet是运行在Web服务器或应用服务器上的程序&#xff0c;它是作为来自Web浏览器或其他HTTP客户端的请求和HTTP服务器上的数据库或应用程序之间的中间层。Servlet可以收集来自网页表单的用户输入&a…

上位机与两台PLC之间无线以太网通信

本文以组态王和2台三菱FX5u PLC为例&#xff0c;介绍组态王与多台 PLC的无线以太网通信实现过程。在本方案中采用了三菱PLC无线通讯终端DTD419MB&#xff0c;作为实现无线通讯的硬件设备。 在这一无线以太网通讯系统的搭建中&#xff0c;用户无需更改网络参数和原有程序&#…

Java版本的工程项目管理系统源代码之工程项目管理系统面临的挑战 spring cloud +支持二开

管理方式 项目管理服务&#xff08;PM&#xff09; 是指工程项目管理企业按照合同约定&#xff0c;在工程项目决策阶段&#xff0c;为业主编制可行性研究报告&#xff0c;进行可行性分析和项目策划&#xff1b;在工程项目实施阶段&#xff0c;为业主提供招标代理、设计管理、采…

为什么个人项目我更推荐使用Caddy?

为什么个人项目我更推荐使用Caddy? 为什么个人项目我更推荐使用Caddy? 前言什么是Caddy?Caddy是够用且省心的简单的配置自动化 https结尾参考链接 前言 最近我把自己一些项目里面的 nginx 换成了 caddy&#xff0c;运转相当良好&#xff0c;比较开心&#xff0c;所以写了…

java 会员中心管理系统Myeclipse开发mysql数据库web结构jsp编程计算机网页项目

一、源码特点 JSP 会员中心管理系统 是一套完善的系统源码&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;以及相应配套的设计文档&#xff0c;系统主要采用B/S模式开发。 研究的基本内容是基于Web的会员中心管理系…

印刷企业使用MES生产管理系统后,会出现哪些弊端

MES生产管理系统是一种用于企业管理、控制和优化生产过程的管理软件。在印刷企业中&#xff0c;印刷MES管理系统可以帮助企业更好地管理生产过程&#xff0c;提高生产效率和质量。但是&#xff0c;在使用印刷MES管理系统时&#xff0c;也会存在一些弊端。本文将探讨这些弊端&am…

java版本Spring Cloud + Spring Boot +二次开发+企业电子招标采购系统源码

一、立项管理 1、招标立项申请 功能点&#xff1a;招标类项目立项申请入口&#xff0c;用户可以保存为草稿&#xff0c;提交。 2、非招标立项申请 功能点&#xff1a;非招标立项申请入口、用户可以保存为草稿、提交。 3、采购立项列表 功能点&#xff1a;对草稿进行编辑&#x…

基于轻量级yolov5s开发构建车道线实例分割检测识别系统

车道线实例分割检测是指利用计算机视觉技术对图像或视频中的车道线进行精确的识别和定位任务。该任务旨在区分和标记出每条独立的车道线&#xff0c;并提供它们的准确位置和形状信息。 实例分割是目标检测和语义分割的结合&#xff0c;不仅要找到目标的边界框&#xff0c;还需…