Doris:StreamLoad导入数据

news2025/1/12 10:45:24

    

目录

1.基本原理

2.支持数据格式

3.StreamLoad语法

3.1.请求参数

3.2.返回参数

4.StreamLoad实践

4.1.使用 curl命令

4.2.使用Java代码


    Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。

1.基本原理

        Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。
        用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。
        导入的最终结果由 Coordinator BE 返回给用户。

2.支持数据格式

        目前 Stream Load 支持数据格式:CSV(文本)、JSON,1.2+ 支持PARQUET 和 ORC。 

3.StreamLoad语法

        Stream Load 通过 HTTP 协议提交和传输数据。这里通过 curl 命令展示如何提交导入。用户也可以通过其他 HTTP client 进行操作。

3.1.请求参数

        Stream Load 由于使用的是 HTTP 协议,所以所有导入任务有关的参数均设置在 Header 中。

参数参数说明
user/passwdStream load 由于创建导入的协议使用的是 HTTP 协议,通过 Basic access authentication 进行签名。Doris 系统会根据签名验证用户身份和导入权限。
label导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。
column_separator用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。
line_delimiter用于指定导入文件中的换行符,默认为\n。
max_filter_ratio导入任务的最大容忍率,默认为0容忍,取值范围是0~1。当导入的错误率超过该值,则导入失败。如果用户希望忽略错误的行,可以通过设置这个参数大于 0,来保证导入可以成功。
where导入任务指定的过滤条件。Stream load 支持对原始数据指定 where 语句进行过滤。被过滤的数据将不会被导入,也不会参与 filter ratio 的计算,但会被计入num_rows_unselected。
Partitions待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 dpp.abnorm.ALL
columns待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。
format

指定导入数据格式,支持csv、json,默认是csv

支持csv_with_names(支持csv文件行首过滤)、csv_with_names_and_types(支持csv文件前两行过滤)

exec_mem_limit导入内存限制。默认为 2GB,单位为字节。
merge_type 数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE。
APPEND是默认值,表示这批数据全部需要追加到现有数据中,
DELETE 表示删除与这批数据key相同的所有行,
MERGE 语义 需要与delete 条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理
two_phase_commitStream load 导入可以开启两阶段事务提交模式:在Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。
enable_profile当 enable_profile 为 true 时,Stream Load profile将会打印到日志中。否则不会打印。

3.2.返回参数

        Stream load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。

参数参数说明
TxnId导入的事务ID。
Label导入 Label。由用户指定或系统自动生成。
Status导入完成状态:
"Success":表示导入成功。
"Publish Timeout":该状态也表示导入已经完成,只是数据可能会延迟可见,无需重试。
"Label Already Exists":Label 重复,需更换 Label。
"Fail":导入失败。
ExistingJobStatus已存在的 Label 对应的导入作业的状态。
Message导入错误信息。
NumberTotalRows导入总处理的行数。
NumberLoadedRows成功导入的行数。
NumberFilteredRows数据质量不合格的行数。
NumberUnselectedRows被 where 条件过滤的行数。
LoadBytes导入的字节数。
LoadTimeMs导入完成时间。单位毫秒。
BeginTxnTimeMs向Fe请求开始一个事务所花费的时间,单位毫秒。
StreamLoadPutTimeMs向Fe请求获取导入数据执行计划所花费的时间,单位毫秒。
ReadDataTimeMs读取数据所花费的时间,单位毫秒。
WriteDataTimeMs执行写入数据操作所花费的时间,单位毫秒。
CommitAndPublishTimeMs向Fe请求提交并且发布事务所花费的时间,单位毫秒。
ErrorURL如果有数据质量问题,通过访问这个 URL 查看具体错误行。

4.StreamLoad实践

4.1.使用 curl命令

curl命令格式如下:

curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

# Header 中支持属性见下面的 ‘导入任务参数’ 说明
# 格式为: -H "key1:value1"

csv文件数据如下:

id,username,age,sex,phone,register_time
3,user_3,24,0,13212345678,2023-11-03 10:23:34
4,user_4,31,0,13312345678,2023-11-03 12:34:56
5,user_5,53,1,13412345678,2023-11-03 09:12:34

执行导入:

 curl --location-trusted -u root -T /home/weisx/opt/doris/user.csv -H "label:label_user" -H "column_separator:," -H "format:csv_with_names" http://localhost:8030/api/demo/user/_stream_load

134c8f4632454e3c9a0991b84cac3ee2.png

8cb5af8095b34e4983df3c15c74144bc.png

4.2.使用Java代码

package com.yichenkeji.dataplus.core.drois.util;


import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;

@Slf4j
public class StreamLoadTest {
    public static void main(String[] args) throws IOException {
        List<Map<String,Object>> datas = loadData();
        String label = "label_user_java";
        String username = "root";
        String password = "";
        String db = "demo";
        String table ="user";
        String loadUrl = String.format("http://192.168.179.131:8030/api/%s/%s/_stream_load",db,table);
        List<String> columns = Arrays.asList("id,username,age,sex,phone,register_time".split(","));
        String columnSeparator = ",";
        String format = "csv";
        String  loadData = datas.stream().map(data -> columns.stream().map(column -> data.get(column).toString()).collect(Collectors.joining(columnSeparator))).collect(Collectors.joining("\n"));
        sendData(label,username,password,loadUrl,columns,loadData,columnSeparator,null,format,null);
    }

    /**
     * 加载数据
     * @return
     */
    private static List<Map<String, Object>> loadData() {
        List<Map<String,Object>> datas = new ArrayList<>();
        Map<String,Object> map = new HashMap<>();
        map.put("id",6);
        map.put("username","user_6");
        map.put("age",52);
        map.put("sex",1);
        map.put("phone","13612345678");
        map.put("register_time","2023-11-02-12:34:36");
        datas.add(map);
        return  datas;
    }

    /**
     * Basic access authentication 签名
     * @param username doris用户名
     * @param password doris用户密码
     * @return
     */
    public static String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }
    /**
     * Stream load 导入数据
     * @param label 导入任务的标识。每个导入任务,都有一个在单 database 内部唯一的 label。label 是用户在导入命令中自定义的名称。通过这个 label,用户可以查看对应导入任务的执行情况。
     * @param username
     * @param password
     * @param loadUrl
     * @param columns 待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。
     * @param loadData
     * @param columnSeparator 用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。
     * @param lineDelimiter 用于指定导入文件中的换行符,默认为\n。
     * @param format 指定导入数据格式,支持csv、json,默认是csv
     * @param mergeType 数据的合并类型:一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值
     * @throws IOException
     */
    public static void sendData(String label, String username, String password, String loadUrl
            , List<String> columns, String loadData, String columnSeparator, String lineDelimiter
            , String format, String mergeType) throws IOException {
        HttpClientBuilder
                httpClientBuilder = HttpClients
                .custom()
                .setRedirectStrategy(new DefaultRedirectStrategy() {
                    @Override
                    protected boolean isRedirectable(String method) {
                        return true;
                    }
                });
        log.info("loadUrl:{},columns:{}",loadUrl,columns);
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            StringEntity entity = new StringEntity(loadData, "UTF-8");
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(username, password));
            // the label header is optional, not necessary
            // use label header can ensure at most once semantics
            put.setHeader("label", label);
            if(StringUtils.isNotBlank(columnSeparator)){
                put.setHeader("column_separator", columnSeparator);
            }

            if(StringUtils.isNotBlank(lineDelimiter)){
                put.setHeader("line_delimiter", lineDelimiter);
            }


            put.setHeader("format", format);

            put.setHeader("merge_type", mergeType);
            //字段
            if (null != columns && !columns.isEmpty()) {
                put.setHeader("columns", String.join(",",
                        columns.stream().map(f -> String.format("`%s`", f)).
                                collect(Collectors.toList())));
            }
            //数据
            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResultStr  =  null;
                if (response.getEntity() != null) {
                    loadResultStr  =  EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();

                log.info("statusCode:{},loadResultStr:{}",statusCode,loadResultStr);

            }
        }
    }
}

c88d986aeb55462ca5f1e256b52faa02.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1168582.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

图片有水印怎么去?这三招轻松去除图片水印

当我们在网站找一些的图片做头像或者壁纸的时候&#xff0c;会发现一些好看的图片上会带有一些平台水印&#xff0c;这些水印影响了整张照片的美观&#xff0c;那么图片有水印怎么去呢&#xff1f;这时就需要借用图片处理工具来操作。那你们知道图片有水印怎么去吗?今天我就来…

LCR 166.珠宝的最高价值 + 动态规划 + 记忆化搜索 + 递推 + 空间优化

LCR 166. 珠宝的最高价值 - 力扣&#xff08;LeetCode&#xff09; 现有一个记作二维矩阵 frame 的珠宝架&#xff0c;其中 frame[i][j] 为该位置珠宝的价值。拿取珠宝的规则为&#xff1a; 只能从架子的左上角开始拿珠宝每次可以移动到右侧或下侧的相邻位置到达珠宝架子的右下…

匪夷所思,spring aop这么写竟然会失效!!

背景 spring 版本&#xff1a;3.2.8.RELEASEJDK版本&#xff1a;1.8本地是正常&#xff0c;线上环境是有问题的 应用从云下迁移到云上的过程中出现了一个应用部分aop 通知失效的问题&#xff0c;场景如下&#xff1a; node1 节点上的category 是失效的&#xff0c;element是正…

ubuntu 分区 方案

ubuntu 分区 方案 自动分区啥样子的&#xff1f; 手动分区 需要怎么操作&#xff1f; 注意点是啥&#xff1f; swap分区 要和 内存大小 差不多 安装ubuntu系统时硬盘分区方案 硬盘分区概述 一块硬盘最多可以分4个主分区&#xff0c;主分区之外的成为扩展分区。硬盘可以没有…

C++--二叉搜索树初阶

前言&#xff1a;二叉搜索树是一种常用的数据结构&#xff0c;支持快速的查找、插入、删除操作&#xff0c;C中map和set的特性也是以二叉搜索树作为铺垫来实现的&#xff0c;而二叉搜索树也是一种树形结构&#xff0c;所以&#xff0c;在学习map和set之前&#xff0c;我们先来学…

学习率设置

在我们刚刚接触深度学习时&#xff0c;对学习率只有一个很基础的认知&#xff0c;当学习率过大的时候会导致模型难以收敛&#xff0c;过小的时候会收敛速度过慢&#xff0c;其实学习率是一个十分重要的参数&#xff0c;合理的学习率才能让模型收敛到最小点而非局部最优点或鞍点…

学 Java 怎么进外企?

作者&#xff1a;**苍何&#xff0c;CSDN 2023 年 实力新星&#xff0c;前大厂高级 Java 工程师&#xff0c;阿里云专家博主&#xff0c;土木转码&#xff0c;现任部门技术 leader&#xff0c;专注于互联网技术分享&#xff0c;职场经验分享。 &#x1f525;热门文章推荐&#…

HNU程序设计 练习三-控制结构

1.台球游戏 【问题描述】 在本台球游戏中&#xff0c;包含多种颜色的球&#xff0c;其中&#xff1a;红球15只各1分、黄球1只2分、绿球1只3分、咖啡球1只4分、蓝球1只5分、粉球1只6分、黑球1只7分。 球的颜色表示为&#xff1a; r-红色球 y-黄色球 g-绿色球 c-咖啡色球 b-蓝色…

闭循环低温恒温器的使用注意事项

与液氮恒温器相比&#xff0c;闭循环低温恒温器显得稍微复杂一些&#xff01;这主要表现在组成部分、体积重量、使用操作、升降温时间等方面。闭循环低温恒温器主要由冷头、氦压缩机、两根氦气连管组成&#xff0c;配套设备还有控温仪、真空泵&#xff0c;可能还有循环水冷机。…

离散数学实践(2)-编程实现关系性质的判断

*本文为博主本人校内的离散数学专业课的实践作业。由于实验步骤已经比较详细&#xff0c;故不再对该实验额外提供详解&#xff0c;本文仅提供填写的实验报告内容与代码部分&#xff0c;以供有需要的同学学习、参考。 -------------------------------------- 编程语言&#xff…

VM虚拟机逆向 --- [NCTF 2018]wcyvm 复现

文章目录 前言题目分析 前言 第四题了&#xff0c;搞定&#xff0c;算是独立完成比较多的一题&#xff0c;虽然在还原汇编的时候还是很多问题。 题目分析 代码很简单&#xff0c;就是指令很多。 opcode在unk_6021C0处&#xff0c;解密的数据在dword_6020A0处 opcode [0x08, …

谈一谈SQLite、MySQL、PostgreSQL三大数据库

每一份付出&#xff0c;必将有一份收货&#xff0c;就像这个小小的果实&#xff0c;时间到了&#xff0c;也就会开花结果… 三大数据库概述 SQLite、MySQL 和 PostgreSQL 都是流行的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;但它们在功能、适用场景和性…

【UE】从UI拖拽生成物体 —— 更改位置与定点销毁

本篇在上一篇博客&#xff08;【UE】从UI中拖拽生成物体-CSDN博客&#xff09;基础上继续增加更改生成的Actor的位置与定点销毁Actor的功能。 目录 效果 步骤 一、修改生成好的Actor位置 解决问题一&#xff1a;从UI拖出多个actor后&#xff0c;只能对第一个拖出的actor的…

传智杯-21算法赛初赛B组题目详细解法解析-AB题(C/C++、Python、Java)

🚀 欢迎来到 ACM 算法题库专栏 🚀 在ACM算法题库专栏,热情推崇算法之美,精心整理了各类比赛题目的详细解法,包括但不限于ICPC、CCPC、蓝桥杯、LeetCode周赛、传智杯等等。无论您是刚刚踏入算法领域,还是经验丰富的竞赛选手,这里都是提升技能和知识的理想之地。 ✨ 经典…

UG\NX二次开发 先设置默认颜色再创建对象

文章作者:里海 来源网站:里海NX二次开发3000例专栏 感谢粉丝订阅 感谢 qq_42461973 订阅本专栏,非常感谢。 简介 有粉丝问,可不可以先设置默认颜色再创建对象?这个是可以的,下面是例子: 效果 代码 #include "me.hpp" using namespace std;

java/springboot服务第三方接口安全签名(Signature)实现方案

前言 有的时候&#xff0c;我们需要把我们系统里的接口开放给第三方应用或企业使用&#xff0c;那第三方的系统并不在我们自己的认证授权用户体系内&#xff0c;此时&#xff0c;要如何保证我们接口的数据安全和身份识别呢&#xff1f; 在为第三方系统提供接口的时候&#xf…

筑基新一代数据底座,中国科大让智慧科研更有数

著名科学哲学家库恩在《科学革命的结构》中认为&#xff0c;范式是科研的一种理论体系&#xff0c;范式的突破会带来一系列科学革命。 如今在科研领域&#xff0c; 人工智能不断打破科研边界&#xff0c;AI for Science被视为下一个科研新范式&#xff0c;不仅为科学研究带来了…

Cesium:为地图添加指北针、缩放按钮和比例尺

作者&#xff1a;CSDN _乐多_ 网上找的很多代码用不了。本文记录了Cesium中为地图添加指北针、缩放按钮和比例尺的可用代码。 文章目录 一、代码 一、代码 const viewer new Cesium.Viewer(cesiumContainer, {// ...navigationHelpButton: false,sceneModePicker: false,sc…

校验验证码是否过期(定时刷新验证码)

需求&#xff1a; 我们在登录的时候会遇到通过接口请求验证码的操作&#xff0c;这里的验证码会有过期的时间&#xff0c;当我们验证码过期了&#xff0c;我们要进行重新刷新验证码。 我们这里根据后端返回的当前时间和过期时间判断&#xff0c;过期的时间超过了当前时间的时候…

Java面向对象 下(六)

Java面向对象 ( 下) 观看b站尚硅谷视频做的笔记 文章目录 Java面向对象 ( 下)1、 关键字&#xff1a;static1.1、static 的使用1.1.1、static 修饰属性1.1.2、 static 修饰方法1.1.3、 static 修饰代码块1.1.4、 static 修饰内部类1.1.5、类变量 vs 实例变量内存解析 1.2、 自…