Hudi第三章:集成Flink

news2024/10/7 14:29:24

系列文章目录

Hudi第一章:编译安装
Hudi第二章:集成Spark
Hudi第二章:集成Spark(二)
Hudi第三章:集成Flink


文章目录

  • 系列文章目录
  • 前言
  • 一、环境准备
    • 1.上传并解压
    • 2.修改配置文件
    • 3.拷贝jar包
    • 4.启动sql-client
      • 1.启动hadoop
      • 2.启动session
      • 3.启动sql-client
  • 二、sql-client编码
    • 1.创建表
    • 2.插入数据
    • 3.查询数据
    • 4.更新数据
    • 5.流式插入
  • 三、IDEA编码
    • 1.编写pom.xml
    • 2.编写demo
  • 总结


前言

之前的两次博客学习了hudi和spark的集成,现在我们来学习hudi和flink的集成。


一、环境准备

1.上传并解压

在这里插入图片描述

2.修改配置文件

vim /opt/module/flink-1.13.6/conf/flink-conf.yaml
直接在最后追加即可。

classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4

state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://hadoop102:8020/ckps
state.backend.incremental: true

sudo vim /etc/profile.d/my_env.sh

export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

source /etc/profile.d/my_env.sh

3.拷贝jar包

cp /opt/software/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle-0.12.0.jar  /opt/module/flink-1.13.6/lib/
cp /opt/module/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flink-1.13.6/lib/
cp /opt/module/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/module/flink-1.13.6/lib/

4.启动sql-client

1.启动hadoop

2.启动session

/opt/module/flink-1.13.6/bin/yarn-session.sh -d

3.启动sql-client

bin/sql-client.sh embedded -s yarn-session

启动成功后可以在web端看一下。
在这里插入图片描述
也可以跳转到flink的webui。
在这里插入图片描述
在这里插入图片描述
现在我们就可以在终端写代码了。
在这里插入图片描述

二、sql-client编码

1.创建表

CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop102:8020/tmp/hudi_flink/t1',
  'table.type' = 'MERGE_ON_READ'
);

在这里插入图片描述

2.插入数据

INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

3.查询数据

我们先更改一下表格式,默认的看得可能不习惯。

set sql-client.execution.result-mode=tableau;
select * from t1;

在这里插入图片描述

4.更新数据

前面说过hudi的更新操作就是插入一条主键相同的新数据,由更新的ts来覆盖旧的。

insert into t1 values
  ('id1','Danny',27,TIMESTAMP '1970-01-02 00:00:01','par1');

在这里插入图片描述
可以看到数据已经完成了更新。

5.流式插入

flink最常用的还是流式数据的处理。

CREATE TABLE sourceT (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t2',
  'table.type' = 'MERGE_ON_READ'
);

我们创建两张表,第一张的连接器是datagen可以用来流式的生产数据。第二张表是正常的hudi表。

insert into t2 select * from sourceT;

我们可以在webui看一下。
在这里插入图片描述
因为是流式处理,所以这个进程是不会停止的。

select * from t2 limit 10;

在这里插入图片描述
再查看一次
在这里插入图片描述
我们会发现是不断有数据产生。

三、IDEA编码

我们需要将编译好的一个包拉到本地。
在这里插入图片描述
然后将他倒入maven仓库

mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar

1.编写pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.hudi</groupId>
    <artifactId>flink-hudi-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.6</flink.version>
        <hudi.version>0.12.0</hudi.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>   <!--不会打包到依赖中,只参与编译,不参与运行 -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!--idea运行时也有webui-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
            <scope>provided</scope>
        </dependency>
        <!--手动install到本地maven仓库-->
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink_2.12</artifactId>
            <version>${hudi.version}</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2.编写demo

HudiDemo.java
一个简单的流式数据处理和刚刚一样。

package com.atguigu.hudi.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


import java.util.concurrent.TimeUnit;


public class HudiDemo {
    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "atguigu");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // 设置状态后端RocksDB
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
        embeddedRocksDBStateBackend.setDbStoragePath("/home/chaoge/Downloads/hudi");
        embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        env.setStateBackend(embeddedRocksDBStateBackend);

        // checkpoint配置
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/ckps");
        checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        StreamTableEnvironment sTableEnv = StreamTableEnvironment.create(env);

        sTableEnv.executeSql("CREATE TABLE sourceT (\n" +
                "  uuid varchar(20),\n" +
                "  name varchar(10),\n" +
                "  age int,\n" +
                "  ts timestamp(3),\n" +
                "  `partition` varchar(20)\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second' = '1'\n" +
                ")");

        sTableEnv.executeSql("create table t2(\n" +
                "  uuid varchar(20),\n" +
                "  name varchar(10),\n" +
                "  age int,\n" +
                "  ts timestamp(3),\n" +
                "  `partition` varchar(20)\n" +
                ")\n" +
                "with (\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'hdfs://hadoop102:8020/tmp/hudi_idea/t2',\n" +
                "  'table.type' = 'MERGE_ON_READ'\n" +
                ")");

        sTableEnv.executeSql("insert into t2 select * from sourceT");

    }
}

当我们运行的时候,可以再本地webui查看。
127.0.0.1:8081/
在这里插入图片描述
也可以在hdfs路径看一下。
在这里插入图片描述


总结

flink第一次就先写到这里剩下的还要在写一次。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1076135.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年中国助消化药物行业现状分析:消化不良患者逐年上升,提升需求量[图]

助消化药物主要分为促胃动力药物、消化酶抑制剂、胃酸抑制药物和消食剂4种类型。促胃动力药物的作用机制是通过增强胃肠道平滑肌动力促进胃酸分泌&#xff0c;从而达到助消化的目的&#xff0c;临床常用药物包括多潘立酮、莫沙必利、西沙比利等。 助消化药物分类 资料来源&…

SI314兼容替代 GTX314L—低功耗14通道电容触摸传感器芯片 应用智能门锁

1.介绍 Si314是一款具有自动灵敏度校准功能的14通道电容传感器&#xff0c;其工作电压范围为1.8~5.5v.Si314设置休眠模式来节省功耗&#xff0c;此时&#xff0c;功耗电流为10uA3. 3V。 Si314各个感应通道可实现独立使能、校准、灵敏度调节&#xff0c;可以确保可靠性&#x…

一款好用的PDF文档解密软件

PDF Decrypter pro 纯免费&#xff0c;没有页数限制&#xff0c;没有额外水印&#xff0c;强烈推荐&#xff01;

使用docker创建redis实例、主从复制、哨兵集群

单机模式 1 拉取镜像 docker pull redis:7.2.1 2 新建redis映射配置文件夹data和conf $ mkdir -p /mydata/redis/data $ mkdir -p /mydata/redis/conf 3 切换到redis配置文件映射目录/mydata/redis/conf cd /mydata/redis/conf 4 编辑配置文件 vim redis.…

算法题:单调递增的数字(贪心算法解决序列问题)

这道题参考了一位网友的思路&#xff0c;采用了贪心算法动态规划&#xff0c;具体思路如下&#xff1a;&#xff08;完整题目附在了最后面&#xff09; 1、从高到低遍历数字的每个位数&#xff0c;找到第一个数值递减&#xff08;<&#xff09;的地方&#xff0c;把当前位-…

[spring] spring jpa - hibernate 名词解释配置

[spring] spring jpa - hibernate 名词解释&配置 之前过了一遍依赖注入的内容&#xff0c;这次过一下数据相关的部分&#xff0c;完成了这部分内容&#xff0c;下篇就涉及到 API 实现了 操作的部分放到下一篇&#xff0c;本篇主要是概念配置 整体课程上来说&#xff0c;…

c++ pthread库使用

c pthread库使用 1. pthread库安装2. 测试demo3. 配置include/lib路径3.1 添加include路径3.2 配置lib文件路径 4.显示结果5. 遇到的bug参考文献 1. pthread库安装 打开ftp://sourceware.org/pub/pthreads-win32&#xff0c;下载一个安装包&#xff0c;如pthreads-w32-2-8-0-r…

STM32 CubeMX PWM三种模式(HAL库)

STM32 CubeMX PWM两种模式&#xff08;HAL库&#xff09; STM32 CubeMX STM32 CubeMX PWM两种模式&#xff08;HAL库&#xff09;一、互补对称输出STM32 CubeMX设置代码部分 二、带死区互补模式STM32 CubeMX设置代码 三、普通模式STM32 CubeMX设置代码部分 总结 一、互补对称输…

实现Java基于类的代理方式 - CGLIB动态代理(动态代理篇 三)

CGLIB&#xff08;Code Generation Library&#xff09;是一个基于类的动态代理库&#xff0c;它可以在运行时生成字节码来创建代理类。相比于JDK动态代理&#xff0c;CGLIB动态代理不需要接口&#xff0c;可以代理任意类。 CGLIB动态代理的实现原理是通过继承目标类来创建代理…

什么是网络流量监控

随着许多服务迁移到云&#xff0c;网络基础架构的维护变得复杂。虽然云采用在生产力方面是有利的&#xff0c;但它也可能让位于未经授权的访问&#xff0c;使 IT 系统容易受到安全攻击。 为了确保其网络的安全性和平稳的性能&#xff0c;IT 管理员需要监控用户访问的每个链接以…

【C++杂货铺】一文带你走进RBTree

文章目录 一、红黑树的概念二、红黑树的性质三、红黑树结点的定义四、红黑树的插入操作4.1 情况一&#xff1a;uncle 存在且为红4.2 情况二&#xff1a;uncle 不存在4.3 情况三&#xff1a;uncle 存在且为黑4.4 插入完整源码 五、红黑树的验证六、红黑树与 AVL 树的比较七、结语…

可替代allegroA3909的国产芯片GC3909的数据分析

GLOBALCHIP的 GC3909 是一款双通道 12V 直流电机驱动芯片&#xff0c;为摄像机、消费类产品、玩具和其他低压或 者电池供电的运动控制类应用提供了集成的电机驱动解决方案。芯片一般用来驱动两个直流电机 或者驱动一个步进电机。 而GC3909 可以工作在 3.8~12V 的电源电压上&…

Ant Design+react 路由跳转

今天我们来继续探讨react的路由跳转 首先&#xff0c;创建router文件夹中的index import { lazy } from "react"; import { Outlet,useRoutes } from react-router-dom; //引入页面&#xff0c;引用了路由懒加载 const One lazy(() > import(../pages/one)); c…

QT作业二

1、思维导图 https://www.zhixi.com/view/9e899ee0 2、作业 #include <iostream>using namespace std;class Rect {int width;int height; public:void init(int w,int h);//初始化函数void set_w(int w);//更改宽度void set_h(int h);//更改高度void show();//输出矩形…

DEDECMS织梦保存当前栏目更改时失败的解决方法

织梦编辑栏目时提示“保存当前栏目更改时失败&#xff0c;请检查你的输入资料是否存在问题&#xff01;” 那是因为你的后台栏目编辑文件php里有做过二次开发&#xff0c;添加了栏目数据表里不存在的字段。 跟着下面的步骤&#xff0c;让程序告诉你缺少什么引起的保存失败吧。 …

轻松玩转直播带货,铭顺科技揭秘数字人跳舞直播软件亮点

直播带货在商业领域越来越受追捧&#xff0c;而数字人直播作为一种创新推广方式&#xff0c;正逐渐引起关注。在这个竞争激烈的直播行业中&#xff0c;数字人私有化部署解决方案以其独特的优势和亮点&#xff0c;为用户提供了一个轻松玩转直播带货的利器。 数字人私有化部署解决…

leetCode 167.两数之和 || - 输入有序数组 双指针解法

167. 两数之和 II - 输入有序数组 - 力扣&#xff08;LeetCode&#xff09; 给你一个下标从 1 开始的整数数组 numbers &#xff0c;该数组已按 非递减顺序排列 &#xff0c;请你从数组中找出满足相加之和等于目标数 target 的两个数。如果设这两个数分别是 numbers[index1] …

无人化微波产品智能测试系统

无人化微波产品智能测试系统是面向射频微波产品领域客户数字化转型需求&#xff0c;推出的一款新形态自动测试系统。该系统实现了微波产品测试由单工位串行测试向多工位并行测试转变&#xff0c;具有测试容量大、测试效能高、测试管理信息化等特点。该系统可数倍提升客户的测试…

C# redis通过stream实现消息队列以及ack机制

redis实现 查看redis版本 redis需要>5.0 Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型&#xff0c;Stream 是一个包含 0 个或者多个元素的有序队列&#xff0c;这些元素根据 ID 的大小进行有序排列。 它实现了大部分消息队列的功能&#xff1a; 消息 ID…

Vue路由进阶--VueRouter声明式导航

Vue路由进阶–VueRouter声明式导航 文章目录 Vue路由进阶--VueRouter声明式导航1、声明式导航1.1、导航链接1.2、高亮类名1.3、跳转传参1.4、动态路由参数可选符 1、声明式导航 1.1、导航链接 需求&#xff1a;实现导航高亮效果 vue-router提供了一个全局组件router-link(取…