Java Spark 操作 Apache Kudu

news2024/11/15 14:03:41

一、Apache Kudu

在这里插入图片描述

Apache Kudu是一种列式分布式存储引擎,它的设计目标是支持快速分析和高吞吐量的数据访问,同时也能够支持低延迟、实时查询和更新操作。它被称为Hadoop生态系统的新一代存储层,能够与Apache Spark、Apache Impala、Apache Hive等大数据处理框架集成使用。

Kudu在设计上采用了多版本并发控制(MVCC)技术,支持ACID事务,并提供了灵活的数据复制机制,可以在不同的数据中心、机房之间实现数据同步和备份,确保数据的高可靠性和可用性。

Kudu提供了对多种数据类型的支持,包括结构化数据和半结构化数据,支持复杂的数据模型和数据访问模式,例如时间序列数据、地理空间数据等。此外,Kudu还提供了多种API和工具,如Java、Python、SQLAPI以及Impala Shell、Spark Shell等工具,方便用户进行数据分析和查询操作。

Apache Kudu 部署可参考下面通过使用 k8s 快速部署方案:

K8s 部署 Apache Kudu 集群

二、Java Api 操作 kudu

这里创建一个 SpringBoot 项目,在 pom 中引入下面依赖:

<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client-tools</artifactId>
    <version>1.14.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.14.0</version>
</dependency>

下面在配置文件中增加配置,将 kudu 信息放置在此:

kudu:
  master: 10.218.222.13:30751
  defaultSocketReadTimeout: 10000

创建 KuduClient 并注入 Spring 容器中:

@Configuration
public class KuduConfig {

    @Value("${kudu.master}")
    private String master;

    @Value("${kudu.defaultSocketReadTimeout}")
    private Integer defaultSocketReadTimeout;

    @Bean
    public KuduClient kuduClient() {
        KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(master);
        kuduClientBuilder.defaultSocketReadTimeoutMs(defaultSocketReadTimeout);
        return kuduClientBuilder.build();
    }
    
}

1. 创建表

1.1 创建表,使用 hash 分区

@Resource
KuduClient kuduClient;

@Test
void createTable() throws KuduException {
    String tableName = "user";
    if (!kuduClient.tableExists(tableName)) {
        //构建创建表的schema信息-----就是表的字段和类型
        ArrayList<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).build());
        Schema schema = new Schema(columnSchemas);
        //指定创建表的相关属性
        CreateTableOptions tableOptions = new CreateTableOptions();
        //设置副本数
        tableOptions.setNumReplicas(3);
        ArrayList<String> partitionList = new ArrayList<String>();
        // 指定kudu表的分区字段是什么
        partitionList.add("id");
        // 按照 id.hashcode % 分区数 = 分区号
        tableOptions.addHashPartitions(partitionList, 6);
        kuduClient.createTable(tableName, schema, tableOptions);
    }
}

1.2 创建表,使用范围分区

@Test
public void testRangePartition() throws KuduException {
    String tableName = "user1";
    if (!kuduClient.tableExists(tableName)) {
        //设置表的schema
        LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).build());
        //创建schema
        Schema schema = new Schema(columnSchemas);
        //创建表时提供的所有选项
        CreateTableOptions tableOptions = new CreateTableOptions();
        //设置副本数
        tableOptions.setNumReplicas(3);
        //设置范围分区的规则
        LinkedList<String> parcols = new LinkedList<String>();
        parcols.add("id");
        //设置按照那个字段进行range分区
        tableOptions.setRangePartitionColumns(parcols);
        // 设置分区的范围 , 10个范围,每个范围放 10000
        int count = 0;
        for (int i = 0; i < 10; i++) {
            //范围开始
            PartialRow lower = schema.newPartialRow();
            lower.addInt("id", count);
            //范围结束
            PartialRow upper = schema.newPartialRow();
            count += 10000;
            upper.addInt("id", count);
            //设置每一个分区的范围
            tableOptions.addRangePartition(lower, upper);
        }
        kuduClient.createTable("student", schema, tableOptions);
    }
}

1.3 创建表,同时使用hash分区和range分区

/**
 * 哈希分区有利于提高写入数据的吞吐量,而范围分区可以避免tablet无限增长问题,
 * hash分区和range分区结合,可以极大的提升kudu的性能
 */
@Test
public void testMultilevelPartition() throws KuduException {
    String tableName = "user2";
    if (!kuduClient.tableExists(tableName)) {
        //设置表的schema
        LinkedList<ColumnSchema> columnSchemas = new LinkedList<ColumnSchema>();
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build());
        columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).build());
        //创建schema
        Schema schema = new Schema(columnSchemas);
        //创建表时提供的所有选项
        CreateTableOptions tableOptions = new CreateTableOptions();
        //设置副本数
        tableOptions.setNumReplicas(1);
        //分区字段
        LinkedList<String> parcols = new LinkedList<String>();
        parcols.add("id");
        //hash分区
        tableOptions.addHashPartitions(parcols, 5);
        //range分区
        int count = 0;
        for (int i = 0; i < 10; i++) {
            PartialRow lower = schema.newPartialRow();
            lower.addInt("id", count);
            count += 10000;
            PartialRow upper = schema.newPartialRow();
            upper.addInt("id", count);
            tableOptions.addRangePartition(lower, upper);
        }
        kuduClient.createTable("cat", schema, tableOptions);
    }
}

2. 写入数据

@Test
public void insertTable() throws KuduException {
    String tableName = "user";
    //构建 kuduSession 对象
    KuduSession kuduSession = kuduClient.newSession();
    kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
    //构建 Operation 的子类实例对象
    KuduTable kuduTable = kuduClient.openTable(tableName);
    // 写入数据
    for (int i = 1; i <= 10; i++) {
        Insert insert = kuduTable.newInsert();
        PartialRow row = insert.getRow();
        row.addInt("id", i);
        row.addString("name", "name-" + i);
        row.addInt("age", 20 + i);
        // 提交数据
        kuduSession.apply(insert);
    }
}

3. 查询数据

@Test
public void queryData() throws KuduException {
    String tableName = "user";
    //构建查询扫描器
    KuduScanner.KuduScannerBuilder kuduScannerBuilder = kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
    // 查询的列
    ArrayList<String> columnsList = new ArrayList<String>();
    columnsList.add("id");
    columnsList.add("name");
    columnsList.add("age");
    kuduScannerBuilder.setProjectedColumnNames(columnsList);
    // 条件筛选
    kuduScannerBuilder.addPredicate(
            KuduPredicate.newComparisonPredicate(
                    new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build(),
                    KuduPredicate.ComparisonOp.EQUAL,
                    "张三"));
    // 范围查询
    kuduScannerBuilder.addPredicate(
            KuduPredicate.newComparisonPredicate(
                    new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).build(),
                    KuduPredicate.ComparisonOp.LESS,
                    50));

    // in
    kuduScannerBuilder.addPredicate(KuduPredicate.newInListPredicate(
            new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).build(),
            ImmutableList.of(1, 2, 3, 4)));

    kuduScannerBuilder.limit(100);
    //启用故障容错模式,如果设置为true,则在发生错误时,会尝试重新连接并重试操作。
    kuduScannerBuilder.setFaultTolerant(true);
    //返回结果集
    KuduScanner kuduScanner = kuduScannerBuilder.build();
    //遍历数据
    while (kuduScanner.hasMoreRows()) {
        RowResultIterator rowResults = kuduScanner.nextRows();
        while (rowResults.hasNext()) {
            RowResult row = rowResults.next();
            int id = row.getInt("id");
            String name = row.getString("name");
            int age = row.getInt("age");
            System.out.println("id=" + id + "  name=" + name + "  age=" + age);
        }
    }
}

4. 修改数据

@Test
public void updateData() throws KuduException {
    String tableName = "user";
    KuduSession kuduSession = kuduClient.newSession();
    kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
    KuduTable kuduTable = kuduClient.openTable(tableName);

    //Update update = kuduTable.newUpdate();
    //如果id存在就表示修改,不存在就新增
    Upsert upsert = kuduTable.newUpsert(); 
    PartialRow row = upsert.getRow();
    row.addInt("id", 1);
    row.addString("name", "name-100");
    row.addInt("age", 20);
    row.addInt("sex", 0);
    kuduSession.apply(upsert);
}

5. 删除数据

@Test
public void deleteData() throws KuduException {
    String tableName = "user";
    KuduSession kuduSession = kuduClient.newSession();
    kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
    KuduTable kuduTable = kuduClient.openTable(tableName);
    Delete delete = kuduTable.newDelete();
    PartialRow row = delete.getRow();
    row.addInt("id", 100);
    kuduSession.apply(delete);
}

6. 删除表

@Test
public void dropTable() throws KuduException {
    String tableName = "student";
    if (kuduClient.tableExists(tableName)) {
        kuduClient.deleteTable(tableName);
    }
}

三、Spark 操作 kudu

pom 中新增依赖:

<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark3_2.12</artifactId>
    <version>1.14.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.1</version>
</dependency>

1. KuduContext

Sparkkudu 提供了 KuduContext 可进行基本的数据操作:

在这里插入图片描述

例如:使用 KuduContext 创建表:

class KuduSparkTests {

    public static void main(String[] args) throws AnalysisException, KuduException {
        String master = "10.218.222.13:30751";

        SparkSession spark = SparkSession
                .builder()
                .appName("sparksql")
                .master("local[*]")
                .getOrCreate();

        SparkContext sc = spark.sparkContext();
        sc.setLogLevel("warn");
        
        KuduContext kuduContext = new KuduContext(master, sc);
        // 创建表
        String tableName = "user1";
        if (!kuduContext.tableExists(tableName)) {
            // 声明字段
            List<ColumnSchema> columns = Arrays.asList(
                    new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build(),
                    new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build()
            );
            Schema schema = new Schema(columns);
            CreateTableOptions tableOptions = new CreateTableOptions();
            tableOptions.setNumReplicas(3);
            ArrayList<String> partitionList = new ArrayList<String>();
            // 指定kudu表的分区字段是什么
            partitionList.add("id");
            // 按照 id.hashcode % 分区数 = 分区号
            tableOptions.addHashPartitions(partitionList, 6);
            kuduContext.createTable(tableName, schema, tableOptions);
        }
        spark.stop();
    }
}

基于 KuduContext 也可以创建原生的 Session ,使用原生的方式操作数据:

KuduTable table = kuduContext.syncClient().openTable("user");
KuduSession session = kuduContext.syncClient().newSession();

2. SparkSQL

相比 Api 的操作,kudu 还可以使用 SparkSQL 进行操作:

class KuduSparkTests {

    public static void main(String[] args) throws AnalysisException, KuduException {
        String master = "10.218.222.13:30751";
        
        SparkSession spark = SparkSession
                .builder()
                .appName("sparksql")
                .master("local[*]")
                .getOrCreate();
                
		String tableName = "user1";
        Dataset<Row> kuduData = spark.read().format("kudu")
                .option("kudu.master", master)
                .option("kudu.table", tableName)
                .load();
        kuduData.printSchema();
        kuduData.show();
        kuduData.createGlobalTempView("user");
        spark.sql("select count(*) from user").show();

        spark.stop();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/659168.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python3 字典与集合 | 菜鸟教程(七)

目录 一、Python3 字典 &#xff08;一&#xff09;字典是另一种可变容器模型&#xff0c;且可存储任意类型对象。 &#xff08;二&#xff09;字典的每个键值 key>value 对用冒号 : 分割&#xff0c;每个对之间用逗号(,)分割&#xff0c;整个字典包括在花括号 {} 中 &am…

西南交通大学智能监测 培训课程练习5

2023.06.17培训 linux的简单实用 打包、部署后端jar服务 目录 一、连接远程服务器 二、maven项目打包 2.1添加build依赖 2.2使用maven打包 三、Linux基础操作 3.1利用Xftp上传文件 3.1.1返回上一层目录 3.1.2查看文件 3.1.3进入文件 3.1.4创建文件夹 3.1.5上传文件 …

安装Apache mysql php

一.Apache网站服务 Apache起源 源于 APatchy Server&#xff0c;著名的开源Web服务软件 1995年时&#xff0c;发布Apache服务程序的1.0版本 由Apache软件基金会 (ASF) 负责维护 最新的名称为“Apache HTTP Server 安装Apache----下面两个插件是httpd2.4以后的版…

SpringBoot整合activiti7实现简单的员工请假流程

Activiti 是一个开源架构的工作流引擎&#xff0c;基于bpmn2.0 标准进行流程定义。其前身是JBPM&#xff0c;Activiti 通过嵌入到业务系统开发中进行使用。 整合springboot 引入相关依赖 <!-- 引入Activiti7 --><dependency><groupId>org.activiti</gro…

什么是2.5G和5G多千兆端口?

概要 在当前数字化时代&#xff0c;对于高速数据传输和网络连接的需求不断增长。为了满足这种需求&#xff0c;网络技术也在不断发展和进步。2.5G和5G多千兆端口是一种新型的网络连接技术&#xff0c;提供了比传统千兆以太网更高的传输速率和带宽。本文将详细介绍 的定义、工作…

[元带你学: eMMC协议详解 15] 写保护(Write Protect)详解

依JEDEC eMMC 5.1及经验辛苦整理&#xff0c;付费内容&#xff0c;禁止转载。 所在专栏 《元带你学: eMMC协议详解》 内容摘要 全文 1300字&#xff0c; 主要讲述写保护的用法&#xff0c; 写保护的类型。 Write Protect Management 为了允许主机保护数据不被擦除或覆盖写入&…

vue-server-renderer实现服务端渲染

vue-server-renderer实现服务端渲染 简单认识vue-server-renderer&#xff1a; 是 Vue.js 官方提供的一个库&#xff0c;用于将 Vue 组件渲染成 HTML 字符串或流&#xff0c;通常用于服务端渲染。 具体的咱们vue-server-renderer如何实现 1、预编译组件&#xff1a;根据 Vue …

云安全技术(一)之什么是云计算

对于在云环境中工作的安全专家而言&#xff0c;从传统数据中心模型获得的许多知识和最佳实践仍然适用于云计算环境&#xff0c;但安全专家对云计算概念、不同类型的云模型和云服务的深入理解对于成功实施和监督(Overseeing)安全策略和合规性至关重要。 什么是云计算 1.1 云计…

扫码枪(扫描枪)扫码在vue中的使用教学

1.扫描枪使用原理浅析。 扫描枪的使用原理其实很简单&#xff1a;就是把光信号转换成电信号&#xff0c;再将电信号通过模拟数字转换器转化为数字信号传输到计算机中处理。其实可以简单理解为&#xff1a;二维码/条形码 转换成 字符串。 2.扫描枪功能开发前准备。 正所谓“工…

关于【C语言】中scanf与getchar的用法和常见错误详解

写这篇博客的起因是最近博主自己学习中总是遇到类似的错误&#xff0c;并曾百思不得其解。 今天分享出来是希望帮助大家在写代码时避免这些错误。话不多说&#xff0c;我们直接开始吧&#xff01; 君兮_的个人主页 勤时当勉励 岁月不待人 C/C 游戏开发 输入函数scanf与getcha…

[架构之路-213]- 架构 - 架构设计过程快速概览与在线画图工具

目录 第一步&#xff1a;业务系统 &#xff08;1&#xff09;收集目标系统的用户需求 &#xff08;2&#xff09;定义用例图 第二步 领域建模 &#xff08;1&#xff09;业务流程定义 &#xff08;2&#xff09;业务功能分解 &#xff08;3&#xff09;非功能性架构&…

贝莱德出手了!

* * * 原创&#xff1a;刘教链 * * * 号外&#xff1a;今天在“刘教链Pro”发表了头条《内参&#xff1a;贝莱德向SEC申请的究竟是BTC信托还是现货ETF&#xff1f;信托和ETF的5点关键区别》&#xff0c;以及次条《私钥争夺战》&#xff0c;欢迎关注公众号“刘教链Pro”并阅读。…

mysql压缩包方式安装、data数据恢复

前言 最近电脑重装了系统&#xff0c;C盘彻底格式化了&#xff0c;但是D盘中的文件还是保留了下来。 我的MySQL的数据都在D盘了&#xff0c;想要重新恢复MySQL&#xff0c;还是很简单的&#xff1a; 重新安装Mysql将源数据拷贝到新的Mysql的data目录下 顺便记录一下MySQL压缩…

遥感数字图像处理教程复习整理

目录 01 说明 遥感影像的存储方式 BSQ方式 BIL方式 BIP方式 如何计算图像存储空间大小(字节数)&#xff1f; 简单的单位换算 计算公式 简单地 复杂地 如何查看影像的基本信息/辅助信息&#xff1f; 如何进行直方图的阈值分割&#xff1f; 辐射校正 系统辐射校正 …

SonarQube社区版安装插件实现扫描多分支

社区版不支持扫描多分支 社区版不支持扫描多分支&#xff0c;收费版才支持&#xff0c;开源社区有插件可以实现多分支的扫描 插件下载 点击此处跳转下载地址 我的SonarQube是安装的最新版本10&#xff0c;下载的插件版本也是最新的1.14.0&#xff0c;切记下载相对应支持的插件…

html学习与总结表单input系列标签

文章目录 表单标签input系列标签表单input总结button按钮标签select下拉菜单标签textarea文本域标签label标签 表单标签 input系列标签 标签属性说明inputtext文本框inputpassword密码框inputradio单选框inputcheckbok复选框 checked 默认选中inputfile文件上传 multiple 设置…

springboot整合spring-data-redis

前言 其实&#xff0c;整合是一个循序渐进的学习&#xff0c;你肯定是要了解之前底层的相关知识&#xff0c;才能够具体知道现在框架方法api到底tm有什么作用&#xff0c;所以建议先看看我之前的redis博客。 可以不看&#xff0c;但是可以以我这个为目录&#xff0c;针对性得…

python图像处理实战(二)—图像几何变换

&#x1f680;写在前面&#x1f680; &#x1f58a;个人主页&#xff1a;https://blog.csdn.net/m0_52051577?typeblog &#x1f381;欢迎各位大佬支持点赞收藏&#xff0c;三连必回&#xff01;&#xff01; &#x1f508;本人新开系列专栏—python图像处理 ❀愿每一个骤雨初…

Python接口自动化测试实战

目录 前言&#xff1a; 1.接口定义&#xff1a; 2.基本流程 3.需求分析 4.用例设计 5.脚本开发 5.3结果校验 6.结果分析 前言&#xff1a; Python是一款在自动化测试领域应用广泛的编程语言。通过使用Python的测试框架&#xff08;如unittest和pytest&#xff09;&…

【第一次】21级计科计算机组成原理课外练习

【第一次】21级计科计算机组成原理课外练习 一、判断题二、单选题三、多选题四、主观题 一、判断题 1-1 国防科技大学成功研制的“银河-II”通用并行巨型机的峰值速度超过同为国防科技大学研制的“天河一号”超级计算机。 错误 1-2 目前高端光刻机技术被荷兰ASML公司垄断&…