starrock通过导入实现数据变更

news2025/1/13 13:09:40

770dacbf535836e27a8c9df7777ae0cf.jpeg

当数据文件中同时包含 UPSERT 和 DELETE 操作时,必须添加 __op 字段,并且确保数据文件中包含一个代表操作类型的列,取值为 0 或 1。其中,取值为 0 时代表 UPSERT 操作,取值为 1 时代表 DELETE 操作。

数据样例
  1. 准备数据文件。

    a. 在本地文件系统创建一个 CSV 格式的数据文件 example3.csv。文件包含四列,分别代表用户 ID、用户姓名、用户得分和操作类型,如下所示:

    101,Tom,100,1
    102,Sam,70,0
    103,Stan,80,0

    b. 把 example3.csv 文件中的数据上传到 Kafka 集群的 topic3 中。

  2. 准备 StarRocks 表。

    a. 在数据库 test_db 中创建一张名为 table3 的主键模型表。表包含 idname 和 score 三列,分别代表用户 ID、用户名称和用户得分,主键为 id 列,如下所示:

    CREATE TABLE `table3`
    (
        `id` int(11) NOT NULL COMMENT "用户 ID",
        `name` varchar(65533) NOT NULL COMMENT "用户姓名",
        `score` int(11) NOT NULL COMMENT "用户得分"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);

    说明

    自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 确定分桶数量。

    b. 向 table3 表中插入数据,如下所示:

    INSERT INTO table3 VALUES
        (101, 'Tom', 100),
        (102, 'Sam', 90);
导入数据

通过导入,把 example3.csv 文件中 id 为 101 的数据从 table3 表中删除,把 example3.csv 文件中 id 为 102 的数据更新到 table3 表,并且把 example3.csv 文件中 id 为 103 的数据插入到 table3 表:

  • 通过 Stream Load 导入:

    curl --location-trusted -u <username>:<password> \
        -H "Expect:100-continue" \
        -H "label:label4" \
        -H "column_separator:," \
        -H "columns: id, name, score, temp, __op = temp" \
        -T example3.csv -XPUT \
        http://<fe_host>:<fe_http_port>/api/test_db/table3/_stream_load

    说明

    上述示例中,通过 columns 参数把 example3.csv 文件中代表组别代码的第四列临时命名为 temp,然后定义 __op 字段等于临时命名的 temp 列。这样,StarRocks 可以根据 example3.csv 文件中第四列的取值是 0 还是 1 来确定执行 UPSERT 还是 DELETE 操作。

代码实现

数据转成JSON,自动增加一个是否删除标志

private String toJsonString(TapTable tapTable, Map<String, Object> record, boolean delete) throws JsonProcessingException {
    if (null == tapTable) throw new IllegalArgumentException("TapTable cannot be null");
    if (null == record) throw new IllegalArgumentException("Record cannot be null");
    LinkedHashMap<String, Object> linkedRecord = new LinkedHashMap<>();
    for (String field : tapTable.getNameFieldMap().keySet()) {
      Object value = record.get(field);
      if (null == value) {
        linkedRecord.put(field, null);
      } else {
        linkedRecord.put(field, value.toString());
      }
    }
    linkedRecord.put(Constants.STARROCKS_DELETE_SIGN, delete ? 1 : 0);
    return objectMapper.writeValueAsString(linkedRecord);
  }

stream load导入

public RespContent put(final TapTable table) throws StreamLoadException, StarRocksRetryableException {
    StarRocksConfig config = starRocksContext.getStarRocksConfig();
    StarRocksContext.WriteFormat writeFormat = starRocksContext.getWriteFormat();
    String loadUrl = null;
    try {
      String[] httpNodes = config.getStarRocksHttp().split(",");
      loadUrl = buildLoadUrl(httpNodes[new Random().nextInt(httpNodes.length)], config.getDatabase(), table.getId());
      TapLogger.info("starrocks-load: loadUrl = {}", loadUrl);
      final String prefix = buildPrefix(table.getId());


      String label = prefix + "-" + UUID.randomUUID();
      List<String> columns = new ArrayList<>();
      for (Map.Entry<String, TapField> entry : table.getNameFieldMap().entrySet()) {
        columns.add(entry.getKey());
      }
      // add the STARROCKS_DELETE_SIGN at the end of the column
      columns.add(Constants.STARROCKS_DELETE_SIGN);
      columns.add("__op = "+Constants.STARROCKS_DELETE_SIGN);
      HttpPutBuilder putBuilder = new HttpPutBuilder();
      InputStreamEntity entity = new InputStreamEntity(recordStream, recordStream.getContentLength());
      Collection<String> primaryKeys = table.primaryKeys(true);
      if (CollectionUtils.isEmpty(primaryKeys)) {
        putBuilder.setUrl(loadUrl)
            // 前端表单传出来的值和tdd json加载的值可能有差别,如前端传的pwd可能是null,tdd的是空字符串
            .baseAuth(config.getUser(), config.getPassword())
            .addCommonHeader()
            .addFormat(writeFormat)
            .addColumns(columns)
            .setLabel(label)
            .enableAppend()
            .setEntity(entity);
      } else {
        putBuilder.setUrl(loadUrl)
            // 前端表单传出来的值和tdd json加载的值可能有差别,如前端传的pwd可能是null,tdd的是空字符串
            .baseAuth(config.getUser(), config.getPassword())
            .addCommonHeader()
            .addFormat(writeFormat)
            .addColumns(columns)
            .setLabel(label)
            .enableDelete()
            .setEntity(entity);
      }
      HttpPut httpPut = putBuilder.build();
      TapLogger.debug(TAG, "Call stream load http api, url: {}, headers: {}", loadUrl, putBuilder.header);
      return handlePreCommitResponse(httpClient.execute(httpPut));
    } catch (StarRocksRetryableException e) {
      metrics.clear();
      throw e;
    } catch (Exception e) {
      throw new StreamLoadException(String.format("Call stream load error: %s", e.getMessage()), e);
    }
  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1134441.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[蓝桥杯-610]分数

题面 解答 这一题如果不知道数论结论的话&#xff0c;做这个题会有两种天壤之别的体验 此题包含以下两个数论知识 1. 2^02^12^2...2^(n-1)2^n-1 2. 较大的数如果比较小的数的两倍大1或者小1&#xff0c;则两者互质 所以答案就是2^n-1/2^(n-1) 标程1 我的初次解答 #in…

分享一款spring渗透测试工具-支持springboot敏感路径扫描和spring漏洞扫描

工具简介&#xff1a; SBSCAN是一款专注于spring框架的渗透测试工具&#xff0c;可以对指定站点进行spring boot敏感信息扫描以及进行spring相关漏洞的扫描与验证。 最全的敏感路径字典&#xff1a;最全的spring boot站点敏感路径字典&#xff0c;帮你全面检测站点是否存在敏…

ZYNQ中Block Memory Generator数据位宽更改

1.场景&#xff1a; 实际使用中&#xff0c;启用了PS中AXI_GP端口的AXI总线&#xff0c;总线的位宽设置为32bit。所需控制的BRAM的接口是AXI接口&#xff0c;数据位宽是64bit。其他设备的AXI接口都是32bit。 2.问题&#xff1a; 搭建好block原理图后&#xff0c;BRAM这…

Kafak - 单机/集群快速安装指北(3.x版本)

文章目录 官方下载地址上传安装包解压安装包到指定目录修改解压包名为kafka修改config目录下的配置文件server.propertie配置环境变量其他机器同上 - 修改配置文件中的brokerid启动集群停止Kraft 方式部署集群----(不使用zookeeper) 官方下载地址 http://kafka.apache.org/dow…

STM32G030F6P6点灯闪烁

前言 &#xff08;1&#xff09;如果有嵌入式企业需要招聘湖南区域日常实习生&#xff0c;任何区域的暑假Linux驱动实习岗位&#xff0c;可C站直接私聊&#xff0c;或者邮件&#xff1a;zhangyixu02gmail.com&#xff0c;此消息至2025年1月1日前均有效 &#xff08;2&#xff0…

机器学习——正则化

正则化 在机器学习学习中往往不知道需要不知道选取的特征个数&#xff0c;假如特征个数选取过少&#xff0c;容易造成欠拟合&#xff0c;特征个数选取过多&#xff0c;则容易造成过拟合。由此为了保证模型能够很好的拟合样本&#xff0c;同时为了不要出现过拟合现象&#xff0…

Kafka与Spark案例实践

1.概述 Kafka系统的灵活多变&#xff0c;让它拥有丰富的拓展性&#xff0c;可以与第三方套件很方便的对接。例如&#xff0c;实时计算引擎Spark。接下来通过一个完整案例&#xff0c;运用Kafka和Spark来合理完成。 2.内容 2.1 初始Spark 在大数据应用场景中&#xff0c;面对…

NineData:高效、安全、可靠的DB2数据管理平台

Db2 是老牌厂商 IBM 研发和维护的关系型数据库管理系统。作为一个拥有悠久历史的数据库系统&#xff0c;Db2 凭借它的高可靠、可扩展和高安全性等诸多优点&#xff0c;在如今的数据库市场依然占据相当大的份额。 对于诸多金融行业的企业而言&#xff0c; Db2 作为承载其核心业务…

一键自助建站系统api版系统源码

自助建站系统,一建建站系统api版,自动建站 安装推荐php7.2或7.2以下都行 可使用虚拟主机或者服务器进行搭建。 分站进入网站后台 域名/admin 初始账号123456qq.com密码123456 找到后台的网站设置 将主站域名及你在主站的通信secretId和通信secretKey填进去。 即可正常使用 通信…

Vue2 - 脚手架中整合 Vditor(全网唯一一篇帮你搞定)

目录 一、Vue2 框架整合 Vditor 1.1、安装 1.2、引入 Vditor 相关文件 1.3、配置 Vditor 1.4、使用 Vditor 一、Vue2 框架整合 Vditor 1.1、安装 npm install vditor --save 1.2、引入 Vditor 相关文件 import Vditor from "vditor" import "vditor/dist/…

Redis 命令 和 数据类型 您知道多少

文章目录 一、概述二、Redis 命令行客户端连接 Redis 服务器三、在 Redis 帮助命令的说明四、Redis 通用命令 generic4.1 通用命令说明4.1 keys 命令&#xff0c;列举出当前库的所有键4.2 type 命令&#xff0c;可以查看键对应值的类型4.3 object encoding 命令&#xff0c;查看…

SCT52240STDR双路 4A/4A 高速MOSFET/IGBT栅极驱动器, 可并联输出

SCT52240是是一款宽供电电压、双通道、高速、低测栅极驱动器&#xff0c;包括功率MOSFET&#xff0c;IGBT。单个通道能够提供高达4A拉电流和4A灌电流的轨到轨驱动能力&#xff0c;并实现轨到轨输出。高达24V宽电压范围提高功率器件开关瞬间栅极驱动的振铃幅值裕度。13ns输入输出…

java springboot2.7 写一个本地 pdf 预览的接口

依赖方面 创建的是 接口web项目就好了 然后包管理工具打开需要这些 import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; imp…

vue项目中将html转为pdf并下载

个人项目地址&#xff1a; SubTopH前端开发个人站 &#xff08;自己开发的前端功能和UI组件&#xff0c;一些有趣的小功能&#xff0c;感兴趣的伙伴可以访问&#xff0c;欢迎提出更好的想法&#xff0c;私信沟通&#xff0c;网站属于静态页面&#xff09; SubTopH前端开发个人…

【机器学习】sklearn特征值选取与处理

sklearn特征值选取与处理 文章目录 sklearn特征值选取与处理1. 调用数据集与数据集的划分2. 字典特征选取3. 英文文本特征值选取4. 中文特征值选取5. 中文分词文本特征抽取6. TfidfVectorizer特征抽取7. 归一化处理8. 标准化处理9. 过滤低方差特征10. 主成分分析11. 案例&#…

node实战——搭建带swagger接口文档的后端koa项目(node后端就业储备知识)

文章目录 ⭐前言⭐初始化项目⭐配置router目录自动扫描路由⭐swagger文件配置自动生成json文件⭐封装扫描目录路由加入swagger⭐配置项目入口总文件⭐运行效果⭐总结⭐结束⭐前言 大家好,我是yma16,本文分享关于node实战——搭建带swagger接口文档的后端koa项目(node后端就…

挑战吧,HarmonyOS应用开发工程师

一年一度属于工程师的专属节日1024&#xff0c;多重活动亮相啦~ 参与活动即有机会获得HUAWEI Freebuds 5i 耳机等精美礼品&#xff01; 点击“阅读原文”查看更多活动详情&#xff01;

SAD notes

ESKF 总结 prediction 更新误差先验 F F F通过3.42来算 得到 这里有点绕的一点是: 误差状态的 F F F牵涉到名义状态, 而名义状态又需要在时间上推进更新 其中, F中的名义状态的推进通过公式3.41得到, (名义状态不考虑误差, 这一点从3.41d, 3.41e可以看出, 误差状态只考虑…

“成为视频制作达人:高效为视频批量添加文字水印的技巧分享“

"作为一名视频制作达人&#xff0c;我经常需要处理大量的视频文件。有时候&#xff0c;为了提高视频的识别度和个性化&#xff0c;我会选择给视频添加文字水印。今天&#xff0c;我将分享如何使用“固乔剪辑助手”软件批量给视频添加文字水印的技巧。 首先&#xff0c;我们…

职业技术认证:《研发效能(DevOps)工程师》——开启职业发展新篇章

在互联网行业中&#xff0c;资质认证可以证明在该领域内的专业能力和知识水平。各种技术水平认证也是层出不穷&#xff0c;而考取具有公信力和权威性的认证是从业者的首选。同时&#xff0c;随着国内企业技术实力的提升和国家对于自主可控的重视程度不断提高&#xff0c;国产证…