大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu

news2025/1/15 23:19:59

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(正在更新…)

章节内容

上节我们完成了如下的内容:

  • Kudu Java API
  • 增删改查 编写案例测试

在这里插入图片描述

实现思路

将数据从 Flink 下沉到 Kudu 的基本思路如下:

  • 环境准备:确保 Flink 和 Kudu 环境正常运行,并配置好相关依赖。
  • 创建 Kudu 表:在 Kudu 中定义要存储的数据表,包括主键和列类型。
  • 数据流设计:使用 Flink 的 DataStream API 读取输入数据流,进行必要的数据处理和转换。
  • 写入 Kudu:通过 Kudu 的连接器将处理后的数据写入 Kudu 表。需要配置 Kudu 客户端和表的相关信息。
  • 执行作业:启动 Flink 作业,实时将数据流中的数据写入 Kudu,便于后续查询和分析。

添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>1.11.1</flink.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.17.0</version>
        </dependency>

    </dependencies>
</project>

数据源

new UserInfo("001", "Jack", 18),
new UserInfo("002", "Rose", 20),
new UserInfo("003", "Cris", 22),
new UserInfo("004", "Lily", 19),
new UserInfo("005", "Lucy", 21),
new UserInfo("006", "Json", 24),

自定义下沉器

package icu.wzk.kudu;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.apache.log4j.Logger;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Map;


public class MyFlinkSinkToKudu extends RichSinkFunction<Map<String, Object>> {

    private final static Logger logger = Logger.getLogger("MyFlinkSinkToKudu");

    private KuduClient kuduClient;
    private KuduTable kuduTable;

    private String kuduMasterAddr;
    private String tableName;
    private Schema schema;
    private KuduSession kuduSession;
    private ByteArrayOutputStream out;
    private ObjectOutputStream os;

    public MyFlinkSinkToKudu(String kuduMasterAddr, String tableName) {
        this.kuduMasterAddr = kuduMasterAddr;
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        out = new ByteArrayOutputStream();
        os = new ObjectOutputStream(out);
        kuduClient = new KuduClient.KuduClientBuilder(kuduMasterAddr).build();
        kuduTable = kuduClient.openTable(tableName);
        schema = kuduTable.getSchema();
        kuduSession = kuduClient.newSession();
        kuduSession.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND);
    }

    @Override
    public void invoke(Map<String, Object> map, Context context) throws Exception {
        if (null == map) {
            return;
        }
        try {
            int columnCount = schema.getColumnCount();
            Insert insert = kuduTable.newInsert();
            PartialRow row = insert.getRow();
            for (int i = 0; i < columnCount; i ++) {
                Object value = map.get(schema.getColumnByIndex(i).getName());
                insertData(row, schema.getColumnByIndex(i).getType(), schema.getColumnByIndex(i).getName(), value);
                OperationResponse response = kuduSession.apply(insert);
                if (null != response) {
                    logger.error(response.getRowError().toString());
                }
            }
        } catch (Exception e) {
            logger.error(e);
        }
    }

    @Override
    public void close() throws Exception {
        try {
            kuduSession.close();
            kuduClient.close();
            os.close();
            out.close();
        } catch (Exception e) {
            logger.error(e);
        }
    }

    private void insertData(PartialRow row, Type type, String columnName, Object value) {
        try {
            switch (type) {
                case STRING:
                    row.addString(columnName, value.toString());
                    return;
                case INT32:
                    row.addInt(columnName, Integer.valueOf(value.toString()));
                    return;
                case INT64:
                    row.addLong(columnName, Long.valueOf(value.toString()));
                    return;
                case DOUBLE:
                    row.addDouble(columnName, Double.valueOf(value.toString()));
                    return;
                case BOOL:
                    row.addBoolean(columnName, Boolean.valueOf(value.toString()));
                    return;
                case BINARY:
                    os.writeObject(value);
                    row.addBinary(columnName, out.toByteArray());
                    return;
                case FLOAT:
                    row.addFloat(columnName, Float.valueOf(value.toString()));
                default:
                    throw new UnsupportedOperationException("Unknown Type: " + type);
            }

        } catch (Exception e) {
            logger.error("插入数据异常: " + e);
        }
    }
}

编写实体

package icu.wzk.kudu;

public class UserInfo {

    private String id;

    private String name;

    private Integer age;

    public UserInfo(String id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}

执行建表

package icu.wzk.kudu;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;

import java.util.ArrayList;
import java.util.List;

public class KuduCreateTable {

    public static void main(String[] args) throws KuduException {
        String masterAddress = "localhost:7051,localhost:7151,localhost:7251";
        KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(masterAddress);
        KuduClient kuduClient = kuduClientBuilder.build();

        String tableName = "user";
        List<ColumnSchema> columnSchemas = new ArrayList<>();
        ColumnSchema id = new ColumnSchema
                .ColumnSchemaBuilder("id", Type.INT32)
                .key(true)
                .build();
        columnSchemas.add(id);
        ColumnSchema name = new ColumnSchema
                .ColumnSchemaBuilder("name", Type.STRING)
                .key(false)
                .build();
        columnSchemas.add(name);
        ColumnSchema age = new ColumnSchema
                .ColumnSchemaBuilder("age", Type.INT32)
                .key(false)
                .build();
        columnSchemas.add(age);

        Schema schema = new Schema(columnSchemas);
        CreateTableOptions options = new CreateTableOptions();
        // 副本数量为1
        options.setNumReplicas(1);
        List<String> colrule = new ArrayList<>();
        colrule.add("id");
        options.addHashPartitions(colrule, 3);

        kuduClient.createTable(tableName, schema, options);
        kuduClient.close();
    }

}

主逻辑代码

package icu.wzk.kudu;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

public class SinkToKuduTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<UserInfo> dataSource = env.fromElements(
                new UserInfo("001", "Jack", 18),
                new UserInfo("002", "Rose", 20),
                new UserInfo("003", "Cris", 22),
                new UserInfo("004", "Lily", 19),
                new UserInfo("005", "Lucy", 21),
                new UserInfo("006", "Json", 24)
        );
        SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource
                .map(new MapFunction<UserInfo, Map<String, Object>>() {
                    @Override
                    public Map<String, Object> map(UserInfo value) throws Exception {
                        Map<String, Object> map = new HashMap<>();
                        map.put("id", value.getId());
                        map.put("name", value.getName());
                        map.put("age", value.getAge());
                        return map;
                    }
                });

        String kuduMasterAddr = "localhost:7051,localhost:7151,localhost:7251";
        String tableInfo = "user";
        mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));

        env.execute("SinkToKuduTest");
    }

}

解释分析

环境设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();:初始化 Flink 的执行环境,这是 Flink 应用的入口。

数据源创建

DataStreamSource dataSource = env.fromElements(…):创建了一个包含多个 UserInfo 对象的数据源,模拟了一个输入流。

数据转换

SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource.map(…):使用 map 函数将 UserInfo 对象转换为 Map<String, Object>,便于后续处理和写入 Kudu。每个 UserInfo 的属性都被放入一个 HashMap 中。

Kudu 配置信息

String kuduMasterAddr = “localhost:7051,localhost:7151,localhost:7251”; 和 String tableInfo = “user”;:定义 Kudu 的主节点地址和目标表的信息。

数据下沉

mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));:将转换后的数据流添加到 Kudu 的自定义 Sink 中。MyFlinkSinkToKudu 类应该实现了将数据写入 Kudu 的逻辑。

执行作业

env.execute(“SinkToKuduTest”);:启动 Flink 作业,执行整个数据流处理流程。

测试运行

  • 先运行建表
  • 再运行主逻辑

我们建表之后,确认user表存在。然后我们运行Flink程序,将数据写入Kudu。
在这里插入图片描述
确认有表后,执行 Flink 程序:
在这里插入图片描述

注意事项

  • 并发性:根据 Kudu 集群的规模和配置,可以调整 Flink 作业的并发性,以提高写入性能。
  • 批量写入:Kudu 支持批量插入,可以通过适当配置 Flink 的 sink 来提高性能。
  • 故障处理:确保在作业中处理异常和重试逻辑,以确保数据不会丢失。
  • 监控与调试:使用 Flink 的监控工具和 Kudu 的工具(如 Kudu UI)来监控数据流和性能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2166129.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Boot房屋租赁平台:现代化解决方案

1 绪论 1.1 研究背景 中国的科技的不断进步&#xff0c;计算机发展也慢慢的越来越成熟&#xff0c;人们对计算机也是越来越更加的依赖&#xff0c;科研、教育慢慢用于计算机进行管理。从第一台计算机的产生&#xff0c;到现在计算机已经发展到我们无法想象。给我们的生活改变很…

Recaptcha2 图像识别 API 对接说明

Recaptcha2 图像识别 API 对接说明 本文将介绍一种 Recaptcha2 图像识别2 API 对接说明&#xff0c;它可以通过用户输入识别的内容和 Recaptcha2验证码图像&#xff0c;最后返回需要点击的小图像的坐标&#xff0c;完成验证。 接下来介绍下 Recaptcha2 图像识别 API 的对接说…

8.12DoG (Difference of Gaussians)

基本概念 不同尺度的高斯模糊图像之间的差异&#xff08;DoG&#xff09;&#xff0c;用于边缘检测。函数: cv::GaussianBlur() 结合 cv::Laplacian() 或者自定义DoG实现。 在OpenCV中并没有直接提供一个名为“DoG”&#xff08;Difference of Gaussians&#xff09;的函数&a…

【学术会议征稿】第四届人工智能、机器人和通信国际会议(ICAIRC 2024)

第四届人工智能、机器人和通信国际会议&#xff08;ICAIRC 2024&#xff09; 2024 4th International Conference on Artificial Intelligence, Robotics, and Communication 第四届人工智能、机器人和通信国际会议&#xff08;ICAIRC 2024&#xff09;定于2024年12月27-29日…

css 自定义滚动条样式

* { scrollbar-color: auto !important; scrollbar-width: auto; } //滚动条宽高 ::-webkit-scrollbar { width: 4px; height: 4px; background: transparent; } ::-webkit-scrollbar-thumb { //滑块部分 border-radius: 5px; background-color: rgba(32, 224, 254, 1); } ::-…

【Python报错已解决】TypeError: can only concatenate str (not “float“) to str

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 专栏介绍 在软件开发和日常使用中&#xff0c;BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…

docker compose的使用

docker compose 1.概述 是 Docker 官方提供的一款开源工具&#xff0c;主要用于简化在单个主机上定义和运行多容器 Docker 应用的过程。它的核心作用是容器编排&#xff0c;使得开发者能够在一个统一的环境中以声明式的方式管理多容器应用的服务及其依赖关系。 也就是说Docker…

用 Django 5 快速生成一个简单 进销存 系统 添加 个打印 按钮

一、前置条件&#xff1a; 1.安装好python 【关联网址】 2. 安装好vscode 【关联网址】 插件 3. 登陆海螺AI【关联网址】 4. 安装好 pip install django 【关联网址】 pip install django -i https://mirrors.aliyun.com/pypi/simple/ 二、开始生成 1. 打开vscode 打开…

[数据库实验五] 审计及触发器

一、实验目的与要求&#xff1a; 1.了解MySQL审计功能及实现方式 2.掌握触发器的工作原理、定义及操作方法 二、实验内容&#xff1a; 注&#xff1a; 在同一个触发器内编写多行代码&#xff0c;需要用结构begin ……end 函数current_user()获得当前登录用户名 1.自动保存…

Linux 应用层自定义协议与序列化

文章目录 一、应用层1、协议2、序列化 && 反序列化3、通过Json库进行数据的序列化 && 反序列化Json::Value类Json::Reader类Json::Writer类 二、为什么read、write、recv、send和Tcp支持全双工&#xff1f;发数据的本质&#xff1a;tcp支持全双工通信的原因&am…

gitlab-runner集成CI/CD完整项目部署

目录 1.环境安装 2.gitlab代码仓库搭建 3.gitlab-runner-安装以及注册 4..gitlab-ci.yml脚本 5.脚本说明 6.build.sh 7.test.sh 8. deploy.sh 9.运行流水线 10.选择流水线分支 11.查看运行阶段 12.查看运行日志 13.查看服务器真实日志 1.环境安装 确保服务器的Java环…

Python_异常机制

软件程序在运行过程中&#xff0c;非常可能遇到刚刚提到的这些问题&#xff0c;我们称之为异常&#xff0c;英文是&#xff1a;Exception&#xff0c;意思是例外。遇到这些例外情况&#xff0c;或者叫异常&#xff0c;我们怎么让写的程序做出合理的处理&#xff0c;安全的退出&…

Footprint Growthly Quest 工具:赋能 Telegram 社区实现 Web3 飞速增长

作者&#xff1a;Stella L (stellafootprint.network) 在 Web3 的快节奏世界里&#xff0c;社区互动是关键。而众多 Web3 社区之所以能够蓬勃发展&#xff0c;很大程度上得益于 Telegram 平台。正因如此&#xff0c;Footprint Analytics 精心打造了 Growthly —— 一款专为 Tel…

Tkinter制作登录界面以及登陆后页面切换

Tkinter制作登录界面以及登陆后页面切换 前言序言1. 由来2. 思路3. 项目结构描述4. 项目实战1. 登录界面实现&#xff08;代码&#xff09;2. 首页界面实现&#xff08;代码&#xff09;3. 打包build.py&#xff08;与main.py同级目录&#xff09;4. 打包安装包 前言 本帖子&a…

【nrm】npm 注册表管理器

nrm是什么 nrm&#xff08;NPM Registry Manager&#xff09;是一个用于管理 Node.js 包管理器&#xff08;如 npm 和 Yarn&#xff09;的注册表工具。它可以帮助用户快速切换不同的 npm 源&#xff0c;以便于提高包安装的速度和效率&#xff0c;特别是在中国大陆地区&#xf…

Ubuntu23.10下处理libncurses5-dev包的安装问题

Ubuntu23.10下处理libncurses5-dev包的安装问题 导语环境准备问题和解决方案总结参考文献 导语 使用Ubuntu23.10的时候&#xff0c;遇到需要termios的场景&#xff0c;结果发现无论是codeblocks还是系统本身的gcc都无法找到term.h和curse.h&#xff0c;网上找了很多解决方案都…

了解云计算工作负载保护的重要性,确保数据和应用程序安全

云计算de小白 云计算技术的快速发展使数据和应用程序安全成为一种关键需求&#xff0c;而不仅仅是一种偏好。随着越来越多的客户公司将业务迁移到云端&#xff0c;保护他们的云工作负载&#xff08;指所有部署的应用程序和服务&#xff09;变得越来越重要。云工作负载保护&…

【stm32】TIM定时器输出比较-PWM驱动LED呼吸灯/舵机/直流电机

TIM定时器输出比较 一、输出比较简介1、OC&#xff08;Output Compare&#xff09;输出比较2、PWM简介3、输出比较通道(高级)4、输出比较通道(通用)5、输出比较模式6、PWM基本结构配置步骤&#xff1a;程序代码&#xff1a;PWM驱动LED呼吸灯 7、参数计算8、舵机简介程序代码&am…

nginx 安装(Centos)

nginx 安装-适用于 Centos 7.x [rootiZhp35weqb4z7gvuh357fbZ ~]# lsb_release -a LSB Version: :core-4.1-amd64:core-4.1-noarch Distributor ID: CentOS Description: CentOS Linux release 7.9.2009 (Core) Release: 7.9.2009 Codename: Core# 创建文件…

大模型训练:K8s 环境中数千节点存储最佳实践

今天这篇博客来自全栈工程师朱唯唯&#xff0c;她在前不久举办的 KubeCon 中国大会上进行了该主题分享。 Kubernetes 已经成为事实的应用编排标准&#xff0c;越来越多的应用在不断的向云原生靠拢。与此同时&#xff0c;人工智能技术的迅速发展&#xff0c;尤其是大型语言模型&…